Coverage for src/taipanstack/bridges/http_bridge.py: 100%

150 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2HTTP Bridge — safe httpx client with SSRF protection and resilience. 

3 

4Wraps ``httpx.AsyncClient`` with TaipanStack's ``guard_ssrf``, 

5retry, and circuit breaker integrations. All outbound URLs are 

6validated against SSRF before the request is sent. 

7""" 

8 

9from __future__ import annotations 

10 

11import asyncio 

12import logging 

13from collections.abc import Awaitable, Callable, Mapping, Sequence 

14from typing import TYPE_CHECKING, cast 

15 

16from typing_extensions import TypedDict, Unpack 

17 

18from taipanstack.core.result import Err, Ok, Result 

19from taipanstack.resilience.circuit_breaker import ( 

20 CircuitBreaker, 

21 CircuitBreakerError, 

22) 

23from taipanstack.resilience.retry import RetryConfig, calculate_delay 

24from taipanstack.security.guards import guard_ssrf 

25 

# Module logger for the HTTP bridge. The dotted name is spelled out explicitly
# (rather than derived from ``__name__``) so it always sits under the
# "taipanstack" logger hierarchy.
logger: logging.Logger = logging.getLogger(name="taipanstack.bridges.http")

27 

28 

class HttpRequestKwargs(TypedDict, total=False):
    """Optional keyword arguments forwarded to ``httpx.AsyncClient.request``.

    Every key is optional (``total=False``); only the keys a caller actually
    passes are forwarded to httpx.
    """

    # Raw request body (bytes or text).
    content: bytes | str | None
    # Form fields. NOTE: httpx deprecates passing raw ``bytes``/``str`` here
    # (it warns and asks for ``content=`` instead); those members are kept
    # only for backward compatibility.
    data: (
        dict[str, str | int | float | bool | None]
        | list[tuple[str, str]]
        | bytes
        | str
        | None
    )
    # Multipart uploads per field: raw bytes, ``(filename, content)``, or
    # ``(filename, content, content_type)`` to set the part's content type.
    files: dict[str, bytes | tuple[str, bytes] | tuple[str, bytes, str]]
    # JSON-serialisable request body.
    json: dict[str, object] | list[object] | str | int | float | bool | None
    # Query-string parameters; sequence values produce repeated keys.
    params: (
        dict[
            str,
            str | int | float | bool | None | Sequence[str | int | float | bool | None],
        ]
        | list[tuple[str, str | int | float | bool | None]]
        | str
        | bytes
        | None
    )
    headers: dict[str, str]
    cookies: dict[str, str]
    # Basic-auth ``(username, password)`` pair, or any ``httpx.Auth``
    # implementation (e.g. ``httpx.DigestAuth``).
    auth: tuple[str, str] | httpx.Auth
    # Per-request override of the client's redirect policy.
    follow_redirects: bool
    extensions: dict[str, object]

57 

58 

class HttpClientKwargs(TypedDict, total=False):
    """Optional keyword arguments forwarded to ``httpx.AsyncClient``.

    All keys are optional (``total=False``). ``timeout`` is not listed here:
    ``SafeHttpClient`` accepts it as a dedicated keyword argument and hands it
    to ``httpx.AsyncClient`` itself.
    """

    # Defaults applied to every request made through the client.
    base_url: str
    headers: dict[str, str]
    cookies: dict[str, str]
    default_encoding: str | Callable[[bytes], str]

    # TLS verification and client certificates.
    verify: bool | str
    cert: str | tuple[str, str] | tuple[str, str, str]

    # HTTP protocol versions to enable.
    http1: bool
    http2: bool

    # Routing and environment-derived configuration.
    proxy: str
    mounts: Mapping[str, httpx.AsyncBaseTransport | None]
    trust_env: bool

    # Redirect policy.
    follow_redirects: bool
    max_redirects: int

    # Lifecycle callbacks keyed by event name ("request" / "response").
    event_hooks: dict[str, list[Callable[..., object]]]

76 

77 

78# --- optional httpx import ------------------------------------------------ 

79 

80try: 

81 import httpx 

82 

83 _HAS_HTTPX = True 

84except ImportError: 

85 _HAS_HTTPX = False 

86 

87if TYPE_CHECKING: 

88 import httpx 

89 

90# Default status codes that trigger a retry 

91_RETRYABLE_STATUS_CODES: frozenset[int] = frozenset({500, 502, 503, 504, 429}) 

92 

93 

94def _check_circuit_breaker( 

95 circuit_breaker: CircuitBreaker, 

96) -> CircuitBreakerError | None: 

97 """Check circuit breaker state and return error if OPEN. 

98 

