npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@queue-ti/client

v2026.5.4

Published

Node.js client for the queue-ti message queue service

Downloads

920

Readme

@queue-ti/client

A Node.js (TypeScript) client library for producing and consuming messages from a queue-ti server.

Installation

npm install @queue-ti/client

Import as:

import { connect } from '@queue-ti/client'

Quick Start

Producer

import { connect } from '@queue-ti/client'

const client = await connect('localhost:50051', { insecure: true })
const producer = client.producer()

const id = await producer.publish('orders', Buffer.from(JSON.stringify({ amount: 99.99 })), {
  metadata: { source: 'checkout' },
})
console.log('enqueued:', id)

client.close()

Consumer

import { connect } from '@queue-ti/client'

const client = await connect('localhost:50051', { insecure: true })
const consumer = client.consumer('orders', { concurrency: 4 })

// Consume until process receives SIGINT
const signal = AbortSignal.timeout(60_000) // Stop after 60s for this example
await consumer.consume(async (msg) => {
  console.log(`[${msg.id}] ${msg.payload.toString()}`)
  // return normally to Ack; throw an error to Nack
})

client.close()

Connecting

connect(address: string, options?: ConnectOptions): Promise<Client>

Opens a gRPC connection to the server at address.

// No auth (local/dev)
const client = await connect('localhost:50051', { insecure: true })

// With JWT authentication and automatic token refresh
const client = await connect('localhost:50051', {
  insecure: true,
  token: initialToken,
  tokenRefresher: async () => {
    const resp = await fetch('http://localhost:8080/api/auth/refresh', {
      method: 'POST',
      headers: { Authorization: `Bearer ${currentToken}` },
    })
    const { token } = await resp.json()
    return token
  },
})

Always call client.close() when done (typically via finally or in a cleanup handler).

ConnectOptions

| Option | Type | Description | |---|---|---| | insecure? | boolean | Disable TLS — suitable for local and Docker deployments (default: false) | | token? | string | Attach a JWT Bearer token to every RPC call | | tokenRefresher? | () => Promise<string> | Callback the library calls automatically to obtain a fresh token before expiry |


Client

client.producer(): Producer

Returns a producer for enqueuing messages.

const producer = client.producer()

client.consumer(topic: string, options?: ConsumerOptions): Consumer

Returns a consumer for the given topic.

const consumer = client.consumer('orders', {
  concurrency: 8,
  visibilityTimeoutSeconds: 60,
})

client.setToken(token: string): void

Updates the authentication token on a live connection. The new token takes effect on the next RPC call.

client.setToken(newToken)

Throws an error if the client was not created with a token option.

client.close(): void

Closes the gRPC connection and stops background token refresh (if enabled).

client.close()

Producing Messages

producer.publish(topic: string, payload: Buffer | Uint8Array, options?: PublishOptions): Promise<string>

Enqueues payload on topic. Returns the assigned message ID.

const id = await producer.publish('payments', Buffer.from('...'))

// With metadata
const id = await producer.publish('payments', payload, {
  metadata: {
    tenant: 'acme',
    trace: traceID,
  },
})

// With a deduplication key (upserts pending messages with same key)
const id = await producer.publish('orders', payload, {
  key: 'order-123',
  metadata: { customer: 'acme' },
})

PublishOptions

| Option | Type | Description | |---|---|---| | metadata? | Record<string, string> | Arbitrary key-value metadata attached to the message | | key? | string | Deduplication key for upsert semantics |

Idempotent publishing with key

When you publish a message with a key, queue-ti uses upsert semantics: if a pending message with the same topic and key already exists in the queue, its payload and metadata are updated in place (the existing message ID is returned). This ensures idempotency when retrying publish operations.

Caveat: Once a message begins processing (transitions to processing status), it is no longer considered "pending". A key match only applies to messages awaiting processing. If the keyed message is already processing, a new row is inserted.

// Idempotent publish: if a message with topic="orders" and key="order-123" 
// exists and is pending, it is updated. Otherwise a new message is created.
const id = await producer.publish('orders', Buffer.from(`{"amount": 150.00}`), {
  key: 'order-123',
  metadata: { customer: 'acme' },
})
console.log('message id:', id) // same on retry if order-123 is still pending

Consuming Messages

consumer.consume(handler: MessageHandler): Promise<void>

Starts consuming messages from the topic. Blocks until the signal (passed in ConsumerOptions) is aborted, then returns.

const signal = AbortSignal.timeout(60_000)
await consumer.consume(async (msg) => {
  // Process the message.
  await processPayload(msg.payload)
  // Return normally to Ack; throw an error to Nack
})

Ack/Nack behaviour:

| Handler return | Effect | |---|---| | Returns normally | Ack — message is permanently removed from the queue | | Throws an error | Nack — message re-appears after the visibility timeout; the error message is stored as the failure reason |

