@nvana-dharma/pubsub-receiver-server
v0.0.2
A TypeScript library for creating Express servers that receive and process Google Cloud Pub/Sub push messages
Features
- Type-safe: Built with TypeScript for full type safety
- Flexible: Customizable decode and handle functions for your specific use case
- Simple: Easy-to-use API with minimal configuration
- Secure: Optional verification token support
- Health checks: Built-in health check endpoint
Installation
npm install @nvana-dharma/pubsub-receiver-server
or
pnpm add @nvana-dharma/pubsub-receiver-server
Usage
Basic Example
import { PubSubPushServer, decodeDataInMessage } from "@nvana-dharma/pubsub-receiver-server"

// Define your message type
interface MyMessage {
  userId: string
  action: string
}

// Create a server instance
const server = new PubSubPushServer<MyMessage>({
  // Decode the base64 message data
  decode: (msg) => {
    const data = decodeDataInMessage(msg)
    return JSON.parse(data) as MyMessage
  },
  // Handle the decoded message
  handle: async (decoded, context) => {
    console.log("Processing message:", decoded)
    console.log("Message ID:", context.raw.messageId)
    // Your business logic here
  },
  // Health check
  healthCheck: async () => true,
  // Optional: Add a verification token
  verificationToken: process.env.PUBSUB_TOKEN,
  // Logger
  log: console,
})

// Start the server
await server.start(8080, "/pubsub/push")
Advanced Example with Custom Logger
import { PubSubPushServer, PushMessage } from "@nvana-dharma/pubsub-receiver-server"

interface OrderEvent {
  orderId: string
  status: "pending" | "completed" | "cancelled"
  timestamp: number
}

const server = new PubSubPushServer<OrderEvent>({
  decode: async (msg: PushMessage) => {
    const data = Buffer.from(msg.data, "base64").toString("utf-8")
    const parsed = JSON.parse(data)
    // Validate the message structure
    if (!parsed.orderId || !parsed.status) {
      throw new Error("Invalid message format")
    }
    return parsed as OrderEvent
  },
  handle: async (event, context) => {
    // Access attributes from the original message
    const attributes = context.raw.attributes || {}
    console.log(`Processing order ${event.orderId}`)
    console.log(`Status: ${event.status}`)
    console.log(`Attributes:`, attributes)
    // Your business logic here
    // - Update database
    // - Send notifications
    // - Trigger other events
  },
  healthCheck: async () => {
    // Check if your dependencies are healthy
    // e.g., database connection, external APIs, etc.
    return true
  },
  log: {
    info: (msg, ...args) => console.log(`[INFO] ${msg}`, ...args),
    warn: (msg, ...args) => console.warn(`[WARN] ${msg}`, ...args),
    error: (msg, ...args) => console.error(`[ERROR] ${msg}`, ...args),
  },
})

// Start server
await server.start(8080, "/webhooks/orders")

// Graceful shutdown
process.on("SIGTERM", async () => {
  console.log("Shutting down...")
  await server.stop()
  process.exit(0)
})
API Reference
PubSubPushServer<T>
The main server class for handling Pub/Sub push messages.
Constructor Options
interface ServerOptions<T> {
  decode: DecodeFn<T>
  handle: HandleFn<T>
  healthCheck: () => Promise<boolean>
  verificationToken?: string
  log: ILogger
}
- decode: Function to decode the base64 message into your domain type
- handle: Function to process the decoded message
- healthCheck: Function that returns whether the service is healthy
- verificationToken: Optional token for authenticating Pub/Sub requests
- log: Logger instance (must implement info, warn, and error methods)
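The `log` option only needs `info`, `warn`, and `error` methods. As a minimal sketch, assuming each method takes a message string followed by arbitrary extra arguments (the shape used in the advanced example; the exact `ILogger` signature is an assumption), a structured JSON logger might look like the following. `JsonLogger` and `createJsonLogger` are hypothetical names, not part of the library.

```typescript
// Hypothetical logger shape; assumed (not confirmed) to match the library's ILogger.
interface JsonLogger {
  info: (msg: string, ...args: unknown[]) => void
  warn: (msg: string, ...args: unknown[]) => void
  error: (msg: string, ...args: unknown[]) => void
}

// Builds a logger that emits one JSON object per line through `write`.
// Extra arguments must be JSON-serializable for this sketch to work.
function createJsonLogger(write: (line: string) => void = console.log): JsonLogger {
  const emit = (level: string) => (msg: string, ...args: unknown[]): void => {
    write(JSON.stringify({ level, msg, args, time: new Date().toISOString() }))
  }
  return { info: emit("info"), warn: emit("warn"), error: emit("error") }
}
```

Under that assumption, the result could be passed as `log: createJsonLogger()` in the constructor options.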
Methods
start(port: number, path?: string): Promise<void>
Starts the Express server.
- port: The port to listen on
- path: The URL path for the push endpoint (default: /pubsub)
stop(): Promise<void>
Stops the server gracefully.
Types
PushMessage
The structure of a message from Pub/Sub:
type PushMessage = {
  data: string // base64-encoded
  attributes?: Record<string, string>
  messageId: string
  publishTime?: string
}
PushRequestBody
The POST request body from Pub/Sub:
type PushRequestBody = {
  message?: PushMessage
  subscription?: string
}
Utility Functions
decodeDataInMessage(message: PushMessage): string
Helper function to decode the base64 data field into a UTF-8 string.
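To make the base64 step concrete, here is a rough, self-contained sketch that builds a push request body for local testing and decodes it again. The types are local copies of the shapes documented above; `makePushBody` and `decodeBase64Data` are hypothetical stand-ins (the latter only approximates what `decodeDataInMessage` is described as doing), and the subscription name is made up.

```typescript
// Local copies of the documented shapes, so the sketch runs without the library.
type PushMessage = {
  data: string // base64-encoded
  attributes?: Record<string, string>
  messageId: string
  publishTime?: string
}

type PushRequestBody = {
  message?: PushMessage
  subscription?: string
}

// Hypothetical helper: wraps a JSON payload the way a Pub/Sub push request would.
function makePushBody(payload: unknown, messageId = "test-1"): PushRequestBody {
  const data = Buffer.from(JSON.stringify(payload), "utf-8").toString("base64")
  return { message: { data, messageId }, subscription: "projects/demo/subscriptions/demo-sub" }
}

// Hypothetical stand-in for decodeDataInMessage: base64 -> UTF-8 string.
function decodeBase64Data(message: PushMessage): string {
  return Buffer.from(message.data, "base64").toString("utf-8")
}

const body = makePushBody({ userId: "u1", action: "login" })
const roundTripped = body.message ? JSON.parse(decodeBase64Data(body.message)) : undefined
```

A body built this way can be POSTed to a locally running server to exercise the `decode` and `handle` functions without a real subscription.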
Setting up Pub/Sub Push Subscription
- Create a Pub/Sub topic in Google Cloud Console
- Create a push subscription pointing to your server endpoint
- Add the verification token as a query parameter in the push endpoint URL:
https://your-domain.com/pubsub/push?token=YOUR_SECRET_TOKEN
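If the token contains characters that are not URL-safe, it needs to be percent-encoded before it goes into the subscription's endpoint URL. The following is a small sketch using the standard `URL` API; `buildPushEndpoint` is a hypothetical helper, and the `token` parameter name is taken from the example URL above.

```typescript
// Hypothetical helper: builds the push endpoint URL for a subscription.
function buildPushEndpoint(baseUrl: string, path: string, token: string): string {
  const url = new URL(path, baseUrl)
  // searchParams.set percent-encodes characters that are unsafe in a query string.
  url.searchParams.set("token", token)
  return url.toString()
}

const simpleEndpoint = buildPushEndpoint("https://your-domain.com", "/pubsub/push", "abc")
const encodedEndpoint = buildPushEndpoint("https://your-domain.com", "/pubsub/push", "s3cret/with+chars")
```

Parsing `encodedEndpoint` with `new URL(...).searchParams.get("token")` gives back the original token, which is a quick way to confirm the encoding is lossless.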
Error Handling
- Messages that fail to process return a 500 status code, causing Pub/Sub to retry
- Malformed or empty messages are acknowledged (204 status) to prevent retries
- Invalid tokens return 401 status code
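The rules above amount to a small decision procedure. The sketch below is not the library's implementation; it only restates the documented policy as a pure function, with `statusForPush` as a hypothetical name. The 204 returned on success is an assumption (the source does not state the success status), chosen because it is one of the codes Pub/Sub treats as an acknowledgement.

```typescript
type PushMessage = { data: string; messageId: string; attributes?: Record<string, string> }
type PushRequestBody = { message?: PushMessage; subscription?: string }

// Hypothetical restatement of the documented status-code policy.
async function statusForPush(
  body: PushRequestBody | undefined,
  token: string | undefined,
  expectedToken: string | undefined,
  handler: (msg: PushMessage) => Promise<void>,
): Promise<number> {
  // Reject when a token is configured and the request's token differs.
  if (expectedToken !== undefined && token !== expectedToken) return 401
  // Acknowledge malformed or empty bodies so Pub/Sub does not redeliver them.
  const message = body?.message
  if (!message || typeof message.data !== "string" || message.data === "") return 204
  try {
    await handler(message)
    return 204 // assumed success status
  } catch {
    return 500 // a failure status makes Pub/Sub retry delivery
  }
}
```

Because a 500 triggers redelivery, a `handle` function under this policy should be idempotent, or throw only for errors that are worth retrying.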
Health Checks
The server automatically exposes a health check endpoint at /healthz (or customize via the start method). This endpoint:
- Returns 200 OK if the healthCheck function returns true
- Returns 503 Service Unavailable if the healthCheck function returns false
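Since the `healthCheck` option is a single `() => Promise<boolean>`, services with several dependencies need to combine their checks into one. One possible approach, sketched here with hypothetical names (`Check`, `withTimeout`, `allHealthy`) and an arbitrary default timeout, treats a failed, throwing, or slow check as unhealthy.

```typescript
type Check = () => Promise<boolean>

// Resolves to false if the check throws, rejects, or takes longer than `ms`.
function withTimeout(check: Check, ms: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(false), ms)
    Promise.resolve()
      .then(check)
      .then((ok) => resolve(ok === true), () => resolve(false))
      .finally(() => clearTimeout(timer))
  })
}

// Hypothetical helper: healthy only if every dependency check resolves to true.
function allHealthy(checks: Check[], timeoutMs = 2000): Check {
  return async () => {
    const results = await Promise.all(checks.map((check) => withTimeout(check, timeoutMs)))
    return results.every((ok) => ok)
  }
}
```

The combined function could then be passed as `healthCheck: allHealthy([checkDatabase, checkCache])`, where `checkDatabase` and `checkCache` are placeholders for your own probes.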
License
MIT
Author
Brendan
