Coverage for src / taipanstack / security / models.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1"""Secure base models.""" 

2 

3import json 

4import re 

5from collections.abc import Callable, Iterator 

6from typing import TYPE_CHECKING, Literal, TypeAlias, cast 

7 

8from pydantic import BaseModel, ConfigDict 

9 

10if TYPE_CHECKING: 

11 from pydantic.main import IncEx 

12else: 

13 IncEx: TypeAlias = set[int] | set[str] | dict[int, object] | dict[str, object] 

14 

15from taipanstack.utils.logging import REDACTED_VALUE, SENSITIVE_KEY_PATTERNS 

16 

# Recursive alias describing the values the masking helpers walk over.
JSONValue: TypeAlias = (
    dict[str, "JSONValue"] | list["JSONValue"] | str | int | float | bool | None
)

__all__ = ["SecureBaseModel"]

# Case-insensitive alternation of the escaped sensitive-key patterns.
# Stays ``None`` when no patterns are configured, which disables masking.
_SENSITIVE_KEY_REGEX: re.Pattern[str] | None = None
if SENSITIVE_KEY_PATTERNS:
    _SENSITIVE_KEY_REGEX = re.compile(
        "|".join(re.escape(pattern) for pattern in SENSITIVE_KEY_PATTERNS),
        re.IGNORECASE,
    )

# Deepest container nesting ``_mask_data`` descends into before giving up.
_MAX_RECURSION_DEPTH = 100

30 

31 

def _mask_dict(data: dict[str, JSONValue], depth: int) -> dict[str, JSONValue]:
    """Build a redacted copy of a dictionary.

    A value is replaced by ``REDACTED_VALUE`` when its key is a string that
    matches ``_SENSITIVE_KEY_REGEX``. Every other value is handed to
    ``_mask_data`` so that nested containers are processed too.

    Args:
        data: Mapping to copy; it is not modified.
        depth: Current nesting level, forwarded unchanged to ``_mask_data``.

    Returns:
        A new dictionary with the same keys in the same order.

    """
    pattern = _SENSITIVE_KEY_REGEX

    def _is_sensitive(key: object) -> bool:
        # Only string keys can match; a missing pattern means nothing matches.
        return (
            pattern is not None
            and isinstance(key, str)
            and pattern.search(key) is not None
        )

    return {
        key: REDACTED_VALUE if _is_sensitive(key) else _mask_data(value, depth)
        for key, value in data.items()
    }

45 

46 

def _mask_list(data: list[JSONValue], depth: int) -> list[JSONValue]:
    """Build a copy of a list with every element passed through ``_mask_data``.

    Args:
        data: Sequence to copy; it is not modified.
        depth: Current nesting level, forwarded unchanged to ``_mask_data``.

    Returns:
        A new list holding the masked elements in their original order.

    """
    return [
        _mask_data(element, depth)
        for element in data
    ]

50 

51 

def _mask_data(data: JSONValue, _depth: int = 0) -> JSONValue:
    """Recursively redact the values of sensitive keys inside ``data``.

    Dictionaries, lists and tuples are walked; any other value is returned
    unchanged. Tuples are included because a python-mode model dump can
    contain them, and a dict nested inside a tuple must not escape masking.

    Args:
        data: Value to inspect. Containers are copied, never mutated.
        _depth: Current nesting level; callers outside this module should
            leave it at the default.

    Returns:
        The masked value, or the string ``"<MAX_DEPTH_REACHED>"`` in place of
        anything nested deeper than ``_MAX_RECURSION_DEPTH``.

    """
    if _SENSITIVE_KEY_REGEX is None:
        # No patterns configured: nothing can match, so skip the walk.
        return data

    # Bound the recursion so a deeply nested payload cannot exhaust the
    # interpreter stack; the subtree is dropped rather than returned unmasked.
    if _depth > _MAX_RECURSION_DEPTH:
        return "<MAX_DEPTH_REACHED>"

    if isinstance(data, dict):
        return _mask_dict(data, _depth + 1)
    if isinstance(data, list):
        return _mask_list(data, _depth + 1)
    if isinstance(data, tuple):
        masked_items = [_mask_data(item, _depth + 1) for item in data]
        # Named tuples are rebuilt through ``_make`` so their type survives;
        # plain tuples fall back to the ``tuple`` constructor.
        rebuild = getattr(type(data), "_make", tuple)
        return cast(JSONValue, rebuild(masked_items))
    return data

66 

67 

