@hiliosai/datasources
v1.2.1
Extensible datasource classes for building data access layers
Extensible datasource classes for building robust data access layers in microservices.
Installation
npm install @hiliosai/datasources
# or
yarn add @hiliosai/datasources
# or
bun add @hiliosai/datasources
Features
- 🎯 Type-safe: Full TypeScript support with proper type inference
- 🔌 Extensible: Easy to extend base classes for custom datasources
- 🚀 Performance: Built-in performance monitoring and caching
- 🛡️ Error Handling: Comprehensive error handling with proper context
- 📊 Pagination: Built-in pagination helpers
- 🔄 Transactions: Transaction support for Prisma datasources
- 💾 Caching: Redis-based caching with tags and invalidation
- 🔒 Multi-tenancy: Built-in tenant isolation support
Usage
PrismaDataSource
Create a Prisma-based datasource for your data models:
import {PrismaDataSource} from '@hiliosai/datasources';
import type {PrismaClient, User} from '@prisma/client';

// CreateUserInput is an application-defined input type (not shown here).
export class UserDatasource extends PrismaDataSource<PrismaClient> {
  async findById(id: string): Promise<User | null> {
    return this.executeQuery('findById', () =>
      this.prisma.user.findUnique({
        where: this.applyFilters({id})
      })
    );
  }

  async findByEmail(email: string): Promise<User | null> {
    return this.executeQuery('findByEmail', () =>
      this.prisma.user.findFirst({
        where: this.applyFilters({email})
      })
    );
  }

  async create(data: CreateUserInput): Promise<User> {
    return this.executeTransaction('createUser', async (tx) => {
      const user = await tx.user.create({
        data: this.applyFilters(data)
      });
      // Create related records in the same transaction
      await tx.profile.create({
        data: {userId: user.id}
      });
      return user;
    });
  }

  async findAll(page = 1, pageSize = 20) {
    return this.executePaginatedQuery(
      'findAll',
      // Count query: total number of matching records
      () => this.prisma.user.count({
        where: this.applyFilters()
      }),
      // Page query: receives the computed pagination arguments
      (pagination) => this.prisma.user.findMany({
        where: this.applyFilters(),
        ...pagination
      }),
      page,
      pageSize
    );
  }

  async softDeleteUser(id: string): Promise<User> {
    return this.softDelete('user', {id});
  }
}
// Usage in a Moleculer service
const userDatasource = new UserDatasource(ctx, {
  tenantId: ctx.meta.tenantId,
  softDelete: true
});
const user = await userDatasource.findById('123');
CacheDataSource
Create a cache-based datasource for high-performance data access:
import {CacheDataSource} from '@hiliosai/datasources';

export class SessionDatasource extends CacheDataSource {
  async getSession(sessionId: string) {
    return this.getOrSet(
      `session:${sessionId}`,
      async () => {
        // Fetch from the database if not cached.
        // fetchSessionFromDB is an application-defined method (not shown here).
        return await this.fetchSessionFromDB(sessionId);
      },
      {ttl: 3600} // Cache for 1 hour
    );
  }

  async setUserSession(userId: string, sessionData: any) {
    return this.set(
      `user-session:${userId}`,
      sessionData,
      {
        ttl: 7200, // 2 hours
        tags: ['user-sessions', `user:${userId}`]
      }
    );
  }

  async invalidateUserSessions(userId: string) {
    // Invalidate all sessions for a user using tags
    return this.invalidateByTag(`user:${userId}`);
  }

  async incrementLoginAttempts(email: string): Promise<number> {
    const key = `login-attempts:${email}`;
    const attempts = await this.increment(key);
    // Set expiration if this is the first attempt
    if (attempts === 1) {
      await this.expire(key, 900); // 15 minutes
    }
    return attempts;
  }
}
// Usage
const sessionDs = new SessionDatasource(ctx, {
  keyPrefix: 'myapp:',
  defaultTTL: 3600
});
const session = await sessionDs.getSession('session-123');
Combining PrismaDataSource with Caching
import {PrismaDataSource, CacheDataSource} from '@hiliosai/datasources';
import type {PrismaClient, User} from '@prisma/client';

// DataSourceContext, PrismaDataSourceConfig, and UpdateUserInput are assumed
// to be imported or defined elsewhere in your application.
export class CachedUserDatasource extends PrismaDataSource<PrismaClient> {
  private cache: CacheDataSource;

  constructor(context: DataSourceContext, config: PrismaDataSourceConfig = {}) {
    super(context, config);
    this.cache = new CacheDataSource(context, {
      keyPrefix: 'user:',
      defaultTTL: 300 // 5 minutes
    });
  }

  async findById(id: string): Promise<User | null> {
    // Cache-aside read: serve from cache, fall back to Prisma on a miss
    return this.cache.getOrSet(
      `id:${id}`,
      () => super.executeQuery('findById', () =>
        this.prisma.user.findUnique({
          where: {id}
        })
      ),
      {ttl: 600}
    );
  }

  async update(id: string, data: UpdateUserInput): Promise<User> {
    const user = await super.executeQuery('update', () =>
      this.prisma.user.update({
        where: {id},
        data
      })
    );
    // Invalidate the cached entry and anything tagged for this user
    await this.cache.delete(`id:${id}`);
    await this.cache.invalidateByTag(`user:${id}`);
    return user;
  }
}
API Reference
DataSource
Base class for all datasources.
Methods
- callAction(actionName, params?, opts?) - Call a Moleculer action
- emitEvent(eventName, payload?, opts?) - Emit an event
- log(level, message, meta?) - Log messages
- handleError(error, operation) - Handle errors uniformly
- measurePerformance(operation, fn) - Measure operation performance
- validate(data, schema?) - Validate input parameters
- dispose() - Clean up resources
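To make the idea behind a helper like measurePerformance(operation, fn) concrete, here is a minimal standalone sketch. It is not the library's implementation: `TimingRecord`, `timings`, and `measure` are hypothetical names, and the real method may log or report timings differently.

```typescript
// Hypothetical illustration only; not code from @hiliosai/datasources.
interface TimingRecord {
  operation: string;
  durationMs: number;
  ok: boolean;
}

// Collected timings; a real datasource would more likely log or emit them.
const timings: TimingRecord[] = [];

// Run fn, record how long it took, and pass its result (or error) through.
async function measure<T>(operation: string, fn: () => Promise<T>): Promise<T> {
  const start = Date.now();
  let ok = false;
  try {
    const result = await fn();
    ok = true;
    return result;
  } finally {
    // Runs whether fn resolved or threw, so failures are timed too.
    timings.push({operation, durationMs: Date.now() - start, ok});
  }
}
```

Wrapping a query as `measure('findById', () => query())` leaves the caller's control flow unchanged while recording one timing entry per call.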
PrismaDataSource
Extends DataSource for Prisma ORM integration.
Methods
- getTenantPrisma() - Get tenant-scoped Prisma client
- applyFilters(where?, options?) - Apply tenant and soft delete filters
- executeQuery(operation, queryFn) - Execute a Prisma query with error handling
- executeTransaction(operation, transactionFn) - Execute a transaction
- executePaginatedQuery(operation, countFn, queryFn, page?, pageSize?) - Execute paginated query
- executeBatch(items, batchSize, operation, processFn) - Batch operations
- softDelete(model, where) - Soft delete records
- restore(model, where) - Restore soft deleted records
- exists(model, where) - Check if record exists
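The sketch below illustrates, under stated assumptions, what tenant and soft-delete filtering and page-to-offset translation could look like. It is a standalone approximation, not the library's code: the `FilterConfig` and `Where` types, the `deletedAt` column name, and the `toPagination` helper are all hypothetical, and this `applyFilters` takes the config as an explicit argument rather than reading it from the datasource.

```typescript
// Hypothetical shapes; the real applyFilters and pagination helpers may differ.
interface FilterConfig {
  tenantId?: string;
  softDelete?: boolean;
  includeDeleted?: boolean;
}

type Where = Record<string, unknown>;

// Merge tenant and soft-delete conditions into a caller-supplied where clause.
function applyFilters(where: Where, config: FilterConfig): Where {
  const filtered: Where = {...where};
  if (config.tenantId !== undefined) {
    filtered.tenantId = config.tenantId;
  }
  if (config.softDelete && !config.includeDeleted) {
    // Assumes a nullable `deletedAt` column marks soft-deleted rows.
    filtered.deletedAt = null;
  }
  return filtered;
}

// Translate a 1-based page number into Prisma-style skip/take arguments.
function toPagination(page: number, pageSize: number): {skip: number; take: number} {
  const safePage = Math.max(1, Math.floor(page));
  const safeSize = Math.max(1, Math.floor(pageSize));
  return {skip: (safePage - 1) * safeSize, take: safeSize};
}
```

With these definitions, `toPagination(3, 20)` yields `{skip: 40, take: 20}`, which can be spread into a `findMany` call alongside the filtered `where` clause.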
CacheDataSource
Extends DataSource for caching capabilities.
Methods
- get(key, options?) - Get value from cache
- set(key, value, options?) - Set value in cache
- delete(key) - Delete from cache
- exists(key) - Check if key exists
- getOrSet(key, fetchFn, options?) - Cache-aside pattern
- increment(key, amount?) - Increment counter
- decrement(key, amount?) - Decrement counter
- expire(key, ttl) - Set expiration
- ttl(key) - Get remaining TTL
- clear(pattern?) - Clear cache by pattern
- invalidateByTag(tag) - Invalidate by tag
- mget(keys) - Batch get
- mset(entries, options?) - Batch set
- acquireLock(key, ttl?, retries?, retryDelay?) - Distributed lock
- releaseLock(key) - Release lock
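For readers unfamiliar with the cache-aside pattern and tag-based invalidation, the following in-memory sketch shows the general mechanics. It is only an approximation: `MemoryCache` and `CacheEntry` are hypothetical names, a `Map` stands in for Redis, and the actual CacheDataSource signatures (which take an options object) differ from the positional arguments used here.

```typescript
// In-memory stand-in for a Redis-backed cache; illustrative names only.
interface CacheEntry {
  value: unknown;
  expiresAt: number; // epoch milliseconds
}

class MemoryCache {
  private entries = new Map<string, CacheEntry>();
  private tags = new Map<string, Set<string>>();

  get<T>(key: string): T | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= Date.now()) {
      this.entries.delete(key); // lazily evict expired entries
      return undefined;
    }
    return entry.value as T;
  }

  set(key: string, value: unknown, ttlSeconds: number, tags: string[] = []): void {
    this.entries.set(key, {value, expiresAt: Date.now() + ttlSeconds * 1000});
    tags.forEach((tag) => {
      let keys = this.tags.get(tag);
      if (!keys) {
        keys = new Set<string>();
        this.tags.set(tag, keys);
      }
      keys.add(key);
    });
  }

  // Cache-aside: return the cached value, or fetch, store, and return it.
  async getOrSet<T>(
    key: string,
    fetchFn: () => Promise<T>,
    ttlSeconds: number,
    tags: string[] = []
  ): Promise<T> {
    const cached = this.get<T>(key);
    if (cached !== undefined) return cached;
    const fresh = await fetchFn();
    this.set(key, fresh, ttlSeconds, tags);
    return fresh;
  }

  // Delete every key registered under the tag; returns how many were removed.
  invalidateByTag(tag: string): number {
    const keys = this.tags.get(tag);
    if (!keys) return 0;
    let removed = 0;
    keys.forEach((key) => {
      if (this.entries.delete(key)) removed++;
    });
    this.tags.delete(tag);
    return removed;
  }
}
```

The tag index trades a little extra bookkeeping on every write for the ability to drop a whole group of related keys (for example, all sessions for one user) without scanning the keyspace.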
Configuration
PrismaDataSource Configuration
interface PrismaDataSourceConfig {
  prismaClient?: PrismaClient; // Custom Prisma client instance
  tenantId?: string;           // Tenant ID for multi-tenancy
  softDelete?: boolean;        // Enable soft delete
  includeDeleted?: boolean;    // Include soft deleted records
  name?: string;               // Datasource name for logging
  debug?: boolean;             // Enable debug logging
}
CacheDataSource Configuration
interface CacheDataSourceConfig {
  redis?: Redis;         // Redis client instance
  keyPrefix?: string;    // Key prefix (default: 'cache:')
  defaultTTL?: number;   // Default TTL in seconds (default: 3600)
  json?: boolean;        // Auto JSON serialization (default: true)
  compression?: boolean; // Enable compression (future feature)
  name?: string;         // Datasource name for logging
  debug?: boolean;       // Enable debug logging
}
Integration with @hiliosai/core
For production applications, use the PrismaClientFactory from @hiliosai/core for optimal connection management:
npm install @hiliosai/core
import { PrismaDataSource } from '@hiliosai/datasources';
// Automatically uses PrismaClientFactory for best practices
export class UserDataSource extends PrismaDataSource<PrismaClient> {
  constructor(context: DataSourceContext) {
    super(context, {
      useClientFactory: true, // Use factory pattern (default)
      tenantId: context.meta?.tenantId,
      softDelete: true,
      clientConfig: {
        logging: process.env.NODE_ENV === 'development',
        maxConnections: 10
      }
    });
  }
}
The factory provides:
- ✅ Singleton pattern to prevent connection pool multiplication
- ✅ Multi-tenant client management with automatic cleanup
- ✅ Hot reload compatibility for development
- ✅ Graceful shutdown handling
- ✅ Memory leak prevention
- ✅ Connection monitoring and health checks
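As a rough picture of how a per-tenant singleton factory avoids multiplying connection pools, consider the sketch below. Everything in it is hypothetical: `FakeClient` stands in for a Prisma client, and `ClientFactory` is not the PrismaClientFactory from @hiliosai/core, whose actual API is not documented here.

```typescript
// Hypothetical sketch; none of this is @hiliosai/core code.
class FakeClient {
  connected = true;
  constructor(readonly tenantId: string) {}
  disconnect(): void {
    this.connected = false;
  }
}

class ClientFactory {
  private static clients = new Map<string, FakeClient>();

  // Return the cached client for a tenant, creating it on first use.
  static get(tenantId: string): FakeClient {
    let client = ClientFactory.clients.get(tenantId);
    if (!client) {
      client = new FakeClient(tenantId);
      ClientFactory.clients.set(tenantId, client);
    }
    return client;
  }

  // Disconnect and forget every client, for example on graceful shutdown.
  static shutdown(): number {
    let closed = 0;
    ClientFactory.clients.forEach((client) => {
      client.disconnect();
      closed++;
    });
    ClientFactory.clients.clear();
    return closed;
  }
}
```

Because every caller for a given tenant receives the same instance, repeated datasource construction does not open additional pools, and a single shutdown hook can close them all.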
Moleculer Integration
Use the PrismaFactoryMixin from @hiliosai/core in your services:
import { PrismaFactoryMixin } from '@hiliosai/core';
export default {
  name: 'users',
  mixins: [PrismaFactoryMixin()],
  actions: {
    async create(ctx) {
      // this.prisma is automatically available
      return this.executeDbOperation('createUser', (client) =>
        client.user.create({ data: ctx.params })
      );
    },
    async health(ctx) {
      return this.callAction('$prisma.health', {}, { meta: ctx.meta });
    }
  }
};
Best Practices
- Use PrismaClientFactory: Always use @hiliosai/core for production applications
- Extend, don't modify: Create your own datasource classes by extending the base classes
- Use transactions: For operations that modify multiple records, use transactions
- Cache strategically: Cache frequently accessed, rarely changed data
- Handle errors: Always use try-catch or the built-in error handlers
- Monitor performance: Use the built-in performance monitoring
- Tenant isolation: Always configure tenantId for multi-tenant applications
- Resource cleanup: The factory handles cleanup automatically, but call dispose() for manual clients
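The resource cleanup advice can be sketched as a small try/finally wrapper. This is an illustration under assumptions: `HasDispose` and `withDatasource` are hypothetical names, and the return type of the library's dispose() is assumed to be either void or a promise.

```typescript
// Hypothetical helper; assumes dispose() may be synchronous or asynchronous.
interface HasDispose {
  dispose(): void | Promise<void>;
}

// Run work against a datasource and always dispose it afterwards,
// even when work throws.
async function withDatasource<D extends HasDispose, T>(
  ds: D,
  work: (ds: D) => Promise<T>
): Promise<T> {
  try {
    return await work(ds);
  } finally {
    await ds.dispose();
  }
}
```

Scoping a manually created datasource this way keeps the cleanup call next to the construction site, so an early return or thrown error cannot skip it.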
License
MIT
