Coverage for src/taipanstack/resilience/retry.py: 100%
193 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Retry logic with exponential backoff.
4Provides decorators for automatic retry of failing operations
5with configurable backoff strategies. Compatible with any
6Python framework (sync and async).
7"""
9import asyncio
10import functools
11import inspect
12import logging
13import math
14import secrets
15import time
16from collections.abc import Awaitable, Callable
17from dataclasses import dataclass
18from types import TracebackType
19from typing import NoReturn, ParamSpec, Protocol, TypeVar, cast, overload
21from taipanstack.core.result import Err
23P = ParamSpec("P")
24R = TypeVar("R")
27class RetryDecorator(Protocol):
28 """Protocol for the retry decorator."""
30 @overload
31 def __call__(self, func: Callable[P, R]) -> Callable[P, R]: ...
33 @overload
34 def __call__(
35 self, func: Callable[P, Awaitable[R]]
36 ) -> Callable[P, Awaitable[R]]: ...
39logger = logging.getLogger("taipanstack.resilience.retry")
41try:
42 import structlog as _structlog
44 _structlog_logger = _structlog.get_logger("taipanstack.resilience.retry")
45 _HAS_STRUCTLOG = True
46except ImportError:
47 _structlog_logger = None
48 _HAS_STRUCTLOG = False
51def _validate_finite_or_default(
52 obj: object, attr_name: str, default_val: float | int
53) -> None:
54 """Validate that an attribute is finite, falling back to a default."""
55 try:
56 val = cast(float | int, getattr(obj, attr_name))
57 if not math.isfinite(val):
58 raise ValueError(f"{attr_name} must be a finite number")
59 except TypeError:
60 object.__setattr__(obj, attr_name, default_val)
63@dataclass(frozen=True)
64class RetryConfig:
65 """Configuration for retry behavior.
67 Attributes:
68 max_attempts: Maximum number of retry attempts.
69 initial_delay: Initial delay between retries in seconds.
70 max_delay: Maximum delay between retries.
71 exponential_base: Base for exponential backoff (2 = double each time).
72 jitter: Whether to add random jitter to delays.
73 jitter_factor: Maximum jitter as fraction of delay (0.1 = 10%).
74 log_retries: Whether to emit standard log messages.
75 on_retry: Optional callback invoked on each retry.
77 """
79 max_attempts: int = 3
80 initial_delay: float = 1.0
81 max_delay: float = 60.0
82 exponential_base: float = 2.0
83 jitter: bool = True
84 jitter_factor: float = 0.1
85 log_retries: bool = True
86 on_retry: Callable[[int, int, Exception, float], None] | None = None
88 def __post_init__(self) -> None:
89 """Validate configuration parameters."""
90 _validate_finite_or_default(self, "max_attempts", 3)
91 _validate_finite_or_default(self, "initial_delay", 1.0)
92 _validate_finite_or_default(self, "max_delay", 60.0)
93 _validate_finite_or_default(self, "exponential_base", 2.0)
94 _validate_finite_or_default(self, "jitter_factor", 0.1)
97class RetryError(Exception):
98 """Raised when all retry attempts have failed."""
100 def __init__(
101 self,
102 message: str,
103 attempts: int,
104 last_exception: Exception | None = None,
105 ) -> None:
106 """Initialize RetryError.
108 Args:
109 message: Description of the retry failure.
110 attempts: Number of attempts made.
111 last_exception: The last exception that was raised.
113 """
114 self.attempts = attempts
115 self.last_exception = last_exception
116 super().__init__(message)
119def _calculate_base_delay(attempt: int, config: RetryConfig) -> float:
120 """Calculate base delay with exponential backoff."""
121 safe_attempt = max(1, attempt)
122 try:
123 delay = config.initial_delay * (config.exponential_base ** (safe_attempt - 1))
124 if not math.isfinite(delay):
125 delay = config.max_delay
126 except (OverflowError, TypeError):
127 delay = config.max_delay
129 try:
130 if not math.isfinite(delay):
131 delay = 0.0
132 return min(delay, config.max_delay)
133 except TypeError:
134 return 0.0
137def _apply_jitter(delay: float, config: RetryConfig) -> float:
138 """Apply jitter to delay."""
139 if not config.jitter or not math.isfinite(delay):
140 return delay
142 try:
143 jitter_amount = delay * config.jitter_factor
144 if math.isfinite(jitter_amount):
145 try:
146 delay += secrets.SystemRandom().uniform(-jitter_amount, jitter_amount)
147 except Exception as e:
148 logger.warning("Failed to add jitter to delay: %s", str(e))
149 except (TypeError, OverflowError, ValueError, Exception) as e:
150 logger.warning("Failed to add jitter to delay due to mutation: %s", str(e))
152 return delay
155def calculate_delay(
156 attempt: int,
157 config: RetryConfig,
158) -> float:
159 """Calculate delay before next retry.
161 Args:
162 attempt: Current attempt number (1-indexed).
163 config: Retry configuration.
165 Returns:
166 Delay in seconds before next retry.
168 """
169 delay = _calculate_base_delay(attempt, config)
170 delay = _apply_jitter(delay, config)
172 if not math.isfinite(delay) or delay < 0:
173 return 0.0
175 return delay
178def _log_retry_callback_failure(func_name: str, e: Exception) -> None:
179 """Log a failure during the retry callback execution."""
180 if _HAS_STRUCTLOG and _structlog_logger is not None:
181 _structlog_logger.error(
182 "retry_callback_failed",
183 function=func_name,
184 error=str(e),
185 )
186 else:
187 logger.error(
188 "Retry callback failed for %s: %s",
189 func_name,
190 str(e),
191 )
194def _log_retry_attempt_fallback(
195 func_name: str,
196 attempt: int,
197 exc: Exception,
198 delay: float,
199 config: RetryConfig,
200) -> None:
201 """Log the retry attempt if no callback is provided."""
202 if _HAS_STRUCTLOG and _structlog_logger is not None: # pragma: no branch
203 _structlog_logger.warning(
204 "retry_attempted",
205 function=func_name,
206 attempt=attempt,
207 max_attempts=config.max_attempts,
208 error=str(exc),
209 delay_seconds=round(delay, 3),
210 )
213def _invoke_retry_callback(
214 func_name: str,
215 attempt: int,
216 exc: Exception,
217 delay: float,
218 config: RetryConfig,
219) -> None:
220 """Invoke the retry callback if set, or emit structured log.
222 Args:
223 func_name: Name of the retried function.
224 attempt: Current attempt number.
225 exc: The exception that triggered the retry.
226 delay: Delay in seconds before the next attempt.
227 config: Retry configuration.
229 """
230 if config.on_retry is not None:
231 try:
232 config.on_retry(attempt, config.max_attempts, exc, delay)
233 except Exception as e:
234 _log_retry_callback_failure(func_name, e)
235 else:
236 _log_retry_attempt_fallback(func_name, attempt, exc, delay, config)
239def _log_retry_attempt(
240 func_name: str,
241 attempt: int,
242 exc: Exception,
243 delay: float,
244 config: RetryConfig,
245) -> None:
246 """Log a retry attempt via callback, structlog, or stdlib logger.
248 Args:
249 func_name: Name of the retried function.
250 attempt: Current attempt number.
251 exc: The exception that triggered the retry.
252 delay: Delay in seconds before the next attempt.
253 config: Retry configuration.
255 """
256 if config.log_retries:
257 logger.info(
258 "Attempt %d/%d failed for %s: %s. Retrying in %.2f seconds...",
259 attempt,
260 config.max_attempts,
261 func_name,
262 str(exc),
263 delay,
264 )
266 _invoke_retry_callback(func_name, attempt, exc, delay, config)
269def _log_all_failed(
270 func_name: str,
271 exc: Exception,
272 config: RetryConfig,
273) -> None:
274 """Log when all retry attempts have been exhausted.
276 Args:
277 func_name: Name of the retried function.
278 exc: The last exception raised.
279 config: Retry configuration.
281 """
282 if config.log_retries:
283 logger.warning(
284 "All %d attempts failed for %s: %s",
285 config.max_attempts,
286 func_name,
287 str(exc),
288 )
291def _raise_retry_error(
292 func_name: str,
293 max_attempts: int,
294 reraise: bool,
295 last_exception: Exception | None,
296) -> NoReturn:
297 """Raise a RetryError after all attempts fail.
299 Args:
300 func_name: Name of the retried function.
301 max_attempts: Number of attempts made.
302 reraise: Whether to reraise the original exception.
303 last_exception: The last exception that was raised.
305 Raises:
306 RetryError: The wrapped or unwrapped exception.
308 """
309 if reraise and last_exception is not None:
310 raise RetryError(
311 f"All {max_attempts} attempts failed for {func_name}",
312 attempts=max_attempts,
313 last_exception=last_exception,
314 ) from last_exception
316 raise RetryError(
317 f"All {max_attempts} attempts failed for {func_name}",
318 attempts=max_attempts,
319 last_exception=last_exception,
320 )
323def retry(
324 *,
325 max_attempts: int = 3,
326 initial_delay: float = 1.0,
327 max_delay: float = 60.0,
328 exponential_base: float = 2.0,
329 jitter: bool = True,
330 on: tuple[type[Exception], ...] = (Exception,),
331 reraise: bool = True,
332 log_retries: bool = True,
333 on_retry: Callable[[int, int, Exception, float], None] | None = None,
334) -> RetryDecorator:
335 """Retry a sync or async function with exponential backoff.
337 Automatically retries the decorated function when specified
338 exceptions are raised, with configurable backoff strategy.
339 Detects coroutine functions and preserves their async nature.
341 Args:
342 max_attempts: Maximum number of retry attempts.
343 initial_delay: Initial delay between retries in seconds.
344 max_delay: Maximum delay between retries.
345 exponential_base: Base for exponential backoff.
346 jitter: Whether to add random jitter to delays.
347 on: Exception types to retry on.
348 reraise: Whether to reraise the last exception on failure.
349 log_retries: Whether to log retry attempts.
350 on_retry: Optional callback invoked on each retry with
351 (attempt, max_attempts, exception, delay). Useful for
352 custom monitoring or metrics collection.
354 Returns:
355 Decorated function with retry logic.
357 Example:
358 >>> @retry(max_attempts=3, on=(ConnectionError, TimeoutError))
359 ... def fetch_data(url: str) -> dict:
360 ... return requests.get(url, timeout=10).json()
362 >>> @retry(max_attempts=3, on_retry=lambda a, m, e, d: print(f"Retry {a}/{m}"))
363 ... def fragile_operation() -> str:
364 ... return do_something()
366 """
367 config = RetryConfig(
368 max_attempts=max_attempts,
369 initial_delay=initial_delay,
370 max_delay=max_delay,
371 exponential_base=exponential_base,
372 jitter=jitter,
373 log_retries=log_retries,
374 on_retry=on_retry,
375 )
377 def decorator(
378 func: Callable[P, R] | Callable[P, Awaitable[R]],
379 ) -> Callable[P, R] | Callable[P, Awaitable[R]]:
380 if inspect.iscoroutinefunction(func):
381 func_coro = cast(Callable[P, Awaitable[R]], func)
383 @functools.wraps(func_coro)
384 async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
385 last_exception: Exception | None = None
386 last_result: R | None = None
388 for attempt in range(1, max_attempts + 1): # pragma: no branch
389 last_result = None
390 try:
391 last_result = await func_coro(*args, **kwargs)
392 if isinstance(last_result, Err):
393 err_val = last_result.unwrap_err()
394 if isinstance(err_val, on):
395 raise err_val
396 return last_result
397 except on as e:
398 last_exception = e
400 if attempt == max_attempts:
401 _log_all_failed(
402 func_coro.__name__,
403 e,
404 config,
405 )
406 break
408 delay = calculate_delay(attempt, config)
409 _log_retry_attempt(
410 func_coro.__name__,
411 attempt,
412 e,
413 delay,
414 config,
415 )
416 await asyncio.sleep(min(delay, 3600.0))
418 if last_result is not None and isinstance(last_result, Err):
419 return cast(R, last_result)
420 _raise_retry_error(
421 func_coro.__name__,
422 max_attempts,
423 reraise,
424 last_exception,
425 )
427 return async_wrapper
429 func_sync = cast(Callable[P, R], func)
431 @functools.wraps(func_sync)
432 def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
433 last_exception: Exception | None = None
434 last_result: R | None = None
436 for attempt in range(1, max_attempts + 1): # pragma: no branch
437 last_result = None
438 try:
439 last_result = func_sync(*args, **kwargs)
440 if isinstance(last_result, Err):
441 err_val = last_result.unwrap_err()
442 if isinstance(err_val, on):
443 raise err_val
444 return last_result
445 except on as e:
446 last_exception = e
448 if attempt == max_attempts:
449 _log_all_failed(
450 func_sync.__name__,
451 e,
452 config,
453 )
454 break
456 # Calculate delay and wait
457 delay = calculate_delay(attempt, config)
458 _log_retry_attempt(
459 func_sync.__name__,
460 attempt,
461 e,
462 delay,
463 config,
464 )
465 time.sleep(min(delay, 3600.0))
467 if last_result is not None and isinstance(last_result, Err):
468 return cast(R, last_result)
469 _raise_retry_error(
470 func_sync.__name__,
471 max_attempts,
472 reraise,
473 last_exception,
474 )
476 return wrapper
478 return cast(RetryDecorator, decorator)
481def retry_on_exception(
482 exception_types: tuple[type[Exception], ...],
483 max_attempts: int = 3,
484) -> RetryDecorator:
485 """Retry on specific exceptions.
487 A simpler alternative to the full retry decorator when you
488 just need basic retry functionality.
490 Args:
491 exception_types: Exception types to retry on.
492 max_attempts: Maximum number of attempts.
494 Returns:
495 Decorated function with retry logic.
497 Example:
498 >>> @retry_on_exception((ValueError,), max_attempts=2)
499 ... def parse_data(data: str) -> dict:
500 ... return json.loads(data)
502 """
503 return retry(
504 max_attempts=max_attempts,
505 on=exception_types,
506 jitter=False,
507 log_retries=False,
508 )
511class Retrier:
512 """Context manager for retry logic.
514 Provides a context manager interface for retry logic when
515 decorators are not suitable.
517 Example:
518 >>> retrier = Retrier(max_attempts=3, on=(ConnectionError,))
519 >>> with retrier:
520 ... result = some_operation()
522 """
524 def __init__(
525 self,
526 *,
527 max_attempts: int = 3,
528 initial_delay: float = 1.0,
529 max_delay: float = 60.0,
530 on: tuple[type[Exception], ...] = (Exception,),
531 ) -> None:
532 """Initialize Retrier.
534 Args:
535 max_attempts: Maximum retry attempts.
536 initial_delay: Initial delay between retries.
537 max_delay: Maximum delay between retries.
538 on: Exception types to retry on.
540 """
541 self.config = RetryConfig(
542 max_attempts=max_attempts,
543 initial_delay=initial_delay,
544 max_delay=max_delay,
545 )
546 self.exception_types = on
547 self.attempt = 0
548 self.last_exception: Exception | None = None
550 def __enter__(self) -> "Retrier":
551 """Enter the retry context."""
552 return self
554 def __exit__(
555 self,
556 exc_type: type[BaseException] | None,
557 exc_val: BaseException | None,
558 _exc_tb: TracebackType | None,
559 ) -> bool:
560 """Exit the retry context.
562 Returns True to suppress the exception if we should retry,
563 False to let it propagate.
564 """
565 if exc_type is None:
566 return False # No exception, exit normally
568 if not issubclass(exc_type, self.exception_types):
569 return False # Exception type not in retry list
571 # Safe cast: issubclass guard above ensures exc_val is Exception
572 self.last_exception = exc_val if isinstance(exc_val, Exception) else None
573 try:
574 if not math.isfinite(self.attempt):
575 return False
576 self.attempt += 1
577 except TypeError:
578 return False
580 if self.attempt >= self.config.max_attempts:
581 return False # Max attempts reached, propagate exception
583 # Calculate delay and wait
584 delay = calculate_delay(self.attempt, self.config)
585 time.sleep(min(delay, 3600.0))
587 return True # Suppress exception and retry