npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

svelte-pg-live-query

v0.1.1

Published

Typed PostgreSQL `LISTEN/NOTIFY` helpers for SvelteKit `query.live(...)` remote functions.

Readme

svelte-pg-live-query

Typed PostgreSQL LISTEN/NOTIFY helpers for SvelteKit query.live(...) remote functions.

This package gives you a single factory API:

  • createPgLiveQuery<Channels>(options?)

You define your channel-to-payload types once, then use pgLiveQuery.fn(...) inside query.live(...).

Install

npm install svelte-pg-live-query pg-listen

What you get

  • Strongly-typed payloads per channel
  • Shared Postgres listener scaffolding under the hood
  • Easy onInit + onNotified live query model
  • Configurable listener connection/options via factory
  • Built-in heartbeat events to keep idle streams alive

Payload shape

Recommended notify payload shape from SQL trigger:

{
  "operation": "INSERT | UPDATE | DELETE",
  "table": "User",
  "row": { "...": "row data" }
}

Using a single row field is easier to type than new/old branching.

SQL trigger example

CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  payload JSON;
BEGIN
  payload := json_build_object(
    'operation', TG_OP,
    'table', TG_TABLE_NAME,
    'row', CASE WHEN TG_OP = 'DELETE' THEN row_to_json(OLD) ELSE row_to_json(NEW) END
  );

  PERFORM pg_notify(TG_ARGV[0], payload::text);

  IF (TG_OP = 'DELETE') THEN
    RETURN OLD;
  END IF;

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Basic usage

// src/routes/some.remote.ts
import { query } from '$app/server';
import { createPgLiveQuery } from 'svelte-pg-live-query';

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

export const usersLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async () => {
      return { ok: true };
    },
    onNotified: async ({ payload }) => {
      // payload is fully typed by channel
      console.log(payload.operation, payload.row.id);
      return { ok: true };
    }
  })
);

Stream event envelope

Every emitted item is wrapped in a typed envelope:

type PgLiveQueryValue<T> =
  | { type: 'init'; data: T }
  | { type: 'update'; data: T }
  | { type: 'heartbeat'; data: null }
  | { type: 'error'; data: { message: string } };
  • init: first successful value from onInit
  • update: successful value from onNotified
  • heartbeat: emitted when stream is idle
  • error: emitted when onInit or onNotified throws

Factory options

const pgLiveQuery = createPgLiveQuery<Channels>({
  debug: true,
  debounceMs: 50,
  heartbeatMs: 5000,
  postgres: {
    connectionString: process.env.DATABASE_URL,
    subscriberConfig: {
      // any pg-listen subscriber options except connectionString
      retryInterval: 200,
      retryTimeout: 5000
    },
    onError: (error) => {
      console.error('listener error', error);
    }
  }
});

Option reference

  • debug?: boolean
  • debounceMs?: number
  • heartbeatMs?: number (default: 5000, set 0 to disable)
  • postgres?:
    • connectionString?: string
    • subscriberConfig?: Omit<pg-listen config, 'connectionString'>
    • onError?: (error: Error) => void

fn(...) options

pgLiveQuery.fn({
  channel: 'user_changes',
  id: 'users-stream',
  debug: true,
  heartbeatMs: 5000,
  onInit: async ({ input }) => {
    return null;
  },
  onNotified: async ({ input, payload }) => {
    return payload;
  },
  onServer: ({ input, payload }) => {
    // optional side effects/logging
  }
});

Return semantics

  • onInit return value is first yield
  • each onNotified return value is yielded to clients
  • return SKIP to ignore a notification and not emit an update
  • thrown errors from onInit/onNotified are emitted as { type: 'error', data: { message } }
import { SKIP } from 'svelte-pg-live-query';

SKIP example (ignore unrelated updates)

Use SKIP when a notification is valid but not relevant to the current live-query input.

import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async ({ input }: { input: { id: number } }) => {
      return { id: input.id };
    },
    onNotified: async ({ input, payload }) => {
      // Ignore notifications for other users
      if (payload.row.id !== input.id) return SKIP;

      return payload.row;
    }
  })
);

Database setup requirement

This library does not create database triggers for you. You must create your own Postgres NOTIFY triggers/channels that match the channel names and payload shape used in your createPgLiveQuery config.

PostgreSQL trigger docs:

Prisma migration example (create + apply trigger)

Below is a minimal example using Prisma migrations to create NOTIFY triggers.

1) Create an empty migration

npx prisma migrate dev --name add_user_changes_listener --create-only

This creates a new migration folder with migration.sql that you can edit before applying.

2) Edit migration.sql

-- Function that sends one consistent payload shape
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  payload JSON;
BEGIN
  payload := json_build_object(
    'operation', TG_OP,          -- operation type: INSERT | UPDATE | DELETE
    'table', TG_TABLE_NAME,      -- table name that fired the trigger
    'row', CASE                  -- row shape returned to your live query payload
      WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
      ELSE row_to_json(NEW)
    END
  );

  -- Channel name your app listens to (must match `pgLiveQuery.fn({ channel: ... })`)
  PERFORM pg_notify('user_changes', payload::text);

  IF (TG_OP = 'DELETE') THEN
    RETURN OLD;
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger name (you choose this; useful for identifying/dropping later)
CREATE TRIGGER user_notify_changes
AFTER INSERT OR UPDATE OR DELETE ON "User"
FOR EACH ROW
EXECUTE FUNCTION notify_table_change();

3) Apply migration to database

npx prisma migrate dev

4) Match channel + payload type in your live query

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE'; // from payload.operation
    table: string;                              // from payload.table
    row: { id: number; email: string };         // from payload.row
  };
};
import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';
import { prisma } from '$lib/server/db'; // your Prisma client path

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes', // must match pg_notify('user_changes', ...)
    onInit: async ({ input }: { input: { id: number } }) => {
      return prisma.user.findUnique({ where: { id: input.id } });
    },
    onNotified: async ({ input, payload }) => {
      if (payload.row.id !== input.id) return SKIP; // ignore unrelated updates
      return prisma.user.findUnique({ where: { id: input.id } });
    }
  })
);