npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

nuxt-queue

v0.4.4

Published

Nuxt queue service based on Bullmq

Downloads

36

Readme

Nuxt Queue

Event-sourced queue and flow orchestration for Nuxt. Built on BullMQ with integrated real-time monitoring and multi-step workflow support.

✨ Features

  • 🔄 Queue Management: Reliable job processing with BullMQ
  • 🎭 Flow Orchestration: Multi-step workflows with event sourcing
  • Flow Scheduling: Cron-based and delayed flow execution
  • Real-time Updates: Redis Pub/Sub for <100ms latency monitoring
  • 📊 Event Sourcing: Complete audit trail of all flow operations
  • 🎨 Development UI: Visual flow diagrams, timeline, and scheduling
  • 🔌 Worker Context: Rich runtime with state, logging, and events
  • 📦 Auto-discovery: Filesystem-based worker registry
  • 🚀 Horizontal Scaling: Stateless architecture for easy scaling
  • 🔍 Full Observability: Real-time logs, metrics, and event streams

Version: v0.4.0
Status: ✅ Current Implementation
Last Updated: 2025-11-07 ✅ Core queue and flow functionality
✅ Event sourcing with Redis Streams
✅ Real-time monitoring UI with Vue Flow diagrams
✅ Flow scheduling (cron patterns and delays)
✅ Worker context with state, logging, and events
✅ Auto-discovery and flow analysis
🚧 Comprehensive trigger system (planned v0.5)
🚧 Python workers (planned v0.5)
🚧 Postgres adapters (planned v0.6)

🗃️ Event Schema & Storage

All flow operations are event-sourced and stored in Redis Streams (nq:flow:<runId>). Events are immutable, type-safe, and provide a complete audit trail.

Event types:

  • flow.start, flow.completed, flow.failed
  • step.started, step.completed, step.failed, step.retry
  • log, emit, state

See Event Schema for full details and field definitions.

🏆 Best Practices

  • Keep steps small and focused
  • Use state for shared data between steps
  • Use ctx.flow.emit() to trigger downstream steps
  • Log with context using ctx.logger.log()
  • Set concurrency based on resource needs
  • Use on-complete state cleanup for automatic state management
  • Document schedules with metadata for maintainability

⚠️ Limitations (v0.4)

  1. TypeScript only: Python workers not yet implemented (planned for v0.5)
  2. No complex triggers: Only basic scheduling available (v0.5 will add triggers)
  3. No await patterns: Pausing flows for time/events planned for v0.5
  4. Redis only: No Postgres adapter yet (planned for v0.6)
  5. State separate from events: Not unified with stream store (planned for v0.6)
  6. Basic logging: No advanced logger adapters (planned for v0.7)
  7. No schedule editing: Must delete and recreate schedules (v0.5 will add full trigger management)

🚀 Quick Start

Installation

npx nuxi@latest module add nuxt-queue

Configuration

// nuxt.config.ts
export default defineNuxtConfig({
  modules: ['nuxt-queue'],
  queue: {
    ui: true,  // Enable dev UI
    // Shortcut: Configure all backends with one setting
    store: {
      adapter: 'redis',
      redis: {
        host: '127.0.0.1',
        port: 6379,
      },
    },
    // Or configure individually:
    // queue: {
    //   adapter: 'redis',
    //   redis: { host: '127.0.0.1', port: 6379 },
    //   defaultConfig: { concurrency: 2 }
    // },
    // state: {
    //   adapter: 'redis',
    //   redis: { host: '127.0.0.1', port: 6379 }
    // },
    // eventStore: {
    //   adapter: 'memory'  // Use memory for events
    // },
  },
})

Create Your First Worker

// server/queues/example/process.ts
export default defineQueueWorker(async (job, ctx) => {
  // Access job data
  const { message } = job.data
  
  // Log to stream
  ctx.logger.log('info', 'Processing message', { message })
  
  // Store state
  await ctx.state.set('processedAt', new Date().toISOString())
  
  // Return result
  return { success: true, processed: message }
})

export const config = defineQueueConfig({
  concurrency: 5,
})

Enqueue a Job

// API route or wherever
const queueProvider = useQueueProvider()
await queueProvider.enqueue('process', {
  name: 'process',
  data: { message: 'Hello World' }
})

Create a Flow

Multi-step workflows with event-driven orchestration:

// server/queues/my-flow/start.ts
export default defineQueueWorker(async (job, ctx) => {
  ctx.logger.log('info', 'Flow started')
  const prepared = { step: 1, data: job.data }
  
  // Emit event to trigger next steps
  ctx.flow.emit('data.prepared', prepared)
  
  return prepared
})

export const config = defineQueueConfig({
  flow: {
    names: ['my-flow'],
    role: 'entry',
    step: 'start',
    emits: ['data.prepared']
  }
})

// server/queues/my-flow/process.ts
export default defineQueueWorker(async (job, ctx) => {
  const result = await processData(job.data)
  
  // Emit to trigger next step
  ctx.flow.emit('data.processed', result)
  
  return result
})

