Coverage for src / taipanstack / utils / filesystem.py: 100%
153 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 14:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 14:54 +0000
1"""
2Safe filesystem operations.
4Provides secure wrappers around file operations with path validation,
5atomic writes, and proper error handling using Result types.
6"""
8import contextlib
9import functools
10import hashlib
11import os
12import shutil
13import tempfile
14from dataclasses import dataclass
15from pathlib import Path
16from typing import TypeAlias
18from taipanstack.core.result import Err, Ok, Result
19from taipanstack.security.guards import (
20 TRAVERSAL_REGEX,
21 SecurityError,
22 guard_hash_algorithm,
23 guard_path_traversal,
24)
25from taipanstack.security.sanitizers import sanitize_filename
@dataclass(frozen=True)
class FileNotFoundErr:
    """Error value describing a path that does not exist.

    ``message`` falls back to ``"File not found: <path>"`` when the caller
    leaves it empty.
    """

    path: Path
    message: str = ""

    def __post_init__(self) -> None:
        """Fill in the default message when none was supplied."""
        if not self.message:
            # The dataclass is frozen, so normal attribute assignment is
            # blocked; object.__setattr__ bypasses that during initialisation.
            object.__setattr__(self, "message", f"File not found: {self.path}")
@dataclass(frozen=True)
class NotAFileErr:
    """Error value describing a path that exists but is not a regular file.

    ``message`` falls back to ``"Not a file: <path>"`` when the caller leaves
    it empty.
    """

    path: Path
    message: str = ""

    def __post_init__(self) -> None:
        """Fill in the default message when none was supplied."""
        if not self.message:
            # Frozen dataclass: go through object.__setattr__ to set the field.
            object.__setattr__(self, "message", f"Not a file: {self.path}")
def _validate_path(
    path: Path | str,
    base_dir: Path | str | None = None,
    *,
    allow_symlinks: bool = False,
) -> Path:
    """Check a path for traversal attempts and return it as a ``Path``.

    When ``base_dir`` is given, the check is delegated to
    ``guard_path_traversal`` and whatever it returns is passed back.
    Without a base directory, absolute paths are accepted (required for
    tests and some use cases) and only explicit traversal patterns such as
    ``..`` are rejected.

    Args:
        path: Path to validate.
        base_dir: Optional directory the path must stay inside.
        allow_symlinks: Forwarded to ``guard_path_traversal``; unused when
            ``base_dir`` is None.

    Returns:
        The validated path.

    Raises:
        SecurityError: If the lower-cased path text matches
            ``TRAVERSAL_REGEX``. Callers in this module also expect
            ``guard_path_traversal`` to raise it.
    """
    candidate = Path(path)

    if base_dir is None:
        # No base directory to anchor against: pattern check only.
        lowered = str(candidate).lower()
        if TRAVERSAL_REGEX.search(lowered):
            raise SecurityError(
                "Path traversal pattern detected",
                guard_name="path_traversal",
                value=lowered[:50],  # truncate what is attached to the error
            )
        return candidate

    return guard_path_traversal(candidate, base_dir, allow_symlinks=allow_symlinks)
@dataclass(frozen=True)
class FileTooLargeErr:
    """Error value describing a file that exceeds the allowed size.

    ``message`` falls back to ``"File too large: <size> bytes (max: <max_size>)"``
    when the caller leaves it empty.
    """

    path: Path
    size: int
    max_size: int
    message: str = ""

    def __post_init__(self) -> None:
        """Fill in the default message when none was supplied."""
        if not self.message:
            fallback = f"File too large: {self.size} bytes (max: {self.max_size})"
            # Frozen dataclass: go through object.__setattr__ to set the field.
            object.__setattr__(self, "message", fallback)
