diff --git a/modules/anime_etl/rabbit_worker.py b/modules/anime_etl/rabbit_worker.py index fed33dc..2c4e595 100644 --- a/modules/anime_etl/rabbit_worker.py +++ b/modules/anime_etl/rabbit_worker.py @@ -6,18 +6,15 @@ import os import sys from typing import Any, Dict -from dotenv import load_dotenv - import aio_pika +from aio_pika.exceptions import AMQPConnectionError import psycopg from psycopg.rows import dict_row from services.anilist_importer import AniListImporter -load_dotenv() - -PG_DSN = os.getenv("NYANIMEDB_PG_DSN") -RMQ_URL = os.getenv("NYANIMEDB_RABBITMQ_URL", "amqp://guest:guest@10.1.0.65:5672/") +PG_DSN = os.getenv("NYANIMEDB_PG_DSN") or os.getenv("DATABASE_URL") +RMQ_URL = os.getenv("NYANIMEDB_RMQ_URL") or os.getenv("RABBITMQ_URL") or "amqp://guest:guest@rabbitmq:5672/" RPC_QUEUE_NAME = os.getenv("NYANIMEDB_IMPORT_RPC_QUEUE", "anime_import_rpc") @@ -95,11 +92,37 @@ def create_handler(channel: aio_pika.Channel): return handle_message +async def connect_rmq_with_retry( + url: str, + retries: int = 20, + delay: float = 3.0, +) -> aio_pika.RobustConnection: + last_exc: Exception | None = None + + for attempt in range(1, retries + 1): + try: + print(f"[worker] Connecting to RabbitMQ ({attempt}/{retries}) {url}", flush=True) + conn = await aio_pika.connect_robust(url) + print("[worker] Connected to RabbitMQ", flush=True) + return conn + except AMQPConnectionError as e: + last_exc = e + print(f"[worker] RabbitMQ connection failed: {e!r}, retry in {delay}s", flush=True) + await asyncio.sleep(delay) + + print("[worker] Failed to connect to RabbitMQ after retries", file=sys.stderr, flush=True) + if last_exc: + raise last_exc + raise RuntimeError("Failed to connect to RabbitMQ") + + async def main() -> None: if not PG_DSN: - raise RuntimeError("NYANIMEDB_PG_DSN is not set") + raise RuntimeError("PG_DSN is not set (NYANIMEDB_PG_DSN / DATABASE_URL)") - connection = await aio_pika.connect_robust(RMQ_URL) + print(f"[worker] Starting. PG_DSN={PG_DSN!r}, RMQ_URL={RMQ_URL!r}, queue={RPC_QUEUE_NAME!r}", flush=True) + + connection = await connect_rmq_with_retry(RMQ_URL) channel = await connection.channel() queue = await channel.declare_queue( @@ -110,6 +133,8 @@ async def main() -> None: handler = create_handler(channel) await queue.consume(handler) + print(f"[*] Waiting for messages in '{RPC_QUEUE_NAME}'. Ctrl+C to exit.", flush=True) + try: await asyncio.Future() # run forever finally: @@ -119,5 +144,4 @@ async def main() -> None: if __name__ == "__main__": if sys.platform.startswith("win"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - asyncio.run(main())