Structured Logging with Custom JSONFormatter & Helper Functions
What? (Concept Overview)
Structured logging replaces the human-readable string emitted by the default Python logger with a JSON object containing stable, machine-parseable fields (timestamp, level, logger, module, function, line, plus arbitrary extra keys). The custom JSONFormatter in app/logger.py is a single-class implementation that handles every log call uniformly, and helper functions (log_request, log_agent_action, log_database_query) keep the schema consistent across teams without re-typing field names.
Project Context
The FCA Support Agent emits JSON-only logs to disk (rotating 10MB x 5 backups) for SIEM ingestion, and human-readable logs to stdout for docker logs. The SSELogHandler (a small adapter installed on the root logger) intercepts every log call and pushes a SSE-friendly line into a ContextVar-bound asyncio.Queue, which the SSE endpoint yields to the browser. This pattern unifies CLI log scraping, observability, and live-tailing without recompiling logs for each surface.
How? (Quick Reference Blocks)
3.1 The JSONFormatter Class
# app/logger.py
import json
from datetime import datetime, timezone
import logging
class JSONFormatter(logging.Formatter):
"""Format every log record as a JSON object with stable schema."""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.now(timezone.utc)
.isoformat().replace("+00:00", "Z"),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
if hasattr(record, "extra"):
log_data.update(record.extra)
return json.dumps(log_data)3.2 Setup with Rotating Handlers
# app/logger.py — setup_logging
def setup_logging(
log_level: str | None = None,
log_file: str | None = None,
log_format: str | None = None,
) -> None:
log_level = log_level or settings.log_level
log_file = log_file or settings.log_file
log_format = log_format or settings.log_format
text_formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
formatter = text_formatter if log_format == "text" else JSONFormatter()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
file_handler = logging.handlers.RotatingFileHandler(
filename=log_file,
maxBytes=10 * 1024 * 1024, # 10 MB
backupCount=5,
encoding="utf-8",
)
file_handler.setFormatter(JSONFormatter()) # always JSON on disk
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
root_logger.handlers.clear()
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)3.3 Helper Functions That Look Like API Methods
# app/logger.py — log_request / log_agent_action / log_database_query
def log_request(method, path, status_code, duration_ms, extra=None):
logger = get_logger("app.requests")
log_data = {"method": method, "path": path, "status_code": status_code,
"duration_ms": duration_ms, **(extra or {})}
if status_code >= 500: logger.error(f"{method} {path}", extra=log_data)
elif status_code >= 400: logger.warning(f"{method} {path}", extra=log_data)
else: logger.info(f"{method} {path}", extra=log_data)
def log_agent_action(agent_name, action, conversation_id, success, duration_ms, extra=None):
logger = get_logger(f"app.agents.{agent_name}")
log_data = {"agent": agent_name, "action": action,
"conversation_id": conversation_id, "success": success,
"duration_ms": duration_ms, **(extra or {})}
(logger.info if success else logger.error)(
f"Agent {agent_name} - {action}", extra=log_data,
)3.4 The SSELogHandler Bridge (Live-Tail Into Browser)
# app/main.py
stream_queue_var: ContextVar[asyncio.Queue] = ContextVar(
"stream_queue_var", default=None
)
class SSELogHandler(logging.Handler):
def emit(self, record):
q = stream_queue_var.get()
if q is None:
return
msg = f"[{record.name.split('.')[-1].upper()}] {record.getMessage()}"
try:
loop = asyncio.get_running_loop()
loop.call_soon_threadsafe(q.put_nowait,
{"type": "log", "content": msg})
except RuntimeError:
pass # no loop (during interpreter shutdown)
logging.getLogger().addHandler(SSELogHandler())Why? (Parameter Breakdown
- Overriding
format()notformatMessage()—format()is the canonical hook inlogging.Formatter. UsingformatMessage()skipsexc_infohandling and breaks tracebacks. datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")— Outputs RFC3339 with explicitZ, the format every modern SIEM (Datadog, Loki, ELK) parses without ambiguity. Plain Pythonisoformat()includes+00:00which trips up older log shippers.- Rotating
RotatingFileHandler(maxBytes=10MB, backupCount=5)— Bounds disk usage to ~60MB per worker in the worst case. Without rotation, a chatty endpoint can fill the disk in minutes and crash the pod. - Filtered third-party loggers (
uvicorn,sqlalchemy,httpx) — These libraries emit at DEBUG by default; in production their volume drowns out the application logs. Setting them toWARNINGcuts 90% of noise with one line. extra={...}kwarg to logger methods — The Python logging module treats unknown kwargs as user “extra” fields and exposes them asrecord.extra. OurJSONFormatterreadshasattr(record, "extra")and merges them into the output dict.- Status-code-driven log level —
log_requestchooses ERROR/WARN/INFO based on status range. Avoids writing log-level routing logic at every call site. ContextVarinstead of a global for SSE queue — Multiple concurrent SSE requests would otherwise share a single queue and corrupt events.ContextVaris per-task, so each request gets its own queue.
Top-to-Bottom Code Walkthrough (app/logger.py)
app/logger.py is organised in four layers: a custom JSON formatter class, a top-level setup_logging() function, three domain-specific helper functions, and a __main__ smoke test. Here is what every piece does.
The module imports logging, logging.config, logging.handlers, and sys — the standard-library tools for configuring Python’s logging. Path from pathlib is used to build the log file’s parent directory. Optional from typing makes helper signatures explicit about which arguments can be omitted. json and datetime/timezone power the JSON formatter. settings from app.config lets the helpers fall back to centralised values (log level, log file, log format) without re-reading .env themselves.
class JSONFormatter(logging.Formatter) defines a custom formatter that emits one JSON object per log line. The format(self, record) method is what Python’s logging system calls when it renders a LogRecord as a string. Inside, a dictionary log_data is built with seven fixed fields: timestamp (ISO-8601 UTC with Z suffix), level (e.g., INFO), logger (the dotted name like app.agents.human_agent), message (the rendered user message), module (file basename), function (function name), line (line number). These seven fields are minimum useful telemetry for any structured-logging backend. If record.exc_info is set (i.e., the log call used logger.exception(...)), log_data["exception"] = self.formatException(record.exc_info) adds a multi-line traceback string. If the caller passed extra={"key": "value"}, hasattr(record, "extra") is True and log_data.update(record.extra) merges the custom fields into the JSON. The function returns json.dumps(log_data) — a single line of valid JSON, easy for Datadog, Loki, or any JSON-ingest pipeline to parse event-by-event.
def setup_logging(log_level, log_file, log_format) is the application-wide configurator. Each parameter is Optional[str] and, if None, falls back to the corresponding settings.* value — so the most common call site is just setup_logging() with no args, reading everything from .env. Path(log_file).parent.mkdir(parents=True, exist_ok=True) is the idempotent directory-creation idiom: parents=True creates intermediate directories; exist_ok=True silently no-ops if it already exists. Without this, the first log write on a fresh machine crashes with FileNotFoundError.
The formatters block builds two: a text_formatter for human-readable console (2024-01-01 12:00:00 - app.database - INFO - Connected) and a json_formatter instance of JSONFormatter(). The ternary formatter = json_formatter if log_format == "json" else text_formatter picks based on the env-driven setting — production keeps JSON for log shipping, local dev keeps text for readability.
The handlers block attaches two sinks. console_handler = logging.StreamHandler(sys.stdout) writes to stdout, level=DEBUG so everything passes through (the root logger’s level filters downstream), formatted with the chosen formatter. file_handler = logging.handlers.RotatingFileHandler(filename=log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8") writes to disk with size-based rotation — when the file reaches 10 MB, it rotates to app.log.1, app.log.2, etc., up to 5 backups. Worst-case disk budget: 10 MB * 6 = 60 MB per worker regardless of run length. Note the file handler’s formatter is always JSON — even if LOG_FORMAT=text, the file is JSON because that’s what gets shipped to aggregators.
The root logger config block is where most multi-handler disasters are born. root_logger = logging.getLogger() gets the root, root_logger.setLevel(log_level) sets the global minimum, and root_logger.handlers.clear() is critical — without it, repeated setup_logging() calls duplicate every log line by however many handlers have piled up (uvicorn’s auto-installed handlers in some configs add another). root_logger.addHandler(console_handler) and addHandler(file_handler) then attach ours.
The third-party logger section is the noisemaker silencer. logging.getLogger("uvicorn").setLevel(logging.INFO) keeps uvicorn’s startup messages; uvicorn.access set to WARNING suppresses per-request access logs because the SSE pipeline already logs them via our own handler. sqlalchemy at WARNING suppresses query-builder noise. httpx at WARNING suppresses Langfuse/SSE HTTP chatter which would otherwise double-log outbound calls. langfuse at WARNING keeps Langfuse quiet so the @observe annotations own the trace narrative alone.
The last line in setup_logging() writes a self-test confirmation: logger.info("Logging configured", extra={"log_level": ..., "log_file": ..., "log_format": ...}) includes the resolved configuration values — every operator’s first instinct is to grep for this line and confirm the setup matches intent.
get_logger(name) is a trivial convenience over logging.getLogger(name). Exists so callers don’t import logging directly when they want a domain-named logger.
log_request(method, path, status_code, duration_ms, extra=None) writes a domain HTTP-request line. It pulls a domain-specific logger app.requests (so ops has a dedicated Grafana panel), merges extra into the structured payload, and the level-by-status ladder (>= 500 → error, >= 400 → warning, else info) auto-tags without callers deciding.
log_agent_action(agent_name, action, conversation_id, success, duration_ms, extra=None) is the same pattern for AI agents: per-agent logger (app.agents.human_agent), success/failure level split, structured conversation_id so traces correlate across the system.
log_database_query(operation, table, duration_ms, rows_affected=0, extra=None) is the DB-query variant. Always DEBUG because per-query noise is too verbose at INFO; flip the global level to DEBUG on a single container when investigating slow queries.
The __main__ block at the bottom is a smoke test: setup_logging(log_level="DEBUG", log_format="text") overrides the level + format to text, runs every severity, calls each helper with plausible inputs, and triggers a logger.exception(...) inside a try/except ValueError to verify the traceback-formatting path works.
Common Pitfalls
- Using
logger.info("...", extra={"message": "..."})— Python’s logging module disallows reserved attribute names;extrakeys cannot collide withLogRecordbuilt-ins (name,msg,args,levelname, …). Trying causes aKeyErrorat log time. Use prefixed keys likeextra={"agent": ..., "thread_id": ...}. - Forgetting that the console handler can run before the file handler is set up. If you forget
root_logger.handlers.clear(), the defaultlastResorthandler prints to stderr in a different format and your tests catch no logs.
Real-World Interview Prep
Q1: How do you correlate logs across services in a microservice topology?
A: Add a trace_id/span_id to every log record. Use OpenTelemetry’s trace.get_current_span().get_span_context() to read these values and pass them as extra= to every logger call, or use otel_instrumentation_logging injects them automatically. Pair with the W3C traceparent HTTP header on inbound requests so the trace ID flows across services. The JSONFormatter should always include trace_id (or None), so SIEM queries can group logs by trace.
Q2: Your CTO says “the logs are too noisy.” What’s your debugging ladder?
A: (1) Inspect log volume per logger (grep -c 'level":"ERROR"' app.log | head); identify the chattiest modules. (2) Bump third-party loggers to WARNING (sqlalchemy, httpx, uvicorn.access). (3) Add per-endpoint sampling for high-RPS endpoints (if random.random() > 0.1: return; logger.info(...) for /health pings). (4) Replace INFO request logs with structured access_log rotating into its own file with reduced retention. (5) For LLM calls, log ONLY the token count + latency, not the prompt (PHI/PII risk).
Q3: When would you prefer structlog over a hand-rolled JSONFormatter?
A: When you need (a) chain-style contextual loggers (log = log.bind(request_id=...); log.info("...")), (b) pre-built integrations for OpenTelemetry / Sentry / Datadog, (c) schema validation on log fields, or (d) WARNING-level type-checking on extra. A hand-rolled JSONFormatter is sufficient for the 80% case where you just need JSON output, full control over the schema, and no extra dependency surface.