@aikirun/workflow
v0.1.12
Published
Workflow SDK for defining durable workflows
Maintainers
Readme
@aikirun/workflow
Workflow SDK for Aiki durable execution engine - define durable workflows with tasks, sleeps, waits, and event handling.
Installation
deno add jsr:@aikirun/workflow @aikirun/task @aikirun/client @aikirun/workerQuick Start
Define a Workflow
import { workflow } from "@aikirun/workflow";
import { markUserVerified, sendVerificationEmail } from "./tasks.ts";
export const onboardingWorkflow = workflow({ name: "user-onboarding" });
export const onboardingWorkflowV1 = onboardingWorkflow.v("1.0", {
async exec(input: { email: string }, run) {
run.logger.info("Starting onboarding", { email: input.email });
// Execute a task to send verification email
await sendVerificationEmail.start(run, { email: input.email });
// Execute task to mark user as verified
// (In a real scenario, this would be triggered by an external event)
await markUserVerified.start(run, { email: input.email });
// Sleep for 24 hours before sending tips
await run.sleep({ days: 1 });
// Send usage tips
await sendUsageTips.start(run, { email: input.email });
return { success: true, userId: input.email };
},
});Features
- Durable Execution - Automatically survives crashes and restarts
- Task Orchestration - Coordinate multiple tasks in sequence
- Durable Sleep - Sleep without consuming resources or blocking workers
- State Snapshots - Automatically save state at each step
- Error Handling - Built-in retry and recovery mechanisms
- Multiple Versions - Run different workflow versions simultaneously
- Logging - Built-in structured logging for debugging
Workflow Primitives
Execute Tasks
const result = await createUserProfile.start(run, {
email: input.email,
});Sleep for a Duration
// Using duration object
await run.sleep({ days: 1 });
await run.sleep({ hours: 2, minutes: 30 });
await run.sleep({ seconds: 30 });
// Using milliseconds (backward compatible)
await run.sleep(5000);Get Workflow State
const { state } = await run.handle.getState();
console.log("Workflow status:", state.status);Logging
run.logger.info("Processing user", { email: input.email });
run.logger.debug("User created", { userId: result.userId });Workflow Options
Delayed Trigger
export const morningWorkflowV1 = morningWorkflow.v("1.0", {
// ... workflow definition
}).withOptions({
trigger: {
type: "delayed",
delay: { seconds: 5 }, // or: delay: 5000
},
});Retry Strategy
export const paymentWorkflowV1 = paymentWorkflow.v("1.0", {
// ... workflow definition
}).withOptions({
retry: {
type: "exponential",
maxAttempts: 3,
baseDelayMs: 1000,
maxDelayMs: 10000,
},
});Idempotency Key
export const orderWorkflowV1 = orderWorkflow.v("1.0", {
// ... workflow definition
}).withOptions({
idempotencyKey: "order-${orderId}",
});Running Workflows
With the client:
import { client } from "@aikirun/client";
import { onboardingWorkflowV1 } from "./workflows.ts";
const aiki = await client({
url: "http://localhost:9090",
redis: { host: "localhost", port: 6379 },
});
const stateHandle = await onboardingWorkflowV1.start(aiki, {
email: "[email protected]",
});
// Wait for completion
const result = await stateHandle.wait(
{ type: "status", status: "completed" },
{ maxDurationMs: 60 * 1000, pollIntervalMs: 5_000 },
);
if (result.success) {
console.log("Workflow completed!", result.state);
} else {
console.log("Workflow did not complete:", result.cause);
}With a worker:
import { worker } from "@aikirun/worker";
const aikiWorker = worker(aiki, {
maxConcurrentWorkflowRuns: 10,
});
aikiWorker.registry.add(onboardingWorkflow);
await aikiWorker.start();Execution Context
The run parameter provides access to:
interface WorkflowRunContext<Input, Output> {
id: WorkflowRunId; // Unique run ID
name: WorkflowName; // Workflow name
versionId: WorkflowVersionId; // Version ID
options: WorkflowOptions; // Execution options (trigger, retry, idempotencyKey)
handle: WorkflowRunHandle<Input, Output>; // Advanced state management
logger: Logger; // Logging (info, debug, warn, error, trace)
sleep(duration: Duration): Promise<void>; // Durable sleep
}The Duration type accepts:
- Raw milliseconds:
run.sleep(5000) - Duration objects:
run.sleep({ days: 1 }),run.sleep({ hours: 2, minutes: 30 })
Error Handling
Workflows handle errors gracefully:
try {
await risky.start(run, input);
} catch (error) {
run.logger.error("Task failed", { error: error.message });
// Workflow can decide how to proceed
}Failed workflows transition to awaiting_retry state and are automatically retried by the server.
Best Practices
- Keep Workflows Deterministic - Same input should always produce same output
- Expect Replays - Code may execute multiple times during retries
- Use Descriptive Events - Name events clearly for debugging
- Handle Timeouts - Always check
event.receivedafter waiting - Log Strategically - Use logger to track workflow progress
- Version Your Workflows - Deploy new versions alongside old ones
Related Packages
- @aikirun/task - Define tasks
- @aikirun/client - Start workflows
- @aikirun/worker - Execute workflows
- @aikirun/lib - Utilities
License
Apache-2.0