Reconnection: if the stream is interrupted (network error, server restart), consume reconnects automatically with exponential backoff starting at 500 ms, doubling up to a maximum of 30 s.

Concurrency: messages are dispatched to the handler up to concurrency times in parallel. The promise resolves after the handler completes and the Ack/Nack is recorded.

import { signal } from 'os'

const abortController = new AbortController()

// Stop gracefully on SIGINT/SIGTERM
process.on('SIGINT', () => abortController.abort())
process.on('SIGTERM', () => abortController.abort())

const consumer = client.consumer('orders', {
  concurrency: 4,
  signal: abortController.signal,
})

await consumer.consume(async (msg) => {
  console.log(`processing ${msg.id}`)
  await processPayload(msg.payload)
})

consumer.consumeBatch(options: BatchOptions, handler: BatchHandler): Promise<void>

Polls the queue in batches and dispatches each batch to the handler. Returns when the signal (passed in ConsumerOptions) is aborted.

const signal = AbortSignal.timeout(60_000)
await consumer.consumeBatch(
  { batchSize: 50, visibilityTimeoutSeconds: 60 },
  async (messages) => {
    // Process all messages in the batch.
    for (const msg of messages) {
      try {
        await processPayload(msg.payload)
        await msg.ack()
      } catch (err) {
        await msg.nack(err instanceof Error ? err.message : 'unknown error')
      }
    }
  },
)

Batch semantics:

  • batchSize: number of messages to request per poll (valid range 1–1000).
  • Best-effort: returns 0–N messages per call, never blocks or waits for a full batch. When the queue is empty or throughput-limited, the consumer applies the same exponential backoff as consume (500 ms → 30 s). When messages are returned, backoff resets.
  • Per-message ack/nack: each message in the array has individual ack() and nack(reason) methods. Call them directly to acknowledge or reject each message.
  • Reconnection & backoff: network errors are retried with exponential backoff (500 ms → 30 s), same as consume.

Use consumeBatch when you want to process multiple messages together (e.g. batch writes to a data warehouse) or when you need more control over per-message error handling.

ConsumerOptions

| Option | Type | Description | |---|---|---| | concurrency? | number | Number of messages processed concurrently (default: 1) | | visibilityTimeoutSeconds? | number | How long a message stays invisible while being processed (default: server setting, typically 30 s) | | consumerGroup? | string | Consumer group name for independent message consumption; see Consumer Groups | | signal? | AbortSignal | Signal to abort the consumer loop |


Consumer Groups

Consumer groups enable independent consumption of the same messages by multiple systems. Each group tracks its own delivery state, allowing parallel processing of the same message by different applications without interference.

When a consumer group is specified, the client sends all RPCs scoped to that group and receives all messages enqueued to the topic. Each message is delivered independently to each group. A message is only deleted from the queue when all registered groups have acknowledged it.

Registering a Consumer Group

Consumer groups must be registered on the server before use:

curl -X POST http://localhost:8080/api/topics/orders/consumer-groups \
  -H "Content-Type: application/json" \
  -d '{"consumer_group": "warehouse"}'

Once registered, the group automatically receives all pending messages enqueued before registration (backfill), plus all future messages.

Single-Consumer (Consume)

const consumer = client.consumer('orders', {
  consumerGroup: 'warehouse',
  concurrency: 4,
})

await consumer.consume(async (msg) => {
  console.log(`[warehouse] processing ${msg.id}`)
  // return normally to Ack; throw to Nack
})

Batch Consumer (ConsumeBatch)

await consumer.consumeBatch(
  { batchSize: 50, consumerGroup: 'warehouse' },
  async (messages) => {
    for (const msg of messages) {
      try {
        await processPayload(msg.payload)
        await msg.ack()
      } catch (err) {
        await msg.nack(err instanceof Error ? err.message : 'unknown')
      }
    }
  },
)

The Message Type

interface Message {
  id: string
  topic: string
  payload: Buffer
  metadata: Record<string, string>
  createdAt: Date
  retryCount: number
  ack(): Promise<void>
  nack(reason: string): Promise<void>
}
  • id: The assigned message ID.
  • topic: The topic the message was enqueued on.
  • payload: The message body (as a Buffer).
  • metadata: Arbitrary key-value metadata attached at publish time.
  • createdAt: The timestamp when the message was first enqueued.
  • retryCount: The number of times this message has been nacked (0 on first receive).

msg.ack(): Promise<void>

Permanently removes the message from the queue. When using consume with a handler, ack is called automatically on successful completion — call this directly only when using consumeBatch or managing ack/nack manually.

msg.nack(reason: string): Promise<void>

Returns the message to the queue. reason is stored as the failure string and is visible in the Admin UI.


