Retrieval-Augmented Generation (RAG) Pipeline
RAG is the most widely deployed pattern for giving LLMs access to large, dynamic knowledge bases. Instead of fine-tuning the model on your data (expensive and inflexible), RAG retrieves relevant context at query time and injects it into the prompt. This lesson walks through building a complete production-grade RAG pipeline.
The RAG Architecture
User query
│
▼
Query embedding (text → vector)
│
▼
Vector similarity search → Top-k relevant chunks
│
▼
Prompt construction: [System] + [Retrieved chunks] + [User query]
│
▼
LLM generates answer grounded in retrieved context
│
▼
Response + source citations
Step 1: Document Ingestion and Chunking
Before retrieval, documents must be loaded, split into chunks, and embedded:
from langchain_community.document_loaders import PyPDFLoader, TextLoader, DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
# Load documents
def load_documents(source_path: str) -> list[Document]:
    """Load documents from a single file or recursively from a directory.

    Args:
        source_path: Path to a ``.pdf`` file, a ``.txt`` file, or a directory.

    Returns:
        The loaded documents (one or more per input file).
    """
    if source_path.endswith(".pdf"):
        return PyPDFLoader(source_path).load()
    if source_path.endswith(".txt"):
        return TextLoader(source_path).load()
    # Directory case. Python's glob/fnmatch pattern language has NO "{a,b}"
    # brace alternation, so the original glob="**/*.{pdf,txt,md}" matched
    # nothing. Load each supported extension with its own pattern instead.
    documents: list[Document] = []
    for pattern in ("**/*.pdf", "**/*.txt", "**/*.md"):
        documents.extend(DirectoryLoader(source_path, glob=pattern).load())
    return documents
