Project Architecture
Plain Language
Think of this capstone project like building a smart librarian for a private document collection. When someone walks into a traditional library and asks a question, the librarian knows where each book is shelved, which chapters are relevant, and how to synthesize information from multiple sources into a coherent answer. Our document portal does exactly the same thing, but with digital documents and an LLM acting as the librarian's brain.
The system has three major layers. The first layer is the ingestion pipeline, which is responsible for accepting uploaded documents (PDFs, Word files, text files), breaking them into smaller chunks, converting those chunks into numerical vectors (embeddings), and storing everything in a vector database. Think of this as the librarian carefully reading each new book, writing detailed index cards for every section, and filing those cards in a well-organized catalog system.
The second layer is the retrieval pipeline, which handles incoming user questions. When a user asks "What does our company policy say about remote work?", the system converts that question into a vector, searches the vector database for the most similar document chunks, and ranks the results by relevance. This is like the librarian taking your question, mentally mapping it to relevant catalog entries, and pulling the most useful books off the shelves.
The third layer is the synthesis layer, where the LLM takes the retrieved document chunks and the user's question, then generates a coherent, grounded answer with citations back to the source documents. The librarian doesn't just hand you a stack of books — they read the relevant passages and give you a clear, well-organized answer while pointing you to where they found the information.
All three layers are tied together by a FastAPI backend that exposes REST endpoints for document upload, querying, and conversation management, plus a simple chat frontend that gives users a familiar conversational interface. The entire system is containerized with Docker for easy deployment to cloud environments like AWS ECS or a simple VPS.
Deep Dive
The architecture follows a clean separation of concerns that mirrors real-world production RAG systems. At the highest level, we have four distinct services: an API server (FastAPI), a worker service (for async document processing), a vector database (ChromaDB or pgvector), and a frontend (static HTML/JS or Streamlit), plus two shared stores described below: a metadata database and original-file storage. These components communicate over well-defined interfaces, making each one independently testable and replaceable.
The API server is the central coordinator. It exposes endpoints for document upload (accepting multipart file uploads), document listing and management, chat/query endpoints that trigger the RAG pipeline, and health checks for monitoring. FastAPI was chosen for its async support, automatic OpenAPI documentation, Pydantic validation, and excellent performance characteristics. Every endpoint returns structured JSON responses with consistent error handling.
The ingestion worker runs asynchronously to avoid blocking the API server during document processing. When a user uploads a 200-page PDF, the API immediately returns a job ID and status URL, then queues the document for background processing. The worker parses the document, splits it into chunks, generates embeddings, and writes everything to the vector store. For simplicity in this capstone, we use Python's asyncio with background tasks, but in production you would use Celery or AWS SQS for distributed job processing.
The vector store holds the embedded document chunks and supports similarity search. We use ChromaDB for local development because it requires zero infrastructure setup, but the architecture is designed so you can swap in pgvector (PostgreSQL extension) or a managed service like Amazon OpenSearch Serverless for production. The vector store is accessed by both the ingestion worker (writes) and the API server (reads during retrieval).
The metadata database (PostgreSQL or SQLite for development) tracks document-level metadata: upload timestamps, processing status, file names, page counts, user ownership, and collection assignments. This is separate from the vector store because relational queries (list all documents, filter by date, check processing status) are better served by a traditional database than a vector index.
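As a rough illustration of the kind of relational queries this layer serves, here is a minimal SQLite sketch; the table and column names are illustrative, not prescribed by the project:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # use a file path in a real setup
conn.execute("""
    CREATE TABLE documents (
        doc_id      TEXT PRIMARY KEY,
        filename    TEXT NOT NULL,
        pages       INTEGER,
        hash        TEXT UNIQUE,                -- content hash for deduplication
        collection  TEXT DEFAULT 'default',
        status      TEXT DEFAULT 'processing',  -- processing / ready / failed
        created_at  TEXT
    )
""")
conn.execute(
    "INSERT INTO documents (doc_id, filename, pages, hash, created_at) "
    "VALUES (?, ?, ?, ?, ?)",
    ("doc-1", "q3_report.pdf", 42, "ab12cd34", "2024-01-01T00:00:00Z"),
)
# Relational queries like "which documents are still processing?" are trivial here,
# but awkward or impossible against a vector index:
rows = conn.execute(
    "SELECT filename FROM documents WHERE status = 'processing'"
).fetchall()
print(rows)  # [('q3_report.pdf',)]
```

The `UNIQUE` constraint on the hash column also gives you database-enforced deduplication for free.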
The file storage layer persists the original uploaded documents. In local development, this is simply a directory on disk. In production, it maps to an S3 bucket with appropriate lifecycle policies. Keeping the originals allows reprocessing if you change your chunking strategy, embedding model, or need to debug retrieval quality issues.
This architecture makes the system horizontally scalable. You can run multiple API server instances behind a load balancer, scale ingestion workers independently based on upload volume, and the vector store and metadata database are shared services that can be scaled with their native clustering mechanisms.
Start with the simplest possible setup: a single process running FastAPI with background tasks, SQLite for metadata, ChromaDB in-process, and local file storage. Only split into separate services when you have a specific scaling need. Premature distribution adds complexity without benefit.
Document Ingestion Pipeline
Plain Language
Document ingestion is the process of taking a raw file — a PDF, a Word document, a plain text file — and converting it into a format that our RAG system can search through. This is like a librarian receiving a new book and creating detailed catalog cards for every chapter, section, and important passage before shelving it.
The pipeline has four stages. First, parsing: extracting the raw text content from whatever file format the user uploaded. PDFs are particularly tricky because they're designed for visual layout, not text extraction. A two-column academic paper, a scanned invoice, and a text-heavy report all require different parsing strategies. We use libraries like PyMuPDF (fitz) for native-text PDFs and can optionally integrate OCR for scanned documents.
Second, cleaning: the extracted text often contains artifacts like repeated headers and footers, page numbers scattered through the content, excessive whitespace, or encoding issues. We normalize this text to ensure clean, consistent input for the chunking stage. Think of it as the librarian tidying up photocopied pages before filing them — removing smudges and straightening text so the catalog cards are readable.
Third, chunking: splitting the cleaned text into smaller pieces (typically 500-1500 tokens each) that are the right size for embedding and retrieval. The art of chunking is finding pieces that are large enough to contain meaningful context but small enough to be precise when retrieved. We use recursive character text splitting with awareness of natural boundaries like paragraphs and section headings, so chunks don't cut mid-sentence or separate a heading from its content.
Fourth, metadata enrichment: each chunk gets tagged with information about where it came from — which document, which page, which section, when it was processed. This metadata is crucial for citations (telling the user "this answer came from page 14 of the Q3 report") and for filtering (searching only within documents from a specific collection or date range).
Deep Dive
Let's build the ingestion pipeline step by step, starting with the document parser that handles multiple file formats.
```python
import hashlib
import re
import uuid
from dataclasses import dataclass
from pathlib import Path

import fitz  # PyMuPDF


@dataclass
class ParsedPage:
    page_number: int
    text: str
    char_count: int


@dataclass
class ParsedDocument:
    doc_id: str
    filename: str
    pages: list[ParsedPage]
    total_pages: int
    content_hash: str


class DocumentParser:
    """Multi-format document parser."""

    SUPPORTED = {".pdf", ".txt", ".md", ".docx"}

    def parse(self, file_path: Path) -> ParsedDocument:
        suffix = file_path.suffix.lower()
        if suffix == ".pdf":
            pages = self._parse_pdf(file_path)
        elif suffix in {".txt", ".md"}:
            pages = self._parse_text(file_path)
        elif suffix == ".docx":
            pages = self._parse_docx(file_path)
        else:
            raise ValueError(f"Unsupported: {suffix}")
        full_text = "\n".join(p.text for p in pages)
        return ParsedDocument(
            doc_id=str(uuid.uuid4()),
            filename=file_path.name,
            pages=pages,
            total_pages=len(pages),
            content_hash=hashlib.sha256(full_text.encode()).hexdigest()[:16],
        )

    def _parse_pdf(self, path: Path) -> list[ParsedPage]:
        pages = []
        with fitz.open(path) as doc:  # context manager releases the file handle
            for i, page in enumerate(doc):
                text = self._clean(page.get_text("text"))
                if text.strip():
                    pages.append(ParsedPage(i + 1, text, len(text)))
        return pages

    def _parse_text(self, path: Path) -> list[ParsedPage]:
        text = self._clean(path.read_text(encoding="utf-8"))
        return [ParsedPage(1, text, len(text))]

    def _parse_docx(self, path: Path) -> list[ParsedPage]:
        # .docx has no fixed page boundaries, so treat the file as one "page".
        from docx import Document  # requires the python-docx package

        text = "\n".join(p.text for p in Document(str(path)).paragraphs)
        text = self._clean(text)
        return [ParsedPage(1, text, len(text))]

    def _clean(self, text: str) -> str:
        text = re.sub(r"\n{3,}", "\n\n", text)
        text = re.sub(r"[ \t]+", " ", text)
        return text.strip()
```
The parser produces a ParsedDocument containing individual pages with their text content. The content hash allows deduplication — if a user uploads the same document twice, we can detect it and skip reprocessing. The cleaning step normalizes whitespace, which prevents downstream chunking from producing chunks that are mostly empty space.
Next, the chunker splits parsed pages into retrieval-sized pieces with proper overlap:
```python
from dataclasses import dataclass


@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    text: str
    metadata: dict


class RecursiveChunker:
    def __init__(self, chunk_size: int = 800, overlap: int = 200):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.separators = ["\n\n", "\n", ". ", " "]

    def chunk_document(self, doc: ParsedDocument) -> list[Chunk]:
        chunks = []
        for page in doc.pages:
            pieces = self._split_recursive(page.text, self.separators)
            # Apply overlap once per page, after splitting is complete,
            # so recursive calls don't stack overlap on top of overlap.
            pieces = self._add_overlap(pieces)
            for i, text in enumerate(pieces):
                chunks.append(Chunk(
                    chunk_id=f"{doc.doc_id}_p{page.page_number}_c{i}",
                    doc_id=doc.doc_id,
                    text=text,
                    metadata={
                        "filename": doc.filename,
                        "page": page.page_number,
                        "chunk_index": i,
                        "char_count": len(text),
                    },
                ))
        return chunks

    def _split_recursive(self, text: str, seps: list[str]) -> list[str]:
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        if seps:
            sep = seps[0]
            parts = text.split(sep)
        else:
            # No separators left: hard-split at chunk_size boundaries.
            sep = ""
            parts = [text[i:i + self.chunk_size]
                     for i in range(0, len(text), self.chunk_size)]
        chunks, current = [], ""
        for part in parts:
            candidate = (current + sep + part).strip() if current else part.strip()
            if len(candidate) <= self.chunk_size:
                current = candidate
            else:
                if current:
                    chunks.append(current)
                if len(part) > self.chunk_size:
                    # Part is still too big: recurse with the finer separators.
                    chunks.extend(self._split_recursive(part, seps[1:]))
                    current = ""
                else:
                    current = part.strip()
        if current:
            chunks.append(current)
        return chunks

    def _add_overlap(self, chunks: list[str]) -> list[str]:
        if len(chunks) <= 1:
            return chunks
        result = [chunks[0]]
        for i in range(1, len(chunks)):
            # Prepend the tail of the previous chunk so boundary
            # information appears in both chunks.
            overlap_text = chunks[i - 1][-self.overlap:]
            result.append(overlap_text + " " + chunks[i])
        return result
```
The recursive chunking strategy tries to split on paragraph boundaries first (\n\n), then line breaks, then sentences, and finally words. This preserves semantic coherence — a chunk about "revenue projections" won't be split mid-sentence across two retrieval results. The 200-character overlap ensures that information at chunk boundaries isn't lost; both the end of one chunk and the beginning of the next contain the boundary text.
Start with 800-character chunks and 200-character overlap. These values work well for most business documents. If retrieval quality is poor, experiment with larger chunks (1200-1500 characters) for documents with long, interconnected arguments, or smaller chunks (400-600) for FAQ-style content with discrete answers.
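To build intuition for how size and overlap interact, here is a deliberately minimal fixed-size sliding-window sketch (much simpler than the recursive splitter above, but the overlap arithmetic is the same):

```python
def sliding_chunks(text: str, size: int, overlap: int) -> list[str]:
    """Fixed-size chunking: each chunk starts `size - overlap` characters
    after the previous one, so adjacent chunks share `overlap` characters."""
    step = size - overlap
    return [text[i:i + size] for i in range(0, len(text), step)]

text = "abcdefghij" * 10  # 100 characters
chunks = sliding_chunks(text, size=40, overlap=10)
# Adjacent chunks share their boundary text:
print(chunks[0][-10:] == chunks[1][:10])  # True
```

Larger overlap means more duplicated storage and embedding cost but less risk of splitting an answer across a chunk boundary; that is the trade-off you are tuning.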
The full ingestion orchestrator ties parsing and chunking together with metadata tracking:
```python
from datetime import datetime, timezone
from pathlib import Path


class IngestionService:
    def __init__(self, parser, chunker, embedder, vector_store, metadata_db):
        self.parser = parser
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store
        self.db = metadata_db

    async def ingest(self, file_path: Path, collection: str = "default") -> str:
        # 1. Parse the document
        doc = self.parser.parse(file_path)
        # 2. Check for duplicates
        if await self.db.hash_exists(doc.content_hash):
            return f"Duplicate detected: {doc.content_hash}"
        # 3. Record in metadata DB
        await self.db.insert_document({
            "doc_id": doc.doc_id,
            "filename": doc.filename,
            "pages": doc.total_pages,
            "hash": doc.content_hash,
            "collection": collection,
            "status": "processing",
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        # 4. Chunk the document, tagging each chunk with its collection
        #    so retrieval can filter on it later
        chunks = self.chunker.chunk_document(doc)
        for c in chunks:
            c.metadata["collection"] = collection
        # 5. Generate embeddings in batches
        texts = [c.text for c in chunks]
        embeddings = await self.embedder.embed_batch(texts, batch_size=64)
        # 6. Store in the vector database
        self.vector_store.add(
            ids=[c.chunk_id for c in chunks],
            documents=texts,
            embeddings=embeddings,
            metadatas=[c.metadata for c in chunks],
        )
        # 7. Update status
        await self.db.update_status(doc.doc_id, "ready", chunk_count=len(chunks))
        return doc.doc_id
```
The ingestion service follows a clear pipeline: parse, deduplicate, record metadata, chunk, embed, store, and update status. One caveat: because the metadata record (including the content hash) is written before embedding completes, a naive retry after a crash would be rejected by the duplicate check. A production version should treat documents stuck in "processing" as retryable rather than as duplicates. The batch embedding call (step 5) is critical for performance: embedding 200 chunks one at a time would take 200 API calls, but batching them in groups of 64 sends just 4 requests and often takes advantage of GPU parallelism on the embedding service side.
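The effect of batching is easy to quantify with a toy sketch:

```python
def batches(items: list, size: int) -> list[list]:
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

chunks = [f"chunk-{i}" for i in range(200)]
calls = batches(chunks, 64)
print(len(calls))  # 4 requests instead of 200
```

Beyond the round-trip savings, most embedding APIs also price and rate-limit per request, so batching directly lowers both cost and throttling risk.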
Embeddings & Vector Store
Plain Language
Embeddings are the magic that lets our system understand meaning rather than just matching keywords. When you embed a piece of text, you convert it into a list of numbers (a vector) that captures the semantic meaning of that text. Two pieces of text that discuss similar topics will have similar vectors, even if they use completely different words. This is what makes RAG so much more powerful than traditional keyword search.
Imagine you have a document chunk that says "The company's quarterly earnings exceeded analyst expectations by 15%." If a user asks "How did the business perform financially?", a keyword search would fail — there's no word overlap. But the embedding vectors for both texts would be very close in the vector space, because they're both about financial performance. The embedding model has learned, from training on billions of text examples, that "quarterly earnings exceeded expectations" and "business performed financially" refer to related concepts.
The vector store is a specialized database optimized for storing these vectors and performing similarity searches. When a query comes in, we embed the query text into a vector, then ask the vector store "find me the 10 stored vectors most similar to this query vector." The store uses mathematical distance metrics (cosine similarity or L2 distance) and indexing algorithms (like HNSW — Hierarchical Navigable Small World graphs) to do this search efficiently, even across millions of vectors.
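The similarity computation itself is simple. Here is a plain-Python sketch of cosine similarity with toy 3-dimensional vectors (real embeddings have hundreds or thousands of dimensions, and the numbers below are made up for illustration):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

query = [0.9, 0.2, 0.1]      # toy vector for "How did the business perform?"
earnings = [0.8, 0.3, 0.05]  # toy vector for "quarterly earnings exceeded expectations"
weather = [0.05, 0.1, 0.9]   # toy vector for an unrelated chunk
print(cosine_similarity(query, earnings) > cosine_similarity(query, weather))  # True
```

The indexing algorithms (HNSW and friends) exist because computing this against millions of stored vectors one by one would be far too slow; they find approximate nearest neighbors while touching only a small fraction of the vectors.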
For this capstone, we use ChromaDB because it's the simplest to set up — it runs in-process with your Python application, requires no external services, and persists data to disk. For production deployments with millions of documents, you'd graduate to pgvector (if you already use PostgreSQL), Pinecone, Weaviate, or Amazon OpenSearch Serverless.
Deep Dive
Let's implement the embedding service with batching, caching, and provider abstraction:
```python
import hashlib

from openai import AsyncOpenAI


class EmbeddingService:
    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = AsyncOpenAI()
        self.model = model
        self._cache: dict[str, list[float]] = {}

    async def embed_batch(
        self, texts: list[str], batch_size: int = 64
    ) -> list[list[float]]:
        all_embeddings: list[list[float] | None] = [None] * len(texts)
        uncached_indices = []
        # Check the cache first
        for i, text in enumerate(texts):
            key = hashlib.md5(text.encode()).hexdigest()
            if key in self._cache:
                all_embeddings[i] = self._cache[key]
            else:
                uncached_indices.append(i)
        # Batch-embed the uncached texts
        for start in range(0, len(uncached_indices), batch_size):
            batch_idx = uncached_indices[start:start + batch_size]
            batch_texts = [texts[i] for i in batch_idx]
            response = await self.client.embeddings.create(
                model=self.model,
                input=batch_texts,
            )
            for j, item in enumerate(response.data):
                idx = batch_idx[j]
                all_embeddings[idx] = item.embedding
                key = hashlib.md5(texts[idx].encode()).hexdigest()
                self._cache[key] = item.embedding
        return all_embeddings

    async def embed_query(self, text: str) -> list[float]:
        result = await self.embed_batch([text])
        return result[0]
```
The embedding service uses text-embedding-3-small from OpenAI, which produces 1536-dimensional vectors at very low cost ($0.02 per million tokens). The in-memory cache prevents re-embedding identical text, which matters when you're iterating on chunking strategies and reprocessing the same documents. For production, you'd replace this cache with Redis or a persistent store.
Now let's set up ChromaDB as our vector store with a clean wrapper:
```python
import chromadb


class VectorStore:
    def __init__(self, persist_dir: str = "./chroma_data"):
        self.client = chromadb.PersistentClient(path=persist_dir)

    def get_collection(self, name: str = "documents"):
        return self.client.get_or_create_collection(
            name=name,
            metadata={"hnsw:space": "cosine"},
        )

    def add(self, ids, documents, embeddings, metadatas,
            collection_name="documents"):
        col = self.get_collection(collection_name)
        col.add(
            ids=ids,
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
        )

    def query(self, query_embedding, n_results=5,
              where=None, collection_name="documents"):
        col = self.get_collection(collection_name)
        results = col.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            where=where,
            include=["documents", "metadatas", "distances"],
        )
        return [
            {
                "text": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                # Convert cosine distance to a similarity-style score
                "score": 1.0 - results["distances"][0][i],
            }
            for i in range(len(results["documents"][0]))
        ]

    def delete_document(self, doc_id: str, collection_name="documents"):
        col = self.get_collection(collection_name)
        col.delete(where={"doc_id": doc_id})
```
The vector store wrapper provides a clean interface with just four operations: create a collection, add vectors, query for similar vectors, and delete by document ID. ChromaDB's PersistentClient saves data to disk, so vectors survive application restarts. The HNSW (Hierarchical Navigable Small World) index with cosine similarity is the industry standard for approximate nearest neighbor search — it provides sub-millisecond query times even for collections with hundreds of thousands of vectors.
| Embedding Model | Dimensions | MTEB Score | Cost (per 1M tokens) | Best For |
|---|---|---|---|---|
| text-embedding-3-small | 1536 | 62.3 | $0.02 | Cost-sensitive, general use |
| text-embedding-3-large | 3072 | 64.6 | $0.13 | High accuracy requirements |
| Cohere embed-v3 | 1024 | 64.5 | $0.10 | Multilingual documents |
| all-MiniLM-L6-v2 (local) | 384 | 56.3 | Free | Offline / privacy-sensitive |
For production with pgvector, the setup is: CREATE EXTENSION vector; then create a table with a vector(1536) column. pgvector supports both exact and approximate (IVFFlat, HNSW) indexing. The advantage is that your vectors live alongside your relational metadata in the same PostgreSQL database, simplifying operations and enabling hybrid queries that combine vector similarity with SQL filters.
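A minimal pgvector setup might look like the following sketch; table, column, and index names here are illustrative, and the query vector is truncated:

```sql
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE chunks (
    chunk_id   TEXT PRIMARY KEY,
    doc_id     TEXT NOT NULL,
    text       TEXT NOT NULL,
    embedding  vector(1536)  -- matches text-embedding-3-small's dimensionality
);

-- Approximate HNSW index using cosine distance
CREATE INDEX ON chunks USING hnsw (embedding vector_cosine_ops);

-- Hybrid query: vector similarity combined with a relational filter
SELECT chunk_id, text
FROM chunks
WHERE doc_id = 'doc-1'
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 5;
```

The `<=>` operator is pgvector's cosine-distance operator; ordering by it ascending returns the most similar rows first.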
RAG Retrieval Pipeline
Plain Language
The retrieval pipeline is the heart of our document portal — it takes a user's question and finds the most relevant pieces of information from all the documents in our system. Think of it as the moment a library patron approaches the reference desk with a question, and the librarian's expertise kicks in to find exactly the right sources.
The retrieval process is more sophisticated than a simple vector search. First, we embed the query using the same embedding model we used for documents. Then we perform a similarity search in the vector store to get candidate chunks. But raw similarity scores aren't always reliable — sometimes a chunk scores highly because it shares vocabulary with the query but isn't actually answering the question. So we add a reranking step that uses a cross-encoder model to more accurately score each candidate's relevance to the specific question.
After reranking, we have our top-K most relevant chunks. We then assemble context by formatting these chunks with their source metadata (filename, page number) into a prompt that the LLM can use. The LLM receives the user's question plus these context chunks and generates an answer that's grounded in the actual document content, complete with citations pointing back to the specific sources.
We also implement conversation memory so users can ask follow-up questions. If a user asks "What's the return policy?" and then follows up with "How long do I have?", the system needs to understand that "how long" refers to the return policy from the previous question. We do this by maintaining a conversation history and reformulating follow-up questions to be self-contained before running retrieval.
Deep Dive
The retrieval pipeline combines vector search, optional reranking, and context assembly into a single cohesive service:
```python
from dataclasses import dataclass
from typing import Optional

from openai import AsyncOpenAI


@dataclass
class RetrievalResult:
    text: str
    score: float
    metadata: dict


class RAGPipeline:
    def __init__(self, embedder, vector_store, llm_client=None):
        self.embedder = embedder
        self.vector_store = vector_store
        self.llm = llm_client or AsyncOpenAI()

    async def reformulate_query(
        self, query: str, history: list[dict]
    ) -> str:
        """Rewrite follow-up questions to be self-contained."""
        if not history:
            return query
        messages = [
            {"role": "system", "content": (
                "Rewrite the user's follow-up question to be "
                "self-contained, incorporating context from the "
                "conversation history. Return ONLY the rewritten "
                "question, nothing else."
            )},
            *history[-4:],  # last 4 turns
            {"role": "user", "content": query},
        ]
        resp = await self.llm.chat.completions.create(
            model="gpt-4o-mini", messages=messages,
            temperature=0, max_tokens=200,
        )
        return resp.choices[0].message.content.strip()

    async def retrieve(
        self, query: str, top_k: int = 5,
        filters: Optional[dict] = None,
        history: Optional[list[dict]] = None,
    ) -> list[RetrievalResult]:
        # Step 1: Reformulate if there's conversation history
        search_query = await self.reformulate_query(query, history or [])
        # Step 2: Embed the query
        query_vec = await self.embedder.embed_query(search_query)
        # Step 3: Vector search (retrieve more than needed, for reranking)
        raw = self.vector_store.query(
            query_embedding=query_vec,
            n_results=top_k * 3,
            where=filters,
        )
        # Step 4: Score-based filtering (drop low-quality matches)
        candidates = [r for r in raw if r["score"] > 0.3]
        # Step 5: LLM-based reranking for top precision
        return await self._rerank(search_query, candidates, top_k)

    async def _rerank(self, query, candidates, top_k):
        if len(candidates) <= top_k:
            return [
                RetrievalResult(c["text"], c["score"], c["metadata"])
                for c in candidates
            ]
        # Use the LLM to score relevance (lightweight reranker)
        scored = []
        for c in candidates[:top_k * 2]:
            resp = await self.llm.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content":
                    f"Rate 0-10 how relevant this passage is to "
                    f"the query.\nQuery: {query}\n"
                    f"Passage: {c['text'][:500]}\n"
                    f"Score (just the number):"
                }],
                temperature=0, max_tokens=5,
            )
            try:
                score = float(resp.choices[0].message.content.strip())
            except ValueError:
                # LLM returned something non-numeric: fall back to the
                # vector-similarity score, rescaled to the 0-10 range
                score = c["score"] * 10
            scored.append((c, score))
        scored.sort(key=lambda x: x[1], reverse=True)
        return [
            RetrievalResult(c["text"], s / 10, c["metadata"])
            for c, s in scored[:top_k]
        ]
```
The retrieval pipeline follows a multi-stage funnel: we retrieve 3x the desired results from the vector store (cast a wide net), filter out low-confidence matches (score below 0.3), then rerank the remaining candidates using the LLM as a judge. The LLM-based reranker is a lightweight alternative to a dedicated cross-encoder model — it costs a few cents per query but significantly improves precision. For higher throughput, you'd replace this with a local cross-encoder like cross-encoder/ms-marco-MiniLM-L-6-v2 from sentence-transformers.
The synthesis step generates the final answer with citations:
```python
class RAGSynthesizer:
    def __init__(self, llm_client=None):
        self.llm = llm_client or AsyncOpenAI()

    def build_context(self, results: list[RetrievalResult]) -> str:
        parts = []
        for i, r in enumerate(results, 1):
            source = (
                f"{r.metadata['filename']}, "
                f"page {r.metadata.get('page', '?')}"
            )
            parts.append(f"[Source {i}] ({source}):\n{r.text}\n")
        return "\n".join(parts)

    async def generate(
        self, query: str, results: list[RetrievalResult],
        history: list[dict] = None, stream: bool = False
    ):
        context = self.build_context(results)
        system_prompt = (
            "You are a document assistant. Answer the user's "
            "question based ONLY on the provided sources. "
            "Cite sources using [Source N] notation. If the "
            "sources don't contain the answer, say so clearly. "
            "Never make up information."
        )
        messages = [
            {"role": "system", "content": system_prompt},
            *(history or []),
            {"role": "user", "content":
                f"Sources:\n{context}\n\nQuestion: {query}"},
        ]
        if stream:
            return await self.llm.chat.completions.create(
                model="gpt-4o", messages=messages,
                temperature=0.1, max_tokens=1500, stream=True,
            )
        resp = await self.llm.chat.completions.create(
            model="gpt-4o", messages=messages,
            temperature=0.1, max_tokens=1500,
        )
        return {
            "answer": resp.choices[0].message.content,
            "sources": [
                {"filename": r.metadata["filename"],
                 "page": r.metadata.get("page"),
                 "score": round(r.score, 3)}
                for r in results
            ],
            "model": "gpt-4o",
        }
```
The synthesizer constructs a carefully structured prompt that includes numbered source citations, which the LLM can reference in its answer. Setting temperature=0.1 keeps the output factual and closely grounded in the sources. The system prompt explicitly instructs the model to say "I don't know" rather than hallucinate — this is critical for a document Q&A system where users expect accurate, verifiable answers. The streaming option enables real-time response display in the chat interface, improving perceived latency.
Don't skip the "answer only from sources" instruction in your system prompt. Without it, the LLM will happily blend its pre-training knowledge with your document content, making it impossible for users to distinguish between verified document information and the model's general knowledge. This is the number one source of trust issues in RAG applications.
FastAPI Backend
Plain Language
The FastAPI backend is the central nervous system of our document portal. It's the web server that accepts HTTP requests from the frontend (or any client), routes them to the appropriate service (ingestion, retrieval, or synthesis), and returns structured responses. Think of it as the receptionist at our smart library — it greets visitors, directs them to the right department, and makes sure everyone's requests are handled properly.
FastAPI was chosen for several reasons. It's async-native, meaning it can handle many simultaneous requests without blocking — while one user's document is being processed, another user's query is being answered, and a third user is uploading a new file. It has automatic API documentation — just by defining your endpoints with type hints, you get a beautiful interactive Swagger UI at /docs that lets developers (and the course instructors!) explore and test every endpoint. And it uses Pydantic models for request/response validation, ensuring that malformed requests are rejected with clear error messages before they reach your business logic.
Our API exposes five core endpoints: POST /upload for document ingestion, GET /documents for listing uploaded documents, POST /query for single-shot RAG questions, POST /chat for conversational RAG with history, and GET /health for monitoring. Each endpoint is designed to be stateless (all state lives in the databases), making horizontal scaling straightforward — just run more instances behind a load balancer.
Deep Dive
Here's the complete FastAPI application with all endpoints, dependency injection, and error handling:
```python
import json
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

app = FastAPI(title="Document Portal API", version="1.0")
app.add_middleware(
    CORSMiddleware, allow_origins=["*"],
    allow_methods=["*"], allow_headers=["*"],
)

# --- Services (initialized at startup) ---
parser = DocumentParser()
chunker = RecursiveChunker()
embedder = EmbeddingService()
store = VectorStore()
pipeline = RAGPipeline(embedder, store)
synthesizer = RAGSynthesizer()
db = MetadataDB()  # your metadata-DB wrapper (hash_exists / insert_document / update_status)

# --- Pydantic Models ---
class QueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=2000)
    collection: str = "default"
    top_k: int = Field(default=5, ge=1, le=20)

class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=2000)
    conversation_id: Optional[str] = None
    collection: str = "default"
    stream: bool = False

class SourceInfo(BaseModel):
    filename: str
    page: Optional[int]
    score: float

class QueryResponse(BaseModel):
    answer: str
    sources: list[SourceInfo]
    model: str

# --- Conversation store (in-memory; use Redis in prod) ---
conversations: dict[str, list[dict]] = {}

# --- Endpoints ---
@app.get("/health")
async def health():
    return {"status": "ok", "version": "1.0"}

@app.post("/upload")
async def upload_document(
    background_tasks: BackgroundTasks,  # injected by FastAPI; must not default to None
    file: UploadFile = File(...),
    collection: str = "default",
):
    # Validate the file type
    suffix = Path(file.filename).suffix.lower()
    if suffix not in DocumentParser.SUPPORTED:
        raise HTTPException(400, f"Unsupported: {suffix}")
    # Save to a temp file
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    shutil.copyfileobj(file.file, tmp)
    tmp_path = Path(tmp.name)
    tmp.close()
    # Process in the background
    ingestion = IngestionService(parser, chunker, embedder, store, db)
    background_tasks.add_task(ingestion.ingest, tmp_path, collection)
    return {"status": "processing", "filename": file.filename}

@app.post("/query", response_model=QueryResponse)
async def query_documents(req: QueryRequest):
    results = await pipeline.retrieve(
        req.question, top_k=req.top_k,
        filters={"collection": req.collection} if req.collection != "default" else None,
    )
    if not results:
        return QueryResponse(
            answer="No relevant documents found.",
            sources=[], model="none",
        )
    response = await synthesizer.generate(req.question, results)
    return QueryResponse(**response)

@app.post("/chat")
async def chat(req: ChatRequest):
    conv_id = req.conversation_id or str(uuid.uuid4())
    history = conversations.get(conv_id, [])
    results = await pipeline.retrieve(req.message, history=history)
    response = await synthesizer.generate(
        req.message, results, history=history, stream=req.stream,
    )
    if req.stream:
        async def event_stream():
            full = ""
            async for chunk in response:
                delta = chunk.choices[0].delta.content or ""
                full += delta
                yield f"data: {json.dumps({'text': delta})}\n\n"
            # Persist the turn only after the full answer has streamed
            conversations[conv_id] = history + [
                {"role": "user", "content": req.message},
                {"role": "assistant", "content": full},
            ]
        return StreamingResponse(
            event_stream(), media_type="text/event-stream",
            headers={"X-Conversation-Id": conv_id},
        )
    # Non-streaming response
    conversations[conv_id] = history + [
        {"role": "user", "content": req.message},
        {"role": "assistant", "content": response["answer"]},
    ]
    return {
        "conversation_id": conv_id,
        "answer": response["answer"],
        "sources": response["sources"],
    }
```
The application uses FastAPI's BackgroundTasks for document processing — when a user uploads a file, the API immediately returns a "processing" status while ingestion happens asynchronously. This prevents the upload endpoint from timing out on large documents. The /chat endpoint supports both streaming (Server-Sent Events) and non-streaming responses, giving the frontend flexibility in how it renders answers.
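On the wire, a Server-Sent Events stream is just a sequence of `data: <json>` lines separated by blank lines, so any client can reassemble the answer with a few lines of parsing. A minimal sketch, assuming the `{'text': ...}` payload shape emitted by the streaming endpoint above:

```python
import json

def parse_sse_events(raw_stream: str) -> str:
    """Reassemble the full answer from raw SSE text.

    Each event is a 'data: <json>' line; events are separated by
    blank lines, matching the server's 'data: {...}\n\n' format.
    """
    full = ""
    for line in raw_stream.splitlines():
        if line.startswith("data: "):
            payload = json.loads(line[len("data: "):])
            full += payload.get("text", "")
    return full

# Simulate what a streaming chat endpoint would emit
raw = (
    'data: {"text": "Remote work is "}\n\n'
    'data: {"text": "allowed 3 days/week."}\n\n'
)
answer = parse_sse_events(raw)
print(answer)  # → Remote work is allowed 3 days/week.
```

A real client would read the response incrementally and render each delta as it arrives rather than buffering the whole stream.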
The conversation store is in-memory here for simplicity, but in production you'd use Redis with TTL-based expiration (conversations expire after 24 hours of inactivity) or store them in PostgreSQL for persistence. The conversation ID is returned to the client so it can maintain context across multiple requests.
CORS middleware is configured to allow all origins during development. For production, you'd restrict this to your frontend's domain. The Pydantic models provide automatic validation — if someone sends a question longer than 2000 characters or requests more than 20 results, FastAPI returns a 422 error with a clear description of the validation failure.
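Those bounds (question at most 2000 characters, at most 20 results) are declared on the Pydantic request models; the checks Pydantic generates are roughly equivalent to this plain-Python sketch. The limits and field names here follow the description above and stand in for the project's actual `QueryRequest` model:

```python
def validate_query(question: str, top_k: int = 5) -> list[str]:
    """Return a list of validation errors; empty means the request is valid.

    Mirrors the constraints FastAPI would enforce via Pydantic: a
    too-long question or an oversized top_k produces a 422 response
    carrying messages like these.
    """
    errors = []
    if not question.strip():
        errors.append("question: must not be empty")
    if len(question) > 2000:
        errors.append("question: at most 2000 characters")
    if not 1 <= top_k <= 20:
        errors.append("top_k: must be between 1 and 20")
    return errors

ok = validate_query("What does our policy say about remote work?", top_k=5)
bad = validate_query("x" * 3000, top_k=50)
print(ok)   # → []
print(bad)  # two violations reported
```

With Pydantic you would express the same constraints declaratively (e.g. a max-length string field and an integer field with an upper bound) and FastAPI would return the 422 automatically.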
FastAPI automatically generates interactive API docs at /docs (Swagger UI) and /redoc (ReDoc). Use these to test your endpoints during development without writing any frontend code. You can upload files, send queries, and inspect responses directly in the browser.
Frontend & Deployment
Plain Language
The frontend is the face of our document portal — the interface users interact with to upload documents and ask questions. We build a simple but effective chat-based interface that feels familiar to anyone who has used ChatGPT or similar tools. Users can drag-and-drop documents to upload them, type questions in a chat input, see answers stream in real-time, and click on source citations to see which document and page the answer came from.
For deployment, we containerize the entire application with Docker so it can run anywhere — on a developer's laptop, on a cloud VM, or on AWS ECS. Docker ensures that "it works on my machine" translates to "it works everywhere" by bundling the application code, Python dependencies, and system libraries into a single portable image. We use Docker Compose to orchestrate the multiple services (API server, ChromaDB, PostgreSQL) and define how they connect to each other.
The deployment section also covers essential production concerns: environment variable management for API keys, health check endpoints for load balancers, logging for debugging, and basic monitoring to know when something goes wrong. These aren't glamorous features, but they're the difference between a demo that works once and a service that runs reliably for months.
Deep Dive
Let's start with a minimal but functional Streamlit frontend for rapid prototyping:
import streamlit as st
import requests

API_URL = "http://localhost:8000"

st.set_page_config(page_title="Document Portal", layout="wide")
st.title("📄 Document Q&A Portal")

# --- Sidebar: Document Upload ---
with st.sidebar:
    st.header("Upload Documents")
    uploaded = st.file_uploader(
        "Drop PDF, TXT, or MD files",
        type=["pdf", "txt", "md"],
        accept_multiple_files=True
    )
    if uploaded:
        for f in uploaded:
            resp = requests.post(
                f"{API_URL}/upload",
                files={"file": (f.name, f, f.type)}
            )
            if resp.status_code == 200:
                st.success(f"Uploaded: {f.name}")
            else:
                st.error(f"Failed: {f.name}")

# --- Chat Interface ---
if "messages" not in st.session_state:
    st.session_state.messages = []
if "conv_id" not in st.session_state:
    st.session_state.conv_id = None

# Replay the conversation so far
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        st.markdown(msg["content"])
        if "sources" in msg:
            with st.expander("📚 Sources"):
                for s in msg["sources"]:
                    st.caption(f"{s['filename']} p.{s['page']} "
                               f"(score: {s['score']:.2f})")

if prompt := st.chat_input("Ask about your documents..."):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        with st.spinner("Searching documents..."):
            resp = requests.post(f"{API_URL}/chat", json={
                "message": prompt,
                "conversation_id": st.session_state.conv_id
            })
            data = resp.json()
        st.markdown(data["answer"])
    st.session_state.conv_id = data["conversation_id"]
    st.session_state.messages.append({
        "role": "assistant",
        "content": data["answer"],
        "sources": data.get("sources", [])
    })
Streamlit provides a rapid prototyping framework — this entire chat UI is about 50 lines of Python. The sidebar handles document uploads, the main area shows the conversation, and sources are displayed in expandable sections beneath each answer. For a production frontend, you'd build a React or Next.js application with proper state management, but Streamlit is perfect for the capstone because it lets you focus on the RAG pipeline rather than frontend engineering.
Now let's containerize everything with Docker Compose:
# docker-compose.yml
version: "3.9"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - CHROMA_PERSIST_DIR=/data/chroma
      - DATABASE_URL=postgresql://portal:secret@db:5432/portal
    volumes:
      - chroma_data:/data/chroma
      - uploads:/data/uploads
    depends_on:
      db:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  frontend:
    build:
      context: .
      dockerfile: Dockerfile.frontend
    ports:
      - "8501:8501"
    environment:
      - API_URL=http://api:8000
    depends_on:
      - api

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: portal
      POSTGRES_PASSWORD: secret
      POSTGRES_DB: portal
    volumes:
      - pg_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U portal"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  chroma_data:
  pg_data:
  uploads:
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system deps
RUN apt-get update && apt-get install -y --no-install-recommends \
curl build-essential && rm -rf /var/lib/apt/lists/*
# Install Python deps
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app/ ./app/
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", \
"--port", "8000", "--workers", "2"]
The Docker Compose setup orchestrates three services: the FastAPI backend, the Streamlit frontend, and a PostgreSQL database for metadata. ChromaDB runs embedded within the API process and persists to a Docker volume. Health checks gate the startup order: the api service waits for PostgreSQL's health check to pass (`condition: service_healthy`), while the frontend's plain `depends_on` only waits for the api container to start, not for it to be healthy.
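The `/health` endpoint that the curl-based healthcheck probes can be as simple as aggregating per-dependency checks. A minimal sketch, where the `fake_*_ping` callables are hypothetical stand-ins for real probes (a `SELECT 1` against PostgreSQL, a ChromaDB heartbeat):

```python
def health_status(checks: dict) -> dict:
    """Run each dependency probe; report overall and per-component status.

    A load balancer or Docker healthcheck only needs the top-level
    status, but per-component detail speeds up debugging.
    """
    components = {}
    for name, probe in checks.items():
        try:
            probe()
            components[name] = "ok"
        except Exception as exc:
            components[name] = f"error: {exc}"
    overall = "healthy" if all(v == "ok" for v in components.values()) else "degraded"
    return {"status": overall, "components": components}

def fake_db_ping():      # stand-in for a SELECT 1 against PostgreSQL
    pass

def fake_chroma_ping():  # stand-in for a vector-store heartbeat
    raise ConnectionError("chroma unreachable")

report = health_status({"postgres": fake_db_ping, "chroma": fake_chroma_ping})
print(report["status"])  # → degraded
```

In the FastAPI app this dict would be returned from a `GET /health` route; returning a non-200 status when degraded lets the Docker healthcheck (`curl -f`) fail correctly.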
For production deployment on AWS, you'd push the Docker images to ECR and run them on ECS Fargate:
# Deploy to AWS ECS (simplified)
# 1. Build and push to ECR
aws ecr get-login-password | docker login --username AWS \
--password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com
docker build -t doc-portal .
docker tag doc-portal:latest 123456789.dkr.ecr.us-east-1.amazonaws.com/doc-portal:latest
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/doc-portal:latest
# 2. Create ECS service with Fargate
aws ecs create-service \
--cluster genai-cluster \
--service-name doc-portal \
--task-definition doc-portal:1 \
--desired-count 2 \
--launch-type FARGATE \
--network-configuration "awsvpcConfiguration={subnets=[subnet-xxx],securityGroups=[sg-xxx],assignPublicIp=ENABLED}"
Before deploying to production:
- Store API keys in AWS Secrets Manager, not environment variables.
- Set up CloudWatch alarms for error rates and latency.
- Enable HTTPS via an ALB with an ACM certificate.
- Restrict CORS to your frontend domain.
- Add rate limiting to prevent abuse.
- Set up log aggregation with CloudWatch Logs or similar.
| Component | Dev Setup | Production Setup |
|---|---|---|
| API Server | uvicorn, 1 worker | ECS Fargate, 2+ tasks, ALB |
| Vector Store | ChromaDB (in-process) | pgvector on RDS or OpenSearch |
| Metadata DB | SQLite | PostgreSQL on RDS |
| File Storage | Local disk | S3 with lifecycle policies |
| Frontend | Streamlit | React/Next.js on CloudFront |
| Secrets | .env file | AWS Secrets Manager |
| Monitoring | Console logs | CloudWatch + X-Ray |
Interview Ready
How to Explain This in 2 Minutes
I built a production-grade Document Q&A Portal that lets users upload PDFs, Word docs, and text files, then ask natural-language questions and get accurate, cited answers. The system uses a document ingestion pipeline that parses, chunks, and embeds documents into a vector store. At query time a RAG pipeline retrieves the most relevant chunks, injects them into a prompt, and streams an LLM-generated answer through a FastAPI backend to a chat-based frontend. I also added monitoring, evaluation metrics, and a deployment pipeline so the whole application runs reliably in production.
Likely Interview Questions
| Question | What They're Really Asking |
|---|---|
| Walk me through how a user's document goes from upload to being searchable. | Do you understand the full ingestion pipeline: parsing, chunking, embedding, and indexing? |
| How did you choose your chunking strategy and chunk size? | Can you reason about trade-offs between recall, precision, and latency in retrieval? |
| What happens when the retriever returns irrelevant chunks? | Do you have fallback strategies and understand failure modes in RAG systems? |
| How would you scale this system to millions of documents? | Do you understand production concerns: sharding, async ingestion, caching, and infrastructure? |
| How do you evaluate the quality of the answers? | Can you measure faithfulness, relevance, and groundedness beyond simple accuracy? |
Model Answers
Ingestion Pipeline: When a document is uploaded, the backend parses it into raw text using format-specific extractors (PyPDF, python-docx, etc.), then splits it into overlapping chunks of roughly 512 tokens. Each chunk is embedded using a sentence-transformer model and stored alongside its metadata in a vector database like ChromaDB or Pinecone. This pipeline is idempotent so re-uploading the same document does not create duplicates.
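The idempotency claim is typically implemented by fingerprinting document content before ingestion. A sketch, where the `seen_hashes` set is a stand-in for a uniqueness check against the metadata database:

```python
import hashlib

def content_fingerprint(data: bytes) -> str:
    """Stable fingerprint of the raw document bytes."""
    return hashlib.sha256(data).hexdigest()

def ingest_if_new(data: bytes, seen_hashes: set) -> bool:
    """Return True if the document was ingested, False if it is a duplicate."""
    fp = content_fingerprint(data)
    if fp in seen_hashes:
        return False  # same bytes already indexed: skip re-embedding
    seen_hashes.add(fp)
    # ... parse, chunk, embed, and index here ...
    return True

seen: set = set()
doc = b"Remote work policy v1: employees may work remotely 3 days per week."
first = ingest_if_new(doc, seen)   # new document: ingested
second = ingest_if_new(doc, seen)  # duplicate upload: skipped
print(first, second)  # → True False
```

Hashing the raw bytes catches exact re-uploads; detecting near-duplicates (a re-exported PDF with identical text) requires hashing the extracted text instead.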
RAG Retrieval: At query time the user question is embedded with the same model, a cosine-similarity search retrieves the top-k chunks, and a reranker optionally re-scores them. The selected chunks are injected into a system prompt that instructs the LLM to answer only from the provided context and cite paragraph numbers. This design keeps the LLM grounded and reduces hallucination.
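The cosine-similarity ranking step can be sketched in pure Python; the toy 3-dimensional vectors and chunk IDs below are illustrative stand-ins for real embedding output:

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def top_k(query_vec, chunks, k=2):
    """Rank (chunk_id, vector) pairs by similarity to the query, keep the best k."""
    scored = [(cid, cosine(query_vec, vec)) for cid, vec in chunks]
    return sorted(scored, key=lambda s: s[1], reverse=True)[:k]

chunks = [
    ("policy-p3", [0.9, 0.1, 0.0]),    # chunk about remote work
    ("handbook-p12", [0.0, 1.0, 0.2]), # unrelated chunk
    ("policy-p4", [0.8, 0.3, 0.1]),
]
query = [1.0, 0.2, 0.0]  # the embedded user question
ranked = top_k(query, chunks, k=2)
print([cid for cid, _ in ranked])  # → ['policy-p3', 'policy-p4']
```

A vector database performs the same computation over millions of vectors using approximate nearest-neighbor indexes (HNSW, IVF) instead of a full scan.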
Production Readiness: The FastAPI backend exposes async endpoints with streaming responses so the user sees tokens as they are generated. I containerized the stack with Docker Compose, added structured logging and health-check endpoints, and set up basic CloudWatch metrics for latency, error rate, and token usage. For scale I would swap the local vector store for a managed service and add an async task queue for ingestion.
System Design Scenario
Your company wants to build an internal knowledge base that indexes 50,000 policy documents and serves 500 concurrent users. Describe the end-to-end architecture. Consider how you would handle document versioning, access control per department, near-real-time ingestion of new documents, and how you would monitor retrieval quality over time. Sketch the key components: an ingestion worker, a vector database with namespace isolation, a retrieval API with caching, an LLM gateway with rate limiting, and a feedback loop that captures user thumbs-up/down to continuously improve chunk quality.
Common Mistakes
- Ignoring chunk overlap: Using non-overlapping chunks causes answers to miss information that spans chunk boundaries, leading to incomplete or incorrect responses.
- Skipping evaluation: Deploying a RAG system without measuring faithfulness and relevance means you cannot tell whether the system is hallucinating or returning stale information.
- Treating the vector store as a black box: Not inspecting retrieved chunks during debugging makes it nearly impossible to diagnose whether poor answers stem from bad retrieval, bad prompting, or bad source data.