Distributed Streamable HTTP Server Transport
Enterprise-grade MCP transport for horizontally scalable deployments
What This Transport Does
This package implements the MCP Streamable HTTP transport specification with a critical difference: it's designed for distributed deployments from day one.
In a typical MCP setup, your server runs on a single node. This works great for development and small-scale production. But what happens when you need to:
- Run multiple server instances behind a load balancer?
- Handle thousands of concurrent client sessions?
- Survive server restarts without losing session state?
- Deploy across multiple regions?
This is where the Distributed Streamable HTTP Transport shines.
The Architecture
┌─────────────────────────────────┐
│ Load Balancer │
└─────────────┬───────────────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ HTTP Node 1 │ │ HTTP Node 2 │ │ HTTP Node 3 │
│ (Transport) │ │ (Transport) │ │ (Transport) │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
└─────────────────────────────┼─────────────────────────────┘
│
┌─────────────▼───────────────────┐
│ EventBroker │
│ (NATS / Kafka / Redis / ...) │
└─────────────┬───────────────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ (MCP Handler) │ │ (MCP Handler) │ │ (MCP Handler) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Key Concepts
Sessions as Routing Keys
Every MCP connection is identified by a session ID (the Mcp-Session-Id header). In distributed mode, this session ID becomes the routing key for all messages:
- Messages are published to session-scoped topics
- Workers subscribe to specific sessions or use queue groups for load balancing
- Session state can be persisted externally (Redis, database, etc.)
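For example, a worker can subscribe to one specific session, or join a queue group so requests from all sessions are balanced across a pool of workers. A minimal sketch (it previews the `EventBroker` interface and typed topics defined below; the exact wildcard and queue-group semantics depend on your broker implementation):

```typescript
import {
  RequestInbound,
  type EventBroker,
} from "model-context-protocol-distributed-streamable-http-server-transport";

declare const broker: EventBroker; // your NATS / Kafka / Redis implementation

// Pin a subscription to a single session's inbound requests...
const single = broker.subscribe(RequestInbound, { sessionId: "abc-123" });

// ...or join a queue group so requests for all sessions are load-balanced
// across a pool of worker processes.
const pooled = broker.subscribe(
  RequestInbound,
  { sessionId: "*" },
  { queueGroup: "mcp-workers" }
);
```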
EventBroker as the Backplane
The EventBroker interface abstracts your messaging infrastructure. Whether you use NATS JetStream, Kafka, Redis Streams, or something else, the transport doesn't care—it just publishes and subscribes.
interface EventBroker {
publish<TParams, TData>(
topic: Topic<TParams, TData>,
params: TParams,
data: TData
): Promise<EventId>;
subscribe<TParams, TData>(
topic: Topic<TParams, TData>,
params: TParams,
options?: SubscriptionOptions
): Subscription<TData>;
close(): Promise<void>;
}
Typed Topic System
Topics are defined with TypeScript types that enforce compile-time safety:
// Request-scoped: mcp.{sessionId}.{requestId}.{direction}
const RequestInbound: Topic<RequestScopeParams, JSONRPCMessage>;
const RequestOutbound: Topic<RequestScopeParams, JSONRPCMessage>;
// Session-scoped: mcp.{sessionId}.bg.{direction}
const BackgroundOutbound: Topic<SessionScopeParams, JSONRPCMessage>;
const BackgroundInbound: Topic<SessionScopeParams, JSONRPCMessage>;
How It Works
POST Request Flow (Client → Server)
- Client sends HTTP POST to `/mcp` with a JSON-RPC request
- Transport extracts or creates the session ID
- Transport subscribes to `RequestOutbound` for this specific request
- Transport delivers the message to the protocol layer via `messageHandler`
- Protocol processes the request and publishes the response to `RequestOutbound`
- Transport receives the response via its subscription
- Transport sends the HTTP response (JSON or SSE stream)
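From the client's perspective this is a single HTTP request. A minimal sketch using `fetch` (URL and values are illustrative; the session ID comes from the server's `Mcp-Session-Id` response header on initialize):

```typescript
const sessionId = "abc"; // returned by the server when the session was initialized

const response = await fetch("http://localhost:3000/mcp", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Accept: "application/json, text/event-stream",
    "Mcp-Session-Id": sessionId,
  },
  body: JSON.stringify({
    jsonrpc: "2.0",
    id: "req1",
    method: "tools/call",
    params: { name: "echo", arguments: { text: "hello" } },
  }),
});

// Depending on the response mode strategy (see Configuration below), the body
// is either a single JSON-RPC response or an SSE stream carrying it.
console.log(response.headers.get("content-type"));
```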
GET Request Flow (Background Channel)
- Client opens an SSE stream via HTTP GET to `/mcp`
- Transport validates the session ID from the header
- Transport subscribes to `BackgroundOutbound` and `BackgroundInbound`
- Server publishes notifications/requests to the background topics
- Transport streams messages to the client via SSE
- Supports `Last-Event-ID` for resumability
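On the client side the background channel is just a long-lived GET request. A minimal sketch (header values are illustrative):

```typescript
const sessionId = "abc"; // from the initialize response
const lastEventId = "42"; // last SSE id seen before a disconnect; omit on first connect

const stream = await fetch("http://localhost:3000/mcp", {
  method: "GET",
  headers: {
    Accept: "text/event-stream",
    "Mcp-Session-Id": sessionId,
    "Last-Event-ID": lastEventId,
  },
});

// stream.body is an SSE stream of JSON-RPC notifications and server-initiated requests.
```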
DELETE Request Flow (Session Termination)
- Client sends HTTP DELETE to `/mcp` with the session ID
- Transport validates the session
- Transport calls `SessionManager.delete()`
- The session and its associated resources are cleaned up
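Client-side, termination is a single request (sketch, reusing the illustrative session ID from above):

```typescript
const sessionId = "abc";

// Explicitly terminate the session; the transport calls SessionManager.delete().
await fetch("http://localhost:3000/mcp", {
  method: "DELETE",
  headers: { "Mcp-Session-Id": sessionId },
});
```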
Installation
npm install model-context-protocol-distributed-streamable-http-server-transport
You'll also need:
- `model-context-protocol-sdk` for the protocol and server
- An EventBroker implementation (this package provides the interface)
- Optionally, a SessionManager implementation
Quick Start
Basic Setup
import { Server, ToolsFeature } from "model-context-protocol-sdk/server";
import {
DistributedStreamableHttpServerTransport
} from "model-context-protocol-distributed-streamable-http-server-transport";
import { InMemoryEventBroker } from "./my-event-broker";
import { InMemorySessionManager } from "./my-session-manager";
// Create MCP server with tools
const server = new Server({
serverInfo: { name: "my-distributed-server", version: "1.0.0" },
capabilities: { tools: { listChanged: true } }
});
const tools = new ToolsFeature();
tools.registerTool(
{ name: "echo", description: "Echo input", inputSchema: { type: "object" } },
async (args) => ({
content: [{ type: "text", text: String((args as any)?.text ?? "") }]
})
);
server.addFeature(tools);
// Create transport with broker and session manager
const transport = new DistributedStreamableHttpServerTransport({
httpServer: {
port: 3000,
endpoint: "/mcp"
},
eventBroker: new InMemoryEventBroker(),
sessions: new InMemorySessionManager()
});
// Connect and start
await server.connect(transport);
console.log("Server running at http://localhost:3000/mcp");
Production Setup with NATS
import { connect, JetStreamClient, NatsConnection } from "nats";
class NatsEventBroker implements EventBroker {
private nc!: NatsConnection;
private js!: JetStreamClient;
async connect() {
this.nc = await connect({ servers: "nats://localhost:4222" });
this.js = this.nc.jetstream();
}
async publish<TParams, TData>(
topic: Topic<TParams, TData>,
params: TParams,
data: TData
): Promise<EventId> {
const subject = topic.subject(params);
const ack = await this.js.publish(subject, JSON.stringify(data));
return String(ack.seq);
}
subscribe<TParams, TData>(
topic: Topic<TParams, TData>,
params: TParams,
options?: SubscriptionOptions
): Subscription<TData> {
// Implementation with a NATS JetStream consumer goes here
throw new Error("Not implemented in this example");
}
async close() {
await this.nc.close();
}
}
// Use in transport
const transport = new DistributedStreamableHttpServerTransport({
httpServer: { port: 3000, endpoint: "/mcp" },
eventBroker: new NatsEventBroker(),
sessions: new RedisSessionManager()
});
Configuration
Transport Options
interface DistributedStreamableHttpServerTransportOptions {
httpServer: {
port: number; // Required: port to listen on
host?: string; // Default: 0.0.0.0
endpoint?: string; // Default: /
middlewares?: Middleware[]; // Express-style middleware chain
};
streamableHttp?: {
responseTimeoutMs?: number; // Default: 30000
responseModeStrategy?: ResponseModeStrategy;
enableBackgroundChannel?: boolean; // Default: true
enableSessionTermination?: boolean; // Default: true
};
eventBroker: EventBroker; // Required: message broker
sessions?: SessionManager; // Optional: session persistence
}
Response Mode Strategy
Control whether POST responses use JSON or SSE:
const customStrategy: ResponseModeStrategy = (messages, session) => {
// Default: SSE for tools/call, sampling/createMessage, prompts/get
// or when progressToken is present
// Custom: always use SSE for certain sessions
if (session?.getValue("preferStreaming")) {
return "sse";
}
return "json";
};
const transport = new DistributedStreamableHttpServerTransport({
// ...
streamableHttp: {
responseModeStrategy: customStrategy
}
});
Topic Model Deep Dive
Subject Patterns
All topics follow a consistent naming convention:
`mcp.{sessionId}.{scope}.{direction}`
- sessionId: UUID or custom session identifier
- scope: Either a `requestId` or `bg` (background)
- direction: `inbound` (to handlers) or `outbound` (to client)
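Concretely, using the `topic.subject(params)` helper shown in the NATS broker example above (a sketch; it assumes the topics are exported from the package root):

```typescript
import {
  RequestInbound,
  BackgroundOutbound,
} from "model-context-protocol-distributed-streamable-http-server-transport";

// Request-scoped: one subject per in-flight request.
RequestInbound.subject({ sessionId: "abc", requestId: "req1" });
// => "mcp.abc.req1.inbound"

// Session-scoped background channel (server → client).
BackgroundOutbound.subject({ sessionId: "abc" });
// => "mcp.abc.bg.outbound"
```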
Message Flow Examples
Tool Call
Client POST (tools/call, id=req1, session=abc)
│
▼
Transport subscribes: mcp.abc.req1.outbound
│
▼
Transport delivers to protocol messageHandler
│
▼
Protocol processes, handler executes
│
▼
Protocol publishes result: mcp.abc.req1.outbound
│
▼
Transport receives via subscription
│
▼
Transport sends HTTP response to client
Server Notification
Server calls transport.send(notification)
│
▼
Transport publishes: mcp.abc.bg.outbound
│
▼
Client's GET SSE stream receives via subscription
│
▼
SSE event sent to client
Server Request (e.g., sampling/createMessage)
Server publishes request: mcp.abc.bg.inbound
│
▼
Client's GET SSE stream receives
│
▼
Client sends response via POST
│
▼
Normal request/response flow
Session Management
SessionManager Interface
interface SessionManager {
create(request: SessionRequest): Session;
get(sessionId: string, request: SessionRequest): Session | undefined;
delete(sessionId: string, request: SessionRequest): void;
}
interface Session {
readonly id: string;
getValue<T>(key: string): T | undefined;
setValue<T>(key: string, value: T): void;
deleteValue(key: string): void;
}
Session Lifecycle
- Creation: On first POST (initialize request) without session ID
- Retrieval: On subsequent requests with the `Mcp-Session-Id` header
- Deletion: On DELETE request or TTL expiration
Persistence Strategies
| Strategy | Pros | Cons | Use Case |
|----------|------|------|----------|
| In-memory | Fast, simple | Lost on restart | Development, testing |
| Redis | Fast, distributed | Extra infrastructure | Production, scaling |
| Database | Durable, queryable | Slower, more complex | Compliance, auditing |
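For development, an in-memory implementation such as the `InMemorySessionManager` imported in the Quick Start can be as small as the following sketch (it assumes `Session`, `SessionManager`, and `SessionRequest` are exported from the package root, and it does no TTL cleanup):

```typescript
import type {
  Session,
  SessionManager,
  SessionRequest,
} from "model-context-protocol-distributed-streamable-http-server-transport";
import { randomUUID } from "node:crypto";

class InMemorySession implements Session {
  readonly id: string = randomUUID();
  private values = new Map<string, unknown>();

  getValue<T>(key: string): T | undefined {
    return this.values.get(key) as T | undefined;
  }
  setValue<T>(key: string, value: T): void {
    this.values.set(key, value);
  }
  deleteValue(key: string): void {
    this.values.delete(key);
  }
}

export class InMemorySessionManager implements SessionManager {
  private sessions = new Map<string, InMemorySession>();

  create(_request: SessionRequest): Session {
    const session = new InMemorySession();
    this.sessions.set(session.id, session);
    return session;
  }
  get(sessionId: string, _request: SessionRequest): Session | undefined {
    return this.sessions.get(sessionId);
  }
  delete(sessionId: string, _request: SessionRequest): void {
    this.sessions.delete(sessionId);
  }
}
```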
Health Endpoints
The transport exposes health endpoints outside the MCP path:
GET /health → 200 { status: "healthy" }
GET /readiness → 200 { status: "ready" } or 503 { status: "not ready" }
Use these for Kubernetes probes, load balancer health checks, etc.
Middleware Support
Add middleware for logging, authentication, CORS, etc.:
const loggingMiddleware: Middleware = async (req, res, next) => {
console.log(`${req.method} ${req.url}`);
await next();
};
const authMiddleware: Middleware = async (req, res, next) => {
const token = req.headers["authorization"];
if (!validateToken(token)) {
res.statusCode = 401;
res.end("Unauthorized");
return;
}
await next();
};
const transport = new DistributedStreamableHttpServerTransport({
httpServer: {
port: 3000,
endpoint: "/mcp",
middlewares: [loggingMiddleware, authMiddleware]
},
// ...
});
SSE Resumability
The transport supports reconnection with Last-Event-ID:
- Each SSE event includes a broker-assigned `id` (EventId)
- Client stores the last received ID
- On reconnection, client sends the `Last-Event-ID` header
- Transport passes this to the broker subscription
- Broker replays missed messages
**Broker Requirements:** Your EventBroker must support `fromEventId` in subscription options for replay.
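With the NATS JetStream broker sketched above, for example, `fromEventId` can be mapped onto a consumer start sequence. A minimal sketch (the helper is illustrative, not part of this package; in that broker, EventIds are the sequence numbers returned by `publish()`):

```typescript
import { consumerOpts } from "nats";

// Build JetStream consumer options that honor SSE resumability.
function resumableConsumerOpts(fromEventId?: string) {
  const opts = consumerOpts();
  opts.ackExplicit();
  if (fromEventId !== undefined) {
    // Replay everything published after the last event the client acknowledged
    // via Last-Event-ID.
    opts.startSequence(Number(fromEventId) + 1);
  } else {
    // Nothing to resume: deliver only new messages.
    opts.deliverNew();
  }
  return opts;
}
```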
Deployment Patterns
Pattern 1: Colocated (Simple)
Protocol handler runs in the same process as HTTP transport:
const server = new Server({ /* ... */ });
const transport = new DistributedStreamableHttpServerTransport({ /* ... */ });
await server.connect(transport);
Best for: Development, small deployments
Pattern 2: Distributed Workers
HTTP transport and protocol handlers are separate:
// HTTP Node (Transport)
const transport = new DistributedStreamableHttpServerTransport({
httpServer: { port: 3000, endpoint: "/mcp" },
eventBroker: natsBroker
});
// No server.connect() - transport only receives and routes
// Worker Node (Handler)
const subscription = broker.subscribe(RequestInbound, { sessionId: "*" }, {
queueGroup: "mcp-workers"
});
for await (const msg of subscription) {
const result = await processRequest(msg.data); // your MCP handler logic
// sessionId / requestId are derived from the received message (e.g. from its subject)
await broker.publish(RequestOutbound, { sessionId, requestId }, result);
await msg.ack();
}
Best for: High throughput, isolation, scaling workers independently
Pattern 3: Regional Deployment
Deploy transport nodes in each region, share broker and session store:
US-West US-East EU-West
┌──────────┐ ┌──────────┐ ┌──────────┐
│Transport │ │Transport │ │Transport │
│ Node │ │ Node │ │ Node │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└─────────────────────────┼─────────────────────────┘
│
┌──────────▼──────────┐
│ Global Broker │
│ (NATS Global) │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Session Store │
│ (Redis Cluster) │
└─────────────────────┘
Best for: Global low-latency, disaster recovery
Best Practices
EventBroker Implementation
- Delivery Semantics: Document whether your broker provides at-most-once or at-least-once delivery
- EventId Stability: Use monotonically increasing IDs for SSE resumability
- Queue Groups: Use them for worker load balancing
- Message Ordering: Ensure per-session ordering if your use case requires it
Session Management
- TTL: Set reasonable session expiration to clean up abandoned sessions
- External Store: Use Redis/database for production deployments
- Session Data: Keep session data small; use references to external storage for large objects
Monitoring
- Connection Metrics: Track active SSE connections per node
- Message Latency: Measure publish-to-receive time through broker
- Error Rates: Monitor handler errors, timeouts, broker failures
- Session Lifecycle: Track creation, usage, expiration
Building & Testing
From the repo root:
pnpm nx build model-context-protocol-distributed-streamable-http-server-transport
pnpm nx test model-context-protocol-distributed-streamable-http-server-transport
API Reference
Classes
- `DistributedStreamableHttpServerTransport` - Main transport class
Interfaces
- `EventBroker` - Message broker abstraction
- `SessionManager` - Session persistence abstraction
- `Session` - Session state container
- `Topic<TParams, TData>` - Typed topic definition
- `Subscription<TData>` - Async iterable message stream
- `BrokerMessage<TData>` - Message wrapper with ack/nack
Topics
- `RequestInbound` - Client requests to handlers
- `RequestOutbound` - Handler responses to transport
- `BackgroundOutbound` - Server notifications to clients
- `BackgroundInbound` - Server requests to clients
- `SessionWildcard` - All messages for a session (debugging)
Related Packages
- model-context-protocol-sdk - Protocol + server + client
- Example Server - Working example
- E2E Tests - Integration tests
License
MIT License - see LICENSE for details.
