Coverage for src/taipanstack/resilience/resilience.py: 100%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Resilience decorators.
4Provides tools for graceful fallback and timeouts using the Result monad.
5"""
7import asyncio
8import functools
9import inspect
10import math
11import threading
12from collections.abc import Awaitable, Callable
13from typing import ParamSpec, Protocol, TypeAlias, TypeVar, cast, overload
15from taipanstack.core.result import Err, Ok, Result
# Parameter specification of the decorated callable; the wrappers below
# annotate their *args/**kwargs with it so the original signature is kept.
P = ParamSpec("P")
# Type of the success value (the first type argument of Result).
T = TypeVar("T")
# Type of the error value (the second type argument of Result); restricted
# to Exception subclasses.
E = TypeVar("E", bound=Exception)

# A synchronous callable that returns a Result.
ResultFunc: TypeAlias = Callable[P, Result[T, E]]
# A callable whose return value must be awaited to obtain a Result.
AsyncResultFunc: TypeAlias = Callable[P, Awaitable[Result[T, E]]]
class FallbackDecorator(Protocol):
    """Callable shape of the decorator returned by ``fallback``.

    Applied to a Result-returning callable, it hands back a callable of the
    same kind (sync stays sync, async stays async) with the same signature.
    """

    @overload
    def __call__(self, func: ResultFunc[P, T, E]) -> ResultFunc[P, T, E]:
        """Decorate a synchronous Result-returning function."""

    @overload
    def __call__(self, func: AsyncResultFunc[P, T, E]) -> AsyncResultFunc[P, T, E]:
        """Decorate a callable whose awaited value is a Result."""
def fallback(
    fallback_value: T,
    exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator:
    """Build a decorator that substitutes a default value for failures.

    The decorated callable (plain function or coroutine function) produces
    ``Ok(fallback_value)`` whenever the wrapped call returns an ``Err`` or
    raises one of ``exceptions``. An ``Ok`` result is passed through as-is.

    Args:
        fallback_value: Value wrapped in ``Ok`` when the call fails. The same
            object is reused for every failing call.
        exceptions: Exception types that trigger the fallback when raised.

    Returns:
        Decorator accepting either a sync or an async Result function.

    """

    def decorator(
        func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E],
    ) -> ResultFunc[P, T, E] | AsyncResultFunc[P, T, E]:
        # Handle the plain-function case first; coroutine functions fall
        # through to the async wrapper below.
        if not inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]:
                try:
                    plain_func = cast(ResultFunc[P, T, E], func)
                    outcome = plain_func(*args, **kwargs)
                    if isinstance(outcome, Err):
                        return Ok(fallback_value)
                    if isinstance(outcome, Ok):
                        return outcome
                except exceptions:
                    return Ok(fallback_value)
                # Only reached when the call returned neither Ok nor Err.
                return Err(cast(E, RuntimeError("Unreachable")))

            return sync_wrapper

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]:
            try:
                coro_func = cast(AsyncResultFunc[P, T, E], func)
                outcome = await coro_func(*args, **kwargs)
                if isinstance(outcome, Err):
                    return Ok(fallback_value)
                if isinstance(outcome, Ok):
                    return outcome
            except exceptions:
                return Ok(fallback_value)
            # Only reached when the call resolved to neither Ok nor Err.
            return Err(cast(E, RuntimeError("Unreachable")))

        return async_wrapper

    return cast(FallbackDecorator, decorator)
class TimeoutDecorator(Protocol):
    """Callable shape of the decorator returned by ``timeout``.

    The decorated callable keeps its parameters, but its Result error type is
    widened to also admit ``TimeoutError``.
    """

    @overload
    def __call__(
        self, func: ResultFunc[P, T, E]
    ) -> Callable[P, Result[T, TimeoutError | E]]:
        """Decorate a synchronous Result-returning function."""

    @overload
    def __call__(
        self, func: AsyncResultFunc[P, T, E]
    ) -> Callable[P, Awaitable[Result[T, TimeoutError | E]]]:
        """Decorate a callable whose awaited value is a Result."""
def timeout(seconds: float) -> TimeoutDecorator:
    """Enforce a maximum execution time.

    If the execution time exceeds the specified limit, the decorated callable
    returns ``Err(TimeoutError)`` instead of the wrapped callable's result.

    Coroutine functions are bounded with ``asyncio.wait_for``. Plain functions
    run in a daemon thread that is joined with a timeout; on timeout that
    thread is not stopped and keeps running in the background.

    Args:
        seconds: Maximum allowed execution time in seconds. Checked on every
            call: a negative, NaN or infinite value makes the call return
            ``Err(ValueError)`` without invoking the wrapped callable.

    Returns:
        Decorator function.

    """

    def decorator(
        func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E],
    ) -> (
        Callable[P, Result[T, TimeoutError | E]]
        | Callable[P, Awaitable[Result[T, TimeoutError | E]]]
    ):
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(
                *args: P.args, **kwargs: P.kwargs
            ) -> Result[T, TimeoutError | E]:
                if not math.isfinite(seconds) or seconds < 0:
                    return Err(
                        cast(
                            E,
                            ValueError("Timeout must be a finite non-negative number"),
                        )
                    )

                try:
                    func_coro = cast(
                        Callable[P, Awaitable[Result[T, TimeoutError | E]]],
                        func,
                    )
                    return await asyncio.wait_for(
                        func_coro(*args, **kwargs),
                        timeout=seconds,
                    )
                # Before Python 3.11 asyncio.TimeoutError is a separate class
                # from the builtin TimeoutError, so both are named here; from
                # 3.11 on they are the same object. Note that a TimeoutError
                # raised by the wrapped coroutine itself is also caught here.
                except (TimeoutError, asyncio.TimeoutError):
                    return Err(
                        TimeoutError(f"Execution timed out after {seconds} seconds.")
                    )

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(
            *args: P.args, **kwargs: P.kwargs
        ) -> Result[T, TimeoutError | E]:
            if not math.isfinite(seconds) or seconds < 0:
                return Err(
                    cast(
                        E,
                        ValueError("Timeout must be a finite non-negative number"),
                    )
                )

            # Single-slot mailboxes filled by the worker thread: exactly one
            # of them receives an entry once the call has finished.
            result: list[Result[T, TimeoutError | E]] = []
            exception: list[BaseException] = []

            def worker() -> None:
                try:
                    func_sync = cast(Callable[P, Result[T, TimeoutError | E]], func)
                    result.append(func_sync(*args, **kwargs))
                except BaseException as e:
                    # Captured so it can be re-raised in the calling thread.
                    exception.append(e)

            thread = threading.Thread(target=worker, daemon=True)
            try:
                thread.start()
                # Blocking calls reject timeouts above threading.TIMEOUT_MAX
                # with OverflowError, so very large finite limits are clamped.
                thread.join(timeout=min(seconds, threading.TIMEOUT_MAX))
            except RuntimeError as e:
                return Err(cast(E, RuntimeError(f"Thread exhaustion: {e!s}")))
            except OSError as e:
                return Err(cast(E, RuntimeError(f"Resource exhaustion: {e!s}")))
            except MemoryError as e:
                return Err(cast(E, RuntimeError(f"Memory exhaustion: {e!s}")))

            # Still running after the join deadline: report a timeout and
            # leave the daemon thread behind.
            if thread.is_alive():
                return Err(
                    TimeoutError(f"Execution timed out after {seconds} seconds.")
                )

            # The wrapped function raised: surface the same exception here.
            if exception:
                raise exception[0]

            return result[0]

        return sync_wrapper

    return cast(TimeoutDecorator, decorator)