@trap_stevo/cynq
v0.0.14
🧩 @trap_stevo/cynq
Automation Evolved.
The next-generation, event-driven, real-time CI/CD and pipeline orchestration engine.
Unifying pipelines, triggers, deployments, metrics, and real-time observability into a single, composable core — powering ultra-dynamic continuous integration and delivery workflows across any environment.
🚀 Features
- 🧠 Pipeline-as-Code – Define fully declarative, composable pipelines that run anywhere
- ⚡ Real-Time Engine – React instantly to events, webhooks, and remote triggers
- 🕹️ Dynamic Drivers – Register sources, runners, triggers, and deployers with simple interfaces
- 🌐 Cynq Route Engine – Secure HTTP layer for remote enqueues and external control
- 📡 Remote Execution Ready – Dispatch builds or deployments to other instances
- 🔁 Queue & Retry Logic – Resilient queue system with per-tenant isolation and idempotency
- 🔒 Security-First Architecture – HMAC / JWT auth, replay protection, allowlists, and TTL enforcement
- 📊 Integrated Metrics – Seamless telemetry for build time, deploy time, and success ratios
- 🧩 Vault Integration – Secret and credential management for pipelines
- 🛰️ Multi-Target Sync Loops – Orchestrate multiple pipelines in parallel
- 🧹 Graceful Shutdowns – Unified `close()` method to stop routes, triggers, and sync loops safely
⚙️ System Requirements
| Requirement | Version |
|-------------|---------|
| Node.js | ≥ 19.x |
| npm | ≥ 9.x |
| OS | Windows, macOS, Linux |
🧩 Built-In Drivers
| Kind | Driver | Description |
|-----------|--------------------------|-----------------------------------------------------------------|
| source | git | Clone repositories (HTTPS / SSH) |
| source | http-archive | Download and extract a .tgz from a URL |
| source | local-folder | Use an existing local directory as source |
| runner | shell | Execute local shell scripts or commands |
| runner | exec | Spawn any binary with args (no shell) |
| runner | node | Run a Node.js script/entrypoint with args |
| runner | delay | Simple wait/sleep step |
| deployer| copy-folder | Copy build artifacts between directories |
| deployer| atomic-replace-folder| Overlay/mirror in-place updates (skip-unchanged, native fast paths) |
| deployer| ssh-rsync | Deploy files over SSH using rsync |
| deployer| remote-exec | Run a shell script on a remote host via SSH |
| deployer| cynq-enqueue | Enqueue a job on this Cynq instance |
| deployer| remote-cynq-enqueue | Enqueue a job on a remote Cynq instance |
| trigger | webhook | Listen for GitHub, Gitea, or GitLab webhooks |
| trigger | poll | Poll remote HTTP endpoints or files for changes |
| trigger | fs-watch | React to filesystem events using Vigilite |
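Several of the built-in drivers above compose into a single pipeline spec. A minimal illustrative sketch (the repository URL and paths are placeholders, not a real project):

```javascript
// Illustrative spec wiring three built-in drivers; repo URL and paths are placeholders.
const spec = {
  name : "app-ci",
  pipeline : {
    // source: clone the repository into the workspace
    source : { driver : "git", with : { repo : "https://github.com/org/app.git", branch : "main" } },
    steps : [
      // runner: build inside the materialized source
      { kind : "run", runner : "shell", with : { script : "npm ci && npm run build" } },
      // deployer: copy build artifacts into place
      { kind : "deploy", deployer : "copy-folder", with : { from : "./dist", to : "/srv/app" } }
    ]
  }
};
```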
source: git
Clone a Git repository into the workspace.
Capabilities
- Protocols: `ssh`, `https`
- Non-interactive by default (`GIT_TERMINAL_PROMPT=0`, `GIT_ASKPASS=echo`)
- Shallow, single-branch, blob-filtered clones for speed
- Basic retry on transient failures
- Credentials are not persisted (`credential.helper=""`)
- Errors redact tokens in logs
Config
- `with.repo` (string, required) — HTTPS or SSH URL, e.g. `https://github.com/org/repo.git` or `git@github.com:org/repo.git`
- `with.branch` (string, default: `"main"`) — branch to clone
- `with.depth` (number, default: `1`) — shallow clone depth
- `with.token` (string, optional) — HTTPS PAT (supports `vault:`). Injected as `https://x-access-token:TOKEN@…`
- `with.username` (string, default: `"x-access-token"`) — HTTPS username used with `token`
- `with.sshKeyPath` (string, optional) — path to a private key for SSH clones
- `with.strictHostKeyChecking` (boolean, default: `false`) — SSH `StrictHostKeyChecking` toggle
- `with.commit` (string, optional) — specific commit/tag/sha to check out; falls back to a targeted fetch if outside shallow history
- `with.submodules` (boolean, default: `false`) — init/update submodules recursively (shallow)
- `with.sparse` (string[], optional) — sparse checkout paths (applied after clone, before checkout)
- `with.clean` (boolean, default: `false`) — remove the destination workspace folder before cloning
- `with.lfs` (`"skip"` | `true` | `false`, default: `false`) — `skip`: don't smudge LFS; `true`: run `git lfs install --local`
- If both `token` and `sshKeyPath` are provided, SSH takes precedence.
- (env) Honors standard proxy/CA env vars: `HTTPS_PROXY`, `HTTP_PROXY`, `NO_PROXY`, `SSL_CERT_FILE`, `GIT_SSL_NO_VERIFY`
Behavior notes
- Clones with: `--single-branch --depth <n> --filter=blob:none`
- If `commit` is specified and not in shallow history, performs `fetch origin <commit> --depth 1`, then checks out
- After clone, sets `credential.helper=""` to avoid storing credentials
- Logs redact `https://user:***@host/...` to protect tokens
Security
- Using `token` injects the PAT into the clone URL (briefly visible in process args). For zero-exposure, prefer SSH keys or an ASKPASS helper.
Examples

HTTPS with vault-backed PAT:

```js
{
  driver: "git",
  with: {
    repo: "https://github.com/Schoolconomy-Inc/schoolconomy-universal-interface.git",
    branch: "main",
    depth: 1,
    token: "vault:github.deploy.token" // stored via Cynq secrets
  }
}
```

SSH with deploy key:

```js
{
  driver: "git",
  with: {
    repo: "git@github.com:org/app.git",
    branch: "main",
    depth: 1,
    sshKeyPath: "/home/cynq/id_ed25519",
    strictHostKeyChecking: true
  }
}
```

Exact commit + sparse checkout + submodules:

```js
{
  driver: "git",
  with: {
    repo: "https://github.com/org/mono.git",
    branch: "main",
    depth: 1,
    token: "vault:github.deploy.token",
    commit: "b3f1c0de",
    sparse: ["packages/service-a/", "packages/common/"],
    submodules: true
  }
}
```

source: http-archive
Download a .tgz and extract to a temp workspace directory.
Config
- `with.url` (string, required) — direct URL to a `.tgz`
- Output path returns via `{ path }` from `materialize`

Example

```js
{ driver: "http-archive", with: { url: "https://example.com/builds/app.tgz" } }
```

source: local-folder
Use an existing local directory.
Config
- `with.path` (string, required) — absolute or relative path

Example

```js
{ driver: "local-folder", with: { path: "./local-src" } }
```

runner: shell
Run a shell command locally.
Config
- `with.script` (string, required) — shell command to execute
- `with.workdir` (string, optional) — defaults to `ctx.paths.src`
- `with.env` (object, optional) — env vars to merge

Example

```js
{ kind: "run", runner: "shell", with: { script: "npm ci && npm test", workdir: "${{ ctx.paths.src }}", env: { NODE_ENV: "test" } } }
```

runner: exec
Spawn a process directly (no shell).
Config
- `with.cmd` (string, required) — executable name/path
- `with.args` (string[], optional) — CLI args
- `with.workdir` (string, optional) — defaults to `ctx.paths.src`
- `with.env` (object, optional) — env vars to merge

Example

```js
{ kind: "run", runner: "exec", with: { cmd: "node", args: ["--version"] } }
```

runner: node
Run Node.js with the provided args (uses `process.execPath`).
Config
- `with.args` (string[], optional) — e.g. `["scripts/build.js", "--flag"]`
- `with.workdir` (string, optional) — defaults to `ctx.paths.src`
- `with.env` (object, optional) — env vars to merge

Example

```js
{ kind: "run", runner: "node", with: { args: ["scripts/build.js"], workdir: "${{ ctx.paths.src }}" } }
```

runner: delay
Sleep for `ms` milliseconds.
Config
- `with.ms` (number, required) — milliseconds to wait

Example

```js
{ kind: "run", runner: "delay", with: { ms: 2000 } }
```

deployer: copy-folder
Copy a folder’s contents into a destination (the top folder itself is not nested inside the destination; its entries are copied recursively).
Config
- `with.from` (string, required) — source directory
- `with.to` (string, required) — destination directory (created if missing)

Example

```js
{ kind: "deploy", deployer: "copy-folder", with: { from: "./dist", to: "/srv/app" } }
```

deployer: atomic-replace-folder
Fast, robust in-place updates; supports overlay/mirror, skip-unchanged, and native fast paths.
Config
- `with.from` (string, required) — built output directory
- `with.to` (string, required) — live implementation directory
- `with.mode` (`"overlay"` | `"mirror"`, default: `"overlay"`) — merge or mirror
- `with.keepBackup` (boolean, default: false) — keep `.bak`
- `with.skipUnchanged` (boolean, default: true) — size/mtime heuristic
- `with.native` (`"auto"` | `"off"` | `"robocopy"` | `"rsync"`, default: `"auto"`)
- `with.concurrency` (number, default: 64) — internal copier workers
- `with.move` (boolean, default: false) — delete source after copy

Example

```js
{ kind: "deploy", deployer: "atomic-replace-folder", with: { from: "./build-output", to: "/opt/cynq-implementation", mode: "mirror", keepBackup: true } }
```

deployer: ssh-rsync
Deploy over SSH using rsync.
Config
- `with.host` (string, required)
- `with.user` (string, required)
- `with.from` (string, required) — local folder
- `with.to` (string, required) — remote folder
- `with.keyPath` (string, optional) — SSH private key
- `with.args` (string[], optional) — extra rsync args (adds to `-az --delete`)

Example

```js
{ kind: "deploy", deployer: "ssh-rsync", with: { host: "10.0.0.5", user: "deploy", from: "./dist", to: "/var/www/app", keyPath: "/home/cynq/id_ed25519" } }
```

deployer: remote-exec
Execute a script on a remote host via SSH.
Config
- `with.host` (string, required)
- `with.user` (string, required)
- `with.script` (string, required) — remote command/script
- `with.keyPath` (string, optional) — SSH private key
- `with.env` (object, optional) — exported before the script runs

Example

```js
{ kind: "deploy", deployer: "remote-exec", with: { host: "10.0.0.5", user: "deploy", keyPath: "/home/cynq/id_ed25519", script: "sudo systemctl restart myapp", env: { NODE_ENV: "production" } } }
```

deployer: cynq-enqueue
Enqueue a job on this Cynq instance.
Config
- `with.project` (string, required)
- `with.target` (string, required)
- `with.payload` (object, optional)

Example

```js
{ kind: "deploy", deployer: "cynq-enqueue", with: { project: "web", target: "smoke-tests", payload: { version: "1.2.3" } } }
```

deployer: remote-cynq-enqueue
Enqueue a job on a remote Cynq instance via HTTP.
Config
- `with.url` (string, required) — base URL of the remote Cynq
- `with.project` (string, required)
- `with.target` (string, required)
- `with.auth` (string, optional) — e.g., `Bearer ...`
- `with.payload` (object, optional)

Example

```js
{ kind: "deploy", deployer: "remote-cynq-enqueue", with: { url: "https://cynq.example.com", project: "frontend", target: "production", auth: "Bearer XYZ", payload: { version: "1.2.3" } } }
```

trigger: webhook
HTTP webhook listener for GitHub / Gitea / Gogs / GitLab events (HMAC/JWT compatible via headers).
Config
- `with.path` (string, default: `"/cynq-webhook"`)
- `with.port` (number, default: `3100`)
- `with.secret` (string, optional) — HMAC/shared secret
- `with.authorization` (string, optional) — exact match for the `Authorization` header

Example

```js
{ driver: "webhook", with: { path: "/cynq-update", port: 3100, secret: "shared-secret" } }
```

trigger: poll
Poll HTTP endpoints or a file for changes, then enqueue.
Config (HTTP mode)
- `with.mode` (`"http"`)
- `with.url` (string, required)
- `with.method` (string, default: `"GET"`)
- `with.headers` (object, optional)
- `with.body` (string, optional)
- `with.onlyOnChange` (boolean, default: true) — ETag/Last-Modified/hash
- `with.includeBody` (boolean, default: false) — attach parsed body to event
- `with.okStatusLow` (number, default: 200)
- `with.okStatusHigh` (number, default: 299)
- `with.maxBodyBytes` (number, default: 65536)
- `with.select` (string, optional) — dot path inside JSON body
- `with.intervalMS` (number, default: 60000)
- `with.jitterMS` (number, default: 0)
- `with.initialDelayMS` (number, default: 0)
- `with.errorBackoffMS` (number, default: 5000)
- `with.enqueueOnError` (boolean, default: false)
- `with.enqueueOnException` (boolean, default: false)
- `with.payload` (object, optional) — passthrough

Config (File mode)
- `with.mode` (`"file"`)
- `with.file` (string, required)
- `with.hash` (boolean, default: false) — include content hash in change detection
- plus: `intervalMS`, `jitterMS`, `initialDelayMS`, `errorBackoffMS`, `payload`

Example (HTTP)

```js
{ driver: "poll", with: { mode: "http", url: "https://status.example.com/api/version", intervalMS: 300000, includeBody: true, select: "stable.version" } }
```

Example (File)

```js
{ driver: "poll", with: { mode: "file", file: "/var/artifacts/build.zip", hash: true, intervalMS: 10000 } }
```

trigger: fs-watch
Watch filesystem changes using Vigilite and enqueue batches.
Config
- `with.path` (string|string[], required) — directory/directories to watch
- `with.extensions` (string[], default: `["*"]`) — filter by extension
- `with.debounceMS` (number, default: 200) — batch debounce
- `with.maxWaitMS` (number, default: 2000) — batch max wait
- (advanced) `with.onSignalIf` (fn), `with.onReveal` (fn) — optional hooks

Example

```js
{ driver: "fs-watch", with: { path: ["/srv/app", "/srv/lib"], extensions: ["js","json"], debounceMS: 300 } }
```

Registering a custom driver:
```js
engine.registerDriver("runner", "python", (ctx) => ({
  start : async ({ with : w }) => {
    const { spawn } = require("child_process");
    await new Promise((resolve, reject) =>
      spawn("python", [w.script], { stdio : "inherit" })
        .on("error", reject)
        .on("exit", (code) => (code === 0 ? resolve() : reject(new Error(`python exited with code ${code}`))))
    );
  }
}));
```

⏱️ Pipeline & Step Delays
Configure delays around the whole pipeline and around individual steps.
Pipeline-level delays
Under `pipeline.delay`:
- `beforeMs` (number, default: 0) — wait before the first step
- `betweenStepsMs` (number, default: 0) — wait between steps
- `afterMs` (number, default: 0) — wait after the last step (runs on success and failure, before cleanup)
Example

```json
"pipeline": {
  "source": { "driver": "git", "with": { "repo": "https://github.com/org/app" } },
  "delay": { "beforeMs": 500, "betweenStepsMs": 250, "afterMs": 1000 },
  "steps": [ ... ]
}
```

Step-level delays
Each run and deploy step accepts:
- `with.delayBeforeMs` (number, default: 0)
- `with.delayAfterMs` (number, default: 0)
Example

```js
{ kind: "run", name: "tests", runner: "exec",
  with: { cmd: "npm", args: ["test"], delayBeforeMs: 500, delayAfterMs: 250 } }
```

🧹 Workspace Cleanup
Apply cleanup through spec settings or per-step requests.
1) Automatic cleanup (spec-driven)
Under `pipeline.autoCleanup`:
- `on` (`"never"` | `"always"` | `"success"` | `"failure"`, default: `"never"`)
- `keep` (boolean, default: false) — when `true`, log that cleanup got skipped
Example

```json
"pipeline": {
  "autoCleanup": { "on": "success", "keep": false },
  "source": {...},
  "steps": [...]
}
```

2) Step-requested cleanup (imperative)
Steps may request cleanup via `with.requestCleanup`:
- `"now"` — cleanup immediately after the step
- `"finally"` — cleanup at the very end (forces cleanup regardless of `autoCleanup.on`)
Example

```js
{ kind: "deploy", name: "publish", deployer: "copy-folder",
  with: { from: "./dist", to: "/srv/app", requestCleanup: "finally" } }
```

Order on success (simplified):
- Optional `pipeline.delay.afterMs`
- Cleanup when `autoCleanup` allows or any step requested `"finally"`

Order on failure:
- Run `on.failure` actions and emit failure event
- Optional `pipeline.delay.afterMs`
- Cleanup when `autoCleanup` allows or any step requested `"finally"`
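The cleanup rules above amount to a small decision function. A simplified sketch of that logic (not Cynq's actual implementation; it assumes a step-requested `"finally"` overrides `keep`):

```javascript
// Simplified sketch of the cleanup decision; assumption: "finally" wins over keep.
function shouldCleanup(autoCleanup = {}, outcome, stepRequestedFinally) {
  if (stepRequestedFinally) { return true; }   // "finally" forces cleanup
  if (autoCleanup.keep) { return false; }      // keep: true skips (and logs) cleanup
  const on = autoCleanup.on || "never";
  return on === "always" || on === outcome;
}

shouldCleanup({ on : "success" }, "success", false); // → true
shouldCleanup({ on : "success" }, "failure", false); // → false
shouldCleanup({ on : "never" }, "failure", true);    // → true
```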
📁 Path Usage & Resolution
Use these rules to avoid path confusion.
Context paths
- `ctx.paths.src` — materialized source directory from `pipeline.source`
- `ctx.paths.workspace` — workspace root for the attempt (often the parent of `src`)

Resolution rules
- Runners: `with.workdir` defaults to `ctx.paths.src`
- Deployers: `with.from` resolves relative to `ctx.paths.src` when not absolute; otherwise used as given. `with.to` treats the value as an absolute destination.

http-archive source
- When extraction yields a single top-level folder, `materialize()` returns that folder as `path`
- Otherwise `materialize()` returns the extracted root directory
- In both cases, `workspace` points at the temp workspace base
Cleanup safety
- Deletion occurs only within the configured workspace base. When a path falls outside the base, Cynq refuses deletion.
Examples

```js
// Runner defaults to src as workdir
{ kind: "run", runner: "shell", with: { script: "npm run build" } }

// Deployer resolves 'from' relative to src
{ kind: "deploy", deployer: "copy-folder", with: { from: "./dist", to: "/srv/app" } }
```

🧠 Driver Authoring
Cynq enables fully modular extensions through drivers — small, isolated units that define custom logic for different stages of the pipeline lifecycle.
Drivers transform how builds run, deploy, trigger, and source artifacts across environments.
⚙️ Driver Fundamentals
| Concept | Description |
|----------|-------------|
| Driver | A pluggable logic unit that executes during a specific lifecycle phase (runner, deployer, trigger, source). |
| Registration | Drivers register with registerDriver(category, name, factoryFn) and expose lifecycle hooks. |
| Factory Function | Returns an object defining async lifecycle methods such as start, activate, or fetch. |
| Schema Contract | Each driver consumes a with object — user-defined parameters validated before execution. |
| Sandbox | Drivers run in isolated async contexts; timeouts and I/O guards ensure safe parallel execution. |
| Context Object (ctx) | Provides runtime utilities such as logging, storage, event emission, and vault access. |
🧰 Shared Context Reference
| Property | Type | Description |
|-----------|------|-------------|
| ctx.logger | Function | Structured log emitter; supports { level, msg, data }. |
| ctx.emit(event, data) | Function | Emits custom runtime events. |
| ctx.enqueue(target, payload, opts?) | Function | Adds new jobs dynamically to a target queue. |
| ctx.storage | CynqStorageEngine | Persistent store for queue, attempts, secrets, KV, and approvals. |
| ctx.vault | Vault | Abstracted key-value backend (may represent local, S3, SQL, etc.). |
| ctx.abortSignal | AbortSignal | Enables cooperative cancellation. |
| ctx.meta | Object | Contains metadata for the current project, tenant, and pipeline. |
| ctx.env | Object | Environment variables scoped to current job. |
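As a sketch of how these context utilities combine inside a driver, the hypothetical runner below (driver name and `with` fields invented for illustration) uses `ctx.logger`, `ctx.emit`, and `ctx.abortSignal`, and is exercised against a stub context so no engine is required:

```javascript
// Hypothetical runner driver exercising ctx.logger, ctx.emit, and ctx.abortSignal.
const lintDriver = (ctx) => ({
  start : async ({ with : w }) => {
    if (!Array.isArray(w?.files)) { throw new Error("lint requires with.files"); }
    for (const file of w.files) {
      if (ctx.abortSignal?.aborted) { return; }            // cooperative cancellation
      ctx.logger?.({ level : "info", msg : "linting", data : { file } });
    }
    ctx.emit?.("lint:done", { count : w.files.length });   // custom runtime event
  }
});

// Exercise the driver with a stub context:
const events = [];
const stubCtx = {
  logger : () => {},
  emit : (evt, data) => events.push({ evt, data }),
  abortSignal : { aborted : false }
};
lintDriver(stubCtx).start({ with : { files : ["a.js", "b.js"] } });
// events → [{ evt: "lint:done", data: { count: 2 } }]
```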
🏃 Runner Drivers
Runner drivers execute actual work — builds, tests, packaging, or orchestration steps.
| Lifecycle Method | Description | Async |
|------------------|--------------|--------|
| start({ with : w }, ctx) | Begin job execution using configuration w. | ✅ |
| stop(ctx) | Optional cleanup logic or abort signal handling. | ✅ |
Example

```js
cynq.registerDriver("runner", "shell-task", (ctx) => ({
  start : async ({ with : w }) => {
    const { exec } = require("child_process");
    if (!w?.cmd) { throw new Error("Missing cmd parameter"); }
    await new Promise((resolve, reject) => {
      exec(w.cmd, { cwd : w.cwd || process.cwd(), env : { ...process.env, ...w.env } }, (err, stdout, stderr) => {
        if (err) reject(stderr || err);
        else { ctx.logger?.(`stdout: ${stdout}`); resolve(); }
      });
    });
  }
}));
```

Common `with` Parameters
| Key | Type | Description |
|------|------|-------------|
| cmd | string | Shell command to execute. |
| cwd | string | Optional working directory. |
| env | object | Optional environment overrides. |
🚀 Deployer Drivers
Deployer drivers publish or distribute artifacts — pushing to registries, cloud storage, or remote systems.
| Lifecycle Method | Description | Async |
|------------------|--------------|--------|
| activate({ with : w }, ctx) | Deploy artifacts or assets to remote target. | ✅ |
| rollback(ctx) | Optional rollback or cleanup handler. | ✅ |
Example

```js
cynq.registerDriver("deployer", "artifact-uploader", (ctx) => ({
  activate : async ({ with : w }) => {
    const fs = require("fs");
    const axios = require("axios");
    if (!w?.url || !w?.filePath) {
      throw new Error("artifact-uploader requires url and filePath");
    }
    const data = fs.createReadStream(w.filePath);
    await axios.post(w.url, data, {
      headers : {
        "Content-Type" : "application/octet-stream",
        Authorization : w.auth || ""
      }
    });
    ctx.logger?.(`Uploaded ${w.filePath} to ${w.url}`);
  }
}));
```

🌐 Trigger Drivers
Trigger drivers listen for external or scheduled events and enqueue new jobs dynamically.
| Lifecycle Method | Description | Async |
|------------------|--------------|--------|
| start({ with : w }, enqueue) | Start event listener or schedule; call enqueue() when triggered. | ✅ |
| stop(ctx) | Optional stop handler to clean up listener. | ✅ |
Example

```js
cynq.registerDriver("trigger", "webhook", (ctx) => ({
  start : async ({ with : w }, enqueue) => {
    const express = require("express");
    const app = express();
    app.use(express.json());
    const route = w.path || "/hook";
    const port = w.port || 3100;
    app.post(route, async (req, res) => {
      await enqueue({ event : "webhook", payload : req.body });
      res.json({ ok : true });
    });
    app.listen(port, () => ctx.logger?.(`Webhook listening on :${port}${route}`));
  }
}));
```

📦 Source Drivers
Source drivers fetch repositories, packages, or other input materials before a build or deployment begins.
| Lifecycle Method | Description | Async |
|------------------|--------------|--------|
| fetch({ with : w }, ctx) | Acquire source content and return local path reference. | ✅ |
Example

```js
cynq.registerDriver("source", "git-clone", (ctx) => ({
  fetch : async ({ with : w }) => {
    const { execSync } = require("child_process");
    if (!w?.repo) throw new Error("Missing repo URL");
    const dir = w.dir || `./workspace-${Date.now()}`;
    execSync(`git clone ${w.repo} ${dir}`, { stdio : "inherit" });
    ctx.logger?.(`Cloned ${w.repo} into ${dir}`);
    return dir;
  }
}));
```

🧩 Driver Safety Guidelines
| Guideline | Purpose |
|------------|----------|
| Validate with object early | Prevent undefined behavior. |
| Handle exceptions cleanly | Throw structured errors for proper reporting. |
| Use context utilities | For logging, metrics, vault, and queue operations. |
| Avoid global state | Drivers run in parallel; isolation avoids conflicts. |
| Respect abort signals | Check ctx.abortSignal.aborted for cooperative termination. |
| Secure external calls | Validate URLs, use HTTPS, enforce small payloads. |
🧠 Advanced Driver Patterns
1. Composable Drivers
Use drivers that delegate to other registered drivers internally.
```js
cynq.registerDriver("runner", "composite", (ctx) => ({
  start : async ({ with : w }) => {
    for (const step of w.steps) {
      await ctx.enqueue(step.target, step.payload);
    }
  }
}));
```

2. Stateful Deployers

Maintain incremental state via `ctx.storage.kvFacade()`.

```js
const kv = ctx.storage.kvFacade("deploy", "artifact");
await kv.put("lastVersion", w.version);
```

🧩 Pipeline Spec Reference
The Pipeline Spec defines how a project builds, tests, deploys, and reacts to events.
Every spec describes sources, steps, environments, and follow-up actions under a unified JSON structure.
🧱 Top-Level Schema
| Key | Type | Description |
|------|------|-------------|
| name | string | Logical pipeline identifier. |
| triggers | array<object> | List of trigger definitions (e.g., webhook, cron, manual). |
| pipeline | object | Core job structure including source, steps, and on. |
| env | object | Static environment variables injected into every driver. |
| matrix | object | Optional parameter expansion to run multiple variants. |
| secrets | object | Vault-backed secret reference map. |
| description | string | Optional pipeline documentation text. |
Example

```json
{
  "name": "backend-prod",
  "triggers": [
    { "driver": "webhook", "with": { "path": "/github", "port": 3100, "secret": "shared" } }
  ],
  "pipeline": {
    "source": { "driver": "git-clone", "with": { "repo": "https://github.com/org/app.git", "branch": "main" } },
    "steps": [
      { "kind": "run", "name": "build", "runner": "shell-task", "with": { "cmd": "npm ci && npm run build" } },
      { "kind": "deploy", "name": "publish", "deployer": "artifact-uploader", "with": { "url": "https://cdn.example.com/upload", "filePath": "./dist.zip" } }
    ],
    "on": {
      "success": [
        { "kind": "deploy", "deployer": "cynq-enqueue", "with": { "project": "web", "target": "smoke-tests" } }
      ],
      "failure": [
        { "kind": "run", "runner": "shell-task", "with": { "cmd": "bash scripts/rollback.sh" } }
      ]
    }
  },
  "env": {
    "NODE_ENV": "production",
    "REGION": "us-east-1"
  },
  "matrix": {
    "node": ["18", "20"],
    "region": ["us-east-1", "eu-west-1"]
  },
  "secrets": {
    "GITHUB_TOKEN": "vault:deploy.github"
  }
}
```

⚙️ triggers[]
Defines what initiates the pipeline.
Each trigger uses a registered driver and optional configuration.
| Key | Type | Description |
|------|------|-------------|
| driver | string | Trigger driver name (e.g., "webhook", "cron", "manual"). |
| with | object | Parameters specific to the driver. |
| filter | object | Optional condition (branch, event type). |
Example

```json
{ "driver": "webhook", "with": { "path": "/hook", "secret": "abc123" } }
```

🔗 pipeline.source
Defines how to retrieve or prepare the source materials before the build starts.
| Key | Type | Description |
|------|------|-------------|
| driver | string | Source driver name ("git-clone", "fetch-archive", etc.). |
| with | object | Source configuration (repository URL, branch, token, etc.). |
| cache | boolean | Enables reuse of previous checkouts if unchanged. |
Example

```json
{ "driver": "git-clone", "with": { "repo": "https://github.com/org/app.git", "branch": "main" } }
```

🧩 pipeline.steps[]
Describes ordered tasks inside the pipeline.
Each step specifies what to run, deploy, or trigger next.
| Key | Type | Description |
|------|------|-------------|
| kind | string | "run", "deploy", "fetch", or "custom". |
| name | string | Human-readable identifier for the step. |
| runner / deployer | string | Driver name used for the step. |
| with | object | Configuration passed to the driver. |
| continueOnError | boolean | Whether subsequent steps execute after failure. |
| timeoutMs | number | Optional timeout per step. |
Example

```json
{ "kind": "run", "name": "build", "runner": "shell-task", "with": { "cmd": "npm run build" } }
```

🔁 pipeline.on
Defines follow-up actions based on the result of the main pipeline execution.
| Key | Type | Description |
|------|------|-------------|
| success | array<object> | Steps to execute if the pipeline completes successfully. |
| failure | array<object> | Steps to execute on any failure. |
| always | array<object> | Steps that always run at the end regardless of outcome. |
Example
"on": {
"success": [
{ "kind": "deploy", "deployer": "cynq-enqueue", "with": { "project": "web", "target": "smoke-tests" } }
],
"failure": [
{ "kind": "run", "runner": "shell-task", "with": { "cmd": "bash rollback.sh" } }
]
}🌍 env
Defines environment variables that apply globally to all steps.
Can be overridden per step via its own with.env.
| Key | Type | Description |
|------|------|-------------|
| any | string | Environment variable name/value pairs. |
Example
"env": {
"NODE_ENV": "production",
"LOG_LEVEL": "debug"
}🧮 matrix
Generates multiple parallel pipeline runs for parameter combinations.
Each key defines an axis with possible values.
| Key | Type | Description |
|------|------|-------------|
| axis | array<string> | Each array defines possible values for that variable. |
Example

```json
"matrix": {
  "node": ["18", "20"],
  "region": ["us-east-1", "eu-west-1"]
}
```

This expands into four runs:
(18, us-east-1), (18, eu-west-1), (20, us-east-1), (20, eu-west-1).
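The expansion is a cartesian product over the axes. A minimal sketch of the idea (Cynq's internal implementation may differ):

```javascript
// Cartesian-product expansion of a matrix into run variants (illustrative sketch).
function expandMatrix(matrix) {
  return Object.entries(matrix).reduce(
    (runs, [axis, values]) =>
      runs.flatMap((run) => values.map((v) => ({ ...run, [axis] : v }))),
    [{}]
  );
}

const runs = expandMatrix({ node : ["18", "20"], region : ["us-east-1", "eu-west-1"] });
// runs.length === 4; runs[0] → { node: "18", region: "us-east-1" }
```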
🔐 secrets
Lists vault references used in the pipeline.
Each secret resolves securely at runtime through the configured vault backend.
| Key | Type | Description |
|------|------|-------------|
| secretName | string | Vault reference in the form vault:key.path. |
Example

```json
"secrets": {
  "GITHUB_TOKEN": "vault:deploy.github",
  "DOCKER_PASSWORD": "vault:docker.pass"
}
```

When the pipeline runs, these values resolve securely at runtime.
🧠 Execution Flow Summary
1️⃣ Trigger fires (webhook, manual, cron, etc.)
2️⃣ Source driver fetches code or assets
3️⃣ Steps execute sequentially or in matrix form
4️⃣ Environment and secrets inject automatically
5️⃣ On success/failure handlers run
6️⃣ Metrics, logs, and states persist securely through the configured storage backend
Every source, step, and secret becomes structured, reproducible, and composable.
🧭 Cynq Core Methods
| Method | Description | Async |
|--------|--------------|-------|
| deploy(project, target, spec, ctx) | Execute a pipeline immediately | ✅ |
| planPipeline(project, spec, ctx) | Preview a pipeline plan before execution | ✅ |
| previewPipeline(project, spec, ctx) | Produce a summarized execution preview | ✅ |
| validatePipeline(spec) | Validate a pipeline definition | ✅ |
| startTriggers(project, spec, ctx) | Start trigger listeners | ✅ |
| runOnce(project, target, spec, ctx) | Run a single queued job manually | ✅ |
| sync(project, target, spec, ctx) | Continuously process jobs for a target | ✅ |
| selfUpdate(overrides?, callCtx?, opts?) | Updates the running Cynq implementation (build → deploy → optional restart) | ✅ |
| registerDriver(kind, name, factory) | Register a custom driver | ❌ |
| resolve(kind, name, ctx) | Retrieve a driver instance by name | ❌ |
| startRoutes(override?) | Start the route engine | ✅ |
| stopRoutes() | Stop the route engine | ✅ |
| close() | Stop all routes, triggers, and loops safely | ✅ |
🧹 Graceful Shutdown
```js
process.on("SIGINT", async () => {
  await engine.close();
  process.exit(0);
});
```

Stops all routes, trigger listeners, and active synchronization loops cleanly.
🌐 Cynq Route Engine
Overview
Cynq Route Engine exposes a minimal, secure HTTP interface for remote interaction and job enqueueing.
| Route | Description |
|--------|--------------|
| POST /enqueue | Enqueue a new pipeline job remotely |
Configuration Example
```js
engine : {
  routes : {
    enabled : true,
    autoStart : true,
    port : 3333,
    hmacSecret : "super-secret",
    ipAllowlist : ["10.0.0.0/24"],
    maxBytes : "5mb",
    rate : { capacity : 100, refillPerSec : 5 }
  }
}
```

Security Checklist
✅ HMAC or JWT authentication
✅ Replay protection (X-Timestamp, nonce, short TTL)
✅ IP allowlist and CIDR support
✅ Content-type and size limits
✅ Rate limiting and quotas
✅ Idempotency via Idempotency-Key header
🔄 Remote Communication
Remote Enqueue Example
```json
{
  "steps": [
    {
      "kind": "deploy",
      "name": "trigger-next",
      "deployer": "remote-cynq-enqueue",
      "with": {
        "url": "https://cynq-node-b.example.com",
        "project": "frontend",
        "target": "production",
        "auth": "Bearer xyz123",
        "payload": { "version": "1.2.0" }
      }
    }
  ]
}
```

🛰️ Cynq Event Reference
Cynq emits structured events for real-time dashboards, external integrations, or telemetry pipelines.
Events follow a consistent envelope:
```js
{
  "event": "cynq:step:ok",
  "project": "frontend",
  "target": "production",
  "timestamp": 1734819100000,
  "data": { /* event-specific payload */ }
}
```

🔔 Core Lifecycle Events
| Event | Description | Payload Fields |
|--------|-------------|----------------|
| cynq:trigger:received | Trigger signal enters queue | { trigger, headers, source } |
| cynq:pipeline:start | Pipeline begins execution | { name, project, target } |
| cynq:pipeline:ok | Pipeline completes successfully | { name, project, target, durationMs } |
| cynq:pipeline:fail | Pipeline fails at any step | { name, project, target, reason } |
| cynq:sync:start | Sync loop starts for target | { project, target } |
| cynq:sync:stop | Sync loop stops for target | { project, target } |
| cynq:queue:enqueue | Job enters queue | { project, target, jobId } |
| cynq:queue:dequeue | Job leaves queue for execution | { project, target, jobId } |
| cynq:queue:done | Queued job finishes | { project, target, jobId, status } |
⚙️ Step-Level Events
| Event | Description | Payload Fields |
|--------|-------------|----------------|
| cynq:step:start | Step execution begins | { step, kind, name, driver } |
| cynq:step:ok | Step completes successfully | { step, durationMs, driver } |
| cynq:step:fail | Step throws error or non-zero exit | { step, reason, driver } |
| cynq:step:retry | Step retries after transient failure | { step, attempt, reason } |
| cynq:step:skipped | Step bypassed due to condition | { step, reason } |
🚀 Deployment Events
| Event | Description | Payload Fields |
|--------|-------------|----------------|
| cynq:deploy:start | Deployment begins | { deployer, target, project } |
| cynq:deploy:ok | Deployment completes successfully | { deployer, target, durationMs } |
| cynq:deploy:fail | Deployment fails | { deployer, target, reason } |
| cynq:deploy:chain | Remote enqueue triggered | { deployer, nextProject, nextTarget } |
🧩 Diagnostic & Audit Events
| Event | Description | Payload Fields |
|--------|-------------|----------------|
| cynq:driver:load | Driver registers successfully | { kind, name } |
| cynq:driver:error | Driver registration fails | { kind, name, error } |
| cynq:storage:lock:acquire | Lock acquired | { key, namespace } |
| cynq:storage:lock:release | Lock released | { key, namespace } |
| cynq:storage:kv:set | Key written to store | { key, namespace } |
| cynq:storage:kv:delete | Key removed from store | { key, namespace } |
📊 Metrics Integration
All metric events flow through the same channel.
Metrics describe runtime timing, counts, and performance distribution across pipelines.
| Metric Key | Description |
|-------------|-------------|
| attempt.start | Marks job start |
| attempt.ok | Marks successful completion |
| attempt.fail | Marks job failure |
| step.time.ms | Measures duration of each step |
| deploy.time.ms | Measures deployment duration |
Example Metric Stream
```json
{
  "event": "metric",
  "metric": "step.time.ms",
  "labels": { "project": "frontend", "step": "build" },
  "value": 5234
}
```

🧠 Realtime Consumption
Consume events from the runtime emitter or integrate IoTide for networked streams:
```js
engine.on("event", (evt, data) => {
    console.log("[event]", evt, data);
});

// or with IoTide
engine.realtime.emit("cynq:step:ok", { step : "build", durationMs : 5123 });
```

All events preserve deterministic naming (`cynq:*`) for consistent filtering across observability systems such as Grafana, Prometheus, or custom dashboards.
🪝 Realtime Hook Patterns
Cynq exposes an event stream for live monitoring, logging, and notification systems.
Hooks can run inline, forward to third-party services, or store in custom observability backends.
🔧 Basic Listener
Attach directly to the realtime emitter to react to all events.
```js
const { Cynq } = require("@trap_stevo/cynq");

const engine = new Cynq({
    realtime : { emit : (evt, data) => console.log("[event]", evt, data) }
});
```

For targeted subscriptions, subscribe to a specific event name:
```js
engine.on("cynq:step:ok", (data) => {
    console.log(`[ok] ${data.step} → ${data.durationMs}ms`);
});
```

🧠 Scoped Hook Helpers
Define scoped handlers to simplify observability integration.
```js
engine.onPipelineStart = (fn) => engine.on("cynq:pipeline:start", fn);
engine.onPipelineEnd = (fn) => engine.on("cynq:pipeline:ok", fn);
engine.onPipelineFail = (fn) => engine.on("cynq:pipeline:fail", fn);
engine.onStepStart = (fn) => engine.on("cynq:step:start", fn);
engine.onStepEnd = (fn) => engine.on("cynq:step:ok", fn);
engine.onStepFail = (fn) => engine.on("cynq:step:fail", fn);
engine.onDeployStart = (fn) => engine.on("cynq:deploy:start", fn);
engine.onDeployEnd = (fn) => engine.on("cynq:deploy:ok", fn);
engine.onDeployFail = (fn) => engine.on("cynq:deploy:fail", fn);
```

Example
```js
engine.onStepEnd((data) => {
    console.log(`✅ Step "${data.step}" completed in ${data.durationMs}ms`);
});
```

💬 Slack and Discord Hooks
Integrate with messaging platforms by posting from realtime events.
```js
const axios = require("axios");

engine.on("cynq:pipeline:ok", async (data) => {
    await axios.post(process.env.SLACK_WEBHOOK_URL, {
        text : `✅ Pipeline "${data.name}" succeeded in ${data.durationMs}ms`
    });
});

engine.on("cynq:pipeline:fail", async (data) => {
    await axios.post(process.env.DISCORD_WEBHOOK_URL, {
        content : `❌ Pipeline "${data.name}" failed — reason: ${data.reason}`
    });
});
```

📊 Custom Telemetry Collector
Aggregate step times or pipeline durations for dashboards.
```js
const metrics = [];

engine.on("cynq:step:ok", (data) => {
    metrics.push({
        step : data.step,
        duration : data.durationMs,
        ts : Date.now()
    });
});
```

Send data periodically:
```js
setInterval(() => {
    if (metrics.length === 0) { return; }
    console.table(metrics);
    metrics.length = 0;
}, 5000);
```

🧩 Chained Reactivity
Forward events into another Cynq instance or any remote listener.
```js
engine.on("cynq:deploy:ok", async (data) => {
    await axios.post("https://remote-node.example.com/enqueue", {
        project : "mirror",
        target : "sync",
        payload : data
    }, { headers : { Authorization : "Bearer xyz" } });
});
```

⚙️ Pattern Reference
| Pattern | Description | Example |
|----------|--------------|----------|
| engine.on(event, fn) | Subscribe to a single event | engine.on("cynq:step:ok", fn) |
| engine.once(event, fn) | Subscribe once, auto-unsubscribe | engine.once("cynq:pipeline:ok", fn) |
| engine.off(event, fn) | Remove a specific listener | engine.off("cynq:deploy:start", fn) |
| engine.emit(event, data) | Emit custom event manually | engine.emit("custom:event", {...}) |
| engine.realtime.emit(event, data) | Broadcast event across IoTide or networked backplane | engine.realtime.emit("cynq:step:ok", data) |
🛰️ Example: Unified Realtime Dashboard Feed
```js
engine.on("*", (evt, data) => {
    console.log(`[${new Date().toISOString()}] ${evt}`, data);
});
```

Combined with IoTide, events propagate instantly between distributed nodes for visualization, alerting, and streaming dashboards.
🔐 Production Tip
Always sanitize payloads before broadcasting externally.
Avoid exposing secret paths, credentials, or internal error stacks in event handlers.
Forward only relevant metadata for telemetry or logs.
💡 Example: End-to-End Flow
1️⃣ Push event triggers webhook
2️⃣ Job enters queue and runs build
3️⃣ Deploy step copies files to target path
4️⃣ Remote instance receives follow-up trigger
5️⃣ Metrics update in real time
6️⃣ Route engine awaits next event
📡 Distributed Mesh Mode
Cynq operates in mesh topology when IoTide connects multiple instances together.
Each node participates in a shared event fabric that synchronizes jobs, telemetry, and triggers across regions or data centers.
🧩 Mesh Overview
| Role | Description |
|------|--------------|
| Primary Node | Publishes build, deploy, and event data to the mesh |
| Replica Node | Receives mirrored events, executes delegated pipelines |
| Observer Node | Listens only, no execution; ideal for dashboards or metrics aggregation |
All nodes authenticate using IoTide peer configuration and auto-discover each other through shared topics.
⚙️ Configuration Example
```js
const { Cynq } = require("@trap_stevo/cynq");
const IoTide = require("@trap_stevo/iotide");

// Launch IoTide on each node
const tide = new IoTide(
    3101,
    { useCors : true, useHTTPS : false, tidalCoreOptions : { namespace : "cynq-mesh" } },
    true,
    (socket) => {
        console.log("[mesh] node connected:", socket.tideID);

        // Automatically join the shared mesh channel
        tide.joinChannel(socket.tideID, { roomName : "cynq-mesh", userID : socket.tideID });
    }
);

const engine = new Cynq({
    realtime : {
        emit : (event, data) => tide.emitOnChannel("cynq-mesh", event, data)
    },
    logger : (msg) => console.log("[mesh]", msg)
});

await engine.sync("backend", "prod", spec); // spec : a pipeline spec object (see Quick Start)
```

Each Cynq instance emits and listens on the same IoTide channel.
When one node triggers a job, others react instantly through IoTide propagation.
🔄 Mesh Synchronization Events
| Event | Description |
|--------|-------------|
| mesh:node:join | A new node connects to the mesh |
| mesh:node:leave | Node disconnects from the cluster |
| mesh:job:forward | Job forwarded to remote instance |
| mesh:job:ack | Remote instance acknowledges receipt |
| mesh:sync:status | Status update shared across peers |
🧠 Use Cases
| Scenario | Description |
|-----------|-------------|
| Geo-Redundant Deployments | Mirror builds and releases across multiple regions |
| Multi-Tenant Builds | Partition tenants by node for load balancing |
| Failover CI/CD | Standby nodes automatically resume pipelines on downtime |
| Global Observability | Aggregate step and deploy metrics across all Cynq instances |
| Event Broadcasts | Real-time alerts or dashboards updated via distributed emitters |
🔐 Security & Isolation
✅ Peer authentication via signed tokens
✅ Namespace-scoped topic isolation
✅ HMAC validation for message integrity
✅ Configurable mesh-level ACLs
✅ Built-in replay and tamper protection
🔁 Example: Mirrored Event Relay
```js
tide.on("cynq:deploy:ok", (data) => {
    console.log("[replica] received deploy confirmation", data);
    // Optionally enqueue follow-up job on another node
});
```

🛰️ Combined Architecture
```
┌────────────────────────────┐
│        Cynq Node A         │
│ • Receives webhook         │
│ • Runs build & deploy      │
│ • Emits events via IoTide  │
└───────────▲────────────────┘
            │
    Mesh Fabric (IoTide)
            │
┌───────────▼────────────────┐
│        Cynq Node B         │
│ • Receives event mirror    │
│ • Performs post-deploy     │
│ • Sends metrics upstream   │
└────────────────────────────┘
```

🧩 Cluster-Wide Behavior
- Shared event namespace (`cynq:*`)
- Job delegation through mesh forwarding
- Metrics and locks synchronized across nodes
- Vault data kept local per node (no secret broadcast)
- RouteEngine remains independent per instance
⚡ Example: Cross-Node Job Flow
1️⃣ Node A receives webhook → triggers build
2️⃣ Build completes → emits cynq:deploy:ok
3️⃣ Node B listens → forwards remote-cynq-enqueue job
4️⃣ Node B deploys to its environment
5️⃣ Cluster updates metrics in real time
🛡️ Production Tip
Keep mesh namespaces separate for staging vs production.
Always rotate mesh authentication tokens periodically and monitor node join events.
🌍 Outcome
Distributed Mesh Mode turns independent Cynq instances into a single cooperative CI/CD fabric — enabling horizontally scalable pipelines, real-time telemetry, and autonomous recovery across global environments.
🔐 Secret Management
```js
const secrets = engine.storage.secretsFacade("project", "target", { tenantId : "demo" });

await secrets.put("deploy.token", "ghp_ABC123");
const token = await secrets.get("deploy.token");
```

Use vault references inside pipeline specs:
```json
{
  "with": {
    "repo": "https://github.com/org/repo.git",
    "token": "vault:deploy.token"
  }
}
```

🛡️ Security Principles
- Immutable deployment logs
- HMAC-protected enqueue endpoints
- Optional JWT-based authentication
- Replay-safe timestamp verification
- Strict content-type enforcement
- Tenant and project isolation
🔁 Self-Updater
Cynq includes a built-in self-update pipeline that lets a deployed Cynq implementation update itself automatically when changes are available.
Developers extend or modify the default steps to match their build, deployment, and restart process.
⚙️ Example: Updating a Deployed Cynq Implementation
```js
await engine.selfUpdate({
    pipeline : {
        source : {
            driver : "git",
            with : {
                repo : "https://github.com/user/my-cynq-implementation",
                branch : "main"
            }
        },
        steps : [
            {
                kind : "run",
                name : "build-implementation",
                runner : "shell",
                with : { script : "npm run build" }
            },
            {
                kind : "deploy",
                name : "replace-deployed-cynq-implementation",
                deployer : "atomic-replace-folder",
                with : {
                    from : "./build-output",
                    to : "/opt/cynq-implementation",
                    mode : "mirror",
                    keepBackup : true
                }
            },
            {
                kind : "run",
                name : "restart-cynq-service",
                runner : "shell",
                with : { script : "pm2 reload cynq" }
            }
        ]
    }
});
```

This pipeline fetches the latest implementation, runs the defined build command, replaces the deployed Cynq implementation directory, and restarts the running service.
⚙️ Configuration
- Source required — specify a `pipeline.source.driver` (`git`, `local-folder`, or other).
- Restart control — configure how Cynq reloads itself with `restartSignal` or `skipParentSignal`.
- Cleanup — temporary update data clears automatically after successful updates. Set `{ cleanupOnSuccess : false }` to retain temporary data for debugging.
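Combining those options in a single call might look like the sketch below; the option names come from the bullets above, while the values (signal name, flags) are illustrative assumptions:

```js
await engine.selfUpdate({
    pipeline : {
        source : { driver : "git", with : { repo : "https://github.com/user/my-cynq-implementation", branch : "main" } }
    },
    restartSignal : "SIGUSR2",   // illustrative: how the running process is told to reload
    skipParentSignal : false,    // allow Cynq to signal its parent process after the swap
    cleanupOnSuccess : false     // retain temporary update data for debugging
});
```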
⚡ Triggering Self-Update
The self-updater can run manually or through triggers:
Manual:
```js
await engine.selfUpdate(overrides);
```

Webhook Trigger:
```js
await engine.deploy("cynq", "self", {
    triggers : [
        { driver : "webhook", with : { path : "/cynq-update", port : 3100 } }
    ]
});
```

Scheduled Trigger:
```js
await engine.deploy("cynq", "self", {
    triggers : [
        { driver : "poll", with : { mode : "http", url : "https://example.com/cynq-update", intervalMS : 3600000 } }
    ]
});
```

Git Repository Trigger:
```js
await engine.deploy("cynq", "self", {
    triggers : [
        {
            driver : "git",
            with : {
                repo : "https://github.com/user/my-cynq-implementation",
                branch : "main",
                intervalMS : 60000, // check for new commits every 60 seconds
                autoPull : true,    // automatically pull when updates detected
                autoDeploy : true   // trigger selfUpdate automatically after pull
            }
        }
    ]
});
```

Triggers allow Cynq to apply updates automatically on webhook events, on a timed schedule, or when new commits land in the watched repository.
📦 Installation
```sh
npm install @trap_stevo/cynq
```

⚡ Quick Start
Minimal Example
```js
const { Cynq } = require("@trap_stevo/cynq");

const engine = new Cynq({
    engine : {
        routes : { enabled : true, autoStart : true, port : 3333 }
    }
});

const spec = {
    name : "web-deploy",
    pipeline : {
        source : { driver : "git", with : { repo : "https://github.com/user/app" } },
        steps : [
            { kind : "run", name : "build", runner : "shell", with : { script : "npm run build" } },
            { kind : "deploy", name : "copy", deployer : "copy-folder", with : { from : "./dist", to : "/srv/app" } }
        ]
    }
};

await engine.deploy("myProject", "production", spec, { clientAuth : { token : "xyz" } });
```

Continuous Sync Example
```js
const { Cynq } = require("@trap_stevo/cynq");

const engine = new Cynq({
    engine : {
        routes : { enabled : false },
        workspaceBase : "./cynq-sync-demo"
    },
    vault : {
        enableEncryption : true,
        starAuthEnabled : false // disable authentication for this demo
    },
    logger : (m) => console.log("[cynq]", m)
});

const spec = {
    name : "ci-sync",
    pipeline : {
        source : { driver : "git", with : { repo : "https://github.com/user/ci-sample.git" } },
        steps : [
            { kind : "run", name : "build", runner : "shell", with : { script : "npm run build" } },
            { kind : "deploy", name : "publish", deployer : "copy-folder", with : { from : "./dist", to : "./deploy" } }
        ]
    }
};

(async () => {
    const project = "ci-demo";
    const target = spec.name;
    const callCtx = { clientAuth : { token : "cynq-auth-token" } };

    // Continuous synchronization (keeps process alive)
    const stopSync = await engine.sync(project, target, spec, callCtx, { interval : 1000 });

    process.on("SIGINT", () => {
        stopSync();
        console.log("Sync stopped. Exiting...");
        process.exit(0);
    });
})();
```

Web Server Integration Example
Run Cynq inside another long-lived service (web server, worker, etc.). The sync loop detaches from the event loop (`keepAlive : false`), so your host service controls process lifetime.
```js
const express = require("express");
const path = require("path");
const { Cynq } = require("@trap_stevo/cynq");

const app = express();

// Cynq engine lives inside your service
const engine = new Cynq({
    engine : {
        routes : { enabled : false },
        workspaceBase : path.resolve(".cynq-workspace")
    },
    vault : {
        enableEncryption : true,
        starAuthEnabled : false
    },
    logger : (m) => console.log("[cynq]", m)
});

// Your pipeline spec
const spec = {
    name : "service-integrated-deploy",
    pipeline : {
        source : { driver : "git", with : { repo : "https://github.com/user/app.git", branch : "main" } },
        steps : [
            { kind : "run", name : "build", runner : "shell", with : { script : "npm ci && npm run build" } },
            { kind : "deploy", name : "publish", deployer : "copy-folder", with : { from : "./dist", to : "./deployed/app" } }
        ]
    }
};

let stopSync = null;

// Start non-blocking sync under your service lifecycle
app.post("/sync/start", async (req, res) => {
    if (stopSync) {
        res.status(409).send("Sync already running");
        return;
    }

    const project = "serviceProject";
    const target = spec.name;
    const callCtx = { clientAuth : { token : "cynq-auth-token" } };

    // keepAlive : false → detach internal timer; host service governs lifetime
    stopSync = await engine.sync(project, target, spec, callCtx, {
        interval : 1500,
        keepAlive : false
    });

    res.send("Sync started (non-blocking, keepAlive:false)");
});

// Stop sync on demand
app.post("/sync/stop", async (req, res) => {
    try {
        stopSync?.();
        stopSync = null;
        res.send("Sync stopped");
    } catch (e) {
        res.status(500).send(`Stop error: ${e.message}`);
    }
});

// Optional: single-run trigger endpoint (no background loop)
app.post("/deploy/once", async (req, res) => {
    const project = "serviceProject";
    const target = spec.name;
    const callCtx = { clientAuth : { token : "cynq-auth-token" } };

    try {
        const id = await engine.runOnce(project, target, spec, callCtx);
        res.send({ ok : true, attemptId : id });
    } catch (e) {
        res.status(500).send({ ok : false, error : e.message });
    }
});

const server = app.listen(8080, () => {
    console.log("[service] listening on :8080");
});

// Graceful shutdown under host control
const shutdown = async () => {
    try { stopSync?.(); } catch {}
    try { await engine.stopRoutes(); } catch {}
    server.close(() => process.exit(0));
};

process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
```

📜 License
⚡ Automation Evolved.
From commits to clouds, from triggers to telemetry — one intelligent engine unites it all.
Automate intelligently. Deploy infinitely.
