npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@thebutler/pipeline

v0.1.2

Published

Domain-agnostic message-processing pipeline for AI assistants. Used by Jack The Butler for hospitality; designed to be reused for other verticals.

Readme

@thebutler/pipeline

Domain-agnostic message-processing pipeline for AI assistants. Used by Jack The Butler for hospitality; designed to be reused for other verticals (trading, handyman, car dealer, …) without changes to the pipeline code itself.

The package provides:

  • A configurable pipeline runtime (createPipeline)
  • 12 reference stages covering the full inbound → AI response → outbound flow
  • A small, opinionated set of contracts the consumer implements

You provide the what (your domain's intents, prompts, entity model) and the how (your AI provider, storage, logger). The package provides the flow.

Requirements

  • Node.js ≥ 22
  • ESM only. This package ships as an ES module (no CommonJS build). Use it from an ESM project ("type": "module") or a bundler/TS setup that consumes ESM. require('@thebutler/pipeline') will not work.
  • TypeScript types are bundled — no separate @types package needed.

Install

npm install @thebutler/pipeline
# or: pnpm add @thebutler/pipeline
# or: yarn add @thebutler/pipeline

Stability

This package is pre-1.0 (0.x). Per semver, breaking changes can land in any minor release (0.1 → 0.2). Pin a version you control ("@thebutler/pipeline": "0.1.0" or "~0.1.0") and upgrade deliberately until a 1.0.0 stable API is published.

Quick start

First, implement the contracts (full list below). The smallest one is IntentProvider — here it is in full so you can see the shape of a real implementation:

import type { IntentProvider, Intent } from '@thebutler/pipeline';

const INTENTS: Intent[] = [
  {
    name: 'request.support',
    description: 'User is asking for help or has a problem',
    examples: ['my order is broken', 'I need help', 'this is not working'],
    metadata: { priority: 'high' }, // opaque to the pipeline; your stages read it
  },
  {
    name: 'smalltalk',
    description: 'Greetings and chit-chat',
    examples: ['hello', 'how are you'],
  },
];

class MyIntentProvider implements IntentProvider {
  list(): readonly Intent[] {
    return INTENTS;
  }
  get(name: string): Intent | null {
    return INTENTS.find((i) => i.name === name) ?? null;
  }
}

The other contracts (PromptProvider, EntityProvider, AIProvider, ConversationProvider, Logger) follow the same pattern — a small class implementing a typed interface. Then wire them into the pipeline:

import { createPipeline } from '@thebutler/pipeline';

const pipeline = createPipeline({
  intents: new MyIntentProvider(), // domain catalog (shown above)
  prompts: new MyPromptProvider(), // domain prompts
  services: {
    entities: new MyEntityProvider(),
    ai: new MyAIProvider(),
    conversation: new MyConversationProvider(),
    logger: pino(),
    // knowledge:   new MyKnowledgeProvider(),   // optional
    // memory:      new MyMemoryProvider(),      // optional
  },
  // systemLanguage: 'en',                       // optional, defaults to 'en'
  // stages: customStages,                       // optional, defaults to all 12
});

const response = await pipeline.process({
  id: 'msg-123',
  channel: 'whatsapp',
  channelId: '+15551234567',
  content: 'Can I get extra towels?',
  createdAt: new Date(),
});
// → { id, conversationId, content, createdAt, metadata? }

What you implement

The pipeline depends on 6 required + 2 optional contracts.

Required

| Contract | Methods | What it does | | ---------------------- | ------------------------------------------------------------------------------- | ---------------------------------------------------------------- | | IntentProvider | list(), get(name) | Returns the catalog of intents your classifier can pick from. | | PromptProvider | classifier(intents), responder(input), detector(), translator(from, to) | Returns the 4 system prompts your domain uses. | | EntityProvider | resolve(inbound), findById(id) | Resolves the "user" for an inbound message. | | AIProvider | complete(req), embed(req) | Wraps your LLM provider (Anthropic, OpenAI, Bedrock, Ollama, …). | | ConversationProvider | findOrCreate, findById, addMessage, getRecentMessages | Conversation persistence. | | Logger | debug, info, warn, error | Pino-style structured logging. |

Optional

| Contract | When you need it | | ------------------- | ----------------------------------------------------------- | | KnowledgeProvider | If you want RAG. Adds loadKnowledge to the pipeline. | | MemoryProvider | If you want long-term per-user memory. Adds loadMemories. |

Optional services that are missing cause their dependent stages to silently no-op, so you can opt into features incrementally.

The pipeline

12 stages run in this order by default:

1.  resolveConversation       — who is this, where are we
2.  detectLanguage            — BCP-47 code of the inbound
3.  translateInbound          — translate to systemLanguage (skip if same)
4.  loadHistory               — load recent turns for context
5.  saveInboundMessage        — persist what they said
6.  classifyIntent            — match the intent from the catalog
7.  computeEmbedding          — embed for retrieval (skip if no KB+memory)
8.  loadKnowledge             — RAG search (skip if no KB)
9.  loadMemories              — memory recall (skip if no memory)
10. generateResponse          — LLM call with full context
11. translateOutbound         — translate back (skip if same)
12. saveOutboundMessage       — persist response; build OutboundMessage

Every conditional stage no-ops cleanly when its inputs/services are missing, so the same defaults work for a single-language minimal deployment and a full multi-language RAG + memory deployment.

Customizing stages

import { createPipeline, defaultStages, saveInboundMessage } from '@thebutler/pipeline';

// 1. Replace entirely
createPipeline({
  /* ... */
  stages: [resolveConversation, generateResponse, saveOutboundMessage],
});

// 2. Append a custom stage
createPipeline({
  /* ... */
  stages: [...defaultStages, myCustomTelemetryStage],
});

// 3. Remove a specific stage
createPipeline({
  /* ... */
  stages: defaultStages.filter((s) => s !== saveInboundMessage),
});

// 4. Insert a stage at a specific position
createPipeline({
  /* ... */
  stages: [...defaultStages.slice(0, 4), myCustomStage, ...defaultStages.slice(4)],
});

Writing a custom stage

A stage is a typed async function: (ctx, env) => Promise<void>. It reads and mutates the shared ctx, uses env for services/config, and returns nothing. Use the exported Stage type for full type-safety:

import type { Stage } from '@thebutler/pipeline';

// Example 1 — a guard that short-circuits empty messages before any LLM call.
export const rejectEmpty: Stage = async (ctx, env) => {
  if (ctx.inbound.content.trim() !== '') return;

  env.services.logger.warn({ id: ctx.inbound.id }, 'empty message, skipping');

  ctx.outbound = {
    id: ctx.inbound.id,
    conversationId: ctx.conversation?.id ?? 'unknown',
    content: 'Sorry, I received an empty message — could you resend?',
    createdAt: new Date(),
  };
  ctx.done = true; // skip all remaining stages; process() returns ctx.outbound
};

// Example 2 — telemetry after the response is generated. Read-only on ctx.
export const logLatency: Stage = async (ctx, env) => {
  if (!ctx.aiResponse) return; // nothing generated (e.g. earlier short-circuit)
  env.services.logger.info(
    { ms: Date.now() - ctx.startTime, intent: ctx.classification?.intent },
    'pipeline completed'
  );
};

Place either one anywhere using the composition patterns above — [...defaultStages, logLatency] to append, or slice it into a specific position.

Short-circuiting: ctx.done is checked at the top of the stage loop, so setting ctx.done = true lets the current stage finish, then skips all remaining stages. pipeline.process() resolves to the final context with ctx.outbound populated — so a short-circuiting stage must set ctx.outbound first, or process() throws Pipeline finished without producing an outbound message (wrapped in a PipelineError).

Domain-specific context: to carry extra working state, extend MessageContext and type your stages as Stage<MyCtx>. All added fields must be optional (the pipeline starts from a bare { inbound, startTime }):

import type { MessageContext, Stage } from '@thebutler/pipeline';

interface MyCtx extends MessageContext {
  riskScore?: number;
}

const scoreRisk: Stage<MyCtx> = async (ctx, env) => {
  ctx.riskScore = computeRisk(ctx.inbound.content);
};

What's on ctx (MessageContext)

Populated top-to-bottom as the pipeline runs — a field is only set once its producing stage has executed, so check for undefined if your stage runs before that point.

| Field | Type | Set by / when | | --------------------- | -------------------------- | -------------------------------------------------------- | | inbound | InboundMessage | input — always present | | startTime | number | input — ms epoch at run start (for duration metrics) | | conversation | Conversation? | resolveConversation | | entity | Entity \| null? | resolveConversation (null if channel can't identify) | | inboundLanguage | string? | detectLanguage (BCP-47) | | inboundTranslation | string? | translateInbound (only if language ≠ systemLanguage) | | history | readonly Message[]? | loadHistory | | savedInboundId | string? | saveInboundMessage (DB-side id; ≠ inbound.id) | | classification | ClassificationResult? | classifyIntent | | inboundEmbedding | readonly number[]? | computeEmbedding | | knowledgeHits | readonly KnowledgeHit[]? | loadKnowledge | | memoryHits | readonly MemoryHit[]? | loadMemories | | aiResponse | AIResponse? | generateResponse | | outboundTranslation | string? | translateOutbound | | outbound | OutboundMessage? | saveOutboundMessage — the return value of process() | | done | boolean? | any stage — set true to short-circuit |

What's in env

Captured at createPipeline and passed unchanged to every stage:

| Field | Type | What it is | | -------------------- | ---------------- | --------------------------------------------------------------------------------------- | | env.intents | IntentProvider | your intent catalog | | env.prompts | PromptProvider | your domain prompts | | env.services | Services | entities, ai, conversation, logger (required); knowledge, memory (optional) | | env.systemLanguage | string | BCP-47 system language (defaults to 'en') |

Error handling

pipeline.process() throws when:

  • A stage throws an unhandled error
  • The pipeline completes without producing an outbound message

The package has no built-in retry / fallback machinery. Callers wrap with try/catch:

try {
  const response = await pipeline.process(inbound);
  await channel.send(response);
} catch (err) {
  logger.error({ err, inboundId: inbound.id }, 'Pipeline failed');
  // decide what to do — fallback message, retry, alert, etc.
}

Reference stages catch their own LLM/IO errors and log warnings (so a transient API failure on, say, classifyIntent doesn't kill the pipeline; the responder just runs without classification). Custom stages should follow the same pattern if they make external calls.

Multi-language

If systemLanguage differs from the user's detected language:

  1. detectLanguage sets ctx.inboundLanguage.
  2. translateInbound translates the inbound to systemLanguage; classifier/responder/RAG operate on the translation.
  3. translateOutbound translates the AI response back to the user's language.
  4. The persisted outbound contains the user-language text.

For a single-language deployment, the translation stages no-op (when detected language === systemLanguage).

Public surface

import {
  // Pipeline core
  createPipeline,
  type Pipeline,
  type PipelineConfig,
  type Env,
  type Services,
  type Stage,
  type MessageContext,

  // Wire types
  type InboundMessage,
  type OutboundMessage,
  type Conversation,
  type Message,

  // Domain types
  type Entity,
  type Intent,
  type ResponderInput,

  // AI types
  type AIModelTier,
  type AICompletionMessage,
  type AICompletionRequest,
  type AICompletionResult,
  type AIEmbeddingRequest,
  type AIEmbeddingResult,

  // Service contracts
  type EntityProvider,
  type IntentProvider,
  type PromptProvider,
  type AIProvider,
  type ConversationProvider,
  type Logger,
  type KnowledgeProvider,
  type MemoryProvider,

  // Service-related data shapes
  type LogFields,
  type KnowledgeHit,
  type KnowledgeSearchOptions,
  type MemoryHit,
  type NewMemory,
  type MemoryRecallOptions,

  // Inference results
  type ClassificationResult,
  type AIResponse,

  // Reference stages
  resolveConversation,
  detectLanguage,
  translateInbound,
  loadHistory,
  saveInboundMessage,
  classifyIntent,
  computeEmbedding,
  loadKnowledge,
  loadMemories,
  generateResponse,
  translateOutbound,
  saveOutboundMessage,
  defaultStages,
} from '@thebutler/pipeline';

Architecture in one diagram

┌──────────────────────────────────────────────────────────────────┐
│                     YOUR APP                                     │
│  ┌──────────────────────┐    ┌───────────────────────────────┐   │
│  │ Your channel handler │    │  Your service implementations │   │
│  │ (webhook, websocket) │    │  EntityProvider, AIProvider,  │   │
│  └──────────┬───────────┘    │  ConversationProvider, …      │   │
│             │                └──────────────┬────────────────┘   │
│             ▼                               │                    │
│  ┌─────────────────────────────────────────────────────────────┐ │
│  │  @thebutler/pipeline                                        │ │
│  │  createPipeline({ intents, prompts, services }).process()   │ │
│  │                                                             │ │
│  │     ┌──────────────────────────────────────────────┐        │ │
│  │     │ Stage 1 → Stage 2 → … → Stage 12             │        │ │
│  │     │  (calls your services via the contracts)     │        │ │
│  │     └──────────────────────────────────────────────┘        │ │
│  └─────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘

Roadmap (not in V1)

  • Streaming responses (AIProvider.completeStream)
  • Tool use / function calling
  • Stage middleware hooks (before, after, onError)
  • Activity-log telemetry provider
  • Memory-extraction reference stage

Links

  • Source: https://github.com/JackTheButler/JackTheButler/tree/main/packages/pipeline
  • Issues: https://github.com/JackTheButler/JackTheButler/issues

License

Elastic License 2.0 — same as the parent project. Source-available: you may use, copy, modify, and distribute it, but you may not offer it to third parties as a hosted/managed service exposing its substantial functionality, and you may not remove licensing notices. See LICENSE.txt for the full terms.