Authentication

When the server has auth.enabled = true, every RPC call requires a valid JWT. Tokens are issued by the server's HTTP API and expire after 15 minutes.

Using QueueTiAuth (recommended)

The QueueTiAuth helper automatically checks if authentication is required and handles login and token refresh:

import { connect, AdminClient, QueueTiAuth } from '@queue-ti/client'

const auth = await QueueTiAuth.login('http://localhost:8080', 'admin', 'secret')

const client = await connect('localhost:50051', {
  insecure: true,
  token: auth.token ?? undefined,
  tokenRefresher: auth.refresh,
})

const admin = new AdminClient('http://localhost:8080', {
  token: auth.token ?? undefined,
})

The QueueTiAuth helper:

  1. Calls GET /api/auth/status to check if authentication is required
  2. If auth is disabled, returns a no-op instance with a null token
  3. If auth is enabled, calls POST /api/auth/login with the provided credentials
  4. Exposes .token (string or null) for the current JWT and .refresh (arrow function) which satisfies the ConnectOptions.tokenRefresher interface for automatic token refresh

Option 1 — Obtaining a token manually

TOKEN=$(curl -s -X POST http://localhost:8080/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"secret"}' \
  | jq -r '.token')

Option 2 — Automatic refresh with custom fetcher

Pass an initial token and a tokenRefresher callback. The library decodes the JWT exp claim, sleeps until 60 seconds before expiry, and calls your callback to obtain a fresh token. The new token is applied to the next RPC call — no reconnection needed.

async function fetchToken(): Promise<string> {
  const resp = await fetch('http://localhost:8080/api/auth/refresh', {
    method: 'POST',
    headers: { Authorization: `Bearer ${currentToken}` },
  })
  const { token } = await resp.json()
  return token
}

const client = await connect('localhost:50051', {
  insecure: true,
  token: initialToken,
  tokenRefresher: fetchToken,
})

If the refresher returns an error, the library retries with exponential backoff (5 s → 60 s) and logs each failure. RPCs will start failing with Unauthenticated once the token expires, so ensure the refresher can recover.

Option 3 — Manual update

Call client.setToken() to swap the token on a live connection. The new token takes effect on the very next RPC call; no reconnection is needed.

const client = await connect('localhost:50051', {
  insecure: true,
  token: initialToken,
})

// Later, when you have a fresh token:
client.setToken(newToken)

This is useful when token lifecycle is managed externally (e.g. a shared token store, a sidecar, or an existing refresh loop in your application).

Option 4 — Static token (short-lived processes)

For scripts or jobs that complete within the 15-minute token window, a static token is sufficient:

const client = await connect('localhost:50051', {
  insecure: true,
  token: process.env.QUEUETI_TOKEN!,
})

Error Handling

publish wraps the underlying gRPC error and includes the topic name:

try {
  const id = await producer.publish('orders', payload)
} catch (err) {
  // e.g. "publish to topic \"orders\": rpc error: code = Unauthenticated ..."
  console.error(err)
}

consume and consumeBatch only throw errors for programming mistakes (invalid configuration). Network errors and stream interruptions are handled internally via reconnection. A clean shutdown (signal aborted) always resolves normally.

When a stream error occurs, the library logs the error and reconnects with exponential backoff. Your handler is not called during network outages.


Full Example

import { connect } from '@queue-ti/client'

async function main() {
  const initialToken = process.env.QUEUETI_TOKEN || 'your-token-here'

  const client = await connect('localhost:50051', {
    insecure: true,
    token: initialToken,
    tokenRefresher: async () => {
      const resp = await fetch('http://localhost:8080/api/auth/refresh', {
        method: 'POST',
        headers: { Authorization: `Bearer ${initialToken}` },
      })
      const { token } = await resp.json()
      return token
    },
  })

  // Publish one message
  const producer = client.producer()
  const id = await producer.publish('orders', Buffer.from(JSON.stringify({ item: 'book' })), {
    metadata: { source: 'example' },
  })
  console.log('published:', id)

  // Consume until SIGINT
  const abortController = new AbortController()
  process.on('SIGINT', () => {
    console.log('shutting down...')
    abortController.abort()
  })

  const consumer = client.consumer('orders', {
    concurrency: 4,
    signal: abortController.signal,
  })

  await consumer.consume(async (msg) => {
    console.log(`[${msg.id}] ${msg.payload.toString()}`)
    // return normally to Ack
  })

  client.close()
  console.log('consumer stopped')
}

main().catch(console.error)

TypeScript

The library ships with complete TypeScript type definitions. All public APIs are fully typed, including request/response shapes and options.

import { connect, ConnectOptions, ConsumerOptions, PublishOptions, Message } from '@queue-ti/client'