@the_haruspex/powerbi-realtime
v0.1.0
Node/TypeScript SDK to push real-time data and manage Power BI datasets with built-in auth, batching, retries, and rate limiting.
@ecustom/powerbi-realtime
Node/TypeScript SDK for pushing near real-time data and managing Power BI datasets. Handles AAD auth, retries with backoff, rate limits, and batching so you can just call postRows() without babysitting Microsoft's limits.
Deprecation Notice: Power BI real-time streaming (Push/Streaming) is scheduled for retirement on 2027-10-31. This SDK abstracts the write path via a RealTimeSink to enable migration to Fabric Real-Time Intelligence later.
Architecture Overview
The SDK is built with a layered architecture that separates concerns and enables easy testing:
PowerBiClient (public API)
↓
HttpClient (retries, backoff, idempotency)
↓
AuthProvider (MSAL token management)
↓
RateLimiter (concurrency + throughput limits)
↓
RealTimeSink (abstracts push vs Fabric RTI)
Core Components
AuthProvider (src/lib/auth.ts)
- Purpose: Manages Azure AD service principal authentication using MSAL
- Flow: Client credentials flow with scope https://analysis.windows.net/powerbi/api/.default
- Features:
- Token caching with proactive refresh at 80% of expiry
- Supports client secret or certificate auth (cert stub for now)
- Automatic retry on auth failures
- Dependencies: @azure/msal-node
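The proactive-refresh rule above can be sketched as a pure function. This is a minimal illustration rather than the SDK's actual code: the `CachedToken` shape and the `refreshAtMs` and `needsRefresh` names are hypothetical, and it assumes "80% of expiry" means 80% of the token's lifetime measured from the moment it was acquired.

```typescript
// Hypothetical cached-token shape; the SDK's internal type may differ.
interface CachedToken {
  accessToken: string;
  acquiredAtMs: number; // epoch ms when the token was obtained
  expiresAtMs: number; // epoch ms when the token expires
}

// Point in time at which a refresh should be attempted.
function refreshAtMs(token: CachedToken, fraction: number = 0.8): number {
  const lifetimeMs = token.expiresAtMs - token.acquiredAtMs;
  return Math.round(token.acquiredAtMs + lifetimeMs * fraction);
}

// True once the current time has passed the refresh point.
function needsRefresh(token: CachedToken, nowMs: number = Date.now()): boolean {
  return nowMs >= refreshAtMs(token);
}
```

Refreshing before expiry means a request issued near the end of the token's lifetime does not have to wait for a new token.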
HttpClient (src/lib/http.ts)
- Purpose: Thin wrapper over undici fetch that adds retries, idempotency keys, timeouts, and structured logging
- Features:
- Retry Logic: Exponential backoff (250ms base, factor 2, max 60s) with full jitter
- Retry-After: Honors HTTP Retry-After headers for 429/503 responses
- Idempotency: Attaches x-ecs-idempotency header to POST requests
- Logging: Structured request/response logging with PII redaction hooks
- Timeout: Configurable request timeout (default 60s)
- Retryable: 408, 429, 5xx status codes + network errors
- Dependencies: undici (Node 18+ native fetch)
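The retryable-status rule and Retry-After handling listed above might look roughly like this. The helper names `isRetryableStatus` and `parseRetryAfterMs` are hypothetical and not part of the SDK's documented API; the sketch assumes the header can arrive either as a number of seconds or as an HTTP date, the two forms the HTTP specification allows.

```typescript
// 408, 429, and any 5xx are treated as retryable, per the list above.
function isRetryableStatus(status: number): boolean {
  return status === 408 || status === 429 || (status >= 500 && status <= 599);
}

// Converts a Retry-After header value into a delay in milliseconds.
// Returns undefined when the header is missing or unparseable.
function parseRetryAfterMs(header: string | null, nowMs: number = Date.now()): number | undefined {
  if (header === null) return undefined;
  const trimmed = header.trim();
  // Form 1: delay in whole seconds, e.g. "120".
  if (/^\d+$/.test(trimmed)) return Number(trimmed) * 1000;
  // Form 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
  const dateMs = Date.parse(trimmed);
  if (Number.isNaN(dateMs)) return undefined;
  return Math.max(0, dateMs - nowMs);
}
```

When the server supplies a delay, a caller would typically wait at least that long instead of using the computed backoff delay.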
RateLimiter (src/lib/rateLimiter.ts)
- Purpose: Enforces Power BI operational limits to prevent API throttling
- Limits:
- Concurrency: Max 4 concurrent POST requests (stays under 5 pending cap)
- Throughput: 110 requests per minute (stays under 120/min cap)
- Strategy: Token bucket with rolling 60-second window
- Behavior: Blocks until capacity available, then releases semaphore
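To make the two limits concrete, here is a simplified sketch that combines an in-flight counter with a sliding-window log of request start times. It is not the SDK's token-bucket implementation: the `PostRateLimiter` class and its methods are hypothetical, and it is non-blocking (it returns `false` instead of waiting) so the logic stays easy to test with an injected clock.

```typescript
// Sliding-window limiter: caps in-flight requests and request starts per window.
class PostRateLimiter {
  private readonly maxConcurrent: number;
  private readonly maxPerWindow: number;
  private readonly windowMs: number;
  private inFlight = 0;
  private startTimesMs: number[] = [];

  constructor(maxConcurrent: number = 4, maxPerWindow: number = 110, windowMs: number = 60_000) {
    this.maxConcurrent = maxConcurrent;
    this.maxPerWindow = maxPerWindow;
    this.windowMs = windowMs;
  }

  // Tries to reserve a slot at time nowMs; returns false if either limit is hit.
  tryAcquire(nowMs: number): boolean {
    // Forget request starts that have left the rolling window.
    this.startTimesMs = this.startTimesMs.filter((t) => nowMs - t < this.windowMs);
    if (this.inFlight >= this.maxConcurrent) return false;
    if (this.startTimesMs.length >= this.maxPerWindow) return false;
    this.inFlight += 1;
    this.startTimesMs.push(nowMs);
    return true;
  }

  // Call when a request finishes (success or failure) to free its concurrency slot.
  release(): void {
    if (this.inFlight > 0) this.inFlight -= 1;
  }
}
```

A blocking variant could wrap `tryAcquire` in a loop that sleeps briefly and retries until it returns `true`, which matches the "blocks until capacity available" behavior described above.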
PowerBiClient (src/lib/client.ts)
- Purpose: Main public API surface that orchestrates all operations
- Modules:
- Datasets: Create push/push-streaming datasets, list existing ones
- Tables: Define/alter table schemas, list tables
- Rows: Add rows with automatic batching (≤10k per request) and 413 auto-split
- Refresh: Trigger refresh, get history and execution details
- Parameters: Update parameter values via Default.UpdateParameters
- Queries: Execute DAX for validation/diagnostics
- Features:
- Automatic row chunking to 10,000 rows per request
- Dynamic batch splitting on 413 RequestEntityTooLarge
- Idempotency key generation per batch
- Rate limiting integration
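The chunking and 413 auto-split behavior can be illustrated as follows. This is a sketch under stated assumptions, not the SDK's code: `chunkRows`, `sendWithSplit`, and the `SendBatch` callback (which resolves to an HTTP status code) are hypothetical names, and the split strategy shown is simple halving.

```typescript
// Splits rows into batches of at most maxPerRequest (10,000 by default).
function chunkRows<T>(rows: readonly T[], maxPerRequest: number = 10_000): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < rows.length; i += maxPerRequest) {
    chunks.push(rows.slice(i, i + maxPerRequest));
  }
  return chunks;
}

// Hypothetical transport callback: sends one batch and resolves to the HTTP status.
type SendBatch<T> = (batch: readonly T[]) => Promise<number>;

// Sends a batch; on 413 it halves the batch and retries each half recursively.
async function sendWithSplit<T>(batch: readonly T[], send: SendBatch<T>): Promise<void> {
  const status = await send(batch);
  if (status !== 413) {
    if (status >= 400) throw new Error(`Unexpected HTTP status ${status}`);
    return;
  }
  if (batch.length <= 1) {
    throw new Error("A single row exceeds the request size limit");
  }
  const mid = Math.ceil(batch.length / 2);
  await sendWithSplit(batch.slice(0, mid), send);
  await sendWithSplit(batch.slice(mid), send);
}
```

Halving keeps the number of extra requests logarithmic in the batch size when only the payload size, not the row count, is the problem.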
RealTimeSink Interface (src/types.ts)
- Purpose: Abstracts real-time write operations to enable future migration
- Methods: ensureSink(), pushRows(), clearTable()
- Implementations:
- PushDatasetSink (src/lib/sinks/pushSink.ts): Current Power BI push datasets
- FabricRtiSink (src/lib/sinks/fabricRtiSink.ts): Stub for future Fabric RTI
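As an illustration of why the abstraction helps, the sketch below declares a sink interface and an in-memory implementation that could stand in for a real sink in unit tests. The argument shapes are assumptions inferred from the usage examples in this README (the `clearTable` arguments in particular are a guess), and `InMemorySink` is hypothetical and not part of the SDK.

```typescript
// Hypothetical argument shapes, inferred from the usage examples in this README.
type Row = Record<string, unknown>;
interface TableRef { groupId: string; datasetId: string; table: string; }
interface EnsureSinkArgs { groupId: string; datasetName: string; schema: unknown[]; }

interface RealTimeSink {
  ensureSink(args: EnsureSinkArgs): Promise<{ datasetId: string }>;
  pushRows(args: TableRef & { rows: Row[] }): Promise<void>;
  clearTable(args: TableRef): Promise<void>;
}

// In-memory stand-in for code that only depends on the interface.
class InMemorySink implements RealTimeSink {
  readonly tables = new Map<string, Row[]>();

  async ensureSink(args: EnsureSinkArgs): Promise<{ datasetId: string }> {
    // A real sink would look up or create the dataset; here the id is derived from the name.
    return { datasetId: `mem-${args.datasetName}` };
  }

  async pushRows(args: TableRef & { rows: Row[] }): Promise<void> {
    const key = `${args.datasetId}/${args.table}`;
    const existing = this.tables.get(key) ?? [];
    this.tables.set(key, existing.concat(args.rows));
  }

  async clearTable(args: TableRef): Promise<void> {
    this.tables.delete(`${args.datasetId}/${args.table}`);
  }
}
```

Code written against `RealTimeSink` does not need to know which implementation it receives, which is what makes a later switch of backend possible.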
Operational Limits & Enforcement
The SDK enforces these Power BI limits (from Microsoft's documentation):
| Limit | SDK Behavior |
|-------|--------------|
| 10,000 rows per POST | Automatic chunking in postRows() |
| 1M rows/hour per dataset | Not bounded by the request limiter alone: 110 req/min × 10k rows is 1.1M rows/min, so full batches can reach this cap in under a minute |
| 120 POST requests/minute | Rate limiter: 110 req/min (10 req/min buffer) |
| 5 pending POST requests | Rate limiter: max 4 concurrent |
| 4,000 chars per string | Client validation (TODO) |
| 75 tables, 75 columns | Schema validation (TODO) |
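The two rows marked TODO could be covered by client-side checks along these lines. This is only a sketch of one possible approach: the function names and the `TableSchema` shape are hypothetical, and the limits are the ones quoted in the table above.

```typescript
const MAX_STRING_CHARS = 4_000;
const MAX_TABLES = 75;
const MAX_COLUMNS = 75;

// Hypothetical schema shape, modeled on the createDataset example in this README.
interface TableSchema {
  name: string;
  columns: { name: string; dataType: string }[];
}

// Returns human-readable problems; an empty array means the schema is within limits.
function checkSchemaLimits(tables: readonly TableSchema[]): string[] {
  const problems: string[] = [];
  if (tables.length > MAX_TABLES) {
    problems.push(`dataset has ${tables.length} tables (max ${MAX_TABLES})`);
  }
  for (const table of tables) {
    if (table.columns.length > MAX_COLUMNS) {
      problems.push(`table ${table.name} has ${table.columns.length} columns (max ${MAX_COLUMNS})`);
    }
  }
  return problems;
}

// Returns the names of string fields that exceed the per-value character limit.
function oversizedStringFields(row: Record<string, unknown>): string[] {
  return Object.entries(row)
    .filter(([, value]) => typeof value === "string" && value.length > MAX_STRING_CHARS)
    .map(([key]) => key);
}
```

Running such checks before a request is sent turns a server-side rejection into an immediate, descriptive local error.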
Error Handling & Resilience
Error Types (src/lib/errors.ts)
- PowerBiHttpError: HTTP errors with status, code, requestId, retryable flag
- RateLimitError: Rate limit exceeded with reset time
- AuthError: Authentication/authorization failures
Retry Strategy
- Base delay: 250ms
- Exponential factor: 2
- Max delay: 60 seconds
- Max retries: 8 attempts
- Jitter: Full jitter (random 0-100% of calculated delay)
- Retry-After: Honors server-sent delay headers
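The delay calculation described by these parameters can be written out as a short sketch. The function and option names are hypothetical, and the attempt index is assumed to start at 0 for the first retry; the SDK may count differently.

```typescript
interface BackoffOptions {
  baseMs: number; // delay ceiling for the first retry
  factor: number; // multiplier applied per attempt
  maxMs: number; // upper bound on the ceiling
}

const DEFAULT_BACKOFF: BackoffOptions = { baseMs: 250, factor: 2, maxMs: 60_000 };

// Exponential ceiling for a given 0-based retry attempt, capped at maxMs.
function backoffCeilingMs(attempt: number, opts: BackoffOptions = DEFAULT_BACKOFF): number {
  return Math.min(opts.maxMs, opts.baseMs * Math.pow(opts.factor, attempt));
}

// Full jitter: pick a uniformly random delay between 0 and the ceiling.
function fullJitterDelayMs(
  attempt: number,
  opts: BackoffOptions = DEFAULT_BACKOFF,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * backoffCeilingMs(attempt, opts));
}
```

With the defaults, the ceilings for attempts 0 through 7 are 250, 500, 1000, 2000, 4000, 8000, 16000, and 32000 ms, so the 60-second cap only comes into play from attempt 8 onward. Passing `random` as a parameter keeps the function deterministic in tests.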
Configuration & Environment
Documentation & Reports
- 📋 Available Datasets: Current Power BI workspace inventory and testing recommendations
- 📖 SDK Specification: Complete system specification and design decisions
- 🔧 Integration Setup: Step-by-step guide for setting up integration tests
- 📚 Reports Directory: Index of all project reports and documentation
Environment Variables
# Required
PBI_TENANT_ID=your-tenant-id
PBI_CLIENT_ID=your-service-principal-id
PBI_CLIENT_SECRET=your-service-principal-secret
# Optional
PBI_BASE_URL=https://api.powerbi.com/v1.0/myorg # default
PBI_MAX_POSTS_PER_MINUTE=110 # default
PBI_MAX_CONCURRENT_POSTS=4 # default
Client Options
interface ClientOptions {
  baseUrl?: string; // API base URL
  userAgent?: string; // Custom user agent
  requestTimeoutMs?: number; // Request timeout (default 60s)
  maxConcurrentPosts?: number; // Concurrency limit (default 4)
  maxPostsPerMinute?: number; // Throughput limit (default 110)
  logger?: (msg: string, ctx?: Record<string, unknown>) => void;
  redactFn?: (obj: unknown) => unknown; // PII redaction for logs
}
Usage Examples
Basic Setup
import { PowerBiClient } from "@ecustom/powerbi-realtime";

const client = new PowerBiClient(
  {
    tenantId: process.env.PBI_TENANT_ID!,
    clientId: process.env.PBI_CLIENT_ID!,
    clientSecret: process.env.PBI_CLIENT_SECRET!
  },
  {
    userAgent: "ecs-realtime-sdk/0.1.0",
    logger: (msg, ctx) => console.log(`[PBI] ${msg}`, ctx)
  }
);
Create Dataset & Push Data
// Create push dataset
const dataset = await client.createDataset("workspace-id", {
  name: "realtime_orders",
  defaultMode: "Push",
  defaultRetentionPolicy: "None",
  tables: [{
    name: "Orders",
    columns: [
      { name: "orderId", dataType: "String" },
      { name: "amount", dataType: "Double" },
      { name: "timestamp", dataType: "Datetime" }
    ]
  }]
});

// Push rows (automatically batched and rate-limited)
await client.postRows("workspace-id", dataset.id, "Orders", [
  { orderId: "A1001", amount: 42.15, timestamp: new Date().toISOString() },
  { orderId: "A1002", amount: 11.00, timestamp: new Date().toISOString() }
]);
Using Real-Time Sink Abstraction
// Get the sink (currently PushDatasetSink, future FabricRtiSink)
const sink = client.getPushSink();

// Ensure dataset exists and push data
const { datasetId } = await sink.ensureSink({
  groupId: "workspace-id",
  datasetName: "realtime_orders",
  schema: [/* table schemas */]
});

await sink.pushRows({
  groupId: "workspace-id",
  datasetId,
  table: "Orders",
  rows: [/* your data */]
});
Advanced Operations
// Update table schema
await client.putTable("workspace-id", dataset.id, "Orders", {
  name: "Orders",
  columns: [
    { name: "orderId", dataType: "String" },
    { name: "amount", dataType: "Double" },
    { name: "customerId", dataType: "String" } // Added column
  ]
});

// Trigger refresh
const { refreshId } = await client.refreshDataset("workspace-id", dataset.id, {
  applyRefreshPolicy: "full",
  notifyOption: "MailOnCompletion"
});

// Update parameters
await client.updateParameters("workspace-id", dataset.id, [
  { name: "Environment", newValue: "Production" }
]);

// Execute DAX for validation
const result = await client.executeDax("workspace-id", dataset.id,
  "EVALUATE Orders"
);
Validating API via Device Code Delegated Flow (temporary)
Service principal (app-only) access to Power BI is currently blocked in this tenant. To validate endpoints now, use delegated (user) auth via MSAL Node's device code flow.
Requirements:
- Azure AD app registration configured as a Public client
- Delegated permissions granted/consented: Dataset.ReadWrite.All, Workspace.Read.All
Environment:
export PBI_TENANT_ID=your-tenant-id
export PBI_CLIENT_ID=your-public-client-app-id
export PBI_GROUP_ID=your-workspace-id # hyphenated
Run:
npm run auth:device # Lists groups, optionally datasets for PBI_GROUP_ID
npm run auth:device -- --push-demo # Also creates a push dataset and posts sample rows
Notes:
- This is a temporary fallback until service principal is enabled. Then we'll switch to client credentials.
- For full live workflow, see Integration Setup Guide.
Auth-only Integration Test (delegated token)
If you just want to prove that a Power BI token works—without touching any specific workspace or dataset—run the device-code helper, grab the printed access-token, and export it:
npm run auth:device # follow browser prompt, copy access token
export PBI_DELEGATED_ACCESS_TOKEN="<the-huge-JWT>"
# run just the auth integration or the whole suite
npm run test:integration # will now include auth.device.test.ts and pass
The test simply calls GET /groups and asserts HTTP 200. No workspace membership needed, so it’s safe even if the service-principal path is still blocked.
Development & Testing
Project Structure
src/
├── index.ts # Main exports
├── types.ts # Public interfaces and types
├── lib/
│ ├── errors.ts # Error classes
│ ├── auth.ts # MSAL authentication
│ ├── http.ts # HTTP client with retries
│ ├── rateLimiter.ts # Rate limiting logic
│ ├── client.ts # Main Power BI client
│ └── sinks/
│ ├── pushSink.ts # Power BI push dataset sink
│ └── fabricRtiSink.ts # Future Fabric RTI sink
test/ # Unit tests (Vitest + undici MockAgent)
Testing Strategy
- Unit Tests: Vitest + undici MockAgent for HTTP interception
- Coverage: Target ≥80% line/branch coverage
- Test Cases:
- Row chunking and 413 auto-split
- HTTP retry with exponential backoff
- Rate limiter concurrency and throughput
- Error handling and type safety
Build & Quality
npm run build # TypeScript compilation
npm run test # Run test suite
npm run lint # ESLint checks
npm run fmt # Prettier formatting
npm run typecheck # Type checking only
Migration Path to Fabric RTI
When Microsoft retires Power BI real-time streaming in 2027:
- Keep using RealTimeSink interface - no code changes needed
- Switch implementation: client.getPushSink() returns FabricRtiSink instead
- New pipeline: Eventstreams → KQL DB → Direct Lake semantic model
- Feature flag: Toggle at client construction time
Dependencies
Runtime
- @azure/msal-node: Azure AD authentication
- undici: HTTP client (Node 18+ native fetch)
Development
- typescript: TypeScript compiler
- vitest: Test runner
- eslint: Linting
- prettier: Code formatting
License
MIT License - see LICENSE file for details.
Contributing
- Fork the repository
- Create feature branch (git checkout -b feature/amazing-feature)
- Commit changes (git commit -m 'Add amazing feature')
- Push to branch (git push origin feature/amazing-feature)
- Open Pull Request
Support
- Issues: GitHub Issues
- Documentation: API Reference
- Migration Help: Fabric RTI Guide
