loop = async () => {
while (this.isRunning) {
try {
const rawValue = await reader();
const reading: SensorReading = {
id: crypto.randomUUID(),
tsMonotonic: performance.now() * 1e6,
tsWall: Date.now(),
value: rawValue,
sensorId: 'env-sensor-01'
};
this.pushToBuffer(reading);
this.emit('data', reading);
} catch (err) {
console.error('Read error:', err);
}
await new Promise(res => setTimeout(res, sampleIntervalMs));
}
};
loop();
}
/**
 * Stores a reading in the in-memory buffer.
 *
 * When the buffer already holds `capacity` entries, the oldest entry is
 * evicted first so the buffer never grows past its configured bound.
 *
 * @param reading - The reading to append as the newest buffer entry.
 */
private pushToBuffer(reading: SensorReading): void {
  const isFull = this.buffer.length >= this.capacity;
  if (isFull) {
    // Evict the oldest reading to keep memory use bounded.
    this.buffer.shift();
  }
  this.buffer.push(reading);
}
/**
 * Requests shutdown of the sampling loop by clearing the `isRunning` flag.
 *
 * The loop only checks the flag at the top of each iteration, so a read that
 * is already in progress and the sample-interval sleep that follows it run to
 * completion before the loop exits.
 */
public stop(): void {
this.isRunning = false;
}
}
#### 2. Signal Processing and Feature Engineering
Raw sensor data is noisy. The processing layer applies Exponential Moving Averages (EMA) and rolling statistics to extract features. This reduces the dimensionality of data sent to the inference model and the cloud.
**Rationale:** EMA provides noise reduction with O(1) memory, crucial for constrained environments. Rolling stats capture trend information without storing full history.
```typescript
// processing/signal-processor.ts
/** Feature vector produced by `SignalProcessor.process` for one raw sample. */
export interface ProcessedFeature {
  /** Timestamp supplied by the caller, forwarded unchanged. */
  ts: number;
  /** Exponential moving average after folding in this sample. */
  ema: number;
  /** Arithmetic mean of the samples currently held in the rolling window. */
  rollingMean: number;
  /** Sample variance (n - 1 divisor) of the window; 0 while it holds one sample. */
  rollingVar: number;
  /** Difference between this sample and the previous one; 0 for the first sample. */
  rateOfChange: number;
}

/**
 * Streaming feature extractor: maintains an exponential moving average, a
 * fixed-size rolling window (mean / sample variance) and a per-sample delta.
 */
export class SignalProcessor {
  private emaState: number | null = null;
  private window: number[] = [];
  private readonly windowSize: number;
  private readonly alpha: number;
  private lastValue: number | null = null;

  /**
   * @param windowSize - Number of most recent samples kept for the rolling
   *   statistics. Must be a positive integer.
   * @param alpha - EMA smoothing factor in the range (0, 1]; higher values
   *   weight recent samples more heavily.
   * @throws RangeError if `windowSize` is not a positive integer or `alpha`
   *   is outside (0, 1]. Such values would otherwise yield NaN statistics or
   *   an unbounded window.
   */
  constructor(windowSize: number = 60, alpha: number = 0.2) {
    if (!Number.isInteger(windowSize) || windowSize < 1) {
      throw new RangeError(
        `SignalProcessor: windowSize must be a positive integer, got ${windowSize}`
      );
    }
    // Written as a negated range test so that NaN is rejected as well.
    if (!(alpha > 0 && alpha <= 1)) {
      throw new RangeError(`SignalProcessor: alpha must be in (0, 1], got ${alpha}`);
    }
    this.windowSize = windowSize;
    this.alpha = alpha;
  }

  /**
   * Folds one sample into the running state and returns the derived features.
   *
   * @param value - Raw sensor value; must be finite.
   * @param ts - Timestamp to attach to the returned feature (not used in any
   *   calculation).
   * @returns The features computed after including `value`.
   * @throws RangeError if `value` is NaN or infinite. The check runs before
   *   any state is touched, so a bad sample cannot poison the EMA or window.
   */
  public process(value: number, ts: number): ProcessedFeature {
    if (!Number.isFinite(value)) {
      throw new RangeError(
        `SignalProcessor.process: value must be a finite number, got ${value}`
      );
    }

    // EMA: the first sample seeds the average, later ones are blended in.
    const ema =
      this.emaState === null
        ? value
        : this.alpha * value + (1 - this.alpha) * this.emaState;
    this.emaState = ema;

    // Rolling window: keep only the most recent `windowSize` samples.
    this.window.push(value);
    if (this.window.length > this.windowSize) {
      this.window.shift();
    }

    // Rolling statistics over the current window (two-pass for stability).
    const count = this.window.length;
    const mean = this.window.reduce((acc, v) => acc + v, 0) / count;
    const squaredDeviation = this.window.reduce((acc, v) => acc + (v - mean) ** 2, 0);
    // Sample variance; a single-sample window divides by 1 and yields 0.
    const variance = squaredDeviation / (count > 1 ? count - 1 : 1);

    // Per-sample delta; there is no previous sample on the first call.
    const roc = this.lastValue === null ? 0 : value - this.lastValue;
    this.lastValue = value;

    return {
      ts,
      ema,
      rollingMean: mean,
      rollingVar: variance,
      rateOfChange: roc
    };
  }
}
```

