Coverage for src / taipanstack / utils / subprocess.py: 100%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 14:54 +0000

1""" 

2Safe subprocess execution with security guards. 

3 

4Provides secure wrappers around subprocess execution with 

5command validation, timeout handling, and retry logic. 

6""" 

7 

8import os 

9import shutil 

10import subprocess # nosec B404 

11import time 

12from collections.abc import Sequence 

13from dataclasses import dataclass 

14from pathlib import Path 

15 

16from taipanstack.security.guards import ( 

17 _DEFAULT_DENIED_ENV_VARS, 

18 _SENSITIVE_ENV_VAR_PATTERN, 

19 SecurityError, 

20 guard_command_injection, 

21) 

22 

23 

@dataclass(frozen=True)
class SafeCommandResult:
    """Immutable record describing one finished (or simulated) command run.

    Attributes:
        command: Argument list that was executed.
        returncode: Exit status reported for the command.
        stdout: Captured standard output (empty string by default).
        stderr: Captured standard error (empty string by default).
        duration_seconds: Elapsed run time in seconds.
        success: Read-only flag, True exactly when ``returncode`` is 0.

    """

    command: list[str]
    returncode: int
    stdout: str = ""
    stderr: str = ""
    duration_seconds: float = 0.0

    @property
    def success(self) -> bool:
        """Report whether the exit status is zero."""
        return self.returncode == 0

    def raise_on_error(self) -> "SafeCommandResult":
        """Return this result unchanged, or raise if the command failed.

        Returns:
            This same instance when ``success`` is True, which allows chaining.

        Raises:
            subprocess.CalledProcessError: When ``returncode`` is non-zero; the
                exception carries the command, stdout and stderr of this result.

        """
        # Guard clause: the happy path leaves immediately.
        if self.success:
            return self

        raise subprocess.CalledProcessError(
            returncode=self.returncode,
            cmd=self.command,
            output=self.stdout,
            stderr=self.stderr,
        )

67 

68 

# Whitelist applied when a caller does not pass its own ``allowed_commands``.
DEFAULT_ALLOWED_COMMANDS: frozenset[str] = frozenset(
    (
        # Python interpreters and packaging front-ends
        "python", "python3", "pip", "pip3", "poetry", "pipx",
        # Version control
        "git",
        # Build tools
        "make",
        # Testing, linting and security scanning
        "pytest", "mypy", "ruff", "bandit", "safety", "semgrep", "pre-commit",
        # Basic system utilities
        "echo", "cat", "ls", "pwd", "mkdir", "rm", "cp", "mv", "touch",
        "chmod", "which",
    )
)

105 

106 

def _validate_and_resolve_command(
    command: Sequence[str],
    allowed_commands: Sequence[str] | None,
) -> list[str]:
    """Check a command against the whitelist and confirm it is on PATH.

    Args:
        command: Program name followed by its arguments.
        allowed_commands: Permitted program names; ``None`` selects
            ``DEFAULT_ALLOWED_COMMANDS``.

    Returns:
        The argument list returned by ``guard_command_injection``. The
        executable is only looked up with ``shutil.which``; its resolved
        path is not substituted into the list.

    Raises:
        SecurityError: If the command is empty, is rejected by the guard,
            or its executable cannot be found.

    """
    argv = list(command)
    if not argv:
        raise SecurityError(
            "Empty command is not allowed",
            guard_name="safe_command",
        )

    # The guard receives a plain list whichever whitelist is in effect.
    permitted = list(
        DEFAULT_ALLOWED_COMMANDS if allowed_commands is None else allowed_commands
    )
    checked = guard_command_injection(argv, allowed_commands=permitted)

    executable = checked[0]
    if shutil.which(executable):
        return checked

    raise SecurityError(
        f"Command not found: {executable}",
        guard_name="safe_command",
        value=executable,
    )

148 

149 

def _filter_environment(env: dict[str, str] | None) -> dict[str, str]:
    """Build a copy of an environment without denied or sensitive names.

    Args:
        env: Mapping to filter; ``None`` means a snapshot of ``os.environ``.

    Returns:
        A new dict holding every entry whose upper-cased name is neither in
        ``_DEFAULT_DENIED_ENV_VARS`` nor matched by
        ``_SENSITIVE_ENV_VAR_PATTERN``. Values are coerced with ``str()``
        and keys keep their original casing.

    """
    source = dict(os.environ) if env is None else env

    # Names are compared upper-cased, so the check ignores the caller's casing.
    return {
        key: str(value)
        for key, value in source.items()
        if not (
            (upper := key.upper()) in _DEFAULT_DENIED_ENV_VARS
            or _SENSITIVE_ENV_VAR_PATTERN.match(upper)
        )
    }

172 

173 

