opinionated-machine

v6.10.1

Very opinionated DI framework for fastify, built on top of awilix

Basic usage

Define a module, or several modules, that will be used for resolving dependency graphs, using awilix:

import {
    AbstractModule,
    type DependencyInjectionOptions,
    type InferModuleDependencies,
    asSingletonClass,
    asMessageQueueHandlerClass,
    asEnqueuedJobWorkerClass,
    asJobQueueClass,
    asControllerClass,
} from 'opinionated-machine'

export class MyModule extends AbstractModule {
    resolveDependencies(
        diOptions: DependencyInjectionOptions,
    ) {
        return {
            service: asSingletonClass(Service),

            // by default init and disposal methods from `message-queue-toolkit` consumers
            // will be assumed. If different values are necessary, pass second config object
            // and specify "asyncInit" and "asyncDispose" fields
            messageQueueConsumer: asMessageQueueHandlerClass(MessageQueueConsumer, {
                queueName: MessageQueueConsumer.QUEUE_ID,
                diOptions,
            }),

            // by default init and disposal methods from `background-jobs-common` job workers
            // will be assumed. If different values are necessary, pass second config object
            // and specify "asyncInit" and "asyncDispose" fields
            jobWorker: asEnqueuedJobWorkerClass(JobWorker, {
                queueName: JobWorker.QUEUE_ID,
                diOptions,
            }),

            // by default disposal methods from `background-jobs-common` job queue manager
            // will be assumed. If different values are necessary, specify "asyncDispose" fields
            // in the second config object
            queueManager: asJobQueueClass(
                QueueManager,
                {
                    diOptions,
                },
                {
                    asyncInit: (manager) => manager.start(resolveJobQueuesEnabled(diOptions)),
                },
            ),
        }
    }

    // controllers will be automatically registered on fastify app
    // both REST and SSE controllers go here - SSE controllers are auto-detected
    resolveControllers(diOptions: DependencyInjectionOptions) {
        return {
            controller: asControllerClass(MyController),
        }
    }
}

// Dependencies are inferred from the return type of resolveDependencies()
export type ModuleDependencies = InferModuleDependencies<MyModule>

The InferModuleDependencies utility type extracts the dependency types from the resolvers returned by resolveDependencies(), so you don't need to maintain a separate type manually.

When a module is used as a secondary module, only resolvers marked as public (asServiceClass, asUseCaseClass, asJobQueueClass, asEnqueuedJobQueueManagerFunction) are exposed. Use InferPublicModuleDependencies to infer only the public dependencies (private ones are omitted entirely):

// Inferred as { service: Service } — private resolvers are omitted
export type MyModulePublicDependencies = InferPublicModuleDependencies<MyModule>
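
To make the public/private distinction concrete, here is a minimal sketch of how a resolver map could carry a visibility flag and how mapped types could derive both views from it. `Resolver`, `publicResolver`, `privateResolver`, `InferAll` and `InferPublic` are hypothetical names used only for illustration; this is not the library's implementation.

```typescript
// Hypothetical resolver shape: a factory plus a compile-time visibility flag.
type Resolver<T, Public extends boolean> = { create: () => T; public: Public }

const publicResolver = <T>(create: () => T): Resolver<T, true> => ({ create, public: true })
const privateResolver = <T>(create: () => T): Resolver<T, false> => ({ create, public: false })

// All dependencies: unwrap every resolver in the map.
type InferAll<M> = { [K in keyof M]: M[K] extends Resolver<infer T, boolean> ? T : never }

// Public dependencies only: key remapping drops entries whose flag is `false`.
type InferPublic<M> = {
  [K in keyof M as M[K] extends Resolver<unknown, true> ? K : never]:
    M[K] extends Resolver<infer T, true> ? T : never
}

class UserRepository {
  findName(): string { return 'Ada' }
}
class UserService {
  greet(): string { return 'hello' }
}

const resolverMap = {
  userService: publicResolver(() => new UserService()),
  userRepository: privateResolver(() => new UserRepository()),
}

// AllDeps is { userService: UserService; userRepository: UserRepository }
type AllDeps = InferAll<typeof resolverMap>
// PublicDeps is { userService: UserService }
type PublicDeps = InferPublic<typeof resolverMap>

// Runtime counterpart of InferPublic: keep only the entries flagged as public.
function publicKeys(map: Record<string, { public: boolean }>): string[] {
  return Object.entries(map)
    .filter(([, resolver]) => resolver.public)
    .map(([key]) => key)
}

const exposedKeys = publicKeys(resolverMap)
const fullView: AllDeps = {
  userService: resolverMap.userService.create(),
  userRepository: resolverMap.userRepository.create(),
}
const publicView: PublicDeps = { userService: fullView.userService }
```

Adding `userRepository` to `publicView` would be rejected by the compiler, which mirrors how private resolvers disappear from the public type.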

Managing global public dependencies across modules

When your application has multiple secondary modules, you need a single type that combines all their public dependencies. The library exports an empty PublicDependencies interface that each module can augment via TypeScript's module augmentation. Each module file adds its own public deps to this shared interface using declare module. The augmentations are project-wide — they apply everywhere as long as the augmenting file is part of your TypeScript compilation (included in tsconfig.json), with no explicit import chain required.

Start with a CommonModule that provides shared infrastructure dependencies (logger, config, etc.), then add domain modules that each augment the same interface independently.

// CommonModule.ts: shared infrastructure
import {
  AbstractModule,
  type DependencyInjectionOptions,
  type InferPublicModuleDependencies,
  asServiceClass,
  asSingletonFunction,
} from 'opinionated-machine'

export class CommonModule extends AbstractModule {
  resolveDependencies(diOptions: DependencyInjectionOptions) {
    return {
      config: asSingletonFunction(() => loadConfig()),     // private — omitted
      logger: asServiceClass(Logger),                      // public
      eventEmitter: asServiceClass(AppEventEmitter),       // public
    }
  }
}

declare module 'opinionated-machine' {
  interface PublicDependencies extends InferPublicModuleDependencies<CommonModule> {}
}

// UsersModule.ts: no need to import CommonModule's type
import {
  AbstractModule,
  type DependencyInjectionOptions,
  type InferPublicModuleDependencies,
  asRepositoryClass,
  asServiceClass,
} from 'opinionated-machine'

export class UsersModule extends AbstractModule {
  resolveDependencies(diOptions: DependencyInjectionOptions) {
    return {
      userService: asServiceClass(UserService),         // public
      userRepository: asRepositoryClass(UserRepository), // private — omitted
    }
  }
}

declare module 'opinionated-machine' {
  interface PublicDependencies extends InferPublicModuleDependencies<UsersModule> {}
}

// BillingModule.ts: independent, no chain
import {
  AbstractModule,
  type DependencyInjectionOptions,
  type InferPublicModuleDependencies,
  asRepositoryClass,
  asServiceClass,
} from 'opinionated-machine'

export class BillingModule extends AbstractModule {
  resolveDependencies(diOptions: DependencyInjectionOptions) {
    return {
      billingService: asServiceClass(BillingService),       // public
      paymentGateway: asRepositoryClass(PaymentGateway),     // private — omitted
    }
  }
}

declare module 'opinionated-machine' {
  interface PublicDependencies extends InferPublicModuleDependencies<BillingModule> {}
}

Importing PublicDependencies from anywhere gives you the full accumulated type: { logger: Logger; eventEmitter: AppEventEmitter; userService: UserService; billingService: BillingService }. Private dependencies (config, userRepository, paymentGateway) are omitted automatically. No explicit import chain between modules is needed — each module augments the interface independently.
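
The accumulation relies on TypeScript declaration merging: several declarations of the same interface are combined into one. The single-file sketch below shows the mechanism with local stand-in types (`CommonPublic`, `UsersPublic`, `AppLogger` and `AccountService` are hypothetical). In a real project, each `interface PublicDependencies ...` line would sit inside a `declare module 'opinionated-machine'` block in its own module file.

```typescript
class AppLogger {
  info(message: string): string { return `[info] ${message}` }
}
class AccountService {
  findName(): string { return 'Ada' }
}

// Stand-ins for InferPublicModuleDependencies<CommonModule> and <UsersModule>.
type CommonPublic = { logger: AppLogger }
type UsersPublic = { accountService: AccountService }

// Each declaration adds to the same interface; TypeScript merges them.
interface PublicDependencies extends CommonPublic {}
interface PublicDependencies extends UsersPublic {}

// The merged interface requires the members of both declarations.
const mergedDeps: PublicDependencies = {
  logger: new AppLogger(),
  accountService: new AccountService(),
}

const logLine = mergedDeps.logger.info(mergedDeps.accountService.findName())
```

Omitting either `logger` or `accountService` from `mergedDeps` is a compile error, which is the property the shared `PublicDependencies` interface depends on.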

Typing constructor dependencies within a module

Classes within a module can access both the module's own dependencies (including private ones like repositories) and all public dependencies from other modules. Combine InferModuleDependencies with PublicDependencies to get the full cradle type available at runtime:

// UsersModule.ts
import {
  AbstractModule,
  type DependencyInjectionOptions,
  type InferModuleDependencies,
  type InferPublicModuleDependencies,
  type PublicDependencies,
  asRepositoryClass,
  asServiceClass,
} from 'opinionated-machine'

// Module's own deps (public + private) merged with all public deps from other modules
type UsersModuleInjectables = InferModuleDependencies<UsersModule> & PublicDependencies

export class UserService {
  private readonly repository: UserRepository
  private readonly logger: Logger  // from CommonModule's public deps

  constructor(dependencies: UsersModuleInjectables) {
    this.repository = dependencies.userRepository  // own private dep: accessible
    this.logger = dependencies.logger              // public dep from another module: accessible
    // dependencies.paymentGateway                 // private dep from another module (BillingModule): type error
  }
}

class UserRepository {}

export class UsersModule extends AbstractModule {
  resolveDependencies(diOptions: DependencyInjectionOptions) {
    return {
      userService: asServiceClass(UserService),
      userRepository: asRepositoryClass(UserRepository),
    }
  }
}

declare module 'opinionated-machine' {
  interface PublicDependencies extends InferPublicModuleDependencies<UsersModule> {}
}

This gives each class access to exactly what the DI container provides at runtime: the module's own registered dependencies plus all public dependencies from secondary modules. Private dependencies from other modules are excluded at the type level, matching the runtime behavior.

Constructing the combined dependency type for DIContext

Use PublicDependencies when building the full dependency type:

import type { InferModuleDependencies, PublicDependencies } from 'opinionated-machine'

type Dependencies = InferModuleDependencies<PrimaryModule> & PublicDependencies

Avoiding circular dependencies in typed cradle parameters

Because InferModuleDependencies is inferred from the module's own resolveDependencies() return type, classes and functions that reference it inside the same module could create a circular type dependency. The library handles this automatically for class-based resolvers. For function-based resolvers, use the indexed access pattern described below.

Class-based resolvers (recommended — works automatically)

All class-based resolver functions (asSingletonClass, asServiceClass, asRepositoryClass, etc.) use a ClassValue<T> type internally, which infers the instance type from the class's prototype property rather than its constructor signature. This means classes can freely reference InferModuleDependencies in their constructors without causing circular type dependencies:

import { AbstractModule, type DependencyInjectionOptions, type InferModuleDependencies, asServiceClass, asSingletonClass } from 'opinionated-machine'

export class MyService {
  // Constructor references ModuleDependencies — no circular dependency!
  constructor({ myHelper }: ModuleDependencies) {
    // myHelper is fully typed as MyHelper
  }
}

export class MyHelper {
  process() {}
}

export class MyModule extends AbstractModule {
  resolveDependencies(diOptions: DependencyInjectionOptions) {
    return {
      myService: asServiceClass(MyService),   // ClassValue<T> breaks the cycle
      myHelper: asSingletonClass(MyHelper),
    }
  }
}

export type ModuleDependencies = InferModuleDependencies<MyModule>

Prefer class-based resolvers wherever possible — they provide full type safety with no any fallback and no extra annotations needed.
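
The prototype-based inference can be sketched in a few lines. The types below (`ClassValue`, `Resolver`, `asClass`) are simplified, hypothetical stand-ins for illustration, and the manual wiring at the end takes the place of a real container.

```typescript
// Hypothetical stand-ins; the library's real types are more involved.
type ClassValue<T> = { prototype: T }
type Resolver<T> = { resolve(cradle: unknown): T }

// T is inferred from `prototype`, so the constructor's parameter types are not inspected.
function asClass<T>(cls: ClassValue<T>): Resolver<T> {
  const Ctor = cls as unknown as new (cradle: unknown) => T
  return { resolve: (cradle) => new Ctor(cradle) }
}

class Helper {
  process(): string { return 'processed' }
}

class Service {
  private readonly helper: Helper

  // The constructor refers to Deps, which is itself derived from the resolver map below.
  constructor({ helper }: Deps) {
    this.helper = helper
  }

  run(): string { return this.helper.process() }
}

const resolverMap = {
  helper: asClass(Helper),
  service: asClass(Service),
}

type Deps = {
  [K in keyof typeof resolverMap]: (typeof resolverMap)[K] extends Resolver<infer T> ? T : never
}

// Tiny manual wiring in place of a real container.
const helperInstance = resolverMap.helper.resolve({})
const serviceInstance = resolverMap.service.resolve({ helper: helperInstance })
const outcome = serviceInstance.run()
```

If `asClass` instead accepted `new (cradle: Deps) => T`, TypeScript would have to resolve `Deps` while still computing the type of `resolverMap`, which is the cycle the prototype trick avoids.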

Function-based resolvers (asSingletonFunction)

Function-based resolvers (asSingletonFunction) cannot use the ClassValue<T> trick because functions don't have a prototype property that separates return type from parameter types. Use indexed access on InferModuleDependencies to type individual dependencies, and always provide an explicit return type annotation on the factory function:

import { S3Client } from '@aws-sdk/client-s3'

// Inside resolveDependencies():
config: asSingletonClass(Config),
logger: asServiceClass(Logger),

s3Client: asSingletonFunction(
  ({ config, logger }: {
    config: ModuleDependencies['config']
    logger: ModuleDependencies['logger']
  }): S3Client => {
    return new S3Client({
      region: config.awsRegion,
      credentials: { accessKeyId: config.awsAccessKey, secretAccessKey: config.awsSecretKey },
      logger,
    })
  },
),

// ...

// At the bottom of the file:
export type ModuleDependencies = InferModuleDependencies<MyModule>

Indexed access types (ModuleDependencies['config']) are resolved lazily by TypeScript — it looks up individual properties without computing the entire ModuleDependencies type, avoiding the cycle. Each dependency stays in sync with the module's resolvers automatically.

For cross-module dependencies, use InferPublicModuleDependencies. Only dependencies that the other module registers with a public resolver are available this way:

type CommonDeps = InferPublicModuleDependencies<CommonModule>

redis: asSingletonFunction(
  ({ config }: { config: CommonDeps['config'] }): Redis => {
    return new Redis({ host: config.redis.host, port: config.redis.port })
  },
),

The explicit return type is critical. Without it, TypeScript attempts to infer the return type from the function body, which requires resolving the parameter types, which triggers the circular reference:

// BREAKS — no explicit return type, TypeScript infers it from the body,
// requiring config's type to be resolved, triggering the cycle:
s3Client: asSingletonFunction(
  ({ config }: { config: ModuleDependencies['config'] }) => {
    return new S3Client({ region: config.awsRegion })
  },
),

Note: Pick<ModuleDependencies, 'a' | 'b'> does not work — Pick requires keyof ModuleDependencies, which forces TypeScript to resolve the entire type and triggers the circular reference. Each property must be accessed individually via indexed access.

Alternative: concrete parameter types

You can use concrete types instead of indexed access when the return type is dynamic or difficult to spell out explicitly. Because concrete types don't reference InferModuleDependencies, there is no circularity, so TypeScript can infer the return type for you:

// Return type inferred automatically — no explicit annotation needed
redisConfig: asSingletonFunction(
  ({ config }: { config: Config }) => {
    return config.getRedisConfig()
  },
),

The trade-off is that parameter types won't auto-sync if the module's resolver changes — but you'll still get a type error at the resolver level if the types diverge.

Fallback: class wrapper

If the adapter needs many dependencies and the inline syntax becomes too verbose, wrap the adaptation logic in a class and use asSingletonClass instead. The constructor can reference ModuleDependencies directly since ClassValue<T> breaks the cycle automatically — no return type annotation needed:

import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'

// Full adapter — adds domain-specific methods:
class S3StorageAdapter {
  private readonly client: S3Client

  constructor({ config, logger }: ModuleDependencies) {
    this.client = new S3Client({
      region: config.awsRegion,
      credentials: { accessKeyId: config.awsAccessKey, secretAccessKey: config.awsSecretKey },
      logger,
    })
  }

  async upload(bucket: string, key: string, body: Buffer): Promise<string> {
    await this.client.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: body }))
    return `https://${bucket}.s3.amazonaws.com/${key}`
  }
}

// In resolveDependencies():
s3StorageAdapter: asSingletonClass(S3StorageAdapter),

// Thin wrapper — just bridges the constructor signature:
class S3ClientProvider {
  readonly client: S3Client

  constructor({ config, logger }: ModuleDependencies) {
    this.client = new S3Client({
      region: config.awsRegion,
      credentials: { accessKeyId: config.awsAccessKey, secretAccessKey: config.awsSecretKey },
      logger,
    })
  }
}

// In resolveDependencies():
s3ClientProvider: asSingletonClass(S3ClientProvider),

// Consumers access the original instance directly:
// this.s3ClientProvider.client.send(new PutObjectCommand({ ... }))

This is more heavyweight than a function resolver but provides full type safety with no explicit return type needed, and scales cleanly to any number of dependencies.

You can also use the explicit generic pattern if you prefer (e.g. for isolatedDeclarations mode):

export type ModuleDependencies = {
    service: Service
    messageQueueConsumer: MessageQueueConsumer
    jobWorker: JobWorker
    queueManager: QueueManager
}

export class MyModule extends AbstractModule<ModuleDependencies, ExternalDependencies> {
    resolveDependencies(
        diOptions: DependencyInjectionOptions,
        _externalDependencies: ExternalDependencies,
    ): MandatoryNameAndRegistrationPair<ModuleDependencies> {
        return { /* ... */ }
    }
}

Defining controllers

Controllers are built with @lokalise/fastify-api-contracts and define the application's routes.

import { buildFastifyRoute } from '@lokalise/fastify-api-contracts'
import { buildRestContract } from '@lokalise/api-contracts'
import { z } from 'zod/v4'
import { AbstractController } from 'opinionated-machine'

const BODY_SCHEMA = z.object({})
const PATH_PARAMS_SCHEMA = z.object({
  userId: z.string(),
})

const contract = buildRestContract({
  method: 'delete',
  successResponseBodySchema: BODY_SCHEMA,
  requestPathParamsSchema: PATH_PARAMS_SCHEMA,
  pathResolver: (pathParams) => `/users/${pathParams.userId}`,
})

export class MyController extends AbstractController<typeof MyController.contracts> {
  public static contracts = { deleteItem: contract } as const
  private readonly service: Service

  constructor({ service }: ModuleDependencies) {
    super()
    this.service = service
  }

  private deleteItem = buildFastifyRoute(
    MyController.contracts.deleteItem,
    async (req, reply) => {
      req.log.info(req.params.userId)
      this.service.execute()
      await reply.status(204).send()
    },
  )

  public buildRoutes() {
    return {
      deleteItem: this.deleteItem,
    }
  }
}

Putting it all together

Typical usage with a fastify app looks like this:

import { serializerCompiler, validatorCompiler } from 'fastify-type-provider-zod'
import { createContainer } from 'awilix'
import { fastify } from 'fastify'
import { DIContext } from 'opinionated-machine'

const module = new MyModule()
const container = createContainer({
    injectionMode: 'PROXY',
})

type AppConfig = {
    DATABASE_URL: string
    // ...
    // everything related to app configuration
}

type ExternalDependencies = {
    logger: Logger // most likely you would like to reuse logger instance from fastify app
}

const context = new DIContext<ModuleDependencies, AppConfig, ExternalDependencies>(container, {
    messageQueueConsumersEnabled: [MessageQueueConsumer.QUEUE_ID],
    jobQueuesEnabled: false,
    enqueuedJobWorkersEnabled: false,
    periodicJobsEnabled: false,
})

// create the app first so that its logger can be passed in as an external dependency
const app = fastify()

context.registerDependencies({
    modules: [module],
    dependencyOverrides: {}, // dependency overrides if necessary, usually for testing purposes
    configOverrides: {}, // config overrides if necessary, will be merged with value inside existing config
    // configDependencyId: 'config', // optional: dependency id of the config entity in the graph. Only used for config overrides. Default value is `config`
},
    // external dependencies that are instantiated outside of DI
    {
    logger: app.log,
})

// wire up validation and serialization before registering routes
app.setValidatorCompiler(validatorCompiler)
app.setSerializerCompiler(serializerCompiler)

app.after(() => {
    context.registerRoutes(app)
})
await app.ready()

Resolver Functions

The library provides a set of resolver functions that wrap awilix's asClass and asFunction with sensible defaults for different types of dependencies. All resolvers create singletons by default.

Basic Resolvers

asSingletonClass(Type, opts?)

Basic singleton class resolver. Use for general-purpose dependencies that don't fit other categories.

service: asSingletonClass(MyService)

asSingletonFunction(fn, opts?)

Basic singleton function resolver. Use when you need to resolve a dependency using a factory function.

config: asSingletonFunction(() => loadConfig())

asClassWithConfig(Type, config, opts?)

Register a class with an additional config parameter passed to the constructor. Uses asFunction wrapper internally to pass the config as a second parameter. Requires PROXY injection mode.

myService: asClassWithConfig(MyService, { enableFeature: true })

The class constructor receives dependencies as the first parameter and config as the second:

class MyService {
  constructor(deps: Dependencies, config: { enableFeature: boolean }) {
    // ...
  }
}
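
One way to picture the wrapper: a factory function receives the dependencies, closes over the config, and forwards both to the constructor. The sketch below uses a hypothetical `withConfig` helper and stand-in types (`Cradle`, `FeatureConfig`, `FeatureService`); it illustrates the idea and is not the library's code.

```typescript
type Cradle = { logger: { lines: string[] } }
type FeatureConfig = { enableFeature: boolean }

class FeatureService {
  private readonly deps: Cradle
  private readonly config: FeatureConfig

  // Dependencies first, config second, as described above.
  constructor(deps: Cradle, config: FeatureConfig) {
    this.deps = deps
    this.config = config
  }

  run(): void {
    this.deps.logger.lines.push(this.config.enableFeature ? 'feature on' : 'feature off')
  }
}

// Wrap a two-argument constructor in a one-argument factory that a
// function-style resolver could call with the dependencies object.
function withConfig<Deps, Config, Instance>(
  Type: new (deps: Deps, config: Config) => Instance,
  config: Config,
): (deps: Deps) => Instance {
  return (deps) => new Type(deps, config)
}

const featureFactory = withConfig(FeatureService, { enableFeature: true })
const cradle: Cradle = { logger: { lines: [] } }
featureFactory(cradle).run()
```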

Domain Layer Resolvers

asServiceClass(Type, opts?)

For service classes. Marks the dependency as public (exposed when module is used as secondary).

userService: asServiceClass(UserService)

asUseCaseClass(Type, opts?)

For use case classes. Marks the dependency as public.

createUserUseCase: asUseCaseClass(CreateUserUseCase)

asRepositoryClass(Type, opts?)

For repository classes. Marks the dependency as private (not exposed when module is secondary).

userRepository: asRepositoryClass(UserRepository)

asControllerClass(Type, opts?)

For REST controller classes. Marks the dependency as private. Use in resolveControllers().

userController: asControllerClass(UserController)

asSSEControllerClass(Type, sseOptions?, opts?)

For SSE controller classes. Marks the dependency as private with isSSEController: true for auto-detection. Automatically configures closeAllConnections as the async dispose method for graceful shutdown. When sseOptions.diOptions.isTestMode is true, enables the connection spy for testing. Use in resolveControllers() alongside REST controllers.

// In resolveControllers()
resolveControllers(diOptions: DependencyInjectionOptions) {
  return {
    userController: asControllerClass(UserController),
    notificationsSSEController: asSSEControllerClass(NotificationsSSEController, { diOptions }),
  }
}

asDualModeControllerClass(Type, sseOptions?, opts?)

For dual-mode controller classes that handle both SSE and JSON responses on the same route. Marks the dependency as private with isDualModeController: true for auto-detection. Inherits all SSE controller features including connection management and graceful shutdown. When sseOptions.diOptions.isTestMode is true, enables the connection spy for testing SSE mode.

// In resolveControllers()
resolveControllers(diOptions: DependencyInjectionOptions) {
  return {
    userController: asControllerClass(UserController),
    chatController: asDualModeControllerClass(ChatDualModeController, { diOptions }),
  }
}

Message Queue Resolvers

asMessageQueueHandlerClass(Type, mqOptions, opts?)

For message queue consumers following message-queue-toolkit conventions. Automatically handles start/close lifecycle and respects messageQueueConsumersEnabled option.

messageQueueConsumer: asMessageQueueHandlerClass(MessageQueueConsumer, {
    queueName: MessageQueueConsumer.QUEUE_ID,
    diOptions,
})
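
A sketch of how an enablement check of this kind might work. It assumes the option is either `false`/`undefined` (nothing enabled) or a list of queue names, which matches the values used in this README but may not cover every form the library accepts. `EnabledOption`, `isQueueEnabled`, `initConsumer` and `FakeConsumer` are hypothetical.

```typescript
// Assumed option shape: disabled (false/undefined) or an allow-list of queue names.
type EnabledOption = false | undefined | string[]

// Hypothetical helper: should the consumer for `queueName` be started?
function isQueueEnabled(option: EnabledOption, queueName: string): boolean {
  if (!option) return false
  return option.includes(queueName)
}

class FakeConsumer {
  started = false
  start(): void { this.started = true }
}

// Start the consumer only when its queue is enabled.
function initConsumer(consumer: FakeConsumer, option: EnabledOption, queueName: string): FakeConsumer {
  if (isQueueEnabled(option, queueName)) consumer.start()
  return consumer
}

const enabledConsumer = initConsumer(new FakeConsumer(), ['user-events'], 'user-events')
const unlistedConsumer = initConsumer(new FakeConsumer(), ['user-events'], 'billing-events')
const disabledConsumer = initConsumer(new FakeConsumer(), false, 'user-events')
```

Only `enabledConsumer` is started; the other two are registered but left idle, which is useful when the same module runs in processes with different responsibilities.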

Background Job Resolvers

asEnqueuedJobWorkerClass(Type, workerOptions, opts?)

For enqueued job workers following background-jobs-common conventions. Automatically handles start/dispose lifecycle and respects enqueuedJobWorkersEnabled option.

jobWorker: asEnqueuedJobWorkerClass(JobWorker, {
    queueName: JobWorker.QUEUE_ID,
    diOptions,
})

asPgBossProcessorClass(Type, processorOptions, opts?)

For pg-boss job processor classes. Similar to asEnqueuedJobWorkerClass but uses start/stop lifecycle methods and initializes after pgBoss (priority 20).

enrichUserPresenceJob: asPgBossProcessorClass(EnrichUserPresenceJob, {
    queueName: EnrichUserPresenceJob.QUEUE_ID,
    diOptions,
})

asPeriodicJobClass(Type, workerOptions, opts?)

For periodic job classes following background-jobs-common conventions. Uses eager injection via register method and respects periodicJobsEnabled option.

cleanupJob: asPeriodicJobClass(CleanupJob, {
    jobName: CleanupJob.JOB_NAME,
    diOptions,
})

asJobQueueClass(Type, queueOptions, opts?)

For job queue classes. Marks the dependency as public. Respects jobQueuesEnabled option.

queueManager: asJobQueueClass(QueueManager, {
    diOptions,
})

asEnqueuedJobQueueManagerFunction(fn, diOptions, opts?)

For job queue manager factory functions. Automatically calls start() with resolved enabled queues during initialization.

jobQueueManager: asEnqueuedJobQueueManagerFunction(
    createJobQueueManager,
    diOptions,
)

Server-Sent Events (SSE)

The library provides first-class support for Server-Sent Events using @fastify/sse. SSE enables real-time, unidirectional streaming from server to client - perfect for notifications, live updates, and streaming responses (like AI chat completions).
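
On the wire, each SSE message is a block of `field: value` lines terminated by a blank line. The helper below is a minimal sketch of that framing, assuming JSON-serializable payloads; `formatSseEvent` is a hypothetical name, and in practice the plugin handles this for you.

```typescript
// Serialize one event in the text/event-stream format:
// an optional `event:` line, a `data:` line, then a blank line.
function formatSseEvent(eventName: string | undefined, payload: Record<string, unknown>): string {
  const lines: string[] = []
  if (eventName) lines.push(`event: ${eventName}`)
  // JSON.stringify without indentation never emits raw newlines, so one data line is enough.
  lines.push(`data: ${JSON.stringify(payload)}`)
  return `${lines.join('\n')}\n\n`
}

const sseFrame = formatSseEvent('notification', { id: 'welcome', message: 'Connected!' })
// sseFrame === 'event: notification\ndata: {"id":"welcome","message":"Connected!"}\n\n'
```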

Prerequisites

Register the @fastify/sse plugin before using SSE controllers:

import FastifySSEPlugin from '@fastify/sse'

const app = fastify()
await app.register(FastifySSEPlugin)

Defining SSE Contracts

Use buildSseContract from @lokalise/api-contracts to define SSE routes. The method field determines the HTTP method. Paths are defined using pathResolver, a type-safe function that receives typed params and returns the URL path:

import { z } from 'zod'
import { buildSseContract } from '@lokalise/api-contracts'

// GET-based SSE stream with path params
export const channelStreamContract = buildSseContract({
  method: 'get',
  pathResolver: (params) => `/api/channels/${params.channelId}/stream`,
  requestPathParamsSchema: z.object({ channelId: z.string() }),
  requestQuerySchema: z.object({}),
  requestHeaderSchema: z.object({}),
  serverSentEventSchemas: {
    message: z.object({ content: z.string() }),
  },
})

// GET-based SSE stream without path params
export const notificationsContract = buildSseContract({
  method: 'get',
  pathResolver: () => '/api/notifications/stream',
  requestPathParamsSchema: z.object({}),
  requestQuerySchema: z.object({ userId: z.string().optional() }),
  requestHeaderSchema: z.object({}),
  serverSentEventSchemas: {
    notification: z.object({
      id: z.string(),
      message: z.string(),
    }),
  },
})

// POST-based SSE stream (e.g., AI chat completions)
export const chatCompletionContract = buildSseContract({
  method: 'post',
  pathResolver: () => '/api/chat/completions',
  requestPathParamsSchema: z.object({}),
  requestQuerySchema: z.object({}),
  requestHeaderSchema: z.object({}),
  requestBodySchema: z.object({
    message: z.string(),
    stream: z.literal(true),
  }),
  serverSentEventSchemas: {
    chunk: z.object({ content: z.string() }),
    done: z.object({ totalTokens: z.number() }),
  },
})

For reusable event schema definitions, you can use the SSEEventSchemas type (requires TypeScript 4.9+ for satisfies):

import { z } from 'zod'
import type { SSEEventSchemas } from 'opinionated-machine'

// Define reusable event schemas for multiple contracts
const streamingEvents = {
  chunk: z.object({ content: z.string() }),
  done: z.object({ totalTokens: z.number() }),
  error: z.object({ code: z.number(), message: z.string() }),
} satisfies SSEEventSchemas
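
The reason to prefer `satisfies` over a type annotation here is that it validates the shape without widening the keys. A dependency-free sketch, where `Validator` and `EventSchemas` are hypothetical stand-ins for zod schemas and `SSEEventSchemas`:

```typescript
// Hypothetical stand-ins for a schema library and for SSEEventSchemas.
type Validator<T> = { parse(input: unknown): T }
type EventSchemas = Record<string, Validator<unknown>>

const stringValidator: Validator<string> = {
  parse(input) {
    if (typeof input !== 'string') throw new Error('expected a string')
    return input
  },
}
const numberValidator: Validator<number> = {
  parse(input) {
    if (typeof input !== 'number') throw new Error('expected a number')
    return input
  },
}

// `satisfies` checks the shape but keeps the literal keys and precise value types.
const sketchEvents = {
  chunk: stringValidator,
  done: numberValidator,
} satisfies EventSchemas

// With a plain `: EventSchemas` annotation this would widen to `string`.
type SketchEventName = keyof typeof sketchEvents // 'chunk' | 'done'

const eventNames: SketchEventName[] = ['chunk', 'done']
const parsedChunk: string = sketchEvents.chunk.parse('hello')
const parsedTotal: number = sketchEvents.done.parse(42)
```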

Creating SSE Controllers

SSE controllers extend AbstractSSEController and must implement a two-parameter constructor. Use buildHandler for automatic type inference of request parameters:

import {
  AbstractSSEController,
  buildHandler,
  type SSEControllerConfig,
  type SSESession
} from 'opinionated-machine'

type Contracts = {
  notificationsStream: typeof notificationsContract
}

type Dependencies = {
  notificationService: NotificationService
}

export class NotificationsSSEController extends AbstractSSEController<Contracts> {
  public static contracts = {
    notificationsStream: notificationsContract,
  } as const

  private readonly notificationService: NotificationService

  // Required: two-parameter constructor (deps object, optional SSE config)
  constructor(deps: Dependencies, sseConfig?: SSEControllerConfig) {
    super(deps, sseConfig)
    this.notificationService = deps.notificationService
  }

  public buildSSERoutes() {
    return {
      notificationsStream: this.handleStream,
    }
  }

  // Handler with automatic type inference from contract
  // sse.start(mode) returns a session with type-safe event sending
  // Options (onConnect, onClose) are passed as the third parameter to buildHandler
  private handleStream = buildHandler(notificationsContract, {
    sse: async (request, sse) => {
      // request.query is typed from contract: { userId?: string }
      const userId = request.query.userId ?? 'anonymous'

      // Start streaming with 'keepAlive' mode - stays open for external events
      // Sends HTTP 200 + SSE headers immediately
      const session = sse.start('keepAlive', { context: { userId } })

      // For external triggers (subscriptions, timers, message queues), use sendEventInternal.
      // session.send is only available within this handler's scope - external callbacks
      // like subscription handlers execute later, outside this function, so they can't access session.
      // sendEventInternal is a controller method, so it's accessible from any callback.
      // It provides autocomplete for all event names defined in the controller's contracts.
      this.notificationService.subscribe(userId, async (notification) => {
        await this.sendEventInternal(session.id, {
          event: 'notification',
          data: notification,
        })
      })

      // For direct sending within the handler, use the session's send method.
      // It provides stricter per-route typing (only events from this specific contract).
      await session.send('notification', { id: 'welcome', message: 'Connected!' })

      // 'keepAlive' mode: handler returns, but connection stays open for subscription events
      // Connection closes when client disconnects or server calls closeConnection()
    },
  }, {
    onConnect: (session) => console.log('Client connected:', session.id),
    onClose: (session, reason) => {
      const userId = session.context?.userId as string
      this.notificationService.unsubscribe(userId)
      console.log(`Client disconnected (${reason}):`, session.id)
    },
  })
}
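
The split between `session.send` and `sendEventInternal` can be pictured as a registry keyed by session id: the handler holds the session object directly, while later callbacks only need the id. The sketch below is a minimal in-memory model with hypothetical names (`SessionRegistry`, `FakeSession`, `SseMessage`), where an array stands in for the HTTP response stream. It is not how the library is implemented.

```typescript
type SseMessage = { event: string; data: unknown }

class FakeSession {
  readonly id: string
  readonly sent: SseMessage[] = []

  constructor(id: string) {
    this.id = id
  }

  send(event: string, data: unknown): void {
    this.sent.push({ event, data })
  }
}

class SessionRegistry {
  private readonly sessions = new Map<string, FakeSession>()

  open(id: string): FakeSession {
    const created = new FakeSession(id)
    this.sessions.set(id, created)
    return created
  }

  // Look the session up by id; report false when the client has already gone.
  sendEventInternal(sessionId: string, message: SseMessage): boolean {
    const target = this.sessions.get(sessionId)
    if (!target) return false
    target.send(message.event, message.data)
    return true
  }

  closeSession(id: string): void {
    this.sessions.delete(id)
  }
}

const registry = new SessionRegistry()
const liveSession = registry.open('s1')

// Inside the handler: the session object is in scope.
liveSession.send('notification', { id: 'welcome' })

// From a later callback: only the id is needed.
const delivered = registry.sendEventInternal('s1', { event: 'notification', data: { id: 'n1' } })

registry.closeSession('s1')
const deliveredAfterClose = registry.sendEventInternal('s1', { event: 'notification', data: { id: 'n2' } })
```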

Type-Safe SSE Handlers with buildHandler

For automatic type inference of request parameters (similar to buildFastifyRoute for regular controllers), use buildHandler:

import {
  AbstractSSEController,
  buildHandler,
  type SSEControllerConfig,
  type SSESession
} from 'opinionated-machine'

class ChatSSEController extends AbstractSSEController<Contracts> {
  public static contracts = {
    chatCompletion: chatCompletionContract,
  } as const

  constructor(deps: Dependencies, sseConfig?: SSEControllerConfig) {
    super(deps, sseConfig)
  }

  // Handler with automatic type inference from contract
  // sse.start(mode) returns session with fully typed send()
  private handleChatCompletion = buildHandler(chatCompletionContract, {
    sse: async (request, sse) => {
      // request.body is typed as { message: string; stream: true }
      // request.query, request.params, request.headers all typed from contract
      const words = request.body.message.split(' ')

      // Start streaming with 'autoClose' mode - closes after handler completes
      // Sends HTTP 200 + SSE headers immediately
      const session = sse.start('autoClose')

      for (const word of words) {
        // session.send() provides compile-time type checking for event names and data
        await session.send('chunk', { content: word })
      }

      // 'autoClose' mode: connection closes automatically when handler returns
    },
  })

  public buildSSERoutes() {
    return {
      chatCompletion: this.handleChatCompletion,
    }
  }
}

You can also use InferSSERequest<Contract> for manual type annotation when needed:

import { type InferSSERequest, type SSEContext, type SSESession } from 'opinionated-machine'

private handleStream = async (
  request: InferSSERequest<typeof chatCompletionContract>,
  sse: SSEContext<typeof chatCompletionContract['serverSentEventSchemas']>,
) => {
  // request.body, request.params, etc. all typed from contract
  const session = sse.start('autoClose')
  // session.send() is typed based on contract serverSentEventSchemas
  await session.send('chunk', { content: 'hello' })
  // 'autoClose' mode: connection closes when handler returns
}

SSE Controllers Without Dependencies

For controllers without dependencies, still provide the two-parameter constructor:

export class SimpleSSEController extends AbstractSSEController<Contracts> {
  constructor(deps: object, sseConfig?: SSEControllerConfig) {
    super(deps, sseConfig)
  }

  // ... implementation
}

Registering SSE Controllers

Use asSSEControllerClass in your module's resolveControllers method alongside REST controllers. SSE controllers are automatically detected via the isSSEController flag and registered in the DI container:

import { AbstractModule, type InferModuleDependencies, asControllerClass, asSSEControllerClass, asServiceClass, type DependencyInjectionOptions } from 'opinionated-machine'

export class NotificationsModule extends AbstractModule {
  resolveDependencies() {
    return {
      notificationService: asServiceClass(NotificationService),
    }
  }

  resolveControllers(diOptions: DependencyInjectionOptions) {
    return {
      // REST controller
      usersController: asControllerClass(UsersController),
      // SSE controller (automatically detected and registered for SSE routes)
      notificationsSSEController: asSSEControllerClass(NotificationsSSEController, { diOptions }),
    }
  }
}

export type NotificationsModuleDependencies = InferModuleDependencies<NotificationsModule>

Registering SSE Routes

Call registerSSERoutes after registering the @fastify/sse plugin:

const app = fastify()
app.setValidatorCompiler(validatorCompiler)
app.setSerializerCompiler(serializerCompiler)

// Register @fastify/sse plugin first
await app.register(FastifySSEPlugin)

// Then register SSE routes
context.registerSSERoutes(app)

// Optionally with global preHandler for authentication
context.registerSSERoutes(app, {
  preHandler: async (request, reply) => {
    if (!request.headers.authorization) {
      // Return the reply so Fastify stops processing once the response is sent
      return reply.code(401).send({ error: 'Unauthorized' })
    }
  },
})

await app.ready()

Broadcasting Events

Send events to multiple connections using broadcast() or broadcastIf():

// Broadcast to ALL connected clients
await this.broadcast({
  event: 'system',
  data: { message: 'Server maintenance in 5 minutes' },
})

// Broadcast to sessions matching a predicate
await this.broadcastIf(
  { event: 'channel-update', data: { channelId: '123', newMessage: msg } },
  (session) => session.context.channelId === '123',
)

Both methods return the number of clients the message was successfully sent to.
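The return value can be pictured with a small in-memory model. This is only a sketch: `FakeSession`, `makeSession` and the `send` signature below are hypothetical stand-ins, and the function is not the library's implementation of broadcastIf().

```typescript
// Hypothetical stand-in for an SSE session; not the library's SSESession type.
type FakeSession = {
  id: string
  context: Record<string, unknown>
  // Resolves to false when the client is gone.
  send: (event: string, data: unknown) => Promise<boolean>
}

type Message = { event: string; data: unknown }

// Send to every session matching the predicate and count only successful sends.
async function broadcastIf(
  sessions: Iterable<FakeSession>,
  message: Message,
  predicate: (session: FakeSession) => boolean,
): Promise<number> {
  let delivered = 0
  for (const session of sessions) {
    if (!predicate(session)) continue
    if (await session.send(message.event, message.data)) delivered++
  }
  return delivered
}

function makeSession(id: string, channelId: string, alive: boolean): FakeSession {
  return { id, context: { channelId }, send: async () => alive }
}

const demoSessions = [
  makeSession('a', '123', true),
  makeSession('b', '123', false), // disconnected client, send fails
  makeSession('c', '456', true),
]
```

In this model, sessions that fail the predicate and sessions whose send fails are both excluded from the count.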

Controller-Level Hooks

Override these optional methods on your controller for global session handling:

class MySSEController extends AbstractSSEController<Contracts> {
  // Called AFTER session is registered (for all routes)
  protected onConnectionEstablished(session: SSESession): void {
    this.metrics.incrementConnections()
  }

  // Called BEFORE session is unregistered (for all routes)
  protected onConnectionClosed(session: SSESession): void {
    this.metrics.decrementConnections()
  }
}

Route-Level Options

Each route can have its own preHandler, lifecycle hooks, and logger. Pass these as the third parameter to buildHandler:

public buildSSERoutes() {
  return {
    adminStream: this.handleAdminStream,
  }
}

private handleAdminStream = buildHandler(adminStreamContract, {
  sse: async (request, sse) => {
    const session = sse.start('keepAlive')
    // ... handler logic
  },
}, {
  // Route-specific authentication
  preHandler: async (request, reply) => {
    if (!request.user?.isAdmin) {
      return reply.code(403).send({ error: 'Forbidden' })
    }
  },
  onConnect: (session) => console.log('Admin connected'),
  onClose: (session, reason) => console.log(`Admin disconnected (${reason})`),
  // Handle client reconnection with Last-Event-ID
  onReconnect: async (session, lastEventId) => {
    // Return events to replay, or handle manually
    return this.getEventsSince(lastEventId)
  },
  // Optional: logger for error handling (requires @lokalise/node-core)
  logger: this.logger,
})

Available route options:

| Option | Description |
| -------- | ------------- |
| preHandler | Authentication/authorization hook that runs before SSE session |
| onConnect | Called after client connects (SSE handshake complete) |
| onClose | Called when session closes (client disconnect, network failure, or server close). Receives (session, reason) where reason is 'server' or 'client' |
| onReconnect | Handle Last-Event-ID reconnection, return events to replay |
| logger | Optional SSELogger for error handling (compatible with pino and @lokalise/node-core). If not provided, errors in lifecycle hooks are silently ignored |
| serializer | Custom serializer for SSE data (e.g., for custom JSON encoding) |
| heartbeatInterval | Interval in ms for heartbeat keep-alive messages |

onClose reason parameter:

  • 'server': Server explicitly closed the session (via closeConnection() or autoClose mode)
  • 'client': Client closed the session (EventSource.close(), navigation, network failure)

options: {
  onConnect: (session) => console.log('Client connected'),
  onClose: (session, reason) => {
    console.log(`Session closed (${reason}):`, session.id)
    // reason is 'server' or 'client'
  },
  serializer: (data) => JSON.stringify(data, null, 2), // Pretty-print JSON
  heartbeatInterval: 30000, // Send heartbeat every 30 seconds
}
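An onReconnect hook needs some store of recent events to replay from. The following is a minimal sketch of one way to back a helper such as getEventsSince(); `ReplayBuffer`, its capacity limit and the numeric id scheme are assumptions made for illustration and are not part of opinionated-machine.

```typescript
// Hypothetical replay buffer; names are illustrative only.
type StoredEvent = { id: string; event: string; data: unknown }

class ReplayBuffer {
  private readonly events: StoredEvent[] = []
  private readonly capacity: number
  private nextId = 1

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // Record an event under an increasing id and drop the oldest entry on overflow.
  append(event: string, data: unknown): StoredEvent {
    const stored: StoredEvent = { id: String(this.nextId++), event, data }
    this.events.push(stored)
    if (this.events.length > this.capacity) this.events.shift()
    return stored
  }

  // Events strictly newer than lastEventId.
  // An unknown or already evicted id replays everything still retained.
  since(lastEventId: string): StoredEvent[] {
    const index = this.events.findIndex((e) => e.id === lastEventId)
    return index === -1 ? [...this.events] : this.events.slice(index + 1)
  }
}
```

Replaying everything on an unknown id is a design choice of this sketch; a real service might prefer to signal that the client has fallen too far behind.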

SSE Session Methods

The session object returned by sse.start(mode) provides several useful methods:

private handleStream = buildHandler(streamContract, {
  sse: async (request, sse) => {
    const session = sse.start('autoClose')

    // Check if session is still active
    if (session.isConnected()) {
      await session.send('status', { connected: true })
    }

    // Get raw writable stream for advanced use cases (e.g., pipeline)
    const stream = session.getStream()

    // Stream messages from an async iterable with automatic validation
    async function* generateMessages() {
      yield { event: 'message' as const, data: { text: 'Hello' } }
      yield { event: 'message' as const, data: { text: 'World' } }
    }
    await session.sendStream(generateMessages())

    // 'autoClose' mode: connection closes when handler returns
  },
})

| Method | Description |
| -------- | ------------- |
| send(event, data, options?) | Send a typed event (validates against contract schema) |
| isConnected() | Check if the session is still active |
| getStream() | Get the underlying WritableStream for advanced use cases |
| sendStream(messages) | Stream messages from an AsyncIterable with validation |

Graceful Shutdown

SSE controllers automatically close all connections during application shutdown. asSSEControllerClass configures this by registering closeAllConnections as the async dispose method with priority 5 (early in the shutdown sequence).

Error Handling

When sendEvent() fails (e.g., client disconnected), it:

  • Returns false to indicate failure
  • Automatically removes the dead connection from tracking
  • Prevents further send attempts to that connection

const sent = await this.sendEvent(connectionId, { event: 'update', data })
if (!sent) {
  // Connection was closed or failed - already removed from tracking
  this.cleanup(connectionId)
}
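The three behaviours listed above can be modelled in a few lines. This is a sketch under stated assumptions: `ConnectionRegistry` and `Writer` are hypothetical names, and a writer that throws stands in for a disconnected client.

```typescript
// Hypothetical registry illustrating "return false and stop tracking"; not the library's code.
type Writer = (payload: string) => void // throws when the socket is gone

class ConnectionRegistry {
  private readonly writers = new Map<string, Writer>()

  add(id: string, writer: Writer): void {
    this.writers.set(id, writer)
  }

  has(id: string): boolean {
    return this.writers.has(id)
  }

  sendEvent(id: string, message: { event: string; data: unknown }): boolean {
    const writer = this.writers.get(id)
    if (!writer) return false // unknown or already removed connection
    try {
      writer(`event: ${message.event}\ndata: ${JSON.stringify(message.data)}\n\n`)
      return true
    } catch {
      // Dead connection: drop it so later sends return false without touching the socket
      this.writers.delete(id)
      return false
    }
  }
}
```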

Lifecycle hook errors (onConnect, onReconnect, onClose):

  • All lifecycle hooks are wrapped in try/catch to prevent crashes
  • If a logger is provided in route options, errors are logged with context
  • If no logger is provided, errors are silently ignored
  • The session lifecycle continues even if a hook throws

// Provide a logger to capture lifecycle errors
public buildSSERoutes() {
  return {
    stream: this.handleStream,
  }
}

private handleStream = buildHandler(streamContract, {
  sse: async (request, sse) => {
    const session = sse.start('autoClose')
    // ... handler logic
  },
}, {
  logger: this.logger, // pino-compatible logger
  onConnect: (session) => { /* may throw */ },
  onClose: (session, reason) => { /* may throw */ },
})
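The wrapping described above might look roughly like the following. It is a sketch, not the library's code: `runHookSafely` is a hypothetical helper, and the logger shape assumes a pino-style error(object, message) method.

```typescript
// Assumed logger shape: pino-style error(object, message). For illustration only.
type HookLogger = { error: (context: object, message: string) => void }
type Hook = () => void | Promise<void>

// Run a lifecycle hook without letting its failure escape.
// Returns true when the hook completed, false when it threw or rejected.
async function runHookSafely(
  name: string,
  hook: Hook | undefined,
  logger?: HookLogger,
): Promise<boolean> {
  if (!hook) return true
  try {
    await hook()
    return true
  } catch (err) {
    // With a logger the error is reported with context; without one it is swallowed
    logger?.error({ err, hook: name }, 'SSE lifecycle hook failed')
    return false
  }
}
```

Because the error never propagates, the caller can continue with session registration or cleanup regardless of the hook's outcome.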

Long-lived Connections vs Request-Response Streaming

SSE session lifetime is determined by the mode passed to sse.start(mode):

// sse.start('autoClose') - close connection when handler returns (request-response pattern)
// sse.start('keepAlive') - keep connection open for external events (subscription pattern)
// sse.respond(code, body) - send HTTP response before streaming (early return)

Long-lived sessions (notifications, live updates):

  • Handler starts streaming with sse.start('keepAlive')
  • Session stays open indefinitely after handler returns
  • Events are sent later via callbacks using sendEventInternal()
  • Client closes session when done (e.g., eventSource.close() or navigating away)
  • Server cleans up via onConnectionClosed() hook

private handleStream = buildHandler(streamContract, {
  sse: async (request, sse) => {
    // Start streaming with 'keepAlive' mode - stays open for external events
    const session = sse.start('keepAlive')

    // Set up subscription - events sent via callback AFTER handler returns
    this.service.subscribe(session.id, (data) => {
      this.sendEventInternal(session.id, { event: 'update', data })
    })
    // 'keepAlive' mode: handler returns, but connection stays open
  },
})

// Clean up when client disconnects
protected onConnectionClosed(session: SSESession): void {
  this.service.unsubscribe(session.id)
}

Request-response streaming (AI completions):

  • Handler starts streaming with sse.start('autoClose')
  • Use session.send() for type-safe event sending within the handler
  • Session automatically closes when handler returns

private handleChatCompletion = buildHandler(chatCompletionContract, {
  sse: async (request, sse) => {
    // Start streaming with 'autoClose' mode - closes when handler returns
    const session = sse.start('autoClose')

    const words = request.body.message.split(' ')
    for (const word of words) {
      await session.send('chunk', { content: word })
    }
    await session.send('done', { totalTokens: words.length })

    // 'autoClose' mode: connection closes automatically when handler returns
  },
})

Error handling before streaming:

Use sse.respond(code, body) to return an HTTP response before streaming starts. This is useful for any early return: validation errors, not found, redirects, etc.

private handleStream = buildHandler(streamContract, {
  sse: async (request, sse) => {
    // Early return BEFORE starting stream - can return any HTTP response
    const entity = await this.service.find(request.params.id)
    if (!entity) {
      return sse.respond(404, { error: 'Entity not found' })
    }

    // Validation passed - start streaming with autoClose mode
    const session = sse.start('autoClose')
    await session.send('data', entity)
    // Connection closes automatically when handler returns
  },
})
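The three outcomes of a handler (autoClose stream, keepAlive stream, early HTTP response) can be summarised in a toy model. Everything here is hypothetical: `FakeSSEContext`, `Outcome` and `runHandler` only mimic the behaviour described in this section and do not reflect how the library is implemented.

```typescript
type Mode = 'autoClose' | 'keepAlive'
type Outcome =
  | { kind: 'response'; code: number; body: unknown }
  | { kind: 'stream'; mode: Mode; open: boolean; sent: string[] }

// Toy stand-in for the sse argument passed to a handler.
class FakeSSEContext {
  outcome: Outcome | undefined

  start(mode: Mode) {
    if (this.outcome) throw new Error('response already started')
    const stream = { kind: 'stream' as const, mode, open: true, sent: [] as string[] }
    this.outcome = stream
    return { send: (event: string) => { stream.sent.push(event) } }
  }

  respond(code: number, body: unknown): void {
    if (this.outcome) throw new Error('response already started')
    this.outcome = { kind: 'response', code, body }
  }
}

// Run a handler, then apply the mode rule: autoClose streams end when the handler returns.
async function runHandler(
  handler: (sse: FakeSSEContext) => Promise<void> | void,
): Promise<Outcome> {
  const sse = new FakeSSEContext()
  await handler(sse)
  const outcome = sse.outcome
  if (!outcome) throw new Error('handler neither started a stream nor responded')
  if (outcome.kind === 'stream' && outcome.mode === 'autoClose') outcome.open = false
  return outcome
}
```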

SSE Parsing Utilities

The library provides production-ready utilities for parsing SSE (Server-Sent Events) streams:

| Function | Use Case |
|----------|----------|
| `parseSSEEvents` | **Testing & complete responses** - when you have the full response body |
| `parseSSEBuffer` | **Production streaming** - when data arrives incrementally in chunks |

parseSSEEvents

Parse a complete SSE response body into an array of events.

When to use: Testing with Fastify's inject(), or when the full response is available (e.g., request-response style SSE like OpenAI completions):

```ts
import { parseSSEEvents, type ParsedSSEEvent } from 'opinionated-machine'

const responseBody = `event: notification
data: {"id":"1","message":"Hello"}

event: notification
data: {"id":"2","message":"World"}

`

const events: ParsedSSEEvent[] = parseSSEEvents(responseBody)
// Result:
// [
//   { event: 'notification', data: '{"id":"1","message":"Hello"}' },
//   { event: 'notification', data: '{"id":"2","message":"World"}' }
// ]

// Access parsed data
const notifications = events.map(e => JSON.parse(e.data))
```

parseSSEBuffer

Parse a streaming SSE buffer, handling incomplete events at chunk boundaries.

When to use: Production clients consuming real-time SSE streams (notifications, live feeds, chat) where events arrive incrementally:

import { parseSSEBuffer, type ParseSSEBufferResult } from 'opinionated-machine'

let buffer = ''

// As chunks arrive from a stream...
for await (const chunk of stream) {
  buffer += chunk
  const result: ParseSSEBufferResult = parseSSEBuffer(buffer)

  // Process complete events
  for (const event of result.events) {
    console.log('Received:', event.event, event.data)
  }

  // Keep incomplete data for next chunk
  buffer = result.remaining
}

Production example with fetch:

const response = await fetch(url)
const reader = response.body!.getReader()
const decoder = new TextDecoder()
let buffer = ''

while (true) {
  const { done, value } = await reader.read()
  if (done) break

  buffer += decoder.decode(value, { stream: true })
  const { events, remaining } = parseSSEBuffer(buffer)
  buffer = remaining

  for (const event of events) {
    console.log('Received:', event.event, JSON.parse(event.data))
  }
}
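To see why the remaining tail matters, here is a minimal sketch of incremental SSE parsing. `splitSSEBuffer` and `SketchEvent` are hypothetical names, and this is not the library's parseSSEBuffer: it handles LF and CRLF line endings only and skips events that carry no data field.

```typescript
type SketchEvent = { id?: string; event?: string; data: string; retry?: number }

// Split a buffer into complete events plus the unfinished tail.
function splitSSEBuffer(buffer: string): { events: SketchEvent[]; remaining: string } {
  const blocks = buffer.replace(/\r\n/g, '\n').split('\n\n')
  // Text after the last blank line has not been terminated yet
  const remaining = blocks.pop() ?? ''
  const events: SketchEvent[] = []

  for (const block of blocks) {
    const dataLines: string[] = []
    const parsed: SketchEvent = { data: '' }
    for (const line of block.split('\n')) {
      if (line === '' || line.startsWith(':')) continue // comment or heartbeat line
      const colon = line.indexOf(':')
      const field = colon === -1 ? line : line.slice(0, colon)
      let value = colon === -1 ? '' : line.slice(colon + 1)
      if (value.startsWith(' ')) value = value.slice(1) // one optional space after the colon
      if (field === 'data') dataLines.push(value)
      else if (field === 'event') parsed.event = value
      else if (field === 'id') parsed.id = value
      else if (field === 'retry' && /^\d+$/.test(value)) parsed.retry = Number(value)
    }
    if (dataLines.length === 0) continue // nothing to dispatch
    parsed.data = dataLines.join('\n') // multiple data lines are joined with newlines
    events.push(parsed)
  }
  return { events, remaining }
}
```

An event split across two chunks stays in `remaining` after the first call and is completed once the second chunk is appended.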

ParsedSSEEvent Type

Both functions return events with this structure:

type ParsedSSEEvent = {
  id?: string      // Event ID (from "id:" field)
  event?: string   // Event type (from "event:" field)
  data: string     // Event data (from "data:" field, always present)
  retry?: number   // Reconnection interval (from "retry:" field)
}

Testing SSE Controllers

Enable the connection spy for testing by passing isTestMode: true in diOptions:

import { createContainer } from 'awilix'
import { DIContext, SSETestServer, SSEHttpClient } from 'opinionated-machine'

describe('NotificationsSSEController', () => {
  let server: SSETestServer
  let controller: NotificationsSSEController

  beforeEach(async () => {
    // Create test server with isTestMode enabled
    server = await SSETestServer.create(
      async (app) => {
        // Register your SSE routes here
      },
      {
        setup: async () => {
          // Set up DI container and resources
          return { context }
        },
      }
    )

    controller = server.resources.context.diContainer.cradle.notificationsSSEController
  })

  afterEach(async () => {
    await server.resources.context.destroy()
    await server.close()
  })

  it('receives notifications over SSE', async () => {
    // Connect with awaitServerConnection to eliminate race condition
    const { client, serverConnection } = await SSEHttpClient.connect(
      server.baseUrl,
      '/api/notifications/stream',
      {
        query: { userId: 'test-user' },
        awaitServerConnection: { controller },
      },
    )

    expect(client.response.ok).toBe(true)

    // Start collecting events
    const eventsPromise = client.collectEvents(2)

    // Send events from server (serverConnection is ready immediately)
    await controller.sendEvent(serverConnection.id, {
      event: 'notification',
      data: { id: '1', message: 'Hello!' },
    })

    await controller.sendEvent(serverConnection.id, {
      event: 'notification',
      data: { id: '2', message: 'World!' },
    })

    // Wait for events
    const events = await eventsPromise

    expect(events).toHaveLength(2)
    expect(JSON.parse(events[0].data)).toEqual({ id: '1', message: 'Hello!' })
    expect(JSON.parse(events[1].data)).toEqual({ id: '2', message: 'World!' })

    // Clean up
    client.close()
  })
})

SSESessionSpy API

The connectionSpy is available when isTestMode: true is passed to asSSEControllerClass:

// Wait for a session to be established (with timeout)
const session = await controller.connectionSpy.waitForConnection({ timeout: 5000 })

// Wait for a session matching a predicate (useful for multiple sessions)
const session = await controller.connectionSpy.waitForConnection({
  timeout: 5000,
  predicate: (s) => s.request.url.includes('/api/notifications'),
})

// Check if a specific session is active
const isConnected = controller.connectionSpy.isConnected(sessionId)

// Wait for a specific session to disconnect
await controller.connectionSpy.waitForDisconnection(sessionId, { timeout: 5000 })

// Get all session events (connect/disconnect history)
const events = controller.connectionSpy.getEvents()

// Clear event history and claimed sessions between tests
controller.connectionSpy.clear()

Note: waitForConnection tracks "claimed" sessions internally. Each call returns a unique unclaimed session, allowing sequential waits for the same URL path without returning the same session twice. This is used internally by SSEHttpClient.connect() with awaitServerConnection.
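The claiming behaviour in the note can be illustrated with a small stand-in. This is a sketch only: `ClaimingSpy`, `SpySession` and `addConnection` are hypothetical names, and timeouts are left out to keep it short.

```typescript
type SpySession = { id: string; url: string }
type Waiter = { predicate: (s: SpySession) => boolean; resolve: (s: SpySession) => void }

class ClaimingSpy {
  private readonly sessions: SpySession[] = []
  private readonly claimed = new Set<string>()
  private readonly waiters: Waiter[] = []

  // Called when the server registers a session.
  addConnection(session: SpySession): void {
    this.sessions.push(session)
    const index = this.waiters.findIndex((w) => w.predicate(session))
    if (index === -1) return
    const waiter = this.waiters.splice(index, 1)[0]
    if (waiter) {
      this.claimed.add(session.id)
      waiter.resolve(session)
    }
  }

  // Resolve with the oldest matching session that no earlier call has claimed.
  waitForConnection(predicate: (s: SpySession) => boolean = () => true): Promise<SpySession> {
    const existing = this.sessions.find((s) => !this.claimed.has(s.id) && predicate(s))
    if (existing) {
      this.claimed.add(existing.id)
      return Promise.resolve(existing)
    }
    return new Promise((resolve) => {
      this.waiters.push({ predicate, resolve })
    })
  }

  // Forget history and claims, e.g. between tests.
  clear(): void {
    this.sessions.length = 0
    this.claimed.clear()
    this.waiters.length = 0
  }
}
```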

Session Monitoring

Controllers have access to utility methods for monitoring sessions:

// Get count of active sessions
const count = this.getConnectionCount()

// Get all active sessions (for iteration/inspection)
const sessions = this.getConnections()

// Check if session spy is enabled (useful for conditional logic)
if (this.hasConnectionSpy()) {
  // ...
}

SSE Test Utilities

The library provides utilities for testing SSE endpoints.

Two transport methods:

  • Inject - Uses Fastify's built-in inject() to simulate HTTP requests directly in-memory, without network overhead. No listen() required. Handler must close the session for the request to complete.
  • Real HTTP - Actual HTTP via fetch(). Requires the server to be listening. Supports long-lived sessions.

Quick Reference

| Utility | Connection | Requires Contract | Use Case |
|---------|------------|-------------------|----------|
| SSEInjectClient | Inject (in-memory) | No | Request-response SSE without contracts |
| injectSSE / injectPayloadSSE | Inject (in-memory) | Yes | Request-response SSE with type-safe contracts |
| SSEHttpClient | Real HTTP | No | Long-lived SSE connections |

SSEInjectClient and injectSSE/injectPayloadSSE do the same thing (Fastify inject), but injectSSE/injectPayloadSSE provide type safety via contracts while SSEInjectClient works with raw URLs.

Inject vs HTTP Comparison

| Feature | Inject (SSEInjectClient, injectSSE) | HTTP (SSEHttpClient) |
|---------|----------------------------------------|------------------------|
| Connection | Fastify's inject() - in-memory | Real HTTP via fetch() |
| Event delivery | All events returned at once (after handler closes) | Events arrive incrementally |
| Connection lifecycle | Handler must close for request to complete | Can stay open indefinitely |
| Server requirement | No listen() needed | Requires running server |
| Best for | OpenAI-style streaming, batch exports | Notifications, live feeds, chat |

SSETestServer

Creates a test server with @fastify/sse pre-configured:

import { SSETestServer, SSEHttpClient } from 'opinionated-machine'

// Basic usage
const server = await SSETestServer.create(async (app) => {
  app.get('/api/events', async (request, reply) => {
    reply.sse({ event: 'message', data: { hello: 'world' } })
    reply.sse.close()
  })
})

// Connect and test
const client = await SSEHttpClient.connect(server.baseUrl, '/api/events')
const events = await client.collectEvents(1)
expect(events[0].event).toBe('message')

// Cleanup
client.close()
await server.close()

With custom resources (DI container, controllers):

const server = await SSETestServer.create(
  async (app) => {
    // Register routes using resources from setup
    myController.registerRoutes(app)
  },
  {
    configureApp: async (app) => {
      app.setValidatorCompiler(validatorCompiler)
    },
    setup: async () => {
      // Resources are available via server.resources
      const container = createContainer()
      return { container }
    },
  }
)

const { container } = server.resources

SSEHttpClient

For testing long-lived SSE connections using real HTTP:

import { SSEHttpClient } from 'opinionated-machine'

// Connect to SSE endpoint with awaitServerConnection (recommended)
// This eliminates the race condition between client connect and server-side registration
const { client, serverConnection } = await SSEHttpClient.connect(
  server.baseUrl,
  '/api/stream',
  {
    query: { userId: 'test' },
    headers: { authorization: 'Bearer token' },
    awaitServerConnection: { controller }, // Pass your SSE controller
  },
)

// serverConnection is ready to use immediately
expect(client.response.ok).toBe(true)
await controller.sendEvent(serverConnection.id, { event: 'test', data: {} })

// Collect events by count with timeout
const events = await client.collectEvents(3, 5000) // 3 events, 5s timeout

// Or collect until a predicate is satisfied
const events = await client.collectEvents(
  (event) => event.event === 'done',
  5000,
)

// Iterate over events as they arrive
for await (const event of client.events()) {
  console.log(event.event, event.data)
  if (event.event === 'done') break
}

// Cleanup
client.close()

collectEvents(countOrPredicate, timeout?)

Collects events until a count is reached or a predicate returns true.

| Parameter | Type | Description |
|-----------|------|-------------|
| countOrPredicate | number \| (event) => boolean | Number of events to collect, or predicate that returns true when collection should stop |
| timeout | number | Maximum time to wait in milliseconds (default: 5000) |

Returns Promise<ParsedSSEEvent[]>. Throws an error if the timeout is reached before the condition is met.

// Collect exactly 3 events
const events = await client.collectEvents(3)

// Collect with custom timeout
const events = await client.collectEvents(5, 10000) // 10s timeout

// Collect until a specific event type (the matching event IS included)
const events = await client.collectEvents((event) => event.event === 'done')

// Collect until condition with timeout
const events = await client.collectEvents(
  (event) => JSON.parse(event.data).status === 'complete',
  30000,
)
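The count-or-predicate contract can be sketched over any async iterable. `collectUntil` and `StreamEvent` are hypothetical names and this is not how SSEHttpClient is implemented; unlike the method described above, the sketch simply returns what it has gathered if the source ends early.

```typescript
type StreamEvent = { event?: string; data: string }

async function collectUntil(
  source: AsyncIterable<StreamEvent>,
  countOrPredicate: number | ((event: StreamEvent) => boolean),
  timeoutMs = 5000,
): Promise<StreamEvent[]> {
  const collected: StreamEvent[] = []
  const isDone = (event: StreamEvent): boolean =>
    typeof countOrPredicate === 'number'
      ? collected.length >= countOrPredicate
      : countOrPredicate(event)

  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs)
  })

  const collect = (async () => {
    for await (const event of source) {
      collected.push(event) // the event that satisfies the condition is included
      if (isDone(event)) break
    }
    return collected
  })()

  try {
    // Whichever settles first wins: the finished collection or the timeout
    return await Promise.race([collect, timeout])
  } finally {
    clearTimeout(timer)
  }
}
```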

events(signal?)

Async generator that yields events as they arrive. Accepts an optional AbortSignal for cancellation.

// Basic iteration
for await (const event of client.events()) {
  console.log(event.event, event.data)
  if (event.event === 'done') break
}

// With abort signal for timeout control
const abortController = new AbortController()
const timeoutId = setTimeout(() => abortController.abort(), 5000)

try {
  for await (const event of client.events(abortController.signal)) {
    console.log(event)
  }
} finally {
  clearTimeout(timeoutId)
}

When to omit awaitServerConnection

Omit awaitServerConnection only in these cases:

  • Testing against external SSE endpoints (not your own controller)
  • When isTestMode: false (connectionSpy not available)
  • Simple smoke tests that only verify response headers/status without sending server events

Consequence: Without awaitServerConnection, connect() resolves as soon as HTTP headers are received. Server-side connection registration may not have completed yet, so you cannot reliably send events from the server immediately after connect() returns.

// Example: smoke test that only checks connection works
const client = await SSEHttpClient.connect(server.baseUrl, '/api/stream')
expect(client.response.ok).toBe(true)
expect(client.response.headers.get('content-type')).toContain('text/event-stream')
client.close()

SSEInjectClient

For testing request-response style SSE streams (like OpenAI completions):

import { SSEInjectClient } from 'opinionated-machine'

const client = new SSEInjectClient(app) // No server.listen() needed

// GET request
const conn = await client.connect('/api/export/progress', {
  headers: { authorization: 'Bearer token' },
})

// POST request with body (OpenAI-style)
const conn = await client.connectWithBody(
  '/api/chat/completions',
  { model: 'gpt-4', messages: [...], stream: true },
)

// All events are available immediately (inject waits for complete response)
expect(conn.getStatusCode()).toBe(200)
const events = conn.getReceivedEvents()
const chunks = events.filter(e => e.event === 'chunk')

Contract-Aware Inject Helpers

For typed testing with SSE contracts:

import { injectSSE, injectPayloadSSE, parseSSEEvents } from 'opinionated-machine'

// For GET SSE endpoints with contracts
const { closed } = injectSSE(app, notificationsContract, {
  query: { userId: 'test' },
})
const result = await closed
const events = parseSSEEvents(result.body)

// For POST/PUT/PATCH SSE endpoints with contracts
const { closed } = injectPayloadSSE(app, chatCompletionContract, {
  body: { message: 'Hello', stream: true },
})
const result = await closed
const events = parseSSEEvents(result.body)

Dual-Mode Controllers (SSE + Sync)

Dual-mode controllers handle both SSE streaming and sync responses on the same route path, automatically branching based on the Accept header. This is ideal for APIs that support both real-time streaming and traditional request-response patterns.

Overview

| Accept Header | Response Mode |
| ------------- | ------------- |
| text/event-stream | SSE streaming |
| application/json | Sync response |
| */* or missing | Sync (default, configurable) |

Dual-mode controllers extend AbstractDualModeController which inherits from AbstractSSEController, providing access to all SSE features (connection management, broadcasting, lifecycle hooks) while adding sync response support.
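The branching table can be expressed as a small function. This is a sketch of the decision only: `resolveMode` is a hypothetical helper, it ignores quality values (q=...), and how the library ranks several acceptable types is not shown in this section.

```typescript
type ResponseMode = 'sse' | 'sync'

// Pick a response mode from the Accept header, falling back to a configurable default.
function resolveMode(accept: string | undefined, defaultMode: ResponseMode = 'sync'): ResponseMode {
  if (!accept) return defaultMode
  const types = accept
    .split(',')
    .map((part) => (part.split(';')[0] ?? '').trim().toLowerCase()) // drop parameters such as q=0.9
  if (types.includes('text/event-stream')) return 'sse'
  if (types.includes('application/json')) return 'sync'
  return defaultMode // covers */* and anything unrecognised
}
```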

Defining Dual-Mode Contracts

Dual-mode contracts define endpoints that can return either a complete sync response or stream SSE events, based on the client's Accept header. Use dual-mode when:

  • Clients may want immediate results (sync) or real-time updates (SSE)
  • You're building OpenAI-style APIs where stream: true triggers SSE
  • You need polling fallback for clients that don't support SSE

To create a dual-mode contract, include a successResponseBodySchema in your buildSseContract call:

  • Has successResponseBodySchema but no requestBodySchema → GET dual-mode route
  • Has both successResponseBodySchema and requestBodySchema → POST/PUT/PATCH dual-mode route

import { z } from 'zod'
import { buildSseContract } from '@lokalise/api-contracts'

// GET dual-mode route (polling or streaming job status)
export const jobStatusContract = buildSseContract({
  method: 'get',
  pathResolver: (params) => `/api/jobs/${params.jobId}/status`,
  requestPathParamsSchema: z.object({ jobId: z.string().uuid() }),
  requestQuerySchema: z.object({ verbose: z.string().optional() }),
  requestHeaderSchema: z.object({}),
  successResponseBodySchema: z.object({
    status: z.enum(['pending', 'running', 'completed', 'failed']),
    progress: z.number(),
    result: z.string().optional(),
  }),
  serverSentEventSchemas: {
    progress: z.object({ percent: z.number(), message: z.string().optional() }),
    done: z.object({ result: z.string() }),
  },
})

// POST dual-mode route (OpenAI-style chat completion)
export const chatCompletionContract = buildSseContract({
  method: 'post',
  pathResolver: (params) => `/api/chats/${params.chatId}/completions`,
  requestPathParamsSchema: z.object({ chatId: z.string().uuid() }),
  requestQuerySchema: z.object({}),
  requestHeaderSchema: z.object({ authorization: z.string() }),
  requestBodySchema: z.object({ message: z.string() }),
  successResponseBodySchema: z.object({
    reply: z.string(),
    usage: z.object({ tokens: z.number() }),
  }),
  serverSentEventSchemas: {
    chunk: z.object({ delta: z.string() }),
    done: z.object({ usage: z.object({ total: z.number() }) }),
  },
})

Note: Dual-mode contracts use pathResolver instead of a static path for type-safe path construction. The pathResolver function receives typed params and returns the URL path.

Response Headers (Sync Mode)

Dual-mode contracts support an optional responseHeaderSchema to define and validate headers sent with sync responses. This is useful for documenting expected headers (rate limits, pagination, cache control) and validating that your handlers set them correctly:

export const rateLimitedContract = buildSseContract({
  method: 'post',
  pathResolver: () => '/api/rate-limited',
  requestPathParamsSchema: z.object({}),
  requestQuerySchema: z.object({}),
  requestHeaderSchema: z.object({}),
  requestBodySchema: z.object({ data: z.string() }),
  successResponseBodySchema: z.object({ result: z.string() }),
  // Define expected response headers
  responseHeaderSchema: z.object({
    'x-ratelimit-limit': z.string(),
    'x-ratelimit-remaining': z.string(),
    'x-ratelimit-reset': z.string(),
  }),
  serverSentEventSchemas: {
    result: z.object({ success: z.boolean() }),
  },
})

In your handler, set headers using reply.header():

handlers: buildHandler(rateLimitedContract, {
  sync: async (request, reply) => {
    reply.header('x-ratelimit-limit', '100')
    reply.header('x-ratelimit-remaining', '99')
    reply.header('x-ratelimit-reset', '1640000000')
    return { result: 'success' }
  },
  sse: async (request, sse) => {
    const session = sse.start('autoClose')
    // ... send events ...
    // Connection closes automatically when handler returns
  },
})

If the handler doesn't set the required headers, validation will fail with a RESPONSE_HEADERS_VALIDATION_FAILED error.
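The idea behind this check can be shown without a schema library. The sketch below is hand-rolled and hypothetical (`missingHeaders`, `assertResponseHeaders` and `HeaderMap` are illustrative names); the library validates against the zod schema from the contract, and only the error name is taken from the text above.

```typescript
type HeaderMap = Record<string, string | number | string[] | undefined>

// List required header names that the reply did not set (case-insensitive).
function missingHeaders(required: readonly string[], actual: HeaderMap): string[] {
  const present = new Set(
    Object.entries(actual)
      .filter(([, value]) => value !== undefined)
      .map(([name]) => name.toLowerCase()),
  )
  return required.filter((name) => !present.has(name.toLowerCase()))
}

// Throw when any required header is absent.
function assertResponseHeaders(required: readonly string[], actual: HeaderMap): void {
  const missing = missingHeaders(required, actual)
  if (missing.length > 0) {
    throw new Error(`RESPONSE_HEADERS_VALIDATION_FAILED: missing ${missing.join(', ')}`)
  }
}
```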

Status-Specific Response Schemas (responseBodySchemasByStatusCode)

Dual-mode and SSE contracts support responseBodySchemasByStatusCode to define and validate responses for specific HTTP status codes. This is typically used for error responses (4xx, 5xx), but can define schemas for any status code where you need a different response shape:

export const resourceContract = buildSseContract({
  method: 'post',
  pathResolv