npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@datafn/server

v0.0.2

Published

DataFn server runtime - HTTP endpoints for DFQL query, mutations, and sync

Downloads

114

Readme

@datafn/server

HTTP server runtime for DataFn. Provides DFQL query execution, mutations with idempotency and optimistic concurrency, atomic transactions, full bidirectional sync (clone/pull/push/reconcile/seed), per-user/tenant isolation, REST endpoint generation, WebSocket real-time updates, plugin hooks, and configurable limits.

Installation

npm install @datafn/server @datafn/core @superfunctions/db @superfunctions/http

Features

| Feature | Description | |---------|-------------| | Query Execution | DFQL queries with filtering, sorting, pagination, relations, aggregations | | Mutations | CRUD with idempotency (clientId + mutationId deduplication) and optimistic concurrency (if guards) | | Relation Operations | relate, unrelate, modifyRelation for many-many and other relation types | | Transactions | Atomic multi-step operations with query + mutation steps | | Sync: Clone | Full data download for initial hydration | | Sync: Pull | Incremental cursor-based change sync | | Sync: Push | Upload offline mutations from client changelog | | Sync: Reconcile | Detect and resolve local/remote drift | | Sync: Seed | Seed data into the database | | Authorization | Per-action authorization hook | | Multi-User Isolation | Per-user/tenant namespace isolation for change tracking | | REST Endpoints | Optional REST wrappers over DFQL (GET/POST/PATCH/DELETE) | | WebSocket | Real-time cursor broadcast for live updates | | Plugins | Hook into queries, mutations, and sync on the server side | | Sequence Store | Pluggable serverSeq backend: Database, Redis, KV, or chained store | | Payload Limits | Configurable max query limits, transaction steps, and payload size | | KV Resource | Built-in key-value resource auto-included in schema |


Quick Start

import { createDatafnServer } from "@datafn/server";
import { drizzleAdapter } from "@superfunctions/db/adapters";
import { toHono } from "@superfunctions/http-hono";
import { Hono } from "hono";
import { serve } from "@hono/node-server";

const server = await createDatafnServer({
  schema: {
    resources: [
      {
        name: "tasks",
        version: 1,
        fields: [
          { name: "id", type: "string", required: true, unique: true },
          { name: "title", type: "string", required: true },
          { name: "completed", type: "boolean", required: true, default: false },
        ],
      },
    ],
  },
  db: drizzleAdapter({ db: myDrizzleInstance, dialect: "postgres" }),
  authorize: async (ctx, action, payload) => {
    return true; // Allow all (replace with real logic)
  },
});

const app = new Hono();
app.route("/", toHono(server.router));
serve({ fetch: app.fetch, port: 3000 });

Configuration

DatafnServerConfig

interface DatafnServerConfig<TContext = any> {
  /** DataFn schema — validated and normalized at startup */
  schema: DatafnSchema;

  /** Database adapter (e.g. drizzleAdapter, memoryAdapter) */
  db?: Adapter;

  /** Server-side plugins */
  plugins?: DatafnPlugin[];

  /**
   * Authorization callback. Called on every request after JSON parsing.
   * Return true to allow, false to reject with 403 FORBIDDEN.
   */
  authorize?: (
    ctx: TContext,
    action:
      | "status" | "query" | "mutation" | "transact"
      | "seed" | "clone" | "pull" | "push" | "reconcile",
    payload: unknown,
  ) => Promise<boolean> | boolean;

  /** Configurable limits */
  limits?: {
    /** Maximum number of records per query (default: 100) */
    maxLimit?: number;
    /** Maximum transaction steps */
    maxTransactSteps?: number;
    /** Maximum request body size in bytes */
    maxPayloadBytes?: number;
  };

  /** Custom server time provider (for testing) */
  getServerTime?: () => number;

  /**
   * Enable REST endpoint generation.
   * When true, generates REST wrappers alongside DFQL endpoints.
   */
  rest?: boolean;

  /**
   * Auth context provider for per-user/tenant isolation.
   * Extracts user/tenant context from request to create isolated namespaces.
   */
  authContextProvider?: {
    getContext: (ctx: TContext) => AuthContext | Promise<AuthContext>;
  };

