sync-about-it
v0.1.0
Event-sourced CQRS/DDD connective tissue between XState v5 and PowerSync.
Overview
sync-about-it gives structure to the pattern of event-sourced domain entities flowing through PowerSync's sync loop. You define your domain once — entity fields and an XState v5 state machine — and the library derives PowerSync table schemas, intent event types, and projection types from that single definition.
Every user action becomes an intent that transitions a local state machine, gets persisted to SQLite, and syncs to the server via PowerSync. The server runs the same (or stricter) machine to validate each intent, producing confirmed projections that sync back to the client. Rejections flow back too, with structured reasons the UI can surface.
This is not a framework or an abstraction over PowerSync. It is a thin library that wires together the event-sourcing discipline — intent persistence, optimistic projection, server validation, reconciliation, and rejection handling — on top of the sync infrastructure PowerSync already provides.
Install
npm install sync-about-it
Peer dependencies
| Package | Required | Notes |
|---------|----------|-------|
| xstate ^5.0.0 | Yes | State machine runtime |
| @powersync/common ^1.0.0 | Yes | PowerSync database types and schema |
| @powersync/react ^1.0.0 | No | Only if using sync-about-it/react |
| react ^18.0.0 \|\| ^19.0.0 | No | Only if using sync-about-it/react |
npm install xstate @powersync/common
# For React bindings:
npm install @powersync/react react
Quick Start
import { setup, assign } from 'xstate';
import { defineDomain, generateSchema, createEventSourcedActor } from 'sync-about-it';

// 1. Define a state machine
const todoMachine = setup({
  types: {
    context: {} as { title: string; completed: number },
    events: {} as
      | { type: 'CREATE'; title: string }
      | { type: 'TOGGLE' },
  },
}).createMachine({
  id: 'todo',
  initial: 'idle',
  context: { title: '', completed: 0 },
  states: {
    idle: { on: { CREATE: { target: 'active', actions: assign({ title: ({ event }) => event.title }) } } },
    active: { on: { TOGGLE: { target: 'done', actions: assign({ completed: () => 1 }) } } },
    done: { on: { TOGGLE: { target: 'active', actions: assign({ completed: () => 0 }) } } },
  },
});

// 2. Define the domain
const todoDomain = defineDomain({
  name: 'todo',
  fields: { title: 'text', completed: 'integer' },
  machine: todoMachine,
});

// 3. Generate PowerSync schema
const schema = generateSchema(todoDomain);

// 4. Create an actor and dispatch events
// (`db` is your already-initialized PowerSync database instance)
const actor = await createEventSourcedActor(todoDomain, db, {
  clientId: 'client-1',
});
actor.start();
const projection = await actor.send({ type: 'CREATE', title: 'Buy milk' });
console.log(projection); // { title: 'Buy milk', completed: 0 }
Concepts
Domain definition — A domain bundles an entity's field structure (name-to-column-type mapping) with its XState v5 state machine. Call defineDomain() once; every other API derives from this single source of truth.
Intents — Every user action is captured as an immutable intent event. Intents are persisted locally before any sync occurs, making the system fully offline-capable. The intent log is the source of truth; projections are derived from it.
Projections — The current state of an entity, computed by replaying intents through the state machine. The client maintains an optimistic projection; the server maintains the authoritative one. Both are derived from the same machine — they're disposable caches that can be rebuilt at any time.
Reconciliation — When the server's confirmed projection syncs back to the client, the reconciliation engine merges it with any remaining pending local intents. If no intents are pending, the server projection is used directly. If local intents remain, they're replayed on top of the server state.
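The reconciliation rule can be sketched without any library code. The following is a minimal illustration, assuming a hand-written transition function that mirrors the Quick Start todo machine. `TodoState`, `transition`, and `reconcile` are hypothetical names, not part of the sync-about-it API, and the real engine works with XState snapshots and database rows rather than plain objects.

```typescript
// Hypothetical shapes for illustration only.
type TodoContext = { title: string; completed: number };
type TodoEvent = { type: 'CREATE'; title: string } | { type: 'TOGGLE' };
type TodoState = { value: 'idle' | 'active' | 'done'; context: TodoContext };

// Plain transition function mirroring the Quick Start machine.
// Events that are not valid in the current state leave it unchanged.
function transition(state: TodoState, event: TodoEvent): TodoState {
  switch (state.value) {
    case 'idle':
      return event.type === 'CREATE'
        ? { value: 'active', context: { ...state.context, title: event.title } }
        : state;
    case 'active':
      return event.type === 'TOGGLE'
        ? { value: 'done', context: { ...state.context, completed: 1 } }
        : state;
    case 'done':
      return event.type === 'TOGGLE'
        ? { value: 'active', context: { ...state.context, completed: 0 } }
        : state;
  }
}

// Reconcile: with no pending intents the server state is returned as-is;
// otherwise the pending intents are replayed on top of it, in order.
function reconcile(server: TodoState, pending: TodoEvent[]): TodoState {
  return pending.reduce(transition, server);
}

const serverState: TodoState = {
  value: 'active',
  context: { title: 'Buy milk', completed: 0 },
};
const optimistic = reconcile(serverState, [{ type: 'TOGGLE' }]);
console.log(optimistic.value, optimistic.context.completed);
```

Because the projection is a pure function of the starting state and the ordered intents, it can be discarded and recomputed at any time, which is what makes it a disposable cache.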
Rejections — When the server machine rejects an intent (invalid transition, failed guard), a rejection record with the reason and the machine state at rejection time syncs to the client. The client removes the rejected intent, reconciles, and surfaces the error.
Asymmetric trust — The client machine is optimistic and permissive for responsiveness. The server machine is authoritative and may enforce stricter guards, authorization checks, or validation. The server can use the same machine or a stricter variant — the asymmetry is by design.
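As a rough sketch of asymmetric trust, the same intent can pass a permissive client check and fail a stricter server check. Everything here is an assumption made for illustration: `Intent`, `Guard`, `validate`, and the rejection field names are hypothetical, and the library expresses these rules as XState guards rather than plain functions. Only the `'processed' | 'rejected'` status values are taken from this README.

```typescript
// Hypothetical intent and guard shapes, for illustration only.
type Intent = { id: string; type: string; payload: Record<string, unknown> };

// A guard returns a rejection reason, or null when the intent is acceptable.
type Guard = (state: string, intent: Intent) => string | null;

type Outcome =
  | { status: 'processed' }
  | {
      status: 'rejected';
      rejection: { intentId: string; reason: string; stateAtRejection: string };
    };

// Run every guard; the first failure produces a structured rejection.
function validate(state: string, intent: Intent, guards: Guard[]): Outcome {
  for (const guard of guards) {
    const reason = guard(state, intent);
    if (reason !== null) {
      return {
        status: 'rejected',
        rejection: { intentId: intent.id, reason, stateAtRejection: state },
      };
    }
  }
  return { status: 'processed' };
}

// Client: only checks that the transition exists.
const clientGuards: Guard[] = [
  (state, intent) =>
    intent.type === 'CREATE' && state !== 'idle' ? 'already created' : null,
];

// Server: same check plus stricter validation of the payload.
const serverGuards: Guard[] = [
  ...clientGuards,
  (_state, intent) =>
    intent.type === 'CREATE' && String(intent.payload.title ?? '').trim() === ''
      ? 'title must not be empty'
      : null,
];

const blankCreate: Intent = { id: 'intent-1', type: 'CREATE', payload: { title: '  ' } };
const clientOutcome = validate('idle', blankCreate, clientGuards);
const serverOutcome = validate('idle', blankCreate, serverGuards);
console.log(clientOutcome.status, serverOutcome.status);
```

The client accepts the intent immediately for responsiveness, while the server's rejection carries the reason and the state at rejection time so the UI can explain what happened.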
API Reference
Domain Definition
function defineDomain<TName, TFields, TMachine>(
  config: DomainDefinition<TName, TFields, TMachine>
): Domain<TName, TFields, TMachine>
Creates a frozen domain object from a name, a fields record, and an XState v5 machine. The returned Domain is the handle passed to all other APIs.
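To make the single-definition idea concrete, here is a rough sketch of how a fields record could map to an entity type and a frozen domain object. `ColumnValue`, `ContextOf`, and `defineDomainSketch` are hypothetical stand-ins; the library's own `DomainContext` and `defineDomain` may be implemented differently. The column mapping (text to string, integer and real to number) follows the Type Inference notes in this README.

```typescript
type ColumnType = 'text' | 'integer' | 'real';
type FieldsRecord = Record<string, ColumnType>;

// Map a column type to its TypeScript value type.
type ColumnValue<C> = C extends 'text'
  ? string
  : C extends 'integer' | 'real'
    ? number
    : never;

// Derive the entity shape from a fields record.
type ContextOf<F extends FieldsRecord> = { [K in keyof F]: ColumnValue<F[K]> };

// Hypothetical stand-in for defineDomain: copies and freezes the definition.
function defineDomainSketch<N extends string, F extends FieldsRecord, M>(
  config: { name: N; fields: F; machine: M },
): Readonly<{ name: N; fields: Readonly<F>; machine: M }> {
  return Object.freeze({ ...config, fields: Object.freeze({ ...config.fields }) });
}

const todo = defineDomainSketch({
  name: 'todo',
  fields: { title: 'text', completed: 'integer' },
  machine: null, // placeholder; a real domain passes an XState v5 machine
});

// The derived type here is { title: string; completed: number }.
const example: ContextOf<typeof todo.fields> = { title: 'Buy milk', completed: 0 };
console.log(Object.isFrozen(todo), example);
```

Freezing the object at definition time means every consumer (schema generation, actors, hooks) sees the same immutable description of the entity.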
type ColumnType = 'text' | 'integer' | 'real'
type FieldsRecord = Record<string, ColumnType>
interface DomainDefinition<TName, TFields, TMachine> {
  name: TName;
  fields: TFields;
  machine: TMachine;
}
interface Domain<TName, TFields, TMachine> {
  readonly name: TName;
  readonly fields: Readonly<TFields>;
  readonly machine: TMachine;
}
Type Inference
type DomainContext<D> // Entity shape derived from fields (text→string, integer→number, real→number)
type DomainEvents<D> // Union of event types from the machine
type DomainState<D> // Machine snapshot type
Event Types
interface IntentEvent<D> // Shape of a pending intent record
interface ProcessedEvent<D> // Archived intent with projection snapshot (extends IntentEvent)
interface RejectionRecord<D> // Rejection record with reason and state at rejection time
Schema Generation
function generateTables(domain: Domain): Record<string, Table>
Generates the six PowerSync table definitions for a domain (intent_events, intent_events_processed, intent_events_rejected, projection, intent_events_local, projection_local).
function generateSchema(...domains: Domain[]): Schema
Combines tables from one or more domains into a single PowerSync Schema. Throws on table name collisions.
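The naming and collision behavior described above can be approximated as follows. This is only a sketch under stated assumptions: `tableNamesFor` and `combineTableNames` are hypothetical helpers that deal in table names alone, the `{name}_{suffix}` pattern is taken from the Per-domain tables section, and the error message is invented. The real `generateTables` returns PowerSync `Table` objects with column definitions.

```typescript
// The six per-domain table suffixes listed in this README.
const TABLE_SUFFIXES = [
  'intent_events',
  'intent_events_processed',
  'intent_events_rejected',
  'projection',
  'intent_events_local',
  'projection_local',
] as const;

// Hypothetical helper: derive the six table names for one domain.
function tableNamesFor(domainName: string): string[] {
  return TABLE_SUFFIXES.map((suffix) => `${domainName}_${suffix}`);
}

// Hypothetical helper: merge names from several domains, failing on duplicates.
function combineTableNames(...domainNames: string[]): string[] {
  const seen = new Set<string>();
  for (const domainName of domainNames) {
    for (const table of tableNamesFor(domainName)) {
      if (seen.has(table)) {
        throw new Error(`Table name collision: ${table}`);
      }
      seen.add(table);
    }
  }
  return Array.from(seen);
}

console.log(tableNamesFor('todo'));
console.log(combineTableNames('todo', 'project').length);
```

Failing fast on a collision is useful because two domains writing to the same table would silently mix their intent logs.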
Client
async function createEventSourcedActor<D extends Domain>(
  domain: D,
  db: AbstractPowerSyncDatabase,
  options: EventSourcedActorOptions,
): Promise<EventSourcedActor<D>>
Factory that creates the primary client-side actor. For new entities, omit entityId (a UUID is generated). For existing entities, pass entityId to restore from the server projection and replay pending intents.
interface EventSourcedActorOptions {
  clientId: string;
  entityId?: string;
}
interface EventSourcedActor<D extends Domain> {
  readonly entityId: string;
  send(event: DomainEvents<D>): Promise<DomainContext<D>>;
  getProjection(): DomainContext<D>;
  start(): void;
  stop(): void;
  subscribe(callback: (projection: DomainContext<D>) => void): () => void;
  onRejection(callback: RejectionCallback<D>): () => void;
  getRejections(): Promise<RejectionRecord<D>[]>;
  clearRejection(rejectionId: string): Promise<void>;
  getAllRejections(): Promise<RejectionRecord<D>[]>;
  reconcile(): Promise<void>;
}
function createUploadHandler(
  domain: Domain,
): (database: AbstractPowerSyncDatabase) => Promise<void>
Creates an uploadData handler for PowerSync's PowerSyncBackendConnector. Reads pending intents from the local table and writes them to the synced table.
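The local-to-synced copy that the upload handler performs might look roughly like this. It is a sketch under several assumptions: `MiniDb` is a hypothetical two-method stand-in for the database, the `id`, `entity_id`, and `payload` columns are invented for illustration (the real column set comes from the generated tables), and `INSERT OR IGNORE` is one possible way to make retries idempotent rather than the library's documented behavior. Only the table names follow this README.

```typescript
// Hypothetical minimal database surface needed by the sketch.
interface MiniDb {
  getAll<T>(sql: string, params?: unknown[]): Promise<T[]>;
  execute(sql: string, params?: unknown[]): Promise<void>;
}

// Hypothetical row shape; the real tables define their own columns.
interface PendingRow {
  id: string;
  entity_id: string;
  payload: string;
}

// Returns a handler that copies pending intents from the local-only table
// into the synced table and reports how many rows it copied.
function createUploadHandlerSketch(domainName: string) {
  const localTable = `${domainName}_intent_events_local`;
  const syncedTable = `${domainName}_intent_events`;

  return async (db: MiniDb): Promise<number> => {
    const rows = await db.getAll<PendingRow>(
      `SELECT id, entity_id, payload FROM ${localTable}`,
    );
    for (const row of rows) {
      // OR IGNORE keeps a retried upload from duplicating an intent.
      await db.execute(
        `INSERT OR IGNORE INTO ${syncedTable} (id, entity_id, payload) VALUES (?, ?, ?)`,
        [row.id, row.entity_id, row.payload],
      );
    }
    return rows.length;
  };
}

const uploadTodos = createUploadHandlerSketch('todo');
```

In a real connector the returned function would be called from `uploadData`; here it only needs an object with `getAll` and `execute`, which also makes it easy to exercise with an in-memory fake.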
class IntentWriter<D extends Domain> {
  constructor(domain: D, db: AbstractPowerSyncDatabase, clientId: string);
  init(): Promise<void>;
  write(entityId: string, event: object, machineSnapshot: unknown): Promise<IntentEvent<D>>;
  getPendingIntents(entityId: string): Promise<IntentEvent<D>[]>;
  getAllPendingIntents(): Promise<IntentEvent<D>[]>;
  clearProcessedIntents(intentIds: string[]): Promise<void>;
}
Client-side intent persistence layer. Writes intent events to the local-only SQLite table. Used internally by createEventSourcedActor but available for advanced use cases.
class ReconciliationEngine<D extends Domain> {
  constructor(domain: D, db: AbstractPowerSyncDatabase, entityId: string, clientId: string, writer: IntentWriter<D>);
  onProjectionChange(callback: (projection: DomainContext<D>, snapshot: SnapshotFrom<AnyStateMachine>) => void): void;
  onRejection(callback: RejectionCallback<D>): () => void;
  start(): void;
  stop(): void;
  reconcile(): Promise<ReconciliationResult<D> | null>;
}
interface ReconciliationResult<D extends Domain> {
  projection: DomainContext<D>;
  clearedIntentIds: string[];
  snapshot: SnapshotFrom<AnyStateMachine>;
}
type RejectionCallback<D> = (rejection: RejectionRecord<D>) => void
Merges server-confirmed projections with pending local intents. Used internally by createEventSourcedActor.
class RejectionManager<D extends Domain> {
  constructor(domain: D, db: AbstractPowerSyncDatabase);
  getRejections(entityId: string): Promise<RejectionRecord<D>[]>;
  clearRejection(rejectionId: string): Promise<void>;
  getAllRejections(): Promise<RejectionRecord<D>[]>;
}
Client-side API for querying and managing rejection records.
Server
function createEventProcessor(
  domain: Domain,
  db: ServerDatabaseAdapter,
  options?: EventProcessorOptions,
): EventProcessor
Factory that creates the server-side authoritative engine. Consumes pending intents, validates them through the XState machine, and manages the projection/archive/rejection lifecycle.
interface EventProcessorOptions {
  serverMachine?: AnyStateMachine; // Stricter machine for asymmetric trust
}
interface EventProcessor {
  processIntent(intentEvent: IntentEvent): Promise<ProcessIntentResult>;
  processPending(): Promise<ProcessingSummary>;
}
interface ProcessIntentResult {
  status: 'processed' | 'rejected';
}
interface ProcessingSummary {
  processed: number;
  rejected: number;
  errors: number;
}
interface ServerDatabaseAdapter {
  execute(sql: string, params?: unknown[]): Promise<void>;
  getAll<T>(sql: string, params?: unknown[]): Promise<T[]>;
  getOptional<T>(sql: string, params?: unknown[]): Promise<T | null>;
  transaction(fn: (tx: ServerDatabaseAdapter) => Promise<void>): Promise<void>;
}
React
Exported from sync-about-it/react. All hooks accept an optional db parameter; if omitted, they read from the nearest SyncAboutItProvider.
function SyncAboutItProvider(props: SyncAboutItProviderProps): ReactNode
interface SyncAboutItProviderProps { db: AbstractPowerSyncDatabase; children: ReactNode }
function useSyncAboutIt(): { db: AbstractPowerSyncDatabase }

function useProjection<D>(domain: D, db: AbstractPowerSyncDatabase | undefined, entityId: string): UseProjectionResult<D>
interface UseProjectionResult<D> { data: DomainContext<D> | null; loading: boolean; error: Error | null }

function useDomainActor<D>(domain: D, db: AbstractPowerSyncDatabase | undefined, options: UseDomainActorOptions): UseDomainActorResult<D>
interface UseDomainActorOptions { entityId?: string; clientId: string }
interface UseDomainActorResult<D> { actor: EventSourcedActor<D> | null; projection: DomainContext<D> | null; send: (event: DomainEvents<D>) => void }

function useRejections<D>(domain: D, db: AbstractPowerSyncDatabase | undefined, entityId?: string): UseRejectionsResult<D>
interface UseRejectionsResult<D> { rejections: RejectionRecord<D>[]; clearRejection: (id: string) => void }
Architecture
CLIENT                                  SERVER
------                                  ------
1. User action
   |
2. Client XState machine processes event
   (optimistic - permissive)
   |
3. Intent written to
   intent_events_local
   (local-only SQLite table)
   |
4. UI renders from local
   projection (machine snapshot)
   |
   === PowerSync sync ==========>       5. Intent arrives in
                                           intent_events (Postgres)
                                           |
                                        6. EventProcessor picks up intent
                                           |
                                        7. Server XState machine processes event
                                           (authoritative - may enforce
                                           stricter guards than client)
                                           |
                                        8a. Valid -> archive to
                                            intent_events_processed,
                                            update projection,
                                            delete from intent_events
                                        8b. Invalid -> write to
                                            intent_events_rejected,
                                            delete from intent_events
                                           |
   <== PowerSync sync ===========       9. Updated projection and/or
                                           rejections sync to client
   |
10. Client reconciles:
    - Replace projection if no
      pending local intents
    - Replay pending intents on
      top of server projection
      if local intents remain
    - Surface rejections to user

The library provides the structure for steps 2, 3, 6, 7, 8, and 10. PowerSync handles transport (steps 4-5 and 9-10). You provide the UI (step 1) and the domain definition.
Per-domain tables
| Table | Location | Synced | Purpose |
|-------|----------|--------|---------|
| {name}_intent_events | Postgres | Yes | Pending intent queue (client -> server) |
| {name}_intent_events_processed | Postgres | No | Immutable event archive with snapshots |
| {name}_intent_events_rejected | Postgres | Yes | Rejection records (server -> client) |
| {name}_projection | Postgres | Yes | Authoritative state (server -> client) |
| {name}_intent_events_local | Client SQLite | No | Local write-ahead log (local-only) |
| {name}_projection_local | Client SQLite | No | Optional local projection cache |
React Bindings
Wrap your app with SyncAboutItProvider to avoid passing db to every hook:
import { SyncAboutItProvider, useDomainActor, useProjection, useRejections } from 'sync-about-it/react';

function App() {
  return (
    <SyncAboutItProvider db={db}>
      <TodoList />
    </SyncAboutItProvider>
  );
}

function TodoItem({ entityId }: { entityId: string }) {
  const { projection, send } = useDomainActor(todoDomain, undefined, {
    clientId: 'client-1',
    entityId,
  });
  const { data, loading } = useProjection(todoDomain, undefined, entityId);
  const { rejections, clearRejection } = useRejections(todoDomain, undefined, entityId);
  if (!projection) return <div>Loading...</div>;
  return (
    <div>
      <span>{projection.title}</span>
      <button onClick={() => send({ type: 'TOGGLE' })}>Toggle</button>
      {rejections.map((r) => (
        <div key={r.id}>
          Rejected: {r.reason}
          <button onClick={() => clearRejection(r.id)}>Dismiss</button>
        </div>
      ))}
    </div>
  );
}
Server Setup
Implement ServerDatabaseAdapter for your database client, then create an EventProcessor:
import { createEventProcessor, defineDomain } from 'sync-about-it';

// Your domain definition (can be shared with the client)
const todoDomain = defineDomain({
  name: 'todo',
  fields: { title: 'text', completed: 'integer' },
  machine: todoMachine,
});

// Optionally use a stricter server machine
const processor = createEventProcessor(todoDomain, dbAdapter, {
  serverMachine: stricterTodoMachine, // optional
});

// Process all pending intents (call from a cron job, queue consumer, or webhook)
const summary = await processor.processPending();
console.log(`Processed: ${summary.processed}, Rejected: ${summary.rejected}, Errors: ${summary.errors}`);
An example ServerDatabaseAdapter implementation, here backed by a node-postgres-style connection pool (pool):
const dbAdapter: ServerDatabaseAdapter = {
  execute: (sql, params) => pool.query(sql, params).then(() => {}),
  getAll: (sql, params) => pool.query(sql, params).then((r) => r.rows),
  getOptional: (sql, params) => pool.query(sql, params).then((r) => r.rows[0] ?? null),
  transaction: async (fn) => {
    // Run all statements on one dedicated connection so BEGIN/COMMIT apply to them
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      await fn({
        execute: (sql, params) => client.query(sql, params).then(() => {}),
        getAll: (sql, params) => client.query(sql, params).then((r) => r.rows),
        getOptional: (sql, params) => client.query(sql, params).then((r) => r.rows[0] ?? null),
        transaction: () => { throw new Error('Nested transactions not supported'); },
      });
      await client.query('COMMIT');
    } catch (e) {
      await client.query('ROLLBACK');
      throw e;
    } finally {
      client.release();
    }
  },
};
License
MIT
