npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@lindorm/iris

v0.6.0

Published

Decorator-driven messaging for Node.js with a single API across multiple brokers. Define messages once, publish and subscribe against memory, RabbitMQ, Kafka, NATS, or Redis Streams.

Readme

@lindorm/iris

Decorator-driven messaging for Node.js with a single API across multiple brokers. Define messages once, publish and subscribe against memory, RabbitMQ, Kafka, NATS, or Redis Streams.

Install

npm install @lindorm/iris

This package is ESM-only and ships native (TC39 stage 3) decorators. Use it from an ESM project ("type": "module") and a TypeScript version that supports stage 3 decorators (TS 5+). The package polyfills Symbol.metadata for older Node versions, so no extra setup is required.

Supported Drivers

Iris is broker-agnostic. The driver is chosen via the IrisSource constructor; the matching peer dependency is loaded lazily and only needs to be installed for the driver(s) you use.

| Driver | Peer Dependency | | -------- | --------------- | | memory | (none) | | rabbit | amqplib ^0.10 | | kafka | kafkajs ^2.2 | | nats | nats ^2.29 | | redis | ioredis ^5.10 |

@lindorm/amphora ^0.4 is an optional peer used for payload encryption (@Encrypted). @lindorm/logger ^0.5 is a required peer — every IrisSource takes an ILogger.

npm install amqplib       # RabbitMQ
npm install kafkajs       # Kafka
npm install nats          # NATS
npm install ioredis       # Redis Streams
npm install @lindorm/amphora  # Encryption (optional)

Quick Start

1. Define a Message

import {
  Field,
  IdentifierField,
  Message,
  Namespace,
  TimestampField,
  Version,
} from "@lindorm/iris";

@Message()
@Namespace("orders")
@Version(1)
class OrderPlaced {
  @IdentifierField() id!: string;
  @TimestampField() createdAt!: Date;
  @Field("string") orderId!: string;
  @Field("float") total!: number;
}

2. Create a Source

import { IrisSource } from "@lindorm/iris";

const source = new IrisSource({
  driver: "rabbit",
  url: "amqp://localhost",
  logger: myLogger,
  messages: [OrderPlaced],
});

await source.connect();
await source.setup();

3. Publish and Subscribe

const bus = source.messageBus(OrderPlaced);

await bus.subscribe({
  topic: "OrderPlaced",
  queue: "order-service",
  callback: async (msg, envelope) => {
    console.log(`Order ${msg.orderId} placed for $${msg.total}`);
  },
});

const msg = bus.create({ orderId: "abc-123", total: 59.99 });
await bus.publish(msg);

4. Graceful Shutdown

await source.drain();
await source.disconnect();

Messaging Patterns

Every primitive below is created from an IrisSource (or a session — see Sessions) and is bound to a single message class.

Publisher

Write-only. Use when a service produces messages but never consumes them.

const pub = source.publisher(OrderPlaced);

const msg = pub.create({ orderId: "abc-123", total: 59.99 });
await pub.publish(msg);

await pub.publish([msg1, msg2, msg3]);

Message Bus

Pub/sub with topic-based subscriptions. Supports broadcast and competing-consumer queues.

const bus = source.messageBus(OrderPlaced);

await bus.subscribe({
  topic: "OrderPlaced",
  callback: async (msg) => {
    /* every subscriber gets every message */
  },
});

await bus.subscribe({
  topic: "OrderPlaced",
  queue: "order-processors",
  callback: async (msg) => {
    /* round-robin across consumers in this queue */
  },
});

await bus.subscribe([
  { topic: "OrderPlaced", queue: "analytics", callback: handleAnalytics },
  { topic: "OrderPlaced", queue: "notifications", callback: handleNotify },
]);

await bus.unsubscribe({ topic: "OrderPlaced", queue: "analytics" });
await bus.unsubscribeAll();

Worker Queue

Specialised competing-consumer queue where every message is processed by exactly one consumer.

const queue = source.workerQueue(OrderPlaced);

await queue.consume("process-orders", async (msg, envelope) => {
  console.log(`Processing order ${msg.orderId} (attempt ${envelope.attempt})`);
});

