Coverage for src / taipanstack / utils / filesystem.py: 100%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Safe filesystem operations. 

3 

4Provides secure wrappers around file operations with path validation, 

5atomic writes, and proper error handling using Result types. 

6""" 

7 

8import contextlib 

9import os 

10import shutil 

11import tempfile 

12from dataclasses import dataclass 

13from pathlib import Path 

14from typing import TypeAlias 

15 

16from taipanstack.core.result import Err, Ok, Result 

17from taipanstack.security.guards import ( 

18 TRAVERSAL_REGEX, 

19 SecurityError, 

20 guard_path_traversal, 

21) 

22from taipanstack.security.sanitizers import sanitize_filename 

23 

24 

@dataclass(frozen=True)
class FileNotFoundErr:
    """Error value describing a path that does not exist.

    Attributes:
        path: The path that was looked up.

    """

    path: Path

    @property
    def message(self) -> str:
        """Return a human-readable description of the error."""
        return f"File not found: {self.path}"

35 

36 

@dataclass(frozen=True)
class NotAFileErr:
    """Error value describing a path that exists but is not a regular file.

    Attributes:
        path: The offending path.

    """

    path: Path

    @property
    def message(self) -> str:
        """Return a human-readable description of the error."""
        return f"Not a file: {self.path}"

47 

48 

def _validate_path(
    path: Path | str,
    base_dir: Path | str | None = None,
    *,
    allow_symlinks: bool = False,
) -> Path:
    """Validate a path for traversal.

    If ``base_dir`` is given, validation is delegated to
    ``guard_path_traversal`` and its result is returned unchanged.

    If ``base_dir`` is None, only explicit traversal patterns are checked,
    which allows absolute paths (required for tests and some use cases)
    while still preventing '..' attacks.

    Args:
        path: Path to validate.
        base_dir: Base directory to constrain to, or None.
        allow_symlinks: Forwarded to ``guard_path_traversal``; it has no
            effect when ``base_dir`` is None.

    Returns:
        The validated path.

    Raises:
        SecurityError: If the lower-cased path string matches
            ``TRAVERSAL_REGEX`` (``base_dir`` is None case).

    """
    path = Path(path)
    if base_dir is not None:
        return guard_path_traversal(path, base_dir, allow_symlinks=allow_symlinks)

    # Check for explicit traversal patterns
    path_str = str(path).lower()
    if TRAVERSAL_REGEX.search(path_str):
        raise SecurityError(
            "Path traversal pattern detected",
            guard_name="path_traversal",
            # Only the first 50 characters are attached to the error.
            value=path_str[:50],
        )
    return path

74 

75 

@dataclass(frozen=True)
class FileTooLargeErr:
    """Error value describing a file that exceeds a size limit.

    Attributes:
        path: The file that was measured.
        size: Size of the file in bytes.
        max_size: The limit, in bytes, that was exceeded.

    """

    path: Path
    size: int
    max_size: int

    @property
    def message(self) -> str:
        """Return a human-readable description of the error."""
        return f"File too large: {self.size} bytes (max: {self.max_size})"

88 

89 

90@dataclass(frozen=True) 

91class WriteOptions: 

92 """Options for safe_write. 

93 

94 Attributes: 

95 base_dir: Base directory to constrain to. 

96 encoding: File encoding. 

97 create_parents: Create parent directories if needed. 

98 backup: Create backup of existing file. 

99 atomic: Use atomic write. 

100 

