npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@feedloop/envoy

v0.1.6

Published

## 1. Project Overview

Downloads

17

Readme

Envoy Workflow & Job Scheduler

1. Project Overview

Envoy is a modular, extensible workflow engine and job scheduler for AI use cases built on Node.js, PostgreSQL, and Redis. It enables robust orchestration of complex, multi-step jobs using state machines, with support for parent/child workflows, retries, blocking, and distributed execution.

Key Features:

  • State machine-driven job orchestration
  • Parent/child (spawn) workflows
  • Blocking, waiting, and external event resolution
  • Robust retry and orphaned job cleanup
  • Pluggable state machines and custom job types
  • PostgreSQL for durability, Redis for fast distributed coordination

2. Architecture

  • Scheduler: Orchestrates job execution, manages queues, retries, and blocking.
  • Flow: Defines workflow logic as a series of states and transitions.
  • JobRepo: Persists jobs and their state in PostgreSQL.
  • Redis: Used for distributed job queues, locks, and fast status checks.
[Client/API] → [Scheduler] → [Flow] → [JobRepo (Postgres)]
                                 ↓
                              [Redis]

3. Getting Started

Prerequisites

  • Node.js (v18+ recommended)
  • PostgreSQL (14+)
  • Redis (6+)

Installation

In your project:

npm install @feedloop/envoy

Flow Concept: Minimal Example

A Flow is a workflow defined as a series of named states (nodes) and transitions. Each state can perform logic and decide what state to go to next.

import { StateFlow } from '@feedloop/envoy';

const flow = new StateFlow([
  {
    name: 'Start',
    onState: async ctx => {
      ctx.output('Hello!');
      return ctx;
    },
    router: { next: 'Middle' }
  },
  {
    name: 'Middle',
    onState: async ctx => {
      ctx.output(ctx.output() + ' Middle!');
      return ctx;
    },
    router: { next: 'End' }
  },
  {
    name: 'End',
    onState: async ctx => {
      ctx.output(ctx.output() + ' End!');
      return ctx;
    },
    router: { next: null }
  }
]);

const ctx = await sm.run();
console.log(ctx.output()); // "Hello! Middle! End!"

Routing Between States

Routing determines which state to transition to next after a state handler completes. There are two main approaches:

1. Static Routing

Use router: { next: 'StateName' } to always go to a specific state:

{
  name: 'A',
  onState: async ctx => { /* ... */ return ctx; },
  router: { next: 'B' }
}

2. Dynamic Routing (Router Function)

Use a router function to decide the next state based on context, output, or any logic:

{
  name: 'A',
  onState: async ctx => {
    ctx.output(Math.random());
    return ctx;
  },
  router: {
    // define the routes
    routes: {
      B: {
        description: "route to B"
      },
      C: {
        description: "route to C"
      }
    }
    onRoute: async ctx => {
      const value = ctx.output<number>();
      if (value > 0.5) {
        return 'B';
      } else {
        return 'C';
      }
    }
  }
}
  • You can implement conditional branching, loops, or even end the workflow by returning null.

3. Example: Conditional Branching

{
  name: 'Check',
  onState: async ctx => {
    ctx.output(ctx.input<number>());
    return ctx;
  },
  router: {
    routes: {
      Big: o => o,
      Small: o => o
    }
    onRoute: async ctx => {
      return ctx.output<number>() > 10
        ? 'Big'
        : 'Small'
    }
  }
},
{
  name: 'Big',
  onState: async ctx => { ctx.output('Big number!'); return ctx; },
  router: { next: null }
},
{
  name: 'Small',
  onState: async ctx => { ctx.output('Small number!'); return ctx; },
  router: { next: null }
}

This allows you to build complex, data-driven workflows with flexible transitions between states.


State Handlers: onEnter, onState, onExit

  • onEnter: Runs when entering the state (optional).
  • onState: Main logic for the state (required).
  • onExit: Runs when leaving the state (optional).
{
  name: 'Example',
  onEnter: async ctx => { ctx.set('entered', true); return ctx; },
  onState: async ctx => { /* main logic */ return ctx; },
  onExit: async ctx => { ctx.set('exited', true); return ctx; },
  router: { next: null }
}

The ctx Object

The ctx object (short for "context") is passed to every state handler and provides methods to access and manipulate the state machine's execution context. It allows you to:

  • Access the current state's input and output.
  • Store and retrieve arbitrary data across states.
  • Control execution flow (e.g., spawn child jobs, escalate, wait for events).
  • Track progress, state name, and step count.

Common ctx methods include:

  • ctx.input<T>(): Get the input for this state, optionally typed.
  • ctx.output<T>(value?): Set or get the output for this state.
  • ctx.set(key, value): Store custom data in the context.
  • ctx.get<T>(key): Retrieve custom data from the context.
  • ctx.state(): Get the current state name.
  • ctx.step(): Get the current step number.
  • ctx.done(): Check if the workflow is finished or in a terminal state.
  • ctx.spawn(workflowName, input): Spawn a child workflow/job.
  • ctx.waitFor(waitList): Block until external input or events are received.
  • ctx.escalate(user, message, inputs): Escalate to a human or external system for intervention.

