Coverage for src / taipanstack / bridges / db_bridge.py: 100%
99 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 21:18 +0000
1"""
2DB Bridge — resilient database wrappers.
4Wraps SQLAlchemy async engine and Redis async client with
5TaipanStack's circuit breaker and retry patterns.
6"""
8from __future__ import annotations
10import asyncio
11import logging
12from typing import TYPE_CHECKING
14from taipanstack.core.result import Err, Ok, Result
15from taipanstack.resilience.circuit_breaker import (
16 CircuitBreaker,
17 CircuitBreakerError,
18 CircuitState,
19)
20from taipanstack.resilience.retry import RetryConfig, calculate_delay
22logger = logging.getLogger("taipanstack.bridges.db")
24# --- optional imports ------------------------------------------------------
26try:
27 import sqlalchemy # noqa: F401
28 from sqlalchemy import text as sa_text
29 from sqlalchemy.ext.asyncio import AsyncSession
31 _HAS_SQLALCHEMY = True
32except ImportError:
33 _HAS_SQLALCHEMY = False
35try:
36 import redis.asyncio as aioredis
38 _HAS_REDIS = True
39except ImportError:
40 _HAS_REDIS = False
42if TYPE_CHECKING:
43 import redis.asyncio as aioredis
44 from sqlalchemy.ext.asyncio import AsyncEngine
def _breaker_is_open(cb: CircuitBreaker) -> CircuitBreakerError | None:
    """Build the rejection error for a circuit breaker that is OPEN.

    Args:
        cb: Circuit breaker whose ``state`` is inspected.

    Returns:
        A ``CircuitBreakerError`` naming the breaker and carrying its
        state when that state equals ``CircuitState.OPEN``; ``None`` for
        every other state.

    """
    tripped = cb.state == CircuitState.OPEN
    if not tripped:
        return None

    # The error is returned, not raised: callers wrap it in ``Err``.
    return CircuitBreakerError(
        f"Circuit breaker '{cb.name}' is OPEN",
        state=cb.state,
    )
class ResilientDatabase:
    """Wraps a SQLAlchemy async engine with resilience patterns.

    ``execute`` is gated by the optional circuit breaker and, when a retry
    configuration is supplied, retried with the delay returned by
    ``calculate_delay``. Errors derived from ``Exception`` are returned as
    ``Err`` values instead of being raised.

    Args:
        engine: SQLAlchemy ``AsyncEngine`` instance.
        circuit_breaker: Optional circuit breaker.
        retry_config: Optional retry configuration.

    Example:
        >>> db = ResilientDatabase(engine, circuit_breaker=breaker)
        >>> result = await db.execute(text("SELECT 1"))

    """

    def __init__(
        self,
        engine: AsyncEngine,
        *,
        circuit_breaker: CircuitBreaker | None = None,
        retry_config: RetryConfig | None = None,
    ) -> None:
        """Initialize the resilient database wrapper.

        Args:
            engine: SQLAlchemy AsyncEngine.
            circuit_breaker: Optional circuit breaker.
            retry_config: Optional retry config.

        """
        self._engine = engine
        self._circuit_breaker = circuit_breaker
        self._retry_config = retry_config

    async def _handle_attempt_failure(
        self,
        exc: Exception,
        attempt: int,
        max_attempts: int,
    ) -> bool:
        """Handle a failed execution attempt.

        Logs the failure, reports it to the circuit breaker (if any) and,
        when another attempt is allowed, sleeps for the back-off delay.

        Args:
            exc: The exception that occurred.
            attempt: The current attempt number (1-based).
            max_attempts: Maximum number of attempts allowed.

        Returns:
            True if the operation should be retried, False otherwise.
            False is also returned when the failure just recorded has left
            the circuit breaker OPEN, even if attempts remain.

        """
        logger.warning(
            "DB execute attempt %d failed: %s",
            attempt,
            exc,
        )

        breaker = self._circuit_breaker
        if breaker is not None:
            # NOTE(review): this relies on the breaker's private
            # ``_record_failure`` hook, and nothing in this class reports a
            # success back to the breaker -- confirm that is intended.
            breaker._record_failure(exc)
            # The failure just recorded may have tripped the breaker.
            # Sleeping and retrying past that point would keep sending
            # traffic to a backend the breaker has declared unhealthy.
            if _breaker_is_open(breaker) is not None:
                return False

        if self._retry_config is not None and attempt < max_attempts:
            delay = calculate_delay(attempt, self._retry_config)
            # Cap the back-off at one hour regardless of configuration.
            await asyncio.sleep(min(delay, 3600.0))
            return True
        return False

    async def _execute_loop(
        self,
        statement: object,
        max_attempts: int,
        **kwargs: object,
    ) -> Result[object, Exception]:
        """Execute the retry loop for database operations.

        Args:
            statement: SQLAlchemy statement to execute.
            max_attempts: Upper bound on the number of attempts.
            **kwargs: Passed to ``session.execute``.

        Returns:
            ``Ok(result)`` from the first successful attempt, otherwise
            ``Err`` with the last exception seen. If no attempt ran at all
            (``max_attempts`` below 1) the error is a ``RuntimeError``.

        """
        last_error: Exception | None = None

        for attempt in range(1, max_attempts + 1):
            try:
                # A fresh session is opened (and closed) for every attempt.
                async with AsyncSession(self._engine) as session:
                    result = await session.execute(statement, **kwargs)
                    return Ok(result)
            except Exception as exc:
                last_error = exc
                if await self._handle_attempt_failure(exc, attempt, max_attempts):
                    continue
                break

        if last_error is None:
            last_error = RuntimeError("Database execute failed")
        return Err(last_error)

    async def execute(
        self,
        statement: object,
        **kwargs: object,
    ) -> Result[object, Exception]:
        """Execute a SQL statement with resilience.

        NOTE(review): the session is opened and closed per attempt and this
        wrapper never calls ``commit``; whether writes persist therefore
        depends on engine/session configuration not visible here -- confirm
        before relying on it for DML.

        Args:
            statement: SQLAlchemy statement to execute.
            **kwargs: Passed to ``session.execute``.

        Returns:
            ``Ok(result)`` on success, ``Err`` on failure. The error is an
            ``ImportError`` when SQLAlchemy is not installed and a
            ``CircuitBreakerError`` when the breaker is OPEN on entry.

        """
        if not _HAS_SQLALCHEMY:
            return Err(
                ImportError(
                    "sqlalchemy is required for ResilientDatabase. "
                    "Install with: pip install taipanstack[bridges-db]"
                )
            )

        # Circuit breaker gate
        if self._circuit_breaker is not None:
            cb_err = _breaker_is_open(self._circuit_breaker)
            if cb_err is not None:
                return Err(cb_err)

        # Without a retry configuration the statement is tried exactly once.
        max_attempts = 1
        if self._retry_config is not None:
            max_attempts = self._retry_config.max_attempts

        return await self._execute_loop(statement, max_attempts, **kwargs)

    async def health_check(self) -> Result[bool, Exception]:
        """Check database connectivity.

        Executes ``SELECT 1`` to verify the connection is alive. The probe
        bypasses both the circuit breaker and the retry configuration.

        Returns:
            ``Ok(True)`` if healthy, ``Err`` on failure.

        """
        if not _HAS_SQLALCHEMY:
            return Err(ImportError("sqlalchemy is required for health check"))

        try:
            async with AsyncSession(self._engine) as session:
                await session.execute(sa_text("SELECT 1"))
                return Ok(True)
        except Exception as exc:
            return Err(exc)
