Coverage for src / taipanstack / bridges / db_bridge.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2DB Bridge — resilient database wrappers. 

3 

4Wraps SQLAlchemy async engine and Redis async client with 

5TaipanStack's circuit breaker and retry patterns. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import logging 

12from typing import TYPE_CHECKING 

13 

14from taipanstack.core.result import Err, Ok, Result 

15from taipanstack.resilience.circuit_breaker import ( 

16 CircuitBreaker, 

17 CircuitBreakerError, 

18 CircuitState, 

19) 

20from taipanstack.resilience.retry import RetryConfig, calculate_delay 

21 

22logger = logging.getLogger("taipanstack.bridges.db") 

23 

24# --- optional imports ------------------------------------------------------ 

25 

26try: 

27 import sqlalchemy # noqa: F401 

28 from sqlalchemy import text as sa_text 

29 from sqlalchemy.ext.asyncio import AsyncSession 

30 

31 _HAS_SQLALCHEMY = True 

32except ImportError: 

33 _HAS_SQLALCHEMY = False 

34 

35try: 

36 import redis.asyncio as aioredis 

37 

38 _HAS_REDIS = True 

39except ImportError: 

40 _HAS_REDIS = False 

41 

42if TYPE_CHECKING: 

43 import redis.asyncio as aioredis 

44 from sqlalchemy.ext.asyncio import AsyncEngine 

45 

46 

def _breaker_is_open(cb: CircuitBreaker) -> CircuitBreakerError | None:
    """Build the rejection error for ``cb`` when its state is OPEN.

    Callers use the return value as a gate: a non-``None`` result means
    the protected operation should not be attempted.

    Args:
        cb: The circuit breaker whose ``state`` is inspected.

    Returns:
        A ``CircuitBreakerError`` naming the breaker when its state equals
        ``CircuitState.OPEN``; ``None`` for every other state.

    """
    # Guard clause: any state other than OPEN lets the call through.
    if cb.state != CircuitState.OPEN:
        return None

    message = f"Circuit breaker '{cb.name}' is OPEN"
    return CircuitBreakerError(message, state=cb.state)

63 

64 

class ResilientDatabase:
    """Wraps a SQLAlchemy async engine with resilience patterns.

    Every :meth:`execute` call opens a fresh ``AsyncSession`` on the wrapped
    engine. The call is optionally gated by a circuit breaker and retried
    according to a retry configuration. Failures are reported as ``Err``
    values rather than raised.

    Args:
        engine: SQLAlchemy ``AsyncEngine`` instance.
        circuit_breaker: Optional circuit breaker.
        retry_config: Optional retry configuration.

    Example:
        >>> db = ResilientDatabase(engine, circuit_breaker=breaker)
        >>> result = await db.execute(text("SELECT 1"))

    """

    def __init__(
        self,
        engine: AsyncEngine,
        *,
        circuit_breaker: CircuitBreaker | None = None,
        retry_config: RetryConfig | None = None,
    ) -> None:
        """Initialize the resilient database wrapper.

        Args:
            engine: SQLAlchemy AsyncEngine.
            circuit_breaker: Optional circuit breaker. When ``None`` no
                gating or failure recording takes place.
            retry_config: Optional retry config. When ``None`` each
                statement is attempted exactly once.

        """
        self._engine = engine
        self._circuit_breaker = circuit_breaker
        self._retry_config = retry_config

    async def _handle_attempt_failure(
        self,
        exc: Exception,
        attempt: int,
        max_attempts: int,
    ) -> bool:
        """Handle a failed execution attempt.

        Logs the failure, records it on the circuit breaker (if any) and,
        when another attempt is allowed, sleeps for the back-off delay.

        Args:
            exc: The exception that occurred.
            attempt: The current attempt number (1-based).
            max_attempts: Maximum number of attempts allowed.

        Returns:
            True if the operation should be retried, False otherwise.
            Retrying stops when no retry config is set, when the attempt
            budget is exhausted, or when the circuit breaker is OPEN after
            this failure has been recorded.

        """
        logger.warning(
            "DB execute attempt %d failed: %s",
            attempt,
            exc,
        )

        breaker = self._circuit_breaker
        if breaker is not None:
            breaker._record_failure(exc)
            # The failure just recorded may have tripped the breaker. The
            # gate in ``execute`` only runs before the loop, so without
            # this check the remaining retries would bypass an open
            # breaker and keep hitting the failing database.
            if breaker.state == CircuitState.OPEN:
                return False

        if self._retry_config is None or attempt >= max_attempts:
            return False

        delay = calculate_delay(attempt, self._retry_config)
        # Never sleep for more than one hour, whatever the computed delay.
        await asyncio.sleep(min(delay, 3600.0))
        return True

    async def _execute_loop(
        self,
        statement: object,
        max_attempts: int,
        **kwargs: object,
    ) -> Result[object, Exception]:
        """Execute the retry loop for database operations.

        Args:
            statement: SQLAlchemy statement to execute.
            max_attempts: Upper bound on the number of attempts.
            **kwargs: Passed to ``session.execute``.

        Returns:
            ``Ok(result)`` from the first successful attempt, otherwise
            ``Err`` carrying the last exception seen. If no attempt was
            made (``max_attempts`` below 1) the error is a ``RuntimeError``.

        """
        last_error: Exception | None = None

        for attempt in range(1, max_attempts + 1):
            try:
                # A new session per attempt, closed on leaving the block.
                # NOTE(review): no commit is issued here — confirm how
                # write statements are expected to be persisted.
                async with AsyncSession(self._engine) as session:
                    result = await session.execute(statement, **kwargs)
                    return Ok(result)
            except Exception as exc:
                last_error = exc
                if await self._handle_attempt_failure(exc, attempt, max_attempts):
                    continue
                break

        return Err(last_error or RuntimeError("Database execute failed"))

    async def execute(
        self,
        statement: object,
        **kwargs: object,
    ) -> Result[object, Exception]:
        """Execute a SQL statement with resilience.

        The statement is rejected up front when SQLAlchemy is unavailable
        or when the circuit breaker is OPEN. Otherwise it is attempted once,
        or up to ``retry_config.max_attempts`` times when a retry config
        is set. Retries end early if the breaker opens during the loop.

        Args:
            statement: SQLAlchemy statement to execute.
            **kwargs: Passed to ``session.execute``.

        Returns:
            ``Ok(result)`` on success, ``Err`` on failure. The error is an
            ``ImportError`` when SQLAlchemy is missing, a
            ``CircuitBreakerError`` when the breaker rejects the call, or
            the last exception raised by an attempt.

        """
        if not _HAS_SQLALCHEMY:
            return Err(
                ImportError(
                    "sqlalchemy is required for ResilientDatabase. "
                    "Install with: pip install taipanstack[bridges-db]"
                )
            )

        # Circuit breaker gate
        if self._circuit_breaker is not None:
            cb_err = _breaker_is_open(self._circuit_breaker)
            if cb_err is not None:
                return Err(cb_err)

        max_attempts = 1
        if self._retry_config is not None:
            max_attempts = self._retry_config.max_attempts

        return await self._execute_loop(statement, max_attempts, **kwargs)

    async def health_check(self) -> Result[bool, Exception]:
        """Check database connectivity.

        Executes ``SELECT 1`` to verify the connection is alive. The probe
        is a single attempt that neither consults nor updates the circuit
        breaker.

        Returns:
            ``Ok(True)`` if healthy, ``Err`` on failure.

        """
        if not _HAS_SQLALCHEMY:
            return Err(ImportError("sqlalchemy is required for health check"))

        try:
            async with AsyncSession(self._engine) as session:
                await session.execute(sa_text("SELECT 1"))
                return Ok(True)
        except Exception as exc:
            return Err(exc)

