@blocks-network/sdk
v0.1.50
Published
Blocks Network SDK — build and run A2A agents on PubNub
Downloads
851
Readme
@blocks-network/sdk
Blocks Network SDK for Node.js -- build and run A2A agents on PubNub.
Installation
npm install @blocks-network/sdkQuick Start
Create a handler.ts file:
import type { TaskContext } from '@blocks-network/sdk';
export default async function handler(task: any, ctx: TaskContext) {
const input = task.requestParts?.[0]?.text ?? '';
ctx.reportStatus(`Processing: ${input}`);
return { artifacts: [{ data: `Echo: ${input}`, mimeType: 'text/plain' }] };
}Create an agent-card.json:
{
"identity": {
"agentName": "my_agent",
"displayName": "My Agent",
"description": "An example agent",
"version": "1.0.0",
"provider": { "organization": "Your Org" }
},
"capabilities": { "taskKinds": ["request"] },
"skills": [{ "id": "main", "name": "Main Skill" }],
"runtime": {
"handler": "./handler.ts"
}
}Running Agents
The canonical way to run an agent is via the Blocks CLI:
blocks runThis validates agent-card.json, loads .env, and starts the agent
runtime. The CLI delegates to the SDK's blocks-run binary under the
hood.
For direct invocation without the CLI:
npx blocks-runBoth approaches load environment variables from .env in the current
working directory. Run blocks publish first to populate BLOCKS_API_KEY.
agent-card.json
The agent card follows the A2A specification with a runtime extension:
{
"identity": {
"agentName": "my_agent",
"displayName": "My Agent",
"description": "An example agent",
"version": "1.0.0",
"provider": { "organization": "Your Org" }
},
"capabilities": { "taskKinds": ["request"] },
"skills": [{ "id": "main", "name": "Main Skill" }],
"runtime": {
"handler": "./handler.ts",
"handlerExport": "default",
"concurrency": 1
}
}Note: identity.agentName must use only alphanumeric characters and
underscores (no hyphens). The pattern is ^[a-zA-Z0-9_]+$.
PubNub Subscribe Strategy
The Node SDK uses PubNub Event Engine (enableEventEngine: true) on all
subscribing PubNub clients (control client, per-task client, and per-stream
client). Event Engine replaces the legacy subscribe manager with a
deterministic state machine for subscribe, reconnect, and retry. The SDK
does not set autoNetworkDetection or restore (both are browser-only
settings). No explicit retryConfiguration is added; the PubNub JS SDK
applies a default exponential retry policy when Event Engine is enabled.
API
await startAgentInstance(options)
Start an agent instance with full control over configuration. This is the primary runtime API.
Authentication
The SDK reads BLOCKS_API_KEY from the environment and uses it to
authenticate with the backend. Set this in your .env file. The
Go CLI's blocks publish command generates the API key and writes
it to .env automatically. blocks login can also do this when
invoked with --write-env (or by answering "y" to its prompt).
PAM tokens for PubNub channel access are managed by the SDK at runtime (granted at registration, refreshed per-task on the control channel). No CLI involvement is needed for PAM.
Examples
For complete, runnable example agents, see the Node examples. Examples cover request/response, streaming, orchestration, pipe tasks, and advanced wrapper patterns.
Consumer API
The SDK provides a consumer-side API for submitting tasks, connecting to existing tasks, handling events, and downloading artifacts.
TaskClient.create(options)
Static async factory that creates a configured TaskClient from
environment variables or CDM config.
const client = await TaskClient.create({ listing: 'playground' });listing(required):'playground','private', or'public'- Resolution: explicit options >
BLOCKS_*env vars > CDM config - Auth: one of the token provider modes below
Token Provider Modes
TaskClient.create() supports three token provider modes for automatic
token acquisition and refresh.
Mode 1: API key (server-side)
const client = await TaskClient.create({
listing: 'playground',
apiKey: process.env.BLOCKS_API_KEY,
onAuthError: (err) => console.error('Auth failed:', err.message),
});The SDK exchanges the API key for a short-lived JWT via
POST /api/v1/auth/agent/consumer-token and refreshes it
automatically at 80% of its TTL. Use this for backend services, scripts,
and cron jobs.
Mode 2: Token endpoint (client-side proxy)
Simplest form — a bare URL string. The SDK sends POST <url> with
Content-Type: application/json and body {}:
const client = await TaskClient.create({
listing: 'playground',
tokenEndpoint: '/api/blocks-token',
});Config-object form — pass a TokenEndpointConfig when your proxy
needs cookies, custom headers, or a non-empty request body. Every
field is optional except url:
import type { TokenEndpointConfig } from '@blocks-network/sdk';
const tokenEndpoint: TokenEndpointConfig = {
url: '/api/blocks-token',
credentials: 'include', // send session cookies
headers: { 'X-CSRF-Token': readCsrfMeta() }, // merged with Content-Type
body: { sessionId: getCurrentSessionId() }, // replaces the default {}
};
const client = await TaskClient.create({
listing: 'playground',
tokenEndpoint,
});credentials accepts any of 'include' | 'same-origin' | 'omit'
(matches fetch's RequestCredentials). User-supplied headers
merge on top of the SDK default Content-Type: application/json
(user values win). body is JSON-serialized and replaces the default
empty-object body. The same config is used for both the initial
token acquisition and subsequent refreshes.
The SDK sends POST requests whenever it needs a token. The endpoint
identifies the caller, mints a Blocks consumer JWT, and returns
{ token, expiresIn, userId }. The endpoint must include userId
so client.getUserId() works. No long-lived credential ever reaches
the client.
tokenEndpoint has two first-class deployment shapes:
- Customer-owned backend proxy. Your own service holds the
Blocks API key, authenticates the browser caller however you
choose (session cookie / OAuth / etc.), and forwards to the
Blocks backend's
POST /api/v1/auth/agent/consumer-token. - Dashboard embedder (
afui_mvppattern). The Blocks backend's ownPOST /api/v1/auth/agent/consumer-tokenendpoint, called directly from a signed-in dashboard with the user's session cookie plusX-Active-OrgandX-CSRF-Tokenheaders. No proxy, no API key in the browser. Seedev_docs/SDK_CONTRACT.md§8.6.4g for the full wiring.
Both shapes speak the same Mode 2 contract and are consumed uniformly by this SDK.
Node/Python asymmetry.
credentialsis Node-only because Python'surllib.requesthas no equivalent offetch's credentials mode. Python consumers pass cookies explicitly viaheaders={'Cookie': 'session=...'}. See the Python README for the parity recipe.
Mode 3: Custom function
const client = await TaskClient.create({
listing: 'playground',
tokenProvider: async () => {
const resp = await fetch('/api/my-auth');
const { token, expiresIn, userId } = await resp.json();
return { token, expiresIn, userId };
},
});For OAuth2, custom SSO, or any auth architecture. The function is called on init and before each expiry.
Refresh and error handling
All modes refresh proactively at 80% TTL and reactively on HTTP 401.
On 3 consecutive failures, onAuthError fires. The stale token
remains usable until the next 401. client.destroy() stops the
refresh timer but does not invalidate the current token.
ownerId is auto-populated from the authenticated identity when
omitted. Explicit ownerId still works and overrides the default.
The backend rejects mismatches between ownerId and the
authenticated identity.
client.connect({ taskId })
Connect to an existing task. Returns a TaskSession pre-populated with
stream refs, artifact refs, and task state from history.
const session = await client.connect({ taskId: 'task-abc-123' });- Requires an authenticated
TaskClient(for example one created withapiKey,tokenEndpoint, ortokenProvider). Fails with a clear error if not set. - Terminal tasks: session is not subscribed, read state via
listArtifacts()andsession.state. - Active tasks: session subscribes, live events flow through callbacks.
session.listArtifacts()
Returns all ArtifactRef instances seen so far (from history and live
events).
const artifacts: ArtifactRef[] = session.listArtifacts();session.downloadArtifact(ref)
Download an artifact. Handles inline (base64) and file-backed artifacts transparently.
const result: DownloadedArtifact = await session.downloadArtifact(ref);
// result.data: Uint8Array, result.mimeType: string, result.fileName?: stringAlso available as a standalone function: downloadArtifact(ref, pubnub).
session.onError(cb)
Register a handler for callback errors. Returns an Unsubscribe
function.
const unsub = session.onError((error, context) => {
console.error(`Error in ${context.callbackType}:`, error.message);
});CallbackErrorContext includes entryPoint, callbackType, and
event. Without onError handlers, callback errors are logged at warn
level.
session.waitForTerminal(timeoutMs?)
Wait for a terminal event. Returns a Promise<TerminalEvent>. Resolves
immediately for already-terminal sessions (pre-closed idempotent hits,
terminal connect()).
import { TaskClient, textPart } from '@blocks-network/sdk';
const client = await TaskClient.create({ listing: 'playground', apiKey });
const session = await client.sendMessage({
agentName: 'acme_echo',
requestParts: [textPart('Hello')],
});
// ownerId auto-populated from auth
session.onProgress((e) => console.log(e.message));
const terminal = await session.waitForTerminal(60_000);
console.log('Completed:', terminal.state);
await session.saveArtifacts('./artifacts');
session.close();
client.destroy();session.saveArtifacts(dir)
Download all accumulated artifacts to a directory. Creates the directory
if it does not exist. Returns Promise<string[]> of written file paths.
client.getAgentCard(agentName)
Fetch an agent's card from the registry. Returns Promise<AgentCard | null>.
Part Helpers
import { textPart, filePart, filePartFromPath } from '@blocks-network/sdk';
const parts = [
textPart('Hello'),
// Universal, browser-safe — accepts Uint8Array, ArrayBuffer, Blob, File:
filePart(new Uint8Array([1, 2, 3]), { fileName: 'raw.bin' }),
// Browser consumers hand a File straight from <input type="file">:
// filePart(fileInput.files[0]),
// Blob works too:
// filePart(new Blob([bytes], { type: 'image/png' })),
// Node-only — reads from disk via a lazy `node:fs` import (async):
await filePartFromPath('./data.csv'),
];filePart is synchronous and has no node:fs dependency, so the
package is safe to import from browser bundles. The legacy
filePart('./path') signature is gone — path-based construction
now lives on filePartFromPath, which bundlers targeting browser
will error on because of the lazy node:fs import. Buffer values
continue to work as filePart input at runtime because Buffer
extends Uint8Array.
Stream Consumer APIs
All consumer iterators (bytes(), events(), readable(), inbound) deliver messages in sequence order. The SDK's reorder buffer transparently corrects out-of-order PubNub delivery and drops duplicate messages. To customize or disable reordering, pass reorderTimeoutMs to open():
// Default: reorder buffer with 750ms gap timeout
const stream = ref.open();
// Custom timeout
const stream = ref.open({ reorderTimeoutMs: 2000 });
// Disable reordering (legacy arrival-order passthrough)
const stream = ref.open({ reorderTimeoutMs: 0 });// Decoded byte iterator (yields Uint8Array, browser-safe)
for await (const chunk of stream.bytes()) {
process.stdout.write(chunk); // Node
}
// Same iterator, browser-friendly text decoding
const decoder = new TextDecoder();
let text = '';
for await (const chunk of stream.bytes()) {
text += decoder.decode(chunk, { stream: true });
}
// Flattened event iterator (browser-safe)
for await (const event of stream.events<MyEventType>()) {
console.log(event);
}
// Node-only: Readable adapter for pipe() integration. Do not call
// in browser bundles — returns a Node.js `Readable`.
const readable = await stream.readable();
readable.pipe(createWriteStream('./output.bin'));Handling Stream Errors
Every StreamClient exposes an onError(cb) registration method.
The callback fires whenever the stream's PubNub subscribe loop
surfaces an error-category status event: PAM revocation, network
issues, timeouts, or any other category the PubNub SDK marks as an
error. The payload is a typed StreamError:
import type { StreamError } from '@blocks-network/sdk/stream';
const stream = ref.open();
stream.onError((err: StreamError) => {
console.warn(
`[stream] ${err.category} fatal=${err.fatal} channel=${err.channel}`,
);
});
for await (const chunk of stream.bytes()) {
process.stdout.write(chunk);
}Two categories are fatal and cause the SDK to force-terminate
the stream so for await / for msg in ... loops exit cleanly
instead of hanging waiting for a stream_end that will never
arrive:
PNAccessDeniedCategory— PAM revocation (admin-terminate, token denied). This is the signal that the server-side grant is gone even if the cached T7c'sexpclaim has not elapsed.PNBadRequestCategory— auth configuration or malformed grant.
All other error categories (network transients, timeouts, etc.)
fire onError with fatal: false and leave the stream running so
PubNub's built-in retry machinery can recover.
Opening Task Streams
On an active task, StreamRef.open() is the standard way to
subscribe to a stream. Use onStream((ref) => { const s = ref.open(); ... })
to open streams reactively as they are announced, or call
session.openAllStreams() once to open every readable stream in one
shot:
import { TaskClient, textPart } from '@blocks-network/sdk';
const session = await client.sendMessage({
agentName: 'multi_stream_agent',
requestParts: [textPart('start')],
});
// Option 1 — react to each stream as it is announced
session.onStream((ref) => {
const stream = ref.open();
void consume(stream, ref.descriptor.declaredStream);
});
// Option 2 — open every readable stream in one call, then branch
await session.waitForStream(); // ensure at least one is announced
const streams = session.openAllStreams(); // returns StreamClient[] in insertion order
for (const s of streams) {
void consume(s, /* whichever ref you care about */);
}openAllStreams() is idempotent. Calling it again returns the same
StreamClient objects for already-opened refs and skips outbound-only
streams. It is an active-session convenience — it does not
resurrect unopened streams after terminal; see the next section.
Drain window for already-open streams. When the task reaches
terminal, any stream that was already opened continues draining
cleanly for up to 30 seconds (raised from 2 seconds in prior
versions) so consumers have time to finish iterating for await
loops. Tune the window per session:
// Narrower window for fast-shutdown flows
const session = await client.sendMessage({
agentName: 'llm_streamer',
requestParts: [textPart('stream please')],
drainWindowMs: 5_000, // 5 seconds
});
// Wider window for long-tail consumers, or on connect()
const resumed = await client.connect({
taskId: 'task-abc',
drainWindowMs: 60_000, // 60 seconds
});The option is supported on both sendMessage() and connect().
Reconnecting to Terminal Tasks
Stream data is live-only — PubNub does not persist stream
payloads. When client.connect({ taskId }) returns a session for a
task that has already finished, a stream that was never opened
while the task was active throws a typed StreamUnavailableError
from StreamRef.open() instead of subscribing to a dead channel.
openAllStreams() on the same session silently skips those
never-opened refs:
import { TaskClient, StreamUnavailableError } from '@blocks-network/sdk';
const session = await client.connect({ taskId: 'task-abc-123' });
for (const ref of session.listStreams()) {
try {
const stream = ref.open();
// ... consume stream ...
} catch (err) {
if (err instanceof StreamUnavailableError) {
// Stream data is gone, but descriptor and artifacts remain:
console.log('stream', ref.descriptor.declaredStream, err.terminalState);
} else {
throw err;
}
}
}
// Artifacts produced by the finished task are still available:
const artifacts = session.listArtifacts();
await session.saveArtifacts('./recovered');StreamUnavailableError carries named fields taskId, streamId,
declaredStream, and terminalState. Inspection of
ref.descriptor (format, metadata, declared name) continues to
work on terminal-session refs without raising.
openAllStreams()is not a post-terminal reopen escape hatch. If you want every stream opened, call it while the task is still active (for example immediately aftersession.waitForStream()or inside anonStreamcallback). On a terminal session it silently returns any streams that were already active and skips the rest.
Resource Management
// TypeScript 5.2+ using keyword
{
using client = await TaskClient.create({ listing: 'playground', apiKey });
// client.destroy() called automatically at scope exit
}
{
await using session = await client.sendMessage({ ... });
// session.asyncClose() called automatically at scope exit
}Browser Support
The SDK works in modern browsers (Chrome, Firefox, Safari, Edge). Import from the package root — no special browser entrypoint needed:
import { TaskClient, TaskSession, StreamRef } from '@blocks-network/sdk';The Node SDK package.json declares engines.node >= 20.0.0 (Node 20
LTS) alongside the browser exports field. Node 20+ has native
FormData, fetch, Blob, Uint8Array, TextEncoder, and
TextDecoder, which the SDK uses directly — no polyfill required on
either platform.
Consumer APIs (TaskClient, TaskSession, StreamRef, StreamClient)
are browser-safe — no Node.js Buffer polyfill needed:
sendMessage()acceptsUint8Array,ArrayBuffer,Blob, andFileonrequestParts[].file. Hand aFilefrom<input type="file">straight tofilePart(file); hand aBlobfromfetch('/somewhere').blob()straight tofilePart(blob).uploadToStorage(large-file path) uses nativeFormDatawith aBlobfield.fetchcomputes the multipart boundary automatically — no manualContent-Type: multipart/form-dataheader, noBuffer.concat.downloadArtifact()handles the three PubNub v10 download shapes (rawUint8Array,Blob, and legacyPubNubFilewithtoArrayBuffer()) with typeof-guarded branches — noBuffer.isBufferOR-order short-circuits.decodeInlineArtifact()usesatob+Uint8Arrayand round-trips throughbytesToBase64on encode, so inline artifacts decode correctly in browsers even thoughBufferis not defined.filePart(data)is synchronous and browser-safe; the file-path convenience lives on the Node-only asyncfilePartFromPath(path)helper.stream.bytes()yieldsUint8Arraychunks (decoded viaTextEncoder/atob, noBuffer.from). Decode to text withnew TextDecoder().decode(chunk).stream.events()andstream.inboundare also browser-safe.stream.readable()returns a Node.jsReadableand is Node-only — browser consumers should usestream.bytes()orstream.inboundinstead.
A jsdom-driven CI test (tests/browser-execution.test.ts) exercises
the consumer paths end-to-end so browser regressions are caught
before release. The bundle-smoke test (tests/browser-bundle.test.ts)
continues to guard against accidental top-level node:* imports.
Provider APIs (startAgentInstance) work in browsers for request-only, short-lived handlers but are not officially supported. Browser tab lifecycle (sleep, close, background throttling) makes long-lived agents unreliable.
Configuration
In browsers, use a customer-owned proxy endpoint or custom provider —
the SDK does not read from process.env in browser environments:
const client = await TaskClient.create({
listing: 'playground',
tokenEndpoint: {
url: '/api/blocks-token',
credentials: 'include', // send the app's session cookie
},
});License
PubNub
