effect-redis
v0.0.32
Simple Effect wrapper for Redis.
Effect-Redis
A modern, type-safe Effect-TS wrapper for the official redis client.
| Feature | Description |
| :--- | :--- |
| Resource-safe | Automatic connection lifecycle via Scope and Layer.scoped |
| Modular services | Independent services for Core, Pub/Sub, and Streams |
| Shared connection | Multiple services reuse a single managed connection |
| Typed errors | Tagged error union (RedisConnectionError, RedisCommandError, RedisGeneralError) |
| Raw escape hatch | Direct access to the underlying node-redis client via use() |
Table of Contents
- Installation
- Quick Start
- Architecture
- Services & Layers
- Configuration
- Redis Service (Core)
- RedisPubSub Service
- RedisStream Service
- Error Model
- Types Reference
- Configuration Utilities
- Usage Examples
- Peer Dependencies
Installation
pnpm add effect-redis
Quick Start
import { Effect, Layer, Stream } from 'effect';
import {
  RedisConnectionOptionsLive,
  RedisLive,
  RedisPubSubLive,
  Redis,
  RedisPubSub,
} from 'effect-redis';
const program = Effect.gen(function* () {
  const redis = yield* Redis;
  const pubsub = yield* RedisPubSub;
  yield* redis.set('user:42', JSON.stringify({ name: 'Ada' }));
  const val = yield* redis.get('user:42');
  yield* redis.set('counter', '1');
  yield* redis.incr('counter');
  const subscription = yield* pubsub.subscribe('notifications');
  yield* Effect.fork(
    Stream.runForEach(subscription, (msg) =>
      Effect.log(`Received: ${msg}`)
    )
  );
  yield* pubsub.publish('notifications', 'Hello world!');
});
const RedisLayer = RedisConnectionOptionsLive({
  url: 'redis://localhost:6379',
}).pipe(
  Layer.provideMerge(RedisLive),
  Layer.provideMerge(RedisPubSubLive)
);
Effect.runPromise(program.pipe(Effect.provide(RedisLayer)));
Architecture
RedisConnectionOptionsLive(options)
                |
                v
RedisConnectionLive (Layer.scoped: manages connection lifecycle)
      /         |          \
     v          v           v
RedisLive  RedisPubSubLive  RedisStreamLive
 (core)       (pub/sub)        (streams)

All service layers attempt to reuse a shared RedisConnection from context. If
none is provided, each creates its own scoped connection. Services that require
dedicated connections (Pub/Sub subscriber, Stream consumer) always create
additional clients internally.
Services & Layers
| Service | Layer | Tag | Purpose |
| :--- | :--- | :--- | :--- |
| Redis | RedisLive | 'Redis' | Comprehensive key-value, hash, list, set, sorted set, and admin commands |
| RedisPubSub | RedisPubSubLive | 'RedisPubSub' | Publish/Subscribe messaging |
| RedisStream | RedisStreamLive | 'RedisStream' | Redis Streams (XADD, XREAD, XRANGE, continuous polling) |
| RedisConnection | RedisConnectionLive | 'RedisConnection' | Shared managed connection |
| RedisConnectionOptions | RedisConnectionOptionsLive(opts) | 'RedisConnectionOptions' | Connection configuration provider |
Layer Composition
import { Layer } from 'effect';
import {
  RedisConnectionOptionsLive,
  RedisLive,
  RedisPubSubLive,
  RedisStreamLive,
} from 'effect-redis';
const FullRedisLayer = RedisConnectionOptionsLive({
  url: 'redis://localhost:6379',
}).pipe(
  Layer.provideMerge(RedisLive),
  Layer.provideMerge(RedisPubSubLive),
  Layer.provideMerge(RedisStreamLive)
);
Configuration
RedisConnectionOptionsLive accepts the same options as redis.createClient().
An optional second argument overrides the client factory (useful for testing).
// URL-based
RedisConnectionOptionsLive({ url: 'redis://localhost:6379' });
// Host/port-based
RedisConnectionOptionsLive({
  socket: { host: '10.0.0.1', port: 6380 },
  password: 'secret',
  database: 2,
});
// Custom client factory (e.g. for mocking)
RedisConnectionOptionsLive(
  { url: 'redis://localhost:6379' },
  (opts) => myMockClient(opts)
);
Configuration Utilities
Helper functions for validating and working with RedisConfig:
| Function | Signature | Description |
| :--- | :--- | :--- |
| validateRedisConfig | (config: RedisConfig) => Effect<RedisConfig, RedisConnectionError> | Validates host/port ranges, URL format |
| mergeWithDefaults | (config?: Partial<RedisConfig>) => RedisConfig | Fills missing fields with defaults |
| getConnectionInfo | (config: RedisConfig) => string | Returns a safe-to-log connection string (no credentials) |
| DEFAULT_REDIS_CONFIG | { host: 'localhost', port: 6379, connectTimeout: 10000, commandTimeout: 5000 } | Default values |
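
To make the table above more concrete, here is a minimal sketch of default merging and credential-free logging for the host/port form of the config. The names `HostPortConfig`, `withDefaults`, and `describeConnection` are hypothetical stand-ins written for illustration; they are not the library's implementation, and only the default values are taken from the DEFAULT_REDIS_CONFIG row.

```typescript
// Hypothetical, simplified stand-in for the host/port variant of RedisConfig.
interface HostPortConfig {
  host: string;
  port: number;
  username?: string;
  password?: string;
  database?: number;
  connectTimeout?: number;
  commandTimeout?: number;
}

// Default values copied from the DEFAULT_REDIS_CONFIG row above.
const DEFAULTS = {
  host: 'localhost',
  port: 6379,
  connectTimeout: 10000,
  commandTimeout: 5000,
};

// Fill in every missing field with its default.
function withDefaults(config: Partial<HostPortConfig> = {}): HostPortConfig {
  return { ...DEFAULTS, ...config };
}

// Build a log-friendly string that deliberately leaves out username and password.
function describeConnection(config: HostPortConfig): string {
  const db = config.database ?? 0;
  return `redis://${config.host}:${config.port}/${db}`;
}

const merged = withDefaults({ port: 6380, password: 'secret' });
const info = describeConnection(merged);
```

In real code the exported mergeWithDefaults and getConnectionInfo should be preferred; the sketch only shows the kind of behaviour the table describes and does not cover the URL form of the config.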
Redis Service (Core)
The Redis service provides typed wrappers for common Redis commands. Every
method returns Effect<T, RedisError>; the signatures in the tables below omit
the error channel for brevity.
Key-Value
| Method | Signature | Description |
| :--- | :--- | :--- |
| get | (key: string) => Effect<string \| null> | Get the value of a key |
| set | (key: string, value: RedisValue, options?: KeyOptions) => Effect<'OK' \| string \| null> | Set a key to a value with optional expiration, condition, and GET |
| del | (...keys: string[]) => Effect<number> | Delete one or more keys; returns the number of keys removed |
| exists | (...keys: string[]) => Effect<number> | Check how many of the given keys exist |
| expire | (key: string, seconds: number) => Effect<number> | Set a timeout on a key in seconds |
| pexpire | (key: string, milliseconds: number) => Effect<number> | Set a timeout on a key in milliseconds |
| ttl | (key: string) => Effect<number> | Get remaining time-to-live in seconds (-1 if no expiry, -2 if missing) |
| pttl | (key: string) => Effect<number> | Get remaining time-to-live in milliseconds |
| incr | (key: string) => Effect<number> | Atomically increment the integer value of a key by 1 |
| decr | (key: string) => Effect<number> | Atomically decrement the integer value of a key by 1 |
| incrby | (key: string, increment: number) => Effect<number> | Increment the integer value of a key by a given amount |
| decrby | (key: string, decrement: number) => Effect<number> | Decrement the integer value of a key by a given amount |
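
The optional `KeyOptions` argument of `set` corresponds to the modifiers of the Redis SET command. The sketch below shows that correspondence at the wire level, assuming the `KeyOptions` shape from the Types Reference. `toSetArgs` is a hypothetical helper, not part of effect-redis, and how the library actually passes these options to node-redis is not shown in this README.

```typescript
// Shape copied from the KeyOptions interface in the Types Reference.
interface KeyOptions {
  readonly expiration?: {
    readonly mode: 'EX' | 'PX' | 'EXAT' | 'PXAT';
    readonly time: number;
  };
  readonly condition?: 'NX' | 'XX';
  readonly get?: boolean;
}

// Hypothetical helper: the raw SET argument list that a given set of options corresponds to.
function toSetArgs(
  key: string,
  value: string | number,
  options: KeyOptions = {}
): (string | number)[] {
  const args: (string | number)[] = ['SET', key, value];
  if (options.condition) args.push(options.condition); // NX or XX
  if (options.get) args.push('GET'); // reply with the previous value
  if (options.expiration) {
    args.push(options.expiration.mode, options.expiration.time);
  }
  return args;
}

const lockArgs = toSetArgs('lock:resource', 'owner-1', {
  condition: 'NX',
  expiration: { mode: 'EX', time: 60 },
});
```

This also explains the `'OK' | string | null` result type: Redis replies with null when an NX or XX condition is not met, and with the previous value (or null if there was none) when GET is requested.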
Hash
| Method | Signature | Description |
| :--- | :--- | :--- |
| hset | (key: string, field: string, value: RedisHashValue) => Effect<number> | Set a field in a hash |
| hget | (key: string, field: string) => Effect<string \| null> | Get the value of a hash field |
| hgetall | (key: string) => Effect<Record<string, string>> | Get all fields and values of a hash |
| hdel | (key: string, ...fields: string[]) => Effect<number> | Delete one or more hash fields |
| hexists | (key: string, field: string) => Effect<boolean> | Check if a hash field exists |
| hkeys | (key: string) => Effect<string[]> | Get all field names in a hash |
| hvals | (key: string) => Effect<string[]> | Get all values in a hash |
| hlen | (key: string) => Effect<number> | Get the number of fields in a hash |
List
| Method | Signature | Description |
| :--- | :--- | :--- |
| lpush | (key: string, ...values: RedisListValue[]) => Effect<number> | Prepend one or more values to a list |
| rpush | (key: string, ...values: RedisListValue[]) => Effect<number> | Append one or more values to a list |
| lpop | (key: string, count?: number) => Effect<string \| string[] \| null> | Remove and return element(s) from the head of a list |
| rpop | (key: string, count?: number) => Effect<string \| string[] \| null> | Remove and return element(s) from the tail of a list |
| lrange | (key: string, start: number, stop: number) => Effect<string[]> | Get a range of elements from a list |
| llen | (key: string) => Effect<number> | Get the length of a list |
| lrem | (key: string, count: number, element: RedisListValue) => Effect<number> | Remove elements matching a value from a list |
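
Because `lpop` and `rpop` return `string | string[] | null` depending on whether `count` was passed and whether the list exists, callers often want a single shape. A minimal sketch of such a normalisation step follows; `toElements` is a hypothetical helper, not something the library exports.

```typescript
// Result type of lpop/rpop as listed in the table above.
type PopResult = string | string[] | null;

// Normalise every variant to an array: null becomes [], a single string becomes [string].
function toElements(result: PopResult): string[] {
  if (result === null) return [];
  return typeof result === 'string' ? [result] : result;
}

const none = toElements(null);
const one = toElements('job-1');
const many = toElements(['job-1', 'job-2']);
```

With the Redis service this could be applied via Effect.map on the result of `redis.lpop(key, count)`.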
Set
| Method | Signature | Description |
| :--- | :--- | :--- |
| sadd | (key: string, ...members: RedisSetMember[]) => Effect<number> | Add one or more members to a set |
| srem | (key: string, ...members: RedisSetMember[]) => Effect<number> | Remove one or more members from a set |
| sismember | (key: string, member: RedisSetMember) => Effect<boolean> | Check if a value is a member of a set |
| smembers | (key: string) => Effect<string[]> | Get all members of a set |
| scard | (key: string) => Effect<number> | Get the number of members in a set |
Sorted Set
| Method | Signature | Description |
| :--- | :--- | :--- |
| zadd | (key: string, score: number, member: string, ...rest: (number \| string)[]) => Effect<number> | Add one or more members with scores to a sorted set |
| zrange | (key: string, start: number, stop: number, withScores?: boolean) => Effect<string[] \| { value: string; score: number }[]> | Get a range of members by index; optionally include scores |
| zrangebyscore | (key: string, min: number \| string, max: number \| string, withScores?: boolean) => Effect<string[] \| { value: string; score: number }[]> | Get members with scores within a given range |
| zscore | (key: string, member: string) => Effect<number \| null> | Get the score of a member in a sorted set |
| zrem | (key: string, ...members: string[]) => Effect<number> | Remove one or more members from a sorted set |
| zcard | (key: string) => Effect<number> | Get the number of members in a sorted set |
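
`zrange` and `zrangebyscore` return a union whose shape depends on `withScores`. The sketch below shows one way to narrow that union with a type guard; `ScoredMember`, `isScored`, and `memberNames` are hypothetical names introduced for this example.

```typescript
// Element shape returned when withScores is true, per the signatures above.
interface ScoredMember {
  value: string;
  score: number;
}

type RangeResult = string[] | ScoredMember[];

// Type guard: a non-empty result whose first element is not a string carries scores.
function isScored(result: RangeResult): result is ScoredMember[] {
  return result.length > 0 && typeof result[0] !== 'string';
}

// Extract just the member names regardless of which variant was returned.
function memberNames(result: RangeResult): string[] {
  return isScored(result) ? result.map((entry) => entry.value) : result;
}

const plain = memberNames(['alice', 'bob']);
const scored = memberNames([
  { value: 'alice', score: 10 },
  { value: 'bob', score: 20 },
]);
```

An empty result is treated as the plain variant, which is harmless because both variants are then an empty array.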
Admin & Utilities
| Method | Signature | Description |
| :--- | :--- | :--- |
| scan | (options?: ScanOptions) => Effect<ScanResult> | Incrementally iterate over keys with optional pattern and type filtering |
| ping | (message?: string) => Effect<string> | Test connection liveness; returns PONG or the provided message |
| dbsize | () => Effect<number> | Get the number of keys in the current database |
| flushdb | () => Effect<'OK'> | Remove all keys from the current database |
| flushall | () => Effect<'OK'> | Remove all keys from all databases |
| quit | () => Effect<'OK'> | Gracefully close the connection |
| disconnect | () => Effect<void> | Force-close the connection immediately |
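
`scan` returns one page of keys per call, so a full iteration has to feed the returned cursor back in until Redis reports cursor '0' again. The loop can be sketched as follows, using the `ScanOptions` and `ScanResult` shapes from the Types Reference. `makeFakeScan` is a hypothetical in-memory stand-in that returns plain values; the real `redis.scan` returns an Effect, so the same loop would be written inside `Effect.gen`.

```typescript
// Shapes taken from the Types Reference (the optional type filter is omitted here).
interface ScanOptions {
  readonly cursor?: string;
  readonly match?: string;
  readonly count?: number;
}

interface ScanResult {
  readonly cursor: string;
  readonly keys: readonly string[];
}

// Hypothetical stand-in for redis.scan: pages through a fixed key list.
function makeFakeScan(allKeys: readonly string[], pageSize: number) {
  return (options: ScanOptions = {}): ScanResult => {
    const start = Number(options.cursor ?? '0');
    const end = Math.min(start + pageSize, allKeys.length);
    // Like Redis, report cursor '0' once the iteration is complete.
    const cursor = end >= allKeys.length ? '0' : String(end);
    return { cursor, keys: allKeys.slice(start, end) };
  };
}

// Start at cursor '0' and keep scanning until the server hands '0' back.
function collectKeys(scan: (options: ScanOptions) => ScanResult): string[] {
  const keys: string[] = [];
  let cursor = '0';
  do {
    const page = scan({ cursor });
    keys.push(...page.keys);
    cursor = page.cursor;
  } while (cursor !== '0');
  return keys;
}

const found = collectKeys(makeFakeScan(['user:1', 'user:2', 'user:3'], 2));
```

Against a real server, SCAN may return the same key more than once during an iteration, so collecting into a Set is a reasonable precaution.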
Raw Access
| Method | Signature | Description |
| :--- | :--- | :--- |
| use | <T>(fn: (client: RedisClient) => T) => Effect<Awaited<T>> | Execute any operation on the raw node-redis client |
| execute | <A>(command: string, ...args: (string \| number \| Buffer)[]) => Effect<A> | Send a raw Redis command |
| multi | (commands: [string, ...(string \| number \| Buffer)[]][]) => Effect<unknown[]> | Execute a MULTI/EXEC transaction |
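
The `multi` method takes an array of command tuples, each consisting of a command name followed by its arguments. A small sketch of building such an array in a typed way is shown below; `Command` and `transferCommands` are hypothetical names, and the Buffer variant of the argument type is left out to keep the example dependency-free.

```typescript
// One queued command: the command name followed by its arguments.
type Command = [string, ...(string | number)[]];

// Hypothetical helper: move `amount` from one counter key to another in one transaction.
function transferCommands(from: string, to: string, amount: number): Command[] {
  return [
    ['DECRBY', from, amount],
    ['INCRBY', to, amount],
  ];
}

const commands = transferCommands('stock:warehouse', 'stock:store', 5);
```

The resulting array has the shape that `multi` is documented to accept, so it could be passed as `redis.multi(commands)`. Keep in mind that Redis transactions do not roll back: if one queued command fails at execution time, the others still take effect.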
RedisPubSub Service
Provides publish/subscribe messaging with separate connections for publish (shared) and subscribe (dedicated) to avoid blocking.
| Method | Signature | Description |
| :--- | :--- | :--- |
| publish | (channel: string, message: string) => Effect<void> | Publish a message to a channel |
| subscribe | (channel: string) => Effect<Stream<string>> | Subscribe to a channel; returns a continuous Stream of messages |
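
Since both `publish` and `subscribe` deal in plain strings, structured payloads have to be encoded by the publisher and decoded by the subscriber. The following is a minimal sketch of defensive decoding, assuming JSON payloads (an assumption; the library does not prescribe a message format). `parseMessage` is a hypothetical helper.

```typescript
// Outcome of decoding one message: either the parsed value or the raw text that failed.
type Parsed =
  | { ok: true; value: unknown }
  | { ok: false; raw: string };

// JSON.parse throws on malformed input, so wrap it and return a tagged result instead.
function parseMessage(raw: string): Parsed {
  try {
    return { ok: true, value: JSON.parse(raw) };
  } catch {
    return { ok: false, raw };
  }
}

const good = parseMessage('{"symbol":"ACME","price":12.5}');
const bad = parseMessage('not json');
```

In a subscription pipeline, a helper like this could be applied with Stream.map before any schema validation, so that a single malformed message does not terminate the stream.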
Connection Model
- Publish: reuses the shared RedisConnection if available
- Subscribe: always creates a dedicated connection (Redis requires a separate client for blocking subscribe operations)
RedisStream Service
Provides Redis Streams operations with separate producer/consumer connections. The producer reuses the shared connection; the consumer always gets a dedicated client for blocking reads.
| Method | Signature | Description |
| :--- | :--- | :--- |
| xadd | (key, id, message) => Effect<string> | Append an entry to a stream; use '*' for auto-generated IDs |
| xread | (key, id, options?) => Effect<StreamEntry[]> | Read entries from a stream (supports blocking) |
| xrange | (key, start, end, options?) => Effect<StreamEntry[]> | Read a range of entries ('-' to '+' for all) |
| subscribe | (key, options?) => Stream<StreamEntry> | Continuous polling stream (blocks and yields new entries) |
| xack | (key, group, ...ids) => Effect<number> | Acknowledge processed entries in a consumer group |
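
Entry IDs returned by these methods follow the Redis stream ID format `<milliseconds>-<sequence>`, which matters when tracking the last entry processed. The sketch below compares two such IDs; `parseStreamId` and `compareStreamIds` are hypothetical helpers, and the code assumes both parts fit into a JavaScript number and that special IDs such as '$', '*', '-', and '+' are handled elsewhere.

```typescript
// Split "<ms>-<seq>" into its numeric parts; a missing sequence is treated as 0.
function parseStreamId(id: string): [number, number] {
  const [ms, seq = '0'] = id.split('-');
  return [Number(ms), Number(seq)];
}

// Negative if a is older than b, positive if newer, 0 if the IDs are equal.
function compareStreamIds(a: string, b: string): number {
  const [aMs, aSeq] = parseStreamId(a);
  const [bMs, bSeq] = parseStreamId(b);
  return aMs !== bMs ? aMs - bMs : aSeq - bSeq;
}

const sameMillisecond = compareStreamIds('1234567890-0', '1234567890-1');
const laterMillisecond = compareStreamIds('1234567891-0', '1234567890-5');
```

A consumer could use such a comparison to skip entries it has already handled after a reconnect, or to pick the newest ID out of a batch before storing it as the resume point.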
StreamEntry
interface StreamEntry {
  id: RedisArgument; // e.g. "1234567890-0"
  data: Record<string, string>;
}
StreamSubscribeOptions
interface StreamSubscribeOptions {
  readonly id?: string; // Start ID (default: '$', i.e. new entries only)
  readonly block?: number; // Block time in ms (default: 5000)
  readonly count?: number; // Max entries per read
}
Error Model
Every operation can fail with RedisError, a discriminated union of three tagged
error types. All three extend Data.TaggedError, so they can be matched with
Effect.catchTag.
class RedisConnectionError extends Data.TaggedError('RedisConnectionError')<{
  readonly cause: unknown;
  readonly message: string;
}> {}
class RedisCommandError extends Data.TaggedError('RedisCommandError')<{
  readonly cause: unknown;
  readonly message: string;
  readonly command: string;
}> {}
class RedisGeneralError extends Data.TaggedError('RedisGeneralError')<{
  readonly cause: unknown;
  readonly message: string;
}> {}
type RedisError = RedisConnectionError | RedisCommandError | RedisGeneralError;
| Error | Tag | Fields | When |
| :--- | :--- | :--- | :--- |
| RedisConnectionError | 'RedisConnectionError' | cause, message | Connection failures (ECONNREFUSED, ETIMEDOUT, etc.) |
| RedisCommandError | 'RedisCommandError' | cause, message, command | Command execution failures (WRONGTYPE, NOAUTH, etc.) |
| RedisGeneralError | 'RedisGeneralError' | cause, message | All other Redis-related failures |
The toRedisError utility automatically classifies raw errors into the correct
type using error codes and message pattern matching.
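
The classification described above can be pictured roughly as follows. This is a sketch under stated assumptions, not the library's `toRedisError`: the function name `classify`, its `command` parameter, and the exact lists of codes and message prefixes are hypothetical, chosen only to match the examples in the table (ECONNREFUSED, ETIMEDOUT, WRONGTYPE, NOAUTH). Plain objects with a `_tag` field stand in for the Data.TaggedError classes.

```typescript
// Plain-object stand-ins for the three tagged error classes.
type Classified =
  | { _tag: 'RedisConnectionError'; message: string; cause: unknown }
  | { _tag: 'RedisCommandError'; message: string; cause: unknown; command: string }
  | { _tag: 'RedisGeneralError'; message: string; cause: unknown };

// Assumed lookup lists, based on the examples in the error table.
const CONNECTION_CODES = ['ECONNREFUSED', 'ETIMEDOUT'];
const COMMAND_PREFIXES = ['WRONGTYPE', 'NOAUTH'];

// Classify a raw failure by its error code first, then by its message prefix.
function classify(cause: unknown, command = 'unknown'): Classified {
  const err = cause as { code?: unknown; message?: unknown } | null | undefined;
  const code = typeof err?.code === 'string' ? err.code : '';
  const message = typeof err?.message === 'string' ? err.message : String(cause);
  if (CONNECTION_CODES.includes(code)) {
    return { _tag: 'RedisConnectionError', message, cause };
  }
  if (COMMAND_PREFIXES.some((prefix) => message.startsWith(prefix))) {
    return { _tag: 'RedisCommandError', message, cause, command };
  }
  return { _tag: 'RedisGeneralError', message, cause };
}

const refused = classify({ code: 'ECONNREFUSED', message: 'connect ECONNREFUSED 127.0.0.1:6379' });
const wrongType = classify(
  new Error('WRONGTYPE Operation against a key holding the wrong kind of value'),
  'GET'
);
const other = classify('boom');
```

Anything that matches neither list falls through to the general error, mirroring the "all other Redis-related failures" row of the table.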
import { Effect } from 'effect';
import { Redis } from 'effect-redis';
const program = Effect.gen(function* () {
  const redis = yield* Redis;
  yield* redis.get('my-key').pipe(
    Effect.catchTag('RedisConnectionError', (e) =>
      Effect.logError(`Connection lost: ${e.message}`)
    ),
    Effect.catchTag('RedisCommandError', (e) =>
      Effect.logError(`Command ${e.command} failed: ${e.message}`)
    )
  );
});
Types Reference
Value Types
| Type | Definition |
| :--- | :--- |
| RedisValue | string \| number \| Buffer |
| RedisHashValue | string \| number \| Buffer \| Record<string, string \| number \| Buffer> |
| RedisSetMember | string \| Buffer |
| RedisListValue | string \| number \| Buffer |
KeyOptions (for set)
interface KeyOptions {
  readonly expiration?: {
    readonly mode: 'EX' | 'PX' | 'EXAT' | 'PXAT';
    readonly time: number;
  };
  readonly condition?: 'NX' | 'XX';
  readonly get?: boolean; // When true, SET returns the old value
}
ScanOptions / ScanResult
interface ScanOptions {
  readonly cursor?: string;
  readonly match?: string; // Glob pattern (e.g. 'user:*')
  readonly count?: number;
  readonly type?: 'string' | 'list' | 'set' | 'zset' | 'hash' | 'stream';
}
interface ScanResult {
  readonly cursor: string;
  readonly keys: readonly string[];
}
RedisConfig
type RedisConfig =
  | { host: string; port: number; username?: string; password?: string;
      database?: number; ssl?: boolean; connectTimeout?: number;
      commandTimeout?: number; }
  | { url: string; connectTimeout?: number; commandTimeout?: number; };
Usage Examples
1. Retry with Exponential Backoff
import { Effect, Stream, Schedule, Duration } from 'effect';
import { RedisPubSub } from 'effect-redis';
const program = Effect.gen(function* () {
  const pubsub = yield* RedisPubSub;
  const subscription = yield* pubsub.subscribe('raw-data');
  yield* subscription.pipe(
    Stream.tap((msg) => Effect.log(`Received: ${msg}`)),
    // Retry a failed stream up to 5 times, with exponentially growing delays starting at 1 second
    Stream.retry(
      Schedule.exponential(Duration.seconds(1)).pipe(
        Schedule.intersect(Schedule.recurs(5))
      )
    ),
    Stream.runDrain
  );
});
2. Schema-Validated Stream Processing
import { Effect, Stream, Schema, pipe } from 'effect';
import { RedisPubSub } from 'effect-redis';
const TradeSchema = Schema.Struct({
  symbol: Schema.String,
  price: Schema.Number,
});
const program = Effect.gen(function* () {
  const pubsub = yield* RedisPubSub;
  const trades = yield* pubsub.subscribe('trades');
  yield* pipe(
    trades,
    // Note: JSON.parse throws on malformed input; that case is not handled by the schema step below
    Stream.map(JSON.parse),
    // Decode each payload; log a warning and drop entries that fail validation
    Stream.mapEffect((data) =>
      Schema.decodeUnknown(TradeSchema)(data).pipe(
        Effect.tapError((err) => Effect.logWarning(`Invalid trade: ${err}`)),
        Effect.option
      )
    ),
    // Keep only the successfully decoded trades
    Stream.filterMap((opt) => opt),
    Stream.tap((trade) =>
      Effect.log(`Valid trade: ${trade.symbol} @ ${trade.price}`)
    ),
    Stream.runDrain
  );
});
3. Raw Client & Transactions
import { Effect } from 'effect';
import { Redis } from 'effect-redis';
const program = Effect.gen(function* () {
  const redis = yield* Redis;
  // Queue two SET commands on the raw client and run them as one MULTI/EXEC transaction
  const results = yield* redis.use((client) =>
    client.multi().set('key1', 'val1').set('key2', 'val2').exec()
  );
  yield* Effect.log(`Transaction results: ${JSON.stringify(results)}`);
});
4. Continuous Stream Polling
import { Effect, Stream, Ref } from 'effect';
import { RedisStream } from 'effect-redis';
const program = Effect.gen(function* () {
  const stream = yield* RedisStream;
  // Track the ID of the last entry seen; '$' means new entries only
  const lastId = yield* Ref.make('$');
  const events = stream
    .subscribe('app-events', {
      id: yield* Ref.get(lastId),
      block: 5000,
    })
    .pipe(Stream.tap((entry) => Ref.set(lastId, String(entry.id))));
  yield* Stream.runForEach(events, (entry) =>
    Effect.log(`Event ${entry.id}: ${JSON.stringify(entry.data)}`)
  );
});
5. SET with Expiration and Conditions
import { Effect } from 'effect';
import { Redis } from 'effect-redis';
const program = Effect.gen(function* () {
  const redis = yield* Redis;
  // Set only if key does not exist, expire in 60 seconds
  yield* redis.set('lock:resource', 'owner-1', {
    condition: 'NX',
    expiration: { mode: 'EX', time: 60 },
  });
  // Set and return the previous value
  const oldValue = yield* redis.set('config:version', '2', { get: true });
});
Peer Dependencies
| Package | Version |
| :--- | :--- |
| redis | ^5.1.0 |
| typescript | ^5 |
| effect | (provided via workspace catalog) |
