121 lines
3.9 KiB
Python
121 lines
3.9 KiB
Python
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import time
|
||
from abc import ABC, abstractmethod
|
||
from typing import Awaitable, Callable, TypeVar, Optional
|
||
|
||
T = TypeVar("T")
|
||
|
||
NowFn = Callable[[], float]
|
||
SleepFn = Callable[[float], Awaitable[None]]
|
||
|
||
|
||
class RateLimiter(ABC):
|
||
"""
|
||
Базовый интерфейс rate-limiter'а.
|
||
"""
|
||
|
||
@abstractmethod
|
||
async def acquire(self) -> None:
|
||
"""
|
||
Дождаться слота для выполнения запроса.
|
||
"""
|
||
raise NotImplementedError
|
||
|
||
|
||
class TokenBucketRateLimiter(RateLimiter):
|
||
"""
|
||
Token-bucket rate limiter.
|
||
|
||
rate — сколько токенов выдаём за период `per` секунд.
|
||
per — длина окна в секундах.
|
||
capacity — максимальное количество токенов в корзине (burst). По умолчанию = rate.
|
||
|
||
Пример:
|
||
limiter = TokenBucketRateLimiter(rate=30, per=60.0)
|
||
await limiter.acquire() # перед каждым запросом к AniList
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
rate: int,
|
||
per: float,
|
||
*,
|
||
capacity: Optional[int] = None,
|
||
now_fn: NowFn | None = None,
|
||
sleep_fn: SleepFn | None = None,
|
||
) -> None:
|
||
self.rate = float(rate)
|
||
self.per = float(per)
|
||
self.capacity = float(capacity if capacity is not None else rate)
|
||
|
||
self._now: NowFn = now_fn or time.monotonic
|
||
self._sleep: SleepFn = sleep_fn or asyncio.sleep
|
||
|
||
# начальное состояние: полный бак
|
||
self._tokens: float = self.capacity
|
||
self._updated_at: float = self._now()
|
||
self._lock = asyncio.Lock()
|
||
|
||
async def acquire(self) -> None:
|
||
"""
|
||
Подождать, пока не будет доступен хотя бы один токен.
|
||
Важно: ожидание (sleep) происходит ВНЕ lock'а, чтобы несколько корутин
|
||
могли "делить" ожидание.
|
||
"""
|
||
refill_rate_per_second = self.rate / self.per
|
||
|
||
while True:
|
||
async with self._lock:
|
||
now = self._now()
|
||
elapsed = now - self._updated_at
|
||
|
||
if elapsed > 0:
|
||
self._tokens = min(
|
||
self.capacity,
|
||
self._tokens + elapsed * refill_rate_per_second,
|
||
)
|
||
self._updated_at = now
|
||
|
||
if self._tokens >= 1.0:
|
||
self._tokens -= 1.0
|
||
return
|
||
|
||
# токенов нет — считаем, сколько нужно ждать
|
||
missing = 1.0 - self._tokens
|
||
wait_for = missing / refill_rate_per_second
|
||
if wait_for <= 0:
|
||
wait_for = 0.01
|
||
|
||
# спим уже без lock'а, чтобы другие могли проверить состояние
|
||
await self._sleep(wait_for)
|
||
|
||
|
||
def wrap_with_rate_limiter(
|
||
limiter: RateLimiter,
|
||
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
|
||
"""
|
||
Декоратор для функций, которые делают запросы к внешнему API.
|
||
|
||
Пример:
|
||
limiter = TokenBucketRateLimiter(rate=30, per=60.0)
|
||
|
||
@wrap_with_rate_limiter(limiter)
|
||
async def fetch_something(...):
|
||
...
|
||
"""
|
||
|
||
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
|
||
async def wrapper(*args, **kwargs) -> T:
|
||
await limiter.acquire()
|
||
return await func(*args, **kwargs)
|
||
|
||
return wrapper
|
||
|
||
return decorator
|
||
|
||
|
||
# Глобальные инстансы per-process для источников
|
||
ANILIST_RATE_LIMITER = TokenBucketRateLimiter(rate=30, per=60.0)
|
||
SHIKIMORI_RATE_LIMITER = TokenBucketRateLimiter(rate=90, per=60.0)
|