
@kairos-computer/core v0.1.5 · 788 downloads

kai

A streamText replacement built on the AI SDK primitives. Owns the tool loop, streams to Redis, handles stop signals and message queuing mid-generation.

Uses Effect-TS for typed errors, dependency injection, and concurrent stream management.

Install

bun add @kairos-computer/core
# Redis adapter (optional)
bun add @kairos-computer/redis

Quick Start

import { Agent } from "@kairos-computer/core"
import { tool, zodSchema } from "ai"
import { anthropic } from "@ai-sdk/anthropic"
import { z } from "zod"

const agent = new Agent({
  model: anthropic("claude-sonnet-4-20250514"),
  system: "You are a helpful assistant.",
  tools: {
    weather: tool({
      description: "Get the weather",
      inputSchema: zodSchema(z.object({ city: z.string() })),
      execute: async (input) => ({ temp: 72, city: input.city }),
    }),
  },
})

const result = await agent.runPromise("conversation-1", [
  { id: "msg-1", role: "user", parts: [{ type: "text", text: "Weather in SF?" }] },
])

console.log(result.finishReason) // "stop"
console.log(result.steps)        // [{ toolCalls: [...] }, { text: "It's 72°F in SF" }]

How It Works

kai calls model.doStream() directly in a loop. Each iteration:

  1. Convert UIMessage[] → LanguageModelV3Prompt (using AI SDK's own conversion)
  2. Stream one LLM call, publish chunks to the Streaming service
  3. On tool-calls → execute tools (parallel), append results, continue
  4. On stop → check message queue for new user messages, continue if any
  5. On length → call onContextOverflow hook or fail
  6. On abort → normalize partial results, break

No maxSteps. No stopCondition. The loop runs until the model says stop, the user sends a stop signal, or an error occurs.
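The control flow above can be sketched as a simplified simulation. This is illustrative only, not the real implementation: `StepOutcome` and `runLoopSketch` are hypothetical names, and each entry in `steps` stands in for one `model.doStream()` call.

```typescript
// Illustrative control-flow sketch of the loop; not kai's actual code.
type FinishReason = "stop" | "tool-calls" | "length" | "abort"

interface StepOutcome {
  finishReason: FinishReason
}

function runLoopSketch(
  steps: StepOutcome[],
  drainQueue: () => string[], // user messages that arrived mid-generation
): FinishReason {
  let i = 0
  while (i < steps.length) {
    const step = steps[i++]
    switch (step.finishReason) {
      case "tool-calls":
        continue // execute tools in parallel, append results, loop again
      case "stop":
        if (drainQueue().length > 0) continue // absorb queued messages
        return "stop"
      case "length":
        return "length" // real loop calls onContextOverflow or fails
      case "abort":
        return "abort" // normalize partial results, break
    }
  }
  return "stop"
}
```

Note there is no step counter anywhere in the sketch: termination comes only from the model, the queue, or an abort.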

Core Concepts

Messages: UIMessage

kai uses the AI SDK's UIMessage type for all messages. This is the same type the frontend uses — no conversion needed between backend and frontend.
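For reference, this is the message shape used throughout this README (the same object shown in the Quick Start above):

```typescript
// The same object shape flows between frontend and backend unchanged.
const message = {
  id: "msg-1",
  role: "user" as const,
  parts: [{ type: "text" as const, text: "Weather in SF?" }],
}
```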

Tools: ToolSet

Define tools with tool() from the ai package. kai passes them to model.doStream() using the AI SDK's own prepareToolsAndToolChoice.

import { tool, zodSchema } from "ai"
import { z } from "zod"

const tools = {
  search: tool({
    description: "Search the web",
    inputSchema: zodSchema(z.object({ query: z.string() })),
    execute: async (input) => fetchResults(input.query),
  }),
}

executeTools

Controls whether kai executes tool calls or returns them for the caller to handle.

// Default: true — auto-execute all tools
executeTools: true

// Handoff pattern: stop at tool calls, let another system execute them
executeTools: false

// Custom: you control execution
executeTools: (toolCalls, tools, abortSignal) =>
  Effect.succeed(toolCalls.map(tc => ({
    toolCallId: tc.toolCallId,
    toolName: tc.toolName,
    input: tc.input,
    output: myExecute(tc),
    isError: false,
    durationMs: 0,
  })))

prepareStep

Called before each LLM call. Override the model, tools, or system prompt per step.

prepareStep: (ctx) => {
  if (ctx.stepNumber > 5) {
    return { tools: { ...baseTools, ...advancedTools } }
  }
  return {}
}

Streaming: StreamChunk

Every chunk published to the Streaming service is wrapped in a StreamChunk:

interface StreamChunk {
  conversationId: string
  responseId: string       // correlate streaming with persisted message
  chunk: UIMessageChunk    // text-delta, tool-input-available, finish, etc.
  seq: number              // monotonic, for dedup and ordering
}

The responseId correlates streaming chunks with the persisted message. Each assistant message in result.messages carries its responseId in metadata.responseId. If the loop absorbs queued user messages and generates multiple responses in one run, each response gets its own responseId — a finish chunk is emitted between them.
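On the consumer side, `seq` makes at-least-once delivery safe to handle. A minimal sketch, assuming you track the last `seq` seen per `responseId` (the `makeChunkFilter` helper is hypothetical; the `StreamChunk` fields are from the interface above):

```typescript
interface StreamChunk {
  conversationId: string
  responseId: string
  chunk: unknown
  seq: number
}

// Drops duplicate or out-of-order deliveries by remembering the highest
// seq processed for each responseId.
function makeChunkFilter() {
  const lastSeq = new Map<string, number>()
  return (c: StreamChunk): boolean => {
    const prev = lastSeq.get(c.responseId) ?? -1
    if (c.seq <= prev) return false // already processed: drop
    lastSeq.set(c.responseId, c.seq)
    return true
  }
}
```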

Services

kai uses four optional Effect services. Provide them via layers, or use the noop defaults.

Streaming

Publishes StreamChunk for real-time frontend updates.

import { Streaming } from "@kairos-computer/core"
import { Layer, Effect } from "effect"

const myStreamingLayer = Layer.succeed(Streaming, {
  publish: (chunk) => Effect.promise(() => redis.publish(channel, JSON.stringify(chunk))),
})

MessageQueue

User messages arriving mid-generation. The loop drains these between steps.

import { MessageQueue } from "@kairos-computer/core"

const myMqLayer = Layer.succeed(MessageQueue, {
  drain: () => Effect.promise(() => readAllPendingMessages()),
  wait: () => Effect.promise(() => blockForNextMessage()),
  // Optional: ACK entries consumed by drain()/wait().
  // Agent.run() calls this after successful persistence.
  ack: () => Effect.promise(() => ackConsumedEntries()),
})

StopSignal

External stop requests. Aborts both streaming and tool execution.

import { StopSignal } from "@kairos-computer/core"

const myStopLayer = Layer.succeed(StopSignal, {
  check: () => Effect.promise(() => redis.getdel("stop-key").then(v => v !== null)),
  wait: () => Effect.async((resume) => {
    subscriber.subscribe("stop-channel")
    subscriber.on("message", () => resume(Effect.void))
  }),
})

Persistence

Save/load messages. Called by the Agent class before and after the loop.

import { Persistence } from "@kairos-computer/core"

const myPersistenceLayer = Layer.succeed(Persistence, {
  saveMessages: (convId, msgs) => Effect.promise(() => db.save(convId, msgs)),
  loadMessages: (convId) => Effect.promise(() => db.load(convId)),
})

Redis Adapter

@kairos-computer/redis provides ready-made implementations using ioredis.

import { RedisStreamingLayer, RedisMessageQueueLayer, RedisStopSignalLayer } from "@kairos-computer/redis"
import Redis from "ioredis"

const redis = new Redis()
const subscriber = new Redis()

const streamingLayer = RedisStreamingLayer({ redis, userId: "user-123" })
const mqLayer = RedisMessageQueueLayer({
  redis,
  conversationId: "conv-1",
  consumerId: "agent-1",
  parseMessage: (data) => JSON.parse(data.payload),
})
const stopLayer = RedisStopSignalLayer({ redis, subscriber, conversationId: "conv-1" })

Using runLoop Directly

The Agent class is a thin wrapper. For full control, use runLoop:

import { runLoop } from "@kairos-computer/core"
import { anthropic } from "@ai-sdk/anthropic"
import { Effect, Layer } from "effect"

const result = await runLoop({
  model: anthropic("claude-sonnet-4-20250514"),
  conversationId: "conv-1",
  responseId: "resp-abc",
  system: "You are helpful.",
  tools: myTools,
  initialMessages: messages,
  callSettings: { maxOutputTokens: 4096, temperature: 0.7 },
  prepareStep: (ctx) => ({ /* per-step overrides */ }),
  onContextOverflow: (msgs, usage) => Effect.succeed(compactMessages(msgs, usage)),
  hooks: {
    onStepStart: (n) => Effect.log(`Step ${n}`),
    onStepFinish: (r) => Effect.log(`Finished: ${r.finishReason}`),
    onToolCall: (tc) => Effect.log(`Calling ${tc.toolName}`),
    onToolResult: (tr) => Effect.log(`${tr.toolName}: ${tr.isError ? "error" : "ok"}`),
  },
}).pipe(
  Effect.provide(Layer.mergeAll(streamingLayer, mqLayer, stopLayer)),
  Effect.runPromise,
)

Stop Signal Behavior

Stop aborts at every level:

  • Between steps: check() polls before each LLM call
  • Mid-stream: wait() fires → AbortController aborts → stream interrupted
  • Mid-tool-execution: same AbortController passed to tool.execute() — tools that respect abortSignal bail out

Aborted tools get output: "Stopped by user" so the model knows not to retry them on the next turn.
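A tool's `execute` can honor that signal like this. This is a sketch: the `sleep` helper and `slowSearch` are illustrative names, not part of kai; the pattern is just standard `AbortSignal` handling.

```typescript
// A cancellable delay: rejects as soon as the signal fires.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) return reject(new Error("aborted"))
    const t = setTimeout(resolve, ms)
    signal?.addEventListener("abort", () => {
      clearTimeout(t)
      reject(new Error("aborted"))
    })
  })
}

// A long-running tool body that bails out promptly when the loop aborts.
async function slowSearch(query: string, signal?: AbortSignal): Promise<string> {
  await sleep(1000, signal)
  return `results for ${query}`
}
```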

MCP Support

MCP tools work out of the box — they're just ToolSet entries. Load them from @ai-sdk/mcp and pass to kai:

import { createMCPClient } from "@ai-sdk/mcp"

const mcp = await createMCPClient({ transport: myTransport })
const mcpTools = await mcp.tools()

const agent = new Agent({
  model: anthropic("claude-sonnet-4-20250514"),
  tools: { ...localTools, ...mcpTools },
})

For dynamic tool discovery per step:

prepareStep: async () => {
  const mcpTools = await mcp.tools()
  return { tools: { ...localTools, ...mcpTools } }
}

API Reference

Types

| Type | Description |
|------|-------------|
| UIMessage | AI SDK message type (re-exported) |
| ToolSet | AI SDK tool set (re-exported) |
| StreamChunk | Chunk + routing metadata (conversationId, responseId, seq) |
| LoopResult | Final result: messages, steps, totalUsage, finishReason, responseId |
| StepResult | Per-step: finishReason, text, toolCalls, usage, durationMs |
| ParsedToolCall | Tool call extracted from model response |
| ToolCallResult | Result of executing a tool call |
| AgentConfig | Configuration for the Agent class |
| LoopConfig | Configuration for runLoop |
| CallSettings | Model call settings (temperature, maxOutputTokens, etc.) |

LoopFinishReason

| Value | Meaning |
|-------|---------|
| "stop" | Model finished naturally |
| "tool-calls" | Model wants tool execution but executeTools is false |
| "aborted" | Stopped by external signal |
| "error" | Model returned an error finish reason |

Errors

| Error | When |
|-------|------|
| StreamError | model.doStream() fails |
| ContextOverflowError | finishReason: "length" with no onContextOverflow handler |
| PersistenceError | Persistence service fails to save/load |
| ToolExecutionError | Available for custom ExecuteToolsFn implementations |