Coverage for src / taipanstack / security / sanitizers.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 14:54 +0000

1""" 

2Input sanitizers for cleaning untrusted data. 

3 

4Provides functions to sanitize strings, filenames, and paths 

5to remove potentially dangerous characters. 

6""" 

7 

8import re 

9from pathlib import Path 

10 

# Constants to avoid magic values (PLR2004)
MAX_SQL_IDENTIFIER_LENGTH = 128  # pragma: no mutate

# Pre-compiled regexes and lookup sets, built once at import time so the
# sanitizers do not rebuild them on every call.

# Characters rejected in filenames: Windows-invalid punctuation, both path
# separators, and the C0 control range (including NUL).
_INVALID_FILENAME_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')  # pragma: no mutate
# Anything outside ASCII letters, digits and underscore.
_SQL_IDENTIFIER_DENY_RE = re.compile(r"[^a-zA-Z0-9_]")  # pragma: no mutate
# A "<", one or more non-">" characters, then ">".
_HTML_TAGS_RE = re.compile(r"<[^>]+>")  # pragma: no mutate
# Control characters (C0 and C1 sets, plus DEL). Tab, LF and CR are
# deliberately left out of the class and therefore kept.
_CONTROL_CHARS_RE = re.compile(
    r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]"
)  # pragma: no mutate
# Characters an SQL identifier may start with (no digits).
_VALID_SQL_PREFIX = frozenset(
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_"
)  # pragma: no mutate
# Windows device names, compared against the upper-cased file stem.
# Besides the classic COM1-9 / LPT1-9 ports this includes the console
# handles CONIN$ / CONOUT$ and the superscript-digit port names
# (U+00B9, U+00B2, U+00B3), which Windows also treats as reserved.
_WINDOWS_RESERVED_NAMES = frozenset(  # pragma: no mutate
    {"CON", "PRN", "AUX", "NUL", "CONIN$", "CONOUT$"}
    | {
        f"{device}{digit}"
        for device in ("COM", "LPT")
        for digit in "123456789\u00b9\u00b2\u00b3"
    }
)

51 

52 

def sanitize_string(
    value: str,
    *,
    max_length: int | None = None,
    allow_html: bool = False,
    allow_unicode: bool = True,
    strip_whitespace: bool = True,
) -> str:
    """Sanitize a string by removing dangerous characters.

    Processing order: control characters are removed, surrounding
    whitespace is stripped, HTML tags are removed and the remaining
    ``&``, ``<`` and ``>`` are escaped, non-ASCII characters are dropped,
    and finally the result is truncated.

    Args:
        value: The string to sanitize.
        max_length: Maximum length to truncate to. The result may be
            slightly shorter when cutting at exactly ``max_length`` would
            split an escaped entity such as ``&amp;``.
        allow_html: Whether to keep HTML tags (default: False). When
            False, tags are removed and ``&``, ``<``, ``>`` are escaped.
        allow_unicode: Whether to keep non-ASCII characters.
        strip_whitespace: Whether to strip leading/trailing whitespace.

    Returns:
        The sanitized string. An empty input returns ``""``.

    Raises:
        TypeError: If value is not a string.

    Example:
        ```python
        sanitize_string("<script>alert('xss')</script>Hello")
        # Returns: "alert('xss')Hello"
        ```

    """
    if not isinstance(value, str):
        raise TypeError(f"value must be str, got {type(value).__name__}")

    if not value:
        return ""

    # Remove null bytes and control characters first, so that whitespace
    # hidden behind them (e.g. "\x00  text") is still stripped below.
    result = _CONTROL_CHARS_RE.sub("", value)

    if strip_whitespace:
        result = result.strip()

    # Handle HTML
    if not allow_html:
        # Remove HTML tags
        result = _HTML_TAGS_RE.sub("", result)
        # Escape HTML entities ("&" first so the other entities are not
        # double-escaped)
        result = result.replace("&", "&amp;")
        result = result.replace("<", "&lt;")
        result = result.replace(">", "&gt;")

    # Handle unicode
    if not allow_unicode:
        result = result.encode("ascii", errors="ignore").decode("ascii")

    # Truncate if needed
    if max_length is not None and len(result) > max_length:
        result = result[:max_length]
        if not allow_html:
            # After escaping, every "&" starts an entity we generated. If
            # the cut landed inside the last one, drop the partial entity
            # rather than emit malformed markup such as "&am".
            last_amp = result.rfind("&")
            if last_amp != -1 and ";" not in result[last_amp:]:
                result = result[:last_amp]

    return result

