@flowtude/sdk
v0.1.11
Published
TypeScript SDK for building and executing distributed workflows with Flowtude
Downloads
13
Maintainers
Readme
@flowtude/sdk
TypeScript SDK for building and executing distributed workflows with Flowtude.
Installation
npm install @flowtude/sdk
# or
yarn add @flowtude/sdk
# or
pnpm add @flowtude/sdk
# or
bun add @flowtude/sdkQuick Start
Define a Simple Workflow
import { defineWorkflow, defineTask, z } from "@flowtude/sdk";
// Define a task (no name needed - it's specified when adding to workflow)
const greetTask = defineTask({
inputSchema: z.object({ name: z.string() }),
outputSchema: z.object({ greeting: z.string() }),
handler: async (input) => ({
greeting: `Hello, ${input.name}!`,
}),
});
// Define a workflow and add tasks with names
const workflow = defineWorkflow({
name: "greeting-workflow",
inputSchema: z.object({ name: z.string() }),
})
.task("greet", greetTask, (ctx) => ctx.workflow.input)
.build({
outputSchema: z.object({ result: z.string() }),
outputMapper: (ctx) => ({ result: ctx.tasks.greet.greeting }),
});
// Start listening for tasks (requires workerName)
await workflow.listen({
workerName: "my-worker-1",
});Multi-Task Workflow with Dependencies
import { defineWorkflow, defineTask, z } from "@flowtude/sdk";
const fetchUserTask = defineTask({
inputSchema: z.object({ userId: z.string() }),
outputSchema: z.object({ name: z.string(), email: z.string() }),
handler: async (input) => {
// Fetch user from database
return {
name: "John Doe",
email: "[email protected]",
};
},
});
const sendEmailTask = defineTask({
inputSchema: z.object({
email: z.string(),
message: z.string(),
}),
outputSchema: z.object({ sent: z.boolean() }),
handler: async (input) => {
// Send email
console.log(`Sending email to ${input.email}: ${input.message}`);
return { sent: true };
},
});
const workflow = defineWorkflow({
name: "user-notification",
inputSchema: z.object({ userId: z.string(), message: z.string() }),
})
.task("fetchUser", fetchUserTask, (ctx) => ({
userId: ctx.workflow.input.userId,
}))
.task("sendEmail", sendEmailTask, (ctx) => ({
email: ctx.tasks.fetchUser.email,
message: ctx.workflow.input.message,
}))
.build({
outputSchema: z.object({ success: z.boolean() }),
outputMapper: (ctx) => ({
success: ctx.tasks.sendEmail.sent,
}),
});
await workflow.listen({
workerName: "worker-1",
});Configuration
The SDK requires the following environment variable:
# Flowtude API key (required)
FLOWTUDE_API_KEY=vyu.your-public-id.your-secretCore Concepts
Tasks
Tasks are the basic units of work in Flowtude. Each task has:
- Input Schema: Defines the expected input structure (Zod schema)
- Output Schema: Defines the output structure (Zod schema)
- Handler: The function that processes the input and returns output
const myTask = defineTask({
inputSchema: z.object({ data: z.string() }),
outputSchema: z.object({ result: z.number() }),
handler: async (input) => {
return { result: input.data.length };
},
});Workflows
Workflows orchestrate multiple tasks with dependencies:
const workflow = defineWorkflow({
name: "my-workflow",
inputSchema: z.object({ input: z.string() }),
})
.task("task1", task1, (ctx) => ctx.workflow.input)
.task("task2", task2, (ctx) => ({
data: ctx.tasks.task1.output,
}))
.build({
outputSchema: z.object({ final: z.string() }),
outputMapper: (ctx) => ({ final: ctx.tasks.task2.result }),
});Context
The context object provides access to:
ctx.workflow.input: The workflow inputctx.tasks.<taskName>: Outputs from direct upstream dependenciesctx.getTaskResult(taskName): Get output from any completed task (for indirect dependencies)
Advanced Features
Error Handling
Tasks can throw errors that will be caught and retried automatically:
const taskWithRetry = defineTask({
inputSchema: z.object({ url: z.string() }),
outputSchema: z.object({ data: z.string() }),
handler: async (input) => {
const response = await fetch(input.url);
if (!response.ok) {
throw new Error(`Failed to fetch: ${response.statusText}`);
}
return { data: await response.text() };
},
});Parallel Task Execution
Tasks that don't depend on each other run in parallel automatically:
const workflow = defineWorkflow({
name: "parallel-workflow",
inputSchema: z.object({ data: z.string() }),
})
.task("task1", task1, (ctx) => ctx.workflow.input) // Runs in parallel
.task("task2", task2, (ctx) => ctx.workflow.input) // Runs in parallel
.task("task3", task3, (ctx) => ({
result1: ctx.tasks.task1.output,
result2: ctx.tasks.task2.output,
})) // Waits for task1 and task2
.build({
outputSchema: z.object({ final: z.string() }),
outputMapper: (ctx) => ({ final: ctx.tasks.task3.result }),
});Custom Listen Settings
Configure how workflows listen for tasks:
await workflow.listen({
workerName: "my-worker", // Required: Unique worker identifier
prefetch: 20, // Optional: Prefetch up to 20 tasks from queue (default: 1)
heartbeatIntervalMs: 30000, // Optional: Heartbeat interval in milliseconds (default: 30000)
retries: 5, // Optional: Override workflow default retries
timeout: 600, // Optional: Override workflow default timeout (seconds)
});API Reference
defineTask(config)
Creates a task definition.
Parameters:
config.inputSchema: Zod schema for input validationconfig.outputSchema: Zod schema for output validationconfig.handler: Async function that processes input and returns output
defineWorkflow(config)
Creates a workflow builder.
Parameters:
config.name: Unique workflow nameconfig.inputSchema: Zod schema for workflow input
WorkflowBuilder.task(name, task, inputMapper)
Adds a task to the workflow.
Parameters:
name: Unique task name within the workflowtask: Task definition created withdefineTaskinputMapper: Function that maps context to task input
WorkflowBuilder.build(config)
Builds the workflow.
Parameters:
config.outputSchema: Zod schema for workflow outputconfig.outputMapper: Function that maps context to workflow output
Workflow.listen(settings)
Starts listening for workflow tasks.
Parameters:
settings.workerName: Unique worker identifier (required) - typically hostname or container IDsettings.prefetch: Number of tasks to prefetch from queue (default: 1)settings.heartbeatIntervalMs: Worker heartbeat interval in milliseconds (default: 30000)settings.retries: Override workflow default retry attempts (optional)settings.timeout: Override workflow default timeout in seconds (optional)
TypeScript Support
The SDK is written in TypeScript and provides full type safety:
// Input and output types are automatically inferred
const task = defineTask({
inputSchema: z.object({ count: z.number() }),
outputSchema: z.object({ result: z.string() }),
handler: async (input) => {
// input is typed as { count: number }
return { result: `Count: ${input.count}` };
// Return type is validated as { result: string }
},
});
// Context is fully typed
const workflow = defineWorkflow({
name: "typed-workflow",
inputSchema: z.object({ value: z.number() }),
})
.task("double", task, (ctx) => {
// ctx.workflow.input is typed as { value: number }
return { count: ctx.workflow.input.value * 2 };
})
.build({
outputSchema: z.object({ final: z.string() }),
outputMapper: (ctx) => {
// ctx.tasks.double is typed as { result: string }
return { final: ctx.tasks.double.result };
},
});Support
- Documentation: https://flowtude.com/docs
