@graphium/cosmos
v0.1.0-rc.1
Azure Cosmos DB Gremlin boundary for Graphium
Graph-native OGM with multiple graph DB backends
Explicit persistence model — no Proxy magic, no N+1 footguns
@graphium/cosmos is the Azure Cosmos DB Gremlin backend for Graphium.
What's new in 0.4
- Unlimited find() by default: the 0.3 implicit 10 000-row cap is removed. Opt back in via Graph.create({ defaultMaxRows: 10_000 }).
- autoFlush dirty tracking: opt-in ES Proxy wrapper that flushes property mutations without explicit persist() (Graph.create({ autoFlush: true })).
- Raw Gremlin @security JSDoc: gremlinRaw() / raw() carry the uniform @security WARNING tag (Phase 5.4).
- Cosmos still does not support multi-statement transactions (Gremlin API limitation).
Status
experimental
Install
pnpm add @graphium/cosmos reflect-metadata
Quickstart
import 'reflect-metadata'
import {
  Graph,
  CosmosDriver,
  Node,
  PrimaryKey,
  Property,
} from '@graphium/cosmos'

@Node({ label: 'User' })
class User {
  @PrimaryKey({ type: 'uuid', generated: true })
  id!: string

  @Property({ type: 'string' })
  email!: string
}

const graph = await Graph.create({
  driver: CosmosDriver.create({
    endpoint: 'wss://your-account.gremlin.cosmos.azure.com:443/',
    database: 'mydb',
    container: 'mygraph',
    primaryKey: process.env.COSMOS_KEY!,
  }),
  entities: [User],
})

const gm = graph.getManager()
await gm.save(User, { email: '[email protected]' })
const users = await gm.find(User, { where: { email: '[email protected]' } })
Capabilities
- CRUD operations: create, save, find, findOne, delete
- Relationship persistence
- Find session (cursor-based pagination)
- Schema migration via Migrator and GremlinMigration
- Auto-driver from environment variables (createCosmosDriverFromEnv)
- Error classification (classifyCosmosError) with retry support (withRetry)
- NestJS adapter via @graphium/nestjs/cosmos (using graphNestOgmAdapter)
Limitations
- No transaction support. Cosmos DB Gremlin API does not support multi-statement transactions.
- Script submission mode only. Bytecode traversal submission is not supported by the Cosmos DB Gremlin endpoint.
Auto-Driver (env-based)
COSMOS_ENDPOINT=wss://your-account.gremlin.cosmos.azure.com:443/
COSMOS_DATABASE=mydb
COSMOS_CONTAINER=mygraph
COSMOS_PRIMARY_KEY=your-key
import { createCosmosDriverFromEnv } from '@graphium/cosmos/auto'
Public API
- Graph, GraphManager, Repository
- CosmosDriver, CosmosConnection, CosmosResultAdapter, CosmosTypeConverter
- CosmosTransactionStrategy, COSMOS_CAPABILITIES, CosmosCapabilities
- classifyCosmosError, classifyCosmosRestError, withRetry
- CosmosRestClient, signCosmosRequest, buildCanonicalString, deriveDocumentEndpoint
- PartialPersistError (re-exported from @graphium/core)
- Migrator, GremlinMigration
- createCosmosDriverFromEnv, resolveCosmosConnectionOptionsFromEnv
- graphNestOgmAdapter
Dual-Client Architecture (REST ETag CAS)
The Cosmos DB Gremlin endpoint does not expose optimistic-concurrency
semantics: there is no If-Match header for vertex updates and no atomic
"insert-if-absent" primitive. To make @Version entities behave the same way
as on other backends, @graphium/cosmos runs two clients side-by-side:
- Gremlin client (gremlin.driver.Client over WebSocket): every read and every non-versioned write.
- REST CAS adapter (CosmosRestClient): calls the Cosmos Document API (https://<account>.documents.azure.com) for the narrow set of operations that need ETag-based compare-and-swap (findOrCreate + save on @Version-bearing entities). Hand-rolled, no @azure/cosmos dependency.
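For readers curious what a hand-rolled Document API client has to do to authenticate, the following is a minimal sketch of the master-key authorization token as Azure documents it for the Cosmos DB REST API. The names canonicalString and buildAuthToken and the SignInput shape are hypothetical and chosen for illustration; they are not the signatures of this package's signCosmosRequest or buildCanonicalString.

```typescript
import { createHmac } from 'node:crypto'

// Hypothetical input shape for illustration; not taken from @graphium/cosmos.
interface SignInput {
  verb: string // HTTP verb, e.g. 'PUT'
  resourceType: string // e.g. 'docs'
  resourceLink: string // e.g. 'dbs/mydb/colls/mygraph/docs/42' (case preserved)
  date: string // RFC 1123 date, also sent as the x-ms-date header
  masterKey: string // base64-encoded account key
}

// String to sign: lowercased verb, resource type and date, the resource link
// as-is, then an empty line. Every part is terminated by a newline.
function canonicalString(input: SignInput): string {
  return (
    `${input.verb.toLowerCase()}\n` +
    `${input.resourceType.toLowerCase()}\n` +
    `${input.resourceLink}\n` +
    `${input.date.toLowerCase()}\n` +
    `\n`
  )
}

// HMAC-SHA256 over the canonical string, keyed with the decoded master key,
// then wrapped in the URL-encoded "type=master&ver=1.0&sig=..." token.
function buildAuthToken(input: SignInput): string {
  const key = Buffer.from(input.masterKey, 'base64')
  const sig = createHmac('sha256', key)
    .update(canonicalString(input), 'utf8')
    .digest('base64')
  return encodeURIComponent(`type=master&ver=1.0&sig=${sig}`)
}
```

The resulting token would go into the Authorization header, with the same date string sent as x-ms-date.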
Routing happens inside CosmosConnection.execute(). The caller (core runtime
or user-level command envelope) sets command.metadata.requiresCAS = true
plus a command.metadata.cas payload describing the operation. The
connection inspects the flag and dispatches through the REST adapter; absent
the flag, the Gremlin client handles the request. Both transports share the
same classifyCosmosError retry policy, OTel hook surface, and connection
lifecycle (connect() / disconnect()).
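The routing rule described above can be sketched as a small pure function plus a dispatcher. This is an illustration under stated assumptions, not the package's CosmosConnection: the Command, CasPayload, and Transport shapes are hypothetical, and throwing when the flag is set without a payload is a guess at reasonable behavior.

```typescript
// Hypothetical shapes for illustration; the real command envelope may differ.
interface CasPayload {
  op: 'put' | 'create'
  id: string
  ifMatch?: string
}

interface Command {
  script?: string
  metadata?: { requiresCAS?: boolean; cas?: CasPayload }
}

type TransportName = 'rest' | 'gremlin'

interface Transport {
  send(command: Command): Promise<unknown>
}

// Pure routing decision: the CAS flag selects REST, everything else goes
// to the Gremlin client.
function selectTransport(command: Command): TransportName {
  const meta = command.metadata
  if (meta === undefined || meta.requiresCAS !== true) return 'gremlin'
  if (meta.cas === undefined) {
    // Assumption: a flagged command without a payload is a caller bug.
    throw new Error('metadata.requiresCAS is set but metadata.cas is missing')
  }
  return 'rest'
}

// Thin dispatcher that forwards to whichever transport was selected.
function execute(
  transports: Record<TransportName, Transport>,
  command: Command,
): Promise<unknown> {
  return transports[selectTransport(command)].send(command)
}
```

Keeping the decision in a pure function makes the routing testable without opening a WebSocket or an HTTPS connection.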
sequenceDiagram
participant App
participant GM as GraphManager
participant Conn as CosmosConnection
participant Gremlin as Gremlin Client
participant REST as REST CAS Adapter
participant Cosmos as Cosmos DB
App->>GM: save(User, { id, version: 1 })
GM->>Conn: executeCommand({ metadata: { requiresCAS: true, cas: { put, id, ifMatch } } })
Conn->>REST: PUT /dbs/<db>/colls/<coll>/docs/<id>\n+ If-Match
REST->>Cosmos: HTTPS request signed (HMAC-SHA256)
Cosmos-->>REST: 200 (etag rotated) OR 412 (precondition failed)
alt 412
REST-->>Conn: { status: 412 }
Conn-->>GM: throw OptimisticLockError
else 200
REST-->>Conn: { status: 200, document, etag }
Conn-->>GM: IRawResult({ document, etag })
end
App->>GM: find(User)
GM->>Conn: execute('g.V().hasLabel("User")...')
Conn->>Gremlin: submit script
Gremlin->>Cosmos: WebSocket Gremlin
Cosmos-->>Gremlin: traversal result
Gremlin-->>Conn: records
Error semantics
- 412 Precondition Failed → OptimisticLockError (entityName + id surfaced).
- 409 Conflict (unique-insert collision) → QueryError with duplicateKey: true.
- 429 Too Many Requests → retried using x-ms-retry-after-ms; falls through as a retryable ConnectionError after retries are exhausted.
- 408 Timeout → retryable ConnectionError.
- Cross-client partial failure (REST CAS succeeded but the follow-on Gremlin relationship write failed) → PartialPersistError. Cosmos has no cross-collection rollback so the error is intentionally non-retryable; the caller decides whether to compensate.
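The status-code mapping above can be sketched as follows. The error classes here are local stand-ins that only mirror the names used in the list, classifyRestStatus and retryDelayMs are hypothetical helpers rather than the package's classifyCosmosRestError or withRetry, and the exponential fallback (100 ms base, 5 s cap) is an assumption made for the example.

```typescript
// Local stand-ins for illustration; not the classes exported by the package.
class OptimisticLockError extends Error {}

class QueryError extends Error {
  duplicateKey: boolean
  constructor(message: string, duplicateKey: boolean) {
    super(message)
    this.duplicateKey = duplicateKey
  }
}

class ConnectionError extends Error {
  retryable: boolean
  retryAfterMs: number | undefined
  constructor(message: string, retryable: boolean, retryAfterMs?: number) {
    super(message)
    this.retryable = retryable
    this.retryAfterMs = retryAfterMs
  }
}

interface RestResponse {
  status: number
  headers: Record<string, string>
}

// Map a REST status to an error, or null when the status is not an error
// covered by the list above.
function classifyRestStatus(res: RestResponse, entity: string, id: string): Error | null {
  switch (res.status) {
    case 412:
      return new OptimisticLockError(`${entity}(${id}) was modified concurrently`)
    case 409:
      return new QueryError(`${entity}(${id}) already exists`, true)
    case 429: {
      // Honor the server's hint when the header parses as a number.
      const hinted = Number(res.headers['x-ms-retry-after-ms'])
      return new ConnectionError('throttled', true, Number.isFinite(hinted) ? hinted : undefined)
    }
    case 408:
      return new ConnectionError('request timed out', true)
    default:
      return null
  }
}

// Delay before the next attempt, or null when the error must not be retried.
// Assumed fallback: exponential backoff from 100 ms, capped at 5 s.
function retryDelayMs(err: Error, attempt: number): number | null {
  if (!(err instanceof ConnectionError) || !err.retryable) return null
  return err.retryAfterMs ?? Math.min(100 * 2 ** attempt, 5_000)
}
```

A retry loop built on this would sleep for retryDelayMs(err, attempt) between attempts and rethrow as soon as it returns null.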
Capability flag
import { COSMOS_CAPABILITIES } from '@graphium/cosmos'
COSMOS_CAPABILITIES.optimisticLockViaCAS // true
Use this to branch in higher-level code that wants to know whether the backend supports strong CAS or only the simulated compare-and-swap that other Gremlin backends emit.
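As a rough sketch of such branching, the snippet below picks a locking strategy from a capabilities object. Only the optimisticLockViaCAS field name comes from the text above; BackendCapabilities, LockStrategy, and chooseLockStrategy are hypothetical names introduced for the example.

```typescript
// Hypothetical capability shape; only optimisticLockViaCAS is from the README.
interface BackendCapabilities {
  optimisticLockViaCAS: boolean
}

type LockStrategy = 'etag-cas' | 'simulated-cas'

// Prefer server-enforced ETag CAS when the backend advertises it.
function chooseLockStrategy(caps: BackendCapabilities): LockStrategy {
  return caps.optimisticLockViaCAS ? 'etag-cas' : 'simulated-cas'
}
```

In application code the argument would be the imported COSMOS_CAPABILITIES object, assuming it is structurally compatible with the shape sketched here.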
Migration Journal Domain Specification
Cosmos DB Gremlin migrations are subject to two interlocking constraints
that the Migrator enforces at load time:
- Idempotency required. Cosmos cannot wrap the journal write and the migration body in a single transaction. The runner mitigates this with a three-state journal (pending → applied / failed), but the body itself can still crash mid-execution. The loader therefore refuses any migration class whose idempotent field is not true. Author migrations using coalesce, fold().coalesce(unfold(), addV()), or has(...).hasNot(...).addV() guards so that re-running them after a crash is safe.
- Versioned-entity migrations are blocked. The loader refuses any migration whose touchesVersionedEntities field is true. A Gremlin journal write without a paired REST CAS document update would leave @Version-bearing entities in an indeterminate state on crash. Apply versioned-entity schema changes via a separate REST-CAS-aware tool (manual or out-of-band script) until compile-time enforcement lands in 0.3 (see MigrationDescriptor<TCaps> in P18).
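The two load-time checks can be pictured with a short sketch. It assumes a loader that inspects plain fields on each migration; MigrationLike, MigrationLoadError, and assertLoadable are hypothetical names, and the real Migrator may report violations differently.

```typescript
// Hypothetical shapes for illustration; not the package's loader types.
interface MigrationLike {
  name: string
  idempotent?: boolean
  touchesVersionedEntities?: boolean
}

class MigrationLoadError extends Error {}

// Reject anything that is not explicitly idempotent, and anything that
// declares it touches @Version-bearing entities.
function assertLoadable(migration: MigrationLike): void {
  if (migration.idempotent !== true) {
    throw new MigrationLoadError(`${migration.name}: idempotent must be true`)
  }
  if (migration.touchesVersionedEntities === true) {
    throw new MigrationLoadError(
      `${migration.name}: versioned-entity migrations are blocked on Cosmos`,
    )
  }
}
```

Note that a missing idempotent field is treated the same as false, which matches the "is not true" wording above.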
Three-state journal
Each migration journal vertex (_GraphiumMigration) carries a state
property:
| State | Meaning |
| --------- | --------------------------------------------------------------------- |
| pending | Runner wrote the row BEFORE invoking up(). Crash leaves it visible. |
| applied | up() succeeded and the row was promoted; safe to skip on next run. |
| failed | up() threw and the runner persisted the failure marker. |
On startup, the runner refuses to proceed if any rows are in pending or
failed state and raises MigrationPendingError with applied /
pending summaries. The operator must:
- Inspect the migration body's effects on the database.
- Either promote the row to applied (if effects are fully present) or delete the row (if the migration did not run).
- Re-run the migrator after reconciliation.
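The startup gate and the operator's reconciliation step can be modelled in memory as below. This is a sketch only: JournalRow, JournalBlockedError (a stand-in for MigrationPendingError), assertJournalClean, and reconcile are hypothetical names, and the real journal lives in _GraphiumMigration vertices rather than an array.

```typescript
type JournalState = 'pending' | 'applied' | 'failed'

// Hypothetical in-memory view of one journal vertex.
interface JournalRow {
  name: string
  state: JournalState
}

// Stand-in for MigrationPendingError, carrying both summaries.
class JournalBlockedError extends Error {
  applied: string[]
  blocked: string[]
  constructor(applied: string[], blocked: string[]) {
    super(`journal has unresolved rows: ${blocked.join(', ')}`)
    this.applied = applied
    this.blocked = blocked
  }
}

// Startup gate: any pending or failed row stops the run.
function assertJournalClean(rows: JournalRow[]): void {
  const blocked = rows.filter((r) => r.state !== 'applied').map((r) => r.name)
  if (blocked.length === 0) return
  const applied = rows.filter((r) => r.state === 'applied').map((r) => r.name)
  throw new JournalBlockedError(applied, blocked)
}

// Operator decision for one row: promote it when the effects are fully
// present, otherwise delete it so the migration runs again.
function reconcile(rows: JournalRow[], name: string, effectsFullyPresent: boolean): JournalRow[] {
  if (!effectsFullyPresent) return rows.filter((r) => r.name !== name)
  return rows.map((r): JournalRow => (r.name === name ? { name: r.name, state: 'applied' } : r))
}
```

After reconcile has been applied to every blocked row, assertJournalClean passes and the migrator can proceed.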
Authoring example
import { GremlinMigration, GremlinScriptExecutor } from '@graphium/cosmos'

export default class AddUserIndex extends GremlinMigration {
  public readonly name = '001_add_user_label'
  public readonly idempotent = true
  // Set this to true if the migration mutates @Version-bearing entities.
  public readonly touchesVersionedEntities = false

  public async up(execute: GremlinScriptExecutor): Promise<void> {
    // Idempotent: addV only if no matching vertex exists.
    await execute(
      `g.V().has('User', 'kind', 'system').fold()` +
        `.coalesce(unfold(), addV('User').property('kind', 'system'))`
    )
  }

  public async down(execute: GremlinScriptExecutor): Promise<void> {
    await execute(`g.V().has('User', 'kind', 'system').drop()`)
  }
}
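To see why the fold().coalesce(unfold(), addV(...)) guard makes up() safe to re-run, here is an in-memory analogue of the get-or-create pattern. It is a simplified model, not Gremlin: the Vertex shape and getOrAddVertex are hypothetical, and it returns the first match where a real traversal would emit every matching vertex.

```typescript
// Hypothetical in-memory vertex; stands in for a graph vertex.
interface Vertex {
  label: string
  props: Record<string, string>
}

// Analogue of:
//   g.V().has(label, key, value).fold()
//     .coalesce(unfold(), addV(label).property(key, value))
// Return the existing match if there is one, otherwise add a vertex.
function getOrAddVertex(store: Vertex[], label: string, key: string, value: string): Vertex {
  const existing = store.find((v) => v.label === label && v.props[key] === value)
  if (existing !== undefined) return existing
  const created: Vertex = { label, props: { [key]: value } }
  store.push(created)
  return created
}
```

Calling getOrAddVertex twice with the same arguments leaves exactly one vertex in the store, which is the property a crashed and re-run migration relies on.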