Coverage for src / taipanstack / utils / logging.py: 100%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Structured logging with context. 

3 

4Provides a configured logger with support for structured output, 

5context propagation, and proper formatting. 

6""" 

7 

8import logging 

9import re 

10import sys 

11from collections.abc import Iterator, MutableMapping 

12from contextlib import AbstractContextManager, contextmanager 

13from datetime import UTC, datetime 

14from functools import lru_cache 

15from typing import Literal 

16 

17from taipanstack.utils.context import get_correlation_id 

18 

19try: 

20 import structlog 

21 

22 HAS_STRUCTLOG = True 

23except ImportError: 

24 HAS_STRUCTLOG = False 

25 

26 

# Default log format
DEFAULT_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
# NOTE(review): naive %-style JSON template — the message text is not
# JSON-escaped, so quotes or newlines in a message can produce invalid
# JSON output; confirm this is acceptable for downstream consumers.
JSON_FORMAT = (
    '{"timestamp": "%(asctime)s", "level": "%(levelname)s", '
    '"logger": "%(name)s", "message": "%(message)s"}'
)

33 

34SENSITIVE_KEY_PATTERNS: tuple[str, ...] = ( 

35 "password", 

36 "secret", 

37 "token", 

38 "authorization", 

39 "api_key", 

40) 

41 

42# Pre-compile the regex for O(1) checks per key instead of O(N*M) 

43_SENSITIVE_KEY_REGEX = ( 

44 re.compile("|".join(map(re.escape, SENSITIVE_KEY_PATTERNS)), re.IGNORECASE) 

45 if SENSITIVE_KEY_PATTERNS 

46 else None 

47) 

48 

49REDACTED_VALUE = "***REDACTED***" 

50 

51 

52@lru_cache(maxsize=1024) 

53def _is_sensitive(key: object, regex: re.Pattern[str] | None) -> bool: 

54 """Check if a key is sensitive using cached regex matching. 

55 

56 Args: 

57 key: The key to check. 

58 regex: The regex pattern to use for matching. 

59 

60 Returns: 

61 True if the key is a string and sensitive, False otherwise. 

62 

63 """ 

64 if regex is None: 

65 return False 

66 if not isinstance(key, str): 

67 return False 

68 return bool(regex.search(key)) 

69 

70 

71def _redact(obj: object, seen: set[int] | None = None) -> object: 

72 """Redact sensitive data recursively with circular reference protection. 

73 

74 Args: 

75 obj: The object to redact. 

76 seen: Set of IDs of already processed containers. 

77 

78 Returns: 

79 A new object with sensitive data redacted. 

80 

