Utilities — Retry, Circuit Breaker

TaipanStack provides resilience and observability utilities for production applications.


Retry

Backward-compatibility shim for the retry module.

This module re-exports all public symbols from the canonical taipanstack.resilience.retry module. Importing from this path remains supported for compatibility, but new code should use the canonical module.

Deprecated: import from taipanstack.resilience.retry instead.

Retrier

Retrier(
    *,
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    on: tuple[type[Exception], ...] = (Exception,),
)

Context manager for retry logic.

Provides a context manager interface for retry logic when decorators are not suitable.

Example

>>> retrier = Retrier(max_attempts=3, on=(ConnectionError,))
>>> with retrier:
...     result = some_operation()

Initialize Retrier.

PARAMETER DESCRIPTION
max_attempts

Maximum retry attempts.

TYPE: int DEFAULT: 3

initial_delay

Initial delay between retries.

TYPE: float DEFAULT: 1.0

max_delay

Maximum delay between retries.

TYPE: float DEFAULT: 60.0

on

Exception types to retry on.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

__enter__

__enter__() -> Retrier

Enter the retry context.

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> bool

Exit the retry context.

Returns True to suppress the exception if we should retry, False to let it propagate.

RetryConfig dataclass

RetryConfig(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    jitter_factor: float = 0.1,
    log_retries: bool = True,
    on_retry: Callable[[int, int, Exception, float], None]
    | None = None,
)

Configuration for retry behavior.

ATTRIBUTE DESCRIPTION
max_attempts

Maximum number of retry attempts.

TYPE: int

initial_delay

Initial delay between retries in seconds.

TYPE: float

max_delay

Maximum delay between retries.

TYPE: float

exponential_base

Base for exponential backoff (2 = double each time).

TYPE: float

jitter

Whether to add random jitter to delays.

TYPE: bool

jitter_factor

Maximum jitter as fraction of delay (0.1 = 10%).

TYPE: float

log_retries

Whether to emit standard log messages.

TYPE: bool

on_retry

Optional callback invoked on each retry.

TYPE: Callable[[int, int, Exception, float], None] | None

__post_init__

__post_init__() -> None

Validate configuration parameters.

RetryDecorator

Bases: Protocol

Protocol for the retry decorator.

RetryError

RetryError(
    message: str,
    attempts: int,
    last_exception: Exception | None = None,
)

Bases: Exception

Raised when all retry attempts have failed.

Initialize RetryError.

PARAMETER DESCRIPTION
message

Description of the retry failure.

TYPE: str

attempts

Number of attempts made.

TYPE: int

last_exception

The last exception that was raised.

TYPE: Exception | None DEFAULT: None

calculate_delay

calculate_delay(attempt: int, config: RetryConfig) -> float

Calculate delay before next retry.

PARAMETER DESCRIPTION
attempt

Current attempt number (1-indexed).

TYPE: int

config

Retry configuration.

TYPE: RetryConfig

RETURNS DESCRIPTION
float

Delay in seconds before next retry.
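
As a rough illustration of how these settings usually combine, here is a minimal sketch of exponential backoff with jitter. The formula (initial_delay * exponential_base ** (attempt - 1), capped at max_delay, then shifted by at most jitter_factor of the delay) is an assumption inferred from the parameter descriptions; `SketchConfig` and `sketch_delay` are hypothetical names, not the library's implementation.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in mirroring the documented RetryConfig fields."""

    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_factor: float = 0.1


def sketch_delay(attempt: int, config: SketchConfig) -> float:
    """Return the delay before the next retry for a 1-indexed attempt."""
    # Assumed formula: grow geometrically, then cap at max_delay.
    delay = config.initial_delay * config.exponential_base ** (attempt - 1)
    delay = min(delay, config.max_delay)
    if config.jitter:
        # Assumed jitter: shift by at most +/- jitter_factor of the delay.
        spread = delay * config.jitter_factor
        delay += random.uniform(-spread, spread)
    return max(delay, 0.0)


no_jitter = SketchConfig(jitter=False)
delays = [sketch_delay(n, no_jitter) for n in range(1, 9)]
jittered = sketch_delay(1, SketchConfig())
```

With jitter disabled and the default settings, the delays double from 1 second until they reach the 60 second cap.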

retry

retry(
    *,
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    on: tuple[type[Exception], ...] = (Exception,),
    reraise: bool = True,
    log_retries: bool = True,
    on_retry: Callable[[int, int, Exception, float], None]
    | None = None,
) -> RetryDecorator

Retry a sync or async function with exponential backoff.

Automatically retries the decorated function when specified exceptions are raised, with configurable backoff strategy. Detects coroutine functions and preserves their async nature.

PARAMETER DESCRIPTION
max_attempts

Maximum number of retry attempts.

TYPE: int DEFAULT: 3

initial_delay

Initial delay between retries in seconds.

TYPE: float DEFAULT: 1.0

max_delay

Maximum delay between retries.

TYPE: float DEFAULT: 60.0

exponential_base

Base for exponential backoff.

TYPE: float DEFAULT: 2.0

jitter

Whether to add random jitter to delays.

TYPE: bool DEFAULT: True

on

Exception types to retry on.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

reraise

Whether to reraise the last exception on failure.

TYPE: bool DEFAULT: True

log_retries

Whether to log retry attempts.

TYPE: bool DEFAULT: True

on_retry

Optional callback invoked on each retry with (attempt, max_attempts, exception, delay). Useful for custom monitoring or metrics collection.

TYPE: Callable[[int, int, Exception, float], None] | None DEFAULT: None

RETURNS DESCRIPTION
RetryDecorator

Decorated function with retry logic.

Example

>>> @retry(max_attempts=3, on=(ConnectionError, TimeoutError))
... def fetch_data(url: str) -> dict:
...     return requests.get(url, timeout=10).json()

>>> @retry(max_attempts=3, on_retry=lambda a, m, e, d: print(f"Retry {a}/{m}"))
... def fragile_operation() -> str:
...     return do_something()
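
The statement that the decorator detects coroutine functions can be illustrated with a minimal sketch. `sketch_retry` is a hypothetical name; it omits backoff delays, logging, and callbacks, and it assumes detection via `inspect.iscoroutinefunction`, which may differ from what the library does.

```python
import asyncio
import functools
import inspect


def sketch_retry(max_attempts=3, on=(Exception,)):
    """Hypothetical retry decorator that keeps async functions async."""

    def decorator(func):
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                for attempt in range(1, max_attempts + 1):
                    try:
                        return await func(*args, **kwargs)
                    except on:
                        if attempt == max_attempts:
                            raise  # Out of attempts: re-raise the last error.

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except on:
                    if attempt == max_attempts:
                        raise  # Out of attempts: re-raise the last error.

        return sync_wrapper

    return decorator


calls = {"sync": 0, "async": 0}


@sketch_retry(max_attempts=3, on=(ConnectionError,))
def flaky_sync() -> str:
    calls["sync"] += 1
    if calls["sync"] < 3:
        raise ConnectionError("try again")
    return "done"


@sketch_retry(max_attempts=3, on=(ConnectionError,))
async def flaky_async() -> str:
    calls["async"] += 1
    if calls["async"] < 2:
        raise ConnectionError("try again")
    return "done"


sync_result = flaky_sync()
async_result = asyncio.run(flaky_async())
```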

retry_on_exception

retry_on_exception(
    exception_types: tuple[type[Exception], ...],
    max_attempts: int = 3,
) -> RetryDecorator

Retry on specific exceptions.

A simpler alternative to the full retry decorator when you just need basic retry functionality.

PARAMETER DESCRIPTION
exception_types

Exception types to retry on.

TYPE: tuple[type[Exception], ...]

max_attempts

Maximum number of attempts.

TYPE: int DEFAULT: 3

RETURNS DESCRIPTION
RetryDecorator

Decorated function with retry logic.

Example

>>> @retry_on_exception((ValueError,), max_attempts=2)
... def parse_data(data: str) -> dict:
...     return json.loads(data)


Circuit Breaker

Backward-compatibility shim for the circuit breaker module.

This module re-exports all symbols from the canonical taipanstack.resilience.circuit_breaker module. Import from this path continues to work, but new code should import from the canonical location directly.

Deprecated: import from taipanstack.resilience.circuit_breaker instead.

CircuitBreaker

CircuitBreaker(
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
    name: str = "default",
    on_state_change: Callable[
        [CircuitState, CircuitState], None
    ]
    | None = None,
)

Circuit breaker implementation.

Monitors function calls and opens the circuit when too many failures occur, preventing further calls until the service recovers. Supports both sync and async functions.

Example

>>> breaker = CircuitBreaker(failure_threshold=3)
>>> @breaker
... def call_external_api():
...     return requests.get("https://api.example.com", timeout=10)

Initialize CircuitBreaker.

PARAMETER DESCRIPTION
failure_threshold

Failures before opening circuit.

TYPE: int DEFAULT: 5

success_threshold

Successes to close from half-open.

TYPE: int DEFAULT: 2

timeout

Seconds before attempting half-open.

TYPE: float DEFAULT: 30.0

excluded_exceptions

Exceptions that don't trip circuit.

TYPE: tuple[type[Exception], ...] DEFAULT: ()

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

name

Name for logging/identification.

TYPE: str DEFAULT: 'default'

on_state_change

Optional callback invoked on state transitions with (old_state, new_state). Useful for custom monitoring.

TYPE: Callable[[CircuitState, CircuitState], None] | None DEFAULT: None

state property

state: CircuitState

Get current circuit state.

failure_count property

failure_count: int

Get current failure count.

reset

reset() -> None

Reset circuit breaker to closed state.

__call__

__call__(
    func: Callable[P, R] | Callable[P, Awaitable[R]],
) -> Callable[P, R] | Callable[P, Awaitable[R]]

Decorate a sync or async function with circuit breaker protection.

CircuitBreakerConfig dataclass

CircuitBreakerConfig(
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
)

Configuration for circuit breaker behavior.

ATTRIBUTE DESCRIPTION
failure_threshold

Number of failures before opening circuit.

TYPE: int

success_threshold

Successes needed in half-open to close.

TYPE: int

timeout

Seconds before trying half-open after open.

TYPE: float

excluded_exceptions

Exceptions that don't count as failures.

TYPE: tuple[type[Exception], ...]

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...]

__post_init__

__post_init__() -> None

Validate configuration values.

CircuitBreakerDecorator

Bases: Protocol

Protocol for the circuit breaker decorator.

CircuitBreakerError

CircuitBreakerError(message: str, state: CircuitState)

Bases: Exception

Raised when circuit breaker is open.

Initialize CircuitBreakerError.

PARAMETER DESCRIPTION
message

Error description.

TYPE: str

state

Current circuit state.

TYPE: CircuitState

CircuitBreakerState dataclass

CircuitBreakerState(
    state: CircuitState = CLOSED,
    failure_count: int = 0,
    success_count: int = 0,
    half_open_attempts: int = 0,
    last_failure_time: float = 0.0,
    lock: Lock = Lock(),
)

Internal state tracking for circuit breaker.

CircuitState

Bases: Enum

States of the circuit breaker.
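
The closed, open, and half-open behaviour described by the thresholds above can be sketched as a small state machine. Everything here is hypothetical: `SketchState` and `SketchBreaker` are stand-ins, the member names OPEN and HALF_OPEN are assumed (only CLOSED appears in this reference), and the transition rules follow the common circuit breaker pattern rather than the library's source.

```python
import time
from enum import Enum


class SketchState(Enum):
    """Stand-in for CircuitState; OPEN and HALF_OPEN names are assumed."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Hypothetical state machine; not the library's CircuitBreaker."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.clock = clock
        self.state = SketchState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0

    def allow(self) -> bool:
        """Return True if a call may proceed; OPEN becomes HALF_OPEN after timeout."""
        if self.state is SketchState.OPEN:
            if self.clock() - self.last_failure_time < self.timeout:
                return False
            self.state = SketchState.HALF_OPEN
            self.success_count = 0
        return True

    def record_success(self) -> None:
        if self.state is SketchState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = SketchState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0  # Assumed: a success clears the failure streak.

    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = self.clock()
        # Any failure while half-open reopens; otherwise wait for the threshold.
        if (self.state is SketchState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = SketchState.OPEN


now = [0.0]  # Fake clock so the example does not need to sleep.
breaker = SketchBreaker(failure_threshold=2, success_threshold=1,
                        timeout=30.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
state_after_failures = breaker.state
blocked = not breaker.allow()
now[0] = 31.0
probe_allowed = breaker.allow()
state_during_probe = breaker.state
breaker.record_success()
state_after_probe = breaker.state
```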

circuit_breaker

circuit_breaker(
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 30.0,
    excluded_exceptions: tuple[type[Exception], ...] = (),
    failure_exceptions: tuple[type[Exception], ...] = (
        Exception,
    ),
    name: str | None = None,
    on_state_change: Callable[
        [CircuitState, CircuitState], None
    ]
    | None = None,
) -> CircuitBreakerDecorator

Decorate a sync or async function with circuit breaker pattern.

PARAMETER DESCRIPTION
failure_threshold

Failures before opening circuit.

TYPE: int DEFAULT: 5

success_threshold

Successes to close from half-open.

TYPE: int DEFAULT: 2

timeout

Seconds before attempting half-open.

TYPE: float DEFAULT: 30.0

excluded_exceptions

Exceptions that don't trip circuit.

TYPE: tuple[type[Exception], ...] DEFAULT: ()

failure_exceptions

Exceptions that count as failures.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

name

Optional name for the circuit.

TYPE: str | None DEFAULT: None

on_state_change

Optional callback invoked on state transitions with (old_state, new_state).

TYPE: Callable[[CircuitState, CircuitState], None] | None DEFAULT: None

RETURNS DESCRIPTION
CircuitBreakerDecorator

Decorated function with circuit breaker protection.

Example

>>> @circuit_breaker(failure_threshold=3, timeout=60)
... def call_api(endpoint: str) -> dict:
...     return requests.get(endpoint, timeout=10).json()

>>> @circuit_breaker(
...     failure_threshold=3,
...     on_state_change=lambda old, new: print(f"{old} -> {new}"),
... )
... def monitored_call() -> str:
...     return service.call()


Resilience

Backward-compatibility shim for resilience decorators.

This module re-exports the public decorators and helper types from the canonical taipanstack.resilience.resilience module. Importing from this path remains supported for compatibility, but new code should use the canonical module.

Deprecated: import from taipanstack.resilience.resilience instead.

FallbackDecorator

Bases: Protocol

Protocol for the fallback decorator.

TimeoutDecorator

Bases: Protocol

Protocol for the timeout decorator.

fallback

fallback(
    fallback_value: T,
    exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator

Provide a fallback value on failures.

If the wrapped function returns an Err() or raises a specified exception, the fallback value is returned wrapped in an Ok().

PARAMETER DESCRIPTION
fallback_value

The value to return on failure.

TYPE: T

exceptions

Exceptions to catch.

TYPE: tuple[type[Exception], ...] DEFAULT: (Exception,)

RETURNS DESCRIPTION
FallbackDecorator

Decorator function.
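
To make the fallback rule concrete, here is a minimal sketch. `SketchOk` and `SketchErr` are hypothetical stand-ins for the library's Result variants, and `sketch_fallback` only mirrors the documented behaviour (an Err() or a listed exception yields the fallback wrapped in Ok()); the real decorator may differ in detail.

```python
import functools
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchOk:
    """Hypothetical stand-in for Ok."""

    value: Any


@dataclass(frozen=True)
class SketchErr:
    """Hypothetical stand-in for Err."""

    error: Any


def sketch_fallback(fallback_value, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except exceptions:
                return SketchOk(fallback_value)  # Listed exception: use fallback.
            if isinstance(result, SketchErr):
                return SketchOk(fallback_value)  # Err result: use fallback.
            return result

        return wrapper

    return decorator


@sketch_fallback(fallback_value=[], exceptions=(KeyError,))
def lookup(table: dict, key: str):
    return SketchOk(table[key])


@sketch_fallback(fallback_value=0)
def always_err():
    return SketchErr("backend unavailable")


hit = lookup({"a": [1]}, "a")
miss = lookup({}, "a")
recovered = always_err()
```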

timeout

timeout(seconds: float) -> TimeoutDecorator

Enforce a maximum execution time.

If the execution time exceeds the specified limit, returns Err(TimeoutError).

PARAMETER DESCRIPTION
seconds

Maximum allowed execution time in seconds.

TYPE: float

RETURNS DESCRIPTION
TimeoutDecorator

Decorator function.


Cache

Intelligent Cache decorator.

Provides in-memory caching that respects the Result monad and TTL: Ok() results are cached until their TTL expires, and Err() results are never cached.

CacheDecorator

Bases: Protocol

Protocol for the cache decorator.

cached

cached(ttl: float, max_size: int = 1024) -> CacheDecorator

Cache the Ok() results of a function for a given TTL.

Err() results are not cached. Supports both async and sync functions. Implements LRU (Least Recently Used) eviction when max_size is reached.

PARAMETER DESCRIPTION
ttl

Time to live in seconds.

TYPE: float

max_size

Maximum number of elements to store in the cache.

TYPE: int DEFAULT: 1024

RETURNS DESCRIPTION
CacheDecorator

Decorator function.
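
The combination of TTL expiry and LRU eviction can be sketched with an `OrderedDict`. `SketchTTLCache` is a hypothetical helper, not the library's internal storage; a decorator built on it would call `put` only for Ok() results so that Err() results are never stored.

```python
import time
from collections import OrderedDict


class SketchTTLCache:
    """Hypothetical LRU cache with per-entry expiry; not the library's code."""

    def __init__(self, ttl: float, max_size: int = 1024, clock=time.monotonic) -> None:
        self.ttl = ttl
        self.max_size = max_size
        self.clock = clock
        self._entries = OrderedDict()

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._entries[key]  # Expired: drop it and report a miss.
            return None
        self._entries.move_to_end(key)  # Mark as most recently used.
        return value

    def put(self, key, value) -> None:
        self._entries[key] = (self.clock() + self.ttl, value)
        self._entries.move_to_end(key)
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # Evict the least recently used.


now = [0.0]  # Fake clock so the example does not need to sleep.
cache = SketchTTLCache(ttl=10.0, max_size=2, clock=lambda: now[0])
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")      # Touch "a" so "b" becomes the least recently used entry.
cache.put("c", 3)   # Exceeds max_size and evicts "b".
evicted = cache.get("b")
kept = cache.get("c")
now[0] = 11.0
expired = cache.get("a")
```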


Context

Observability Context Module.

Provides context variables and context managers for tracing and observability, such as the correlation ID.

get_correlation_id

get_correlation_id() -> str | None

Get the current correlation ID.

RETURNS DESCRIPTION
str | None

The correlation ID if set, otherwise None.

set_correlation_id

set_correlation_id(correlation_id: str | None) -> None

Set the correlation ID for the current context.

PARAMETER DESCRIPTION
correlation_id

The correlation ID string, or None to clear.

TYPE: str | None

correlation_scope

correlation_scope(
    correlation_id: str | None,
) -> Iterator[None]

Context manager to set the correlation ID and restore it after.

PARAMETER DESCRIPTION
correlation_id

The correlation ID to set for the duration of the scope.

TYPE: str | None

YIELDS DESCRIPTION
None

None
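
A set-and-restore scope of this kind is commonly built on `contextvars`. The sketch below assumes that approach; `_sketch_id` and `sketch_scope` are hypothetical names and the library's internals are not shown in this reference.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical context variable holding the current correlation ID.
_sketch_id = contextvars.ContextVar("sketch_correlation_id", default=None)


@contextmanager
def sketch_scope(correlation_id):
    """Set the ID for the duration of the block, then restore the old value."""
    token = _sketch_id.set(correlation_id)
    try:
        yield
    finally:
        _sketch_id.reset(token)  # Restores whatever was set before the scope.


seen = []
with sketch_scope("req-1"):
    seen.append(_sketch_id.get())
    with sketch_scope("req-2"):
        seen.append(_sketch_id.get())
    seen.append(_sketch_id.get())
seen.append(_sketch_id.get())
```

Resetting with the token, rather than setting None on exit, is what lets nested scopes restore the outer ID.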


Concurrency

Concurrency utilities.

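Provides a bulkhead pattern concurrency limiter decorator for both synchronous and asynchronous functions. The decorated function returns a Result type: when no execution slot becomes available, the result is an Err(OverloadError) rather than a raised exception.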

OverloadError

OverloadError(message: str = 'Concurrency limit reached')

Bases: Exception

Exception raised when a concurrency limit is exceeded or timed out.

Initialize the OverloadError.

PARAMETER DESCRIPTION
message

The error message to display. Defaults to "Concurrency limit reached".

TYPE: str DEFAULT: 'Concurrency limit reached'

ConcurrencyLimitDecorator

Bases: Protocol

Protocol for the concurrency limit decorator.

limit_concurrency

limit_concurrency(
    max_tasks: int, timeout: float = 0.0
) -> ConcurrencyLimitDecorator

Decorate a function to apply the bulkhead concurrency limit pattern.

If the maximum number of concurrent executions is reached, the wrapper waits up to timeout seconds to acquire an execution slot. If no slot becomes available, it returns an Err(OverloadError).

PARAMETER DESCRIPTION
max_tasks

Maximum concurrent function executions allowed.

TYPE: int

timeout

Maximum time in seconds to wait for a slot if limit is reached.

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
ConcurrencyLimitDecorator

Decorated function returning a Result[T, OverloadError].

Example

>>> @limit_concurrency(max_tasks=2, timeout=0.1)
... def process_data() -> str:
...     return "data"
>>> process_data()
Ok('data')
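
The bulkhead idea can be sketched for synchronous code with a semaphore. This is an assumption about the general technique, not the library's implementation: `sketch_limit` and `SketchOverloadError` are hypothetical, and plain ("ok", value) / ("err", error) tuples stand in for the Result type.

```python
import functools
import threading


class SketchOverloadError(Exception):
    """Hypothetical stand-in for OverloadError."""


def sketch_limit(max_tasks: int, timeout: float = 0.0):
    semaphore = threading.BoundedSemaphore(max_tasks)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if timeout > 0:
                acquired = semaphore.acquire(timeout=timeout)
            else:
                acquired = semaphore.acquire(blocking=False)
            if not acquired:
                # No free slot: report the overload instead of raising.
                return ("err", SketchOverloadError("Concurrency limit reached"))
            try:
                return ("ok", func(*args, **kwargs))
            finally:
                semaphore.release()

        return wrapper

    return decorator


results = []


@sketch_limit(max_tasks=1)
def job(reenter: bool) -> str:
    if reenter:
        # The only slot is held by this call, so the nested call is rejected.
        results.append(job(False))
    return "data"


results.append(job(True))
```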


Filesystem

Safe filesystem operations.

Provides secure wrappers around file operations with path validation, atomic writes, and proper error handling using Result types.

FileNotFoundErr dataclass

FileNotFoundErr(path: Path)

Error when file is not found.

message property

message: str

Get the error message.

NotAFileErr dataclass

NotAFileErr(path: Path)

Error when path is not a file.

message property

message: str

Get the error message.

FileTooLargeErr dataclass

FileTooLargeErr(path: Path, size: int, max_size: int)

Error when file exceeds size limit.

message property

message: str

Get the error message.

WriteOptions dataclass

WriteOptions(
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    create_parents: bool = True,
    backup: bool = True,
    atomic: bool = True,
)

Options for safe_write.

ATTRIBUTE DESCRIPTION
base_dir

Base directory to constrain to.

TYPE: Path | str | None

encoding

File encoding.

TYPE: str

create_parents

Create parent directories if needed.

TYPE: bool

backup

Create backup of existing file.

TYPE: bool

atomic

Use atomic write.

TYPE: bool
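
An atomic write is usually done by writing to a temporary file in the target directory and renaming it into place. The sketch below assumes that technique; `sketch_atomic_write` is a hypothetical helper and does not cover backups or base_dir validation.

```python
import contextlib
import os
import tempfile
from pathlib import Path


def sketch_atomic_write(path: Path, content: str, encoding: str = "utf-8") -> Path:
    """Write content so readers see either the old file or the new one."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temporary file lives in the same directory so the rename stays
    # on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding=encoding) as handle:
            handle.write(content)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, path)  # Replaces the destination in one step.
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)  # Do not leave a partial temp file behind.
        raise
    return path


with tempfile.TemporaryDirectory() as workdir:
    target = sketch_atomic_write(Path(workdir) / "nested" / "config.json", '{"ok": true}')
    written = target.read_text(encoding="utf-8")
    leftovers = [entry.name for entry in target.parent.iterdir()]
```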

safe_read

safe_read(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    encoding: str = "utf-8",
    max_size_bytes: int | None = 10 * 1024 * 1024,
) -> Result[str, ReadFileError]

Read a file safely with path validation.

PARAMETER DESCRIPTION
path

Path to the file to read.

TYPE: Path | str

base_dir

Base directory to constrain to.

TYPE: Path | str | None DEFAULT: None

encoding

File encoding.

TYPE: str DEFAULT: 'utf-8'

max_size_bytes

Maximum file size to read (None for no limit).

TYPE: int | None DEFAULT: 10 * 1024 * 1024

RETURNS DESCRIPTION
Ok

File contents on success.

TYPE: str

Err

Error details on failure.

TYPE: ReadFileError

Example

>>> match safe_read("config.json"):
...     case Ok(content):
...         data = json.loads(content)
...     case Err(FileNotFoundErr(path=p)):
...         print(f"Missing: {p}")
...     case Err(FileTooLargeErr(size=s)):
...         print(f"Too big: {s} bytes")
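
Constraining a path to base_dir typically means resolving it and checking that it still lies under the base. The helper below is a hypothetical sketch of that check, using an example base directory; the library's actual validation rules are not shown in this reference and may be stricter.

```python
from pathlib import Path


def sketch_is_within(path, base_dir) -> bool:
    """Return True if path, resolved against base_dir, stays inside it."""
    base = Path(base_dir).resolve()
    # Joining with an absolute path discards the base, so absolute paths
    # outside base_dir are rejected by the containment check below.
    candidate = (base / path).resolve()
    return candidate == base or base in candidate.parents


inside = sketch_is_within("data/file.txt", "/srv/app")
traversal = sketch_is_within("../../etc/passwd", "/srv/app")
absolute = sketch_is_within("/etc/passwd", "/srv/app")
```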

safe_write

safe_write(
    path: Path | str,
    content: str,
    *,
    options: WriteOptions | None = None,
) -> Path

Write to a file safely with path validation.

PARAMETER DESCRIPTION
path

Path to write to.

TYPE: Path | str

content

Content to write.

TYPE: str

options

Write options.

TYPE: WriteOptions | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path to the written file.

RAISES DESCRIPTION
SecurityError

If path validation fails.

ensure_dir

ensure_dir(
    path: Path | str,
    *,
    base_dir: Path | str | None = None,
    mode: int = 493,
) -> Path

Ensure a directory exists, creating it if needed.

PARAMETER DESCRIPTION
path

Path to the directory.

TYPE: Path | str

base_dir

Base directory to constrain to.

TYPE: Path | str | None DEFAULT: None

mode

Directory permissions. The default 493 is the decimal form of octal 0o755.

TYPE: int DEFAULT: 493

RETURNS DESCRIPTION
Path

Path to the directory.

RAISES DESCRIPTION
SecurityError

If path validation fails.

FileExistsError

If a file already exists at the given path or intermediate paths.


Logging

Structured logging with context.

Provides a configured logger with support for structured output, context propagation, and proper formatting.

StackLogger

StackLogger(
    name: str = "stack",
    level: str = "INFO",
    *,
    use_structured: bool = False,
)

Enhanced logger with context support.

Provides a wrapper around standard logging with additional features like context propagation and structured output support.

ATTRIBUTE DESCRIPTION
name

Logger name.

level

Current log level.

Initialize the logger.

PARAMETER DESCRIPTION
name

Logger name.

TYPE: str DEFAULT: 'stack'

level

Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

TYPE: str DEFAULT: 'INFO'

use_structured

Use structured logging if structlog is available.

TYPE: bool DEFAULT: False

bind

bind(**context: object) -> StackLogger

Add context to logger.

PARAMETER DESCRIPTION
**context

Key-value pairs to add to context.

TYPE: object DEFAULT: {}

RETURNS DESCRIPTION
StackLogger

Self for chaining.

unbind

unbind(*keys: str) -> StackLogger

Remove context keys.

PARAMETER DESCRIPTION
*keys

Keys to remove from context.

TYPE: str DEFAULT: ()

RETURNS DESCRIPTION
StackLogger

Self for chaining.

debug

debug(message: str, **kwargs: object) -> None

Log a debug message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: object DEFAULT: {}

info

info(message: str, **kwargs: object) -> None

Log an info message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: object DEFAULT: {}

warning

warning(message: str, **kwargs: object) -> None

Log a warning message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: object DEFAULT: {}

error

error(message: str, **kwargs: object) -> None

Log an error message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: object DEFAULT: {}

critical

critical(message: str, **kwargs: object) -> None

Log a critical message.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: object DEFAULT: {}

exception

exception(message: str, **kwargs: object) -> None

Log an exception with traceback.

PARAMETER DESCRIPTION
message

The message to log.

TYPE: str

**kwargs

Additional context.

TYPE: object DEFAULT: {}

mask_sensitive_data_processor

mask_sensitive_data_processor(
    _logger: object,
    _method: str,
    event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]

Mask sensitive data in structlog event dictionaries.

Intercept the event_dict produced by structlog and replace the value of any key whose name contains a sensitive substring with "***REDACTED***". Matching is case-insensitive.

PARAMETER DESCRIPTION
_logger

The wrapped logger object (unused, required by structlog).

TYPE: object

_method

The name of the log method called (unused, required by structlog).

TYPE: str

event_dict

The structured event dictionary.

TYPE: MutableMapping[str, object]

RETURNS DESCRIPTION
MutableMapping[str, object]

The event dictionary with sensitive values masked.
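
A processor with this signature might look like the sketch below. The list of sensitive substrings is hypothetical, since this reference does not state which substrings the library checks; only the replacement text and the case-insensitive key matching come from the description above.

```python
# Hypothetical substrings; the library's actual list is not shown here.
SKETCH_SENSITIVE = ("password", "token", "secret")


def sketch_mask(_logger, _method, event_dict):
    """Replace values whose key contains a sensitive substring."""
    for key in list(event_dict):
        lowered = str(key).lower()  # Case-insensitive match on the key name.
        if any(part in lowered for part in SKETCH_SENSITIVE):
            event_dict[key] = "***REDACTED***"
    return event_dict


masked = sketch_mask(
    None,
    "info",
    {"event": "login", "user": "ada", "API_TOKEN": "abc123", "db_password": "hunter2"},
)
```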

correlation_id_processor

correlation_id_processor(
    _logger: object,
    _method: str,
    event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]

Structlog processor to inject correlation ID into events.

PARAMETER DESCRIPTION
_logger

The wrapped logger object.

TYPE: object

_method

The name of the log method called.

TYPE: str

event_dict

The structured event dictionary.

TYPE: MutableMapping[str, object]

RETURNS DESCRIPTION
MutableMapping[str, object]

The event dictionary with correlation_id injected if set.

setup_logging

setup_logging(
    level: Literal[
        "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
    ] = "INFO",
    *,
    format_type: Literal[
        "simple", "detailed", "json"
    ] = "detailed",
    log_file: str | None = None,
    use_structured: bool = False,
) -> None

Configure the root logger.

PARAMETER DESCRIPTION
level

Log level to set.

TYPE: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] DEFAULT: 'INFO'

format_type

Output format type.

TYPE: Literal['simple', 'detailed', 'json'] DEFAULT: 'detailed'

log_file

Optional file to log to.

TYPE: str | None DEFAULT: None

use_structured

Use structlog if available.

TYPE: bool DEFAULT: False

get_logger

get_logger(
    name: str = "stack",
    *,
    level: str = "INFO",
    use_structured: bool = False,
) -> StackLogger

Get a configured logger instance.

PARAMETER DESCRIPTION
name

Logger name.

TYPE: str DEFAULT: 'stack'

level

Log level.

TYPE: str DEFAULT: 'INFO'

use_structured

Use structlog if available.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
StackLogger

Configured StackLogger instance.

Example

>>> logger = get_logger("my_module")
>>> logger.bind(request_id="123").info("Processing request")

log_operation

log_operation(
    operation: str,
    *,
    logger: StackLogger | None = None,
    level: str = "INFO",
    expected_exceptions: tuple[type[Exception], ...]
    | type[Exception] = Exception,
) -> AbstractContextManager[StackLogger]

Context manager for logging operations.

PARAMETER DESCRIPTION
operation

Name of the operation.

TYPE: str

logger

Logger to use (creates one if not provided).

TYPE: StackLogger | None DEFAULT: None

level

Log level for messages.

TYPE: str DEFAULT: 'INFO'

expected_exceptions

Exceptions to catch and log as failures.

TYPE: tuple[type[Exception], ...] | type[Exception] DEFAULT: Exception

YIELDS DESCRIPTION
AbstractContextManager[StackLogger]

The logger instance.

Example

>>> with log_operation("setup") as logger:
...     logger.info("Setting up environment")


Rate Limit

Rate limiting utilities.

Provides an in-memory, token-bucket-based rate limiting decorator for both synchronous and asynchronous functions. The decorator returns a Result type encapsulating either the original return value or a RateLimitError.

RateLimitError

RateLimitError(message: str = 'Rate limit exceeded')

Bases: Exception

Exception raised when a rate limit is exceeded.

Initialize the RateLimitError.

PARAMETER DESCRIPTION
message

The error message to display. Defaults to "Rate limit exceeded".

TYPE: str DEFAULT: 'Rate limit exceeded'

RateLimiter

RateLimiter(max_calls: int, time_window: float)

Token bucket rate limiter logic.

Initialize the token bucket.

PARAMETER DESCRIPTION
max_calls

The maximum number of calls allowed in the time window.

TYPE: int

time_window

The time window in seconds.

TYPE: float

consume

consume(tokens: float = 1.0) -> bool

Try to consume tokens.

PARAMETER DESCRIPTION
tokens

Number of tokens to consume. Defaults to 1.0.

TYPE: float DEFAULT: 1.0

RETURNS DESCRIPTION
bool

True if tokens were consumed (allow), False otherwise (limit exceeded).
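
A token bucket matching these parameters can be sketched as follows. The refill rule (max_calls tokens per time_window, refilled continuously, capped at max_calls) is the textbook form and an assumption here; `SketchTokenBucket` is a hypothetical class and omits the locking a shared limiter would need.

```python
import time


class SketchTokenBucket:
    """Hypothetical token bucket; not the library's RateLimiter."""

    def __init__(self, max_calls: int, time_window: float, clock=time.monotonic) -> None:
        self.capacity = float(max_calls)
        self.refill_rate = max_calls / time_window  # Tokens added per second.
        self.clock = clock
        self.tokens = self.capacity  # Start with a full bucket.
        self.updated_at = clock()

    def consume(self, tokens: float = 1.0) -> bool:
        now = self.clock()
        elapsed = now - self.updated_at
        self.updated_at = now
        # Refill for the elapsed time, never beyond the bucket capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


now = [0.0]  # Fake clock so the example does not need to sleep.
bucket = SketchTokenBucket(max_calls=2, time_window=1.0, clock=lambda: now[0])
first_three = [bucket.consume() for _ in range(3)]
now[0] = 0.5  # Half a window later, one token has been refilled.
after_half_window = bucket.consume()
```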

RateLimitDecorator

Bases: Protocol

Protocol for the rate limit decorator.

rate_limit

rate_limit(
    max_calls: int, time_window: float
) -> RateLimitDecorator

Decorate a function to apply rate limiting.

If the rate limit is exceeded, the wrapped function immediately returns an Err(RateLimitError). Uses an in-memory token bucket strategy.

PARAMETER DESCRIPTION
max_calls

Maximum function executions allowed in the defined window.

TYPE: int

time_window

Time window size in seconds.

TYPE: float

RETURNS DESCRIPTION
RateLimitDecorator

Decorated function returning a Result[T, RateLimitError].

Example

>>> @rate_limit(max_calls=2, time_window=1.0)
... def fetch_data() -> str:
...     return "data"
>>> fetch_data()
Ok('data')
>>> fetch_data()
Ok('data')
>>> fetch_data()
Err(RateLimitError('Rate limit exceeded'))


Serialization

Serialization utilities.

Provides an optimized default encoder for use with orjson.dumps.

default_encoder

default_encoder(obj: object) -> dict[str, object]

Default encoder for orjson.dumps handling Result types.

Intercepts objects of type Ok and Err:

- Ok(value): Returns {"status": "success"} merged with value if value is a dict. Otherwise returns {"status": "success", "data": value}.
- Err(error): Returns {"status": "error", "message": str(error)}.

PARAMETER DESCRIPTION
obj

The object to encode.

TYPE: object

RETURNS DESCRIPTION
dict[str, object]

A serializable representation of the object.

RAISES DESCRIPTION
TypeError

If the object type is not supported.

Example

>>> import orjson
>>> orjson.dumps(Ok({"id": 1}), default=default_encoder)
b'{"status":"success","id":1}'
>>> orjson.dumps(Err(ValueError("oops")), default=default_encoder)
b'{"status":"error","message":"oops"}'
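
The encoding rules can be reproduced in a self-contained sketch. To avoid third-party imports it uses the standard library's `json.dumps`, whose `default` hook is likewise called for objects it cannot serialize; `SketchOk`, `SketchErr`, and `sketch_encoder` are hypothetical stand-ins, and the output spacing differs from orjson's compact bytes.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchOk:
    """Hypothetical stand-in for Ok."""

    value: Any


@dataclass(frozen=True)
class SketchErr:
    """Hypothetical stand-in for Err."""

    error: Any


def sketch_encoder(obj: object) -> dict:
    if isinstance(obj, SketchOk):
        if isinstance(obj.value, dict):
            # Dict payloads are merged into the top-level object.
            return {"status": "success", **obj.value}
        return {"status": "success", "data": obj.value}
    if isinstance(obj, SketchErr):
        return {"status": "error", "message": str(obj.error)}
    raise TypeError(f"Unsupported type: {type(obj).__name__}")


merged = json.dumps(SketchOk({"id": 1}), default=sketch_encoder)
wrapped = json.dumps(SketchOk([1, 2]), default=sketch_encoder)
failed = json.dumps(SketchErr(ValueError("oops")), default=sketch_encoder)
```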


Subprocess

Safe subprocess execution with security guards.

Provides secure wrappers around subprocess execution with command validation, timeout handling, and retry logic.

SafeCommandResult dataclass

SafeCommandResult(
    command: list[str],
    returncode: int,
    stdout: str = "",
    stderr: str = "",
    duration_seconds: float = 0.0,
)

Result of a safe command execution.

ATTRIBUTE DESCRIPTION
command

The executed command.

TYPE: list[str]

returncode

Exit code of the command.

TYPE: int

stdout

Standard output.

TYPE: str

stderr

Standard error.

TYPE: str

success

Whether the command succeeded (returncode == 0).

TYPE: bool

duration_seconds

How long the command took.

TYPE: float

success property

success: bool

Check if command succeeded.

raise_on_error

raise_on_error() -> SafeCommandResult

Raise an exception if command failed.

RETURNS DESCRIPTION
SafeCommandResult

Self if successful.

RAISES DESCRIPTION
CalledProcessError

If command failed.

run_safe_command

run_safe_command(
    command: Sequence[str],
    *,
    cwd: Path | str | None = None,
    timeout: float = 300.0,
    capture_output: bool = True,
    check: bool = False,
    allowed_commands: Sequence[str] | None = None,
    env: dict[str, str] | None = None,
    allowed_env_vars: Sequence[str] | None = None,
    dry_run: bool = False,
) -> SafeCommandResult

Execute a command safely with security guards.

This function provides a secure wrapper around subprocess.run with command injection protection, timeout handling, and optional command whitelisting.

PARAMETER DESCRIPTION
command

Command and arguments as a sequence.

TYPE: Sequence[str]

cwd

Working directory for the command.

TYPE: Path | str | None DEFAULT: None

timeout

Maximum execution time in seconds.

TYPE: float DEFAULT: 300.0

capture_output

Whether to capture stdout/stderr.

TYPE: bool DEFAULT: True

check

Whether to raise on non-zero exit.

TYPE: bool DEFAULT: False

allowed_commands

Whitelist of allowed commands.

TYPE: Sequence[str] | None DEFAULT: None

env

Environment variables to set.

TYPE: dict[str, str] | None DEFAULT: None

allowed_env_vars

Whitelist of allowed environment variables.

TYPE: Sequence[str] | None DEFAULT: None

dry_run

If True, don't actually execute the command.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
SafeCommandResult

SafeCommandResult with execution details.

RAISES DESCRIPTION
SecurityError

If command validation fails.

TimeoutExpired

If command times out.

CalledProcessError

If check=True and command fails.

Example

>>> result = run_safe_command(["poetry", "install"])
>>> if result.success:
...     print("Installation complete!")
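
The allowlist and injection protection described above can be approximated with the standard library. This sketch is an assumption about the general approach: `sketch_run` and `SketchSecurityError` are hypothetical, and the library's actual validation is likely more thorough.

```python
import subprocess
import sys


class SketchSecurityError(Exception):
    """Hypothetical stand-in for the library's SecurityError."""


def sketch_run(command, *, allowed_commands=None, timeout=300.0):
    """Run a command given as a sequence of arguments, with an optional allowlist."""
    if not command:
        raise SketchSecurityError("Empty command")
    if allowed_commands is not None and command[0] not in allowed_commands:
        raise SketchSecurityError(f"Command not allowed: {command[0]}")
    # An argument list with shell=False (the default) is never parsed by a
    # shell, so metacharacters in arguments are passed through literally.
    return subprocess.run(
        list(command),
        capture_output=True,
        text=True,
        timeout=timeout,
        check=False,
    )


completed = sketch_run(
    [sys.executable, "-c", "print('ok')"],
    allowed_commands=[sys.executable],
)
try:
    sketch_run(["curl", "https://example.invalid"], allowed_commands=["poetry"])
except SketchSecurityError as exc:
    rejected = str(exc)
```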