@glidemq/nestjs
v0.2.2
Published
NestJS module for glide-mq - decorators, dependency injection, and lifecycle management for queues and workers
Maintainers
Readme
@glidemq/nestjs
NestJS module for glide-mq - type-safe decorators and DI for high-performance queues with AI orchestration.
Why
- Decorator-based processors -
@Processorand@BroadcastProcessorauto-wire workers on startup - Full DI integration -
@InjectQueue,@InjectFlowProducer,@InjectBroadcast,@InjectProducerwork with NestJS's container - Zero-boilerplate shutdown - all workers, queues, and connections close automatically via
OnApplicationShutdown
Install
npm install @glidemq/nestjs glide-mq @nestjs/common @nestjs/coreRequires glide-mq >= 0.14.0 and NestJS 10+.
Quick start
// app.module.ts
import { Module } from "@nestjs/common";
import { GlideMQModule } from "@glidemq/nestjs";
@Module({
imports: [
GlideMQModule.forRoot({
connection: { addresses: [{ host: "localhost", port: 6379 }] },
}),
GlideMQModule.registerQueue({ name: "emails" }),
],
providers: [EmailProcessor, EmailService],
})
export class AppModule {}
// email.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from "@glidemq/nestjs";
import type { Job } from "glide-mq";
@Processor("emails")
export class EmailProcessor extends WorkerHost {
async process(job: Job) {
await sendEmail(job.data.to, job.data.subject);
return { sent: true };
}
@OnWorkerEvent("failed")
onFailed(job: Job, err: Error) {
console.error(`Job ${job.id} failed:`, err.message);
}
}
// email.service.ts
import { Injectable } from "@nestjs/common";
import { InjectQueue } from "@glidemq/nestjs";
import type { Queue } from "glide-mq";
@Injectable()
export class EmailService {
constructor(@InjectQueue("emails") private readonly queue: Queue) {}
async send(to: string, subject: string) {
await this.queue.add("send", { to, subject });
}
}AI-native features
glide-mq 0.14+ provides AI orchestration primitives - token/cost tracking, real-time streaming, human-in-the-loop suspend/signal, model failover chains, budget caps, dual-axis rate limiting, vector search, and rolling usage summaries. All are accessible through the injected Queue, Worker, FlowProducer, Broadcast, and QueueEvents instances in your NestJS services.
If you need to create and inspect flows over HTTP from cross-language or non-Nest clients, use the core glide-mq proxy or one of the HTTP wrapper integrations:
@glidemq/hono@glidemq/fastify@glidemq/hapi
These expose:
POST /flowsGET /flows/:idGET /flows/:id/treeDELETE /flows/:id
@glidemq/nestjs is a DI/module integration and does not provide an HTTP API layer.
Usage tracking and streaming
// llm.processor.ts
import { Processor, WorkerHost } from "@glidemq/nestjs";
import type { Job } from "glide-mq";
@Processor("llm-tasks")
export class LlmProcessor extends WorkerHost {
async process(job: Job) {
const response = await callLlm(job.data.prompt);
// Stream reasoning and content chunks back in real time
for (const chunk of response.reasoningChunks) {
await job.streamChunk("reasoning", chunk);
}
for (const chunk of response.contentChunks) {
await job.streamChunk("content", chunk);
}
await job.streamChunk("done");
// Report token usage and cost
await job.reportUsage({
model: "claude-sonnet-4-20250514",
provider: "anthropic",
tokens: {
input: response.inputTokens,
output: response.outputTokens,
reasoning: response.reasoningTokens,
},
costs: { total: response.cost },
costUnit: "usd",
});
return { result: response.text };
}
}Flow-level budgets
// orchestration.service.ts
import { Injectable } from "@nestjs/common";
import { InjectFlowProducer } from "@glidemq/nestjs";
import type { FlowProducer } from "glide-mq";
@Injectable()
export class OrchestrationService {
constructor(
@InjectFlowProducer("llm-flow") private readonly flow: FlowProducer,
) {}
async runChain(prompt: string) {
await this.flow.add(
{
name: "summarize",
queueName: "llm-tasks",
data: { prompt },
children: [
{ name: "research", queueName: "llm-tasks", data: { prompt } },
{ name: "draft", queueName: "llm-tasks", data: { prompt } },
],
},
{
budget: {
maxTotalTokens: 10000,
tokenWeights: { reasoning: 4, cachedInput: 0.25 },
maxTotalCost: 0.5,
costUnit: "usd",
onExceeded: "fail",
},
},
);
}
}The budget is enforced across all jobs in the flow. When the weighted token total or cost cap is hit, remaining jobs fail (or pause, depending on onExceeded). See the glide-mq docs for the full API.
Queue-wide usage summaries
import { Injectable } from "@nestjs/common";
import { InjectQueue } from "@glidemq/nestjs";
import type { Queue } from "glide-mq";
@Injectable()
export class UsageService {
constructor(@InjectQueue("llm-tasks") private readonly queue: Queue) {}
async summary() {
return this.queue.getUsageSummary({ windowMs: 60_000 });
}
}Use the instance method for queue-local summaries, or Queue.getUsageSummary(...) when you want to aggregate across multiple queues.
Configuration
| Method | Description |
|--------|-------------|
| GlideMQModule.forRoot(opts) | Global module with connection config |
| GlideMQModule.forRootAsync(opts) | Async config via useFactory, useClass, or useExisting |
| GlideMQModule.registerQueue(...opts) | Register queues for injection |
| GlideMQModule.registerFlowProducer(...opts) | Register FlowProducers for DAG workflows |
| GlideMQModule.registerBroadcast(...opts) | Register Broadcast instances for pub/sub |
| GlideMQModule.registerProducer(...opts) | Register lightweight Producers (serverless) |
Decorators: @Processor, @BroadcastProcessor, @InjectQueue, @InjectFlowProducer, @InjectBroadcast, @InjectProducer, @OnWorkerEvent, @QueueEventsListener, @OnQueueEvent
Testing
Pass testing: true to use in-memory TestQueue/TestWorker - no Valkey required:
const moduleRef = await Test.createTestingModule({
imports: [
GlideMQModule.forRoot({ testing: true }),
GlideMQModule.registerQueue({ name: "emails" }),
],
providers: [EmailProcessor, EmailService],
}).compile();
const service = moduleRef.get(EmailService);
await service.send("[email protected]", "Hello");Limitations
- Requires NestJS 10+ and Node.js 20+.
@BroadcastProcessorand@QueueEventsListenerare skipped in testing mode.registerBroadcastandregisterProduceralways require a live connection (no testing mode).
Links
- glide-mq - core library
- Full documentation
- Issues
- @glidemq/hono | @glidemq/fastify | @glidemq/hapi | @glidemq/dashboard
