Coverage for src / taipanstack / utils / filesystem.py: 100%
123 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
Safe filesystem operations.

Provides secure wrappers around file operations with path validation,
atomic writes, and proper error handling: ``safe_read`` reports failures
as Result values, while ``safe_write`` and ``ensure_dir`` raise exceptions.
6"""
8import contextlib
9import os
10import shutil
11import tempfile
12from dataclasses import dataclass
13from pathlib import Path
14from typing import TypeAlias
16from taipanstack.core.result import Err, Ok, Result
17from taipanstack.security.guards import (
18 TRAVERSAL_REGEX,
19 SecurityError,
20 guard_path_traversal,
21)
22from taipanstack.security.sanitizers import sanitize_filename
@dataclass(frozen=True)
class FileNotFoundErr:
    """Immutable error value for a path that does not exist.

    Attributes:
        path: The path that could not be found.
    """

    path: Path

    @property
    def message(self) -> str:
        """Return a human-readable description of the failure."""
        location = self.path
        return f"File not found: {location}"
@dataclass(frozen=True)
class NotAFileErr:
    """Immutable error value for a path that exists but is not a regular file.

    Attributes:
        path: The offending path.
    """

    path: Path

    @property
    def message(self) -> str:
        """Return a human-readable description of the failure."""
        location = self.path
        return f"Not a file: {location}"
def _validate_path(
    path: Path | str,
    base_dir: Path | str | None = None,
    *,
    allow_symlinks: bool = False,
) -> Path:
    """Check a path for traversal attempts and return it as a ``Path``.

    When ``base_dir`` is given, the whole check is delegated to
    ``guard_path_traversal`` and its result is returned. Without a base
    directory, absolute paths are accepted (tests and some callers rely on
    that); only explicit traversal patterns such as '..' are rejected.

    Args:
        path: Path to validate.
        base_dir: Optional directory handed to ``guard_path_traversal``.
        allow_symlinks: Forwarded to ``guard_path_traversal``; unused when
            ``base_dir`` is None.

    Returns:
        The validated path.

    Raises:
        SecurityError: If a traversal pattern is detected.
    """
    candidate = Path(path)

    if base_dir is not None:
        return guard_path_traversal(
            candidate,
            base_dir,
            allow_symlinks=allow_symlinks,
        )

    # No base directory: match the lower-cased string form against the
    # shared traversal pattern.
    path_str = str(candidate).lower()
    if not TRAVERSAL_REGEX.search(path_str):
        return candidate

    raise SecurityError(
        "Path traversal pattern detected",
        guard_name="path_traversal",
        # Only the first 50 characters are attached to the error.
        value=path_str[:50],
    )
@dataclass(frozen=True)
class FileTooLargeErr:
    """Immutable error value for a file that exceeds the allowed size.

    Attributes:
        path: The file that was rejected.
        size: Size of the file in bytes.
        max_size: Size limit in bytes that was exceeded.
    """

    path: Path
    size: int
    max_size: int

    @property
    def message(self) -> str:
        """Return a human-readable description of the failure."""
        actual, limit = self.size, self.max_size
        return f"File too large: {actual} bytes (max: {limit})"
90@dataclass(frozen=True)
91class WriteOptions:
92 """Options for safe_write.
94 Attributes:
95 base_dir: Base directory to constrain to.
96 encoding: File encoding.
97 create_parents: Create parent directories if needed.
98 backup: Create backup of existing file.
99 atomic: Use atomic write.
101 """
103 base_dir: Path | str | None = None
104 encoding: str = "utf-8"
105 create_parents: bool = True
106 backup: bool = True
107 atomic: bool = True
# Union of every error value that safe_read can return inside an Err:
# the three error dataclasses defined in this module plus SecurityError
# raised by path validation.
ReadFileError: TypeAlias = (
    FileNotFoundErr | NotAFileErr | FileTooLargeErr | SecurityError
)
def safe_read(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    max_size_bytes: int | None = 10 * 1024 * 1024,  # 10MB default
) -> Result[str, ReadFileError]:
    """Read a text file after validating its path, type and size.

    Validation failures are returned as ``Err`` values rather than raised.
    Exceptions raised by the read itself are not caught here.

    Args:
        path: File to read.
        base_dir: Optional directory the path must stay inside.
        encoding: Text encoding used to decode the file.
        max_size_bytes: Largest accepted file size in bytes; None disables
            the size check.

    Returns:
        Ok(str): The decoded file contents.
        Err(ReadFileError): Why the file was rejected.

    Example:
        >>> match safe_read("config.json"):
        ...     case Ok(content):
        ...         data = json.loads(content)
        ...     case Err(FileNotFoundErr(path=p)):
        ...         print(f"Missing: {p}")
        ...     case Err(FileTooLargeErr(size=s)):
        ...         print(f"Too big: {s} bytes")

    """
    # Path validation signals problems by raising; translate that to Err.
    try:
        target = _validate_path(Path(path), base_dir)
    except SecurityError as exc:
        return Err(exc)

    if not target.exists():
        return Err(FileNotFoundErr(path=target))
    if not target.is_file():
        return Err(NotAFileErr(path=target))

    # Reject oversized files before loading anything into memory.
    if max_size_bytes is not None:
        actual_size = target.stat().st_size
        if actual_size > max_size_bytes:
            too_large = FileTooLargeErr(
                path=target,
                size=actual_size,
                max_size=max_size_bytes,
            )
            return Err(too_large)

    text = target.read_text(encoding=encoding)
    return Ok(text)
def _validate_safe_write_path(path: Path, opts: WriteOptions) -> None:
    """Run the traversal checks that safe_write needs before touching disk.

    Args:
        path: Destination path requested by the caller.
        opts: Write options; only ``opts.base_dir`` is read here.

    Raises:
        SecurityError: If the path fails validation.
    """
    if opts.base_dir is None:
        # No containment requested: only look for traversal patterns.
        _validate_path(path)
        return

    root = Path(opts.base_dir).resolve()
    # A file that does not exist yet is checked through its parent directory.
    subject = path if path.exists() else path.parent
    guard_path_traversal(subject, root)
def _sanitize_write_path(path: Path) -> Path:
    """Reject a path whose final component is changed by sanitize_filename.

    Args:
        path: Destination path whose file name is checked.

    Returns:
        The parent directory joined with the (unchanged) file name.

    Raises:
        SecurityError: If sanitizing the file name would alter it.
    """
    safe_name = sanitize_filename(path.name)
    if safe_name == path.name:
        return path.parent / safe_name

    # The name is never rewritten silently; the caller is told what was
    # expected instead.
    raise SecurityError(
        f"Unsafe or invalid characters in filename: '{path.name}'. "
        f"Expected safe name: '{safe_name}'",
        guard_name="sanitize_filename",
        value=path.name,
    )
197def _perform_atomic_write(path: Path, content: str, opts: WriteOptions) -> None:
198 """Perform an atomic write operation."""
199 # Write to temp file first, then rename
200 _fd, temp_path = tempfile.mkstemp(
201 dir=path.parent,
202 prefix=f".{path.name}.",
203 suffix=".tmp",
204 )
205 try:
206 # Write directly to the returned file descriptor to prevent TOCTOU
207 # We MUST close the file descriptor before renaming/modifying it,
208 # otherwise Windows will throw a PermissionError (WinError 32).
209 with os.fdopen(_fd, "w", encoding=opts.encoding) as f:
210 f.write(content)
211 f.flush()
212 os.fsync(_fd)
214 temp_file = Path(temp_path)
215 # Preserve permissions if original exists
216 if path.exists():
217 shutil.copymode(path, temp_file)
218 # On Windows, we need to remove the target first if it exists
219 if path.exists():
220 path.unlink()
221 temp_file.rename(path)
222 except Exception:
223 # Clean up descriptor and temp file on error
224 with contextlib.suppress(OSError):
225 os.close(_fd)
226 Path(temp_path).unlink(missing_ok=True)
227 raise
def safe_write(
    path: Path | str,
    content: str,
    *,
    options: WriteOptions | None = None,
) -> Path:
    """Validate a destination path and write text to it.

    The path is validated and its file name checked before anything is
    created on disk. Depending on the options, missing parent directories
    are created and an existing file is first copied to a sibling whose
    name has ``.bak`` appended.

    Args:
        path: Destination file.
        content: Text to write.
        options: Write options; defaults to ``WriteOptions()``.

    Returns:
        The resolved path of the written file.

    Raises:
        SecurityError: If path validation fails.

    """
    opts = options if options else WriteOptions()
    path = Path(path)

    # Security checks come first: traversal validation, then the file name.
    _validate_safe_write_path(path, opts)
    path = _sanitize_write_path(path)

    if opts.create_parents:
        path.parent.mkdir(parents=True, exist_ok=True)

    # Keep a copy of the current contents next to the target.
    if opts.backup and path.exists():
        shutil.copy2(path, path.with_suffix(f"{path.suffix}.bak"))

    if not opts.atomic:
        path.write_text(content, encoding=opts.encoding)
    else:
        _perform_atomic_write(path, content, opts)

    return path.resolve()
def ensure_dir(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    mode: int = 0o755,
) -> Path:
    """Create a directory, and any missing ancestors, if it does not exist.

    Args:
        path: Directory that must exist when the function returns.
        base_dir: Optional directory the path must stay inside.
        mode: Permission bits passed to ``mkdir`` for every directory that
            is created here.

    Returns:
        The resolved path of the directory.

    Raises:
        SecurityError: If path validation fails.
        FileExistsError: If the path, or one of its ancestors, exists but is
            not a directory.

    """
    validated = _validate_path(Path(path), base_dir, allow_symlinks=True)
    resolved_path = validated.resolve()

    # Walk from the leaf towards the root, collecting every component that
    # is not yet a directory.
    missing: list[Path] = []
    current_path = resolved_path
    while not current_path.is_dir():
        if current_path.exists():
            raise FileExistsError(f"Path exists but is not a directory: {current_path}")
        missing.append(current_path)
        if current_path.parent == current_path:
            # Reached the filesystem root; nothing further up to inspect.
            break
        current_path = current_path.parent

    # Create outermost first so each directory is made with the given mode.
    for directory in reversed(missing):
        directory.mkdir(mode=mode, exist_ok=True)

    return resolved_path