#### 3. On-Device Inference
Inference must be lightweight. Models should be quantized (INT8) to minimize latency and memory footprint. The runtime should wrap the model to handle feature mapping and thresholding.
**Architecture Decision:** Use a wrapper class that abstracts the inference engine (e.g., ONNX Runtime Micro or TFLite). This allows swapping the backend without changing business logic.
```typescript
// inference/micro-classifier.ts
/** Outcome of a single `MicroClassifier.predict` call. */
export interface InferenceResult {
  /** Raw linear score: bias plus the weighted sum of the input features. */
  score: number;
  /** True when `score` is strictly greater than the configured threshold. */
  isAnomaly: boolean;
  /** Logistic sigmoid of `score`, in the range [0, 1]. */
  confidence: number;
}

/**
 * Minimal linear classifier over three features (mean, variance, rate of
 * change). Weights and inputs are held as 32-bit floats.
 */
export class MicroClassifier {
  /** Number of features consumed by `predict` (mean, var, roc). */
  private static readonly FEATURE_COUNT = 3;

  private readonly weights: Float32Array;
  private readonly bias: number;
  private readonly threshold: number;

  /**
   * @param weights - One weight per feature, ordered `[mean, var, roc]`.
   *   Entries beyond the third are ignored.
   * @param bias - Constant added to the weighted sum.
   * @param threshold - Score above which a sample is flagged as an anomaly.
   * @throws RangeError if fewer than three weights are supplied. A missing
   *   weight would otherwise make every score NaN, so `isAnomaly` would
   *   silently always be false.
   */
  constructor(weights: number[], bias: number, threshold: number) {
    if (weights.length < MicroClassifier.FEATURE_COUNT) {
      throw new RangeError(
        `MicroClassifier: expected at least ${MicroClassifier.FEATURE_COUNT} weights, got ${weights.length}`
      );
    }
    // Copy into a typed array so later mutation of the caller's array has no effect.
    this.weights = new Float32Array(weights);
    this.bias = bias;
    this.threshold = threshold;
  }

  /**
   * Scores one feature vector.
   *
   * @param features - Rolling mean, rolling variance and rate of change.
   * @returns The raw score, the anomaly decision and a sigmoid confidence.
   */
  public predict(features: { mean: number; var: number; roc: number }): InferenceResult {
    const input = new Float32Array([features.mean, features.var, features.roc]);

    // Linear combination (simplified for example; real impl uses matrix ops)
    let score = this.bias;
    for (let i = 0; i < input.length; i++) {
      score += input[i] * this.weights[i];
    }

    // Squash the unbounded score into [0, 1] with the logistic function.
    const confidence = 1 / (1 + Math.exp(-score));

    return {
      score,
      isAnomaly: score > this.threshold,
      confidence
    };
  }
}
```

