RAGService ingest_pdf End-to-End
What
A single async service class that takes the path to a PDF on disk, stores its embedded chunks in PostgreSQL via pgvector, and returns the number of chunks as an int. The service owns the whole HuggingFace-embedding PDF pipeline: page extraction, text chunking, vector embedding, persistence through an INSERT that casts each embedding to pgvector’s vector type, and finally commit-and-return.
RAGService is the only place in the codebase that carries a PDF all the way into pgvector. It is reached from three call sites:
- The one-shot CLI adapter ingest.py, for manual / first-time ingestion.
- The Celery worker task app/worker.py::ingest_pdf_task, for background PDF uploads.
- The HTTP upload endpoint /api/v1/admin/upload-background, which reaches the service indirectly by fanning out into the Celery task above.
Project Context
In full_project_context_updated.txt, the FCA Support Agent connects four artifacts around one service:
- ingest.py: await rag.ingest_pdf("data/FCA faqs.pdf"), then prints the chunk count. The simplest contract in the codebase.
- app/worker.py: wraps RAGService().ingest_pdf(file_path) in a sync-to-async bridge (the Celery task is synchronous, so it runs the coroutine to completion) and returns {"status": "success", "chunks": chunks_count}.
- docker-compose.yml: the worker service declares HF_TOKEN: ${HF_TOKEN} in its environment alongside DATABASE_URL and CELERY_BROKER_URL. Without HF_TOKEN, embedders that call the hosted HuggingFace Inference API fail; locally bundled sentence-transformers models still work because they never call the API.
- requirements.txt: pins the three packages behind this split: PyPDF2 (PDF text extraction), huggingface_hub>=0.21.0 (the library side, not the sentence-transformers model itself), and langchain-groq (chat completions live on the other stack: Groq is for LLM output, HF is for embeddings). This split is deliberate and is the architecture this page documents.
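The worker side of that contract can be sketched without Celery installed. This is a minimal illustration under stated assumptions, not the contents of app/worker.py: run_ingest_task and FakeRAGService are hypothetical stand-ins, and in the real worker the same body would sit inside a function decorated as a Celery task.

```python
import asyncio
from typing import Any, Callable, Dict


class FakeRAGService:
    """Hypothetical stand-in for RAGService: pretends every PDF yields 3 chunks."""

    async def ingest_pdf(self, file_path: str) -> int:
        await asyncio.sleep(0)  # yield to the loop once, as real I/O would
        return 3


def run_ingest_task(file_path: str, service_factory: Callable[[], Any]) -> Dict[str, Any]:
    """Body of a synchronous (Celery-style) task that drives the async service.

    asyncio.run() creates a fresh event loop, runs the coroutine to completion,
    and closes the loop, so the synchronous caller gets a plain int back.
    """
    chunks_count = asyncio.run(service_factory().ingest_pdf(file_path))
    return {"status": "success", "chunks": chunks_count}


if __name__ == "__main__":
    print(run_ingest_task("data/FCA faqs.pdf", FakeRAGService))
```

asyncio.run() raises RuntimeError when called from a thread that already has a running event loop, which is one reason this bridge belongs in the synchronous worker rather than inside an async FastAPI handler.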
The pgvector extension (and with it the vector column type) is enabled by init_db() in app/database.py, which runs CREATE EXTENSION IF NOT EXISTS vector; before Base.metadata.create_all. The pgvector-rag-ingestion page documents that bootstrap; this page documents what runs after bootstrap on every PDF ingest.
How
Architecture: who calls what
┌─────────────────┐ ┌──────────────────┐ ┌──────────────┐
│ ingest.py │──▶ │ RAGService │──▶ │ pgvector │
│ (one-shot CLI) │ │ .ingest_pdf() │ │ faqs table │
└─────────────────┘ └──────────────────┘ └──────────────┘
▲
│
┌─────────────────┐ ┌──────┴────────┐ ┌──────────────┐
│ /api/v1/admin/ │──▶ │ Celery worker │──▶ │ HF Embedding │
│ upload-bg │ │ ingest_pdf_ │ │ + PyPDF2 + │
│ (POST) │ │ task │ │ Recursive… │
└─────────────────┘ └───────────────┘ └──────────────┘
Three call paths funnel into one production-critical method. That’s why the service surface is small (one method, one return type) but the body is careful.
High-level ingest_pdf algorithm
class RAGService:
    def __init__(self) -> None:
        self._session_factory = AsyncSessionLocal
        self._embedder = self._build_embedder(settings.rag_embedding_model)  # local or hosted HF
        self._vector_dim = self._detect_dim(settings.rag_embedding_model)    # 384 for the default model

    async def ingest_pdf(self, file_path: str) -> int:
        # 1. PDF → list[str] (one string per page)
        pages = self._read_pdf(file_path)
        # 2. Split each page into overlapping chunks tagged with source and page
        chunks = self._chunk_pages(pages, source_path=os.path.basename(file_path))
        # 3. Embed each chunk off the event loop. Local mode keeps everything
        #    in-process; hosted HF mode makes an HTTP call per chunk.
        vectors = await self._embed_chunks(chunks)
        # 4. Bulk INSERT into document_chunks, one row per chunk
        #    (the full version below also writes chunk_id with ON CONFLICT).
        async with self._session_factory() as session:
            await session.execute(
                text("""
                    INSERT INTO document_chunks
                        (source, page, content, embedding)
                    VALUES
                        (:src, :page, :content, CAST(:embedding AS vector))
                """),
                [{"src": c.source, "page": c.page,
                  "content": c.text, "embedding": str(v)}  # pgvector text form
                 for c, v in zip(chunks, vectors)],
            )
            await session.commit()
        # 5. Return the chunk count; failures in steps 1-4 raise instead.
        return len(chunks)
Two things to internalise before reading the line-by-line walkthrough:
- The service is a single class with no public state and one public async method. The session factory and embedder are set up in __init__; the text splitter is built per call in _chunk_pages. This is the service-layer-async-session pattern; CustomerService and the other services follow the same shape.
- The embedder model name, not HF_TOKEN alone, decides between fully local and HTTP-backed embedding. HF_TOKEN matters only for hosted models: if it is unset and the model is hosted (e.g., thenlper/gte-large), construction still succeeds but the first embed call fails. The CLI operator is responsible for picking a model that matches their HF_TOKEN availability.
HuggingFace token: where it’s read, what it gates
HF_TOKEN is read directly from the environment, never hardcoded and never stored in Settings:
import os
HF_TOKEN = os.getenv("HF_TOKEN")  # optional; absence is non-fatal
Why not put it in app/config.py’s Settings? Two reasons:
- HF_TOKEN is rotation-friendly. A new token only requires recreating the worker container (for example, docker compose --env-file <file> up -d worker); the FastAPI web container never reads it and keeps running. Pydantic Settings would cache it in the shared config object at import time.
- It is only needed by the worker service (docker-compose.yml’s worker: block) and by ingest.py. The web service uses HuggingFace only in the agent path via langfuse-llm-tracing-decorator, which doesn’t need the token unless the user explicitly opts in to hosted HF inference.
What the token gates in RAGService.__init__:
| Embedder construction | Behaviour without HF_TOKEN |
|---|---|
| self._embedder = SentenceTransformerEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | ✅ works: sentence-transformers downloads the model the first time it is loaded, no token needed |
| self._embedder = HuggingFaceHubEmbeddings(model="thenlper/gte-large") | ❌ fails: the hosted Inference API call requires auth |
| self._embedder = HuggingFaceEndpointEmbeddings(model="https://…") | ⚠️ depends on whether the endpoint is private |
So the gating is driven by the model class, not by the token. The docstring on _build_embedder() should list which model IDs are token-free and which need a token. The current production config uses the token-free local path, which is why a missing HF_TOKEN is a yellow warning, not a red error.
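Because the gating depends on the model name, it can be checked before any embedder is built. A minimal sketch, assuming the "sentence-transformers/<id>" (local) and "hf-<repo-id>" (hosted) naming scheme that _build_embedder uses further down; embedder_token_status is a hypothetical helper, not part of the service.

```python
import os
from typing import Mapping, Optional


def embedder_token_status(model_name: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Classify an embedder name under the 'sentence-transformers/' vs 'hf-' scheme.

    Returns "ok" (local, no token needed), "ok-hosted" (hosted, token present),
    or "missing-token" (hosted, HF_TOKEN unset or empty).
    """
    env = os.environ if env is None else env
    name = model_name.lower()
    if name.startswith("sentence-transformers/"):
        return "ok"
    if name.startswith("hf-"):
        return "ok-hosted" if env.get("HF_TOKEN") else "missing-token"
    raise ValueError(f"Unknown embedder scheme for {model_name!r}")
```

A startup script could log a yellow warning for "ok" without a token and refuse to start on "missing-token", which mirrors the table above.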
Top-to-Bottom Code Walkthrough (app/services/rag_service.py)
The file is roughly 150 lines: imports and constants, the constructor and its helpers (_build_embedder, _detect_dim), _read_pdf, _chunk_pages, _embed_chunks, and ingest_pdf. The walkthrough follows that top-to-bottom order.
Imports and module-level constants
import asyncio
import hashlib
import logging
import os
from dataclasses import dataclass
from typing import List

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

import PyPDF2  # PDF text extraction

# Embedding layer: HuggingFace side, NOT langchain-groq.
# Local (no token) and hosted (token) wrappers used by _build_embedder.
# Assumes langchain-community and langchain-text-splitters are installed.
from langchain_community.embeddings import (
    HuggingFaceHubEmbeddings,
    SentenceTransformerEmbeddings,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter

from app.config import settings
from app.database import AsyncSessionLocal

logger = logging.getLogger(__name__)
PyPDF2 is the simplest PDF text extractor that works for the FCA’s machine-generated PDFs. For image-only (scanned) PDFs you would add a rasterising step (pypdfium2 or pdfplumber can render pages to images) and an OCR step with pytesseract; the rest of the pipeline stays identical. The HuggingFace embedding layer is intentionally not cross-wired with the Groq layer (langchain-groq): the architecture is “embeddings from HF, completions from Groq”, so the two providers’ rate limits and quotas don’t compete.
@dataclass Chunk at the top carries one chunk plus the metadata the persistence layer needs:
@dataclass
class Chunk:
    text: str      # the chunked text
    source: str    # original PDF filename (basename)
    page: int      # 1-indexed page number
    chunk_id: str  # sha256(source|page|text)[:16], the idempotency key
The chunk_id is what makes re-ingestion safe: if you re-run ingest_pdf on the same PDF, the INSERT’s ON CONFLICT (chunk_id) DO NOTHING clause skips every chunk that is already stored.
Constructor: __init__
class RAGService:
    def __init__(self, *, embedder_model: str | None = None) -> None:
        # Session factory, not a live session: ingest_pdf opens its own session per call.
        self._session_factory: async_sessionmaker[AsyncSession] = AsyncSessionLocal
        # Pick the embedder; the default is the local 384-dim sentence-transformers model.
        model_name = embedder_model or settings.rag_embedding_model
        # DEFAULT: "sentence-transformers/all-MiniLM-L6-v2" (384 dims, local-only)
        self._embedder = self._build_embedder(model_name)
        # The pgvector column type MUST match the model's output dim.
        self._vector_dim = self._detect_dim(model_name)
        # Fail at construction time if the configured dim disagrees with the model.
        if settings.rag_embedding_dim and settings.rag_embedding_dim != self._vector_dim:
            raise RuntimeError(
                f"Configured dim {settings.rag_embedding_dim} does not match "
                f"model {model_name} dim {self._vector_dim}. "
                f"This will fail at INSERT time with a vector dim mismatch."
            )
Three points worth surfacing:
- AsyncSessionLocal is the factory, not a session. The constructor does not open a connection; ingest_pdf opens its own session per call inside async with. This matches app/database.py’s async_sessionmaker pattern and ensures no connection leaks across multiple ingest calls.
- embedder_model defaults to settings.rag_embedding_model so operators can override it per environment (e.g., the 768-dim all-mpnet-base-v2 in dev for higher-fidelity tests, provided the dev column is vector(768)).
- The dim-mismatch check runs at construction time. It’s better to fail there than to wait for the first INSERT and get pgvector’s opaque expected 384 dimensions, not 768. The check is skipped when settings.rag_embedding_dim is None (or 0), as a dev convenience.
_build_embedder — choosing the HuggingFace backend
    @staticmethod
    def _build_embedder(model_name: str):
        name = model_name.lower()  # lower-cased copy, used only for prefix matching
        # LOCAL path: no token, no internet (after first download).
        if name.startswith("sentence-transformers/"):
            return SentenceTransformerEmbeddings(model_name=model_name)
        # HF INFERENCE API path: token required, one hosted HTTP call per chunk.
        if name.startswith("hf-"):
            # Strip the "hf-" prefix from the ORIGINAL string so the repo id
            # keeps its casing (e.g. "BAAI/bge-large-en-v1.5").
            repo_id = model_name[3:]
            return HuggingFaceHubEmbeddings(
                repo_id=repo_id,
                huggingfacehub_api_token=os.getenv("HF_TOKEN"),
            )
        raise ValueError(
            f"Unknown embedder scheme for {model_name!r}. "
            f"Use 'sentence-transformers/<id>' for local or 'hf-<repo-id>' for hosted."
        )
The backend is chosen by a name.startswith(...) prefix check rather than a magic module attribute. Two things follow from that:
- It surfaces dependency misconfiguration loudly: if someone passes "text-embedding-ada-002" (OpenAI), the raise ValueError fires immediately instead of silently falling into a None embedder.
- huggingfacehub_api_token=None does NOT raise at construction; it raises on the first .embed_query(...) call. That is an acceptable time to fail (the model name has at least been validated), but a try/except around construction will not catch it. If you want fail-fast behaviour, make one probe embed call at startup.
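A single probe call at startup is enough to get fail-fast behaviour. A minimal sketch: probe_embedder and FakeEmbedder are hypothetical names, and the only assumption about the embedder is that it exposes embed_query(text) -> list[float], as the LangChain wrappers used here do.

```python
from typing import List, Protocol


class SupportsEmbedQuery(Protocol):
    def embed_query(self, text: str) -> List[float]: ...


def probe_embedder(embedder: SupportsEmbedQuery, expected_dim: int) -> None:
    """Fail fast: one embed call at startup surfaces auth and dim problems early."""
    try:
        vec = embedder.embed_query("startup probe")
    except Exception as exc:  # e.g. a hosted model called without HF_TOKEN
        raise RuntimeError(f"Embedder probe failed: {exc}") from exc
    if len(vec) != expected_dim:
        raise RuntimeError(f"Embedder returned {len(vec)} dims, expected {expected_dim}")


class FakeEmbedder:
    """Hypothetical local embedder returning zero vectors of a fixed size."""

    def __init__(self, dim: int) -> None:
        self.dim = dim

    def embed_query(self, text: str) -> List[float]:
        return [0.0] * self.dim
```

Calling probe_embedder(self._embedder, self._vector_dim) at the end of __init__ would move both the auth failure and a dim mismatch to construction time, at the cost of one embed call per service instance.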
_detect_dim (used by __init__) is a simple lookup against a hardcoded table:
KNOWN_DIMS = {
    "sentence-transformers/all-MiniLM-L6-v2": 384,
    "sentence-transformers/all-mpnet-base-v2": 768,
    "thenlper/gte-large": 1024,
    "BAAI/bge-small-en-v1.5": 384,
    "BAAI/bge-large-en-v1.5": 1024,
}
For any model not in this table, the constructor attempts a one-time probe embedding to read the dim. Don’t ship the probe in production; pin your model’s dim in the table instead.
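The lookup-then-probe logic can be sketched as a free function so it runs standalone. detect_dim and its probe parameter are hypothetical; the real _detect_dim may differ. Stripping the "hf-" prefix so hosted names resolve against the same table is an assumption about how the two naming schemes should line up.

```python
from typing import Callable, Dict, List, Optional

KNOWN_DIMS: Dict[str, int] = {
    "sentence-transformers/all-MiniLM-L6-v2": 384,
    "sentence-transformers/all-mpnet-base-v2": 768,
    "thenlper/gte-large": 1024,
    "BAAI/bge-small-en-v1.5": 384,
    "BAAI/bge-large-en-v1.5": 1024,
}


def detect_dim(model_name: str, probe: Optional[Callable[[str], List[float]]] = None) -> int:
    """Return the pinned dim from KNOWN_DIMS, or measure one probe embedding."""
    # Hosted names carry an "hf-" prefix in _build_embedder; strip it for the lookup.
    key = model_name[3:] if model_name.lower().startswith("hf-") else model_name
    if key in KNOWN_DIMS:
        return KNOWN_DIMS[key]
    if probe is None:
        raise KeyError(f"No pinned dim for {model_name!r}; add it to KNOWN_DIMS")
    return len(probe("dimension probe"))
```

Passing probe=None in production turns “unknown model” into an immediate KeyError, which matches the advice to pin dims rather than ship the probe.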
_read_pdf — page extraction with PyPDF2
    @staticmethod
    def _read_pdf(file_path: str) -> List[str]:
        if not os.path.exists(file_path):
            raise FileNotFoundError(file_path)
        reader = PyPDF2.PdfReader(file_path)
        pages: List[str] = []
        for page in reader.pages:
            # Scanned pages have no text layer; normalise "no text" to "".
            page_text = page.extract_text() or ""
            pages.append(page_text)
        return pages
Three intentional choices:
- raise FileNotFoundError early instead of returning (False, [...]): operators want a stack trace, not a chunk count of 0.
- page.extract_text() or "": a page without a text layer (a scan) yields no usable text, and the empty-string default lets chunking proceed without crashing. Those pages then contribute zero chunks, because _chunk_pages skips blank pieces, so their content is silently missing from retrieval. Production scans should be OCR’d first with pdf2image + pytesseract.
- Returns List[str] (one string per page): chunking happens in a separate method so _read_pdf stays easy to test.
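Because textless pages disappear silently, it can help to log which pages came back empty. A minimal sketch over the List[str] that _read_pdf returns; textless_pages and the 20-character threshold are hypothetical choices, not part of the service.

```python
from typing import List


def textless_pages(pages: List[str], min_chars: int = 20) -> List[int]:
    """Return 1-indexed page numbers whose extracted text is empty or nearly so.

    Such pages are usually scans with no text layer; they produce zero chunks
    downstream, so listing them tells the operator what needs OCR.
    """
    return [i for i, page_text in enumerate(pages, start=1) if len(page_text.strip()) < min_chars]
```

Logging the result next to the “Extracted N pages” line in ingest_pdf would make a half-scanned PDF visible without changing the return contract.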
_chunk_pages — recursive character splitter with overlap
    @staticmethod
    def _chunk_pages(pages: List[str], source_path: str) -> List[Chunk]:
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,    # measured in characters (length_function=len), roughly 125 tokens
            chunk_overlap=50,  # 10% overlap so text cut at a boundary appears in both chunks
            separators=["\n\n", "\n", ". ", " "],
            length_function=len,
        )
        chunks: List[Chunk] = []
        for page_idx, page_text in enumerate(pages, start=1):
            for piece in splitter.split_text(page_text):
                if not piece.strip():
                    continue
                cid = hashlib.sha256(
                    f"{source_path}|{page_idx}|{piece}".encode("utf-8")
                ).hexdigest()[:16]
                chunks.append(Chunk(text=piece, source=source_path,
                                    page=page_idx, chunk_id=cid))
        return chunks
RecursiveCharacterTextSplitter is LangChain’s recommended splitter for generic text because it splits on natural boundaries first (\n\n → \n → ". " → space) rather than on pure character count. 500/50 (10% overlap) is a common starting point: tune it up for technical content (longer contiguous explanations) or down for chat-like content (short messages).
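To make the separator hierarchy concrete, here is a simplified recursive splitter. It is not LangChain’s implementation: it has no overlap and drops the separator at each chunk boundary, whereas RecursiveCharacterTextSplitter adds chunk_overlap and by default keeps separators. recursive_split is a hypothetical name.

```python
from typing import List


def recursive_split(text: str, chunk_size: int, separators: List[str]) -> List[str]:
    """Simplified recursive splitter: prefer coarse separators, fall back to finer ones.

    Unlike LangChain's RecursiveCharacterTextSplitter, this sketch has no overlap
    and drops the separator wherever a chunk boundary falls.
    """
    if len(text) <= chunk_size:
        return [text] if text.strip() else []
    if not separators:
        # No separator left: hard-cut by character count.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, finer = separators[0], separators[1:]
    if sep not in text:
        return recursive_split(text, chunk_size, finer)
    chunks: List[str] = []
    current = ""
    for part in text.split(sep):
        candidate = f"{current}{sep}{part}" if current else part
        if len(candidate) <= chunk_size:
            current = candidate  # still fits: keep merging adjacent parts
            continue
        if current:
            chunks.append(current)
        if len(part) <= chunk_size:
            current = part
        else:
            # This part alone is too big: split it with the finer separators.
            chunks.extend(recursive_split(part, chunk_size, finer))
            current = ""
    if current:
        chunks.append(current)
    return [c for c in chunks if c.strip()]
```

With chunk_size=30, a two-paragraph page first splits on "\n\n"; only the paragraph that is still too long falls through to ". ", which is the behaviour the separator list is meant to buy.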
The chunk_id is the idempotency key. sha256(source|page|text) truncated to 16 hex characters (64 bits) is collision-resistant enough for de-duping a document corpus. The INSERT’s ON CONFLICT (chunk_id) DO NOTHING clause relies on it to make re-ingestion safe (see ingest_pdf’s SQL).
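Why dedupe at chunk level instead of whole-PDF level? Because if one page is edited, you can re-run ingestion and only that page’s new chunks are inserted; the unchanged chunks are no-ops under the constraint. Two caveats: the edited page’s old chunks are not removed, so the stale text stays retrievable until you delete it; and if an edit shifts text onto later pages, their page numbers (and so their chunk_ids) change too.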
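The effect of an edit on chunk_ids can be checked with plain set arithmetic. A sketch that reuses the sha256(source|page|text)[:16] recipe from _chunk_pages; chunk_id, ids_for and the two page versions are made up for illustration.

```python
import hashlib
from typing import Dict, List, Set


def chunk_id(source: str, page: int, piece: str) -> str:
    """Same recipe as _chunk_pages: sha256("source|page|text"), first 16 hex chars."""
    return hashlib.sha256(f"{source}|{page}|{piece}".encode("utf-8")).hexdigest()[:16]


def ids_for(source: str, pages: Dict[int, List[str]]) -> Set[str]:
    """chunk_ids for a {page_number: [chunk_text, ...]} mapping."""
    return {chunk_id(source, p, piece) for p, pieces in pages.items() for piece in pieces}


v1 = {1: ["Intro text."], 2: ["Old answer about fees."], 3: ["Contact details."]}
v2 = {1: ["Intro text."], 2: ["New answer about fees."], 3: ["Contact details."]}

old_ids, new_ids = ids_for("faqs.pdf", v1), ids_for("faqs.pdf", v2)
to_insert = new_ids - old_ids  # what ON CONFLICT DO NOTHING actually writes
stale = old_ids - new_ids      # still in the table unless something deletes it
print(len(to_insert), len(stale))  # 1 1
```

The stale set is what a cleanup step would need to remove, for example with DELETE FROM document_chunks WHERE chunk_id = ANY(:ids) in the same transaction as the insert.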
_embed_chunks — HuggingFace embed call per chunk
    async def _embed_chunks(self, chunks: List[Chunk]) -> List[List[float]]:
        # Both embedders' embed_query() is synchronous (blocking), so we run it
        # on a worker thread to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        vectors: List[List[float]] = []
        for c in chunks:
            vec = await loop.run_in_executor(
                None,                        # default ThreadPoolExecutor
                self._embedder.embed_query,  # exposed by both LangChain embedding wrappers
                c.text,
            )
            vectors.append(vec)
        return vectors
Two architectural decisions worth documenting:
- run_in_executor for the embed call: SentenceTransformer.encode() and HuggingFaceHubEmbeddings.embed_query() are both synchronous. Calling them directly inside an async def would block the event loop for roughly 50-200 ms per chunk; for a PDF producing 200 chunks that is 10-40 seconds of stall, which is fatal for any concurrent traffic sharing the loop (such as /chat/stream SSE consumers, if this ever runs inside the FastAPI process). The executor moves each call to a thread and frees the loop.
- Sequential embedding, not batched: embed_query embeds one text per call. Switching to self._embedder.embed_documents([c.text for c in chunks]) gives roughly 5-10× throughput, but a failure then points at the whole batch rather than one chunk. The current code keeps embedding sequential for operational clarity: each failure maps to a single embed call.
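A middle ground keeps the executor but embeds in batches, so throughput improves while a failure is still narrowed to one batch. A sketch, assuming the embedder exposes a synchronous embed_documents(list[str]) -> list[list[float]] as the LangChain wrappers do; embed_in_batches, the batch size of 32 and fake_embed_documents are hypothetical.

```python
import asyncio
import logging
from typing import Callable, List, Sequence

logger = logging.getLogger("rag.embed")


async def embed_in_batches(
    embed_documents: Callable[[List[str]], List[List[float]]],
    texts: Sequence[str],
    batch_size: int = 32,
) -> List[List[float]]:
    """Embed texts batch by batch on the default thread pool, logging progress per batch."""
    loop = asyncio.get_running_loop()
    vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start:start + batch_size])
        try:
            # The sync embed call runs on a worker thread; the event loop stays free.
            vectors.extend(await loop.run_in_executor(None, embed_documents, batch))
        except Exception:
            logger.error("[RAG] embedding failed for chunks %d-%d", start, start + len(batch) - 1)
            raise
        logger.info("[RAG] embedded %d/%d chunks", len(vectors), len(texts))
    return vectors


def fake_embed_documents(batch: List[str]) -> List[List[float]]:
    """Hypothetical stand-in: a 2-dim vector per text (length, word count)."""
    return [[float(len(t)), float(len(t.split()))] for t in batch]
```

Keeping the [RAG] prefix on the progress lines means the batched variant greps the same way as the sequential one.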
ingest_pdf — the public method that ties it together
    async def ingest_pdf(self, file_path: str) -> int:
        """Read a PDF, chunk it, embed it, persist to pgvector, return chunk count."""
        logger.info(f"[RAG] Starting ingest: {file_path}")
        # 1. PDF → pages
        pages = self._read_pdf(file_path)
        logger.info(f"[RAG] Extracted {len(pages)} pages from {file_path}")
        # 2. Chunk each page (source is stored as the file's basename)
        chunks = self._chunk_pages(pages, source_path=os.path.basename(file_path))
        logger.info(f"[RAG] Generated {len(chunks)} chunks")
        if not chunks:
            # Nothing to embed or insert (e.g. a scanned PDF with no text layer).
            logger.warning(f"[RAG] No text extracted from {file_path}; nothing to insert")
            return 0
        # 3. Embed (sequential, in executor)
        try:
            vectors = await self._embed_chunks(chunks)
        except Exception as e:
            logger.error(f"[RAG] Embedding failed for {file_path}: {e}")
            # No DB session has been opened yet, so nothing to roll back; fail loudly.
            raise
        # 4. Bulk INSERT with idempotency
        async with self._session_factory() as session:
            try:
                await session.execute(
                    text("""
                        INSERT INTO document_chunks
                            (chunk_id, source, page, content, embedding)
                        VALUES
                            (:chunk_id, :source, :page, :content, CAST(:embedding AS vector))
                        ON CONFLICT (chunk_id) DO NOTHING
                    """),
                    [
                        {
                            "chunk_id": c.chunk_id,
                            "source": c.source,
                            "page": c.page,
                            "content": c.text,
                            # pgvector text form "[x, y, ...]"; assumes no pgvector
                            # codec is registered on the asyncpg connection.
                            "embedding": str(v),
                        }
                        for c, v in zip(chunks, vectors)
                    ],
                )
                await session.commit()
            except Exception as e:
                await session.rollback()
                logger.error(f"[RAG] DB insert failed for {file_path}: {e}")
                raise
        logger.info(
            f"[RAG] Ingest complete: {file_path} → {len(chunks)} chunks"
        )
        return len(chunks)
Things to notice, in order:
- logger.info(...) at every transition: the [RAG] prefix lets you grep logs across Celery worker stdout, the FastAPI SSE journal, and the CLI run. The app/logger.py JSON formatter (covered in structured-logging-with-contextvar) preserves the prefix in production.
- Embedding failure raises before the DB session is opened: by the time async with self._session_factory() as session: runs, every vector is already computed, so no partial inserts are possible.
- ON CONFLICT (chunk_id) DO NOTHING is the idempotency guarantee. Re-running on the same PDF is a no-op for unchanged chunks. (The chunk_id column needs a UNIQUE constraint at the schema layer; see the FAQ model.)
- The vector cast turns the bound value into pgvector’s vector(384) type. Inside text(), spell it CAST(:embedding AS vector) rather than :embedding::vector, because text() mis-parses a bind-parameter name immediately followed by "::". Without any cast, you can get column "embedding" is of type vector but expression is of type text. Under asyncpg, which has no built-in vector codec, bind pgvector’s text form ("[0.1, 0.2, ...]", which str(v) produces for a list of floats) unless pgvector’s asyncpg codec is registered.
- return len(chunks) counts all chunks, not newly inserted ones. If the operator runs ingest.py twice, they’ll see “200 chunks” both times, even though the second run inserted 0. Getting a true insert count needs an extra query; this is a known UX wart, covered in the pitfalls.
- No retry on embedding failure: if embed_query raises once (transient HF outage, network blip), the whole ingestion aborts, and Celery’s retry semantics on the calling task (if configured) re-run it. Don’t add retry here; let the higher layer own retries.
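When binding pgvector’s text form, a small helper can also check the dimension before anything reaches the database. A minimal sketch; to_pgvector_literal is hypothetical, and the compact comma-only format is a choice (pgvector also accepts spaces after commas). The pgvector Python package’s asyncpg registration helper is the alternative that avoids text serialisation entirely.

```python
from typing import Sequence


def to_pgvector_literal(vec: Sequence[float], expected_dim: int) -> str:
    """Serialise an embedding to pgvector's text input form, e.g. "[0.5,-1.0,2.0]".

    Checking the length here raises a Python error before the INSERT runs,
    instead of a database error halfway through ingest.
    """
    if len(vec) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(vec)}")
    return "[" + ",".join(repr(float(x)) for x in vec) + "]"
```

In ingest_pdf the row dict would then carry "embedding": to_pgvector_literal(v, self._vector_dim).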
Wiring it all up: HF_TOKEN and Docker
The worker service in docker-compose.yml sets HF_TOKEN only on the worker container — not on the web service:
worker:
  environment:
    DATABASE_URL: postgresql+asyncpg://...
    CELERY_BROKER_URL: redis://redis:6379/0
    CELERY_RESULT_BACKEND: redis://redis:6379/0
    HF_TOKEN: ${HF_TOKEN}  # <-- only here
Why not also on web? Because the web service uses HuggingFace only through the @observe Langfuse decorator for chat-side LLM calls (the LangGraph agents don’t run on web; they run on the coordinator). HF_TOKEN is for embedding-side calls, which only happen during ingestion. If you later add agent-side HF retrieval re-ranking on web, add the env var there too.
Why these choices
- HuggingFace embeddings + Groq chat: HF is the de facto open-source standard for sentence embeddings; Groq is fast and cheap for generation. Picking one provider for both means accepting that provider’s rate limits for both RAG and chat; splitting lets you grow them independently. Tradeoff: the SDK surface doubles (you maintain HF and Groq client code).
- sentence-transformers/all-MiniLM-L6-v2 as the default: 384 dims keeps pgvector storage and indexes small. Compared with 768 dims it halves per-vector storage and the work per distance computation. Stepping down from mpnet to MiniLM costs about 5% on standard retrieval benchmarks; for FCA FAQ-style span retrieval, MiniLM-L6 is sufficient.
- ON CONFLICT (chunk_id) DO NOTHING over explicit dedupe: Postgres-native, runs in the same transaction, and the unique index on chunk_id doubles as the dedup index for free. Don’t write a Python-side “if exists: skip” loop; that is racy and slow.
- RecursiveCharacterTextSplitter over CharacterTextSplitter: it tries \n\n → \n → ". " → space in order, which matches how humans organise documents. A single-separator splitter has no finer fallback, so it is more likely to leave paragraphs oversized or cut them mid-sentence.
- run_in_executor per embed call over batching: gives per-chunk error visibility, so if a single chunk fails you know which one. Giving up the 5-10× batching speedup is a real cost; you’ll feel it on 500+ chunk PDFs.
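The storage half of the 384-vs-768 argument is simple arithmetic. pgvector documents each vector as taking 4 bytes per dimension plus 8 bytes; the sketch applies that figure and ignores row overhead, the content column and any index.

```python
def pgvector_bytes(dim: int) -> int:
    """Storage for one pgvector vector value: 4 bytes per dimension plus 8 bytes."""
    return 4 * dim + 8


for dim in (384, 768, 1024):
    print(dim, pgvector_bytes(dim))
# 384 1544
# 768 3080
# 1024 4104
```

For a 200-chunk PDF at 384 dims that is 200 × 1544 = 308,800 bytes of raw vector data.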
Common Pitfalls
Reading len(chunks) and inferring “I just inserted N rows”. Re-ingestion is idempotent, so the count returned is all chunks, even the ones the ON CONFLICT clause skipped. To get a true insert count, run SELECT COUNT(*) FROM document_chunks WHERE source = :s before and after the call and take the difference. Adding a RETURNING clause is less straightforward than it looks: a text() statement executed with a list of parameter sets runs as an executemany, which does not reliably hand back RETURNING rows.
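The before-and-after count can be demonstrated end to end with the standard library. This sketch uses SQLite (3.24+ supports the same ON CONFLICT ... DO NOTHING clause) as a stand-in for Postgres, a cut-down document_chunks table without the embedding column, and a hypothetical insert_chunks helper. It assumes no concurrent ingest of the same source; otherwise the delta would include the other writer’s rows.

```python
import sqlite3
from typing import List, Tuple


def insert_chunks(conn: sqlite3.Connection, rows: List[Tuple[str, str, str]]) -> int:
    """Insert (chunk_id, source, content) rows idempotently; return how many were new.

    Assumes every row in one call shares the same source.
    """
    source = rows[0][1] if rows else ""
    count_sql = "SELECT COUNT(*) FROM document_chunks WHERE source = ?"
    before = conn.execute(count_sql, (source,)).fetchone()[0]
    conn.executemany(
        "INSERT INTO document_chunks (chunk_id, source, content) VALUES (?, ?, ?) "
        "ON CONFLICT (chunk_id) DO NOTHING",
        rows,
    )
    conn.commit()
    after = conn.execute(count_sql, (source,)).fetchone()[0]
    return after - before


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE document_chunks (chunk_id TEXT PRIMARY KEY, source TEXT, content TEXT)")
    rows = [("a1", "faqs.pdf", "one"), ("b2", "faqs.pdf", "two")]
    print(insert_chunks(conn, rows))  # first run inserts both rows
    print(insert_chunks(conn, rows))  # re-run inserts nothing
```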
Setting HF_TOKEN on web but not worker (or vice versa). The token has different effects depending on which service reads it. If your agents want hosted HF retrieval re-ranking on web, set it there too. If only ingestion uses hosted HF, keep it scoped to the worker.
Using sentence-transformers/all-mpnet-base-v2 (768 dims) while the pgvector column is vector(384). pgvector rejects mismatched dims with expected 384 dimensions, not 768. The startup check compares settings.rag_embedding_dim with the model’s dim (via _detect_dim), not with the actual column, so it only protects you while rag_embedding_dim matches the column. If the model name changes via env var and rag_embedding_dim is unset or changed along with it, but no column migration runs, every ingest fails at INSERT time.
Choosing an embedder factory path that calls the HF Inference API without HF_TOKEN. HuggingFaceHubEmbeddings(repo_id=..., huggingfacehub_api_token=None) raises on every call. Surface this at service startup by attempting a probe embed in __init__ if you want fail-fast behaviour.
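Forgetting the vector cast in the INSERT SQL, or writing it as :embedding::vector inside text(). Without a cast you can get column "embedding" is of type vector but expression is of type text, depending on how the driver types the parameter. SQLAlchemy types bind parameters from column metadata in Core and ORM constructs, but text(...) has no column metadata, so cast explicitly with CAST(:embedding AS vector); the :name::type spelling collides with text()’s bind-parameter syntax.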
Splitting on ". " (period space) without handling line breaks inside lists: your Q&A chunks end up cutting a bullet list in half. Test against real FCA FAQ content and tune separators if needed.
Replacing PyPDF2 with pdfplumber and updating only the import. The calls differ: pdfplumber.open(path) returns a PDF object whose .pages[i].extract_text() stands in for PyPDF2.PdfReader(path).pages[i].extract_text(). Keep _read_pdf thin so the swap touches only that function.
Embedding in a tight sync loop without run_in_executor. A 200-chunk PDF blocks the event loop for 10-40 seconds; if that loop belongs to the FastAPI process, the /chat/stream SSE endpoint times out and the whole app becomes unresponsive. Always use run_in_executor, or call the synchronous embed_documents in batches inside run_in_executor.
Treating the chunk count as a quality signal. A higher chunk count doesn’t mean better coverage — it usually means smaller chunks (more overlap) and noisier retrieval. Pick the chunk size that maximises retrieval precision, not the one that maximises chunk count.
Real-World Interview Prep
Q1: Why embed with HuggingFace and chat with Groq? When would you unify them on one provider?
A: The split is about bandwidth and quota management. HuggingFace makes embedding volume cheap (local sentence-transformers models cost nothing per call), but its hosted inference rate-limits chat models harshly. Groq’s LPU-backed inference is much faster than hosted HF inference for chat, but embeddings are not its focus. By splitting, ingestion runs on the cheap/free provider (HF), answer generation runs on the low-latency provider (Groq), and neither’s quota governs the other. Unify when (a) your corpus is so small you barely touch either free tier, so you pick whichever SDK needs fewer lines; (b) you’re already paying for an enterprise OpenAI or Anthropic key and bundle pricing matters more than quota isolation; or (c) one provider offers best-in-class latency at a fixed monthly cost. The tradeoff of a unified provider: a single bad day upstream kills both the ingest path and the answer path. Two providers give you blast-radius isolation at the cost of more SDK surface.
Q2: How would you keep embeddings fresh when the source PDF changes?
A: Three layers. (1) Content-hash layer: store a sha256 of the PDF’s bytes (or, more cheaply but less reliably, of file_path + mtime) in something like an ingest_runs(source, content_hash, started_at, chunk_count) table. On re-run, skip if content_hash matches the last successful run. (2) Chunk-idempotency layer: every chunk has chunk_id = sha256(source|page|text)[:16], so re-running an unchanged PDF produces zero new rows (ON CONFLICT DO NOTHING). (3) Diff-then-patch layer: work out which pages changed (for example by comparing per-page text hashes with the previous run), then call ingest_pdf with a page-range filter. The third layer requires RAGService to accept a pages: range | None parameter so the same code path can re-ingest one page without re-scanning the whole PDF. In production most teams stop at layer (2) because it handles 90% of the “I re-uploaded the same PDF” case with zero ops overhead.
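Layer (1) needs only a hash of the file bytes and the hash recorded for the last successful run. A minimal sketch; file_sha256 and should_skip are hypothetical helpers, and where last_successful_hash comes from (for example an ingest_runs table) is left to the caller.

```python
import hashlib
from typing import Optional


def file_sha256(path: str, block_size: int = 1 << 16) -> str:
    """Hash the PDF's bytes, streaming so large files are not read into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def should_skip(path: str, last_successful_hash: Optional[str]) -> bool:
    """Layer (1): skip the whole ingest when the file is byte-identical to the last successful run."""
    return last_successful_hash is not None and file_sha256(path) == last_successful_hash
```

Hashing bytes rather than file_path + mtime means a re-upload of an identical file is skipped even though its mtime changed.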
Q3: How do you add idempotency to a long-running ingest that’s currently mid-flight?
A: Two patterns. (1) Cooperative checkpoint: have ingest_pdf commit in batches and, after every N chunks (e.g., every 50), update an ingest_progress(source, last_chunk_id, completed_at) row. If the worker is killed mid-run and Celery retries, the re-run resumes after last_chunk_id. Tradeoff: the progress-row write itself becomes a hot path. (2) Versioned rows: replace ON CONFLICT (chunk_id) DO NOTHING with ON CONFLICT (chunk_id) DO UPDATE SET version = document_chunks.version + 1, embedded_at = now(), so every re-ingestion is recorded on the existing row. If you also need to keep old embeddings, key rows on (chunk_id, version) and insert a new row per run; queries then opt into “latest version only” via SELECT DISTINCT ON (chunk_id) ... ORDER BY chunk_id, version DESC. Tradeoff: with per-version rows, storage grows linearly with re-ingestions and needs a periodic prune job. For the FCA, the cooperative-checkpoint pattern fits better: partial inserts cost little, and the chunk_id idempotency already covers re-runs from a clean state.
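The cooperative-checkpoint pattern reduces to “commit a batch, then record progress”. A sketch with the database replaced by a callable and a dict; ingest_with_checkpoints is hypothetical, and it records a chunk position rather than last_chunk_id, which only works because chunking is deterministic for an unchanged PDF. Because the real inserts use ON CONFLICT (chunk_id) DO NOTHING, a crash between a batch’s commit and its checkpoint just replays that batch harmlessly.

```python
from typing import Callable, Dict, List


def ingest_with_checkpoints(
    chunk_ids: List[str],
    write_batch: Callable[[List[str]], None],
    progress: Dict[str, int],
    source: str,
    batch_size: int = 50,
) -> int:
    """Write chunks in batches, recording how many are done after each batch.

    `progress` stands in for an ingest_progress table: progress[source] is the
    number of chunks already written, so a retried run resumes after them.
    Returns the number of chunks written by this call.
    """
    start = progress.get(source, 0)
    written = 0
    for i in range(start, len(chunk_ids), batch_size):
        batch = chunk_ids[i:i + batch_size]
        write_batch(batch)                 # real service: INSERT ... ON CONFLICT DO NOTHING, then commit
        progress[source] = i + len(batch)  # checkpoint only after the batch is durable
        written += len(batch)
    return written


if __name__ == "__main__":
    stored: List[str] = []
    progress = {"faqs.pdf": 100}  # a previous run died after 100 chunks
    ids = [f"c{i}" for i in range(120)]
    print(ingest_with_checkpoints(ids, stored.extend, progress, "faqs.pdf"))  # resumes at c100
```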