@nsshunt/stsrunnerframework
v2.0.18
STS Runner Framework
A high-performance asynchronous runner orchestration framework for Node.js and browser environments.
The framework enables you to build systems that execute large numbers of concurrent test runners or workload generators across worker processes, with:
- asynchronous runner lifecycle management
- distributed worker execution
- structured telemetry
- instrumentation integration
- dynamic runner configuration
- state synchronization
- archive history tracking
- event-driven observability
The framework is designed for scenarios such as:
- load testing
- distributed task execution
- asynchronous job orchestration
- testing frameworks
- automation systems
- telemetry-driven workload engines
Table of Contents
- Overview
- Architecture
- Core Concepts
- System Components
- Execution Flow
- Telemetry and Observability
- Runner Lifecycle
- Worker Lifecycle
- State Synchronization
- Archiving
- Client Usage
- Creating Custom Runners
- Example Test Runner
- Example Client
- Threading Model
- Design Goals
- Future Enhancements
Overview
The STS Runner Framework provides a scalable architecture for executing multiple runners across multiple workers, with centralized orchestration.
The system consists of three primary layers:
Client Application
│
▼
STSWorkerManager (Coordinator)
│
▼
Workers (Execution Hosts)
│
▼
Runners (User Workload Logic)
Key properties:
- Workers isolate execution
- Runners perform actual work
- Manager coordinates everything
- Telemetry flows upward
- Commands flow downward
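The "commands down, telemetry up" flow can be modelled with a few lines of event plumbing. This is a conceptual sketch only: the ToyManager, ToyWorker, and ToyRunner classes and their event names are illustrative stand-ins for STSWorkerManager, workers, and runners, not the framework's actual API.

```typescript
import { EventEmitter } from 'node:events';

// A runner performs work and reports telemetry upward.
class ToyRunner extends EventEmitter {
  start() {
    this.emit('Telemetry', { requestCount: 1 });
  }
}

// A worker hosts runners and forwards their telemetry upward.
class ToyWorker extends EventEmitter {
  runners: ToyRunner[] = [];
  addRunner(): ToyRunner {
    const r = new ToyRunner();
    r.on('Telemetry', (t) => this.emit('Telemetry', t)); // runner -> worker
    this.runners.push(r);
    return r;
  }
  startAll() {
    for (const r of this.runners) r.start(); // command: worker -> runners
  }
}

// The manager coordinates workers and surfaces telemetry to the client.
class ToyManager extends EventEmitter {
  workers: ToyWorker[] = [];
  addWorker(): ToyWorker {
    const w = new ToyWorker();
    w.on('Telemetry', (t) => this.emit('Telemetry', t)); // worker -> manager
    this.workers.push(w);
    return w;
  }
  startWorkers() {
    for (const w of this.workers) w.startAll(); // command: manager -> workers
  }
}

const manager = new ToyManager();
const received: object[] = [];
manager.on('Telemetry', (t) => received.push(t));
manager.addWorker().addRunner();
manager.startWorkers();
// `received` now holds the telemetry sample that bubbled up three layers.
```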
Architecture
The framework is composed of several subsystems:
Client Application
│
▼
STSWorkerManager
│
├── WorkerRegistry
├── WorkerInstanceManager
├── RunnerLifecycleManager
├── WorkerStateSynchroniser
├── WorkerCommandCoordinator
├── AsyncRunnerInstanceManager
├── STSMessageBroker
└── ArchiveManager
Each subsystem has a clearly defined responsibility.
Core Concepts
Worker
A Worker represents an execution host.
Workers may run:
- inside Node.js worker threads
- inside browser workers
- in mocked in-process environments
Workers host multiple runners.
Runner
A Runner is the unit that performs actual work.
Examples:
- HTTP load test
- API validation
- database operation
- workflow automation
- synthetic telemetry generator
Runners implement:
interface IRunnerInstance
Typical methods:
ExecuteRunner()
StartRunner()
StopRunner()
PauseRunner()
ResumeRunner()
ResetRunner()
TerminateRunner()
UpdateRunner()
Completed()
Manager
The STSWorkerManager is the central coordinator.
It:
- creates workers
- creates runners
- dispatches commands
- collects telemetry
- synchronizes state
- archives completed runners
Telemetry
Telemetry represents runtime metrics generated by runners.
Examples:
requestCount
errorCount
velocity
latency
duration
activeRequestCount
tx
rx
message[]
Telemetry flows from:
Runner → Worker → Manager → Client
System Components
STSWorkerManager
The core orchestrator of the framework.
Responsibilities:
- worker creation
- runner creation
- lifecycle command dispatch
- state synchronization
- archive management
- telemetry processing
Example initialization:
const wm = new STSWorkerManager({
workerFactory,
maxArchiveListLength: 200,
logger: defaultLogger,
logLevel: 4,
messageTimeout: 10000
});
WorkerRegistry
Stores the in-memory runtime graph of the system.
WorkerRegistry
├── WorkerEx
│ └── RunnerEx
Provides:
AddWorker()
AddRunner()
DeleteWorker()
DeleteRunner()
GetWorkersMap()
GetWorkersCoreMap()
GetNextAvailableWorker()
GetBusiestWorker()
WorkerInstanceManager
Responsible for creating worker instances and wiring system events.
Handles:
- worker creation
- exit events
- error events
- message error events
Supports:
Node Worker Threads
Browser Workers
Mock Workers
STSMessageBroker
Provides message routing between manager and workers.
Responsibilities:
SendMessageToWorkerForRunnerWithCallBack()
SendMessageToWorkerForWorkerWithCallBack()
SetupMessagePort()
SetupMessagePortListener()
Uses MessagePort communication channels.
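The "...WithCallBack" methods suggest a callback-correlation pattern: each outbound message carries an id, and the reply coming back over the port is matched to the registered callback. The sketch below shows that pattern in isolation; the ToyBroker class, message shapes, and names are assumptions, not the real STSMessageBroker API.

```typescript
// Correlate replies to the callbacks registered for their message ids.
type Reply = { id: number; payload: unknown };

class ToyBroker {
  private nextId = 0;
  private pending = new Map<number, (payload: unknown) => void>();

  // Send a message and remember which callback the reply belongs to.
  sendWithCallback(
    send: (msg: { id: number; body: string }) => void,
    body: string,
    cb: (payload: unknown) => void
  ): void {
    const id = this.nextId++;
    this.pending.set(id, cb);
    send({ id, body });
  }

  // Route an incoming reply to the callback registered for its id.
  handleReply(reply: Reply): void {
    const cb = this.pending.get(reply.id);
    if (cb) {
      this.pending.delete(reply.id);
      cb(reply.payload);
    }
  }
}

const broker = new ToyBroker();
let result: unknown = null;
// Simulate the worker side echoing a reply back over the same "port".
broker.sendWithCallback(
  (msg) => broker.handleReply({ id: msg.id, payload: `echo:${msg.body}` }),
  'GetRunners',
  (payload) => { result = payload; }
);
```

A real broker would additionally apply the configured messageTimeout so that callbacks for lost messages are eventually rejected rather than left pending forever.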
AsyncRunnerInstanceManager
Creates and manages runner instances inside workers.
Responsibilities:
- runner creation
- runner execution control
- instrument controller attachment
RunnerLifecycleManager
Handles runner telemetry and lifecycle state updates.
Responsibilities:
ProcessTelemetry()
RunnerStateChange()
EmitRunnerEvent()
SyncRunnerData()
Ensures the in-memory model stays synchronized.
WorkerCommandCoordinator
Handles command dispatching.
Two command scopes:
Worker Commands
StartWorkers
StopWorkers
PauseWorkers
ResumeWorkers
ExecuteWorkers
ResetWorkers
TerminateWorkers
UpdateWorkers
Runner Commands
StartRunners
StopRunners
PauseRunners
ResumeRunners
ExecuteRunners
ResetRunners
TerminateRunners
UpdateRunners
Commands are executed asynchronously across workers.
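Fanning a command out to every worker and awaiting all acknowledgements can be sketched as a simple concurrent dispatch. The sendCommand stand-in below is hypothetical; in the real coordinator the send would cross a MessagePort rather than resolve locally.

```typescript
// Fan a lifecycle command out to all workers concurrently and gather acks.
type ToyWorkerRef = { id: string; runnerIds: string[] };

async function sendCommand(worker: ToyWorkerRef, command: string): Promise<string> {
  // Stand-in for a MessagePort round trip: immediately acknowledge.
  return `${worker.id}:${command}:ok`;
}

async function dispatchToAll(workers: ToyWorkerRef[], command: string): Promise<string[]> {
  // Commands run concurrently across workers; results are gathered together.
  return Promise.all(workers.map((w) => sendCommand(w, command)));
}

const acks = await dispatchToAll(
  [{ id: 'w1', runnerIds: ['r1'] }, { id: 'w2', runnerIds: ['r2', 'r3'] }],
  'StartRunners'
);
```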
WorkerStateSynchroniser
Ensures that the manager's in-memory model matches actual worker state.
Mechanism:
Manager → Worker : GetRunners
Worker → Manager : Runner states
Methods:
GetSyncedWorkers()
GetSyncedWorkersCore()
ArchiveManager
Stores completed runner history.
Archived runners contain:
runnerId
runnerOptions
runnerHistory[]
telemetrySnapshots
Provides:
GetArchiveList()
Execution Flow
Worker Creation
Client → STSWorkerManager.AddWorker()
Flow:
Manager
↓
WorkerInstanceManager
↓
WorkerFactory
↓
WorkerInstance
↓
WorkerRegistry
Runner Creation
Client → Worker.AddRunner()
Flow:
Manager
↓
AsyncRunnerInstanceManager
↓
Runner created
↓
Worker receives AddRunner command
Runner Execution
StartRunner()
Execution loop:
Runner.ExecuteRunner()
↓
Update telemetry
↓
Sleep or perform work
↓
Publish telemetry
Telemetry and Observability
Telemetry is buffered and published using a hybrid strategy:
Flush if:
buffer size reached
OR
timeout reached
Example logic:
maxBufferSize = 50
timeout = 1000ms
Telemetry metrics include:
requestCount
errorCount
retryCount
velocity
latency
duration
activeRequestCount
network RX/TX
Instrumentation integrates with:
@nsshunt/stsobservability
@nsshunt/stsinstrumentmanagerclient
Runner Lifecycle
Runner states include:
initializing
running
paused
stopped
completed
error
terminated
Lifecycle transitions generate events:
Telemetry
StateChange
Example:
runner.on('Telemetry', handler)
runner.on('StateChange', handler)
Worker Lifecycle
Worker events include:
exit
error
messageerror
The WorkerInstanceManager wires these events and forwards them to the manager.
State Synchronization
The manager maintains an in-memory graph.
Before returning model queries:
GetWorkers()
GetWorkersCore()
The system performs:
SyncAllWorkers()
which queries every worker for its current runner state.
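The reconciliation step behind this sync can be sketched as follows: the worker's report of its runners is treated as authoritative and replaces the manager's previous view. The RunnerState shape here is an assumption for illustration.

```typescript
// Replace the manager's view of a worker with the states it reports back.
type RunnerState = { runnerId: string; state: string };

function toSyncedModel(reported: RunnerState[]): Map<string, RunnerState> {
  // Stale entries simply disappear; surviving entries carry the latest state.
  const synced = new Map<string, RunnerState>();
  for (const r of reported) synced.set(r.runnerId, r);
  return synced;
}

// Before sync the manager might believe r1 and r2 were both running; the
// worker now reports only r1, which has completed.
const synced = toSyncedModel([{ runnerId: 'r1', state: 'completed' }]);
```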
Archiving
Completed runners are archived by the ArchiveManager.
The archive contains:
runner metadata
execution history
telemetry snapshots
state transitions
Example:
const archiveList = await wm.GetArchiveList({});
Client Usage
Example initialization:
const wm = new STSWorkerManager({
workerFactory,
maxArchiveListLength: 200,
logger: defaultLogger,
logLevel: 4,
messageTimeout: 10000
});
Create Worker
const worker = await wm.AddWorker({
hostName: "host01",
agentId: "agent01",
mocked: true,
tags: ["testing"],
logLevel: 4
});
Create Runner
const runner = await worker.AddRunner({
testType: "TestCase01",
executionProfile: {
iterations: 40,
delayBetweenIterations: 250
}
});
Runner Events
runner.on("Telemetry", handler);
runner.on("StateChange", handler);Start Runner
await runner.Start();
or
await wm.StartRunners(worker.id, [runner.id]);
Pause / Resume
await runner.Pause();
await runner.Resume();
Update Runner
await runner.Update({
executionProfile: {
iterations: 30
}
});
Creating Custom Runners
Implement:
IRunnerInstance
Example skeleton:
class MyRunner implements IRunnerInstance {
async ExecuteRunner() {
// perform work
return true;
}
async StartRunner() {}
async StopRunner() {}
async PauseRunner() {}
async ResumeRunner() {}
}
Example Test Runner
Two reference implementations exist:
TestCase01
TestCase02
They demonstrate:
- telemetry batching
- simulated workload
- logging
- lifecycle transitions
Example Client
Example scenarios included:
client test 1:
multiple workers + runners
client test 2:
single runner dynamic update
Both demonstrate full framework usage.
Threading Model
The framework supports multiple execution models:
Node Worker Threads
Browser Workers
Mock Workers
Mock workers are useful for testing.
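What makes mock workers convenient is that they satisfy a postMessage/onmessage style contract without any real thread. The minimal sketch below shows that idea; the MockWorker shape is an assumption for illustration, not the framework's mock worker API.

```typescript
// An in-process "worker": messages are handled synchronously instead of
// crossing a thread boundary, which keeps tests fast and deterministic.
class MockWorker {
  onmessage: ((data: unknown) => void) | null = null;
  private handler: (data: unknown) => unknown;

  constructor(handler: (data: unknown) => unknown) {
    this.handler = handler;
  }

  // Deliver a message to the handler and echo its reply to onmessage.
  postMessage(data: unknown): void {
    const reply = this.handler(data);
    this.onmessage?.(reply);
  }
}

const mock = new MockWorker((msg) => ({ ack: msg }));
let got: unknown = null;
mock.onmessage = (data) => { got = data; };
mock.postMessage('StartRunners');
```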
Design Goals
The framework was designed with the following principles:
Scalability
Supports large numbers of runners across workers.
Observability
First-class telemetry and instrumentation integration.
Flexibility
Works in:
- Node
- Browser
- test environments
Isolation
Workers isolate execution workloads.
Event-Driven
Runners emit structured events.
Dynamic Control
Runners can be:
paused
resumed
reset
updated
terminated
at runtime.
Future Enhancements
Potential roadmap features:
- distributed multi-machine worker clusters
- built-in load test profiles
- Web UI dashboard
- Prometheus exporters
- distributed tracing
- adaptive execution control
- auto-scaling workers
License
MIT License.
Architecture Diagrams
System Architecture
flowchart TB
Client["Client Application"]
subgraph Manager["STSWorkerManager"]
WorkerRegistry
WorkerInstanceManager
RunnerLifecycleManager
WorkerCommandCoordinator
WorkerStateSynchroniser
AsyncRunnerInstanceManager
ArchiveManager
STSMessageBroker
end
subgraph Workers["Worker Threads / Web Workers"]
Worker1["WorkerInstance"]
Worker2["WorkerInstance"]
end
subgraph Runners["Runner Instances"]
Runner1["RunnerInstance"]
Runner2["RunnerInstance"]
Runner3["RunnerInstance"]
end
Client --> Manager
Manager --> WorkerRegistry
Manager --> WorkerInstanceManager
Manager --> RunnerLifecycleManager
Manager --> WorkerCommandCoordinator
Manager --> WorkerStateSynchroniser
Manager --> AsyncRunnerInstanceManager
Manager --> ArchiveManager
Manager --> STSMessageBroker
WorkerInstanceManager --> Worker1
WorkerInstanceManager --> Worker2
Worker1 --> Runner1
Worker1 --> Runner2
Worker2 --> Runner3
STSMessageBroker --> Worker1
STSMessageBroker --> Worker2
Component Interaction
flowchart LR
Client --> STSWorkerManager
STSWorkerManager --> WorkerRegistry
STSWorkerManager --> WorkerInstanceManager
STSWorkerManager --> WorkerCommandCoordinator
STSWorkerManager --> RunnerLifecycleManager
STSWorkerManager --> WorkerStateSynchroniser
STSWorkerManager --> AsyncRunnerInstanceManager
STSWorkerManager --> ArchiveManager
STSWorkerManager --> STSMessageBroker
WorkerCommandCoordinator --> STSMessageBroker
WorkerStateSynchroniser --> STSMessageBroker
STSMessageBroker --> Worker
Worker --> Runner
Runner --> RunnerLifecycleManager
RunnerLifecycleManager --> WorkerRegistry
RunnerLifecycleManager --> ArchiveManager
Runner Lifecycle
stateDiagram-v2
[*] --> initializing
initializing --> running : Start
running --> paused : Pause
paused --> running : Resume
running --> completed : Iterations reached
running --> stopped : Stop
running --> error : Exception
running --> terminated : Terminate
paused --> terminated : Terminate
completed --> [*]
stopped --> [*]
error --> [*]
terminated --> [*]
Execution Sequence
sequenceDiagram
participant Client
participant Manager as STSWorkerManager
participant Broker as MessageBroker
participant Worker
participant Runner
Client->>Manager: AddWorker()
Manager->>Worker: Create WorkerInstance
Client->>Manager: AddRunner()
Manager->>Broker: Send AddRunner
Broker->>Worker: AddRunner message
Worker->>Runner: Create RunnerInstance
Client->>Manager: StartRunner()
Manager->>Broker: StartRunner command
Broker->>Worker: StartRunner
Worker->>Runner: ExecuteRunner()
loop Execution Loop
Runner->>Runner: Perform Work
Runner->>Worker: PostTelemetry
Worker->>Broker: Telemetry Message
Broker->>Manager: Telemetry Event
Manager->>Client: Emit Telemetry Event
end
Runner->>Worker: Completed
Worker->>Broker: RunnerCompleted
Broker->>Manager: StateChange
Manager->>Client: StateChange Event
Worker Load Balancing / Runner Placement Diagram
This shows how the manager can choose a worker, typically using registry helpers such as GetNextAvailableWorker() and GetBusiestWorker(), and then place runners onto workers.
flowchart TB
NewRunner["New Runner Request"]
subgraph Manager["STSWorkerManager"]
Registry["WorkerRegistry"]
Coordinator["Worker / Runner Coordination"]
Selector{"Select Worker"}
end
subgraph WorkerPool["Live Workers"]
W1["Worker A\n2 runners"]
W2["Worker B\n5 runners"]
W3["Worker C\n1 runner"]
end
NewRunner --> Manager
Manager --> Registry
Registry --> Selector
Coordinator --> Selector
Selector -->|"GetNextAvailableWorker()"| W3
Selector -->|"GetBusiestWorker()"| W2
W3 -->|"AddRunner()"| Added["Runner assigned to least-loaded worker"]- The registry maintains the current live worker/runners graph
- worker selection can be based on current runner counts
- the least-loaded worker can be selected for simple balancing
- after selection, the runner is created and attached to that worker
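The selection helpers above can be sketched as plain functions over runner counts. These standalone functions mimic the likely logic of GetNextAvailableWorker() and GetBusiestWorker(); the WorkerLoad shape is an assumption for illustration.

```typescript
// Pick workers by current load, mirroring the registry helpers described above.
type WorkerLoad = { id: string; runnerCount: number };

function nextAvailableWorker(workers: WorkerLoad[]): WorkerLoad | null {
  // Least-loaded worker wins; ties keep the earlier worker.
  return workers.reduce<WorkerLoad | null>(
    (best, w) => (best === null || w.runnerCount < best.runnerCount ? w : best),
    null
  );
}

function busiestWorker(workers: WorkerLoad[]): WorkerLoad | null {
  // Most-loaded worker wins; ties keep the earlier worker.
  return workers.reduce<WorkerLoad | null>(
    (best, w) => (best === null || w.runnerCount > best.runnerCount ? w : best),
    null
  );
}

// The pool from the diagram: Worker A (2 runners), B (5), C (1).
const pool: WorkerLoad[] = [
  { id: 'A', runnerCount: 2 },
  { id: 'B', runnerCount: 5 },
  { id: 'C', runnerCount: 1 },
];
```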
Telemetry Pipeline Diagram
This shows the full telemetry path from a concrete runner up to the client.
flowchart LR
subgraph WorkerRuntime["Worker Runtime"]
Runner["RunnerInstance"]
ExecWorker["AbstractRunnerExecutionWorker"]
end
subgraph ManagerSide["Manager Side"]
Broker["STSMessageBroker"]
Lifecycle["RunnerLifecycleManager"]
Registry["WorkerRegistry"]
Sync["WorkerStateSynchroniser"]
end
Client["Client Application"]
Instrument["PublishInstrumentController / Telemetry Output"]
Runner -->|"updates instrumentData"| ExecWorker
ExecWorker -->|"PostTelemetryById()"| Broker
Broker -->|"InstrumentTelemetry message"| Lifecycle
Lifecycle -->|"SyncRunnerData()"| Registry
Lifecycle -->|"ProcessTelemetry()"| Instrument
Registry --> Sync
Sync --> Client
Lifecycle -->|"runner.on('Telemetry')"| Client
Lifecycle -->|"runner.on('StateChange')"| Client
- runners update their local instrumentData
- the worker runtime posts telemetry upward via PostTelemetryById()
- the message broker routes telemetry to the manager
- the runner lifecycle manager applies the update into the live registry model
- optional instrumentation publishers can also receive metric updates
- clients can observe telemetry through:
- event subscriptions
- synchronized model queries like GetWorkers()
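The hybrid buffering described under Telemetry and Observability (flush when the buffer fills OR when the timeout elapses) can be sketched deterministically by passing timestamps in rather than using real timers. The TelemetryBuffer class is illustrative; maxBufferSize and timeout mirror the example values given earlier.

```typescript
// Flush telemetry when either the size threshold or the idle timeout is hit.
class TelemetryBuffer<T> {
  private buffer: T[] = [];
  private lastFlushAt: number;
  flushed: T[][] = [];

  constructor(
    private maxBufferSize: number,
    private timeoutMs: number,
    nowMs: number
  ) {
    this.lastFlushAt = nowMs;
  }

  // Record a sample, flushing if either trigger condition is met.
  add(sample: T, nowMs: number): void {
    this.buffer.push(sample);
    const sizeReached = this.buffer.length >= this.maxBufferSize;
    const timeoutReached = nowMs - this.lastFlushAt >= this.timeoutMs;
    if (sizeReached || timeoutReached) this.flush(nowMs);
  }

  private flush(nowMs: number): void {
    this.flushed.push(this.buffer);
    this.buffer = [];
    this.lastFlushAt = nowMs;
  }
}

const buf = new TelemetryBuffer<number>(3, 1000, 0);
buf.add(1, 10);
buf.add(2, 20);
buf.add(3, 30);   // size trigger: three samples flushed as one batch
buf.add(4, 1500); // timeout trigger: flushed alone after >1000ms idle
```

A production implementation would also arm a real timer so an idle buffer still flushes without waiting for the next sample.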
stsrunnerframework (Legacy Documentation - Previous Version)
terminated - the process has been terminated and no end-of-process steps (such as reports) should execute.
stopped - end the test and report as if it completed normally.
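The distinction between these two end states can be captured in a few lines. The finishRun function and its report hook are hypothetical names used only to illustrate the rule stated above.

```typescript
// 'terminated' skips end-of-process steps; 'stopped' and 'completed' report.
type EndState = 'stopped' | 'terminated' | 'completed';

function finishRun(state: EndState, report: () => void): boolean {
  // The process was killed: run no end-of-process steps such as reports.
  if (state === 'terminated') return false;
  // Stopped or completed: report as a normal end of test.
  report();
  return true;
}

let reports = 0;
const ranOnStop = finishRun('stopped', () => { reports++; });
const ranOnTerminate = finishRun('terminated', () => { reports++; });
```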
State Machine
---
config:
theme: neo
look: neo
title: STSRunnerFramework
---
stateDiagram
direction TB
[*] --> Created:Select Start
Created --> Running
Created --> Error:Error Caught
Running --> Completed:Iterations Exhausted
Running --> Error:Error Caught
Running --> Stopped:Select Stop
Completed --> [*]
Running --> Paused:Select Pause
Paused --> Running:Select Resume
Paused --> Paused:Select Execute or Reset
Paused --> Stopped:Select Stop
Running --> Terminated:Select Terminate
Paused --> Terminated:Select Terminate
Paused --> Completed:Select Execute<br>Iterations Exhausted
Terminated --> [*]
Error --> [*]
Stopped --> [*]
Node.js Factory Instance Example
/* eslint @typescript-eslint/no-unused-vars: 0, @typescript-eslint/no-explicit-any: 0 */ // --> OFF
import { IRunnerInstance, ITestRunnerTelemetryPayload, eIWMessageCommands, WorkerInstance } from './../index'
import { TestCase01 } from './testCase01'
import { parentPort } from 'worker_threads';
export class WorkerTestCases extends WorkerInstance {
constructor() {
super()
}
override CreateAsyncRunner = (testRunnerTelemetryPayload: ITestRunnerTelemetryPayload): IRunnerInstance | null => {
const { runner } = testRunnerTelemetryPayload;
switch (runner.options.testType) {
case 'TestCase01' :
return new TestCase01(this, runner)
}
return null;
}
}
const worker = new WorkerTestCases();
// isNode distinguishes Node.js from browser contexts; in the original source it is assumed to come from a shared utility such as @nsshunt/stsutils.
const isNode = typeof process !== 'undefined' && !!process.versions?.node;
parentPort?.on('message', (data: any) => {
if (isNode) {
const { command, payload } = data;
if (command === eIWMessageCommands.MessagePort) {
payload.port.on('message', (data: any) => {
worker.ProcessMessage(data);
});
worker.ProcessMessage(data);
} else {
throw new Error(`Invalid command: [${command}]`)
}
}
});
Agent Factory Instance Example
/* eslint @typescript-eslint/no-unused-vars: 0, @typescript-eslint/no-explicit-any: 0 */ // --> OFF
import { IRunnerInstance, ITestRunnerTelemetryPayload, WorkerInstance, eIWMessageCommands } from './../index'
import { TestCase01 } from './testCase01'
export class WorkerTestCases extends WorkerInstance {
constructor() {
super()
}
override CreateAsyncRunner = (testRunnerTelemetryPayload: ITestRunnerTelemetryPayload): IRunnerInstance | null => {
const { runner } = testRunnerTelemetryPayload;
switch (runner.options.testType) {
case 'TestCase01' :
return new TestCase01(this, runner)
}
return null;
}
}
const worker = new WorkerTestCases();
onmessage = async function(data: any) {
const { command, payload } = data.data;
if (command === eIWMessageCommands.MessagePort) {
payload.port.start();
payload.port.addEventListener('message', (data: any) => {
worker.ProcessMessage(data.data); // browser version
});
/*
payload.port.addEventListener('messageerror', (error: any) => {
console.error(`onmessageerror(): [${error}]`)
worker.ProcessMessage(data.data); // browser version
});
*/
worker.ProcessMessage(data.data);
} else {
throw new Error(`Invalid command: [${command}]`)
}
}
Mocked Factory Instance Example (for testing only)
/* eslint @typescript-eslint/no-unused-vars: 0, @typescript-eslint/no-explicit-any: 0 */ // --> OFF
import { IRunnerInstance, ITestRunnerTelemetryPayload, WorkerInstance } from './../index'
import { TestCase01 } from './testCase01'
import { MessagePort } from 'worker_threads';
import { JSONObject } from '@nsshunt/stsutils';
// isNode distinguishes Node.js from browser contexts; in the original source it is assumed to come from a shared utility such as @nsshunt/stsutils.
const isNode = typeof process !== 'undefined' && !!process.versions?.node;
export class WorkerTestCases extends WorkerInstance {
#port: MessagePort | null = null;
constructor() {
super()
}
SetPort = (message: JSONObject) => {
this.#port = message.payload.port as MessagePort;
this.#port.on('message', (data: any) => {
if (isNode) {
this.ProcessMessage(data);
} else {
this.ProcessMessage(data.data); // browser version
}
});
this.ProcessMessage(message);
}
override CreateAsyncRunner = (testRunnerTelemetryPayload: ITestRunnerTelemetryPayload): IRunnerInstance | null => {
const { runner } = testRunnerTelemetryPayload;
switch (runner.options.testType) {
case 'TestCase01' :
return new TestCase01(this, runner)
}
return null;
}
}