diff --git a/modules/anime_etl/rate_limiter.py b/modules/anime_etl/rate_limiter.py new file mode 100644 index 0000000..c2b06ff --- /dev/null +++ b/modules/anime_etl/rate_limiter.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import asyncio +import time +from abc import ABC, abstractmethod +from typing import Awaitable, Callable, TypeVar, Optional + +T = TypeVar("T") + +NowFn = Callable[[], float] +SleepFn = Callable[[float], Awaitable[None]] + + +class RateLimiter(ABC): + """ + Базовый интерфейс rate-limiter'а. + """ + + @abstractmethod + async def acquire(self) -> None: + """ + Дождаться слота для выполнения запроса. + """ + raise NotImplementedError + + +class TokenBucketRateLimiter(RateLimiter): + """ + Token-bucket rate limiter. + + rate — сколько токенов выдаём за период `per` секунд. + per — длина окна в секундах. + capacity — максимальное количество токенов в корзине (burst). По умолчанию = rate. + + Пример: + limiter = TokenBucketRateLimiter(rate=30, per=60.0) + await limiter.acquire() # перед каждым запросом к AniList + """ + + def __init__( + self, + rate: int, + per: float, + *, + capacity: Optional[int] = None, + now_fn: NowFn | None = None, + sleep_fn: SleepFn | None = None, + ) -> None: + self.rate = float(rate) + self.per = float(per) + self.capacity = float(capacity if capacity is not None else rate) + + self._now: NowFn = now_fn or time.monotonic + self._sleep: SleepFn = sleep_fn or asyncio.sleep + + # начальное состояние: полный бак + self._tokens: float = self.capacity + self._updated_at: float = self._now() + self._lock = asyncio.Lock() + + async def acquire(self) -> None: + """ + Подождать, пока не будет доступен хотя бы один токен. + Важно: ожидание (sleep) происходит ВНЕ lock'а, чтобы несколько корутин + могли "делить" ожидание. + """ + refill_rate_per_second = self.rate / self.per + + while True: + async with self._lock: + now = self._now() + elapsed = now - self._updated_at + + if elapsed > 0: + self._tokens = min( + self.capacity, + self._tokens + elapsed * refill_rate_per_second, + ) + self._updated_at = now + + if self._tokens >= 1.0: + self._tokens -= 1.0 + return + + # токенов нет — считаем, сколько нужно ждать + missing = 1.0 - self._tokens + wait_for = missing / refill_rate_per_second + if wait_for <= 0: + wait_for = 0.01 + + # спим уже без lock'а, чтобы другие могли проверить состояние + await self._sleep(wait_for) + + +def wrap_with_rate_limiter( + limiter: RateLimiter, +) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]: + """ + Декоратор для функций, которые делают запросы к внешнему API. + + Пример: + limiter = TokenBucketRateLimiter(rate=30, per=60.0) + + @wrap_with_rate_limiter(limiter) + async def fetch_something(...): + ... + """ + + def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: + async def wrapper(*args, **kwargs) -> T: + await limiter.acquire() + return await func(*args, **kwargs) + + return wrapper + + return decorator + + +# Глобальные инстансы per-process для источников +ANILIST_RATE_LIMITER = TokenBucketRateLimiter(rate=30, per=60.0) +SHIKIMORI_RATE_LIMITER = TokenBucketRateLimiter(rate=90, per=60.0)