113 

114 

def sanitize_filename(
    filename: str,
    *,
    max_length: int = 255,
    replacement: str = "_",
    preserve_extension: bool = True,
) -> str:
    """Sanitize a filename to be safe for filesystem use.

    Removes or replaces characters that are:
    - Not allowed in filenames on various OSes
    - Potentially dangerous (path separators, etc.)

    Only the final path component of ``filename`` is used; any directory
    part is discarded. Both the stem and the extension are sanitized.

    Args:
        filename: The filename to sanitize. An empty string is treated as
            ``"unnamed"``.
        max_length: Maximum length for the filename.
        replacement: Character to replace invalid chars with. May be empty
            to simply drop them.
        preserve_extension: Keep the (sanitized) original extension. When
            False the extension is dropped.

    Returns:
        The sanitized filename.

    Raises:
        TypeError: If filename is not a string.

    Example:
        ```python
        sanitize_filename("my/../file<>:name.txt")
        # Returns: 'file_name.txt'
        ```

    """
    if not isinstance(filename, str):
        raise TypeError(f"filename must be str, got {type(filename).__name__}")

    if not filename:
        filename = "unnamed"

    # Get parts
    original_path = Path(filename)
    stem = original_path.stem
    suffix = original_path.suffix if preserve_extension else ""

    # The extension is untrusted input as well: replace invalid characters
    # and drop trailing dots/spaces, which Windows does not accept.
    if suffix:
        extension = _INVALID_FILENAME_CHARS_RE.sub(lambda _: replacement, suffix[1:])
        extension = extension.rstrip(". ")
        suffix = f".{extension}" if extension else ""

    # A callable replacement keeps `replacement` literal: no backslash or
    # group-reference processing is applied to it.
    safe_stem = _INVALID_FILENAME_CHARS_RE.sub(lambda _: replacement, stem)

    # Remove leading/trailing dots and spaces (Windows issues)
    safe_stem = safe_stem.strip(". ")

    # Remove path separators that might have snuck through
    safe_stem = safe_stem.replace("/", replacement)
    safe_stem = safe_stem.replace("\\", replacement)

    # Collapse runs of the replacement and trim it from both ends
    if replacement:
        safe_stem = re.sub(
            f"({re.escape(replacement)})+",
            lambda _: replacement,
            safe_stem,
        )
        safe_stem = safe_stem.strip(replacement)

    # Handle reserved names (Windows). Fall back to "_" so the name is
    # still altered when the caller passed an empty replacement.
    if safe_stem.upper() in _WINDOWS_RESERVED_NAMES:
        safe_stem = f"{replacement or '_'}{safe_stem}"

    # Handle empty result
    if not safe_stem:
        safe_stem = "unnamed"

    # Construct result
    result = f"{safe_stem}{suffix}"

    # Truncate if needed (keeping extension)
    if len(result) > max_length:
        available = max_length - len(suffix)
        if available > 0:
            safe_stem = safe_stem[:available]
            result = f"{safe_stem}{suffix}"
        else:
            # The extension alone does not fit: hard-cut the whole name
            result = result[:max_length]

    return result

206 

207 

def _clean_path_parts(path: Path) -> list[str]:
    """Clean and sanitize individual path components.

    The anchor (root and/or drive, e.g. ``/`` or ``C:\\``) is skipped: it
    is not a filename, and running it through ``sanitize_filename`` would
    turn it into a spurious ``"unnamed"`` directory.

    Args:
        path: The path whose components should be cleaned.

    Returns:
        The sanitized components in order, without the anchor. A ``..``
        component removes the previously kept component; a ``..`` with
        nothing left to remove is dropped.

    """
    # When a path has an anchor it is always the first entry of `parts`.
    components = path.parts[1:] if path.anchor else path.parts

    parts: list[str] = []
    for part in components:
        if part == "..":
            # `parts` never contains "..", so popping is always a real step up
            if parts:
                parts.pop()
        elif part != ".":  # pragma: no branch
            safe_part = sanitize_filename(part, preserve_extension=True)
            if safe_part:  # pragma: no branch
                parts.append(safe_part)
    return parts

220 

221 

222def _apply_base_dir_constraint( 

223 sanitized: Path, 

224 base_dir: Path | str | None, 

225 resolve: bool, 

226) -> Path: 

227 """Apply base directory constraints to a sanitized path.""" 

228 if base_dir is None: 

229 return sanitized 

230 

231 base = Path(base_dir).resolve() 

232 if resolve: 

