# @microfox/ai-worker
Background worker runtime for ai-router - SQS-based async agent execution.
## Overview
@microfox/ai-worker enables you to run long-running AI agents asynchronously on AWS Lambda, triggered via SQS queues. This allows you to bypass Vercel's timeout limits while maintaining a unified developer experience.
## Features
- Unified DX: Define agent logic in one place (`app/ai/agents/...`), deploy automatically to Lambda
- SQS-based: Reliable message queuing with automatic retries
- Webhook callbacks: Receive completion notifications back to your Next.js app
- Local development: Run handlers immediately in development mode
- Type-safe: Full TypeScript support with Zod schema validation
## Installation
```bash
npm install @microfox/ai-worker
```

## Quick Start
### 1. Create a Background Worker
```ts
// app/ai/agents/video-processing.worker.ts
import { createWorker, type WorkerConfig } from '@microfox/ai-worker';
import { z } from 'zod';

// Export workerConfig separately (best practice - CLI extracts this automatically)
export const workerConfig: WorkerConfig = {
  timeout: 900, // 15 minutes
  memorySize: 2048, // 2GB
  // Optional: Lambda layers
  // layers: ['arn:aws:lambda:${aws:region}:${aws:accountId}:layer:ffmpeg:1'],
};

export const videoProcessingAgent = createWorker({
  id: 'video-processing',
  inputSchema: z.object({ url: z.string() }),
  outputSchema: z.object({ processedUrl: z.string() }),
  handler: async ({ input, ctx }) => {
    // This runs on AWS Lambda
    const result = await heavyVideoProcessing(input.url);
    return { processedUrl: result };
  },
});
```

### 2. Dispatch from an Orchestrator
```ts
// app/ai/orchestrator.ts
import { videoProcessingAgent } from './agents/video-processing.worker';

// Dispatch to background worker
const result = await videoProcessingAgent.dispatch(
  { url: 'https://example.com/video.mp4' },
  {
    webhookUrl: 'https://myapp.com/api/ai/callback', // optional
    mode: 'remote', // optional: "auto" | "local" | "remote"
    jobId: 'unique-job-id', // optional
    metadata: { userId: '123' }, // optional
  }
);
// Returns: { messageId: string, status: 'queued', jobId: string }
```

### 3. Handle Webhook Callbacks
```ts
// app/api/ai/callback/route.ts
import { NextRequest, NextResponse } from 'next/server';

export async function POST(request: NextRequest) {
  const { jobId, workerId, status, output, error } = await request.json();

  if (status === 'success') {
    // Update your database, trigger follow-up agents, etc.
    await updateJobStatus(jobId, 'completed', output);
  } else {
    // Handle error
    await updateJobStatus(jobId, 'failed', error);
  }

  return NextResponse.json({ success: true });
}
```

### 4. Deploy Workers
```bash
# Scan app/ai/**/*.worker.ts and deploy to AWS
npx @microfox/ai-worker-cli@latest push
```

## Configuration
### Environment Variables
Required for Next.js:
- `WORKER_BASE_URL` - Base URL of your workers service (server-side only). We append `/workers/trigger` and `/workers/config` internally when needed (e.g. `https://.../prod`). For client-side, use `useWorkflowJob`, which calls your app's `/api/workflows/*` routes.
- `WORKERS_TRIGGER_API_KEY` - Optional API key for trigger authentication (sent as `x-workers-trigger-key`)
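As a quick sanity check before dispatching remotely, a minimal sketch like the one below can verify the server-side env at startup (the `assertWorkerEnv` helper is illustrative, not part of the package):

```ts
// Illustrative helper (not part of @microfox/ai-worker): fail fast if the
// server-side worker environment is missing before a remote dispatch.
export function assertWorkerEnv(): void {
  // e.g. https://<api-id>.execute-api.<region>.amazonaws.com/prod
  if (!process.env.WORKER_BASE_URL) {
    throw new Error('WORKER_BASE_URL is not set (server-side only)');
  }
  // Optional: when present, it is sent as the x-workers-trigger-key header.
  if (!process.env.WORKERS_TRIGGER_API_KEY) {
    console.warn('WORKERS_TRIGGER_API_KEY not set; the x-workers-trigger-key header will not be sent');
  }
}
```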
Required for Lambda (set via deploy script):
- `AWS_REGION` - AWS region for SQS/Lambda
- `STAGE` - Deployment stage (dev/stage/prod)
- `MONGODB_URI` or `DATABASE_MONGODB_URI` - For the job store (and `internalJobs` / await polling)
- Any secrets your workers need (`OPENAI_KEY`, `DATABASE_URL`, etc.)
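Inside a worker handler these are plain Lambda environment variables; a rough sketch (variable names are the ones listed above, with `OPENAI_KEY` standing in for your own secret):

```ts
// Sketch: reading deploy-time configuration inside a worker handler on Lambda.
const stage = process.env.STAGE ?? 'dev'; // dev | stage | prod
const mongoUri = process.env.MONGODB_URI ?? process.env.DATABASE_MONGODB_URI; // job store
const openaiKey = process.env.OPENAI_KEY; // example of a worker-specific secret

if (!mongoUri) {
  throw new Error('MONGODB_URI or DATABASE_MONGODB_URI is required for the job store');
}
```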
Worker-to-worker (Lambda): When a worker calls another via `ctx.dispatchWorker`, the CLI injects `WORKER_QUEUE_URL_<SANITIZED_ID>` (e.g. `WORKER_QUEUE_URL_COST_USAGE_AI`) into that function's environment. Same-service callees get this automatically; cross-service callees require setting the env var manually.
### Worker Configuration
Best Practice: Export `workerConfig` as a separate const from your worker file:
```ts
import { type WorkerConfig } from '@microfox/ai-worker';

export const workerConfig: WorkerConfig = {
  timeout: 300, // Lambda timeout in seconds (max 900)
  memorySize: 512, // Lambda memory in MB (128-10240)
  layers: ['arn:aws:lambda:${aws:region}:${aws:accountId}:layer:ffmpeg:1'], // Optional Lambda layers
};
```

The CLI will automatically extract this configuration when generating `serverless.yml`. You do not need to pass it to `createWorker()`.
## Architecture
```
┌─────────────┐
│   Next.js   │
│ Orchestrator│
└──────┬──────┘
       │ dispatch()
       ▼
┌─────────────┐
│   AWS SQS   │
│    Queue    │
└──────┬──────┘
       │ trigger
       ▼
┌─────────────┐
│ AWS Lambda  │
│   Worker    │
└──────┬──────┘
       │ POST
       ▼
┌─────────────┐
│   Webhook   │
│  Callback   │
└─────────────┘
```

## API Reference
### `createWorker<INPUT, OUTPUT>(config)`
Creates a background agent with the specified configuration.
Parameters:
- `id: string` - Unique worker ID
- `inputSchema: ZodType<INPUT>` - Input validation schema
- `outputSchema: ZodType<OUTPUT>` - Output validation schema
- `handler: WorkerHandler<INPUT, OUTPUT>` - Handler function
- `workerConfig?: WorkerConfig` - Deprecated: prefer exporting `workerConfig` as a separate const
Returns: `BackgroundAgent<INPUT, OUTPUT>` with a `dispatch()` method
### `dispatch(input, options)`
Dispatches a job to the background worker.
Parameters:
- `input: INPUT` - Input data (validated against `inputSchema`)
- `options: { webhookUrl?: string, mode?: 'auto' | 'local' | 'remote', jobId?: string, metadata?: Record<string, any> }`
Returns: `Promise<{ messageId: string, status: 'queued', jobId: string }>`
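The returned `jobId` is what you would typically persist so the webhook callback (step 3 above) can be correlated back to the original request; in this sketch, `saveJob` is a hypothetical helper for your own storage layer:

```ts
import { videoProcessingAgent } from './agents/video-processing.worker';

const { jobId, messageId, status } = await videoProcessingAgent.dispatch(
  { url: 'https://example.com/video.mp4' },
  { webhookUrl: 'https://myapp.com/api/ai/callback', metadata: { userId: '123' } }
);

// `saveJob` is a hypothetical helper for your own database; the webhook handler
// can later look the job up by the same jobId it receives in the callback payload.
await saveJob({ jobId, messageId, status, userId: '123' });
```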
### Worker-to-worker: `ctx.dispatchWorker(workerId, input, options?)`
Inside a worker handler, call another worker (fire-and-forget or await):
```ts
handler: async ({ ctx }) => {
  await ctx.dispatchWorker('other-worker', {}, { await: true });
};
```

- Fire-and-forget: `ctx.dispatchWorker(id, input)` enqueues and returns `{ jobId, messageId }`; the child job is appended to the parent job's `internalJobs`.
- Await: `ctx.dispatchWorker(id, input, { await: true })` enqueues, appends to `internalJobs`, then polls the job store until the child completes or fails. Returns `{ jobId, messageId, output }` or throws. Optional `pollIntervalMs` and `pollTimeoutMs`.
The CLI detects `ctx.dispatchWorker('id', ...)` and adds `WORKER_QUEUE_URL_<ID>` to that Lambda's env. Local dev uses the HTTP trigger when the queue URL is not set.
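Putting the two modes together, a handler might look roughly like this (the worker ids are illustrative; the option names are the ones listed above):

```ts
handler: async ({ input, ctx }) => {
  // Fire-and-forget: enqueue a child worker and keep going.
  const { jobId } = await ctx.dispatchWorker('thumbnail-generator', { url: input.url });

  // Await: block until the child job completes, fails, or the poll times out.
  const { output } = await ctx.dispatchWorker(
    'cost-usage-ai',
    { jobId },
    { await: true, pollIntervalMs: 2000, pollTimeoutMs: 120_000 }
  );

  return { childJobId: jobId, usage: output };
},
```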
## License
MIT
