WorkflowState — Pydantic Schema for LangGraph Shared Memory
What? (Concept Overview)
WorkflowState is the typed memory that flows between every LangGraph node in the FCA support agent’s MessageWorkflow. Defining it as a Pydantic BaseModel (rather than a plain dict) gives typed attribute access that editors and type checkers can verify, validation when the model is constructed, room for non-Pydantic objects such as ORM instances via arbitrary_types_allowed=True, and a single source of truth for what state exists between guardrail → classify → agent dispatch → compliance → end.
Project Context
The state schema lives in app/schemas/common.py and is the contract between agent_coordinator.process_message, _node_classify, _node_account_agent, _node_product_recommender, _node_compliance, and the HITL _node_human_approval. The state is also persisted by AsyncPostgresSaver — every checkpoint is a WorkflowState.model_dump() snapshotted into Postgres.
How? (Quick Reference Blocks)
3.1 The Full Schema
# app/schemas/common.py
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, Dict, Any, List
from datetime import datetime

class WorkflowState(BaseModel):
    """The 'Memory' passed between LangGraph nodes."""
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # === INPUTS ===
    message: str
    customer_id: int
    conversation_id: int = 0
    history: List[Dict[str, str]] = Field(default_factory=list)
    context: Dict[str, Any] = Field(default_factory=dict, exclude=True)
    # === FLOW STATE ===
    intent: Optional[str] = None
    intent_confidence: float = 0.0
    # === AGENT OUTPUTS ===
    agent_type: Optional[str] = None
    agent_response: Optional[str] = None
    agent_metadata: Dict[str, Any] = Field(default_factory=dict)
    confidence: float = 0.0
    # === COMPLIANCE ===
    is_compliant: bool = True
    compliance_check: Optional[str] = None
    required_disclaimers: List[str] = Field(default_factory=list)
    # === FINAL ===
    final_response: Optional[Dict[str, Any]] = None

class AgentResponse(BaseModel):
    """Standard output for every agent."""
    content: str
    confidence: float
    metadata: Dict[str, Any] = Field(default_factory=dict)
    agent_name: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
3.2 Initialising State at Graph Start
# app/workflows/message_workflow.py — typical start
initial_state = WorkflowState(
    message=user_message,
    customer_id=customer_id,
    conversation_id=conversation_id,
    history=fetch_recent_turns(conversation_id, limit=10),
)
# Pass into the StateGraph with the thread_id config
config = {"configurable": {"thread_id": str(conversation_id)}}
final_state = await workflow.ainvoke(initial_state, config=config)
3.3 Mutating State Inside a Node
# app/workflows/message_workflow.py — _node_classify
async def _node_classify(self, state: WorkflowState) -> WorkflowState:
    classifier = IntentClassifierAgent()
    result = await self.execute_with_retry(
        classifier.classify, state.message, state.history
    )
    # LangGraph-style node: returns NEW state, not a mutation
    return state.model_copy(update={
        "intent": result["intent"],
        "intent_confidence": result["confidence"],
    })
3.4 Adding a Compliance Result to State
# app/workflows/message_workflow.py — _node_compliance
async def _node_compliance(self, state: WorkflowState) -> WorkflowState:
    checker = ComplianceCheckerAgent()
    outcome = await checker.process(
        {"content": state.agent_response or ""},
        context={"product_type": state.context.get("product_type", "")},
    )
    # Read the flag once with a default so a missing key cannot raise KeyError
    is_compliant = outcome.metadata.get("is_compliant", True)
    return state.model_copy(update={
        "is_compliant": is_compliant,
        "compliance_check": None if is_compliant else outcome.content,
        "required_disclaimers": outcome.metadata.get("required_disclaimers", []),
        "confidence": outcome.confidence,
    })
Why? (Parameter Breakdown)
- ConfigDict(arbitrary_types_allowed=True): Lets a field be annotated with a non-Pydantic class (e.g., a SQLAlchemy ORM model) when you need cross-layer access. Without it, Pydantic raises a schema-generation error as soon as the class is defined; with it, such fields are checked with a plain isinstance test. Values stored inside Dict[str, Any] are accepted either way.
- exclude=True on context: context carries transient data that can be large (ORM objects, raw queries). Excluding it from model_dump() keeps the serialised snapshot compact and avoids trying to rebuild ORM objects from a stored row on resume.
- Field(default_factory=list) and Field(default_factory=dict): Factory-built defaults give every instance its own list or dict. Pydantic copies a mutable default such as default=[] for each instance, so the shared-list bug known from plain Python classes and function defaults does not occur here, but default_factory states the intent explicitly and stays correct if the schema is ever ported to a dataclass or plain class.
- model_copy(update={...}) instead of state.X = Y: A Pydantic v2 BaseModel is mutable by default (set frozen=True in the config to forbid assignment). model_copy(update=...) returns a new model with the updates applied and leaves the original's fields unchanged. The copy is shallow unless deep=True, and update values are not validated. This matters for checkpoint consistency: if a node mutates state in place, anything still holding the previous state object sees the new value, and a retried node can apply the same mutation twice.
- conversation_id: int = 0 default: 0 is a sentinel for “new conversation”; the API route later rewrites it. Without a default, every call site has to pass it explicitly, which is error-prone.
- history: List[Dict[str, str]]: A list of str→str dicts serialises cleanly through the checkpointer’s codec and round-trips across common formats (JSON via orjson, msgpack), so keep the shape simple.
- final_response: Optional[Dict[str, Any]]: Holds the rendered API response (message + metadata + escalation_id). None signals “more work to do”; a dict signals “stream to UI”.
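The behaviours described above can be checked in isolation. The following is a minimal sketch, assuming Pydantic v2; DemoState is a hypothetical cut-down stand-in for WorkflowState, and the intent label is made up for illustration.

```python
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field


class DemoState(BaseModel):
    """Cut-down stand-in for WorkflowState (hypothetical, illustration only)."""
    model_config = ConfigDict(arbitrary_types_allowed=True)
    message: str
    history: List[Dict[str, str]] = Field(default_factory=list)
    context: Dict[str, Any] = Field(default_factory=dict, exclude=True)
    intent: Optional[str] = None


a = DemoState(message="hello", context={"orm_row": object()})
b = DemoState(message="hi")

# default_factory: each instance gets its own list
a.history.append({"role": "user", "content": "hello"})
independent_defaults = (b.history == [])

# exclude=True: context never appears in the dumped snapshot
dumped = a.model_dump()
context_excluded = "context" not in dumped

# model_copy(update=...): a new object is returned, the original field is untouched
updated = a.model_copy(update={"intent": "balance_inquiry"})
original_unchanged = a.intent is None and updated.intent == "balance_inquiry"
```

All three flags evaluate to True, which matches the reasoning in the breakdown.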
Common Pitfalls
- Mutating state.history.append(...) directly. A Pydantic v2 BaseModel permits in-place mutation by default, but model_copy(update=...) is the contract between nodes. An in-place append also changes every shallow copy that shares the same list, so the state a node received no longer matches what was checkpointed. Build a new list and pass it through model_copy instead.
- Annotating a field with a custom non-Pydantic type without arbitrary_types_allowed=True. Pydantic rejects the class when it is defined. Set the flag once at the class level rather than chasing the error each time a new custom type is added.
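The first pitfall is easy to reproduce. This sketch, assuming Pydantic v2, contrasts a safe append helper with an in-place append on a shallow copy; HistoryState and append_turn are hypothetical names, not part of the project.

```python
from typing import Dict, List

from pydantic import BaseModel, Field


class HistoryState(BaseModel):
    """Hypothetical two-field stand-in for WorkflowState."""
    message: str
    history: List[Dict[str, str]] = Field(default_factory=list)


def append_turn(state: HistoryState, role: str, content: str) -> HistoryState:
    """Return a new state whose history is a fresh list with one extra turn."""
    new_history = [*state.history, {"role": role, "content": content}]
    return state.model_copy(update={"history": new_history})


before = HistoryState(message="hi", history=[{"role": "user", "content": "hi"}])

# Safe: the original list is never touched
after = append_turn(before, "assistant", "hello")
safe_lengths = (len(before.history), len(after.history))

# Unsafe: model_copy() is shallow, so both models share one list object
leaky = before.model_copy()
leaky.history.append({"role": "assistant", "content": "oops"})
leaked_length = len(before.history)
```

After the unsafe append, before.history has grown even though before was never assigned to, which is the kind of drift the pitfall warns about.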
Real-World Interview Prep
Q1: Why use Pydantic instead of just TypedDict for the state schema?
A: Three reasons. (1) Validation on entry: a TypedDict cannot refuse message=123 at runtime, whereas WorkflowState(message=123) raises a ValidationError that points at the offending field. (2) Default factories: TypedDict forces every reader to write if "history" not in state: state["history"] = []; Pydantic builds the default once at construction. (3) Runtime inspection: state.model_dump() gives a plain dict, and WorkflowState.model_json_schema() produces a JSON Schema that can be published in API docs. In LangGraph the cost of Pydantic serialisation is small, because the checkpointer already works with dict-compatible shapes.
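To make the first two points concrete, here is a small comparison, assuming Pydantic v2 in its default (lax) mode. DictState and ModelState are hypothetical reduced schemas used only for this illustration.

```python
from typing import Dict, List, TypedDict

from pydantic import BaseModel, Field, ValidationError


class DictState(TypedDict, total=False):
    message: str
    history: List[Dict[str, str]]


class ModelState(BaseModel):
    message: str
    history: List[Dict[str, str]] = Field(default_factory=list)


# TypedDict is only a static hint: the wrong type is accepted at runtime
loose: DictState = {"message": 123}  # a type checker complains, Python does not
typed_dict_accepted = loose["message"] == 123
typed_dict_has_history = "history" in loose

# Pydantic validates on construction and reports which field failed
try:
    ModelState(message=123)
    pydantic_rejected = False
    bad_field = None
except ValidationError as exc:
    pydantic_rejected = True
    bad_field = exc.errors()[0]["loc"]

# Defaults are filled in once, at construction
ok = ModelState(message="hello")
```

The TypedDict version silently carries the integer and has no history key, while the Pydantic version rejects the input and supplies an empty history for valid input.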
Q2: How do you evolve the WorkflowState schema without breaking older checkpoints?
A: Two strategies. (1) Field additions with defaults: add new fields as Optional[...] = None. Older checkpoints load with state.new_field set to None, which allows a gradual rollout. (2) Versioning: add schema_version: int = 1 and a model_validator(mode="before") that maps old shapes (e.g., state["agent_response"] → state["final_response"]). Bump the version on every breaking change and write a migration script. Strategy (1) is preferred in the vast majority of cases, because a defaulted field never invalidates an older snapshot.
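Both strategies can live in one model. The sketch below assumes Pydantic v2; VersionedState, the version numbers, and the sample checkpoint are hypothetical, and the rename follows the agent_response → final_response example from the answer.

```python
from typing import Any, Optional

from pydantic import BaseModel, model_validator


class VersionedState(BaseModel):
    """Hypothetical v2 schema; v1 stored the reply under 'agent_response'."""
    schema_version: int = 2
    message: str
    final_response: Optional[str] = None
    new_field: Optional[str] = None  # added later; old snapshots load it as None

    @model_validator(mode="before")
    @classmethod
    def upgrade_old_shapes(cls, data: Any) -> Any:
        # Only dict input (e.g. a restored checkpoint) needs migrating
        if isinstance(data, dict) and data.get("schema_version", 1) < 2:
            data = dict(data)  # work on a copy, leave the caller's dict alone
            if "agent_response" in data:
                data["final_response"] = data.pop("agent_response")
            data["schema_version"] = 2
        return data


old_checkpoint = {
    "schema_version": 1,
    "message": "hi",
    "agent_response": "Your balance is 10",
}
restored = VersionedState.model_validate(old_checkpoint)
```

The old snapshot loads with final_response populated, new_field set to None, and the stored dict left unmodified.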
Q3: How would you thread context across ainvoke resumptions?
A: state.context is declared with exclude=True, so it is left out of model_dump() and should not be relied on to survive a resume. To thread arbitrary context (customer_profile, last_transaction, support_tier), pass a key through the LangGraph config, e.g. configurable={"thread_id": ..., "context_key": ...}. The checkpointer doesn’t snapshot config, so cross-resumption requires an explicit “load on resume” pattern: a node reads config["configurable"]["context_key"] and re-fetches from Redis or the DB. Avoid stuffing big ORM objects into state.context; store the id and re-fetch on demand.
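The “load on resume” pattern can be sketched without any framework code. Everything here is an assumption for illustration: CONTEXT_STORE stands in for Redis or the DB, the key format is invented, and plain dicts stand in for the state and the LangGraph config.

```python
from typing import Any, Dict

# Hypothetical stand-in for Redis or the database, keyed by context_key
CONTEXT_STORE: Dict[str, Dict[str, Any]] = {
    "ctx:42": {"support_tier": "gold", "customer_profile_id": 42},
}


def load_context(config: Dict[str, Any]) -> Dict[str, Any]:
    """Re-fetch transient context using the key carried in config['configurable']."""
    key = config.get("configurable", {}).get("context_key")
    if key is None:
        return {}
    # Copy so callers cannot mutate the stored record
    return dict(CONTEXT_STORE.get(key, {}))


def node_load_context(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Node-shaped function: returns only the keys it wants to update."""
    return {"context": load_context(config)}


config = {"configurable": {"thread_id": "1001", "context_key": "ctx:42"}}
update = node_load_context({"message": "hi"}, config)
```

Only the small key travels with the invocation; the bulky record is fetched again each time the graph resumes.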
Top-to-Bottom Code Walkthrough (app/schemas/common.py + app/workflows/message_workflow.py)
WorkflowState is the shared memory of every LangGraph node. It must be a Pydantic model so (a) every node is type-safe, (b) checkpoint serialisation is automatic, (c) the schema is self-documenting.
app/schemas/common.py
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, Any

class AgentResponse(BaseModel):
    content: str
    confidence: float = Field(ge=0.0, le=1.0)
    metadata: dict = Field(default_factory=dict)
    timestamp: datetime = Field(default_factory=lambda: datetime.utcnow())

class ComplianceResult(BaseModel):
    is_compliant: bool
    reason: Optional[str] = None
    redacted: bool = False

class WorkflowState(BaseModel):
    # Inputs
    conversation_id: int
    customer_id: int
    message: str
    history: list[dict] = Field(default_factory=list)
    context: dict = Field(default_factory=dict)
    # Routing
    intent: Optional[str] = None
    confidence_scores: dict[str, float] = Field(default_factory=dict)
    # Agent outputs
    agent_outputs: dict[str, AgentResponse] = Field(default_factory=dict)
    selected_agent: Optional[str] = None
    selected_response: Optional[AgentResponse] = None
    # Compliance
    compliance_check: Optional[ComplianceResult] = None
    # Outputs
    final_response: Optional[AgentResponse] = None
    error: Optional[str] = None
    escalation_required: bool = False
    escalation_metadata: dict = Field(default_factory=dict)
Why Pydantic v2 in 2024+: it is reported to be 5-50x faster than v1 and supports model_validate(state_dict) for restoring from checkpoints.
Graph usage (app/workflows/message_workflow.py)
from langgraph.graph import StateGraph
from app.schemas.common import WorkflowState
graph = StateGraph(WorkflowState)
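async def intent_node(state: WorkflowState) -> WorkflowState:
    intent, confidence = classifier.classify(state.message)
    # Return an updated copy rather than mutating the incoming state
    return state.model_copy(update={
        "intent": intent,
        "confidence_scores": {**state.confidence_scores, intent: confidence},
    })
graph.add_node("intent_classifier", intent_node)
Each node returns an updated WorkflowState. Pydantic validates the state when it is constructed from input; LangGraph’s documentation lists run-time validation as applying to inputs rather than to node return values, so a node should build its result from validated data instead of relying on the framework to catch a wrong shape.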
Checkpoint round-trip
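# save: LangGraph writes a checkpoint after each step once the graph is compiled with a checkpointer
app = graph.compile(checkpointer=checkpointer)
await app.ainvoke(initial_state, config=config)
# resume: read the latest snapshot for this thread and rebuild the typed model
snapshot = await app.aget_state(config)
state = WorkflowState.model_validate(snapshot.values)
Pydantic’s model_dump(mode="json") produces a JSON-serialisable dict (plain model_dump() keeps Python objects such as datetime). Checkpointed state is round-tripped through Postgres.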
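Independently of any checkpointer, the dump-and-validate round trip itself can be exercised with plain JSON. This is a storage-agnostic sketch assuming Pydantic v2; Reply and MiniState are hypothetical reduced versions of AgentResponse and WorkflowState.

```python
import json
from datetime import datetime, timezone
from typing import Optional

from pydantic import BaseModel, Field


class Reply(BaseModel):
    content: str
    confidence: float = Field(ge=0.0, le=1.0)
    timestamp: datetime


class MiniState(BaseModel):
    """Hypothetical reduced WorkflowState for the round-trip demo."""
    conversation_id: int
    message: str
    final_response: Optional[Reply] = None


state = MiniState(
    conversation_id=7,
    message="hi",
    final_response=Reply(
        content="hello",
        confidence=0.9,
        timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
    ),
)

# mode="json" converts datetime and nested models into JSON-safe values
payload = json.dumps(state.model_dump(mode="json"))

# Restoring validates the stored dict and rebuilds the nested models
restored = MiniState.model_validate(json.loads(payload))
```

The restored object compares equal to the original, including the nested Reply and its timezone-aware timestamp.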
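Why nearly every field has a default
Every field apart from the three required inputs (conversation_id, customer_id, message) has a default, for two reasons: (a) a state can be constructed from just those inputs without supplying every other field as a keyword argument, and (b) partial state restoration is possible, so a checkpoint can hold an old intent while a new message arrives on resume.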
This object is the runtime memory of the full nine-node graph. See Specialist Agent Deep Dives & LangGraph Flow for the per-node mutation examples.
Common Pitfalls
- Mutating fields directly without returning the state. LangGraph picks up a node’s changes from its return value. If you set state.intent = "X" but return None, LangGraph treats the node as having produced no update.
- Putting asyncio.Queue or other non-serialisable types into state.context. This breaks the JSON encoder and the checkpoint fails to serialise. Keep a string ID in state and look up the actual queue elsewhere.
- Making the schema too tight (e.g., Optional[str] = Field(min_length=200)), which breaks partial checkpoint restore. Defaults plus non-overlapping optional fields are the right shape.
Real-World Interview Prep
Q1: Why does LangGraph recommend Pydantic for WorkflowState?
A: Three reasons. (1) model_validate(state_dict) is how checkpoints become typed objects; (2) Pydantic raises ValidationError as soon as malformed data is validated against the schema, for example at graph input or when a checkpoint is restored; (3) the schema is the single source of truth for what flows between nodes.
Q2: Why store confidence_scores (a dict) instead of just confidence: float?
A: Multi-intent classifiers output scores for every label, not just the highest. The graph’s routing decisions can use the second-highest (loan_inquiry vs credit_card) to break ties when scores are close. A single float hides that information.
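A tie-break of this kind might look like the sketch below. The pick_intent helper and the 0.1 margin are hypothetical choices for illustration, not values taken from the project.

```python
from typing import Dict, Optional, Tuple


def pick_intent(scores: Dict[str, float], margin: float = 0.1) -> Tuple[Optional[str], bool]:
    """Return (top_intent, is_ambiguous) from a label -> score mapping."""
    if not scores:
        return None, True
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    top_label, top_score = ranked[0]
    if len(ranked) == 1:
        return top_label, False
    runner_up_score = ranked[1][1]
    # Ambiguous when the runner-up is within `margin` of the winner
    return top_label, (top_score - runner_up_score) < margin


clear = pick_intent({"loan_inquiry": 0.82, "credit_card": 0.11})
close = pick_intent({"loan_inquiry": 0.48, "credit_card": 0.45})
```

With a single confidence float, the second case would look identical to any other 0.48 result and the near-tie would go unnoticed.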
Q3: What if a node’s response needs to be a parsed JSON from the LLM that doesn’t fit AgentResponse?
A: Validate the LLM’s raw JSON string with AgentResponse.model_validate_json(raw_json) (or model_validate for an already-parsed dict). If the LLM produces malformed JSON or a payload that doesn’t match the schema, Pydantic raises ValidationError early; the agent wraps the call and returns a graceful error to the user. This is the right place to put “the LLM doesn’t always behave” defences.
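A defensive wrapper along these lines could look as follows, assuming Pydantic v2. LLMReply is a hypothetical stand-in for AgentResponse, and the fallback message and parse_llm_reply helper are invented for the example.

```python
from typing import Any, Dict, Optional, Tuple

from pydantic import BaseModel, Field, ValidationError


class LLMReply(BaseModel):
    """Hypothetical stand-in for AgentResponse."""
    content: str
    confidence: float = Field(ge=0.0, le=1.0)
    metadata: Dict[str, Any] = Field(default_factory=dict)


FALLBACK = "Sorry, I couldn't process that. Please try again."


def parse_llm_reply(raw_json: str) -> Tuple[Optional[LLMReply], str]:
    """Return (parsed reply or None, text that is safe to show the user)."""
    try:
        reply = LLMReply.model_validate_json(raw_json)
    except ValidationError:
        # Covers both invalid JSON and JSON that violates the schema
        return None, FALLBACK
    return reply, reply.content


good = parse_llm_reply('{"content": "Your balance is 10", "confidence": 0.9}')
truncated = parse_llm_reply('{"content": "Your bal')
out_of_range = parse_llm_reply('{"content": "hi", "confidence": 1.7}')
```

Both the truncated payload and the out-of-range confidence fall through to the same fallback path, so the caller has a single failure case to handle.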