1. Unidirectional WAL Stream: The pipeline attaches to PostgreSQL through a logical replication slot and tails the Write-Ahead Log (WAL) using the pgoutput logical decoding plugin. This captures committed transactions without running queries against the source tables, so the added load on the source database is limited to WAL decoding and streaming.
2. Schema-Aware Parquet Serialization: Row-level changes are transformed into columnar Parquet format. The writer maintains schema evolution awareness, partitioning data by ingestion windows and applying background compaction. This optimizes storage layout for analytical scans while preserving immutability.
3. IAM-Only Access: Output is written to append-only object storage (e.g., S3-compatible buckets). No database credentials are required; access is mediated exclusively through IAM roles and temporary session tokens. This enforces least-privilege access at the infrastructure level.
4. Catalog-Driven Governance: Consumers interact with a governed catalog that enforces access via column-level tags, row-level filters, and audit logging. Every read operation is tracked, and policies are applied dynamically based on metadata rather than static grants.
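To make the ingestion-window partitioning from item 2 concrete, the sketch below derives an object-storage prefix from a change's commit time. The function name `partitionPrefix` and the Hive-style `dt=`/`hour=` layout are assumptions for illustration; the actual writer may lay out files differently.

```typescript
type PartitionStrategy = 'hourly' | 'daily';

// Zero-pad a number to two digits, e.g. 9 -> "09".
function pad2(n: number): string {
  return n < 10 ? '0' + n : String(n);
}

// Build a partition prefix from the commit time, always in UTC so that
// writers in different time zones agree on the window.
function partitionPrefix(
  table: string,
  commitTime: Date,
  strategy: PartitionStrategy
): string {
  const day =
    commitTime.getUTCFullYear() +
    '-' + pad2(commitTime.getUTCMonth() + 1) +
    '-' + pad2(commitTime.getUTCDate());
  const base = `${table}/dt=${day}`;
  return strategy === 'hourly'
    ? `${base}/hour=${pad2(commitTime.getUTCHours())}`
    : base;
}
```

Keying the prefix on commit time rather than wall-clock arrival time keeps a replayed batch in the same partition it originally belonged to.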
Implementation Example
The following TypeScript examples demonstrate the configuration of a governed data plane and the agent-side client interface. This replaces direct database connections with a secure, catalog-mediated query surface.
Pipeline Configuration
import { z } from 'zod';

// Zod schema for pipeline configuration validation
const PipelineConfigSchema = z.object({
  source: z.object({
    host: z.string(),
    port: z.number().default(5432),
    database: z.string(),
    replicationSlot: z.string().min(1),
    publicationName: z.string().min(1),
    walKeepSize: z.string().default('1GB'),
    maxSlotLag: z.number().default(1024 * 1024 * 100), // 100MB
  }),
  sink: z.object({
    storageUri: z.string().url(),
    format: z.enum(['parquet']),
    compression: z.enum(['zstd', 'snappy', 'gzip']).default('zstd'),
    partitionStrategy: z.enum(['hourly', 'daily']).default('hourly'),
    compaction: z.object({
      targetFileSize: z.number().default(128 * 1024 * 1024), // 128MB
      maxSmallFiles: z.number().default(50),
      schedule: z.string().default('0 */6 * * *'), // Every 6 hours
    }),
  }),
  governance: z.object({
    catalogUri: z.string().url(),
    tagEnforcement: z.boolean().default(true),
    auditLogging: z.boolean().default(true),
  }),
});

type PipelineConfig = z.infer<typeof PipelineConfigSchema>;

// Factory function to create validated pipeline config
export function createPipelineConfig(raw: unknown): PipelineConfig {
  return PipelineConfigSchema.parse(raw);
}
Agent Query Client
import { GovernedCatalogClient } from './catalog-client';
import { IAMTokenProvider } from './iam-provider';

interface QueryPolicy {
  allowedColumns: string[];
  rowFilters?: Record<string, unknown>;
  maxScanSize: number;
}

export class AgentDataClient {
  private catalog: GovernedCatalogClient;
  private tokenProvider: IAMTokenProvider;

  constructor(catalogUri: string, tokenProvider: IAMTokenProvider) {
    this.catalog = new GovernedCatalogClient(catalogUri);
    this.tokenProvider = tokenProvider;
  }

  /**
   * Executes a query against the governed data plane.
   * The catalog enforces column-level tags and row-level filters.
   * No database credentials are used; access is mediated via IAM.
   */
  async executeQuery(
    tableName: string,
    query: string,
    policy: QueryPolicy
  ): Promise<Record<string, unknown>[]> {
    // 1. Obtain ephemeral IAM token for storage access
    const token = await this.tokenProvider.getTemporaryToken({
      resource: `arn:aws:s3:::data-lake/${tableName}`,
      actions: ['s3:GetObject'],
    });

    // 2. Resolve table metadata and enforce governance tags
    const metadata = await this.catalog.resolveTable(tableName);
    this.validatePolicy(metadata, policy);

    // 3. Execute scan against Parquet files via catalog
    const results = await this.catalog.scan(
      metadata.location,
      query,
      {
        token,
        rowFilters: policy.rowFilters,
        projection: policy.allowedColumns,
      }
    );

    // 4. Audit logging is handled by the catalog automatically
    return results;
  }

  private validatePolicy(
    metadata: TableMetadata,
    policy: QueryPolicy
  ): void {
    const unauthorizedColumns = policy.allowedColumns.filter(
      (col) => !metadata.tags.includes(`column:${col}`)
    );
    if (unauthorizedColumns.length > 0) {
      throw new Error(
        `Access denied for columns: ${unauthorizedColumns.join(', ')}`
      );
    }
  }
}

interface TableMetadata {
  location: string;
  tags: string[];
  schema: Record<string, string>;
}
Rationale:
- Zod Validation: Ensures configuration integrity at startup, preventing runtime failures due to malformed settings.
- Ephemeral Tokens: The agent never holds long-lived credentials. Tokens are scoped to specific resources and actions, reducing the impact of token leakage.
- Catalog Mediation: The GovernedCatalogClient abstracts storage details and enforces policies. This allows governance to evolve independently of the agent code.
- Policy Validation: Client-side checks provide fast feedback, but the catalog enforces policies server-side, ensuring defense in depth.
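The point about tokens being scoped to specific resources can be illustrated with a small helper that builds the resource ARN for a token request. This is a sketch under assumptions: the helper name `scopedTableResource`, the strict table-name pattern, and the one-prefix-per-table layout are all hypothetical and not part of the client shown above.

```typescript
// Conservative pattern for table names: lowercase letters, digits, underscores.
// Rejecting everything else keeps "*", "/" and ".." out of the resource ARN.
const TABLE_NAME_PATTERN = /^[a-z_][a-z0-9_]*$/;

// Build an S3 object ARN limited to a single table's prefix.
function scopedTableResource(bucket: string, tableName: string): string {
  if (!TABLE_NAME_PATTERN.test(tableName)) {
    throw new Error(`Refusing to scope a token to table name "${tableName}"`);
  }
  // The trailing "/*" matches the objects under the table prefix only.
  return `arn:aws:s3:::${bucket}/${tableName}/*`;
}
```

Validating the table name before interpolation means a crafted name cannot widen the scope of the requested token.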
Pitfall Guide
1. Replication Slot Exhaustion
PostgreSQL replication slots retain WAL segments until consumed. If the CDC pipeline falls behind or experiences downtime, the slot lag grows, eventually consuming disk space and potentially crashing the database.
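- Fix: Monitor pg_replication_slots for restart_lsn and confirmed_flush_lsn, and alert when lag exceeds a threshold. Note that wal_keep_size does not limit the WAL a slot retains; set PostgreSQL's max_slot_wal_keep_size (available since PostgreSQL 13) so that a slot that falls too far behind is invalidated instead of exhausting the disk. A pipeline-level threshold such as maxSlotLag can trigger alerts before that limit is reached.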
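A lag check of this kind can be sketched as follows. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits of a 64-bit position), so the byte lag is the difference between the current WAL position and the slot's confirmed_flush_lsn. The function names here are illustrative assumptions; on the server side, pg_wal_lsn_diff performs the same subtraction.

```typescript
// Split an LSN such as "16/B374D848" into its high and low 32-bit halves.
function parseLsn(lsn: string): { hi: number; lo: number } {
  const match = /^([0-9A-Fa-f]{1,8})\/([0-9A-Fa-f]{1,8})$/.exec(lsn);
  if (!match) {
    throw new Error(`Invalid LSN: ${lsn}`);
  }
  return { hi: parseInt(match[1], 16), lo: parseInt(match[2], 16) };
}

// Byte distance from `behind` to `ahead`. Subtracting the halves separately
// keeps the result exact as long as the difference itself is below 2^53.
function lsnDiffBytes(ahead: string, behind: string): number {
  const a = parseLsn(ahead);
  const b = parseLsn(behind);
  return (a.hi - b.hi) * 4294967296 + (a.lo - b.lo);
}

// True when the slot has fallen further behind than the allowed byte budget.
function slotLagExceeded(
  currentWalLsn: string,
  confirmedFlushLsn: string,
  maxLagBytes: number
): boolean {
  return lsnDiffBytes(currentWalLsn, confirmedFlushLsn) > maxLagBytes;
}
```

In practice the two inputs would come from pg_current_wal_lsn() and the confirmed_flush_lsn column of pg_replication_slots.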
2. Schema Drift Catastrophe
DDL operations like ALTER TABLE or column drops can break logical replication if not handled. The pipeline may fail to parse new WAL entries, causing ingestion to halt.
- Fix: Implement schema versioning and evolution strategies. Use additive changes only where possible. Integrate a schema registry to validate DDL changes against pipeline capabilities. Partition Parquet layouts by schema version to isolate breaking changes.
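One way to check whether a DDL change is additive is to diff the column maps before and after the change. The sketch below assumes schemas are represented as a column-name-to-type map, as in the TableMetadata interface above; the `diffSchema` name and the rule that drops and type changes count as breaking are assumptions for illustration.

```typescript
type ColumnTypes = Record<string, string>;

interface SchemaDiff {
  added: string[];
  dropped: string[];
  retyped: string[];
  breaking: boolean;
}

// Own-property check that ignores inherited keys such as "toString".
function hasColumn(schema: ColumnTypes, column: string): boolean {
  return Object.prototype.hasOwnProperty.call(schema, column);
}

// Compare two schema versions. Added columns are treated as safe;
// dropped or retyped columns are flagged as breaking.
function diffSchema(previous: ColumnTypes, next: ColumnTypes): SchemaDiff {
  const added = Object.keys(next).filter((c) => !hasColumn(previous, c));
  const dropped = Object.keys(previous).filter((c) => !hasColumn(next, c));
  const retyped = Object.keys(next).filter(
    (c) => hasColumn(previous, c) && previous[c] !== next[c]
  );
  return {
    added,
    dropped,
    retyped,
    breaking: dropped.length > 0 || retyped.length > 0,
  };
}
```

A breaking diff could then route new files to a fresh schema-version partition rather than halting ingestion.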
3. The Small File Spiral
Frequent micro-batches can generate thousands of small Parquet files, degrading query performance and increasing storage costs.
- Fix: Tune compaction thresholds based on ingestion rate. Set targetFileSize to 128MB or higher. Schedule compaction jobs to run during off-peak hours. Monitor file count and size distribution to detect drift.
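How the targetFileSize and maxSmallFiles settings might interact can be sketched with a simple planner that groups small files into merge batches. The semantics assumed here (compaction starts only once the small-file count exceeds maxSmallFiles, and files are packed greedily in listing order) are guesses for illustration, not the pipeline's documented behavior.

```typescript
interface DataFile {
  path: string;
  sizeBytes: number;
}

// Group files smaller than the target into batches whose combined size
// stays at or below targetFileSize. Returns an empty plan while the number
// of small files is still within the tolerated limit.
function planCompaction(
  files: DataFile[],
  targetFileSize: number,
  maxSmallFiles: number
): DataFile[][] {
  const small = files.filter((f) => f.sizeBytes < targetFileSize);
  if (small.length <= maxSmallFiles) {
    return [];
  }
  const groups: DataFile[][] = [];
  let current: DataFile[] = [];
  let currentSize = 0;
  for (const file of small) {
    // Close the current batch when adding this file would overshoot the target.
    if (current.length > 0 && currentSize + file.sizeBytes > targetFileSize) {
      groups.push(current);
      current = [];
      currentSize = 0;
    }
    current.push(file);
    currentSize += file.sizeBytes;
  }
  if (current.length > 0) {
    groups.push(current);
  }
  // Rewriting a single file on its own gains nothing, so drop such batches.
  return groups.filter((g) => g.length > 1);
}
```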
4. IAM Policy Bloat
Replacing database credentials with IAM roles requires strict least-privilege enforcement. Broad policies like s3:GetObject on the entire bucket defeat the air-gap model.
- Fix: Scope policies to specific prefixes, tags, and read-only actions. Use IAM policy validation in CI/CD to catch over-permissive configurations. Rotate session tokens frequently and monitor for anomalous access patterns.
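A CI check for over-permissive policies could look roughly like the sketch below, which walks the statements of an IAM policy document and reports Allow statements that use actions outside a read-only allow-list or resources outside a required prefix. The function name, the single-action allow-list, and the prefix rule are assumptions; a real pipeline would more likely rely on a dedicated policy analysis tool.

```typescript
interface PolicyStatement {
  Effect: 'Allow' | 'Deny';
  Action: string | string[];
  Resource: string | string[];
}

interface PolicyDocument {
  Version: string;
  Statement: PolicyStatement[];
}

// Assumed allow-list: agents only ever need to read objects.
const READ_ONLY_ACTIONS = ['s3:GetObject'];

function toArray(value: string | string[]): string[] {
  return Array.isArray(value) ? value : [value];
}

// Return a human-readable list of violations; an empty list means the
// policy passed this (deliberately narrow) check.
function findPolicyViolations(
  policy: PolicyDocument,
  requiredPrefix: string
): string[] {
  const violations: string[] = [];
  policy.Statement.forEach((stmt, i) => {
    if (stmt.Effect !== 'Allow') {
      return; // Deny statements cannot widen access.
    }
    for (const action of toArray(stmt.Action)) {
      if (READ_ONLY_ACTIONS.indexOf(action) === -1) {
        violations.push(`Statement ${i}: action "${action}" is not read-only`);
      }
    }
    for (const resource of toArray(stmt.Resource)) {
      if (resource.indexOf(requiredPrefix) !== 0) {
        violations.push(`Statement ${i}: resource "${resource}" is outside ${requiredPrefix}`);
      }
    }
  });
  return violations;
}
```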
5. Temporal Mismatch
CDC introduces eventual consistency, typically with 2-5 seconds of lag. Agents requiring sub-second transactional accuracy may encounter stale reads, leading to hallucinations or incorrect business logic.
- Fix: Architect agents for eventual consistency explicitly. Use time-travel queries to read historical snapshots. For critical paths that require strong consistency, implement a fallback such as a direct API call for the specific operation.
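The fallback decision can be made explicit in code. The sketch below assumes the agent can learn the commit time of the most recent change applied to the lake (for example from catalog metadata) and compares the resulting staleness against a per-request budget; the names `chooseReadPath` and `ReadPath` are hypothetical.

```typescript
type ReadPath = 'lake' | 'strong-consistency-fallback';

// Pick the read path for a request given how far the lake trails the source.
// All times are epoch milliseconds.
function chooseReadPath(
  lastAppliedCommitMs: number,
  nowMs: number,
  maxStalenessMs: number
): ReadPath {
  const stalenessMs = nowMs - lastAppliedCommitMs;
  return stalenessMs <= maxStalenessMs ? 'lake' : 'strong-consistency-fallback';
}
```

An analytics question might tolerate minutes of staleness, while a balance check before a write might set the budget to zero and always take the fallback.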
Without column-level tagging and catalog registration during the Parquet write phase, downstream access control and audit trails cannot be enforced. Retroactive tagging is error-prone and incomplete.
- Fix: Enforce schema-first tagging. Integrate metadata extraction into the serialization pipeline. Validate tags against governance policies before writing to storage. Use automated tools to scan and tag legacy data.
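Validating tags before writing can be as simple as refusing any batch whose schema contains a column without at least one tag. The sketch below assumes tags are supplied as a column-name-to-tag-list map; the function names and the tag vocabulary in the usage note are illustrative only.

```typescript
// Return the columns that have no governance tag at all.
function findUntaggedColumns(
  schema: Record<string, string>,
  columnTags: Record<string, string[]>
): string[] {
  return Object.keys(schema).filter((column) => {
    const tags = columnTags[column];
    return !Array.isArray(tags) || tags.length === 0;
  });
}

// Throw before any Parquet file is written if a column is untagged.
function assertFullyTagged(
  schema: Record<string, string>,
  columnTags: Record<string, string[]>
): void {
  const untagged = findUntaggedColumns(schema, columnTags);
  if (untagged.length > 0) {
    throw new Error(`Refusing to write: untagged columns ${untagged.join(', ')}`);
  }
}
```

For example, a schema with `id`, `email`, and `ssn` columns and tags only for `id` and `email` would be rejected until `ssn` is classified.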
7. Compression Codec Misalignment
Choosing the wrong compression codec hurts both query performance and storage costs. ZSTD typically achieves a higher compression ratio at a higher CPU cost, while SNAPPY is faster but compresses less.
- Fix: Benchmark codecs against your workload. Use ZSTD for cold storage and SNAPPY for hot data. Configure codec selection based on data volatility and access patterns. Monitor CPU utilization during compaction to avoid resource contention.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Real-Time Trading | Direct DB/Proxy | Latency critical; sub-ms response required | High Risk, Low Latency Cost |
| AI Agent Analytics | CDC Air-Gap | Safety critical; blast radius containment | Low Risk, Moderate Latency Cost |
| Compliance Audit | CDC Air-Gap | Immutable trail; catalog-driven governance | Medium Cost, High Compliance Value |
| Ad-Hoc Reporting | CDC Air-Gap | No impact on OLTP; scalable analytics | Low Risk, Storage Cost |
Configuration Template
// production-pipeline.config.ts
import { createPipelineConfig } from './pipeline-config';

export const productionConfig = createPipelineConfig({
  source: {
    host: 'prod-db.internal',
    port: 5432,
    database: 'app_production',
    replicationSlot: 'cdc_pipeline_slot',
    publicationName: 'cdc_publication',
    walKeepSize: '2GB',
    maxSlotLag: 209715200, // 200MB
  },
  sink: {
    storageUri: 's3://governed-data-lake/parquet',
    format: 'parquet',
    compression: 'zstd',
    partitionStrategy: 'hourly',
    compaction: {
      targetFileSize: 134217728, // 128MB
      maxSmallFiles: 50,
      schedule: '0 */6 * * *',
    },
  },
  governance: {
    catalogUri: 'https://catalog.internal/api',
    tagEnforcement: true,
    auditLogging: true,
  },
});
Quick Start Guide
- Enable Logical Replication: Configure PostgreSQL with wal_level = logical and create a publication for the tables you want to replicate.
- Deploy Pipeline: Use the configuration template to deploy the CDC pipeline. Ensure replication slots are created and WAL consumption begins.
- Configure IAM: Create IAM roles for the pipeline and agents. Scope policies to specific storage prefixes and actions.
- Register Catalog: Initialize the governed catalog and register table metadata with column-level tags.
- Validate: Run test queries through the agent client to verify governance enforcement and audit logging. Monitor ingestion lag and compaction metrics.
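The first step can be scripted. The sketch below generates the SQL statements for enabling logical replication, creating a publication, and creating a pgoutput slot, using the names from the configuration template. The helper names are hypothetical, and the identifier check assumes unqualified lowercase table names; changing wal_level also requires a server restart before it takes effect.

```typescript
// Accept only simple lowercase identifiers so that names can be
// interpolated into SQL without quoting concerns.
const SQL_IDENTIFIER = /^[a-z_][a-z0-9_]*$/;

function safeIdentifier(name: string): string {
  if (!SQL_IDENTIFIER.test(name)) {
    throw new Error(`Unsafe SQL identifier: ${name}`);
  }
  return name;
}

// Produce the bootstrap statements in the order they should be run.
function bootstrapStatements(
  publicationName: string,
  slotName: string,
  tables: string[]
): string[] {
  if (tables.length === 0) {
    throw new Error('At least one table is required for the publication');
  }
  const tableList = tables.map((t) => safeIdentifier(t)).join(', ');
  return [
    "ALTER SYSTEM SET wal_level = 'logical'",
    `CREATE PUBLICATION ${safeIdentifier(publicationName)} FOR TABLE ${tableList}`,
    `SELECT pg_create_logical_replication_slot('${safeIdentifier(slotName)}', 'pgoutput')`,
  ];
}
```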