await queue.publish(queue.create({ orderId: "abc-123", total: 59.99 }));

await queue.unconsume("process-orders");
await queue.unconsumeAll();

RPC

Request/response over the broker.

@Message()
class GetPrice {
  @Field("string") sku!: string;
}

@Message()
class PriceResponse {
  @Field("float") price!: number;
  @Field("string") currency!: string;
}

const client = source.rpcClient(GetPrice, PriceResponse);
const server = source.rpcServer(GetPrice, PriceResponse);

await server.serve(async (req) => {
  const res = new PriceResponse();
  res.price = await lookupPrice(req.sku);
  res.currency = "USD";
  return res;
});

const req = new GetPrice();
req.sku = "WIDGET-42";

const res = await client.request(req);
const resWithTimeout = await client.request(req, { timeout: 5000 });

await client.close();
await server.unserveAll();

Stream Processor

Declarative pipelines built with an immutable builder. The pipeline is started, paused, resumed, and stopped explicitly.

const pipeline = source
  .stream()
  .from(RawEvent)
  .filter((msg) => msg.value > 0)
  .map((msg) => {
    const out = new AggregatedEvent();
    out.sum = msg.value;
    out.count = 1;
    return out;
  })
  .to(AggregatedEvent);

await pipeline.start();
pipeline.isRunning(); // true
await pipeline.pause();
await pipeline.resume();
await pipeline.stop();

The builder also exposes flatMap((msg) => Array<T>) and batch(size, { timeout? }) between from(...) and to(...).

Field Types

@Field() accepts the following type identifiers:

| Category | Types | | -------------- | -------------------------------- | | Boolean | boolean | | Integer | integer, bigint | | Floating point | float | | String | string, email, url, uuid | | Enum | enum | | Date/Time | date | | Structured | object, array |

@Message()
class Example {
  @IdentifierField() id!: string;
  @CorrelationField() correlationId!: string;
  @TimestampField() createdAt!: Date;

  @Field("string") name!: string;
  @Field("integer") count!: number;
  @Field("float") price!: number;
  @Field("boolean") active!: boolean;
  @Field("date") expiresAt!: Date;
  @Field("uuid") referenceId!: string;
  @Field("email") contactEmail!: string;
  @Field("url") callbackUrl!: string;
  @Field("array") tags!: Array<string>;
  @Field("object") metadata!: Record<string, unknown>;

  @Nullable() @Field("string") description!: string | null;
  @Optional() @Field("string") nickname?: string;
  @Default(0) @Field("integer") retryCount!: number;
  @Default(() => "generated") @Field("string") code!: string;
}

Decorators

All decorators use the TC39 stage 3 specification. Class decorators receive ClassDecoratorContext, field decorators receive ClassFieldDecoratorContext. Metadata flows through the Symbol.metadata prototype chain, so abstract base classes propagate fields and hooks to concrete subclasses automatically.

Class-Level Decorators

@Message

Marks a class as a concrete message and registers it in the global message registry. Every message must have exactly one of @Message or @AbstractMessage.

@Message()
class OrderPlaced {
  /* ... */
}

@Message({ name: "order-placed" })
class OrderPlaced_v1 {
  /* ... */
}

Options: { name?: string } — overrides the registered name. Defaults to the class name with any trailing _v1/_V2 suffix stripped. Names must be unique across the registry.

@AbstractMessage

Marks a class as an abstract base. The class is not registered. Fields, hooks, and metadata are inherited by @Message subclasses through the metadata prototype chain.

@AbstractMessage()
class BaseEvent {
  @IdentifierField() id!: string;
  @TimestampField() createdAt!: Date;
}

@Message()
@Namespace("orders")
class OrderPlaced extends BaseEvent {
  @Field("string") orderId!: string;
}

@Namespace

Places the message in a named namespace for routing and grouping.

@Namespace("orders")
@Message()
class OrderPlaced {
  /* ... */
}

Argument: string — non-empty, non-whitespace. Throws IrisMetadataError otherwise.

@Version

Sets the message schema version.

@Version(1)
@Message()
class OrderPlaced {
  /* ... */
}

