Skip to Content
BackendLangGraph Checkpointing with AsyncPostgresSaver

LangGraph Checkpointing with AsyncPostgresSaver

What? (Concept Overview)

LangGraph checkpointing is the persistence layer that lets a StateGraph survive process restarts, resume mid-flight threads, and implement human-in-the-loop interrupts without losing conversational state. AsyncPostgresSaver is the production-grade Postgres-backed implementation of this abstraction, exposed via langgraph.checkpoint.postgres.aio.

Project Context

The FCA Support Agent compiles a StateGraph in app/coordinator/agent_coordinator.py and threads an AsyncPostgresSaver through MessageWorkflow.checkpointer. Conversation identity is bound to LangGraph’s thread_id (the conversation_id), so each chat has an independent checkpointed execution. DDL is run idempotently on first invocation, guarded by a 30-second timeout that surfaces Postgres deadlocks as an operational error rather than silently hanging.

How? (Quick Reference Blocks)

3.1 Connection URL & Setup Guard

The coordinator derives a LangGraph-friendly DSN from the SQLAlchemy DATABASE_URL and runs setup() exactly once per process, with deadlock-aware timeout handling.

# app/coordinator/agent_coordinator.py from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver @property def _checkpointer_url(self) -> str: """Prepare the LangGraph checkpointer connection string.""" # Strip SQLAlchemy driver prefix (asyncpg / psycopg2) so the # raw libpq URL is what AsyncPostgresSaver expects. url = self._settings.database_url.replace("+asyncpg", "").replace( "+psycopg2", "" ) return url async def process_message(self, ...): checkpointer = AsyncPostgresSaver.from_conn_string(self._checkpointer_url) try: await asyncio.wait_for(checkpointer.setup(), timeout=30.0) self._checkpointer_setup_done = True except asyncio.TimeoutError: self.logger.error( "🚨 DATABASE DEADLOCK DETECTED! Another worker is holding " "the checkpointer DDL lock. Aborting to prevent indefinite hang." ) raise

3.2 Wiring the Checkpointer Into a Compiled Graph

The checkpointer instance is injected at compile time so every node transition (add_node / add_edge) writes a checkpoint transparently.

# app/workflows/message_workflow.py self.workflow = self.graph.compile( checkpointer=self.checkpointer, interrupt_before=["human_approval"], # HITL pause point (see HITL page) )

3.3 Thread Identity From conversation_id

Every invocation is namespaced by thread_id so that distinct conversations never share state. The same thread_id is reused across turns to enable resumption.

# app/coordinator/agent_coordinator.py config = { "configurable": { "thread_id": str(conversation_id), } } final_state = await workflow.ainvoke(initial_state, config=config)

3.4 Admin Resumption via aupdate_state

A paused thread is rehydrated by reading its snapshot, mutating state, and re-invoking the graph with the same thread_id.

# app/coordinator/agent_coordinator.py — approve_intervention snapshot = await workflow_wrapper.workflow.aget_state(config) if snapshot.next and "human_approval" in snapshot.next: await workflow_wrapper.workflow.aupdate_state( config, { "agent_response": new_response, "is_compliant": True, "agent_type": "human_admin", }, ) final_state = await workflow_wrapper.workflow.ainvoke(None, config=config)

Why? (Parameter Breakdown)

  • from_conn_string(...) — Factory method accepts a raw libpq URL (no driver prefix). Async variant yields checkpoints via APG_COPY/LISTEN/NOTIFY for low-latency writes.
  • asyncio.wait_for(..., timeout=30.0)checkpointer.setup() runs DDL (CREATE TABLE checkpoints, CREATE INDEX ...). Two web/worker pods starting simultaneously can deadlock on this lock. The explicit timeout surfaces a clean error instead of hanging the request.
  • self._checkpointer_setup_done flag — Avoids re-running DDL on every request. Cheap idempotency layer above the already-idempotent SQL DDL.
  • {"configurable": {"thread_id": ...}} — The only correct namespace for a checkpointer. Using customer_id would cause every conversation for that customer to share state.
  • interrupt_before=[...] at compile time, not invocation time — Lets the same compiled graph support both synchronous (no pause) and HITL flows; pausing is opt-in by node name.
  • aupdate_state(...) before ainvoke(None, ...) — Modifies the persisted checkpoint in place. Passing None as the input signals “resume from where we paused” without re-feeding the original state.

Common Pitfalls

  1. Using MemorySaver in production. MemorySaver works for notebooks and unit tests but cannot survive process death, cannot query historical threads, and has no shared-state semantics across pods. Always switch to AsyncPostgresSaver when shipping.
  2. Forgetting +asyncpg / +psycopg2 strip. AsyncPostgresSaver speaks libpq, not SQLAlchemy. Passing postgresql+asyncpg://... verbatim raises an obscure DSN parse error on first run. Strip the driver prefix before constructing the saver.

Real-World Interview Prep

Q1: How does LangGraph checkpointing interact with horizontal scaling? What is the failure mode if you run two web pods that both call checkpointer.setup() simultaneously?

