
Resilience

TaipanStack's resilience module provides components for ensuring system stability and self-healing under failure conditions.

taipanstack.resilience

Resilience module for TaipanStack.

Provides circuit breaker, retry, fallback, and timeout patterns for building resilient applications. This is the canonical home for all resilience-related utilities.

AdaptiveCircuitBreaker

AdaptiveCircuitBreaker(
    name: str = "default",
    *,
    window_size: int = 100,
    min_throughput: int = 10,
    target_error_rate: float = 0.5,
    recovery_timeout: float = 30.0,
)

Circuit breaker that opens based on an error rate percentage.

Maintains a rolling window of call outcomes. The circuit trips to OPEN when both conditions hold:

1. The window_size history has at least min_throughput events.
2. The error rate (errors / total) > target_error_rate.

Once OPEN, it waits recovery_timeout seconds before transitioning to HALF_OPEN. In HALF_OPEN, if a request succeeds, it CLOSES and clears the window. If it fails, it returns to OPEN.

PARAMETER DESCRIPTION
name

Identifier for logging.

TYPE: str DEFAULT: 'default'

window_size

Number of recent calls to track.

TYPE: int DEFAULT: 100

min_throughput

Minimum requests before considering error rate.

TYPE: int DEFAULT: 10

target_error_rate

Desired error rate boundary (0.0 - 1.0).

TYPE: float DEFAULT: 0.5

recovery_timeout

Seconds before attempting half-open recovery.

TYPE: float DEFAULT: 30.0

Initialize the adaptive circuit breaker.
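The trip and recovery rules described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the TaipanStack implementation: the class name RollingBreakerSketch, the State enum, and the injectable clock argument are all hypothetical, and calls recorded while OPEN are simply ignored here.

```python
import time
from collections import deque
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class RollingBreakerSketch:
    """Hypothetical stand-in that mirrors the documented rules only."""

    def __init__(self, window_size=100, min_throughput=10,
                 target_error_rate=0.5, recovery_timeout=30.0,
                 clock=time.monotonic):
        self._window = deque(maxlen=window_size)  # True marks a failure
        self._min_throughput = min_throughput
        self._target = target_error_rate
        self._recovery_timeout = recovery_timeout
        self._clock = clock
        self._state = State.CLOSED
        self._opened_at = 0.0

    @property
    def state(self):
        # Reading the state may promote OPEN to HALF_OPEN after the timeout.
        if (self._state is State.OPEN
                and self._clock() - self._opened_at >= self._recovery_timeout):
            self._state = State.HALF_OPEN
        return self._state

    def record_success(self):
        current = self.state
        if current is State.OPEN:
            return  # assumption: outcomes are ignored while OPEN
        if current is State.HALF_OPEN:
            # A successful probe closes the circuit and clears the window.
            self._state = State.CLOSED
            self._window.clear()
            return
        self._window.append(False)

    def record_failure(self):
        current = self.state
        if current is State.OPEN:
            return
        if current is State.HALF_OPEN:
            self._trip()  # a failed probe goes straight back to OPEN
            return
        self._window.append(True)
        total = len(self._window)
        if (total >= self._min_throughput
                and sum(self._window) / total > self._target):
            self._trip()

    def _trip(self):
        self._state = State.OPEN
        self._opened_at = self._clock()
```

Passing a fake clock makes the recovery timeout testable without sleeping.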

state property

state: CircuitState

Current circuit state. May evaluate timeouts and switch to HALF_OPEN.

metrics property

metrics: AdaptiveMetrics

Snapshot of current adaptive metrics.

record_success

record_success() -> None

Record a successful call.

record_failure

record_failure(_exc: Exception) -> None

Record a failed call.

PARAMETER DESCRIPTION
_exc

The exception that occurred.

TYPE: Exception

evaluate_result

evaluate_result(
    result: Result[T, Exception],
) -> Result[T, Exception]

Evaluate a Result and record success or failure.

PARAMETER DESCRIPTION
result

A Result to evaluate.

TYPE: Result[T, Exception]

RETURNS DESCRIPTION
Result[T, Exception]

The original Result.

should_allow

should_allow() -> bool

Check if a call should be attempted.

RETURNS DESCRIPTION
bool

True if the circuit permits a call.

reset

reset() -> None

Reset the breaker and window.

AdaptiveMetrics dataclass

AdaptiveMetrics(
    success_rate: float,
    error_rate: float,
    total_calls: int,
    error_count: int,
    state: CircuitState,
)

Snapshot of adaptive circuit breaker metrics.

ATTRIBUTE DESCRIPTION
success_rate

Current success rate (0.0 - 1.0).

TYPE: float

error_rate

Current error rate (0.0 - 1.0).

TYPE: float

total_calls

Total calls in the window.

TYPE: int

error_count

Errors in the window.

TYPE: int

state

Current circuit state.

TYPE: CircuitState

AdaptiveRetry

AdaptiveRetry(
    *,
    min_delay: float = 0.1,
    max_delay: float = 60.0,
    window_size: int = 50,
    max_attempts: int = 3,
)

Retry strategy that learns optimal delays from outcomes.

Maintains per-attempt-level statistics and returns the delay that historically led to successful retries at that attempt level.

PARAMETER DESCRIPTION
min_delay

Minimum delay in seconds.

TYPE: float DEFAULT: 0.1

max_delay

Maximum delay in seconds.

TYPE: float DEFAULT: 60.0

window_size

Number of recent outcomes to track.

TYPE: int DEFAULT: 50

max_attempts

Default max attempts for to_retry_config().

TYPE: int DEFAULT: 3

Example

ar = AdaptiveRetry(min_delay=0.1, max_delay=30.0)
ar.record_outcome(attempt=1, success=True, elapsed=0.5)
delay = ar.get_delay(attempt=1)

Initialize the adaptive retry.

PARAMETER DESCRIPTION
min_delay

Minimum delay.

TYPE: float DEFAULT: 0.1

max_delay

Maximum delay.

TYPE: float DEFAULT: 60.0

window_size

Rolling window size.

TYPE: int DEFAULT: 50

max_attempts

Default max attempts.

TYPE: int DEFAULT: 3

metrics property

metrics: RetryMetrics

Snapshot of current adaptive retry metrics.

record_outcome

record_outcome(
    attempt: int, success: bool, elapsed: float
) -> None

Record a retry outcome.

PARAMETER DESCRIPTION
attempt

Attempt number (1-indexed).

TYPE: int

success

Whether the attempt succeeded.

TYPE: bool

elapsed

Time elapsed before this attempt was made.

TYPE: float

get_delay

get_delay(attempt: int) -> float

Get the learned optimal delay for this attempt level.

If there is historical data for this attempt level, returns the median of successful delays. Otherwise uses exponential backoff with the configured bounds.

PARAMETER DESCRIPTION
attempt

Attempt number (1-indexed).

TYPE: int

RETURNS DESCRIPTION
float

Delay in seconds.
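The selection rule for get_delay can be sketched as a pure function. This is a rough illustration only: the function name learned_delay and the successful_delays mapping are hypothetical, and both the backoff base of 2 and the clamping of the median to the configured bounds are assumptions, since the text above does not specify them.

```python
from statistics import median


def learned_delay(attempt, successful_delays, min_delay=0.1, max_delay=60.0):
    """Pick a delay for a 1-indexed attempt.

    successful_delays maps an attempt number to the list of delays (seconds)
    that preceded a successful retry at that attempt level.
    """
    history = successful_delays.get(attempt, [])
    if history:
        # Historical data exists: use the median of successful delays.
        candidate = median(history)
    else:
        # No data yet: exponential backoff (base 2 is an assumption).
        candidate = min_delay * 2 ** (attempt - 1)
    # Keep the result inside the configured bounds.
    return max(min_delay, min(max_delay, candidate))
```

The median is less sensitive than the mean to a single unusually long delay that happened to succeed.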

to_retry_config

to_retry_config() -> RetryConfig

Export current state as a standard RetryConfig.

Uses the learned initial delay (attempt=1) if available.

RETURNS DESCRIPTION
RetryConfig

A RetryConfig snapshot.

Bulkhead

Bulkhead(
    name: str = "default",
    *,
    max_concurrent: int = 10,
    max_queue: int = 50,
    timeout: float = 30.0,
)

Concurrency limiter using asyncio.Semaphore.

Limits the number of concurrent executions of a callable. Excess callers are queued up to max_queue; beyond that a BulkheadFullError is returned.

PARAMETER DESCRIPTION
name

Identifier for logging.

TYPE: str DEFAULT: 'default'

max_concurrent

Maximum concurrent executions.

TYPE: int DEFAULT: 10

max_queue

Maximum queued callers beyond concurrent limit.

TYPE: int DEFAULT: 50

timeout

Seconds to wait for a permit before timing out.

TYPE: float DEFAULT: 30.0

Example

bulk = Bulkhead("db", max_concurrent=5, max_queue=10)
result = await bulk.execute(fetch_data, user_id)

Initialize the bulkhead.

PARAMETER DESCRIPTION
name

Bulkhead name.

TYPE: str DEFAULT: 'default'

max_concurrent

Concurrency limit.

TYPE: int DEFAULT: 10

max_queue

Queue limit.

TYPE: int DEFAULT: 50

timeout

Permit acquisition timeout.

TYPE: float DEFAULT: 30.0
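A semaphore-based limiter with a bounded wait queue might look roughly like the sketch below. It is not the library's code: BulkheadSketch and QueueFull are hypothetical names, and for brevity the sketch raises exceptions instead of returning a Result as the documented execute does.

```python
import asyncio


class QueueFull(Exception):
    """Hypothetical stand-in for BulkheadFullError."""


class BulkheadSketch:
    def __init__(self, max_concurrent=10, max_queue=50, timeout=30.0):
        self._sem = asyncio.Semaphore(max_concurrent)
        self._max_queue = max_queue
        self._timeout = timeout
        self._queued = 0  # callers currently waiting for a permit

    async def execute(self, fn, *args, **kwargs):
        # Reject immediately when no permit is free and the queue is full.
        if self._sem.locked() and self._queued >= self._max_queue:
            raise QueueFull("bulkhead queue is at capacity")
        self._queued += 1
        try:
            # Wait for a permit, but give up after the configured timeout.
            await asyncio.wait_for(self._sem.acquire(), self._timeout)
        finally:
            self._queued -= 1
        try:
            return await fn(*args, **kwargs)
        finally:
            self._sem.release()
```

Create the instance inside a running event loop so the semaphore is bound to the loop that will use it on older Python versions.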

available_permits property

available_permits: int

Number of available concurrency permits.

queued property

queued: int

Number of callers currently waiting in the queue.

active property

active: int

Number of currently executing tasks.

execute async

execute(
    fn: Callable[P, Awaitable[T]],
    *args: args,
    **kwargs: kwargs,
) -> Result[T, Exception]

Execute a callable within bulkhead limits.

PARAMETER DESCRIPTION
fn

Async callable to execute.

TYPE: Callable[P, Awaitable[T]]

*args

Positional arguments for fn.

TYPE: args DEFAULT: ()

**kwargs

Keyword arguments for fn.

TYPE: kwargs DEFAULT: {}

RETURNS DESCRIPTION
Result[T, Exception]

Ok(result) on success, Err on failure.

BulkheadFullError

BulkheadFullError(
    name: str, max_concurrent: int, max_queue: int
)

Bases: Exception

Raised when the bulkhead queue is at capacity.

Initialize BulkheadFullError.

PARAMETER DESCRIPTION
name

Bulkhead name.

TYPE: str

max_concurrent

Concurrency limit.

TYPE: int

max_queue

Queue limit.

TYPE: int

ResilienceOrchestrator

ResilienceOrchestrator(name: str = 'default')

Bases: Generic[T]

Compose resilience patterns into a single pipeline.

Provides a fluent builder API to add patterns in order. Execution proceeds through each configured layer.

PARAMETER DESCRIPTION
name

Pipeline name for logging.

TYPE: str DEFAULT: 'default'

Example

orch = (
    ResilienceOrchestrator("api")
    .with_bulkhead(max_concurrent=5)
    .with_circuit_breaker(breaker)
    .with_retry(RetryConfig(max_attempts=3))
    .with_timeout(10.0)
    .with_fallback({"status": "cached"})
)
result = await orch.execute(call_api, endpoint)

Initialize the orchestrator.

PARAMETER DESCRIPTION
name

Pipeline name.

TYPE: str DEFAULT: 'default'

with_bulkhead

with_bulkhead(
    max_concurrent: int = 10,
    max_queue: int = 50,
    timeout: float = 30.0,
) -> ResilienceOrchestrator[T]

Add a bulkhead concurrency limiter.

PARAMETER DESCRIPTION
max_concurrent

Max concurrent executions.

TYPE: int DEFAULT: 10

max_queue

Max queued callers.

TYPE: int DEFAULT: 50

timeout

Permit acquisition timeout.

TYPE: float DEFAULT: 30.0

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_circuit_breaker

with_circuit_breaker(
    breaker: CircuitBreaker | AdaptiveCircuitBreaker,
) -> ResilienceOrchestrator[T]

Add a circuit breaker.

PARAMETER DESCRIPTION
breaker

Standard or adaptive circuit breaker.

TYPE: CircuitBreaker | AdaptiveCircuitBreaker

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_retry

with_retry(
    config: RetryConfig | AdaptiveRetry,
) -> ResilienceOrchestrator[T]

Add retry logic.

PARAMETER DESCRIPTION
config

Standard retry config or adaptive retry.

TYPE: RetryConfig | AdaptiveRetry

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_timeout

with_timeout(seconds: float) -> ResilienceOrchestrator[T]

Add a timeout.

PARAMETER DESCRIPTION
seconds

Maximum execution time.

TYPE: float

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_fallback

with_fallback(value: T) -> ResilienceOrchestrator[T]

Add a fallback value for failures.

PARAMETER DESCRIPTION
value

Value to return on failure.

TYPE: T

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

execute async

execute(
    fn: Callable[P, Awaitable[T]],
    *args: args,
    **kwargs: kwargs,
) -> Result[T, Exception]

Execute the function through the resilience pipeline.

Order: bulkhead → circuit breaker → retry → timeout → fn → fallback.

PARAMETER DESCRIPTION
fn

Async callable to execute.

TYPE: Callable[P, Awaitable[T]]

*args

Positional arguments.

TYPE: args DEFAULT: ()

**kwargs

Keyword arguments.

TYPE: kwargs DEFAULT: {}

RETURNS DESCRIPTION
Result[T, Exception]

Ok(result) on success, Err on failure.
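To make the layer ordering concrete, here is a reduced sketch of the inner part of such a pipeline (retry, then timeout, then fn, with the fallback applied last). The function run_pipeline and its parameters are hypothetical; the bulkhead and circuit breaker layers are left out, no delay is applied between attempts, and a plain value is returned instead of a Result.

```python
import asyncio


async def run_pipeline(fn, *args, attempts=3, timeout_s=1.0, fallback=None):
    """Hypothetical sketch: retry -> timeout -> fn, then fallback."""
    last_error = None
    for _ in range(attempts):  # retry layer
        try:
            # Timeout layer wraps each individual attempt.
            return await asyncio.wait_for(fn(*args), timeout_s)
        except Exception as exc:  # also covers asyncio.TimeoutError
            last_error = exc
    if fallback is not None:  # fallback layer runs only after retries fail
        return fallback
    raise last_error
```

Because the timeout sits inside the retry loop, each attempt gets its own time budget rather than sharing one across all attempts.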

RetryMetrics dataclass

RetryMetrics(
    success_rate: float,
    avg_delay: float,
    p95_delay: float,
    total_outcomes: int,
)

Snapshot of adaptive retry metrics.

ATTRIBUTE DESCRIPTION
success_rate

Overall success rate (0.0 - 1.0).

TYPE: float

avg_delay

Average delay across all successful retries.

TYPE: float

p95_delay

95th percentile delay.

TYPE: float

total_outcomes

Total tracked outcomes.

TYPE: int

CircuitBreaker

CircuitBreaker(
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
    name: str = "default",
    on_state_change: Callable[
        [CircuitState, CircuitState], None
    ]
    | None = None,
)

Circuit breaker implementation.

Monitors function calls and opens the circuit when too many failures occur, preventing further calls until the service recovers. Supports both sync and async functions.

Example

breaker = CircuitBreaker(failure_threshold=3)

@breaker
def call_external_api():
    return requests.get("https://api.example.com", timeout=10)

Initialize CircuitBreaker.

PARAMETER DESCRIPTION
failure_threshold

Failures before opening circuit.

TYPE: int DEFAULT: 5

success_threshold

Successes to close from half-open.

TYPE: int DEFAULT: 2

timeout

Seconds before attempting half-open.

TYPE: float DEFAULT: 30.0

excluded_exceptions

Exceptions that don't trip circuit.

TYPE: tuple[type[Exception], ...] DEFAULT: ()

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

name

Name for logging/identification.

TYPE: str DEFAULT: 'default'

on_state_change

Optional callback invoked on state transitions with (old_state, new_state). Useful for custom monitoring.

TYPE: Callable[[CircuitState, CircuitState], None] | None DEFAULT: None
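How failure_threshold, success_threshold, and timeout interact can be sketched with a small count-based decorator. The names count_breaker and CircuitOpen are hypothetical, the sketch handles synchronous functions only, and resetting the failure count after a success in the closed state is an assumption rather than documented behaviour.

```python
import time
from functools import wraps


class CircuitOpen(Exception):
    """Hypothetical stand-in for CircuitBreakerError."""


def count_breaker(failure_threshold=5, success_threshold=2, timeout=30.0,
                  clock=time.monotonic):
    state = {"name": "closed", "failures": 0, "successes": 0, "opened_at": 0.0}

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if state["name"] == "open":
                if clock() - state["opened_at"] < timeout:
                    raise CircuitOpen("circuit is open")
                # Timeout elapsed: let probe calls through.
                state.update(name="half_open", successes=0)
            try:
                result = func(*args, **kwargs)
            except Exception:
                state["failures"] += 1
                if (state["name"] == "half_open"
                        or state["failures"] >= failure_threshold):
                    state.update(name="open", opened_at=clock())
                raise
            if state["name"] == "half_open":
                state["successes"] += 1
                if state["successes"] >= success_threshold:
                    state.update(name="closed", failures=0)
            else:
                state["failures"] = 0  # assumption: success resets the count
            return result

        wrapper.state = state  # exposed for inspection in this sketch
        return wrapper

    return decorator
```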

state property

state: CircuitState

Get current circuit state.

failure_count property

failure_count: int

Get current failure count.

reset

reset() -> None

Reset circuit breaker to closed state.

__call__

__call__(
    func: Callable[P, R] | Callable[P, Awaitable[R]],
) -> Callable[P, R] | Callable[P, Awaitable[R]]

Decorate a sync or async function with circuit breaker protection.

CircuitBreakerConfig dataclass

CircuitBreakerConfig(
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
)

Configuration for circuit breaker behavior.

ATTRIBUTE DESCRIPTION
failure_threshold

Number of failures before opening circuit.

TYPE: int

success_threshold

Successes needed in half-open to close.

TYPE: int

timeout

Seconds before trying half-open after open.

TYPE: float

excluded_exceptions

Exceptions that don't count as failures.

TYPE: tuple[type[Exception], ...]

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...]

__post_init__

__post_init__() -> None

Validate configuration values.

CircuitBreakerError

CircuitBreakerError(message: str, state: CircuitState)

Bases: Exception

Raised when circuit breaker is open.

Initialize CircuitBreakerError.

PARAMETER DESCRIPTION
message

Error description.

TYPE: str

state

Current circuit state.

TYPE: CircuitState

CircuitBreakerState dataclass

CircuitBreakerState(
    state: CircuitState = CLOSED,
    failure_count: int = 0,
    success_count: int = 0,
    half_open_attempts: int = 0,
    last_failure_time: float = 0.0,
    lock: Lock = Lock(),
)

Internal state tracking for circuit breaker.

CircuitState

Bases: Enum

States of the circuit breaker.

Retrier

Retrier(
    *,
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    on: tuple[type[Exception], ...] = (Exception,),
)

Context manager for retry logic.

Provides a context manager interface for retry logic when decorators are not suitable.

Example

retrier = Retrier(max_attempts=3, on=(ConnectionError,))
with retrier:
    result = some_operation()

Initialize Retrier.

PARAMETER DESCRIPTION
max_attempts

Maximum retry attempts.

TYPE: int DEFAULT: 3

initial_delay

Initial delay between retries.

TYPE: float DEFAULT: 1.0

max_delay

Maximum delay between retries.

TYPE: float DEFAULT: 60.0

on

Exception types to retry on.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

__enter__

__enter__() -> Retrier

Enter the retry context.

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> bool

Exit the retry context.

Returns True to suppress the exception if we should retry, False to let it propagate.
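The suppress-or-propagate contract of __exit__ can be illustrated with a stripped-down context manager. Everything here is a hypothetical sketch: RetrierSketch and run_with_retries are invented names, delays between attempts are omitted, and the surrounding loop is one assumed way of re-running the body, since a with block cannot repeat itself.

```python
class RetrierSketch:
    def __init__(self, max_attempts=3, on=(Exception,)):
        self.max_attempts = max_attempts
        self.on = on
        self.attempt = 0

    def __enter__(self):
        self.attempt += 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            return False  # body succeeded, nothing to suppress
        retryable = issubclass(exc_type, self.on)
        # Suppress only retryable errors while attempts remain.
        return retryable and self.attempt < self.max_attempts


def run_with_retries(operation, retrier):
    """Re-enter the context until the body succeeds or an error escapes."""
    while True:
        with retrier:
            return operation()
```

When __exit__ returns True the exception is swallowed and the loop starts another attempt; when it returns False the exception leaves the with block and ends the loop.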

RetryConfig dataclass

RetryConfig(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    jitter_factor: float = 0.1,
    log_retries: bool = True,
    on_retry: Callable[[int, int, Exception, float], None]
    | None = None,
)

Configuration for retry behavior.

ATTRIBUTE DESCRIPTION
max_attempts

Maximum number of retry attempts.

TYPE: int

initial_delay

Initial delay between retries in seconds.

TYPE: float

max_delay

Maximum delay between retries.

TYPE: float

exponential_base

Base for exponential backoff (2 = double each time).

TYPE: float

jitter

Whether to add random jitter to delays.

TYPE: bool

jitter_factor

Maximum jitter as fraction of delay (0.1 = 10%).

TYPE: float

log_retries

Whether to emit standard log messages.

TYPE: bool

on_retry

Optional callback invoked on each retry.

TYPE: Callable[[int, int, Exception, float], None] | None

__post_init__

__post_init__() -> None

Validate configuration parameters.

RetryError

RetryError(
    message: str,
    attempts: int,
    last_exception: Exception | None = None,
)

Bases: Exception

Raised when all retry attempts have failed.

Initialize RetryError.

PARAMETER DESCRIPTION
message

Description of the retry failure.

TYPE: str

attempts

Number of attempts made.

TYPE: int

last_exception

The last exception that was raised.

TYPE: Exception | None DEFAULT: None

BaseWatcher

BaseWatcher(*, interval: float = 5.0)

Bases: ABC

Abstract base for background watchdog tasks.

Subclasses implement _run, which is called repeatedly, once every interval seconds, until stop is called.

PARAMETER DESCRIPTION
interval

Seconds between each poll cycle.

TYPE: float DEFAULT: 5.0

Example

class MyWatcher(BaseWatcher):
    async def _run(self) -> None:
        print("checking...")

watcher = MyWatcher(interval=5.0)
await watcher.start()

Initialize the base watcher.

PARAMETER DESCRIPTION
interval

Seconds between each poll cycle.

TYPE: float DEFAULT: 5.0
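A background polling loop of this shape can be sketched with an asyncio task and a stop event. The class WatcherSketch is hypothetical and simplified: start returns a bool here instead of a Result, and errors raised by _run are not handled.

```python
import asyncio
from abc import ABC, abstractmethod


class WatcherSketch(ABC):
    def __init__(self, *, interval=5.0):
        self._interval = interval
        self._task = None
        self._stop = None

    @property
    def is_running(self):
        return self._task is not None and not self._task.done()

    @abstractmethod
    async def _run(self):
        """One poll cycle, supplied by the subclass."""

    async def _loop(self):
        while not self._stop.is_set():
            await self._run()
            try:
                # Sleep for one interval, but wake early if stop() is called.
                await asyncio.wait_for(self._stop.wait(), self._interval)
            except asyncio.TimeoutError:
                pass

    async def start(self):
        if self.is_running:
            return False  # already running
        self._stop = asyncio.Event()
        self._task = asyncio.create_task(self._loop())
        return True

    async def stop(self):
        if self._task is None:
            return
        self._stop.set()
        await self._task
        self._task = None
```

Waiting on the stop event instead of calling asyncio.sleep lets stop return promptly even when the interval is long.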

is_running property

is_running: bool

Return True if the background task is active.

start async

start() -> Result[None, Exception]

Start the background watcher loop.

RETURNS DESCRIPTION
Result[None, Exception]

Ok(None) on success, Err if already running.

stop async

stop() -> None

Signal the watcher to stop and wait for it to finish.

__repr__

__repr__() -> str

Return a developer-friendly representation.

ConfigWatcher

ConfigWatcher(
    *,
    config_paths: Sequence[Path],
    config_model: type[BaseModel],
    interval: float = 2.0,
    on_config_change: Callable[[BaseModel], None]
    | None = None,
    on_validation_error: Callable[[Exception], None]
    | None = None,
)

Bases: BaseWatcher

Background watcher that detects configuration file changes.

Polls file hashes at each interval. When a change is detected the content is validated via the provided Pydantic model and, if valid, the on_config_change callback is invoked.

PARAMETER DESCRIPTION
config_paths

Files to watch.

TYPE: Sequence[Path]

config_model

Pydantic model for validation.

TYPE: type[BaseModel]

interval

Seconds between polls.

TYPE: float DEFAULT: 2.0

on_config_change

Callback receiving the validated model.

TYPE: Callable[[BaseModel], None] | None DEFAULT: None

on_validation_error

Callback receiving the Exception when validation fails.

TYPE: Callable[[Exception], None] | None DEFAULT: None

Example

watcher = ConfigWatcher(
    config_paths=[Path(".env")],
    config_model=MySettings,
    on_config_change=lambda cfg: apply(cfg),
)
await watcher.start()

Initialize the config watcher.

PARAMETER DESCRIPTION
config_paths

Files to watch.

TYPE: Sequence[Path]

config_model

Pydantic model for validation.

TYPE: type[BaseModel]

interval

Seconds between polls.

TYPE: float DEFAULT: 2.0

on_config_change

Callback for valid config changes.

TYPE: Callable[[BaseModel], None] | None DEFAULT: None

on_validation_error

Callback for validation failures.

TYPE: Callable[[Exception], None] | None DEFAULT: None
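Hash-based change detection, the first half of what the watcher does on each poll, can be sketched with the standard library alone. The helper names file_digest and changed_paths are hypothetical, SHA-256 is an assumed choice of hash, and the Pydantic validation step is deliberately left out.

```python
import hashlib
from pathlib import Path


def file_digest(path):
    """Return the SHA-256 hex digest of a file, or None if it is unreadable."""
    try:
        return hashlib.sha256(Path(path).read_bytes()).hexdigest()
    except OSError:
        return None


def changed_paths(paths, known):
    """Return paths whose digest differs from `known`, updating it in place."""
    changed = []
    for path in paths:
        digest = file_digest(path)
        if known.get(path) != digest:
            known[path] = digest
            changed.append(path)
    return changed
```

In this sketch the first poll reports every existing file as changed, because no digest has been recorded yet.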

HealthPinger

HealthPinger(
    *,
    targets: Sequence[HealthTarget],
    interval: float = 10.0,
    on_health_change: Callable[[str, bool], None]
    | None = None,
)

Bases: BaseWatcher

Background watcher that pings external dependencies.

For each registered HealthTarget, calls its check coroutine on every cycle. If a target is unhealthy and has an associated CircuitBreaker, the breaker is opened preventively.

PARAMETER DESCRIPTION
targets

Dependencies to monitor.

TYPE: Sequence[HealthTarget]

interval

Seconds between ping cycles.

TYPE: float DEFAULT: 10.0

on_health_change

Optional callback (name, is_healthy).

TYPE: Callable[[str, bool], None] | None DEFAULT: None

Example

async def db_ping() -> bool:
    return await pool.fetchval("SELECT 1") == 1

pinger = HealthPinger(
    targets=[HealthTarget("db", db_ping, breaker)],
)
await pinger.start()

Initialize the health pinger.

PARAMETER DESCRIPTION
targets

Dependencies to monitor.

TYPE: Sequence[HealthTarget]

interval

Seconds between ping cycles.

TYPE: float DEFAULT: 10.0

on_health_change

Optional callback on status change.

TYPE: Callable[[str, bool], None] | None DEFAULT: None
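A single ping cycle with change notification might be sketched as follows. The function ping_cycle and its dictionary-based arguments are hypothetical, the breaker-opening step is omitted, and two behaviours are assumptions: a check that raises counts as unhealthy, and the first observation of a target is reported as a change.

```python
import asyncio


async def ping_cycle(targets, last_status, on_health_change=None):
    """Run every check once.

    targets maps a name to an async check returning bool;
    last_status is a dict of previous results, mutated in place.
    """
    for name, check in targets.items():
        try:
            healthy = bool(await check())
        except Exception:
            healthy = False  # assumption: a raising check is unhealthy
        if last_status.get(name) != healthy:
            last_status[name] = healthy
            if on_health_change is not None:
                on_health_change(name, healthy)
    return last_status
```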

HealthTarget dataclass

HealthTarget(
    name: str,
    check: Callable[[], Awaitable[bool]],
    circuit_breaker: CircuitBreaker | None = None,
)

A dependency to be monitored by HealthPinger.

ATTRIBUTE DESCRIPTION
name

Human-readable name for logging.

TYPE: str

check

Async callable returning True if the target is healthy, False otherwise.

TYPE: Callable[[], Awaitable[bool]]

circuit_breaker

Optional circuit breaker to open on failure.

TYPE: CircuitBreaker | None

ResourceSnapshot dataclass

ResourceSnapshot(
    cpu_percent: float,
    memory_percent: float,
    timestamp: float,
)

Point-in-time snapshot of system resource usage.

ATTRIBUTE DESCRIPTION
cpu_percent

Current CPU utilisation (0-100).

TYPE: float

memory_percent

Current memory utilisation (0-100).

TYPE: float

timestamp

Monotonic timestamp of the reading.

TYPE: float

ResourceWatcher

ResourceWatcher(
    *,
    interval: float = 5.0,
    cpu_threshold: float = 85.0,
    memory_threshold: float = 85.0,
    on_threshold_breach: Callable[[str, float], None]
    | None = None,
)

Bases: BaseWatcher

Background watcher that monitors CPU and memory.

When either metric exceeds its configured threshold the on_threshold_breach callback is invoked with the resource name ("cpu" or "memory") and the current value.

PARAMETER DESCRIPTION
interval

Seconds between checks.

TYPE: float DEFAULT: 5.0

cpu_threshold

CPU percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

memory_threshold

Memory percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

on_threshold_breach

Optional callback (resource, value) -> None.

TYPE: Callable[[str, float], None] | None DEFAULT: None

Example

watcher = ResourceWatcher(
    cpu_threshold=80.0,
    on_threshold_breach=lambda r, v: print(f"{r} at {v}%"),
)
await watcher.start()

Initialize the resource watcher.

PARAMETER DESCRIPTION
interval

Seconds between checks.

TYPE: float DEFAULT: 5.0

cpu_threshold

CPU percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

memory_threshold

Memory percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

on_threshold_breach

Optional breach callback.

TYPE: Callable[[str, float], None] | None DEFAULT: None
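The threshold comparison itself is simple enough to show as a pure function. The name check_thresholds is hypothetical, and the strict greater-than comparison is an assumption based on the word "exceeds" above; reading the actual CPU and memory values (which the library does via psutil) is left out.

```python
def check_thresholds(cpu_percent, memory_percent,
                     cpu_threshold=85.0, memory_threshold=85.0,
                     on_threshold_breach=None):
    """Return the list of (resource, value) breaches and notify the callback."""
    found = []
    if cpu_percent > cpu_threshold:
        found.append(("cpu", cpu_percent))
    if memory_percent > memory_threshold:
        found.append(("memory", memory_percent))
    if on_threshold_breach is not None:
        for resource, value in found:
            on_threshold_breach(resource, value)
    return found
```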

start async

start() -> Result[None, Exception]

Start the resource watcher.

RETURNS DESCRIPTION
Result[None, Exception]

Err if psutil is not installed; otherwise delegates to BaseWatcher.start().

fallback

fallback(
    fallback_value: T,
    exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator

Provide a fallback value on failures.

If the wrapped function returns an Err() or raises a specified exception, the fallback value is returned wrapped in an Ok().

PARAMETER DESCRIPTION
fallback_value

The value to return on failure.

TYPE: T

exceptions

Exceptions to catch.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

RETURNS DESCRIPTION
FallbackDecorator

Decorator function.
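The two failure paths described above (an Err return value and a raised exception) can be sketched with a small decorator. The Ok and Err classes below are minimal hypothetical stand-ins for the library's Result type, and fallback_sketch handles synchronous functions only.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Exception


def fallback_sketch(fallback_value, exceptions=(Exception,)):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except exceptions:
                # A listed exception was raised: substitute the fallback.
                return Ok(fallback_value)
            if isinstance(result, Err):
                # The function reported failure via Err: substitute as well.
                return Ok(fallback_value)
            return result

        return wrapper

    return decorator
```

Exceptions outside the exceptions tuple still propagate to the caller.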

timeout

timeout(seconds: float) -> TimeoutDecorator

Enforce a maximum execution time.

If the execution time exceeds the specified limit, returns Err(TimeoutError).

PARAMETER DESCRIPTION
seconds

Maximum allowed execution time in seconds.

TYPE: float

RETURNS DESCRIPTION
TimeoutDecorator

Decorator function.
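For async callables, a timeout of this kind is commonly built on asyncio.wait_for. The sketch below assumes that approach; timeout_sketch is a hypothetical name, and a plain ("ok", value) or ("err", exception) tuple stands in for the library's Result type.

```python
import asyncio
from functools import wraps


def timeout_sketch(seconds):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                value = await asyncio.wait_for(func(*args, **kwargs), seconds)
            except asyncio.TimeoutError:
                # Report the timeout as a value instead of raising.
                return ("err", TimeoutError(f"exceeded {seconds}s"))
            return ("ok", value)

        return wrapper

    return decorator
```

asyncio.wait_for cancels the wrapped coroutine when the limit is reached, so the slow call does not keep running in the background.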

calculate_delay

calculate_delay(attempt: int, config: RetryConfig) -> float

Calculate delay before next retry.

PARAMETER DESCRIPTION
attempt

Current attempt number (1-indexed).

TYPE: int

config

Retry configuration.

TYPE: RetryConfig

RETURNS DESCRIPTION
float

Delay in seconds before next retry.
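Given the RetryConfig fields, a delay calculation of this kind can be sketched as capped exponential backoff with optional jitter. The function backoff_delay is hypothetical; the symmetric jitter (plus or minus jitter_factor of the delay) and the final clamp to max_delay are assumptions, as the exact formula is not given here.

```python
import random


def backoff_delay(attempt, initial_delay=1.0, max_delay=60.0,
                  exponential_base=2.0, jitter=True, jitter_factor=0.1,
                  rng=random):
    """Delay in seconds before the next retry (attempt is 1-indexed)."""
    # Grow the delay geometrically, then cap it.
    delay = min(initial_delay * exponential_base ** (attempt - 1), max_delay)
    if jitter:
        # Spread retries out by up to +/- jitter_factor of the delay.
        delay += delay * jitter_factor * rng.uniform(-1.0, 1.0)
    return max(0.0, min(max_delay, delay))
```

Jitter keeps many clients that failed at the same moment from retrying in lockstep.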

retry_on_exception

retry_on_exception(
    exception_types: tuple[type[Exception], ...],
    max_attempts: int = 3,
) -> RetryDecorator

Retry on specific exceptions.

A simpler alternative to the full retry decorator when you just need basic retry functionality.

PARAMETER DESCRIPTION
exception_types

Exception types to retry on.

TYPE: tuple[type[Exception], ...]

max_attempts

Maximum number of attempts.

TYPE: int DEFAULT: 3

RETURNS DESCRIPTION
RetryDecorator

Decorated function with retry logic.

Example

@retry_on_exception((ValueError,), max_attempts=2)
def parse_data(data: str) -> dict:
    return json.loads(data)


Adaptive Circuit Breaker

Adaptive Circuit Breaker — auto-tunes failure threshold via rolling window.

Unlike standard Circuit Breakers that use static absolute failure counts, the AdaptiveCircuitBreaker opens its circuit ONLY when the error rate exceeds a target percentage in a rolling window of recent calls AND a minimum throughput of requests has been met.

AdaptiveMetrics dataclass

AdaptiveMetrics(
    success_rate: float,
    error_rate: float,
    total_calls: int,
    error_count: int,
    state: CircuitState,
)

Snapshot of adaptive circuit breaker metrics.

ATTRIBUTE DESCRIPTION
success_rate

Current success rate (0.0 - 1.0).

TYPE: float

error_rate

Current error rate (0.0 - 1.0).

TYPE: float

total_calls

Total calls in the window.

TYPE: int

error_count

Errors in the window.

TYPE: int

state

Current circuit state.

TYPE: CircuitState

AdaptiveCircuitBreaker

AdaptiveCircuitBreaker(
    name: str = "default",
    *,
    window_size: int = 100,
    min_throughput: int = 10,
    target_error_rate: float = 0.5,
    recovery_timeout: float = 30.0,
)

Circuit breaker that opens based on an error rate percentage.

Maintains a rolling window of call outcomes. The circuit trips to OPEN when both conditions hold:

1. The window_size history has at least min_throughput events.
2. The error rate (errors / total) > target_error_rate.

Once OPEN, it waits recovery_timeout seconds before transitioning to HALF_OPEN. In HALF_OPEN, if a request succeeds, it CLOSES and clears the window. If it fails, it returns to OPEN.

PARAMETER DESCRIPTION
name

Identifier for logging.

TYPE: str DEFAULT: 'default'

window_size

Number of recent calls to track.

TYPE: int DEFAULT: 100

min_throughput

Minimum requests before considering error rate.

TYPE: int DEFAULT: 10

target_error_rate

Desired error rate boundary (0.0 - 1.0).

TYPE: float DEFAULT: 0.5

recovery_timeout

Seconds before attempting half-open recovery.

TYPE: float DEFAULT: 30.0

Initialize the adaptive circuit breaker.

state property

state: CircuitState

Current circuit state. May evaluate timeouts and switch to HALF_OPEN.

metrics property

metrics: AdaptiveMetrics

Snapshot of current adaptive metrics.

record_success

record_success() -> None

Record a successful call.

record_failure

record_failure(_exc: Exception) -> None

Record a failed call.

PARAMETER DESCRIPTION
_exc

The exception that occurred.

TYPE: Exception

evaluate_result

evaluate_result(
    result: Result[T, Exception],
) -> Result[T, Exception]

Evaluate a Result and record success or failure.

PARAMETER DESCRIPTION
result

A Result to evaluate.

TYPE: Result[T, Exception]

RETURNS DESCRIPTION
Result[T, Exception]

The original Result.

should_allow

should_allow() -> bool

Check if a call should be attempted.

RETURNS DESCRIPTION
bool

True if the circuit permits a call.

reset

reset() -> None

Reset the breaker and window.


Adaptive Retry

Adaptive Retry — learns optimal backoff from runtime outcomes.

Tracks recent retry outcomes in a rolling window and computes the best delay for each attempt level, favouring delays that historically led to successful retries.

RetryMetrics dataclass

RetryMetrics(
    success_rate: float,
    avg_delay: float,
    p95_delay: float,
    total_outcomes: int,
)

Snapshot of adaptive retry metrics.

ATTRIBUTE DESCRIPTION
success_rate

Overall success rate (0.0 - 1.0).

TYPE: float

avg_delay

Average delay across all successful retries.

TYPE: float

p95_delay

95th percentile delay.

TYPE: float

total_outcomes

Total tracked outcomes.

TYPE: int

AdaptiveRetry

AdaptiveRetry(
    *,
    min_delay: float = 0.1,
    max_delay: float = 60.0,
    window_size: int = 50,
    max_attempts: int = 3,
)

Retry strategy that learns optimal delays from outcomes.

Maintains per-attempt-level statistics and returns the delay that historically led to successful retries at that attempt level.

PARAMETER DESCRIPTION
min_delay

Minimum delay in seconds.

TYPE: float DEFAULT: 0.1

max_delay

Maximum delay in seconds.

TYPE: float DEFAULT: 60.0

window_size

Number of recent outcomes to track.

TYPE: int DEFAULT: 50

max_attempts

Default max attempts for to_retry_config().

TYPE: int DEFAULT: 3

Example

ar = AdaptiveRetry(min_delay=0.1, max_delay=30.0)
ar.record_outcome(attempt=1, success=True, elapsed=0.5)
delay = ar.get_delay(attempt=1)

Initialize the adaptive retry.

PARAMETER DESCRIPTION
min_delay

Minimum delay.

TYPE: float DEFAULT: 0.1

max_delay

Maximum delay.

TYPE: float DEFAULT: 60.0

window_size

Rolling window size.

TYPE: int DEFAULT: 50

max_attempts

Default max attempts.

TYPE: int DEFAULT: 3

metrics property

metrics: RetryMetrics

Snapshot of current adaptive retry metrics.

record_outcome

record_outcome(
    attempt: int, success: bool, elapsed: float
) -> None

Record a retry outcome.

PARAMETER DESCRIPTION
attempt

Attempt number (1-indexed).

TYPE: int

success

Whether the attempt succeeded.

TYPE: bool

elapsed

Time elapsed before this attempt was made.

TYPE: float

get_delay

get_delay(attempt: int) -> float

Get the learned optimal delay for this attempt level.

If there is historical data for this attempt level, returns the median of successful delays. Otherwise uses exponential backoff with the configured bounds.

PARAMETER DESCRIPTION
attempt

Attempt number (1-indexed).

TYPE: int

RETURNS DESCRIPTION
float

Delay in seconds.

to_retry_config

to_retry_config() -> RetryConfig

Export current state as a standard RetryConfig.

Uses the learned initial delay (attempt=1) if available.

RETURNS DESCRIPTION
RetryConfig

A RetryConfig snapshot.


Bulkhead

Bulkhead pattern — concurrency isolation via semaphore.

Limits the number of concurrent executions to prevent a single failing dependency from consuming all available resources.

BulkheadFullError

BulkheadFullError(
    name: str, max_concurrent: int, max_queue: int
)

Bases: Exception

Raised when the bulkhead queue is at capacity.

Initialize BulkheadFullError.

PARAMETER DESCRIPTION
name

Bulkhead name.

TYPE: str

max_concurrent

Concurrency limit.

TYPE: int

max_queue

Queue limit.

TYPE: int

Bulkhead

Bulkhead(
    name: str = "default",
    *,
    max_concurrent: int = 10,
    max_queue: int = 50,
    timeout: float = 30.0,
)

Concurrency limiter using asyncio.Semaphore.

Limits the number of concurrent executions of a callable. Excess callers are queued up to max_queue; beyond that a BulkheadFullError is returned.

PARAMETER DESCRIPTION
name

Identifier for logging.

TYPE: str DEFAULT: 'default'

max_concurrent

Maximum concurrent executions.

TYPE: int DEFAULT: 10

max_queue

Maximum queued callers beyond concurrent limit.

TYPE: int DEFAULT: 50

timeout

Seconds to wait for a permit before timing out.

TYPE: float DEFAULT: 30.0

Example

bulk = Bulkhead("db", max_concurrent=5, max_queue=10)
result = await bulk.execute(fetch_data, user_id)

Initialize the bulkhead.

PARAMETER DESCRIPTION
name

Bulkhead name.

TYPE: str DEFAULT: 'default'

max_concurrent

Concurrency limit.

TYPE: int DEFAULT: 10

max_queue

Queue limit.

TYPE: int DEFAULT: 50

timeout

Permit acquisition timeout.

TYPE: float DEFAULT: 30.0

available_permits property

available_permits: int

Number of available concurrency permits.

queued property

queued: int

Number of callers currently waiting in the queue.

active property

active: int

Number of currently executing tasks.

execute async

execute(
    fn: Callable[P, Awaitable[T]],
    *args: args,
    **kwargs: kwargs,
) -> Result[T, Exception]

Execute a callable within bulkhead limits.

PARAMETER DESCRIPTION
fn

Async callable to execute.

TYPE: Callable[P, Awaitable[T]]

*args

Positional arguments for fn.

TYPE: args DEFAULT: ()

**kwargs

Keyword arguments for fn.

TYPE: kwargs DEFAULT: {}

RETURNS DESCRIPTION
Result[T, Exception]

Ok(result) on success, Err on failure.


Orchestrator

Resilience Orchestrator — compose multiple patterns into a pipeline.

Provides a fluent builder to combine bulkhead, circuit breaker, retry, timeout, and fallback into a single execution pipeline.

Execution order: bulkhead → circuit breaker → retry → timeout → fn → fallback.

ResilienceOrchestrator

ResilienceOrchestrator(name: str = 'default')

Bases: Generic[T]

Compose resilience patterns into a single pipeline.

Provides a fluent builder API to add patterns in order. Execution proceeds through each configured layer.

PARAMETER DESCRIPTION
name

Pipeline name for logging.

TYPE: str DEFAULT: 'default'

Example

orch = (
    ResilienceOrchestrator("api")
    .with_bulkhead(max_concurrent=5)
    .with_circuit_breaker(breaker)
    .with_retry(RetryConfig(max_attempts=3))
    .with_timeout(10.0)
    .with_fallback({"status": "cached"})
)
result = await orch.execute(call_api, endpoint)

Initialize the orchestrator.

PARAMETER DESCRIPTION
name

Pipeline name.

TYPE: str DEFAULT: 'default'

with_bulkhead

with_bulkhead(
    max_concurrent: int = 10,
    max_queue: int = 50,
    timeout: float = 30.0,
) -> ResilienceOrchestrator[T]

Add a bulkhead concurrency limiter.

PARAMETER DESCRIPTION
max_concurrent

Max concurrent executions.

TYPE: int DEFAULT: 10

max_queue

Max queued callers.

TYPE: int DEFAULT: 50

timeout

Permit acquisition timeout.

TYPE: float DEFAULT: 30.0

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_circuit_breaker

with_circuit_breaker(
    breaker: CircuitBreaker | AdaptiveCircuitBreaker,
) -> ResilienceOrchestrator[T]

Add a circuit breaker.

PARAMETER DESCRIPTION
breaker

Standard or adaptive circuit breaker.

TYPE: CircuitBreaker | AdaptiveCircuitBreaker

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_retry

with_retry(
    config: RetryConfig | AdaptiveRetry,
) -> ResilienceOrchestrator[T]

Add retry logic.

PARAMETER DESCRIPTION
config

Standard retry config or adaptive retry.

TYPE: RetryConfig | AdaptiveRetry

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_timeout

with_timeout(seconds: float) -> ResilienceOrchestrator[T]

Add a timeout.

PARAMETER DESCRIPTION
seconds

Maximum execution time.

TYPE: float

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

with_fallback

with_fallback(value: T) -> ResilienceOrchestrator[T]

Add a fallback value for failures.

PARAMETER DESCRIPTION
value

Value to return on failure.

TYPE: T

RETURNS DESCRIPTION
ResilienceOrchestrator[T]

self for chaining.

execute async

execute(
    fn: Callable[P, Awaitable[T]],
    *args: args,
    **kwargs: kwargs,
) -> Result[T, Exception]

Execute the function through the resilience pipeline.

Order: bulkhead → circuit breaker → retry → timeout → fn → fallback.

PARAMETER DESCRIPTION
fn

Async callable to execute.

TYPE: Callable[P, Awaitable[T]]

*args

Positional arguments.

TYPE: args DEFAULT: ()

**kwargs

Keyword arguments.

TYPE: kwargs DEFAULT: {}

RETURNS DESCRIPTION
Result[T, Exception]

Ok(result) on success, Err on failure.
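To make the layering concrete, the inner part of the documented order (retry, then timeout, then fn, with fallback last) can be sketched with plain asyncio. This is a simplified illustration under stated assumptions, not the orchestrator's code: `run_pipeline` is a hypothetical helper, the bulkhead and circuit breaker layers are omitted, and it returns a bare value instead of a Result.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_pipeline(
    fn: Callable[[], Awaitable[T]],
    *,
    max_attempts: int,
    timeout: float,
    fallback: T,
) -> T:
    """Hypothetical helper: retry wraps timeout wraps fn; fallback is last."""
    for _attempt in range(max_attempts):
        try:
            # Timeout layer: bounds each individual call to fn.
            return await asyncio.wait_for(fn(), timeout)
        except Exception:
            # Retry layer: treat errors and timeouts alike and try again.
            continue
    # Fallback layer: reached only after every attempt has failed.
    return fallback


calls = {"flaky": 0, "slow": 0}


async def flaky() -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("transient failure")
    return "fresh"


async def slow() -> str:
    calls["slow"] += 1
    await asyncio.sleep(1.0)
    return "too late"


async def demo():
    a = await run_pipeline(flaky, max_attempts=3, timeout=0.5, fallback="cached")
    b = await run_pipeline(slow, max_attempts=2, timeout=0.01, fallback="cached")
    return a, b


recovered, degraded = asyncio.run(demo())
```

Because the timeout sits inside the retry loop, every attempt gets its own time budget, and the fallback value is used only once all attempts are exhausted.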


Watchdogs

Watchdog sub-package for TaipanStack resilience.

Provides background monitors that proactively detect and respond to system degradation: resource pressure, configuration drift, and dependency failures.

BaseWatcher

BaseWatcher(*, interval: float = 5.0)

Bases: ABC

Abstract base for background watchdog tasks.

Subclasses implement _run, which is called repeatedly, once every interval seconds, until stop is called.

PARAMETER DESCRIPTION
interval

Seconds between each poll cycle.

TYPE: float DEFAULT: 5.0

Example

class MyWatcher(BaseWatcher):
    async def _run(self) -> None:
        print("checking...")

watcher = MyWatcher(interval=5.0)
await watcher.start()


is_running property

is_running: bool

Return True if the background task is active.

start async

start() -> Result[None, Exception]

Start the background watcher loop.

RETURNS DESCRIPTION
Result[None, Exception]

Ok(None) on success, Err if already running.

stop async

stop() -> None

Signal the watcher to stop and wait for it to finish.

__repr__

__repr__() -> str

Return a developer-friendly representation.
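The start/stop lifecycle described for BaseWatcher can be pictured as a background task that calls _run, sleeps for one interval, and exits when a stop signal is set. The sketch below is an assumption about how such a loop might look, not the library's source: `MiniWatcher` and `CountingWatcher` are hypothetical, and `start` returns a bool where the documented API returns a Result.

```python
import asyncio
from abc import ABC, abstractmethod


class MiniWatcher(ABC):
    """Illustrative polling watcher; not the TaipanStack BaseWatcher."""

    def __init__(self, *, interval: float = 5.0) -> None:
        self._interval = interval
        self._task = None  # set by start()
        self._stop = None  # asyncio.Event, created inside the running loop

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    async def start(self) -> bool:
        """Return False if the watcher is already running."""
        if self.is_running:
            return False
        self._stop = asyncio.Event()
        self._task = asyncio.create_task(self._loop())
        return True

    async def stop(self) -> None:
        if self._task is None:
            return
        self._stop.set()
        await self._task  # wait for the loop to finish its current cycle
        self._task = None

    async def _loop(self) -> None:
        while not self._stop.is_set():
            await self._run()
            try:
                # Sleep for one interval, waking early if stop() is called.
                await asyncio.wait_for(self._stop.wait(), self._interval)
            except asyncio.TimeoutError:
                pass

    @abstractmethod
    async def _run(self) -> None: ...


class CountingWatcher(MiniWatcher):
    def __init__(self) -> None:
        super().__init__(interval=0.01)
        self.ticks = 0

    async def _run(self) -> None:
        self.ticks += 1


async def demo():
    watcher = CountingWatcher()
    first = await watcher.start()
    second = await watcher.start()  # refused: already running
    await asyncio.sleep(0.05)
    await watcher.stop()
    return first, second, watcher.ticks, watcher.is_running


first, second, ticks, running = asyncio.run(demo())
```

Waiting on the stop event with a timeout, rather than calling `asyncio.sleep`, lets `stop()` take effect immediately instead of after the remainder of the interval.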

ConfigWatcher

ConfigWatcher(
    *,
    config_paths: Sequence[Path],
    config_model: type[BaseModel],
    interval: float = 2.0,
    on_config_change: Callable[[BaseModel], None]
    | None = None,
    on_validation_error: Callable[[Exception], None]
    | None = None,
)

Bases: BaseWatcher

Background watcher that detects configuration file changes.

Polls file hashes at each interval. When a change is detected the content is validated via the provided Pydantic model and, if valid, the on_config_change callback is invoked.

PARAMETER DESCRIPTION
config_paths

Files to watch.

TYPE: Sequence[Path]

config_model

Pydantic model for validation.

TYPE: type[BaseModel]

interval

Seconds between polls.

TYPE: float DEFAULT: 2.0

on_config_change

Callback receiving the validated model.

TYPE: Callable[[BaseModel], None] | None DEFAULT: None

on_validation_error

Callback receiving the Exception when validation fails.

TYPE: Callable[[Exception], None] | None DEFAULT: None

Example

watcher = ConfigWatcher(
    config_paths=[Path(".env")],
    config_model=MySettings,
    on_config_change=lambda cfg: apply(cfg),
)
await watcher.start()

Initialize the config watcher.

PARAMETER DESCRIPTION
config_paths

Files to watch.

TYPE: Sequence[Path]

config_model

Pydantic model for validation.

TYPE: type[BaseModel]

interval

Seconds between polls.

TYPE: float DEFAULT: 2.0

on_config_change

Callback for valid config changes.

TYPE: Callable[[BaseModel], None] | None DEFAULT: None

on_validation_error

Callback for validation failures.

TYPE: Callable[[Exception], None] | None DEFAULT: None
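The change-detection step ("polls file hashes at each interval") can be sketched with hashlib alone. The hash algorithm and the `HashPoller` helper are assumptions made for illustration; the sketch stops at detection and leaves out the Pydantic validation and callbacks that ConfigWatcher adds on top.

```python
import hashlib
import tempfile
from pathlib import Path


def file_digest(path: Path) -> str:
    """Return the SHA-256 hex digest of a file's bytes."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


class HashPoller:
    """Hypothetical helper that reports which watched files changed."""

    def __init__(self, paths) -> None:
        # Record a baseline digest for every watched file.
        self._digests = {p: file_digest(p) for p in paths}

    def poll(self) -> list:
        changed = []
        for path, old in self._digests.items():
            new = file_digest(path)
            if new != old:
                # Remember the new digest so the change is reported once.
                self._digests[path] = new
                changed.append(path)
        return changed


with tempfile.TemporaryDirectory() as tmp:
    env = Path(tmp) / "settings.env"
    env.write_text("DEBUG=0\n")

    poller = HashPoller([env])
    unchanged = poller.poll()        # nothing edited yet

    env.write_text("DEBUG=1\n")
    changed_names = [p.name for p in poller.poll()]
    settled = poller.poll()          # same content as the last poll
```

Comparing content digests rather than modification times means a file that is rewritten with identical content does not trigger a reload.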

HealthPinger

HealthPinger(
    *,
    targets: Sequence[HealthTarget],
    interval: float = 10.0,
    on_health_change: Callable[[str, bool], None]
    | None = None,
)

Bases: BaseWatcher

Background watcher that pings external dependencies.

For each registered HealthTarget, calls its check coroutine on every cycle. If a target is unhealthy and has an associated CircuitBreaker, the breaker is opened preventively.

PARAMETER DESCRIPTION
targets

Dependencies to monitor.

TYPE: Sequence[HealthTarget]

interval

Seconds between ping cycles.

TYPE: float DEFAULT: 10.0

on_health_change

Optional callback (name, is_healthy).

TYPE: Callable[[str, bool], None] | None DEFAULT: None

Example

async def db_ping() -> bool:
    return await pool.fetchval("SELECT 1") == 1

pinger = HealthPinger(
    targets=[HealthTarget("db", db_ping, breaker)],
)
await pinger.start()

Initialize the health pinger.

PARAMETER DESCRIPTION
targets

Dependencies to monitor.

TYPE: Sequence[HealthTarget]

interval

Seconds between ping cycles.

TYPE: float DEFAULT: 10.0

on_health_change

Optional callback on status change.

TYPE: Callable[[str, bool], None] | None DEFAULT: None
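One ping cycle, as described above, checks each target, opens the associated breaker when the target is unhealthy, and reports status changes. The following sketch shows that flow under several assumptions: `Target`, `StubBreaker`, its `force_open` method, and `ping_cycle` are all hypothetical names, a check that raises is treated as unhealthy, and the first cycle reports every target because no previous status exists.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


class StubBreaker:
    """Hypothetical breaker; `force_open` is an assumed method name."""

    def __init__(self) -> None:
        self.is_open = False

    def force_open(self) -> None:
        self.is_open = True


@dataclass
class Target:
    """Hypothetical stand-in for HealthTarget."""
    name: str
    check: Callable[[], Awaitable[bool]]
    breaker: Optional[StubBreaker] = None


async def ping_cycle(targets, last_status, on_change) -> None:
    """Run one cycle: check each target and react to status transitions."""
    for target in targets:
        try:
            healthy = await target.check()
        except Exception:
            # Assumption: a check that raises counts as unhealthy.
            healthy = False
        if not healthy and target.breaker is not None:
            target.breaker.force_open()
        # Invoke the callback only when the status differs from last time.
        if last_status.get(target.name) != healthy:
            last_status[target.name] = healthy
            on_change(target.name, healthy)


state = {"db_up": True}
events = []
breaker = StubBreaker()


async def db_ping() -> bool:
    return state["db_up"]


async def demo() -> None:
    targets = [Target("db", db_ping, breaker)]
    last_status = {}

    def record(name, healthy):
        events.append((name, healthy))

    await ping_cycle(targets, last_status, record)  # first reading
    await ping_cycle(targets, last_status, record)  # no change, no event
    state["db_up"] = False
    await ping_cycle(targets, last_status, record)  # transition to unhealthy


asyncio.run(demo())
```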

HealthTarget dataclass

HealthTarget(
    name: str,
    check: Callable[[], Awaitable[bool]],
    circuit_breaker: CircuitBreaker | None = None,
)

A dependency to be monitored by HealthPinger.

ATTRIBUTE DESCRIPTION
name

Human-readable name for logging.

TYPE: str

check

Async callable returning True if the target is healthy, False otherwise.

TYPE: Callable[[], Awaitable[bool]]

circuit_breaker

Optional circuit breaker to open on failure.

TYPE: CircuitBreaker | None

ResourceSnapshot dataclass

ResourceSnapshot(
    cpu_percent: float,
    memory_percent: float,
    timestamp: float,
)

Point-in-time snapshot of system resource usage.

ATTRIBUTE DESCRIPTION
cpu_percent

Current CPU utilisation (0-100).

TYPE: float

memory_percent

Current memory utilisation (0-100).

TYPE: float

timestamp

Monotonic timestamp of the reading.

TYPE: float

ResourceWatcher

ResourceWatcher(
    *,
    interval: float = 5.0,
    cpu_threshold: float = 85.0,
    memory_threshold: float = 85.0,
    on_threshold_breach: Callable[[str, float], None]
    | None = None,
)

Bases: BaseWatcher

Background watcher that monitors CPU and memory.

When either metric exceeds its configured threshold the on_threshold_breach callback is invoked with the resource name ("cpu" or "memory") and the current value.

PARAMETER DESCRIPTION
interval

Seconds between checks.

TYPE: float DEFAULT: 5.0

cpu_threshold

CPU percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

memory_threshold

Memory percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

on_threshold_breach

Optional callback (resource, value) -> None.

TYPE: Callable[[str, float], None] | None DEFAULT: None

Example

watcher = ResourceWatcher(
    cpu_threshold=80.0,
    on_threshold_breach=lambda r, v: print(f"{r} at {v}%"),
)
await watcher.start()

Initialize the resource watcher.

PARAMETER DESCRIPTION
interval

Seconds between checks.

TYPE: float DEFAULT: 5.0

cpu_threshold

CPU percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

memory_threshold

Memory percentage that triggers a breach.

TYPE: float DEFAULT: 85.0

on_threshold_breach

Optional breach callback.

TYPE: Callable[[str, float], None] | None DEFAULT: None

start async

start() -> Result[None, Exception]

Start the resource watcher.

RETURNS DESCRIPTION
Result[None, Exception]

Err if psutil is not installed; otherwise delegates to BaseWatcher.start().
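The threshold rule for ResourceWatcher (invoke the callback with "cpu" or "memory" and the current value when a metric exceeds its threshold) reduces to a small comparison. The sketch below uses a hypothetical `Snapshot` dataclass and `report_breaches` function with hand-written readings, so it does not need psutil; reading "exceeds" as strictly greater than is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical stand-in for ResourceSnapshot (timestamp omitted)."""
    cpu_percent: float
    memory_percent: float


def report_breaches(snapshot, on_breach, *,
                    cpu_threshold: float = 85.0,
                    memory_threshold: float = 85.0) -> None:
    # Assumption: a breach means strictly above the threshold.
    if snapshot.cpu_percent > cpu_threshold:
        on_breach("cpu", snapshot.cpu_percent)
    if snapshot.memory_percent > memory_threshold:
        on_breach("memory", snapshot.memory_percent)


breaches = []


def record(resource: str, value: float) -> None:
    breaches.append((resource, value))


# CPU is above the default threshold; memory is not.
report_breaches(Snapshot(cpu_percent=91.5, memory_percent=40.0), record)

# A reading exactly at the threshold does not count as a breach here.
report_breaches(Snapshot(cpu_percent=85.0, memory_percent=85.0), record)
```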

validate_config

validate_config(
    data: dict[str, object], model: type[BaseModel]
) -> Result[BaseModel, Exception]

Validate a data dictionary against a Pydantic model.

PARAMETER DESCRIPTION
data

Raw configuration data.

TYPE: dict[str, object]

model

Pydantic model class to validate against.

TYPE: type[BaseModel]

RETURNS DESCRIPTION
Result[BaseModel, Exception]

Ok(model_instance) on success, Err(ValidationError) on failure.

check_all async

check_all(
    targets: Sequence[HealthTarget],
) -> Result[dict[str, bool], Exception]

Run health checks for all targets concurrently.

PARAMETER DESCRIPTION
targets

Targets to check.

TYPE: Sequence[HealthTarget]

RETURNS DESCRIPTION
Result[dict[str, bool], Exception]

Ok(dict) mapping target names to health status.
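Running the checks concurrently and collecting a name-to-status mapping is a natural fit for `asyncio.gather`. The sketch below shows one way to do it; `run_checks` is a hypothetical helper, not the library function, and mapping a raised exception to False is an assumption about how failures are folded into the result.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Sequence, Tuple

NamedCheck = Tuple[str, Callable[[], Awaitable[bool]]]


async def run_checks(checks: Sequence[NamedCheck]) -> Dict[str, bool]:
    """Hypothetical helper: run every check concurrently, keyed by name."""
    results = await asyncio.gather(
        *(check() for _name, check in checks),
        # Collect exceptions as results instead of propagating the first one.
        return_exceptions=True,
    )
    # Assumption: anything other than a literal True counts as unhealthy.
    return {
        name: result is True
        for (name, _check), result in zip(checks, results)
    }


async def healthy() -> bool:
    await asyncio.sleep(0.01)
    return True


async def unhealthy() -> bool:
    return False


async def broken() -> bool:
    raise ConnectionError("unreachable")


status = asyncio.run(
    run_checks([("db", healthy), ("cache", unhealthy), ("queue", broken)])
)
```

`gather` returns results in the order the awaitables were passed, which is what makes the `zip` with the original names safe.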

check_target async

check_target(
    target: HealthTarget,
) -> Result[bool, Exception]

Run a single health check.

PARAMETER DESCRIPTION
target

The target to check.

TYPE: HealthTarget

RETURNS DESCRIPTION
Result[bool, Exception]

Ok(True) if healthy, Ok(False) if unhealthy, Err if the check itself raises.

check_resources

check_resources() -> Result[ResourceSnapshot, Exception]

Take a one-shot resource reading.

RETURNS DESCRIPTION
Result[ResourceSnapshot, Exception]

Ok(ResourceSnapshot) on success, Err if psutil is unavailable.