Coverage for src / taipanstack / resilience / watchdogs / health_pinger.py: 100%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Health pinger — proactively checks dependency health. 

3 

4Runs async health checks against registered targets. If a target 

5becomes unhealthy the associated ``CircuitBreaker`` is opened 

6preventively. 

7""" 

8 

9import asyncio 

10import logging 

11from collections.abc import Awaitable, Callable, Sequence 

12from dataclasses import dataclass 

13 

14from taipanstack.core.result import Err, Ok, Result 

15from taipanstack.resilience.circuit_breaker import CircuitBreaker, CircuitState 

16from taipanstack.resilience.watchdogs._base import BaseWatcher 

17 

18logger = logging.getLogger("taipanstack.resilience.watchdogs.health") 

19 

20 

@dataclass
class HealthTarget:
    """Describe one dependency watched by :class:`HealthPinger`.

    Attributes:
        name: Label identifying the target in log messages.
        check: Zero-argument async callable that resolves to ``True``
            when the dependency is healthy and ``False`` when it is not.
        circuit_breaker: Breaker to open when the target is found
            unhealthy; ``None`` (the default) means no breaker is tied
            to this target.

    """

    name: str
    check: Callable[[], Awaitable[bool]]
    circuit_breaker: CircuitBreaker | None = None

36 

37 

async def check_target(target: HealthTarget) -> Result[bool, Exception]:
    """Await one target's health probe and wrap the outcome.

    Args:
        target: The dependency whose ``check`` coroutine is awaited.

    Returns:
        ``Ok`` wrapping the value the probe produced (``True`` for
        healthy, ``False`` for unhealthy), or ``Err`` wrapping the
        exception when the probe raised.

    """
    # A raising probe is reported as data rather than propagated, so the
    # broad ``Exception`` catch here is deliberate.
    try:
        return Ok(await target.check())
    except Exception as failure:
        return Err(failure)

54 

55 

56async def check_all( 

57 targets: Sequence[HealthTarget], 

58) -> Result[dict[str, bool], Exception]: 

59 """Run health checks for all targets concurrently. 

60 

61 Args: 

62 targets: Targets to check. 

63 

64 Returns: 

65 ``Ok(dict)`` mapping target names to health status. 

66 

67 """ 

68 # Run all checks concurrently 

69 check_results = await asyncio.gather(*(check_target(t) for t in targets)) 

70 

71 results: dict[str, bool] = {} 

72 for target, result in zip(targets, check_results, strict=True): 

73 match result: 

74 case Ok(healthy): 

75 results[target.name] = healthy 

76 case Err(error): 

77 logger.warning( 

78 "Health check for '%s' failed during aggregation: %s", 

79 target.name, 

80 error, 

81 ) 

82 results[target.name] = False 

83 return Ok(results) 

84 

85 

86class HealthPinger(BaseWatcher): 

87 """Background watcher that pings external dependencies. 

88 

89 For each registered ``HealthTarget``, calls its ``check`` 

90 coroutine on every cycle. If a target is unhealthy and has 

91 an associated ``CircuitBreaker``, the breaker is opened 

92 preventively. 

93 

94 Args: 

95 targets: Dependencies to monitor. 

96 interval: Seconds between ping cycles. 

97 on_health_change: Optional callback ``(name, is_healthy)``. 

98 

99 Example: 

100 >>> async def db_ping() -> bool: 

101 ... return await pool.fetchval("SELECT 1") == 1 

102 >>> pinger = HealthPinger( 

103 ... targets=[HealthTarget("db", db_ping, breaker)], 

104 ... ) 

105 >>> await pinger.start() 

106 

107 """ 

108 

109 def __init__( 

110 self, 

111 *, 

112 targets: Sequence[HealthTarget], 

113 interval: float = 10.0, 

114 on_health_change: Callable[[str, bool], None] | None = None, 

115 ) -> None: 

116 """Initialize the health pinger. 

117 

118 Args: 

119 targets: Dependencies to monitor. 

120 interval: Seconds between ping cycles. 

121 on_health_change: Optional callback on status change. 

122 

123 """ 

124 super().__init__(interval=interval) 

125 self._targets = list(targets) 

126 self._on_health_change = on_health_change 

127 self._last_status: dict[str, bool] = {} 

128 

129 async def _process_target(self, target: HealthTarget) -> None: 

130 """Process a single health target.""" 

131 result = await check_target(target) 

132 

133 match result: 

134 case Ok(healthy): 

135 is_healthy = healthy 

136 case Err(error): 

137 logger.warning( 

138 "Health check for '%s' raised: %s", 

139 target.name, 

140 error, 

141 ) 

142 is_healthy = False 

143 

144 self._update_target_status(target, is_healthy) 

145 

146 def _check_and_open_breaker(self, target: HealthTarget, is_healthy: bool) -> None: 

147 """Open circuit breaker preventively on failure (always checked).""" 

148 if ( 

149 not is_healthy 

150 and target.circuit_breaker is not None 

151 and target.circuit_breaker.state != CircuitState.OPEN 

152 ): 

153 _force_open_breaker(target.circuit_breaker, target.name) 

154 

155 def _notify_health_change(self, target: HealthTarget, is_healthy: bool) -> None: 

156 """Notify on health status change and log it.""" 

157 if self._on_health_change is not None: 

158 self._on_health_change(target.name, is_healthy) 

159 

160 if is_healthy: 

161 logger.info("Target '%s' is now healthy", target.name) 

162 else: 

163 logger.warning("Target '%s' is now unhealthy", target.name) 

164 

165 def _update_target_status(self, target: HealthTarget, is_healthy: bool) -> None: 

166 """Update target status and handle side-effects.""" 

167 previous = self._last_status.get(target.name) 

168 

169 self._check_and_open_breaker(target, is_healthy) 

170 

171 if previous == is_healthy: 

172 return 

173 

174 self._last_status[target.name] = is_healthy 

175 

176 self._notify_health_change(target, is_healthy) 

177 

178 async def _run(self) -> None: 

179 """Execute a single health-check cycle concurrently.""" 

180 await asyncio.gather(*(self._process_target(t) for t in self._targets)) 

181 

182 

def _force_open_breaker(
    breaker: CircuitBreaker,
    target_name: str,
    *,
    max_attempts: int = 10_000,
) -> None:
    """Force a circuit breaker into OPEN state.

    Records a synthetic ``ConnectionError`` on the breaker repeatedly
    until it reports ``CircuitState.OPEN``. The loop is bounded: this
    function is synchronous, so a breaker that never reports OPEN
    would otherwise spin forever and block whatever thread (or event
    loop) called it.

    Args:
        breaker: The circuit breaker to trip.
        target_name: Target name for logging context.
        max_attempts: Upper bound on synthetic failures recorded in
            one call. When the bound is reached without the breaker
            opening, an error is logged and the function returns.

    """
    synthetic = ConnectionError(f"Health ping failed for '{target_name}'")
    attempts = 0
    # Record failures until the breaker opens or the bound is reached.
    while breaker.state != CircuitState.OPEN:
        if attempts >= max_attempts:
            # NOTE(review): whether a breaker can stay non-OPEN here
            # depends on CircuitBreaker internals — confirm.
            logger.error(
                "Circuit breaker '%s' did not open after %d synthetic "
                "failures for target '%s'",
                breaker.name,
                attempts,
                target_name,
            )
            return
        breaker._record_failure(synthetic)
        attempts += 1
    logger.warning(
        "Circuit breaker '%s' opened preventively for target '%s'",
        breaker.name,
        target_name,
    )