a2a-redis
v0.1.0
Redis integrations for the Agent-to-Agent (A2A) JavaScript/TypeScript SDK
This package provides Redis-backed implementations of core A2A components for persistent task storage, reliable event queue management, and push notification configuration using Redis.
This package is the JavaScript/TypeScript implementation, inspired by the Python a2a-redis package.
Features
- RedisTaskStore & RedisJSONTaskStore: Redis-backed task storage using hashes or JSON
- RedisStreamsQueueManager & RedisStreamsEventQueue: Persistent, reliable event queues with consumer groups
- RedisPubSubQueueManager & RedisPubSubEventQueue: Real-time, low-latency event broadcasting
- RedisPushNotificationConfigStore: Task-based push notification configuration storage
- Consumer Group Strategies for Streams: Flexible load balancing and instance isolation patterns
- Next.js Serverless Ready: Built-in helpers for singleton Redis clients and route handlers
Language Support
This package is the JavaScript/TypeScript implementation. For Python, see the a2a-redis Python package.
Why separate implementations?
- Each language has its own ecosystem and best practices
- Python version: Designed for AsyncIO, Starlette, FastAPI
- JavaScript version: Designed for Node.js, Next.js, serverless environments
Identical API patterns make it easy to use both versions interchangeably or migrate between languages.
Key JavaScript/TypeScript Advantages
- Next.js Integration: Built-in helpers for serverless Redis clients and route handlers (nextjs-helpers)
- TypeScript Strict Mode: Full type safety with strict type checking
- ESM Support: Native ES modules, tree-shakeable imports
- Singleton Pattern: Optimized Redis client pooling for serverless functions
- No runtime overhead: Compile-time type checking catches errors early
Parallel API Between Languages
Both implementations share identical component names and patterns:
| Component | Python | TypeScript |
|-----------|--------|-----------|
| Task Storage | RedisTaskStore | RedisTaskStore |
| JSON Storage | RedisJSONTaskStore | RedisJSONTaskStore |
| Streams Queue | RedisStreamsQueueManager | RedisStreamsQueueManager |
| Pub/Sub Queue | RedisPubSubQueueManager | RedisPubSubQueueManager |
| Push Notifications | RedisPushNotificationConfigStore | RedisPushNotificationConfigStore |
| Consumer Strategy | ConsumerGroupStrategy | ConsumerGroupStrategy |
Installation
npm install a2a-redis
# or with pnpm
pnpm add a2a-redis
Both redis and @a2a-js/sdk are peer dependencies and must be installed in your project.
Next.js Users
If you're integrating a2a-redis with Next.js, see the Next.js Integration Guide for:
- Singleton Redis client setup for serverless
- Route handler examples
- Multi-turn conversation patterns
- Production deployment tips
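The singleton setup mentioned above usually comes down to caching one client on `globalThis`, so that hot reloads and warm serverless invocations reuse it instead of opening a new connection. The sketch below shows only that caching pattern. `getSingleton`, the `__singletons` cache key, and the stand-in `FakeClient` are hypothetical names for illustration and are not part of a2a-redis or its Next.js helpers.

```typescript
// Hypothetical helper: caches one instance per key on globalThis.
type SingletonCache = Record<string, unknown>;

function getSingleton<T>(key: string, create: () => T): T {
  const holder = globalThis as unknown as { __singletons?: SingletonCache };
  const cache = (holder.__singletons ??= {});
  if (!(key in cache)) {
    cache[key] = create(); // runs only on the first call for this key
  }
  return cache[key] as T;
}

// Stand-in for a real Redis client so the sketch runs without a server.
class FakeClient {
  static created = 0;
  url: string;
  constructor(url: string) {
    this.url = url;
    FakeClient.created += 1;
  }
}

const first = getSingleton("redis", () => new FakeClient("redis://localhost:6379/0"));
const second = getSingleton("redis", () => new FakeClient("redis://localhost:6379/0"));
console.log(first === second, FakeClient.created);
```

In a real route handler the factory would create the actual Redis client; the second call returns the cached instance and never runs its factory.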
Quick Start
import { RedisTaskStore, RedisStreamsQueueManager, RedisPushNotificationConfigStore } from 'a2a-redis';
import { createRedisClient } from 'a2a-redis/utils';
import { DefaultRequestHandler } from '@a2a-js/sdk/server';
import { A2AExpressApplication } from '@a2a-js/sdk/server/express';
// Create Redis client with connection management
const redisClient = createRedisClient({
url: 'redis://localhost:6379/0',
maxConnections: 50,
});
// Initialize Redis components
const taskStore = new RedisTaskStore(redisClient, { prefix: 'myapp:tasks:' });
const queueManager = new RedisStreamsQueueManager(redisClient, { prefix: 'myapp:queues:' });
const pushConfigStore = new RedisPushNotificationConfigStore(redisClient, { prefix: 'myapp:push:' });
// Use with A2A request handler
const requestHandler = new DefaultRequestHandler({
agentExecutor: yourAgentExecutor,
taskStore,
queueManager,
pushConfigStore,
});
// Create A2A server
const server = new A2AExpressApplication({
agentCard: yourAgentCard,
httpHandler: requestHandler,
});
Connecting to Hosted Redis
For production deployments, you can connect to hosted Redis services such as Redis Cloud, AWS ElastiCache, or Heroku Redis, using either a single URL or separate host, port, and password settings.
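A small helper can choose between the two connection styles based on which environment variables are set. The sketch below is an assumption about how such a helper might look: `redisOptionsFromEnv` and the `RedisOptions` type are hypothetical, and the option names simply mirror the fields used in this README's `createRedisClient` examples. The environment is passed in as a plain object so the function can be exercised without touching `process.env`.

```typescript
// Hypothetical option shape mirroring the fields used in this README's examples.
type RedisOptions =
  | { url: string; maxConnections: number }
  | { host: string; port: number; password?: string | undefined; tls: boolean; maxConnections: number };

type Env = Record<string, string | undefined>;

function redisOptionsFromEnv(env: Env, maxConnections = 50): RedisOptions {
  // A full URL takes priority when present.
  if (env.REDIS_URL) {
    return { url: env.REDIS_URL, maxConnections };
  }
  if (!env.REDIS_HOST) {
    throw new Error("Set REDIS_URL or REDIS_HOST");
  }
  const port = parseInt(env.REDIS_PORT ?? "6379", 10);
  if (isNaN(port)) {
    throw new Error(`REDIS_PORT is not a number: ${env.REDIS_PORT}`);
  }
  // Assumption: host-based configs target hosted services, so TLS is enabled.
  return { host: env.REDIS_HOST, port, password: env.REDIS_PASSWORD, tls: true, maxConnections };
}

const fromUrl = redisOptionsFromEnv({ REDIS_URL: "redis://localhost:6379/0" });
const fromHost = redisOptionsFromEnv({ REDIS_HOST: "example.invalid", REDIS_PORT: "6380" });
console.log(fromUrl, fromHost);
```

In application code the call would be `redisOptionsFromEnv(process.env)`, with the result handed to the client factory.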
Using Redis URL
import { createRedisClient } from 'a2a-redis/utils';
// From environment variable
const redisClient = createRedisClient({
url: process.env.REDIS_URL, // e.g., redis://user:password@host:port/db
maxConnections: 50,
});
Using Host, Port, and Password
import { createRedisClient } from 'a2a-redis/utils';
// For Redis Cloud or similar hosted services
const redisClient = createRedisClient({
host: 'your-redis-host.redis.cloud',
port: 19XXX,
password: process.env.REDIS_PASSWORD,
tls: true, // Required for most hosted services
maxConnections: 50,
});
Environment Variables
# .env file
REDIS_URL=redis://:your-password@your-host:19XXX/0
# OR
REDIS_HOST=your-redis-host.redis.cloud
REDIS_PORT=19XXX
REDIS_PASSWORD=your-password
Queue Components
The package provides both high-level queue managers and direct queue implementations:
Queue Managers
- RedisStreamsQueueManager - Manages Redis Streams-based queues
- RedisPubSubQueueManager - Manages Redis Pub/Sub-based queues
- Both implement the A2A SDK's QueueManager interface
Event Queues
- RedisStreamsEventQueue - Direct Redis Streams queue implementation
- RedisPubSubEventQueue - Direct Redis Pub/Sub queue implementation
- Both implement the EventQueue interface
Queue Types: Streams vs Pub/Sub
RedisStreamsQueueManager
Key Features:
- Persistent storage: Events remain in streams until explicitly trimmed
- Guaranteed delivery: Consumer groups with acknowledgments prevent message loss
- Load balancing: Multiple consumers can share work via consumer groups
- Failure recovery: Unacknowledged messages can be reclaimed by other consumers
- Event replay: Historical events can be re-read from any point in time
- Ordering: Maintains strict insertion order with unique message IDs
Use Cases:
- Task event queues requiring reliability
- Audit trails and event history
- Work distribution systems
- Systems requiring failure recovery
- Multi-consumer load balancing
Trade-offs:
- Higher memory usage (events persist)
- More complex setup (consumer groups)
- Slightly higher latency than pub/sub
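The delivery, acknowledgment, and reclaim behavior listed above can be illustrated with a tiny in-memory model of one stream and one consumer group. This is a teaching sketch only: `MiniStreamGroup` and its methods are hypothetical, it does not talk to Redis, and it is not how RedisStreamsQueueManager is implemented.

```typescript
// In-memory model of one stream with one consumer group (illustration only).
interface Entry { id: number; payload: string }

class MiniStreamGroup {
  private entries: Entry[] = [];
  private nextIndex = 0; // position of the next never-delivered entry
  private pending = new Map<number, string>(); // entry id -> consumer holding it

  add(payload: string): number {
    const id = this.entries.length + 1; // ids grow in insertion order
    this.entries.push({ id, payload });
    return id;
  }

  // Deliver the next new entry to `consumer` and track it as pending.
  read(consumer: string): Entry | undefined {
    const entry = this.entries[this.nextIndex];
    if (!entry) return undefined;
    this.nextIndex += 1;
    this.pending.set(entry.id, consumer);
    return entry;
  }

  // Acknowledge: the entry is processed and no longer pending.
  ack(id: number): boolean {
    return this.pending.delete(id);
  }

  // Reassign every pending entry of a failed consumer to another one.
  claimFrom(failed: string, rescuer: string): number[] {
    const claimed: number[] = [];
    this.pending.forEach((owner, id) => {
      if (owner === failed) claimed.push(id);
    });
    for (const id of claimed) this.pending.set(id, rescuer);
    return claimed;
  }

  pendingCount(): number {
    return this.pending.size;
  }
}

const group = new MiniStreamGroup();
group.add("task started");
group.add("50% complete");
const a = group.read("worker-a"); // worker-a receives entry 1
const b = group.read("worker-b"); // worker-b receives entry 2 (work is shared)
if (b) group.ack(b.id);           // worker-b finishes its entry
// worker-a crashes before acknowledging, so worker-b takes over its entry
const reclaimed = group.claimFrom("worker-a", "worker-b");
console.log(a?.payload, reclaimed, group.pendingCount());
```

The entry stays in the stream throughout; only the pending bookkeeping changes, which is what makes both recovery and replay possible.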
RedisPubSubQueueManager
Key Features:
- Real-time delivery: Events delivered immediately to active subscribers
- No persistence: Events not stored, only delivered to active consumers
- Fire-and-forget: No acknowledgments or delivery guarantees
- Broadcasting: All subscribers receive all events
- Low latency: Minimal overhead for immediate delivery
- Minimal memory usage: No storage of events
Use Cases:
- Live status updates and notifications
- Real-time dashboard updates
- System event broadcasting
- Non-critical event distribution
- Low-latency requirements
- Simple fan-out scenarios
Not suitable for:
- Critical event processing requiring guarantees
- Systems requiring event replay or audit trails
- Offline-capable applications
- Work queues requiring load balancing
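The fire-and-forget broadcast behavior described above can be seen in a minimal in-memory publisher. `MiniPubSub` is a hypothetical stand-in for illustration; it does not use Redis and is not the implementation behind RedisPubSubQueueManager.

```typescript
type Listener = (message: string) => void;

// In-memory broadcast: messages go to current subscribers and are never stored.
class MiniPubSub {
  private listeners = new Set<Listener>();

  // Returns a function that removes the subscription again.
  subscribe(listener: Listener): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Returns how many subscribers received the message.
  publish(message: string): number {
    this.listeners.forEach((listener) => listener(message));
    return this.listeners.size;
  }
}

const bus = new MiniPubSub();
const dashboard: string[] = [];
const logger: string[] = [];

const receivers = bus.publish("task started"); // nobody is subscribed, so the event is lost
bus.subscribe((m) => dashboard.push(m));
const unsubscribeLogger = bus.subscribe((m) => logger.push(m));
bus.publish("50% complete");                   // both subscribers receive it
unsubscribeLogger();
bus.publish("done");                           // only the dashboard is still listening
console.log(receivers, dashboard, logger);
```

A subscriber that connects late or drops out has no way to recover what it missed, which is why this mode is a poor fit for the cases listed above.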
Components
Task Storage
RedisTaskStore
Stores task data in Redis using hashes with JSON serialization. Works with any Redis server.
import { RedisTaskStore } from 'a2a-redis';
const taskStore = new RedisTaskStore(redisClient, { prefix: 'mytasks:' });
// A2A TaskStore interface methods
await taskStore.save('task123', { status: 'pending', data: { key: 'value' } });
const task = await taskStore.load('task123');
await taskStore.delete('task123');
// List all task IDs (utility method)
const taskIds = await taskStore.listTaskIds();
RedisJSONTaskStore
Stores task data using Redis's JSON module for native JSON operations and complex nested data.
import { RedisJSONTaskStore } from 'a2a-redis';
// Requires Redis 8 or RedisJSON module
const jsonTaskStore = new RedisJSONTaskStore(redisClient, { prefix: 'mytasks:' });
// Same interface as RedisTaskStore but with native JSON support
await jsonTaskStore.save('task123', { complex: { nested: { data: 'value' } } });
Task TTL/Expiration
Both RedisTaskStore and RedisJSONTaskStore support automatic expiration of task keys via Redis TTL. This is useful for ephemeral tasks that should not accumulate in Redis indefinitely.
import { RedisTaskStore } from 'a2a-redis';
// Create a task store with 30-minute task expiration
const taskStore = new RedisTaskStore(redisClient, {
prefix: 'tasks:',
ttl: 1800, // seconds (30 minutes)
});
// Each time save() is called, the TTL is reset
// Example: Task expires if not updated for 30 minutes
await taskStore.save('task123', {
status: 'pending',
createdAt: new Date().toISOString(),
});
// Update task status after 10 minutes - TTL is reset, now expires in another 30 minutes
setTimeout(() => {
taskStore.save('task123', {
status: 'completed',
completedAt: new Date().toISOString(),
}).catch(console.error); // surface save errors instead of leaving the promise unhandled
}, 10 * 60 * 1000);
// Without TTL (default), tasks persist until manually deleted
const persistentStore = new RedisTaskStore(redisClient, { prefix: 'permanent:' });
TTL Benefits:
- Prevents unbounded memory growth from accumulated completed tasks
- Automatic cleanup without manual intervention
- Configurable per task store
- TTL refreshes on each task update (moving expiration window)
Default: No TTL (tasks persist forever) - TTL is opt-in
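The moving expiration window can be modeled without Redis by tracking an expiry timestamp per key and resetting it on every save. The sketch below is an illustration under stated assumptions: `SlidingTtlStore` is a hypothetical in-memory class with an injectable clock, whereas the real stores rely on Redis key expiry.

```typescript
// In-memory stand-in for a store with per-key expiry (illustration only).
class SlidingTtlStore<T> {
  private items = new Map<string, { value: T; expiresAt: number }>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlSeconds: number, now: () => number = Date.now) {
    this.ttlMs = ttlSeconds * 1000;
    this.now = now;
  }

  // Every save rewrites the value and restarts the expiry window.
  save(id: string, value: T): void {
    this.items.set(id, { value, expiresAt: this.now() + this.ttlMs });
  }

  load(id: string): T | undefined {
    const item = this.items.get(id);
    if (!item) return undefined;
    if (this.now() >= item.expiresAt) {
      this.items.delete(id); // expired: behave as if the key never existed
      return undefined;
    }
    return item.value;
  }
}

// A fake clock makes the timeline explicit without real waiting.
let clockMs = 0;
const store = new SlidingTtlStore<{ status: string }>(1800, () => clockMs);

store.save("task123", { status: "pending" });
clockMs += 10 * 60 * 1000;                      // 10 minutes after the first save
store.save("task123", { status: "completed" }); // the 30-minute window restarts here
clockMs += 25 * 60 * 1000;                      // 25 minutes since the last save
const stillThere = store.load("task123");
clockMs += 10 * 60 * 1000;                      // 35 minutes since the last save
const gone = store.load("task123");
console.log(stillThere?.status, gone);
```

The task survives 35 minutes past its first save because the update at the 10-minute mark restarted the window; it disappears only once 30 minutes pass without another save.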
Task Store Return Types
Both RedisTaskStore and RedisJSONTaskStore implement async methods that return optional types. Type aliases are exported for convenience:
import type { TaskLoadResult } from 'a2a-redis';
// TaskLoadResult = Task | undefined
const task: TaskLoadResult = await taskStore.load('task-123');
if (task) {
console.log(`Task status: ${task.status}`);
} else {
console.log('Task not found or has expired');
}
Return types:
- load(taskId) returns Promise<TaskLoadResult> (Task | undefined)
- save(taskId, task) returns Promise<void>
- delete(taskId) returns Promise<void>
- listTaskIds() returns Promise<string[]>
Queue Managers
Both queue managers implement the A2A QueueManager interface with full async support:
import { RedisStreamsQueueManager, RedisPubSubQueueManager } from 'a2a-redis';
import { ConsumerGroupStrategy, ConsumerGroupConfig } from 'a2a-redis';
// For reliable, persistent processing
const streamsManager = new RedisStreamsQueueManager(redisClient, {
prefix: 'myapp:streams:',
});
// For real-time, low-latency broadcasting
const pubsubManager = new RedisPubSubQueueManager(redisClient, {
prefix: 'myapp:pubsub:',
});
// With custom consumer group configuration (streams only)
const config = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});
// Named differently from streamsManager above to avoid redeclaring the same const
const sharedStreamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: config });
async function main() {
// Same interface for both managers
const queue = await streamsManager.createOrTap('task123');
// Enqueue events
await queue.enqueueEvent({ type: 'progress', message: 'Task started' });
await queue.enqueueEvent({ type: 'progress', message: '50% complete' });
// Dequeue events
try {
const event = await queue.dequeueEvent({ noWait: true }); // Non-blocking
console.log(`Got event: ${event}`);
await queue.taskDone(); // Acknowledge the message (streams only)
} catch (error) {
console.log('No events available');
}
// Close queue when done
await queue.close();
}
main();
Consumer Group Strategies
The Streams queue manager supports different consumer group strategies:
import { ConsumerGroupStrategy, ConsumerGroupConfig } from 'a2a-redis';
// Multiple instances share work across a single consumer group
const sharedConfig = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});
// Each instance gets its own consumer group
const isolatedConfig = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.INSTANCE_ISOLATED,
});
// Custom consumer group name
const customConfig = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.CUSTOM,
groupName: 'my_group',
});
// Pass whichever configuration fits your deployment
const streamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: customConfig });
RedisPushNotificationConfigStore
Stores push notification configurations per task. Implements the A2A PushNotificationConfigStore interface.
import { RedisPushNotificationConfigStore } from 'a2a-redis';
import { PushNotificationConfig } from '@a2a-js/sdk';
const configStore = new RedisPushNotificationConfigStore(redisClient, {
prefix: 'myapp:push:',
});
// Create push notification config
const config = new PushNotificationConfig({
url: 'https://webhook.example.com/notify',
token: 'secret_token',
id: 'webhook_1',
});
// A2A interface methods
await configStore.setInfo('task123', config);
// Get all configs for a task
const configs = await configStore.getInfo('task123');
for (const config of configs) {
console.log(`Config ${config.id}: ${config.url}`);
}
// Delete specific config or all configs for a task
await configStore.deleteInfo('task123', 'webhook_1'); // Delete specific
await configStore.deleteInfo('task123'); // Delete all
RedisContextStore
Manages conversation contexts for multi-turn interactions. Groups related tasks and provides history retrieval.
import { RedisContextStore } from 'a2a-redis';
// Create a context store with 1-hour expiration
const contextStore = new RedisContextStore(redisClient, {
prefix: 'contexts:',
ttl: 3600, // optional: auto-cleanup after 1 hour
});
// Create a context for a new conversation
await contextStore.createContext('conv-user-123', {
userId: 'user-123',
agent: 'travel-agent',
sessionId: 'session-abc',
});
// Add tasks as they're created in the conversation
await contextStore.addTaskToContext('conv-user-123', 'task-1');
await contextStore.addTaskToContext('conv-user-123', 'task-2');
await contextStore.addTaskToContext('conv-user-123', 'task-3');
// Retrieve context to see all conversation tasks
const context = await contextStore.getContext('conv-user-123');
console.log(`Conversation has ${context?.taskIds.length} tasks`);
console.log(`Started at: ${context?.createdAt}`);
console.log(`User: ${context?.metadata?.userId}`);
// Get just the task IDs
const taskIds = await contextStore.getContextTasks('conv-user-123');
for (const taskId of taskIds) {
const task = await taskStore.load(taskId);
console.log(`Task ${taskId} status: ${task?.status}`);
}
// Clean up when conversation ends
await contextStore.deleteContext('conv-user-123');
Context Features:
- Multi-turn support: Group related tasks from a single conversation
- History retrieval: Get all tasks in a conversation
- Metadata storage: Store user/session info with the context
- TTL-based cleanup: Automatic expiration of old conversations
- Task deduplication: Adding the same task ID to a context more than once does not create duplicate entries
Default: No TTL (contexts persist) - TTL is opt-in
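The grouping and no-duplicates behavior can be pictured with an in-memory version of the same method names. This is a sketch under assumptions, not the library's implementation: `MiniContextStore` is hypothetical, it keeps data in a `Map` rather than Redis, and throwing on an unknown context is a choice made here for clarity.

```typescript
interface ContextRecord {
  metadata: Record<string, string>;
  createdAt: string;
  taskIds: Set<string>; // a Set keeps insertion order and rejects duplicates
}

class MiniContextStore {
  private contexts = new Map<string, ContextRecord>();

  createContext(contextId: string, metadata: Record<string, string> = {}): void {
    this.contexts.set(contextId, {
      metadata,
      createdAt: new Date().toISOString(),
      taskIds: new Set<string>(),
    });
  }

  // Adding the same task twice leaves a single entry.
  addTaskToContext(contextId: string, taskId: string): void {
    const context = this.contexts.get(contextId);
    if (!context) throw new Error(`Unknown context: ${contextId}`);
    context.taskIds.add(taskId);
  }

  // Returns task IDs in the order they were first added.
  getContextTasks(contextId: string): string[] {
    const context = this.contexts.get(contextId);
    return context ? Array.from(context.taskIds) : [];
  }
}

const contexts = new MiniContextStore();
contexts.createContext("conv-user-123", { userId: "user-123" });
contexts.addTaskToContext("conv-user-123", "task-1");
contexts.addTaskToContext("conv-user-123", "task-2");
contexts.addTaskToContext("conv-user-123", "task-1"); // duplicate, ignored
const ids = contexts.getContextTasks("conv-user-123");
console.log(ids);
```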
Context Store Return Types
The RedisContextStore implements async methods with optional return types. Type aliases are exported for convenience:
import type { ContextLookupResult } from 'a2a-redis';
// ContextLookupResult = Context | undefined
const context: ContextLookupResult = await contextStore.getContext('conv-user-123');
if (context) {
console.log(`Context created at: ${context.createdAt}`);
console.log(`Tasks in context: ${context.taskIds.length}`);
} else {
console.log('Context not found or has expired');
}
Return types:
- getContext(contextId) returns Promise<ContextLookupResult> (Context | undefined)
- createContext(contextId, metadata?) returns Promise<void>
- addTaskToContext(contextId, taskId) returns Promise<void>
- getContextTasks(contextId) returns Promise<string[]>
- deleteContext(contextId) returns Promise<void>
Requirements
Required
- Node.js 18+
- TypeScript 5.0+ (if using TypeScript)
- redis (peer dependency) >= 4.0.0 (the official Redis client for Node.js)
- @a2a-js/sdk (peer dependency) >= 0.3.4
Both redis and @a2a-js/sdk must be installed in your project:
npm install redis @a2a-js/sdk a2a-redis
# or with pnpm
pnpm add redis @a2a-js/sdk a2a-redis
Optional
- RedisJSON module for RedisJSONTaskStore (enhanced nested data support)
- Redis Stack or Redis with modules for full feature support
Development
# Install dependencies
npm install
# Run tests
npm test
# Run tests with coverage
npm run test:coverage
# Type checking
npm run type-check
# Linting and formatting
npm run lint
npm run format
# Run examples
npm run example:basic
npm run example:agent
Testing
Tests use Redis database 15 for isolation and include both unit and integration tests:
# Run all tests
npm test
# Run specific test file
npm test -- task-store.test.ts
# Run with coverage
npm run test:coverage
Test Architecture
Tests are designed to run efficiently with limited Redis connections, particularly important for free-tier hosted Redis (e.g., Redis Cloud free tier which allows only 1 concurrent connection):
- Shared Redis Client: All tests use a single Redis client initialized in tests/setup.ts, reducing connection overhead
- Single-threaded Execution: Tests run serially (maxWorkers: 1) to prevent connection conflicts
- Database Isolation: Tests use Redis database 15, separate from development/production databases
- Automatic Cleanup: cleanupTestRedis() flushes the database between tests
Pub/Sub Tests (Skipped)
Pub/Sub integration tests are skipped by default because they require duplicate connections:
- Redis Pub/Sub requires a separate connection for subscriptions (client.duplicate())
- Free-tier Redis (1 connection limit) cannot support both main + pub/sub connections
- Unit tests for Pub/Sub still run with mock clients
- To run pub/sub integration tests, use a Redis instance with higher connection limits
Running Tests with Redis Cloud Free Tier
The test suite is optimized for Redis Cloud free tier:
Environment Setup
# Add to .env
REDIS_URL=redis://user:password@host:port/db
Run Tests
npm test
Expected Output
- ~200+ tests passing (148+ integration tests)
- ~30 pub/sub tests skipped (due to connection limits)
- ~17 failing tests (pre-existing, unrelated to connection limit)
The shared client pattern allows efficient testing even with connection constraints.
License
MIT
