@ciromaciel/workflow
v1.0.1
Published
Engine declarativo para workflows agênticos: 6 camadas (workflow, authority, data, eval, audit, recovery) em volta da execução do agente.
Maintainers
Readme
@ciromaciel/workflow
Engine de orquestração para agentes/sistemas com governança embutida: capability tokens, eval gates determinísticos, audit append-only, saga/compensação, connectors com idempotência e credential vault. Spec declarativa, runner imperativo, zero magia.
Runtime: este pacote requer Bun >= 1.0 (usa
bun:sqliteno audit store). Node não é suportado.
Install
bun add @ciromaciel/workflowimport {
Engine, Workflow, Authority, Data, Eval, Audit,
Recovery, Agent, Credentials, Connectors, Errors, Utils,
} from '@ciromaciel/workflow';
const engine = new Engine();
engine.registerConnector('gmail', Connectors.createGmail({ /* ... */ }));
engine.registerConnector('github', Connectors.createGitHub({ /* ... */ }));Tudo (core + todos os connectors) é entregue num único bundle minificado (dist/index.js, ~100 KB). As SDKs externas (googleapis, imapflow, zod, ulid, @anthropic-ai/sdk) são dependencies resolvidas pelo seu node_modules — não viajam dentro do bundle.
workflow/
├── src/ # fontes (publicadas junto, pra leitura/debug)
│ ├── engine.js # composição de todas as camadas
│ ├── index.js # API pública (Engine, Workflow, Connectors, ...)
│ ├── workflow/ # spec declarativa + runner (frame queue)
│ ├── authority/ # Capability + Policy
│ ├── data/ # Resolver + adapters
│ ├── eval/ # gates (Zod / boolean / LLM-as-judge)
│ ├── audit/ # event store (bun:sqlite, append-only)
│ ├── recovery/ # Saga + retry/escalate/compensate
│ ├── agent/ # Executor + providers (Anthropic, OpenRouter, Stub)
│ ├── credentials/ # Vault + Ref + adapters
│ ├── connectors/ # base + registry + transport + providers
│ └── utils/ # errors com `code` estável, ids ULID
├── dist/ # bundle minificado (gerado por `bun run build`)
│ └── index.js
├── samples/ # 01-minimal → 10-connector-compensation
└── build.js # script de build (Bun.build + minify)Build / publicação
bun run build # gera dist/index.js minificado
npm pack --dry-run # inspeciona o que vai pro tarballO prepublishOnly roda o build antes de qualquer npm publish / release. dist/ não é versionado (está no .gitignore).
Mental model
O agente nunca toca recurso direto. Cada step declara o que ele pode fazer (reads/writes/invokes/scopes/spend/ttl). A Policy mint uma Capability com esses limites; o handler recebe a capability e só consegue acessar Data/Connectors/Agents passando ela. Tudo o que acontece vai parar no EventStore com um runId rastreável. Se algo falha, a Saga desfaz writes na ordem inversa.
Workflow.spec ──► Engine.run() ──► WorkflowRunner
│
│ para cada step:
▼
┌────────────────────┐
│ Policy.mintFor() │ ► Capability (ttl, spend, invokes...)
│ preEval.gate() │
│ handler(ctx) ──────┼──► Data / Connectors / Agent
│ postEval.gate() │ (todos exigem capability)
│ saga.record() │
└────────────────────┘
│
▼ next | fanout | onFail
EventStore (append-only)Camadas — uma propriedade do Engine por camada
| Camada | Acesso | Responsabilidade |
|---|---|---|
| Workflow | Workflow.defineWorkflow({...}) | Spec declarativa validada por Zod. entry + steps + grafo next. |
| Engine | new Engine({ auditPath, policyRules }) | Composição. Expõe audit, credentials, policy, data, evaluator, connectors, agents. |
| Authority | engine.policy | Policy.mintFor(step, ctx) cria Capability imutável (reads/writes/invokes/scopes/spendLimit/callBudget/ttl/dryRun). |
| Data | engine.data | fetch(resource, query, cap) / write(...). Verifica cap.canRead/canWrite. Retorna lineage. Adapters plugáveis. |
| Eval | engine.evaluator | define(id, fn) + gate(ids, value). Helpers: Eval.schemaRule(zod), Eval.boolRule(pred, msg). |
| Audit | engine.audit | EventStore SQLite append-only. byRun(runId), byStep(runId, stepId), lastCheckpoint(runId). |
| Recovery | spawned por run | Saga (LIFO) + estratégias retry(n,backoff), escalate(channel), compensate(fn). |
| Agent | engine.agents | AgentExecutor(provider) — único caminho pra LLM. Cobra usage.cost na capability. |
| Credentials | engine.credentials | Vault.register(handle, {adapter, scopes, ...}) → CredentialRef opaco. Secret nunca atravessa handler/audit. |
| Connectors | engine.connectors | Registry rotea invoke(name, action, payload, cap, ctx) — input/output Zod, idempotency, units, dryRun, audit. |
Core do engine vem de @ciromaciel/workflow; cada connector vem do seu subpath.
import {
Engine, Workflow, Authority, Data, Eval, Audit,
Recovery, Agent, Credentials, Errors, Utils,
} from '@ciromaciel/workflow';
import { createGmail } from '@ciromaciel/workflow/connectors/gmail';
import { defineConnector } from '@ciromaciel/workflow/connectors';Spec de um workflow
Workflow.defineWorkflow valida com Zod (ver spec.js). Cada step:
{
id: 'classify',
description: '...',
owner: 'agent', // 'agent' | 'human' | 'system'
// Schemas opcionais (acionados por preEvalRules/postEvalRules)
inputSchema, outputSchema,
// Authority — data
reads: ['docs.inbox'],
writes: ['docs.classified'],
// Authority — connectors
invokes: ['github:issue.create'],
scopes: ['repo'], // OAuth/API scopes
callBudget: { youtube: 100 }, // units por connector
credential: 'github.bot', // handle no Vault
dryRun: false,
confirmRequired: false, // futuro: human approval
// Spend / time
spendLimit: 0.30, // USD — AgentExecutor cobra usage.cost
ttlMs: 60_000, // expiração da capability
// Eval gates
preEvalRules: ['has_batch_id'],
postEvalRules: ['all_have_category'],
// Execução
handler: async (ctx) => { ... },
compensate: async (ctx) => { ... }, // chamado pela Saga em rollback
// Controle de fluxo
next: 'persist', // string → próximo step
// OU
next: { // object → branch (handler retorna { branch, input })
high: 'review_human',
low: 'auto_approve',
default: 'auto_approve', // OBRIGATÓRIO
},
onFail: 'retry', // 'retry' | 'escalate' | 'compensate' | 'abort'
recovery: async (err, ctx) => ({...}),// override de onFail
}ctx do handler
{
runId, // ULID
stepId, // se for fan-out, vem stampado: `step#0`, `step#1`...
input, // output do step anterior (ou initialInput)
capability, // Capability imutável (TTL, spend, invokes, scopes...)
engine, // engine completo — use eng.data, eng.agent('name')
connectors, // { invoke, run, get, describe } pré-bound ao runId/stepId/cap
fanCtx, // null OU { forEachId, index, total, item }
logger: (type, payload) => audit.append(runId, `step.log.${type}`, payload, stepId),
}Controle de fluxo
- Linear —
next: 'nextStepId'ounext: null(terminal). - Branch (
case) —next: { high: 's1', low: 's2', default: 's2' }. O handler retorna{ branch: 'high', input: {...} }e o runner seguenext[branch] ?? next.default. - Fan-out — handler retorna
{ __fanout: true, items: [...] }. O runner enfileira N frames paranext, um por item. Cada frame carregafanCtx. Sibling failures não matam outros itens (registrados emfanoutFailures). O run finaliza com sucesso se ≥1 frame completar.
Capability — o coração da governança
Policy.mintFor(step, ctx) cria uma Capability imutável que carrega tudo do step + rules da policy. Nenhuma I/O passa sem capability:
capability.canRead('docs.inbox') // boolean
capability.canWrite('docs.classified') // boolean
capability.canInvoke('github:issue.create') // suporta 'github:*' e '*'
capability.hasScope('repo')
capability.charge(0.02) // cobra USD — estoura → AuthorityError
capability.chargeUnits('youtube', 1) // cobra quota
// expira por TTL (Date.now() < expiresAt)Policy rules
Policy aplica uma pipeline de funções (draft, step, ctx) => draft antes de instanciar a Capability:
import { Authority } from '@ciromaciel/workflow';
const engine = new Engine({
policyRules: [
Authority.readOnlyInTest, // zera writes/invokes em ctx.env==='test'
Authority.tightTtlForSensitive(['pii.records']), // TTL ≤ 10s pra recursos sensíveis
Authority.forceDryRun('STAGING'), // dryRun se process.env.STAGING==='1'
Authority.capByConnector({ youtube: 50, meta: 100 }), // teto global
(draft) => ({ ...draft, spendLimit: Math.min(draft.spendLimit, 0.50) }),
],
});Rules são puras e testáveis. Para enforcement complexo, plugue OPA aqui.
Eval gates
Determinísticos por padrão (Zod / boolean). LLM-as-judge usa a mesma interface com capability separada.
import { Eval } from '@ciromaciel/workflow';
engine.evaluator.define('valid_input',
Eval.schemaRule(z.object({ n: z.number().positive() })));
engine.evaluator.define('all_classified',
Eval.boolRule(
(v) => v.classified.every((c) => c.confidence >= 0.5),
'baixa confiança',
));
// No step:
{ preEvalRules: ['valid_input'], postEvalRules: ['all_classified'] }Falha no gate → EvalError com details.report (lista de rules + reasons). Runner trata como falha normal (passa por onFail).
Audit — event sourcing
Tudo vai pro EventStore (bun:sqlite por padrão, plugável). Eventos canônicos:
run.started, run.completed, run.failed
step.started, step.capability, step.preEval, step.executed, step.postEval,
step.completed, step.failed, step.recovery, step.log.<type>
step.connectorInvoke, step.connectorResult, step.connectorRateLimited
fanout.start, fanout.empty, fanout.item.failedconst engine = new Engine({ auditPath: './audit.sqlite' });
// ...
const { runId } = await engine.run(workflow, input);
for (const e of engine.audit.byRun(runId)) {
console.log(`[${e.step_id ?? 'run'}] ${e.type}`, e.payload);
}Para inspecionar runs sem instanciar o engine, leia o SQLite direto (ele é append-only e human-readable).
Recovery — Saga + estratégias
Cada step bem-sucedido com compensate define registra a função na Saga. Se algo lá na frente lança, saga.unwind() roda compensações em LIFO.
onFail:
| valor | comportamento |
|---|---|
| retry | Default: retry(3, 500) — 3 tentativas, backoff exponencial 500ms × 2^n |
| escalate | Loga + next: 'abort' (em produção, vai pra outra fila) |
| compensate | Marca como abort; Saga roda unwind das compensações registradas |
| abort | Para tudo imediatamente |
Override por step com recovery: async (err, ctx) => ({ resolved, next, metadata }) — útil pra "retry só em 5xx, escalate em 4xx".
Agent
Wrapper único pra LLM. Provider trocável sem mudar workflow.
import { Agent } from '@ciromaciel/workflow';
engine.registerAgent('classifier',
new Agent.Executor(new Agent.Anthropic())); // ou Agent.OpenRouter() / Agent.Stub(fn)
// No handler:
const res = await engine.agent('classifier').execute({
system: '...',
messages: [{ role: 'user', content: '...' }],
tools: [],
capability, // OBRIGATÓRIO — cobra usage.cost
maxTokens: 1024,
});Agent.Stub(fn) recebe messages e retorna content — usado em samples sem chave de API.
Credentials
Secret nunca atravessa o handler do step nem o audit. Só o handle (nome simbólico) e scopes aparecem.
engine.credentials.register('github.bot', {
adapter: 'env', // built-in: lê de env var
envVar: 'GITHUB_TOKEN',
scopes: ['repo', 'workflow'],
});
// No step:
{ credential: 'github.bot', scopes: ['repo'], invokes: ['github:issue.create'] }A Policy.mintFor resolve o handle para um CredentialRef opaco anexado à capability. Quando o connector precisa, chama vault.resolve(ref, { scopes }) — secret in-memory, com cache TTL-aware e refresh transparente em OAuth.
Adapter custom: implemente { async resolve(config) → { type, value, expiresAt?, refresh? } } e engine.credentials.registerAdapter('name', instance).
Connectors
Integração com plataformas externas (GitHub, Gmail, Jira, Salesforce, YouTube, Meta, GCS/BigQuery/PubSub, etc). Toda invocação passa por:
1. capability.canInvoke('connector:action') → AuthorityError
2. capability.hasScope(s) pra cada action.scope → AuthorityError
3. action.input.parse(payload) → ConnectorError(INVALID_INPUT)
4. se action.idempotent: exige payload.idempotencyKey → ConnectorError(IDEMPOTENCY_REQUIRED)
5. capability.chargeUnits(connector, action.units) → AuthorityError se estourar callBudget
6. dryRun short-circuit em mutating actions → { dryRun, would, externalId:null }
7. vault.resolve(ref, { scopes }) → CredentialError
8. action.handler({...})
9. action.output.parse(result) → ConnectorError(INVALID_OUTPUT)Ver src/connectors/README.md pro contrato completo de defineConnector / defineAction, padrões de idempotência por plataforma, compensações reversíveis, LRO (Long-Running Operations) e mapa de error codes → recovery.
Uso no step
{
invokes: ['slack:chat.postMessage'],
scopes: ['chat:write'],
credential: 'slack.bot',
callBudget: { slack: 10 },
handler: async ({ input, connectors }) =>
connectors.invoke('slack', 'chat.postMessage', {
channel: '#alerts',
text: input.text,
idempotencyKey: input.eventId,
}),
}LRO
// (a) bloqueia step até concluir
const result = await connectors.run('youtube', 'video.upload', payload,
{ timeoutMs: 600_000, pollEveryMs: 5_000 });
// (b) handle separado + step `wait_upload`
const lro = await connectors.invoke('youtube', 'video.upload', payload);
return { lroId: lro.id };Errors
Sempre lance Errors.* em vez de Error genérico — o code direciona recovery (ver utils/errors.js).
| Classe | code típico |
|---|---|
| Errors.Engine | STEP_NOT_FOUND, codes customizados |
| Errors.Authority | AUTHORITY_DENIED |
| Errors.Eval | EVAL_FAILED (com details.report) |
| Errors.Data | READ_DENIED, WRITE_DENIED, NO_ADAPTER, NOT_WRITABLE |
| Errors.Connector | UNKNOWN_CONNECTOR, UNKNOWN_ACTION, INVALID_INPUT/OUTPUT, IDEMPOTENCY_REQUIRED, RATE_LIMITED, HTTP_ERROR, TIMEOUT, LRO_FAILED/TIMEOUT, NOT_IMPLEMENTED |
| Errors.Credential | CRED_UNKNOWN, CRED_SCOPE_DENIED, CRED_UNAVAILABLE, CRED_REF_MISSING, CRED_ADAPTER_* |
Samples
Cada arquivo em samples/ é executável standalone (bun run samples/0X-*.js) e mostra uma capacidade isolada:
| # | Sample | Cobertura |
|---|---|---|
| 01 | 01-minimal.js | Workflow de 1 step. Audit in-memory, capability vazia. |
| 02 | 02-eval-gates.js | preEvalRules + postEvalRules com Zod e boolRule. |
| 03 | 03-data-layer.js | Data.MemoryAdapter, engine.data.fetch/write + lineage. |
| 04 | 04-agent-step.js | Agent.Stub + spendLimit + cobrança automática. |
| 05 | 05-saga-compensation.js | onFail: 'compensate' + Saga.unwind() LIFO. |
| 06 | 06-retry-and-recovery.js | onFail: 'retry' + recovery custom por step. |
| 07 | 07-policy-rules.js | Pipeline de rules apertando TTL/spend no draft. |
| 08 | 08-full-pipeline.js | Tudo junto: data + agent + eval + compensação + audit SQLite. |
| 09 | 09-connector-github.js | Connector real com idempotência, scopes, audit. |
| 10 | 10-connector-compensation.js | defaultCompensate da action vs step.compensate. |
bun run samples/example.js <cpf> roda o exemplo SVR com SQLite persistido.
Princípios de design
- Spec declarativa, runner imperativo. Workflow é dado; engine é código.
- Capability obrigatória em toda I/O. Não há atalho — nem em handlers, nem em connectors.
- Audit append-only. Cada decisão é um evento com
runIdrastreável. Reprocessável. - Secret nunca em audit/handler. Só o handle opaco.
- Idempotência exigida em actions sensíveis. Schema falha se falta
idempotencyKey. - Compensação reversível por padrão. Se a plataforma permite, a action define
defaultCompensate. - Error codes guiam recovery. Strings estáveis, mapeadas em
onFail/recovery. - Camadas plugáveis. SQLite → Postgres/ClickHouse, env → Secret Manager, MemoryAdapter → KG real — sem mudar workflows.
