@cxai/job
v0.3.0
Async iterable → Y.Map job engine. Toggle-controlled, progress-tracked. Standalone — depends only on yjs.
@cxai/job
Async iterable → Y.Map job engine. Consumes any AsyncIterable<{ item, progress? }> and writes items into a Yjs map. Toggle-controlled via Y.Map observer, with progress tracking.
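To make the data flow concrete, here is a minimal sketch of what consuming such an iterable could look like. It is not the package's implementation: plain `Map` objects stand in for the Yjs maps, and the `consumeInto` function, the `Entry` type, the choice to key items by `id`, and the exact status values written are all assumptions made for illustration.

```typescript
// Shape of each yielded value, per the description above: { item, progress? }.
type Entry<T> = { item: T; progress?: number };

// Hypothetical consumer (not part of @cxai/job). Plain Maps stand in for Y.Maps.
async function consumeInto<T extends { id: string }>(
  source: AsyncIterable<Entry<T>>,
  items: Map<string, T>,
  status: Map<string, unknown>,
): Promise<void> {
  status.set("status", "running");
  try {
    for await (const { item, progress } of source) {
      items.set(item.id, item); // assumption: items are keyed by their id
      if (progress !== undefined) status.set("progress", progress);
    }
    status.set("status", "completed");
  } catch (err) {
    status.set("status", "error");
    throw err;
  }
}

// A tiny async generator used as the data source.
async function* demoUsers(): AsyncGenerator<Entry<{ id: string; name: string }>> {
  yield { item: { id: "u1", name: "Ada" }, progress: 50 };
  yield { item: { id: "u2", name: "Grace" }, progress: 100 };
}

// Runs the sketch end to end and returns both maps for inspection.
async function runConsumeDemo() {
  const items = new Map<string, { id: string; name: string }>();
  const status = new Map<string, unknown>();
  await consumeInto(demoUsers(), items, status);
  return { items, status };
}
```

Calling `runConsumeDemo()` resolves once the generator is exhausted, at which point both items are in the map and the status map holds the last reported progress.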
Install
npm install @cxai/job
Usage
Standalone
import * as Y from "yjs";
import { createJob } from "@cxai/job";
const doc = new Y.Doc();
const job = createJob({
  doc,
  path: "users",
  iterable: async function* () {
    for (const user of await fetchUsers()) {
      yield { item: user, progress: 50 };
    }
  },
});
// Start via toggle (or remotely via Yjs sync)
job.trigger();
// Or control via Y.Map directly
doc.getMap("@job.users.toggle").set("value", true);
// Check status
console.log(job.getStatus()); // e.g. { status: "running", progress: 50 }
console.log(job.getItems());
// Stop / clear / cleanup
job.stop();
job.clear();
job.destroy();
Combined with @cxai/faker
import { createFakerIterable } from "@cxai/faker";
import { createJob } from "@cxai/job";
const job = createJob({
  doc,
  path: "orders",
  iterable: () =>
    createFakerIterable({ id: "{{string.uuid}}", total: "{{commerce.price}}" }, { count: 50 }),
});
job.trigger();
Combined with @cxai/yapi
import { createApp } from "@cxai/yapi";
import { createJob } from "@cxai/job";
const job = createJob({ doc, path: "events", iterable: mySource });
const api = createApp(doc, { routes: { "/events": { collection: "events", sse: {} } } });
// SSE streams items as the job pushes them
Real-world: App Root Discovery
A job that probes paths on a domain using yMapIterate from @cxai/stream. Each probe added to the probes Y.Map is fetched automatically:
import * as Y from "yjs";
import { createJob } from "@cxai/job";
import { createApp } from "@cxai/yapi";
import { yMapIterate } from "@cxai/stream/yjs";
import { mapAsync } from "@cxai/stream/iterator";
const doc = new Y.Doc();
const probesMap = doc.getMap("probes");
// Pure function: fetch a URL, return result
async function checkProbe(url: string) {
  const res = await fetch(url, { redirect: "follow" });
  return {
    id: new URL(url).pathname,
    path: new URL(url).pathname,
    status: res.status,
    ok: res.ok,
    contentType: res.headers.get("content-type") || "unknown",
    timestamp: new Date().toISOString(),
  };
}
// Job: iterate probes Y.Map → checkProbe each → write results to Y.Map("github.com")
const changes = yMapIterate(probesMap);
const results = mapAsync(changes, ([key]) => checkProbe(`https://github.com${key}`));
const job = createJob({
  doc,
  path: "github.com",
  iterable: {
    [Symbol.asyncIterator]: () => ({
      async next() {
        const { value, done } = await results.next();
        if (done || !value) return { done: true, value: undefined };
        return { done: false, value: { item: value } };
      },
    }),
  },
});
job.trigger();
// Seed probes — job picks them up immediately
doc.transact(() => {
  for (const path of ["/", "/api", "/health", "/robots.txt", "/graphql"]) {
    probesMap.set(path, { id: path, path });
  }
});
// Push more later — job processes them too
setTimeout(() => probesMap.set("/login", { id: "/login", path: "/login" }), 5000);
// Serve results via yapi (JSON/HTML/SSE)
const api = createApp(doc, {
  routes: {
    "/:domain": {
      collection: { template: "{{domain}}" },
      select: "list",
      json: { template: { results: "{{list}}", count: "{{count}}" } },
      sse: { event: "result.{{action}}" },
    },
  },
});
// GET /github.com → JSON results
// GET /github.com?format=sse → live stream as probes complete
HTTP App (@cxai/job/app)
import jobApp from "@cxai/job/app";
import { Hono } from "hono";
const app = new Hono();
app.route("/jobs", jobApp);
// POST /jobs/users/start  - trigger job
// POST /jobs/users/stop   - stop job
// POST /jobs/users/push   - push item manually
// POST /jobs/users/clear  - clear items + reset
// GET  /jobs/users/status
// GET  /jobs/users/items
// GET  /jobs/status       - all jobs
API Reference
createJob<T>(options): Job<T>
| Option | Type | Description |
|--------|------|-------------|
| doc | Y.Doc | Yjs document |
| path | string | Y.Map key for items |
| iterable | AsyncIterable \| () => AsyncIterable | Data source |
| statusPath? | string | Override status map path (default: @job.{path}.status) |
| togglePath? | string | Override toggle map path (default: @job.{path}.toggle) |
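The default map names in the table follow a simple pattern, which the sketch below spells out. `resolveJobPaths` and `JobPathOptions` are hypothetical names used only for illustration and are not exported by the package; the only thing taken from the table is the `@job.{path}.status` / `@job.{path}.toggle` convention.

```typescript
// Subset of the createJob options that affects map naming.
interface JobPathOptions {
  path: string;
  statusPath?: string;
  togglePath?: string;
}

// Hypothetical helper: derives the three map names a job would use,
// falling back to the documented defaults when no override is given.
function resolveJobPaths(options: JobPathOptions) {
  return {
    items: options.path,
    status: options.statusPath ?? `@job.${options.path}.status`,
    toggle: options.togglePath ?? `@job.${options.path}.toggle`,
  };
}

// With no overrides, the toggle map matches the name used in the Standalone example.
const defaultPaths = resolveJobPaths({ path: "users" });

// Overriding one path leaves the other on its default.
const customPaths = resolveJobPaths({ path: "users", statusPath: "meta.users.status" });
```

Here `defaultPaths.toggle` is `"@job.users.toggle"`, the same map the Standalone example sets directly, while `customPaths.status` is `"meta.users.status"` and `customPaths.toggle` keeps its default.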
Job<T> methods
| Method | Description |
|--------|-------------|
| trigger() | Set toggle to true, starting the job |
| stop() | Abort the running iterable |
| isRunning() | Returns boolean |
| getStatus() | Returns { status, progress?, lastSyncTime? } |
| getItems() | Returns T[] from the Y.Map |
| clear() | Stop + clear items + reset status |
| destroy() | Remove observers, abort if running |
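The interplay of trigger(), stop(), and isRunning() can be illustrated with a small stand-alone class. This is a sketch under stated assumptions rather than the package's code: `SketchJob`, `SourceEntry`, the `onItem` callback, and the `finished` promise are hypothetical, a plain array replaces the Y.Map, and a boolean flag stands in for whatever abort mechanism the package actually uses.

```typescript
type SourceEntry<T> = { item: T; progress?: number };

// Hypothetical stand-in for a job: no Yjs, no toggle map, just start/stop logic.
class SketchJob<T> {
  readonly items: T[] = [];
  finished: Promise<void> = Promise.resolve();
  private running = false;
  private stopRequested = false;
  private readonly source: () => AsyncIterable<SourceEntry<T>>;
  private readonly onItem?: (item: T) => void;

  constructor(source: () => AsyncIterable<SourceEntry<T>>, onItem?: (item: T) => void) {
    this.source = source;
    this.onItem = onItem;
  }

  isRunning(): boolean {
    return this.running;
  }

  // Starts consuming the source unless a run is already in progress.
  trigger(): void {
    if (this.running) return;
    this.running = true;
    this.stopRequested = false;
    this.finished = this.run();
  }

  // Requests a stop; the loop exits before storing the next item.
  stop(): void {
    this.stopRequested = true;
  }

  private async run(): Promise<void> {
    try {
      for await (const { item } of this.source()) {
        if (this.stopRequested) break; // break also closes the generator
        this.items.push(item);
        this.onItem?.(item);
      }
    } finally {
      this.running = false;
    }
  }
}

// An endless source: the job only ends because stop() is called.
async function* countForever(): AsyncGenerator<SourceEntry<number>> {
  for (let i = 0; ; i++) yield { item: i };
}

// Stops the job as soon as the item 2 has been stored.
async function runStopDemo(): Promise<number[]> {
  const job: SketchJob<number> = new SketchJob<number>(countForever, (n) => {
    if (n === 2) job.stop();
  });
  job.trigger();
  await job.finished;
  return job.items;
}
```

Because the stop flag is checked before each item is stored, `runStopDemo()` resolves with the items 0, 1, and 2 even though the source never ends on its own.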
getJob<T>(doc, path): Job<T>
Retrieve/observe a job by its path in an existing doc.
JobStatus
type JobStatus = {
  status: "idle" | "running" | "completed" | "error";
  progress?: number;
  lastSyncTime?: string;
};
Examples
- @cxai/example-job-app-discovery: Job app with discovery routes
- @cxai/example-faker-yapi-combined: Faker + yapi + job combined
See Also
- @cxai/faker: Template-based fake data generator
- @cxai/yapi: Content-negotiated Y.Map server
