Coverage for src / taipanstack / resilience / watchdogs / config_watcher.py: 100%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Configuration watcher — detects file changes and hot-reloads config. 

3 

4Polls configuration files for modifications using SHA-256 hashes 

5and validates new content via Pydantic before applying changes. 

6""" 

7 

8import hashlib 

9import json 

10import logging 

11from collections.abc import Callable, Sequence 

12from pathlib import Path 

13 

14from pydantic import BaseModel, ValidationError 

15 

16from taipanstack.core.result import Err, Ok, Result 

17from taipanstack.resilience.watchdogs._base import BaseWatcher 

18 

19logger = logging.getLogger("taipanstack.resilience.watchdogs.config") 

20 

21 

def _hash_file(path: Path) -> Result[str, Exception]:
    """Return the SHA-256 hex digest of the file at ``path``.

    The file is read in one go as bytes and hashed in memory.

    Args:
        path: Location of the file to fingerprint.

    Returns:
        ``Ok(hex_digest)`` when the file could be read, otherwise
        ``Err`` wrapping the ``OSError`` raised while reading.

    """
    # Only the read can raise OSError, so keep the guarded region minimal.
    try:
        raw = path.read_bytes()
    except OSError as io_error:
        return Err(io_error)

    digest = hashlib.sha256(raw)
    return Ok(digest.hexdigest())

37 

38 

39def _parse_env(text: str) -> dict[str, object]: 

40 """Parse a simple ``.env`` key=value file. 

41 

42 Lines starting with ``#`` or blank lines are skipped. 

43 Surrounding quotes on values are stripped. 

44 

45 Args: 

46 text: Raw text content of the ``.env`` file. 

47 

48 Returns: 

49 Parsed key-value mapping. 

50 

51 """ 

52 result: dict[str, object] = {} 

53 for line in text.splitlines(): 

54 stripped = line.strip() 

55 if not stripped or stripped.startswith("#"): 

56 continue 

57 if "=" not in stripped: 

58 continue 

59 key, _, value = stripped.partition("=") 

60 value = value.strip().strip("\"'") 

61 result[key.strip()] = value 

62 return result 

63 

64 

def _parse_json(text: str) -> Result[dict[str, object], Exception]:
    """Decode ``text`` as JSON and require a top-level object.

    Args:
        text: Raw JSON string.

    Returns:
        ``Ok(dict)`` when the text decodes to a JSON object. ``Err``
        wrapping the decode error when the text is not valid JSON, or
        ``Err(TypeError)`` when the top-level value is not an object.

    """
    try:
        parsed = json.loads(text)
    except (json.JSONDecodeError, ValueError) as decode_error:
        return Err(decode_error)

    # Configuration must be a mapping; reject lists, scalars, and null.
    if isinstance(parsed, dict):
        return Ok(parsed)
    return Err(TypeError(f"Expected JSON object, got {type(parsed).__name__}"))

82 

83 

def _load_file_data(path: Path) -> Result[dict[str, object], Exception]:
    """Read and parse a configuration file based on its extension.

    Supported extensions: ``.env``, ``.json``. A file literally named
    ``.env`` is also treated as an env file.

    Args:
        path: Path to the config file.

    Returns:
        ``Ok(dict)`` with parsed data, or ``Err`` when the file cannot be
        read, is not valid UTF-8, fails to parse, or has an unsupported
        extension.

    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError. Without
        # catching it a non-UTF-8 file would raise out of this function
        # instead of being reported through the Result.
        return Err(exc)

    suffix = path.suffix.lower()
    if suffix == ".json":
        return _parse_json(text)
    if suffix == ".env" or path.name == ".env":
        return Ok(_parse_env(text))

    return Err(ValueError(f"Unsupported config file extension: {suffix}"))

108 

109 

def validate_config(
    data: dict[str, object],
    model: type[BaseModel],
) -> Result[BaseModel, Exception]:
    """Check raw configuration data against a Pydantic model class.

    Args:
        data: Raw configuration data.
        model: Pydantic model class used for validation.

    Returns:
        ``Ok(model_instance)`` when ``model.model_validate`` accepts the
        data, ``Err(ValidationError)`` when it rejects it.

    """
    try:
        instance = model.model_validate(data)
    except ValidationError as validation_error:
        return Err(validation_error)
    else:
        return Ok(instance)

129 

130 

131class ConfigWatcher(BaseWatcher): 

