@devcoons/pushlane
v0.1.0
pushlane — Durable in‑app notifications with SSE (MySQL/MariaDB + Redis)
What it is
- Durable notifications stored in your SQL DB (MySQL/MariaDB)
- Realtime push to the browser via SSE (no polling, no WS complexity)
- Redis Pub/Sub as the fan‑out channel
- Tiny API surface you wire once, then call `createAndPublish` from your domain code
Quick start (production‑style)
- Install and set env vars
- Create a single pushlane instance and run `init()` once
- Add three API routes: `stream`, `unread`, `mark-read`
- Use a tiny client hook to fetch unread + open SSE
- Call `createAndPublish()` whenever you create a domain event for a user
1) Install & env
npm i pushlane
MYSQL_HOST=your-mysql-host
MYSQL_USER=your-db-user
MYSQL_PASSWORD=your-db-pass
MYSQL_DATABASE=your-db-name
REDIS_HOST=your-redis-host
REDIS_PASSWORD=
MariaDB vs MySQL
- MariaDB: prefer `payload LONGTEXT` (see DB notes below).
- MySQL 8.0: you can use `payload JSON` if you like (no code change needed).
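A missing variable from the list above tends to surface later as an opaque connection error, so it can help to check the environment once at startup. The following is a minimal sketch under stated assumptions: `readPushlaneEnv` is a hypothetical helper, not part of pushlane, and it treats `REDIS_PASSWORD` as optional because the sample env leaves it empty.

```typescript
// Variables named in the env listing above; REDIS_PASSWORD is assumed optional.
const REQUIRED_VARS = [
  "MYSQL_HOST",
  "MYSQL_USER",
  "MYSQL_PASSWORD",
  "MYSQL_DATABASE",
  "REDIS_HOST",
] as const;

type RequiredVar = (typeof REQUIRED_VARS)[number];

type PushlaneEnv = Record<RequiredVar, string> & {
  REDIS_PASSWORD: string | undefined;
};

// Hypothetical helper (not a pushlane API): fail fast with one clear message
// that lists every missing variable at once.
function readPushlaneEnv(env: Record<string, string | undefined>): PushlaneEnv {
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  const get = (name: RequiredVar): string => env[name] as string;
  return {
    MYSQL_HOST: get("MYSQL_HOST"),
    MYSQL_USER: get("MYSQL_USER"),
    MYSQL_PASSWORD: get("MYSQL_PASSWORD"),
    MYSQL_DATABASE: get("MYSQL_DATABASE"),
    REDIS_HOST: get("REDIS_HOST"),
    // An empty string is normalized to undefined (no password).
    REDIS_PASSWORD: env.REDIS_PASSWORD || undefined,
  };
}
```

Calling `readPushlaneEnv(process.env)` before constructing the instance would replace the non-null assertions (`!`) with a single readable failure.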
2) Create a single pushlane instance
Put this in your app lib folder (the route examples import it from `@/app/_lib/pushlane`). It resolves the userId from your auth (replace with your logic).
import { createPushlane } from "pushlane";
import { cookies } from "next/headers";
// -----------------------------------------------------
// Replace with your actual auth/session integration
// -----------------------------------------------------
async function resolveUserIdFromAuth(): Promise<string | null> {
  // Example: read the "uid" cookie set by your login flow.
  // `await` covers both the sync cookies() of Next.js 14 and earlier
  // and the async cookies() of Next.js 15+.
  const cookieStore = await cookies();
  return cookieStore.get("uid")?.value ?? null;
}
export const pushlane = createPushlane({
  ensureSchema: true, // safe to run on every cold start (idempotent)
  mysql: {
    host: process.env.MYSQL_HOST!,
    user: process.env.MYSQL_USER!,
    password: process.env.MYSQL_PASSWORD!,
    database: process.env.MYSQL_DATABASE!,
  },
  redis: {
    host: process.env.REDIS_HOST!,
    password: process.env.REDIS_PASSWORD || undefined,
  },
});
// Fire-and-forget schema check (no-op after first run)
void pushlane.init();
export async function resolveUserId() {
  return resolveUserIdFromAuth();
}
3) API routes (App Router)
Keep these on Node runtime (SSE needs long‑lived connections).
// app/api/notifications/stream/route.ts (path inferred from the URLs the client hook calls)
export const runtime = "nodejs";
import { pushlane, resolveUserId } from "@/app/_lib/pushlane";
export async function GET(req: Request) {
  // @ts-expect-error NextRequest-compatible at runtime
  return pushlane.sseHandler(req, resolveUserId);
}
// app/api/notifications/unread/route.ts
export const runtime = "nodejs";
import { pushlane, resolveUserId } from "@/app/_lib/pushlane";
export async function GET(req: Request) {
  // @ts-expect-error NextRequest-compatible at runtime
  return pushlane.restHandlers.GET_unread(req, resolveUserId);
}
// app/api/notifications/mark-read/route.ts
export const runtime = "nodejs";
import { pushlane, resolveUserId } from "@/app/_lib/pushlane";
export async function POST(req: Request) {
  // @ts-expect-error NextRequest-compatible at runtime
  return pushlane.restHandlers.POST_markRead(req, resolveUserId);
}
4) Client hook (fetch unread once + open SSE)
This example keeps local state. Swap in Zustand/Redux if preferred.
"use client";
import { useEffect, useRef, useState } from "react";
export type PushlaneEvent = {
  id: string;
  kind: string;
  createdAt: string;
  payload: Record<string, unknown>;
};
export function usePushlane(userId: string | null) {
  const [items, setItems] = useState<PushlaneEvent[]>([]);
  const esRef = useRef<EventSource | null>(null);
  useEffect(() => {
    if (!userId) return;
    let cancelled = false;
    // 1) Initial unread
    (async () => {
      try {
        const r = await fetch(`/api/notifications/unread`, { credentials: "include", cache: "no-store" });
        if (!r.ok) return;
        const data = await r.json();
        if (cancelled) return;
        const mapped: PushlaneEvent[] = data.unread.map((n: any) => ({
          id: n.id, kind: n.kind, createdAt: n.createdAt, payload: n.payload ?? {},
        }));
        // Merge instead of overwriting: SSE events may arrive before this fetch resolves
        setItems((prev) => [...mapped, ...prev.filter((p) => !mapped.some((m) => m.id === p.id))]);
      } catch {
        // Network or parse failure: the SSE stream below still delivers new events
      }
    })();
    // 2) SSE for realtime
    const es = new EventSource(`/api/notifications/stream`, { withCredentials: true });
    esRef.current = es;
    es.onmessage = (evt) => {
      try {
        const msg = JSON.parse(evt.data) as PushlaneEvent;
        setItems((prev) => prev.some((x) => x.id === msg.id) ? prev : [...prev, msg]);
      } catch {
        // ignore frames that are not valid JSON
      }
    };
    es.onerror = () => {
      // optional: toggle a "reconnecting…" banner
    };
    return () => {
      cancelled = true;
      es.close();
      esRef.current = null;
      setItems([]); // drop the previous user's items when userId changes
    };
  }, [userId]);
  async function markRead(ids: string[]) {
    if (!userId || ids.length === 0) return;
    setItems((prev) => prev.filter((n) => !ids.includes(n.id))); // optimistic
    await fetch(`/api/notifications/mark-read`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      credentials: "include",
      body: JSON.stringify({ ids }),
    }).catch(() => {/* optionally refetch unread */});
  }
  return { items, markRead };
}
Usage in a client component
const { items, markRead } = usePushlane(session?.user?.id ?? null);
// render a badge/toast list from `items`
5) Publish notifications from your domain code
Call `createAndPublish` only after your domain writes have committed. Keep payloads small.
"use server";
import { pushlane } from "@/app/_lib/pushlane";
import { db } from "@/app/_lib/db"; // your ORM/driver
export async function createPendingAction({
  fromUserId, toUserId, actionData,
}: { fromUserId: string; toUserId: string; actionData: Record<string, unknown>; }) {
  // 1) domain write
  const id = crypto.randomUUID();
  await db.pendingAction.create({ data: { id, fromUserId, toUserId, data: actionData } });
  // 2) durable notif + push
  await pushlane.createAndPublish({
    userId: toUserId,
    kind: "pending_action.created",
    payload: { pendingActionId: id, fromUserId },
  });
  return id;
}
6) DB notes: MySQL vs MariaDB
If you use MariaDB (e.g., mariadb:10.11):
- Use `payload LONGTEXT` (MariaDB “JSON” is LONGTEXT under the hood).
- Insert without `CAST(? AS JSON)`; just pass the stringified JSON.
- Optional: enforce JSON validity (only if your MariaDB version enforces CHECK constraints; 10.2.1 and later do):
ALTER TABLE `notifications` ADD CONSTRAINT `chk_payload_json` CHECK (JSON_VALID(payload));
If you use MySQL 8.0:
- You can define `payload JSON` if you want a strict type.
- Still insert without any CAST; the server will validate.
pushlane’s built‑in ensureSchema can create the table/indexes if they’re missing. If you already have the table, you can disable ensureSchema or leave it on (it’s idempotent).
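Since the payload is stored as stringified JSON on either database, the round trip can be sketched with two small helpers. This is an illustration, not pushlane's internal code: `serializePayload`, `parsePayload`, and the 4096-byte limit are all assumptions chosen to reflect the "keep payloads small" advice. The parser also accepts an already-parsed object, because some drivers return a native `JSON` column as an object while a `LONGTEXT` column arrives as a string.

```typescript
// Assumed limit for illustration only; pick what suits your notifications.
const MAX_PAYLOAD_BYTES = 4096;

// Hypothetical helper: stringify a payload and reject oversized ones
// before they reach the database or Redis.
function serializePayload(
  payload: Record<string, unknown>,
  maxBytes: number = MAX_PAYLOAD_BYTES,
): string {
  const text = JSON.stringify(payload);
  const bytes = new TextEncoder().encode(text).length; // UTF-8 byte length
  if (bytes > maxBytes) {
    throw new Error(`Payload is ${bytes} bytes; limit is ${maxBytes}`);
  }
  return text;
}

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return value !== null && typeof value === "object" && !Array.isArray(value);
}

// Hypothetical helper: read the column back, tolerating both a string
// (LONGTEXT) and an object (JSON column parsed by the driver).
function parsePayload(column: unknown): Record<string, unknown> {
  if (isPlainObject(column)) return column;
  if (typeof column !== "string") return {};
  try {
    const parsed: unknown = JSON.parse(column);
    return isPlainObject(parsed) ? parsed : {};
  } catch {
    return {}; // invalid JSON falls back to an empty payload
  }
}
```

Falling back to `{}` on bad input mirrors the `n.payload ?? {}` default used in the client hook.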
7) Reverse proxy (critical for SSE)
If you deploy behind Nginx/Caddy/Traefik, disable buffering and set long timeouts for /api/notifications/stream.
Nginx:
location /api/notifications/stream {
  proxy_http_version 1.1;
  proxy_set_header Connection "";
  proxy_read_timeout 1d;
  proxy_send_timeout 1d;
  proxy_buffering off;
  proxy_cache off;
  add_header X-Accel-Buffering no;
  proxy_pass http://app:3000;
}
8) Operational tips
- Keep SSE on Node runtime in Next.js.
- Heartbeat: pushlane sends `: ping` every 20s. Leave it.
- File descriptors: raise `nofile` if you expect thousands of concurrent streams.
- Security: never trust `?userId`; resolve from session/cookies on the server (as shown).
- Observability: log `SSE SUB <user>` and counts for publishes; optional delivery latency metric (publish vs client receipt time).
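For context on the heartbeat tip: in the SSE wire format, a line starting with `:` is a comment that `EventSource` ignores, which is why `: ping` keeps the connection alive without triggering `onmessage`. The sketch below shows the standard framing only. The function names are hypothetical and this is not pushlane's actual implementation.

```typescript
type SseEvent = {
  id: string;
  kind: string;
  createdAt: string;
  payload: Record<string, unknown>;
};

// A data frame: a "data:" line terminated by a blank line.
// JSON.stringify without indentation never emits raw newlines,
// so the event always fits on one data line.
function formatSseData(event: SseEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`;
}

// A comment frame: clients discard it, but proxies see traffic.
function formatSseComment(text: string): string {
  return `: ${text}\n\n`;
}

// Hypothetical heartbeat loop: calls `write` with a ping comment on a timer
// and returns a function that stops it (call it when the client disconnects).
function startHeartbeat(
  write: (chunk: string) => void,
  intervalMs: number = 20_000,
): () => void {
  const timer = setInterval(() => write(formatSseComment("ping")), intervalMs);
  return () => clearInterval(timer);
}
```

The 20-second default matches the interval stated above. Keeping it below your proxy's idle timeout is the design goal.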
9) Local smoke test (optional)
If you want a simple local check with Docker:
# Infra
docker run -p 6379:6379 -d redis:7
docker run -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=pushlane -e MYSQL_USER=pushlane -e MYSQL_PASSWORD=pushlane -d mysql:8.0
# Start your Next.js app with env pointing to above
Then:
- Open `http://localhost:3000/api/notifications/stream` in a tab (keeps loading).
- Trigger your domain action or curl a publish endpoint you expose.
- Verify `/api/notifications/unread` shows the row; mark it read.
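If you would rather script the check than watch a browser tab, the raw stream text can be split into events with a small parser. This sketch assumes each notification arrives as a single JSON `data:` frame, consistent with the client hook's `JSON.parse(evt.data)`. `parseSseChunk` is a hypothetical helper, not a pushlane export.

```typescript
// Hypothetical helper: split buffered SSE text into parsed JSON events.
// Returns the events found plus any trailing partial frame, which the
// caller should prepend to the next chunk.
function parseSseChunk(buffer: string): { events: unknown[]; rest: string } {
  const normalized = buffer.replace(/\r\n/g, "\n");
  const frames = normalized.split("\n\n");
  const rest = frames.pop() ?? ""; // last piece is incomplete until "\n\n" arrives
  const events: unknown[] = [];
  for (const frame of frames) {
    const data = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, "")) // drop "data:" and one optional space
      .join("\n");
    if (data === "") continue; // comment-only frame such as ": ping"
    try {
      events.push(JSON.parse(data));
    } catch {
      // not JSON: skip rather than abort the whole stream
    }
  }
  return { events, rest };
}
```

Feed it decoded chunks from any streaming HTTP client, carrying `rest` over between calls. Heartbeat comments are filtered out, so only real notifications appear in `events`.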
10) Troubleshooting
Node error “RequestInit: duplex option is required when sending a body.”
You only see this inside a custom Node harness that forwards streaming bodies. In Next.js you won’t, but if you write a harness, set `init.duplex = 'half'` when constructing a `Request` with a stream body.
Publish succeeds, nothing in browser:
- Confirm an SSE tab is open to `/api/notifications/stream` (not polling).
- Ensure the server resolves the same user as the one you publish to.
- `redis-cli SUBSCRIBE notify:<userId>` should echo the JSON when you publish.
MariaDB syntax error near `CAST(? AS JSON)`:
Remove the CAST; store as LONGTEXT or use MySQL 8.0 with JSON type.
Behind proxy and no updates:
Disable buffering and extend timeouts as in the Nginx snippet.
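When debugging the proxy case, it helps to know which response headers an SSE endpoint typically sends, so you can compare them with what actually reaches the browser. The sketch below lists commonly used values. Whether pushlane's handler sets exactly these is not stated here, so treat both helper names and the header set as assumptions.

```typescript
// Hypothetical helper: headers commonly sent on an SSE response.
function sseResponseHeaders(): Headers {
  return new Headers({
    "Content-Type": "text/event-stream; charset=utf-8",
    // no-transform asks intermediaries not to compress or rewrite the stream
    "Cache-Control": "no-cache, no-transform",
    // Nginx reads this upstream response header and disables buffering for it
    "X-Accel-Buffering": "no",
  });
}

// Hypothetical check: does a response look like a live SSE stream?
// Useful against the headers you see in DevTools or from `curl -i`.
function looksLikeSse(headers: Headers): boolean {
  const contentType = headers.get("content-type") ?? "";
  return contentType.toLowerCase().startsWith("text/event-stream");
}
```

If the content type is right but events still arrive in bursts, buffering in an intermediate proxy is the usual suspect.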
11) Minimal types
Events delivered to the browser have shape:
type PushlaneEvent = {
  id: string;
  kind: string;
  createdAt: string; // ISO
  payload: Record<string, unknown>;
};
That’s everything you need to wire pushlane in a real app with production‑ready defaults.