#### 4. Resilient Synchronization and Storage
The sync layer must handle intermittent connectivity with exponential backoff, jitter, and idempotency. Data is stored locally in a WAL format and flushed in batches. Critical events (anomalies) are prioritized.
**Rationale:** Idempotency prevents duplicate data corruption during retries. Jitter prevents thundering herd issues when connectivity restores. Priority queues ensure critical alerts are sent first.
```typescript
// sync/sync-manager.ts
import { EventEmitter } from 'events';
/** A batch of events handed to `SyncManager.enqueue` for upload. */
export interface BatchPayload {
  /** Unique identifier of the batch; emitted with the `synced` event. */
  batchId: string;
  /** Events carried by the batch. */
  events: any[];
  /** Critical batches are uploaded ahead of normal ones. */
  priority: 'normal' | 'critical';
  /** Caller-supplied timestamp for the batch. */
  timestamp: number;
}

/** Internal queue record; keeps retry bookkeeping off the caller's payload. */
interface QueuedBatch {
  batch: BatchPayload;
  retryCount: number;
}

/**
 * Uploads batches one at a time, critical batches first and FIFO within a
 * priority class, retrying failures with exponential backoff plus jitter.
 *
 * Emits `synced` with the batch id after each successful upload. A batch that
 * fails `maxRetries` times is logged and dropped.
 */
export class SyncManager extends EventEmitter {
  // Invariant: all critical entries precede all normal entries.
  private queue: QueuedBatch[] = [];
  private isSyncing = false;
  private maxRetries = 5;
  private baseDelay = 1000;

  /**
   * Queues a batch and starts the sync loop if it is not already running.
   *
   * @param batch - The batch to upload. It is not mutated.
   */
  public enqueue(batch: BatchPayload): void {
    this.insert({ batch, retryCount: 0 }, 'back');
    // Fire-and-forget, but never leave a rejection unhandled.
    this.triggerSync().catch((err: unknown) => {
      console.error('Sync loop error:', err);
    });
  }

  /**
   * Inserts an entry while preserving the critical-before-normal invariant.
   *
   * @param entry - The entry to insert.
   * @param position - `'back'` appends behind entries of the same priority
   *   (new arrivals); `'front'` places it ahead of them (a batch being retried).
   */
  private insert(entry: QueuedBatch, position: 'front' | 'back'): void {
    const firstNormal = this.queue.findIndex(e => e.batch.priority !== 'critical');
    const boundary = firstNormal === -1 ? this.queue.length : firstNormal;
    let index: number;
    if (entry.batch.priority === 'critical') {
      index = position === 'front' ? 0 : boundary;
    } else {
      index = position === 'front' ? boundary : this.queue.length;
    }
    this.queue.splice(index, 0, entry);
  }

  /**
   * Drains the queue. Only one drain loop runs at a time; calls made while a
   * loop is active return immediately.
   */
  private async triggerSync(): Promise<void> {
    if (this.isSyncing) return;
    this.isSyncing = true;
    try {
      // The entry being uploaded is removed from the queue first, so batches
      // enqueued during the upload cannot displace it or be removed in its place.
      for (
        let entry = this.queue.shift();
        entry !== undefined;
        entry = this.queue.shift()
      ) {
        const { batch } = entry;

        let success = false;
        try {
          success = await this.attemptUpload(batch);
        } catch (err: unknown) {
          // A throwing upload counts as a failed attempt instead of wedging the loop.
          console.error('Upload error:', err);
        }

        if (success) {
          this.emit('synced', batch.batchId);
          continue;
        }

        entry.retryCount++;
        if (entry.retryCount >= this.maxRetries) {
          console.warn('Batch dropped after max retries:', batch.batchId);
          continue;
        }

        // Put the batch back at the head of its priority class, then back off.
        // A critical batch arriving during the wait is still served first.
        this.insert(entry, 'front');

        // Exponential backoff with jitter: base, 2x, 4x, ... capped at 60 s.
        const delay = Math.min(this.baseDelay * 2 ** (entry.retryCount - 1), 60000);
        const jitter = Math.random() * 500;
        await new Promise<void>(res => setTimeout(res, delay + jitter));
      }
    } finally {
      // Always release the guard so a later enqueue can restart the loop.
      this.isSyncing = false;
    }
  }

  /**
   * Attempts to upload one batch.
   *
   * @param batch - The batch to send.
   * @returns `true` if the upload succeeded, `false` if it should be retried.
   */
  private async attemptUpload(batch: BatchPayload): Promise<boolean> {
    // Simulate MQTT/HTTP upload
    // In production, use a client with TLS and mTLS
    console.log(`Uploading batch ${batch.batchId} (${batch.events.length} events)`);
    return Math.random() > 0.2; // Simulate 80% success rate
  }
}
```

### Pitfall Guide
- **Clock Drift and NTP Jitter**
  - Explanation: Relying solely on wall-clock time causes timestamp jumps when NTP syncs, breaking time-series queries and sequence ordering.
  - Fix: Use a monotonic clock for event sequencing and store the offset to wall-clock time. Apply corrections only during low-activity windows.