Argument: number — positive integer (>= 1). Throws IrisMetadataError otherwise.

@Topic

Provides a callback that resolves the routing topic dynamically from the message instance instead of using the class name.

@Topic((msg: RegionalEvent) => `events.${msg.region}.${msg.type}`)
@Message()
class RegionalEvent {
  @Field("string") region!: string;
  @Field("string") type!: string;
}

Argument: (message: any) => string.

@Broadcast

Marks a message for broadcast delivery. Every subscriber receives every message rather than competing for it.

@Broadcast()
@Message()
class SystemNotification {
  /* ... */
}

@Persistent

Marks a message as durable. Persistent messages survive broker restarts where the underlying driver supports it.

@Persistent()
@Message()
class PaymentCharge {
  /* ... */
}

@Priority

Default priority for a message type. Higher priority messages are delivered first where supported.

@Priority(8)
@Message()
class UrgentAlert {
  /* ... */
}

Argument: number — integer in [0, 10]. Throws IrisMetadataError otherwise.

@Delay

Default delivery delay in milliseconds. Messages are held until the delay elapses. Can be overridden per publish via PublishOptions.delay.

@Delay(5000)
@Message()
class ScheduledReminder {
  /* ... */
}

Argument: number — non-negative integer.

@Expiry

Default TTL in milliseconds. Unconsumed messages are discarded after this window. Can be overridden per publish via PublishOptions.expiry.

@Expiry(60000)
@Message()
class TemporaryOffer {
  /* ... */
}

Argument: number — non-negative integer.

@Encrypted

Encrypts the payload via @lindorm/amphora before publishing and decrypts on consume. Requires amphora on IrisSource.

@Encrypted()
@Message()
class SensitivePayload {
  @Field("string") ssn!: string;
}

@Encrypted({ purpose: "pii" })
@Message()
class MedicalRecord {
  @Field("object") data!: Record<string, unknown>;
}

Argument: AmphoraPredicate (defaults to {}) — predicate object that selects which key from the amphora key store is used (e.g. algorithm, encryption, purpose, type, ownerId, plus the standard $eq, $in, $neq operators).

@Compressed

Compresses the payload before publishing.

@Compressed() // gzip (default)
@Compressed("brotli") // brotli
@Message()
class LargePayload {
  @Field("object") data!: Record<string, unknown>;
}

Argument: "gzip" | "deflate" | "brotli" (defaults to "gzip"). When combined with @Encrypted, compression runs first.

@Retry

Configures retry behavior when a consume callback throws.

@Retry()
@Message()
class ProcessOrder {
  /* ... */
}

@Retry({
  maxRetries: 5,
  strategy: "exponential",
  delay: 1000,
  delayMax: 30000,
  multiplier: 2,
  jitter: true,
})
@Message()
class PaymentCharge {
  /* ... */
}

Options:

| Field | Type | Default | Description | | ------------ | --------------------------------------------- | ------------ | -------------------------------------- | | maxRetries | number | 3 | Maximum number of retry attempts | | strategy | "constant" | "linear" | "exponential" | "constant" | Backoff strategy | | delay | number | 1000 | Initial delay in milliseconds | | delayMax | number | 30000 | Maximum delay cap in milliseconds | | multiplier | number | 2 | Multiplier used by exponential backoff | | jitter | boolean | false | Add randomness to spread retry storms |

| Strategy | Delay sequence (delay=1000, multiplier=2) | | --------------- | ---------------------------------------------------- | | "constant" | 1000, 1000, 1000, ... | | "linear" | 1000, 2000, 3000, ... | | "exponential" | 1000, 2000, 4000, 8000, ... (capped at delayMax) |

@DeadLetter

Routes messages that exhaust all retries to the configured dead letter store. Requires @Retry and IrisSource.persistence.deadLetter.

@Retry({ maxRetries: 3 })
@DeadLetter()
@Message()
class PaymentCharge {
  /* ... */
}

Field Decorators

Each field decorator declares one message field with its type, default, and nullability. Combine with the modifier decorators below.

@Field

Declares a field with an explicit type from the Field Types table.

