npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

neuroline

v0.10.2

Published

Framework-agnostic pipeline orchestration with typed jobs and pluggable storage.

Downloads

144

Readme

Framework-agnostic pipeline orchestration library with typed jobs and pluggable storage.

English | Русский

Neuroline

Demo GitHub

Framework-agnostic pipeline orchestration library with support for:

  • Sequential and parallel job execution
  • Persistent state storage (MongoDB, in-memory, or custom)
  • Type-safe jobs with synapses for data transformation
  • Cacheable jobs (skip re-execution on identical input)
  • Idempotency (re-running with same input data returns existing pipeline)

Changelog

Detailed release notes and migration steps: CHANGELOG.

Installation

yarn add neuroline

For MongoDB storage:

yarn add neuroline mongoose

Quick Start

1. Creating a Job

A Job is a pure function with a defined interface:

import type { JobDefinition, JobContext } from 'neuroline';

interface MyJobInput {
  url: string;
}

interface MyJobOutput {
  data: string;
  fetchedAt: Date;
}

interface MyJobOptions {
  timeout?: number;
}

export const fetchDataJob: JobDefinition<MyJobInput, MyJobOutput, MyJobOptions> = {
  name: 'fetch-data',

  async execute(input, options, ctx) {
    ctx.logger.info('Fetching data', { url: input.url });

    const timeout = options?.timeout ?? 5000;
    const response = await fetch(input.url, { signal: AbortSignal.timeout(timeout) });
    const data = await response.text();

    ctx.logger.info('Data fetched', { length: data.length });

    return {
      data,
      fetchedAt: new Date(),
    };
  },
};

2. Pipeline Configuration

import type { PipelineConfig } from 'neuroline';
import { fetchDataJob, processDataJob, saveResultJob } from './jobs';

interface PipelineInput {
  url: string;
  userId: string;
}

export const myPipelineConfig: PipelineConfig<PipelineInput> = {
  name: 'my-neuroline',

  stages: [
    // Stage 1: single job
    fetchDataJob,

    // Stage 2: two jobs execute in parallel
    [
      {
        job: processDataJob,
        // synapses transform data for the job
        synapses: (ctx) => ({
          rawData: ctx.getArtifact<{ data: string }>('fetch-data')?.data ?? '',
          userId: ctx.pipelineInput.userId,
        }),
      },
      {
        job: notifyJob,
        synapses: (ctx) => ({
          userId: ctx.pipelineInput.userId,
          message: 'Processing started',
        }),
      },
    ],

    // Stage 3: final job
    {
      job: saveResultJob,
      synapses: (ctx) => ({
        processedData: ctx.getArtifact('process-data'),
        userId: ctx.pipelineInput.userId,
      }),
    },
  ],

  // Optional: custom hash function
  computeInputHash: (input) => `${input.userId}-${input.url}`,
};

PipelineConfig<PipelineInput> types both computeInputHash(input) and ctx.pipelineInput in every synapses.

3. Creating and Using PipelineManager

With In-Memory Storage (for testing)

import { PipelineManager, InMemoryPipelineStorage } from 'neuroline';
import { myPipelineConfig } from './pipelines';

const storage = new InMemoryPipelineStorage();
const manager = new PipelineManager({
  storage,
  logger: console, // or your logger
});

// Register pipeline
manager.registerPipeline(myPipelineConfig);

// Start pipeline
const { pipelineId, isNew } = await manager.startPipeline('my-neuroline', {
  data: { url: 'https://api.example.com/data', userId: 'user-123' },
  jobOptions: {
    'fetch-data': { timeout: 10000 },
  },
});

// Poll status
const status = await manager.getStatus(pipelineId);
console.log(status);
// { status: 'processing', currentJobIndex: 1, totalJobs: 4, currentJobName: 'process-data' }

// Get result (artifact) of a specific job (default = last job)
const result = await manager.getResult(pipelineId);
console.log(result);
// { pipelineId: '...', jobName: 'save-result', status: 'done', artifact: {...} }

// Get result (artifact) by job name
const computeResult = await manager.getResult(pipelineId, 'process-data');
console.log(computeResult);
// { pipelineId: '...', jobName: 'process-data', status: 'done', artifact: {...} }

With MongoDB Storage

import mongoose from 'mongoose';
import { PipelineManager } from 'neuroline';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';

// Create model
const PipelineModel = mongoose.model('Pipeline', PipelineSchema);

// Create manager
const storage = new MongoPipelineStorage(PipelineModel);
const manager = new PipelineManager({ storage, logger: console });

manager.registerPipeline(myPipelineConfig);

API Reference

PipelineManager

constructor(options: PipelineManagerOptions)

interface PipelineManagerOptions {
  storage: PipelineStorage; // Required
  logger?: JobLogger; // Optional
}

registerPipeline(config: PipelineConfig): void

Registers a pipeline configuration. Must be called before startPipeline.

startPipeline<TData>(pipelineType: string, input: PipelineInput<TData>): Promise<StartPipelineResponse>

Starts a pipeline or returns existing one (if found by input data hash).

interface PipelineInput<TData> {
  data: TData; // Input data
  jobOptions?: Record<string, unknown>; // Options for jobs (key = job name)
}

interface StartPipelineResponse {
  pipelineId: string; // ID for polling
  isNew: boolean; // true if created, false if already existed
}

getStatus(pipelineId: string): Promise<PipelineStatusResponse>

Returns current pipeline status.

