
LangGraph Multi-Agent Streaming

What

A compiled LangGraph graph exposed through async def stream_message(), an async generator that emits one event per node transition, so the SSE endpoint can render the agent's reasoning path live.
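A minimal sketch of the generator side, assuming the compiled graph is streamed with LangGraph's astream(..., stream_mode="updates"), where each chunk maps the node that just finished to the update it returned. The Coordinator class, its graph attribute, the input keys, the thread_id config, and FakeGraph are hypothetical, and the raw-dict token path is omitted. With "updates", the state in each tuple is that node's partial update rather than the full accumulated state.

```python
import asyncio
from typing import Any, AsyncIterator


class Coordinator:
    """Hypothetical wrapper around a compiled LangGraph graph."""

    def __init__(self, graph: Any) -> None:
        # Any object exposing an async astream(...), e.g. StateGraph(...).compile()
        self.graph = graph

    async def stream_message(
        self, message: str, customer_id: str, conversation_id: str
    ) -> AsyncIterator[tuple[str, dict]]:
        # Assumed input keys and thread config; adapt to the real state schema.
        inputs = {"message": message, "customer_id": customer_id}
        config = {"configurable": {"thread_id": conversation_id}}
        # stream_mode="updates": each chunk is {node_name: node_update}.
        async for chunk in self.graph.astream(
            inputs, config=config, stream_mode="updates"
        ):
            for node_name, update in chunk.items():
                # One (node_name, state) tuple per node transition.
                yield node_name, update or {}


class FakeGraph:
    """Stand-in graph for local testing; yields two node updates."""

    async def astream(self, inputs, config=None, stream_mode="values"):
        yield {"intent_classifier": {"intent": "balance_query"}}
        yield {"account_agent": {"final_response": {"message": "ok"}}}


async def demo() -> list:
    coordinator = Coordinator(FakeGraph())
    return [e async for e in coordinator.stream_message("hi", "cust-1", "conv-1")]
```

Running asyncio.run(demo()) returns one tuple per node, in execution order, which is exactly the shape the consumer below checks with isinstance(event, tuple).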

Project Context

In full_project_context_updated.txt -> app/main.py -> /chat/stream, each LangGraph event arrives either as a (node_name, state) tuple for a node transition or as a raw dict for a token chunk. The SSE consumer turns each tuple into a {"type": "status", "step": node_name} notification, and projects either state["final_response"] or the human_agent/escalation branch into a {"type": "response"} event. Without this, every chat reply would arrive as a single block at the end of the request.

How

Node-by-node stream consumer

```python
# Nested inside the /chat/stream handler: coordinator, message, customer_id,
# conversation_id and the asyncio.Queue q all come from the enclosing scope.
async def run_graph():
    try:
        async for event in coordinator.stream_message(
            message, customer_id, conversation_id
        ):
            if isinstance(event, tuple):
                # Node transition: (node_name, state)
                node_name, state = event
                await q.put({
                    "type": "status",
                    "step": node_name,
                    "content": f"Processing in {node_name}...",
                })

                # Standard agent end node
                if "final_response" in state:
                    final_resp = state["final_response"]
                    await q.put({
                        "type": "response",
                        "content": final_resp.get("message", ""),
                        "metadata": final_resp.get("metadata", {}),
                        "conversation_id": conversation_id,
                    })

                # Human escalation branch
                elif (
                    node_name.lower() in ("human_agent", "human")
                    or state.get("agent_metadata", {}).get("escalated")
                ):
                    if "agent_response" in state:
                        await q.put({
                            "type": "response",
                            "content": state.get("agent_response"),
                            "metadata": {
                                "intent_confidence": state.get("confidence", 1.0),
                                "is_compliant": True,
                                "escalation_id": state.get("agent_metadata", {}).get("escalation_id"),
                            },
                            "conversation_id": conversation_id,
                        })
            else:
                # Raw dict (e.g. a token chunk): forward unchanged
                await q.put(event)
    except Exception as e:
        # Surface the failure to the UI instead of dropping the connection
        await q.put({"type": "log", "content": f"[ERROR] {e}"})
    finally:
        # Always send the terminator, even on the error path
        await q.put({"type": "done"})
```
  • The consumer is type-driven (isinstance(event, tuple)): node events and token chunks travel through the same iterator but have fundamentally different shapes.
  • Two alternative branches project to {"type": "response"}: the normal final_response end node and the human_agent escalation branch. Both reach the UI with the same event shape, so the frontend has a single render path.
  • Stream errors go onto the same queue as a {"type": "log"} item. Never raise out of run_graph, or the SSE connection drops with no signal to the UI.

Common Pitfalls

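Indexing state["final_response"] directly raises KeyError on any node that did not set it; without a surrounding try/except like the one in run_graph, that error kills the SSE connection mid-stream. Guard the read with a membership check or .get("final_response", {...}) so a missing key degrades to a payload the UI can render.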
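A small sketch of the .get-with-default projection as a pure function, which also makes it easy to unit-test. The function name project_final_response, the FALLBACK_RESPONSE payload and its "degraded" flag are hypothetical; using "or" also covers a final_response that is present but None or empty.

```python
from typing import Any

# Hypothetical fallback payload; the real UI contract may differ.
FALLBACK_RESPONSE = {"message": "", "metadata": {"degraded": True}}


def project_final_response(state: dict[str, Any], conversation_id: str) -> dict[str, Any]:
    """Map a node's state to a {"type": "response"} event without raising."""
    # Missing, None or empty final_response all fall back to a renderable payload.
    final_resp = state.get("final_response") or FALLBACK_RESPONSE
    return {
        "type": "response",
        "content": final_resp.get("message", ""),
        "metadata": final_resp.get("metadata", {}),
        "conversation_id": conversation_id,
    }
```

Called on a state without final_response, it returns an empty-content response event instead of raising KeyError.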

Blocking the event loop inside a node: a requests.post(...) or time.sleep(...) call inside a LangGraph node stalls the event loop that also runs the SSE consumer, so the stream freezes for the duration of the call. Wrap blocking calls with asyncio.to_thread or use async client libraries.
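A self-contained sketch of the asyncio.to_thread fix. fetch_customer_profile is a hypothetical stand-in for a blocking HTTP call, and the heartbeat task stands in for the SSE consumer: it can only tick while the event loop is free.

```python
import asyncio
import time


def fetch_customer_profile(customer_id: str) -> dict:
    """Hypothetical blocking call standing in for requests.post(...)."""
    time.sleep(0.2)  # blocks whichever thread runs it
    return {"customer_id": customer_id, "tier": "gold"}


async def account_node(state: dict) -> dict:
    # Offload the blocking call to a worker thread; the event loop stays free.
    profile = await asyncio.to_thread(fetch_customer_profile, state["customer_id"])
    return {"profile": profile}


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Stands in for the SSE consumer: it only progresses if the loop is free.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main() -> tuple:
    ticks: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    result = await account_node({"customer_id": "c-42"})
    stop.set()
    await hb
    return result, len(ticks)


if __name__ == "__main__":
    result, n_ticks = asyncio.run(main())
    print(result, n_ticks)  # heartbeat kept ticking while the call ran
```

Replacing the to_thread line with a direct fetch_customer_profile(...) call would let the heartbeat tick roughly once before the node returns, which is the freeze described above.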

Real-World Interview Prep

Q1: When should you call graph.astream_events vs the graph.stream API in LangGraph?

A: Use astream_events(version="v2") for token-level streaming: RAG, chat completions, anything where an LLM produces tokens mid-node. It emits a typed event stream with on_chat_model_stream (on_llm_stream for non-chat LLMs), on_tool_start, on_chain_end, etc. Use graph.stream / graph.astream(input, stream_mode="values"|"updates"|"debug") for node-by-node state transitions when you don't need intermediate tokens, only which agent ran and what state it produced. The FCA implementation here uses stream() for routing logs (the output of coordinator.stream_message), and the SSE consumer turns the resulting (node_name, state) tuples into UI events. Mixing both APIs on the same graph is supported, but their event shapes are incompatible downstream, so each consumer has to handle one shape or the other.
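A sketch of the token-level consumer, assuming the v2 event dicts carry "event", "data" and "metadata" keys, with chat-model chunks under data["chunk"] and the producing node under metadata["langgraph_node"]. The token_stream name and the demo events are hypothetical; in a real run the input would be graph.astream_events(inputs, version="v2") and each chunk an AIMessageChunk.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator


async def token_stream(
    events: AsyncIterator[dict[str, Any]],
) -> AsyncIterator[tuple[str, str]]:
    """Yield (node_name, token_text) from an astream_events(version="v2") stream."""
    async for event in events:
        # Only chat-model token chunks are of interest here.
        if event["event"] != "on_chat_model_stream":
            continue
        chunk = event["data"]["chunk"]
        node = event.get("metadata", {}).get("langgraph_node", "unknown")
        # Assumes string content; some providers emit a list of content blocks.
        if chunk.content:
            yield node, chunk.content


async def _demo_events() -> AsyncIterator[dict[str, Any]]:
    # Hypothetical events shaped like the v2 stream, for local testing only.
    yield {"event": "on_chain_start", "data": {}, "metadata": {}}
    for text in ("Hel", "lo"):
        yield {
            "event": "on_chat_model_stream",
            "data": {"chunk": SimpleNamespace(content=text)},
            "metadata": {"langgraph_node": "faq_agent"},
        }
    yield {"event": "on_chain_end", "data": {}, "metadata": {}}


async def demo() -> list:
    return [t async for t in token_stream(_demo_events())]
```

Because each token carries its node name, the UI can still attribute streamed text to an agent even though the event shape differs from the (node_name, state) tuples.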

Q2: How do you handle a mid-graph error without killing the SSE consumer?

A: Wrap the async for event in coordinator.stream_message(...) consumer in try/except Exception as e: await q.put({"type": "log", "content": f"[ERROR] {str(e)}"}) — the queue captures the error and the SSE generator emits it as a log event before sending [DONE]. The client renders the error inline instead of seeing a half-empty stream. Do NOT raise out of the loop; the SSE protocol has no graceful-close that would surface to the browser. Pair this with a finally: await q.put({"type": "done"}) to guarantee the client gets the terminator even on exception paths.
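A sketch of the SSE side of that contract, assuming the producer receives the queue as an argument (the original run_graph closes over q instead). sse_frames and failing_run_graph are hypothetical names; the frames follow the standard "data: ...\n\n" SSE format and end with the [DONE] terminator.

```python
import asyncio
import json
from typing import AsyncIterator, Awaitable, Callable


async def sse_frames(
    run_graph: Callable[[asyncio.Queue], Awaitable[None]],
) -> AsyncIterator[str]:
    """Drain queue items produced by run_graph into SSE data frames."""
    q: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(run_graph(q))
    try:
        while True:
            item = await q.get()
            if item["type"] == "done":
                yield "data: [DONE]\n\n"
                break
            yield f"data: {json.dumps(item)}\n\n"
    finally:
        # If the client disconnects early, stop the producer as well.
        producer.cancel()


async def failing_run_graph(q: asyncio.Queue) -> None:
    """Mirrors run_graph's error handling with a stream that fails mid-way."""
    try:
        await q.put({"type": "status", "step": "intent_classifier"})
        raise RuntimeError("tool timeout")
    except Exception as e:
        await q.put({"type": "log", "content": f"[ERROR] {e}"})
    finally:
        await q.put({"type": "done"})


async def demo() -> list:
    return [frame async for frame in sse_frames(failing_run_graph)]
```

The client sees the status frame, then the error as a log frame, then [DONE], rather than a connection that silently closes.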

Q3: What is the right place to add tracing when streaming LangGraph events?

A: Two layers. (1) Decorate each async def _node_X(self, state) with @observe(name="intent_classifier.classify") — Langfuse opens a span per node without any extra wiring. (2) Wrap the SSE consumer itself in a @observe(name="chat_stream.consumer") — this parent span covers the entire request and ties the per-node spans to the user/session. In the Langfuse UI you get a tree view where the streaming tokens and the routing decisions are visible together. Critically, never observe the SSE generator (yields bytes to the wire): it runs in a different task and the open-span tree becomes misleading; observe the upstream async-for-loop instead.
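To illustrate why observing the upstream loop nests node spans under the consumer span, here is a toy tracer, explicitly not Langfuse. It assumes, as @observe-style decorators typically do, that the parent span is found through Python's context propagation (contextvars). The observe decorator, SPANS list, node_intent, stream_message and run_graph below are all hypothetical stand-ins.

```python
import asyncio
import contextvars
import functools

# Toy tracer, NOT Langfuse: records (span_name, parent_span_name) pairs only.
_current_span = contextvars.ContextVar("current_span", default=None)
SPANS: list = []


def observe(name: str):
    """Hypothetical stand-in for an @observe-style decorator (async only)."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            # The parent is whatever span is active in the caller's context.
            SPANS.append((name, _current_span.get()))
            token = _current_span.set(name)
            try:
                return await fn(*args, **kwargs)
            finally:
                _current_span.reset(token)
        return wrapper
    return decorator


@observe(name="intent_classifier.classify")
async def node_intent(state: dict) -> dict:
    return {"intent": "balance_query"}


async def stream_message(message: str):
    # Hypothetical graph run: the node executes while the consumer iterates.
    update = await node_intent({"message": message})
    yield "intent_classifier", update


@observe(name="chat_stream.consumer")
async def run_graph(message: str, q: asyncio.Queue) -> None:
    async for node_name, _state in stream_message(message):
        await q.put({"type": "status", "step": node_name})
    await q.put({"type": "done"})


async def demo() -> list:
    SPANS.clear()
    await run_graph("hi", asyncio.Queue())
    return list(SPANS)
```

The node span records chat_stream.consumer as its parent because the async generator is stepped inside the consumer's context; a span opened in a separate task that only drains the queue would not be on that path.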
