npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@zimtsui/catflow

v0.0.13

Published

An AI workflow orchestrator powered by the most native capabilities of TypeScript.

Readme

Catflow

Npm package version

Almost all workflow orchestrators are based on Graph Theory, e.g. LangChain, LangGraph, Airflow, etc. While Catflow is one based on Category Theory, and is powered by the most native capabilities of TypeScript.

Rationale

Traditional workflows have almost all capabilities that AI workflows have, e.g. pipeline, parallelism, conditional, retry, etc. Popular AI workflow frameworks, e.g. LangChain, unify the APIs of various model suppliers. But in terms of orchestration, they are no different from the traditional ones.

So what is the essential difference between AI workflows and traditional workflows in terms of orchestration? The answer is about the mechanism of retry. In traditional workflows, if a node fails, or if the output of the node is rejected by the downstream, the node should typically retry by repeating the exact same operation with the same success rate as the last attempt. While in AI workflows, when a stateful AI node should retry, it revises its former output with a much higher success rate than the last attempt.

Concept

Workflow Node

The output of a node can be represented as an async generator which yields the result value to the downstream.

export type Draft<value> = AsyncGenerator<value, never, never>;

If the downstream accepts the yielded result, .throw of the generator will be called with a Finalized exception.

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *solve(problem: string): Draft<string> {
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{ role: 'system', content: 'Please solve math problems.' },
		{ role: 'user', content: problem },
	];
	const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
	// The `yield` will never return if the downstream accepts the yielded result.
	return yield completion.choices[0]!.message.content!;
}

If the downstream rejects the yielded result, the .throw of the generator should be called with an exception as feedback. In this case, the node should revise its output and yield a new version.

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *solve(problem: string): Draft<string> {
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{ role: 'system', content: 'Please solve math problems.' },
		{ role: 'user', content: problem },
	];
	for (;;) {
		const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
		messages.push({ role: 'assistant', content: completion.choices[0]!.message.content! });
		try {
			return yield completion.choices[0]!.message.content!;
		} catch (e) {
			if (e instanceof Error) {} else throw e;
			messages.push({ role: 'user', content: `Please revise your answer upon the feedback: ${e.message}` });
		}
	}
}

A node can reject the input by throwing an exception to the upstream.

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *solve(problem: string): Draft<string> {
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{ role: 'system', content: 'Please solve math problems.' },
		{ role: 'user', content: problem },
	];
	const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
	if (completion.choices[0]!.message.tool_calls?.[0]?.function.name === 'fail')
		throw new Error('The problem is too hard.');
	return yield completion.choices[0]!.message.content!;
}

A node can also yield an exception to the downstream, for example, to oppose the feedback.

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *solve(problem: string): Draft<string | Error> {
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{ role: 'system', content: 'Please solve math problems.' },
		{ role: 'user', content: problem },
	];
	const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
	try {
		return yield completion.choices[0]!.message.content!;
	} catch (e) {
		return yield new Error('My solution is correct, and your feedback is wrong.');
	}
}

Controlflow

A Controlflow is a wrapper of nodes. It's intended to compose nodes within a workflow into a larger node.

import { Controlflow, Draft } from '@zimtsui/catflow';
declare function translateEnglishToChinese(englishText: string): Draft<string>;

const cf = Controlflow.from('What does 1+1 equal to ?')
	.map((text: string) => text.trimStart())	// append a sync function
	.transform(async (text: string) => text.trimEnd())	// append an async function
	.then(translateEnglishToChinese)	// append an async generator function
;
export default await cf.first();

Basic Orchestrations

Conditional

import { Controlflow, type Draft } from '@zimtsui/catflow';

declare const determineLanguage: (text: string) => Promise<'Chinese' | 'Russian' | 'English'>;
declare const translateEnglishToChinese: (englishText: string) => Draft<string>;
declare const translateRussianToChinese: (russianText: string) => Draft<string>;
declare const solveEnglishMathProblem: (englishMathProblem: string) => Draft<string>;

const cf = Controlflow.from('What does 1+1 equal to ?')
	.then(async function *(mathProblem: string): Draft<string> {
		switch (await determineLanguage(mathProblem)) {
			case 'English': return yield *translateEnglishToChinese(mathProblem);
			case 'Russian': return yield *translateRussianToChinese(mathProblem);
			case 'Chinese': return yield mathProblem;
			default: throw new Error('Language Not Supported');
		}
	}).then(solveEnglishMathProblem)
;
export default await cf.first();

Loop

import { Controlflow, type Draft } from '@zimtsui/catflow';