  /**
   * Redis adapter for atomic operations (serverSeq, rate limiting, caching).
   * Used when dbMapping.serverseq = "redis".
   */
  redis?: RedisAdapter;

  /**
   * KV store adapter for simple key-value operations.
   * Must support incr() for serverSeq. Used when dbMapping.serverseq = "kv".
   */
  kvStore?: KVStoreAdapter;

  /**
   * Database mapping — choose which backend for each concern.
   * Default: main database for everything.
   */
  dbMapping?: DbMapping;
}

DbMapping

Route different server concerns to different backends:

type DbMapping = {
  /** Backend for serverSeq counter: "db" | "redis" | "kv" */
  serverseq?: "db" | "redis" | "kv";
  /** Backend for rate limiting */
  ratelimiting?: "db" | "redis" | "kv";
  /** Backend for caching */
  cache?: "db" | "redis" | "kv";
};

Endpoints

GET /datafn/status

Server health check, schema hash, capabilities, and limits.

{
  "ok": true,
  "result": {
    "schemaHash": "a1b2c3...",
    "capabilities": [
      "dfql.query", "dfql.mutation", "dfql.transact",
      "sync.seed", "sync.clone", "sync.pull", "sync.push", "sync.reconcile"
    ],
    "limits": { "maxLimit": 100 },
    "serverTimeMs": 1707000000000
  }
}

POST /datafn/query

Execute DFQL queries with full filtering, sorting, pagination, and relation expansion.

Request:

{
  "resource": "tasks",
  "version": 1,
  "select": ["id", "title", "completed"],
  "filters": { "completed": false },
  "sort": ["title:asc"],
  "limit": 20
}

Response:

{
  "ok": true,
  "result": {
    "data": [
      { "id": "task:1", "title": "Buy milk", "completed": false }
    ],
    "nextCursor": null
  }
}

Filter operators:

| Operator | Example | Description | |----------|---------|-------------| | eq (implicit) | { "status": "active" } | Equals | | eq (explicit) | { "status": { "eq": "active" } } | Equals | | ne | { "status": { "ne": "archived" } } | Not equals | | gt / gte | { "age": { "gt": 18 } } | Greater than / or equal | | lt / lte | { "age": { "lt": 65 } } | Less than / or equal | | like | { "name": { "like": "%john%" } } | Case-sensitive pattern match | | ilike | { "name": { "ilike": "%john%" } } | Case-insensitive pattern match | | is_null | { "deletedAt": { "is_null": true } } | Is null | | is_not_null | { "email": { "is_not_null": true } } | Is not null | | in | { "status": { "in": ["active", "pending"] } } | In set | | nin | { "status": { "nin": ["archived"] } } | Not in set | | contains | { "tags": { "contains": "urgent" } } | Array/JSON contains |

Logical groups:

{
  "$or": [
    { "status": "active" },
    { "priority": { "gte": 4 } }
  ]
}

POST /datafn/search

Execute cross-resource search through the configured server searchProvider.

Request:

{
  "query": "test",
  "resources": ["tasks", "projects"],
  "limit": 20,
  "prefix": true,
  "fuzzy": 0.2,
  "fieldBoosts": { "title": 2, "name": 1 }
}

Response:

{
  "ok": true,
  "result": {
    "results": [
      { "resource": "tasks", "id": "task:1", "score": 12.34, "data": {} }
    ]
  }
}

Search option parity with client paths:

  • prefix enables prefix matching.
  • fuzzy enables fuzzy matching.
  • fieldBoosts applies field-level weighting.

These options are validated at route/query boundaries and forwarded to provider execution.

Aggregation queries:

{
  "resource": "tasks",
  "version": 1,
  "groupBy": ["status"],
  "aggregations": {
    "count": { "fn": "count" },
    "avgPriority": { "fn": "avg", "field": "priority" }
  },
  "having": { "count": { "gt": 5 } }
}

POST /datafn/mutation

Execute mutations with idempotency and optimistic concurrency.

Insert:

{
  "resource": "tasks",
  "version": 1,
  "operation": "insert",
  "clientId": "client:device-1",
  "mutationId": "m-001",
  "record": { "title": "New task", "completed": false }
}

Merge (partial update):

