@cogitator-ai/worker

v0.2.2

Downloads

403

Distributed job queue for Cogitator agent execution. Built on BullMQ for reliable, scalable background processing.

Installation

pnpm add @cogitator-ai/worker ioredis

Features

  • BullMQ-Based - Reliable job processing with Redis
  • Job Types - Agents, workflows, and swarms
  • Auto-Retry - Exponential backoff for failed jobs
  • Priority Queue - Process important jobs first
  • Delayed Jobs - Schedule jobs for later execution
  • Prometheus Metrics - Built-in metrics for monitoring and Kubernetes HPA autoscaling
  • Redis Cluster - Production-ready scalability
  • Graceful Shutdown - Wait for active jobs before stopping

Quick Start

Producer: Add Jobs

import { JobQueue } from '@cogitator-ai/worker';

const queue = new JobQueue({
  redis: { host: 'localhost', port: 6379 },
});

const agentConfig = {
  name: 'Assistant',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-4',
  provider: 'openai' as const,
  tools: [],
};

const job = await queue.addAgentJob(agentConfig, 'Hello, world!', {
  threadId: 'user-123',
  priority: 1,
});

console.log(`Job added: ${job.id}`);

Consumer: Process Jobs

import { WorkerPool } from '@cogitator-ai/worker';

const pool = new WorkerPool({
  redis: { host: 'localhost', port: 6379 },
  concurrency: 5,
  workerCount: 2,
});

await pool.start();

Job Queue

The JobQueue class manages job creation and status tracking.

Creating a Queue

import { JobQueue } from '@cogitator-ai/worker';

const queue = new JobQueue({
  name: 'my-queue',
  redis: {
    host: 'localhost',
    port: 6379,
    password: 'secret',
  },
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});

Queue Configuration

interface QueueConfig {
  name?: string; // Default: 'cogitator-jobs'
  redis: {
    host?: string; // Default: 'localhost'
    port?: number; // Default: 6379
    password?: string;
    cluster?: {
      nodes: { host: string; port: number }[];
    };
  };
  defaultJobOptions?: {
    attempts?: number; // Default: 3
    backoff?: {
      type: 'exponential' | 'fixed';
      delay: number; // Delay in ms
    };
    removeOnComplete?: boolean | number; // Default: 100
    removeOnFail?: boolean | number; // Default: 500
  };
}
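The backoff option controls how long a failed job waits before its next attempt. Assuming these options are handed to BullMQ unchanged (the README does not state this explicitly), BullMQ's built-in exponential strategy starts at delay and doubles it on each retry. The sketch below reproduces that arithmetic; retryDelays is a hypothetical helper for illustration, not part of the package.

```typescript
// Hypothetical helper: lists the wait before each retry for a given config.
// Assumes BullMQ semantics: `attempts` counts the first run too, and the
// exponential strategy waits 2^(attemptsMade - 1) * delay milliseconds.
type Backoff = { type: 'exponential' | 'fixed'; delay: number };

function retryDelays(attempts: number, backoff: Backoff): number[] {
  const delays: number[] = [];
  // attemptsMade runs from 1 (first failure) to attempts - 1 (last retry).
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(
      backoff.type === 'fixed'
        ? backoff.delay
        : Math.round(2 ** (attemptsMade - 1) * backoff.delay)
    );
  }
  return delays;
}

// With the defaults shown earlier (attempts: 3, exponential, delay: 1000):
console.log(retryDelays(3, { type: 'exponential', delay: 1000 })); // [ 1000, 2000 ]
```

Under these assumptions, three attempts means at most two retries, so a job that keeps failing is given up on roughly three seconds after its first failure, plus processing time.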

Adding Jobs

Agent Jobs:

import type { SerializedAgent } from '@cogitator-ai/worker';

const agentConfig: SerializedAgent = {
  name: 'Researcher',
  instructions: 'Research and summarize topics.',
  model: 'openai/gpt-4',
  provider: 'openai',
  temperature: 0.7,
  maxTokens: 2048,
  tools: [
    {
      name: 'search',
      description: 'Search the web',
      parameters: { type: 'object', properties: { query: { type: 'string' } } },
    },
  ],
};

const job = await queue.addAgentJob(agentConfig, 'Research quantum computing', {
  threadId: 'thread-123',
  priority: 1, // Lower = higher priority
  delay: 5000, // Delay 5 seconds
  metadata: { userId: 'user-456' },
});

Workflow Jobs:

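import type { SerializedWorkflow } from '@cogitator-ai/worker';

// fetchAgent and storeAgent are agent configs defined elsewhere
const workflowConfig: SerializedWorkflow = {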
  id: 'data-pipeline',
  name: 'Data Pipeline',
  nodes: [
    { id: 'fetch', type: 'agent', config: { agentConfig: fetchAgent } },
    { id: 'process', type: 'transform', config: { transform: 'uppercase' } },
    { id: 'store', type: 'agent', config: { agentConfig: storeAgent } },
  ],
  edges: [
    { from: 'fetch', to: 'process' },
    { from: 'process', to: 'store' },
  ],
};

await queue.addWorkflowJob(
  workflowConfig,
  { source: 'api' },
  {
    runId: 'run-789',
    priority: 2,
  }
);

Swarm Jobs:

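import type { SerializedSwarm } from '@cogitator-ai/worker';

// researcherConfig, writerConfig, editorConfig, and coordinatorConfig are
// SerializedAgent configs defined elsewhere
const swarmConfig: SerializedSwarm = {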
  topology: 'collaborative',
  agents: [researcherConfig, writerConfig, editorConfig],
  coordinator: coordinatorConfig,
  maxRounds: 3,
  consensusThreshold: 0.8,
};

await queue.addSwarmJob(swarmConfig, 'Write an article about AI', {
  priority: 1,
  metadata: { project: 'blog' },
});

Queue Methods

const job = await queue.getJob('job-id');

const state = await queue.getJobState('job-id');
// 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' | 'unknown'

const metrics = await queue.getMetrics();

await queue.pause();
await queue.resume();

await queue.clean(60 * 60 * 1000, 1000, 'completed');
await queue.clean(24 * 60 * 60 * 1000, 100, 'failed');

const bullQueue = queue.getQueue();

await queue.close();

Worker Pool

The WorkerPool processes jobs with configurable concurrency.

Creating a Worker Pool

import { WorkerPool } from '@cogitator-ai/worker';

const pool = new WorkerPool(
  {
    redis: { host: 'localhost', port: 6379 },
    workerCount: 2,
    concurrency: 5,
    lockDuration: 30000,
    stalledInterval: 30000,
  },
  {
    onJobStarted: (jobId, type) => {
      console.log(`Job ${jobId} (${type}) started`);
    },
    onJobCompleted: (jobId, result) => {
      console.log(`Job ${jobId} completed:`, result);
    },
    onJobFailed: (jobId, error) => {
      console.error(`Job ${jobId} failed:`, error);
    },
    onWorkerError: (error) => {
      console.error('Worker error:', error);
    },
  }
);

await pool.start();

Worker Configuration

interface WorkerConfig extends QueueConfig {
  workerCount?: number; // Default: 1
  concurrency?: number; // Default: 5
  lockDuration?: number; // Default: 30000ms
  stalledInterval?: number; // Default: 30000ms
}

| Option            | Default | Description                                     |
| ----------------- | ------- | ----------------------------------------------- |
| workerCount       | 1       | Number of worker instances                      |
| concurrency       | 5       | Concurrent jobs per worker                      |
| lockDuration      | 30000   | Lock timeout (ms) before job considered stalled |
| stalledInterval   | 30000   | Interval (ms) to check for stalled jobs         |
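Because concurrency applies per worker, the upper bound on jobs a single pool runs at once is the product of the two settings. A minimal sketch, where maxInFlight is a hypothetical helper and the defaults are the ones listed in the table:

```typescript
// Hypothetical helper: upper bound on simultaneously active jobs in one pool.
interface PoolSizing {
  workerCount?: number; // Default: 1
  concurrency?: number; // Default: 5
}

function maxInFlight({ workerCount = 1, concurrency = 5 }: PoolSizing = {}): number {
  return workerCount * concurrency;
}

console.log(maxInFlight({ workerCount: 2, concurrency: 5 })); // 10
console.log(maxInFlight()); // 5
```

This number is worth keeping in mind when sizing provider rate limits, since each active agent job may be making model calls at the same time.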

Worker Events

interface WorkerPoolEvents {
  onJobStarted?: (jobId: string, type: 'agent' | 'workflow' | 'swarm') => void;
  onJobCompleted?: (jobId: string, result: JobResult) => void;
  onJobFailed?: (jobId: string, error: Error) => void;
  onWorkerError?: (error: Error) => void;
}

Pool Methods

await pool.start();

pool.isPoolRunning();

pool.getWorkerCount();

const metrics = await pool.getMetrics(await queue.getMetrics());

// Graceful shutdown (waits up to 30s for active jobs)
await pool.stop(30000);

// Force shutdown
await pool.forceStop();

Job Processors

Built-in processors handle each job type.

Using Processors Directly

import { processAgentJob, processWorkflowJob, processSwarmJob } from '@cogitator-ai/worker';

const agentResult = await processAgentJob({
  type: 'agent',
  jobId: 'job-1',
  agentConfig: myAgentConfig,
  input: 'Hello!',
  threadId: 'thread-1',
});

const workflowResult = await processWorkflowJob({
  type: 'workflow',
  jobId: 'job-2',
  workflowConfig: myWorkflowConfig,
  input: { data: [] },
  runId: 'run-1',
});

const swarmResult = await processSwarmJob({
  type: 'swarm',
  jobId: 'job-3',
  swarmConfig: mySwarmConfig,
  input: 'Solve this problem',
});

Job Results

Each job type returns a specific result structure.

Agent Job Result

interface AgentJobResult {
  type: 'agent';
  output: string;
  toolCalls: {
    name: string;
    input: unknown;
    output: unknown;
  }[];
  tokenUsage?: {
    prompt: number;
    completion: number;
    total: number;
  };
}

Workflow Job Result

interface WorkflowJobResult {
  type: 'workflow';
  output: Record<string, unknown>;
  nodeResults: Record<string, unknown>;
  duration: number;
}

Swarm Job Result

interface SwarmJobResult {
  type: 'swarm';
  output: string;
  rounds: number;
  agentOutputs: {
    agent: string;
    output: string;
  }[];
}
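Since every result carries a literal type field, a switch on it lets TypeScript narrow to the matching shape. The sketch below copies the three interfaces so it runs on its own; it assumes JobResult is the union of the three, which the README implies but does not spell out, and describeResult is a hypothetical helper.

```typescript
// Result shapes copied from the interfaces above.
interface AgentJobResult {
  type: 'agent';
  output: string;
  toolCalls: { name: string; input: unknown; output: unknown }[];
  tokenUsage?: { prompt: number; completion: number; total: number };
}
interface WorkflowJobResult {
  type: 'workflow';
  output: Record<string, unknown>;
  nodeResults: Record<string, unknown>;
  duration: number;
}
interface SwarmJobResult {
  type: 'swarm';
  output: string;
  rounds: number;
  agentOutputs: { agent: string; output: string }[];
}

// Assumption: JobResult is the union of the three result types.
type JobResult = AgentJobResult | WorkflowJobResult | SwarmJobResult;

// Hypothetical helper: one-line summary, narrowed by the `type` discriminant.
function describeResult(result: JobResult): string {
  switch (result.type) {
    case 'agent':
      return `agent: ${result.toolCalls.length} tool call(s), ${result.tokenUsage?.total ?? 'unknown'} tokens`;
    case 'workflow':
      return `workflow: ${Object.keys(result.nodeResults).length} node result(s), duration ${result.duration}`;
    case 'swarm':
      return `swarm: ${result.rounds} round(s), ${result.agentOutputs.length} agent output(s)`;
  }
}
```

A function like this could be dropped into an onJobCompleted callback to log a compact line per job instead of the full result object.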

Prometheus Metrics

Built-in metrics for monitoring and Kubernetes HPA.

Exposing Metrics

import {
  JobQueue,
  WorkerPool,
  MetricsCollector,
  formatPrometheusMetrics,
} from '@cogitator-ai/worker';
import express from 'express';

const queue = new JobQueue({ redis: { host: 'localhost', port: 6379 } });
const pool = new WorkerPool({ redis: { host: 'localhost', port: 6379 } });
const metrics = new MetricsCollector();

const app = express();

app.get('/metrics', async (req, res) => {
  const queueMetrics = await queue.getMetrics();
  const fullMetrics = await pool.getMetrics(queueMetrics);
  res.type('text/plain').send(metrics.format(fullMetrics));
});

app.listen(9090);

Available Metrics

| Metric                            | Type      | Description                    |
| --------------------------------- | --------- | ------------------------------ |
| cogitator_queue_depth             | gauge     | Total waiting + delayed jobs   |
| cogitator_queue_waiting           | gauge     | Jobs waiting to be processed   |
| cogitator_queue_active            | gauge     | Jobs currently being processed |
| cogitator_queue_completed_total   | counter   | Total completed jobs           |
| cogitator_queue_failed_total      | counter   | Total failed jobs              |
| cogitator_queue_delayed           | gauge     | Scheduled/delayed jobs         |
| cogitator_workers_total           | gauge     | Active workers                 |
| cogitator_job_duration_seconds    | histogram | Job processing time            |
| cogitator_jobs_by_type_total      | counter   | Jobs by type                   |
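For orientation, this is roughly what one of these gauges looks like in the Prometheus text exposition format. The formatter below is a hypothetical stand-in written for illustration; the package's own output may differ in help text, labels, and ordering. It also shows how cogitator_queue_depth is derived as waiting plus delayed.

```typescript
// Hypothetical formatter: renders one gauge in Prometheus text format.
// Label values are not escaped here; values containing quotes, backslashes,
// or newlines would need escaping in real use.
function formatGauge(
  name: string,
  help: string,
  value: number,
  labels: Record<string, string> = {}
): string {
  const pairs = Object.entries(labels)
    .map(([key, val]) => `${key}="${val}"`)
    .join(',');
  const series = pairs ? `${name}{${pairs}}` : name;
  return [`# HELP ${name} ${help}`, `# TYPE ${name} gauge`, `${series} ${value}`].join('\n') + '\n';
}

// Example counts, made up for illustration.
const waiting = 7;
const delayed = 3;

const text = formatGauge(
  'cogitator_queue_depth',
  'Total waiting + delayed jobs',
  waiting + delayed,
  { queue: 'main' }
);
console.log(text);
// # HELP cogitator_queue_depth Total waiting + delayed jobs
// # TYPE cogitator_queue_depth gauge
// cogitator_queue_depth{queue="main"} 10
```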

Duration Histogram

import { DurationHistogram } from '@cogitator-ai/worker';

const histogram = new DurationHistogram('my_duration_seconds', 'Custom duration tracking');

histogram.observe(0.5);
histogram.observe(1.2);
histogram.observe(0.8);

console.log(histogram.format({ queue: 'main' }));

histogram.reset();

Metrics Collector

import { MetricsCollector } from '@cogitator-ai/worker';

const collector = new MetricsCollector();

collector.recordJob('agent', 1500);
collector.recordJob('workflow', 3200);

const output = collector.format(queueMetrics, { queue: 'main' });

Kubernetes HPA Example

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cogitator-workers
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cogitator-workers
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: External
      external:
        metric:
          name: cogitator_queue_depth
        target:
          type: AverageValue
          averageValue: 10
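With an AverageValue target, the autoscaler divides the metric by the current pod count before comparing it to the target, so the desired replica count works out to the queue depth divided by the per-pod target, rounded up and clamped to the min/max bounds. The sketch below shows only that core arithmetic; it deliberately ignores the HPA's tolerance band, stabilization windows, and pod readiness handling, and desiredReplicas is a hypothetical name. Note that External metrics also require a metrics adapter in the cluster that exposes cogitator_queue_depth to the HPA.

```typescript
// Hypothetical helper: simplified HPA arithmetic for an AverageValue target.
function desiredReplicas(
  queueDepth: number,
  targetPerPod: number,
  minReplicas: number,
  maxReplicas: number
): number {
  const raw = Math.ceil(queueDepth / targetPerPod);
  return Math.min(maxReplicas, Math.max(minReplicas, raw));
}

// Using the manifest above: averageValue 10, minReplicas 1, maxReplicas 10.
console.log(desiredReplicas(45, 10, 1, 10)); // 5
console.log(desiredReplicas(0, 10, 1, 10)); // 1 (clamped to minReplicas)
console.log(desiredReplicas(500, 10, 1, 10)); // 10 (clamped to maxReplicas)
```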

Redis Configuration

Single Node

const queue = new JobQueue({
  redis: {
    host: 'localhost',
    port: 6379,
    password: 'secret',
  },
});

Redis Cluster

const queue = new JobQueue({
  redis: {
    cluster: {
      nodes: [
        { host: 'redis-1', port: 6379 },
        { host: 'redis-2', port: 6379 },
        { host: 'redis-3', port: 6379 },
      ],
    },
    password: 'secret',
  },
});
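In deployments it is common to choose between the two shapes at startup rather than hard-coding one. A minimal sketch, assuming hypothetical environment variable names (REDIS_CLUSTER_NODES, REDIS_HOST, REDIS_PORT, REDIS_PASSWORD) that are not defined by the package; the returned object follows the redis field of QueueConfig.

```typescript
// Shape copied from the `redis` field of QueueConfig.
interface RedisConfig {
  host?: string;
  port?: number;
  password?: string;
  cluster?: { nodes: { host: string; port: number }[] };
}

// Hypothetical helper: the environment variable names are assumptions.
// REDIS_CLUSTER_NODES is expected as "host:port,host:port,...".
function redisConfigFromEnv(env: Record<string, string | undefined>): RedisConfig {
  // Only include the password key when one is actually set.
  const auth = env.REDIS_PASSWORD ? { password: env.REDIS_PASSWORD } : {};

  if (env.REDIS_CLUSTER_NODES) {
    const nodes = env.REDIS_CLUSTER_NODES.split(',').map((entry) => {
      const [host, port = '6379'] = entry.trim().split(':');
      return { host, port: Number(port) };
    });
    return { cluster: { nodes }, ...auth };
  }

  return {
    host: env.REDIS_HOST ?? 'localhost',
    port: Number(env.REDIS_PORT ?? '6379'),
    ...auth,
  };
}
```

The result can then be passed as the redis option, for example new JobQueue({ redis: redisConfigFromEnv(process.env) }).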

Serialized Types

Jobs use serialized configurations that can be stored in Redis.

SerializedAgent

interface SerializedAgent {
  name: string;
  instructions: string;
  model: string;
  provider: 'ollama' | 'openai' | 'anthropic';
  temperature?: number;
  maxTokens?: number;
  tools: ToolSchema[];
}

SerializedWorkflow

interface SerializedWorkflow {
  id: string;
  name: string;
  nodes: SerializedWorkflowNode[];
  edges: SerializedWorkflowEdge[];
}

interface SerializedWorkflowNode {
  id: string;
  type: 'agent' | 'transform' | 'condition' | 'parallel';
  config: Record<string, unknown>;
}

interface SerializedWorkflowEdge {
  from: string;
  to: string;
  condition?: string;
}

SerializedSwarm

interface SerializedSwarm {
  topology: 'sequential' | 'hierarchical' | 'collaborative' | 'debate' | 'voting';
  agents: SerializedAgent[];
  coordinator?: SerializedAgent;
  maxRounds?: number;
  consensusThreshold?: number;
}
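Because these configs are plain data, they can be checked before they are queued. One cheap check for workflows is that every edge points at nodes that exist. The sketch below copies the workflow interfaces so it runs standalone; findDanglingEdges is a hypothetical helper, and whether the package performs a similar check itself is not stated in the README.

```typescript
// Shapes copied from the interfaces above.
interface SerializedWorkflowNode {
  id: string;
  type: 'agent' | 'transform' | 'condition' | 'parallel';
  config: Record<string, unknown>;
}
interface SerializedWorkflowEdge {
  from: string;
  to: string;
  condition?: string;
}
interface SerializedWorkflow {
  id: string;
  name: string;
  nodes: SerializedWorkflowNode[];
  edges: SerializedWorkflowEdge[];
}

// Hypothetical helper: returns edges whose `from` or `to` is not a known node id.
function findDanglingEdges(workflow: SerializedWorkflow): SerializedWorkflowEdge[] {
  const ids = new Set(workflow.nodes.map((node) => node.id));
  return workflow.edges.filter((edge) => !ids.has(edge.from) || !ids.has(edge.to));
}

const pipeline: SerializedWorkflow = {
  id: 'data-pipeline',
  name: 'Data Pipeline',
  nodes: [
    { id: 'fetch', type: 'agent', config: {} },
    { id: 'process', type: 'transform', config: { transform: 'uppercase' } },
  ],
  edges: [
    { from: 'fetch', to: 'process' },
    { from: 'process', to: 'store' }, // 'store' is not declared above
  ],
};

console.log(findDanglingEdges(pipeline)); // [ { from: 'process', to: 'store' } ]
```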

Examples

Complete Producer/Consumer

Producer (producer.ts):

import { JobQueue } from '@cogitator-ai/worker';

const queue = new JobQueue({
  redis: { host: 'localhost', port: 6379 },
});

async function main() {
  const agentConfig = {
    name: 'Summarizer',
    instructions: 'Summarize the given text concisely.',
    model: 'openai/gpt-4',
    provider: 'openai' as const,
    tools: [],
  };

  const texts = [
    'Long article about technology...',
    'Research paper on climate change...',
    'News story about economics...',
  ];

  for (const text of texts) {
    const job = await queue.addAgentJob(agentConfig, text, {
      priority: 1,
    });
    console.log(`Queued job: ${job.id}`);
  }

  await queue.close();
}

main().catch((error) => {
  console.error(error);
  process.exit(1);
});

Consumer (consumer.ts):

import { WorkerPool } from '@cogitator-ai/worker';

const pool = new WorkerPool(
  {
    redis: { host: 'localhost', port: 6379 },
    concurrency: 5,
  },
  {
    onJobStarted: (id, type) => console.log(`Starting ${type} job: ${id}`),
    onJobCompleted: (id, result) => console.log(`Completed: ${id}`, result),
    onJobFailed: (id, error) => console.error(`Failed: ${id}`, error),
  }
);

async function main() {
  await pool.start();
  console.log('Worker pool started');

  process.on('SIGTERM', async () => {
    console.log('Shutting down...');
    await pool.stop(30000);
    process.exit(0);
  });
}

main().catch((error) => {
  console.error(error);
  process.exit(1);
});
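The consumer above reacts to SIGTERM only. If SIGINT (Ctrl+C) should shut down the same way, or a signal may arrive twice, it helps to make the shutdown idempotent so stop is called once. A minimal sketch, where Stoppable is a hypothetical interface covering just the stop(timeout) method the pool is shown to have, and createShutdown is a hypothetical helper:

```typescript
// Hypothetical minimal interface: anything with a stop(timeoutMs) method,
// such as the WorkerPool shown above.
interface Stoppable {
  stop(timeoutMs: number): Promise<void>;
}

// Returns a function that starts the shutdown on its first call and hands
// back the same promise on every later call.
function createShutdown(pool: Stoppable, timeoutMs = 30000): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    if (!pending) {
      pending = pool.stop(timeoutMs);
    }
    return pending;
  };
}
```

It could be wired up for both signals, for example with const shutdown = createShutdown(pool) followed by process.once('SIGTERM', () => shutdown().then(() => process.exit(0))) and the same for 'SIGINT'.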

Job Status Monitoring

import { JobQueue } from '@cogitator-ai/worker';

const queue = new JobQueue({
  redis: { host: 'localhost', port: 6379 },
});

async function monitorJob(jobId: string) {
  let lastState = '';

  while (true) {
    const state = await queue.getJobState(jobId);

    if (state !== lastState) {
      console.log(`Job ${jobId}: ${state}`);
      lastState = state;
    }

    if (state === 'completed' || state === 'failed') {
      const job = await queue.getJob(jobId);
      if (job) {
        // returnvalue is a plain property on a BullMQ job, so no await is needed
        console.log('Result:', job.returnvalue);
      }
      break;
    }

    await new Promise((r) => setTimeout(r, 1000));
  }
}
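The loop above polls forever if a job never reaches a terminal state. A bounded variant is sketched below; it takes the state lookup as a function so it stays independent of the queue, and waitForTerminalState is a hypothetical helper. The state names are the ones listed under Queue Methods.

```typescript
type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' | 'unknown';

// Hypothetical helper: polls until the job completes or fails, and rejects
// once another poll would run past the deadline.
async function waitForTerminalState(
  getState: () => Promise<JobState>,
  timeoutMs: number,
  intervalMs = 1000
): Promise<JobState> {
  const deadline = Date.now() + timeoutMs;

  while (true) {
    const state = await getState();
    if (state === 'completed' || state === 'failed') {
      return state;
    }
    if (Date.now() + intervalMs > deadline) {
      throw new Error(`Job still ${state} after ${timeoutMs} ms`);
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

With a queue in scope it could be called as waitForTerminalState(() => queue.getJobState(jobId), 60000), assuming getJobState resolves to one of the states listed above.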

Priority Processing

await queue.addAgentJob(config, 'Low priority', { priority: 10 });
await queue.addAgentJob(config, 'Medium priority', { priority: 5 });
await queue.addAgentJob(config, 'High priority', { priority: 1 });
await queue.addAgentJob(config, 'Critical', { priority: 0 });
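To make the ordering concrete, the sketch below models only the rule stated earlier in the README (lower number means higher priority) with a stable sort. It is a simplified model, not the queue's actual scheduler, and it assumes all four jobs are already waiting when a worker becomes free; processingOrder is a hypothetical helper.

```typescript
interface PendingJob {
  input: string;
  priority: number;
}

// Hypothetical model: lower priority number runs first; ties keep insertion
// order because Array.prototype.sort is stable.
function processingOrder(jobs: PendingJob[]): string[] {
  return [...jobs].sort((a, b) => a.priority - b.priority).map((job) => job.input);
}

const order = processingOrder([
  { input: 'Low priority', priority: 10 },
  { input: 'Medium priority', priority: 5 },
  { input: 'High priority', priority: 1 },
  { input: 'Critical', priority: 0 },
]);

console.log(order);
// [ 'Critical', 'High priority', 'Medium priority', 'Low priority' ]
```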

Delayed Jobs

await queue.addAgentJob(config, 'Run in 5 seconds', { delay: 5000 });
await queue.addAgentJob(config, 'Run in 1 minute', { delay: 60000 });
await queue.addAgentJob(config, 'Run in 1 hour', { delay: 3600000 });
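The delay option is a duration in milliseconds, so scheduling for a specific wall-clock time means converting the target time into a delay first. A minimal sketch, with delayUntil as a hypothetical helper:

```typescript
// Hypothetical helper: milliseconds from `now` until `target`, never negative,
// so a target in the past simply means "run as soon as possible".
function delayUntil(target: Date, now: Date = new Date()): number {
  return Math.max(0, target.getTime() - now.getTime());
}

const now = new Date('2030-01-01T00:00:00Z');
console.log(delayUntil(new Date('2030-01-01T00:01:00Z'), now)); // 60000
console.log(delayUntil(new Date('2029-12-31T23:59:00Z'), now)); // 0
```

The result can be passed straight through, for example { delay: delayUntil(runAt) }, where runAt is whatever Date the caller has in mind.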

Type Reference

import type {
  // Serialized configs
  SerializedAgent,
  SerializedWorkflow,
  SerializedWorkflowNode,
  SerializedWorkflowEdge,
  SerializedSwarm,

  // Job payloads
  JobPayload,
  AgentJobPayload,
  WorkflowJobPayload,
  SwarmJobPayload,

  // Job results
  JobResult,
  AgentJobResult,
  WorkflowJobResult,
  SwarmJobResult,

  // Configuration
  QueueConfig,
  WorkerConfig,
  QueueMetrics,
} from '@cogitator-ai/worker';

License

MIT