-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathretries.py
More file actions
482 lines (376 loc) · 18.2 KB
/
retries.py
File metadata and controls
482 lines (376 loc) · 18.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import random
import threading
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from typing import Any, Literal
from .exceptions import RetryError
from .interfaces import retries as retries_interface
from .interfaces.retries import RetryStrategy
# Retry modes accepted by ``RetryStrategyOptions.retry_mode``. The resolver maps
# "simple" to ``SimpleRetryStrategy`` and "standard" to ``StandardRetryStrategy``;
# any other value makes it raise ``ValueError``.
RetryStrategyType = Literal["simple", "standard"]
@dataclass(kw_only=True, frozen=True)
class RetryStrategyOptions:
    """Immutable, keyword-only settings from which a retry strategy is built.

    Being a frozen dataclass, instances are hashable and compare by value.
    :py:class:`RetryStrategyResolver` turns them into a concrete
    :py:class:`RetryStrategy`.
    """

    #: Strategy family to build: ``"simple"`` or ``"standard"``.
    retry_mode: RetryStrategyType = "standard"

    #: Total attempt budget (the initial attempt plus every retry). ``None`` keeps
    #: the default of whichever strategy ``retry_mode`` selects.
    max_attempts: int | None = None
class RetryStrategyResolver:
"""Retry strategy resolver that caches retry strategies based on configuration options.
This resolver caches retry strategy instances based on their configuration to reuse existing
instances of RetryStrategy with the same settings. Uses LRU cache for thread-safe caching.
"""
async def resolve_retry_strategy(
self, *, retry_strategy: RetryStrategy | RetryStrategyOptions | None
) -> RetryStrategy:
"""Resolve a retry strategy from the provided options, using cache when possible.
:param retry_strategy: An explicitly configured retry strategy or options for creating one.
"""
if isinstance(retry_strategy, RetryStrategy):
return retry_strategy
elif retry_strategy is None:
retry_strategy = RetryStrategyOptions()
elif not isinstance(retry_strategy, RetryStrategyOptions): # type: ignore[reportUnnecessaryIsInstance]
raise TypeError(
f"retry_strategy must be RetryStrategy, RetryStrategyOptions, or None, "
f"got {type(retry_strategy).__name__}"
)
return self._create_retry_strategy(
retry_strategy.retry_mode, retry_strategy.max_attempts
)
@lru_cache
def _create_retry_strategy(
self, retry_mode: RetryStrategyType, max_attempts: int | None
) -> RetryStrategy:
kwargs = {"max_attempts": max_attempts}
filtered_kwargs: dict[str, Any] = {
k: v for k, v in kwargs.items() if v is not None
}
match retry_mode:
case "simple":
return SimpleRetryStrategy(**filtered_kwargs)
case "standard":
return StandardRetryStrategy(**filtered_kwargs)
case _:
raise ValueError(f"Unknown retry mode: {retry_mode}")
class ExponentialBackoffJitterType(Enum):
    """Jitter mode for exponential backoff.

    For use with :py:class:`ExponentialRetryBackoffStrategy`. In the formulas below,
    ``random_between(a, b)`` denotes a random value between ``a`` and ``b`` drawn
    via the strategy's ``random`` callable.
    """

    DEFAULT = 1
    """Truncated binary exponential backoff delay with equal jitter:

    .. code-block:: python

        capped = min(max_backoff, backoff_scale_value * 2 ** (retry_attempt - 1))
        (capped / 2) + random_between(0, capped / 2)

    Also known as "Equal Jitter". Similar to :py:attr:`FULL`, but always keeps half
    of the backoff and jitters by a smaller amount.
    """

    NONE = 2
    """Truncated binary exponential backoff delay without jitter:

    .. code-block:: python

        min(max_backoff, backoff_scale_value * 2 ** (retry_attempt - 1))
    """

    FULL = 3
    """Truncated binary exponential backoff delay with full jitter:

    .. code-block:: python

        random_between(
            0,
            min(max_backoff, backoff_scale_value * 2 ** (retry_attempt - 1))
        )
    """

    DECORRELATED = 4
    """Truncated binary exponential backoff delay with decorrelated jitter:

    .. code-block:: python

        min(max_backoff, backoff_scale_value + random_between(0, t_(i-1) * 3))

    where ``t_(i-1)`` is the previously computed delay (initially
    ``backoff_scale_value``). Similar to :py:attr:`FULL`, but the upper bound grows
    with the previous delay rather than with ``retry_attempt``.
    """
