@cordfuse/turnq
v0.4.2
Published
Named-channel turn coordinator. HTTP actions, SSE notifications.
Readme
turnq
When multiple agents or processes write to a shared resource concurrently — a git branch, a deploy slot, a database migration — the naive fix is retry with jitter: sleep a random amount, try again, hope for the best. It works until it doesn't. Under load, agents collide repeatedly, waste work already done, and wait time is unbounded.
turnq replaces that gamble with a queue. Clients enqueue on a named channel, receive their turn when they're at the front, do their work, and release. Exactly one client holds the token at a time. Order is strict FIFO. No retries, no conflicts, no tuning magic numbers.
Two modes: local (file lock, no server) and distributed (HTTP server, multi-host).
Quick start
import { createCoordinator } from '@cordfuse/turnq/coordinator';
const coordinator = await createCoordinator();
await coordinator.withTurn('my-channel', async () => {
// exactly one process is here at a time
});No server, no config. createCoordinator() with no arguments uses a local file lock (flock(2) via POSIX). Safe across multiple processes on the same host.
Modes
Local (default)
No server required. Uses flock(2) on a temp file — the OS releases the lock automatically if the process dies, so stale locks are impossible.
const coordinator = await createCoordinator();
// [turnq] local file lock modeLock files live at os.tmpdir()/turnq-locks/<channel>.lock.
Distributed
Runs against a turnq HTTP server. Serializes turns across multiple hosts.
const coordinator = await createCoordinator({
url: 'https://turnq.example.com',
apiKey: process.env.TURNQ_API_KEY,
});
// [turnq] distributed — https://turnq.example.comFallback (default: true)
If the distributed server is unreachable or credentials are missing, createCoordinator falls back to local mode and logs a warning.
// default — falls back to local silently
const coordinator = await createCoordinator({
url: 'https://turnq.example.com',
apiKey: process.env.TURNQ_API_KEY,
});
// strict — throws if distributed is unavailable
const coordinator = await createCoordinator({
url: 'https://turnq.example.com',
apiKey: process.env.TURNQ_API_KEY,
fallback: false,
});API
createCoordinator(opts?)
interface CoordinatorOptions {
url?: string; // turnq server URL — omit for local mode
apiKey?: string; // required when url is set
fallback?: boolean; // default: true — fall back to local if distributed unavailable
}
createCoordinator(opts?: CoordinatorOptions): Promise<Coordinator>Coordinator
interface Coordinator {
createChannel(name: string, opts?: { leaseMs?: number }): Promise<void>;
withTurn<T>(channel: string, fn: () => Promise<T>): Promise<T>;
close(): void;
}createChannel is a no-op in local mode. In distributed mode it ensures the channel exists on the server before use.
Direct client usage
For lower-level control, import TurnqClient directly:
import { TurnqClient } from '@cordfuse/turnq/client';
const client = new TurnqClient('https://turnq.example.com', {
apiKey: process.env.TURNQ_API_KEY,
});
await client.createChannel('my-channel', { leaseMs: 60_000 });
await client.withTurn('my-channel', async (ctx) => {
await ctx.withStep('fetch', () => git.fetch());
await ctx.withStep('commit', () => git.commit(message, files));
await ctx.withStep('push', () => git.push('origin', 'main'));
});
client.close();Running the server
Clone the repo and use docker compose:
git clone https://github.com/cordfuse/turnq
cd turnq
TURNQ_API_KEY=your-key docker compose up -dAdmin endpoints
GET /health
GET /metrics Prometheus-compatible
GET /channels/:name/queue inspect queue
DELETE /channels/:name/holder force-release current holderProtocol at a glance
HTTP for actions, SSE for notifications. JSON throughout.
POST /channels create channel
POST /channels/{name}/enqueue → { requestId, position }
GET /channels/{name}/subscribe SSE stream
POST /channels/{name}/release complete turn
POST /channels/{name}/abort leave queueTurn lifecycle: enqueue → subscribe → wait for your-turn → do work → release.
bash + curl
TURNQ_URL="https://turnq.example.com"
CHANNEL="my-channel"
CLIENT_ID=$(uuidgen)
REQUEST_ID=$(curl -s -X POST "$TURNQ_URL/channels/$CHANNEL/enqueue" \
-H "x-api-key: $TURNQ_API_KEY" \
-H "content-type: application/json" \
-d "{\"clientId\":\"$CLIENT_ID\"}" | jq -r .requestId)
curl -sN "$TURNQ_URL/channels/$CHANNEL/subscribe?clientId=$CLIENT_ID&requestId=$REQUEST_ID" \
-H "x-api-key: $TURNQ_API_KEY" | while IFS= read -r line; do
[[ "$line" == "event: your-turn" ]] || continue
do_work
curl -s -X POST "$TURNQ_URL/channels/$CHANNEL/release" \
-H "x-api-key: $TURNQ_API_KEY" \
-H "content-type: application/json" \
-d "{\"clientId\":\"$CLIENT_ID\",\"requestId\":\"$REQUEST_ID\"}"
break
doneGo
func withTurn(baseURL, channel, apiKey string, fn func() error) error {
clientID := newUUID()
setHeaders := func(r *http.Request) {
r.Header.Set("x-api-key", apiKey)
r.Header.Set("content-type", "application/json")
}
body, _ := json.Marshal(map[string]string{"clientId": clientID})
req, _ := http.NewRequest("POST", fmt.Sprintf("%s/channels/%s/enqueue", baseURL, channel), bytes.NewReader(body))
setHeaders(req)
resp, _ := http.DefaultClient.Do(req)
var enq struct{ RequestID string `json:"requestId"` }
json.NewDecoder(resp.Body).Decode(&enq)
resp.Body.Close()
url := fmt.Sprintf("%s/channels/%s/subscribe?clientId=%s&requestId=%s", baseURL, channel, clientID, enq.RequestID)
req, _ = http.NewRequest("GET", url, nil)
req.Header.Set("x-api-key", apiKey)
resp, _ = http.DefaultClient.Do(req)
defer resp.Body.Close()
scanner, currentEvent := bufio.NewScanner(resp.Body), ""
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "event:") { currentEvent = strings.TrimSpace(line[6:]) }
if strings.HasPrefix(line, "data:") && currentEvent == "your-turn" { break }
}
err := fn()
body, _ = json.Marshal(map[string]any{
"clientId": clientID, "requestId": enq.RequestID,
"result": map[string]bool{"success": err == nil},
})
req, _ = http.NewRequest("POST", fmt.Sprintf("%s/channels/%s/release", baseURL, channel), bytes.NewReader(body))
setHeaders(req)
http.DefaultClient.Do(req)
return err
}Java
import java.net.http.*;
import java.net.URI;
import java.util.UUID;
var turnqUrl = "https://turnq.example.com";
var channel = "my-channel";
var clientId = UUID.randomUUID().toString();
var http = HttpClient.newHttpClient();
// Enqueue
var enqResp = http.send(
HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers.ofString("{\"clientId\":\"" + clientId + "\"}"))
.uri(URI.create(turnqUrl + "/channels/" + channel + "/enqueue"))
.header("x-api-key", apiKey).header("content-type", "application/json")
.build(),
HttpResponse.BodyHandlers.ofString());
var requestId = parseRequestId(enqResp.body());
// Subscribe (SSE)
var sseResp = http.send(
HttpRequest.newBuilder()
.GET()
.uri(URI.create(turnqUrl + "/channels/" + channel +
"/subscribe?clientId=" + clientId + "&requestId=" + requestId))
.header("x-api-key", apiKey)
.build(),
HttpResponse.BodyHandlers.ofLines());
String currentEvent = "";
for (var line : (Iterable<String>) sseResp.body()::iterator) {
if (line.startsWith("event:")) currentEvent = line.substring(6).trim();
else if (line.startsWith("data:") && "your-turn".equals(currentEvent)) break;
}
// Do work
doWork();
// Release
http.send(
HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers.ofString(
"{\"clientId\":\"" + clientId + "\",\"requestId\":\"" + requestId + "\"}"))
.uri(URI.create(turnqUrl + "/channels/" + channel + "/release"))
.header("x-api-key", apiKey).header("content-type", "application/json")
.build(),
HttpResponse.BodyHandlers.discarding());C#
using System.Net.Http;
using System.Text;
using System.Text.Json;
var turnqUrl = "https://turnq.example.com";
var channel = "my-channel";
var clientId = Guid.NewGuid().ToString();
var http = new HttpClient();
http.DefaultRequestHeaders.Add("x-api-key", Environment.GetEnvironmentVariable("TURNQ_API_KEY"));
// Enqueue
var enqRes = await http.PostAsync(
$"{turnqUrl}/channels/{channel}/enqueue",
new StringContent($$"""{"clientId":"{{clientId}}"}""", Encoding.UTF8, "application/json"));
var enqJson = JsonDocument.Parse(await enqRes.Content.ReadAsStringAsync());
var requestId = enqJson.RootElement.GetProperty("requestId").GetString()!;
// Subscribe (SSE)
using var stream = await http.GetStreamAsync(
$"{turnqUrl}/channels/{channel}/subscribe?clientId={clientId}&requestId={requestId}");
using var reader = new StreamReader(stream);
string currentEvent = "", line;
while ((line = await reader.ReadLineAsync() ?? "") != null) {
if (line.StartsWith("event:")) currentEvent = line[6..].Trim();
else if (line.StartsWith("data:") && currentEvent == "your-turn") break;
}
// Do work
await DoWorkAsync();
// Release
await http.PostAsync(
$"{turnqUrl}/channels/{channel}/release",
new StringContent($$"""{"clientId":"{{clientId}}","requestId":"{{requestId}}"}""",
Encoding.UTF8, "application/json"));Rust
use reqwest::Client;
use serde_json::{json, Value};
use uuid::Uuid;
async fn with_turn<F, Fut>(base_url: &str, channel: &str, api_key: &str, work: F) -> anyhow::Result<()>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = anyhow::Result<()>>,
{
let client = Client::new();
let client_id = Uuid::new_v4().to_string();
// Enqueue
let enq: Value = client
.post(format!("{base_url}/channels/{channel}/enqueue"))
.header("x-api-key", api_key)
.json(&json!({ "clientId": client_id }))
.send().await?.json().await?;
let request_id = enq["requestId"].as_str().unwrap().to_owned();
// Subscribe (SSE)
let url = format!("{base_url}/channels/{channel}/subscribe?clientId={client_id}&requestId={request_id}");
let mut resp = client.get(&url).header("x-api-key", api_key).send().await?;
let mut current_event = String::new();
'sse: while let Some(chunk) = resp.chunk().await? {
for line in std::str::from_utf8(&chunk)?.lines() {
if let Some(ev) = line.strip_prefix("event:") { current_event = ev.trim().to_owned(); }
else if line.starts_with("data:") && current_event == "your-turn" { break 'sse; }
}
}
// Do work
let result = work().await;
// Release
client.post(format!("{base_url}/channels/{channel}/release"))
.header("x-api-key", api_key)
.json(&json!({
"clientId": client_id,
"requestId": request_id,
"result": { "success": result.is_ok() }
}))
.send().await?;
result
}PowerShell
$TurnqUrl = "https://turnq.example.com"
$Channel = "my-channel"
$ClientId = [System.Guid]::NewGuid().ToString()
$Headers = @{ "x-api-key" = $env:TURNQ_API_KEY; "Content-Type" = "application/json" }
# Enqueue
$Enq = Invoke-RestMethod -Method POST -Uri "$TurnqUrl/channels/$Channel/enqueue" `
-Headers $Headers -Body (ConvertTo-Json @{ clientId = $ClientId })
$RequestId = $Enq.requestId
# Subscribe (SSE) — read until your-turn
$Req = [System.Net.WebRequest]::Create("$TurnqUrl/channels/$Channel/subscribe?clientId=$ClientId&requestId=$RequestId")
$Req.Headers["x-api-key"] = $env:TURNQ_API_KEY
$Stream = $Req.GetResponse().GetResponseStream()
$Reader = New-Object System.IO.StreamReader($Stream)
$CurrentEvent = ""
while (-not $Reader.EndOfStream) {
$Line = $Reader.ReadLine()
if ($Line -match "^event:\s*(.+)") { $CurrentEvent = $Matches[1] }
elseif ($Line -match "^data:" -and $CurrentEvent -eq "your-turn") { break }
}
$Reader.Close()
# Do work
Invoke-YourWork
# Release
Invoke-RestMethod -Method POST -Uri "$TurnqUrl/channels/$Channel/release" `
-Headers $Headers `
-Body (ConvertTo-Json @{ clientId = $ClientId; requestId = $RequestId })License
MIT.