99@dataclass(frozen=True)
100class WriteOptions:
101 """Options for safe_write.
103 Attributes:
104 base_dir: Base directory to constrain to.
105 encoding: File encoding.
106 create_parents: Create parent directories if needed.
107 backup: Create backup of existing file.
108 atomic: Use atomic write.
110 """
112 base_dir: Path | str | None = None
113 encoding: str = "utf-8"
114 create_parents: bool = True
115 backup: bool = True
116 atomic: bool = True
# Every error type that safe_read can hand back inside an Err.
ReadFileError: TypeAlias = (
    FileNotFoundErr
    | NotAFileErr
    | FileTooLargeErr
    | SecurityError
)
def safe_read(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    max_size_bytes: int | None = 10 * 1024 * 1024,  # 10MB default
) -> Result[str, ReadFileError]:
    """Read a text file after validating its path, reporting failures as values.

    Validation problems, a missing path, a non-file path and an oversized
    file are returned as ``Err`` instead of being raised.

    Args:
        path: File to read.
        base_dir: Optional directory the path must stay inside.
        encoding: Encoding used to decode the file.
        max_size_bytes: Largest accepted file size; None disables the check.

    Returns:
        Ok(str): The decoded file contents.
        Err(ReadFileError): ``SecurityError``, ``FileNotFoundErr``,
            ``NotAFileErr`` or ``FileTooLargeErr`` describing the failure.

    Example:
        >>> match safe_read("config.json"):
        ...     case Ok(content):
        ...         data = json.loads(content)
        ...     case Err(FileNotFoundErr(path=p)):
        ...         print(f"Missing: {p}")
        ...     case Err(FileTooLargeErr(size=s)):
        ...         print(f"Too big: {s} bytes")

    """
    try:
        target = _validate_path(Path(path), base_dir)
    except SecurityError as exc:
        return Err(exc)

    if not target.exists():
        return Err(FileNotFoundErr(path=target))
    if not target.is_file():
        return Err(NotAFileErr(path=target))

    # Only stat the file when a limit is actually configured.
    if max_size_bytes is not None and (
        actual_size := target.stat().st_size
    ) > max_size_bytes:
        return Err(
            FileTooLargeErr(path=target, size=actual_size, max_size=max_size_bytes)
        )

    return Ok(target.read_text(encoding=encoding))
def safe_write(
    path: Path | str,
    content: str,
    *,
    options: WriteOptions | None = None,
) -> Path:
    """Write text to a file after validating and sanitizing its path.

    Steps, in order: validate the path, sanitize the file name, optionally
    create parent directories, optionally back up an existing file to
    ``<name><suffix>.bak``, then write the content either atomically (via a
    temporary file in the same directory) or in place.

    Args:
        path: Path to write to.
        content: Text to write.
        options: Write options; defaults to ``WriteOptions()``.

    Returns:
        Resolved path of the written file. The file name may differ from the
        one passed in because it goes through ``sanitize_filename``.

    Raises:
        SecurityError: If path validation fails.
        OSError: If creating directories, backing up or writing fails.

    """
    opts = options or WriteOptions()
    path = Path(path)

    # Validate path
    if opts.base_dir is not None:
        base = Path(opts.base_dir).resolve()
        # For new files, validate the parent instead of the file itself.
        guard_path_traversal(path if path.exists() else path.parent, base)
    else:
        _validate_path(path)

    # Sanitize only the final component; the directory part was validated above.
    path = path.parent / sanitize_filename(path.name)

    if opts.create_parents:
        path.parent.mkdir(parents=True, exist_ok=True)

    if opts.backup and path.exists():
        shutil.copy2(path, path.with_suffix(f"{path.suffix}.bak"))

    if not opts.atomic:
        path.write_text(content, encoding=opts.encoding)
        return path.resolve()

    # Atomic write: fill a temp file next to the target, then swap it in.
    # The temp file lives in the same directory so the final rename stays on
    # one filesystem.
    # NOTE(review): mkstemp creates the file readable/writable by the owner
    # only, so a brand-new target keeps that mode - confirm this is intended.
    fd, temp_name = tempfile.mkstemp(
        dir=path.parent,
        prefix=f".{path.name}.",
        suffix=".tmp",
    )
    temp_file = Path(temp_name)
    try:
        # Close the file descriptor immediately - required for Windows
        os.close(fd)
        temp_file.write_text(content, encoding=opts.encoding)
        # Preserve permissions if original exists
        if path.exists():
            shutil.copymode(path, temp_file)
        # Path.replace overwrites an existing target in a single step on both
        # POSIX and Windows. The previous unlink-then-rename sequence left a
        # window in which the target did not exist at all.
        temp_file.replace(path)
    except Exception:
        # Clean up temp file on error
        with contextlib.suppress(OSError):
            temp_file.unlink(missing_ok=True)
        raise

    return path.resolve()
def ensure_dir(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    mode: int = 0o755,
) -> Path:
    """Create a directory (and any missing parents) if it is not there yet.

    Args:
        path: Directory that should exist.
        base_dir: Optional directory the path must stay inside.
        mode: Permission bits handed to ``Path.mkdir``.

    Returns:
        Resolved path of the directory.

    Raises:
        SecurityError: If path validation fails.

    """
    # Symlinks are explicitly allowed for directories.
    directory = _validate_path(Path(path), base_dir, allow_symlinks=True)
    directory.mkdir(mode=mode, parents=True, exist_ok=True)
    return directory.resolve()
