nexflow-caas
v1.0.1
NexFlow CAAS - Cron & Keep-Alive Oracle for Virtuals ACP
NexFlow CAAS
Cron & Keep-Alive Oracle - A production-grade scheduled job service for the Virtuals ACP ecosystem.
Overview
NexFlow CAAS provides reliable, durable scheduled task execution as a service:
- Scheduled Jobs: Schedule tasks to run at specific times or after delays
- Webhook Delivery: Automatically fires HTTP webhooks when jobs are due
- Retry Logic: Exponential backoff with configurable retry limits
- ACP Integration: Native integration with Virtuals Protocol ACP as a provider agent
Design Guarantees
| Guarantee | Description |
|-----------|-------------|
| At-Least-Once Delivery | Jobs will be delivered at least once. Duplicates are possible on network failures. |
| Max 5 Retries | Failed deliveries retry up to 5 times with exponential backoff (30s → 1h). |
| Idempotent Callbacks Expected | Buyer agents should handle duplicate webhook calls gracefully. |
| Durable Storage | All jobs are persisted to PostgreSQL before acknowledgment. |
| Graceful Shutdown | In-flight batches complete before process termination. |
| Row-Level Locking | FOR UPDATE SKIP LOCKED prevents duplicate processing across workers. |
For Virtuals Builders: Design your callback handlers to be idempotent. Use the `jobId` to deduplicate if you receive the same webhook twice.
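One way a buyer agent might deduplicate deliveries is sketched below. This is a minimal illustration, not part of NexFlow CAAS: the `handleWebhookOnce` function and the in-memory `processedJobIds` set are hypothetical, and a real handler would more likely persist seen job IDs in a database or cache.

```typescript
// Hypothetical in-memory dedup store (assumption: a single-process handler).
// A production handler would likely use a table with a unique jobId column.
const processedJobIds = new Set<string>();

// Subset of the webhook body fields documented in "Webhook Payload".
interface WebhookBody {
  jobId: string;
  triggered_at: string;
  payload: Record<string, unknown>;
}

// Runs sideEffect at most once per jobId.
// Returns true if the side effect ran, false if the delivery was a duplicate.
function handleWebhookOnce(
  body: WebhookBody,
  sideEffect: (payload: Record<string, unknown>) => void,
): boolean {
  if (processedJobIds.has(body.jobId)) {
    return false; // duplicate delivery: acknowledge without re-running
  }
  sideEffect(body.payload);
  // Record the jobId only after the side effect succeeded, so that a thrown
  // error leaves the job eligible for a later retry.
  processedJobIds.add(body.jobId);
  return true;
}
```

Recording the ID after the side effect keeps failed deliveries retryable, at the cost of a small window where two concurrent duplicates could both run; a unique constraint in durable storage would close that window.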
Architecture
┌─────────────────────────────────────────────────────────────┐
│ NexFlow CAAS │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ API Server │ │ Scheduler │ │
│ │ (Fastify) │ │ Worker │ │
│ │ │ │ │ │
│ │ POST /acp/ │ │ Poll every 60s │ │
│ │ schedule-task │ │ Fetch due jobs │ │
│ │ │ │ Fire webhooks │ │
│ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │
│ └───────────┬───────────────┘ │
│ │ │
│ ┌─────────▼─────────┐ │
│ │ PostgreSQL │ │
│ │ scheduled_jobs │ │
│ └───────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Kernel DTO: ScheduleJobRequest
All job creation flows through a single protocol-neutral DTO:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ REST API │ │ ACP Client │ │ Internal │
│ /acp/* │ │ (Virtuals) │ │ (SMF) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
│ mapLegacy...() │ acpInput...() │ (direct)
▼ ▼ ▼
┌────────────────────────────────────────────────────────────┐
│ ScheduleJobRequest (Kernel DTO) │
│ tenantId | source | callbackUrl | triggerType | payload │
└──────────────────────────┬─────────────────────────────────┘
│
▼
scheduleJobFromKernel()
│
▼
PostgreSQL

| Field | Description |
|-------|-------------|
| tenantId | Generic identifier (ACP buyer agent, customer ID, SMF ID) |
| source | Origin: 'ACP', 'HTTP', or 'INTERNAL' |
| callbackUrl | HTTPS webhook URL |
| triggerType | 'AT_TIME' or 'IN_DELAY' |
| payload | JSON payload (max 16 KB) |
| metadata | Optional context (ACP task ID, correlation ID) |
This design enables:
- NexFlow public API → same kernel
- ACP integration → same kernel
- SMF agent → same kernel
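The mapping from a REST request body to the kernel DTO could look roughly like the sketch below. The field names come from the table above and from the API examples in this README. The `toKernelRequest` function, the `triggerTime` field, and the reading of "16 KB" as 16 × 1024 bytes of serialized JSON are assumptions made for illustration and are not taken from the actual source.

```typescript
type Source = "ACP" | "HTTP" | "INTERNAL";
type TriggerType = "AT_TIME" | "IN_DELAY";

// Fields from the Kernel DTO table; triggerTime is an assumed addition that
// holds the resolved absolute fire time.
interface ScheduleJobRequest {
  tenantId: string;
  source: Source;
  callbackUrl: string;
  triggerType: TriggerType;
  triggerTime: Date;
  payload: unknown;
  metadata?: Record<string, string>;
}

// Shape of the REST body shown in the API Reference examples.
interface RestScheduleBody {
  buyer_agent_id: string;
  callback_url: string;
  trigger_type: TriggerType;
  trigger_time?: string;
  delay_seconds?: number;
  payload: unknown;
}

const MAX_PAYLOAD_BYTES = 16 * 1024; // assumption: 16 KB means 16384 bytes

// Hypothetical adapter: validates a REST body and converts it to the DTO.
function toKernelRequest(body: RestScheduleBody, now: Date = new Date()): ScheduleJobRequest {
  if (!body.callback_url.startsWith("https://")) {
    throw new Error("callback_url must be an HTTPS URL");
  }
  const size = new TextEncoder().encode(JSON.stringify(body.payload)).length;
  if (size > MAX_PAYLOAD_BYTES) {
    throw new Error("payload exceeds 16 KB");
  }
  let triggerTime: Date;
  if (body.trigger_type === "AT_TIME") {
    if (body.trigger_time === undefined) throw new Error("trigger_time is required for AT_TIME");
    triggerTime = new Date(body.trigger_time);
  } else {
    if (body.delay_seconds === undefined) throw new Error("delay_seconds is required for IN_DELAY");
    triggerTime = new Date(now.getTime() + body.delay_seconds * 1000);
  }
  if (Number.isNaN(triggerTime.getTime())) {
    throw new Error("trigger time is not a valid date");
  }
  return {
    tenantId: body.buyer_agent_id,
    source: "HTTP",
    callbackUrl: body.callback_url,
    triggerType: body.trigger_type,
    triggerTime,
    payload: body.payload,
  };
}
```

Resolving `IN_DELAY` to an absolute time at the adapter boundary means everything downstream only has to compare one timestamp against the clock.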
Tech Stack
- Runtime: Node.js 20+
- Language: TypeScript (ESM)
- Web Framework: Fastify
- Database: PostgreSQL
- Migrations: node-pg-migrate
- Process Manager: PM2
- Logging: Pino (JSON structured logs)
- Validation: Zod
Quick Start
Prerequisites
- Node.js 20+
- PostgreSQL 14+
- PM2 (for production)
Installation
# Clone and enter directory
cd nexflow-caas
# Install dependencies
npm install
# Copy environment file
cp env.example .env
# Edit .env with your configuration
Configuration
Edit .env with your settings:
# Required
DATABASE_URL=postgresql://user:password@localhost:5432/nexflow_caas
ACP_PRIVATE_KEY=your_wallet_private_key
# Optional (shown with defaults)
PORT=3100
LOG_LEVEL=info
SCHEDULER_POLL_INTERVAL_MS=60000
SCHEDULER_BATCH_SIZE=50
SCHEDULER_MAX_RETRIES=5
WEBHOOK_TIMEOUT_MS=30000
Database Setup
# Create database (if needed)
createdb nexflow_caas
# Run migrations
npm run migrate
Running Locally
# Build TypeScript
npm run build
# Start API server (terminal 1)
npm run start:api
# Start scheduler worker (terminal 2)
npm run start:scheduler
For development with auto-reload:
# API server with watch
npm run dev:api
# Scheduler with watch
npm run dev:scheduler
Running with PM2 (Production)
# Build first
npm run build
# Start both processes
npm run pm2:start
# View logs
npm run pm2:logs
# Stop all
npm run pm2:stop
# Restart
npm run pm2:restart
API Reference
Schedule a Task
# Schedule task to run at specific time
curl -X POST http://localhost:3100/acp/schedule-task \
-H "Content-Type: application/json" \
-d '{
"buyer_agent_id": "my-agent-001",
"callback_url": "https://example.com/webhook",
"callback_auth": "secret-token",
"trigger_type": "AT_TIME",
"trigger_time": "2024-12-25T12:00:00Z",
"payload": {
"action": "send_reminder",
"user_id": "user123"
}
}'
# Schedule task with delay (e.g., 5 minutes from now)
curl -X POST http://localhost:3100/acp/schedule-task \
-H "Content-Type: application/json" \
-d '{
"buyer_agent_id": "my-agent-001",
"callback_url": "https://example.com/webhook",
"trigger_type": "IN_DELAY",
"delay_seconds": 300,
"payload": {
"action": "check_status"
}
}'
Response:
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"status": "scheduled",
"trigger_time": "2024-12-25T12:00:00.000Z"
}
Get Task Status
curl http://localhost:3100/acp/schedule-task/550e8400-e29b-41d4-a716-446655440000
Response:
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"status": "PENDING",
"triggerTime": "2024-12-25T12:00:00.000Z",
"attemptCount": 0,
"lastError": null,
"createdAt": "2024-12-20T10:00:00.000Z",
"updatedAt": "2024-12-20T10:00:00.000Z"
}
Cancel a Task
curl -X POST http://localhost:3100/acp/schedule-task/550e8400-e29b-41d4-a716-446655440000/cancel
Response:
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"status": "CANCELLED",
"message": "Job cancelled successfully"
}
Health Check
curl http://localhost:3100/health
Response:
{
"status": "healthy",
"version": "1.0.0",
"timestamp": "2024-12-20T10:00:00.000Z",
"checks": {
"database": {
"status": "healthy"
}
},
"responseTime": "5ms"
}
Metrics
curl http://localhost:3100/metrics
Response:
{
"timestamp": "2024-12-20T10:00:00.000Z",
"jobs": {
"total": 1500,
"byStatus": {
"pending": 50,
"fired": 1400,
"failed": 30,
"cancelled": 20
}
},
"lastFiredJobAt": "2024-12-20T09:59:00.000Z",
"uptime": 86400,
"memory": {
"heapUsed": 45,
"heapTotal": 65,
"rss": 80
}
}
Webhook Payload
When a job triggers, the callback URL receives:
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"triggered_at": "2024-12-25T12:00:00.000Z",
"payload": {
"action": "send_reminder",
"user_id": "user123"
},
"provider_agent_id": "nexflow-caas",
"status": "TRIGGERED"
}
Headers:
- Content-Type: application/json
- User-Agent: NexFlow-CAAS/1.0
- X-NexFlow-Job-Id: <job-id>
- X-NexFlow-Provider: nexflow-caas
- Authorization: Bearer <callback_auth> (if provided)
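On the receiving side, a callback handler might check the `Authorization` header against the `callback_auth` value it supplied when scheduling. The sketch below is one possible approach, not something NexFlow CAAS ships: the function name `isAuthorizedCallback` is hypothetical, and it assumes header names arrive lowercased, as they do with Node's built-in HTTP server.

```typescript
import { timingSafeEqual } from "node:crypto";

// Returns true when the Authorization header carries the expected bearer token.
// Assumption: header keys are lowercased, as with Node's http module.
function isAuthorizedCallback(
  headers: Record<string, string | undefined>,
  expectedToken: string,
): boolean {
  const header = headers["authorization"] ?? "";
  const prefix = "Bearer ";
  if (!header.startsWith(prefix)) {
    return false;
  }
  const received = Buffer.from(header.slice(prefix.length), "utf8");
  const expected = Buffer.from(expectedToken, "utf8");
  // timingSafeEqual throws when lengths differ, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

A constant-time comparison is used here so that response timing does not reveal how much of a guessed token matched.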
Retry Behavior
Failed webhook deliveries are retried with exponential backoff:
| Attempt | Delay |
|---------|-------|
| 1 | 30 seconds |
| 2 | 60 seconds |
| 3 | 5 minutes |
| 4 | 15 minutes |
| 5 | 1 hour |
After 5 failed attempts, the job is marked as FAILED.
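The schedule above can be expressed as a simple lookup. The sketch below copies the delays from the table. The function names are hypothetical, and it assumes the "Attempt" column is a 1-based index into the table, which this README does not state explicitly.

```typescript
// Delays copied from the retry table, in seconds.
const RETRY_DELAYS_SECONDS: readonly number[] = [30, 60, 300, 900, 3600];

// Assumption: `attempt` is the 1-based row number in the retry table.
// Returns undefined once the table is exhausted, at which point the job
// would be marked FAILED.
function retryDelayMs(attempt: number): number | undefined {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError("attempt must be a positive integer");
  }
  const seconds = RETRY_DELAYS_SECONDS[attempt - 1];
  return seconds === undefined ? undefined : seconds * 1000;
}

// Computes when the next delivery attempt would be due, or undefined if
// no retries remain.
function nextAttemptAt(attempt: number, failedAt: Date): Date | undefined {
  const delay = retryDelayMs(attempt);
  return delay === undefined ? undefined : new Date(failedAt.getTime() + delay);
}
```

Note that the listed delays grow faster than a strict doubling after the second step, so a table lookup mirrors the documented behavior more closely than a `base * 2^n` formula would.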
Database Schema
scheduled_jobs Table
| Column | Type | Description |
|--------|------|-------------|
| id | UUID | Primary key |
| buyer_agent_id | VARCHAR(255) | ACP agent identifier |
| callback_url | TEXT | Webhook URL |
| callback_auth | TEXT | Optional bearer token |
| trigger_type | ENUM | 'AT_TIME' or 'IN_DELAY' |
| trigger_time | TIMESTAMPTZ | When to fire |
| delay_seconds | INTEGER | Original delay (for IN_DELAY) |
| payload | JSONB | Custom payload data |
| status | ENUM | PENDING/FIRED/FAILED/CANCELLED |
| attempt_count | INTEGER | Number of delivery attempts |
| last_error | TEXT | Last error message |
| created_at | TIMESTAMPTZ | Creation time |
| updated_at | TIMESTAMPTZ | Last update time |
Indexes:
- (status, trigger_time) - Partial index on PENDING jobs
- (buyer_agent_id) - For agent-specific queries
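A query that claims due jobs with the row-level locking described under Design Guarantees might look like the sketch below. The SQL is an illustration built from the column names in the schema table, not the worker's actual query, and `buildClaimDueJobsQuery` is a hypothetical helper. The returned object uses the `text` and `values` shape that node-postgres accepts as a query config.

```typescript
interface ClaimQuery {
  text: string;
  values: [Date, number];
}

// Builds a parameterized query that selects due PENDING jobs and locks them.
// SKIP LOCKED makes concurrent workers skip rows another worker already holds.
// The locks only last for the enclosing transaction, so the caller is assumed
// to run this between BEGIN and COMMIT.
function buildClaimDueJobsQuery(now: Date, batchSize: number): ClaimQuery {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const text = [
    "SELECT id, callback_url, callback_auth, payload, attempt_count",
    "FROM scheduled_jobs",
    "WHERE status = 'PENDING' AND trigger_time <= $1",
    "ORDER BY trigger_time",
    "LIMIT $2",
    "FOR UPDATE SKIP LOCKED",
  ].join("\n");
  return { text, values: [now, batchSize] };
}
```

The `status = 'PENDING'` predicate with an ordering on `trigger_time` is the kind of access pattern the partial index listed above is suited to.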
Project Structure
nexflow-caas/
├── src/
│ ├── api/
│ │ ├── index.ts # Fastify server entry
│ │ └── routes/
│ │ ├── acpSchedule.ts
│ │ ├── health.ts
│ │ └── metrics.ts
│ ├── scheduler/
│ │ ├── index.ts # Scheduler process entry
│ │ └── worker.ts # Job processing logic
│ ├── services/
│ │ ├── jobService.ts # Job CRUD operations
│ │ └── webhookService.ts # HTTP delivery
│ ├── config/
│ │ ├── env.ts # Environment config
│ │ ├── db.ts # Database pool
│ │ └── acp.ts # ACP configuration
│ ├── acp/
│ │ └── acpClient.ts # ACP SDK integration
│ ├── types/
│ │ ├── job.ts # Job types + schemas
│ │ └── acp.ts # ACP types
│ └── utils/
│ ├── logger.ts # Pino logger
│ └── error.ts # Error classes
├── migrations/
│ ├── 1704067200000_create-scheduled-jobs-table.cjs
│ └── 1704067200001_add-job-metrics-table.cjs
├── ecosystem.config.cjs # PM2 config
├── migrate.config.cjs # Migration config
├── tsconfig.json
├── package.json
└── README.md
npm Scripts
| Script | Description |
|--------|-------------|
| build | Compile TypeScript to dist/ |
| dev:api | Run API with hot reload |
| dev:scheduler | Run scheduler with hot reload |
| start:api | Start API (production) |
| start:scheduler | Start scheduler (production) |
| migrate | Run pending migrations |
| migrate:down | Rollback last migration |
| migrate:create | Create new migration |
| pm2:start | Start via PM2 |
| pm2:stop | Stop PM2 processes |
| pm2:restart | Restart PM2 processes |
| pm2:logs | View PM2 logs |
AWS EC2 Deployment
For t4g.micro/t4g.medium instances:
# Install Node.js 20
curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash -
sudo apt-get install -y nodejs
# Install PM2 globally
sudo npm install -g pm2
# Clone and setup
git clone <repo> /opt/nexflow-caas
cd /opt/nexflow-caas
npm install
npm run build
# Configure environment
cp env.example .env
# Edit .env with production values
# Run migrations
npm run migrate
# Start with PM2
npm run pm2:start
# Save PM2 startup script
pm2 save
pm2 startup
Virtuals ACP Integration
NexFlow CAAS is a Virtuals Protocol ACP provider offering scheduled task execution services to buyer agents.
Architecture
┌─────────────────┐ ┌──────────────────────────────────────────┐
│ Buyer Agent │ │ NexFlow CAAS │
│ (ACP Client) │ │ │
│ │ │ ┌─────────────┐ ┌──────────────────┐ │
│ Request Task │────▶│ │ ACP Client │──▶│ Kernel DTO │ │
│ │ │ │ (adapter) │ │ (ScheduleJob │ │
│ │ │ └─────────────┘ │ Request) │ │
│ │ │ └────────┬─────────┘ │
│ │ │ │ │
│ │ │ ┌────────▼─────────┐ │
│ ◀── Webhook ───│─────│────────────────────│ Scheduler │ │
│ │ │ │ Worker │ │
└─────────────────┘ │ └──────────────────┘ │
└──────────────────────────────────────────┘
How It Works
- ACP tasks are received by `src/acp/acpClient.ts`
- Mapped to the protocol-neutral `ScheduleJobRequest` kernel DTO
- Processed through `scheduleJobFromKernel()`, the single entry point
- Persisted to PostgreSQL
- Scheduler worker polls for due jobs and delivers webhooks
- ACP completion is confirmed back to the protocol
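The polling step in the flow above can be pictured with a small in-memory simulation. This is a sketch under stated assumptions rather than the real worker: `runTick`, the `Job` shape, and the injected `deliver` function are all hypothetical, jobs live in an array instead of PostgreSQL, and the ACP completion step and backoff rescheduling are left out.

```typescript
type JobStatus = "PENDING" | "FIRED" | "FAILED" | "CANCELLED";

interface Job {
  id: string;
  callbackUrl: string;
  triggerTime: Date;
  status: JobStatus;
  attemptCount: number;
  lastError: string | null;
}

// Injected delivery function; the real service would POST to callbackUrl.
type Deliver = (job: Job) => Promise<void>;

// One poll cycle: find due PENDING jobs, attempt delivery, update status.
// Returns the number of jobs that were due in this cycle.
async function runTick(
  jobs: Job[],
  deliver: Deliver,
  now: Date,
  maxAttempts = 5,
): Promise<number> {
  const due = jobs.filter(
    (job) => job.status === "PENDING" && job.triggerTime.getTime() <= now.getTime(),
  );
  for (const job of due) {
    job.attemptCount += 1;
    try {
      await deliver(job);
      job.status = "FIRED";
      job.lastError = null;
    } catch (err) {
      job.lastError = err instanceof Error ? err.message : String(err);
      if (job.attemptCount >= maxAttempts) {
        job.status = "FAILED";
      }
      // Otherwise the job stays PENDING; a real worker would also move
      // triggerTime forward by the backoff delay before the next poll.
    }
  }
  return due.length;
}
```

Passing `deliver` and `now` as parameters keeps the cycle deterministic, which makes this kind of loop straightforward to exercise without a network or a clock.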
Job Offering
| Field | Value |
|-------|-------|
| Name | Schedule Task Execution (Cron & Keep-Alive) |
| Price | 0.1 $VIRTUAL per task |
| Capabilities | schedule_at_time, schedule_with_delay, webhook_callback, retry_on_failure, status_tracking, cancellation, idempotent_delivery |
Multi-Tenant Design
CAAS is multi-tenant from day one:
- tenantId: Generic identifier (ACP buyer agent ID, external customer ID, SMF ID)
- source: Tracks origin (`ACP`, `HTTP`, `INTERNAL`)
- Future: NexFlow public API and SMF will use the same kernel
Documentation
See docs/acp-application.md for:
- Full ACP application form content
- Input field definitions
- Example payloads
- Callback payload format
License
MIT
Support
For issues and feature requests, please open a GitHub issue.
