rxjs-dynamodb-client

v2.0.4

Lightweight dynamoDb client built on top of RxJS

DynamoDb client built with lodash and rxjs!

This client is built on top of RxJS Observables (RxJS is a reactive programming library, see https://rxjs.dev), so some familiarity with RxJS, especially its operators, helps you get the most out of the features this lib provides. I'll explain a bit where needed.

To create an instance, just pass a DynamoDB client to the lib's constructor:

	import AWS from 'aws-sdk';
	import {
		DynamoDB
	} from 'rxjs-dynamodb-client';

	AWS.config.update({
		accessKeyId: 'yourAccessKeyId',
		secretAccessKey: 'yourSecretAccessKey',
		region: 'us-east-1'
	});

	const dynamoDb = new DynamoDB({
		client: new AWS.DynamoDB()
	});

Methods

.table(tableName: string, tableSchema: object)

  • Starts an operation chain

  • Returns a request instance that allows chaining more operations

      dynamoDb.table(tableName: string, tableSchema: {
      	primaryKeys: {
      		partition: string [required];
      		sort: string [optional];
      	}
      	indexes: {
      		someLocalIndexName: {
      			partition: string [required];
      			sort: string [required];
      		},
      		someGlobalIndexName: {
      			partition: string [required];
      			sort: string [required];
      		}
      	}
      });
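
As a concrete illustration, a schema for the `tblSpec` table created in the low level requests sample of this README might look like the sketch below. The attribute and index names are taken from that sample; the `keyAttributes` helper is hypothetical (it is not part of the lib) and only shows how the schema shape can be read.

```javascript
// Schema matching the `tblSpec` table from the low level requests sample.
// The property names (primaryKeys, indexes, partition, sort) follow the signature above.
const tableSchema = {
	primaryKeys: {
		partition: 'namespace',
		sort: 'key'
	},
	indexes: {
		localIndexedSpec: {
			partition: 'namespace',
			sort: 'localIndexedAttr'
		},
		globalIndexedSpec: {
			partition: 'globalIndexedPartitionAttr',
			sort: 'globalIndexedSortAttr'
		}
	}
};

// Hypothetical helper (not part of the lib): returns the key attributes of the
// primary key, or of a named index when indexName is given.
function keyAttributes(schema, indexName) {
	const keys = indexName ? schema.indexes[indexName] : schema.primaryKeys;
	if (!keys) {
		throw new Error(`Unknown index: ${indexName}`);
	}
	return [keys.partition, keys.sort].filter(Boolean);
}
```

The object would then be passed as the second argument, as in `dynamoDb.table('tblSpec', tableSchema)`.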

.addPlaceholderName(value: string | Array | object) .addPlaceholderValue(value: string | Array | object)

  • Chain to create placeholders to be used in conjunction with expressions

  • Returns a request instance that allows chaining more operations

      dynamoDb.table({...})
      	.addPlaceholderName({
      		a: 'field1'	
      	})
      	.addPlaceholderValue({
      		b: 'value'	
      	})
      	.filter(`begins_with(#a, :b)`) // any dynamodb expression http://docs.aws.amazon.com/amazondynamodb/latest/
      	.query({...});
    
      dynamoDb.table({...})
      	.addPlaceholderName('field1')
      	.addPlaceholderValue('value')
      	.filter(`begins_with(#field1, :value)`) // any dynamodb expression http://docs.aws.amazon.com/amazondynamodb/latest/
      	.query({...});
    
      dynamoDb.table({...})
      	.addPlaceholderName(['field1', 'field2'])
      	.addPlaceholderValue(['value1', 'value2'])
      	.filter(`begins_with(#field1, :value1) OR #field2 = :value2`) // any dynamodb expression http://docs.aws.amazon.com/amazondynamodb/latest/
      	.query({...});
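
The three accepted shapes (string, array, object) can be pictured as all ending up in one map of prefixed placeholders, with `#` for names and `:` for values as DynamoDB expressions expect. The sketch below is a hypothetical illustration of that normalization, not the lib's source; `toPlaceholders` is an invented name, and marshalling values into DynamoDB attribute types is left out.

```javascript
// Hypothetical sketch, not the lib's source: normalizes a string, an array of
// strings or an object into a map of prefixed placeholders.
function toPlaceholders(prefix, input) {
	if (typeof input === 'string') {
		// 'field1' becomes { '#field1': 'field1' }
		return { [`${prefix}${input}`]: input };
	}
	if (Array.isArray(input)) {
		// every entry is normalized on its own and merged into one map
		return Object.assign({}, ...input.map(item => toPlaceholders(prefix, item)));
	}
	// { a: 'field1' } becomes { '#a': 'field1' }
	return Object.fromEntries(
		Object.entries(input).map(([key, value]) => [`${prefix}${key}`, value])
	);
}

const names = toPlaceholders('#', { a: 'field1' });
const values = toPlaceholders(':', ['value1', 'value2']);
```

With these inputs, `#a` in an expression stands for the attribute `field1`, and `:value1` and `:value2` stand for the literal values.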

.describe()

  • Describes an existing table

  • Returns an observable carrying the table's data if it exists and throws if not, which can be used to create the table when it does not exist; see the sample below.

      dynamoDb.table({...})
      	.describe({...})
      	.pipe(
      		catchError(() => createTable) // catchError is imported from 'rxjs/operators'
      	)
      	.subscribe(next, err, complete);

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.consumedCapacity(value: string)

  • Chain to tell the request to return consumedCapacity in its results

  • Returns a request instance that allows chaining more operations

      import {
      	ConsumedCapacity
      } from 'rxjs-dynamodb-client';
    
      dynamoDb.table({...})
      	.consumedCapacity(ConsumedCapacity.NONE | ConsumedCapacity.TOTAL | ConsumedCapacity.INDEXES)
      	.query({...});

.select(values: any)

  • Chain to tell the request what to return when querying or getting an entity

  • Returns a request instance that allows chaining more operations

      import {
      	Select
      } from 'rxjs-dynamodb-client';
    
      dynamoDb.table({...})
      	.select('name, age, ...')
      	.query({...});
    
      dynamoDb.table({...})
      	.select(Select.ALL_ATTRIBUTES | Select.ALL_PROJECTED_ATTRIBUTES | Select.SPECIFIC_ATTRIBUTES | Select.COUNT)
      	.query({...});

.consistent()

  • Chain to inform requests to perform a consistent read

  • Returns a request instance that allows chaining more operations

      dynamoDb.table({...})
      	.consistent()
      	.query({...});

.query(args: object | string) .queryScan(args: object | string)

  • Query table using primaryKeys or Indexes, or combined.

  • Returns an observable carrying the data, which can be used with any RxJS operators, like, map, filter, reduce, debounceTime, ...

      dynamoDb.table({...})
      	.index(...)
      	.select(...)
      	.limit(10)
      	.query({
      		[partitionAttr]: string;
      		[sortAttr]: string;
      		|
      		[partitionAttr]: string;
      		[localIndexAttr]: string;
      		|
      		[globalIndexPartitionAttr]: string;
      		[globalIndexSortAttr]: string;
      	})
      	.pipe(
      		filter(filterFn), // rxjs sample operators
      		map(mapFn), // rxjs sample operators
      		toArray() // otherwise values are emitted one by one as a stream, which is good for real-time responses such as WebSockets
      	)
      	.subscribe(next, err, complete);
    
      const request = dynamoDb.table({...});
    
      request.select(...)
      	.limit(10)
      	.addPlaceholderName({
      		partition: 'partitionAttr',
      		sort: 'sortAttr'
      	})
      	.addPlaceholderValue({
      		partition: 'partitionValue',	
      		sort: 'sortValue'	
      	})
      	.query(`#partition = :partition AND begins_with(#sort, :sort)`)
      	.pipe(
      		toArray(), // otherwise values are emitted one by one as a stream, which is good for real-time responses such as WebSockets
      		map(items => {
      			return {
      				items,
      				stats: request.queryStats // at the end, you can get queryStats which gives you before, after, count, consumedCapacity, scannedCount and iteractions
      			}
      		})
      	)
      	.subscribe(next, err, complete);
    
      	// this response will be
      	//
      	// {
      	//		after: object;
      	//		before: object;
      	//		consumedCapacity: number;
      	//		count: number;
      	//		items: Array<item>;
      	//		iteractions: number;
      	//		scannedCount: number;
      	// }

Note: DynamoDB fetches at most 1MB per request; this lib handles that and performs as many requests as needed to fetch all the data. So always consider using .limit(value: number) when querying.
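
The paging described in this note can be pictured as a loop that follows DynamoDB's `LastEvaluatedKey` until nothing is left or the limit is reached. The sketch below is a hypothetical, Promise-based illustration and not the lib's implementation; `queryAll` and `fakeFetch` are invented names, and `fakeFetch` stands in for a real Query call.

```javascript
// Hypothetical sketch: keeps requesting pages until DynamoDB stops returning
// LastEvaluatedKey or until `limit` items have been collected.
async function queryAll(fetchPage, limit = Infinity) {
	const items = [];
	let startKey;
	let iterations = 0;

	do {
		const page = await fetchPage(startKey); // one request, at most 1MB of data
		iterations += 1;
		items.push(...page.Items);
		startKey = page.LastEvaluatedKey; // undefined once the last page is reached
	} while (startKey && items.length < limit);

	return { items: items.slice(0, limit), iterations };
}

// Stand-in for a real Query call: serves 8 items in pages of 3.
const data = Array.from({ length: 8 }, (_, n) => ({ key: `key-${n}` }));
const fakeFetch = async startKey => {
	const from = startKey ? startKey.index : 0;
	const to = Math.min(from + 3, data.length);
	return {
		Items: data.slice(from, to),
		LastEvaluatedKey: to < data.length ? { index: to } : undefined
	};
};
```

Without a limit the loop walks every page, which is why setting `.limit()` matters on large tables.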

.get(item: object)

  • Gets a value from the table using primaryKeys or indexes, or both combined.

  • Returns an observable carrying the data, which can be used with any RxJS operators, like, map, filter, reduce, debounceTime, ...

      dynamoDb.table({...})
      	.select(...)
      	.get({
      		[partitionAttr]: string; // required
      		[sortAttr]: string;	 // required
      	})
      	.pipe(
      		filter(filterFn), // rxjs sample operators
      		map(mapFn) // rxjs sample operators
      	)
      	.subscribe(next, err, complete);

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.index(indexName: string)

  • Chain to tell the request that the next operation will use a previously created index.

  • Returns a request instance that allows chaining more operations

      dynamoDb.table({...})
      	.index('someLocalIndexName')
      	.query({...})
      	.subscribe(next, err, complete);

.desc()

  • Chain to tell the request that the next operation will be made in descending order.

  • Returns a request instance that allows chaining more operations

      dynamoDb.table({...})
      	.desc()
      	.query({...})
      	.subscribe(next, err, complete);

.limit()

  • Chain to tell the request that the next operation will have a limit.

  • Returns a request instance that allows chaining more operations

      dynamoDb.table({...})
      	.limit(100)
      	.query({...})
      	.subscribe(next, err, complete);

.filter(expression: string, append: boolean = false, condition = 'AND')

  • Chain to create a filterExpression, or append to one; it should be used with .addPlaceholderName() and .addPlaceholderValue().

  • Returns a request instance that allows chaining more operations

      dynamoDb.table({...})
      	.addPlaceholderName({
      		field1: 'field1'	
      	})
      	.addPlaceholderValue({
      		field1: 'field1 value'	
      	})
      	.filter(`begins_with(#field1, :field1)`) // any dynamodb expression http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.SpecifyingConditions.html#Expressions.SpecifyingConditions.ConditionExpressions
      	.query({
      		[partitionAttr]: string; // required
      		[sortAttr]: string; // required
      		...
      	})
      	.subscribe(next, err, complete);
    
      	// you can append a filter, and specify the condition
    
      	dynamoDb.table({...})
      		.addPlaceholderName({
      			field1: 'field1'	
      		})
      		.addPlaceholderValue({
      			field1: 'field1 value'	
      		})
      		.filter(`begins_with(#field1, :field1)`)
      		.addPlaceholderName('field1') // placeholders may overlap without problems; the lib omits duplicates
      		.addPlaceholderValue('value2')
      		.filter(`begins_with(#field2, :value2)`, true, 'OR');
    
      	// the final expression will be:
      	// begins_with(#field1, :field1) OR begins_with(#field2, :value2)
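
The append rule can be pictured as plain string composition. The sketch below is a hypothetical illustration, not the lib's source; `composeFilter` is an invented name, and it assumes that a call without `append` replaces any earlier filter.

```javascript
// Hypothetical sketch of the append rule, not the lib's source.
// Assumption: without `append`, the new expression replaces the previous one.
function composeFilter(current, expression, append = false, condition = 'AND') {
	if (!append || !current) {
		return expression;
	}
	// joins the previous expression and the new one with AND / OR
	return `${current} ${condition} ${expression}`;
}

let filterExpression = composeFilter(undefined, 'begins_with(#field1, :field1)');
filterExpression = composeFilter(filterExpression, 'begins_with(#field2, :value2)', true, 'OR');
```

After the second call, `filterExpression` holds the same combined expression as shown in the comment above.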

.resume(keys: object)

  • Chain to tell the request that the next operation will resume from the given keys.

  • Returns a request instance that allows chaining more operations

      dynamoDb.table({...})
      	.resume({
      		[partitionAttr]: string;
      		[sortAttr]: string;
      		|
      		[partitionAttr]: string;
      		[sortAttr]: string;
      		[localIndexAttr]: string;
      		|
      		[globalIndexPartitionAttr]: string;
      		[globalIndexSortAttr]: string;
      	})
      	.query({...})
      	.subscribe(next, err, complete);

.insert(item: object, replace: boolean = false, overrideTimestamp: boolean = false)

  • Insert, as the name says, just inserts; it neither updates nor replaces an entity.

  • Returns an observable carrying the data just added plus createdAt and updatedAt attributes with the same value, which can be used with any RxJS operators, like, map, filter, reduce, debounceTime, ...

      dynamoDb.table({...})
      	.insert({...})
      	.subscribe(next, err, complete);

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.insertOrReplace(item: object)

  • Insert or replace, as the name says, just inserts or replaces; it does not update an entity.

  • Returns an observable carrying the data just added plus createdAt and updatedAt attributes with the same value, which can be used with any RxJS operators, like, map, filter, reduce, debounceTime, ...

      dynamoDb.table({...})
      	.insertOrReplace({...})
      	.subscribe(next, err, complete);

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.insertOrUpdate(item: object, where: object)

  • Insert or update, as the name says, just inserts or updates; it does not replace an entity.

  • Returns an observable carrying the data just added plus createdAt and updatedAt attributes with the same value if inserted, and updatedAt changed if it was updated, which can be used with any RxJS operators, like, map, filter, reduce, debounceTime, ...

      dynamoDb.table({...})
      	.insertOrUpdate({...})
      	.subscribe(next, err, complete);

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.update(item: object, where: object, insert: boolean = false)

  • Just updates a previously inserted value; it neither inserts nor replaces.

  • Returns an observable carrying the data defined by .return().

      import {
      	ReturnValues
      } from 'rxjs-dynamodb-client';
    
      dynamoDb.table({...})
      	.return(ReturnValues.NONE | ReturnValues.ALL_OLD | ReturnValues.UPDATED_OLD | ReturnValues.ALL_NEW | ReturnValues.UPDATED_NEW)
      	.update({
      		[partitionAttr]: string; // required
      		[sortAttr]: string; // required
      		...
      	})
      	.subscribe(next, err, complete);

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.delete(item: object)

  • Just deletes a previously inserted value.

  • Returns an observable carrying the data defined by .return().

      import {
      	ReturnValues
      } from 'rxjs-dynamodb-client';
    
      dynamoDb.table({...})
      	.return(ReturnValues.NONE | ReturnValues.ALL_OLD)
      	.delete({
      		[partitionAttr]: string; // required
      		[sortAttr]: string; // required
      	})
      	.subscribe(next, err, complete);

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.batchGet(items: Array) .batchGetScan(items: Array, scan: () => Observable)

  • Returns an observable carrying a stream of the retrieved values

      dynamoDb.table({...})
      	.batchGet([{
      		[partitionAttr]: string; // required
      		[sortAttr]: string; // required
      	}])
      	.pipe(
      		filter(filterFn), // rxjs sample operators
      		map(mapFn), // rxjs sample operators
      		toArray() // otherwise values are emitted one by one as a stream, which is good for real-time responses such as WebSockets
      	)
      	.subscribe(next, err, complete);

Note: DynamoDB gets at most 100 entities per request, but this lib handles that and performs as many requests as needed to get all the data.

Note: If you are using Promises, you can easily transform Observables into Promises by calling .toPromise() instead of .subscribe(), but we really advise you to learn RxJS; it's amazingly powerful.

.batchWrite(toDelete: Array | null, toInsert: Array | null)

  • Returns an observable carrying an array of unprocessed items

      dynamoDb.table({...})
      	.batchWrite(toDelete => [{
      		[partitionAttr]: string; // required
      		[sortAttr]: string; // required
      	}], toInsert => [{
      		[partitionAttr]: string; // required
      		[sortAttr]: string; // required,
      		...
      	}])
      	.subscribe(next, err, complete);

Note: DynamoDB handles at most 25 write operations per request, but this lib handles that and performs as many requests as needed to complete all operations.
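
Splitting work to fit a per-request cap amounts to chunking the input array. The sketch below is a minimal illustration of that idea, not the lib's source; `chunk` is an invented helper and the sample items are made up.

```javascript
// Hypothetical sketch: splits a list of operations into groups that fit
// the per-request cap (25 for batch writes, 100 for batch gets).
function chunk(items, size) {
	const chunks = [];
	for (let i = 0; i < items.length; i += size) {
		chunks.push(items.slice(i, i + size));
	}
	return chunks;
}

// 60 made-up items to insert, split into requests of at most 25 operations.
const toInsert = Array.from({ length: 60 }, (_, n) => ({
	namespace: 'spec',
	key: `key-${n}`
}));
const writeBatches = chunk(toInsert, 25);
```

Each entry of `writeBatches` would then correspond to one underlying request.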

Low level requests

If you need to perform low level requests, like creating a table, you can do it using .routeCall(operationName: string, args: object). This sample creates a table if it does not exist, and inserts 10 items. We are using RxJS operators; if you have doubts about them, you can learn more at https://rxjs.dev.

		request.describe() // verify if table exists
			.pipe(
				catchError(() => request.routeCall('createTable', { // if not, create a table
					TableName: 'tblSpec',
					ProvisionedThroughput: {
						ReadCapacityUnits: 1,
						WriteCapacityUnits: 1
					},
					AttributeDefinitions: [{
						AttributeName: 'namespace',
						AttributeType: 'S'
					}, {
						AttributeName: 'key',
						AttributeType: 'S'
					}, {
						AttributeName: 'localIndexedAttr',
						AttributeType: 'S'
					}, {
						AttributeName: 'globalIndexedPartitionAttr',
						AttributeType: 'S'
					}, {
						AttributeName: 'globalIndexedSortAttr',
						AttributeType: 'S'
					}],
					KeySchema: [{
						AttributeName: 'namespace',
						KeyType: 'HASH'
					}, {
						AttributeName: 'key',
						KeyType: 'RANGE'
					}],
					LocalSecondaryIndexes: [{
						IndexName: 'localIndexedSpec',
						KeySchema: [{
							AttributeName: 'namespace',
							KeyType: 'HASH'
						}, {
							AttributeName: 'localIndexedAttr',
							KeyType: 'RANGE'
						}],
						Projection: {
							ProjectionType: 'ALL'
						}
					}],
					GlobalSecondaryIndexes: [{
						IndexName: 'globalIndexedSpec',
						KeySchema: [{
							AttributeName: 'globalIndexedPartitionAttr',
							KeyType: 'HASH'
						}, {
							AttributeName: 'globalIndexedSortAttr',
							KeyType: 'RANGE'
						}],
						Projection: {
							ProjectionType: 'ALL'
						},
						ProvisionedThroughput: {
							ReadCapacityUnits: 1,
							WriteCapacityUnits: 1
						}
					}]
				})),
				mergeMap(() => {
					return range(0, 10) // insert 10 items to the table
						.pipe(
							mergeMap(n => request.insert({
								namespace,
								key: `key-${n}`,
								message: `message-${n}`,
								localIndexedAttr: `local-indexed-${n}`,
								globalIndexedPartitionAttr: `global-indexed-${namespace}`,
								globalIndexedSortAttr: `global-indexed-${n}`,
							}, true))
						);
				})
			)
			.subscribe(null, null, done);

CRUD

This lib comes with a Crud helper class, so you can extend your models with Crud. To get started:

	import AWS from 'aws-sdk';
	import {
		Crud,
		DynamoDB
	} from 'rxjs-dynamodb-client';

	AWS.config.update({
		accessKeyId: 'yourAccessKeyId',
		secretAccessKey: 'yourSecretAccessKey',
		region: 'us-east-1'
	});
	
	// instantiate the lib
	const dynamoDb = new DynamoDB({
		client: new AWS.DynamoDB()
	});

	const crud = new Crud(tableName: string, schema: object, {
		dynamoDb	
	});

Crud operations

	fetch(
		args: {
			[partition | globalIndexPartition]: string (required);
			[sort | localIndexAttribute | globalIndexSort]: string;
			after: base64<string>;
			before: base64<string>;
			consistent: boolean;
			desc: boolean;
			indexName: string;
			limit: number;
			prefix: boolean = true;
			resume: base64<string>;
			select: string (comma separated);
		}, 
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number) : Array<args>,
		itemSelector: function(Observable<object>): Observable<object>,
		reducer: function(Observable<object>): Observable<any>,
		) : Observable<{
			after: base64<string>;
			before: base64<string>;
			count: number;
			consumedCapacity: number;
			items: Array<object>;
			iterations: number;
			scannedCount: number;
		}>

	get(
		args: {
			[partition]: string (required);
			[sort]: string;
			consistent: boolean;
			select: string (comma separated);
		}, 
		hook: function(
			request: Request, 
			partition: string, 
			sort: string | number) : Array<args>) : Observable<object>

	multiGet(
		args: {
			items: Array<{
				[partition]: string (required);
				[sort]: string;
			}>
			select: string (comma separated);
		}, 
		hook: function(
			request: Request, 
			items: Array<object>) : Observable<object>

	insert(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: any;
		}, 
		hook: function(
			request: Request, 
			partition: string, 
			sort: string | number, 
			...args: any) : Array<args>) : Observable<object>

	insertOrReplace(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: any;
		}, 
		hook: function(
			request: Request, 
			partition: string, 
			sort: string | number, 
			...args: any) : Array<args>) : Observable<object>

	insertOrUpdate(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: any;
		},
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			partition: string, 
			sort: string | number, 
			...args: any) : Array<args>) : Observable<object>

	update(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: any;
		}, 
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			partition: string, 
			sort: string | number, 
			...args: any) : Array<args>) : Observable<object>

	updatePrimaryKeys(
		args: {
			[partition]: string (required);
			[sort]: string (required);
		},
		primaryKeys: {
			[partition]: string (optional);
			[sort]: string (optional);
		}
	): Observable<object>

	delete(
		args: {
			[partition]: string (required);
			[sort]: string (required);
		}, 
		hook: function(
			request: Request, 
			partition: string, 
			sort: string | number) : Array<args>) : Observable<object>

	prependToList(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: Array<any>
		},
		create: boolean = false,
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number, 
			attributes: Array<any>) : Array<args>) : Observable<object>

	appendToList(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: Array<any>
		},
		create: boolean = false,
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number, 
			attributes: Array<any>) : Array<args>) : Observable<object>

	removeFromList(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: number | Array<number>
		}, 
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number, 
			attributes: Array<number>) : Array<args>) : Observable<object>

	updateAtList(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: {[index: number]: value: any}
		},
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number, 
			attributes: Array<{[index: number]: value: any}>) : Array<args>) : Observable<object>

	addToSet(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: Array<string> | string | Array<number> | number
		},
		create: boolean = false,
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number, 
			attributes: Array<string> | string | Array<number> | number) : Array<args>) : Observable<object>

	removeFromSet(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: Array<string> | string | Array<number> | number
		}, 
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number, 
			attributes: Array<string> | string | Array<number> | number) : Array<args>) : Observable<object>

	removeAttributes(
		args: {
			[partition]: string (required);
			[sort]: string (required);
			...args: any
		}, 
		returns: string = 'ALL_NEW',
		hook: function(
			request: Request, 
			expression: string, 
			partition: string, 
			sort: string | number, attributes: any) : Array<args>) : Observable<object>

	multiGet(
		items: Array<{
			[partition]: string (required);
			[sort]: string | number (required);
		}>
	) : Observable<Array<object>>

	clear(
		args: {
			[partition]: string (required);
			[sort]: string;
		}) : Observable<object>
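
The `after`, `before` and `resume` arguments of `fetch` are typed as base64 strings. One way to picture such a cursor is a key object serialized to JSON and then base64 encoded; this is an assumption made for illustration only, the lib's actual encoding is not documented here, and `encodeCursor` and `decodeCursor` are invented names.

```javascript
// Assumption, not confirmed by the lib: a cursor is the base64 form of a
// JSON-serialized key object.
const encodeCursor = key => Buffer.from(JSON.stringify(key)).toString('base64');
const decodeCursor = cursor => JSON.parse(Buffer.from(cursor, 'base64').toString('utf8'));

// Made-up key of the last item of a page, using the primary keys of `tblSpec`.
const lastKey = { namespace: 'spec', key: 'key-9' };
const cursor = encodeCursor(lastKey);
const restored = decodeCursor(cursor);
```

In practice the values returned in `after` and `before` would simply be passed back unchanged on the next `fetch` call.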

Helpers

DynamoDB has two main types of http errors: retryable and non-retryable. The most common type of retryable error is a throughput exception. To handle this, you can chain an Observable helper to handle them:
	
	// simplest way: retries once if err.retryable; the behavior can be configured as in the samples below
	request.insert()
		.onRetryableError();

	// shorthand for max retries, will obey err.retryDelay returned by AWS
	request.insert()
		.onRetryableError(5);
	
	// config retries with a callback function
	request.insert()
		.onRetryableError((err, index) => ({
			delay: (err.retryDelay * 1000) * index, // will retry after n seconds with index factor
			max: 10,
			retryable: index >= 4 ? false : err.retryable // will retry 5x before throw (index starts with 0)
		}));
	
	// or with a plain object
	request.insert()
		.onRetryableError({
			max: 10, // will retry 10x before throw
			delay: 1000 // will retry after 1 second
		});
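
The accepted argument shapes (nothing, a number, a callback, a plain object) can be thought of as resolving to one retry decision per failed attempt. The sketch below is a hypothetical illustration of that resolution, not the lib's source; `resolveRetry` is an invented name, and it assumes `err.retryDelay` is expressed in seconds, as the callback sample suggests.

```javascript
// Hypothetical sketch, not the lib's source: resolves the argument given to
// .onRetryableError() into one decision for the attempt number `index`.
function resolveRetry(arg, err, index) {
	const defaults = {
		max: 1, // retry once when nothing is configured
		delay: (err.retryDelay || 0) * 1000, // assumes retryDelay is in seconds
		retryable: Boolean(err.retryable)
	};

	let config;
	if (typeof arg === 'function') {
		config = arg(err, index); // callback form
	} else if (typeof arg === 'number') {
		config = { max: arg }; // shorthand for max retries
	} else {
		config = arg || {}; // plain object or nothing
	}

	const resolved = { ...defaults, ...config };
	// retry only while the error is retryable and attempts remain
	return { ...resolved, retry: resolved.retryable && index < resolved.max };
}

const throttled = { retryable: true, retryDelay: 2 };
const decision = resolveRetry({ max: 10, delay: 1000 }, throttled, 0);
```

Here `decision.retry` is true and `decision.delay` is 1000, because the explicit `delay` overrides the value derived from `err.retryDelay`.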