@quarry-systems/drift-event-listener
v0.1.0-alpha.1
Published
Event listener and webhook plugin for Drift
Maintainers
Readme
MCG Event Listener Plugin
A powerful event listening plugin for Managed Cyclic Graph (MCG) that enables nodes to pause and wait for external events from webhooks, polling, pub/sub systems, or custom sources.
Features
- ✅ Webhook Listening: Wait for HTTP webhook calls
- ✅ Polling: Poll endpoints until conditions are met
- ✅ Pub/Sub: Subscribe to topics and wait for messages
- ✅ Custom Listeners: Implement your own event sources
- ✅ Event Filtering: Validate events before accepting
- ✅ Event Transformation: Process event payloads
- ✅ Timeouts: Configure maximum wait times
- ✅ Callbacks: React to events and timeouts
Installation
npm install @quarry-systems/mcg-event-listenerQuick Start
Plugin-Based Approach
import { ManagedCyclicGraph } from '@quarry-systems/managed-cyclic-graph';
import { mcgEventListenerPlugin, webhook, pubsub } from '@quarry-systems/mcg-event-listener';
const graph = new ManagedCyclicGraph()
.use(mcgEventListenerPlugin)
.node('waitForPayment', {
type: 'eventnode',
meta: {
event: webhook('/payment-complete', {
secret: 'my-webhook-secret',
secretHeader: 'X-Webhook-Secret'
})
}
})
.node('waitForInventory', {
type: 'eventnode',
meta: {
event: pubsub('inventory-updated')
}
})
.build();Action-Based Approach
import { ManagedCyclicGraph } from '@quarry-systems/managed-cyclic-graph';
import { createEventAction, poll } from '@quarry-systems/mcg-event-listener';
const graph = new ManagedCyclicGraph()
.node('submitJob', {
execute: [
// ... submit job logic
]
})
.node('waitForCompletion', {
execute: [
createEventAction('waitForCompletion',
poll('https://api.example.com/job/status',
(response) => response.status === 'completed',
5000 // Poll every 5 seconds
)
)
]
})
.node('processResults', {
execute: [
// ... process results
]
})
.build();API Reference
Helper Functions
webhook(path: string, options?: WebhookConfig)
Create a webhook listener that waits for HTTP requests.
webhook('/payment-complete')
webhook('/order-update', {
method: 'POST',
secret: 'my-secret',
secretHeader: 'X-Webhook-Secret',
port: 3000
})poll(url: string, condition: (response) => boolean, intervalMs?: number)
Create a polling listener that checks an endpoint repeatedly.
poll(
'https://api.example.com/job/123',
(response) => response.status === 'done',
2000 // Check every 2 seconds
)pubsub(topic: string, provider?: 'memory' | 'redis' | 'custom')
Create a pub/sub listener that waits for messages on a topic.
pubsub('order-completed')
pubsub('inventory-updated', 'memory')custom(listen: (ctx) => Promise<any>)
Create a custom event listener with your own logic.
custom(async (ctx) => {
// Your custom event listening logic
return await myEventSource.waitForEvent();
})Configuration Options
interface EventListenerConfig {
source: 'webhook' | 'poll' | 'pubsub' | 'custom';
sourceConfig: WebhookConfig | PollConfig | PubSubConfig | CustomConfig;
timeoutMs?: number; // Timeout in milliseconds
filter?: (event: any) => boolean; // Event filter function
transform?: (event: any, ctx) => any; // Event transformation
storePath?: string; // Custom storage path
onEvent?: (event: any, ctx) => void; // Event callback
onTimeout?: (ctx) => void; // Timeout callback
}Webhook Configuration
interface WebhookConfig {
path: string; // Webhook endpoint path
method?: 'GET' | 'POST' | ...; // HTTP method
port?: number; // Port to listen on
secret?: string; // Validation secret
secretHeader?: string; // Header name for secret
}Poll Configuration
interface PollConfig {
url: string; // URL to poll
intervalMs: number; // Polling interval
maxAttempts?: number; // Max poll attempts
method?: 'GET' | 'POST'; // HTTP method
headers?: Record<string, string>; // Request headers
condition: (response: any) => boolean; // Success condition
}Pub/Sub Configuration
interface PubSubConfig {
topic: string; // Topic/channel name
provider?: 'memory' | 'redis' | 'custom';
subscribe?: (topic, callback) => unsubscribe; // Custom subscriber
}Examples
Payment Processing Workflow
const graph = new ManagedCyclicGraph()
.use(mcgEventListenerPlugin)
.node('createOrder', {
execute: [/* create order */]
})
.node('waitForPayment', {
type: 'eventnode',
meta: {
event: {
...webhook('/stripe-webhook'),
timeoutMs: 300000, // 5 minute timeout
filter: (event) => event.type === 'payment_intent.succeeded',
onTimeout: (ctx) => {
console.log('Payment timeout, canceling order');
}
}
}
})
.node('fulfillOrder', {
execute: [/* fulfill order */]
})
.build();Job Status Polling
const graph = new ManagedCyclicGraph()
.use(mcgEventListenerPlugin)
.node('submitJob', {
execute: [
{
id: 'submit',
run: async (ctx) => {
const jobId = await submitJob();
ctx.data.jobId = jobId;
return ctx;
}
}
]
})
.node('pollStatus', {
type: 'eventnode',
meta: {
event: poll(
'https://api.example.com/jobs/${data.jobId}',
(response) => response.status === 'completed',
5000
)
}
})
.node('processResults', {
execute: [/* process results */]
})
.build();Multi-Event Coordination
const graph = new ManagedCyclicGraph()
.use(mcgEventListenerPlugin)
.node('waitForInventory', {
type: 'eventnode',
meta: {
event: pubsub('inventory-available')
}
})
.node('waitForApproval', {
type: 'eventnode',
meta: {
event: pubsub('manager-approved')
}
})
.node('processOrder', {
execute: [/* both events received, process order */]
})
.build();Event Transformation
const graph = new ManagedCyclicGraph()
.use(mcgEventListenerPlugin)
.node('waitForWebhook', {
type: 'eventnode',
meta: {
event: {
...webhook('/data-update'),
transform: (event, ctx) => {
// Extract only what you need
return {
userId: event.body.user.id,
timestamp: event.body.timestamp,
changes: event.body.changes
};
},
onEvent: (event, ctx) => {
console.log('Received event:', event);
}
}
}
})
.build();Custom Event Source
import { custom } from '@quarry-systems/mcg-event-listener';
const graph = new ManagedCyclicGraph()
.use(mcgEventListenerPlugin)
.node('waitForCustomEvent', {
type: 'eventnode',
meta: {
event: custom(async (ctx) => {
// Connect to your custom event source
const connection = await connectToEventSource();
return new Promise((resolve) => {
connection.on('myEvent', (data) => {
resolve(data);
});
});
})
}
})
.build();Metadata Storage
Event metadata is stored in the context at data.events.{nodeId} by default:
{
data: {
events: {
waitForPayment: {
source: 'webhook',
receivedAt: 1701234567890,
event: { /* event payload */ },
timedOut: false
}
}
}
}Publishing Events
For testing or integration, you can publish events to the global event bus:
import { globalEventBus } from '@quarry-systems/mcg-event-listener';
// Publish to webhook
globalEventBus.publish('webhook:/payment-complete', {
method: 'POST',
headers: { 'X-Webhook-Secret': 'my-secret' },
body: { orderId: '123', status: 'paid' }
});
// Publish to pub/sub
globalEventBus.publish('order-completed', {
orderId: '123',
total: 99.99
});Use Cases
- Payment Processing: Wait for payment confirmations
- Job Completion: Poll job status until done
- Inventory Management: React to stock updates
- Approval Workflows: Wait for human approval
- External Integrations: Respond to third-party events
- Real-time Updates: React to live data changes
- Async Operations: Coordinate distributed systems
- Event-Driven Architecture: Build reactive workflows
Best Practices
- Always Set Timeouts to prevent infinite waits
- Use Filters to validate events before processing
- Transform Events to extract only needed data
- Add Callbacks for logging and monitoring
- Secure Webhooks with secrets and validation
- Poll Responsibly with appropriate intervals
- Handle Timeouts gracefully with fallback logic
Integration with Other Plugins
Combine with Timer plugin for retry logic:
import { mcgEventListenerPlugin, poll } from '@quarry-systems/mcg-event-listener';
import { mcgTimerPlugin, sleep } from '@quarry-systems/mcg-timer';
const graph = new ManagedCyclicGraph()
.use(mcgEventListenerPlugin)
.use(mcgTimerPlugin)
.node('pollJob', {
type: 'eventnode',
meta: {
event: poll(url, condition, 5000),
timeoutMs: 30000
}
})
.node('waitBeforeRetry', {
type: 'timernode',
meta: { timer: sleep(10000) }
})
.edge('pollJob', 'waitBeforeRetry', ['timedOut'])
.edge('waitBeforeRetry', 'pollJob', 'any')
.build();License
ISC
