npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@sanity/worker-channels

v1.1.0

Published

Type-safe, structured communication between worker threads and parent processes via TypeScript meta-programming.

Downloads

42,101

Readme

@sanity/worker-channels

npm version bundle size github status checks npm weekly downloads semantic-release: angular

Type-safe, structured communication between worker threads and parent processes via TypeScript meta-programming.

Table of Contents

Motivation

Worker communication often becomes messy when different types of messages flow through the same channel. Worker channels provide type-safe, structured communication with clear contracts defined entirely in TypeScript.

// ================= worker.ts ====================
// traditional worker — simple enough
// ================================================

import {parentPort, workerData} from 'node:worker_threads'
import {processImage} from './processImage'

const imageFiles = workerData

parentPort.postMessage({
  type: 'started',
  expected: imageFiles.length,
})

let processed = 0
for (let i = 0; i < imageFiles.length; i++) {
  const file = imageFiles[i]

  try {
    const result = await processImage(file)
    parentPort.postMessage({
      type: 'progress',
      file: file.name,
      completed: i + 1,
      result,
    })
    processed++
  } catch {
    // ...
  }
}

parentPort.postMessage({
  type: 'finished',
  processed,
})
// ================ worker.ts ========================
// worker-channel worker — still simple and type-safe!
// ===================================================
import {parentPort, workerData} from 'node:worker_threads'
import {WorkerChannelReporter} from '@sanity/worker-channels'
import {processImage} from './processImage'
import type {ImageChannel} from '...'

const report = WorkerChannelReporter.from<ImageChannel>(parentPort)
const imageFiles = workerData

report.event.started({expected: imageFiles.length})

let processed = 0
for (let i = 0; i < imageFiles.length; i++) {
  const file = imageFiles[i]

  try {
    const result = await processImage(file)
    report.stream.progress.emit({
      file: file.name,
      completed: i + 1,
      result,
    })
    processed++
  } catch {
    // ...
  }
}
report.stream.progress.end()

report.event.finished({processed})
// ================= main.ts ===================
// a traditional parent — spaghetti control flow
// =============================================
import {Worker} from 'node:worker_threads'

const worker = new Worker(/* ... */)
let expected = 0

await new Promise((resolve, reject) => {
  worker.on('message', (msg) => {
    switch (msg.type) {
      case 'started':
        expected = msg.expected
        console.log(`Processing ${expected} images...`)
        break
      case 'progress':
        const {file, completed} = msg
        console.log(`${file}: ${completed}/${expected}`)
        break
      case 'finished':
        const {processed} = msg
        console.log(`Processed ${processed}/${expected}`)
        resolve()
        break
      default:
        console.warn('Unknown message type:', msg.type)
    }
  })

  // don't forget to propagate errors
  worker.on('error', reject)
})
// ==================== main.ts =======================
// a worker-channel parent — control flow matches child
// ====================================================
import {Worker} from 'node:worker_threads'
import {WorkerChannelReceiver} from '@sanity/worker-channels'
import type {ImageChannel} from '...'

const worker = new Worker(/* ... */)

const receiver = WorkerChannelReceiver.from<ImageChannel>(worker)

// BONUS: the receiver automatically propagates errors
const {expected} = await receiver.event.started()
console.log(`Processing ${expected} images...`)

for await (const progress of receiver.stream.progress()) {
  const {file, completed} = progress
  console.log(`${file}: ${completed}/${expected}`)
}

const {processed} = await receiver.event.finished()
console.log(`Processed ${processed}/${expected}`)

receiver.unsubscribe() // clean up after the worker

[!IMPORTANT] The channel contract exists solely in TypeScript types and is shared between worker and parent processes.

This library uses Proxies to dynamically intercept property access and route messages to the correct handlers, providing compile-time safety with minimal runtime overhead while keeping worker and parent code cleanly isolated.

import {type WorkerChannel} from '@sanity/worker-channels'

// both the child and parent can import this with a type import
export type ImageChannel = WorkerChannel.Definition<{
  started: WorkerChannel.Event<{expected: number}>
  progress: WorkerChannel.Stream<{file: ImageFile; completed: number}>
  finished: WorkerChannel.Event<{processed: number}>
}>

