Coverage for src / taipanstack / security / sanitizers.py: 100%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 14:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 14:54 +0000
1"""
2Input sanitizers for cleaning untrusted data.
4Provides functions to sanitize strings, filenames, and paths
5to remove potentially dangerous characters.
6"""
8import re
9from pathlib import Path
# Named limit instead of a magic number in the code (PLR2004).
MAX_SQL_IDENTIFIER_LENGTH = 128  # pragma: no mutate

# Patterns and lookup sets are built once at import time so the sanitizers
# do not recompile or rebuild them on every call.

# Characters rejected in a filename: < > : " / \ | ? * and the C0 controls.
_INVALID_FILENAME_CHARS_RE = re.compile(
    r'[<>:"/\\|?*\x00-\x1f]'
)  # pragma: no mutate

# Anything that is not an ASCII letter, digit or underscore.
_SQL_IDENTIFIER_DENY_RE = re.compile(
    r"[^a-zA-Z0-9_]"
)  # pragma: no mutate

# A "<", at least one non-">" character, then ">".
_HTML_TAGS_RE = re.compile(
    r"<[^>]+>"
)  # pragma: no mutate

# C0 controls except tab (\x09), LF (\x0a) and CR (\x0d), plus DEL and the
# C1 range.
_CONTROL_CHARS_RE = re.compile(
    r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]"
)  # pragma: no mutate

# Characters allowed in the first position of a SQL identifier.
_VALID_SQL_PREFIX = frozenset(
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_"
)  # pragma: no mutate

# Device names that Windows reserves; compared against the upper-cased stem.
_WINDOWS_RESERVED_NAMES = frozenset(  # pragma: no mutate
    {
        "CON",
        "PRN",
        "AUX",
        "NUL",
        *(f"COM{port}" for port in range(1, 10)),
        *(f"LPT{port}" for port in range(1, 10)),
    }
)
def sanitize_string(
    value: str,
    *,
    max_length: int | None = None,
    allow_html: bool = False,
    allow_unicode: bool = True,
    strip_whitespace: bool = True,
) -> str:
    """Sanitize a string by removing dangerous characters.

    Steps are applied in this order: optional whitespace strip, control
    character removal, HTML handling, optional ASCII-only filtering, and
    finally truncation.

    Args:
        value: The string to sanitize.
        max_length: Maximum length to truncate to. Truncation happens after
            escaping, so the result may end in the middle of an entity.
        allow_html: Whether to keep HTML markup (default: False). When
            False, tags are removed and any remaining ``&``, ``<`` and
            ``>`` are replaced by their HTML entities.
        allow_unicode: Whether to keep non-ASCII characters.
        strip_whitespace: Whether to strip leading/trailing whitespace.

    Returns:
        The sanitized string.

    Raises:
        TypeError: If value is not a string.

    Example:
        ```python
        sanitize_string("<script>alert('xss')</script>Hello")
        # Returns: "alert('xss')Hello"
        sanitize_string("a < b & c")
        # Returns: "a &lt; b &amp; c"
        ```

    """
    if not isinstance(value, str):
        raise TypeError(f"value must be str, got {type(value).__name__}")

    if not value:
        return ""

    result = value

    # Strip whitespace first
    if strip_whitespace:
        result = result.strip()

    # Remove null bytes and control characters
    result = _CONTROL_CHARS_RE.sub("", result)

    # Handle HTML
    if not allow_html:
        # Remove complete tags such as <b> or </script>.
        result = _HTML_TAGS_RE.sub("", result)
        # Escape what is left. "&" must be handled first, otherwise the
        # ampersands introduced by the next two replacements would be
        # escaped a second time.
        result = result.replace("&", "&amp;")
        result = result.replace("<", "&lt;")
        result = result.replace(">", "&gt;")

    # Handle unicode: drop every character that has no ASCII encoding.
    if not allow_unicode:
        result = result.encode("ascii", errors="ignore").decode("ascii")

    # Truncate if needed
    if max_length is not None and len(result) > max_length:
        result = result[:max_length]

    return result
