Building Tool-Augmented Agents

Retrieval-Augmented Generation (RAG) Pipeline

14m read

Retrieval-Augmented Generation (RAG) Pipeline

RAG is the most widely deployed pattern for giving LLMs access to large, dynamic knowledge bases. Instead of fine-tuning the model on your data (expensive and inflexible), RAG retrieves relevant context at query time and injects it into the prompt. This lesson walks through building a complete production-grade RAG pipeline.

The RAG Architecture

User query
    │
    ▼
Query embedding (text → vector)
    │
    ▼
Vector similarity search → Top-k relevant chunks
    │
    ▼
Prompt construction: [System] + [Retrieved chunks] + [User query]
    │
    ▼
LLM generates answer grounded in retrieved context
    │
    ▼
Response + source citations

Step 1: Document Ingestion and Chunking

Before retrieval, documents must be loaded, split into chunks, and embedded:

from langchain_community.document_loaders import PyPDFLoader, TextLoader, DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document

# Load documents
def load_documents(source_path: str) -> list[Document]:
    """Load documents from files or directories."""
    if source_path.endswith(".pdf"):
        loader = PyPDFLoader(source_path)
    elif source_path.endswith(".txt"):
        loader = TextLoader(source_path)
    else:
        # Load all supported files from a directory
        loader = DirectoryLoader(source_path, glob="**/*.{pdf,txt,md}")
    return loader.load()

# Split into chunks
def chunk_documents(documents: list[Document]) -> list[Document]:
    """
    Split documents into semantically-sized chunks.
    
    RecursiveCharacterTextSplitter splits on paragraph breaks, sentences,
    then words — preferring larger natural boundaries before falling back
    to smaller ones.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,          # Target chunk size in characters
        chunk_overlap=200,        # Overlap to preserve context at boundaries
        length_function=len,
        separators=["\n\n", "\n", ". ", " ", ""],  # Try splitting on these in order
    )
    chunks = splitter.split_documents(documents)
    
    # Add chunk metadata for traceability
    for i, chunk in enumerate(chunks):
        chunk.metadata["chunk_index"] = i
        chunk.metadata["chunk_size"] = len(chunk.page_content)
    
    return chunks

# Embed and store
def build_vector_store(chunks: list[Document], persist_dir: str) -> Chroma:
    """Build and persist a vector store from document chunks."""
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    
    vector_store = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_dir,
        collection_name="knowledge_base",
    )
    return vector_store

# Full ingestion pipeline
docs = load_documents("./knowledge_base/")
chunks = chunk_documents(docs)
print(f"Loaded {len(docs)} documents → {len(chunks)} chunks")
vector_store = build_vector_store(chunks, "./.vector_db")

Step 2: Retrieval

from langchain_core.retrievers import BaseRetriever

# Basic similarity retriever
basic_retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# MMR retriever: maximizes diversity among retrieved chunks
mmr_retriever = vector_store.as_retriever(
    search_type="mmr",  # Maximal Marginal Relevance
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.7}
)

# Similarity with score threshold: only return relevant results
threshold_retriever = vector_store.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.7, "k": 5}
)

# Test retrieval
query = "How do I configure Kafka consumer groups?"
results = basic_retriever.invoke(query)
for doc in results:
    print(f"Source: {doc.metadata.get('source', 'unknown')}")
    print(f"Content: {doc.page_content[:200]}...\n")

Step 3: Building the RAG Chain

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Format retrieved documents for the prompt
def format_docs(docs: list[Document]) -> str:
    formatted = []
    for i, doc in enumerate(docs, 1):
        source = doc.metadata.get("source", "Unknown")
        formatted.append(f"[Source {i}: {source}]\n{doc.page_content}")
    return "\n\n---\n\n".join(formatted)

# RAG prompt
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a knowledgeable assistant. Answer questions using ONLY the provided context.

Rules:
1. If the context contains the answer, provide it with a reference to the source number.
2. If the context does NOT contain enough information, say: "I don't have enough information in my knowledge base to answer that."
3. Never fabricate information not present in the context.
4. Quote relevant passages when helpful."""),
    ("human", """Context:
{context}

Question: {question}""")
])

# Complete RAG chain
rag_chain = (
    {"context": basic_retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Usage
answer = rag_chain.invoke("How do I configure Kafka consumer groups?")
print(answer)

Step 4: Adding Citations

from langchain_core.runnables import RunnableParallel

# Return both the answer and the source documents
rag_chain_with_sources = RunnableParallel(
    answer=rag_chain,
    sources=basic_retriever,
)

result = rag_chain_with_sources.invoke("What is partition rebalancing in Kafka?")
print(f"Answer: {result['answer']}\n")
print("Sources:")
for doc in result["sources"]:
    print(f"  - {doc.metadata.get('source', 'unknown')} (page {doc.metadata.get('page', '?')})")

Common RAG Failure Modes and Fixes

ProblemSymptomFix
Chunks too largeRetrieved chunks have too much irrelevant contentReduce chunk_size to 500-700
Chunks too smallAnswer spans multiple chunks, none individually sufficientIncrease chunk_size, increase overlap
Low recallCorrect answer not retrievedUse MMR retrieval, increase k, check embeddings
HallucinationModel answers questions not in contextStronger prompt constraints, add "only use context" instruction
Wrong documentRetrieves semantically similar but wrong contentAdd metadata filtering, use hybrid search

A well-tuned RAG pipeline can achieve 85-95% answer accuracy on domain-specific questions — far better than a general LLM with no context, and far cheaper than fine-tuning.