Skip to content

Exceptions API 🚨

Custom exceptions thrown by flowyml.

Error Handling Module

Error handling utilities for robust pipeline execution.

Classes

CircuitBreaker(failure_threshold: int = 5, timeout: float = 60, recovery_timeout: float = 300, expected_exceptions: list[type[Exception]] | None = None)

Circuit breaker pattern implementation.

Prevents cascading failures by failing fast when a service is down.

Example
1
2
3
@step(circuit_breaker=CircuitBreaker(failure_threshold=3, timeout=60))
def call_api(url):
    return requests.get(url).json()

Initialize circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit

5
timeout float

Seconds to wait before trying again

60
recovery_timeout float

Seconds to wait before fully closing circuit

300
expected_exceptions list[type[Exception]] | None

Exceptions that trigger the breaker

None
Source code in flowyml/core/error_handling.py
def __init__(
    self,
    failure_threshold: int = 5,
    timeout: float = 60,
    recovery_timeout: float = 300,
    expected_exceptions: list[type[Exception]] | None = None,
):
    """Initialize circuit breaker.

    Args:
        failure_threshold: Number of failures before opening circuit
        timeout: Seconds to wait before trying again
        recovery_timeout: Seconds to wait before fully closing circuit
        expected_exceptions: Exceptions that trigger the breaker
    """
    self.config = CircuitBreakerConfig(
        failure_threshold=failure_threshold,
        timeout=timeout,
        recovery_timeout=recovery_timeout,
        expected_exceptions=expected_exceptions or [Exception],
    )

    self.state = CircuitState.CLOSED
    self.failure_count = 0
    self.last_failure_time: datetime | None = None
    self.success_count = 0

Functions

call(func: Callable, *args, **kwargs) -> Any

Call function with circuit breaker protection.

Parameters:

Name Type Description Default
func Callable

Function to call

required
*args

Positional arguments

()
**kwargs

Keyword arguments

{}

Returns:

Type Description
Any

Function result

Raises:

Type Description
CircuitOpenError

If circuit is open

Source code in flowyml/core/error_handling.py
def call(self, func: Callable, *args, **kwargs) -> Any:
    """Call function with circuit breaker protection.

    Args:
        func: Function to call
        *args: Positional arguments
        **kwargs: Keyword arguments

    Returns:
        Function result

    Raises:
        CircuitOpenError: If circuit is open
    """
    if self.state == CircuitState.OPEN:
        if self._should_attempt_reset():
            self.state = CircuitState.HALF_OPEN
        else:
            raise CircuitOpenError(
                f"Circuit breaker is open. Wait {self.config.timeout}s before retry.",
            )

    try:
        result = func(*args, **kwargs)
        self._on_success()
        return result

    except Exception as e:
        if self._is_expected_exception(e):
            self._on_failure()
        raise
reset() -> None

Reset circuit breaker to closed state.

Source code in flowyml/core/error_handling.py
def reset(self) -> None:
    """Reset circuit breaker to closed state."""
    self.state = CircuitState.CLOSED
    self.failure_count = 0
    self.success_count = 0
    self.last_failure_time = None

CircuitBreakerConfig(failure_threshold: int = 5, timeout: float = 60, recovery_timeout: float = 300, expected_exceptions: list[type[Exception]] = (lambda: [Exception])()) dataclass

Configuration for circuit breaker.

Attributes

expected_exceptions: list[type[Exception]] = field(default_factory=(lambda: [Exception])) class-attribute instance-attribute

Exceptions that trigger circuit breaker

failure_threshold: int = 5 class-attribute instance-attribute

Number of failures before opening circuit

recovery_timeout: float = 300 class-attribute instance-attribute

Time to wait before fully closing circuit (seconds)

timeout: float = 60 class-attribute instance-attribute

Time to wait before trying again (seconds)

CircuitOpenError

Bases: Exception

Exception raised when circuit breaker is open.

CircuitState

Bases: Enum

Circuit breaker states.

ExponentialBackoff(initial: float = 1.0, max_delay: float = 60.0, multiplier: float = 2.0, jitter: bool = True)

Exponential backoff retry strategy.

Example
from flowyml import step, retry, ExponentialBackoff


@step(
    retry=retry(
        max_attempts=5, backoff=ExponentialBackoff(initial=1, max=60, multiplier=2), on=[NetworkError, TimeoutError]
    )
)
def fetch_data():
    return api.get_data()

Initialize exponential backoff.

Parameters:

Name Type Description Default
initial float

Initial delay in seconds

