Coverage for src / taipanstack / utils / resilience.py: 100%
75 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 14:54 +0000
1"""
2Resilience decorators.
4Provides tools for graceful fallback and timeouts using the Result monad.
5"""
7import asyncio
8import functools
9import inspect
10import threading
11from collections.abc import Callable, Coroutine
12from typing import Any, ParamSpec, Protocol, TypeAlias, TypeVar, cast, overload
14from taipanstack.core.result import Err, Ok, Result
# Parameter specification of the wrapped callable, so the decorators below
# can hand back a wrapper with the same call signature.
P = ParamSpec("P")
# Payload type of a successful ``Result`` (the value wrapped in ``Ok``).
T = TypeVar("T")
# Payload type of a failed ``Result`` (the value wrapped in ``Err``);
# restricted to exception types.
E = TypeVar("E", bound=Exception)

# A synchronous callable that returns a ``Result``.
ResultFunc: TypeAlias = Callable[P, Result[T, E]]
# A coroutine function whose awaited value is a ``Result``.
AsyncResultFunc: TypeAlias = Callable[P, Coroutine[Any, Any, Result[T, E]]]
class FallbackDecorator(Protocol):
    """Callable shape of the decorator returned by ``fallback``.

    The two overloads describe a decorator that preserves the kind of
    function it receives: a synchronous Result-returning callable comes back
    synchronous, and a coroutine function comes back as a coroutine function.
    """

    # Synchronous callable in, synchronous callable out.
    @overload
    def __call__(
        self,
        func: ResultFunc[P, T, E],
    ) -> ResultFunc[P, T, E]: ...  # pragma: no cover

    # Coroutine function in, coroutine function out.
    @overload
    def __call__(
        self,
        func: AsyncResultFunc[P, T, E],
    ) -> AsyncResultFunc[P, T, E]: ...  # pragma: no cover
def fallback(
    fallback_value: T,
    exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator:
    """Build a decorator that substitutes a default value for any failure.

    A wrapped call that returns ``Err`` or raises one of ``exceptions``
    yields ``Ok(fallback_value)`` instead. An ``Ok`` result is passed through
    untouched. Coroutine functions (as reported by
    ``inspect.iscoroutinefunction``) get an ``async`` wrapper; every other
    callable gets a plain one.

    Args:
        fallback_value: Value wrapped in ``Ok`` whenever the call fails.
        exceptions: Exception types treated as a failure. Anything raised
            that is not an instance of these propagates to the caller.

    Returns:
        Decorator function.

    """

    def decorator(
        func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E],
    ) -> ResultFunc[P, T, E] | AsyncResultFunc[P, T, E]:
        if not inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]:
                try:
                    # Not a coroutine function, so the call yields the
                    # Result directly.
                    plain_func = cast(ResultFunc[P, T, E], func)
                    outcome = plain_func(*args, **kwargs)
                    if isinstance(outcome, Err):
                        return Ok(fallback_value)
                    if isinstance(outcome, Ok):
                        return outcome
                except exceptions:
                    return Ok(fallback_value)
                # Only reached if the function returned neither Ok nor Err.
                return Err(cast(E, RuntimeError("Unreachable")))  # pragma: no cover

            return sync_wrapper

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[T, E]:
            try:
                # Coroutine function: the call must be awaited to obtain the
                # Result.
                coro_func = cast(AsyncResultFunc[P, T, E], func)
                outcome = await coro_func(*args, **kwargs)
                if isinstance(outcome, Err):
                    return Ok(fallback_value)
                if isinstance(outcome, Ok):
                    return outcome
            except exceptions:
                return Ok(fallback_value)
            # Only reached if the coroutine returned neither Ok nor Err.
            return Err(cast(E, RuntimeError("Unreachable")))  # pragma: no cover

        return async_wrapper

    return decorator  # type: ignore[return-value]