def sanitize_filename(
    filename: str,
    *,
    max_length: int = 255,
    replacement: str = "_",
    preserve_extension: bool = True,
) -> str:
    """Sanitize a filename to be safe for filesystem use.

    Removes or replaces characters that are:
    - Not allowed in filenames on various OSes
    - Potentially dangerous (path separators, etc.)

    Only the final path component of ``filename`` is used (it is split with
    ``pathlib.Path``); any leading directories are discarded. Both the stem
    and the preserved extension are sanitized.

    Args:
        filename: The filename to sanitize. An empty string is treated as
            "unnamed".
        max_length: Maximum length for the filename.
        replacement: Character to replace invalid chars with.
        preserve_extension: Keep original extension.

    Returns:
        The sanitized filename.

    Raises:
        TypeError: If filename is not a string.

    Example:
        ```python
        sanitize_filename("my/../file<>:name.txt")
        # Returns: 'file_name.txt'
        ```

    """
    if not isinstance(filename, str):
        raise TypeError(f"filename must be str, got {type(filename).__name__}")

    if not filename:
        filename = "unnamed"

    # Get parts
    original_path = Path(filename)
    stem = original_path.stem
    suffix = original_path.suffix if preserve_extension else ""

    # A callable is used as the replacement so that backslashes or group
    # references inside ``replacement`` are inserted literally instead of
    # being interpreted as a regex template.
    def _substitute(_match: "re.Match[str]") -> str:
        return replacement

    safe_stem = _INVALID_FILENAME_CHARS_RE.sub(_substitute, stem)

    # Remove leading/trailing dots and spaces (Windows issues)
    safe_stem = safe_stem.strip(". ")

    # Remove path separators that might have snuck through
    safe_stem = safe_stem.replace("/", replacement)
    safe_stem = safe_stem.replace("\\", replacement)

    # Collapse runs of the replacement and trim it from both ends.
    if replacement:
        safe_stem = re.sub(
            f"({re.escape(replacement)})+",
            _substitute,
            safe_stem,
        )
        safe_stem = safe_stem.strip(replacement)

    # Handle reserved names (Windows). Fall back to "_" so the name is
    # still changed when the caller passed an empty replacement.
    if safe_stem.upper() in _WINDOWS_RESERVED_NAMES:
        safe_stem = f"{replacement or '_'}{safe_stem}"

    # Handle empty result
    if not safe_stem:
        safe_stem = "unnamed"

    # The extension comes from the same untrusted input as the stem, so it
    # gets the same character filter; trailing dots/spaces are dropped for
    # the same Windows reason as above.
    if suffix:
        suffix = _INVALID_FILENAME_CHARS_RE.sub(_substitute, suffix)
        suffix = suffix.rstrip(". ")

    # Construct result
    result = f"{safe_stem}{suffix}"

    # Truncate if needed (keeping extension)
    if len(result) > max_length:
        available = max_length - len(suffix)
        if available > 0:
            safe_stem = safe_stem[:available]
            result = f"{safe_stem}{suffix}"
        else:
            # The extension alone does not fit; cut the whole name.
            result = result[:max_length]

    return result
def _clean_path_parts(path: Path) -> list[str]:
    """Clean and sanitize individual path components.

    The path's anchor (root and/or drive, e.g. ``/`` or ``C:\\``) is not a
    filename and is skipped; the returned list holds only the components
    after it. ``..`` removes the previously collected component (and is
    dropped when there is none), ``.`` is ignored, and every other
    component is passed through ``sanitize_filename``.

    Args:
        path: The path whose components should be cleaned.

    Returns:
        The sanitized components, in order, without the anchor.
    """
    anchor = path.anchor
    parts: list[str] = []
    for index, part in enumerate(path.parts):
        # Sanitizing the anchor would turn it into a bogus "unnamed"
        # component, so leave it out here.
        if index == 0 and anchor and part == anchor:
            continue
        if part == "..":
            # Step back one level, but never above the start of the path.
            if parts:
                parts.pop()
        elif part != ".":  # pragma: no branch
            safe_part = sanitize_filename(part, preserve_extension=True)
            if safe_part:  # pragma: no branch
                parts.append(safe_part)
    return parts
