npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@actyx/machine-runner

v0.5.7

Published

asymmetric replicated state machines: runtime support

Downloads

979

Readme

Machine Runner

This library offers a TypeScript DSL for writing state machines and executing them in a fully decentralised fashion using the Actyx peer-to-peer event stream database. For an overview of the project this library is part of please refer to the GitHub repository.

The detailed documentation of this library is provided in its JsDoc comments.

Example usage

More detailed tutorial can be found here

We demonstrate the usage of our decentralized state machines on an example from manufacturing automation, i.e. the factory shop floor: a warehouse requests the fleet of logistics robots to pick something up and bring it somewhere else. Our task is to write the logic for the warehouse and for each of the robots so that the job will eventually be done. Since there are many robots we use an auction to settle who will do it.

Declaring the machines

First we define our set of events:

// sent by the warehouse to get things started
const requested = Event.design('requested')
  .withPayload<{ id: string; from: string; to: string }>()
// sent by each available candidate robot to register interest
const bid = Event.design('bid')
  .withPayload<{ robot: string; delay: number }>()
// sent by the robots
const selected = Event.design('selected')
  .withPayload<{ winner: string }>()

// declare a precisely typed tuple of all events we can now choose from
const transportOrderEvents = [requested, bid, selected] as const

Then we can declare a swarm protocol using these events:

const transportOrder = SwarmProtocol.make('transportOrder', transportOrderEvents)

Now we build two machines that participate in this protocol: the warehouse will request the material transport, while the fleet of robot will figure out who does it. The warehouse is much simpler in this initial part of the workflow since it has no further role after making the request — in a real implementation the protocol would go on to include the actual delivery.

// initialize the state machine builder for the `warehouse` role
const TransportOrderForWarehouse =
  transportOrder.makeMachine('warehouse')

// add initial state with command to request the transport
export const InitialWarehouse = TransportOrderForWarehouse
  .designState('Initial')
  .withPayload<{ id: string }>()
  .command('request', [requested], (ctx, from: string, to: string) =>
                                   [{ id: ctx.self.id, from, to }])
  .finish()

export const DoneWarehouse = TransportOrderForWarehouse.designEmpty('Done').finish()

// describe the transition into the `Done` state after request has been made
InitialWarehouse.react([requested], DoneWarehouse, (_ctx, _r) => [{}])

The robot state machine is constructed in the same way, albeit with more commands and state transitions:

const TransportOrderForRobot = transportOrder.makeMachine('robot')

type Score = { robot: string; delay: number }
type AuctionPayload =
  { id: string; from: string; to: string; robot: string; scores: Score[] }

export const Initial = TransportOrderForRobot.designState('Initial')
  .withPayload<{ robot: string }>()
  .finish()
export const Auction = TransportOrderForRobot.designState('Auction')
  .withPayload<AuctionPayload>()
  .command('bid', [bid], (ctx, delay: number) =>
                         [{ robot: ctx.self.robot, delay }])
  .command('select', [selected], (_ctx, winner: string) => [{ winner }])
  .finish()
export const DoIt = TransportOrderForRobot.designState('DoIt')
  .withPayload<{ robot: string; winner: string }>()
  .finish()

// ingest the request from the `warehouse`
Initial.react([requested], Auction, (ctx, r) => ({
  ...ctx.self,
  ...r.payload,
  scores: [],
}))

// accumulate bids from all `robot`
Auction.react([bid], Auction, (ctx, b) => {
  ctx.self.scores.push(b.payload)
  return ctx.self
})

// end the auction when a selection has happened
Auction.react([selected], DoIt, (ctx, s) =>
  ({ robot: ctx.self.robot, winner: s.payload.winner }))

Checking the machines

The part of the transport order workflow implemented in the previous section is visualized above as a UML state diagram. With the @actyx/machine-check library we can check that this workflow makes sense (i.e. it achieves eventual consensus, which is the same kind of consensus used by the bitcoin network to settle transactions), and we can also check that our state machines written down in code implement this workflow correctly.

To this end, we first need to declare the graph in JSON notation:

const proto: SwarmProtocolType = {
  initial: 'initial',
  transitions: [
    { source: 'initial', target: 'auction',
      label: { cmd: 'request', logType: ['requested'], role: 'warehouse' } },
    { source: 'auction', target: 'auction',
      label: { cmd: 'bid', logType: ['bid'], role: 'robot' } },
    { source: 'auction', target: 'doIt',
      label: { cmd: 'select', logType: ['selected'], role: 'robot' } },
  ]
}

The naming of states does not need to be the same as in our code, but the event type names and the commands need to match. With this preparation, we can perform the behavioral type checking as follows:

import { SwarmProtocolType, checkProjection, checkSwarmProtocol } from '@actyx/machine-check'

const robotJSON =
  TransportOrderForRobot.createJSONForAnalysis(Initial)
const warehouseJSON =
  TransportOrderForWarehouse.createJSONForAnalysis(InitialWarehouse)
const subscriptions = {
  robot: robotJSON.subscriptions,
  warehouse: warehouseJSON.subscriptions,
}

// these should all print `{ type: 'OK' }`, otherwise there’s a mistake in
// the code (you would normally verify this using your favorite unit
// testing framework)
console.log(
  checkSwarmProtocol(proto, subscriptions),
  checkProjection(proto, subscriptions, 'robot', robotJSON),
  checkProjection(proto, subscriptions, 'warehouse', warehouseJSON),
)

Running the machines

@actyx/machine-runner relies upon Actyx for storing/retrieving events and sending them to other nodes in the swarm. In other words, Actyx is the middleware that allows the warehouse and robot programs on different computers to talk to each other, in a fully decentralized peer-to-peer fashion and without further coordination — for maximum resilience and availability. Therefore, before we can run our machines we need to use the Actyx SDK to connect to the local Actyx service:

const actyx = await Actyx.of(
  { appId: 'com.example.acm', displayName: 'example', version: '0.0.1' })
const tags = transportOrder.tagWithEntityId('4711')
const robot1 = createMachineRunner(actyx, tags, Initial, { robot: 'agv1' })
const warehouse = createMachineRunner(actyx, tags, InitialWarehouse,
                                      { id: '4711' })

The tags can be thought of as the name of a dedicated pub–sub channel for this particular workflow instance. We demonstrate how to create both a robot and the warehouse, even though you probably won’t do that on the same computer in the real world.

Getting the process started means interacting with the state machines:

for await (const state of warehouse) {
  if (state.is(InitialWarehouse)) {
    await state.cast().commands()?.request('from', 'to')
  } else {
    // this role is done
    break
  }
}

The warehouse machine implements the async iterator JavaScript protocol, which makes it conveniently consumable using a for await (...) loop. Exiting this loop, e.g. using break as shown, will destroy the warehouse running machine, including cancelling the underlying Actyx event subscription for live updates.

Using the robot role we demonstrate a few more features of the machine runner:

let IamWinner = false

for await (const state of robot1) {
  if (state.is(Auction)) {
    const open = state.cast()
    if (!open.payload.scores.find((s) => s.robot === open.payload.robot)) {
      await open.commands()?.bid(1)
      setTimeout(() => {
        const open = robot1.get()?.as(Auction)
        open && open.commands()?.select(bestRobot(open.payload.scores))
      }, 5000)
    }
  } else if (state.is(DoIt)) {
    const assigned = state.cast()
    IamWinner = assigned.payload.winner === assigned.payload.robot
    if (!IamWinner) break
    // now we have the order and can start the mission
  }
}

The first one is that the accumulated state is inspected in the state.is(Auction) case to see whether this particular robot has already provided its bid for the auction. If not, it will do so by invoking a command, which will subsequently lead to the emission of a bid event and consequently to a new state being emitted from the machine, so a new round through the for await loop — this time we’ll find our bid in the list, though.

The second part is that upon registering our bid, we also set a timer to expire after 5sec. When that happens we synchronously check the current state of the workflow (since it will have changed, and if some other robot got to this part first, the auction may already be over). If the workflow still is in the Auction state, we compute the best robot bid (the logic in bestRobot is where your expertise would go) and run the select() command to emit the corresponding event and end the auction.

