nyanimedb/modules/anime_etl/rate_limiter.py
2025-12-05 14:04:07 +03:00

122 lines
No EOL
4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import time
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, TypeVar, Optional
T = TypeVar("T")
NowFn = Callable[[], float]
SleepFn = Callable[[float], Awaitable[None]]
class RateLimiter(ABC):
"""
Базовый интерфейс rate-limiter'а.
"""
@abstractmethod
async def acquire(self) -> None:
"""
Дождаться слота для выполнения запроса.
"""
raise NotImplementedError
class TokenBucketRateLimiter(RateLimiter):
"""
Token-bucket rate limiter.
rate — сколько токенов выдаём за период `per` секунд.
per — длина окна в секундах.
capacity — максимальное количество токенов в корзине (burst). По умолчанию = rate.
Пример:
limiter = TokenBucketRateLimiter(rate=30, per=60.0)
await limiter.acquire() # перед каждым запросом к AniList
"""
def __init__(
self,
rate: int,
per: float,
*,
capacity: Optional[int] = None,
now_fn: NowFn | None = None,
sleep_fn: SleepFn | None = None,
) -> None:
self.rate = float(rate)
self.per = float(per)
self.capacity = float(capacity if capacity is not None else rate)
self._now: NowFn = now_fn or time.monotonic
self._sleep: SleepFn = sleep_fn or asyncio.sleep
# начальное состояние: полный бак
self._tokens: float = self.capacity
self._updated_at: float = self._now()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""
Подождать, пока не будет доступен хотя бы один токен.
Важно: ожидание (sleep) происходит ВНЕ lock'а, чтобы несколько корутин
могли "делить" ожидание.
"""
refill_rate_per_second = self.rate / self.per
while True:
async with self._lock:
now = self._now()
elapsed = now - self._updated_at
if elapsed > 0:
self._tokens = min(
self.capacity,
self._tokens + elapsed * refill_rate_per_second,
)
self._updated_at = now
if self._tokens >= 1.0:
self._tokens -= 1.0
return
# токенов нет — считаем, сколько нужно ждать
missing = 1.0 - self._tokens
wait_for = missing / refill_rate_per_second
if wait_for <= 0:
wait_for = 0.01
# спим уже без lock'а, чтобы другие могли проверить состояние
await self._sleep(wait_for)
def wrap_with_rate_limiter(
limiter: RateLimiter,
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
"""
Декоратор для функций, которые делают запросы к внешнему API.
Пример:
limiter = TokenBucketRateLimiter(rate=30, per=60.0)
@wrap_with_rate_limiter(limiter)
async def fetch_something(...):
...
"""
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
async def wrapper(*args, **kwargs) -> T:
await limiter.acquire()
return await func(*args, **kwargs)
return wrapper
return decorator
# Глобальные инстансы per-process для источников
ANILIST_RATE_LIMITER = TokenBucketRateLimiter(rate=30, per=60.0)
SHIKIMORI_RATE_LIMITER = TokenBucketRateLimiter(rate=90, per=60.0)
JIKAN_RATE_LIMITER = TokenBucketRateLimiter(rate=30, per=60.0)