declare const translateEnglishToRussian: (englishText: string) => Draft<string>;
declare const translateRussianToChinese: (russianText: string) => Draft<string>;
declare const translateChineseToEnglish: (chineseText: string) => Draft<string>;

const cf = Controlflow.from('What does 1+1 equal to ?')
	.then((mathProblemInEnglish: string) => {
		let cf = Controlflow.from(mathProblemInEnglish);
		for (let i = 1; i <= 3; i++) cf = cf
			.then(translateEnglishToRussian)
			.then(translateRussianToChinese)
			.then(translateChineseToEnglish)
		;
		return cf.draft;
	});
export default await cf.first();

Parallel

import { Controlflow, type Draft } from '@zimtsui/catflow';

declare const translateEnglishToChinese: (englishText: string) => Draft<string>;
declare const translateEnglishToRussian: (englishText: string) => Draft<string>;

const cf = Controlflow.from('What does 1+1 equal to ?')
	.transform(async (mathProblemInEnglish: string) => {
		const [mathProblemInChinese, mathProblemInRussian] = await Promise.all([
			Controlflow.from(mathProblemInEnglish).then(translateEnglishToChinese).first(),
			Controlflow.from(mathProblemInEnglish).then(translateEnglishToRussian).first(),
		]);
		return [
			`# English: ${mathProblemInEnglish}`,
			`# Chinese: ${mathProblemInChinese}`,
			`# Russian: ${mathProblemInRussian}`,
		].join('\n\n');
	});
export default await cf.first();

Advanced Orchestrations

Design Pattern of Optimizer - Evaluator

Optimizer

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *optimize(problem: string): Draft<string> {
    const messages: OpenAI.ChatCompletionMessageParam[] = [
        {
            role: 'system',
            content: [
                'Please solve math problems.',
                'Your answer will be evaluated and the feedback will be provided if the answer is rejected.'
            ].join(' ')
        },
        { role: 'user', content: problem },
    ];
    for (;;) try {
        const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
        messages.push(completion.choices[0]!.message);
        return yield completion.choices[0]!.message.content!;
    } catch (e) {
        if (e instanceof Error) {} else throw e;
        messages.push({
            role: 'user',
            content: `Your answer is rejected: ${e.message}. Please revise your answer.`,
        });
    }
}

Evaluator

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *evaluate(problem: string, answer: string): Draft<string> {
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{
			role: 'system',
			content: [
				'Please examine the given answer of the given math problem.',
				'Print only `ACCEPT` if it is correct.',
			].join(' '),
		},
		{ role: 'user', content: `Problem: ${problem}\n\nAnswer: ${answer}` },
	];
	const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
	messages.push(completion.choices[0]!.message);
	if (completion.choices[0]!.message.content === 'ACCEPT') return yield answer;
	else throw new Error(completion.choices[0]!.message.content!);
}

Workflow

import { optimize } from './optimize.ts';
import { evaluate } from './evaluate.ts';
import { Controlflow, Draft } from '@zimtsui/catflow';

export const workflow = (problem: string) => Controlflow.from(problem)
    .then(optimize)
    .then(answer => evaluate(problem, answer))
.draft satisfies Draft<string>;

Design Pattern of Optimizer - Stateful Evaluator

Evaluator

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *evaluate(problem: string, draft: Draft<string>): Draft<string> {
	let answer = await draft.next().then(r => r.value);
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{
			role: 'system',
			content: [
				'Please examine the given answer of the given math problem.',
				'Print `ACCEPT` if it is correct.',
			].join(' '),
		},
		{ role: 'user', content: `Problem: ${problem}\n\nAnswer: ${answer}` },
	];
	for (;;) try {
		const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
		messages.push(completion.choices[0]!.message);
		if (completion.choices[0]!.message.content === 'ACCEPT') return yield answer;
		else throw new Error(completion.choices[0]!.message.content!);
	} catch (e) {
		answer = await draft.throw(e as Error).then(r => r.value);
		messages.push({
			role: 'user',
			content: `The answer is revised: ${answer}\n\nPlease examine it again.`,
		});
	}
}

Workflow

import { optimize } from './optimize.ts';
import { evaluate } from './evaluate.ts';
import { Controlflow, Draft } from '@zimtsui/catflow';

export const workflow = (problem: string) => Controlflow.from(problem)
    .then(optimize)
    .pipe(draft => evaluate(problem, draft))
.draft satisfies Draft<string>;

Design Pattern of Optimizer - Opposable Evaluator

