Coverage for src / taipanstack / resilience / adaptive / bulkhead.py: 100%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Bulkhead pattern — concurrency isolation via semaphore.
4Limits the number of concurrent executions to prevent a single
5failing dependency from consuming all available resources.
6"""
8from __future__ import annotations
10import asyncio
11import logging
12import math
13from collections.abc import Awaitable, Callable
14from typing import ParamSpec, TypeVar
16from taipanstack.core.result import Err, Ok, Result
18logger = logging.getLogger("taipanstack.resilience.adaptive.bulkhead")
20T = TypeVar("T")
21P = ParamSpec("P")
class BulkheadFullError(Exception):
    """Raised when the bulkhead queue is at capacity.

    The limits that were hit are exposed as the ``bulkhead_name``,
    ``max_concurrent`` and ``max_queue`` attributes.
    """

    def __init__(self, name: str, max_concurrent: int, max_queue: int) -> None:
        """Initialize BulkheadFullError.

        Args:
            name: Bulkhead name.
            max_concurrent: Concurrency limit.
            max_queue: Queue limit.

        """
        self.bulkhead_name = name
        self.max_concurrent = max_concurrent
        self.max_queue = max_queue
        super().__init__(
            f"Bulkhead '{name}' is full "
            f"(max_concurrent={max_concurrent}, max_queue={max_queue})"
        )

    def __reduce__(self) -> tuple[object, ...]:
        """Describe how to rebuild this error for ``copy`` and ``pickle``.

        ``Exception`` would otherwise replay ``self.args`` (the single
        formatted message) into ``__init__``, which requires three
        arguments, so copying or unpickling would raise ``TypeError``.

        Returns:
            The class, the original constructor arguments, and the
            instance state.

        """
        return (
            type(self),
            (self.bulkhead_name, self.max_concurrent, self.max_queue),
            self.__dict__,
        )
class Bulkhead:
    """Concurrency limiter using ``asyncio.Semaphore``.

    Limits the number of concurrent executions of a callable.
    Excess callers are queued up to ``max_queue``; beyond that
    a ``BulkheadFullError`` is returned.

    Args:
        name: Identifier for logging.
        max_concurrent: Maximum concurrent executions (at least 1).
        max_queue: Maximum queued callers beyond concurrent limit
            (``0`` disables queueing; callers are then rejected as soon
            as every permit is taken).
        timeout: Seconds to wait for a permit before timing out
            (``0`` means "never wait for a busy permit").

    Example:
        >>> bulk = Bulkhead("db", max_concurrent=5, max_queue=10)
        >>> result = await bulk.execute(fetch_data, user_id)

    """

    def __init__(
        self,
        name: str = "default",
        *,
        max_concurrent: int = 10,
        max_queue: int = 50,
        timeout: float = 30.0,
    ) -> None:
        """Initialize the bulkhead.

        Args:
            name: Bulkhead name.
            max_concurrent: Concurrency limit.
            max_queue: Queue limit.
            timeout: Permit acquisition timeout.

        Raises:
            ValueError: If ``max_concurrent`` is less than 1, ``max_queue``
                is negative, or ``timeout`` is negative or not finite.

        """
        # A bulkhead with no permits could never run anything; fail fast
        # instead of making every call wait for the full timeout.
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be at least 1")
        if max_queue < 0:
            raise ValueError("max_queue must be non-negative")
        if not math.isfinite(timeout) or timeout < 0:
            raise ValueError("timeout must be a finite non-negative number")
        self.name = name
        self._max_concurrent = max_concurrent
        self._max_queue = max_queue
        self._timeout = timeout
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._queued = 0
        self._active = 0

    @property
    def available_permits(self) -> int:
        """Number of available concurrency permits."""
        return self._max_concurrent - self._active

    @property
    def queued(self) -> int:
        """Number of callers currently waiting in the queue."""
        return self._queued

    @property
    def active(self) -> int:
        """Number of currently executing tasks."""
        return self._active

    async def execute(
        self,
        fn: Callable[P, Awaitable[T]],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Result[T, Exception]:
        """Execute a callable within bulkhead limits.

        A caller that finds a free permit runs immediately and never
        counts against the queue. A caller that has to wait is queued
        unless ``max_queue`` callers are already waiting.

        Args:
            fn: Async callable to execute.
            *args: Positional arguments for fn.
            **kwargs: Keyword arguments for fn.

        Returns:
            ``Ok(result)`` on success, ``Err`` on failure:
            ``BulkheadFullError`` when the queue is at capacity,
            ``TimeoutError`` when no permit became available in time,
            ``RuntimeError`` when acquiring the permit failed, or the
            exception raised by ``fn``.

        """
        # Only callers that actually have to wait occupy a queue slot.
        # Counting every caller would reject work while permits are
        # still free (and reject everything when max_queue is 0).
        must_wait = self._semaphore.locked()
        if must_wait and self._queued >= self._max_queue:
            return Err(
                BulkheadFullError(
                    self.name,
                    self._max_concurrent,
                    self._max_queue,
                )
            )

        try:
            if must_wait:
                self._queued += 1
                try:
                    await asyncio.wait_for(
                        self._semaphore.acquire(),
                        timeout=self._timeout,
                    )
                finally:
                    self._queued -= 1
            else:
                # A free permit is granted immediately, so bypass
                # wait_for: with timeout=0 it would cancel the acquire
                # before it ever ran and report a spurious timeout.
                await self._semaphore.acquire()
        except TimeoutError:
            return Err(
                TimeoutError(
                    f"Bulkhead '{self.name}' timed out "
                    f"after {self._timeout}s waiting for permit"
                )
            )
        except (RuntimeError, OSError, MemoryError) as e:
            return Err(RuntimeError(f"Resource exhaustion: {e!s}"))

        # Execute within the permit; the permit is always handed back,
        # including when the surrounding task is cancelled.
        self._active += 1
        try:
            result = await fn(*args, **kwargs)
            return Ok(result)
        except Exception as exc:
            logger.warning(
                "Bulkhead '%s' execution failed: %s",
                self.name,
                exc,
            )
            return Err(exc)
        finally:
            self._active -= 1
            self._semaphore.release()