antiox
v0.1.4
Published
Rust- and Tokio-like async primitives for TypeScript.
Readme
Pre-release: This library is used in production but the API is subject to change.
Why Antiox?
- Async primitives: Channels, streams, select, tasks, and more. Everything you need for structured concurrency and backpressure in TypeScript.
- Lightweight No custom DSL, no wrapper types, no unnecessary allocations, and no dependencies. Intentionally does not implement
Result,Option, ormatch. TypeScript'sT | null, union types, andswitchalready cover these at zero cost. - Rust-shaped: The control flow and concurrency patterns you miss from Rust, mapped onto native JS primitives. Because let's be honest, you wish you were writing Rust instead.
- Lightweight: Every module is tree-shakeable and tiny enough to ship as a transitive dependency without burdening downstream consumers.
Install
npm install antioxOverview
The biggest win from antiox is channels and streams — primitives that give you structured concurrency and backpressure without callbacks, event emitters, or custom DSLs. Combine them with tasks to build powerful async systems.
import { channel } from "antiox/sync/mpsc";
import { oneshot, OneshotSender } from "antiox/sync/oneshot";
import { unreachable } from "antiox/panic";
import { spawn } from "antiox/task";
type Msg =
| { type: "increment"; amount: number }
| { type: "get"; resTx: OneshotSender<number> };
const [tx, rx] = channel<Msg>(32);
spawn(async () => {
let count = 0;
for await (const msg of rx) {
switch (msg.type) {
case "increment":
count += msg.amount;
break;
case "get":
msg.resTx.send(count);
break;
default:
// `unreachable(x: never)` provides compile-time exhaustiveness checking for switch statements
unreachable(msg);
}
}
});
// Fire-and-forget
await tx.send({ type: "increment", amount: 5 });
// Request-response via oneshot channel
const [resTx, resRx] = oneshot<number>();
await tx.send({ type: "get", resTx });
const value = await resRx;Bounded channels give you backpressure, for await gives you clean shutdown on disconnect, and oneshot channels give you typed request-response.
Modules
| Module | Rust Equivalent | Minified | Gzip |
|--------|-----------------|----------|------|
| antiox/sync/mpsc | tokio::sync::mpsc | 5.1 KB | 1.4 KB |
| antiox/stream | tokio_stream / futures::stream | 12.0 KB | 3.5 KB |
| antiox/task | tokio::task | 2.0 KB | 932 B |
| antiox/sync/oneshot | tokio::sync::oneshot | 1.7 KB | 625 B |
| antiox/sync/select | tokio::select! | 338 B | 260 B |
| antiox/time | tokio::time | 936 B | 530 B |
| antiox/sync/watch | tokio::sync::watch | 1.7 KB | 677 B |
| antiox/sync/broadcast | tokio::sync::broadcast | 2.4 KB | 936 B |
| antiox/sync/mutex | tokio::sync::Mutex | 1.4 KB | 606 B |
| antiox/sync/semaphore | tokio::sync::Semaphore | 2.0 KB | 845 B |
| antiox/sync/rwlock | tokio::sync::RwLock | 2.2 KB | 778 B |
| antiox/sync/cancellation_token | tokio_util::sync::CancellationToken | 623 B | 357 B |
| antiox/sync/notify | tokio::sync::Notify | 934 B | 466 B |
| antiox/sync/barrier | tokio::sync::Barrier | 1.1 KB | 528 B |
| antiox/sync/once_cell | tokio::sync::OnceCell | 699 B | 355 B |
| antiox/panic | std::panic!, std::todo!, std::unreachable! | 273 B | 199 B |
| antiox/sync/drop_guard | tokio_util::sync::DropGuard | 200 B | 169 B |
| antiox/collections/deque | std::collections::VecDeque | 1.3 KB | 493 B |
| antiox/collections/binary_heap | std::collections::BinaryHeap | 994 B | 492 B |
Documentation
Max-heap priority queue with O(log n) push/pop.
import { BinaryHeap } from "antiox/collections/binary_heap";
const heap = new BinaryHeap<number>();
heap.push(3);
heap.push(1);
heap.push(5);
console.log(heap.pop()); // 5
console.log(heap.pop()); // 3Double-ended queue with O(1) push/pop from both ends.
import { Deque } from "antiox/collections/deque";
const dq = new Deque<number>();
dq.push(1);
dq.push(2);
dq.pushFront(0);
console.log(dq.shift()); // 0
console.log(dq.pop()); // 2Diverging functions for halting execution. Mirrors panic!, todo!, and unreachable! from Rust.
import { panic, todo, unreachable } from "antiox/panic";
// Halt with a message
if (!isValid) panic("invariant violated");
// Stub unfinished code
function processEvent(event: Event): Result {
switch (event.type) {
case "click": return handleClick(event);
case "hover": todo("hover support");
}
}
// Exhaustive type checking
type Direction = "north" | "south" | "east" | "west";
function move(dir: Direction) {
switch (dir) {
case "north": return [0, 1];
case "south": return [0, -1];
case "east": return [1, 0];
case "west": return [-1, 0];
default: unreachable(dir); // compile error if cases missed
}
}Async stream combinators. All functions take and return AsyncIterable<T>. Zero wrapper objects.
import { map, filter, bufferUnordered, collect, pipe, merge, chunks } from "antiox/stream";
const results = await collect(
bufferUnordered(
map(urls, (url) => fetch(url)),
10,
),
);
const processed = pipe(
source,
(s) => filter(s, (x) => x > 0),
(s) => map(s, (x) => x * 2),
(s) => chunks(s, 10),
);
for await (const item of merge(stream1, stream2, stream3)) {
console.log(item);
}N tasks wait, all released when the Nth arrives.
import { Barrier } from "antiox/sync/barrier";
const barrier = new Barrier(3);
const result = await barrier.wait();
if (result.isLeader()) console.log("I'm the leader");Multi-producer, multi-consumer bounded channel. Every receiver gets every message.
import { broadcast } from "antiox/sync/broadcast";
const [tx, rx1] = broadcast<string>(16);
const rx2 = tx.subscribe();
tx.send("hello");
console.log(await rx1.recv()); // "hello"
console.log(await rx2.recv()); // "hello"Tree-structured cancellation. Parent cancel propagates to all children.
import { CancellationToken } from "antiox/sync/cancellation_token";
const token = new CancellationToken();
const child = token.child();
spawn(async () => {
await child.cancelled();
console.log("cancelled!");
});
token.cancel(); // cancels token and childEnsure cleanup runs on dispose.
import { DropGuard } from "antiox/sync/drop_guard";
const guard = new DropGuard(() => cleanup());
// ... do work ...
// guard[Symbol.dispose]() runs cleanup
guard.disarm(); // or prevent cleanupMulti-producer, single-consumer channels with backpressure and disconnection detection. Mirrors tokio::sync::mpsc.
import { channel, unboundedChannel } from "antiox/sync/mpsc";
// Bounded channel with backpressure
const [tx, rx] = channel<string>(32);
await tx.send("hello");
const msg = await rx.recv(); // "hello"
// Clone senders for multi-producer
const tx2 = tx.clone();
await tx2.send("from tx2");
// Async iteration
for await (const msg of rx) {
console.log(msg);
}
// Unbounded channel (never blocks on send)
const [utx, urx] = unboundedChannel<number>();
utx.send(42); // sync, never blocksAsync mutex guaranteeing exclusive access across await points.
import { Mutex } from "antiox/sync/mutex";
const mutex = new Mutex({ count: 0 });
const guard = await mutex.lock();
guard.value = { count: guard.value.count + 1 };
guard.release();Simplest synchronization primitive. Wake one or all waiters.
import { Notify } from "antiox/sync/notify";
const notify = new Notify();
// In one task:
await notify.notified();
// In another:
notify.notifyOne();Async lazy initialization. Compute a value once, share across tasks.
import { OnceCell } from "antiox/sync/once_cell";
const cell = new OnceCell<Config>();
const config = await cell.getOrInit(async () => loadConfig());Single-use channel. Send exactly one value. Receiver is awaitable.
import { oneshot } from "antiox/sync/oneshot";
const [tx, rx] = oneshot<string>();
tx.send("done");
const value = await rx; // "done"Multiple concurrent readers OR one exclusive writer.
import { RwLock } from "antiox/sync/rwlock";
const lock = new RwLock({ data: "hello" });
const reader = await lock.read();
console.log(reader.value);
reader.release();
const writer = await lock.write();
writer.value = { data: "world" };
writer.release();Race multiple async branches, cancel losers. TypeScript narrows the result type.
import { select } from "antiox/sync/select";
import { sleep } from "antiox/time";
const result = await select({
msg: (signal) => rx.recv(),
timeout: (signal) => sleep(5000, signal),
});
if (result.key === "msg") {
console.log(result.value); // narrowed type
}Counting semaphore for limiting concurrency.
import { Semaphore } from "antiox/sync/semaphore";
const sem = new Semaphore(3);
const permit = await sem.acquire();
// ... do work ...
permit.release();Single-value broadcast. One sender updates a value, many receivers observe changes.
import { watch } from "antiox/sync/watch";
const [tx, rx] = watch("initial");
const rx2 = tx.subscribe();
tx.send("updated");
await rx.changed();
console.log(rx.borrowAndUpdate()); // "updated"Task spawning with cooperative cancellation via AbortSignal. Mirrors tokio::task.
import { spawn, JoinSet, yieldNow } from "antiox/task";
// Spawn a task (returns awaitable JoinHandle)
const handle = spawn(async (signal) => {
const res = await fetch("https://example.com", { signal });
return res.text();
});
const result = await handle;
// Abort a task
handle.abort();
// JoinSet for managing multiple tasks
const set = new JoinSet<number>();
set.spawn(async (signal) => 1);
set.spawn(async (signal) => 2);
set.spawn(async (signal) => 3);
for await (const result of set) {
console.log(result); // 1, 2, 3 (in completion order)
}
// Yield to event loop
await yieldNow();Timer primitives with AbortSignal integration.
import { sleep, timeout, interval, TimeoutError } from "antiox/time";
await sleep(1000);
try {
const data = await timeout(5000, fetchData());
} catch (e) {
if (e instanceof TimeoutError) console.log("timed out");
}
// All functions accept an optional AbortSignal for cancellation
const controller = new AbortController();
for await (const tick of interval(1000, controller.signal)) {
console.log(`Tick ${tick}`);
if (tick >= 4) break;
}Filling the Gaps
Rust crates that antiox doesn't cover, and what to use instead in TypeScript:
| Rust | TypeScript Replacement | Why |
|------|----------------------|-----|
| Result / Option | better-result | Typed Result/Option without wrapper overhead |
| tracing | pino | Structured logging, zero-overhead when disabled |
| serde | zod | Schema validation and parsing |
| reqwest | Native fetch | Built into the runtime |
| anyhow / thiserror | Native Error + cause | TS union types + instanceof |
Who's using this?
Wish List
tokio-console-like observabilitypinointegration
Why not Effect?
Effect is excellent, but antiox exists for a different niche:
- Lightweight enough to ship inside libraries: Effect's runtime is too heavy as a transitive dependency end users didn't opt into.
- Mirrors Rust/Tokio APIs: Same structure, naming, and control flow across both codebases — the TypeScript reads like the Rust it was ported from.
- No new DSL: Plain
async/await,AbortSignal, andAsyncIterator. No wrapper types, no effect system, no generator-based control flow.
Compatibility
See COMPATIBILITY.md for a detailed comparison of every module against its Rust/Tokio equivalent, including intentionally skipped APIs and reasons.
License
MIT
