FluxGraph
🌊 Real-time graph-based stream processing and AI orchestration for Cloudflare Workers
FluxGraph is a lightweight, high-performance stream processing library with built-in AI workflow capabilities, designed specifically for edge computing environments. Build complex data pipelines and AI agents that run directly on Cloudflare's global network, combining LangGraph-style orchestration with real-time stream processing in a package roughly a tenth the size of comparable frameworks (~179KB vs ~1.7MB for LangGraph).
Features
- 🚀 Real-time Processing - Process data streams with millisecond latency
- 🔀 Graph-based Architecture - Create complex topologies with parallel and conditional paths
- 🤖 AI-Native - Built-in LLM, tool calling, and memory nodes for AI workflows
- 📊 Built-in Aggregations - Time, count, and session-based windowing
- 🔄 Backpressure Handling - Automatic buffering and flow control
- 🛡️ Error Resilience - Retry policies and error recovery strategies
- 🎯 Type-safe - Full TypeScript support with comprehensive types
- ☁️ Edge-native - Optimized for Cloudflare Workers and Durable Objects
- 🔁 Agent Loops - Support for cyclic graphs enabling ReAct and autonomous agents
- 💾 State Management - Built-in memory and checkpointing for long-running workflows
Installation
npm install @fluxgraph/core
Quick Start
import { Graph, nodes } from '@fluxgraph/core';
// Define your graph
const graph = new Graph({
name: 'Transaction Processor',
nodes: [
nodes.source('webhook', {
type: 'websocket',
url: 'wss://api.example.com/transactions'
}),
nodes.transform('normalize', {
function: (data) => ({
...data,
amount: data.amount / 100
})
}),
nodes.filter('large-only', {
function: (data) => data.amount > 100
}),
nodes.aggregate('hourly-summary', {
window: 'time',
duration: 3600,
function: (packets) => ({
total: packets.reduce((sum, p) => sum + p.data.amount, 0),
count: packets.length
})
}),
nodes.sink('alerts', {
type: 'http',
url: 'https://alerts.example.com/webhook'
})
],
edges: [
['webhook', 'normalize'],
['normalize', 'large-only'],
['large-only', 'hourly-summary'],
['hourly-summary', 'alerts']
]
});
// Start processing
await graph.start();
// Inject data manually
await graph.inject('webhook', { amount: 15000, currency: 'USD' });
// Subscribe to outputs
graph.subscribe('alerts', (packet) => {
console.log('Alert triggered:', packet.data);
});
AI Workflows Comparison
FluxGraph now includes powerful AI workflow capabilities, making it a lightweight alternative to popular AI orchestration frameworks:
| Feature | FluxGraph | LangGraph | Pydantic AI | LlamaIndex | CrewAI |
|---------|-----------|-----------|-------------|------------|--------|
| Stream Processing | ✅ Excellent (RxJS-based) | ✅ Good | ⚠️ Limited | ⚠️ Limited | ❌ No |
| Graph Architecture | ✅ Yes | ✅ Yes | ⚠️ Chain-based | ⚠️ Chain-based | ✅ Yes |
| AI-specific Nodes | ✅ LLM, Tool, Memory | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Agents |
| State Management | ✅ Built-in + Durable Objects | ✅ Built-in | ✅ Built-in | ✅ Built-in | ✅ Yes |
| Cycles/Agent Loops | ✅ Yes (ReAct, etc.) | ✅ Yes | ✅ Yes | ⚠️ Limited | ✅ Yes |
| Tool Calling | ✅ Parallel + Sequential | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes |
| Memory Types | ✅ Conversation, Semantic, Hybrid | ✅ Yes | ⚠️ Basic | ✅ Yes | ✅ Yes |
| Checkpointing | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Limited | ⚠️ Limited |
| Edge Runtime | ✅ Cloudflare Workers | ❌ No | ❌ No | ❌ No | ❌ No |
| Bundle Size | ✅ ~179KB | ❌ ~1.7MB | ❌ Python only | ❌ Python only | ❌ Python only |
| Streaming LLM | ✅ Native | ✅ Yes | ⚠️ Limited | ✅ Yes | ⚠️ Limited |
| TypeScript | ✅ First-class | ✅ Yes | ❌ Python | ❌ Python | ❌ Python |
| Real-time Data | ✅ Excellent | ⚠️ Limited | ❌ No | ❌ No | ❌ No |
| Production Ready | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Beta |
Why Choose FluxGraph for AI Workflows?
- 🚀 Edge-Native: The only framework in the comparison above that runs on Cloudflare Workers
- ⚡ Lightweight: ~179KB vs ~1.7MB for LangGraph (roughly 10x smaller)
- 🌊 Streaming-First: Built on RxJS for excellent real-time performance
- 🔧 Flexible: Combine AI with real-time data processing
- 💾 Durable: Native integration with Durable Objects for persistence
AI Quick Start
ReAct Agent Example
import { GraphRunner } from '@fluxgraph/core';
import { reactAgentTemplate } from '@fluxgraph/core/templates';
// Create an autonomous agent
const agent = new GraphRunner(reactAgentTemplate);
await agent.initialize();
await agent.start();
// Give it a task
agent.inject('input', {
task: 'Research and summarize the latest AI trends'
});
RAG Pipeline Example
import { GraphBuilder, LLMNode, MemoryNode } from '@fluxgraph/core';
const ragPipeline = GraphBuilder.create('RAG Pipeline')
.nodes(
{
id: 'vectorDB',
type: 'memory',
name: 'Vector Store',
memoryType: 'semantic',
embeddingDimension: 1536
},
{
id: 'llm',
type: 'llm',
name: 'GPT-4',
model: 'gpt-4',
systemPrompt: 'Answer based on the provided context.',
streaming: true
}
)
.flow('vectorDB', 'llm')
.build();
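To run the built pipeline, a minimal usage sketch follows; it assumes a graph produced by GraphBuilder can be passed to the GraphRunner constructor the same way as the template in the ReAct example above:
import { GraphRunner } from '@fluxgraph/core';

// Usage sketch (assumption): GraphBuilder output is accepted by GraphRunner
const runner = new GraphRunner(ragPipeline);
await runner.initialize();
await runner.start();
// 'vectorDB' is the entry node declared in the pipeline above
runner.inject('vectorDB', {
  query: 'What do the indexed documents say about backpressure?'
});
Multi-Agent Collaboration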
const multiAgent = GraphBuilder.create('Multi-Agent System')
.allowCycles() // Enable agent communication loops
.nodes(
{
id: 'coordinator',
type: 'llm',
name: 'Coordinator',
model: 'gpt-4',
systemPrompt: 'You coordinate multiple specialist agents.'
},
{
id: 'researcher',
type: 'llm',
name: 'Research Agent',
model: 'gpt-3.5-turbo',
systemPrompt: 'You are a research specialist.'
},
{
id: 'analyst',
type: 'llm',
name: 'Analysis Agent',
model: 'gpt-3.5-turbo',
systemPrompt: 'You analyze data and provide insights.'
}
)
.build();
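The snippet above declares the agents but no edges between them. Below is a sketch of wiring them into a coordination loop with the builder's .flow() method (used in the RAG example); treating the cyclic pairs as valid once allowCycles() is enabled is an assumption:
import { GraphBuilder } from '@fluxgraph/core';

const wiredAgents = GraphBuilder.create('Multi-Agent System')
  .allowCycles()
  .nodes(
    { id: 'coordinator', type: 'llm', name: 'Coordinator', model: 'gpt-4', systemPrompt: 'You coordinate multiple specialist agents.' },
    { id: 'researcher', type: 'llm', name: 'Research Agent', model: 'gpt-3.5-turbo', systemPrompt: 'You are a research specialist.' },
    { id: 'analyst', type: 'llm', name: 'Analysis Agent', model: 'gpt-3.5-turbo', systemPrompt: 'You analyze data and provide insights.' }
  )
  .flow('coordinator', 'researcher') // delegate research tasks
  .flow('researcher', 'coordinator') // return findings
  .flow('coordinator', 'analyst')    // hand findings to analysis
  .flow('analyst', 'coordinator')    // loop insights back to the coordinator
  .build();
Use Cases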
Financial Transaction Processing
const financialGraph = templates.financial.createAnomalyDetector({
thresholds: {
amount: 1000,
frequency: 10 // transactions per minute
},
alertUrl: 'https://your-webhook.com'
});
IoT Data Aggregation
const iotGraph = templates.iot.createSensorAggregator({
sensors: ['temperature', 'humidity', 'pressure'],
aggregateWindow: 60, // seconds
outputFormat: 'prometheus'
});
Real-time Analytics
const analyticsGraph = templates.analytics.createEventProcessor({
events: ['click', 'view', 'purchase'],
sessionTimeout: 1800, // 30 minutes
enrichment: {
geoip: true,
userAgent: true
}
});
Durable Object Integration
import { DurableObject } from 'cloudflare:workers';
import { Graph } from '@fluxgraph/core';

// graphConfig is a Graph definition like the one in Quick Start, defined elsewhere
export class StreamProcessor extends DurableObject {
private graph: Graph;
async fetch(request: Request) {
if (!this.graph) {
this.graph = new Graph(graphConfig);
await this.graph.start();
}
const url = new URL(request.url);
if (url.pathname === '/inject') {
const data = await request.json();
await this.graph.inject('input', data);
return new Response('OK');
}
if (url.pathname === '/metrics') {
return Response.json(this.graph.getMetrics());
}
return new Response('Not found', { status: 404 });
}
}
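A Worker entry point then routes requests to the object. The following is a minimal sketch, assuming a Durable Object binding named STREAM_PROCESSOR configured in wrangler.toml and a single named instance; the binding name and id scheme are deployment choices, not part of FluxGraph:
export default {
  async fetch(request: Request, env: { STREAM_PROCESSOR: DurableObjectNamespace }): Promise<Response> {
    // Use one processor instance here; shard by tenant or stream name as needed
    const id = env.STREAM_PROCESSOR.idFromName('default');
    const stub = env.STREAM_PROCESSOR.get(id);
    return stub.fetch(request); // forwards /inject and /metrics to StreamProcessor
  }
};
Node Types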
Source Nodes
- WebSocket - Real-time data streams
- HTTP - Polling or webhook endpoints
- Timer - Scheduled data generation
- Manual - Programmatic injection
Transform Nodes
- Data mapping and enrichment
- Format conversion
- Calculations and derived fields
Filter Nodes
- Conditional routing
- Data validation
- Sampling and rate limiting
Aggregate Nodes
- Time-based windows
- Count-based windows
- Session windows
- Custom aggregation functions
Sink Nodes
- WebSocket output
- HTTP webhooks
- Database writes
- Custom outputs
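A small graph combining several of these node types with the same helpers as the Quick Start; the timer source options below are an assumption about option names rather than a documented signature:
import { Graph, nodes } from '@fluxgraph/core';

const sampled = new Graph({
  name: 'Sampled Metrics',
  nodes: [
    nodes.source('ticker', { type: 'timer', interval: 1000 }), // emit once per second (option name assumed)
    nodes.transform('enrich', {
      function: (data) => ({ ...data, receivedAt: Date.now() })
    }),
    nodes.filter('sample', {
      function: () => Math.random() < 0.1 // keep roughly 10% of packets
    }),
    nodes.sink('out', { type: 'http', url: 'https://metrics.example.com/ingest' })
  ],
  edges: [
    ['ticker', 'enrich'],
    ['enrich', 'sample'],
    ['sample', 'out']
  ]
});

await sampled.start();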
Advanced Features
Error Handling
const graph = new Graph({
// ...
errorStrategy: 'continue', // or 'stop', 'retry'
retryPolicy: {
maxRetries: 3,
backoffMultiplier: 2,
initialDelay: 1000
}
});
Metrics and Monitoring
const metrics = graph.getMetrics();
console.log({
processed: metrics.packetsProcessed,
dropped: metrics.packetsDropped,
latency: metrics.averageLatency
});
graph.on('error', (event) => {
console.error('Graph error:', event);
});
State Persistence
// Inside the Durable Object: save graph state to its storage
const state = graph.getState();
await this.ctx.storage.put('graph-state', state);
// Restore on restart
const savedState = await this.ctx.storage.get('graph-state');
if (savedState) {
graph.restore(savedState);
}
Performance
FluxGraph is designed for high-throughput, low-latency processing:
- Process 10,000+ events/second per Durable Object
- Sub-millisecond processing latency
- Automatic backpressure handling
- Memory-efficient buffering
Contributing
We welcome contributions! Please see CONTRIBUTING.md for details.
License
MIT License - see LICENSE for details.
Support
Built with ❤️ for the edge computing community
