@kcharfi/orchestrator-client-node
v1.1.0
Node.js client for Task Orchestrator with auto work requesting and concurrent task processing
Distributed Task Orchestrator Node.js Client
Lean Node.js client library for interacting with the OnlineCNC Task Orchestrator.
Features
✅ Auto Work Requesting: Workers automatically request new tasks after completing current ones
✅ Concurrent Task Processing: Configure workers to handle multiple tasks simultaneously
✅ Automatic Heartbeats: Workers send regular heartbeats with status information
✅ Graceful Shutdown: Clean disconnection with active task tracking
✅ File Storage: Upload/download files via NATS to multiple storage providers (MinIO, S3, Proxy)
✅ TypeScript Support: Full type definitions included
Installation
npm install @onlinecnc/orchestrator-client-node
Quick Start
Simple Worker with Auto Work Requesting
import { TaskWorker } from '@onlinecnc/orchestrator-client-node';
const worker = new TaskWorker({
natsUrl: 'nats://localhost:4222',
workerId: 'my-worker',
autoRequestWork: true, // Auto request work (default: true)
maxConcurrentTasks: 3 // Handle up to 3 tasks at once (default: 1)
});
await worker.connect();
// Register handler and start auto work - easiest method
await worker.registerAndRequestWork('my-task', async (execution) => {
console.log('Processing:', execution.inputs);
return { result: 'success' };
});
// Worker will now automatically request new tasks after each completion
Usage
Publisher Client
Use the publisher to create tasks and resolve promises:
import { TaskPublisher } from '@onlinecnc/orchestrator-client-node';
const publisher = new TaskPublisher({
natsUrl: 'nats://localhost:4222'
});
await publisher.connect();
// Create a simple task
const taskId = await publisher.createTask({
handler: 'send-email',
metadata: { recipient: '[email protected]' }
});
// Create a promise
const promiseId = await publisher.createPromise();
// Create a task with dependencies
const { taskId: taskId2, resultPromiseIds } = await publisher.createTaskWithDependencies(
'process-data',
{ inputData: promiseId }, // dependencies
['processedData', 'summary'], // result keys
{
scheduling: { max_retries: 3 },
metadata: { priority: 'high' }
}
);
// Resolve a promise
await publisher.resolvePromise(promiseId, { data: 'some value' });
await publisher.disconnect();
Worker Client
Basic Worker Setup
import { TaskWorker } from '@onlinecnc/orchestrator-client-node';
const worker = new TaskWorker({
natsUrl: 'nats://localhost:4222',
workerId: 'worker-001',
capabilities: { region: 'us-east' },
// Auto work requesting (enabled by default)
autoRequestWork: true,
// Maximum concurrent tasks (default: 1)
maxConcurrentTasks: 5
});
await worker.connect();
Register Handlers and Start Auto Work
// Register task handlers
worker.registerHandler('send-email', async (execution) => {
console.log(`Sending email to: ${execution.inputs.recipient}`);
await new Promise(resolve => setTimeout(resolve, 1000));
return { messageId: 'msg-123', sent: true };
});
worker.registerHandler('process-data', async (execution) => {
const inputData = execution.inputs.inputData;
const processed = { ...inputData, processed: true };
const summary = `Processed ${Object.keys(inputData).length} fields`;
return {
processedData: processed,
summary: summary
};
});
// Start automatic work requesting for all registered handlers
await worker.startAutoWork(['send-email', 'process-data']);
// Worker will now continuously request and process tasks
Alternative: Manual Work Requesting
// For manual control over work requests
const worker = new TaskWorker({
natsUrl: 'nats://localhost:4222',
autoRequestWork: false // Disable auto requesting
});
await worker.connect();
worker.registerHandler('my-task', async (execution) => {
// Task logic
return { result: 'done' };
});
// Manually request work when needed
await worker.requestWork(['my-task']);
Worker Status and Control
// Get current worker status
const status = worker.getStatus();
console.log(`Active tasks: ${status.activeTasks}/${status.maxConcurrentTasks}`);
console.log(`Auto work: ${status.autoRequestWork}`);
console.log(`Handlers: ${status.registeredHandlers.join(', ')}`);
// Control auto work requesting
worker.stopAutoWork(); // Stop automatic requests
await worker.startAutoWork(); // Restart for all registered handlers
Configuration Options
TaskWorker Config
interface OrchestratorClientConfig {
natsUrl: string; // NATS server URL
workerId?: string; // Unique worker ID (auto-generated if not provided)
capabilities?: Record<string, any>; // Worker capabilities metadata
autoRequestWork?: boolean; // Enable automatic work requesting (default: true)
maxConcurrentTasks?: number; // Maximum concurrent tasks (default: 1)
}
API Reference
TaskPublisher
- connect(): Connect to NATS
- disconnect(): Disconnect from NATS
- createTask(request): Create a new task
- createPromise(value?, type?): Create a new promise
- resolvePromise(promiseId, value): Resolve a promise
- resolvePromises(resolutions): Batch resolve promises
- createTaskWithDependencies(handler, dependencies, resultKeys, options?): Helper for complex task creation
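To make the relationship between promises and dependent tasks easier to picture, here is a minimal in-memory sketch. It does not use the library or NATS. It assumes, based only on the examples in this README, that a task created with dependencies waits until every promise it references is resolved, and that the resolved values then appear under the same keys in execution.inputs. DependencyModel, PendingTask, isReady and inputsFor are hypothetical names invented for this illustration.

```typescript
// In-memory stand-in for the orchestrator's bookkeeping.
// Every name in this block is hypothetical, not part of the library.
type PromiseId = string;

interface PendingTask {
  handler: string;
  dependencies: Record<string, PromiseId>; // input name -> promise ID
}

class DependencyModel {
  private resolved = new Map<PromiseId, unknown>();
  private nextId = 0;

  // Hands out a fresh, unresolved promise ID.
  createPromise(): PromiseId {
    this.nextId += 1;
    return `promise-${this.nextId}`;
  }

  // Records a value for a promise ID.
  resolvePromise(id: PromiseId, value: unknown): void {
    this.resolved.set(id, value);
  }

  // Assumption: a task is ready once every promise it depends on has a value.
  isReady(task: PendingTask): boolean {
    return Object.values(task.dependencies).every((id) => this.resolved.has(id));
  }

  // Builds the object a handler would receive as execution.inputs (assumed shape).
  inputsFor(task: PendingTask): Record<string, unknown> {
    const inputs: Record<string, unknown> = {};
    for (const [name, id] of Object.entries(task.dependencies)) {
      inputs[name] = this.resolved.get(id);
    }
    return inputs;
  }
}

const model = new DependencyModel();
const promiseId = model.createPromise();
const task: PendingTask = {
  handler: 'process-data',
  dependencies: { inputData: promiseId },
};

console.log(model.isReady(task)); // false
model.resolvePromise(promiseId, { data: 'some value' });
console.log(model.isReady(task)); // true
console.log(model.inputsFor(task)); // { inputData: { data: 'some value' } }
```

The real orchestrator may apply additional rules (retries, scheduling, promise types) that this sketch leaves out.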
TaskWorker
Connection Management
- connect(): Connect to NATS and start listening
- disconnect(): Disconnect from NATS
Handler Management
- registerHandler(name, handler): Register a task handler function
- registerAndRequestWork(name, handler): Register handler and start auto work
Work Management
- requestWork(handlers): Manually request work for specific handlers
- stopAutoWork(): Stop automatic work requesting
- getStatus(): Get current worker status
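When auto requesting is disabled, the caller decides when to call requestWork. The sketch below shows one possible policy built on the status fields documented under Worker Status. The helpers freeSlots and shouldRequestWork are hypothetical and not part of the library, and the WorkerStatus interface is repeated here only to keep the block self-contained.

```typescript
// Mirrors the WorkerStatus fields documented in this README.
interface WorkerStatus {
  workerId: string;
  activeTasks: number;
  maxConcurrentTasks: number;
  autoRequestWork: boolean;
  registeredHandlers: string[];
  autoRequestHandlers: string[];
}

// Number of additional tasks the worker could take on right now (never negative).
function freeSlots(status: WorkerStatus): number {
  return Math.max(0, status.maxConcurrentTasks - status.activeTasks);
}

// Hypothetical manual-mode policy: request work only when auto requesting
// is off, a slot is free, and at least one handler is registered.
function shouldRequestWork(status: WorkerStatus): boolean {
  return (
    !status.autoRequestWork &&
    freeSlots(status) > 0 &&
    status.registeredHandlers.length > 0
  );
}

const example: WorkerStatus = {
  workerId: 'worker-001',
  activeTasks: 2,
  maxConcurrentTasks: 5,
  autoRequestWork: false,
  registeredHandlers: ['send-email'],
  autoRequestHandlers: [],
};

console.log(freeSlots(example)); // 3
console.log(shouldRequestWork(example)); // true
```

With a connected worker, this could be combined with the documented calls, for example by passing worker.getStatus() to shouldRequestWork and, when it returns true, calling worker.requestWork with the registered handler names.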
Task Execution
- publishProgress(progress): Manually publish task progress
- publishCompletion(completion): Manually publish task completion
Worker Status
interface WorkerStatus {
workerId: string;
activeTasks: number;
maxConcurrentTasks: number;
autoRequestWork: boolean;
registeredHandlers: string[];
autoRequestHandlers: string[];
}
File Storage
Upload and download files through the ObjectStorageBrokerService via NATS. Supports multiple storage providers (MinIO, AWS S3, Proxy servers) with automatic URL generation and secure access.
Quick Upload
import { uploadFile } from '@onlinecnc/orchestrator-client-node';
// Upload a file - simple case
const result = await uploadFile(file, 'my-bucket', {
workerId: 'ai-worker',
natsUrl: 'nats://localhost:4222'
});
if (result.success) {
console.log(`File uploaded to ${result.bucket}/${result.objectKey}`);
} else {
console.error('Upload failed:', result.error);
}
FileStorageClient
For multiple operations, use the FileStorageClient:
import { FileStorageClient } from '@onlinecnc/orchestrator-client-node';
const client = new FileStorageClient({
natsUrl: 'nats://localhost:4222',
defaultWorkerId: 'ai-worker',
defaultTimeout: 30000
});
await client.connect();
// Upload with progress tracking
const uploadResult = await client.uploadFile(file, 'proxy-ai-files', {
objectKey: 'custom/path/file.txt',
tags: { project: 'test', version: '1.0' },
onProgress: (progress) => {
console.log(`Progress: ${Math.round(progress.loaded/progress.total*100)}%`);
}
});
// Get download URL
const downloadResult = await client.downloadFile(
uploadResult.bucket!,
uploadResult.objectKey!
);
await client.disconnect();
File-like Objects
The client accepts various file-like objects:
// Browser File object
const fileInput = document.getElementById('file');
await uploadFile(fileInput.files[0]);
// Blob
const blob = new Blob(['Hello world'], { type: 'text/plain' });
await uploadFile(blob);
// Node.js - Custom file object
const file = {
size: buffer.length,
type: 'application/json',
name: 'data.json',
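// A Node.js Buffer can be a view into a larger shared ArrayBuffer, so copy only its own bytes
async arrayBuffer() { return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength); }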
};
await uploadFile(file);
Storage Providers
The system automatically routes to different storage providers based on bucket names:
- Proxy Provider: proxy-* buckets → JWT token-based proxy server
- MinIO Provider: *-worker-files buckets → Direct MinIO presigned URLs
- AWS S3 Provider: prod-* buckets → AWS S3 presigned URLs
Worker permissions determine which buckets and providers are accessible.
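As an illustration of the naming convention above, the following sketch maps a bucket name to the provider it would be routed to. It is not the broker's implementation: the provider labels, the guessProvider helper, and the order in which overlapping patterns are checked are all assumptions. Permission checks are not modeled.

```typescript
// Hypothetical provider labels; the real broker may name these differently.
type StorageProvider = 'proxy' | 'minio' | 's3';

// Bucket-name patterns from the list above, checked in the order listed.
// The precedence between overlapping patterns is an assumption.
const ROUTES: Array<{ pattern: RegExp; provider: StorageProvider }> = [
  { pattern: /^proxy-/, provider: 'proxy' },        // proxy-*
  { pattern: /-worker-files$/, provider: 'minio' }, // *-worker-files
  { pattern: /^prod-/, provider: 's3' },            // prod-*
];

// Returns the provider a bucket name would map to, or undefined if no pattern matches.
function guessProvider(bucket: string): StorageProvider | undefined {
  const match = ROUTES.find((route) => route.pattern.test(bucket));
  return match ? match.provider : undefined;
}

console.log(guessProvider('proxy-ai-files'));    // 'proxy'
console.log(guessProvider('team-worker-files')); // 'minio'
console.log(guessProvider('prod-assets'));       // 's3'
console.log(guessProvider('scratch'));           // undefined
```

A helper like this can be handy for catching a misnamed bucket on the client side before an upload is attempted.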
File Storage Types
interface FileUploadOptions {
bucket?: string; // Target bucket (optional)
objectKey?: string; // Custom object path (optional)
workerId?: string; // Worker identity
timeout?: number; // Request timeout
tags?: Record<string, string>; // File metadata
onProgress?: (progress: { loaded: number; total: number }) => void; // Progress callback
}
interface FileStorageConfig {
natsUrl: string; // NATS server URL
defaultWorkerId?: string; // Default worker ID
defaultTimeout?: number; // Default timeout
}
Types
All TypeScript interfaces are exported for type safety:
- TaskCreationRequest, TaskExecution, TaskProgress, TaskCompletion
- WorkerHandlerRequest, OrchestratorClientConfig, TaskHandler
- FileUploadOptions, FileDownloadOptions, FileStorageConfig
- UploadRequest, UploadResponse, DownloadRequest, DownloadResponse
Examples
See the examples/ directory for complete working examples:
- simple-worker.js - Basic worker with auto work requesting
- worker-example.js - Advanced worker with multiple handlers and concurrent processing
- file-upload-example.js - File upload/download with multiple storage providers
