changed to interact with standalone image downloader service

This commit is contained in:
garaev kamil 2025-12-06 06:37:33 +03:00
parent 93f12666cd
commit fc0ddf334d
2 changed files with 46 additions and 69 deletions

View file

@ -30,6 +30,16 @@ def _choose_primary_name(
return None return None
import re
_SHA1_PATH_RE = re.compile(
r"^[a-zA-Z0-9_-]+/[0-9a-f]{2}/[0-9a-f]{2}/[0-9a-f]{40}\.[a-zA-Z0-9]{1,5}$"
)
def is_normalized_image_path(path: str) -> bool:
return bool(_SHA1_PATH_RE.match(path))
async def get_or_create_image( async def get_or_create_image(
conn: Conn, conn: Conn,
img: Optional[Image], img: Optional[Image],
@ -39,14 +49,22 @@ async def get_or_create_image(
if img is None or not img.image_path: if img is None or not img.image_path:
return None return None
# img.image_path сейчас — URL из AniList
url = img.image_path url = img.image_path
# 1) решаем, куда кладём картинку, и если надо — скачиваем # 1) Если это URL → скачиваем через image-service
rel_path = await ensure_image_downloaded(url, subdir=subdir) if url.startswith("http://") or url.startswith("https://"):
try:
rel_path = await ensure_image_downloaded(url, subdir=subdir)
except Exception as e:
# не удалось скачать картинку — просто пропускаем
return None
else:
# неправильный формат — просто пропуск
return None
# 3) Проверка в базе
async with conn.cursor(row_factory=dict_row) as cur: async with conn.cursor(row_factory=dict_row) as cur:
# 2) пробуем найти уже существующую запись по относительному пути
await cur.execute( await cur.execute(
"SELECT id FROM images WHERE image_path = %s", "SELECT id FROM images WHERE image_path = %s",
(rel_path,), (rel_path,),
@ -55,19 +73,20 @@ async def get_or_create_image(
if row: if row:
return row["id"] return row["id"]
# 3) создаём новую запись # 4) Вставляем запись
await cur.execute( await cur.execute(
""" """
INSERT INTO images (storage_type, image_path) INSERT INTO images (storage_type, image_path)
VALUES (%s, %s) VALUES (%s, %s)
RETURNING id RETURNING id
""", """,
("local", rel_path), ("image-service", rel_path),
) )
row = await cur.fetchone() row = await cur.fetchone()
return row["id"] return row["id"]
async def get_or_create_studio( async def get_or_create_studio(
conn: Conn, conn: Conn,
studio: Optional[Studio], studio: Optional[Studio],

View file

@ -1,77 +1,35 @@
# anime_etl/images/downloader.py
from __future__ import annotations from __future__ import annotations
import asyncio
import hashlib
import os import os
from pathlib import Path from typing import Final
from typing import Tuple
from urllib.parse import urlparse
import httpx import httpx
# Корень хранилища картинок внутри контейнера/процесса IMAGE_SERVICE_URL: Final[str] = os.getenv(
MEDIA_ROOT = Path(os.getenv("NYANIMEDB_MEDIA_ROOT", "media")).resolve() "NYANIMEDB_IMAGE_SERVICE_URL",
"http://127.0.0.1:8000"
)
def _guess_ext_from_url(url: str) -> str:
path = urlparse(url).path
_, ext = os.path.splitext(path)
if ext and len(ext) <= 5:
return ext
return ".jpg"
def _build_rel_path_from_hash(h: str, ext: str, subdir: str = "posters") -> Tuple[str, Path]:
"""
Строим путь вида subdir/ab/cd/<hash>.ext по sha1-хешу содержимого.
"""
level1 = h[:2]
level2 = h[2:4]
rel = f"{subdir}/{level1}/{level2}/{h}{ext}"
abs_path = MEDIA_ROOT / rel
return rel, abs_path
async def _fetch_bytes(url: str) -> bytes:
async with httpx.AsyncClient(timeout=20.0) as client:
r = await client.get(url)
r.raise_for_status()
return r.content
async def ensure_image_downloaded(url: str, subdir: str = "posters") -> str: async def ensure_image_downloaded(url: str, subdir: str = "posters") -> str:
""" """
Гарантирует, что картинка по URL лежит в MEDIA_ROOT/subdir в структуре: Просит image-service скачать картинку по URL и сохранить её у себя.
subdir/ab/cd/<sha1(content)>.ext
Возвращает относительный путь (для записи в БД). Возвращает относительный путь (subdir/ab/cd/<sha1>.ext),
Один и тот же файл (по содержимому) всегда даёт один и тот же путь, который можно писать в images.image_path.
даже если URL меняется.
""" """
# Скачиваем данные async with httpx.AsyncClient(timeout=20.0) as client:
data = await _fetch_bytes(url) resp = await client.post(
f"{IMAGE_SERVICE_URL}/download-by-url",
json={"url": url, "subdir": subdir},
)
resp.raise_for_status()
data = resp.json()
# Хешируем именно содержимое, а не URL # ожидаем {"path": "..."}
h = hashlib.sha1(data).hexdigest() path = data["path"]
ext = _guess_ext_from_url(url) if not isinstance(path, str):
raise RuntimeError(f"Invalid response from image service: {data!r}")
rel, abs_path = _build_rel_path_from_hash(h, ext, subdir=subdir) return path
# Если файл уже есть (другой процесс/воркер успел сохранить) — просто возвращаем путь
if abs_path.exists():
return rel
abs_path.parent.mkdir(parents=True, exist_ok=True)
# Пишем во временный файл и затем делаем atomic rename
tmp_path = abs_path.with_suffix(abs_path.suffix + ".tmp")
def _write() -> None:
with open(tmp_path, "wb") as f:
f.write(data)
# os.replace атомарно заменит файл, даже если он уже появился
os.replace(tmp_path, abs_path)
await asyncio.to_thread(_write)
return rel