etl module added
This commit is contained in:
parent
0f619dd954
commit
ff36173720
16 changed files with 1573 additions and 0 deletions
123
modules/anime_etl/rabbit_worker.py
Normal file
123
modules/anime_etl/rabbit_worker.py
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from typing import Any, Dict
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
import aio_pika
|
||||
import psycopg
|
||||
from psycopg.rows import dict_row
|
||||
|
||||
from services.anilist_importer import AniListImporter
|
||||
|
||||
# Pull variables from a local .env file into the process environment so the
# lookups below can see them.
load_dotenv()

_env = os.environ

# PostgreSQL DSN. There is no default; main() raises if it is missing.
PG_DSN = _env.get("NYANIMEDB_PG_DSN")

# RabbitMQ broker URL; falls back to the built-in development address.
RMQ_URL = _env.get("NYANIMEDB_RABBITMQ_URL", "amqp://guest:guest@10.1.0.65:5672/")

# Name of the queue on which import RPC requests are consumed.
RPC_QUEUE_NAME = _env.get("NYANIMEDB_IMPORT_RPC_QUEUE", "anime_import_rpc")
|
||||
|
||||
|
||||
def rmq_request_to_filters(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Translate an RPC request payload into importer filters.

    Recognised payload keys:

    * ``name``   -> ``query``: non-blank string, surrounding whitespace removed.
    * ``year``   -> ``year``: positive integer (booleans are rejected).
    * ``season`` -> ``season``: non-blank string, stripped and lower-cased.

    Keys that are missing, empty, or of the wrong type are silently skipped.

    Args:
        payload: Decoded JSON object received from the queue.

    Returns:
        A new filter dict; ``limit`` is always present and set to 10.
    """
    filters: Dict[str, Any] = {}

    name = payload.get("name")
    if isinstance(name, str) and name.strip():
        filters["query"] = name.strip()

    year = payload.get("year")
    # bool is a subclass of int, so a JSON ``true`` would otherwise be
    # accepted here and forwarded as the year.
    if isinstance(year, int) and not isinstance(year, bool) and year > 0:
        filters["year"] = year

    season = payload.get("season")
    # Strip first so a whitespace-only season is treated as absent, matching
    # the handling of ``name`` above.
    if isinstance(season, str) and season.strip():
        filters["season"] = season.strip().lower()

    filters.setdefault("limit", 10)
    return filters
|
||||
|
||||
|
||||
def create_handler(channel: aio_pika.Channel):
    """Build the consumer callback for the import RPC queue.

    Args:
        channel: Channel whose default exchange is used to publish replies.

    Returns:
        An async callable taking one ``aio_pika.IncomingMessage``. For each
        message it decodes the JSON body, runs an AniList import with the
        derived filters on a fresh PostgreSQL connection, and, when the
        message carries ``reply_to``, publishes a JSON response to that
        routing key with the request's ``correlation_id``.
    """

    async def handle_message(message: aio_pika.IncomingMessage) -> None:
        # The whole handler runs inside message.process(); see the aio-pika
        # documentation for the ack/reject semantics of that context manager.
        async with message.process():
            try:
                payload = json.loads(message.body.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Body is not valid UTF-8 or not valid JSON: drop the request
                # without replying, the same as any other malformed payload.
                return

            if not isinstance(payload, dict):
                # Only JSON objects are valid requests.
                return

            filters = rmq_request_to_filters(payload)
            # Echoed back unchanged in the response.
            timestamp = payload.get("timestamp")

            try:
                async with await psycopg.AsyncConnection.connect(
                    PG_DSN,
                    row_factory=dict_row,
                ) as conn:
                    importer = AniListImporter()
                    titles = await importer.import_by_filters_in_tx(conn, filters)

                response: dict[str, Any] = {
                    "timestamp": timestamp,
                    "ok": True,
                    "titles": titles,
                    "error": None,
                }
            except Exception as e:
                # Any connection or import failure is reported to the caller
                # rather than propagated out of the handler.
                response = {
                    "timestamp": timestamp,
                    "ok": False,
                    "titles": [],
                    "error": {
                        "code": "import_failed",
                        "message": str(e),
                    },
                }

            # NOTE(review): assumes ``titles`` is JSON-serialisable; a
            # non-serialisable value would raise here, outside the try above.
            body = json.dumps(response).encode("utf-8")

            if message.reply_to:
                await channel.default_exchange.publish(
                    aio_pika.Message(
                        body=body,
                        content_type="application/json",
                        correlation_id=message.correlation_id,
                    ),
                    routing_key=message.reply_to,
                )

    return handle_message
|
||||
|
||||
|
||||
async def main() -> None:
    """Connect to RabbitMQ and serve import RPC requests until cancelled.

    Declares the durable queue named by ``RPC_QUEUE_NAME``, attaches the
    handler built by ``create_handler`` and then waits forever.

    Raises:
        RuntimeError: If ``NYANIMEDB_PG_DSN`` is not configured.
    """
    if not PG_DSN:
        raise RuntimeError("NYANIMEDB_PG_DSN is not set")

    connection = await aio_pika.connect_robust(RMQ_URL)
    # Everything after a successful connect sits inside the try, so the
    # connection is also closed when channel or queue setup fails.
    try:
        channel = await connection.channel()

        queue = await channel.declare_queue(
            RPC_QUEUE_NAME,
            durable=True,
        )

        handler = create_handler(channel)
        await queue.consume(handler)

        await asyncio.Future()  # run forever
    finally:
        await connection.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # On Windows, switch to the selector event loop before starting.
    # NOTE(review): presumably needed because psycopg's async mode does not
    # work with the default Proactor loop; confirm.
    running_on_windows = sys.platform.startswith("win")
    if running_on_windows:
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)

    asyncio.run(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue