npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@cordfuse/turnq

v0.4.2

Published

Named-channel turn coordinator. HTTP actions, SSE notifications.

Readme

turnq

When multiple agents or processes write to a shared resource concurrently — a git branch, a deploy slot, a database migration — the naive fix is retry with jitter: sleep a random amount, try again, hope for the best. It works until it doesn't. Under load, agents collide repeatedly, waste work already done, and wait time is unbounded.

turnq replaces that gamble with a queue. Clients enqueue on a named channel, receive their turn when they're at the front, do their work, and release. Exactly one client holds the token at a time. Order is strict FIFO. No retries, no conflicts, no tuning magic numbers.

Two modes: local (file lock, no server) and distributed (HTTP server, multi-host).


Quick start

import { createCoordinator } from '@cordfuse/turnq/coordinator';

const coordinator = await createCoordinator();

await coordinator.withTurn('my-channel', async () => {
  // exactly one process is here at a time
});

No server, no config. createCoordinator() with no arguments uses a local file lock (flock(2) via POSIX). Safe across multiple processes on the same host.


Modes

Local (default)

No server required. Uses flock(2) on a temp file — the OS releases the lock automatically if the process dies, so stale locks are impossible.

const coordinator = await createCoordinator();
// [turnq] local file lock mode

Lock files live at os.tmpdir()/turnq-locks/<channel>.lock.

Distributed

Runs against a turnq HTTP server. Serializes turns across multiple hosts.

const coordinator = await createCoordinator({
  url: 'https://turnq.example.com',
  apiKey: process.env.TURNQ_API_KEY,
});
// [turnq] distributed — https://turnq.example.com

Fallback (default: true)

If the distributed server is unreachable or credentials are missing, createCoordinator falls back to local mode and logs a warning.

// default — falls back to local silently
const coordinator = await createCoordinator({
  url: 'https://turnq.example.com',
  apiKey: process.env.TURNQ_API_KEY,
});

// strict — throws if distributed is unavailable
const coordinator = await createCoordinator({
  url: 'https://turnq.example.com',
  apiKey: process.env.TURNQ_API_KEY,
  fallback: false,
});

API

createCoordinator(opts?)

interface CoordinatorOptions {
  url?: string;       // turnq server URL — omit for local mode
  apiKey?: string;    // required when url is set
  fallback?: boolean; // default: true — fall back to local if distributed unavailable
}

createCoordinator(opts?: CoordinatorOptions): Promise<Coordinator>

Coordinator

interface Coordinator {
  createChannel(name: string, opts?: { leaseMs?: number }): Promise<void>;
  withTurn<T>(channel: string, fn: () => Promise<T>): Promise<T>;
  close(): void;
}

createChannel is a no-op in local mode. In distributed mode it ensures the channel exists on the server before use.


Direct client usage

For lower-level control, import TurnqClient directly:

import { TurnqClient } from '@cordfuse/turnq/client';

const client = new TurnqClient('https://turnq.example.com', {
  apiKey: process.env.TURNQ_API_KEY,
});

await client.createChannel('my-channel', { leaseMs: 60_000 });

await client.withTurn('my-channel', async (ctx) => {
  await ctx.withStep('fetch',  () => git.fetch());
  await ctx.withStep('commit', () => git.commit(message, files));
  await ctx.withStep('push',   () => git.push('origin', 'main'));
});

client.close();

Running the server

Clone the repo and use docker compose:

git clone https://github.com/cordfuse/turnq
cd turnq
TURNQ_API_KEY=your-key docker compose up -d

Admin endpoints

GET    /health
GET    /metrics                           Prometheus-compatible
GET    /channels/:name/queue              inspect queue
DELETE /channels/:name/holder             force-release current holder

Protocol at a glance

HTTP for actions, SSE for notifications. JSON throughout.

POST   /channels                          create channel
POST   /channels/{name}/enqueue           → { requestId, position }
GET    /channels/{name}/subscribe         SSE stream
POST   /channels/{name}/release           complete turn
POST   /channels/{name}/abort             leave queue

Turn lifecycle: enqueue → subscribe → wait for your-turn → do work → release.


bash + curl

TURNQ_URL="https://turnq.example.com"
CHANNEL="my-channel"
CLIENT_ID=$(uuidgen)

REQUEST_ID=$(curl -s -X POST "$TURNQ_URL/channels/$CHANNEL/enqueue" \
  -H "x-api-key: $TURNQ_API_KEY" \
  -H "content-type: application/json" \
  -d "{\"clientId\":\"$CLIENT_ID\"}" | jq -r .requestId)

