Coverage for src / taipanstack / utils / filesystem.py: 100%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 14:54 +0000

1""" 

2Safe filesystem operations. 

3 

4Provides secure wrappers around file operations with path validation, 

5atomic writes, and proper error handling using Result types. 

6""" 

7 

8import contextlib 

9import functools 

10import hashlib 

11import os 

12import shutil 

13import tempfile 

14from dataclasses import dataclass 

15from pathlib import Path 

16from typing import TypeAlias 

17 

18from taipanstack.core.result import Err, Ok, Result 

19from taipanstack.security.guards import ( 

20 TRAVERSAL_REGEX, 

21 SecurityError, 

22 guard_hash_algorithm, 

23 guard_path_traversal, 

24) 

25from taipanstack.security.sanitizers import sanitize_filename 

26 

27 

@dataclass(frozen=True)
class FileNotFoundErr:
    """Error value describing a path that does not exist.

    Attributes:
        path: The path that was looked up.
        message: Human-readable description. When left empty, a default
            of the form ``"File not found: <path>"`` is filled in.

    """

    path: Path
    message: str = ""

    def __post_init__(self) -> None:
        """Fill in the default message when none was supplied."""
        if self.message:
            return
        # The dataclass is frozen, so the field has to be written through
        # object.__setattr__ rather than plain assignment.
        object.__setattr__(self, "message", f"File not found: {self.path}")

40 

41 

@dataclass(frozen=True)
class NotAFileErr:
    """Error value describing a path that exists but is not a regular file.

    Attributes:
        path: The offending path.
        message: Human-readable description. When left empty, a default
            of the form ``"Not a file: <path>"`` is filled in.

    """

    path: Path
    message: str = ""

    def __post_init__(self) -> None:
        """Fill in the default message when none was supplied."""
        if self.message:
            return
        # Frozen dataclass: bypass the generated __setattr__ guard.
        object.__setattr__(self, "message", f"Not a file: {self.path}")

52 

53 

def _validate_path(
    path: Path | str,
    base_dir: Path | str | None = None,
    *,
    allow_symlinks: bool = False,
) -> Path:
    """Check a path for traversal attempts and return it as a ``Path``.

    With a ``base_dir`` the check is delegated to ``guard_path_traversal``
    and its result is returned. Without one, absolute paths are accepted
    and only the textual form of the path is scanned with
    ``TRAVERSAL_REGEX``.

    Args:
        path: Path to validate.
        base_dir: Base directory to constrain to, if any.
        allow_symlinks: Forwarded to ``guard_path_traversal``; unused when
            ``base_dir`` is None.

    Returns:
        The validated path.

    Raises:
        SecurityError: If the lower-cased path string matches
            ``TRAVERSAL_REGEX`` (only checked when ``base_dir`` is None).

    """
    candidate = Path(path)

    if base_dir is None:
        # Pattern-only check: scan the lower-cased string form.
        lowered = str(candidate).lower()
        if not TRAVERSAL_REGEX.search(lowered):
            return candidate
        raise SecurityError(
            "Path traversal pattern detected",
            guard_name="path_traversal",
            # Only the first 50 characters are attached to the error.
            value=lowered[:50],
        )

    return guard_path_traversal(candidate, base_dir, allow_symlinks=allow_symlinks)

79 

80 

@dataclass(frozen=True)
class FileTooLargeErr:
    """Error value describing a file that exceeds a size limit.

    Attributes:
        path: The file that was measured.
        size: Size of the file in bytes.
        max_size: The limit, in bytes, that was exceeded.
        message: Human-readable description. When left empty, a default
            of the form ``"File too large: <size> bytes (max: <max_size>)"``
            is filled in.

    """

    path: Path
    size: int
    max_size: int
    message: str = ""

    def __post_init__(self) -> None:
        """Fill in the default message when none was supplied."""
        if self.message:
            return
        default_text = f"File too large: {self.size} bytes (max: {self.max_size})"
        # Frozen dataclass: bypass the generated __setattr__ guard.
        object.__setattr__(self, "message", default_text)

97 

98 

99@dataclass(frozen=True) 

100class WriteOptions: 

101 """Options for safe_write. 

102 

103 Attributes: 

104 base_dir: Base directory to constrain to. 

105 encoding: File encoding. 

106 create_parents: Create parent directories if needed. 

107 backup: Create backup of existing file. 

108 atomic: Use atomic write. 

109 

110 """ 

111 

112 base_dir: Path | str | None = None 

113 encoding: str = "utf-8" 

114 create_parents: bool = True 

115 backup: bool = True 

116 atomic: bool = True 

117 

118 

# Every error variant that safe_read can return inside ``Err``.
ReadFileError: TypeAlias = (
    FileNotFoundErr
    | NotAFileErr
    | FileTooLargeErr
    | SecurityError
)

123 

124 