class ExponentialRetryBackoffStrategy(retries_interface.RetryBackoffStrategy):
def __init__(
self,
*,
backoff_scale_value: float = 0.025,
max_backoff: float = 20,
jitter_type: ExponentialBackoffJitterType = ExponentialBackoffJitterType.DEFAULT,
random: Callable[[], float] = random.random,
):
"""Exponential backoff with optional jitter.
.. seealso:: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
:param backoff_scale_value: Factor that linearly adjusts returned backoff delay
values. See the methods ``_next_delay_*`` for the formula used to calculate the
delay for each jitter type. If set to ``None`` (the default), :py:attr:`random`
will be called to generate a value.
:param max_backoff: Upper limit for backoff delay values returned, in seconds.
:param jitter_type: Determines the formula used to apply jitter to the backoff
delay.
:param random: A callable that returns random numbers between ``0`` and ``1``.
Use the default ``random.random`` unless you require an alternate source of
randomness or a non-uniform distribution.
"""
self._backoff_scale_value = backoff_scale_value
self._max_backoff = max_backoff
self._jitter_type = jitter_type
self._random = random
self._previous_delay_seconds = self._backoff_scale_value
def compute_next_backoff_delay(self, retry_attempt: int) -> float:
"""Calculate timespan in seconds to delay before next retry.
See the methods ``_next_delay_*`` for the formula used to calculate the delay
for each jitter type for values of ``retry_attempt > 0``.
:param retry_attempt: The index of the retry attempt that is about to be made
after the delay. The initial attempt, before any retries, is index ``0``, and
will return a delay of ``0``. The first retry attempt after a failed initial
attempt is index ``1``, and so on.
"""
if retry_attempt == 0:
return 0
match self._jitter_type:
case ExponentialBackoffJitterType.NONE:
seconds = self._next_delay_no_jitter(retry_attempt=retry_attempt)
case ExponentialBackoffJitterType.DEFAULT:
seconds = self._next_delay_equal_jitter(retry_attempt=retry_attempt)
case ExponentialBackoffJitterType.FULL:
seconds = self._next_delay_full_jitter(retry_attempt=retry_attempt)
case ExponentialBackoffJitterType.DECORRELATED:
seconds = self._next_delay_decorrelated_jitter(
previous_delay=self._previous_delay_seconds
)
self._previous_delay_seconds = seconds
return seconds
def _jitter_free_uncapped_delay(self, retry_attempt: int) -> float:
"""The basic exponential delay without jitter or upper bound:
.. code-block:: python
backoff_scale_value * 2 ** (retry_attempt - 1)
"""
return self._backoff_scale_value * (2.0 ** (retry_attempt - 1))
def _next_delay_no_jitter(self, retry_attempt: int) -> float:
"""Calculates truncated binary exponential backoff delay without jitter.
Used when :py:var:`jitter_type` is :py:attr:`ExponentialBackoffJitterType.NONE`.
"""
no_jitter_delay = self._jitter_free_uncapped_delay(retry_attempt)
return min(no_jitter_delay, self._max_backoff)
def _next_delay_full_jitter(self, retry_attempt: int) -> float:
"""Calculates truncated binary exponential backoff delay with full jitter.
Used when :py:var:`jitter_type` is :py:attr:`ExponentialBackoffJitterType.FULL`.
"""
no_jitter_delay = self._jitter_free_uncapped_delay(retry_attempt)
return self._random() * min(no_jitter_delay, self._max_backoff)
def _next_delay_equal_jitter(self, retry_attempt: int) -> float:
"""Calculates truncated binary exponential backoff delay with equal jitter:
Used when :py:var:`jitter_type` is
:py:attr:`ExponentialBackoffJitterType.DEFAULT`.
"""
no_jitter_delay = self._jitter_free_uncapped_delay(retry_attempt)
return (self._random() * 0.5 + 0.5) * min(no_jitter_delay, self._max_backoff)
def _next_delay_decorrelated_jitter(self, previous_delay: float) -> float:
"""Calculates truncated binary exp. backoff delay with decorrelated jitter:
Used when :py:var:`jitter_type` is
:py:attr:`ExponentialBackoffJitterType.DECORRELATED`.
"""
return min(
self._backoff_scale_value + self._random() * previous_delay * 3,
self._max_backoff,
)
@dataclass(kw_only=True)
class SimpleRetryToken:
"""Basic retry token that stores only the attempt count and backoff strategy.
Retry tokens should always be obtained from an implementation of
:py:class:`retries_interface.RetryStrategy`.
"""
retry_count: int
"""Retry count is the total number of attempts minus the initial attempt."""
retry_delay: float
"""Delay in seconds to wait before the retry attempt."""
@property
def attempt_count(self) -> int:
"""The total number of attempts including the initial attempt and retries."""
return self.retry_count + 1
class SimpleRetryStrategy(retries_interface.RetryStrategy):
    def __init__(
        self,
        *,
        backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
        max_attempts: int = 5,
    ):
        """Basic retry strategy that simply invokes the given backoff strategy.

        :param backoff_strategy: The backoff strategy used by returned tokens to compute
            the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`
            with its default settings.
        :param max_attempts: Upper limit on total number of attempts made, including
            initial attempt and retries.
        :raises ValueError: If ``max_attempts`` is negative.
        """
        # Same validation as StandardRetryStrategy, so both retry modes reject a
        # nonsensical attempt budget instead of silently never retrying.
        if max_attempts < 0:
            raise ValueError(
                f"max_attempts must be a non-negative integer, got {max_attempts}"
            )
        self.backoff_strategy = (
            backoff_strategy
            if backoff_strategy is not None
            else ExponentialRetryBackoffStrategy()
        )
        self.max_attempts = max_attempts

    def acquire_initial_retry_token(
        self, *, token_scope: str | None = None
    ) -> SimpleRetryToken:
        """Create a base retry token for the start of a request.

        :param token_scope: This argument is ignored by this retry strategy.
        """
        retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
        return SimpleRetryToken(retry_count=0, retry_delay=retry_delay)

    def refresh_retry_token_for_retry(
        self,
        *,
        token_to_renew: retries_interface.RetryToken,
        error: Exception,
    ) -> SimpleRetryToken:
        """Replace an existing retry token from a failed attempt with a new token.

        A new token is returned as long as ``error`` reports itself retry-safe and
        the retry would not push the total attempt count past ``max_attempts``.

        :param token_to_renew: The token used for the previous failed attempt.
        :param error: The error that triggered the need for a retry.
        :raises RetryError: If the error is not retryable or no further retry
            attempts are allowed.
        """
        if not (
            isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe
        ):
            raise RetryError(f"Error is not retryable: {error}") from error
        retry_count = token_to_renew.retry_count + 1
        # This retry would be attempt number ``retry_count + 1``; refuse it if that
        # exceeds the budget.
        if retry_count >= self.max_attempts:
            raise RetryError(
                f"Reached maximum number of allowed attempts: {self.max_attempts}"
            ) from error
        retry_delay = self.backoff_strategy.compute_next_backoff_delay(retry_count)
        return SimpleRetryToken(retry_count=retry_count, retry_delay=retry_delay)

    def record_success(self, *, token: retries_interface.RetryToken) -> None:
        """Not used by this retry strategy."""

    def __deepcopy__(self, memo: Any) -> "SimpleRetryStrategy":
        # Deep copies share this instance rather than duplicating it.
        return self
