elemetryClient } from '@codcompass/data-observability';
/**
 * Shared warehouse telemetry client used to read query-execution metadata.
 *
 * Configured for Snowflake; the account identifier is read from the
 * `SF_ACCOUNT` environment variable when this module is evaluated.
 */
const telemetryClient = new WarehouseTelemetryClient({
  provider: 'snowflake',

  credentials: {
    account: process.env.SF_ACCOUNT,
    role: 'LINEAGE_OBSERVER', // Read-only system role
    warehouse: 'ANALYTICS_WH',
  },

  retentionPolicy: {
    // Scan window of 90 days, with incremental sync switched on.
    scanWindowDays: 90,
    incrementalSync: true,
  },
});
/**
 * Fetches DDL/DML execution metadata for a time range and reshapes each log
 * entry into a camelCase lineage record.
 *
 * Only `CREATE_TABLE_AS_SELECT`, `INSERT`, `MERGE` and `ALTER_TABLE` queries are
 * requested, and execution plans are excluded from the response.
 *
 * @param startDate - Start of the requested time range.
 * @param endDate - End of the requested time range.
 * @returns One record per execution log with `queryId`, `sourceObjects`,
 *   `targetObject`, `transformationType`, `executionStatus` and `timestamp`.
 */
async function ingestQueryHistory(startDate: Date, endDate: Date) {
  const executionLogs = await telemetryClient.fetchExecutionLogs({
    queryTypes: ['CREATE_TABLE_AS_SELECT', 'INSERT', 'MERGE', 'ALTER_TABLE'],
    timeRange: { start: startDate, end: endDate },
    includeExecutionPlan: false,
  });

  return executionLogs.map(
    ({ query_id, referenced_tables, target_table, query_type, status, start_time }) => ({
      queryId: query_id,
      sourceObjects: referenced_tables,
      targetObject: target_table,
      transformationType: query_type,
      executionStatus: status,
      timestamp: start_time,
    }),
  );
}
**Why this works:** Reading `account_usage` or equivalent system tables avoids application-level instrumentation. The `LINEAGE_OBSERVER` role enforces least privilege. Incremental sync prevents redundant processing and aligns with warehouse query history retention windows.
### Step 2: Asset Classification & Business Mapping
Technical dependencies alone fail compliance audits. You must bind catalog objects to business classifications, ownership records, and regulatory tags.
```typescript
import { ClassificationEngine } from '@codcompass/data-governance';
/**
 * Classification engine client configured with PostgreSQL storage.
 * The connection URI is read from the `CLASSIFICATION_DB_URI` environment
 * variable when this module is evaluated.
 */
const classifier = new ClassificationEngine({
  storage: 'postgresql',
  connectionUri: process.env.CLASSIFICATION_DB_URI,
});
/**
 * Upserts the classification records for the two catalog objects below.
 *
 * Each record binds a catalog path to a classification level, its regulatory
 * tags, a steward contact, and a masking policy name.
 */
async function registerComplianceDomains() {
  await classifier.upsertAssets([
    // Raw customer profiles: restricted, tagged GDPR / CCPA / PII.
    {
      catalogPath: 'PROD_DB.RAW.CUSTOMER_PROFILES',
      classification: 'restricted',
      regulatoryTags: ['gdpr', 'ccpa', 'pii'],
      steward: 'data-platform@company.io',
      maskingPolicy: 'HASH_EMAIL',
    },
    // Financial summary: confidential, tagged SOX / financial, no masking policy.
    {
      catalogPath: 'PROD_DB.ANALYTICS.FINANCIAL_SUMMARY',
      classification: 'confidential',
      regulatoryTags: ['sox', 'financial'],
      steward: 'finance-data@company.io',
      maskingPolicy: 'NONE',
    },
  ]);
}
```

