Multi-Agent Flow Library Documentation
Overview
Inspired by the beauty of PocketFlow (https://github.com/The-Pocket/PocketFlow/), here's FocketPlow.
A flexible, type-safe framework for building multi-agent systems using flow-based programming patterns. It supports both synchronous and asynchronous execution with retry semantics, batch processing, and parallel execution capabilities.
Installation
npm install focketplow
Core Concepts
Nodes
The library is built around Nodes, which are the fundamental building blocks of any flow. Each node has three main lifecycle phases:
- Prep: Prepare data and set up the node for execution
- Exec: Perform the actual work/business logic
- Post: Handle results and decide what action to take next
Flow Types
- Sync Flow: Execute nodes synchronously
- Async Flow: Execute nodes asynchronously with Promise support
- Batch Flow: Process multiple parameter bundles
- Parallel Flow: Process items in parallel for better performance
Type Definitions
NodeGenerics
interface NodeGenerics<S, P extends Dict, Prep, Out> {
  shared: S; // Immutable shared context
  params: P; // Mutable parameters
  prep: Prep; // Data from prep() phase
  out: Out; // Return value from exec() phase
}
Action
type Action = string | undefined
- default or undefined: Follow the default path
- Custom strings: Select specific edges for conditional flows
Core Classes
BaseNode<S, P, Prep, Out>
The foundation of all nodes with basic lifecycle management.
Key Methods
- setParams(params: P): Configure node parameters
- next(node, action?): Connect successor nodes
- on(action).to(node): Fluent API for connecting nodes
- run(shared): Execute node standalone
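For orientation, here is a minimal wiring sketch using the methods above. The node classes, the parameter shape, and the assumption that BaseNode is exported alongside Flow and Node are for illustration only.
import { BaseNode } from 'focketplow'; // assumes BaseNode is exported like Flow and Node
// Hypothetical minimal nodes, shown only to illustrate wiring.
class FetchNode extends BaseNode { exec() { return 'raw data'; } }
class ParseNode extends BaseNode { exec() { return 'parsed'; } }
class ReportNode extends BaseNode { exec() { return 'reported'; } }
const fetchNode = new FetchNode();
const parseNode = new ParseNode();
const reportNode = new ReportNode();
fetchNode.setParams({ source: 'example' }); // configure parameters
fetchNode.next(parseNode); // connect the default edge
parseNode.on('error').to(reportNode); // connect a named edge via the fluent API
fetchNode.run({}); // execute a single node standalone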
Lifecycle Hooks (Override)
- prep(shared: S): Prep | void
- exec(prepRes: Prep): Out | void
- post(shared: S, prepRes: Prep, execRes: Out): Action
Node<S, P, Prep, Out>
Extends BaseNode with retry capabilities.
Constructor Options
{
  maxRetries?: number, // default: 1
  waitMs?: number // default: 0 (no delay)
}
BatchNode<S, P, ItemPrep, ItemOut>
Process arrays of items by running exec over each item.
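A hedged sketch of a BatchNode subclass, assuming prep() returns the array of items, exec() runs once per item, and post() receives the per-item results as an array (the exact signatures are not spelled out above). The WordsContext shape is illustrative.
import { BatchNode } from 'focketplow';
interface WordsContext {
  words: string[];
  totalLength?: number;
}
class SumWordLengths extends BatchNode<WordsContext, {}, string, number> {
  prep(shared: WordsContext) {
    return shared.words; // items to process
  }
  exec(word: string): number {
    return word.length; // assumed to run once per item
  }
  post(shared: WordsContext, prep: string[], out: number[]) {
    shared.totalLength = out.reduce((a, b) => a + b, 0);
    return undefined; // follow the default path
  }
}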
Flow<S, P, Prep, Out>
Orchestrates multiple nodes by using actions to determine the path.
Key Methods
- start(node): Set the starting node
- run(shared): Execute the entire flow
AsyncNode<S, P, Prep, Out>
Asynchronous version of Node with Promise-based execution.
Async Lifecycle Hooks
- prepAsync(shared: S): Promise<Prep | void>
- execAsync(prepRes: Prep): Promise<Out | void>
- execFallbackAsync(prepRes: Prep, exc): Promise<Out | void>
- postAsync(shared: S, prepRes: Prep, execRes: Out): Promise<Action>
Key Methods
runAsync(shared: S): Promise<Action>
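A hedged sketch of an AsyncNode that awaits an HTTP call; the endpoint, context shape, and User type are placeholders, and the generic parameters follow the NodeGenerics order above.
import { AsyncNode } from 'focketplow';
interface User { id: string; name: string; }
interface UserContext { userId: string; user?: User; }
class FetchUser extends AsyncNode<UserContext, {}, string, User> {
  async prepAsync(shared: UserContext) {
    return shared.userId; // pull the id from the shared context
  }
  async execAsync(userId: string) {
    const res = await fetch(`https://api.example.com/users/${userId}`); // placeholder endpoint
    return (await res.json()) as User;
  }
  async postAsync(shared: UserContext, userId: string, user: User) {
    shared.user = user; // store the result for later nodes
    return undefined; // follow the default path
  }
}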
AsyncFlow<S, P, Prep, Out>
Asynchronous flow orchestration that automatically handles both sync and async nodes.
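A short usage sketch, assuming AsyncFlow exposes start() like Flow plus a Promise-returning runAsync(shared); FetchUser refers to the AsyncNode sketch above.
import { AsyncFlow } from 'focketplow';
async function main() {
  const flow = new AsyncFlow();
  flow.start(new FetchUser()); // FetchUser from the AsyncNode sketch
  const shared: UserContext = { userId: '42' };
  await flow.runAsync(shared); // assumed async counterpart of Flow.run()
  console.log(shared.user);
}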
Batch Variants
- BatchFlow: Sequential processing of parameter bundles
- AsyncBatchFlow: Async sequential processing
- AsyncParallelBatchFlow: Parallel async processing
Execution Models
Sequential Execution
const flow = new Flow()
  .start(new NodeA())
  .next(new NodeB())
  .next(new NodeC());
Batch Processing
class MyBatchFlow extends BatchFlow<MyContext, MyParams> {
  prep(shared: MyContext) {
    return [{ id: 1 }, { id: 2 }, { id: 3 }]; // Parameter bundles
  }
}
Parallel Execution
class MyParallelFlow extends AsyncParallelBatchFlow<MyContext, MyParams> {
  prep(shared: MyContext) {
    return items.map(item => ({ id: item.id }));
  }
}
Error Handling
All nodes support retry semantics with configurable retry counts and delays. When max retries are exceeded, you can provide custom fallback behavior.
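A minimal retry sketch, assuming the options object shown in the Node constructor section above is forwarded via super(); the custom fallback hook is omitted here because only execFallbackAsync is documented.
import { Node } from 'focketplow';
class FlakyNode extends Node {
  constructor() {
    super({ maxRetries: 3, waitMs: 500 }); // retry up to 3 times, waiting 500 ms between attempts
  }
  exec() {
    if (Math.random() < 0.5) {
      throw new Error('transient failure'); // a throw is assumed to trigger a retry
    }
    return 'ok';
  }
}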
Conditional Flows
Use action values to create branching flows:
class DecisionNode extends Node<SharedContext, Params> {
  post(shared: SharedContext, prep: any, out: any): Action {
    return out.success ? 'success_path' : 'failure_path';
  }
}
const flow = new Flow()
  .start(decisionNode)
  .on('success_path').to(successHandler)
  .on('failure_path').to(errorHandler);
Type Safety
The library provides full TypeScript type safety for all node parameters, shared context, preparation data, and output types.
interface Context {
  username: string;
  requestId: string;
}
interface Config {
  rateLimit?: number;
  timeout?: number;
}
class TypedNode extends Node<Context, Config, string, number> {
  // type-safe parameters
  exec(input: string): number {
    return input.length;
  }
}
Example
/**
* Focketplow Node Lifecycle:
* - prep(): Prepare phase - Extract data from shared context, perform setup
* - exec(): Execute phase - Main logic of the node, receives result from prep()
* - post(): Post-execute phase - Handle results, update shared context, determine next action
*/
import { Flow, Node } from 'focketplow';
class NodeA extends Node {
  prep(shared) {
    console.log('NodeA: prep phase');
    return {};
  }
  exec(prepData) {
    console.log('NodeA: exec phase');
    return 1;
  }
  post(shared, prepData, output) {
    console.log('NodeA: post phase - result:', output);
    shared.results = shared.results || [];
    shared.results.push(output);
    return undefined;
  }
}
class NodeB extends Node {
  prep(shared) {
    console.log('NodeB: prep phase');
    return {};
  }
  exec(prepData) {
    console.log('NodeB: exec phase');
    return 2;
  }
  post(shared, prepData, output) {
    console.log('NodeB: post phase - result:', output);
    shared.results.push(output);
    return undefined;
  }
}
class NodeC extends Node {
  prep(shared) {
    console.log('NodeC: prep phase');
    return {};
  }
  exec(prepData) {
    console.log('NodeC: exec phase');
    return 3;
  }
  post(shared, prepData, output) {
    console.log('NodeC: post phase - result:', output);
    shared.results.push(output);
    return shared.results.join(',');
  }
}
const nodeA = new NodeA();
const nodeB = new NodeB();
const nodeC = new NodeC();
nodeA.next(nodeB);
nodeB.next(nodeC);
const flow = new Flow();
flow.start(nodeA);
const sharedContext = {};
const result = flow.run(sharedContext);
console.log('Flow result:', result);
/**
 * Expected output:
 * NodeA: prep phase
 * NodeA: exec phase
 * NodeA: post phase - result: 1
 * NodeB: prep phase
 * NodeB: exec phase
 * NodeB: post phase - result: 2
 * NodeC: prep phase
 * NodeC: exec phase
 * NodeC: post phase - result: 3
 * Flow result: 1,2,3
 */
