@valence-js/core
v0.3.0
Type-safe, resilient, and composable pipeline orchestration for modern Node.js.
📦 Installation
yarn add @valence-js/core
# or
npm install @valence-js/core
# or
pnpm add @valence-js/core
🧠 Architecture Overview
Valence models your business logic as a series of Steps. TypeScript statically enforces that the output of Step N matches the input of Step N+1.
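To make the type-chaining idea concrete, here is a minimal sketch of how a builder can thread output types from one step to the next. MiniPipeline and StepFn are hypothetical names invented for this illustration; this is not Valence's actual implementation, only one plausible way to get the same compile-time guarantee.

```typescript
// Hypothetical MiniPipeline: illustrates the typing idea only, not Valence's real code.
type StepFn<I, O> = (input: I) => O | Promise<O>;

class MiniPipeline<In, Out> {
  private readonly fns: ReadonlyArray<StepFn<any, any>>;

  private constructor(fns: ReadonlyArray<StepFn<any, any>>) {
    this.fns = fns;
  }

  // Start with an empty pipeline: the output type equals the input type.
  static start<T>(): MiniPipeline<T, T> {
    return new MiniPipeline<T, T>([]);
  }

  // The new step must accept the current output type `Out`;
  // its return type `Next` becomes the pipeline's new output type.
  addStep<Next>(fn: StepFn<Out, Next>): MiniPipeline<In, Next> {
    return new MiniPipeline<In, Next>([...this.fns, fn]);
  }

  // Run the steps in order, awaiting each one before calling the next.
  async run(input: In): Promise<Out> {
    let value: unknown = input;
    for (const fn of this.fns) {
      value = await fn(value);
    }
    return value as Out;
  }
}

const lengthOfId = MiniPipeline.start<string>()
  .addStep((id) => id.trim())      // string -> string
  .addStep((id) => id.length)      // string -> number
  .addStep((n) => `length=${n}`);  // number -> string

// Appending .addStep((flag: boolean) => !flag) here would be a compile-time error,
// because the previous step outputs a string, not a boolean.
```

Each call to addStep returns a new builder whose second type parameter is the latest step's return type, so a mismatched step is caught by the compiler before anything runs.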
graph TD
A([Initial Input]) --> B(Step 1: Format Data)
B -->|Transformed Data| C{Parallel Execution}
C-->|Input| D[Step 2A: Fetch User DB]
C-->|Input| E[Step 2B: Fetch Preferences]
C-->|Input| F[Step 2C: Calculate Score]
D -.->|Circuit Breaker| D
D-->|Tuple 0| G(Step 3: Aggregate Results)
E-->|Tuple 1| G
F-->|Tuple 2| G
G--> H([Final Output])
classDef success fill:#10B981,stroke:#047857,stroke-width:2px,color:white;
classDef parallel fill:#3B82F6,stroke:#1D4ED8,stroke-width:2px,color:white;
classDef input fill:#6B7280,stroke:#374151,stroke-width:2px,color:white;
class A,H input;
class B,D,F,G success;
class C parallel;
🚀 Quick Start
Build a robust, parallelized, and type-safe onboarding pipeline in seconds:
import { Pipeline, Step } from '@valence-js/core'
// `api` stands in for your own service clients; it is not part of Valence.
// 1. Instantiate the Pipeline, defining the Initial Input and Final Output types
const onboarding = new Pipeline<string, string>('UserOnboarding')
  // 2. Add sequential steps
  .addStep('Sanitize ID', (id) => id.trim().toLowerCase())
  // 3. Add parallel steps (Fan-out). The output is strongly typed as a Tuple!
  .addParallel('Fetch User Data', [
    // This step will retry up to 2 times if it fails
    // (`api.users.getProfile` is a placeholder for your own profile lookup)
    new Step('Get Profile', async (id) => api.users.getProfile(id), { maxRetries: 2 }),
    // This step fails fast once its circuit breaker opens
    new Step('Get Billing', async (id) => api.stripe.getConsumer(id), {
      circuitBreaker: { failureThreshold: 3, resetTimeoutMs: 10000 }
    })
  ])
  // 4. Aggregate the results (Fan-in)
  .addStep('Aggregate', ([profile, billing]) => {
    // TypeScript knows the exact types of `profile` and `billing`!
    return `User ${profile.name} is on the ${billing.plan} plan.`;
  });
// 5. Run the engine
try {
  const result = await onboarding.run(' USR_123 ');
  console.log(result) // "User Alice is on the Pro plan."
} catch (error) {
  console.error('Pipeline failed:', error);
}
🛡️ Enterprise-Grade Resilience
1. Auto-Retries
Transient network failures are a reality. Valence handles them gracefully.
pipeline.addStep('Flaky API', fetchExternalData, { maxRetries: 3 })
2. Circuit Breakers (Fail-Fast)
Prevent cascading failures and resource exhaustion when a third-party service goes down.
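For readers new to the pattern, the following is a minimal sketch of the closed / open / half-open state machine that a circuit breaker typically follows. MiniBreaker is a hypothetical class written for this explanation and is not Valence's internal implementation; the two constructor parameters are simply named after the options used in this README.

```typescript
// Hypothetical MiniBreaker: a conceptual sketch, not Valence's internals.
type BreakerState = 'closed' | 'open' | 'half-open';

class MiniBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  // `now` is injectable so the timing logic can be tested with a fake clock.
  constructor(failureThreshold: number, resetTimeoutMs: number, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now;
  }

  get currentState(): BreakerState {
    return this.state;
  }

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      // While open, reject immediately without touching the downstream service.
      if (this.now() - this.openedAt < this.resetTimeoutMs) {
        throw new Error('Circuit is open: failing fast');
      }
      // The reset timeout has elapsed: allow one trial call.
      this.state = 'half-open';
    }
    try {
      const result = await fn();
      // Any success closes the circuit and resets the consecutive-failure count.
      this.failures = 0;
      this.state = 'closed';
      return result;
    } catch (error) {
      this.failures += 1;
      // A failed trial call, or too many consecutive failures, opens the circuit.
      if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
        this.state = 'open';
        this.openedAt = this.now();
      }
      throw error;
    }
  }
}

const breaker = new MiniBreaker(5, 30_000);
breaker.call(async () => 'payment accepted').then(console.log); // logs "payment accepted"
```

In Valence itself, the same two settings are passed as step options: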
pipeline.addStep('Payment Gateway', processPayment, {
  circuitBreaker: {
    failureThreshold: 5, // Open the circuit after 5 consecutive failures
    resetTimeoutMs: 30000 // Wait 30s before testing whether the service is back (Half-Open)
  }
})
3. Matryoshka Error Tracing (Root Cause)
No more "Error: Something went wrong". Valence preserves the entire execution context and nests errors using the standard cause property of Error, which Node.js supports natively.
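The nesting can be illustrated with plain Error objects, independent of Valence. In the sketch below, rootCause is a hypothetical helper written for this example, and the error messages are made up; only the cause option of the Error constructor is standard.

```typescript
// Plain Error objects only; no Valence classes are used in this sketch.
// `rootCause` is a hypothetical helper, not a Valence export.
function rootCause(error: unknown): unknown {
  let current = error;
  // Follow `cause` links until reaching a value that has no further cause.
  while (current instanceof Error && current.cause !== undefined) {
    current = current.cause;
  }
  return current;
}

// Three nested layers, innermost first, like nesting dolls.
const original = new Error('ECONNRESET');
const stepLevel = new Error('Step "Payment Gateway" failed', { cause: original });
const pipelineLevel = new Error('Pipeline "ProcessOrder" failed', { cause: stepLevel });

console.log(rootCause(pipelineLevel) === original); // true
```

With Valence's error classes, the same unwrapping looks like this: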
import { PipelineExecutionError, StepExecutionError, CircuitBreakerOpenError } from '@valence-js/core'
try {
  await pipeline.run(input);
} catch (error) {
  if (error instanceof PipelineExecutionError) {
    console.log(`Failed at step: ${error.failedStep}`); // "Payment Gateway"
    const stepError = error.cause as StepExecutionError;
    console.log(`Failed after ${stepError.attempts} attempts.`);
    // Get the exact original error thrown by your function or the Circuit Breaker
    console.log('Original cause:', stepError.cause);
  }
}
4. Observability & Telemetry (Edge-Compatible)
Valence ships with a built-in, lightweight, and strictly typed Event Emitter. It doesn't rely on Node's native events module, so your pipelines stay compatible with Edge environments (Cloudflare Workers, Vercel Edge, etc.).
Hook into the lifecycle to integrate with Datadog, Sentry or custom loggers without blocking the main execution thread.
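As an illustration of what "strictly typed" can mean here, the sketch below shows a tiny emitter that needs no Node built-ins and ties each event name to its payload type. TinyEmitter and DemoEvents are hypothetical names; the payload shapes are assumptions modeled on the event names used in this README, and this sketch calls listeners synchronously, which may differ from how Valence dispatches them.

```typescript
// Hypothetical event map: payload shapes are assumptions for illustration.
type DemoEvents = {
  'step:success': { stepName: string; durationMs: number };
  'step:failure': { stepName: string; error: Error; willRetry: boolean };
};

// Hypothetical TinyEmitter: no dependency on Node's `events` module.
class TinyEmitter<Events extends Record<string, unknown>> {
  private readonly listeners = new Map<keyof Events, Array<(payload: any) => void>>();

  // The listener's payload type is derived from the event name `K`.
  on<K extends keyof Events>(event: K, listener: (payload: Events[K]) => void): this {
    const existing = this.listeners.get(event) ?? [];
    this.listeners.set(event, [...existing, listener]);
    return this;
  }

  // Emitting with the wrong payload shape is rejected at compile time.
  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(payload);
    }
  }
}

const seen: string[] = [];
const emitter = new TinyEmitter<DemoEvents>()
  .on('step:success', ({ stepName, durationMs }) => {
    seen.push(`${stepName} took ${durationMs}ms`);
  })
  .on('step:failure', ({ stepName, willRetry }) => {
    seen.push(`${stepName} failed, willRetry=${willRetry}`);
  });

emitter.emit('step:success', { stepName: 'Validate', durationMs: 12 });
```

Valence's own emitter is attached directly to the pipeline: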
import { Pipeline } from '@valence-js/core'
const pipeline = new Pipeline<string, string>('ProcessOrder')
  .on('pipeline:started', ({ pipelineName }) => {
    console.log(`[START] Pipeline ${pipelineName} initiated.`);
  })
  .on('step:success', ({ stepName, durationMs }) => {
    // Send metrics to your observability platform
    datadog.timing(`pipeline.step.duration`, durationMs, [`step:${stepName}`]);
  })
  .on('step:failure', ({ stepName, error, willRetry }) => {
    if (willRetry) {
      console.warn(`[WARN] Step ${stepName} failed, retrying... (${error.message})`);
    } else {
      sentry.captureException(error, { tags: { step: stepName } });
    }
  })
  // Add your steps...
  .addStep('Validate', validateOrder)
  .addStep('Charge', chargeCard)
🤝 Contributing
We welcome contributions! Please see our Contributing Guide for details on our Code of Conduct, Branching Strategy, and Development Workflow.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes following Conventional Commits (git commit -m 'feat: add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
