svelte-pg-live-query
v0.1.1
Typed PostgreSQL `LISTEN/NOTIFY` helpers for SvelteKit `query.live(...)` remote functions.
This package gives you a single factory API:
createPgLiveQuery<Channels>(options?)
You define your channel-to-payload types once, then use pgLiveQuery.fn(...) inside query.live(...).
Install
npm install svelte-pg-live-query pg-listen
What you get
- Strongly-typed payloads per channel
- Shared Postgres listener scaffolding under the hood
- Easy `onInit` + `onNotified` live query model
- Configurable listener connection/options via factory
- Built-in heartbeat events to keep idle streams alive
Payload shape
Recommended notify payload shape from SQL trigger:
{
  "operation": "INSERT | UPDATE | DELETE",
  "table": "User",
  "row": { "...": "row data" }
}
Using a single `row` field is easier to type than `new`/`old` branching.
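The `Channels` type describes this shape at compile time only. Assuming payloads are not validated at runtime (the README does not say either way), a small guard like the sketch below can catch a trigger that sends something unexpected. `ChangePayload` and `isChangePayload` are hypothetical names used for illustration; they are not part of this package.

```typescript
// Hypothetical names for illustration; not exported by svelte-pg-live-query.
type Operation = 'INSERT' | 'UPDATE' | 'DELETE';

type ChangePayload<Row> = {
  operation: Operation;
  table: string;
  row: Row;
};

const OPERATIONS: readonly string[] = ['INSERT', 'UPDATE', 'DELETE'];

// Runtime check that an unknown value has the recommended envelope shape.
// It does not validate the columns inside `row`; that stays the caller's job.
function isChangePayload(
  value: unknown
): value is ChangePayload<Record<string, unknown>> {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.operation === 'string' &&
    OPERATIONS.includes(v.operation) &&
    typeof v.table === 'string' &&
    typeof v.row === 'object' &&
    v.row !== null &&
    !Array.isArray(v.row)
  );
}

// Example: a payload string shaped like the one the trigger would send.
const raw =
  '{"operation":"UPDATE","table":"User","row":{"id":1,"email":"a@example.com"}}';
const parsed: unknown = JSON.parse(raw);

if (isChangePayload(parsed)) {
  console.log(parsed.operation, parsed.table, parsed.row.id);
}
```

The guard only checks the envelope, so it can be reused across channels while each channel keeps its own `row` type.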
SQL trigger example
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  payload JSON;
BEGIN
  payload := json_build_object(
    'operation', TG_OP,
    'table', TG_TABLE_NAME,
    'row', CASE WHEN TG_OP = 'DELETE' THEN row_to_json(OLD) ELSE row_to_json(NEW) END
  );
  -- The channel name is taken from the first trigger argument (TG_ARGV[0])
  PERFORM pg_notify(TG_ARGV[0], payload::text);
  IF (TG_OP = 'DELETE') THEN
    RETURN OLD;
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Basic usage
// src/routes/some.remote.ts
import { query } from '$app/server';
import { createPgLiveQuery } from 'svelte-pg-live-query';
type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};
const pgLiveQuery = createPgLiveQuery<Channels>();
export const usersLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async () => {
      return { ok: true };
    },
    onNotified: async ({ payload }) => {
      // payload is fully typed by channel
      console.log(payload.operation, payload.row.id);
      return { ok: true };
    }
  })
);
Stream event envelope
Every emitted item is wrapped in a typed envelope:
type PgLiveQueryValue<T> =
  | { type: 'init'; data: T }
  | { type: 'update'; data: T }
  | { type: 'heartbeat'; data: null }
  | { type: 'error'; data: { message: string } };
- `init`: first successful value from `onInit`
- `update`: successful value from `onNotified`
- `heartbeat`: emitted when stream is idle
- `error`: emitted when `onInit` or `onNotified` throws
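Because the envelope is a discriminated union, a consumer can switch on `type` and let TypeScript narrow `data` in each branch. The sketch below shows one possible way to fold events into view state; `ViewState` and `reduceEvent` are hypothetical names, and the envelope type is copied locally so the example stands alone.

```typescript
// Copied from the envelope definition in this README.
type PgLiveQueryValue<T> =
  | { type: 'init'; data: T }
  | { type: 'update'; data: T }
  | { type: 'heartbeat'; data: null }
  | { type: 'error'; data: { message: string } };

// Hypothetical state shape for illustration.
type ViewState<T> = { value: T | null; error: string | null };

// Folds one stream event into the current state.
function reduceEvent<T>(
  state: ViewState<T>,
  event: PgLiveQueryValue<T>
): ViewState<T> {
  switch (event.type) {
    case 'init':
    case 'update':
      // Fresh data replaces the old value and clears any earlier error.
      return { value: event.data, error: null };
    case 'heartbeat':
      // Keep-alive only: nothing to render.
      return state;
    case 'error':
      // Keep the last good value, surface the message.
      return { value: state.value, error: event.data.message };
    default: {
      // Compile-time exhaustiveness check.
      const unreachable: never = event;
      return unreachable;
    }
  }
}

const events: PgLiveQueryValue<{ ok: boolean }>[] = [
  { type: 'init', data: { ok: true } },
  { type: 'heartbeat', data: null },
  { type: 'error', data: { message: 'query failed' } }
];

const initial: ViewState<{ ok: boolean }> = { value: null, error: null };
const finalState = events.reduce(reduceEvent, initial);
console.log(finalState);
```

Keeping the last good value on `error` is a design choice of this sketch, not something the envelope requires.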
Factory options
const pgLiveQuery = createPgLiveQuery<Channels>({
  debug: true,
  debounceMs: 50,
  heartbeatMs: 5000,
  postgres: {
    connectionString: process.env.DATABASE_URL,
    subscriberConfig: {
      // any pg-listen subscriber options except connectionString
      retryInterval: 200,
      retryTimeout: 5000
    },
    onError: (error) => {
      console.error('listener error', error);
    }
  }
});
Option reference
- `debug?: boolean`
- `debounceMs?: number`
- `heartbeatMs?: number` (default: `5000`, set `0` to disable)
- `postgres?:`
  - `connectionString?: string`
  - `subscriberConfig?: Omit<pg-listen config, 'connectionString'>`
  - `onError?: (error: Error) => void`
fn(...) options
pgLiveQuery.fn({
  channel: 'user_changes',
  id: 'users-stream',
  debug: true,
  heartbeatMs: 5000,
  onInit: async ({ input }) => {
    return null;
  },
  onNotified: async ({ input, payload }) => {
    return payload;
  },
  onServer: ({ input, payload }) => {
    // optional side effects/logging
  }
});
Return semantics
- `onInit` return value is the first `yield`
- each `onNotified` return value is yielded to clients
- return `SKIP` to ignore a notification and not emit an update
- thrown errors from `onInit`/`onNotified` are emitted as `{ type: 'error', data: { message } }`
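The rules above can be pictured with a toy model. This is not the library's implementation: `SKIP_MODEL`, `runNotified`, and `makeHandler` are hypothetical stand-ins that only mirror the documented outcomes (a returned value becomes an update, a skip emits nothing, a thrown error becomes an error envelope).

```typescript
// Toy model only: these names are hypothetical, not exports of svelte-pg-live-query.
const SKIP_MODEL = Symbol('skip');

type Emitted<T> =
  | { type: 'update'; data: T }
  | { type: 'error'; data: { message: string } };

// Runs one onNotified-style handler and maps its outcome to what a stream
// following the documented rules would emit; null means nothing is emitted.
async function runNotified<T>(
  handler: () => Promise<T | typeof SKIP_MODEL>
): Promise<Emitted<T> | null> {
  try {
    const result = await handler();
    if (result === SKIP_MODEL) return null; // skipped: no event
    return { type: 'update', data: result as T };
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    return { type: 'error', data: { message } };
  }
}

// Builds a handler that only cares about one row id (illustrative logic).
function makeHandler(inputId: number, rowId: number) {
  return async (): Promise<{ id: number } | typeof SKIP_MODEL> => {
    if (rowId < 0) throw new Error('bad row');
    if (rowId !== inputId) return SKIP_MODEL;
    return { id: rowId };
  };
}

// A notification for a different row is skipped.
runNotified(makeHandler(1, 2)).then((event) => console.log(event)); // logs null
```

In real code the sentinel is the `SKIP` value imported from the package, and the envelope is produced by the library rather than by a helper like this.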
import { SKIP } from 'svelte-pg-live-query';
SKIP example (ignore unrelated updates)
Use SKIP when a notification is valid but not relevant to the current live-query input.
import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';
type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string };
  };
};
const pgLiveQuery = createPgLiveQuery<Channels>();
export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async ({ input }: { input: { id: number } }) => {
      return { id: input.id };
    },
    onNotified: async ({ input, payload }) => {
      // Ignore notifications for other users
      if (payload.row.id !== input.id) return SKIP;
      return payload.row;
    }
  })
);
Database setup requirement
This library does not create database triggers for you. You must create your own Postgres NOTIFY triggers/channels that match the channel names and payload shape used in your createPgLiveQuery config.
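Before writing a trigger, it can help to confirm that a listener reacts at all by sending one notification by hand. The sketch below assumes a client with a `query(text, values)` method, which a `pg` Client or Pool is expected to provide; `Queryable`, `sendTestNotification`, and `recordingClient` are hypothetical names, and the recording client only stands in for a real connection so the example has no dependencies.

```typescript
// Minimal shape of a query-capable client, declared locally.
// Assumption: a `pg` Client or Pool fits this shape.
type Queryable = {
  query(text: string, values: unknown[]): Promise<unknown>;
};

type TestPayload = {
  operation: 'INSERT' | 'UPDATE' | 'DELETE';
  table: string;
  row: Record<string, unknown>;
};

// Hypothetical helper: sends one notification in the recommended payload shape.
async function sendTestNotification(
  db: Queryable,
  channel: string,
  payload: TestPayload
): Promise<void> {
  // pg_notify(channel, payload) is the function form of NOTIFY,
  // so both values can be passed as bind parameters.
  await db.query('SELECT pg_notify($1, $2)', [channel, JSON.stringify(payload)]);
}

// Stand-in client that records calls instead of talking to a database.
const calls: Array<{ text: string; values: unknown[] }> = [];
const recordingClient: Queryable = {
  async query(text, values) {
    calls.push({ text, values });
    return undefined;
  }
};

const sent = sendTestNotification(recordingClient, 'user_changes', {
  operation: 'UPDATE',
  table: 'User',
  row: { id: 1, email: 'a@example.com' }
});
```

With a real connection in place of `recordingClient`, any live query listening on `user_changes` should receive the payload as if a trigger had fired.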
PostgreSQL trigger docs:
Prisma migration example (create + apply trigger)
Below is a minimal example using Prisma migrations to create NOTIFY triggers.
1) Create an empty migration
npx prisma migrate dev --name add_user_changes_listener --create-only
This creates a new migration folder with `migration.sql` that you can edit before applying.
2) Edit migration.sql
-- Function that sends one consistent payload shape
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  payload JSON;
BEGIN
  payload := json_build_object(
    'operation', TG_OP,        -- operation type: INSERT | UPDATE | DELETE
    'table', TG_TABLE_NAME,    -- table name that fired the trigger
    'row', CASE                -- row shape returned to your live query payload
      WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
      ELSE row_to_json(NEW)
    END
  );
  -- Channel name your app listens to (must match `pgLiveQuery.fn({ channel: ... })`)
  PERFORM pg_notify('user_changes', payload::text);
  IF (TG_OP = 'DELETE') THEN
    RETURN OLD;
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Trigger name (you choose this; useful for identifying/dropping later)
CREATE TRIGGER user_notify_changes
AFTER INSERT OR UPDATE OR DELETE ON "User"
FOR EACH ROW
EXECUTE FUNCTION notify_table_change();
3) Apply migration to database
npx prisma migrate dev
4) Match channel + payload type in your live query
type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE'; // from payload.operation
    table: string; // from payload.table
    row: { id: number; email: string }; // from payload.row
  };
};
import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';
import { prisma } from '$lib/server/db'; // your Prisma client path
type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};
const pgLiveQuery = createPgLiveQuery<Channels>();
export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes', // must match pg_notify('user_changes', ...)
    onInit: async ({ input }: { input: { id: number } }) => {
      return prisma.user.findUnique({ where: { id: input.id } });
    },
    onNotified: async ({ input, payload }) => {
      if (payload.row.id !== input.id) return SKIP; // ignore unrelated updates
      return prisma.user.findUnique({ where: { id: input.id } });
    }
  })
);