Coverage for src/taipanstack/resilience/watchdogs/health_pinger.py: 100%
68 statements
Created by coverage.py v7.13.5 at 2026-05-12 21:18 +0000
1"""
2Health pinger — proactively checks dependency health.
4Runs async health checks against registered targets. If a target
5becomes unhealthy the associated ``CircuitBreaker`` is opened
6preventively.
7"""
9import asyncio
10import logging
11from collections.abc import Awaitable, Callable, Sequence
12from dataclasses import dataclass
14from taipanstack.core.result import Err, Ok, Result
15from taipanstack.resilience.circuit_breaker import CircuitBreaker, CircuitState
16from taipanstack.resilience.watchdogs._base import BaseWatcher
# Module logger. The name is pinned to the "health" channel instead of being
# derived from ``__name__``; handlers and levels must be configured on this
# exact name to affect the messages emitted below.
logger: logging.Logger = logging.getLogger(
    name="taipanstack.resilience.watchdogs.health",
)
@dataclass
class HealthTarget:
    """Describe one external dependency watched by :class:`HealthPinger`.

    Attributes:
        name: Label used in log messages and as the key under which the
            target's last observed status is remembered.
        check: Zero-argument callable returning an awaitable; the awaited
            value is ``True`` for healthy and ``False`` for unhealthy.
        circuit_breaker: Breaker to trip preventively whenever the check
            reports unhealthy; ``None`` disables that behavior.

    """

    # Identifier shown in logs and used to track status transitions.
    name: str
    # Async probe; awaited once per ping cycle.
    check: Callable[[], Awaitable[bool]]
    # Optional breaker that is forced OPEN when the probe fails.
    circuit_breaker: CircuitBreaker | None = None
async def check_target(target: HealthTarget) -> Result[bool, Exception]:
    """Probe one target and wrap the outcome in a ``Result``.

    Any ``Exception`` raised by the probe is captured instead of being
    propagated. Callers can therefore tell "the check crashed" apart from
    "the check reported unhealthy". Exceptions deriving only from
    ``BaseException`` (for example ``asyncio.CancelledError``) are not
    caught and still propagate.

    Args:
        target: Target whose ``check`` callable is invoked and awaited.

    Returns:
        ``Ok`` carrying the value the probe produced (``True`` healthy,
        ``False`` unhealthy), or ``Err`` carrying the exception it raised.

    """
    try:
        # Awaiting the probe and wrapping its value share one guard.
        return Ok(await target.check())
    except Exception as probe_error:  # boundary: surface as Err, never raise
        return Err(probe_error)
