@liquidmetal-ai/icet
v0.0.5
A TypeScript implementation of Apache Iceberg for modern JavaScript runtimes. Works with Node.js and Cloudflare Workers.
Features
- 🧊 Full Iceberg Support - Tables, schemas, partitioning, snapshots, and time travel
- 🔄 REST Catalog API - Connect to any Iceberg REST catalog (including Cloudflare R2 Data Catalog)
- 🏹 Apache Arrow Integration - Native Arrow table support for high-performance data operations
- 📦 Parquet I/O - Read and write Parquet files with WASM support for edge runtimes
- 🔐 Credential Vending - Automatic credential management for S3-compatible storage
- 📊 Schema Evolution - Add columns, evolve types with full compatibility checks
- ⚡ Partition Pruning - Efficient query planning with automatic file skipping
Installation
npm install @liquidmetal-ai/icet
# or
pnpm add @liquidmetal-ai/icet
# or
yarn add @liquidmetal-ai/icet
Quick Start
import { IceT } from '@liquidmetal-ai/icet';
// Create IceT instance with format version
const iceT = new IceT({ formatVersion: 2 });
// Define a schema
const schema = iceT.schema([
iceT.types.nestedField(1, 'id', iceT.types.StringType, true),
iceT.types.nestedField(2, 'timestamp', iceT.types.TimestampType, true),
iceT.types.nestedField(3, 'message', iceT.types.StringType, true),
iceT.types.nestedField(4, 'level', iceT.types.StringType, false),
]);
// Connect to a REST catalog
const catalog = iceT.catalog({
uri: 'https://your-catalog.example.com',
token: 'your-bearer-token',
warehouse: 'my-warehouse',
});
// Create a table with partitioning
const partitionSpec = iceT.partitionSpec(schema)
.day('timestamp', 'log_date')
.identity('level')
.build();
const table = await catalog.createTable(
{ namespace: ['analytics'], name: 'logs' },
schema,
partitionSpec
);
// Write data using Arrow tables
const data = iceT.arrow.buildTableFromRows(schema, [
{ id: '1', timestamp: new Date(), message: 'Hello', level: 'INFO' },
{ id: '2', timestamp: new Date(), message: 'World', level: 'DEBUG' },
]);
await table.append(data);
API Reference
IceT Class
The main entry point that provides a unified API with automatic format version management.
import { IceT } from '@liquidmetal-ai/icet';
const iceT = new IceT({ formatVersion: 2 });
Configuration
| Option | Type | Description |
|--------|------|-------------|
| formatVersion | 2 \| 3 | Iceberg format version. Version 2 is recommended for compatibility. |
Schema Definition
Create schemas using the fluent types API:
// Using nested fields
const schema = iceT.schema([
iceT.types.nestedField(1, 'id', iceT.types.StringType, true),
iceT.types.nestedField(2, 'count', iceT.types.LongType, true),
iceT.types.nestedField(3, 'price', iceT.types.DoubleType, false),
iceT.types.nestedField(4, 'tags', iceT.types.list(5, iceT.types.StringType), false),
]);
// Or using the schema builder
const schema = iceT.schemaBuilder()
.addRequiredField('id', iceT.types.StringType)
.addRequiredField('count', iceT.types.LongType)
.addOptionalField('price', iceT.types.DoubleType)
.build();
Available Types
| Type | Description |
|------|-------------|
| BooleanType | Boolean values |
| IntegerType | 32-bit signed integer |
| LongType | 64-bit signed integer |
| FloatType | 32-bit IEEE 754 floating point |
| DoubleType | 64-bit IEEE 754 floating point |
| DateType | Calendar date without timezone |
| TimeType | Time of day without timezone |
| TimestampType | Timestamp without timezone |
| TimestamptzType | Timestamp with timezone |
| StringType | UTF-8 encoded string |
| UuidType | UUID value |
| BinaryType | Arbitrary binary data |
| VariantType | Semi-structured data (Iceberg v3) — supports any JS value (objects, arrays, primitives) with server-side queryability |
| decimal(precision, scale) | Fixed-precision decimal |
| fixed(length) | Fixed-length binary |
| list(elementId, elementType) | List/array of values |
| map(keyId, keyType, valueId, valueType) | Key-value map |
| struct(fields) | Nested structure |
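The parameterized types in the table compose with nestedField the same way the primitives do. A minimal sketch, assuming the iceT.types helpers shown above; the field and element IDs (1-9) and the field names are illustrative, not prescribed:

```typescript
// Sketch: a schema combining decimal, list, map, and struct.
// Every nested element (list element, map key/value, struct field)
// needs its own unique field ID within the schema.
const orderSchema = iceT.schema([
  iceT.types.nestedField(1, 'order_id', iceT.types.StringType, true),
  // Fixed-precision money value: up to 10 digits, 2 after the decimal point
  iceT.types.nestedField(2, 'total', iceT.types.decimal(10, 2), true),
  // List of tag strings; the element gets field ID 4
  iceT.types.nestedField(3, 'tags', iceT.types.list(4, iceT.types.StringType), false),
  // String-to-long map; key (6) and value (7) each get a field ID
  iceT.types.nestedField(5, 'counts', iceT.types.map(6, iceT.types.StringType, 7, iceT.types.LongType), false),
  // Nested struct with its own field list
  iceT.types.nestedField(8, 'address', iceT.types.struct([
    iceT.types.nestedField(9, 'city', iceT.types.StringType, true),
  ]), false),
]);
```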
Catalog Connection
Connect to an Iceberg REST catalog:
const catalog = iceT.catalog({
uri: 'https://catalog.example.com',
token: 'Bearer your-token', // Optional: Bearer token
credential: 'your-credential', // Optional: Alternative auth
warehouse: 'my-warehouse', // Optional: Warehouse ID
});
Catalog Operations
// List namespaces
const namespaces = await catalog.listNamespaces();
// Create namespace
await catalog.createNamespace(
{ namespace: ['analytics'], properties: {} },
{ description: 'Analytics tables' }
);
// List tables
const tables = await catalog.listTables({ namespace: ['analytics'], properties: {} });
// Load existing table
const table = await catalog.loadTable({ namespace: ['analytics'], name: 'events' });
// Create table
const table = await catalog.createTable(
{ namespace: ['analytics'], name: 'events' },
schema,
partitionSpec,
sortOrder,
undefined, // location (optional)
{ 'write.format.default': 'parquet' } // properties
);
// Drop table
await catalog.dropTable({ namespace: ['analytics'], name: 'events' }, true);
Partitioning
Define partition specs for efficient data organization:
const partitionSpec = iceT.partitionSpec(schema)
.identity('region') // Partition by exact value
.day('timestamp', 'day') // Partition by day
.month('timestamp', 'month') // Partition by month
.year('timestamp', 'year') // Partition by year
.hour('timestamp', 'hour') // Partition by hour
.bucket('id', 16, 'id_bucket') // Hash bucket
.truncate('name', 4, 'name_prefix') // Truncate string
.build();
// Or create unpartitioned tables
const unpartitioned = iceT.unpartitioned();
Sort Order
Define sort order for data files:
const sortOrder = iceT.sortOrder(schema)
.asc('timestamp')
.desc('id')
.build();
// Or unsorted
const unsorted = iceT.unsorted();
Table Definition
Bundle schema, partitioning, and options together:
const logsTable = iceT.defineTable({
name: 'logs',
schema: iceT.schema([
iceT.types.nestedField(1, 'id', iceT.types.StringType, true),
iceT.types.nestedField(2, 'timestamp', iceT.types.TimestampType, true),
iceT.types.nestedField(3, 'message', iceT.types.StringType, true),
]),
partitionBy: (schema) => iceT.partitionSpec(schema).day('timestamp').build(),
sortBy: (schema) => iceT.sortOrder(schema).asc('timestamp').build(),
properties: {
'write.format.default': 'parquet',
},
writeOptions: {
compression: 'zstd',
rowGroupSize: 10000,
},
});
// Use the definition
const table = await catalog.createTable(
logsTable.identifier(['analytics']),
logsTable.schema,
logsTable.partitionSpec,
logsTable.sortOrder,
undefined,
logsTable.properties
);
Reading Data
Query tables with filtering and projection:
// Simple scan
const arrowTable = await table.scan().toArrowTable();
// With filter (partition pruning)
import { Expressions } from '@liquidmetal-ai/icet';
const filtered = await table.scan()
.filter(Expressions.equal('level', 'ERROR'))
.limit(100)
.toArrowTable();
// Complex filters
const results = await table.scan()
.filter(Expressions.and(
Expressions.greaterThanOrEqual('timestamp', new Date('2024-01-01')),
Expressions.equal('source', 'web')
))
.toArrowTable();
// Get scan plan (shows partition pruning)
const plan = await table.scan()
.filter(Expressions.equal('region', 'us-west'))
.planFiles();
console.log(`Files to scan: ${plan.totalFiles} (pruned: ${plan.prunedFiles})`);
Writing Data
Append or overwrite data using Arrow tables:
import * as arrow from 'apache-arrow';
// Build Arrow table from rows
const arrowTable = iceT.arrow.buildTableFromRows(schema, [
{ id: '1', timestamp: new Date(), message: 'Event 1' },
{ id: '2', timestamp: new Date(), message: 'Event 2' },
]);
// Or from column arrays
const arrowTable = iceT.arrow.buildTable(schema, {
id: ['1', '2', '3'],
timestamp: [new Date(), new Date(), new Date()],
message: ['Event 1', 'Event 2', 'Event 3'],
});
// Append data
await table.append(arrowTable, {
schemaEvolution: 'permissive', // Allow adding new columns
compression: 'zstd',
});
// Overwrite data
await table.overwrite(arrowTable);
Time Travel
Read historical snapshots:
// Get all snapshots
const snapshots = table.getSnapshots();
// Read from specific snapshot
const historicalTable = table.asOfSnapshot(snapshotId);
const data = await historicalTable.scan().toArrowTable();
// Read as of timestamp
const pastTable = table.asOfTimestamp(new Date('2024-01-15'));
const pastData = await pastTable.scan().toArrowTable();
Table Statistics
const stats = await table.getStatistics();
console.log(`Records: ${stats.totalRecords}`);
console.log(`Files: ${stats.fileCount}`);
console.log(`Size: ${stats.totalSize} bytes`);
Direct S3/R2 Access
For direct storage access without a catalog:
import { S3FileIO } from '@liquidmetal-ai/icet';
const fileIO = new S3FileIO({
region: 'auto', // Use 'auto' for R2
accessKeyId: 'your-access-key',
secretAccessKey: 'your-secret-key',
endpoint: 'https://account.r2.cloudflarestorage.com',
bucket: 'my-bucket',
});
// Read file
const data = await fileIO.readFileBytes('s3://my-bucket/path/to/file.parquet');
// Write file
await fileIO.writeFileBytes('s3://my-bucket/path/to/output.parquet', data);
// Check existence
const exists = await fileIO.fileExists('s3://my-bucket/path/to/file.parquet');
Cloudflare Workers Support
IceT works in Cloudflare Workers with WASM-based Parquet support:
import { IceT, initParquetWasm } from '@liquidmetal-ai/icet';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
// Initialize Parquet WASM (required in Workers)
await initParquetWasm();
const iceT = new IceT({ formatVersion: 2 });
const catalog = iceT.catalog({
uri: env.CATALOG_URI,
token: env.CATALOG_TOKEN,
warehouse: env.WAREHOUSE,
});
// ... use catalog normally
}
};
Low Memory Mode
For memory-constrained environments:
import { createLowMemoryWriterProperties } from '@liquidmetal-ai/icet';
const writerProps = createLowMemoryWriterProperties({
compression: 'zstd',
rowGroupSize: 1000, // Smaller row groups
dictionaryEnabled: false, // Disable dictionary to save memory
});
Error Handling
IceT provides a comprehensive error hierarchy:
import {
IcebergError,
TableNotFoundError,
SchemaValidationError,
CatalogConnectionError,
isIcebergError
} from '@liquidmetal-ai/icet';
try {
const table = await catalog.loadTable({ namespace: ['demo'], name: 'missing' });
} catch (error) {
if (error instanceof TableNotFoundError) {
console.log('Table does not exist');
} else if (error instanceof CatalogConnectionError) {
console.log('Cannot connect to catalog');
} else if (isIcebergError(error)) {
console.log('Iceberg error:', error.message);
}
}
Error Types
| Category | Errors |
|----------|--------|
| Catalog | CatalogError, CatalogConnectionError, CatalogAuthenticationError |
| Namespace | NamespaceNotFoundError, NamespaceAlreadyExistsError, NamespaceNotEmptyError |
| Table | TableNotFoundError, TableAlreadyExistsError, TableCommitError, TableCommitConflictError |
| Schema | SchemaValidationError, SchemaEvolutionError, FieldNotFoundError |
| File I/O | FileReadError, FileWriteError, FileNotFoundError, S3AuthenticationError |
| Parquet | ParquetReadError, ParquetWriteError, ParquetSchemaError |
| Transaction | TransactionCommitError, TransactionValidationError |
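TableCommitConflictError in the table above signals that another writer committed between your read and your commit; the usual response is to retry. A minimal retry-with-backoff sketch; the helper, its parameters, and the delays are illustrative patterns, not part of the library's API:

```typescript
// Sketch: retry an async operation when a retryable error is thrown,
// backing off exponentially between attempts.
async function retryOnConflict<T>(
  op: () => Promise<T>,
  isRetryable: (err: unknown) => boolean,
  maxAttempts = 3,
  baseDelayMs = 100,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      // Give up once attempts are exhausted or the error is not retryable
      if (attempt + 1 >= maxAttempts || !isRetryable(err)) throw err;
      // Exponential backoff: baseDelayMs, 2x, 4x, ...
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
}
```

Paired with the errors above, usage might look like `retryOnConflict(() => table.append(arrowTable), (e) => e instanceof TableCommitConflictError)`.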
Advanced Usage
Schema Evolution
// Permissive mode - automatically add new columns
await table.append(dataWithNewColumns, {
schemaEvolution: 'permissive',
});
// Strict mode - fail on schema mismatch
await table.append(data, {
schemaEvolution: 'strict',
});
// Immutable mode - never change schema
await table.append(data, {
schemaEvolution: 'immutable',
});
Arrow Expression Evaluator
Filter Arrow tables directly:
import { ArrowExpressionEvaluator, Expressions } from '@liquidmetal-ai/icet';
const filter = Expressions.and(
Expressions.greaterThan('age', 21),
Expressions.equal('status', 'active')
);
const filtered = ArrowExpressionEvaluator.filter(arrowTable, filter);
In-Memory Testing
Use InMemoryFileIO for testing:
import { InMemoryFileIO } from '@liquidmetal-ai/icet';
const fileIO = new InMemoryFileIO();
await fileIO.writeFileBytes('test/data.parquet', data);
const read = await fileIO.readFileBytes('test/data.parquet');
VARIANT Type Support (Iceberg v3)
IceT supports the Iceberg v3 VARIANT type for storing semi-structured data with server-side queryability. VARIANT columns can store any JavaScript value (objects, arrays, primitives, nested structures) while maintaining efficient binary encoding and query filtering capabilities.
Creating Tables with VARIANT Columns
const schema = iceT.schema([
iceT.types.requiredField(1, 'id', iceT.types.StringType),
iceT.types.requiredField(2, 'payload', iceT.types.VariantType),
iceT.types.optionalField(3, 'metadata', iceT.types.VariantType),
]);
const table = iceT.defineTable({ name: 'events', schema });
Writing VARIANT Data
import { ArrowTableBuilder } from '@liquidmetal-ai/icet';
const builder = new ArrowTableBuilder(schema);
builder.addColumn('id', ['evt-1', 'evt-2', 'evt-3']);
builder.addColumn('payload', [
{ model: 'llama-3', tokens: 100, usage: { prompt: 50, completion: 50 } },
{ model: 'gpt-4', tokens: 200 },
'simple string value' // VARIANT supports any JS type
]);
builder.addColumn('metadata', [
{ version: 1, tags: ['production', 'api'] },
null,
{ version: 2 }
]);
const arrowTable = builder.build();
await table.insert(arrowTable);
Path Navigation (Efficient Filtering)
VARIANT supports server-side path access for filtering without deserializing entire objects:
import { variantEncode, variantGet } from '@liquidmetal-ai/icet';
// Encode a JS value to VARIANT binary format
const data = variantEncode({
usage: { prompt_tokens: 100, completion_tokens: 50 },
model: 'gpt-4'
});
// Navigate by path without full deserialization
const promptTokens = variantGet(data, ['usage', 'prompt_tokens']); // 100
const model = variantGet(data, ['model']); // 'gpt-4'
const missing = variantGet(data, ['usage', 'missing']); // undefined
VARIANT Storage Details
- Arrow representation: Stored as Binary type in Arrow tables
- Encoding: Custom binary format with type-tagged values
- Query performance: Path access is optimized for filtering; promoted columns should be used for frequently-filtered fields
- Partitioning: VARIANT columns cannot be used as partition sources
- Schema evolution: VARIANT columns maintain type identity across schema changes
Use Cases
- Log/event storage: Store variable telemetry payloads
- API responses: Capture arbitrary JSON data
- Configuration data: Store semi-structured settings
- Metadata: Attach flexible metadata to structured records
Environment Variables
When using with demos or CLI tools:
| Variable | Description |
|----------|-------------|
| ICEBERG_CATALOG_URI | REST catalog endpoint URL |
| ICEBERG_AUTH_TOKEN | Bearer token for authentication |
| ICEBERG_WAREHOUSE | Warehouse identifier |
| ICEBERG_NAMESPACE | Default namespace (e.g., analytics) |
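The variables above map directly onto the catalog options shown earlier. A minimal sketch of that wiring; the helper name and the fail-fast check are illustrative, not part of the library:

```typescript
// Sketch: build catalog options from the environment variables above.
// Returns an object shaped for iceT.catalog({ uri, token, warehouse }).
function catalogOptionsFromEnv(env: Record<string, string | undefined>) {
  const uri = env.ICEBERG_CATALOG_URI;
  // Fail fast on the one variable the catalog cannot work without
  if (!uri) throw new Error('ICEBERG_CATALOG_URI is required');
  return {
    uri,
    token: env.ICEBERG_AUTH_TOKEN,
    warehouse: env.ICEBERG_WAREHOUSE,
  };
}

// Usage in Node.js:
// const catalog = iceT.catalog(catalogOptionsFromEnv(process.env));
```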
Compatibility
- Node.js: 18+ (ESM required)
- Cloudflare Workers: Full support with WASM initialization
- Iceberg Format: Version 2 and 3
- Catalogs: Any Iceberg REST catalog (Tabular, Snowflake, Dremio, R2 Data Catalog, etc.)
- Storage: S3, R2, MinIO, and any S3-compatible storage
Dependencies
- apache-arrow - Arrow table operations
- parquet-wasm - Parquet read/write via WebAssembly
- @aws-sdk/client-s3 - S3 operations
- uuid - UUID generation
Contributing
Contributions are welcome! Please see the main repository for contribution guidelines.
