effect-encore
v0.12.7
Declarative actors and durable workflows for `@effect/cluster`.
bun add effect-encore
Peer dependency: effect >= 4.0.0-beta.64.
For v3 @effect/cluster compat: import { Actor } from "effect-encore/v3".
Why
@effect/cluster requires custom Schema.Class, Rpc.make, RpcGroup, Entity.make, handler wiring, and a hand-rolled client service. Workflows add Activity, DurableDeferred, DurableClock, and Workflow.make on top. effect-encore compresses both into a declarative DSL — define entities and workflows as plain objects, get typed actors with execute/send/peek/watch/waitFor and a step DSL for durable orchestration.
Core API
Entity — reactive message handlers
import { Actor } from "effect-encore";
import { Schema } from "effect";
const Order = Actor.fromEntity("Order", {
Place: {
payload: { item: Schema.String, qty: Schema.Number },
success: Schema.String,
persisted: true,
id: (p) => `${p.item}-${p.qty}`,
},
Cancel: {
payload: { reason: Schema.String },
persisted: true,
id: (p) => p.reason,
},
});
Persisted entity operations dedupe by the primaryKey returned from id; completions are reused until .rerun(payload) explicitly clears that execution.
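The dedupe rule can be pictured with a small in-memory stand-in. This is only a sketch of the idea, not effect-encore's implementation: `makeDedupe`, `execute`, `clear`, and `runCount` are hypothetical names, and a `Map` takes the place of persisted storage.

```typescript
// Illustrative only: a Map stands in for persisted completions.
// makeDedupe, execute, clear, and runCount are hypothetical names.
type PlacePayload = { item: string; qty: number };

function makeDedupe<P, R>(id: (payload: P) => string, handler: (payload: P) => R) {
  const completions = new Map<string, R>();
  let runs = 0;
  return {
    // Runs the handler once per primary key; later calls reuse the stored result.
    execute(payload: P): R {
      const key = id(payload);
      if (completions.has(key)) {
        return completions.get(key) as R;
      }
      runs += 1;
      const result = handler(payload);
      completions.set(key, result);
      return result;
    },
    // Forgets the stored completion so the next execute runs the handler again.
    clear(payload: P): void {
      completions.delete(id(payload));
    },
    runCount: (): number => runs,
  };
}

const place = makeDedupe<PlacePayload, string>(
  (p) => `${p.item}-${p.qty}`,
  (p) => `order: ${p.item} x${p.qty}`,
);

const first = place.execute({ item: "widget", qty: 3 });
const second = place.execute({ item: "widget", qty: 3 }); // same key, result reused
const runsBeforeClear = place.runCount();

place.clear({ item: "widget", qty: 3 });
const third = place.execute({ item: "widget", qty: 3 }); // key cleared, handler runs again
const runsAfterClear = place.runCount();
```

Two payloads that produce the same key share one completion, so the choice of id function decides what counts as "the same request".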
Workflow — durable multi-step processes
const ProcessOrder = Actor.fromWorkflow("ProcessOrder", {
payload: { orderId: Schema.String },
success: OrderResult,
error: OrderError,
id: (p) => p.orderId,
signals: {
ManagerApproval: { success: ApprovalDecision },
Cancel: {},
},
captureDefects: true,
suspendOnFailure: false,
});
Identity & Type Guards
// .name — the actor's name
Order.name; // "Order"
ProcessOrder.name; // "ProcessOrder"
// .type — the cluster entity type
Order.type; // "Order"
ProcessOrder.type; // "Workflow/ProcessOrder"
// Type guards
Actor.isEntity(Order); // true — narrows to EntityActor
Actor.isWorkflow(ProcessOrder); // true — narrows to WorkflowActor
// .of — typed identity for handler construction
Order.of({ Place: ..., Cancel: ... }); // infers handler types from defs
Unified Call Site
Both entities and workflows share the same ref.execute / ref.send interface:
// Entity
const ref = yield * Order.ref("ord-1");
const result = yield * ref.execute(Order.Place({ item: "widget", qty: 3 }));
const execId = yield * ref.send(Order.Place({ item: "widget", qty: 3 }));
// Workflow: nullary ref()
const ref = yield * ProcessOrder.ref();
const result = yield * ref.execute(ProcessOrder.Run({ orderId: "ord-1" }));
const execId = yield * ref.send(ProcessOrder.Run({ orderId: "ord-1" }));
Peek, Watch & WaitFor
Track execution status via opaque ExecId:
const execId = yield * ref.send(Order.Place({ item: "widget", qty: 3 }));
// one-shot status check
const status = yield * Order.peek(execId);
// → Pending | Success | Failure | Interrupted | Defect | Suspended
// polling stream
const stream = Order.watch(execId);
// block until terminal (or custom filter)
const final = yield * Order.waitFor(execId);
const custom =
yield *
Order.waitFor(execId, {
filter: (r) => r._tag === "Success",
schedule: Schedule.spaced("1 second"),
});
// compute ExecId without executing
const id = yield * Order.executionId("ord-1", Order.Place({ item: "widget", qty: 3 }));
Handle — Entity
const OrderLive = Actor.toLayer(Order, {
Place: ({ operation }) => Effect.succeed(`order: ${operation.item} x${operation.qty}`),
Cancel: ({ operation }) => cancelOrder(operation.reason),
});
// Use .of for type-safe handlers when yielding services in Effect.gen
const OrderLive = Actor.toLayer(
Order,
Effect.gen(function* () {
const db = yield* Database;
return Order.of({
Place: ({ operation }) => db.placeOrder(operation.item, operation.qty),
Cancel: ({ operation }) => db.cancelOrder(operation.reason),
});
}),
);
Entity State
Long-lived entity handlers can expose live, in-memory state without a
side-channel registry in the host app. Register the state from the entity scope;
clients read or watch it through the actor, keyed by the same entityId used
for operations.
const CounterLive = Actor.toLayer(
Counter,
Effect.gen(function* () {
const state = yield* SubscriptionRef.make(0);
yield* Actor.registerState({
get: SubscriptionRef.get(state),
watch: SubscriptionRef.changes(state),
});
return Counter.of({
Increment: ({ operation }) =>
SubscriptionRef.updateAndGet(state, (n) => n + operation.amount),
});
}),
);
const current = yield * Counter.getState<number>("counter-1");
const changes = Counter.watchState<number>("counter-1");
const activeIds = yield * Counter.listStateEntityIds();
Cold getState and watchState calls materialize the entity before reading the
registered state; apps do not need to define their own no-op operation for that
case. The registration finalizer removes the state handle when the entity scope
closes. Actor.toLayer and Actor.toTestLayer provide the state registry
locally; remote producer-only runtimes cannot observe another process's live
heap state.
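The registration lifecycle described above can be sketched without any cluster machinery. The snippet below is an assumption-laden illustration, not the library's registry: `makeStateRegistry`, `register`, `getState`, and `listEntityIds` are hypothetical names, and the returned function plays the role of the scope finalizer.

```typescript
// Illustrative only: a process-local registry of live state handles.
// All names here are hypothetical and not part of effect-encore.
type StateHandle<S> = { get: () => S };

function makeStateRegistry<S>() {
  const handles = new Map<string, StateHandle<S>>();
  return {
    // Registers a handle and returns a finalizer; calling the finalizer
    // mimics the entity scope closing.
    register(entityId: string, handle: StateHandle<S>): () => void {
      handles.set(entityId, handle);
      return () => {
        handles.delete(entityId);
      };
    },
    // Reads the current value through the handle, if the entity is registered.
    getState(entityId: string): S | undefined {
      const handle = handles.get(entityId);
      return handle === undefined ? undefined : handle.get();
    },
    listEntityIds(): string[] {
      return Array.from(handles.keys());
    },
  };
}

const registry = makeStateRegistry<number>();
let counter = 0;
const release = registry.register("counter-1", { get: () => counter });

counter += 5;
const live = registry.getState("counter-1"); // reads the live in-memory value
const idsBefore = registry.listEntityIds();

release(); // entity scope closed: the handle is removed
const idsAfter = registry.listEntityIds();
const afterRelease = registry.getState("counter-1");
```

Because the registry only holds references into the current process's heap, a second process with its own registry has nothing to read, which is the limitation noted for producer-only runtimes.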
SQL Message Storage
Entity .rerun(payload) needs surgical deletion of one persisted request and
its replies. SQL-backed runtimes can use Encore's SQL layer instead of writing
their own adapter:
import { Layer } from "effect";
import { fromSqlClient } from "effect-encore";
import { SqliteClient } from "@effect/sql-sqlite-bun";
const MessageStorageLive = fromSqlClient().pipe(
Layer.provide(SqliteClient.layer({ filename: "app.db" })),
);
fromSqlClient() provides both upstream MessageStorage.MessageStorage and
Encore's EncoreMessageStorage. It uses Effect Cluster's default
cluster_messages / cluster_replies tables and default sharding config. Use
fromSqlClientWithShardingConfig() when the host provides a custom
ShardingConfig.
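To make "surgical deletion" concrete, here is a minimal in-memory sketch. It assumes a simplified shape in which each stored request carries a request id and a primary key and each reply points back at its request id; the type names, field names, and `deleteExecution` are hypothetical and do not describe the actual table schema.

```typescript
// Illustrative only: arrays stand in for the message and reply tables.
// Field names are assumptions, not the real column names.
type StoredRequest = { requestId: string; primaryKey: string };
type StoredReply = { requestId: string; body: string };

function deleteExecution(
  requests: StoredRequest[],
  replies: StoredReply[],
  primaryKey: string,
): { requests: StoredRequest[]; replies: StoredReply[] } {
  // Collect the request ids that belong to the targeted execution.
  const doomed = new Set(
    requests.filter((r) => r.primaryKey === primaryKey).map((r) => r.requestId),
  );
  // Drop those requests and every reply that refers to them; keep the rest.
  return {
    requests: requests.filter((r) => !doomed.has(r.requestId)),
    replies: replies.filter((r) => !doomed.has(r.requestId)),
  };
}

const storedRequests: StoredRequest[] = [
  { requestId: "r1", primaryKey: "widget-3" },
  { requestId: "r2", primaryKey: "gadget-1" },
];
const storedReplies: StoredReply[] = [
  { requestId: "r1", body: "ack" },
  { requestId: "r1", body: "done" },
  { requestId: "r2", body: "done" },
];

const remaining = deleteExecution(storedRequests, storedReplies, "widget-3");
```

Only the targeted execution disappears; unrelated requests and their replies stay in place, which is what distinguishes this from a full flush.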
Handle — Workflow (Step DSL)
Workflow handlers receive (payload, step) — a context object that wraps upstream workflow primitives.
Always provide success and error schemas. Activities serialize results through JSON — explicit schemas ensure durable round-tripping and typed decode. The shorthand (step.run(id, effect)) uses Schema.Unknown internally, which accepts any JSON-safe value but loses type safety on decode. Use it for prototyping; prefer full options for production workflows.
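The round-tripping concern is easy to reproduce in plain TypeScript. The sketch below does not use effect-encore or Schema: `Charge` and `decodeCharge` are hypothetical, and the hand-written decoder merely stands in for what an explicit success schema would do.

```typescript
// Illustrative only: shows what a JSON round trip does to a non-JSON value.
type Charge = { id: string; chargedAt: Date };

const original: Charge = {
  id: "ch_1",
  chargedAt: new Date("2024-01-01T00:00:00.000Z"),
};

// An untyped round trip: the Date comes back as an ISO string.
const raw: unknown = JSON.parse(JSON.stringify(original));
const rawChargedAt = (raw as Record<string, unknown>)["chargedAt"];

// A hand-written decoder standing in for an explicit success schema.
function decodeCharge(input: unknown): Charge {
  if (typeof input !== "object" || input === null) {
    throw new Error("expected an object");
  }
  const record = input as Record<string, unknown>;
  const id = record["id"];
  const chargedAtText = record["chargedAt"];
  if (typeof id !== "string" || typeof chargedAtText !== "string") {
    throw new Error("unexpected shape");
  }
  const chargedAt = new Date(chargedAtText);
  if (Number.isNaN(chargedAt.getTime())) {
    throw new Error("invalid date");
  }
  return { id, chargedAt };
}

const decoded = decodeCharge(raw);
```

Without the decode step, code after a replay would receive a string where it expects a Date; with it, the replayed value has the same type as the first run.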
const ProcessOrderLive = Actor.toLayer(ProcessOrder, (payload, step) =>
Effect.gen(function* () {
// step.run — full options (recommended)
const order = yield* step.run("create-order", {
do: createOrder(payload),
success: OrderSchema,
});
// step.run — with undo (compensation on workflow failure)
const charge = yield* step.run("charge-card", {
do: chargeCard(order),
success: ChargeResult,
undo: (charge, _cause) => refundCharge(charge.id),
retry: { times: 3 },
});
// step.sleep — durable sleep
yield* step.sleep("cooling-period", "5 minutes");
// signal — await external input (defined on WorkflowDef.signals)
const token = yield* ProcessOrder.ManagerApproval.token;
yield* step.run("send-approval-email", {
do: sendApprovalEmail({ token }),
success: Schema.Void,
});
const decision = yield* ProcessOrder.ManagerApproval.await;
// step.race — first activity to complete wins
const winner = yield* step.race("fast-path", [
{ name: "route-a", execute: routeA(order), success: RouteResult },
{ name: "route-b", execute: routeB(order), success: RouteResult },
]);
// step.run — shorthand (infallible, Schema.Unknown — quick & dirty)
const debug = yield* step.run("log", Effect.succeed("ok"));
return { orderId: order.id, chargeId: charge.id };
}),
);
Signal — external resolution
Signals are declared on WorkflowDef.signals and become typed properties on the actor:
// Defined on the workflow (see above)
// signals: { ManagerApproval: { success: ApprovalDecision } }
// Resolve from outside the workflow
const token = ProcessOrder.ManagerApproval.tokenFromExecutionId(executionId);
yield * ProcessOrder.ManagerApproval.succeed({ token, value: decision });
Sender-Only (Client Layer)
.send() (fire-and-forget dispatch) goes through ActorMailbox + ActorAddressResolver Tags. Consumer hosts that already have full Sharding.Sharding get the wiring for free from Actor.toLayer.
Sender-only / ops-only hosts that must NOT register entity managers use ActorSenderLayer, which bundles the three Tags on the fromConfig variants. It requires only MessageStorage + ShardingConfig, with no Sharding runtime and no notifyLocal deadlock:
import { Layer } from "effect";
import { MessageStorage, ShardingConfig } from "effect/unstable/cluster";
import { ActorSenderLayer } from "effect-encore";
const SenderSupport = ActorSenderLayer.layer.pipe(
Layer.provide(MessageStorage.layerMemory), // or your durable storage
Layer.provide(ShardingConfig.layer()),
);
// Or, for tests / single-process setups — bundle includes in-memory
// storage and default sharding config:
const SenderTest = ActorSenderLayer.layerMemory;
// Sends are durably enqueued; the consumer's storage poll loop picks them up
// on the next entityMessagePollInterval tick.
yield * Order.Place.send({ item: "widget", qty: 3 });
ActorMailboxLayer.fromConfig rejects non-persisted requests with MailboxError (only persisted requests can cross the storage boundary), mirroring upstream's persisted gate. Use ActorMailboxLayer.fromSharding when the host already has Sharding.Sharding and prefers the notifyLocal-accelerated path. The underlying ActorMailboxLayer / ActorAddressResolverLayer Tags remain exposed for advanced wiring (e.g. ops-only hosts that need address resolution but not .send).
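The persisted gate and the poll-based pickup can be sketched as a tiny storage-backed mailbox. This is an illustration under stated assumptions, not the library's mailbox: `makeStorageMailbox`, `enqueue`, `poll`, and the result shape are hypothetical, and an array stands in for durable storage.

```typescript
// Illustrative only: an array stands in for durable message storage.
// All names are hypothetical and not part of effect-encore.
type OutboundRequest = { tag: string; persisted: boolean; payload: unknown };
type EnqueueResult =
  | { ok: true; queued: number }
  | { ok: false; error: string };

function makeStorageMailbox() {
  const queue: OutboundRequest[] = [];
  return {
    // Sender side: only persisted requests may cross the storage boundary.
    enqueue(request: OutboundRequest): EnqueueResult {
      if (!request.persisted) {
        return { ok: false, error: `${request.tag} is not persisted` };
      }
      queue.push(request);
      return { ok: true, queued: queue.length };
    },
    // Consumer side: one poll tick drains whatever has been enqueued so far.
    poll(): OutboundRequest[] {
      return queue.splice(0, queue.length);
    },
  };
}

const mailbox = makeStorageMailbox();
const accepted = mailbox.enqueue({
  tag: "Place",
  persisted: true,
  payload: { item: "widget", qty: 3 },
});
const rejected = mailbox.enqueue({ tag: "Ping", persisted: false, payload: {} });

const firstTick = mailbox.poll(); // picks up the persisted request
const secondTick = mailbox.poll(); // nothing new since the last tick
```

The sender never talks to the consumer directly in this sketch; delivery latency is bounded by how often the consumer polls, which is the trade-off the fromSharding path avoids.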
Test
const OrderTest = Actor.toTestLayer(Order, {
Place: ({ operation }) => Effect.succeed(`order: ${operation.item}`),
Cancel: () => Effect.void,
});
const ProcessOrderTest = Actor.toTestLayer(ProcessOrder, (payload, step) =>
Effect.gen(function* () {
yield* step.run("work", {
do: Effect.succeed("done"),
success: Schema.String,
});
return { orderId: payload.orderId, status: "ok" };
}),
);
const test = it.scopedLive.layer(Layer.provide(OrderTest, TestShardingConfig));
test("places an order", () =>
Effect.gen(function* () {
const ref = yield* Order.ref("ord-1");
const result = yield* ref.execute(Order.Place({ item: "widget", qty: 1 }));
expect(result).toBe("order: widget");
}));
Lifecycle
// Workflow: cancel + resume
yield * ProcessOrder.interrupt("ord-1");
yield * ProcessOrder.resume("ord-1");
// Entity: flush all messages + replies
yield * Order.flush("ord-1");
// Entity: redeliver — clear read leases so unprocessed messages re-enter polling
yield * Order.redeliver("ord-1");
Protocol Transform
Transform the underlying RpcGroup protocol — middleware, annotations, or any protocol-level operation:
import { RpcMiddleware } from "effect/unstable/rpc";
class AuthMiddleware extends RpcMiddleware.Service<AuthMiddleware>()("AuthMiddleware", {
error: Schema.Never,
}) {}
const SecureOrder = Actor.fromEntity("Order", defs).pipe(
Actor.withProtocol((protocol) => protocol.middleware(AuthMiddleware)),
);
PeekResult Schema
Encode/decode PeekResult values for serialization:
import { PeekResultSchema } from "effect-encore";
const schema = PeekResultSchema(Schema.String, OrderError);
const decode = Schema.decodeUnknownSync(schema);
const encode = Schema.encodeSync(schema);
Delayed Delivery
const Scheduled = Actor.fromEntity("Scheduled", {
Process: {
payload: { id: Schema.String, deliverAt: Schema.DateTimeUtc },
id: (p) => p.id,
deliverAt: (p) => p.deliverAt,
persisted: true,
},
});
License
MIT
