@sebspark/emulator
v0.4.0
Helper for building emulators or test fakes.
Overview
This package provides a generic, type-safe emulator engine. The idea is that you wrap it in a concrete emulator that adapts a real transport (HTTP, Pub/Sub, gRPC, etc.) to the emulator's simple request/response model. Tests then configure the emulator to respond in specific ways, without needing a real backend.
Real transport (Pub/Sub message, HTTP request, WebSocket, …)
│
▼
Your emulator adapter ← decodes, calls emulator.handle(...)
│
▼
createEmulator() ← dispatches to registered responders
│
▼
Your test ← registers responders with .reply() / .callback() / .stream()
The two-layer design
createEmulator() is just an in-memory responder registry — it has no server, no network, no lifecycle. Your emulator adapter owns the transport (HTTP server, WebSocket server, Pub/Sub subscriber, etc.) and calls emulator.handle(...) when a real request arrives.
This separation is intentional and important:
- The transport is expensive. Starting and stopping an HTTP server or connecting a Pub/Sub subscriber takes time. You create it once and keep it running for the lifetime of the test suite (or the fake service).
- The responder registry is cheap. Registering and consuming handlers is pure in-memory. You do this per test, not per server start.
The rule: transport lifecycle in beforeAll/afterAll, responder setup in beforeEach or in the test itself.
let payments: PaymentEmulator
beforeAll(() => { payments = startPaymentEmulator(server) })
afterAll(() => payments.dispose())
beforeEach(() => {
  // Register the responses this specific test needs.
  // No response registered = emulator throws = test fails clearly.
  payments.authorise().reply({ authCode: 'ABC123', status: 'approved' })
})
afterEach(() => {
  payments.reset() // clear all responders for next test
})
Never call startPaymentEmulator (or createEmulator) inside beforeEach. That restarts the transport on every test, defeating the purpose.
The emulator throwing on an unregistered handler is a feature, not a bug. If your test triggers a call you didn't set up, the throw tells you immediately. Avoid silent .persist() fallbacks for normal request/response flows — they mask missing test setup.
The exception is when the system under test makes background calls that are not the subject of the test — health check pollers, heartbeats, and similar. Those calls fire continuously regardless of what the test is doing. For those, register a .persist() baseline in beforeAll so they are always answered, then use LIFO one-shot overrides in specific tests that are testing the degraded behaviour:
beforeAll(() => {
  payments = startPaymentEmulator(server)
  // Health poller fires every 10s — always answer it so tests don't fail spuriously
  payments.ping().persist().reply({ ok: true })
})
afterAll(() => payments.dispose())
// Normal test — one-shot, explicit, throws if missing
it('approves a payment', async () => {
  payments.authorise().reply({ authCode: 'ABC123', status: 'approved' })
  // ...
})
// Health test — LIFO override sits on top of the persist baseline
it('reports degraded when ping fails', async () => {
  payments.ping().reply(() => { throw new Error('connection refused') })
  // override fires once, then persist baseline resumes for subsequent polls
})
Use cases
1. Automated tests
The primary use case. The emulator runs as a fake upstream during your test suite. The transport starts once; each test registers the responses it needs.
See the examples below.
2. Persistent fake service for local development or manual QA
The emulator can run as a standing replacement for a real upstream — a deployable fake that your application code talks to using production-ready client code. No test framework involved.
Register .persist() responders at startup and the service handles requests indefinitely without restarting:
const payments = startPaymentEmulator(server)
// Always approve — no test framework, just a running fake
payments.authorise().persist().reply({ authCode: 'FAKE-123', status: 'approved' })
payments.refund().persist().reply({ success: true })
Because the transport and the responder registry are separate, you can also expose an admin endpoint that lets you change responses at runtime — swap the responders without restarting the server.
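A runtime admin hook of that kind could be as small as one function. The sketch below is not part of @sebspark/emulator: `PaymentsLike`, `AdminCommand`, and `applyAdminCommand` are hypothetical names, and the structural types mirror only the `.reset()` and `.persist().reply()` calls shown elsewhere in this README. It assumes an admin route on your own server parses a JSON body and passes it in.

```typescript
// Structural stand-ins that mirror only the calls this README demonstrates.
type AuthoriseResp = { authCode: string; status: 'approved' | 'declined' }
type AuthoriseBuilder = {
  reset: () => void
  persist: () => { reply: (resp: AuthoriseResp) => unknown }
}
type PaymentsLike = { authorise: () => AuthoriseBuilder }

// Hypothetical admin payload, e.g. the parsed body of an admin request.
type AdminCommand = { mode: 'approve' | 'decline' }

const applyAdminCommand = (
  payments: PaymentsLike,
  command: AdminCommand,
): AuthoriseResp => {
  const next: AuthoriseResp =
    command.mode === 'approve'
      ? { authCode: 'FAKE-123', status: 'approved' }
      : { authCode: '', status: 'declined' }
  payments.authorise().reset() // drop the current standing responder
  payments.authorise().persist().reply(next) // install the replacement
  return next
}
```

Because only the responder registry is touched, the transport keeps running while the behaviour changes.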
Example: request/response — payment gateway
The simplest case: one request, one response. A payment gateway is a natural fit.
import { createEmulator, disposable, type Disposable } from '@sebspark/emulator'
type PaymentMethodMap = {
  authorise: {
    args: { amount: number; currency: string }
    resp: { authCode: string; status: 'approved' | 'declined' }
  }
  refund: {
    args: { authCode: string; amount: number }
    resp: { success: boolean }
  }
}
export type PaymentEmulator = Disposable<
  ReturnType<typeof createEmulator<PaymentMethodMap>>
>
export const startPaymentEmulator = (server: HttpServer): PaymentEmulator => {
  const emulator = createEmulator<PaymentMethodMap>()
  server.on('POST /authorise', async (req, res) => {
    await emulator.handle('authorise', req.body, async (response) => {
      res.json(response)
    })
  })
  server.on('POST /refund', async (req, res) => {
    await emulator.handle('refund', req.body, async (response) => {
      res.json(response)
    })
  })
  return disposable(emulator, () => server.close())
}
In tests:
it('returns an auth code on approval', async () => {
  payments.authorise().reply({ authCode: 'ABC123', status: 'approved' })
  const result = await client.authorise({ amount: 100, currency: 'SEK' })
  expect(result.authCode).toBe('ABC123')
})
Example: streaming — chatbot over WebSocket
When a single request triggers a series of responses, use a streaming responder. A WebSocket chatbot that emits tokens one at a time is a natural fit.
import { createEmulator, disposable, type Disposable } from '@sebspark/emulator'
import { WebSocketServer, type WebSocket } from 'ws'
type ChatMethodMap = {
  chat: {
    args: { sessionId: string; message: string }
    resp: { token: string; done: boolean }
  }
}
export type ChatEmulator = Disposable<
  ReturnType<typeof createEmulator<ChatMethodMap>>
>
export const createChatEmulator = (port: number): ChatEmulator => {
  const emulator = createEmulator<ChatMethodMap>()
  const wss = new WebSocketServer({ port })
  wss.on('connection', (ws: WebSocket) => {
    ws.on('message', async (data) => {
      const args = JSON.parse(data.toString()) as ChatMethodMap['chat']['args']
      await emulator.handle('chat', args, async (resp) => {
        ws.send(JSON.stringify(resp))
      })
    })
  })
  return disposable(emulator, () => wss.close())
}
Fixed reply with .callback()
Use .callback() when the full sequence of tokens is known upfront:
bot.chat().callback((_args, cb) => {
  cb({ token: 'Sure', done: false })
  cb({ token: ', here', done: false })
  cb({ token: ' you go.', done: true })
})
Test-driven streaming with .stream()
Use .stream(initializer) when the test needs to drive responses at its own pace — for example to assert state between tokens, or simulate a correction mid-stream.
.stream() returns a StreamHandle immediately, before any request has arrived. You capture it, trigger the SUT, then synchronise with waitForCall().
Correct sequence:
1. Register .stream(initializer) and capture the handle
2. Trigger the SUT action that sends the request
3. await handle.waitForCall(): blocks until the request arrives and the initializer fires
4. await handle.send(prev => ...): push responses one at a time
Calling send() before waitForCall() resolves throws immediately — there is no open channel yet.
| Member | Type | Description |
|---|---|---|
| waitForCall(timeoutMs?) | (timeoutMs?: number) => Promise<void> | Resolves when the next request has arrived and the initializer has fired. Rejects with Error: waitForCall() timed out after {n}ms — no request arrived if no request arrives within timeoutMs ms (default: 5000) |
| send(modifier) | (fn: (prev) => Resp) => Promise<void> | Derives and sends the next response from the last one. Throws if called before waitForCall() resolves. |
| latestResponse | Resp \| undefined | The most recent response sent, or undefined before the first waitForCall() |
| hasBeenCalled | boolean | true once the first request has arrived and waitForCall() has resolved |
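The handle's contract can be modelled in a few lines. This is a hypothetical sketch, not the package's implementation: `createStreamSketch`, `onRequest`, and `runStreamSketch` are invented names, and the error texts are shortened. It assumes the adapter calls `onRequest` when a request arrives, which is what lets `waitForCall()` resolve and `send()` start working.

```typescript
// Hypothetical model of a stream handle; NOT the package's implementation.
type ChatToken = { token: string; done: boolean }
type EmitToken = (resp: ChatToken) => void

const createStreamSketch = (initializer: () => ChatToken) => {
  let emit: EmitToken | undefined
  let latest: ChatToken | undefined
  let notifyArrival: () => void = () => {}
  const arrival = new Promise<void>((resolve) => {
    notifyArrival = () => resolve()
  })

  return {
    // Adapter side: called when a request arrives on the transport.
    onRequest(respond: EmitToken): void {
      emit = respond
      latest = initializer()
      respond(latest)
      notifyArrival()
    },
    // Test side: resolves once onRequest has run, or rejects after timeoutMs.
    waitForCall(timeoutMs = 5000): Promise<void> {
      let timer: ReturnType<typeof setTimeout> | undefined
      const timeout = new Promise<void>((_resolve, reject) => {
        timer = setTimeout(
          () => reject(new Error(`waitForCall() timed out after ${timeoutMs}ms`)),
          timeoutMs,
        )
      })
      return Promise.race([arrival, timeout]).finally(() => clearTimeout(timer))
    },
    // Derives the next response from the previous one and emits it.
    async send(modifier: (prev: ChatToken) => ChatToken): Promise<void> {
      if (!emit || !latest) {
        throw new Error('No active stream: call waitForCall() first')
      }
      latest = modifier(latest)
      emit(latest)
    },
    get latestResponse(): ChatToken | undefined {
      return latest
    },
  }
}

// Usage: register, trigger, wait, then drive the stream.
const runStreamSketch = async (): Promise<string[]> => {
  const received: string[] = []
  const handle = createStreamSketch(() => ({ token: 'Hello', done: false }))
  // Stands in for the SUT sending its request through the transport.
  handle.onRequest((resp) => { received.push(resp.token) })
  await handle.waitForCall(1000)
  await handle.send((prev) => ({ token: `${prev.token}, world`, done: true }))
  return received
}
```

The timeout in `waitForCall()` is what turns a request that never arrives into a fast, readable failure instead of a hung test.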
it('streams a correction mid-reply', async () => {
  const stream = bot
    .chat()
    .stream(() => ({ token: 'Paris is in Germany.', done: false }))
  const received: string[] = []
  client.chat({ sessionId: 's1', message: 'Where is Paris?' }, (r) => {
    received.push(r.token)
  })
  await stream.waitForCall()
  expect(stream.latestResponse).toEqual({ token: 'Paris is in Germany.', done: false })
  await stream.send(() => ({ token: 'Sorry — Paris is in France.', done: true }))
  expect(received).toEqual([
    'Paris is in Germany.',
    'Sorry — Paris is in France.',
  ])
})
Sequential streams with .times(n)
The lifetime modifier caps how many requests the responder accepts. Each waitForCall() picks up the next one. send() and latestResponse are always scoped to the stream resolved by the most recent waitForCall().
const stream = bot.chat().twice().stream(() => ({ token: 'Hello!', done: false }))
// First connection
client.chat({ sessionId: 's1', message: 'hi' }, onToken)
await stream.waitForCall()
await stream.send(() => ({ token: 'How can I help?', done: true }))
// Second connection
client.chat({ sessionId: 's2', message: 'hello' }, onToken)
await stream.waitForCall()
await stream.send(() => ({ token: 'Welcome back.', done: true }))
// Third connection → throws, responder exhausted
Calling send() before waitForCall() resolves throws immediately:
const stream = bot.chat().stream(() => ({ token: 'init', done: false }))
await stream.send(...) // throws: No active stream — call waitForCall() first
API reference
Single response — .reply()
Register a static response or a function. The responder is consumed after one use.
// Static response
payments.authorise().reply({ authCode: 'ABC123', status: 'approved' })
// Computed from the request
payments.authorise().reply((args) => ({
  authCode: `CODE-${args.amount}`,
  status: args.amount > 0 ? 'approved' : 'declined',
}))
Streaming responses — .callback()
bot.chat().callback((_args, cb) => {
  cb({ token: 'Sure', done: false })
  cb({ token: ', here', done: false })
  cb({ token: ' you go.', done: true })
})
Externally-driven streaming — .stream()
See the chatbot example above. StreamHandle<R> is a named export if you need to type a helper:
import { type StreamHandle } from '@sebspark/emulator'
function driveStream(handle: StreamHandle<ChatResp>) { ... }
Lifetime control
By default, a responder is consumed after one use. Control this with:
| Method | Behaviour |
|---|---|
| .reply(...) / .callback(...) / .stream(...) | One-time (default) |
| .once().reply(...) | One-time (explicit) |
| .twice().reply(...) | Two uses |
| .thrice().reply(...) | Three uses |
| .times(n).reply(...) | n uses |
| .persist().reply(...) | Unlimited uses — never consumed |
.persist() has three distinct uses:
1. LIFO fallback in tests — register a permanent default, then push one-shot overrides on top for specific tests:
// Always decline...
payments.authorise().persist().reply({ authCode: '', status: 'declined' })
// ...except the very next call
payments.authorise().reply({ authCode: 'ABC123', status: 'approved' })
// First call → approved (override consumed)
// Second call → declined (fallback)
// Third call → declined (fallback)
2. Always-on background calls — health checks, heartbeats, or any call the running system makes continuously. Register once in beforeAll so every poll always gets a valid response. Override with a one-shot for degraded-state tests:
beforeAll(() => {
  // Health poller fires immediately and every N seconds — always give it a 200
  healthBackend.ping().persist().reply({ ok: true })
})
it('reports degraded when backend is down', async () => {
  // LIFO: this fires once, then the persist fallback resumes
  healthBackend.ping().reply(() => { throw new Error('connection refused') })
  const res = await fetch('/health/ready')
  expect(res.status).toBe(503)
})
3. Persistent fake service — when the emulator runs as a standing fake for local development or manual QA (not inside a test suite at all), .persist() is the only sensible mode. See Use cases.
Responders are matched in LIFO order — the most recently registered matching responder wins.
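The matching rule can be illustrated with a small stand-alone model. This is a hypothetical sketch of the behaviour described above, not the package's internals: `sketchResponders`, `registerSketch`, and `handleSketch` are invented names, `remaining` models the lifetime modifiers (with `Infinity` standing in for `.persist()`), and the optional `filter` models a filter function.

```typescript
// Hypothetical stand-in for the registry; NOT the package's real internals.
type PaymentArgs = { amount: number; currency: string }
type PaymentResp = { authCode: string; status: 'approved' | 'declined' }

type SketchResponder = {
  filter?: (args: PaymentArgs) => boolean // optional predicate
  remaining: number // uses left; Infinity models .persist()
  respond: (args: PaymentArgs) => PaymentResp
}

const sketchResponders: SketchResponder[] = []

const registerSketch = (responder: SketchResponder): void => {
  sketchResponders.push(responder)
}

const handleSketch = (args: PaymentArgs): PaymentResp => {
  // Walk from the newest registration to the oldest (LIFO).
  for (let i = sketchResponders.length - 1; i >= 0; i--) {
    const candidate = sketchResponders[i]
    if (!candidate || candidate.remaining <= 0) continue // spent
    if (candidate.filter && !candidate.filter(args)) continue // no match
    candidate.remaining -= 1
    return candidate.respond(args)
  }
  // Nothing matched: fail loudly instead of returning a silent default.
  throw new Error('No responder found')
}

// Persistent catch-all first, filtered one-shot last.
registerSketch({
  remaining: Infinity,
  respond: () => ({ authCode: '', status: 'declined' }),
})
registerSketch({
  remaining: 1,
  filter: (args) => args.currency === 'SEK',
  respond: () => ({ authCode: 'SEK-OK', status: 'approved' }),
})

const sekFirst = handleSketch({ amount: 100, currency: 'SEK' }) // SEK one-shot answers
const sekSecond = handleSketch({ amount: 100, currency: 'SEK' }) // one-shot spent, fallback answers
const eurCall = handleSketch({ amount: 100, currency: 'EUR' }) // filter rejects, fallback answers
```

In this model a catch-all registered after a filtered responder would shadow it, so the more specific responder is registered last.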
Filters
Pass a filter function to restrict which requests a responder handles:
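// Register the catch-all first...
payments
  .authorise()
  .reply({ authCode: 'OTHER', status: 'declined' })
// ...then the filtered responder, so LIFO matching tries it first for SEK requests
payments
  .authorise((args) => args.currency === 'SEK')
  .reply({ authCode: 'SEK-OK', status: 'approved' })
Unhandled requests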
If a request arrives with no matching responder registered, the emulator throws immediately rather than returning a silent default:
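// No responder is registered for authorise, so routing a request to it throws
await payments.handle('authorise', { amount: 100, currency: 'SEK' }, async () => {})
// throws: No responder found for .authorise(...)
Direct invocation — .execute()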
Each registration returns an .execute() helper for triggering the responder directly without going through the transport:
const { execute } = payments
  .authorise()
  .reply({ authCode: 'TEST', status: 'approved' })
const result = await execute({ amount: 100, currency: 'SEK' })
// result → { authCode: 'TEST', status: 'approved' }
Inspecting and resetting
pending — count unspent responders
payments.authorise().pending returns the number of responders currently registered for that method that have not yet been fully consumed.
A .persist() responder counts as 1 pending regardless of how many times it has fired. If you register one persistent default in beforeAll and one one-shot override per test, pending will be 2 before the override fires and 1 after — the persistent baseline is always present.
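The counting rule can be checked with a tiny model. This is an illustrative sketch only: `PendingSlot` and `countPending` are hypothetical names, and the real package may track responders differently.

```typescript
// Hypothetical model: each responder tracks how many uses it has left.
type PendingSlot = { remaining: number } // Infinity models .persist()

const countPending = (slots: PendingSlot[]): number =>
  slots.filter((slot) => slot.remaining > 0).length

const persistBaseline: PendingSlot = { remaining: Infinity } // registered in beforeAll
const oneShotOverride: PendingSlot = { remaining: 1 } // registered in the test
const pendingSlots = [persistBaseline, oneShotOverride]

const pendingBefore = countPending(pendingSlots) // 2: baseline + override
oneShotOverride.remaining -= 1 // the override fires once
const pendingAfter = countPending(pendingSlots) // 1: only the baseline is left
persistBaseline.remaining -= 1 // the baseline fires; Infinity - 1 is still Infinity
const pendingLater = countPending(pendingSlots) // 1: the baseline never runs out
```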
This is most useful in afterEach to catch leftover one-shot setup — a one-time responder registered in a test but never triggered indicates a test that didn't exercise what it intended:
afterEach(() => {
  // Checks that no one-shot responders were left unexercised.
  // If you have a .persist() baseline, pending will be 1 here, not 0.
  expect(payments.authorise().pending).toBe(0)
  expect(payments.refund().pending).toBe(0)
})
.reset() — clear registered responders
payments.authorise().reset() removes all responders for that method. payments.reset() removes all responders across every method.
The per-method form is useful when you want to swap out a responder mid-test:
payments.authorise().persist().reply({ authCode: 'DEFAULT', status: 'approved' })
// Later in the test, replace it entirely
payments.authorise().reset()
payments.authorise().reply({ authCode: 'OVERRIDE', status: 'declined' })
Use the emulator-level reset for blanket teardown in afterEach:
afterEach(() => {
  payments.reset()
})
Cleanup
disposable() adds .dispose() and the Symbol.dispose / Symbol.asyncDispose symbols for using / await using (Node 20+).
// Explicit
await payments.dispose()
// Or with the `using` keyword (TypeScript 5.2+, Node 20+)
await using payments = startPaymentEmulator(server)
// automatically disposed when the block exits
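For readers curious how a helper like disposable() might attach these members, here is a minimal sketch. It is an assumption-laden re-creation, not the package's source: `makeDisposable`, `WithDispose`, and `asyncDisposeKey` are hypothetical names, and making repeated disposal a no-op is a design choice of this sketch rather than documented behaviour.

```typescript
// Hypothetical re-creation of a disposable()-style helper; the real one may differ.
// On runtimes without Symbol.asyncDispose, fall back to a registry symbol so the
// sketch still runs (in that case `await using` itself would not work).
const asyncDisposeKey: symbol =
  (Symbol as unknown as { asyncDispose?: symbol }).asyncDispose ??
  Symbol.for('Symbol.asyncDispose')

type WithDispose<T> = T & { dispose: () => Promise<void> }

const makeDisposable = <T extends object>(
  target: T,
  cleanup: () => void | Promise<void>,
): WithDispose<T> => {
  let disposed = false
  const dispose = async (): Promise<void> => {
    if (disposed) return // assumption: a second dispose() is a no-op
    disposed = true
    await cleanup()
  }
  // Symbol-keyed method: this is what `await using` looks up on scope exit.
  Object.defineProperty(target, asyncDisposeKey, { value: dispose })
  // Plain method for explicit teardown, e.g. in afterAll.
  return Object.assign(target, { dispose })
}
```

Keeping the cleanup callback separate from the wrapped object is what lets the same helper close an HTTP server, a WebSocket server, or any other transport.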