@toolkit-p2p/push
v0.1.0
Real-time alert and push notification system for P2P applications
Features
- Priority Queue: Priority-based alert delivery (URGENT, HIGH, NORMAL, LOW)
- TTL Management: Automatic alert expiration (max 48 hours)
- Delivery Retry: Exponential backoff retry logic for failed deliveries
- Mailbox Integration: Seamless integration with @toolkit-p2p/mailbox
- Custom Handlers: Pluggable delivery handlers for custom transport
- Auto-Purge: Configurable automatic cleanup of expired alerts
- Query & Filter: Filter alerts by recipient, sender, status, and priority
- Statistics: Monitoring and performance metrics
- Batch Delivery: Process multiple alerts in a single operation
- Timeout Protection: Configurable delivery timeouts
Installation
pnpm add @toolkit-p2p/push
Quick Start
import { PushService, AlertPriority } from '@toolkit-p2p/push';

// Create a push service
const service = new PushService();

// Send an alert
const alert = await service.send({
  to: 'did:key:recipient',
  from: 'did:key:sender',
  title: 'New Message',
  message: 'You have a new message waiting',
  priority: AlertPriority.HIGH,
});
console.log(`Alert sent: ${alert.id}`);
API Reference
PushService
Constructor
new PushService(mailbox?: MailboxQueue, options?: PushServiceOptions)
Options:
- maxQueueSize?: number - Maximum alerts in queue (default: 10000)
- maxRetryAttempts?: number - Maximum delivery attempts (default: 3)
- retryDelayMs?: number - Base retry delay in ms (default: 5000)
- deliveryTimeoutMs?: number - Delivery timeout in ms (default: 30000)
- autopurgeIntervalMs?: number - Auto-purge interval in ms (default: 60000)
- autoDeliver?: boolean - Auto-deliver on enqueue (default: true)
Methods
send(options: CreateAlertOptions): Promise<Alert>
Send an alert notification.
const alert = await service.send({
  to: 'did:key:recipient',
  from: 'did:key:sender',
  title: 'Alert Title',
  message: 'Alert message content',
  priority: AlertPriority.URGENT, // optional
  metadata: { category: 'notification' }, // optional
  ttl: 3600000, // optional, in milliseconds
});
Note: TTL cannot exceed 48 hours (172,800,000 ms).
getAlert(alertId: string): Alert | null
Get an alert by ID.
queryAlerts(query: AlertQuery): Alert[]
Query alerts with filters.
const alerts = service.queryAlerts({
  to: 'did:key:recipient', // optional
  from: 'did:key:sender', // optional
  status: AlertStatus.QUEUED, // optional
  priority: AlertPriority.HIGH, // optional
  limit: 20, // default: 20
  offset: 0, // default: 0
});
getStats(): AlertStats
Get service statistics.
const stats = service.getStats();
console.log(`Total: ${stats.total}`);
console.log(`Failed: ${stats.failedCount}`);
console.log(`Avg delivery: ${stats.averageDeliveryTime}ms`);
console.log(`Memory: ${stats.memoryUsage} bytes`);
onDeliver(handlerName: string, handler: (alert: Alert) => Promise<void>): void
Register a custom delivery handler.
service.onDeliver('websocket', async (alert) => {
  await websocket.send(JSON.stringify(alert));
});
removeDeliveryHandler(handlerName: string): boolean
Remove a delivery handler.
deliverNext(): Promise<DeliveryResult | null>
Manually deliver the next alert in the queue.
const result = await service.deliverNext();
if (result) {
  console.log(`Alert ${result.alertId}: ${result.success ? 'delivered' : 'failed'}`);
}
deliverBatch(count: number): Promise<DeliveryResult[]>
Deliver multiple alerts.
const results = await service.deliverBatch(10);
console.log(`Delivered ${results.filter(r => r.success).length} alerts`);
purgeExpired(): number
Purge expired alerts. Returns the number of alerts purged.
stop(): void
Stop the service and clean up resources.
AlertQueue
Low-level priority queue for managing alerts.
Constructor
new AlertQueue(maxQueueSize?: number)
Methods
enqueue(options: CreateAlertOptions): Alert
Add an alert to the queue.
dequeue(): Alert | null
Dequeue the highest priority alert.
get(alertId: string): Alert | null
Get an alert by ID.
updateStatus(alertId: string, status: AlertStatus, error?: string): boolean
Update alert status.
remove(alertId: string): boolean
Remove an alert from the queue.
query(query: AlertQuery): Alert[]
Query alerts with filters.
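As a rough illustration of how filtering and pagination could combine, here is a minimal sketch. It assumes that every provided filter must match and that limit and offset are applied after filtering, with the defaults (20 and 0) taken from the queryAlerts example. AlertRow, RowQuery, and runQuery are hypothetical names, not part of the package.

```typescript
// Hypothetical record and query shapes, loosely modeled on Alert and AlertQuery.
interface AlertRow {
  id: string;
  to: string;
  from: string;
  status: string;
  priority: string;
}

interface RowQuery {
  to?: string;
  from?: string;
  status?: string;
  priority?: string;
  limit?: number; // default: 20
  offset?: number; // default: 0
}

// Keep rows matching every provided filter, then return one page of results.
function runQuery(rows: AlertRow[], query: RowQuery): AlertRow[] {
  const { limit = 20, offset = 0 } = query;
  const matches = rows.filter(
    (row) =>
      (query.to === undefined || row.to === query.to) &&
      (query.from === undefined || row.from === query.from) &&
      (query.status === undefined || row.status === query.status) &&
      (query.priority === undefined || row.priority === query.priority),
  );
  return matches.slice(offset, offset + limit);
}

// 45 queued alerts for the same recipient, fetched in pages of 20.
const rows: AlertRow[] = Array.from({ length: 45 }, (_, i) => ({
  id: `alert-${i}`,
  to: 'did:key:alice',
  from: 'did:key:bob',
  status: 'queued',
  priority: i % 2 === 0 ? 'high' : 'normal',
}));
const thirdPage = runQuery(rows, { status: 'queued', offset: 40 });
console.log(thirdPage.length); // 5
```

Because the whole collection is scanned on each call, a query of this shape costs O(n) no matter how small the requested page is.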
purgeExpired(now: number): number
Purge expired alerts.
getStats(): AlertStats
Get queue statistics.
clear(): void
Clear the entire queue.
Types
AlertPriority
enum AlertPriority {
  LOW = 'low',
  NORMAL = 'normal',
  HIGH = 'high',
  URGENT = 'urgent',
}
AlertStatus
enum AlertStatus {
  PENDING = 'pending',
  QUEUED = 'queued',
  DELIVERING = 'delivering',
  DELIVERED = 'delivered',
  FAILED = 'failed',
  EXPIRED = 'expired',
}
Alert
interface Alert {
  id: string;
  to: string; // Recipient DID
  from: string; // Sender DID
  priority: AlertPriority;
  title: string;
  message: string;
  metadata?: Record<string, unknown>;
  createdAt: number; // Unix timestamp (ms)
  ttl: number; // Time-to-live (ms, max 48 hours)
  status: AlertStatus;
  attempts: number;
  lastAttempt?: number;
  error?: string;
}
CreateAlertOptions
interface CreateAlertOptions {
  to: string; // Recipient DID
  from: string; // Sender DID
  priority?: AlertPriority; // default: NORMAL
  title: string;
  message: string;
  metadata?: Record<string, unknown>;
  ttl?: number; // default: 24h, max: 48h
}
Examples
Basic Usage
import { PushService, AlertPriority } from '@toolkit-p2p/push';

const service = new PushService();

// Send a high-priority alert
await service.send({
  to: 'did:key:alice',
  from: 'did:key:bob',
  title: 'Security Alert',
  message: 'Unusual activity detected on your account',
  priority: AlertPriority.HIGH,
  ttl: 3600000, // 1 hour
});
With Mailbox Integration
import { PushService } from '@toolkit-p2p/push';
import { MailboxQueue } from '@toolkit-p2p/mailbox';

const mailbox = new MailboxQueue();
const service = new PushService(mailbox, {
  autoDeliver: true,
  maxRetryAttempts: 5,
});

// Alerts will automatically be delivered via mailbox
await service.send({
  to: 'did:key:recipient',
  from: 'did:key:sender',
  title: 'Message',
  message: 'Hello!',
});
Custom Delivery Handler
import { PushService, type Alert } from '@toolkit-p2p/push';

const service = new PushService(undefined, {
  autoDeliver: false,
});

// Register custom handler (e.g., WebSocket, HTTP, etc.)
// `websocket` is assumed to be an already-open connection created elsewhere
service.onDeliver('websocket', async (alert: Alert) => {
  // send() takes a string or binary payload, so the object is serialized first
  await websocket.send(JSON.stringify({
    type: 'alert',
    ...alert,
  }));
});

// Send alert
await service.send({
  to: 'did:key:recipient',
  from: 'did:key:sender',
  title: 'Notification',
  message: 'You have a new notification',
});

// Manually trigger delivery
await service.deliverNext();
Query and Filter
import { AlertPriority, AlertStatus } from '@toolkit-p2p/push';

// Get all queued urgent alerts for a recipient
const urgentAlerts = service.queryAlerts({
  to: 'did:key:alice',
  priority: AlertPriority.URGENT,
  status: AlertStatus.QUEUED,
});

// Get up to 10 alerts from a sender
const recentFromBob = service.queryAlerts({
  from: 'did:key:bob',
  limit: 10,
  offset: 0,
});

// Paginate through all queued alerts
const page1 = service.queryAlerts({
  status: AlertStatus.QUEUED,
  limit: 20,
  offset: 0,
});
const page2 = service.queryAlerts({
  status: AlertStatus.QUEUED,
  limit: 20,
  offset: 20,
});
TTL Management
// Create service with custom auto-purge interval
const service = new PushService(undefined, {
  autopurgeIntervalMs: 30000, // Purge every 30 seconds
});

// Add alert with 5-minute TTL
await service.send({
  to: 'did:key:recipient',
  from: 'did:key:sender',
  title: 'Temporary Alert',
  message: 'This will expire in 5 minutes',
  ttl: 300000, // 5 minutes
});

// Manually purge expired alerts
const purged = service.purgeExpired();
console.log(`Purged ${purged} expired alerts`);

// Stop service to clean up auto-purge interval
service.stop();
Retry Strategy
// Configure retry behavior
const service = new PushService(undefined, {
  maxRetryAttempts: 5, // Try up to 5 times
  retryDelayMs: 3000, // Start with 3s delay
  deliveryTimeoutMs: 45000, // 45s timeout per attempt
});

// The service will automatically retry failed deliveries with exponential backoff:
// Attempt 1: immediate
// Attempt 2: +3s delay
// Attempt 3: +6s delay
// Attempt 4: +12s delay
// Attempt 5: +24s delay
Batch Delivery
// Send multiple alerts
for (let i = 0; i < 100; i++) {
  await service.send({
    to: `did:key:user${i}`,
    from: 'did:key:system',
    title: 'System Notification',
    message: `Notification ${i}`,
  });
}

// Process the next batch of 10
const results = await service.deliverBatch(10);
results.forEach((result) => {
  if (!result.success) {
    console.error(`Failed to deliver ${result.alertId}: ${result.error}`);
  }
});
Statistics and Monitoring
const stats = service.getStats();
console.log(`Total alerts: ${stats.total}`);
console.log(`By status:`, stats.byStatus);
console.log(`By priority:`, stats.byPriority);
console.log(`Failed deliveries: ${stats.failedCount}`);
console.log(`Average delivery time: ${stats.averageDeliveryTime}ms`);
console.log(`Memory usage: ${(stats.memoryUsage / 1024).toFixed(2)} KB`);
How it Works
Priority Queue
Alerts are dequeued in priority order:
Priority weights:
- URGENT: 1000
- HIGH: 100
- NORMAL: 10
- LOW: 1
Within the same priority, alerts are dequeued FIFO (oldest first).
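The ordering rule can be sketched as a comparator: higher weight first, with ties broken by creation time. This is a minimal illustration using the weights listed above. QueuedAlert, compareAlerts, and pickNext are hypothetical names, and the package's internal implementation may differ.

```typescript
// Hypothetical types and names for illustration; not the package's internals.
type Priority = 'urgent' | 'high' | 'normal' | 'low';

interface QueuedAlert {
  id: string;
  priority: Priority;
  createdAt: number; // Unix timestamp (ms)
}

// Weights as listed above.
const PRIORITY_WEIGHT: Record<Priority, number> = {
  urgent: 1000,
  high: 100,
  normal: 10,
  low: 1,
};

// Higher weight first; ties are broken by age (oldest first).
function compareAlerts(a: QueuedAlert, b: QueuedAlert): number {
  const byWeight = PRIORITY_WEIGHT[b.priority] - PRIORITY_WEIGHT[a.priority];
  return byWeight !== 0 ? byWeight : a.createdAt - b.createdAt;
}

// Sort a copy and take the head, mirroring a sort-based dequeue.
function pickNext(alerts: QueuedAlert[]): QueuedAlert | null {
  const sorted = [...alerts].sort(compareAlerts);
  return sorted[0] ?? null;
}

const sample: QueuedAlert[] = [
  { id: 'a', priority: 'normal', createdAt: 1 },
  { id: 'b', priority: 'urgent', createdAt: 3 },
  { id: 'c', priority: 'urgent', createdAt: 2 },
  { id: 'd', priority: 'low', createdAt: 0 },
];
const dequeueOrder = [...sample].sort(compareAlerts).map((x) => x.id);
console.log(dequeueOrder.join(',')); // c,b,a,d
```

Both urgent alerts come out before anything else, and the older one (c) precedes the newer one (b), even though the low-priority alert d was created first.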
Delivery Flow
- Enqueue: Alert is added to the queue with status QUEUED
- Dequeue: Highest-priority alert is selected, status becomes DELIVERING
- Attempt Delivery:
  - Try custom delivery handlers first
  - Fall back to mailbox delivery
- Handle Result:
  - Success: Status becomes DELIVERED
  - Failure: Retry with exponential backoff if attempts < max
  - Max Retries: Status becomes FAILED
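The result-handling step can be pictured as a small pure function. The sketch below rests on two assumptions that the flow above does not spell out: an alert awaiting retry returns to the queued state, and the mailbox is tried after all custom handlers. FlowStatus, statusAfterAttempt, and transportOrder are hypothetical names.

```typescript
// Hypothetical names for illustration; the package's internals may differ.
type FlowStatus = 'queued' | 'delivering' | 'delivered' | 'failed';

interface AttemptOutcome {
  success: boolean;
  attempts: number; // attempts made so far, including this one
  maxAttempts: number; // e.g. maxRetryAttempts
}

// Decide the status an alert moves to once a delivery attempt has finished.
function statusAfterAttempt(outcome: AttemptOutcome): FlowStatus {
  if (outcome.success) return 'delivered';
  // Assumption: a failed alert with retries left goes back to 'queued'.
  return outcome.attempts < outcome.maxAttempts ? 'queued' : 'failed';
}

// Custom handlers are tried first; the mailbox (if configured) comes last.
function transportOrder(handlerNames: string[], hasMailbox: boolean): string[] {
  return hasMailbox ? [...handlerNames, 'mailbox'] : [...handlerNames];
}

console.log(statusAfterAttempt({ success: false, attempts: 1, maxAttempts: 3 })); // queued
console.log(statusAfterAttempt({ success: false, attempts: 3, maxAttempts: 3 })); // failed
console.log(transportOrder(['websocket'], true).join(' -> ')); // websocket -> mailbox
```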
Exponential Backoff
Retry delays follow exponential backoff:
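delay = baseDelay × 2^(attempts - 1)
Here attempts is the number of delivery attempts already made. With default settings (baseDelay = 5s, delay capped at 60s):
- Attempt 1: immediate
- Attempt 2: +5s
- Attempt 3: +10s
- Attempt 4: +20s (only reached when maxRetryAttempts is raised above its default of 3)
Delays keep doubling on later attempts until they reach the 60s cap.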
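The formula and its cap can be checked with a few lines of code. This is a minimal sketch: retryDelayMs here is a hypothetical helper (not the option of the same name), and attempts is taken to mean the number of attempts already made.

```typescript
// Hypothetical helper: delay to wait before the next attempt.
// `attempts` is the number of delivery attempts already made.
function retryDelayMs(attempts: number, baseDelayMs = 5000, maxDelayMs = 60000): number {
  if (attempts < 1) return 0; // the first attempt is immediate
  return Math.min(baseDelayMs * 2 ** (attempts - 1), maxDelayMs);
}

// Delays before attempts 1 through 6 with the default base delay and cap.
const schedule = [0, 1, 2, 3, 4, 5].map((made) => retryDelayMs(made));
console.log(schedule.join(', ')); // 0, 5000, 10000, 20000, 40000, 60000
```

The uncapped delay before attempt 6 would be 80 seconds, so the cap is first applied at that point.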
TTL and Expiration
- Default TTL: 24 hours
- Maximum TTL: 48 hours
- Auto-purge: Configurable interval (default: 60s)
- Alerts are removed when now >= createdAt + ttl
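These rules translate into a few small helpers. The sketch below assumes an out-of-range TTL is rejected with an error (the package might clamp it instead). resolveTtl, isExpired, and the standalone purgeExpired function are hypothetical names.

```typescript
const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
const MAX_TTL_MS = 48 * 60 * 60 * 1000; // 48 hours (172,800,000 ms)

// Assumption: an out-of-range TTL is rejected; the package might clamp instead.
function resolveTtl(ttl?: number): number {
  if (ttl === undefined) return DEFAULT_TTL_MS;
  if (!Number.isFinite(ttl) || ttl <= 0 || ttl > MAX_TTL_MS) {
    throw new RangeError(`ttl must be a positive number no greater than ${MAX_TTL_MS} ms`);
  }
  return ttl;
}

interface Expirable {
  createdAt: number; // Unix timestamp (ms)
  ttl: number; // Time-to-live (ms)
}

// Expiration rule: now >= createdAt + ttl.
function isExpired(item: Expirable, now: number): boolean {
  return now >= item.createdAt + item.ttl;
}

// Remove expired entries from a Map and report how many were dropped.
function purgeExpired(store: Map<string, Expirable>, now: number): number {
  let purged = 0;
  store.forEach((item, id) => {
    if (isExpired(item, now)) {
      store.delete(id);
      purged += 1;
    }
  });
  return purged;
}

const store = new Map<string, Expirable>([
  ['short', { createdAt: 0, ttl: 300000 }],
  ['default', { createdAt: 0, ttl: resolveTtl() }],
]);
console.log(purgeExpired(store, 300000)); // 1
```

Note that the comparison is inclusive: an alert created at time 0 with a 5-minute TTL is already expired at exactly 300,000 ms.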
Mailbox Integration
When a mailbox is provided, alerts are automatically converted to mailbox messages:
{
  to: alert.to,
  from: alert.from,
  payload: JSON.stringify({
    type: 'alert',
    priority: alert.priority,
    title: alert.title,
    message: alert.message,
    metadata: alert.metadata,
  }),
  ttlSec: alert.ttl / 1000,
}
Performance Considerations
- Memory: All alerts stored in-memory (Map structure)
- Queue Operations: O(n log n) for dequeue (sorting), O(1) for enqueue
- Purge: O(n) for expired alert cleanup
- Delivery: O(handlers) per alert
- Query: O(n) with filtering
Best practices:
- Set appropriate TTLs to limit memory usage
- Use auto-purge to clean up expired alerts
- Monitor queue size with getStats()
- Use batch delivery for high-volume scenarios
- Configure retry limits based on delivery requirements
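One way to act on the monitoring advice is to turn the stats into a simple health check. The following is a rough sketch: it assumes stats.total counts the alerts currently held in the queue, and the 80% warning threshold is an arbitrary choice. QueueStatsLike, QueueHealth, and assessQueue are hypothetical names.

```typescript
// Subset of the stats fields used in the examples above.
interface QueueStatsLike {
  total: number;
  failedCount: number;
}

interface QueueHealth {
  fillRatio: number; // share of maxQueueSize in use
  failureRate: number; // failed alerts as a share of all alerts
  nearCapacity: boolean;
}

// Assumption: `total` counts alerts currently held in the queue.
function assessQueue(stats: QueueStatsLike, maxQueueSize = 10000, warnRatio = 0.8): QueueHealth {
  const fillRatio = stats.total / maxQueueSize;
  const failureRate = stats.total === 0 ? 0 : stats.failedCount / stats.total;
  return { fillRatio, failureRate, nearCapacity: fillRatio >= warnRatio };
}

const health = assessQueue({ total: 8500, failedCount: 17 });
console.log(health.nearCapacity); // true
```

With the real service, the input would come from service.getStats(), and a nearCapacity result could prompt shorter TTLs or a more frequent purge.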
Testing
Run tests:
pnpm test
Test coverage:
- 26 test cases
- AlertQueue: enqueue, dequeue, priority ordering, query, purge, statistics
- PushService: send, query, custom handlers, retry logic, batch delivery, auto-purge
- Edge cases: TTL enforcement, max queue size, priority ordering, retry exhaustion
License
MIT
Author
Aaron Rosenthal