233 try: 

234 return sanitized.resolve() 

235 except (OSError, RuntimeError) as e: 

236 msg = f"Cannot resolve path: {e}" 

237 raise ValueError(msg) from e 

238 

239 # Make absolute relative to base 

240 if not sanitized.is_absolute(): # pragma: no branch 

241 return base / sanitized 

242 

243 return sanitized 

244 

245 

def sanitize_path(
    path: str | Path,
    *,
    base_dir: Path | None = None,
    max_depth: int | None = 10,
    resolve: bool = False,
) -> Path:
    """Sanitize a path to prevent traversal and normalize it.

    Null bytes are removed, every component is cleaned, and the path is
    rebuilt on top of its original anchor (if it was absolute).

    Args:
        path: The path to sanitize.
        base_dir: Optional base directory to constrain to.
        max_depth: Maximum number of path parts allowed in the rebuilt
            path; None disables the check.
        resolve: Forwarded to the base-directory step; has no effect when
            ``base_dir`` is None.

    Returns:
        The sanitized Path object.

    Raises:
        ValueError: If path is invalid or too deep.

    """
    as_text = str(Path(path)) if isinstance(path, str) else str(path)

    # A NUL byte never belongs in a path; drop it and re-normalize
    candidate = Path(as_text.replace("\x00", ""))

    components = _clean_path_parts(candidate)

    # Absolute inputs keep their anchor (e.g. C:\ on Windows); everything
    # else is rebuilt as a relative path. joinpath() with no components
    # simply yields the starting path.
    root = Path(candidate.anchor) if candidate.is_absolute() else Path()
    sanitized = root.joinpath(*components)

    depth = len(sanitized.parts)
    too_deep = max_depth is not None and depth > max_depth
    if too_deep:
        msg = f"Path depth {depth} exceeds maximum of {max_depth}"
        raise ValueError(msg)

    return _apply_base_dir_constraint(sanitized, base_dir, resolve)

295 

296 

def sanitize_env_value(
    value: str,
    *,
    max_length: int = 4096,
    allow_multiline: bool = False,
) -> str:
    """Sanitize a value for use as an environment variable.

    Null bytes are always removed. Unless multiline values are allowed,
    each newline and carriage return is replaced by a single space. The
    cleaned value is then cut down to ``max_length`` characters.

    Args:
        value: The value to sanitize.
        max_length: Maximum length allowed.
        allow_multiline: Whether to allow newlines.

    Returns:
        The sanitized value.

    Raises:
        TypeError: If value is not a string.

    """
    if not isinstance(value, str):
        raise TypeError(f"value must be str, got {type(value).__name__}")

    # One translation pass covers both clean-up rules: NUL is deleted and,
    # when multiline is not allowed, LF and CR each map to a space.
    if allow_multiline:
        table = str.maketrans("", "", "\x00")
    else:
        table = str.maketrans("\n\r", "  ", "\x00")
    cleaned = value.translate(table)

    if len(cleaned) > max_length:
        return cleaned[:max_length]
    return cleaned

345 

346 

def sanitize_sql_identifier(identifier: str) -> str:
    """Sanitize a SQL identifier (table/column name).

    Note: This is NOT for SQL values - use parameterized queries for those!

    Every character other than an ASCII letter, digit or underscore is
    removed. If the remainder does not start with a letter or underscore,
    an underscore is prepended. The result is cut to
    ``MAX_SQL_IDENTIFIER_LENGTH`` characters.

    Args:
        identifier: The identifier to sanitize.

    Returns:
        The sanitized identifier.

    Raises:
        TypeError: If identifier is not a string.
        ValueError: If identifier is empty or has no valid characters left
            after sanitizing.

    """
    if not isinstance(identifier, str):
        raise TypeError(f"identifier must be str, got {type(identifier).__name__}")

    if not identifier:
        msg = "SQL identifier cannot be empty"
        raise ValueError(msg)

    # Only allow alphanumeric and underscore; an already-clean identifier
    # passes through this substitution unchanged.
    cleaned = _SQL_IDENTIFIER_DENY_RE.sub("", identifier)
    if not cleaned:
        msg = "SQL identifier contains no valid characters"
        raise ValueError(msg)

    # Must start with letter or underscore
    if cleaned[0] not in _VALID_SQL_PREFIX:
        cleaned = f"_{cleaned}"

    # Most DBs limit identifiers to 128 chars; slicing a shorter string is
    # a no-op.
    return cleaned[:MAX_SQL_IDENTIFIER_LENGTH]