203 

204 

class ResilientRedis:
    """Redis async client wrapper that reports failures as ``Err`` values.

    Commands are looked up by name on the wrapped client. An optional
    circuit breaker gates each command and is told about every failure.

    Args:
        client: ``redis.asyncio.Redis`` instance.
        circuit_breaker: Optional circuit breaker.

    Example:
        >>> r = ResilientRedis(redis_client, circuit_breaker=breaker)
        >>> result = await r.execute("GET", "my_key")

    """

    def __init__(
        self,
        client: aioredis.Redis[bytes | str],
        *,
        circuit_breaker: CircuitBreaker | None = None,
    ) -> None:
        """Store the client and the optional breaker.

        Args:
            client: Redis async client.
            circuit_breaker: Optional circuit breaker; ``None`` disables
                gating and failure recording.

        """
        self._client = client
        self._circuit_breaker = circuit_breaker

    async def execute(
        self,
        command: str,
        *args: object,
    ) -> Result[object, Exception]:
        """Run one Redis command and wrap the outcome in a ``Result``.

        Args:
            command: Redis command name (e.g. ``"GET"``, ``"SET"``). It is
                lower-cased and resolved as an attribute of the client.
            *args: Positional arguments forwarded to the command.

        Returns:
            ``Ok(result)`` on success. ``Err`` holding an ``ImportError``
            when redis is not installed, a ``CircuitBreakerError`` when the
            breaker is OPEN, or the exception raised while resolving or
            running the command.

        """
        if not _HAS_REDIS:
            missing = ImportError(
                "redis is required for ResilientRedis. "
                "Install with: pip install taipanstack[bridges-db]"
            )
            return Err(missing)

        # Refuse the call outright while the breaker is OPEN.
        gate = self._circuit_breaker
        if gate is not None:
            rejection = _breaker_is_open(gate)
            if rejection is not None:
                return Err(rejection)

        try:
            # The attribute lookup stays inside the try so that an unknown
            # command name is reported as Err like any other failure.
            method = getattr(self._client, command.lower())
            outcome = await method(*args)
            return Ok(outcome)
        except Exception as exc:
            logger.warning("Redis command '%s' failed: %s", command, exc)
            tracker = self._circuit_breaker
            if tracker is not None:
                tracker._record_failure(exc)
            return Err(exc)

    async def health_check(self) -> Result[bool, Exception]:
        """Probe Redis with PING.

        The probe neither consults nor updates the circuit breaker.

        Returns:
            ``Ok`` wrapping the truthiness of the PING reply, or ``Err``
            with the exception raised (an ``ImportError`` when redis is
            not installed).

        """
        if not _HAS_REDIS:
            return Err(ImportError("redis is required for health check"))

        try:
            reply = await self._client.ping()
            return Ok(bool(reply))
        except Exception as exc:
            return Err(exc)