Coverage for src / taipanstack / security / models.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 14:54 +0000

1"""Secure base models.""" 

2 

3import json 

4import re 

5from collections.abc import Iterator 

6from typing import Any, TypeAlias 

7 

8from pydantic import BaseModel, ConfigDict 

9 

10from taipanstack.utils.logging import REDACTED_VALUE, SENSITIVE_KEY_PATTERNS 

11 

# Recursive alias covering every value shape that can appear in JSON data.
JSONValue: TypeAlias = (
    dict[str, "JSONValue"] | list["JSONValue"] | str | int | float | bool | None
)

__all__ = ["SecureBaseModel"]

# Case-insensitive alternation of the (escaped) sensitive-key patterns.
# Stays ``None`` when no patterns are configured, which disables masking.
_SENSITIVE_KEY_REGEX: re.Pattern[str] | None = None
if SENSITIVE_KEY_PATTERNS:
    _SENSITIVE_KEY_REGEX = re.compile(
        "|".join(re.escape(pattern) for pattern in SENSITIVE_KEY_PATTERNS),
        flags=re.IGNORECASE,
    )

# Deepest nesting level the masking helper descends into.
_MAX_RECURSION_DEPTH = 100

25 

26 

def _mask_data(data: JSONValue, _depth: int = 0) -> JSONValue:
    """Return ``data`` with the values of sensitive keys redacted.

    Dictionaries, lists and tuples are rebuilt recursively; every other
    value is returned unchanged. A dictionary value is replaced with
    ``REDACTED_VALUE`` when its key is a string matching
    ``_SENSITIVE_KEY_REGEX``. The input is never mutated.

    Args:
        data: The value to mask, typically the output of ``model_dump``.
        _depth: Current nesting level. Internal; callers should not pass it.

    Returns:
        The masked copy of ``data``. ``data`` itself when no sensitive
        patterns are configured, or the string ``"<MAX_DEPTH_REACHED>"``
        in place of anything nested deeper than ``_MAX_RECURSION_DEPTH``.

    """
    pattern = _SENSITIVE_KEY_REGEX
    if pattern is None:
        return data

    # Bound the recursion so a deeply nested payload cannot exhaust the stack.
    if _depth > _MAX_RECURSION_DEPTH:
        return "<MAX_DEPTH_REACHED>"

    if isinstance(data, dict):
        return {
            key: (
                REDACTED_VALUE
                if isinstance(key, str) and pattern.search(key)
                else _mask_data(value, _depth + 1)
            )
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [_mask_data(item, _depth + 1) for item in data]
    if isinstance(data, tuple):
        # Tuples can carry nested dicts too (e.g. a python-mode dump of a
        # tuple field), so they must be traversed or their secrets leak.
        items = [_mask_data(item, _depth + 1) for item in data]
        # Named tuples are rebuilt through ``_make``; plain tuples via ``tuple``.
        return getattr(type(data), "_make", tuple)(items)
    return data

47 

48 

class SecureBaseModel(BaseModel):
    """Immutable base model that redacts sensitive fields in its output.

    Redaction applies to ``repr``/``str``, ``model_dump`` and
    ``model_dump_json``. A field or nested dictionary key is considered
    sensitive when it matches ``_SENSITIVE_KEY_REGEX``; its value is then
    replaced with ``REDACTED_VALUE``. Attribute access on the instance
    still returns the real value.
    """

    model_config = ConfigDict(frozen=True)

    def __str__(self) -> str:
        """Return the same redacted text as ``repr``."""
        return repr(self)

    def __repr_args__(self) -> Iterator[tuple[str | None, object]]:
        """Yield ``(name, value)`` pairs for ``repr`` with secrets redacted.

        Fields whose name is sensitive are replaced outright. Every other
        value is passed through ``_mask_data`` so that sensitive keys
        nested inside dict/list fields are not printed in clear either.
        """
        for name, value in super().__repr_args__():
            if (
                isinstance(name, str)
                and _SENSITIVE_KEY_REGEX is not None
                and _SENSITIVE_KEY_REGEX.search(name)
            ):
                yield name, REDACTED_VALUE
            else:
                # Non-container values come back unchanged from _mask_data.
                yield name, _mask_data(value)  # type: ignore[arg-type]

    def model_dump(
        self,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Dump the model to a dictionary, redacting sensitive fields.

        Args:
            **kwargs: Arguments to pass to Pydantic's model_dump.

        Returns:
            The redacted dictionary representation of the model.

        """
        dumped = super().model_dump(**kwargs)
        return _mask_data(dumped)  # type: ignore[return-value]

    def model_dump_json(
        self,
        **kwargs: Any,
    ) -> str:
        """Dump the model to a JSON string, redacting sensitive fields.

        The model is first dumped in JSON mode to plain Python data,
        masked, and then serialized with ``json.dumps``; the resulting
        formatting therefore follows ``json.dumps`` defaults.

        Args:
            **kwargs: ``indent`` is forwarded to ``json.dumps``; all other
                arguments are passed to Pydantic's model_dump.

        Returns:
            The redacted JSON string representation of the model.

        """
        # model_dump does not accept ``indent``, so take it out first.
        indent = kwargs.pop("indent", None)
        masked = _mask_data(super().model_dump(mode="json", **kwargs))
        # ``indent=None`` is json.dumps' default, so no branching is needed.
        return json.dumps(masked, indent=indent)

108 return json.dumps(masked_dict)