@signal-kernel/async-runtime
v0.3.0
Published
<p align="center"> <img src="https://github.com/Luciano0322/signal-kernel/blob/main/assets/brands/async-icon.svg" alt="signal-kernel logo" width="120" /> </p> <h1 align="center">@signal-kernel/async-runtime</h1>
Maintainers
Readme
Async primitives built on top of @signal-kernel/core
@signal-kernel/async-runtime provides a set of high-level utilities for managing asynchronous state using the fine-grained reactive engine from @signal-kernel/core.
It exposes five primary capabilities:
createResource()– A source-driven async state primitive similar to Solid'screateResource, but built on a deterministic scheduler and cancellation model.createStreamResource()– A source-driven streaming async primitive for progressive visible state with stable committed value semantics.fromPromise()– Converts a cancellable Promise producer into a reactive async state.asyncSignal()– A convenient wrapper exposing both value and metadata (status, error, reload).createRevision()/createKeyedRevision()- Signal-backed invalidation sources for declarative async consistency.
This package does not depend on any frontend framework. It can be used in browser apps, server runtimes, CLI tools, or any JS environment.
Installation
npm install @signal-kernel/core @signal-kernel/async-runtimeOverview
This runtime implements a deterministic, cancelable, fine-grained async system that integrates fully with signal, computed, and effect.
Some highlights:
- Automatic cancellation of outdated async work.
- Stable
pending → success/error/cancelledstate transitions for single-shot async tasks. - Streaming-aware lifecycle support for progressive async output.
- Integration with the core scheduler ensures deterministic execution order.
- Work is isolated at the async node level (no global fetch manager).
1. createResource()
A source-driven async primitive similar to Solid's createResource, but implemented with:
- deterministic scheduling
- fine-grained dependency tracking
- automatic cancellation on source change
Signature
interface ResourceContext {
signal: AbortSignal;
token: number;
}
type ResourceOptions<T = unknown> = Omit<FromPromiseOptions<T>, "eager">;
createResource<I, T, E = unknown>({
input?: () => I;
observe?: () => void;
run: (input: I, ctx: ResourceContext) => Promise<T>;
trigger?: "auto";
} & ResourceOptions<T>): [() => T | undefined, AsyncMeta<E, T>]
createResource<I, T, E = unknown>({
trigger: "manual";
run: (input: I, ctx: ResourceContext) => Promise<T>;
invalidates?: (result: T, input: I) => InvalidationTarget[];
} & ResourceOptions<T>): [() => T | undefined, RunnableAsyncMeta<I, T, E>]
// v0.x compatibility shorthand
createResource<S, T, E = unknown>(
source: () => S,
fetcher: (sourceValue: S, ctx: ResourceContext) => Promise<T>,
options?: ResourceOptions<T>
): [() => T | undefined, AsyncMeta<E, T>]How it works
New code should prefer object form with
input,observe, andrun.input()is tracked viacreateEffect()and its return value is passed torun(input, ctx).observe()may track additional invalidation dependencies without passing them torun().When
input()orobserve()dependencies change:- The previous async work is canceled (
meta.cancel("source-changed")). - A new fetch begins with the latest input.
- The previous async work is canceled (
On first run, it automatically loads initial data.
Manual resources run only when
meta.run(input)is called.Manual resource
reload()reruns the most recentrun(input)input. Before any input exists, it is a no-op.Manual resource
invalidatestargets run only after a successful operation.Values and metadata update reactively.
eager is intentionally not part of ResourceOptions. Auto resources are driven by tracked graph dependencies; manual resources are driven by meta.run(input).
The older positional createResource(source, fetcher, options?) shape remains a v0.x compatibility shorthand, but object form is the primary documented API because it scales to input, observe, manual execution, and declarative invalidation.
Example
const id = signal(1);
const [user, meta] = createResource({
input: id.get,
run: async (id, ctx) => {
const res = await fetch(`/api/user/${id}`, {
signal: ctx.signal,
});
return res.json();
},
});
createEffect(() => {
console.log("User:", user());
console.log("Status:", meta.status());
});
// Changing input triggers new fetch
id.set(2);Declarative invalidation
const usersRevision = createRevision();
const [users] = createResource({
observe: () => {
usersRevision.get();
},
run: async (_input, ctx) => {
const res = await fetch("/api/users", { signal: ctx.signal });
return res.json();
},
});
const [, updateUserMeta] = createResource({
trigger: "manual",
run: async (payload: { id: string; name: string }, ctx) => {
const res = await fetch(`/api/users/${payload.id}`, {
method: "PUT",
body: JSON.stringify(payload),
signal: ctx.signal,
});
return res.json();
},
invalidates: () => [usersRevision],
});
await updateUserMeta.run({ id: "u1", name: "Alice" });Key features
- Cancel old requests automatically.
- Works perfectly with fine-grained reactivity in the core runtime.
- Fully deterministic, thanks to the two-phase scheduler.
- Supports declarative invalidation without a global query cache.
2. createStreamResource()
createStreamResource() is the streaming sibling primitive of createResource().
While createResource() models a single-shot async task that resolves once, createStreamResource() models an async task that can emit multiple chunks over time before reaching a final completion state.
It is intended for cases such as:
- LLM text streaming
- structured incremental generation
- server-sent events
- progressive aggregation
- long-running tasks with partial visible output
Signature
type StreamAsyncStatus =
| "idle"
| "pending"
| "streaming"
| "success"
| "error"
| "cancelled";
type StreamInterruptionPolicy =
| "keep-partial"
| "rollback"
| "clear";
interface StreamContext<TChunk, TValue> {
emit(chunk: TChunk): void;
set(value: TValue): void;
done(finalValue?: TValue): void;
isCancelled(): boolean;
}
createStreamResource<I, TChunk, TValue, E = unknown>({
input?: () => I;
observe?: () => void;
stream: (
input: I,
ctx: StreamContext<TChunk, TValue>
) => Promise<void> | void;
} & StreamResourceOptions<TChunk, TValue, E>): [
() => TValue | undefined,
StreamAsyncMeta<E, TValue>
]
// v0.x compatibility shorthand
createStreamResource<S, TChunk, TValue, E = unknown>(
source: () => S,
streamer: (
sourceValue: S,
ctx: StreamContext<TChunk, TValue>
) => Promise<void> | void,
options?: StreamResourceOptions<TChunk, TValue, E>
): [() => TValue | undefined, StreamAsyncMeta<E, TValue>]Core semantics
A stream resource separates visible accumulated value from stable committed value.
- The returned getter represents the currently visible accumulated value
stableValue()represents the last successfully committed valuestatus()can beidle,pending,streaming,success,error, orcancelled
This allows streaming UIs to expose partial output while still preserving a stable-state model for higher-level logic.
New stream code should prefer object form with input, observe, and stream. The older positional createStreamResource(source, streamer, options?) shape remains a v0.x compatibility shorthand.
Interruption policy
createStreamResource() supports explicit interruption policies for cancellation and error cases:
keep-partialrollbackclear
This means streaming behavior is not forced into a single model.
For example:
- text generation UIs may prefer
keep-partial - conservative state flows may prefer
rollback
Example
const prompt = signal("Explain signals simply");
const [text, meta] = createStreamResource({
input: prompt.get,
stream: async (input, ctx) => {
const chunks = ["Signals ", "track ", "dependencies."];
for (const chunk of chunks) {
if (ctx.isCancelled()) return;
await delay(50);
ctx.emit(chunk);
}
ctx.done();
},
initialValue: "",
reduce: (current = "", chunk) => current + chunk,
onCancel: "keep-partial",
onError: "rollback",
});
createEffect(() => {
console.log("Text:", text());
console.log("Stable:", meta.stableValue());
console.log("Status:", meta.status());
});Key features
- Supports progressive visible async state.
- Separates current visible value from last committed stable value.
- Tracks
input()andobserve()dependencies for stream resubscription. - Allows explicit cancellation/error policies (
keep-partial,rollback,clear). - Fits naturally into the same deterministic runtime model as
createResource().
3. fromPromise()
Signature
fromPromise<T, E = unknown>(
makePromise: (ctx: { signal: AbortSignal; token: number }) => Promise<T>,
options?: FromPromiseOptions<T>
): AsyncSignal<T, E>
fromPromise<I, T, E = unknown>({
run: (input: I, ctx: { signal: AbortSignal; token: number }) => Promise<T>;
keepPreviousValueOnPending?: boolean;
onSuccess?: (value: T) => void;
onError?: (error: unknown) => void;
onCancel?: (reason?: unknown) => void;
}): RunnableAsyncSignal<I, T, E>
fromPromise<I, T, E = unknown>({
eager: true;
initialInput: I;
run: (input: I, ctx: { signal: AbortSignal; token: number }) => Promise<T>;
keepPreviousValueOnPending?: boolean;
}): RunnableAsyncSignal<I, T, E>What it does
fromPromise() turns an async function into an async signal, exposing:
value(): T | undefinedstatus(): "idle" | "pending" | "success" | "error" | "cancelled"error(): E | undefinedreload()run(input)when using the descriptor formcancel(reason?)
It internally:
- Tracks only the latest Promise result (via token matching).
- Respects cancellation semantics.
- Integrates with
batch()from@signal-kernel/core.
Example
import { fromPromise } from "@signal-kernel/async-runtime";
const user = fromPromise(async (ctx) => {
const res = await fetch("/api/user", {
signal: ctx.signal,
});
return res.json();
});
createEffect(() => {
console.log("Status:", user.status());
console.log("Value:", user.value());
});Input-based work should use descriptor form so the runtime does not need to infer the producer shape:
const user = fromPromise({
run: async (id: string, ctx) => {
const res = await fetch(`/api/user/${id}`, {
signal: ctx.signal,
});
return res.json();
},
});
await user.run("u1");Function form is eager by default because it has no external input to wait for. Descriptor form is lazy by default because the runtime needs run(input) to establish the first input. If a descriptor should run immediately, pass both eager: true and initialInput.
4. asyncSignal()
Signature
asyncSignal<T, E = unknown>(
makePromise: (ctx: { signal: AbortSignal; token: number }) => Promise<T>,
options?: FromPromiseOptions<T>
): [() => T | undefined, AsyncMeta<E, T>]
asyncSignal<I, T, E = unknown>({
run: (input: I, ctx: { signal: AbortSignal; token: number }) => Promise<T>;
keepPreviousValueOnPending?: boolean;
}): [() => T | undefined, RunnableAsyncMeta<I, T, E>]
asyncSignal<I, T, E = unknown>({
eager: true;
initialInput: I;
run: (input: I, ctx: { signal: AbortSignal; token: number }) => Promise<T>;
keepPreviousValueOnPending?: boolean;
}): [() => T | undefined, RunnableAsyncMeta<I, T, E>]What it provides
asyncSignal() is a convenience layer on top of fromPromise() that splits output into:
value getter
meta object containing:
status()error()reload()cancel(reason?)keepPreviousValueOnPending
Example
const [user, meta] = asyncSignal(async (ctx) => {
const res = await fetch("/api/user", {
signal: ctx.signal,
});
return res.json();
});
createEffect(() => {
console.log("User:", user());
console.log("Loading:", meta.status() === "pending");
});For explicit input-based execution:
const [user, meta] = asyncSignal({
run: (id: string, ctx) => fetchUser(id, { signal: ctx.signal }),
});
await meta.run("u1");5. Types
The package exports the async-related types:
AsyncStatusAsyncSignal<T, E>AsyncMeta<E, T = unknown>RunnableAsyncMeta<I, T, E>FromPromiseOptions<T = unknown>FromPromiseDescriptor<I, T>AsyncSignalDescriptor<I, T>ResourceOptions<T = unknown>AutoResourceDescriptor<I, T>ManualResourceDescriptor<I, T>RevisionKeyedRevision<K>InvalidationTargetStreamAsyncStatusStreamInterruptionPolicyStreamContext<TChunk, TValue>StreamResourceOptions<TChunk, TValue, E>StreamResourceDescriptor<I, TChunk, TValue, E>StreamAsyncMeta<E, TValue>
These allow you to annotate higher-level abstractions or build your own async primitives.
6. Relationship to @signal-kernel/core
This package requires @signal-kernel/core because:
- Async values are stored in signals.
- Status/error/value transitions rely on
batch(). - Integration with the scheduler ensures effects run deterministically.
- Dependency tracking (
createEffect) keeps async flows reactive and incremental.
@signal-kernel/async-runtime does not introduce scheduling or graph logic—
it simply builds async capabilities on top of the stable core runtime.
7. Design Philosophy
Unlike global async managers or framework-specific query libraries, this runtime: ✔ Treats async values as data, not components ✔ Avoids global caches or keyed registries ✔ Keeps each async node isolated and deterministic ✔ Enables composition with computed/effect ✔ Supports both single-shot and streaming async primitives ✔ Leaves room for higher-level extensions such as server resources or async graphs
It is intentionally minimal while remaining robust enough to serve as a foundation for meta-frameworks.
Summary
@signal-kernel/async-runtime is a fine-grained, deterministic, cancelable async layer designed to pair with @signal-kernel/core.
It enables:
- Source-driven async resources
- Streaming async resources
- Promise-based reactive state
- Clear status/error handling
- Automatic cancellation
- Framework-agnostic integration
This package provides the essential async building blocks used throughout the Signal Kernel ecosystem.
