@fluxgraph/core

v0.1.2

Real-time graph-based stream processing for Cloudflare Workers and Durable Objects

FluxGraph

🌊 Real-time graph-based stream processing and AI orchestration for Cloudflare Workers

FluxGraph is a lightweight, high-performance stream processing library with built-in AI workflow capabilities, designed specifically for edge computing environments. Build complex data pipelines and AI agents that run directly on Cloudflare's global network, combining LangGraph-style orchestration with real-time stream processing in a package that is roughly 10x smaller than LangGraph (~179KB vs ~1.7MB).

Features

  • 🚀 Real-time Processing - Process data streams with millisecond latency
  • 🔀 Graph-based Architecture - Create complex topologies with parallel and conditional paths
  • 🤖 AI-Native - Built-in LLM, tool calling, and memory nodes for AI workflows
  • 📊 Built-in Aggregations - Time, count, and session-based windowing
  • 🔄 Backpressure Handling - Automatic buffering and flow control
  • 🛡️ Error Resilience - Retry policies and error recovery strategies
  • 🎯 Type-safe - Full TypeScript support with comprehensive types
  • ☁️ Edge-native - Optimized for Cloudflare Workers and Durable Objects
  • 🔁 Agent Loops - Support for cyclic graphs enabling ReAct and autonomous agents
  • 💾 State Management - Built-in memory and checkpointing for long-running workflows

Installation

npm install @fluxgraph/core

Quick Start

import { Graph, nodes } from '@fluxgraph/core';

// Define your graph
const graph = new Graph({
  name: 'Transaction Processor',
  nodes: [
    nodes.source('webhook', {
      type: 'websocket',
      url: 'wss://api.example.com/transactions'
    }),
    
    nodes.transform('normalize', {
      function: (data) => ({
        ...data,
        amount: data.amount / 100
      })
    }),
    
    nodes.filter('large-only', {
      function: (data) => data.amount > 100
    }),
    
    nodes.aggregate('hourly-summary', {
      window: 'time',
      duration: 3600,
      function: (packets) => ({
        total: packets.reduce((sum, p) => sum + p.data.amount, 0),
        count: packets.length
      })
    }),
    
    nodes.sink('alerts', {
      type: 'http',
      url: 'https://alerts.example.com/webhook'
    })
  ],
  
  edges: [
    ['webhook', 'normalize'],
    ['normalize', 'large-only'],
    ['large-only', 'hourly-summary'],
    ['hourly-summary', 'alerts']
  ]
});

// Start processing
await graph.start();

// Subscribe to outputs before injecting, so no packets are missed
graph.subscribe('alerts', (packet) => {
  console.log('Alert triggered:', packet.data);
});

// Inject data manually (15000 normalizes to 150, which passes the 'large-only' filter)
await graph.inject('webhook', { amount: 15000, currency: 'USD' });
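
To make the data flow concrete, here is a minimal sketch in plain TypeScript that applies the same normalize, filter, and aggregate functions from the Quick Start to a few sample payloads, without using FluxGraph itself. The `Packet` shape (a `data` field wrapping the payload) is inferred from the `p.data.amount` access above, and the sample amounts are invented for illustration.

```typescript
// Plain-TypeScript walk-through of the Quick Start functions (no FluxGraph APIs).
// Assumption: packets wrap their payload in a `data` field, as `p.data.amount` suggests.
interface Txn { amount: number; currency: string }
interface Packet<T> { data: T }

const normalize = (data: Txn): Txn => ({ ...data, amount: data.amount / 100 });
const largeOnly = (data: Txn): boolean => data.amount > 100;
const summarize = (packets: Packet<Txn>[]) => ({
  total: packets.reduce((sum, p) => sum + p.data.amount, 0),
  count: packets.length,
});

// Hypothetical sample input for one aggregation window.
const incoming: Txn[] = [
  { amount: 15000, currency: 'USD' }, // normalizes to 150, kept
  { amount: 5000, currency: 'USD' },  // normalizes to 50, dropped by the filter
  { amount: 25000, currency: 'USD' }, // normalizes to 250, kept
];

// Transform, filter, then wrap each surviving payload as a packet.
const windowPackets: Packet<Txn>[] = incoming
  .map(normalize)
  .filter(largeOnly)
  .map((data) => ({ data }));

const hourlySummary = summarize(windowPackets);
console.log(hourlySummary); // { total: 400, count: 2 }
```

In the real graph the aggregate node would emit this summary once per window rather than immediately.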

AI Workflows Comparison

FluxGraph now includes powerful AI workflow capabilities, making it a lightweight alternative to popular AI orchestration frameworks:

| Feature | FluxGraph | LangGraph | Pydantic AI | LlamaIndex | CrewAI |
|---------|-----------|-----------|-------------|------------|--------|
| Stream Processing | ✅ Excellent (RxJS-based) | ✅ Good | ⚠️ Limited | ⚠️ Limited | ❌ No |
| Graph Architecture | ✅ Yes | ✅ Yes | ⚠️ Chain-based | ⚠️ Chain-based | ✅ Yes |
| AI-specific Nodes | ✅ LLM, Tool, Memory | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Agents |
| State Management | ✅ Built-in + Durable Objects | ✅ Built-in | ✅ Built-in | ✅ Built-in | ✅ Yes |
| Cycles/Agent Loops | ✅ Yes (ReAct, etc.) | ✅ Yes | ✅ Yes | ⚠️ Limited | ✅ Yes |
| Tool Calling | ✅ Parallel + Sequential | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes |
| Memory Types | ✅ Conversation, Semantic, Hybrid | ✅ Yes | ⚠️ Basic | ✅ Yes | ✅ Yes |
| Checkpointing | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Limited | ⚠️ Limited |
| Edge Runtime | ✅ Cloudflare Workers | ❌ No | ❌ No | ❌ No | ❌ No |
| Bundle Size | ✅ ~179KB | ❌ ~1.7MB | ❌ Python only | ❌ Python only | ❌ Python only |
| Streaming LLM | ✅ Native | ✅ Yes | ⚠️ Limited | ✅ Yes | ⚠️ Limited |
| TypeScript | ✅ First-class | ✅ Yes | ❌ Python | ❌ Python | ❌ Python |
| Real-time Data | ✅ Excellent | ⚠️ Limited | ❌ No | ❌ No | ❌ No |
| Production Ready | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Beta |

Why Choose FluxGraph for AI Workflows?

  • 🚀 Edge-Native: The only framework in the comparison above that runs on Cloudflare Workers
  • ⚡ Lightweight: ~179KB vs ~1.7MB for LangGraph (roughly 10x smaller)
  • 🌊 Streaming-First: Built on RxJS for excellent real-time performance
  • 🔧 Flexible: Combine AI with real-time data processing
  • 💾 Durable: Native integration with Durable Objects for persistence

AI Quick Start

ReAct Agent Example

import { GraphRunner } from '@fluxgraph/core';
import { reactAgentTemplate } from '@fluxgraph/core/templates';

// Create an autonomous agent
const agent = new GraphRunner(reactAgentTemplate);
await agent.initialize();
await agent.start();

// Give it a task (awaited, as with inject in the Quick Start)
await agent.inject('input', { 
  task: 'Research and summarize the latest AI trends' 
});

RAG Pipeline Example

import { GraphBuilder } from '@fluxgraph/core';

const ragPipeline = GraphBuilder.create('RAG Pipeline')
  .nodes(
    {
      id: 'vectorDB',
      type: 'memory',
      name: 'Vector Store',
      memoryType: 'semantic',
      embeddingDimension: 1536
    },
    {
      id: 'llm',
      type: 'llm',
      name: 'GPT-4',
      model: 'gpt-4',
      systemPrompt: 'Answer based on the provided context.',
      streaming: true
    }
  )
  .flow('vectorDB', 'llm')
  .build();

Multi-Agent Collaboration

import { GraphBuilder } from '@fluxgraph/core';

const multiAgent = GraphBuilder.create('Multi-Agent System')
  .allowCycles() // Enable agent communication loops
  .nodes(
    {
      id: 'coordinator',
      type: 'llm',
      name: 'Coordinator',
      model: 'gpt-4',
      systemPrompt: 'You coordinate multiple specialist agents.'
    },
    {
      id: 'researcher',
      type: 'llm',
      name: 'Research Agent',
      model: 'gpt-3.5-turbo',
      systemPrompt: 'You are a research specialist.'
    },
    {
      id: 'analyst',
      type: 'llm',
      name: 'Analysis Agent',
      model: 'gpt-3.5-turbo',
      systemPrompt: 'You analyze data and provide insights.'
    }
  )
  .build();
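
The `allowCycles()` call suggests that graphs are checked for cycles unless you opt out. How FluxGraph performs that check is not documented in this README, so the following is only a generic sketch of cycle detection over an edge list using depth-first search. The `hasCycle` helper and the edges between the three agents are hypothetical; the pipeline edges are the ones from the Quick Start.

```typescript
// Generic DFS cycle detection over [from, to] edge pairs.
// This is NOT FluxGraph's implementation, just an illustration of the idea.
type Edge = [string, string];

function hasCycle(edges: Edge[]): boolean {
  // Build an adjacency list keyed by node id.
  const adjacency: Record<string, string[]> = Object.create(null);
  for (const [from, to] of edges) {
    adjacency[from] = adjacency[from] || [];
    adjacency[to] = adjacency[to] || [];
    adjacency[from].push(to);
  }

  // undefined = unvisited, 1 = on the current DFS path, 2 = fully explored
  const state: Record<string, number> = Object.create(null);

  const visit = (node: string): boolean => {
    state[node] = 1;
    for (const next of adjacency[node]) {
      if (state[next] === 1) return true;            // back edge: cycle found
      if (!state[next] && visit(next)) return true;  // explore unvisited neighbour
    }
    state[node] = 2;
    return false;
  };

  return Object.keys(adjacency).some((node) => !state[node] && visit(node));
}

// Edges from the Quick Start pipeline: a straight line, so no cycle.
const pipelineEdges: Edge[] = [
  ['webhook', 'normalize'],
  ['normalize', 'large-only'],
  ['large-only', 'hourly-summary'],
  ['hourly-summary', 'alerts'],
];

// Hypothetical agent loop: the analyst reports back to the coordinator.
const agentLoopEdges: Edge[] = [
  ['coordinator', 'researcher'],
  ['researcher', 'analyst'],
  ['analyst', 'coordinator'],
];

console.log(hasCycle(pipelineEdges));  // false
console.log(hasCycle(agentLoopEdges)); // true
```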

Use Cases

Financial Transaction Processing

const financialGraph = templates.financial.createAnomalyDetector({
  thresholds: {
    amount: 1000,
    frequency: 10 // transactions per minute
  },
  alertUrl: 'https://your-webhook.com'
});

IoT Data Aggregation

const iotGraph = templates.iot.createSensorAggregator({
  sensors: ['temperature', 'humidity', 'pressure'],
  aggregateWindow: 60, // seconds
  outputFormat: 'prometheus'
});

Real-time Analytics

const analyticsGraph = templates.analytics.createEventProcessor({
  events: ['click', 'view', 'purchase'],
  sessionTimeout: 1800, // 30 minutes
  enrichment: {
    geoip: true,
    userAgent: true
  }
});

Durable Object Integration

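import { DurableObject } from 'cloudflare:workers';
import { Graph } from '@fluxgraph/core';

// graphConfig is your graph definition (nodes and edges), defined elsewhere
export class StreamProcessor extends DurableObject {
  private graph?: Graph;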

  async fetch(request: Request) {
    if (!this.graph) {
      this.graph = new Graph(graphConfig);
      await this.graph.start();
    }

    const url = new URL(request.url);
    
    if (url.pathname === '/inject') {
      const data = await request.json();
      await this.graph.inject('input', data);
      return new Response('OK');
    }

    if (url.pathname === '/metrics') {
      return Response.json(this.graph.getMetrics());
    }

    return new Response('Not found', { status: 404 });
  }
}

Node Types

Source Nodes

  • WebSocket - Real-time data streams
  • HTTP - Polling or webhook endpoints
  • Timer - Scheduled data generation
  • Manual - Programmatic injection

Transform Nodes

  • Data mapping and enrichment
  • Format conversion
  • Calculations and derived fields

Filter Nodes

  • Conditional routing
  • Data validation
  • Sampling and rate limiting

Aggregate Nodes

  • Time-based windows
  • Count-based windows
  • Session windows
  • Custom aggregation functions
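
The three window types differ only in how they group incoming items. The README does not say whether FluxGraph's time windows are tumbling or sliding, so the sketch below assumes tumbling windows and uses plain TypeScript with hypothetical helper names (`timeWindows`, `countWindows`, `sessionWindows`). It also assumes the input is already sorted by timestamp.

```typescript
// Illustrative windowing helpers; names and semantics are assumptions, not FluxGraph APIs.
interface Reading { timestamp: number; value: number } // timestamp in seconds

// Tumbling time windows: a reading belongs to window floor(timestamp / duration).
function timeWindows(readings: Reading[], durationSeconds: number): Reading[][] {
  const buckets: Record<number, Reading[]> = {};
  for (const r of readings) {
    const index = Math.floor(r.timestamp / durationSeconds);
    (buckets[index] = buckets[index] || []).push(r);
  }
  return Object.keys(buckets)
    .map(Number)
    .sort((a, b) => a - b)
    .map((index) => buckets[index]);
}

// Count windows: every `size` consecutive readings form one window.
function countWindows(readings: Reading[], size: number): Reading[][] {
  const windows: Reading[][] = [];
  for (let i = 0; i < readings.length; i += size) {
    windows.push(readings.slice(i, i + size));
  }
  return windows;
}

// Session windows: a new window starts when the gap since the previous reading exceeds gapSeconds.
function sessionWindows(readings: Reading[], gapSeconds: number): Reading[][] {
  const windows: Reading[][] = [];
  for (const r of readings) {
    const current = windows[windows.length - 1];
    const last = current ? current[current.length - 1] : undefined;
    if (current && last && r.timestamp - last.timestamp <= gapSeconds) {
      current.push(r);
    } else {
      windows.push([r]);
    }
  }
  return windows;
}

const sampleReadings: Reading[] = [
  { timestamp: 0, value: 1 },
  { timestamp: 30, value: 2 },
  { timestamp: 70, value: 3 },
  { timestamp: 200, value: 4 },
];

const sizes = (windows: Reading[][]): number[] => windows.map((w) => w.length);

console.log(sizes(timeWindows(sampleReadings, 60)));    // [ 2, 1, 1 ]
console.log(sizes(countWindows(sampleReadings, 3)));    // [ 3, 1 ]
console.log(sizes(sessionWindows(sampleReadings, 45))); // [ 3, 1 ]
```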

Sink Nodes

  • WebSocket output
  • HTTP webhooks
  • Database writes
  • Custom outputs

Advanced Features

Error Handling

const graph = new Graph({
  // ...
  errorStrategy: 'continue', // or 'stop', 'retry'
  retryPolicy: {
    maxRetries: 3,
    backoffMultiplier: 2,
    initialDelay: 1000
  }
});
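
One common reading of these three retry fields is exponential backoff: wait `initialDelay` before the first retry and multiply the wait by `backoffMultiplier` for each later one. Whether FluxGraph computes delays exactly this way (for example with jitter or a cap) is not stated in this README, and the unit is assumed to be milliseconds. The `retryDelays` helper below is hypothetical.

```typescript
// Hypothetical helper showing the delay schedule implied by a retry policy.
// Assumption: delay for retry n (0-based) = initialDelay * backoffMultiplier^n, in milliseconds.
interface RetryPolicy {
  maxRetries: number;
  backoffMultiplier: number;
  initialDelay: number;
}

function retryDelays(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < policy.maxRetries; attempt++) {
    delays.push(policy.initialDelay * Math.pow(policy.backoffMultiplier, attempt));
  }
  return delays;
}

const schedule = retryDelays({ maxRetries: 3, backoffMultiplier: 2, initialDelay: 1000 });
console.log(schedule); // [ 1000, 2000, 4000 ]
```

Under that assumption, the policy shown above would give up after about seven seconds of cumulative waiting.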

Metrics and Monitoring

const metrics = graph.getMetrics();
console.log({
  processed: metrics.packetsProcessed,
  dropped: metrics.packetsDropped,
  latency: metrics.averageLatency
});

graph.on('error', (event) => {
  console.error('Graph error:', event);
});

State Persistence

// Save graph state to Durable Object storage
const state = graph.getState();
await this.storage.put('graph-state', state);

// Restore on restart
const savedState = await this.storage.get('graph-state');
if (savedState) {
  graph.restore(savedState);
}

Performance

FluxGraph is designed for high-throughput, low-latency processing:

  • Process 10,000+ events/second per Durable Object
  • Sub-millisecond processing latency
  • Automatic backpressure handling
  • Memory-efficient buffering
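
To illustrate what backpressure with bounded buffering can look like, here is a minimal sketch of a fixed-capacity buffer that refuses new items when full, so the producer knows to slow down. The `BoundedBuffer` class is hypothetical and is not FluxGraph's internal implementation; the `dropped` counter only loosely mirrors the `packetsDropped` metric shown earlier.

```typescript
// Hypothetical bounded buffer; illustrates the idea, not FluxGraph's internals.
class BoundedBuffer<T> {
  private items: T[] = [];
  private capacity: number;
  dropped = 0;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Returns false when the buffer is full, signalling the producer to back off.
  offer(item: T): boolean {
    if (this.items.length >= this.capacity) {
      this.dropped++;
      return false;
    }
    this.items.push(item);
    return true;
  }

  // Removes and returns the oldest item, or undefined when the buffer is empty.
  poll(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }
}

const inbox = new BoundedBuffer<number>(2);
const accepted = [1, 2, 3].map((n) => inbox.offer(n));
console.log(accepted);       // [ true, true, false ]
console.log(inbox.dropped);  // 1

const oldest = inbox.poll(); // consumer drains one item, freeing a slot
const retried = inbox.offer(4);
console.log(oldest, retried, inbox.size); // 1 true 2
```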

Contributing

We welcome contributions! Please see CONTRIBUTING.md for details.

License

MIT License - see LICENSE for details.

Support


Built with ❤️ for the edge computing community