class TimeoutDecorator(Protocol):
    """Callable shape of the decorator returned by ``timeout``.

    Both overloads keep the wrapped function's parameters and widen the
    error side of its ``Result`` with ``TimeoutError``. A synchronous
    callable stays synchronous and a coroutine function stays a coroutine
    function.
    """

    # Synchronous callable in, synchronous callable out.
    @overload
    def __call__(
        self,
        func: ResultFunc[P, T, E],
    ) -> Callable[P, Result[T, TimeoutError | E]]: ...  # pragma: no cover

    # Coroutine function in, coroutine function out.
    @overload
    def __call__(
        self,
        func: AsyncResultFunc[P, T, E],
    ) -> Callable[
        P,
        Coroutine[Any, Any, Result[T, TimeoutError | E]],
    ]: ...  # pragma: no cover
def timeout(seconds: float) -> TimeoutDecorator:
    """Enforce a maximum execution time.

    A call that finishes in time returns whatever the wrapped function
    returned. A call that overruns returns
    ``Err(TimeoutError("Execution timed out after {seconds} seconds."))``.

    Coroutine functions (as reported by ``inspect.iscoroutinefunction``) are
    awaited through ``asyncio.wait_for``. The ``except TimeoutError`` clause
    surrounds that await, so a ``TimeoutError`` raised by the coroutine
    itself is reported as the same ``Err``.

    Any other function runs in a daemon thread that the caller joins for at
    most ``seconds``. The thread is not stopped when the deadline passes: the
    wrapper returns the ``Err`` and the function may keep running in the
    background. Whatever the function raises before the deadline, including
    ``BaseException`` subclasses such as ``SystemExit``, is re-raised in the
    calling thread.

    Args:
        seconds: Maximum allowed execution time in seconds.

    Returns:
        Decorator function.

    """

    def decorator(
        func: ResultFunc[P, T, E] | AsyncResultFunc[P, T, E],
    ) -> (
        Callable[P, Result[T, TimeoutError | E]]
        | Callable[P, Coroutine[Any, Any, Result[T, TimeoutError | E]]]
    ):
        if inspect.iscoroutinefunction(func):
            # The cast target is quoted so the type expression is not
            # evaluated at runtime; it exists only for the type checker.
            func_coro = cast(
                "Callable[P, Coroutine[Any, Any, Result[T, TimeoutError | E]]]",
                func,
            )

            @functools.wraps(func)
            async def async_wrapper(
                *args: P.args, **kwargs: P.kwargs
            ) -> Result[T, TimeoutError | E]:
                try:
                    return await asyncio.wait_for(
                        func_coro(*args, **kwargs),
                        timeout=seconds,
                    )
                except TimeoutError:
                    return Err(
                        TimeoutError(f"Execution timed out after {seconds} seconds.")
                    )

            return async_wrapper

        func_sync = cast("Callable[P, Result[T, TimeoutError | E]]", func)

        @functools.wraps(func)
        def sync_wrapper(
            *args: P.args, **kwargs: P.kwargs
        ) -> Result[T, TimeoutError | E]:
            # One-element mailboxes filled by the worker thread; exactly one
            # of them is populated once the worker has finished.
            result: list[Result[T, TimeoutError | E]] = []
            exception: list[BaseException] = []

            def worker() -> None:
                try:
                    result.append(func_sync(*args, **kwargs))
                except BaseException as e:  # noqa: BLE001
                    # Capture BaseException too. Otherwise the thread would
                    # die with both mailboxes empty and the caller would hit
                    # an unrelated IndexError on ``result[0]``.
                    exception.append(e)

            thread = threading.Thread(target=worker, daemon=True)
            thread.start()
            thread.join(timeout=seconds)

            if thread.is_alive():
                # Deadline passed; the worker is left running as a daemon.
                return Err(
                    TimeoutError(f"Execution timed out after {seconds} seconds.")
                )

            if exception:
                # Surface the failure in the caller's thread, as an
                # undecorated call would.
                raise exception[0]

            return result[0]

        return sync_wrapper

    return decorator  # type: ignore[return-value]