Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion telebot/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,50 @@ def antiflood(function: Callable, *args, number_retries=5, **kwargs):
return function(*args, **kwargs)


async def async_antiflood(function: Callable, *args, number_retries=5, **kwargs):
"""
Async counterpart to :func:`antiflood`. Awaits ``function(*args, **kwargs)``
in a retry loop that honours Telegram's ``retry_after`` parameter on 429
responses. Use with :obj:`telebot.async_telebot.AsyncTeleBot` calls.

Example:

.. code-block:: python3

from telebot.util import async_antiflood

for chat_id in chat_id_list:
msg = await async_antiflood(bot.send_message, chat_id, text)

:param function: The awaitable callable to invoke. Called fresh on each
attempt, so pass the function itself (for example ``bot.send_message``),
not a pre-awaited coroutine.
:type function: :obj:`Callable`

:param number_retries: Total number of attempts. After ``number_retries - 1``
retried 429 responses the final call is issued without the 429 guard,
so its exception will propagate.
:type number_retries: :obj:`int`

:param args: Positional arguments forwarded to ``function``.
:param kwargs: Keyword arguments forwarded to ``function``.

:return: Whatever ``function`` returns on the first successful call.
"""
from telebot.asyncio_helper import ApiTelegramException
import asyncio

for _ in range(number_retries - 1):
try:
return await function(*args, **kwargs)
except ApiTelegramException as ex:
if ex.error_code == 429:
await asyncio.sleep(ex.result_json['parameters']['retry_after'])
else:
raise
return await function(*args, **kwargs)


def parse_web_app_data(token: str, raw_init_data: str):
"""
Parses web app data.
Expand Down Expand Up @@ -719,7 +763,7 @@ def extract_bot_id(token) -> Union[int, None]:
"chunks", "generate_random_token", "pil_image_to_file",
"is_command", "extract_command", "extract_arguments",
"split_string", "smart_split", "escape", "user_link", "quick_markup",
"antiflood", "parse_web_app_data", "validate_web_app_data",
"antiflood", "async_antiflood", "parse_web_app_data", "validate_web_app_data",
"or_set", "or_clear", "orify", "OrEvent", "per_thread",
"webhook_google_functions", "validate_token", "extract_bot_id"
)
120 changes: 120 additions & 0 deletions tests/test_async_telebot.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,123 @@ async def driver():
assert bot._pending_tasks == set(), (
"Completed processing tasks must be discarded from _pending_tasks"
)


def _fake_429(retry_after: int):
from telebot.asyncio_helper import ApiTelegramException

class _FakeResponse:
status = 429
reason = "Too Many Requests"

result_json = {
"ok": False,
"error_code": 429,
"description": "Too Many Requests",
"parameters": {"retry_after": retry_after},
}
return ApiTelegramException("sendMessage", _FakeResponse(), result_json)


def test_async_antiflood_returns_result_without_retrying_when_ok():
from telebot.util import async_antiflood

calls = {"n": 0}

async def ok(value):
calls["n"] += 1
return value

result = asyncio.run(async_antiflood(ok, "hello"))

assert result == "hello"
assert calls["n"] == 1


def test_async_antiflood_sleeps_retry_after_then_succeeds():
from telebot.util import async_antiflood

calls = {"n": 0}

async def flaky():
calls["n"] += 1
if calls["n"] == 1:
raise _fake_429(retry_after=7)
return "ok"

sleeps: list[float] = []

async def fake_sleep(seconds):
sleeps.append(seconds)

original_sleep = asyncio.sleep
asyncio.sleep = fake_sleep # type: ignore[assignment]
try:
result = asyncio.run(async_antiflood(flaky))
finally:
asyncio.sleep = original_sleep # type: ignore[assignment]

assert result == "ok"
assert calls["n"] == 2
assert sleeps == [7], "Must sleep exactly retry_after seconds between attempts"


def test_async_antiflood_propagates_non_429_immediately():
from telebot.asyncio_helper import ApiTelegramException
from telebot.util import async_antiflood

class _FakeResponse:
status = 400
reason = "Bad Request"

err = ApiTelegramException(
"sendMessage",
_FakeResponse(),
{"ok": False, "error_code": 400, "description": "Bad Request"},
)

calls = {"n": 0}

async def boom():
calls["n"] += 1
raise err

try:
asyncio.run(async_antiflood(boom))
except ApiTelegramException as raised:
assert raised is err
else:
raise AssertionError("Non-429 error should propagate")

assert calls["n"] == 1, "Non-429 errors must not trigger a retry"


def test_async_antiflood_final_attempt_propagates_after_budget_exhausted():
from telebot.asyncio_helper import ApiTelegramException
from telebot.util import async_antiflood

calls = {"n": 0}
budget = 3

async def always_429():
calls["n"] += 1
raise _fake_429(retry_after=1)

async def fake_sleep(_seconds):
return None

original_sleep = asyncio.sleep
asyncio.sleep = fake_sleep # type: ignore[assignment]
try:
try:
asyncio.run(async_antiflood(always_429, number_retries=budget))
except ApiTelegramException:
pass
else:
raise AssertionError("Final 429 must surface once retries are exhausted")
finally:
asyncio.sleep = original_sleep # type: ignore[assignment]

assert calls["n"] == budget, (
f"Expected {budget} total attempts, got {calls['n']}"
)
Loading