@nebulaos/core
v0.4.2
Published
Core primitives for NebulaOS (Agent, Workflow, Providers)
@starya/nebulaos-core
The core of NebulaOS for building AI Agents and complex Workflows. This package provides the essential primitives for orchestration, state management, and model abstraction.
Installation
pnpm add @starya/nebulaos-core
✨ Features Overview
🤖 Agent System
- ✅ Multi-Step Reasoning - Automatic tool calling loops with configurable max steps
- ✅ Streaming Support - Real-time token streaming with executeStream()
- ✅ Tool Calling - Function calling with Zod schema validation
- ✅ Parallel Tool Execution - Execute multiple tools simultaneously
- ✅ Structured Outputs - JSON mode with schema validation
- ✅ Memory Management - Pluggable memory implementations (InMemory, Redis, etc.)
- ✅ Dynamic Instructions - Template interpolation with {{variables}} and per-execution context
- ✅ Request/Response Interceptors - Middleware for RAG, sanitization, etc.
- ✅ Multimodal Support - Text + Images (via ContentPart[])
- ✅ Execution Metadata - Attach custom key-value pairs for tracking and filtering
🔄 Workflow Orchestration
- ✅ Fluent API - Declarative workflow definition (.start().step().finish())
- ✅ Branching & Parallel - Conditional paths and concurrent execution
- ✅ State Persistence - Save/resume workflow state (requires state store)
- ✅ Retry Policies - Exponential/Linear backoff with configurable attempts
- ✅ Input/Output Validation - Zod schema validation at boundaries
- ✅ Queue Integration - Async workflow execution (requires queue adapter)
- ✅ Event-Driven - Lifecycle events for monitoring and logging
📊 Observability & Tracing
- ✅ Distributed Tracing - Automatic trace/span propagation via AsyncLocalStorage
- ✅ Event System - Rich lifecycle events (execution:start, llm:call, tool:result, etc.)
- ✅ Correlation IDs - Track requests across nested operations
- ✅ Structured Logging - Beautiful console logs with ANSI colors and metadata trees
- ✅ Log Levels - Configurable (debug, info, warn, error)
- ✅ Custom Loggers - Implement ILogger for Datadog, CloudWatch, etc.
- ✅ PII Masking - GDPR/LGPD compliance with pluggable masking
🧪 Testing & Quality
- ✅ Provider Compliance Suite - Reusable test suite for model providers
- ✅ Mock Implementations - MockProvider, MockStateStore for unit tests
- ✅ Integration Tests - Live API tests with real providers
- ✅ Type Safety - Full TypeScript support with strict typing
- ✅ Test Coverage - Comprehensive unit and integration tests
🔌 Provider System
- ✅ Provider Abstraction - IModel interface for any LLM (OpenAI, Anthropic, etc.)
- ✅ Token Usage Tracking - Automatic accumulation across multi-step flows
- ✅ Streaming Protocol - Unified chunk types (content_delta, tool_call_start, etc.)
- ✅ Error Handling - Graceful degradation and retry logic
🆕 Recent Updates
v0.0.2 - Execution Metadata
- Execution Metadata: Attach custom key-value pairs to executions via execute({ metadata: {...} })
- UI Filtering: Filter executions by metadata key/value in the NebulaOS Console
- API Support: Query executions with ?metadataKey=...&metadataValue=...
v0.0.1 - Distributed Tracing & Stream Parity
- Distributed Tracing: Automatic trace/span propagation across Agent → Workflow → Tool chains
- executeStream() Parity: Full feature parity with execute() including token tracking, events, and max steps fallback
- Enhanced Logging: Color-coded terminal output with hierarchical metadata display
- Response Interceptors: Now run only on final responses (not intermediate tool calls)
- Test Coverage: Expanded to 22 unit tests + 7 integration tests
Core Components
1. Agent (Agent)
The main class that manages the lifecycle of an AI interaction.
Key Features
- Memory-First: Every agent requires a memory implementation (e.g., InMemory) to maintain context.
- Interceptors: Middleware to modify requests and responses.
  - Request Interceptors: Run before each LLM call. Useful for context injection (RAG).
  - Response Interceptors: Run only on the final response (or when there are no tool calls), ideal for output sanitization and formatting.
- Streaming: Native support for text and tool call streaming.
- Observability: Integrated event-based logging system with PII Masking support (GDPR/LGPD).
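The interceptor timing described above can be illustrated with a small, self-contained simulation. This is a sketch under assumptions, not the package's implementation: `runLoop`, `fakeModel`, `shout`, and the simplified message and response shapes are all hypothetical, tool execution is stubbed, and throwing when the step limit is reached is an arbitrary choice made for the example.

```typescript
// Hypothetical, simplified shapes used only for this illustration.
interface ModelResponse {
  content: string;
  toolCalls: string[]; // names of tools the model wants to run
}
type RequestInterceptor = (messages: string[]) => Promise<string[]>;
type ResponseInterceptor = (response: ModelResponse) => Promise<ModelResponse>;
type Model = (messages: string[]) => Promise<ModelResponse>;

async function runLoop(
  model: Model,
  messages: string[],
  requestInterceptors: RequestInterceptor[],
  responseInterceptors: ResponseInterceptor[],
  maxSteps = 5
): Promise<ModelResponse> {
  let history = [...messages];
  for (let step = 0; step < maxSteps; step++) {
    // Request interceptors run before every model call.
    let outgoing = history;
    for (const intercept of requestInterceptors) outgoing = await intercept(outgoing);

    let response = await model(outgoing);
    if (response.toolCalls.length === 0) {
      // Final answer: only now do the response interceptors run.
      for (const intercept of responseInterceptors) response = await intercept(response);
      return response;
    }
    // Intermediate step: record stubbed tool results and call the model again.
    history = [...history, ...response.toolCalls.map((name) => `tool:${name}:done`)];
  }
  throw new Error("max steps reached"); // assumption: the real fallback may differ
}

// Fake model: requests the calculator once, then answers.
const fakeModel: Model = async (messages) =>
  messages.some((m) => m.indexOf("tool:") === 0)
    ? { content: "the sum is 30", toolCalls: [] }
    : { content: "", toolCalls: ["calculator"] };

const shout: ResponseInterceptor = async (response) => ({
  ...response,
  content: response.content.toUpperCase(),
});

runLoop(fakeModel, ["How much is 10 + 20?"], [], [shout]).then((finalResponse) =>
  console.log(finalResponse.content) // THE SUM IS 30
);
```

In this simulation the model is called twice, so a request interceptor would run twice, while `shout` runs once, on the final answer only.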
Complete Example
import { Agent, InMemory, Tool, z } from "@starya/nebulaos-core";
import { OpenAI } from "@starya/nebulaos-openai";

// 1. Tools
const calculator = new Tool({
  id: "calculator",
  description: "Adds two numbers",
  inputSchema: z.object({ a: z.number(), b: z.number() }),
  handler: async (ctx, input) => ({ result: input.a + input.b })
});

// 2. Agent
const agent = new Agent({
  name: "financial-assistant",
  model: new OpenAI({
    apiKey: process.env.OPENAI_API_KEY!,
    model: "gpt-4o"
  }),
  memory: new InMemory(),
  instructions: "You help with financial calculations.",
  tools: [calculator],
  logLevel: "info", // Detailed console logs
  interceptors: {
    response: [
      async (res) => {
        // Format final output to uppercase (example)
        if (res.content) res.content = res.content.toUpperCase();
        return res;
      }
    ]
  }
});

// 3. Execution
await agent.addMessage({ role: "user", content: "How much is 10 + 20?" });
const result = await agent.execute({
  metadata: { userId: "user-123", channel: "api" } // Optional: attach tracking metadata
});
console.log(result.content);
📝 Dynamic Instructions
Instructions support template interpolation with {{variable}} syntax. Variables can be provided at config time, runtime, or during execution.
Basic Usage
const agent = new Agent({
  id: "support-agent",
  name: "support-agent",
  model: provider,
  memory: new InMemory(),
  instructions: `You are {{persona}}. Client: {{client.name}}. Plan: {{client.plan}}.`
});

await agent.addMessage({ role: "user", content: "Hi" });
await agent.execute({
  variables: {
    persona: "Veri",
    client: { name: "Joao", plan: "Premium" }
  }
});
// System prompt: "You are Veri. Client: Joao. Plan: Premium."
InstructionConfig with Defaults
Use the { template, defaults } format to supply default values that can be overridden:
const agent = new Agent({
  id: "assistant",
  name: "assistant",
  model: provider,
  memory: new InMemory(),
  instructions: {
    template: `You are {{persona}}. Mode: {{mode}}.`,
    defaults: { persona: "Assistant", mode: "normal" }
  }
});

// Uses defaults
await agent.execute();
// System prompt: "You are Assistant. Mode: normal."

// Override specific variables
await agent.execute({ variables: { mode: "urgent" } });
// System prompt: "You are Assistant. Mode: urgent."
Loading from Files
Use Instruction.fromFileSync() or Instruction.fromFile() for external templates:
import { Instruction } from "@nebulaos/core";

const agent = new Agent({
  id: "agent",
  name: "agent",
  model: provider,
  memory: new InMemory(),
  instructions: {
    template: Instruction.fromFileSync("./prompts/agent.md"),
    defaults: { language: "pt-BR" }
  }
});
Runtime Variables (Skills)
Skills can modify instructions during execution using setVariable():
// Inside a skill or tool
agent.setVariable("mode", "urgent");
agent.setVariables({ client: { name: "Maria" } });

// Read variables
const mode = agent.getVariable("mode"); // "urgent"
const all = agent.getVariables(); // { mode: "urgent", client: { ... } }

// Clear all runtime variables
agent.clearVariables();
Variable Priority
Variables are merged in the following order, from lowest to highest priority (later entries override earlier ones):
- Config defaults (instructions.defaults)
- Runtime variables (setVariable()/setVariables())
- Execution variables (execute({ variables }))
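The priority order above can be sketched as a plain object merge. This is only an illustration: `resolveVariables` is a hypothetical helper, and it assumes a shallow merge, whereas the library may treat nested objects differently.

```typescript
type Variables = Record<string, unknown>;

// Later sources override earlier ones: defaults < runtime < execution.
// Assumption: shallow merge; nested objects are replaced, not combined.
function resolveVariables(
  defaults: Variables,
  runtime: Variables,
  execution: Variables
): Variables {
  return { ...defaults, ...runtime, ...execution };
}

const resolved = resolveVariables(
  { persona: "Assistant", mode: "normal" }, // instructions.defaults
  { mode: "urgent" },                       // setVariable("mode", "urgent")
  { persona: "Veri" }                       // execute({ variables: { persona: "Veri" } })
);
console.log(resolved); // { persona: "Veri", mode: "urgent" }
```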
🏷️ Execution Metadata
Attach custom metadata to agent executions for tracking, filtering, and debugging. Metadata is persisted with the execution and can be used to search/filter executions in the UI.
Basic Usage
const result = await agent.execute("Process this order", {
  metadata: {
    environment: "production",
    userId: "user-123",
    orderId: "ORD-456",
    channel: "whatsapp"
  }
});
Use Cases
| Use Case | Example Metadata |
|----------|------------------|
| Multi-tenant | { tenantId: "acme-corp", workspaceId: "ws-1" } |
| Request tracking | { requestId: "req-abc", sessionId: "sess-xyz" } |
| A/B testing | { experiment: "new-prompt-v2", variant: "B" } |
| Environment | { environment: "staging", region: "us-east-1" } |
| Business context | { customerId: "C-123", orderId: "ORD-456" } |
With Streaming
Metadata works the same way with executeStream():
const stream = agent.executeStream({
  metadata: { streamId: "live-chat-001", channel: "widget" }
});

for await (const chunk of stream) {
  // Process chunks...
}
Filtering in the UI
Once metadata is attached, you can filter executions in the NebulaOS Console:
- By key: Find all executions that have the environment key
- By key-value: Find executions where environment = "production"
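The two filter modes can be pictured with a small in-memory sketch. Everything here is hypothetical (`ExecutionRecord`, `filterByMetadata`, the sample data); it only mirrors the by-key and by-key-value semantics, and it assumes values are compared as strings.

```typescript
type Metadata = Record<string, string | number | boolean | null>;

// Hypothetical record shape, used only for this illustration.
interface ExecutionRecord {
  id: string;
  metadata: Metadata;
}

// With only `key`, match executions that have the key.
// With `key` and `value`, also require the stringified value to match.
function filterByMetadata(
  executions: ExecutionRecord[],
  key: string,
  value?: string
): ExecutionRecord[] {
  return executions.filter((execution) => {
    if (!Object.prototype.hasOwnProperty.call(execution.metadata, key)) return false;
    return value === undefined || String(execution.metadata[key]) === value;
  });
}

const executions: ExecutionRecord[] = [
  { id: "exec-1", metadata: { environment: "production", orderId: "ORD-456" } },
  { id: "exec-2", metadata: { environment: "staging" } },
];

console.log(filterByMetadata(executions, "orderId").map((e) => e.id)); // [ "exec-1" ]
console.log(filterByMetadata(executions, "environment", "production").map((e) => e.id)); // [ "exec-1" ]
```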
API Filtering
# Filter by metadata key and value
GET /execution?metadataKey=environment&metadataValue=production

# Filter by key only (check if key exists)
GET /execution?metadataKey=orderId
Best Practices
- Use consistent keys across your application for easier filtering
- Keep metadata small - it is stored as JSONB, so avoid large objects
- Use strings for values when possible - easier to filter
- Don't store sensitive data - metadata may appear in logs and UI
Supported Value Types
These conversion rules apply to variables interpolated into instruction templates:
| Type | Behavior |
|------|----------|
| string, number, boolean | Converted directly |
| null | Converted to "null" |
| undefined | Placeholder kept as-is ({{key}}) |
| object, array | JSON stringified |
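As a rough illustration of these conversion rules, the following sketch implements a small {{variable}} interpolator with dot-path lookup. `lookup` and `interpolate` are hypothetical helpers written for this example, not the package's implementation, and tolerating whitespace inside the braces is an assumption.

```typescript
type TemplateVars = { [key: string]: unknown };

// Walk a dot-separated path such as "client.plan" through nested objects.
function lookup(vars: TemplateVars, path: string): unknown {
  let current: unknown = vars;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as TemplateVars)[key];
  }
  return current;
}

// Replace each {{path}} placeholder following the conversion table.
function interpolate(template: string, vars: TemplateVars): string {
  return template.replace(/\{\{\s*([\w.]+)\s*\}\}/g, (placeholder: string, path: string) => {
    const value = lookup(vars, path);
    if (value === undefined) return placeholder;                 // keep {{key}} as-is
    if (value === null) return "null";                           // null becomes "null"
    if (typeof value === "object") return JSON.stringify(value); // objects and arrays
    return String(value);                                        // string, number, boolean
  });
}

console.log(
  interpolate("Plan: {{client.plan}}, tags: {{tags}}, who: {{missing}}", {
    client: { plan: "Premium" },
    tags: ["a", "b"],
  })
);
// Plan: Premium, tags: ["a","b"], who: {{missing}}
```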
Nested Paths
Access nested object properties with dot notation:
instructions: "Client: {{client.profile.name}}, Company: {{client.company.name}}"

// With variables:
{
  client: {
    profile: { name: "Joao" },
    company: { name: "Acme Corp" }
  }
}
// Result: "Client: Joao, Company: Acme Corp"
🔄 Workflow (Workflow)
Orchestrator for long-running and complex processes, supporting persistence, retries, and validation.
Features
- Fluent API: Declarative definition (.start().step().branch().finish()).
- State Persistence: Saves the state of each step (requires stateStore).
- Resilience: Configurable Retry Policies (Exponential, Linear).
- Type-Safe: Input and output validation with Zod.
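To make the retry policies concrete, here is a minimal sketch of how exponential and linear backoff delays could be computed and applied. It is not the package's implementation: `delayFor` and `withRetry` are hypothetical helpers, and the growth rules (doubling for exponential, adding initialDelay for linear) are assumptions.

```typescript
type Backoff = "exponential" | "linear";

interface RetryPolicy {
  maxAttempts: number;
  backoff: Backoff;
  initialDelay: number; // milliseconds
}

// Delay before retry number `retry` (1-based).
// Assumption: exponential doubles each time, linear grows by initialDelay.
function delayFor(policy: RetryPolicy, retry: number): number {
  return policy.backoff === "exponential"
    ? policy.initialDelay * Math.pow(2, retry - 1)
    : policy.initialDelay * retry;
}

// Run `task` up to maxAttempts times, sleeping between failed attempts.
// `sleep` is injectable so the behavior can be checked without real waiting.
async function withRetry<T>(
  task: () => Promise<T>,
  policy: RetryPolicy,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < policy.maxAttempts) await sleep(delayFor(policy, attempt));
    }
  }
  throw lastError;
}

const policy: RetryPolicy = { maxAttempts: 3, backoff: "exponential", initialDelay: 1000 };
console.log([1, 2].map((retry) => delayFor(policy, retry))); // [ 1000, 2000 ]
```

With maxAttempts set to 3 there are at most two waits, which is why only two delays are printed.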
Workflow Example
import { Workflow, MockStateStore } from "@starya/nebulaos-core";
import { z } from "zod";

const workflow = new Workflow({
  id: "order-processing",
  stateStore: new MockStateStore(), // Or real implementation (Redis/Postgres)
  retryPolicy: {
    maxAttempts: 3,
    backoff: "exponential",
    initialDelay: 1000
  }
})
  .start(async ({ input }) => {
    console.log("Validating order", input);
    return input;
  })
  .step("payment", async ({ input }) => {
    // Payment logic
    return { status: "paid", id: input.id };
  })
  .finish(async ({ input }) => {
    return { message: `Order ${input.id} processed successfully` };
  });

// Execution
const result = await workflow.run({ id: 123, total: 500 });
🛡️ Logging & Observability
The Core emits rich events during execution (execution:start, llm:call, tool:result).
Configuration
The default logger (ConsoleLogger) can be configured via logLevel. For production, you can implement the ILogger interface to send logs to Datadog, CloudWatch, etc.
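As a rough idea of what a custom logger could look like, the sketch below filters by level and emits one JSON line per entry, which is a common format for log shippers. The `SketchLogger` interface and `createJsonLogger` are hypothetical and written only for this example; the package's actual ILogger interface may have a different shape, so check its type definitions before implementing it.

```typescript
type LogLevel = "debug" | "info" | "warn" | "error";

// Hypothetical logger shape; the real ILogger may differ.
interface SketchLogger {
  log(level: LogLevel, message: string, meta?: Record<string, unknown>): void;
}

const levels: LogLevel[] = ["debug", "info", "warn", "error"];

// Build a logger that drops entries below `minLevel` and passes
// each remaining entry to `sink` as a single JSON line.
function createJsonLogger(minLevel: LogLevel, sink: (line: string) => void): SketchLogger {
  return {
    log(level: LogLevel, message: string, meta: Record<string, unknown> = {}): void {
      if (levels.indexOf(level) < levels.indexOf(minLevel)) return; // below threshold
      sink(JSON.stringify({ level, message, ...meta }));
    },
  };
}

const logger = createJsonLogger("info", (line) => console.log(line));
logger.log("debug", "skipped because it is below the threshold");
logger.log("warn", "slow tool call", { tool: "calculator", ms: 1200 });
// {"level":"warn","message":"slow tool call","tool":"calculator","ms":1200}
```

In a real integration the `sink` function would forward each line to the chosen backend instead of the console.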
PII Masking (Privacy)
Protect sensitive data in logs by configuring a piiMasker.
const agent = new Agent({
  // ...
  piiMasker: {
    mask: (text) => text.replace(/\d{3}\.\d{3}\.\d{3}-\d{2}/g, "***") // Example mask
  }
});
🧪 Testing & Compliance
Package Tests
pnpm test
Provider Compliance Suite
If you are creating a new Provider (e.g., Anthropic), use the compliance suite to ensure full compatibility.
import { runProviderComplianceTests } from "@starya/nebulaos-core/test-utils";
runProviderComplianceTests(() => new MyNewProvider(...), { runLiveTests: true });