Optimizer

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *optimize(problem: string): Draft<string | Error> {
    const messages: OpenAI.ChatCompletionMessageParam[] = [
        {
            role: 'system',
            content: [
                'Please solve math problems.',
                'Your answer will be evaluated and the feedback will be provided if the answer is rejected.',
                'If you insist your answer, print only `OPPOSE` to oppose the rejection, or revise your answer.',
            ].join(' '),
        },
        { role: 'user', content: problem },
    ];
    for (;;) {
        const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
        messages.push(completion.choices[0]!.message);
        try {
            if (completion.choices[0]!.message.content! === 'OPPOSE')
                return yield new Error('My answer is correct.');
            else
                return yield completion.choices[0]!.message.content!;
        } catch (e) {
            if (e instanceof Error) {} else throw e;
            messages.push({
                role: 'user',
                content: `Your answer is rejected: ${e.message}. Please revise your answer.`,
            });
        }
    }
}

Evaluator

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *evaluate(problem: string, draft: Draft<string | Error>): Draft<string | Error> {
	let input = await draft.next().then(r => r.value);
	let answer = input as string;
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{
			role: 'system',
			content: [
				'Please examine the given answer of the given math problem.',
				'Print only `ACCEPT` if it is correct.',
			].join(' '),
		},
		{ role: 'user', content: `Problem: ${problem}\n\nAnswer: ${answer}` },
	];
	for (;;) try {
		const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
		messages.push(completion.choices[0]!.message);
		if (completion.choices[0]!.message.content === 'ACCEPT') return yield answer;
		else throw new Error(completion.choices[0]!.message.content!);
	} catch (e) {
		input = await draft.throw(e as Error).then(r => r.value);
		if (input instanceof Error) messages.push({
			role: 'user',
			content: `Your rejection is opposed: ${input.message}\n\nPlease examine it again.`,
		}); else messages.push({
			role: 'user',
			content: `The answer is revised: ${answer = input}\n\nPlease examine it again.`,
		});
	}
}

Workflow

import { optimize } from './optimize.ts';
import { evaluate } from './evaluate.ts';
import { Controlflow, Draft } from '@zimtsui/catflow';

export const workflow = (problem: string) => Controlflow.from(problem)
    .then(optimize)
    .pipe(draft => evaluate(problem, draft))
.draft satisfies Draft<string | Error>;

Design Pattern of Optimizer - Multiple Evaluators

Workflow

import { optimize } from './optimize.ts';
import { evaluate as evaluate1 } from './evaluate.ts';
import { Controlflow, Draft } from '@zimtsui/catflow';
declare const evaluate2: typeof evaluate1;
declare const evaluate3: typeof evaluate1;

export const workflow = (problem: string) => Controlflow.from(problem)
    .then(optimize)
    .then(draft => evaluate1(problem, draft))
    .then(draft => evaluate2(problem, draft))
    .then(draft => evaluate3(problem, draft))
.draft satisfies Draft<string>;

Design Pattern of Optimizer - Multiple Stateful Evaluators

Workflow

import { optimize } from '../multiple-evaluators/optimize.ts';
import { evaluate as evaluate1 } from './evaluate.ts';
import { Controlflow, Draft } from '@zimtsui/catflow';
declare const evaluate2: typeof evaluate1;
declare const evaluate3: typeof evaluate1;

export const workflow = (problem: string) => Controlflow.from(problem)
    .then(optimize)
    .pipe(draft => evaluate1(problem, draft))
    .pipe(draft => evaluate2(problem, draft))
    .pipe(draft => evaluate3(problem, draft))
.draft satisfies Draft<string>;

Design Pattern of Optimizer - Multiple Opposable Evaluators

Evaluator

import { Draft } from '@zimtsui/catflow';
import OpenAI from 'openai';
declare const openai: OpenAI;

export async function *evaluate(problem: string, draft: Draft<string | Error>): Draft<string | Error> {
	let input = await draft.next().then(r => r.value);
	let answer = input as string;
	const messages: OpenAI.ChatCompletionMessageParam[] = [
		{ role: 'system', content: 'Please examine the given answer of the given math problem. Print only `ACCEPT` if it is correct.' },
		{ role: 'user', content: `Problem: ${problem}\n\nAnswer: ${answer}` },
	];
	for (let evaluating = true;; evaluating = true) try {
		const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages });
		messages.push(completion.choices[0]!.message);
		if (completion.choices[0]!.message.content === 'ACCEPT') return yield (evaluating = false, answer);
		else throw new Error(completion.choices[0]!.message.content!);
	} catch (e) {
		for (;;) try {
			input = await draft.throw(e as Error).then(r => r.value);
			if (input instanceof Error && !evaluating) return yield input; else break;
		} catch (newe) { e = newe; }
		if (input instanceof Error) messages.push({
			role: 'user',
			content: `Your rejection is opposed: ${input.message}\n\nPlease examine it again.`,
		}); else messages.push({
			role: 'user',
			content: `The answer is revised: ${answer = input}\n\nPlease examine it again.`,
		});
	}
}