@Field("string") name!: string;
@Field("integer") count!: number;
@Field("date") expiresAt!: Date;
@Field("array") tags!: Array<string>;
@Field("object") metadata!: Record<string, unknown>;

Argument: MetaFieldType (re-exported as IrisFieldType).

@IdentifierField

Auto-generated UUID v4 primary identifier. Equivalent to @Default(() => randomUUID()) @Field("uuid"). Non-nullable, non-optional.

@IdentifierField() id!: string;

@CorrelationField

Auto-generated UUID v4 used to trace related messages across publish/consume chains. Equivalent to @Default(() => randomUUID()) @Field("uuid"). Non-nullable, non-optional.

@CorrelationField() correlationId!: string;

@TimestampField

Auto-generated Date set on creation. Equivalent to @Default(() => new Date()) @Field("date"). Non-nullable, non-optional.

@TimestampField() createdAt!: Date;

@MandatoryField

Boolean flag that defaults to false. Equivalent to @Default(false) @Field("boolean").

@MandatoryField() requiresApproval!: boolean;

@PersistentField

Boolean persistence flag that defaults to false. Equivalent to @Default(false) @Field("boolean").

@PersistentField() shouldPersist!: boolean;

Field Modifiers

Stack these on top of a field decorator to refine its behavior.

@Nullable / @Optional

@Nullable() @Field("string") description!: string | null;
@Optional() @Field("string") nickname?: string;

@Default

@Default(0) @Field("integer") retryCount!: number;
@Default(() => "generated") @Field("string") code!: string;

Argument: MetaFieldDefault (re-exported as IrisFieldDefault) — a primitive value, an array/record of primitives, or a no-arg function returning one.

@Generated

Marks a field for automatic value generation when an instance is created.

@Generated("uuid") @Field("uuid") traceId!: string;
@Generated("date") @Field("date") processedAt!: Date;
@Generated("string") @Field("string") token!: string;
@Generated("string", { length: 12 }) @Field("string") shortCode!: string;
@Generated("integer", { min: 1, max: 1000 }) @Field("integer") sequence!: number;
@Generated("float", { min: 0, max: 1 }) @Field("float") weight!: number;

Strategies: "uuid" | "date" | "string" | "integer" | "float" (re-exported as IrisGeneratedStrategy). Options: { length?, min?, max? }.

@Header

Promotes a field to a transport header so it is accessible without deserialising the body.

@Header() @Field("string") source!: string;
@Header("x-trace-id") @Field("uuid") traceId!: string;

Argument: string? — header name, defaults to the property name. Throws IrisMetadataError if it resolves to an empty string.

@Enum

Restricts the field to the values of a TypeScript enum or Record<string, string | number>. Enforced during validation.

enum OrderStatus {
  Pending = "pending",
  Shipped = "shipped",
  Delivered = "delivered",
}

@Enum(OrderStatus) @Field("enum") status!: OrderStatus;

@Min / @Max

Numeric bounds for number fields, length bounds for string fields.

@Min(0) @Max(100) @Field("integer") score!: number;
@Min(1) @Max(255) @Field("string") name!: string;

@Schema

Attaches a Zod schema for fine-grained validation.

import { z } from "zod";

@Schema(z.string().email()) @Field("email") email!: string;
@Schema(z.number().int().min(13).max(150)) @Field("integer") age!: number;

Argument: z.ZodType.

@Transform

Bidirectional transform between the in-memory field value and its serialised form. to runs on serialise, from runs on hydrate.

@Transform({
  to: (value: string[]) => value.join(","),
  from: (raw: string) => raw.split(","),
})
@Field("string")
tags!: string[];

Options: { to: (value) => unknown, from: (raw) => unknown } (the type is re-exported as IrisTransformFn).

Lifecycle Hook Decorators

Hooks are class decorators. Each hook receives (message, meta) where meta: IrisHookMeta carries { correlationId, actor, timestamp }. Hooks may be sync or async.