Installation

npm install @sanity/worker-channels

Quick Start

1. Define your channel contract

// types.ts - shared between worker and parent
import {type WorkerChannel} from '@sanity/worker-channels'

export type BuildChannel = WorkerChannel.Definition<{
  buildStart: WorkerChannel.Event<{target: string}>
  progress: WorkerChannel.Stream<{file: string; percent: number}>
  buildComplete: WorkerChannel.Event<{duration: number; files: string[]}>
}>

2. Report events in your worker thread

// worker.ts
import {parentPort} from 'node:worker_threads'
import {WorkerChannelReporter} from '@sanity/worker-channels'
import type {BuildChannel} from './types'

const report = WorkerChannelReporter.from<BuildChannel>(parentPort)

// Signal build started
report.event.buildStart({target: 'production'})

// Stream progress updates
const files = ['app.js', 'styles.css', 'index.html']
for (let i = 0; i < files.length; i++) {
  report.stream.progress.emit({
    file: files[i],
    percent: Math.round(((i + 1) / files.length) * 100),
  })
  await new Promise((resolve) => setTimeout(resolve, 1000)) // Simulate work
}
report.stream.progress.end() // Important: end the stream

// Signal completion
report.event.buildComplete({duration: 3000, files})

3. Await events in your parent process

// main.ts
import {Worker} from 'node:worker_threads'
import {WorkerChannelReceiver} from '@sanity/worker-channels'
import type {BuildChannel} from './types'

const worker = new Worker('./worker.js')
const receiver = WorkerChannelReceiver.from<BuildChannel>(worker)

// Wait for build to start
const {target} = await receiver.event.buildStart()
console.log(`Build started for ${target}`)

// Monitor progress stream
for await (const {file, percent} of receiver.stream.progress()) {
  console.log(`${file}: ${percent}%`)
}

// Wait for completion
const {duration, files} = await receiver.event.buildComplete()
console.log(`Build completed in ${duration}ms, ${files.length} files`)

receiver.unsubscribe() // Clean up

Usage Examples

Node.js Workers

import {Worker} from 'node:worker_threads'
import {parentPort} from 'node:worker_threads'

// Worker thread
const report = WorkerChannelReporter.from<MyChannel>(parentPort)

// Parent process
const worker = new Worker('./worker.js')
const receiver = WorkerChannelReceiver.from<MyChannel>(worker)

Web Workers

// In web worker
const report = WorkerChannelReporter.from<MyChannel>(self)

// In main thread
const worker = new Worker('./worker.js')
const receiver = WorkerChannelReceiver.from<MyChannel>(worker)

EventEmitter

Useful for asynchronously reporting progress within the same thread in Node.js.

import {EventEmitter} from 'node:events'

const emitter = new EventEmitter()
const reporter = WorkerChannelReporter.from<MyChannel>(emitter)
const receiver = WorkerChannelReceiver.from<MyChannel>(emitter)

EventTarget

Similarly, EventTarget is also supported and useful for asynchronously reporting progress within the same thread in non-Node.js environments.

const target = new EventTarget()
const reporter = WorkerChannelReporter.from<MyChannel>(target)
const receiver = WorkerChannelReceiver.from<MyChannel>(target)

API Reference

WorkerChannel.Definition, WorkerChannel.Event, WorkerChannel.Stream

type MyChannel = WorkerChannel.Definition<{
  eventName: WorkerChannel.Event<PayloadType> // One-time events
  streamName: WorkerChannel.Stream<PayloadType> // Continuous data streams
}>

WorkerChannelReporter

Reports events and streams from worker to parent process.

Creation

Static factory (recommended):

// Automatically detects the interface type
const report = WorkerChannelReporter.from<MyChannel>(parentPort)

Constructor:

// For custom message posting logic
const report = new WorkerChannelReporter<MyChannel>((message) => {
  parentPort.postMessage(message)
})

Usage

// Events (one-time only)
report.event.eventName(payload)

// Streams (multiple emissions + end)
report.stream.streamName.emit(payload)
report.stream.streamName.emit(anotherPayload)
report.stream.streamName.end()

