@cloudamqp/a2a-amqp

v0.1.1

AMQP EventBus adapter for A2A protocol with LavinMQ streams support

a2a-amqp

AMQP-backed EventBus and WorkQueue for scaling A2A agents with long-running tasks.

Why?

A2A agents often need to handle long-running tasks (LLM calls, complex processing, etc.). Running these tasks inline in HTTP handlers causes:

  • Timeout issues: HTTP connections time out on long tasks
  • Scaling problems: Single server bottlenecks
  • Resource waste: Servers blocked waiting for tasks to complete

This library solves these problems by:

  1. Queuing tasks via AMQP instead of processing inline
  2. Distributing work across multiple worker processes
  3. Event sourcing all task events for replay and recovery
  4. Streaming results back via SSE while workers process in the background

Architecture

HTTP Request → Server (enqueues task) → Returns immediately
                    ↓
               AMQP Queue
                    ↓
          Worker Pool (scales horizontally)
                    ↓
          Process task & publish events
                    ↓
          AMQP Stream (event sourcing)
                    ↓
          Client streams results via SSE
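
The flow in the diagram can be modelled without a broker. The following is a minimal in-memory sketch, not this library's implementation: `InMemoryPipeline`, `enqueue`, `workOnce` and `eventsFor` are hypothetical names, plain arrays stand in for the AMQP queue and stream, and the initial "submitted" event is an assumption made for illustration.

```typescript
// In-memory stand-ins for the AMQP queue and stream. All names are hypothetical.
type TaskState = "submitted" | "working" | "completed";

interface TaskEvent {
  taskId: string;
  state: TaskState;
}

class InMemoryPipeline {
  private queue: string[] = [];     // stands in for the AMQP work queue
  private stream: TaskEvent[] = []; // stands in for the append-only AMQP stream
  private nextId = 1;

  // Server side: record the task, enqueue it, and return at once.
  enqueue(): string {
    const taskId = `task-${this.nextId++}`;
    this.stream.push({ taskId, state: "submitted" });
    this.queue.push(taskId);
    return taskId;
  }

  // Worker side: take the oldest task, run the work, publish events.
  // Returns false when the queue is empty.
  workOnce(work: (taskId: string) => void): boolean {
    const taskId = this.queue.shift();
    if (taskId === undefined) return false;
    this.stream.push({ taskId, state: "working" });
    work(taskId);
    this.stream.push({ taskId, state: "completed" });
    return true;
  }

  // Client side: the events an SSE endpoint could replay for one task.
  eventsFor(taskId: string): TaskEvent[] {
    return this.stream.filter((event) => event.taskId === taskId);
  }
}
```

Calling `enqueue()` returns a task id before any work has run. The "working" and "completed" events only appear in `eventsFor(id)` after some worker has called `workOnce`, which is the decoupling the architecture relies on.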

Installation

# Install with bun
bun add @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client

# Or with npm
npm install @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client

Requires: LavinMQ or RabbitMQ with stream support. For a local broker, LavinMQ can be started with Docker:

docker run -d -p 5672:5672 -p 15672:15672 cloudamqp/lavinmq:latest

Quick Start

1. HTTP Server (enqueues tasks)

import { AMQPAgentBackend, QueuingRequestHandler } from "@cloudamqp/a2a-amqp";
import { A2AExpressApp } from "@a2a-js/sdk/server/express";
import express from "express";

// Create AMQP backend
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Create request handler (handles task queuing + event projection).
// agentCard is your agent's A2A AgentCard object, defined elsewhere.
const requestHandler = new QueuingRequestHandler(agentCard, backend);
await requestHandler.initialize();

// Setup Express with A2A routes
const app = express();
new A2AExpressApp(requestHandler).setupRoutes(app, "/");
app.listen(3000);

2. Worker Process (processes tasks)

import { AMQPAgentBackend, WorkerEventBus } from "@cloudamqp/a2a-amqp";
import { AgentExecutor, ExecutionEventBus, RequestContext } from "@a2a-js/sdk/server";

// Create backend with same agent name as server
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Initialize work queue
await backend.workQueue.initialize();

class MyExecutor implements AgentExecutor {
  async execute(context: RequestContext, eventBus: ExecutionEventBus) {
    // Your long-running task logic here
    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "working", timestamp: new Date().toISOString() },
      final: false,
    });

    // ... do work ...

    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "completed", timestamp: new Date().toISOString() },
      final: true,
    });
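    eventBus.finished();
  }

  // AgentExecutor in @a2a-js/sdk also declares cancelTask; this stub does nothing.
  async cancelTask(taskId: string, eventBus: ExecutionEventBus) {}
}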

const executor = new MyExecutor();

// Start consuming with async generator pattern
const messages = backend.workQueue.start();

for await (const taskMessage of messages) {
  const { taskId, contextId, requestContext } = taskMessage;

  // Create request context
  const context = new RequestContext(
    requestContext.userMessage,
    taskId,
    contextId,
    requestContext.task,
    requestContext.referenceTasks
  );

  // Create event bus for publishing task events
  const eventBus = new WorkerEventBus(backend.amqpConnection, taskId, contextId);

  // Execute task
  await executor.execute(context, eventBus);
}

3. Scale horizontally

Run multiple workers to process tasks in parallel:

# Terminal 1: HTTP Server
bun run server

# Terminal 2-N: Workers (scale as needed)
bun run worker
bun run worker  # Add more workers for higher throughput
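
To illustrate why adding workers raises throughput, here is a small sketch of round-robin dispatch, the usual way an AMQP broker spreads a queue across its consumers. It is a simplified model: `distribute` is a hypothetical helper, and real dispatch also depends on prefetch settings and on how quickly each worker acknowledges messages.

```typescript
// Hypothetical model of round-robin dispatch: each task goes to exactly one worker.
function distribute(taskIds: string[], workerCount: number): Map<number, string[]> {
  if (!Number.isInteger(workerCount) || workerCount < 1) {
    throw new RangeError("workerCount must be a positive integer");
  }
  const assignments = new Map<number, string[]>();
  for (let worker = 0; worker < workerCount; worker++) {
    assignments.set(worker, []);
  }
  taskIds.forEach((taskId, index) => {
    // Worker index cycles 0, 1, ..., workerCount - 1, 0, 1, ...
    assignments.get(index % workerCount)!.push(taskId);
  });
  return assignments;
}
```

With five tasks and two workers, worker 0 receives the first, third and fifth task and worker 1 the rest. No task is handled twice, so each additional worker shortens every other worker's backlog.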

Features

  • Work Queue: Distribute tasks across multiple worker processes
  • Event Sourcing: All task events stored in AMQP streams for replay
  • In-Memory Projection: Fast task lookups with automatic recovery from streams
  • SSE Streaming: Automatic streaming of task events back to clients
  • Horizontal Scaling: Add more workers to increase throughput
  • Graceful Shutdown: Clean consumer and connection handling
  • Type-Safe: Full TypeScript support with Zod validation
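
The event sourcing and in-memory projection features fit together as follows: replaying the stored events in order and keeping the latest one per task reconstructs the current state. The sketch below shows that idea only. `projectTasks` is a hypothetical function, the event shape is copied from the status updates in the worker example, and the library's real projection may track more than this.

```typescript
// Event shape mirrors the status updates published in the worker example.
interface StatusUpdate {
  kind: "status-update";
  taskId: string;
  contextId: string;
  status: { state: string; timestamp: string };
  final: boolean;
}

// Fold a replayed event stream into "latest status per task".
function projectTasks(events: StatusUpdate[]): Map<string, StatusUpdate> {
  const latest = new Map<string, StatusUpdate>();
  for (const event of events) {
    latest.set(event.taskId, event); // later events overwrite earlier ones
  }
  return latest;
}
```

Because the projection is derived entirely from the stream, a restarted server could rebuild it by replaying from the start of the stream rather than relying on state held in memory before the restart.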

Configuration

interface AMQPAgentBackendConfig {
  url: string;                    // AMQP broker URL
  agentName: string;              // Agent identifier
  streamRetention?: string;       // Event retention (default: "7d")
  streamMaxBytes?: number;        // Max stream size (default: 1GB)
  workQueueName?: string;         // Custom work queue name
  exchangeName?: string;          // Custom exchange name
  logger?: Logger;                // Custom logger
  connection?: {
    heartbeat?: number;           // Heartbeat interval in seconds
    reconnectDelay?: number;      // Reconnection delay in ms
    maxReconnectAttempts?: number;// Max reconnection attempts
  };
  publishing?: {
    persistent?: boolean;         // Persistent messages (default: true)
    confirmMode?: boolean;        // Publisher confirms (default: true)
    messageTtl?: number;          // Message TTL in ms (0 = no expiration)
  };
}
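
As an illustration, a configuration that sets a few of the optional fields might look like the sketch below. `ExampleBackendConfig` is a trimmed local copy of the interface above so the snippet type-checks on its own, and every value is an assumption rather than a recommendation. In particular, "1d" assumes the retention string uses the same format as the documented "7d" default.

```typescript
// Trimmed local copy of the documented interface (logger and a few fields omitted).
interface ExampleBackendConfig {
  url: string;
  agentName: string;
  streamRetention?: string;
  workQueueName?: string;
  connection?: {
    heartbeat?: number;
    reconnectDelay?: number;
    maxReconnectAttempts?: number;
  };
  publishing?: {
    persistent?: boolean;
    confirmMode?: boolean;
    messageTtl?: number;
  };
}

// Illustrative values only.
const exampleConfig: ExampleBackendConfig = {
  url: "amqp://localhost:5672",
  agentName: "my-agent",
  streamRetention: "1d", // assumed to follow the same format as the "7d" default
  connection: {
    heartbeat: 30,            // seconds
    reconnectDelay: 2000,     // milliseconds
    maxReconnectAttempts: 10,
  },
  publishing: {
    persistent: true,
    confirmMode: true,
    messageTtl: 0, // 0 = no expiration, per the interface comment
  },
};
```

In a real project an object like this would presumably be passed to `AMQPAgentBackend.create`, in the same way the Quick Start passes `url` and `agentName`.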

Examples

See complete working examples:

  • src/examples/http-server.ts - HTTP server with queuing
  • src/examples/worker.ts - Worker process

# Run the example
bun run server  # Terminal 1
bun run worker  # Terminal 2

# Send a request
curl -X POST http://localhost:3000/ \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"message/send","params":{"message":{"kind":"message","role":"user","messageId":"1","contextId":"ctx-1","parts":[{"kind":"text","text":"Hello"}]}}}'

Testing

bun run test             # Run all tests (unit + integration)
bun run test:unit        # Unit tests only
bun run test:integration # Integration tests only
bun run test:watch       # Watch mode
bun run test:coverage    # With coverage

License

MIT