class ResilientRedis:
    """Wraps a Redis async client with resilience patterns.

    Commands are dispatched to the client method whose name is the
    lower-cased command, gated by the optional circuit breaker. Errors
    derived from ``Exception`` are returned as ``Err`` values instead of
    being raised.

    Args:
        client: ``redis.asyncio.Redis`` instance.
        circuit_breaker: Optional circuit breaker.

    Example:
        >>> r = ResilientRedis(redis_client, circuit_breaker=breaker)
        >>> result = await r.execute("GET", "my_key")

    """

    def __init__(
        self,
        client: aioredis.Redis[bytes | str],
        *,
        circuit_breaker: CircuitBreaker | None = None,
    ) -> None:
        """Initialize the resilient Redis wrapper.

        Args:
            client: Redis async client.
            circuit_breaker: Optional circuit breaker.

        """
        self._client = client
        self._circuit_breaker = circuit_breaker

    async def execute(
        self,
        command: str,
        *args: object,
    ) -> Result[object, Exception]:
        """Execute a Redis command with resilience.

        Args:
            command: Redis command name (e.g. ``"GET"``, ``"SET"``). It is
                lower-cased and looked up as a method on the client.
            *args: Command arguments, passed positionally to that method.

        Returns:
            ``Ok(result)`` on success, ``Err`` on failure. The error is an
            ``ImportError`` when redis is not installed and a
            ``CircuitBreakerError`` when the breaker is OPEN on entry.

        """
        if not _HAS_REDIS:
            return Err(
                ImportError(
                    "redis is required for ResilientRedis. "
                    "Install with: pip install taipanstack[bridges-db]"
                )
            )

        # Circuit breaker gate
        if self._circuit_breaker is not None:
            cb_err = _breaker_is_open(self._circuit_breaker)
            if cb_err is not None:
                return Err(cb_err)

        # Resolve the client method before the guarded call. A lookup
        # failure (unknown or mistyped command) is a caller mistake, not a
        # Redis outage, so it is reported without counting against the
        # circuit breaker.
        try:
            handler = getattr(self._client, command.lower())
        except Exception as exc:
            logger.warning("Redis command '%s' failed: %s", command, exc)
            return Err(exc)

        try:
            result = await handler(*args)
        except Exception as exc:
            logger.warning("Redis command '%s' failed: %s", command, exc)
            if self._circuit_breaker is not None:
                # NOTE(review): relies on the breaker's private
                # ``_record_failure`` hook; no success is reported to the
                # breaker from this class -- confirm that is intended.
                self._circuit_breaker._record_failure(exc)
            return Err(exc)
        return Ok(result)

    async def health_check(self) -> Result[bool, Exception]:
        """Check Redis connectivity via PING.

        The probe bypasses the circuit breaker.

        Returns:
            ``Ok`` wrapping the truthiness of the PING reply, ``Err`` on
            failure.

        """
        if not _HAS_REDIS:
            return Err(ImportError("redis is required for health check"))

        try:
            pong = await self._client.ping()
            return Ok(bool(pong))
        except Exception as exc:
            return Err(exc)