Celery Async Worker Bridge

What

The repeatable pattern for running asyncio coroutines inside a synchronous Celery task: detect or replace the worker event loop, then loop.run_until_complete(coro()). The worker is sync but the underlying service is an async SQLAlchemy/RAG tool that has no sync API.

Project Context

In full_project_context_updated.txt -> app/worker.py -> ingest_pdf_task, a Celery worker receives a file_path string, instantiates RAGService() (which is async-only), and calls await service.ingest_pdf(file_path). The wrapper function ingest_pdf_task is sync (Celery’s contract) so the inner async work is bridged via the existing or freshly-created event loop. The same bridge is reused for any future async-only task (send_email_task, process_image_task, etc.).

How

Sync Celery task wrapping async service code

    import asyncio
    import os

    from celery import Celery

    from app.services.rag_service import RAGService

    celery_app = Celery(
        "fca_worker",
        broker=os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0"),
        backend=os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
    )


    @celery_app.task(name="ingest_pdf_task")
    def ingest_pdf_task(file_path: str):
        async def _run_ingest():
            # RAGService is async-only, so it is built and awaited inside a coroutine.
            service = RAGService()
            return await service.ingest_pdf(file_path)

        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                # Replace a dead loop with a fresh one for this thread.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            chunks_count = loop.run_until_complete(_run_ingest())
            return {"status": "success", "chunks": chunks_count}
        except Exception as e:
            # Report failures as data instead of raising.
            return {"status": "error", "message": str(e)}

  • asyncio.get_event_loop() returns the current thread’s loop when one is set. When none is set, behaviour depends on the Python version: older releases create a loop in the main thread, Python 3.12+ also emits a DeprecationWarning, and Python 3.14 raises RuntimeError. The new_event_loop() + set_event_loop(...) fallback shown here covers the closed-loop case; the no-loop case needs an additional except RuntimeError guard.
  • RAGService is instantiated inside the inner coroutine (_run_ingest), so construction happens per task call rather than when the worker imports the module. A constructor failure therefore fails one task, not worker start-up.
  • The task returns a structured dict rather than a bare value or a raised exception, so the web layer can call result.ready() / result.get() and branch on status without parsing tracebacks.
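
Since the same bridge is meant to be reused by other tasks, it can help to factor the loop handling into a helper. The following is a minimal sketch using only the standard library; get_usable_loop and run_coro_sync are hypothetical names that do not appear in app/worker.py, and the extra except RuntimeError branch is an addition that covers threads or Python versions where no loop is set.

```python
import asyncio
import warnings
from typing import Any, Awaitable, Callable


def get_usable_loop() -> asyncio.AbstractEventLoop:
    """Return this thread's event loop, creating one if it is missing or closed."""
    try:
        with warnings.catch_warnings():
            # Some Python versions warn when no loop is set; that case is handled below.
            warnings.simplefilter("ignore", DeprecationWarning)
            loop = asyncio.get_event_loop()
    except RuntimeError:
        # No loop is set for this thread (non-main threads, or newer Python versions).
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


def run_coro_sync(coro_factory: Callable[..., Awaitable[Any]], *args: Any) -> Any:
    """Run an async callable to completion from synchronous code (hypothetical helper)."""
    loop = get_usable_loop()
    return loop.run_until_complete(coro_factory(*args))
```

A task body would then reduce to something like chunks_count = run_coro_sync(_run_ingest), with the try/except around it unchanged.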

Top-to-Bottom Code Walkthrough (app/worker.py)

app/worker.py is forty lines total and does exactly two things: declare a Celery app and define one task that wraps an async RAG service. Despite the small size, every line is load-bearing — the file is the seam between the synchronous Celery world and the asyncio-only RAG service.

The imports: os for environment variable reads, asyncio for the event-loop dance, Celery from celery for the broker wiring, and RAGService from app.services.rag_service — the only thing the worker actually runs.

celery_app = Celery("fca_worker", broker=os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0"), backend=os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0")) is the module-level Celery app. The string "fca_worker" is the app’s main name: it appears in the app’s repr and is used to generate names for tasks defined in the __main__ module. It is not what the CLI’s -A flag takes; -A expects the importable path to the app, as in celery -A app.worker.celery_app inspect ping. broker is the URL of the message queue that holds tasks; here it is Redis, where queued tasks wait until a worker picks them up, so a brief worker outage does not by itself drop work that has not yet been delivered. backend is where task results are stored, also Redis here. os.getenv(NAME, default) is the right shape because environment variables are strings and Celery accepts URL strings for both settings. The redis://redis:6379/0 default works only when running under docker compose up (where redis resolves to the Redis container’s service name); on a host without compose, you’d point it at redis://localhost:6379/0.
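
The environment lookup can be sketched on its own, without Celery. resolve_url is a hypothetical helper, not part of app/worker.py, and treating an empty variable as unset is an assumption that goes slightly beyond plain os.getenv(NAME, default).

```python
import os

# Default taken from the worker file: "redis" is the docker compose service name.
DEFAULT_REDIS_URL = "redis://redis:6379/0"


def resolve_url(var_name: str, default: str = DEFAULT_REDIS_URL) -> str:
    """Read a URL from the environment, falling back to the compose default."""
    value = os.getenv(var_name)
    # Assumption: an empty string is treated the same as an unset variable.
    return value if value else default
```

On a host without compose, exporting CELERY_BROKER_URL=redis://localhost:6379/0 before starting the worker would override the default.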

@celery_app.task(name="ingest_pdf_task") decorates the task function. name="ingest_pdf_task" is the registered task name: it is what gets serialised into the broker message when the web layer calls .delay(...), and what the worker looks up in its task registry when the message arrives. Without an explicit name, Celery uses the qualified function name (app.worker.ingest_pdf_task), which couples the broker payload to the Python module path; moving or renaming the file would then break any producer that sends by name and any message already sitting in the queue. The explicit name decouples the wire format from the Python structure: rename the function freely and the broker keeps working.

def ingest_pdf_task(file_path: str) is the sync task body. Stock Celery expects task functions to be sync: the worker pool calls them as ordinary functions and does not await coroutines, so an async def task would only hand back an un-awaited coroutine object. file_path: str is the only argument. Arguments must be serialisable because they travel through the broker, and Celery’s default serializer since 4.0 is JSON (not pickle), so strings, numbers, lists, and dicts are the safe choices. The docstring states what the task does and notes the sync-via-async bridge, which is the right place for that orientation hint.
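
To illustrate why the task takes a plain str, here is a small standard-library sketch of the JSON constraint. to_task_args and is_json_safe are hypothetical helpers, and /data/report.pdf is a made-up path used only for illustration.

```python
import json
from pathlib import PurePosixPath
from typing import Any, List, Union


def is_json_safe(value: Any) -> bool:
    """Return True if the value survives json.dumps, the shape Celery's JSON serializer needs."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


def to_task_args(pdf_path: Union[str, PurePosixPath]) -> List[str]:
    """Convert a path-like value into a JSON-safe positional argument list."""
    args = [str(pdf_path)]
    json.dumps(args)  # Raises TypeError early if something unserialisable slipped in.
    return args
```

A caller holding a path object would pass to_task_args(path)[0] to .delay(...) rather than the object itself.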

The print statement print(f"🚀 [Worker] Starting background ingestion for: {file_path}") is stdout chatter for the worker logs. print works here because the Celery worker redirects stdout into its own log by default, so the message lands on the worker console next to Celery’s output. A task logger from celery.utils.log.get_task_logger(__name__) is the more conventional choice when log levels and task IDs matter.

async def _run_ingest() is the inner async coroutine. It is defined inside the sync function so that RAGService is constructed at task-call time, not at module-import time. This matters: if construction fails (e.g., the DB isn’t ready during worker boot), only that task invocation fails and the worker keeps running. Note that the import of RAGService itself is listed among the module-level imports; to isolate import-time failures as well, the from app.services.rag_service import RAGService line would have to move inside the task body, so that a broken import surfaces as one failed task rather than a worker that cannot start.
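
The isolation effect of a deferred import can be sketched with the standard library. This is an illustration under assumptions, not the worker’s code: run_with_lazy_import is a hypothetical helper, and it uses importlib.import_module in place of a from ... import statement so the example runs without the app package.

```python
import importlib
from typing import Any, Dict


def run_with_lazy_import(module_name: str, attr: str) -> Dict[str, Any]:
    """Import a dependency at call time and report import failures as data."""
    try:
        # The import happens here, inside the call, not when this file is loaded.
        module = importlib.import_module(module_name)
        factory = getattr(module, attr)
    except (ImportError, AttributeError) as exc:
        return {"status": "error", "message": f"{type(exc).__name__}: {exc}"}
    return {"status": "success", "result": factory()}
```

Defining this function never touches the dependency, so a missing or broken module only affects the call that needs it.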

service = RAGService() constructs the service. RAGService.__init__ sets up its asyncpg pool when first called; subsequent calls re-use the pool. The DB connection is handled internally — the worker is not responsible for session lifecycle; the service owns it.

return await service.ingest_pdf(file_path) is the actual work. ingest_pdf is the async method that reads the PDF, chunks it, embeds each chunk, and persists vectors into pgvector. Awaiting here returns the chunk count.

try: ... except Exception as e: is the safety net. By default an uncaught exception marks the task as FAILURE and stores the traceback in the result backend. If the task is also configured to retry (autoretry_for=..., or an explicit self.retry()), the whole body runs again on top of file-system side effects that have already happened, so the same file can be ingested twice and produce duplicate rows. Catching the exception and returning a dict short-circuits that.

Inside the try, the asyncio dance: loop = asyncio.get_event_loop() retrieves the event loop currently installed for the worker thread. If loop.is_closed() is true, loop = asyncio.new_event_loop() and asyncio.set_event_loop(loop) create a fresh loop and install it as the current thread’s loop. The guard matters because the loop installed in a worker process is not guaranteed to be usable: under --pool=prefork each child is fork()ed from the parent and inherits whatever loop state the parent had, and any code that called loop.close() earlier leaves a closed loop behind. Without the recreate branch, the next task call raises RuntimeError: Event loop is closed. The is_closed() / new_event_loop() / set_event_loop(...) sequence is the usual recovery.

chunks_count = loop.run_until_complete(_run_ingest()) runs the async coroutine to completion on the worker thread and returns the integer chunk count.

print(f"✅ [Worker] Ingestion complete. Chunks: {chunks_count}") logs the result via print for the worker’s console. return {"status": "success", "chunks": chunks_count} returns a JSON-friendly dict — Celery’s result backend serialises it back to the caller. The structured-dict shape (not a raw int) lets the web layer’s result.ready() / result.get() consumers branch on status without parsing tracebacks.

The except block: return {"status": "error", "message": str(e)} returns a structured error instead of raising. Combined with the success path’s dict, the web layer always gets {"status": ..., ...} and never the traceback text. This is the explicit “never re-run automatically” decision: re-running after a partial success (file already moved, renamed, or deleted) could silently corrupt downstream data. The trade-off is that Celery records the task state as SUCCESS even when status is "error", so monitoring has to inspect the payload rather than the task state.
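
On the consuming side, branching on the payload might look like the sketch below. describe_ingest_result is a hypothetical function; in the web layer its argument would be whatever result.get() returns for this task.

```python
from typing import Any, Dict


def describe_ingest_result(payload: Dict[str, Any]) -> str:
    """Turn the task's structured dict into a user-facing message."""
    status = payload.get("status")
    if status == "success":
        return f"Ingested {payload['chunks']} chunks"
    if status == "error":
        return f"Ingestion failed: {payload.get('message', 'unknown error')}"
    # Anything else means the task contract changed or the payload is corrupt.
    return f"Unexpected payload: {payload!r}"
```

Because both paths return a dict, the caller needs no try/except around the status check itself.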

Common Pitfalls

Reusing a closed event loop crashes with RuntimeError: Event loop is closed after the first invocation. Always check loop.is_closed() and create a fresh loop when needed.
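
The failure and its recovery can be reproduced in isolation. This is a minimal standard-library sketch; demo_closed_loop and _answer are hypothetical names, and the loops are created locally so the demo does not touch the thread’s installed loop.

```python
import asyncio
from typing import Optional, Tuple


async def _answer() -> int:
    return 42


def demo_closed_loop() -> Tuple[Optional[str], int]:
    """Show the error raised by a closed loop, then succeed on a fresh one."""
    dead = asyncio.new_event_loop()
    dead.close()

    first_error = None
    coro = _answer()
    try:
        dead.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # The coroutine never started; close it to avoid a warning.
        first_error = str(exc)

    fresh = asyncio.new_event_loop()
    try:
        value = fresh.run_until_complete(_answer())
    finally:
        fresh.close()
    return first_error, value
```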

Raising out of the task body marks the task as FAILURE and, when retries are configured (autoretry_for or self.retry()), re-runs the whole task including its file-system side effects. Catch broadly and return {"status": "error", ...} so the caller can decide.

Real-World Interview Prep

Q1: How would you debug “RuntimeError: Event loop is closed” inside a Celery worker?

A: Walk three layers. (1) Check that asyncio.get_event_loop() is guarded by if loop.is_closed() plus a recreate via asyncio.new_event_loop() and asyncio.set_event_loop(...); the code may be missing the recreate branch. (2) Look for whatever closed the loop: a library or an earlier task calling loop.close(), or loop state inherited across fork() under --pool=prefork. Celery’s worker_process_init signal is a natural place to install a fresh loop in each child process. (3) Check the pool type: under --pool=threads tasks run in non-main threads, where no loop is set by default and asyncio.get_event_loop() raises RuntimeError, so each thread needs its own loop. The fallback fix is either to create the loop once per process and reuse it, or to migrate the task to asyncio.run(...) per invocation, which always builds and tears down its own loop.
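
The thread-pool behaviour is easy to confirm with the standard library alone. The sketch below uses a plain threading.Thread as a stand-in for a pool thread, which is an assumption about how closely it mirrors Celery’s thread pool; probe_worker_thread and _ping are hypothetical names.

```python
import asyncio
import threading
from typing import Dict


async def _ping() -> str:
    await asyncio.sleep(0)
    return "pong"


def probe_worker_thread() -> Dict[str, str]:
    """Check loop availability from a non-main thread with no loop installed."""
    outcome: Dict[str, str] = {}

    def target() -> None:
        try:
            asyncio.get_event_loop()
            outcome["get_event_loop"] = "ok"
        except RuntimeError:
            # No loop has been set for this thread.
            outcome["get_event_loop"] = "RuntimeError"
        # asyncio.run creates and closes its own loop, so it works regardless.
        outcome["asyncio_run"] = asyncio.run(_ping())

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    return outcome
```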

Q2: When would you run sync Celery vs asyncio.run() directly inside the task?

A: Use Celery when you need the broker (Redis/RabbitMQ) features: retries, dead-letter queues, rate limits, scheduled tasks. Use asyncio.run() for one-off scripts that the operator runs on demand. The hybrid pattern (@celery_app.task wrapping asyncio.run(...)) is fine for cron-style work: the broker provides durability, the inner asyncio.run provides the runtime. The pitfall is that asyncio.run() creates and closes a new event loop per call, so any loop-bound shared resource (DB pool, httpx client) MUST be re-initialised per task or kept on a long-lived loop that outlives individual tasks.
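
The per-call loop lifecycle of asyncio.run() can be observed directly. A minimal sketch, with run_twice and _current_loop as hypothetical names:

```python
import asyncio
from typing import Tuple


async def _current_loop() -> asyncio.AbstractEventLoop:
    """Return the loop this coroutine is running on."""
    return asyncio.get_running_loop()


def run_twice() -> Tuple[asyncio.AbstractEventLoop, asyncio.AbstractEventLoop]:
    """Call asyncio.run twice and keep both loops for comparison."""
    first = asyncio.run(_current_loop())
    second = asyncio.run(_current_loop())
    return first, second
```

The two returned loops are distinct objects and both are already closed, which is why a connection pool created during the first call cannot be awaited during the second.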

Q3: How would you add concurrency to a Celery worker that runs many ingest_pdf_task jobs?

A: Use Celery’s --concurrency=N flag with the prefork pool; each worker process runs one task at a time, so N processes give N tasks in parallel (for example 4-8). For I/O-bound work (like an LLM/API call), --pool=gevent gives many concurrent tasks in one process at the cost of monkey-patching. Verify in production with celery -A app.worker.celery_app inspect stats; the pool.processes and pool.max-concurrency fields should reflect your goals. If tasks are blocking on PostgreSQL/Redis, profile with pg_stat_activity and redis-cli --latency before adding concurrency.