81 """ 

82 if _SENSITIVE_KEY_REGEX is None: 

83 return obj 

84 

85 if seen is None: 

86 seen = set() 

87 

88 # Detect circular references 

89 if id(obj) in seen: 

90 return REDACTED_VALUE 

91 

92 if isinstance(obj, MutableMapping): 

93 seen.add(id(obj)) 

94 # Create a new dict to avoid in-place mutation of shared state 

95 return { 

96 k: ( 

97 REDACTED_VALUE 

98 if _is_sensitive(k, _SENSITIVE_KEY_REGEX) 

99 else _redact(v, seen) 

100 ) 

101 for k, v in obj.items() 

102 } 

103 

104 if isinstance(obj, list): 

105 seen.add(id(obj)) 

106 return [_redact(item, seen) for item in obj] 

107 

108 if isinstance(obj, tuple): 

109 seen.add(id(obj)) 

110 return tuple(_redact(item, seen) for item in obj) 

111 

112 return obj 

113 

114 

def _redact_dict(d: MutableMapping[str, object]) -> None:
    """Redact sensitive keys in a dictionary in-place (top-level only).

    Top-level sensitive keys are overwritten with ``REDACTED_VALUE``;
    every other value is replaced with a redacted copy, so nested shared
    structures are never mutated.

    Args:
        d: The dictionary to redact.

    """
    regex = _SENSITIVE_KEY_REGEX
    if regex is None:
        return

    # Snapshot the keys so reassignment during iteration is safe.
    for key in list(d):
        if _is_sensitive(key, regex):
            d[key] = REDACTED_VALUE
        else:
            d[key] = _redact(d[key])

133 

134 

def mask_sensitive_data_processor(
    _logger: object,
    _method: str,
    event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]:
    """Mask sensitive data in a structlog event dictionary.

    Replaces the value of any key whose name contains one of the
    sensitive substrings with ``"***REDACTED***"``. Matching is
    case-insensitive; nested values are replaced by redacted copies.

    Args:
        _logger: The wrapped logger object (unused, required by structlog).
        _method: The name of the log method called (unused, required by structlog).
        event_dict: The structured event dictionary.

    Returns:
        The same event dictionary, with sensitive values masked.

    """
    _redact_dict(event_dict)
    return event_dict

157 

158 

def correlation_id_processor(
    _logger: object,
    _method: str,
    event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]:
    """Structlog processor to inject the current correlation ID.

    Args:
        _logger: The wrapped logger object.
        _method: The name of the log method called.
        event_dict: The structured event dictionary.

    Returns:
        The event dictionary, with ``correlation_id`` added when one is set.

    """
    if (correlation_id := get_correlation_id()) is not None:
        event_dict["correlation_id"] = correlation_id
    return event_dict

179 

180 

class StackLogger:
    """Enhanced logger with context support.

    Provides a wrapper around standard logging with additional features
    like context propagation and structured output support. All level
    methods share a single dispatch path (`_log`) instead of repeating
    the structured/plain branch six times.

    Attributes:
        name: Logger name.
        level: Current log level.

    """

    def __init__(
        self,
        name: str = "stack",
        level: str = "INFO",
        *,
        use_structured: bool = False,
    ) -> None:
        """Initialize the logger.

        Args:
            name: Logger name.
            level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
            use_structured: Use structured logging if structlog is available.

        """
        self.name = name
        self.level = level
        self._context: dict[str, object] = {}

        if use_structured and HAS_STRUCTLOG:
            self._logger = structlog.get_logger(name)
            self._structured = True
        else:
            self._logger = logging.getLogger(name)
            self._logger.setLevel(getattr(logging, level.upper()))
            self._structured = False

    def bind(self, **context: object) -> "StackLogger":
        """Add context to logger.

        Args:
            **context: Key-value pairs to add to context.

        Returns:
            Self for chaining.

        """
        self._context.update(context)
        if self._structured and HAS_STRUCTLOG:
            self._logger = self._logger.bind(**context)
        return self

    def unbind(self, *keys: str) -> "StackLogger":
        """Remove context keys.

        Args:
            *keys: Keys to remove from context.

        Returns:
            Self for chaining.

        """
        for key in keys:
            self._context.pop(key, None)
        if self._structured and HAS_STRUCTLOG:
            # NOTE(review): structlog's unbind raises KeyError for keys
            # that were never bound, unlike the tolerant pop above.
            self._logger = self._logger.unbind(*keys)
        return self

    def _format_message(self, message: str, **kwargs: object) -> str:
        """Format message with bound context and per-call kwargs.

        Sensitive keys are redacted before rendering.

        Args:
            message: The log message.
            **kwargs: Additional context for this message.

        Returns:
            Formatted message string.

        """
        if not kwargs and not self._context:
            return message

        # Per-call kwargs win over bound context on key collision.
        context = {**self._context, **kwargs}
        _redact_dict(context)

        context_str = " ".join(f"{k}={v}" for k, v in context.items())
        return f"{message} | {context_str}"

    def _log(self, method_name: str, message: str, **kwargs: object) -> None:
        """Dispatch one log call to the underlying logger.

        Structured loggers receive kwargs natively; plain loggers get a
        single pre-formatted message string with context appended.

        Args:
            method_name: Underlying log method name (e.g. ``"info"``).
            message: The message to log.
            **kwargs: Additional context.

        """
        method = getattr(self._logger, method_name)
        if self._structured:
            method(message, **kwargs)
        else:
            method(self._format_message(message, **kwargs))

    def debug(self, message: str, **kwargs: object) -> None:
        """Log a debug message.

        Args:
            message: The message to log.
            **kwargs: Additional context.

        """
        self._log("debug", message, **kwargs)

    def info(self, message: str, **kwargs: object) -> None:
        """Log an info message.

        Args:
            message: The message to log.
            **kwargs: Additional context.

        """
        self._log("info", message, **kwargs)

    def warning(self, message: str, **kwargs: object) -> None:
        """Log a warning message.

        Args:
            message: The message to log.
            **kwargs: Additional context.

        """
        self._log("warning", message, **kwargs)

    def error(self, message: str, **kwargs: object) -> None:
        """Log an error message.

        Args:
            message: The message to log.
            **kwargs: Additional context.

        """
        self._log("error", message, **kwargs)

    def critical(self, message: str, **kwargs: object) -> None:
        """Log a critical message.

        Args:
            message: The message to log.
            **kwargs: Additional context.

        """
        self._log("critical", message, **kwargs)

    def exception(self, message: str, **kwargs: object) -> None:
        """Log an exception with traceback.

        Args:
            message: The message to log.
            **kwargs: Additional context.

        """
        self._log("exception", message, **kwargs)

348 

349 

def _configure_structlog() -> None:
    """Configure structlog with default processors and settings."""
    # Order matters: correlation id and masking run before rendering.
    processors = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        correlation_id_processor,
        mask_sensitive_data_processor,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

369 

370 

371def _get_log_format(format_type: str) -> str: 

372 """Get the appropriate log format string.""" 

373 match format_type: 

374 case "simple": 

375 return "%(levelname)s: %(message)s" 

376 case "json": 

377 return JSON_FORMAT 

378 case _: 

379 return DEFAULT_FORMAT 

380 

381 

def setup_logging(
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    *,
    format_type: Literal["simple", "detailed", "json"] = "detailed",
    log_file: str | None = None,
    use_structured: bool = False,
) -> None:
    """Configure the root logger.

    Args:
        level: Log level to set.
        format_type: Output format type.
        log_file: Optional file to log to.
        use_structured: Use structlog if available.

    """
    # Structured mode delegates all configuration to structlog.
    if use_structured and HAS_STRUCTLOG:
        _configure_structlog()
        return

    handlers: list[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if log_file:
        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))

    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format=_get_log_format(format_type),
        handlers=handlers,
        force=True,  # replace any handlers left by earlier configuration
    )

416 

417 

def get_logger(
    name: str = "stack",
    *,
    level: str = "INFO",
    use_structured: bool = False,
) -> StackLogger:
    """Get a configured logger instance.

    Args:
        name: Logger name.
        level: Log level.
        use_structured: Use structlog if available.

    Returns:
        Configured StackLogger instance.

    Example:
        >>> logger = get_logger("my_module")
        >>> logger.bind(request_id="123").info("Processing request")

    """
    logger = StackLogger(name, level, use_structured=use_structured)
    return logger

440 

441 

442def log_operation( 

443 operation: str, 

444 *, 

445 logger: StackLogger | None = None, 

446 level: str = "INFO", 

447 expected_exceptions: tuple[type[Exception], ...] | type[Exception] = Exception, 

448) -> AbstractContextManager[StackLogger]: 

449 """Context manager for logging operations. 

