@fuguejs/host
v0.2.1
The Fugue Host is a production-grade runtime that discovers, loads, and serves DAG-based AI workflows via HTTP. It polls a git repository for DAG definitions, validates them at load time, and exposes them as authenticated REST endpoints with concurrency limiting, circuit breaking, and graceful shutdown. DAGs may suspend at human-review gates (durable approve/reject/resume over HTTP or Microsoft Teams) and reach the outside world through identity-scoped capabilities brokered per request.
Docs
Shipped in this package; read them from node_modules/@fuguejs/host/docs/:
- docs/writing-dags.md: the DagRegistration + fugue.yaml + discovery contract that DAG authors deploy against.
- docs/auth.md: admin / team-token / OIDC auth and team isolation.
- docs/deployment.md: container + Redis + OpenShift deployment.
- docs/hitl-teams.md: human-in-the-loop approvals in Microsoft Teams (webhook + Bot Framework transports).
For authoring DAGs themselves, see @fuguejs/framework/docs/llm-dag-authoring.md.
Deployment Model: One Host Per Team
Each team gets their own host instance. This gives you:
- Own API key — each team configures their LLM provider and key
- Own model choice — teams pick any model their provider supports (per-node in DAG code)
- Own provider — one team on Anthropic, another on OpenAI, another on Azure
- Blast radius isolation — runaway DAGs can't starve other teams
- Independent scaling — scale each team's host to their traffic pattern
┌───────────────────────────────┐ ┌───────────────────────────────┐
│ fugue-host (team: cx)         │ │ fugue-host (team: leads)      │
│                               │ │                               │
│ LLM_PROVIDER=openai           │ │ LLM_PROVIDER=anthropic        │
│ OPENAI_API_KEY=sk-cx-...      │ │ ANTHROPIC_API_KEY=sk-ant-...  │
│ ADMIN_TOKEN=<ops-token>       │ │ ADMIN_TOKEN=<ops-token>       │
│ DAGS_REPO_URL=team's repo     │ │ DAGS_REPO_URL=team's repo     │
│                               │ │                               │
│ DAGs: customer-summary,       │ │ DAGs: lead-scoring,           │
│       intent-classifier       │ │       lead-opener             │
└───────────────────────────────┘ └───────────────────────────────┘
The auth layer still adds value in this model:
- ADMIN_TOKEN = platform operations (monitoring, debugging, provisioning)
- fug_... team token = application credential (what the team's services use)
- Applications never hold the admin key: separation of privilege
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│                             Fugue Host                              │
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                       HTTP Layer (Hono)                       │  │
│  │  /health /readiness /admin/* /dags /dags/:id/run              │  │
│  │  /runs/:id /runs/:id/approve /teams/messages                  │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                │                                    │
│  ┌────────────┐   ┌────────────┴───────────┐   ┌─────────────────┐  │
│  │ Auth       │   │ Domain (pure)          │   │ Sync Loop       │  │
│  │ Middleware │   │ • HostState machine    │   │ • git pull      │  │
│  │ token+JWT  │   │ • Registry (immutable) │   │ • module load   │  │
│  │            │   │ • Concurrency limiter  │   │ • registry swap │  │
│  │            │   │ • Circuit breaker      │   │                 │  │
│  └────────────┘   └────────────────────────┘   └─────────────────┘  │
│                                │                                    │
│  ┌─────────────────────────────┴─────────────────────────────────┐  │
│  │                     Adapters (imperative)                     │  │
│  │ • GitSync (Bun.spawn → git)   • ModuleLoader (import())       │  │
│  │ • TokenStore (Redis)          • NodeContextFactory            │  │
│  │ • HITL run store + worker     • Capability broker             │  │
│  │   (durable suspend/resume)      (Keycloak token exchange,     │  │
│  │ • Teams approval transport      JWT validation, metering)     │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                │                                    │
│  ┌─────────────────────────────┴─────────────────────────────────┐  │
│  │                        Infrastructure                         │  │
│  │ Redis (tokens, cache, checkpoints, HITL run store)            │  │
│  │ LLM (team's provider)   Keycloak/Entra (identity, optional)   │  │
│  └───────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘
Quick Start
1. Start Redis
redis-server --daemonize yes
2. Create a DAGs directory
mkdir -p /tmp/my-dags/dags/my-team/my-dag/prompts
3. Write a DAG
// /tmp/my-dags/dags/my-team/my-dag/dag.ts
import { z } from "zod";
import type { DagRegistration } from "@fuguejs/host/contract";
import { defineDag, createLlmNode, DAG_INPUT } from "@fuguejs/framework";
const summarize = createLlmNode({
id: "summarize",
model: "gpt-4o-mini", // team picks their model
promptName: "summarize",
inputSchema: z.object({ text: z.string() }),
outputSchema: z.object({ summary: z.string() }),
buildInput: (input) => ({ text: input.text }),
});
const dag = defineDag({
id: "my-dag",
nodes: { summarize },
// Under 0.2.0 no node implicitly receives the request — feed the entry
// node explicitly from the DAG_INPUT ("$input") sentinel.
edges: [{ from: DAG_INPUT, to: "summarize" }],
outputNodeId: "summarize",
});
const registration: DagRegistration = {
dag,
inputSchema: z.object({ text: z.string() }),
meta: { description: "Summarize text", version: "1.0.0" },
};
export default registration;
4. Add a prompt template
# /tmp/my-dags/dags/my-team/my-dag/prompts/summarize.txt
Text to summarize:
{{text}}
Produce a concise 2-3 sentence summary.
5. Add fugue.yaml
# /tmp/my-dags/dags/my-team/my-dag/fugue.yaml
team: my-team
6. Start the host
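# Generate the admin token once and export it so step 7 can reuse it.
# DAGS_REPO_URL is still required even though DAGS_LOCAL_PATH skips the clone.
export ADMIN_TOKEN=$(openssl rand -base64 32)
DAGS_LOCAL_PATH=/tmp/my-dags \
DAGS_REPO_URL=https://unused.git \
REDIS_URL=redis://localhost:6379 \
LLM_PROVIDER=openai \
OPENAI_API_KEY=sk-proj-your-key \
PORT=3000 \
bun run packages/host/src/main.ts &   # background it so this shell can run step 7
7. Provision the team token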
# Create the team
curl -X POST http://localhost:3000/admin/teams \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{"team": "my-team", "label": "My team"}'
# → { "ok": true, "token": "fug_...", "team": "my-team" }
# Call the DAG with the team token
curl -X POST http://localhost:3000/dags/my-dag/run \
-H "Authorization: Bearer fug_<token-from-above>" \
-H "Content-Type: application/json" \
-d '{"text": "The quick brown fox jumped over the lazy dog."}'
Configuration
All configuration is via environment variables, validated at startup with Zod. The host refuses to start if required variables are missing or invalid.
Required
| Variable | Description |
|----------|-------------|
| DAGS_REPO_URL | Git URL for the DAGs repository |
| REDIS_URL | Redis connection URL — all host instances can share one Redis instance (keys are namespaced) |
| ADMIN_TOKEN | Admin bearer token (min 16 chars) — used for team provisioning and ops |
LLM Provider (team chooses one)
| Variable | Required when | Description |
|----------|--------------|-------------|
| LLM_PROVIDER | No (default: anthropic) | anthropic, openai, or azure |
| ANTHROPIC_API_KEY | LLM_PROVIDER=anthropic | Team's Anthropic API key |
| OPENAI_API_KEY | LLM_PROVIDER=openai | Team's OpenAI API key |
| AZURE_OPENAI_ENDPOINT | LLM_PROVIDER=azure | Team's Azure endpoint URL |
| AZURE_OPENAI_API_KEY | LLM_PROVIDER=azure | Team's Azure API key |
| AZURE_OPENAI_DEPLOYMENT | LLM_PROVIDER=azure | Team's Azure deployment name |
| AZURE_OPENAI_API_VERSION | No (default: 2025-03-01-preview) | Azure API version |
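Startup validation applies the rules from the two tables above before the host accepts any traffic. The sketch below mirrors those rules in plain TypeScript under stated assumptions: the real host validates with Zod, and `loadConfig`, `HostConfig`, and `PROVIDER_KEYS` are hypothetical names, not exports of @fuguejs/host.

```typescript
// Hypothetical sketch of the documented env rules; the real host uses Zod.
const PROVIDERS = ["anthropic", "openai", "azure"] as const;
type Provider = (typeof PROVIDERS)[number];

// Variables that become required once a provider is chosen (table above).
const PROVIDER_KEYS: Record<Provider, readonly string[]> = {
  anthropic: ["ANTHROPIC_API_KEY"],
  openai: ["OPENAI_API_KEY"],
  azure: ["AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_KEY", "AZURE_OPENAI_DEPLOYMENT"],
};

interface HostConfig {
  dagsRepoUrl: string;
  redisUrl: string;
  adminToken: string;
  llmProvider: Provider;
}

const isProvider = (p: string): p is Provider =>
  (PROVIDERS as readonly string[]).includes(p);

function loadConfig(env: Record<string, string | undefined>): HostConfig {
  const errors: string[] = [];
  for (const name of ["DAGS_REPO_URL", "REDIS_URL", "ADMIN_TOKEN"]) {
    if (!env[name]) errors.push(`${name} is required`);
  }
  const adminToken = env["ADMIN_TOKEN"] ?? "";
  if (adminToken !== "" && adminToken.length < 16) {
    errors.push("ADMIN_TOKEN must be at least 16 characters");
  }
  const provider = env["LLM_PROVIDER"] ?? "anthropic"; // documented default
  if (!isProvider(provider)) {
    errors.push(`LLM_PROVIDER must be one of: ${PROVIDERS.join(", ")}`);
  } else {
    for (const name of PROVIDER_KEYS[provider]) {
      if (!env[name]) errors.push(`${name} is required when LLM_PROVIDER=${provider}`);
    }
  }
  // Refuse to start, reporting every problem at once rather than only the first.
  if (errors.length > 0) throw new Error(`invalid configuration:\n${errors.join("\n")}`);
  return {
    dagsRepoUrl: env["DAGS_REPO_URL"] ?? "",
    redisUrl: env["REDIS_URL"] ?? "",
    adminToken,
    llmProvider: provider as Provider, // safe: an invalid provider threw above
  };
}
```

Collecting all errors before throwing means a misconfigured deployment reports every missing variable in one failed start instead of one per restart.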
Optional
| Variable | Default | Description |
|----------|---------|-------------|
| DAGS_REPO_BRANCH | main | Branch to track |
| DAGS_POLL_INTERVAL_MS | 30000 | Git polling interval (ms) |
| DAGS_LOCAL_PATH | — | Skip git clone, read from this path (dev mode) |
| PORT | 3000 | HTTP listen port |
| REDIS_PROBE_INTERVAL_MS | 10000 | Redis liveness probe interval (ms) |
| MAX_GLOBAL_CONCURRENCY | 50 | Max concurrent DAG runs |
| DEFAULT_DAG_CONCURRENCY | 10 | Default per-DAG concurrency limit |
| DEFAULT_DAG_TIMEOUT_MS | 60000 | Default per-DAG timeout (ms) |
| MAX_DAG_TIMEOUT_MS | 120000 | Hard ceiling on DAG timeout (ms) |
| DRAIN_TIMEOUT_MS | 30000 | Graceful shutdown drain period (ms) |
| CIRCUIT_BREAKER_THRESHOLD | 5 | Failures before circuit opens |
| CIRCUIT_BREAKER_WINDOW_MS | 60000 | Failure counting window (ms) |
| CIRCUIT_BREAKER_COOLDOWN_MS | 30000 | Cooldown before half-open probe (ms) |
| DEFAULT_CACHE_TTL_MS | 300000 | Default cache entry TTL (5 min) |
| DEFAULT_CHECKPOINT_TTL_MS | 86400000 | Default checkpoint TTL (24 hr) |
| DOCUMENTS_ADAPTER | — | Documents capability adapter (fs) |
| DOCUMENTS_FS_ROOT | — | Root dir for fs documents (required when adapter=fs) |
| OTEL_EXPORTER_OTLP_ENDPOINT | — | OpenTelemetry exporter endpoint |
| MLFLOW_TRACKING_URI | — | MLflow tracking server URI |
| MLFLOW_EXPERIMENT_ID | — | MLflow experiment ID |
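The three CIRCUIT_BREAKER_* settings describe a sliding-window breaker: it opens after a threshold of failures inside the window, rejects runs while open, and admits a probe once the cooldown has passed. The following is a minimal sketch of that behavior, assuming a caller-supplied millisecond clock and a single half-open probe; the class and its methods are hypothetical, not the host's implementation.

```typescript
// Hypothetical sliding-window circuit breaker driven by the CIRCUIT_BREAKER_* knobs.
type BreakerState = "closed" | "open" | "half-open";

class CircuitBreaker {
  private state: BreakerState = "closed";
  private failures: number[] = []; // timestamps (ms) of recent failures
  private openedAt = 0;
  private probeInFlight = false;
  private readonly threshold: number;
  private readonly windowMs: number;
  private readonly cooldownMs: number;

  constructor(threshold = 5, windowMs = 60_000, cooldownMs = 30_000) {
    this.threshold = threshold; // CIRCUIT_BREAKER_THRESHOLD
    this.windowMs = windowMs; // CIRCUIT_BREAKER_WINDOW_MS
    this.cooldownMs = cooldownMs; // CIRCUIT_BREAKER_COOLDOWN_MS
  }

  currentState(): BreakerState {
    return this.state;
  }

  /** false means reject the run (the API reports 503 dag-disabled). */
  allow(now: number): boolean {
    if (this.state === "open" && now - this.openedAt >= this.cooldownMs) {
      this.state = "half-open"; // cooldown over: admit a single probe
      this.probeInFlight = false;
    }
    if (this.state === "open") return false;
    if (this.state === "half-open") {
      if (this.probeInFlight) return false;
      this.probeInFlight = true;
    }
    return true;
  }

  recordSuccess(): void {
    this.state = "closed";
    this.failures = [];
    this.probeInFlight = false;
  }

  recordFailure(now: number): void {
    if (this.state === "half-open") {
      this.trip(now); // failed probe: reopen for another cooldown
      return;
    }
    // Keep only failures inside the counting window, then add this one.
    this.failures = this.failures.filter((t) => now - t < this.windowMs);
    this.failures.push(now);
    if (this.failures.length >= this.threshold) this.trip(now);
  }

  private trip(now: number): void {
    this.state = "open";
    this.openedAt = now;
    this.failures = [];
    this.probeInFlight = false;
  }
}
```

A per-DAG `circuitBreaker` block in the DagRegistration config (see Writing DAGs) would presumably construct one of these with its own threshold and reset timeout.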
Human-in-the-Loop & Identity (optional)
Only needed when a DAG uses human-review gates or identity-scoped capabilities. HITL approvals run on the BullMQ-over-Redis backend, so a reachable REDIS_URL is required.
| Variable | Default | Description |
|----------|---------|-------------|
| HITL_APPROVAL_BASE_URL | — | Public base URL embedded in approval deep links |
| HITL_RUN_TTL_SEC | 604800 | Suspended-run retention (7 days) |
| HITL_LOCK_TTL_SEC | 300 | Per-run worker lock TTL |
| HITL_WORKER_CONCURRENCY | 4 | HITL resume-worker concurrency |
| TEAMS_WEBHOOK_URL | — | Incoming webhook for deep-link-out Teams approvals |
| BOT_APP_ID | — | Bot Framework app id (enables in-Teams button approvals; takes precedence over the webhook) |
| BOT_APP_PASSWORD | — | Bot Framework app secret |
| BOT_TOKEN_URL | — | Bot Framework token endpoint override |
| REALM_JWT_ISSUER | — | OIDC issuer for end-user (realm) JWT validation |
| REALM_JWT_AUDIENCE | fugue-host | Expected audience for realm JWTs |
| AGENT_CLIENT_SCOPES | — | Fail-closed scope policy for brokered capability tokens |
LLM & Model Selection
How models work
The LLM provider and API key are configured at the host level (one per team's host instance). The model is chosen per-node in DAG code:
const expensive = createLlmNode({
id: "deep-analysis",
model: "gpt-4o", // full-power model for complex reasoning
// ...
});
const cheap = createLlmNode({
id: "classify",
model: "gpt-4o-mini", // cheap model for simple classification
// ...
});
Different nodes in the same DAG can use different models. With OpenAI and Anthropic the model string is sent directly to the provider API; with Azure, AZURE_OPENAI_DEPLOYMENT overrides it (see the table below).
Provider-specific behavior
| Provider | Model routing |
|----------|--------------|
| OpenAI | Per-node model sent directly. Teams use any model their key has access to: gpt-4o, gpt-4o-mini, o4-mini, etc. |
| Anthropic | Per-node model sent directly: claude-sonnet-4-6, claude-haiku-4-5, etc. (use current ids, not the dated form). |
| Azure | AZURE_OPENAI_DEPLOYMENT overrides per-node model. All calls route through one deployment. Deploy multiple host instances for multiple Azure models. |
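The routing rule in this table reduces to a small function. This is a sketch only: `resolveModel` and `ProviderEnv` are hypothetical names, and the host may apply the Azure override at a different layer.

```typescript
// Hypothetical helper mirroring the provider table; not an @fuguejs/host export.
type Provider = "openai" | "anthropic" | "azure";

interface ProviderEnv {
  provider: Provider; // LLM_PROVIDER
  azureDeployment?: string; // AZURE_OPENAI_DEPLOYMENT
}

function resolveModel(env: ProviderEnv, nodeModel: string): string {
  if (env.provider === "azure") {
    // Azure: every call goes to the one configured deployment, whatever the node asked for.
    if (!env.azureDeployment) throw new Error("AZURE_OPENAI_DEPLOYMENT is required for azure");
    return env.azureDeployment;
  }
  // OpenAI and Anthropic: the per-node model string is passed through unchanged.
  return nodeModel;
}
```

Because the Azure branch ignores `nodeModel`, two nodes asking for different models on an Azure host still hit the same deployment, which is why separate host instances are needed for multiple Azure models.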
Authentication & Team Provisioning
The host authenticates bearer tokens across three identity tiers:
| Tier | Source | Purpose |
|------|--------|---------|
| Admin | ADMIN_TOKEN env var | Platform ops: provisioning, monitoring, debugging |
| Team | Generated via POST /admin/teams | Application credential: what the team's services use |
| User (OIDC) | Realm JWT (REALM_JWT_ISSUER / REALM_JWT_AUDIENCE) | End-user identity for identity-scoped capability brokering; verifier-gated and fail-closed (a JWT is rejected unless the realm verifier is configured) |
See docs/auth.md for the full three-tier model and the capability-broker flow.
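One way a request's bearer token could be routed to one of the three tiers is sketched below. It assumes the tier can be chosen from the token's shape (the fug_ prefix for team tokens, three dot-separated segments for a JWT); `classifyBearer` and `AuthConfig` are hypothetical, and the sketch only picks a verifier. Team tokens still need the hash lookup, and JWTs still need issuer, audience, and signature checks.

```typescript
// Hypothetical three-tier token routing; names are illustrative only.
import { Buffer } from "node:buffer";
import { timingSafeEqual } from "node:crypto";

type Tier = "admin" | "team-candidate" | "jwt-candidate" | "rejected";

interface AuthConfig {
  adminToken: string; // ADMIN_TOKEN
  realmVerifierConfigured: boolean; // assumed true when REALM_JWT_ISSUER is set
}

function constantTimeEquals(a: string, b: string): boolean {
  const ab = Buffer.from(a);
  const bb = Buffer.from(b);
  // timingSafeEqual throws on unequal lengths, so compare lengths first.
  return ab.length === bb.length && timingSafeEqual(ab, bb);
}

function classifyBearer(authorization: string | undefined, cfg: AuthConfig): Tier {
  const match = /^Bearer (\S+)$/.exec(authorization ?? "");
  const token = match?.[1];
  if (!token) return "rejected"; // 401 unauthorized
  if (constantTimeEquals(token, cfg.adminToken)) return "admin";
  // Team tokens carry the fug_ prefix; the caller still checks the SHA-256 hash.
  if (token.startsWith("fug_")) return "team-candidate";
  // Fail closed: a JWT-shaped token is only considered when the realm verifier is configured.
  const jwtShaped = token.split(".").length === 3;
  return jwtShaped && cfg.realmVerifierConfigured ? "jwt-candidate" : "rejected";
}
```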
First-time setup (at deploy time)
After the host starts, provision the team:
curl -X POST http://fugue-host:3000/admin/teams \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{"team": "cx", "label": "CX team production"}'
Response:
{
"ok": true,
"token": "fug_a3x8k9m2pL4vR8nT1wF6jH3cY5aD0gEabc123xyzQ_dK-abcdef",
"team": "cx",
"label": "CX team production"
}
⚠️ The token is shown ONCE. Only the SHA-256 hash is stored in Redis. If lost, revoke and re-create.
Store the token in the team's secret management (1Password, Vault, K8s secrets) and configure their applications to use it.
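Storing only the SHA-256 hash means the plaintext exists exactly once, in the provisioning response. The following is a minimal sketch of that pattern with an in-memory Map standing in for Redis; `TeamTokenStore` and the token body format after fug_ are assumptions, not the host's actual storage layout.

```typescript
// Hypothetical "show once, store the hash" token store; a Map stands in for Redis.
import { createHash, randomBytes } from "node:crypto";

const sha256Hex = (value: string): string =>
  createHash("sha256").update(value).digest("hex");

class TeamTokenStore {
  private readonly teamByHash = new Map<string, string>(); // sha256(token) -> team

  /** Returns the plaintext token exactly once; only its hash is kept. */
  provision(team: string): string {
    const token = `fug_${randomBytes(32).toString("base64url")}`; // body format is an assumption
    this.teamByHash.set(sha256Hex(token), team);
    return token;
  }

  /** Resolves a presented token to its team, or undefined (401). */
  lookup(token: string): string | undefined {
    return this.teamByHash.get(sha256Hex(token));
  }

  /** Revocation takes effect immediately: the next lookup fails. */
  revoke(team: string): void {
    for (const [hash, owner] of this.teamByHash) {
      if (owner === team) this.teamByHash.delete(hash);
    }
  }
}
```

Because lookups are keyed by the hash, a dump of the store yields no usable tokens; this is also why a lost token cannot be recovered and must be revoked and re-created.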
How applications authenticate
curl -X POST http://fugue-host:3000/dags/customer-summary/run \
-H "Authorization: Bearer fug_<team-token>" \
-H "Content-Type: application/json" \
-d '{"customerId": "cust-001"}'
Why the team token layer exists (even with one team per host)
- Applications never hold the admin key — leaked app credential ≠ admin access
- Admin can revoke/rotate the app credential without restarting the host
- Clear audit separation: admin ops vs application traffic
- If you later add DAGs from multiple teams to the same host, isolation is already wired
For the full auth guide (token anatomy, security internals, rotation workflow, operational runbook), see docs/auth.md.
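A team's service usually wraps the run call above in a small client. The sketch below is hypothetical (no SDK with these names ships in this package): it posts to /dags/:id/run with the team token and maps the 200, 202, and error shapes described under Response Format onto one result type. `fetch` is injected through a minimal `FetchLike` type so the sketch does not depend on a particular runtime's typings.

```typescript
// Hypothetical client for POST /dags/:id/run; names are illustrative, not a published SDK.
type RunOutcome<T> =
  | { kind: "completed"; data: T; runId: string; durationMs: number }
  | { kind: "suspended"; runId: string } // 202: parked at a human-review gate
  | { kind: "failed"; status: number; error: string; message: string };

// Minimal fetch-compatible signature; the global fetch on Bun or Node 18+ satisfies it.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ status: number; json(): Promise<unknown> }>;

function interpretRunResponse<T>(status: number, body: unknown): RunOutcome<T> {
  const b = (body ?? {}) as Record<string, unknown>;
  if (status === 200 && b["ok"] === true) {
    return {
      kind: "completed",
      data: b["data"] as T,
      runId: String(b["runId"]),
      durationMs: Number(b["durationMs"]),
    };
  }
  if (status === 202) return { kind: "suspended", runId: String(b["runId"]) };
  // Error responses carry { ok: false, error, message } (e.g. 429, 503).
  return {
    kind: "failed",
    status,
    error: String(b["error"] ?? "unknown"),
    message: String(b["message"] ?? ""),
  };
}

async function runDag<T>(
  fetchImpl: FetchLike,
  baseUrl: string,
  teamToken: string, // the fug_... token, never ADMIN_TOKEN
  dagId: string,
  input: unknown,
): Promise<RunOutcome<T>> {
  const res = await fetchImpl(`${baseUrl}/dags/${encodeURIComponent(dagId)}/run`, {
    method: "POST",
    headers: { Authorization: `Bearer ${teamToken}`, "Content-Type": "application/json" },
    body: JSON.stringify(input),
  });
  // Tolerate a non-JSON body (for example from a proxy in front of the host).
  const body = await res.json().catch(() => ({}));
  return interpretRunResponse<T>(res.status, body);
}
```

For example, `runDag(fetch, "http://fugue-host:3000", teamToken, "customer-summary", { customerId: "cust-001" })`. A `suspended` result carries the runId to poll with GET /runs/:runId.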
HTTP API
Unauthenticated
| Method | Path | Description |
|--------|------|-------------|
| GET | /health | Liveness probe — always 200 |
| GET | /readiness | Readiness — 200 when serving, 503 during boot/drain |
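The split between /health and /readiness follows from a small lifecycle state machine: liveness stays 200 while the process runs, and readiness is 200 only while serving. The state names and transition table below are assumptions for illustration; only the status codes come from the table above.

```typescript
// Hypothetical host lifecycle behind /health and /readiness.
type HostState = "booting" | "serving" | "draining" | "stopped";

const allowed: Record<HostState, readonly HostState[]> = {
  booting: ["serving", "stopped"], // boot can fail before serving
  serving: ["draining"],
  draining: ["stopped"],
  stopped: [],
};

function transition(from: HostState, to: HostState): HostState {
  if (!allowed[from].includes(to)) throw new Error(`illegal transition ${from} -> ${to}`);
  return to;
}

// Liveness ignores state: a draining host is still alive and finishing runs.
function healthStatus(_state: HostState): number {
  return 200;
}

// Readiness gates traffic: only a serving host should receive new runs.
function readinessStatus(state: HostState): number {
  return state === "serving" ? 200 : 503;
}
```

Moving to `draining` on shutdown lets an orchestrator stop routing new requests while in-flight runs finish within DRAIN_TIMEOUT_MS, without the liveness probe killing the process early.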
Admin (requires ADMIN_TOKEN)
| Method | Path | Description |
|--------|------|-------------|
| POST | /admin/teams | Provision a team token |
| GET | /admin/teams | List provisioned teams |
| DELETE | /admin/teams/:team | Revoke a team token (immediate) |
DAG Execution (requires valid token)
| Method | Path | Description |
|--------|------|-------------|
| GET | /dags | List registered DAGs |
| GET | /dags/:id/manifest | DAG schema/structure manifest |
| POST | /dags/:id/run | Execute a DAG (synchronous 200, or 202 queued when the DAG suspends at a human-review gate) |
| POST | /<custom-route> | Execute via custom route override |
Human-in-the-Loop (HITL)
A DAG that reaches a humanReview gate suspends and the run is durably parked. Approvers drive it forward over HTTP, or in Microsoft Teams (see docs/hitl-teams.md).
| Method | Path | Description |
|--------|------|-------------|
| GET | /runs/:runId | Poll a suspended/terminal run's status |
| POST | /runs/:runId/approve | Resolve a pending review (approve / reject / approve-with-edit / reroute) |
| POST | /teams/messages | Bot Framework messaging endpoint for in-Teams approvals |
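The four outcomes accepted by POST /runs/:runId/approve can be modeled as a discriminated union. This sketch is hypothetical: the field names, the run statuses, and the reading of `reroute` as "send the run to a named target" are assumptions, so check docs/hitl-teams.md for the real request body.

```typescript
// Hypothetical model of resolving a pending review; field names are assumptions.
type Decision =
  | { action: "approve" }
  | { action: "reject"; reason?: string }
  | { action: "approve-with-edit"; editedOutput: unknown }
  | { action: "reroute"; target: string }; // assumed: where the run should go next

type RunStatus = "suspended" | "resuming" | "rejected";

interface ParkedRun {
  runId: string;
  status: RunStatus;
  reviewedOutput: unknown; // the gate's output awaiting a human decision
  rerouteTo?: string;
}

function resolveReview(run: ParkedRun, d: Decision): ParkedRun {
  // Only a run still parked at the gate can be resolved; this guards against
  // two approvers clicking at once or a retried HTTP call.
  if (run.status !== "suspended") throw new Error(`run ${run.runId} is not awaiting review`);
  switch (d.action) {
    case "approve":
      return { ...run, status: "resuming" };
    case "approve-with-edit":
      return { ...run, status: "resuming", reviewedOutput: d.editedOutput };
    case "reroute":
      return { ...run, status: "resuming", rerouteTo: d.target };
    case "reject":
      return { ...run, status: "rejected" };
  }
}
```

Across several resume workers this check would have to be atomic, which the per-run worker lock (HITL_LOCK_TTL_SEC) presumably provides.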
Response Format
Success (200):
{
"ok": true,
"data": { "summary": "..." },
"runId": "f644de42-2085-44cf-880d-e9efd659c590",
"durationMs": 3200
}
Suspended at a human-review gate (202):
{ "runId": "f644de42-2085-44cf-880d-e9efd659c590", "status": "queued" }
Error (4xx/5xx):
{
"ok": false,
"error": "input-validation-failed",
"message": "input validation failed for DAG 'customer-summary': 1 issue(s)",
"dagId": "customer-summary",
"details": { "issues": [{ "path": ["customerId"], "message": "Required" }] }
}
| Status | Error | When |
|--------|-------|------|
| 400 | input-validation-failed | Request body fails Zod schema |
| 400 | body-parse-failed | Not valid JSON |
| 401 | unauthorized | Missing or invalid token |
| 403 | forbidden | Token can't access this DAG's team |
| 404 | dag-not-found | DAG ID doesn't exist |
| 408 | timeout | Run exceeded its timeout (timeoutMs from config, else DEFAULT_DAG_TIMEOUT_MS) |
| 429 | dag-concurrency-exceeded | Per-DAG concurrency limit hit |
| 429 | global-concurrency-exceeded | Global limit hit |
| 503 | dag-disabled | Circuit breaker open |
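The two 429 codes suggest fail-fast admission with a global cap and a per-DAG cap. The following is a minimal sketch, assuming runs are rejected immediately rather than queued and that the global cap is checked first; `ConcurrencyLimiter` is a hypothetical name.

```typescript
// Hypothetical fail-fast admission control producing the two 429 codes above.
type Admission =
  | { ok: true; release: () => void }
  | { ok: false; error: "global-concurrency-exceeded" | "dag-concurrency-exceeded" };

class ConcurrencyLimiter {
  private inFlight = 0;
  private readonly perDag = new Map<string, number>();
  private readonly maxGlobal: number;
  private readonly defaultPerDag: number;

  constructor(maxGlobal = 50, defaultPerDag = 10) {
    this.maxGlobal = maxGlobal; // MAX_GLOBAL_CONCURRENCY
    this.defaultPerDag = defaultPerDag; // DEFAULT_DAG_CONCURRENCY
  }

  tryAcquire(dagId: string, maxConcurrent?: number): Admission {
    const limit = maxConcurrent ?? this.defaultPerDag; // per-DAG override from config
    // Reject immediately instead of queueing so callers get a 429 and can back off.
    if (this.inFlight >= this.maxGlobal) return { ok: false, error: "global-concurrency-exceeded" };
    const running = this.perDag.get(dagId) ?? 0;
    if (running >= limit) return { ok: false, error: "dag-concurrency-exceeded" };
    this.inFlight++;
    this.perDag.set(dagId, running + 1);
    let released = false;
    return {
      ok: true,
      release: () => {
        if (released) return; // idempotent: a double release must not free a slot twice
        released = true;
        this.inFlight--;
        this.perDag.set(dagId, (this.perDag.get(dagId) ?? 1) - 1);
      },
    };
  }
}
```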
Writing DAGs
See the full guide in docs/writing-dags.md.
Directory Convention
your-dags-repo/
├── dags/
│   └── my-team/
│       └── customer-summary/
│           ├── dag.ts            ← DagRegistration (default export)
│           ├── prompts/
│           │   └── synthesis.txt ← {{placeholder}} templates
│           └── fugue.yaml        ← team, route, timeout overrides
├── package.json
└── bun.lock
The DagRegistration Contract
import type { DagRegistration } from "@fuguejs/host/contract";
const registration: DagRegistration = {
dag, // from defineDag()
inputSchema: InputSchema, // Zod schema for HTTP body validation
route: "/summarize", // optional custom route (default: /dags/:id/run)
config: {
timeoutMs: 90_000, // per-DAG timeout
maxConcurrent: 5, // per-DAG concurrency limit
cacheTtlMs: 600_000, // cache entry TTL
checkpointTtlMs: 86_400_000, // checkpoint TTL
circuitBreaker: { // per-DAG circuit breaker
failureThreshold: 3,
resetTimeoutMs: 15_000,
},
},
meta: { description: "...", version: "1.0.0" },
};
export default registration;
Per-DAG Config (fugue.yaml)
team: my-team # team owning this DAG
owner: platform # individual owner (metadata)
route: /summarize # custom route override
maxConcurrent: 5 # concurrency limit
timeoutMs: 90000 # timeout (ms)
cacheTtlMs: 600000 # cache TTL
checkpointTtlMs: 86400000 # checkpoint TTL
env: # required env vars (fail-closed)
  - MY_SECRET_KEY
fugue.yaml wins over the dag.ts config for any field it sets.
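The precedence rule and the fail-closed `env:` list can be sketched as two small functions. YAML parsing is omitted and both configs are plain objects here; `effectiveConfig`, `missingEnv`, and the clamping of timeouts to MAX_DAG_TIMEOUT_MS (rather than rejecting them) are assumptions for illustration.

```typescript
// Hypothetical sketch of fugue.yaml precedence and the fail-closed env list.
interface DagConfig {
  route?: string;
  maxConcurrent?: number;
  timeoutMs?: number;
  cacheTtlMs?: number;
  checkpointTtlMs?: number;
}

function effectiveConfig(fromDagTs: DagConfig, fromYaml: DagConfig, maxTimeoutMs = 120_000): DagConfig {
  // Only fields the YAML actually sets override dag.ts; absent fields never erase values.
  const overrides = Object.fromEntries(
    Object.entries(fromYaml).filter(([, value]) => value !== undefined),
  ) as DagConfig;
  const merged: DagConfig = { ...fromDagTs, ...overrides };
  // Assumption: a timeout above MAX_DAG_TIMEOUT_MS is clamped, not rejected.
  if (merged.timeoutMs !== undefined) merged.timeoutMs = Math.min(merged.timeoutMs, maxTimeoutMs);
  return merged;
}

// Fail closed (assumed meaning): any listed name that is unset keeps the DAG from loading.
function missingEnv(required: readonly string[], env: Record<string, string | undefined>): string[] {
  return required.filter((name) => !env[name]);
}
```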
Hot Reload
DAGs are updated by pushing to the git repository. No host restart needed.
- Host polls git every DAGS_POLL_INTERVAL_MS (default 30s)
- git pull --ff-only → compare SHA
- If bun.lock changed → bun install --frozen-lockfile
- Discover and load all DAGs
- Atomically swap the registry (immutable snapshot)
- Force-reset all circuit breakers
A broken DAG file doesn't affect others — the host logs the error and continues serving healthy DAGs.
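The reload cycle relies on building a complete new registry and then replacing a single reference, so readers never see a half-loaded state. The following is a minimal sketch under assumptions: `loadDag` stands in for discovery plus import(), the git and `bun install` steps are omitted, and a DAG that fails to load is simply left out of the new snapshot (the host's handling of a previously good version may differ).

```typescript
// Hypothetical reload cycle: load each DAG independently, build a new read-only
// snapshot, then swap one reference. Names are illustrative.
interface LoadedDag {
  id: string;
  version: string;
}
type Registry = ReadonlyMap<string, LoadedDag>;

let current: Registry = new Map(); // what request handlers read
let lastSha = "";

function reload(
  sha: string,
  dagIds: string[],
  loadDag: (id: string) => LoadedDag, // stands in for discovery + import()
  onSwap: () => void = () => {}, // e.g. force-reset circuit breakers
): string[] {
  if (sha === lastSha) return []; // branch unchanged: nothing to do
  const next = new Map<string, LoadedDag>();
  const errors: string[] = [];
  for (const id of dagIds) {
    try {
      next.set(id, loadDag(id));
    } catch (err) {
      // One broken DAG is reported and skipped; the others still load.
      errors.push(`${id}: ${(err as Error).message}`);
    }
  }
  current = next; // single assignment: in-flight runs keep the snapshot they already hold
  lastSha = sha;
  onSwap();
  return errors;
}
```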
Deployment
See docs/deployment.md for the full OpenShift guide.
Minimal production setup
# Environment for a team's host instance
DAGS_REPO_URL=https://github.com/org/team-dags.git
REDIS_URL=redis://:password@redis:6379
ADMIN_TOKEN=<generated-32-byte-random>
LLM_PROVIDER=openai
OPENAI_API_KEY=sk-proj-team-key
PORT=3000
After deploy:
# One-time: provision the team
curl -X POST http://host:3000/admin/teams \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{"team": "my-team"}'
# → Save the returned fug_... token for the team's apps
Tests
bun test # All tests across all packages
bun test packages/host # Host tests only
bun run typecheck # TypeScript validation
Further Documentation
| Document | Contents |
|----------|----------|
| docs/auth.md | Full auth guide: token anatomy, three-tier security model, rotation, operational runbook |
| docs/writing-dags.md | DAG authoring: directory convention, contract, prompts, per-DAG config |
| docs/deployment.md | OpenShift deployment: container, Redis, secrets, monitoring, scaling |
| docs/hitl-teams.md | Human-in-the-loop approvals in Microsoft Teams: webhook + Bot Framework transports |