curl -sN "$TURNQ_URL/channels/$CHANNEL/subscribe?clientId=$CLIENT_ID&requestId=$REQUEST_ID" \
  -H "x-api-key: $TURNQ_API_KEY" | while IFS= read -r line; do
  [[ "$line" == "event: your-turn" ]] || continue
  do_work
  curl -s -X POST "$TURNQ_URL/channels/$CHANNEL/release" \
    -H "x-api-key: $TURNQ_API_KEY" \
    -H "content-type: application/json" \
    -d "{\"clientId\":\"$CLIENT_ID\",\"requestId\":\"$REQUEST_ID\"}"
  break
done

Go

func withTurn(baseURL, channel, apiKey string, fn func() error) error {
    clientID := newUUID()
    setHeaders := func(r *http.Request) {
        r.Header.Set("x-api-key", apiKey)
        r.Header.Set("content-type", "application/json")
    }

    body, _ := json.Marshal(map[string]string{"clientId": clientID})
    req, _  := http.NewRequest("POST", fmt.Sprintf("%s/channels/%s/enqueue", baseURL, channel), bytes.NewReader(body))
    setHeaders(req)
    resp, _ := http.DefaultClient.Do(req)
    var enq struct{ RequestID string `json:"requestId"` }
    json.NewDecoder(resp.Body).Decode(&enq)
    resp.Body.Close()

    url   := fmt.Sprintf("%s/channels/%s/subscribe?clientId=%s&requestId=%s", baseURL, channel, clientID, enq.RequestID)
    req, _ = http.NewRequest("GET", url, nil)
    req.Header.Set("x-api-key", apiKey)
    resp, _ = http.DefaultClient.Do(req)
    defer resp.Body.Close()

    scanner, currentEvent := bufio.NewScanner(resp.Body), ""
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "event:") { currentEvent = strings.TrimSpace(line[6:]) }
        if strings.HasPrefix(line, "data:") && currentEvent == "your-turn" { break }
    }

    err := fn()

    body, _ = json.Marshal(map[string]any{
        "clientId": clientID, "requestId": enq.RequestID,
        "result": map[string]bool{"success": err == nil},
    })
    req, _ = http.NewRequest("POST", fmt.Sprintf("%s/channels/%s/release", baseURL, channel), bytes.NewReader(body))
    setHeaders(req)
    http.DefaultClient.Do(req)
    return err
}

Java

import java.net.http.*;
import java.net.URI;
import java.util.UUID;

var turnqUrl  = "https://turnq.example.com";
var channel   = "my-channel";
var clientId  = UUID.randomUUID().toString();
var http      = HttpClient.newHttpClient();

// Enqueue
var enqResp = http.send(
    HttpRequest.newBuilder()
        .POST(HttpRequest.BodyPublishers.ofString("{\"clientId\":\"" + clientId + "\"}"))
        .uri(URI.create(turnqUrl + "/channels/" + channel + "/enqueue"))
        .header("x-api-key", apiKey).header("content-type", "application/json")
        .build(),
    HttpResponse.BodyHandlers.ofString());
var requestId = parseRequestId(enqResp.body());

// Subscribe (SSE)
var sseResp = http.send(
    HttpRequest.newBuilder()
        .GET()
        .uri(URI.create(turnqUrl + "/channels/" + channel +
            "/subscribe?clientId=" + clientId + "&requestId=" + requestId))
        .header("x-api-key", apiKey)
        .build(),
    HttpResponse.BodyHandlers.ofLines());

String currentEvent = "";
for (var line : (Iterable<String>) sseResp.body()::iterator) {
    if (line.startsWith("event:"))            currentEvent = line.substring(6).trim();
    else if (line.startsWith("data:") && "your-turn".equals(currentEvent)) break;
}

// Do work
doWork();

// Release
http.send(
    HttpRequest.newBuilder()
        .POST(HttpRequest.BodyPublishers.ofString(
            "{\"clientId\":\"" + clientId + "\",\"requestId\":\"" + requestId + "\"}"))
        .uri(URI.create(turnqUrl + "/channels/" + channel + "/release"))
        .header("x-api-key", apiKey).header("content-type", "application/json")
        .build(),
    HttpResponse.BodyHandlers.discarding());

C#

using System.Net.Http;
using System.Text;
using System.Text.Json;

