Coverage for src/taipanstack/resilience/adaptive/adaptive_breaker.py: 100%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Adaptive Circuit Breaker — auto-tunes failure threshold via rolling window.
4Unlike standard Circuit Breakers that use static absolute failure counts,
5the AdaptiveCircuitBreaker opens its circuit ONLY when the error rate
6exceeds a target percentage in a rolling window of recent calls AND a
7minimum throughput of requests has been met.
8"""
10from __future__ import annotations
12import logging
13import math
14import threading
15import time
16from collections import deque
17from dataclasses import dataclass
18from typing import TypeVar
20from taipanstack.core.result import Err, Ok, Result
21from taipanstack.resilience.circuit_breaker import CircuitState
# Module-level logger; every breaker instance in this module logs its state
# transitions through it.
logger = logging.getLogger("taipanstack.resilience.adaptive.breaker")

# Type of the success value carried by the ``Result`` objects handled here.
T = TypeVar("T")
@dataclass(frozen=True)
class AdaptiveMetrics:
    """Immutable point-in-time view of an adaptive circuit breaker.

    Attributes:
        success_rate: Fraction of calls in the window that succeeded
            (0.0 - 1.0).
        error_rate: Fraction of calls in the window that failed (0.0 - 1.0).
        total_calls: Number of calls currently held in the window.
        error_count: Number of failed calls currently held in the window.
        state: Circuit state at the moment the snapshot was taken.

    """

    # Field order is part of the positional constructor signature.
    success_rate: float
    error_rate: float
    total_calls: int
    error_count: int
    state: CircuitState
48class AdaptiveCircuitBreaker:
49 """Circuit breaker that opens based on an error rate percentage.
51 Maintains a rolling window of call outcomes. The circuit trips to OPEN if:
52 1. The `window_size` history has at least `min_throughput` events.
53 2. The error rate (errors / total) > `target_error_rate`.
55 Once OPEN, it waits `recovery_timeout` seconds before transitioning
56 to HALF_OPEN. In HALF_OPEN, if a request succeeds, it CLOSES and
57 clears the window. If it fails, it returns to OPEN.
59 Args:
60 name: Identifier for logging.
61 window_size: Number of recent calls to track.
62 min_throughput: Minimum requests before considering error rate.
63 target_error_rate: Desired error rate boundary (0.0 - 1.0).
64 recovery_timeout: Seconds before attempting half-open recovery.
66 """
68 def __init__(
69 self,
70 name: str = "default",
71 *,
72 window_size: int = 100,
73 min_throughput: int = 10,
74 target_error_rate: float = 0.5,
75 recovery_timeout: float = 30.0,
76 ) -> None:
77 """Initialize the adaptive circuit breaker."""
78 self.name = name
79 self._min_throughput = min_throughput
80 self._target_error_rate = target_error_rate
81 if not math.isfinite(recovery_timeout) or recovery_timeout < 0:
82 raise ValueError("recovery_timeout must be a finite non-negative number")
83 self._recovery_timeout = recovery_timeout
85 # Rolling window: True = success, False = failure
86 self._window: deque[bool] = deque(maxlen=window_size)
87 self._state = CircuitState.CLOSED
88 self._last_opened_at: float = 0.0
89 self._lock = threading.Lock()
91 @property
92 def state(self) -> CircuitState:
93 """Current circuit state. May evaluate timeouts and switch to HALF_OPEN."""
94 with self._lock:
95 if self._state == CircuitState.OPEN:
96 now = time.monotonic()
97 if now - self._last_opened_at >= self._recovery_timeout:
98 self._state = CircuitState.HALF_OPEN
99 logger.info(
100 "Adaptive breaker '%s' entering HALF_OPEN state", self.name
101 )
102 return self._state
104 def _evaluate_trip(self) -> None:
105 """Evaluate if the circuit should trip open.
107 MUST BE CALLED UNDER LOCK.
108 """
109 if self._state != CircuitState.CLOSED:
110 return
112 total = len(self._window)
113 if total < self._min_throughput:
114 return
116 errors = sum(1 for ok in self._window if not ok)
117 error_rate = errors / total
119 if error_rate > self._target_error_rate:
120 self._state = CircuitState.OPEN
121 self._last_opened_at = time.monotonic()
122 logger.warning(
123 "Adaptive breaker '%s' OPENED. Error rate %.2f > %.2f",
124 self.name,
125 error_rate,
126 self._target_error_rate,
127 )
129 def record_success(self) -> None:
130 """Record a successful call."""
131 with self._lock:
132 if self._state == CircuitState.HALF_OPEN:
133 # Full recovery on success
134 self._state = CircuitState.CLOSED
135 self._window.clear()
136 logger.info(
137 "Adaptive breaker '%s' CLOSED after successful half-open recovery.",
138 self.name,
139 )
141 self._window.append(True)
142 self._evaluate_trip()
144 def record_failure(self, _exc: Exception) -> None:
145 """Record a failed call.
147 Args:
148 _exc: The exception that occurred.
150 """
151 with self._lock:
152 if self._state == CircuitState.HALF_OPEN:
153 # Return to open immediately on failure
154 self._state = CircuitState.OPEN
155 self._last_opened_at = time.monotonic()
156 logger.warning(
157 "Adaptive breaker '%s' RETURNED to OPEN after half-open failure.",
158 self.name,
159 )
161 self._window.append(False)
162 self._evaluate_trip()
164 def evaluate_result(self, result: Result[T, Exception]) -> Result[T, Exception]:
165 """Evaluate a Result and record success or failure.
167 Args:
168 result: A ``Result`` to evaluate.
170 Returns:
171 The original Result.
173 """
174 match result:
175 case Ok(_):
176 self.record_success()
177 case Err(error):
178 self.record_failure(error)
179 return result
181 def should_allow(self) -> bool:
182 """Check if a call should be attempted.
184 Returns:
185 ``True`` if the circuit permits a call.
187 """
188 return self.state in (CircuitState.CLOSED, CircuitState.HALF_OPEN)
190 def reset(self) -> None:
191 """Reset the breaker and window."""
192 with self._lock:
193 self._window.clear()
194 self._state = CircuitState.CLOSED
195 self._last_opened_at = 0.0
197 @property
198 def metrics(self) -> AdaptiveMetrics:
199 """Snapshot of current adaptive metrics."""
200 with self._lock:
201 total = len(self._window)
202 errors = sum(1 for ok in self._window if not ok)
203 error_rate = errors / total if total > 0 else 0.0
204 success_rate = 1.0 - error_rate
205 state_val = self._state
207 return AdaptiveMetrics(
208 success_rate=success_rate,
209 error_rate=error_rate,
210 total_calls=total,
211 error_count=errors,
212 state=state_val,
213 )