@bernierllc/data-ingestion-orchestrator
Orchestration service for managing data ingestion jobs and workflows with scheduling and coordination.
Overview
The data ingestion orchestrator coordinates data collection from multiple sources using different connector types (API, DOWNLOAD, FTP, SCRAPE). It manages job scheduling, workflow execution, error handling, and integrates with file storage.
Installation
npm install @bernierllc/data-ingestion-orchestrator
Usage
Basic Job Registration
import { DataIngestionOrchestrator } from '@bernierllc/data-ingestion-orchestrator';
import { ConnectorAccessMethod } from '@bernierllc/data-connector-base';
const orchestrator = new DataIngestionOrchestrator();
// Register an API job
await orchestrator.registerJob({
  id: 'daily-api-fetch',
  name: 'Daily API Data Fetch',
  accessMethod: ConnectorAccessMethod.API,
  connectorConfig: {
    apiType: 'rest',
    baseURL: 'https://api.example.com',
    auth: {
      type: 'bearer',
      token: 'your-token'
    }
  },
  schedule: '0 0 * * *', // Daily at midnight
  enabled: true,
  metadata: {
    store: true // Automatically store results
  }
});

// Execute job manually
const result = await orchestrator.executeJob('daily-api-fetch');
console.log(`Job ${result.status}:`, result.data);

Different Connector Types
// API Connector
await orchestrator.registerJob({
  id: 'api-job',
  accessMethod: ConnectorAccessMethod.API,
  connectorConfig: {
    apiType: 'rest',
    baseURL: 'https://api.example.com'
  }
});

// Download Connector
await orchestrator.registerJob({
  id: 'download-job',
  accessMethod: ConnectorAccessMethod.DOWNLOAD,
  connectorConfig: {
    baseURL: 'https://example.com'
  }
});

// FTP Connector
await orchestrator.registerJob({
  id: 'ftp-job',
  accessMethod: ConnectorAccessMethod.FTP,
  connectorConfig: {
    protocol: 'sftp',
    host: 'sftp.example.com',
    username: 'user',
    privateKey: '...'
  }
});

// Web Scraper
await orchestrator.registerJob({
  id: 'scrape-job',
  accessMethod: ConnectorAccessMethod.SCRAPE,
  connectorConfig: {
    url: 'https://example.com',
    headless: true
  }
});

Workflow Execution
// Execute a multi-step workflow
const workflow = {
  id: 'data-pipeline',
  name: 'Data Collection Pipeline',
  steps: [
    {
      id: 'fetch-api',
      type: 'connector',
      config: {
        accessMethod: ConnectorAccessMethod.API,
        apiType: 'rest',
        baseURL: 'https://api.example.com'
      }
    },
    {
      id: 'transform',
      type: 'transform',
      config: {
        transform: (data) => {
          // Mark each item as processed
          return data.map(item => ({ ...item, processed: true }));
        }
      },
      dependsOn: ['fetch-api']
    },
    {
      id: 'store',
      type: 'store',
      config: {
        filename: 'processed-data.json',
        contentType: 'application/json'
      },
      dependsOn: ['transform']
    }
  ]
};

const results = await orchestrator.executeWorkflow(workflow);

Job Management
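Before looking at individual job management, it may help to see how a workflow's dependsOn graph can be turned into an execution order. The package does this internally; the topoSort helper below is a minimal, self-contained sketch for illustration only (it assumes an acyclic graph and is not part of this package's API):

```typescript
// Illustrative only: order workflow steps so each runs after its dependencies.
// Assumes the dependsOn graph is acyclic (no cycle detection here).
interface Step {
  id: string;
  dependsOn?: string[];
}

function topoSort(steps: Step[]): string[] {
  const ordered: string[] = [];
  const visited = new Set<string>();
  const byId = new Map<string, Step>(steps.map((s) => [s.id, s]));

  function visit(id: string): void {
    if (visited.has(id)) return;
    visited.add(id);
    for (const dep of byId.get(id)?.dependsOn ?? []) visit(dep);
    ordered.push(id); // emitted only after all of its dependencies
  }

  for (const step of steps) visit(step.id);
  return ordered;
}

const order = topoSort([
  { id: 'store', dependsOn: ['transform'] },
  { id: 'fetch-api' },
  { id: 'transform', dependsOn: ['fetch-api'] },
]);
// order → ['fetch-api', 'transform', 'store']
```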
// List all jobs
const jobs = orchestrator.listJobs();
console.log(`Registered jobs: ${jobs.length}`);

// Get job status
const job = orchestrator.getJobStatus('daily-api-fetch');
console.log(`Job enabled: ${job?.enabled}`);

// Remove job
await orchestrator.removeJob('daily-api-fetch');

Scheduled Jobs
// Schedule a job to run every hour
await orchestrator.registerJob({
  id: 'hourly-sync',
  name: 'Hourly Data Sync',
  accessMethod: ConnectorAccessMethod.API,
  connectorConfig: { /* ... */ },
  schedule: '0 * * * *', // Every hour
  enabled: true
});

// Jobs run automatically based on schedule
// You can also execute manually:
const result = await orchestrator.executeJob('hourly-sync');

Integration with Connectors
The orchestrator automatically creates the appropriate connector for each job based on its accessMethod:
ConnectorAccessMethod.API → @bernierllc/api-connector
ConnectorAccessMethod.DOWNLOAD → @bernierllc/download-connector
ConnectorAccessMethod.FTP → @bernierllc/ftp-connector
ConnectorAccessMethod.SCRAPE → @bernierllc/web-scraper
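Conceptually, this dispatch is a lookup from access method to connector package. The sketch below illustrates that mapping using the pairs listed above; the AccessMethod type and resolveConnectorPackage function are hypothetical names, not part of this package's API:

```typescript
// Illustrative sketch of the accessMethod → connector-package dispatch.
type AccessMethod = 'API' | 'DOWNLOAD' | 'FTP' | 'SCRAPE';

const CONNECTOR_PACKAGES: Record<AccessMethod, string> = {
  API: '@bernierllc/api-connector',
  DOWNLOAD: '@bernierllc/download-connector',
  FTP: '@bernierllc/ftp-connector',
  SCRAPE: '@bernierllc/web-scraper',
};

function resolveConnectorPackage(method: AccessMethod): string {
  return CONNECTOR_PACKAGES[method];
}
// resolveConnectorPackage('FTP') → '@bernierllc/ftp-connector'
```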
Dependencies
@bernierllc/queue-manager - Job queue management
@bernierllc/scheduler-service - Cron scheduling
@bernierllc/file-handler - File storage
@bernierllc/api-connector - API connector
@bernierllc/download-connector - Download connector
@bernierllc/ftp-connector - FTP connector
@bernierllc/web-scraper - Web scraper
node-cron - Cron expression parsing
API Reference
DataIngestionOrchestrator
Constructor
new DataIngestionOrchestrator(config?: OrchestratorConfig)

Methods
registerJob(job: JobConfig): Promise<void> - Register a new job
executeJob(jobId: string): Promise<JobResult> - Execute a job
executeWorkflow(workflow: WorkflowConfig): Promise<JobResult[]> - Execute a workflow
getJobStatus(jobId: string): JobConfig | undefined - Get job configuration
listJobs(): JobConfig[] - List all registered jobs
removeJob(jobId: string): Promise<void> - Remove a job
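Since ingestion sources can be flaky, callers may want to wrap executeJob in a retry. The helper below is a generic, self-contained sketch, not part of this package; it assumes the wrapped function rejects its promise on failure:

```typescript
// Illustrative retry wrapper for flaky ingestion calls (not package API).
// Assumes the wrapped function rejects on failure.
async function executeWithRetry<T>(
  run: () => Promise<T>,
  attempts = 3,
  delayMs = 1000,
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await run();
    } catch (err) {
      lastError = err;
      // Wait before retrying, except after the final attempt
      if (i < attempts - 1) await new Promise((r) => setTimeout(r, delayMs));
    }
  }
  throw lastError;
}
```

Example usage with the orchestrator: `const result = await executeWithRetry(() => orchestrator.executeJob('daily-api-fetch'));`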
License
Bernier LLC - Limited Use License
