Coverage for src / taipanstack / security / models.py: 100%
48 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""Secure base models."""
3import json
4import re
5from collections.abc import Callable, Iterator
6from typing import TYPE_CHECKING, Literal, TypeAlias, cast
8from pydantic import BaseModel, ConfigDict
10if TYPE_CHECKING:
11 from pydantic.main import IncEx
12else:
13 IncEx: TypeAlias = set[int] | set[str] | dict[int, object] | dict[str, object]
15from taipanstack.utils.logging import REDACTED_VALUE, SENSITIVE_KEY_PATTERNS
17JSONValue: TypeAlias = (
18 dict[str, "JSONValue"] | list["JSONValue"] | str | int | float | bool | None
19)
21__all__ = ["SecureBaseModel"]
23_SENSITIVE_KEY_REGEX = (
24 re.compile("|".join(map(re.escape, SENSITIVE_KEY_PATTERNS)), re.IGNORECASE)
25 if SENSITIVE_KEY_PATTERNS
26 else None
27)
29_MAX_RECURSION_DEPTH = 100
32def _mask_dict(data: dict[str, JSONValue], depth: int) -> dict[str, JSONValue]:
33 """Mask sensitive keys in a dictionary."""
34 masked: dict[str, JSONValue] = {}
35 for k, v in data.items():
36 if (
37 isinstance(k, str)
38 and _SENSITIVE_KEY_REGEX is not None
39 and _SENSITIVE_KEY_REGEX.search(k)
40 ):
41 masked[k] = REDACTED_VALUE
42 else:
43 masked[k] = _mask_data(v, depth)
44 return masked
47def _mask_list(data: list[JSONValue], depth: int) -> list[JSONValue]:
48 """Mask sensitive keys in a list."""
49 return [_mask_data(item, depth) for item in data]
52def _mask_data(data: JSONValue, _depth: int = 0) -> JSONValue:
53 """Recursively mask sensitive keys in data."""
54 if _SENSITIVE_KEY_REGEX is None:
55 return data
57 # Prevent ReDoS or stack overflow on deeply nested payloads
58 if _depth > _MAX_RECURSION_DEPTH:
59 return "<MAX_DEPTH_REACHED>"
61 if isinstance(data, dict):
62 return _mask_dict(data, _depth + 1)
63 if isinstance(data, list):
64 return _mask_list(data, _depth + 1)
65 return data
68class SecureBaseModel(BaseModel):
69 """Secure base model that redacts sensitive fields when dumped."""
71 model_config = ConfigDict(frozen=True)
73 def __str__(self) -> str:
74 """Return a string representation with sensitive fields redacted."""
75 return self.__repr__()
77 def __repr_args__(self) -> Iterator[tuple[str | None, object]]:
78 """Provide arguments for string representation, redacting sensitive fields."""
79 for k, v in super().__repr_args__():
80 if (
81 isinstance(k, str)
82 and _SENSITIVE_KEY_REGEX is not None
83 and _SENSITIVE_KEY_REGEX.search(k)
84 ):
85 yield k, REDACTED_VALUE
86 else:
87 yield k, v
89 def model_dump( # noqa: PLR0913
90 self,
91 *,
92 mode: Literal["json", "python"] | str = "python",
93 include: IncEx | None = None,
94 exclude: IncEx | None = None,
95 context: dict[str, object] | None = None,
96 by_alias: bool | None = None,
97 exclude_unset: bool = False,
98 exclude_defaults: bool = False,
99 exclude_none: bool = False,
100 exclude_computed_fields: bool = False,
101 round_trip: bool = False,
102 warnings: bool | Literal["none", "warn", "error"] = True,
103 fallback: Callable[[object], object] | None = None,
104 serialize_as_any: bool = False,
105 polymorphic_serialization: bool | None = None,
106 ) -> dict[str, object]:
107 """Dump the model to a dictionary, redacting sensitive fields.
109 Returns:
110 The redacting dictionary representation of the model.
112 """
113 data = super().model_dump(
114 mode=mode,
115 include=include,
116 exclude=exclude,
117 context=context,
118 by_alias=by_alias,
119 exclude_unset=exclude_unset,
120 exclude_defaults=exclude_defaults,
121 exclude_none=exclude_none,
122 exclude_computed_fields=exclude_computed_fields,
123 round_trip=round_trip,
124 warnings=warnings,
125 fallback=fallback,
126 serialize_as_any=serialize_as_any,
127 polymorphic_serialization=polymorphic_serialization,
128 )
129 return cast(dict[str, object], _mask_data(data))
131 def model_dump_json( # noqa: PLR0913
132 self,
133 *,
134 indent: int | None = None,
135 ensure_ascii: bool = False,
136 include: IncEx | None = None,
137 exclude: IncEx | None = None,
138 context: dict[str, object] | None = None,
139 by_alias: bool | None = None,
140 exclude_unset: bool = False,
141 exclude_defaults: bool = False,
142 exclude_none: bool = False,
143 exclude_computed_fields: bool = False,
144 round_trip: bool = False,
145 warnings: bool | Literal["none", "warn", "error"] = True,
146 fallback: Callable[[object], object] | None = None,
147 serialize_as_any: bool = False,
148 polymorphic_serialization: bool | None = None,
149 ) -> str:
150 """Dump the model to a JSON string, redacting sensitive fields.
152 Returns:
153 The redacted JSON string representation of the model.
155 """
156 # Extract indent if any, as model_dump does not accept it
158 # Dump to JSON-compatible dict, mask, then serialize
159 dumped_dict = super().model_dump(
160 mode="json",
161 include=include,
162 exclude=exclude,
163 context=context,
164 by_alias=by_alias,
165 exclude_unset=exclude_unset,
166 exclude_defaults=exclude_defaults,
167 exclude_none=exclude_none,
168 exclude_computed_fields=exclude_computed_fields,
169 round_trip=round_trip,
170 warnings=warnings,
171 fallback=fallback,
172 serialize_as_any=serialize_as_any,
173 polymorphic_serialization=polymorphic_serialization,
174 )
175 masked_dict = _mask_data(dumped_dict)
176 # We need to respect Pydantic's indent/separators if possible,
177 # but json.dumps is the safest standard way.
178 if indent is not None:
179 return json.dumps(masked_dict, indent=indent, ensure_ascii=ensure_ascii)
180 return json.dumps(masked_dict, ensure_ascii=ensure_ascii)