true; required: false };
}
/**
 * Shape of the records emitted by the log parser.
 *
 * The parser's failure path emits a record of this same shape with `_raw` and
 * `_parseError` populated, so consumers can tell failures apart by checking
 * `_parseError`.
 */
interface ParsedLog {
/** Event time as a `Date` object. */
timestamp: Date;
/** Severity label; the failure path uses `'ERROR'`. */
level: string;
/** Name of the emitting service; the failure path uses `'unknown'`. */
service: string;
/** Optional trace correlation id. */
traceId?: string;
/** Log message text; the failure path uses `'Parse failure'`. */
message: string;
/** Optional user identifier. */
userId?: string;
/** Original input line; set only on records produced by the failure path. */
_raw?: string;
/** Reason parsing or validation failed; set only on failure records. */
_parseError?: string;
}
// --- Schema Compiler ---
/**
 * Turns pattern strings into `RegExp` objects and remembers the result, so a
 * given pattern source is compiled at most once per compiler instance.
 */
class SchemaCompiler {
  private regexCache: Map<string, RegExp> = new Map();

  /**
   * Returns the compiled form of `pattern`, compiling and caching it on first use.
   *
   * @param pattern Regular-expression source text (compiled without flags).
   * @returns The cached `RegExp`; repeated calls with the same source return
   *   the same object.
   * @throws SyntaxError when `pattern` is not a valid regular expression; in
   *   that case nothing is cached.
   */
  compilePattern(pattern: string): RegExp {
    const known = this.regexCache.get(pattern);
    if (known !== undefined) {
      return known;
    }
    const compiled = new RegExp(pattern);
    this.regexCache.set(pattern, compiled);
    return compiled;
  }
}
// --- High-Performance Parser ---
/**
 * Transform stream that converts one raw log line per chunk into a record
 * normalized against a schema.
 *
 * Lines whose trimmed text starts with `{` are parsed as JSON; everything else
 * goes through key=value extraction. A line that cannot be parsed or fails
 * validation is not dropped: it is emitted as a failure record carrying `_raw`
 * and `_parseError`.
 *
 * NOTE(review): failure records contain the original line verbatim in `_raw`,
 * so PII in a rejected line is NOT masked. Treat the failure path as sensitive.
 */
export class LogParser extends Transform {
  /** Text that replaces the hidden part of a masked value. */
  private static readonly MASK = '***';
  /** Number of leading characters of a PII string left readable. */
  private static readonly VISIBLE_PREFIX_LENGTH = 4;
  /** Matches a single whitespace character (used to validate a closing quote). */
  private static readonly WHITESPACE = /\s/;

  private schema: LogSchema;
  private compiler: SchemaCompiler;
  private piiMasker: (val: string) => string;

  /**
   * @param schema Field definitions that drive validation, coercion and masking.
   * @param options Extra stream options forwarded to `Transform`. The readable
   *   side is always forced into object mode because this stream pushes record
   *   objects; the writable side defaults to object mode but may be overridden.
   */
  constructor(schema: LogSchema, options = {}) {
    super({ objectMode: true, ...options, readableObjectMode: true });
    this.schema = schema;
    this.compiler = new SchemaCompiler();
    // Keep a short prefix for correlation and hide the rest. Values no longer
    // than the prefix are fully redacted, otherwise the "mask" would expose
    // the entire value. Array.from iterates by code point, so a surrogate
    // pair is never cut in half.
    this.piiMasker = (val: string) => {
      const chars = Array.from(val);
      if (chars.length <= LogParser.VISIBLE_PREFIX_LENGTH) {
        return LogParser.MASK;
      }
      return chars.slice(0, LogParser.VISIBLE_PREFIX_LENGTH).join('') + LogParser.MASK;
    };
  }

  /**
   * Parses a single chunk and pushes exactly one record downstream.
   *
   * @param chunk Raw log line; non-string chunks are converted with `String()`.
   * @param encoding Unused; chunks are handled as text.
   * @param callback Invoked exactly once after the record has been pushed.
   */
  _transform(
    chunk: unknown,
    encoding: string,
    callback: (error?: Error | null, data?: unknown) => void,
  ): void {
    const raw = typeof chunk === 'string' ? chunk : String(chunk);
    let record: ParsedLog;
    try {
      // Fast path: JSON detection. Slow path: key=value extraction.
      const fields = raw.trim().startsWith('{') ? JSON.parse(raw) : this.extractKV(raw);
      record = this.normalize(fields);
    } catch (err) {
      // Graceful degradation: emit the raw line with error metadata.
      record = {
        _raw: raw,
        _parseError: err instanceof Error ? err.message : 'Unknown parse error',
        timestamp: new Date(),
        level: 'ERROR',
        service: 'unknown',
        message: 'Parse failure',
      };
    }
    // Push and acknowledge outside the try block: an exception thrown by the
    // callback must not be mistaken for a parse failure, and the callback must
    // never run twice for the same chunk.
    this.push(record);
    callback();
  }

  /**
   * Validates and coerces `data` against the schema.
   *
   * Only keys declared in the schema are copied to the result. A missing
   * optional field takes its `default` when one is declared and is otherwise
   * omitted, so the result may lack fields that `ParsedLog` declares.
   *
   * @param data Untyped fields extracted from a log line.
   * @returns The normalized record.
   * @throws Error when a required field is missing, a date is unparseable, an
   *   enum value is not allowed, or a string does not match its pattern.
   */
  private normalize(data: Record<string, any>): ParsedLog {
    const result: any = {};
    for (const [key, def] of Object.entries(this.schema)) {
      let value = data[key];
      if (value === undefined || value === null) {
        if (def.required) {
          throw new Error(`Missing required field: ${key}`);
        }
        if ('default' in def) {
          value = def.default;
        } else {
          continue;
        }
      }
      // Type coercion
      switch (def.type) {
        case 'date':
          value = new Date(value);
          if (isNaN(value.getTime())) throw new Error(`Invalid date format for ${key}`);
          break;
        case 'enum':
          if (!def.values.includes(value)) {
            throw new Error(`Invalid enum value for ${key}: ${value}`);
          }
          break;
        case 'string':
          value = String(value);
          if (def.pattern) {
            const regex = this.compiler.compilePattern(def.pattern);
            if (!regex.test(value)) {
              throw new Error(`Pattern mismatch for ${key}`);
            }
          }
          break;
      }
      // PII masking. Non-string values (a coerced Date, a numeric enum member)
      // are fully redacted instead of being handed to the string masker, which
      // would throw and send the whole unmasked line down the failure path.
      if (def.pii && value) {
        value = typeof value === 'string' ? this.piiMasker(value) : LogParser.MASK;
      }
      result[key] = value;
    }
    return result as ParsedLog;
  }

  /**
   * Extracts `key=value` pairs from a text line.
   *
   * A pair starts at whitespace followed by `word=`. An unquoted value runs to
   * the next pair boundary, so it may contain spaces. A value that opens with
   * `"` and has a closing `"` followed by whitespace or end of line is kept
   * whole even when it contains text that looks like another pair; the
   * surrounding quotes are then removed. An unterminated quote falls back to
   * plain boundary splitting. Backslash escapes are not interpreted.
   *
   * @param text Raw log line.
   * @returns Map of key to string value; text before the first pair that
   *   contains no `=` is ignored.
   */
  private extractKV(text: string): Record<string, any> {
    const result: Record<string, any> = {};
    // One flat lookahead pattern locates pair boundaries; the remaining work
    // is plain indexOf/substring. Created per call because the `g` flag makes
    // the regex stateful through lastIndex.
    const boundary = /\s+(?=\w+=)/g;
    let start = 0;
    for (;;) {
      boundary.lastIndex = start;
      let next = boundary.exec(text);
      const eqIdx = text.indexOf('=', start);
      // The '=' belongs to this segment only if it precedes the next boundary.
      const hasPair = eqIdx > -1 && (next === null || eqIdx < next.index);
      if (hasPair && next !== null && text.charAt(eqIdx + 1) === '"') {
        const closeIdx = text.indexOf('"', eqIdx + 2);
        if (closeIdx > next.index) {
          const after = text.charAt(closeIdx + 1);
          if (after === '' || LogParser.WHITESPACE.test(after)) {
            // The boundary found above lies inside the quoted value: resume
            // the boundary search after the closing quote.
            boundary.lastIndex = closeIdx + 1;
            next = boundary.exec(text);
          }
        }
      }
      const end = next === null ? text.length : next.index;
      if (hasPair) {
        const key = text.substring(start, eqIdx);
        let value = text.substring(eqIdx + 1, end);
        // Remove surrounding quotes if present. The length guard keeps a lone
        // '"' from being treated as both the opening and the closing quote.
        if (value.length >= 2 && value.startsWith('"') && value.endsWith('"')) {
          value = value.slice(1, -1);
        }
        result[key] = value;
      }
      if (next === null) {
        break;
      }
      start = next.index + next[0].length;
    }
    return result;
  }
}
// --- Usage Example ---
// Demo schema: `service` and `message` are mandatory, `level` falls back to
// 'INFO', `traceId` must be 32 lowercase hex characters, and `userId` is
// flagged as PII so the parser masks it.
const schema: LogSchema = {
  timestamp: { type: 'date', format: 'ISO8601' },
  level: { type: 'enum', values: ['DEBUG', 'INFO', 'WARN', 'ERROR'], default: 'INFO' },
  service: { type: 'string', required: true },
  traceId: { type: 'string', pattern: '^[a-f0-9]{32}$', required: false },
  message: { type: 'string', required: true },
  userId: { type: 'string', pii: true, required: false },
};
const parser = new LogParser(schema);
// Simulate input stream: one JSON line, one key=value line, one unparseable line.
const input = Readable.from([
  JSON.stringify({ timestamp: '2023-10-27T10:00:00Z', level: 'INFO', service: 'auth', message: 'Login success', userId: 'user-12345' }),
  // The traceId is 32 hex characters so this line passes the schema pattern.
  'timestamp=2023-10-27T10:00:01Z level=WARN service=payment traceId=aabbccdd11223344aabbccdd11223344 message=Retry attempt userId=user-67890',
  'INVALID LOG LINE',
]);
input.pipe(parser).pipe(new Writable({
  // The parser emits record objects, so the sink must be in object mode;
  // a byte-mode Writable rejects object chunks with ERR_INVALID_ARG_TYPE.
  objectMode: true,
  write(chunk, _, cb) {
    console.log(JSON.stringify(chunk, null, 2));
    cb();
  }
}));
### Key Implementation Details
1. **Regex Caching:** The `SchemaCompiler` caches compiled regex patterns. This avoids the overhead of recompiling patterns for every log line, reducing CPU usage significantly.
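2. **KV Extraction Optimization:** The `extractKV` method locates pair boundaries with a single whitespace-plus-lookahead regex (`/\s+(?=\w+=)/`) and then uses plain `indexOf`/`substring` calls to separate each key from its value. The pattern contains no nested quantifiers, so it avoids the catastrophic backtracking common in monolithic regex-based KV parsers.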
3. **Schema Validation:** Required fields, enums, and patterns are validated during normalization. Invalid logs are caught early with descriptive errors.
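4. **PII Handling:** PII masking is applied during normalization, so fields flagged `pii` are redacted before a successfully parsed record leaves the parser. This reduces compliance risk. Note that a line that fails parsing is emitted with its original text in `_raw`, which is not masked, so the failure path must be handled as sensitive data.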
## Pitfall Guide
### 1. Catastrophic Regex Backtracking
**Mistake:** Using complex regex patterns with nested quantifiers (e.g., `(a+)+`) on untrusted input.
**Impact:** A single malformed log line can consume 100% CPU for seconds, stalling the collector and causing data loss.
**Best Practice:** Use possessive quantifiers or atomic grouping where supported. Prefer schema-compiled parsers or simple string operations. Test regex patterns with tools like `regex101` to detect backtracking.
### 2. Schema Drift Blindness
**Mistake:** Deploying parsers without monitoring for schema violations.
**Impact:** Application updates add new fields or change formats. The parser silently drops data or produces incomplete records. Teams lose visibility during critical deployments.
**Best Practice:** Implement schema versioning. Route logs that fail validation to a dead-letter queue with a `schema_violation` tag. Set alerts on schema violation rates.
### 3. Timestamp Ambiguity
**Mistake:** Parsing timestamps without explicit timezone or format validation.
**Impact:** Logs from different regions or legacy systems arrive with mixed formats (ISO8601, epoch, custom). Analysis queries return incorrect time ranges, making correlation impossible.
**Best Practice:** Enforce ISO8601 in schemas. Reject or flag logs with unparseable timestamps. Normalize all timestamps to UTC at parse time.
### 4. High Cardinality Explosion
**Mistake:** Extracting unbounded fields (e.g., user IDs, request URLs) as indexable tags.
**Impact:** The observability backend creates millions of unique series. Query performance degrades, and storage costs spike. Some backends enforce hard limits on cardinality, causing ingestion errors.
**Best Practice:** Only extract low-cardinality fields as tags. Store high-cardinality data in the message body or use sampling. Hash or bucket high-cardinality values if aggregation is needed.
### 5. Blocking the Event Loop
**Mistake:** Running CPU-intensive parsing synchronously in the main thread.
**Impact:** The collector cannot accept new connections or flush buffers. Memory usage grows until OOM kill.
**Best Practice:** Use worker threads for parsing. Implement backpressure handling. Limit the size of the parsing queue to prevent memory exhaustion.
### 6. Ignoring Log Sampling
**Mistake:** Parsing and storing every log line regardless of severity or source.
**Impact:** Wasted resources on debug logs or health checks. High costs with diminishing returns on analysis value.
**Best Practice:** Implement sampling strategies. Drop or sample low-severity logs from high-volume services. Retain full fidelity for error logs and traces.
### 7. PII Leakage in Debug Mode
**Mistake:** Enabling verbose debug logging that includes raw request bodies or tokens.
**Impact:** Sensitive data appears in observability tools, violating GDPR/CCPA. Parsers that do not mask data propagate the leakage.
**Best Practice:** Integrate PII detection into the parser. Use allowlists for safe fields. Regularly audit log outputs for sensitive patterns.
## Production Bundle
### Action Checklist
- [ ] **Define Schema:** Create a schema definition for each log source, including types, required fields, and PII flags.
- [ ] **Benchmark Parsers:** Measure CPU and latency of regex vs. compiled parsers using production log samples.
- [ ] **Implement Backpressure:** Ensure the parsing pipeline handles traffic spikes without dropping data or crashing.
- [ ] **Add Dead-Letter Routing:** Configure a fallback path for logs that fail parsing to prevent data loss.
- [ ] **Enable PII Masking:** Verify that all PII fields are masked before logs are shipped to the backend.
- [ ] **Monitor Schema Violations:** Set up alerts for spikes in schema violation rates to catch application regressions.
- [ ] **Test Regex Safety:** Audit all regex patterns for catastrophic backtracking using stress testing tools.
- [ ] **Validate Timestamps:** Ensure all timestamps are normalized to UTC and formatted consistently.
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|----------|---------------------|-----|-------------|
| **New Microservices** | Native JSON + Schema | Zero parsing overhead; type-safe; easy evolution. | Low storage; low CPU. |
| **Legacy Monolith** | Compiled KV Parser | Faster than regex; handles key-value text efficiently. | Medium storage; low CPU. |
| **Complex Unstructured** | Grok/Pattern DSL | Flexible pattern matching for irregular formats. | Medium CPU; high maintenance. |
| **High Compliance** | Schema + PII Masking | Enforces data governance; prevents leakage. | Low risk cost; medium CPU. |
| **Cost Constraints** | Sampling + Structured | Reduces volume; improves compression. | Low storage; low CPU. |
### Configuration Template
TypeScript configuration for a production-ready log pipeline using the parser above.
```typescript
// pipeline.config.ts
// Example pipeline configuration: per-service schemas plus parser and sampling
// settings.
// NOTE(review): `LogParser` is imported but not referenced in this snippet.
import { LogParser } from './LogParser';
// NOTE(review): the parser shown in this article takes only a schema and stream
// options; the `parser` and `sampling` sections are presumably consumed by
// pipeline wiring that is not shown here — confirm before relying on them.
export const logPipelineConfig = {
// Schema definitions per service
schemas: {
// Same field set as the usage example, but `level` has no default here.
auth: {
timestamp: { type: 'date', format: 'ISO8601' },
level: { type: 'enum', values: ['DEBUG', 'INFO', 'WARN', 'ERROR'] },
service: { type: 'string', required: true },
// 32 lowercase hexadecimal characters.
traceId: { type: 'string', pattern: '^[a-f0-9]{32}$' },
message: { type: 'string', required: true },
// Flagged as PII so the parser masks it.
userId: { type: 'string', pii: true },
},
// Payment logs: no DEBUG level allowed, and a transaction id is mandatory.
payment: {
timestamp: { type: 'date', format: 'ISO8601' },
level: { type: 'enum', values: ['INFO', 'WARN', 'ERROR'] },
service: { type: 'string', required: true },
transactionId: { type: 'string', required: true },
// Digits, a dot, then exactly two digits (e.g. "12.50"); validated as a string.
amount: { type: 'string', pattern: '^[0-9]+\\.[0-9]{2}$' },
message: { type: 'string', required: true },
},
},
// Parser settings
parser: {
batchSize: 1000,
timeoutMs: 5000,
maxRetries: 3,
deadLetterQueue: 'logs-dlq',
enablePiiMasking: true,
timestampNormalization: 'UTC',
},
// Sampling rules
sampling: {
debugLogs: { rate: 0.1 }, // Sample 10% of debug logs
healthChecks: { drop: true }, // Drop health check logs
},
};
```

### Quick Start Guide

1. **Install Dependencies:** `stream` is a Node.js built-in module, so only the type definitions need to be installed.
   ```bash
   npm install --save-dev @types/node
   ```
2. **Define Schema:** Create a `schema.ts` file defining your log structure using the `LogSchema` interface.
3. **Instantiate Parser:**
   ```typescript
   import { LogParser } from './LogParser';
   import { schema } from './schema';
   const parser = new LogParser(schema);
   ```
4. **Pipe Input:** Connect your log source to the parser stream.
   ```typescript
   logSourceStream.pipe(parser).pipe(outputStream);
   ```
5. **Verify Output:** Run the pipeline with sample logs. Check that fields are extracted, types are coerced, PII is masked, and that lines which fail parsing are emitted with `_parseError` set so they can be routed to the dead-letter queue. Monitor CPU usage to confirm performance gains.

Log parsing is the foundation of reliable observability. By adopting schema-driven, compiled parsing strategies, engineering teams eliminate regex debt, reduce infrastructure costs, and ensure data integrity across the pipeline. Implement these patterns to build observability systems that scale with your architecture.