@usehelical/workflows
v0.0.1-alpha.15
Helical Workflows
Simple, typesafe durable workflows without bundler magic
[!WARNING]
This is a work in progress
Goals
- simple & typesafe api
- minimal deployment complexity (just postgres)
- low latency (making it suitable for user-facing ai workflows)
Getting started
Writing workflows
import * as wf from 'helical-workflows/workflow';
async function indexDocument(url: string) {
  // idempotent retryable durable steps
  const document = await wf.runStep(async () => fetchDocument(url), { maxRetries: 5 });
  const parsedDocument = await wf.runStep(async () => parseDocument(document));
  await wf.runStep(async () => upsertIntoDb(parsedDocument));
}
const indexDocumentWorkflow = defineWorkflow(indexDocument);
const indexingQueue = defineQueue();
async function processDocumentBatch(urls: string[]) {
  const handles = [];
  // enqueue all documents for indexing
  for (const url of urls) {
    const runHandle = await wf.enqueueWorkflow(indexingQueue, indexDocument, [url]);
    handles.push(runHandle);
  }
  let successfulCount = 0;
  let failedCount = 0;
  // wait for all documents to be indexed
  for (const handle of handles) {
    try {
      await handle.result();
      successfulCount++;
    } catch (error) {
      failedCount++;
    }
  }
  return { successfulCount, failedCount };
}
const processDocumentBatchWorkflow = defineWorkflow(processDocumentBatch);
Running a workflow
// create instance and register workflows and queues
const { runWorkflow } = createInstance({
  workflows: { indexDocumentWorkflow, processDocumentBatchWorkflow },
  queues: { indexingQueue },
  options: { connectionString: 'dummy' },
});
const urls = ['a', 'b', 'c'];
const run = await runWorkflow(processDocumentBatchWorkflow, urls);
console.log(await run.result());
Development roadmap
Messages
Messages let you communicate with a workflow from the outside, which is useful for human-in-the-loop workflows and other external signals. Messages are persisted in the database and consumed atomically with exactly-once semantics.
const approvalMessage = defineMessage<ApprovalMessage>('approval');
async function myWorkflowFunction() {
  await wf.runStep(() => stageForApproval());
  await wf.receiveMessage(approvalMessage);
  await wf.runStep(() => publish());
}
State
Workflow state lets a workflow expose data to the outside. The advantage over request-handler approaches such as Temporal's is that the workflow does not have to be executing for its state to be retrievable, so state can still be read after a workflow has been cancelled or has finished.
async function myWorkflowFunction() {
  await wf.setState({ progress: 50 });
}
Schedules
Allow a workflow to be scheduled…
Streams
Streams publish streaming data from workflows to the outside. The primary use case is real-time streaming of LLM responses. I want to take a different approach from other workflow systems here and treat streams as side effects inside workflows, not as durable outputs. LLM responses can stream quickly, so persisting the token stream to the database is costly. My hypothesis is that stream persistence is not needed, since resuming an LLM streaming response requires special handling and in most cases it makes more sense to just regenerate the response. Non-idempotent side effects are usually prohibited in workflows in order to preserve durability guarantees, but here the correctness of the workflow doesn't depend on the stream: the stream is just a user-experience optimisation.
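To make the idea concrete, here is a minimal in-memory sketch of a stream treated as a side effect. Everything in it is hypothetical and not part of the library: `EphemeralStream`, `durableResults` (standing in for the database), `fakeLlmTokens` (standing in for an LLM call) and `generateAnswer` are illustration-only names. Tokens go straight to subscribers and only the finished response is stored.

```typescript
type TokenListener = (token: string) => void;

// Hypothetical in-memory stream: tokens are pushed to listeners and never stored.
class EphemeralStream {
  private listeners = new Set<TokenListener>();

  subscribe(listener: TokenListener): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  publish(token: string): void {
    this.listeners.forEach((listener) => listener(token));
  }
}

// Stand-in for the durable store: only completed step results are kept here.
const durableResults = new Map<string, string>();

// Stand-in for an LLM call that produces a response token by token.
function fakeLlmTokens(prompt: string): string[] {
  return ['echo', ':', prompt];
}

async function generateAnswer(
  stepId: string,
  prompt: string,
  stream: EphemeralStream,
): Promise<string> {
  const recorded = durableResults.get(stepId);
  // On replay the recorded result is returned and nothing is streamed again.
  if (recorded !== undefined) return recorded;

  let text = '';
  for (const token of fakeLlmTokens(prompt)) {
    stream.publish(token); // side effect only, no database write per token
    text += token;
  }
  durableResults.set(stepId, text); // a single write for the whole response
  return text;
}
```

If the process died halfway through the loop, nothing would have been recorded for the step, so a retry would simply regenerate the response and stream it again. That is the trade-off described above.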
Workflow suspension
A very long-running workflow (days, months…) would otherwise stay in memory the whole time, idling while it waits for, say, a message or a sleep() event. Workflow suspension would allow a workflow to be suspended while it waits and then replayed and continued when the event arrives. This comes at the cost of latency, since the workflow has to be replayed to reach its original state after it has been suspended, which is why I want to make this an optional feature.
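Suspend-and-replay could be sketched roughly as follows, assuming an in-memory journal of recorded results in place of Postgres. `SuspendSignal`, `RunContext`, `createRuntime` and `resume` are hypothetical names used for illustration only, and this is not how the library is implemented. When no message is available the workflow unwinds. On resume it runs again from the top, and recorded steps return their journaled results without re-executing.

```typescript
// Thrown to unwind a workflow that has nothing to do until an event arrives.
class SuspendSignal extends Error {}

interface RunContext {
  journal: unknown[]; // recorded step results and consumed messages, in order
  inbox: string[]; // messages waiting to be consumed by this run
}

function createRuntime(ctx: RunContext) {
  let cursor = 0;
  return {
    async runStep<T>(fn: () => T | Promise<T>): Promise<T> {
      // Replay: hand back the recorded result without re-running the step.
      if (cursor < ctx.journal.length) return ctx.journal[cursor++] as T;
      const result = await fn();
      ctx.journal.push(result);
      cursor++;
      return result;
    },
    async receiveMessage(): Promise<string> {
      // Consumed messages are journaled too, so a replay sees the same value.
      if (cursor < ctx.journal.length) return ctx.journal[cursor++] as string;
      const message = ctx.inbox.shift();
      if (message === undefined) throw new SuspendSignal('waiting for message');
      ctx.journal.push(message);
      cursor++;
      return message;
    },
  };
}

type Runtime = ReturnType<typeof createRuntime>;

async function approvalWorkflow(rt: Runtime, log: string[]): Promise<string> {
  await rt.runStep(() => {
    log.push('staged');
    return 'staged';
  });
  const decision = await rt.receiveMessage();
  await rt.runStep(() => {
    log.push(`published:${decision}`);
    return 'published';
  });
  return decision;
}

// Runs the workflow from the top; returns 'suspended' if it had to wait.
async function resume(ctx: RunContext, log: string[]): Promise<string> {
  try {
    return await approvalWorkflow(createRuntime(ctx), log);
  } catch (error) {
    if (error instanceof SuspendSignal) return 'suspended';
    throw error;
  }
}
```

Calling `resume` with an empty inbox runs the first step and returns `'suspended'`. Pushing a message into `ctx.inbox` and calling `resume` again replays the first step from the journal, so `'staged'` is logged only once, and then the workflow continues. The replay pass is the latency cost mentioned above, and it grows with the number of journaled steps.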
Workflow version management
A tricky topic. In production systems, runs of an old workflow version may still be Pending or Queued when a new version is deployed. I don't know yet what the best way to handle this is, but I'm leaning towards workflow versions that can either be specified manually using a versionId or be generated automatically by hashing the workflow code.
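As a rough sketch of the hashing option, the snippet below derives a version id from a function's source text using Node's built-in `crypto` module. `resolveVersionId` is a hypothetical helper and not part of the library. Truncating the digest to 12 characters is an arbitrary choice for readability.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper: prefer a manually supplied versionId,
// otherwise hash the source text of the workflow function.
function resolveVersionId(
  workflowFn: (...args: never[]) => unknown,
  versionId?: string,
): string {
  if (versionId !== undefined) return versionId;
  // Function.prototype.toString() returns the function's source text.
  return createHash('sha256').update(workflowFn.toString()).digest('hex').slice(0, 12);
}

async function indexV1(url: string) {
  return url.trim();
}

async function indexV2(url: string) {
  return url.trim().toLowerCase();
}

console.log(resolveVersionId(indexV1)); // same id on every call
console.log(resolveVersionId(indexV2)); // differs from the id of indexV1
console.log(resolveVersionId(indexV2, 'v2-manual')); // manual id takes precedence
```

A hash of the source text has limits. It changes on renames and formatting-only edits, it reflects the compiled output, not the original TypeScript, and it does not cover helper functions the workflow calls. Those limits are part of why a manual versionId may still be needed.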
