Coverage for src / taipanstack / bridges / http_bridge.py: 100%
150 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2HTTP Bridge — safe httpx client with SSRF protection and resilience.
4Wraps ``httpx.AsyncClient`` with TaipanStack's ``guard_ssrf``,
5retry, and circuit breaker integrations. All outbound URLs are
6validated against SSRF before the request is sent.
7"""
9from __future__ import annotations
11import asyncio
12import logging
13from collections.abc import Awaitable, Callable, Mapping, Sequence
14from typing import TYPE_CHECKING, cast
16from typing_extensions import TypedDict, Unpack
18from taipanstack.core.result import Err, Ok, Result
19from taipanstack.resilience.circuit_breaker import (
20 CircuitBreaker,
21 CircuitBreakerError,
22)
23from taipanstack.resilience.retry import RetryConfig, calculate_delay
24from taipanstack.security.guards import guard_ssrf
26logger = logging.getLogger("taipanstack.bridges.http")
class HttpRequestKwargs(TypedDict, total=False):
    """Optional per-request keyword arguments accepted by the HTTP bridge.

    Every key may be omitted (``total=False``). The bridge forwards these
    values unchanged to ``httpx.AsyncClient.request``, so key names and
    value shapes follow that method's keyword arguments.

    """

    # --- body ---------------------------------------------------------------
    content: bytes | str | None
    data: (
        dict[str, str | int | float | bool | None]
        | list[tuple[str, str]]
        | bytes
        | str
        | None
    )
    files: dict[str, bytes | tuple[str, bytes]]
    json: dict[str, object] | list[object] | str | int | float | bool | None

    # --- query parameters ---------------------------------------------------
    params: (
        dict[
            str,
            str
            | int
            | float
            | bool
            | None
            | Sequence[str | int | float | bool | None],
        ]
        | list[tuple[str, str | int | float | bool | None]]
        | str
        | bytes
        | None
    )

    # --- request metadata and behaviour -------------------------------------
    headers: dict[str, str]
    cookies: dict[str, str]
    auth: tuple[str, str]
    follow_redirects: bool
    extensions: dict[str, object]
class HttpClientKwargs(TypedDict, total=False):
    """Optional keyword arguments used when constructing the httpx client.

    Every key may be omitted (``total=False``). ``SafeHttpClient`` stores
    these values and passes them unchanged to ``httpx.AsyncClient`` when the
    context manager is entered.

    """

    # --- defaults applied to requests ---------------------------------------
    base_url: str
    headers: dict[str, str]
    cookies: dict[str, str]

    # --- TLS ----------------------------------------------------------------
    verify: bool | str
    cert: str | tuple[str, str] | tuple[str, str, str]

    # --- transport ----------------------------------------------------------
    http1: bool
    http2: bool
    proxy: str
    mounts: Mapping[str, httpx.AsyncBaseTransport | None]

    # --- redirects ----------------------------------------------------------
    follow_redirects: bool
    max_redirects: int

    # --- miscellaneous ------------------------------------------------------
    event_hooks: dict[str, list[Callable[..., object]]]
    trust_env: bool
    default_encoding: str | Callable[[bytes], str]
# --- optional httpx import ------------------------------------------------
# httpx is an optional dependency. Record whether it could be imported so the
# public entry points can report a helpful error when it is missing.
try:
    import httpx
except ImportError:
    _HAS_HTTPX = False
else:
    _HAS_HTTPX = True

if TYPE_CHECKING:
    # Keeps the ``httpx`` name resolvable for static type checkers.
    import httpx

# Default status codes that trigger a retry
_RETRYABLE_STATUS_CODES: frozenset[int] = frozenset((429, 500, 502, 503, 504))
def _check_circuit_breaker(
    circuit_breaker: CircuitBreaker,
) -> CircuitBreakerError | None:
    """Report whether the breaker currently refuses new attempts.

    The error is *returned*, not raised, so callers can wrap it in ``Err``.

    Args:
        circuit_breaker: Breaker consulted before a request is sent.

    Returns:
        ``None`` when the breaker allows an attempt, otherwise a
        ``CircuitBreakerError`` carrying the breaker's name and state.

    """
    if circuit_breaker._should_attempt():
        return None

    message = f"Circuit '{circuit_breaker.name}' is open"
    return CircuitBreakerError(message, state=circuit_breaker.state)
114def _check_ssrf(url: str, ssrf_protection: bool) -> Result[None, Exception]:
115 """Validate URL against SSRF if protection is enabled.
117 Args:
118 url: The URL to validate.
119 ssrf_protection: Whether SSRF protection is enabled.
121 Returns:
122 ``Ok(None)`` if valid or protection disabled, ``Err(Exception)`` otherwise.
124 """
125 if not ssrf_protection:
126 return Ok(None)
128 ssrf_result = guard_ssrf(url)
129 match ssrf_result:
130 case Err(security_err):
131 return Err(security_err)
132 case Ok(): # pragma: no branch
133 return Ok(None)
def _should_retry_status(
    response: httpx.Response,
    retry_config: RetryConfig | None,
    retryable_status_codes: frozenset[int],
    attempt: int,
    max_attempts: int,
) -> bool:
    """Decide whether a response's status code warrants another attempt.

    Args:
        response: The response just received.
        retry_config: Retry configuration; ``None`` disables retries.
        retryable_status_codes: Status codes considered retryable.
        attempt: Number of the attempt that produced ``response``.
        max_attempts: Total number of attempts allowed.

    Returns:
        ``True`` only when retries are configured, the status code is
        retryable, and at least one attempt remains.

    """
    if retry_config is None:
        return False
    if response.status_code not in retryable_status_codes:
        return False
    return attempt < max_attempts
async def _handle_http_exception(
    exc: Exception,
    attempt: int,
    max_attempts: int,
    retry_config: RetryConfig | None,
    circuit_breaker: CircuitBreaker | None,
) -> bool:
    """Record a failed attempt and, if retries remain, wait out the backoff.

    Args:
        exc: The exception raised by the attempt.
        attempt: Number of the attempt that failed.
        max_attempts: Total number of attempts allowed.
        retry_config: Retry configuration; ``None`` disables retries.
        circuit_breaker: Breaker that is told about the failure, if any.

    Returns:
        ``True`` after sleeping when the caller should retry, ``False`` when
        no further attempt should be made.

    """
    # The failure is reported to the breaker whether or not a retry follows.
    if circuit_breaker is not None:  # pragma: no branch
        circuit_breaker._record_failure(exc)

    if retry_config is None or attempt >= max_attempts:
        return False

    backoff = calculate_delay(attempt, retry_config)
    # Cap a single wait at one hour regardless of the configured backoff.
    await asyncio.sleep(min(backoff, 3600.0))
    return True
async def _execute_single_attempt(
    request_func: Callable[[], Awaitable[httpx.Response]],
    retry_config: RetryConfig | None,
    circuit_breaker: CircuitBreaker | None,
    retryable_status_codes: frozenset[int],
    attempt: int,
    max_attempts: int,
) -> Result[httpx.Response, Exception] | Exception | bool:
    """Run one HTTP attempt and classify what happened.

    Args:
        request_func: Zero-argument coroutine function performing the request.
        retry_config: Retry configuration; ``None`` disables retries.
        circuit_breaker: Breaker informed about raised exceptions, if any.
        retryable_status_codes: Status codes that trigger a retry.
        attempt: Number of this attempt.
        max_attempts: Total number of attempts allowed.

    Returns:
        * ``Ok(response)`` - final response (no retry warranted).
        * ``Err(exc)`` - the attempt raised and no retry remains.
        * the raised ``Exception`` itself - the attempt raised, the backoff
          has already been slept, and the caller should retry.
        * ``True`` - the status code was retryable, the backoff has already
          been slept, and the caller should retry.

    NOTE(review): a successful response is not reported to the circuit
    breaker in this function; confirm that this is intended.

    """
    try:
        response = await request_func()
        wants_retry = _should_retry_status(
            response,
            retry_config,
            retryable_status_codes,
            attempt,
            max_attempts,
        )
        if not wants_retry:
            return Ok(response)

        # _should_retry_status only says yes when a retry config is present.
        backoff = calculate_delay(attempt, cast(RetryConfig, retry_config))
        await asyncio.sleep(min(backoff, 3600.0))
        return True
    except Exception as exc:
        retry_after_error = await _handle_http_exception(
            exc, attempt, max_attempts, retry_config, circuit_breaker
        )
        if retry_after_error:
            return exc
        return Err(exc)
async def _execute_with_retries(
    request_func: Callable[[], Awaitable[httpx.Response]],
    retry_config: RetryConfig | None,
    circuit_breaker: CircuitBreaker | None,
    retryable_status_codes: frozenset[int],
) -> Result[httpx.Response, Exception]:
    """Drive ``request_func`` through the configured number of attempts.

    Args:
        request_func: Async callable that performs the request.
        retry_config: Optional retry configuration; without it exactly one
            attempt is made.
        circuit_breaker: Optional circuit breaker.
        retryable_status_codes: Status codes to retry on.

    Returns:
        ``Ok(Response)`` on success, ``Err`` on failure.

    """
    if retry_config is None:
        max_attempts = 1
    else:
        max_attempts = retry_config.max_attempts

    last_error: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        outcome = await _execute_single_attempt(
            request_func=request_func,
            retry_config=retry_config,
            circuit_breaker=circuit_breaker,
            retryable_status_codes=retryable_status_codes,
            attempt=attempt,
            max_attempts=max_attempts,
        )
        # A Result is final; anything else means "go around again".
        if isinstance(outcome, (Ok, Err)):
            return outcome
        if isinstance(outcome, Exception):
            last_error = outcome

    # Reached only when the loop ended without producing a Result.
    return Err(last_error or RuntimeError("Request failed"))
async def safe_request(
    method: str,
    url: str,
    *,
    ssrf_protection: bool = True,
    retry_config: RetryConfig | None = None,
    circuit_breaker: CircuitBreaker | None = None,
    retryable_status_codes: frozenset[int] = _RETRYABLE_STATUS_CODES,
    timeout: float | None = 10.0,
    **kwargs: Unpack[HttpRequestKwargs],
) -> Result[httpx.Response, Exception]:
    """Send a single HTTP request through the bridge's safety layers.

    The URL is SSRF-checked, the circuit breaker (if any) is consulted, and
    the request is then executed with the configured retry policy. A fresh
    ``httpx.AsyncClient`` is opened and closed for every attempt.

    NOTE(review): only the ``url`` argument is passed to the SSRF guard; if
    ``follow_redirects=True`` is supplied, redirect targets are not checked
    by this function - confirm whether that is acceptable.

    Args:
        method: HTTP method (GET, POST, etc.).
        url: Target URL.
        ssrf_protection: Validate URL against SSRF.
        retry_config: Optional retry configuration.
        circuit_breaker: Optional circuit breaker.
        retryable_status_codes: Status codes that trigger retries.
        timeout: Explicit timeout in seconds (default: 10.0).
        **kwargs: Passed to ``httpx.AsyncClient.request``.

    Returns:
        ``Ok(Response)`` on success, ``Err`` on failure (missing httpx,
        rejected URL, open circuit, or a request error).

    """
    if not _HAS_HTTPX:
        missing_dependency = ImportError(
            "httpx is required for HTTP bridge. "
            "Install with: pip install taipanstack[bridges-http]"
        )
        return Err(missing_dependency)

    # Reject unsafe targets before anything touches the network.
    url_verdict = _check_ssrf(url, ssrf_protection)
    if isinstance(url_verdict, Err):
        return url_verdict

    # Refuse early while the breaker is open.
    if circuit_breaker is not None:
        breaker_error = _check_circuit_breaker(circuit_breaker)
        if breaker_error is not None:
            return Err(breaker_error)

    async def _send_once() -> httpx.Response:
        async with httpx.AsyncClient(timeout=timeout) as client:  # nosemgrep
            send = cast(Callable[..., Awaitable[httpx.Response]], client.request)
            return await send(method, url, **kwargs)

    return await _execute_with_retries(
        _send_once,
        retry_config,
        circuit_breaker,
        retryable_status_codes,
    )
class SafeHttpClient:
    """Async context manager wrapping httpx with TaipanStack safety.

    Each request is validated against SSRF (when enabled), gated by the
    optional circuit breaker, and executed through the shared retry loop.
    Request failures are returned as ``Err`` values rather than raised.

    Args:
        ssrf_protection: Enable SSRF validation on all requests.
        retry_config: Retry configuration for transient failures.
        circuit_breaker: Optional circuit breaker for all requests.
        retryable_status_codes: HTTP status codes to retry on.
        timeout: Timeout in seconds handed to ``httpx.AsyncClient``
            (default: 10.0). ``None`` is forwarded unchanged, matching
            ``safe_request``.
        **client_kwargs: Passed to ``httpx.AsyncClient``.

    Example:
        >>> async with SafeHttpClient() as client:
        ...     result = await client.get("https://api.example.com/data")
        ...     match result:
        ...         case Ok(response): print(response.json())
        ...         case Err(e): print(f"Error: {e}")

    """

    def __init__(
        self,
        *,
        ssrf_protection: bool = True,
        retry_config: RetryConfig | None = None,
        circuit_breaker: CircuitBreaker | None = None,
        retryable_status_codes: frozenset[int] = _RETRYABLE_STATUS_CODES,
        timeout: float | None = 10.0,
        **client_kwargs: Unpack[HttpClientKwargs],
    ) -> None:
        """Initialize the safe HTTP client.

        No httpx client is created here; that happens in ``__aenter__``.

        Args:
            ssrf_protection: Enable SSRF validation.
            retry_config: Retry configuration.
            circuit_breaker: Circuit breaker instance.
            retryable_status_codes: Status codes to retry.
            timeout: Timeout in seconds for the underlying client
                (default: 10.0); ``None`` is forwarded unchanged.
            **client_kwargs: Keyword args for httpx.AsyncClient.

        """
        self._ssrf_protection = ssrf_protection
        self._retry_config = retry_config
        self._circuit_breaker = circuit_breaker
        self._retryable_status_codes = retryable_status_codes
        self._client_kwargs = client_kwargs
        self._timeout = timeout
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> SafeHttpClient:
        """Create the underlying ``httpx.AsyncClient`` and return ``self``.

        Raises:
            ImportError: If httpx is not installed.

        """
        if not _HAS_HTTPX:
            msg = (
                "httpx is required for SafeHttpClient. "
                "Install with: pip install taipanstack[bridges-http]"
            )
            raise ImportError(msg)
        self._client = httpx.AsyncClient(
            timeout=self._timeout,
            **self._client_kwargs,
        )  # nosemgrep
        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: object,
    ) -> None:
        """Close the underlying client, if any, and forget it."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    async def request(
        self,
        method: str,
        url: str,
        **kwargs: Unpack[HttpRequestKwargs],
    ) -> Result[httpx.Response, Exception]:
        """Send an HTTP request with safety features.

        NOTE(review): the SSRF guard receives ``url`` exactly as passed. If
        the client was built with ``base_url`` and ``url`` is relative, the
        guard sees only the relative part - confirm ``guard_ssrf`` handles
        that case as intended.

        Args:
            method: HTTP method.
            url: Target URL.
            **kwargs: Passed to the underlying client.

        Returns:
            ``Ok(Response)`` on success, ``Err`` on failure (client not
            entered, rejected URL, open circuit, or a request error).

        """
        # Bind the client once so the retry closure uses the instance that
        # was verified here instead of re-reading ``self._client``.
        client = self._client
        if client is None:
            return Err(RuntimeError("Client not initialised. Use 'async with'."))

        # SSRF check
        ssrf_check = _check_ssrf(url, self._ssrf_protection)
        if isinstance(ssrf_check, Err):
            return ssrf_check

        # Circuit breaker gate
        if self._circuit_breaker is not None:
            cb_err = _check_circuit_breaker(self._circuit_breaker)
            if cb_err is not None:
                return Err(cb_err)

        async def _do_request() -> httpx.Response:
            request_func = cast(
                Callable[..., Awaitable[httpx.Response]], client.request
            )
            return await request_func(method, url, **kwargs)

        return await _execute_with_retries(
            _do_request,
            self._retry_config,
            self._circuit_breaker,
            self._retryable_status_codes,
        )

    async def get(
        self, url: str, **kw: Unpack[HttpRequestKwargs]
    ) -> Result[httpx.Response, Exception]:
        """Send a GET request via :meth:`request`."""
        return await self.request("GET", url, **kw)

    async def post(
        self, url: str, **kw: Unpack[HttpRequestKwargs]
    ) -> Result[httpx.Response, Exception]:
        """Send a POST request via :meth:`request`."""
        return await self.request("POST", url, **kw)

    async def put(
        self, url: str, **kw: Unpack[HttpRequestKwargs]
    ) -> Result[httpx.Response, Exception]:
        """Send a PUT request via :meth:`request`."""
        return await self.request("PUT", url, **kw)

    async def delete(
        self, url: str, **kw: Unpack[HttpRequestKwargs]
    ) -> Result[httpx.Response, Exception]:
        """Send a DELETE request via :meth:`request`."""
        return await self.request("DELETE", url, **kw)

    async def patch(
        self, url: str, **kw: Unpack[HttpRequestKwargs]
    ) -> Result[httpx.Response, Exception]:
        """Send a PATCH request via :meth:`request`."""
        return await self.request("PATCH", url, **kw)