Coverage for src/taipanstack/resilience/adaptive/orchestrator.py: 100%
146 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Resilience Orchestrator — compose multiple patterns into a pipeline.
4Provides a fluent builder to combine bulkhead, circuit breaker,
5retry, timeout, and fallback into a single execution pipeline.
7Execution order: bulkhead → circuit breaker → retry → timeout → fn → fallback.
8"""
10from __future__ import annotations
12import asyncio
13import logging
14import math
15from collections.abc import Awaitable, Callable
16from typing import Generic, ParamSpec, TypeVar, cast
18from taipanstack.core.result import Err, Ok, Result
19from taipanstack.resilience.adaptive.adaptive_breaker import AdaptiveCircuitBreaker
20from taipanstack.resilience.adaptive.adaptive_retry import AdaptiveRetry
21from taipanstack.resilience.adaptive.bulkhead import Bulkhead, BulkheadFullError
22from taipanstack.resilience.circuit_breaker import (
23 CircuitBreaker,
24 CircuitBreakerError,
25)
26from taipanstack.resilience.retry import RetryConfig, calculate_delay
# Module-scoped logger; named after the module path per project convention.
logger = logging.getLogger("taipanstack.resilience.adaptive.orchestrator")

# T: success value type produced by the wrapped async callable.
T = TypeVar("T")
# E: error type bound to Exception (used by Result-typed signatures).
E = TypeVar("E", bound=Exception)
# P: parameter specification of the wrapped callable, preserving its signature.
P = ParamSpec("P")
35class ResilienceOrchestrator(Generic[T]):
36 """Compose resilience patterns into a single pipeline.
38 Provides a fluent builder API to add patterns in order.
39 Execution proceeds through each configured layer.
41 Args:
42 name: Pipeline name for logging.
44 Example:
45 >>> orch = (
46 ... ResilienceOrchestrator("api")
47 ... .with_bulkhead(max_concurrent=5)
48 ... .with_circuit_breaker(breaker)
49 ... .with_retry(RetryConfig(max_attempts=3))
50 ... .with_timeout(10.0)
51 ... .with_fallback({"status": "cached"})
52 ... )
53 >>> result = await orch.execute(call_api, endpoint)
55 """
57 def __init__(self, name: str = "default") -> None:
58 """Initialize the orchestrator.
60 Args:
61 name: Pipeline name.
63 """
64 self.name = name
65 self._bulkhead: Bulkhead | None = None
66 self._breaker: CircuitBreaker | None = None
67 self._adaptive_breaker: AdaptiveCircuitBreaker | None = None
68 self._retry_config: RetryConfig | None = None
69 self._adaptive_retry: AdaptiveRetry | None = None
70 self._timeout: float | None = None
71 self._fallback_value: T | object = _SENTINEL
73 def with_bulkhead(
74 self,
75 max_concurrent: int = 10,
76 max_queue: int = 50,
77 timeout: float = 30.0,
78 ) -> ResilienceOrchestrator[T]:
79 """Add a bulkhead concurrency limiter.
81 Args:
82 max_concurrent: Max concurrent executions.
83 max_queue: Max queued callers.
84 timeout: Permit acquisition timeout.
86 Returns:
87 self for chaining.
89 """
90 if not math.isfinite(timeout) or timeout < 0:
91 raise ValueError("timeout must be a finite non-negative number")
92 self._bulkhead = Bulkhead(
93 f"{self.name}-bulkhead",
94 max_concurrent=max_concurrent,
95 max_queue=max_queue,
96 timeout=timeout,
97 )
98 return self
100 def with_circuit_breaker(
101 self,
102 breaker: CircuitBreaker | AdaptiveCircuitBreaker,
103 ) -> ResilienceOrchestrator[T]:
104 """Add a circuit breaker.
106 Args:
107 breaker: Standard or adaptive circuit breaker.
109 Returns:
110 self for chaining.
112 """
113 if isinstance(breaker, AdaptiveCircuitBreaker):
114 self._adaptive_breaker = breaker
115 self._breaker = None
116 else:
117 self._breaker = breaker
118 return self
120 def with_retry(
121 self,
122 config: RetryConfig | AdaptiveRetry,
123 ) -> ResilienceOrchestrator[T]:
124 """Add retry logic.
126 Args:
127 config: Standard retry config or adaptive retry.
129 Returns:
130 self for chaining.
132 """
133 if isinstance(config, AdaptiveRetry):
134 self._adaptive_retry = config
135 self._retry_config = config.to_retry_config()
136 else:
137 self._retry_config = config
138 return self
140 def with_timeout(self, seconds: float) -> ResilienceOrchestrator[T]:
141 """Add a timeout.
143 Args:
144 seconds: Maximum execution time.
146 Returns:
147 self for chaining.
149 """
150 if not math.isfinite(seconds) or seconds < 0:
151 raise ValueError("timeout must be a finite non-negative number")
152 self._timeout = seconds
153 return self
155 def with_fallback(self, value: T) -> ResilienceOrchestrator[T]:
156 """Add a fallback value for failures.
158 Args:
159 value: Value to return on failure.
161 Returns:
162 self for chaining.
164 """
165 self._fallback_value = value
166 return self
168 async def execute(
169 self,
170 fn: Callable[P, Awaitable[T]],
171 *args: P.args,
172 **kwargs: P.kwargs,
173 ) -> Result[T, Exception]:
174 """Execute the function through the resilience pipeline.
176 Order: bulkhead → circuit breaker → retry → timeout → fn → fallback.
178 Args:
179 fn: Async callable to execute.
180 *args: Positional arguments.
181 **kwargs: Keyword arguments.
183 Returns:
184 ``Ok(result)`` on success, ``Err`` on failure.
186 """
187 # Layer 1: Bulkhead — use semaphore directly to avoid double-wrapping
188 if self._bulkhead is not None:
189 bh = self._bulkhead
190 if bh.queued >= bh._max_queue:
191 result: Result[T, Exception] = Err(
192 BulkheadFullError(bh.name, bh._max_concurrent, bh._max_queue)
193 )
194 return self._apply_fallback(result)
196 bh._queued += 1
197 try:
198 try:
199 await asyncio.wait_for(
200 bh._semaphore.acquire(),
201 timeout=bh._timeout,
202 )
203 except TimeoutError:
204 return self._apply_fallback(
205 Err(
206 TimeoutError(
207 f"Bulkhead '{bh.name}' timed out after {bh._timeout}s"
208 )
209 )
210 )
211 except (RuntimeError, OSError, MemoryError) as e:
212 return self._apply_fallback(
213 Err(RuntimeError(f"Resource exhaustion: {e!s}"))
214 )
215 finally:
216 bh._queued -= 1
218 bh._active += 1
219 try:
220 return await self._execute_inner(fn, *args, **kwargs)
221 finally:
222 bh._active -= 1
223 bh._semaphore.release()
225 return await self._execute_inner(fn, *args, **kwargs)
227 def _evaluate_circuit_breaker(self) -> Result[T, Exception] | None:
228 """Check if execution is allowed by the circuit breaker."""
229 if self._adaptive_breaker is not None:
230 if not self._adaptive_breaker.should_allow():
231 return Err(
232 CircuitBreakerError(
233 f"Circuit '{self._adaptive_breaker.name}' is open",
234 state=self._adaptive_breaker.state,
235 )
236 )
237 elif self._breaker is not None and not self._breaker._should_attempt():
238 return Err(
239 CircuitBreakerError(
240 f"Circuit '{self._breaker.name}' is open",
241 state=self._breaker.state,
242 )
243 )
244 return None
246 def _record_success_outcome(self, attempt: int) -> None:
247 """Record a successful execution outcome."""
248 if self._adaptive_breaker is not None:
249 self._adaptive_breaker.record_success()
250 elif self._breaker is not None:
251 self._breaker._record_success()
253 if self._adaptive_retry is not None:
254 self._adaptive_retry.record_outcome(attempt, True, 0.0)
256 def _record_failure_outcome(self, error: Exception, attempt: int) -> None:
257 """Record a failed execution outcome."""
258 if self._adaptive_breaker is not None:
259 self._adaptive_breaker.record_failure(error)
260 elif self._breaker is not None:
261 self._breaker._record_failure(error)
263 if self._adaptive_retry is not None:
264 self._adaptive_retry.record_outcome(attempt, False, 0.0)
266 def _calculate_retry_delay(self, attempt: int) -> float:
267 """Calculate the retry delay for the given attempt."""
268 if self._adaptive_retry is not None:
269 return self._adaptive_retry.get_delay(attempt)
270 if self._retry_config is not None:
271 return calculate_delay(attempt, self._retry_config)
272 return 0.0
274 async def _execute_inner(
275 self,
276 fn: Callable[P, Awaitable[T]],
277 *args: P.args,
278 **kwargs: P.kwargs,
279 ) -> Result[T, Exception]:
280 """Execute through breaker → retry → timeout → fn layers."""
281 cb_err = self._evaluate_circuit_breaker()
282 if cb_err is not None:
283 return self._apply_fallback(cb_err)
285 max_attempts = (
286 self._retry_config.max_attempts if self._retry_config is not None else 1
287 )
288 return await self._execute_with_retries(max_attempts, fn, *args, **kwargs)
290 async def _handle_retry_failure(
291 self,
292 error: Exception,
293 attempt: int,
294 max_attempts: int,
295 ) -> bool:
296 self._record_failure_outcome(error, attempt)
297 if self._retry_config is not None and attempt < max_attempts:
298 delay = self._calculate_retry_delay(attempt)
299 await asyncio.sleep(min(delay, 3600.0))
300 return True
301 return False
303 async def _execute_with_retries(
304 self,
305 max_attempts: int,
306 fn: Callable[P, Awaitable[T]],
307 *args: P.args,
308 **kwargs: P.kwargs,
309 ) -> Result[T, Exception]:
310 last_error: Exception | None = None
311 for attempt in range(1, max_attempts + 1):
312 result = await self._execute_with_timeout(fn, *args, **kwargs)
313 match result:
314 case Ok():
315 self._record_success_outcome(attempt)
316 return result
317 case Err(error):
318 last_error = error
319 if await self._handle_retry_failure(error, attempt, max_attempts):
320 continue
321 break
323 final_result: Result[T, Exception] = Err(
324 last_error or RuntimeError("Execution failed")
325 )
326 return self._apply_fallback(final_result)
328 async def _execute_with_timeout(
329 self,
330 fn: Callable[P, Awaitable[T]],
331 *args: P.args,
332 **kwargs: P.kwargs,
333 ) -> Result[T, Exception]:
334 """Execute fn with optional timeout.
336 Args:
337 fn: Async callable.
338 *args: Positional arguments.
339 **kwargs: Keyword arguments.
341 Returns:
342 ``Ok(result)`` or ``Err``.
344 """
345 try:
346 if self._timeout is not None:
347 result = await asyncio.wait_for(
348 fn(*args, **kwargs),
349 timeout=self._timeout,
350 )
351 else:
352 result = await fn(*args, **kwargs)
353 return Ok(result)
354 except TimeoutError:
355 return Err(
356 TimeoutError(f"Pipeline '{self.name}' timed out after {self._timeout}s")
357 )
358 except Exception as exc:
359 return Err(exc)
361 def _apply_fallback(
362 self,
363 result: Result[T, Exception],
364 ) -> Result[T, Exception]:
365 """Apply fallback if configured and result is Err.
367 Args:
368 result: The result to potentially replace.
370 Returns:
371 Original result or ``Ok(fallback_value)``.
373 """
374 match result:
375 case Err():
376 if self._fallback_value is not _SENTINEL:
377 return Ok(cast(T, self._fallback_value))
378 case Ok():
379 pass
380 return result
# Sentinel for distinguishing "no fallback" from "fallback=None".
# Defined at module level after the class; only read at call time
# (inside __init__ / _apply_fallback), so the late definition is safe.
_SENTINEL = object()