222def _apply_base_dir_constraint(
223 sanitized: Path,
224 base_dir: Path | str | None,
225 resolve: bool,
226) -> Path:
227 """Apply base directory constraints to a sanitized path."""
228 if base_dir is None:
229 return sanitized
231 base = Path(base_dir).resolve()
232 if resolve:
233 try:
234 return sanitized.resolve()
235 except (OSError, RuntimeError) as e:
236 msg = f"Cannot resolve path: {e}"
237 raise ValueError(msg) from e
239 # Make absolute relative to base
240 if not sanitized.is_absolute(): # pragma: no branch
241 return base / sanitized
243 return sanitized
def sanitize_path(
    path: str | Path,
    *,
    base_dir: Path | None = None,
    max_depth: int | None = 10,
    resolve: bool = False,
) -> Path:
    """Normalize a path and strip traversal components from it.

    NUL bytes are removed, each component is cleaned by
    ``_clean_path_parts``, and the result is rebuilt on top of the original
    anchor when the input was absolute.

    Args:
        path: The path to sanitize.
        base_dir: Optional base directory, forwarded to
            ``_apply_base_dir_constraint``.
        max_depth: Largest allowed number of components in the rebuilt
            path (the anchor counts as one); None disables the check.
        resolve: Forwarded to ``_apply_base_dir_constraint``; it has no
            effect when ``base_dir`` is None.

    Returns:
        The sanitized Path object.

    Raises:
        ValueError: If the rebuilt path has more than ``max_depth``
            components, or if the base-directory step raises it.

    """
    as_path = Path(path) if isinstance(path, str) else path  # pragma: no branch

    # Drop NUL bytes from the textual form and re-parse it.
    path = Path(str(as_path).replace("\x00", ""))

    components = _clean_path_parts(path)

    # Absolute input keeps its anchor (this also preserves roots such as
    # C:\ on Windows); relative input is rebuilt from an empty path.
    root = Path(path.anchor) if path.is_absolute() else Path()
    sanitized = root.joinpath(*components)

    depth = len(sanitized.parts)
    if max_depth is not None and depth > max_depth:
        raise ValueError(f"Path depth {depth} exceeds maximum of {max_depth}")

    return _apply_base_dir_constraint(sanitized, base_dir, resolve)
def sanitize_env_value(
    value: str,
    *,
    max_length: int = 4096,
    allow_multiline: bool = False,
) -> str:
    """Clean a string so it can be stored in an environment variable.

    NUL bytes are removed, line breaks are replaced by single spaces
    unless ``allow_multiline`` is set, and the result is cut to
    ``max_length`` characters.

    Args:
        value: The value to sanitize.
        max_length: Maximum length allowed.
        allow_multiline: Whether to allow newlines.

    Returns:
        The sanitized value; the input itself when nothing had to change.

    Raises:
        TypeError: If value is not a string.

    """
    if not isinstance(value, str):
        raise TypeError(f"value must be str, got {type(value).__name__}")

    if not value:
        return ""

    has_nul = "\x00" in value
    has_line_break = not allow_multiline and ("\n" in value or "\r" in value)
    too_long = len(value) > max_length

    # Nothing to clean: hand the input back without copying it.
    if not (has_nul or has_line_break or too_long):
        return value

    cleaned = value.replace("\x00", "")

    if not allow_multiline:
        cleaned = cleaned.replace("\n", " ").replace("\r", " ")

    return cleaned[:max_length] if len(cleaned) > max_length else cleaned
def sanitize_sql_identifier(identifier: str) -> str:
    """Reduce a string to a safe SQL identifier (table/column name).

    Note: This is NOT for SQL values - use parameterized queries for those!

    Every character other than ASCII letters, digits and underscore is
    removed, an underscore is prepended when the result does not start
    with a letter or underscore, and the result is truncated to
    ``MAX_SQL_IDENTIFIER_LENGTH`` characters.

    Args:
        identifier: The identifier to sanitize.

    Returns:
        The sanitized identifier; the input itself when it is already
        valid.

    Raises:
        TypeError: If identifier is not a string.
        ValueError: If identifier is empty or contains no valid characters.

    """
    if not isinstance(identifier, str):
        raise TypeError(f"identifier must be str, got {type(identifier).__name__}")

    if not identifier:
        raise ValueError("SQL identifier cannot be empty")

    has_forbidden_chars = _SQL_IDENTIFIER_DENY_RE.search(identifier) is not None

    if has_forbidden_chars:
        # Keep only letters, digits and underscores.
        cleaned = _SQL_IDENTIFIER_DENY_RE.sub("", identifier)
    elif (
        len(identifier) <= MAX_SQL_IDENTIFIER_LENGTH
        and identifier[0] in _VALID_SQL_PREFIX
    ):
        # Already a valid identifier: return it untouched.
        return identifier
    else:
        cleaned = identifier

    # A leading digit is not allowed; prefix it with an underscore.
    if cleaned and cleaned[0] not in _VALID_SQL_PREFIX:
        cleaned = "_" + cleaned

    # Enforce the length limit (a no-op slice for short identifiers).
    cleaned = cleaned[:MAX_SQL_IDENTIFIER_LENGTH]

    if not cleaned:
        raise ValueError("SQL identifier contains no valid characters")

    return cleaned