Coverage for src / taipanstack / resilience / adaptive / orchestrator.py: 100%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Resilience Orchestrator — compose multiple patterns into a pipeline. 

3 

4Provides a fluent builder to combine bulkhead, circuit breaker, 

5retry, timeout, and fallback into a single execution pipeline. 

6 

7Execution order: bulkhead → circuit breaker → retry → timeout → fn → fallback. 

8""" 

9 

10from __future__ import annotations 

11 

12import asyncio 

13import logging 

14import math 

15from collections.abc import Awaitable, Callable 

16from typing import Generic, ParamSpec, TypeVar, cast 

17 

18from taipanstack.core.result import Err, Ok, Result 

19from taipanstack.resilience.adaptive.adaptive_breaker import AdaptiveCircuitBreaker 

20from taipanstack.resilience.adaptive.adaptive_retry import AdaptiveRetry 

21from taipanstack.resilience.adaptive.bulkhead import Bulkhead, BulkheadFullError 

22from taipanstack.resilience.circuit_breaker import ( 

23 CircuitBreaker, 

24 CircuitBreakerError, 

25) 

26from taipanstack.resilience.retry import RetryConfig, calculate_delay 

27 

# Module-level logger; named after the module path for hierarchical filtering.
logger = logging.getLogger("taipanstack.resilience.adaptive.orchestrator")

# T: value type produced by the wrapped async callable.
T = TypeVar("T")
# E: exception-bound error type (declared for Result error channels).
E = TypeVar("E", bound=Exception)
# P: parameter specification of the wrapped callable, forwarded by execute().
P = ParamSpec("P")

33 

34 

35class ResilienceOrchestrator(Generic[T]): 

36 """Compose resilience patterns into a single pipeline. 

37 

38 Provides a fluent builder API to add patterns in order. 

39 Execution proceeds through each configured layer. 

40 

41 Args: 

42 name: Pipeline name for logging. 

43 

44 Example: 

45 >>> orch = ( 

46 ... ResilienceOrchestrator("api") 

47 ... .with_bulkhead(max_concurrent=5) 

48 ... .with_circuit_breaker(breaker) 

49 ... .with_retry(RetryConfig(max_attempts=3)) 

50 ... .with_timeout(10.0) 

51 ... .with_fallback({"status": "cached"}) 

52 ... ) 

53 >>> result = await orch.execute(call_api, endpoint) 

54 

55 """ 

56 

57 def __init__(self, name: str = "default") -> None: 

58 """Initialize the orchestrator. 

59 

60 Args: 

61 name: Pipeline name. 

62 

63 """ 

64 self.name = name 

65 self._bulkhead: Bulkhead | None = None 

66 self._breaker: CircuitBreaker | None = None 

67 self._adaptive_breaker: AdaptiveCircuitBreaker | None = None 

68 self._retry_config: RetryConfig | None = None 

69 self._adaptive_retry: AdaptiveRetry | None = None 

70 self._timeout: float | None = None 

71 self._fallback_value: T | object = _SENTINEL 

72 

73 def with_bulkhead( 

74 self, 

75 max_concurrent: int = 10, 

76 max_queue: int = 50, 

77 timeout: float = 30.0, 

78 ) -> ResilienceOrchestrator[T]: 

79 """Add a bulkhead concurrency limiter. 

80 

81 Args: 

82 max_concurrent: Max concurrent executions. 

83 max_queue: Max queued callers. 

84 timeout: Permit acquisition timeout. 

85 

86 Returns: 

87 self for chaining. 

88 

89 """ 

90 if not math.isfinite(timeout) or timeout < 0: 

91 raise ValueError("timeout must be a finite non-negative number") 

92 self._bulkhead = Bulkhead( 

93 f"{self.name}-bulkhead", 

94 max_concurrent=max_concurrent, 

95 max_queue=max_queue, 

96 timeout=timeout, 

97 ) 

98 return self 

99 

100 def with_circuit_breaker( 

101 self, 

102 breaker: CircuitBreaker | AdaptiveCircuitBreaker, 

103 ) -> ResilienceOrchestrator[T]: 

104 """Add a circuit breaker. 

105 

106 Args: 

107 breaker: Standard or adaptive circuit breaker. 

108 

109 Returns: 

110 self for chaining. 

111 

112 """ 

113 if isinstance(breaker, AdaptiveCircuitBreaker): 

114 self._adaptive_breaker = breaker 

115 self._breaker = None 

116 else: 

117 self._breaker = breaker 

118 return self 

119 

120 def with_retry( 

121 self, 

122 config: RetryConfig | AdaptiveRetry, 

123 ) -> ResilienceOrchestrator[T]: 

124 """Add retry logic. 