Workflow

import { optimize } from './optimize.ts';
import { evaluate as evaluate1 } from './evaluate.ts';
import { Controlflow, Draft } from '@zimtsui/catflow';
declare const evaluate2: typeof evaluate1;
declare const evaluate3: typeof evaluate1;

export const workflow = (problem: string) => Controlflow.from(problem)
    .then(optimize)
    .pipe(draft => evaluate1(problem, draft))
    .pipe(draft => evaluate2(problem, draft))
    .pipe(draft => evaluate3(problem, draft))
.draft satisfies Draft<string | Error>;

Progress Log

import { Draft, Finalized, Controlflow } from '@zimtsui/catflow';

function beginning(nextStage: string) {
	return async function *<input>(input: input): Draft<input> {
		if (nextStage) console.log(nextStage);
		return yield input;
	}
}
function ending(lastStage: string) {
	return async function *<input>(input: input): Draft<input> {
		try {
			return yield input;
		} catch (e) {
			if (e instanceof Finalized) {}
			else if (lastStage) console.log(lastStage);
			throw e;
		}
	}
}
declare const optimize: (problem: string) => Draft<string>;
declare const evaluate1: (problem: string, draft: Draft<string>) => Draft<string>;
declare const evaluate2: (problem: string, draft: Draft<string>) => Draft<string>;
declare const evaluate3: (problem: string, draft: Draft<string>) => Draft<string>;

export const workflow = (problem: string) => Controlflow.from(problem)

    .then(beginning('Optimization'))
    .then(optimize)
    .then(beginning('Optimization'))

    .then(beginning('Evaluation 1'))
    .pipe(draft => evaluate1(problem, draft))
    .then(ending('Evaluation 1'))

    .then(beginning('Evaluation 2'))
    .pipe(draft => evaluate2(problem, draft))
    .then(ending('Evaluation 2'))

    .then(beginning('Evaluation 3'))
    .pipe(draft => evaluate3(problem, draft))
    .then(ending('Evaluation 3'))

.draft satisfies Draft<string>;

Explanation of Catflow in Mathematics

Functor of Draft 草稿函子

In analogy to Promise<t>, which is a type of future values, Draft<t> is a type of draft values, because it can be rejected and sent back to the author for revision.

Promise<t> 是期值类型,类比地,Draft<t> 是草稿类型,因为草稿可以打回去给作者进行修改。

In analogy to the functor Promise, which maps from the category of present value types to the category of future value types, the functor Draft maps from the category of final value types to the category of draft value types.

Promise 函子从现值范畴映射到期值范畴,类比地,Draft 函子从终稿范畴映射到草稿范畴。

export type Draft<t> = AsyncGenerator<t, never, never>;

Natural Transformations of Draft Functor 草稿函子的自然变换

  • eta is a natural transformation from the identity functor to the functor Draft.

    eta 是从恒等函子到 Draft 函子的自然变换。

  • mu is a natural transformation from the functor Draft$^2$ to the functor Draft.

    mu 是从 Draft$^2$ 函子到 Draft 函子的自然变换。

  • from is a natural transformation from the functor Promise to the functor Draft.

    from 是从 Promise 函子到 Draft 函子的自然变换。

  • to is a natural transformation from the functor Draft to the functor Promise.

    to 是从 Draft 函子到 Promise 函子的自然变换。

export declare function eta<t>(x: t): Draft<t>;
export declare function mu<t>(x: Draft<Draft<t>>): Draft<t>;
export declare function from<t>(x: Promise<t>): Draft<t>;
export declare function to<t>(x: Draft<t>): Promise<t>;

Morphisms of Draft Category 草稿范畴的态射

An stateful evaluator node is a morphism of the draft category.

一个有状态评估器节点是草稿范畴的一个态射。

export type StatefulEvaluator<i, o> = (draft: Draft<i>) => Draft<o>;

Kleisli Morphism of Draft Monad 草稿单子的 Kleisli 态射

An stateless evaluator node is a morphism of the Kleisli category of draft monad.

一个无状态评估器节点是草稿单子的 Kleisli 范畴中的一个态射。

export type StatelessEvaluator<i, o> = (i: i) => Draft<o>;