mcp-streaming
v0.1.2
mcp-stream
Connect streaming data to AI coding tools via MCP.
mcp-stream is a streaming data engine that connects MQTT, Kafka, and Webhook sources to AI coding tools (Claude Code, Cursor, etc.) via the Model Context Protocol.
Quickstart
1. Start the engine
git clone https://github.com/mcp-stream/mcp-stream
cd mcp-stream
cp .env.example .env
docker compose up -d
This starts Redis, Postgres, and the mcp-stream engine.
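Before adding the server to an MCP client, it can help to confirm the engine is answering requests. The sketch below polls the GET /health route listed under HTTP API. It assumes Node 18+ (global fetch) and the default port 3000, and it treats any 2xx status as healthy because the response body is not documented here. The names healthUrl and waitForEngine are illustrative and not part of mcp-stream.

```typescript
// Hypothetical helper names; only the /health route comes from this README.
type FetchLike = (url: string) => Promise<{ ok: boolean }>;

// Join a base URL and the /health path without doubling slashes.
function healthUrl(baseUrl: string): string {
  return baseUrl.replace(/\/+$/, "") + "/health";
}

// Poll /health until it answers with a 2xx status or the attempts run out.
async function waitForEngine(
  baseUrl: string,
  attempts: number = 10,
  delayMs: number = 1000,
  fetchImpl: FetchLike = (url) => fetch(url),
): Promise<boolean> {
  for (let i = 0; i < attempts; i++) {
    try {
      const res = await fetchImpl(healthUrl(baseUrl));
      if (res.ok) return true;
    } catch (_err) {
      // Connection refused while the containers are still starting; retry.
    }
    if (i < attempts - 1) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return false;
}
```

Calling waitForEngine("http://localhost:3000") resolves to true once the engine responds, or false after roughly ten seconds with the defaults above.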
2. Add to your MCP client
{
  "mcpServers": {
    "mcp-stream": {
      "command": "npx",
      "args": ["mcp-streaming"],
      "env": {
        "MCP_STREAM_URL": "http://localhost:3000"
      }
    }
  }
}
3. Ask your AI to connect data
"Connect to the public MQTT broker at broker.emqx.io and subscribe to testtopic/#"
The AI will use the create_connection tool to connect, then read_stream to sample live data.
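The filter testtopic/# in the prompt above uses MQTT's multi-level wildcard, so the subscription receives testtopic itself and everything beneath it. The sketch below shows the standard MQTT matching rules (+ matches exactly one level, # matches the parent and all remaining levels). It illustrates the protocol rule only: it is not taken from mcp-stream's code, the function name is made up, and it skips the special handling of topics that start with $.

```typescript
// Illustrative matcher for MQTT topic filters; not mcp-stream's implementation.
function topicMatches(filter: string, topic: string): boolean {
  const f = filter.split("/");
  const t = topic.split("/");
  for (let i = 0; i < f.length; i++) {
    if (f[i] === "#") {
      // '#' is only valid as the last level; it matches the parent and any descendants.
      return i === f.length - 1;
    }
    if (i >= t.length) return false; // filter has more levels than the topic
    if (f[i] !== "+" && f[i] !== t[i]) return false; // literal level mismatch
  }
  // Without a trailing '#', both must have the same number of levels.
  return f.length === t.length;
}

console.log(topicMatches("testtopic/#", "testtopic/sensors/temp")); // true
console.log(topicMatches("testtopic/#", "testtopic"));              // true
console.log(topicMatches("testtopic/#", "other/sensors"));          // false
console.log(topicMatches("testtopic/+", "testtopic/a/b"));          // false
```

On a busy public broker a filter like testtopic/# can match a large volume of messages, so a narrower filter is usually a better starting point for sampling.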
Features
- Multi-source — MQTT, Kafka, and Webhook ingestion
- Redis Streams — buffered, replayable data pipeline
- WebSocket relay — real-time fan-out with backfill and topic filtering
- MCP tools — 12 tools for managing connections, reading streams, and deploying watchers
- Watchers — server-side TypeScript processes for alerts, anomaly detection, and aggregation
- CLI — mcp-stream status, mcp-stream logs --follow
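The replay and backfill features above rest on Redis Stream entry IDs. Each ID has the form milliseconds-sequence and IDs are ordered numerically, first by the millisecond part and then by the sequence. A reader that remembers the last ID it saw can resume from that point, which is how XREAD treats its ID argument. The sketch below models that behavior in memory, with no Redis connection. StreamEntry, compareStreamIds, and entriesAfter are illustrative names, and how mcp-stream tracks cursors internally is not documented here.

```typescript
// In-memory model of Redis Stream ID ordering; all names are illustrative.
interface StreamEntry {
  id: string; // e.g. "1700000000000-0"
  fields: Record<string, string>;
}

// Split "<ms>-<seq>" into numbers. An ID without a dash (such as "0") gets sequence 0.
// Number is precise enough for a sketch; real IDs are two unsigned 64-bit integers.
function parseStreamId(id: string): [number, number] {
  const dash = id.indexOf("-");
  if (dash === -1) return [Number(id), 0];
  return [Number(id.slice(0, dash)), Number(id.slice(dash + 1))];
}

// Order by millisecond part first, then by sequence number.
function compareStreamIds(a: string, b: string): number {
  const [aMs, aSeq] = parseStreamId(a);
  const [bMs, bSeq] = parseStreamId(b);
  if (aMs !== bMs) return aMs < bMs ? -1 : 1;
  if (aSeq !== bSeq) return aSeq < bSeq ? -1 : 1;
  return 0;
}

// Like XREAD with an explicit ID: return only entries strictly newer than lastId.
function entriesAfter(entries: StreamEntry[], lastId: string): StreamEntry[] {
  return entries.filter((e) => compareStreamIds(e.id, lastId) > 0);
}

const sample: StreamEntry[] = [
  { id: "1700000000000-0", fields: { temp: "21.5" } },
  { id: "1700000000000-1", fields: { temp: "21.7" } },
  { id: "1700000000005-0", fields: { temp: "22.0" } },
];

console.log(entriesAfter(sample, "0").length);               // 3 (full backfill)
console.log(entriesAfter(sample, "1700000000000-0").length); // 2 (resume after first entry)
```

Comparing the IDs as plain strings would misorder them once the millisecond parts differ in length, which is why the sketch parses both parts as numbers.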
Architecture
Source (MQTT / Kafka / Webhook)
     |
     v
Subscriber --> Redis Stream
                    |
            +-------+-------+
            |               |
            v               v
       StreamRelay    Watcher processes
      (XREAD BLOCK)   (XREAD from same stream)
            |
            v
    WebSocket clients
    (backfill + live)
CLI
npx mcp-streaming             # Start MCP stdio server (default)
npx mcp-streaming start       # Start the engine
npx mcp-streaming status      # Show connections and watchers
npx mcp-streaming logs [id]   # Tail stream data
npx mcp-streaming --version   # Show version
MCP Tools
| Tool | Description |
|------|-------------|
| create_connection | Create MQTT/Kafka/Webhook connection |
| list_connections | List all connections |
| get_connection | Get connection status |
| destroy_connection | Delete a connection |
| read_stream | Read backfill + live entries from Redis |
| create_watcher | Deploy a TypeScript watcher script |
| list_watchers | List watchers on a connection |
| get_watcher | Get watcher details + source code |
| get_watcher_logs | Get watcher stdout/stderr logs |
| update_watcher | Update watcher config and/or script (triggers restart) |
| restart_watcher | Restart a stopped/crashed watcher |
| delete_watcher | Stop and remove a watcher |
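An MCP client invokes any of the tools in this table with a JSON-RPC 2.0 request whose method is tools/call, passing the tool name and an arguments object. The sketch below only builds such a message to show its shape. The argument names connection_id and limit are guesses for illustration, since the tools' input schemas are not listed here, and buildToolCall is a made-up helper. A real client also performs the MCP initialize handshake before calling tools.

```typescript
// Shape of an MCP "tools/call" request (JSON-RPC 2.0).
interface ToolCallRequest {
  jsonrpc: "2.0";
  id: number;
  method: "tools/call";
  params: { name: string; arguments: Record<string, unknown> };
}

// Hypothetical helper: wrap a tool name and its arguments in a JSON-RPC envelope.
function buildToolCall(
  id: number,
  name: string,
  args: Record<string, unknown>,
): ToolCallRequest {
  return { jsonrpc: "2.0", id, method: "tools/call", params: { name, arguments: args } };
}

// "connection_id" and "limit" are assumed argument names, not the documented schema.
const exampleCall = buildToolCall(1, "read_stream", { connection_id: "abc123", limit: 10 });
console.log(JSON.stringify(exampleCall, null, 2));
```

In normal use the AI tool constructs these messages itself; the shape is mainly useful when debugging traffic on the stdio or HTTP transport.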
HTTP API
POST   /connections                            Create connection
GET    /connections                            List connections
GET    /connections/:id                        Get connection
DELETE /connections/:id                        Destroy connection
POST   /connections/:id/ingest                 Webhook ingest
WS     /connections/:id/ws                     WebSocket stream
POST   /connections/:id/watchers               Create watcher
GET    /connections/:id/watchers               List watchers
GET    /connections/:id/watchers/:wid          Get watcher
DELETE /connections/:id/watchers/:wid          Delete watcher
PUT    /connections/:id/watchers/:wid          Update watcher
PUT    /connections/:id/watchers/:wid/restart  Restart watcher
GET    /connections/:id/watchers/:wid/logs     Get logs
POST   /mcp                                    MCP HTTP transport
GET    /health                                 Health check
Configuration
| Variable | Default | Description |
|----------|---------|-------------|
| REDIS_URL | redis://localhost:6379 | Redis connection URL |
| DATABASE_URL | — | Postgres connection URL |
| PORT | 3000 | HTTP server port |
| HOST | 0.0.0.0 | HTTP server bind address |
| MCP_STREAM_URL | http://localhost:3000 | Service URL for stdio proxy |
| WATCHER_DIR | ./data/watchers | Directory for watcher scripts |
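As a rough illustration of how these variables and defaults combine, the sketch below resolves a configuration object from an environment map. It is not the engine's actual loader: EngineConfig and loadConfig are hypothetical names, and leaving DATABASE_URL undefined when unset is an assumption, since the table gives it no default.

```typescript
// Hypothetical config shape mirroring the table above.
interface EngineConfig {
  redisUrl: string;
  databaseUrl: string | undefined;
  port: number;
  host: string;
  mcpStreamUrl: string;
  watcherDir: string;
}

// Resolve each variable, falling back to the documented default when it is unset.
function loadConfig(env: Record<string, string | undefined>): EngineConfig {
  const port = Number(env.PORT ?? "3000");
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid PORT: ${env.PORT}`);
  }
  return {
    redisUrl: env.REDIS_URL ?? "redis://localhost:6379",
    databaseUrl: env.DATABASE_URL, // no default in the table; left undefined here
    port,
    host: env.HOST ?? "0.0.0.0",
    mcpStreamUrl: env.MCP_STREAM_URL ?? "http://localhost:3000",
    watcherDir: env.WATCHER_DIR ?? "./data/watchers",
  };
}

console.log(loadConfig({}).port);                 // 3000
console.log(loadConfig({ PORT: "8080" }).port);   // 8080
```

In a Node process the same function could be called as loadConfig(process.env).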
Development
npm install
docker compose up -d redis postgres   # Start dependencies
npm run dev                           # Start with hot-reload
Contributing
See CONTRIBUTING.md.
License
MIT — see LICENSE.
Want managed hosting? Check out JustinX Cloud — zero-ops deployment with team management, SSO, and SLAs.
