Case Study 04: Data Layer for an AI Product (RAG)

“Design a production RAG system for enterprise search: 10M docs, 1K queries/s, hybrid retrieval, citations, freshness. Stack 2024-2026.”

Tags: case-study rag ai vector-db embeddings
Related: Tuan-15-Vector-DB-AI · Tuan-13-Search-Engines-ES


1. Requirements

Functional

  • “AI assistant” for company knowledge base
  • Sources: Confluence, GitHub, Slack, internal docs (PDF, MD, etc.)
  • Q&A with citations
  • Real-time updates as docs change
  • Per-user access control (some docs restricted)
  • Multi-language

Non-functional

  • 10M document chunks
  • 1K queries/sec peak
  • P99 latency < 3s (including LLM)
  • Document update propagates < 5 min
  • Cost optimization (LLM dominates)

2. Architecture

graph TB
    subgraph "Indexing"
        Sources[Sources: Confluence, GitHub, Slack, S3] --> Crawler[Crawler/Connector]
        Crawler --> Parser[Parser: PDF, MD, code]
        Parser --> Chunker[Chunker]
        Chunker --> Embedder[Embedder: OpenAI/BGE]
        Embedder --> VecDB[(Qdrant<br/>vector store)]
        Chunker --> ES[(Elasticsearch<br/>BM25)]
        Parser --> Meta[(Postgres<br/>doc metadata, ACL)]
    end

    subgraph "Query"
        User[User] --> QApi[Query API]
        QApi --> QEmbed[Embed query]
        QApi --> AcL[Check ACL]
        QEmbed --> Vec[Vector search<br/>top 50]
        QApi --> BM[BM25 top 50]
        AcL -->|allowed groups| Vec
        AcL -->|allowed groups| BM
        Vec --> Fusion[RRF fusion]
        BM --> Fusion
        Fusion --> Rerank[Cross-encoder rerank<br/>top 5]
        Rerank --> LLM[LLM: GPT-4o/Claude/Llama]
        LLM --> Answer[Answer + citations]
        Answer --> Cache[Cache common Q]
    end

3. Document Ingestion

3.1 Source connectors

class ConfluenceConnector:
    async def sync(self, since: datetime):
        async for page in self.client.list_updated(since):
            yield Document(
                source='confluence',
                source_id=page.id,
                title=page.title,
                content=html_to_md(page.body),
                url=page.url,
                authors=page.authors,
                updated_at=page.updated_at,
                space=page.space,
                acl=compute_acl(page)
            )
 
# Similar for GitHub, Slack, S3

Schedule sync: every 5 minutes for incremental.

3.2 Parsing

  • Markdown: native
  • HTML: convert to MD
  • PDF: pypdf, unstructured.io for layout
  • Office: pandoc
  • Code: language-aware chunking
  • Slack: thread-aware
  • Images in docs: OCR (Tesseract) or skip

3.3 Chunking strategy

def chunk_document(doc):
    if doc.source == 'code':
        return chunk_by_function(doc.content, doc.language)
    if doc.source == 'slack':
        return chunk_by_thread(doc.content)
    # General prose
    return chunk_recursive(
        doc.content,
        max_size=500,        # tokens
        overlap=50,
        respect_sentences=True,
        respect_headings=True
    )

Result: list of chunks with metadata.
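
The `chunk_recursive` helper is not defined in this note. As a minimal sketch of the sentence-aware part only, the function below (hypothetical name `chunk_by_sentences`) packs whole sentences into chunks and carries a tail of the previous chunk forward as overlap. It counts whitespace-separated words as a stand-in for tokens, which is an assumption; a real pipeline would use the embedding model's tokenizer and also respect headings.

```python
import re

def chunk_by_sentences(text, max_size=500, overlap=50):
    """Greedy sentence-aware chunking; sizes are counted in whitespace words (assumption)."""
    # Split after sentence-ending punctuation followed by whitespace.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks, current = [], []
    for sentence in sentences:
        words = sentence.split()
        # Close the current chunk if adding this sentence would exceed the budget.
        if current and len(current) + len(words) > max_size:
            chunks.append(" ".join(current))
            # Carry the tail of the previous chunk forward as overlap.
            current = current[-overlap:] if overlap else []
        current.extend(words)
    if current:
        chunks.append(" ".join(current))
    return chunks
```

Sentences are never split, so a single sentence longer than `max_size` still produces an oversized chunk; that case would need a word-level fallback.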

3.4 Embedding

async def embed_batch(chunks):
    texts = [c.content for c in chunks]
    response = await openai.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
        dimensions=1536
    )
    return [r.embedding for r in response.data]

Batch 100 chunks per API call. Handle rate limits by retrying the failed batch with backoff.
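
One way the batching and rate-limit handling could look, as a sketch under assumptions: `RateLimited` is a hypothetical stand-in for whatever error the embedding client raises on HTTP 429, and `embed_batch` is passed in as a parameter so the retry loop stays independent of the provider.

```python
import asyncio
import random

class RateLimited(Exception):
    """Hypothetical stand-in for the provider's rate-limit (HTTP 429) error."""

def batched(items, size=100):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

async def embed_all(chunks, embed_batch, batch_size=100, max_retries=5, base_delay=1.0):
    """Embed every chunk, retrying a batch with exponential backoff when rate limited."""
    vectors = []
    for batch in batched(chunks, batch_size):
        for attempt in range(max_retries):
            try:
                vectors.extend(await embed_batch(batch))
                break
            except RateLimited:
                if attempt == max_retries - 1:
                    raise  # give up after the last attempt
                # Exponential backoff with jitter: roughly 1s, 2s, 4s, ...
                delay = base_delay * 2 ** attempt + random.uniform(0, base_delay)
                await asyncio.sleep(delay)
    return vectors
```

Batches run sequentially here to keep the sketch short; a bounded `asyncio.Semaphore` would be the usual next step for throughput.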

3.5 Storage

Postgres for metadata + Qdrant for vectors + ES for BM25.

CREATE TABLE documents (
    id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    source text NOT NULL,
    source_id text NOT NULL,
    title text,
    url text,
    authors text[],
    space text,
    acl jsonb NOT NULL,  -- ACL info
    updated_at timestamptz,
    metadata jsonb,
    UNIQUE (source, source_id)
);
 
CREATE TABLE chunks (
    id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    document_id bigint REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index int NOT NULL,
    content text NOT NULL,
    content_hash text NOT NULL,  -- dedup detection
    section text,                 -- for citation
    embedding vector(1536),       -- if using pgvector instead of Qdrant
    tokens int,
    indexed_at timestamptz DEFAULT now()
);
 
CREATE INDEX idx_chunks_document ON chunks(document_id);

Qdrant collection:

qdrant.upsert("chunks", points=[
    PointStruct(
        id=chunk.id,
        vector=chunk.embedding,
        payload={
            "document_id": chunk.document_id,
            "source": doc.source,
            "section": chunk.section,
            "acl_groups": doc.acl['groups'],  # for filter
            "updated_at": doc.updated_at.isoformat()
        }
    )
])

4. Query Pipeline

4.1 Retrieval

async def retrieve(query, user, top_k=50):
    # Get user's accessible groups
    user_groups = await get_user_groups(user.id)
 
    # Embed query
    q_embed = await embed(query)
 
    # Vector search (with ACL filter)
    vec_results = await qdrant.search(
        collection_name="chunks",
        query_vector=q_embed,
        query_filter=Filter(
            must=[FieldCondition(key="acl_groups", match=MatchAny(any=user_groups))]
        ),
        limit=top_k
    )
 
    # BM25 search
    bm25_results = await es.search(
        index="chunks",
        body={
            "query": {
                "bool": {
                    "must": {"match": {"content": query}},
                    "filter": {"terms": {"acl_groups": user_groups}}
                }
            },
            "size": top_k
        }
    )
 
    # RRF fusion
    fused = reciprocal_rank_fusion(vec_results, bm25_results, k=60)
    return fused
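
`reciprocal_rank_fusion` is called above but not shown. A minimal sketch of the standard formula, score(d) = Σ 1 / (k + rank), follows. It assumes each result list has already been reduced to chunk IDs (the `get_id` parameter is a hypothetical hook for mapping Qdrant and Elasticsearch hits to a shared ID), and it returns fused IDs rather than full chunk objects.

```python
from collections import defaultdict

def reciprocal_rank_fusion(*result_lists, k=60, get_id=lambda hit: hit):
    """Fuse ranked lists: each list contributes 1 / (k + rank), with rank starting at 1."""
    scores = defaultdict(float)
    for results in result_lists:
        for rank, hit in enumerate(results, start=1):
            scores[get_id(hit)] += 1.0 / (k + rank)
    # Highest fused score first; ties break by ID for a deterministic order.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))
```

RRF uses only ranks, so cosine similarities and BM25 scores never need to be normalized against each other.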

4.2 Reranking

async def rerank(query, candidates, top_k=5):
    # Cohere Rerank API (or local cross-encoder)
    response = await cohere.rerank(
        query=query,
        documents=[c.content for c in candidates],
        top_n=top_k,
        model="rerank-multilingual-v3.0"
    )
    return [candidates[r.index] for r in response.results]

Or self-host cross-encoder:

from sentence_transformers import CrossEncoder
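# Note: the ms-marco models are trained on English data; pick a multilingual
# cross-encoder if the multi-language requirement applies to reranking.
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')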
 
def rerank_local(query, candidates, top_k=5):
    pairs = [(query, c.content) for c in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda x: -x[1])
    return [c for c, _ in ranked[:top_k]]

Trade-off: API cost (~$2 per 1K reranks) versus the latency and GPU cost of self-hosting.

4.3 Context construction

def build_context(chunks):
    # Sort by document, then by chunk_index for natural ordering
    context = ""
    for chunk in sorted(chunks, key=lambda c: (c.document_id, c.chunk_index)):
        context += f"\n\n[Source: {chunk.document.title} ({chunk.document.url})]\n"
        context += chunk.content
    return context

5. LLM Call

async def generate_answer(query, context, history=None):
    system_prompt = """You are a helpful AI assistant. Answer based ONLY on the provided context.
If the answer is not in the context, say so. Always cite sources using [Source: <url>] format.
Be concise."""
 
    messages = [{"role": "system", "content": system_prompt}]
    if history:
        messages.extend(history[-10:])  # last 5 turns = 10 messages (user + assistant)
    messages.append({
        "role": "user",
        "content": f"Context:\n{context}\n\nQuestion: {query}"
    })
 
    response = await openai.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=0.1,
        max_tokens=500
    )
    return response.choices[0].message.content
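
Since the prompt asks for `[Source: <url>]` citations, the answer can be checked against the URLs that were actually placed in the context. The sketch below is one possible post-processing step, not part of the original design; `check_citations` and its return shape are hypothetical.

```python
import re

# Matches the citation format requested in the system prompt: [Source: <url>]
SOURCE_PATTERN = re.compile(r"\[Source:\s*([^\]\s]+)\s*\]")

def check_citations(answer, context_urls):
    """Split cited URLs into those present in the retrieved context and those that are not."""
    cited = SOURCE_PATTERN.findall(answer)
    allowed = set(context_urls)
    valid = [url for url in cited if url in allowed]
    unknown = [url for url in cited if url not in allowed]
    return valid, unknown
```

An answer with no valid citations, or with any unknown ones, could be flagged or regenerated before it reaches the user.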

For cost optimization:

  • GPT-4o-mini for simple questions
  • GPT-4o for complex
  • Self-host Llama 3.1 70B if volume justifies

6. Caching

6.1 Cache levels

graph LR
    Q[Query] --> SemanticCache[Semantic cache<br/>recent similar Q]
    SemanticCache -->|hit| Return[Return cached]
    SemanticCache -->|miss| Embed
    Embed --> Search
    Search --> LLM
    LLM --> Store[Store in cache]

6.2 Semantic cache

Use the embedding! If the query embedding is very similar to that of a recently cached query, return the cached answer.

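async def get_cached_answer(query_embed):
    # Search the cache collection. The stored expiry ("ttl") is only a payload
    # field, so expired entries have to be excluded explicitly with a range filter.
    similar = await qdrant.search(
        "query_cache",
        query_embed,
        query_filter=Filter(
            must=[FieldCondition(key="ttl", range=Range(gt=time.time()))]
        ),
        limit=1,
        score_threshold=0.95,  # hits below this similarity are not returned
    )
    if similar:
        return similar[0].payload['answer'], similar[0].payload['sources']
    return None
 
async def cache_answer(query, query_embed, answer, sources):
    await qdrant.upsert("query_cache", points=[
        PointStruct(
            id=str(uuid4()),
            vector=query_embed,
            payload={
                "query": query,
                "answer": answer,
                "sources": sources,
                "ttl": time.time() + 3600,  # absolute expiry timestamp (1 hour)
            },
        )
    ])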

Saves 30-50% LLM cost for common Qs.

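Caveat: invalidate when sources update. Also scope cache entries by the asker's access (for example, store the ACL groups of the cited documents and filter on them), otherwise a cached answer built from restricted chunks can be served to a user who cannot read them.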
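
To make the invalidation caveat concrete, here is an in-memory stand-in for the `query_cache` collection. It is a sketch under assumptions: each entry records the IDs of the documents its answer was built from (a field the snippet above does not store), and the class and method names are hypothetical.

```python
import math
import time

def cosine(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

class SemanticCache:
    def __init__(self, threshold=0.95, ttl_seconds=3600):
        self.threshold = threshold
        self.ttl_seconds = ttl_seconds
        self.entries = []  # each entry: vector, answer, document_ids, expires_at

    def put(self, vector, answer, document_ids, now=None):
        now = time.time() if now is None else now
        self.entries.append({
            "vector": vector,
            "answer": answer,
            "document_ids": set(document_ids),
            "expires_at": now + self.ttl_seconds,
        })

    def get(self, vector, now=None):
        now = time.time() if now is None else now
        live = [e for e in self.entries if e["expires_at"] > now]
        best = max(live, key=lambda e: cosine(vector, e["vector"]), default=None)
        if best is not None and cosine(vector, best["vector"]) >= self.threshold:
            return best["answer"]
        return None

    def invalidate_document(self, document_id):
        """Drop every cached answer that was built from the given document."""
        self.entries = [e for e in self.entries if document_id not in e["document_ids"]]
```

The re-index path would call `invalidate_document` for each changed document; in Qdrant the equivalent would be a delete by payload filter on the stored document IDs.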

6.3 Embedding cache

User asks same question 10 times → only embed once.

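import hashlib

# Built-in hash() is salted per process for str, so use a stable digest for shared cache keys
@redis.cache(ttl=300, key=lambda q: "emb:" + hashlib.sha256(q.encode()).hexdigest())
async def embed(query):
    return await openai.embeddings.create(...)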

7. Access Control

7.1 Document-level ACL

{
    "groups": ["engineering", "all_employees"],
    "users": [42, 43],
    "public": false
}

User’s groups → filter chunks at retrieval.
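
The ACL document has three fields, while the retrieval filter in 4.1 matches on groups only. A minimal sketch of a full check follows, assuming any one of the three grants access (public flag, explicit user ID, or a shared group); that semantics is an assumption, and `can_read` is a hypothetical helper.

```python
def can_read(acl, user_id, user_groups):
    """Return True if the ACL document grants this user access."""
    if acl.get("public", False):
        return True
    if user_id in acl.get("users", []):
        return True
    # Otherwise the user needs at least one group in common with the document.
    return bool(set(acl.get("groups", [])) & set(user_groups))
```

To enforce the same rule inside the vector and BM25 filters, the `users` and `public` fields would need to be indexed alongside `acl_groups`.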

7.2 Postgres for fast ACL check

CREATE TABLE user_groups (
    user_id bigint, group_name text,
    PRIMARY KEY (user_id, group_name)
);
 
-- Fetch user's groups (cached)
SELECT array_agg(group_name) FROM user_groups WHERE user_id = $1;

7.3 In retrieval

Filter at the vector + BM25 level. Never let the LLM see a chunk the user is not allowed to read.

Audit log: who asked what, what was returned.


8. Freshness

8.1 Real-time updates

Connector polls every 5 min. Changed docs:

  1. Delete old chunks (DELETE FROM chunks WHERE document_id = X)
  2. Re-chunk + re-embed
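  3. Insert new chunks
  4. Update Qdrant + ES: delete the entries for the old chunk IDs, then upsert the new ones (new chunks get new IDs, so old entries would otherwise linger)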
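
The steps above re-embed every chunk of a changed document. Since the `chunks` table already stores `content_hash`, one possible refinement is to re-embed only chunks whose text changed. This is a sketch, not the original design; `plan_reindex` and its input shape are hypothetical.

```python
import hashlib

def content_hash(text):
    """Stable digest of a chunk's text, matching the role of chunks.content_hash."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def plan_reindex(old_chunks, new_texts):
    """old_chunks: {chunk_id: content_hash} for the stored version of the document.
    Returns (ids_to_delete, texts_to_embed)."""
    new_hashes = {content_hash(t): t for t in new_texts}
    # Stored chunks whose text still appears in the new version can be kept.
    keep = {cid for cid, h in old_chunks.items() if h in new_hashes}
    kept_hashes = {old_chunks[cid] for cid in keep}
    ids_to_delete = sorted(cid for cid in old_chunks if cid not in keep)
    texts_to_embed = [t for h, t in new_hashes.items() if h not in kept_hashes]
    return ids_to_delete, texts_to_embed
```

Kept chunks may still need their `chunk_index` updated if surrounding text was inserted or removed.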

8.2 Webhook-based

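Confluence/GitHub webhooks → trigger immediate re-index. This is what keeps propagation under the 5-minute target: a 5-minute poll plus parse/embed time can exceed it on its own.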

8.3 Stale tag

Show “Last updated 5 min ago” in citations.


9. Evaluation

9.1 Golden set

50-200 questions, each with an ideal answer and ideal source citations.

eval_set = [
    {
        "question": "How do we deploy to production?",
        "ideal_chunks": [chunk_id_42, chunk_id_43],
        "ideal_answer_keywords": ["kubectl", "deploy", "rollout"]
    },
    ...
]

9.2 Metrics

  • Hit rate@5: % of questions with at least one ideal chunk among the top-5 retrieved
  • MRR: mean reciprocal rank of the first ideal chunk in the retrieved list
  • Answer faithfulness: LLM-as-judge or human
  • Hallucination rate: claims not supported by context
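
The two retrieval metrics can be computed directly from the golden set. A minimal sketch, assuming `retrieved` holds one ranked list of chunk IDs per question and `ideal` holds the matching sets of ideal chunk IDs (both names are illustrative):

```python
def hit_rate_at_k(retrieved, ideal, k=5):
    """Fraction of questions with at least one ideal chunk in the top-k results."""
    hits = sum(1 for ranked, gold in zip(retrieved, ideal) if set(ranked[:k]) & set(gold))
    return hits / len(retrieved)

def mean_reciprocal_rank(retrieved, ideal):
    """Average of 1/rank of the first ideal chunk per question (0 when none is retrieved)."""
    total = 0.0
    for ranked, gold in zip(retrieved, ideal):
        for rank, chunk_id in enumerate(ranked, start=1):
            if chunk_id in gold:
                total += 1.0 / rank
                break
    return total / len(retrieved)
```

Faithfulness and hallucination rate need an LLM judge or human review, so they are not covered here.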

9.3 Continuous eval

Run eval set on every deploy. CI fails if metrics drop.

9.4 User feedback

Thumbs up/down on answers. Use the signal to train a better retriever/reranker.


10. Cost Analysis

10.1 Indexing cost

10M chunks × ~200 tokens = 2B tokens. At $0.02 per 1M tokens (text-embedding-3-small), that is ~$40 one-time.

Re-index on schema change: $40-200 per re-embed.
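
The indexing estimate as a small calculation. The price of $0.02 per 1M tokens is an assumption (list price for text-embedding-3-small); substitute the current rate for the model actually used.

```python
CHUNKS = 10_000_000
TOKENS_PER_CHUNK = 200            # average chunk size from the estimate above
PRICE_PER_MILLION_TOKENS = 0.02   # USD, assumed price for text-embedding-3-small

def embedding_cost(chunks, tokens_per_chunk, price_per_million):
    """Return (total_tokens, cost_in_usd) for embedding the whole corpus once."""
    total_tokens = chunks * tokens_per_chunk
    return total_tokens, total_tokens / 1_000_000 * price_per_million

total_tokens, cost = embedding_cost(CHUNKS, TOKENS_PER_CHUNK, PRICE_PER_MILLION_TOKENS)
print(f"{total_tokens:,} tokens -> ${cost:,.2f}")
```

A larger embedding model or bigger chunks scales this figure linearly.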

10.2 Query cost

Per query:

  • Embed: ~$0.0001
  • Vector + BM25 search: ~$0 (own infra)
  • Rerank: ~$0 (self-host)
  • LLM: ~$0.005 (GPT-4o-mini, 4K context, 500 tokens out)

Total: ~$0.01/query.

1M queries/day × ~$0.01 ≈ $10K/day ≈ $300K/month.
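
The same arithmetic in code, using the per-query line items listed above. The itemized sum comes to roughly $0.005 per query, so the ~$0.01 figure appears to include about 2x headroom; that reading is an inference, not something the estimate states.

```python
PER_QUERY_USD = {"embed": 0.0001, "search": 0.0, "rerank": 0.0, "llm": 0.005}

def monthly_cost(per_query_usd, queries_per_day, days=30):
    """Monthly spend for a flat per-query cost."""
    return per_query_usd * queries_per_day * days

itemized = sum(PER_QUERY_USD.values())   # sum of the listed line items
budget = 0.01                            # rounded per-query figure used in the text

print(f"itemized: ${itemized:.4f}/query -> ${monthly_cost(itemized, 1_000_000):,.0f}/month")
print(f"budgeted: ${budget:.2f}/query -> ${monthly_cost(budget, 1_000_000):,.0f}/month")
```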

10.3 Optimization

  • Semantic cache: -30% LLM cost
  • GPT-4o-mini for simple Q: -80% on those
  • Self-host Llama 3.1 70B: ~$0.001/query (~10x cheaper at scale)

After optimization: ~$50-100K/month at 1M queries/day.


11. Failure Modes

| Failure | Impact | Mitigation |
|---|---|---|
| OpenAI API down | No new queries | Fallback to local model (cheaper, lower quality) |
| Qdrant down | No vector retrieval | BM25-only fallback |
| ES down | No keyword retrieval | Vector-only |
| Embedding cost spike | Bill shock | Quota + alert |
| LLM hallucination | Wrong answer | Faithfulness metric, prompt engineering |
| Stale data | Wrong info | Auto-refresh + display “last updated” |
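
The Qdrant-down and ES-down rows amount to running both retrievers and keeping whichever answers. A minimal sketch, assuming each retriever is an async callable that takes the query; `retrieve_with_fallback` and the `degraded` flag are hypothetical.

```python
import asyncio

async def retrieve_with_fallback(vector_search, bm25_search, query, timeout=1.0):
    """Run both retrievers concurrently and keep whichever succeed.
    Returns (vector_hits, bm25_hits, degraded); a failed leg yields an empty list."""
    async def guarded(search):
        try:
            return await asyncio.wait_for(search(query), timeout)
        except Exception:
            return None  # treat errors and timeouts as "backend down"

    vec, bm25 = await asyncio.gather(guarded(vector_search), guarded(bm25_search))
    if vec is None and bm25 is None:
        raise RuntimeError("both retrieval backends are unavailable")
    return vec or [], bm25 or [], vec is None or bm25 is None
```

The `degraded` flag lets the API layer log the incident or tell the user that results may be less complete.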

12. Privacy

  • User queries: ephemeral, or retained only per user session
  • Don’t send PII to OpenAI (or use Azure OpenAI with DPA)
  • Self-host model for confidential workloads
  • Per-user encryption at rest for query history

13. Next

Case-Design-Migrate-Monolith-DB

Updated: 2026-05-16