

pgstream

This Node.js library lets you use PostgreSQL for event streaming. It gives you:

  • Persistent, queue-like (FIFO) data structures
  • Non-destructive consumption (via consumer offsets)
  • Full durability of messages and consumer offsets
  • Transactional publish and consume via knex.js
  • Low to moderate throughput (expect ~100 msg/s)

Install

npm install pgstream knex

Produce messages

import { knex } from 'knex';
import { Admin, ProducerBuilder, PendingMessage } from 'pgstream';
const db = knex({
    client: 'pg',
    connection: process.env.POSTGRES_URL
});
// Prepare the necessary tables - this is safe to run multiple times:
const admin = new Admin(db);
await admin.install();
await admin.createStream('myevents');
// Create our producer, now that it has a place to write to:
const producer = new ProducerBuilder()
    .withKnex(db)
    .stream('myevents')
    .build();
await producer.produce(new PendingMessage(
    Buffer.from('hello, world!'),
    { fooheader: 'bar' }
));

How do I produce a message in an existing database transaction?

import { ProducerBuilder, PendingMessage } from 'pgstream';

// db is a Knex instance, as in the example above.
await db.transaction(async (trx) => {
    const producer = new ProducerBuilder()
        .inTransaction(trx)
        .stream('myevents')
        .build();
    // Now produce as usual - the message commits or rolls back
    //  together with the rest of the transaction:
    await producer.produce(new PendingMessage(
        Buffer.from('hello, world!'),
        { fooheader: 'bar' }
    ));
});

How do I produce a message into a non-knex transaction?

(TODO: Describe how to implement a custom TransactionProvider.)

Consume messages

import { knex } from 'knex';
import { Admin, ConsumerBuilder } from 'pgstream';
const db = knex({
    client: 'pg',
    connection: process.env.POSTGRES_URL
});
// We also ensure the stream exists in the consumer code - this way, it doesn't matter
//  which part you run first, producer or consumer.
const admin = new Admin(db);
await admin.install();
await admin.createStream('myevents');
// Let's receive messages from the producer!
const consumer = new ConsumerBuilder()
    .withKnex(db)
    .stream('myevents')
    .name('myconsumer1')
    .handler(async (msg, trx) => {
        // If you want, you can use trx, which is a Knex transaction that's started
        //  with the isolation level set to the DB session default.
        console.log(msg);
        // When the Promise returned by the handler fulfills, the consumer's
        //  offset is advanced by 1, so it will subsequently get the next message.
        // On the other hand, if it rejects, the entire transaction is rolled
        //  back and the handler is re-run with the same message again.
        // This is effectively exactly-once message processing.
    })
    .build();
await consumer.run();
// Some time later, perhaps:
// consumer.destroy();

Offsets are tracked per consumer name. If you run several consumers with the same name at the same time, only one will be active due to locking. This ensures true FIFO semantics but disallows parallelism; consumer groups, as found in other, more advanced platforms, are not supported.

In messaging terms, this means that every consumer is exclusive and has a prefetch count of 1. Therefore, the only benefit from running multiple instances of a given consumer is failover.
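To illustrate the per-name offset tracking described above, here is a minimal sketch using the same builder API as the examples earlier in this readme; the consumer names 'audit' and 'projection' are made up for illustration. Because each name keeps its own offset, both consumers independently receive every message:

import { knex } from 'knex';
import { ConsumerBuilder } from 'pgstream';

const db = knex({ client: 'pg', connection: process.env.POSTGRES_URL });

// Each distinct consumer name has its own offset, so both consumers
//  below receive the full stream independently (non-destructive reads).
const audit = new ConsumerBuilder()
    .withKnex(db)
    .stream('myevents')
    .name('audit')
    .handler(async (msg) => { /* e.g. append to an audit log */ })
    .build();
const projection = new ConsumerBuilder()
    .withKnex(db)
    .stream('myevents')
    .name('projection')
    .handler(async (msg) => { /* e.g. update a read model */ })
    .build();
await Promise.all([audit.run(), projection.run()]);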

How do I get a SERIALIZABLE transaction in my handler?

Configure the Knex pool to change the session characteristics and set a new default isolation level:

  • https://knexjs.org/guide/#aftercreate
  • https://www.postgresql.org/docs/current/sql-set-transaction.html

(TODO: Provide a complete example.)
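Until that example lands, here is a minimal sketch based on the two links above: Knex's pool.afterCreate hook hands you the raw pg connection, and a SET SESSION CHARACTERISTICS statement makes SERIALIZABLE the default for every transaction on that connection, including the one pgstream opens for your handler.

import { knex } from 'knex';

// Sketch: make every pooled connection default to SERIALIZABLE.
const db = knex({
    client: 'pg',
    connection: process.env.POSTGRES_URL,
    pool: {
        // afterCreate receives the raw pg client; the callback signals
        //  when the connection is ready for use (or failed).
        afterCreate: (conn, done) => {
            conn.query(
                'SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE',
                (err) => done(err, conn)
            );
        }
    }
});

Note that under SERIALIZABLE, a handler's transaction can fail with a serialization error (SQLSTATE 40001); as described above, a rejected handler rolls the transaction back and is re-run with the same message.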

Use cases

Use this library when your application already uses PostgreSQL and you want to avoid introducing another technology such as Apache Kafka. It was created with on-premises and home use in mind, especially for edge devices like the Raspberry Pi, where memory is limited and adding memory-hungry software (such as the JVM) may be a problem.

Do not use this library for high-throughput systems. The following solutions may be a better fit if you need serious streaming capabilities:

  • Apache Kafka
  • Apache Pulsar
  • RabbitMQ Streams
  • NATS Streaming