Resilience¶
TaipanStack's resilience module provides components for ensuring system stability and self-healing under failure conditions.
taipanstack.resilience¶
Resilience module for TaipanStack.
Provides circuit breaker, retry, fallback, and timeout patterns for building resilient applications. This is the canonical home for all resilience-related utilities.
AdaptiveCircuitBreaker
¶
AdaptiveCircuitBreaker(
name: str = "default",
*,
window_size: int = 100,
min_throughput: int = 10,
target_error_rate: float = 0.5,
recovery_timeout: float = 30.0,
)
Circuit breaker that opens based on an error rate percentage.
Maintains a rolling window of call outcomes. The circuit trips to OPEN if:
1. The window_size history has at least min_throughput events.
2. The error rate (errors / total) > target_error_rate.
Once OPEN, it waits recovery_timeout seconds before transitioning
to HALF_OPEN. In HALF_OPEN, if a request succeeds, it CLOSES and
clears the window. If it fails, it returns to OPEN.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Identifier for logging. TYPE: `str` |
| `window_size` | Number of recent calls to track. TYPE: `int` |
| `min_throughput` | Minimum requests before considering error rate. TYPE: `int` |
| `target_error_rate` | Desired error rate boundary (0.0 - 1.0). TYPE: `float` |
| `recovery_timeout` | Seconds before attempting half-open recovery. TYPE: `float` |
Initialize the adaptive circuit breaker.
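The trip condition described above can be sketched independently of TaipanStack. This is a toy model of the rolling-window rule, not the library's internals; the function name is illustrative:

```python
from collections import deque

def should_trip(outcomes: deque, min_throughput: int, target_error_rate: float) -> bool:
    # Never trip on thin traffic: require a minimum number of observations.
    if len(outcomes) < min_throughput:
        return False
    errors = sum(1 for ok in outcomes if not ok)
    # Trip only when the windowed error rate exceeds the target.
    return errors / len(outcomes) > target_error_rate

# 6 failures out of 10 calls -> error rate 0.6 > 0.5, so the circuit would open.
window = deque([True] * 4 + [False] * 6, maxlen=100)
print(should_trip(window, min_throughput=10, target_error_rate=0.5))  # True
```

Because the window is bounded (`maxlen=window_size`), old outcomes age out and a recovered service naturally pulls the error rate back under the threshold.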
state
property
¶
state: CircuitState
Current circuit state. May evaluate timeouts and switch to HALF_OPEN.
record_failure
¶
record_failure(_exc: Exception) -> None
Record a failed call.
| PARAMETER | DESCRIPTION |
|---|---|
| `_exc` | The exception that occurred. TYPE: `Exception` |
evaluate_result
¶
evaluate_result(
result: Result[T, Exception],
) -> Result[T, Exception]
Evaluate a Result and record success or failure.
| PARAMETER | DESCRIPTION |
|---|---|
| `result` | A `Result` to evaluate. TYPE: `Result[T, Exception]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Result[T, Exception]` | The original Result. |
should_allow
¶
should_allow() -> bool
Check if a call should be attempted.
| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if the call should be attempted. |
AdaptiveMetrics
dataclass
¶
AdaptiveMetrics(
success_rate: float,
error_rate: float,
total_calls: int,
error_count: int,
state: CircuitState,
)
Snapshot of adaptive circuit breaker metrics.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `success_rate` | Current success rate (0.0 - 1.0). TYPE: `float` |
| `error_rate` | Current error rate (0.0 - 1.0). TYPE: `float` |
| `total_calls` | Total calls in the window. TYPE: `int` |
| `error_count` | Errors in the window. TYPE: `int` |
| `state` | Current circuit state. TYPE: `CircuitState` |
AdaptiveRetry
¶
AdaptiveRetry(
*,
min_delay: float = 0.1,
max_delay: float = 60.0,
window_size: int = 50,
max_attempts: int = 3,
)
Retry strategy that learns optimal delays from outcomes.
Maintains per-attempt-level statistics and returns the delay that historically led to successful retries at that attempt level.
| PARAMETER | DESCRIPTION |
|---|---|
| `min_delay` | Minimum delay in seconds. TYPE: `float` |
| `max_delay` | Maximum delay in seconds. TYPE: `float` |
| `window_size` | Number of recent outcomes to track. TYPE: `int` |
| `max_attempts` | Default max attempts for the exported `RetryConfig`. TYPE: `int` |
Example
    ar = AdaptiveRetry(min_delay=0.1, max_delay=30.0)
    ar.record_outcome(attempt=1, success=True, elapsed=0.5)
    delay = ar.get_delay(attempt=1)
Initialize the adaptive retry.
| PARAMETER | DESCRIPTION |
|---|---|
| `min_delay` | Minimum delay. TYPE: `float` |
| `max_delay` | Maximum delay. TYPE: `float` |
| `window_size` | Rolling window size. TYPE: `int` |
| `max_attempts` | Default max attempts. TYPE: `int` |
record_outcome
¶
record_outcome(
attempt: int, success: bool, elapsed: float
) -> None
Record a retry outcome.
| PARAMETER | DESCRIPTION |
|---|---|
| `attempt` | Attempt number (1-indexed). TYPE: `int` |
| `success` | Whether the attempt succeeded. TYPE: `bool` |
| `elapsed` | Time elapsed before this attempt was made. TYPE: `float` |
get_delay
¶
get_delay(attempt: int) -> float
Get the learned optimal delay for this attempt level.
If there is historical data for this attempt level, returns the median of successful delays. Otherwise uses exponential backoff with the configured bounds.
| PARAMETER | DESCRIPTION |
|---|---|
| `attempt` | Attempt number (1-indexed). TYPE: `int` |

| RETURNS | DESCRIPTION |
|---|---|
| `float` | Delay in seconds. |
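The selection rule (median of historically successful delays, otherwise bounded exponential backoff) can be sketched like this. The helper name and exact backoff shape are illustrative assumptions, not the library's implementation:

```python
import statistics

def learned_delay(successful_delays: list[float], attempt: int,
                  min_delay: float = 0.1, max_delay: float = 60.0) -> float:
    if successful_delays:
        # Use the delay that historically worked at this attempt level.
        return statistics.median(successful_delays)
    # No history yet: fall back to bounded exponential backoff.
    return min(max_delay, min_delay * 2 ** (attempt - 1))
```

With no history, attempt 1 yields `min_delay`; each subsequent attempt doubles the delay until `max_delay` caps it.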
to_retry_config
¶
to_retry_config() -> RetryConfig
Export current state as a standard RetryConfig.
Uses the learned initial delay (attempt=1) if available.
| RETURNS | DESCRIPTION |
|---|---|
| `RetryConfig` | A `RetryConfig` derived from the learned delays. |
Bulkhead
¶
Bulkhead(
name: str = "default",
*,
max_concurrent: int = 10,
max_queue: int = 50,
timeout: float = 30.0,
)
Concurrency limiter using asyncio.Semaphore.
Limits the number of concurrent executions of a callable.
Excess callers are queued up to max_queue; beyond that
a BulkheadFullError is returned.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Identifier for logging. TYPE: `str` |
| `max_concurrent` | Maximum concurrent executions. TYPE: `int` |
| `max_queue` | Maximum queued callers beyond concurrent limit. TYPE: `int` |
| `timeout` | Seconds to wait for a permit before timing out. TYPE: `float` |
Example
    bulk = Bulkhead("db", max_concurrent=5, max_queue=10)
    result = await bulk.execute(fetch_data, user_id)
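The queue-then-reject behaviour can be sketched with a plain `asyncio.Semaphore`. This is a toy model of the pattern, not TaipanStack's implementation (which returns a Result rather than raising):

```python
import asyncio

class MiniBulkhead:
    """Toy bulkhead: limited permits plus a bounded waiting line."""

    def __init__(self, max_concurrent: int = 10, max_queue: int = 50) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)
        self._max_queue = max_queue
        self._waiting = 0

    async def run(self, fn, *args):
        # Reject immediately when all permits are taken and the queue is full.
        if self._sem.locked() and self._waiting >= self._max_queue:
            raise RuntimeError("bulkhead full")
        self._waiting += 1
        await self._sem.acquire()
        self._waiting -= 1
        try:
            return await fn(*args)
        finally:
            self._sem.release()
```

The key property: a caller never waits when the line is already `max_queue` deep, so a slow dependency cannot strand an unbounded number of tasks.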
Initialize the bulkhead.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Bulkhead name. TYPE: `str` |
| `max_concurrent` | Concurrency limit. TYPE: `int` |
| `max_queue` | Queue limit. TYPE: `int` |
| `timeout` | Permit acquisition timeout. TYPE: `float` |
execute
async
¶
execute(
fn: Callable[P, Awaitable[T]],
*args: P.args,
**kwargs: P.kwargs,
) -> Result[T, Exception]
Execute a callable within bulkhead limits.
| PARAMETER | DESCRIPTION |
|---|---|
| `fn` | Async callable to execute. TYPE: `Callable[P, Awaitable[T]]` |
| `*args` | Positional arguments for fn. TYPE: `P.args` |
| `**kwargs` | Keyword arguments for fn. TYPE: `P.kwargs` |

| RETURNS | DESCRIPTION |
|---|---|
| `Result[T, Exception]` | Ok with the function's result, or Err on failure (including `BulkheadFullError` when the queue is at capacity). |
BulkheadFullError
¶
BulkheadFullError(
name: str, max_concurrent: int, max_queue: int
)
Bases: Exception
Raised when the bulkhead queue is at capacity.
Initialize BulkheadFullError.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Bulkhead name. TYPE: `str` |
| `max_concurrent` | Concurrency limit. TYPE: `int` |
| `max_queue` | Queue limit. TYPE: `int` |
ResilienceOrchestrator
¶
ResilienceOrchestrator(name: str = 'default')
Bases: Generic[T]
Compose resilience patterns into a single pipeline.
Provides a fluent builder API to add patterns in order. Execution proceeds through each configured layer.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Pipeline name for logging. TYPE: `str` |
Example
    orch = (
        ResilienceOrchestrator("api")
        .with_bulkhead(max_concurrent=5)
        .with_circuit_breaker(breaker)
        .with_retry(RetryConfig(max_attempts=3))
        .with_timeout(10.0)
        .with_fallback({"status": "cached"})
    )
    result = await orch.execute(call_api, endpoint)
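The layering idea can be sketched with plain wrappers. `add_timeout` and `add_fallback` below are illustrative stand-ins built on asyncio, not the orchestrator's actual internals:

```python
import asyncio

def add_timeout(fn, seconds: float):
    async def wrapped(*args):
        # Raises TimeoutError if fn does not finish in time.
        return await asyncio.wait_for(fn(*args), seconds)
    return wrapped

def add_fallback(fn, value):
    async def wrapped(*args):
        try:
            return await fn(*args)
        except Exception:
            return value  # outermost layer: swallow the failure, return the fallback
    return wrapped

async def flaky():
    await asyncio.sleep(1.0)
    return "live"

# The innermost wrapper runs closest to the function; fallback wraps everything.
pipeline = add_fallback(add_timeout(flaky, 0.05), {"status": "cached"})
print(asyncio.run(pipeline()))  # -> {'status': 'cached'}
```

Each `with_*` call on the real orchestrator adds one such layer, which is why the ordering documented under `execute` matters.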
Initialize the orchestrator.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Pipeline name. TYPE: `str` |
with_bulkhead
¶
with_bulkhead(
max_concurrent: int = 10,
max_queue: int = 50,
timeout: float = 30.0,
) -> ResilienceOrchestrator[T]
Add a bulkhead concurrency limiter.
| PARAMETER | DESCRIPTION |
|---|---|
| `max_concurrent` | Max concurrent executions. TYPE: `int` |
| `max_queue` | Max queued callers. TYPE: `int` |
| `timeout` | Permit acquisition timeout. TYPE: `float` |

| RETURNS | DESCRIPTION |
|---|---|
| `ResilienceOrchestrator[T]` | self for chaining. |
with_circuit_breaker
¶
with_circuit_breaker(
breaker: CircuitBreaker | AdaptiveCircuitBreaker,
) -> ResilienceOrchestrator[T]
Add a circuit breaker.
| PARAMETER | DESCRIPTION |
|---|---|
| `breaker` | Standard or adaptive circuit breaker. TYPE: `CircuitBreaker \| AdaptiveCircuitBreaker` |

| RETURNS | DESCRIPTION |
|---|---|
| `ResilienceOrchestrator[T]` | self for chaining. |
with_retry
¶
with_retry(
config: RetryConfig | AdaptiveRetry,
) -> ResilienceOrchestrator[T]
Add retry logic.
| PARAMETER | DESCRIPTION |
|---|---|
| `config` | Standard retry config or adaptive retry. TYPE: `RetryConfig \| AdaptiveRetry` |

| RETURNS | DESCRIPTION |
|---|---|
| `ResilienceOrchestrator[T]` | self for chaining. |
with_timeout
¶
with_timeout(seconds: float) -> ResilienceOrchestrator[T]
Add a timeout.
| PARAMETER | DESCRIPTION |
|---|---|
| `seconds` | Maximum execution time. TYPE: `float` |

| RETURNS | DESCRIPTION |
|---|---|
| `ResilienceOrchestrator[T]` | self for chaining. |
with_fallback
¶
with_fallback(value: T) -> ResilienceOrchestrator[T]
Add a fallback value for failures.
| PARAMETER | DESCRIPTION |
|---|---|
| `value` | Value to return on failure. TYPE: `T` |

| RETURNS | DESCRIPTION |
|---|---|
| `ResilienceOrchestrator[T]` | self for chaining. |
execute
async
¶
execute(
fn: Callable[P, Awaitable[T]],
*args: P.args,
**kwargs: P.kwargs,
) -> Result[T, Exception]
Execute the function through the resilience pipeline.
Order: bulkhead → circuit breaker → retry → timeout → fn → fallback.
| PARAMETER | DESCRIPTION |
|---|---|
| `fn` | Async callable to execute. TYPE: `Callable[P, Awaitable[T]]` |
| `*args` | Positional arguments. TYPE: `P.args` |
| `**kwargs` | Keyword arguments. TYPE: `P.kwargs` |

| RETURNS | DESCRIPTION |
|---|---|
| `Result[T, Exception]` | Ok with the function's result, or Err from the first layer that fails. |
RetryMetrics
dataclass
¶
RetryMetrics(
success_rate: float,
avg_delay: float,
p95_delay: float,
total_outcomes: int,
)
Snapshot of adaptive retry metrics.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `success_rate` | Overall success rate (0.0 - 1.0). TYPE: `float` |
| `avg_delay` | Average delay across all successful retries. TYPE: `float` |
| `p95_delay` | 95th percentile delay. TYPE: `float` |
| `total_outcomes` | Total tracked outcomes. TYPE: `int` |
CircuitBreaker
¶
CircuitBreaker(
*,
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
name: str = "default",
on_state_change: Callable[
[CircuitState, CircuitState], None
]
| None = None,
)
Circuit breaker implementation.
Monitors function calls and opens the circuit when too many failures occur, preventing further calls until the service recovers. Supports both sync and async functions.
Example
    breaker = CircuitBreaker(failure_threshold=3)

    @breaker
    def call_external_api():
        return requests.get("https://api.example.com", timeout=10)
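The count-based state machine behind this class can be sketched as a toy model. This follows the documented semantics (trip after N consecutive failures, probe after a timeout) but is not the library code:

```python
import time

class MiniBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: float = 30.0) -> None:
        self._failures = 0
        self._opened_at: float | None = None
        self._threshold = failure_threshold
        self._timeout = timeout

    def allow(self) -> bool:
        if self._opened_at is None:
            return True  # CLOSED: calls pass through
        # OPEN: allow a half-open probe once the timeout has elapsed.
        return time.monotonic() - self._opened_at >= self._timeout

    def record(self, success: bool) -> None:
        if success:
            self._failures = 0
            self._opened_at = None  # back to CLOSED
            return
        self._failures += 1
        if self._failures >= self._threshold:
            self._opened_at = time.monotonic()  # trip to OPEN
```

The real class adds a `success_threshold` (multiple successes required to close from half-open) and exception filtering via `excluded_exceptions`/`failure_exceptions`.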
Initialize CircuitBreaker.
| PARAMETER | DESCRIPTION |
|---|---|
| `failure_threshold` | Failures before opening circuit. TYPE: `int` |
| `success_threshold` | Successes to close from half-open. TYPE: `int` |
| `timeout` | Seconds before attempting half-open. TYPE: `float` |
| `excluded_exceptions` | Exceptions that don't trip circuit. TYPE: `tuple[type[Exception], ...]` |
| `failure_exceptions` | Exceptions that count as failures. TYPE: `tuple[type[Exception], ...]` |
| `name` | Name for logging/identification. TYPE: `str` |
| `on_state_change` | Optional callback invoked on state transitions with (old_state, new_state). Useful for custom monitoring. TYPE: `Callable[[CircuitState, CircuitState], None] \| None` |
__call__
¶
__call__(
func: Callable[P, R] | Callable[P, Awaitable[R]],
) -> Callable[P, R] | Callable[P, Awaitable[R]]
Decorate a sync or async function with circuit breaker protection.
CircuitBreakerConfig
dataclass
¶
CircuitBreakerConfig(
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
)
Configuration for circuit breaker behavior.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `failure_threshold` | Number of failures before opening circuit. TYPE: `int` |
| `success_threshold` | Successes needed in half-open to close. TYPE: `int` |
| `timeout` | Seconds before trying half-open after open. TYPE: `float` |
| `excluded_exceptions` | Exceptions that don't count as failures. TYPE: `tuple[type[Exception], ...]` |
| `failure_exceptions` | Exceptions that count as failures. TYPE: `tuple[type[Exception], ...]` |
CircuitBreakerError
¶
CircuitBreakerError(message: str, state: CircuitState)
Bases: Exception
Raised when circuit breaker is open.
Initialize CircuitBreakerError.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | Error description. TYPE: `str` |
| `state` | Current circuit state. TYPE: `CircuitState` |
CircuitBreakerState
dataclass
¶
CircuitBreakerState(
state: CircuitState = CLOSED,
failure_count: int = 0,
success_count: int = 0,
half_open_attempts: int = 0,
last_failure_time: float = 0.0,
lock: Lock = Lock(),
)
Internal state tracking for circuit breaker.
CircuitState
¶
Bases: Enum
States of the circuit breaker.
Retrier
¶
Retrier(
*,
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
on: tuple[type[Exception], ...] = (Exception,),
)
Context manager for retry logic.
Provides a context manager interface for retry logic when decorators are not suitable.
Example
    retrier = Retrier(max_attempts=3, on=(ConnectionError,))
    with retrier:
        result = some_operation()
Initialize Retrier.
| PARAMETER | DESCRIPTION |
|---|---|
| `max_attempts` | Maximum retry attempts. TYPE: `int` |
| `initial_delay` | Initial delay between retries. TYPE: `float` |
| `max_delay` | Maximum delay between retries. TYPE: `float` |
| `on` | Exception types to retry on. TYPE: `tuple[type[Exception], ...]` |
__exit__
¶
__exit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
_exc_tb: TracebackType | None,
) -> bool
Exit the retry context.
Returns True to suppress the exception if we should retry, False to let it propagate.
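The suppress-to-retry contract of `__exit__` can be sketched with a loop. This toy model is illustrative only; the real `Retrier` also sleeps between attempts according to its delay settings:

```python
class MiniRetrier:
    def __init__(self, max_attempts: int = 3, on=(Exception,)) -> None:
        self.max_attempts = max_attempts
        self.on = on
        self.attempt = 0

    def __enter__(self):
        self.attempt += 1
        return self

    def __exit__(self, exc_type, exc_val, _exc_tb) -> bool:
        if exc_type is None:
            return False  # success: nothing to suppress
        # Suppress (i.e. signal "retry") only for matching exception types
        # while attempts remain; otherwise let the exception propagate.
        return issubclass(exc_type, self.on) and self.attempt < self.max_attempts

calls = 0
retrier = MiniRetrier(max_attempts=3, on=(ValueError,))
result = None
while result is None and retrier.attempt < retrier.max_attempts:
    with retrier:
        calls += 1
        if calls < 3:
            raise ValueError("transient")
        result = "ok"
```

On the final attempt `__exit__` returns False, so the last exception escapes to the caller instead of being silently swallowed.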
RetryConfig
dataclass
¶
RetryConfig(
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
jitter_factor: float = 0.1,
log_retries: bool = True,
on_retry: Callable[[int, int, Exception, float], None]
| None = None,
)
Configuration for retry behavior.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `max_attempts` | Maximum number of retry attempts. TYPE: `int` |
| `initial_delay` | Initial delay between retries in seconds. TYPE: `float` |
| `max_delay` | Maximum delay between retries. TYPE: `float` |
| `exponential_base` | Base for exponential backoff (2 = double each time). TYPE: `float` |
| `jitter` | Whether to add random jitter to delays. TYPE: `bool` |
| `jitter_factor` | Maximum jitter as fraction of delay (0.1 = 10%). TYPE: `float` |
| `log_retries` | Whether to emit standard log messages. TYPE: `bool` |
| `on_retry` | Optional callback invoked on each retry. TYPE: `Callable[[int, int, Exception, float], None] \| None` |
RetryError
¶
RetryError(
message: str,
attempts: int,
last_exception: Exception | None = None,
)
Bases: Exception
Raised when all retry attempts have failed.
Initialize RetryError.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | Description of the retry failure. TYPE: `str` |
| `attempts` | Number of attempts made. TYPE: `int` |
| `last_exception` | The last exception that was raised. TYPE: `Exception \| None` |
BaseWatcher
¶
BaseWatcher(*, interval: float = 5.0)
Bases: ABC
Abstract base for background watchdog tasks.
Subclasses implement _run which is called repeatedly at
_interval seconds until stop is called.
| PARAMETER | DESCRIPTION |
|---|---|
| `interval` | Seconds between each poll cycle. TYPE: `float` |
Example
    class MyWatcher(BaseWatcher):
        async def _run(self) -> None:
            print("checking...")

    watcher = MyWatcher(interval=5.0)
    await watcher.start()
Initialize the base watcher.
| PARAMETER | DESCRIPTION |
|---|---|
| `interval` | Seconds between each poll cycle. TYPE: `float` |
start
async
¶
start() -> Result[None, Exception]
Start the background watcher loop.
| RETURNS | DESCRIPTION |
|---|---|
| `Result[None, Exception]` | Ok on success, or Err if the watcher fails to start. |
ConfigWatcher
¶
ConfigWatcher(
*,
config_paths: Sequence[Path],
config_model: type[BaseModel],
interval: float = 2.0,
on_config_change: Callable[[BaseModel], None]
| None = None,
on_validation_error: Callable[[Exception], None]
| None = None,
)
Bases: BaseWatcher
Background watcher that detects configuration file changes.
Polls file hashes at each interval. When a change is detected
the content is validated via the provided Pydantic model and,
if valid, the on_config_change callback is invoked.
| PARAMETER | DESCRIPTION |
|---|---|
| `config_paths` | Files to watch. TYPE: `Sequence[Path]` |
| `config_model` | Pydantic model for validation. TYPE: `type[BaseModel]` |
| `interval` | Seconds between polls. TYPE: `float` |
| `on_config_change` | Callback receiving the validated model. TYPE: `Callable[[BaseModel], None] \| None` |
| `on_validation_error` | Callback receiving the validation error. TYPE: `Callable[[Exception], None] \| None` |
Example
    watcher = ConfigWatcher(
        config_paths=[Path(".env")],
        config_model=MySettings,
        on_config_change=lambda cfg: apply(cfg),
    )
    await watcher.start()
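The hash-based change detection can be sketched like this. The docs do not specify the hash algorithm, so SHA-256 here is an assumption; the helper names are illustrative:

```python
import hashlib
from pathlib import Path

def digest(path: Path) -> str:
    return hashlib.sha256(path.read_bytes()).hexdigest()

def poll_for_change(path: Path, seen: dict) -> bool:
    """Return True when the file's content hash differs from the last poll."""
    current = digest(path)
    changed = seen.get(path) != current
    seen[path] = current
    return changed
```

Comparing content hashes rather than mtimes means a touch without an edit does not trigger a reload, and an edit that restores the old content goes back to "unchanged".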
Initialize the config watcher.
| PARAMETER | DESCRIPTION |
|---|---|
| `config_paths` | Files to watch. TYPE: `Sequence[Path]` |
| `config_model` | Pydantic model for validation. TYPE: `type[BaseModel]` |
| `interval` | Seconds between polls. TYPE: `float` |
| `on_config_change` | Callback for valid config changes. TYPE: `Callable[[BaseModel], None] \| None` |
| `on_validation_error` | Callback for validation failures. TYPE: `Callable[[Exception], None] \| None` |
HealthPinger
¶
HealthPinger(
*,
targets: Sequence[HealthTarget],
interval: float = 10.0,
on_health_change: Callable[[str, bool], None]
| None = None,
)
Bases: BaseWatcher
Background watcher that pings external dependencies.
For each registered HealthTarget, calls its check
coroutine on every cycle. If a target is unhealthy and has
an associated CircuitBreaker, the breaker is opened
preventively.
| PARAMETER | DESCRIPTION |
|---|---|
| `targets` | Dependencies to monitor. TYPE: `Sequence[HealthTarget]` |
| `interval` | Seconds between ping cycles. TYPE: `float` |
| `on_health_change` | Optional callback on status change. TYPE: `Callable[[str, bool], None] \| None` |
Example
    async def db_ping() -> bool:
        return await pool.fetchval("SELECT 1") == 1

    pinger = HealthPinger(
        targets=[HealthTarget("db", db_ping, breaker)],
    )
    await pinger.start()
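One cycle of the pinger's loop can be sketched as follows. This toy model uses `(name, check)` tuples instead of `HealthTarget` objects and omits the breaker integration:

```python
import asyncio

async def ping_cycle(targets, statuses: dict, on_health_change) -> None:
    for name, check in targets:
        healthy = await check()
        # Fire the callback only on actual status transitions
        # (the first observation counts as a transition).
        if statuses.get(name) != healthy:
            statuses[name] = healthy
            on_health_change(name, healthy)
```

Persisting `statuses` between cycles is what turns a stream of raw ping results into discrete up/down transition events.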
Initialize the health pinger.
| PARAMETER | DESCRIPTION |
|---|---|
| `targets` | Dependencies to monitor. TYPE: `Sequence[HealthTarget]` |
| `interval` | Seconds between ping cycles. TYPE: `float` |
| `on_health_change` | Optional callback on status change. TYPE: `Callable[[str, bool], None] \| None` |
HealthTarget
dataclass
¶
HealthTarget(
name: str,
check: Callable[[], Awaitable[bool]],
circuit_breaker: CircuitBreaker | None = None,
)
A dependency to be monitored by HealthPinger.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `name` | Human-readable name for logging. TYPE: `str` |
| `check` | Async callable returning the health status. TYPE: `Callable[[], Awaitable[bool]]` |
| `circuit_breaker` | Optional circuit breaker to open on failure. TYPE: `CircuitBreaker \| None` |
ResourceSnapshot
dataclass
¶
ResourceSnapshot(
cpu_percent: float,
memory_percent: float,
timestamp: float,
)
Point-in-time snapshot of system resource usage.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `cpu_percent` | Current CPU utilisation (0-100). TYPE: `float` |
| `memory_percent` | Current memory utilisation (0-100). TYPE: `float` |
| `timestamp` | Monotonic timestamp of the reading. TYPE: `float` |
ResourceWatcher
¶
ResourceWatcher(
*,
interval: float = 5.0,
cpu_threshold: float = 85.0,
memory_threshold: float = 85.0,
on_threshold_breach: Callable[[str, float], None]
| None = None,
)
Bases: BaseWatcher
Background watcher that monitors CPU and memory.
When either metric exceeds its configured threshold the
on_threshold_breach callback is invoked with the resource
name ("cpu" or "memory") and the current value.
| PARAMETER | DESCRIPTION |
|---|---|
| `interval` | Seconds between checks. TYPE: `float` |
| `cpu_threshold` | CPU percentage that triggers a breach. TYPE: `float` |
| `memory_threshold` | Memory percentage that triggers a breach. TYPE: `float` |
| `on_threshold_breach` | Optional breach callback. TYPE: `Callable[[str, float], None] \| None` |
Example
    watcher = ResourceWatcher(
        cpu_threshold=80.0,
        on_threshold_breach=lambda r, v: print(f"{r} at {v}%"),
    )
    await watcher.start()
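The per-cycle threshold check reduces to a small pure function. This sketch is an illustration of the documented behaviour, not the watcher's code:

```python
def threshold_breaches(cpu: float, memory: float,
                       cpu_threshold: float = 85.0,
                       memory_threshold: float = 85.0) -> list:
    # Each breach pairs the resource name ("cpu" or "memory")
    # with the offending reading, matching the callback signature.
    breaches = []
    if cpu > cpu_threshold:
        breaches.append(("cpu", cpu))
    if memory > memory_threshold:
        breaches.append(("memory", memory))
    return breaches
```

Keeping the check pure makes it trivial to unit-test the policy separately from the sampling loop.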
Initialize the resource watcher.
| PARAMETER | DESCRIPTION |
|---|---|
| `interval` | Seconds between checks. TYPE: `float` |
| `cpu_threshold` | CPU percentage that triggers a breach. TYPE: `float` |
| `memory_threshold` | Memory percentage that triggers a breach. TYPE: `float` |
| `on_threshold_breach` | Optional breach callback. TYPE: `Callable[[str, float], None] \| None` |
start
async
¶
start() -> Result[None, Exception]
Start the resource watcher.
| RETURNS | DESCRIPTION |
|---|---|
| `Result[None, Exception]` | Ok on success, or Err if the watcher fails to start. |
fallback
¶
fallback(
fallback_value: T,
exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator
Provide a fallback value on failures.
If the wrapped function returns an Err() or raises a specified exception, the fallback value is returned wrapped in an Ok().
| PARAMETER | DESCRIPTION |
|---|---|
| `fallback_value` | The value to return on failure. TYPE: `T` |
| `exceptions` | Exceptions to catch. TYPE: `tuple[type[Exception], ...]` |

| RETURNS | DESCRIPTION |
|---|---|
| `FallbackDecorator` | Decorator function. |
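For a plain exception-raising function the same idea looks like this generic sketch. TaipanStack's decorator additionally wraps the value in `Ok()` and understands `Err` returns; this version just substitutes the value:

```python
import functools

def with_fallback(fallback_value, exceptions=(Exception,)):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except exceptions:
                return fallback_value  # swallow only the listed exception types
        return wrapped
    return decorator

@with_fallback(0, (ValueError,))
def parse_int(text: str) -> int:
    return int(text)
```

Restricting `exceptions` matters: anything outside the tuple (a `TypeError` here, say) still propagates, so programming errors are not masked by the fallback.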
timeout
¶
timeout(seconds: float) -> TimeoutDecorator
Enforce a maximum execution time.
If the execution time exceeds the specified limit, returns Err(TimeoutError).
| PARAMETER | DESCRIPTION |
|---|---|
| `seconds` | Maximum allowed execution time in seconds. TYPE: `float` |

| RETURNS | DESCRIPTION |
|---|---|
| `TimeoutDecorator` | Decorator function. |
calculate_delay
¶
calculate_delay(attempt: int, config: RetryConfig) -> float
Calculate delay before next retry.
| PARAMETER | DESCRIPTION |
|---|---|
| `attempt` | Current attempt number (1-indexed). TYPE: `int` |
| `config` | Retry configuration. TYPE: `RetryConfig` |

| RETURNS | DESCRIPTION |
|---|---|
| `float` | Delay in seconds before next retry. |
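Given the `RetryConfig` fields documented above, the computation plausibly looks like the following. This is a sketch consistent with those fields, not the exact implementation:

```python
import random

def calc_delay(attempt: int, initial_delay: float = 1.0, max_delay: float = 60.0,
               exponential_base: float = 2.0, jitter: bool = True,
               jitter_factor: float = 0.1) -> float:
    # Exponential growth from initial_delay, capped at max_delay.
    delay = min(max_delay, initial_delay * exponential_base ** (attempt - 1))
    if jitter:
        # Spread retries by up to jitter_factor (10% by default) of the delay.
        delay += delay * jitter_factor * random.random()
    return delay
```

The jitter term desynchronises clients that failed at the same moment, avoiding a retry stampede against the recovering service.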
retry_on_exception
¶
retry_on_exception(
exception_types: tuple[type[Exception], ...],
max_attempts: int = 3,
) -> RetryDecorator
Retry on specific exceptions.
A simpler alternative to the full retry decorator when you just need basic retry functionality.
| PARAMETER | DESCRIPTION |
|---|---|
| `exception_types` | Exception types to retry on. TYPE: `tuple[type[Exception], ...]` |
| `max_attempts` | Maximum number of attempts. TYPE: `int` |

| RETURNS | DESCRIPTION |
|---|---|
| `RetryDecorator` | Decorated function with retry logic. |
Example
    @retry_on_exception((ValueError,), max_attempts=2)
    def parse_data(data: str) -> dict:
        return json.loads(data)
Adaptive Circuit Breaker¶
Adaptive Circuit Breaker — auto-tunes failure threshold via rolling window.
Unlike standard Circuit Breakers that use static absolute failure counts, the AdaptiveCircuitBreaker opens its circuit ONLY when the error rate exceeds a target percentage in a rolling window of recent calls AND a minimum throughput of requests has been met.
Adaptive Retry¶
Adaptive Retry — learns optimal backoff from runtime outcomes.
Tracks recent retry outcomes in a rolling window and computes the best delay for each attempt level, favouring delays that historically led to successful retries.
Bulkhead¶
Bulkhead pattern — concurrency isolation via semaphore.
Limits the number of concurrent executions to prevent a single failing dependency from consuming all available resources.
Orchestrator¶
Resilience Orchestrator — compose multiple patterns into a pipeline.
Provides a fluent builder to combine bulkhead, circuit breaker, retry, timeout, and fallback into a single execution pipeline.
Execution order: bulkhead → circuit breaker → retry → timeout → fn → fallback.
Watchdogs¶
Watchdog sub-package for TaipanStack resilience.
Provides background monitors that proactively detect and respond to system degradation: resource pressure, configuration drift, and dependency failures.
BaseWatcher
¶
BaseWatcher(*, interval: float = 5.0)
Bases: ABC
Abstract base for background watchdog tasks.
Subclasses implement _run which is called repeatedly at
_interval seconds until stop is called.
| PARAMETER | DESCRIPTION |
|---|---|
interval
|
Seconds between each poll cycle.
TYPE:
|
Example
class MyWatcher(BaseWatcher): ... async def _run(self) -> None: ... print("checking...") watcher = MyWatcher(interval=5.0) await watcher.start()
Initialize the base watcher.
| PARAMETER | DESCRIPTION |
|---|---|
interval
|
Seconds between each poll cycle.
TYPE:
|
start
async
¶
start() -> Result[None, Exception]
Start the background watcher loop.
| RETURNS | DESCRIPTION |
|---|---|
Result[None, Exception]
|
|
ConfigWatcher
¶
ConfigWatcher(
*,
config_paths: Sequence[Path],
config_model: type[BaseModel],
interval: float = 2.0,
on_config_change: Callable[[BaseModel], None]
| None = None,
on_validation_error: Callable[[Exception], None]
| None = None,
)
Bases: BaseWatcher
Background watcher that detects configuration file changes.
Polls file hashes at each interval. When a change is detected
the content is validated via the provided Pydantic model and,
if valid, the on_config_change callback is invoked.
| PARAMETER | DESCRIPTION |
|---|---|
| `config_paths` | Files to watch. TYPE: `Sequence[Path]` |
| `config_model` | Pydantic model for validation. TYPE: `type[BaseModel]` |
| `interval` | Seconds between polls. TYPE: `float` DEFAULT: `2.0` |
| `on_config_change` | Callback receiving the validated model. TYPE: `Callable[[BaseModel], None] \| None` DEFAULT: `None` |
| `on_validation_error` | Callback receiving the validation exception. TYPE: `Callable[[Exception], None] \| None` DEFAULT: `None` |
Example

```python
watcher = ConfigWatcher(
    config_paths=[Path(".env")],
    config_model=MySettings,
    on_config_change=lambda cfg: apply(cfg),
)
await watcher.start()
```
Initialize the config watcher.
| PARAMETER | DESCRIPTION |
|---|---|
| `config_paths` | Files to watch. TYPE: `Sequence[Path]` |
| `config_model` | Pydantic model for validation. TYPE: `type[BaseModel]` |
| `interval` | Seconds between polls. TYPE: `float` DEFAULT: `2.0` |
| `on_config_change` | Callback for valid config changes. TYPE: `Callable[[BaseModel], None] \| None` DEFAULT: `None` |
| `on_validation_error` | Callback for validation failures. TYPE: `Callable[[Exception], None] \| None` DEFAULT: `None` |
HealthPinger
¶
HealthPinger(
*,
targets: Sequence[HealthTarget],
interval: float = 10.0,
on_health_change: Callable[[str, bool], None]
| None = None,
)
Bases: BaseWatcher
Background watcher that pings external dependencies.
For each registered HealthTarget, calls its check
coroutine on every cycle. If a target is unhealthy and has
an associated CircuitBreaker, the breaker is opened
preventively.
| PARAMETER | DESCRIPTION |
|---|---|
| `targets` | Dependencies to monitor. TYPE: `Sequence[HealthTarget]` |
| `interval` | Seconds between ping cycles. TYPE: `float` DEFAULT: `10.0` |
| `on_health_change` | Optional callback invoked with the target name and its new health status. TYPE: `Callable[[str, bool], None] \| None` DEFAULT: `None` |
Example

```python
async def db_ping() -> bool:
    return await pool.fetchval("SELECT 1") == 1

pinger = HealthPinger(
    targets=[HealthTarget("db", db_ping, breaker)],
)
await pinger.start()
```
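A single ping cycle, including the preventive breaker-open behaviour described above, can be sketched as follows. Everything here is a simplified stand-in: `FakeBreaker`, `Target`, and `ping_cycle` are hypothetical names, not TaipanStack APIs:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class FakeBreaker:
    # Hypothetical stand-in for the real CircuitBreaker.
    opened: bool = False

    def open(self) -> None:
        self.opened = True

@dataclass
class Target:
    name: str
    check: object                       # async callable returning bool
    breaker: FakeBreaker = field(default_factory=FakeBreaker)

async def ping_cycle(targets):
    # One HealthPinger-style cycle: run each check and open the
    # breaker preventively when a dependency reports unhealthy.
    status = {}
    for t in targets:
        healthy = await t.check()
        status[t.name] = healthy
        if not healthy:
            t.breaker.open()
    return status

async def up() -> bool:
    return True

async def down() -> bool:
    return False

targets = [Target("cache", up), Target("db", down)]
status = asyncio.run(ping_cycle(targets))
```

Opening the breaker from the pinger means callers fail fast on the next request instead of waiting to accumulate their own errors.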
Initialize the health pinger.
| PARAMETER | DESCRIPTION |
|---|---|
| `targets` | Dependencies to monitor. TYPE: `Sequence[HealthTarget]` |
| `interval` | Seconds between ping cycles. TYPE: `float` DEFAULT: `10.0` |
| `on_health_change` | Optional callback on status change. TYPE: `Callable[[str, bool], None] \| None` DEFAULT: `None` |
HealthTarget
dataclass
¶
HealthTarget(
name: str,
check: Callable[[], Awaitable[bool]],
circuit_breaker: CircuitBreaker | None = None,
)
A dependency to be monitored by HealthPinger.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `name` | Human-readable name for logging. TYPE: `str` |
| `check` | Async callable returning `True` when the dependency is healthy. TYPE: `Callable[[], Awaitable[bool]]` |
| `circuit_breaker` | Optional circuit breaker to open on failure. TYPE: `CircuitBreaker \| None` |
ResourceSnapshot
dataclass
¶
ResourceSnapshot(
cpu_percent: float,
memory_percent: float,
timestamp: float,
)
Point-in-time snapshot of system resource usage.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `cpu_percent` | Current CPU utilisation (0-100). TYPE: `float` |
| `memory_percent` | Current memory utilisation (0-100). TYPE: `float` |
| `timestamp` | Monotonic timestamp of the reading. TYPE: `float` |
ResourceWatcher
¶
ResourceWatcher(
*,
interval: float = 5.0,
cpu_threshold: float = 85.0,
memory_threshold: float = 85.0,
on_threshold_breach: Callable[[str, float], None]
| None = None,
)
Bases: BaseWatcher
Background watcher that monitors CPU and memory.
When either metric exceeds its configured threshold, the
`on_threshold_breach` callback is invoked with the resource
name ("cpu" or "memory") and the current value.
| PARAMETER | DESCRIPTION |
|---|---|
| `interval` | Seconds between checks. TYPE: `float` DEFAULT: `5.0` |
| `cpu_threshold` | CPU percentage that triggers a breach. TYPE: `float` DEFAULT: `85.0` |
| `memory_threshold` | Memory percentage that triggers a breach. TYPE: `float` DEFAULT: `85.0` |
| `on_threshold_breach` | Optional callback invoked with the resource name and current value. TYPE: `Callable[[str, float], None] \| None` DEFAULT: `None` |
Example

```python
watcher = ResourceWatcher(
    cpu_threshold=80.0,
    on_threshold_breach=lambda r, v: print(f"{r} at {v}%"),
)
await watcher.start()
```
Initialize the resource watcher.
| PARAMETER | DESCRIPTION |
|---|---|
| `interval` | Seconds between checks. TYPE: `float` DEFAULT: `5.0` |
| `cpu_threshold` | CPU percentage that triggers a breach. TYPE: `float` DEFAULT: `85.0` |
| `memory_threshold` | Memory percentage that triggers a breach. TYPE: `float` DEFAULT: `85.0` |
| `on_threshold_breach` | Optional breach callback. TYPE: `Callable[[str, float], None] \| None` DEFAULT: `None` |
start
async
¶
start() -> Result[None, Exception]
Start the resource watcher.
| RETURNS | DESCRIPTION |
|---|---|
| `Result[None, Exception]` | `Ok(None)` on success, or `Err` if the watcher failed to start. |
validate_config
¶
validate_config(
data: dict[str, object], model: type[BaseModel]
) -> Result[BaseModel, Exception]
Validate a data dictionary against a Pydantic model.
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | Raw configuration data. TYPE: `dict[str, object]` |
| `model` | Pydantic model class to validate against. TYPE: `type[BaseModel]` |
| RETURNS | DESCRIPTION |
|---|---|
| `Result[BaseModel, Exception]` | `Ok` with the validated model instance, or `Err` on failure. |
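The "never raise, return a Result" contract can be sketched without Pydantic. Here `Ok`, `Err`, `validate`, and the `settings` validator are all hypothetical stand-ins for the real Result type and BaseModel validation:

```python
from dataclasses import dataclass

@dataclass
class Ok:
    value: object

@dataclass
class Err:
    error: Exception

def validate(data: dict, validator):
    # Wrap any validation exception in Err instead of letting it propagate.
    try:
        return Ok(validator(data))
    except Exception as exc:
        return Err(exc)

def settings(data: dict) -> dict:
    # stand-in for BaseModel validation: require an int port
    if not isinstance(data.get("port"), int):
        raise ValueError("port must be an int")
    return data

good = validate({"port": 8080}, settings)
bad = validate({"port": "oops"}, settings)
```

Callers then branch on the variant rather than wrapping every call site in try/except.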
check_all
async
¶
check_all(
targets: Sequence[HealthTarget],
) -> Result[dict[str, bool], Exception]
Run health checks for all targets concurrently.
| PARAMETER | DESCRIPTION |
|---|---|
| `targets` | Targets to check. TYPE: `Sequence[HealthTarget]` |
| RETURNS | DESCRIPTION |
|---|---|
| `Result[dict[str, bool], Exception]` | `Ok` with a mapping of target name to health status, or `Err` on failure. |
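Running the checks concurrently is typically done with `asyncio.gather`; a minimal sketch (the `run_checks` helper is hypothetical, and treating a raised exception as "unhealthy" is an assumption here):

```python
import asyncio

async def run_checks(checks: dict) -> dict:
    # Launch every health check concurrently and zip names back to
    # results; return_exceptions=True keeps one failing check from
    # cancelling the rest, and a raised exception counts as unhealthy.
    results = await asyncio.gather(
        *(check() for check in checks.values()), return_exceptions=True
    )
    return {name: (res is True) for name, res in zip(checks.keys(), results)}

async def db() -> bool:
    return True

async def cache() -> bool:
    raise ConnectionError("down")

statuses = asyncio.run(run_checks({"db": db, "cache": cache}))
```

With `return_exceptions=True`, the `ConnectionError` arrives as a result value instead of propagating, so the mapping is always complete.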
check_target
async
¶
check_target(
target: HealthTarget,
) -> Result[bool, Exception]
Run a single health check.
| PARAMETER | DESCRIPTION |
|---|---|
| `target` | The target to check. TYPE: `HealthTarget` |
| RETURNS | DESCRIPTION |
|---|---|
| `Result[bool, Exception]` | `Ok` with the health status, or `Err` if the check raised. |
check_resources
¶
check_resources() -> Result[ResourceSnapshot, Exception]
Take a one-shot resource reading.
| RETURNS | DESCRIPTION |
|---|---|
| `Result[ResourceSnapshot, Exception]` | `Ok` with the snapshot, or `Err` if resource metrics are unavailable. |