The third feature becomes relevant once the auction has ended: we check if our robot is indeed the winner and record that in a variable IamWinner, i.e. in the current application in-memory state. Then we can use this information in all following states as well.

Change detection on for-await loop

When using a for-await loop with the machine runner, the loop iterates only if all of the following criteria are met:

  • A 'caughtUp' event is emitted; It happens when the machine runner receives the latest event published in Actyx;
  • An event between the current caughtUp and the previous one triggers a change to the machine's state; The state change is determined by comparing the name and payload between the state before and after the caughtUp event. The comparison uses the deepEqual function provided by the fast-equal package.

The consequences of Eventual Consensus

The design goal of Actyx and the machine runner is to provide uncompromising resilience and availability, meaning that if a device is capable of computation it shall be able to make progress, independent of the network. This implies that two devices that are not currently connected (which also includes the brief time lag introduced by ping times between them!) can make contradicting choices in a workflow.

In the example above, we deliberately didn’t use a manager or referee role to select the winner in the auction, since that decision maker would be a single-point-of-failure in the whole process. Instead, each robot independently ensures that after at five seconds a decision will be made — even if two robots concurrently come to different conclusions and both emit a selected event.

Machine runner resolves this conflict by using only the selected event that comes first in the Actyx event sort order; in other words, Actyx arbitrarily picks a winner and the losing event is discarded. If a robot saw itself winning, started the mission, and then discovers that its win turned out to be invalid, it will have to stop the mission and pick a new one.

Errors

The following section describes various unavoidable errors that can arise due to the language design of JavaScript and TypeScript combined with the library's inherent distributed and asynchronous nature.

Errors Emittable On Command Calls

A command call returns a promise. The promise's resolution marks the success of the events' publication to Actyx.

const whenParticularState = state.as(ParticularState)
if (whenParticularState) {
  // awaits the promise, which consequently may break the control flow if there is a thrown exception / promise rejection
  await whenParticularState.commands()?.someCommand()
}

However, certain scenarios can result in async errors (i.e. a rejected promise) which will be explained momentarily.

To avoid errors, the general best practice is to not stash commands in an external variable and defer the call. The commands() will return undefined when possible errors are detected. The passing of time might have invalidated the detection result, which is the stashed value.

// Good
await whenParticularState.commands()?.someCommand()

// Bad
const commands = whenParticularState.commands()
await someLongRunningTask() // this may have invalidated the error detection when finished executing
await commands?.someCommand()

Avoid accidentally not issuing commands by using state objects produced by next, peek, and for-await loop, as opposed to the get method. These methods ensure the newly retrieved state objects are not expired. It is recommended to only use get method when it is necessary to retrieve state objects immediately (without waiting for the machine to finish processing the next event batch) at the cost of possibly getting expired, locked, or non-caught-up state objects, such as when observing a machine runner's state passively.

// Guaranteed working, assuming there's no other
const whenOn = (await r.machine.peek()).value?.as(On)
if (whenOn) {
  await whenOn.commands()?.toggle() // returns promise
}

// There's a chance that commands is not available
const whenOn = r.machine.get().value?.as(On)
if (whenOn) {
  await whenOn.commands()?.toggle() // may be undefined
}

The list of errors that may arise is as follows.

  • MachineRunnerErrorCommandFiredAfterExpired:

    This error results from a command call to an expired state or a machine-runner with a non-empty event queue.

    An "Expired" state object is a state object that does not match its machine-runner's current state object. An expired state object can be obtained by means of storing a state object's reference in a variable and using it at a later time when the machine-runner has transitioned to another state.

    The non-empty event queue condition happens when a machine-runner is waiting for the completion of a multi-events transition. In a multi-events transition, different parts of this ordered events chain can arrive at different points in time. The queue is not empty in the period between the first and the last arrival.

  • MachineRunnerErrorCommandFiredAfterLocked:

    This error results from a command call to a locked state object/machine-runner. It is generally avoidable by not having a command being called twice in a concurrent fashion in the same state.

    "Locked" is a transitionary state object/machine-runner between the time a command is called and it receives a rejection. When a command results in a failed publication, the source state objects are unlocked, thus subsequent commands will be available. When a command results in a successful publication, the machine-runner is unlocked, but the issuing state object is kept locked to prevent subsequent command calls. The machine-runner will immediately produce a new state object with commands enabled, accessible via next, peek, and for-await loop.

  • MachineRunnerErrorCommandFiredAfterDestroyed:

    This error results from a command call to a state object of a destroyed machine.

    "Destroyed" is the status of a machine-runner that has been destroyed, either by explicitly calling its .destroy() method or by breaking out of the for-await that is applied to the runner.

  • MachineRunnerErrorCommandFiredWhenNotCaughtUp:

    This error results from a command call to a state object whose machine is not caught up.

    "Caught up" is a state where a machine-runner has processed all published events in a subscription stream. Actyx sends events in batches. During the processing of a batch, the machine-runner is not caught up.

