@glidemq/hono
v0.2.1
Published
Hono middleware for glide-mq - queue management REST API and SSE events
Maintainers
Readme
@glidemq/hono
Hono middleware for glide-mq - mount a full queue management REST API and real-time SSE event stream in one line.
Declare your queues in config, mount the middleware, and get 11 REST endpoints + live SSE - no boilerplate. Works with Hono's typed RPC client out of the box.
Part of the glide-mq ecosystem:
| Package | Purpose | |---------|---------| | glide-mq | Core queue library - producers, workers, schedulers, workflows | | @glidemq/hono | Hono REST API + SSE middleware (you are here) | | @glidemq/dashboard | Express web UI for monitoring and managing queues | | @glidemq/nestjs | NestJS module - decorators, DI, lifecycle management | | examples | Framework integrations and use-case examples |
Install
npm install @glidemq/hono glide-mq honoOptional Zod validation:
npm install zod @hono/zod-validatorQuick Start
import { Hono } from 'hono';
import { glideMQ, glideMQApi } from '@glidemq/hono';
const app = new Hono();
app.use(glideMQ({
connection: { addresses: [{ host: 'localhost', port: 6379 }] },
queues: {
emails: {
processor: async (job) => {
await sendEmail(job.data.to, job.data.subject);
return { sent: true };
},
concurrency: 5,
},
reports: {},
},
}));
app.route('/api/queues', glideMQApi());
export default app;API
glideMQ(config)
Middleware factory. Creates a QueueRegistry and injects it into c.var.glideMQ.
interface GlideMQConfig {
connection?: ConnectionOptions; // Required unless testing: true
queues: Record<string, QueueConfig>;
prefix?: string; // Key prefix (default: 'glide')
testing?: boolean; // Use TestQueue/TestWorker (no Valkey)
}
interface QueueConfig {
processor?: (job: Job) => Promise<any>; // Omit for producer-only
concurrency?: number; // Default: 1
workerOpts?: Record<string, unknown>;
}glideMQApi(opts?)
Pre-built REST API sub-router. Mount it on any path.
interface GlideMQApiConfig {
queues?: string[]; // Restrict to specific queues
}REST Endpoints
| Method | Route | Description |
|--------|-------|-------------|
| POST | /:name/jobs | Add a job |
| GET | /:name/jobs | List jobs (query: type, start, end) |
| GET | /:name/jobs/:id | Get a single job |
| GET | /:name/counts | Get job counts by state |
| POST | /:name/pause | Pause queue |
| POST | /:name/resume | Resume queue |
| POST | /:name/drain | Drain waiting jobs |
| POST | /:name/retry | Retry failed jobs |
| DELETE | /:name/clean | Clean old jobs (query: grace, limit, type) |
| GET | /:name/workers | List active workers |
| GET | /:name/events | SSE event stream |
Adding Jobs
curl -X POST http://localhost:3000/api/queues/emails/jobs \
-H 'Content-Type: application/json' \
-d '{"name": "welcome", "data": {"to": "[email protected]"}, "opts": {"priority": 10}}'Retrying Failed Jobs
# Retry up to 50 failed jobs
curl -X POST http://localhost:3000/api/queues/emails/retry \
-H 'Content-Type: application/json' \
-d '{"count": 50}'
# Retry all failed jobs (omit body or send empty object)
curl -X POST http://localhost:3000/api/queues/emails/retryCleaning Old Jobs
# Remove completed jobs older than 1 hour, up to 200
curl -X DELETE 'http://localhost:3000/api/queues/emails/clean?grace=3600000&limit=200&type=completed'
# Remove all failed jobs (defaults: grace=0, limit=100, type=completed)
curl -X DELETE 'http://localhost:3000/api/queues/emails/clean?type=failed'SSE Events
The events endpoint streams real-time updates. Available event types: completed, failed, progress, active, waiting, stalled, and heartbeat.
const eventSource = new EventSource('/api/queues/emails/events');
eventSource.addEventListener('completed', (e) => {
console.log('Job completed:', JSON.parse(e.data));
});
eventSource.addEventListener('failed', (e) => {
console.log('Job failed:', JSON.parse(e.data));
});
eventSource.addEventListener('progress', (e) => {
console.log('Job progress:', JSON.parse(e.data));
});Type-Safe RPC Client
import { hc } from 'hono/client';
import type { GlideMQApiType } from '@glidemq/hono';
const client = hc<GlideMQApiType>('http://localhost:3000/api/queues');
const res = await client.emails.jobs.$post({
json: { name: 'welcome', data: { to: '[email protected]' } },
});Exported Types
import type {
GlideMQConfig, // Middleware configuration
GlideMQEnv, // Hono env type for c.var.glideMQ
GlideMQApiConfig, // API sub-router options
QueueConfig, // Per-queue config (processor, concurrency)
QueueRegistry, // Registry interface (for custom implementations)
ManagedQueue, // { queue, worker } pair returned by registry.get()
JobResponse, // Serialized job shape returned by API
JobCountsResponse, // { waiting, active, delayed, completed, failed }
WorkerInfoResponse, // Worker metadata
GlideMQApiType, // Hono RPC type for hc<GlideMQApiType>()
} from '@glidemq/hono';Utilities
For advanced use cases (custom routes, custom API sub-routers):
import { serializeJob, serializeJobs, createEventsRoute } from '@glidemq/hono';
// serializeJob(job) - Convert a glide-mq Job to a plain JSON-safe object
// serializeJobs(jobs) - Serialize an array of jobs
// createEventsRoute() - SSE event handler factory for custom routersTesting
No Valkey needed for unit tests:
import { createTestApp } from '@glidemq/hono/testing';
const { app, registry } = createTestApp({
emails: {
processor: async (job) => ({ sent: true }),
},
});
const res = await app.request('/emails/jobs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ name: 'test', data: {} }),
});
expect(res.status).toBe(201);
// Cleanup
await registry.closeAll();Note: SSE in testing mode emits
countsevents (polling-based state diffs) rather than job lifecycle events (completed,failed, etc.).
Direct Registry Access
Access the registry in your own routes:
app.post('/send-email', async (c) => {
const registry = c.var.glideMQ;
const { queue } = registry.get('emails');
const job = await queue.add('send', {
to: '[email protected]',
subject: 'Hello',
});
return c.json({ jobId: job?.id });
});Shutdown
For graceful shutdown, construct the registry yourself and pass it to glideMQ():
import { glideMQ, glideMQApi, QueueRegistryImpl } from '@glidemq/hono';
const registry = new QueueRegistryImpl({
connection: { addresses: [{ host: 'localhost', port: 6379 }] },
queues: { emails: { processor: processEmail } },
});
app.use(glideMQ(registry));
app.route('/api/queues', glideMQApi());
process.on('SIGTERM', async () => {
await registry.closeAll();
process.exit(0);
});License
Apache-2.0
