nyanimedb/modules/anime_etl/db/repository.py
nihonium fe1bf7ec10
All checks were successful
Build and Deploy Go App / build (push) Successful in 11m53s
Build and Deploy Go App / deploy (push) Successful in 48s
fix(etl): sql enum for image storage
2025-12-06 08:38:34 +03:00

254 lines
No EOL
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# anime_etl/db/repository.py
from __future__ import annotations

import json
import logging
from typing import Optional, Dict, List

import psycopg
from psycopg.rows import dict_row

from models import CanonicalTitle, Studio, Image
from images.downloader import ensure_image_downloaded
# Shorthand type alias for the async psycopg connection accepted by the
# async repository functions in this module.
Conn = psycopg.AsyncConnection
def _choose_primary_name(
title_names: Dict[str, List[str]],
) -> Optional[tuple[str, str]]:
# (lang, name)
for lang in ("en", "romaji", "ja"):
variants = title_names.get(lang) or []
if variants:
return lang, variants[0]
for lang, variants in title_names.items():
if variants:
return lang, variants[0]
return None
import re
# Shape of a normalized (content-addressed) image path:
#   "<subdir>/<2 lowercase hex>/<2 lowercase hex>/<40 lowercase hex>.<ext>"
# where <subdir> is [A-Za-z0-9_-]+ and <ext> is 1-5 alphanumerics.
# NOTE(review): presumably the layout produced by the image downloader --
# confirm against images.downloader.
_SHA1_PATH_RE = re.compile(
    r"^[a-zA-Z0-9_-]+/[0-9a-f]{2}/[0-9a-f]{2}/[0-9a-f]{40}\.[a-zA-Z0-9]{1,5}$"
)


def is_normalized_image_path(path: str) -> bool:
    """Return True if *path* is exactly a normalized image path.

    ``fullmatch`` is used instead of ``match``. With ``match``, the ``$``
    anchor also matches just before a trailing newline, so a value such as
    ``"posters/ab/cd/<sha1>.jpg\\n"`` would wrongly be accepted.
    """
    return _SHA1_PATH_RE.fullmatch(path) is not None
async def get_or_create_image(
    conn: Conn,
    img: Optional[Image],
    *,
    subdir: str = "posters",
) -> Optional[int]:
    """Return the ``images.id`` for *img*, downloading and registering it if needed.

    Only remote images are handled: ``img.image_path`` must be an
    ``http://`` or ``https://`` URL. The URL is passed to
    ``ensure_image_downloaded`` together with *subdir*. The path it returns
    is looked up in the ``images`` table. If no row exists, a new one is
    inserted with ``storage_type = 'local'``.

    Image acquisition is best-effort. ``None`` is returned, and nothing is
    written, in any of these cases:

    * *img* is missing or has no path;
    * the path is not an HTTP(S) URL;
    * the download raises (a warning is logged);
    * the downloader returns an empty path.

    No commit is issued here.

    Args:
        conn: Open async connection.
        img: Image descriptor whose ``image_path`` holds the source URL.
        subdir: Storage subdirectory passed through to the downloader.

    Returns:
        The id of the existing or newly inserted ``images`` row, or ``None``.
    """
    if img is None or not img.image_path:
        return None

    url = img.image_path
    # Anything other than an HTTP(S) URL is an unsupported format: skip it.
    if not url.startswith(("http://", "https://")):
        return None

    # 1) Fetch the image through the image service.
    try:
        rel_path = await ensure_image_downloaded(url, subdir=subdir)
    except Exception:
        # A failed download must not abort the ETL run; record it and skip.
        logging.getLogger(__name__).warning(
            "image download failed for %s", url, exc_info=True
        )
        return None
    if not rel_path:
        # Never look up or insert a NULL/empty path.
        return None

    async with conn.cursor(row_factory=dict_row) as cur:
        # 2) Reuse an existing row for this path.
        await cur.execute(
            "SELECT id FROM images WHERE image_path = %s",
            (rel_path,),
        )
        row = await cur.fetchone()
        if row:
            return row["id"]

        # 3) Otherwise register the downloaded file.
        # NOTE(review): SELECT-then-INSERT is not atomic. Concurrent workers
        # could insert the same path twice unless images.image_path has a
        # unique constraint -- confirm against the schema.
        await cur.execute(
            """
            INSERT INTO images (storage_type, image_path)
            VALUES (%s, %s)
            RETURNING id
            """,
            ("local", rel_path),
        )
        row = await cur.fetchone()
        return row["id"]