def safe_read(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    max_size_bytes: int | None = 10 * 1024 * 1024,  # 10MB default
) -> Result[str, ReadFileError]:
    """Read a text file after validating its path and size.

    Args:
        path: File to read.
        base_dir: Directory the file must stay inside, if any.
        encoding: Text encoding used to decode the file.
        max_size_bytes: Largest acceptable file size in bytes; None
            disables the size check.

    Returns:
        Ok(str): The decoded file contents.
        Err(ReadFileError): A ``SecurityError`` from path validation, or a
            ``FileNotFoundErr`` / ``NotAFileErr`` / ``FileTooLargeErr``
            describing why the file was not read.

    Example:
        >>> match safe_read("config.json"):
        ...     case Ok(content):
        ...         data = json.loads(content)
        ...     case Err(FileNotFoundErr(path=p)):
        ...         print(f"Missing: {p}")
        ...     case Err(FileTooLargeErr(size=s)):
        ...         print(f"Too big: {s} bytes")

    """
    # Validation failures are reported as values, not raised.
    try:
        target = _validate_path(Path(path), base_dir)
    except SecurityError as exc:
        return Err(exc)

    if not target.exists():
        return Err(FileNotFoundErr(path=target))
    if not target.is_file():
        return Err(NotAFileErr(path=target))

    if max_size_bytes is not None:
        actual_size = target.stat().st_size
        if actual_size > max_size_bytes:
            too_large = FileTooLargeErr(
                path=target,
                size=actual_size,
                max_size=max_size_bytes,
            )
            return Err(too_large)

    return Ok(target.read_text(encoding=encoding))

177 

178 

def safe_write(
    path: Path | str,
    content: str,
    *,
    options: WriteOptions | None = None,
) -> Path:
    """Write text to a file safely with path validation.

    The file name is passed through ``sanitize_filename`` before writing,
    so the file actually written may have a different name than requested;
    the returned path reflects the real location.

    Args:
        path: Path to write to.
        content: Text to write.
        options: Write options; defaults to ``WriteOptions()``.

    Returns:
        Resolved path to the written file.

    Raises:
        SecurityError: If path validation fails.

    """
    opts = options or WriteOptions()
    path = Path(path)

    # Validate path
    if opts.base_dir is not None:
        base = Path(opts.base_dir).resolve()
        # A file that does not exist yet is validated through its parent.
        if not path.exists():
            guard_path_traversal(path.parent, base)
        else:
            guard_path_traversal(path, base)
    else:
        _validate_path(path)

    # Sanitize filename
    safe_name = sanitize_filename(path.name)
    path = path.parent / safe_name

    # Create parents if needed
    if opts.create_parents:
        path.parent.mkdir(parents=True, exist_ok=True)

    # Create backup if file exists
    if opts.backup and path.exists():
        backup_path = path.with_suffix(f"{path.suffix}.bak")
        shutil.copy2(path, backup_path)

    if not opts.atomic:
        path.write_text(content, encoding=opts.encoding)
        return path.resolve()

    # Atomic write: fill a temp file in the same directory (so the final
    # move stays on one filesystem), then move it over the target.
    fd, temp_name = tempfile.mkstemp(
        dir=path.parent,
        prefix=f".{path.name}.",
        suffix=".tmp",
    )
    temp_file = Path(temp_name)
    try:
        # Close the file descriptor immediately - required for Windows
        os.close(fd)
        temp_file.write_text(content, encoding=opts.encoding)
        # Preserve permissions if original exists
        if path.exists():
            shutil.copymode(path, temp_file)
        # Path.replace overwrites an existing target in a single step on
        # both POSIX and Windows. Unlinking the target first would leave a
        # window with no file at all and would lose the original contents
        # if the subsequent move failed.
        temp_file.replace(path)
    except Exception:
        # Clean up temp file on error
        with contextlib.suppress(OSError):
            temp_file.unlink(missing_ok=True)
        raise

    return path.resolve()

256 

257 

def ensure_dir(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    mode: int = 0o755,
) -> Path:
    """Create a directory (and any missing parents) if it is not there yet.

    Args:
        path: Directory that should exist afterwards.
        base_dir: Directory the target must stay inside, if any.
        mode: Permission bits passed to ``mkdir``.

    Returns:
        Resolved path to the directory.

    Raises:
        SecurityError: If path validation fails.

    """
    # Symlinks are explicitly permitted when validating directories.
    directory = _validate_path(Path(path), base_dir, allow_symlinks=True)
    directory.mkdir(mode=mode, parents=True, exist_ok=True)
    return directory.resolve()

285 

286 