**Why this works:** Classifications are stored separately from execution logs. This decouples governance metadata from engineering telemetry, allowing compliance teams to update tags without triggering pipeline redeployments. The `steward` field creates accountability, which auditors require.
### Step 3: Graph Construction & Backfill
Lineage is a directed acyclic graph (DAG). Store it in a graph database for efficient traversal. Configure an initial backfill to reconstruct historical dependencies, then switch to incremental polling.
import { GraphStore } from '@codcompass/lineage-graph';
/**
 * Graph store handle for lineage data, configured with the `neo4j` driver.
 * The URI and password are read from the `NEO4J_URI` and `NEO4J_PASS`
 * environment variables when this module is evaluated.
 */
const graph = new GraphStore({
  driver: 'neo4j',
  uri: process.env.NEO4J_URI,
  credentials: {
    username: 'lineage_admin',
    password: process.env.NEO4J_PASS,
  },
});
/** One source -> target row in the shape the Cypher statement reads. */
interface LineageEdgeRow {
  source: string;
  target: string;
  transformationType: unknown;
  timestamp: unknown;
  executionStatus: unknown;
}

/**
 * Flattens execution logs into one row per (source table, target table) pair.
 *
 * Accepts both the `sourceObjects` (array) / `targetObject` shape and the
 * legacy `source` / `target` shape. Logs without a non-empty string target, and
 * sources that are not non-empty strings, are dropped because `MERGE` cannot
 * be given a null property value.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- logs arrive untyped from the caller
function toLineageEdges(executionLogs: readonly any[]): LineageEdgeRow[] {
  const isPath = (value: unknown): value is string =>
    typeof value === 'string' && value.length > 0;

  return executionLogs.flatMap(log => {
    const target: unknown = log?.target ?? log?.targetObject;
    if (!isPath(target)) {
      return [];
    }

    const rawSources: unknown = log.source ?? log.sourceObjects;
    const sources: unknown[] = Array.isArray(rawSources) ? rawSources : [rawSources];

    return sources.filter(isPath).map(source => ({
      source,
      target,
      // Absent metadata is sent as an explicit null rather than undefined.
      transformationType: log.transformationType ?? null,
      timestamp: log.timestamp ?? null,
      executionStatus: log.executionStatus ?? null,
    }));
  });
}

/**
 * Upserts table-to-table lineage edges derived from execution logs.
 *
 * Every log is expanded into one edge per source table, so a statement that
 * reads several tables produces several `DERIVES_FROM` relationships. The
 * relationship is stored from the source table node to the target table node,
 * and its metadata properties are overwritten on every run.
 *
 * @param executionLogs - Lineage records carrying `sourceObjects` and
 *   `targetObject` (or a single `source` and `target`), plus
 *   `transformationType`, `timestamp` and `executionStatus`.
 * @returns Resolves once the write has completed and the session is closed.
 *   Resolves immediately, without opening a session, when no usable edge
 *   could be derived from the logs.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- signature kept for existing callers
async function buildLineageDAG(executionLogs: any[]) {
  const edges = toLineageEdges(executionLogs);
  if (edges.length === 0) {
    return;
  }

  const session = graph.driver.session();
  try {
    await session.executeWrite(tx => {
      const query = `
        UNWIND $logs AS log
        MERGE (src:Table {path: log.source})
        MERGE (tgt:Table {path: log.target})
        MERGE (src)-[r:DERIVES_FROM]->(tgt)
        SET r.transformationType = log.transformationType,
            r.lastUpdated = log.timestamp,
            r.executionStatus = log.executionStatus
      `;
      return tx.run(query, { logs: edges });
    });
  } finally {
    // Close the session whether or not the write succeeded.
    await session.close();
  }
}
/** Scheduler settings for the lineage sync job. */
const lineageScheduler = {
  // Minutes between sync runs.
  intervalMinutes: 15,
  // Days of history covered by the backfill.
  backfillDays: 90,
  // Up to three attempts, with a 5000 ms backoff value.
  retryPolicy: {
    maxAttempts: 3,
    backoffMs: 5000,
  },
};
**Why this works:** Neo4j (or equivalent) optimizes path traversal. The `MERGE` pattern prevents duplicate edges while updating execution metadata. The scheduler runs as a background job, not a blocking process. Backfill reconstructs historical context, which is critical for compliance windows.
### Step 4: Compliance Query & Exposure Detection
Auditors and security teams need programmatic access to trace data flow paths, verify masking policies, and detect unauthorized exposure.
/**
 * Finds `restricted` or `confidential` tables linked to `targetTable` through
 * `DERIVES_FROM` relationships, searching between 1 and `maxDepth` hops.
 *
 * @param targetTable - Value of the `path` property of the table to start from.
 * @param maxDepth - Upper bound on the number of hops; must be a positive integer.
 * @returns One entry per matching path:
 *   - `path`: the `path` property of every node on the path, from the target
 *     table through to the matched table (inclusive);
 *   - `source`: the matched table's `path`;
 *   - `tags`: its `regulatoryTags`, or `[]` when the property is not an array;
 *   - `riskLevel`: `'HIGH'` when the tags include `'pii'`, otherwise `'MEDIUM'`.
 * @throws RangeError if `maxDepth` is not a positive safe integer.
 */