125 

126 Args: 

127 config: Standard retry config or adaptive retry. 

128 

129 Returns: 

130 self for chaining. 

131 

132 """ 

133 if isinstance(config, AdaptiveRetry): 

134 self._adaptive_retry = config 

135 self._retry_config = config.to_retry_config() 

136 else: 

137 self._retry_config = config 

138 return self 

139 

140 def with_timeout(self, seconds: float) -> ResilienceOrchestrator[T]: 

141 """Add a timeout. 

142 

143 Args: 

144 seconds: Maximum execution time. 

145 

146 Returns: 

147 self for chaining. 

148 

149 """ 

150 if not math.isfinite(seconds) or seconds < 0: 

151 raise ValueError("timeout must be a finite non-negative number") 

152 self._timeout = seconds 

153 return self 

154 

155 def with_fallback(self, value: T) -> ResilienceOrchestrator[T]: 

156 """Add a fallback value for failures. 

157 

158 Args: 

159 value: Value to return on failure. 

160 

161 Returns: 

162 self for chaining. 

163 

164 """ 

165 self._fallback_value = value 

166 return self 

167 

168 async def execute( 

169 self, 

170 fn: Callable[P, Awaitable[T]], 

171 *args: P.args, 

172 **kwargs: P.kwargs, 

173 ) -> Result[T, Exception]: 

174 """Execute the function through the resilience pipeline. 

175 

176 Order: bulkhead → circuit breaker → retry → timeout → fn → fallback. 

177 

178 Args: 

179 fn: Async callable to execute. 

180 *args: Positional arguments. 

181 **kwargs: Keyword arguments. 

182 

183 Returns: 

184 ``Ok(result)`` on success, ``Err`` on failure. 

185 

