@kb-labs/core-state-broker
v1.2.0
Published
Universal state broker for persistent cross-invocation state
Readme
@kb-labs/state-broker
Universal state broker abstraction for persistent cross-invocation state management in KB Labs.
Overview
State Broker provides a unified interface for key-value storage with TTL support, enabling plugins to maintain persistent state across CLI command invocations.
Features
- Zero external dependencies: Pure Node.js implementation
- Multiple backends: In-memory, HTTP daemon, future: Redis, SQLite
- Automatic TTL cleanup: Background cleanup of expired entries
- Graceful degradation: Falls back to in-memory if daemon unavailable
- Namespace isolation: Permission-based access control
- Statistics & monitoring: Built-in metrics (hits, misses, evictions)
Architecture
┌─────────────────────────────────┐
│ Application Code │
│ ├─ QueryCache │
│ ├─ SessionManager │
│ └─ ConfigCache │
└─────────────────────────────────┘
↓
┌─────────────────────────────────┐
│ StateBroker Interface │
│ ├─ get<T>(key): Promise<T> │
│ ├─ set<T>(key, value, ttl) │
│ ├─ delete(key) │
│ ├─ clear(pattern) │
│ └─ getStats() │
└─────────────────────────────────┘
↓ ↓
┌──────────────┐ ┌──────────────┐
│ In-Memory │ │ HTTP Client │
│ Backend │ │ (Daemon) │
└──────────────┘ └──────────────┘
↓
┌──────────────┐
│ State Daemon │
│ (localhost) │
└──────────────┘Installation
pnpm add @kb-labs/state-brokerUsage
Basic Usage (In-Memory)
import { InMemoryStateBroker } from '@kb-labs/state-broker';
const broker = new InMemoryStateBroker();
// Set value with 60s TTL
await broker.set('user:123', { name: 'Alice' }, 60 * 1000);
// Get value
const user = await broker.get<{ name: string }>('user:123');
console.log(user); // { name: 'Alice' }
// Delete value
await broker.delete('user:123');
// Clear by pattern
await broker.clear('user:*');HTTP Client (Daemon Mode)
import { HTTPStateBroker } from '@kb-labs/state-broker';
const broker = new HTTPStateBroker('http://localhost:7777');
// Gracefully falls back to null if daemon unavailable
const cached = await broker.get('query-result');
if (cached) {
console.log('Cache hit:', cached);
} else {
console.log('Cache miss - daemon unavailable or expired');
}
// Set with TTL
await broker.set('query-result', result, 60 * 1000);Factory Pattern (Auto-detection)
import { detectStateBroker } from '@kb-labs/state-broker';
// Automatically detects if daemon is available
const broker = await detectStateBroker();
// Returns HTTPStateBroker if daemon running, InMemoryStateBroker otherwise
await broker.set('key', 'value', 30 * 1000);Plugin Runtime Integration
// In plugin handler (runtime.state is automatically configured)
export async function handler({ runtime }) {
// Access own namespace automatically
await runtime.state.set('query-123', result, 60 * 1000);
// Get from own namespace
const cached = await runtime.state.get('query-123');
// Access external namespace (requires manifest permission)
const shared = await runtime.state.get('other-plugin:config');
}API Reference
StateBroker Interface
interface StateBroker {
/**
* Get value by key
* @returns Value or null if not found/expired
*/
get<T>(key: string): Promise<T | null>;
/**
* Set value with optional TTL
* @param key - Storage key
* @param value - Value to store (must be JSON-serializable)
* @param ttl - Time to live in milliseconds (optional)
*/
set<T>(key: string, value: T, ttl?: number): Promise<void>;
/**
* Delete value by key
*/
delete(key: string): Promise<void>;
/**
* Clear values by pattern
* @param pattern - Glob pattern (e.g., 'user:*', 'cache:*')
*/
clear(pattern?: string): Promise<void>;
/**
* Get broker statistics
*/
getStats(): Promise<BrokerStats>;
/**
* Get health status
*/
getHealth(): Promise<HealthStatus>;
/**
* Stop broker and cleanup
*/
stop(): Promise<void>;
}BrokerStats
interface BrokerStats {
uptime: number; // Milliseconds since start
totalEntries: number; // Current entry count
totalSize: number; // Total size in bytes
hitRate: number; // Cache hit rate (0-1)
missRate: number; // Cache miss rate (0-1)
evictions: number; // Total evictions
namespaces: Record<string, NamespaceStats>;
}
interface NamespaceStats {
entries: number;
sizeBytes: number;
lastAccess: number;
}HealthStatus
interface HealthStatus {
status: 'ok' | 'degraded' | 'error';
version: string;
stats: BrokerStats;
}Backends
InMemoryStateBroker
Use case: Development, testing, fallback when daemon unavailable.
Features:
- ✅ No external dependencies
- ✅ Automatic TTL cleanup (every 30s)
- ✅ Fast (in-memory Map)
- ❌ Lost on process restart
- ❌ Not shared across processes
Configuration:
const broker = new InMemoryStateBroker(
cleanupIntervalMs: 30_000 // Cleanup interval (default: 30s)
);HTTPStateBroker
Use case: Production, persistent cross-invocation state.
Features:
- ✅ Persistent across CLI invocations
- ✅ Shared across processes
- ✅ Graceful degradation if daemon down
- ❌ Requires daemon process
- ❌ Network overhead (~0.1-0.5ms on localhost)
Configuration:
const broker = new HTTPStateBroker(
baseURL: 'http://localhost:7777' // Daemon URL (default)
);Error Handling
Graceful Degradation
HTTP client silently returns null on connection errors:
const broker = new HTTPStateBroker();
// Daemon down - returns null instead of throwing
const value = await broker.get('key'); // null
// Daemon down - silently fails (no-op)
await broker.set('key', 'value'); // No error thrownError Cases
// Daemon running but returns error
try {
await broker.get('key');
} catch (error) {
console.error('Broker error:', error.message);
// Handle error (not connection failure)
}Performance
Benchmarks
| Operation | In-Memory | HTTP (localhost) | File I/O | |-----------|-----------|------------------|----------| | get() | ~0.01ms | ~1ms | ~10-50ms | | set() | ~0.01ms | ~1ms | ~10-50ms | | delete() | ~0.01ms | ~1ms | ~10-50ms |
Expected improvement over file-based cache: 10-50x faster
Memory Usage
- Per entry overhead: ~100 bytes (key + metadata)
- Default quota: 100 MB per plugin (configurable)
- 10,000 entries: ~1 MB + data size
Integration with Plugin Permissions
Manifest Declaration
// manifest.v2.ts
permissions: {
state: {
own: {
read: true,
write: true,
delete: true,
},
external: [
{
namespace: 'other-plugin',
read: true,
write: false,
delete: false,
reason: 'Need to read shared configuration'
}
],
quotas: {
maxEntries: 10000,
maxSizeBytes: 100 * 1024 * 1024, // 100 MB
operationsPerMinute: 1000,
},
},
}Runtime Usage
// Automatic namespace prefixing
await runtime.state.set('key', value);
// Stored as: 'my-plugin:key'
// External namespace (requires permission)
await runtime.state.get('other-plugin:config');
// Permission check enforcedMigration from File-based Cache
Before
// Old file-based cache
const cache = await readJson('.kb/cache.json') || {};
cache['key'] = { value, createdAt: Date.now() };
await writeJson('.kb/cache.json', cache);After
// New StateBroker
const broker = await detectStateBroker();
await broker.set('key', value, 60 * 1000);
const cached = await broker.get('key');Backward Compatible
export class QueryCache {
constructor(cwd: string, broker?: StateBrokerLike) {
this.broker = broker;
}
async get(key: string) {
// Use broker if available
if (this.broker) {
return this.broker.get(key);
}
// Fallback to file-based cache
const cache = await readJson(this.cacheFile);
return cache[key];
}
}Related Packages
- @kb-labs/state-daemon - HTTP daemon server for persistent state
- @kb-labs/plugin-runtime - Runtime integration with permission checks
- @kb-labs/plugin-manifest - Permission type definitions
License
MIT