132 """Background watcher that detects configuration file changes. 

133 

134 Polls file hashes at each interval. When a change is detected 

135 the content is validated via the provided Pydantic model and, 

136 if valid, the ``on_config_change`` callback is invoked. 

137 

138 Args: 

139 config_paths: Files to watch. 

140 config_model: Pydantic model for validation. 

141 interval: Seconds between polls. 

142 on_config_change: Callback receiving the validated model. 

143 on_validation_error: Callback receiving the ``Exception`` 

144 when validation fails. 

145 

146 Example: 

147 >>> watcher = ConfigWatcher( 

148 ... config_paths=[Path(".env")], 

149 ... config_model=MySettings, 

150 ... on_config_change=lambda cfg: apply(cfg), 

151 ... ) 

152 >>> await watcher.start() 

153 

154 """ 

155 

156 def __init__( 

157 self, 

158 *, 

159 config_paths: Sequence[Path], 

160 config_model: type[BaseModel], 

161 interval: float = 2.0, 

162 on_config_change: Callable[[BaseModel], None] | None = None, 

163 on_validation_error: Callable[[Exception], None] | None = None, 

164 ) -> None: 

165 """Initialize the config watcher. 

166 

167 Args: 

168 config_paths: Files to watch. 

169 config_model: Pydantic model for validation. 

170 interval: Seconds between polls. 

171 on_config_change: Callback for valid config changes. 

172 on_validation_error: Callback for validation failures. 

173 

174 """ 

175 super().__init__(interval=interval) 

176 self._config_paths = list(config_paths) 

177 self._config_model = config_model 

178 self._on_config_change = on_config_change 

179 self._on_validation_error = on_validation_error 

180 self._file_hashes: dict[Path, str] = {} 

181 

182 def _process_hash_result( 

183 self, path: Path, current_hash: str, changed: list[Path] 

184 ) -> None: 

185 """Process successful hash result and update changed list.""" 

186 previous = self._file_hashes.get(path) 

187 if previous is None: 

188 # First time seeing this file — record hash 

189 self._file_hashes[path] = current_hash 

190 elif current_hash != previous: 

191 self._file_hashes[path] = current_hash 

192 changed.append(path) 

193 

194 def _detect_changes(self) -> Result[list[Path], Exception]: 

195 """Detect which watched files have changed since last check. 

196 

197 Returns: 

198 ``Ok(list[Path])`` of changed file paths. 

199 

200 """ 

201 changed: list[Path] = [] 

202 for path in self._config_paths: 

203 hash_result = _hash_file(path) 

204 match hash_result: 

205 case Ok(current_hash): 

206 self._process_hash_result(path, current_hash, changed) 

207 case Err(error): 

208 logger.warning("Cannot hash %s: %s", path, error) 

209 return Ok(changed) 

210 

211 def _handle_validation_success( 

212 self, path: Path, model: BaseModel 

213 ) -> Result[BaseModel, Exception]: 

214 """Handle successful validation.""" 

215 logger.info( 

216 "Config hot-reloaded from %s", 

217 path, 

218 ) 

219 if self._on_config_change is not None: 

220 self._on_config_change(model) 

221 return Ok(model) 

222 

223 def _handle_validation_failure( 

224 self, path: Path, val_error: Exception 

225 ) -> Result[BaseModel, Exception]: 

226 """Handle validation failure.""" 

227 logger.error( 

228 "Config validation failed for %s: %s", 

229 path, 

230 val_error, 

231 ) 

232 if self._on_validation_error is not None: 

233 self._on_validation_error(val_error) 

234 return Err(val_error) 

235 

236 def _validate_and_apply(self, path: Path) -> Result[BaseModel, Exception]: 

237 """Load, validate, and apply configuration from a file. 

238 

239 Args: 

240 path: Path to the changed config file. 

241 

242 Returns: 

243 ``Ok(model)`` if valid, ``Err`` otherwise. 

244 

245 """ 

246 load_result = _load_file_data(path) 

247 match load_result: 

248 case Err(error): 

249 return Err(error) 

250 case Ok(data): 

251 validation = validate_config(data, self._config_model) 

252 match validation: 

253 case Ok(model): 

254 return self._handle_validation_success(path, model) 

255 case Err(val_error): 

256 return self._handle_validation_failure(path, val_error) 

257 

258 async def _run(self) -> None: 

259 """Execute a single config-check cycle.""" 

260 changes = self._detect_changes() 

261 match changes: 

262 case Ok(paths): 

263 for path in paths: 

264 self._validate_and_apply(path) 

265 case Err(error): 

266 logger.error("Change detection failed: %s", error)