npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

consumer-pgmq

v3.3.0

Published

The consumer of Supabase pgmq

Downloads

41

Readme

About

This project is a consumer of Supabase/Postgresql queue(using pgmq extension) to simplify the process of consuming messages.

Features

  • Consumer message from Supabase queue. PS: instructions to setup https://supabase.com/blog/supabase-queues
  • Consumer message from Postgresql queue. PS: instructions to setup https://github.com/pgmq/pgmq
  • Support for both read and pop consume types
    • Read consume type is when the consumer gets the message and the message is not deleted from queue until the callback is executed with success.
    • Pop consume type is when the consumer gets the message and the message is deleted from queue.
  • Support for both Supabase and Postgresql
  • Support for both visibility time and pool size
  • Support to control the limit of consumers running at once. PS: is focus on Postgres queue driver, is helpful when you have a weak database where no allow to have a lot of connections.

Installation

  • Using pnpm
pnpm install consumer-pgmq
  • Using npm
npm install consumer-pgmq
  • Using yarn
yarn add consumer-pgmq

Options

  • queueName: The name of the queue.
  • visibilityTime: The time in seconds that the message will be invisible to other consumers. PS:
    • Your handler must finish in this time or the message will be visible again to other consumers.
    • Is used too to abort the message if the handler takes too long to finish. For example, if you set visibilityTime to 15 seconds and your handler didnt finish in 15 seconds the handler will be aborted and the message will be visible again to other consumers.
  • consumeType: The type of consume. Can be "read" or "pop"
    • Read consume type is when the consumer gets the message and the message is not deleted from queue until the callback is executed with success.
    • Pop consume type is when the consumer gets the message and delete from queue in the moment get the message.
  • poolSize: The number of consumers. PS: this is the number of consumers that will be created to consume the messages and if you use read consume type, the pool size is the number of messages will get at the same time.
  • timeMsWaitBeforeNextPolling: The time in milliseconds to wait before the next polling
  • enabledPolling: The enabled polling. PS: if true, the consumer will poll the message, if false, the consumer will consume the message one time and stop. PS: is required to the versions more than 1.0.5.
  • queueNameDlq: The name of the dead letter queue. PS: recommended to set the same name of the queue, but suffix with '_dlq'. For example: messages_dlq
  • totalRetriesBeforeSendToDlq: The total retries before send to dlq. For example: if you set totalRetriesBeforeSendToDlq equal 2, the message will be sent to dlq if the handler fails 2 times, so the third time the message will be sent to dlq and remove the main queue to avoid infinite retries.
  • enableControlConsumer: The enable control consumer. PS: if true, you can control the number of consumers running at once, because your Postgres database is weak and can be a problem to have a lot of connections.
    • Warning:
      • You need to execute SQL to create the table 'workers' and you can use schema.sql file for it.
      • If you need to increase the number of workers you can insert new registers on table 'workers'.
      • Each register on table workers will represent a process running as consumer(worker). PS: status column can be 'idle' or 'working'. The idle means is waiting to be allocate by a consumer and the working means was allocated by a consumer.

Extra points to know when use the dlq feature

  • The dead letter queue no work If you setted the consumerType option with value 'pop', because the pop get the message and remove from queue at same time, so if failed when you are processing you lose the message.
  • Recommendation no set lower value to the option 'visibilityTime' if you are using the dead letter queue feature. For example: set visibilityTime value lower than 30 seconds, because if the message wasn't delete and the message be available again the consumer application can consume the message again.
  • On Postgresql queue driver when you enable the option 'isCustomQueueImplementation', means you created the custom table to work as queue. PS: in this case you need to use the schema.sql file to create a queue table, if you dont want jobs and jobs_dlq as table name you can change in the schema.sql file.

Events

  • finish: When the message is consumed with success
  • abort-error: When the message is aborted
  • error: When an error occurs

Examples how to use

  • Consuming messages from Supabase queue
import { config } from "dotenv"
config()

import { SupabaseQueueDriver, Consumer } from "consumer-pgmq"
import { createClient, SupabaseClient } from '@supabase/supabase-js';


const supabase = createClient(
    // @ts-ignore
    process.env.SUPABASE_URL,
    process.env.SUPABASE_ANON_KEY,
    {
        db: {
            schema: 'pgmq_public'
        }
    }
);

const supabaseQueueDriver = new SupabaseQueueDriver(
    supabase as unknown as SupabaseClient
)


import timersPromises from "node:timers/promises";

