Coverage for src / taipanstack / resilience / adaptive / adaptive_retry.py: 100%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Adaptive Retry — learns optimal backoff from runtime outcomes. 

3 

4Tracks recent retry outcomes in a rolling window and computes 

5the best delay for each attempt level, favouring delays that 

6historically led to successful retries. 

7""" 

8 

9from __future__ import annotations 

10 

11import logging 

12import statistics 

13import threading 

14from collections import defaultdict, deque 

15from dataclasses import dataclass 

16 

17from taipanstack.resilience.retry import RetryConfig 

18 

19logger = logging.getLogger("taipanstack.resilience.adaptive.retry") 

20 

21 

@dataclass(frozen=True)
class RetryMetrics:
    """Immutable snapshot of adaptive retry metrics.

    Instances are produced by ``AdaptiveRetry.metrics``, which derives
    every field from the outcomes currently held in its rolling window
    (successful and failed outcomes alike).

    Attributes:
        success_rate: Fraction of tracked outcomes that succeeded
            (0.0 - 1.0). Reported as 1.0 when nothing is tracked yet.
        avg_delay: Mean ``elapsed`` value across all tracked outcomes,
            not only the successful ones. 0.0 when nothing is tracked.
        p95_delay: 95th percentile of ``elapsed`` across all tracked
            outcomes. 0.0 when nothing is tracked.
        total_outcomes: Number of outcomes currently tracked.

    """

    success_rate: float
    avg_delay: float
    p95_delay: float
    total_outcomes: int

38 

39 

@dataclass(frozen=True)
class _Outcome:
    """Immutable record of one retry outcome kept in the rolling window.

    Attributes:
        attempt: Attempt number the outcome belongs to.
        success: Whether that attempt succeeded.
        elapsed: Elapsed time reported together with the outcome.

    """

    # Field order is part of the positional constructor signature.
    attempt: int
    success: bool
    elapsed: float

47 

48 

class AdaptiveRetry:
    """Retry strategy that learns optimal delays from outcomes.

    Maintains per-attempt-level statistics and returns the delay
    that historically led to successful retries at that attempt
    level.

    Args:
        min_delay: Minimum delay in seconds.
        max_delay: Maximum delay in seconds.
        window_size: Number of recent outcomes to track.
        max_attempts: Default max attempts for ``to_retry_config()``.

    Example:
        >>> ar = AdaptiveRetry(min_delay=0.1, max_delay=30.0)
        >>> ar.record_outcome(attempt=1, success=True, elapsed=0.5)
        >>> delay = ar.get_delay(attempt=1)

    """

    def __init__(
        self,
        *,
        min_delay: float = 0.1,
        max_delay: float = 60.0,
        window_size: int = 50,
        max_attempts: int = 3,
    ) -> None:
        """Initialize the adaptive retry.

        Args:
            min_delay: Lower bound applied to every returned delay.
            max_delay: Upper bound applied to every returned delay.
            window_size: Maximum length of the overall outcome window
                and of each per-attempt window of successful delays.
            max_attempts: Default max attempts for ``to_retry_config()``.

        """
        self._min_delay = min_delay
        self._max_delay = max_delay
        self._max_attempts = max_attempts
        # Guards both containers below.
        self._lock = threading.Lock()

        # Single rolling window of the most recent outcomes, shared by
        # all attempt levels (used for the aggregate metrics).
        self._outcomes: deque[_Outcome] = deque(maxlen=window_size)
        # Per-attempt rolling windows of the elapsed values recorded
        # alongside successful outcomes (used by ``get_delay``).
        self._success_delays: dict[int, deque[float]] = defaultdict(
            lambda: deque(maxlen=window_size)
        )

    def record_outcome(
        self,
        attempt: int,
        success: bool,
        elapsed: float,
    ) -> None:
        """Record a retry outcome.

        The outcome is always appended to the overall window; its
        ``elapsed`` value is additionally stored in the per-attempt
        window only when ``success`` is true.

        Args:
            attempt: Attempt number (1-indexed).
            success: Whether the attempt succeeded.
            elapsed: Time elapsed before this attempt was made.

        """
        outcome = _Outcome(attempt=attempt, success=success, elapsed=elapsed)
        with self._lock:
            self._outcomes.append(outcome)
            if success:
                self._success_delays[attempt].append(elapsed)

    def get_delay(self, attempt: int) -> float:
        """Get the learned optimal delay for this attempt level.

        If there is historical data for this attempt level, returns
        the median of successful delays. Otherwise uses exponential
        backoff (``min_delay * 2 ** (attempt - 1)``). Either way the
        result is clamped with ``max(min_delay, min(value, max_delay))``.

        Args:
            attempt: Attempt number (1-indexed).

        Returns:
            Delay in seconds.

        """
        with self._lock:
            # ``.get`` (rather than indexing) avoids creating an empty
            # defaultdict entry for attempt levels never recorded.
            delays = list(self._success_delays.get(attempt, []))

        if delays:
            # Use median of successful delays as the optimal delay
            learned = statistics.median(delays)
            return max(self._min_delay, min(learned, self._max_delay))

        # Fallback: exponential backoff
        try:
            fallback_delay = self._min_delay * (2.0 ** (attempt - 1))
        except OverflowError:
            # Float exponentiation raises once the result leaves the
            # double range (attempt beyond roughly 1024). The backoff
            # would be capped at max_delay anyway, so cap it here
            # instead of propagating the error to the caller.
            fallback_delay = self._max_delay
        return max(self._min_delay, min(fallback_delay, self._max_delay))

    def to_retry_config(self) -> RetryConfig:
        """Export current state as a standard ``RetryConfig``.

        Uses ``get_delay(1)`` as the initial delay, i.e. the learned
        delay for attempt 1 if available, otherwise the fallback.
        Jitter is disabled in the exported configuration.

        Returns:
            A ``RetryConfig`` snapshot.

        """
        initial = self.get_delay(1)
        return RetryConfig(
            max_attempts=self._max_attempts,
            initial_delay=initial,
            max_delay=self._max_delay,
            jitter=False,
        )

    @property
    def metrics(self) -> RetryMetrics:
        """Snapshot of current adaptive retry metrics.

        All figures are computed from the outcomes currently in the
        rolling window, successful and failed alike. With an empty
        window the success rate is reported as 1.0 and both delay
        figures as 0.0.

        Returns:
            A ``RetryMetrics`` snapshot.

        """
        # Copy what is needed under the lock; do the arithmetic outside.
        with self._lock:
            total = len(self._outcomes)
            successes = sum(1 for o in self._outcomes if o.success)
            all_delays = [o.elapsed for o in self._outcomes]

        success_rate = successes / total if total > 0 else 1.0

        if all_delays:
            avg_delay = statistics.mean(all_delays)
            sorted_delays = sorted(all_delays)
            # Index-based percentile: position int(n * 0.95), clamped
            # to the last element.
            idx = int(len(sorted_delays) * 0.95)
            p95_delay = sorted_delays[min(idx, len(sorted_delays) - 1)]
        else:
            avg_delay = 0.0
            p95_delay = 0.0

        return RetryMetrics(
            success_rate=success_rate,
            avg_delay=avg_delay,
            p95_delay=p95_delay,
            total_outcomes=total,
        )