@multiplayer-app/ai-agent-node
v0.1.0-beta.12
AI Agent Node.js library for multiplayer ai agents
A Node.js library for building AI agent backends with support for multi-provider AI models, real-time communication, distributed processing, and chat management.
Features
- Multi-Provider AI Support: OpenAI, Anthropic, Google, and OpenRouter
- Streaming Chat Processing: Real-time message streaming with tool calling support
- Distributed Architecture: Kafka and Redis integration for scalable agent processing
- Real-time Communication: Socket.IO integration for live updates
- Agent Process Management: Lifecycle management with event-driven architecture
- Artifact Storage: S3 integration for storing generated artifacts
- Context Management: Intelligent context limiting and attachment handling
- Built-in Tools: Form value proposal and chart generation
Installation
npm install @multiplayer-app/ai-agent-node ai
Prerequisites
- Node.js >= 18
- npm >= 8
Infrastructure Setup
The library requires several infrastructure services to function properly. These services can be started using Docker Compose or configured separately.
Starting Services
Use the startServices() function to initialize all required services:
import { startServices } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
// Initialize repositories (example with MongoDB)
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
// Start all services
await startServices(chatRepository, messageRepository);
This function initializes:
- Kafka Service: Connects to Kafka brokers and subscribes to topics for chat title generation and background processing
- Agent Store: Initializes the in-memory agent process store for managing active agent conversations
- Model Store: Fetches and caches available AI models from configured providers
- S3 Bucket: Ensures the configured S3 bucket exists for artifact storage
Stopping Services
Use the stopServices() function to gracefully shut down connections:
import { stopServices } from '@multiplayer-app/ai-agent-node';
// Stop all services
await stopServices();
This disconnects:
- Redis connections (including pub/sub clients)
- Kafka consumer and producer connections
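A common way to pair the two calls is to start services once at boot and stop them when the process receives a termination signal. The sketch below is illustrative only and not part of the library: `createShutdownHandler` is a hypothetical helper, and the `stop` callback stands in for `stopServices()`. It guarantees the stop routine runs at most once even if several signals arrive.

```typescript
// Hypothetical helper (not a library export): wraps a stop routine so that
// repeated calls share one in-flight shutdown instead of disconnecting twice.
function createShutdownHandler(stop: () => Promise<void>): () => Promise<void> {
  let pending: Promise<void> | null = null;
  return () => {
    if (pending === null) {
      pending = stop(); // first caller triggers the actual shutdown
    }
    return pending; // later callers wait on the same promise
  };
}

// Usage sketch, assuming startServices/stopServices are imported as shown above:
// await startServices(chatRepository, messageRepository);
// const shutdown = createShutdownHandler(() => stopServices());
// process.once('SIGINT', () => void shutdown().finally(() => process.exit(0)));
// process.once('SIGTERM', () => void shutdown().finally(() => process.exit(0)));
```

Sharing one promise avoids closing Redis and Kafka connections twice when both `SIGINT` and `SIGTERM` are delivered during a deploy.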
Infrastructure Services
Kafka
Purpose: Kafka is used for asynchronous, distributed processing of agent tasks:
- Chat Title Generation: Generates chat titles asynchronously after the first message to avoid blocking the main request
- Background Processing: Handles long-running agent tasks that don't need an immediate response
- Scalability: Allows multiple service instances to process tasks in parallel
Topics:
- chat-title-generation (default): Processes chat title generation requests
- background-chat-processing (default): Handles background agent processing tasks
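The topic names are defaults and can be overridden through the Kafka environment variables listed under Environment Variables. As a minimal sketch of that default-plus-override behavior (the helper `resolveKafkaTopics` and the `KafkaTopics` shape are hypothetical; how the library reads these values internally is not shown in this README):

```typescript
// Hypothetical shape for the two topics described above.
interface KafkaTopics {
  chatTitleGeneration: string;
  backgroundChatProcessing: string;
}

// Returns the configured topic names, falling back to the documented defaults
// when a variable is unset or empty.
function resolveKafkaTopics(env: Record<string, string | undefined>): KafkaTopics {
  return {
    chatTitleGeneration:
      env.KAFKA_CHAT_TITLE_GENERATION_TOPIC || 'chat-title-generation',
    backgroundChatProcessing:
      env.KAFKA_BACKGROUND_CHAT_PROCESSING_TOPIC || 'background-chat-processing',
  };
}

// Usage sketch: const topics = resolveKafkaTopics(process.env);
```

Overriding the names per environment (for example with a staging prefix) keeps several deployments from consuming each other's messages on a shared broker.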
Redis
Purpose: Redis provides:
- Pub/Sub for Socket.IO: Enables real-time message broadcasting across multiple service instances
- Agent State Management: Stores temporary agent process state and event listeners
- Caching: Can be used for caching model information and other frequently accessed data
Connection: Redis is used both for direct key-value operations and for the pub/sub channels behind the Socket.IO adapter.
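The Redis environment variables documented later accept either a single URL or individual host, port, password, and database settings. The sketch below shows one plausible way to collapse them into a connection URL. `resolveRedisUrl` is a hypothetical helper, and both the precedence of `REDIS_URL` and the `localhost`/`6379` fallbacks are assumptions; only the database default of 0 is stated in this README.

```typescript
// Hypothetical helper: builds a redis:// URL from the documented variables.
// Assumption: REDIS_URL, when present, takes precedence over the individual settings.
function resolveRedisUrl(env: Record<string, string | undefined>): string {
  if (env.REDIS_URL) {
    return env.REDIS_URL;
  }
  const host = env.REDIS_HOST || 'localhost'; // assumed fallback
  const port = env.REDIS_PORT || '6379'; // assumed fallback
  const database = env.REDIS_DATABASE || '0'; // documented default
  // Percent-encode the password so characters such as "@" do not break the URL.
  const auth = env.REDIS_PASSWORD
    ? `:${encodeURIComponent(env.REDIS_PASSWORD)}@`
    : '';
  return `redis://${auth}${host}:${port}/${database}`;
}

// Usage sketch: const redisUrl = resolveRedisUrl(process.env);
```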
Docker Compose Setup
The project includes a docker-compose.yml file that sets up all required services:
# Start all services
docker-compose up -d
# Stop all services
docker-compose down
This starts:
- MongoDB (port 27017): Database for chat and message storage
- Redis (port 6379): Pub/sub and caching
- Zookeeper (port 2181): Required for Kafka
- Kafka (port 9092): Message broker
- MinIO (ports 9000, 9001): S3-compatible object storage for artifacts
Environment Variables
Configure the library using the following environment variables:
Redis Configuration
# Redis connection URL (alternative to host/port)
REDIS_URL=redis://localhost:6379
# Or use individual settings
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password # Optional
REDIS_DATABASE=0 # Optional, defaults to 0
AI Provider API Keys
At least one provider API key is required:
# OpenAI
OPENAI_API_KEY=sk-...
# Anthropic
ANTHROPIC_API_KEY=sk-ant-...
# Google
GOOGLE_GENERATIVE_AI_API_KEY=...
# OpenRouter (supports multiple providers)
OPENROUTER_API_KEY=sk-or-...
AI Configuration
# Maximum number of messages to include in context (default: 10)
MAX_CONTEXT_MESSAGES=10
# Default model to use if none specified
DEFAULT_MODEL=openai/gpt-4o
S3 Configuration
# S3 bucket name for storing attachments and artifacts
S3_ATTACHMENTS_BUCKET=ai-agent-attachments
Kafka Configuration
# Kafka consumer group ID
KAFKA_GROUP_ID=ai-agent-node
# Kafka topics
KAFKA_CHAT_TITLE_GENERATION_TOPIC=chat-title-generation
KAFKA_BACKGROUND_CHAT_PROCESSING_TOPIC=background-chat-processing
File Processing (Optional)
# Enable text extraction from documents (default: true)
ENABLE_TEXT_EXTRACTION=true
# Maximum size for extracted text in bytes (default: 51200)
MAX_EXTRACTED_TEXT_SIZE=51200
Usage Examples
Basic Chat Processing
The ChatProcessor class handles all chat operations. The most common use case is creating a streaming message:
import { ChatProcessor } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
import { ArtifactStore } from '@multiplayer-app/ai-agent-node';
// Initialize repositories
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
const artifactStore = new ArtifactStore();
// Create processor
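const chatProcessor = new ChatProcessor(
  chatRepository,
  messageRepository,
  artifactStore
);
// Create a streaming message
const stream = await chatProcessor.createMessageStream({
  content: 'Hello, how can you help me?',
  contextKey: 'support',
  userId: 'user-123'
});
// Stream is a Node.js PassThrough stream that emits SSE-formatted events
let buffer = '';
stream.on('data', (chunk) => {
  // Each SSE event has the form "data: {...}\n\n"; a chunk may end mid-event
  buffer += chunk.toString();
  const events = buffer.split('\n\n');
  buffer = events.pop() ?? ''; // keep the incomplete tail for the next chunk
  for (const event of events) {
    const payload = event.replace(/^data: /, '');
    // Assumes the end marker arrives as "data: [DONE]"
    if (!payload || payload === '[DONE]') continue;
    const parsed = JSON.parse(payload);
    // Handle parsed.type: 'chat' | 'message' | 'error'
  }
});
Stream Chunk Format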
The stream emits Server-Sent Events (SSE) with the following chunk types:
// Chat metadata (first chunk for new chats)
{
  type: 'chat',
  chatId: 'chat-123',
  chat: { /* AgentChat object */ }
}
// Message updates
{
  type: 'message',
  message: { /* AgentMessage object */ }
}
// Errors
{
  type: 'error',
  error: 'Error message'
}
// Stream end marker
[DONE]
Listing and Getting Chats
// List chats with filters
const chats = await chatProcessor.listChats({
  contextKey: 'support',
  userId: 'user-123',
  limit: 20,
  sortField: 'updatedAt',
  sortOrder: SortOrder.Desc
});
// Get a specific chat with messages
const chat = await chatProcessor.getChat('chat-id');
// Delete a chat
await chatProcessor.deleteChat('chat-id');
Agent Setup
Agents are configured using a JSON configuration file that defines agents, their tools, and which contexts they're available in.
Config JSON Format
{
  "models": ["gpt-4o", "claude-3-5-sonnet", "openai/gpt-4"],
  "agents": [
    {
      "name": "support-agent",
      "description": "Customer support agent",
      "systemPrompt": "You are a helpful customer support agent...",
      "defaultModel": "gpt-4o",
      "temperature": 0.7,
      "maxTokens": 2000,
      "contextKeys": ["support", "general"],
      "tools": [
        {
          "type": "api-tool",
          "data": {
            "title": "Get Order Status",
            "description": "Retrieves the status of a customer order",
            "method": "GET",
            "url": "https://api.example.com/orders/{orderId}",
            "headersToPass": ["Authorization"],
            "needsApproval": false
          }
        },
        {
          "type": "web-search",
          "data": {
            "title": "web-search"
          }
        }
      ]
    }
  ]
}
Loading Config
import { ConfigStore } from '@multiplayer-app/ai-agent-node';
import fs from 'fs';
const configStore = ConfigStore.getInstance();
const rawConfig = JSON.parse(fs.readFileSync('agent-config.json', 'utf-8'));
configStore.loadConfig(rawConfig);
API Tool Configuration
API tools allow agents to make HTTP requests to external services:
{
  "type": "api-tool",
  "data": {
    "title": "Tool Name",
    "description": "What this tool does",
    "method": "GET|POST|PUT|DELETE|PATCH",
    "url": "https://api.example.com/endpoint",
    "headersToPass": ["Authorization", "X-Custom-Header"],
    "body": {
      "type": "object",
      "properties": {
        "param1": { "type": "string" },
        "param2": { "type": "number" }
      },
      "required": ["param1"]
    },
    "queryParams": {
      "type": "object",
      "properties": {
        "filter": { "type": "string" }
      }
    },
    "needsApproval": true
  }
}
Key Fields:
- needsApproval: If true, the tool execution requires user approval before running
- headersToPass: List of request headers to forward from the original request
- body/queryParams: JSON Schema definitions for request parameters (converted to Zod schemas)
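To make these fields concrete, the sketch below shows how a tool definition plus model-supplied arguments could be turned into an outgoing HTTP request. It is not the library's implementation: `prepareRequest`, `ApiToolData`, `ToolArgs`, and `PreparedRequest` are hypothetical names, and the handling of `{placeholder}` segments in `url` (as in the `{orderId}` example earlier) is an assumption about how such templates might be filled.

```typescript
// Hypothetical subset of the "data" object from an api-tool definition.
interface ApiToolData {
  method: 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH';
  url: string;
  headersToPass?: string[];
}

// Hypothetical argument bundle produced by the model for one tool call.
interface ToolArgs {
  path?: Record<string, string>;
  query?: Record<string, string>;
  body?: unknown;
}

interface PreparedRequest {
  method: string;
  url: string;
  headers: Record<string, string>;
  body?: string;
}

function prepareRequest(
  tool: ApiToolData,
  args: ToolArgs,
  incomingHeaders: Record<string, string>
): PreparedRequest {
  // Fill {placeholders} in the URL template from path arguments.
  const filled = tool.url.replace(/\{(\w+)\}/g, (_match, key: string) => {
    const value = args.path?.[key];
    if (value === undefined) throw new Error(`Missing path parameter: ${key}`);
    return encodeURIComponent(value);
  });
  const url = new URL(filled);
  for (const [name, value] of Object.entries(args.query ?? {})) {
    url.searchParams.set(name, value);
  }
  // Forward only the headers named in headersToPass (case-insensitive match).
  const lowerCased: Record<string, string> = {};
  for (const [name, value] of Object.entries(incomingHeaders)) {
    lowerCased[name.toLowerCase()] = value;
  }
  const headers: Record<string, string> = {};
  for (const name of tool.headersToPass ?? []) {
    const value = lowerCased[name.toLowerCase()];
    if (value !== undefined) headers[name] = value;
  }
  const request: PreparedRequest = { method: tool.method, url: url.toString(), headers };
  if (args.body !== undefined && tool.method !== 'GET') {
    headers['Content-Type'] = 'application/json';
    request.body = JSON.stringify(args.body);
  }
  return request;
}
```

Forwarding only the listed headers keeps unrelated request headers, such as cookies, from leaking to the external service.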
Web Search Tool
Web search tools enable agents to search the internet:
{
  "type": "web-search",
  "data": {
    "title": "web-search"
  }
}
In-Code Agent Support
You can also add agents programmatically:
import { ConfigStore } from '@multiplayer-app/ai-agent-node';
import { AgentToolType } from '@multiplayer-app/ai-agent-types';
const configStore = ConfigStore.getInstance();
// Add an agent
configStore.addAgent(['support', 'sales'], {
  name: 'custom-agent',
  description: 'Custom agent added in code',
  systemPrompt: 'You are a helpful assistant...',
  defaultModel: 'gpt-4o',
  temperature: 0.7,
  tools: [
    {
      type: AgentToolType.WEB_SEARCH,
      data: { title: 'web-search' }
    }
  ]
});
// Add a tool to all existing agents
configStore.addToolToAllAgents({
  type: AgentToolType.LOCAL_FUNCTION,
  data: {
    title: 'custom-tool',
    needsApproval: false,
    // ... tool implementation
  }
});
Tool Approval Flow
When a tool has needsApproval: true, the agent will request approval before execution:
- Agent generates a tool call with requiresConfirmation: true and an approvalId
- Chat status changes to AgentStatus.WaitingForUserAction
- User can approve or deny the tool call
- If approved, the tool executes; if denied, execution is skipped
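The decision step in this flow can be pictured as matching the user's response to a pending tool call by its approval ID. The following is a conceptual sketch only: `PendingToolCall`, `ApprovalDecision`, `Resolution`, and `resolveApproval` are hypothetical names, not library exports, and the library's internal bookkeeping may differ.

```typescript
// Hypothetical shapes mirroring the fields named in the steps above.
interface PendingToolCall {
  approvalId: string;
  toolName: string;
  requiresConfirmation: true;
}

interface ApprovalDecision {
  approvalId: string;
  approved: boolean;
  reason?: string;
}

type Resolution =
  | { action: 'execute'; toolName: string }
  | { action: 'skip'; toolName: string; reason: string };

// Looks up the pending call for a decision and reports whether to run or skip it.
function resolveApproval(
  pending: PendingToolCall[],
  decision: ApprovalDecision
): Resolution {
  const call = pending.find((c) => c.approvalId === decision.approvalId);
  if (!call) {
    throw new Error(`Unknown approvalId: ${decision.approvalId}`);
  }
  if (decision.approved) {
    return { action: 'execute', toolName: call.toolName };
  }
  return {
    action: 'skip',
    toolName: call.toolName,
    reason: decision.reason ?? 'Denied by user',
  };
}
```

In the library itself, the decision is submitted through the chat processor, as the following calls show.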
// Process approval using createMessageStream (handles chat lookup internally)
const stream = await chatProcessor.createMessageStream({
  chatId: 'chat-id',
  messageId: 'message-id',
  approvalId: 'approval-id',
  approved: true,
  reason: 'User approved'
});
// Or use streamMessage directly if you already have the chat object
const chat = await chatProcessor.getChat('chat-id');
await chatProcessor.streamMessage(chat, {
  chatId: 'chat-id',
  messageId: 'message-id',
  approvalId: 'approval-id',
  approved: true,
  reason: 'User approved'
});
User Attachments
Messages can include attachments for context:
import { AgentAttachmentType } from '@multiplayer-app/ai-agent-types';
const stream = await chatProcessor.createMessageStream({
  content: 'Analyze this document',
  contextKey: 'support',
  attachments: [
    {
      id: 'att-1',
      type: AgentAttachmentType.File,
      name: 'document.pdf',
      url: 'https://s3.../document.pdf',
      mimeType: 'application/pdf',
      size: 1024000
    },
    {
      id: 'att-2',
      type: AgentAttachmentType.Context,
      name: 'Page Context',
      metadata: {
        kind: 'webSnippet',
        source: { url: 'https://example.com' },
        selectedText: 'Selected text from page'
      }
    }
  ]
});
Database Access
The library uses repository interfaces from @multiplayer-app/ai-agent-db for database operations. You can use any implementation that conforms to these interfaces.
MongoDB Example
import mongo from '@multiplayer-app/ai-agent-mongo';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
import { ChatProcessor, ArtifactStore } from '@multiplayer-app/ai-agent-node';
// Connect to MongoDB
await mongo.connect();
// Create repositories
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
const artifactStore = new ArtifactStore();
// Use with ChatProcessor
const chatProcessor = new ChatProcessor(
  chatRepository,
  messageRepository,
  artifactStore
);
Generic Repositories
The library works with any repository implementation that matches the interfaces:
import type {
  AgentChatRepository,
  AgentMessageRepository
} from '@multiplayer-app/ai-agent-db';
import { ChatProcessor, ArtifactStore } from '@multiplayer-app/ai-agent-node';
// Your custom implementation
class CustomChatRepository implements AgentChatRepository {
  // Implement all required methods
}
class CustomMessageRepository implements AgentMessageRepository {
  // Implement all required methods
}
// Use with ChatProcessor
const chatProcessor = new ChatProcessor(
  new CustomChatRepository(),
  new CustomMessageRepository(),
  new ArtifactStore()
);
Repository Methods
AgentChatRepository:
- create(data): Create a new chat
- findById(id): Find chat by ID
- update(id, data): Update chat
- delete(id): Delete chat
- findWithMessages(filter, options): Find chats with aggregated messages
AgentMessageRepository:
- create(data): Create a new message
- findById(id): Find message by ID
- findByChatId(chatId): Find all messages for a chat
- update(id, data): Update message
- delete(id): Delete message
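For tests or local prototyping, an in-memory repository can stand in for a database-backed one. The sketch below covers the four basic chat methods listed above. It is only an illustration: `ChatRecord` and `InMemoryChatRepository` are hypothetical, the return types are assumptions, and a real implementation would need to satisfy the full `AgentChatRepository` interface from `@multiplayer-app/ai-agent-db`.

```typescript
// Hypothetical record shape for illustration; the real chat type has more fields.
interface ChatRecord {
  id: string;
  title?: string;
  userId?: string;
  contextKey?: string;
}

type ChatInput = Omit<ChatRecord, 'id'>;

// In-memory stand-in covering create/findById/update/delete.
// findWithMessages is omitted because its filter and options shapes are not shown here.
class InMemoryChatRepository {
  private readonly chats = new Map<string, ChatRecord>();
  private nextId = 1;

  async create(data: ChatInput): Promise<ChatRecord> {
    const chat: ChatRecord = { ...data, id: `chat-${this.nextId++}` };
    this.chats.set(chat.id, chat);
    return { ...chat }; // return a copy so callers cannot mutate stored state
  }

  async findById(id: string): Promise<ChatRecord | null> {
    const chat = this.chats.get(id);
    return chat ? { ...chat } : null;
  }

  async update(id: string, data: Partial<ChatInput>): Promise<ChatRecord | null> {
    const existing = this.chats.get(id);
    if (!existing) return null;
    const updated: ChatRecord = { ...existing, ...data, id };
    this.chats.set(id, updated);
    return { ...updated };
  }

  async delete(id: string): Promise<boolean> {
    return this.chats.delete(id); // true if a chat was removed
  }
}
```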
Advanced Usage
Agent Process Management
Monitor and control agent processes:
import { agentStore, AgentProcessEventType } from '@multiplayer-app/ai-agent-node';
// Listen to agent process events
agentStore.addListener('chat-id', (event) => {
  switch (event.type) {
    case AgentProcessEventType.Update:
      console.log('Agent updated:', event.data);
      break;
    case AgentProcessEventType.Finished:
      console.log('Agent finished:', event.data);
      break;
    case AgentProcessEventType.Error:
      console.error('Agent error:', event.data);
      break;
  }
});
// Stop an agent process
agentStore.stopAgentProcess('chat-id');
Built-in Tools
The library includes built-in tools:
import {
  ConfigStore,
  createProposeFormValuesTool,
  createGenerateChartTool
} from '@multiplayer-app/ai-agent-node';
const configStore = ConfigStore.getInstance();
// Add form proposal tool to all agents
configStore.addToolToAllAgents(createProposeFormValuesTool());
// Add chart generation tool to all agents
configStore.addToolToAllAgents(createGenerateChartTool());
Context Limiting
The library automatically limits context to prevent token limit issues:
import { helpers } from '@multiplayer-app/ai-agent-node';
const limitedMessages = helpers.ContextLimiter.limitContext(messages, {
  maxMessages: 10,
  keepFirstUserMessage: true,
  keepSystemMessages: true
});
Artifact Management
Store and retrieve artifacts generated by agents:
// List artifacts for a chat
const artifacts = chatProcessor.listArtifacts('chat-id');
// Artifacts are automatically stored when generated by tools
// Access via ArtifactStore directly if needed
import { ArtifactStore } from '@multiplayer-app/ai-agent-node';
const artifactStore = new ArtifactStore();
const storedArtifacts = artifactStore.listArtifacts('chat-id');