@glidemq/fastify
v0.1.0
Published
Fastify plugin for glide-mq - queue management REST API and SSE events
Maintainers
Readme
@glidemq/fastify
Fastify plugin for glide-mq — mount a full queue management REST API and real-time SSE event stream with two plugin registrations.
Declare your queues in config, register the plugins, and get 11 REST endpoints + live SSE — no boilerplate. Uses Fastify's decorator and lifecycle patterns.
Part of the glide-mq ecosystem:
| Package | Purpose | |---------|---------| | glide-mq | Core queue library — producers, workers, schedulers, workflows | | @glidemq/hono | Hono REST API + SSE middleware | | @glidemq/fastify | Fastify REST API + SSE plugin (you are here) | | @glidemq/dashboard | Express web UI for monitoring and managing queues | | @glidemq/nestjs | NestJS module — decorators, DI, lifecycle management | | examples | Framework integrations and use-case examples |
Install
npm install @glidemq/fastify glide-mq fastifyOptional Zod validation:
npm install zodQuick Start
import Fastify from 'fastify';
import { glideMQPlugin, glideMQRoutes } from '@glidemq/fastify';
const app = Fastify();
await app.register(glideMQPlugin, {
connection: { addresses: [{ host: 'localhost', port: 6379 }] },
queues: {
emails: {
processor: async (job) => {
await sendEmail(job.data.to, job.data.subject);
return { sent: true };
},
concurrency: 5,
},
reports: {},
},
});
await app.register(glideMQRoutes, { prefix: '/api/queues' });
await app.listen({ port: 3000 });API
glideMQPlugin
Core plugin. Creates a QueueRegistry and decorates the Fastify instance with fastify.glidemq. Automatically closes all queues and workers on app shutdown via the onClose hook.
interface GlideMQPluginOptions {
connection?: ConnectionOptions; // Required unless testing: true
queues: Record<string, QueueConfig>;
prefix?: string; // Key prefix (default: 'glide')
testing?: boolean; // Use TestQueue/TestWorker (no Valkey)
}
interface QueueConfig {
processor?: (job: Job) => Promise<any>; // Omit for producer-only
concurrency?: number; // Default: 1
workerOpts?: Record<string, unknown>;
}You can also pass a pre-built QueueRegistry instance directly:
const registry = new QueueRegistryImpl({ ... });
await app.register(glideMQPlugin, registry as any);glideMQRoutes
Pre-built REST API routes plugin. Requires glideMQPlugin to be registered first.
interface GlideMQRoutesOptions {
queues?: string[]; // Restrict to specific queues
}REST Endpoints
| Method | Route | Description |
|--------|-------|-------------|
| POST | /:name/jobs | Add a job |
| GET | /:name/jobs | List jobs (query: type, start, end) |
| GET | /:name/jobs/:id | Get a single job |
| GET | /:name/counts | Get job counts by state |
| POST | /:name/pause | Pause queue |
| POST | /:name/resume | Resume queue |
| POST | /:name/drain | Drain waiting jobs |
| POST | /:name/retry | Retry failed jobs |
| DELETE | /:name/clean | Clean old jobs (query: grace, limit, type) |
| GET | /:name/workers | List active workers |
| GET | /:name/events | SSE event stream |
Adding Jobs
curl -X POST http://localhost:3000/api/queues/emails/jobs \
-H 'Content-Type: application/json' \
-d '{"name": "welcome", "data": {"to": "[email protected]"}, "opts": {"priority": 10}}'Retrying Failed Jobs
# Retry up to 50 failed jobs
curl -X POST http://localhost:3000/api/queues/emails/retry \
-H 'Content-Type: application/json' \
-d '{"count": 50}'
# Retry all failed jobs (omit body or send empty object)
curl -X POST http://localhost:3000/api/queues/emails/retryCleaning Old Jobs
# Remove completed jobs older than 1 hour, up to 200
curl -X DELETE 'http://localhost:3000/api/queues/emails/clean?grace=3600000&limit=200&type=completed'
# Remove all failed jobs (defaults: grace=0, limit=100, type=completed)
curl -X DELETE 'http://localhost:3000/api/queues/emails/clean?type=failed'SSE Events
The events endpoint streams real-time updates. Available event types: completed, failed, progress, active, waiting, stalled, and heartbeat.
const eventSource = new EventSource('/api/queues/emails/events');
eventSource.addEventListener('completed', (e) => {
console.log('Job completed:', JSON.parse(e.data));
});
eventSource.addEventListener('failed', (e) => {
console.log('Job failed:', JSON.parse(e.data));
});
eventSource.addEventListener('progress', (e) => {
console.log('Job progress:', JSON.parse(e.data));
});Exported Types
import type {
GlideMQPluginOptions, // Core plugin options
GlideMQRoutesOptions, // Routes plugin options
GlideMQConfig, // Full configuration
QueueConfig, // Per-queue config (processor, concurrency)
QueueRegistry, // Registry interface (for custom implementations)
ManagedQueue, // { queue, worker } pair returned by registry.get()
JobResponse, // Serialized job shape returned by API
JobCountsResponse, // { waiting, active, delayed, completed, failed }
WorkerInfoResponse, // Worker metadata
} from '@glidemq/fastify';Utilities
For advanced use cases (custom routes, custom API sub-routers):
import { serializeJob, serializeJobs, createEventsRoute } from '@glidemq/fastify';
// serializeJob(job) - Convert a glide-mq Job to a plain JSON-safe object
// serializeJobs(jobs) - Serialize an array of jobs
// createEventsRoute() - SSE event handler factory for custom routesTesting
No Valkey needed for unit tests:
import { createTestApp } from '@glidemq/fastify/testing';
const { app, registry } = await createTestApp({
emails: {
processor: async (job) => ({ sent: true }),
},
});
const res = await app.inject({
method: 'POST',
url: '/emails/jobs',
payload: { name: 'test', data: {} },
});
expect(res.statusCode).toBe(201);
// Cleanup
await app.close();Note: SSE in testing mode emits
countsevents (polling-based state diffs) rather than job lifecycle events (completed,failed, etc.).
Direct Registry Access
Access the registry in your own routes:
app.post('/send-email', async (request, reply) => {
const registry = app.glidemq;
const { queue } = registry.get('emails');
const job = await queue.add('send', {
to: '[email protected]',
subject: 'Hello',
});
return reply.send({ jobId: job?.id });
});Shutdown
Graceful shutdown is automatic — the onClose hook calls registry.closeAll(). For manual control:
import { glideMQPlugin, glideMQRoutes, QueueRegistryImpl } from '@glidemq/fastify';
const registry = new QueueRegistryImpl({
connection: { addresses: [{ host: 'localhost', port: 6379 }] },
queues: { emails: { processor: processEmail } },
});
await app.register(glideMQPlugin, registry as any);
await app.register(glideMQRoutes, { prefix: '/api/queues' });
// Or handle shutdown yourself:
process.on('SIGTERM', async () => {
await app.close(); // triggers onClose hook → registry.closeAll()
process.exit(0);
});License
Apache-2.0
