piper-utils
v1.1.55
Utility library for Piper
Piper Utils
Production-ready utilities for building AWS Lambda microservices with Cognito authentication and Sequelize ORM. Simplifies common patterns for API Gateway responses, database queries, access control, and AWS service integrations.
Overview
Piper Utils provides battle-tested utilities extracted from production microservices, focusing on:
- Lambda Integration: Request/response helpers for API Gateway
- Cognito Auth: User context extraction and access control
- Sequelize Helpers: Query string to ORM translation
- AWS Services: S3 and SNS utilities
- Event Management: S3 bucket watching and file processing
- Error Handling: Consistent error responses
- Audit Trail: Automatic change tracking
Installation
npm install piper-utils
Core Concepts
Import Everything You Need
import {
// Authentication & Access
accessRightsUtils,
checkModule,
checkWriteAccess,
getCurrentUser,
userDefaultBid,
// Database Queries
createFilters,
createSort,
createIncludes,
defaultFilters,
findAll,
// Request/Response
parseBody,
success,
failure,
// Event Management
watchBucket,
handleFile,
handleEvents,
publishEvents,
// AWS Services
S3Utils,
SNSUtils
} from 'piper-utils';
Real-World Examples
Complete Lambda Handler
A production-ready Lambda function with authentication, validation, and database queries:
import {
accessRightsUtils,
checkModule,
createFilters,
createSort,
failure,
findAll,
success
} from 'piper-utils';
import Customer from './models/customer';
export async function getCustomers(event) {
try {
// 1. Check module permissions
checkModule('customer', event);
// 2. Get user's allowed business IDs from Cognito
const businessIds = accessRightsUtils(event, { useCognitoBid: true });
// 3. Parse query parameters
const queryParams = event.queryStringParameters || {};
// 4. Build Sequelize filters from query string
// Example: ?status=Active&createdAt[gte]=2024-01-01&sort=-lastOrderDate&limit=20
const where = createFilters(event, customerFilter); // customerFilter is a config built with defaultFilters() (see Database Query Building)
const order = createSort(event, customerSort);
// 5. Add access control to query
where.businessId = businessIds;
// 6. Execute query
const customers = await findAll(Customer, {
where,
order,
limit: parseInt(queryParams.limit || '10', 10),
offset: parseInt(queryParams.offset || '0', 10)
});
// 7. Return standardized response
return success(customers);
} catch (err) {
// Automatic error formatting
return failure(err);
}
}
Authentication & Access Control
Extract user context and enforce permissions:
import {
getCurrentUser,
checkWriteAccess,
accessRightsUtils,
userDefaultBid,
parseBody,
success,
failure
} from 'piper-utils';
import Customer from './models/customer';
export async function updateCustomer(event) {
try {
// Get user from Cognito claims
const user = getCurrentUser(event);
// user = { id, username, email, groups, jwt }
// Verify write permissions (throws if unauthorized)
const businessId = checkWriteAccess(event);
// Get all business IDs user can access
const allowedBusinessIds = accessRightsUtils(event, {
useCognitoBid: true
});
// Get user's default business
const defaultBid = userDefaultBid(event);
// Parse request body
const updates = parseBody(event);
// Secure query with access control
const customer = await Customer.findOne({
where: {
id: event.pathParameters.id,
businessId: allowedBusinessIds // Auto-filtered by permissions
}
});
if (!customer) {
throw { code: 'NOT_FOUND', statusCode: 404 };
}
// Update with audit trail
customer.set({
...updates,
updatedBy: user.id,
updatedAt: new Date()
});
await customer.save();
return success(customer);
} catch (e) {
return failure(e);
}
}
Database Query Building
Convert API query strings to Sequelize queries automatically:
import {
createFilters,
createSort,
defaultFilters
} from 'piper-utils';
import { Op } from 'sequelize';
// 1. Define your model schema
const orderSchema = {
orderNumber: { type: db.STRING },
status: { type: db.STRING },
total: { type: db.DECIMAL },
customerId: { type: db.INTEGER },
shippingMethod: { type: db.STRING },
createdAt: { type: db.DATE },
businessId: { type: db.STRING }
};
// 2. Create filter configuration with relations
export const orderFilter = defaultFilters(orderSchema, {
customer: customerSchema, // Enable customer.* filtering
items: orderItemSchema // Enable items.* filtering
});
// 3. Use in Lambda handler
export async function searchOrders(event) {
// Query: /orders?status=Shipped&total[gte]=100&[email protected]&sort=-createdAt
const where = createFilters(event, orderFilter);
// Produces: {
// status: 'Shipped',
// total: { [Op.gte]: 100 },
// '$customer.email$': '[email protected]'
// }
const order = createSort(event, orderFilter);
// Produces: [['createdAt', 'DESC']]
const orders = await Order.findAll({
where,
order,
include: [
{ model: Customer, as: 'customer' },
{ model: OrderItem, as: 'items' }
]
});
return success(orders);
}
Transaction Management
Handle complex operations with automatic rollback:
import {
parseBody,
getCurrentUser,
checkWriteAccess,
success,
failure,
SNSUtils
} from 'piper-utils';
import db from './db'; // your configured Sequelize connection helper
export async function createOrder(event) {
const t = await db.getConnection().transaction();
try {
const user = getCurrentUser(event);
const businessId = checkWriteAccess(event);
const body = parseBody(event);
// Validate input (orderSchema here is a Joi-style validation schema, not the Sequelize model schema)
await orderSchema.validateAsync(body);
// Create order in transaction
const order = await Order.create({
...body,
businessId,
status: 'Pending',
createdBy: user.id
}, { transaction: t });
// Create order items
const items = await Promise.all(
body.items.map(item =>
OrderItem.create({
...item,
orderId: order.id
}, { transaction: t })
)
);
// Update inventory
for (const item of body.items) {
const inventory = await Inventory.findOne({
where: { partId: item.partId },
transaction: t
});
if (!inventory || inventory.quantity < item.quantity) {
throw {
code: 'INSUFFICIENT_INVENTORY',
statusCode: 400,
partId: item.partId
};
}
inventory.quantity -= item.quantity;
await inventory.save({ transaction: t });
}
// Send notification
await SNSUtils.publish({
topicArn: process.env.ORDER_TOPIC,
message: {
type: 'ORDER_CREATED',
orderId: order.id,
customerId: order.customerId,
total: order.total
}
});
await t.commit();
return success({ order, items });
} catch (e) {
await t.rollback();
return failure(e);
}
}
Error Handling
Consistent error responses with custom codes:
import { parseBody, checkWriteAccess, success, failure } from 'piper-utils';
// Define your error catalog
const errorList = {
notFound: {
code: 'NOT_FOUND',
statusCode: 404,
message: 'Resource not found'
},
customerNotFound: {
code: 'CUSTOMER_NOT_FOUND',
statusCode: 404,
message: 'Customer does not exist'
},
insufficientInventory: {
code: 'INSUFFICIENT_INVENTORY',
statusCode: 400,
message: 'Not enough inventory to fulfill order'
},
duplicateEmail: {
code: 'DUPLICATE_EMAIL',
statusCode: 409,
message: 'Email address already exists'
}
};
export async function createCustomer(event) {
try {
const body = parseBody(event);
const businessId = checkWriteAccess(event);
// Check for duplicate email
const existing = await Customer.findOne({
where: {
email: body.email,
businessId
}
});
if (existing) {
throw errorList.duplicateEmail;
}
// Create customer
const customer = await Customer.create({
...body,
businessId,
status: 'Active'
});
return success(customer);
} catch (e) {
// failure() automatically formats based on error structure
return failure(e);
// Returns: {
// statusCode: 409,
// body: JSON.stringify({
// code: 'DUPLICATE_EMAIL',
// message: 'Email address already exists'
// })
// }
}
}
Event Manager
The Event Manager module provides utilities for watching S3 buckets and processing file events, perfect for building data ingestion pipelines.
watchBucket
Monitor an S3 bucket for new files and trigger processing:
import { watchBucket } from 'piper-utils';
// Basic usage - watch for new files
watchBucket({
bucket: process.env.AWS_S3_BUCKET,
onObjectCreated: async (event) => {
console.log('New file detected:', event.s3.object.key);
// Process the file
}
});
// Lambda function triggered by S3 events
import { handleEvents } from 'piper-utils';
export async function s3EventHandler(event) {
// Process S3 event records
for (const record of event.Records) {
if (record.eventName.startsWith('ObjectCreated')) {
await handleEvents(record);
}
}
return { statusCode: 200 };
}
handleFile
Process individual files from S3:
import { handleFile, S3Utils } from 'piper-utils';
export async function processUploadedFile(bucket, key) {
try {
// handleFile retrieves and processes the file
const result = await handleFile({
bucket,
key,
processor: async (fileContent) => {
// Custom processing logic
const data = JSON.parse(fileContent);
// Transform data
const transformed = data.map(item => ({
...item,
processedAt: new Date()
}));
return transformed;
}
});
return result;
} catch (e) {
console.error('File processing failed:', e);
throw e;
}
}
handleEvents
Orchestrate batch event processing:
import { handleEvents, success, failure } from 'piper-utils';
// Lambda handler for batch processing
export async function batchProcessor(event) {
try {
// Process multiple S3 events
const results = await handleEvents(event, {
parallel: true, // Process files in parallel
maxConcurrency: 5,
onError: (error, record) => {
// Custom error handling
console.error(`Failed to process ${record.s3.object.key}:`, error);
}
});
return success({
processed: results.length,
results
});
} catch (e) {
return failure(e);
}
}
publishEvents
Publish processed events to SNS:
import { publishEvents } from 'piper-utils';
export async function processAndPublish(events) {
try {
// Process events
const processedEvents = events.map(event => ({
id: event.id,
type: 'FILE_PROCESSED',
timestamp: new Date(),
data: event
}));
// Publish to SNS
await publishEvents({
topicArn: process.env.EVENT_TOPIC,
events: processedEvents,
batchSize: 10 // Send in batches of 10
});
return { published: processedEvents.length };
} catch (e) {
console.error('Failed to publish events:', e);
throw e;
}
}
Complete File Ingestion Pipeline Example
Here's a complete example of a file ingestion pipeline using all event manager utilities:
import {
watchBucket,
handleFile,
handleEvents,
publishEvents,
success,
failure
} from 'piper-utils';
// Lambda function for CSV file ingestion
export async function csvIngestionPipeline(event) {
try {
// 1. Set up bucket watching (for local development)
if (process.env.IS_LOCAL) {
watchBucket({
bucket: process.env.DATA_BUCKET,
prefix: 'uploads/',
suffix: '.csv',
onObjectCreated: async (s3Event) => {
await processCSVFile(s3Event);
}
});
}
// 2. Handle S3 event (for Lambda)
const results = await handleEvents(event, {
processor: processCSVFile
});
return success({
processed: results.length,
files: results
});
} catch (e) {
return failure(e);
}
}
async function processCSVFile(s3Event) {
const bucket = s3Event.s3.bucket.name;
const key = s3Event.s3.object.key;
// 3. Process the file
const data = await handleFile({
bucket,
key,
processor: async (content) => {
// Parse CSV (parseCSV and validateRow are app-specific helpers, not part of piper-utils)
const rows = parseCSV(content);
// Keep only rows that pass validation
const validRows = rows.filter(row => validateRow(row));
// Save to database
await Customer.bulkCreate(validRows);
return {
total: rows.length,
valid: validRows.length,
invalid: rows.length - validRows.length
};
}
});
// 4. Publish completion event
await publishEvents({
topicArn: process.env.INGESTION_TOPIC,
events: [{
type: 'CSV_INGESTED',
bucket,
key,
stats: data,
timestamp: new Date()
}]
});
return data;
}
S3 Event Configuration
To use these utilities with Lambda, configure your S3 bucket to trigger your Lambda function:
Serverless Framework:
functions:
fileProcessor:
handler: src/handlers/fileProcessor.handler
events:
- s3:
bucket: ${self:custom.dataBucket}
event: s3:ObjectCreated:*
rules:
- prefix: uploads/
- suffix: .csv
AWS CDK:
import * as s3 from '@aws-cdk/aws-s3';
import * as lambda from '@aws-cdk/aws-lambda';
import * as s3n from '@aws-cdk/aws-s3-notifications';
const bucket = new s3.Bucket(this, 'DataBucket');
const processorFunction = new lambda.Function(this, 'Processor', {
// ... function config
});
bucket.addEventNotification(
s3.EventType.OBJECT_CREATED,
new s3n.LambdaDestination(processorFunction),
{ prefix: 'uploads/', suffix: '.csv' }
);
API Reference
Authentication Functions
- getCurrentUser(event) - Extract user from Cognito claims
- checkWriteAccess(event) - Verify write permissions, returns businessId
- checkModule(moduleName, event) - Check module access permissions
- accessRightsUtils(event, options) - Get user's accessible business IDs
- userDefaultBid(event) - Get user's default business ID
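These helpers read the Cognito claims that API Gateway attaches to the Lambda event under `requestContext.authorizer.claims`. As a rough, hypothetical model of what getCurrentUser does (this is not the library's actual implementation, and the returned field names are assumptions):

```javascript
// Illustrative only: a minimal sketch of reading Cognito claims off an
// API Gateway proxy event, modeling what getCurrentUser likely returns.
function extractUser(event) {
  const claims =
    (event.requestContext &&
      event.requestContext.authorizer &&
      event.requestContext.authorizer.claims) || {};
  return {
    id: claims.sub,
    username: claims['cognito:username'],
    email: claims.email,
    // Cognito delivers groups as a comma-separated string in the claims
    groups: (claims['cognito:groups'] || '').split(',').filter(Boolean)
  };
}

// Example event, shaped like API Gateway with a Cognito authorizer
const event = {
  requestContext: {
    authorizer: {
      claims: {
        sub: 'abc-123',
        'cognito:username': 'jdoe',
        email: 'jdoe@example.com',
        'cognito:groups': 'admins,editors'
      }
    }
  }
};
```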
Database Functions
- createFilters(event, filterConfig) - Convert query params to WHERE clause
- createSort(event, sortConfig) - Convert sort param to ORDER BY
- createIncludes(event) - Build include array for relations
- defaultFilters(schema, relations) - Create filter configuration
- findAll(Model, options) - Execute findAll with options
Request/Response Functions
- parseBody(event) - Parse JSON request body
- success(data, options) - Format success response
- failure(error, options) - Format error response
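For reference, the API Gateway proxy-integration envelope that success() and failure() return can be modeled as below, based on the failure() output shown in the Error Handling section. The headers and default status codes here are assumptions, not the library's actual code:

```javascript
// A sketch of the Lambda proxy response shape these helpers appear to
// produce; not the library's implementation.
function successResponse(data, statusCode = 200) {
  return {
    statusCode,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(data)
  };
}

function failureResponse(err) {
  return {
    statusCode: err.statusCode || 500,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      code: err.code || 'INTERNAL_ERROR',
      message: err.message
    })
  };
}
```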
Event Manager Functions
- watchBucket(options) - Monitor S3 bucket for new files
  - bucket - S3 bucket name
  - prefix - Optional key prefix filter
  - suffix - Optional key suffix filter
  - onObjectCreated - Callback for new objects
- handleFile(options) - Process a single file from S3
  - bucket - S3 bucket name
  - key - Object key
  - processor - Async function to process file content
- handleEvents(event, options) - Process batch of S3 events
  - event - S3 event from Lambda
  - parallel - Process files in parallel (default: false)
  - maxConcurrency - Max parallel operations
  - processor - Custom processor function
  - onError - Error handler callback
- publishEvents(options) - Publish events to SNS
  - topicArn - SNS topic ARN
  - events - Array of events to publish
  - batchSize - Events per SNS message
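The batchSize option splits the events array into groups before publishing. As an illustration of that batching behavior only (this is not the library's code and it does not call SNS):

```javascript
// Split an array of events into batches of at most batchSize,
// as publishEvents presumably does before each SNS publish call.
function chunk(events, batchSize) {
  const batches = [];
  for (let i = 0; i < events.length; i += batchSize) {
    batches.push(events.slice(i, i + batchSize));
  }
  return batches;
}

// 25 events with batchSize 10 → three publishes of 10, 10, and 5 events
```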
Query String Parameters
Supported operators in query strings:
- ?field=value - Exact match
- ?field[gte]=100 - Greater than or equal
- ?field[gt]=100 - Greater than
- ?field[lte]=100 - Less than or equal
- ?field[lt]=100 - Less than
- ?field[ne]=value - Not equal
- ?field[in]=value1,value2 - In array
- ?field[like]=%pattern% - SQL LIKE
- ?sort=-field - Sort descending (no prefix for ascending)
- ?limit=20&offset=40 - Pagination
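To make the translation concrete, here is a dependency-free sketch of how these operators map onto a where clause. Note the differences from the real library: createFilters works from the Lambda event plus a filter config, and produces Sequelize Op symbols (e.g. Op.gte) rather than the plain string keys used here:

```javascript
// Illustrative model of the operator translation; not createFilters itself.
const OPS = new Set(['gte', 'gt', 'lte', 'lt', 'ne', 'in', 'like']);

function toWhere(queryParams) {
  const where = {};
  for (const [key, raw] of Object.entries(queryParams)) {
    // Match keys of the form field[op], e.g. "total[gte]"
    const m = key.match(/^(\w+)\[(\w+)\]$/);
    if (m && OPS.has(m[2])) {
      const value = m[2] === 'in' ? raw.split(',') : raw;
      where[m[1]] = { ...(where[m[1]] || {}), [m[2]]: value };
    } else {
      where[key] = raw; // bare key: exact match
    }
  }
  return where;
}

// ?status=Active&total[gte]=100&id[in]=1,2
// → { status: 'Active', total: { gte: '100' }, id: { in: ['1', '2'] } }
```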
AWS Service Utilities
S3Utils
const s3 = new S3Utils();
await s3.getObject({ Bucket, Key });
await s3.putObject({ Bucket, Key, Body });
SNSUtils
await SNSUtils.publish({
topicArn: 'arn:aws:sns:...',
message: { type: 'EVENT', data: {} },
attributes: { source: 'order-service' }
);
Testing
Run tests with Jasmine:
npm test
Contributing
- Fork the repository
- Create your feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
License
MIT License - Copyright (c) Piper