- **Storage Exhaustion and Log Rotations**
  - Explanation: Edge devices have limited storage. Without retention policies, WAL files grow until the filesystem fills, causing the OS to kill processes or corrupt data.
  - Fix: Implement strict size-based and time-based retention. Use a circular buffer for the WAL and delete chunks only after successful cloud acknowledgment.
- **Retry Storms and Thundering Herds**
  - Explanation: When connectivity returns, all devices attempt to sync simultaneously, overwhelming the broker or cloud endpoint.
  - Fix: Implement jitter in backoff algorithms. Add a random delay to the retry interval. Use circuit breakers to pause sync if the cloud returns 5xx errors.
- **Model Staleness and Drift**
  - Explanation: Environmental conditions change over time. A static model trained on historical data may degrade, leading to false positives or missed anomalies.
  - Fix: Deploy model versioning. Run new models in "shadow mode" alongside the production model to compare outputs before switching. Support OTA updates for model weights.
- **Power Spikes During Inference**
  - Explanation: Inference bursts can cause current spikes that trigger brownouts on battery-powered devices, especially if the power supply lacks sufficient capacitance.
  - Fix: Profile power consumption. Schedule inference during charging windows if possible. Use duty cycling to throttle inference frequency based on battery level.
- **Idempotency Failures**
  - Explanation: Network retries can cause duplicate messages. If the backend processes duplicates, metrics become inflated, and alerts may fire repeatedly.
  - Fix: Generate unique batch IDs at the edge. Ensure the cloud ingestion endpoint checks for duplicate IDs and discards replays.
- **Silent Sensor Failures**
  - Explanation: A sensor may stop returning valid data but not crash the process. The pipeline continues to stream stale or zero values, masking hardware faults.
  - Fix: Implement synthetic health checks. Inject known values periodically or monitor signal variance. Alert if variance drops below a noise floor threshold, indicating a stuck sensor.
### Production Bundle

#### Action Checklist

#### Decision Matrix

| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-Frequency Vibration Data | Edge Inference + Event Streaming | Raw data volume is too high; only anomalies matter. | Higher edge compute cost; massive bandwidth savings. |
| Low-Battery Soil Moisture Node | Edge Aggregation + Periodic Sync | Inference may drain battery; aggregation reduces payload size. | Low compute cost; low bandwidth cost; moderate latency. |
| Critical Safety Monitoring | Edge Inference + Priority Queue | Immediate local action required; cloud latency unacceptable. | High reliability cost; requires robust hardware. |
| Development/Debugging | Raw Streaming + Local Buffer | Need full data visibility for model tuning and debugging. | High bandwidth cost; low edge logic complexity. |
#### Configuration Template

```yaml
# edge-config.yaml
device:
  id: "edge-node-42"
  region: "us-west-2"
sensors:
  - id: "temp-01"
    type: "thermistor"
    sample_rate_ms: 1000
    calibration_curve: "polynomial_3rd"
processing:
  ema_alpha: 0.15
  window_size: 60
  outlier_threshold_sigma: 3.0
inference:
  model_path: "/models/anomaly_v2.int8"
  threshold: 0.75
  shadow_mode: false
  quantization: "int8"
sync:
  protocol: "mqtt"
  broker: "tls://broker.example.com:8883"
  batch_size: 100
  max_retries: 5
  base_delay_ms: 1000
  jitter_ms: 500
  priority_events: ["anomaly", "hardware_fault"]
storage:
  wal_path: "/data/wal"
  max_size_mb: 512
  retention_hours: 72
observability:
  metrics_port: 9090
  health_check_interval_s: 30
  log_level: "info"
```

#### Quick Start Guide

- **Initialize the Project:** Set up a TypeScript project with dependencies for MQTT, hardware I/O, and model inference. Configure `edge-config.yaml` with your sensor parameters and cloud endpoints.
- **Wire the Ingestion Loop:** Instantiate `TelemetryCollector` with your sensor driver. Connect the `data` event to `SignalProcessor` to generate features.
- **Deploy the Inference Model:** Load your quantized model into `MicroClassifier`. Connect processed features to the predictor. Route `isAnomaly: true` results to `SyncManager` with `priority: 'critical'`.
- **Configure Sync and Storage:** Initialize `SyncManager` with your MQTT client and WAL store. Ensure idempotency keys are attached to every batch. Set retention policies to match your storage constraints.
- Validate Resilience: Simulate network outages by blocking the broker port. Verify data accumulates in the WAL without loss. Restore connectivity and confirm batches upload with correct ordering and no duplicates. Monitor metrics for latency and energy usage.