Coverage for src / taipanstack / resilience / adaptive / adaptive_breaker.py: 100%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Adaptive Circuit Breaker — auto-tunes failure threshold via rolling window. 

3 

4Unlike standard Circuit Breakers that use static absolute failure counts, 

5the AdaptiveCircuitBreaker opens its circuit ONLY when the error rate 

6exceeds a target percentage in a rolling window of recent calls AND a 

7minimum throughput of requests has been met. 

8""" 

9 

10from __future__ import annotations 

11 

12import logging 

13import math 

14import threading 

15import time 

16from collections import deque 

17from dataclasses import dataclass 

18from typing import TypeVar 

19 

20from taipanstack.core.result import Err, Ok, Result 

21from taipanstack.resilience.circuit_breaker import CircuitState 

22 

# Module-level logger used by the breaker to report state transitions.
logger = logging.getLogger("taipanstack.resilience.adaptive.breaker")

# Generic placeholder used for the first type parameter of ``Result`` in
# the annotations below.
T = TypeVar("T")

26 

27 

@dataclass(frozen=True)
class AdaptiveMetrics:
    """Immutable point-in-time view of an adaptive breaker's rolling window.

    Attributes:
        success_rate: Fraction of windowed calls that succeeded (0.0 - 1.0).
        error_rate: Fraction of windowed calls that failed (0.0 - 1.0).
        total_calls: Number of calls currently held in the window.
        error_count: Number of failed calls currently held in the window.
        state: Circuit state at the moment the snapshot was taken.

    """

    success_rate: float
    error_rate: float
    total_calls: int
    error_count: int
    state: CircuitState

46 

47 

48class AdaptiveCircuitBreaker: 

49 """Circuit breaker that opens based on an error rate percentage. 

50 

51 Maintains a rolling window of call outcomes. The circuit trips to OPEN if: 

52 1. The `window_size` history has at least `min_throughput` events. 

53 2. The error rate (errors / total) > `target_error_rate`. 

54 

55 Once OPEN, it waits `recovery_timeout` seconds before transitioning 

56 to HALF_OPEN. In HALF_OPEN, if a request succeeds, it CLOSES and 

57 clears the window. If it fails, it returns to OPEN. 

58 

59 Args: 

60 name: Identifier for logging. 

61 window_size: Number of recent calls to track. 

62 min_throughput: Minimum requests before considering error rate. 

63 target_error_rate: Desired error rate boundary (0.0 - 1.0). 

64 recovery_timeout: Seconds before attempting half-open recovery. 

65 

66 """ 

67 

68 def __init__( 

69 self, 

70 name: str = "default", 

71 *, 

72 window_size: int = 100, 

73 min_throughput: int = 10, 

74 target_error_rate: float = 0.5, 

75 recovery_timeout: float = 30.0, 

76 ) -> None: 

77 """Initialize the adaptive circuit breaker.""" 

78 self.name = name 

79 self._min_throughput = min_throughput 

80 self._target_error_rate = target_error_rate 

81 if not math.isfinite(recovery_timeout) or recovery_timeout < 0: 

82 raise ValueError("recovery_timeout must be a finite non-negative number") 

83 self._recovery_timeout = recovery_timeout 

84 

85 # Rolling window: True = success, False = failure 

86 self._window: deque[bool] = deque(maxlen=window_size) 

87 self._state = CircuitState.CLOSED 

88 self._last_opened_at: float = 0.0 

89 self._lock = threading.Lock() 

90 

91 @property 

92 def state(self) -> CircuitState: 

93 """Current circuit state. May evaluate timeouts and switch to HALF_OPEN.""" 

94 with self._lock: 

95 if self._state == CircuitState.OPEN: 

96 now = time.monotonic() 

97 if now - self._last_opened_at >= self._recovery_timeout: 

98 self._state = CircuitState.HALF_OPEN 

99 logger.info( 

100 "Adaptive breaker '%s' entering HALF_OPEN state", self.name 

101 ) 

102 return self._state 

103 

104 def _evaluate_trip(self) -> None: 

105 """Evaluate if the circuit should trip open. 

106 

107 MUST BE CALLED UNDER LOCK. 

108 """ 

109 if self._state != CircuitState.CLOSED: 

110 return 

111 

112 total = len(self._window) 

113 if total < self._min_throughput: 

114 return 

115 

116 errors = sum(1 for ok in self._window if not ok) 

117 error_rate = errors / total 

118 

119 if error_rate > self._target_error_rate: 

120 self._state = CircuitState.OPEN 

121 self._last_opened_at = time.monotonic() 

122 logger.warning( 

123 "Adaptive breaker '%s' OPENED. Error rate %.2f > %.2f", 

124 self.name, 

125 error_rate, 

126 self._target_error_rate, 

127 ) 

128 

129 def record_success(self) -> None: 

130 """Record a successful call.""" 

131 with self._lock: 

132 if self._state == CircuitState.HALF_OPEN: 

133 # Full recovery on success 

134 self._state = CircuitState.CLOSED 

135 self._window.clear() 

136 logger.info( 

137 "Adaptive breaker '%s' CLOSED after successful half-open recovery.", 

138 self.name, 

139 ) 

140 

141 self._window.append(True) 

142 self._evaluate_trip() 

143 

144 def record_failure(self, _exc: Exception) -> None: 

145 """Record a failed call. 

146 

147 Args: 

148 _exc: The exception that occurred. 

149 

150 """ 

151 with self._lock: 

152 if self._state == CircuitState.HALF_OPEN: 

153 # Return to open immediately on failure 

154 self._state = CircuitState.OPEN 

155 self._last_opened_at = time.monotonic() 

156 logger.warning( 

157 "Adaptive breaker '%s' RETURNED to OPEN after half-open failure.", 

158 self.name, 

159 ) 

160 

161 self._window.append(False) 

162 self._evaluate_trip() 

163 

164 def evaluate_result(self, result: Result[T, Exception]) -> Result[T, Exception]: 

165 """Evaluate a Result and record success or failure. 

166 

167 Args: 

168 result: A ``Result`` to evaluate. 

169 

170 Returns: 

171 The original Result. 

172 

173 """ 

174 match result: 

175 case Ok(_): 

176 self.record_success() 

177 case Err(error): 

178 self.record_failure(error) 

179 return result 

180 

181 def should_allow(self) -> bool: 

182 """Check if a call should be attempted. 

183 

184 Returns: 

185 ``True`` if the circuit permits a call. 

186 

187 """ 

188 return self.state in (CircuitState.CLOSED, CircuitState.HALF_OPEN) 

189 

190 def reset(self) -> None: 

191 """Reset the breaker and window.""" 

192 with self._lock: 

193 self._window.clear() 

194 self._state = CircuitState.CLOSED 

195 self._last_opened_at = 0.0 

196 

197 @property 

198 def metrics(self) -> AdaptiveMetrics: 

199 """Snapshot of current adaptive metrics.""" 

200 with self._lock: 

201 total = len(self._window) 

202 errors = sum(1 for ok in self._window if not ok) 

203 error_rate = errors / total if total > 0 else 0.0 

204 success_rate = 1.0 - error_rate 

205 state_val = self._state 

206 

207 return AdaptiveMetrics( 

208 success_rate=success_rate, 

209 error_rate=error_rate, 

210 total_calls=total, 

211 error_count=errors, 

212 state=state_val, 

213 )