rate_limiter impl
This commit is contained in:
parent
51bf7b6f7e
commit
df188cb531
1 changed files with 121 additions and 0 deletions
121
modules/anime_etl/rate_limiter.py
Normal file
121
modules/anime_etl/rate_limiter.py
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Awaitable, Callable, TypeVar, Optional
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
NowFn = Callable[[], float]
|
||||
SleepFn = Callable[[float], Awaitable[None]]
|
||||
|
||||
|
||||
class RateLimiter(ABC):
|
||||
"""
|
||||
Базовый интерфейс rate-limiter'а.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def acquire(self) -> None:
|
||||
"""
|
||||
Дождаться слота для выполнения запроса.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class TokenBucketRateLimiter(RateLimiter):
|
||||
"""
|
||||
Token-bucket rate limiter.
|
||||
|
||||
rate — сколько токенов выдаём за период `per` секунд.
|
||||
per — длина окна в секундах.
|
||||
capacity — максимальное количество токенов в корзине (burst). По умолчанию = rate.
|
||||
|
||||
Пример:
|
||||
limiter = TokenBucketRateLimiter(rate=30, per=60.0)
|
||||
await limiter.acquire() # перед каждым запросом к AniList
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
rate: int,
|
||||
per: float,
|
||||
*,
|
||||
capacity: Optional[int] = None,
|
||||
now_fn: NowFn | None = None,
|
||||
sleep_fn: SleepFn | None = None,
|
||||
) -> None:
|
||||
self.rate = float(rate)
|
||||
self.per = float(per)
|
||||
self.capacity = float(capacity if capacity is not None else rate)
|
||||
|
||||
self._now: NowFn = now_fn or time.monotonic
|
||||
self._sleep: SleepFn = sleep_fn or asyncio.sleep
|
||||
|
||||
# начальное состояние: полный бак
|
||||
self._tokens: float = self.capacity
|
||||
self._updated_at: float = self._now()
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def acquire(self) -> None:
|
||||
"""
|
||||
Подождать, пока не будет доступен хотя бы один токен.
|
||||
Важно: ожидание (sleep) происходит ВНЕ lock'а, чтобы несколько корутин
|
||||
могли "делить" ожидание.
|
||||
"""
|
||||
refill_rate_per_second = self.rate / self.per
|
||||
|
||||
while True:
|
||||
async with self._lock:
|
||||
now = self._now()
|
||||
elapsed = now - self._updated_at
|
||||
|
||||
if elapsed > 0:
|
||||
self._tokens = min(
|
||||
self.capacity,
|
||||
self._tokens + elapsed * refill_rate_per_second,
|
||||
)
|
||||
self._updated_at = now
|
||||
|
||||
if self._tokens >= 1.0:
|
||||
self._tokens -= 1.0
|
||||
return
|
||||
|
||||
# токенов нет — считаем, сколько нужно ждать
|
||||
missing = 1.0 - self._tokens
|
||||
wait_for = missing / refill_rate_per_second
|
||||
if wait_for <= 0:
|
||||
wait_for = 0.01
|
||||
|
||||
# спим уже без lock'а, чтобы другие могли проверить состояние
|
||||
await self._sleep(wait_for)
|
||||
|
||||
|
||||
def wrap_with_rate_limiter(
|
||||
limiter: RateLimiter,
|
||||
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
|
||||
"""
|
||||
Декоратор для функций, которые делают запросы к внешнему API.
|
||||
|
||||
Пример:
|
||||
limiter = TokenBucketRateLimiter(rate=30, per=60.0)
|
||||
|
||||
@wrap_with_rate_limiter(limiter)
|
||||
async def fetch_something(...):
|
||||
...
|
||||
"""
|
||||
|
||||
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
|
||||
async def wrapper(*args, **kwargs) -> T:
|
||||
await limiter.acquire()
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
# Глобальные инстансы per-process для источников
|
||||
ANILIST_RATE_LIMITER = TokenBucketRateLimiter(rate=30, per=60.0)
|
||||
SHIKIMORI_RATE_LIMITER = TokenBucketRateLimiter(rate=90, per=60.0)
|
||||
Loading…
Add table
Add a link
Reference in a new issue