npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@egintegrations/workflow

v0.1.0

Published

Workflow orchestration library with state machines, queue services, and ETL framework for building complex workflows and background job processing.

Readme

@egintegrations/workflow

Workflow orchestration library with state machines, queue services, and ETL framework for building complex workflows and background job processing.

Features

  • State Machine: Generic state machine with guards, actions, and history tracking
  • Queue Service: Background job queue with priorities, retries, and concurrency control
  • ETL Framework: Extract-Transform-Load pipelines with batch processing and progress tracking

Installation

npm install @egintegrations/workflow

Usage

State Machine

Create workflows with state transitions, guards, and actions:

import { StateMachine } from '@egintegrations/workflow';

enum OrderState {
  DRAFT = 'DRAFT',
  SUBMITTED = 'SUBMITTED',
  APPROVED = 'APPROVED',
  REJECTED = 'REJECTED',
}

enum OrderEvent {
  SUBMIT = 'SUBMIT',
  APPROVE = 'APPROVE',
  REJECT = 'REJECT',
}

const orderMachine = new StateMachine({
  initialState: OrderState.DRAFT,
  transitions: [
    { from: OrderState.DRAFT, to: OrderState.SUBMITTED, event: OrderEvent.SUBMIT },
    {
      from: OrderState.SUBMITTED,
      to: OrderState.APPROVED,
      event: OrderEvent.APPROVE,
      guard: (ctx) => ctx.amount < 10000, // Only auto-approve small orders
      action: (from, to, event, ctx) => {
        console.log(`Order ${ctx.orderId} approved`);
      },
    },
    { from: OrderState.SUBMITTED, to: OrderState.REJECTED, event: OrderEvent.REJECT },
  ],
  context: { orderId: '123', amount: 5000 },
  enableHistory: true,
  onTransition: (from, to, event, ctx) => {
    console.log(`Transitioned from ${from} to ${to}`);
  },
});

// Check if transition is possible
if (orderMachine.can(OrderEvent.SUBMIT)) {
  await orderMachine.transition(OrderEvent.SUBMIT);
}

console.log(orderMachine.getCurrentState()); // SUBMITTED
console.log(orderMachine.getHistory()); // Array of state transitions

State Machine Features

  • Generic Types: Type-safe states, events, and context
  • Guards: Conditional transitions based on context
  • Actions: Execute code during transitions
  • History: Track all state transitions with timestamps
  • Async Support: Guards and actions can be async
  • Context Management: Store and update workflow data

Queue Service

Process background jobs with priorities, retries, and concurrency control:

import { InMemoryQueue } from '@egintegrations/workflow';

const queue = new InMemoryQueue({
  pollInterval: 1000, // Check for jobs every second
  concurrency: 5, // Process up to 5 jobs concurrently
  enableDeadLetter: true, // Move failed jobs to dead letter queue
});

// Register job handler
queue.process<{ email: string; subject: string }>('send-email', async (job) => {
  console.log(`Sending email to ${job.data.email}`);
  await sendEmail(job.data.email, job.data.subject);
});

// Enqueue jobs
const jobId = await queue.enqueue(
  'send-email',
  {
    email: '[email protected]',
    subject: 'Welcome!',
  },
  {
    priority: 10, // Higher priority jobs run first
    maxRetries: 3, // Retry up to 3 times on failure
  }
);

// Check job status
const job = await queue.getStatus(jobId);
console.log(job?.status); // PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED

// Manage jobs
await queue.cancel(jobId); // Cancel a pending job
await queue.retry(failedJobId); // Retry a failed job
await queue.clearCompleted(); // Remove completed jobs
await queue.stop(); // Stop processing jobs

Queue Features

  • Priority Queuing: Higher priority jobs run first
  • Automatic Retries: Configurable retry attempts with exponential backoff
  • Concurrency Control: Limit concurrent job execution
  • Job Management: Cancel, retry, and query jobs
  • Status Tracking: Track job lifecycle (PENDING → PROCESSING → COMPLETED/FAILED)
  • Dead Letter Queue: Failed jobs moved to separate queue
  • Type Safety: Typed job data and handlers

ETL Pipeline

Build data extraction, transformation, and loading pipelines:

import {
  ETLPipeline,
  ArrayExtractor,
  FunctionTransformer,
  ArrayLoader,
} from '@egintegrations/workflow';

interface User {
  id: number;
  name: string;
  email: string;
  age: number;
}

interface TransformedUser {
  userId: number;
  displayName: string;
  contact: string;
}

const sourceData: User[] = [
  { id: 1, name: 'Alice', email: '[email protected]', age: 30 },
  { id: 2, name: 'Bob', email: '[email protected]', age: 25 },
];

