@conectte/automations
v0.2.1
Official Node SDK for writing Conectte automations. Handles execution lifecycle so your code only deals with context and outputs.
The SDK hides the execution lifecycle (start/complete/fail) so your code only deals with the two domain concepts you care about: inputs you read and outputs you write.
Install
npm install @conectte/automations
Requires Node 18+.
Usage
import { runAutomation } from '@conectte/automations'
runAutomation(async ({ inputs, outputs, log }) => {
  // Download input: the SDK fetches the presigned URL for you
  const csv = await inputs.get('source').download()
  // Your business logic (processCSV is a placeholder for your own function)
  const result = processCSV(csv)
  // Upload output: the SDK does the PUT and registers the file automatically
  await outputs.get('result').upload(result, {
    filename: 'report.csv',
    mimeType: 'text/csv',
  })
  log.info('Done')
})
runAutomation does the following automatically:
- Reads runtime config from env vars.
- POSTs /start to signal execution begin.
- Fetches the execution context (presigned input/output URLs).
- Invokes your handler with typed inputs, outputs, log, and executionId.
- On success → POSTs /complete. On error → POSTs /fail with code and message.
- Traps SIGTERM/SIGINT and POSTs /fail with errorCode = 'TERMINATED'.
- Retries backend calls automatically on transient errors (3 attempts, exponential backoff).
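The ordering of the lifecycle calls and the retry behaviour listed above can be pictured with a small sketch. This is not the SDK's implementation: `withRetry`, `runLifecycle`, the `Post` type, the `'HANDLER_ERROR'` code, and the 100 ms base delay are all hypothetical, and the sketch retries on any error rather than only transient ones. Signal handling and context fetching are left out.

```typescript
// Hypothetical stand-in for an authenticated POST to the backend.
type Post = (path: string, body?: unknown) => Promise<void>

// Retry a call up to `attempts` times, doubling the wait after each failure.
async function withRetry<T>(fn: () => Promise<T>, attempts = 3, baseDelayMs = 100): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn()
    } catch (err) {
      lastError = err
      if (attempt < attempts - 1) {
        // Exponential backoff: baseDelayMs, then 2x, then 4x, ...
        await new Promise<void>((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt))
      }
    }
  }
  throw lastError
}

// Signal start, run the handler, then report success or failure.
async function runLifecycle(post: Post, handler: () => Promise<void>): Promise<void> {
  await withRetry(() => post('/start'))
  try {
    await handler()
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err)
    // 'HANDLER_ERROR' is an illustrative code, not one documented by the SDK.
    await withRetry(() => post('/fail', { errorCode: 'HANDLER_ERROR', message }))
    return
  }
  await withRetry(() => post('/complete'))
}
```

Keeping the `/complete` call outside the `try` block means a failure to report success is not itself reported as a handler failure.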
Input & output handles
Reading inputs
const buffer = await inputs.get('source').download() // → Buffer
const text = await inputs.get('notes').downloadText() // → string
const json = await inputs.get('config').downloadJSON() // → unknown (cast as needed)
const stream = await inputs.get('video').downloadStream() // → ReadableStream
Reading tabular files (CSV, TSV, TXT, XLSX, Parquet)
streamTabular and streamTabularBatch stream tabular data row-by-row with constant memory usage, regardless of file size. Every row is Record<string, string> — all values are normalized to strings.
Supported formats: .csv, .tsv, .txt, .xlsx, .parquet
Format detection: automatic from file extension, MIME type, or magic bytes. Override with format.
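As a rough illustration of detection by extension with a magic-byte fallback, the sketch below checks the file name first and then the leading bytes. The names `detectFormat`, `hasPrefix`, and `TabularFormat` are hypothetical, the MIME-type step is omitted, and treating any ZIP container as XLSX is a simplifying assumption; the SDK's actual rules and precedence may differ.

```typescript
// Hypothetical label set based on the supported extensions listed above.
type TabularFormat = 'csv' | 'tsv' | 'txt' | 'xlsx' | 'parquet'

const KNOWN_EXTENSIONS: readonly string[] = ['csv', 'tsv', 'txt', 'xlsx', 'parquet']

// True when `bytes` starts with every value in `prefix`.
function hasPrefix(bytes: Uint8Array, prefix: readonly number[]): boolean {
  return prefix.every((value, index) => bytes[index] === value)
}

function detectFormat(filename: string, head: Uint8Array): TabularFormat | undefined {
  // 1. File extension, case-insensitive.
  const dot = filename.lastIndexOf('.')
  const ext = dot >= 0 ? filename.slice(dot + 1).toLowerCase() : ''
  if (KNOWN_EXTENSIONS.includes(ext)) return ext as TabularFormat

  // 2. Magic bytes: Parquet files begin with the ASCII bytes "PAR1".
  if (hasPrefix(head, [0x50, 0x41, 0x52, 0x31])) return 'parquet'
  // XLSX is a ZIP container ("PK\x03\x04"); assuming every ZIP is XLSX is a simplification.
  if (hasPrefix(head, [0x50, 0x4b, 0x03, 0x04])) return 'xlsx'

  return undefined // caller would need to force a format
}
```

Plain-text formats such as CSV have no magic bytes, which is why an extension-less text file would need the format forced explicitly in a scheme like this one.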
streamTabular: async-iterable
const stream = await inputs.get('sales').streamTabular()
const headers = await stream.headers // ['id', 'amount', 'region']
for await (const row of stream) {
  await db.insert(row) // row: Record<string, string>
}
const result = await stream.result
// { headers, totalRows, durationMs, format, filePath }
streamTabularBatch: batched processing
await inputs.get('sales').streamTabularBatch({
  batchSize: 500,
  onBatch: async (rows) => {
    await db.bulkInsert(rows)
  },
})
Options
const stream = await inputs.get('sales').streamTabular({
  format: 'csv',                          // force format ('csv'|'txt'|'xlsx'|'parquet')
  requiredHeaders: ['id', 'amount'],      // throws TabularMissingHeadersError if absent
  csv: { delimiter: ';' },                // CSV-specific: custom delimiter
  xlsx: { sheet: 'Sheet2' },              // XLSX-specific: target sheet
  parquet: { columns: ['id', 'amount'] }, // Parquet-specific: column projection
})
Parquet: column projection
Column projection limits which columns are read from disk — useful for wide schemas:
const stream = await inputs.get('warehouse-export').streamTabular({
  parquet: { columns: ['order_id', 'total', 'status'] },
})
Parquet files with no extension are detected automatically via the PAR1 magic bytes.
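To make the batched mode described earlier in this section more concrete, here is a minimal sketch of grouping rows from any async iterable into fixed-size batches while holding only one batch in memory. `inBatches`, `fakeRows`, and `countBatchSizes` are hypothetical names used for illustration; how streamTabularBatch is implemented internally is not documented here.

```typescript
// Groups items from an async iterable into arrays of at most `batchSize` items.
async function* inBatches<T>(source: AsyncIterable<T>, batchSize: number): AsyncGenerator<T[]> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer')
  }
  let batch: T[] = []
  for await (const item of source) {
    batch.push(item)
    if (batch.length === batchSize) {
      yield batch
      batch = [] // start a fresh array so yielded batches are never mutated
    }
  }
  if (batch.length > 0) yield batch // flush the final, possibly smaller, batch
}

// Stand-in for a tabular stream; real rows would come from streamTabular().
async function* fakeRows(count: number): AsyncGenerator<Record<string, string>> {
  for (let i = 0; i < count; i++) yield { id: String(i), amount: '10' }
}

// Collects the size of each batch produced from five rows with batchSize 2.
async function countBatchSizes(): Promise<number[]> {
  const sizes: number[] = []
  for await (const rows of inBatches(fakeRows(5), 2)) sizes.push(rows.length)
  return sizes
}
```

Because the generator only pulls the next row after the consumer has finished with the previous batch, a slow `onBatch`-style callback naturally applies backpressure to the reader.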
Writing outputs
// Buffer / string / Uint8Array
await outputs.get('result').upload(buffer, { filename: 'report.csv', mimeType: 'text/csv' })
// Stream (sizeBytes is required because S3 needs Content-Length upfront)
await outputs.get('video-out').uploadStream(readableStream, {
  filename: 'output.mp4',
  mimeType: 'video/mp4',
  sizeBytes: fileSizeInBytes,
})
inputs.get(alias) and outputs.get(alias) both expose a presignedUrl property for advanced use cases.
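One advanced use of presignedUrl might be performing the PUT yourself. The sketch below assumes the URL accepts a plain HTTP PUT and that the Content-Type header should match the declared MIME type; `putToPresignedUrl` and `FetchLike` are hypothetical names. It covers only the PUT itself, not the registration step that upload() performs.

```typescript
// Minimal structural type so the sketch works with the global fetch or a test double.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number }>

// Sends `body` to a presigned URL and throws on any non-2xx response.
async function putToPresignedUrl(
  presignedUrl: string,
  body: string,
  mimeType: string,
  fetchImpl: FetchLike,
): Promise<void> {
  const response = await fetchImpl(presignedUrl, {
    method: 'PUT',
    headers: { 'Content-Type': mimeType },
    body,
  })
  if (!response.ok) {
    throw new Error(`Presigned PUT failed with HTTP ${response.status}`)
  }
}
```

On Node 18+ the global `fetch` can be passed as `fetchImpl`. Injecting it as a parameter keeps the function easy to exercise with a fake in tests.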
Required environment variables
| Variable | Purpose |
|---|---|
| DATAFLOW_API_BASE_URL | Backend base URL (e.g. https://api.conectte.com). |
| DATAFLOW_AUTOMATION_ID | The automation this execution belongs to. |
| DATAFLOW_EXECUTION_ID | The execution ID assigned by the backend. |
| DATAFLOW_RUN_TOKEN | Per-execution token used to authenticate lifecycle calls. |
In production (Docker/Fargate), these are injected automatically; no setup is required on your side.
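When running outside the managed environment, it can help to check these variables up front. The following sketch validates the four variables from the table and the base URL; `readRuntimeEnv`, `REQUIRED_VARS`, and `RuntimeEnv` are hypothetical names, and the SDK's own validation (which throws MissingRuntimeConfigError and InvalidConfigError) may behave differently.

```typescript
// Variable names are taken from the table above.
const REQUIRED_VARS = [
  'DATAFLOW_API_BASE_URL',
  'DATAFLOW_AUTOMATION_ID',
  'DATAFLOW_EXECUTION_ID',
  'DATAFLOW_RUN_TOKEN',
] as const

type RuntimeEnv = Record<(typeof REQUIRED_VARS)[number], string>

// Returns the required variables, or throws listing every missing one.
function readRuntimeEnv(env: Record<string, string | undefined>): RuntimeEnv {
  const missing = REQUIRED_VARS.filter((name) => !env[name])
  if (missing.length > 0) {
    throw new Error(`Missing required env vars: ${missing.join(', ')}`)
  }
  const result = {} as RuntimeEnv
  for (const name of REQUIRED_VARS) {
    result[name] = env[name] as string
  }
  // The URL constructor throws a TypeError if the base URL is not a valid absolute URL.
  new URL(result.DATAFLOW_API_BASE_URL)
  return result
}
```

In a Node process this would typically be called as `readRuntimeEnv(process.env)`. Reporting all missing names at once avoids fixing them one failed run at a time.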
Local development (simulation mode)
Set CONECTTE_SIMULATE=true to run your automation locally without any backend or S3 calls. Inputs are read from a local directory; outputs are written to another.
# .env.local
CONECTTE_SIMULATE=true
CONECTTE_SIMULATE_INPUTS_DIR=./fixtures/inputs # default: ./conectte-fixtures/inputs
CONECTTE_SIMULATE_OUTPUTS_DIR=./out # default: ./conectte-fixtures/outputs
Place your input files as ./fixtures/inputs/<alias> or ./fixtures/inputs/<alias>.<ext>, then run:
dotenv -e .env.local -- tsx src/index.ts
The handler code is identical: no changes are needed between local and production.
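The `<alias>` or `<alias>.<ext>` naming rule can be illustrated with a small lookup function. This is only a sketch: `resolveFixture` is a hypothetical helper, and preferring the exact name over the name with an extension is an assumption, since the SDK's tie-breaking rule is not documented here.

```typescript
// Picks the fixture file for an alias from a list of file names in the inputs directory.
function resolveFixture(alias: string, filenames: readonly string[]): string | undefined {
  // Assumed precedence: an exact match (<alias>) wins over <alias>.<ext>.
  if (filenames.includes(alias)) return alias
  const prefix = alias + '.'
  // Require at least one character after the dot so "sales." does not count.
  return filenames.find((name) => name.startsWith(prefix) && name.length > prefix.length)
}
```

The file names could come from `fs.readdirSync` on the inputs directory. Matching on `alias + '.'` rather than on the bare alias keeps `sales-2024.csv` from being picked up for the alias `sales`.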
API reference
runAutomation(handler, options?)
runAutomation(
  handler: (ctx: AutomationContext) => Promise<void> | void,
  options?: {
    config?: Partial<RuntimeConfig> // override any env var
    fetch?: typeof fetch            // custom fetch (e.g. for tests)
    timeoutMs?: number              // default: 30 000 ms
    noExit?: boolean                // suppress process.exit (tests)
  }
): Promise<void>
AutomationContext
type AutomationContext = {
  readonly executionId: string
  inputs: InputCollection   // .get(alias) .all() .has(alias)
  outputs: OutputCollection // .get(alias) .all() .has(alias)
  log: { info, warn, error }
}
Error classes
| Class | When thrown |
|---|---|
| MissingRuntimeConfigError | Required env var is absent |
| InvalidConfigError | apiBaseUrl is not a valid URL |
| ConecteApiError | Backend returned an HTTP error after all retries |
| InputDownloadError | Presigned GET request failed |
| OutputUploadError | Presigned PUT request or registration failed |
| UnknownTabularFormatError | Format cannot be detected and was not forced |
| TabularMissingHeadersError | requiredHeaders are not present in the file |
All extend ConectteSdkError, which extends Error. Tabular errors are thrown before any rows are emitted.
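Because every class shares one base, a handler can branch from the most specific error to the most general. The sketch below defines local stand-ins that mirror the documented hierarchy so that it runs on its own; in real code the classes would be imported from the SDK, assuming it exports them. `describeFailure` is a hypothetical helper.

```typescript
// Local stand-ins mirroring the documented hierarchy (not the SDK's own classes).
class ConectteSdkError extends Error {
  constructor(message: string) {
    super(message)
    this.name = new.target.name
    // Keeps instanceof working when compiling to older targets such as ES5.
    Object.setPrototypeOf(this, new.target.prototype)
  }
}
class InputDownloadError extends ConectteSdkError {}
class TabularMissingHeadersError extends ConectteSdkError {}

// Maps an unknown thrown value to a short, human-readable description.
function describeFailure(err: unknown): string {
  // Check the most specific classes first, then the shared base class.
  if (err instanceof TabularMissingHeadersError) return `bad input file: ${err.message}`
  if (err instanceof InputDownloadError) return `download failed: ${err.message}`
  if (err instanceof ConectteSdkError) return `SDK error: ${err.message}`
  return `unexpected error: ${String(err)}`
}
```

Checking the base class last gives a single catch-all for SDK failures while still letting non-SDK errors fall through to the final branch.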
License
MIT
