@otter-sh/source-postgres
v0.1.0
Published
Postgres source for [Otter](https://github.com/tomnagengast/otter). Implements the `Source` interface from [`@otter-sh/core`](https://www.npmjs.com/package/@otter-sh/core) using `Bun.sql` — **no `pg` or `postgres.js` dependency**. Extracts rows in paginat
Downloads
67
Readme
@otter-sh/source-postgres
Postgres source for Otter. Implements the Source
interface from @otter-sh/core using Bun.sql
— no pg or postgres.js dependency. Extracts rows in paginated batches of 5 000.
Reads column types from information_schema.columns up front so the target CREATE TABLE uses
real Postgres types instead of text.
Install
bun add @otter-sh/source-postgresAdd it to your project's dependencies alongside @otter-sh/core and @otter-sh/cli.
Requires Bun.
Configuration
Import postgresSource and declare the source under sources in otter.config.ts:
import { postgresAdapter } from "@otter-sh/adapter-postgres";
import { defineConfig } from "@otter-sh/core";
import { postgresSource } from "@otter-sh/source-postgres";
export default defineConfig({
profiles: { dev: { target: postgresAdapter({ url: process.env.PG_URL ?? "" }) } },
sources: {
stripe_pg: postgresSource({ url: process.env.STRIPE_PG_URL ?? "" }),
},
modelsDir: "models",
});| Option | Type | Default | Description |
| ------ | -------- | ------- | --------------------------------- |
| url | string | — | Source Postgres connection string |
Streams
A stream name passed to otter load <source>.<stream> is parsed as:
"schema.table"→ reads fromschema.table."table"(no dot) → reads frompublic.table.
otter load stripe_pg.charges # public.charges
otter load stripe_pg.billing.invoices # billing.invoicesExtract behavior
- Column types pulled from
information_schema.columns. - Batched extraction via
Bun.sqlin pages of 5 000 rows. - Without a cursor:
select * from <schema>.<table> order by 1 limit 5000 offset <n>. - With a cursor:
select * from <schema>.<table> where <cursor_field> > <cursor> order by <cursor_field> asc limit 5000.
Incremental loads
Declare an incremental cursor per stream in sources/<name>.ts:
// sources/stripe_pg.ts
import { defineSource } from "@otter-sh/core";
export default defineSource({
streams: {
users: {
write_disposition: "merge",
primary_key: "id",
incremental: { cursor_field: "updated_at" },
},
},
});The driver reads the high-water mark from .otter/state.db (key <stream>:<cursor_field>),
filters with where <cursor_field> > <cursor>, and writes the last value back after each
batch. Pass --full-refresh to otter load to clear the cursor before extract.
Example
otter load stripe_pg.charges --strategy merge --unique-key id
otter load stripe_pg.users --full-refreshFull documentation
- Driver reference — source-postgres
- Interface — sources
- State / cursors — state
otter loadCLI — cli#load
License
MIT
