@kowalski21/infra-workflow
v0.1.0
Standalone Medusa workflows-sdk infrastructure: Redis/BullMQ transaction storage, event bus with file-loaded subscribers, and an awilix container bridge
infra-workflow
Standalone infrastructure for running Medusa's workflows-sdk outside a Medusa app — in any Node service (Express, Hono, Fastify, plain scripts):
- RedisTransactionStorage: durable workflow checkpoints on Redis + BullMQ, with restart-surviving retries, step/transaction timeouts, async (pause/resume) steps, and cron/interval-scheduled workflows.
- RedisEventBus / LocalEventBus: a BullMQ event bus with at-least-once delivery and per-subscriber retry tracking, plus an in-process drop-in for tests/scripts that needs no Redis.
- loadSubscribers: file-convention subscriber loading (default handler + config export per file).
- toWorkflowContainer / fromWorkflowContainer: bridges any awilix container (v8–v13 tested) to the MedusaContainer interface the sdk expects, including its internal-symbol parent access and allowUnregistered resolves.
Every dependency is declared as a peer dependency: the package adds no runtime dependencies of its own.
ioredis version: BullMQ pins ioredis exactly. Keep your app's ioredis pinned to the same version as your BullMQ's (e.g. "ioredis": "5.10.1"), or its types will reject your connections.
Install
// package.json of the consuming app
"dependencies": {
  "@kowalski21/infra-workflow": "^0.1.0",
  "@medusajs/workflows-sdk": "^2.15.5",
  "@medusajs/orchestration": "^2.15.5",
  "@medusajs/types": "^2.15.5",
  "@medusajs/utils": "^2.15.5",
  "awilix": "^13",
  "bullmq": "^5",
  "ioredis": "5.10.1"
}
Wiring (server entrypoint)
import { randomUUID } from "node:crypto";
import { asValue } from "awilix";
import { DistributedTransaction, WorkflowScheduler } from "@medusajs/orchestration";
import {
  createRedisConnections, RedisEventBus, RedisTransactionStorage, loadSubscribers
} from "@kowalski21/infra-workflow";
// Provided by your app: REDIS_URL, container (root awilix container), logger, subscribersDir.
const { redisClient, redisWorkerConnection } = createRedisConnections(REDIS_URL);
const eventBus = new RedisEventBus({ redisClient, redisWorkerConnection, logger });
container.register({ eventBus: asValue(eventBus) });
// Optional: how background executions (retries, scheduled runs, event
// handlers) build their DI scope. Register per-execution values here.
const createScope = (root) => {
  const scope = root.createScope();
  scope.register({ requestId: asValue(`bg_${randomUUID()}`) });
  return scope;
};
const storage = new RedisTransactionStorage({
  redisClient, redisWorkerConnection, container, createScope, logger
});
DistributedTransaction.setStorage(storage);
WorkflowScheduler.setStorage(storage);
await storage.onApplicationStart();
await loadSubscribers(subscribersDir, eventBus, container, { logger, createScope });
eventBus.start();
// shutdown order:
// storage.onApplicationPrepareShutdown() → eventBus.shutdown()
// → storage.onApplicationShutdown() → redis connections .quit()
For tests and scripts, register new LocalEventBus(logger) instead: no Redis is required, and the default in-memory workflow storage handles synchronous workflows.
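The shutdown order listed in the wiring comments could be wrapped in a small helper along these lines. This is a sketch only: `shutdownInOrder` and the three `*Like` interfaces are hypothetical names that cover just the methods mentioned in that comment, and whether each method returns a promise is an assumption (awaiting a non-promise is harmless).

```typescript
// Structural types for the methods named in the shutdown comment.
// The real classes expose more; these are illustrative only.
interface StorageLike {
  onApplicationPrepareShutdown(): Promise<void> | void;
  onApplicationShutdown(): Promise<void> | void;
}
interface EventBusLike {
  shutdown(): Promise<void> | void;
}
interface RedisLike {
  quit(): Promise<unknown>;
}

// Hypothetical helper: runs each shutdown step strictly after the previous one.
async function shutdownInOrder(
  storage: StorageLike,
  eventBus: EventBusLike,
  connections: RedisLike[],
): Promise<void> {
  await storage.onApplicationPrepareShutdown(); // step 1
  await eventBus.shutdown();                    // step 2
  await storage.onApplicationShutdown();        // step 3
  for (const connection of connections) {
    await connection.quit();                    // step 4, one connection at a time
  }
}

// Possible usage from a signal handler (names taken from the wiring code):
// process.once("SIGTERM", () =>
//   shutdownInOrder(storage, eventBus, [redisClient, redisWorkerConnection]));
```

Keeping the sequence in one function makes the ordering explicit and easy to test with fakes.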
Running a workflow with your container
import { toWorkflowContainer, fromWorkflowContainer } from "@kowalski21/infra-workflow";
// in a route handler: pass your (scoped) awilix container
const { result, errors } = await myWorkflow(toWorkflowContainer(requestScope))
  .run({ input, throwOnError: false });
// inside a step: cast back to your container type
const scope = fromWorkflowContainer<AppContainer>(container);
const service = scope.resolve("myService");
Error mapping caveat: errors thrown inside steps are serialized by the orchestrator and lose their prototype chain. Match on error.name, not instanceof.
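To illustrate the caveat, here is a minimal, self-contained sketch. The `roundTrip` function is only a stand-in for the orchestrator's serialization (the real format may differ), and `OrderNotFoundError` and `statusFor` are hypothetical names used for the example.

```typescript
// A domain error with a stable `name` field.
class OrderNotFoundError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "OrderNotFoundError";
  }
}

// Stand-in for serialization (assumption): a JSON round trip keeps plain
// fields such as `name` and `message` but drops the prototype chain.
function roundTrip(error: Error): { name: string; message: string } {
  return JSON.parse(JSON.stringify({ name: error.name, message: error.message }));
}

// Hypothetical mapping that relies on `name` rather than `instanceof`.
function statusFor(error: { name: string }): number {
  return error.name === "OrderNotFoundError" ? 404 : 500;
}

const received = roundTrip(new OrderNotFoundError("order 42 is missing"));

console.log(received instanceof OrderNotFoundError); // false
console.log(received.name === "OrderNotFoundError"); // true
console.log(statusFor(received)); // 404
```

Setting `name` explicitly in the constructor is what makes the name-based match reliable after the error has been flattened to plain data.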
Subscriber files
// subscribers/order-created.ts
import type { SubscriberArgs, SubscriberConfig } from "@kowalski21/infra-workflow";
export default async function handleOrderCreated({
  event, container
}: SubscriberArgs<{ orderId: string }, AppContainer>) {
  await container.resolve("notifier").send(event.data.orderId);
}
export const config: SubscriberConfig = { event: "order.created" };
Subscriber failures retry (3 attempts, exponential backoff) and never propagate to the emitter. Side effects whose failure must roll back the operation belong in the workflow as a compensated step, not in a subscriber.
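The retry behavior described above can be pictured with a small in-process loop. This is not the package's implementation (the real bus runs on BullMQ), only a sketch of the semantics: `deliverWithRetry` is a hypothetical name, and the 100 ms base delay and the doubling factor are assumptions.

```typescript
type Handler = () => Promise<void>;

// Hypothetical sketch: call the handler up to `attempts` times, doubling the
// wait after each failure, and swallow the final error so the caller (the
// emitter) never sees it.
async function deliverWithRetry(
  handler: Handler,
  attempts = 3,
  baseDelayMs = 100, // assumed value, for illustration
): Promise<boolean> {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      await handler();
      return true; // delivered
    } catch {
      if (attempt === attempts) break; // out of attempts, give up quietly
      const delayMs = baseDelayMs * 2 ** (attempt - 1); // 100, then 200
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return false; // failed on every attempt, but nothing is thrown
}
```

Because the failure is reported only as a return value, the emitting code cannot react to it, which is why rollback-critical side effects belong in a compensated workflow step instead.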
