npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@vaultgradient/pipequery-lang

v1.9.4

Published

Pipe-based query language for filtering, transforming, and aggregating data

Readme


Features

Engine (npm library)

  • Pipe-based syntax — chain operations with |, inspired by Unix pipes and SQL
  • Minimal dependencies — lightweight core engine
  • TypeScript-first — full type definitions included
  • 25+ aggregate functions — basic, statistical, and financial aggregations
  • LiveQuery — streaming queries with delta/patch support
  • Editor support — CodeMirror 6, Monaco, and TextMate grammars
  • React components — visual pipeline builder out of the box

CLI (pq)

  • MCP server — plug pipequery into Claude Desktop, Claude Code, Cursor, Copilot, or any MCP client with one command
  • 11 data sources — REST, WebSocket, file, static, Postgres, MySQL, SQLite, Kafka, Snowflake, ClickHouse, MongoDB
  • Push-down to SQL + Mongo — pipe expressions targeting Postgres / MySQL / Snowflake / ClickHouse compile to native SQL; targeting MongoDB compile to find() / aggregate(). No in-memory materialization for those engines.
  • Telegram bot + natural languagepq telegram serve exposes the same MCP verbs in chat; pass --anthropic-key and Claude translates plain English into pipequery
  • Alert watchespq watch add polls a query and posts a Telegram notification on a state transition
  • Live API endpointspq endpoint add /api/foo -q "..." exposes a JSON endpoint immediately, no config edit
  • Terminal dashboard — resizable TUI with 7 visualization types and live SSE updates

Install

npm install @vaultgradient/pipequery-lang

CLI — pq

PipeQuery also ships a CLI tool for building data pipelines, live API endpoints, terminal dashboards, MCP servers, and Telegram bots.

npm install -g @vaultgradient/pipequery-cli

pq init                  # scaffold a project
pq serve -d              # start the server as a daemon
pq dashboard             # launch the TUI dashboard
pq query "crypto | sort(price desc) | first(5)"
pq stop                  # stop the server

# Create a live API endpoint on the fly — no config needed
pq endpoint add /api/top-coins -q "crypto | sort(market_cap desc) | first(10)"
# → http://localhost:3000/api/top-coins is instantly available

Data sources

11 source types out of the box, all addable at runtime via pq source add:

| Type | What it polls / streams | Push-down | |------|--------------------------|-----------| | rest | HTTP GET, with ${ENV_VAR} interpolation in url/headers/params and optional auth: bearer | — | | websocket | WSS stream with optional subscribe payload + heartbeat keepalive (Binance / Coinbase / Kraken / etc.) | — | | file | Local JSON / CSV with optional watch: mode | — | | static | Inline JSON in yaml | — | | postgres (incl. TimescaleDB) | SELECT query | ✅ where/sort/first/select/distinct/rollup/aggregates → SQL | | mysql / MariaDB | SELECT query | ✅ same operator set, MySQL dialect | | sqlite | Local SQLite file | — | | kafka (incl. Redpanda) | Topic subscriber, ring-buffered | — | | snowflake | SELECT query against Snowflake | ✅ same operator set, Snowflake dialect | | clickhouse | SELECT query over HTTP/HTTPS | ✅ same operator set, ClickHouse dialect | | mongodb | find() against a collection (with optional default filter) | ✅ where/sort/first/select → find(); rollup/aggregates → aggregate() |

Push-down is auto-routed: when a pipe expression targets a source whose adapter supports it, the engine compiles to native SQL (or Mongo plan) and runs it on the database — no in-memory materialization. Unsupported pipeline shapes transparently fall back to in-process execution.

Run with Docker

The server and CLI are fully decoupled — deploy the server anywhere with Docker and control it from your local terminal:

# Run the server in Docker
docker run -p 3000:3000 ghcr.io/andreadito/pipequery

# From any folder on your machine, connect and start working
pq remote connect http://localhost:3000
pq source add crypto -t rest -u "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&per_page=20" -i 30s
pq endpoint add /api/top -q "crypto | sort(market_cap desc) | first(5)"
curl http://localhost:3000/api/top

