station-schedules
v1.0.4
Runtime-editable schedule store and reconciler for station-signal and station-broadcast
station-schedules
Runtime-editable schedule store and reconciler shared by station-signal and station-broadcast. Lets users define, edit, enable/disable, and remove schedules at runtime — distinct from the file-defined .every() schedules in signals/broadcasts, which remain in code.
Install
pnpm add station-schedules
What's in the box
- Schedule / SchedulePatch types: the persisted record.
- ScheduleAdapter interface: pluggable storage.
- ScheduleMemoryAdapter: in-process implementation, useful for tests.
- ScheduleReconciler: the polling + claim + trigger loop, identical semantics for both runners.
Persistent adapter implementations live in their own packages:
- station-adapter-sqlite/schedules
- station-adapter-postgres/schedules
- station-adapter-mysql/schedules
- station-adapter-redis/schedules
Schedule
type ScheduleKind = "signal" | "broadcast-static" | "broadcast-dynamic";
interface Schedule {
  id: string;
  kind: ScheduleKind;
  /** signal name OR broadcast name OR dynamic broadcast name */
  target: string;
  /** parsed by station-signal's parseInterval, e.g. "5m", "1h", "100ms", "1w", … */
  interval: string;
  input?: unknown;
  enabled: boolean;
  nextRunAt: Date;
  lastRunAt?: Date;
  lastRunStatus?: string;
  lastRunId?: string;
  createdAt: Date;
  updatedAt: Date;
  createdBy?: string;
}
ScheduleAdapter
interface ScheduleAdapter {
  add(schedule: Schedule): Promise<void>;
  get(id: string): Promise<Schedule | null>;
  list(filter?: { kind?: ScheduleKind; enabled?: boolean; due?: boolean }): Promise<Schedule[]>;
  update(id: string, patch: SchedulePatch): Promise<void>;
  delete(id: string): Promise<boolean>;
  /**
   * Atomically advance `nextRunAt` only if the stored value still matches
   * `expectedNextRunAt`. Returns true if the caller successfully claimed
   * the schedule. Required for multi-instance correctness.
   */
  claimDue?(id: string, expectedNextRunAt: Date, newNextRunAt: Date): Promise<boolean>;
  generateId(): string;
  ping(): Promise<boolean>;
  close?(): Promise<void>;
}
Adapters that don't implement claimDue will fall back to a non-atomic advance and emit a warning. That is fine for single-process dev but unsafe for multi-runner deployments.
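The fallback described above can be pictured roughly like this. It is a sketch under assumptions, not the package's code: ClaimCapableAdapter and advanceNextRun are hypothetical names, and the patch shape { nextRunAt } is assumed because SchedulePatch is not spelled out here.

```typescript
// Hypothetical, trimmed-down adapter shape: only the two methods this sketch needs.
interface ClaimCapableAdapter {
  update(id: string, patch: { nextRunAt: Date }): Promise<void>;
  claimDue?(id: string, expectedNextRunAt: Date, newNextRunAt: Date): Promise<boolean>;
}

// Hypothetical helper: use the atomic claim when the adapter offers one,
// otherwise warn and advance nextRunAt with a plain update.
async function advanceNextRun(
  adapter: ClaimCapableAdapter,
  id: string,
  expectedNextRunAt: Date,
  newNextRunAt: Date,
  warn: (msg: string) => void = console.warn,
): Promise<boolean> {
  if (adapter.claimDue) {
    // Atomic path: false means another runner advanced the schedule first.
    return adapter.claimDue(id, expectedNextRunAt, newNextRunAt);
  }
  warn(`adapter has no claimDue; advancing ${id} non-atomically`);
  await adapter.update(id, { nextRunAt: newNextRunAt });
  // The fallback always reports success, so two runners could both "win".
  return true;
}
```

The unconditional true on the fallback path is what makes it unsafe once more than one runner shares the store.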
ScheduleReconciler
The reconciler is what actually fires schedules. Each runner constructs one and ticks it from its main poll loop:
import { ScheduleReconciler } from "station-schedules";
import { parseInterval } from "station-signal";
const reconciler = new ScheduleReconciler({
  adapter: scheduleAdapter,
  kinds: ["signal"], // this reconciler only handles signal schedules
  parseInterval,
  triggerFn: (s) => signalRunner.triggerSignal(s.target, s.input ?? {}),
  hasPendingOrRunning: (s) => signalRunner.hasPendingOrRunningForSignal(s.target),
  onError: (err, schedule) => console.error("schedule error:", err, schedule?.id),
});
// In your tick loop:
await reconciler.tick();
What tick() does
- Lists schedules with enabled = true and nextRunAt <= now.
- For each schedule whose kind this reconciler handles:
  - Calls claimDue(id, currentNextRunAt, newNextRunAt). If another runner already claimed, bails, which gives at-most-once firing.
  - Optionally checks hasPendingOrRunning to skip overlapping runs (records lastRunStatus = "skipped:overlap").
  - Calls triggerFn(schedule).
  - Records lastRunAt, lastRunId, and lastRunStatus ("triggered" or "errored").
If triggerFn throws, the schedule still has its nextRunAt advanced (via the claim) and records an error status — schedules can never busy-loop on a recurring failure.
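The steps above, including the failure behaviour, can be sketched as follows. This is a simplified single-process illustration, not the package's implementation. MiniSchedule, parseIntervalMs, and tickOnce are hypothetical names. The claim is a plain assignment rather than an atomic claimDue, the overlap check is omitted, and computing the next run as now + interval is an assumption.

```typescript
// Minimal local stand-in; only the field names mirror the Schedule record.
interface MiniSchedule {
  id: string;
  target: string;
  interval: string;
  enabled: boolean;
  nextRunAt: Date;
  lastRunAt?: Date;
  lastRunStatus?: string;
}

// Hypothetical parser covering only the ms, s, m, h, d, w suffixes.
// station-signal's real parseInterval may accept other forms.
function parseIntervalMs(interval: string): number {
  const match = /^(\d+)(ms|s|m|h|d|w)$/.exec(interval);
  if (!match) throw new Error(`unparseable interval: ${interval}`);
  const unitMs: Record<string, number> = {
    ms: 1,
    s: 1000,
    m: 60 * 1000,
    h: 60 * 60 * 1000,
    d: 24 * 60 * 60 * 1000,
    w: 7 * 24 * 60 * 60 * 1000,
  };
  return Number(match[1]) * unitMs[match[2]];
}

async function tickOnce(
  schedules: MiniSchedule[],
  now: Date,
  triggerFn: (s: MiniSchedule) => Promise<void>,
): Promise<void> {
  for (const s of schedules) {
    // Skip anything disabled or not yet due.
    if (!s.enabled || s.nextRunAt.getTime() > now.getTime()) continue;
    // "Claim" first: nextRunAt moves forward before the trigger runs.
    s.nextRunAt = new Date(now.getTime() + parseIntervalMs(s.interval));
    s.lastRunAt = now;
    try {
      await triggerFn(s);
      s.lastRunStatus = "triggered";
    } catch {
      // nextRunAt was already advanced, so a failing trigger cannot busy-loop.
      s.lastRunStatus = "errored";
    }
  }
}
```

Because the advance happens before triggerFn is awaited, a schedule whose trigger always throws is still retried only once per interval.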
Multi-instance safety
When two Station processes share the same schedule store, the atomic claimDue ensures that each due run is claimed, and therefore fired, by at most one of them. How each adapter implements the claim:
- SQLite: UPDATE WHERE next_run_at = ?, single-writer DB serializes
- Postgres: UPDATE … RETURNING id, atomic across connections
- MySQL: UPDATE … WHERE …, affectedRows > 0 decides the winner
- Redis: Lua EVAL script compares ZSCORE and updates atomically
The in-memory adapter is single-process only.
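To illustrate why the conditional update matters, here is a small simulation of two runners racing for the same due schedule. Everything in it (Row, claim, runner, race) is hypothetical and in-process. It only mimics the compare-and-set that the persistent adapters perform in the database.

```typescript
// Hypothetical shared row standing in for one stored schedule (epoch ms).
type Row = { id: string; nextRunAt: number };

// Compare-and-set: succeed only if nextRunAt is still the value the caller saw.
function claim(row: Row, expected: number, next: number): boolean {
  if (row.nextRunAt !== expected) return false;
  row.nextRunAt = next;
  return true;
}

// Each runner reads the due time, yields to the event loop (a stand-in for
// network latency), then tries to claim using the value it read earlier.
async function runner(
  name: string,
  row: Row,
  intervalMs: number,
  fired: string[],
): Promise<void> {
  const seen = row.nextRunAt;
  await new Promise<void>((resolve) => setTimeout(resolve, 0));
  if (claim(row, seen, seen + intervalMs)) fired.push(name);
}

async function race(): Promise<{ fired: string[]; nextRunAt: number }> {
  const row: Row = { id: "nightly", nextRunAt: 1000 };
  const fired: string[] = [];
  await Promise.all([
    runner("A", row, 60000, fired),
    runner("B", row, 60000, fired),
  ]);
  return { fired, nextRunAt: row.nextRunAt };
}
```

Both runners observe the same nextRunAt, but whichever claims second finds the stored value already changed and backs off, so only one of them fires.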
License
MIT