| Decorator | Fires | | ----------------- | ---------------------------------------------------------------------------------- | | @OnCreate | When a message is constructed via create() | | @OnHydrate | After raw transport data is rehydrated, before delivery to a consume callback | | @OnValidate | During validation, after the built-in Zod check — throw to reject | | @BeforePublish | Before the transport publishes | | @AfterPublish | After the transport confirms publication | | @BeforeConsume | After deserialisation, before the consume callback | | @AfterConsume | After the consume callback completes successfully | | @OnConsumeError | When the consume callback throws — receives (error, message, meta) (error first) |

@OnCreate((msg) => {
  msg.slug = msg.name.toLowerCase().replace(/\s+/g, "-");
})
@OnValidate((msg) => {
  if (msg.startDate >= msg.endDate) throw new Error("startDate must be before endDate");
})
@BeforePublish(async (msg) => {
  await validateExternalId(msg.externalId);
})
@AfterPublish(async (msg) => {
  metrics.increment("messages.published");
})
@BeforeConsume(async (msg) => {
  audit.log("consuming", msg);
})
@AfterConsume(async (msg) => {
  metrics.increment("messages.consumed");
})
@OnConsumeError(async (error, msg) => {
  errorTracker.capture(error, { messageId: msg.id });
})
@Message()
class BookingRequest {
  /* ... */
}

Hook Execution Order

| Phase | Order | | ---------- | ----------------------------------------------------------------------------------------------------------- | | Creation | @OnCreate | | Validation | @OnValidate | | Hydration | @OnHydrate | | Publish | @BeforePublish -> subscriber.beforePublish -> transport -> @AfterPublish -> subscriber.afterPublish | | Consume | @BeforeConsume -> subscriber.beforeConsume -> callback -> @AfterConsume -> subscriber.afterConsume | | Error | @OnConsumeError -> subscriber.onConsumeError (replaces the after-consume steps) |

Subscribers

Observe message lifecycle events across every message in a source.

import type { IMessageSubscriber } from "@lindorm/iris";

const audit: IMessageSubscriber = {
  beforePublish: async (msg) => {
    auditLog.log("publishing", msg);
  },
  afterConsume: async (msg) => {
    auditLog.log("consumed", msg);
  },
  onConsumeError: async (err, msg) => {
    auditLog.log("consume-failed", { error: err.message, msg });
  },
};

source.addSubscriber(audit);
source.removeSubscriber(audit);

Consume Envelope

Every subscribe/consume callback receives the message and a ConsumeEnvelope:

import type { ConsumeEnvelope } from "@lindorm/iris";

await bus.subscribe({
  topic: "OrderPlaced",
  callback: async (msg: OrderPlaced, envelope: ConsumeEnvelope) => {
    envelope.topic; // string
    envelope.messageName; // string
    envelope.namespace; // string | null
    envelope.version; // number
    envelope.headers; // Record<string, string>
    envelope.attempt; // number (1 on first delivery, increments on retry)
    envelope.correlationId; // string | null
    envelope.timestamp; // number (unix ms)
  },
});

Message Manipulation

Every publisher, message bus, and worker queue exposes the same instance utilities:

const bus = source.messageBus(OrderPlaced);

const msg = bus.create({ orderId: "abc-123", total: 59.99 });
const hydrated = bus.hydrate({ id: "...", orderId: "abc-123", total: 59.99, createdAt: ... });
const copied = bus.copy(msg); // deep clone with a fresh identifier value
bus.validate(msg);            // throws IrisValidationError when invalid

create() runs default value generation and @OnCreate hooks. hydrate() reconstructs from raw transport data without regenerating defaults and runs @OnHydrate hooks.

Publish Options

Per-publish overrides for publish(message, options):

await bus.publish(msg, {
  delay: 5000, // ms — overrides @Delay
  priority: 8, // 0..10 — overrides @Priority
  expiry: 60000, // ms — overrides @Expiry
  key: "partition-key", // routing/partition key
  headers: { "x-source": "api" }, // additional headers
});

Sessions

source.session() returns a lightweight IIrisSession that shares the underlying driver connection but carries its own logger, IrisHookMeta, and a snapshot of the source's subscribers at the time of cloning. This is the idiomatic way to scope correlation IDs and per-request loggers without tearing down the connection.

