Utilities — Retry, Circuit Breaker & Metrics

TaipanStack provides resilience and observability utilities for production applications.


Retry

Retry logic with exponential backoff.

Provides decorators for automatic retry of failing operations with configurable backoff strategies. Compatible with any Python framework (sync and async).

RetryDecorator

Bases: Protocol

Protocol for the retry decorator.

RetryConfig dataclass

RetryConfig(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    jitter_factor: float = 0.1,
    log_retries: bool = True,
    on_retry: Callable[[int, int, Exception, float], None]
    | None = None,
)

Configuration for retry behavior.

ATTRIBUTE DESCRIPTION
max_attempts

Maximum number of retry attempts.

TYPE: int

initial_delay

Initial delay between retries in seconds.

TYPE: float

max_delay

Maximum delay between retries.

TYPE: float

exponential_base

Base for exponential backoff (2 = double each time).

TYPE: float

jitter

Whether to add random jitter to delays.

TYPE: bool

jitter_factor

Maximum jitter as fraction of delay (0.1 = 10%).

TYPE: float

log_retries

Whether to emit standard log messages.

TYPE: bool

on_retry

Optional callback invoked on each retry.

TYPE: Callable[[int, int, Exception, float], None] | None

RetryError

RetryError(
    message: str,
    attempts: int,
    last_exception: Exception | None = None,
)

Bases: Exception

Raised when all retry attempts have failed.

Initialize RetryError.

PARAMETER DESCRIPTION
message

Description of the retry failure.

TYPE: str

attempts

Number of attempts made.

TYPE: int

last_exception

The last exception that was raised.

TYPE: Exception | None DEFAULT: None

Retrier

Retrier(
    *,
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    on: tuple[type[Exception], ...] = (Exception,),
)

Context manager for retry logic.

Provides a context manager interface for retry logic when decorators are not suitable.

Example

>>> retrier = Retrier(max_attempts=3, on=(ConnectionError,))
>>> with retrier:
...     result = some_operation()

Initialize Retrier.

PARAMETER DESCRIPTION
max_attempts

Maximum retry attempts.

TYPE: int DEFAULT: 3

initial_delay

Initial delay between retries.

TYPE: float DEFAULT: 1.0

max_delay

Maximum delay between retries.

TYPE: float DEFAULT: 60.0

on

Exception types to retry on.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

__enter__

__enter__() -> Retrier

Enter the retry context.

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> bool

Exit the retry context.

Returns True to suppress the exception if we should retry, False to let it propagate.

calculate_delay

calculate_delay(attempt: int, config: RetryConfig) -> float

Calculate delay before next retry.

PARAMETER DESCRIPTION
attempt

Current attempt number (1-indexed).

TYPE: int

config

Retry configuration.

TYPE: RetryConfig

RETURNS DESCRIPTION
float

Delay in seconds before next retry.
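
The reference does not spell out the formula, so the following is only a sketch of how a delay could be derived from the documented fields. It assumes the delay is initial_delay * exponential_base ** (attempt - 1), capped at max_delay, with symmetric jitter of up to jitter_factor of that value. BackoffSettings and sketch_delay are hypothetical names, not part of TaipanStack.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class BackoffSettings:
    """Hypothetical stand-in mirroring the documented RetryConfig fields."""

    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_factor: float = 0.1


def sketch_delay(attempt: int, settings: BackoffSettings) -> float:
    """Return the wait before the next retry; attempt is 1-indexed."""
    # Assumed formula: initial_delay * base ** (attempt - 1), capped at max_delay.
    delay = min(
        settings.initial_delay * settings.exponential_base ** (attempt - 1),
        settings.max_delay,
    )
    if settings.jitter:
        # Assumed jitter: shift by at most +/- jitter_factor of the delay.
        spread = delay * settings.jitter_factor
        delay += random.uniform(-spread, spread)
    return max(delay, 0.0)


# With jitter disabled the sequence is deterministic and hits the cap.
no_jitter = BackoffSettings(jitter=False)
delays = [sketch_delay(n, no_jitter) for n in range(1, 9)]
```

Under these assumptions the delays double from 1 second until they reach the 60 second cap, which is why max_delay matters for long retry chains.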

retry

retry(
    *,
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    on: tuple[type[Exception], ...] = (Exception,),
    reraise: bool = True,
    log_retries: bool = True,
    on_retry: Callable[[int, int, Exception, float], None]
    | None = None,
) -> RetryDecorator

Retry a sync or async function with exponential backoff.

Automatically retries the decorated function when specified exceptions are raised, with configurable backoff strategy. Detects coroutine functions and preserves their async nature.

PARAMETER DESCRIPTION
max_attempts

Maximum number of retry attempts.

TYPE: int DEFAULT: 3

initial_delay

Initial delay between retries in seconds.

TYPE: float DEFAULT: 1.0

max_delay

Maximum delay between retries.

TYPE: float DEFAULT: 60.0

exponential_base

Base for exponential backoff.

TYPE: float DEFAULT: 2.0

jitter

Whether to add random jitter to delays.

TYPE: bool DEFAULT: True

on

Exception types to retry on.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

reraise

Whether to reraise the last exception on failure.

TYPE: bool DEFAULT: True

log_retries

Whether to log retry attempts.

TYPE: bool DEFAULT: True

on_retry

Optional callback invoked on each retry with (attempt, max_attempts, exception, delay). Useful for custom monitoring or metrics collection.

TYPE: Callable[[int, int, Exception, float], None] | None DEFAULT: None

RETURNS DESCRIPTION
RetryDecorator

Decorated function with retry logic.

Example

>>> @retry(max_attempts=3, on=(ConnectionError, TimeoutError))
... def fetch_data(url: str) -> dict:
...     return requests.get(url, timeout=10).json()

>>> @retry(max_attempts=3, on_retry=lambda a, m, e, d: print(f"Retry {a}/{m}"))
... def fragile_operation() -> str:
...     return do_something()

retry_on_exception

retry_on_exception(
    exception_types: tuple[type[Exception], ...],
    max_attempts: int = 3,
) -> RetryDecorator

Retry on specific exceptions.

A simpler alternative to the full retry decorator when you just need basic retry functionality.

PARAMETER DESCRIPTION
exception_types

Exception types to retry on.

TYPE: tuple[type[Exception], ...]

max_attempts

Maximum number of attempts.

TYPE: int DEFAULT: 3

RETURNS DESCRIPTION
RetryDecorator

Decorated function with retry logic.

Example

>>> @retry_on_exception((ValueError,), max_attempts=2)
... def parse_data(data: str) -> dict:
...     return json.loads(data)


Circuit Breaker

Circuit Breaker pattern implementation.

Provides protection against cascading failures by temporarily blocking calls to a failing service. Compatible with any Python framework (sync and async).

CircuitBreakerDecorator

Bases: Protocol

Protocol for the circuit breaker decorator.

CircuitState

Bases: Enum

States of the circuit breaker.

CircuitBreakerError

CircuitBreakerError(message: str, state: CircuitState)

Bases: Exception

Raised when circuit breaker is open.

Initialize CircuitBreakerError.

PARAMETER DESCRIPTION
message

Error description.

TYPE: str

state

Current circuit state.

TYPE: CircuitState

CircuitBreakerConfig dataclass

CircuitBreakerConfig(
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
)

Configuration for circuit breaker behavior.

ATTRIBUTE DESCRIPTION
failure_threshold

Number of failures before opening circuit.

TYPE: int

success_threshold

Successes needed in half-open to close.

TYPE: int

timeout

Seconds before trying half-open after open.

TYPE: float

excluded_exceptions

Exceptions that don't count as failures.

TYPE: tuple[type[Exception], ...]

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...]
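
To make the roles of failure_threshold, success_threshold, and timeout concrete, here is a minimal, single-threaded sketch of the closed, open, half-open cycle. It is not the library's implementation: SketchBreaker, State, and the injectable clock are hypothetical, and reopening the circuit on any half-open failure is an assumption.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Hypothetical illustration of the three-state cycle (no locking)."""

    def __init__(self, failure_threshold=5, success_threshold=2, timeout=30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        """Return True if a call may proceed right now."""
        if self.state is State.OPEN:
            if self.clock() - self.opened_at < self.timeout:
                return False
            # Timeout elapsed: let trial calls through.
            self.state = State.HALF_OPEN
            self.successes = 0
        return True

    def record_success(self) -> None:
        if self.state is State.HALF_OPEN:
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = State.CLOSED
                self.failures = 0
        else:
            # Assumed: a success while closed clears the failure streak.
            self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        # Assumed: any failure while half-open reopens the circuit.
        if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
            self.state = State.OPEN
            self.opened_at = self.clock()


# Drive the sketch with a fake clock so the timeout can be skipped instantly.
now = [0.0]
breaker = SketchBreaker(failure_threshold=2, success_threshold=1, timeout=10.0,
                        clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()         # threshold reached: circuit opens
blocked = not breaker.allow()    # still inside the timeout window
now[0] = 11.0
trial_allowed = breaker.allow()  # timeout elapsed: half-open
breaker.record_success()         # one success closes it again
```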

CircuitBreakerState dataclass

CircuitBreakerState(
    state: CircuitState = CLOSED,
    failure_count: int = 0,
    success_count: int = 0,
    half_open_attempts: int = 0,
    last_failure_time: float = 0.0,
    lock: Lock = Lock(),
)

Internal state tracking for circuit breaker.

CircuitBreaker

CircuitBreaker(
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
    name: str = "default",
    on_state_change: Callable[
        [CircuitState, CircuitState], None
    ]
    | None = None,
)

Circuit breaker implementation.

Monitors function calls and opens the circuit when too many failures occur, preventing further calls until the service recovers. Supports both sync and async functions.

Example

>>> breaker = CircuitBreaker(failure_threshold=3)
>>> @breaker
... def call_external_api():
...     return requests.get("https://api.example.com", timeout=10)

Initialize CircuitBreaker.

PARAMETER DESCRIPTION
failure_threshold

Failures before opening circuit.

TYPE: int DEFAULT: 5

success_threshold

Successes to close from half-open.

TYPE: int DEFAULT: 2

timeout

Seconds before attempting half-open.

TYPE: float DEFAULT: 30.0

excluded_exceptions

Exceptions that don't trip circuit.

TYPE: tuple[type[Exception], ...] DEFAULT: ()

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

name

Name for logging/identification.

TYPE: str DEFAULT: 'default'

on_state_change

Optional callback invoked on state transitions with (old_state, new_state). Useful for custom monitoring.

TYPE: Callable[[CircuitState, CircuitState], None] | None DEFAULT: None

state property

state: CircuitState

Get current circuit state.

failure_count property

failure_count: int

Get current failure count.

reset

reset() -> None

Reset circuit breaker to closed state.

__call__

__call__(
    func: Callable[P, R]
    | Callable[P, Coroutine[Any, Any, R]],
) -> (
    Callable[P, R] | Callable[P, Coroutine[Any, Any, R]]
)

Decorate a sync or async function with circuit breaker protection.

circuit_breaker

circuit_breaker(
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
    name: str | None = None,
    on_state_change: Callable[
        [CircuitState, CircuitState], None
    ]
    | None = None,
) -> CircuitBreakerDecorator

Decorate a sync or async function with circuit breaker pattern.

PARAMETER DESCRIPTION
failure_threshold

Failures before opening circuit.

TYPE: int DEFAULT: 5

success_threshold

Successes to close from half-open.

TYPE: int DEFAULT: 2

timeout

Seconds before attempting half-open.

TYPE: float DEFAULT: 30.0

excluded_exceptions

Exceptions that don't trip circuit.

TYPE: tuple[type[Exception], ...] DEFAULT: ()

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

name

Optional name for the circuit.

TYPE: str | None DEFAULT: None

on_state_change

Optional callback invoked on state transitions with (old_state, new_state).

TYPE: Callable[[CircuitState, CircuitState], None] | None DEFAULT: None

RETURNS DESCRIPTION
CircuitBreakerDecorator

Decorated function with circuit breaker protection.

Example

>>> @circuit_breaker(failure_threshold=3, timeout=60)
... def call_api(endpoint: str) -> dict:
...     return requests.get(endpoint, timeout=10).json()

>>> @circuit_breaker(
...     failure_threshold=3,
...     on_state_change=lambda old, new: print(f"{old} -> {new}"),
... )
... def monitored_call() -> str:
...     return service.call()


Metrics

Metrics collection and monitoring utilities.

Provides lightweight metrics collection for monitoring application performance and health. Compatible with any Python framework.

TimingStats dataclass

TimingStats(
    count: int = 0,
    total_time: float = 0.0,
    min_time: float = float("inf"),
    max_time: float = 0.0,
)

Statistics for timing measurements.

avg_time property

avg_time: float

Calculate average time.

record

record(duration: float) -> None

Record a timing measurement.
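
As an illustration of how these four fields can be maintained incrementally, the sketch below mirrors the documented signature. SketchTimingStats is a hypothetical name, and returning 0.0 from avg_time before any measurement is an assumption rather than documented behavior.

```python
from dataclasses import dataclass


@dataclass
class SketchTimingStats:
    """Hypothetical mirror of the documented TimingStats fields."""

    count: int = 0
    total_time: float = 0.0
    min_time: float = float("inf")
    max_time: float = 0.0

    @property
    def avg_time(self) -> float:
        # Assumed guard: avoid dividing by zero before anything is recorded.
        return self.total_time / self.count if self.count else 0.0

    def record(self, duration: float) -> None:
        # Each update is O(1); no individual samples are stored.
        self.count += 1
        self.total_time += duration
        self.min_time = min(self.min_time, duration)
        self.max_time = max(self.max_time, duration)


stats = SketchTimingStats()
for duration in (0.25, 0.5, 0.75):
    stats.record(duration)
```

Starting min_time at infinity means the first recorded duration always becomes the minimum without a special case.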

TimerStats

Bases: TypedDict

Statistics dictionary for timing measurements.

MetricsSnapshot

Bases: TypedDict

Snapshot of all collected metrics.

Counter

Counter()

Simple counter metric.

Initialize the counter.

increment

increment(amount: int = 1) -> int

Increment counter and return new value.

decrement

decrement(amount: int = 1) -> int

Decrement counter and return new value.

reset

reset() -> None

Reset counter to zero.

MetricsCollector

MetricsCollector()

Centralized metrics collection.

Thread-safe collector for various metric types including counters, timers, and gauges.

Example

>>> metrics = MetricsCollector()
>>> metrics.increment("requests_total")
>>> with metrics.timer("request_duration"):
...     process_request()

Initialize metrics collector.

__new__

__new__() -> MetricsCollector

Singleton pattern for global metrics access.

increment

increment(name: str, amount: int = 1) -> int

Increment a counter metric.

decrement

decrement(name: str, amount: int = 1) -> int

Decrement a counter metric.

gauge

gauge(name: str, value: float) -> None

Set a gauge metric value.

get_gauge

get_gauge(name: str) -> float | None

Get a gauge metric value.

record_time

record_time(name: str, duration: float) -> None

Record a timing measurement.

timer

timer(name: str) -> Timer

Create a context manager timer.

get_counter

get_counter(name: str) -> int

Get current counter value.

get_timer_stats

get_timer_stats(name: str) -> TimingStats | None

Get timing statistics for a named timer.

get_all_metrics

get_all_metrics() -> MetricsSnapshot

Get all metrics as a dictionary.

reset

reset() -> None

Reset all metrics.

Timer

Timer(name: str, collector: MetricsCollector)

Context manager for timing code blocks.

Initialize timer.

PARAMETER DESCRIPTION
name

Name of the timer metric.

TYPE: str

collector

MetricsCollector to record to.

TYPE: MetricsCollector

__enter__

__enter__() -> Timer

Start the timer.

__exit__

__exit__(*args: object) -> None

Stop timer and record duration.

timed

timed(
    name: str | None = None,
    *,
    collector: MetricsCollector | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]

Decorator to time function execution.

PARAMETER DESCRIPTION
name

Optional metric name (defaults to function name).

TYPE: str | None DEFAULT: None

collector

Optional MetricsCollector instance.

TYPE: MetricsCollector | None DEFAULT: None

RETURNS DESCRIPTION
Callable[[Callable[P, R]], Callable[P, R]]

Decorated function that records execution time.

Example

>>> @timed("api_call_duration")
... def call_api(endpoint: str) -> dict:
...     return requests.get(endpoint, timeout=10).json()

counted

counted(
    name: str | None = None,
    *,
    collector: MetricsCollector | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]

Decorator to count function calls.

PARAMETER DESCRIPTION
name

Optional metric name (defaults to function name).

TYPE: str | None DEFAULT: None

collector

Optional MetricsCollector instance.

TYPE: MetricsCollector | None DEFAULT: None

RETURNS DESCRIPTION
Callable[[Callable[P, R]], Callable[P, R]]

Decorated function that counts calls.

Example

>>> @counted("login_attempts")
... def login(username: str, password: str) -> bool:
...     return authenticate(username, password)


Resilience

Resilience decorators.

Provides tools for graceful fallback and timeouts using the Result monad.

FallbackDecorator

Bases: Protocol

Protocol for the fallback decorator.

TimeoutDecorator

Bases: Protocol

Protocol for the timeout decorator.

fallback

fallback(
    fallback_value: T,
    exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator

Provide a fallback value on failures.

If the wrapped function returns an Err() or raises a specified exception, the fallback value is returned wrapped in an Ok().

PARAMETER DESCRIPTION
fallback_value

The value to return on failure.

TYPE: T

exceptions

Exceptions to catch.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

RETURNS DESCRIPTION
FallbackDecorator

Decorator function.

timeout

timeout(seconds: float) -> TimeoutDecorator

Enforce a maximum execution time.

If the execution time exceeds the specified limit, returns Err(TimeoutError).

PARAMETER DESCRIPTION
seconds

Maximum allowed execution time in seconds.

TYPE: float

RETURNS DESCRIPTION
TimeoutDecorator

Decorator function.


Cache

Intelligent Cache decorator.

Provides in-memory caching that respects the Result monad and TTL, ignoring caching for Err() results.

CacheDecorator

Bases: Protocol

Protocol for the cache decorator.

cached

cached(ttl: float) -> CacheDecorator

Cache the Ok() results of a function for a given TTL.

Err() results are not cached. Supports both async and sync functions.

PARAMETER DESCRIPTION
ttl

Time to live in seconds.

TYPE: float

RETURNS DESCRIPTION
CacheDecorator

Decorator function.
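
The rule that only Ok() results are cached can be sketched as follows. This is a simplified, sync-only illustration keyed on positional arguments. The Ok and Err classes are minimal stand-ins for the library's Result types, and sketch_cached is a hypothetical name.

```python
import time
from dataclasses import dataclass
from functools import wraps
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


def sketch_cached(ttl: float, clock=time.monotonic):
    """Hypothetical TTL cache that stores successes only."""

    def decorator(func):
        store = {}  # maps args -> (stored_at, Ok result)

        @wraps(func)
        def wrapper(*args):
            hit = store.get(args)
            if hit is not None and clock() - hit[0] < ttl:
                return hit[1]
            result = func(*args)
            if isinstance(result, Ok):
                # Only successes are stored; an Err is recomputed next time.
                store[args] = (clock(), result)
            return result

        return wrapper

    return decorator


calls = []


@sketch_cached(ttl=60.0)
def parse(text: str):
    calls.append(text)
    return Ok(int(text)) if text.isdigit() else Err(f"not a number: {text}")


first = parse("42")
second = parse("42")  # served from the cache, parse body is not re-run
parse("x")
parse("x")            # Err was not cached, so the body runs again
```

Skipping Err results keeps a transient failure from being replayed for the whole TTL.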


Context

Observability Context Module.

Provides context variables and context managers for tracing and observability, such as the correlation ID.

get_correlation_id

get_correlation_id() -> str | None

Get the current correlation ID.

RETURNS DESCRIPTION
str | None

The correlation ID if set, otherwise None.

set_correlation_id

set_correlation_id(correlation_id: str | None) -> None

Set the correlation ID for the current context.

PARAMETER DESCRIPTION
correlation_id

The correlation ID string, or None to clear.

TYPE: str | None

correlation_scope

correlation_scope(
    correlation_id: str | None,
) -> Iterator[None]

Context manager to set the correlation ID and restore it after.

PARAMETER DESCRIPTION
correlation_id

The correlation ID to set for the duration of the scope.

TYPE: str | None

YIELDS DESCRIPTION
None

None
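
A set-and-restore scope like this is commonly built on contextvars. The sketch below shows that approach under the assumption that the library does something similar. The variable _correlation_id and the sketch_ functions are hypothetical names.

```python
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Optional

# Hypothetical module-level variable; the library's real storage is not shown.
_correlation_id = ContextVar("correlation_id", default=None)


def sketch_get() -> Optional[str]:
    return _correlation_id.get()


@contextmanager
def sketch_scope(correlation_id: Optional[str]) -> Iterator[None]:
    token = _correlation_id.set(correlation_id)
    try:
        yield
    finally:
        # reset() restores whatever value was active before the scope.
        _correlation_id.reset(token)


before = sketch_get()
with sketch_scope("req-1"):
    with sketch_scope("req-2"):
        inner = sketch_get()
    outer = sketch_get()
after = sketch_get()
```

Because the previous value is restored through the token, nested scopes unwind correctly, and each asyncio task sees its own value.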


Concurrency

Concurrency utilities.

Provides a bulkhead-pattern concurrency limiter decorator for both synchronous and asynchronous functions. The decorated function returns a Result, with an OverloadError in the Err case.

OverloadError

OverloadError(message: str = 'Concurrency limit reached')

Bases: Exception

Exception raised when the concurrency limit is reached and no slot becomes available before the timeout.

Initialize the OverloadError.

PARAMETER DESCRIPTION
message

The error message to display. Defaults to "Concurrency limit reached".

TYPE: str DEFAULT: 'Concurrency limit reached'

ConcurrencyLimitDecorator

Bases: Protocol

Protocol for the concurrency limit decorator.

limit_concurrency

limit_concurrency(
    max_tasks: int, timeout: float = 0.0
) -> ConcurrencyLimitDecorator

Decorate a function to apply the bulkhead concurrency limit pattern.

If the maximum number of concurrent executions is reached, the wrapper waits up to timeout seconds to acquire an execution slot. If it cannot acquire one, it returns an Err(OverloadError).

PARAMETER DESCRIPTION
max_tasks

Maximum concurrent function executions allowed.

TYPE: int

timeout

Maximum time in seconds to wait for a slot if limit is reached.

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
ConcurrencyLimitDecorator

Decorated function returning a Result[T, OverloadError].

Example

>>> @limit_concurrency(max_tasks=2, timeout=0.1)
... def process_data() -> str:
...     return "data"
>>> process_data()
Ok('data')
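
For synchronous code, the bulkhead behavior described above can be approximated with a bounded semaphore. This is a sketch only: Ok, Err, SketchOverload, and sketch_limit are hypothetical stand-ins, and the async path is not covered.

```python
import threading
from dataclasses import dataclass
from functools import wraps
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


class SketchOverload(Exception):
    """Hypothetical stand-in for OverloadError."""


def sketch_limit(max_tasks: int, timeout: float = 0.0):
    slots = threading.BoundedSemaphore(max_tasks)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Wait up to `timeout` seconds for a slot; 0 means do not wait.
            if timeout > 0:
                acquired = slots.acquire(timeout=timeout)
            else:
                acquired = slots.acquire(blocking=False)
            if not acquired:
                return Err(SketchOverload("Concurrency limit reached"))
            try:
                return Ok(func(*args, **kwargs))
            finally:
                # Always free the slot, even if func raised.
                slots.release()

        return wrapper

    return decorator


@sketch_limit(max_tasks=1)
def nested(depth: int):
    # The inner call runs while the only slot is still held.
    return nested(depth - 1) if depth else "leaf"


contended = nested(1)  # the inner call finds no free slot
free_again = nested(0)  # the slot was released afterwards
```

The nested call simulates contention without threads: the outer call succeeds, but the value it wraps is the Err produced by the rejected inner call.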


Filesystem

Safe filesystem operations.

Provides secure wrappers around file operations with path validation, atomic writes, and proper error handling using Result types.

FileNotFoundErr dataclass

FileNotFoundErr(path: Path, message: str = '')

Error when file is not found.

__post_init__

__post_init__() -> None

Set default message.

NotAFileErr dataclass

NotAFileErr(path: Path, message: str = '')

Error when path is not a file.

__post_init__

__post_init__() -> None

Set default message.

FileTooLargeErr dataclass

FileTooLargeErr(
    path: Path, size: int, max_size: int, message: str = ""
)

Error when file exceeds size limit.

__post_init__

__post_init__() -> None

Set default message.

WriteOptions dataclass

WriteOptions(
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    create_parents: bool = True,
    backup: bool = True,
    atomic: bool = True,
)

Options for safe_write.

ATTRIBUTE DESCRIPTION
base_dir

Base directory to constrain to.

TYPE: Path | str | None

encoding

File encoding.

TYPE: str

create_parents

Create parent directories if needed.

TYPE: bool

backup

Create backup of existing file.

TYPE: bool

atomic

Use atomic write.

TYPE: bool
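
The reference does not describe how the atomic option is implemented. A common technique, sketched here as an assumption, is to write to a temporary file in the target directory and then rename it over the destination. sketch_atomic_write is a hypothetical helper, not the library's safe_write.

```python
import os
import tempfile
from pathlib import Path


def sketch_atomic_write(path: Path, content: str, encoding: str = "utf-8") -> Path:
    """Write to a temp file next to the target, then rename over it."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(
        dir=path.parent, prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding=encoding) as handle:
            handle.write(content)
            handle.flush()
            os.fsync(handle.fileno())
        # os.replace is atomic when source and target share a filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        # Leave no partial temp file behind if anything went wrong.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return path


with tempfile.TemporaryDirectory() as workdir:
    target = sketch_atomic_write(Path(workdir) / "nested" / "config.txt", "v1")
    sketch_atomic_write(target, "v2")
    final_text = target.read_text(encoding="utf-8")
    leftovers = [p.name for p in target.parent.iterdir() if p.name != "config.txt"]
```

Readers of the target see either the old or the new content, never a half-written file, because the rename happens only after the data is flushed.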

safe_read

safe_read(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    max_size_bytes: int | None = 10 * 1024 * 1024,
) -> Result[str, ReadFileError]

Read a file safely with path validation.

PARAMETER DESCRIPTION
path

Path to the file to read.

TYPE: Path | str

base_dir

Base directory to constrain to.

TYPE: Path | str | None DEFAULT: None

encoding

File encoding.

TYPE: str DEFAULT: 'utf-8'

max_size_bytes

Maximum file size to read (None for no limit).

TYPE: int | None DEFAULT: 10 * 1024 * 1024

RETURNS DESCRIPTION
Ok

File contents on success.

TYPE: str

Err

Error details on failure.

TYPE: ReadFileError

Example

>>> match safe_read("config.json"):
...     case Ok(content):
...         data = json.loads(content)
...     case Err(FileNotFoundErr(path=p)):
...         print(f"Missing: {p}")
...     case Err(FileTooLargeErr(size=s)):
...         print(f"Too big: {s} bytes")

safe_write

safe_write(
    path: Path | str,
    content: str,
    *,
    options: WriteOptions | None = None,
) -> Path

Write to a file safely with path validation.

PARAMETER DESCRIPTION
path

Path to write to.

TYPE: Path | str

content

Content to write.

TYPE: str

options

Write options.

TYPE: WriteOptions | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path to the written file.

RAISES DESCRIPTION
SecurityError

If path validation fails.

ensure_dir

ensure_dir(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    mode: int = 493,
) -> Path

Ensure a directory exists, creating it if needed.

PARAMETER DESCRIPTION
path

Path to the directory.

TYPE: Path | str

base_dir

Base directory to constrain to.

TYPE: Path | str | None DEFAULT: None

mode

Directory permissions. The default, 493, is 0o755 in octal (rwxr-xr-x).

TYPE: int DEFAULT: 493

RETURNS DESCRIPTION
Path

Path to the directory.

RAISES DESCRIPTION
SecurityError

If path validation fails.

safe_copy

safe_copy(
    src: Path | str,
    dst: Path | str,
    *,
    base_dir: Path | str | None = None,
    overwrite: bool = False,
) -> Path

Copy a file safely.

PARAMETER DESCRIPTION
src

Source file path.

TYPE: Path | str

dst

Destination file path.

TYPE: Path | str

base_dir

Base directory to constrain both paths to.

TYPE: Path | str | None DEFAULT: None

overwrite

Allow overwriting existing file.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Path

Path to the destination file.

RAISES DESCRIPTION
SecurityError

If path validation fails.

FileExistsError

If destination exists and overwrite=False.

safe_delete

safe_delete(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    missing_ok: bool = True,
    recursive: bool = False,
) -> bool

Delete a file or directory safely.

PARAMETER DESCRIPTION
path

Path to delete.

TYPE: Path | str

base_dir

Base directory to constrain to.

TYPE: Path | str | None DEFAULT: None

missing_ok

Don't raise if path doesn't exist.

TYPE: bool DEFAULT: True

recursive

Allow deleting directories recursively.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
bool

True if something was deleted.

RAISES DESCRIPTION
SecurityError

If path validation fails.

FileNotFoundError

If path doesn't exist and missing_ok=False.

get_file_hash

get_file_hash(
    path: Path | str,
    *,
    algorithm: str = "sha256",
    base_dir: Path | str | None = None,
) -> str

Get hash of a file.

PARAMETER DESCRIPTION
path

Path to the file.

TYPE: Path | str

algorithm

Hash algorithm (sha256, sha512, etc).

TYPE: str DEFAULT: 'sha256'

base_dir

Base directory to constrain to.

TYPE: Path | str | None DEFAULT: None

RETURNS DESCRIPTION
str

Hex digest of the file hash.
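
Hashing a file by algorithm name is typically done with hashlib in fixed-size chunks. The sketch below shows that pattern. It omits the base_dir validation, and sketch_file_hash and chunk_size are hypothetical.

```python
import hashlib
import tempfile
from pathlib import Path


def sketch_file_hash(path: Path, algorithm: str = "sha256",
                     chunk_size: int = 65536) -> str:
    digest = hashlib.new(algorithm)
    with path.open("rb") as handle:
        # Read fixed-size chunks so large files never sit fully in memory.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as workdir:
    sample = Path(workdir) / "sample.bin"
    payload = b"hello" * 100_000  # larger than one chunk
    sample.write_bytes(payload)
    streamed = sketch_file_hash(sample)
    in_memory = hashlib.sha256(payload).hexdigest()
```

The streamed digest matches the one computed over the whole payload at once, since hash objects accumulate updates.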

find_files

find_files(
    directory: Path | str,
    pattern: str = "*",
    *,
    base_dir: Path | str | None = None,
    recursive: bool = True,
    include_hidden: bool = False,
) -> list[Path]

Find files matching a pattern.

PARAMETER DESCRIPTION
directory

Directory to search in.

TYPE: Path | str

pattern

Glob pattern to match.

TYPE: str DEFAULT: '*'

base_dir

Base directory to constrain to.

TYPE: Path | str | None DEFAULT: None

recursive

Search recursively.

TYPE: bool DEFAULT: True

include_hidden

Include hidden files (starting with .).

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
list[Path]

List of matching file paths.


Logging

Structured logging with context.

Provides a configured logger with support for structured output, context propagation, and proper formatting.

StackLogger

StackLogger(
    name: str = "stack",
    level: str = "INFO",
    *,
    use_structured: bool = False,
)

Enhanced logger with context support.

Provides a wrapper around standard logging with additional features like context propagation and structured output support.

ATTRIBUTE DESCRIPTION
name

Logger name.

level

Current log level.

Initialize the logger.

PARAMETER DESCRIPTION
name

Logger name.

TYPE: str DEFAULT: 'stack'

level

Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

TYPE: str DEFAULT: 'INFO'

use_structured

Use structured logging if structlog is available.

TYPE: bool DEFAULT: False

bind

bind(**context: Any) -> StackLogger

Add context to logger.

PARAMETER DESCRIPTION
**context

Key-value pairs to add to context.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
StackLogger

Self for chaining.

unbind

unbind(*keys: str) -> StackLogger

Remove context keys.

PARAMETER DESCRIPTION
*keys

Keys to remove from context.

TYPE: str DEFAULT: ()

RETURNS DESCRIPTION
StackLogger

Self for chaining.

debug

debug(message: str, **kwargs: Any) -> None

Log a debug message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: Any DEFAULT: {}

info

info(message: str, **kwargs: Any) -> None

Log an info message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: Any DEFAULT: {}

warning

warning(message: str, **kwargs: Any) -> None

Log a warning message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: Any DEFAULT: {}

error

error(message: str, **kwargs: Any) -> None

Log an error message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: Any DEFAULT: {}

critical

critical(message: str, **kwargs: Any) -> None

Log a critical message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: Any DEFAULT: {}

exception

exception(message: str, **kwargs: Any) -> None

Log an exception with traceback.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: Any DEFAULT: {}

mask_sensitive_data_processor

mask_sensitive_data_processor(
    _logger: Any,
    _method: str,
    event_dict: MutableMapping[str, Any],
) -> MutableMapping[str, Any]

Mask sensitive data in structlog event dictionaries.

Intercept the event_dict produced by structlog and replace the value of any key whose name contains a sensitive substring with "***REDACTED***". Matching is case-insensitive.

PARAMETER DESCRIPTION
_logger

The wrapped logger object (unused, required by structlog).

TYPE: Any

_method

The name of the log method called (unused, required by structlog).

TYPE: str

event_dict

The structured event dictionary.

TYPE: MutableMapping[str, Any]

RETURNS DESCRIPTION
MutableMapping[str, Any]

The event dictionary with sensitive values masked.
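
The masking rule (case-insensitive substring match on key names) can be sketched like this. The list of sensitive fragments is an assumption, since this reference does not enumerate them, and sketch_mask is a hypothetical name that drops the two unused structlog arguments.

```python
from collections.abc import MutableMapping
from typing import Any

# Assumed key fragments; the library's actual list is not shown here.
SENSITIVE_PARTS = ("password", "token", "secret")


def sketch_mask(event_dict: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    # Iterate over a copy of the keys because values are replaced in place.
    for key in list(event_dict):
        if any(part in key.lower() for part in SENSITIVE_PARTS):
            event_dict[key] = "***REDACTED***"
    return event_dict


masked = sketch_mask(
    {"event": "login", "user": "ana", "API_TOKEN": "abc", "db_password": "x"}
)
```

Matching on substrings catches compound keys such as db_password, at the cost of also masking harmless keys that happen to contain a fragment.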

correlation_id_processor

correlation_id_processor(
    _logger: Any,
    _method: str,
    event_dict: MutableMapping[str, Any],
) -> MutableMapping[str, Any]

Structlog processor to inject correlation ID into events.

PARAMETER DESCRIPTION
_logger

The wrapped logger object.

TYPE: Any

_method

The name of the log method called.

TYPE: str

event_dict

The structured event dictionary.

TYPE: MutableMapping[str, Any]

RETURNS DESCRIPTION
MutableMapping[str, Any]

The event dictionary with correlation_id injected if set.

setup_logging

setup_logging(
    level: Literal[
        "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
    ] = "INFO",
    *,
    format_type: Literal[
        "simple", "detailed", "json"
    ] = "detailed",
    log_file: str | None = None,
    use_structured: bool = False,
) -> None

Configure the root logger.

PARAMETER DESCRIPTION
level

Log level to set.

TYPE: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] DEFAULT: 'INFO'

format_type

Output format type.

TYPE: Literal['simple', 'detailed', 'json'] DEFAULT: 'detailed'

log_file

Optional file to log to.

TYPE: str | None DEFAULT: None

use_structured

Use structlog if available.

TYPE: bool DEFAULT: False

get_logger

get_logger(
    name: str = "stack",
    *,
    level: str = "INFO",
    use_structured: bool = False,
) -> StackLogger

Get a configured logger instance.

PARAMETER DESCRIPTION
name

Logger name.

TYPE: str DEFAULT: 'stack'

level

Log level.

TYPE: str DEFAULT: 'INFO'

use_structured

Use structlog if available.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
StackLogger

Configured StackLogger instance.

Example

>>> logger = get_logger("my_module")
>>> logger.bind(request_id="123").info("Processing request")

log_operation

log_operation(
    operation: str,
    *,
    logger: StackLogger | None = None,
    level: str = "INFO",
    expected_exceptions: tuple[type[Exception], ...]
    | type[Exception] = Exception,
) -> AbstractContextManager[StackLogger]

Context manager for logging operations.

PARAMETER DESCRIPTION
operation

Name of the operation.

TYPE: str

logger

Logger to use (creates one if not provided).

TYPE: StackLogger | None DEFAULT: None

level

Log level for messages.

TYPE: str DEFAULT: 'INFO'

expected_exceptions

Exceptions to catch and log as failures.

TYPE: tuple[type[Exception], ...] | type[Exception] DEFAULT: Exception

YIELDS DESCRIPTION
AbstractContextManager[StackLogger]

The logger instance.

Example

>>> with log_operation("setup") as logger:
...     logger.info("Setting up environment")


Rate Limit

Rate limiting utilities.

Provides an in-memory, token-bucket rate limiting decorator for both synchronous and asynchronous functions. The decorator returns a Result that wraps either the original return value or a RateLimitError.

RateLimitError

RateLimitError(message: str = 'Rate limit exceeded')

Bases: Exception

Exception raised when a rate limit is exceeded.

Initialize the RateLimitError.

PARAMETER DESCRIPTION
message

The error message to display. Defaults to "Rate limit exceeded".

TYPE: str DEFAULT: 'Rate limit exceeded'

RateLimiter

RateLimiter(max_calls: int, time_window: float)

Token bucket rate limiter logic.

Initialize the token bucket.

PARAMETER DESCRIPTION
max_calls

The maximum number of calls allowed in the time window.

TYPE: int

time_window

The time window in seconds.

TYPE: float

consume

consume() -> bool

Try to consume a single token.

RETURNS DESCRIPTION
bool

True if a token was consumed (allow), False otherwise (limit exceeded).
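
A token bucket with these two parameters can be sketched as below. The refill policy (continuous refill at max_calls / time_window tokens per second) is an assumption about how the bucket behaves. SketchBucket and its injectable clock are hypothetical, and the sketch is not thread-safe.

```python
import time


class SketchBucket:
    """Hypothetical token bucket; starts full and refills continuously."""

    def __init__(self, max_calls: int, time_window: float, clock=time.monotonic):
        self.capacity = float(max_calls)
        self.refill_rate = max_calls / time_window  # tokens per second
        self.tokens = self.capacity
        self.clock = clock
        self.updated = clock()

    def consume(self) -> bool:
        now = self.clock()
        # Top up in proportion to elapsed time, never beyond capacity.
        self.tokens = min(
            self.capacity,
            self.tokens + (now - self.updated) * self.refill_rate,
        )
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# A fake clock makes the behavior reproducible.
now = [0.0]
bucket = SketchBucket(max_calls=2, time_window=1.0, clock=lambda: now[0])
burst = [bucket.consume() for _ in range(3)]  # third call exceeds the limit
now[0] = 0.5                                  # half a window later
later = bucket.consume()                      # one token has been refilled
```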

RateLimitDecorator

Bases: Protocol

Protocol for the rate limit decorator.

rate_limit

rate_limit(
    max_calls: int, time_window: float
) -> RateLimitDecorator

Decorate a function to apply rate limiting.

If the rate limit is exceeded, the wrapped function immediately returns an Err(RateLimitError). Uses an in-memory token bucket strategy.

PARAMETER DESCRIPTION
max_calls

Maximum function executions allowed in the defined window.

TYPE: int

time_window

Time window size in seconds.

TYPE: float

RETURNS DESCRIPTION
RateLimitDecorator

Decorated function returning a Result[T, RateLimitError].

Example

>>> @rate_limit(max_calls=2, time_window=1.0)
... def fetch_data() -> str:
...     return "data"
>>> fetch_data()
Ok('data')
>>> fetch_data()
Ok('data')
>>> fetch_data()
Err(RateLimitError('Rate limit exceeded'))


Serialization

Serialization utilities.

Provides an optimized default encoder for use with orjson.dumps.

default_encoder

default_encoder(obj: object) -> dict[str, object]

Default encoder for orjson.dumps handling Result types.

Intercepts objects of type Ok and Err:
- Ok(value): Returns {"status": "success"} merged with value if value is a dict. Otherwise returns {"status": "success", "data": value}.
- Err(error): Returns {"status": "error", "message": str(error)}.

PARAMETER DESCRIPTION
obj

The object to encode.

TYPE: object

RETURNS DESCRIPTION
dict[str, object]

A serializable representation of the object.

RAISES DESCRIPTION
TypeError

If the object type is not supported.

Example

>>> import orjson
>>> orjson.dumps(Ok({"id": 1}), default=default_encoder)
b'{"status":"success","id":1}'
>>> orjson.dumps(Err(ValueError("oops")), default=default_encoder)
b'{"status":"error","message":"oops"}'
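
The encoding rules above can be sketched without orjson by passing an equivalent function as the default hook of the standard library's json.dumps. Ok, Err, and sketch_encoder are hypothetical stand-ins. The real encoder targets orjson and may handle further types.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


def sketch_encoder(obj: object) -> dict:
    if isinstance(obj, Ok):
        if isinstance(obj.value, dict):
            # Dict payloads are merged next to the status key.
            return {"status": "success", **obj.value}
        return {"status": "success", "data": obj.value}
    if isinstance(obj, Err):
        return {"status": "error", "message": str(obj.error)}
    # Signal unsupported types the way json and orjson expect.
    raise TypeError(f"Unsupported type: {type(obj).__name__}")


merged = json.loads(json.dumps(Ok({"id": 1}), default=sketch_encoder))
wrapped = json.loads(json.dumps(Ok([1, 2]), default=sketch_encoder))
failed = json.loads(json.dumps(Err(ValueError("oops")), default=sketch_encoder))
```

One consequence of the merge rule is that a dict payload containing its own "status" key overrides the "success" marker in this sketch. Whether the library guards against that is not stated here.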


Subprocess

Safe subprocess execution with security guards.

Provides secure wrappers around subprocess execution with command validation, timeout handling, and retry logic.

SafeCommandResult dataclass

SafeCommandResult(
    command: list[str],
    returncode: int,
    stdout: str = "",
    stderr: str = "",
    duration_seconds: float = 0.0,
)

Result of a safe command execution.

ATTRIBUTE DESCRIPTION
command

The executed command.

TYPE: list[str]

returncode

Exit code of the command.

TYPE: int

stdout

Standard output.

TYPE: str

stderr

Standard error.

TYPE: str

success

Whether the command succeeded (returncode == 0).

TYPE: bool

duration_seconds

How long the command took.

TYPE: float

success property

success: bool

Check if command succeeded.

raise_on_error

raise_on_error() -> SafeCommandResult

Raise an exception if command failed.

RETURNS DESCRIPTION
SafeCommandResult

Self if successful.

RAISES DESCRIPTION
CalledProcessError

If command failed.

run_safe_command

run_safe_command(
    command: Sequence[str],
    *,
    cwd: Path | str | None = None,
    timeout: float = 300.0,
    capture_output: bool = True,
    check: bool = False,
    allowed_commands: Sequence[str] | None = None,
    env: dict[str, str] | None = None,
    dry_run: bool = False,
) -> SafeCommandResult

Execute a command safely with security guards.

This function provides a secure wrapper around subprocess.run with command injection protection, timeout handling, and optional command whitelisting.

PARAMETER DESCRIPTION
command

Command and arguments as a sequence.

TYPE: Sequence[str]

cwd

Working directory for the command.

TYPE: Path | str | None DEFAULT: None

timeout

Maximum execution time in seconds.

TYPE: float DEFAULT: 300.0

capture_output

Whether to capture stdout/stderr.

TYPE: bool DEFAULT: True

check

Whether to raise on non-zero exit.

TYPE: bool DEFAULT: False

allowed_commands

Whitelist of allowed commands.

TYPE: Sequence[str] | None DEFAULT: None

env

Environment variables to set.

TYPE: dict[str, str] | None DEFAULT: None

dry_run

If True, don't actually execute the command.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
SafeCommandResult

SafeCommandResult with execution details.

RAISES DESCRIPTION
SecurityError

If command validation fails.

TimeoutExpired

If command times out.

CalledProcessError

If check=True and command fails.

Example

>>> result = run_safe_command(["poetry", "install"])
>>> if result.success:
...     print("Installation complete!")