abxbus
v2.5.17
Event bus library for browsers and ESM Node.js
abxbus: 📢 Production-ready multi-language event bus
AbxBus is an in-memory event bus library for async Python and TS (node/bun/deno/browser).
It's designed for quickly building resilient, predictable, complex event-driven apps.
It "just works" with an intuitive, but powerful event JSON format + emit API that's consistent across both languages and scales consistently from one event to millions (~0.2ms/event):
bus.on(SomeEvent, some_function)
bus.emit(SomeEvent({some_data: 132}))
It's async native, has proper automatic nested event tracking, and powerful concurrency control options. The API is inspired by EventEmitter or emittery in JS, but it takes it a step further:
- nice Zod / Pydantic schemas for events that can be exchanged between both languages
- automatic UUIDv7s and monotonic nanosecond timestamps for ordering events globally
- built in locking options to force strict global FIFO processing or fully parallel processing
♾️ It's inspired by the simplicity of async and events in JS, but with baked-in features that eliminate most of the tedious, repetitive complexity in event-driven codebases:
- correct timeout enforcement across multiple levels of events, including cancellation of awaited/blocking child work when a parent times out
- ability to strongly type hint and enforce the return type of event handlers at compile-time
- ability to queue events on the bus, or inline await them for immediate execution like a normal function call
- handles ~5,000 events/sec/core in both languages, with ~2kb of RAM consumed per event during active processing
🔗 Links
- Issue Tracker: https://github.com/ArchiveBox/abxbus/issues
- Documentation: https://abxbus.archivebox.io
- DeepWiki: https://deepwiki.com/ArchiveBox/abxbus
- PyPI: https://pypi.org/project/abxbus/
- NPM: https://www.npmjs.com/package/abxbus
🔢 Quickstart
npm install abxbus
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const CreateUserEvent = BaseEvent.extend('CreateUserEvent', {
email: z.string(),
event_result_type: z.object({ user_id: z.string() }),
})
const bus = new EventBus('MyAuthEventBus')
bus.on(CreateUserEvent, async (event) => {
const user = await yourCreateUserLogic(event.email)
return { user_id: user.id }
})
const event = bus.emit(CreateUserEvent({ email: '[email protected]' }))
await event.wait()
console.log(await event.eventResult()) // { user_id: 'some-user-uuid' }
✨ Features
The features offered in TS are broadly similar to the ones offered in the python library.
- Typed events with Zod schemas (cross-compatible with Pydantic events from python library)
- FIFO event queueing with configurable concurrency
- Nested event support with automatic parent/child tracking
- Cross-bus forwarding with loop prevention
- Handler result tracking + validation + timeout enforcement
- History retention controls (max_history_size) for memory bounds
- Optional @retry decorator for easy management of per-handler retries, timeouts, and semaphore-limited execution
See the Python README for more details.
📚 API Documentation
EventBus
The main bus class that registers handlers, schedules events, and tracks results.
Constructor:
new EventBus(name?: string, options?: {
id?: string
max_history_size?: number | null
max_history_drop?: boolean
event_concurrency?: 'global-serial' | 'bus-serial' | 'parallel' | null
event_timeout?: number | null
event_slow_timeout?: number | null
event_handler_concurrency?: 'serial' | 'parallel' | null
event_handler_completion?: 'all' | 'first'
event_handler_slow_timeout?: number | null
event_handler_detect_file_paths?: boolean
})
Constructor options
| Option | Type | Default | Purpose |
| --------------------------------- | ------------------------------------------------------- | -------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| id | string | uuidv7() | Override bus UUID (mostly for serialization/tests). |
| max_history_size | number \| null | 100 | Max events kept in event_history; null = unbounded; 0 = keep only in-flight events and drop completed events immediately. |
| max_history_drop | boolean | false | If true, when history is full drop oldest history entries (including uncompleted if needed). If false, reject new emits when history reaches max_history_size. |
| event_concurrency | 'global-serial' \| 'bus-serial' \| 'parallel' \| null | 'bus-serial' | Event-level scheduling policy, resolved at processing time when the event does not override it. |
| event_handler_concurrency | 'serial' \| 'parallel' \| null | 'serial' | Per-event handler scheduling policy, resolved at processing time when the event does not override it. |
| event_handler_completion | 'all' \| 'first' | 'all' | Event completion mode, resolved at processing time when the event does not override it. |
| event_timeout | number \| null | 60 | Default per-handler timeout budget in seconds, resolved at processing time when the event does not override it; 0 disables. |
| event_handler_slow_timeout | number \| null | 30 | Slow handler warning threshold in seconds, resolved at processing time when the event does not override it; 0 disables. |
| event_slow_timeout | number \| null | 300 | Slow event warning threshold in seconds, resolved at processing time when the event does not override it; 0 disables. |
| event_handler_detect_file_paths | boolean | true | Capture source file:line for handlers (slower, better logs). |
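The max_history_size and max_history_drop rows above can be read as an admission policy for new emits. The following is a minimal sketch of that documented behavior, not the library's implementation: admitToHistory, HistoryEntry, and AdmitResult are hypothetical names, and evicting completed entries before uncompleted ones is an assumption drawn from the phrase "including uncompleted if needed".

```typescript
// Hypothetical model of the documented history limits; not abxbus internals.
interface HistoryEntry {
  event_id: string
  completed: boolean
}

interface AdmitResult {
  accepted: boolean
  entries: HistoryEntry[]
}

function admitToHistory(
  entries: HistoryEntry[],
  incoming: HistoryEntry,
  max_history_size: number | null,
  max_history_drop: boolean
): AdmitResult {
  // null = unbounded; 0 = only in-flight events are kept, so an emit is never
  // rejected for size reasons (completed entries are pruned separately).
  if (max_history_size === null || max_history_size === 0) {
    return { accepted: true, entries: entries.concat([incoming]) }
  }
  if (entries.length < max_history_size) {
    return { accepted: true, entries: entries.concat([incoming]) }
  }
  if (!max_history_drop) {
    // History is full and dropping is disabled: reject the new emit.
    return { accepted: false, entries: entries }
  }
  // History is full and dropping is enabled: evict until there is room.
  // Assumption: the oldest completed entry goes first; an uncompleted entry
  // is evicted only when no completed entry is left.
  const kept = entries.slice()
  while (kept.length >= max_history_size) {
    let victim = 0
    for (let i = 0; i < kept.length; i++) {
      if (kept[i].completed) {
        victim = i
        break
      }
    }
    kept.splice(victim, 1)
  }
  kept.push(incoming)
  return { accepted: true, entries: kept }
}

const full_history: HistoryEntry[] = [
  { event_id: 'a', completed: false },
  { event_id: 'b', completed: true },
]
const newcomer: HistoryEntry = { event_id: 'c', completed: false }

const rejected = admitToHistory(full_history, newcomer, 2, false)
const rotated = admitToHistory(full_history, newcomer, 2, true)
console.log(rejected.accepted) // false
console.log(rotated.entries.map((e) => e.event_id)) // [ 'a', 'c' ]
```

With max_history_drop left at its default of false, a long-running bus needs either a generous max_history_size or callers that handle rejected emits.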
Unset event option fields stay unset on the event object. The bus that actually processes the event resolves its defaults at execution time, so forwarded events can inherit the target bus defaults.
Runtime state properties
- id: string
- name: string
- label: string (${name}#${id.slice(-4)})
- handlers: Map<string, EventHandler>
- handlers_by_key: Map<string, string[]>
- event_history: Map<string, BaseEvent>
- pending_event_queue: BaseEvent[]
- in_flight_event_ids: Set<string>
- locks: LockManager
on()
on<T extends BaseEvent>(
event_pattern: string | '*' | EventClass<T>,
handler: EventHandlerCallable<T>,
options?: Partial<EventHandler>
): EventHandler
Use during startup/composition to register handlers.
Advanced options fields, which can be used to override defaults per handler if needed:
- handler_timeout?: number | null hard delay before handler execution is aborted with a HandlerTimeoutError
- handler_slow_timeout?: number | null delay before emitting a slow handler warning log line
- handler_name?: string optional name to use instead of anonymous if handler is an unnamed arrow function
- handler_file_path?: string | null optional path/to/source/file.js:lineno where the handler is defined, used for logging only
- id?: string unique UUID for the handler (normally a hash of bus_id + event_pattern + handler_name + handler_registered_at)
Notes:
- Prefer class/factory keys (bus.on(MyEvent, handler)) for typed payload/result inference.
- String and '*' matching are supported (bus.on('MyEvent', ...), bus.on('*', ...)).
- Returns an EventHandler object you can later pass to off() to de-register the handler if needed.
off()
off<T extends BaseEvent>(
event_pattern: EventPattern<T> | '*',
handler?: EventHandlerCallable<T> | string | EventHandler
): void
Use when tearing down subscriptions (tests, plugin unload, hot-reload).
- Omit handler to remove all handlers for event_pattern.
- Pass handler function reference to remove one by function identity.
- Pass handler id (string) or EventHandler object to remove by id.
- Use bus.off('*') to remove all registered handlers from the bus.
emit()
emit<T extends BaseEvent>(event: T): T
Behavior notes:
- Per-event config fields stay on the event as provided; when unset (null/undefined), each bus resolves its own defaults at processing time.
- If the same event ends up forwarded through multiple buses, it is loop-protected using event_path.
- Emit is synchronous and returns immediately with the same event object (event.event_status is initially 'pending').
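One way to picture the loop protection mentioned above: each bus records its label in event_path and declines an event whose path already contains that label. This is only a sketch under that assumption; the README states that event_path is used but not how. busLabel, acceptForwardedEvent, and PathCarrier are hypothetical names, and the bus ids are made-up values. The label format is the documented ${name}#${id.slice(-4)}.

```typescript
// Hypothetical stand-in for the part of an event that carries its bus path.
interface PathCarrier {
  event_type: string
  event_path: string[]
}

// Label format documented for EventBus.label: `${name}#${id.slice(-4)}`.
function busLabel(bus_name: string, bus_id: string): string {
  return `${bus_name}#${bus_id.slice(-4)}`
}

// Returns true (and records the visit) the first time a bus sees the event,
// false when the bus label is already present in event_path.
function acceptForwardedEvent(label: string, carrier: PathCarrier): boolean {
  if (carrier.event_path.indexOf(label) !== -1) {
    return false
  }
  carrier.event_path.push(label)
  return true
}

const auth_label = busLabel('AuthBus', '018f3c2a-0000-7000-8000-00000000ab12')
const audit_label = busLabel('AuditBus', '018f3c2a-0000-7000-8000-00000000cd34')

const forwarded: PathCarrier = { event_type: 'UserCreated', event_path: [] }
const visits = [
  acceptForwardedEvent(auth_label, forwarded), // emitted on AuthBus
  acceptForwardedEvent(audit_label, forwarded), // forwarded AuthBus -> AuditBus
  acceptForwardedEvent(auth_label, forwarded), // AuditBus -> AuthBus would loop
]
console.log(visits) // [ true, true, false ]
console.log(forwarded.event_path) // [ 'AuthBus#ab12', 'AuditBus#cd34' ]
```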
Normal lifecycle:
- Create event instance (const event = MyEvent({...})).
- Emit (const queued = bus.emit(event)).
- Await with await queued.now() (immediate/queue-jump semantics) or await queued.wait() (bus queue order).
- Inspect queued.event_results, or call await queued.eventResult() / await queued.eventResultsList() if you need handler return values.
find()
find<T extends BaseEvent>(event_pattern: EventPattern<T> | '*', options?: FindOptions): Promise<T | null>
find<T extends BaseEvent>(
event_pattern: EventPattern<T> | '*',
where: (event: T) => boolean,
options?: FindOptions
): Promise<T | null>
Where:
type FindOptions = {
past?: boolean | number // true to look through all past events, or number in seconds to filter time range
future?: boolean | number // true to wait for event to appear indefinitely, or number in seconds to wait for event to appear
child_of?: BaseEvent | null // filter to only match events that are a child_of: some_parent_event
} & {
// event_status: 'pending' | 'started' | 'completed'
// event_id: 'some-exact-event-uuid-here',
// event_started_at: string | null (exact iso datetime string or null)
// ... any event field can be passed to filter events using simple equality checks
[key: string]: unknown
}
bus.find() returns the first matching event (in emit timestamp order).
To return all matching events (newest to oldest) instead of just the first, use
bus.filter(...) — same arguments as find() plus an optional limit cap on
the number of results.
where behavior:
Any filter predicate function in the form of (event) => true | false, returning true to consider the event a match.
const matching_event = await bus.find(SomeEvent, (event) => event.some_field === 123)
// or to match all event types:
const any_matching_event = await bus.find('*', (event) => event.some_field === 123)
past behavior:
- true: search all history.
- false: skip searching past event history.
- number: search events emitted within last N seconds.
future behavior:
- true: wait forever for future match.
- false: do not wait.
- number: wait up to N seconds.
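The past and future rules above reduce to two small functions. This is a sketch of the documented semantics rather than the library's code: matchesPastWindow, futureWaitMs, and WindowOption are hypothetical names, timestamps are plain millisecond numbers for illustration, and treating the past boundary as inclusive is an assumption.

```typescript
type WindowOption = boolean | number

// past: true = all history, false = skip history, N = emitted within the last N seconds.
function matchesPastWindow(emitted_at_ms: number, current_ms: number, past: WindowOption): boolean {
  if (past === true) return true
  if (past === false) return false
  return current_ms - emitted_at_ms <= past * 1000
}

// future: true = wait forever, false = do not wait, N = wait up to N seconds.
// Returns the wait budget in milliseconds (Infinity for an unbounded wait).
function futureWaitMs(future: WindowOption): number {
  if (future === true) return Infinity
  if (future === false) return 0
  return future * 1000
}

const current_ms = 100_000
const emitted_20s_ago = current_ms - 20_000

console.log(matchesPastWindow(emitted_20s_ago, current_ms, true)) // true
console.log(matchesPastWindow(emitted_20s_ago, current_ms, 15)) // false
console.log(matchesPastWindow(emitted_20s_ago, current_ms, 30)) // true
console.log(futureWaitMs(5)) // 5000
```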
Lifecycle use:
- Use for idempotency / de-dupe before emit (past: ...).
- Use for synchronization/waiting (future: ...).
- Combine both to "check recent then wait".
- Add child_of to constrain by parent/ancestor event chain.
- Add any event field (e.g. event_status, event_id, event_timeout, user_id) to filter by strict equality.
- Use wildcard matching with predicates when you want to search all event types: bus.find('*', (event) => ...).
Debouncing expensive events with find():
const some_expensive_event = (await bus.find(ExpensiveEvent, { past: 15, future: 5 })) ?? bus.emit(ExpensiveEvent({}))
await some_expensive_event.now()
filter()
filter<T extends BaseEvent>(event_pattern: EventPattern<T> | '*', options?: FilterOptions): Promise<T[]>
filter<T extends BaseEvent>(
event_pattern: EventPattern<T> | '*',
where: (event: T) => boolean,
options?: FilterOptions
): Promise<T[]>
Same arguments as find() (FilterOptions extends FindOptions with limit?: number | null),
but returns the list of all matching events ordered newest to oldest. find() is equivalent to
filter(..., { limit: 1 }) returning the first result or null.
const recent = await bus.filter(ResponseEvent, { past: 10, future: false, limit: 5 })
Important semantics:
- Past lookup matches any emitted events, not just completed events.
- Past/future matches resolve as soon as the event is emitted. If you need the completed event, await event.now() or pass {event_status: 'completed'} to filter only for completed events.
- If both past and future are omitted, defaults are past: true, future: false.
- If both past and future are false, it returns null immediately.
- Detailed behavior matrix is covered in abxbus-ts/tests/EventBus_find.test.ts.
waitUntilIdle(timeout?)
await bus.waitUntilIdle() is the normal "drain bus work" call to wait until bus is done processing everything queued.
Pass an optional timeout in seconds (await bus.waitUntilIdle(5)) for a bounded wait.
bus.emit(OneEvent(...))
bus.emit(TwoEvent(...))
bus.emit(ThreeEvent(...))
await bus.waitUntilIdle() // this resolves once all three events have finished processing
await bus.waitUntilIdle(5) // wait up to 5 seconds, then continue even if work is still in-flight
Emit styles from handlers
Most handler code should use await event.emit(ChildEvent({})).now(). That creates a linked child and marks it as blocking parent completion.
| Style | event_parent_id | event_blocks_parent_completion | Blocks current handler? | Effect |
| --- | --- | --- | --- | --- |
| await event.emit(ChildEvent({})).now() | Parent event id | true | Yes | Linked child work; parent completion waits too. |
| event.emit(ChildEvent({})) without awaiting | Parent event id | false | No | Linked background child; visible in ancestry but parent completion does not wait. |
| await bus.emit(TopLevelEvent({})).now() | null | false | Yes | Detached top-level event; the handler waits naturally because it is awaited. |
| bus.emit(TopLevelEvent({})) without awaiting | null | false | No | True detached background event with no retained parent relationship. |
Parent/child/event lookup helpers
eventIsChildOf(child_event: BaseEvent, parent_event: BaseEvent): boolean
eventIsParentOf(parent_event: BaseEvent, child_event: BaseEvent): boolean
findEventById(event_id: string): BaseEvent | null
toString() / toJSON() / fromJSON()
toString(): string
toJSON(): EventBusJSON
EventBus.fromJSON(data: unknown): EventBus
- toString() returns BusName#abcd style labels used in logs/errors.
- toJSON() exports full bus state snapshot (config, handlers, indexes, event_history, pending queue, in-flight ids, find-waiter snapshots).
- fromJSON() restores a new bus instance from that payload (handler functions are restored as no-op stubs).
logTree()
logTree(): string
- logTree() returns a full event log hierarchy tree diagram for debugging.
destroy()
destroy(clear?: boolean): Promise<void>
destroy(options?: { clear?: boolean }): Promise<void>
- clear defaults to true.
- await bus.destroy() clears handlers/history/pending events/in-flight state/find waiters/locks and removes this bus from global tracking.
- await bus.destroy({ clear: false }) stops runtime work and resolves waiters but keeps handlers/history for inspection. The bus is still destroyed and cannot be used again.
- Destroy/GC behavior is exercised in abxbus-ts/tests/EventBus.test.ts and abxbus-ts/tests/EventBus_performance.test.ts.
BaseEvent
Base class + factory builder for typed event models.
Define your own strongly typed events with BaseEvent.extend('EventName', {...zod fields...}):
const MyEvent = BaseEvent.extend('MyEvent', {
some_key: z.string(),
some_other_key: z.number(),
// ...
// any other payload fields you want to include can go here
// known event_* fields are event metadata/options; unknown event_* fields are rejected
event_result_type: z.string().optional(),
event_timeout: 60,
// ...
})
const pending_event = MyEvent({ some_key: 'abc', some_other_key: 234 })
const queued_event = bus.emit(pending_event)
const completed_event = await queued_event.now()
API behavior and lifecycle examples:
- abxbus-ts/examples/simple.ts
- abxbus-ts/examples/immediate_event_processing.ts
- abxbus-ts/examples/forwarding_between_busses.ts
- abxbus-ts/tests/EventBus.test.ts
- abxbus-ts/tests/EventBus_find.test.ts
- abxbus-ts/tests/EventHandler_first.test.ts
- abxbus-ts/tests/BaseEvent.test.ts
- abxbus-ts/tests/EventBus_timeout.test.ts
- abxbus-ts/tests/EventResult.test.ts
Event configuration fields
Special configuration fields you can set on each event to control processing:
- event_result_type?: z.ZodTypeAny | String | Number | Boolean | Array | Object
- event_version?: string (default: '0.0.1'; useful for your own schema/data migrations)
- event_timeout?: number | null
- event_handler_timeout?: number | null
- event_slow_timeout?: number | null
- event_handler_slow_timeout?: number | null
- event_concurrency?: 'global-serial' | 'bus-serial' | 'parallel' | null
- event_handler_concurrency?: 'serial' | 'parallel' | null
- event_handler_completion?: 'all' | 'first'
When these fields are null/unset, they are resolved from the current processing bus at execution time and are not eagerly written onto the event.
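The late resolution described above means the same unset event can pick up different defaults depending on which bus processes it. A minimal sketch of that rule, assuming a simple "event value, else bus default" lookup; resolveSetting and the bus/event objects below are hypothetical stand-ins, not abxbus types.

```typescript
type Setting<T> = T | null | undefined

// The event's own value wins when set; null and undefined both mean "inherit".
// `??` is used rather than `||` so that an explicit 0 (disabled) is respected.
function resolveSetting<T>(event_value: Setting<T>, bus_default: T): T {
  return event_value ?? bus_default
}

// Hypothetical buses with different default timeouts (seconds).
const first_bus = { event_timeout: 60 }
const second_bus = { event_timeout: 10 }

const unset_event: { event_timeout?: number | null } = {}
const pinned_event: { event_timeout?: number | null } = { event_timeout: 5 }
const disabled_event: { event_timeout?: number | null } = { event_timeout: 0 }

console.log(resolveSetting(unset_event.event_timeout, first_bus.event_timeout)) // 60
console.log(resolveSetting(unset_event.event_timeout, second_bus.event_timeout)) // 10
console.log(resolveSetting(pinned_event.event_timeout, second_bus.event_timeout)) // 5
console.log(resolveSetting(disabled_event.event_timeout, second_bus.event_timeout)) // 0
```

Because the resolved value is never written back onto the event in this model, forwarding the unset event from the first bus to the second yields 10, not 60.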
Runtime state fields
- event_id, event_type, event_version
- event_path: string[] (bus labels like BusName#ab12)
- event_parent_id: string | null
- event_emitted_by_handler_id: string | null
- event_blocks_parent_completion: boolean
- event_status: 'pending' | 'started' | 'completed'
- event_results: Map<string, EventResult>
- event_pending_bus_count: number
- event_created_at: string
- event_started_at: string | null
- event_completed_at: string | null
Read-only attributes
- event_parent -> BaseEvent | undefined
- event_children -> BaseEvent[]
- event_descendants -> BaseEvent[]
- event_errors -> Error[]
- event_result -> EventResultType<this> | undefined
now()
now(options?: { first_result?: boolean; timeout?: number | null }): Promise<this>
- If called from inside a running handler, it queue-jumps child processing immediately.
- If called outside handler context, it waits for normal completion (or processes immediately if already next).
- { first_result: true } resolves when the first valid result is available; other handlers keep running.
- { timeout } limits this wait call only. Use event_timeout: 0 / event_handler_timeout: 0 to disable execution timeouts.
- Rejects if event is not attached to a bus (event has no bus attached).
- Queue-jump behavior is demonstrated in abxbus-ts/examples/immediate_event_processing.ts and abxbus-ts/tests/BaseEvent.test.ts.
wait()
wait(options?: { first_result?: boolean; timeout?: number | null }): Promise<this>
- Waits for completion in normal runloop order.
- Use inside handlers when you explicitly do not want queue-jump behavior.
- Supports the same { first_result, timeout } wait-shaping options as now().
First result
event.now({ first_result: true }).eventResult(): Promise<EventResultType<this> | undefined>
- Resolves as soon as the first valid result is available.
- Does not change event_handler_completion; all handlers keep running unless the event definition explicitly sets event_handler_completion: 'first'.
- Result filtering and error policy are controlled by eventResult(...), not now() / wait().
- Cancellation and winner-selection behavior is covered in abxbus-ts/tests/EventHandler_first.test.ts.
eventResult(options?)
eventResult(
options?: {
include?: (result: EventResultType<this> | undefined, event_result: EventResult<this>) => boolean
raise_if_any?: boolean
raise_if_none?: boolean
}
): Promise<EventResultType<this> | undefined>
- Returns the first matching handler result in handler registration order.
- If the event is still pending and has no settled results, it first waits through now({ first_result: true }).
- Uses the same filtering and error policy options as eventResultsList(...).
eventResultsList(options?)
eventResultsList(
options?: {
include?: (result: EventResultType<this> | undefined, event_result: EventResult<this>) => boolean
raise_if_any?: boolean
raise_if_none?: boolean
}
): Promise<Array<EventResultType<this> | undefined>>
- Returns handler result values in event_results order.
- Default filter includes completed non-null/non-undefined, non-error, non-forwarded (BaseEvent) values.
- raise_if_any defaults to true; when true, it throws if any handler result has an error.
- raise_if_none defaults to false; when true, it throws if no results match include.
- If every handler errors, only { raise_if_any: false, raise_if_none: false } suppresses the error; every other option combination rejects.
- A single handler error rejects as that error; multiple handler errors reject as AggregateError.
- Examples:
  - await event.eventResultsList({ raise_if_any: false, raise_if_none: false })
  - await event.eventResultsList({ include: (result) => typeof result === 'object', raise_if_any: false })
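The interaction of raise_if_any and raise_if_none listed above can be modeled as a pure decision function. This sketch returns a tagged decision instead of throwing so the outcome is easy to inspect; decideResults, HandlerOutcome, and CollectDecision are hypothetical names, and the model ignores the custom include filter and forwarded BaseEvent values.

```typescript
// Hypothetical simplified view of one handler's outcome.
interface HandlerOutcome {
  value?: unknown
  error?: Error
}

type CollectDecision =
  | { kind: 'values'; values: unknown[] }
  | { kind: 'reject'; reason: 'single-error' | 'aggregate-error' | 'no-results'; errors: Error[] }

function decideResults(
  outcomes: HandlerOutcome[],
  options: { raise_if_any?: boolean; raise_if_none?: boolean } = {}
): CollectDecision {
  const raise_if_any = options.raise_if_any ?? true // documented default
  const raise_if_none = options.raise_if_none ?? false // documented default

  const errors: Error[] = []
  const values: unknown[] = []
  for (const outcome of outcomes) {
    if (outcome.error !== undefined) {
      errors.push(outcome.error)
    } else if (outcome.value !== null && outcome.value !== undefined) {
      values.push(outcome.value) // default filter: non-null, non-error values
    }
  }

  if (raise_if_any && errors.length > 0) {
    // One error surfaces as itself; several are grouped (AggregateError in the README).
    return { kind: 'reject', reason: errors.length === 1 ? 'single-error' : 'aggregate-error', errors }
  }
  if (raise_if_none && values.length === 0) {
    return { kind: 'reject', reason: 'no-results', errors }
  }
  return { kind: 'values', values }
}

const all_failed: HandlerOutcome[] = [{ error: new Error('db down') }, { error: new Error('cache down') }]
const partly_failed: HandlerOutcome[] = [{ value: 'ok' }, { value: null }, { error: new Error('boom') }]

console.log(decideResults(all_failed).kind) // reject
console.log(decideResults(all_failed, { raise_if_any: false, raise_if_none: true }).kind) // reject
console.log(decideResults(all_failed, { raise_if_any: false, raise_if_none: false }).kind) // values
console.log(decideResults(partly_failed, { raise_if_any: false })) // { kind: 'values', values: [ 'ok' ] }
```

The three all_failed calls reproduce the rule that only { raise_if_any: false, raise_if_none: false } suppresses the error when every handler fails.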
eventResultUpdate(handler, options?)
eventResultUpdate(
handler: EventHandler | EventHandlerCallable<this>,
options?: {
eventbus?: EventBus
status?: 'pending' | 'started' | 'completed' | 'error'
result?: EventResultType<this> | BaseEvent | undefined
error?: unknown
}
): EventResult<this>
- Creates (if missing) or updates one event_results entry for the given handler id.
- Useful for deterministic seeding/rehydration paths before resuming normal dispatch.
- Example:
  - const seeded = event.eventResultUpdate(handler_entry, { eventbus: bus, status: 'pending' })
  - seeded.update({ status: 'completed', result: 'seeded' })
reset()
reset(): this
- Returns a fresh event copy with runtime state reset to pending so it can be emitted again safely.
- Original event object is unchanged.
- Generates a new UUIDv7 event_id for the returned copy.
- Clears runtime completion state (event_results, status/timestamps, captured async context, done signal, local bus binding).
toString() / toJSON() / fromJSON()
toString(): string
toJSON(): BaseEventData
BaseEvent.fromJSON(data: unknown): BaseEvent
EventFactory.fromJSON?.(data: unknown): TypedEvent
- JSON format is cross-language compatible with Python implementation.
- event_result_type is serialized as JSON Schema when possible and rehydrated on fromJSON.
- In TypeScript-only usage, event_result_type can be any Zod schema shape or base type like number | string | boolean | etc. For cross-language roundtrips, object-like schemas (including Python TypedDict/dataclass-style shapes) are reconstructed on Python as Pydantic models, JSON object keys are always strings, and some fine-grained string-shape constraints may be normalized between Zod and Pydantic.
- Round-trip coverage is in abxbus-ts/tests/EventResult.test.ts and abxbus-ts/tests/EventBus.test.ts.
EventResult
Each handler execution creates one EventResult stored in event.event_results.
Main fields
- id: string (uuidv7 string)
- status: 'pending' | 'started' | 'completed' | 'error'
- event: BaseEvent
- handler: EventHandler
- result: EventResultType<this> | undefined
- error: unknown | undefined
- started_at: string | null (ISO datetime string)
- completed_at: string | null (ISO datetime string)
- event_children: BaseEvent[]
Read-only getters
- event_id -> string uuidv7 of the event the result is for
- bus -> EventBus instance it's associated with
- handler_id -> string uuidv5 of the EventHandler
- handler_name -> string | 'anonymous' function name of the handler method
- handler_file_path -> string | null path/to/file.js:lineno where the handler method is defined
- eventbus_name -> string name, same as this.bus.name
- eventbus_id -> string uuidv7, same as this.bus.id
- eventbus_label -> string label, same as this.bus.label
- value -> EventResultType<this> | undefined alias of this.result
- raw_value -> any raw result value before schema validation, available when handler return value validation fails
- handler_timeout -> number seconds before handler execution is aborted (precedence: handler config -> event config -> bus level defaults)
- handler_slow_timeout -> number seconds before logging a slow execution warning (same precedence as handler_timeout)
toString() / toJSON() / fromJSON()
toString(): string
toJSON(): EventResultJSON
EventResult.fromJSON(event, data): EventResult
EventHandler
Represents one registered handler entry on a bus. You usually get these from bus.on(...), then pass them to bus.off(...) to remove.
Main fields
- id: unique handler UUIDv5 (deterministic hash from bus/event/handler metadata unless overridden)
- handler: function reference that executes for matching events
- handler_name: function name (or 'anonymous')
- handler_file_path: detected source path (~/path/file.ts:line) or null
- handler_timeout: optional timeout override in seconds (null/undefined inherits, 0 disables)
- handler_slow_timeout: optional slow-warning threshold in seconds (null/undefined inherits, 0 disables)
- handler_registered_at: ISO timestamp
- event_pattern: subscribed key ('SomeEvent' or '*')
- eventbus_name: bus name where this handler was registered
- eventbus_id: bus UUID where this handler was registered
toString() / toJSON() / fromJSON()
toString(): string
toJSON(): EventHandlerJSON
EventHandler.fromJSON(data: unknown, handler?: EventHandlerCallable): EventHandler
- toString() returns handlerName() (path:line) when path/name are available, otherwise function#abcd().
- toJSON() emits only serializable handler metadata (never function bodies).
- fromJSON() reconstructs the handler entry and accepts an optional real function to re-bind execution behavior.
🏃 Runtimes
abxbus-ts supports all major JS runtimes.
- Node.js (default development and test runtime)
- Browsers (ESM)
- Bun
- Deno
Browser support notes
- The package output is ESM (./dist/esm), which is supported by all browsers released after 2018.
- AsyncLocalStorage is preserved at emit and used during handling when available (Node/Bun), so otel/tracing context will work normally in those environments.
Performance comparison (local run, per-event)
Measured locally on an Apple M4 Pro with:
- pnpm run perf:node (node v22.21.1)
- pnpm run perf:bun (bun v1.3.9)
- pnpm run perf:deno (deno v2.6.8)
- pnpm run perf:browser (chrome v145.0.7632.6)
| Runtime | 1 bus x 50k events x 1 handler | 500 buses x 100 events x 1 handler | 1 bus x 1 event x 50k parallel handlers | 1 bus x 50k events x 50k one-off handlers | Worst case (N buses x N events x N handlers) |
| ------------------ | ------------------------------ | ---------------------------------- | --------------------------------------- | ----------------------------------------- | -------------------------------------------- |
| Node | 0.015ms/event, 0.6kb/event | 0.058ms/event, 0.1kb/event | 0.021ms/handler, 3.8kb/handler | 0.028ms/event, 0.6kb/event | 0.442ms/event, 0.9kb/event |
| Bun | 0.011ms/event, 2.5kb/event | 0.054ms/event, 1.0kb/event | 0.006ms/handler, 4.5kb/handler | 0.019ms/event, 2.8kb/event | 0.441ms/event, 3.1kb/event |
| Deno | 0.018ms/event, 1.2kb/event | 0.063ms/event, 0.4kb/event | 0.024ms/handler, 3.1kb/handler | 0.064ms/event, 2.6kb/event | 0.461ms/event, 7.9kb/event |
| Browser (Chromium) | 0.030ms/event | 0.197ms/event | 0.022ms/handler | 0.022ms/event | 1.566ms/event |
Notes:
- kb/event is peak RSS delta per event during active processing (most representative of OS-visible RAM in Activity Monitor / Task Manager, with EventBus.max_history_size=1).
- In 1 bus x 1 event x 50k parallel handlers, stats are shown per-handler for clarity: 0.02ms/handler * 50k handlers ~= 1000ms for the entire event.
- Browser runtime does not expose memory usage directly; in practice, memory performance in-browser is comparable to Node (they both use V8).
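To relate the per-event figures in the table to throughput and total wall time, the arithmetic is a plain unit conversion. The helper names below (eventsPerSecond, totalMs) are hypothetical, and the events-per-second figure is only the inverse of the measured latency, so it assumes work is processed back to back on one core.

```typescript
// Convert a per-event latency in milliseconds into events per second.
function eventsPerSecond(ms_per_event: number): number {
  return 1000 / ms_per_event
}

// Total wall time in milliseconds for one event fanned out to many handlers.
function totalMs(ms_per_handler: number, handler_count: number): number {
  return ms_per_handler * handler_count
}

// Node, 1 bus x 50k events x 1 handler (0.015ms/event from the table).
console.log(Math.round(eventsPerSecond(0.015))) // 66667
// Node, worst case (0.442ms/event from the table).
console.log(Math.round(eventsPerSecond(0.442))) // 2262
// 1 bus x 1 event x 50k parallel handlers at ~0.02ms/handler.
console.log(Math.round(totalMs(0.02, 50_000))) // 1000
```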
👾 Development
git clone https://github.com/ArchiveBox/abxbus abxbus && cd abxbus
cd ./abxbus-ts
pnpm install
prek install # install pre-commit hooks
prek run --all-files # run pre-commit hooks on all files manually
pnpm lint
pnpm test