Case Study 04: Data Layer for an AI Product (RAG)
“Design a production RAG system for enterprise search: 10M docs, 1K queries/s, hybrid retrieval, citations, freshness. Stack: 2024-2026.”
Tags: case-study rag ai vector-db embeddings
Related: Tuan-15-Vector-DB-AI · Tuan-13-Search-Engines-ES
1. Requirements
Functional
- “AI assistant” for the company knowledge base
- Sources: Confluence, GitHub, Slack, internal docs (PDF, MD, etc.)
- Q&A with citations
- Real-time updates as docs change
- Per-user access control (some docs restricted)
- Multi-language
Non-functional
- 10M document chunks
- 1K queries/sec peak
- P99 latency < 3s end to end (including LLM generation)
- Document updates reflected in search results within 5 min
- Cost optimization (LLM calls dominate the bill)
2. Architecture
```mermaid
graph TB
    subgraph "Indexing"
        Sources[Sources: Confluence, GitHub, Slack, S3] --> Crawler[Crawler/Connector]
        Crawler --> Parser[Parser: PDF, MD, code]
        Parser --> Chunker[Chunker]
        Chunker --> Embedder[Embedder: OpenAI/BGE]
        Embedder --> VecDB[(Qdrant<br/>vector store)]
        Chunker --> ES[(Elasticsearch<br/>BM25)]
        Parser --> Meta[(Postgres<br/>doc metadata, ACL)]
    end
    subgraph "Query"
        User[User] --> QApi[Query API]
        QApi --> QEmbed[Embed query]
        QApi --> AcL[Check ACL]
        QEmbed --> Vec[Vector search<br/>top 50]
        QApi --> BM[BM25 top 50]
        Vec --> Fusion[RRF fusion]
        BM --> Fusion
        Fusion --> Rerank[Cross-encoder rerank<br/>top 5]
        Rerank --> LLM[LLM: GPT-4o/Claude/Llama]
        LLM --> Answer[Answer + citations]
        Answer --> Cache[Cache common Q]
    end
```
3. Document Ingestion
3.1 Source connectors
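```python
from datetime import datetime

# Document, html_to_md and compute_acl are helpers defined elsewhere in the pipeline.

class ConfluenceConnector:
    def __init__(self, client):
        self.client = client  # Confluence API client (assumed to expose list_updated)

    async def sync(self, since: datetime):
        # Async generator: stream changed pages instead of loading them all at once
        async for page in self.client.list_updated(since):
            yield Document(
                source='confluence',
                source_id=page.id,
                title=page.title,
                content=html_to_md(page.body),  # normalize HTML to Markdown before chunking
                url=page.url,
                authors=page.authors,
                updated_at=page.updated_at,
                space=page.space,
                acl=compute_acl(page),  # page/space restrictions -> {"groups", "users", "public"}
            )

# Similar connectors for GitHub, Slack, S3
```

Schedule: incremental sync every 5 minutes.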
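The 5-minute incremental sync needs a watermark: the `since` value passed to `sync()` on each run. Below is a minimal polling-loop sketch. It assumes a connector shaped like `ConfluenceConnector` and an idempotent `handle_document` callback (a hypothetical name) that upserts on `(source, source_id)`. The watermark is kept in memory here. A real deployment would persist it, for example in Postgres, so that a restart does not trigger a full re-sync.

```python
import asyncio
from datetime import datetime, timedelta, timezone

async def run_incremental_sync(connector, handle_document, *, interval_s=300,
                               skew=timedelta(seconds=60), max_runs=None):
    """Poll connector.sync(since) every interval_s seconds, advancing a watermark.

    max_runs=None polls forever; a number stops after that many runs (handy for tests).
    """
    since = datetime.fromtimestamp(0, tz=timezone.utc)  # first run: full sync
    runs = 0
    while max_runs is None or runs < max_runs:
        started = datetime.now(timezone.utc)
        async for doc in connector.sync(since):
            await handle_document(doc)  # must be idempotent (upsert on source, source_id)
        # Advance to this run's START time minus a clock-skew margin, so edits made
        # while the run was in progress are picked up by the next run.
        since = started - skew
        runs += 1
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_s)
```

Advancing the watermark to the run's start time means some documents are fetched twice. That is cheap because the upsert is idempotent. Advancing to the end time instead could silently skip edits made mid-run.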
3.2 Parsing
- Markdown: native
- HTML: convert to MD
- PDF: pypdf, unstructured.io for layout
- Office: pandoc
- Code: language-aware chunking
- Slack: thread-aware
- Images in docs: OCR (Tesseract) or skip
3.3 Chunking strategy
```python
def chunk_document(doc):
    # Code files: doc.language is assumed to be set by the parser (None for prose)
    if doc.language:
        return chunk_by_function(doc.content, doc.language)
    if doc.source == 'slack':
        return chunk_by_thread(doc.content)
    # General prose
    return chunk_recursive(
        doc.content,
        max_size=500,  # tokens
        overlap=50,    # tokens shared between neighboring chunks
        respect_sentences=True,
        respect_headings=True,
    )
```

Result: a list of chunks with metadata.
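`chunk_recursive` is not defined in this note. Any token-budgeted chunker is built around a sliding window with overlap. Below is a minimal sketch of just that step. It assumes the text is already tokenized; the usage note uses whitespace splitting as a stand-in for the embedding model's real tokenizer. It does not implement the sentence- or heading-aware splitting that `respect_sentences` and `respect_headings` imply.

```python
def window_chunks(tokens: list[str], max_size: int = 500, overlap: int = 50) -> list[list[str]]:
    """Split tokens into windows of at most max_size, each sharing `overlap` tokens with the previous one."""
    if not 0 <= overlap < max_size:
        raise ValueError("need 0 <= overlap < max_size")
    step = max_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_size])
        if start + max_size >= len(tokens):
            break  # this window already reaches the end of the document
    return chunks
```

For example, `window_chunks(text.split())` on a 1,200-token document, with `max_size=500` and `overlap=50`, yields windows starting at tokens 0, 450 and 900. The last window holds 300 tokens.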
3.4 Embedding
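```python
from openai import AsyncOpenAI

client = AsyncOpenAI()  # the async client is needed for `await`

async def embed_batch(chunks):
    texts = [c.content for c in chunks]
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
        dimensions=1536,  # must match vector(1536) in the chunks table
    )
    # Each item carries the index of its input; sort to be safe
    return [r.embedding for r in sorted(response.data, key=lambda r: r.index)]
```

Batch 100 chunks per API call, and retry rate-limited (HTTP 429) calls with backoff.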
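Below is a sketch of the batching and rate-limit handling. It assumes `embed_texts` is any async function that maps a list of strings to a list of vectors, for instance `embed_batch` adapted to take strings. `RateLimited` is a stand-in exception; with the OpenAI Python SDK you would catch `openai.RateLimitError` instead. Backoff is exponential with full jitter, so that many workers hitting the limit at the same moment do not retry in lockstep.

```python
import asyncio
import functools
import random

class RateLimited(Exception):
    """Stand-in for the provider's rate-limit error (openai.RateLimitError in the OpenAI SDK)."""

async def with_backoff(call, *, retries=5, base_delay=1.0, max_delay=30.0):
    """Await call(), retrying on RateLimited with exponential backoff and full jitter."""
    for attempt in range(retries):
        try:
            return await call()
        except RateLimited:
            if attempt == retries - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))

async def embed_all(texts, embed_texts, *, batch_size=100, **backoff):
    """Embed texts in batches of batch_size, preserving input order."""
    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        vectors.extend(await with_backoff(functools.partial(embed_texts, batch), **backoff))
    return vectors
```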
3.5 Storage
Postgres for metadata + Qdrant for vectors + ES for BM25.
```sql
CREATE TABLE documents (
    id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    source text NOT NULL,
    source_id text NOT NULL,
    title text,
    url text,
    authors text[],
    space text,
    acl jsonb NOT NULL,           -- ACL info
    updated_at timestamptz,
    metadata jsonb,
    UNIQUE (source, source_id)
);

CREATE TABLE chunks (
    id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    document_id bigint REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index int NOT NULL,
    content text NOT NULL,
    content_hash text NOT NULL,   -- dedup detection
    section text,                 -- for citation
    embedding vector(1536),       -- if using pgvector instead of Qdrant
    tokens int,
    indexed_at timestamptz DEFAULT now()
);

CREATE INDEX idx_chunks_document ON chunks(document_id);
```

Qdrant collection:
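```python
from qdrant_client.models import PointStruct

# qdrant: a QdrantClient (add `await` when using AsyncQdrantClient); one point per chunk
qdrant.upsert("chunks", points=[
    PointStruct(
        id=chunk.id,  # same ID as chunks.id in Postgres
        vector=chunk.embedding,
        payload={
            "document_id": chunk.document_id,
            "source": doc.source,
            "section": chunk.section,
            "acl_groups": doc.acl['groups'],  # used by the retrieval filter
            "updated_at": doc.updated_at.isoformat(),
        },
    )
])
```

4. Hybrid Search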
4.1 Retrieval
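`retrieve` below calls a `reciprocal_rank_fusion` helper that this note never defines. RRF scores each item as the sum, over all result lists, of 1 / (k + rank). Because only ranks matter, BM25 scores never have to be calibrated against cosine similarities. k = 60 is the customary default. Below is a minimal sketch. It assumes each argument is a best-first list of hashable chunk IDs, so raw Qdrant and ES responses would first be mapped to IDs.

```python
from collections import defaultdict
from collections.abc import Hashable, Sequence

def reciprocal_rank_fusion(*ranked_lists: Sequence[Hashable], k: int = 60) -> list[Hashable]:
    """Fuse best-first ranked lists: score(d) = sum over lists of 1 / (k + rank of d)."""
    scores: dict[Hashable, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, item in enumerate(ranking, start=1):
            scores[item] += 1.0 / (k + rank)  # items missing from a list add nothing
    # Highest fused score first; ties keep first-seen order (sorted is stable)
    return sorted(scores, key=scores.__getitem__, reverse=True)
```

A chunk ranked well by both retrievers beats one ranked first by only one of them. With lists `["a", "b", "c"]` and `["c", "a", "d"]`, the fused order is `a, c, b, d`.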
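```python
from qdrant_client.models import FieldCondition, Filter, MatchAny

async def retrieve(query, user, top_k=50):
    # Get the user's accessible groups (see 7.2)
    user_groups = await get_user_groups(user.id)
    # Embed query
    q_embed = await embed(query)
    # Vector search with the ACL filter applied inside the query (qdrant: AsyncQdrantClient)
    vec_results = await qdrant.search(
        collection_name="chunks",
        query_vector=q_embed,
        query_filter=Filter(
            must=[FieldCondition(key="acl_groups", match=MatchAny(any=user_groups))]
        ),
        limit=top_k,
    )
    # BM25 search with the same ACL filter (es: AsyncElasticsearch 8.x, keyword args instead of body=)
    bm25_results = await es.search(
        index="chunks",
        query={
            "bool": {
                "must": {"match": {"content": query}},
                "filter": {"terms": {"acl_groups": user_groups}},
            }
        },
        size=top_k,
    )
    # RRF fusion over ranked ID lists; ES _id is a string, so stringify Qdrant IDs
    vec_ids = [str(p.id) for p in vec_results]
    bm25_ids = [hit["_id"] for hit in bm25_results["hits"]["hits"]]
    fused_ids = reciprocal_rank_fusion(vec_ids, bm25_ids, k=60)
    # load_chunks is a hypothetical helper: fetch chunk rows (content, document) in fused order
    return await load_chunks(fused_ids)
```

4.2 Reranking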
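```python
import os

import cohere

co = cohere.AsyncClient(api_key=os.environ["COHERE_API_KEY"])  # env var name is our choice

async def rerank(query, candidates, top_k=5):
    # Cohere Rerank API (or a local cross-encoder)
    response = await co.rerank(
        query=query,
        documents=[c.content for c in candidates],
        top_n=top_k,
        model="rerank-multilingual-v3.0",  # multilingual, matching the requirements
    )
    # Results are sorted by relevance; r.index points back into `candidates`
    return [candidates[r.index] for r in response.results]
```

Or self-host a cross-encoder: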
```python
from sentence_transformers import CrossEncoder

# Trained on English MS MARCO; for multilingual content, pick a multilingual cross-encoder
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')

def rerank_local(query, candidates, top_k=5):
    pairs = [(query, c.content) for c in candidates]
    scores = reranker.predict(pairs)  # one relevance score per (query, passage) pair
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [c for c, _ in ranked[:top_k]]
```

Trade-off: API cost (~$2 per 1K rerank calls) vs. the GPU cost and latency management of self-hosting.
4.3 Context construction
```python
def build_context(chunks):
    # Sort by document, then by chunk_index, so adjacent chunks read in natural order
    parts = []
    for chunk in sorted(chunks, key=lambda c: (c.document_id, c.chunk_index)):
        parts.append(f"[Source: {chunk.document.title} ({chunk.document.url})]\n{chunk.content}")
    return "\n\n".join(parts)
```

5. LLM Call
```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def generate_answer(query, context, history=None):
    system_prompt = """You are a helpful AI assistant. Answer based ONLY on the provided context.
If the answer is not in the context, say so. Always cite sources using [Source: <url>] format.
Be concise."""
    messages = [{"role": "system", "content": system_prompt}]
    if history:
        messages.extend(history[-10:])  # last 5 turns = 10 user/assistant messages
    messages.append({
        "role": "user",
        "content": f"Context:\n{context}\n\nQuestion: {query}",
    })
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=0.1,  # low temperature: stay close to the retrieved context
        max_tokens=500,
    )
    return response.choices[0].message.content
```

For cost optimization:
- GPT-4o-mini for simple questions (5)
- GPT-4o for complex
- Self-host Llama 3.1 70B if volume justifies
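The system prompt asks for `[Source: <url>]` citations, but nothing enforces that format. Below is a minimal post-check sketch. It assumes the model follows that exact format, extracts the cited URLs, and separates those that appeared in the retrieved context from those that did not. An answer with no valid citation, or with any unknown one, can be flagged, retried or shown with a warning. This check is a cheap complement to the faithfulness metric in section 9, not a replacement for it.

```python
import re

CITATION = re.compile(r"\[Source:\s*([^\]]+?)\s*\]")

def check_citations(answer: str, allowed_urls: set[str]) -> tuple[list[str], list[str]]:
    """Return (valid, unknown) citation targets found in the answer, in order of appearance."""
    cited = CITATION.findall(answer)
    valid = [u for u in cited if u in allowed_urls]
    unknown = [u for u in cited if u not in allowed_urls]
    return valid, unknown
```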
6. Caching
6.1 Cache levels
```mermaid
graph LR
    Q[Query] --> Embed[Embed query]
    Embed --> SemanticCache[Semantic cache<br/>recent similar Q]
    SemanticCache -->|hit| Return[Return cached]
    SemanticCache -->|miss| Search
    Search --> LLM
    LLM --> Store[Store in cache]
```
6.2 Semantic cache
Reuse the query embedding: if the new query's embedding is very similar (cosine ≥ 0.95) to a recently cached query, return the cached answer.
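```python
import time
from uuid import uuid4

from qdrant_client.models import PointStruct

CACHE_TTL_S = 3600
SIMILARITY_THRESHOLD = 0.95  # assumes the collection uses cosine distance

async def get_cached_answer(query_embed):
    # Nearest cached query above the threshold (qdrant: AsyncQdrantClient)
    similar = await qdrant.search(
        collection_name="query_cache",
        query_vector=query_embed,
        limit=1,
        score_threshold=SIMILARITY_THRESHOLD,
    )
    # Qdrant does not expire points by itself, so check the TTL on read
    if similar and similar[0].payload["expires_at"] > time.time():
        return similar[0].payload["answer"], similar[0].payload["sources"]
    return None

async def cache_answer(query, query_embed, answer, sources):
    await qdrant.upsert("query_cache", points=[PointStruct(
        id=str(uuid4()),
        vector=query_embed,
        payload={
            "query": query,
            "answer": answer,
            "sources": sources,
            "expires_at": time.time() + CACHE_TTL_S,
        },
    )])
```

Saves 30-50% of LLM cost for frequently asked questions.
Caveat: cached answers must be invalidated when their source documents update.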
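One way to honor that caveat is sketched below under stated assumptions. Each answer is stored with the IDs of the documents it cites and a cache timestamp. On a hit, those are compared against `documents.updated_at`, loaded into a dict here. `CachedAnswer` and `is_still_valid` are hypothetical names. An alternative is to actively delete cache points that cite a document whenever that document is re-indexed.

```python
from dataclasses import dataclass

@dataclass
class CachedAnswer:
    answer: str
    source_doc_ids: list[int]  # documents cited by the cached answer
    cached_at: float           # unix seconds
    expires_at: float          # unix seconds

def is_still_valid(entry: CachedAnswer, doc_updated_at: dict[int, float], now: float) -> bool:
    """Reject expired entries and entries whose sources changed or disappeared after caching."""
    if now >= entry.expires_at:
        return False
    for doc_id in entry.source_doc_ids:
        updated = doc_updated_at.get(doc_id)
        if updated is None or updated > entry.cached_at:
            return False  # source deleted, or edited after the answer was cached
    return True
```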
6.3 Embedding cache
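If a user asks the same question 10 times, embed it only once.

```python
import hashlib
from openai import AsyncOpenAI

client = AsyncOpenAI()

# `redis_cache` is a hypothetical helper (redis-py ships no caching decorator).
# Key on SHA-256, not hash(): Python's built-in hash() is salted per process.
@redis_cache(ttl=300, key=lambda q: "emb:" + hashlib.sha256(q.encode()).hexdigest())
async def embed(query):
    return (await client.embeddings.create(model="text-embedding-3-small", input=query)).data[0].embedding
```

7. Access Control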
7.1 Document-level ACL
```json
{
  "groups": ["engineering", "all_employees"],
  "users": [42, 43],
  "public": false
}
```

At retrieval time, the user's groups are used to filter chunks.
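The retrieval filter in 4.1 matches only `acl_groups`, but the ACL above also has `users` and `public`. Below is a sketch of the full rule as a plain function, assuming the JSON shape above. It can serve as a final check on reranked chunks before they enter the prompt (defense in depth). The same three conditions can also be expressed as an OR filter at retrieval time, provided `users` and `public` are indexed as well.

```python
def can_access(acl: dict, user_id: int, user_groups: set[str]) -> bool:
    """True if the document ACL admits this user: public, listed by ID, or via a shared group."""
    if acl.get("public", False):
        return True
    if user_id in acl.get("users", []):
        return True
    return not user_groups.isdisjoint(acl.get("groups", []))
```

Usage: `[c for c in chunks if can_access(c.document.acl, user.id, groups)]` just before `build_context`.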
7.2 Postgres for fast ACL check
```sql
CREATE TABLE user_groups (
    user_id bigint,
    group_name text,
    PRIMARY KEY (user_id, group_name)
);

-- Fetch a user's groups (cached); coalesce returns '{}' instead of NULL for users with no groups
SELECT coalesce(array_agg(group_name), '{}') FROM user_groups WHERE user_id = $1;
```

7.3 In retrieval
Filter inside both the vector and the BM25 queries. Never let the LLM see a chunk the user is not allowed to read.
Audit log: record who asked what and which chunks were returned.
8. Freshness
8.1 Real-time updates
The connector polls every 5 min. For each changed doc:
- Delete its old chunks (DELETE FROM chunks WHERE document_id = X)
- Re-chunk + re-embed
- Insert the new chunks
- Apply the same delete + upsert in Qdrant and ES (ON DELETE CASCADE only reaches Postgres)
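The steps above re-embed every chunk of a changed document. With the `content_hash` column of the `chunks` table, only chunks whose text actually changed need new embeddings. Below is a minimal sketch with hypothetical function names. Identical chunks within one document collapse to a single hash in this sketch, and unchanged chunks may still need their `chunk_index` updated.

```python
import hashlib

def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def plan_reindex(stored_hashes: set[str], new_chunk_texts: list[str]) -> tuple[list[str], set[str]]:
    """Compare a changed document's new chunks with the hashes already stored.

    Returns (texts that need embedding, stored hashes whose rows/points should be deleted).
    """
    new_by_hash = {content_hash(t): t for t in new_chunk_texts}
    to_embed = [t for h, t in new_by_hash.items() if h not in stored_hashes]
    to_delete = stored_hashes - set(new_by_hash)
    return to_embed, to_delete
```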
8.2 Webhook-based
Confluence/GitHub webhooks → trigger immediate re-index.
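For GitHub, a `push` webhook payload lists each commit's changed files under `added`, `modified` and `removed`. Below is a sketch that turns one payload into the source IDs to re-index or delete. It assumes commits are listed oldest first and uses a hypothetical `repo:path` source_id scheme. A real handler must verify the payload signature first (GitHub sends an HMAC-SHA256 in the `X-Hub-Signature-256` header); this sketch omits that step.

```python
def plan_from_github_push(event: dict) -> tuple[set[str], set[str]]:
    """Turn a GitHub push payload into (source_ids to re-index, source_ids to delete)."""
    repo = event["repository"]["full_name"]
    upsert: set[str] = set()
    delete: set[str] = set()
    for commit in event.get("commits", []):  # assumed oldest -> newest, so later commits win
        for path in commit.get("added", []) + commit.get("modified", []):
            source_id = f"{repo}:{path}"  # hypothetical source_id format
            upsert.add(source_id)
            delete.discard(source_id)
        for path in commit.get("removed", []):
            source_id = f"{repo}:{path}"
            delete.add(source_id)
            upsert.discard(source_id)
    return upsert, delete
```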
8.3 Stale tag
Show “Last updated 5 min ago” in citations.
9. Evaluation
9.1 Golden set
50-200 questions, each with an ideal answer and ideal source citations.
```python
eval_set = [
    {
        "question": "How do we deploy to production?",
        "ideal_chunks": [42, 43],  # chunk IDs
        "ideal_answer_keywords": ["kubectl", "deploy", "rollout"],
    },
    # ...
]
```

9.2 Metrics
- Hit rate@5: % of questions with at least one ideal chunk among the top 5 retrieved
- MRR: mean reciprocal rank of the first ideal chunk
- Answer faithfulness: LLM-as-judge or human
- Hallucination rate: claims not supported by context
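Both retrieval metrics take only a few lines each. The sketch below assumes that, for each question, you have the best-first list of retrieved chunk IDs and the `ideal_chunks` list from the golden set.

```python
def hit_rate_at_k(retrieved: list[list[int]], ideal: list[list[int]], k: int = 5) -> float:
    """Share of questions with at least one ideal chunk among the top-k retrieved."""
    hits = sum(1 for got, want in zip(retrieved, ideal) if set(got[:k]) & set(want))
    return hits / len(retrieved)

def mean_reciprocal_rank(retrieved: list[list[int]], ideal: list[list[int]]) -> float:
    """Average of 1/rank of the first ideal chunk (0 when none is retrieved)."""
    total = 0.0
    for got, want in zip(retrieved, ideal):
        wanted = set(want)
        for rank, chunk_id in enumerate(got, start=1):
            if chunk_id in wanted:
                total += 1.0 / rank
                break
    return total / len(retrieved)
```

Hit rate@5 only asks whether an ideal chunk made the cut. MRR also rewards putting it first, so the two can move independently, for example when a reranker change reorders the top 5 without changing which chunks are in it.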
9.3 Continuous eval
Run the eval set on every deploy; CI fails if any metric drops below its baseline.
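Below is a sketch of the CI gate. It assumes the eval job produces current and baseline metrics as dicts, and that a fixed absolute tolerance (0.02 here, an arbitrary choice) absorbs run-to-run noise. Hallucination rate is treated as lower-is-better; every other metric is treated as higher-is-better.

```python
import sys

LOWER_IS_BETTER = {"hallucination_rate"}

def regressions(current: dict[str, float], baseline: dict[str, float],
                tolerance: float = 0.02) -> list[str]:
    """Names of metrics that are missing or worse than baseline by more than `tolerance`."""
    failed = []
    for name, base in baseline.items():
        value = current.get(name)
        if value is None:
            failed.append(name)
        elif name in LOWER_IS_BETTER and value > base + tolerance:
            failed.append(name)
        elif name not in LOWER_IS_BETTER and value < base - tolerance:
            failed.append(name)
    return failed

def ci_exit_code(current: dict[str, float], baseline: dict[str, float]) -> int:
    """1 (fail the build) if any metric regressed, else 0; reasons go to stderr."""
    failed = regressions(current, baseline)
    for name in failed:
        print(f"eval regression: {name}", file=sys.stderr)
    return 1 if failed else 0
```

In the CI step, `sys.exit(ci_exit_code(current, baseline))` turns a regression into a failed build.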
9.4 User feedback
Collect thumbs up/down on answers and use them as training signal for a better retriever/reranker.
10. Cost Analysis
10.1 Indexing cost
10M chunks × ~200 tokens = 2B tokens. At $0.02 per 1M tokens (text-embedding-3-small), that is ~$40 one-time.
Re-index on schema change: $40-200 per re-embed.
10.2 Query cost
Per query:
- Embed: ~$0.0001
- Vector + BM25 search: ~$0 (own infra)
- Rerank: ~$0 (self-host)
- LLM: ~$0.001 with GPT-4o-mini, ~$0.015 with GPT-4o (4K tokens of context in, 500 tokens out, at list prices)
Total: ~$0.01/query.
1M queries/day ≈ 30M queries/month ≈ $300K/month.
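The per-query LLM cost is tokens × price. Below is a worked sketch. It assumes list prices in USD per 1M tokens of $0.15 in / $0.60 out for GPT-4o-mini and $2.50 in / $10.00 out for GPT-4o. Check current pricing before relying on these figures.

```python
# Assumed list prices, USD per 1M tokens: (input, output)
PRICES = {
    "gpt-4o-mini": (0.15, 0.60),
    "gpt-4o": (2.50, 10.00),
}

def llm_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """USD for one call: input and output tokens are billed at different rates."""
    usd_in, usd_out = PRICES[model]
    return (input_tokens * usd_in + output_tokens * usd_out) / 1_000_000

def monthly_cost(cost_per_query: float, queries_per_day: int, days: int = 30) -> float:
    return cost_per_query * queries_per_day * days
```

For 4K tokens in and 500 out, this gives about $0.0009 per query on GPT-4o-mini and $0.015 on GPT-4o, which brackets the ~$0.01 blended estimate. At 1M queries/day, $0.01 per query comes to $300K/month.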
10.3 Optimization
- Semantic cache: -30% LLM cost
- GPT-4o-mini for simple Q: -80% on those
- Self-host Llama 3.1 70B: ~$0.001/query (~10x cheaper at scale)
After optimization: ~$50-100K/month at 1M queries/day.
11. Failure Modes
| Failure | Impact | Mitigation |
|---|---|---|
| OpenAI API down | Queries cannot be embedded or answered | Fallback to a local model (cheaper, lower quality) + BM25-only retrieval |
| Qdrant down | No vector retrieval | BM25-only fallback |
| ES down | No keyword retrieval | Vector-only |
| Embedding cost spike | Bill shock | Quota + alert |
| LLM hallucination | Wrong answer | Faithfulness metric, prompt engineering |
| Stale data | Wrong info | Auto-refresh + display “last updated” |
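
The BM25-only and vector-only rows amount to running both retrievers with a timeout and keeping whichever succeed. Below is a sketch that assumes each search is an async callable returning a best-first list of IDs (the names are hypothetical). The surviving lists can go straight into reciprocal rank fusion, which works with one list as well as with two.

```python
import asyncio
import logging

log = logging.getLogger("retrieval")

async def retrieve_with_fallback(searches: dict, timeout_s: float = 1.0) -> list:
    """Run each named search concurrently; return the ranked lists of those that succeed."""
    names = list(searches)
    results = await asyncio.gather(
        *(asyncio.wait_for(searches[name](), timeout_s) for name in names),
        return_exceptions=True,  # a failing backend becomes a value, not a crash
    )
    healthy = []
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            log.warning("%s retrieval failed, degrading: %r", name, result)
        else:
            healthy.append(result)
    if not healthy:
        raise RuntimeError("all retrieval backends failed")
    return healthy
```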
12. Privacy
- User queries: ephemeral, or retained only for the user's session
- Don’t send PII to OpenAI (or use Azure OpenAI under a data processing agreement, DPA)
- Self-host model for confidential workloads
- Per-user encryption at rest for query history
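Below is a minimal pre-send redaction sketch for the PII rule, using two illustrative regexes: email addresses and phone-like digit runs. They are deliberately simple. They miss names, addresses and many ID formats, and the phone pattern also catches some dates and long numbers. Treat this as a placeholder for a dedicated PII-detection step, not as a compliance control.

```python
import re

# Illustrative patterns only: they under-match many PII forms and over-match some numbers
PII_PATTERNS = [
    ("EMAIL", re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")),
    ("PHONE", re.compile(r"\+?\d[\d\s().-]{7,}\d")),
]

def redact(text: str) -> str:
    """Replace matched PII with [LABEL] placeholders before text is sent to an external API."""
    for label, pattern in PII_PATTERNS:
        text = pattern.sub(f"[{label}]", text)
    return text
```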
13. Next
Case-Design-Migrate-Monolith-DB
Updated: 2026-05-16