@nicolastoulemont/std
v0.11.0
Collection of utility functions
Introduction
@nicolastoulemont/std is a functional TypeScript toolkit for modeling domain data, handling failures explicitly, and composing sync or async workflows.
It is designed for application code where clear control flow, predictable typing, and dependency-aware orchestration matter.
The API is pipe-friendly, namespace-oriented, and built from small primitives you can combine incrementally.
Installation
Install the base package:
pnpm add @nicolastoulemont/std
Install optional extras used by some modules and examples:
pnpm add @nicolastoulemont/std zod multithreading
zod is optional. The schema-backed tagged union examples below use it, but Schema accepts any Standard Schema-compatible validator, including zod, valibot, arktype, and similar libraries.
multithreading is optional. It is only required if you want to use the Multithread module.
Quick Start
import { Data, Result, pipe } from "@nicolastoulemont/std"
class InvalidPortError extends Data.TaggedError("InvalidPortError")<{ input: string }> {}
const parsePort = (input: string) =>
pipe(
Result.try(() => Number.parseInt(input, 10)),
Result.filter(
(n) => Number.isInteger(n) && n > 0,
() => new InvalidPortError({ input }),
),
)
import { Fx, Layer, Provide, Service, pipe } from "@nicolastoulemont/std"
const Config = Service.tag<{ baseUrl: string }>("Config")
const ConfigLive = Layer.ok(Config, { baseUrl: "https://api.example.com" })
const program = Fx.gen(function* () {
const config = yield* Config
return config.baseUrl
})
const exit = Fx.run(pipe(program, Provide.layer(ConfigLive)))
const response = Fx.match(exit, {
Ok: (ok) => ({ status: 200, body: ok.value }),
Err: (err) => ({ status: 400, body: err.error }),
Defect: (defect) => ({ status: 500, body: String(defect.defect) }),
})
Main Modules
Result
Result models success and failure with typed errors so transformations stay explicit and composable.
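To make the short-circuiting idea concrete, here is a minimal sketch of a Result-like type written from scratch. The names `MiniResult`, `miniOk`, `miniErr`, and `miniFlatMap` are hypothetical and are not part of @nicolastoulemont/std; the library's own representation and signatures may differ.

```typescript
// Hypothetical stand-in for a Result type; not the library's implementation.
type MiniResult<A, E> =
  | { readonly _tag: "Ok"; readonly value: A }
  | { readonly _tag: "Err"; readonly error: E }

const miniOk = <A>(value: A): MiniResult<A, never> => ({ _tag: "Ok", value })
const miniErr = <E>(error: E): MiniResult<never, E> => ({ _tag: "Err", error })

// Runs `f` only on success; an earlier failure is passed through untouched.
const miniFlatMap = <A, B, E1, E2>(
  result: MiniResult<A, E1>,
  f: (value: A) => MiniResult<B, E2>,
): MiniResult<B, E1 | E2> => (result._tag === "Ok" ? f(result.value) : result)

const parseAge = (raw: string): MiniResult<number, "NotANumber"> => {
  const n = Number.parseInt(raw, 10)
  return Number.isNaN(n) ? miniErr("NotANumber") : miniOk(n)
}

const checkAdult = (age: number): MiniResult<number, "TooYoung"> =>
  age >= 18 ? miniOk(age) : miniErr("TooYoung")

const adult = miniFlatMap(parseAge("42"), checkAdult) // Ok with value 42
const invalid = miniFlatMap(parseAge("abc"), checkAdult) // Err "NotANumber"; checkAdult never runs
```

The error type of the combined pipeline is the union of both step errors, which is what keeps each failure visible in the signature.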
Abstract Example
import { Data, Result, pipe } from "@nicolastoulemont/std"
class NotPositiveIntegerError extends Data.TaggedError("NotPositiveIntegerError")<{ input: string }> {}
const parsePositiveInt = (input: string) => {
const parsed = Number.parseInt(input, 10)
return pipe(
Result.ok(parsed),
Result.filter(
(n) => Number.isInteger(n) && n > 0,
() => new NotPositiveIntegerError({ input }),
),
)
}
Real-World Example
import { Data, Result, pipe } from "@nicolastoulemont/std"
class ValidationError extends Data.TaggedError("ValidationError")<{ message: string }> {}
class ConflictError extends Data.TaggedError("ConflictError")<{ message: string }> {}
type SignupError = ValidationError | ConflictError
const validateEmail = (email: string) =>
email.includes("@") ? Result.ok(email) : Result.err<SignupError>(new ValidationError({ message: "Invalid email" }))
const createUser = (email: string) =>
email === "[email protected]"
? Result.err<SignupError>(new ConflictError({ message: "Email already used" }))
: Result.ok({ id: "u_123", email })
const signup = (email: string) => pipe(validateEmail(email), Result.flatMap(createUser))
Option
Option models optional presence and absence when missing data is expected and not an error condition.
Abstract Example
import { Option, pipe } from "@nicolastoulemont/std"
const normalizedName = (value: string | undefined) =>
pipe(
Option.fromNullable(value),
Option.map((name) => name.trim()),
Option.filter((name) => name.length > 0),
Option.unwrapOr("Anonymous"),
)
Real-World Example
import { Option, pipe } from "@nicolastoulemont/std"
const readPagination = (query: URLSearchParams) => ({
page: pipe(
Option.fromNullable(query.get("page")),
Option.map((raw) => Number.parseInt(raw, 10)),
Option.filter((n) => Number.isInteger(n) && n > 0),
Option.unwrapOr(1),
),
limit: pipe(
Option.fromNullable(query.get("limit")),
Option.map((raw) => Number.parseInt(raw, 10)),
Option.filter((n) => Number.isInteger(n) && n > 0 && n <= 100),
Option.unwrapOr(20),
),
})
Either
Either models two meaningful branches where both sides are valid outcomes rather than success versus failure.
Abstract Example
import { Either } from "@nicolastoulemont/std"
const parseSource = (input: "local" | "remote") => (input === "local" ? Either.left("LOCAL") : Either.right("REMOTE"))
const label = Either.match(parseSource("local"), {
Left: (source) => `Source: ${source}`,
Right: (source) => `Source: ${source}`,
})
Real-World Example
import { Either, pipe } from "@nicolastoulemont/std"
type Source = "cache" | "database"
type User = { id: string; name: string }
const findUser = (id: string) =>
id.startsWith("cached:") ? Either.left<Source, User>("cache") : Either.right<Source, User>({ id, name: "Ada" })
const responseMeta = (id: string) =>
pipe(
findUser(id),
Either.match({
Left: (source) => ({ source, stale: true }),
Right: (user) => ({ source: "database" as const, stale: false, user }),
}),
)
Brand
Brand adds nominal typing to primitives and other values without changing their runtime representation.
Use Brand.make when the source is already trusted, and Brand.refine when you want a validated branded value wrapped in Result.
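As a rough illustration of what "nominal typing without changing the runtime representation" can mean, the sketch below builds a brand from scratch using a phantom property. `Tagged`, `brandTag`, `asUserId`, and `loadUser` are hypothetical names; how the library defines `Brand.Branded` internally is not shown in this README.

```typescript
// Hypothetical branding helper; the library's Brand.Branded type may be defined differently.
declare const brandTag: unique symbol
type Tagged<Base, Name extends string> = Base & { readonly [brandTag]: Name }

type UserId = Tagged<string, "UserId">

// Trusted constructor: a type-level cast only, nothing changes at runtime.
const asUserId = (raw: string): UserId => raw as UserId

const loadUser = (id: UserId): string => `user:${id}`

const userId = asUserId("u_1")
const loaded = loadUser(userId) // "user:u_1"
// loadUser("u_1") would be rejected by the compiler: a plain string is not a UserId.
const runtimeKind = typeof userId // "string": the brand exists only in the type system
```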
Abstract Example
import { Brand } from "@nicolastoulemont/std"
type Port = Brand.Branded<number, "Port">
const toPort = Brand.refine<Port>((value) => Number.isInteger(value) && value > 0 && value <= 65_535, "Invalid port")
const parsed = toPort(3000)
Real-World Example
import { Brand, Result, pipe } from "@nicolastoulemont/std"
type Email = Brand.Branded<string, "Email">
const toEmail = Brand.refine<Email>(
(value) => value.includes("@"),
(value) => `Invalid email: ${value}`,
)
const register = (input: { email: string }) =>
pipe(
Result.ok(input.email),
Result.flatMap(toEmail),
Result.map((email) => ({ email })),
)
Predicate
Predicate provides small composable boolean predicates and refinements for filtering, narrowing, and request validation.
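The composition idea can be sketched in a few lines of plain TypeScript. `Check`, `both`, and `negate` are hypothetical helpers written for this illustration and are not the library's API.

```typescript
// Hypothetical combinators, written from scratch for illustration.
type Check<A> = (value: A) => boolean

// Passes only when every given check passes.
const both = <A>(...checks: Check<A>[]): Check<A> =>
  (value) => checks.every((check) => check(value))

// Inverts a check.
const negate = <A>(check: Check<A>): Check<A> => (value) => !check(value)

const isPositive: Check<number> = (n) => n > 0
const isEven: Check<number> = (n) => n % 2 === 0

const isPositiveOdd = both(isPositive, negate(isEven))
const oddResults = [3, 4, -1].map(isPositiveOdd) // [true, false, false]
```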
Abstract Example
import { Predicate } from "@nicolastoulemont/std"
const isPositiveEven = Predicate.and<number>(
(n) => n > 0,
(n) => n % 2 === 0,
)
const ok = isPositiveEven(4)
Real-World Example
import { Predicate } from "@nicolastoulemont/std"
type TeamRole = "Admin" | "Member"
type TeamRoleSubject = {
teamRole: TeamRole | null
}
type TeamNameSubject = {
name: string
}
const isTeamAdmin = (team: TeamRoleSubject) => team.teamRole === "Admin"
const isNotPersonalTeam = (team: TeamNameSubject) => team.name !== "Personal"
const canManageTeam = Predicate.and(isTeamAdmin, isNotPersonalTeam)
const canManage = canManageTeam({ teamRole: "Admin", name: "Core" })
Schema
Schema wraps sync Standard Schema-compatible validators for three production use cases: boundary parsing, refinement, and bidirectional codecs.
Schema accepts provider schemas directly, even when their Standard Schema type is
sync-or-async. Validation is enforced as sync at runtime; if a schema returns a
Promise, Schema throws a TypeError.
Use Schema.parse at I/O boundaries when a broad external type hides smaller implicit subtypes.
Use Schema.is for direct in-memory proof checks.
Use Schema.refine when you need a reusable preserved-shape predicate, especially with higher-order APIs like Array.filter.
Use Schema.codec when you need an explicit bidirectional boundary adapter with validated encoded and decoded sides.
Use Schema.codec.json for JSON string encoding/decoding with explicit JSON.stringify options.
Use Schema.Refine<Base, typeof schema> for a reusable preserved-shape narrowed type, and Schema.Infer<typeof schema> for the exact schema output type.
Use Schema.struct, Schema.tagged, and Schema.union when you want schema-backed constructors that validate before returning typed data.
Only use Schema.is and Schema.refine with proof schemas that validate the current value in place. Transforms, defaults, and coercions should continue to use Schema.parse.
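The distinction between a proof and a transform can be sketched without any schema library. In the example below, `RawQuery`, `hasPage`, and `parsePage` are hypothetical names: `hasPage` plays the role of a proof that narrows a value in place, while `parsePage` plays the role of a transform whose output has a different shape and therefore has to be read from the returned value.

```typescript
// Hypothetical validator shapes, written from scratch; not the library's types.
type RawQuery = { page?: string }

// A proof only inspects the value, so it can safely narrow it in place.
const hasPage = (query: RawQuery): query is RawQuery & { page: string } =>
  typeof query.page === "string"

// A transform produces a new value whose shape differs from the input.
const parsePage = (query: RawQuery): { page: number } => ({
  page: Number.parseInt(query.page ?? "1", 10),
})

const rawQuery: RawQuery = { page: "3" }

// Narrowing in place is sound: `rawQuery` itself satisfies the narrowed type.
const pageLength = hasPage(rawQuery) ? rawQuery.page.length : 0 // 1

// The transformed output must be read from the returned value, never from the input.
const parsedQuery = parsePage(rawQuery)
const inputKind = typeof rawQuery.page // "string": the input was not changed
const outputPage = parsedQuery.page // 3
```

If a type guard claimed that `rawQuery.page` was a number after `parsePage` ran, the claim would be false at runtime, which is the situation the guidance above is meant to prevent.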
Boundary Parsing Example
import { Result, Schema } from "@nicolastoulemont/std"
import { z } from "zod"
type Ticket = {
channel: "chat" | "email"
chatId?: string | null
metadata?: {
conversationId?: string | null
} | null
}
type ChatTicket = {
channel: "chat"
chatId: string
metadata: {
conversationId: string
}
}
const ChatTicketSchema = z.object({
channel: z.literal("chat"),
chatId: z.string(),
metadata: z.object({
conversationId: z.string(),
}),
})
const parseChatTicket = Schema.parse(ChatTicketSchema)
const result = parseChatTicket({
channel: "chat",
chatId: "chat_123",
metadata: { conversationId: "conv_123" },
})
if (Result.isOk(result)) {
result.value.metadata.conversationId
}
Direct In-Memory Proof Example
import { Schema } from "@nicolastoulemont/std"
import { z } from "zod"
type PersistedTicket = {
id: string
channel: "chat" | "email"
chatId?: string | null
metadata?: {
conversationId?: string | null
} | null
}
type ChatTicketFields = {
channel: "chat"
chatId: string
}
const ChatTicketSchema: Schema.SyncSchema<PersistedTicket, ChatTicketFields> = z.object({
channel: z.literal("chat"),
chatId: z.string(),
})
declare const ticket: PersistedTicket
if (Schema.is(ticket, ChatTicketSchema)) {
ticket.id
ticket.chatId.toUpperCase()
}
In direct control flow, TypeScript preserves the current type shape and intersects in the schema-proven fields.
Reusable Preserved-Shape Predicate Example
import { Schema } from "@nicolastoulemont/std"
import { z } from "zod"
type PersistedTicket = {
id: string
channel: "chat" | "email"
chatId?: string | null
}
type ChatTicketFields = {
channel: "chat"
chatId: string
}
const ChatTicketSchema: Schema.SyncSchema<PersistedTicket, ChatTicketFields> = z.object({
channel: z.literal("chat"),
chatId: z.string(),
})
type ChatTicket = Schema.Refine<PersistedTicket, typeof ChatTicketSchema>
const isChatTicket = Schema.refine(ChatTicketSchema)
declare const tickets: PersistedTicket[]
const chatTickets = tickets.filter(isChatTicket)
Use Schema.Infer<typeof schema> when you need the exact schema output, and use Schema.parse whenever the schema changes the output shape.
Bidirectional Codec Example
import { Result, Schema } from "@nicolastoulemont/std"
import { z } from "zod"
const Port = z.number().int().min(1).max(65535)
const PortString = Schema.codec({
encoded: z.string(),
decoded: Port,
decode: (encoded) => Number(encoded),
encode: (decoded) => String(decoded),
})
const decoded = PortString.decode("3000")
const encoded = PortString.encode(3000)
if (Result.isOk(decoded)) {
decoded.value
}
if (Result.isOk(encoded)) {
encoded.value
}
Schema.codec validates both sides:
decode: encoded input -> encoded validation -> decode transform -> decoded validation -> decoded output
encode: decoded value -> decoded validation -> encode transform -> encoded validation -> encoded output
Codec errors are tagged errors under Schema.Codec:
type CodecError = Schema.Codec.Error
JSON Codec Example
import { Result, Schema } from "@nicolastoulemont/std"
import { z } from "zod"
const User = z.object({
id: z.string(),
name: z.string(),
})
const UserJson = Schema.codec.json(User, { space: 2 })
const decoded = UserJson.decode('{"id":"u1","name":"Alice"}')
const encoded = UserJson.encode({ id: "u1", name: "Alice" })
if (Result.isOk(decoded)) {
decoded.value.name
}
if (Result.isOk(encoded)) {
encoded.value
}
Schema-Backed Struct Example
import { Schema } from "@nicolastoulemont/std"
import { z } from "zod"
const Folder = Schema.struct(
z.object({
id: z.string(),
name: z.string(),
archived: z.boolean().default(false),
}),
)
const created = Folder({ id: "folder_1", name: "Inbox" })
if (created._tag === "Ok") {
created.value.equals({ id: "folder_1", name: "Inbox", archived: false })
}
Schema-Backed Tagged Union Example
Schema builds tagged unions backed by any Standard Schema-compatible validator.
The examples below use zod, but the same API works with valibot, arktype, and other libraries that implement the Standard Schema contract.
import { Data, Schema } from "@nicolastoulemont/std"
import { z } from "zod"
const Shape = Schema.union("Shape", {
Circle: z.object({ radius: z.number() }),
Square: z.object({ side: z.number() }),
})
type Shape = Schema.Union.Infer<typeof Shape>
const describeShape = (shape: Shape) =>
Data.match(shape, {
Circle: (value) => `circle(${value.radius})`,
Square: (value) => `square(${value.side})`,
})
Real-World Example
import { Data, Schema } from "@nicolastoulemont/std"
import { z } from "zod"
const OrderState = Schema.union("OrderState", {
Draft: z.object({ id: z.string() }),
Confirmed: z.object({ id: z.string(), paymentId: z.string() }),
Shipped: z.object({ id: z.string(), trackingId: z.string() }),
})
type OrderState = Schema.Union.Infer<typeof OrderState>
const badgeLabel = (state: OrderState) =>
Data.match(state, {
Draft: () => "Waiting for payment",
Confirmed: () => "Preparing shipment",
Shipped: (value) => `Shipped: ${value.trackingId}`,
})
Data
Data creates immutable structural value objects with stable equality and hashing semantics. Use it when you want value semantics for structs, tuples, arrays, tagged records, custom error types, or pattern matching over tagged data.
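For readers unfamiliar with structural equality, the sketch below shows the general idea on plain objects and arrays. `structurallyEqual` is a hypothetical helper written for this illustration; it does not cover hashing or immutability, and it is not how the library implements `equals`.

```typescript
// Hypothetical helper illustrating structural (value-based) equality for plain data.
const structurallyEqual = (a: unknown, b: unknown): boolean => {
  if (Object.is(a, b)) return true
  if (typeof a !== "object" || typeof b !== "object" || a === null || b === null) return false
  if (Array.isArray(a) !== Array.isArray(b)) return false
  const keysA = Object.keys(a)
  const keysB = Object.keys(b)
  if (keysA.length !== keysB.length) return false
  // Every key of `a` must exist on `b` with a structurally equal value.
  return keysA.every(
    (key) =>
      Object.prototype.hasOwnProperty.call(b, key) &&
      structurallyEqual((a as Record<string, unknown>)[key], (b as Record<string, unknown>)[key]),
  )
}

const filtersA = { search: "books", tags: ["new", "sale"] }
const filtersB = { search: "books", tags: ["new", "sale"] }

const sameReference = filtersA === filtersB // false: two distinct objects
const sameStructure = structurallyEqual(filtersA, filtersB) // true: same shape and values
```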
Abstract Example
import { Data } from "@nicolastoulemont/std"
const a = Data.struct({ env: "prod", retries: 3 })
const b = Data.struct({ env: "prod", retries: 3 })
const same = a.equals(b) // true
Schema.struct is the validated counterpart to Data.struct: it validates through a sync Standard Schema, then wraps the validated object as a structural value.
Real-World Example
import { Data } from "@nicolastoulemont/std"
const previous = Data.struct({ search: "books", sort: "price-asc" })
const next = Data.struct({ search: "books", sort: "price-asc" })
if (previous.equals(next)) {
// Skip redundant fetch because filter state is structurally identical
}
Order
Order provides composable comparators and immutable sorting helpers.
Use Order.string for deterministic lexicographic ordering and Order.collator(...) when locale rules or numeric string sorting matter.
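Composable comparators follow a simple rule: try each comparator in turn and let the first non-zero result decide. The sketch below shows that rule with hypothetical helpers (`Comparator`, `compareBy`, `mergeComparators`) that are written from scratch and are not the library's functions.

```typescript
// Hypothetical comparator helpers, written from scratch for illustration.
type Comparator<A> = (left: A, right: A) => number

// Compare two values by a derived key.
const compareBy = <A, K>(keyOrder: Comparator<K>, key: (value: A) => K): Comparator<A> =>
  (left, right) => keyOrder(key(left), key(right))

// Try each comparator in turn; the first non-zero result decides.
const mergeComparators = <A>(...orders: Comparator<A>[]): Comparator<A> =>
  (left, right) => {
    for (const order of orders) {
      const outcome = order(left, right)
      if (outcome !== 0) return outcome
    }
    return 0
  }

const numberOrder: Comparator<number> = (left, right) => left - right
const stringOrder: Comparator<string> = (left, right) => (left < right ? -1 : left > right ? 1 : 0)

type Member = { nick: string; age: number }
const memberOrder = mergeComparators<Member>(
  compareBy(numberOrder, (m: Member) => m.age),
  compareBy(stringOrder, (m: Member) => m.nick),
)

const members: Member[] = [
  { nick: "bob", age: 30 },
  { nick: "alice", age: 30 },
  { nick: "zoe", age: 25 },
]
// Copy before sorting so the input array is left untouched.
const sortedNicks = [...members].sort(memberOrder).map((m) => m.nick) // ["zoe", "alice", "bob"]
```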
Abstract Example
import { Order, pipe } from "@nicolastoulemont/std"
type User = { name: string; age: number }
const byAge = Order.by(Order.number, (user: User) => user.age)
const byName = Order.by(Order.string, (user: User) => user.name)
const userOrder = Order.merge(byAge, byName)
const sameOrder = pipe(byAge, Order.merge(byName))
const allOrders = Order.merge([byAge, byName])
const sorted = Order.sort(
[
{ name: "bob", age: 30 },
{ name: "alice", age: 30 },
{ name: "zoe", age: 25 },
],
allOrders,
)
Real-World Example
import { Order } from "@nicolastoulemont/std"
const collator = new Intl.Collator("en", { numeric: true })
type Product = {
id: string
category: string
price: number
rating: number
}
const byCategory = Order.by(Order.collator(collator), (product: Product) => product.category)
const byPrice = Order.by(Order.number, (product: Product) => product.price)
const byRatingDesc = Order.reverse(Order.by(Order.number, (product: Product) => product.rating))
const sortProducts = Order.sortBy(byCategory, byPrice, byRatingDesc)
const products: Product[] = [
{ id: "a", category: "books", price: 20, rating: 4.8 },
{ id: "b", category: "books", price: 20, rating: 4.5 },
{ id: "c", category: "games", price: 60, rating: 4.7 },
]
const sorted = sortProducts(products)
Context
Context is the typed immutable service map used by Fx, Layer, and Provide.
Use it directly when you want to assemble dependencies without building a layer first.
Abstract Example
import { Context, Service, pipe } from "@nicolastoulemont/std"
const Logger = Service.tag<{ log: (message: string) => void }>("Logger")
const Clock = Service.tag<{ now: () => number }>("Clock")
const ctx = pipe(Context.make(Logger, { log: () => undefined }), Context.add(Clock, { now: () => 123 }))
const now = Context.get(ctx, Clock).now()
Real-World Example
import { Context, Service, pipe } from "@nicolastoulemont/std"
const Config = Service.tag<{ apiBaseUrl: string }>("Config")
const Request = Service.tag<{ id: string }>("Request")
const base = Context.make(Config, { apiBaseUrl: "https://api.example.com" })
const requestCtx = pipe(base, Context.add(Request, { id: "req_123" }))
const requestId = Context.get(requestCtx, Request).id
Service
Service defines typed dependency tags that can be yielded inside Fx.gen.
Use Service.tag(...) for interface-only tags and Service.Service<...>()("...") when you want a class-style service tag.
Abstract Example
import { Fx, Provide, Service } from "@nicolastoulemont/std"
const Clock = Service.tag<{ now: () => number }>("Clock")
const program = Fx.gen(function* () {
return (yield* Clock).now()
})
const exit = Fx.run(Provide.service(Clock, { now: () => 123 })(program))
Real-World Example
import { Fx, Provide, Service } from "@nicolastoulemont/std"
const Logger = Service.Service<{ info: (message: string) => void }>()("Logger")
const program = Fx.gen(function* () {
const logger = yield* Logger
logger.info("starting request")
return "ok"
})
const exit = Fx.run(
Provide.service(Logger, {
info: (message) => console.log(message),
})(program),
)
Layer
Layer builds services, composes service graphs, and models dependency construction separately from program execution. Use it when the service itself has dependencies, can fail, or needs scoped cleanup.
Abstract Example
import { Fx, Layer, Provide, Service } from "@nicolastoulemont/std"
const Port = Service.tag<number>("Port")
const PortLive = Layer.ok(Port, 3000)
const program = Fx.gen(function* () {
return yield* Port
})
const exit = Fx.run(Provide.layer(PortLive)(program))
Real-World Example
import { Fx, Layer, Provide, Service } from "@nicolastoulemont/std"
const Config = Service.tag<{ baseUrl: string }>("Config")
const Client = Service.tag<{ get: (path: string) => string }>("Client")
const ConfigLive = Layer.ok(Config, { baseUrl: "https://api.example.com" })
const ClientLive = Layer.fx(Client)(
Fx.gen(function* () {
const config = yield* Config
return {
get: (path: string) => `${config.baseUrl}${path}`,
}
}),
)
const Live = Layer.provide(ConfigLive)(ClientLive)
const program = Fx.gen(function* () {
return (yield* Client).get("/users")
})
const exit = Fx.run(Provide.layer(Live)(program))
Provide
Provide resolves Fx requirements using a service, a context, or a fully-built layer.
It is the last step that turns a dependency-requiring effect into a runnable one.
Abstract Example
import { Fx, Provide, Service } from "@nicolastoulemont/std"
const Port = Service.tag<number>("Port")
const readPort = Fx.gen(function* () {
return yield* Port
})
const exit = Fx.run(Provide.service(Port, 3000)(readPort))
Use Provide.layers(...) when you want the ergonomics of Provide.layer(Layer.merge(...)) without the extra nesting.
Real-World Example
import { Context, Fx, Provide, Service, pipe } from "@nicolastoulemont/std"
const Config = Service.tag<{ baseUrl: string }>("Config")
const Logger = Service.tag<{ info: (message: string) => void }>("Logger")
const ctx = pipe(
Context.make(Config, { baseUrl: "https://api.example.com" }),
Context.add(Logger, { info: (message) => console.log(message) }),
)
const program = Fx.gen(function* () {
const config = yield* Config
const logger = yield* Logger
logger.info(`Calling ${config.baseUrl}`)
return config.baseUrl
})
const exit = Fx.run(Provide.context(ctx)(program))
Fx
Fx models generator-based effects with typed dependencies, typed failures, and sync or async execution.
It is the center of the effectful part of the library, and it composes naturally with Result, Option, Layer, Provide, and Service.
Abstract Example
import { Fx, Layer, Provide, Service, pipe } from "@nicolastoulemont/std"
const Clock = Service.tag<{ now: () => number }>("Clock")
const ClockLive = Layer.ok(Clock, { now: () => Date.now() })
const program = Fx.gen(function* () {
const clock = yield* Clock
return clock.now()
})
const exit = Fx.run(pipe(program, Provide.layer(ClockLive)))
const timestamp = Fx.match(exit, {
Ok: (ok) => ok.value,
Err: () => 0,
Defect: () => 0,
})
Real-World Example
import { Data, Fx, Layer, Provide, Result, Service, pipe } from "@nicolastoulemont/std"
const Api = Service.tag<{ postOrder: (input: { sku: string; qty: number }) => Promise<{ orderId: string }> }>("Api")
const ApiLive = Layer.ok(Api, {
postOrder: async () => ({ orderId: "ord_42" }),
})
class InvalidQuantityError extends Data.TaggedError("InvalidQuantityError")<{ qty: number }> {}
const submitOrder = Fx.gen(async function* (payload: { sku?: string; qty: number }) {
const api = yield* Api
const sku = yield* Fx.option(payload.sku)
const validQty = yield* Result.filter(
Result.ok(payload.qty),
(qty) => qty > 0,
(qty) => new InvalidQuantityError({ qty }),
)
const created = await Fx.try(() => api.postOrder({ sku, qty: validQty }))
return yield* created
})
const exit = await Fx.run(pipe(submitOrder({ sku: "book-1", qty: 2 }), Provide.layer(ApiLive)))
Fiber
Fiber exposes handles for running Fx computations.
Use Fx.runFork to start a root fiber outside a program, and use Fx.forkChild or Fx.forkDetach inside Fx.gen.
Child fibers are owned by the current fiber and are interrupted when the parent exits.
Detached fibers snapshot the current runtime state but are not parent-owned.
Interruption is cooperative: long-running programs should yield with Fx.yieldNow() or wait on interruptible runtime operations.
Abstract Example
import { Fx } from "@nicolastoulemont/std"
const fiber = Fx.runFork(
Fx.gen(async function* () {
yield* Fx.yieldNow()
return 42
}),
)
const exit = await fiber.await()
// => { _tag: "Ok", value: 42 }
Real-World Example
import { Fiber, Fx } from "@nicolastoulemont/std"
const program = Fx.gen(async function* () {
const child = yield* Fx.forkChild(
Fx.gen(async function* () {
yield* Fx.yieldNow()
return "child-ready"
}),
)
const value = yield* Fiber.join(child)
const statusAfterJoin = yield* Fiber.status(child)
return { value, statusAfterJoin }
})
const exit = await Fx.run(program)
// => { _tag: "Ok", value: { value: "child-ready", statusAfterJoin: "Done" } }
Log
Log stores contextual fields in the Fx runtime state and sends log events to installed logger backends.
Log events include merged fields, active log spans, trace metadata when a span exists, and the current fiber id.
Logger failures are ignored so logging cannot fail the user program.
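One common way to get the "logging cannot fail the program" property is to wrap each backend so that exceptions are swallowed. The sketch below assumes that approach; `LogRecord`, `LogBackend`, and `ignoreFailures` are hypothetical names, and the README does not say how the library implements this internally.

```typescript
// Hypothetical types for illustration; the library's Log backend interface may differ.
type LogRecord = { level: "info" | "error"; message: string }
type LogBackend = (record: LogRecord) => void

// Wrap a backend so that any exception it throws is swallowed.
const ignoreFailures = (backend: LogBackend): LogBackend => (record) => {
  try {
    backend(record)
  } catch {
    // Deliberately ignored: logging must not fail the caller.
  }
}

const delivered: LogRecord[] = []
const brokenBackend: LogBackend = () => {
  throw new Error("log transport unavailable")
}
const memoryBackend: LogBackend = (record) => {
  delivered.push(record)
}

const backends = [brokenBackend, memoryBackend].map(ignoreFailures)

const handleRequest = (): string => {
  for (const backend of backends) backend({ level: "info", message: "request received" })
  return "ok"
}

const outcome = handleRequest() // "ok", even though one backend threw
```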
Abstract Example
import { Fx, Log, Provide } from "@nicolastoulemont/std"
const events: Log.Event[] = []
const program = Provide.layer(Log.layer({ log: (event) => events.push(event) }))(
Fx.gen(function* () {
const logger = yield* Log.context({ requestId: "req_1" })
yield* logger.info("request received", { route: "/orders" })
return "ok"
}),
)
const exit = Fx.run(program)
// => { _tag: "Ok", value: "ok" }
Real-World Example
import { Fx, Log, Provide } from "@nicolastoulemont/std"
const logger = Log.json()
const program = Provide.layer(Log.layer(logger))(
Log.withFields({ service: "checkout" })(
Fx.gen(function* () {
const log = yield* Log.context({ requestId: "req_42" }, { logSpan: "checkout" })
yield* log.info("charging card", { amountCents: 4600 })
return "paid"
}),
),
)
const exit = Fx.run(program)
// => { _tag: "Ok", value: "paid" }
Use Log.withSpan for log timing decoration. Use Trace.span when you need a real tracing span.
Trace
Trace stores span context in the Fx runtime state.
Use Trace.span to wrap an effect in a tracing span, Trace.annotate or Trace.attribute to attach attributes, and Trace.event to add events.
Trace.layer installs a tracer backend; Trace.native() provides a lightweight in-memory-compatible tracer implementation for local use and tests.
Abstract Example
import { Fx, Trace } from "@nicolastoulemont/std"
const program = Trace.span(
"checkout",
Fx.gen(function* () {
yield* Trace.attribute("order.id", "ord_42")
yield* Trace.event("charged")
const context = yield* Trace.currentContext()
return context === undefined ? "missing" : "traced"
}),
)
const exit = Fx.run(program)
// => { _tag: "Ok", value: "traced" }
Real-World Example
import { Fx, Log, Provide, Trace } from "@nicolastoulemont/std"
const logger = Log.console()
const tracer = Trace.native()
const program = Provide.layers(
Log.layer(logger),
Trace.layer(tracer),
)(
Trace.span(
"POST /orders",
Fx.gen(function* () {
const log = yield* Log.context({ requestId: "req_42" })
yield* Trace.annotate({ "http.method": "POST", "http.route": "/orders" })
yield* log.info("request started")
yield* Trace.event("order.created", { orderId: "ord_42" })
return "ord_42"
}),
{ kind: "server" },
),
)
const exit = Fx.run(program)
// => { _tag: "Ok", value: "ord_42" }
Duration
Duration provides fixed-size, millisecond-backed values for retries, timeouts, and config-style inputs.
Abstract Example
import { Duration, Result } from "@nicolastoulemont/std"
const timeout = Duration.seconds(30)
const parsed = Duration.parse("5 minutes")
const timeoutMs = Result.match(parsed, {
Ok: Duration.toMillis,
Err: (error) => error._tag,
})
Real-World Example
import { Duration, Schedule } from "@nicolastoulemont/std"
const retry = Schedule.fixed({
times: 3,
delayMs: Duration.seconds(1),
})
const backoff = Schedule.exponential({
times: 5,
baseDelayMs: "0.5 seconds",
maxDelayMs: "10 seconds",
})
Schedule
Schedule describes retry policies for Fx.retry.
Use recurs for immediate retries, fixed for constant delays, and exponential for backoff.
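The three policies differ only in how long they wait before each retry. The sketch below models that with hypothetical delay calculators. The doubling formula, the 0-based attempt index, and the names `DelayPolicy`, `recursPolicy`, `fixedPolicy`, and `exponentialPolicy` are all assumptions made for illustration; the library's exact backoff formula is not stated in this README.

```typescript
// Hypothetical delay calculators; the library's formulas and attempt indexing are assumptions here.
type DelayPolicy = { times: number; delayForAttempt: (attempt: number) => number }

// Immediate retries: no waiting between attempts.
const recursPolicy = (times: number): DelayPolicy => ({ times, delayForAttempt: () => 0 })

// Constant delay between attempts.
const fixedPolicy = (times: number, delayMs: number): DelayPolicy => ({
  times,
  delayForAttempt: () => delayMs,
})

// Assumed backoff: the delay doubles on each attempt (0-based) and is capped at maxDelayMs.
const exponentialPolicy = (times: number, baseDelayMs: number, maxDelayMs: number): DelayPolicy => ({
  times,
  delayForAttempt: (attempt) => Math.min(baseDelayMs * 2 ** attempt, maxDelayMs),
})

const backoffPolicy = exponentialPolicy(5, 100, 1000)
const backoffDelays = Array.from({ length: backoffPolicy.times }, (_, attempt) =>
  backoffPolicy.delayForAttempt(attempt),
) // [100, 200, 400, 800, 1000]
```

Capping the delay keeps the worst-case wait bounded once the doubling would otherwise exceed the maximum.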
Abstract Example
import { Schedule } from "@nicolastoulemont/std"
const schedule = Schedule.recurs(2)
const delay = schedule.delayForAttempt(1)
Real-World Example
import { Duration, Fx, Result, Schedule } from "@nicolastoulemont/std"
let attempts = 0
const flaky = Fx.gen(function* () {
attempts += 1
if (attempts < 3) {
return yield* Result.err("temporary" as const)
}
return "ok"
})
const exit = await Fx.run(
Fx.retry(
flaky,
Schedule.exponential({
times: 5,
baseDelayMs: Duration.millis(100),
maxDelayMs: "1 seconds",
}),
),
)
Scope
Scope manages finalizers and nested resource lifecycles.
It is mostly used by Layer.scoped and Provide.layer, but you can use it directly when you want explicit cleanup semantics.
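The idea of finalizers and nested lifecycles can be sketched with a small synchronous class. `MiniScope` is hypothetical, and its ordering rules (children closed before the parent's own finalizers, finalizers run in reverse registration order) are assumptions chosen for the illustration; the README does not document the order the library uses.

```typescript
// Hypothetical mini scope; the ordering rules are assumptions, not the library's documented behavior.
type Finalizer = () => void

class MiniScope {
  private finalizers: Finalizer[] = []
  private children: MiniScope[] = []
  private closed = false

  addFinalizer(finalizer: Finalizer): void {
    this.finalizers.push(finalizer)
  }

  // A forked scope is closed when its parent closes.
  fork(): MiniScope {
    const child = new MiniScope()
    this.children.push(child)
    return child
  }

  // Assumed order: children first, then own finalizers in reverse registration order.
  close(): void {
    if (this.closed) return
    this.closed = true
    for (const child of this.children) child.close()
    for (const finalizer of [...this.finalizers].reverse()) finalizer()
  }
}

const released: string[] = []
const rootScope = new MiniScope()
const childScope = rootScope.fork()

rootScope.addFinalizer(() => released.push("root:config"))
rootScope.addFinalizer(() => released.push("root:pool"))
childScope.addFinalizer(() => released.push("child:request"))

rootScope.close()
rootScope.close() // closing twice is a no-op in this sketch
// released is now ["child:request", "root:pool", "root:config"]
```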
Abstract Example
import { Fx, Result, Scope } from "@nicolastoulemont/std"
let released = false
const scope = Scope.make()
Fx.run(
scope.addFinalizer(() =>
Fx.gen(function* () {
released = true
}),
),
)
Fx.run(scope.close(Result.ok(undefined)))
Real-World Example
import { Fx, Result, Scope } from "@nicolastoulemont/std"
const events: string[] = []
const root = Scope.make()
const child = root.fork()
Fx.run(
root.addFinalizer(() =>
Fx.gen(function* () {
events.push("root")
}),
),
)
Fx.run(
child.addFinalizer(() =>
Fx.gen(function* () {
events.push("child")
}),
),
)
Fx.run(root.close(Result.ok(undefined)))
Queue
Queue provides a standalone FIFO task queue with configurable concurrency, backpressure, and lifecycle controls.
Use it when you want bounded async work without adopting the full Fx model.
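The core of a concurrency-limited FIFO queue fits in a few lines. The sketch below is a hypothetical `makeMiniQueue` written from scratch; it shows only arrival-order scheduling with a concurrency cap and leaves out backpressure, shutdown modes, and abort signals, so it should not be read as the library's implementation.

```typescript
// Hypothetical mini queue; it only illustrates FIFO scheduling with a concurrency cap.
type Job<A> = () => A | Promise<A>

const makeMiniQueue = (concurrency: number) => {
  const pending: Array<() => void> = []
  let running = 0
  let peak = 0

  // Start waiting jobs in arrival order while a slot is free.
  const drain = (): void => {
    while (running < concurrency && pending.length > 0) {
      const start = pending.shift()
      if (start) start()
    }
  }

  const enqueue = <A>(job: Job<A>): Promise<A> =>
    new Promise<A>((resolve, reject) => {
      pending.push(() => {
        running += 1
        peak = Math.max(peak, running)
        // Running the job inside a promise executor turns a synchronous throw into a rejection.
        const finished = new Promise<A>((settle) => settle(job()))
        finished.then(resolve, reject).then(() => {
          running -= 1
          drain()
        })
      })
      drain()
    })

  return { enqueue, peakConcurrency: () => peak }
}

const miniQueue = makeMiniQueue(2)
const pause = (ms: number) => new Promise<void>((done) => setTimeout(done, ms))

const queued = [1, 2, 3, 4].map((n) =>
  miniQueue.enqueue(async () => {
    await pause(5)
    return n * 10
  }),
)
const settled = Promise.all(queued) // resolves to [10, 20, 30, 40]; at most 2 jobs run at once
```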
Abstract Example
import { Queue } from "@nicolastoulemont/std"
const queue = Queue.make({ concurrency: 2 })
const first = queue.enqueue(() => 1)
const second = queue.enqueue(async () => 2)
await queue.awaitIdle()
await queue.shutdown({ mode: "drain" })
Real-World Example
import { Queue } from "@nicolastoulemont/std"
// Assumed input for this example: the image URLs to download.
declare const imageUrls: string[]
const imageQueue = Queue.bounded(100, { concurrency: 4 })
const tasks = imageUrls.map((url) =>
imageQueue.enqueue(async ({ signal }) => {
const response = await fetch(url, { signal })
return response.arrayBuffer()
}),
)
const buffers = await Promise.all(tasks)
await imageQueue.shutdown({ mode: "drain" })
Multithread
Multithread runs self-contained callbacks in worker threads using a Result-first API while remaining yieldable in Fx.gen.
It requires the optional multithreading dependency at runtime.
Use it for independent CPU-heavy work that should not block the main thread, such as parsing large payloads, validating batches, compression, or expensive transforms. Callbacks are serialized into workers, so they must be self-contained: define helper functions inside the callback or pass data as arguments.
Lifecycle
Configure the worker pool before the first multithread operation starts:
import { Multithread } from "@nicolastoulemont/std"
const configured = Multithread.configure({ maxWorkers: 4 })
// => { _tag: "Ok", value: undefined }
maxWorkers is the maximum number of worker threads in the shared runtime pool.
Per-operation parallelism controls how many jobs an operation tries to keep in flight, but the runtime can only execute up to maxWorkers worker jobs at the same time.
For Fx programs, prefer the scoped layer so the worker runtime is shut down with the scope:
import { Fx, Multithread, Provide } from "@nicolastoulemont/std"
const program = Provide.layer(Multithread.layer({ maxWorkers: 4 }))(
Fx.gen(async function* () {
return yield* Multithread.fx(() => 42)
}),
)
const exit = await Fx.run(program)
// => { _tag: "Ok", value: 42 }
Inside Fx.gen
Use Multithread.fx inside reusable Fx.gen programs. It creates a fresh worker operation for each Fx execution, so separate runs do not accidentally share cancellation or memoized state.
import { Fx, Multithread, Provide } from "@nicolastoulemont/std"
type PurchaseEvent = {
readonly id: string
readonly accountId: string
readonly cents: number
}
const ndjsonLines = [
'{"id":"evt_1","accountId":"acct_1","cents":1200}',
'{"id":"evt_2","accountId":"acct_2","cents":3400}',
]
const program = Provide.layer(Multithread.layer({ maxWorkers: 4 }))(
Fx.gen(async function* () {
const events = yield* Multithread.fx((lines, ctx) => {
const parsed: PurchaseEvent[] = []
for (const line of lines) {
ctx.throwIfCancelled()
let value: Partial<PurchaseEvent>
try {
value = JSON.parse(line) as Partial<PurchaseEvent>
} catch {
return { _tag: "Err" as const, error: { _tag: "InvalidPurchaseEvent" as const, line } }
}
if (typeof value.id !== "string" || typeof value.accountId !== "string" || typeof value.cents !== "number") {
return { _tag: "Err" as const, error: { _tag: "InvalidPurchaseEvent" as const, line } }
}
parsed.push({
id: value.id,
accountId: value.accountId,
cents: value.cents,
})
}
return parsed
}, ndjsonLines)
return events.reduce((sum, event) => sum + event.cents, 0)
}),
)
const exit = await Fx.run(program)
// => { _tag: "Ok", value: 4600 }
Fiber interruption aborts the worker operation. Worker cancellation is cooperative, so long-running callbacks should call ctx.throwIfCancelled() inside loops.
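Cooperative cancellation means the running code has to check for cancellation itself; nothing interrupts it mid-step. The sketch below illustrates this with a hypothetical token (`makeToken`, `CancelledError`, `sumUntilCancelled`) in a single thread. It is not the library's worker context, only a model of why the check belongs inside the loop.

```typescript
// Hypothetical cancellation token; the library's worker context (`ctx`) may differ.
class CancelledError extends Error {}

const makeToken = () => {
  let cancelled = false
  return {
    cancel: () => {
      cancelled = true
    },
    throwIfCancelled: () => {
      if (cancelled) throw new CancelledError("cancelled")
    },
  }
}

type Token = ReturnType<typeof makeToken>

// The loop checks the token on every iteration, so cancellation takes effect at the next check.
const sumUntilCancelled = (values: number[], token: Token, onStep: (index: number) => void) => {
  let total = 0
  try {
    for (let index = 0; index < values.length; index += 1) {
      token.throwIfCancelled()
      total += values[index] ?? 0
      onStep(index)
    }
    return { _tag: "Ok" as const, total }
  } catch (error) {
    if (error instanceof CancelledError) return { _tag: "Cancelled" as const, total }
    throw error
  }
}

const cancelToken = makeToken()
// Simulate an external abort arriving after the second item has been processed.
const partial = sumUntilCancelled([1, 2, 3, 4], cancelToken, (index) => {
  if (index === 1) cancelToken.cancel()
})
// partial is { _tag: "Cancelled", total: 3 }
```

Without the check inside the loop, the cancel request in this sketch would be ignored and all four items would be summed.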
Direct Operation API
Use Multithread.run when you need the lower-level operation handle for direct Result-first usage, manual cancellation, explicit sharing, or composition with race and firstSuccess.
import { Multithread } from "@nicolastoulemont/std"
const op = Multithread.run((input: string, ctx) => {
ctx.throwIfCancelled()
return input.toUpperCase()
}, "hello")
const result = await op.result()
// => { _tag: "Ok", value: "HELLO" }
MultithreadOp is cold and memoized. Once started, repeated result() calls observe the same worker result.
If you share one operation between multiple fibers or callers, cancellation is also shared: abort() cancels it for all current and future waiters.
const op = Multithread.run((ctx) => {
while (true) {
ctx.throwIfCancelled()
}
})
op.abort()
const result = await op.result()
// => { _tag: "Err", error: { _tag: "MultithreadCancelledError", ... } }
Collection Work
Use map, forEach, filter, and flatMap for independent collection work.
Each worker receives (value, index, ctx).
import { Multithread } from "@nicolastoulemont/std"
const result = await Multithread.map(
[35, 36, 37],
(n, _index, ctx) => {
const fib = (value: number): number => (value <= 1 ? value : fib(value - 1) + fib(value - 2))
ctx.throwIfCancelled()
return fib(n)
},
{ parallelism: 3 },
).result()
// => { _tag: "Ok", value: [9227465, 14930352, 24157817] }
race returns the first settled operation and aborts the rest.
firstSuccess returns the first successful operation and aggregates failures if all operations fail.
Performance depends on worker startup, callback serialization, payload transfer, and task size. Small jobs can be slower in workers. There is an opt-in probe for local measurement:
RUN_MULTITHREAD_PERFORMANCE=1 pnpm --filter @nicolastoulemont/std test src/multithread/tests/multithreading.performance.test.ts
pipe / flow
pipe and flow compose sync or async transformations into readable, type-inferred data pipelines.
Abstract Example
import { flow, pipe } from "@nicolastoulemont/std"
const toLabel = flow(
(n: number) => n * 2,
(n) => n.toString(),
(s) => `value:${s}`,
)
const result = pipe(10, (n) => n + 1, toLabel)
Real-World Example
import { pipe } from "@nicolastoulemont/std"
type RawProfile = { name?: string; age?: string }
const normalizeProfile = (input: RawProfile) =>
pipe(
input,
(p) => ({ name: p.name?.trim() ?? "", age: Number.parseInt(p.age ?? "0", 10) }),
(p) => ({ ...p, age: Number.isNaN(p.age) ? 0 : p.age }),
(p) => ({ ...p, isAdult: p.age >= 18 }),
)
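For intuition about how such helpers work, here is a minimal sketch of a two-step pipe and flow with overloads that carry types from one step to the next. `miniPipe` and `miniFlow` are hypothetical and support only one or two steps; the library's pipe and flow accept longer chains, and their actual typings are not shown in this README.

```typescript
// Hypothetical reduced versions; limited to one or two steps for brevity.
function miniPipe<A, B>(value: A, ab: (a: A) => B): B
function miniPipe<A, B, C>(value: A, ab: (a: A) => B, bc: (b: B) => C): C
function miniPipe(value: unknown, ...steps: Array<(input: never) => unknown>): unknown {
  // Feed the output of each step into the next one, left to right.
  return steps.reduce((current, step) => (step as (input: unknown) => unknown)(current), value)
}

// flow builds a reusable function instead of applying the steps immediately.
const miniFlow = <A, B, C>(ab: (a: A) => B, bc: (b: B) => C) =>
  (value: A): C => miniPipe(value, ab, bc)

const doubleThenLabel = miniFlow(
  (n: number) => n * 2,
  (n) => `value:${n}`,
)

const piped = miniPipe(10, (n) => n + 1, doubleThenLabel) // "value:22"
```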