async function traceExposurePaths(targetTable: string, maxDepth: number) {
  // Cypher does not accept a parameter as a variable-length bound, so the depth
  // is inlined into the query text. Validating it as an integer first keeps the
  // interpolation injection-safe. The check runs before a session is opened.
  if (!Number.isSafeInteger(maxDepth) || maxDepth < 1) {
    throw new RangeError(`maxDepth must be a positive integer, received ${maxDepth}`);
  }

  const session = graph.driver.session();
  try {
    const result = await session.run(
      `
      MATCH path = (start:Table {path: $target})<-[:DERIVES_FROM*1..${maxDepth}]-(upstream:Table)
      WHERE upstream.classification IN ['restricted', 'confidential']
      RETURN path, upstream.path AS sourcePath, upstream.regulatoryTags AS tags
      `,
      { target: targetTable },
    );

    return result.records.map(record => {
      const path = record.get('path');
      const rawTags = record.get('tags');
      // A node without regulatoryTags yields null; treat that as "no tags".
      const tags: string[] = Array.isArray(rawTags) ? rawTags : [];

      // Driver path segments expose `start` / `end` nodes. Taking the path's
      // first node plus each segment's `end` lists every node exactly once.
      const nodes = [path.start, ...path.segments.map(segment => segment.end)];

      return {
        path: nodes.map(node => node.properties.path),
        source: record.get('sourcePath'),
        tags,
        riskLevel: tags.includes('pii') ? 'HIGH' : 'MEDIUM',
      };
    });
  } finally {
    // Close the session whether or not the query succeeded.
    await session.close();
  }
}
// Usage: trace classified tables linked to the financial summary within
// five hops, then print the report as pretty-printed JSON.
const exposureReport = await traceExposurePaths('PROD_DB.ANALYTICS.FINANCIAL_SUMMARY', 5);
const formattedReport = JSON.stringify(exposureReport, null, 2);
console.log(formattedReport);
**Why this works:** Cypher (or equivalent graph query language) efficiently traverses upstream dependencies. Filtering by classification tags isolates compliance-relevant paths. The `maxDepth` parameter prevents unbounded traversal in large warehouses.
Pitfall Guide
1. Proxy Interception Overhead
Explanation: Routing queries through a middleware proxy to capture lineage adds network latency, breaks TLS termination, and complicates connection pooling.
Fix: Use native system tables (account_usage, INFORMATION_SCHEMA, query_log) for read-only introspection. Never sit between the application and the database.
2. Ignoring Execution Context
Explanation: Tracking only column-to-column dependencies misses execution status, job versions, and failure states. This leaves incident response blind to why a transformation broke.
Fix: Ingest execution logs alongside schema changes. Store executionStatus, jobVersion, and runTimestamp as edge properties in the graph.
3. Static Tagging Without Ownership
Explanation: Regulatory tags drift when no steward is assigned. Auditors reject lineage maps that lack accountability records.
Fix: Bind every classified asset to an active steward email or team alias. Enforce quarterly review cycles via governance workflows.
4. Skipping Historical Backfill
Explanation: Starting the crawler today leaves a compliance gap for the past 30–90 days. Auditors require continuous coverage, not point-in-time snapshots.
Fix: Configure an initial backfill window aligned with warehouse retention policies. Validate completeness by comparing backfilled edge counts against DDL audit logs.
5. Over-Indexing Transient Queries
Explanation: Logging every SELECT or temporary table creation creates graph noise, slows traversal, and inflates storage costs.
Fix: Filter ingestion to DDL and materialized DML (CREATE, INSERT, MERGE, ALTER). Exclude session-scoped temp tables and ad-hoc analytics queries.
6. Hardcoding String Path Matching
Explanation: Relying on string comparison for table names breaks when schemas are renamed or databases are migrated.
Fix: Resolve lineage using internal catalog object IDs where available. Fall back to normalized database.schema.table paths with case-insensitive matching.
7. Ignoring Access Logs
Explanation: Lineage shows data flow but not data exposure. Auditors require proof of who accessed sensitive columns and when.
Fix: Integrate IAM/role usage logs alongside query history. Map grantee and privilege_type to graph nodes to reconstruct access paths.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Single cloud warehouse (Snowflake/BigQuery) | Native query history introspection | Zero latency, leverages platform telemetry, simple IAM setup | Low (storage only) |
| Hybrid/on-prem + cloud | ETL metadata extraction + graph sync | On-prem systems lack unified query logs; requires connector layer | Medium (connector licensing) |
| Heavy dbt/Airflow dependency | DAG parsing + execution log ingestion | dbt already tracks model dependencies; Airflow provides run status | Low (open-source tooling) |
| Strict compliance (SOX/HIPAA) | Introspection + access log integration | Auditors require data flow + access proof; graph traversal satisfies both | Medium (log storage + graph compute) |
Configuration Template
# lineage-engine.config.yaml