async function start() {
    for (let i = 0; i < 200; i++) {
        await supabase.rpc("send", {
            queue_name: "subscriptions",
            message: { "message": `Message triggered at ${Date.now()}` }
        });
    }
    console.log("Total messages sent: ", 200)

    const consumer = new Consumer(
        {
            queueName: 'subscriptions',
            visibilityTime: 30,
            consumeType: "read",
            poolSize: 8,
            timeMsWaitBeforeNextPolling: 1000,
            enabledPolling: true,
            queueNameDlq: "subscriptions_dlq",
            totalRetriesBeforeSendToDlq: 2
        },
        async function (message: { [key: string]: any }, signal): Promise<void> {
            try {
                console.log(message)
                const url = "https://jsonplaceholder.typicode.com/todos/1";
                await timersPromises.setTimeout(100, null, { signal });
                console.log("Fetching data...");
                const response = await fetch(url, { signal });
                const todo = await response.json();
                console.log("Todo:", todo);
            } catch (error: any) {
                if (error.name === "AbortError") {
                    console.log("Operation aborted");
                } else {
                    console.error("Error:", error);
                }
            }
        },
        supabaseQueueDriver
    );

    consumer.on('finish', (message: { [key: string]: any }) => {
        console.log('Consumed message =>', message);
    });

    consumer.on("abort-error", (err) => {
        console.log("Abort error =>", err)
    })

    consumer.on('error', (err: Error) => {
        if (err.message.includes("TypeError: fetch failed")) {
            console.log(err)
            process.exit(1);
        }
        console.error('Error consuming message:', err.message);
    });

    consumer.start();

}

start()
  • Consuming messages from Postgresql queue
import { config } from "dotenv"
config()

import Consumer from '../src/consumer';
import PostgresQueueDriver from '../src/queueDriver/PostgresQueueDriver';

import { Client } from 'pg'

async function start() {

    const pgClient = new Client({
        host: process.env.POSTGRES_HOST,
        database: process.env.POSTGRES_DATABASE,
        password: process.env.POSTGRES_PASSWORD,
        port: Number(process.env.POSTGRES_PORT),
        user: process.env.POSTGRES_USER,
        ssl: false,
    })

    await pgClient.connect()


    const postgresQueueDriver = new PostgresQueueDriver(
        pgClient, "pgmq"
    )

    const consumer = new Consumer(
        {
            queueName: 'subscriptions',
            visibilityTime: 30,
            consumeType: "read",
            poolSize: 8,
            timeMsWaitBeforeNextPolling: 1000,
            enabledPolling: true,
            queueNameDlq: "subscriptions_dlq",
            totalRetriesBeforeSendToDlq: 2
        },
        async function (message: { [key: string]: any }, signal): Promise<void> {
            try {
                console.log(message)
                const url = "https://jsonplaceholder.typicode.com/todos/1";
                await timersPromises.setTimeout(100, null, { signal });
                console.log("Fetching data...");
                const response = await fetch(url, { signal });
                const todo = await response.json();
                console.log("Todo:", todo);
            } catch (error: any) {
                if (error.name === "AbortError") {
                    console.log("Operation aborted");
                } else {
                    console.error("Error:", error);
                }
            }
        },
        postgresQueueDriver
    );

    consumer.on('finish', (message: { [key: string]: any }) => {
        console.log('Consumed message =>', message);
    });

    consumer.on("abort-error", (err) => {
        console.log("Abort error =>", err)
    })

    consumer.on('error', (err: Error) => {
        if (err.message.includes("TypeError: fetch failed")) {
            console.log(err)
            process.exit(1);
        }
        console.error('Error consuming message:', err.message);
    });

    consumer.start();

}

start()
  • Consuming messages from Postgresql queue has option enabledControlConsumer and isCustomQueueImplementation enable on Postgres queue driver:
import { config } from "dotenv"
config()

import Consumer from '../src/consumer';
import PostgresQueueDriver from '../src/queueDriver/PostgresQueueDriver';

import { Client } from 'pg'

async function start() {

    const pgClient = new Client({
        host: process.env.POSTGRES_HOST,
        database: process.env.POSTGRES_DATABASE,
        password: process.env.POSTGRES_PASSWORD,
        port: Number(process.env.POSTGRES_PORT),
        user: process.env.POSTGRES_USER,
        ssl: true,
    })

    await pgClient.connect()


    const postgresQueueDriver = new PostgresQueueDriver(
        pgClient, "public", true
    )

    const consumer = new Consumer(
        {
            queueName: 'jobs',
            visibilityTime: 30,
            consumeType: "read",
            poolSize: 8,
            timeMsWaitBeforeNextPolling: 1000,
            enabledPolling: true,
            queueNameDlq: "jobs_dlq",
            totalRetriesBeforeSendToDlq: 2,
            enableControlConsumer: true
        },
        async function (message: { [key: string]: any }, signal): Promise<void> {
            console.log(message)
        },
        postgresQueueDriver
    );

    for (let index = 0; index < 100; index++) {
        await postgresQueueDriver.send("jobs", {
            message: `Message ${index}`,
            id: index
        })
    }

    consumer.on("send-to-dlq", (message: { [key: string]: any }) => {
        console.log("Send to DLQ =>", message)
    })

    consumer.on('error', (err: Error) => {
        console.error('Error consuming message:', err.message);
    });

    await consumer.start();

    process.on("SIGINT", async () => {
        await consumer.freeConsumer()
        await pgClient.end()
        process.exit(0)
    })

}

start()