FastAPI SSE Streaming with ContextVar + asyncio.Queue
What
Server-Sent Events streaming in FastAPI using an asyncio.Queue shared between an inner background coroutine and an outer StreamingResponse generator, with a ContextVar so global logging handlers can transparently route into the per-request stream.
Project Context
In full_project_context_updated.txt -> app/main.py this drives the /chat/stream endpoint. AgentCoordinator.stream_message() runs as a background task and pushes LangGraph node tuples onto a per-request queue. The response generator yields each item as data: {...} SSE events. The SSELogHandler reads the same ContextVar so any logger.info(...) call inside the agent notebook flows live to the UI without the agent knowing it is inside an HTTP request.
How
Background task + SSE generator sharing a queue
    import asyncio
    import json

    from fastapi.responses import StreamingResponse

    # app, coordinator, and stream_queue_var are module-level objects in app/main.py.

    @app.get("/chat/stream")
    async def chat_stream(message: str, customer_id: int, conversation_id: int = 0):
        async def event_generator():
            q = asyncio.Queue()               # per-request queue (unbounded)
            token = stream_queue_var.set(q)   # visible to SSELogHandler from here on

            async def run_graph():
                try:
                    async for event in coordinator.stream_message(
                        message, customer_id, conversation_id
                    ):
                        if isinstance(event, tuple):
                            node_name, state = event
                            await q.put({"type": "status", "step": node_name})
                            if "final_response" in state:
                                await q.put({
                                    "type": "response",
                                    "content": state["final_response"].get("message", ""),
                                    "metadata": state["final_response"].get("metadata", {}),
                                })
                        else:
                            await q.put(event)
                except Exception as e:
                    # Surface graph failures to the UI (see the walkthrough below).
                    await q.put({"type": "log", "content": f"[ERROR] {str(e)}"})
                finally:
                    await q.put({"type": "done"})

            task = asyncio.create_task(run_graph())
            try:
                while True:
                    item = await q.get()
                    if item.get("type") == "done":
                        yield "data: [DONE]\n\n"
                        break
                    yield f"data: {json.dumps(item)}\n\n"
            finally:
                stream_queue_var.reset(token)
                task.cancel()

        return StreamingResponse(event_generator(), media_type="text/event-stream")

- stream_queue_var is a ContextVar, not a plain module-level variable holding one queue, so concurrent streams never share queues; ContextVar carries the queue across every await inside the same task tree.
- asyncio.create_task(run_graph()) puts the LangGraph loop on the same event loop as the SSE generator, so they interleave without blocking.
- Yielding "data: ...\n\n" is the SSE wire format: every event MUST terminate with a blank line, otherwise the client keeps buffering and never dispatches the event.
- task.cancel() inside finally is critical: an un-cancelled background task keeps running after the client disconnects, still executing the graph and appending to a queue that nobody reads.
ContextVar + SSELogHandler
    import asyncio
    import logging
    from contextvars import ContextVar
    from typing import Optional

    stream_queue_var: ContextVar[Optional[asyncio.Queue]] = ContextVar(
        "stream_queue_var", default=None
    )

    class SSELogHandler(logging.Handler):
        def emit(self, record):
            q = stream_queue_var.get()
            if q is not None:
                msg = f"[{record.name.split('.')[-1].upper()}] {record.getMessage()}"
                try:
                    loop = asyncio.get_running_loop()
                    loop.call_soon_threadsafe(
                        q.put_nowait, {"type": "log", "content": msg}
                    )
                except RuntimeError:
                    pass  # no running loop on this thread: drop the line

    logging.getLogger().addHandler(SSELogHandler())

- Prefer ContextVar over threading.local: async handlers all share one thread but switch tasks across await. A ContextVar value follows the task tree; thread-local storage follows the OS thread.
- call_soon_threadsafe is the belt-and-braces form and is harmless in pure-async code. Note that, as written, it does not yet make worker-thread logging work: asyncio.get_running_loop() raises RuntimeError on a thread that is not running an event loop, so a log line emitted there is dropped by the except branch.
- A logging handler MUST NOT raise: catch RuntimeError at minimum so a buggy log call cannot bring down a request.
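To see the routing idea in isolation, here is a minimal stdlib-only sketch. It assumes nothing about app/main.py: demo_queue_var, QueueLogHandler, and run_demo are hypothetical names, and the handler calls put_nowait directly because in this demo it only ever runs on the event-loop thread.

```python
import asyncio
import logging
from contextvars import ContextVar
from typing import Optional

# Hypothetical stand-in for stream_queue_var in app/main.py.
demo_queue_var: ContextVar[Optional[asyncio.Queue]] = ContextVar(
    "demo_queue_var", default=None
)


class QueueLogHandler(logging.Handler):
    """Forward records to the queue bound in the current context, if any."""

    def emit(self, record: logging.LogRecord) -> None:
        q = demo_queue_var.get()
        if q is None:
            return  # not inside a stream: nothing to do
        prefix = record.name.split(".")[-1].upper()
        try:
            q.put_nowait({"type": "log", "content": f"[{prefix}] {record.getMessage()}"})
        except Exception:
            self.handleError(record)  # logging's standard hook for emit failures


async def run_demo() -> list:
    logger = logging.getLogger("demo.coordinator")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = QueueLogHandler()
    logger.addHandler(handler)
    try:
        logger.info("outside any stream")  # dropped: no queue bound yet
        q: asyncio.Queue = asyncio.Queue()
        token = demo_queue_var.set(q)
        try:
            logger.info("Started routing")  # lands in q
        finally:
            demo_queue_var.reset(token)
        items = []
        while not q.empty():
            items.append(q.get_nowait())
        return items
    finally:
        logger.removeHandler(handler)
```

Calling asyncio.run(run_demo()) returns only the record logged while a queue was bound; the caller of logger.info never references the queue.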
Top-to-Bottom Code Walkthrough (app/main.py — SSE bridge)
The SSE endpoint, the SSELogHandler, and the ContextVar plumbing all live in app/main.py. Together they solve one problem: pipe LangGraph node events AND any internal logger.info(...) call out of the agent notebook, into the browser, as live Server-Sent Events.
stream_queue_var: ContextVar[asyncio.Queue] = ContextVar("stream_queue_var", default=None) declares a context-bound slot for an asyncio.Queue. default=None means “if nothing has been set in the current context, get() returns None”; in that case the if q is not None: check in SSELogHandler.emit fails and the handler does nothing. Crucially, the value is not shared the way an ordinary module global is: each asyncio task runs in its own copy of the context it was created in, so a set() made in one request's task is invisible to other requests. Concurrent SSE requests therefore never share queues. In the handler, record.name.split('.')[-1].upper() builds a short bracketed prefix that is placed in front of record.getMessage(), giving the UI one tag per log line (e.g., [COORDINATOR] Started ingest).
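The isolation claim can be checked with a small sketch. The names request_queue_var, fake_request, and run_two_requests are hypothetical; two simulated requests run concurrently, each binds its own queue, and each verifies after an await that it still sees its own queue.

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

# Hypothetical variable for the demo; mirrors the shape of stream_queue_var.
request_queue_var: ContextVar[Optional[asyncio.Queue]] = ContextVar(
    "request_queue_var", default=None
)


async def fake_request(name: str) -> bool:
    """Bind a private queue, yield to the other task, then check it is still ours."""
    q: asyncio.Queue = asyncio.Queue()
    token = request_queue_var.set(q)
    try:
        await asyncio.sleep(0.01)  # the other "request" runs its own set() here
        request_queue_var.get().put_nowait(name)
        return request_queue_var.get() is q and q.get_nowait() == name
    finally:
        request_queue_var.reset(token)


async def run_two_requests() -> tuple:
    # gather() wraps each coroutine in its own task, each with its own context copy.
    results = await asyncio.gather(fake_request("a"), fake_request("b"))
    # The parent never called set(), so it still sees the default.
    return tuple(results), request_queue_var.get()
```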
class SSELogHandler(logging.Handler) is a custom logging.Handler whose only job is emit(self, record). Inside, q = stream_queue_var.get() retrieves the queue bound in the current context (or None for non-SSE contexts), and if q is not None: guards the emit path. The format f"[{record.name.split('.')[-1].upper()}] {record.getMessage()}" produces lines like [COORDINATOR] Started routing for easy grep. try: loop = asyncio.get_running_loop(); loop.call_soon_threadsafe(q.put_nowait, {"type": "log", "content": msg}) is the belt-and-braces form: get_running_loop() raises RuntimeError unless the calling thread is running an event loop, and call_soon_threadsafe schedules the put on that loop instead of touching the queue directly. One caveat: if emit is called from a worker thread (e.g., SQLAlchemy sync code emits logger.info(...) from a thread), get_running_loop() raises in that thread, so as written the line is dropped, not forwarded; forwarding it would require capturing the loop on the request side. except RuntimeError: pass swallows the no-loop case (worker thread, interpreter shutdown, sync test), because handlers MUST NOT raise. logging.getLogger().addHandler(SSELogHandler()) installs the handler on the root logger, so it sees every record that propagates to the root and passes the configured level.
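If worker-thread log lines should reach the stream, one possible variation (an assumption, not what app/main.py does) is to bind the event loop together with the queue, so the handler never has to ask the calling thread for a running loop. The names sink_var, ThreadAwareHandler, blocking_work, and run_thread_demo are hypothetical, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import logging
from contextvars import ContextVar
from typing import Optional, Tuple

# Hypothetical: store (loop, queue) so any thread can schedule onto the right loop.
sink_var: ContextVar[Optional[Tuple[asyncio.AbstractEventLoop, asyncio.Queue]]] = ContextVar(
    "sink_var", default=None
)


class ThreadAwareHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        sink = sink_var.get()
        if sink is None:
            return
        loop, q = sink
        try:
            # Safe from any thread: the put itself runs on the loop's own thread.
            loop.call_soon_threadsafe(
                q.put_nowait, {"type": "log", "content": record.getMessage()}
            )
        except RuntimeError:
            pass  # loop already closed: drop the line instead of raising


def blocking_work(logger: logging.Logger) -> None:
    logger.info("from worker thread")  # runs on a thread with no event loop


async def run_thread_demo() -> dict:
    logger = logging.getLogger("demo.threaded")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = ThreadAwareHandler()
    logger.addHandler(handler)
    q: asyncio.Queue = asyncio.Queue()
    token = sink_var.set((asyncio.get_running_loop(), q))
    try:
        # asyncio.to_thread copies the current context into the worker thread.
        await asyncio.to_thread(blocking_work, logger)
        return await asyncio.wait_for(q.get(), timeout=1.0)
    finally:
        sink_var.reset(token)
        logger.removeHandler(handler)
```

This only helps for threads that inherit the context (asyncio.to_thread does); a thread started by other means sees the default None and the line is still skipped.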
@app.get("/chat/stream") registers the endpoint. The signature message: str, customer_id: int, conversation_id: int = 0 takes message + customer ID as query params; conversation_id defaults to 0 (start a new conversation). The function returns a StreamingResponse whose body is the inner event_generator async generator; the media_type="text/event-stream" header is the SSE protocol signal.
Inside event_generator: q = asyncio.Queue() creates the per-request queue; token = stream_queue_var.set(q) binds it in the current context and returns a token for the later reset. From this line onward, every logger.info(...) in this task, and in any task created from it (create_task copies the current context), sees stream_queue_var.get() == q rather than None, and SSELogHandler.emit pushes into it.
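The inheritance behaviour that makes this work can be shown in a few lines. This is a sketch with hypothetical names (label_var, child, parent): a task created after set() sees the value, while a set() inside the child does not leak back to the parent.

```python
import asyncio
from contextvars import ContextVar

label_var: ContextVar[str] = ContextVar("label_var", default="unset")  # hypothetical


async def child() -> str:
    seen = label_var.get()       # inherited from the parent at create_task time
    label_var.set("child-only")  # changes only the child's copy of the context
    return seen


async def parent() -> tuple:
    label_var.set("request-1")
    seen_by_child = await asyncio.create_task(child())
    # The parent's value is untouched by the child's set().
    return seen_by_child, label_var.get()
```

This is why run_graph must be created after stream_queue_var.set(q): a task created earlier would have copied a context in which the queue was still unset.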
The inner async def run_graph() runs the LangGraph loop in its own task, separate from the response generator. async for event in coordinator.stream_message(message, customer_id, conversation_id): iterates the events the coordinator emits (tuples of (node_name, state) or pre-built dicts). if isinstance(event, tuple): branches to the tuple form: node_name, state = event unpacks; await q.put({"type": "status", "step": node_name}) pushes a UI status update; then the conditional checks for final_response in state (the standard end-of-graph response) or for human-agent-escalation indicators (that branch is omitted from the excerpt above). Both paths push a typed {"type": "response", ...} dict onto the queue. The else: await q.put(event) covers events that are already dicts (e.g., the pre-formatted log lines from the agent). except Exception as e: await q.put({"type": "log", "content": f"[ERROR] {str(e)}"}) catches any ordinary exception in the graph loop and surfaces it as a log event instead of re-raising. finally: await q.put({"type": "done"}) guarantees the terminator reaches the queue on success, failure, and cancellation alike.
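The tuple-versus-dict branching is easier to test when pulled out as a pure function. A minimal sketch, assuming only the event shapes described above; normalize_event is a hypothetical helper, not a function in app/main.py, and it leaves out the escalation branch.

```python
from typing import Any, Dict, List


def normalize_event(event: Any) -> List[Dict[str, Any]]:
    """Return the dicts that would be queued for one coordinator event."""
    if not isinstance(event, tuple):
        return [event]  # already a dict: pass through unchanged
    node_name, state = event
    out: List[Dict[str, Any]] = [{"type": "status", "step": node_name}]
    if "final_response" in state:
        final = state["final_response"]
        out.append({
            "type": "response",
            "content": final.get("message", ""),
            "metadata": final.get("metadata", {}),
        })
    return out
```

With a helper like this, run_graph reduces to looping over normalize_event(event) and awaiting q.put for each item.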
task = asyncio.create_task(run_graph()) schedules the background task on the same event loop as the SSE generator. The two interleave cooperatively, switching at each await; the queue is the only hand-off between them, with no thread and no callback wiring beyond that one create_task.
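A stripped-down version of the same producer/consumer shape, with hypothetical names (producer, consume) and made-up step labels, shows the hand-off through the queue and the sentinel that ends the loop.

```python
import asyncio

DONE = {"type": "done"}  # sentinel, same role as in the endpoint


async def producer(q: asyncio.Queue, trace: list) -> None:
    for step in ("classify", "retrieve", "respond"):  # illustrative step names
        trace.append(f"put {step}")
        await q.put({"type": "status", "step": step})
        await asyncio.sleep(0)  # give the consumer a turn
    await q.put(DONE)


async def consume(trace: list) -> list:
    q: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(producer(q, trace))
    steps = []
    while True:
        item = await q.get()  # suspends until the producer has put something
        if item["type"] == "done":
            break
        trace.append(f"got {item['step']}")
        steps.append(item["step"])
    await task
    return steps
```

Printing trace after a run shows how the put and got entries alternate on a single thread.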
The response generator’s main loop: while True: item = await q.get() suspends until the next item arrives; if item.get("type") == "done": yield "data: [DONE]\n\n"; break writes the end-of-stream sentinel (the literal data: [DONE], an application-level convention and not part of the SSE specification, followed by the mandatory blank line) and exits. Otherwise yield f"data: {json.dumps(item)}\n\n" writes the JSON payload with the data: prefix and the \n\n terminator. StreamingResponse sends each yielded chunk as it is produced, so the browser sees events in real time provided no intermediary buffers the response.
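The framing step can be isolated as a small helper. This is a sketch; format_sse and DONE_FRAME are hypothetical names. Because json.dumps escapes newlines inside strings, the payload always fits on a single data: line.

```python
import json
from typing import Any, Dict

DONE_FRAME = "data: [DONE]\n\n"  # application-level sentinel, as in the endpoint


def format_sse(item: Dict[str, Any]) -> str:
    """Encode one queue item as a single-line SSE data frame."""
    # The trailing blank line ("\n\n") is what tells the client to dispatch the event.
    return f"data: {json.dumps(item)}\n\n"
```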
finally: stream_queue_var.reset(token); task.cancel() is the cleanup path. reset(token) restores the ContextVar to the value it had before set(), so the queue does not stay bound in this context once the generator is finished. task.cancel() requests cancellation of the background task: CancelledError is raised inside run_graph at its current await. Because CancelledError derives from BaseException, the except Exception clause does not catch it; only the finally block runs, and the task ends in the cancelled state. Without task.cancel(), the background coroutine keeps producing events after the client disconnects. With the unbounded asyncio.Queue() used here nothing raises: the events pile up in memory and the graph keeps doing work for a client that is gone. (With a bounded queue the producer would instead block forever in await q.put(...).)
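The cleanup path can be exercised without a web server by closing an async generator early. A minimal sketch with hypothetical names (stream, client_disconnects_early); aclose() stands in for whatever the server does when the client goes away.

```python
import asyncio


async def stream(state: dict):
    """Async generator that owns a background task, like event_generator."""
    q: asyncio.Queue = asyncio.Queue()

    async def produce_forever() -> None:
        n = 0
        while True:
            await q.put(n)
            n += 1
            await asyncio.sleep(0.001)

    task = asyncio.create_task(produce_forever())
    state["task"] = task  # exposed only so the demo can inspect it
    try:
        while True:
            yield await q.get()
    finally:
        task.cancel()  # runs on normal exit, aclose(), or cancellation


async def client_disconnects_early() -> bool:
    state: dict = {}
    gen = stream(state)
    first = await gen.__anext__()
    second = await gen.__anext__()
    await gen.aclose()  # simulate the generator being closed mid-stream
    await asyncio.gather(state["task"], return_exceptions=True)
    return (first, second) == (0, 1) and state["task"].cancelled()
```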
The app.include_router(...) lines register the auth, messages, and admin routers. The endpoint /chat/stream is intentionally registered inline on app rather than via a router because it depends on the global coordinator and stream_queue_var — moving it to a router would require either exporting those globals or recreating them per-router, which is more surface than it’s worth.
Common Pitfalls
Missing task.cancel() keeps the background coroutine alive after the client disconnects. Nobody drains the queue any more, so with the unbounded asyncio.Queue() shown here the items accumulate in memory while the graph runs to completion for every abandoned stream. With a bounded queue the symptom changes: await q.put(...) blocks forever, and put_nowait raises QueueFull.
Yielding without the terminating blank line violates the SSE wire format (data: <payload>\n\n). The client keeps buffering until it sees a blank line, so the event is either never dispatched or is merged with the data lines of the next one; the stream appears to hang silently.
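The second pitfall is easy to reproduce with a toy decoder. This sketch handles only data: lines and "\n" line endings (a real client also handles event:, id:, retry:, comments, and other line endings); parse_sse is a hypothetical helper.

```python
from typing import List


def parse_sse(stream: str) -> List[str]:
    """Tiny SSE decoder: returns the data payload of each dispatched event."""
    events: List[str] = []
    data_lines: List[str] = []
    # Drop the last split element: it is an unterminated (incomplete) line.
    for line in stream.split("\n")[:-1]:
        if line == "":  # blank line: dispatch whatever has been buffered
            if data_lines:
                events.append("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[5:]
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            data_lines.append(value)
    return events  # anything still in data_lines was never dispatched
```

Two correctly terminated frames yield two events; drop the blank line between them and they collapse into one; drop the final blank line and nothing is dispatched at all.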
Real-World Interview Prep
Q1: Why should you prefer ContextVar over threading.local for cross-await state?
A: threading.local is tied to the OS thread. Async code runs many tasks on one thread and switches between them at await boundaries, so thread-local state is shared by all of them. ContextVar comes from the contextvars module, which asyncio supports natively: each task runs in a copy of the context it was created in, so a value set inside one request's task is visible to that task and to tasks it creates afterwards, and to nothing else. For SSE, that means the per-request queue flows through every await in the request handler without ever bleeding into a concurrent request on the same thread. Bonus: ContextVar.reset(token) restores the previous value cleanly inside finally, which threading.local cannot do without a manual wrapper.
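A side-by-side sketch makes the difference concrete. The names local, ctx_user, handler, and run_both are hypothetical; two handlers run concurrently on one thread and each stores its user in both mechanisms.

```python
import asyncio
import threading
from contextvars import ContextVar

local = threading.local()
ctx_user: ContextVar[str] = ContextVar("ctx_user", default="none")  # hypothetical


async def handler(user: str) -> tuple:
    local.user = user          # shared by every task on this thread
    ctx_user.set(user)         # private to this task's context
    await asyncio.sleep(0.01)  # the other handler runs here and overwrites local.user
    return local.user, ctx_user.get()


async def run_both() -> list:
    # "alice" starts first, then "bob"; both resume after the sleep.
    return await asyncio.gather(handler("alice"), handler("bob"))
```

After the await, the first handler finds the other handler's user in the thread-local slot, while its ContextVar still holds its own value.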
Q2: How do you gracefully terminate an SSE stream when the client disconnects mid-reply?
A: Three layers. (1) The event_generator is an async generator consumed by StreamingResponse; when the client disconnects, the server stops iterating it and the generator is cancelled or closed, so its finally block runs. Put task.cancel() there to kill the background coroutine. (2) Inside the background task, do cleanup in finally, or catch asyncio.CancelledError, clean up, and re-raise; do not swallow it. (3) On the server, hook per-connection cleanup into the same path (e.g., decrement an in-flight gauge for /metrics). Without all three the orphaned task stays alive, keeps producing events nobody reads, and holds memory and compute; many “stream hangs randomly” bugs in FastAPI are exactly this.
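Layer (2) is the one most often written incorrectly, so here is a sketch contrasting the two behaviours. worker and cancel_and_report are hypothetical names, and the one-hour sleep stands in for the long-running graph loop.

```python
import asyncio


async def worker(cleanup_log: list, swallow: bool) -> None:
    try:
        await asyncio.sleep(3600)  # stands in for the long-running graph loop
    except asyncio.CancelledError:
        cleanup_log.append("cleanup")
        if not swallow:
            raise  # re-raise so the task really ends up cancelled


async def cancel_and_report(swallow: bool) -> tuple:
    cleanup_log: list = []
    task = asyncio.create_task(worker(cleanup_log, swallow))
    await asyncio.sleep(0)  # let the worker reach its await
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled(), cleanup_log
```

In both cases the cleanup runs, but only the re-raising version reports cancelled() as True; the swallowing version looks like a normal return to anything that awaits the task.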
Q3: When would you choose SSE over WebSockets?
A: SSE fits server-to-client streaming over plain HTTP: chat transcripts, log tails, GenAI tokens. It is a one-way stream over a long-lived HTTP response, and the browser’s EventSource reconnects automatically when the connection drops. WebSockets fit bidirectional traffic. Choose SSE when (a) the client never talks back during the stream, (b) you want HTTP auth/semantics to apply directly, (c) you want to run through existing HTTP infrastructure (SSE is an ordinary HTTP response, so most CDNs and proxies pass it through as long as response buffering is off; WebSockets need Upgrade support along the whole path). Avoid SSE when you need sub-100ms latency in BOTH directions (multiplayer games, IDE collab): every client-to-server message has to travel as a separate HTTP request, which is too costly.