Coverage for src / taipanstack / resilience / watchdogs / _base.py: 100%
41 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Base watcher abstract class.
4Provides a shared lifecycle (start/stop) for all background
5watchdog tasks running on the asyncio event loop.
6"""
8import asyncio
9import logging
10from abc import ABC, abstractmethod
12from taipanstack.core.result import Err, Ok, Result
14logger = logging.getLogger("taipanstack.resilience.watchdogs")
17class BaseWatcher(ABC):
18 """Abstract base for background watchdog tasks.
20 Subclasses implement ``_run`` which is called repeatedly at
21 ``_interval`` seconds until ``stop`` is called.
23 Args:
24 interval: Seconds between each poll cycle.
26 Example:
27 >>> class MyWatcher(BaseWatcher):
28 ... async def _run(self) -> None:
29 ... print("checking...")
30 >>> watcher = MyWatcher(interval=5.0)
31 >>> await watcher.start()
33 """
35 def __init__(self, *, interval: float = 5.0) -> None:
36 """Initialize the base watcher.
38 Args:
39 interval: Seconds between each poll cycle.
41 """
42 self._interval = interval
43 self._stop_event: asyncio.Event = asyncio.Event()
44 self._task: asyncio.Task[None] | None = None
46 @property
47 def is_running(self) -> bool:
48 """Return ``True`` if the background task is active."""
49 return self._task is not None and not self._task.done()
51 async def start(self) -> Result[None, Exception]:
52 """Start the background watcher loop.
54 Returns:
55 ``Ok(None)`` on success, ``Err`` if already running.
57 """
58 if self.is_running:
59 return Err(RuntimeError("Watcher is already running"))
61 self._stop_event.clear()
62 self._task = asyncio.create_task(self._loop())
63 logger.info("%s started (interval=%.1fs)", type(self).__name__, self._interval)
64 return Ok(None)
66 async def stop(self) -> None:
67 """Signal the watcher to stop and wait for it to finish."""
68 self._stop_event.set()
69 if self._task is not None:
70 try:
71 await asyncio.wait_for(self._task, timeout=self._interval + 1.0)
72 except (TimeoutError, asyncio.CancelledError):
73 self._task.cancel()
74 self._task = None
75 logger.info("%s stopped", type(self).__name__)
77 async def _loop(self) -> None:
78 """Internal loop that calls ``_run`` at each interval."""
79 while not self._stop_event.is_set():
80 try:
81 await self._run()
82 except Exception:
83 logger.exception("%s encountered an error", type(self).__name__)
84 try:
85 await asyncio.wait_for(self._stop_event.wait(), timeout=self._interval)
86 except TimeoutError:
87 continue
89 @abstractmethod
90 async def _run(self) -> None:
91 """Execute a single poll cycle.
93 Subclasses must override this with the actual monitoring logic.
94 """
95 ...
97 def _get_extra_repr(self) -> dict[str, object]:
98 """Return extra fields for ``__repr__``.
100 Subclasses can override to add custom fields.
101 """
102 return {}
104 def __repr__(self) -> str:
105 """Return a developer-friendly representation."""
106 extras = self._get_extra_repr()
107 parts = [f"interval={self._interval}"]
108 parts.extend(f"{k}={v}" for k, v in extras.items())
109 return f"{type(self).__name__}({', '.join(parts)})"