99 Args: 

100 circuit_breaker: The circuit breaker to check. 

101 

102 Returns: 

103 ``CircuitBreakerError`` if open, ``None`` otherwise. 

104 

105 """ 

106 if not circuit_breaker._should_attempt(): 

107 return CircuitBreakerError( 

108 f"Circuit '{circuit_breaker.name}' is open", 

109 state=circuit_breaker.state, 

110 ) 

111 return None 

112 

113 

114def _check_ssrf(url: str, ssrf_protection: bool) -> Result[None, Exception]: 

115 """Validate URL against SSRF if protection is enabled. 

116 

117 Args: 

118 url: The URL to validate. 

119 ssrf_protection: Whether SSRF protection is enabled. 

120 

121 Returns: 

122 ``Ok(None)`` if valid or protection disabled, ``Err(Exception)`` otherwise. 

123 

124 """ 

125 if not ssrf_protection: 

126 return Ok(None) 

127 

128 ssrf_result = guard_ssrf(url) 

129 match ssrf_result: 

130 case Err(security_err): 

131 return Err(security_err) 

132 case Ok(): # pragma: no branch 

133 return Ok(None) 

134 

135 

136def _should_retry_status( 

137 response: httpx.Response, 

138 retry_config: RetryConfig | None, 

139 retryable_status_codes: frozenset[int], 

140 attempt: int, 

141 max_attempts: int, 

142) -> bool: 

143 """Determine if a request should be retried based on status code.""" 

144 return ( 

145 retry_config is not None 

146 and response.status_code in retryable_status_codes 

147 and attempt < max_attempts 

148 ) 

149 

150 

151async def _handle_http_exception( 

152 exc: Exception, 

153 attempt: int, 

154 max_attempts: int, 

155 retry_config: RetryConfig | None, 

156 circuit_breaker: CircuitBreaker | None, 

157) -> bool: 

158 """Handle exception and return True if we should retry, False otherwise.""" 

159 if circuit_breaker is not None: # pragma: no branch 

160 circuit_breaker._record_failure(exc) 

161 if retry_config is not None and attempt < max_attempts: 

162 delay = calculate_delay(attempt, retry_config) 

163 await asyncio.sleep(min(delay, 3600.0)) 

164 return True 

165 return False 

166 

167 

async def _execute_single_attempt(
    request_func: Callable[[], Awaitable[httpx.Response]],
    retry_config: RetryConfig | None,
    circuit_breaker: CircuitBreaker | None,
    retryable_status_codes: frozenset[int],
    attempt: int,
    max_attempts: int,
) -> Result[httpx.Response, Exception] | Exception | bool:
    """Run one HTTP attempt and classify its outcome.

    Args:
        request_func: Zero-argument coroutine function that sends the request.
        retry_config: Retry settings; ``None`` disables retries.
        circuit_breaker: Breaker to notify of the outcome, if any.
        retryable_status_codes: Statuses that trigger a retry.
        attempt: 1-based number of this attempt.
        max_attempts: Total attempts allowed.

    Returns:
        * ``Ok(response)``: final answer. Any status qualifies, including a
          retryable one once attempts are exhausted.
        * ``Err(exc)``: the request raised and no retries remain.
        * the raised ``Exception``: the request raised, the back-off has
          already been slept, and the caller should try again.
        * ``True``: a retryable status was received, the back-off has already
          been slept, and the caller should try again.

    """
    # Only the request itself belongs in the ``try``. A failure in the retry
    # bookkeeping below is a programming error, not a transport failure, so it
    # must not be recorded on the breaker or retried.
    try:
        response = await request_func()
    except Exception as exc:
        should_retry = await _handle_http_exception(
            exc, attempt, max_attempts, retry_config, circuit_breaker
        )
        return exc if should_retry else Err(exc)

    if _should_retry_status(
        response,
        retry_config,
        retryable_status_codes,
        attempt,
        max_attempts,
    ):
        # _should_retry_status only returns True when retry_config is set.
        delay = calculate_delay(attempt, cast(RetryConfig, retry_config))
        await asyncio.sleep(min(delay, 3600.0))
        return True

    # Report successful round-trips to the breaker. Previously only failures
    # were reported, so (depending on the breaker implementation) it could
    # never close from half-open or reset its failure count. A retryable
    # status returned after exhausting attempts is treated as neutral, not as
    # a success.
    # NOTE(review): assumes CircuitBreaker exposes ``_record_success()`` as the
    # counterpart of ``_record_failure()``; confirm the method name.
    if (
        circuit_breaker is not None
        and response.status_code not in retryable_status_codes
    ):
        circuit_breaker._record_success()
    return Ok(response)

202 

203 

204async def _execute_with_retries( 

205 request_func: Callable[[], Awaitable[httpx.Response]], 

206 retry_config: RetryConfig | None, 

207 circuit_breaker: CircuitBreaker | None, 

208 retryable_status_codes: frozenset[int], 

209) -> Result[httpx.Response, Exception]: 

210 """Execute a request function with optional retries and circuit breaker. 

