dic-nj
v0.1.4
Zero-dependency DI container with NestJS-style decorators and Stage 3 TypeScript decorators
Dependency Injection container with NestJS-style decorators, cron tasks, and worker threads. Built on Stage 3 TypeScript decorators, so no reflect-metadata is needed.
@Injectable()
class UserService {
private readonly repo = inject(UserRepository);
findAll() { return this.repo.findAll(); }
}
@Module({ providers: [UserRepository, UserService], exports: [UserService] })
class AppModule {}
const app = await Application.create(AppModule);
app.get(UserService).findAll();
Features
- No external dependencies: metadata stored in WeakMap
- Stage 3 decorators: TypeScript 5.0+, no experimentalDecorators
- NestJS-style API: @Module, @Injectable, InjectionToken, dynamic modules
- Field injection: inject() as the primary pattern
- All provider shapes: useClass, useValue, useFactory, useExisting
- Async providers: useFactory: async () => … fully supported
- Scopes: SINGLETON (default) and TRANSIENT
- Cron tasks: @Task decorator with registerCronTasks / shutdownCronTasks / getJob
- Worker threads: @Worker + @Handle with typed proxy via spawnWorker<T>
- Worker pool: createWorkerPool<T> with least-busy routing and AsyncLocalStorage propagation
- Typed channels: Go-style createChannel<T> for streaming between threads
- Shared memory: SharedAtomicValue backed by SharedArrayBuffer + Atomics
- Broadcast bus: createBus<T> for worker ↔ worker fan-out via BroadcastChannel
- Dual ESM / CJS output: built with zshy
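As a rough illustration of the "metadata stored in WeakMap" idea, the sketch below keeps decorator metadata in a module-level WeakMap keyed by the class constructor. It is not dic-nj's source: InjectableSketch, InjectableMeta, and getMeta are hypothetical names, and the decorator is applied by hand so the snippet runs without decorator syntax.

```typescript
// Hypothetical names throughout; this only illustrates the technique.
type Ctor<T = unknown> = new (...args: any[]) => T;

interface InjectableMeta {
  scope: 'SINGLETON' | 'TRANSIENT';
}

// Metadata lives beside the class, keyed by the constructor itself.
// A WeakMap entry can be garbage-collected together with the class.
const injectableMeta = new WeakMap<Ctor, InjectableMeta>();

// Stage 3 class decorators receive the class and a context object.
function InjectableSketch(meta: InjectableMeta = { scope: 'SINGLETON' }) {
  return function <C extends Ctor>(target: C, _context: { kind: 'class' }): void {
    injectableMeta.set(target, meta);
  };
}

function getMeta(target: Ctor): InjectableMeta | undefined {
  return injectableMeta.get(target);
}

// Applied by hand here; `@InjectableSketch() class Repo {}` would make the same call.
class Repo {}
InjectableSketch()(Repo, { kind: 'class' });

class Plain {}

console.log(getMeta(Repo));  // the metadata object registered above
console.log(getMeta(Plain)); // undefined: never decorated
```

Because nothing is written onto the class itself, no global reflection polyfill is involved; a container can look up the map entry when it decides how to construct a provider.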
Installation
npm install dic-nj
Requirements: Node ≥ 18 or Bun, TypeScript ≥ 5.0
Quick start
TypeScript config
{
"compilerOptions": {
"target": "ES2022",
"strict": true
// do NOT set "experimentalDecorators": true
}
}
Bootstrap
import { Application } from 'dic-nj';
const app = await Application.create(AppModule);
const svc = app.get(UserService);
Dependency Injection
@Injectable + inject()
import { Injectable, inject, InjectionToken } from 'dic-nj';
interface AppConfig { port: number; dbUrl: string }
const CONFIG = new InjectionToken<AppConfig>('AppConfig');
@Injectable()
class DatabaseService {
private readonly config = inject(CONFIG);
connect() { return `Connected to ${this.config.dbUrl}`; }
}
@Injectable()
class UserService {
private readonly db = inject(DatabaseService);
findAll() { this.db.connect(); return []; }
}
Constructor injection with deps
import { optional } from 'dic-nj';
@Injectable({ deps: [OrderRepository, optional(LogService)] })
class OrderService {
constructor(
private readonly repo: OrderRepository,
private readonly logger?: LogService,
) {}
}
Modules
@Module({
providers: [
DatabaseService,
UserService,
{ provide: CONFIG, useValue: { port: 3000, dbUrl: 'postgres://localhost/mydb' } },
],
exports: [UserService],
})
class AppModule {}
Providers
// useClass — swap implementation
{ provide: IRepo, useClass: PostgresRepo }
// useValue — constants and config objects
{ provide: CONFIG, useValue: { port: 3000 } }
// useFactory — computed or async values
{
provide: DatabaseService,
useFactory: async (config: Config) => {
const db = new DatabaseService(config.dsn);
await db.connect();
return db;
},
inject: [CONFIG],
}
// useExisting — alias one token to another
{ provide: ILogger, useExisting: MyLogger }
Scopes
import { Scope } from 'dic-nj';
@Injectable({ scope: Scope.TRANSIENT })
class RequestContext {}
Dynamic modules
@Module({})
class DatabaseModule {
static forRoot(dsn: string): DynamicModule {
return {
module: DatabaseModule,
providers: [
{ provide: DSN_TOKEN, useValue: dsn },
DatabaseService,
],
exports: [DatabaseService],
global: true, // visible to every module without explicit imports
};
}
}
@Module({ imports: [DatabaseModule.forRoot(process.env.DB_URL!)] })
class AppModule {}
Cron tasks
Requires Bun. Uses Bun.cron natively; falls back to cron on Node.js (npm install cron).
Define tasks
import { Injectable, inject, Task } from 'dic-nj';
@Injectable()
class ScheduleTasks {
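private readonly logger = inject(LoggerService);
private readonly db = inject(DatabaseService); // used by cleanup(); purgeOldRecords() is assumed to exist on it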
@Task('heartbeat', '* * * * *')
async heartbeat() {
this.logger.log('ping');
}
@Task('cleanup', '0 3 * * *') // every day at 03:00
async cleanup() {
await this.db.purgeOldRecords();
}
}
Register and shutdown
import { registerCronTasks, shutdownCronTasks } from 'dic-nj';
const app = await Application.create(AppModule);
await registerCronTasks([app.get(ScheduleTasks)]);
process.on('SIGINT', () => {
shutdownCronTasks();
process.exit(0);
});
Get a job handle
getJob(id) retrieves a handle for any registered task by its ID.
Call it any time after registerCronTasks.
import { getJob } from 'dic-nj';
// run immediately, outside the schedule (e.g. on startup or via HTTP endpoint)
await getJob('cleanup')?.runNow();
// pause a job temporarily
getJob('heartbeat')?.stop();
// resume it
getJob('heartbeat')?.start();
| Method | Description |
|---|---|
| runNow() | Invoke the task immediately, outside its schedule |
| stop() | Pause the job (does not remove it) |
| start() | Resume a stopped job |
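To make the handle semantics in the table concrete, here is a dependency-free sketch of a job registry with the same three operations. It is an assumption-laden stand-in, not the library's scheduler: JobRegistrySketch and JobHandleSketch are hypothetical names, and a plain interval in milliseconds replaces the cron expression.

```typescript
type TaskFn = () => void | Promise<void>;

interface JobHandleSketch {
  runNow(): Promise<void>;
  stop(): void;
  start(): void;
  readonly running: boolean;
}

class JobRegistrySketch {
  private readonly jobs = new Map<string, JobHandleSketch>();

  // `everyMs` stands in for a cron expression to keep the sketch self-contained.
  register(id: string, everyMs: number, task: TaskFn): JobHandleSketch {
    let timer: ReturnType<typeof setInterval> | undefined;
    const handle: JobHandleSketch = {
      // Invoke the task once, independently of the timer.
      runNow: async () => { await task(); },
      // Pausing clears the timer but keeps the job registered.
      stop: () => {
        if (timer !== undefined) clearInterval(timer);
        timer = undefined;
      },
      // Resuming is a no-op when the job is already running.
      start: () => {
        if (timer === undefined) timer = setInterval(() => void task(), everyMs);
      },
      get running() { return timer !== undefined; },
    };
    this.jobs.set(id, handle);
    handle.start();
    return handle;
  }

  getJob(id: string): JobHandleSketch | undefined {
    return this.jobs.get(id);
  }

  shutdown(): void {
    for (const job of this.jobs.values()) job.stop();
    this.jobs.clear();
  }
}

async function demo(): Promise<{ runs: number; runningAfterStop: boolean }> {
  const registry = new JobRegistrySketch();
  let runs = 0;
  registry.register('cleanup', 60_000, () => { runs++; });
  await registry.getJob('cleanup')?.runNow(); // runs once, outside the schedule
  registry.getJob('cleanup')?.stop();         // paused, still registered
  const runningAfterStop = registry.getJob('cleanup')?.running ?? true;
  registry.shutdown();                        // clears every timer so the process can exit
  return { runs, runningAfterStop };
}
```

Looking a job up by an ID that was never registered returns undefined, which is why the calls above use optional chaining.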
Worker threads
Uses node:worker_threads — works on both Node.js and Bun.
Worker file
Define the worker class with @Worker and mark public methods with @Handle.
@Worker automatically registers the worker when the module is loaded.
// math.worker.ts
import { Worker, Handle } from 'dic-nj';
@Worker
class MathWorker {
@Handle
async fibonacci(n: number): Promise<number> {
if (n <= 1) return n;
let a = 0, b = 1;
for (let i = 2; i <= n; i++) [a, b] = [b, a + b];
return b;
}
@Handle
async primes(limit: number): Promise<number[]> {
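const sieve = new Uint8Array(limit + 1).fill(1);
sieve[0] = sieve[1] = 0; // 0 and 1 are not prime
// sieve of Eratosthenes: clear every multiple of each prime up to sqrt(limit)
for (let i = 2; i * i <= limit; i++) {
if (sieve[i]) for (let j = i * i; j <= limit; j += i) sieve[j] = 0;
}
return [...sieve.keys()].filter(i => sieve[i]);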
}
}
export type { MathWorker };
Spawn and call from main thread
spawnWorker<T> returns a WorkerProxy<T> — all @Handle methods become async and are fully typed.
import { Injectable, inject, spawnWorker } from 'dic-nj';
import type { WorkerProxy } from 'dic-nj';
import type { MathWorker } from './math.worker';
@Injectable()
class MathService {
private readonly worker: WorkerProxy<MathWorker> = spawnWorker<MathWorker>(
new URL('./math.worker.ts', import.meta.url),
);
async fibonacci(n: number) { return this.worker.fibonacci(n); }
async primes(limit: number) { return this.worker.primes(limit); }
async shutdown() { await this.worker.terminate(); }
}
Concurrent calls are safe: each request gets a unique requestId that is correlated with its response:
const [f10, f20, f30] = await Promise.all([
math.fibonacci(10),
math.fibonacci(20),
math.fibonacci(30),
]);
Passing initial data (Node.js workerData)
spawnWorker<MyWorker>(url, { data: { config: '...' } });
// In the worker file:
import { workerContext } from 'dic-nj';
const ctx = workerContext<Config>();
console.log(ctx.initialData); // { config: '...' }
Worker pool
createWorkerPool distributes calls across N workers using least-busy routing.
Each resolve/reject is bound to the caller's async context via AsyncResource.bind,
so AsyncLocalStorage (trace IDs, request context) propagates correctly.
The worker file is identical to a regular @Worker class — the pool is a detail of the main thread only.
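The context-binding step mentioned above can be illustrated with Node built-ins alone. The sketch is independent of dic-nj: requestContext and readTraceId are hypothetical names, and the "later" invocation stands in for a callback fired by a worker's message event.

```typescript
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';

const requestContext = new AsyncLocalStorage<{ traceId: string }>();

// Reads whatever store is active at the moment it is called.
const readTraceId = (): string | undefined => requestContext.getStore()?.traceId;

// Inside run(), capture the current async context by binding the callback to it.
const bound = requestContext.run({ traceId: 'req-1' }, () =>
  AsyncResource.bind(readTraceId),
);

// Both calls below happen outside run(), the way a worker reply would arrive.
console.log(readTraceId()); // undefined: no store is active here
console.log(bound());       // 'req-1': the bound copy re-enters the captured context
```

A pool that binds each pending resolve/reject this way lets code after `await pool.someCall()` still see the caller's store.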
// image.worker.ts — same pattern as spawnWorker, nothing pool-specific here
import { Worker, Handle } from 'dic-nj';
@Worker
class ImageWorker {
@Handle
async resize(buffer: Buffer, width: number, height: number): Promise<Buffer> {
// CPU-heavy — runs in its own thread, won't block the event loop
return runSharpResize(buffer, width, height);
}
@Handle
async thumbnail(buffer: Buffer): Promise<Buffer> {
return runSharpResize(buffer, 320, 240);
}
}
export type { ImageWorker };
// image.service.ts
import { Injectable } from 'dic-nj';
import { createWorkerPool } from 'dic-nj';
import type { WorkerPoolProxy } from 'dic-nj';
import type { ImageWorker } from './image.worker';
@Injectable()
export class ImageService {
// pool of 4 workers — one per CPU core
private readonly pool: WorkerPoolProxy<ImageWorker> = createWorkerPool<ImageWorker>(
new URL('./image.worker.ts', import.meta.url),
{ size: 4 },
);
// 16 uploads processed in parallel — ~4× faster than a single worker
async processUploads(buffers: Buffer[]): Promise<Buffer[]> {
return Promise.all(buffers.map(buf => this.pool.resize(buf, 800, 600)));
}
async shutdown() { await this.pool.terminate(); }
}
spawnWorker vs createWorkerPool:
| | spawnWorker | createWorkerPool |
|---|---|---|
| Workers | 1 | N (configurable) |
| Concurrent calls | Queued on 1 thread | Distributed across N threads |
| Best for | Single background task, sequential work | CPU-bound, many concurrent requests |
| Speedup | 1× | ~N× for tasks > 5ms |
When to use a pool: when you have many concurrent CPU-bound tasks — image/video processing, PDF rendering, crypto, compression. A pool of N gives ~N× throughput. If your task is fast (< 1ms), a single worker is usually sufficient since postMessage overhead dominates.
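Least-busy routing itself is a small amount of bookkeeping. The sketch below shows one way it could work, with plain async functions standing in for worker threads; LeastBusyRouter and Executor are hypothetical names and nothing here is taken from the library's implementation.

```typescript
type Executor<A, R> = (arg: A) => Promise<R>;

class LeastBusyRouter<A, R> {
  private readonly executors: Executor<A, R>[];
  private readonly inFlight: number[];

  constructor(executors: Executor<A, R>[]) {
    if (executors.length === 0) throw new Error('need at least one executor');
    this.executors = executors;
    this.inFlight = executors.map(() => 0);
  }

  async run(arg: A): Promise<R> {
    // Pick the executor with the fewest unfinished calls (first one wins ties).
    let idx = 0;
    for (let i = 1; i < this.inFlight.length; i++) {
      if (this.inFlight[i] < this.inFlight[idx]) idx = i;
    }
    this.inFlight[idx]++;
    try {
      return await this.executors[idx](arg);
    } finally {
      this.inFlight[idx]--; // released on success and on failure
    }
  }
}

// Three fake "workers" that report their own index after a delay.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const makeExecutor = (id: number): Executor<number, number> => async (ms) => {
  await sleep(ms);
  return id;
};

const router = new LeastBusyRouter([0, 1, 2].map((id) => makeExecutor(id)));

// Four concurrent calls: the first three land on idle executors 0, 1, 2;
// the fourth sees a three-way tie and goes to executor 0.
const assignment = Promise.all([10, 10, 10, 10].map((ms) => router.run(ms)));
assignment.then((ids) => console.log(ids)); // [ 0, 1, 2, 0 ]
```

The selection happens synchronously at call time, so a burst of calls spreads out before any of them has finished.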
Typed channel (Go-style)
createChannel<T> wraps a MessageChannel pair with a typed async-iterator interface.
Use it to stream data from a worker — progress updates, chunks, event sequences —
instead of a single request/response.
// main.ts
import { Worker } from 'node:worker_threads';
import { createChannel } from 'dic-nj';
const [local, remote] = createChannel<{ progress: number; chunk: Buffer }>();
const worker = new Worker(new URL('./processor.worker.ts', import.meta.url), {
workerData: { port: remote.port, fileUrl: '...' },
transferList: [remote.port],
});
// consume the stream as it arrives
for await (const { progress, chunk } of local) {
console.log(`${progress}% done`);
if (progress === 100) break;
}
await worker.terminate();
// processor.worker.ts
import { workerData } from 'node:worker_threads';
import { channelFromPort } from 'dic-nj';
const ch = channelFromPort<{ progress: number; chunk: Buffer }>(workerData.port);
for (let i = 1; i <= 10; i++) {
const chunk = await readNextChunk();
ch.send({ progress: i * 10, chunk });
}
ch.close();
When to use: long-running tasks with incremental output (file parsing, ML inference, data export).
RPC (spawnWorker) returns one result; a channel streams many.
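For readers curious how a typed async-iterator can sit on top of a MessageChannel, here is a minimal sketch using only node:worker_threads. ChannelSketch and createChannelSketch are hypothetical names; both ends stay in one thread for the demo, whereas in the examples above one port is transferred to a worker.

```typescript
import { MessageChannel, MessagePort } from 'node:worker_threads';

class ChannelSketch<T> implements AsyncIterable<T> {
  readonly port: MessagePort;
  private readonly buffered: T[] = [];
  private waiting: ((result: IteratorResult<T>) => void) | undefined;
  private closed = false;

  constructor(port: MessagePort) {
    this.port = port;
    port.on('message', (value: T) => {
      // Hand the value to a pending next() call, or buffer it for later.
      if (this.waiting) {
        const resolve = this.waiting;
        this.waiting = undefined;
        resolve({ value, done: false });
      } else {
        this.buffered.push(value);
      }
    });
    // Node emits 'close' on both ports once either side closes.
    port.on('close', () => {
      this.closed = true;
      this.waiting?.({ value: undefined, done: true });
      this.waiting = undefined;
    });
  }

  send(value: T): void { this.port.postMessage(value); }
  close(): void { this.port.close(); }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: () => {
        if (this.buffered.length > 0) {
          return Promise.resolve({ value: this.buffered.shift() as T, done: false });
        }
        if (this.closed) return Promise.resolve({ value: undefined, done: true });
        return new Promise<IteratorResult<T>>((resolve) => { this.waiting = resolve; });
      },
      // Called when the consumer breaks out of for-await early.
      return: () => {
        this.port.close();
        return Promise.resolve({ value: undefined, done: true });
      },
    };
  }
}

function createChannelSketch<T>(): [ChannelSketch<T>, ChannelSketch<T>] {
  const { port1, port2 } = new MessageChannel();
  return [new ChannelSketch<T>(port1), new ChannelSketch<T>(port2)];
}

async function demo(): Promise<number[]> {
  const [local, remote] = createChannelSketch<number>();
  for (const pct of [25, 50, 100]) remote.send(pct);
  remote.close(); // messages queued before close are still delivered
  const seen: number[] = [];
  for await (const pct of local) seen.push(pct);
  return seen;
}
```

Closing the port is what ends the consumer's loop, so the sender does not need a sentinel message.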
Shared memory
SharedAtomicValue wraps a SharedArrayBuffer cell with atomic operations.
Zero-copy: the buffer is shared across threads without serialisation.
// main.ts
import { Worker } from 'node:worker_threads';
import { SharedAtomicValue } from 'dic-nj';
const activeRequests = new SharedAtomicValue(0);
// pass the raw buffer — no serialisation, zero-copy
const worker = new Worker(new URL('./handler.worker.ts', import.meta.url), {
workerData: { activeRequestsBuffer: activeRequests.buffer },
});
console.log(activeRequests.get()); // read from main thread at any time
// handler.worker.ts
import { workerData } from 'node:worker_threads';
import { SharedAtomicValue } from 'dic-nj';
const activeRequests = new SharedAtomicValue(0, workerData.activeRequestsBuffer);
export async function handleRequest(req: Request) {
activeRequests.add(1);
try {
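return await processRequest(req); // processRequest is your own handler; a bare process(req) would refer to Node's global process object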
} finally {
activeRequests.sub(1);
}
}
| Method | Description |
|---|---|
| get() | Atomic read |
| set(n) | Atomic write |
| add(delta) | Atomic increment, returns new value |
| sub(delta) | Atomic decrement, returns new value |
| compareAndSwap(expected, next) | CAS — returns true if swapped |
| .buffer | The SharedArrayBuffer to pass to workers |
When to use: shared counters (active connections, rate-limit slots, progress),
lightweight flags, or anywhere you need cross-thread state without postMessage overhead.
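The method table maps closely onto the standard Atomics API. The sketch below shows one plausible shape for such a wrapper over a single Int32 cell; AtomicCounterSketch is a hypothetical name, and the real class may differ in details such as how it treats the initial value when a buffer is supplied.

```typescript
class AtomicCounterSketch {
  readonly buffer: SharedArrayBuffer;
  private readonly cell: Int32Array;

  constructor(initial = 0, buffer?: SharedArrayBuffer) {
    this.buffer = buffer ?? new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
    this.cell = new Int32Array(this.buffer);
    // Assumption: only a fresh buffer is initialised; an attached one keeps its live value.
    if (!buffer) Atomics.store(this.cell, 0, initial);
  }

  get(): number { return Atomics.load(this.cell, 0); }
  set(n: number): void { Atomics.store(this.cell, 0, n); }

  // Atomics.add/sub return the OLD value, so the new value is derived from it.
  add(delta: number): number { return Atomics.add(this.cell, 0, delta) + delta; }
  sub(delta: number): number { return Atomics.sub(this.cell, 0, delta) - delta; }

  // compareExchange returns the value it found; the swap happened only if that matched.
  compareAndSwap(expected: number, next: number): boolean {
    return Atomics.compareExchange(this.cell, 0, expected, next) === expected;
  }
}

// Two views over the same memory, as a main thread and a worker would hold.
const owner = new AtomicCounterSketch(5);
const attached = new AtomicCounterSketch(0, owner.buffer);

console.log(attached.add(2));              // 7
console.log(owner.get());                  // 7, visible through the other view
console.log(owner.compareAndSwap(7, 10));  // true
console.log(owner.compareAndSwap(7, 11));  // false, the cell now holds 10
```

Passing `owner.buffer` through workerData shares the memory itself, which is why no message is needed to observe an update.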
Broadcast bus (worker ↔ worker)
createBus<T> wraps BroadcastChannel — one sender, all subscribers receive the message.
Workers talk to each other directly without routing through the main thread.
// cache.worker.ts (one of N pool workers)
import { createBus } from 'dic-nj';
const bus = createBus<{ key: string }>('cache-invalidation');
// when this worker updates the cache, tell all siblings
async function updateCache(key: string, value: unknown) {
cache.set(key, value);
bus.emit({ key }); // fan-out to every worker on this channel
}
// receive invalidations from other workers
bus.on(({ key }) => cache.delete(key));
When to use: cache invalidation across a worker pool, broadcasting config reloads, or any 1:N event fan-out between workers without a central coordinator.
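A typed wrapper over BroadcastChannel needs little more than a handler set. The following is a sketch under stated assumptions, not the library's code: BusSketch is a hypothetical name, and two instances in the same thread play the role of two workers.

```typescript
import { BroadcastChannel } from 'node:worker_threads';

type Handler<T> = (message: T) => void;

class BusSketch<T> {
  private readonly channel: BroadcastChannel;
  private readonly handlers = new Set<Handler<T>>();

  constructor(name: string) {
    this.channel = new BroadcastChannel(name);
    // One dispatcher fans each incoming event out to every subscribed handler.
    this.channel.onmessage = (event: unknown) => {
      const data = (event as { data: T }).data;
      for (const handler of this.handlers) handler(data);
    };
  }

  emit(message: T): void { this.channel.postMessage(message); }

  // Returns an unsubscribe function.
  on(handler: Handler<T>): () => void {
    this.handlers.add(handler);
    return () => { this.handlers.delete(handler); };
  }

  // An open channel keeps the event loop alive, so close it when done.
  close(): void { this.channel.close(); }
}

async function demo(): Promise<string> {
  const sender = new BusSketch<{ key: string }>('cache-invalidation-sketch');
  const sibling = new BusSketch<{ key: string }>('cache-invalidation-sketch');
  try {
    const received = new Promise<string>((resolve) => {
      sibling.on(({ key }) => resolve(key));
    });
    sender.emit({ key: 'user:1' });
    return await received;
  } finally {
    sender.close();
    sibling.close();
  }
}
```

Under BroadcastChannel semantics the emitting instance does not receive its own message, which matches the "tell all siblings" usage above.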
Combining tools
The primitives are designed to be combined. Below are the common patterns.
Pool + Bus: cache invalidation across workers
One worker updates a shared cache and broadcasts the key to all siblings via createBus.
Workers talk directly without going through the main thread.
// cache.worker.ts
import { Worker, Handle } from 'dic-nj';
import { createBus } from 'dic-nj';
const localCache = new Map<string, unknown>(); // per-worker cache used below
const bus = createBus<{ key: string }>('cache');
bus.on(({ key }) => localCache.delete(key)); // receive from siblings
@Worker
class CacheWorker {
@Handle
async set(key: string, value: unknown): Promise<void> {
localCache.set(key, value);
bus.emit({ key }); // fan-out to all other workers
}
@Handle
async get(key: string): Promise<unknown> {
return localCache.get(key);
}
}
export type { CacheWorker };
// main thread
const pool = createWorkerPool<CacheWorker>(url, { size: 4 });
await pool.set('user:1', { name: 'Alice' }); // one worker updates + broadcasts
const user = await pool.get('user:1'); // answered by whichever worker the pool picks; with the worker code above, siblings drop the key when they receive the invalidation
Pool + SharedAtomicValue: live metrics without postMessage
Pass a SharedArrayBuffer to every worker via workerData. Workers increment counters directly — zero serialisation, readable from main thread at any time.
// worker.ts
import { Worker, Handle, workerContext, SharedAtomicValue } from 'dic-nj';
const { activeBuffer, totalBuffer } = workerContext<{
activeBuffer: SharedArrayBuffer;
totalBuffer: SharedArrayBuffer;
}>().initialData!;
const active = new SharedAtomicValue(0, activeBuffer);
const total = new SharedAtomicValue(0, totalBuffer);
@Worker
class ApiWorker {
@Handle
async handleRequest(req: Request): Promise<Response> {
active.add(1);
total.add(1);
try {
return await processRequest(req);
} finally {
active.sub(1);
}
}
}
export type { ApiWorker };
// main thread: poll metrics without any postMessage
const active = new SharedAtomicValue(0);
const total = new SharedAtomicValue(0);
const pool = createWorkerPool<ApiWorker>(url, {
size: 4,
data: { activeBuffer: active.buffer, totalBuffer: total.buffer },
});
setInterval(() => {
console.log(`active: ${active.get()} total: ${total.get()}`);
}, 1000);
Pool + Channel: streaming progress from a long task
Use a channel to stream progress while the pool processes heavy work.
// processor.worker.ts
import { Worker, Handle, workerContext } from 'dic-nj';
import { channelFromPort } from 'dic-nj';
import type { MessagePort } from 'node:worker_threads';
@Worker
class ProcessorWorker {
@Handle
async process(fileUrl: string, progressPort: MessagePort): Promise<string> {
const ch = channelFromPort<number>(progressPort);
const chunks = await readFile(fileUrl);
for (let i = 0; i < chunks.length; i++) {
await processChunk(chunks[i]);
ch.send(Math.round(((i + 1) / chunks.length) * 100)); // 0–100%
}
ch.close();
return 'done';
}
}
export type { ProcessorWorker };
// main thread
import { createChannel } from 'dic-nj';
const [local, remote] = createChannel<number>();
// pass the port to the worker as a transferable — zero-copy
const result = pool.process(fileUrl, remote.port); // starts in a pool worker
for await (const pct of local) {
console.log(`progress: ${pct}%`);
}
console.log(await result); // 'done'
Low-level API
For cases where you don't need the decorator pattern:
import { createWorker, workerContext, isMainThread } from 'dic-nj';
// Main thread
const worker = createWorker<Request, Response>(url, { data: initData });
worker.onMessage(console.log).onError(console.error);
worker.postMessage({ type: 'ping' });
await worker.terminate();
// Worker file
const ctx = workerContext<Request, Response>();
ctx.onMessage(async (req) => {
ctx.postMessage({ result: await handle(req) }); // handle is your own function; the name process would clash with Node's global
});
// Detect context
if (isMainThread) { /* ... */ }
Error reference
| Error | When thrown |
|---|---|
| TokenNotFoundError | Token has no registered provider |
| CircularDependencyError | A → B → A cycle detected |
| InjectionContextError | inject() called outside container-managed instantiation |
| NotInjectableError | Plain class (no @Injectable) passed as a provider |
| NotAModuleError | Class without @Module passed to the compiler |
| AsyncProviderError | Async factory resolved via synchronous resolve() |
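Two of these errors, InjectionContextError and CircularDependencyError, follow naturally from how field injection can be implemented. The sketch below is a guess at the general mechanism, not dic-nj's container: MiniContainer, miniInject, and the two error classes are hypothetical, and only singleton, zero-argument classes are handled.

```typescript
type Ctor<T = unknown> = new () => T;

class InjectionContextErrorSketch extends Error {}
class CircularDependencyErrorSketch extends Error {}

// The container currently constructing an instance, if any.
let activeContainer: MiniContainer | undefined;

class MiniContainer {
  private readonly singletons = new Map<Ctor, unknown>();
  private readonly resolving: Ctor[] = [];

  resolve<T>(cls: Ctor<T>): T {
    if (this.singletons.has(cls)) return this.singletons.get(cls) as T;
    // A class that is already mid-construction means the graph loops back on itself.
    if (this.resolving.includes(cls)) {
      const path = [...this.resolving, cls].map((c) => c.name).join(' -> ');
      throw new CircularDependencyErrorSketch(path);
    }
    const previous = activeContainer;
    activeContainer = this;
    this.resolving.push(cls);
    try {
      const instance = new cls(); // field initialisers run here and may call miniInject()
      this.singletons.set(cls, instance);
      return instance;
    } finally {
      this.resolving.pop();
      activeContainer = previous;
    }
  }
}

function miniInject<T>(cls: Ctor<T>): T {
  if (!activeContainer) {
    throw new InjectionContextErrorSketch('miniInject() called outside resolve()');
  }
  return activeContainer.resolve(cls);
}

class Repo {
  findAll(): string[] { return ['alice']; }
}
class Service {
  private readonly repo: Repo = miniInject(Repo);
  list(): string[] { return this.repo.findAll(); }
}

// A deliberate cycle: A needs B while B needs A.
class A { readonly b: B = miniInject(B); }
class B { readonly a: A = miniInject(A); }

const container = new MiniContainer();
console.log(container.resolve(Service).list()); // [ 'alice' ]
```

Calling `miniInject(Repo)` at module top level throws the context error, and `container.resolve(A)` throws the circular error with the message 'A -> B -> A'.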
All errors extend DIError:
import { DIError } from 'dic-nj';
if (e instanceof DIError) console.error('DI error:', e.message);
API reference
DI decorators
| Decorator | Description |
|---|---|
| @Injectable(options?) | Marks a class as a DI provider |
| @Module(options) | Declares a module |
DI functions
| Function | Description |
|---|---|
| inject(token, options?) | Resolve a dep inside a class field initialiser |
| optional(token) | Wrap a token as optional in a deps array |
DI classes
| Class | Description |
|---|---|
| Application | Bootstrap — Application.create(RootModule) |
| InjectionToken<T> | Typed token for non-class values |
| Container | Low-level register / resolve / createChild |
Cron
| API | Description |
|---|---|
| @Task(id, schedule) | Marks a method as a cron task |
| registerCronTasks(instances) | Starts all tasks on the given instances |
| shutdownCronTasks() | Stops all running tasks |
| getJob(id) | Returns CronJobHandle for a registered task, or undefined |
Workers
| API | Description |
|---|---|
| @Worker | Marks a class as a worker handler, auto-registers on module load |
| @Handle | Marks a method as callable from the main thread |
| spawnWorker<T>(url, options?) | Spawns a single worker, returns typed WorkerProxy<T> |
| createWorkerPool<T>(url, options) | Spawns N workers, routes calls to least-busy, returns WorkerPoolProxy<T> |
| createWorker(url, options?) | Low-level worker handle |
| workerContext() | Gets parentPort + workerData inside a worker file |
| isMainThread | true when running on the main thread |
| createChannel<T>() | Creates a typed [local, remote] channel pair for streaming between threads |
| channelFromPort<T>(port) | Wraps a received MessagePort as a Channel<T> inside a worker |
| SharedAtomicValue(initial?, buffer?) | Atomic Int32 counter backed by SharedArrayBuffer |
| createBus<T>(name) | Typed BroadcastChannel wrapper for worker ↔ worker fan-out |
Worker types
| Type | Description |
|---|---|
| WorkerProxy<T> | All @Handle methods of T as Promise-returning functions + terminate(): Promise<number> |
| WorkerPoolProxy<T> | Same as WorkerProxy<T> but terminate(): Promise<void> (drains all N workers) |
| WorkerHandle<TIn, TOut> | Low-level handle returned by createWorker |
| WorkerContext<TIn, TOut> | Context returned by workerContext |
| Channel<T> | Typed channel: send(), close(), port, [Symbol.asyncIterator] |
| WorkerBus<T> | Typed bus: emit(), on(handler) => unsubscribe, close() |
| WorkerPoolOptions | { size: number } & CreateWorkerOptions |
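The WorkerProxy<T> row and the requestId correlation mentioned earlier can be pictured with a mapped type plus a Proxy. This is a sketch only: ProxySketch, createProxySketch, and inMemoryTransport are hypothetical names, every method of T is exposed (the type system cannot see which ones carry @Handle), and an in-memory transport replaces a real worker.

```typescript
type AnyFn = (...args: any[]) => any;

// Every method of T becomes a Promise-returning function.
type ProxySketch<T> = {
  [K in keyof T as T[K] extends AnyFn ? K : never]:
    T[K] extends (...args: infer A) => infer R ? (...args: A) => Promise<Awaited<R>> : never;
};

interface RpcRequest { requestId: number; method: string; args: unknown[] }
interface RpcResponse { requestId: number; result?: unknown; error?: string }
type Transport = (req: RpcRequest, reply: (res: RpcResponse) => void) => void;

function createProxySketch<T extends object>(transport: Transport): ProxySketch<T> {
  let nextId = 0;
  const pending = new Map<number, { resolve: (v: unknown) => void; reject: (e: Error) => void }>();

  // Match each reply to its caller by requestId, whatever order replies arrive in.
  const onReply = (res: RpcResponse): void => {
    const entry = pending.get(res.requestId);
    if (!entry) return;
    pending.delete(res.requestId);
    if (res.error !== undefined) entry.reject(new Error(res.error));
    else entry.resolve(res.result);
  };

  return new Proxy({}, {
    get: (_target, method) => {
      // Do not pretend to be a thenable, or awaiting the proxy itself would hang.
      if (typeof method !== 'string' || method === 'then') return undefined;
      return (...args: unknown[]) =>
        new Promise((resolve, reject) => {
          const requestId = nextId++;
          pending.set(requestId, { resolve, reject });
          transport({ requestId, method, args }, onReply);
        });
    },
  }) as unknown as ProxySketch<T>;
}

// Stand-in for postMessage to a worker: calls the target asynchronously and replies.
function inMemoryTransport(target: object): Transport {
  return (req, reply) => {
    const fn = (target as unknown as Record<string, AnyFn>)[req.method];
    Promise.resolve()
      .then(() => fn.apply(target, req.args))
      .then(
        (result) => reply({ requestId: req.requestId, result }),
        (err) => reply({ requestId: req.requestId, error: String(err) }),
      );
  };
}

class MathTarget {
  async square(n: number): Promise<number> { return n * n; }
  async slowEcho(s: string): Promise<string> {
    await new Promise((resolve) => setTimeout(resolve, 20));
    return s;
  }
}

const math = createProxySketch<MathTarget>(inMemoryTransport(new MathTarget()));

// slowEcho replies last, yet each caller still receives its own result.
const results = Promise.all([math.slowEcho('a'), math.square(3), math.square(4)]);
results.then((values) => console.log(values)); // [ 'a', 9, 16 ]
```

The pending map is the whole correlation mechanism: a reply with an unknown requestId is simply ignored.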
License
MIT
