@othree.io/awsome
v5.0.0
Functional TypeScript wrappers for AWS SDK v3 services with dependency injection and Optional-based error handling
Functional TypeScript wrappers for AWS SDK v3 services. Every function follows a curried deps pattern for dependency injection, returns Optional<T> for safe error handling, and is fully testable in isolation.
Install
npm install @othree.io/awsome
AWS SDK clients are peer dependencies; install only the ones you need:
npm install @aws-sdk/client-dynamodb @aws-sdk/util-dynamodb @othree.io/optional
Services
| Module | AWS Service | Key Functions |
|--------|------------|---------------|
| dynamo | DynamoDB | upsert, query, getBy, filterBy, getBatchBy, deleteBy, getAll |
| s3 | S3 | get, put, list, deleteObject, getPublicUrl |
| sqs | SQS | send, sendBatch, createQueue, deleteQueue, receiveMessages, getApproximateMessageCount, changeMessageVisibilityTimeout, getQueueUrl |
| sns | SNS | send, sendAll, subscribeSqs, unsubscribe |
| lambda | Lambda | invoke, query, command, request, unwrap |
| secretsManager | Secrets Manager | get, put |
| ses | SES | sendMimeEmail |
| kms | KMS | sign, getPublicKey |
| cloudwatch | CloudWatch | countFunctionInvocation, incrementFunctionInvocationCount, countFetchStatus |
| apigateway | API Gateway | getAllApiKeysTags |
| batch | Batch | submitJob, getJobStatus |
Usage
Every function takes a deps object as its first curried parameter containing the AWS client and any configuration. All results are wrapped in Optional<T> from @othree.io/optional.
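The curried deps pattern can be sketched without any AWS dependency. The example below is a minimal illustration under stated assumptions, not the library's implementation: `Maybe`, `attempt`, `Store`, and `saveUser` are hypothetical names, and `Maybe` is only a small stand-in for Optional<T> that exposes the `isPresent`, `isEmpty`, `get`, `getError`, `map`, and `orElse` members used elsewhere in this README. It is synchronous for brevity, whereas the library's functions are awaited.

```typescript
// Hypothetical stand-in for Optional<T>; the real type comes from @othree.io/optional.
type Maybe<T> = {
  isPresent: boolean
  isEmpty: boolean
  get: () => T
  getError: () => Error | undefined
  map: <U>(fn: (value: T) => U) => Maybe<U>
  orElse: (fallback: T) => T
}

const present = <T>(value: T): Maybe<T> => ({
  isPresent: true,
  isEmpty: false,
  get: () => value,
  getError: () => undefined,
  map: <U>(fn: (v: T) => U): Maybe<U> => attempt(() => fn(value)),
  orElse: () => value
})

const empty = <T>(error: Error): Maybe<T> => ({
  isPresent: false,
  isEmpty: true,
  get: () => { throw error },
  getError: () => error,
  map: <U>(): Maybe<U> => empty<U>(error),
  orElse: (fallback: T) => fallback
})

// Runs fn and captures any thrown error inside the Maybe instead of propagating it.
const attempt = <T>(fn: () => T): Maybe<T> => {
  try {
    return present(fn())
  } catch (e) {
    return empty<T>(e instanceof Error ? e : new Error(String(e)))
  }
}

// Hypothetical dependency standing in for an AWS client.
type Store = { put: (key: string, value: string) => void }
type SaveDeps = { store: Store; configuration: { prefix: string } }
type User = { id: string; name: string }

// Curried: dependencies first, input second.
const saveUser = (deps: SaveDeps) => (user: User): Maybe<User> =>
  attempt(() => {
    deps.store.put(`${deps.configuration.prefix}/${user.id}`, JSON.stringify(user))
    return user
  })

// In-memory store for the happy path, throwing store for the failure path.
const saved = new Map<string, string>()
const ok = saveUser({
  store: { put: (k, v) => { saved.set(k, v) } },
  configuration: { prefix: 'users' }
})({ id: '123', name: 'Alice' })

const failed = saveUser({
  store: { put: () => { throw new Error('store unavailable') } },
  configuration: { prefix: 'users' }
})({ id: '123', name: 'Alice' })

console.log(ok.map(u => u.name).orElse('unknown'))     // Alice
console.log(failed.map(u => u.name).orElse('unknown')) // unknown
console.log(failed.getError()?.message)                // store unavailable
```

Because the dependencies are plain arguments, the failure path is exercised by passing a different object rather than by mocking a module import.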
DynamoDB
import { dynamo } from '@othree.io/awsome'
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
const deps = {
dynamoDb: new DynamoDBClient({}),
configuration: { TableName: 'Users' }
}
// Upsert an item
const result = await dynamo.upsert(deps)({ id: '123', name: 'Alice' })
result.map(user => console.log('Saved:', user.name))
// Query by keys
const users = await dynamo.query(deps)({ organizationId: 'org-1' })
users.map(items => items.forEach(u => console.log(u.name)))
// Query with filters
const filtered = await dynamo.query(deps)(
{ organizationId: 'org-1' },
{ status: 'active' }
)
// Batch get
const batch = await dynamo.getBatchBy(deps)([
{ id: '1' },
{ id: '2' },
{ id: '3' }
])
// Delete
await dynamo.deleteBy(deps)({ id: '123' })
// Scan all items
const all = await dynamo.getAll(deps)()
SQS
import { sqs } from '@othree.io/awsome'
import { SQSClient } from '@aws-sdk/client-sqs'
const client = new SQSClient({})
// Send a message
await sqs.send({
client,
configuration: { QueueUrl: 'https://sqs.../MyQueue' }
})({ orderId: '123', total: 49.99 })
// Send with FIFO options
await sqs.send({
client,
configuration: { QueueUrl: 'https://sqs.../MyQueue.fifo' },
withMessageGroupId: (order) => order.customerId,
withDeduplicationId: (order) => order.orderId,
delay: 5
})({ orderId: '123', customerId: 'cust-1', total: 49.99 })
// Send a batch
await sqs.sendBatch({
client,
configuration: { QueueUrl: 'https://sqs.../MyQueue' },
withMessageId: (order) => order.orderId
})(orders)
// Receive and transform messages
const messages = await sqs.receiveMessages({
client,
configuration: {
QueueUrl: 'https://sqs.../MyQueue',
maxNumberOfMessages: 10,
waitTimeSeconds: 20,
autoAcknowledgeMessages: true
},
transformMessage: (msg) => JSON.parse(msg.Body!)
})()
// Create a FIFO queue
const queue = await sqs.createQueue({ client })({
queueName: 'OrderEvents.fifo',
fifo: {
contentBasedDeduplication: true,
deduplicationScope: 'messageGroup',
throughputLimit: 'perMessageGroupId'
}
})
// Convert ARN to URL (returns Optional)
const maybeUrl = sqs.getQueueUrl({
queueArn: 'arn:aws:sqs:us-west-2:123456:MyQueue'
})
maybeUrl.map(({ queueUrl }) => console.log(queueUrl))
SNS
import { sns } from '@othree.io/awsome'
import { SNSClient } from '@aws-sdk/client-sns'
import { SQSClient } from '@aws-sdk/client-sqs' // needed for subscribeSqs
const client = new SNSClient({})
const configuration = {
TopicArn: 'arn:aws:sns:us-west-2:123456:OrderEvents',
BatchSize: 10,
Parallelism: 5
}
// Publish a single message
await sns.send({ client, configuration })({ orderId: '123' })
// Publish with attributes
await sns.send({
client,
configuration,
withMessageGroupId: (order) => order.customerId,
withAttributes: (order) => ({
eventType: { DataType: 'String', StringValue: 'OrderCreated' }
})
})({ orderId: '123', customerId: 'cust-1' })
// Publish a batch with parallelism control
import { fragment } from '@othree.io/awsome/lib/utils'
import { mapLimit } from 'async'
await sns.sendAll({
client,
configuration,
fragment,
mapLimit,
withId: (order) => order.orderId
})(orders)
// Subscribe an SQS queue to the topic
await sns.subscribeSqs({
snsClient: new SNSClient({}),
sqsClient: new SQSClient({}),
configuration
})({
queueArn: 'arn:aws:sqs:us-west-2:123456:OrderQueue',
queueUrl: 'https://sqs.../OrderQueue'
})
Lambda
import { lambda } from '@othree.io/awsome'
import { LambdaClient } from '@aws-sdk/client-lambda'
const deps = {
lambda: new LambdaClient({}),
functionName: 'ProcessOrder'
}
// Direct invocation
const result = await lambda.invoke<OrderInput, OrderOutput>(deps)({
orderId: '123'
})
// Envelope patterns for CQRS
const queryResult = await lambda.query<GetOrderQuery, Order>(deps)({
orderId: '123'
})
const commandResult = await lambda.command<CreateOrderCmd, Order>(deps)({
customerId: 'cust-1',
items: [{ sku: 'ABC', qty: 2 }]
})
const requestResult = await lambda.request<OrderRequest, OrderResponse>(deps)({
action: 'process',
orderId: '123'
})
// Unwrap Optional into a discriminated union result
const unwrappedQuery = lambda.unwrap(lambda.query<GetOrderQuery, Order>(deps))
const unwrapped = await unwrappedQuery({ orderId: '123' })
if (unwrapped.success) {
console.log(unwrapped.result) // Order
} else {
console.log(unwrapped.error.errorType, unwrapped.error.errorMessage)
}
InvocationResult<T> payload handling
When the invoked Lambda returns a serialized InvocationResult<T> as its payload (e.g. from a downstream unwrap call), invoke automatically detects and unwraps it — extracting the result on success or converting the error into an InvocationError:
// Downstream Lambda returns: { success: true, result: { orderId: '123' } }
const maybeOrder = await lambda.invoke<Input, Order>(deps)(payload)
maybeOrder.map(order => console.log(order.orderId)) // '123'
// Downstream Lambda returns: { success: false, error: { errorType: 'NotFound', errorMessage: '...' } }
const maybeFailed = await lambda.invoke<Input, Order>(deps)(payload)
maybeFailed.isEmpty // true
maybeFailed.getError() // InvocationError { errorType: 'NotFound' }
S3
import { s3 } from '@othree.io/awsome'
import { S3Client } from '@aws-sdk/client-s3'
const deps = {
s3Client: new S3Client({}),
configuration: { bucket: 'my-bucket' }
}
// Upload
await s3.put(deps)('reports/q1.pdf', {
body: buffer,
contentType: 'application/pdf',
contentLength: buffer.length
})
// Download
const file = await s3.get(deps)('reports/q1.pdf')
file.map(obj => console.log(obj.contentType, obj.contentLength))
// List objects
const objects = await s3.list(deps)('reports/')
objects.map(items => items.forEach(o => console.log(o.key, o.size)))
// Get public URL
const url = await s3.getPublicUrl(deps)('reports/q1.pdf')
// Delete
await s3.deleteObject(deps)('reports/q1.pdf')
Secrets Manager
import { secretsManager } from '@othree.io/awsome'
import { SecretsManagerClient } from '@aws-sdk/client-secrets-manager'
const deps = { secretsManagerClient: new SecretsManagerClient({}) }
// Read a secret
const secret = await secretsManager.get(deps)('my-app/db-credentials')
secret.map(value => JSON.parse(value))
// Write a secret
await secretsManager.put(deps)('my-app/db-credentials', {
username: 'admin',
password: 'hunter2'
})
KMS
import { kms } from '@othree.io/awsome'
import { KMSClient, SigningAlgorithmSpec } from '@aws-sdk/client-kms'
const kmsClient = new KMSClient({})
// Sign a message
const signature = await kms.sign({
kmsClient,
configuration: {
KeyId: 'arn:aws:kms:us-west-2:123456:key/abc',
SigningAlgorithm: SigningAlgorithmSpec.RSASSA_PKCS1_V1_5_SHA_256
}
})('message to sign')
// Get a public key
const publicKey = await kms.getPublicKey({
kmsClient,
keyId: 'arn:aws:kms:us-west-2:123456:key/abc'
})()
CloudWatch
import { cloudwatch } from '@othree.io/awsome'
import { CloudWatchClient } from '@aws-sdk/client-cloudwatch'
const cloudwatchClient = new CloudWatchClient({})
// Wrap a function with invocation counting
const trackedFn = cloudwatch.countFunctionInvocation({
cloudwatchClient,
metric: { namespace: 'MyApp', name: 'Invocations' },
fn: (x: number) => x * 2,
functionName: 'doubler'
})
trackedFn(5) // emits metric, returns 10
// Increment a counter
await cloudwatch.incrementFunctionInvocationCount({
cloudwatchClient,
metric: { namespace: 'MyApp', name: 'ProcessedOrders' },
functionName: 'orderProcessor'
})(10)
// Track HTTP response status codes
const trackedFetch = cloudwatch.countFetchStatus({
cloudwatchClient,
fetch: globalThis.fetch,
metricConditions: [
{ namespace: 'MyApp', name: '2XX', metricApplies: (r) => r.status >= 200 && r.status < 300 },
{ namespace: 'MyApp', name: '5XX', metricApplies: (r) => r.status >= 500 }
],
functionName: 'externalApi'
})
const response = await trackedFetch('https://api.example.com/data')
SES
import { ses } from '@othree.io/awsome'
import { SESClient } from '@aws-sdk/client-ses'
const messageId = await ses.sendMimeEmail({
sesClient: new SESClient({})
})(mimeMessageBuffer)
API Gateway
import { apigateway } from '@othree.io/awsome'
import { APIGatewayClient } from '@aws-sdk/client-api-gateway'
const keys = await apigateway.getAllApiKeysTags({
client: new APIGatewayClient({})
})()
keys.map(items => items.forEach(k => console.log(k.keyId, k.tags)))
Batch
import { batch } from '@othree.io/awsome'
import { BatchClient } from '@aws-sdk/client-batch'
const deps = { client: new BatchClient({}) }
const job = await batch.submitJob(deps)({
jobArn: 'my-job',
jobDefinitionArn: 'arn:aws:batch:...:job-definition/my-def:1',
jobQueueArn: 'arn:aws:batch:...:job-queue/my-queue'
})
const status = await batch.getJobStatus(deps)({ jobId: 'job-123' })
// 'SUBMITTED' | 'PENDING' | 'RUNNABLE' | 'STARTING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED'
Architecture
Dependency injection via deps
Every function takes its dependencies as the first curried parameter. This makes functions fully testable without mocking module imports:
// Production
const save = dynamo.upsert({
dynamoDb: new DynamoDBClient({}),
configuration: { TableName: 'Users' }
})
// Test
const save = dynamo.upsert({
dynamoDb: { send: vi.fn().mockResolvedValue({}) } as any,
configuration: { TableName: 'Users' }
})
Error handling with Optional
All functions return Optional<T> from @othree.io/optional. No exceptions are thrown — errors are captured inside the Optional:
const result = await dynamo.query(deps)({ id: '123' })
if (result.isPresent) {
const items = result.get()
}
if (result.isEmpty) {
const error = result.getError()
}
// Or use functional chaining
result
.map(items => items.filter(i => i.active))
.orElse([])
Peer Dependencies
Only install the AWS SDK clients you actually use. All are declared as peer dependencies with the version range ^3.787.0:
| Package | Required for |
|---------|-------------|
| @aws-sdk/client-dynamodb | dynamo |
| @aws-sdk/util-dynamodb | dynamo |
| @aws-sdk/client-s3 | s3 |
| @aws-sdk/client-sqs | sqs, sns.subscribeSqs |
| @aws-sdk/client-sns | sns |
| @aws-sdk/client-lambda | lambda |
| @aws-sdk/client-secrets-manager | secretsManager |
| @aws-sdk/client-ses | ses |
| @aws-sdk/client-kms | kms |
| @aws-sdk/client-cloudwatch | cloudwatch |
| @aws-sdk/client-api-gateway | apigateway |
| @aws-sdk/client-batch | batch |
| @othree.io/optional | All modules |
Development
npm install
npm test # run tests
npx vitest run --coverage # run tests with coverage
npm run build # compile to lib/cjs/ and lib/esm/
Commit message format
This project uses semantic-release with Angular Commit Message Conventions:
| Commit message | Release type |
|----------------|-------------|
| fix(sqs): handle empty batch response | Patch |
| feat(dynamo): add transactWrite support | Minor |
| feat(lambda): change invoke signature, followed by the footer BREAKING CHANGE: deps object is now required | Major |
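The mapping in the table can be sketched as a small classifier. This is an illustrative approximation of the rules shown above, not semantic-release's actual code; `releaseTypeFor` and `ReleaseType` are hypothetical names. It assumes the header has the form `type(scope): subject` with an optional scope, that a breaking change is signalled by a line starting with `BREAKING CHANGE:` after the header, and that any other commit type produces no release.

```typescript
type ReleaseType = 'major' | 'minor' | 'patch' | null

const releaseTypeFor = (message: string): ReleaseType => {
  const lines = message.split('\n')
  const header = lines[0] ?? ''
  // A "BREAKING CHANGE:" line after the header forces a major release
  if (lines.slice(1).some(line => line.startsWith('BREAKING CHANGE:'))) return 'major'
  // Header looks like "feat(dynamo): add transactWrite support"; the scope is optional
  const match = /^(\w+)(\([^)]*\))?: /.exec(header)
  if (!match) return null
  if (match[1] === 'feat') return 'minor'
  if (match[1] === 'fix') return 'patch'
  // Assumption: types not listed in the table trigger no release
  return null
}

console.log(releaseTypeFor('fix(sqs): handle empty batch response'))   // patch
console.log(releaseTypeFor('feat(dynamo): add transactWrite support')) // minor
console.log(releaseTypeFor(
  'feat(lambda): change invoke signature\n\nBREAKING CHANGE: deps object is now required'
)) // major
```

The breaking-change check runs first so that a footer overrides whatever the header type would otherwise imply.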
License
ISC
