Coverage for src / taipanstack / utils / subprocess.py: 100%
70 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 14:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 14:54 +0000
1"""
2Safe subprocess execution with security guards.
4Provides secure wrappers around subprocess execution with
5command validation, timeout handling, and retry logic.
6"""
8import os
9import shutil
10import subprocess # nosec B404
11import time
12from collections.abc import Sequence
13from dataclasses import dataclass
14from pathlib import Path
16from taipanstack.security.guards import (
17 _DEFAULT_DENIED_ENV_VARS,
18 _SENSITIVE_ENV_VAR_PATTERN,
19 SecurityError,
20 guard_command_injection,
21)
24@dataclass(frozen=True)
25class SafeCommandResult:
26 """Result of a safe command execution.
28 Attributes:
29 command: The executed command.
30 returncode: Exit code of the command.
31 stdout: Standard output.
32 stderr: Standard error.
33 success: Whether the command succeeded (returncode == 0).
34 duration_seconds: How long the command took.
36 """
38 command: list[str]
39 returncode: int
40 stdout: str = ""
41 stderr: str = ""
42 duration_seconds: float = 0.0
44 @property
45 def success(self) -> bool:
46 """Check if command succeeded."""
47 return self.returncode == 0
49 def raise_on_error(self) -> "SafeCommandResult":
50 """Raise an exception if command failed.
52 Returns:
53 Self if successful.
55 Raises:
56 subprocess.CalledProcessError: If command failed.
58 """
59 if not self.success:
60 raise subprocess.CalledProcessError(
61 self.returncode,
62 self.command,
63 self.stdout,
64 self.stderr,
65 )
66 return self
69# Default allowed commands whitelist
70DEFAULT_ALLOWED_COMMANDS: frozenset[str] = frozenset(
71 {
72 # Python/Poetry
73 "python",
74 "python3",
75 "pip",
76 "pip3",
77 "poetry",
78 "pipx",
79 # Version control
80 "git",
81 # Build tools
82 "make",
83 # Testing
84 "pytest",
85 "mypy",
86 "ruff",
87 "bandit",
88 "safety",
89 "semgrep",
90 "pre-commit",
91 # System
92 "echo",
93 "cat",
94 "ls",
95 "pwd",
96 "mkdir",
97 "rm",
98 "cp",
99 "mv",
100 "touch",
101 "chmod",
102 "which",
103 }
104)
107def _validate_and_resolve_command(
108 command: Sequence[str],
109 allowed_commands: Sequence[str] | None,
110) -> list[str]:
111 """Validate and resolve a command.
113 Args:
114 command: Command and arguments as a sequence.
115 allowed_commands: Whitelist of allowed commands.
117 Returns:
118 The validated command as a list.
120 Raises:
121 SecurityError: If validation fails.
123 """
124 cmd_list = list(command)
126 if not cmd_list:
127 raise SecurityError(
128 "Empty command is not allowed",
129 guard_name="safe_command",
130 )
132 if allowed_commands is not None:
133 whitelist = list(allowed_commands)
134 else:
135 whitelist = list(DEFAULT_ALLOWED_COMMANDS)
137 validated_cmd = guard_command_injection(cmd_list, allowed_commands=whitelist)
139 base_command = validated_cmd[0]
140 if not shutil.which(base_command):
141 raise SecurityError(
142 f"Command not found: {base_command}",
143 guard_name="safe_command",
144 value=base_command,
145 )
147 return validated_cmd
150def _filter_environment(env: dict[str, str] | None) -> dict[str, str]:
151 """Filter environment variables for safe execution.
153 Args:
154 env: Environment variables to filter.
156 Returns:
157 Filtered environment variables.
159 """
160 safe_env: dict[str, str] = {}
161 env_to_filter = env if env is not None else dict(os.environ)
163 for env_key, env_val in env_to_filter.items():
164 name_upper = env_key.upper()
165 if (
166 name_upper not in _DEFAULT_DENIED_ENV_VARS
167 and not _SENSITIVE_ENV_VAR_PATTERN.match(name_upper)
168 ):
169 safe_env[env_key] = str(env_val)
171 return safe_env
174def _execute_command(
175 validated_cmd: list[str],
176 cwd: Path | None,
177 timeout: float,
178 capture_output: bool,
179 safe_env: dict[str, str],
180) -> SafeCommandResult:
181 """Execute the command and handle TimeoutExpired.
183 Args:
184 validated_cmd: Validated command list.
185 cwd: Resolved working directory.
186 timeout: Execution timeout.
187 capture_output: Capture stdout/stderr.
188 safe_env: Filtered environment variables.
190 Returns:
191 The execution result.
193 """
194 start_time = time.time()
196 try:
197 result = subprocess.run( # nosec B603
198 validated_cmd,
199 cwd=cwd,
200 timeout=timeout,
201 capture_output=capture_output,
202 text=True,
203 encoding="utf-8",
204 env=safe_env,
205 check=False,
206 )
207 except subprocess.TimeoutExpired as e:
208 duration = time.time() - start_time
209 stdout_str = ""
210 if hasattr(e, "stdout") and e.stdout is not None: # pragma: no branch
211 if isinstance(e.stdout, str):
212 stdout_str = e.stdout
213 else: # pragma: no cover
214 stdout_str = e.stdout.decode("utf-8", errors="replace")
215 return SafeCommandResult(
216 command=validated_cmd,
217 returncode=-1,
218 stdout=stdout_str,
219 stderr=f"Command timed out after {timeout}s",
220 duration_seconds=duration,
221 )
223 duration = time.time() - start_time
225 return SafeCommandResult(
226 command=validated_cmd,
227 returncode=result.returncode,
228 stdout=result.stdout or "",
229 stderr=result.stderr or "",
230 duration_seconds=duration,
231 )
234def run_safe_command(
235 command: Sequence[str],
236 *,
237 cwd: Path | str | None = None,
238 timeout: float = 300.0,
239 capture_output: bool = True,
240 check: bool = False,
241 allowed_commands: Sequence[str] | None = None,
242 env: dict[str, str] | None = None,
243 dry_run: bool = False,
244) -> SafeCommandResult:
245 """Execute a command safely with security guards.
247 This function provides a secure wrapper around subprocess.run
248 with command injection protection, timeout handling, and
249 optional command whitelisting.
251 Args:
252 command: Command and arguments as a sequence.
253 cwd: Working directory for the command.
254 timeout: Maximum execution time in seconds.
255 capture_output: Whether to capture stdout/stderr.
256 check: Whether to raise on non-zero exit.
257 allowed_commands: Whitelist of allowed commands.
258 env: Environment variables to set.
259 dry_run: If True, don't actually execute the command.
261 Returns:
262 SafeCommandResult with execution details.
264 Raises:
265 SecurityError: If command validation fails.
266 subprocess.TimeoutExpired: If command times out.
267 subprocess.CalledProcessError: If check=True and command fails.
269 Example:
270 >>> result = run_safe_command(["poetry", "install"])
271 >>> if result.success:
272 ... print("Installation complete!")
274 """
275 validated_cmd = _validate_and_resolve_command(command, allowed_commands)
276 safe_env = _filter_environment(env)
278 if dry_run:
279 return SafeCommandResult(
280 command=validated_cmd,
281 returncode=0,
282 stdout=f"[DRY-RUN] Would execute: {' '.join(validated_cmd)}",
283 stderr="",
284 duration_seconds=0.0,
285 )
287 resolved_cwd: Path | None = None
288 if cwd is not None:
289 resolved_cwd = Path(cwd).resolve()
290 if not resolved_cwd.exists():
291 raise SecurityError(
292 f"Working directory does not exist: {cwd}",
293 guard_name="safe_command",
294 )
296 safe_result = _execute_command(
297 validated_cmd,
298 resolved_cwd,
299 timeout,
300 capture_output,
301 safe_env,
302 )
304 if check:
305 safe_result.raise_on_error()
307 return safe_result