101 """ 

102 

103 base_dir: Path | str | None = None 

104 encoding: str = "utf-8" 

105 create_parents: bool = True 

106 backup: bool = True 

107 atomic: bool = True 

108 

109 

# Union type for safe_read errors
ReadFileError: TypeAlias = (
    FileNotFoundErr | NotAFileErr | FileTooLargeErr | SecurityError
)

114 

115 

def safe_read(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    max_size_bytes: int | None = 10 * 1024 * 1024,  # 10MB default
) -> Result[str, ReadFileError]:
    """Read a file safely with path validation.

    Args:
        path: Path to the file to read.
        base_dir: Base directory to constrain to.
        encoding: File encoding.
        max_size_bytes: Maximum file size to read (None for no limit).

    Returns:
        Ok(str): File contents on success.
        Err(ReadFileError): Error details on failure.

    Raises:
        OSError: Other I/O failures from ``stat``/``read_text`` are not
            converted to ``Err`` and propagate.
        UnicodeDecodeError: If the content cannot be decoded with
            ``encoding``.

    Example:
        >>> match safe_read("config.json"):
        ...     case Ok(content):
        ...         data = json.loads(content)
        ...     case Err(FileNotFoundErr(path=p)):
        ...         print(f"Missing: {p}")
        ...     case Err(FileTooLargeErr(size=s)):
        ...         print(f"Too big: {s} bytes")

    """
    path = Path(path)

    # Validate path
    try:
        path = _validate_path(path, base_dir)
    except SecurityError as e:
        return Err(e)

    if not path.exists():
        return Err(FileNotFoundErr(path=path))

    if not path.is_file():
        return Err(NotAFileErr(path=path))

    try:
        # Check file size
        if max_size_bytes is not None:
            file_size = path.stat().st_size
            if file_size > max_size_bytes:
                return Err(
                    FileTooLargeErr(path=path, size=file_size, max_size=max_size_bytes)
                )

        return Ok(path.read_text(encoding=encoding))
    except FileNotFoundError:
        # The file was removed after the existence check above; report it
        # through the Result like any other missing file.
        return Err(FileNotFoundErr(path=path))
    except IsADirectoryError:
        # The file was replaced by a directory after the is_file() check.
        return Err(NotAFileErr(path=path))

168 

169 

def _validate_safe_write_path(path: Path, opts: WriteOptions) -> None:
    """Validate the path for safe_write.

    Without ``opts.base_dir`` the path is checked by ``_validate_path``.
    With it, ``guard_path_traversal`` is run against the resolved base
    directory: on the path itself when it already exists, otherwise on its
    parent. Only the raising behaviour of the guards is used; their return
    values are discarded.

    Args:
        path: Target path of the write.
        opts: Write options; only ``base_dir`` is read here.

    Raises:
        SecurityError: Raised by ``_validate_path`` on a traversal pattern;
            presumably also by ``guard_path_traversal`` (defined elsewhere).

    """
    if opts.base_dir is None:
        _validate_path(path)
        return

    base = Path(opts.base_dir).resolve()
    # For new files, validate the parent
    target = path if path.exists() else path.parent
    guard_path_traversal(target, base)

182 

183 

def _sanitize_write_path(path: Path) -> Path:
    """Reject paths whose filename is changed by ``sanitize_filename``.

    Args:
        path: Target path of the write.

    Returns:
        ``path.parent / name`` where ``name`` is the (unchanged) filename.

    Raises:
        SecurityError: If ``sanitize_filename(path.name)`` differs from
            ``path.name``. The name is not silently rewritten.

    """
    safe_name = sanitize_filename(path.name)
    if safe_name != path.name:
        raise SecurityError(
            f"Unsafe or invalid characters in filename: '{path.name}'. "
            f"Expected safe name: '{safe_name}'",
            guard_name="sanitize_filename",
            value=path.name,
        )
    return path.parent / safe_name

195 

196 

def _perform_atomic_write(path: Path, content: str, opts: WriteOptions) -> None:
    """Write ``content`` to ``path`` through a temporary file.

    The content is written to a temporary file created next to ``path``,
    flushed and fsynced, and then moved over ``path`` in a single replace
    step, so ``path`` is never removed before the new file is in place.

    Args:
        path: Final destination of the file.
        content: Text to write.
        opts: Write options; only ``encoding`` is read here.

    Raises:
        OSError: If creating, writing, or replacing the file fails.
        LookupError: If ``opts.encoding`` is not a known encoding.

    """
    # Write to temp file first, then move it over the target.
    fd, temp_name = tempfile.mkstemp(
        dir=path.parent,
        prefix=f".{path.name}.",
        suffix=".tmp",
    )
    temp_file = Path(temp_name)
    try:
        try:
            # Write directly to the returned file descriptor to prevent TOCTOU.
            # closefd=False keeps ownership of the descriptor here, so it is
            # closed exactly once below whether or not opening/writing fails.
            with open(fd, "w", encoding=opts.encoding, closefd=False) as stream:
                stream.write(content)
                stream.flush()
                os.fsync(fd)
        finally:
            # We MUST close the file descriptor before renaming/modifying the
            # file, otherwise Windows will throw a PermissionError
            # (WinError 32).
            os.close(fd)

        # Preserve permissions if original exists
        if path.exists():
            shutil.copymode(path, temp_file)

        # replace() overwrites an existing target in one step, so there is
        # no window in which the target is missing.
        temp_file.replace(path)
    except BaseException:
        # Clean up the temp file on any failure, including interruption,
        # then let the error propagate.
        temp_file.unlink(missing_ok=True)
        raise

228 

229 

def safe_write(
    path: Path | str,
    content: str,
    *,
    options: WriteOptions | None = None,
) -> Path:
    """Write to a file safely with path validation.

    Steps, in order: validate the path, reject unsafe filenames, optionally
    create parent directories, optionally copy an existing file to a
    ``<name><suffix>.bak`` backup, then write atomically or directly
    depending on ``options.atomic``.

    The validation step only raises on failure; the caller-supplied path
    itself is what gets written to.

    Args:
        path: Path to write to.
        content: Content to write.
        options: Write options. Defaults to ``WriteOptions()``.

    Returns:
        Resolved path to the written file.

    Raises:
        SecurityError: If path validation fails.

    """
    opts = options or WriteOptions()
    path = Path(path)

    _validate_safe_write_path(path, opts)
    path = _sanitize_write_path(path)

    # Create parents if needed
    if opts.create_parents:
        path.parent.mkdir(parents=True, exist_ok=True)

    # Create backup if file exists
    if opts.backup and path.exists():
        backup_path = path.with_suffix(f"{path.suffix}.bak")
        shutil.copy2(path, backup_path)

    # Write file
    if opts.atomic:
        _perform_atomic_write(path, content, opts)
    else:
        path.write_text(content, encoding=opts.encoding)

    return path.resolve()

272 

273 

def ensure_dir(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    mode: int = 0o755,
) -> Path:
    """Ensure a directory exists, creating it if needed.

    Missing directories are created one level at a time, from the
    outermost missing ancestor down to ``path``, so that ``mode`` is passed
    to ``mkdir`` for every level that gets created.

    Args:
        path: Path to the directory.
        base_dir: Base directory to constrain to.
        mode: Directory permissions passed to each ``mkdir`` call.

    Returns:
        Resolved path to the directory.

    Raises:
        SecurityError: If path validation fails.
        FileExistsError: If a file already exists at the given path or
            intermediate paths.

    """
    # Validate path
    path = _validate_path(Path(path), base_dir, allow_symlinks=True)
    resolved_path = path.resolve()

    # Walk upwards collecting every level that is not yet a directory.
    missing: list[Path] = []
    current = resolved_path
    while not current.is_dir():
        if current.exists():
            raise FileExistsError(f"Path exists but is not a directory: {current}")
        missing.append(current)
        parent = current.parent
        if parent == current:
            # Reached the filesystem root; nothing further up to inspect.
            break
        current = parent

    # Collected leaf-first, so create in reverse: outermost ancestor first.
    for directory in reversed(missing):
        directory.mkdir(mode=mode, exist_ok=True)

    return resolved_path