class StandardRetryQuota:
    """Retry quota used by :py:class:`StandardRetryStrategy`.

    A pool of capacity: each retry draws from it and successes refill it, never
    beyond the initial capacity. Capacity changes are made while holding a lock.
    """

    INITIAL_RETRY_TOKENS: int = 500
    RETRY_COST: int = 5
    NO_RETRY_INCREMENT: int = 1
    TIMEOUT_RETRY_COST: int = 10

    def __init__(self, initial_capacity: int = INITIAL_RETRY_TOKENS):
        """Create a quota that starts full.

        :param initial_capacity: Both the starting and the maximum capacity.
        """
        self._max_capacity = self._available_capacity = initial_capacity
        self._lock = threading.Lock()

    def acquire(self, *, error: Exception) -> int:
        """Reserve capacity for one retry attempt triggered by ``error``.

        Errors flagged as timeouts cost ``TIMEOUT_RETRY_COST``; all others cost
        ``RETRY_COST``.

        :param error: The error that caused the retry.
        :returns: The amount of capacity reserved.
        :raises RetryError: If the remaining capacity cannot cover the cost.
        """
        timed_out = (
            isinstance(error, retries_interface.ErrorRetryInfo)
            and error.is_timeout_error
        )
        if timed_out:
            cost = self.TIMEOUT_RETRY_COST
        else:
            cost = self.RETRY_COST
        with self._lock:
            remaining = self._available_capacity - cost
            if remaining < 0:
                raise RetryError("Retry quota exceeded")
            self._available_capacity = remaining
        return cost

    def release(self, *, release_amount: int) -> None:
        """Return capacity to the quota, clamped so it never exceeds the maximum.

        :param release_amount: Capacity previously reserved by :py:meth:`acquire`. A
            value of ``0`` (no retry was needed) refunds ``NO_RETRY_INCREMENT``
            instead.
        """
        if release_amount == 0:
            refund = self.NO_RETRY_INCREMENT
        else:
            refund = release_amount
        if self._available_capacity == self._max_capacity:
            return  # already full; nothing to top up
        with self._lock:
            topped_up = self._available_capacity + refund
            self._available_capacity = min(topped_up, self._max_capacity)

    @property
    def available_capacity(self) -> int:
        """Capacity currently available for retries."""
        return self._available_capacity