const source = new IrisSource({
  driver: "rabbit",
  url: "amqp://localhost",
  logger: appLogger,
  messages: [OrderPlaced],
});

await source.connect();
await source.setup();

const scoped = source.session({
  logger: requestLogger,
  meta: { correlationId: "abc-123", actor: "user:42", timestamp: new Date() },
});

const pub = scoped.publisher(OrderPlaced);
await pub.publish(pub.create({ orderId: "abc", total: 10 }));

A session exposes the same messageBus, publisher, workerQueue, stream, rpcClient, rpcServer, hasMessage, ping, and driver surface as the source.

Driver Configuration

Memory

new IrisSource({ driver: "memory", logger, messages: [OrderPlaced] });

RabbitMQ

new IrisSource({
  driver: "rabbit",
  url: "amqp://localhost",
  logger,
  messages: [OrderPlaced],
  exchange: "my-exchange",
  prefetch: 10,
  connection: {
    heartbeat: 60,
    socketOptions: { timeout: 30000, keepAlive: true },
  },
});

Kafka

new IrisSource({
  driver: "kafka",
  brokers: ["localhost:9092"],
  logger,
  messages: [OrderPlaced],
  prefix: "my-app",
  prefetch: 100,
  acks: -1, // -1 = all, 0 = none, 1 = leader
  sessionTimeoutMs: 30000,
  connection: {
    clientId: "my-service",
    ssl: true,
    sasl: { mechanism: "scram-sha-256", username: "user", password: "pass" },
    connectionTimeout: 10000,
    requestTimeout: 30000,
    retry: { retries: 5, initialRetryTime: 300 },
  },
});

NATS

new IrisSource({
  driver: "nats",
  servers: "nats://localhost:4222", // string or Array<string>
  logger,
  messages: [OrderPlaced],
  prefix: "my-app",
  prefetch: 50,
  connection: {
    user: "nats-user",
    pass: "nats-pass",
    tls: true,
    maxReconnectAttempts: 10,
    reconnectTimeWait: 2000,
    timeout: 10000,
    pingInterval: 30000,
    name: "my-service",
  },
});

Redis Streams

new IrisSource({
  driver: "redis",
  url: "redis://localhost:6379",
  logger,
  messages: [OrderPlaced],
  prefix: "my-app",
  prefetch: 50,
  blockMs: 5000, // XREAD block time
  maxStreamLength: 10000, // MAXLEN cap per stream
  connection: {
    host: "redis.internal",
    port: 6379,
    password: "secret",
    db: 0,
    tls: {},
    connectTimeout: 10000,
    commandTimeout: 5000,
    keepAlive: 30000,
    connectionName: "iris-worker",
  },
});

Persistence (Delay and Dead Letter Stores)

Configure where delayed deliveries and dead-lettered envelopes are kept. These are used by drivers that don't already provide native primitives for them.

new IrisSource({
  driver: "kafka",
  brokers: ["localhost:9092"],
  logger,
  messages: [OrderPlaced],
  persistence: {
    delay: { type: "memory", pollIntervalMs: 250 },
    // delay: { type: "redis", url: "redis://localhost:6379", pollIntervalMs: 250 },
    // delay: { type: "custom", store: myDelayStore },

    deadLetter: { type: "memory" },
    // deadLetter: { type: "redis", url: "redis://localhost:6379" },
    // deadLetter: { type: "custom", store: myDeadLetterStore },
  },
});

Custom Stores

Implement IDelayStore and/or IDeadLetterStore for bespoke persistence:

import type {
  DeadLetterEntry,
  DeadLetterFilterOptions,
  DeadLetterListOptions,
  DelayedEntry,
  IDeadLetterStore,
  IDelayStore,
} from "@lindorm/iris";

class MyDelayStore implements IDelayStore {
  schedule = async (entry: DelayedEntry): Promise<void> => {
    /* ... */
  };
  poll = async (now: number): Promise<Array<DelayedEntry>> => {
    /* ... */
  };
  cancel = async (id: string): Promise<boolean> => {
    /* ... */
  };
  size = async (): Promise<number> => {
    /* ... */
  };
  clear = async (): Promise<void> => {
    /* ... */
  };
  close = async (): Promise<void> => {
    /* ... */
  };
}