export const config = defineQueueConfig({
  flow: {
    names: ['my-flow'],
    role: 'step',
    step: 'process',
    subscribes: ['data.prepared'],  // Triggered by start
    emits: ['data.processed']
  }
})

// server/queues/my-flow/validate.ts
export default defineQueueWorker(async (job, ctx) => {
  const validated = await validate(job.data)
  ctx.flow.emit('validation.complete', validated)
  return validated
})

export const config = defineQueueConfig({
  flow: {
    names: ['my-flow'],
    role: 'step',
    step: 'validate',
    subscribes: ['data.prepared'],  // Also triggered by start (parallel with process)
    emits: ['validation.complete']
  }
})

Start the flow:

const { startFlow } = useFlowEngine()
await startFlow('my-flow', { input: 'data' })

Flow execution: Entry step emits data.prepared → Both process and validate steps run in parallel (they both subscribe to data.prepared) → Each emits its own completion event for downstream steps.

Schedule a Flow

Schedule flows to run automatically with cron patterns or delays:

// Schedule a flow to run daily at 2 AM
await $fetch('/api/_flows/my-flow/schedule', {
  method: 'POST',
  body: {
    cron: '0 2 * * *',
    input: { retentionDays: 30 },
    metadata: {
      description: 'Daily cleanup job'
    }
  }
})

// Schedule a one-time delayed execution (5 minutes)
await $fetch('/api/_flows/reminder-flow/schedule', {
  method: 'POST',
  body: {
    delay: 300000,  // milliseconds
    input: { userId: '123', message: 'Check your email' }
  }
})

// List all schedules for a flow
const schedules = await $fetch('/api/_flows/my-flow/schedules')

// Delete a schedule
await $fetch('/api/_flows/my-flow/schedules/schedule-id', {
  method: 'DELETE'
})

Common cron patterns:

  • * * * * * - Every minute
  • */5 * * * * - Every 5 minutes
  • 0 * * * * - Every hour
  • 0 2 * * * - Daily at 2 AM
  • 0 9 * * 1 - Every Monday at 9 AM
  • 0 0 1 * * - First day of month at midnight

🎨 Development UI

Access the built-in UI as <QueueApp /> component:

  • 📊 Dashboard: Overview of queues and flows
  • 🔄 Flow Diagrams: Visual representation with Vue Flow
  • Flow Scheduling: Create and manage cron-based or delayed schedules
  • 📝 Event Timeline: Real-time event stream with step details
  • 📋 Logs: Filtered logging by flow/step
  • 📈 Metrics: Queue statistics and performance
  • 🔍 Flow Runs: Complete history with status tracking

🏗️ Architecture

Event Sourcing

Every flow operation is stored as an event in Redis Streams:

nq:flow:<runId>
├─ flow.start
├─ step.started
├─ log
├─ step.completed
├─ step.started
├─ log
├─ step.completed
└─ flow.completed

Real-time Distribution

Events are broadcast via Redis Pub/Sub for instant UI updates (<100ms latency).

Worker Context

Every worker receives a rich context:

{
  jobId: string              // BullMQ job ID
  queue: string              // Queue name
  flowId: string             // Flow run UUID
  flowName: string           // Flow definition name
  stepName: string           // Current step name
  logger: {
    log(level, msg, meta)    // Structured logging
  },
  state: {
    get(key)                 // Get flow-scoped state
    set(key, value, opts)    // Set with optional TTL
    delete(key)              // Delete state
  },
  flow: {
    emit(eventName, data)    // Emit flow event to trigger subscribed steps
    startFlow(name, input)   // Start nested flow
  }
}

📚 Documentation

v0.4 Documentation

API & Advanced

Roadmap & Future

🔮 Roadmap

v0.4 (Current - November 2025)

✅ Core queue and flow orchestration
✅ Event sourcing with Redis Streams
✅ Real-time monitoring UI
✅ Flow scheduling (cron and delays)
✅ Worker context with state and logging

v0.5

  • 🎯 Comprehensive trigger system (schedule, webhook, event, manual)
  • ⏱️ Await patterns (time, event, condition)
  • 🐍 Python worker support with RPC bridge
  • 🔗 Webhook triggers with auto-setup

v0.6

  • 🐘 PgBoss queue provider option
  • 🗄️ Postgres stream store adapter
  • 🔄 Unified state and event storage
  • 📊 Advanced state management

v0.7

  • 📊 Enhanced logger with multiple adapters
  • 🌐 HTTP mode for workers (REST/gRPC)
  • 🔌 External service hooks
  • 🎨 Pluggable worker execution modes

See specs/roadmap.md for complete details.

🤝 Contributing

Contributions welcome! Please read our architecture docs first:

  1. Review specs/v0.4/current-implementation.md
  2. Check specs/roadmap.md for planned features
  3. Open an issue to discuss changes
  4. Submit a PR with tests

Development Setup

# Install dependencies
yarn install

# Start playground with dev UI
cd playground
yarn dev

# Run tests
yarn test

📄 License

MIT License - Copyright (c) DevJoghurt