211 

212 Args: 

213 request_func: Async callable that performs the request. 

214 retry_config: Optional retry configuration. 

215 circuit_breaker: Optional circuit breaker. 

216 retryable_status_codes: Status codes to retry on. 

217 

218 Returns: 

219 ``Ok(Response)`` on success, ``Err`` on failure. 

220 

221 """ 

222 max_attempts = retry_config.max_attempts if retry_config is not None else 1 

223 last_error: Exception | None = None 

224 

225 for attempt in range(1, max_attempts + 1): 

226 outcome = await _execute_single_attempt( 

227 request_func, 

228 retry_config, 

229 circuit_breaker, 

230 retryable_status_codes, 

231 attempt, 

232 max_attempts, 

233 ) 

234 if isinstance(outcome, (Ok, Err)): 

235 return outcome 

236 if isinstance(outcome, Exception): 

237 last_error = outcome 

238 

239 return Err(last_error or RuntimeError("Request failed")) 

240 

241 

242async def safe_request( 

243 method: str, 

244 url: str, 

245 *, 

246 ssrf_protection: bool = True, 

247 retry_config: RetryConfig | None = None, 

248 circuit_breaker: CircuitBreaker | None = None, 

249 retryable_status_codes: frozenset[int] = _RETRYABLE_STATUS_CODES, 

250 timeout: float | None = 10.0, 

251 **kwargs: Unpack[HttpRequestKwargs], 

252) -> Result[httpx.Response, Exception]: 

253 """Perform a one-shot HTTP request with safety features. 

254 

255 Args: 

256 method: HTTP method (GET, POST, etc.). 

257 url: Target URL. 

258 ssrf_protection: Validate URL against SSRF. 

259 retry_config: Optional retry configuration. 

260 circuit_breaker: Optional circuit breaker. 

261 retryable_status_codes: Status codes that trigger retries. 

262 timeout: Explicit timeout in seconds (default: 10.0). 

263 **kwargs: Passed to ``httpx.AsyncClient.request``. 

264 

265 Returns: 

266 ``Ok(Response)`` on success, ``Err`` on failure. 

267 

268 """ 

269 if not _HAS_HTTPX: 

270 return Err( 

271 ImportError( 

272 "httpx is required for HTTP bridge. " 

273 "Install with: pip install taipanstack[bridges-http]" 

274 ) 

275 ) 

276 

277 # SSRF check 

278 ssrf_check = _check_ssrf(url, ssrf_protection) 

279 if isinstance(ssrf_check, Err): 

280 return ssrf_check 

281 

282 # Circuit breaker gate 

283 if circuit_breaker is not None: 

284 cb_err = _check_circuit_breaker(circuit_breaker) 

285 if cb_err is not None: 

286 return Err(cb_err) 

287 

288 async def _do_request() -> httpx.Response: 

289 async with httpx.AsyncClient(timeout=timeout) as client: # nosemgrep 

290 request_func = cast( 

291 Callable[..., Awaitable[httpx.Response]], client.request 

292 ) 

293 response = await request_func(method, url, **kwargs) 

294 return response 

295 

296 return await _execute_with_retries( 

297 _do_request, 

298 retry_config, 

299 circuit_breaker, 

300 retryable_status_codes, 

301 ) 

302 

303 

304class SafeHttpClient: 

305 """Async context manager wrapping httpx with TaipanStack safety. 

306 

307 Args: 

308 ssrf_protection: Enable SSRF validation on all requests. 

309 retry_config: Retry configuration for transient failures. 

310 circuit_breaker: Optional circuit breaker for all requests. 

311 retryable_status_codes: HTTP status codes to retry on. 

312 **client_kwargs: Passed to ``httpx.AsyncClient``. 

313 

314 Example: 

315 >>> async with SafeHttpClient() as client: 

316 ... result = await client.get("https://api.example.com/data") 

317 ... match result: 

318 ... case Ok(response): print(response.json()) 

319 ... case Err(e): print(f"Error: {e}") 

320 

