@atproto/lex-server
v0.0.8
Request router for Atproto Lexicon protocols and schemas. See the Changelog for version history.
npm install @atproto/lex-server
- Type-safe request routing based on Lexicon schemas
- Support for queries, procedures, and WebSocket subscriptions
- Built on Web standard Request/Response APIs (portable across runtimes)
- Custom authentication with credential passing
- Graceful shutdown with the AsyncDisposable pattern
Important: This package is currently in preview. The API and features are subject to change before the stable release.
What is this?
Building AT Protocol servers requires handling XRPC requests, validating inputs against Lexicon schemas, managing authentication, and supporting real-time subscriptions. @atproto/lex-server automates this by:
- Routing requests to type-safe handlers based on Lexicon schemas
- Automatically validating request parameters and bodies
- Providing a flexible authentication system with custom strategies
- Supporting WebSocket subscriptions with backpressure handling
import { LexRouter } from '@atproto/lex-server'
import { serve, upgradeWebSocket } from '@atproto/lex-server/nodejs'
import * as app from './lexicons/app.js'
const router = new LexRouter({ upgradeWebSocket })
  .add(app.bsky.actor.getProfile, async ({ params }) => {
    const profile = await db.getProfile(params.actor)
    return { body: profile }
  })
  .add(app.bsky.feed.post.create, {
    auth: requireAuth,
    handler: async ({ credentials, input }) => {
      const result = await db.createPost(credentials.did, input.body)
      return { body: result }
    },
  })
await serve(router, { port: 3000 })
- Quick Start
- LexRouter
- Queries and Procedures
- Subscriptions
- Authentication
- Error Handling
- Node.js Server
- Advanced Usage
- License
Quick Start
1. Install the package
npm install @atproto/lex-server
2. Generate Lexicon schemas
Use @atproto/lex to generate TypeScript schemas from your Lexicon definitions:
lex install app.bsky.actor.getProfile
lex build
3. Create a router and add handlers
import { LexRouter, LexError } from '@atproto/lex-server'
import { serve, upgradeWebSocket } from '@atproto/lex-server/nodejs'
import * as app from './lexicons/app.js'
const router = new LexRouter({ upgradeWebSocket })
// Add a query handler
router.add(app.bsky.actor.getProfile, async ({ params }) => {
  const profile = await db.getProfile(params.actor)
  if (!profile) {
    throw new LexError('NotFound', 'Profile not found')
  }
  return { body: profile }
})
// Start the server
const server = await serve(router, { port: 3000 })
console.log('Server listening on port 3000')
LexRouter
The LexRouter class is the core of @atproto/lex-server. It routes XRPC requests to type-safe handlers based on Lexicon schemas.
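To make the routing idea concrete, here is a minimal conceptual sketch of dispatching by NSID. It assumes the standard XRPC URL shape (/xrpc/<nsid>) and uses a hypothetical SketchRouter class. It is not how LexRouter is implemented, and it skips schema validation entirely.

```typescript
// Conceptual sketch only: a tiny NSID-keyed dispatcher, not the LexRouter internals.
// SketchRouter and SketchHandler are hypothetical names used for illustration.
type SketchHandler = (params: URLSearchParams) => unknown

class SketchRouter {
  private readonly handlers = new Map<string, SketchHandler>()

  // Returns `this` so that calls can be chained
  add(nsid: string, handler: SketchHandler): this {
    this.handlers.set(nsid, handler)
    return this
  }

  // Resolve "/xrpc/<nsid>?query" to a handler result, or null when nothing matches
  dispatch(url: string): unknown {
    const { pathname, searchParams } = new URL(url, 'http://localhost')
    const nsid = /^\/xrpc\/([a-zA-Z0-9.-]+)$/.exec(pathname)?.[1]
    const handler = nsid ? this.handlers.get(nsid) : undefined
    return handler ? handler(searchParams) : null
  }
}
```

With this sketch, a call such as new SketchRouter().add('app.bsky.actor.getProfile', handler).dispatch('/xrpc/app.bsky.actor.getProfile?actor=alice.test') would invoke handler with the parsed query parameters. The real router additionally validates those parameters against the Lexicon schema before the handler runs.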
Creating a Router
import { LexRouter } from '@atproto/lex-server'
import { upgradeWebSocket } from '@atproto/lex-server/nodejs'
const router = new LexRouter({
  // Required for WebSocket subscriptions (Node.js)
  upgradeWebSocket,
  // Optional: Handle unexpected errors
  onHandlerError: ({ error, request, method }) => {
    console.error(`Error in ${method.nsid}:`, error)
  },
  // Optional: WebSocket backpressure settings
  highWaterMark: 250_000, // bytes (default: 250KB)
  lowWaterMark: 50_000, // bytes (default: 50KB)
})
Adding Routes
Routes are added using the .add() method, which accepts a Lexicon schema and a handler:
// Simple handler (no authentication)
router.add(schema, async ({ params, input, request }) => {
  return { body: result }
})
// Handler with authentication
router.add(schema, {
  auth: async ({ request, params }) => credentials,
  handler: async ({ params, input, credentials, request }) => {
    return { body: result }
  },
})
The router supports method chaining:
const router = new LexRouter()
  .add(app.bsky.actor.getProfile, profileHandler)
  .add(app.bsky.feed.getTimeline, timelineHandler)
  .add(app.bsky.feed.post.create, postHandler)
Handler Context
Handlers receive a context object with the following properties:
type LexRouterHandlerContext<Method, Credentials> = {
  credentials: Credentials // Result of auth function (undefined if no auth)
  input: InferMethodInput<Method> // Parsed request body (procedures only)
  params: InferMethodParams<Method> // Parsed URL query parameters
  request: Request // Original Web Request object
  connection?: ConnectionInfo // Network connection info
}
Handler Output
Handlers can return various output formats:
// JSON response (encoding inferred from schema)
return { body: { key: 'value' } }
// With custom encoding
return { encoding: 'text/plain', body: 'Hello, world!' }
// With response headers
return { body: data, headers: { 'x-custom': 'value' } }
// Empty response (200 OK with no body)
return {}
// Custom Response object (full control)
return new Response(body, { status: 201, headers })
// Proxy Response
return fetch('https://example.com/data')
Queries and Procedures
Query Handler
Queries handle GET requests and receive parameters from the URL query string:
import * as app from './lexicons/app.js'
router.add(app.bsky.actor.getProfile, async ({ params }) => {
  // params.actor is typed and validated
  const profile = await db.getProfile(params.actor)
  return { body: profile }
})
Procedure Handler
Procedures handle POST requests and receive a request body:
router.add(app.bsky.feed.post.create, async ({ input }) => {
  // input.body contains the parsed and validated request body
  const post = await db.createPost(input.body)
  return { body: { uri: post.uri, cid: post.cid } }
})
Binary Payloads
For endpoints that accept or return binary data:
// Binary input
router.add(app.example.uploadBlob, async ({ input }) => {
  // input.body is a Request object for streaming
  // input.encoding contains the content-type
  const blob = await input.body.arrayBuffer()
  return { body: { cid: await store(blob) } }
})
// Binary output
router.add(app.example.getBlob, async ({ params }) => {
  const stream = await getBlob(params.cid)
  return {
    encoding: 'application/octet-stream',
    body: stream,
  }
})
Subscriptions
Subscriptions provide real-time data over WebSocket connections. Handlers are async generators that yield messages:
import { LexRouter, LexError } from '@atproto/lex-server'
import { serve, upgradeWebSocket } from '@atproto/lex-server/nodejs'
import { scheduler } from 'node:timers/promises'
// Assumption: the com.example.* schemas were generated next to app.*, as in the other examples
import * as com from './lexicons/com.js'
const router = new LexRouter({
  upgradeWebSocket, // Required for WebSocket support in Node.js
})
router.add(com.example.stream, async function* ({ params, request }) {
  const { cursor = 0, limit = 10 } = params
  const { signal } = request
  for (let i = 0; i < limit; i++) {
    // Yield messages to the client
    yield com.example.stream.message.$build({
      data: `Message ${cursor + i}`,
      cursor: cursor + i,
    })
    // Wait between messages (respects abort signal)
    await scheduler.wait(1000, { signal })
  }
  // Throwing a LexError closes the connection with an error frame
  throw new LexError('LimitReached', `Limit of ${limit} messages reached`)
})
Messages are CBOR-encoded and sent as WebSocket binary frames. The router handles:
- WebSocket upgrade negotiation
- Backpressure management
- Graceful connection cleanup
- Error frame encoding
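As an illustration of what backpressure management means here, the following sketch pauses a message source whenever a sink's buffered byte count reaches a high water mark and resumes once it drains to a low water mark. The SketchSink interface, pump, and waitForDrain are hypothetical names, and the polling loop is a simplification chosen for readability. The router's actual mechanism is not documented in this README and may differ.

```typescript
// Hypothetical sink shape: anything exposing send() and a bufferedAmount counter,
// similar in spirit to the standard WebSocket interface.
interface SketchSink {
  bufferedAmount: number
  send(chunk: Uint8Array): void
}

type Watermarks = { highWaterMark: number; lowWaterMark: number }

// Resolve once the sink has drained to the low water mark (simple polling for clarity)
async function waitForDrain(sink: SketchSink, lowWaterMark: number, pollMs = 5): Promise<void> {
  while (sink.bufferedAmount > lowWaterMark) {
    await new Promise<void>((resolve) => setTimeout(resolve, pollMs))
  }
}

// Send each message, then stop pulling from the source while the buffer is too full
async function pump(
  source: AsyncIterable<Uint8Array>,
  sink: SketchSink,
  { highWaterMark, lowWaterMark }: Watermarks,
): Promise<number> {
  let sent = 0
  for await (const chunk of source) {
    sink.send(chunk)
    sent++
    if (sink.bufferedAmount >= highWaterMark) {
      await waitForDrain(sink, lowWaterMark)
    }
  }
  return sent
}
```

Because the source is an async generator, not asking it for the next value is enough to pause the handler. This is why subscription handlers written as generators can be throttled without any extra code on the handler side.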
Authentication
Custom Authentication
Authentication is implemented through the auth function in handler configs:
import { LexError, LexServerAuthError } from '@atproto/lex-server'
type Credentials = { did: string; scope: string[] }
const requireAuth = async ({
  request,
}: {
  request: Request
}): Promise<Credentials> => {
  const header = request.headers.get('authorization')
  if (!header?.startsWith('Bearer ')) {
    throw new LexServerAuthError(
      'AuthenticationRequired',
      'Bearer token required',
      {
        Bearer: { realm: 'api' },
      },
    )
  }
  const token = header.slice(7)
  const session = await verifyToken(token)
  if (!session) {
    throw new LexServerAuthError(
      'InvalidToken',
      'Token is invalid or expired',
      {
        Bearer: { realm: 'api', error: 'invalid_token' },
      },
    )
  }
  return { did: session.did, scope: session.scope }
}
// Use with handlers
router.add(app.bsky.feed.post.create, {
  auth: requireAuth,
  handler: async ({ credentials, input }) => {
    // credentials.did is available here
    const post = await db.createPost(credentials.did, input.body)
    return { body: post }
  },
})
The auth function:
- Is called before parsing the request body
- Receives params, request, and connection info
- Should throw LexError or LexServerAuthError on failure
- Returns credentials that are passed to the handler
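The ordering described in the list above (authenticate first, parse the body second) can be sketched with plain Web Request/Response objects. Everything here is a hypothetical stand-in: runProcedure, SketchAuth, and SketchProcedure are illustration-only names, and the fixed 401 response is an assumption made to keep the example short, not the router's actual error mapping.

```typescript
// Illustration-only types; the real router derives these from Lexicon schemas
type SketchAuth<C> = (ctx: { request: Request }) => Promise<C> | C
type SketchProcedure<C> = (ctx: {
  credentials: C
  input: { body: unknown }
}) => Promise<unknown> | unknown

function sketchJson(status: number, body: unknown): Response {
  return new Response(JSON.stringify(body), {
    status,
    headers: { 'content-type': 'application/json' },
  })
}

async function runProcedure<C>(
  request: Request,
  auth: SketchAuth<C>,
  handler: SketchProcedure<C>,
): Promise<Response> {
  let credentials: C
  try {
    // Step 1: authenticate from headers only; the body stream stays untouched
    credentials = await auth({ request })
  } catch (err) {
    const message = err instanceof Error ? err.message : 'Authentication failed'
    return sketchJson(401, { error: 'AuthenticationRequired', message })
  }
  // Step 2: only an authenticated request has its body read and parsed
  const body: unknown = await request.json()
  return sketchJson(200, await handler({ credentials, input: { body } }))
}
```

One practical consequence of this ordering is that unauthenticated clients are rejected before the server spends time reading or validating a potentially large payload.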
WWW-Authenticate Headers
Use LexServerAuthError to include WWW-Authenticate headers in error responses:
import { LexServerAuthError } from '@atproto/lex-server'
// Simple Bearer challenge
throw new LexServerAuthError('AuthenticationRequired', 'Login required', {
  Bearer: { realm: 'api' },
})
// WWW-Authenticate: Bearer realm="api"
// Multiple schemes
throw new LexServerAuthError('AuthenticationRequired', 'Auth required', {
  Bearer: { realm: 'api', scope: 'read write' },
  Basic: { realm: 'api' },
})
// WWW-Authenticate: Bearer realm="api", scope="read write", Basic realm="api"
// Token68 format
throw new LexServerAuthError('AuthenticationRequired', 'Auth required', {
  Bearer: 'token68value',
})
// WWW-Authenticate: Bearer token68value
Error Handling
LexError
Throw LexError to return structured XRPC error responses:
import { LexError } from '@atproto/lex-server'
router.add(app.bsky.actor.getProfile, async ({ params }) => {
  const profile = await db.getProfile(params.actor)
  if (!profile) {
    throw new LexError('NotFound', 'Profile not found')
  }
  return { body: profile }
})
Error responses follow the XRPC format:
{
  "error": "NotFound",
  "message": "Profile not found"
}
LexServerAuthError
LexServerAuthError extends LexError with WWW-Authenticate header support:
import { LexServerAuthError } from '@atproto/lex-server'
throw new LexServerAuthError('AuthenticationRequired', 'Invalid credentials', {
  Bearer: { realm: 'api' },
})
This returns a 401 response with the WWW-Authenticate header.
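For readers curious how a challenge object of this shape could map to a header value, here is a small sketch. formatWwwAuthenticate is a hypothetical helper written for this illustration and is not exported by @atproto/lex-server. It assumes auth-params are emitted as quoted strings and that a plain string value is treated as a token68, which matches the header values shown in the WWW-Authenticate Headers examples.

```typescript
// A challenge is either auth-params (key/value pairs) or a raw token68 string
type SketchChallenge = string | Record<string, string>
type SketchChallenges = Record<string, SketchChallenge>

function formatWwwAuthenticate(challenges: SketchChallenges): string {
  return Object.entries(challenges)
    .map(([scheme, value]) => {
      // token68 form: "<scheme> <token>"
      if (typeof value === 'string') return `${scheme} ${value}`
      // auth-param form: key="value" pairs, escaping quotes and backslashes
      const params = Object.entries(value)
        .map(([key, v]) => `${key}="${v.replace(/(["\\])/g, '\\$1')}"`)
        .join(', ')
      return params ? `${scheme} ${params}` : scheme
    })
    .join(', ')
}
```

Under these assumptions, formatWwwAuthenticate({ Bearer: { realm: 'api' } }) produces Bearer realm="api", the same value as in the first challenge example of this README.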
Error Handler Callback
Use onHandlerError to log or report unexpected errors:
const router = new LexRouter({
  onHandlerError: async ({ error, request, method }) => {
    // Log errors (excluding expected abort signals)
    console.error(`Error in ${method.nsid}:`, error)
    await reportToSentry(error)
  },
})
Note: The callback is only invoked for unexpected errors, not for LexError instances or request aborts.
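The distinction between expected and unexpected errors can be sketched as a small wrapper. This is an illustration under assumptions, not the router's code: SketchLexError stands in for LexError, the 400 default status and the InternalServerError body are assumptions, and an aborted request is detected here through an AbortSignal passed in by the caller.

```typescript
// Stand-in for LexError, defined locally so the sketch is self-contained
class SketchLexError extends Error {
  readonly error: string
  readonly status: number
  constructor(error: string, message: string, status = 400) {
    super(message)
    // Keeps instanceof working on older compile targets
    Object.setPrototypeOf(this, SketchLexError.prototype)
    this.error = error
    this.status = status
  }
}

type SketchOnHandlerError = (ctx: { error: unknown }) => void | Promise<void>

function errorResponse(status: number, body: unknown): Response {
  return new Response(JSON.stringify(body), {
    status,
    headers: { 'content-type': 'application/json' },
  })
}

async function runWithErrorHandling(
  handler: () => Promise<Response>,
  signal: AbortSignal,
  onHandlerError?: SketchOnHandlerError,
): Promise<Response> {
  try {
    return await handler()
  } catch (error) {
    if (error instanceof SketchLexError) {
      // Expected error: serialize it in the XRPC shape, callback is not invoked
      return errorResponse(error.status, { error: error.error, message: error.message })
    }
    // Request aborts are not reported; anything else is unexpected
    if (!signal.aborted) await onHandlerError?.({ error })
    return errorResponse(500, { error: 'InternalServerError', message: 'Internal Server Error' })
  }
}
```

Keeping expected errors out of the callback means a reporting tool wired into onHandlerError only sees genuine bugs, not routine outcomes such as a missing profile.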
Node.js Server
The @atproto/lex-server/nodejs subpath provides Node.js-specific utilities.
serve()
Start a server and begin listening:
import { serve } from '@atproto/lex-server/nodejs'
const server = await serve(router, { port: 3000 })
console.log('Server listening on port 3000')
// Graceful shutdown
await server.terminate()
The server supports AsyncDisposable:
await using server = await serve(router, { port: 3000 })
// Server is automatically terminated when scope exits
Options:
type StartServerOptions = {
  port?: number
  host?: string
  gracefulTerminationTimeout?: number // ms to wait for connections to close
}
createServer()
Create a server without starting it:
import { createServer } from '@atproto/lex-server/nodejs'
const server = createServer(router, {
  gracefulTerminationTimeout: 5000,
})
server.listen(3000, () => {
  console.log('Server listening')
})
toRequestListener()
Convert a handler to an Express/Connect-compatible middleware:
import express from 'express'
import { toRequestListener } from '@atproto/lex-server/nodejs'
const app = express()
// Mount the XRPC router
app.use('/xrpc', toRequestListener(router.handle))
app.listen(3000)
upgradeWebSocket()
Required for WebSocket subscription support in Node.js:
import { LexRouter } from '@atproto/lex-server'
import { upgradeWebSocket } from '@atproto/lex-server/nodejs'
const router = new LexRouter({ upgradeWebSocket })
Advanced Usage
Custom Response Objects
Return a Response object for full control over the response:
router.add(schema, async ({ params }) => {
  if (params.redirect) {
    return Response.redirect('https://example.com', 302)
  }
  return new Response(JSON.stringify({ custom: true }), {
    status: 201,
    headers: {
      'Content-Type': 'application/json',
      'X-Custom-Header': 'value',
    },
  })
})
Response Headers
Add headers to responses:
router.add(schema, async ({ params }) => {
  return {
    body: { data: 'value' },
    headers: {
      'Cache-Control': 'public, max-age=3600',
      'X-Request-Id': crypto.randomUUID(),
    },
  }
})
Connection Info
Access network connection information:
router.add(schema, async ({ connection }) => {
  console.log('Remote address:', connection?.remoteAddr?.hostname)
  console.log('Local address:', connection?.localAddr?.hostname)
  return { body: { status: 'ok' } }
})
Connection info structure:
type ConnectionInfo = {
  localAddr?: {
    hostname: string
    port: number
    transport: 'tcp' | 'udp'
  }
  remoteAddr?: {
    hostname: string
    port: number
    transport: 'tcp' | 'udp'
  }
}
License
MIT or Apache2