const pipeline = new ETLPipeline({
  name: 'user-import',
  extractor: new ArrayExtractor(sourceData),
  transformer: new FunctionTransformer<User, TransformedUser>((user) => ({
    userId: user.id,
    displayName: user.name.toUpperCase(),
    contact: user.email,
  })),
  loader: new ArrayLoader<TransformedUser>(),
  batchSize: 100, // Process in batches of 100
  onProgress: (stats) => {
    console.log(`Loaded ${stats.loaded}/${stats.extracted} records`);
  },
  onError: (error, record) => {
    console.error(`Failed to process record:`, error);
  },
});

const result = await pipeline.run();
console.log(result);
// {
//   pipelineName: 'user-import',
//   status: 'completed',
//   extracted: 2,
//   transformed: 2,
//   filtered: 0,
//   loaded: 2,
//   failed: 0,
//   duration: 15,
//   startTime: Date,
//   endTime: Date
// }

Built-in Extractors

  • ArrayExtractor: Extract from in-memory array
  • JSONFileExtractor: Extract from JSON file
  • FunctionExtractor: Extract using custom function

Built-in Transformers

  • IdentityTransformer: Pass data through unchanged
  • FunctionTransformer: Transform using custom function
  • FilterTransformer: Filter records based on predicate
  • MapTransformer: Map fields from input to output
  • ComposeTransformer: Chain multiple transformers

Built-in Loaders

  • ArrayLoader: Load into in-memory array
  • JSONFileLoader: Load into JSON file
  • ConsoleLoader: Log records to console (debugging)
  • FunctionLoader: Load using custom function
  • BatchCallbackLoader: Execute callback for each batch

Custom ETL Components

Create custom extractors, transformers, and loaders:

import { Extractor, Transformer, Loader } from '@egintegrations/workflow';

// Custom extractor
class DatabaseExtractor implements Extractor<User> {
  async extract(): Promise<User[]> {
    return db.query('SELECT * FROM users');
  }
}

// Custom transformer
class UppercaseNameTransformer implements Transformer<User, User> {
  async transform(user: User): Promise<User> {
    return { ...user, name: user.name.toUpperCase() };
  }
}

// Custom loader
class DatabaseLoader implements Loader<User> {
  async load(users: User[]): Promise<number> {
    await db.insertMany(users);
    return users.length;
  }
}

API Documentation

StateMachine<S, E, C>

Constructor

new StateMachine({
  initialState: S,
  transitions: Transition<S, E, C>[],
  context?: C,
  onTransition?: (from: S, to: S, event: E, context: C) => void | Promise<void>,
  enableHistory?: boolean,
})

Methods

  • getCurrentState(): S - Get current state
  • getContext(): C | undefined - Get context
  • setContext(context: C): void - Update context
  • can(event: E): boolean - Check if transition is possible
  • getPossibleEvents(): E[] - Get all possible events from current state
  • transition(event: E): Promise<S> - Execute state transition
  • getHistory(): StateHistoryEntry<S, E>[] - Get transition history
  • clearHistory(): void - Clear history
  • reset(initialState?: S): void - Reset to initial state

InMemoryQueue

Constructor

new InMemoryQueue({
  pollInterval?: number, // Default: 1000ms
  concurrency?: number, // Default: 5
  enableDeadLetter?: boolean, // Default: false
})

Methods

  • enqueue<T>(type: string, data: T, options?: EnqueueOptions): Promise<string> - Add job to queue
  • process<T>(type: string, handler: JobHandler<T>): void - Register job handler
  • getStatus(jobId: string): Promise<Job | null> - Get job status
  • cancel(jobId: string): Promise<boolean> - Cancel pending job
  • retry(jobId: string): Promise<boolean> - Retry failed job
  • getJobsByStatus(status: JobStatus): Promise<Job[]> - Get jobs by status
  • clearCompleted(olderThan?: Date): Promise<number> - Clear completed jobs
  • stop(): Promise<void> - Stop processing

ETLPipeline<TExtract, TTransform>

Constructor

new ETLPipeline({
  name: string,
  extractor: Extractor<TExtract>,
  transformer: Transformer<TExtract, TTransform>,
  loader: Loader<TTransform>,
  batchSize?: number, // Default: 100
  onProgress?: (stats: ETLProgress) => void,
  onError?: (error: Error, record?: any) => void,
})

Methods

  • run(): Promise<ETLResult> - Execute the ETL pipeline

Source Project

Extracted from FTA:

  • State Machine: backend/app/services/state_machine.py - Generic state machine pattern
  • Queue Service: backend/app/services/queue.py - Background job queue pattern
  • ETL Framework: backend/app/services/etl.py - Extract-Transform-Load pattern

Refactored from Python to TypeScript with:

  • Generic type support
  • Pluggable extractors, transformers, and loaders
  • Modern async/await patterns
  • Comprehensive test coverage

Related Packages

  • @egintegrations/core-utils - Retry logic, error handling, health checks

License

MIT

Contributing

See the main egi-comp-library repository for contribution guidelines.