var turnqUrl  = "https://turnq.example.com";
var channel   = "my-channel";
var clientId  = Guid.NewGuid().ToString();
var http      = new HttpClient();
http.DefaultRequestHeaders.Add("x-api-key", Environment.GetEnvironmentVariable("TURNQ_API_KEY"));

// Enqueue
var enqRes = await http.PostAsync(
    $"{turnqUrl}/channels/{channel}/enqueue",
    new StringContent($$"""{"clientId":"{{clientId}}"}""", Encoding.UTF8, "application/json"));
var enqJson   = JsonDocument.Parse(await enqRes.Content.ReadAsStringAsync());
var requestId = enqJson.RootElement.GetProperty("requestId").GetString()!;

// Subscribe (SSE)
using var stream = await http.GetStreamAsync(
    $"{turnqUrl}/channels/{channel}/subscribe?clientId={clientId}&requestId={requestId}");
using var reader = new StreamReader(stream);

string currentEvent = "", line;
while ((line = await reader.ReadLineAsync() ?? "") != null) {
    if (line.StartsWith("event:"))                                    currentEvent = line[6..].Trim();
    else if (line.StartsWith("data:") && currentEvent == "your-turn") break;
}

// Do work
await DoWorkAsync();

// Release
await http.PostAsync(
    $"{turnqUrl}/channels/{channel}/release",
    new StringContent($$"""{"clientId":"{{clientId}}","requestId":"{{requestId}}"}""",
        Encoding.UTF8, "application/json"));

Rust

use reqwest::Client;
use serde_json::{json, Value};
use uuid::Uuid;

async fn with_turn<F, Fut>(base_url: &str, channel: &str, api_key: &str, work: F) -> anyhow::Result<()>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let client    = Client::new();
    let client_id = Uuid::new_v4().to_string();

    // Enqueue
    let enq: Value = client
        .post(format!("{base_url}/channels/{channel}/enqueue"))
        .header("x-api-key", api_key)
        .json(&json!({ "clientId": client_id }))
        .send().await?.json().await?;
    let request_id = enq["requestId"].as_str().unwrap().to_owned();

    // Subscribe (SSE)
    let url      = format!("{base_url}/channels/{channel}/subscribe?clientId={client_id}&requestId={request_id}");
    let mut resp = client.get(&url).header("x-api-key", api_key).send().await?;

    let mut current_event = String::new();
    'sse: while let Some(chunk) = resp.chunk().await? {
        for line in std::str::from_utf8(&chunk)?.lines() {
            if let Some(ev) = line.strip_prefix("event:") { current_event = ev.trim().to_owned(); }
            else if line.starts_with("data:") && current_event == "your-turn" { break 'sse; }
        }
    }

    // Do work
    let result = work().await;

    // Release
    client.post(format!("{base_url}/channels/{channel}/release"))
        .header("x-api-key", api_key)
        .json(&json!({
            "clientId":  client_id,
            "requestId": request_id,
            "result": { "success": result.is_ok() }
        }))
        .send().await?;

    result
}

PowerShell

$TurnqUrl  = "https://turnq.example.com"
$Channel   = "my-channel"
$ClientId  = [System.Guid]::NewGuid().ToString()
$Headers   = @{ "x-api-key" = $env:TURNQ_API_KEY; "Content-Type" = "application/json" }

# Enqueue
$Enq       = Invoke-RestMethod -Method POST -Uri "$TurnqUrl/channels/$Channel/enqueue" `
               -Headers $Headers -Body (ConvertTo-Json @{ clientId = $ClientId })
$RequestId = $Enq.requestId

# Subscribe (SSE) — read until your-turn
$Req    = [System.Net.WebRequest]::Create("$TurnqUrl/channels/$Channel/subscribe?clientId=$ClientId&requestId=$RequestId")
$Req.Headers["x-api-key"] = $env:TURNQ_API_KEY
$Stream = $Req.GetResponse().GetResponseStream()
$Reader = New-Object System.IO.StreamReader($Stream)

$CurrentEvent = ""
while (-not $Reader.EndOfStream) {
    $Line = $Reader.ReadLine()
    if ($Line -match "^event:\s*(.+)")            { $CurrentEvent = $Matches[1] }
    elseif ($Line -match "^data:" -and $CurrentEvent -eq "your-turn") { break }
}
$Reader.Close()

# Do work
Invoke-YourWork

# Release
Invoke-RestMethod -Method POST -Uri "$TurnqUrl/channels/$Channel/release" `
    -Headers $Headers `
    -Body (ConvertTo-Json @{ clientId = $ClientId; requestId = $RequestId })

License

MIT.