321 """ 

322 

323 def __init__( 

324 self, 

325 *, 

326 ssrf_protection: bool = True, 

327 retry_config: RetryConfig | None = None, 

328 circuit_breaker: CircuitBreaker | None = None, 

329 retryable_status_codes: frozenset[int] = _RETRYABLE_STATUS_CODES, 

330 timeout: float = 10.0, 

331 **client_kwargs: Unpack[HttpClientKwargs], 

332 ) -> None: 

333 """Initialize the safe HTTP client. 

334 

335 Args: 

336 ssrf_protection: Enable SSRF validation. 

337 retry_config: Retry configuration. 

338 circuit_breaker: Circuit breaker instance. 

339 retryable_status_codes: Status codes to retry. 

340 **client_kwargs: Keyword args for httpx.AsyncClient. 

341 Default timeout is 10.0 seconds if not provided. 

342 

343 """ 

344 self._ssrf_protection = ssrf_protection 

345 self._retry_config = retry_config 

346 self._circuit_breaker = circuit_breaker 

347 self._retryable_status_codes = retryable_status_codes 

348 self._client_kwargs = client_kwargs 

349 self._timeout = timeout 

350 self._client: httpx.AsyncClient | None = None 

351 

352 async def __aenter__(self) -> SafeHttpClient: 

353 """Enter the async context manager.""" 

354 if not _HAS_HTTPX: 

355 msg = ( 

356 "httpx is required for SafeHttpClient. " 

357 "Install with: pip install taipanstack[bridges-http]" 

358 ) 

359 raise ImportError(msg) 

360 self._client = httpx.AsyncClient( 

361 timeout=self._timeout, 

362 **self._client_kwargs, 

363 ) # nosemgrep 

364 return self 

365 

366 async def __aexit__( 

367 self, 

368 _exc_type: type[BaseException] | None, 

369 _exc_val: BaseException | None, 

370 _exc_tb: object, 

371 ) -> None: 

372 """Exit the async context manager.""" 

373 if self._client is not None: 

374 await self._client.aclose() 

375 self._client = None 

376 

377 async def request( 

378 self, 

379 method: str, 

380 url: str, 

381 **kwargs: Unpack[HttpRequestKwargs], 

382 ) -> Result[httpx.Response, Exception]: 

383 """Send an HTTP request with safety features. 

384 

385 Args: 

386 method: HTTP method. 

387 url: Target URL. 

388 **kwargs: Passed to the underlying client. 

389 

390 Returns: 

391 ``Ok(Response)`` on success, ``Err`` on failure. 

392 

393 """ 

394 if self._client is None: 

395 return Err(RuntimeError("Client not initialised. Use 'async with'.")) 

396 

397 # SSRF check 

398 ssrf_check = _check_ssrf(url, self._ssrf_protection) 

399 if isinstance(ssrf_check, Err): 

400 return ssrf_check 

401 

402 # Circuit breaker gate 

403 if self._circuit_breaker is not None: 

404 cb_err = _check_circuit_breaker(self._circuit_breaker) 

405 if cb_err is not None: 

406 return Err(cb_err) 

407 

408 async def _do_request() -> httpx.Response: 

409 # We explicitly verified client is not None above 

410 client: httpx.AsyncClient = cast(httpx.AsyncClient, self._client) 

411 request_func = cast( 

412 Callable[..., Awaitable[httpx.Response]], client.request 

413 ) 

414 response = await request_func(method, url, **kwargs) 

415 return response 

416 

417 return await _execute_with_retries( 

418 _do_request, 

419 self._retry_config, 

420 self._circuit_breaker, 

421 self._retryable_status_codes, 

422 ) 

423 

424 async def get( 

425 self, url: str, **kw: Unpack[HttpRequestKwargs] 

426 ) -> Result[httpx.Response, Exception]: 

427 """Send a GET request.""" 

428 return await self.request("GET", url, **kw) 

429 

430 async def post( 

431 self, url: str, **kw: Unpack[HttpRequestKwargs] 

432 ) -> Result[httpx.Response, Exception]: 

433 """Send a POST request.""" 

434 return await self.request("POST", url, **kw) 

435 

436 async def put( 

437 self, url: str, **kw: Unpack[HttpRequestKwargs] 

438 ) -> Result[httpx.Response, Exception]: 

439 """Send a PUT request.""" 

440 return await self.request("PUT", url, **kw) 

441 

442 async def delete( 

443 self, url: str, **kw: Unpack[HttpRequestKwargs] 

444 ) -> Result[httpx.Response, Exception]: 

445 """Send a DELETE request.""" 

446 return await self.request("DELETE", url, **kw) 

447 

448 async def patch( 

449 self, url: str, **kw: Unpack[HttpRequestKwargs] 

450 ) -> Result[httpx.Response, Exception]: 

451 """Send a PATCH request.""" 

452 return await self.request("PATCH", url, **kw)