@dataclass(kw_only=True)
class StandardRetryToken:
retry_count: int
"""Retry count is the total number of attempts minus the initial attempt."""
retry_delay: float
"""Delay in seconds to wait before the retry attempt."""
quota_acquired: int = 0
"""The amount of quota acquired for this retry attempt."""
class StandardRetryStrategy(retries_interface.RetryStrategy):
    def __init__(
        self,
        *,
        backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
        max_attempts: int = 3,
        retry_quota: StandardRetryQuota | None = None,
    ):
        """Standard retry strategy using truncated binary exponential backoff with full
        jitter.

        :param backoff_strategy: The backoff strategy used by returned tokens to compute
            the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`
            with ``backoff_scale_value=1``, ``max_backoff=20`` and full jitter.
        :param max_attempts: Upper limit on total number of attempts made, including
            initial attempt and retries.
        :param retry_quota: The retry quota to use for managing retry capacity. Defaults
            to a new :py:class:`StandardRetryQuota` instance.
        :raises ValueError: If ``max_attempts`` is negative.
        """
        if max_attempts < 0:
            raise ValueError(
                f"max_attempts must be a non-negative integer, got {max_attempts}"
            )
        if backoff_strategy is None:
            backoff_strategy = ExponentialRetryBackoffStrategy(
                backoff_scale_value=1,
                max_backoff=20,
                jitter_type=ExponentialBackoffJitterType.FULL,
            )
        self.backoff_strategy = backoff_strategy
        self.max_attempts = max_attempts
        self._retry_quota = (
            retry_quota if retry_quota is not None else StandardRetryQuota()
        )

    def acquire_initial_retry_token(
        self, *, token_scope: str | None = None
    ) -> StandardRetryToken:
        """Create a base retry token for the start of a request.

        :param token_scope: This argument is ignored by this retry strategy.
        """
        retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
        return StandardRetryToken(retry_count=0, retry_delay=retry_delay)

    def refresh_retry_token_for_retry(
        self,
        *,
        token_to_renew: retries_interface.RetryToken,
        error: Exception,
    ) -> StandardRetryToken:
        """Replace an existing retry token from a failed attempt with a new token.

        A new token is returned when all three of these hold:
        ``error`` reports itself retry-safe, the retry would not push the total
        attempt count past ``max_attempts``, and the retry quota can cover its cost.
        If the error carries a ``retry_after`` value, that value is used as the delay
        instead of the backoff strategy.

        :param token_to_renew: The token used for the previous failed attempt.
        :param error: The error that triggered the need for a retry.
        :raises TypeError: If ``token_to_renew`` is not a :py:class:`StandardRetryToken`.
        :raises RetryError: If no further retry attempts are allowed. The
            ``__cause__`` of the raised error is ``error``.
        """
        if not isinstance(token_to_renew, StandardRetryToken):
            raise TypeError(
                f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}"
            )
        if not (
            isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe
        ):
            raise RetryError(f"Error is not retryable: {error}") from error
        retry_count = token_to_renew.retry_count + 1
        # This retry would be attempt number ``retry_count + 1``.
        if retry_count >= self.max_attempts:
            raise RetryError(
                f"Reached maximum number of allowed attempts: {self.max_attempts}"
            ) from error
        # Acquire additional quota for this retry attempt. When the quota is exhausted,
        # chain its RetryError to the triggering error, as the other RetryErrors above
        # are chained, so callers can still see what originally failed.
        try:
            quota_acquired = self._retry_quota.acquire(error=error)
        except RetryError as quota_error:
            raise quota_error from error
        if error.retry_after is not None:
            retry_delay = error.retry_after
        else:
            retry_delay = self.backoff_strategy.compute_next_backoff_delay(retry_count)
        return StandardRetryToken(
            retry_count=retry_count,
            retry_delay=retry_delay,
            quota_acquired=quota_acquired,
        )

    def record_success(self, *, token: retries_interface.RetryToken) -> None:
        """Release retry quota back based on the amount consumed by the last retry.

        :param token: The token used for the previous successful attempt.
        :raises TypeError: If ``token`` is not a :py:class:`StandardRetryToken`.
        """
        if not isinstance(token, StandardRetryToken):
            raise TypeError(
                f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}"
            )
        self._retry_quota.release(release_amount=token.quota_acquired)

    def __deepcopy__(self, memo: Any) -> "StandardRetryStrategy":
        # Deep copies share this instance (and therefore its retry quota).
        return self