{
  "resource": "tasks",
  "version": 1,
  "operation": "merge",
  "clientId": "client:device-1",
  "mutationId": "m-002",
  "id": "task:abc",
  "record": { "completed": true }
}

Delete:

{
  "resource": "tasks",
  "version": 1,
  "operation": "delete",
  "clientId": "client:device-1",
  "mutationId": "m-003",
  "id": "task:abc"
}

Optimistic concurrency (if guards):

{
  "resource": "tasks",
  "version": 1,
  "operation": "merge",
  "id": "task:abc",
  "record": { "status": "completed" },
  "if": { "status": "in_progress" }
}

The mutation is rejected with CONFLICT if the guard condition is not met.

Relation operations:

{
  "resource": "tasks",
  "version": 1,
  "operation": "relate",
  "id": "task:1",
  "relation": "tags",
  "targetId": "tag:urgent"
}

| Operation | Description | |-----------|-------------| | relate | Create a relation between records | | unrelate | Remove a relation | | modifyRelation | Update relation metadata |

Response:

{
  "ok": true,
  "result": {
    "ok": true,
    "mutationId": "m-001",
    "affectedIds": ["task:new-uuid"],
    "errors": [],
    "deduped": false
  }
}
  • deduped: true means the mutation was already applied (idempotency).

POST /datafn/transact

Atomic multi-step transactions mixing queries and mutations.

Request:

{
  "transactionId": "tx-batch",
  "atomic": true,
  "steps": [
    {
      "query": {
        "resource": "tasks",
        "version": 1,
        "select": ["id"],
        "filters": { "completed": false }
      }
    },
    {
      "mutation": {
        "resource": "tasks",
        "version": 1,
        "operation": "merge",
        "id": "task:1",
        "record": { "completed": true }
      }
    }
  ]
}

Response:

{
  "ok": true,
  "result": {
    "ok": true,
    "results": [
      { "kind": "query", "ok": true, "result": { "data": [...] } },
      { "kind": "mutation", "ok": true, "result": { ... } }
    ]
  }
}

POST /datafn/clone

Full data download for initial hydration. Supports pagination.

Request:

{
  "version": 1,
  "tables": ["tasks", "categories"]
}

Response:

{
  "ok": true,
  "result": {
    "ok": true,
    "data": {
      "tasks": [{ "id": "task:1", "title": "..." }],
      "categories": [{ "id": "cat:1", "name": "..." }]
    },
    "cursors": {
      "tasks": "42",
      "categories": "15"
    }
  }
}

POST /datafn/pull

Incremental sync — only returns changes since the last cursor.

Request:

{
  "version": 1,
  "cursors": {
    "tasks": "42",
    "categories": "15"
  }
}

Response:

{
  "ok": true,
  "result": {
    "ok": true,
    "records": {
      "tasks": [{ "id": "task:3", "title": "New since last pull", "__deleted": false }]
    },
    "cursors": {
      "tasks": "44",
      "categories": "15"
    }
  }
}

POST /datafn/push

Upload offline mutations from the client changelog.

Request:

{
  "version": 1,
  "mutations": [
    {
      "resource": "tasks",
      "version": 1,
      "operation": "insert",
      "clientId": "client:device-1",
      "mutationId": "m-offline-1",
      "record": { "title": "Created offline" }
    }
  ]
}

POST /datafn/reconcile

Detect and resolve drift between local and remote state.

Request:

{
  "version": 1,
  "counts": {
    "tasks": 42,
    "categories": 8
  }
}

POST /datafn/seed

Seed data into the database.

Request:

{
  "data": {
    "tasks": [
      { "id": "task:seed-1", "title": "Seeded Task", "completed": false }
    ]
  }
}

Multi-User / Multi-Tenant Isolation

Isolate change tracking per user or tenant. Each namespace gets its own serverSeq counter, ensuring users see only their own changes.

const server = await createDatafnServer({
  schema,
  db: adapter,
  authContextProvider: {
    getContext: (ctx) => ({
      userId: ctx.session?.userId,
      tenantId: ctx.session?.tenantId,
    }),
  },
});

Namespace format:

  • No auth: datafn (default)
  • Single-tenant: user:${userId}
  • Multi-tenant: tenant:${tenantId}:user:${userId}