Works the same with a remote server — just pq remote connect https://my-server.example.com:3000.

The dashboard features a resizable 2-column grid with live SSE updates and 7 visualization types. See cli/README.md for full documentation.

Use with AI (MCP)

pq mcp serve                      # stdio — plug into Claude Desktop / Cursor / Claude Code
pq mcp serve --http --port 3001   # HTTP/SSE — for remote clients or hosted deployments
pq mcp serve --http --auth-token "$PIPEQUERY_MCP_TOKEN"   # bearer-token auth on the HTTP transport
pq mcp serve --attach http://localhost:3000  # attach to a running `pq serve`

Your AI agent gets 5 tools: query, list_sources, describe_source, list_endpoints, call_endpoint. Push-down auto-routing applies here too — agents asking "top 5 paid orders" against a Postgres / MySQL / Snowflake / ClickHouse / Mongo source compile to native SQL or Mongo plans automatically. See cli/README.md for client setup recipes.

Telegram bot + alerts

# Bot exposes the same MCP verbs as Telegram slash commands
PIPEQUERY_TG_BOT_TOKEN=<token> pq telegram serve --allow-user @yourname

# Add --anthropic-key to enable plain-English questions (Haiku 4.5 + prompt caching)
ANTHROPIC_API_KEY=sk-ant-... pq telegram serve --allow-user @yourname

# Add --log-file ./bot.jsonl to record every event as JSONL for jq / SIEM analysis
pq telegram serve --allow-user @yourname --log-file ./bot.jsonl

# Alert watches — query → notification on state transition
pq watch add btc-dip \
  --query "crypto | where(symbol == 'BTC' && price < 50000)" \
  --interval 60s --fire-when when_non_empty \
  --telegram-chat-id -1001234567890 \
  --telegram-message "🚨 BTC dipped: \${{ .price }}"

Bot commands mirror MCP tools (/sources, /describe, /endpoints, /call, /query); plain text gets translated to pipequery via Anthropic's API when configured. Watches support when_non_empty (default), when_empty, and on_change fire conditions — idempotent across the chosen mode, no flapping. See cli/README.md for the full surface.

Quick Start

import { query } from '@vaultgradient/pipequery-lang';

const data = [
  { name: 'Laptop', price: 999, category: 'Electronics' },
  { name: 'Mouse', price: 29, category: 'Electronics' },
  { name: 'Desk', price: 349, category: 'Furniture' },
];

// Filter and sort
query(data, 'where(price > 100) | sort(price desc)');
// → [{ name: 'Laptop', ... }, { name: 'Desk', ... }]

// Aggregate
query(data, 'rollup(sum(price) as total, count() as n)');
// → [{ total: 1377, n: 3 }]

// Group and aggregate
query(data, 'groupBy(category) | rollup(avg(price) as avgPrice)');
// → [{ category: 'Electronics', avgPrice: 514 }, { category: 'Furniture', avgPrice: 349 }]

Query Syntax

PipeQuery chains operations with the | pipe operator:

source | operation1(...) | operation2(...) | ...

Operations

| Operation | Description | Example | |-----------|-------------|---------| | where(expr) | Filter rows | where(price > 100) | | select(fields...) | Pick fields | select(name, price) | | sort(expr dir) | Sort rows | sort(price desc) | | groupBy(keys...) | Group rows | groupBy(category) | | rollup(aggs...) | Group + aggregate | rollup(sum(price) as total) | | join(source, cond) | Join tables | join(orders, id == orderId) | | first(n) | Take first N | first(10) | | last(n) | Take last N | last(5) | | distinct(fields?) | Remove duplicates | distinct(category) | | map(exprs...) | Transform rows | map(price * 1.1 as newPrice) | | pivot(field, aggs) | Pivot table | pivot(region, sum(sales)) | | flatten(field?) | Flatten arrays | flatten(tags) | | transpose(header?) | Transpose matrix | transpose(name) | | reduce(init, acc) | Reduce to scalar | reduce(0, $acc + price) |

Aggregate Functions

