a2a-redis

v0.1.0

Redis integrations for the Agent-to-Agent (A2A) JavaScript/TypeScript SDK

This package provides Redis-backed implementations of core A2A components for persistent task storage, reliable event queue management, and push notification configuration using Redis.

This package is the JavaScript/TypeScript implementation, inspired by the Python a2a-redis package.

Features

  • RedisTaskStore & RedisJSONTaskStore: Redis-backed task storage using hashes or JSON
  • RedisStreamsQueueManager & RedisStreamsEventQueue: Persistent, reliable event queues with consumer groups
  • RedisPubSubQueueManager & RedisPubSubEventQueue: Real-time, low-latency event broadcasting
  • RedisPushNotificationConfigStore: Task-based push notification configuration storage
  • Consumer Group Strategies for Streams: Flexible load balancing and instance isolation patterns
  • Next.js Serverless Ready: Built-in helpers for singleton Redis clients and route handlers

Language Support

This package is the JavaScript/TypeScript implementation. For Python, see the a2a-redis Python package.

Why separate implementations?

  • Each language has its own ecosystem and best practices
  • Python version: Designed for AsyncIO, Starlette, FastAPI
  • JavaScript version: Designed for Node.js, Next.js, serverless environments

Identical API patterns make it easy to use both versions interchangeably or migrate between languages.

Key JavaScript/TypeScript Advantages

  • Next.js Integration: Built-in helpers for serverless Redis clients and route handlers (nextjs-helpers)
  • TypeScript Strict Mode: Full type safety with strict type checking
  • ESM Support: Native ES modules, tree-shakeable imports
  • Singleton Pattern: Optimized Redis client pooling for serverless functions
  • No runtime overhead: Compile-time type checking catches errors early

Parallel API Between Languages

Both implementations share identical component names and patterns:

| Component | Python | TypeScript |
|-----------|--------|------------|
| Task Storage | RedisTaskStore | RedisTaskStore |
| JSON Storage | RedisJSONTaskStore | RedisJSONTaskStore |
| Streams Queue | RedisStreamsQueueManager | RedisStreamsQueueManager |
| Pub/Sub Queue | RedisPubSubQueueManager | RedisPubSubQueueManager |
| Push Notifications | RedisPushNotificationConfigStore | RedisPushNotificationConfigStore |
| Consumer Strategy | ConsumerGroupStrategy | ConsumerGroupStrategy |

Installation

npm install a2a-redis
# or with pnpm
pnpm add a2a-redis

Both redis and @a2a-js/sdk are peer dependencies and must be installed in your project.

Next.js Users

If you're integrating a2a-redis with Next.js, see the Next.js Integration Guide for:

  • Singleton Redis client setup for serverless
  • Route handler examples
  • Multi-turn conversation patterns
  • Production deployment tips

Quick Start

import { RedisTaskStore, RedisStreamsQueueManager, RedisPushNotificationConfigStore } from 'a2a-redis';
import { createRedisClient } from 'a2a-redis/utils';
import { DefaultRequestHandler } from '@a2a-js/sdk/server';
import { A2AExpressApplication } from '@a2a-js/sdk/server/express';

// Create Redis client with connection management
const redisClient = createRedisClient({
  url: 'redis://localhost:6379/0',
  maxConnections: 50,
});

// Initialize Redis components
const taskStore = new RedisTaskStore(redisClient, { prefix: 'myapp:tasks:' });
const queueManager = new RedisStreamsQueueManager(redisClient, { prefix: 'myapp:queues:' });
const pushConfigStore = new RedisPushNotificationConfigStore(redisClient, { prefix: 'myapp:push:' });

// Use with A2A request handler
const requestHandler = new DefaultRequestHandler({
  agentExecutor: yourAgentExecutor,
  taskStore,
  queueManager,
  pushConfigStore,
});

// Create A2A server
const server = new A2AExpressApplication({
  agentCard: yourAgentCard,
  httpHandler: requestHandler,
});

Connecting to Hosted Redis

For production deployments, you can connect to hosted Redis services like Redis Cloud, AWS ElastiCache, or Heroku Redis:

Using Redis URL

import { createRedisClient } from 'a2a-redis/utils';

// From environment variable
const redisClient = createRedisClient({
  url: process.env.REDIS_URL, // e.g., redis://user:password@host:port/db
  maxConnections: 50,
});

Using Host, Port, and Password

import { createRedisClient } from 'a2a-redis/utils';

// For Redis Cloud or similar hosted services
const redisClient = createRedisClient({
  host: 'your-redis-host.redis.cloud',
  port: Number(process.env.REDIS_PORT), // e.g., 19XXX
  password: process.env.REDIS_PASSWORD,
  tls: true, // Required for most hosted services
  maxConnections: 50,
});

Environment Variables

# .env file
REDIS_URL=redis://:your-password@your-host:19XXX/0
# OR
REDIS_HOST=your-redis-host.redis.cloud
REDIS_PORT=19XXX
REDIS_PASSWORD=your-password
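
The two connection styles above can be chosen from the environment at startup. The following is a minimal sketch, not part of a2a-redis: `redisOptionsFromEnv` and the `RedisConnectionOptions` shape are hypothetical names that mirror the option fields used in the examples above, and the `tls: true` default is an assumption that suits hosted services.

```typescript
// Hypothetical helper, not part of a2a-redis: builds a plain options object
// from environment variables named as in the .env example above.
type Env = Record<string, string | undefined>;

interface RedisConnectionOptions {
  url?: string;
  host?: string;
  port?: number;
  password?: string;
  tls?: boolean;
  maxConnections: number;
}

function redisOptionsFromEnv(env: Env, maxConnections = 50): RedisConnectionOptions {
  // Prefer a single connection URL when one is provided.
  if (env.REDIS_URL) {
    return { url: env.REDIS_URL, maxConnections };
  }
  if (!env.REDIS_HOST) {
    throw new Error('Set REDIS_URL, or REDIS_HOST and REDIS_PORT');
  }
  // Reject missing or placeholder ports before any connection is attempted.
  const port = Number(env.REDIS_PORT);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`REDIS_PORT is not a valid port: ${env.REDIS_PORT}`);
  }
  return {
    host: env.REDIS_HOST,
    port,
    password: env.REDIS_PASSWORD,
    tls: true, // assumption: a hosted service that requires TLS
    maxConnections,
  };
}
```

In an application, the result would be passed along as `createRedisClient(redisOptionsFromEnv(process.env))`. Failing fast on a bad port keeps a leftover placeholder such as `19XXX` from turning into a confusing connection error later.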

Queue Components

The package provides both high-level queue managers and direct queue implementations:

Queue Managers

  • RedisStreamsQueueManager - Manages Redis Streams-based queues
  • RedisPubSubQueueManager - Manages Redis Pub/Sub-based queues
  • Both implement the A2A SDK's QueueManager interface

Event Queues

  • RedisStreamsEventQueue - Direct Redis Streams queue implementation
  • RedisPubSubEventQueue - Direct Redis Pub/Sub queue implementation
  • Both implement the EventQueue interface

Queue Types: Streams vs Pub/Sub

RedisStreamsQueueManager

Key Features:

  • Persistent storage: Events remain in streams until explicitly trimmed
  • Guaranteed delivery: Consumer groups with acknowledgments prevent message loss
  • Load balancing: Multiple consumers can share work via consumer groups
  • Failure recovery: Unacknowledged messages can be reclaimed by other consumers
  • Event replay: Historical events can be re-read from any point in time
  • Ordering: Maintains strict insertion order with unique message IDs

Use Cases:

  • Task event queues requiring reliability
  • Audit trails and event history
  • Work distribution systems
  • Systems requiring failure recovery
  • Multi-consumer load balancing

Trade-offs:

  • Higher memory usage (events persist)
  • More complex setup (consumer groups)
  • Slightly higher latency than pub/sub

RedisPubSubQueueManager

Key Features:

  • Real-time delivery: Events delivered immediately to active subscribers
  • No persistence: Events not stored, only delivered to active consumers
  • Fire-and-forget: No acknowledgments or delivery guarantees
  • Broadcasting: All subscribers receive all events
  • Low latency: Minimal overhead for immediate delivery
  • Minimal memory usage: No storage of events

Use Cases:

  • Live status updates and notifications
  • Real-time dashboard updates
  • System event broadcasting
  • Non-critical event distribution
  • Low-latency requirements
  • Simple fan-out scenarios

Not suitable for:

  • Critical event processing requiring guarantees
  • Systems requiring event replay or audit trails
  • Offline-capable applications
  • Work queues requiring load balancing
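
The difference between the two queue types can be seen in a toy model. The sketch below is purely illustrative: `ToyPubSub` and `ToyStream` are hypothetical in-memory classes that mimic the delivery semantics described above, and they say nothing about how a2a-redis or Redis implement them.

```typescript
// Toy in-memory models of the two delivery styles. They illustrate semantics
// only and do not reflect how a2a-redis or Redis implement them.
type Handler = (event: string) => void;

class ToyPubSub {
  private handlers: Handler[] = [];

  subscribe(handler: Handler): void {
    this.handlers.push(handler);
  }

  publish(event: string): number {
    // Delivered only to subscribers that exist right now; nothing is stored.
    this.handlers.forEach((handler) => handler(event));
    return this.handlers.length;
  }
}

class ToyStream {
  private entries: string[] = [];
  private nextIndex = 0; // next entry to hand to the consumer group
  private pending = new Set<number>(); // delivered but not yet acknowledged

  append(event: string): number {
    return this.entries.push(event) - 1; // the index doubles as the entry ID
  }

  // Hand the next undelivered entry to some consumer in the group.
  read(): { id: number; event: string } | undefined {
    if (this.nextIndex >= this.entries.length) return undefined;
    const id = this.nextIndex++;
    this.pending.add(id);
    return { id, event: this.entries[id] };
  }

  ack(id: number): void {
    this.pending.delete(id);
  }

  // Entries that were delivered but never acknowledged, e.g. after a crash.
  unacked(): number[] {
    return Array.from(this.pending);
  }
}

const bus = new ToyPubSub();
bus.publish('started'); // no subscribers yet, so this event is lost
const seen: string[] = [];
bus.subscribe((event) => seen.push(event));
bus.publish('finished'); // only this event reaches the late subscriber

const stream = new ToyStream();
stream.append('started'); // stored even though nobody is reading yet
stream.append('finished');
const first = stream.read();
const second = stream.read();
if (first) stream.ack(first.id); // 'finished' stays unacknowledged
```

The late subscriber misses the first pub/sub event, while the stream keeps both entries and still knows which one was delivered but never acknowledged. That pending set is what makes failure recovery possible.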

Components

Task Storage

RedisTaskStore

Stores task data in Redis using hashes with JSON serialization. Works with any Redis server.

import { RedisTaskStore } from 'a2a-redis';

const taskStore = new RedisTaskStore(redisClient, { prefix: 'mytasks:' });

// A2A TaskStore interface methods
await taskStore.save('task123', { status: 'pending', data: { key: 'value' } });
const task = await taskStore.load('task123');
await taskStore.delete('task123');

// List all task IDs (utility method)
const taskIds = await taskStore.listTaskIds();

RedisJSONTaskStore

Stores task data using Redis's JSON module for native JSON operations and complex nested data.

import { RedisJSONTaskStore } from 'a2a-redis';

// Requires Redis 8 or RedisJSON module
const jsonTaskStore = new RedisJSONTaskStore(redisClient, { prefix: 'mytasks:' });

// Same interface as RedisTaskStore but with native JSON support
await jsonTaskStore.save('task123', { complex: { nested: { data: 'value' } } });

Task TTL/Expiration

Both RedisTaskStore and RedisJSONTaskStore support automatic expiration of task keys via Redis TTL. This is useful for ephemeral tasks that should not accumulate in Redis indefinitely.

import { RedisTaskStore } from 'a2a-redis';

// Create a task store with 30-minute task expiration
const taskStore = new RedisTaskStore(redisClient, {
  prefix: 'tasks:',
  ttl: 1800, // seconds (30 minutes)
});

// Each time save() is called, the TTL is reset
// Example: Task expires if not updated for 30 minutes
await taskStore.save({
  id: 'task123',
  status: 'pending',
  createdAt: new Date().toISOString(),
});

// Update task status after 10 minutes - TTL is reset, now expires in another 30 minutes
setTimeout(() => {
  taskStore.save({
    id: 'task123',
    status: 'completed',
    completedAt: new Date().toISOString(),
  });
}, 10 * 60 * 1000);

// Without TTL (default), tasks persist until manually deleted
const persistentStore = new RedisTaskStore(redisClient, { prefix: 'permanent:' });

TTL Benefits:

  • Prevents unbounded memory growth from accumulated completed tasks
  • Automatic cleanup without manual intervention
  • Configurable per task store
  • TTL refreshes on each task update (moving expiration window)

Default: No TTL (tasks persist forever) - TTL is opt-in
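
The moving expiration window can be modelled without Redis. This is a minimal sketch under stated assumptions: `SlidingTtlMap` is a hypothetical in-memory class with an injected clock, written only to show how each save pushes the deadline out; it is not the store's implementation.

```typescript
// In-memory model of a sliding expiration window. The clock is injected so the
// behaviour can be checked without waiting; this is not a2a-redis code.
class SlidingTtlMap<V> {
  private items = new Map<string, { value: V; expiresAt: number }>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlSeconds: number, now: () => number) {
    this.ttlMs = ttlSeconds * 1000;
    this.now = now;
  }

  save(key: string, value: V): void {
    // Every save pushes the deadline out by a full TTL.
    this.items.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }

  load(key: string): V | undefined {
    const item = this.items.get(key);
    if (!item) return undefined;
    if (this.now() >= item.expiresAt) {
      this.items.delete(key); // expired: behave as if it was never stored
      return undefined;
    }
    return item.value;
  }
}

let clock = 0; // fake time in milliseconds
const tasks = new SlidingTtlMap<string>(1800, () => clock);

tasks.save('task123', 'pending'); // deadline: minute 30
clock += 10 * 60 * 1000; // minute 10
tasks.save('task123', 'completed'); // deadline moves to minute 40
clock += 25 * 60 * 1000; // minute 35: past the original deadline
const stillThere = tasks.load('task123');
clock += 10 * 60 * 1000; // minute 45: past the refreshed deadline
const gone = tasks.load('task123');
```

At minute 35 the task is still readable because the second save reset the window. At minute 45 it has expired, which matches the "not found or has expired" case callers need to handle.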

Task Store Return Types

Both RedisTaskStore and RedisJSONTaskStore implement async methods that return optional types. Type aliases are exported for convenience:

import type { TaskLoadResult } from 'a2a-redis';

// TaskLoadResult = Task | undefined
const task: TaskLoadResult = await taskStore.load('task-123');
if (task) {
  console.log(`Task status: ${task.status}`);
} else {
  console.log('Task not found or has expired');
}

Return types:

  • load(taskId) returns Promise<TaskLoadResult> (Task | undefined)
  • save(taskId, task) returns Promise<void>
  • delete(taskId) returns Promise<void>
  • listTaskIds() returns Promise<string[]>

Queue Managers

Both queue managers implement the A2A QueueManager interface with full async support:

import { RedisStreamsQueueManager, RedisPubSubQueueManager } from 'a2a-redis';
import { ConsumerGroupStrategy, ConsumerGroupConfig } from 'a2a-redis';

// For reliable, persistent processing
const streamsManager = new RedisStreamsQueueManager(redisClient, {
  prefix: 'myapp:streams:',
});

// For real-time, low-latency broadcasting
const pubsubManager = new RedisPubSubQueueManager(redisClient, {
  prefix: 'myapp:pubsub:',
});

// With custom consumer group configuration (streams only)
const config = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});
const configuredStreamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: config });

async function main() {
  // Same interface for both managers
  const queue = await streamsManager.createOrTap('task123');

  // Enqueue events
  await queue.enqueueEvent({ type: 'progress', message: 'Task started' });
  await queue.enqueueEvent({ type: 'progress', message: '50% complete' });

  // Dequeue events
  try {
    const event = await queue.dequeueEvent({ noWait: true }); // Non-blocking
    console.log(`Got event: ${JSON.stringify(event)}`);
    await queue.taskDone(); // Acknowledge the message (streams only)
  } catch (error) {
    console.log('No events available');
  }

  // Close queue when done
  await queue.close();
}

main();

Consumer Group Strategies

The Streams queue manager supports different consumer group strategies:

import { ConsumerGroupStrategy, ConsumerGroupConfig, RedisStreamsQueueManager } from 'a2a-redis';

// Multiple instances share work across a single consumer group
const sharedConfig = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});

// Each instance gets its own consumer group
const isolatedConfig = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.INSTANCE_ISOLATED,
});

// Custom consumer group name
const customConfig = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.CUSTOM,
  groupName: 'my_group',
});

// Pass whichever configuration fits your deployment
const streamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: sharedConfig });
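
What separates the strategies is which consumer group name each instance ends up using. The sketch below is an assumption made for illustration: `resolveGroupName`, `GroupOptions`, the `instanceId` field, and the `processors-…` naming scheme are all hypothetical, and the names a2a-redis actually generates may differ.

```typescript
// Illustrative only: the group names produced here are assumptions, not the
// names a2a-redis actually uses.
type Strategy = 'SHARED_LOAD_BALANCING' | 'INSTANCE_ISOLATED' | 'CUSTOM';

interface GroupOptions {
  strategy: Strategy;
  groupName?: string; // only meaningful for CUSTOM
  instanceId?: string; // hypothetical per-process identifier
}

function resolveGroupName(taskId: string, options: GroupOptions): string {
  switch (options.strategy) {
    case 'SHARED_LOAD_BALANCING':
      // Same name on every instance, so instances split the entries between them.
      return `processors-${taskId}`;
    case 'INSTANCE_ISOLATED':
      if (!options.instanceId) throw new Error('instanceId is required for INSTANCE_ISOLATED');
      // Unique name per instance, so each instance sees every entry.
      return `processors-${taskId}-${options.instanceId}`;
    case 'CUSTOM':
      if (!options.groupName) throw new Error('groupName is required for CUSTOM');
      return options.groupName;
  }
}

const sharedA = resolveGroupName('task123', { strategy: 'SHARED_LOAD_BALANCING' });
const sharedB = resolveGroupName('task123', { strategy: 'SHARED_LOAD_BALANCING' });
const isolatedA = resolveGroupName('task123', { strategy: 'INSTANCE_ISOLATED', instanceId: 'a' });
const isolatedB = resolveGroupName('task123', { strategy: 'INSTANCE_ISOLATED', instanceId: 'b' });
```

In Redis Streams, consumers in the same group divide the entries among themselves, while separate groups each receive every entry. Two instances that resolve to the same name therefore load-balance, and two that resolve to different names each process the full stream.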

RedisPushNotificationConfigStore

Stores push notification configurations per task. Implements the A2A PushNotificationConfigStore interface.

import { RedisPushNotificationConfigStore } from 'a2a-redis';
import type { PushNotificationConfig } from '@a2a-js/sdk';

const configStore = new RedisPushNotificationConfigStore(redisClient, {
  prefix: 'myapp:push:',
});

// Create push notification config (a plain object typed by the SDK)
const config: PushNotificationConfig = {
  url: 'https://webhook.example.com/notify',
  token: 'secret_token',
  id: 'webhook_1',
};

// A2A interface methods
await configStore.setInfo('task123', config);

// Get all configs for a task
const configs = await configStore.getInfo('task123');
for (const stored of configs) {
  console.log(`Config ${stored.id}: ${stored.url}`);
}

// Delete specific config or all configs for a task
await configStore.deleteInfo('task123', 'webhook_1'); // Delete specific
await configStore.deleteInfo('task123'); // Delete all

RedisContextStore

Manages conversation contexts for multi-turn interactions. Groups related tasks and provides history retrieval.

import { RedisContextStore } from 'a2a-redis';

// Create a context store with 1-hour expiration
const contextStore = new RedisContextStore(redisClient, {
  prefix: 'contexts:',
  ttl: 3600, // optional: auto-cleanup after 1 hour
});

// Create a context for a new conversation
await contextStore.createContext('conv-user-123', {
  userId: 'user-123',
  agent: 'travel-agent',
  sessionId: 'session-abc',
});

// Add tasks as they're created in the conversation
await contextStore.addTaskToContext('conv-user-123', 'task-1');
await contextStore.addTaskToContext('conv-user-123', 'task-2');
await contextStore.addTaskToContext('conv-user-123', 'task-3');

// Retrieve context to see all conversation tasks
const context = await contextStore.getContext('conv-user-123');
console.log(`Conversation has ${context?.taskIds.length} tasks`);
console.log(`Started at: ${context?.createdAt}`);
console.log(`User: ${context?.metadata?.userId}`);

// Get just the task IDs
const taskIds = await contextStore.getContextTasks('conv-user-123');
for (const taskId of taskIds) {
  const task = await taskStore.load(taskId);
  console.log(`Task ${taskId} status: ${task?.status}`);
}

// Clean up when conversation ends
await contextStore.deleteContext('conv-user-123');

Context Features:

  • Multi-turn support: Group related tasks from a single conversation
  • History retrieval: Get all tasks in a conversation
  • Metadata storage: Store user/session info with the context
  • TTL-based cleanup: Automatic expiration of old conversations
  • Task isolation: Tasks added to context without duplicates

Default: No TTL (contexts persist) - TTL is opt-in
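
The grouping and de-duplication behaviour listed above can be pictured with a small in-memory model. This is a sketch under stated assumptions: `ToyContextStore` and `ToyContext` are hypothetical names that borrow the method names from the example above, and the real store keeps its data in Redis rather than in a `Map`.

```typescript
// In-memory model of the behaviour listed above; not the a2a-redis implementation.
interface ToyContext {
  metadata: Record<string, string>;
  createdAt: string;
  taskIds: string[];
}

class ToyContextStore {
  private contexts = new Map<string, ToyContext>();

  createContext(contextId: string, metadata: Record<string, string> = {}): void {
    this.contexts.set(contextId, {
      metadata,
      createdAt: new Date().toISOString(),
      taskIds: [],
    });
  }

  addTaskToContext(contextId: string, taskId: string): void {
    const context = this.contexts.get(contextId);
    if (!context) throw new Error(`Unknown context: ${contextId}`);
    // Skip IDs that are already recorded so the history has no duplicates.
    if (!context.taskIds.includes(taskId)) context.taskIds.push(taskId);
  }

  getContextTasks(contextId: string): string[] {
    const context = this.contexts.get(contextId);
    // Return a copy so callers cannot mutate the stored history.
    return context ? context.taskIds.slice() : [];
  }
}

const toyStore = new ToyContextStore();
toyStore.createContext('conv-user-123', { userId: 'user-123' });
toyStore.addTaskToContext('conv-user-123', 'task-1');
toyStore.addTaskToContext('conv-user-123', 'task-1'); // retried call, ignored
toyStore.addTaskToContext('conv-user-123', 'task-2');
const history = toyStore.getContextTasks('conv-user-123');
```

Ignoring repeated IDs means a retried request does not distort the conversation history, and insertion order is preserved so the tasks can be replayed in sequence.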

Context Store Return Types

The RedisContextStore implements async methods with optional return types. Type aliases are exported for convenience:

import type { ContextLookupResult } from 'a2a-redis';

// ContextLookupResult = Context | undefined
const context: ContextLookupResult = await contextStore.getContext('conv-user-123');
if (context) {
  console.log(`Context created at: ${context.createdAt}`);
  console.log(`Tasks in context: ${context.taskIds.length}`);
} else {
  console.log('Context not found or has expired');
}

Return types:

  • getContext(contextId) returns Promise<ContextLookupResult> (Context | undefined)
  • createContext(contextId, metadata?) returns Promise<void>
  • addTaskToContext(contextId, taskId) returns Promise<void>
  • getContextTasks(contextId) returns Promise<string[]>
  • deleteContext(contextId) returns Promise<void>

Requirements

Required

  • Node.js 18+
  • TypeScript 5.0+ (if using TypeScript)
  • redis (peer dependency) >= 4.0.0 - the official Redis client for Node.js
  • @a2a-js/sdk (peer dependency) >= 0.3.4

Both redis and @a2a-js/sdk must be installed in your project:

npm install redis @a2a-js/sdk a2a-redis
# or with pnpm
pnpm add redis @a2a-js/sdk a2a-redis

Optional

  • RedisJSON module for RedisJSONTaskStore (enhanced nested data support)
  • Redis Stack or Redis with modules for full feature support

Development

# Install dependencies
npm install

# Run tests
npm test

# Run tests with coverage
npm run test:coverage

# Type checking
npm run type-check

# Linting and formatting
npm run lint
npm run format

# Run examples
npm run example:basic
npm run example:agent

Testing

Tests use Redis database 15 for isolation and include both unit and integration tests:

# Run all tests
npm test

# Run specific test file
npm test -- task-store.test.ts

# Run with coverage
npm run test:coverage

Test Architecture

Tests are designed to run efficiently with a limited number of Redis connections. This matters most on free-tier hosted Redis (e.g., the Redis Cloud free tier, which allows only 1 concurrent connection):

  • Shared Redis Client: All tests use a single Redis client initialized in tests/setup.ts, reducing connection overhead
  • Single-threaded Execution: Tests run serially (maxWorkers: 1) to prevent connection conflicts
  • Database Isolation: Tests use Redis database 15, separate from development/production databases
  • Automatic Cleanup: cleanupTestRedis() flushes database between tests

Pub/Sub Tests (Skipped)

Pub/Sub integration tests are skipped by default because they require duplicate connections:

  • Redis Pub/Sub requires a separate connection for subscriptions (client.duplicate())
  • Free-tier Redis (1 connection limit) cannot support both main + pub/sub connections
  • Unit tests for Pub/Sub still run with mock clients
  • To run pub/sub integration tests, use a Redis instance with higher connection limits

Running Tests with Redis Cloud Free Tier

The test suite is optimized for Redis Cloud free tier:

  1. Environment Setup

    # Add to .env
    REDIS_URL=redis://user:password@host:port/db

  2. Run Tests

    npm test

  3. Expected Output

    • ~200+ tests passing (148+ integration tests)
    • ~30 pub/sub tests skipped (due to connection limits)
    • ~17 failing tests (pre-existing, unrelated to connection limit)

The shared client pattern allows efficient testing even with connection constraints.

License

MIT