@okrlinkhub/agent-factory v1.0.11

An agent factory component for Convex.

# Convex Agent Factory
A Convex component for hydration-based orchestration of OpenClaw agents on a generic worker pool (Fly Machines first, provider abstraction built-in).
## Installation

Create a `convex.config.ts` file in your app's `convex/` folder and install the component by calling `use`:

```ts
// convex/convex.config.ts
import { defineApp } from "convex/server";
import agentFactory from "@okrlinkhub/agent-factory/convex.config.js";

const app = defineApp();
app.use(agentFactory);

export default app;
```

## Upgrade to 1.0.0
Version 1.0.0 introduces a breaking change to the worker lifecycle.
What changed:
- `workers.status` is no longer binary.
- New persisted statuses are now possible: `draining` and `stopping`.
- The lifecycle is now `active -> draining -> stopping -> stopped`.
- `active` now means claimable, not just "row exists and machine once existed".
Current status values:
- `active`: worker is healthy and can claim new jobs.
- `draining`: worker must stop claiming and is waiting for final snapshot / shutdown progression.
- `stopping`: final snapshot is ready, or provider teardown is in progress / pending retry.
- `stopped`: terminal state for that worker instance. Stopped workers are never reactivated.
Important compatibility notes:
- No manual data migration is required if your existing rows only contain `active` or `stopped`.
- Consumer code may require updates if it assumes `worker.status` can only be `active` or `stopped`.
- Any exhaustive `switch`/`if` logic, dashboards, alerts, or admin tools that parse worker status must handle `draining` and `stopping`.
- `workerControlState` is stricter now: workers in non-claimable states, stale-heartbeat workers, and overdue workers return `shouldStop = true`.
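One way to keep consumer-side status handling exhaustive is a `switch` with a `never` guard; a minimal sketch, where `isClaimable` is a hypothetical helper (not a component API) and only the status strings come from the lifecycle above:

```typescript
// Consumer-side sketch: WorkerStatus values come from the 1.0.0 lifecycle,
// but isClaimable itself is a hypothetical helper, not a component API.
type WorkerStatus = "active" | "draining" | "stopping" | "stopped";

function isClaimable(status: WorkerStatus): boolean {
  switch (status) {
    case "active":
      return true; // only active workers may claim new jobs
    case "draining":
    case "stopping":
    case "stopped":
      return false; // non-claimable lifecycle states
    default: {
      // exhaustiveness guard: compilation fails if a status is unhandled
      const unhandled: never = status;
      throw new Error(`Unhandled worker status: ${unhandled}`);
    }
  }
}
```

The `never` assignment makes the TypeScript compiler flag any future status value that your code forgets to handle.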
Recommended upgrade checklist:
- Upgrade the package to `1.0.0`.
- Regenerate Convex bindings in the consumer app.
- Update any consumer-side status handling for `workers.status`.
- Ensure a periodic reconcile fallback cron exists in your Convex app.
- Redeploy the worker runtime so it can react correctly to the stricter control-state semantics.
Reference example for the recommended reconcile fallback:
```ts
import { cronJobs } from "convex/server";
import { api } from "./_generated/api";

const crons = cronJobs();

crons.interval(
  "agent-factory reconcile workers fallback",
  { minutes: 5 },
  api.example.startWorkers,
  {},
);

export default crons;
```

## Usage
### User-facing agent APIs
Starting with this release, the component also exposes an additive set of user-facing aggregate APIs for building pages like MyAgent and MyAgentNew without reconstructing state in the consumer app.
What stays in the consumer app:
- naming policy for agents and Telegram usernames
- product-specific onboarding copy
- cron presets or local `agentSettings`
What is now exposed directly by the component:
- user agent overview and active/history lookup
- onboarding and pairing state
- conversation view and queue items for a user agent
- agent-scoped push jobs and aggregate usage stats
- user-centric snapshot listing and latest snapshot lookup
Core APIs added for this pattern:
- `listUserAgents`
- `getUserAgent`
- `getActiveUserAgent`
- `getUserAgentsOverview`
- `createUserAgentPairing`
- `getUserAgentPairingStatus`
- `importTelegramTokenForAgent`
- `getUserAgentOnboardingState`
- `getConversationViewForUserAgent`
- `listQueueItemsForUserAgent`
- `sendMessageToUserAgent`
- `listPushJobsForAgent`
- `listPushDispatchesForAgent`
- `getUserAgentUsageStats`
- `listSnapshotsForUserAgent`
- `getLatestSnapshotForUserAgent`
Minimal consumer example:
```ts
import { query, mutation } from "./_generated/server";
import { components } from "./_generated/api";
import { v } from "convex/values";

export const getMyAgentOverview = query({
  args: { consumerUserId: v.string() },
  handler: async (ctx, args) => {
    return await ctx.runQuery(components.agentFactory.lib.getUserAgentsOverview, {
      consumerUserId: args.consumerUserId,
    });
  },
});

export const sendMessageToMyAgent = mutation({
  args: {
    consumerUserId: v.string(),
    agentKey: v.string(),
    content: v.string(),
  },
  handler: async (ctx, args) => {
    return await ctx.runMutation(components.agentFactory.lib.sendMessageToUserAgent, {
      consumerUserId: args.consumerUserId,
      agentKey: args.agentKey,
      content: args.content,
    });
  },
});
```

The example consumer in `example/convex/example.ts` re-exports these APIs through `exposeApi(...)` and includes lightweight wrappers you can adapt.
## First required setup: mandatory secrets for every worker/agent
Before running worker autoscaling (enqueue trigger, cron, or manual reconcile), you must store both secrets in the component secret store:
- `convex.url`
- `fly.apiToken`
Every spawned worker/agent needs these values at runtime. Manual "Start Workers" can work when you pass values inline from the UI, but automatic paths (enqueue + cron) rely on these stored secrets.
If one is missing, reconcile fails with errors like:
- `Missing Convex URL. Import an active 'convex.url' secret or pass convexUrl explicitly.`
- `Missing Fly API token. Import an active 'fly.apiToken' secret or pass flyApiToken explicitly.`
Set them once:
```sh
npx convex run example:importSecret '{
  "secretRef": "convex.url",
  "plaintextValue": "https://<your-convex-deployment>.convex.site"
}'

npx convex run example:importSecret '{
  "secretRef": "fly.apiToken",
  "plaintextValue": "fly_XXXXXXXXXXXXXXXX"
}'
```

Important URL mapping:

- Fly worker environment variable `CONVEX_URL` must use the `.convex.cloud` URL.
- Component secret `convex.url` must use the `.convex.site` URL (used by component workflows and webhook-facing integration paths).
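Because the two URLs differ only in their suffix, mixing them up is easy; a small sketch encoding the mapping above (these helpers are hypothetical, the component does not export them):

```typescript
// Hypothetical checks for the documented URL split; the component does not
// export these helpers, they only encode the .cloud vs .site rule above.
function isWorkerConvexUrl(url: string): boolean {
  // Fly worker env CONVEX_URL must use the .convex.cloud deployment URL
  return /^https:\/\/[a-z0-9-]+\.convex\.cloud$/.test(url);
}

function isComponentConvexSiteUrl(url: string): boolean {
  // component secret convex.url must use the .convex.site URL
  return /^https:\/\/[a-z0-9-]+\.convex\.site$/.test(url);
}
```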
Verify status:
```sh
npx convex run example:secretStatus '{
  "secretRefs": [
    "convex.url",
    "fly.apiToken",
    "telegram.botToken",
    "agent-bridge.serviceKey.default"
  ]
}'
```

In the example UI (`example/src/App.tsx`), this is shown as step 0) Mandatory: configure the `convex.url` secret, and make sure `fly.apiToken` is also imported as an active component secret.
```ts
import { components } from "./_generated/api";
import { mutation } from "./_generated/server";
import { v } from "convex/values";

export const enqueueTelegramMessage = mutation({
  args: { text: v.string(), chatId: v.string() },
  handler: async (ctx, args) => {
    return await ctx.runMutation(components.agentFactory.lib.enqueue, {
      conversationId: `telegram:${args.chatId}`,
      agentKey: "default",
      payload: {
        provider: "telegram",
        providerUserId: args.chatId,
        messageText: args.text,
      },
    });
  },
});
```

After enqueue, a queue processor runtime must process the queue by calling:

- `components.agentFactory.lib.claim`
- `components.agentFactory.lib.getHydrationBundle`
- `components.agentFactory.lib.heartbeat`
- `components.agentFactory.lib.complete` or `components.agentFactory.lib.fail`
Worker autoscaling reconcile now follows a hybrid model:
- `enqueue` schedules an immediate async reconcile trigger (`runAfter(0, ...)`)
- a periodic cron fallback is still recommended to recover from missed triggers
- desired worker count is conversation-aware, so multiple queued messages on the same `conversationId` do not over-scale worker spawn
In this project setup, the queue processor runtime is Fly worker-only (not the consumer webhook app). The consumer app receives ingress and enqueues, while Fly workers dequeue and execute jobs. The worker should consume tenant-specific tokens from the hydration payload (resolved by the component), not from global Fly env vars.
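The claim/hydrate/heartbeat/complete contract can be sketched as a pull loop. In this sketch the `QueueClient` interface and its argument/return shapes are invented for illustration; real workers call the `components.agentFactory.lib.*` functions through a Convex client, and the actual signatures may differ:

```typescript
// Illustrative pull loop for a queue processor. The QueueClient interface and
// its argument/return shapes are assumptions, not the component's signatures.
interface QueueClient {
  claim(workerId: string): Promise<{ jobId: string; leaseId: string } | null>;
  getHydrationBundle(jobId: string): Promise<unknown>;
  heartbeat(jobId: string, leaseId: string): Promise<void>;
  complete(jobId: string, leaseId: string): Promise<void>;
  fail(jobId: string, leaseId: string, error: string): Promise<void>;
}

// Claim one job, hydrate, run it, then report the outcome.
// Returns false when the queue had nothing to claim.
async function processOnce(
  client: QueueClient,
  workerId: string,
  run: (bundle: unknown) => Promise<void>,
): Promise<boolean> {
  const claimed = await client.claim(workerId);
  if (!claimed) return false;
  const { jobId, leaseId } = claimed;
  try {
    const bundle = await client.getHydrationBundle(jobId);
    await client.heartbeat(jobId, leaseId); // a real loop renews the lease periodically
    await run(bundle);
    await client.complete(jobId, leaseId);
  } catch (err) {
    await client.fail(jobId, leaseId, String(err));
  }
  return true;
}
```

A real worker would run `processOnce` in a loop with idle backoff and consult `workerControlState` before each claim.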
## Cron fallback every 5 minutes
In your Convex app, add a cron fallback for reconcile:
```ts
import { cronJobs } from "convex/server";
import { api } from "./_generated/api";

const crons = cronJobs();

crons.interval(
  "agent-factory reconcile workers fallback",
  { minutes: 5 },
  api.example.startWorkers,
  {},
);

export default crons;
```

This cron is a safety net. The primary path remains enqueue-triggered reconcile.
## Agent pushing schedule (hourly dispatcher)
For agent pushing, the recommended scheduler is an hourly cron that dispatches due jobs:
```ts
import { cronJobs } from "convex/server";
import { api } from "./_generated/api";

const crons = cronJobs();

crons.cron(
  "agent-factory push dispatch hourly",
  "0 * * * *",
  api.example.dispatchDuePushJobs,
  {},
);

export default crons;
```

Important product constraint:

- job configuration supports only fixed schedule slots (`HH:mm`, plus weekday/day-of-month)
- minute-based recurrence ("every N minutes") is intentionally not supported
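A minimal due-check for such fixed slots might look like the sketch below. The field names and the UTC matching are assumptions for illustration, not the component's actual schedule schema:

```typescript
// Hypothetical due-check for fixed schedule slots. Field names and UTC
// semantics are illustrative, not the component's actual schema.
interface ScheduleSlot {
  time: string;        // fixed "HH:mm" slot -- "every N minutes" is not supported
  weekday?: number;    // 0-6 (0 = Sunday), optional
  dayOfMonth?: number; // 1-31, optional
}

// The hourly dispatcher ("0 * * * *") handles every slot whose hour matches.
function isSlotDue(slot: ScheduleSlot, now: Date): boolean {
  const slotHour = Number(slot.time.split(":")[0]);
  if (now.getUTCHours() !== slotHour) return false;
  if (slot.weekday !== undefined && now.getUTCDay() !== slot.weekday) return false;
  if (slot.dayOfMonth !== undefined && now.getUTCDate() !== slot.dayOfMonth) return false;
  return true;
}
```

Because slots are fixed, the hourly cron can dispatch everything due in the current hour without tracking per-minute state.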
Admin broadcast is also supported through `sendBroadcastToAllActiveAgents`, which enqueues one message per active target and records a dispatch audit.
## LLM configuration (Fly env)
The model/provider is controlled by Fly worker environment variables (for example `OPENCLAW_AGENT_MODEL`, `MOONSHOT_API_KEY`, `OPENAI_API_KEY`) and applied at runtime by the worker image bootstrap.
Why:
- keeps model routing as infrastructure/runtime concern
- avoids per-agent schema coupling to a specific LLM field
- lets you switch model/provider with a Fly deploy or env change only
Practical notes:
- set model/provider env on the Fly app (`fly secrets set` / `[env]` in `fly.toml`)
- keep `agentProfiles` focused on identity, bridge configuration, and secrets references
- worker image tag stays centralized in `src/component/config.ts` (`DEFAULT_WORKER_IMAGE`)
If you use `exposeApi(...)`, the worker contract is available directly on the consumer API surface:

- `workerClaim`
- `workerHydrationBundle`
- `workerHeartbeat`
- `workerComplete`
- `workerFail`
## agent-bridge: config and secrets for OpenClaw workers
agent-factory does not execute agent-bridge tools.
Its role stops at:

- storing bridge settings on the agent profile
- resolving bridge secrets from the component secret store
- exposing `bridgeRuntimeConfig` in hydration
- forwarding bridge-related env vars to spawned OpenClaw workers
Tool execution belongs to the OpenClaw worker runtime / worker image, not to agent-factory.
- Configure an agent profile with bridge settings:

  ```ts
  await ctx.runMutation(components.agentFactory.lib.configureAgent, {
    agentKey: "default",
    version: "1.0.0",
    secretsRef: [],
    bridgeConfig: {
      enabled: true,
      baseUrl: "https://<your-consumer>.convex.site",
      serviceId: "openclaw-prod",
      appKey: "crm",
    },
    enabled: true,
  });
  ```

- Import the bridge service key into component secrets:
```sh
npx convex run example:importSecret '{
  "secretRef": "agent-bridge.serviceKey.default",
  "plaintextValue": "abs_live_XXXXXXXXXXXXXXXX"
}'
```

Naming convention supported by the hydration resolver:

- per-agent service key: `agent-bridge.serviceKey.<agentKey>` (recommended)
- global service key fallback: `agent-bridge.serviceKey`
- optional profile override: `bridgeConfig.serviceKeySecretRef`
- per-agent base URL map JSON (for strict `execute-on-behalf` skills): `agent-bridge.baseUrlMapJson.<agentKey>`
- global base URL map JSON fallback: `agent-bridge.baseUrlMapJson`
- optional per-agent/global overrides for `baseUrl`, `serviceId`, `appKey` via: `agent-bridge.baseUrl.<agentKey>` / `agent-bridge.baseUrl`, `agent-bridge.serviceId.<agentKey>` / `agent-bridge.serviceId`, `agent-bridge.appKey.<agentKey>` / `agent-bridge.appKey`
Example value for `agent-bridge.baseUrlMapJson.<agentKey>`:

```json
{"linkhub-w4":"https://www.okrlink.app","amc":"https://amc-primogroup.convex.site"}
```

This is still stored as a normal component secret ref (same naming convention as other bridge secrets). The secret value is the JSON map expected by strict agent-bridge skills (`APP_BASE_URL_MAP_JSON`).
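A worker could validate this map before use; a sketch under the assumption that the expected shape is app key → https base URL (the actual worker-side parsing may differ):

```typescript
// Hypothetical validator for the baseUrlMapJson secret value; assumes the
// expected shape is { appKey: httpsBaseUrl }, which may differ from the
// worker image's real parsing.
function parseBaseUrlMap(plaintextValue: string): Record<string, string> {
  const parsed: unknown = JSON.parse(plaintextValue);
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new Error("baseUrlMapJson must be a JSON object");
  }
  for (const [appKey, baseUrl] of Object.entries(parsed)) {
    if (typeof baseUrl !== "string" || !baseUrl.startsWith("https://")) {
      throw new Error(`invalid base URL for app key "${appKey}"`);
    }
  }
  return parsed as Record<string, string>;
}
```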
Hydration includes `bridgeRuntimeConfig` for the worker loop.

Do not treat agent-factory as the place where `bridge.<functionKey>` tool calls are executed. If your OpenClaw agents use agent-bridge, that execution flow must live in the worker runtime itself.
Fallback env (worker-side only, used when hydration misses values):

- `OPENCLAW_AGENT_BRIDGE_BASE_URL` or `AGENT_BRIDGE_BASE_URL`
- `OPENCLAW_SERVICE_ID` or `AGENT_BRIDGE_SERVICE_ID`
- `OPENCLAW_SERVICE_KEY` or `AGENT_BRIDGE_SERVICE_KEY`
- `OPENCLAW_AGENT_APP` / `OPENCLAW_APP_KEY` / `AGENT_BRIDGE_APP_KEY`
### Required Fly.io / component secrets for agent-bridge
When agent-factory is used together with agent-bridge, spawned workers may need these environment variables available in their runtime:
| Env var | Component secret ref | Purpose |
|---------|----------------------|---------|
| `OPENCLAW_SERVICE_ID` | `agent-bridge.serviceId` | Service identity for bridge auth |
| `OPENCLAW_SERVICE_KEY` | `agent-bridge.serviceKey` | Service key for bridge auth |
| `OPENCLAW_LINKING_SHARED_SECRET` | `agent-bridge.linkingSharedSecret` | Shared secret for execute-on-behalf user linking |
The scheduler forwards these from the component secret store into each machine's env at spawn time. These values prepare the worker runtime for bridge usage; they do not implement bridge tool execution inside agent-factory.
Import all three into the component secret store:
```sh
npx convex run example:importSecret '{"secretRef": "agent-bridge.serviceId", "plaintextValue": "<your-service-id>"}'
npx convex run example:importSecret '{"secretRef": "agent-bridge.serviceKey", "plaintextValue": "<your-service-key>"}'
npx convex run example:importSecret '{"secretRef": "agent-bridge.linkingSharedSecret", "plaintextValue": "<your-linking-secret>"}'
```

Alternatively, set `OPENCLAW_SERVICE_ID`, `OPENCLAW_SERVICE_KEY`, and `OPENCLAW_LINKING_SHARED_SECRET` directly in the Fly app env/secrets (`fly secrets set` or `fly.toml` `[env]`). Component secrets take precedence when the scheduler spawns machines.
## HTTP Routes
You can mount an ingress webhook route in your app:
```ts
import { httpRouter } from "convex/server";
import { registerRoutes } from "@okrlinkhub/agent-factory";
import { components } from "./_generated/api";

const http = httpRouter();

registerRoutes(http, components.agentFactory, {
  pathPrefix: "/agent-factory",
});

export default http;
```

This exposes:

- `POST /agent-factory/telegram/webhook` -> enqueue-only (no business processing)

Important: the webhook/router only receives ingress and enqueues. Do not point Telegram directly at Fly worker machines. Use webhook -> consumer app (Next.js/Vercel) -> Convex queue -> Fly workers (pull-based processing).
## One-time Telegram pairing and internal user mapping
The component can keep the user-to-agent mapping internally through `identityBindings`. You can bind your consumer user id directly to an `agentKey` without managing a custom table in the consumer app.
### Mandatory prerequisite: configure Telegram webhook first
Before creating pairing codes, configure and verify Telegram webhook against your consumer ingress route.
Use the exposed API:

```ts
await configureTelegramWebhook({
  convexSiteUrl: "https://<your-deployment>.convex.site",
  secretRef: "telegram.botToken.default", // optional, default shown
});
```

This API:

- loads the bot token from component secrets (the active secret for `secretRef`)
- calls Telegram `setWebhook`
- verifies status with `getWebhookInfo`
- returns `isReady` so your UI can gate the pairing flow

If `isReady` is false, do not proceed with pairing.
Typical one-time pairing flow:
- Configure the webhook and verify `isReady === true` via `configureTelegramWebhook`.
- Your app authenticates the user and creates a one-time pairing code via `createPairingCode`.
- The user opens the Telegram deep-link (`/start <pairingCode>`).
- The `registerRoutes(...)` webhook consumes the pairing code and performs `bindUserAgent` automatically with `source: "telegram_pairing"` and Telegram ids from the update.
- Webhook ingress then resolves the binding internally and enqueues with the mapped `agentKey`.
Available pairing APIs (via exposeApi(...)):
- `createPairingCode`
- `getPairingCodeStatus`
- `configureTelegramWebhook`
Telegram token storage (multi-tenant):
- store the tenant token in component secrets with an agent-scoped ref (for example `telegram.botToken.<agentKey>`)
- include that ref in `agentProfiles.secretsRef`
- the worker gets the resolved plaintext from the hydration bundle (`telegramBotToken`) at runtime
- do not use a single global `TELEGRAM_BOT_TOKEN` on the Fly app
`registerRoutes(...)` supports this behavior with:

- `resolveAgentKeyFromBinding` (default `true`)
- `fallbackAgentKey` (default `"default"`)
- `requireBindingForTelegram` (default `false`; when `true`, rejects unbound users)
Special handling for `/start`:

- `/start <pairingCode>` attempts pairing consumption and does not enqueue the command.
- an invalid `/start` payload returns `200` with pairing error details to avoid Telegram retries.
## Architecture
```mermaid
flowchart LR
  telegramWebhook[TelegramWebhook] --> appRouter[Consumer Router NextOrVite]
  appRouter --> enqueueMutation[ConvexEnqueueMutation]
  enqueueMutation --> messageQueue[ConvexMessageQueue]
  messageQueue --> claimLoop[FlyWorkerProcessingLoop]
  claimLoop --> hydrateStep[HydrateFromConvex]
  hydrateStep --> runEngine[OpenClawEngineExecution]
  runEngine --> telegramSend[TelegramDirectReply]
  claimLoop --> heartbeatLease[HeartbeatAndLeaseRenewal]
  heartbeatLease --> cleanupTask[PeriodicLeaseCleanup]
  cleanupTask --> messageQueue
  schedulerNode[ConvexSchedulerAndAutoscaler] --> flyProvider[FlyMachinesProvider]
  flyProvider --> flyWorkers[FlyWorkerMachines]
  flyWorkers --> claimLoop
```

## Data model
Core tables:

- `agentProfiles`
- `conversations`
- `messageQueue`
- `workers`
- `secrets`

Hydration/runtime tables:

- `conversationHydrationCache`
- `dataSnapshots`
## Recent updates
- 1.0.0: worker lifecycle is now explicit and stateful with `active`, `draining`, `stopping`, `stopped`.
- 1.0.0: scheduler reconcile now uses provider-observed machine state and no longer treats every `active` row as reusable capacity.
- 1.0.0: stuck `processing` jobs are recovered more aggressively, including inconsistent rows missing valid lease metadata.
- 1.0.0: idle workers without `scheduledShutdownAt` are backfilled automatically during reconcile/watchdog flows.
- `idleTimeoutMs` aligned to 30 minutes, and `workers.scheduledShutdownAt` now tracks the idle lifecycle from `lastClaimAt`.
- Pre-stop drain protocol added: the worker snapshots `/data` before termination and uploads archive metadata into `dataSnapshots`.
- Restore on boot added: new workers can rehydrate from the latest snapshot archive.
- Hydration improved with `conversationHydrationCache` delta usage.
- `agentSkills` and `skillAssets` removed from schema: skills must be baked into the OpenClaw worker image.
- Worker control/snapshot APIs exposed for the runtime loop (`workerControlState`, snapshot upload/finalize/fail, restore lookup).
## OpenClaw workspace persistence
| OpenClaw source | Persistence layer |
|---|---|
| `AGENTS.md`, `SOUL.md`, `USER.md`, `IDENTITY.md`, `HEARTBEAT.md`, `TOOLS.md` | worker filesystem backup (`/data/workspace`) |
| `memory/YYYY-MM-DD.md`, `MEMORY.md` | worker filesystem backup (`/data/workspace`) |
| Skills and related assets | bundled directly in worker image (`openclaw-okr-image`) |
| Conversation-specific deltas | `conversationHydrationCache` |
## Failure model
- Worker crash during processing does not lose data.
- Each claimed job has a lease (`leaseId`, `leaseExpiresAt`) and heartbeat.
- The cleanup job requeues expired `processing` jobs and unlocks conversations.
- Retry uses exponential backoff with dead-letter fallback.
- Reconcile now also recovers malformed `processing` rows that are missing lease metadata.
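As an illustration of exponential backoff with a dead-letter cutoff, here is a minimal sketch; the actual retry policy lives in `src/component/config.ts`, and the constants below are assumptions, not the component's real values:

```typescript
// Illustrative backoff schedule. The real retry policy is defined in
// src/component/config.ts; base, cap, and attempt limit here are assumptions.
function nextRetryDelayMs(
  attempt: number, // 0-based count of failed attempts so far
  baseMs = 1_000,
  maxMs = 60_000,
  maxAttempts = 5,
): number | null {
  if (attempt >= maxAttempts) return null; // null = send to dead-letter
  return Math.min(baseMs * 2 ** attempt, maxMs); // 1s, 2s, 4s, ... capped
}
```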
## Config-first

`src/component/config.ts` defines type-safe policies:
- queue policy
- retry policy
- lease policy
- scaling policy
- provider config
## Fly.io provider notes
The current provider implementation uses Fly Machines API endpoints for:
- create machine
- list machines
- cordon machine
- terminate machine
### Isolation rule: one Fly app per Convex deployment
Do not share the same Fly app across multiple Convex backends/components that run their own queue polling/reconcile loop.
Why this is required:
- workers in a Fly app share the same control plane (create/list/stop)
- each backend computes desired capacity from its own queue state only
- mixed backends in one app can stop each other's machines or produce unpredictable polling behavior
Recommended pattern:
- one Convex backend -> one dedicated Fly app (for example `agent-factory-workers-prod`)
- another Convex backend -> another dedicated Fly app (for example `agent-factory-workers-staging`)
- keep `providerConfig.appName` and the worker image registry aligned per backend/environment
## Worker image setup (required first step for custom skills)
Any new skill you want inside OpenClaw agents must be added to the worker image source repo:
- https://github.com/okrlinkhub/openclaw-okr-image
Fork this repository to maintain your own image with your custom skills/assets.
For `globalSkills` managed by this component, the recommended runtime pattern is different:

- store the source of truth in the component tables `globalSkills`, `globalSkillVersions`, `globalSkillReleases`
- expose them through `getWorkerGlobalSkillsManifest`
- let the worker image materialize them into `OPENCLAW_SKILLS_DIR` during prestart, before the OpenClaw gateway boots
The manifest now carries an explicit on-disk layout contract for OpenClaw workspace skills:
- `layoutVersion = openclaw-workspace-skill-v1`
- `skillDirName`
- `files[]` with `path`, `content`, `sha256`
Recommended worker bootstrap order:
- restore snapshot into `/data`
- fetch `workerGlobalSkillsManifest`
- verify checksums and materialize skills atomically into `OPENCLAW_SKILLS_DIR`
- start the OpenClaw gateway only after skills are ready
This avoids the historical race where the gateway could start before restored or DB-backed skills were present on disk.
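The checksum step of that bootstrap order can be sketched as follows. The `ManifestFile` shape mirrors the documented `files[]` entries (`path`, `content`, `sha256`); the verify-everything-before-writing-anything split is an assumption about how a worker image might keep materialization atomic:

```typescript
// Sketch of the checksum step; ManifestFile mirrors the documented files[]
// entries, and the verify-all-first strategy is an assumption.
import { createHash } from "node:crypto";

interface ManifestFile {
  path: string;
  content: string;
  sha256: string; // hex digest of content
}

function verifyManifestFiles(files: ManifestFile[]): void {
  for (const file of files) {
    const digest = createHash("sha256").update(file.content, "utf8").digest("hex");
    if (digest !== file.sha256) {
      // refuse to materialize anything if even one file is corrupt, so the
      // skills directory is only ever written from a fully verified manifest
      throw new Error(`checksum mismatch for ${file.path}`);
    }
  }
}
```

A prestart script would call this before writing files to a temporary directory and renaming it into `OPENCLAW_SKILLS_DIR`.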
First required flow:

- Take the image repo (fork/clone your own `openclaw-okr-image`).
- Build and deploy it on your own Fly app. Recommended build mode: remote Fly builder, `depot` disabled, `--remote-only`.
- Use the published image as the reference in `src/component/config.ts` (`DEFAULT_WORKER_IMAGE` is the source of truth).
- Repeat the same process for every runtime/skills update.
Enterprise security model: The worker image enforces a security policy where only skills explicitly included by the image maintainer are installed by default. Any other skills that may be present in the workspace are automatically removed on each worker startup. This ensures that only approved, vetted skills from the image source can execute within your OpenClaw agents.
## Worker image update procedure
When you update the worker runtime (for example in `openclaw-okr-image/worker.mjs`), use this flow to publish and roll out safely.

- Deploy with the remote Fly builder (explicitly disabling Depot):

  ```sh
  cd /path/to/openclaw-okr-image
  fly deploy --remote-only --depot=false --yes
  ```

- If deployment fails with `CONVEX_URL not set`, set the secret and retry:

  ```sh
  fly secrets set CONVEX_URL="https://<your-convex-deployment>.convex.cloud" -a <your-fly-worker-app>
  ```

- Capture the new image tag from the deploy output (for example `registry.fly.io/<your-fly-worker-app>:deployment-XXXXXXXXXXXX`), then update `src/component/config.ts` in this repo:

  ```ts
  export const DEFAULT_WORKER_IMAGE =
    "registry.fly.io/<your-fly-worker-app>:deployment-XXXXXXXXXXXX";
  ```

- Verify rollout:

  ```sh
  fly status -a <your-fly-worker-app>
  fly logs -a <your-fly-worker-app> --no-tail
  ```

- (Recommended) Commit the `DEFAULT_WORKER_IMAGE` update so scheduler-driven spawns use the exact image that was just deployed.
Recommended runtime split:
- Consumer app (Next.js/Vercel): webhook ingress + enqueue only
- Fly worker app: claim/heartbeat/complete/fail loop
Anti-patterns to avoid:

- Telegram webhook -> Fly worker HTTP endpoint. Reason: workers are batch processors, may be scaled to zero, and should not be used as public ingress.
- Global Fly env `TELEGRAM_BOT_TOKEN` for all tenants. Reason: breaks multi-tenant isolation and forces shared bot credentials.
References:
- https://docs.machines.dev/
- https://fly.io/docs/machines/api/machines-resource/
- https://docs.convex.dev/components/authoring
## Development

```sh
npm i
npm run dev
```

Upgrade note for older releases: version 0.2.14 makes `agentProfiles.providerUserId`, `agentProfiles.soulMd`, `agentProfiles.clientMd`, and `agentProfiles.skills` optional only to let you clean them safely. Before upgrading to version 0.2.15, where those fields are expected to be removed from the schema, install 0.2.14, run `components.agentFactory.lib.clearDeprecatedAgentProfileFields` from the Convex Dashboard, and make sure a second run returns `updated = 0`. This avoids schema validation issues caused by leftover stored values during the upgrade to 0.2.15.
