Utilities: Retry, Circuit Breaker & Metrics
TaipanStack provides resilience and observability utilities for production applications.
Retry
Retry logic with exponential backoff.
Provides decorators for automatic retry of failing operations with configurable backoff strategies. Compatible with any Python framework (sync and async).
RetryDecorator
Bases: Protocol
Protocol for the retry decorator.
RetryConfig
dataclass
RetryConfig(
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
jitter_factor: float = 0.1,
log_retries: bool = True,
on_retry: Callable[[int, int, Exception, float], None]
| None = None,
)
Configuration for retry behavior.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| max_attempts | Maximum number of retry attempts. |
| initial_delay | Initial delay between retries in seconds. |
| max_delay | Maximum delay between retries. |
| exponential_base | Base for exponential backoff (2 = double each time). |
| jitter | Whether to add random jitter to delays. |
| jitter_factor | Maximum jitter as fraction of delay (0.1 = 10%). |
| log_retries | Whether to emit standard log messages. |
| on_retry | Optional callback invoked on each retry. |
RetryError
RetryError(
message: str,
attempts: int,
last_exception: Exception | None = None,
)
Bases: Exception
Raised when all retry attempts have failed.
Initialize RetryError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | Description of the retry failure. |
| attempts | Number of attempts made. |
| last_exception | The last exception that was raised. |
Retrier
Retrier(
*,
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
on: tuple[type[Exception], ...] = (Exception,),
)
Context manager for retry logic.
Provides a context manager interface for retry logic when decorators are not suitable.
Example

    >>> retrier = Retrier(max_attempts=3, on=(ConnectionError,))
    >>> with retrier:
    ...     result = some_operation()

Initialize Retrier.
| PARAMETER | DESCRIPTION |
|---|---|
| max_attempts | Maximum retry attempts. |
| initial_delay | Initial delay between retries. |
| max_delay | Maximum delay between retries. |
| on | Exception types to retry on. |
__exit__
__exit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
_exc_tb: TracebackType | None,
) -> bool
Exit the retry context.
Returns True to suppress the exception if we should retry, False to let it propagate.
calculate_delay
calculate_delay(attempt: int, config: RetryConfig) -> float
Calculate delay before next retry.
| PARAMETER | DESCRIPTION |
|---|---|
| attempt | Current attempt number (1-indexed). |
| config | Retry configuration. |

| RETURNS | DESCRIPTION |
|---|---|
| float | Delay in seconds before next retry. |
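The reference does not spell out the formula behind calculate_delay, so the following is only a sketch of one common way to combine the documented fields: exponential growth from initial_delay, a cap at max_delay, and symmetric jitter. The names BackoffConfig and sketch_delay are hypothetical stand-ins, and the exact formula is an assumption, not the library's implementation.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class BackoffConfig:
    """Hypothetical stand-in that mirrors the documented RetryConfig fields."""

    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_factor: float = 0.1


def sketch_delay(attempt: int, config: BackoffConfig) -> float:
    """Return the delay in seconds after the given 1-indexed attempt."""
    # Assumed formula: initial_delay * base ** (attempt - 1), capped at max_delay.
    raw = config.initial_delay * config.exponential_base ** (attempt - 1)
    delay = min(raw, config.max_delay)
    if config.jitter:
        # Assumed symmetric jitter of up to +/- jitter_factor of the capped delay.
        spread = delay * config.jitter_factor
        delay += random.uniform(-spread, spread)
    return max(0.0, delay)
```

With jitter disabled and the defaults above, attempts 1, 2 and 3 would wait 1, 2 and 4 seconds, and any attempt past the cap waits 60 seconds.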
retry
retry(
*,
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
on: tuple[type[Exception], ...] = (Exception,),
reraise: bool = True,
log_retries: bool = True,
on_retry: Callable[[int, int, Exception, float], None]
| None = None,
) -> RetryDecorator
Retry a sync or async function with exponential backoff.
Automatically retries the decorated function when specified exceptions are raised, with configurable backoff strategy. Detects coroutine functions and preserves their async nature.
| PARAMETER | DESCRIPTION |
|---|---|
| max_attempts | Maximum number of retry attempts. |
| initial_delay | Initial delay between retries in seconds. |
| max_delay | Maximum delay between retries. |
| exponential_base | Base for exponential backoff. |
| jitter | Whether to add random jitter to delays. |
| on | Exception types to retry on. |
| reraise | Whether to reraise the last exception on failure. |
| log_retries | Whether to log retry attempts. |
| on_retry | Optional callback invoked on each retry with (attempt, max_attempts, exception, delay). Useful for custom monitoring or metrics collection. |

| RETURNS | DESCRIPTION |
|---|---|
| RetryDecorator | Decorated function with retry logic. |

Example

    >>> @retry(max_attempts=3, on=(ConnectionError, TimeoutError))
    ... def fetch_data(url: str) -> dict:
    ...     return requests.get(url, timeout=10).json()

    >>> @retry(max_attempts=3, on_retry=lambda a, m, e, d: print(f"Retry {a}/{m}"))
    ... def fragile_operation() -> str:
    ...     return do_something()
retry_on_exception
retry_on_exception(
exception_types: tuple[type[Exception], ...],
max_attempts: int = 3,
) -> RetryDecorator
Retry on specific exceptions.
A simpler alternative to the full retry decorator when you just need basic retry functionality.
| PARAMETER | DESCRIPTION |
|---|---|
| exception_types | Exception types to retry on. |
| max_attempts | Maximum number of attempts. |

| RETURNS | DESCRIPTION |
|---|---|
| RetryDecorator | Decorated function with retry logic. |

Example

    >>> @retry_on_exception((ValueError,), max_attempts=2)
    ... def parse_data(data: str) -> dict:
    ...     return json.loads(data)
Circuit Breaker
Circuit Breaker pattern implementation.
Provides protection against cascading failures by temporarily blocking calls to a failing service. Compatible with any Python framework (sync and async).
CircuitBreakerDecorator
Bases: Protocol
Protocol for the circuit breaker decorator.
CircuitState
Bases: Enum
States of the circuit breaker.
CircuitBreakerError
CircuitBreakerError(message: str, state: CircuitState)
Bases: Exception
Raised when circuit breaker is open.
Initialize CircuitBreakerError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | Error description. |
| state | Current circuit state. |
CircuitBreakerConfig
dataclass
CircuitBreakerConfig(
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
)
Configuration for circuit breaker behavior.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| failure_threshold | Number of failures before opening circuit. |
| success_threshold | Successes needed in half-open to close. |
| timeout | Seconds before trying half-open after open. |
| excluded_exceptions | Exceptions that don't count as failures. |
| failure_exceptions | Exceptions that count as failures. |
CircuitBreakerState
dataclass
CircuitBreakerState(
state: CircuitState = CLOSED,
failure_count: int = 0,
success_count: int = 0,
half_open_attempts: int = 0,
last_failure_time: float = 0.0,
lock: Lock = Lock(),
)
Internal state tracking for circuit breaker.
CircuitBreaker
CircuitBreaker(
*,
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
name: str = "default",
on_state_change: Callable[
[CircuitState, CircuitState], None
]
| None = None,
)
Circuit breaker implementation.
Monitors function calls and opens the circuit when too many failures occur, preventing further calls until the service recovers. Supports both sync and async functions.
Example

    >>> breaker = CircuitBreaker(failure_threshold=3)
    >>> @breaker
    ... def call_external_api():
    ...     return requests.get("https://api.example.com", timeout=10)

Initialize CircuitBreaker.
| PARAMETER | DESCRIPTION |
|---|---|
| failure_threshold | Failures before opening circuit. |
| success_threshold | Successes to close from half-open. |
| timeout | Seconds before attempting half-open. |
| excluded_exceptions | Exceptions that don't trip circuit. |
| failure_exceptions | Exceptions that count as failures. |
| name | Name for logging/identification. |
| on_state_change | Optional callback invoked on state transitions with (old_state, new_state). Useful for custom monitoring. |
__call__
__call__(
func: Callable[P, R]
| Callable[P, Coroutine[Any, Any, R]],
) -> (
Callable[P, R] | Callable[P, Coroutine[Any, Any, R]]
)
Decorate a sync or async function with circuit breaker protection.
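To make the closed, open and half-open transitions concrete, here is a minimal, single-threaded sketch of the state machine the parameters above describe. SketchBreaker, State and the injectable clock are hypothetical names; details such as resetting the failure count on success and re-opening on any half-open failure are assumptions rather than documented behavior. Exception filtering, callbacks and locking are left out.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Hypothetical, non-thread-safe illustration of the circuit breaker states."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.state = State.CLOSED
        self._clock = clock  # injectable so the timeout can be tested without sleeping
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def call(self, func, *args, **kwargs):
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self.timeout:
                # Still inside the cool-down window: reject without calling func.
                raise RuntimeError("circuit is open")
            # Cool-down elapsed: let trial calls through.
            self.state = State.HALF_OPEN
            self._successes = 0
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        self._record_success()
        return result

    def _record_failure(self):
        self._failures += 1
        # Assumption: any failure while half-open re-opens the circuit at once.
        if self.state is State.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = State.OPEN
            self._opened_at = self._clock()

    def _record_success(self):
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = State.CLOSED
                self._failures = 0
        else:
            # Assumption: only consecutive failures count toward the threshold.
            self._failures = 0
```

Passing the clock as a parameter is a design choice for the sketch only; it keeps the timeout logic deterministic under test.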
circuit_breaker
circuit_breaker(
*,
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
name: str | None = None,
on_state_change: Callable[
[CircuitState, CircuitState], None
]
| None = None,
) -> CircuitBreakerDecorator
Decorate a sync or async function with circuit breaker pattern.
| PARAMETER | DESCRIPTION |
|---|---|
| failure_threshold | Failures before opening circuit. |
| success_threshold | Successes to close from half-open. |
| timeout | Seconds before attempting half-open. |
| excluded_exceptions | Exceptions that don't trip circuit. |
| failure_exceptions | Exceptions that count as failures. |
| name | Optional name for the circuit. |
| on_state_change | Optional callback invoked on state transitions with (old_state, new_state). |

| RETURNS | DESCRIPTION |
|---|---|
| CircuitBreakerDecorator | Decorated function with circuit breaker protection. |

Example

    >>> @circuit_breaker(failure_threshold=3, timeout=60)
    ... def call_api(endpoint: str) -> dict:
    ...     return requests.get(endpoint, timeout=10).json()

    >>> @circuit_breaker(
    ...     failure_threshold=3,
    ...     on_state_change=lambda old, new: print(f"{old} -> {new}"),
    ... )
    ... def monitored_call() -> str:
    ...     return service.call()
Metrics
Metrics collection and monitoring utilities.
Provides lightweight metrics collection for monitoring application performance and health. Compatible with any Python framework.
TimingStats
dataclass
TimingStats(
count: int = 0,
total_time: float = 0.0,
min_time: float = float("inf"),
max_time: float = 0.0,
)
TimerStats
Bases: TypedDict
Statistics dictionary for timing measurements.
MetricsSnapshot
Bases: TypedDict
Snapshot of all collected metrics.
Counter
Counter()
MetricsCollector
MetricsCollector()
Centralized metrics collection.
Thread-safe collector for various metric types including counters, timers, and gauges.
Example

    >>> metrics = MetricsCollector()
    >>> metrics.increment("requests_total")
    >>> with metrics.timer("request_duration"):
    ...     process_request()
Initialize metrics collector.
get_timer_stats
get_timer_stats(name: str) -> TimingStats | None
Get timing statistics for a named timer.
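As an illustration of how a thread-safe collector with counters and timers might hang together, the sketch below guards two dictionaries with one lock and records durations through a context manager. SketchCollector, SketchTimingStats, get_counter and record_time are hypothetical names; only increment, timer and get_timer_stats appear in the reference above, and their internals here are assumed.

```python
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class SketchTimingStats:
    """Hypothetical mirror of the documented TimingStats fields."""

    count: int = 0
    total_time: float = 0.0
    min_time: float = float("inf")
    max_time: float = 0.0


class SketchCollector:
    """Hypothetical collector: one lock protects all counters and timers."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counters = {}
        self._timers = {}

    def increment(self, name, value=1):
        with self._lock:
            self._counters[name] = self._counters.get(name, 0) + value

    def get_counter(self, name):
        with self._lock:
            return self._counters.get(name, 0)

    def record_time(self, name, seconds):
        with self._lock:
            stats = self._timers.setdefault(name, SketchTimingStats())
            stats.count += 1
            stats.total_time += seconds
            stats.min_time = min(stats.min_time, seconds)
            stats.max_time = max(stats.max_time, seconds)

    @contextmanager
    def timer(self, name):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the duration even if the timed block raised.
            self.record_time(name, time.perf_counter() - start)

    def get_timer_stats(self, name):
        with self._lock:
            return self._timers.get(name)
```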
Timer
Timer(name: str, collector: MetricsCollector)
Context manager for timing code blocks.
Initialize timer.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Name of the timer metric. |
| collector | MetricsCollector to record to. |
timed
timed(
name: str | None = None,
*,
collector: MetricsCollector | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to time function execution.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Optional metric name (defaults to function name). |
| collector | Optional MetricsCollector instance. |

| RETURNS | DESCRIPTION |
|---|---|
| Callable[[Callable[P, R]], Callable[P, R]] | Decorated function that records execution time. |

Example

    >>> @timed("api_call_duration")
    ... def call_api(endpoint: str) -> dict:
    ...     return requests.get(endpoint, timeout=10).json()
counted
counted(
name: str | None = None,
*,
collector: MetricsCollector | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]
Decorator to count function calls.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Optional metric name (defaults to function name). |
| collector | Optional MetricsCollector instance. |

| RETURNS | DESCRIPTION |
|---|---|
| Callable[[Callable[P, R]], Callable[P, R]] | Decorated function that counts calls. |

Example

    >>> @counted("login_attempts")
    ... def login(username: str, password: str) -> bool:
    ...     return authenticate(username, password)
Resilience
Resilience decorators.
Provides tools for graceful fallback and timeouts using the Result monad.
FallbackDecorator
Bases: Protocol
Protocol for the fallback decorator.
TimeoutDecorator
Bases: Protocol
Protocol for the timeout decorator.
fallback
fallback(
fallback_value: T,
exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator
Provide a fallback value on failures.
If the wrapped function returns an Err() or raises a specified exception, the fallback value is returned wrapped in an Ok().
| PARAMETER | DESCRIPTION |
|---|---|
| fallback_value | The value to return on failure. |
| exceptions | Exceptions to catch. |

| RETURNS | DESCRIPTION |
|---|---|
| FallbackDecorator | Decorator function. |
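The fallback rule described above (an Err() result or a listed exception becomes Ok(fallback_value)) can be sketched as follows. The Ok and Err dataclasses are simplified, hypothetical stand-ins for the library's Result types, sketch_fallback is an illustrative name, and only synchronous functions are covered.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any


@dataclass(frozen=True)
class Ok:
    """Hypothetical stand-in for a successful Result."""

    value: Any


@dataclass(frozen=True)
class Err:
    """Hypothetical stand-in for a failed Result."""

    error: Any


def sketch_fallback(fallback_value, exceptions=(Exception,)):
    """Return Ok(fallback_value) when the call raises or yields an Err."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except exceptions:
                # A listed exception is replaced by the fallback value.
                return Ok(fallback_value)
            if isinstance(result, Err):
                # An explicit Err result is replaced as well.
                return Ok(fallback_value)
            return result

        return wrapper

    return decorator
```

Exceptions outside the exceptions tuple still propagate in this sketch, which matches the parameter description but is otherwise an assumption.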
timeout
timeout(seconds: float) -> TimeoutDecorator
Enforce a maximum execution time.
If the execution time exceeds the specified limit, returns Err(TimeoutError).
| PARAMETER | DESCRIPTION |
|---|---|
| seconds | Maximum allowed execution time in seconds. |

| RETURNS | DESCRIPTION |
|---|---|
| TimeoutDecorator | Decorator function. |
Cache
Intelligent Cache decorator.
Provides in-memory caching with a TTL that is aware of the Result monad: Ok() results are cached, Err() results never are.
CacheDecorator
Bases: Protocol
Protocol for the cache decorator.
cached
cached(ttl: float) -> CacheDecorator
Cache the Ok() results of a function for a given TTL.
Err() results are not cached. Supports both async and sync functions.
| PARAMETER | DESCRIPTION |
|---|---|
| ttl | Time to live in seconds. |

| RETURNS | DESCRIPTION |
|---|---|
| CacheDecorator | Decorator function. |
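A rough sketch of the caching rule (store Ok() results until the TTL expires, always recompute after an Err()) might look like this. Ok, Err, sketch_cached and the injectable clock are hypothetical; the key scheme assumes hashable arguments, and async support is omitted.

```python
import time
from dataclasses import dataclass
from functools import wraps
from typing import Any


@dataclass(frozen=True)
class Ok:
    """Hypothetical stand-in for a successful Result."""

    value: Any


@dataclass(frozen=True)
class Err:
    """Hypothetical stand-in for a failed Result."""

    error: Any


def sketch_cached(ttl, clock=time.monotonic):
    """Cache Ok results per argument tuple for ttl seconds (sync functions only)."""

    def decorator(func):
        store = {}  # key -> (expires_at, cached Ok result)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Assumes all arguments are hashable.
            key = (args, tuple(sorted(kwargs.items())))
            now = clock()
            hit = store.get(key)
            if hit is not None and hit[0] > now:
                return hit[1]
            result = func(*args, **kwargs)
            if isinstance(result, Ok):
                # Only successes are stored; an Err is recomputed on the next call.
                store[key] = (now + ttl, result)
            return result

        return wrapper

    return decorator
```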
Context
Observability Context Module.
Provides context variables and context managers for tracing and observability, such as the correlation ID.
get_correlation_id
get_correlation_id() -> str | None
Get the current correlation ID.
| RETURNS | DESCRIPTION |
|---|---|
| str \| None | The correlation ID if set, otherwise None. |
set_correlation_id
set_correlation_id(correlation_id: str | None) -> None
Set the correlation ID for the current context.
| PARAMETER | DESCRIPTION |
|---|---|
| correlation_id | The correlation ID string, or None to clear. |
correlation_scope
correlation_scope(
correlation_id: str | None,
) -> Iterator[None]
Context manager to set the correlation ID and restore it after.
| PARAMETER | DESCRIPTION |
|---|---|
| correlation_id | The correlation ID to set for the duration of the scope. |

| YIELDS | DESCRIPTION |
|---|---|
| None | None |
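One plausible way to implement the set-and-restore behavior is the standard library's contextvars module, whose tokens allow the previous value to be restored exactly. The sketch below assumes that approach; the sketch_ prefixed names and the _correlation_id variable are hypothetical and not taken from the library's source.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

# Hypothetical module-level variable; the default None means "no correlation ID".
_correlation_id: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)


def sketch_get_correlation_id() -> Optional[str]:
    return _correlation_id.get()


def sketch_set_correlation_id(correlation_id: Optional[str]) -> None:
    _correlation_id.set(correlation_id)


@contextmanager
def sketch_correlation_scope(correlation_id: Optional[str]) -> Iterator[None]:
    # set() returns a token that remembers the previous value.
    token = _correlation_id.set(correlation_id)
    try:
        yield
    finally:
        # reset() restores whatever was active before the scope, even on error.
        _correlation_id.reset(token)
```

Because context variables are copied per asyncio task, a value set inside one task does not leak into sibling tasks, which is the usual reason to prefer them over thread-locals for request tracing.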
Concurrency
Concurrency utilities.
Provides a bulkhead-pattern concurrency limiter decorator for both
synchronous and asynchronous functions. The decorated function returns a
Result type; when the limit is hit, that Result wraps an OverloadError.
OverloadError
OverloadError(message: str = 'Concurrency limit reached')
Bases: Exception
Exception raised when a concurrency limit is exceeded or timed out.
Initialize the OverloadError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The error message to display. Defaults to "Concurrency limit reached". |
ConcurrencyLimitDecorator
Bases: Protocol
Protocol for the concurrency limit decorator.
limit_concurrency
limit_concurrency(
max_tasks: int, timeout: float = 0.0
) -> ConcurrencyLimitDecorator
Decorate a function to apply the bulkhead concurrency limit pattern.
If the maximum number of concurrent executions has been reached, the wrapper
waits up to timeout seconds to acquire an execution slot. If no slot becomes
available in that time, it returns an Err(OverloadError).
| PARAMETER | DESCRIPTION |
|---|---|
| max_tasks | Maximum concurrent function executions allowed. |
| timeout | Maximum time in seconds to wait for a slot if limit is reached. |

| RETURNS | DESCRIPTION |
|---|---|
| ConcurrencyLimitDecorator | Decorated function returning a Result. |

Example

    >>> @limit_concurrency(max_tasks=2, timeout=0.1)
    ... def process_data() -> str:
    ...     return "data"
    >>> process_data()
    Ok('data')
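The bulkhead idea can be sketched with a bounded semaphore: each call must take a slot before running and gives it back afterwards. Everything below is illustrative only. Ok, Err, SketchOverloadError and sketch_limit_concurrency are hypothetical names, only synchronous functions are handled, and letting exceptions from the wrapped function propagate is an assumption.

```python
import threading
from dataclasses import dataclass
from functools import wraps
from typing import Any


@dataclass(frozen=True)
class Ok:
    """Hypothetical stand-in for a successful Result."""

    value: Any


@dataclass(frozen=True)
class Err:
    """Hypothetical stand-in for a failed Result."""

    error: Any


class SketchOverloadError(Exception):
    """Hypothetical stand-in for OverloadError."""


def sketch_limit_concurrency(max_tasks, timeout=0.0):
    def decorator(func):
        slots = threading.BoundedSemaphore(max_tasks)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # With no timeout, try once without blocking; otherwise wait up to timeout.
            if timeout <= 0:
                acquired = slots.acquire(blocking=False)
            else:
                acquired = slots.acquire(timeout=timeout)
            if not acquired:
                return Err(SketchOverloadError("Concurrency limit reached"))
            try:
                return Ok(func(*args, **kwargs))
            finally:
                # Always free the slot, even if func raised.
                slots.release()

        return wrapper

    return decorator
```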
Filesystem
Safe filesystem operations.
Provides secure wrappers around file operations with path validation, atomic writes, and proper error handling using Result types.
FileNotFoundErr
dataclass
FileNotFoundErr(path: Path, message: str = '')
Error when file is not found.
FileTooLargeErr
dataclass
FileTooLargeErr(
path: Path, size: int, max_size: int, message: str = ""
)
Error when file exceeds size limit.
WriteOptions
dataclass
WriteOptions(
base_dir: Path | str | None = None,
encoding: str = "utf-8",
create_parents: bool = True,
backup: bool = True,
atomic: bool = True,
)
Options for safe_write.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| base_dir | Base directory to constrain to. |
| encoding | File encoding. |
| create_parents | Create parent directories if needed. |
| backup | Create backup of existing file. |
| atomic | Use atomic write. |
safe_read
safe_read(
path: Path | str,
*,
base_dir: Path | str | None = None,
encoding: str = "utf-8",
max_size_bytes: int | None = 10 * 1024 * 1024,
) -> Result[str, ReadFileError]
Read a file safely with path validation.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the file to read. |
| base_dir | Base directory to constrain to. |
| encoding | File encoding. |
| max_size_bytes | Maximum file size to read (None for no limit). |

| RETURNS | DESCRIPTION |
|---|---|
| Ok | File contents on success. |
| Err | Error details on failure. |

Example

    >>> match safe_read("config.json"):
    ...     case Ok(content):
    ...         data = json.loads(content)
    ...     case Err(FileNotFoundErr(path=p)):
    ...         print(f"Missing: {p}")
    ...     case Err(FileTooLargeErr(size=s)):
    ...         print(f"Too big: {s} bytes")
safe_write
safe_write(
path: Path | str,
content: str,
*,
options: WriteOptions | None = None,
) -> Path
Write to a file safely with path validation.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to write to. |
| content | Content to write. |
| options | Write options. |

| RETURNS | DESCRIPTION |
|---|---|
| Path | Path to the written file. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If path validation fails. |
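The combination of path validation and atomic writing is commonly built from two steps: resolve the target and check it stays under the base directory, then write to a temporary file in the same directory and swap it in with os.replace. The sketch below assumes that approach. sketch_safe_write is a hypothetical name, PermissionError stands in for the library's SecurityError, and the backup option is not modeled.

```python
import os
import tempfile
from pathlib import Path


def sketch_safe_write(path, content, *, base_dir=None, encoding="utf-8"):
    """Write content atomically, refusing targets outside base_dir."""
    target = Path(path).resolve()
    if base_dir is not None:
        root = Path(base_dir).resolve()
        # Reject anything that resolves outside the base directory (e.g. via "..").
        if root != target and root not in target.parents:
            raise PermissionError(f"{target} is outside {root}")
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives next to the target so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=target.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding=encoding) as handle:
            handle.write(content)
            handle.flush()
            os.fsync(handle.fileno())
        # Readers see either the old file or the new one, never a partial write.
        os.replace(tmp_name, target)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target
```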
ensure_dir
ensure_dir(
path: Path | str,
*,
base_dir: Path | str | None = None,
mode: int = 0o755,
) -> Path
Ensure a directory exists, creating it if needed.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the directory. |
| base_dir | Base directory to constrain to. |
| mode | Directory permissions. |

| RETURNS | DESCRIPTION |
|---|---|
| Path | Path to the directory. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If path validation fails. |
safe_copy
safe_copy(
src: Path | str,
dst: Path | str,
*,
base_dir: Path | str | None = None,
overwrite: bool = False,
) -> Path
Copy a file safely.
| PARAMETER | DESCRIPTION |
|---|---|
| src | Source file path. |
| dst | Destination file path. |
| base_dir | Base directory to constrain both paths to. |
| overwrite | Allow overwriting existing file. |

| RETURNS | DESCRIPTION |
|---|---|
| Path | Path to the destination file. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If path validation fails. |
| FileExistsError | If destination exists and overwrite=False. |
safe_delete
safe_delete(
path: Path | str,
*,
base_dir: Path | str | None = None,
missing_ok: bool = True,
recursive: bool = False,
) -> bool
Delete a file or directory safely.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to delete. |
| base_dir | Base directory to constrain to. |
| missing_ok | Don't raise if path doesn't exist. |
| recursive | Allow deleting directories recursively. |

| RETURNS | DESCRIPTION |
|---|---|
| bool | True if something was deleted. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If path validation fails. |
| FileNotFoundError | If path doesn't exist and missing_ok=False. |
get_file_hash
get_file_hash(
path: Path | str,
*,
algorithm: str = "sha256",
base_dir: Path | str | None = None,
) -> str
Get hash of a file.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the file. |
| algorithm | Hash algorithm (sha256, sha512, etc). |
| base_dir | Base directory to constrain to. |

| RETURNS | DESCRIPTION |
|---|---|
| str | Hex digest of the file hash. |
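For reference, hashing a file without loading it fully into memory is usually done by feeding fixed-size chunks to a hashlib object. The sketch below shows that technique; sketch_file_hash and its chunk_size parameter are hypothetical, and the base_dir validation from the real signature is left out.

```python
import hashlib
from pathlib import Path


def sketch_file_hash(path, *, algorithm="sha256", chunk_size=65536):
    """Return the hex digest of a file, reading it in fixed-size chunks."""
    # hashlib.new raises ValueError for an unknown algorithm name.
    digest = hashlib.new(algorithm)
    with Path(path).open("rb") as handle:
        # iter() with a sentinel stops when read() returns empty bytes at EOF.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```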
find_files
find_files(
directory: Path | str,
pattern: str = "*",
*,
base_dir: Path | str | None = None,
recursive: bool = True,
include_hidden: bool = False,
) -> list[Path]
Find files matching a pattern.
| PARAMETER | DESCRIPTION |
|---|---|
| directory | Directory to search in. |
| pattern | Glob pattern to match. |
| base_dir | Base directory to constrain to. |
| recursive | Search recursively. |
| include_hidden | Include hidden files (starting with .). |

| RETURNS | DESCRIPTION |
|---|---|
| list[Path] | List of matching file paths. |
Logging
Structured logging with context.
Provides a configured logger with support for structured output, context propagation, and proper formatting.
StackLogger
StackLogger(
name: str = "stack",
level: str = "INFO",
*,
use_structured: bool = False,
)
Enhanced logger with context support.
Provides a wrapper around standard logging with additional features like context propagation and structured output support.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| name | Logger name. |
| level | Current log level. |

Initialize the logger.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Logger name. |
| level | Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). |
| use_structured | Use structured logging if structlog is available. |
bind
bind(**context: Any) -> StackLogger
Add context to logger.
| PARAMETER | DESCRIPTION |
|---|---|
| **context | Key-value pairs to add to context. |

| RETURNS | DESCRIPTION |
|---|---|
| StackLogger | Self for chaining. |
unbind
unbind(*keys: str) -> StackLogger
Remove context keys.
| PARAMETER | DESCRIPTION |
|---|---|
| *keys | Keys to remove from context. |

| RETURNS | DESCRIPTION |
|---|---|
| StackLogger | Self for chaining. |
debug
debug(message: str, **kwargs: Any) -> None
Log a debug message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. |
| **kwargs | Additional context. |
info
info(message: str, **kwargs: Any) -> None
Log an info message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. |
| **kwargs | Additional context. |
warning
warning(message: str, **kwargs: Any) -> None
Log a warning message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. |
| **kwargs | Additional context. |
error
error(message: str, **kwargs: Any) -> None
Log an error message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. |
| **kwargs | Additional context. |
critical
critical(message: str, **kwargs: Any) -> None
Log a critical message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. |
| **kwargs | Additional context. |
exception
exception(message: str, **kwargs: Any) -> None
Log an exception with traceback.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. |
| **kwargs | Additional context. |
mask_sensitive_data_processor
mask_sensitive_data_processor(
_logger: Any,
_method: str,
event_dict: MutableMapping[str, Any],
) -> MutableMapping[str, Any]
Mask sensitive data in structlog event dictionaries.
Intercept the event_dict produced by structlog and replace the
value of any key whose name contains a sensitive substring with
"***REDACTED***". Matching is case-insensitive.
| PARAMETER | DESCRIPTION |
|---|---|
| _logger | The wrapped logger object (unused, required by structlog). |
| _method | The name of the log method called (unused, required by structlog). |
| event_dict | The structured event dictionary. |

| RETURNS | DESCRIPTION |
|---|---|
| MutableMapping[str, Any] | The event dictionary with sensitive values masked. |
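The masking rule (case-insensitive substring match on key names, value replaced by "***REDACTED***") can be illustrated with a processor-shaped function like the one below. The list of sensitive substrings is not given in this reference, so SENSITIVE_SUBSTRINGS is a made-up example, and sketch_mask_processor is a hypothetical name.

```python
from typing import Any, MutableMapping

# Hypothetical list; the substrings the library actually checks are not documented here.
SENSITIVE_SUBSTRINGS = ("password", "secret", "token", "api_key")


def sketch_mask_processor(
    _logger: Any,
    _method: str,
    event_dict: MutableMapping[str, Any],
) -> MutableMapping[str, Any]:
    """Replace values whose key name contains a sensitive substring."""
    # Iterate over a snapshot of the keys because values are replaced in place.
    for key in list(event_dict):
        lowered = str(key).lower()
        if any(part in lowered for part in SENSITIVE_SUBSTRINGS):
            event_dict[key] = "***REDACTED***"
    return event_dict
```

The three-argument signature matches the processor shape structlog expects, so a function like this could sit in a processor chain ahead of the renderer.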
correlation_id_processor
correlation_id_processor(
_logger: Any,
_method: str,
event_dict: MutableMapping[str, Any],
) -> MutableMapping[str, Any]
Structlog processor to inject correlation ID into events.
| PARAMETER | DESCRIPTION |
|---|---|
| _logger | The wrapped logger object. |
| _method | The name of the log method called. |
| event_dict | The structured event dictionary. |

| RETURNS | DESCRIPTION |
|---|---|
| MutableMapping[str, Any] | The event dictionary with correlation_id injected if set. |
setup_logging
setup_logging(
level: Literal[
"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
] = "INFO",
*,
format_type: Literal[
"simple", "detailed", "json"
] = "detailed",
log_file: str | None = None,
use_structured: bool = False,
) -> None
Configure the root logger.
| PARAMETER | DESCRIPTION |
|---|---|
| level | Log level to set. |
| format_type | Output format type. |
| log_file | Optional file to log to. |
| use_structured | Use structlog if available. |
get_logger
get_logger(
name: str = "stack",
*,
level: str = "INFO",
use_structured: bool = False,
) -> StackLogger
Get a configured logger instance.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Logger name. |
| level | Log level. |
| use_structured | Use structlog if available. |

| RETURNS | DESCRIPTION |
|---|---|
| StackLogger | Configured StackLogger instance. |

Example

    >>> logger = get_logger("my_module")
    >>> logger.bind(request_id="123").info("Processing request")
log_operation
log_operation(
operation: str,
*,
logger: StackLogger | None = None,
level: str = "INFO",
expected_exceptions: tuple[type[Exception], ...]
| type[Exception] = Exception,
) -> AbstractContextManager[StackLogger]
Context manager for logging operations.
| PARAMETER | DESCRIPTION |
|---|---|
| operation | Name of the operation. |
| logger | Logger to use (creates one if not provided). |
| level | Log level for messages. |
| expected_exceptions | Exceptions to catch and log as failures. |

| YIELDS | DESCRIPTION |
|---|---|
| StackLogger | The logger instance. |

Example

    >>> with log_operation("setup") as logger:
    ...     logger.info("Setting up environment")
Rate Limit
Rate limiting utilities.
Provides an in-memory, token-bucket rate limiting decorator
for both synchronous and asynchronous functions. The decorator
returns a Result type encapsulating either the original return value
or a RateLimitError.
RateLimitError
RateLimitError(message: str = 'Rate limit exceeded')
Bases: Exception
Exception raised when a rate limit is exceeded.
Initialize the RateLimitError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The error message to display. Defaults to "Rate limit exceeded". |
RateLimiter
RateLimiter(max_calls: int, time_window: float)
Token bucket rate limiter logic.
Initialize the token bucket.
| PARAMETER | DESCRIPTION |
|---|---|
| max_calls | The maximum number of calls allowed in the time window. |
| time_window | The time window in seconds. |
consume
consume() -> bool
Try to consume a single token.
| RETURNS | DESCRIPTION |
|---|---|
| bool | True if a token was consumed (allow), False otherwise (limit exceeded). |
RateLimitDecorator
Bases: Protocol
Protocol for the rate limit decorator.
rate_limit
rate_limit(
max_calls: int, time_window: float
) -> RateLimitDecorator
Decorate a function to apply rate limiting.
If the rate limit is exceeded, the wrapped function immediately returns
an Err(RateLimitError). Uses an in-memory token bucket strategy.
| PARAMETER | DESCRIPTION |
|---|---|
| max_calls | Maximum function executions allowed in the defined window. |
| time_window | Time window size in seconds. |

| RETURNS | DESCRIPTION |
|---|---|
| RateLimitDecorator | Decorated function returning a Result. |

Example

    >>> @rate_limit(max_calls=2, time_window=1.0)
    ... def fetch_data() -> str:
    ...     return "data"
    >>> fetch_data()
    Ok('data')
    >>> fetch_data()
    Ok('data')
    >>> fetch_data()
    Err(RateLimitError('Rate limit exceeded'))
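A token bucket is typically implemented by refilling tokens in proportion to elapsed time and spending one per call. The sketch below assumes a continuous refill rate of max_calls / time_window and a bucket that starts full; neither detail is confirmed by this reference. SketchTokenBucket and the injectable clock are hypothetical.

```python
import threading
import time


class SketchTokenBucket:
    """Hypothetical token bucket with continuous refill."""

    def __init__(self, max_calls, time_window, clock=time.monotonic):
        self._capacity = float(max_calls)
        # Assumed refill rate: the full capacity is restored over one time window.
        self._refill_rate = max_calls / time_window
        self._tokens = float(max_calls)  # assumed to start full
        self._clock = clock
        self._updated = clock()
        self._lock = threading.Lock()

    def consume(self):
        """Take one token if available; return True when the call is allowed."""
        with self._lock:
            now = self._clock()
            elapsed = now - self._updated
            # Add the tokens earned since the last call, never exceeding capacity.
            self._tokens = min(self._capacity, self._tokens + elapsed * self._refill_rate)
            self._updated = now
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return True
            return False
```

A decorator built on top of this would call consume() first and return an Err(RateLimitError) instead of invoking the function whenever it yields False.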
Serialization
Serialization utilities.
Provides an optimized default encoder for use with orjson.dumps.
default_encoder
default_encoder(obj: object) -> dict[str, object]
Default encoder for orjson.dumps handling Result types.
Intercepts objects of type Ok and Err:
- Ok(value): Returns {"status": "success"} merged with value if
value is a dict. Otherwise returns {"status": "success", "data": value}.
- Err(error): Returns {"status": "error", "message": str(error)}.
| PARAMETER | DESCRIPTION |
|---|---|
| obj | The object to encode. |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, object] | A serializable representation of the object. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the object type is not supported. |

Example

    >>> import orjson
    >>> orjson.dumps(Ok({"id": 1}), default=default_encoder)
    b'{"status":"success","id":1}'
    >>> orjson.dumps(Err(ValueError("oops")), default=default_encoder)
    b'{"status":"error","message":"oops"}'
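The mapping rules above can be tried out without orjson by using the standard library's json module, whose default hook plays a similar role. In this sketch Ok and Err are plain hypothetical classes and sketch_default_encoder is an illustrative name, so it shows the documented mapping rather than the library's own code.

```python
import json


class Ok:
    """Hypothetical stand-in for a successful Result."""

    def __init__(self, value):
        self.value = value


class Err:
    """Hypothetical stand-in for a failed Result."""

    def __init__(self, error):
        self.error = error


def sketch_default_encoder(obj):
    """Map Ok/Err to plain dicts following the rules documented above."""
    if isinstance(obj, Ok):
        if isinstance(obj.value, dict):
            # Keys from the value are merged in after "status", so a "status"
            # key inside the value would overwrite it in this sketch.
            return {"status": "success", **obj.value}
        return {"status": "success", "data": obj.value}
    if isinstance(obj, Err):
        return {"status": "error", "message": str(obj.error)}
    raise TypeError(f"Type is not JSON serializable: {type(obj).__name__}")


# Usage with the standard library encoder standing in for orjson.dumps.
encoded = json.dumps(Ok({"id": 1}), default=sketch_default_encoder)
```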
Subprocess
Safe subprocess execution with security guards.
Provides secure wrappers around subprocess execution with command validation, timeout handling, and retry logic.
SafeCommandResult
dataclass
SafeCommandResult(
command: list[str],
returncode: int,
stdout: str = "",
stderr: str = "",
duration_seconds: float = 0.0,
)
Result of a safe command execution.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| command | The executed command. |
| returncode | Exit code of the command. |
| stdout | Standard output. |
| stderr | Standard error. |
| success | Whether the command succeeded (returncode == 0). |
| duration_seconds | How long the command took. |
raise_on_error
raise_on_error() -> SafeCommandResult
Raise an exception if command failed.
| RETURNS | DESCRIPTION |
|---|---|
| SafeCommandResult | Self if successful. |

| RAISES | DESCRIPTION |
|---|---|
| CalledProcessError | If command failed. |
run_safe_command
run_safe_command(
command: Sequence[str],
*,
cwd: Path | str | None = None,
timeout: float = 300.0,
capture_output: bool = True,
check: bool = False,
allowed_commands: Sequence[str] | None = None,
env: dict[str, str] | None = None,
dry_run: bool = False,
) -> SafeCommandResult
Execute a command safely with security guards.
This function provides a secure wrapper around subprocess.run with command injection protection, timeout handling, and optional command whitelisting.
| PARAMETER | DESCRIPTION |
|---|---|
| command | Command and arguments as a sequence. |
| cwd | Working directory for the command. |
| timeout | Maximum execution time in seconds. |
| capture_output | Whether to capture stdout/stderr. |
| check | Whether to raise on non-zero exit. |
| allowed_commands | Whitelist of allowed commands. |
| env | Environment variables to set. |
| dry_run | If True, don't actually execute the command. |

| RETURNS | DESCRIPTION |
|---|---|
| SafeCommandResult | SafeCommandResult with execution details. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If command validation fails. |
| TimeoutExpired | If command times out. |
| CalledProcessError | If check=True and command fails. |

Example

    >>> result = run_safe_command(["poetry", "install"])
    >>> if result.success:
    ...     print("Installation complete!")
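The guards described for run_safe_command (argument sequence instead of a shell string, an allow-list check, and a timeout) can be sketched on top of subprocess.run as follows. sketch_run is a hypothetical name, PermissionError stands in for the library's SecurityError, and the real validation rules are very likely stricter than this single check.

```python
import subprocess
import sys
import time
from typing import Optional, Sequence, Tuple


def sketch_run(
    command: Sequence[str],
    *,
    timeout: float = 300.0,
    allowed_commands: Optional[Sequence[str]] = None,
) -> Tuple[subprocess.CompletedProcess, float]:
    """Run a command without a shell and return (completed process, seconds)."""
    if not command:
        raise ValueError("command must not be empty")
    # Assumed allow-list rule: compare only the executable name (first element).
    if allowed_commands is not None and command[0] not in allowed_commands:
        raise PermissionError(f"command not allowed: {command[0]}")
    start = time.perf_counter()
    completed = subprocess.run(
        list(command),
        shell=False,  # arguments are passed as a list, never parsed by a shell
        capture_output=True,
        text=True,
        timeout=timeout,  # raises subprocess.TimeoutExpired when exceeded
        check=False,
    )
    return completed, time.perf_counter() - start


# Usage: run the current interpreter so the example does not depend on external tools.
done, seconds = sketch_run(
    [sys.executable, "-c", "print('ok')"],
    allowed_commands=[sys.executable],
)
```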