class MyDeadLetterStore implements IDeadLetterStore {
  add = async (entry: DeadLetterEntry): Promise<void> => {
    /* ... */
  };
  list = async (options?: DeadLetterListOptions): Promise<Array<DeadLetterEntry>> => {
    /* ... */
  };
  get = async (id: string): Promise<DeadLetterEntry | null> => {
    /* ... */
  };
  remove = async (id: string): Promise<boolean> => {
    /* ... */
  };
  purge = async (options?: DeadLetterFilterOptions): Promise<number> => {
    /* ... */
  };
  count = async (options?: DeadLetterFilterOptions): Promise<number> => {
    /* ... */
  };
  close = async (): Promise<void> => {
    /* ... */
  };
}

Connection State

const state = source.getConnectionState();
// "disconnected" | "connecting" | "connected" | "reconnecting" | "draining"

source.on("connection:state", (next) => {
  console.log(`connection state -> ${next}`);
});

source.once("connection:state", (next) => {
  /* ... */
});
source.off("connection:state", listener);

const healthy = await source.ping();

Testing with Mocks

Iris ships per-runtime mock factories. Pick the import path that matches your test runner:

// Jest
import {
  createMockIrisSource,
  createMockIrisSession,
  createMockMessageBus,
  createMockPublisher,
  createMockRpcClient,
  createMockWorkerQueue,
} from "@lindorm/iris/mocks/jest";

// Vitest
import {
  createMockIrisSource,
  createMockIrisSession,
  createMockMessageBus,
  createMockPublisher,
  createMockRpcClient,
  createMockWorkerQueue,
} from "@lindorm/iris/mocks/vitest";
const source = createMockIrisSource();

await source.connect();
expect(source.connect).toHaveBeenCalledTimes(1);

const bus = source.messageBus(OrderPlaced);
const pub = source.publisher(OrderPlaced);
const queue = source.workerQueue(OrderPlaced);
const rpc = source.rpcClient(GetPrice, PriceResponse);

The bus, publisher, and worker-queue mocks expose a published array and a clearPublished() helper. The RPC client mock takes an optional response factory and exposes requests plus clearRequests().

const pub = createMockPublisher<OrderPlaced>();
await pub.publish(pub.create({ orderId: "abc", total: 10 }));
expect(pub.published).toHaveLength(1);
pub.clearPublished();

const client = createMockRpcClient<GetPrice, PriceResponse>((req) => {
  const res = new PriceResponse();
  res.price = 42.0;
  res.currency = "USD";
  return res;
});
const res = await client.request(new GetPrice());
expect(client.requests).toHaveLength(1);
client.clearRequests();

CLI

The package ships an iris binary for scaffolding:

npx iris init --driver rabbit --directory ./src/iris
npx iris generate message OrderPlaced --directory ./src/iris/messages

iris init (alias i) writes a source.ts plus an empty messages/ directory. iris generate message (alias g m) writes a single PascalCase-named message file. Both commands accept --dry-run.

Error Classes

Every error extends IrisError, which extends LindormError from @lindorm/errors.

| Error | Raised when | | ------------------------ | ------------------------------------------------ | | IrisError | Base class — extend for custom errors | | IrisDriverError | Driver connection or operation failed | | IrisMetadataError | A decorator was used with an invalid value | | IrisNotSupportedError | The active driver does not support the operation | | IrisPublishError | Publishing a message failed | | IrisScannerError | Auto-scanning message classes from disk failed | | IrisSerializationError | (De)serialisation of an envelope failed | | IrisSourceError | Source configuration or lifecycle error | | IrisTimeoutError | An operation exceeded its timeout | | IrisTransportError | Transport-layer failure | | IrisValidationError | A message failed validation |

import { IrisTimeoutError } from "@lindorm/iris";

try {
  await client.request(req, { timeout: 1000 });
} catch (error) {
  if (error instanceof IrisTimeoutError) {
    // handle timeout
  }
}

License

AGPL-3.0-or-later