@fkws/klonk
v0.0.25
Published
A lightweight, extensible workflow automation engine for Node.js and Bun
Maintainers
Readme
A code-first, type-safe automation engine for TypeScript.
Introduction
Klonk is a code-first, type-safe automation engine. It provides composable primitives to build workflows and state machines with autocomplete and type inference. If you've ever wanted to build event-driven automations or a stateful agent in code, with all the benefits of TypeScript, Klonk is for you.
The two main features are Workflows and Machines.
- Workflows: Combine triggers with a series of tasks (a
Playlist) to automate processes. Example: "when a file is added to Dropbox, parse it, and create an entry in Notion." - Machines: Finite state machines where each state has its own
Playlistof tasks and conditional transitions to other states. Useful for agents, multi-step processes, or systems with stateful logic.
Installation
bun add @fkws/klonk
# or
npm i @fkws/klonkCompatibility
| Requirement | Support |
|-------------|---------|
| Runtimes | Node.js 18+, Bun 1.0+, Deno (via npm specifier, best-effort) |
| Module | ESM (native) and CJS (via bundled /dist) |
| TypeScript | 5.0+ (required for full type inference) |
| Dependencies | @fkws/klonk-result |
Status: Pre-1.0, API may change between minor versions. Aiming for stability by 1.0.
Quickstart
Copy-paste this to see Klonk in action. One trigger, two tasks, fully typed outputs:
import { Task, Trigger, Workflow } from "@fkws/klonk";
import { Result } from "@fkws/klonk-result";
// 1. Define two simple tasks
class FetchUser<I extends string> extends Task<{ userId: string }, { name: string; email: string }, I> {
async validateInput(input: { userId: string }) { return !!input.userId; }
async run(input: { userId: string }): Promise<Result<{ name: string; email: string }>> {
if (input.userId !== "123") {
return new Result({ success: false, error: new Error("User not found") });
}
return new Result({ success: true, data: { name: "Alice", email: "[email protected]" } });
}
}
class SendEmail<I extends string> extends Task<{ to: string; subject: string }, { sent: boolean }, I> {
async validateInput(input: { to: string; subject: string }) { return !!input.to; }
async run(input: { to: string; subject: string }): Promise<Result<{ sent: boolean }>> {
console.log(`📧 Sending "${input.subject}" to ${input.to}`);
return new Result({ success: true, data: { sent: true } });
}
}
// 2. Create a trigger (fires once with a userId)
class ManualTrigger<I extends string> extends Trigger<I, { userId: string }> {
async start() { this.pushEvent({ userId: "123" }); }
async stop() {}
}
// 3. Wire it up: trigger → playlist with typed outputs
const workflow = Workflow.create()
.addTrigger(new ManualTrigger("manual"))
.setPlaylist(p => p
.addTask(new FetchUser("fetch-user"))
.input((source) => ({ userId: source.data.userId })) // ← source.data is typed!
.addTask(new SendEmail("send-email"))
.input((source, outputs) => {
// outputs["fetch-user"] is typed as Result<{ name, email }> | null
const user = outputs["fetch-user"];
if (!user || user.isErr()) return null; // skip if failed
return { to: user.email, subject: `Welcome, ${user.name}!` };
})
);
workflow.start({ callback: (src, out) => console.log("✅ Done!", out) });What you just saw:
source.data.userIdis typed from the triggeroutputs["fetch-user"]is typed by the task's ident string literaluser.emailis narrowed after theisErr()guard
TypeScript Magic Moment
Klonk's type inference isn't marketing. Here's proof:
import { Machine } from "@fkws/klonk";
// Declare states upfront → autocomplete for ALL transitions
const machine = Machine.create<{ count: number }>()
.withStates("idle", "processing", "done") // ← These drive autocomplete
.addState("idle", node => node
.setPlaylist(p => p/* ... */)
.addTransition({
to: "processing", // ← Type "pro" and your IDE suggests "processing"
condition: async () => true,
weight: 1
})
// @ts-expect-error - "typo-state" is not a valid state
.addTransition({ to: "typo-state", condition: async () => true, weight: 1 })
, { initial: true });The withStates<...>() pattern means you can't transition to a state that doesn't exist. TypeScript catches it at compile time, not runtime.
Core Concepts
Klonk has a few concepts that work together.
Task
A Task is the smallest unit of work. It's an abstract class with two main methods you need to implement:
validateInput(input): Runtime validation of the task's input (on top of strong typing).run(input): Executes the task's logic.
Tasks return Result<T> from @fkws/klonk-result for handling success and error states without throwing exceptions. It's inspired by Rust's Result<T, E> type and provides convenient methods like isOk(), isErr(), and unwrap().
Result (Rust-inspired Result Type)
See the Result<T> implementation at https://github.com/klar-web-services/klonk-result.
Result<T> is Klonk's Rust-inspired result type. It forces you to handle both success and error cases:
import { Result } from "@fkws/klonk-result";
const ok = new Result({ success: true, data: { value: 42 } });
const err = new Result({ success: false, error: new Error("oops") });Result Methods
// isErr: Type guard for error case
if (err.isErr()) {
console.log(err.error); // TypeScript knows it's error
}
// unwrap: Get data or throw error (like Rust's .unwrap())
const data = ok.unwrap(); // Returns the data or throws
// When isOk() is true, you can access T's members directly (proxy forwarding).
if (ok.isOk()) {
console.log(ok.value);
}Why Result?
The Result type keeps error handling explicit without forcing exceptions. Combined with TypeScript's narrowing, you get the ergonomics of Rust's Result with JS-friendly patterns.
Playlist
A Playlist is a sequence of Tasks executed in order. Each task has access to the outputs of all previous tasks, in a fully type-safe way. You build a Playlist by chaining .addTask().input() calls:
playlist
.addTask(new FetchTask("fetch"))
.input((source) => ({ url: source.targetUrl }))
.addTask(new ParseTask("parse"))
.input((source, outputs) => ({
// Use isOk for Rust-style type narrowing!
html: outputs.fetch && outputs.fetch.isOk() ? outputs.fetch.body : ""
}))Note: If you forget to call
.input(), TypeScript will show an error mentioningTaskInputRequired- this is your hint that you need to provide the input builder!
Skipping Tasks
Need to conditionally skip a task? Just return null from the input builder:
playlist
.addTask(new NotifyTask("notify"))
.input((source, outputs) => {
// Skip notification if previous task failed - using isOk!
if (!outputs.fetch || outputs.fetch.isErr()) {
return null; // Task will be skipped!
}
return { message: "Success!", level: "info" };
})When a task is skipped:
- Its output in the
outputsmap isnull(not aResult) - The playlist continues to the next task
- Subsequent tasks can check
if (outputs.notify === null)to know it was skipped
This gives you Rust-like Option semantics using TypeScript's native null - no extra types needed!
Task Retries
When a task fails (Result.isErr()), it can be automatically retried. Retry behavior is configured on the Machine state or Workflow:
// On a Machine state:
Machine.create<MyState>()
.addState("fetch-data", node => node
.setPlaylist(p => p.addTask(...))
.retryDelayMs(500) // Retry every 500ms
.retryLimit(3) // Max 3 retries, then throw
)
// On a Workflow:
Workflow.create()
.addTrigger(myTrigger)
.retryDelayMs(1000) // Retry every 1s (default)
.retryLimit(5) // Max 5 retries
.setPlaylist(p => p.addTask(...))
// Disable retries entirely:
node.preventRetry() // Task failures throw immediatelyDefault behavior: infinite retries at 1000ms delay. This is designed for long-running daemons and background workers where resilience matters. For request/response contexts (APIs, CLIs, one-shot scripts), set .retryLimit(n) to cap attempts or use .preventRetry() to fail fast.
Trigger
A Trigger is what kicks off a Workflow. It's an event source. Klonk can be extended with triggers for anything: file system events, webhooks, new database entries, messages in a queue, etc.
Workflow
A Workflow connects one or more Triggers to a Playlist. When a trigger fires an event, the workflow runs the playlist, passing the event data as the initial input. This allows you to create powerful, event-driven automations.
Machine
A Machine is a finite state machine. You build it by declaring all state identifiers upfront with .withStates(...), then adding states with .addState():
Machine.create<MyStateData>()
.withStates("idle", "running", "complete") // Declare all states
.addState("idle", node => node
.setPlaylist(p => p.addTask(...).input(...))
.addTransition({ to: "running", condition: ..., weight: 1 }) // Autocomplete!
, { initial: true })
.addState("running", node => node...)
.finalize({ ident: "my-machine" });Each state has:
- A
Playlistthat runs when the machine enters that state. - A set of conditional
Transitionsto other states (with autocomplete!). - Retry rules for failed tasks and when no transition is available.
The Machine carries a mutable stateData object that can be read from and written to by playlists and transition conditions throughout its execution.
Machine run modes
- any: Runs until the first terminal condition occurs (leaf state, roundtrip to the initial state, or all reachable states visited).
- leaf: Runs until a leaf state (no transitions) is reached.
- roundtrip: Runs until it transitions back to the initial state.
- infinitely: Continues running indefinitely, sleeping between iterations (
intervalms, default 1000). UsestopAfterto cap total states entered.
Notes:
stopAftercounts states entered, including the initial state. For example,stopAfter: 1will run the initial state's playlist once and then stop;stopAfter: 0stops before entering the initial state.- Transition retries are independent of
stopAfter. A state can retry its transition condition (with optional delay) without affecting thestopAftercount until a state transition actually occurs. - Task retries use the same settings as transition retries. If a task fails and retries are enabled, it will retry until success or the limit is reached.
Features
- Type-Safe & Autocompleted: Klonk uses TypeScript's inference so the inputs and outputs of every step are strongly typed. You'll know at compile time if your logic is sound.
- Code-First: Define your automations directly in TypeScript. No YAML, no drag-and-drop UIs.
- Composable & Extensible: The core primitives (
Task,Trigger) are simple abstract classes, so you can create your own reusable components. - Flexible Execution:
Machinesrun with configurable modes viarun(state, options):any,leaf,roundtrip, orinfinitely(with optionalinterval).
Klonkworks: Pre-built Components
Coming soon(ish)! Klonkworks will be a collection of pre-built Tasks, Triggers, and integrations that connect to various services, so you don't have to build everything from scratch.
Code Examples
Here's how you create a custom Task. This task uses an AI client to perform text inference.
import { Task } from "@fkws/klonk";
import { Result } from "@fkws/klonk-result";
import { OpenRouterClient } from "./common/OpenrouterClient"
import { Model } from "./common/models";
type TABasicTextInferenceInput = {
inputText: string;
instructions?: string;
model: Model;
};
type TABasicTextInferenceOutput = {
text: string;
};
// A Task is a generic class. You provide the Input, Output, and an Ident (a unique string literal for the task).
export class TABasicTextInference<IdentType extends string> extends Task<
TABasicTextInferenceInput, // Input Type
TABasicTextInferenceOutput, // Output Type
IdentType // Ident Type (string literal for type-safe output keys)
> {
constructor(ident: IdentType, public client: OpenRouterClient) {
super(ident);
if (!this.client) {
throw new Error("[TABasicTextInference] An IOpenRouter client instance is required.");
}
}
// validateInput is for runtime validation of the data your task receives.
async validateInput(input: TABasicTextInferenceInput): Promise<boolean> {
if (!input.inputText || !input.model) {
return false;
}
return true;
}
// The core logic of your task. It must return a Result type.
async run(input: TABasicTextInferenceInput): Promise<Result<TABasicTextInferenceOutput>> {
try {
const result = await this.client.basicTextInference({
inputText: input.inputText,
instructions: input.instructions,
model: input.model
});
// On success, return a success object with your data.
return new Result({
success: true,
data: { text: result }
});
} catch (error) {
// On failure, return an error object.
return new Result({
success: false,
error: error instanceof Error ? error : new Error(String(error))
});
}
}
}Here's an example of a custom Trigger. This trigger fires on a given interval and pushes the current date as its event data.
import { Trigger } from '@fkws/klonk';
// A simple trigger that fires every `intervalMs` with the current date.
// You define the shape of the data the trigger will provide, in this case `{ now: Date }`.
export class IntervalTrigger<TIdent extends string> extends Trigger<TIdent, { now: Date }> {
private intervalId: NodeJS.Timeout | null = null;
constructor(ident: TIdent, private intervalMs: number) {
super(ident); // Pass the unique identifier to the parent constructor.
}
// The start method is called by the Workflow to begin listening for events.
async start(): Promise<void> {
if (this.intervalId) return; // Prevent multiple intervals.
this.intervalId = setInterval(() => {
// When an event occurs, use pushEvent to add it to the internal queue.
this.pushEvent({ now: new Date() });
}, this.intervalMs);
}
// The stop method cleans up any resources, like intervals or open connections.
async stop(): Promise<void> {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}
}Workflows work well for event-driven automations. This example triggers when a new invoice PDF is added to a Dropbox folder, parses the invoice, and creates a new item in a Notion database.
Notice the fluent .addTask(task).input(builder) syntax - each task's input builder has access to source (trigger data) and outputs (all previous task results), with full type inference!
import { z } from 'zod';
import { Workflow } from '@fkws/klonk';
// The following example requires tasks, integrations and a trigger.
// Soon, you will be able to import these from @fkws/klonkworks.
import { TACreateNotionDatabaseItem, TANotionGetTitlesAndIdsForDatabase, TAParsePdfAi, TADropboxDownloadFile } from '@fkws/klonkworks/tasks';
import { INotion, IOpenRouter, IDropbox } from '@fkws/klonkworks/integrations';
import { TRDropboxFileAdded } from '@fkws/klonkworks/triggers';
// Providers and clients are instantiated as usual.
const notionProvider = new INotion({ apiKey: process.env.NOTION_API_KEY! });
const openrouterProvider = new IOpenRouter({ apiKey: process.env.OPENROUTER_API_KEY! });
const dropboxProvider = new IDropbox({
appKey: process.env.DROPBOX_APP_KEY!,
appSecret: process.env.DROPBOX_APP_SECRET!,
refreshToken: process.env.DROPBOX_REFRESH_KEY!
});
// Start building a workflow.
const workflow = Workflow.create()
.addTrigger(
new TRDropboxFileAdded("dropbox-trigger", {
client: dropboxProvider,
folderPath: process.env.DROPBOX_INVOICES_FOLDER_PATH ?? "",
})
)
.setPlaylist(p => p
// Get payees from Notion
.addTask(new TANotionGetTitlesAndIdsForDatabase("get-payees", notionProvider))
.input((source, outputs) => ({
database_id: process.env.NOTION_PAYEES_DATABASE_ID!
}))
// Get expense types from Notion
.addTask(new TANotionGetTitlesAndIdsForDatabase("get-expense-types", notionProvider))
.input((source, outputs) => ({
database_id: process.env.NOTION_EXPENSE_TYPES_DATABASE_ID!
}))
// Download the invoice PDF from Dropbox
.addTask(new TADropboxDownloadFile("download-invoice-pdf", dropboxProvider))
.input((source, outputs) => {
// The `source` object contains the trigger ident for discrimination
if (source.triggerIdent === "dropbox-trigger") {
return { file_metadata: source.data }
}
throw new Error(`Trigger ${source.triggerIdent} not implemented`);
})
// Parse the PDF with AI
.addTask(new TAParsePdfAi("parse-invoice", openrouterProvider))
.input((source, outputs) => {
// Access outputs of previous tasks - fully typed!
// Check for null (skipped) and success
const downloadResult = outputs['download-invoice-pdf'];
if (!downloadResult || downloadResult.isErr()) {
throw downloadResult?.error ?? new Error('Failed to download invoice PDF');
}
const payeesResult = outputs['get-payees'];
if (!payeesResult || payeesResult.isErr()) {
throw payeesResult?.error ?? new Error('Failed to load payees');
}
const expenseTypesResult = outputs['get-expense-types'];
if (!expenseTypesResult || expenseTypesResult.isErr()) {
throw expenseTypesResult?.error ?? new Error('Failed to load expense types');
}
return {
pdf: downloadResult.file,
instructions: "Extract data from the invoice",
schema: z.object({
payee: z.enum(payeesResult.map(p => p.id) as [string, ...string[]])
.describe("The payee id"),
total: z.number()
.describe("The total amount"),
invoice_date: z.string()
.regex(/^\d{4}-\d{2}-\d{2}$/)
.describe("Date as YYYY-MM-DD"),
expense_type: z.enum(expenseTypesResult.map(e => e.id) as [string, ...string[]])
.describe("The expense type id")
})
}
})
// Create the invoice entry in Notion
.addTask(new TACreateNotionDatabaseItem("create-notion-invoice", notionProvider))
.input((source, outputs) => {
const invoiceResult = outputs['parse-invoice'];
if (!invoiceResult || invoiceResult.isErr()) {
throw invoiceResult?.error ?? new Error('Failed to parse invoice');
}
const invoiceData = invoiceResult;
return {
database_id: process.env.NOTION_INVOICES_DATABASE_ID!,
properties: {
'Name': { 'title': [{ 'text': { 'content': 'Invoice' } }] },
'Payee': { 'relation': [{ 'id': invoiceData.payee }] },
'Total': { 'number': invoiceData.total },
'Invoice Date': { 'date': { 'start': invoiceData.invoice_date } },
'Expense Type': { 'relation': [{ 'id': invoiceData.expense_type }] }
}
}
})
);
// Run the workflow
console.log('[WCreateNotionInvoiceFromFile] Starting workflow...');
workflow.start({
callback: (source, outputs) => {
console.log('[WCreateNotionInvoiceFromFile] Workflow completed');
console.dir({ source, outputs }, { depth: null });
}
});Machines work well for stateful agents. This example shows an AI agent that takes a user's query, refines it, performs a web search, and generates a response.
The Machine manages a StateData object. Each StateNode's Playlist can modify this state, and the Transitions between states use it to decide which state to move to next.
import { Machine } from "@fkws/klonk"
import { OpenRouterClient } from "./tasks/common/OpenrouterClient"
import { Model } from "./tasks/common/models"
import { TABasicTextInference } from "./tasks/TABasicTextInference"
import { TASearchOnline } from "./tasks/TASearchOnline"
type StateData = {
input: string;
output?: string;
model?: Model;
refinedInput?: string;
searchTerm?: string;
searchResults?: {
results: {
url: string;
title: string;
content: string;
raw_content?: string;
score: string;
}[];
query: string;
answer?: string;
images?: string[];
follow_up_questions?: string[];
response_time: string;
};
finalResponse?: string;
}
const client = new OpenRouterClient(process.env.OPENROUTER_API_KEY!)
const webSearchAgent = Machine
.create<StateData>()
// Declare all states upfront for transition autocomplete
.withStates("refine_and_extract", "search_web", "generate_response")
.addState("refine_and_extract", node => node
.setPlaylist(p => p
// Refine the user's input
.addTask(new TABasicTextInference("refine", client))
.input((state, outputs) => ({
inputText: state.input,
model: state.model ?? "openai/gpt-5.2",
instructions: `You are a prompt refiner. Refine the prompt to improve LLM performance.
Break down by Intent, Mood, and Instructions. Do NOT answer - ONLY refine.`
}))
// Extract search terms from refined input
.addTask(new TABasicTextInference("extract_search_terms", client))
.input((state, outputs) => ({
inputText: `Original: ${state.input}\n\nRefined: ${outputs.refine && outputs.refine.isOk() ? outputs.refine.text : state.input}`,
model: state.model ?? "openai/gpt-5.2",
instructions: `Extract one short web search query from the user request and refined prompt.`
}))
// Update state with results - using isOk for type narrowing
.finally((state, outputs) => {
if (outputs.refine && outputs.refine.isOk()) {
state.refinedInput = outputs.refine.text;
}
if (outputs.extract_search_terms && outputs.extract_search_terms.isOk()) {
state.searchTerm = outputs.extract_search_terms.text;
}
})
)
.retryLimit(3) // Retry up to 3 times if no transition available
.addTransition({
to: "search_web", // Autocomplete works!
condition: async (state) => !!state.searchTerm,
weight: 2 // Higher weight = higher priority
})
.addTransition({
to: "generate_response", // Autocomplete works!
condition: async () => true, // Fallback
weight: 1
})
, { initial: true })
.addState("search_web", node => node
.setPlaylist(p => p
.addTask(new TASearchOnline("search"))
.input((state, outputs) => ({
query: state.searchTerm!
}))
.finally((state, outputs) => {
if (outputs.search && outputs.search.isOk()) {
state.searchResults = outputs.search;
}
})
)
.addTransition({
to: "generate_response",
condition: async () => true,
weight: 1
})
)
.addState("generate_response", node => node
.setPlaylist(p => p
.addTask(new TABasicTextInference("generate_response", client))
.input((state, outputs) => ({
inputText: state.input,
model: state.model ?? "openai/gpt-5.2",
instructions: `You received a user request and refined prompt.
${state.searchResults ? 'Search results are also available.' : ''}
Write a professional response.`
}))
.finally((state, outputs) => {
state.finalResponse = outputs.generate_response && outputs.generate_response.isOk()
? outputs.generate_response.text
: "Sorry, an error occurred: " + (outputs.generate_response?.error ?? "unknown");
})
)
)
.addLogger(pino()) // Optional: Add structured logging (pino recommended)
.finalize({ ident: "web-search-agent" });
// ------------- EXECUTION -------------
const state: StateData = {
input: "How do I update AMD graphic driver?",
model: "openai/gpt-5.2-mini"
};
// Run until it completes a roundtrip to the initial state
const finalState = await webSearchAgent.run(state, { mode: 'roundtrip' });
console.log(finalState.finalResponse);
// The original state object is also mutated:
console.log(state.finalResponse);Type System
Klonk's type system is minimal. Here's how it works:
Core Types
| Type | Parameters | Purpose |
|------|------------|---------|
| Task<Input, Output, Ident> | Input shape, output shape, string literal ident | Base class for all tasks |
| Result<Output> | Success data type | Rust-inspired result type (from @fkws/klonk-result) |
| Playlist<AllOutputs, Source> | Accumulated output map, source data type | Ordered task sequence with typed chaining |
| Trigger<Ident, Data> | String literal ident, event payload type | Event source for workflows |
| Workflow<Events> | Union of trigger event types | Connects triggers to playlists |
| Machine<StateData, AllStateIdents> | Mutable state shape, union of state idents | Finite state machine with typed transitions |
| StateNode<StateData, Ident, AllStateIdents> | State shape, this node's ident, all valid transition targets | Individual state with playlist and transitions |
Result Methods
| Function | Signature | Behavior |
|----------|-----------|----------|
| result.unwrap() | Result<T> → T | Returns data or throws error |
| result.isOk() | Result<T> → boolean | Type guard for success case |
| result.isErr() | Result<T> → boolean | Type guard for error case |
How Output Chaining Works
When you add a task to a playlist, Klonk extends the output type:
// Start with empty outputs
Playlist<{}, Source>
.addTask(new FetchTask("fetch")).input(...)
// Now outputs include: { fetch: Result<FetchOutput> | null }
Playlist<{ fetch: Result<FetchOutput> | null }, Source>
.addTask(new ParseTask("parse")).input(...)
// Now outputs include both: { fetch: ..., parse: Result<ParseOutput> | null }The | null accounts for the possibility that a task was skipped (when its input builder returns null). This is why you'll check for null before using isOk() - for example: outputs.fetch && outputs.fetch.isOk(). TypeScript then narrows the type so you can safely access fields!
This maps cleanly to Rust's types:
| Rust | Klonk (TypeScript) |
|------|-------------------|
| Option<T> | T \| null |
| Result<T, E> | Result<T> |
| Option<Result<T, E>> | Result<T> \| null |