56async def check_all(
57 targets: Sequence[HealthTarget],
58) -> Result[dict[str, bool], Exception]:
59 """Run health checks for all targets concurrently.
61 Args:
62 targets: Targets to check.
64 Returns:
65 ``Ok(dict)`` mapping target names to health status.
67 """
68 # Run all checks concurrently
69 check_results = await asyncio.gather(*(check_target(t) for t in targets))
71 results: dict[str, bool] = {}
72 for target, result in zip(targets, check_results, strict=True):
73 match result:
74 case Ok(healthy):
75 results[target.name] = healthy
76 case Err(error):
77 logger.warning(
78 "Health check for '%s' failed during aggregation: %s",
79 target.name,
80 error,
81 )
82 results[target.name] = False
83 return Ok(results)
86class HealthPinger(BaseWatcher):
87 """Background watcher that pings external dependencies.
89 For each registered ``HealthTarget``, calls its ``check``
90 coroutine on every cycle. If a target is unhealthy and has
91 an associated ``CircuitBreaker``, the breaker is opened
92 preventively.
94 Args:
95 targets: Dependencies to monitor.
96 interval: Seconds between ping cycles.
97 on_health_change: Optional callback ``(name, is_healthy)``.
99 Example:
100 >>> async def db_ping() -> bool:
101 ... return await pool.fetchval("SELECT 1") == 1
102 >>> pinger = HealthPinger(
103 ... targets=[HealthTarget("db", db_ping, breaker)],
104 ... )
105 >>> await pinger.start()
107 """
109 def __init__(
110 self,
111 *,
112 targets: Sequence[HealthTarget],
113 interval: float = 10.0,
114 on_health_change: Callable[[str, bool], None] | None = None,
115 ) -> None:
116 """Initialize the health pinger.
118 Args:
119 targets: Dependencies to monitor.
120 interval: Seconds between ping cycles.
121 on_health_change: Optional callback on status change.
123 """
124 super().__init__(interval=interval)
125 self._targets = list(targets)
126 self._on_health_change = on_health_change
127 self._last_status: dict[str, bool] = {}
129 async def _process_target(self, target: HealthTarget) -> None:
130 """Process a single health target."""
131 result = await check_target(target)
133 match result:
134 case Ok(healthy):
135 is_healthy = healthy
136 case Err(error):
137 logger.warning(
138 "Health check for '%s' raised: %s",
139 target.name,
140 error,
141 )
142 is_healthy = False
144 self._update_target_status(target, is_healthy)
146 def _check_and_open_breaker(self, target: HealthTarget, is_healthy: bool) -> None:
147 """Open circuit breaker preventively on failure (always checked)."""
148 if (
149 not is_healthy
150 and target.circuit_breaker is not None
151 and target.circuit_breaker.state != CircuitState.OPEN
152 ):
153 _force_open_breaker(target.circuit_breaker, target.name)
155 def _notify_health_change(self, target: HealthTarget, is_healthy: bool) -> None:
156 """Notify on health status change and log it."""
157 if self._on_health_change is not None:
158 self._on_health_change(target.name, is_healthy)
160 if is_healthy:
161 logger.info("Target '%s' is now healthy", target.name)
162 else:
163 logger.warning("Target '%s' is now unhealthy", target.name)
165 def _update_target_status(self, target: HealthTarget, is_healthy: bool) -> None:
166 """Update target status and handle side-effects."""
167 previous = self._last_status.get(target.name)
169 self._check_and_open_breaker(target, is_healthy)
171 if previous == is_healthy:
172 return
174 self._last_status[target.name] = is_healthy
176 self._notify_health_change(target, is_healthy)
178 async def _run(self) -> None:
179 """Execute a single health-check cycle concurrently."""
180 await asyncio.gather(*(self._process_target(t) for t in self._targets))
def _force_open_breaker(
    breaker: CircuitBreaker,
    target_name: str,
    *,
    max_attempts: int = 1_000,
) -> None:
    """Force a circuit breaker into OPEN state.

    Simulates failures by repeatedly recording a synthetic
    ``ConnectionError`` through the breaker's private ``_record_failure``
    hook until the breaker reports ``OPEN``.

    The loop is bounded because it runs synchronously inside the async ping
    cycle. If the breaker never transitions (for example, if it filters out
    this exception type), an unbounded loop would block the event loop
    forever. After ``max_attempts`` recorded failures without opening, an
    error is logged and the breaker is left as-is.

    Args:
        breaker: The circuit breaker to trip.
        target_name: Target name for logging context.
        max_attempts: Upper bound on synthetic failures to record. It should
            comfortably exceed the breaker's failure threshold.

    """
    synthetic = ConnectionError(f"Health ping failed for '{target_name}'")
    attempts = 0
    # Record failures until the breaker opens, or give up after max_attempts.
    while breaker.state != CircuitState.OPEN:
        if attempts >= max_attempts:
            logger.error(
                "Circuit breaker '%s' did not open after %d synthetic failures "
                "for target '%s'",
                breaker.name,
                attempts,
                target_name,
            )
            return
        breaker._record_failure(synthetic)
        attempts += 1
    logger.warning(
        "Circuit breaker '%s' opened preventively for target '%s'",
        breaker.name,
        target_name,
    )