@vaultgradient/pipequery-lang
v1.9.4
Pipe-based query language for filtering, transforming, and aggregating data
Features
Engine (npm library)
- Pipe-based syntax — chain operations with |, inspired by Unix pipes and SQL
- Minimal dependencies — lightweight core engine
- TypeScript-first — full type definitions included
- 25+ aggregate functions — basic, statistical, and financial aggregations
- LiveQuery — streaming queries with delta/patch support
- Editor support — CodeMirror 6, Monaco, and TextMate grammars
- React components — visual pipeline builder out of the box
CLI (pq)
- MCP server — plug pipequery into Claude Desktop, Claude Code, Cursor, Copilot, or any MCP client with one command
- 11 data sources — REST, WebSocket, file, static, Postgres, MySQL, SQLite, Kafka, Snowflake, ClickHouse, MongoDB
- Push-down to SQL + Mongo — pipe expressions targeting Postgres / MySQL / Snowflake / ClickHouse compile to native SQL; those targeting MongoDB compile to find()/aggregate(). No in-memory materialization for those engines.
- Telegram bot + natural language — pq telegram serve exposes the same MCP verbs in chat; pass --anthropic-key and Claude translates plain English into pipequery
- Alert watches — pq watch add polls a query and posts a Telegram notification on a state transition
- Live API endpoints — pq endpoint add /api/foo -q "..." exposes a JSON endpoint immediately, no config edit
- Terminal dashboard — resizable TUI with 7 visualization types and live SSE updates
Install
npm install @vaultgradient/pipequery-lang

CLI — pq
PipeQuery also ships a CLI tool for building data pipelines, live API endpoints, terminal dashboards, MCP servers, and Telegram bots.
npm install -g @vaultgradient/pipequery-cli
pq init # scaffold a project
pq serve -d # start the server as a daemon
pq dashboard # launch the TUI dashboard
pq query "crypto | sort(price desc) | first(5)"
pq stop # stop the server
# Create a live API endpoint on the fly — no config needed
pq endpoint add /api/top-coins -q "crypto | sort(market_cap desc) | first(10)"
# → http://localhost:3000/api/top-coins is instantly available

Data sources
11 source types out of the box, all addable at runtime via pq source add:
| Type | What it polls / streams | Push-down |
|------|--------------------------|-----------|
| rest | HTTP GET, with ${ENV_VAR} interpolation in url/headers/params and optional auth: bearer | — |
| websocket | WSS stream with optional subscribe payload + heartbeat keepalive (Binance / Coinbase / Kraken / etc.) | — |
| file | Local JSON / CSV with optional watch: mode | — |
| static | Inline JSON in yaml | — |
| postgres (incl. TimescaleDB) | SELECT query | ✅ where/sort/first/select/distinct/rollup/aggregates → SQL |
| mysql / MariaDB | SELECT query | ✅ same operator set, MySQL dialect |
| sqlite | Local SQLite file | — |
| kafka (incl. Redpanda) | Topic subscriber, ring-buffered | — |
| snowflake | SELECT query against Snowflake | ✅ same operator set, Snowflake dialect |
| clickhouse | SELECT query over HTTP/HTTPS | ✅ same operator set, ClickHouse dialect |
| mongodb | find() against a collection (with optional default filter) | ✅ where/sort/first/select → find(); rollup/aggregates → aggregate() |
Push-down is auto-routed: when a pipe expression targets a source whose adapter supports it, the engine compiles to native SQL (or Mongo plan) and runs it on the database — no in-memory materialization. Unsupported pipeline shapes transparently fall back to in-process execution.
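The compilation step can be pictured with a toy sketch. The mini-AST, the table name, and the compiler below are invented for illustration and are not the engine's internals:

```typescript
// Toy illustration of push-down: a parsed pipe expression such as
//   orders | where(status == 'paid') | sort(total desc) | first(5)
// becomes one native SQL statement instead of being evaluated in memory.
// This mini-AST and compiler are invented for illustration only.
type Pipe =
  | { op: "where"; field: string; cmp: ">" | "<" | "=="; value: string | number }
  | { op: "sort"; field: string; dir: "asc" | "desc" }
  | { op: "first"; n: number };

function compileToSql(table: string, pipes: Pipe[]): string {
  const where: string[] = [];
  const order: string[] = [];
  let limit = "";
  for (const p of pipes) {
    if (p.op === "where") {
      const v = typeof p.value === "string" ? `'${p.value}'` : String(p.value);
      where.push(`${p.field} ${p.cmp === "==" ? "=" : p.cmp} ${v}`);
    } else if (p.op === "sort") {
      order.push(`${p.field} ${p.dir.toUpperCase()}`);
    } else {
      limit = ` LIMIT ${p.n}`;
    }
  }
  return (
    `SELECT * FROM ${table}` +
    (where.length ? ` WHERE ${where.join(" AND ")}` : "") +
    (order.length ? ` ORDER BY ${order.join(", ")}` : "") +
    limit
  );
}

// compileToSql("orders", [where status == 'paid', sort total desc, first 5])
// → SELECT * FROM orders WHERE status = 'paid' ORDER BY total DESC LIMIT 5
```

Pipeline shapes the compiler cannot express (see the fallback note above) would simply skip this path and run in process.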
Run with Docker
The server and CLI are fully decoupled — deploy the server anywhere with Docker and control it from your local terminal:
# Run the server in Docker
docker run -p 3000:3000 ghcr.io/andreadito/pipequery
# From any folder on your machine, connect and start working
pq remote connect http://localhost:3000
pq source add crypto -t rest -u "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&per_page=20" -i 30s
pq endpoint add /api/top -q "crypto | sort(market_cap desc) | first(5)"
curl http://localhost:3000/api/top

Works the same with a remote server — just pq remote connect https://my-server.example.com:3000.
The dashboard features a resizable 2-column grid with live SSE updates and 7 visualization types. See cli/README.md for full documentation.
Use with AI (MCP)
pq mcp serve # stdio — plug into Claude Desktop / Cursor / Claude Code
pq mcp serve --http --port 3001 # HTTP/SSE — for remote clients or hosted deployments
pq mcp serve --http --auth-token "$PIPEQUERY_MCP_TOKEN" # bearer-token auth on the HTTP transport
pq mcp serve --attach http://localhost:3000 # attach to a running `pq serve`

Your AI agent gets 5 tools: query, list_sources, describe_source, list_endpoints, call_endpoint. Push-down auto-routing applies here too — an agent asking for "top 5 paid orders" against a Postgres / MySQL / Snowflake / ClickHouse / Mongo source gets a native SQL or Mongo plan automatically. See cli/README.md for client setup recipes.
Telegram bot + alerts
# Bot exposes the same MCP verbs as Telegram slash commands
PIPEQUERY_TG_BOT_TOKEN=<token> pq telegram serve --allow-user @yourname
# Add --anthropic-key to enable plain-English questions (Haiku 4.5 + prompt caching)
ANTHROPIC_API_KEY=sk-ant-... pq telegram serve --allow-user @yourname
# Add --log-file ./bot.jsonl to record every event as JSONL for jq / SIEM analysis
pq telegram serve --allow-user @yourname --log-file ./bot.jsonl
# Alert watches — query → notification on state transition
pq watch add btc-dip \
--query "crypto | where(symbol == 'BTC' && price < 50000)" \
--interval 60s --fire-when when_non_empty \
--telegram-chat-id -1001234567890 \
--telegram-message "🚨 BTC dipped: \${{ .price }}"

Bot commands mirror MCP tools (/sources, /describe, /endpoints, /call, /query); plain text is translated to pipequery via Anthropic's API when configured. Watches support when_non_empty (default), when_empty, and on_change fire conditions — a watch fires only on a state transition, so there is no flapping. See cli/README.md for the full surface.
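The transition semantics (fire once when the condition becomes true, stay quiet while it remains true) can be sketched as a tiny state machine. `makeWatch` and its shape are invented for illustration, not the CLI's implementation:

```typescript
// Illustrative sketch of "fire on state transition, no flapping":
// a when_non_empty watch notifies only when the query result crosses
// from empty to non-empty, not on every poll that stays non-empty.
type FireWhen = "when_non_empty" | "when_empty";

function makeWatch(fireWhen: FireWhen) {
  let lastFiring = false; // condition state from the previous poll
  return (rows: unknown[]): boolean => {
    const firing =
      fireWhen === "when_non_empty" ? rows.length > 0 : rows.length === 0;
    const shouldNotify = firing && !lastFiring; // only on the rising edge
    lastFiring = firing;
    return shouldNotify;
  };
}

const watch = makeWatch("when_non_empty");
watch([]);          // false — condition not met
watch([{ x: 1 }]);  // true  — transition: fire once
watch([{ x: 1 }]);  // false — still non-empty, no re-fire
```

Resetting to an empty result arms the watch again, so the next non-empty poll fires a fresh notification.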
Quick Start
import { query } from '@vaultgradient/pipequery-lang';
const data = [
{ name: 'Laptop', price: 999, category: 'Electronics' },
{ name: 'Mouse', price: 29, category: 'Electronics' },
{ name: 'Desk', price: 349, category: 'Furniture' },
];
// Filter and sort
query(data, 'where(price > 100) | sort(price desc)');
// → [{ name: 'Laptop', ... }, { name: 'Desk', ... }]
// Aggregate
query(data, 'rollup(sum(price) as total, count() as n)');
// → [{ total: 1377, n: 3 }]
// Group and aggregate
query(data, 'groupBy(category) | rollup(avg(price) as avgPrice)');
// → [{ category: 'Electronics', avgPrice: 514 }, { category: 'Furniture', avgPrice: 349 }]

Query Syntax
PipeQuery chains operations with the | pipe operator:
source | operation1(...) | operation2(...) | ...

Operations
| Operation | Description | Example |
|-----------|-------------|---------|
| where(expr) | Filter rows | where(price > 100) |
| select(fields...) | Pick fields | select(name, price) |
| sort(expr dir) | Sort rows | sort(price desc) |
| groupBy(keys...) | Group rows | groupBy(category) |
| rollup(aggs...) | Group + aggregate | rollup(sum(price) as total) |
| join(source, cond) | Join tables | join(orders, id == orderId) |
| first(n) | Take first N | first(10) |
| last(n) | Take last N | last(5) |
| distinct(fields?) | Remove duplicates | distinct(category) |
| map(exprs...) | Transform rows | map(price * 1.1 as newPrice) |
| pivot(field, aggs) | Pivot table | pivot(region, sum(sales)) |
| flatten(field?) | Flatten arrays | flatten(tags) |
| transpose(header?) | Transpose matrix | transpose(name) |
| reduce(init, acc) | Reduce to scalar | reduce(0, $acc + price) |
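The row operations above have plain array semantics. As a point of reference, here is a TypeScript paraphrase (not the engine itself) of the pipeline `where(price > 100) | sort(price desc) | first(2)`:

```typescript
// Plain-array paraphrase of: where(price > 100) | sort(price desc) | first(2)
// Each pipe stage is just a function from rows to rows.
interface Row { name: string; price: number }

const rows: Row[] = [
  { name: "Laptop", price: 999 },
  { name: "Mouse", price: 29 },
  { name: "Desk", price: 349 },
];

const result = rows
  .filter((r) => r.price > 100)       // where(price > 100)
  .sort((a, b) => b.price - a.price)  // sort(price desc)
  .slice(0, 2);                       // first(2)

// → [{ name: "Laptop", price: 999 }, { name: "Desk", price: 349 }]
```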
Aggregate Functions
| Category | Functions |
|----------|-----------|
| Basic | sum, avg, min, max, count |
| Statistical | median, stddev, var, percentile, skew, kurt |
| Financial | vwap, wavg, drawdown, sharpe, calmar, sortino, info_ratio |
| Utility | distinct_count, sum_abs, first_value, last_value, pct |
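For reference, sketches of two of these aggregates under their conventional textbook definitions. The library's exact semantics (e.g. percentile interpolation or degrees of freedom for stddev) may differ:

```typescript
// Conventional definitions of two aggregates, for reference only.
// vwap: volume-weighted average price = sum(price * volume) / sum(volume)
function vwap(rows: { price: number; volume: number }[]): number {
  const pv = rows.reduce((s, r) => s + r.price * r.volume, 0);
  const v = rows.reduce((s, r) => s + r.volume, 0);
  return pv / v;
}

// median: middle value of the sorted series (mean of the two middles if even)
function median(values: number[]): number {
  const s = [...values].sort((a, b) => a - b);
  const mid = Math.floor(s.length / 2);
  return s.length % 2 ? s[mid] : (s[mid - 1] + s[mid]) / 2;
}

// vwap([{price: 10, volume: 2}, {price: 20, volume: 8}]) → (20 + 160) / 10 = 18
```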
Built-in Functions
| Category | Functions |
|----------|-----------|
| String | lower, upper, len, concat, trim, contains, startsWith, endsWith, substring, replace |
| Logic | if, coalesce |
| Math | abs, round |
where(contains(name, "Bitcoin")) // substring search
where(startsWith(symbol, "BT")) // prefix match
where(endsWith(email, ".com")) // suffix match
select(trim(name) as name) // strip whitespace
select(substring(name, 0, 5) as short) // extract first 5 chars
select(replace(name, "old", "new") as r) // replace all occurrences

Window Functions
running_sum, running_avg, running_count, running_min, running_max, row_number, lag, lead
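Window functions carry per-row state over an ordered sequence. Illustrative sketches of running_sum and lag under their usual definitions (not the engine's implementation):

```typescript
// running_sum: each output row holds the cumulative sum so far.
function runningSum(values: number[]): number[] {
  let acc = 0;
  return values.map((v) => (acc += v));
}

// lag: each output row holds the value from `offset` rows earlier,
// or null where no earlier row exists.
function lag<T>(values: T[], offset = 1): (T | null)[] {
  return values.map((_, i) => (i - offset >= 0 ? values[i - offset] : null));
}

// runningSum([1, 2, 3, 4]) → [1, 3, 6, 10]
// lag(["a", "b", "c"])     → [null, "a", "b"]
```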
Expressions
price > 100 && category == 'Electronics' // boolean logic
price * 0.9 as discounted // arithmetic + alias
nested.field.path // dot access

API
query(data, expression)
Execute a query on data. Accepts a raw array or a named DataContext for multi-table queries.
import { query } from '@vaultgradient/pipequery-lang';
// Array shorthand
query(items, 'where(price > 50) | sort(name asc)');
// Named context (for joins)
query(
{ orders, customers },
'orders | join(customers, customerId == id) | select(orderId, name, total)'
);

compile(expression)
Pre-compile a query for repeated use. Returns a reusable function.
import { compile } from '@vaultgradient/pipequery-lang';
const fn = compile('where(price > 100) | sort(price desc)');
const result = fn({ _data: items });

parseQuery(expression)
Parse a query into its AST without executing.
import { parseQuery } from '@vaultgradient/pipequery-lang';
const ast = parseQuery('items | where(price > 100)');
// Inspect tokens, operations, expressions

liveQuery(data, expression, options)
Streaming query evaluator that accepts data patches and re-executes efficiently.
import { liveQuery } from '@vaultgradient/pipequery-lang';
const lq = liveQuery(initialData, 'where(active == true) | sort(updatedAt desc)', {
key: 'id',
throttle: 100,
});
lq.subscribe((result, stats) => {
console.log(`${stats.rowCount} rows in ${stats.totalMs}ms`);
});
// Push incremental updates
lq.patch([{ id: 1, active: true, updatedAt: Date.now() }]);
// Clean up
lq.dispose();

clearCache()
Clear the internal compiled-query LRU cache (128 entries by default).
import { clearCache } from '@vaultgradient/pipequery-lang';
clearCache();

Error Handling
All errors include position info (position, line, column) for editor integration.
import { LexerError, ParseError, RuntimeError, DataWeaveError } from '@vaultgradient/pipequery-lang';
try {
query(data, 'where(price >)');
} catch (e) {
if (e instanceof ParseError) {
console.log(`Syntax error at line ${e.line}, column ${e.column}`);
}
}

Sub-packages
Syntax Highlighting
// CodeMirror 6
import { pipeQuery } from '@vaultgradient/pipequery-lang/highlighting';
// Monaco Editor
import { registerPipeQuery } from '@vaultgradient/pipequery-lang/highlighting';

A TextMate grammar is included at dist/highlighting/textmate/pipequery.tmLanguage.json for VS Code, IntelliJ, and Sublime Text.
Peer dependencies: @codemirror/language, @codemirror/state, @codemirror/view, @lezer/highlight (CodeMirror only)
React Components
import { PipeQueryBuilder } from '@vaultgradient/pipequery-lang/react';
<PipeQueryBuilder
datasets={datasets}
onQueryChange={(query) => console.log(query)}
/>

A visual pipeline builder component for constructing queries with drag-and-drop.
Peer dependencies: react, react-dom, @mui/material, @emotion/react, @emotion/styled, @mui/icons-material
Browser Support
Works in all modern browsers and Node.js 18+. The core engine uses only standard ES2020 features.
Contributing
Contributions are welcome! Please open an issue or submit a pull request.
git clone https://github.com/andreadito/pipequery.git
cd pipequery
npm install
npm test

License
MIT © andreadito
