npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@goplasmatic/dataflow-wasm

v3.0.1

Published

WebAssembly bindings for the dataflow-rs rules engine

Readme

Dataflow-rs

A high-performance rules engine for IFTTT-style automation in Rust with zero-overhead JSONLogic evaluation.

License: Apache 2.0 Rust Crates.io


Dataflow-rs is a lightweight rules engine that lets you define IF → THEN → THAT automation in JSON. Rules are evaluated using pre-compiled JSONLogic for zero runtime overhead, and actions execute asynchronously for high throughput. Whether you're routing events, validating data, or building complex automation pipelines, Dataflow-rs gives you enterprise-grade performance with minimal complexity.

How It Works: IF → THEN → THAT

┌─────────────────────────────────────────────────────────────────┐
│  Rule (Workflow)                                                │
│                                                                 │
│  IF    condition matches        →  JSONLogic against any field  │
│  THEN  execute actions (tasks)  →  map, validate, custom logic  │
│  THAT  chain more rules         →  priority-ordered execution   │
└─────────────────────────────────────────────────────────────────┘

Example: IF order.total > 1000 THEN apply_discount AND notify_manager

Core Concepts

| Rules Engine | Workflow Engine | Description | |---|---|---| | Rule | Workflow | A condition + actions bundle — IF condition THEN execute actions | | Action | Task | An individual processing step (map, validate, or custom function) | | RulesEngine | Engine | Evaluates rules against messages and executes matching actions |

Both naming conventions are fully supported — use whichever fits your mental model.

Getting Started

1. Add to Cargo.toml

[dependencies]
dataflow-rs = "3.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde_json = "1.0"

2. Define Rules in JSON

{
    "id": "premium_order",
    "name": "Premium Order Processing",
    "condition": {">=": [{"var": "data.order.total"}, 1000]},
    "tasks": [
        {
            "id": "apply_discount",
            "name": "Apply Premium Discount",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {
                            "path": "data.order.discount",
                            "logic": {"*": [{"var": "data.order.total"}, 0.1]}
                        },
                        {
                            "path": "data.order.final_total",
                            "logic": {"-": [{"var": "data.order.total"}, {"*": [{"var": "data.order.total"}, 0.1]}]}
                        }
                    ]
                }
            }
        }
    ]
}

3. Run the Engine

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let workflow = Workflow::from_json(r#"{ ... }"#)?; // Your rule JSON

    // Create engine — all JSONLogic compiled once here
    let engine = Engine::builder().with_workflow(workflow).build()?;

    // Process a message
    let payload = Arc::new(json!({"order": {"total": 1500}}));
    let mut message = Message::new(payload);
    engine.process_message(&mut message).await?;

    println!("Discount: {}", message.data()["order"]["discount"]); // 150
    Ok(())
}

Handling Errors — Two Channels

process_message reports errors through two complementary channels:

  • Result::Err signals that the engine stopped early (a task failed without continue_on_error, or an engine-level error occurred).
  • message.errors() always contains every error encountered, including errors from tasks that ran with continue_on_error = true and so didn't short-circuit the workflow.

A short-circuit ? will surface only the first kind. For full coverage:

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let engine = Engine::builder()
        .with_workflow(Workflow::from_json(r#"{ ... }"#)?)
        .build()?;

    let mut message = Message::from_value(&json!({"order": {"total": 1500}}));

    // `continue_on_error` tasks may record errors here without returning Err.
    if let Err(e) = engine.process_message(&mut message).await {
        eprintln!("engine halted: {e}");
    }

    // Always iterate `message.errors()` to see everything that went wrong.
    for err in message.errors() {
        eprintln!(
            "[{workflow_id}/{task_id}] {msg}",
            workflow_id = err.workflow_id.as_deref().unwrap_or("-"),
            task_id = err.task_id.as_deref().unwrap_or("-"),
            msg = err.message,
        );
    }

    Ok(())
}

Using Rules Engine Aliases

use dataflow_rs::{RulesEngine, Rule, Action};

// These are type aliases — same types, rules-engine terminology
let rule = Rule::from_json(r#"{ ... }"#)?;
let engine = RulesEngine::builder().with_workflow(rule).build()?;

Key Features

  • IF → THEN → THAT Model: Define rules with JSONLogic conditions, execute actions, chain with priority ordering.
  • Zero Runtime Compilation: All JSONLogic expressions pre-compiled at startup for optimal performance.
  • Full Context Access: Conditions can access any field — data, metadata, temp_data.
  • Async-First Architecture: Native async/await support with Tokio for high-throughput processing.
  • Execution Tracing: Step-by-step debugging with message snapshots after each action.
  • Built-in Functions: Parse, Map, Validate, Filter, Log, and Publish for complete data pipelines.
  • Pipeline Control Flow: Filter/gate function to halt workflows or skip tasks based on conditions.
  • Channel Routing: Route messages to specific workflow channels with O(1) lookup.
  • Workflow Lifecycle: Manage workflow status (active/paused/archived), versioning, and tagging.
  • Hot Reload: Swap workflows at runtime without re-registering custom functions.
  • Extensible: Add custom async actions by implementing the AsyncFunctionHandler trait.
  • Typed Integration Configs: Pre-validated configs for HTTP, Enrich, and Kafka integrations.
  • WebAssembly Support: Run rules in the browser with @goplasmatic/dataflow-wasm.
  • React UI Components: Visualize and debug rules with @goplasmatic/dataflow-ui.
  • Auditing: Full audit trail of all changes as data flows through the pipeline.

Architecture

Compilation Phase (Startup)

  1. All JSONLogic expressions compiled once when the Engine is created
  2. Compiled logic cached with Arc for zero-copy sharing
  3. Validates all expressions early, failing fast on errors

Execution Phase (Runtime)

  1. Engine evaluates each rule's condition against the message context
  2. Matching rules execute their actions with pre-compiled logic (zero compilation overhead)
  3. process_message() for normal execution, process_message_with_trace() for debugging
  4. Each action can be async, enabling I/O operations without blocking

Performance

  • Pre-Compilation: All JSONLogic compiled at startup, zero runtime overhead
  • Arc-Wrapped Logic: Zero-copy sharing of compiled expressions
  • Context Arc Caching: 50% improvement via cached Arc context
  • Async I/O: Non-blocking operations for external services
  • Predictable Latency: No runtime allocations for logic evaluation
cargo run --example benchmark           # Performance benchmark
cargo run --example rules_engine        # IFTTT-style rules engine demo
cargo run --example complete_workflow   # Parse → Transform → Validate pipeline

Custom Functions

Extend the engine with your own async actions. Each handler declares a typed Input (deserialized once at engine init), receives a TaskContext that records audit-trail changes automatically, and returns a TaskOutcome:

use async_trait::async_trait;
use dataflow_rs::{AsyncFunctionHandler, Engine, Result, TaskContext, TaskOutcome};
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;

/// Typed config for the handler — fails at `Engine::new()` if malformed,
/// not on first message.
#[derive(Deserialize)]
pub struct NotifyInput {
    pub channel: String,
}

pub struct NotifyManager;

#[async_trait]
impl AsyncFunctionHandler for NotifyManager {
    type Input = NotifyInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &NotifyInput,
    ) -> Result<TaskOutcome> {
        // Your custom async logic here (HTTP calls, DB writes, etc.)
        ctx.set(
            "data.notified_channel",
            OwnedDataValue::from(&json!(input.channel)),
        );
        Ok(TaskOutcome::Success)
    }
}

// Register handlers via the builder. `.register("name", h)` accepts any
// `AsyncFunctionHandler` and boxes it internally.
let engine = Engine::builder()
    .with_workflows(workflows)
    .register("notify_manager", NotifyManager)
    .build()?;

Built-in Functions

| Function | Purpose | Modifies Data | |----------|---------|---------------| | parse_json | Parse JSON from payload into data context | Yes | | parse_xml | Parse XML string into JSON data structure | Yes | | map | Data transformation using JSONLogic | Yes | | validation | Rule-based data validation | No (read-only) | | filter | Pipeline control flow — halt workflow or skip task | No | | log | Structured logging with JSONLogic expressions | No | | publish_json | Serialize data to JSON string | Yes | | publish_xml | Serialize data to XML string | Yes |

Filter (Pipeline Control Flow)

The filter function evaluates a JSONLogic condition and controls pipeline execution:

{
    "function": {
        "name": "filter",
        "input": {
            "condition": {"==": [{"var": "data.status"}, "active"]},
            "on_reject": "halt"
        }
    }
}
  • on_reject: "halt" — stops the entire workflow when the condition is false
  • on_reject: "skip" — skips just the current task and continues

Log (Structured Logging)

The log function outputs structured log messages using the log crate:

{
    "function": {
        "name": "log",
        "input": {
            "level": "info",
            "message": {"cat": ["Processing order ", {"var": "data.order.id"}]},
            "fields": {
                "total": {"var": "data.order.total"},
                "user": {"var": "data.user.name"}
            }
        }
    }
}

Log levels: trace, debug, info, warn, error. Messages and fields support JSONLogic expressions.

Channel Routing

Route messages to specific workflow channels for efficient O(1) dispatch:

// Workflows define their channel
// { "id": "order_rule", "channel": "orders", "status": "active", ... }

// Process only workflows on a specific channel
engine.process_message_for_channel("orders", &mut message).await?;

Only active workflows are included in channel routing. Workflows default to the "default" channel.

Workflow Lifecycle

Workflows support lifecycle management fields:

{
    "id": "my_rule",
    "channel": "orders",
    "version": 2,
    "status": "active",
    "tags": ["premium", "high-priority"],
    "created_at": "2025-01-15T10:00:00Z",
    "updated_at": "2025-06-01T14:30:00Z",
    "tasks": [...]
}

| Field | Type | Default | Description | |-------|------|---------|-------------| | channel | string | "default" | Channel for message routing | | version | number | 1 | Workflow version | | status | string | "active" | active, paused, or archived | | tags | array | [] | Arbitrary tags for organization | | created_at | datetime | null | Creation timestamp (ISO 8601) | | updated_at | datetime | null | Last update timestamp (ISO 8601) |

All fields are optional and backward-compatible with existing configurations.

Engine Hot Reload

Swap workflows at runtime without losing custom function registrations:

let new_workflows = vec![Workflow::from_json(r#"{ ... }"#)?];
let new_engine = engine.with_new_workflows(new_workflows);
// Old engine remains valid for in-flight messages

Related Packages

| Package | Description | |---------|-------------| | @goplasmatic/dataflow-wasm | WebAssembly bindings for browser execution | | @goplasmatic/dataflow-ui | React components for rule visualization and debugging |

Contributing

We welcome contributions! Feel free to fork the repository, make your changes, and submit a pull request. Please make sure to add tests for any new features.

About Plasmatic

Dataflow-rs is developed by the team at Plasmatic. We're passionate about building open-source tools for data processing and automation.

License

This project is licensed under the Apache License, Version 2.0. See the LICENSE file for more details.