@nadam_pranavam/pccp-core-library
v0.0.2
Published
Core library for PCCP WebSocket server with rate limiting and metrics
Readme
PCCP Core Library
A robust agent orchestration and management library built on the PCCX protocol, designed for scalable, secure, and extensible multi-agent systems.
Table of Contents
- Features
- Protocol
- Architecture Overview
- Project Structure
- Installation
- Development
- Usage Examples
- YAML-Based Agentic Workflows
- Custom Agent Workflows
- Performance Benchmarking
- Testing Strategy
- Dependencies
- Contributing
- License
- Support
- Docker: Custom Deployment Environment
Features
- Agent creation, orchestration, and management
- Workflow/job management for distributed agents
- YAML-based workflow templates (load via CLI, API, or programmatically)
- WebSocket and REST communication bridges
- Rate limiting, metrics, and monitoring
- Pluggable trust, plugin, and result store systems
- Network reliability testing with Toxiproxy
- Comprehensive test suite
- TypeScript support
Protocol
The library is built on the PCCX protocol (PCCP Communication Protocol Extension), which defines the message formats, agent interaction patterns, and orchestration semantics for multi-agent systems. PCCX ensures interoperability, extensibility, and robust communication between agents and orchestrators.
See the full protocol and architecture diagram in PLAN_PCCX.md.
Architecture Overview
The PCCP Core Library is designed for modularity, extensibility, and testability. It features:
- Agent runtime and orchestrator for workflow/job management
- Core message protocol framework (
PCCPMessage) - Pluggable trust, plugin, and result store systems
- Abstracted communication layer (WebSocket, REST, NATS, etc.)
- Monitoring/admin APIs and developer SDK/CLI
Project Structure
├── src/ # Source code
│ ├── agent/ # Agent implementation
│ ├── bridge/ # Communication bridges (REST, NATS)
│ ├── config/ # Configuration settings
│ ├── context/ # Context management
│ ├── metrics/ # Metrics collection and visualization
│ ├── orchestration/ # Orchestration components
│ ├── plugin/ # Plugin system
│ ├── resultStore/ # Result storage implementation
│ ├── transport/ # Network transport implementations
│ ├── trust/ # Trust policies and verification
│ ├── utils/ # Utility functions
│ └── websocket/ # WebSocket implementation
├── test/ # Test files
│ ├── components/ # Component tests
│ │ ├── admin/ # Admin API tests
│ │ ├── bridge/ # Bridge tests
│ │ ├── communication/ # Communication tests
│ │ ├── message/ # Message handling tests
│ │ ├── misc/ # Miscellaneous tests
│ │ ├── orchestrator/ # Orchestrator tests
│ │ ├── pccp/ # PCCP protocol tests
│ │ └── rateLimit/ # Rate limiting tests
│ └── config/ # Test configuration
├── dist/ # Compiled output
└── node_modules/ # DependenciesInstallation
npm installDevelopment
Building the Project
npm run buildRunning Tests
# Run all tests
npm test
# Run tests with coverage
npm run test:save
# Run linting
npm run lintStarting the Server
npm startUsage Examples
Sending a Message (WebSocket)
import { PCCPWebSocketClient } from './src/transport/pccp.websocket.client';
import { PCCPMessage } from './src/message';
const client = new PCCPWebSocketClient('ws://localhost:9001');
await client.connect();
const msg = new PCCPMessage('MESSAGE', 'sender-id', { data: 'hello' });
client.send(msg);Loading a Workflow from YAML (CLI)
pccx workflow load path/to/workflow.yaml
pccx workflow listLoading a Workflow from YAML (Web API)
curl -X POST http://localhost:PORT/workflow \
-H 'Authorization: Bearer <token>' \
-H 'Content-Type: text/plain' \
--data-binary @path/to/workflow.yamlLoading a Workflow from YAML (Programmatically)
import { Orchestrator } from './src/orchestration/orchestrator';
import { loadWorkflowFromFile } from './src/workflow/yamlLoader';
const wf = await loadWorkflowFromFile('path/to/workflow.yaml');
orchestrator.loadWorkflowTemplate(wf);YAML-Based Agentic Workflows
Overview
- Workflows are defined in YAML: Each workflow describes a sequence of steps, required agent capabilities, data flow, and (optionally) input/output schemas.
- YAML is loaded via CLI, API, or directly in code: See above for examples.
- Agents are NOT created by YAML: The YAML only describes what capabilities are needed for each step. Agents must be implemented, started, and registered separately.
- The orchestrator matches steps to agents: At runtime, the orchestrator finds registered agents that advertise the required capabilities for each step and assigns the step to them.
Agentic Workflow Lifecycle
- Define the workflow in YAML (see below for an example).
- Implement and start real agents that inherit from
AbstractAgentand advertise their capabilities (e.g.,dicom-ingest,dicom-analysis). - Register agents with the runtime (this can be done via CLI, API, or programmatically).
- Load the workflow YAML via CLI, API, or code.
- Start a workflow job (e.g., via API or message). The orchestrator:
- Reads the workflow template.
- For each step, finds a registered agent with the required capabilities.
- Assigns the step to that agent by sending a message (e.g.,
EXECUTE_TASK). - Waits for completion, then proceeds to the next step.
- Workflow completes when all steps are done or if a failure occurs.
Example YAML Workflow
workflowId: dicom-analysis-pipeline
name: DICOM Image Analysis Pipeline
inputSchema:
type: object
properties:
dicomUrl:
type: string
patientId:
type: string
required: [dicomUrl, patientId]
steps:
- stepId: ingest
name: Ingest DICOM
capabilitiesRequired: ["dicom-ingest"]
inputMapping:
url: $inputs.dicomUrl
outputName: dicomData
- stepId: analyze
name: Analyze Image
capabilitiesRequired: ["dicom-analysis", "ai"]
inputMapping:
dicom: $steps.ingest.output.dicomData
outputName: analysisResult
- stepId: report
name: Generate Report
capabilitiesRequired: ["report-generation"]
inputMapping:
analysis: $steps.analyze.output.analysisResult
patient: $inputs.patientId
outputMapping:
reportId: $steps.report.output.reportId
summary: $steps.analyze.output.analysisResult.summaryKey Points
- Agents must be running and registered before the workflow is started.
- The orchestrator does not create agents; it only matches steps to available agents by capability.
- You must implement real agents (e.g.,
DicomIngestAgent,DicomAnalysisAgent) that inherit fromAbstractAgentand register themselves. - The orchestrator uses the YAML to coordinate the workflow, assigning each step to the right agent and handling data flow and results.
Implementing and Registering Real Agents
- Implement a concrete agent class:
import { AbstractAgent } from 'pccp-core-library';
class DicomIngestAgent extends AbstractAgent {
constructor(id: string) {
super(id);
// ...
}
getCapabilities() { return new Set(['dicom-ingest']); }
async handleMessage(msg) { /* ... */ }
// ...
}- Start and register the agent with the runtime (via CLI, API, or code).
- Ensure the agent advertises its capabilities so the orchestrator can match it to workflow steps.
Custom Agent Workflows
The core library enables you to define, orchestrate, and manage custom agent workflows for real-world domains. Below are examples for healthcare DICOM image analysis using agent orchestration.
Example: Healthcare DICOM Image Analysis Workflow
Suppose you want to build a distributed workflow where:
- An ingest agent receives DICOM images
- An analysis agent performs AI-based image analysis
- A report agent generates and stores diagnostic reports
- The orchestrator coordinates the workflow and aggregates results
1. Define Agents and Capabilities
import { PCCXSDK } from 'pccp-core-library';
const ingestAgent = sdk.createAgent('ingest-agent', ['dicom-ingest']);
const analysisAgent = sdk.createAgent('analysis-agent', ['dicom-analysis']);
const reportAgent = sdk.createAgent('report-agent', ['report-generation']);2. Orchestrate a DICOM Analysis Workflow
import { PCCPMessage, MessageType } from 'pccp-core-library';
// Orchestrator submits a new workflow job
const workflowId = 'dicom-analysis-job-1';
const dicomData = /* ...binary DICOM data... */;
// Step 1: Ingest
const ingestMsg = new PCCPMessage(MessageType.EXECUTE_TASK, 'orchestrator', { dicom: dicomData }, { workflow_id: workflowId, to: 'ingest-agent' });
sdk.routeMessage(ingestMsg);
// Step 2: Analysis (triggered after ingest completion)
sdk.on('message', (msg) => {
if (msg.type === MessageType.TASK_COMPLETED && msg.sender_id === 'ingest-agent') {
const analysisMsg = new PCCPMessage(MessageType.EXECUTE_TASK, 'orchestrator', { dicom: msg.payload.dicom }, { workflow_id: workflowId, to: 'analysis-agent' });
sdk.routeMessage(analysisMsg);
}
// Step 3: Report Generation
if (msg.type === MessageType.TASK_COMPLETED && msg.sender_id === 'analysis-agent') {
const reportMsg = new PCCPMessage(MessageType.EXECUTE_TASK, 'orchestrator', { analysis: msg.payload.analysis }, { workflow_id: workflowId, to: 'report-agent' });
sdk.routeMessage(reportMsg);
}
// Step 4: Finalize
if (msg.type === MessageType.TASK_COMPLETED && msg.sender_id === 'report-agent') {
console.log('DICOM analysis workflow complete. Report:', msg.payload.report);
}
});3. Custom Agent Logic Example (Analysis Agent)
sdk.on('message', (msg) => {
if (msg.type === MessageType.EXECUTE_TASK && msg.sender_id === 'orchestrator' && msg.to === 'analysis-agent') {
// Perform AI-based DICOM analysis
const analysisResult = runDicomAnalysis(msg.payload.dicom);
const completedMsg = new PCCPMessage(MessageType.TASK_COMPLETED, 'analysis-agent', { analysis: analysisResult }, { workflow_id: msg.workflow_id });
sdk.routeMessage(completedMsg);
}
});Tip: You can chain as many agent steps as needed, and use the SDK's workflow/job management features to track progress, handle errors, and aggregate results.
Performance Benchmarking
- Run
./scripts/run-perf-benchmarks.sh <output_dir> [MODE] [extra_args]for batch testing - See scripts/PERF-BENCHMARK-USAGE.md for detailed instructions
Testing Strategy
The project includes a comprehensive testing strategy that covers:
- Network reliability testing
- Communication protocol verification
- Concurrent communication testing
- Rate limiting and metrics verification
- Security validation
- Integration testing
For detailed information about the testing approach, please refer to test_strategy.md.
Dependencies
Main Dependencies
ws: WebSocket client and server implementationexpress: HTTP server for REST implementationnode-fetch: HTTP client for REST communicationajv: JSON Schema validationchart.js: For metrics visualizationtoxiproxy-node-client: For network testing
Development Dependencies
- TypeScript
- Vitest for testing
- ESLint for code linting
- Various TypeScript type definitions
Contributing
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Requirements:
- All public APIs must be documented with JSDoc comments
- Run all tests and ensure coverage before submitting
- If you add new features, update the README and add usage examples
- To generate HTML API docs from JSDoc, run:
npx typedoc src/
License
This project is proprietary and confidential.
Support
For support and questions, please contact the development team.
Docker: Custom Deployment Environment
Building the Core-Library Base Image
Build the Docker image for the core-library:
docker build -t core-library:latest .Using the Image as a Base for Custom Agents/Orchestrators
Create a Dockerfile in your custom agent/orchestrator project:
FROM core-library:latest
# Copy your custom agent/orchestrator code
COPY ./my-agent /usr/src/app/custom
# Set working directory
WORKDIR /usr/src/app/custom
# Install any additional dependencies (if needed)
# RUN npm install
# Set the entrypoint for your agent/orchestrator
CMD ["node", "myAgent.js"]Example: Build and Run Your Custom Agent
docker build -t my-agent:latest .
docker run --rm my-agent:latestNotes
- The base image does not run any process by default; you must specify the entrypoint in your custom image.
- The working directory
/usr/src/app/customis provided for your custom code. - The image uses a non-root user for security.
