Coverage for src / taipanstack / resilience / watchdogs / _base.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Base watcher abstract class. 

3 

4Provides a shared lifecycle (start/stop) for all background 

5watchdog tasks running on the asyncio event loop. 

6""" 

7 

8import asyncio 

9import logging 

10from abc import ABC, abstractmethod 

11 

12from taipanstack.core.result import Err, Ok, Result 

13 

# Logger named for the enclosing ``watchdogs`` package (an explicit name,
# not ``__name__``), so records emitted from this ``_base`` module appear
# under the package-level logger name.
logger = logging.getLogger("taipanstack.resilience.watchdogs")

15 

16 

17class BaseWatcher(ABC): 

18 """Abstract base for background watchdog tasks. 

19 

20 Subclasses implement ``_run`` which is called repeatedly at 

21 ``_interval`` seconds until ``stop`` is called. 

22 

23 Args: 

24 interval: Seconds between each poll cycle. 

25 

26 Example: 

27 >>> class MyWatcher(BaseWatcher): 

28 ... async def _run(self) -> None: 

29 ... print("checking...") 

30 >>> watcher = MyWatcher(interval=5.0) 

31 >>> await watcher.start() 

32 

33 """ 

34 

35 def __init__(self, *, interval: float = 5.0) -> None: 

36 """Initialize the base watcher. 

37 

38 Args: 

39 interval: Seconds between each poll cycle. 

40 

41 """ 

42 self._interval = interval 

43 self._stop_event: asyncio.Event = asyncio.Event() 

44 self._task: asyncio.Task[None] | None = None 

45 

46 @property 

47 def is_running(self) -> bool: 

48 """Return ``True`` if the background task is active.""" 

49 return self._task is not None and not self._task.done() 

50 

51 async def start(self) -> Result[None, Exception]: 

52 """Start the background watcher loop. 

53 

54 Returns: 

55 ``Ok(None)`` on success, ``Err`` if already running. 

56 

57 """ 

58 if self.is_running: 

59 return Err(RuntimeError("Watcher is already running")) 

60 

61 self._stop_event.clear() 

62 self._task = asyncio.create_task(self._loop()) 

63 logger.info("%s started (interval=%.1fs)", type(self).__name__, self._interval) 

64 return Ok(None) 

65 

66 async def stop(self) -> None: 

67 """Signal the watcher to stop and wait for it to finish.""" 

68 self._stop_event.set() 

69 if self._task is not None: 

70 try: 

71 await asyncio.wait_for(self._task, timeout=self._interval + 1.0) 

72 except (TimeoutError, asyncio.CancelledError): 

73 self._task.cancel() 

74 self._task = None 

75 logger.info("%s stopped", type(self).__name__) 

76 

77 async def _loop(self) -> None: 

78 """Internal loop that calls ``_run`` at each interval.""" 

79 while not self._stop_event.is_set(): 

80 try: 

81 await self._run() 

82 except Exception: 

83 logger.exception("%s encountered an error", type(self).__name__) 

84 try: 

85 await asyncio.wait_for(self._stop_event.wait(), timeout=self._interval) 

86 except TimeoutError: 

87 continue 

88 

89 @abstractmethod 

90 async def _run(self) -> None: 

91 """Execute a single poll cycle. 

92 

93 Subclasses must override this with the actual monitoring logic. 

94 """ 

95 ... 

96 

97 def _get_extra_repr(self) -> dict[str, object]: 

98 """Return extra fields for ``__repr__``. 

99 

100 Subclasses can override to add custom fields. 

101 """ 

102 return {} 

103 

104 def __repr__(self) -> str: 

105 """Return a developer-friendly representation.""" 

106 extras = self._get_extra_repr() 

107 parts = [f"interval={self._interval}"] 

108 parts.extend(f"{k}={v}" for k, v in extras.items()) 

109 return f"{type(self).__name__}({', '.join(parts)})"