@marcschauer/orpc-effect
v0.2.0
Published
Integration of oRPC with Effect library
Maintainers
Readme
orpc-effect
Type-safe RPC with Effect - integrate oRPC with Effect for functional, composable, and type-safe remote procedure calls.
Overview
orpc-effect bridges oRPC and Effect, enabling you to:
- 🎯 Write RPC handlers using Effect's powerful abstractions
- 🌊 Stream data with Effect Streams instead of callbacks
- 🔒 Get type-safe error handling through Effect's error channel
- 🔌 Use Effect's dependency injection (Layers and Services)
- ⚡ Leverage Effect's concurrency and resource management
- 🚫 Handle cancellation with AbortSignal integration
Installation
pnpm add orpc-effect effect @orpc/client @orpc/serverPeer Dependencies:
effect^3.0.0
Usage
Working with Domain Models (Schema-Aware Wrappers)
orpc-effect provides schema-aware wrappers that automatically handle transformation between rich Effect Schema domain objects (with methods, branded types) and plain JSON data for oRPC transport.
Server-Side: effectProcedure
Use effectProcedure to work with domain objects while oRPC handles plain JSON:
import { Schema, Effect } from "effect";
import { os } from "@orpc/server";
import { effectProcedure, from, standard, brand } from "orpc-effect";
// Define rich domain models
const ProductId = Schema.String.pipe(brand("ProductId"));
class Product extends Schema.Class<Product>("Product")({
id: ProductId,
name: Schema.NonEmptyString,
price: Schema.Positive,
}) {
// Domain methods
get displayPrice() {
return `$${this.price.toFixed(2)}`;
}
applyDiscount(percent: number) {
return new Product({
...this,
price: this.price * (1 - percent / 100),
});
}
}
// Create schema-aware handler
export const getProduct = os
.input(standard(from(ProductId))) // oRPC validates plain string
.output(standard(from(Product))) // oRPC returns plain object
.handler(
effectProcedure({
input: ProductId, // Handler receives branded ProductId
output: Product, // Handler returns Product class instance
handler: (productId, context) =>
Effect.gen(function* () {
// Work with domain objects
const product = yield* ProductService.getById(productId);
return product.applyDiscount(10); // Use domain methods
})
})
);Key Features:
- Domain objects stay in your business logic
- Automatic encode/decode at boundaries
- Type-safe with branded types and class instances
- ParseError handling built-in
Client-Side: createEffectClient
Use createEffectClient to call RPCs with domain objects:
import { createORPCClient } from "@orpc/client";
import { createEffectClient } from "orpc-effect";
// Define schema contract (mirrors server)
const contract = {
product: {
getById: { input: ProductId, output: Product }
}
};
const rawClient = createORPCClient<typeof router>(link);
const runtime = await Effect.runPromise(Layer.toRuntime(Layer.empty));
const effectClient = createEffectClient(rawClient, contract, runtime);
// Use with domain objects
const productId = ProductId.make("prod_123");
const stream = effectClient.product.getById(productId);
// Receive Product class instance
const product = await Effect.runPromise(
stream.pipe(
Stream.runCollect,
Effect.map(chunk => Array.from(chunk)[0])
)
);
// Use domain methods
console.log(product.displayPrice); // "$89.99"
console.log(product.applyDiscount(20)); // New Product with 20% offSchema Helpers
import { from, to, brand, standard } from "orpc-effect";
import { Schema } from "effect";
class User extends Schema.Class<User>("User")({
id: Schema.String,
email: Schema.String,
}) {}
// Extract plain schema for oRPC
const userDTO = from(User); // Schema<{ id: string; email: string }>
// Extract domain schema
const userDomain = to(User); // Schema<User>
// Create branded type
const UserId = Schema.String.pipe(brand("UserId"));
const userId = UserId.make("user_123"); // string & Brand<"UserId">
// Use with standard() for oRPC
const orpcSchema = standard(from(User));Server-Side: Effect Handlers
Basic Handler
import { Effect } from "effect";
import { os } from "@orpc/server";
import { effectHandler } from "orpc-effect";
export const router = {
greet: os.handler(
effectHandler(({ input }: { input: { name: string } }) =>
Effect.succeed(`Hello, ${input.name}!`)
)
),
};Handler with Dependency Injection
import { Effect, Context, Layer } from "effect";
import { effectHandler } from "orpc-effect";
// Define a service
class DatabaseService extends Context.Tag("DatabaseService")<
DatabaseService,
{
readonly getUser: (id: number) => Effect.Effect<User, NotFoundError>;
}
>() {}
// Create the service implementation
const DatabaseServiceLive = Layer.succeed(DatabaseService, {
getUser: (id) =>
Effect.gen(function* () {
yield* Effect.sleep("100 millis"); // Simulate delay
if (id === 1) return { id: 1, name: "Alice" };
return yield* Effect.fail(new NotFoundError({ id }));
})
});
// Use in handler
const handler = effectHandler(
({ input }) => Effect.gen(function* () {
const db = yield* DatabaseService;
return yield* db.getUser(input.id);
}),
{ layer: DatabaseServiceLive }
);Streaming Handler
import { Effect, Stream, Schedule } from "effect";
import { effectStreamingHandler } from "orpc-effect";
const streamHandler = effectStreamingHandler(
({ input }) => Effect.succeed(
// Create a stream that emits updates every second
Stream.fromSchedule(Schedule.spaced("1 second")).pipe(
Stream.map((n) => ({
count: n,
timestamp: new Date().toISOString()
})),
Stream.take(input.maxUpdates)
)
)
);Client-Side: Wrapped Client
Basic Setup
import { Effect, Stream, Layer } from "effect";
import { createORPCClient } from "@orpc/client";
import { wrapClientAsEffect } from "orpc-effect";
// Create the raw oRPC client
const rawClient = createORPCClient<typeof router>(link);
// Create Effect runtime (or use an existing one)
const runtime = await Effect.runPromise(Layer.toRuntime(Layer.empty));
// Wrap the client - all methods now return Streams
const client = wrapClientAsEffect(rawClient, runtime);Consuming Single Values
// All methods return Streams, even for single values
const stream = client.greet({ name: "World" });
// Collect the stream to get the result
const result = await Effect.runPromise(Stream.runCollect(stream));
const greeting = Array.from(result)[0];
console.log(greeting); // "Hello, World!"Consuming Streams
// Streaming methods return Streams with multiple elements
const stream = client.users.liveUpdates({ maxUpdates: 10 });
// Convert to AsyncIterable for consumption
for await (const update of Stream.toAsyncIterable(stream)) {
console.log("Update:", update);
}
// Or use Stream operations
await Effect.runPromise(
stream.pipe(
Stream.tap((update) => Effect.log(update)),
Stream.runDrain
)
);Error Handling
import { Effect, Exit, Cause } from "effect";
import { ServerError, TransportError, ErrorTools } from "orpc-effect";
const stream = client.users.get({ id: 999 });
const result = await Effect.runPromiseExit(Stream.runCollect(stream));
if (Exit.isFailure(result)) {
const cause = result.cause;
if (Cause.isFailType(cause) && cause.error instanceof ServerError) {
console.error("Server error:", cause.error.code);
} else if (Cause.isFailType(cause) && cause.error instanceof TransportError) {
console.error("Network error:", cause.error.cause);
}
}With Runtime Factory
import { createEffectOrpc } from "orpc-effect";
// Create a factory with your services pre-configured
const { wrap, runtime } = createEffectOrpc(MyServiceLayer);
// Wrap any client
const client = wrap(createORPCClient<typeof router>(link));
// Use the runtime for other Effect operations
await Effect.runPromise(myEffect, { runtime });Schema Bridge
Convert Effect Schemas to Standard Schema for oRPC:
import { Schema } from "effect";
import { standard } from "orpc-effect";
const UserSchema = Schema.Struct({
id: Schema.Number,
name: Schema.String,
email: Schema.String.pipe(Schema.pattern(/^[^\s@]+@[^\s@]+\.[^\s@]+$/)),
});
// Convert to Standard Schema for oRPC
const standardUserSchema = standard(UserSchema);
// Use in oRPC procedure definitions
const procedure = os
.input(standardUserSchema)
.handler(({ input }) => {
// input is fully typed based on the schema
return Effect.succeed(input);
});Advanced Patterns
Cancellation
Effect's AbortSignal integration allows proper cancellation:
const stream = client.longRunningTask({ data: largeDataset });
// Fork the stream collection
const fiber = Effect.runFork(Stream.runCollect(stream));
// Later: cancel the operation
await Effect.runPromise(Fiber.interrupt(fiber));
// The server will receive the cancellation signalBackpressure
Effect Streams automatically handle backpressure:
effectStreamingHandler(() =>
Effect.succeed(
Stream.fromIterable(largeDataset).pipe(
// Buffer elements if consumer is slow
Stream.buffer({ capacity: 100 }),
// Process with delays
Stream.mapEffect((item) =>
processItem(item).pipe(Effect.delay("100 millis"))
)
)
)
);Retries and Resilience
import { Schedule } from "effect";
const resilientHandler = effectHandler(
({ input }) =>
fetchFromExternalAPI(input).pipe(
// Retry with exponential backoff
Effect.retry({
schedule: Schedule.exponential("100 millis"),
times: 3
}),
// Fallback on permanent failure
Effect.catchAll((error) =>
Effect.succeed(getCachedValue(input))
)
)
);Composing Streams
effectStreamingHandler(({ input }) =>
Effect.succeed(
Stream.fromIterable(input.userIds).pipe(
// Fetch users concurrently
Stream.mapEffect(
(id) => fetchUser(id),
{ concurrency: 5 }
),
// Filter successful results
Stream.filterMap((user) => user.active ? Option.some(user) : Option.none()),
// Add metadata
Stream.map((user) => ({
...user,
fetchedAt: new Date().toISOString()
}))
)
)
);Error Types
All client errors extend OrpcClientError:
class ServerError extends Data.TaggedError("ServerError")<{
readonly cause: ORPCError;
readonly code: string;
readonly message: string;
readonly data?: unknown;
}> {}
class TransportError extends Data.TaggedError("TransportError")<{
readonly cause: unknown;
}> {}
class UnknownError extends Data.TaggedError("UnknownError")<{
readonly cause: unknown;
}> {}
class DecodeError extends Data.TaggedError("DecodeError")<{
readonly cause: unknown;
}> {}TypeScript Configuration
Ensure your tsconfig.json includes:
{
"compilerOptions": {
"strict": true,
"moduleResolution": "bundler",
"module": "ESNext",
"target": "ES2022"
}
}Examples
See the examples directory for complete working examples:
- nextjs-streaming - Next.js app with streaming demos ✅
- node-basic - Basic Node.js server and client
- react-client - React UI patterns
API Exports
Main Entry (orpc-effect)
// Client utilities
export { wrapClientAsEffect, toStream, createEffectOrpc, createEffectOrpcFromManagedRuntime };
// Error types
export { OrpcClientError, ServerError, TransportError, UnknownError, DecodeError, ErrorTools };
// Server utilities
export { effectHandler, effectStreamingHandler, mapTaggedErrorToORPC };
// Schema utilities
export { standard };Subpath Exports
orpc-effect/server- Server-only utilitiesorpc-effect/schema- Schema bridge utilities
Testing
Run the test suite:
pnpm testTest Coverage:
- ✅ 25/25 tests passing
- ✅ Unit tests for all modules
- ✅ Integration tests for client-server communication
- ✅ Error handling scenarios
- ✅ Cancellation and streaming
Performance Considerations
- Bundle Size: ~4.6 KB (ESM), ~5.1 KB (CJS)
- Tree-shakeable: Use subpath imports for smaller bundles
- Zero runtime overhead: Type-safe wrappers with minimal proxying
Compatibility
- Node.js: >= 18.0.0
- Effect: ^3.0.0
- oRPC: ^1.9.3
- TypeScript: ^5.6.0
Roadmap
- [x] Core client/server adapters
- [x] Schema bridging
- [x] Error handling utilities
- [x] Comprehensive test suite
- [x] Next.js streaming example
- [ ] Advanced streaming patterns guide
- [ ] Performance benchmarks
- [ ] Additional framework examples
- [ ] Migration guides
Contributing
Contributions welcome! Please ensure:
- All tests pass (
pnpm test) - Code is properly typed
- Effect APIs are used correctly (check against official docs)
- Examples are updated if APIs change
License
MIT
Credits
Built with:
Questions? Check the examples or open an issue.
