@fatagnus/dink-convex
v2.9.5
Dink Convex sync component - bidirectional sync between Convex and edge devices
@fatagnus/dink-convex
Convex sync component for Dink - provides bidirectional sync between Convex and edge devices using CRDT deltas.
Minimal boilerplate - each user file is just ~5 lines!
Overview
@fatagnus/dink-convex enables bidirectional sync between your Convex database and edge devices. Changes made in Convex automatically sync to edge devices, and changes from edge devices sync back to Convex - all using conflict-free CRDTs.
Just use your normal Convex patterns - mutations work as usual with ctx.db, and the sync happens automatically in the background.
Features
- Automatic CRDT sync - Changes are automatically synced using Yjs CRDTs
- Bidirectional - Sync from edge to Convex and Convex to edge
- Minimal DX - Just 6 files, each ~5 lines of code
- Offline-first - Edge devices can work offline and sync when connected
- Conflict-free - CRDT-based merge ensures no data conflicts
- Replicate-compatible - Works seamlessly with @trestleinc/replicate for unified sync
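To make the "conflict-free" property concrete, here is a minimal sketch of a last-writer-wins map merge. It is a stand-in for illustration only: the package syncs with Yjs, whose merge algorithm is different and considerably richer. The names LwwEntry, LwwMap, pickWinner and mergeLww are hypothetical and are not part of @fatagnus/dink-convex.

```typescript
// Illustrative only: a last-writer-wins (LWW) map, NOT the Yjs algorithm the package uses.
type LwwEntry = { value: string; timestamp: number; replica: string };
type LwwMap = Map<string, LwwEntry>;

// Higher timestamp wins; the replica id breaks ties deterministically.
function pickWinner(a: LwwEntry, b: LwwEntry): LwwEntry {
  if (a.timestamp !== b.timestamp) return a.timestamp > b.timestamp ? a : b;
  return a.replica >= b.replica ? a : b;
}

// Merge two replicas key by key. The result does not depend on argument order.
function mergeLww(left: LwwMap, right: LwwMap): LwwMap {
  const merged: LwwMap = new Map(left);
  right.forEach((entry, key) => {
    const existing = merged.get(key);
    merged.set(key, existing ? pickWinner(existing, entry) : entry);
  });
  return merged;
}

const edge: LwwMap = new Map([
  ["title", { value: "Buy milk", timestamp: 2, replica: "edge-1" }],
]);
const server: LwwMap = new Map([
  ["title", { value: "Buy bread", timestamp: 1, replica: "convex" }],
  ["completed", { value: "true", timestamp: 3, replica: "convex" }],
]);

// Both merge orders converge on the same state.
const edgeThenServer = mergeLww(edge, server);
const serverThenEdge = mergeLww(server, edge);
```

Because the merge is order-independent, an edge device that was offline can apply its changes late and still end up with the same state as Convex.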
Installation
npm install @fatagnus/dink-convex convex-helpers yjs
Note: convex-helpers is a peer dependency required for the sync mutation wrapper.
Quick Start
Get started with minimal boilerplate - each file is just ~5 lines!
1. Schema (convex/schema.ts)
import { defineSchema } from "convex/server";
import { v } from "convex/values";
import { syncedTable, internalSyncSchema } from "@fatagnus/dink-convex";
export default defineSchema({
  tasks: syncedTable({
    title: v.string(),
    completed: v.boolean(),
  }),
  ...internalSyncSchema,
});
Note: syncedTable() automatically injects id: v.string() and timestamp: v.number() fields, and creates the by_doc_id and by_timestamp indexes. This pattern is compatible with @trestleinc/replicate.
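As a rough illustration of what the injected fields mean for your documents, the sketch below models a synced document as the user's fields plus id and timestamp. SyncFields, Synced and withSyncFields are hypothetical names used only for this example; the package does not export them.

```typescript
// Hypothetical types describing the document shape implied by the note above.
type SyncFields = { id: string; timestamp: number };
type Synced<T> = T & SyncFields;

// Hypothetical helper: attach the two sync fields to user-defined fields.
function withSyncFields<T extends object>(
  fields: T,
  id: string,
  timestamp: number,
): Synced<T> {
  return { ...fields, id, timestamp };
}

// Fixed values are used here so the result is predictable.
const taskDoc = withSyncFields(
  { title: "Write docs", completed: false },
  "doc-123",
  1700000000000,
);
```

In a real mutation you would pass crypto.randomUUID() and Date.now() instead of fixed values.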
2. Component Config (convex/convex.config.ts)
import { defineApp } from "convex/server";
import dink from "@fatagnus/dink-convex/component";
const app = defineApp();
app.use(dink);
export default app;
3. Sync Mutations (convex/sync.ts)
import { createSyncMutations } from "@fatagnus/dink-convex";
import { mutation, query } from "./_generated/server";
export const { applyDeltaFromEdge, getDocumentState, listDocuments, listDocumentsPaginated } =
  createSyncMutations({ mutation, query });
4. Outbox Processor (convex/outboxProcessor.ts)
import { createOutboxProcessor } from "@fatagnus/dink-convex";
import { internalAction, internalMutation, internalQuery } from "./_generated/server";
import { internal } from "./_generated/api";
export const { processOutboxBatch, queryPendingItems, updateItemStatus, scheduleOutboxProcessing } =
  createOutboxProcessor({ internalAction, internalMutation, internalQuery, internal });
5. HTTP Router (convex/http.ts)
import { createSyncHttpRouter } from "@fatagnus/dink-convex";
import { httpAction } from "./_generated/server";
import { api } from "./_generated/api";
export default createSyncHttpRouter({ httpAction, api });
6. Crons (convex/crons.ts)
import { createSyncCrons } from "@fatagnus/dink-convex";
import { internal } from "./_generated/api";
export default createSyncCrons({ internal });
7. Sync Mutation Wrapper (convex/syncSetup.ts)
For automatic delta generation when using syncMutation:
import { createSyncMutation } from "@fatagnus/dink-convex";
import { mutation } from "./_generated/server";
import { internal } from "./_generated/api";
export const syncMutation = createSyncMutation(mutation, ["tasks"], {
  getInternal: () => internal,
});
Writing Mutations
Use syncMutation instead of mutation for tables you want to sync. Mutations work normally - just use ctx.db as usual:
// convex/tasks.ts
import { syncMutation } from "./syncSetup";
import { query } from "./_generated/server";
import { v } from "convex/values";
// Create a task - works just like normal mutation
export const create = syncMutation({
  args: { title: v.string() },
  handler: async (ctx, args) => {
    // Normal ctx.db.insert - sync happens automatically!
    // You must provide id (unique doc identifier) and timestamp (for sync ordering)
    return await ctx.db.insert("tasks", {
      id: crypto.randomUUID(),
      title: args.title,
      completed: false,
      timestamp: Date.now(),
    });
  },
});
// Toggle completion
export const toggle = syncMutation({
  args: { id: v.id("tasks") },
  handler: async (ctx, args) => {
    const task = await ctx.db.get(args.id);
    if (!task) throw new Error("Task not found");
    await ctx.db.patch(args.id, { completed: !task.completed });
  },
});
// Delete a task
export const remove = syncMutation({
  args: { id: v.id("tasks") },
  handler: async (ctx, args) => {
    await ctx.db.delete(args.id);
  },
});
Writing Queries
Queries work normally - just use ctx.db.query as you normally would:
// convex/tasks.ts
import { query } from "./_generated/server";
import { v } from "convex/values";
// List all tasks
export const list = query({
  handler: async (ctx) => {
    return await ctx.db.query("tasks").collect();
  },
});
// Get a single task
export const get = query({
  args: { id: v.id("tasks") },
  handler: async (ctx, args) => {
    return await ctx.db.get(args.id);
  },
});
Client-Side Usage
Use standard Convex React hooks - synced tables work like any other table:
// React component
import { useQuery, useMutation } from "convex/react";
import { api } from "../convex/_generated/api";
function TaskList() {
  // useQuery works normally - data syncs automatically
  const tasks = useQuery(api.tasks.list);
  const createTask = useMutation(api.tasks.create);
  const toggleTask = useMutation(api.tasks.toggle);
  return (
    <div>
      {tasks?.map((task) => (
        <div key={task._id} onClick={() => toggleTask({ id: task._id })}>
          {task.completed ? "✓" : "○"} {task.title}
        </div>
      ))}
    </div>
  );
}
Environment Variables
Set these in your Convex dashboard:
- DINK_HTTP_URL - URL to your dinkd gateway (e.g., http://localhost:8080)
- DINK_APP_SYNC_KEY - App sync key for authentication
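If you want to fail fast when these settings are missing, a small validation helper can be useful. The sketch below is an assumption about how you might do this in your own code: readDinkConfig and DinkConfig are hypothetical names, and the wording of the DINK_HTTP_URL error is modelled on the DINK_APP_SYNC_KEY message shown under Troubleshooting rather than taken from the package.

```typescript
type DinkConfig = { httpUrl: string; appSyncKey: string };

// Hypothetical helper: validate the two settings from a plain record
// (for example, process.env inside a Convex action).
function readDinkConfig(env: Record<string, string | undefined>): DinkConfig {
  const httpUrl = env["DINK_HTTP_URL"];
  const appSyncKey = env["DINK_APP_SYNC_KEY"];
  if (!httpUrl) {
    throw new Error("DINK_HTTP_URL environment variable is not configured");
  }
  if (!appSyncKey) {
    throw new Error("DINK_APP_SYNC_KEY environment variable is not configured");
  }
  // Strip trailing slashes so request paths can be appended safely.
  return { httpUrl: httpUrl.replace(/\/+$/, ""), appSyncKey };
}

const config = readDinkConfig({
  DINK_HTTP_URL: "http://localhost:8080/",
  DINK_APP_SYNC_KEY: "example-key",
});

// A missing key produces a descriptive error instead of a failed request later on.
let missingKeyError = "";
try {
  readDinkConfig({ DINK_HTTP_URL: "http://localhost:8080" });
} catch (err) {
  missingKeyError = err instanceof Error ? err.message : String(err);
}
```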
Architecture
┌─────────────────────────────────────────────────────────────────┐
│                     BIDIRECTIONAL SYNC FLOW                     │
└─────────────────────────────────────────────────────────────────┘
Edge Client              dinkd Gateway                 Convex
───────────              ─────────────                 ──────
     │                         │                          │
     │  NATS Publish           │                          │
     │  (CRDT Delta)           │                          │
     ├────────────────────────►│                          │
     │                         │ POST /api/dink/applyDelta│
     │                         ├─────────────────────────►│
     │                         │                          │ Materialize
     │                         │                          │ to user table
     │                         │                          │
     │                         │  POST /api/sync/         │
     │                         │  convex-push             │
     │                         │◄─────────────────────────┤
     │  NATS Broadcast         │                          │ Outbox
     │  (CRDT Delta)           │                          │ Processor
     │◄────────────────────────┤                          │
     │                         │                          │
API Reference
Schema Helpers
- syncedTable(fields) - Define a synced table with automatic id and timestamp fields plus by_doc_id and by_timestamp indexes (Replicate-compatible)
- internalSyncSchema - Internal tables required for sync (spread into your schema)
Factory Functions
- createSyncMutations({ mutation, query }) - Creates sync mutations and queries
- createOutboxProcessor({ internalAction, internalMutation, internalQuery, internal }) - Creates outbox processor functions
- createSyncHttpRouter({ httpAction, api }) - Creates HTTP router with sync endpoint
- createSyncCrons({ internal }) - Creates cron job for outbox processing
- createSyncMutation(mutation, tables, options) - Creates sync-enabled mutation wrapper
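The outbox processor follows the general outbox pattern: changes are queued, a periodic job picks up pending items in batches, pushes them to the gateway, and records the outcome. The in-memory sketch below shows that pattern in isolation. It is not the package's implementation: the OutboxItem shape, the status values and the send callback are all assumptions made for illustration.

```typescript
// Assumed shapes for illustration; the package's internal tables may differ.
type OutboxStatus = "pending" | "sent" | "failed";
type OutboxItem = { seq: number; docId: string; status: OutboxStatus };

// Select up to `limit` pending items, oldest first.
function selectPending(items: OutboxItem[], limit: number): OutboxItem[] {
  return items
    .filter((item) => item.status === "pending")
    .sort((a, b) => a.seq - b.seq)
    .slice(0, limit);
}

// Process one batch: try to send each item and record the outcome on the item.
// Returns the number of items that were sent successfully.
function processBatch(
  items: OutboxItem[],
  limit: number,
  send: (item: OutboxItem) => boolean,
): number {
  let sent = 0;
  for (const item of selectPending(items, limit)) {
    if (send(item)) {
      item.status = "sent";
      sent += 1;
    } else {
      item.status = "failed";
    }
  }
  return sent;
}

const outbox: OutboxItem[] = [
  { seq: 2, docId: "a", status: "pending" },
  { seq: 1, docId: "b", status: "pending" },
  { seq: 3, docId: "c", status: "pending" },
];

// Simulate a gateway that rejects document "b"; the batch size is 2.
const sentCount = processBatch(outbox, 2, (item) => item.docId !== "b");
```

Item "c" stays pending because it falls outside the batch, so a later run of the cron job would pick it up.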
Edge RPC Functions
- callEdge(ctx, options) - Call an RPC method on a specific edge
- scatterCall(ctx, options) - Broadcast an RPC call to multiple edges
- discoverEdges(ctx, options) - Discover available edges by service or labels
- kickEdge(ctx, edgeIdOrOptions) - Forcibly disconnect an edge from the server
kickEdge
Forcibly disconnects an edge from the dinkd server. This is useful for:
- Key revocation workflow - Kick the edge before revoking its API key to ensure immediate disconnection
- Maintenance operations - Force an edge to reconnect (e.g., after config changes)
- Security incidents - Immediately disconnect a compromised edge
- Decommissioning - Clean up edges that are being retired
import { kickEdge } from "@fatagnus/dink-convex";
import { action } from "./_generated/server";
import { v } from "convex/values";
// Simple usage - just pass the edge ID
export const disconnectEdge = action({
  args: { edgeId: v.string() },
  handler: async (ctx, args) => {
    const result = await kickEdge(ctx, args.edgeId);
    if (result.success) {
      console.log(`Edge ${result.edgeId} has been disconnected`);
    }
    return result;
  },
});
// With reason - useful for audit logging
export const decommissionEdge = action({
  args: { edgeId: v.string(), reason: v.string() },
  handler: async (ctx, args) => {
    const result = await kickEdge(ctx, {
      edgeId: args.edgeId,
      reason: args.reason,
    });
    return result;
  },
});
Return value:
{
  success: boolean;  // Always true if no error thrown
  edgeId: string;    // The kicked edge ID
  message?: string;  // Present if edge was not found (may already be disconnected)
}
What happens when you kick an edge:
- dinkd sends a disconnect command to the edge via NATS
- The edge client receives the command and calls any registered OnKick callback
- The edge gracefully disconnects from the server
- dinkd deregisters the edge and all its services from the presence store
Note: If the edge is not found (404), kickEdge returns success with a message indicating the edge may already be disconnected. This is not an error - it's safe to kick edges that are already offline.
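The "not found is still success" rule can be sketched as a small mapping from a gateway status code to the return value described above. This is only an illustration of the documented behaviour: interpretKickResponse is a hypothetical helper, and the message text and the handling of other status codes are assumptions.

```typescript
// Shape taken from the "Return value" description above.
type KickResult = { success: boolean; edgeId: string; message?: string };

// Hypothetical helper: turn an HTTP status from the gateway into a KickResult.
function interpretKickResponse(status: number, edgeId: string): KickResult {
  if (status >= 200 && status < 300) {
    return { success: true, edgeId };
  }
  if (status === 404) {
    // Not found is treated as success: the edge may already be offline.
    return {
      success: true,
      edgeId,
      message: "Edge not found; it may already be disconnected",
    };
  }
  // Any other status is assumed to surface as a thrown error.
  throw new Error(`Kick failed for edge ${edgeId} with HTTP status ${status}`);
}

const kicked = interpretKickResponse(200, "edge-1");
const alreadyGone = interpretKickResponse(404, "edge-1");
```

Callers can therefore treat a resolved result as "the edge is no longer connected" and only need a try/catch for genuine failures.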
Key Management Functions
- createEdgeKey(ctx, options) - Create a new edge API key
- listEdgeKeys(ctx) - List all edge keys
- revokeEdgeKey(ctx, keyIdOrOptions, reason?) - Revoke an edge key (auto-kicks the edge first)
revokeEdgeKey
Revokes an edge API key and automatically kicks the edge to ensure immediate disconnection:
import { revokeEdgeKey } from "@fatagnus/dink-convex";
import { action } from "./_generated/server";
import { v } from "convex/values";
// Simple usage - kicks the edge, then revokes the key
export const decommissionAgent = action({
  args: { keyId: v.string() },
  handler: async (ctx, args) => {
    await revokeEdgeKey(ctx, args.keyId, "Agent decommissioned");
  },
});
// With options - skip kicking (only revoke the key)
export const revokeKeyOnly = action({
  args: { keyId: v.string() },
  handler: async (ctx, args) => {
    await revokeEdgeKey(ctx, {
      keyId: args.keyId,
      reason: "Maintenance",
      skipKick: true, // Don't kick, just revoke
    });
  },
});
Default behavior (recommended):
- Looks up the key to find the associated edge ID
- Kicks the edge (sends disconnect command via NATS)
- Revokes the key in the key store
With skipKick: true:
- Only revokes the key without kicking
- Edge stays connected until it tries to reconnect or make a new authenticated request
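The two call styles and the resulting order of operations can be summarised in a short sketch. The names below (NormalizedRevoke, RevokeStep, normalizeRevokeArgs, planRevocation) are hypothetical and exist only to illustrate the behaviour described above; they are not part of the package.

```typescript
type RevokeOptions = { keyId: string; reason?: string; skipKick?: boolean };
type NormalizedRevoke = { keyId: string; reason: string | undefined; skipKick: boolean };
type RevokeStep = "lookupKey" | "kickEdge" | "revokeKey";

// Accept either a bare key ID plus optional reason, or an options object.
function normalizeRevokeArgs(
  keyIdOrOptions: string | RevokeOptions,
  reason?: string,
): NormalizedRevoke {
  if (typeof keyIdOrOptions === "string") {
    return { keyId: keyIdOrOptions, reason, skipKick: false };
  }
  return {
    keyId: keyIdOrOptions.keyId,
    reason: keyIdOrOptions.reason,
    skipKick: keyIdOrOptions.skipKick === true,
  };
}

// List the steps in the order the text above describes.
function planRevocation(options: NormalizedRevoke): RevokeStep[] {
  return options.skipKick
    ? ["revokeKey"]
    : ["lookupKey", "kickEdge", "revokeKey"];
}

const defaultPlan = planRevocation(
  normalizeRevokeArgs("key-1", "Agent decommissioned"),
);
const revokeOnlyPlan = planRevocation(
  normalizeRevokeArgs({ keyId: "key-1", reason: "Maintenance", skipKick: true }),
);
```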
Introspection Functions
- describeEdgeServices(ctx, serviceName) - Describe an edge service by name, returning its ServiceDescriptor
- getLLMContext(ctx, serviceNames?) - Get LLM-friendly context (llm.txt format) for edge services
describeEdgeServices
Returns structured metadata about an edge service:
import { describeEdgeServices } from "@fatagnus/dink-convex";
import { action } from "./_generated/server";
import { v } from "convex/values";
export const getServiceInfo = action({
  args: { serviceName: v.string() },
  handler: async (ctx, args) => {
    const descriptor = await describeEdgeServices(ctx, args.serviceName);
    return {
      name: descriptor.name,
      version: descriptor.version,
      description: descriptor.description,
      methods: descriptor.methods?.map((m) => ({
        name: m.name,
        description: m.description,
      })),
    };
  },
});
getLLMContext
Returns human/AI-readable documentation for services in llm.txt format:
import { getLLMContext } from "@fatagnus/dink-convex";
import { action } from "./_generated/server";
import { v } from "convex/values";
// Get context for all services
export const getAllServiceDocs = action({
  handler: async (ctx) => {
    return await getLLMContext(ctx);
  },
});
// Get context for specific services
export const getServiceDocs = action({
  args: { services: v.array(v.string()) },
  handler: async (ctx, args) => {
    return await getLLMContext(ctx, args.services);
  },
});
Both functions require the DINK_URL and DINK_APP_SYNC_KEY environment variables to be configured.
Troubleshooting
"DINK_APP_SYNC_KEY environment variable is not configured"
Set the environment variable in your Convex dashboard under Settings > Environment Variables.
Sync not working
- Check that the dinkd gateway is running and accessible
- Verify DINK_HTTP_URL points to the correct URL
- Check Convex logs for errors in the outbox processor
Type errors with _generated imports
Make sure you've run npx convex dev to generate the types after updating your schema.
syncedTable not creating index
Ensure you're using syncedTable() from @fatagnus/dink-convex, not defineTable() from convex/server.
Replicate Integration
@fatagnus/dink-convex can work alongside @trestleinc/replicate, allowing both sync systems to share the same Convex tables. When Replicate is registered, deltas are automatically forwarded to Replicate clients.
Registering Replicate Component
// convex/convex.config.ts
import { defineApp } from "convex/server";
import dink from "@fatagnus/dink-convex/component";
import replicate from "@trestleinc/replicate/convex.config";
const app = defineApp();
app.use(dink);
app.use(replicate);
export default app;
// In your sync setup file (e.g., convex/sync.ts)
import { registerReplicateComponent } from "@fatagnus/dink-convex";
import { components } from "./_generated/api";
// Register Replicate component for delta forwarding
// Call this once before any sync operations
registerReplicateComponent(components.replicate);
Replicate Bridge API
- registerReplicateComponent(component) - Register a Replicate component for automatic delta forwarding
- hasReplicateComponent() - Returns true if a Replicate component is registered
- getReplicateComponent() - Returns the registered Replicate component (or null)
- forwardToReplicate(ctx, deltaData) - Manually forward a delta to Replicate (usually automatic)
Once registered, all deltas from triggers and applyDeltaFromEdge are automatically forwarded to Replicate. If Replicate is not registered, these functions are no-ops.
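The register-once, no-op-when-absent behaviour can be pictured with a small in-memory registry. This is a conceptual sketch, not the package's code: the Forwarder type, the function names, and the use of Uint8Array for a delta are assumptions chosen for illustration.

```typescript
// Conceptual sketch of a register-once bridge; not the package's implementation.
// A delta is assumed to be a binary update, represented here as a Uint8Array.
type Forwarder = (delta: Uint8Array) => void;

let registered: Forwarder | null = null;

function registerForwarder(forwarder: Forwarder): void {
  registered = forwarder;
}

function hasForwarder(): boolean {
  return registered !== null;
}

// Forward a delta if a target is registered; otherwise do nothing and report false.
function forwardDelta(delta: Uint8Array): boolean {
  if (registered === null) return false;
  registered(delta);
  return true;
}

const received: number[] = [];

// Nothing is registered yet, so this call is a no-op.
const forwardedBefore = forwardDelta(new Uint8Array([1, 2, 3]));

registerForwarder((delta) => {
  received.push(delta.length);
});

// After registration the delta reaches the forwarder.
const forwardedAfter = forwardDelta(new Uint8Array([4, 5]));
```

The practical consequence is that code paths which forward deltas do not need to check whether Replicate is installed first.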
BREAKING CHANGES (v2.0.0)
Schema Changes
The syncedTable() helper now uses a Replicate-compatible schema pattern:
Before (v1.x):
// v1.x: syncedTable added syncId field and by_syncId index
tasks: syncedTable({ title: v.string() })
// v1.x mutations used syncId:
ctx.db.insert("tasks", { title: "...", syncId: crypto.randomUUID() });
After (v2.0.0):
// v2.0.0: syncedTable adds id and timestamp fields plus by_doc_id and by_timestamp indexes
tasks: syncedTable({ title: v.string() })
// In mutations:
ctx.db.insert("tasks", { id: crypto.randomUUID(), title: "...", timestamp: Date.now() });
Migration Steps
- Update all syncedTable usages - no code changes needed (automatic field injection)
- Update mutations to use id instead of the syncId field
- Update mutations to include timestamp: Date.now() on insert/update
- Update any queries using the by_syncId index to use by_doc_id
- If you have existing data, run a migration to rename syncId → id and add timestamp
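For the data migration in the last step, the per-document transformation might look like the sketch below. V1Doc, V2Doc and migrateDoc are hypothetical names; how you iterate over existing rows and write them back depends on your own migration tooling and is not shown.

```typescript
// Assumed document shapes before and after the v2.0.0 schema change.
type V1Doc = { syncId: string; [field: string]: unknown };
type V2Doc = { id: string; timestamp: number; [field: string]: unknown };

// Rename syncId to id and add a timestamp, leaving all other fields untouched.
function migrateDoc(doc: V1Doc, now: number): V2Doc {
  const { syncId, ...rest } = doc;
  return { ...rest, id: syncId, timestamp: now };
}

// A fixed timestamp keeps the example predictable; use Date.now() in practice.
const migrated = migrateDoc(
  { syncId: "abc-123", title: "Old task" },
  1700000000000,
);
```

Reusing the old syncId value as the new id keeps document identity stable for edge devices that already know the document.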
Removed APIs
- SyncedTableConfig.syncIdField - No longer configurable; always uses id
License
Apache-2.0