async def get_or_create_studio(
    conn: Conn,
    studio: Optional[Studio],
) -> Optional[int]:
    """Return the ``studios.id`` for *studio*, creating or back-filling the row.

    Studios are matched by exact ``studio_name``. For an existing row:

    * a missing ``illust_id`` is filled from ``studio.poster``, but only
      when that image can actually be obtained;
    * a missing ``studio_desc`` is filled from ``studio.description``.

    For an unknown studio, a new row is inserted with its name,
    illustration and description.

    Returns:
        The studio id, or ``None`` when *studio* is ``None`` or has no name.
    """
    if studio is None or not studio.name:
        return None

    async with conn.cursor(row_factory=dict_row) as cur:
        # 1. Look the studio up by name.
        await cur.execute(
            "SELECT id, illust_id, studio_desc FROM studios WHERE studio_name = %s",
            (studio.name,),
        )
        row = await cur.fetchone()

        if row:
            studio_id = row["id"]

            # 1a. No illustration yet but a poster was supplied: download it and
            # attach it. Skip the UPDATE when the image could not be obtained;
            # it would only overwrite NULL with NULL.
            if row["illust_id"] is None and studio.poster is not None:
                new_illust_id = await get_or_create_image(
                    conn, studio.poster, subdir="studios"
                )
                if new_illust_id is not None:
                    await cur.execute(
                        "UPDATE studios SET illust_id = %s WHERE id = %s",
                        (new_illust_id, studio_id),
                    )

            # 1b. No description yet but the enrichment step produced one: store it.
            if row["studio_desc"] is None and studio.description:
                await cur.execute(
                    "UPDATE studios SET studio_desc = %s WHERE id = %s",
                    (studio.description, studio_id),
                )
            return studio_id

        # 2. Unknown studio: create it, with its illustration if one is available.
        illust_id: Optional[int] = None
        if studio.poster is not None:
            illust_id = await get_or_create_image(
                conn, studio.poster, subdir="studios"
            )
        await cur.execute(
            """
            INSERT INTO studios (studio_name, illust_id, studio_desc)
            VALUES (%s, %s, %s)
            RETURNING id
            """,
            (studio.name, illust_id, studio.description),
        )
        row = await cur.fetchone()
        return row["id"]
async def find_title_id_by_name_and_year(
    conn: Conn,
    title_names: Dict[str, List[str]],
    release_year: Optional[int],
) -> Optional[int]:
    """Look up an existing title by its primary name and release year.

    The primary name comes from ``_choose_primary_name``. A row matches when
    both of these hold:

    * its ``release_year`` equals *release_year*;
    * its ``title_names`` JSONB contains that name under the same language
      key.

    Returns:
        The id of one matching ``titles`` row. ``None`` is returned when
        *release_year* is ``None``, when no usable name exists, or when
        nothing matches.
    """
    if release_year is None:
        return None

    chosen = _choose_primary_name(title_names)
    if chosen is None:
        return None
    lang, primary_name = chosen

    # JSONB containment probe {"<lang>": ["<name>"]}: matches any row whose
    # name list for that language includes the name.
    containment = json.dumps({lang: [primary_name]})
    # NOTE(review): LIMIT 1 without ORDER BY -- if several rows qualify,
    # which id is returned is unspecified.
    query = """
        SELECT id
        FROM titles
        WHERE release_year = %s
          AND title_names @> %s::jsonb
        LIMIT 1
    """
    async with conn.cursor(row_factory=dict_row) as cur:
        await cur.execute(query, (release_year, containment))
        found = await cur.fetchone()
    return found["id"] if found else None
async def insert_title(
    conn: Conn,
    title: CanonicalTitle,
    studio_id: Optional[int],
    poster_id: Optional[int],
) -> int:
    """Insert *title* into ``titles`` and return the new row id.

    ``title_names`` and ``episodes_len`` are serialized to JSON and cast to
    ``jsonb``. ``episodes_len`` is stored as SQL NULL when it is ``None``.
    No duplicate check is performed here.
    """
    if title.episodes_len is None:
        episodes_len_json = None
    else:
        episodes_len_json = json.dumps(title.episodes_len)

    sql = """
        INSERT INTO titles (
            title_names,
            studio_id,
            poster_id,
            title_status,
            rating,
            rating_count,
            release_year,
            release_season,
            season,
            episodes_aired,
            episodes_all,
            episodes_len
        )
        VALUES (
            %s::jsonb,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s::jsonb
        )
        RETURNING id
    """
    # Order must match the column list above.
    params = (
        json.dumps(title.title_names),
        studio_id,
        poster_id,
        title.title_status,
        title.rating,
        title.rating_count,
        title.release_year,
        title.release_season,
        title.season,
        title.episodes_aired,
        title.episodes_all,
        episodes_len_json,
    )
    async with conn.cursor(row_factory=dict_row) as cur:
        await cur.execute(sql, params)
        inserted = await cur.fetchone()
        return inserted["id"]
async def insert_title_if_not_exists(
    conn: Conn,
    title: CanonicalTitle,
    studio_id: Optional[int],
    poster_id: Optional[int],
) -> int:
    """Return the id of *title*, inserting it only when no match exists yet.

    A match is looked up by primary name and release year through
    ``find_title_id_by_name_and_year``. When a match is found, its id is
    returned unchanged and *studio_id* / *poster_id* are not used.

    NOTE: that lookup returns ``None`` whenever ``release_year`` is ``None``,
    so titles without a year are always inserted.
    NOTE(review): check-then-insert is not atomic; concurrent runs may
    insert duplicates.
    """
    found = await find_title_id_by_name_and_year(
        conn, title.title_names, title.release_year
    )
    if found is None:
        return await insert_title(conn, title, studio_id, poster_id)
    return found