Behavior:

  1. On each request, authContextProvider.getContext(ctx) is called
  2. A namespace is derived from userId and optional tenantId
  3. All change tracking (serverSeq, change records) uses this namespace
  4. Pull/clone only returns changes from the user's namespace
  5. If getContext() fails, falls back to default datafn namespace

Sequence Store

The server uses a SequenceStore for generating monotonically increasing sequence numbers. Choose the backend that fits your infrastructure:

// Default: use main database
const server = await createDatafnServer({
  schema,
  db: adapter,
});

// Redis for high-throughput atomic INCR
const server = await createDatafnServer({
  schema,
  db: adapter,
  redis: myRedisAdapter,
  dbMapping: { serverseq: "redis" },
});

// KV store (e.g. Cloudflare KV, DynamoDB)
const server = await createDatafnServer({
  schema,
  db: adapter,
  kvStore: myKVAdapter,
  dbMapping: { serverseq: "kv" },
});

Available implementations:

| Store | Backend | Best for | |-------|---------|----------| | DatabaseSequenceStore | Main DB | Simple deployments | | RedisSequenceStore | Redis | High throughput, atomic INCR | | KVSequenceStore | KV store | Serverless environments | | ChainedSequenceStore | Chain | Primary + database chain (e.g. Redis → DB) |

createSequenceStore(config)

Factory that creates the appropriate store based on dbMapping:

import { createSequenceStore } from "@datafn/server";

const store = createSequenceStore({
  db: myAdapter,
  redis: myRedisAdapter,
  dbMapping: { serverseq: "redis" },
});

REST Endpoints (Optional)

Enable REST wrappers alongside DFQL endpoints:

const server = await createDatafnServer({
  schema,
  db: adapter,
  rest: true, // Enable REST endpoints
});

Generated routes:

| Method | Path | Maps to | |--------|------|---------| | GET | /datafn/resources/:resource | Query wrapper | | POST | /datafn/resources/:resource | Insert wrapper | | PATCH | /datafn/resources/:resource/:id | Merge wrapper | | DELETE | /datafn/resources/:resource/:id | Delete wrapper |


WebSocket

The server exposes a WebSocket handler for real-time cursor broadcasts:

const server = await createDatafnServer({ schema, db: adapter });

// Wire WebSocket connections to server handler
wss.on("connection", (ws) => {
  const client = { id: ws.id, send: (msg) => ws.send(msg) };
  server.websocketHandler.addClient(client);
  ws.on("message", (data) => server.websocketHandler.handleMessage(client, data));
  ws.on("close", () => server.websocketHandler.removeClient(client));
});

Plugins

Server-side plugins intercept queries, mutations, and sync:

const auditPlugin: DatafnPlugin = {
  name: "server-audit",
  runsOn: ["server"],

  afterMutation(ctx, mutation, result) {
    console.log("Server mutation:", mutation);
  },

  beforeSync(ctx, phase, payload) {
    console.log(`Sync ${phase} started`);
    return payload; // Return modified payload or original
  },
};

const server = await createDatafnServer({
  schema,
  db: adapter,
  plugins: [auditPlugin],
});

Server Instance

createDatafnServer returns a DatafnServer:

interface DatafnServer<TContext = any> {
  /** HTTP router — wire to your framework (Hono, Express, etc.) */
  router: Router<TContext>;

  /** WebSocket handler for real-time updates */
  websocketHandler: {
    addClient(client: WebSocketClient): void;
    removeClient(client: WebSocketClient): void;
    handleMessage(client: WebSocketClient, data: string): void;
  };
}

Use @superfunctions/http-hono or @superfunctions/http-express to mount the router:

// Hono
import { toHono } from "@superfunctions/http-hono";
app.route("/", toHono(server.router));

// Express
import { toExpress } from "@superfunctions/http-express";
app.use("/", toExpress(server.router));

Exports

// Server factory and types
export { createDatafnServer }
export type { DatafnServerConfig, DatafnServer }
export type { StatusResult }

// Sequence store
export type { SequenceStore, DbMapping }
export {
  createSequenceStore,
  RedisSequenceStore,
  KVSequenceStore,
  DatabaseSequenceStore,
  ChainedSequenceStore,
}

License

MIT