Coverage for src / taipanstack / resilience / watchdogs / resource_watcher.py: 100%

49 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Resource watcher — monitors CPU and memory usage. 

3 

4When usage breaches configurable thresholds, invokes a callback 

5so the application can react (e.g. tighten rate limits). 

6""" 

7 

8import logging 

9import time 

10from collections.abc import Callable 

11from dataclasses import dataclass 

12 

13from taipanstack.core.result import Err, Ok, Result 

14from taipanstack.resilience.watchdogs._base import BaseWatcher 

15 

# Module-level logger; the name is fixed rather than derived from __name__.
logger = logging.getLogger("taipanstack.resilience.watchdogs.resource")

# psutil is an optional dependency: record whether it imported so callers
# can return an ``Err`` instead of failing at import time.
try:
    import psutil
except ImportError:
    psutil = None
    _HAS_PSUTIL = False
else:
    _HAS_PSUTIL = True

25 

26 

@dataclass(frozen=True)
class ResourceSnapshot:
    """Immutable record of one CPU/memory reading.

    Attributes:
        cpu_percent: CPU utilisation at the time of the reading (0-100).
        memory_percent: Memory utilisation at the time of the reading (0-100).
        timestamp: Monotonic clock value captured with the reading.

    """

    cpu_percent: float
    memory_percent: float
    timestamp: float

41 

42 

def check_resources() -> Result[ResourceSnapshot, Exception]:
    """Sample CPU and memory utilisation once.

    psutil is asked to measure CPU usage over a 0.1 second interval and
    to report the current virtual-memory percentage; the reading is
    stamped with ``time.monotonic()``.

    Returns:
        ``Ok(ResourceSnapshot)`` carrying the reading, or
        ``Err(ImportError)`` when psutil could not be imported.

    """
    if _HAS_PSUTIL:
        cpu_usage = psutil.cpu_percent(interval=0.1)
        memory_usage = psutil.virtual_memory().percent
        reading = ResourceSnapshot(
            cpu_percent=cpu_usage,
            memory_percent=memory_usage,
            timestamp=time.monotonic(),
        )
        return Ok(reading)

    # Optional dependency missing: report it as a value, do not raise.
    missing_dependency = ImportError(
        "psutil is required for resource monitoring. "
        "Install with: pip install taipanstack[resilience]"
    )
    return Err(missing_dependency)

68 

69 

70class ResourceWatcher(BaseWatcher): 

71 """Background watcher that monitors CPU and memory. 

72 

73 When either metric exceeds its configured threshold the 

74 ``on_threshold_breach`` callback is invoked with the resource 

75 name (``"cpu"`` or ``"memory"``) and the current value. 

76 

77 Args: 

78 interval: Seconds between checks. 

79 cpu_threshold: CPU percentage that triggers a breach. 

80 memory_threshold: Memory percentage that triggers a breach. 

81 on_threshold_breach: Optional callback ``(resource, value) -> None``. 

82 

83 Example: 

84 >>> watcher = ResourceWatcher( 

85 ... cpu_threshold=80.0, 

86 ... on_threshold_breach=lambda r, v: print(f"{r} at {v}%"), 

87 ... ) 

88 >>> await watcher.start() 

89 

90 """ 

91 

92 def __init__( 

93 self, 

94 *, 

95 interval: float = 5.0, 

96 cpu_threshold: float = 85.0, 

97 memory_threshold: float = 85.0, 

98 on_threshold_breach: Callable[[str, float], None] | None = None, 

99 ) -> None: 

100 """Initialize the resource watcher. 

101 

102 Args: 

103 interval: Seconds between checks. 

104 cpu_threshold: CPU percentage that triggers a breach. 

105 memory_threshold: Memory percentage that triggers a breach. 

106 on_threshold_breach: Optional breach callback. 

107 

108 """ 

109 super().__init__(interval=interval) 

110 self._cpu_threshold = cpu_threshold 

111 self._memory_threshold = memory_threshold 

112 self._on_threshold_breach = on_threshold_breach 

113 

114 async def start(self) -> Result[None, Exception]: 

115 """Start the resource watcher. 

116 

117 Returns: 

118 ``Err`` if psutil is not installed, otherwise delegates 

119 to ``BaseWatcher.start()``. 

120 

121 """ 

122 if not _HAS_PSUTIL: 

123 return Err( 

124 ImportError( 

125 "psutil is required for ResourceWatcher. " 

126 "Install with: pip install taipanstack[resilience]" 

127 ) 

128 ) 

129 return await super().start() 

130 

131 def _check_threshold(self, name: str, value: float, threshold: float) -> None: 

132 if value >= threshold: 

133 logger.warning( 

134 "%s threshold breached: %.1f%% >= %.1f%%", 

135 name.capitalize(), 

136 value, 

137 threshold, 

138 ) 

139 if self._on_threshold_breach is not None: 

140 self._on_threshold_breach(name, value) 

141 

142 def _handle_snapshot(self, snapshot: ResourceSnapshot) -> None: 

143 self._check_threshold("cpu", snapshot.cpu_percent, self._cpu_threshold) 

144 self._check_threshold("memory", snapshot.memory_percent, self._memory_threshold) 

145 

146 async def _run(self) -> None: 

147 """Execute a single resource check cycle.""" 

148 result = check_resources() 

149 match result: 

150 case Ok(snapshot): 

151 self._handle_snapshot(snapshot) 

152 case Err(error): 

153 logger.error("Resource check failed: %s", error)