Coverage for src / taipanstack / resilience / watchdogs / config_watcher.py: 100%
106 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Configuration watcher — detects file changes and hot-reloads config.
4Polls configuration files for modifications using SHA-256 hashes
5and validates new content via Pydantic before applying changes.
6"""
8import hashlib
9import json
10import logging
11from collections.abc import Callable, Sequence
12from pathlib import Path
14from pydantic import BaseModel, ValidationError
16from taipanstack.core.result import Err, Ok, Result
17from taipanstack.resilience.watchdogs._base import BaseWatcher
19logger = logging.getLogger("taipanstack.resilience.watchdogs.config")
22def _hash_file(path: Path) -> Result[str, Exception]:
23 """Compute the SHA-256 hex digest of a file.
25 Args:
26 path: Path to the file.
28 Returns:
29 ``Ok(hex_digest)`` on success, ``Err`` on I/O failure.
31 """
32 try:
33 data = path.read_bytes()
34 return Ok(hashlib.sha256(data).hexdigest())
35 except OSError as exc:
36 return Err(exc)
39def _parse_env(text: str) -> dict[str, object]:
40 """Parse a simple ``.env`` key=value file.
42 Lines starting with ``#`` or blank lines are skipped.
43 Surrounding quotes on values are stripped.
45 Args:
46 text: Raw text content of the ``.env`` file.
48 Returns:
49 Parsed key-value mapping.
51 """
52 result: dict[str, object] = {}
53 for line in text.splitlines():
54 stripped = line.strip()
55 if not stripped or stripped.startswith("#"):
56 continue
57 if "=" not in stripped:
58 continue
59 key, _, value = stripped.partition("=")
60 value = value.strip().strip("\"'")
61 result[key.strip()] = value
62 return result
65def _parse_json(text: str) -> Result[dict[str, object], Exception]:
66 """Parse JSON text.
68 Args:
69 text: Raw JSON string.
71 Returns:
72 ``Ok(dict)`` on success, ``Err`` on parse failure.
74 """
75 try:
76 data = json.loads(text)
77 if not isinstance(data, dict):
78 return Err(TypeError(f"Expected JSON object, got {type(data).__name__}"))
79 return Ok(data)
80 except (json.JSONDecodeError, ValueError) as exc:
81 return Err(exc)
84def _load_file_data(path: Path) -> Result[dict[str, object], Exception]:
85 """Read and parse a configuration file based on its extension.
87 Supported extensions: ``.env``, ``.json``.
89 Args:
90 path: Path to the config file.
92 Returns:
93 ``Ok(dict)`` with parsed data, or ``Err`` on failure.
95 """
96 try:
97 text = path.read_text(encoding="utf-8")
98 except OSError as exc:
99 return Err(exc)
101 suffix = path.suffix.lower()
102 if suffix == ".json":
103 return _parse_json(text)
104 if suffix == ".env" or path.name == ".env":
105 return Ok(_parse_env(text))
107 return Err(ValueError(f"Unsupported config file extension: {suffix}"))
110def validate_config(
111 data: dict[str, object],
112 model: type[BaseModel],
113) -> Result[BaseModel, Exception]:
114 """Validate a data dictionary against a Pydantic model.
116 Args:
117 data: Raw configuration data.
118 model: Pydantic model class to validate against.
120 Returns:
121 ``Ok(model_instance)`` on success, ``Err(ValidationError)``
122 on failure.
124 """
125 try:
126 return Ok(model.model_validate(data))
127 except ValidationError as exc:
128 return Err(exc)
131class ConfigWatcher(BaseWatcher):
132 """Background watcher that detects configuration file changes.
134 Polls file hashes at each interval. When a change is detected
135 the content is validated via the provided Pydantic model and,
136 if valid, the ``on_config_change`` callback is invoked.
138 Args:
139 config_paths: Files to watch.
140 config_model: Pydantic model for validation.
141 interval: Seconds between polls.
142 on_config_change: Callback receiving the validated model.
143 on_validation_error: Callback receiving the ``Exception``
144 when validation fails.
146 Example:
147 >>> watcher = ConfigWatcher(
148 ... config_paths=[Path(".env")],
149 ... config_model=MySettings,
150 ... on_config_change=lambda cfg: apply(cfg),
151 ... )
152 >>> await watcher.start()
154 """
156 def __init__(
157 self,
158 *,
159 config_paths: Sequence[Path],
160 config_model: type[BaseModel],
161 interval: float = 2.0,
162 on_config_change: Callable[[BaseModel], None] | None = None,
163 on_validation_error: Callable[[Exception], None] | None = None,
164 ) -> None:
165 """Initialize the config watcher.
167 Args:
168 config_paths: Files to watch.
169 config_model: Pydantic model for validation.
170 interval: Seconds between polls.
171 on_config_change: Callback for valid config changes.
172 on_validation_error: Callback for validation failures.
174 """
175 super().__init__(interval=interval)
176 self._config_paths = list(config_paths)
177 self._config_model = config_model
178 self._on_config_change = on_config_change
179 self._on_validation_error = on_validation_error
180 self._file_hashes: dict[Path, str] = {}
182 def _process_hash_result(
183 self, path: Path, current_hash: str, changed: list[Path]
184 ) -> None:
185 """Process successful hash result and update changed list."""
186 previous = self._file_hashes.get(path)
187 if previous is None:
188 # First time seeing this file — record hash
189 self._file_hashes[path] = current_hash
190 elif current_hash != previous:
191 self._file_hashes[path] = current_hash
192 changed.append(path)
194 def _detect_changes(self) -> Result[list[Path], Exception]:
195 """Detect which watched files have changed since last check.
197 Returns:
198 ``Ok(list[Path])`` of changed file paths.
200 """
201 changed: list[Path] = []
202 for path in self._config_paths:
203 hash_result = _hash_file(path)
204 match hash_result:
205 case Ok(current_hash):
206 self._process_hash_result(path, current_hash, changed)
207 case Err(error):
208 logger.warning("Cannot hash %s: %s", path, error)
209 return Ok(changed)
211 def _handle_validation_success(
212 self, path: Path, model: BaseModel
213 ) -> Result[BaseModel, Exception]:
214 """Handle successful validation."""
215 logger.info(
216 "Config hot-reloaded from %s",
217 path,
218 )
219 if self._on_config_change is not None:
220 self._on_config_change(model)
221 return Ok(model)
223 def _handle_validation_failure(
224 self, path: Path, val_error: Exception
225 ) -> Result[BaseModel, Exception]:
226 """Handle validation failure."""
227 logger.error(
228 "Config validation failed for %s: %s",
229 path,
230 val_error,
231 )
232 if self._on_validation_error is not None:
233 self._on_validation_error(val_error)
234 return Err(val_error)
236 def _validate_and_apply(self, path: Path) -> Result[BaseModel, Exception]:
237 """Load, validate, and apply configuration from a file.
239 Args:
240 path: Path to the changed config file.
242 Returns:
243 ``Ok(model)`` if valid, ``Err`` otherwise.
245 """
246 load_result = _load_file_data(path)
247 match load_result:
248 case Err(error):
249 return Err(error)
250 case Ok(data):
251 validation = validate_config(data, self._config_model)
252 match validation:
253 case Ok(model):
254 return self._handle_validation_success(path, model)
255 case Err(val_error):
256 return self._handle_validation_failure(path, val_error)
258 async def _run(self) -> None:
259 """Execute a single config-check cycle."""
260 changes = self._detect_changes()
261 match changes:
262 case Ok(paths):
263 for path in paths:
264 self._validate_and_apply(path)
265 case Err(error):
266 logger.error("Change detection failed: %s", error)