def safe_copy(
    src: Path | str,
    dst: Path | str,
    *,
    base_dir: Path | str | None = None,
    overwrite: bool = False,
) -> Path:
    """Copy a file to a new location after validating both paths.

    Args:
        src: File to copy from.
        dst: File to copy to.
        base_dir: Optional directory both paths must stay inside.
        overwrite: Permit replacing an existing destination.

    Returns:
        Resolved path of the destination file.

    Raises:
        SecurityError: If path validation fails.
        FileNotFoundError: If the source does not exist.
        FileExistsError: If destination exists and overwrite=False.

    """
    source = Path(src)
    target = Path(dst)

    if base_dir is None:
        source = _validate_path(source)
        # A destination that does not exist yet is checked through its parent.
        _validate_path(target if target.exists() else target.parent)
    else:
        root = Path(base_dir)
        source = guard_path_traversal(source, root)
        if target.exists():
            target = guard_path_traversal(target, root)
        else:
            # A destination that does not exist yet is checked through its parent.
            guard_path_traversal(target.parent, root)

    if not source.exists():
        raise FileNotFoundError(f"Source file not found: {source}")

    if not overwrite and target.exists():
        raise FileExistsError(f"Destination already exists: {target}")

    # Make sure there is a directory to copy into.
    target.parent.mkdir(parents=True, exist_ok=True)

    shutil.copy2(source, target)
    return target.resolve()
def safe_delete(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    missing_ok: bool = True,
    recursive: bool = False,
) -> bool:
    """Delete a file, directory or symbolic link after validating its path.

    A symbolic link is always removed as a link: its target is never followed
    or deleted, and a dangling link counts as existing.

    Args:
        path: Path to delete.
        base_dir: Base directory to constrain to.
        missing_ok: Don't raise if path doesn't exist.
        recursive: Allow deleting directories recursively. Without it only
            an empty directory can be removed.

    Returns:
        True if something was deleted, False if the path was missing and
        ``missing_ok`` is set.

    Raises:
        SecurityError: If path validation fails.
        FileNotFoundError: If path doesn't exist and missing_ok=False.
        OSError: If a non-empty directory is deleted without ``recursive``.

    """
    path = _validate_path(Path(path), base_dir)

    # exists() and is_dir() follow symlinks, so links are detected separately.
    # Otherwise a dangling link would look missing, and a link to a directory
    # would be passed to rmtree/rmdir, which both fail on links.
    is_link = path.is_symlink()

    if not is_link and not path.exists():
        if missing_ok:
            return False
        raise FileNotFoundError(f"Path not found: {path}")

    if path.is_dir() and not is_link:
        if recursive:
            shutil.rmtree(path)
        else:
            path.rmdir()
    else:
        # Regular files and symlinks of any kind: remove the entry itself.
        path.unlink()

    return True
def get_file_hash(
    path: Path | str,
    *,
    algorithm: str = "sha256",
    base_dir: Path | str | None = None,
) -> str:
    """Compute the hex digest of a file's contents.

    Args:
        path: File to hash.
        algorithm: Hash algorithm name (sha256, sha512, etc); it is checked
            by ``guard_hash_algorithm`` before use.
        base_dir: Optional directory the path must stay inside.

    Returns:
        Hex digest of the file hash.

    """
    file_path = _validate_path(Path(path), base_dir)
    digest = hashlib.new(guard_hash_algorithm(algorithm))

    with file_path.open("rb") as stream:
        # Read in 8 KiB pieces so the whole file is never held in memory.
        read_chunk = functools.partial(stream.read, 8192)
        for block in iter(read_chunk, b""):
            digest.update(block)

    return digest.hexdigest()
def find_files(
    directory: Path | str,
    pattern: str = "*",
    *,
    base_dir: Path | str | None = None,
    recursive: bool = True,
    include_hidden: bool = False,
) -> list[Path]:
    """Find files matching a glob pattern inside a directory.

    Args:
        directory: Directory to search in.
        pattern: Glob pattern to match.
        base_dir: Base directory to constrain to.
        recursive: Search recursively.
        include_hidden: Include hidden files (starting with .). Hidden means
            any path component *below* ``directory`` starts with a dot; the
            location of ``directory`` itself does not matter.

    Returns:
        List of matching file paths (directories are never returned). Empty
        if ``directory`` does not exist.

    Raises:
        SecurityError: If path validation of ``directory`` fails.

    """
    directory = _validate_path(Path(directory), base_dir)

    if not directory.exists():
        return []

    # NOTE(review): ``pattern`` goes to glob unvalidated - confirm callers
    # never pass untrusted patterns.
    matches = directory.rglob(pattern) if recursive else directory.glob(pattern)

    def _is_hidden(candidate: Path) -> bool:
        # Look only at the components beneath the search root. Checking the
        # full path would hide every result when the root itself sits under a
        # dot-directory such as ~/.config/app.
        relative_parts = candidate.relative_to(directory).parts
        return any(part.startswith(".") for part in relative_parts)

    return [
        candidate
        for candidate in matches
        if (include_hidden or not _is_hidden(candidate)) and candidate.is_file()
    ]