# Warehouse query-history ingestion.
telemetry:
  provider: snowflake
  role: LINEAGE_OBSERVER
  scan_interval_minutes: 15
  backfill_days: 90
  # Query types to ingest.
  query_filters:
    - CREATE_TABLE_AS_SELECT
    - INSERT
    - MERGE
    - ALTER_TABLE

# Lineage graph storage; connection values come from environment variables.
graph:
  type: neo4j
  uri: ${NEO4J_URI}
  credentials: ${NEO4J_AUTH}
  indexes:
    - property: path
      type: BTREE
    - property: lastUpdated
      type: BTREE

# Classification storage and governance settings.
compliance:
  classification_db: postgresql
  steward_enforcement: true
  review_cycle_days: 90
  risk_thresholds:
    pii_exposure: HIGH
    financial_data: MEDIUM
    public_data: LOW

# Alert routing.
monitoring:
  slack_webhook: ${SLACK_ALERT_URI}
  alert_on:
    - unmasked_pii_path
    - failed_backfill
    - steward_missing
Quick Start Guide
- Provision credentials: Create a read-only warehouse role with access to system metadata tables. Export connection strings for the telemetry client and graph database.
- Deploy the engine: Run the lineage service container or serverless function. Mount the configuration template and inject environment variables.
- Execute backfill: Trigger the initial 90-day historical scan. Monitor logs for edge ingestion rate and graph commit success.
- Validate coverage: Run a test traversal against a known sensitive table. Verify upstream paths, classification tags, and masking policy references match expected values.
- Enable scheduling: Activate the 15-minute incremental sync. Configure Slack/email alerts for HIGH risk exposure paths and backfill failures. Lineage is now live and audit-ready.