interface PipelineStatusResponse {
  status: 'processing' | 'awaiting_manual' | 'done' | 'error';
  currentJobIndex: number;
  totalJobs: number;
  currentJobName?: string;
  error?: { message: string; jobName?: string };
}

runManualJob(pipelineId: string, jobName: string, options?: StartPipelineOptions): Promise<void>

Triggers a manual job (changes status from awaiting_manual to pending and resumes pipeline execution).

getResult(pipelineId: string, jobName?: string): Promise<PipelineResultResponse>

Returns result (artifact) for a single job. If jobName is not provided, returns the last job result.

interface PipelineResultResponse {
  pipelineId: string;
  jobName: string;
  status: 'pending' | 'awaiting_manual' | 'processing' | 'done' | 'error';
  artifact: unknown | null | undefined; // undefined = not yet executed, null = executed but no result
}

getPipeline(pipelineId: string): Promise<PipelineState | null>

Returns full pipeline state (for debugging).

JobDefinition

interface JobDefinition<TInput, TOutput, TOptions> {
  name: string;
  execute: (
    input: TInput,
    options: TOptions | undefined,
    context: JobContext
  ) => Promise<TOutput | null>;
}

JobContext

interface JobContext {
  pipelineId: string;
  jobIndex: number;
  logger: {
    info: (msg: string, data?: Record<string, unknown>) => void;
    error: (msg: string, data?: Record<string, unknown>) => void;
    warn: (msg: string, data?: Record<string, unknown>) => void;
  };
}

PipelineConfig

interface PipelineConfig<TInput> {
  name: string;
  stages: PipelineStage<TInput>[];
  computeInputHash?: (input: TInput) => string;
}

// Stage: single job or array of jobs (parallel)
type PipelineStage<TPipelineInput> = StageItem<TPipelineInput> | StageItem<TPipelineInput>[];

// StageItem: JobDefinition or JobInPipeline
type StageItem<TPipelineInput> =
  | JobDefinition<any, any, any>
  | JobInPipeline<TPipelineInput, any, any, any>;

interface JobInPipeline<TPipelineInput, TInput, TOutput, TOptions> {
  job: JobDefinition<TInput, TOutput, TOptions>;
  synapses?: (ctx: SynapseContext<TPipelineInput>) => TInput;
  retries?: number; // Количество ретраев при ошибке (по умолчанию: 0)
  retryDelay?: number; // Задержка между ретраями в мс (по умолчанию: 1000)
  manual?: boolean; // Job requires manual trigger (status: awaiting_manual)
}

SynapseContext

Context for synapses function:

interface SynapseContext<TPipelineInput> {
  pipelineInput: TPipelineInput;
  getArtifact: <T>(jobName: string) => T | undefined;
}

Storage

InMemoryPipelineStorage

Built-in in-memory storage for testing and prototyping.

import { InMemoryPipelineStorage } from 'neuroline';

const storage = new InMemoryPipelineStorage();

// For testing
storage.clear(); // Clear all data
storage.getAll(); // Get all pipelines

MongoPipelineStorage

MongoDB storage (requires mongoose as peer dependency).

import mongoose from 'mongoose';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';

const PipelineModel = mongoose.model('Pipeline', PipelineSchema);
const storage = new MongoPipelineStorage(PipelineModel);

Custom Storage

Implement the PipelineStorage interface:

import type { PipelineStorage, PipelineState, JobStatus, PipelineStatus } from 'neuroline';

class MyCustomStorage implements PipelineStorage {
    async findById(pipelineId: string): Promise<PipelineState | null> { ... }
    async findAll(params?: PaginationParams): Promise<PaginatedResult<PipelineState>> { ... }
    async create(state: PipelineState): Promise<PipelineState> { ... }
    async delete(pipelineId: string): Promise<boolean> { ... }
    async updateStatus(pipelineId: string, status: PipelineStatus): Promise<void> { ... }
    async updateJobStatus(pipelineId: string, jobIndex: number, status: JobStatus, startedAt?: Date): Promise<void> { ... }
    async updateJobArtifact(pipelineId: string, jobIndex: number, artifact: unknown, finishedAt: Date): Promise<void> { ... }
    async appendJobError(pipelineId: string, jobIndex: number, error: JobError, isFinal: boolean, finishedAt?: Date): Promise<void> { ... }
    async updateCurrentJobIndex(pipelineId: string, jobIndex: number): Promise<void> { ... }
    async updateJobInput(pipelineId: string, jobIndex: number, input: unknown, options?: unknown): Promise<void> { ... }
    async updateJobRetryCount(pipelineId: string, jobIndex: number, retryCount: number, maxRetries: number): Promise<void> { ... }
    async findAndTimeoutStaleJobs(timeoutMs?: number): Promise<number> { ... }  // For stale jobs watchdog
}

NestJS Integration

