@monque/tsed
v1.0.0
Ts.ED integration for Monque - A robust, type-safe MongoDB job queue for TypeScript
A Ts.ED integration for Monque, a robust, type-safe MongoDB job queue for TypeScript.
Features
- 🎯 Decorator-based API: @JobController, @Job, and @Cron for declarative job handling.
- 💉 Dependency Injection: Full support for Ts.ED DI (inject Services/Providers into your jobs).
- 🔒 Job Isolation: Each job execution runs in a dedicated DIContext with Request Scope support.
- 🔍 Type Safety: Leverage TypeScript generics for fully typed job payloads.
- ⚡ Full Monque Power: Complete access to all @monque/core features (backoff, heartbeats, atomic locking).
- 🛠️ Seamless Integration: Native lifecycle hooks support ($onInit, $onDestroy) for graceful scheduler management.
Installation
bun add @monque/tsed @monque/core @tsed/mongoose mongoose
Or using npm/yarn/pnpm:
npm install @monque/tsed @monque/core @tsed/mongoose mongoose
yarn add @monque/tsed @monque/core @tsed/mongoose mongoose
pnpm add @monque/tsed @monque/core @tsed/mongoose mongoose

Configuration
Import MonqueModule in your Server.ts and configure the connection:
import { Configuration } from "@tsed/di";
import { MonqueModule } from "@monque/tsed";
import "@tsed/mongoose";
import "@tsed/platform-express"; // or @tsed/platform-koa

@Configuration({
  imports: [
    MonqueModule
  ],
  mongoose: [
    {
      id: "default",
      url: "mongodb://localhost:27017/my-app",
      connectionOptions: {}
    }
  ],
  monque: {
    enabled: true,
    // Option 1: Reuse existing Mongoose connection (Recommended)
    mongooseConnectionId: "default",
    // Option 2: Provide existing Db instance via factory
    // dbFactory: async () => {
    //   const client = new MongoClient(process.env.MONGO_URL);
    //   await client.connect();
    //   return client.db("my-app");
    // },
  }
})
export class Server {}

Usage
1. Define a Job Controller
Create a class decorated with @JobController. Methods decorated with @Job will process jobs.
import { JobController, Job } from "@monque/tsed";
import { Job as MonqueJob } from "@monque/core";
import { EmailService } from "./services/EmailService";

interface EmailPayload {
  to: string;
  subject: string;
}

@JobController("email") // Namespace prefix: "email."
export class EmailJobs {
  constructor(private emailService: EmailService) {}

  @Job("send", { concurrency: 5 }) // Job name: "email.send"
  async sendEmail(job: MonqueJob<EmailPayload>) {
    await this.emailService.send(
      job.data.to,
      job.data.subject
    );
  }
}

2. Schedule Jobs (Cron)
Use the @Cron decorator to schedule recurring tasks.
import { JobController, Cron } from "@monque/tsed";

@JobController()
export class ReportJobs {
  @Cron("0 0 * * *", { name: "daily-report" })
  async generateDailyReport() {
    console.log("Generating report...");
  }
}

3. Enqueue Jobs
Inject MonqueService to dispatch jobs from anywhere in your app.
import { Service, Inject } from "@tsed/di";
import { MonqueService } from "@monque/tsed";

// Minimal user shape assumed for this example.
interface User {
  email: string;
}

@Service()
export class AuthService {
  @Inject()
  private monque: MonqueService;

  async registerUser(user: User) {
    // ... save user ...
    // Dispatch background job
    await this.monque.enqueue("email.send", {
      to: user.email,
      subject: "Welcome!"
    });
  }
}

API Reference
Decorators
@JobController(namespace?: string)
Class decorator to register a job controller.
- namespace: Optional prefix for all job names in this class.
@Job(name: string, options?: WorkerOptions)
Method decorator to register a job handler.
- name: Job name (combined with namespace).
- options: Supports all @monque/core WorkerOptions (concurrency, lockTimeout, etc.).
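The way a namespace and a job name combine can be sketched as a small pure function. This only illustrates the naming rule described above and is not the library's implementation: qualifiedJobName is a hypothetical helper, and using the bare name when no namespace is given is an assumption based on the @JobController() example without arguments.

```typescript
// Hypothetical helper illustrating the naming rule; not part of @monque/tsed.
function qualifiedJobName(namespace: string | undefined, name: string): string {
  // Assumption: without a namespace the job name is used as-is.
  return namespace ? `${namespace}.${name}` : name;
}

console.log(qualifiedJobName("email", "send")); // prints "email.send"
console.log(qualifiedJobName(undefined, "daily-report")); // prints "daily-report"
```

The qualified name is the one to pass when enqueuing, which is why the enqueue example uses "email.send" rather than "send".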
@Cron(pattern: string, options?: ScheduleOptions)
Method decorator to register a scheduled job.
- pattern: Cron expression (e.g., * * * * *).
- options: Supports all @monque/core ScheduleOptions (tz, job name override, etc.).
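As a reading aid for the pattern argument, the sketch below splits a standard five-field cron expression into named fields. describeCron is a hypothetical helper for illustration only. The five-field layout is an assumption taken from the examples in this README; whether other cron dialects (for example a leading seconds field) are accepted is not stated here.

```typescript
// Named fields of a standard five-field cron expression.
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

// Hypothetical helper; not part of @monque/tsed or @monque/core.
function describeCron(pattern: string): CronFields {
  const parts = pattern.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  // The length check above guarantees all five entries exist.
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// "0 0 * * *": minute 0, hour 0, every day of every month.
console.log(describeCron("0 0 * * *"));
```

Read this way, the "0 0 * * *" pattern from the daily-report example fires once a day at 00:00, evaluated in whatever time zone the schedule options select.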
Services
MonqueService
Injectable wrapper for the main Monque instance. It exposes the full Monque public API through dependency injection.
Job Scheduling:
- enqueue(name, data, opts) - Enqueue a job
- schedule(cron, name, data, opts) - Schedule a recurring job
- now(name, data) - Enqueue for immediate processing
Job Management:
- getJob(id) / getJobs(filter) - Query jobs
- cancelJob(id) / cancelJobs(filter) - Cancel jobs
- retryJob(id) / retryJobs(filter) - Retry failed jobs
- deleteJob(id) / deleteJobs(filter) - Delete jobs
- rescheduleJob(id, date) - Change execution time
Observability:
- getQueueStats(filter?) - Get queue statistics
- isHealthy() - Check scheduler health
- getJobsWithCursor(opts) - Paginated job list
Tip: All methods on MonqueService delegate to the underlying Monque instance. For a complete list of methods and options, see the @monque/core documentation.
Testing
Use @tsed/platform-http/testing and PlatformTest to test your workers. You can mock MonqueService or use a real Mongo connection with Testcontainers.
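For the mocking route, a hand-written fake is often enough when the code under test only enqueues jobs. The sketch below is a simplified, framework-free illustration: JobEnqueuer and FakeMonqueService are hypothetical names, only enqueue(name, data) is modelled (matching how it is called in the usage example), and AuthService takes its dependency through the constructor instead of @Inject so the snippet runs without a DI container.

```typescript
// Assumed minimal shape of the dependency; only enqueue(name, data) is modelled.
interface JobEnqueuer {
  enqueue(name: string, data: unknown): Promise<unknown>;
}

// Hand-written fake that records calls instead of touching MongoDB.
class FakeMonqueService implements JobEnqueuer {
  readonly calls: Array<{ name: string; data: unknown }> = [];

  async enqueue(name: string, data: unknown): Promise<unknown> {
    this.calls.push({ name, data });
    return undefined;
  }
}

// Simplified stand-in for the AuthService shown earlier.
class AuthService {
  private readonly monque: JobEnqueuer;

  constructor(monque: JobEnqueuer) {
    this.monque = monque;
  }

  async registerUser(user: { email: string }): Promise<void> {
    await this.monque.enqueue("email.send", {
      to: user.email,
      subject: "Welcome!"
    });
  }
}

// Exercise the service and hand back the fake for inspection.
async function demo(): Promise<FakeMonqueService> {
  const fake = new FakeMonqueService();
  await new AuthService(fake).registerUser({ email: "ada@example.com" });
  return fake;
}
```

After demo() resolves, fake.calls holds one entry named "email.send", which a test can assert on. The container-based variant with PlatformTest follows.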
import { PlatformTest } from "@tsed/platform-http/testing";
import { MonqueService } from "@monque/tsed";

describe("EmailJobs", () => {
  beforeEach(PlatformTest.create);
  afterEach(PlatformTest.reset);

  it("should process email", async () => {
    const service = PlatformTest.get<MonqueService>(MonqueService);
    // ... test logic ...
  });
});

License
ISC
