Coverage for src / taipanstack / resilience / adaptive / bulkhead.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Bulkhead pattern — concurrency isolation via semaphore. 

3 

4Limits the number of concurrent executions to prevent a single 

5failing dependency from consuming all available resources. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import logging 

12import math 

13from collections.abc import Awaitable, Callable 

14from typing import ParamSpec, TypeVar 

15 

16from taipanstack.core.result import Err, Ok, Result 

17 

18logger = logging.getLogger("taipanstack.resilience.adaptive.bulkhead") 

19 

20T = TypeVar("T") 

21P = ParamSpec("P") 

22 

23 

class BulkheadFullError(Exception):
    """Raised when the bulkhead queue is at capacity.

    Attributes are stored on the instance so handlers can inspect the
    limits that were exceeded without parsing the message.
    """

    def __init__(self, name: str, max_concurrent: int, max_queue: int) -> None:
        """Initialize BulkheadFullError.

        Args:
            name: Bulkhead name.
            max_concurrent: Concurrency limit.
            max_queue: Queue limit.

        """
        self.bulkhead_name = name
        self.max_concurrent = max_concurrent
        self.max_queue = max_queue
        super().__init__(
            f"Bulkhead '{name}' is full "
            f"(max_concurrent={max_concurrent}, max_queue={max_queue})"
        )

    def __reduce__(self) -> tuple[type, tuple[str, int, int]]:
        """Describe how to rebuild this exception for ``copy``/``pickle``.

        ``Exception`` reconstructs instances as ``cls(*self.args)``; here
        ``args`` only holds the formatted message, which does not match the
        three-argument constructor. Returning the original constructor
        arguments keeps copying and pickling working.

        Returns:
            The class and the ``(name, max_concurrent, max_queue)`` tuple
            to call it with.

        """
        return (
            type(self),
            (self.bulkhead_name, self.max_concurrent, self.max_queue),
        )

43 

44 

45class Bulkhead: 

46 """Concurrency limiter using ``asyncio.Semaphore``. 

47 

48 Limits the number of concurrent executions of a callable. 

49 Excess callers are queued up to ``max_queue``; beyond that 

50 a ``BulkheadFullError`` is returned. 

51 

52 Args: 

53 name: Identifier for logging. 

54 max_concurrent: Maximum concurrent executions. 

55 max_queue: Maximum queued callers beyond concurrent limit. 

56 timeout: Seconds to wait for a permit before timing out. 

57 

58 Example: 

59 >>> bulk = Bulkhead("db", max_concurrent=5, max_queue=10) 

60 >>> result = await bulk.execute(fetch_data, user_id) 

61 

62 """ 

63 

64 def __init__( 

65 self, 

66 name: str = "default", 

67 *, 

68 max_concurrent: int = 10, 

69 max_queue: int = 50, 

70 timeout: float = 30.0, 

71 ) -> None: 

72 """Initialize the bulkhead. 

73 

74 Args: 

75 name: Bulkhead name. 

76 max_concurrent: Concurrency limit. 

77 max_queue: Queue limit. 

78 timeout: Permit acquisition timeout. 

79 

80 """ 

81 self.name = name 

82 self._max_concurrent = max_concurrent 

83 self._max_queue = max_queue 

84 if not math.isfinite(timeout) or timeout < 0: 

85 raise ValueError("timeout must be a finite non-negative number") 

86 self._timeout = timeout 

87 self._semaphore = asyncio.Semaphore(max_concurrent) 

88 self._queued = 0 

89 self._active = 0 

90 

91 @property 

92 def available_permits(self) -> int: 

93 """Number of available concurrency permits.""" 

94 return self._max_concurrent - self._active 

95 

96 @property 

97 def queued(self) -> int: 

98 """Number of callers currently waiting in the queue.""" 

99 return self._queued 

100 

101 @property 

102 def active(self) -> int: 

103 """Number of currently executing tasks.""" 

104 return self._active 

105 

106 async def execute( 

107 self, 

108 fn: Callable[P, Awaitable[T]], 

109 *args: P.args, 

110 **kwargs: P.kwargs, 

111 ) -> Result[T, Exception]: 

112 """Execute a callable within bulkhead limits. 

113 

114 Args: 

115 fn: Async callable to execute. 

116 *args: Positional arguments for fn. 

117 **kwargs: Keyword arguments for fn. 

118 

119 Returns: 

120 ``Ok(result)`` on success, ``Err`` on failure. 

121 

122 """ 

123 # Check queue capacity 

124 if self._queued >= self._max_queue: 

125 return Err( 

126 BulkheadFullError( 

127 self.name, 

128 self._max_concurrent, 

129 self._max_queue, 

130 ) 

131 ) 

132 

133 self._queued += 1 

134 try: 

135 # Wait for a permit 

136 try: 

137 await asyncio.wait_for( 

138 self._semaphore.acquire(), 

139 timeout=self._timeout, 

140 ) 

141 except TimeoutError: 

142 return Err( 

143 TimeoutError( 

144 f"Bulkhead '{self.name}' timed out " 

145 f"after {self._timeout}s waiting for permit" 

146 ) 

147 ) 

148 except (RuntimeError, OSError, MemoryError) as e: 

149 return Err(RuntimeError(f"Resource exhaustion: {e!s}")) 

150 finally: 

151 self._queued -= 1 

152 

153 # Execute within the permit 

154 self._active += 1 

155 try: 

156 result = await fn(*args, **kwargs) 

157 return Ok(result) 

158 except Exception as exc: 

159 logger.warning( 

160 "Bulkhead '%s' execution failed: %s", 

161 self.name, 

162 exc, 

163 ) 

164 return Err(exc) 

165 finally: 

166 self._active -= 1 

167 self._semaphore.release()