import { Module, OnModuleInit } from '@nestjs/common';
import { MongooseModule, InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { PipelineManager } from 'neuroline';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';

@Module({
  imports: [MongooseModule.forFeature([{ name: 'Pipeline', schema: PipelineSchema }])],
})
export class PipelineModule implements OnModuleInit {
  private manager: PipelineManager;

  constructor(
    @InjectModel('Pipeline') private pipelineModel: Model<any>,
    private logger: Logger
  ) {
    const storage = new MongoPipelineStorage(this.pipelineModel);
    this.manager = new PipelineManager({
      storage,
      logger: {
        info: (msg, data) => this.logger.log({ msg, ...data }),
        error: (msg, data) => this.logger.error({ msg, ...data }),
        warn: (msg, data) => this.logger.warn({ msg, ...data }),
      },
    });
  }

  onModuleInit() {
    this.manager.registerPipeline(myPipelineConfig);
  }
}

Stages and Parallel Execution

Pipeline
├── Stage 1: [jobA]                    ← sequential
├── Stage 2: [jobB, jobC, jobD]        ← parallel within stage
├── Stage 3: [jobE]                    ← sequential
└── Stage 4: [jobF, jobG]              ← parallel within stage
  • Stages execute sequentially (Stage 2 starts only after Stage 1 completes)
  • Jobs within a stage execute in parallel
  • If any job in a stage fails (after all retries), the entire pipeline is marked as error

Retry Mechanism

Jobs can be configured to automatically retry on failure:

const config: PipelineConfig = {
    name: 'my-pipeline',
    stages: [
        // Job with 2 retries and 1.5s delay between attempts
        {
            job: unstableJob,
            synapses: (ctx) => ({ ... }),
            retries: 2,        // Will try up to 3 times (1 initial + 2 retries)
            retryDelay: 1500,  // Wait 1.5 seconds between retries
        },
        // Job without retries (default behavior)
        normalJob,
    ],
};
  • retries: Number of additional attempts after initial failure (default: 0)
  • retryDelay: Delay in milliseconds before each retry (default: 1000)
  • retryCount and maxRetries are tracked in job state for monitoring

Manual Jobs

Jobs can be marked as manual to pause pipeline execution until triggered:

const config: PipelineConfig = {
  name: 'my-pipeline',
  stages: [initJob, { job: approvalJob, synapses: toApproval, manual: true }, finalizeJob],
};

When a stage with manual jobs is reached, automatic jobs execute normally. Once only manual jobs remain, the pipeline transitions to awaiting_manual status.

// Trigger a manual job
await manager.runManualJob(pipelineId, 'approval');

// Manual jobs can also be promoted before their stage is reached
// They will execute automatically when the stage starts
await manager.runManualJob(pipelineId, 'approval');
  • manual: true in JobInPipeline — job starts with awaiting_manual status
  • Mixed stages (manual + automatic jobs) are supported
  • Pipeline status becomes awaiting_manual when waiting for manual trigger
  • Stale jobs watchdog ignores awaiting_manual pipelines and jobs

Client API:

// Run manual job and resume polling
await client.runManualJob(pipelineId, 'approval');

// Or with polling in one call
const polling = await client.runManualJobAndPoll(pipelineId, 'approval', onUpdate, onError);

Cacheable Jobs

Jobs can be marked as cacheable to skip re-execution when the same input has been processed before. The cache key is { jobName, inputHash }, where inputHash = SHA-256(jobName + input + options).

const config: PipelineConfig = {
  name: 'my-pipeline',
  stages: [
    // Cacheable job — result is stored in cache after first execution
    { job: expensiveJob, synapses: toExpensive, cacheable: true },
    // Regular job — always executes
    normalJob,
  ],
};
  • cacheable: true in JobInPipeline — enables caching for the job
  • Cache is global by jobName — shared across all pipelines that use the same job (job name = implementation)
  • Cache is independent of pipeline lifecycle — deleting a pipeline does not clear the cache
  • restartPipelineFromJob bypasses the cache for the restarted job and updates it with the new result
  • Storage implementations:
    • InMemoryPipelineStorage — cache stored in a Map
    • MongoPipelineStorage — cache stored in a separate job_caches collection

Cache invalidation: if the execute implementation changes, the cache must be cleared manually (drop the job_caches collection or call storage.clear() for in-memory).

Stale Jobs Watchdog

When a process crashes during job execution, the job may remain in processing status forever ("stale job"). The watchdog monitors and automatically times out such jobs.

const manager = new PipelineManager({ storage, logger });

// Start watchdog (checks every minute, times out jobs after 20 minutes)
manager.startStaleJobsWatchdog({
  checkIntervalMs: 60_000, // Check every 1 minute
  jobTimeoutMs: 20 * 60_000, // Timeout after 20 minutes
  onStaleJobsFound: (count) => console.log(`Timed out ${count} stale jobs`),
});

// Stop watchdog on shutdown
manager.stopStaleJobsWatchdog();

// Check if watchdog is running
manager.isWatchdogRunning();

// Manual check (useful for testing)
const timedOutCount = await manager.timeoutStaleJobs();

StaleJobsWatchdogOptions

interface StaleJobsWatchdogOptions {
  checkIntervalMs?: number; // Default: 60000 (1 minute)
  jobTimeoutMs?: number; // Default: 1200000 (20 minutes)
  onStaleJobsFound?: (count: number) => void;
}

Idempotency

Pipelines are identified by input data hash:

// First call — creates pipeline
const { pipelineId, isNew } = await manager.startPipeline('my-pipeline', {
  data: { url: 'https://example.com' },
});
// isNew = true

// Repeated call with same data — returns existing pipeline
const result2 = await manager.startPipeline('my-pipeline', {
  data: { url: 'https://example.com' },
});
// result2.pipelineId === pipelineId
// result2.isNew = false

For custom hashing:

const config: PipelineConfig<MyInput> = {
    name: 'my-pipeline',
    stages: [...],
    computeInputHash: (input) => `${input.userId}-${input.date}`,
};

Invalidation on Configuration Changes

When the pipeline structure changes (adding/removing/renaming jobs), old records are automatically invalidated:

// Version 1: pipeline with two jobs
const configV1: PipelineConfig = {
  name: 'my-pipeline',
  stages: [jobA, jobB],
};
manager.registerPipeline(configV1);

// Start — creates record in storage
await manager.startPipeline('my-pipeline', { data: { id: 1 } });
// Pipeline saved with configHash = hash(['jobA', 'jobB'])

// Version 2: added jobC
const configV2: PipelineConfig = {
  name: 'my-pipeline',
  stages: [jobA, jobB, jobC],
};
manager.registerPipeline(configV2);

// Start with same data
await manager.startPipeline('my-pipeline', { data: { id: 1 } });
// configHash changed → old record deleted → pipeline starts fresh

This is useful when:

  • Adding new jobs to pipeline
  • Removing obsolete jobs
  • Changing execution order
  • Renaming jobs

Types

All types are available for import:

import type {
  // Job
  JobDefinition,
  JobContext,
  JobLogger,
  JobStatus,
  JobState, // JobState<TInput, TOutput, TOptions> with generics

  // Pipeline
  PipelineConfig,
  PipelineStage,
  PipelineInput,
  PipelineStatus,
  PipelineState,
  JobError,
  JobError,

  // Synapse
  SynapseContext,
  JobInPipeline,
  StageItem,

  // Responses
  StartPipelineResponse,
  PipelineStatusResponse,
  PipelineResultResponse,

  // Storage
  PipelineStorage,
  PaginatedResult,
  PaginationParams,

  // Watchdog
  StaleJobsWatchdogOptions,
} from 'neuroline';

// MongoDB types (separate import)
import type { MongoPipelineDocument, MongoPipelineJobState } from 'neuroline/mongo';

JobState with Generics

JobState now supports generic types for input, output (artifact), and options:

interface JobError {
  message: string;
  stack?: string;
  attempt?: number;
  logs?: string[];
  data?: unknown;
}

interface JobState<TInput = unknown, TOutput = unknown, TOptions = unknown> {
  name: string;
  status: JobStatus;
  input?: TInput; // Input data (computed by synapses)
  options?: TOptions; // Job options (from jobOptions)
  artifact?: TOutput; // Output data (result of execute)
  errors: JobError[]; // Error history (empty array when no errors)
  startedAt?: Date;
  finishedAt?: Date;
}

Client API (neuroline/client)

Client module for browser-side interaction with Pipeline API.

Note: One client = one pipeline. The URL determines which pipeline to run.

PipelineClient

import { PipelineClient } from 'neuroline/client';

// Client for a specific pipeline
const client = new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' });

// Start pipeline
const { pipelineId, isNew } = await client.start({
  input: { userId: 123 },
  jobOptions: { 'fetch-data': { timeout: 5000 } },
});

// Get status
const status = await client.getStatus(pipelineId);

// Get result (artifact) of the last job
const result = await client.getResult(pipelineId);

// Get job details (input, options, artifact)
const jobDetails = await client.getJobDetails(pipelineId, 'fetch-data');

// Trigger a manual job
await client.runManualJob(pipelineId, 'approval');

// Trigger a manual job and poll until pipeline completes
const polling = await client.runManualJobAndPoll(pipelineId, 'approval', onUpdate, onError);

Polling

// Manual polling
const { stop, completed } = client.poll(pipelineId, (event) => {
  console.log('Status:', event.status.status);
});

// Wait for completion
const finalEvent = await completed;

// Or stop polling manually
stop();

Start with Polling

// Client for specific pipeline
const client = new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' });

// Start pipeline and immediately begin polling
const { pipelineId, stop, completed } = await client.startAndPoll(
  {
    input: { url: 'https://example.com' },
  },
  (event) => {
    // Called on each poll
    console.log('Progress:', event.status.currentJobIndex, '/', event.status.totalJobs);
  },
  (error) => {
    // Called on error
    console.error('Pipeline error:', error);
  }
);

React Hook Factory

import { useState, useCallback, useEffect, useRef, useMemo } from 'react';
import { createUsePipelineHook, PipelineClient } from 'neuroline/client';

// Create hook with React dependencies
const usePipeline = createUsePipelineHook({ useState, useCallback, useEffect, useRef });

// In component
function MyComponent() {
  // One client per pipeline
  const client = useMemo(() => new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' }), []);
  const { start, status, isRunning, error } = usePipeline(client);

  const handleStart = async () => {
    await start({ input: { userId: 123 } });
  };

  return (
    <div>
      <button onClick={handleStart} disabled={isRunning}>Start</button>
      {status && <div>Status: {status.status}</div>}
    </div>
  );
}

Exports

| Import path | Contents | | ------------------ | ----------------------------------------------------------------- | | neuroline | Core: types, PipelineManager, InMemoryPipelineStorage | | neuroline/mongo | MongoDB: MongoPipelineStorage, PipelineSchema, document types | | neuroline/client | Client: PipelineClient, createUsePipelineHook, types |

License

UNLICENSED


Neuroline

Demo GitHub

Фреймворк-агностик библиотека для оркестрации пайплайнов с поддержкой:

  • Последовательного и параллельного выполнения jobs
  • Персистентного хранения состояния (MongoDB, in-memory, или кастомное)
  • Типобезопасных jobs с synapses для трансформации данных
  • Кеширования jobs (пропуск повторного выполнения при идентичных входных данных)
  • Идемпотентности (повторный запуск с теми же входными данными возвращает существующий pipeline)

Changelog

Подробные заметки о релизах и миграции: CHANGELOG.

Установка

yarn add neuroline

Для MongoDB хранилища:

yarn add neuroline mongoose

Быстрый старт

1. Создание Job

Job — это чистая функция с определённым интерфейсом:

import type { JobDefinition, JobContext } from 'neuroline';

interface MyJobInput {
  url: string;
}

interface MyJobOutput {
  data: string;
  fetchedAt: Date;
}

interface MyJobOptions {
  timeout?: number;
}

export const fetchDataJob: JobDefinition<MyJobInput, MyJobOutput, MyJobOptions> = {
  name: 'fetch-data',

  async execute(input, options, ctx) {
    ctx.logger.info('Fetching data', { url: input.url });

    const timeout = options?.timeout ?? 5000;
    const response = await fetch(input.url, { signal: AbortSignal.timeout(timeout) });
    const data = await response.text();

    ctx.logger.info('Data fetched', { length: data.length });

    return {
      data,
      fetchedAt: new Date(),
    };
  },
};

2. Конфигурация Pipeline

import type { PipelineConfig } from 'neuroline';
import { fetchDataJob, processDataJob, saveResultJob } from './jobs';

interface PipelineInput {
  url: string;
  userId: string;
}

export const myPipelineConfig: PipelineConfig<PipelineInput> = {
  name: 'my-neuroline',

  stages: [
    // Stage 1: одна job
    fetchDataJob,

    // Stage 2: две jobs выполняются параллельно
    [
      {
        job: processDataJob,
        // synapses трансформирует данные для job
        synapses: (ctx) => ({
          rawData: ctx.getArtifact<{ data: string }>('fetch-data')?.data ?? '',
          userId: ctx.pipelineInput.userId,
        }),
      },
      {
        job: notifyJob,
        synapses: (ctx) => ({
          userId: ctx.pipelineInput.userId,
          message: 'Processing started',
        }),
      },
    ],

    // Stage 3: финальная job
    {
      job: saveResultJob,
      synapses: (ctx) => ({
        processedData: ctx.getArtifact('process-data'),
        userId: ctx.pipelineInput.userId,
      }),
    },
  ],

  // Опционально: кастомная функция хеширования
  computeInputHash: (input) => `${input.userId}-${input.url}`,
};

PipelineConfig<PipelineInput> типизирует и computeInputHash(input), и ctx.pipelineInput во всех synapses.

3. Создание и использование PipelineManager

С In-Memory хранилищем (для тестов)

import { PipelineManager, InMemoryPipelineStorage } from 'neuroline';
import { myPipelineConfig } from './pipelines';

const storage = new InMemoryPipelineStorage();
const manager = new PipelineManager({
  storage,
  logger: console, // или ваш логгер
});

// Регистрация pipeline
manager.registerPipeline(myPipelineConfig);

// Запуск pipeline
const { pipelineId, isNew } = await manager.startPipeline('my-neuroline', {
  data: { url: 'https://api.example.com/data', userId: 'user-123' },
  jobOptions: {
    'fetch-data': { timeout: 10000 },
  },
});

// Polling статуса
const status = await manager.getStatus(pipelineId);
console.log(status);
// { status: 'processing', currentJobIndex: 1, totalJobs: 4, currentJobName: 'process-data' }

// Получение результата (артефакта) конкретной job (по умолчанию — последней)
const result = await manager.getResult(pipelineId);
console.log(result);
// { pipelineId: '...', jobName: 'save-result', status: 'done', artifact: {...} }

// Получение результата (артефакта) по имени job
const computeResult = await manager.getResult(pipelineId, 'process-data');
console.log(computeResult);
// { pipelineId: '...', jobName: 'process-data', status: 'done', artifact: {...} }

С MongoDB хранилищем

import mongoose from 'mongoose';
import { PipelineManager } from 'neuroline';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';

// Создание модели
const PipelineModel = mongoose.model('Pipeline', PipelineSchema);

// Создание manager
const storage = new MongoPipelineStorage(PipelineModel);
const manager = new PipelineManager({ storage, logger: console });

manager.registerPipeline(myPipelineConfig);

API Reference

PipelineManager

constructor(options: PipelineManagerOptions)

interface PipelineManagerOptions {
  storage: PipelineStorage; // Обязательно
  logger?: JobLogger; // Опционально
}

registerPipeline(config: PipelineConfig): void

Регистрирует конфигурацию pipeline. Должен быть вызван до startPipeline.

startPipeline<TData>(pipelineType: string, input: PipelineInput<TData>): Promise<StartPipelineResponse>

Запускает pipeline или возвращает существующий (если найден по хешу входных данных).

interface PipelineInput<TData> {
  data: TData; // Входные данные
  jobOptions?: Record<string, unknown>; // Опции для jobs (ключ = имя job)
}

interface StartPipelineResponse {
  pipelineId: string; // ID для polling
  isNew: boolean; // true если создан, false если уже существовал
}

getStatus(pipelineId: string): Promise<PipelineStatusResponse>

Возвращает текущий статус pipeline.

interface PipelineStatusResponse {
  status: 'processing' | 'awaiting_manual' | 'done' | 'error';
  currentJobIndex: number;
  totalJobs: number;
  currentJobName?: string;
  error?: { message: string; jobName?: string };
}

runManualJob(pipelineId: string, jobName: string, options?: StartPipelineOptions): Promise<void>

Запускает manual job (меняет статус с awaiting_manual на pending и возобновляет выполнение pipeline).

getResult(pipelineId: string, jobName?: string): Promise<PipelineResultResponse>

Возвращает результат (артефакт) одной job. Если jobName не передан, возвращает результат последней job.

interface PipelineResultResponse {
  pipelineId: string;
  jobName: string;
  status: 'pending' | 'awaiting_manual' | 'processing' | 'done' | 'error';
  artifact: unknown | null | undefined; // undefined = ещё не выполнена, null = выполнена без результата
}

getPipeline(pipelineId: string): Promise<PipelineState | null>

Возвращает полное состояние pipeline (для отладки).

JobDefinition

interface JobDefinition<TInput, TOutput, TOptions> {
  name: string;
  execute: (
    input: TInput,
    options: TOptions | undefined,
    context: JobContext
  ) => Promise<TOutput | null>;
}

JobContext

interface JobContext {
  pipelineId: string;
  jobIndex: number;
  logger: {
    info: (msg: string, data?: Record<string, unknown>) => void;
    error: (msg: string, data?: Record<string, unknown>) => void;
    warn: (msg: string, data?: Record<string, unknown>) => void;
  };
}

PipelineConfig

interface PipelineConfig<TInput> {
  name: string;
  stages: PipelineStage<TInput>[];
  computeInputHash?: (input: TInput) => string;
}

// Stage: одна job или массив jobs (параллельно)
type PipelineStage<TPipelineInput> = StageItem<TPipelineInput> | StageItem<TPipelineInput>[];

// StageItem: JobDefinition или JobInPipeline
type StageItem<TPipelineInput> =
  | JobDefinition<any, any, any>
  | JobInPipeline<TPipelineInput, any, any, any>;

interface JobInPipeline<TPipelineInput, TInput, TOutput, TOptions> {
  job: JobDefinition<TInput, TOutput, TOptions>;
  synapses?: (ctx: SynapseContext<TPipelineInput>) => TInput;
  retries?: number; // Количество ретраев при ошибке (по умолчанию: 0)
  retryDelay?: number; // Задержка между ретраями в мс (по умолчанию: 1000)
  manual?: boolean; // Job требует ручного запуска (статус: awaiting_manual)
}

SynapseContext

Контекст для функции synapses:

interface SynapseContext<TPipelineInput> {
  pipelineInput: TPipelineInput;
  getArtifact: <T>(jobName: string) => T | undefined;
}

Хранилище (Storage)

InMemoryPipelineStorage

Встроенное in-memory хранилище для тестов и прототипирования.

import { InMemoryPipelineStorage } from 'neuroline';

const storage = new InMemoryPipelineStorage();

// Для тестов
storage.clear(); // Очистить все данные
storage.getAll(); // Получить все pipelines

MongoPipelineStorage

MongoDB хранилище (требует mongoose как peer dependency).

import mongoose from 'mongoose';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';

const PipelineModel = mongoose.model('Pipeline', PipelineSchema);
const storage = new MongoPipelineStorage(PipelineModel);

Кастомное хранилище

Реализуйте интерфейс PipelineStorage:

import type { PipelineStorage, PipelineState, JobStatus, PipelineStatus, PaginatedResult, PaginationParams } from 'neuroline';

class MyCustomStorage implements PipelineStorage {
    async findById(pipelineId: string): Promise<PipelineState | null> { ... }
    async findAll(params?: PaginationParams): Promise<PaginatedResult<PipelineState>> { ... }
    async create(state: PipelineState): Promise<PipelineState> { ... }
    async delete(pipelineId: string): Promise<boolean> { ... }
    async updateStatus(pipelineId: string, status: PipelineStatus): Promise<void> { ... }
    async updateJobStatus(pipelineId: string, jobIndex: number, status: JobStatus, startedAt?: Date): Promise<void> { ... }
    async updateJobArtifact(pipelineId: string, jobIndex: number, artifact: unknown, finishedAt: Date): Promise<void> { ... }
    async appendJobError(pipelineId: string, jobIndex: number, error: JobError, isFinal: boolean, finishedAt?: Date): Promise<void> { ... }
    async updateCurrentJobIndex(pipelineId: string, jobIndex: number): Promise<void> { ... }
    async updateJobInput(pipelineId: string, jobIndex: number, input: unknown, options?: unknown): Promise<void> { ... }
    async updateJobRetryCount(pipelineId: string, jobIndex: number, retryCount: number, maxRetries: number): Promise<void> { ... }
    async findAndTimeoutStaleJobs(timeoutMs?: number): Promise<number> { ... }  // Для stale jobs watchdog
}

Интеграция с NestJS

import { Module, OnModuleInit } from '@nestjs/common';
import { MongooseModule, InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { PipelineManager } from 'neuroline';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';

@Module({
  imports: [MongooseModule.forFeature([{ name: 'Pipeline', schema: PipelineSchema }])],
})
export class PipelineModule implements OnModuleInit {
  private manager: PipelineManager;

  constructor(
    @InjectModel('Pipeline') private pipelineModel: Model<any>,
    private logger: Logger
  ) {
    const storage = new MongoPipelineStorage(this.pipelineModel);
    this.manager = new PipelineManager({
      storage,
      logger: {
        info: (msg, data) => this.logger.log({ msg, ...data }),
        error: (msg, data) => this.logger.error({ msg, ...data }),
        warn: (msg, data) => this.logger.warn({ msg, ...data }),
      },
    });
  }

  onModuleInit() {
    this.manager.registerPipeline(myPipelineConfig);
  }
}

Stages и параллельное выполнение

Pipeline
├── Stage 1: [jobA]                    ← последовательно
├── Stage 2: [jobB, jobC, jobD]        ← параллельно внутри stage
├── Stage 3: [jobE]                    ← последовательно
└── Stage 4: [jobF, jobG]              ← параллельно внутри stage
  • Stages выполняются последовательно (Stage 2 начнётся только после завершения Stage 1)
  • Jobs внутри stage выполняются параллельно
  • Если любая job в stage завершается с ошибкой (после всех ретраев), весь pipeline помечается как error

Механизм Retry

Jobs можно настроить на автоматический retry при ошибке:

const config: PipelineConfig = {
    name: 'my-pipeline',
    stages: [
        // Job с 2 ретраями и задержкой 1.5с между попытками
        {
            job: unstableJob,
            synapses: (ctx) => ({ ... }),
            retries: 2,        // Максимум 3 попытки (1 начальная + 2 ретрая)
            retryDelay: 1500,  // Ожидание 1.5 секунды между ретраями
        },
        // Job без ретраев (поведение по умолчанию)
        normalJob,
    ],
};
  • retries: Количество дополнительных попыток после первой ошибки (по умолчанию: 0)
  • retryDelay: Задержка в миллисекундах перед каждым ретраем (по умолчанию: 1000)
  • retryCount и maxRetries отслеживаются в состоянии job для мониторинга

Manual Jobs

Jobs можно пометить как manual, чтобы pipeline останавливался и ждал ручного запуска:

const config: PipelineConfig = {
  name: 'my-pipeline',
  stages: [initJob, { job: approvalJob, synapses: toApproval, manual: true }, finalizeJob],
};

Когда stage с manual jobs достигнут, автоматические jobs выполняются как обычно. Когда остаются только manual jobs, pipeline переходит в статус awaiting_manual.

// Запуск manual job
await manager.runManualJob(pipelineId, 'approval');

// Manual job можно промоутить до достижения её stage —
// тогда она выполнится автоматически при старте stage
await manager.runManualJob(pipelineId, 'approval');
  • manual: true в JobInPipeline — job создаётся со статусом awaiting_manual
  • Поддерживаются смешанные stage (manual + автоматические jobs)
  • Статус pipeline становится awaiting_manual при ожидании ручного запуска
  • Stale jobs watchdog игнорирует awaiting_manual pipeline и jobs

Client API:

// Запуск manual job и возобновление polling
await client.runManualJob(pipelineId, 'approval');

// Или с polling в одном вызове
const polling = await client.runManualJobAndPoll(pipelineId, 'approval', onUpdate, onError);

Cacheable Jobs

Jobs можно пометить как cacheable, чтобы пропускать повторное выполнение при тех же входных данных. Ключ кеша — { jobName, inputHash }, где inputHash = SHA-256(jobName + input + options).

const config: PipelineConfig = {
  name: 'my-pipeline',
  stages: [
    // Cacheable job — результат сохраняется в кеш после первого выполнения
    { job: expensiveJob, synapses: toExpensive, cacheable: true },
    // Обычная job — выполняется всегда
    normalJob,
  ],
};
  • cacheable: true в JobInPipeline — включает кеширование для job
  • Кеш глобальный по jobName — разделяется между всеми pipeline, использующими эту job (имя job = реализация)
  • Кеш не зависит от жизненного цикла pipeline — удаление pipeline не очищает кеш
  • restartPipelineFromJob обходит кеш для перезапускаемой job и обновляет его новым результатом
  • Реализации в storage:
    • InMemoryPipelineStorage — кеш в Map
    • MongoPipelineStorage — кеш в отдельной коллекции job_caches

Инвалидация кеша: если реализация execute изменилась, кеш необходимо очистить вручную (удалить коллекцию job_caches или вызвать storage.clear() для in-memory).

Stale Jobs Watchdog

Если процесс падает во время выполнения джобы, она может навсегда остаться в статусе processing ("зависшая джоба"). Watchdog отслеживает и автоматически таймаутит такие джобы.

const manager = new PipelineManager({ storage, logger });

// Запуск watchdog (проверка раз в минуту, таймаут через 20 минут)
manager.startStaleJobsWatchdog({
  checkIntervalMs: 60_000, // Проверка каждую минуту
  jobTimeoutMs: 20 * 60_000, // Таймаут через 20 минут
  onStaleJobsFound: (count) => console.log(`Timed out ${count} stale jobs`),
});

// Остановка watchdog при shutdown
manager.stopStaleJobsWatchdog();

// Проверка работает ли watchdog
manager.isWatchdogRunning();

// Ручная проверка (полезно для тестов)
const timedOutCount = await manager.timeoutStaleJobs();

StaleJobsWatchdogOptions

interface StaleJobsWatchdogOptions {
  checkIntervalMs?: number; // По умолчанию: 60000 (1 минута)
  jobTimeoutMs?: number; // По умолчанию: 1200000 (20 минут)
  onStaleJobsFound?: (count: number) => void;
}

Idempotency (идемпотентность)

Pipeline идентифицируется по хешу входных данных:

// Первый вызов — создаёт pipeline
const { pipelineId, isNew } = await manager.startPipeline('my-pipeline', {
  data: { url: 'https://example.com' },
});
// isNew = true

// Повторный вызов с теми же данными — возвращает существующий
const result2 = await manager.startPipeline('my-pipeline', {
  data: { url: 'https://example.com' },
});
// result2.pipelineId === pipelineId
// result2.isNew = false

Для кастомного хеширования:

const config: PipelineConfig<MyInput> = {
    name: 'my-pipeline',
    stages: [...],
    computeInputHash: (input) => `${input.userId}-${input.date}`,
};

Инвалидация при изменении конфигурации

При изменении структуры pipeline (добавление/удаление/переименование jobs) старые записи автоматически инвалидируются:

// Версия 1: pipeline с двумя jobs
const configV1: PipelineConfig = {
  name: 'my-pipeline',
  stages: [jobA, jobB],
};
manager.registerPipeline(configV1);

// Запускаем — создаётся запись в storage
await manager.startPipeline('my-pipeline', { data: { id: 1 } });
// Pipeline сохраняется с configHash = hash(['jobA', 'jobB'])

// Версия 2: добавили jobC
const configV2: PipelineConfig = {
  name: 'my-pipeline',
  stages: [jobA, jobB, jobC],
};
manager.registerPipeline(configV2);

// Запускаем с теми же данными
await manager.startPipeline('my-pipeline', { data: { id: 1 } });
// configHash изменился → старая запись удаляется → pipeline запускается заново

Это полезно при:

  • Добавлении новых jobs в pipeline
  • Удалении устаревших jobs
  • Изменении порядка выполнения
  • Переименовании jobs

Типы

Все типы доступны для импорта:

import type {
  // Job
  JobDefinition,
  JobContext,
  JobLogger,
  JobStatus,
  JobState, // JobState<TInput, TOutput, TOptions> с generics

  // Pipeline
  PipelineConfig,
  PipelineStage,
  PipelineInput,
  PipelineStatus,
  PipelineState,

  // Synapse
  SynapseContext,
  JobInPipeline,
  StageItem,

  // Responses
  StartPipelineResponse,
  PipelineStatusResponse,
  PipelineResultResponse,

  // Storage
  PipelineStorage,
  PaginatedResult,
  PaginationParams,

  // Watchdog
  StaleJobsWatchdogOptions,
} from 'neuroline';

// MongoDB типы (отдельный импорт)
import type { MongoPipelineDocument, MongoPipelineJobState } from 'neuroline/mongo';

JobState с Generics

JobState теперь поддерживает generic типы для input, output (artifact) и options:

interface JobError {
  message: string;
  stack?: string;
  attempt?: number;
  logs?: string[];
  data?: unknown;
}

interface JobState<TInput = unknown, TOutput = unknown, TOptions = unknown> {
  name: string;
  status: JobStatus;
  input?: TInput; // Входные данные (вычисленные synapses)
  options?: TOptions; // Опции job (из jobOptions)
  artifact?: TOutput; // Выходные данные (результат execute)
  errors: JobError[];
  startedAt?: Date;
  finishedAt?: Date;
}

Клиентский API (neuroline/client)

Клиентский модуль для взаимодействия с Pipeline API из браузера.

Примечание: Один клиент = один pipeline. URL определяет какой pipeline запускается.

PipelineClient

import { PipelineClient } from 'neuroline/client';

// Клиент для конкретного pipeline
const client = new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' });

// Запуск pipeline
const { pipelineId, isNew } = await client.start({
  input: { userId: 123 },
  jobOptions: { 'fetch-data': { timeout: 5000 } },
});

// Получение статуса
const status = await client.getStatus(pipelineId);

// Получение результата (артефакта) последней job
const result = await client.getResult(pipelineId);

// Получение деталей job (input, options, artifact)
const jobDetails = await client.getJobDetails(pipelineId, 'fetch-data');

// Запуск manual job
await client.runManualJob(pipelineId, 'approval');

// Запуск manual job с polling до завершения pipeline
const polling = await client.runManualJobAndPoll(pipelineId, 'approval', onUpdate, onError);

Polling

// Ручной polling
const { stop, completed } = client.poll(pipelineId, (event) => {
  console.log('Статус:', event.status.status);
});

// Ожидание завершения
const finalEvent = await completed;

// Или ручная остановка polling
stop();

Запуск с Polling

// Клиент для конкретного pipeline
const client = new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' });

// Запуск pipeline и немедленное начало polling
const { pipelineId, stop, completed } = await client.startAndPoll(
  {
    input: { url: 'https://example.com' },
  },
  (event) => {
    // Вызывается на каждый poll
    console.log('Прогресс:', event.status.currentJobIndex, '/', event.status.totalJobs);
  },
  (error) => {
    // Вызывается при ошибке
    console.error('Ошибка pipeline:', error);
  }
);

Фабрика React хука

import { useState, useCallback, useEffect, useRef, useMemo } from 'react';
import { createUsePipelineHook, PipelineClient } from 'neuroline/client';

// Создание хука с зависимостями React
const usePipeline = createUsePipelineHook({ useState, useCallback, useEffect, useRef });

// В компоненте
function MyComponent() {
  // Один клиент на pipeline
  const client = useMemo(() => new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' }), []);
  const { start, status, isRunning, error } = usePipeline(client);

  const handleStart = async () => {
    await start({ input: { userId: 123 } });
  };

  return (
    <div>
      <button onClick={handleStart} disabled={isRunning}>Запуск</button>
      {status && <div>Статус: {status.status}</div>}
    </div>
  );
}

Exports

| Import path | Содержимое | | ------------------ | ------------------------------------------------------------------ | | neuroline | Core: типы, PipelineManager, InMemoryPipelineStorage | | neuroline/mongo | MongoDB: MongoPipelineStorage, PipelineSchema, типы документов | | neuroline/client | Client: PipelineClient, createUsePipelineHook, типы |

License

UNLICENSED