Coverage for src / taipanstack / resilience / resilience.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Resilience decorators. 

3 

4Provides tools for graceful fallback and timeouts using the Result monad. 

5""" 

6 

7import asyncio 

8import functools 

9import inspect 

10import math 

11import threading 

12from collections.abc import Awaitable, Callable 

13from typing import ParamSpec, Protocol, TypeAlias, TypeVar, cast, overload 

14 

15from taipanstack.core.result import Err, Ok, Result 

16 

# Generic parameters shared by the decorators in this module.
P = ParamSpec("P")  # parameter list of the wrapped callable
T = TypeVar("T")  # type carried by the Result's first (value) slot
E = TypeVar("E", bound=Exception)  # type of the Result's error slot; an Exception

# Synchronous callable that returns a Result.
ResultFunc: TypeAlias = Callable[P, Result[T, E]]
# Asynchronous callable whose awaited value is a Result.
AsyncResultFunc: TypeAlias = Callable[P, Awaitable[Result[T, E]]]

23 

24 

class FallbackDecorator(Protocol):
    """Callable shape of the decorator returned by ``fallback``.

    The decorator preserves the flavour of what it wraps: a synchronous
    Result function stays synchronous and an asynchronous one stays
    asynchronous, with the parameter list left unchanged.
    """

    @overload
    def __call__(
        self,
        func: ResultFunc[P, T, E],
    ) -> ResultFunc[P, T, E]: ...

    @overload
    def __call__(
        self,
        func: AsyncResultFunc[P, T, E],
    ) -> AsyncResultFunc[P, T, E]: ...

33 

34 

def fallback(
    fallback_value: T,
    exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator:
    """Build a decorator that substitutes a default value for failures.

    The wrapped callable (sync or async) is invoked normally. When it
    returns an ``Err`` or raises one of ``exceptions``, the wrapper returns
    ``Ok(fallback_value)`` instead. An ``Ok`` result is passed through
    untouched.

    Args:
        fallback_value: Value wrapped in ``Ok`` whenever the call fails.
        exceptions: Exception types that trigger the fallback when raised.

    Returns:
        Decorator accepting either a sync or an async Result function.

    """

    def decorator(
        func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E],
    ) -> ResultFunc[P, T, E] | AsyncResultFunc[P, T, E]:
        # Plain functions are handled first; coroutine functions fall through.
        if not inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]:
                try:
                    plain_fn = cast(ResultFunc[P, T, E], func)
                    outcome = plain_fn(*args, **kwargs)
                    if isinstance(outcome, Err):
                        return Ok(fallback_value)
                    if isinstance(outcome, Ok):
                        return outcome
                except exceptions:
                    return Ok(fallback_value)
                # Only reached when the callable returned something that is
                # neither Ok nor Err.
                return Err(cast(E, RuntimeError("Unreachable")))

            return sync_wrapper

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]:
            try:
                coro_fn = cast(AsyncResultFunc[P, T, E], func)
                outcome = await coro_fn(*args, **kwargs)
                if isinstance(outcome, Err):
                    return Ok(fallback_value)
                if isinstance(outcome, Ok):
                    return outcome
            except exceptions:
                return Ok(fallback_value)
            # Only reached when the coroutine produced something that is
            # neither Ok nor Err.
            return Err(cast(E, RuntimeError("Unreachable")))

        return async_wrapper

    return cast(FallbackDecorator, decorator)

93 

94 

class TimeoutDecorator(Protocol):
    """Callable shape of the decorator returned by ``timeout``.

    Whatever is decorated keeps its parameters and its sync/async flavour,
    but the error side of the returned Result is widened to also allow
    ``TimeoutError``.
    """

    @overload
    def __call__(
        self,
        func: ResultFunc[P, T, E],
    ) -> Callable[P, Result[T, TimeoutError | E]]: ...

    @overload
    def __call__(
        self,
        func: AsyncResultFunc[P, T, E],
    ) -> Callable[P, Awaitable[Result[T, TimeoutError | E]]]: ...

107 

108 

def timeout(seconds: float) -> TimeoutDecorator:
    """Enforce a maximum execution time on a Result-returning callable.

    Both coroutine functions and plain functions are supported:

    * Coroutine functions are awaited through ``asyncio.wait_for``. When the
      limit expires, ``Err(TimeoutError)`` is returned. A ``TimeoutError``
      raised by the wrapped coroutine itself is reported the same way.
    * Plain functions run in a daemon thread that is joined for at most
      ``seconds``. If the thread is still alive afterwards,
      ``Err(TimeoutError)`` is returned. The thread is not stopped; it is
      simply abandoned and keeps running in the background.

    Args:
        seconds: Maximum allowed execution time in seconds. It is validated
            on every call: a non-finite or negative value makes the wrapper
            return ``Err(ValueError)`` without invoking the wrapped callable.

    Returns:
        Decorator function.

    Raises:
        BaseException: In the sync path, anything raised by the wrapped
            function inside the worker thread is re-raised in the caller.

    """

    def decorator(
        func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E],
    ) -> (
        Callable[P, Result[T, TimeoutError | E]]
        | Callable[P, Awaitable[Result[T, TimeoutError | E]]]
    ):
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(
                *args: P.args, **kwargs: P.kwargs
            ) -> Result[T, TimeoutError | E]:
                if not math.isfinite(seconds) or seconds < 0:
                    return Err(
                        cast(
                            E,
                            ValueError("Timeout must be a finite non-negative number"),
                        )
                    )

                try:
                    func_coro = cast(
                        Callable[P, Awaitable[Result[T, TimeoutError | E]]],
                        func,
                    )
                    return await asyncio.wait_for(
                        func_coro(*args, **kwargs),
                        timeout=seconds,
                    )
                # Before Python 3.11, asyncio.TimeoutError was a separate
                # class from the builtin TimeoutError, so catching only the
                # builtin would let a wait_for timeout escape. On 3.11+ the
                # two names are the same class and the tuple is harmless.
                except (TimeoutError, asyncio.TimeoutError):
                    return Err(
                        TimeoutError(f"Execution timed out after {seconds} seconds.")
                    )

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(
            *args: P.args, **kwargs: P.kwargs
        ) -> Result[T, TimeoutError | E]:
            if not math.isfinite(seconds) or seconds < 0:
                return Err(
                    cast(
                        E,
                        ValueError("Timeout must be a finite non-negative number"),
                    )
                )

            # One-slot mailboxes filled by the worker thread: exactly one of
            # them receives an item once the worker finishes.
            result: list[Result[T, TimeoutError | E]] = []
            exception: list[BaseException] = []

            def worker() -> None:
                try:
                    func_sync = cast(Callable[P, Result[T, TimeoutError | E]], func)
                    result.append(func_sync(*args, **kwargs))
                except BaseException as e:
                    # Captured here so it can be re-raised in the caller's
                    # thread instead of being lost in the worker.
                    exception.append(e)

            # Daemon thread: an abandoned (timed-out) worker must not keep
            # the interpreter alive at shutdown.
            thread = threading.Thread(target=worker, daemon=True)
            try:
                thread.start()
                thread.join(timeout=seconds)
            except RuntimeError as e:
                return Err(cast(E, RuntimeError(f"Thread exhaustion: {e!s}")))
            except OSError as e:
                return Err(cast(E, RuntimeError(f"Resource exhaustion: {e!s}")))
            except MemoryError as e:
                return Err(cast(E, RuntimeError(f"Memory exhaustion: {e!s}")))

            # join() returned but the worker is still running: time is up.
            if thread.is_alive():
                return Err(
                    TimeoutError(f"Execution timed out after {seconds} seconds.")
                )

            if exception:
                raise exception[0]

            return result[0]

        return sync_wrapper

    return cast(TimeoutDecorator, decorator)