diff --git a/telebot/util.py b/telebot/util.py index c0c4d2d3a..ffadfe8f5 100644 --- a/telebot/util.py +++ b/telebot/util.py @@ -638,6 +638,50 @@ def antiflood(function: Callable, *args, number_retries=5, **kwargs): return function(*args, **kwargs) +async def async_antiflood(function: Callable, *args, number_retries=5, **kwargs): + """ + Async counterpart to :func:`antiflood`. Awaits ``function(*args, **kwargs)`` + in a retry loop that honours Telegram's ``retry_after`` parameter on 429 + responses. Use with :obj:`telebot.async_telebot.AsyncTeleBot` calls. + + Example: + + .. code-block:: python3 + + from telebot.util import async_antiflood + + for chat_id in chat_id_list: + msg = await async_antiflood(bot.send_message, chat_id, text) + + :param function: The awaitable callable to invoke. Called fresh on each + attempt, so pass the function itself (for example ``bot.send_message``), + not a pre-awaited coroutine. + :type function: :obj:`Callable` + + :param number_retries: Total number of attempts. After ``number_retries - 1`` + retried 429 responses the final call is issued without the 429 guard, + so its exception will propagate. + :type number_retries: :obj:`int` + + :param args: Positional arguments forwarded to ``function``. + :param kwargs: Keyword arguments forwarded to ``function``. + + :return: Whatever ``function`` returns on the first successful call. + """ + from telebot.asyncio_helper import ApiTelegramException + import asyncio + + for _ in range(number_retries - 1): + try: + return await function(*args, **kwargs) + except ApiTelegramException as ex: + if ex.error_code == 429: + await asyncio.sleep(ex.result_json['parameters']['retry_after']) + else: + raise + return await function(*args, **kwargs) + + def parse_web_app_data(token: str, raw_init_data: str): """ Parses web app data. @@ -719,7 +763,7 @@ def extract_bot_id(token) -> Union[int, None]: "chunks", "generate_random_token", "pil_image_to_file", "is_command", "extract_command", "extract_arguments", "split_string", "smart_split", "escape", "user_link", "quick_markup", - "antiflood", "parse_web_app_data", "validate_web_app_data", + "antiflood", "async_antiflood", "parse_web_app_data", "validate_web_app_data", "or_set", "or_clear", "orify", "OrEvent", "per_thread", "webhook_google_functions", "validate_token", "extract_bot_id" ) diff --git a/tests/test_async_telebot.py b/tests/test_async_telebot.py index fae838a52..ceaa33fbe 100644 --- a/tests/test_async_telebot.py +++ b/tests/test_async_telebot.py @@ -74,3 +74,123 @@ async def driver(): assert bot._pending_tasks == set(), ( "Completed processing tasks must be discarded from _pending_tasks" ) + + +def _fake_429(retry_after: int): + from telebot.asyncio_helper import ApiTelegramException + + class _FakeResponse: + status = 429 + reason = "Too Many Requests" + + result_json = { + "ok": False, + "error_code": 429, + "description": "Too Many Requests", + "parameters": {"retry_after": retry_after}, + } + return ApiTelegramException("sendMessage", _FakeResponse(), result_json) + + +def test_async_antiflood_returns_result_without_retrying_when_ok(): + from telebot.util import async_antiflood + + calls = {"n": 0} + + async def ok(value): + calls["n"] += 1 + return value + + result = asyncio.run(async_antiflood(ok, "hello")) + + assert result == "hello" + assert calls["n"] == 1 + + +def test_async_antiflood_sleeps_retry_after_then_succeeds(): + from telebot.util import async_antiflood + + calls = {"n": 0} + + async def flaky(): + calls["n"] += 1 + if calls["n"] == 1: + raise _fake_429(retry_after=7) + return "ok" + + sleeps: list[float] = [] + + async def fake_sleep(seconds): + sleeps.append(seconds) + + original_sleep = asyncio.sleep + asyncio.sleep = fake_sleep # type: ignore[assignment] + try: + result = asyncio.run(async_antiflood(flaky)) + finally: + asyncio.sleep = original_sleep # type: ignore[assignment] + + assert result == "ok" + assert calls["n"] == 2 + assert sleeps == [7], "Must sleep exactly retry_after seconds between attempts" + + +def test_async_antiflood_propagates_non_429_immediately(): + from telebot.asyncio_helper import ApiTelegramException + from telebot.util import async_antiflood + + class _FakeResponse: + status = 400 + reason = "Bad Request" + + err = ApiTelegramException( + "sendMessage", + _FakeResponse(), + {"ok": False, "error_code": 400, "description": "Bad Request"}, + ) + + calls = {"n": 0} + + async def boom(): + calls["n"] += 1 + raise err + + try: + asyncio.run(async_antiflood(boom)) + except ApiTelegramException as raised: + assert raised is err + else: + raise AssertionError("Non-429 error should propagate") + + assert calls["n"] == 1, "Non-429 errors must not trigger a retry" + + +def test_async_antiflood_final_attempt_propagates_after_budget_exhausted(): + from telebot.asyncio_helper import ApiTelegramException + from telebot.util import async_antiflood + + calls = {"n": 0} + budget = 3 + + async def always_429(): + calls["n"] += 1 + raise _fake_429(retry_after=1) + + async def fake_sleep(_seconds): + return None + + original_sleep = asyncio.sleep + asyncio.sleep = fake_sleep # type: ignore[assignment] + try: + try: + asyncio.run(async_antiflood(always_429, number_retries=budget)) + except ApiTelegramException: + pass + else: + raise AssertionError("Final 429 must surface once retries are exhausted") + finally: + asyncio.sleep = original_sleep # type: ignore[assignment] + + assert calls["n"] == budget, ( + f"Expected {budget} total attempts, got {calls['n']}" + )