450 

451 Args: 

452 operation: Name of the operation. 

453 logger: Logger to use (creates one if not provided). 

454 level: Log level for messages. 

455 expected_exceptions: Exceptions to catch and log as failures. 

456 

457 Yields: 

458 The logger instance. 

459 

460 Example: 

461 >>> with log_operation("setup") as logger: 

462 ... logger.info("Setting up environment") 

463 

464 """ 

465 

466 @contextmanager 

467 def _log_context() -> Iterator[StackLogger]: 

468 nonlocal logger 

469 if logger is None: 

470 logger = get_logger() 

471 

472 start_time = datetime.now(UTC) 

473 logger.bind(operation=operation) 

474 

475 log_method = getattr(logger, level.lower()) 

476 log_method(f"Starting: {operation}") 

477 

478 try: 

479 yield logger 

480 duration = (datetime.now(UTC) - start_time).total_seconds() 

481 log_method(f"Completed: {operation}", duration_seconds=duration) 

482 except expected_exceptions as e: 

483 duration = (datetime.now(UTC) - start_time).total_seconds() 

484 logger.exception( 

485 f"Failed: {operation}", 

486 duration_seconds=duration, 

487 error=str(e), 

488 ) 

489 raise 

490 finally: 

491 logger.unbind("operation") 

492 

493 return _log_context()