@etohq/connector-engine
v2.0.1
ETO Framework Core Connector Engine - Universal plugin system for external integrations
@etohq/connector-engine
Type-safe connector orchestration for Eto workflows.
Overview
@etohq/connector-engine:
- registers connector plugins with typed operations,
- auto-generates strongly typed execution methods,
- supports grouped execution with routing and fallback,
- composes directly with @etohq/framework/workflows-sdk,
- includes a strict runtime API with middleware and schema validation.
Core Concepts
- ConnectorPlugin: plugin contract (getName, getVersion, operations).
- operations: map of async handlers (input, config) => output.
- ConnectorEngine(connectors, groups?, config?): engine factory.
- configLoader: runtime connector config source.
- routingConfigLoader: optional routing source for grouped execution.
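
To make the relationship between these pieces concrete, here is a minimal standalone sketch of the shapes involved. The names `OperationMap`, `Handlers`, and `PluginLike` are hypothetical stand-ins written for illustration only; they are not the package's exported types, which may differ.

```typescript
// Hypothetical stand-in types; the package's real definitions may differ.
type OperationMap = Record<string, { input: unknown; output: unknown }>

// Each operation becomes an async handler (input, config) => output.
type Handlers<Ops extends OperationMap> = {
  [K in keyof Ops]: (
    input: Ops[K]["input"],
    config: Record<string, unknown>
  ) => Promise<Ops[K]["output"]>
}

// Mirrors the plugin contract described above: name, version, operations.
interface PluginLike<Ops extends OperationMap> {
  getName(): string
  getVersion(): string
  operations: Handlers<Ops>
}

type EchoOps = {
  echo: { input: { text: string }; output: { text: string } }
}

const echoPlugin: PluginLike<EchoOps> = {
  getName: () => "echo",
  getVersion: () => "0.0.1",
  operations: {
    // `input` is inferred as { text: string } from EchoOps.
    echo: async (input) => ({ text: input.text }),
  },
}
```

Deriving the handler signatures from a single operation map is what lets input and output types flow through to callers without repeating them.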
Install
pnpm add @etohq/connector-engine

Plugin Contract
import {
  AbstractConnectorPlugin,
  type OperationType,
} from "@etohq/connector-engine"

type PaymentOps = {
  createCustomer: {
    input: { email: string; entity_id: string }
    output: { customer_id: string; psp_customer_id: string }
  }
  createVirtualAccount: {
    input: { customer_id: string; currency: string }
    output: { account_id: string; account_number: string }
  }
}

class PaystackPlugin extends AbstractConnectorPlugin<PaymentOps> {
  // The name doubles as the domain used for group-level methods.
  getName() {
    return "payment"
  }
  getVersion() {
    return "1.0.0"
  }
  operations = {
    // Stubbed responses; a real plugin would call the provider here.
    createCustomer: async (input, config) => {
      return {
        customer_id: "cust_123",
        psp_customer_id: "CUS_abc123",
      }
    },
    createVirtualAccount: async (input, config) => {
      return {
        account_id: "va_123",
        account_number: "1234567890",
      }
    },
  }
}

Create Engine (Workflow-First API)
import { ConnectorEngine } from "@etohq/connector-engine"

const engine = ConnectorEngine(
  {
    paystack: new PaystackPlugin(),
  },
  {
    payment: {
      connectors: ["paystack"],
      operations: ["createCustomer", "createVirtualAccount"],
      options: {
        behavior: "route_by_domain",
      },
    },
  },
  {
    configLoader: async (connectorId) => {
      // Fetch connector secrets/settings from DB or vault.
      return { provider: connectorId }
    },
    routingConfigLoader: async (groupId) => {
      // Optional dynamic routing profile from DB.
      return {
        profile_id: "default",
        name: "default",
        algorithm: { type: "priority", data: [] },
        created_at: new Date().toISOString(),
        modified_at: new Date().toISOString(),
        version: 1,
      }
    },
  }
)

Execute Operations
Per Connector
Engine auto-generates methods from connector id + operation name:
- executePaystackCreateCustomer
- executePaystackCreateVirtualAccount
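
The method names above suggest a simple convention: `execute`, then the capitalized prefix, then the capitalized operation name. The helper below is a sketch of that inferred convention and is handy when debugging a "missing method" error. `generatedMethodName` is a hypothetical function, not part of the package, and how the engine treats ids containing hyphens or underscores is not covered here.

```typescript
// Assumed convention, inferred from the method names listed above:
// "execute" + capitalized prefix + capitalized operation name.
function capitalize(s: string): string {
  return s.charAt(0).toUpperCase() + s.slice(1)
}

// Hypothetical helper; `prefix` is a connector id such as "paystack".
function generatedMethodName(prefix: string, operation: string): string {
  return `execute${capitalize(prefix)}${capitalize(operation)}`
}

console.log(generatedMethodName("paystack", "createCustomer"))
// prints "executePaystackCreateCustomer"
console.log(generatedMethodName("paystack", "createVirtualAccount"))
// prints "executePaystackCreateVirtualAccount"
```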
const wf = engine.executePaystackCreateCustomer()
const { result } = await wf.run({
  input: { entity_id: "ent_1", email: "user@example.com" },
})

Per Group/Domain
Engine auto-generates methods from domain (plugin.getName()) + operation:
- executePaymentCreateCustomer
- executePaymentCreateVirtualAccount
const wf = engine.executePaymentCreateCustomer(
  { entity_id: "ent_1", email: "user@example.com" },
  { country: "NG" }
)
const { result } = await wf.run({
  input: { entity_id: "ent_1", email: "user@example.com" },
})

Group Behaviors
options.behavior supports:
- route_by_domain: use routing config + fallback connectors.
- execute_all: execute all connectors in the group.
- first_success: try connectors until first success.
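
The semantics of the three behaviors can be sketched in a few lines of standalone code. This is an illustration under stated assumptions, not the engine's implementation: `runGroup`, `firstSuccess`, and the `routed` parameter are hypothetical, and whether the engine runs `execute_all` in parallel or how it reports partial failures is not specified in this README.

```typescript
type Handler<I, O> = (input: I) => Promise<O>
type Behavior = "route_by_domain" | "execute_all" | "first_success"

// Try handlers in order and return the first successful result.
async function firstSuccess<I, O>(handlers: Handler<I, O>[], input: I): Promise<O> {
  let lastError: unknown = new Error("no connectors in group")
  for (const handler of handlers) {
    try {
      return await handler(input)
    } catch (err) {
      lastError = err // remember the failure and try the next connector
    }
  }
  throw lastError
}

// `all` maps connector id to handler; `routed` is the connector order a
// routing config would produce (hypothetical input for this sketch).
async function runGroup<I, O>(
  behavior: Behavior,
  all: Record<string, Handler<I, O>>,
  routed: string[],
  input: I
): Promise<O | O[]> {
  switch (behavior) {
    case "execute_all":
      // Assumption: run every connector concurrently; any failure rejects.
      return Promise.all(Object.values(all).map((h) => h(input)))
    case "first_success":
      return firstSuccess(Object.values(all), input)
    case "route_by_domain": {
      // Routed connectors first, then the remaining ones as fallback.
      const rest = Object.keys(all).filter((id) => !routed.includes(id))
      const ordered: Handler<I, O>[] = []
      for (const id of [...routed, ...rest]) {
        const handler = all[id]
        if (handler) ordered.push(handler)
      }
      return firstSuccess(ordered, input)
    }
  }
}
```

In this sketch `route_by_domain` differs from `first_success` only in the order in which connectors are tried, which is one plausible reading of "routing config + fallback connectors".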
Integration in Eto Workflows
Use generated connector workflows inside regular Eto workflows:
import { createWorkflow, WorkflowResponse } from "@etohq/framework/workflows-sdk"

export const createCustomerWorkflow = createWorkflow(
  "create-customer",
  (input) => {
    const customer = engine.executePaymentCreateCustomer(
      { entity_id: input.entity_id, email: input.email },
      { country: input.country }
    )(input)
    return new WorkflowResponse(customer)
  }
)

Runtime Notes
- The engine itself has no built-in database.
- DB-backed behavior is provided by your configLoader and routingConfigLoader.
- Keep provider secrets outside code; load them at runtime.
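
One way to keep secrets out of code is to build the `configLoader` from environment variables. The sketch below is illustrative: `makeEnvConfigLoader` and the `PREFIX__KEY` variable naming are hypothetical choices, and the only thing taken from this README is the `(connectorId) => config` loader shape.

```typescript
type ConnectorConfig = Record<string, string>
type Env = Record<string, string | undefined>

// Builds a loader with the (connectorId) => config shape used above.
// The PREFIX__KEY variable naming is a hypothetical convention.
function makeEnvConfigLoader(env: Env) {
  const cache = new Map<string, ConnectorConfig>()
  return async (connectorId: string): Promise<ConnectorConfig> => {
    const cached = cache.get(connectorId)
    if (cached) return cached

    // Collect PAYSTACK__SECRET_KEY as { secret_key: ... } for "paystack".
    const prefix = `${connectorId.toUpperCase()}__`
    const config: ConnectorConfig = {}
    for (const [key, value] of Object.entries(env)) {
      if (value !== undefined && key.startsWith(prefix)) {
        config[key.slice(prefix.length).toLowerCase()] = value
      }
    }
    if (Object.keys(config).length === 0) {
      throw new Error(`No configuration found for connector "${connectorId}"`)
    }
    cache.set(connectorId, config)
    return config
  }
}

// In Node.js you would pass process.env; a literal keeps the sketch standalone.
const envConfigLoader = makeEnvConfigLoader({
  PAYSTACK__SECRET_KEY: "sk_test_placeholder",
})
```

Failing loudly when no variables match makes a misconfigured connector surface at load time instead of as an opaque provider error later.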
Runtime API (Single Primitive)
Use createRuntimeConnectorEngine when you want one canonical primitive:
execute({ domain, operation, input, ... }).
import {
  createRuntimeConnectorEngine,
  type ConnectorExecuteRequest,
} from "@etohq/connector-engine"

const runtime = createRuntimeConnectorEngine({
  configLoader: async (connectorId) => ({ provider: connectorId }),
})
runtime.register("paystack", new PaystackPlugin())

const result = await runtime.execute({
  domain: "payment",
  operation: "createCustomer",
  input: { entity_id: "ent_1", email: "user@example.com" },
})

Schema Protocol (Any Schema Library)
execute accepts inputSchema and outputSchema using a schema protocol:
- parse(input) => value, or
- safeParse(input) => { success, data | error }
const result = await runtime.execute({
  domain: "payment",
  operation: "createCustomer",
  input: payload,
  inputSchema: customerInputSchema, // zod/valibot/custom adapter
  outputSchema: customerOutputSchema,
})

When schemas are provided, execute infers input/output types from them automatically.
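
For the "custom adapter" case, a schema can be a plain object that implements the `safeParse` form of the protocol. The following is a minimal hand-written sketch with no schema library. The `SafeParseResult` type name and the use of `Error` for the error field are assumptions; the README only states the `{ success, data | error }` shape.

```typescript
// Assumed result shape, based on "safeParse(input) => { success, data | error }".
type SafeParseResult<T> =
  | { success: true; data: T }
  | { success: false; error: Error }

type CustomerInput = { email: string; entity_id: string }

// Hand-written adapter exposing the safeParse form of the protocol.
const customerInputSchema = {
  safeParse(input: unknown): SafeParseResult<CustomerInput> {
    if (typeof input !== "object" || input === null) {
      return { success: false, error: new Error("expected an object") }
    }
    const { email, entity_id } = input as Record<string, unknown>
    if (typeof email !== "string" || !email.includes("@")) {
      return { success: false, error: new Error("email must be an email address") }
    }
    if (typeof entity_id !== "string" || entity_id.length === 0) {
      return { success: false, error: new Error("entity_id must be a non-empty string") }
    }
    // Return only the validated fields so unknown keys are dropped.
    return { success: true, data: { email, entity_id } }
  },
}
```

Because the return type names `CustomerInput`, an engine that infers types from the schema has a concrete input type to work with.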
You can also pre-register schemas per connector operation:
runtime.registerOperationSchemas("paystack", "createCustomer", {
  inputSchema: customerInputSchema,
  outputSchema: customerOutputSchema,
})

Middleware
runtime.use({
  name: "trace",
  run: async (req, next) => {
    const started = Date.now()
    const result = await next()
    console.log(req.operation, Date.now() - started)
    return result
  },
})

Troubleshooting
- Missing generated method: check connector id/domain name and operation key spelling.
- Operation not found on connector: ensure the plugin exposes it under operations.
- Group routing errors: validate the routingConfigLoader output shape and connector ids.
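
When chasing group routing errors, a small type guard can check what your routingConfigLoader returns before the engine sees it. This is a sketch: the field list is copied from the routingConfigLoader example in this README, whether every field is required is an assumption, and `isRoutingProfile` is a hypothetical helper rather than a package export.

```typescript
// Field list copied from the routingConfigLoader example in this README;
// whether all of them are required is an assumption.
type RoutingProfile = {
  profile_id: string
  name: string
  algorithm: { type: string; data: unknown[] }
  created_at: string
  modified_at: string
  version: number
}

// Hypothetical helper: returns true only if every listed field is present
// with the expected primitive type.
function isRoutingProfile(value: unknown): value is RoutingProfile {
  if (typeof value !== "object" || value === null) return false
  const v = value as Record<string, unknown>
  const algo = v["algorithm"] as Record<string, unknown> | null | undefined
  return (
    typeof v["profile_id"] === "string" &&
    typeof v["name"] === "string" &&
    typeof algo === "object" &&
    algo !== null &&
    typeof algo["type"] === "string" &&
    Array.isArray(algo["data"]) &&
    typeof v["created_at"] === "string" &&
    typeof v["modified_at"] === "string" &&
    typeof v["version"] === "number"
  )
}
```

Calling the guard inside your loader and throwing on failure turns a vague routing error into a message that points at the offending profile.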
