@ntxdev/job-system
v0.1.0
Durable job queue with TypeORM persistence, per-type locking, and progress tracking.
Job system — usage and integration
Portable NestJS job runtime: usage, registration, and integration. Exports are listed in index.ts. File links below resolve to siblings of docs/ inside the job-system tree.
Overview
The job system is database-backed: each job is a row in job_system_jobs (JobEntity). Long-running work runs in process; the DB stores status, progress, JSON payload (data), scheduling, and soft-delete metadata.
- Immediate jobs: registerJob({ runImmediately: true }) claims the row and runs the handler on the next tick.
- Scheduled jobs: registerJob with runImmediately: false leaves the row in scheduled until scheduledAt; JobSystemService.scheduleDueJobs (a @Cron every second) picks due rows, uses per-job-type locking via job_type_lock (JobTypeLockEntity) and ensure-job-lock, then starts the handler outside the transaction.
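To make the scheduled path concrete, here is a minimal, self-contained sketch of the selection step only. It is not the library's code: the JobRow shape, the pickDueJobs name, and the oldest-first ordering are assumptions for illustration. The real runtime performs this against the database under a per-type lock.

```typescript
// Simplified stand-in for a job row (assumption); the real JobEntity has more columns.
interface JobRow {
  id: string;
  type: string;
  status: 'scheduled' | 'processing' | 'failed';
  scheduledAt: Date;
}

// Hypothetical helper: choose which scheduled rows may start now,
// allowing at most one running job per type.
function pickDueJobs(rows: JobRow[], now: Date): JobRow[] {
  const busyTypes: Record<string, boolean> = {};
  for (const row of rows) {
    if (row.status === 'processing') busyTypes[row.type] = true;
  }

  const due = rows
    .filter((row) => row.status === 'scheduled' && row.scheduledAt.getTime() <= now.getTime())
    .sort((a, b) => a.scheduledAt.getTime() - b.scheduledAt.getTime()); // oldest first (assumption)

  const picked: JobRow[] = [];
  for (const row of due) {
    if (busyTypes[row.type]) continue; // this type already has a running or just-picked job
    busyTypes[row.type] = true;
    picked.push(row);
  }
  return picked;
}
```

Rows whose scheduledAt is still in the future, and rows whose type already has a processing job, are left for a later tick.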
Exports: index.ts re-exports JobSystemModule, JobSystemService, entities, Progress, types, and JOB_SYSTEM_CONNECTION.
Portability and "ready to ship"
What portable means here: the job-system folder is written so you can vendor it (copy, git subtree, or internal monorepo package) into another Nest app. It is not published as a standalone npm package in this repo; you wire tsconfig paths or a workspace package name yourself.
What a greenfield Nest app needs: peer-style dependencies already used by this code (@nestjs/common, @nestjs/core, @nestjs/typeorm, @nestjs/schedule, typeorm, plus your DB driver), ScheduleModule.forRoot() at the root, a TypeORM connection whose name matches JobSystemModule.forRoot({ connectionName }), migrations (or equivalent) for the two tables, one forRoot import for that connection (see below), and a registrar that calls startCheckingJobs() after registering handlers.
Caveats for production: the runtime assumes one Node process (or coordinated scaling) is draining scheduled jobs for a given DB; multiple app instances sharing one queue is supported only insofar as your DB and lock semantics allow concurrent scheduleDueJobs (per-type locks reduce duplicate work, but you should still understand deployment topology). There is no built-in distributed leader election.
NestJS imports: one forRoot per connection
JobSystemModule.forRoot is registered as a global dynamic module (@Global() + global: true). After you import it once (typically from AppModule, DatabaseModule, or similar), JobSystemService is injectable in any module without listing JobSystemModule again in feature imports.
Still call forRoot only once per connectionName. If two unrelated modules each call JobSystemModule.forRoot({ connectionName: 'app_data' }), Nest can still register duplicate providers, and the result is split handler maps and two scheduleDueJobs crons. The global flag fixes discovery (injection), not accidental double registration.
Multiple feature modules are fine: registrars and services in different modules all share one JobSystemService as long as only one forRoot ran for that connection. Re-exporting JobSystemModule from a host module is optional (useful for documentation or tests); it is no longer required for injection.
Job type strings (keys) and multiple teams
The job.type column and the string passed to registerJobHandler / registerContextFactory / registerJob must match. That string is also the per-type lock key in job_type_lock: only one running job of that type is intended at a time (see ensure-job-lock).
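The "one running job per type" rule can be pictured with a small lock keyed by the type string. The following is a hypothetical sketch of an in-process, per-key mutex, in the spirit of the in-process path that ensure-job-lock is described as using for SQLite. It is not the library's implementation, and withTypeLock is an invented name.

```typescript
// Tail of the promise chain for each type; lives only in this process's memory.
const lockTails: Record<string, Promise<void>> = {};

// Hypothetical helper: run fn while holding the lock for `type`.
// Calls for the same type run one after another; different types do not block each other.
async function withTypeLock<T>(type: string, fn: () => Promise<T>): Promise<T> {
  const previous = lockTails[type] ?? Promise.resolve();

  let release!: () => void;
  const current = new Promise<void>((resolve) => {
    release = resolve;
  });

  // The next caller for this type waits until `current` is released.
  lockTails[type] = previous.then(() => current);

  await previous;
  try {
    return await fn();
  } finally {
    release(); // always release, even if fn throws
  }
}
```

Because the lock state is an in-memory object, it coordinates nothing across separate Node processes. That is the reason a row lock in job_type_lock is needed on server databases.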
Uniqueness: there is no namespacing in the library. If two registrars (or two apps sharing one DB) register the same type, the last registerJobHandler / registerContextFactory wins (plain object assignment). Scheduled rows created with 'report' will use whichever handler is registered under 'report' when they run.
Convention: prefix types with a bounded context or product area, e.g. billing:invoice-sync, reports:weekly-pdf, so multiple modules or services never collide. If two deployments share one database, treat type like a global enum agreed across teams.
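One way to apply this convention is a single constant map of namespaced keys that fails fast on collisions. This is a sketch under assumptions: defineJobTypes, JobTypeDef, and the label field are hypothetical names for illustration, and the library itself neither namespaces nor validates type strings.

```typescript
// Hypothetical shape for one entry in the key map.
type JobTypeDef = { key: string; label: string };

// Hypothetical helper: throws if two entries share the same key, so a collision
// fails at startup instead of silently letting the last registration win.
function defineJobTypes<T extends Record<string, JobTypeDef>>(defs: T): Readonly<T> {
  const seen: Record<string, string> = {};
  for (const name of Object.keys(defs)) {
    const def = defs[name];
    if (seen[def.key] !== undefined) {
      throw new Error(`Duplicate job type "${def.key}" (${seen[def.key]} and ${name})`);
    }
    seen[def.key] = name;
  }
  return Object.freeze(defs);
}

// Keys are prefixed with the bounded context, as suggested above.
const JobTypes = defineJobTypes({
  invoiceSync: { key: 'billing:invoice-sync', label: 'Invoice sync' },
  weeklyPdf: { key: 'reports:weekly-pdf', label: 'Weekly PDF report' },
});
```

Registrars and callers can then pass JobTypes.invoiceSync.key everywhere instead of repeating string literals. The check only covers one map, so deployments that share a database still need an agreed prefix scheme.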
Components
| Piece | Role |
| ------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| JobSystemModule.forRoot({ connectionName }) | Global module: registers JobEntity + JobTypeLockEntity on the named TypeORM connection and provides JobSystemService; injectable app-wide after a single forRoot. |
| JobSystemService | registerJobHandler(type, factory) — factory(jobData) returns a JobHandlerFn. registerContextFactory(type, factory) — factory(job) returns the context object passed to the handler. registerJob(...) — enqueue / run. getJobStatus, getJob, cancelJob, helpers for success/failure with messages. onModuleInit — marks stuck processing jobs as failed on boot. startCheckingJobs() — sets an internal flag so scheduleDueJobs actually processes scheduled work (must be called after registration). |
| job-system.types.ts | JobHandlerFn: (update, isCancelled, context) => Promise<void>. UpdateFn: progress + optional message + optional ETA ms. JobContextFactory: async (job: JobEntity) => JobContext. |
| Progress | Optional class that wraps UpdateFn with weighted phases, throttling, and ETA hints for multi-step jobs. |
| ensure-job-lock.ts | Internal: pessimistic row lock on job_type_lock for MySQL/Postgres/etc., or an in-process mutex per type for SQLite (e.g. in-memory test DBs). Ensures one running job per type when draining the schedule. |
| job-system-job.model.ts | JobSystemJobStatus — shape returned by getJobStatus for APIs. |
| prettify-ms.ts | Formats millisecond durations for status display. |
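The idea behind weighted phases can be shown with a few lines of arithmetic. The sketch below is not the Progress class and does not use its API, which is not documented here. createPhaseReporter is a hypothetical helper, and the local UpdateFn type only mirrors the shape described in the table (progress, optional message, optional ETA in ms). Throttling and ETA hints are left out.

```typescript
// Local stand-in for the UpdateFn contract described above (assumption).
type UpdateFn = (
  fraction: number,
  message?: string,
  timeToCompleteMs?: number,
) => void | Promise<void>;

// Hypothetical helper: map "fraction done within a phase" to overall job progress,
// where each phase contributes in proportion to its weight.
function createPhaseReporter<P extends string>(update: UpdateFn, weights: Record<P, number>) {
  const phases = Object.keys(weights) as P[]; // insertion order defines phase order
  const total = phases.reduce((sum, p) => sum + weights[p], 0);

  return (phase: P, fractionWithinPhase: number, message?: string) => {
    let completedWeight = 0;
    for (const p of phases) {
      if (p === phase) break;
      completedWeight += weights[p]; // earlier phases count as fully done
    }
    const clamped = Math.min(1, Math.max(0, fractionWithinPhase));
    const overall = (completedWeight + weights[phase] * clamped) / total;
    return update(overall, message);
  };
}
```

With weights { download: 1, parse: 3 }, finishing the download reports 0.25 overall, and being halfway through parsing reports (1 + 1.5) / 4 = 0.625.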
Registering and organizing jobs in host applications
Keep orchestration (JobSystemModule) in job-system; keep wiring (which services each job needs) in the host Nest module that owns that domain.
A common pattern is a dedicated registrar injectable: one class that implements OnModuleInit, injects JobSystemService plus domain services, registers every handler and context factory, then calls startCheckingJobs(). That class lives in the host application, not inside job-system.
Recommended pattern
- One registrar class implementing NestJS OnModuleInit, provided from the host module that owns those jobs (for example OrdersModule or ReportingModule). Application-specific registrars belong in the host project, not mixed into the portable job-system sources.
- Stable string keys for job.type (human-readable or slug) stored in a constant map, e.g. JobTypes.my_job.key, used consistently in:
  - jobSystemService.registerJobHandler(JobTypes.my_job.key, MyJobHandlerFactory)
  - jobSystemService.registerContextFactory(JobTypes.my_job.key, async (job) => ({ ...deps, job }))
  - Callers of registerJob({ type: JobTypes.my_job.key, ... })
- The handler factory receives job.data (parsed JSON from the row) and returns a JobHandlerFn. Example factory/handler shape:

  ```typescript
  // my-job.ts (host app)
  import { JobHandlerFn } from './job-system'; // or package path
  import { JobEntity } from './job-system';

  interface MyJobContext {
    job: JobEntity;
    ordersService: OrdersService;
  }

  export function createMyJobHandler(): JobHandlerFn<MyJobContext> {
    return async (update, isCancelled, context) => {
      const { job, ordersService } = context;
      // use job.data, ordersService, update(), isCancelled()
    };
  }

  // In registrar:
  // registerJobHandler('My Job', () => createMyJobHandler());
  // registerContextFactory('My Job', async (job) => ({
  //   job,
  //   ordersService: this.ordersService,
  // }));
  ```

- Registration order: register all handlers and all context factories first, then call jobSystemService.startCheckingJobs() once at the end of onModuleInit. Until startCheckingJobs() runs, scheduleDueJobs is a no-op (checkJobs is false).
- One registrar per bounded context (or per application) for a given set of type strings; avoid registering the same type twice.
- Export the key map (e.g. JobTypes) from the host feature module or a barrel so HTTP/GraphQL layers use the same strings as the database job.type column.
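The registration-order rule is easier to see in a toy model. The class below is a hypothetical in-memory stand-in, not JobSystemService. It only imitates two behaviours described in this README: handlers are stored by plain assignment (last registration wins), and the polling tick does nothing until startCheckingJobs() has been called.

```typescript
type ToyHandler = () => string;

// Toy stand-in for the registry and gate; names mirror the README, behaviour is simplified.
class ToyJobRegistry {
  private handlers: Record<string, ToyHandler> = {};
  private checkJobs = false;

  registerJobHandler(type: string, handler: ToyHandler): void {
    this.handlers[type] = handler; // plain assignment: a second call replaces the first
  }

  startCheckingJobs(): void {
    this.checkJobs = true;
  }

  // Stand-in for one cron tick over the job types that are due.
  tick(dueTypes: string[]): string[] {
    if (!this.checkJobs) return []; // gate closed: nothing is claimed
    return dueTypes.map((type) => {
      const handler = this.handlers[type];
      return handler ? handler() : `failed: No handler registered for job type ${type}`;
    });
  }
}
```

Calling tick before startCheckingJobs() returns an empty list even when handlers exist, which matches the "scheduled jobs stay scheduled" symptom. Opening the gate before every handler is registered risks the "No handler registered" failure for rows that become due in between.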
Minimal module sketch (pseudo-code)
Root (once): register the job system next to your TypeORM root config.
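```typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { TypeOrmModule } from '@nestjs/typeorm';
import { JobSystemModule } from './job-system'; // or package path

@Module({
  imports: [
    ScheduleModule.forRoot(),
    TypeOrmModule.forRootAsync({
      /* name: 'app_data', entities: [..., JobEntity, JobTypeLockEntity] */
    }),
    JobSystemModule.forRoot({ connectionName: 'app_data' }),
  ],
})
export class AppModule {}
```

Feature module: no JobSystemModule import needed; inject JobSystemService directly.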
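```typescript
// feature.module.ts
import { Module } from '@nestjs/common';
import { FeatureJobRegistrarService } from './feature-job-registrar.service';

@Module({
  imports: [
    /* TypeOrmModule.forFeature(..., 'app_data'), other imports — not JobSystemModule */
  ],
  providers: [FeatureJobRegistrarService /* domain services */],
})
export class FeatureModule {}
```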
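```typescript
// feature-job-registrar.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { JobSystemService } from './job-system'; // or package path
import { SomeService } from './some.service'; // placeholder domain service
import { MyJob } from './my-job'; // placeholder handler factory

@Injectable()
export class FeatureJobRegistrarService implements OnModuleInit {
  constructor(
    private readonly jobSystem: JobSystemService,
    private readonly someService: SomeService,
  ) {}

  onModuleInit() {
    // Register every handler and context factory first...
    this.jobSystem.registerJobHandler('My Job', (data) =>
      MyJob(data),
    );
    this.jobSystem.registerContextFactory('My Job', async (job) => ({
      someService: this.someService,
      job,
    }));
    // ...then enable polling, once, at the end.
    this.jobSystem.startCheckingJobs();
  }
}
```

Handler author checklist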
- Signature: (update, isCancelled, context) => Promise<void> (JobHandlerFn).
- Payload: read from context.job (JobEntity); JSON input is in job.data.
- Progress: call update(fraction, message?, timeToCompleteMs?) and/or use Progress with a phase weight map.
- Cancellation: poll await isCancelled(); cooperative cancel sets flags on the row. Exit or throw Error('Job canceled by ...') as appropriate.
- Errors: uncaught exceptions mark the job failed; the runtime updates the row.
- Context factory: must provide every dependency the handler reads (services, DataSource, config, job, etc.).
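The checklist can be read as the following handler shape. This is a sketch under assumptions: the type aliases are local stand-ins that mirror the contracts described in this README (in a real app they come from the job-system exports), ExportContext and exportOne are hypothetical, and the handler returns early on cancellation where an application might prefer to throw.

```typescript
// Local stand-ins for the contracts described above (assumptions, not the real imports).
type UpdateFn = (fraction: number, message?: string, timeToCompleteMs?: number) => void | Promise<void>;
type IsCancelledFn = () => Promise<boolean>;
type JobHandlerFn<C> = (update: UpdateFn, isCancelled: IsCancelledFn, context: C) => Promise<void>;

interface ExportContext {
  job: { data: { ids: string[] } }; // stand-in for JobEntity; payload lives in job.data
  exportOne: (id: string) => Promise<void>; // hypothetical domain dependency
}

function createExportHandler(): JobHandlerFn<ExportContext> {
  return async (update, isCancelled, context) => {
    const ids = context.job.data.ids;
    let done = 0;
    for (const id of ids) {
      // Cooperative cancellation: check between units of work.
      if (await isCancelled()) return;

      await context.exportOne(id);
      done += 1;
      await update(done / ids.length, `Exported ${done} of ${ids.length}`);
    }
  };
}
```

Checking isCancelled() once per item keeps cancellation latency bounded by the cost of a single item. An exception thrown by exportOne propagates out of the handler, and the runtime is then expected to mark the job failed.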
Enqueueing and monitoring
Inject JobSystemService and call:
```typescript
await jobSystem.registerJob({
  type: 'My Job', // must match registerJobHandler / registerContextFactory
  data: {
    /* serializable */
  },
  name: 'Display name',
  role: 'user-role',
  email: 'user@example.com', // placeholder address
  runImmediately: true, // optional; default false → scheduled
  scheduledAt: new Date(), // optional; defaults to now
  uploadedFileURL: '...', // optional
});
```

Other useful methods: getJobStatus(jobId), getJob, getRunningJobByType, getLatestJobByType, cancelJob, SucceedJobWithJsonString, failJobWithJsonString, etc. (see job-system.service.ts).
Integrating into a new NestJS project (migrations only, no synchronize)
Assume TypeORM with synchronize: false and migrations for schema changes.
1. Dependencies
@nestjs/common, @nestjs/core, @nestjs/typeorm, @nestjs/schedule, typeorm, and your DB driver (e.g. mysql2, pg).
2. Root module
Import ScheduleModule.forRoot() in AppModule (or equivalent). The job scheduler uses @Cron on scheduleDueJobs; without the schedule module, cron handlers will not run.
3. TypeORM connection
Configure TypeOrmModule.forRoot / forRootAsync with:
- synchronize: false
- entities including JobEntity and JobTypeLockEntity (import from the job-system package path)
- name: a connection name string, e.g. 'app_data', used consistently everywhere
- migrations path and migrationsRun / CLI workflow as you prefer
4. Schema migrations
Create tables that match the entities (column types differ by DB; do not copy SQL blindly across dialects):
- job_system_jobs: UUID primary key, type, data (JSON), status, progress, timestamps, soft-delete column deleted_at, indexes as declared on JobEntity.
- job_type_lock: primary key type (varchar), lockedUntil, updatedAt per JobTypeLockEntity.
Practical approach: use TypeORM CLI migration:generate (or equivalent) against a development database with these entities loaded, then review and commit the generated migration. Adjust for your SQL dialect.
5. Job system module
Import once, from the root module (not from each feature module):

```typescript
JobSystemModule.forRoot({ connectionName: 'app_data' });
```

connectionName must match the name passed to TypeOrmModule.forRoot.
6. Registrar
Implement OnModuleInit as in Registering and organizing jobs; end with startCheckingJobs().
7. Optional: DataSource token
If handlers need the same DataSource instance for transactions, register a custom provider in the host module that re-exports the connection, for example:
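```typescript
// Entry in the host module's providers array.
// getDataSourceToken is exported by @nestjs/typeorm.
{
  provide: 'APP_DATA_SOURCE',
  useExisting: getDataSourceToken('app_data'),
}
```

Inject that token in the registrar and pass it through context factories. Replace 'app_data' and 'APP_DATA_SOURCE' with your own names.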
Pitfalls
| Issue | Symptom / fix |
| ---------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| JobSystemModule.forRoot() more than once (same connectionName) | Two JobSystemService instances, handlers split across them, double @Cron polling. Global registration does not prevent this. Fix: one forRoot in the root module (see NestJS imports). |
| Duplicate forFeature([JobEntity, JobTypeLockEntity], name) | Register these entities only via JobSystemModule.forRoot, not again in the host module's forFeature for the same connection. |
| Missing ScheduleModule.forRoot() | Scheduled jobs never drain; cron never runs. |
| Missing startCheckingJobs() | scheduleDueJobs returns immediately; scheduled rows are never claimed. |
| type mismatch | registerJob({ type: 'X' }) must equal the string passed to registerJobHandler and registerContextFactory; otherwise the job fails with "No handler registered" or "No context factory registered". |
| Duplicate type across registrars | Last registration wins; mysterious behavior if two teams use the same string. Fix: namespaced type strings (see Job type strings). |
| Boot with stuck processing rows | onModuleInit marks them failed; ensure graceful shutdown where possible. |
| connectionName ≠ TypeORM name | Repository injection fails or wrong database at runtime. They must be the same string. |
| JobSystemService cannot be injected | forRoot never ran in the app's module graph, or a lazy-loaded subtree without globals. Do not add a second forRoot to "fix" it — ensure a single forRoot is imported from AppModule (or equivalent). |
FAQ — common mistakes when wiring the host app
These are the issues that most often show up when someone new to the codebase integrates jobs.
"Scheduled jobs stay scheduled forever."
Checklist: ScheduleModule.forRoot() in the root module; registrar called startCheckingJobs() after all registerJobHandler / registerContextFactory calls; server actually running (cron is in-process).
"Error: No handler registered for job type …"
The type on the row does not match any key passed to registerJobHandler. Typos, copy-paste, or a different JobSystemService instance (duplicate forRoot) cause this.
"Error: No context factory registered …"
You registered a handler but not registerContextFactory for the same type string.
"TypeORM can't find repository for JobEntity."
connectionName in forRoot does not match TypeOrmModule.forRoot({ name: '...' }), or the connection is not registered yet (import order: register TypeORM before or alongside the module that imports JobSystemModule).
"I added JobEntity to forRoot entities but migrations are empty."
Point the TypeORM CLI / DataSource at the same entity list and connection; then generate migrations from a dev DB. The library does not ship SQL files.
"Tests pass with SQLite but production behaves differently."
ensure-job-lock uses an in-process mutex for SQLite and row locks for typical server databases. Concurrency guarantees differ; do not assume test behavior matches MySQL/Postgres under load.
"Should each microservice have its own job_system_jobs table?"
Usually yes (one DB per service) unless you intentionally share a queue and coordinate all type strings and deploys globally.
"Where does JOB_SYSTEM_CONNECTION get used?"
It is provided as the configured connection name for consumers that need to inject the same string elsewhere; optional for basic use.
Source layout (paths relative to docs/)
| Path | Purpose |
| --------------------------------------------------- | ------------------------- |
| job-system.module.ts | Dynamic JobSystemModule |
| job-system.service.ts | Runtime API |
| job-system.types.ts | Handler contracts |
| progress.ts | Optional progress helper |
| index.ts | Public exports |
