WorkflowState — Pydantic Schema for LangGraph Shared Memory

What? (Concept Overview)

WorkflowState is the typed memory that flows between every LangGraph node in the FCA support agent’s MessageWorkflow. Defining it as a Pydantic BaseModel (rather than a plain dict) gives typed attribute access that editors and type checkers can verify, runtime validation when the state is constructed, and a single source of truth for what state exists across guardrail → classify → agent dispatch → compliance → end. The arbitrary_types_allowed=True config additionally lets a field be annotated with a non-Pydantic type such as an ORM class.

Project Context

The state schema lives in app/schemas/common.py and is the contract between agent_coordinator.process_message, _node_classify, _node_account_agent, _node_product_recommender, _node_compliance, and the HITL _node_human_approval. The state is also persisted by AsyncPostgresSaver — every checkpoint is a WorkflowState.model_dump() snapshotted into Postgres.

How? (Quick Reference Blocks)

3.1 The Full Schema

# app/schemas/common.py
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, Dict, Any, List
from datetime import datetime


class WorkflowState(BaseModel):
    """The 'Memory' passed between LangGraph nodes."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # === INPUTS ===
    message: str
    customer_id: int
    conversation_id: int = 0
    history: List[Dict[str, str]] = Field(default_factory=list)
    context: Dict[str, Any] = Field(default_factory=dict, exclude=True)

    # === FLOW STATE ===
    intent: Optional[str] = None
    intent_confidence: float = 0.0

    # === AGENT OUTPUTS ===
    agent_type: Optional[str] = None
    agent_response: Optional[str] = None
    agent_metadata: Dict[str, Any] = Field(default_factory=dict)
    confidence: float = 0.0

    # === COMPLIANCE ===
    is_compliant: bool = True
    compliance_check: Optional[str] = None
    required_disclaimers: List[str] = Field(default_factory=list)

    # === FINAL ===
    final_response: Optional[Dict[str, Any]] = None


class AgentResponse(BaseModel):
    """Standard output for every agent."""

    content: str
    confidence: float
    metadata: Dict[str, Any] = Field(default_factory=dict)
    agent_name: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)

3.2 Initialising State at Graph Start

# app/workflows/message_workflow.py: typical start
initial_state = WorkflowState(
    message=user_message,
    customer_id=customer_id,
    conversation_id=conversation_id,
    history=fetch_recent_turns(conversation_id, limit=10),
)

# Pass into the StateGraph with the thread_id config
config = {"configurable": {"thread_id": str(conversation_id)}}
final_state = await workflow.ainvoke(initial_state, config=config)

3.3 Mutating State Inside a Node

# app/workflows/message_workflow.py: _node_classify
async def _node_classify(self, state: WorkflowState) -> WorkflowState:
    classifier = IntentClassifierAgent()
    result = await self.execute_with_retry(
        classifier.classify, state.message, state.history
    )
    # LangGraph-style node: returns NEW state, not a mutation
    return state.model_copy(update={
        "intent": result["intent"],
        "intent_confidence": result["confidence"],
    })

3.4 Adding a Compliance Result to State

# app/workflows/message_workflow.py: _node_compliance
async def _node_compliance(self, state: WorkflowState) -> WorkflowState:
    checker = ComplianceCheckerAgent()
    outcome = await checker.process(
        {"content": state.agent_response or ""},
        context={"product_type": state.context.get("product_type", "")},
    )
    # Read the flag once with a default, so a missing key cannot raise KeyError
    is_compliant = outcome.metadata.get("is_compliant", True)
    return state.model_copy(update={
        "is_compliant": is_compliant,
        # Keep the checker's explanation only when the response failed the check
        "compliance_check": None if is_compliant else outcome.content,
        "required_disclaimers": outcome.metadata.get("required_disclaimers", []),
        "confidence": outcome.confidence,
    })

Why? (Parameter Breakdown)

  • ConfigDict(arbitrary_types_allowed=True): Lets a field be annotated with a non-Pydantic type (e.g., a SQLAlchemy ORM class) when you need cross-layer access; Pydantic then validates it with a simple isinstance check. Without the flag, Pydantic raises a schema-generation error as soon as the class is defined. The flag does not make such objects serialisable, so keep them out of anything that gets checkpointed.
  • exclude=True on context: context carries transient data that can be large (ORM objects, raw query results); excluding it from the serialised snapshot keeps the Postgres row compact and avoids trying to rebuild ORM objects from a checkpoint on resume.
  • Field(default_factory=list) and Field(default_factory=dict): Mutable defaults are best built by a factory. In a plain Python class a default of [] is shared across all instances, which is silent data corruption waiting to happen. Pydantic copies a mutable default for each instance, so default=[] would not actually be shared, but default_factory states the intent explicitly and matches the dataclasses idiom.
  • model_copy(update={...}) instead of state.X = Y: Pydantic v2 models are mutable by default; here they are treated as immutable by convention (you can set frozen=True in model_config for enforcement). model_copy(update=...) returns a new model with the updates applied and leaves the original's fields as they were. Two caveats: the update values are not validated, and the copy is shallow unless you pass deep=True. Treating state as immutable matters for checkpoint consistency, because a node that mutates state in place can change an object that an earlier snapshot or a retry still refers to.
  • conversation_id: int = 0 default: 0 is a sentinel for “new conversation”; the API route later rewrites it. Without a default, every call site has to pass it explicitly, which is error-prone.
  • history: List[Dict[str, str]]: The checkpointer has to serialise this field on every snapshot. A list of str→str dicts round-trips cleanly through common encoders such as JSON (e.g., orjson) and msgpack, so keep the shape simple.
  • final_response: Optional[Dict[str, Any]]: Holds the rendered API response (message + metadata + escalation_id). None signals “more work to do”; a dict signals “stream to UI”.
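The effect of exclude=True, arbitrary_types_allowed=True and default_factory can be checked in isolation. The following is a minimal sketch, not the project's schema: DemoState and FakeOrmRow are hypothetical names invented for the illustration, and the customer field is an assumption added only to exercise the arbitrary-type flag.

```python
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field


class FakeOrmRow:
    """Hypothetical stand-in for an ORM object (not part of the project)."""

    def __init__(self, pk: int) -> None:
        self.pk = pk


class DemoState(BaseModel):
    # Required because `customer` is annotated with a non-Pydantic class
    model_config = ConfigDict(arbitrary_types_allowed=True)

    message: str
    history: List[Dict[str, str]] = Field(default_factory=list)
    # Excluded fields stay readable on the instance but are skipped by model_dump()
    context: Dict[str, Any] = Field(default_factory=dict, exclude=True)
    customer: Optional[FakeOrmRow] = Field(default=None, exclude=True)


first = DemoState(message="hi", customer=FakeOrmRow(pk=7), context={"tier": "gold"})
second = DemoState(message="hello")

# The factory runs once per instance, so the two lists are different objects
independent = first.history is not second.history

# The snapshot contains only the non-excluded fields
snapshot = first.model_dump()
```

The excluded values remain available to nodes through first.context and first.customer during the run; they simply never reach the serialised snapshot.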

Common Pitfalls

  1. Mutating state.history.append(...) directly. Pydantic v2 models are mutable by default, so nothing stops you, but model_copy(update=...) is the contract between nodes. Because model_copy is shallow, an in-place append also changes the list held by every earlier copy of the state, which makes checkpoints and retries hard to reason about. Build a new list and pass it through model_copy.
  2. Annotating a field with a custom non-Pydantic class without arbitrary_types_allowed=True. Pydantic cannot build a validator for the type, so the class definition fails. Set the flag once at the class level rather than chasing surprises later; nested Pydantic models do not need it.
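The first pitfall is easiest to see with a small experiment. This sketch uses a hypothetical DemoState with only two fields; it shows the non-mutating way to add a turn, and why a plain model_copy() does not protect against in-place appends.

```python
from typing import Dict, List

from pydantic import BaseModel, Field


class DemoState(BaseModel):
    message: str
    history: List[Dict[str, str]] = Field(default_factory=list)


before = DemoState(message="hi", history=[{"role": "user", "content": "hi"}])

# Build a new list instead of appending to the existing one
new_turn = {"role": "assistant", "content": "hello"}
after = before.model_copy(update={"history": [*before.history, new_turn]})

# model_copy() is shallow by default: the copy points at the same list object,
# so an in-place append on either instance would be visible through both
shallow = before.model_copy()
shares_list = shallow.history is before.history
```

If nodes need fully independent copies, model_copy(deep=True) is the alternative, at the cost of copying every nested value.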

Real-World Interview Prep

Q1: Why use Pydantic instead of just TypedDict for the state schema?

A: Three reasons. (1) Validation on entry: a TypedDict performs no runtime checks, so it cannot reject message=123, whereas WorkflowState(message=123) raises a ValidationError pointing at the offending field. (2) Default factories: TypedDict forces you to write if "history" not in state: state["history"] = [] at every reader. Pydantic builds the default once at construction. (3) Runtime inspection: state.model_dump() gives a plain dict (pass mode="json" for a JSON-compatible one); state.model_json_schema() is publishable to API docs. In LangGraph the cost of Pydantic serialization is negligible because LangGraph’s checkpointer works with dict-compatible shapes.
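The validation difference in point (1) can be demonstrated with two hypothetical one-field schemas, DictState and ModelState, which are not part of the project:

```python
from typing import TypedDict

from pydantic import BaseModel, ValidationError


class DictState(TypedDict):
    message: str


class ModelState(BaseModel):
    message: str


# TypedDict is only a hint for static type checkers; at runtime it is a plain dict
loose = DictState(message=123)

try:
    ModelState(message=123)
    error_fields = []
except ValidationError as exc:
    # Each error entry records the location of the offending field
    error_fields = [err["loc"] for err in exc.errors()]
```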

Q2: How do you evolve the WorkflowState schema without breaking older checkpoints?

A: Two strategies. (1) Field additions with defaults: add new fields as Optional[...] = None. Older checkpoints then load with state.new_field set to None, which allows a gradual rollout. (2) Versioning: add schema_version: int = 1 and a model_validator(mode="before") that maps old shapes to new ones (e.g., state["agent_response"] → state["final_response"]). Bump the version on every breaking change and write a migration script. Of the two, (1) is the right choice in the large majority of cases, because a defaulted field never invalidates an existing checkpoint.
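A minimal sketch of the versioning strategy follows. The VersionedState model, the version numbers, and the rule that wraps the legacy agent_response string into a final_response dict are all assumptions made for illustration; a real migration would follow whatever the old and new shapes actually are.

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel, model_validator


class VersionedState(BaseModel):
    schema_version: int = 2
    message: str
    final_response: Optional[Dict[str, Any]] = None

    @model_validator(mode="before")
    @classmethod
    def migrate_legacy_shape(cls, data: Any) -> Any:
        # Snapshots written before versioning carry no schema_version; treat them as v1
        if isinstance(data, dict) and data.get("schema_version", 1) < 2:
            data = dict(data)  # never mutate the caller's dict
            legacy = data.pop("agent_response", None)
            if legacy is not None and data.get("final_response") is None:
                data["final_response"] = {"message": legacy}
            data["schema_version"] = 2
        return data


old_checkpoint = {
    "schema_version": 1,
    "message": "What is my balance?",
    "agent_response": "Your balance is 10 GBP",
}
migrated = VersionedState.model_validate(old_checkpoint)
```

Because the validator runs in "before" mode, it sees the raw dict and can reshape it before field validation starts, so old snapshots and new ones go through the same model.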

Q3: How would you thread context across ainvoke resumptions?

A: state.context is declared with exclude=True, so it is left out of model_dump() and should not be relied on to survive a resume. If you want to thread arbitrary context (customer_profile, last_transaction, support_tier), pass a key through the LangGraph config: configurable={"thread_id": ..., "context_key": ...}. The checkpointer doesn’t snapshot config, so cross-resumption requires an explicit “load on resume” pattern: a node reads config["configurable"]["context_key"] and re-fetches from Redis or the DB. Avoid stuffing big ORM objects into state.context; store the id and re-fetch on demand.
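The “load on resume” pattern can be sketched without any framework code. Here CONTEXT_STORE is a hypothetical in-memory dict standing in for Redis or the database, and load_context is an illustrative helper, not a function from the project or from LangGraph.

```python
from typing import Any, Dict

# Hypothetical store standing in for Redis or the DB
CONTEXT_STORE: Dict[str, Dict[str, Any]] = {
    "ctx:42": {"support_tier": "gold", "customer_profile_id": 42},
}


def load_context(config: Dict[str, Any]) -> Dict[str, Any]:
    """Re-fetch transient context using the key carried in the run config."""
    key = config.get("configurable", {}).get("context_key")
    if key is None:
        return {}
    # Return a copy so callers cannot modify the stored record by accident
    return dict(CONTEXT_STORE.get(key, {}))


config = {"configurable": {"thread_id": "42", "context_key": "ctx:42"}}
context = load_context(config)
```

A node would call a helper like this at the start of its body, so the state itself only ever carries the small key.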

Top-to-Bottom Code Walkthrough (app/schemas/common.py + app/workflows/message_workflow.py)

WorkflowState is the shared memory of every LangGraph node. It is defined as a Pydantic model so that (a) every node works with typed fields, (b) state can be dumped and re-validated for checkpointing, (c) the schema is self-documenting.

app/schemas/common.py

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


class AgentResponse(BaseModel):
    content: str
    confidence: float = Field(ge=0.0, le=1.0)
    metadata: dict = Field(default_factory=dict)
    timestamp: datetime = Field(default_factory=lambda: datetime.utcnow())


class ComplianceResult(BaseModel):
    is_compliant: bool
    reason: Optional[str] = None
    redacted: bool = False


class WorkflowState(BaseModel):
    # Inputs
    conversation_id: int
    customer_id: int
    message: str
    history: list[dict] = Field(default_factory=list)
    context: dict = Field(default_factory=dict)

    # Routing
    intent: Optional[str] = None
    confidence_scores: dict[str, float] = Field(default_factory=dict)

    # Agent outputs
    agent_outputs: dict[str, AgentResponse] = Field(default_factory=dict)
    selected_agent: Optional[str] = None
    selected_response: Optional[AgentResponse] = None

    # Compliance
    compliance_check: Optional[ComplianceResult] = None

    # Outputs
    final_response: Optional[AgentResponse] = None
    error: Optional[str] = None
    escalation_required: bool = False
    escalation_metadata: dict = Field(default_factory=dict)

Why Pydantic v2 in 2024+: it’s 5-50x faster than v1 and supports model_validate(state_dict) for restoring from checkpoints.

Graph usage (app/workflows/message_workflow.py)

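from langgraph.graph import StateGraph
from app.schemas.common import WorkflowState

graph = StateGraph(WorkflowState)


async def intent_node(state: WorkflowState) -> WorkflowState:
    # classifier is assumed to be created elsewhere in this module
    intent, confidence = classifier.classify(state.message)
    # Return a new state rather than mutating the one that was passed in
    return state.model_copy(update={
        "intent": intent,
        "confidence_scores": {**state.confidence_scores, intent: confidence},
    })


graph.add_node("intent_classifier", intent_node)

Each node takes a WorkflowState and returns the updated one. LangGraph validates state against the schema when it enters a node; per the LangGraph documentation, run-time validation applies to node inputs rather than outputs, so a wrongly shaped update surfaces as a ValidationError when the next node receives it.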

Checkpoint round-trip

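# Simplified illustration: when the graph is compiled with a checkpointer,
# LangGraph performs the save and the resume itself.

# save
await checkpointer.aput(config, state.model_dump(), {})

# resume
state_dict = await checkpointer.aget(config)
state = WorkflowState.model_validate(state_dict)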

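Pydantic’s model_dump() produces a plain dict; pass mode="json" when the result has to be JSON-serialisable, because the default mode keeps values such as datetime as Python objects. Checkpointed state is round-tripped through SQL.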
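The difference between the two dump modes, and a full round-trip through a JSON string, can be sketched as follows. DemoResponse is a hypothetical reduced version of AgentResponse, and json.dumps stands in for whatever encoder the storage layer uses.

```python
import json
from datetime import datetime, timezone

from pydantic import BaseModel, Field


class DemoResponse(BaseModel):
    content: str
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


original = DemoResponse(content="ok")

python_dump = original.model_dump()           # timestamp stays a datetime object
json_dump = original.model_dump(mode="json")  # timestamp becomes an ISO 8601 string

payload = json.dumps(json_dump)               # safe to store as text
restored = DemoResponse.model_validate(json.loads(payload))
```

Passing python_dump straight to json.dumps would fail on the datetime value, which is the practical reason to choose mode="json" for anything that ends up in a text or JSON column.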

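Why almost every field has a default

Only the inputs (conversation_id, customer_id, message) are required. Everything else has a default for two reasons: (a) a node or a test can construct the state from the inputs alone without supplying every field, and (b) partial state restoration is possible: a checkpoint can hold an old intent while a new message arrives on resume.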

This object is the runtime memory of the full nine-node graph. See Specialist Agent Deep Dives & LangGraph Flow for the per-node mutation examples.

Common Pitfalls

Mutating fields directly without returning the state — LangGraph tracks node-side-effects via the return value. If you mutate state.intent = "X" but return None, LangGraph thinks the node returned nothing.

Putting asyncio.Queue or other non-serialisable types into state.context — breaks the JSON encoder and the checkpoint fails to serialise. Use a string ID in state and look up the actual queue elsewhere.

Making the schema too tight (e.g., Optional[str] = Field(min_length=200)) breaks partial checkpoint restore, because older or partial snapshots no longer pass validation. Optional fields with defaults are the right shape.
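For the non-serialisable pitfall, one workable shape is a process-local registry keyed by a string ID. The sketch below is an assumption about how that could look: QUEUES and register_queue are hypothetical names, and the standard json module stands in for the checkpoint encoder.

```python
import asyncio
import json
import uuid
from typing import Dict

# Process-local registry: the queue lives here, only its ID goes into state
QUEUES: Dict[str, asyncio.Queue] = {}


def register_queue() -> str:
    """Create a queue, remember it under a fresh ID, and return the ID."""
    queue_id = str(uuid.uuid4())
    QUEUES[queue_id] = asyncio.Queue()
    return queue_id


# Storing the ID serialises without trouble
state_fragment = {"stream_queue_id": register_queue()}
serialised = json.dumps(state_fragment)

# Storing the queue itself does not
try:
    json.dumps({"stream_queue": asyncio.Queue()})
    queue_is_serialisable = True
except TypeError:
    queue_is_serialisable = False
```

On resume in a different process the ID will not be in the registry, so the lookup code has to handle a missing entry by creating a new queue or skipping the stream.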

Real-World Interview Prep

Q1: Why does LangGraph recommend Pydantic for WorkflowState?

A: Three reasons. (1) model_validate(state_dict) is how checkpoints become typed objects; (2) Pydantic raises ValidationError as soon as malformed state reaches a node, instead of letting it travel further down the graph; (3) the schema is the single source of truth for what flows between nodes.

Q2: Why store confidence_scores (a dict) instead of just confidence: float?

A: Multi-intent classifiers output scores for every label, not just the highest. The graph’s routing decisions can use the second-highest (loan_inquiry vs credit_card) to break ties when scores are close. A single float hides that information.
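As an illustration of why the full score dict is useful, the sketch below flags a classification as ambiguous when the top two scores are close. The helper names and the 0.1 margin are assumptions chosen for the example, not values from the project.

```python
from typing import Dict, List, Tuple


def top_two(scores: Dict[str, float]) -> List[Tuple[str, float]]:
    """Return up to two (label, score) pairs, highest score first."""
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:2]


def is_ambiguous(scores: Dict[str, float], margin: float = 0.1) -> bool:
    """True when the gap between the best and second-best score is below margin."""
    ranked = top_two(scores)
    if len(ranked) < 2:
        return False
    return (ranked[0][1] - ranked[1][1]) < margin


close_call = {"loan_inquiry": 0.46, "credit_card": 0.41, "complaint": 0.13}
clear_call = {"loan_inquiry": 0.90, "credit_card": 0.05}

best_label = top_two(close_call)[0][0]
```

With a single confidence float, the close_call case would look like a moderately confident loan_inquiry and the routing logic would have no way to notice the near tie.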

Q3: What if a node’s response needs to be a parsed JSON from the LLM that doesn’t fit AgentResponse?

A: Validate the LLM’s raw JSON string with AgentResponse.model_validate_json(raw_json), or with model_validate if it has already been parsed into a dict. If the LLM produces malformed JSON or the wrong fields, Pydantic raises ValidationError early; the agent wraps the call and returns a graceful error to the user. This is the right place to put “the LLM doesn’t always behave” defences.
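A minimal sketch of that defence is shown below. The AgentResponse here is a reduced copy of the schema above, and parse_llm_output, the fallback message, and the parse_error flag are hypothetical choices for the illustration.

```python
from typing import Any, Dict

from pydantic import BaseModel, Field, ValidationError


class AgentResponse(BaseModel):
    content: str
    confidence: float = Field(ge=0.0, le=1.0)
    metadata: Dict[str, Any] = Field(default_factory=dict)


def parse_llm_output(raw_json: str) -> AgentResponse:
    """Parse and validate raw LLM output, falling back to a safe response."""
    try:
        # Handles both invalid JSON and valid JSON with wrong or missing fields
        return AgentResponse.model_validate_json(raw_json)
    except ValidationError:
        return AgentResponse(
            content="Sorry, something went wrong. Please try again.",
            confidence=0.0,
            metadata={"parse_error": True},
        )


good = parse_llm_output('{"content": "Your card is active.", "confidence": 0.9}')
truncated = parse_llm_output('{"content": "Your card is')
out_of_range = parse_llm_output('{"content": "x", "confidence": 1.7}')
```

Setting confidence to 0.0 on the fallback lets downstream routing treat the failure like any other low-confidence answer, for example by escalating to a human.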
