intentkit-queue
v1.0.1
Published
BullMQ job queue adapter for IntentKit
Maintainers
Readme
intentkit-queue
BullMQ job queue adapter for IntentKit — background job management via the provider system.
This adapter lets AI agents (like Claude via Dispatch) add jobs to a queue, check job status, list jobs by state, and view queue statistics through MCP. It handles queue management only — workers that process jobs are user-defined, keeping your processing logic separate from the agent interface.
Install
npm install intentkit-queueRequires a running Redis instance (BullMQ uses Redis as its backing store).
Quick Start
import { defineFunction, IntentRegistry, createContext, serve, z } from 'intentkit';
import { createQueueProvider, type QueueClient } from 'intentkit-queue';
// Register your functions
const registry = new IntentRegistry().register(enqueueJob, getJob, listJobs);
// Create context (no database needed for queue-only projects)
const context = await createContext({ events: true });
// Boot MCP server with queue provider
await serve({
name: 'my-queue-agent',
registry,
context,
providers: [
createQueueProvider({
queueName: 'tasks',
redis: { host: '127.0.0.1', port: 6379 },
}),
],
});Configuration
| Option | Default | Description |
|--------|---------|-------------|
| queueName | — | Default queue name (required) |
| name | 'queue' | Provider name in ctx.providers |
| redis.host | '127.0.0.1' | Redis hostname |
| redis.port | 6379 | Redis port |
| redis.password | — | Redis password |
| redis.db | — | Redis database number |
| redisUrl | — | Redis URL (alternative to redis options) |
| defaultJobOptions.attempts | 3 | Default retry attempts |
| defaultJobOptions.backoff | — | Backoff strategy ({ type: 'exponential' \| 'fixed', delay: ms }) |
| defaultJobOptions.removeOnComplete | — | Auto-remove completed jobs (true or keep last N) |
| defaultJobOptions.removeOnFail | — | Auto-remove failed jobs (true or keep last N) |
Common Provider Configs
Local Redis:
createQueueProvider({
queueName: 'my-jobs',
redis: { host: '127.0.0.1', port: 6379 },
})Redis with password:
createQueueProvider({
queueName: 'my-jobs',
redis: { host: 'redis.example.com', port: 6379, password: process.env.REDIS_PASSWORD! },
})Redis URL (Upstash, Redis Cloud, Railway, etc.):
createQueueProvider({
queueName: 'my-jobs',
redisUrl: process.env.REDIS_URL!,
})With default job options:
createQueueProvider({
queueName: 'my-jobs',
redis: { host: '127.0.0.1' },
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: 100, // Keep last 100 completed jobs
removeOnFail: 50, // Keep last 50 failed jobs
},
})Using in Functions
Access the queue client via ctx.providers.queue:
import { defineFunction, z } from 'intentkit';
import type { QueueClient } from 'intentkit-queue';
export const scheduleReport = defineFunction({
name: 'schedule_report',
intent: 'Schedule a report to be generated in the background',
permissions: ['report:create'],
requires: ['queue'], // Validates provider exists at startup
input: z.object({
report_type: z.string(),
parameters: z.record(z.unknown()),
}),
output: z.object({
job_id: z.string(),
state: z.string(),
}),
execute: async (input, ctx) => {
const queue = ctx.providers.queue as QueueClient;
const job = await queue.addJob('generate-report', {
type: input.report_type,
params: input.parameters,
});
return { job_id: job.id, state: job.state };
},
});Example Functions
The package includes 5 ready-to-use functions in functions/jobs.ts:
| Function | Intent | Permission |
|----------|--------|------------|
| enqueue_job | Add a job to the background queue | queue:write |
| get_job | Get the status and data of a specific job | queue:read |
| list_jobs | List jobs filtered by state | queue:read |
| remove_job | Remove a job from the queue | queue:write |
| get_queue_stats | Get queue statistics by state | queue:read |
Import and register them:
import { enqueueJob, getJob, listJobs, removeJob, getQueueStats } from 'intentkit-queue/functions';
const registry = new IntentRegistry()
.register(enqueueJob, getJob, listJobs, removeJob, getQueueStats);Job Lifecycle
Jobs in BullMQ follow this lifecycle:
enqueue_job (agent adds job)
↓
waiting → active → completed
↘ failed (retries based on attempts/backoff config)
↘ waiting (retry) → active → ...- waiting — Job is in the queue, waiting for a worker to pick it up
- active — A worker is currently processing the job
- completed — Worker finished successfully (returnValue available)
- failed — Worker threw an error (failedReason available)
- delayed — Job was added with a
delayoption and is not yet available
This adapter manages the queue side. To process jobs, create a BullMQ Worker in your own code:
import { Worker } from 'bullmq';
const worker = new Worker('tasks', async (job) => {
// Your processing logic here
console.log(`Processing ${job.name}:`, job.data);
return { result: 'done' };
}, {
connection: { host: '127.0.0.1', port: 6379 },
});QueueClient API
The full client interface for custom function implementations:
interface QueueClient {
// Jobs
addJob(name: string, data: Record<string, unknown>, options?: AddJobOptions): Promise<JobData>;
getJob(jobId: string): Promise<JobData | null>;
listJobs(state?: 'waiting' | 'active' | 'completed' | 'failed' | 'delayed', start?: number, end?: number): Promise<JobData[]>;
removeJob(jobId: string): Promise<boolean>;
// Stats
getQueueStats(): Promise<QueueStats>;
// Connection
ping(): Promise<boolean>;
disconnect(): Promise<void>;
}Claude Desktop Config
Add to ~/Library/Application Support/Claude/claude_desktop_config.json:
{
"mcpServers": {
"queue": {
"command": "node",
"args": ["path/to/your/serve.js"],
"env": {
"REDIS_URL": "redis://localhost:6379"
}
}
}
}Architecture
Claude (Dispatch / Desktop)
↓ MCP tool call
IntentKit (serve + permissions + hooks)
↓ ctx.providers.queue
intentkit-queue (QueueClientImpl)
↓
BullMQ
↓
Redis
↑
Worker (user-defined, separate process)The Redis connection is created at server startup and verified with a ping. healthCheck() pings Redis again. The connection is closed on shutdown via IntentKit's lifecycle manager.
License
MIT
