Coverage for src / taipanstack / resilience / watchdogs / resource_watcher.py: 100%
49 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Resource watcher — monitors CPU and memory usage.
4When usage breaches configurable thresholds, invokes a callback
5so the application can react (e.g. tighten rate limits).
6"""
import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass

from taipanstack.core.result import Err, Ok, Result
from taipanstack.resilience.watchdogs._base import BaseWatcher
16logger = logging.getLogger("taipanstack.resilience.watchdogs.resource")
# psutil is an optional dependency: record whether it imported so callers
# can return an ``Err`` instead of failing with a NameError later on.
try:
    import psutil
except ImportError:
    # Keep the name bound so references to ``psutil`` stay resolvable.
    psutil = None
    _HAS_PSUTIL = False
else:
    _HAS_PSUTIL = True
@dataclass(frozen=True)
class ResourceSnapshot:
    """Immutable record of one CPU/memory reading.

    Instances are frozen, so fields cannot be reassigned after
    construction.

    Attributes:
        cpu_percent: CPU utilisation at the time of the reading (0-100).
        memory_percent: Memory utilisation at the time of the reading
            (0-100).
        timestamp: Monotonic clock value captured with the reading.

    """

    cpu_percent: float
    memory_percent: float
    timestamp: float
def check_resources() -> Result[ResourceSnapshot, Exception]:
    """Read CPU and memory utilisation once.

    The CPU figure is requested from ``psutil.cpu_percent`` with
    ``interval=0.1``; the memory figure is the ``percent`` field of
    ``psutil.virtual_memory()``. The snapshot is stamped with
    ``time.monotonic()``.

    Returns:
        ``Ok(ResourceSnapshot)`` when psutil is importable, otherwise
        ``Err`` wrapping an ``ImportError`` with install instructions.

    """
    if _HAS_PSUTIL:
        # Order matters: CPU first, then memory, then the timestamp.
        cpu_reading = psutil.cpu_percent(interval=0.1)
        memory_reading = psutil.virtual_memory().percent
        snapshot = ResourceSnapshot(
            cpu_percent=cpu_reading,
            memory_percent=memory_reading,
            timestamp=time.monotonic(),
        )
        return Ok(snapshot)

    missing_dependency = ImportError(
        "psutil is required for resource monitoring. "
        "Install with: pip install taipanstack[resilience]"
    )
    return Err(missing_dependency)
70class ResourceWatcher(BaseWatcher):
71 """Background watcher that monitors CPU and memory.
73 When either metric exceeds its configured threshold the
74 ``on_threshold_breach`` callback is invoked with the resource
75 name (``"cpu"`` or ``"memory"``) and the current value.
77 Args:
78 interval: Seconds between checks.
79 cpu_threshold: CPU percentage that triggers a breach.
80 memory_threshold: Memory percentage that triggers a breach.
81 on_threshold_breach: Optional callback ``(resource, value) -> None``.
83 Example:
84 >>> watcher = ResourceWatcher(
85 ... cpu_threshold=80.0,
86 ... on_threshold_breach=lambda r, v: print(f"{r} at {v}%"),
87 ... )
88 >>> await watcher.start()
90 """
92 def __init__(
93 self,
94 *,
95 interval: float = 5.0,
96 cpu_threshold: float = 85.0,
97 memory_threshold: float = 85.0,
98 on_threshold_breach: Callable[[str, float], None] | None = None,
99 ) -> None:
100 """Initialize the resource watcher.
102 Args:
103 interval: Seconds between checks.
104 cpu_threshold: CPU percentage that triggers a breach.
105 memory_threshold: Memory percentage that triggers a breach.
106 on_threshold_breach: Optional breach callback.
108 """
109 super().__init__(interval=interval)
110 self._cpu_threshold = cpu_threshold
111 self._memory_threshold = memory_threshold
112 self._on_threshold_breach = on_threshold_breach
114 async def start(self) -> Result[None, Exception]:
115 """Start the resource watcher.
117 Returns:
118 ``Err`` if psutil is not installed, otherwise delegates
119 to ``BaseWatcher.start()``.
121 """
122 if not _HAS_PSUTIL:
123 return Err(
124 ImportError(
125 "psutil is required for ResourceWatcher. "
126 "Install with: pip install taipanstack[resilience]"
127 )
128 )
129 return await super().start()
131 def _check_threshold(self, name: str, value: float, threshold: float) -> None:
132 if value >= threshold:
133 logger.warning(
134 "%s threshold breached: %.1f%% >= %.1f%%",
135 name.capitalize(),
136 value,
137 threshold,
138 )
139 if self._on_threshold_breach is not None:
140 self._on_threshold_breach(name, value)
142 def _handle_snapshot(self, snapshot: ResourceSnapshot) -> None:
143 self._check_threshold("cpu", snapshot.cpu_percent, self._cpu_threshold)
144 self._check_threshold("memory", snapshot.memory_percent, self._memory_threshold)
146 async def _run(self) -> None:
147 """Execute a single resource check cycle."""
148 result = check_resources()
149 match result:
150 case Ok(snapshot):
151 self._handle_snapshot(snapshot)
152 case Err(error):
153 logger.error("Resource check failed: %s", error)