class SecureBaseModel(BaseModel):
    """Immutable base model that hides sensitive values in dumps and repr.

    A field counts as sensitive when its name matches
    ``_SENSITIVE_KEY_REGEX``. ``model_dump`` and ``model_dump_json`` also
    redact matching keys found in nested containers, and the string
    representation applies the same masking to dict and list field values.
    """

    model_config = ConfigDict(frozen=True)

    def __str__(self) -> str:
        """Return a string representation with sensitive fields redacted."""
        return self.__repr__()

    def __repr_args__(self) -> Iterator[tuple[str | None, object]]:
        """Yield ``(name, value)`` pairs for ``repr`` with secrets redacted.

        Yields:
            Each pair produced by the parent class. The value is replaced by
            ``REDACTED_VALUE`` when the field name is sensitive, and dict or
            list values are passed through ``_mask_data`` so nested sensitive
            keys do not appear in the output either.

        """
        for name, value in super().__repr_args__():
            if (
                isinstance(name, str)
                and _SENSITIVE_KEY_REGEX is not None
                and _SENSITIVE_KEY_REGEX.search(name)
            ):
                yield name, REDACTED_VALUE
            elif isinstance(value, (dict, list)):
                # Keep repr consistent with model_dump: a container stored
                # under a harmless field name may still hold sensitive keys.
                yield name, _mask_data(cast(JSONValue, value))
            else:
                yield name, value

    def model_dump(  # noqa: PLR0913
        self,
        *,
        mode: Literal["json", "python"] | str = "python",
        include: IncEx | None = None,
        exclude: IncEx | None = None,
        context: dict[str, object] | None = None,
        by_alias: bool | None = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
        exclude_computed_fields: bool = False,
        round_trip: bool = False,
        warnings: bool | Literal["none", "warn", "error"] = True,
        fallback: Callable[[object], object] | None = None,
        serialize_as_any: bool = False,
        polymorphic_serialization: bool | None = None,
    ) -> dict[str, object]:
        """Dump the model to a dictionary, redacting sensitive fields.

        Every argument is forwarded unchanged to ``BaseModel.model_dump``;
        the result is then passed through ``_mask_data``.

        Returns:
            The redacted dictionary representation of the model.

        """
        data = super().model_dump(
            mode=mode,
            include=include,
            exclude=exclude,
            context=context,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            exclude_defaults=exclude_defaults,
            exclude_none=exclude_none,
            exclude_computed_fields=exclude_computed_fields,
            round_trip=round_trip,
            warnings=warnings,
            fallback=fallback,
            serialize_as_any=serialize_as_any,
            polymorphic_serialization=polymorphic_serialization,
        )
        return cast(dict[str, object], _mask_data(data))

    def model_dump_json(  # noqa: PLR0913
        self,
        *,
        indent: int | None = None,
        ensure_ascii: bool = False,
        include: IncEx | None = None,
        exclude: IncEx | None = None,
        context: dict[str, object] | None = None,
        by_alias: bool | None = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
        exclude_computed_fields: bool = False,
        round_trip: bool = False,
        warnings: bool | Literal["none", "warn", "error"] = True,
        fallback: Callable[[object], object] | None = None,
        serialize_as_any: bool = False,
        polymorphic_serialization: bool | None = None,
    ) -> str:
        """Dump the model to a JSON string, redacting sensitive fields.

        The model is first dumped in ``"json"`` mode with the forwarded
        arguments, then masked, then encoded. ``indent`` and ``ensure_ascii``
        are applied by the encoder rather than by the dump.

        Returns:
            The redacted JSON string representation of the model.

        """
        # Dump to a JSON-compatible dict, mask it, then serialize.
        dumped_dict = super().model_dump(
            mode="json",
            include=include,
            exclude=exclude,
            context=context,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            exclude_defaults=exclude_defaults,
            exclude_none=exclude_none,
            exclude_computed_fields=exclude_computed_fields,
            round_trip=round_trip,
            warnings=warnings,
            fallback=fallback,
            serialize_as_any=serialize_as_any,
            polymorphic_serialization=polymorphic_serialization,
        )
        masked_dict = _mask_data(dumped_dict)
        # Encoding goes through json.dumps, so whitespace follows the
        # standard-library defaults. ``indent=None`` is json.dumps' own
        # default, so a single call covers both the indented and flat cases.
        return json.dumps(masked_dict, indent=indent, ensure_ascii=ensure_ascii)

180 return json.dumps(masked_dict, ensure_ascii=ensure_ascii)