A: Both pods will attempt to CREATE TABLE checkpoints and CREATE INDEX. Postgres serialises DDL on an ACCESS EXCLUSIVE lock, so whichever pod loses the race blocks until the winner commits. Without a timeout, the loser hangs indefinitely. The production fix is asyncio.wait_for(checkpointer.setup(), timeout=N) followed by a structured error log + retry-with-backoff at the orchestrator level; this matches the agent_coordinator pattern that catches asyncio.TimeoutError, logs 🚨 DATABASE DEADLOCK DETECTED!, and re-raises so Kubernetes reschedules the pod. Alternative approach: gate setup() behind a Postgres advisory lock (pg_try_advisory_lock(<constant>)) so only one process runs DDL per cluster.

Q2: What is the difference between interrupt_before, interrupt_after, and Command(resume=...) in LangGraph, and when would you reach for each?

A: interrupt_before=["node_name"] (compile-time) pauses the graph JUST BEFORE entering the named node. interrupt_after pauses after the named node has executed. Both produce a snapshot where snapshot.next lists the interrupted node; resume with graph.ainvoke(None, config=config). Command(resume=value) is an imperative “fork” used inside a node to inject state from the outside while a graph is running (e.g., dynamic tool approval in the middle of a ReAct node). In production HITL flows, interrupt_before + aupdate_state + ainvoke(None, ...) is the dominant pattern because it (a) requires no code in the paused node and (b) supports audit trails because the snapshot is fully flushed before any human touches state.

Q3: How would you debug a LangGraph thread that “stuck” mid-execution? Walk through the diagnostic steps.

A: (1) Query Postgres: SELECT thread_id, checkpoint_id, metadata FROM checkpoints WHERE thread_id = '<id>' ORDER BY checkpoint_id DESC LIMIT 5; — this gives the last five snapshots in execution order. (2) In Python: snapshot = await workflow.aget_state(config) and inspect snapshot.next (which node is pending), snapshot.values (current state), and snapshot.created_at (when it paused). (3) Cross-reference snapshot.values with the message-log table to find which turn produced the pause. (4) If snapshot.next is empty but the request never returned, the bug is almost always in the node itself (infinite loop, uncaught exception) — pull stderr from the pod. (5) To force-clean: await checkpointer.adelete_thread(config["configurable"]["thread_id"]) drops every checkpoint for that thread and lets the caller start fresh on the next request.

Top-to-Bottom Code Walkthrough (app/coordinator/agent_coordinator.py)

LangGraph’s killer feature is persistent state. Without checkpointing, a crash mid-graph loses every node’s progress. With it, you can resume from any node, time-travel debug, and pause for human review. Let’s see how this is wired in the project.

Imports

  • from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver — the async Postgres-backed checkpointer. Stores every node’s WorkflowState snapshot into a checkpoints table.
  • from langgraph import StateGraph, END — the graph DSL.
  • from app.workflows.message_workflow import build_message_workflow — the assembled graph factory.

_setup_checkpointer() (constructor)

The checkpointer needs a connection string. Tricky bit: it’s synchronous, but the workflow runs in an async event loop. The coordinator uses AsyncPostgresSaver.from_conn_string(DATABASE_URL).

self.checkpointer = AsyncPostgresSaver.from_conn_string( settings.database_url_sync # +asyncpg stripped — see Settings ) await self.checkpointer.setup() # CREATE TABLE checkpoints (idempotent)

The setup() is idempotent: it runs Alembic-equivalent SQL to create the checkpoints table if absent. Safe to call at every restart.

compile()

self.graph = build_message_workflow().compile( checkpointer=self.checkpointer, interrupt_before=["human_agent"], # Pause BEFORE human_agent )

interrupt_before=["human_agent"] is the human-in-the-loop hook: when the graph reaches a route to human_agent, it freezes. The conversation thread continues to accept messages but no LLM call happens until a human releases the checkpoint.

Checkpoint thread IDs

The checkout key is the thread_id — by default, conversation_id:

config = {"configurable": {"thread_id": str(conversation_id)}} output = await self.graph.ainvoke(state, config=config)

Every node execution writes a row into checkpoints(thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata).

Resume semantics

To resume a paused graph:

output = await self.graph.ainvoke(None, config=config) # pass None

The graph reads the last checkpoint from Postgres, restores WorkflowState, and runs the next node only.

Common Pitfalls

Using database_url (with +asyncpg) in from_conn_string — asyncpg is not comprehensible to the synchronous psycopg2 driver the checkpointer uses internally. Always pass settings.database_url_sync.

Forgetting await self.checkpointer.setup() on first run raises relation "checkpoints" does not exist. Run setup once at app startup.

Storing WorkflowState with non-serialisable values (e.g. asyncio.Queue) breaks JSON encoding. Limit state to Pydantic-friendly types.

Real-World Interview Prep

Q1: Why Postgres checkpointing instead of Redis?

A: Postgres holds the whole conversation history (Message table) anyway — keeping checkpoints in the same DB simplifies backup/restore. Redis is excellent for ephemeral state but its persistence story is weaker.

Q2: When would you use interrupt_before vs interrupt_after?

A: interrupt_before=["human_agent"] — pause before running the node, allowing a human to inject context. interrupt_after=["human_agent"] — give the LLM a first attempt, then let a human edit. Pick before when the human owns the response; pick after when the LLM drafts and the human refines.

Q3: What’s the migration story for checkpoint tables?

A: await checkpointer.setup() runs the schema migration automatically — but it’s tied to LangGraph version. In production, pin the version and run setup once per deploy, not per request.

Last updated on