RAGService ingest_pdf End-to-End

What

A single async service class that takes a path to a PDF on disk, writes its chunks to PostgreSQL via pgvector, and returns the integer chunk count. The service owns the whole PDF pipeline: page extraction, text chunking, HuggingFace vector embedding, INSERT ... :embedding::vector persistence, and the final commit-and-return.

RAGService is the only place in the codebase that knows how to produce a chunk into pgvector end-to-end. It is called from three call-sites:

  1. The one-shot CLI adapter ingest.py for manual / first-time ingestion.
  2. The Celery worker task app/worker.py::ingest_pdf_task for background PDF uploads.
  3. The HTTP upload endpoint /api/v1/admin/upload-background (which fans out into the Celery task above).

Project Context

In full_project_context_updated.txt, the FCA Support Agent connects four artifacts around one service:

  • ingest.py calls await rag.ingest_pdf("data/FCA faqs.pdf") and prints the chunk count. The simplest contract in the codebase.
  • app/worker.py — wraps RAGService().ingest_pdf(file_path) in Celery’s sync-by-async bridge, returning {"status": "success", "chunks": chunks_count}.
  • docker-compose.yml — the worker service declares HF_TOKEN: ${HF_TOKEN} in its environment alongside DATABASE_URL and CELERY_BROKER_URL. Without HF_TOKEN set, hosted HuggingFace Inference API endpoints fall back to a warning; locally-bundled sentence-transformers still work because they don’t call the API.
  • requirements.txt pins the three RAG-critical packages: PyPDF2 (PDF text extraction), huggingface_hub>=0.21.0 (the client library, not the sentence-transformers model itself), and langchain-groq (chat completions live on the other stack: Groq is for LLM output, HF is for embeddings). This split is deliberate and is the architecture this page documents.
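The sync-to-async bridge described for app/worker.py can be sketched as follows. This is a minimal illustration, not the project's task: FakeRAGService and ingest_pdf_sync are hypothetical names, and the Celery decorator is left out so the snippet runs on the standard library alone.

```python
import asyncio


class FakeRAGService:
    """Hypothetical stand-in for RAGService; pretends every PDF yields 3 chunks."""

    async def ingest_pdf(self, file_path: str) -> int:
        await asyncio.sleep(0)  # simulate awaiting I/O
        return 3


def ingest_pdf_sync(file_path: str) -> dict:
    """Run the async ingest from synchronous code, as a Celery task body would."""
    # asyncio.run creates a fresh event loop, runs the coroutine, then closes it.
    chunks_count = asyncio.run(FakeRAGService().ingest_pdf(file_path))
    return {"status": "success", "chunks": chunks_count}


result = ingest_pdf_sync("data/FCA faqs.pdf")
```

Creating one event loop per task call keeps the worker process free of shared loop state, at the cost of a small setup overhead per ingest.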

The pgvector column itself is enabled by init_db() in app/database.py, which runs CREATE EXTENSION IF NOT EXISTS vector; before Base.metadata.create_all. pgvector-rag-ingestion documents that bootstrap; this page documents what runs after bootstrap on every PDF ingest.

How

Architecture: who calls what

┌─────────────────┐    ┌──────────────────┐    ┌──────────────┐
│ ingest.py       │──▶ │ RAGService       │──▶ │ pgvector     │
│ (one-shot CLI)  │    │ .ingest_pdf()    │    │ faqs table   │
└─────────────────┘    └──────────────────┘    └──────────────┘
┌─────────────────┐    ┌──────┴────────┐       ┌──────────────┐
│ /api/v1/admin/  │──▶ │ Celery worker │──▶    │ HF Embedding │
│ upload-bg       │    │ ingest_pdf_   │       │ + PyPDF2 +   │
│ (POST)          │    │ task          │       │ Recursive…   │
└─────────────────┘    └───────────────┘       └──────────────┘

Three call paths funnel into one production-critical method. That’s why the service surface is small (one method, one return type) but the body is careful.

High-level ingest_pdf algorithm

    class RAGService:
        def __init__(self) -> None:
            self._session_factory = AsyncSessionLocal
            self._embedder = self._build_embedder(settings.rag_embedding_model)  # hosted HF or local
            self._vector_dim = settings.rag_embedding_dim  # default 384

        async def ingest_pdf(self, file_path: str) -> int:
            # 1. PDF → list[str] (one string per page)
            pages = self._read_pdf(file_path)

            # 2. Flatten + chunk with overlap
            chunks = self._chunk_pages(pages, source_path=os.path.basename(file_path))

            # 3. Embed chunk by chunk. Local mode keeps everything in-process;
            #    hosted HF mode makes an HTTP call per chunk.
            vectors = await self._embed_chunks(chunks)

            # 4. Bulk INSERT into document_chunks, one row per chunk.
            async with self._session_factory() as session:
                await session.execute(
                    text("""
                        INSERT INTO document_chunks (source, page, content, embedding)
                        VALUES (:src, :page, :content, :embedding::vector)
                    """),
                    [{"src": c.source, "page": c.page, "content": c.text, "embedding": v}
                     for c, v in zip(chunks, vectors)],
                )
                await session.commit()

            # 5. Return the chunk count. Any failure above raises instead of
            #    returning a partial count.
            return len(chunks)

Two things to internalise before reading the line-by-line walkthrough:

  1. The service is a single class with no public state and one public async method. The shared infrastructure (session factory, embedder) is set up in __init__. This is the service-layer-async-session pattern; CustomerService and the other services follow the same shape.
  2. The configured model name, not HF_TOKEN, decides between fully local and HTTP-backed embedding generation; HF_TOKEN only matters once a hosted model is selected. If HF_TOKEN is unset and the model is hosted (e.g., thenlper/gte-large), the service prints a warning and either falls through to a local model or fails loudly, depending on configuration. The CLI operator is responsible for picking a model that matches their HF_TOKEN availability.

HuggingFace token: where it’s read, what it gates

HF_TOKEN is read directly from the environment, never hardcoded and never stored in Settings:

    import os

    HF_TOKEN = os.getenv("HF_TOKEN")  # optional; absence is non-fatal

Why not put it in app/config.py’s Settings? Two reasons:

  1. HF_TOKEN is rotation-friendly: operators can swap keys without restarting the FastAPI app by recreating only the worker container with a new env file (--env-file is an option of docker compose itself, so it goes before up). Pydantic Settings would freeze it at import time.
  2. It is only needed by the worker service (docker-compose.yml’s worker: block) and by ingest.py. The web service uses HuggingFace only in the agent path via langfuse-llm-tracing-decorator, which doesn’t need the token unless the user explicitly opts in to hosted HF inference.

What the token gates in RAGService.__init__:

| Scenario | Behaviour without HF_TOKEN |
| --- | --- |
| self._embedder = SentenceTransformerEmbeddings("sentence-transformers/all-MiniLM-L6-v2") | ✅ works: sentence-transformers downloads the model on first use, no token needed |
| self._embedder = HuggingFaceHubEmbeddings(model="thenlper/gte-large") | ❌ fails: InferenceClient requires auth |
| self._embedder = HuggingFaceEndpointEmbeddings("https://…") | ⚠️ depends on endpoint privacy |

So the gating is model-class-driven, not token-driven. The docstring on _build_embedder() should list which model IDs are token-free vs token-required. The current production config uses the token-free local path, which is why a missing HF_TOKEN is a yellow warning, not a red error.
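To make the model-class-driven gating concrete, here is a small sketch that classifies a configuration before any embedder is built. It assumes the "sentence-transformers/" and "hf-" naming scheme used on this page; needs_hf_token and check_token are hypothetical helpers, not functions from the codebase.

```python
import os
from typing import Mapping, Optional

# Assumed prefixes, mirroring the naming scheme used on this page.
LOCAL_PREFIX = "sentence-transformers/"
HOSTED_PREFIX = "hf-"


def needs_hf_token(model_name: str) -> bool:
    """Return True when the model name selects the hosted (token-gated) path."""
    name = model_name.lower()
    if name.startswith(LOCAL_PREFIX):
        return False
    if name.startswith(HOSTED_PREFIX):
        return True
    raise ValueError(f"Unknown embedder scheme for {model_name!r}")


def check_token(model_name: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Classify the configuration as 'ok', 'warn' or 'error'."""
    env = os.environ if env is None else env
    has_token = bool(env.get("HF_TOKEN"))
    if needs_hf_token(model_name):
        # Hosted model: a missing token is a hard error.
        return "ok" if has_token else "error"
    # Local model: a missing token is only worth a warning.
    return "ok" if has_token else "warn"
```

For example, check_token("sentence-transformers/all-MiniLM-L6-v2", env={}) yields "warn", while the same empty environment with "hf-thenlper/gte-large" yields "error".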

Top-to-Bottom Code Walkthrough (app/services/rag_service.py)

The file is roughly 150 lines, split into imports and constructor, the embedder helpers, _read_pdf, _chunk_pages, _embed_chunks, and ingest_pdf. The walkthrough goes top to bottom, in the same order you’d read the file.

Imports and module-level constants

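    import asyncio
    import hashlib
    import logging
    import os
    from dataclasses import dataclass
    from typing import List

    import PyPDF2  # PDF text extraction
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

    # Embedding layer: HuggingFace side, NOT langchain-groq.
    # SentenceTransformerEmbeddings is the local path (no token required);
    # HuggingFaceHubEmbeddings is the hosted Inference API path.
    # Import paths assume the legacy langchain package layout.
    from langchain.embeddings import HuggingFaceHubEmbeddings, SentenceTransformerEmbeddings
    from langchain.text_splitter import RecursiveCharacterTextSplitter

    from app.config import settings
    from app.database import AsyncSessionLocal

    # Assumption: a module-level logger; the project may import one from app/logger.py instead.
    logger = logging.getLogger(__name__)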

PyPDF2 is the simplest PDF text extractor that works for the FCA’s machine-generated PDFs. If you swap to image-only / scanned PDFs you’d add pypdfium2 or pdfplumber (and an OCR step with pytesseract); the pipeline otherwise stays identical. The HuggingFace embedding layer is intentionally NOT cross-wired with the Groq layer (langchain-groq) — the architecture is “embeddings from HF, completions from Groq” so the two providers’ rate limits and quotas don’t compete.

@dataclass Chunk at the top carries one chunk plus the metadata the persistence layer needs:

    @dataclass
    class Chunk:
        text: str      # the chunked text
        source: str    # original PDF filename
        page: int      # 1-indexed page number
        chunk_id: str  # sha256(source|page|text)[:16], used for idempotency

The chunk_id is what makes re-ingestion safe: if you re-run ingest_pdf on the same PDF, the INSERT’s ON CONFLICT (chunk_id) clause skips every row whose chunk_id already exists.

Constructor: __init__

    class RAGService:
        def __init__(self, *, embedder_model: str | None = None) -> None:
            # Session factory, not a fresh session.
            # ingest_pdf opens its own session per call.
            self._session_factory: async_sessionmaker[AsyncSession] = AsyncSessionLocal

            # Pick embedder; default is the local 384-dim sentence-transformers.
            # DEFAULT: "sentence-transformers/all-MiniLM-L6-v2" (384 dims, local-only)
            model_name = embedder_model or settings.rag_embedding_model
            self._embedder = self._build_embedder(model_name)

            # The pgvector column type MUST match the model's output dim.
            # Stash the column type for safety checks on every insert.
            self._vector_dim = self._detect_dim(model_name)

            if settings.rag_embedding_dim and settings.rag_embedding_dim != self._vector_dim:
                raise RuntimeError(
                    f"Configured dim {settings.rag_embedding_dim} does not match "
                    f"model {model_name} dim {self._vector_dim}. "
                    f"This will fail at INSERT time with a vector dim mismatch."
                )

Three points worth surfacing:

  1. AsyncSessionLocal is the factory, not a session. The constructor does not open a connection. ingest_pdf opens its own connection per call inside async with. This matches app/database.py’s async_sessionmaker pattern and ensures no connection leaks across multiple ingest calls.
  2. embedder_model defaults to settings.rag_embedding_model so operators can override per-environment (e.g., a 768-dim all-mpnet-base-v2 in dev for higher-fidelity tests).
  3. The dim-mismatch check runs at construction time. It’s better to fail there than to wait until the first INSERT ... ::vector and then get an opaque expected 384 dimensions, not 768 from pgvector. The check is skipped when settings.rag_embedding_dim is None (dev convenience).

_build_embedder — choosing the HuggingFace backend

    @staticmethod
    def _build_embedder(model_name: str):
        name = model_name.lower()

        # LOCAL path: no token, no internet (after first download).
        if name.startswith("sentence-transformers/"):
            return SentenceTransformerEmbeddings(model_name=model_name)

        # HF INFERENCE API path: token required, hosted HTTP call per chunk.
        if name.startswith("hf-"):
            # Strip the "hf-" prefix from the original string, not the
            # lowercased copy, so the repo id keeps its casing.
            repo_id = model_name[3:]
            return HuggingFaceHubEmbeddings(
                repo_id=repo_id,
                huggingfacehub_api_token=os.getenv("HF_TOKEN"),
            )

        raise ValueError(
            f"Unknown embedder scheme for {model_name!r}. "
            f"Use 'sentence-transformers/<id>' for local or 'hf-<repo-id>' for hosted."
        )

The branch is an explicit name.startswith(...) selector rather than a magic module attribute. Two reasons:

  1. It surfaces dependency misconfiguration loudly — if someone passes "text-embedding-ada-002" (OpenAI), the raise ValueError fires immediately instead of silently falling into a None embedder.
  2. huggingfacehub_api_token=None in production does NOT necessarily raise on construction; the failure can surface only on the first .embed_query(...) call. That is an acceptable time to fail (you’ve at least read the model name), but if you want fail-fast behaviour, run a one-off probe embed right after construction, because a try/except around the constructor alone will not catch a deferred auth error.

_detect_dim (used by __init__) is a simple lookup against a hardcoded table:

    KNOWN_DIMS = {
        "sentence-transformers/all-MiniLM-L6-v2": 384,
        "sentence-transformers/all-mpnet-base-v2": 768,
        "thenlper/gte-large": 1024,
        "BAAI/bge-small-en-v1.5": 384,
        "BAAI/bge-large-en-v1.5": 1024,
    }

For any model not in this table, the constructor attempts a one-time probe embedding to read the dim. Don’t ship the probe in production — pin your model’s dim in the table.
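The lookup-then-probe behaviour described above could look roughly like this. It is a sketch under assumptions, not the file's actual _detect_dim: the function name detect_dim, the probe callable, and the stripping of an "hf-" prefix before the table lookup are all hypothetical.

```python
from typing import Callable, List, Optional

KNOWN_DIMS = {
    "sentence-transformers/all-MiniLM-L6-v2": 384,
    "sentence-transformers/all-mpnet-base-v2": 768,
    "thenlper/gte-large": 1024,
}


def detect_dim(
    model_name: str,
    probe: Optional[Callable[[str], List[float]]] = None,
) -> int:
    """Look up the embedding dim, falling back to a one-time probe embed."""
    # Assumption: hosted names carry an "hf-" prefix that the table keys lack.
    key = model_name[3:] if model_name.lower().startswith("hf-") else model_name
    if key in KNOWN_DIMS:
        return KNOWN_DIMS[key]
    if probe is None:
        raise KeyError(f"No pinned dim for {model_name!r} and no probe given")
    # Probe path: embed a throwaway string and measure the vector length.
    return len(probe("dimension probe"))
```

Passing probe=None in production turns an unpinned model into an immediate KeyError, which matches the advice to pin dims in the table rather than ship the probe.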

_read_pdf — page extraction with PyPDF2

    @staticmethod
    def _read_pdf(file_path: str) -> List[str]:
        if not os.path.exists(file_path):
            raise FileNotFoundError(file_path)

        reader = PyPDF2.PdfReader(file_path)
        pages: List[str] = []
        for page in reader.pages:
            # Scanned pages have no text layer and yield no text; the caller has to OCR.
            text = page.extract_text() or ""
            pages.append(text)
        return pages

Three intentional choices:

  1. raise FileNotFoundError early instead of returning (False, [...]) — operators want a stack trace, not a chunk count of 0.
  2. page.extract_text() or "": PyPDF2 can return None or an empty string for pages without a text layer (scans). Defaulting to an empty string lets chunking proceed without crashing, but such pages contribute no chunks, so their content is simply missing from retrieval. Production scans should be pre-OCR’d with pdf2image + pytesseract.
  3. Returns List[str] (one per page) — chunking happens in a separate method so _read_pdf stays easy to test.

_chunk_pages — recursive character splitter with overlap

    @staticmethod
    def _chunk_pages(pages: List[str], source_path: str) -> List[Chunk]:
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,    # characters (length_function=len), roughly 125 tokens at ~4 chars/token
            chunk_overlap=50,  # 10% overlap so text near a cut appears in both neighbouring chunks
            separators=["\n\n", "\n", ". ", " "],
            length_function=len,
        )

        chunks: List[Chunk] = []
        for page_idx, text in enumerate(pages, start=1):
            for piece in splitter.split_text(text):
                if not piece.strip():
                    continue
                tid = hashlib.sha256(
                    f"{source_path}|{page_idx}|{piece}".encode("utf-8")
                ).hexdigest()[:16]
                chunks.append(Chunk(text=piece, source=source_path,
                                    page=page_idx, chunk_id=tid))
        return chunks

RecursiveCharacterTextSplitter is the langchain default because it chunks on semantic boundaries first (\n\n → \n → ". " → space) rather than on pure character count. The 500/50 split (500 characters with 10% overlap) is a common starting point; tune up for technical content (longer contiguous explanation) or down for chat content (smaller messages).
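The idea of trying coarse separators first can be illustrated with a simplified, standard-library-only sketch. It is not LangChain's implementation: recursive_split is a hypothetical function, and it deliberately omits chunk_overlap to stay short.

```python
from typing import List, Sequence


def recursive_split(
    text: str,
    chunk_size: int,
    separators: Sequence[str] = ("\n\n", "\n", ". ", " "),
) -> List[str]:
    """Split on the coarsest separator first, recursing only into oversized pieces."""
    if len(text) <= chunk_size:
        return [text] if text.strip() else []
    if not separators:
        # No separator left: fall back to a hard cut every chunk_size characters.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    sep, rest = separators[0], separators[1:]
    chunks: List[str] = []
    current = ""
    for part in text.split(sep):
        candidate = f"{current}{sep}{part}" if current else part
        if len(candidate) <= chunk_size:
            current = candidate  # keep merging small neighbours
            continue
        if current:
            chunks.append(current)
            current = ""
        if len(part) <= chunk_size:
            current = part
        else:
            # This piece is still too big: retry with the finer separators.
            chunks.extend(recursive_split(part, chunk_size, rest))
    if current:
        chunks.append(current)
    return [c for c in chunks if c.strip()]


sample = "Para one is short.\n\nPara two is also short.\n\n" + "word " * 30
pieces = recursive_split(sample, chunk_size=40)
```

In the sample, the two short paragraphs survive as whole chunks, and only the long run of words is broken down at the space separator.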

The chunk_id is the idempotency key. sha256(source + page + text)[:16] is collision-resistant enough for de-duping. The INSERT later checks ON CONFLICT (chunk_id) DO NOTHING to make re-ingestion safe — see ingest_pdf’s SQL.

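Why dedupe at chunk level instead of whole-PDF level? Because if just one page is edited, you can re-run ingestion and only that page’s chunks get written; the rest are no-op’d by the constraint. This is what makes partial updates cheap. One caveat: DO NOTHING never removes the superseded chunks of the edited page, so stale rows stay in the table until something deletes them.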
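A small simulation shows why chunk-level ids make a one-page edit cheap. The hashing scheme is the one from this page; new_ids is a hypothetical helper, and one chunk per page stands in for real splitter output.

```python
import hashlib
from typing import List, Set


def chunk_id(source: str, page: int, text: str) -> str:
    """sha256 over source|page|text, truncated to the first 16 hex characters."""
    return hashlib.sha256(f"{source}|{page}|{text}".encode("utf-8")).hexdigest()[:16]


def new_ids(pages: List[str], source: str, existing: Set[str]) -> Set[str]:
    """Return the chunk_ids that ON CONFLICT DO NOTHING would actually insert."""
    # Simplification: one chunk per page stands in for the real splitter output.
    ids = {chunk_id(source, i, text) for i, text in enumerate(pages, start=1)}
    return ids - existing


v1 = ["Page one text", "Page two text", "Page three text"]
v2 = ["Page one text", "Page two text (edited)", "Page three text"]

stored = new_ids(v1, "faq.pdf", existing=set())     # first ingest: every id is new
inserted = new_ids(v2, "faq.pdf", existing=stored)  # re-ingest: only the edited page
# Ids that are in the table but no longer produced by the current PDF.
stale = stored - {chunk_id("faq.pdf", i, t) for i, t in enumerate(v2, start=1)}
```

The stale set is the old version of page two: it is not inserted again, but nothing removes it either.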

_embed_chunks — HuggingFace embed call per chunk

    async def _embed_chunks(self, chunks: List[Chunk]) -> List[List[float]]:
        # The embedder's embed_query() is synchronous, so run it on a worker
        # thread to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        vectors: List[List[float]] = []
        for c in chunks:
            vec = await loop.run_in_executor(
                None,                        # default ThreadPoolExecutor
                self._embedder.embed_query,  # same method on the hosted and local embedders
                c.text,
            )
            vectors.append(vec)
        return vectors

Two architectural decisions worth documenting:

  1. run_in_executor for the embed call: SentenceTransformer.encode() and HuggingFaceHubEmbeddings.embed_query() are both sync. Calling them directly inside an async def would block the event loop for ~50-200 ms per chunk. With a PDF producing 200 chunks, that’s 10-40 seconds of event loop stall, which is fatal for any concurrent traffic (like /chat/stream SSE consumers). The executor pushes the call to a thread, freeing the loop.
  2. Sequential embedding, not batched: embed_query is per-call. If you switch to self._embedder.embed_documents([c.text for c in chunks]) you get ~5-10× throughput but you lose progress visibility. The current code keeps sequential + per-chunk logging for operational clarity (you see exactly which chunk failed).
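The effect of run_in_executor on loop responsiveness can be demonstrated with a fake blocking embedder. This is a sketch: fake_embed, heartbeat, and the timings are hypothetical stand-ins, chosen only to show that other coroutines keep running while embeddings are computed on a thread.

```python
import asyncio
import time
from typing import List


def fake_embed(text: str) -> List[float]:
    """Hypothetical blocking embedder; sleeps to imitate model latency."""
    time.sleep(0.02)
    return [float(len(text))]


async def embed_chunks(texts: List[str]) -> List[List[float]]:
    loop = asyncio.get_running_loop()
    vectors: List[List[float]] = []
    for t in texts:
        # The blocking call runs on the default thread pool, not on the loop.
        vectors.append(await loop.run_in_executor(None, fake_embed, t))
    return vectors


async def heartbeat(ticks: List[float], stop: asyncio.Event) -> None:
    """Record a tick every 5 ms; it only advances while the loop is free."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.005)


async def main():
    ticks: List[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    vectors = await embed_chunks(["a", "bb", "ccc", "dddd", "eeeee"])
    stop.set()
    await beat
    return vectors, len(ticks)


vectors, tick_count = asyncio.run(main())
```

If fake_embed were called directly inside embed_chunks, the heartbeat would record almost no ticks, because the loop would be blocked for the full duration of every call.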

ingest_pdf — the public method that ties it together

    async def ingest_pdf(self, file_path: str) -> int:
        """Read a PDF, chunk it, embed it, persist to pgvector, return chunk count."""
        logger.info(f"[RAG] Starting ingest: {file_path}")

        # 1. PDF → pages
        pages = self._read_pdf(file_path)
        logger.info(f"[RAG] Extracted {len(pages)} pages from {file_path}")

        # 2. Flatten + chunk
        chunks = self._chunk_pages(pages, source_path=os.path.basename(file_path))
        logger.info(f"[RAG] Generated {len(chunks)} chunks")

        # 3. Embed (sequential, in executor)
        try:
            vectors = await self._embed_chunks(chunks)
        except Exception as e:
            logger.error(f"[RAG] Embedding failed for {file_path}: {e}")
            # Partial state: do NOT commit; fail loudly.
            raise

        # 4. Bulk INSERT with idempotency
        async with self._session_factory() as session:
            try:
                await session.execute(
                    text("""
                        INSERT INTO document_chunks (chunk_id, source, page, content, embedding)
                        VALUES (:chunk_id, :source, :page, :content, :embedding::vector)
                        ON CONFLICT (chunk_id) DO NOTHING
                    """),
                    [
                        {
                            "chunk_id": c.chunk_id,
                            "source": c.source,
                            "page": c.page,
                            "content": c.text,
                            "embedding": v,
                        }
                        for c, v in zip(chunks, vectors)
                    ],
                )
                await session.commit()
            except Exception as e:
                await session.rollback()
                logger.error(f"[RAG] DB insert failed for {file_path}: {e}")
                raise

        logger.info(f"[RAG] Ingest complete: {file_path} -> {len(chunks)} chunks")
        return len(chunks)

Things to notice in order:

  1. logger.info(...) at every transition: the [RAG] prefix lets you grep logs across Celery worker stdout, FastAPI SSE journal, and the CLI run. The app/logger.py JSON formatter (covered in structured-logging-with-contextvar) preserves the prefix in production.
  2. Embedding failure raises before the DB session is opened: by the time async with self._session_factory() as session: runs, you already have all the vectors. No partial inserts allowed.
  3. ON CONFLICT (chunk_id) DO NOTHING is the idempotency guarantee. Re-running on the same PDF is a no-op for unchanged chunks. (The chunk_id column needs a UNIQUE constraint at the schema layer; see the FAQ model.)
  4. embedding::vector casts the Python [float] list into the pgvector vector(384) type. Without the cast, you get column "embedding" is of type vector but expression is of type text.
  5. return len(chunks) returns all chunks processed, not only the newly inserted ones. If the operator runs ingest.py twice, they’ll see “200 chunks” both times, even though the second run inserted 0. To get “newly inserted chunks”, query the inserted rows from the cursor. This is a known UX wart documented in the pitfalls.
  6. No retry on embedding failure — if embed_query raises once (transient HF outage, network blip), the whole ingestion aborts. Celery’s retry semantics on the calling task will re-run. Don’t add retry here — let the higher layer own retries.

Wiring it all up: HF_TOKEN and Docker

The worker service in docker-compose.yml sets HF_TOKEN only on the worker container — not on the web service:

    worker:
      environment:
        DATABASE_URL: postgresql+asyncpg://...
        CELERY_BROKER_URL: redis://redis:6379/0
        CELERY_RESULT_BACKEND: redis://redis:6379/0
        HF_TOKEN: ${HF_TOKEN}   # <-- only here

Why not also on web? Because the web service uses HuggingFace only through the @observe Langfuse decorator for chat-side LLM calls (the LangGraph agents don’t run on web — they run on the coordinator). The HF_TOKEN is for embedding-side calls, which only happen during ingestion. If you eventually add agent-side HF retrieval re-ranking on web, add the env var there too.

Why these choices

  • HuggingFace embeddings + Groq chat: HF is the de facto OSS standard for sentence embeddings; Groq is fast/cheap for generation. Picking one provider for both means you accept that provider’s rate limits for both RAG and chat — splitting lets you grow them independently. Tradeoff: SDK surface doubles (you maintain HF + Groq client code).
  • sentence-transformers/all-MiniLM-L6-v2 for default: 384 dims is a practical sweet spot for pgvector. Compared with 768 dims it cuts storage by about half and search time by about a third. The quality drop from mpnet to MiniLM is ~5% on standard retrieval benchmarks; for FCA FAQ-style span retrieval, MiniLM-L6 is sufficient.
  • ON CONFLICT (chunk_id) DO NOTHING over explicit dedupe: Postgres-native, runs in the same transaction, and the chunk_id index serves as the dedup index for free. Don’t write a Python-side if exists: skip loop — that’s racy and slow.
  • RecursiveCharacterTextSplitter over CharacterTextSplitter: tries \n\n → \n → ". " → space in order, which matches how humans organise documents. Pure character splitting breaks paragraphs mid-sentence.
  • run_in_executor per embed call over batching: gives you per-chunk error visibility. If a single chunk fails, you know exactly which one. Giving up the 5-10× batching speedup is a real cost, and you’ll feel it on 500+ chunk PDFs.

Common Pitfalls

Reading len(chunks) and inferring “I just inserted N rows”. Re-ingestion is idempotent, so the count returned is all chunks even when the ON CONFLICT clause skipped them. To get a true insert count, compare SELECT COUNT(*) FROM document_chunks WHERE source = :s before and after the call, or expose a RETURNING clause on the INSERT and count the returned rows.
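The gap between “chunks seen” and “rows inserted” is easy to reproduce. The sketch below uses the standard library's sqlite3 as a stand-in for PostgreSQL (both accept ON CONFLICT ... DO NOTHING); ingest_rows and the reduced three-column table are hypothetical.

```python
import sqlite3
from typing import List, Tuple


def ingest_rows(
    conn: sqlite3.Connection, rows: List[Tuple[str, str, str]]
) -> Tuple[int, int]:
    """Insert rows idempotently; return (chunks_seen, rows_actually_inserted)."""
    before = conn.execute("SELECT COUNT(*) FROM document_chunks").fetchone()[0]
    conn.executemany(
        "INSERT INTO document_chunks (chunk_id, source, content) VALUES (?, ?, ?) "
        "ON CONFLICT (chunk_id) DO NOTHING",
        rows,
    )
    conn.commit()
    after = conn.execute("SELECT COUNT(*) FROM document_chunks").fetchone()[0]
    return len(rows), after - before


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE document_chunks (chunk_id TEXT PRIMARY KEY, source TEXT, content TEXT)"
)
rows = [("id1", "faq.pdf", "first"), ("id2", "faq.pdf", "second")]
first_run = ingest_rows(conn, rows)   # both rows are new
second_run = ingest_rows(conn, rows)  # both rows are skipped by the constraint
```

The before/after count is only exact when no other ingest writes to the table concurrently; under concurrency a RETURNING-based count is the safer choice.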

Setting HF_TOKEN on web but not worker (or vice versa). The token has different effects depending on which service reads it. If your agents want hosted HF retrieval re-ranking on web, set it there too. If only ingestion uses hosted HF, keep it scoped to the worker.

Using sentence-transformers/all-mpnet-base-v2 (768 dims) but the pgvector column is vector(384). pgvector refuses mismatched dims with expected 384 dimensions, not 768. The _detect_dim startup check catches this if you’re using a hardcoded model name — but if you let the model name change at runtime via env var without a corresponding column migration, you get a runtime error per INSERT.

Choosing an embedder factory path that calls the HF Inference API without HF_TOKEN. HuggingFaceHubEmbeddings(repo_id=..., huggingfacehub_api_token=None) raises on every call. Surface this at service startup by attempting a probe embed in __init__ if you want fail-fast behaviour.
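A probe embed at construction time might look like the following sketch. The Embedder protocol, the two fake embedder classes, and probe_embedder are hypothetical; the only assumption about the real embedders is that they expose embed_query(text).

```python
from typing import List, Protocol


class Embedder(Protocol):
    def embed_query(self, text: str) -> List[float]: ...


class MissingTokenEmbedder:
    """Hypothetical hosted embedder that fails on first use, like a missing token."""

    def embed_query(self, text: str) -> List[float]:
        raise PermissionError("HF_TOKEN missing or rejected")


class LocalEmbedder:
    """Hypothetical local embedder returning a fixed 384-dim vector."""

    def embed_query(self, text: str) -> List[float]:
        return [0.0] * 384


def probe_embedder(embedder: Embedder, expected_dim: int) -> None:
    """Call once at construction time so auth and dim problems surface early."""
    try:
        vec = embedder.embed_query("startup probe")
    except Exception as exc:
        raise RuntimeError(f"Embedder probe failed: {exc}") from exc
    if len(vec) != expected_dim:
        raise RuntimeError(f"Probe returned {len(vec)} dims, expected {expected_dim}")
```

The probe costs one embedding call per service construction, which is negligible for a local model but does add an HTTP round trip on the hosted path.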

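Forgetting the ::vector cast in the INSERT SQL. Without it, you get column "embedding" is of type vector but expression is of type text even though the bind is a Python list[float]. SQLAlchemy can pick a bind type from the column when you use Core table constructs, but text(...) doesn’t infer types, so you must cast explicitly. One related trap: text() does not recognise a bind name that is immediately followed by a colon, so if :embedding::vector is not picked up as a parameter, write CAST(:embedding AS vector) instead.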
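One way to make the cast explicit is to send the vector in pgvector's text input form and cast it in SQL. This is a sketch under assumptions: to_pgvector_literal is a hypothetical helper, the reduced column list is for illustration, and binding a string rather than a Python list is a design choice, not what the service is confirmed to do.

```python
import math
from typing import Sequence


def to_pgvector_literal(vec: Sequence[float], expected_dim: int) -> str:
    """Format a vector in pgvector's text input form, e.g. '[0.1,0.2,0.3]'."""
    if len(vec) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, not {len(vec)}")
    if not all(math.isfinite(x) for x in vec):
        raise ValueError("pgvector rejects NaN and infinite components")
    return "[" + ",".join(repr(float(x)) for x in vec) + "]"


# CAST(... AS vector) is the standard-SQL spelling of ::vector.
# Assumption: the bind value is the literal string produced above.
INSERT_SQL = (
    "INSERT INTO document_chunks (chunk_id, content, embedding) "
    "VALUES (:chunk_id, :content, CAST(:embedding AS vector))"
)

literal = to_pgvector_literal([0.5, -1.0, 2.0], expected_dim=3)
```

Checking the dimension in Python before the INSERT gives a clearer error than the database would, and it fails before a transaction is opened.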

Splitting on . (period space) but not handling line breaks inside lists — your Q&A chunks end up cutting a bullet list in half. Test against real FCA FAQ content; tune separators if needed.

Replacing PyPDF2 with pdfplumber then forgetting to update the _read_pdf body. pdfplumber.PDF.open(...).pages[i].extract_text() returns the same kind of value, but the import and the reader call change. Keep _read_pdf thin so the swap touches only a couple of lines.

Embedding in a tight sync loop without run_in_executor. A 200-chunk PDF blocks the event loop for 10-40 seconds. The /chat/stream SSE endpoint will time out and the entire FastAPI app becomes unresponsive. Always use run_in_executor, either per chunk or around one batched embed_documents call (which is itself synchronous).

Treating the chunk count as a quality signal. A higher chunk count doesn’t mean better coverage — it usually means smaller chunks (more overlap) and noisier retrieval. Pick the chunk size that maximises retrieval precision, not the one that maximises chunk count.

Real-World Interview Prep

Q1: Why embed with HuggingFace and chat with Groq? When would you unify them on one provider?

A: The split is about quota and latency management. Embeddings come from the HuggingFace side, where the default local sentence-transformers path has no request quota at all, while hosted HF inference is rate-limited and comparatively slow for chat models. Groq is built for low-latency generation and is not the embedding provider in this stack. By splitting: ingest happens on the cheap/free provider (HF), answer generation happens on the low-latency provider (Groq), and neither’s quota governs the other. Unify when (a) your corpus is so small you barely hit either’s free tier, in which case pick whichever SDK is fewer lines; (b) you’re already paying for an enterprise OpenAI or Anthropic key and bundle pricing matters more than quota isolation; (c) latency from one provider is best-in-class at a fixed monthly cost. The tradeoff of unified providers: a single bad day on the upstream provider kills both ingest-path and answer-path. Two providers give you blast-radius isolation at the cost of more SDK surface.

Q2: How would you keep embeddings fresh when the source PDF changes?

A: Three layers. (1) File-fingerprint layer: store sha256(file_path + mtime) somewhere (e.g., an ingest_runs(source, content_hash, started_at, chunk_count) table). On re-run, skip if content_hash matches the last successful run. Because mtime also changes on a plain copy or touch, hashing the file bytes is the stricter variant. (2) Chunk-idempotency layer: every chunk has its chunk_id = sha256(source|page|text)[:16]. Re-runs of an unchanged PDF produce zero new INSERTs (ON CONFLICT DO NOTHING). (3) Diff-then-patch layer: compute which pages have changed (PDF byte-level diff or git-style dedup on the raw bytes), then call ingest_pdf with a page-range filter. The third layer requires RAGService to accept a pages: range | None parameter so the same code path can re-ingest one page without re-scanning the whole PDF. In production most teams stop at layer (2) because it handles 90% of the “I re-uploaded the same PDF” case with zero ops overhead.
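Layer (1) can be sketched with a byte-level hash. Note the swap: this example hashes the file contents instead of path plus mtime, and the in-memory dict stands in for the ingest_runs table; file_sha256 and should_ingest are hypothetical helpers.

```python
import hashlib
import os
import tempfile
from typing import Dict


def file_sha256(path: str, block_size: int = 1 << 20) -> str:
    """Hash the file's bytes in blocks so large PDFs never load fully into memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def should_ingest(path: str, last_hashes: Dict[str, str]) -> bool:
    """Skip the ingest when the bytes match the last recorded run."""
    current = file_sha256(path)
    if last_hashes.get(path) == current:
        return False
    # In practice, record the hash only after the ingest has succeeded.
    last_hashes[path] = current
    return True


with tempfile.TemporaryDirectory() as tmp:
    pdf = os.path.join(tmp, "faq.pdf")
    with open(pdf, "wb") as fh:
        fh.write(b"%PDF-1.4 version one")
    seen: Dict[str, str] = {}
    decisions = [should_ingest(pdf, seen), should_ingest(pdf, seen)]
    with open(pdf, "wb") as fh:
        fh.write(b"%PDF-1.4 version two")
    decisions.append(should_ingest(pdf, seen))
```

The three decisions correspond to a first upload, an identical re-upload, and an upload after the file changed.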

Q3: How do you add idempotency to a long-running ingest that’s currently mid-flight?

A: Two patterns. (1) Cooperative checkpoint: in ingest_pdf, after every N chunks (e.g., every 50), update an ingest_progress(source, last_chunk_id, completed_at) row. If the worker is killed mid-run and Celery retries, the re-run resumes after last_chunk_id from the progress row. Tradeoff: the progress-row write itself becomes a hot path. (2) Versioned upsert: replace ON CONFLICT (chunk_id) DO NOTHING with ON CONFLICT (chunk_id) DO UPDATE SET version = document_chunks.version + 1, embedded_at = now(). This overwrites the row in place and bumps a counter, so storage stays flat but history is lost. To keep every version as its own row (true bitemporal versioning), key the table on (chunk_id, version) and INSERT a new row per ingestion; queries then opt into “latest version only” via SELECT DISTINCT ON (chunk_id) ... ORDER BY chunk_id, version DESC. Tradeoff of the row-per-version form: storage grows linearly with re-ingestions and a periodic prune job is required. For the FCA the cooperative-checkpoint pattern fits better: partial inserts cost little and the chunk_id idempotency already covers re-runs from clean state.
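The cooperative-checkpoint pattern can be sketched as a resumable loop. Everything here is hypothetical: ingest_with_checkpoints, the progress dict standing in for the ingest_progress row, and the flaky callback that simulates a worker being killed once.

```python
from typing import Callable, Dict, List, Optional


def ingest_with_checkpoints(
    chunk_ids: List[str],
    process: Callable[[str], None],
    progress: Dict[str, Optional[str]],
    every: int = 50,
) -> int:
    """Process chunks in order, resuming after progress['last_chunk_id'] if set."""
    last = progress.get("last_chunk_id")
    start = chunk_ids.index(last) + 1 if last in chunk_ids else 0
    done = 0
    for offset, cid in enumerate(chunk_ids[start:], start=1):
        process(cid)
        done += 1
        if offset % every == 0:
            progress["last_chunk_id"] = cid  # stands in for the progress-row UPDATE
    if chunk_ids:
        progress["last_chunk_id"] = chunk_ids[-1]
    return done


ids = [f"c{i}" for i in range(10)]
progress: Dict[str, Optional[str]] = {"last_chunk_id": None}
seen: List[str] = []


def flaky(cid: str) -> None:
    """Fail once on chunk c5 to simulate a worker killed mid-run."""
    if cid == "c5" and "crashed" not in progress:
        progress["crashed"] = "yes"
        raise RuntimeError("worker killed")
    seen.append(cid)


try:
    ingest_with_checkpoints(ids, flaky, progress, every=2)
except RuntimeError:
    pass  # Celery would retry the task here
resumed = ingest_with_checkpoints(ids, flaky, progress, every=2)
```

The retry starts after the last checkpoint (c3), so chunk c4 is processed twice; that is harmless as long as the insert stays idempotent on chunk_id.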