The ctx object is a new context instance, so always return the updated ctx from your handler. immutable by default—modifications return

onState: async ctx => {
  const input = ctx.input<string>();
  ctx.set('foo', 42);
  ctx.output('Result: ' + input);
  return ctx;
}

Spawning Child Jobs

You can spawn child jobs from a parent job and wait for their results:

onState: async ctx => {
  ctx.spawn('childWorkflow', { some: 'input' });
  return ctx;
}
  • The parent job will block until the child job completes, then resume and receive the child's output.

Plugins: Extending Behavior

Plugins allow you to extend or intercept state transitions, add logging, metrics, or custom logic:

const myPlugin = {
  name: 'logger',
  onEnter: (ctx, state) => { console.log('Entering', state); },
  onExit: (ctx, state) => { console.log('Exiting', state); }
};

const sm = new StateFlow([...], { plugins: [myPlugin] });
  • Plugins can hook into onEnter, onExit, and other lifecycle events.

4. Job & Workflow Concepts

  • Job: A unit of work tracked in the database, processed by a state machine.
  • State Machine: Defines the workflow logic for a job, with named states and transitions.
  • Parent/Child Jobs: A job can spawn child jobs and wait for their results.
  • Blocking/Waiting: Jobs can block on external events or child jobs, and resume when unblocked.

Human-in-the-Loop Escalation

  • Escalation: A job can trigger a human escalation, blocking until a human approves or rejects it. Escalations are tracked in the escalations table and can be listed, approved, or rejected via the API or repository.
  • How to trigger: Use ctx.escalate(user, message, inputs) in your state machine to block and require human input. Example:
onState: async ctx => {
  ctx.escalate('alice', 'Approve this action?', [
    { id: 'reason', type: 'select', label: 'Reason', options: { a: 'A', b: 'B' } },
    { id: 'note', type: 'comment', label: 'Note' }
  ]);
  return ctx;
}
  • How to reply: Use scheduler.replyToEscalation(escalationId, { reason: 'a', note: 'Looks good' }, 'approved') to approve, or use 'rejected' to reject. The job will resume or error accordingly.
await scheduler.replyToEscalation(escalationId, { reason: 'a', note: 'OK' }, 'approved');
  • Escalation status: Escalations can be pending, approved, or rejected. The job will unblock and continue after a reply.

  • Retries: Jobs can be retried on failure up to a configurable limit.


5. API Usage

Instantiate scheduler

// To quickly get started, use `initScheduler` to create a scheduler instance with a database and built-in workflows:

import { initScheduler } from '@feedloop/envoy';

const scheduler = await initScheduler({
  redis: {
    host: 'localhost',      // or your Redis host
    port: 6379,             // optional, default 6379
    password: 'yourpassword'// optional
  },
  postgres: {
    host: 'localhost',      // or your Postgres host
    port: 5432,             // optional, default 5432
    password: 'postgres',   // optional
    database: 'postgres'    // optional
  },
  concurrency: 2,           // optional, number of jobs to run in parallel
  maxRetries: 3             // optional, max retries per job
});

Scheduling a Job

const jobId = await scheduler.schedule('myWorkflow', { input: 'data' });

Querying Job Status

const job = await scheduler.getJob(jobId);
console.log(job.status); // 'pending', 'running', 'done', 'failed', etc.

Cancelling or Failing a Job

await scheduler.cancelJob(jobId);
await scheduler.failJob(jobId, 'reason');

Resolving a Blocking Job

await scheduler.resolveBlockingJob(jobId, 'waitForId', { result: 42 });

6. Implementing a State Machine (Example: ToolAgent)

You can implement custom state machines for your workflows. Here's an example using the built-in ToolAgent, which plans and executes tool calls using an LLM:

import { ToolAgent, OpenAIProvider } from '@feedloop/envoy';

// Define your tools
const tools = [
  {
    name: 'add',
    description: 'Add two numbers',
    parameters: { a: 'number', b: 'number' },
    handler: async ({ a, b }) => a + b,
  },
  {
    name: 'echo',
    description: 'Echo a string',
    parameters: { text: 'string' },
    handler: async ({ text }) => text,
  },
];

// Create an LLM provider (e.g., OpenAI)
const provider = new OpenAIProvider({ apiKey: process.env.OPENAI_API_KEY! });

// Instantiate the ToolAgent state machine
const agent = new ToolAgent(provider, {
  tools,
  maxSteps: 10,
});

const ctx = await agent.run('What is 2 + 2?');
console.log('Final output:', ctx.output());
  • Define tools with a name, description, parameters, and handler function.
  • Instantiate ToolAgent with your tools and an LLM provider.
  • Run the agent with a user task; the agent will plan, call tools, and return the result.

You can implement your own state machines by extending StateFlow and defining custom states and transitions for your workflow needs.


7. Contributing

  • Fork the repo and create a feature branch.
  • Write tests for new features or bug fixes.
  • Follow code style and submit a pull request.

8. License

MIT