@coderule/qulite
v0.2.0
Published
SQLite queue for outbox/synchronization: UPSERT, dedup, leases, transactions.
Maintainers
Readme
@coderule/qulite
A minimalistic SQLite-based queue library for Node.js with UPSERT deduplication, atomic job claiming, and file system event coalescence.
Features
- UPSERT/Deduplication: Prevent duplicate jobs with unique dedupe keys
- Atomic Job Claiming: Lease-based claiming prevents multiple workers from processing the same job
- File System Event Coalescence: Intelligently merge related file system events
- Automatic Retries: Configurable retry logic with exponential backoff
- Transaction Safety: All operations are atomic and transaction-safe
- No External Dependencies: Only requires better-sqlite3
- TypeScript First: Full TypeScript support with comprehensive types
Installation
npm install @coderule/quliteQuick Start
import Database from 'better-sqlite3';
import { Qulite, Worker } from '@coderule/qulite';
// Create database and queue
const db = new Database('queue.db');
const queue = new Qulite(db);
// Enqueue a job with deduplication
const { id, upserted } = queue.upsertGeneric({
type: 'send-email',
dedupe_key: 'welcome-user-123', // Prevents duplicate emails
data: { to: '[email protected]', template: 'welcome' }
});
// Create a worker to process jobs
const worker = new Worker({
queue,
type: 'send-email', // Only process send-email jobs
processor: async (job, ctx) => {
try {
// Process the job
await sendEmail(JSON.parse(job.data));
return { kind: 'ack' }; // Mark as complete
} catch (error) {
if (job.attempts < 3) {
return { kind: 'retry', delayMs: 5000 }; // Retry after 5s
}
return { kind: 'fail', error: error.message }; // Give up
}
}
});
// Start processing
await worker.start();File System Events
Qulite includes specialized support for file system event processing with intelligent coalescence:
import { enqueueFsEvent } from '@coderule/qulite';
// Enqueue file system events
enqueueFsEvent(queue, {
root_id: 'project-123',
rel_path: 'src/index.ts',
kind: 'modify',
size: 1024,
mtime_ns: Date.now() * 1000000,
sha256: 'abc123...'
});
// Multiple modify events for the same file are automatically coalesced
enqueueFsEvent(queue, {
root_id: 'project-123',
rel_path: 'src/index.ts',
kind: 'modify',
size: 1048,
mtime_ns: Date.now() * 1000000,
sha256: 'def456...'
});
// Only the latest event is keptWorker Configuration
const worker = new Worker({
queue,
type: 'my-job-type', // Optional: only process specific job types
pollIntervalMs: 500, // How often to check for new jobs
logger: console, // Custom logger
processor: async (job, ctx) => {
// Access job data
console.log('Processing job:', job.id);
console.log('Attempt:', job.attempts);
// Use context methods
ctx.ack(); // Mark as complete
ctx.retry(1000); // Retry after 1 second
ctx.fail('error'); // Mark as failed
// Or return a result
return { kind: 'ack' };
}
});Queue Options
const queue = new Qulite(db, {
defaultLeaseMs: 30000, // Default lease duration (30s)
defaultMaxAttempts: 25, // Maximum retry attempts
busyTimeoutMs: 5000, // SQLite busy timeout
logger: customLogger // Custom logger implementation
});Job Lifecycle
- Pending: Job is waiting to be processed
- Processing: Job has been claimed by a worker (with lease)
- Done: Job completed successfully
- Failed: Job failed after maximum attempts
Advanced Usage
Manual Job Management
// Claim next available job
const job = queue.claimNext({
type: 'my-type', // Optional: filter by type
leaseOwner: 'worker-1', // Unique worker ID
leaseMs: 60000 // Lease duration
});
if (job) {
try {
// Process job...
queue.ack(job.id, 'worker-1');
} catch (error) {
queue.retry(job.id, 'worker-1', 5000); // Retry after 5s
}
}
// Requeue timed-out jobs
const requeuedCount = queue.requeueTimedOut();Queue Statistics
// Get counts by status
const counts = queue.getCounts();
console.log('Pending:', counts.pending);
console.log('Processing:', counts.processing);
console.log('Done:', counts.done);
console.log('Failed:', counts.failed);
// Clean up old jobs
queue.cleanupDone(7 * 24 * 60 * 60 * 1000); // Remove jobs older than 7 days
queue.cleanupFailed(30 * 24 * 60 * 60 * 1000); // Remove failed jobs older than 30 daysConcurrent Workers
// Run multiple workers for high throughput
const workers = [];
for (let i = 0; i < 4; i++) {
const worker = new Worker({
queue,
type: 'process-file',
processor: async (job) => {
// Process job...
return { kind: 'ack' };
}
});
workers.push(worker);
worker.start();
}
// Graceful shutdown
process.on('SIGTERM', async () => {
await Promise.all(workers.map(w => w.stop()));
db.close();
});Performance
Qulite is designed for high throughput with SQLite:
- Uses WAL mode for concurrent reads/writes
- Prepared statements for all queries
- Intelligent indexing for fast job polling
- Efficient deduplication with unique constraints
Benchmark results (on a typical development machine):
- Single worker: ~1,000 jobs/second
- 4 workers: ~3,500 jobs/second
- 8 workers: ~6,000 jobs/second
Run benchmarks yourself:
npm run benchAPI Reference
Qulite
constructor(db: Database, options?: QuliteOptions)upsertGeneric(params): { id: number, upserted: boolean }upsertFsEvent(params): { id: number, coalesced: boolean }claimNext(options?: ClaimOptions): PersistedJob | nullack(id: number, leaseOwner: string): booleanfail(id: number, leaseOwner: string, error?: string): booleanretry(id: number, leaseOwner: string, delayMs: number): booleanrequeueTimedOut(): numbergetCounts(): { pending, processing, done, failed }cleanupDone(olderThanMs: number): numbercleanupFailed(olderThanMs: number): number
Worker
constructor(options: WorkerOptions)start(): Promise<void>stop(): Promise<void>
Types
interface PersistedJob {
id: number;
type: string;
status: JobStatus;
priority: number;
run_after: number;
created_at: number;
updated_at: number;
attempts: number;
max_attempts: number;
lease_owner?: string;
lease_expires_at?: number;
last_error?: string;
done_at?: number;
failed_at?: number;
dedupe_key?: string;
data?: string;
// File system specific fields
root_id?: string;
rel_path?: string;
kind?: FsEventKind;
to_path?: string;
size?: number;
mtime_ns?: number;
sha256?: string;
}
type ProcessResult =
| { kind: 'ack' }
| { kind: 'retry'; delayMs?: number }
| { kind: 'fail'; error?: string };License
MIT
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues and feature requests, please use the GitHub issue tracker.
