LangGraph Multi-Agent Streaming
What
A compiled LangGraph graph streamed through a wrapper, async def stream_message(), that emits one event per node transition so the SSE endpoint can render the agent’s reasoning path live.
Project Context
In full_project_context_updated.txt -> app/main.py -> /chat/stream, each LangGraph event arrives as either a (node_name, state) tuple for a node transition or a raw dict for a token chunk. The SSE consumer turns each tuple into a {"type": "status", "step": node_name} notification, and projects either state["final_response"] or the human_agent/escalation branch to a {"type": "response"} event. Without this, every chat reply would arrive as a single block at the end.
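The source does not show stream_message() itself. As a minimal sketch, assuming it wraps a LangGraph stream in stream_mode="updates" (which yields one {node_name: update} dict per step), the dicts could be flattened into (node_name, state) tuples roughly like this. to_node_events, fake_updates and the billing_agent node are hypothetical names, and the fake stream stands in for the real graph.

```python
import asyncio
from typing import Any, AsyncIterator


async def to_node_events(
    updates: AsyncIterator[dict[str, Any]],
) -> AsyncIterator[tuple[str, Any]]:
    """Flatten {node_name: update} chunks into (node_name, state) tuples."""
    async for chunk in updates:
        for node_name, update in chunk.items():
            # A node may return nothing; normalise that case to an empty dict.
            yield node_name, (update or {})


async def fake_updates() -> AsyncIterator[dict[str, Any]]:
    # Stand-in for the real graph stream (an assumption, not the project's code).
    yield {"intent_classifier": {"confidence": 0.9}}
    yield {"billing_agent": {"final_response": {"message": "Done"}}}


async def collect() -> list[tuple[str, Any]]:
    return [event async for event in to_node_events(fake_updates())]


node_events = asyncio.run(collect())
```

Each tuple in node_events has the shape the consumer's isinstance(event, tuple) check expects.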
How
Node-by-node stream consumer
async def run_graph():
    try:
        async for event in coordinator.stream_message(
            message, customer_id, conversation_id
        ):
            if isinstance(event, tuple):
                node_name, state = event
                await q.put({
                    "type": "status",
                    "step": node_name,
                    "content": f"Processing in {node_name}...",
                })
                # Standard agent end node
                if "final_response" in state:
                    final_resp = state["final_response"]
                    await q.put({
                        "type": "response",
                        "content": final_resp.get("message", ""),
                        "metadata": final_resp.get("metadata", {}),
                        "conversation_id": conversation_id,
                    })
                # Human escalation branch
                elif (
                    node_name.lower() in ("human_agent", "human")
                    or state.get("agent_metadata", {}).get("escalated")
                ):
                    if "agent_response" in state:
                        await q.put({
                            "type": "response",
                            "content": state.get("agent_response"),
                            "metadata": {
                                "intent_confidence": state.get("confidence", 1.0),
                                "is_compliant": True,
                                "escalation_id": state.get("agent_metadata", {})
                                .get("escalation_id"),
                            },
                            "conversation_id": conversation_id,
                        })
            else:
                await q.put(event)
    except Exception as e:
        await q.put({"type": "log", "content": f"[ERROR] {str(e)}"})
    finally:
        await q.put({"type": "done"})

- The consumer is type-driven (isinstance(event, tuple)): node events and token chunks travel through the same iterator but have fundamentally different shapes.
- Two parallel branches project to {"type": "response"}: the normal final_response end node and the human_agent escalation branch. Both reach the UI in the same protocol, so the frontend has one render path.
- Stream errors go onto the same queue as a {"type": "log"} item. Never raise out of run_graph, or the SSE connection drops with no UI signal.
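The other end of the queue is not shown in these notes. A minimal sketch of how an SSE generator might drain it, assuming JSON payloads and a [DONE] sentinel frame on the wire; sse_frames and demo are hypothetical names, and the real endpoint may frame events differently.

```python
import asyncio
import json
from typing import AsyncIterator


async def sse_frames(q: asyncio.Queue) -> AsyncIterator[str]:
    """Drain the queue and yield SSE frames until the 'done' item arrives."""
    while True:
        item = await q.get()
        if isinstance(item, dict) and item.get("type") == "done":
            # Terminator frame: tells the client the stream is complete.
            yield "data: [DONE]\n\n"
            return
        # Every other item is serialised as one SSE data frame.
        yield f"data: {json.dumps(item)}\n\n"


async def demo() -> list[str]:
    q: asyncio.Queue = asyncio.Queue()
    await q.put({"type": "status", "step": "intent_classifier"})
    await q.put({"type": "done"})
    return [frame async for frame in sse_frames(q)]


frames = asyncio.run(demo())
```

Because run_graph always enqueues {"type": "done"} in its finally block, a generator like this is guaranteed to terminate.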
Common Pitfalls
Reading state["final_response"] without a guard or default raises KeyError mid-stream when the key is missing, which kills the SSE connection. Either check "final_response" in state first, or always use .get("final_response", {...}) so a missing key degrades to a payload the UI can render.
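A minimal sketch of the defensive read, assuming a fallback payload of our own choosing. FALLBACK_RESPONSE and project_response are hypothetical names; the project's actual default is not shown in these notes.

```python
from typing import Any

# Hypothetical fallback payload; the real default is not given in the source.
FALLBACK_RESPONSE: dict[str, Any] = {
    "message": "Sorry, something went wrong.",
    "metadata": {},
}


def project_response(state: dict[str, Any], conversation_id: str) -> dict[str, Any]:
    """Build a 'response' event without ever raising KeyError."""
    # .get() plus 'or' covers both a missing key and an explicit None value.
    final_resp = state.get("final_response") or FALLBACK_RESPONSE
    return {
        "type": "response",
        "content": final_resp.get("message", ""),
        "metadata": final_resp.get("metadata", {}),
        "conversation_id": conversation_id,
    }


ok = project_response({"final_response": {"message": "Hi"}}, "c-1")
degraded = project_response({}, "c-1")
```

The degraded event has the same shape as a normal one, so the frontend needs no special case for it.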
Blocking the event loop inside a node callback: any requests.post(...) or time.sleep(...) call inside a LangGraph node freezes the SSE consumer for the duration of the call. Wrap blocking external calls with asyncio.to_thread, or use async client libraries.
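A minimal sketch of the asyncio.to_thread fix (Python 3.9+). slow_lookup is a hypothetical blocking call standing in for requests.post(...), and lookup_node is an illustrative node, not one from the project.

```python
import asyncio
import threading
import time


def slow_lookup(customer_id: str) -> dict:
    """Hypothetical blocking call standing in for requests.post(...)."""
    time.sleep(0.05)
    return {"customer_id": customer_id, "thread": threading.get_ident()}


async def lookup_node(state: dict) -> dict:
    # Offload the blocking call to a worker thread so the event loop
    # stays free to serve the SSE consumer in the meantime.
    result = await asyncio.to_thread(slow_lookup, state["customer_id"])
    return {**state, "lookup": result}


async def main() -> tuple[dict, int]:
    new_state = await lookup_node({"customer_id": "c-1"})
    return new_state, threading.get_ident()


state_after, loop_thread = asyncio.run(main())
```

The thread id recorded inside slow_lookup differs from the event loop's thread id, which confirms the blocking work ran off the loop.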
Real-World Interview Prep
Q1: When should you call graph.astream_events vs the older graph.stream API in LangGraph?
A: Use astream_events(version="v2") for token-level streaming: RAG, chat completions, any LLM call that produces tokens mid-node. It emits a typed event stream with on_llm_stream, on_tool_start, on_chain_end, etc. Use the older graph.stream(input, stream_mode="values"|"updates"|"debug") for node-by-node state transitions where you don’t care about intermediate tokens, that is, when you only care about which agent ran and what its final state was. The FCA implementation here uses the older stream() for routing logs (the output of coordinator.stream_message), and an internal SSE consumer turns (node_name, state) tuples into UI events. Mixing both APIs in the same graph is supported, but the event shapes differ, so downstream consumers must handle each shape separately.
Q2: How do you handle a mid-graph error without killing the SSE consumer?
A: Wrap the async for event in coordinator.stream_message(...) consumer in try/except Exception as e: await q.put({"type": "log", "content": f"[ERROR] {str(e)}"}) — the queue captures the error and the SSE generator emits it as a log event before sending [DONE]. The client renders the error inline instead of seeing a half-empty stream. Do NOT raise out of the loop; the SSE protocol has no graceful-close that would surface to the browser. Pair this with a finally: await q.put({"type": "done"}) to guarantee the client gets the terminator even on exception paths.
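A reduced, runnable sketch of that error path, showing the order of items the queue receives when the stream fails partway. flaky_stream is a hypothetical stand-in for coordinator.stream_message(...), and the error message is invented for the demo.

```python
import asyncio
from typing import AsyncIterator


async def flaky_stream() -> AsyncIterator[tuple[str, dict]]:
    # Hypothetical stand-in for coordinator.stream_message(...).
    yield "intent_classifier", {}
    raise RuntimeError("upstream timeout")


async def run_graph(q: asyncio.Queue) -> None:
    try:
        async for node_name, _state in flaky_stream():
            await q.put({"type": "status", "step": node_name})
    except Exception as e:
        # Surface the failure as a normal queue item instead of raising.
        await q.put({"type": "log", "content": f"[ERROR] {e}"})
    finally:
        # Runs on both success and failure, so the client always gets a terminator.
        await q.put({"type": "done"})


async def demo() -> list[dict]:
    q: asyncio.Queue = asyncio.Queue()
    await run_graph(q)
    return [q.get_nowait() for _ in range(q.qsize())]


items = asyncio.run(demo())
```

The queue ends up holding a status item, then the log item, then the done terminator, so the client sees the error inline and still gets a clean end of stream.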
Q3: What is the right place to add tracing when streaming LangGraph events?
A: Two layers. (1) Decorate each async def _node_X(self, state) with @observe(name="intent_classifier.classify") — Langfuse opens a span per node without any extra wiring. (2) Wrap the SSE consumer itself in a @observe(name="chat_stream.consumer") — this parent span covers the entire request and ties the per-node spans to the user/session. In the Langfuse UI you get a tree view where the streaming tokens and the routing decisions are visible together. Critically, never observe the SSE generator (yields bytes to the wire): it runs in a different task and the open-span tree becomes misleading; observe the upstream async-for-loop instead.