Coverage for src / taipanstack / utils / logging.py: 100%
151 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2Structured logging with context.
4Provides a configured logger with support for structured output,
5context propagation, and proper formatting.
6"""
8import logging
9import re
10import sys
11from collections.abc import Iterator, MutableMapping
12from contextlib import AbstractContextManager, contextmanager
13from datetime import UTC, datetime
14from functools import lru_cache
15from typing import Literal
17from taipanstack.utils.context import get_correlation_id
# structlog is an optional dependency: structured output is only enabled
# when it is importable. Callers gate on HAS_STRUCTLOG before touching it.
try:
    import structlog

    HAS_STRUCTLOG = True
except ImportError:
    HAS_STRUCTLOG = False
# Default log format
DEFAULT_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
# NOTE(review): the message is interpolated without JSON escaping, so a
# message containing quotes or newlines produces invalid JSON — confirm
# whether upstream sanitizes messages before relying on this format.
JSON_FORMAT = (
    '{"timestamp": "%(asctime)s", "level": "%(levelname)s", '
    '"logger": "%(name)s", "message": "%(message)s"}'
)
34SENSITIVE_KEY_PATTERNS: tuple[str, ...] = (
35 "password",
36 "secret",
37 "token",
38 "authorization",
39 "api_key",
40)
42# Pre-compile the regex for O(1) checks per key instead of O(N*M)
43_SENSITIVE_KEY_REGEX = (
44 re.compile("|".join(map(re.escape, SENSITIVE_KEY_PATTERNS)), re.IGNORECASE)
45 if SENSITIVE_KEY_PATTERNS
46 else None
47)
49REDACTED_VALUE = "***REDACTED***"
52@lru_cache(maxsize=1024)
53def _is_sensitive(key: object, regex: re.Pattern[str] | None) -> bool:
54 """Check if a key is sensitive using cached regex matching.
56 Args:
57 key: The key to check.
58 regex: The regex pattern to use for matching.
60 Returns:
61 True if the key is a string and sensitive, False otherwise.
63 """
64 if regex is None:
65 return False
66 if not isinstance(key, str):
67 return False
68 return bool(regex.search(key))
71def _redact(obj: object, seen: set[int] | None = None) -> object:
72 """Redact sensitive data recursively with circular reference protection.
74 Args:
75 obj: The object to redact.
76 seen: Set of IDs of already processed containers.
78 Returns:
79 A new object with sensitive data redacted.
81 """
82 if _SENSITIVE_KEY_REGEX is None:
83 return obj
85 if seen is None:
86 seen = set()
88 # Detect circular references
89 if id(obj) in seen:
90 return REDACTED_VALUE
92 if isinstance(obj, MutableMapping):
93 seen.add(id(obj))
94 # Create a new dict to avoid in-place mutation of shared state
95 return {
96 k: (
97 REDACTED_VALUE
98 if _is_sensitive(k, _SENSITIVE_KEY_REGEX)
99 else _redact(v, seen)
100 )
101 for k, v in obj.items()
102 }
104 if isinstance(obj, list):
105 seen.add(id(obj))
106 return [_redact(item, seen) for item in obj]
108 if isinstance(obj, tuple):
109 seen.add(id(obj))
110 return tuple(_redact(item, seen) for item in obj)
112 return obj
def _redact_dict(d: MutableMapping[str, object]) -> None:
    """Redact sensitive keys in a mapping in place.

    Top-level sensitive keys are overwritten with ``REDACTED_VALUE``;
    every other value is replaced by a redacted deep copy, so nested
    structures shared with other owners are never mutated.

    Args:
        d: The mapping to redact.

    """
    if _SENSITIVE_KEY_REGEX is None:
        return

    for key in list(d):
        if _is_sensitive(key, _SENSITIVE_KEY_REGEX):
            d[key] = REDACTED_VALUE
        else:
            d[key] = _redact(d[key])
def mask_sensitive_data_processor(
    _logger: object,
    _method: str,
    event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]:
    """Structlog processor that masks sensitive values in an event dict.

    Replaces the value of any key whose name contains a sensitive
    substring with ``"***REDACTED***"``; matching is case-insensitive and
    nested containers are redacted recursively.

    Args:
        _logger: The wrapped logger object (unused, required by structlog).
        _method: The name of the log method called (unused, required by structlog).
        event_dict: The structured event dictionary.

    Returns:
        The same event dictionary with sensitive values masked.

    """
    _redact_dict(event_dict)
    return event_dict
def correlation_id_processor(
    _logger: object,
    _method: str,
    event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]:
    """Structlog processor that injects the correlation ID into events.

    Args:
        _logger: The wrapped logger object.
        _method: The name of the log method called.
        event_dict: The structured event dictionary.

    Returns:
        The event dictionary, with ``correlation_id`` added when one is set.

    """
    cid = get_correlation_id()
    if cid is not None:
        event_dict["correlation_id"] = cid
    return event_dict
class StackLogger:
    """Enhanced logger with context support.

    Provides a wrapper around standard logging with additional features
    like context propagation and structured output support.

    Attributes:
        name: Logger name.
        level: Current log level.

    """

    def __init__(
        self,
        name: str = "stack",
        level: str = "INFO",
        *,
        use_structured: bool = False,
    ) -> None:
        """Initialize the logger.

        Args:
            name: Logger name.
            level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
            use_structured: Use structured logging if structlog is available.

        """
        self.name = name
        self.level = level
        self._context: dict[str, object] = {}

        if use_structured and HAS_STRUCTLOG:
            self._logger = structlog.get_logger(name)
            self._structured = True
        else:
            self._logger = logging.getLogger(name)
            self._logger.setLevel(getattr(logging, level.upper()))
            self._structured = False

    def bind(self, **context: object) -> "StackLogger":
        """Add context to logger.

        Args:
            **context: Key-value pairs to add to context.

        Returns:
            Self for chaining.

        """
        self._context.update(context)
        if self._structured and HAS_STRUCTLOG:
            self._logger = self._logger.bind(**context)
        return self

    def unbind(self, *keys: str) -> "StackLogger":
        """Remove context keys.

        Args:
            *keys: Keys to remove from context.

        Returns:
            Self for chaining.

        """
        for key in keys:
            self._context.pop(key, None)
        if self._structured and HAS_STRUCTLOG:
            self._logger = self._logger.unbind(*keys)
        return self

    def _format_message(self, message: str, **kwargs: object) -> str:
        """Format message with bound context appended as key=value pairs.

        Args:
            message: The log message.
            **kwargs: Additional context for this message.

        Returns:
            Formatted message string (sensitive context values redacted).

        """
        if not kwargs and not self._context:
            return message

        context = {**self._context, **kwargs}
        _redact_dict(context)

        context_str = " ".join(f"{k}={v}" for k, v in context.items())
        return f"{message} | {context_str}"

    def _log(self, method: str, message: str, **kwargs: object) -> None:
        """Dispatch a log call to the underlying logger.

        Single point of truth for the structured/plain split; the public
        level methods below are thin wrappers around this, instead of
        each duplicating the same branch.

        Args:
            method: Log method name ("debug", "info", "warning", "error",
                "critical", or "exception").
            message: The message to log.
            **kwargs: Additional context.

        """
        if self._structured:
            getattr(self._logger, method)(message, **kwargs)
        else:
            getattr(self._logger, method)(self._format_message(message, **kwargs))

    def debug(self, message: str, **kwargs: object) -> None:
        """Log a debug message with optional context."""
        self._log("debug", message, **kwargs)

    def info(self, message: str, **kwargs: object) -> None:
        """Log an info message with optional context."""
        self._log("info", message, **kwargs)

    def warning(self, message: str, **kwargs: object) -> None:
        """Log a warning message with optional context."""
        self._log("warning", message, **kwargs)

    def error(self, message: str, **kwargs: object) -> None:
        """Log an error message with optional context."""
        self._log("error", message, **kwargs)

    def critical(self, message: str, **kwargs: object) -> None:
        """Log a critical message with optional context."""
        self._log("critical", message, **kwargs)

    def exception(self, message: str, **kwargs: object) -> None:
        """Log an exception (with traceback) and optional context."""
        self._log("exception", message, **kwargs)
def _configure_structlog() -> None:
    """Configure structlog with default processors and settings."""
    # Processor order matters: filtering first, enrichment (timestamp,
    # correlation id), masking, then rendering last.
    processors = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        correlation_id_processor,
        mask_sensitive_data_processor,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )
371def _get_log_format(format_type: str) -> str:
372 """Get the appropriate log format string."""
373 match format_type:
374 case "simple":
375 return "%(levelname)s: %(message)s"
376 case "json":
377 return JSON_FORMAT
378 case _:
379 return DEFAULT_FORMAT
def setup_logging(
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    *,
    format_type: Literal["simple", "detailed", "json"] = "detailed",
    log_file: str | None = None,
    use_structured: bool = False,
) -> None:
    """Configure the root logger.

    Args:
        level: Log level to set.
        format_type: Output format type.
        log_file: Optional file to log to.
        use_structured: Use structlog if available.

    """
    # Structured mode delegates all configuration to structlog and skips
    # the stdlib handler setup below.
    if use_structured and HAS_STRUCTLOG:
        _configure_structlog()
        return

    handler_list: list[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if log_file:
        handler_list.append(logging.FileHandler(log_file, encoding="utf-8"))

    # force=True replaces any handlers installed by earlier configuration.
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format=_get_log_format(format_type),
        handlers=handler_list,
        force=True,
    )
def get_logger(
    name: str = "stack",
    *,
    level: str = "INFO",
    use_structured: bool = False,
) -> StackLogger:
    """Create a configured :class:`StackLogger`.

    Args:
        name: Logger name.
        level: Log level.
        use_structured: Use structlog if available.

    Returns:
        Configured StackLogger instance.

    Example:
        >>> logger = get_logger("my_module")
        >>> logger.bind(request_id="123").info("Processing request")

    """
    logger = StackLogger(name, level, use_structured=use_structured)
    return logger
def log_operation(
    operation: str,
    *,
    logger: StackLogger | None = None,
    level: str = "INFO",
    expected_exceptions: tuple[type[Exception], ...] | type[Exception] = Exception,
) -> AbstractContextManager[StackLogger]:
    """Context manager for logging operations.

    Logs a "Starting" message on entry and either a "Completed" message
    (with duration) on normal exit, or a "Failed" message via
    ``logger.exception`` when one of *expected_exceptions* escapes the
    body; the exception is then re-raised. The ``operation`` context key
    is always unbound on exit.

    Args:
        operation: Name of the operation.
        logger: Logger to use (creates one if not provided).
        level: Log level for messages.
        expected_exceptions: Exceptions to catch and log as failures.

    Yields:
        The logger instance.

    Example:
        >>> with log_operation("setup") as logger:
        ...     logger.info("Setting up environment")

    """

    @contextmanager
    def _runner() -> Iterator[StackLogger]:
        nonlocal logger
        if logger is None:
            logger = get_logger()
        active = logger

        started = datetime.now(UTC)
        active.bind(operation=operation)

        emit = getattr(active, level.lower())
        emit(f"Starting: {operation}")

        try:
            yield active
            # Success path: logged inside the try so a failing log call is
            # still reported through the except branch below.
            elapsed = (datetime.now(UTC) - started).total_seconds()
            emit(f"Completed: {operation}", duration_seconds=elapsed)
        except expected_exceptions as exc:
            elapsed = (datetime.now(UTC) - started).total_seconds()
            active.exception(
                f"Failed: {operation}",
                duration_seconds=elapsed,
                error=str(exc),
            )
            raise
        finally:
            active.unbind("operation")

    return _runner()