@cloudamqp/a2a-amqp
v0.1.1
AMQP EventBus adapter for A2A protocol with LavinMQ streams support
a2a-amqp
AMQP-backed EventBus and WorkQueue for scaling A2A agents with long-running tasks.
Why?
A2A agents often need to handle long-running tasks (LLM calls, complex processing, etc.). Running these tasks inline in HTTP handlers causes:
- Timeout issues: HTTP connections time out on long tasks
- Scaling problems: A single server becomes the bottleneck
- Resource waste: Servers are blocked while waiting for tasks to complete
This library solves these problems by:
- Queuing tasks via AMQP instead of processing inline
- Distributing work across multiple worker processes
- Event sourcing all task events for replay and recovery
- Streaming results back via SSE while workers process in the background
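The four points above can be illustrated with a small in-memory simulation. This is only a sketch of the pattern, not the library's implementation: plain arrays stand in for the AMQP queue and stream, and every name in it (`submitTask`, `workOnce`, `toSseFrame`, `TaskEvent`) is hypothetical and not part of `@cloudamqp/a2a-amqp`.

```typescript
// In-memory stand-ins for the AMQP queue and stream. All names here are
// hypothetical and exist only for this illustration.
type TaskState = "submitted" | "working" | "completed";

interface TaskEvent {
  taskId: string;
  state: TaskState;
  worker?: string;
}

const workQueue: string[] = [];   // stands in for the AMQP work queue
const eventLog: TaskEvent[] = []; // stands in for the append-only stream

// "HTTP handler": enqueue the task, record an event, and return immediately.
function submitTask(taskId: string): { taskId: string; state: TaskState } {
  workQueue.push(taskId);
  eventLog.push({ taskId, state: "submitted" });
  return { taskId, state: "submitted" };
}

// "Worker": take at most one task off the queue and publish its events.
function workOnce(worker: string): boolean {
  const taskId = workQueue.shift();
  if (taskId === undefined) return false; // queue is empty
  eventLog.push({ taskId, state: "working", worker });
  // ... the long-running work would happen here ...
  eventLog.push({ taskId, state: "completed", worker });
  return true;
}

// Format one event as a Server-Sent Events frame ("data: <json>\n\n").
function toSseFrame(event: TaskEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Three tasks are submitted, then two workers alternate pulling from the queue.
["t1", "t2", "t3"].forEach((id) => submitTask(id));
let turn = 0;
while (workQueue.length > 0) {
  workOnce(turn % 2 === 0 ? "worker-a" : "worker-b");
  turn++;
}

// A client subscribed to task "t2" would receive these frames in order.
const framesForT2 = eventLog.filter((e) => e.taskId === "t2").map(toSseFrame);
console.log(framesForT2.join(""));
```

In the simulation the submitting function never waits for the work to finish, and the event log keeps every state change, so a client that connects late could still be sent the full history for its task.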
Architecture
HTTP Request → Server (enqueues task) → Returns immediately
               ↓
               AMQP Queue
               ↓
               Worker Pool (scales horizontally)
               ↓
               Process task & publish events
               ↓
               AMQP Stream (event sourcing)
               ↓
               Client streams results via SSE

Installation
# Installing using bun
bun add @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client
# Or with npm
npm install @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client

Requires: LavinMQ or RabbitMQ with stream support
docker run -d -p 5672:5672 -p 15672:15672 cloudamqp/lavinmq:latest

Quick Start
1. HTTP Server (enqueues tasks)
import { AMQPAgentBackend, QueuingRequestHandler } from "@cloudamqp/a2a-amqp";
import { A2AExpressApp } from "@a2a-js/sdk/server/express";
import express from "express";

// Create AMQP backend
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Create request handler (handles task queuing + event projection)
// agentCard is your agent's AgentCard object (its definition is not shown here)
const requestHandler = new QueuingRequestHandler(agentCard, backend);
await requestHandler.initialize();

// Setup Express with A2A routes
const app = express();
new A2AExpressApp(requestHandler).setupRoutes(app, "/");
app.listen(3000);

2. Worker Process (processes tasks)
import { AMQPAgentBackend, WorkerEventBus } from "@cloudamqp/a2a-amqp";
import { AgentExecutor, ExecutionEventBus, RequestContext } from "@a2a-js/sdk/server";

// Create backend with same agent name as server
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Initialize work queue
await backend.workQueue.initialize();

class MyExecutor implements AgentExecutor {
  async execute(context: RequestContext, eventBus: ExecutionEventBus) {
    // Your long-running task logic here
    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "working", timestamp: new Date().toISOString() },
      final: false,
    });

    // ... do work ...

    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "completed", timestamp: new Date().toISOString() },
      final: true,
    });
    eventBus.finished();
  }
}

const executor = new MyExecutor();

// Start consuming with async generator pattern
const messages = backend.workQueue.start();
for await (const taskMessage of messages) {
  const { taskId, contextId, requestContext } = taskMessage;

  // Create request context
  const context = new RequestContext(
    requestContext.userMessage,
    taskId,
    contextId,
    requestContext.task,
    requestContext.referenceTasks
  );

  // Create event bus for publishing task events
  const eventBus = new WorkerEventBus(backend.amqpConnection, taskId, contextId);

  // Execute task
  await executor.execute(context, eventBus);
}

3. Scale horizontally
Run multiple workers to process tasks in parallel:
# Terminal 1: HTTP Server
bun run server
# Terminal 2-N: Workers (scale as needed)
bun run worker
bun run worker # Add more workers for higher throughput

Features
- Work Queue: Distribute tasks across multiple worker processes
- Event Sourcing: All task events stored in AMQP streams for replay
- In-Memory Projection: Fast task lookups with automatic recovery from streams
- SSE Streaming: Automatic streaming of task events back to clients
- Horizontal Scaling: Add more workers to increase throughput
- Graceful Shutdown: Clean consumer and connection handling
- Type-Safe: Full TypeScript support with Zod validation
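To make the event sourcing and in-memory projection features more concrete, here is a minimal sketch of how a task lookup table could be rebuilt by replaying stored events. It assumes a simplified event shape loosely modelled on the status-update events in the Quick Start. The names `StatusEvent`, `TaskSnapshot`, `applyEvent`, and `rebuildProjection` are hypothetical and are not taken from the library.

```typescript
// Hypothetical event and snapshot shapes for illustration only.
interface StatusEvent {
  taskId: string;
  state: string;
  timestamp: string;
}

interface TaskSnapshot {
  taskId: string;
  state: string;
  updatedAt: string;
  eventCount: number;
}

// Fold one event into the projection; later events overwrite earlier state.
function applyEvent(projection: Map<string, TaskSnapshot>, event: StatusEvent): void {
  const previous = projection.get(event.taskId);
  projection.set(event.taskId, {
    taskId: event.taskId,
    state: event.state,
    updatedAt: event.timestamp,
    eventCount: (previous?.eventCount ?? 0) + 1,
  });
}

// Rebuild the whole projection by replaying the stream from the beginning,
// which is what recovery after a restart amounts to in this sketch.
function rebuildProjection(stream: StatusEvent[]): Map<string, TaskSnapshot> {
  const projection = new Map<string, TaskSnapshot>();
  for (const event of stream) applyEvent(projection, event);
  return projection;
}

// Events as they might have been stored before a server restart.
const storedEvents: StatusEvent[] = [
  { taskId: "task-1", state: "working", timestamp: "2025-01-01T00:00:00.000Z" },
  { taskId: "task-2", state: "working", timestamp: "2025-01-01T00:00:01.000Z" },
  { taskId: "task-1", state: "completed", timestamp: "2025-01-01T00:00:05.000Z" },
];

const recovered = rebuildProjection(storedEvents);
console.log(recovered.get("task-1")?.state); // completed
console.log(recovered.get("task-2")?.state); // working
```

Because the projection is derived entirely from the stored events, it can be discarded and rebuilt at any time, which is the property that lets lookups stay in memory without becoming the source of truth.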
Configuration
interface AMQPAgentBackendConfig {
  url: string;                     // AMQP broker URL
  agentName: string;               // Agent identifier
  streamRetention?: string;        // Event retention (default: "7d")
  streamMaxBytes?: number;         // Max stream size (default: 1GB)
  workQueueName?: string;          // Custom work queue name
  exchangeName?: string;           // Custom exchange name
  logger?: Logger;                 // Custom logger
  connection?: {
    heartbeat?: number;            // Heartbeat interval in seconds
    reconnectDelay?: number;       // Reconnection delay in ms
    maxReconnectAttempts?: number; // Max reconnection attempts
  };
  publishing?: {
    persistent?: boolean;          // Persistent messages (default: true)
    confirmMode?: boolean;         // Publisher confirms (default: true)
    messageTtl?: number;           // Message TTL in ms (0 = no expiration)
  };
}

Examples
See complete working examples:
- src/examples/http-server.ts - HTTP server with queuing
- src/examples/worker.ts - Worker process
# Run the example
bun run server # Terminal 1
bun run worker # Terminal 2
# Send a request
curl -X POST http://localhost:3000/ \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"message/send","params":{"message":{"kind":"message","role":"user","messageId":"1","contextId":"ctx-1","parts":[{"kind":"text","text":"Hello"}]}}}'

Testing
bun run test # Run all tests (unit + integration)
bun run test:unit # Unit tests only
bun run test:integration # Integration tests only
bun run test:watch # Watch mode
bun run test:coverage # With coverage

License
MIT
