Coverage for src / taipanstack / resilience / adaptive / adaptive_retry.py: 100%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Adaptive Retry — learns optimal backoff from runtime outcomes.
4Tracks recent retry outcomes in a rolling window and computes
5the best delay for each attempt level, favouring delays that
6historically led to successful retries.
7"""
9from __future__ import annotations
11import logging
12import statistics
13import threading
14from collections import defaultdict, deque
15from dataclasses import dataclass
17from taipanstack.resilience.retry import RetryConfig
19logger = logging.getLogger("taipanstack.resilience.adaptive.retry")
@dataclass(frozen=True)
class RetryMetrics:
    """Immutable point-in-time summary of adaptive retry behaviour.

    Attributes:
        success_rate: Fraction of tracked outcomes that succeeded,
            in the range 0.0 - 1.0.
        avg_delay: Mean elapsed time over the tracked outcomes.
        p95_delay: 95th percentile of the tracked elapsed times.
        total_outcomes: Number of outcomes the snapshot was built from.

    """

    success_rate: float
    avg_delay: float
    p95_delay: float
    total_outcomes: int
@dataclass(frozen=True)
class _Outcome:
    """Immutable record describing how one retry attempt turned out.

    Attributes:
        attempt: Attempt number the outcome belongs to.
        success: ``True`` when the attempt succeeded.
        elapsed: Elapsed time, in seconds, associated with the attempt.

    """

    attempt: int
    success: bool
    elapsed: float
class AdaptiveRetry:
    """Retry strategy that learns optimal delays from outcomes.

    Maintains per-attempt-level statistics and returns the delay
    that historically led to successful retries at that attempt
    level. When nothing has been learned for an attempt level yet,
    a bounded exponential backoff is used instead.

    Args:
        min_delay: Minimum delay in seconds.
        max_delay: Maximum delay in seconds.
        window_size: Number of recent outcomes to track.
        max_attempts: Default max attempts for ``to_retry_config()``.

    Example:
        >>> ar = AdaptiveRetry(min_delay=0.1, max_delay=30.0)
        >>> ar.record_outcome(attempt=1, success=True, elapsed=0.5)
        >>> delay = ar.get_delay(attempt=1)

    """

    # ``2.0 ** 1024`` raises ``OverflowError``; 1023 is the largest exponent
    # that still yields a finite float. Capping here keeps the exponential
    # fallback safe for arbitrarily large attempt numbers.
    _MAX_BACKOFF_EXPONENT = 1023

    def __init__(
        self,
        *,
        min_delay: float = 0.1,
        max_delay: float = 60.0,
        window_size: int = 50,
        max_attempts: int = 3,
    ) -> None:
        """Initialize the adaptive retry.

        Args:
            min_delay: Minimum delay.
            max_delay: Maximum delay.
            window_size: Rolling window size.
            max_attempts: Default max attempts.

        """
        self._min_delay = min_delay
        self._max_delay = max_delay
        self._max_attempts = max_attempts
        self._lock = threading.Lock()

        # Single rolling window holding the most recent outcomes of every
        # attempt level; the oldest entry is evicted once it is full.
        self._outcomes: deque[_Outcome] = deque(maxlen=window_size)
        # Per-attempt successful delays, each bounded by the same window size.
        self._success_delays: dict[int, deque[float]] = defaultdict(
            lambda: deque(maxlen=window_size)
        )

    def record_outcome(
        self,
        attempt: int,
        success: bool,
        elapsed: float,
    ) -> None:
        """Record a retry outcome.

        Every outcome is added to the rolling window; successful ones are
        additionally remembered under their attempt level so that
        ``get_delay`` can learn from them.

        Args:
            attempt: Attempt number (1-indexed).
            success: Whether the attempt succeeded.
            elapsed: Time elapsed before this attempt was made.

        """
        outcome = _Outcome(attempt=attempt, success=success, elapsed=elapsed)
        with self._lock:
            self._outcomes.append(outcome)
            if success:
                self._success_delays[attempt].append(elapsed)

    def get_delay(self, attempt: int) -> float:
        """Get the learned optimal delay for this attempt level.

        If there is historical data for this attempt level, returns
        the median of successful delays. Otherwise uses exponential
        backoff (``min_delay * 2 ** (attempt - 1)``). Either way the
        result is clamped to the configured bounds.

        Args:
            attempt: Attempt number (1-indexed).

        Returns:
            Delay in seconds.

        """
        with self._lock:
            # ``.get`` (not indexing) so that querying an unseen attempt
            # level does not create an empty entry in the defaultdict.
            delays = list(self._success_delays.get(attempt, ()))

        if delays:
            # Use median of successful delays as the optimal delay
            learned = statistics.median(delays)
            return max(self._min_delay, min(learned, self._max_delay))

        # Fallback: exponential backoff. The exponent is capped because
        # ``2.0 ** n`` raises OverflowError for n >= 1024; any product that
        # large is clamped down to ``max_delay`` below.
        exponent = min(attempt - 1, self._MAX_BACKOFF_EXPONENT)
        fallback_delay = self._min_delay * (2.0 ** exponent)
        return max(self._min_delay, min(fallback_delay, self._max_delay))

    def to_retry_config(self) -> RetryConfig:
        """Export current state as a standard ``RetryConfig``.

        Uses the learned initial delay (attempt=1) if available,
        otherwise the fallback delay for the first attempt.

        Returns:
            A ``RetryConfig`` snapshot.

        """
        initial = self.get_delay(1)
        return RetryConfig(
            max_attempts=self._max_attempts,
            initial_delay=initial,
            max_delay=self._max_delay,
            jitter=False,
        )

    @property
    def metrics(self) -> RetryMetrics:
        """Snapshot of current adaptive retry metrics.

        The statistics cover every outcome currently in the rolling
        window, successful or not. With an empty window the success rate
        is reported as 1.0 and both delay figures as 0.0.
        """
        with self._lock:
            total = len(self._outcomes)
            successes = sum(1 for o in self._outcomes if o.success)
            all_delays = [o.elapsed for o in self._outcomes]

        success_rate = successes / total if total > 0 else 1.0

        if all_delays:
            avg_delay = statistics.mean(all_delays)
            sorted_delays = sorted(all_delays)
            # Index at 95% of the sample count, clamped to the last element.
            idx = int(len(sorted_delays) * 0.95)
            p95_delay = sorted_delays[min(idx, len(sorted_delays) - 1)]
        else:
            avg_delay = 0.0
            p95_delay = 0.0

        return RetryMetrics(
            success_rate=success_rate,
            avg_delay=avg_delay,
            p95_delay=p95_delay,
            total_outcomes=total,
        )