Coverage for src / taipanstack / resilience / retry.py: 100%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Retry logic with exponential backoff. 

3 

4Provides decorators for automatic retry of failing operations 

5with configurable backoff strategies. Compatible with any 

6Python framework (sync and async). 

7""" 

8 

9import asyncio 

10import functools 

11import inspect 

12import logging 

13import math 

14import secrets 

15import time 

16from collections.abc import Awaitable, Callable 

17from dataclasses import dataclass 

18from types import TracebackType 

19from typing import NoReturn, ParamSpec, Protocol, TypeVar, cast, overload 

20 

21from taipanstack.core.result import Err 

22 

23P = ParamSpec("P") 

24R = TypeVar("R") 

25 

26 

27class RetryDecorator(Protocol): 

28 """Protocol for the retry decorator.""" 

29 

30 @overload 

31 def __call__(self, func: Callable[P, R]) -> Callable[P, R]: ... 

32 

33 @overload 

34 def __call__( 

35 self, func: Callable[P, Awaitable[R]] 

36 ) -> Callable[P, Awaitable[R]]: ... 

37 

38 

# Standard-library logger used for the plain-text retry messages.
logger = logging.getLogger("taipanstack.resilience.retry")

# structlog is optional: when the import fails the module records that fact
# in _HAS_STRUCTLOG and leaves _structlog_logger as None.
try:
    import structlog as _structlog

    _structlog_logger = _structlog.get_logger("taipanstack.resilience.retry")
    _HAS_STRUCTLOG = True
except ImportError:
    _HAS_STRUCTLOG = False
    _structlog_logger = None

49 

50 

51def _validate_finite_or_default( 

52 obj: object, attr_name: str, default_val: float | int 

53) -> None: 

54 """Validate that an attribute is finite, falling back to a default.""" 

55 try: 

56 val = cast(float | int, getattr(obj, attr_name)) 

57 if not math.isfinite(val): 

58 raise ValueError(f"{attr_name} must be a finite number") 

59 except TypeError: 

60 object.__setattr__(obj, attr_name, default_val) 

61 

62 

@dataclass(frozen=True)
class RetryConfig:
    """Immutable bundle of settings that drive the retry helpers.

    Attributes:
        max_attempts: Maximum number of attempts.
        initial_delay: Delay before the first retry, in seconds.
        max_delay: Upper bound applied to the computed backoff delay.
        exponential_base: Multiplier applied per attempt (2 doubles the delay).
        jitter: Whether a random offset is added to each delay.
        jitter_factor: Largest jitter as a fraction of the delay (0.1 = 10%).
        log_retries: Whether standard-library log messages are emitted.
        on_retry: Optional callback invoked on each retry with
            ``(attempt, max_attempts, exception, delay)``.

    """

    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_factor: float = 0.1
    log_retries: bool = True
    on_retry: Callable[[int, int, Exception, float], None] | None = None

    def __post_init__(self) -> None:
        """Run the finiteness check on every numeric field.

        Each field is paired with the fallback handed to
        ``_validate_finite_or_default``; the fallbacks mirror the field
        defaults declared above.
        """
        numeric_fields: tuple[tuple[str, float | int], ...] = (
            ("max_attempts", 3),
            ("initial_delay", 1.0),
            ("max_delay", 60.0),
            ("exponential_base", 2.0),
            ("jitter_factor", 0.1),
        )
        for field_name, fallback in numeric_fields:
            _validate_finite_or_default(self, field_name, fallback)

95 

96 

97class RetryError(Exception): 

98 """Raised when all retry attempts have failed.""" 

99 

100 def __init__( 

101 self, 

102 message: str, 

103 attempts: int, 

104 last_exception: Exception | None = None, 

105 ) -> None: 

106 """Initialize RetryError. 

107 

108 Args: 

109 message: Description of the retry failure. 

110 attempts: Number of attempts made. 

111 last_exception: The last exception that was raised. 

112 