WorkerChannelReceiver

Receives events and streams from worker in the parent process.

Creation

Static factory (recommended):

// Automatically detects the interface type and sets up listeners
const receiver = WorkerChannelReceiver.from<MyChannel>(worker)

Constructor:

// For custom subscription logic
const receiver = new WorkerChannelReceiver<MyChannel>((subscriber) => {
  worker.addListener('message', subscriber.next)
  worker.addListener('error', subscriber.error)

  // Return cleanup function
  return () => {
    worker.removeListener('message', subscriber.next)
    worker.removeListener('error', subscriber.error)
  }
})

Usage

// Events (returns Promise that resolves once)
const payload = await receiver.event.eventName()

// Streams (returns AsyncIterable for multiple values)
for await (const payload of receiver.stream.streamName()) {
  console.log('Received:', payload)
}

// Cleanup (important for static factory instances)
receiver.unsubscribe()

Error Propagation

When an error occurs in the worker thread, it automatically propagates to any awaiting event handlers or stream iterators in the parent process:

// If the worker throws an error...
// worker.ts
throw new Error('Something went wrong in worker')
// ...it will reject awaiting promises in the parent
// main.ts
try {
  const result = await receiver.event.completed() // ← Will reject with worker error
} catch (error) {
  console.error('Worker failed:', error.message) // "Something went wrong in worker"
}

// ...and cause stream iteration to throw
try {
  for await (const progress of receiver.stream.progress()) {
    // ← Will throw worker error
    console.log(progress)
  }
} catch (error) {
  console.error('Stream failed:', error.message) // "Something went wrong in worker"
}

This eliminates the need for manual error handling patterns like worker.on('error', ...) - errors are automatically propagated to the appropriate awaiting code.

Important Caveats

Events are one-time only

// ❌ This will throw an error
reporter.event.buildComplete({duration: 1000})
reporter.event.buildComplete({duration: 2000}) // Error: already reported

// ✅ Use streams for multiple values instead
reporter.stream.status.emit('processing')
reporter.stream.status.emit('finalizing')
reporter.stream.status.end()

[!NOTE] This design prevents bugs where the same event fires multiple times unexpectedly. It makes the contract explicit: use events for singular occurrences, streams for continuous data.

Streams must be ended

// ❌ Stream never ends - receiver will wait forever
reporter.stream.progress.emit(50)
reporter.stream.progress.emit(100)
// Missing: reporter.stream.progress.end()

// ✅ Always end streams
reporter.stream.progress.emit(50)
reporter.stream.progress.emit(100)
reporter.stream.progress.end() // Signals completion

Control flow must match between worker and parent

The parent's control flow should mirror the worker's control flow. If a condition prevents the worker from reporting an event/stream that the parent is awaiting, the parent will hang indefinitely:

// ❌ Worker may conditionally skip events
// worker.ts
if (shouldProcess) {
  report.event.started({count: files.length})
  // ... processing
  report.event.finished({success: true})
}
// If shouldProcess is false, no events are sent

// ❌ Parent unconditionally awaits - will hang if worker skips
// main.ts
await receiver.event.started() // ← Will hang forever if shouldProcess is false
// ...
await receiver.event.finished()
// ✅ Match the conditional logic or use different events
// worker.ts
if (shouldProcess) {
  report.event.started({count: files.length})
  // ... processing
  report.event.finished({success: true})
} else {
  report.event.skipped({reason: 'No processing needed'})
}

// ✅ Parent handles both cases
// main.ts
const startResult = await Promise.race([
  receiver.event.started().then((data) => ({type: 'started', data})),
  receiver.event.skipped().then((data) => ({type: 'skipped', data})),
])

if (startResult.type === 'started') {
  // ... handle processing flow
  await receiver.event.finished()
} else {
  console.log('Processing skipped:', startResult.data.reason)
}

[!WARNING] Worker channels provide message routing, not flow validation. The library doesn't verify that events are sent in the expected order or that all expected events are sent - it only ensures messages reach the right handlers when they are sent.

LICENSE

MIT License - see LICENSE file for details.