Features

Observe Changes and Errors

An alternative use case of a machine runner is to listen to its events.

The next event emits states whenever a new state is calculated. When not using the machine, calling destroy is required to close the connection to Actyx.

const warehouse = createMachineRunner(actyx, tags, InitialWarehouse, { id: '4711' })

warehouse.events.on('next', (state) => {
  if (state.is(InitialWarehouse)) {
    // ...
  }
})

await untilWareHouseIsNotUsedAnymore()

warehouse.destroy()

error event can be used to capture errors from machine-runner.

import {
  MachineRunnerErrorCommandFiredAfterLocked,
  MachineRunnerErrorCommandFiredAfterDestroyed,
  MachineRunnerErrorCommandFiredAfterExpired,
} from "@actyx/machine-runner"

warehouse.events.on('error', (error) => {
  if (error instanceof MachineRunnerErrorCommandFiredAfterLocked) {
    //
  }
  
  if (error instanceof MachineRunnerErrorCommandFiredAfterDestroyed) {
    //
  }

  if (error instanceof MachineRunnerErrorCommandFiredAfterExpired) {
    //
  }
})

Event List

next

A next event is emitted when a state transition happens and the machine runner has processed all events matching the supplied tag.

The payload is StateOpaque, similar to the value produced in the for-await loop.

error

An error event is emitted when an error happened inside the runner. Currently this is the list of the errors:

  • A command is called when locked i.e. another command is being issued in the same machine
  • A command is called when the corresponding state is expired i.e. another command has been successfully issued from that state
  • A command is called on a destroyed machine

The payload has an error subtype.

change

A change event is emitted when a next event is emitted, a command is issued, a command’s event has been published, or a subscription error happened due to losing a connection to Actyx. This event is particularly useful in UI code where not only state changes are tracked, but also command availability and errors.

The payload is of type StateOpaque, like the value produced in the for-await loop.

debug.bootTime

A debug.bootTime event is emitted when a machine runner has caught up with its Actyx subscription (i.e. finished processing its events until the latest one) for the first time.

The payload includes information on the duration of the booting, the number of events processed, and the identity containing the swarm name, machine name, and tags.

// Logs every time a machine booting takes more than 100 milliseconds or processed more than 100 events
machine.events.on(
  'debug.bootTime',
  ({ durationMs, eventCount, identity: { machineName, swarmProtocolName, tags } }) => {
    if (durationMs > 100 || eventCount > 100) {
      console.warn(
        `Boot of "${swarmProtocolName}-${machineName}" tagged "${tags.toString()}" takes longer than usual (${durationMs} milliseconds of to process ${eventCount} events)`,
      )
    }
  },
)

Zod on MachineEvent

Zod can be used to define and validate MachineEvents. On designing an event, use withZod instead of withPayload.

export const requested = MachineEvent.design('requested').withPayload<{
  id: string
  from: string
  to: string
}>()

The above code can be converted into:

import * as z from 'zod'

export const requested = MachineEvent.design('requested').withZod(
  z.object({
    id: z.string(),
    from: z.string(),
    to: z.string(),
  }),
)

