swarm-dispatch
v0.3.2
Standardized task dispatch for multi-agent systems. Poll a backlog, claim work, spawn agents, monitor completion, retry failures, reconcile external state changes.
Runtime-agnostic — bring your own task source and agent runtime.
Install
npm install swarm-dispatch
Quick Start
import { hostname } from "node:os";
import { createTaskDispatcher, createOpenTasksSource } from "swarm-dispatch";

// `opentasksClient` and `myAgentSystem` are placeholders for your own
// opentasks client and agent system.
const dispatcher = createTaskDispatcher(
  // Task source: where to find work
  createOpenTasksSource(opentasksClient),
  // Agent runtime: how to spawn agents
  {
    spawn: async ({ prompt, taskId, role }) => {
      const agent = await myAgentSystem.spawn({ task: prompt, role });
      return { id: agent.id };
    },
    terminate: async (agentId, reason) => {
      await myAgentSystem.terminate(agentId, reason);
    },
    // Returns the unsubscribe function for the lifecycle listener
    onStopped: (callback) => {
      return myAgentSystem.onLifecycleEvent((event) => {
        if (event.type === "stopped") callback(event.agentId, event.reason);
      });
    },
  },
  // Config
  {
    claimantId: `${hostname()}:${process.pid}`, // unique per dispatcher instance
    pollIntervalMs: 15_000,
    defaultRole: "worker",
    concurrency: { global: 5 },
    retry: { maxRetries: 3, baseDelayMs: 10_000, maxDelayMs: 300_000 },
    reconcile: { enabled: true, intervalMs: 60_000 },
  }
);

await dispatcher.start();

// Observe events
dispatcher.onEvent((event) => {
  console.log(event.type, event);
});

// Manual trigger
await dispatcher.dispatchNow();
await dispatcher.reconcileNow();

// Shutdown
await dispatcher.stop();
Architecture
┌──────────────────────────────────────────────────────────┐
│                      swarm-dispatch                      │
│                                                          │
│  ┌──────────────┐  ┌──────────────┐  ┌────────────────┐  │
│  │  Poll Timer  │  │  Reconcile   │  │   Lifecycle    │  │
│  │ (setInterval)│  │    Timer     │  │    Listener    │  │
│  └──────┬───────┘  └──────┬───────┘  └──────┬─────────┘  │
│         │                 │                 │            │
│         ▼                 ▼                 ▼            │
│  ┌──────────────────────────────────────────────────┐    │
│  │                 Dispatch Tracker                 │    │
│  │  active map · retry queue · dimension counters   │    │
│  └──────────────────────────────────────────────────┘    │
│         │                                   │            │
│         ▼                                   ▼            │
│  ┌─────────────┐                    ┌──────────────┐     │
│  │ Eligibility │                    │    Prompt    │     │
│  │   Checker   │                    │   Builder    │     │
│  └─────────────┘                    └──────────────┘     │
└─────────┬─────────────────────────────────┬──────────────┘
          │                                 │
          ▼                                 ▼
 DispatchTaskSource               DispatchAgentRuntime
 (your task system)               (your agent system)
Interfaces
DispatchTaskSource
Where tasks come from. Implement this for your task system.
interface DispatchTaskSource {
  queryReady(opts?): Promise<DispatchTask[]>;
  claim(taskId, claimantId): Promise<ClaimResult>;
  release(taskId, claimantId): Promise<void>;
  transition(taskId, action): Promise<void>;
  getTask(taskId): Promise<DispatchTask>;
  listInProgress?(): Promise<DispatchTask[]>;
}
Built-in adapter: createOpenTasksSource(client) for opentasks.
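For a task system other than opentasks, a custom source might look roughly like the in-memory sketch below. The shapes of `DispatchTask`, `ClaimResult`, and the `transition` action are not spelled out in this README, so the field names used here (`status`, `claimedBy`, `ok`, `reason`, `"complete"`, `"reopen"`) and the class name `InMemoryTaskSource` are assumptions for illustration only; check the package's exported types before relying on them.

```typescript
// Hypothetical shapes: the real swarm-dispatch types may differ.
type TaskStatus = "ready" | "in_progress" | "done";
interface DispatchTask {
  id: string;
  title: string;
  status: TaskStatus;
  claimedBy?: string;
}
type ClaimResult = { ok: true } | { ok: false; reason: string };
type TransitionAction = "complete" | "reopen"; // assumed action names

// Method names follow the README; parameter types are assumed.
interface DispatchTaskSource {
  queryReady(opts?: unknown): Promise<DispatchTask[]>;
  claim(taskId: string, claimantId: string): Promise<ClaimResult>;
  release(taskId: string, claimantId: string): Promise<void>;
  transition(taskId: string, action: TransitionAction): Promise<void>;
  getTask(taskId: string): Promise<DispatchTask>;
  listInProgress?(): Promise<DispatchTask[]>;
}

class InMemoryTaskSource implements DispatchTaskSource {
  private tasks = new Map<string, DispatchTask>();

  constructor(seed: DispatchTask[]) {
    for (const task of seed) this.tasks.set(task.id, { ...task });
  }

  // Return copies so callers cannot mutate internal state.
  async queryReady(): Promise<DispatchTask[]> {
    return this.snapshot((t) => t.status === "ready");
  }

  // Only one claimant can move a task from "ready" to "in_progress".
  async claim(taskId: string, claimantId: string): Promise<ClaimResult> {
    const task = this.tasks.get(taskId);
    if (!task) return { ok: false, reason: "not_found" };
    if (task.status !== "ready") return { ok: false, reason: "not_ready" };
    task.status = "in_progress";
    task.claimedBy = claimantId;
    return { ok: true };
  }

  // Releasing is a no-op unless the caller currently holds the claim.
  async release(taskId: string, claimantId: string): Promise<void> {
    const task = this.tasks.get(taskId);
    if (task && task.claimedBy === claimantId) {
      task.status = "ready";
      delete task.claimedBy;
    }
  }

  async transition(taskId: string, action: TransitionAction): Promise<void> {
    const task = this.requireTask(taskId);
    task.status = action === "complete" ? "done" : "ready";
    delete task.claimedBy;
  }

  async getTask(taskId: string): Promise<DispatchTask> {
    return { ...this.requireTask(taskId) };
  }

  async listInProgress(): Promise<DispatchTask[]> {
    return this.snapshot((t) => t.status === "in_progress");
  }

  private snapshot(keep: (t: DispatchTask) => boolean): DispatchTask[] {
    return Array.from(this.tasks.values()).filter(keep).map((t) => ({ ...t }));
  }

  private requireTask(taskId: string): DispatchTask {
    const task = this.tasks.get(taskId);
    if (!task) throw new Error(`Unknown task: ${taskId}`);
    return task;
  }
}
```

A real adapter would make `claim` atomic in the backing store (for example a conditional update), since several dispatcher instances may race for the same task.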
DispatchAgentRuntime
How agents are spawned. Implement this for your agent system.
interface DispatchAgentRuntime {
  spawn(opts: { prompt, taskId, role }): Promise<{ id: string }>;
  terminate(agentId, reason): Promise<void>;
  onStopped(callback): () => void;
}
Features
| Feature | Description |
|---|---|
| Poll loop | Configurable interval, queries task source for ready work |
| Eligibility | Static filters (tags, priority, age) + heuristic scoring + custom scorer |
| Concurrency | Global limit + per-dimension limits (project, tracker, role, tag) |
| Retry | Exponential backoff, configurable max retries |
| Reconciliation | Detects external state changes (closed, blocked, reassigned), terminates stale agents |
| Prompt builder | Default markdown template, fully replaceable |
| Events | Observable dispatch lifecycle (dispatched, completed, failed, retrying, reconciled, poll) |
| Recovery | Reconstructs tracker state from task source on boot |
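To make the Retry row concrete, here is a minimal sketch of capped exponential backoff driven by the three `retry` settings. The exact formula swarm-dispatch uses is not documented in this README; doubling per attempt with no jitter is an assumption, and `retryDelayMs` is a hypothetical helper, not a package export.

```typescript
// Mirrors the `retry` block of the dispatcher config.
interface RetryConfig {
  maxRetries: number;
  baseDelayMs: number;
  maxDelayMs: number;
}

// `attempt` is 1-based: 1 is the first retry after the initial failure.
// Returns the delay before that retry, or null once retries are exhausted.
// Assumed formula: baseDelayMs * 2^(attempt - 1), capped at maxDelayMs.
function retryDelayMs(attempt: number, cfg: RetryConfig): number | null {
  if (attempt > cfg.maxRetries) return null;
  const uncapped = cfg.baseDelayMs * Math.pow(2, attempt - 1);
  return Math.min(uncapped, cfg.maxDelayMs);
}

const retry: RetryConfig = { maxRetries: 3, baseDelayMs: 10_000, maxDelayMs: 300_000 };
const schedule = [1, 2, 3, 4].map((attempt) => retryDelayMs(attempt, retry));
console.log(schedule); // [ 10000, 20000, 40000, null ]
```

With the Quick Start values the cap is never reached; it only matters when `maxRetries` is raised (under this formula, attempt 6 would otherwise wait 320 seconds).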
Configuration
interface DispatchConfig {
  claimantId: string;               // Unique dispatcher instance ID
  pollIntervalMs: number;           // Poll cadence (default: 15000)
  defaultRole: string;              // Agent role (default: "worker")
  concurrency: {
    global: number;                 // Max concurrent agents (default: 3)
    perDimension?: {                // Per-dimension limits
      project?: Record<string, number>;
      tracker?: Record<string, number>;
      role?: Record<string, number>;
      tag?: Record<string, number>;
    };
  };
  retry: {
    maxRetries: number;             // Max retries per task (default: 3)
    baseDelayMs: number;            // Base backoff delay (default: 10000)
    maxDelayMs: number;             // Max backoff cap (default: 300000)
  };
  eligibility?: {
    tags?: string[];                // Only dispatch tasks with these tags
    excludeTags?: string[];         // Skip tasks with these tags
    minPriority?: number;           // Priority threshold
    minScore?: number;              // Eligibility score threshold (0-1)
    scorer?: (task) => EligibilityScore;      // Custom scorer
  };
  reconcile?: {
    enabled: boolean;               // Enable reconciliation (default: true)
    intervalMs: number;             // Reconcile cadence (default: 60000)
    shouldStop?: (task, dispatch) => boolean; // Custom stop check
  };
  promptBuilder?: PromptBuilder;    // Custom prompt assembly
  tags?: string[];                  // Tag filter for queryReady
}
License
MIT