113 """ 

114 self.attempts = attempts 

115 self.last_exception = last_exception 

116 super().__init__(message) 

117 

118 

def _calculate_base_delay(attempt: int, config: RetryConfig) -> float:
    """Return the exponential backoff delay for ``attempt``, capped at max_delay.

    Attempts below 1 are treated as attempt 1. If the exponential term
    overflows, is not finite, or cannot be computed because of a type
    error, ``config.max_delay`` is used instead. If even that value is not
    finite or not comparable, the result is ``0.0``.
    """
    first_or_later = max(1, attempt)
    try:
        growth = config.exponential_base ** (first_or_later - 1)
        candidate = config.initial_delay * growth
        delay = candidate if math.isfinite(candidate) else config.max_delay
    except (OverflowError, TypeError):
        delay = config.max_delay

    # At this point ``delay`` may be max_delay itself, so it is re-checked
    # before the final clamp.
    try:
        bounded = delay if math.isfinite(delay) else 0.0
        return min(bounded, config.max_delay)
    except TypeError:
        return 0.0

135 

136 

def _apply_jitter(delay: float, config: RetryConfig) -> float:
    """Return ``delay`` shifted by a random offset when jitter is enabled.

    The offset is drawn uniformly from ``±delay * config.jitter_factor``
    using ``secrets.SystemRandom``. The delay is returned unchanged when
    jitter is disabled, when the delay or the jitter range is not finite,
    or when computing the offset fails (the failure is logged as a warning).
    """
    if not (config.jitter and math.isfinite(delay)):
        return delay

    try:
        spread = delay * config.jitter_factor
        if not math.isfinite(spread):
            return delay
        try:
            delay = delay + secrets.SystemRandom().uniform(-spread, spread)
        except Exception as draw_error:
            logger.warning("Failed to add jitter to delay: %s", str(draw_error))
    except Exception as range_error:
        logger.warning(
            "Failed to add jitter to delay due to mutation: %s",
            str(range_error),
        )

    return delay

153 

154 

def calculate_delay(
    attempt: int,
    config: RetryConfig,
) -> float:
    """Compute how long to wait before the next retry.

    The capped exponential backoff for ``attempt`` is computed first, then
    jitter is applied according to ``config``.

    Args:
        attempt: Current attempt number (1-indexed).
        config: Retry configuration.

    Returns:
        Delay in seconds; ``0.0`` if the computed value is negative or not
        finite.

    """
    jittered = _apply_jitter(_calculate_base_delay(attempt, config), config)

    if math.isfinite(jittered) and jittered >= 0:
        return jittered
    return 0.0

176 

177 

def _log_retry_callback_failure(func_name: str, e: Exception) -> None:
    """Report that the user-supplied retry callback raised.

    The error goes to the structlog logger when one is available and to the
    standard-library logger otherwise.
    """
    detail = str(e)

    if _HAS_STRUCTLOG and _structlog_logger is not None:
        _structlog_logger.error(
            "retry_callback_failed",
            function=func_name,
            error=detail,
        )
        return

    logger.error("Retry callback failed for %s: %s", func_name, detail)

192 

193 

def _log_retry_attempt_fallback(
    func_name: str,
    attempt: int,
    exc: Exception,
    delay: float,
    config: RetryConfig,
) -> None:
    """Emit a structured ``retry_attempted`` event.

    Nothing is logged here when structlog is not available.
    """
    if not _HAS_STRUCTLOG or _structlog_logger is None:  # pragma: no branch
        return

    event_fields = {
        "function": func_name,
        "attempt": attempt,
        "max_attempts": config.max_attempts,
        "error": str(exc),
        "delay_seconds": round(delay, 3),
    }
    _structlog_logger.warning("retry_attempted", **event_fields)

211 

212 

def _invoke_retry_callback(
    func_name: str,
    attempt: int,
    exc: Exception,
    delay: float,
    config: RetryConfig,
) -> None:
    """Notify ``config.on_retry`` about a retry, or fall back to logging.

    An exception raised by the callback is logged and swallowed so that it
    does not interrupt the retry sequence.

    Args:
        func_name: Name of the retried function.
        attempt: Current attempt number.
        exc: The exception that triggered the retry.
        delay: Delay in seconds before the next attempt.
        config: Retry configuration.

    """
    callback = config.on_retry
    if callback is None:
        _log_retry_attempt_fallback(func_name, attempt, exc, delay, config)
        return

    try:
        callback(attempt, config.max_attempts, exc, delay)
    except Exception as callback_error:
        _log_retry_callback_failure(func_name, callback_error)

237 

238 

def _log_retry_attempt(
    func_name: str,
    attempt: int,
    exc: Exception,
    delay: float,
    config: RetryConfig,
) -> None:
    """Record a failed attempt that is about to be retried.

    Writes an info message to the standard-library logger when
    ``config.log_retries`` is set, then always hands off to
    ``_invoke_retry_callback``.

    Args:
        func_name: Name of the retried function.
        attempt: Current attempt number.
        exc: The exception that triggered the retry.
        delay: Delay in seconds before the next attempt.
        config: Retry configuration.

    """
    if config.log_retries:
        message = "Attempt %d/%d failed for %s: %s. Retrying in %.2f seconds..."
        logger.info(
            message, attempt, config.max_attempts, func_name, str(exc), delay
        )

    _invoke_retry_callback(
        func_name=func_name,
        attempt=attempt,
        exc=exc,
        delay=delay,
        config=config,
    )

267 

268 

def _log_all_failed(
    func_name: str,
    exc: Exception,
    config: RetryConfig,
) -> None:
    """Warn that the retry budget is exhausted.

    Does nothing when ``config.log_retries`` is false.

    Args:
        func_name: Name of the retried function.
        exc: The last exception raised.
        config: Retry configuration.

    """
    if not config.log_retries:
        return

    logger.warning(
        "All %d attempts failed for %s: %s",
        config.max_attempts,
        func_name,
        str(exc),
    )

289 

290 

def _raise_retry_error(
    func_name: str,
    max_attempts: int,
    reraise: bool,
    last_exception: Exception | None,
) -> NoReturn:
    """Raise a ``RetryError`` describing the exhausted retry sequence.

    A ``RetryError`` is raised in every case. ``reraise`` only decides
    whether it is explicitly chained to the last exception.

    Args:
        func_name: Name of the retried function.
        max_attempts: Number of attempts made.
        reraise: When true and ``last_exception`` is set, chain the
            ``RetryError`` to it with ``raise ... from``.
        last_exception: The last exception that was raised, if any. It is
            stored on the ``RetryError`` regardless of ``reraise``.

    Raises:
        RetryError: Always.

    """
    failure = RetryError(
        f"All {max_attempts} attempts failed for {func_name}",
        attempts=max_attempts,
        last_exception=last_exception,
    )

    if reraise and last_exception is not None:
        raise failure from last_exception
    raise failure

321 

322 

def retry(
    *,
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    on: tuple[type[Exception], ...] = (Exception,),
    reraise: bool = True,
    log_retries: bool = True,
    on_retry: Callable[[int, int, Exception, float], None] | None = None,
) -> RetryDecorator:
    """Retry a sync or async function with exponential backoff.

    The decorated function is called up to ``max_attempts`` times. A call
    counts as failed when it raises one of the ``on`` exception types, or
    when it returns an ``Err`` whose ``unwrap_err()`` value is an instance
    of one of those types. Coroutine functions are detected and wrapped
    with an ``async`` wrapper; everything else gets a synchronous wrapper.

    When the attempts are exhausted, the wrapper returns the last ``Err``
    if the final attempt produced one, and otherwise raises ``RetryError``.

    Args:
        max_attempts: Maximum number of calls to the function. A value that
            ``RetryConfig`` replaces with its default is replaced here too.
        initial_delay: Initial delay between retries in seconds.
        max_delay: Maximum delay between retries.
        exponential_base: Base for exponential backoff.
        jitter: Whether to add random jitter to delays.
        on: Exception types to retry on.
        reraise: Whether the final ``RetryError`` is chained (``from``) to
            the last exception.
        log_retries: Whether to log retry attempts.
        on_retry: Optional callback invoked on each retry with
            (attempt, max_attempts, exception, delay). Useful for
            custom monitoring or metrics collection.

    Returns:
        Decorated function with retry logic.

    Raises:
        ValueError: If a numeric setting is not finite (raised by
            ``RetryConfig`` when the decorator is created).

    Example:
        >>> @retry(max_attempts=3, on=(ConnectionError, TimeoutError))
        ... def fetch_data(url: str) -> dict:
        ...     return requests.get(url, timeout=10).json()

        >>> @retry(max_attempts=3, on_retry=lambda a, m, e, d: print(f"Retry {a}/{m}"))
        ... def fragile_operation() -> str:
        ...     return do_something()

    """
    config = RetryConfig(
        max_attempts=max_attempts,
        initial_delay=initial_delay,
        max_delay=max_delay,
        exponential_base=exponential_base,
        jitter=jitter,
        log_retries=log_retries,
        on_retry=on_retry,
    )
    # Drive the loop from the validated config so the wrappers, the log
    # messages and the final RetryError all agree on the attempt count.
    attempts = config.max_attempts

    def decorator(
        func: Callable[P, R] | Callable[P, Awaitable[R]],
    ) -> Callable[P, R] | Callable[P, Awaitable[R]]:
        # functools.partial objects and callable instances have no __name__;
        # fall back to repr() so the failure path cannot raise AttributeError.
        func_name = getattr(func, "__name__", None) or repr(func)

        def next_pause(attempt: int, exc: Exception) -> float | None:
            """Log the failure; return seconds to sleep, or None if exhausted."""
            if attempt == attempts:
                _log_all_failed(func_name, exc, config)
                return None
            delay = calculate_delay(attempt, config)
            _log_retry_attempt(func_name, attempt, exc, delay, config)
            # Never sleep for more than an hour, whatever the config says.
            return min(delay, 3600.0)

        def finish(last_result: object, last_exception: Exception | None) -> R:
            """Return the final ``Err`` result, or raise ``RetryError``."""
            if isinstance(last_result, Err):
                return cast(R, last_result)
            _raise_retry_error(func_name, attempts, reraise, last_exception)

        if inspect.iscoroutinefunction(func):
            func_coro = cast(Callable[P, Awaitable[R]], func)

            @functools.wraps(func_coro)
            async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
                last_exception: Exception | None = None
                last_result: R | None = None

                for attempt in range(1, attempts + 1):  # pragma: no branch
                    last_result = None
                    try:
                        last_result = await func_coro(*args, **kwargs)
                        if isinstance(last_result, Err):
                            err_val = last_result.unwrap_err()
                            if isinstance(err_val, on):
                                # Route a retriable Err through the same
                                # handler as a raised exception.
                                raise err_val
                        return last_result
                    except on as e:
                        last_exception = e
                        pause = next_pause(attempt, e)
                        if pause is None:
                            break
                        await asyncio.sleep(pause)

                return finish(last_result, last_exception)

            return async_wrapper

        func_sync = cast(Callable[P, R], func)

        @functools.wraps(func_sync)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception: Exception | None = None
            last_result: R | None = None

            for attempt in range(1, attempts + 1):  # pragma: no branch
                last_result = None
                try:
                    last_result = func_sync(*args, **kwargs)
                    if isinstance(last_result, Err):
                        err_val = last_result.unwrap_err()
                        if isinstance(err_val, on):
                            # Route a retriable Err through the same handler
                            # as a raised exception.
                            raise err_val
                    return last_result
                except on as e:
                    last_exception = e
                    pause = next_pause(attempt, e)
                    if pause is None:
                        break
                    time.sleep(pause)

            return finish(last_result, last_exception)

        return wrapper

    return cast(RetryDecorator, decorator)

479 

480 

def retry_on_exception(
    exception_types: tuple[type[Exception], ...],
    max_attempts: int = 3,
) -> RetryDecorator:
    """Build a retry decorator for specific exceptions with quiet defaults.

    This is a shortcut for ``retry`` with jitter and retry logging turned
    off; all other settings keep the defaults of ``retry``.

    Args:
        exception_types: Exception types to retry on.
        max_attempts: Maximum number of attempts.

    Returns:
        Decorated function with retry logic.

    Example:
        >>> @retry_on_exception((ValueError,), max_attempts=2)
        ... def parse_data(data: str) -> dict:
        ...     return json.loads(data)

    """
    quiet_decorator = retry(
        on=exception_types,
        max_attempts=max_attempts,
        log_retries=False,
        jitter=False,
    )
    return quiet_decorator

509 

510 

class Retrier:
    """Context manager that counts failures and sleeps between attempts.

    On a matching exception ``__exit__`` increments ``attempt``, sleeps for
    the backoff delay and suppresses the exception. Once ``attempt`` reaches
    ``max_attempts`` the exception propagates instead. A ``with`` statement
    cannot run its own body again, so the body only runs again if the caller
    wraps the statement in a loop. Nothing in this class resets ``attempt``
    after construction.

    Example:
        >>> retrier = Retrier(max_attempts=3, on=(ConnectionError,))
        >>> while True:
        ...     with retrier:
        ...         result = some_operation()
        ...         break

    """

    def __init__(
        self,
        *,
        max_attempts: int = 3,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        on: tuple[type[Exception], ...] = (Exception,),
    ) -> None:
        """Set up the retry configuration and the failure counter.

        Args:
            max_attempts: Maximum retry attempts.
            initial_delay: Initial delay between retries.
            max_delay: Maximum delay between retries.
            on: Exception types to retry on.

        """
        self.config = RetryConfig(
            max_attempts=max_attempts,
            initial_delay=initial_delay,
            max_delay=max_delay,
        )
        self.last_exception: Exception | None = None
        self.attempt = 0
        self.exception_types = on

    def __enter__(self) -> "Retrier":
        """Return the retrier itself; no state changes on entry."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> bool:
        """Decide whether the exception leaving the block is suppressed.

        Returns:
            True when the exception matched, attempts remain, and the
            backoff sleep has completed; False in every other case.

        """
        # No exception, or one outside the retry list: do not interfere.
        matched = exc_type is not None and issubclass(
            exc_type, self.exception_types
        )
        if not matched:
            return False

        if isinstance(exc_val, Exception):
            self.last_exception = exc_val
        else:
            self.last_exception = None

        # Guard against ``attempt`` having been replaced by a non-finite or
        # non-numeric value from outside.
        try:
            if not math.isfinite(self.attempt):
                return False
            self.attempt = self.attempt + 1
        except TypeError:
            return False

        out_of_attempts = self.attempt >= self.config.max_attempts
        if out_of_attempts:
            return False

        # Sleep for the backoff delay, capped at one hour.
        pause = min(calculate_delay(self.attempt, self.config), 3600.0)
        time.sleep(pause)

        return True