roadiejs-import
v0.0.8
RoadieJS plug-in providing bulk data import functionality
roadiejs-import
A plugin for RoadieJS
A configurable ETL pipeline, based on Node.js streams.
API
createImportStream
Registers a new import stream and starts importing.
Request
POST /streams
{
"namespace": "roadietest",
"blueprintName": "importPlanets",
"blueprintVersion": 1,
"localVersion": 0,
"importStream": "planetCsv",
"missingAction": "error",
"source": {
"type": "file",
"options": {
"paths": "./test/planets/import_files/advanced.csv"
}
}
}
| Name | Notes
| ---- | ----
| namespace | Namespace of the blueprint which contains the import element.
| blueprintName | Name of the blueprint which contains the import element.
| blueprintVersion | Version number of the blueprint which contains the import element.
| localVersion | Local version number of the blueprint which contains the import element.
| importStream | The id of an importStream element, that's defined in the identified blueprint.
| missingAction | Controls the behaviour if trying to update/delete a document that doesn't exist. Valid values are warning, error.
| source | An object to configure the source of the import.
Sources
Data can be streamed from multiple sources. The source object is therefore mandatory, and has two common keys:
| Name | Notes
| ---- | ----
| type | The type of import (e.g. file).
| options | An object containing config specific to the type of import (see below).
file
Imports data from files stored on the local file system.
"source": {
"type": "file",
"options": {
"paths": "./test/school/import_files/full_school_dump.csv"
}
}
| Name | Notes
| ---- | ----
| paths | Defines which files should be loaded. Supports paths to single files, * wildcards, glob-style ** (for directory recursion) and arrays of multiple strings.
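
The forms that paths accepts can be illustrated with a small sketch. The normalisePaths helper below is hypothetical (it is not part of roadiejs-import), and every path other than the first is made up for illustration. It only shows that a single string and an array of strings can be treated uniformly before any wildcard expansion takes place.

```javascript
// Hypothetical helper: accepts a single string or an array of strings
// and always returns an array of pattern strings.
function normalisePaths(paths) {
  const list = Array.isArray(paths) ? paths : [paths];
  for (const p of list) {
    if (typeof p !== 'string' || p.length === 0) {
      throw new TypeError('Each path must be a non-empty string');
    }
  }
  return list;
}

// The four documented shapes of the "paths" option (paths are illustrative).
const examples = {
  singleFile: './test/school/import_files/full_school_dump.csv',
  wildcard: './test/school/import_files/*.csv',
  recursive: './test/school/**/*.csv',
  multiple: [
    './test/school/import_files/students.csv',
    './test/school/import_files/teachers.csv'
  ]
};

for (const [name, value] of Object.entries(examples)) {
  console.log(name, normalisePaths(value));
}
```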
Response
Status 201
{
"_id": "557c3834f0f1c14e25220e8b",
"_created": "2015-06-13T14:03:32.571Z",
"namespace": "roadietest",
"blueprintName": "importPlanets",
"blueprintVersion": 1,
"blueprintLocalVersion": 0,
"importStream": "planetCsv",
"totalSize": 1147,
"status": "starting",
"processedSize": 0,
"count": 0,
"warnings": 0,
"failures": 0,
"notDealtWith": 0
}
| Name | Notes
| ---- | ----
| _id | The unique database-generated id for the import process.
| _created | A timestamp of when the import was registered.
| namespace | Namespace of the blueprint, as supplied in the request.
| blueprintName | Name of the blueprint, as supplied in the request.
| blueprintVersion | Version of the blueprint, as supplied in the request.
| blueprintLocalVersion | Local version of the blueprint, as supplied in the request.
| importStream | The id of an importStream element, as supplied in the request.
| totalSize | The number of units the import is estimated to be. Most likely number of bytes.
| status | Current status of the import (expect starting).
| processedSize | How many units have been imported so far (expect 0 at this point).
| count | Total count of documents which have been processed (regardless of whether they succeeded or failed).
| warnings | Number of documents, within the overall count, that have raised a warning.
| failures | Number of documents, within the overall count, that have failed.
| notDealtWith | Number of documents, within the overall count, that did not match any record pattern.
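
As a rough sketch of how a client might call createImportStream: the helper names, the defaults for localVersion and missingAction, and the baseUrl parameter are all assumptions made for illustration. Where the route is actually mounted depends on how the RoadieJS server is configured. The global fetch used here requires Node.js 18 or later.

```javascript
// Hypothetical helper: builds the JSON body for POST /streams
// using the keys documented in the request table.
function buildImportRequest({
  namespace,
  blueprintName,
  blueprintVersion,
  importStream,
  paths,
  localVersion = 0,          // assumed default, for this sketch only
  missingAction = 'error'    // assumed default, for this sketch only
}) {
  if (!['warning', 'error'].includes(missingAction)) {
    throw new Error(`missingAction must be "warning" or "error", got "${missingAction}"`);
  }
  return {
    namespace,
    blueprintName,
    blueprintVersion,
    localVersion,
    importStream,
    missingAction,
    source: { type: 'file', options: { paths } }
  };
}

// Posts the body and returns the parsed response.
// baseUrl is an assumption: use wherever your server exposes /streams.
async function createImportStream(baseUrl, request) {
  const res = await fetch(`${baseUrl}/streams`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(request)
  });
  if (res.status !== 201) {
    throw new Error(`Expected status 201, got ${res.status}`);
  }
  return res.json();
}
```

The _id in the returned object is the value to pass to getImportStreamStatus and getImportStreamMessages.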
getImportStreamStatus
Gets the latest status of an import stream.
Request
GET /streams/:id
| Name | Notes
| ---- | ----
| id | The id that uniquely identifies an import (e.g. the _id value returned from createImportStream).
Response
Status 200
{
"_id": "557c6f1487a62fff374fa2ed",
"_created": "2015-06-13T17:57:40.707Z",
"namespace": "roadietest",
"blueprintName": "importSchools",
"blueprintVersion": 1,
"blueprintLocalVersion": 0,
"importStream": "studentCsv",
"totalSize": 364,
"status": "succeeded",
"finished": "2015-06-13T17:57:40.820Z",
"processedSize": 364,
"count": 6,
"warnings": 0,
"failures": 0
}
| Name | Notes
| ---- | ----
| _id | The unique database-generated id for the import process (e.g. the id provided as a parameter as part of the request).
| _created | A timestamp of when the import was registered.
| namespace | Namespace of the blueprint, as supplied in the request.
| blueprintName | Name of the blueprint, as supplied in the request.
| blueprintVersion | Version of the blueprint, as supplied in the request.
| blueprintLocalVersion | Local version of the blueprint, as supplied in the request.
| importStream | The id of an importStream element, as supplied in the request.
| totalSize | The number of units the import is estimated to be. Most likely number of bytes.
| status | Current status of the import, valid values are starting, succeeding, warning, failing, warned, failed, succeeded.
| finished | Timestamp of when the import finished (not present if it's still running).
| processedSize | How many units have been imported so far.
| count | Total count of documents which have been processed (regardless of whether they succeeded or failed).
| warnings | Number of documents, within the overall count, that have raised a warning.
| failures | Number of documents, within the overall count, that have failed.
| notDealtWith | Number of documents, within the overall count, that did not match any record pattern.
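
A status object can be reduced to a few values that are convenient when polling. The summariseStatus helper below is a hypothetical sketch, not part of the plugin. It relies only on the documented facts that finished is absent while the import is still running and that processedSize and totalSize use the same units.

```javascript
// Hypothetical helper: derives progress information from a status response.
function summariseStatus(status) {
  // "finished" is only present once the import has stopped running.
  const finished = Object.prototype.hasOwnProperty.call(status, 'finished');
  // Guard against a zero totalSize to avoid dividing by zero.
  const percent = status.totalSize > 0
    ? Math.min(100, Math.round((status.processedSize / status.totalSize) * 100))
    : 0;
  const clean = status.warnings === 0 && status.failures === 0;
  return { finished, percent, clean };
}

// Using the values from the example response above.
const summary = summariseStatus({
  totalSize: 364,
  status: 'succeeded',
  finished: '2015-06-13T17:57:40.820Z',
  processedSize: 364,
  count: 6,
  warnings: 0,
  failures: 0
});
console.log(summary);
```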
getImportStreamMessages
Returns an array of messages generated by the specified import stream, ordered by creation timestamp (ascending).
Request
GET /streams/:id/messages
| Name | Notes
| ---- | ----
| id | The id that uniquely identifies an import (e.g. the _id value returned from createImportStream).
Response
Status 200
[
{ "_id": "557c7680f09749d93b88619f",
"transactionId": "557c7680f09749d93b88619a",
"schemaName": "students",
"namespace": "roadietest",
"blueprintName": "importSchools",
"blueprintVersion": 1,
"blueprintLocalVersion": 0,
"type": "warning",
"name": "noDoc",
"message": "Unable to find document"
}
]
| Name | Notes
| ---- | ----
| _id | A unique value to identify the message.
| transactionId | The unique database-generated id for the import process (e.g. the id provided as a parameter as part of the request).
| schemaName | The id of a schema related to the message.
| namespace | Namespace of the blueprint responsible for the import.
| blueprintName | Name of the blueprint responsible for the import.
| blueprintVersion | Version of the blueprint responsible for the import.
| blueprintLocalVersion | Local version of the blueprint responsible for the import.
| type | Type of message: a value from info, warning, error or exception.
| name | Name (e.g. code) of the message.
| message | Short message content.
| body | Data to support the message (content specific to the type/name of message).
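
Since every message carries one of four type values, a client can tally them to get a quick overview of an import. The countByType helper is a hypothetical sketch and not something the plugin provides.

```javascript
// Hypothetical helper: counts messages per documented type.
function countByType(messages) {
  const counts = { info: 0, warning: 0, error: 0, exception: 0 };
  for (const message of messages) {
    // Ignore any type that is not one of the four documented values.
    if (Object.prototype.hasOwnProperty.call(counts, message.type)) {
      counts[message.type] += 1;
    }
  }
  return counts;
}

// Using the message from the example response above.
const tally = countByType([
  { type: 'warning', name: 'noDoc', message: 'Unable to find document' }
]);
console.log(tally);
```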
Elements
importStream
Registers a new import (i.e. a way of importing data into schemas within the blueprint).
Example
{
"id": "planetCsv",
"element": "importStream",
"config": {
"parser": {
"type": "csv",
"options": {
"delimiter": ",",
"qualifier": "\\"
}
},
"target": {
"type": "data"
}
}
}
Config
| Name | Type | Notes
| ----------- | -------| -----------
| parser | object | An object that should contain a type string (e.g. csv) for identifying a parser, and an options object for configuring the parser.
| target | object | An object that configures a supported target for the import. The object must include a type value to identify a target.
Parsers
A parser takes the raw data stream from a source (configured via createImportStream) and turns it into a usable object for passing on to an adaptor.
csv
The csv parser expects a source that can provide individual chunks of data (typically a line from a file).
- Internally, parsing is handled via the csv-parse package.
- The options defined for the parser are passed through to a csv-parse parser. See the csv-parse documentation for more information.
Adaptors
An adaptor takes the output of a parser and maps it to fields in a schema.
- There's no need to explicitly define an adaptor.
- If an adaptor hasn't been defined, then an adaptor with the same name as the parser is used.
- The behaviour of an adaptor depends on its type.
csv
The csv adaptor expects one or more csvRecord elements to be defined as a child element of the importStream element.
Targets
A target is the final destination in the import pipeline, and does something with the output of an adaptor.
data
- Hooks into roadiejs-data so the object produced by the adaptor can be persisted.
console
- Outputs the object produced by the adaptor to the console.
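
The source, parser, adaptor and target stages described above map naturally onto Node.js streams. The following is a minimal sketch of that shape, not the plugin's actual implementation: the stage functions, the naive comma split (no quoting support) and the sample field names are all assumptions made for illustration.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Parser stage: turns a raw line into an array of columns.
const parser = () => new Transform({
  objectMode: true,
  transform(line, _encoding, done) {
    done(null, String(line).split(','));
  }
});

// Adaptor stage: maps parsed columns onto named fields.
const adaptor = () => new Transform({
  objectMode: true,
  transform(csv, _encoding, done) {
    done(null, { title: csv[0], diameter: Number(csv[1]) });
  }
});

// Target stage: does something with each document (here, collects it).
const target = (sink) => new Writable({
  objectMode: true,
  write(doc, _encoding, done) {
    sink.push(doc);
    done();
  }
});

// Wires the stages together; "lines" stands in for a real source.
async function runImport(lines) {
  const sink = [];
  await pipeline(Readable.from(lines), parser(), adaptor(), target(sink));
  return sink;
}
```

Calling runImport(['Tycho,85']) resolves to an array holding one document, once every stage has finished.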
csvRecord
If an importStream element has a parser of type csv, then one or more csvRecord child elements should be defined for it.
- The purpose of a csvRecord is to transform the output of a csv parser to a schema/field structure.
- Multiple csvRecord elements can be configured under an importStream element, as it's possible to 'identify' a suitable schema from the available csv columns.
- A special csv array is accessible when evaluating expressions. It reflects the parsed columns from the underlying CSV data.
Example
{
"id": "craterRecord",
"element": "csvRecord",
"parent": "importStream.planetCsv",
"config": {
"schemaId": "planets",
"recordIdentification": "csv[0]=='crater'",
"actionIdentification": {
"post": "csv[1]=='I'",
"put": "csv[1]=='U'",
"upsert": "csv[1]=='M'",
"del": "csv[1]=='D'"
},
"paramMap": [
"csv[5]",
"moons",
"csv[6]",
"craters",
"csv[2]"
],
"data": {
"title": "csv[3]",
"diameter": "csv[4]"
}
}
}
Config
| Name | Type | Notes
| ------------ | -------- | -----------
| schemaId | string | The id of a schema, defined within the blueprint, to which the CSV data will ultimately be persisted.
| recordIdentification | string | Optional. An expression. If it evaluates to true then the config of this csvRecord element will be used to transform the CSV data into a field structure.
| actionIdentification | object | Optional. Maps an action (e.g. post, put, upsert or del) to an expression. If an expression evaluates to true then its action will be used to persist/delete the transformed data.
| paramMap | [string] | Optional. An array of strings. Maps parameters (starting at docId) of a /data route to the contents of the CSV record. It is therefore possible to target sub-docs.
| data | object | Maps a field name to an expression. The result of the expression will then be used as the value for that field.
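
To make the role of the expressions concrete, here is a rough sketch of how a csvRecord config might be applied to one parsed row. This is not the plugin's own evaluator: the evaluate and applyCsvRecord functions and the sample row are hypothetical, and paramMap is left out because the source does not describe how its entries are resolved. The sketch uses new Function, which executes arbitrary code and so is only suitable for trusted blueprint definitions.

```javascript
// Evaluates an expression string with the parsed columns bound to "csv".
function evaluate(expression, csv) {
  return new Function('csv', `return (${expression});`)(csv);
}

// Applies one csvRecord config to one row. Returns null when the
// record does not match, otherwise the chosen action and mapped fields.
function applyCsvRecord(config, csv) {
  if (config.recordIdentification && !evaluate(config.recordIdentification, csv)) {
    return null;
  }
  // Pick the first action whose expression evaluates to true.
  let action = null;
  for (const [name, expression] of Object.entries(config.actionIdentification || {})) {
    if (evaluate(expression, csv)) {
      action = name;
      break;
    }
  }
  // Evaluate each field expression to build the document.
  const doc = {};
  for (const [field, expression] of Object.entries(config.data)) {
    doc[field] = evaluate(expression, csv);
  }
  return { schemaId: config.schemaId, action, doc };
}

// Config taken from the example above (without paramMap).
const craterRecord = {
  schemaId: 'planets',
  recordIdentification: "csv[0]=='crater'",
  actionIdentification: {
    post: "csv[1]=='I'",
    put: "csv[1]=='U'",
    upsert: "csv[1]=='M'",
    del: "csv[1]=='D'"
  },
  data: { title: 'csv[3]', diameter: 'csv[4]' }
};

// Illustrative row: record type, action flag, id, title, diameter, ...
const sampleRow = ['crater', 'I', 'c-1', 'Tycho', '85', 'earth', 'moon'];
console.log(applyCsvRecord(craterRecord, sampleRow));
```

What the plugin does when no action expression matches is not stated in the source; the sketch simply leaves action as null in that case.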
populate
A simple way to populate with data - useful for supplying reference/lookup data from within a blueprint definition.
- Ensure a populate element is a child of the schema element you wish to populate.
- Schemas will only ever be populated once, and will not be re-asserted every time the blueprint is used.
Example
{
"id": "statesPopulator",
"element": "populate",
"parent": "schema.states",
"config": {
"map": [
"name",
"abbreviation",
"capitalCity",
"mostPopulatedCity",
"population",
"squareMiles"
],
"data": [
["ALABAMA", "AL", "Montgomery", "Birmingham", 4708708, 52423],
["ALASKA", "AK", "Juneau", "Anchorage", 698473, 656425],
["ARIZONA", "AZ", "Phoenix", "Phoenix", 6595778, 114006],
["ARKANSAS", "AR", "Little Rock", "Little Rock", 2889450, 53182],
["CALIFORNIA", "CA", "Sacramento", "Los Angeles", 36961664, 163707],
["COLORADO", "CO", "Denver", "Denver", 5024748, 104100],
["CONNECTICUT", "CT", "Hartford", "Bridgeport", 3518288, 5544]
]
}
}
Config
| Name | Type | Notes
| ------------ | -------| -----------
| map | [string] | An array of strings, each a field name within the schema you wish to populate. The order is important, because values in data are matched to fields by position.
| data | [array] | An array of arrays, mimicking a record/field structure. The values of each 'record' should be in the same order as defined in map.
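
The positional relationship between map and data can be sketched as follows. The toDocuments function is hypothetical and only illustrates how each inner array lines up with the field names. How the plugin itself persists the result is not covered here.

```javascript
// Hypothetical helper: pairs each value in a record with the field
// name at the same position in "map".
function toDocuments(map, data) {
  return data.map((record, index) => {
    if (record.length !== map.length) {
      throw new Error(`Record ${index} has ${record.length} values, expected ${map.length}`);
    }
    return Object.fromEntries(map.map((field, position) => [field, record[position]]));
  });
}

// First two records from the example above.
const states = toDocuments(
  ['name', 'abbreviation', 'capitalCity', 'mostPopulatedCity', 'population', 'squareMiles'],
  [
    ['ALABAMA', 'AL', 'Montgomery', 'Birmingham', 4708708, 52423],
    ['ALASKA', 'AK', 'Juneau', 'Anchorage', 698473, 656425]
  ]
);
console.log(states[0]);
```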
