Flowcore CLI Plugin - Export Parquet
Plugin to export data from the flowcore platform as parquet files using DuckDB via the modern @duckdb/node-api package for efficient data processing and storage.
Overview
The Export Parquet plugin for Flowcore CLI allows you to stream data from Flowcore data cores and export it directly to Parquet files. It uses DuckDB via the modern @duckdb/node-api package for in-memory data processing, making it efficient for handling large volumes of streaming data.
Installation
npm install -g @argilzar/cli-plugin-export-parquet
Usage
Basic Usage
# Export with default timestamped filename
flowcore export-parquet "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live
Custom Filename with CLI Flag
# Export with custom filename (extension automatically added)
flowcore export-parquet "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live --filename brian
# Export with custom filename (extension already included)
flowcore export-parquet "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live --filename brian.parquet
# Using short flag
flowcore export-parquet "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live -f brian
# Export with custom output directory
flowcore export-parquet "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live --output-dir /path/to/exports
# Export with custom filename and output directory
flowcore export-parquet "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live --filename brian --output-dir /path/to/exports
# Using short flags
flowcore export-parquet "https://flowcore.io/<org>/<Data Core>/*" -s 1y --no-live -f brian -o /path/to/exports
Custom Filename (Programmatic)
// Create service with custom filename
const exportService = new ExportParquetService(logger, "my-custom-export.parquet");Programmatic Usage
import { ExportParquetService } from "@flowcore/cli-plugin-export-parquet";
// With default filename (timestamped) and default output directory
const service = new ExportParquetService(logger);
// With custom filename (extension automatically added) and default output directory
const service = new ExportParquetService(logger, "my-export");
// With custom filename (extension already included) and default output directory
const service = new ExportParquetService(logger, "my-export.parquet");
// With custom filename and custom output directory
const service = new ExportParquetService(logger, "my-export", "/path/to/exports");
// With default filename and custom output directory
const service = new ExportParquetService(logger, undefined, "/path/to/exports");
Note: The .parquet extension is automatically added if not provided. Both "my-export" and "my-export.parquet" will result in the same filename.
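Under the hood, the extension handling amounts to a suffix check. A minimal sketch (normalizeFilename is a hypothetical helper for illustration, not part of the package's API):
// Hypothetical helper illustrating the extension handling; not part of the package's API
function normalizeFilename(filename: string): string {
  // Append .parquet only when the extension is missing
  return filename.endsWith(".parquet") ? filename : `${filename}.parquet`;
}
normalizeFilename("my-export");         // "my-export.parquet"
normalizeFilename("my-export.parquet"); // "my-export.parquet"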
Commands
export-parquet <STREAM>
Export data from a Flowcore stream as parquet files.
Arguments:
STREAM - The stream URL in the format https://flowcore.io/<org>/<Data Core>/*
Flags:
-s, --start=<value> - Start time for the export (e.g., "1y", "2024-01-01")
-e, --end=<value> - End time for the export (e.g., "2024-12-31")
--live - Enable live streaming (continuous export)
--no-live - Disable live streaming (one-time export)
-f, --filename=<value> - Custom filename for the parquet export (.parquet extension automatically added)
-o, --output-dir=<value> - Custom output directory for the parquet file (default: ./exports)
Examples:
# Export last year's data
export-parquet "https://flowcore.io/myorg/mydatacore/*" -s 1y --no-live
# Export specific date range
export-parquet "https://flowcore.io/myorg/mydatacore/*" -s 2024-01-01 -e 2024-12-31 --no-live
# Live streaming export
export-parquet "https://flowcore.io/myorg/mydatacore/*" --live
Features
- Modern DuckDB Integration: Uses @duckdb/node-api for the latest DuckDB features and performance
- Streaming Support: Handles both batch and live streaming data
- Intelligent Schema Detection: Automatically analyzes payload structures and creates appropriately typed columns
- Dynamic Schema Evolution: Automatically adds new columns as payload structures are discovered
- Native Data Type Preservation: Stores values in their native types (numbers, booleans, timestamps) instead of JSON strings
- Proper Timestamp Handling: Correctly handles ISO 8601 datetime strings without conversion errors
- Unix Timestamp Detection: Automatically detects Unix timestamps in numeric values and creates TIMESTAMP columns (see the sketch after this list)
- Automatic Unix Timestamp Conversion: Converts Unix timestamps to ISO 8601 strings before inserting into DuckDB
- Intelligent Type Memory: Remembers detected column types and only performs conversions when necessary
- Custom Filename Support: Allows specifying custom filenames for parquet exports
- Custom Output Directory: Allows specifying custom output directories for parquet files
- Timestamped Output: Generates timestamped parquet files
- Progress Tracking: Shows progress during export operations
- Error Handling: Robust error handling with detailed logging
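The timestamp features above can be illustrated with a small sketch. The digit-count heuristic (10 digits for seconds, 13 for milliseconds, 16 for microseconds) follows the Output section below, but the helper name and exact logic are assumptions, not the plugin's actual implementation:
// Hypothetical sketch of digit-count-based Unix timestamp detection and conversion
function unixToIso(value: number): string | null {
  const digits = Math.trunc(Math.abs(value)).toString().length;
  let millis: number;
  if (digits === 10) millis = value * 1000;      // seconds
  else if (digits === 13) millis = value;        // milliseconds
  else if (digits === 16) millis = value / 1000; // microseconds
  else return null; // not recognized as a Unix timestamp
  return new Date(millis).toISOString(); // ISO 8601 string for a DuckDB TIMESTAMP column
}
unixToIso(1704067200);    // "2024-01-01T00:00:00.000Z"
unixToIso(1704067200000); // "2024-01-01T00:00:00.000Z"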
How It Works
- Initialization: The service initializes an in-memory DuckDB database using @duckdb/node-api (the full lifecycle is sketched after these steps)
- Streaming: As events arrive, they are processed and stored in DuckDB
- Dynamic Schema: The service automatically analyzes payload structures and creates new columns with appropriate data types
- Type Memory: Column types are stored in memory for efficient future reference
- Data Processing: Each event stores flowcore metadata in a single JSON field and spreads payload fields as individually typed columns
- Type Preservation: Values are stored in their native types (e.g., numbers as numbers, not quoted strings)
- Smart Timestamp Detection: Automatically detects both string-based and numeric Unix timestamps
- Conditional Conversion: Unix timestamps are only converted when the column type requires it
- Export: When the stream completes, data is exported to a timestamped parquet file
- Cleanup: Database connections are properly closed
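Concretely, this lifecycle can be approximated with @duckdb/node-api directly. A minimal sketch, where the table name, the amount column, and the SQL statements are illustrative rather than the plugin's actual schema:
import { DuckDBInstance } from "@duckdb/node-api";

async function lifecycleSketch() {
  // Initialization: in-memory DuckDB database
  const instance = await DuckDBInstance.create(":memory:");
  const connection = await instance.connect();
  await connection.run("CREATE TABLE events (flowcore JSON)");
  // Dynamic schema: add a typed column when a new payload field is discovered
  await connection.run("ALTER TABLE events ADD COLUMN amount DOUBLE");
  // Streaming with type preservation: numbers stay numbers, not quoted strings
  await connection.run(`INSERT INTO events VALUES ('{"eventId":"abc"}', 12.5)`);
  // Export: write the accumulated rows as a parquet file (the directory must exist)
  await connection.run("COPY events TO 'exports/events.parquet' (FORMAT PARQUET)");
  // Cleanup: close the connection (closeSync is available in recent @duckdb/node-api releases)
  connection.closeSync();
}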
Output
- Location: Files are saved to the ./exports/ directory by default, or to a custom directory if specified
- Naming: Files follow the pattern events_YYYY-MM-DDTHH-MM-SS-sssZ.parquet by default
- Custom Filenames: Can be customized by passing a filename parameter to the service constructor
- Custom Output Directories: Can be customized by passing an outputDir parameter to the service constructor
- Format: Standard Parquet format for optimal compression and query performance
- Structure: Each row contains:
  - flowcore (JSON): Complete SourceEvent metadata excluding payload:
    - eventId: Unique event identifier
    - dataCoreId: Data core ID
    - flowType: Flow type name
    - eventType: Event type name
    - timeBucket: Time bucket for the event
    - metadata: Event metadata
    - validTime: Event validity timestamp
  - Payload Fields (auto-discovered with intelligent typing):
    - Numeric Fields: BIGINT for integers, DOUBLE for decimals (stored as native numbers)
    - String Fields: VARCHAR for text data (stored as native strings)
    - DateTime Fields: TIMESTAMP for date/time strings and Unix timestamps (stored as native timestamps)
    - Boolean Fields: BOOLEAN for true/false values (stored as native booleans)
    - Complex Fields: JSON for objects and arrays (stored as JSON strings)
    - Unix Timestamp Detection: Automatically detects numeric Unix timestamps (10, 13, or 16 digits) and creates TIMESTAMP columns
    - Unix Timestamp Conversion: Converts Unix timestamps to ISO 8601 strings before insertion to ensure proper DuckDB compatibility
    - Type Memory System: Remembers detected column types to avoid unnecessary conversions on subsequent events
    - Field names match exactly as they appear in the payload
    - These fields are automatically created with appropriate types as they are discovered in the event stream
    - No Double Quotes: Numeric, boolean, and timestamp values are stored without quotes, preserving their native types
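Because the output is standard Parquet, the exported file can be queried directly with DuckDB. An illustrative read-back, where the file path and the amount payload column are assumptions:
import { DuckDBInstance } from "@duckdb/node-api";

async function inspectExport() {
  const instance = await DuckDBInstance.create(":memory:");
  const connection = await instance.connect();
  // Extract metadata from the flowcore JSON column alongside a typed payload column
  const reader = await connection.runAndReadAll(
    "SELECT flowcore->>'$.eventId' AS event_id, amount " +
    "FROM read_parquet('exports/events_2024-01-01T00-00-00-000Z.parquet') LIMIT 5",
  );
  console.log(reader.getRowObjects());
}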
Requirements
- Node.js >= 18.0.0
- Flowcore CLI with proper authentication
- Access to the target data core
Dependencies
- @duckdb/node-api: Modern Node.js API for DuckDB database operations
- @flowcore/cli-plugin-core: Core Flowcore CLI functionality
- @flowcore/cli-plugin-config: Configuration management
Development
# Install dependencies
yarn install
# Build the project
yarn build
# Run tests
yarn test
# Run linter
yarn lint
Architecture
The plugin consists of:
- ExportParquetService: Core service implementing the OutputService interface
- DuckDB Integration: In-memory database using @duckdb/node-api for data processing
- Stream Processing: Handles the Flowcore event stream lifecycle
- Parquet Export: Converts processed data to parquet format
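Purely as an illustration of the shape such a service takes, a hypothetical outline; the real OutputService interface is defined by @flowcore/cli-plugin-core and may differ:
// Hypothetical shape only; the actual OutputService contract lives in @flowcore/cli-plugin-core
interface OutputServiceSketch<TEvent> {
  init(): Promise<void>;                      // e.g., create the in-memory DuckDB instance
  handleEvent(event: TEvent): Promise<void>;  // buffer each streamed event into DuckDB
  done(): Promise<void>;                      // export to parquet and close connections
}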
License
MIT