1.0
max_delay float

Maximum delay in seconds

60.0
multiplier float

Backoff multiplier

2.0
jitter bool

Add random jitter to delays

True
Source code in flowyml/core/error_handling.py
def __init__(
    self,
    initial: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    jitter: bool = True,
):
    """Initialize exponential backoff.

    Args:
        initial: Initial delay in seconds
        max_delay: Maximum delay in seconds
        multiplier: Backoff multiplier
        jitter: Add random jitter to delays
    """
    self.initial = initial
    self.max_delay = max_delay
    self.multiplier = multiplier
    self.jitter = jitter
    self.attempt = 0

Functions

get_delay() -> float

Get delay for current attempt.

Returns:

Type Description
float

Delay in seconds

Source code in flowyml/core/error_handling.py
def get_delay(self) -> float:
    """Get delay for current attempt.

    Returns:
        Delay in seconds
    """
    delay = min(self.initial * (self.multiplier**self.attempt), self.max_delay)

    if self.jitter:
        import random

        delay = delay * (0.5 + random.random())

    self.attempt += 1
    return delay
reset() -> None

Reset attempt counter.

Source code in flowyml/core/error_handling.py
def reset(self) -> None:
    """Reset attempt counter."""
    self.attempt = 0

FallbackConfig(fallback_func: Callable, fallback_on: list[type[Exception]] = (lambda: [Exception])(), max_fallback_attempts: int = 1) dataclass

Configuration for fallback handler.

Attributes

fallback_func: Callable instance-attribute

Fallback function to call on error

fallback_on: list[type[Exception]] = field(default_factory=(lambda: [Exception])) class-attribute instance-attribute

Exceptions that trigger fallback

max_fallback_attempts: int = 1 class-attribute instance-attribute

Maximum number of fallback attempts

FallbackHandler(fallback_func: Callable, fallback_on: list[type[Exception]] | None = None, max_attempts: int = 1)

Fallback handler for graceful degradation.

Example
1
2
3
@step(fallback=lambda: load_cached_data(), fallback_on=[TimeoutError])
def fetch_live_data():
    return requests.get(url).json()

Initialize fallback handler.

Parameters:

Name Type Description Default
fallback_func Callable

Function to call as fallback

required
fallback_on list[type[Exception]] | None

Exceptions that trigger fallback

None
max_attempts int

Maximum fallback attempts

1
Source code in flowyml/core/error_handling.py
def __init__(
    self,
    fallback_func: Callable,
    fallback_on: list[type[Exception]] | None = None,
    max_attempts: int = 1,
):
    """Initialize fallback handler.

    Args:
        fallback_func: Function to call as fallback
        fallback_on: Exceptions that trigger fallback
        max_attempts: Maximum fallback attempts
    """
    self.config = FallbackConfig(
        fallback_func=fallback_func,
        fallback_on=fallback_on or [Exception],
        max_fallback_attempts=max_attempts,
    )
    self.fallback_attempts = 0

Functions

call(func: Callable, *args, **kwargs) -> Any

Call function with fallback protection.

Parameters:

Name Type Description Default
func Callable

Primary function to call

required
*args

Positional arguments

()
**kwargs

Keyword arguments

{}

Returns:

Type Description
Any

Function result or fallback result

Source code in flowyml/core/error_handling.py
def call(self, func: Callable, *args, **kwargs) -> Any:
    """Call function with fallback protection.

    Args:
        func: Primary function to call
        *args: Positional arguments
        **kwargs: Keyword arguments

    Returns:
        Function result or fallback result
    """
    try:
        return func(*args, **kwargs)

    except Exception as e:
        if self._should_fallback(e) and self.fallback_attempts < self.config.max_fallback_attempts:
            self.fallback_attempts += 1
            return self.config.fallback_func()
        raise
reset() -> None

Reset fallback attempts counter.

Source code in flowyml/core/error_handling.py
def reset(self) -> None:
    """Reset fallback attempts counter."""
    self.fallback_attempts = 0

OnFailureConfig(action: str = 'log', recipients: list[str] = list(), include_logs: bool = True, include_traceback: bool = True) dataclass

Configuration for failure handling.

Attributes

action: str = 'log' class-attribute instance-attribute

Action to take (log, email, slack, webhook)

include_logs: bool = True class-attribute instance-attribute

Include logs in notification

include_traceback: bool = True class-attribute instance-attribute

Include full traceback

recipients: list[str] = field(default_factory=list) class-attribute instance-attribute

Recipients for notifications

