# Building Observable RAG Pipelines Inside PostgreSQL: A Local-First Architecture

Fully open-source RAG with pgvector + pgai + Ollama, and ragvitals watching for drift.
## Current Situation Analysis
Shipping a Retrieval-Augmented Generation (RAG) system to production is rarely the challenge. Keeping it stable after the first model swap, corpus expansion, or embedder upgrade is. Teams consistently encounter silent degradation: retrieval recall drops, generation faithfulness erodes, and query intent distributions shift. Because these changes are semantic rather than infrastructural, traditional observability stacks miss them entirely. Latency, error rates, and throughput dashboards remain green while the actual intelligence pipeline deteriorates.
The root cause is architectural fragmentation. Most RAG implementations stitch together separate services: a vector database for storage, an external embedding API, a cloud LLM endpoint, and a third-party evaluation framework. Each hop introduces latency, serialization overhead, and blind spots. When a metric degrades, engineers spend days tracing whether the fault lies in the embedding space, the retrieval algorithm, the prompt template, or the generative model. Without a unified baseline, component isolation is guesswork.
This problem is systematically overlooked because teams treat RAG as a static function rather than a dynamic system. Evaluation is often performed offline during development, then abandoned in production. The industry lacks lightweight, open-source tooling that bridges the gap between database operations and semantic monitoring. The result is a reliance on user complaints or manual spot-checks to detect drift.
Data from production deployments shows that changing a single component produces predictable, isolated shifts across specific evaluation dimensions. Swapping an embedder alters query and embedding distributions but leaves generation quality stable. Replacing the LLM impacts faithfulness and relevance scores while retrieval metrics remain flat. A multi-dimensional monitoring layer that tracks these shifts independently transforms RAG from a black box into a diagnosable system.
## WOW Moment: Key Findings
The critical insight is that semantic drift is not monolithic. By tracking five distinct dimensions simultaneously, you can isolate exactly which layer of the pipeline changed and quantify the impact. The following table maps component modifications to their observable effects across the monitoring dimensions:
| Component Changed | Query Distribution | Embedding Drift | Retrieval Relevance | Response Quality | Judge Drift |
|---|---|---|---|---|---|
| Swap Embedder | 🔴 Shift | 🔴 Shift | 🟡 Minor | 🟢 Stable | 🟢 Stable |
| Swap Generator | 🟢 Stable | 🟢 Stable | 🟢 Stable | 🔴 Shift | 🔴 Shift |
| Update Corpus | 🔴 Shift | 🟡 Minor | 🔴 Shift | 🟡 Minor | 🟢 Stable |
| Modify Prompt | 🟢 Stable | 🟢 Stable | 🟢 Stable | 🔴 Shift | 🔴 Shift |
| Baseline (Normal) | 🟢 Stable | 🟢 Stable | 🟢 Stable | 🟢 Stable | 🟢 Stable |
This isolation capability matters because it eliminates diagnostic ambiguity. Instead of rolling back an entire stack when quality drops, you can pinpoint whether the retriever needs tuning, the embedder requires re-calibration, or the generator is hallucinating. The system becomes self-auditing. Engineers gain confidence to iterate on models and data because every change is immediately measurable against a rolling baseline. This transforms RAG operations from reactive firefighting to proactive optimization.
## Core Solution
The architecture centers on three principles: consolidation, locality, and continuous evaluation. By embedding AI operations directly into PostgreSQL, eliminating external API dependencies, and attaching a lightweight drift detector to every inference call, you create a pipeline that is both operationally simple and semantically transparent.
### Step 1: Infrastructure Initialization
Start with a PostgreSQL instance extended for vector operations and inline AI execution. The TimescaleDB distribution provides production-ready extensions without additional orchestration.
```sql
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS ai CASCADE;
```
The `vector` extension handles high-dimensional storage and similarity search. The `ai` extension (pgai) exposes Ollama as a native SQL interface, allowing embedding and generation to execute within the database engine. This removes network serialization, connection pooling complexity, and cross-service fault tolerance concerns.
### Step 2: Schema Design & Vectorization Strategy
Design a single source-of-truth table. Avoid separating raw text from embeddings unless you have strict compliance requirements requiring data partitioning.
```sql
CREATE TABLE knowledge_store (
    doc_id TEXT PRIMARY KEY,
    raw_text TEXT NOT NULL,
    doc_vector VECTOR(768) NOT NULL,
    indexed_at TIMESTAMPTZ DEFAULT NOW()
);
```
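Once the corpus grows past a few hundred thousand rows, sequential scans over the vector column become the retrieval bottleneck. pgvector supports an approximate-nearest-neighbor HNSW index for the cosine operator; a minimal sketch with build parameters left at their defaults:

```sql
-- Approximate-nearest-neighbor index for the cosine (<=>) operator.
-- m and ef_construction can be tuned to trade recall against build time.
CREATE INDEX ON knowledge_store USING hnsw (doc_vector vector_cosine_ops);
```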
For ingestion, you have two patterns. Inline embedding works for batch loads or low-frequency updates. The vectorizer pattern is mandatory for production-scale continuous ingestion.
```sql
-- Inline embedding at insert time
INSERT INTO knowledge_store (doc_id, raw_text, doc_vector)
VALUES (
    'sys-arch-001',
    'Stateless microservices require external session stores for horizontal scaling.',
    ai.ollama_embed('nomic-embed-text', 'Stateless microservices require external session stores for horizontal scaling.')::vector
);
```

```sql
-- Production vectorizer configuration
SELECT ai.create_vectorizer(
    'knowledge_store'::regclass,
    embedding => ai.embedding_ollama('nomic-embed-text', 768),
    chunking => ai.chunking_recursive_character_text_splitter('raw_text', 512, 64),
    scheduling => ai.scheduling_continuous()
);
```
The vectorizer spawns a background worker that watches the base table, chunks text, generates embeddings, and maintains the vector column asynchronously. This prevents insert latency spikes and ensures the embedding pipeline scales independently of write throughput.
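Because embedding now happens asynchronously, it is worth checking the backlog before trusting retrieval freshness. Recent pgai releases expose a status view for this; treat the exact view name as version-dependent and verify it against the pgai release in use:

```sql
-- Inspect the embedding backlog (view name assumes a recent pgai release)
SELECT * FROM ai.vectorizer_status;
```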
### Step 3: Unified Retrieval & Generation
Combine semantic search and LLM inference into a single database round trip. The `<=>` operator computes cosine distance; subtracting it from 1 yields a similarity score suitable for ranking.
```sql
-- Context retrieval
SELECT doc_id, raw_text,
       1 - (doc_vector <=> ai.ollama_embed('nomic-embed-text', $1)::vector) AS similarity
FROM knowledge_store
ORDER BY doc_vector <=> ai.ollama_embed('nomic-embed-text', $1)::vector
LIMIT 5;
```
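The distance-to-similarity conversion is easy to sanity-check outside the database. A minimal plain-Python sketch of what `<=>` computes:

```python
import math

def cosine_distance(a, b):
    # What pgvector's <=> operator returns: 1 - cos(angle between a and b)
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm

def similarity(a, b):
    # The ranking score used in the SQL above
    return 1.0 - cosine_distance(a, b)
```

Identical vectors score 1.0 and orthogonal vectors score 0.0, which is why the `ORDER BY` ascends on distance while the reported score descends on similarity.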
Pass the retrieved context directly to the generation function. pgai returns structured JSON; extract the payload using PostgreSQL operators.
```sql
-- Inline generation
SELECT ai.ollama_generate('gemma2:9b',
    'Context:' || E'\n' || $1 || E'\n\nQuestion: ' || $2 || E'\nAnswer:'
)->>'response' AS generated_output;
```
Wrapping these in a PL/pgSQL function or a Python adapter standardizes the interface and enables consistent prompt templating.
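A minimal sketch of such a Python adapter, keeping the template as data rather than inline SQL string concatenation. The table, model, and helper names follow this article; the connection handling (e.g. via psycopg) is deliberately left out:

```python
# Hypothetical adapter around the queries above; build_prompt is illustrative.
PROMPT_TEMPLATE = "Context:\n{context}\n\nQuestion: {question}\nAnswer:"

RETRIEVE_SQL = """
SELECT raw_text
FROM knowledge_store
ORDER BY doc_vector <=> ai.ollama_embed('nomic-embed-text', %(q)s)::vector
LIMIT 5
"""

def build_prompt(context: str, question: str) -> str:
    # Single place to version or swap the template later
    return PROMPT_TEMPLATE.format(context=context, question=question)

# With a real driver, the adapter would execute RETRIEVE_SQL, join the
# returned rows into `context`, then pass build_prompt(...) to
# ai.ollama_generate in a second statement.
```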
### Step 4: Drift Detection Pipeline
Attach a monitoring layer that captures every inference cycle. The ragvitals library provides a dimension-aware detector that compares live traces against a rolling baseline.
```python
from datetime import datetime, timezone

from ragvitals import (
    Detector, EmbeddingDrift, InMemorySink, JudgeDrift,
    QueryDistribution, ResponseQuality, RetrievalRelevance, Trace
)


class SemanticMonitor:
    def __init__(self, k: int = 5):
        self.detector = Detector(
            dimensions=[
                QueryDistribution(),
                RetrievalRelevance(metric="hit_rate", k=k),
                EmbeddingDrift(),
                ResponseQuality(score_keys=["faithfulness", "relevance"]),
                JudgeDrift(score_key="faithfulness"),
            ],
            sinks=[InMemorySink(window_size=1000)],
        )
        self.k = k

    def calibrate(self, reference_embeddings: list, baseline_scores: dict):
        # Seed the distribution-based dimensions with the reference baseline.
        self.detector.dimensions[0].set_reference(reference_embeddings)  # QueryDistribution
        self.detector.dimensions[2].set_reference(reference_embeddings)  # EmbeddingDrift
        self.detector.dimensions[4].set_reference(baseline_scores)       # JudgeDrift

    def record(self, query: str, query_vec: list, retrieved: list,
               answer: str, judge_scores: dict, metadata: dict):
        trace = Trace(
            timestamp=datetime.now(timezone.utc),
            query=query,
            query_embedding=query_vec,
            retrieval_scores=[item[2] for item in retrieved],
            relevance_labels=[1] + [0] * (len(retrieved) - 1),
            response=answer,
            judge_scores=judge_scores,
            metadata=metadata,
        )
        self.detector.ingest(trace)

    def generate_report(self) -> dict:
        return self.detector.report()
```
Initialize the monitor once per deployment cycle. Feed it traces from every production query. The detector maintains statistical baselines and flags deviations across each dimension independently. When you swap `gemma2:9b` for `llama3.1:8b`, only `ResponseQuality` and `JudgeDrift` will trigger. When you update the corpus, `QueryDistribution` and `RetrievalRelevance` will shift. The architecture turns component changes into measurable signals.
## Pitfall Guide
1. Embedding Dimension Mismatch
Explanation: Hardcoding vector dimensions in the schema while using an embedder that outputs a different size causes silent truncation or insertion failures.
Fix: Query the embedder's output dimension programmatically before schema creation. Use ai.embedding_ollama('model-name', dimension) to enforce consistency. Validate dimensions during vectorizer initialization.
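A cheap guard, sketched in Python, that fails fast instead of truncating. The 768 default matches `nomic-embed-text` as used in this article; `validate_embedding` is an illustrative helper, not a library function:

```python
def validate_embedding(vec, expected_dim: int = 768):
    # Reject vectors whose length disagrees with the VECTOR(n) column
    if len(vec) != expected_dim:
        raise ValueError(
            f"embedder returned {len(vec)} dimensions, schema expects {expected_dim}"
        )
    return vec
```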
2. Synchronous Generation Blocking
Explanation: Running LLM generation inside the same transaction as retrieval blocks database connections while the model generates tokens. This collapses connection pools under concurrent load.
Fix: Decouple retrieval and generation. Fetch context synchronously, then dispatch generation to an async worker pool or message queue. Use pgai for retrieval only, and route generation through a dedicated inference service when QPS exceeds local GPU capacity.
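One way to sketch that decoupling with the standard library, using an asyncio queue as the dispatch layer. Here `generate_answer` is a stand-in for the real Ollama call (simulated with a sleep); only the queue/worker pattern is the point:

```python
import asyncio

async def generate_answer(prompt: str) -> str:
    # Stand-in for the real inference call; simulates model latency
    await asyncio.sleep(0.01)
    return f"answer for: {prompt}"

async def generation_worker(queue: asyncio.Queue, results: list):
    while True:
        prompt = await queue.get()
        if prompt is None:  # sentinel: shut this worker down
            queue.task_done()
            break
        results.append(await generate_answer(prompt))
        queue.task_done()

async def run_pool(prompts, n_workers: int = 4):
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(generation_worker(queue, results))
               for _ in range(n_workers)]
    for p in prompts:
        queue.put_nowait(p)
    for _ in workers:
        queue.put_nowait(None)  # one sentinel per worker
    await queue.join()
    return results
```

The database connection is released as soon as retrieval returns; generation latency is absorbed by the worker pool instead of the connection pool.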
3. Uninitialized Baselines
Explanation: Feeding live traces into the detector without setting reference distributions causes immediate false positives. The system flags normal variance as drift because it lacks a baseline.
Fix: Run a calibration phase with 500-1000 representative queries before enabling alerts. Store the reference embeddings and score distributions in a separate configuration table. Update baselines only during scheduled maintenance windows.
4. Cosine Similarity for Structured Data
Explanation: Pure vector search struggles with exact keyword matching, code syntax, or contractual clauses where semantic proximity is less important than lexical precision.
Fix: Implement hybrid search. Combine pgvector cosine similarity with pg_search or a dedicated Elasticsearch shard for BM25 scoring. Weight the results using a reciprocal rank fusion algorithm before passing to the generator.
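Reciprocal rank fusion is simple enough to implement directly: each ranked list contributes 1/(k + rank) per document, with k = 60 being the commonly used constant. A self-contained sketch:

```python
def reciprocal_rank_fusion(rankings, k: int = 60):
    # rankings: list of ranked doc-id lists (e.g. one from BM25, one from vectors)
    scores: dict = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)
```

A document ranked highly by both retrievers outscores one ranked first by only a single retriever, which is exactly the behavior you want before handing context to the generator.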
5. Missing Multi-Tenant Isolation
Explanation: Storing all documents in a single table exposes cross-tenant data during retrieval. Vector indexes do not enforce access control.
Fix: Enable PostgreSQL Row-Level Security (RLS). Add a tenant_id column and create policies that filter queries by session context. Alternatively, use schema-per-tenant for strict compliance requirements. Never rely on application-layer filtering alone.
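With a policy keyed on `app.current_tenant` (as in the Configuration Template below), each connection scopes itself before querying; a sketch of the per-session setup:

```sql
-- Scope this session to one tenant before any retrieval query
SET app.current_tenant = 'tenant_a';

-- The same similarity query now only sees tenant_a rows
SELECT id FROM document_index
ORDER BY embedding <=> $1
LIMIT 5;
```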
6. Prompt Template Hardcoding
Explanation: Embedding prompt strings directly in SQL functions or Python code makes versioning impossible. A/B testing prompt variations requires code deployments.
Fix: Store prompts in a prompt_registry table with versioning, effective dates, and rollback capabilities. Fetch the active template at runtime. Log which prompt version was used in the drift metadata for correlation analysis.
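A sketch of such a registry; the table and column names are illustrative, not a fixed schema:

```sql
-- Versioned prompt storage (names are illustrative)
CREATE TABLE prompt_registry (
    prompt_id      TEXT NOT NULL,
    version        INT NOT NULL,
    template       TEXT NOT NULL,
    active         BOOLEAN DEFAULT FALSE,
    effective_from TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (prompt_id, version)
);

-- Fetch the active template at runtime
SELECT template
FROM prompt_registry
WHERE prompt_id = 'rag-answer' AND active
ORDER BY version DESC
LIMIT 1;
```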
7. Treating Drift as Binary Failure
Explanation: Alerting on any deviation causes alert fatigue. Minor statistical noise triggers pages that require no action.
Fix: Implement threshold bands. Flag warn for deviations between 1.5 and 3 standard deviations and degraded for deviations beyond 3 standard deviations. Correlate drift spikes with deployment timestamps before triggering incident response.
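Threshold bands reduce to a z-score check against the rolling baseline window; a self-contained sketch of the classification logic:

```python
from statistics import mean, stdev

def classify_drift(value: float, baseline: list,
                   warn_sigma: float = 1.5, degraded_sigma: float = 3.0) -> str:
    # Compare a fresh metric value against the rolling baseline window
    mu, sigma = mean(baseline), stdev(baseline)
    if sigma == 0:
        return "ok"
    z = abs(value - mu) / sigma
    if z >= degraded_sigma:
        return "degraded"
    if z >= warn_sigma:
        return "warn"
    return "ok"
```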
## Production Bundle
### Action Checklist
- Initialize PostgreSQL with `vector` and `ai` extensions on a dedicated instance
- Define vector dimensions matching the chosen embedder before table creation
- Configure the vectorizer for continuous ingestion instead of inline embedding
- Implement hybrid search (BM25 + cosine) for structured or keyword-heavy corpora
- Enable Row-Level Security if supporting multiple tenants or departments
- Calibrate the drift detector with a representative query baseline before go-live
- Decouple generation from retrieval using async workers for high-throughput workloads
- Version prompt templates in a database registry and log usage in trace metadata
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| < 100k documents, single tenant | Inline pgai embedding + retrieval | Lowest operational overhead, single round trip | Minimal infrastructure cost |
| > 1M documents, high write volume | Vectorizer background worker + pgvectorscale | Prevents insert latency, scales to billions of vectors | Moderate storage cost, negligible compute |
| Strict compliance / multi-tenant | Schema-per-tenant + RLS policies | Guarantees data isolation, simplifies audit trails | Higher operational complexity |
| Code/contract retrieval | Hybrid BM25 + vector search | Preserves lexical precision while capturing semantics | Requires additional search index maintenance |
| High QPS generation | Async worker pool + local Ollama | Prevents DB connection exhaustion, enables GPU batching | Requires dedicated inference hardware |
### Configuration Template
```sql
-- Production schema with metadata tracking
CREATE TABLE document_index (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source_text TEXT NOT NULL,
    embedding VECTOR(768) NOT NULL,
    tenant_id TEXT NOT NULL,
    chunk_index INT DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Enable row-level security
ALTER TABLE document_index ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON document_index
    USING (tenant_id = current_setting('app.current_tenant', true));

-- Vectorizer setup for continuous sync
SELECT ai.create_vectorizer(
    'document_index'::regclass,
    embedding => ai.embedding_ollama('nomic-embed-text', 768),
    chunking => ai.chunking_recursive_character_text_splitter('source_text', 1024, 128),
    scheduling => ai.scheduling_continuous(interval '30 seconds')
);
```
```python
# Production drift monitor initialization
from ragvitals import (
    Detector, EmbeddingDrift, JsonlSink, JudgeDrift,
    QueryDistribution, ResponseQuality, RetrievalRelevance
)


def initialize_monitor(tenant: str, output_dir: str = "./drift_logs"):
    sink = JsonlSink(filepath=f"{output_dir}/{tenant}_drift.jsonl")
    monitor = Detector(
        dimensions=[
            QueryDistribution(),
            RetrievalRelevance(metric="hit_rate", k=5),
            EmbeddingDrift(),
            ResponseQuality(score_keys=["faithfulness", "relevance"]),
            JudgeDrift(score_key="faithfulness"),
        ],
        sinks=[sink],
    )
    return monitor
```
## Quick Start Guide
- **Launch PostgreSQL with extensions:** Run `timescale/timescaledb-ha:pg17-all` via Docker, expose port 5432, and execute `CREATE EXTENSION vector; CREATE EXTENSION ai CASCADE;`
- **Pull local models:** Execute `ollama pull nomic-embed-text`, `ollama pull gemma2:9b`, and `ollama pull llama3.1:8b` on the inference host
- **Create schema & vectorizer:** Run the production schema SQL, configure the vectorizer to watch your base table, and set the chunking strategy to match your document structure
- **Calibrate & ingest:** Load 500 representative documents, generate reference embeddings, initialize the drift detector with baseline scores, and begin routing production queries through the unified retrieval-generation pipeline
- **Monitor & iterate:** Review the daily drift report, correlate dimension shifts with deployment events, and adjust retrieval thresholds or prompt templates based on isolated metric changes