| Category | Functions | |----------|-----------| | Basic | sum, avg, min, max, count | | Statistical | median, stddev, var, percentile, skew, kurt | | Financial | vwap, wavg, drawdown, sharpe, calmar, sortino, info_ratio | | Utility | distinct_count, sum_abs, first_value, last_value, pct |

Built-in Functions

| Category | Functions | |----------|-----------| | String | lower, upper, len, concat, trim, contains, startsWith, endsWith, substring, replace | | Logic | if, coalesce | | Math | abs, round |

where(contains(name, "Bitcoin"))          // substring search
where(startsWith(symbol, "BT"))           // prefix match
where(endsWith(email, ".com"))            // suffix match
select(trim(name) as name)               // strip whitespace
select(substring(name, 0, 5) as short)   // extract first 5 chars
select(replace(name, "old", "new") as r) // replace all occurrences

Window Functions

running_sum, running_avg, running_count, running_min, running_max, row_number, lag, lead

Expressions

price > 100 && category == 'Electronics'   // boolean logic
price * 0.9 as discounted                  // arithmetic + alias
nested.field.path                          // dot access

API

query(data, expression)

Execute a query on data. Accepts a raw array or a named DataContext for multi-table queries.

import { query } from '@vaultgradient/pipequery-lang';

// Array shorthand
query(items, 'where(price > 50) | sort(name asc)');

// Named context (for joins)
query(
  { orders, customers },
  'orders | join(customers, customerId == id) | select(orderId, name, total)'
);

compile(expression)

Pre-compile a query for repeated use. Returns a reusable function.

import { compile } from '@vaultgradient/pipequery-lang';

const fn = compile('where(price > 100) | sort(price desc)');
const result = fn({ _data: items });

parseQuery(expression)

Parse a query into its AST without executing.

import { parseQuery } from '@vaultgradient/pipequery-lang';

const ast = parseQuery('items | where(price > 100)');
// Inspect tokens, operations, expressions

liveQuery(data, expression, options)

Streaming query evaluator that accepts data patches and re-executes efficiently.

import { liveQuery } from '@vaultgradient/pipequery-lang';

const lq = liveQuery(initialData, 'where(active == true) | sort(updatedAt desc)', {
  key: 'id',
  throttle: 100,
});

lq.subscribe((result, stats) => {
  console.log(`${stats.rowCount} rows in ${stats.totalMs}ms`);
});

// Push incremental updates
lq.patch([{ id: 1, active: true, updatedAt: Date.now() }]);

// Clean up
lq.dispose();

clearCache()

Clear the internal compiled-query LRU cache (128 entries by default).

import { clearCache } from '@vaultgradient/pipequery-lang';

clearCache();

Error Handling

All errors include position info (position, line, column) for editor integration.

import { LexerError, ParseError, RuntimeError, DataWeaveError } from '@vaultgradient/pipequery-lang';

try {
  query(data, 'where(price >)');
} catch (e) {
  if (e instanceof ParseError) {
    console.log(`Syntax error at line ${e.line}, column ${e.column}`);
  }
}

Sub-packages

Syntax Highlighting

// CodeMirror 6
import { pipeQuery } from '@vaultgradient/pipequery-lang/highlighting';

// Monaco Editor
import { registerPipeQuery } from '@vaultgradient/pipequery-lang/highlighting';

A TextMate grammar is included at dist/highlighting/textmate/pipequery.tmLanguage.json for VS Code, IntelliJ, and Sublime Text.

Peer dependencies: @codemirror/language, @codemirror/state, @codemirror/view, @lezer/highlight (CodeMirror only)

React Components

import { PipeQueryBuilder } from '@vaultgradient/pipequery-lang/react';

<PipeQueryBuilder
  datasets={datasets}
  onQueryChange={(query) => console.log(query)}
/>

A visual pipeline builder component for constructing queries with drag-and-drop.

Peer dependencies: react, react-dom, @mui/material, @emotion/react, @emotion/styled, @mui/icons-material

Browser Support

Works in all modern browsers and Node.js 18+. The core engine uses only standard ES2020 features.

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

git clone https://github.com/andreadito/pipequery.git
cd pipequery
npm install
npm test

License

MIT © andreadito