def safe_copy(
    src: Path | str,
    dst: Path | str,
    *,
    base_dir: Path | str | None = None,
    overwrite: bool = False,
) -> Path:
    """Copy a file safely.

    Args:
        src: Source file path.
        dst: Destination file path.
        base_dir: Base directory to constrain both paths to.
        overwrite: Allow overwriting existing file.

    Returns:
        Resolved path to the destination file.

    Raises:
        SecurityError: If path validation fails.
        FileNotFoundError: If the source does not exist.
        FileExistsError: If destination exists and overwrite=False.

    """
    src = Path(src)
    dst = Path(dst)

    # Validate paths
    if base_dir is not None:
        base = Path(base_dir)
        src = guard_path_traversal(src, base)
        # A destination that does not exist yet is validated through its
        # parent directory.
        if dst.exists():
            dst = guard_path_traversal(dst, base)
        else:
            guard_path_traversal(dst.parent, base)
    else:
        src = _validate_path(src)
        # Without a base directory the check is purely textual, so it does
        # not depend on the destination existing. Always scan the full
        # destination: checking only the parent of a not-yet-existing
        # destination let a final component such as ".." slip through.
        _validate_path(dst)

    if not src.exists():
        raise FileNotFoundError(f"Source file not found: {src}")

    if dst.exists() and not overwrite:
        raise FileExistsError(f"Destination already exists: {dst}")

    # Ensure parent directory exists
    dst.parent.mkdir(parents=True, exist_ok=True)

    # copy2 copies the contents and also attempts to preserve metadata.
    shutil.copy2(src, dst)
    return dst.resolve()

337 

338 

def safe_delete(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    missing_ok: bool = True,
    recursive: bool = False,
) -> bool:
    """Delete a file or directory safely.

    A symbolic link is removed as a link: the link itself is unlinked and
    whatever it points to is left untouched. This also covers dangling
    links, whose target no longer exists.

    Args:
        path: Path to delete.
        base_dir: Base directory to constrain to.
        missing_ok: Don't raise if path doesn't exist.
        recursive: Allow deleting directories recursively.

    Returns:
        True if something was deleted, False if the path was missing and
        ``missing_ok`` is set.

    Raises:
        SecurityError: If path validation fails.
        FileNotFoundError: If path doesn't exist and missing_ok=False.

    """
    # Validate path
    path = _validate_path(Path(path), base_dir)

    # exists() and is_dir() follow symlinks. A dangling link would look
    # "missing", and a link to a directory would be handed to
    # rmtree()/rmdir(), both of which refuse to operate on a link.
    # So links are detected up front and treated as plain entries.
    is_link = path.is_symlink()

    if not is_link and not path.exists():
        if missing_ok:
            return False
        raise FileNotFoundError(f"Path not found: {path}")

    if is_link or not path.is_dir():
        path.unlink()
    elif recursive:
        shutil.rmtree(path)
    else:
        # Non-recursive directory removal only succeeds on empty dirs.
        path.rmdir()

    return True

381 

382 

def get_file_hash(
    path: Path | str,
    *,
    algorithm: str = "sha256",
    base_dir: Path | str | None = None,
) -> str:
    """Compute the hex digest of a file's contents.

    Args:
        path: File to hash.
        algorithm: Name of the hash algorithm (sha256, sha512, etc); it is
            checked by ``guard_hash_algorithm`` before use.
        base_dir: Directory the file must stay inside, if any.

    Returns:
        Hex digest of the file contents.

    """
    # The path is validated first, then the algorithm name.
    target = _validate_path(Path(path), base_dir)
    digest = hashlib.new(guard_hash_algorithm(algorithm))

    # Feed the file through in 8 KiB blocks so it never has to be held in
    # memory all at once; an empty read marks end of file.
    with target.open("rb") as stream:
        while block := stream.read(8192):
            digest.update(block)

    return digest.hexdigest()

415 

416 

def find_files(
    directory: Path | str,
    pattern: str = "*",
    *,
    base_dir: Path | str | None = None,
    recursive: bool = True,
    include_hidden: bool = False,
) -> list[Path]:
    """Find files matching a pattern.

    Args:
        directory: Directory to search in.
        pattern: Glob pattern to match.
        base_dir: Base directory to constrain to.
        recursive: Search recursively.
        include_hidden: Include hidden entries, i.e. matches that have a
            path component starting with "." below ``directory``.

    Returns:
        List of matching file paths (directories are excluded). Empty if
        ``directory`` does not exist.

    """
    # Validate path
    directory = _validate_path(Path(directory), base_dir)

    if not directory.exists():
        return []

    matches = directory.rglob(pattern) if recursive else directory.glob(pattern)

    if not include_hidden:
        # Judge hiddenness only by the components below the search root.
        # Looking at the full path would discard every result whenever the
        # search directory itself (or one of its ancestors) is dot-named,
        # e.g. ~/.config/app.
        matches = (
            found
            for found in matches
            if not any(
                part.startswith(".")
                for part in found.relative_to(directory).parts
            )
        )

    # Only return files, not directories
    return [found for found in matches if found.is_file()]