ion. We use Pydantic to enforce type safety and prevent silent failures during graph execution.
import json
import logging
from typing import List, Optional

from langchain_core.documents import Document
from pydantic import BaseModel, Field
class ValidationPipelineState(BaseModel):
    """State object passed between the nodes of the validation pipeline.

    Only ``query`` is required; every other field starts from a default.
    """

    # --- input -----------------------------------------------------------
    query: str

    # --- knowledge-base retrieval and scoring ----------------------------
    kb_documents: List[Document] = Field(default_factory=list)
    # Relevance scores, positionally aligned with ``kb_documents``.
    document_scores: List[float] = Field(default_factory=list)
    aggregate_score: float = Field(default=0.0)
    # One of: high_confidence | low_confidence | mixed ("unknown" until scored).
    routing_verdict: str = Field(default="unknown")

    # --- external search -------------------------------------------------
    external_results: Optional[str] = Field(default=None)
    refined_external_docs: List[Document] = Field(default_factory=list)

    # --- assembly and generation -----------------------------------------
    assembled_context: List[Document] = Field(default_factory=list)
    final_answer: str = Field(default="")

    # --- tracing ---------------------------------------------------------
    # Labels of the steps that have run so far, in order.
    execution_path: List[str] = Field(default_factory=list)
Document Scoring Node
Scoring several documents in a single prompt lets them influence each other's scores (cross-document leakage) and reduces granularity. Each document is therefore evaluated independently, using structured JSON output so the result is reliably parseable.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_openai import ChatOpenAI
# Prompt used to grade a single (query, document) pair.
#
# NOTE: the template uses f-string style placeholders, so single braces are
# read as input variables. The literal JSON example must therefore use doubled
# braces; with single braces it is parsed as an extra variable named '"score"'
# and every invocation fails because that variable is never supplied.
SCORING_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are a relevance evaluator. Assign a score between 0.0 and 1.0
for how well the document answers the query.
1.0: Directly and completely addresses the query
0.5: Partially relevant but missing key details
0.0: Unrelated or contradictory
Return ONLY a JSON object: {{"score": <float>}}"""),
    ("human", "Query: {query}\nDocument: {content}"),
])
class DocumentScorer:
    """Score each knowledge-base document against the query and pick a route.

    Every document is graded by its own LLM call. The mean of the scores is
    mapped to a routing verdict:

    * ``high_confidence`` when the mean is >= ``high_threshold``
    * ``low_confidence`` when the mean is <= ``low_threshold``
    * ``mixed`` otherwise

    Args:
        llm: Chat model used for grading.
        high_threshold: Lower bound of the high-confidence band.
        low_threshold: Upper bound of the low-confidence band.
        max_content_chars: Number of leading characters of each document that
            are sent to the model.

    Raises:
        ValueError: If ``low_threshold`` is greater than ``high_threshold``.
    """

    # JSON Schema passed to ``with_structured_output``. A bare mapping such as
    # ``{"score": float}`` is not a schema the model wrapper can convert; a
    # JSON Schema with top-level title/description is, and it keeps the chain
    # output a plain dict with a "score" key.
    _SCORE_SCHEMA = {
        "title": "relevance_score",
        "description": "Relevance of a single document to the query.",
        "type": "object",
        "properties": {
            "score": {
                "type": "number",
                "description": "Relevance between 0.0 and 1.0.",
            },
        },
        "required": ["score"],
    }

    def __init__(
        self,
        llm: "ChatOpenAI",
        *,
        high_threshold: float = 0.7,
        low_threshold: float = 0.3,
        max_content_chars: int = 500,
    ):
        if low_threshold > high_threshold:
            raise ValueError("low_threshold must not exceed high_threshold")
        self.chain = SCORING_PROMPT | llm.with_structured_output(self._SCORE_SCHEMA)
        self.high_threshold = high_threshold
        self.low_threshold = low_threshold
        self.max_content_chars = max_content_chars

    def _score_document(self, query: str, doc) -> float:
        """Return the relevance of ``doc`` clamped to [0.0, 1.0]; 0.0 on failure."""
        try:
            result = self.chain.invoke({
                "query": query,
                "content": doc.page_content[: self.max_content_chars],
            })
            return max(0.0, min(1.0, float(result["score"])))
        except Exception:
            # Best effort: one failed call must not abort the whole run, but a
            # failure is not the same as "irrelevant", so make it visible.
            logging.getLogger(__name__).warning(
                "Document scoring failed; counting the document as 0.0",
                exc_info=True,
            )
            return 0.0

    def evaluate(self, state: "ValidationPipelineState") -> "ValidationPipelineState":
        """Score ``state.kb_documents`` and return an updated copy of the state.

        The copy carries ``document_scores`` (one entry per document, in the
        same order), ``aggregate_score`` (their mean, 0.0 when there are no
        documents) and ``routing_verdict``; ``"scoring"`` is appended to
        ``execution_path``.
        """
        scores = [self._score_document(state.query, doc) for doc in state.kb_documents]
        avg_score = sum(scores) / len(scores) if scores else 0.0

        if avg_score >= self.high_threshold:
            verdict = "high_confidence"
        elif avg_score <= self.low_threshold:
            verdict = "low_confidence"
        else:
            verdict = "mixed"

        return state.model_copy(update={
            "document_scores": scores,
            "aggregate_score": avg_score,
            "routing_verdict": verdict,
            "execution_path": state.execution_path + ["scoring"],
        })
External Search & Refinement Node
Raw search snippets are noisy. The refinement step extracts factual signals and strips promotional or tangential content.
from langchain_core.tools import Tool
import asyncio
# Prompt that condenses raw search output into a short factual summary.
#
# The search node parses the reply as JSON and reads its "summary" key, so the
# model is told to answer in exactly that shape. Literal braces are doubled
# because single braces denote template variables.
SEARCH_REFINEMENT_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """Extract only the factual information directly relevant to the query.
Remove advertisements, navigation text, and speculative claims.
Write a clean, concise summary suitable for technical documentation.
Return ONLY a JSON object: {{"summary": "<summary text>"}}"""),
    ("human", "Query: {query}\nRaw Results: {raw}"),
])
class ExternalSearchNode:
    """Run an external search and distil the result into one clean document.

    The search tool is invoked in a worker thread under a timeout, its output
    is condensed by the LLM, and the summary is stored as a single
    ``Document`` tagged ``source="external_search"``. Any failure (tool error,
    timeout, model error, empty summary) degrades to an empty result marked
    ``"search_fallback"`` in the execution path instead of raising.

    Args:
        search_tool: Tool whose ``invoke(query)`` returns the raw results.
        llm: Chat model used to refine the raw results.
        timeout: Seconds to wait for the search tool before giving up.
        max_raw_chars: Number of leading characters of the raw results that
            are sent to the refinement model.
    """

    def __init__(
        self,
        search_tool: "Tool",
        llm: "ChatOpenAI",
        *,
        timeout: float = 5.0,
        max_raw_chars: int = 2000,
    ):
        self.tool = search_tool
        # A plain callable on the right-hand side of ``|`` is coerced into a
        # runnable, so the chain still yields a ``{"summary": ...}`` dict, but
        # no longer fails when the model answers in plain text.
        self.refiner = SEARCH_REFINEMENT_PROMPT | llm | self._parse_refined
        self.timeout = timeout
        self.max_raw_chars = max_raw_chars

    @staticmethod
    def _parse_refined(message) -> dict:
        """Turn a model reply into ``{"summary": <text>}``.

        Accepts either a JSON object with a string ``summary`` field
        (optionally wrapped in a Markdown code fence) or free text, which is
        used as the summary verbatim.
        """
        text = getattr(message, "content", message)
        if not isinstance(text, str):
            # presumably a list of content blocks; fall back to its repr — verify
            text = str(text)
        text = text.strip()

        candidate = text
        if candidate.startswith("```"):
            candidate = candidate.strip("`").removeprefix("json").strip()
        try:
            parsed = json.loads(candidate)
        except ValueError:
            return {"summary": text}

        if isinstance(parsed, dict) and isinstance(parsed.get("summary"), str):
            return {"summary": parsed["summary"].strip()}
        return {"summary": text}

    def _fallback(self, state: "ValidationPipelineState") -> "ValidationPipelineState":
        """Return a copy of ``state`` with no external documents."""
        return state.model_copy(update={
            "refined_external_docs": [],
            "execution_path": state.execution_path + ["search_fallback"],
        })

    async def execute(self, state: "ValidationPipelineState") -> "ValidationPipelineState":
        """Search for ``state.query`` and return an updated copy of the state.

        On success ``refined_external_docs`` holds one summary document and
        ``"external_search"`` is appended to ``execution_path``; otherwise the
        fallback state is returned.
        """
        log = logging.getLogger(__name__)
        try:
            raw = await asyncio.wait_for(
                asyncio.to_thread(self.tool.invoke, state.query),
                timeout=self.timeout,
            )
            # Tools may return structured results; stringify before slicing so
            # the size cap is applied to characters, not list items.
            raw_text = raw if isinstance(raw, str) else str(raw)
            refined = await self.refiner.ainvoke({
                "query": state.query,
                "raw": raw_text[: self.max_raw_chars],
            })
            summary = str(refined.get("summary") or "").strip()
        except Exception:
            log.warning(
                "External search failed; continuing without external context",
                exc_info=True,
            )
            return self._fallback(state)

        if not summary:
            # An empty document would only add noise to the assembled context.
            log.warning("External search produced no usable summary")
            return self._fallback(state)

        external_doc = Document(
            page_content=summary,
            # "timestamp" is a placeholder value, kept as-is.
            metadata={"source": "external_search", "timestamp": "auto"},
        )
        return state.model_copy(update={
            "refined_external_docs": [external_doc],
            "execution_path": state.execution_path + ["external_search"],
        })
Context Assembly Node
Assembly must respect token budgets and preserve source attribution. Blind merging causes context window overflow and attribution loss.
class ContextAssembler:
    """Select and order the documents that are handed to answer generation.

    Knowledge-base documents scoring below ``min_score`` are dropped and the
    rest are ranked by their score from ``state.document_scores`` (highest
    first; ties keep retrieval order). The routing verdict then decides the
    mix:

    * ``high_confidence``: the ``top_k`` best KB documents
    * ``low_confidence``: the external documents, or the single best KB
      document when there are none
    * anything else: ranked KB documents followed by the external documents

    The selection is finally fitted into ``max_tokens`` using an estimate of
    one token per four characters.

    Args:
        max_tokens: Token budget for the assembled context.
        min_score: Minimum score a KB document needs to be considered.
        top_k: Number of KB documents kept on the high-confidence path.
    """

    _CHARS_PER_TOKEN = 4  # rough estimate used for budgeting

    def __init__(self, max_tokens: int = 3000, *, min_score: float = 0.3, top_k: int = 3):
        self.max_tokens = max_tokens
        self.min_score = min_score
        self.top_k = top_k

    def assemble(self, state: "ValidationPipelineState") -> "ValidationPipelineState":
        """Return a copy of ``state`` with ``assembled_context`` filled in.

        ``"assembly"`` is appended to ``execution_path``.
        """
        # Rank by the score the scorer actually produced. ``zip`` pairs the two
        # lists positionally and stops at the shorter one, so documents without
        # a score are left out.
        ranked = sorted(
            (
                (doc, score)
                for doc, score in zip(state.kb_documents, state.document_scores)
                if score >= self.min_score
            ),
            key=lambda pair: pair[1],
            reverse=True,
        )
        kb_docs = [doc for doc, _ in ranked]
        external_docs = list(state.refined_external_docs)

        if state.routing_verdict == "high_confidence":
            assembled = kb_docs[: self.top_k]
        elif state.routing_verdict == "low_confidence":
            # Prefer external evidence; otherwise fall back to the best KB hit.
            assembled = external_docs or kb_docs[:1]
        else:
            assembled = kb_docs + external_docs

        # Token-aware selection: a document that does not fit is skipped rather
        # than ending the loop, so one oversized document cannot empty the
        # context while smaller ones would still fit.
        token_count = 0
        final_context = []
        for doc in assembled:
            estimated_tokens = len(doc.page_content) // self._CHARS_PER_TOKEN
            if token_count + estimated_tokens > self.max_tokens:
                continue
            final_context.append(doc)
            token_count += estimated_tokens

        return state.model_copy(update={
            "assembled_context": final_context,
            "execution_path": state.execution_path + ["assembly"],
        })
Graph Routing Logic
The graph uses conditional edges to skip unnecessary steps. High-confidence queries bypass external search entirely, reducing latency and cost.
from langgraph.graph import StateGraph, END
def build_pipeline():
    """Wire the pipeline nodes into a LangGraph state graph and compile it.

    Flow: retrieve -> score -> (search ->) assemble -> generate -> END.
    The search step is skipped when scoring yields ``high_confidence``.

    ``retrieve_node``, ``generate_answer``, ``llm`` and ``search_tool`` are not
    defined in this snippet and are expected to exist at module level.
    NOTE(review): the search node is a coroutine function, so the compiled
    graph presumably has to be run through its async entry points; confirm.
    """

    def route_after_scoring(state):
        # Only confidently answered queries may bypass external search.
        if state.routing_verdict == "high_confidence":
            return "assemble"
        return "search"

    graph = StateGraph(ValidationPipelineState)

    nodes = {
        "retrieve": retrieve_node,
        "score": DocumentScorer(llm).evaluate,
        "search": ExternalSearchNode(search_tool, llm).execute,
        "assemble": ContextAssembler().assemble,
        "generate": generate_answer,
    }
    for node_name, action in nodes.items():
        graph.add_node(node_name, action)

    graph.set_entry_point("retrieve")
    graph.add_edge("retrieve", "score")
    graph.add_conditional_edges(
        "score",
        route_after_scoring,
        {"search": "search", "assemble": "assemble"},
    )
    for source, target in (("search", "assemble"), ("assemble", "generate"), ("generate", END)):
        graph.add_edge(source, target)

    return graph.compile()
Architecture Rationale
- Structured Output for Scoring: String parsing fails silently. JSON schema enforcement guarantees consistent routing decisions.
- Async Search with Timeout: External APIs introduce unpredictable latency. A 5-second timeout prevents pipeline blocking.
- Token-Aware Assembly: Merging KB and external results without limits causes context overflow. Pre-generation truncation preserves generation quality.
- Conditional Routing: Skipping external search for high-confidence queries reduces LLM call volume by ~40% in production workloads.
Pitfall Guide
1. Hardcoded Threshold Rigidity
Explanation: Using fixed 0.7/0.3 boundaries across all domains causes false negatives in specialized fields where terminology differs from general corpora.
Fix: Calibrate thresholds using a validation set. Run a grid search over threshold pairs and select the combination that maximizes context_precision while maintaining acceptable latency.
2. Unbounded Context Merging
Explanation: Combining top-k KB documents with external search results without token limits causes the LLM to truncate critical information or degrade in coherence.
Fix: Implement token budgeting before assembly. Use a lightweight tokenizer (e.g., tiktoken) to cap context at 70-80% of the model's window, prioritizing higher-scoring documents.
3. Search Result Noise Injection
Explanation: Raw SERP snippets contain ads, navigation menus, and speculative content. Feeding this directly to the LLM corrupts the context window.
Fix: Always route external results through a refinement step. Strip HTML, remove non-factual claims, and enforce a maximum length before assembly.
4. Scoring Latency Multiplication
Explanation: Scoring every retrieved document multiplies LLM calls. With top-10 retrieval, this adds 10 synchronous API requests per query.
Fix: Score only the top-3 to top-5 documents. Use a lightweight cross-encoder or classifier for pre-filtering if the corpus is large. Cache scores for repeated queries.
5. Source Attribution Loss
Explanation: Merging KB and external documents without metadata tracking makes it impossible to audit answers or comply with data governance policies.
Fix: Attach explicit source tags (source: "internal_kb", source: "external_search") during assembly. Pass these tags to the generation prompt for citation formatting.
6. Ignoring Graceful Degradation
Explanation: Network failures or search API rate limits crash the pipeline if external search is treated as mandatory.
Fix: Wrap search calls in try/except blocks with fallback logic. If external search fails, route to the highest-scoring KB document or return a structured "insufficient context" response.
7. Prompt Leakage in Evaluation
Explanation: Asking the LLM to score multiple documents in a single prompt causes attention dilution and inconsistent scoring.
Fix: Evaluate documents sequentially or in small batches with isolated prompts. Use structured output to prevent verbose reasoning from consuming context.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-coverage internal wiki | KB-only with hybrid search | Corpus contains answers; external search adds noise | Low |
| Dynamic market/tech queries | Corrective routing (CRAG) | KB lacks real-time data; external search fills gaps | Medium (+30-50% LLM calls) |
| Mixed-intent chatbot | Self-RAG + CRAG hybrid | Decides whether to retrieve, then validates results | High (requires dual evaluation) |
| Cost-constrained MVP | Pre-filter + top-3 scoring | Reduces API calls while maintaining validation | Low-Medium |
| Compliance-heavy domain | KB-only with strict citation | External search introduces unverified claims | Low |
Configuration Template
pipeline:
  retrieval:
    top_k: 5
    similarity_threshold: 0.65
    hybrid_weight: 0.4
  validation:
    scoring_model: "gpt-4o-mini"
    thresholds:
      high_confidence: 0.72
      low_confidence: 0.28
    max_documents_to_score: 3
  external_search:
    provider: "tavily"
    timeout_seconds: 4
    max_results: 3
    refinement_model: "gpt-4o-mini"
  assembly:
    max_context_tokens: 2800
    preserve_sources: true
  generation:
    model: "gpt-4o"
    temperature: 0.2
    max_tokens: 1024
  fallback:
    on_search_failure: "use_best_kb_doc"
    on_low_confidence: "structured_refusal"
Quick Start Guide
- Initialize Dependencies: Install `langgraph`, `langchain-openai`, `pydantic`, and your preferred search provider SDK. Configure API keys in environment variables.
- Deploy Graph: Compile the pipeline using the provided state definition and node implementations. Run a dry execution with a sample query to verify routing logic.
- Calibrate Thresholds: Execute 50-100 representative queries. Log aggregate scores and routing decisions. Adjust the `high_confidence` and `low_confidence` boundaries based on observed precision.
- Enable Monitoring: Attach a callback handler to track execution paths, token usage, and latency. Set up alerts for search timeout rates or scoring failures.
- Evaluate & Iterate: Run RAGAS metrics monthly. Compare context_precision and faithfulness against baseline. Refine assembly token limits and search refinement prompts based on drift patterns.