# Split into chunks
def chunk_documents(documents: list[Document]) -> list[Document]:
    """Split documents into overlapping, retrieval-sized chunks.

    RecursiveCharacterTextSplitter tries each separator in order —
    paragraph break, newline, sentence end, space — falling back to
    smaller boundaries only when a chunk is still too large. Each chunk
    is tagged with its index and size for traceability.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,        # target chunk size in characters
        chunk_overlap=200,      # overlap preserves context across boundaries
        length_function=len,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    pieces = text_splitter.split_documents(documents)
    for index, piece in enumerate(pieces):
        piece.metadata.update(
            chunk_index=index,
            chunk_size=len(piece.page_content),
        )
    return pieces
# Embed and store
def build_vector_store(chunks: list[Document], persist_dir: str) -> Chroma:
    """Embed the chunks and persist them as a Chroma collection.

    Args:
        chunks: Pre-split document chunks to index.
        persist_dir: Directory where Chroma writes the on-disk collection.

    Returns:
        The populated, persisted vector store.
    """
    return Chroma.from_documents(
        documents=chunks,
        embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
        persist_directory=persist_dir,
        collection_name="knowledge_base",
    )
# Full ingestion pipeline: load → chunk → embed + persist.
docs = load_documents("./knowledge_base/")
chunks = chunk_documents(docs)
print(f"Loaded {len(docs)} documents → {len(chunks)} chunks")
# The persisted store under ./.vector_db can be reopened on later runs
# without re-embedding everything.
vector_store = build_vector_store(chunks, "./.vector_db")
Step 2: Retrieval
from langchain_core.retrievers import BaseRetriever
# Basic similarity retriever: plain top-k nearest neighbours in embedding space.
basic_retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)
# MMR retriever: maximizes diversity among retrieved chunks.
# fetch_k candidates are fetched first, then k are chosen; lambda_mult
# balances relevance (→1.0) against diversity (→0.0).
mmr_retriever = vector_store.as_retriever(
    search_type="mmr",  # Maximal Marginal Relevance
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.7}
)
# Similarity with score threshold: only return relevant results.
# NOTE(review): what score_threshold means depends on the store's relevance
# scoring — Chroma natively reports distances that must be mapped onto a
# 0-1 relevance scale. Confirm 0.7 behaves as intended with this backend.
threshold_retriever = vector_store.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.7, "k": 5}
)
# Smoke-test retrieval: print each hit's source and a 200-char preview.
query = "How do I configure Kafka consumer groups?"
results = basic_retriever.invoke(query)
for doc in results:
    print(f"Source: {doc.metadata.get('source', 'unknown')}")
    print(f"Content: {doc.page_content[:200]}...\n")
Step 3: Building the RAG Chain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# temperature=0: deterministic decoding, so answers stay grounded in the
# retrieved context rather than drifting with sampling.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Format retrieved documents for the prompt
def format_docs(docs: list[Document]) -> str:
    """Render retrieved documents as one string of numbered, source-labelled
    sections separated by horizontal rules, ready to inject into the prompt."""
    sections = [
        f"[Source {n}: {doc.metadata.get('source', 'Unknown')}]\n{doc.page_content}"
        for n, doc in enumerate(docs, 1)
    ]
    return "\n\n---\n\n".join(sections)
# RAG prompt: the system message constrains the model to the retrieved
# context and prescribes an explicit "I don't know" fallback. The {context}
# and {question} placeholders are filled by the chain at invoke time.
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a knowledgeable assistant. Answer questions using ONLY the provided context.
Rules:
1. If the context contains the answer, provide it with a reference to the source number.
2. If the context does NOT contain enough information, say: "I don't have enough information in my knowledge base to answer that."
3. Never fabricate information not present in the context.
4. Quote relevant passages when helpful."""),
    ("human", """Context:
{context}
Question: {question}""")
])
# Complete RAG chain (LCEL). The leading dict is coerced by LangChain into
# a RunnableParallel: for one input query it simultaneously (a) retrieves
# and formats context and (b) passes the raw question through, then pipes
# both into the prompt → model → string parser.
rag_chain = (
    {"context": basic_retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)
# Usage: input is the bare question string; output is the parsed answer text.
answer = rag_chain.invoke("How do I configure Kafka consumer groups?")
print(answer)
Step 4: Adding Citations
from langchain_core.runnables import RunnableParallel

# Return both the answer and the source documents.
#
# Running `rag_chain` and `basic_retriever` as two independent parallel
# branches would embed the query and hit the vector store TWICE per question
# (double cost, and the cited sources could in principle diverge from the
# context the answer was grounded in). Instead: retrieve once, keep the
# documents under "sources", and reuse them as the generation context.
# The result dict still exposes "answer" and "sources" (plus the
# pass-through "question"), so existing callers keep working.
rag_chain_with_sources = (
    RunnableParallel(
        sources=basic_retriever,          # single retrieval per query
        question=RunnablePassthrough(),
    )
    | RunnablePassthrough.assign(
        answer={
            # Reuse the already-retrieved docs rather than retrieving again.
            "context": lambda x: format_docs(x["sources"]),
            "question": lambda x: x["question"],
        }
        | rag_prompt
        | llm
        | StrOutputParser()
    )
)

result = rag_chain_with_sources.invoke("What is partition rebalancing in Kafka?")
print(f"Answer: {result['answer']}\n")
print("Sources:")
for doc in result["sources"]:
    print(f" - {doc.metadata.get('source', 'unknown')} (page {doc.metadata.get('page', '?')})")
Common RAG Failure Modes and Fixes
| Problem | Symptom | Fix |
|---|---|---|
| Chunks too large | Retrieved chunks have too much irrelevant content | Reduce chunk_size to 500-700 |
| Chunks too small | Answer spans multiple chunks, none individually sufficient | Increase chunk_size, increase overlap |
| Low recall | Correct answer not retrieved | Use MMR retrieval, increase k, check embeddings |
| Hallucination | Model answers questions not in context | Stronger prompt constraints, add "only use context" instruction |
| Wrong document | Retrieves semantically similar but wrong content | Add metadata filtering, use hybrid search |
A well-tuned RAG pipeline can reach high answer accuracy on domain-specific questions — figures in the 85-95% range are often reported, though results vary with corpus and evaluation method — far better than a general LLM with no context, and far cheaper than fine-tuning.