RetryConfig(max_attempts: int = 3, backoff: ExponentialBackoff | None = None, retry_on: list[type[Exception]] = (lambda: [Exception])(), not_retry_on: list[type[Exception]] = list()) dataclass

Configuration for retry logic.

Attributes

backoff: ExponentialBackoff | None = None class-attribute instance-attribute

Backoff strategy

max_attempts: int = 3 class-attribute instance-attribute

Maximum number of retry attempts

not_retry_on: list[type[Exception]] = field(default_factory=list) class-attribute instance-attribute

Exceptions NOT to retry on

retry_on: list[type[Exception]] = field(default_factory=(lambda: [Exception])) class-attribute instance-attribute

Exceptions to retry on

Functions

execute_with_retry(func: Callable, retry_config: RetryConfig, *args, **kwargs: Any) -> Any

Execute function with retry logic.

Parameters:

Name Type Description Default
func Callable

Function to execute

required
retry_config RetryConfig

Retry configuration

required
*args

Positional arguments

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
Any

Function result

Source code in flowyml/core/error_handling.py
def execute_with_retry(
    func: Callable,
    retry_config: RetryConfig,
    *args,
    **kwargs: Any,
) -> Any:
    """Execute function with retry logic.

    Args:
        func: Function to execute
        retry_config: Retry configuration
        *args: Positional arguments
        **kwargs: Keyword arguments

    Returns:
        Function result

    Raises:
        Last exception if all retries fail
    """
    last_exception = None

    for attempt in range(retry_config.max_attempts):
        try:
            return func(*args, **kwargs)

        except Exception as e:
            # Don't retry if in not_retry_on list
            if any(isinstance(e, exc_type) for exc_type in retry_config.not_retry_on):
                raise

            # Only retry if in retry_on list
            if not any(isinstance(e, exc_type) for exc_type in retry_config.retry_on):
                raise

            last_exception = e

            # Don't sleep on last attempt
            if attempt < retry_config.max_attempts - 1 and retry_config.backoff:
                delay = retry_config.backoff.get_delay()
                time.sleep(delay)

    # All retries failed
    if last_exception:
        raise last_exception
    raise RuntimeError("Retry failed with no exception captured")

on_failure(action: str = 'log', recipients: list[str] | None = None, include_logs: bool = True, include_traceback: bool = True) -> OnFailureConfig

Create failure handling configuration.

Parameters:

Name Type Description Default
action str

Action to take on failure

'log'
recipients list[str] | None

Recipients for notifications

None
include_logs bool

Include logs in notification

True
include_traceback bool

Include full traceback

True

Returns:

Type Description
OnFailureConfig

OnFailureConfig instance

Source code in flowyml/core/error_handling.py
def on_failure(
    action: str = "log",
    recipients: list[str] | None = None,
    include_logs: bool = True,
    include_traceback: bool = True,
) -> OnFailureConfig:
    """Create failure handling configuration.

    Args:
        action: Action to take on failure
        recipients: Recipients for notifications
        include_logs: Include logs in notification
        include_traceback: Include full traceback

    Returns:
        OnFailureConfig instance
    """
    return OnFailureConfig(
        action=action,
        recipients=recipients or [],
        include_logs=include_logs,
        include_traceback=include_traceback,
    )

retry(max_attempts: int = 3, backoff: ExponentialBackoff | None = None, on: list[type[Exception]] | None = None, not_on: list[type[Exception]] | None = None) -> RetryConfig

Create retry configuration.

Parameters:

Name Type Description Default
max_attempts int

Maximum retry attempts

3
backoff ExponentialBackoff | None

Backoff strategy

None
on list[type[Exception]] | None

Exceptions to retry on

None
not_on list[type[Exception]] | None

Exceptions not to retry on

None

Returns:

Type Description
RetryConfig

RetryConfig instance

Source code in flowyml/core/error_handling.py
def retry(
    max_attempts: int = 3,
    backoff: ExponentialBackoff | None = None,
    on: list[type[Exception]] | None = None,
    not_on: list[type[Exception]] | None = None,
) -> RetryConfig:
    """Create retry configuration.

    Args:
        max_attempts: Maximum retry attempts
        backoff: Backoff strategy
        on: Exceptions to retry on
        not_on: Exceptions not to retry on

    Returns:
        RetryConfig instance
    """
    return RetryConfig(
        max_attempts=max_attempts,
        backoff=backoff or ExponentialBackoff(),
        retry_on=on or [Exception],
        not_retry_on=not_on or [],
    )