Coverage for src / taipanstack / utils / resilience.py: 100%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 14:54 +0000

1""" 

2Resilience decorators. 

3 

4Provides tools for graceful fallback and timeouts using the Result monad. 

5""" 

6 

7import asyncio 

8import functools 

9import inspect 

10import threading 

11from collections.abc import Callable, Coroutine 

12from typing import Any, ParamSpec, Protocol, TypeAlias, TypeVar, cast, overload 

13 

14from taipanstack.core.result import Err, Ok, Result 

15 

16P = ParamSpec("P") 

17T = TypeVar("T") 

18E = TypeVar("E", bound=Exception) 

19 

20ResultFunc: TypeAlias = Callable[P, Result[T, E]] 

21AsyncResultFunc: TypeAlias = Callable[P, Coroutine[Any, Any, Result[T, E]]] 

22 

23 

24class FallbackDecorator(Protocol): 

25 """Protocol for the fallback decorator.""" 

26 

27 @overload 

28 def __call__( 

29 self, func: ResultFunc[P, T, E] 

30 ) -> ResultFunc[P, T, E]: ... # pragma: no cover 

31 

32 @overload 

33 def __call__( 

34 self, func: AsyncResultFunc[P, T, E] 

35 ) -> AsyncResultFunc[P, T, E]: ... # pragma: no cover 

36 

37 

38def fallback( 

39 fallback_value: T, 

40 exceptions: tuple[type[Exception], ...] = (Exception,), 

41) -> FallbackDecorator: 

42 """Provide a fallback value on failures. 

43 

44 If the wrapped function returns an Err() or raises a specified exception, 

45 the fallback value is returned wrapped in an Ok(). 

46 

47 Args: 

48 fallback_value: The value to return on failure. 

49 exceptions: Exceptions to catch. 

50 

51 Returns: 

52 Decorator function. 

53 

54 """ 

55 

56 def decorator( 

57 func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E], 

58 ) -> ResultFunc[P, T, E] | AsyncResultFunc[P, T, E]: 

59 if inspect.iscoroutinefunction(func): 

60 

61 @functools.wraps(func) 

62 async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]: 

63 try: 

64 # func is a coroutine function here 

65 func_coro = cast(AsyncResultFunc[P, T, E], func) 

66 result = await func_coro(*args, **kwargs) 

67 match result: 

68 case Err(): 

69 return Ok(fallback_value) 

70 case Ok(): 

71 return result 

72 except exceptions: 

73 return Ok(fallback_value) 

74 return Err(cast(E, RuntimeError("Unreachable"))) # pragma: no cover 

75 

76 return async_wrapper 

77 

78 @functools.wraps(func) 

79 def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]: 

80 try: 

81 # func is a normal function here 

82 func_sync = cast(ResultFunc[P, T, E], func) 

83 result = func_sync(*args, **kwargs) 

84 match result: 

85 case Err(): 

86 return Ok(fallback_value) 

87 case Ok(): 

88 return result 

89 except exceptions: 

90 return Ok(fallback_value) 

91 return Err(cast(E, RuntimeError("Unreachable"))) # pragma: no cover 

92 

93 return sync_wrapper 

94 

95 return decorator # type: ignore[return-value] 

96 

97 

98class TimeoutDecorator(Protocol): 

99 """Protocol for the timeout decorator.""" 

100 

101 @overload 

102 def __call__( 

103 self, func: ResultFunc[P, T, E] 

104 ) -> Callable[P, Result[T, TimeoutError | E]]: ... # pragma: no cover 

105 

106 @overload 

107 def __call__( 

108 self, func: AsyncResultFunc[P, T, E] 

109 ) -> Callable[ 

110 P, Coroutine[Any, Any, Result[T, TimeoutError | E]] 

111 ]: ... # pragma: no cover 

112 

113 

114def timeout(seconds: float) -> TimeoutDecorator: 

115 """Enforce a maximum execution time. 

116 

117 If the execution time exceeds the specified limit, returns Err(TimeoutError). 

118 

119 Args: 

120 seconds: Maximum allowed execution time in seconds. 

121 

122 Returns: 

123 Decorator function. 

124 

125 """ 

126 

127 def decorator( 

128 func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E], 

129 ) -> ( 

130 Callable[P, Result[T, TimeoutError | E]] 

131 | Callable[P, Coroutine[Any, Any, Result[T, TimeoutError | E]]] 

132 ): 

133 if inspect.iscoroutinefunction(func): 

134 

135 @functools.wraps(func) 

136 async def async_wrapper( 

137 *args: P.args, **kwargs: P.kwargs 

138 ) -> Result[T, TimeoutError | E]: 

139 try: 

140 func_coro = cast( 

141 Callable[P, Coroutine[Any, Any, Result[T, TimeoutError | E]]], 

142 func, 

143 ) 

144 return await asyncio.wait_for( 

145 func_coro(*args, **kwargs), 

146 timeout=seconds, 

147 ) 

148 except TimeoutError: 

149 return Err( 

150 TimeoutError(f"Execution timed out after {seconds} seconds.") 

151 ) 

152 

153 return async_wrapper 

154 

155 @functools.wraps(func) 

156 def sync_wrapper( 

157 *args: P.args, **kwargs: P.kwargs 

158 ) -> Result[T, TimeoutError | E]: 

159 result: list[Result[T, TimeoutError | E]] = [] 

160 exception: list[Exception] = [] 

161 

162 def worker() -> None: 

163 try: 

164 func_sync = cast(Callable[P, Result[T, TimeoutError | E]], func) 

165 result.append(func_sync(*args, **kwargs)) 

166 except Exception as e: 

167 exception.append(e) 

168 

169 thread = threading.Thread(target=worker, daemon=True) 

170 thread.start() 

171 thread.join(timeout=seconds) 

172 

173 if thread.is_alive(): 

174 return Err( 

175 TimeoutError(f"Execution timed out after {seconds} seconds.") 

176 ) 

177 

178 if exception: 

179 raise exception[0] 

180 

181 return result[0] 

182 

183 return sync_wrapper 

184 

185 return decorator # type: ignore[return-value]