bulltrackers-module
v1.0.314
Helper functions for BullTrackers.
BullTrackers Platform - Comprehensive Developer Documentation
Table of Contents
- System Overview
- Architecture & Design Patterns
- Core Infrastructure
- Module Reference
- Data Flow & Pipelines
- Computation System
- Development Guidelines
- Deployment & Operations
1. System Overview
1.1 What is BullTrackers?
BullTrackers is a sophisticated financial analytics platform that monitors and analyzes eToro trading portfolios at scale. The system processes millions of user portfolios daily, computing real-time market sentiment signals, risk metrics, and behavioral analytics.
1.2 Key Capabilities
- User Discovery & Monitoring: Automatically discovers and tracks roughly 5 million eToro users
- Portfolio Tracking: Stores daily snapshots of user portfolios (both normal users and high-leverage "speculators")
- Trading History: Captures complete trade history for behavioral analysis
- Real-Time Computations: Runs 100+ mathematical computations daily to generate market insights
- Social Sentiment Analysis: Tracks and analyzes social posts from eToro's platform using AI
- Public API: Exposes computed insights via REST API with schema introspection
1.3 Technology Stack
- Runtime: Node.js 20+ (Google Cloud Functions Gen 2)
- Database: Google Firestore (NoSQL document store)
- Messaging: Google Cloud Pub/Sub
- AI/ML: Google Gemini API (sentiment analysis)
- Proxy Infrastructure: Google Apps Script (rate limit mitigation)
- Package Management: NPM private packages
2. Architecture & Design Patterns
2.1 Modular Pipe Architecture
The entire codebase follows a "pipe" pattern where each module exports stateless functions that receive all dependencies explicitly:
// Example pipe signature
async function handleRequest(message, context, config, dependencies) {
const { logger, db, pubsub } = dependencies;
// Function logic here
}

Key Benefits:
- Testability: Easy to mock dependencies (see the sketch after this list)
- Reusability: Functions are pure and composable
- Maintainability: Clear separation of concerns
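Because every dependency is injected, a pipe can be exercised in isolation with plain stubs. A minimal sketch (the stub shapes below are illustrative, not the module's real interfaces):

```javascript
// Hypothetical smoke test for the pipe above; stub shapes are assumptions.
const stubDeps = {
  logger: { log: (level, msg, data) => console.log(`[${level}] ${msg}`, data || '') },
  db: { collection: () => ({ doc: () => ({ get: async () => ({ exists: false }) }) }) },
  pubsub: { topic: () => ({ publishMessage: async () => 'msg-id' }) }
};

handleRequest({ userId: '123' }, {}, {}, stubDeps).catch(console.error);
```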
2.2 Dependency Injection Pattern
All modules receive dependencies via a dependencies object:
const dependencies = {
db: firestoreInstance,
pubsub: pubsubClient,
logger: loggerInstance,
headerManager: intelligentHeaderManager,
proxyManager: intelligentProxyManager,
batchManager: firestoreBatchManager,
calculationUtils: { withRetry, loadInstrumentMappings },
firestoreUtils: { getBlockCapacities, ... },
pubsubUtils: { batchPublishTasks }
};

2.3 Configuration Objects
Each module has its own configuration namespace:
const config = {
// Task Engine Config
ETORO_API_PORTFOLIO_URL: "https://...",
TASK_ENGINE_MAX_USERS_PER_SHARD: 500,
// Orchestrator Config
discoveryConfig: { normal: {...}, speculator: {...} },
updateConfig: { ... },
// Computation System Config
COMPUTATION_PASS_TO_RUN: "1",
resultsCollection: "unified_insights"
};

2.4 Sharded Data Storage
To handle massive scale, user data is sharded across multiple Firestore documents:
Collection: NormalUserPortfolios
├── Block: 0M (users 0-999,999)
│ └── snapshots/
│ └── 2025-01-15/
│ └── parts/
│ ├── part_0 (users 0-499)
│ ├── part_1 (users 500-999)
│ └── ...
├── Block: 1M (users 1,000,000-1,999,999)
└── ...

Why Sharding?
- Firestore document size limit: 1MB
- Each "part" contains ~500 users
- Enables parallel processing (see the sketch below)
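For illustration, here is how a CID could map into this layout, assuming 1M-user blocks and ~500 users per part (the helper is hypothetical; the real module may derive IDs differently):

```javascript
// Hypothetical CID-to-shard mapping under the layout described above.
function locateUser(cid, partSize = 500) {
  const blockId = `${Math.floor(cid / 1_000_000)}M`; // e.g. 1_234_567 -> "1M"
  const indexInBlock = cid % 1_000_000;
  const partId = `part_${Math.floor(indexInBlock / partSize)}`;
  return { blockId, partId };
}

console.log(locateUser(1_234_567)); // { blockId: '1M', partId: 'part_469' }
```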
2.5 Batch Processing Pattern
All writes use a FirestoreBatchManager to aggregate operations and minimize API calls:
// Instead of 1000 individual writes:
await batchManager.addToPortfolioBatch(userId, blockId, date, data, userType);
// ... (accumulates in memory)
await batchManager.flushBatches(); // Commits in bulk

3. Core Infrastructure
3.1 IntelligentProxyManager
Purpose: Manages a pool of Google Apps Script proxies to bypass eToro rate limits.
Key Features:
- Automatic Proxy Rotation: Selects available (unlocked) proxies
- Rate Limit Detection: Identifies HTML error pages (DOCTYPE checks)
- Exponential Backoff: Retries failed requests with different proxies
- Locking Mechanism: Temporarily locks proxies that hit rate limits
Usage:
const response = await proxyManager.fetch(url, options);
// Handles: proxy selection, retries, rate limit detection

Critical Configuration:
proxyUrls: [
"https://script.google.com/macros/s/.../exec",
// 10-20 different proxy URLs
],
proxyLockingEnabled: true,
PERFORMANCE_DOC_PATH: "system_state/proxy_performance"

3.2 IntelligentHeaderManager
Purpose: Manages browser headers to mimic real user traffic.
Key Features:
- Header Rotation: Uses different User-Agent strings
- Performance Tracking: Records success rates per header
- Weighted Selection: Prefers high-success headers
- Firestore Sync: Persists performance data
Usage:
const { id, header } = await headerManager.selectHeader();
// Use header in request
headerManager.updatePerformance(id, wasSuccessful);
await headerManager.flushPerformanceUpdates();

3.3 FirestoreBatchManager
Purpose: Aggregates Firestore writes to minimize costs and improve performance.
Features:
- Portfolio Batching: Groups user portfolio updates
- Trading History Batching: Aggregates trade history
- Timestamp Management: Tracks last update times
- Username Map Caching: Reduces redundant lookups
- 10-Minute History Cache: Prevents duplicate fetches
Critical Methods:
// Add data (accumulates in memory)
await batchManager.addToPortfolioBatch(userId, blockId, date, data, userType);
await batchManager.addToTradingHistoryBatch(userId, blockId, date, historyData, userType);
await batchManager.updateUserTimestamp(userId, userType, instrumentId);
// Commit everything at once
await batchManager.flushBatches();

Auto-Flush Logic:
_scheduleFlush() {
const totalOps = this._estimateBatchSize();
if (totalOps >= 400) {
this.flushBatches(); // Immediate flush if near limit
return;
}
// Otherwise, schedule delayed flush
if (!this.batchTimeout) {
this.batchTimeout = setTimeout(
() => this.flushBatches(),
FLUSH_INTERVAL_MS
);
}
}

4. Module Reference
4.1 Orchestrator Module
Entry Point: functions/orchestrator/index.js
Purpose: High-level task coordination (discovery of new users, scheduling updates).
4.1.1 Discovery Orchestration
Function: runDiscoveryOrchestrator(config, deps)
Flow:
- Reset proxy locks (clear stale locks from previous runs)
- Check if discovery is needed for normal users
- Check if discovery is needed for speculators
- Generate candidate CIDs (prioritized + random)
- Publish discovery tasks to Pub/Sub
Key Concepts:
- Blocks: Users are grouped into 1M blocks (e.g., 0M = users 0-999,999)
- Target Capacity: Each block should have ~500 monitored users
- Prioritized Discovery: For speculators, existing normal users holding speculator assets are checked first
- Random Discovery: Random CIDs within blocks are tested (see the sketch below)
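A sketch of candidate generation under these rules (the helper name and counts are assumptions for illustration):

```javascript
// Hypothetical candidate generator: prioritized CIDs first, then random
// CIDs drawn from inside the target 1M block.
function generateCandidates(blockNumber, prioritizedCids, count = 100) {
  const blockStart = blockNumber * 1_000_000;
  const candidates = new Set(prioritizedCids.slice(0, count));
  while (candidates.size < count) {
    candidates.add(blockStart + Math.floor(Math.random() * 1_000_000));
  }
  return [...candidates];
}
```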
4.1.2 Update Orchestration
Function: runUpdateOrchestrator(config, deps)
Flow:
- Reset proxy locks
- Query timestamp documents for stale users (normal users: updated >24h ago)
- Query speculator blocks for stale users (verified >24h ago AND held assets recently)
- Publish update tasks to dispatcher
Thresholds:
const thresholds = {
  dateThreshold: startOfTodayUTC,         // Users updated before today
  gracePeriodThreshold: thirtyDaysAgoUTC  // Speculators who held assets in last 30 days
};

4.2 Dispatcher Module
Entry Point: functions/dispatcher/index.js
Purpose: Rate-limit task submission to prevent GCP scaling issues.
Why Needed?
- Orchestrator generates 10,000+ tasks instantly
- Directly publishing causes:
- GCP quota errors
- Firestore rate limit issues
- Cost spikes
Solution: Dispatcher batches tasks and publishes with delays:
const config = {
batchSize: 50, // Tasks per Pub/Sub message
batchDelayMs: 100 // Delay between batches
};

Flow:
- Receive batch of tasks from orchestrator
- Group into sub-batches of 50 tasks
- Publish 1 Pub/Sub message per sub-batch
- Sleep 100ms between publishes (sketched below)
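A minimal sketch of that loop, assuming a @google-cloud/pubsub Topic handle (helper names are illustrative):

```javascript
// Hypothetical dispatcher core: publish tasks in sub-batches with a delay.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function dispatch(tasks, topic, { batchSize = 50, batchDelayMs = 100 } = {}) {
  for (let i = 0; i < tasks.length; i += batchSize) {
    const subBatch = tasks.slice(i, i + batchSize);
    await topic.publishMessage({ data: Buffer.from(JSON.stringify({ tasks: subBatch })) });
    await sleep(batchDelayMs); // Throttle to avoid quota spikes downstream
  }
}
```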
4.3 Task Engine Module
Entry Point: functions/task-engine/handler_creator.js
Purpose: Worker that executes individual user tasks (discover, verify, update).
4.3.1 Task Types
Discover Task
Purpose: Test if random CIDs are public & active users
Flow:
- Receive batch of CIDs (e.g., [123456, 234567, ...])
- POST to eToro Rankings API (returns public users only)
- Filter for active users (traded in last 30 days, non-zero exposure)
- Apply speculator heuristic (if userType='speculator'):
const isLikelySpeculator =
  (trades > 500) ||
  (totalTradedInstruments > 50) ||
  (mediumLeveragePct + highLeveragePct > 50) ||
  (weeklyDrawdown < -25);
- Chain to 'verify' task for active users
Output: Publishes 'verify' task with usernames
Verify Task
Purpose: Fetch full portfolios to confirm user type
Flow:
- Receive users from discover task
- Sequentially fetch each user's portfolio (to avoid rate limits)
- For speculators: Check if they hold target instruments
- Write to Firestore:
- Block document (SpeculatorBlocks/{blockId} or NormalUserPortfolios/{blockId})
- Bronze status document (if user is Bronze tier)
- Username map shards (for future lookups)
- Block count increments
Output: Users stored in appropriate collections
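A sketch of the sequential verification loop (the user fields and URL shape are assumptions; the real handler also applies the instrument checks above):

```javascript
// Hypothetical verify loop: one portfolio fetch at a time to respect
// proxy rate limits; writes are staged in the batch manager.
async function verifyUsers(users, deps, config) {
  const { proxyManager, batchManager, logger } = deps;
  const today = new Date().toISOString().slice(0, 10);
  for (const user of users) {
    try {
      const res = await proxyManager.fetch(`${config.ETORO_API_PORTFOLIO_URL}?cid=${user.cid}`);
      const portfolio = await res.json();
      await batchManager.addToPortfolioBatch(user.cid, user.blockId, today, portfolio, user.type);
    } catch (err) {
      logger.log('WARN', '[VERIFY] Portfolio fetch failed', { cid: user.cid, errorMessage: err.message });
    }
  }
  await batchManager.flushBatches();
}
```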
Update Task
Purpose: Refresh existing user's portfolio & trade history
Flow:
- Lookup username from cache (or fetch if missing)
- History Fetch (once per user per invocation):
if (!batchManager.checkAndSetHistoryFetched(userId)) {
  // Fetch trade history from eToro
  // Store in trading_history collection
}
- Portfolio Fetch (per instrument for speculators):
// Normal user: 1 fetch (full portfolio)
// Speculator: N fetches (1 per instrument they hold)
- Accumulate in batch manager
- Update timestamp documents
Critical: All fetches are sequential (concurrency = 1) to avoid AppScript rate limits.
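The checkAndSetHistoryFetched guard corresponds to the 10-minute history cache described in 3.3; a minimal sketch of its semantics (the actual implementation may differ):

```javascript
// Hypothetical check-and-set guard with a TTL: returns true when the
// user's history was already fetched recently, so the caller skips it.
class HistoryFetchGuard {
  constructor(ttlMs = 10 * 60 * 1000) {
    this.ttlMs = ttlMs;
    this.fetchedAt = new Map(); // userId -> last fetch timestamp
  }
  checkAndSetHistoryFetched(userId) {
    const now = Date.now();
    const last = this.fetchedAt.get(userId);
    if (last && now - last < this.ttlMs) return true; // Recently fetched
    this.fetchedAt.set(userId, now);
    return false; // Caller should fetch now
  }
}
```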
4.3.2 Fallback Mechanism
All API calls use a two-tier fallback:
try {
// Tier 1: AppScript proxy (preferred)
response = await proxyManager.fetch(url, options);
} catch (proxyError) {
// Tier 2: Direct fetch via GCP IPs
response = await fetch(url, options);
}

Why?
- AppScript proxies have better rate limit tolerance
- Direct fetch used as emergency fallback
- Only AppScript failures penalize header performance
4.4 Computation System Module
Entry Point: functions/computation-system/helpers/computation_pass_runner.js
Purpose: Executes mathematical computations on stored portfolio data.
4.4.1 Architecture
Manifest Builder (computation_manifest_builder.js):
- Introspects all calculation classes
- Validates dependencies
- Performs topological sort
- Determines execution passes (sketched below)
Pass Runner (computation_pass_runner.js):
- Loads manifest for specific pass
- Checks data availability
- Streams portfolio data in chunks
- Executes computations
- Writes results to Firestore
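For intuition, pass levels can be derived from the dependency graph like this (an illustrative Kahn-style leveling, not the builder's actual code):

```javascript
// Hypothetical pass assignment: level 1 = no dependencies; otherwise
// 1 + the deepest dependency's level. Throws on circular dependencies.
function assignPasses(calcs) {
  const levels = new Map();
  const visit = (name, stack = new Set()) => {
    if (levels.has(name)) return levels.get(name);
    if (stack.has(name)) throw new Error(`Circular dependency at ${name}`);
    stack.add(name);
    const deps = calcs[name].getDependencies();
    const level = deps.length === 0 ? 1 : 1 + Math.max(...deps.map((d) => visit(d, stack)));
    levels.set(name, level);
    return level;
  };
  Object.keys(calcs).forEach((name) => visit(name));
  return levels; // e.g. Map { 'raw-extractor' => 1, 'aggregator' => 2 }
}
```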
4.4.2 Calculation Classes
All calculations extend this pattern:
class MyCalculation {
constructor() {
this.results = {};
}
static getMetadata() {
return {
name: "my-calculation",
category: "core", // or 'gem', 'gauss', etc.
type: "standard", // or 'meta'
isHistorical: false, // true if needs yesterday's data
rootDataDependencies: ["portfolio"], // 'insights', 'social', 'history'
userType: "all" // 'normal', 'speculator', or 'all'
};
}
static getDependencies() {
return ["other-calculation"]; // Names of required computations
}
static getSchema() {
return {
"TICKER": {
"metric": 0.0,
"otherField": "string"
}
};
}
async process(context) {
const { user, math, computed, previousComputed } = context;
// Access current user's portfolio
const positions = math.extract.getPositions(
user.portfolio.today,
user.type
);
// Use math primitives
for (const pos of positions) {
const pnl = math.extract.getNetProfit(pos);
const ticker = context.mappings.instrumentToTicker[
math.extract.getInstrumentId(pos)
];
this.results[ticker] = { pnl };
}
}
async getResult() {
return this.results;
}
}

4.4.3 Math Primitives Layer
File: functions/computation-system/layers/math_primitives.js
Purpose: Single source of truth for data extraction and mathematical operations.
Key Classes:
DataExtractor
// Schema-agnostic position access
const positions = DataExtractor.getPositions(portfolio, userType);
const instrumentId = DataExtractor.getInstrumentId(position);
const netProfit = DataExtractor.getNetProfit(position);
const weight = DataExtractor.getPositionWeight(position, userType);
// Speculator-specific
const leverage = DataExtractor.getLeverage(position);
const openRate = DataExtractor.getOpenRate(position);
const stopLoss = DataExtractor.getStopLossRate(position);

MathPrimitives
// Statistical functions
const avg = MathPrimitives.average(values);
const median = MathPrimitives.median(values);
const stdDev = MathPrimitives.standardDeviation(values);
// Risk calculations
const hitProbability = MathPrimitives.calculateHitProbability(
currentPrice,
stopLossPrice,
volatility,
daysAhead
);
// Monte Carlo simulation
const futurePrices = MathPrimitives.simulateGBM(
currentPrice,
volatility,
days,
simulations
);

HistoryExtractor
const history = HistoryExtractor.getDailyHistory(user);
const assets = HistoryExtractor.getTradedAssets(history);
const summary = HistoryExtractor.getSummary(history);

SignalPrimitives
// Access dependency results
const metric = SignalPrimitives.getMetric(
dependencies,
"calc-name",
"TICKER",
"fieldName"
);
// Get previous state (for time-series)
const prevValue = SignalPrimitives.getPreviousState(
previousComputed,
"calc-name",
"TICKER",
"fieldName"
);

4.4.4 Execution Flow
Manifest Generation:
const manifest = buildManifest(
  ["core", "gem"],  // Product lines to include
  calculations      // Imported calculation classes
);

Pass Execution:
await runComputationPass(config, dependencies, manifest);

Data Streaming:
// Stream portfolio data in chunks of 50 users
for await (const chunk of streamPortfolioData(...)) {
  await Promise.all(calcs.map(c =>
    controller.executor.executePerUser(c, metadata, dateStr, chunk, ...)
  ));
}

Result Storage:
Collection: unified_insights
└── Date: 2025-01-15
    └── results/
        └── Category: core
            └── computations/
                └── calculation-name: { data }

Status Tracking:
// Global status document (NEW)
{
  "2025-01-15": {
    "calculation-a": true,
    "calculation-b": false,  // Failed or not run
    ...
  }
}
4.4.5 Computation Passes
Why Passes?
- Some calculations depend on others
- Must execute in dependency order
- Passes enable parallel execution within each level
Example:
Pass 1: [raw-data-extractors] (no dependencies)
Pass 2: [aggregators] (depends on Pass 1)
Pass 3: [signals] (depends on Pass 2)

Configuration:
const config = {
COMPUTATION_PASS_TO_RUN: "1", // Set via environment variable
// Cloud Scheduler runs 3 separate functions:
// - computation-pass-1 (COMPUTATION_PASS_TO_RUN=1)
// - computation-pass-2 (COMPUTATION_PASS_TO_RUN=2)
// - computation-pass-3 (COMPUTATION_PASS_TO_RUN=3)
};

4.5 Generic API Module
Entry Point: functions/generic-api/index.js
Purpose: REST API for accessing computed insights.
4.5.1 Endpoints
GET /
Query Parameters:
- computations: Comma-separated list of calculation names
- startDate: YYYY-MM-DD
- endDate: YYYY-MM-DD
Example:
GET /?computations=sentiment-score,momentum&startDate=2025-01-10&endDate=2025-01-15

Response:
{
"status": "success",
"metadata": {
"computations": ["sentiment-score", "momentum"],
"startDate": "2025-01-10",
"endDate": "2025-01-15"
},
"data": {
"2025-01-10": {
"sentiment-score": { "AAPL": 0.75, ... },
"momentum": { "AAPL": 1.2, ... }
},
...
}
}

GET /manifest
Purpose: List all available computations with schemas
Response:
{
"status": "success",
"summary": {
"totalComputations": 127,
"schemasAvailable": 127
},
"manifest": {
"calculation-name": {
"category": "core",
"structure": { "TICKER": { "field": 0 } },
"metadata": { ... },
"lastUpdated": "2025-01-15T..."
}
}
}

GET /manifest/:computationName
Purpose: Get detailed schema for specific computation
POST /manifest/generate/:computationName
Purpose: Manually trigger schema generation for a calculation
GET /structure/:computationName
Purpose: Get example data structure from latest stored result
GET /list-computations
Purpose: Simple list of all computation keys
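A quick client-side example against these endpoints (the host is a placeholder; computation names are from the example above):

```javascript
// Hypothetical client call; replace API_BASE with your deployed API URL.
const API_BASE = 'https://your-region-your-project.cloudfunctions.net/generic-api';

async function fetchInsights() {
  const params = new URLSearchParams({
    computations: 'sentiment-score,momentum',
    startDate: '2025-01-10',
    endDate: '2025-01-15'
  });
  const res = await fetch(`${API_BASE}/?${params}`);
  if (!res.ok) throw new Error(`API error ${res.status}`);
  const body = await res.json();
  console.log(body.data['2025-01-10']['sentiment-score']);
}
```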
4.5.2 Caching
API responses are cached in-memory for 10 minutes:
const CACHE = {};
const CACHE_TTL_MS = 10 * 60 * 1000;
// On cache HIT: Return from memory (no DB query)
// On cache MISS: Query DB, cache response

4.6 Maintenance Modules
4.6.1 Speculator Cleanup
Purpose: Remove inactive speculators to free up monitoring slots
Logic:
- Query PendingSpeculators for users older than the grace period (12 hours)
- Query SpeculatorBlocks for users who haven't held assets in 30 days
- Batch delete stale users
- Decrement block counts (see the sketch below)
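A sketch of that cleanup pass (the createdAt field name is an assumption; collection names follow this documentation):

```javascript
// Hypothetical cleanup: batch-delete pending speculators past the grace period.
async function cleanupPendingSpeculators(db, logger) {
  const cutoff = new Date(Date.now() - 12 * 60 * 60 * 1000); // 12h grace period
  const snapshot = await db.collection('PendingSpeculators')
    .where('createdAt', '<', cutoff)
    .limit(400) // Stay safely under the 500-op batch limit
    .get();
  const batch = db.batch();
  snapshot.docs.forEach((doc) => batch.delete(doc.ref));
  await batch.commit();
  logger.log('INFO', '[Cleanup] Removed stale pending speculators', { count: snapshot.size });
}
```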
4.6.2 Invalid Speculator Handler
Purpose: Log CIDs that are private or don't meet speculator criteria
Why?
- Prevents re-discovery of known invalid users
- Used for analytics (discovery success rate)
4.6.3 Fetch Insights
Purpose: Daily fetch of eToro's instrument insights
Data Fetched:
- Market trends
- Volatility metrics
- Trading volume
- Sentiment indicators
Storage:
Collection: daily_instrument_insights
└── Document: YYYY-MM-DD
└── insights: [ {...}, {...}, ... ]

4.6.4 Price Fetcher
Purpose: Daily fetch of closing prices
Storage:
Collection: asset_prices
└── Document: shard_N (N = instrumentId % 40)
└── {
"10000": {
"ticker": "AAPL",
"prices": {
"2025-01-15": 150.25,
"2025-01-14": 149.80,
...
}
}
}

4.6.5 Social Orchestrator & Task Handler
Purpose: Fetch and analyze social posts from eToro
Flow:
- Orchestrator: Publishes 1 task per target ticker
- Task Handler:
- Fetches posts since last run (12-hour window)
- Deduplicates via the processed_posts collection
- Filters to English posts
- Calls Gemini AI for sentiment analysis
- Extracts topics (e.g., "FOMC", "Earnings", "CPI")
- Stores in daily_social_insights/{date}/posts/{postId}
Gemini Analysis:
const prompt = `Analyze this post. Return JSON:
{
"overallSentiment": "Bullish|Bearish|Neutral",
"topics": ["FOMC", "Inflation", ...]
}`;
const result = await geminiModel.generateContent(prompt);

5. Data Flow & Pipelines
5.1 Daily Orchestration Schedule
00:00 UTC - Update Orchestrator (normal users)
00:30 UTC - Update Orchestrator (speculators)
01:00 UTC - Discovery Orchestrator (if needed)
02:00 UTC - Fetch Insights
02:30 UTC - Fetch Prices
03:00 UTC - Computation Pass 1
04:00 UTC - Computation Pass 2
05:00 UTC - Computation Pass 3
06:00 UTC - Social Orchestrator
12:00 UTC - Speculator Cleanup

5.2 User Discovery Pipeline
┌─────────────────────────────────────────────────────────────┐
│ 1. Orchestrator: Check block capacities │
│ - Are there <500 users in any block? │
└────────────────┬────────────────────────────────────────────┘
│ YES
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. Orchestrator: Generate candidate CIDs │
│ - Prioritized: Normal users holding speculator assets │
│ - Random: Random CIDs within target blocks │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. Dispatcher: Batch tasks (50 per message, 100ms delay) │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Task Engine (Discover): Test CIDs │
│ - POST to Rankings API │
│ - Filter for active users │
│ - Apply speculator heuristic (if applicable) │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 5. Task Engine (Verify): Fetch portfolios │
│ - Confirm user type │
│ - Check instrument holdings (speculators) │
│ - Write to Firestore blocks │
└─────────────────────────────────────────────────────────────┘

5.3 User Update Pipeline
┌─────────────────────────────────────────────────────────────┐
│ 1. Orchestrator: Query timestamp documents │
│ - Normal: lastUpdated < startOfToday │
│ - Speculator: lastVerified < startOfToday │
│ AND lastHeldAsset > 30DaysAgo │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. Dispatcher: Batch update tasks │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. Task Engine: Prepare tasks │
│ - Lookup usernames from cache │
│ - If missing, bulk fetch from Rankings API │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Task Engine (Update): Sequential execution │
│ - Fetch trade history (once per user) │
│ - Fetch portfolio (1x normal, Nx speculator) │
│ - Accumulate in batch manager │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 5. Task Engine: Flush batches │
│ - Write all data to Firestore │
│ - Update timestamps │
└─────────────────────────────────────────────────────────────┘

5.4 Computation Pipeline
┌─────────────────────────────────────────────────────────────┐
│ 1. Build Manifest │
│ - Introspect calculation classes │
│ - Validate dependencies │
│ - Topological sort │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. Load Global Status │
│ - Check which calcs completed yesterday │
└────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. For each date (from earliest data to yesterday): │
│ ┌───────────────────────────────────────────────────┐ │
│ │ 3a. Check root data availability │ │
│ │ - Portfolio snapshots exist? │ │
│ │ - Insights fetched? │ │
│ │ - Social data available? │ │
│ └────────────────┬──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────┐ │
│ │ 3b. Filter calculations │ │
│ │ - Skip if already completed │ │
│ │ - Skip if dependencies not met │ │
│ │ - Skip if root data missing │ │
│ └────────────────┬──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────┐ │
│ │ 3c. Stream portfolio data │ │
│ │ - Load in chunks of 50 users │ │
│ │ - Execute computations on each chunk │ │
│ └────────────────┬──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────┐ │
│ │ 3d. Commit results │ │
│ │ - Store to unified_insights │ │
│ │ - Update status document │ │
│ └───────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

6. Computation System
6.1 Adding a New Calculation
Step 1: Create calculation class file
// backend/core/calculations/category/newcalc.js
class MyNewCalculation {
constructor() {
this.results = {};
}
static getMetadata() {
return {
name: "my-new-calc",
category: "core",
type: "standard",
isHistorical: false,
rootDataDependencies: ["portfolio"],
userType: "all"
};
}
static getDependencies() {
return []; // Or list other calculation names
}
static getSchema() {
return {
"TICKER": {
"myMetric": 0.0,
"confidence": 0.0
}
};
}
async process(context) {
const { user, math, mappings } = context;
// Get positions
const positions = math.extract.getPositions(
user.portfolio.today,
user.type
);
// Process each position
for (const pos of positions) {
const instId = math.extract.getInstrumentId(pos);
const ticker = mappings.instrumentToTicker[instId];
if (!ticker) continue;
// Your calculation logic here
const netProfit = math.extract.getNetProfit(pos);
const weight = math.extract.getPositionWeight(pos, user.type);
this.results[ticker] = {
myMetric: netProfit * weight,
confidence: weight > 10 ? 0.9 : 0.5
};
}
}
async getResult() {
return this.results;
}
}
Step 2: The manifest builder will automatically:
- Discover your calculation
- Validate its metadata
- Determine its execution pass based on dependencies
- Include it in the manifest
Step 3: Deploy and run
```bash
npm version patch
npm publish
# Update calculations package version number in core package.json
# Deploy cloud function
```
The main computation system will automatically discover the new computation.

6.2 Using Dependencies
Example: Calculation that depends on another
static getDependencies() {
return ["sentiment-score", "momentum"];
}
async process(context) {
const { computed, math } = context;
// Access dependency results
const sentimentScore = math.signals.getMetric(
computed,
"sentiment-score",
"AAPL",
"score"
);
const momentum = math.signals.getMetric(
computed,
"momentum",
"AAPL",
"value"
);
// Combine signals
this.results["AAPL"] = {
combinedSignal: (sentimentScore * 0.6) + (momentum * 0.4)
};
}

6.3 Historical Calculations
Purpose: Compare today's data with yesterday's
class PortfolioChangeCalculation {
static getMetadata() {
return {
name: "portfolio-change",
isHistorical: true, // Requires yesterday's data
// ...
};
}
async process(context) {
const { user, math } = context;
// Access today's portfolio
const todayPositions = math.extract.getPositions(
user.portfolio.today,
user.type
);
// Access yesterday's portfolio
const yesterdayPositions = math.extract.getPositions(
user.portfolio.yesterday,
user.type
);
// Compare positions
// ...
}
}

6.4 Meta Calculations
Purpose: Aggregate across all users (not per-user processing)
class MarketWideSentimentCalculation {
static getMetadata() {
return {
type: "meta", // Not per-user
category: "core",
rootDataDependencies: ["insights", "social"]
};
}
static getDependencies() {
return ["user-level-sentiment"]; // Depends on per-user calc
}
async process(context) {
const { computed, insights, social } = context;
// Access results from per-user calculations
const userSentiments = computed["user-level-sentiment"];
// Access insights data
const marketData = insights.today;
// Access social data
const socialPosts = social.today;
// Perform market-wide aggregation
const tickers = new Set();
for (const ticker in userSentiments) {
tickers.add(ticker);
}
for (const ticker of tickers) {
// Aggregate logic
this.results[ticker] = {
aggregatedSentiment: 0 // ...aggregate across userSentiments here
};
}
}
}

6.5 Using Time Series
Purpose: Track moving averages, trends, etc.
class MovingAverageSentiment {
static getDependencies() {
return ["sentiment-score"];
}
async process(context) {
const { computed, previousComputed, math } = context;
const currentScore = math.signals.getMetric(
computed,
"sentiment-score",
"AAPL",
"score"
);
// Get previous state (EMA parameters)
const prevState = math.signals.getPreviousState(
previousComputed,
"moving-average-sentiment",
"AAPL",
"_state"
);
// Update EMA
const newState = math.TimeSeries.updateEMAState(
currentScore,
prevState || { mean: 0, variance: 1 },
0.1 // alpha (decay factor)
);
this.results["AAPL"] = {
ema: newState.mean,
volatility: Math.sqrt(newState.variance),
_state: newState // Store for next day
};
}
}

6.6 Advanced: Monte Carlo Risk Analysis
class StopLossRiskCalculation {
async process(context) {
const { user, math, prices } = context;
const positions = math.extract.getPositions(
user.portfolio.today,
user.type
);
for (const pos of positions) {
const instId = math.extract.getInstrumentId(pos);
const ticker = context.mappings.instrumentToTicker[instId];
// Get current price and stop loss
const currentRate = math.extract.getCurrentRate(pos);
const stopLoss = math.extract.getStopLossRate(pos);
if (!stopLoss || stopLoss === 0) continue;
// Get historical prices for volatility calculation
const priceHistory = math.priceExtractor.getHistory(
prices,
ticker
);
// Calculate historical volatility
const returns = [];
for (let i = 1; i < priceHistory.length; i++) {
const ret = Math.log(
priceHistory[i].price / priceHistory[i-1].price
);
returns.push(ret);
}
const volatility = math.compute.standardDeviation(returns) * Math.sqrt(252);
// Calculate probability of hitting stop loss in next 3 days
const hitProbability = math.compute.calculateHitProbability(
currentRate,
stopLoss,
volatility,
3, // days
0 // drift (risk-neutral)
);
// Run Monte Carlo simulation
const simulations = math.compute.simulateGBM(
currentRate,
volatility,
3,
1000 // number of paths
);
// Count how many simulations hit stop loss
let hitCount = 0;
for (const simPrice of simulations) {
if (simPrice <= stopLoss) hitCount++;
}
const empiricalProbability = hitCount / 1000;
this.results[ticker] = {
theoreticalHitProb: hitProbability,
empiricalHitProb: empiricalProbability,
currentRate,
stopLoss,
volatility
};
}
}
}

7. Development Guidelines
7.1 Code Style
7.1.1 Naming Conventions
// Calculation names: kebab-case
"sentiment-score"
"moving-average-momentum"
// File names: snake_case
my_calculation.js
data_loader.js
// Class names: PascalCase
class SentimentScoreCalculation { }
class IntelligentProxyManager { }
// Functions: camelCase
async function handleRequest() { }
function getPositions() { }
// Constants: SCREAMING_SNAKE_CASE
const MAX_RETRIES = 3;
const ETORO_API_URL = "https://...";

7.1.2 Logging Standards
// Always use structured logging
logger.log('INFO', '[ModuleName] Descriptive message', {
contextKey: contextValue,
errorMessage: error.message // if error
});
// Log levels
logger.log('TRACE', 'Very detailed debug info');
logger.log('INFO', 'General information');
logger.log('WARN', 'Warning condition');
logger.log('ERROR', 'Error condition');
logger.log('SUCCESS', 'Operation completed successfully');
// Task-specific logging pattern
logger.log('INFO', `[TaskType/${taskId}/${userId}] Message`);
// Example: [UPDATE/batch-123/456789] Portfolio fetch successful

7.1.3 Error Handling
// Always wrap API calls in try-catch
let wasSuccess = false;
try {
const response = await proxyManager.fetch(url, options);
if (!response.ok) {
throw new Error(`API error ${response.status}`);
}
wasSuccess = true;
// Process response
} catch (error) {
logger.log('ERROR', '[Module] Operation failed', {
errorMessage: error.message,
errorStack: error.stack,
context: { userId, url }
});
wasSuccess = false;
} finally {
// Always update performance tracking
if (selectedHeader) {
headerManager.updatePerformance(selectedHeader.id, wasSuccess);
}
}

7.1.4 Async/Await Best Practices
// BAD: Uncontrolled concurrency
const promises = users.map(user => fetchUser(user));
await Promise.all(promises); // Can overwhelm APIs
// GOOD: Controlled concurrency
const pLimit = require('p-limit'); // npm package: p-limit
const limit = pLimit(1); // Sequential
const promises = users.map(user =>
limit(() => fetchUser(user))
);
await Promise.all(promises);
// GOOD: Batch processing
for (let i = 0; i < items.length; i += BATCH_SIZE) {
const batch = items.slice(i, i + BATCH_SIZE);
await processBatch(batch);
await sleep(DELAY_MS); // Rate limiting
}

7.2 Testing Strategy
7.2.1 Local Testing Setup
// test-setup.js
const admin = require('firebase-admin');
const { pipe } = require('bulltrackers-module');
// Initialize Firebase Admin
admin.initializeApp({
credential: admin.credential.applicationDefault(),
projectId: 'your-project-id'
});
const db = admin.firestore();
const logger = {
log: (level, msg, data) => console.log(`[${level}] ${msg}`, data)
};
const dependencies = {
db,
logger,
// ... other dependencies
};
const config = {
// Your config
};
// Test a specific function
async function testUpdateTask() {
const task = {
type: 'update',
userId: '123456',
userType: 'normal'
};
await pipe.taskEngine.handleUpdate(
task,
'test-task-id',
dependencies,
config,
'testuser'
);
}
testUpdateTask().catch(console.error);

7.2.2 Unit Test Example
// test/math-primitives.test.js
const { MathPrimitives } = require('../functions/computation-system/layers/math_primitives');
describe('MathPrimitives', () => {
describe('calculateHitProbability', () => {
it('should return 0 for invalid inputs', () => {
const result = MathPrimitives.calculateHitProbability(
0, 100, 0.3, 5
);
expect(result).toBe(0);
});
it('should return probability between 0 and 1', () => {
const result = MathPrimitives.calculateHitProbability(
100, 90, 0.3, 5
);
expect(result).toBeGreaterThanOrEqual(0);
expect(result).toBeLessThanOrEqual(1);
});
it('should return higher probability for closer barriers', () => {
const far = MathPrimitives.calculateHitProbability(
100, 80, 0.3, 5
);
const close = MathPrimitives.calculateHitProbability(
100, 95, 0.3, 5
);
expect(close).toBeGreaterThan(far);
});
});
});

7.2.3 Integration Test Pattern
// test/integration/computation-system.test.js
async function testComputationPipeline() {
// 1. Setup test data
await setupTestPortfolios();
// 2. Build manifest
const calculations = require('aiden-shared-calculations-unified');
const manifest = pipe.computationSystem.buildManifest(
['core'],
calculations
);
// 3. Run computation
const config = {
COMPUTATION_PASS_TO_RUN: "1",
resultsCollection: 'test_unified_insights'
};
await pipe.computationSystem.runComputationPass(
config,
dependencies,
manifest
);
// 4. Verify results
const results = await db
.collection('test_unified_insights')
.doc('2025-01-15')
.collection('results')
.doc('core')
.collection('computations')
.doc('test-calculation')
.get();
expect(results.exists).toBe(true);
expect(results.data()).toHaveProperty('AAPL');
}

7.3 Performance Optimization
7.3.1 Firestore Query Optimization
// BAD: Multiple individual reads
for (const userId of userIds) {
const doc = await db.collection('users').doc(userId).get();
// Process doc
}
// GOOD: Batch read (up to 500 docs)
const docRefs = userIds.map(id => db.collection('users').doc(id));
const snapshots = await db.getAll(...docRefs);
snapshots.forEach(doc => {
// Process doc
});

7.3.2 Computation Optimization
// BAD: Repeated calculations
for (const position of positions) {
const instrumentId = DataExtractor.getInstrumentId(position);
const ticker = mappings.instrumentToTicker[instrumentId];
const netProfit = DataExtractor.getNetProfit(position);
// ... repeated work
}
// GOOD: Cache and reuse
const tickerCache = new Map();
for (const position of positions) {
const instrumentId = DataExtractor.getInstrumentId(position);
let ticker = tickerCache.get(instrumentId);
if (!ticker) {
ticker = mappings.instrumentToTicker[instrumentId];
tickerCache.set(instrumentId, ticker);
}
// ... use cached ticker
}

7.3.3 Memory Management
// BAD: Loading all data into memory
const allUsers = await loadFullDayMap(config, deps, refs);
// Process 1M+ users at once
// GOOD: Streaming
for await (const chunk of streamPortfolioData(config, deps, dateStr, refs)) {
// Process 50 users at a time
await processChunk(chunk);
// Chunk is garbage collected after processing
}

7.4 Common Patterns
7.4.1 Retry with Exponential Backoff
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function fetchWithRetry(url, options, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url, options);
      if (response.ok) return response;
    } catch (error) {
      if (attempt === maxRetries) throw error;
    }
    if (attempt < maxRetries) {
      const backoffMs = Math.pow(2, attempt) * 1000; // 2s, 4s, 8s...
      await sleep(backoffMs);
    }
  }
  throw new Error(`Request failed after ${maxRetries} attempts: ${url}`);
}

7.4.2 Rate Limiting
class RateLimiter {
constructor(maxRequests, windowMs) {
this.maxRequests = maxRequests;
this.windowMs = windowMs;
this.requests = [];
}
async acquire() {
const now = Date.now();
// Remove old requests
this.requests = this.requests.filter(
time => now - time < this.windowMs
);
// Check if we can proceed
if (this.requests.length >= this.maxRequests) {
const oldestRequest = this.requests[0];
const waitTime = this.windowMs - (now - oldestRequest);
await sleep(waitTime);
return this.acquire(); // Recursive retry
}
this.requests.push(now);
}
}
// Usage
const limiter = new RateLimiter(10, 60000); // 10 requests per minute
for (const task of tasks) {
await limiter.acquire();
await processTask(task);
}

7.4.3 Batch Aggregation Pattern
class BatchAggregator {
constructor(flushCallback, maxSize = 500, maxWaitMs = 5000) {
this.flushCallback = flushCallback;
this.maxSize = maxSize;
this.maxWaitMs = maxWaitMs;
this.items = [];
this.timer = null;
}
add(item) {
this.items.push(item);
if (this.items.length >= this.maxSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
}
}
async flush() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.items.length === 0) return;
const itemsToFlush = this.items;
this.items = [];
await this.flushCallback(itemsToFlush);
}
}
// Usage
const aggregator = new BatchAggregator(async (items) => {
const batch = db.batch();
for (const item of items) {
batch.set(item.ref, item.data);
}
await batch.commit();
});
// Add items
aggregator.add({ ref: docRef1, data: data1 });
aggregator.add({ ref: docRef2, data: data2 });
// ... auto-flushes when full or after timeout

8. Deployment & Operations
8.1 Deployment Process
8.1.1 Module Updates
# 1. Make changes to bulltrackers-module
cd Backend/Core/bulltrackers-module
# 2. Update version
npm version patch # or minor, or major
# 3. Publish to NPM
npm publish
# 4. Update dependent packages
cd ../your-cloud-function
npm install bulltrackers-module@latest
# 5. Deploy cloud function
gcloud functions deploy your-function \
--gen2 \
--runtime=nodejs20 \
--region=us-central1 \
--source=. \
--entry-point=yourEntryPoint \
--trigger-topic=your-topic

8.1.2 Calculation Updates
# 1. Update calculation in aiden-shared-calculations-unified
cd aiden-shared-calculations-unified
# 2. Version bump
npm version patch
# 3. Publish
npm publish
# 4. Update bulltrackers-module dependency
cd ../bulltrackers-module
npm install aiden-shared-calculations-unified@latest
npm version patch
npm publish
# 5. Redeploy computation functions
# (They will pick up new calculations automatically)

8.1.3 Environment Variables
# Set via Cloud Console or gcloud CLI
gcloud functions deploy computation-pass-1 \
--set-env-vars="COMPUTATION_PASS_TO_RUN=1,FIRESTORE_PROJECT_ID=your-project,ETORO_API_URL=https://..."

8.2 Monitoring
8.2.1 Key Metrics
Cloud Functions:
- Invocation count per function
- Execution time (p50, p95, p99)
- Error rate
- Memory usage
- Active instances
Firestore:
- Read operations per day
- Write operations per day
- Document count per collection
- Storage usage
Pub/Sub:
- Messages published per topic
- Messages consumed
- Oldest unacked message age
- Dead letter queue size
8.2.2 Logging Queries
# Find errors in last 24 hours
resource.type="cloud_function"
severity="ERROR"
timestamp>="2025-01-15T00:00:00Z"
# Find specific user task
resource.type="cloud_function"
jsonPayload.message=~"userId.*123456"
# Find rate limit errors
resource.type="cloud_function"
jsonPayload.message=~"rate limit|429|too many"
# Computation pass performance
resource.type="cloud_function"
resource.labels.function_name="computation-pass-1"
jsonPayload.message=~"Pass.*finished"8.2.3 Alerting Rules
# Example alert policy
displayName: "Task Engine Error Rate High"
conditions:
- displayName: "Error rate > 5%"
conditionThreshold:
filter: |
resource.type="cloud_function"
resource.labels.function_name="task-engine"
severity="ERROR"
aggregations:
- alignmentPeriod: 300s
perSeriesAligner: ALIGN_RATE
comparison: COMPARISON_GT
thresholdValue: 0.05
duration: 600s
notificationChannels:
- "projects/YOUR_PROJECT/notificationChannels/YOUR_CHANNEL"8.3 Cost Optimization
8.3.1 Firestore Costs
Current Architecture (Optimized):
- Sharded writes: ~50 writes per 500 users = 10¢ per 1M users
- Status tracking: Single global document = $0
- Batch commits: one round trip per flush, greatly reducing API overhead (each document write is still billed individually)
Anti-Patterns to Avoid:
// BAD: Individual writes
for (const user of users) {
await db.collection('data').doc(user.id).set(data);
}
// Cost: N billed document writes, N network round trips
// GOOD: Batched writes
const batch = db.batch();
for (const user of users) {
batch.set(db.collection('data').doc(user.id), data);
}
await batch.commit();
// Cost: still N billed writes, but a single round trip

8.3.2 Cloud Function Costs
Optimization Strategies:
- Memory Allocation: Use 512MB for most functions (256MB often leads to out-of-memory errors and slower startups)
- Timeout: Set realistic timeouts (5min for task engine, 1min for API)
- Concurrency: Limit concurrent instances to avoid quota exhaustion
# function-config.yaml
availableMemoryMb: 512
timeout: 300s
maxInstances: 100
minInstances: 0 # Set to 1-2 for critical functions

8.3.3 API Call Optimization
Current Strategy:
- AppScript proxies: Free (within Google's generous limits)
- Direct calls: Fallback only (minimize usage)
- Batch API calls: 50-100 users per request
Cost Comparison:
AppScript Proxy:
- Cost: $0
- Rate Limit: ~100 requests/min per proxy
- Solution: 10+ proxies = 1000 req/min
Direct GCP IP:
- Cost: $0
- Rate Limit: ~20 requests/min (eToro throttles)
- Solution: Use only as fallback

8.4 Disaster Recovery
8.4.1 Backup Strategy
// Automated daily backup (Cloud Scheduler)
const { v1 } = require('@google-cloud/firestore');

async function backupFirestore() {
  const client = new v1.FirestoreAdminClient();
  const bucket = 'gs://your-backup-bucket';
  const timestamp = new Date().toISOString().split('T')[0];
  await client.exportDocuments({
    name: client.databasePath('your-project-id', '(default)'),
    collectionIds: [
      'NormalUserPortfolios',
      'SpeculatorBlocks',
      'unified_insights'
    ],
    outputUriPrefix: `${bucket}/firestore-backups/${timestamp}`
  });
}

8.4.2 Data Corruption Recovery
// Rebuild computation results from historical data
const { FieldValue } = require('@google-cloud/firestore');

async function rebuildComputations(startDate, endDate, allCalculationNames) {
  // Force recompute by clearing status for the date range
  const statusRef = db.collection('computation_status').doc('global_status');
  const updates = {};
  for (let date = new Date(startDate); date <= endDate; date.setDate(date.getDate() + 1)) {
    const dateStr = date.toISOString().slice(0, 10);
    // Delete all calculation statuses for this date
    for (const calcName of allCalculationNames) {
      updates[`${dateStr}.${calcName}`] = FieldValue.delete();
    }
  }
  await statusRef.update(updates);
  // Trigger computation passes
  // They will recompute all dates with missing status
}

8.4.3 Rollback Procedure
# 1. Identify problematic version
git log --oneline
# 2. Revert to previous version
git revert <commit-hash>
# 3. Republish packages
npm version patch
npm publish
# 4. Redeploy functions
gcloud functions deploy your-function --source=.  # repeat for each affected function
# 5. Verify in logs
gcloud logging read "resource.type=cloud_function" --limit=100