A zod-enabled event factory will have these additional features enabled:

  • When receiving events from Actyx, a MachineRunner will compare the event payload to the embedded ZodType, in addition to the mandatory event type checking. Events that don't match the defined MachineEvent on the reaction will be ignored by the MachineRunner. For example, see the reaction definition below:

    InitialWarehouse.react([requested], DoneWarehouse, (_ctx, _r) => [{}])

    In a scenario where an incorrectly created event comes from Actyx { "type": "requested", id: "some_id" }, the said event will not be regarded as valid and will be ignored.

  • When creating an event via the factory, which would be requested.make for the example above, an extra step will be taken to validate the payload. When the make method is called with an incorrect value, an exception will be thrown because internally ZodType.parse is used to validate the payload. For example:

    // Will throw because `{}` is not a valid value for the previously provided zod schema
    // But it takes `as any` to bypass TypeScript compiler in order to do this
    const singleEvent = requested.make({} as any)

    An extra care must be taken when the ZodType is refined. In contrast to a mismatch in schema, a refined ZodType is not caught at compile-time. Therefore, a compilation process and IDE warnings is not sufficient to catch these errors. For example:

    export const requested = MachineEvent.design('requested').withZod(
      z
        .object({
          id: z.string(),
          from: z.string(),
          to: z.string(),
        })
        .refine((payload) => {
          return payload.from == payload.to
        }),
    )
    
    // Will throw exception because `from` is same with `to`.
    // This mistake can't be caught by TypeScript compiler
    requested.make({
      id: 'some_id',
      from: 'some_location',
      to: 'some_location',
    })

Global Event Emitter

Some global event emitters are provided. These event emitters will emit events from all machine runners in the same process.

import { globals as machineRunnerGlobals } from "@actyx/machine-runner";

globals.emitter.addListener("debug.bootTime", ({ identity, durationMs, eventCount }) => {
  if (durationMs > 100) {
    console.warn(`${identity} boot time takes more than 100ms (${durationMs}ms) processing ${eventCount} events`);
  }
});

globals.emitter.addListener("error", console.error);

Extra Tags

In the case extra tags are required to be attached in events when invoking commands, extra tags can be registered on a command definition. These extra tags will always be attached when the command is invoked.

// State definition
export const InitialWarehouse = TransportOrderForWarehouse.designState('Initial')
  .withPayload<{ id: string }>()
  .command('request', [requested], (ctx, from: string, to: string) => {
    return [
      ctx.withTags(
        [`transport-order-from:${from}`, `transport-order-to:${to}`],
        { id: ctx.self.id, from, to }
      )
    ]
  })
  .finish()

// Command call
// The resulting events will include the extra tags
// `transport-order-from:${from}`,
// `transport-order-to:${to}`
const stateAsInitialWarehouse = state
  .as(InitialWarehouse)?
  .commands()?
  .request(from: `source`, to: `destination`);

refineStateType

A MachineRunner instance now has a new method available: refineStateType which return a new aliasing machine. State payload produced by the returned machine is typed as the union of all possible payload types instead of unknown. The union is useful to be used in combination with type-narrowing.

Usage example:

// States defined in previous examples
const allStates = [Initial, Auction, DoIt] as const
const machine = createMachineRunner(actyx, tags, Initial, { robot: 'agv1' }).refineStateType(
  allStates,
)

const state = machine.get()
if (state) {
  const payload = state.payload

  // Equals to:
  //  | { robot: string }
  //  | { id: string; from: string; to: string; robot: string; scores: Score[] }
  //  | { robot: string; winner: string }
  type PayloadType = typeof state.payload

  // 'robot' property is accessible directly because it is available in all variants
  const robot = payload.robot

  // Used with type-narrowing
  if ('winner' in payload) {
    // here the type of payload is narrowed to { robot: string; winner: string }
  } else if ('id' in payload) {
    // here the type of payload is narrowed to { id: string; from: string; to: string; robot: string; scores: Score[] }
  } else {
    // here the type of payload is narrowed to { robot: string }
  }
}

The argument to .refineStateType must be an array containing all previously defined states. Any other argument will throw an error.

The aliasing machine shares the original machine's internal state. All method calls, such as .destroy, create the same effect as when enacted on the original machine.

Developer support

If you have any questions, suggestions, or just want to chat with other interested folks, you’re welcome to join our discord chat. Please find a current invitation link on the top right of the Actyx docs page.