@papidb/river
v0.2.0
API workflow orchestration CLI for developers
river
TypeScript-first API workflow orchestration for developers.
river sits between a test runner and an API client. Instead of writing one-off scripts or manually clicking through requests, you define reusable flows in TypeScript, compose them, pass data between them, and run them from the CLI.
Why river?
Most tools force you into one of two modes:
- API clients are great for manual exploration, but awkward for repeatable multi-step setup.
- Test runners are great for assertions, but not ideal when your real goal is to bootstrap data, log in, chain requests, and move on.
river is for the in-between case:
- log in
- create or fetch setup data
- chain outputs into later calls
- compose flows into bigger workflows
- keep the whole thing in regular TypeScript
Core idea
Each flow is an async function.
River supports both of these styles:
- no-input flows
- function-like flows with explicit input/output
Shared runtime context remains available in both.
import { flow } from '@papidb/river'
export default flow('login', async (river) => {
  const res = await river.http.post<{ token: string }>('/auth/login', {
    email: river.env('AUTH_EMAIL'),
    password: river.env('AUTH_PASSWORD'),
  })
  river.headers.set('Authorization', `Bearer ${res.data.token}`)
  river.state.set('login.token', res.data.token)
})
And now River also supports flows that behave more like normal functions:
import { flow } from '@papidb/river'
type GetUserInput = { id: number }
type GetUserOutput = { user: { id: number; name: string } }
export default flow<GetUserInput, GetUserOutput>('get-user', async (river, input) => {
  const res = await river.http.get<GetUserOutput['user']>(`/users/${input.id}`)
  river.state.set('last.user', res.data)
  return { user: res.data }
})
Composition can then use explicit return values:
const result = await river.run(getUser, { id: 1 })
console.log(result.user.name)
How to think about data flow
Use each mechanism for a different job:
- flow input/output → the explicit contract of a flow
- river.state → ephemeral shared state during one run
- river.store → persistent shared state across runs
- river.env → secrets and runtime configuration
- river.http / river.headers / river.log → runtime capabilities
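The difference between in-run and cross-run state can be sketched with two plain maps. This is only an illustration: `store`, `runOnce`, and the key names are hypothetical stand-ins, not River's implementation, and nothing here touches disk.

```typescript
// Stand-ins for illustration only; not River's implementation.
// In this sketch the store lives outside any single run.
const store = new Map<string, string>()

function runOnce(label: string): { tokenSeenAtStart: boolean; previousRun: string | undefined } {
  // State is created fresh for every run, so earlier runs never leak into it.
  const state = new Map<string, string>()
  const tokenSeenAtStart = state.has('token')
  const previousRun = store.get('last.run')

  state.set('token', `token-for-${label}`) // gone when this run ends
  store.set('last.run', label) // still there for the next run
  return { tokenSeenAtStart, previousRun }
}

const first = runOnce('run-1')
const second = runOnce('run-2')
console.log(first.previousRun, second.previousRun, second.tokenSeenAtStart)
```

The second run can read what the first run saved in the store, but it never sees the first run's token.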
Rule of thumb:
Use args/returns for what a flow means. Use river.* for what a flow needs from the runtime.
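To make the rule concrete, here is a minimal self-contained sketch. It does not import River: `Runtime`, `defineFlow`, `createRuntime`, and the `NAME_PREFIX` variable are hypothetical stand-ins that only mirror the shape of the examples above, so the real types may differ.

```typescript
// Stand-in types for illustration only; River's real types may differ.
type Runtime = {
  state: Map<string, unknown> // plays the role of river.state
  env: (key: string) => string | undefined // plays the role of river.env
}

type Flow<I, O> = { name: string; fn: (rt: Runtime, input: I) => Promise<O> }

// Hypothetical helper mirroring the flow(name, fn) shape used in this README.
function defineFlow<I, O>(name: string, fn: (rt: Runtime, input: I) => Promise<O>): Flow<I, O> {
  return { name, fn }
}

// What the flow means: give it an id, get a user back.
const getUser = defineFlow<{ id: number }, { user: { id: number; name: string } }>(
  'get-user',
  async (rt, input) => {
    // What the flow needs: configuration comes from the runtime, not from arguments.
    const prefix = rt.env('NAME_PREFIX') ?? 'user'
    const user = { id: input.id, name: `${prefix}-${input.id}` }
    // Shared state is a side channel for flows that run later in the same run.
    rt.state.set('last.user', user)
    return { user }
  },
)

// One shared runtime per run.
function createRuntime(env: Record<string, string>): Runtime {
  return { state: new Map(), env: (key) => env[key] }
}

async function demo(): Promise<string> {
  const rt = createRuntime({ NAME_PREFIX: 'demo' })
  const result = await getUser.fn(rt, { id: 1 })
  return result.user.name
}
```

The caller only depends on the input and output types; the environment lookup and the state write stay inside the flow.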
The runtime context is namespaced for clarity:
- river.http.get/post/put/delete/patch
- river.headers.set/remove
- river.state.set/get for in-run state
- river.store.save/load for persistent state API surface
- river.env()
- river.run(otherFlow)
- river.log()
Example
import { flow } from '@papidb/river'
export default flow('health-check', async (river) => {
  const res = await river.http.get('/get')
  river.log(`Status: ${res.status}`)
})
CLI output:
river ▸ health-check (dev)
✓ health-check 200 2527ms
Status: 200
1 step completed in 2527ms · all passed
Install
Global install
This should be the normal way to use River:
pnpm install -g @papidb/river
Then use the CLI directly:
river --help
river init my-api-flows
cd my-api-flows
pnpm install
river run health-check
Package and CLI names
- npm package: @papidb/river
- import path: @papidb/river
- CLI command: river
If you do not want a global install, you can still run it with npx:
npx @papidb/river init my-api-flows
And inside TypeScript projects you import it like this:
import { flow, defineConfig } from '@papidb/river'
Developing River locally
From this repository:
pnpm install
pnpm typecheck
pnpm dev -- --help
For normal usage after install, prefer the real CLI:
river run health-check
When working inside this repository before a global install, use the dev script:
pnpm dev -- run health-check
Or from this repository's example:
cd examples/jsonplaceholder
river run full-chain
Getting started with river init
Scaffold a minimal project with a single health-check flow:
npx @papidb/river init my-api-flows
You can also provide defaults non-interactively:
npx @papidb/river init my-api-flows --yes --base-url http://localhost:4000
If you are creating the river project inside another git repository and you do not want to commit it, use:
npx @papidb/river init api-flows --git-exclude
That adds the generated folder to the nearest parent repository's .git/info/exclude.
For local development of river itself, there is also:
npx @papidb/river init api-flows --local
That uses a local file: dependency instead of the published npm version.
Minimal project structure
my-api-project/
├── river.config.ts
├── environments/
│   └── dev.env
├── .env.example
├── .gitignore
├── package.json
├── tsconfig.json
└── flows/
    └── health-check.ts
river.config.ts
import { defineConfig } from '@papidb/river'
export default defineConfig({
  environments: {
    dev: {
      baseUrl: 'https://httpbin.org',
    },
  },
  defaultEnv: 'dev',
  defaults: {
    headers: {
      'Content-Type': 'application/json',
    },
    timeout: 10000,
  },
})
flows/health-check.ts
import { flow } from '@papidb/river'
export default flow('health-check', async (river) => {
  const res = await river.http.get('/get')
  river.log(`Status: ${res.status}`)
})
Run it:
pnpm install
river run health-check
Public examples
The repository ships with public examples against JSONPlaceholder so anyone can run them.
Directory:
examples/jsonplaceholder/
├── river.config.ts
└── flows/
    ├── health-check.ts
    ├── get-users.ts
    ├── get-user-posts.ts
    ├── create-post.ts
    ├── full-chain.ts
    ├── full-chain-failure.ts
    └── full-chain-mid-failure.ts
What they demonstrate
- health-check.ts: simple GET request
- get-users.ts: fetch users and store values in state
- get-user-posts.ts: compose another flow, then chain response data into the next request
- create-post.ts: POST request using data from earlier flow state
- full-chain.ts: pipeline-style orchestration across multiple flows
- full-chain-failure.ts: realistic failure pipeline that succeeds through setup steps and then dies on a bad endpoint
- full-chain-mid-failure.ts: succeeds on the first step, fails in the middle, and proves later work does not run
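Several of these examples compose get-users more than once, and the sample output marks the repeat as cached. The sketch below shows one way such in-run caching could behave. It is an assumption-heavy illustration, not River's implementation: `Step`, `RunCache`, and `runCached` are hypothetical names, and the cache is keyed by flow name only, ignoring input.

```typescript
// Illustrative stand-in, not River's implementation.
type Step<T> = { name: string; run: () => Promise<T> }

class RunCache {
  private results = new Map<string, Promise<unknown>>()
  readonly log: string[] = []

  // Runs a step once per cache instance; repeats reuse the stored promise.
  runCached<T>(step: Step<T>): Promise<T> {
    const hit = this.results.get(step.name)
    if (hit !== undefined) {
      this.log.push(`↷ ${step.name} (cached)`)
      return hit as Promise<T>
    }
    this.log.push(`✓ ${step.name}`)
    const pending = step.run()
    this.results.set(step.name, pending)
    return pending
  }
}

let calls = 0
const getUsers: Step<string[]> = {
  name: 'get-users',
  run: async () => {
    calls += 1 // counts real executions only
    return ['Leanne Graham']
  },
}

async function demoCache(): Promise<{ calls: number; log: string[] }> {
  const cache = new RunCache()
  await cache.runCached(getUsers) // executes the step
  await cache.runCached(getUsers) // served from the cache
  return { calls, log: cache.log }
}
```

Storing the promise rather than the resolved value means two callers that overlap in time still trigger only one execution.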
Run the full example
cd examples/jsonplaceholder
river run full-chain
Run the failure example
cd examples/jsonplaceholder
river run full-chain-failure
Run the mid-flow failure example
cd examples/jsonplaceholder
river run full-chain-mid-failure
Expected shape of the mid-flow failure output:
river ▸ full-chain-mid-failure (dev)
✓ get-users 200 ...ms
Loaded user: Leanne Graham
✓ full-chain-mid-failure 404 ...ms
2 steps completed in ...ms · failed
ERROR Flow failed: full-chain-mid-failure
GET https://jsonplaceholder.typicode.com/not-a-real-posts-endpoint
HTTP 404 Not Found in ...ms
Response:
{}
The important part: anything after the failing request does not run.
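Because a flow is an async function, this behaviour is what you would expect if a failed request rejects like any thrown error. The sketch below assumes exactly that and does not use River: `fakeGet` is a hypothetical helper that rejects on a 4xx or 5xx status, and the `/users` and `/posts` paths are illustrative.

```typescript
const trace: string[] = []

// Hypothetical request helper: records the call, then rejects on an error status.
async function fakeGet(path: string, status: number): Promise<number> {
  trace.push(`GET ${path} -> ${status}`)
  if (status >= 400) throw new Error(`HTTP ${status} for ${path}`)
  return status
}

// Three sequential requests; the second one fails.
async function midFailure(): Promise<void> {
  await fakeGet('/users', 200)
  await fakeGet('/not-a-real-posts-endpoint', 404) // rejects here
  await fakeGet('/posts', 201) // never reached
}

async function demoMidFailure(): Promise<{ trace: string[]; error: string }> {
  try {
    await midFailure()
    return { trace, error: '' }
  } catch (err) {
    return { trace, error: err instanceof Error ? err.message : String(err) }
  }
}
```

Only the first two requests appear in `trace`; the third is skipped because the rejection leaves the function before that line runs.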
Expected shape of the failure output:
river ▸ full-chain-failure (dev)
✓ get-users 200 ...ms
↷ get-users (cached)
✓ get-user-posts 200 ...ms
About to fail after loading post: "..."
✓ full-chain-failure 404 ...ms
3 steps completed in ...ms · failed
ERROR Flow failed: full-chain-failure
GET https://jsonplaceholder.typicode.com/not-a-real-endpoint
HTTP 404 Not Found in ...ms
Response:
{}
Expected shape of the full-chain output:
river ▸ full-chain (dev)
✓ get-users 200 ...ms
↷ get-users (cached)
✓ get-user-posts 200 ...ms
✓ full-chain 200 ...ms
✓ create-post 201 ...ms
4 steps completed in ...ms · all passed
Error handling
river stops on failure by default.
If a request fails, the CLI now prints the useful parts of the failure:
- flow name
- HTTP method and URL
- status code
- duration
- response body excerpt
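As a rough illustration of how those parts could be assembled into a report, here is a small formatter. The `HttpFailure` type, its field names, and the `maxBodyChars` limit are assumptions made for this sketch; River's internal representation is not documented here.

```typescript
// Hypothetical shape; the field names are assumptions for this sketch.
type HttpFailure = {
  flow: string
  method: string
  url: string
  status: number
  statusText: string
  durationMs: number
  body: unknown
}

// Builds a multi-line report and truncates long response bodies to an excerpt.
function formatFailure(f: HttpFailure, maxBodyChars = 500): string {
  const body = f.body === undefined ? '' : JSON.stringify(f.body, null, 2)
  const excerpt = body.length > maxBodyChars ? `${body.slice(0, maxBodyChars)}...` : body
  return [
    `Flow failed: ${f.flow}`,
    `${f.method} ${f.url}`,
    `HTTP ${f.status} ${f.statusText} in ${f.durationMs}ms`,
    'Response:',
    excerpt,
  ].join('\n')
}

const report = formatFailure({
  flow: 'create-post',
  method: 'POST',
  url: 'https://api.example.com/posts',
  status: 401,
  statusText: 'Unauthorized',
  durationMs: 214,
  body: { message: 'Unauthorized' },
})
console.log(report)
```

Capping the body length keeps a large error page from burying the method, URL, and status lines.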
Example shape:
Flow failed: create-post
POST https://api.example.com/posts
HTTP 401 Unauthorized in 214ms
Response:
{
  "message": "Unauthorized"
}
Current command surface
Implemented now:
- river init [name]
- river run <flow>
Planned next:
- river list
- river state ...
- --verbose
- --json
Status
river is currently an early preview.
What works today:
- river init [name]
- river run <flow>
- TypeScript flow files
- namespaced runtime context: river.http.*, river.headers.*, river.state.*, river.store.*
- flow composition with river.run(otherFlow)
- in-run state sharing
- flow caching with cache: true
- public example flows under examples/jsonplaceholder/
What is still in progress:
- river list
- persistent disk-backed store
- verbose / JSON output modes
- declarative flow execution
