aws-bedrock-batch-inference-queue

v1.2.0

TypeScript library for queuing and processing LLM tasks via AWS Bedrock batch inference jobs

AWS Bedrock Batch Inference Queue Library

A TypeScript library for queuing and processing LLM tasks via AWS Bedrock batch inference jobs. This library provides automatic batching, job status tracking in PostgreSQL, and event-driven notifications with result retrieval capabilities.

Features

  • Automatic Batching: Queues tasks and automatically flushes when thresholds are reached
  • Job Tracking: Tracks job status in PostgreSQL database with automatic status updates
  • Event-Driven: Emits events for job status changes, completions, and errors
  • Result Retrieval: Easy retrieval of batch inference results from S3
  • Polling Support: Automatic polling with manual override options
  • Multi-Model Support: Configurable for any AWS Bedrock model (Claude, Titan, etc.)
  • Graceful Retry Handling: Built-in exponential backoff (with jitter) for AWS SDK calls

Installation

npm install aws-bedrock-batch-inference-queue

Prerequisites

  1. PostgreSQL Database: Set up a PostgreSQL database and run the schema migration (see database/schema.sql)
  2. AWS Credentials: Configure AWS credentials with permissions for:
    • AWS Bedrock (CreateModelInvocationJob, GetModelInvocationJob)
    • S3 (PutObject, GetObject, ListBucket)
  3. IAM Role: Create an IAM role for Bedrock batch inference jobs with appropriate permissions
  4. S3 Buckets: Create S3 buckets for input and output files
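The permissions above can be captured in an IAM policy along the lines of the following sketch. Treat it as an illustration rather than a drop-in policy: the bucket names and role ARN are the placeholders used in the configuration examples below, and iam:PassRole is included because Bedrock must be allowed to assume the batch inference role you supply via roleArn.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "bedrock:CreateModelInvocationJob",
        "bedrock:GetModelInvocationJob"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::my-bedrock-input-bucket",
        "arn:aws:s3:::my-bedrock-input-bucket/*",
        "arn:aws:s3:::my-bedrock-output-bucket",
        "arn:aws:s3:::my-bedrock-output-bucket/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/BedrockBatchInferenceRole"
    }
  ]
}
```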

Database Setup

Run the schema migration to create the required tables and types:

# See database/schema.sql for the complete schema
psql -U your_user -d your_database -f database/schema.sql

Configuration

Basic Configuration

import { BedrockQueue } from 'aws-bedrock-batch-inference-queue';

const queue = new BedrockQueue({
  // AWS Configuration
  accessKeyId: 'your-access-key-id',
  secretAccessKey: 'your-secret-access-key',
  region: 'us-east-1',
  roleArn: 'arn:aws:iam::123456789012:role/BedrockBatchInferenceRole',

  // S3 Configuration
  s3InputBucket: 'my-bedrock-input-bucket',
  s3InputPrefix: 'bedrock-jobs/input/',
  s3OutputBucket: 'my-bedrock-output-bucket',
  s3OutputPrefix: 'bedrock-jobs/output/',

  // Database Configuration
  dbHost: 'localhost',
  dbPort: 5432,
  dbName: 'bedrock_jobs',
  dbUser: 'postgres',
  dbPassword: 'your-password',

  // Model Configuration
  modelId: 'anthropic.claude-sonnet-4-5-20250929-v1:0',
  jobType: 'retrieval-summary',

  // Optional: Batching Configuration
  batchSize: 100, // Default: 100 (AWS minimum)
  maxBatchSize: 50000, // Default: 50000 (AWS maximum)
  maxFileSizeBytes: 1024 * 1024 * 1024, // Default: 1GB
  maxJobSizeBytes: 5 * 1024 * 1024 * 1024, // Default: 5GB

  // Optional: Polling Configuration
  pollIntervalMs: 30000, // Default: 30 seconds
  enableAutoPolling: true, // Default: true

  // Optional: Retry Configuration
  retryMode: 'adaptive', // Default: adaptive; falls back to AWS defaults when omitted
  maxAttempts: 6, // Default: AWS SDK setting (typically 3)

  // Optional: Throttling Backoff (library-level)
  throttleBackoff: {
    maxRetries: 5, // Additional retries after SDK retries are exhausted
    baseDelayMs: 1000, // Initial wait before retrying throttled calls
    maxDelayMs: 30000, // Cap delay growth for long-running throttles
    multiplier: 2, // Exponential growth factor
    jitterRatio: 0.25, // +/- jitter applied to each delay
  },
});

// Initialize the queue (test database connection)
await queue.initialize();

Retry Behavior

The library configures all AWS SDK calls (Bedrock + S3) to use the SDK's exponential backoff with full jitter. By default, the queue selects the adaptive retry mode and lets the SDK decide the number of attempts (usually 3). You can override the behavior either through the queue config (retryMode, maxAttempts) or the standard environment variables (AWS_RETRY_MODE, AWS_MAX_ATTEMPTS). Explicit config values take precedence over environment variables.
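The precedence rules can be sketched as a small resolver. This is a hypothetical helper for illustration only (resolveRetryConfig is not part of the library's API): explicit config wins, then the environment variables, then the adaptive default.

```typescript
type RetryMode = 'standard' | 'adaptive' | 'legacy';

interface ResolvedRetryConfig {
  retryMode: RetryMode;
  maxAttempts?: number; // undefined → let the SDK decide (usually 3)
}

// Illustrative sketch, not the library's actual internals: resolve the
// effective retry settings, giving explicit config values precedence over
// the AWS_RETRY_MODE / AWS_MAX_ATTEMPTS environment variables.
function resolveRetryConfig(
  config: { retryMode?: RetryMode; maxAttempts?: number },
  env: Record<string, string | undefined> = process.env,
): ResolvedRetryConfig {
  const envMode = env.AWS_RETRY_MODE as RetryMode | undefined;
  const envAttempts =
    env.AWS_MAX_ATTEMPTS !== undefined ? Number(env.AWS_MAX_ATTEMPTS) : undefined;
  return {
    retryMode: config.retryMode ?? envMode ?? 'adaptive',
    maxAttempts: config.maxAttempts ?? envAttempts,
  };
}
```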

Throttling Backoff

In addition to the SDK retry strategy, the queue retries throttled operations (job submission and status polling) with an exponential backoff of its own. This protects long-running batches when AWS returns extended ThrottlingException responses. Use the optional throttleBackoff block to tune how aggressively the queue retries throttled calls. When a throttled flush ultimately fails after the configured retries, the batch is re-queued so you can try again later.
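The delay schedule those options produce can be sketched as follows. This is an illustrative helper rather than the library's internals: each retry waits baseDelayMs * multiplier^attempt, capped at maxDelayMs, with a random factor of ±jitterRatio applied.

```typescript
interface ThrottleBackoffOptions {
  maxRetries: number;
  baseDelayMs: number;
  maxDelayMs: number;
  multiplier: number;
  jitterRatio: number;
}

// Illustrative sketch of the exponential-backoff-with-jitter schedule.
// attempt is the 0-based retry number; random is injectable for testing.
function throttleDelayMs(
  attempt: number,
  opts: ThrottleBackoffOptions,
  random: () => number = Math.random,
): number {
  const raw = opts.baseDelayMs * Math.pow(opts.multiplier, attempt);
  const capped = Math.min(raw, opts.maxDelayMs);
  // Scale by a factor in [1 - jitterRatio, 1 + jitterRatio]
  const jitterFactor = 1 + (random() * 2 - 1) * opts.jitterRatio;
  return Math.round(capped * jitterFactor);
}
```

With the defaults shown above (baseDelayMs: 1000, multiplier: 2, maxDelayMs: 30000), throttled retries wait roughly 1s, 2s, 4s, 8s, 16s, then plateau at 30s, each nudged by up to ±25% of jitter.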

Optional Rate Limiting (Token Bucket)

If you want to cap the rate of flush() calls before they ever hit AWS, the library exports a simple token bucket helper. It lets you define burst capacity and sustained throughput so multiple queue instances can share a coordinated limit.

import { BedrockQueue, TokenBucket } from 'aws-bedrock-batch-inference-queue';

const limiter = new TokenBucket(
  5, // bucket capacity: allow up to 5 immediate flushes
  2  // refill rate: 2 tokens per second (sustained rate)
);

async function flushWithRateLimit(queue: BedrockQueue) {
  await limiter.take();  // wait until a token is available
  return queue.flush();
}

// Remember to stop the limiter when shutting down
process.on('SIGINT', () => limiter.stop());

Tune the numbers to stay comfortably below your Bedrock quotas. Pairing the limiter with the built-in throttling backoff gives you both proactive rate control and resilient recovery when AWS still answers with throttling.

Usage

Basic Usage

import { BedrockQueue, BedrockTask } from 'aws-bedrock-batch-inference-queue';

// Create queue instance
const queue = new BedrockQueue(config);
await queue.initialize();

// Queue tasks
const task: BedrockTask = {
  recordId: 'task-1',
  modelInput: {
    anthropic_version: 'bedrock-2023-05-31',
    max_tokens: 1024,
    messages: [
      {
        role: 'user',
        content: [
          {
            type: 'text',
            text: 'Summarize this document...',
          },
        ],
      },
    ],
  },
};

await queue.enqueue(task);

// Queue will automatically flush when threshold is reached
// Or manually flush
const jobId = await queue.flush();

// Wait for job completion
const result = await queue.awaitJob(jobId);
console.log('Job completed:', result.status);
console.log('Results:', result.results);

Example: Claude Sonnet 4.5

const queue = new BedrockQueue({
  // ... configuration
  modelId: 'anthropic.claude-sonnet-4-5-20250929-v1:0',
  jobType: 'retrieval-summary',
});

await queue.initialize();

// Collect job IDs as the queue auto-flushes every 100 tasks
const jobIds: string[] = [];
queue.on('batch-flushed', (jobId) => {
  jobIds.push(jobId);
});

// Queue multiple tasks
for (let i = 0; i < 1500; i++) {
  await queue.enqueue({
    recordId: `task-${i}`,
    modelInput: {
      anthropic_version: 'bedrock-2023-05-31',
      max_tokens: 1024,
      messages: [
        {
          role: 'user',
          content: [
            {
              type: 'text',
              text: `Process document ${i}`,
            },
          ],
        },
      ],
    },
  });
}

// Flush any remaining tasks, then wait for all jobs to complete
jobIds.push(await queue.flush());
const results = await queue.awaitJobs(jobIds);

Example: Amazon Titan Embed

const queue = new BedrockQueue({
  // ... configuration
  modelId: 'amazon.titan-embed-text-v2:0',
  jobType: 'text-embed',
});

await queue.initialize();

// Queue embedding tasks
await queue.enqueue({
  recordId: 'embed-1',
  modelInput: {
    inputText: 'This is the text to embed',
  },
});

const jobId = await queue.flush();
const result = await queue.awaitJob(jobId);

// Access embeddings
result.results?.forEach((taskResult) => {
  if (taskResult.modelOutput) {
    console.log('Embedding:', taskResult.modelOutput.embedding);
  }
});

Event Handling

// Listen to events
queue.on('task-queued', (task) => {
  console.log('Task queued:', task.recordId);
});

queue.on('batch-flushed', (jobId, recordCount) => {
  console.log(`Batch flushed: ${jobId} with ${recordCount} records`);
});

queue.on('job-queued', (jobId, jobArn) => {
  console.log(`Job queued: ${jobId} (ARN: ${jobArn})`);
});

queue.on('job-status-changed', (jobId, status) => {
  console.log(`Job ${jobId} status: ${status}`);
});

queue.on('job-completed', (jobId, result) => {
  console.log(`Job ${jobId} completed:`, result);
  console.log('Results:', result.results);
});

queue.on('job-failed', (jobId, error) => {
  console.error(`Job ${jobId} failed:`, error);
});

queue.on('error', (error) => {
  console.error('Queue error:', error);
});

Manual Status Checks

// Get job status
const status = await queue.getJobStatus(jobId);
console.log('Job status:', status);

// Get job results
if (status === 'completed') {
  const results = await queue.getJobResults(jobId);
  console.log('Results:', results);
}

Multiple Jobs (Promise.all equivalent)

// Queue multiple batches
const jobIds: string[] = [];
for (let i = 0; i < 5; i++) {
  // ... queue tasks
  const jobId = await queue.flush();
  jobIds.push(jobId);
}

// Wait for all jobs to complete
const results = await queue.awaitJobs(jobIds);
results.forEach((result) => {
  console.log(`Job ${result.jobId}: ${result.status}`);
});

Cleanup

// Close connections when done
await queue.close();

API Reference

BedrockQueue

Constructor

new BedrockQueue(config: BedrockQueueConfig)

Methods

initialize(): Promise<void>

Initialize the queue and test database connection.

enqueue(task: BedrockTask): Promise<void>

Add a task to the queue. Automatically flushes if threshold is reached.

flush(): Promise<string>

Manually flush the queue and create a batch inference job. Returns the job UUID.

getQueueSize(): number

Get the current number of tasks in the queue.

getJobStatus(jobId: string): Promise<JobStatus>

Get the current status of a job.

getJobResults(jobId: string): Promise<TaskResult[]>

Retrieve results from S3 for a completed job.

awaitJob(jobId: string): Promise<JobResult>

Wait for a job to complete and return the result.

awaitJobs(jobIds: string[]): Promise<JobResult[]>

Wait for multiple jobs to complete (Promise.all equivalent).

close(): Promise<void>

Close database connections and cleanup.

Events

  • task-queued: Emitted when a task is added to the queue
  • batch-flushed: Emitted when the queue is flushed (jobId, recordCount)
  • job-queued: Emitted when a job is submitted to AWS (jobId, jobArn)
  • job-status-changed: Emitted when job status changes (jobId, status)
  • job-completed: Emitted when job completes (jobId, result)
  • job-failed: Emitted when job fails (jobId, error)
  • error: Emitted when an error occurs (error)

Types

BedrockQueueConfig

interface BedrockQueueConfig {
  // AWS Configuration
  accessKeyId: string;
  secretAccessKey: string;
  region: string;
  roleArn: string;

  // S3 Configuration
  s3InputBucket: string;
  s3InputPrefix: string;
  s3OutputBucket: string;
  s3OutputPrefix: string;

  // Database Configuration
  dbHost: string;
  dbPort: number;
  dbName: string;
  dbUser: string;
  dbPassword: string;

  // Model Configuration
  modelId: string;
  jobType: 'retrieval-summary' | 'text-embed';

  // Optional: Batching Configuration
  batchSize?: number;
  maxBatchSize?: number;
  maxFileSizeBytes?: number;
  maxJobSizeBytes?: number;

  // Optional: Polling Configuration
  pollIntervalMs?: number;
  enableAutoPolling?: boolean;

  // Optional: Retry Configuration
  retryMode?: 'standard' | 'adaptive' | 'legacy';
  maxAttempts?: number;

  // Optional: Throttling Backoff
  throttleBackoff?: {
    maxRetries?: number;
    baseDelayMs?: number;
    maxDelayMs?: number;
    multiplier?: number;
    jitterRatio?: number;
  };
}

BedrockTask

interface BedrockTask {
  recordId: string;
  modelInput: Record<string, any>; // Model-specific input format
}

JobResult

interface JobResult {
  jobId: string;
  status: JobStatus;
  jobArn?: string;
  s3InputUri?: string;
  s3OutputUri?: string;
  errorMessage?: string;
  results?: TaskResult[];
  recordCount?: number;
}

TaskResult

interface TaskResult {
  recordId: string;
  modelOutput?: any;
  error?: {
    code: string;
    message: string;
  };
}

AWS Bedrock Constraints

  • Minimum records per job: 100
  • Maximum records per job: 50,000
  • Maximum file size: 1 GB
  • Maximum job size: 5 GB
  • Maximum concurrent jobs: 10 per account per model ID

The library automatically enforces these constraints.
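As a sketch, the enforcement amounts to checks like these (validateBatch is a hypothetical helper shown for illustration, not part of the library's API; the minimum of 100 records matches the default batchSize):

```typescript
// AWS Bedrock batch inference limits as listed above.
const LIMITS = {
  minRecords: 100,                    // AWS minimum (the default batchSize)
  maxRecords: 50_000,                 // AWS maximum records per job
  maxFileSizeBytes: 1024 ** 3,        // 1 GB per input file
  maxJobSizeBytes: 5 * 1024 ** 3,     // 5 GB per job
};

// Return a list of constraint violations for a prospective batch;
// an empty array means the batch is within the limits.
function validateBatch(
  recordCount: number,
  fileSizeBytes: number,
  jobSizeBytes: number,
): string[] {
  const errors: string[] = [];
  if (recordCount < LIMITS.minRecords)
    errors.push(`record count ${recordCount} is below the minimum of ${LIMITS.minRecords}`);
  if (recordCount > LIMITS.maxRecords)
    errors.push(`record count ${recordCount} exceeds the maximum of ${LIMITS.maxRecords}`);
  if (fileSizeBytes > LIMITS.maxFileSizeBytes)
    errors.push(`input file size ${fileSizeBytes} bytes exceeds 1 GB`);
  if (jobSizeBytes > LIMITS.maxJobSizeBytes)
    errors.push(`total job size ${jobSizeBytes} bytes exceeds 5 GB`);
  return errors;
}
```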

Model Input Formats

Claude Models

{
  recordId: 'task-1',
  modelInput: {
    anthropic_version: 'bedrock-2023-05-31',
    max_tokens: 1024,
    messages: [
      {
        role: 'user',
        content: [
          {
            type: 'text',
            text: 'Your prompt here',
          },
        ],
      },
    ],
  },
}

Titan Embed Models

{
  recordId: 'task-1',
  modelInput: {
    inputText: 'Text to embed',
  },
}

Error Handling

The library emits error events for various failure scenarios:

  • Database connection errors
  • S3 upload/download errors
  • AWS Bedrock API errors
  • Job execution errors

Listen to the error event to handle errors:

queue.on('error', (error) => {
  console.error('Error:', error);
  // Handle error appropriately
});

License

MIT

Contributing

Contributions are welcome! Please open an issue or submit a pull request.