@surninsynergy/runpod-orchestrator

v1.3.3

Runpod Task Orchestrator

A Redis-backed, multi-instance task orchestrator for Runpod Serverless that guarantees terminal outcomes even across crashes and restarts.

Features

  • Multi-instance safe: Run multiple instances in parallel with distributed coordination
  • Persistent state: All job state stored in Redis with atomic transitions
  • Idempotent operations: Safe to retry with same clientJobId
  • Automatic recovery: Recovers orphaned jobs on startup
  • Exponential backoff: Intelligent retry strategy with jitter
  • Event-driven: Real-time progress updates via EventEmitter
  • Metadata support: Pass additional data with jobs for analytics and user tracking
  • TypeScript support: Full type definitions included

Installation

npm install @surninsynergy/runpod-orchestrator ioredis

Quick Start

import * as crypto from 'node:crypto'; // provides crypto.randomUUID() used below
import { createOrchestrator } from '@surninsynergy/runpod-orchestrator';

const orchestrator = await createOrchestrator({
  redis: { url: process.env.REDIS_URL },
  runpod: { 
    apiKey: process.env.RUNPOD_API_KEY!,
    endpointId: process.env.RUNPOD_ENDPOINT_ID!
  },
  dedupe: { enable: true, useInputHash: true },
});

// Listen for events with runpodStatus
orchestrator.on('progress', ({ clientJobId, status, runpodStatus, metadata }) => {
  console.log('Job progress:', clientJobId, status);
  if (runpodStatus) {
    console.log('Runpod technical data:', runpodStatus);
    console.log('Runpod status:', runpodStatus.status);
  }
});

orchestrator.on('completed', ({ clientJobId, output, runpodStatus, metadata }) => {
  console.log('Job completed:', clientJobId, output);
  if (runpodStatus) {
    console.log('Final Runpod status:', runpodStatus.status);
    console.log('Complete technical data:', runpodStatus);
  }
});

orchestrator.on('failed', ({ clientJobId, error, status, runpodStatus, metadata }) => {
  console.error('Job failed:', clientJobId, error, status);
  if (runpodStatus) {
    console.log('Runpod error details:', runpodStatus.error);
    console.log('Runpod status at failure:', runpodStatus.status);
  }
});

// Submit a job
const { clientJobId } = await orchestrator.submit({
  clientJobId: crypto.randomUUID(),
  input: { prompt: "A cat in space" },
  metadata: { userId: "user-123" }
});

// Wait for result with runpodStatus
const result = await orchestrator.awaitResult(clientJobId, 15 * 60_000);
if (result.status === "COMPLETED") {
  console.log('Result:', result.output);
  console.log('Runpod status:', result.runpodStatus?.status);
  console.log('Technical data:', result.runpodStatus);
} else {
  console.error('Failed:', result.error);
  console.log('Runpod error details:', result.runpodStatus?.error);
}

RunpodStatus - Technical Data Access

The orchestrator now provides access to the complete runpodStatus object from Runpod's API, giving you detailed technical information about your jobs.

What's Available in runpodStatus

The runpodStatus object contains all the technical data returned by Runpod's status API:

interface RunpodJobStatus {
  id: string;           // Runpod job ID
  status: string;       // Current Runpod status (e.g., "IN_QUEUE", "RUNNING", "COMPLETED")
  output?: any;         // Job output (when completed)
  error?: any;          // Error details (when failed)
}

Accessing runpodStatus

The runpodStatus is available in:

  1. Event handlers - Real-time updates during job execution
  2. awaitResult() - Final status when job completes
  3. get() - Current status when querying job state

// Real-time monitoring
orchestrator.on('progress', ({ clientJobId, runpodStatus }) => {
  if (runpodStatus) {
    console.log('Current Runpod status:', runpodStatus.status);
    console.log('Runpod job ID:', runpodStatus.id);
  }
});

// Final result (clientJobId is the ID passed to submit())
const result = await orchestrator.awaitResult(clientJobId);
console.log('Final Runpod status:', result.runpodStatus?.status);
console.log('Complete technical data:', result.runpodStatus);

// Current job state
const job = await orchestrator.get(clientJobId);
console.log('Current Runpod status:', job?.runpodStatus?.status);

Example: Monitoring Job Performance

orchestrator.on('progress', ({ clientJobId, runpodStatus }) => {
  console.log(`Job ${clientJobId} status:`, runpodStatus?.status);
  console.log(`Runpod job ID:`, runpodStatus?.id);
});

orchestrator.on('completed', ({ clientJobId, runpodStatus }) => {
  console.log(`Job ${clientJobId} completed with status:`, runpodStatus?.status);
  console.log('Complete technical data:', runpodStatus);
});

Example: Error Analysis

orchestrator.on('failed', ({ clientJobId, runpodStatus }) => {
  console.log(`Job ${clientJobId} failed with Runpod status:`, runpodStatus?.status);
  console.log(`Runpod job ID:`, runpodStatus?.id);
  
  if (runpodStatus?.error) {
    console.log('Runpod error details:', runpodStatus.error);
  }
  
  // Log complete technical data for debugging
  console.log('Complete runpodStatus:', runpodStatus);
});

Custom Namespace

You can customize the Redis key namespace to avoid conflicts with other applications:

const orchestrator = await createOrchestrator({
  redis: { url: process.env.REDIS_URL },
  runpod: { 
    apiKey: process.env.RUNPOD_API_KEY!,
    endpointId: process.env.RUNPOD_ENDPOINT_ID!
  },
  namespace: "myapp:runpod:", // Custom namespace
});

This will use keys like myapp:runpod:job:<clientJobId> instead of the default runpod:job:<clientJobId>.

Configuration

interface RunpodOrchestratorConfig {
  instanceId?: string;                 // unique per process; default = hostname:pid:random
  redis: { url?: string } | { client: import("ioredis").Redis };
  runpod: { apiKey: string; endpointId: string };
  namespace?: string;                  // Redis key namespace; default = "runpod:"
  polling: {
    enableStreaming?: boolean;         // try stream() if supported
    initialBackoffMs?: number;         // default 2000
    maxBackoffMs?: number;             // default 10000
    jitterPct?: number;                // default 0.2
    batchSize?: number;                // default 100
  };
  storage: {
    persistInput?: boolean;            // store original input JSON in Redis
    resultTtlSec?: number;             // TTL for terminal jobs (default 7 days)
  };
  dedupe?: { enable?: boolean; useInputHash?: boolean };
  logging?: { debug?: (...args: any[]) => void; info?: (...args: any[]) => void; error?: (...args: any[]) => void };
}
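
The polling defaults above (initialBackoffMs, maxBackoffMs, jitterPct) suggest a capped exponential backoff with jitter. The README does not state the exact formula, so the following is only a minimal sketch under assumptions: the doubling factor, the symmetric jitter, and the helper name nextDelayMs are all hypothetical and not part of the package's API.

```typescript
// Hypothetical helper illustrating capped exponential backoff with jitter.
// Field names and defaults come from the Configuration section; the formula is assumed.
interface BackoffOptions {
  initialBackoffMs: number; // documented default: 2000
  maxBackoffMs: number;     // documented default: 10000
  jitterPct: number;        // documented default: 0.2
}

const DEFAULT_BACKOFF: BackoffOptions = {
  initialBackoffMs: 2000,
  maxBackoffMs: 10000,
  jitterPct: 0.2,
};

// Delay before poll number `attempt` (0-based). The base delay doubles on each
// attempt (assumption), is capped at maxBackoffMs, and is then shifted by a
// random amount of at most jitterPct of the base in either direction.
function nextDelayMs(
  attempt: number,
  opts: BackoffOptions = DEFAULT_BACKOFF,
  random: () => number = Math.random
): number {
  const base = Math.min(opts.initialBackoffMs * Math.pow(2, attempt), opts.maxBackoffMs);
  const jitter = base * opts.jitterPct * (random() * 2 - 1);
  return Math.round(base + jitter);
}
```

Passing the random source as a parameter keeps the function deterministic in tests. With the defaults, the un-jittered delays are 2000, 4000, 8000, then 10000 for every later attempt.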

API Reference

submit(options: SubmitOptions)

Submit a new job to the orchestrator.

interface SubmitOptions {
  clientJobId: string;                 // caller-generated UUID
  input: unknown;                      // Runpod endpoint input
  inputHash?: string;                  // optional dedupe key
  metadata?: Record<string, any>;      // optional additional data
}

Returns: Promise<{ clientJobId: string; runpodJobId: string }>
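
If you supply your own inputHash, two inputs that mean the same thing should hash to the same value regardless of key order. The sketch below shows one way a caller might derive such a key; canonicalJson and dedupeKey are hypothetical names, and the djb2 hash is used only to keep the example dependency-free. A real deployment would more likely use a cryptographic hash such as SHA-256, and this is not necessarily how the package computes hashes when useInputHash is enabled.

```typescript
// Serialize a JSON-like value with object keys sorted, so that
// { a: 1, b: 2 } and { b: 2, a: 1 } produce the same string.
// Properties whose value is undefined are skipped, as JSON.stringify does.
function canonicalJson(value: unknown): string {
  if (value === undefined) return "null";
  if (value === null || typeof value !== "object") return JSON.stringify(value);
  if (Array.isArray(value)) return "[" + value.map(canonicalJson).join(",") + "]";
  const obj = value as { [key: string]: unknown };
  const keys = Object.keys(obj).filter((k) => obj[k] !== undefined).sort();
  return "{" + keys.map((k) => JSON.stringify(k) + ":" + canonicalJson(obj[k])).join(",") + "}";
}

// djb2 string hash over UTF-16 code units, kept to 32 bits.
// Non-cryptographic: illustration only.
function djb2(text: string): string {
  let hash = 5381;
  for (let i = 0; i < text.length; i++) {
    hash = (hash * 33 + text.charCodeAt(i)) >>> 0;
  }
  return ("0000000" + hash.toString(16)).slice(-8);
}

// Hypothetical helper: a stable dedupe key for a job input.
function dedupeKey(input: unknown): string {
  return djb2(canonicalJson(input));
}
```

The result could then be passed as inputHash in SubmitOptions, for example `inputHash: dedupeKey(input)`.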

awaitResult(clientJobId: string, timeoutMs?: number)

Wait for a job to complete with optional timeout.

Returns: Promise<{ status: "COMPLETED"|"FAILED"|"TIMED_OUT"|"CANCELED"; output?: any; error?: any; runpodStatus?: any; metadata?: Record<string, any> }>
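
Because awaitResult() can resolve with any of four terminal statuses, it can help to handle each one explicitly rather than treating everything other than COMPLETED as a generic failure. The following sketch uses a local JobResult type that mirrors the documented return shape; describeResult is a hypothetical helper, not part of the package.

```typescript
// Local types mirroring the documented return shape of awaitResult().
type TerminalStatus = "COMPLETED" | "FAILED" | "TIMED_OUT" | "CANCELED";

interface JobResult {
  status: TerminalStatus;
  output?: unknown;
  error?: unknown;
}

// Hypothetical helper: turn a result into a short log line.
// The `never` assignment makes the compiler flag any status added later
// that this switch does not handle.
function describeResult(result: JobResult): string {
  switch (result.status) {
    case "COMPLETED":
      return "ok: " + JSON.stringify(result.output);
    case "FAILED":
      return "failed: " + String(result.error);
    case "TIMED_OUT":
      return "timed out";
    case "CANCELED":
      return "canceled";
    default: {
      const unreachable: never = result.status;
      throw new Error("Unhandled status: " + String(unreachable));
    }
  }
}
```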

get(clientJobId: string)

Get current job status and details.

Returns: Promise<JobRecord | null>

cancel(clientJobId: string)

Cancel a running job.

Returns: Promise<void>

recoverAllPending()

Recover all non-terminal jobs (useful on startup).

Returns: Promise<number> (number of recovered jobs)

Events

orchestrator.on('submitted', ({ clientJobId, runpodJobId, metadata }) => {});
orchestrator.on('progress', ({ clientJobId, status, runpodStatus, metadata }) => {});
orchestrator.on('completed', ({ clientJobId, output, runpodStatus, metadata }) => {});
orchestrator.on('failed', ({ clientJobId, error, status, runpodStatus, metadata }) => {});

Metadata Support

The orchestrator supports passing additional metadata with jobs, which is useful for:

  • User tracking: Associate jobs with specific users
  • Analytics: Track campaigns, experiments, or business metrics
  • Debugging: Include debugging information or request context
  • Business logic: Pass business context through the job lifecycle

Using Metadata

// Submit job with metadata
const job = await orchestrator.submit({
  clientJobId: 'job-123',
  input: { prompt: 'Generate an image' },
  metadata: {
    userId: 'user-123',
    priority: 'high',
    source: 'web-app',
    analytics: {
      campaign: 'summer-promo',
      experiment: 'A',
      sessionId: 'sess-456'
    }
  }
});

// Metadata is available in all events
orchestrator.on('completed', ({ clientJobId, output, metadata }) => {
  console.log(`Job ${clientJobId} completed for user ${metadata?.userId}`);
  
  // Send result to user via WebSocket or API
  if (metadata?.userId) {
    sendToUser(metadata.userId, { jobId: clientJobId, result: output });
  }
  
  // Track analytics
  if (metadata?.analytics) {
    trackEvent('job_completed', metadata.analytics);
  }
});

// Metadata is available in results
const result = await orchestrator.awaitResult(job.clientJobId);
console.log('User ID:', result.metadata?.userId);
console.log('Campaign:', result.metadata?.analytics?.campaign);

Metadata Best Practices

  • Keep it small: Metadata is stored in Redis, so avoid large objects
  • Use consistent structure: Define a schema for your metadata
  • Include user context: Always include userId for user-facing applications
  • Add debugging info: Include request IDs, session IDs, or trace IDs
  • Consider privacy: Don't store sensitive data in metadata
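
These practices can be enforced with a small check before calling submit(). The sketch below is illustrative only: checkMetadata, the 2048-character budget, and the deny-list of key names are all assumptions, not limits imposed by the package.

```typescript
const MAX_METADATA_CHARS = 2048; // hypothetical budget for the serialized metadata
const SENSITIVE_KEYS = ["password", "token", "apiKey", "secret"]; // hypothetical deny-list

// Returns a list of problems; an empty list means the metadata passed every check.
// Only top-level keys are inspected, and size is measured in characters of the
// JSON serialization, which is a rough proxy for what ends up stored in Redis.
function checkMetadata(metadata: { [key: string]: unknown }): string[] {
  const problems: string[] = [];
  const serialized = JSON.stringify(metadata);
  if (serialized.length > MAX_METADATA_CHARS) {
    problems.push(`metadata is ${serialized.length} chars; budget is ${MAX_METADATA_CHARS}`);
  }
  for (const key of Object.keys(metadata)) {
    if (SENSITIVE_KEYS.indexOf(key) !== -1) {
      problems.push(`metadata key "${key}" looks sensitive`);
    }
  }
  if (typeof metadata["userId"] !== "string") {
    problems.push("metadata is missing a string userId");
  }
  return problems;
}
```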

Job States

  • SUBMITTED - Accepted locally, not yet on Runpod
  • QUEUED - Runpod accepted, waiting to start
  • IN_PROGRESS - Currently executing
  • COMPLETED - Successfully finished (terminal)
  • FAILED - Failed with error (terminal)
  • TIMED_OUT - Exceeded timeout (terminal)
  • CANCELED - Canceled by user (terminal)
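
The states above can be modeled as a union type with a terminal-state check. This is a minimal sketch: the state names come from the list above, but the forward-only ordering enforced by canTransition is an assumption, since the README does not spell out the allowed transitions.

```typescript
type JobState =
  | "SUBMITTED"
  | "QUEUED"
  | "IN_PROGRESS"
  | "COMPLETED"
  | "FAILED"
  | "TIMED_OUT"
  | "CANCELED";

const TERMINAL_STATES: JobState[] = ["COMPLETED", "FAILED", "TIMED_OUT", "CANCELED"];

// True for the four states marked "(terminal)" in the list above.
function isTerminal(state: JobState): boolean {
  return TERMINAL_STATES.indexOf(state) !== -1;
}

// Assumed ordering: SUBMITTED < QUEUED < IN_PROGRESS < any terminal state.
function rank(state: JobState): number {
  switch (state) {
    case "SUBMITTED": return 0;
    case "QUEUED": return 1;
    case "IN_PROGRESS": return 2;
    default: return 3; // all terminal states
  }
}

// Hypothetical rule: a job only moves forward and never leaves a terminal state.
function canTransition(from: JobState, to: JobState): boolean {
  return !isTerminal(from) && rank(to) > rank(from);
}
```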

Multi-Instance Deployment

The orchestrator is designed to run multiple instances safely:

  1. Distributed locks prevent duplicate processing
  2. Work sharding via Redis sorted sets
  3. Automatic recovery of orphaned jobs
  4. Cross-instance events via Redis Pub/Sub

# Start multiple instances
NODE_ENV=production node app.js &
NODE_ENV=production node app.js &
NODE_ENV=production node app.js &
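
To make the coordination model concrete, here is an in-memory simulation of how several instances might share one pending set without polling the same job twice. It assumes each pending job is scored by its next poll time (as the Redis Schema section describes) and that a lock is taken before polling. The functions dueJobs and tryClaim are hypothetical stand-ins for what would be a sorted-set range query and a set-if-absent lock in Redis; the package's real implementation may differ.

```typescript
interface PendingEntry {
  clientJobId: string;
  nextPollAt: number; // epoch milliseconds; plays the role of the sorted-set score
}

// Up to batchSize entries that are due at `now`, earliest first.
// Stands in for a range-by-score query with a LIMIT on the pending set.
function dueJobs(pending: PendingEntry[], now: number, batchSize: number): PendingEntry[] {
  return pending
    .filter((entry) => entry.nextPollAt <= now)
    .sort((a, b) => a.nextPollAt - b.nextPollAt)
    .slice(0, batchSize);
}

// In-memory lock table; stands in for per-job lock keys in Redis.
const locks: { [clientJobId: string]: string } = {};

// Claim a job for one instance. Returns false if another instance holds the lock,
// mirroring a set-if-not-exists write of a lock token.
function tryClaim(clientJobId: string, instanceId: string): boolean {
  if (Object.prototype.hasOwnProperty.call(locks, clientJobId)) return false;
  locks[clientJobId] = instanceId;
  return true;
}

const pending: PendingEntry[] = [
  { clientJobId: "job-3", nextPollAt: 3000 },
  { clientJobId: "job-1", nextPollAt: 1000 },
  { clientJobId: "job-2", nextPollAt: 2000 },
];

// Two instances see the same due list, but each job is claimed only once.
const due = dueJobs(pending, 2500, 100);
const claimedByA = due.filter((entry) => tryClaim(entry.clientJobId, "instance-a"));
const claimedByB = due.filter((entry) => tryClaim(entry.clientJobId, "instance-b"));
```

A real lock would also carry an expiry so that a crashed instance's jobs can be picked up by others, which this simulation omits.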

Redis Schema

The orchestrator uses the following Redis keys (with configurable namespace):

  • <namespace>job:<clientJobId> - Job hash with all metadata
  • <namespace>pending - Sorted set of jobs to poll (score = nextPollAt)
  • <namespace>locks:<clientJobId> - Distributed lock token
  • <namespace>events - Pub/Sub channel for cross-instance events
  • <namespace>index:inputHash:<hash> - Input hash deduplication index

By default, <namespace> is "runpod:", but you can customize it via the namespace configuration option.
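
When inspecting Redis by hand, it helps to build these key names the same way in one place. The sketch below derives each key from the namespace using the patterns listed above; redisKeys is a hypothetical helper for your own tooling, not an export of the package.

```typescript
// Hypothetical helper: build the documented key names for a given namespace.
function redisKeys(namespace: string = "runpod:") {
  return {
    job: (clientJobId: string) => `${namespace}job:${clientJobId}`,
    pending: `${namespace}pending`,
    lock: (clientJobId: string) => `${namespace}locks:${clientJobId}`,
    events: `${namespace}events`,
    inputHashIndex: (hash: string) => `${namespace}index:inputHash:${hash}`,
  };
}

const defaultKeys = redisKeys();
const customKeys = redisKeys("myapp:runpod:");
```

Note that the namespace is used as a literal prefix, so it should end with a separator such as ":" if you want one.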

Error Handling

The orchestrator handles various error scenarios:

  • Transient errors: Network issues, 5xx responses, rate limits
  • Permanent errors: 4xx responses, invalid input, authentication
  • Lock timeouts: Automatic recovery by other instances
  • Process crashes: Jobs recovered on restart
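
The transient/permanent split above can be expressed as a small classifier. This is a sketch of the categories as described, not the package's actual code: classifyHttpStatus is a hypothetical name, treating HTTP 429 as the rate-limit case is an assumption, and so is treating a missing status (no response at all) as a network issue.

```typescript
type ErrorKind = "transient" | "permanent";

// Classify a failed request by its HTTP status code.
// `undefined` means no response was received (for example, a network error).
function classifyHttpStatus(status: number | undefined): ErrorKind {
  if (status === undefined) return "transient";                 // network issue
  if (status === 429) return "transient";                       // rate limit (assumed to be 429)
  if (status >= 500 && status <= 599) return "transient";       // 5xx responses
  return "permanent";                                           // other 4xx and anything unexpected
}

// Transient errors are worth retrying; permanent ones should fail the job.
function shouldRetry(status: number | undefined): boolean {
  return classifyHttpStatus(status) === "transient";
}
```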

Monitoring

Use the logging hooks to integrate with your monitoring system:

const orchestrator = await createOrchestrator({
  // ... config
  logging: {
    debug: (msg, ...args) => console.debug(`[DEBUG] ${msg}`, ...args),
    info: (msg, ...args) => console.info(`[INFO] ${msg}`, ...args),
    error: (msg, ...args) => console.error(`[ERROR] ${msg}`, ...args),
  }
});

Development

Local Development

# Install dependencies
npm install

# Run tests
npm test

# Run tests in watch mode
npm run test:watch

# Build
npm run build

# Lint
npm run lint

Docker Development

# Start Redis and run the basic example
docker-compose up

# Run in detached mode
docker-compose up -d

# View logs
docker-compose logs -f

# Stop services
docker-compose down

# Rebuild and run
docker-compose up --build

The docker-compose setup includes:

  • Redis: Redis 7 with persistence enabled
  • Orchestrator: Runs the basic usage example with environment variables from .env

Make sure to create a .env file with your Runpod credentials:

RUNPOD_API_KEY=your_api_key_here
RUNPOD_ENDPOINT_ID=your_endpoint_id_here

License

MIT