def _execute_command(
    validated_cmd: list[str],
    cwd: Path | None,
    timeout: float,
    capture_output: bool,
    safe_env: dict[str, str],
) -> SafeCommandResult:
    """Run an already-validated command and convert a timeout into a result.

    Args:
        validated_cmd: Validated command list (program plus arguments).
        cwd: Working directory handed to ``subprocess.run``.
        timeout: Maximum run time in seconds.
        capture_output: Whether stdout/stderr are captured.
        safe_env: Filtered environment used as the child's environment.

    Returns:
        A ``SafeCommandResult``. If the command times out, no exception
        propagates: the result has ``returncode=-1``, a fixed timeout
        message in ``stderr``, and whatever stdout was attached to the
        ``TimeoutExpired`` exception.

    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the wall clock is adjusted during the run.
    start_time = time.perf_counter()

    try:
        result = subprocess.run(  # nosec B603
            validated_cmd,
            cwd=cwd,
            timeout=timeout,
            capture_output=capture_output,
            text=True,
            encoding="utf-8",
            env=safe_env,
            check=False,
        )
    except subprocess.TimeoutExpired as e:
        duration = time.perf_counter() - start_time
        stdout_str = ""
        # TimeoutExpired always has a ``stdout`` attribute; it is None when
        # nothing was captured and may arrive as bytes rather than str.
        if e.stdout is not None:  # pragma: no branch
            if isinstance(e.stdout, str):
                stdout_str = e.stdout
            else:  # pragma: no cover
                stdout_str = e.stdout.decode("utf-8", errors="replace")
        return SafeCommandResult(
            command=validated_cmd,
            returncode=-1,
            stdout=stdout_str,
            stderr=f"Command timed out after {timeout}s",
            duration_seconds=duration,
        )

    duration = time.perf_counter() - start_time

    # stdout/stderr are None when output is not captured; normalise to "".
    return SafeCommandResult(
        command=validated_cmd,
        returncode=result.returncode,
        stdout=result.stdout or "",
        stderr=result.stderr or "",
        duration_seconds=duration,
    )

232 

233 

def run_safe_command(
    command: Sequence[str],
    *,
    cwd: Path | str | None = None,
    timeout: float = 300.0,
    capture_output: bool = True,
    check: bool = False,
    allowed_commands: Sequence[str] | None = None,
    env: dict[str, str] | None = None,
    dry_run: bool = False,
) -> SafeCommandResult:
    """Execute a command safely with security guards.

    This function provides a secure wrapper around subprocess.run
    with command injection protection, timeout handling, and
    optional command whitelisting.

    Args:
        command: Command and arguments as a sequence.
        cwd: Working directory for the command. Must be an existing
            directory.
        timeout: Maximum execution time in seconds.
        capture_output: Whether to capture stdout/stderr.
        check: Whether to raise on non-zero exit.
        allowed_commands: Whitelist of allowed commands; ``None`` uses
            ``DEFAULT_ALLOWED_COMMANDS``.
        env: Environment for the command. When given it is used instead
            of the current process environment; in both cases denied or
            sensitive variable names are filtered out first.
        dry_run: If True, validate the command but don't execute it; a
            successful placeholder result is returned and ``cwd`` is not
            checked.

    Returns:
        SafeCommandResult with execution details. A timeout is reported
        as a result with ``returncode == -1`` rather than as an exception.

    Raises:
        SecurityError: If command validation fails, or ``cwd`` does not
            exist or is not a directory.
        subprocess.CalledProcessError: If check=True and the command
            fails (this includes a timed-out command, whose return code
            is -1).

    Example:
        >>> result = run_safe_command(["poetry", "install"])
        >>> if result.success:
        ...     print("Installation complete!")

    """
    validated_cmd = _validate_and_resolve_command(command, allowed_commands)
    safe_env = _filter_environment(env)

    if dry_run:
        return SafeCommandResult(
            command=validated_cmd,
            returncode=0,
            stdout=f"[DRY-RUN] Would execute: {' '.join(validated_cmd)}",
            stderr="",
            duration_seconds=0.0,
        )

    resolved_cwd: Path | None = None
    if cwd is not None:
        resolved_cwd = Path(cwd).resolve()
        if not resolved_cwd.exists():
            raise SecurityError(
                f"Working directory does not exist: {cwd}",
                guard_name="safe_command",
            )
        # A path that exists but is not a directory would otherwise surface
        # as a raw NotADirectoryError from subprocess.run.
        if not resolved_cwd.is_dir():
            raise SecurityError(
                f"Working directory is not a directory: {cwd}",
                guard_name="safe_command",
            )

    safe_result = _execute_command(
        validated_cmd,
        resolved_cwd,
        timeout,
        capture_output,
        safe_env,
    )

    if check:
        safe_result.raise_on_error()

    return safe_result