@pikku/redis
v0.12.4
Redis-based implementation of WorkflowStateService for Pikku Workflows.
@pikku-workflows/redis
Redis-based implementation of WorkflowStateService for Pikku Workflows.
Features
- RedisWorkflowService: Redis implementation of WorkflowStateService for persistent workflow execution state
- Fast in-memory storage: Leverage Redis for high-performance workflow state management
- Configurable key prefix: Use custom key prefixes for namespace isolation (default: 'workflows')
- Distributed locking: Uses Redis SET NX with TTL for concurrent run safety
- Type-safe: Full TypeScript support
Installation
npm install @pikku-workflows/redis ioredis
# or
yarn add @pikku-workflows/redis ioredis
Usage
Basic Setup (Remote Mode)
import Redis from 'ioredis'
import { RedisWorkflowService } from '@pikku-workflows/redis'
import { BullQueueService } from '@pikku/queue-bullmq'
// Create Redis connection
const redis = new Redis({
  host: 'localhost',
  port: 6379,
})
// Create queue service for remote mode
const queueService = new BullQueueService('redis://localhost:6379')
// Create workflow state service
const workflowService = new RedisWorkflowService(
  redis,
  queueService,
  'workflows'
)
// Initialize (verifies connection)
await workflowService.init()
Inline Mode Setup (Testing)
For testing, pass undefined as the queue service to enable inline mode:
// Create workflow state service without queue = inline mode
const workflowService = new RedisWorkflowService(
  redis,
  undefined, // No queue service = inline mode
  'workflows'
)
await workflowService.init()
Custom Key Prefix
// Use a custom key prefix
const workflowService = new RedisWorkflowService(
  redis,
  queueService,
  'myapp_workflows'
)
await workflowService.init()
With Connection String
// Create service with Redis connection string
const workflowService = new RedisWorkflowService(
  'redis://localhost:6379',
  queueService
)
await workflowService.init()
With Existing Connection
// Share connection with other services
const redis = new Redis('redis://localhost:6379')
const workflowService = new RedisWorkflowService(redis, queueService)
// Connection is shared, so workflowService.close() will not close it
With Config (Owned Connection)
// Let service create its own connection
const workflowService = new RedisWorkflowService(
  { host, port, password },
  queueService
)
await workflowService.init()
// Later...
await workflowService.close() // Closes the connection
Redis Data Structure
The service uses the following Redis data structures:
Workflow Runs
- Hash: {keyPrefix}:run:{runId} - Stores workflow run data
  - id: Run ID
  - workflow: Workflow name
  - status: Current status ('running', 'completed', 'failed', 'cancelled')
  - input: JSON string of input data
  - output: JSON string of output data (if completed)
  - error: JSON string of error (if failed)
  - createdAt: Timestamp
  - updatedAt: Timestamp
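Because Redis hash values are strings, the structured fields (input, output, error) are stored as JSON. The following is a minimal sketch of how a run could be flattened into these hash fields and rebuilt from them. It is not the package's actual code: the WorkflowRun type and the runToHash and hashToRun helpers are hypothetical names, and numeric epoch-millisecond timestamps are an assumption, since the timestamp format is not specified here.

```typescript
// Status values as listed for the run hash.
type RunStatus = 'running' | 'completed' | 'failed' | 'cancelled'

// Hypothetical in-memory shape of a run; field names follow the hash fields.
interface WorkflowRun {
  id: string
  workflow: string
  status: RunStatus
  input: unknown
  output?: unknown
  error?: unknown
  createdAt: number // assumption: epoch milliseconds
  updatedAt: number // assumption: epoch milliseconds
}

// Flatten a run into string fields suitable for a Redis hash.
function runToHash(run: WorkflowRun): Record<string, string> {
  const hash: Record<string, string> = {
    id: run.id,
    workflow: run.workflow,
    status: run.status,
    input: JSON.stringify(run.input),
    createdAt: String(run.createdAt),
    updatedAt: String(run.updatedAt),
  }
  // Optional fields are only written when present.
  if (run.output !== undefined) hash.output = JSON.stringify(run.output)
  if (run.error !== undefined) hash.error = JSON.stringify(run.error)
  return hash
}

// Rebuild a run from a field map such as the one HGETALL returns.
function hashToRun(hash: Record<string, string>): WorkflowRun {
  return {
    id: hash.id,
    workflow: hash.workflow,
    status: hash.status as RunStatus,
    input: JSON.parse(hash.input),
    output: hash.output !== undefined ? JSON.parse(hash.output) : undefined,
    error: hash.error !== undefined ? JSON.parse(hash.error) : undefined,
    createdAt: Number(hash.createdAt),
    updatedAt: Number(hash.updatedAt),
  }
}
```

A run that has not completed or failed simply has no output or error field in the hash, which matches the "(if completed)" and "(if failed)" notes above.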
Workflow Steps
- Hash: {keyPrefix}:step:{runId}:{stepName} - Stores step execution data
  - stepId: Unique step ID
  - status: Step status ('pending', 'scheduled', 'succeeded', 'failed')
  - rpcName: RPC function name (if RPC step)
  - data: JSON string of step input data
  - result: JSON string of result (if succeeded)
  - error: JSON string of error (if failed)
  - retries: Number of retry attempts allowed
  - retryDelay: Delay between retries
  - attemptCount: Current attempt number
  - createdAt: Timestamp
  - updatedAt: Timestamp
Workflow Step History
- List: {keyPrefix}:history:{runId} - Stores all step attempts in chronological order
  - Each entry contains complete step state for that attempt
Locking
- String: {keyPrefix}:lock:{runId} - Distributed lock with TTL (30 seconds)
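To make the key layout concrete, here is a small sketch that builds the four key patterns listed in this section from a prefix. The makeKeys helper is a hypothetical name used for illustration only; the package may construct its keys differently internally.

```typescript
// Key patterns taken from the data structure list; makeKeys is hypothetical.
function makeKeys(keyPrefix: string = 'workflows') {
  return {
    run: (runId: string) => `${keyPrefix}:run:${runId}`,
    step: (runId: string, stepName: string) =>
      `${keyPrefix}:step:${runId}:${stepName}`,
    history: (runId: string) => `${keyPrefix}:history:${runId}`,
    lock: (runId: string) => `${keyPrefix}:lock:${runId}`,
  }
}

const keys = makeKeys('myapp_workflows')
console.log(keys.run('abc123')) // myapp_workflows:run:abc123
console.log(keys.step('abc123', 'sendEmail')) // myapp_workflows:step:abc123:sendEmail
console.log(makeKeys().lock('abc123')) // workflows:lock:abc123
```

Two services configured with different prefixes never touch each other's keys, which is what makes the prefix usable for namespace isolation on a shared Redis instance.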
API
RedisWorkflowService
Extends WorkflowStateService from @pikku/core/workflow.
Constructor
new RedisWorkflowService(
  connectionOrConfig: Redis | RedisOptions | string,
  queue?: any,
  keyPrefix?: string
)
- connectionOrConfig: ioredis Redis instance, RedisOptions config, or connection string
- queue: Optional queue service for remote workflow execution
- keyPrefix: Redis key prefix (default: 'workflows')
Methods
- init(): Initialize the service (verifies Redis connection)
- createRun(workflowName, input): Create a new workflow run
- getRun(id): Get workflow run by ID
- getRunHistory(runId): Get all step attempts in chronological order
- updateRunStatus(id, status, output?, error?): Update run status
- insertStepState(runId, stepName, rpcName, data, stepOptions?): Insert initial step state
- getStepState(runId, stepName): Get step state with attempt count
- setStepScheduled(stepId): Mark step as scheduled
- setStepRunning(stepId): Mark step as running
- setStepResult(stepId, result): Store step result and mark as succeeded
- setStepError(stepId, error): Store step error and mark as failed
- createRetryAttempt(failedStepId): Create a new retry attempt for a failed step
- withRunLock(id, fn): Execute function with distributed lock
- close(): Close Redis connection (if owned)
Documentation
For complete workflow documentation, see pikku.dev/docs/workflows
Locking Behavior
The withRunLock method uses Redis SET NX with a 30-second TTL for distributed locking:
- Retries up to 10 times with 100ms delay between attempts
- Automatically releases lock after function execution
- Uses Lua script to ensure only the lock owner can release it
- Lock automatically expires after 30 seconds if not released
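The behavior described above can be sketched as follows. This is an illustration of the general SET NX plus Lua-release pattern, not the package's implementation. The LockClient interface, the withLock function, the random token scheme, and the choice to throw when the lock cannot be acquired are all assumptions; the two client methods are shaped after ioredis's set(key, value, 'PX', ms, 'NX') and eval(script, numKeys, ...args) calls. The sketch also assumes "up to 10 times" means 10 attempts in total.

```typescript
// Minimal slice of a Redis client (hypothetical interface, shaped after ioredis).
interface LockClient {
  set(key: string, value: string, px: 'PX', ms: number, nx: 'NX'): Promise<'OK' | null>
  eval(script: string, numKeys: number, ...args: string[]): Promise<unknown>
}

// Delete the key only if it still holds this caller's token.
const RELEASE_SCRIPT = `
if redis.call('get', KEYS[1]) == ARGV[1] then
  return redis.call('del', KEYS[1])
else
  return 0
end`

const LOCK_TTL_MS = 30_000 // 30-second TTL
const MAX_ATTEMPTS = 10 // assumption: 10 attempts in total
const RETRY_DELAY_MS = 100 // delay between attempts

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms))

async function withLock<T>(
  client: LockClient,
  lockKey: string,
  fn: () => Promise<T>,
  retryDelayMs: number = RETRY_DELAY_MS
): Promise<T> {
  // Unique value identifying this lock holder (illustrative token scheme).
  const token = `${Date.now()}-${Math.random().toString(36).slice(2)}`

  for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
    // SET key token PX 30000 NX: succeeds only if the key does not exist.
    const acquired = await client.set(lockKey, token, 'PX', LOCK_TTL_MS, 'NX')
    if (acquired === 'OK') {
      try {
        return await fn()
      } finally {
        // Release even if fn throws; the script leaves foreign locks alone.
        await client.eval(RELEASE_SCRIPT, 1, lockKey, token)
      }
    }
    if (attempt < MAX_ATTEMPTS) await sleep(retryDelayMs)
  }
  // Assumption: failing to acquire the lock surfaces as an error.
  throw new Error(`Could not acquire lock ${lockKey} after ${MAX_ATTEMPTS} attempts`)
}
```

The token check in the release script matters when a function runs longer than the TTL: the lock may have expired and been taken by another worker, and an unconditional DEL would remove that worker's lock.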
License
MIT
