@transferx/core
v1.1.6
Published
TransferX core engine — chunked upload state machine, retry logic, and events
Readme
@transferx/core
Core engine for TransferX — chunked upload state machine, retry logic, concurrency scheduling, and session persistence.
This is the low-level package. For building applications use @transferx/sdk which wires everything together. Use @transferx/core directly when writing a custom adapter or when you need fine-grained control over the engine.
📖 Full documentation →
🐙 GitHub →
Installation
npm install @transferx/coreArchitecture Overview
UploadEngine
├── Chunker — splits file into fixed-size parts
├── Scheduler — adaptive concurrency (1–16 chunks)
├── RetryEngine — per-chunk exponential backoff + jitter
├── ProgressEngine — EMA-smoothed speed + ETA
├── MetricsEngine — bytes, latency, error-rate counters
├── EventBus — typed publish/subscribe event system
└── ISessionStore — pluggable durable state (FileSessionStore / MemorySessionStore)
TransferManager
└── FIFO queue with maxConcurrentUploads cap (default: 4)Quick Start
import {
UploadEngine,
makeUploadSession,
makeSessionId,
FileSessionStore,
EventBus,
} from "@transferx/core";
import { MyAdapter } from "./my-adapter";
import { statSync } from "fs";
const store = new FileSessionStore("./.sessions");
const bus = new EventBus();
const engine = new UploadEngine({ adapter: new MyAdapter(), store, bus });
const stat = statSync("/path/to/file.mp4");
const sessionId = makeSessionId(
"/path/to/file.mp4",
"uploads/file.mp4",
stat.size,
);
const session = makeUploadSession(
sessionId,
{
name: "file.mp4",
size: stat.size,
mimeType: "video/mp4",
path: "/path/to/file.mp4",
},
"uploads/file.mp4",
engine.config,
);
await store.save(session);
bus.on("chunk:done", (e) => console.log(`chunk ${e.chunkIndex} done`));
bus.on("session:done", (e) => console.log("finished:", e.session.id));
bus.on("session:error", (e) => console.error("error:", e.error.message));
const result = await engine.upload(session);
console.log("state:", result.state); // "done"Session Persistence & Resume
Sessions are automatically saved to the store after every chunk state change. On restart, call resumeSession():
// After process restart — resume from where it left off
const result = await engine.resumeSession(sessionId);For bulk recovery at startup:
const manager = new TransferManager({ adapter, store, bus });
const { resuming } = await manager.restoreFromStore();
console.log(`Resuming ${resuming.length} session(s)`);Deterministic Session IDs
makeSessionId() generates a stable SHA-256-based ID from the file path, target key, and size. Calling it twice with the same inputs returns the same ID — preventing duplicate sessions:
import { makeSessionId } from "@transferx/core";
const id = makeSessionId("/data/video.mp4", "uploads/video.mp4", 3_221_225_472);
// → "a3f7c2e1..." (24 hex chars, deterministic)TransferManager
Coordinates multiple uploads with a FIFO queue and concurrency cap:
import { TransferManager } from "@transferx/core";
const manager = new TransferManager({
adapter,
store,
bus,
maxConcurrentUploads: 4, // default
});
// Enqueue a new upload
manager.enqueue(session);
// Enqueue a resume
manager.enqueueResume(sessionId);
// Recover all in-progress sessions after restart
const { resuming, skipped } = await manager.restoreFromStore();
// Status snapshot (no I/O)
const { activeUploads, queuedUploads } = manager.getStatus();
// Control
manager.pauseAll();
manager.resumeAll();
await manager.cancelAll();Event Bus
Subscribe to typed events from EventBus:
bus.on("chunk:done", (e) => {
/* e.sessionId, e.chunkIndex, e.bytesDone */
});
bus.on("chunk:failed", (e) => {
/* e.error, e.attempt */
});
bus.on("session:progress", (e) => {
/* e.progress: { percent, speedBytesPerSec, etaMs } */
});
bus.on("session:done", (e) => {
/* e.session */
});
bus.on("session:failed", (e) => {
/* e.session, e.error */
});
bus.on("session:error", (e) => {
/* non-fatal error */
});
bus.on("log", (e) => {
/* e.level, e.message, e.context */
});Engine Configuration
import { resolveEngineConfig } from "@transferx/core";
const config = resolveEngineConfig({
chunkSizeBytes: 10 * 1024 * 1024, // 10 MiB (default)
concurrency: {
initial: 4, // starting concurrency
min: 1,
max: 16,
adaptive: true, // auto-tune based on throughput
},
retry: {
maxAttempts: 5,
baseDelayMs: 500,
maxDelayMs: 30_000,
jitterMs: 500,
},
checksumVerify: true, // SHA-256 per chunk
});Session States
created → uploading → completing → done
↘ failed
↘ cancelledTERMINAL_STATES={ "done", "cancelled", "failed" }RESUMABLE_STATES={ "uploading", "failed" }
Writing a Custom Adapter
Implement ITransferAdapter:
import type {
ITransferAdapter,
ChunkUploadResult,
RemoteUploadState,
} from "@transferx/core";
import type { TransferSession, ChunkMeta } from "@transferx/core";
export class MyAdapter implements ITransferAdapter {
async initTransfer(session: TransferSession): Promise<string> {
// Start the remote multipart upload; return provider session ID
return myApi.createUpload(session.targetKey);
}
async uploadChunk(
session: TransferSession,
chunk: ChunkMeta,
data: Uint8Array,
sha256Hex: string,
): Promise<ChunkUploadResult> {
const token = await myApi.uploadPart(
session.providerSessionId!,
chunk.index + 1,
data,
);
return { providerToken: token };
}
async completeTransfer(
session: TransferSession,
chunks: ChunkMeta[],
): Promise<void> {
await myApi.completeUpload(
session.providerSessionId!,
chunks.map((c) => c.providerToken!),
);
}
async abortTransfer(session: TransferSession): Promise<void> {
await myApi.abortUpload(session.providerSessionId!).catch(() => undefined);
}
// Optional — implement for server-side resume reconciliation
async getRemoteState(session: TransferSession): Promise<RemoteUploadState> {
const parts = await myApi.listParts(session.providerSessionId!);
return {
uploadedParts: parts.map((p) => ({
partNumber: p.number,
checksum: p.sha256,
})),
};
}
}Error Taxonomy
All errors are TransferError instances with a typed category:
| Category | Retryable | Factory |
| ----------------- | --------- | ---------------------------------------------------- |
| network | ✅ | networkError(message) |
| rateLimit | ✅ | rateLimitError(message, retryAfterMs?) |
| serverError | ✅ | serverError(message, statusCode?) |
| checksum | ✅ | checksumError(message) |
| auth | ❌ | authError(message) |
| notFound | ❌ | — |
| invalidState | ❌ | invalidStateError(sessionId, state?) |
| invalidInput | ❌ | — |
| duplicateUpload | ❌ | duplicateUploadError(targetKey, existingSessionId) |
| cancelled | ❌ | cancelledError() |
Links
- 📖 Documentation
- 🐙 GitHub
- 📦 npm: @transferx/sdk — recommended entry point