186 """ 

187 # Layer 1: Bulkhead — use semaphore directly to avoid double-wrapping 

188 if self._bulkhead is not None: 

189 bh = self._bulkhead 

190 if bh.queued >= bh._max_queue: 

191 result: Result[T, Exception] = Err( 

192 BulkheadFullError(bh.name, bh._max_concurrent, bh._max_queue) 

193 ) 

194 return self._apply_fallback(result) 

195 

196 bh._queued += 1 

197 try: 

198 try: 

199 await asyncio.wait_for( 

200 bh._semaphore.acquire(), 

201 timeout=bh._timeout, 

202 ) 

203 except TimeoutError: 

204 return self._apply_fallback( 

205 Err( 

206 TimeoutError( 

207 f"Bulkhead '{bh.name}' timed out after {bh._timeout}s" 

208 ) 

209 ) 

210 ) 

211 except (RuntimeError, OSError, MemoryError) as e: 

212 return self._apply_fallback( 

213 Err(RuntimeError(f"Resource exhaustion: {e!s}")) 

214 ) 

215 finally: 

216 bh._queued -= 1 

217 

218 bh._active += 1 

219 try: 

220 return await self._execute_inner(fn, *args, **kwargs) 

221 finally: 

222 bh._active -= 1 

223 bh._semaphore.release() 

224 

225 return await self._execute_inner(fn, *args, **kwargs) 

226 

227 def _evaluate_circuit_breaker(self) -> Result[T, Exception] | None: 

228 """Check if execution is allowed by the circuit breaker.""" 

229 if self._adaptive_breaker is not None: 

230 if not self._adaptive_breaker.should_allow(): 

231 return Err( 

232 CircuitBreakerError( 

233 f"Circuit '{self._adaptive_breaker.name}' is open", 

234 state=self._adaptive_breaker.state, 

235 ) 

236 ) 

237 elif self._breaker is not None and not self._breaker._should_attempt(): 

238 return Err( 

239 CircuitBreakerError( 

240 f"Circuit '{self._breaker.name}' is open", 

241 state=self._breaker.state, 

242 ) 

243 ) 

244 return None 

245 

246 def _record_success_outcome(self, attempt: int) -> None: 

247 """Record a successful execution outcome.""" 

248 if self._adaptive_breaker is not None: 

249 self._adaptive_breaker.record_success() 

250 elif self._breaker is not None: 

251 self._breaker._record_success() 

252 

253 if self._adaptive_retry is not None: 

254 self._adaptive_retry.record_outcome(attempt, True, 0.0) 

255 

256 def _record_failure_outcome(self, error: Exception, attempt: int) -> None: 

257 """Record a failed execution outcome.""" 

258 if self._adaptive_breaker is not None: 

259 self._adaptive_breaker.record_failure(error) 

260 elif self._breaker is not None: 

261 self._breaker._record_failure(error) 

262 

263 if self._adaptive_retry is not None: 

264 self._adaptive_retry.record_outcome(attempt, False, 0.0) 

265 

266 def _calculate_retry_delay(self, attempt: int) -> float: 

267 """Calculate the retry delay for the given attempt.""" 

268 if self._adaptive_retry is not None: 

269 return self._adaptive_retry.get_delay(attempt) 

270 if self._retry_config is not None: 

271 return calculate_delay(attempt, self._retry_config) 

272 return 0.0 

273 

274 async def _execute_inner( 

275 self, 

276 fn: Callable[P, Awaitable[T]], 

277 *args: P.args, 

278 **kwargs: P.kwargs, 

279 ) -> Result[T, Exception]: 

280 """Execute through breaker → retry → timeout → fn layers.""" 

281 cb_err = self._evaluate_circuit_breaker() 

282 if cb_err is not None: 

283 return self._apply_fallback(cb_err) 

284 

285 max_attempts = ( 

286 self._retry_config.max_attempts if self._retry_config is not None else 1 

287 ) 

288 return await self._execute_with_retries(max_attempts, fn, *args, **kwargs) 

289 

290 async def _handle_retry_failure( 

291 self, 

292 error: Exception, 

293 attempt: int, 

294 max_attempts: int, 

295 ) -> bool: 

296 self._record_failure_outcome(error, attempt) 

297 if self._retry_config is not None and attempt < max_attempts: 

298 delay = self._calculate_retry_delay(attempt) 

299 await asyncio.sleep(min(delay, 3600.0)) 

300 return True 

301 return False 

302 

303 async def _execute_with_retries( 

304 self, 

305 max_attempts: int, 

306 fn: Callable[P, Awaitable[T]], 

307 *args: P.args, 

308 **kwargs: P.kwargs, 

309 ) -> Result[T, Exception]: 

310 last_error: Exception | None = None 

311 for attempt in range(1, max_attempts + 1): 

312 result = await self._execute_with_timeout(fn, *args, **kwargs) 

313 match result: 

314 case Ok(): 

315 self._record_success_outcome(attempt) 

316 return result 

317 case Err(error): 

318 last_error = error 

319 if await self._handle_retry_failure(error, attempt, max_attempts): 

320 continue 

321 break 

322 

323 final_result: Result[T, Exception] = Err( 

324 last_error or RuntimeError("Execution failed") 

325 ) 

326 return self._apply_fallback(final_result) 

327 

328 async def _execute_with_timeout( 

329 self, 

330 fn: Callable[P, Awaitable[T]], 

331 *args: P.args, 

332 **kwargs: P.kwargs, 

333 ) -> Result[T, Exception]: 

334 """Execute fn with optional timeout. 

335 

336 Args: 

337 fn: Async callable. 

338 *args: Positional arguments. 

339 **kwargs: Keyword arguments. 

340 

341 Returns: 

342 ``Ok(result)`` or ``Err``. 

343 

344 """ 

345 try: 

346 if self._timeout is not None: 

347 result = await asyncio.wait_for( 

348 fn(*args, **kwargs), 

349 timeout=self._timeout, 

350 ) 

351 else: 

352 result = await fn(*args, **kwargs) 

353 return Ok(result) 

354 except TimeoutError: 

355 return Err( 

356 TimeoutError(f"Pipeline '{self.name}' timed out after {self._timeout}s") 

357 ) 

358 except Exception as exc: 

359 return Err(exc) 

360 

361 def _apply_fallback( 

362 self, 

363 result: Result[T, Exception], 

364 ) -> Result[T, Exception]: 

365 """Apply fallback if configured and result is Err. 

366 

367 Args: 

368 result: The result to potentially replace. 

369 

370 Returns: 

371 Original result or ``Ok(fallback_value)``. 

372 

373 """ 

374 match result: 

375 case Err(): 

376 if self._fallback_value is not _SENTINEL: 

377 return Ok(cast(T, self._fallback_value)) 

378 case Ok(): 

379 pass 

380 return result 

381 

382 

# Sentinel for distinguishing "no fallback" from "fallback=None".
# Compared with `is` in _apply_fallback; a bare object() guarantees a
# unique identity that no caller-supplied fallback value can equal.
_SENTINEL = object()