@uploadista/event-emitter-durable-object
v0.1.0
Published
Durable Object event emitter for Uploadista
Downloads
2,073
Readme
@uploadista/event-emitter-durable-object
Cloudflare Durable Objects-based event emitter for Uploadista. Provides globally consistent event emission with real-time WebSocket coordination.
Overview
The Durable Objects event emitter uses Cloudflare Durable Objects for strongly-consistent event emission. Perfect for:
- Edge Deployment: Events coordinated globally at Cloudflare edge
- Real-Time Coordination: Strong consistency across all operations
- WebSocket Integration: Native persistent connection support
- Global Subscribers: Serve events to clients worldwide
- Transactional Events: ACID guarantees for complex workflows
Installation
npm install @uploadista/event-emitter-durable-object
# or
pnpm add @uploadista/event-emitter-durable-objectPrerequisites
- Cloudflare Workers with Durable Objects enabled
@cloudflare/workers-typesfor type definitions- Durable Objects bindings in
wrangler.toml
Quick Start
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
import type { UploadEvent } from "@uploadista/core/types";
import { Effect } from "effect";
export interface Env {
EVENT_EMITTER_DO: DurableObjectNamespace;
}
export default {
async fetch(request: Request, env: Env) {
const program = Effect.gen(function* () {
// Event emitter is available
});
return Effect.runPromise(
program.pipe(
Effect.provide(
uploadEventEmitterDurableObjectStore({
durableObject: env.EVENT_EMITTER_DO,
})
)
)
);
},
};Features
- ✅ Strong Consistency: ACID properties for events
- ✅ Global Edge: Events coordinated at 300+ edge locations
- ✅ WebSocket Native: Built-in persistent connections
- ✅ Real-Time: Sub-10ms event delivery globally
- ✅ Transactional: Multiple operations as one unit
- ✅ Durable: Events persisted automatically
API Reference
Main Exports
uploadEventEmitterDurableObjectStore(config): Layer<UploadEventEmitter>
Creates an Effect layer for emitting upload events via Durable Objects.
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
const layer = uploadEventEmitterDurableObjectStore({
durableObject: env.EVENT_EMITTER_DO,
});makeEventEmitterDurableObjectStore<T>(config): EventEmitter<T>
Factory function for typed event emitter.
const emitter = makeEventEmitterDurableObjectStore<CustomEvent>({
durableObject: env.EVENT_EMITTER_DO,
});Available Operations
emit(key: string, event: T): Effect<void>
Emit event to all subscribers globally.
const program = Effect.gen(function* () {
yield* emitter.emit("upload:abc123", {
type: "completed",
duration: 45000,
size: 1048576,
});
});subscribe(key: string, connection): Effect<void>
Subscribe a WebSocket connection to events.
const program = Effect.gen(function* () {
yield* emitter.subscribe("upload:abc123", wsConnection);
});unsubscribe(key: string): Effect<void>
Unsubscribe from events.
const program = Effect.gen(function* () {
yield* emitter.unsubscribe("upload:abc123");
});Configuration
Basic Setup in wrangler.toml
name = "uploadista-worker"
main = "src/index.ts"
[[durable_objects.bindings]]
name = "EVENT_EMITTER_DO"
class_name = "EventEmitterDurableObject"
[env.production]
durable_objects = { bindings = [{name = "EVENT_EMITTER_DO", class_name = "EventEmitterDurableObject"}] }Worker Environment
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
export interface Env {
EVENT_EMITTER_DO: DurableObjectNamespace;
}
export default {
async fetch(request: Request, env: Env) {
const program = Effect.gen(function* () {
// Use event emitter
});
return Effect.runPromise(
program.pipe(
Effect.provide(
uploadEventEmitterDurableObjectStore({
durableObject: env.EVENT_EMITTER_DO,
})
)
)
);
},
};Examples
Example 1: Real-Time Upload Tracking
Server emits progress, clients receive globally:
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
import type { UploadEvent } from "@uploadista/core/types";
const trackUploadProgress = (uploadId: string) =>
Effect.gen(function* () {
// Emit start event
yield* emitter.emit(uploadId, {
type: "started",
timestamp: new Date().toISOString(),
});
// Simulate progress
for (let i = 0; i <= 100; i += 25) {
yield* Effect.sleep("1 second");
yield* emitter.emit(uploadId, {
type: "progress",
progress: i / 100,
bytesReceived: Math.floor((i / 100) * 1048576),
timestamp: new Date().toISOString(),
});
}
// Emit completion
yield* emitter.emit(uploadId, {
type: "completed",
timestamp: new Date().toISOString(),
duration: 4000,
});
});Client-side (browser, anywhere globally):
const ws = new WebSocket("wss://uploadista.example.com/events");
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === "progress") {
updateProgressBar(message.progress);
} else if (message.type === "completed") {
showSuccess("Upload complete!");
}
};Example 2: Coordinated Multi-Step Workflow
const executeUploadWorkflow = (uploadId: string) =>
Effect.gen(function* () {
// Step 1: Validate
yield* emitter.emit(uploadId, {
type: "workflow",
step: "validating",
details: "Checking file format...",
});
// Step 2: Process
yield* emitter.emit(uploadId, {
type: "workflow",
step: "processing",
details: "Resizing images...",
});
// Step 3: Store
yield* emitter.emit(uploadId, {
type: "workflow",
step: "storing",
details: "Uploading to R2...",
});
// Completion
yield* emitter.emit(uploadId, {
type: "workflow",
step: "completed",
details: "All done!",
});
});Example 3: Global WebSocket Broadcast
// Upgrade HTTP to WebSocket
if (request.headers.get("Upgrade") === "websocket") {
const { 0: client, 1: server } = new WebSocketPair();
const uploadId = new URL(request.url).searchParams.get("uploadId");
const program = Effect.gen(function* () {
// Subscribe this client
yield* emitter.subscribe(uploadId, server);
});
await Effect.runPromise(program);
return new Response(null, { status: 101, webSocket: client });
}Performance
| Operation | Latency | Range | |-----------|---------|-------| | emit() | 5-10ms | Global | | subscribe() | 1-3ms | Global | | unsubscribe() | 1-3ms | Global | | Message delivery | 10-50ms | Any edge location |
Strong consistency: All subscribers see same events in same order.
Limits & Quotas
| Limit | Value | |-------|-------| | Events per object | Unlimited | | Simultaneous WebSockets | 128 per object | | Storage | 128 MB per object | | Event size | 512 KB recommended |
Partition large event streams across multiple objects if needed.
Best Practices
1. Use Consistent Event Keys
// Good: Specific upload ID
"upload:abc123"
"flow:job:xyz"
"user:upload:456"
// Avoid: Generic
"events", "status", "updates"2. Handle WebSocket Lifecycle
// Client connects
yield* emitter.subscribe(uploadId, wsConnection);
// Client disconnects
ws.addEventListener("close", () => {
yield* emitter.unsubscribe(uploadId);
});3. Structure Events Clearly
interface UploadEvent {
type: "started" | "progress" | "completed" | "error";
timestamp: string;
progress?: number;
error?: string;
metadata?: Record<string, any>;
}Deployment
Cloudflare Workers Deployment
# Deploy to Cloudflare
wrangler publish
# To specific environment
wrangler publish --env productionGitHub Actions
name: Deploy
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: cloudflare/wrangler-action@v3
with:
apiToken: ${{ secrets.CLOUDFLARE_API_TOKEN }}Monitoring
Track Event Emission
const emitWithMetrics = (key: string, event: UploadEvent) =>
Effect.gen(function* () {
const start = Date.now();
yield* emitter.emit(key, event);
const duration = Date.now() - start;
console.log(`Event emitted: ${key} (${duration}ms)`);
});Monitor Durable Objects
Use Cloudflare Dashboard:
- "Durable Objects" in Workers analytics
- Monitor storage usage
- Track request rate
Troubleshooting
"Durable Object not found"
Ensure binding defined in wrangler.toml:
[[durable_objects.bindings]]
name = "EVENT_EMITTER_DO"
class_name = "EventEmitterDurableObject"WebSocket Disconnections
Implement reconnect logic:
const reconnectWebSocket = () => {
ws = new WebSocket(`wss://${host}/events?uploadId=${uploadId}`);
ws.onopen = () => {
console.log("Reconnected");
};
ws.onclose = () => {
setTimeout(reconnectWebSocket, 1000);
};
};Storage Exceeds 128MB
Partition events across multiple objects:
const getObjectId = (uploadId: string) => {
// Deterministic partitioning
const hash = uploadId.charCodeAt(0);
return `events-${hash % 10}`;
};Integration Patterns
With KV Cache
// Emit to Durable Objects
yield* emitter.emit(uploadId, event);
// Also cache in KV
yield* kv.put(`event:${uploadId}:latest`, JSON.stringify(event));With R2 Storage
// Emit event
yield* emitter.emit(uploadId, { type: "completed" });
// Store result in R2
await env.R2.put(`uploads/${uploadId}/result.json`, resultData);Related Packages
- @uploadista/core - Core types
- @uploadista/kv-store-cloudflare-do - Durable Objects KV
- @uploadista/adapters-hono - Hono for Workers
- @uploadista/event-emitter-websocket - WebSocket emitter
License
See LICENSE in the main repository.
See Also
- EVENT_SYSTEM.md - Architecture guide
- Cloudflare Durable Objects - Official docs
- Server Setup Guide - Deployment guide
- WebSocket Emitter - Alternative for single-region
