Utilities — Retry, Circuit Breaker¶
TaipanStack provides resilience and observability utilities for production applications.
Retry¶
Backward-compatibility shim for the retry module.
This module re-exports all public symbols from the canonical
taipanstack.resilience.retry module. Importing from this path remains
supported for compatibility, but new code should use the canonical module.
Deprecated: import from taipanstack.resilience.retry instead.
Retrier
¶
Retrier(
*,
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
on: tuple[type[Exception], ...] = (Exception,),
)
Context manager for retry logic.
Provides a context manager interface for retry logic when decorators are not suitable.
Example
>>> retrier = Retrier(max_attempts=3, on=(ConnectionError,))
>>> with retrier:
...     result = some_operation()
Initialize Retrier.
| PARAMETER | DESCRIPTION |
|---|---|
| max_attempts | Maximum retry attempts. TYPE: int |
| initial_delay | Initial delay between retries, in seconds. TYPE: float |
| max_delay | Maximum delay between retries, in seconds. TYPE: float |
| on | Exception types to retry on. TYPE: tuple[type[Exception], ...] |
__exit__
¶
__exit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
_exc_tb: TracebackType | None,
) -> bool
Exit the retry context.
Returns True to suppress the exception if we should retry, False to let it propagate.
RetryConfig
dataclass
¶
RetryConfig(
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
jitter_factor: float = 0.1,
log_retries: bool = True,
on_retry: Callable[[int, int, Exception, float], None]
| None = None,
)
Configuration for retry behavior.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| max_attempts | Maximum number of retry attempts. TYPE: int |
| initial_delay | Initial delay between retries in seconds. TYPE: float |
| max_delay | Maximum delay between retries in seconds. TYPE: float |
| exponential_base | Base for exponential backoff (2 = double each time). TYPE: float |
| jitter | Whether to add random jitter to delays. TYPE: bool |
| jitter_factor | Maximum jitter as fraction of delay (0.1 = 10%). TYPE: float |
| log_retries | Whether to emit standard log messages. TYPE: bool |
| on_retry | Optional callback invoked on each retry. TYPE: Callable[[int, int, Exception, float], None] \| None |
RetryDecorator
¶
Bases: Protocol
Protocol for the retry decorator.
RetryError
¶
RetryError(
message: str,
attempts: int,
last_exception: Exception | None = None,
)
Bases: Exception
Raised when all retry attempts have failed.
Initialize RetryError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | Description of the retry failure. TYPE: str |
| attempts | Number of attempts made. TYPE: int |
| last_exception | The last exception that was raised. TYPE: Exception \| None |
calculate_delay
¶
calculate_delay(attempt: int, config: RetryConfig) -> float
Calculate delay before next retry.
| PARAMETER | DESCRIPTION |
|---|---|
| attempt | Current attempt number (1-indexed). TYPE: int |
| config | Retry configuration. TYPE: RetryConfig |

| RETURNS | DESCRIPTION |
|---|---|
| float | Delay in seconds before next retry. |
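The exact formula used by calculate_delay is not shown in this reference. As a rough sketch of capped exponential backoff with jitter, assuming the delay is initial_delay * exponential_base ** (attempt - 1), capped at max_delay, with symmetric jitter, one could write the following. BackoffSettings and sketch_delay are hypothetical names that only mirror the documented fields.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class BackoffSettings:
    """Hypothetical stand-in for RetryConfig; field names mirror the docs."""

    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_factor: float = 0.1


def sketch_delay(attempt: int, cfg: BackoffSettings) -> float:
    """Return the wait before the next try; attempt is 1-indexed."""
    # Exponential growth, capped at max_delay (assumed formula).
    raw = cfg.initial_delay * cfg.exponential_base ** (attempt - 1)
    delay = min(raw, cfg.max_delay)
    if cfg.jitter:
        # Spread by up to +/- jitter_factor of the delay (assumed symmetric).
        spread = delay * cfg.jitter_factor
        delay += random.uniform(-spread, spread)
    return max(0.0, delay)


no_jitter = BackoffSettings(jitter=False)
delays = [sketch_delay(n, no_jitter) for n in (1, 2, 3, 10)]
print(delays)  # [1.0, 2.0, 4.0, 60.0]
```

With jitter enabled, the third attempt under these assumptions lands somewhere between 3.6 and 4.4 seconds.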
retry
¶
retry(
*,
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
on: tuple[type[Exception], ...] = (Exception,),
reraise: bool = True,
log_retries: bool = True,
on_retry: Callable[[int, int, Exception, float], None]
| None = None,
) -> RetryDecorator
Retry a sync or async function with exponential backoff.
Automatically retries the decorated function when specified exceptions are raised, with configurable backoff strategy. Detects coroutine functions and preserves their async nature.
| PARAMETER | DESCRIPTION |
|---|---|
| max_attempts | Maximum number of retry attempts. TYPE: int |
| initial_delay | Initial delay between retries in seconds. TYPE: float |
| max_delay | Maximum delay between retries in seconds. TYPE: float |
| exponential_base | Base for exponential backoff. TYPE: float |
| jitter | Whether to add random jitter to delays. TYPE: bool |
| on | Exception types to retry on. TYPE: tuple[type[Exception], ...] |
| reraise | Whether to reraise the last exception on failure. TYPE: bool |
| log_retries | Whether to log retry attempts. TYPE: bool |
| on_retry | Optional callback invoked on each retry with (attempt, max_attempts, exception, delay). Useful for custom monitoring or metrics collection. TYPE: Callable[[int, int, Exception, float], None] \| None |

| RETURNS | DESCRIPTION |
|---|---|
| RetryDecorator | Decorated function with retry logic. |
Example
>>> @retry(max_attempts=3, on=(ConnectionError, TimeoutError))
... def fetch_data(url: str) -> dict:
...     return requests.get(url, timeout=10).json()
>>> @retry(max_attempts=3, on_retry=lambda a, m, e, d: print(f"Retry {a}/{m}"))
... def fragile_operation() -> str:
...     return do_something()
retry_on_exception
¶
retry_on_exception(
exception_types: tuple[type[Exception], ...],
max_attempts: int = 3,
) -> RetryDecorator
Retry on specific exceptions.
A simpler alternative to the full retry decorator when you just need basic retry functionality.
| PARAMETER | DESCRIPTION |
|---|---|
| exception_types | Exception types to retry on. TYPE: tuple[type[Exception], ...] |
| max_attempts | Maximum number of attempts. TYPE: int |

| RETURNS | DESCRIPTION |
|---|---|
| RetryDecorator | Decorated function with retry logic. |
Example
>>> @retry_on_exception((ValueError,), max_attempts=2)
... def parse_data(data: str) -> dict:
...     return json.loads(data)
Circuit Breaker¶
Backward-compatibility shim for the circuit breaker module.
This module re-exports all symbols from the canonical
taipanstack.resilience.circuit_breaker module. Importing from this
path continues to work, but new code should import from the canonical
location directly.
Deprecated: import from taipanstack.resilience.circuit_breaker instead.
CircuitBreaker
¶
CircuitBreaker(
*,
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
name: str = "default",
on_state_change: Callable[
[CircuitState, CircuitState], None
]
| None = None,
)
Circuit breaker implementation.
Monitors function calls and opens the circuit when too many failures occur, preventing further calls until the service recovers. Supports both sync and async functions.
Example
>>> breaker = CircuitBreaker(failure_threshold=3)
>>> @breaker
... def call_external_api():
...     return requests.get("https://api.example.com", timeout=10)
Initialize CircuitBreaker.
| PARAMETER | DESCRIPTION |
|---|---|
| failure_threshold | Failures before opening circuit. TYPE: int |
| success_threshold | Successes to close from half-open. TYPE: int |
| timeout | Seconds before attempting half-open. TYPE: float |
| excluded_exceptions | Exceptions that don't trip circuit. TYPE: tuple[type[Exception], ...] |
| failure_exceptions | Exceptions that count as failures. TYPE: tuple[type[Exception], ...] |
| name | Name for logging/identification. TYPE: str |
| on_state_change | Optional callback invoked on state transitions with (old_state, new_state). Useful for custom monitoring. TYPE: Callable[[CircuitState, CircuitState], None] \| None |
__call__
¶
__call__(
func: Callable[P, R] | Callable[P, Awaitable[R]],
) -> Callable[P, R] | Callable[P, Awaitable[R]]
Decorate a sync or async function with circuit breaker protection.
CircuitBreakerConfig
dataclass
¶
CircuitBreakerConfig(
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
)
Configuration for circuit breaker behavior.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| failure_threshold | Number of failures before opening circuit. TYPE: int |
| success_threshold | Successes needed in half-open to close. TYPE: int |
| timeout | Seconds before trying half-open after open. TYPE: float |
| excluded_exceptions | Exceptions that don't count as failures. TYPE: tuple[type[Exception], ...] |
| failure_exceptions | Exceptions that count as failures. TYPE: tuple[type[Exception], ...] |
CircuitBreakerDecorator
¶
Bases: Protocol
Protocol for the circuit breaker decorator.
CircuitBreakerError
¶
CircuitBreakerError(message: str, state: CircuitState)
Bases: Exception
Raised when circuit breaker is open.
Initialize CircuitBreakerError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | Error description. TYPE: str |
| state | Current circuit state. TYPE: CircuitState |
CircuitBreakerState
dataclass
¶
CircuitBreakerState(
state: CircuitState = CLOSED,
failure_count: int = 0,
success_count: int = 0,
half_open_attempts: int = 0,
last_failure_time: float = 0.0,
lock: Lock = Lock(),
)
Internal state tracking for circuit breaker.
CircuitState
¶
Bases: Enum
States of the circuit breaker.
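The enum members are not listed here, but the parameter descriptions imply three states: closed, open, and half-open. The sketch below models those transitions under stated assumptions: the member names OPEN and HALF_OPEN, the reset of the failure count on success, and the immediate re-open on a half-open failure are all guesses, not confirmed library behavior. State and SketchBreaker are hypothetical names.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"            # assumed member name
    HALF_OPEN = "half_open"  # assumed member name


class SketchBreaker:
    """Illustrative state machine only; no locking, no decorator."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        """Return True if a call may proceed right now."""
        if self.state is State.OPEN and self.clock() - self.opened_at >= self.timeout:
            self.state = State.HALF_OPEN  # timeout elapsed: probe the service
            self.successes = 0
        return self.state is not State.OPEN

    def record_success(self) -> None:
        if self.state is State.HALF_OPEN:
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = State.CLOSED
                self.failures = 0
        else:
            self.failures = 0  # assumption: a success clears the failure count

    def record_failure(self) -> None:
        self.failures += 1
        if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
            self.state = State.OPEN
            self.opened_at = self.clock()


now = [0.0]  # fake clock so the example runs instantly
breaker = SketchBreaker(failure_threshold=2, success_threshold=1,
                        timeout=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
blocked = not breaker.allow()   # circuit is open, calls are rejected
now[0] = 10.0
probing = breaker.allow()       # timeout elapsed, one probe allowed
breaker.record_success()
final_state = breaker.state
```

Injecting the clock keeps the transitions deterministic, which is also convenient when testing an on_state_change callback.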
circuit_breaker
¶
circuit_breaker(
*,
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 30.0,
excluded_exceptions: tuple[type[Exception], ...] = (),
failure_exceptions: tuple[type[Exception], ...] = (
Exception,
),
name: str | None = None,
on_state_change: Callable[
[CircuitState, CircuitState], None
]
| None = None,
) -> CircuitBreakerDecorator
Decorate a sync or async function with circuit breaker pattern.
| PARAMETER | DESCRIPTION |
|---|---|
| failure_threshold | Failures before opening circuit. TYPE: int |
| success_threshold | Successes to close from half-open. TYPE: int |
| timeout | Seconds before attempting half-open. TYPE: float |
| excluded_exceptions | Exceptions that don't trip circuit. TYPE: tuple[type[Exception], ...] |
| failure_exceptions | Exceptions that count as failures. TYPE: tuple[type[Exception], ...] |
| name | Optional name for the circuit. TYPE: str \| None |
| on_state_change | Optional callback invoked on state transitions with (old_state, new_state). TYPE: Callable[[CircuitState, CircuitState], None] \| None |

| RETURNS | DESCRIPTION |
|---|---|
| CircuitBreakerDecorator | Decorated function with circuit breaker protection. |
Example
>>> @circuit_breaker(failure_threshold=3, timeout=60)
... def call_api(endpoint: str) -> dict:
...     return requests.get(endpoint, timeout=10).json()
>>> @circuit_breaker(
...     failure_threshold=3,
...     on_state_change=lambda old, new: print(f"{old} -> {new}"),
... )
... def monitored_call() -> str:
...     return service.call()
Resilience¶
Backward-compatibility shim for resilience decorators.
This module re-exports the public decorators and helper types from the canonical
taipanstack.resilience.resilience module. Importing from this path remains
supported for compatibility, but new code should use the canonical module.
Deprecated: import from taipanstack.resilience.resilience instead.
FallbackDecorator
¶
Bases: Protocol
Protocol for the fallback decorator.
TimeoutDecorator
¶
Bases: Protocol
Protocol for the timeout decorator.
fallback
¶
fallback(
fallback_value: T,
exceptions: tuple[type[Exception], ...] = (Exception,),
) -> FallbackDecorator
Provide a fallback value on failures.
If the wrapped function returns an Err() or raises a specified exception, the fallback value is returned wrapped in an Ok().
| PARAMETER | DESCRIPTION |
|---|---|
| fallback_value | The value to return on failure. TYPE: T |
| exceptions | Exceptions to catch. TYPE: tuple[type[Exception], ...] |

| RETURNS | DESCRIPTION |
|---|---|
| FallbackDecorator | Decorator function. |
timeout
¶
timeout(seconds: float) -> TimeoutDecorator
Enforce a maximum execution time.
If the execution time exceeds the specified limit, returns Err(TimeoutError).
| PARAMETER | DESCRIPTION |
|---|---|
| seconds | Maximum allowed execution time in seconds. TYPE: float |

| RETURNS | DESCRIPTION |
|---|---|
| TimeoutDecorator | Decorator function. |
Cache¶
Intelligent Cache decorator.
Provides in-memory caching with a TTL that is aware of the Result monad: Ok() results are cached, Err() results are not.
CacheDecorator
¶
Bases: Protocol
Protocol for the cache decorator.
cached
¶
cached(ttl: float, max_size: int = 1024) -> CacheDecorator
Cache the Ok() results of a function for a given TTL.
Err() results are not cached. Supports both async and sync functions. Implements LRU (Least Recently Used) eviction when max_size is reached.
| PARAMETER | DESCRIPTION |
|---|---|
| ttl | Time to live in seconds. TYPE: float |
| max_size | Maximum number of elements to store in the cache. TYPE: int |

| RETURNS | DESCRIPTION |
|---|---|
| CacheDecorator | Decorator function. |
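The combination of TTL expiry, LRU eviction, and skipping Err() results can be sketched with an OrderedDict. This is an illustration under assumptions, not the library's code: Ok and Err are minimal stand-ins for the real Result types, sketch_cached is a hypothetical name, and only hashable positional arguments are supported.

```python
import time
from collections import OrderedDict
from dataclasses import dataclass
from functools import wraps


@dataclass(frozen=True)
class Ok:
    """Minimal stand-in for the library's Ok type."""

    value: object


@dataclass(frozen=True)
class Err:
    """Minimal stand-in for the library's Err type."""

    error: object


def sketch_cached(ttl: float, max_size: int = 1024, clock=time.monotonic):
    def decorator(func):
        store = OrderedDict()  # args -> (expires_at, result)

        @wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                store.move_to_end(args)  # mark as most recently used
                return hit[1]
            result = func(*args)
            if isinstance(result, Ok):  # Err results are never stored
                store[args] = (now + ttl, result)
                store.move_to_end(args)
                while len(store) > max_size:
                    store.popitem(last=False)  # evict least recently used
            return result

        return wrapper

    return decorator


calls = []


@sketch_cached(ttl=60.0, max_size=2)
def lookup(key: str):
    calls.append(key)
    return Ok(key.upper()) if key else Err("empty key")


lookup("a")
lookup("a")  # served from the cache
lookup("")
lookup("")   # Err is recomputed every time
print(calls)  # ['a', '', '']
```

Not caching failures means a transient error does not get pinned for the whole TTL.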
Context¶
Observability Context Module.
Provides context variables and context managers for tracing and observability, such as the correlation ID.
get_correlation_id
¶
get_correlation_id() -> str | None
Get the current correlation ID.
| RETURNS | DESCRIPTION |
|---|---|
| str \| None | The correlation ID if set, otherwise None. |
set_correlation_id
¶
set_correlation_id(correlation_id: str | None) -> None
Set the correlation ID for the current context.
| PARAMETER | DESCRIPTION |
|---|---|
| correlation_id | The correlation ID string, or None to clear. TYPE: str \| None |
correlation_scope
¶
correlation_scope(
correlation_id: str | None,
) -> Iterator[None]
Context manager to set the correlation ID and restore it after.
| PARAMETER | DESCRIPTION |
|---|---|
| correlation_id | The correlation ID to set for the duration of the scope. TYPE: str \| None |

| YIELDS | DESCRIPTION |
|---|---|
| None | None |
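A set-and-restore scope like this is commonly built on contextvars, which keeps the value isolated per thread and per asyncio task. The sketch below assumes that approach; the variable and function names are hypothetical and the library's internals may differ.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

# Hypothetical module-level variable holding the current correlation ID.
_correlation_id: ContextVar[Optional[str]] = ContextVar(
    "correlation_id", default=None
)


@contextmanager
def sketch_scope(correlation_id: Optional[str]) -> Iterator[None]:
    token = _correlation_id.set(correlation_id)
    try:
        yield
    finally:
        _correlation_id.reset(token)  # restore whatever was set before


with sketch_scope("req-1"):
    outer = _correlation_id.get()
    with sketch_scope("req-2"):
        inner = _correlation_id.get()
    restored = _correlation_id.get()
after = _correlation_id.get()

print(outer, inner, restored, after)  # req-1 req-2 req-1 None
```

Resetting with the token, rather than setting None, is what lets nested scopes unwind to the enclosing value.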
Concurrency¶
Concurrency utilities.
Provides a bulkhead-pattern concurrency limiter decorator for both
synchronous and asynchronous functions. The decorated function returns a
Result type; when no slot is available it returns Err(OverloadError).
OverloadError
¶
OverloadError(message: str = 'Concurrency limit reached')
Bases: Exception
Exception raised when a concurrency limit is exceeded or waiting for a slot times out.
Initialize the OverloadError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The error message to display. Defaults to "Concurrency limit reached". TYPE: str |
ConcurrencyLimitDecorator
¶
Bases: Protocol
Protocol for the concurrency limit decorator.
limit_concurrency
¶
limit_concurrency(
max_tasks: int, timeout: float = 0.0
) -> ConcurrencyLimitDecorator
Decorate a function to apply the bulkhead concurrency limit pattern.
If the maximum number of concurrent executions is reached, the wrapper waits up
to timeout seconds to acquire an execution slot. If no slot becomes available
in time, it returns an Err(OverloadError).
| PARAMETER | DESCRIPTION |
|---|---|
| max_tasks | Maximum concurrent function executions allowed. TYPE: int |
| timeout | Maximum time in seconds to wait for a slot if limit is reached. TYPE: float |

| RETURNS | DESCRIPTION |
|---|---|
| ConcurrencyLimitDecorator | Decorated function returning a Result type. |
Example
>>> @limit_concurrency(max_tasks=2, timeout=0.1)
... def process_data() -> str:
...     return "data"
>>> process_data()
Ok('data')
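For synchronous code, a bulkhead of this kind can be approximated with a bounded semaphore. The following is a sketch under assumptions, not the library's implementation: Ok, Err, and Overload are stand-ins for the real types, sketch_limit is a hypothetical name, and the async path is omitted.

```python
import threading
from dataclasses import dataclass
from functools import wraps


@dataclass(frozen=True)
class Ok:
    """Minimal stand-in for the library's Ok type."""

    value: object


@dataclass(frozen=True)
class Err:
    """Minimal stand-in for the library's Err type."""

    error: object


class Overload(Exception):
    """Hypothetical stand-in for OverloadError."""


def sketch_limit(max_tasks: int, timeout: float = 0.0):
    slots = threading.BoundedSemaphore(max_tasks)  # one permit per allowed call

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if timeout > 0:
                acquired = slots.acquire(timeout=timeout)  # wait up to timeout
            else:
                acquired = slots.acquire(blocking=False)  # fail fast
            if not acquired:
                return Err(Overload("Concurrency limit reached"))
            try:
                return Ok(func(*args, **kwargs))
            finally:
                slots.release()  # free the slot even if func raises

        return wrapper

    return decorator


nested = []


@sketch_limit(max_tasks=1)
def work(depth: int = 0) -> str:
    if depth == 0:
        nested.append(work(1))  # attempted while the only slot is held
    return "done"


result = work()
```

Here the outer call succeeds with Ok('done'), while the nested call is rejected with an Err because the single slot was already taken.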
Filesystem¶
Safe filesystem operations.
Provides secure wrappers around file operations with path validation, atomic writes, and proper error handling using Result types.
FileTooLargeErr
dataclass
¶
FileTooLargeErr(path: Path, size: int, max_size: int)
Error when file exceeds size limit.
WriteOptions
dataclass
¶
WriteOptions(
base_dir: Path | str | None = None,
encoding: str = "utf-8",
create_parents: bool = True,
backup: bool = True,
atomic: bool = True,
)
Options for safe_write.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| base_dir | Base directory to constrain to. TYPE: Path \| str \| None |
| encoding | File encoding. TYPE: str |
| create_parents | Create parent directories if needed. TYPE: bool |
| backup | Create backup of existing file. TYPE: bool |
| atomic | Use atomic write. TYPE: bool |
safe_read
¶
safe_read(
path: Path | str,
*,
base_dir: Path | str | None = None,
encoding: str = "utf-8",
max_size_bytes: int | None = 10 * 1024 * 1024,
) -> Result[str, ReadFileError]
Read a file safely with path validation.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the file to read. TYPE: Path \| str |
| base_dir | Base directory to constrain to. TYPE: Path \| str \| None |
| encoding | File encoding. TYPE: str |
| max_size_bytes | Maximum file size to read (None for no limit). TYPE: int \| None |

| RETURNS | DESCRIPTION |
|---|---|
| Ok | File contents on success. |
| Err | Error details on failure. |
Example
>>> match safe_read("config.json"):
...     case Ok(content):
...         data = json.loads(content)
...     case Err(FileNotFoundErr(path=p)):
...         print(f"Missing: {p}")
...     case Err(FileTooLargeErr(size=s)):
...         print(f"Too big: {s} bytes")
safe_write
¶
safe_write(
path: Path | str,
content: str,
*,
options: WriteOptions | None = None,
) -> Path
Write to a file safely with path validation.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to write to. TYPE: Path \| str |
| content | Content to write. TYPE: str |
| options | Write options. TYPE: WriteOptions \| None |

| RETURNS | DESCRIPTION |
|---|---|
| Path | Path to the written file. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If path validation fails. |
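The two ideas behind a safe write, constraining the target to a base directory and swapping the file in atomically, can be sketched with the standard library alone. This is an assumption about the general technique, not the library's code: sketch_atomic_write is a hypothetical name, PermissionError stands in for SecurityError, and backups are not handled.

```python
import os
import tempfile
from pathlib import Path


def sketch_atomic_write(
    path: Path, content: str, *, base_dir: Path, encoding: str = "utf-8"
) -> Path:
    base = base_dir.resolve()
    target = (base / path).resolve()
    # Reject paths that escape base_dir, for example via "..".
    if not target.is_relative_to(base):
        raise PermissionError(f"{target} is outside {base}")
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file in the same directory, then swap it in.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding=encoding) as handle:
            handle.write(content)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, target)  # atomic rename on the same filesystem
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)  # do not leave partial files behind
        raise
    return target


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    written = sketch_atomic_write(Path("conf/app.txt"), "hello", base_dir=root)
    content = written.read_text(encoding="utf-8")
    try:
        sketch_atomic_write(Path("../escape.txt"), "x", base_dir=root)
        escaped = True
    except PermissionError:
        escaped = False

print(content, escaped)  # hello False
```

Creating the temporary file next to the target matters: os.replace is only atomic when source and destination are on the same filesystem.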
ensure_dir
¶
ensure_dir(
path: Path | str,
*,
base_dir: Path | str | None = None,
mode: int = 493,
) -> Path
Ensure a directory exists, creating it if needed.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the directory. TYPE: Path \| str |
| base_dir | Base directory to constrain to. TYPE: Path \| str \| None |
| mode | Directory permissions (the default 493 is 0o755 in octal). TYPE: int |

| RETURNS | DESCRIPTION |
|---|---|
| Path | Path to the directory. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If path validation fails. |
| FileExistsError | If a file already exists at the given path or intermediate paths. |
Logging¶
Structured logging with context.
Provides a configured logger with support for structured output, context propagation, and proper formatting.
StackLogger
¶
StackLogger(
name: str = "stack",
level: str = "INFO",
*,
use_structured: bool = False,
)
Enhanced logger with context support.
Provides a wrapper around standard logging with additional features like context propagation and structured output support.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| name | Logger name. |
| level | Current log level. |
Initialize the logger.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Logger name. TYPE: str |
| level | Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). TYPE: str |
| use_structured | Use structured logging if structlog is available. TYPE: bool |
bind
¶
bind(**context: object) -> StackLogger
Add context to logger.
| PARAMETER | DESCRIPTION |
|---|---|
| **context | Key-value pairs to add to context. TYPE: object |

| RETURNS | DESCRIPTION |
|---|---|
| StackLogger | Self for chaining. |
unbind
¶
unbind(*keys: str) -> StackLogger
Remove context keys.
| PARAMETER | DESCRIPTION |
|---|---|
| *keys | Keys to remove from context. TYPE: str |

| RETURNS | DESCRIPTION |
|---|---|
| StackLogger | Self for chaining. |
debug
¶
debug(message: str, **kwargs: object) -> None
Log a debug message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. TYPE: str |
| **kwargs | Additional context. TYPE: object |
info
¶
info(message: str, **kwargs: object) -> None
Log an info message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. TYPE: str |
| **kwargs | Additional context. TYPE: object |
warning
¶
warning(message: str, **kwargs: object) -> None
Log a warning message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. TYPE: str |
| **kwargs | Additional context. TYPE: object |
error
¶
error(message: str, **kwargs: object) -> None
Log an error message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. TYPE: str |
| **kwargs | Additional context. TYPE: object |
critical
¶
critical(message: str, **kwargs: object) -> None
Log a critical message.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. TYPE: str |
| **kwargs | Additional context. TYPE: object |
exception
¶
exception(message: str, **kwargs: object) -> None
Log an exception with traceback.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The message to log. TYPE: str |
| **kwargs | Additional context. TYPE: object |
mask_sensitive_data_processor
¶
mask_sensitive_data_processor(
_logger: object,
_method: str,
event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]
Mask sensitive data in structlog event dictionaries.
Intercept the event_dict produced by structlog and replace the
value of any key whose name contains a sensitive substring with
"***REDACTED***". Matching is case-insensitive.
| PARAMETER | DESCRIPTION |
|---|---|
| _logger | The wrapped logger object (unused, required by structlog). TYPE: object |
| _method | The name of the log method called (unused, required by structlog). TYPE: str |
| event_dict | The structured event dictionary. TYPE: MutableMapping[str, object] |

| RETURNS | DESCRIPTION |
|---|---|
| MutableMapping[str, object] | The event dictionary with sensitive values masked. |
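A processor with this shape can be sketched in a few lines. The list of sensitive substrings below is an assumption, since this reference does not say which substrings the library checks, and sketch_mask is a hypothetical name.

```python
from collections.abc import MutableMapping

# Assumed substrings; the library's actual list is not shown in this reference.
SENSITIVE_PARTS = ("password", "token", "secret", "api_key")


def sketch_mask(
    _logger: object,
    _method: str,
    event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]:
    # Iterate over a copy of the keys because values are replaced in place.
    for key in list(event_dict):
        if any(part in key.lower() for part in SENSITIVE_PARTS):
            event_dict[key] = "***REDACTED***"
    return event_dict


masked = sketch_mask(
    None,
    "info",
    {"event": "login", "user": "ana", "API_Key": "abc", "db_password": "hunter2"},
)
print(masked)
```

Matching on key names only means a secret embedded in a message string is not caught, so a processor like this complements rather than replaces care at the call site.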
correlation_id_processor
¶
correlation_id_processor(
_logger: object,
_method: str,
event_dict: MutableMapping[str, object],
) -> MutableMapping[str, object]
Structlog processor to inject correlation ID into events.
| PARAMETER | DESCRIPTION |
|---|---|
| _logger | The wrapped logger object. TYPE: object |
| _method | The name of the log method called. TYPE: str |
| event_dict | The structured event dictionary. TYPE: MutableMapping[str, object] |

| RETURNS | DESCRIPTION |
|---|---|
| MutableMapping[str, object] | The event dictionary with correlation_id injected if set. |
setup_logging
¶
setup_logging(
level: Literal[
"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
] = "INFO",
*,
format_type: Literal[
"simple", "detailed", "json"
] = "detailed",
log_file: str | None = None,
use_structured: bool = False,
) -> None
Configure the root logger.
| PARAMETER | DESCRIPTION |
|---|---|
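| level | Log level to set. TYPE: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] |
| format_type | Output format type. TYPE: Literal["simple", "detailed", "json"] |
| log_file | Optional file to log to. TYPE: str \| None |
| use_structured | Use structlog if available. TYPE: bool |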
get_logger
¶
get_logger(
name: str = "stack",
*,
level: str = "INFO",
use_structured: bool = False,
) -> StackLogger
Get a configured logger instance.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Logger name. TYPE: str |
| level | Log level. TYPE: str |
| use_structured | Use structlog if available. TYPE: bool |

| RETURNS | DESCRIPTION |
|---|---|
| StackLogger | Configured StackLogger instance. |
Example
>>> logger = get_logger("my_module")
>>> logger.bind(request_id="123").info("Processing request")
log_operation
¶
log_operation(
operation: str,
*,
logger: StackLogger | None = None,
level: str = "INFO",
expected_exceptions: tuple[type[Exception], ...]
| type[Exception] = Exception,
) -> AbstractContextManager[StackLogger]
Context manager for logging operations.
| PARAMETER | DESCRIPTION |
|---|---|
| operation | Name of the operation. TYPE: str |
| logger | Logger to use (creates one if not provided). TYPE: StackLogger \| None |
| level | Log level for messages. TYPE: str |
| expected_exceptions | Exceptions to catch and log as failures. TYPE: tuple[type[Exception], ...] \| type[Exception] |

| YIELDS | DESCRIPTION |
|---|---|
| AbstractContextManager[StackLogger] | The logger instance. |
Example
>>> with log_operation("setup") as logger:
...     logger.info("Setting up environment")
Rate Limit¶
Rate limiting utilities.
Provides an in-memory, token-bucket-based rate limiting decorator
for both synchronous and asynchronous functions. The decorator
returns a Result type encapsulating either the original return value
or a RateLimitError.
RateLimitError
¶
RateLimitError(message: str = 'Rate limit exceeded')
Bases: Exception
Exception raised when a rate limit is exceeded.
Initialize the RateLimitError.
| PARAMETER | DESCRIPTION |
|---|---|
| message | The error message to display. Defaults to "Rate limit exceeded". TYPE: str |
RateLimiter
¶
RateLimiter(max_calls: int, time_window: float)
Token bucket rate limiter logic.
Initialize the token bucket.
| PARAMETER | DESCRIPTION |
|---|---|
| max_calls | The maximum number of calls allowed in the time window. TYPE: int |
| time_window | The time window in seconds. TYPE: float |
consume
¶
consume(tokens: float = 1.0) -> bool
Try to consume tokens.
| PARAMETER | DESCRIPTION |
|---|---|
| tokens | Number of tokens to consume. Defaults to 1.0. TYPE: float |

| RETURNS | DESCRIPTION |
|---|---|
| bool | True if tokens were consumed (allow), False otherwise (limit exceeded). |
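The refill rule is not spelled out in this reference. A common token-bucket formulation, assumed here, starts with max_calls tokens and refills continuously at max_calls / time_window tokens per second. SketchBucket is a hypothetical name, and the sketch is not thread-safe.

```python
import time


class SketchBucket:
    """Illustrative token bucket; refill policy is an assumption."""

    def __init__(self, max_calls: int, time_window: float, clock=time.monotonic):
        self.capacity = float(max_calls)
        self.refill_rate = max_calls / time_window  # tokens per second
        self.tokens = self.capacity  # start full
        self.clock = clock
        self.updated = clock()

    def consume(self, tokens: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.updated = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


now = [0.0]  # fake clock so the example is deterministic
bucket = SketchBucket(max_calls=2, time_window=1.0, clock=lambda: now[0])
burst = [bucket.consume() for _ in range(3)]
now[0] = 0.5  # half a window later, one token has been refilled
later = bucket.consume()
print(burst, later)  # [True, True, False] True
```

Continuous refill allows a short burst up to max_calls and then settles at the average rate, unlike a fixed window that resets all at once.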
RateLimitDecorator
¶
Bases: Protocol
Protocol for the rate limit decorator.
rate_limit
¶
rate_limit(
max_calls: int, time_window: float
) -> RateLimitDecorator
Decorate a function to apply rate limiting.
If the rate limit is exceeded, the wrapped function immediately returns
an Err(RateLimitError). Uses an in-memory token bucket strategy.
| PARAMETER | DESCRIPTION |
|---|---|
| max_calls | Maximum function executions allowed in the defined window. TYPE: int |
| time_window | Time window size in seconds. TYPE: float |

| RETURNS | DESCRIPTION |
|---|---|
| RateLimitDecorator | Decorated function returning a Result type. |
Example
>>> @rate_limit(max_calls=2, time_window=1.0)
... def fetch_data() -> str:
...     return "data"
>>> fetch_data()
Ok('data')
>>> fetch_data()
Ok('data')
>>> fetch_data()
Err(RateLimitError('Rate limit exceeded'))
Serialization¶
Serialization utilities.
Provides an optimized default encoder for use with orjson.dumps.
default_encoder
¶
default_encoder(obj: object) -> dict[str, object]
Default encoder for orjson.dumps handling Result types.
Intercepts objects of type Ok and Err:
- Ok(value): Returns {"status": "success"} merged with value if
value is a dict. Otherwise returns {"status": "success", "data": value}.
- Err(error): Returns {"status": "error", "message": str(error)}.
| PARAMETER | DESCRIPTION |
|---|---|
| obj | The object to encode. TYPE: object |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, object] | A serializable representation of the object. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the object type is not supported. |
Example
>>> import orjson
>>> orjson.dumps(Ok({"id": 1}), default=default_encoder)
b'{"status":"success","id":1}'
>>> orjson.dumps(Err(ValueError("oops")), default=default_encoder)
b'{"status":"error","message":"oops"}'
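The mapping rules described above can be tried out with the standard json module, whose default hook works the same way for unsupported objects. This sketch is an illustration, not the library's encoder: Ok and Err are minimal stand-ins, sketch_encoder is a hypothetical name, and the merge order for dict values is an assumption.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    """Minimal stand-in for the library's Ok type."""

    value: object


@dataclass(frozen=True)
class Err:
    """Minimal stand-in for the library's Err type."""

    error: object


def sketch_encoder(obj: object) -> dict:
    if isinstance(obj, Ok):
        if isinstance(obj.value, dict):
            # Assumed merge order: keys in value come after "status".
            return {"status": "success", **obj.value}
        return {"status": "success", "data": obj.value}
    if isinstance(obj, Err):
        return {"status": "error", "message": str(obj.error)}
    raise TypeError(f"Unsupported type: {type(obj).__name__}")


ok_json = json.dumps(Ok({"id": 1}), default=sketch_encoder)
list_json = json.dumps(Ok([1, 2]), default=sketch_encoder)
err_json = json.dumps(Err(ValueError("oops")), default=sketch_encoder)
print(ok_json)    # {"status": "success", "id": 1}
print(list_json)  # {"status": "success", "data": [1, 2]}
print(err_json)   # {"status": "error", "message": "oops"}
```

One consequence of merging dict values into the envelope is that a payload containing its own "status" key would collide with the envelope's key, so such payloads may be better wrapped in a list or nested dict.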
Subprocess¶
Safe subprocess execution with security guards.
Provides secure wrappers around subprocess execution with command validation, timeout handling, and retry logic.
SafeCommandResult
dataclass
¶
SafeCommandResult(
command: list[str],
returncode: int,
stdout: str = "",
stderr: str = "",
duration_seconds: float = 0.0,
)
Result of a safe command execution.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| command | The executed command. TYPE: list[str] |
| returncode | Exit code of the command. TYPE: int |
| stdout | Standard output. TYPE: str |
| stderr | Standard error. TYPE: str |
| success | Whether the command succeeded (returncode == 0). |
| duration_seconds | How long the command took. TYPE: float |
raise_on_error
¶
raise_on_error() -> SafeCommandResult
Raise an exception if command failed.
| RETURNS | DESCRIPTION |
|---|---|
| SafeCommandResult | Self if successful. |

| RAISES | DESCRIPTION |
|---|---|
| CalledProcessError | If command failed. |
run_safe_command
¶
run_safe_command(
command: Sequence[str],
*,
cwd: Path | str | None = None,
timeout: float = 300.0,
capture_output: bool = True,
check: bool = False,
allowed_commands: Sequence[str] | None = None,
env: dict[str, str] | None = None,
allowed_env_vars: Sequence[str] | None = None,
dry_run: bool = False,
) -> SafeCommandResult
Execute a command safely with security guards.
This function provides a secure wrapper around subprocess.run with command injection protection, timeout handling, and optional command whitelisting.
| PARAMETER | DESCRIPTION |
|---|---|
| command | Command and arguments as a sequence. TYPE: Sequence[str] |
| cwd | Working directory for the command. TYPE: Path \| str \| None |
| timeout | Maximum execution time in seconds. TYPE: float |
| capture_output | Whether to capture stdout/stderr. TYPE: bool |
| check | Whether to raise on non-zero exit. TYPE: bool |
| allowed_commands | Whitelist of allowed commands. TYPE: Sequence[str] \| None |
| env | Environment variables to set. TYPE: dict[str, str] \| None |
| allowed_env_vars | Whitelist of allowed environment variables. TYPE: Sequence[str] \| None |
| dry_run | If True, don't actually execute the command. TYPE: bool |

| RETURNS | DESCRIPTION |
|---|---|
| SafeCommandResult | SafeCommandResult with execution details. |

| RAISES | DESCRIPTION |
|---|---|
| SecurityError | If command validation fails. |
| TimeoutExpired | If command times out. |
| CalledProcessError | If check=True and command fails. |
Example
>>> result = run_safe_command(["poetry", "install"])
>>> if result.success:
...     print("Installation complete!")