Skip to content

Exceptions API 🚨

FlowyML defines a structured exception hierarchy so you can catch errors at the right level of granularity. All custom exceptions inherit from a common base class, making it easy to write broad except clauses for framework errors while still allowing fine-grained handling of specific failure modes such as missing artifacts, invalid configurations, or orchestrator timeouts.

Custom exceptions raised by FlowyML.

Error Handling Module

Error handling utilities for robust pipeline execution.

Classes

CircuitBreaker(failure_threshold: int = 5, timeout: float = 60, recovery_timeout: float = 300, expected_exceptions: list[type[Exception]] | None = None)

Circuit breaker pattern implementation.

Prevents cascading failures by failing fast when a service is down.

Example
# Per the CircuitBreaker parameter docs: failure_threshold=3 is the number of
# failures before the circuit opens, and timeout=60 is the number of seconds
# to wait before trying again.
@step(circuit_breaker=CircuitBreaker(failure_threshold=3, timeout=60))
def call_api(url):
    """Request ``url`` via ``requests.get`` and return the result of ``.json()``."""
    return requests.get(url).json()

Initialize circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit

5
timeout float

Seconds to wait before trying again

60
recovery_timeout float

Seconds to wait before fully closing circuit

300
expected_exceptions list[type[Exception]] | None

Exceptions that trigger the breaker

None
Source code in flowyml/core/error_handling.py
def __init__(
    self,
    failure_threshold: int = 5,
    timeout: float = 60,
    recovery_timeout: float = 300,
    expected_exceptions: list[type[Exception]] | None = None,
):
    """Build the breaker configuration and start in the closed state.

    Args:
        failure_threshold: Number of failures before opening circuit
        timeout: Seconds to wait before trying again
        recovery_timeout: Seconds to wait before fully closing circuit
        expected_exceptions: Exceptions that trigger the breaker; a falsy
            value (``None`` or an empty list) is replaced by ``[Exception]``
    """
    tripping_exceptions = expected_exceptions if expected_exceptions else [Exception]
    self.config = CircuitBreakerConfig(
        expected_exceptions=tripping_exceptions,
        recovery_timeout=recovery_timeout,
        timeout=timeout,
        failure_threshold=failure_threshold,
    )

    # Runtime bookkeeping: closed circuit, nothing recorded yet.
    self.state = CircuitState.CLOSED
    self.failure_count = 0
    self.success_count = 0
    self.last_failure_time: datetime | None = None

Functions

call(func: Callable, *args, **kwargs) -> Any

Call function with circuit breaker protection.

Parameters:

Name Type Description Default
func Callable

Function to call

required
*args

Positional arguments

()
**kwargs

Keyword arguments

{}

Returns:

Type Description
Any

Function result

Raises:

Type Description
CircuitOpenError

If circuit is open

Source code in flowyml/core/error_handling.py
def call(self, func: Callable, *args, **kwargs) -> Any:
    """Invoke ``func`` under circuit breaker protection.

    Args:
        func: Function to call
        *args: Positional arguments forwarded to ``func``
        **kwargs: Keyword arguments forwarded to ``func``

    Returns:
        Whatever ``func`` returns

    Raises:
        CircuitOpenError: If the circuit is open and
            ``_should_attempt_reset()`` does not allow a trial call
        Exception: Any exception raised by ``func`` is re-raised unchanged
    """
    if self.state == CircuitState.OPEN:
        if not self._should_attempt_reset():
            raise CircuitOpenError(
                f"Circuit breaker is open. Wait {self.config.timeout}s before retry.",
            )
        # A reset attempt is allowed: let this call through as a trial.
        self.state = CircuitState.HALF_OPEN

    try:
        outcome = func(*args, **kwargs)
        # Kept inside the try so an error while recording the success goes
        # through the same failure accounting as an error from ``func``.
        self._on_success()
    except Exception as error:
        # Only exceptions the breaker is configured for count as failures.
        if self._is_expected_exception(error):
            self._on_failure()
        raise
    return outcome
reset() -> None

Reset circuit breaker to closed state.

Source code in flowyml/core/error_handling.py
def reset(self) -> None:
    """Put the breaker back in the closed state and clear its bookkeeping.

    Both counters are zeroed and the last failure time is forgotten.
    """
    self.failure_count = self.success_count = 0
    self.last_failure_time = None
    self.state = CircuitState.CLOSED

CircuitBreakerConfig(failure_threshold: int = 5, timeout: float = 60, recovery_timeout: float = 300, expected_exceptions: list[type[Exception]] = (lambda: [Exception])()) dataclass

Configuration for circuit breaker.

Attributes

expected_exceptions: list[type[Exception]] = field(default_factory=(lambda: [Exception])) class-attribute instance-attribute

Exceptions that trigger circuit breaker

failure_threshold: int = 5 class-attribute instance-attribute

Number of failures before opening circuit

recovery_timeout: float = 300 class-attribute instance-attribute

Time to wait before fully closing circuit (seconds)

timeout: float = 60 class-attribute instance-attribute

Time to wait before trying again (seconds)

CircuitOpenError

Bases: Exception

Exception raised when circuit breaker is open.

CircuitState

Bases: Enum

Circuit breaker states.

ExponentialBackoff(initial: float = 1.0, max_delay: float = 60.0, multiplier: float = 2.0, jitter: bool = True)

Exponential backoff retry strategy.

Example
from flowyml import step, retry, ExponentialBackoff


@step(
    retry=retry(
        max_attempts=5,
        # The cap is passed as ``max_delay``; ExponentialBackoff has no ``max`` parameter.
        backoff=ExponentialBackoff(initial=1, max_delay=60, multiplier=2),
        on=[NetworkError, TimeoutError],
    )
)
def fetch_data():
    """Return ``api.get_data()``; the step is configured to retry on network and timeout errors."""
    return api.get_data()

Initialize exponential backoff.

Parameters:

Name Type Description Default
initial float

Initial delay in seconds

1.0
max_delay float

Maximum delay in seconds

60.0
multiplier float

Backoff multiplier

2.0
jitter bool

Add random jitter to delays

True
Source code in flowyml/core/error_handling.py
def __init__(
    self,
    initial: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    jitter: bool = True,
):
    """Store the backoff parameters and start counting attempts from zero.

    Args:
        initial: Initial delay in seconds
        max_delay: Maximum delay in seconds
        multiplier: Backoff multiplier
        jitter: Add random jitter to delays
    """
    # No delay has been handed out yet.
    self.attempt = 0
    self.initial, self.max_delay = initial, max_delay
    self.multiplier, self.jitter = multiplier, jitter

Functions

get_delay() -> float

Get delay for current attempt.

Returns:

Type Description
float

Delay in seconds

Source code in flowyml/core/error_handling.py
def get_delay(self) -> float:
    """Return the delay for the current attempt and advance the attempt counter.

    The base delay is ``initial * multiplier**attempt``, capped at
    ``max_delay``. When ``jitter`` is enabled the capped value is then
    scaled by a random factor in ``[0.5, 1.5)``, so a jittered delay can
    be larger than ``max_delay``.

    Returns:
        Delay in seconds
    """
    try:
        uncapped = self.initial * (self.multiplier**self.attempt)
    except OverflowError:
        # Float exponentiation overflows once the counter gets large
        # (e.g. 2.0**1024). Assuming a positive initial delay, the value is
        # far beyond the cap by then, so saturate instead of crashing.
        uncapped = self.max_delay
    delay = min(uncapped, self.max_delay)

    if self.jitter:
        import random

        # Spread retries out so concurrent callers do not wake in lockstep.
        delay = delay * (0.5 + random.random())

    self.attempt += 1
    return delay
reset() -> None

Reset attempt counter.

Source code in flowyml/core/error_handling.py
def reset(self) -> None:
    """Set the attempt counter back to zero."""
    self.attempt = 0

FallbackConfig(fallback_func: Callable, fallback_on: list[type[Exception]] = (lambda: [Exception])(), max_fallback_attempts: int = 1) dataclass

Configuration for fallback handler.

Attributes

fallback_func: Callable instance-attribute

Fallback function to call on error

fallback_on: list[type[Exception]] = field(default_factory=(lambda: [Exception])) class-attribute instance-attribute

Exceptions that trigger fallback

max_fallback_attempts: int = 1 class-attribute instance-attribute

Maximum number of fallback attempts

FallbackHandler(fallback_func: Callable, fallback_on: list[type[Exception]] | None = None, max_attempts: int = 1)

Fallback handler for graceful degradation.

Example
# ``fallback`` is a zero-argument callable and ``fallback_on`` lists the
# exception types meant to trigger it (here only TimeoutError).
# NOTE(review): ``url`` is not a parameter of fetch_live_data and is not
# defined in this snippet — presumably a module-level name; confirm.
@step(fallback=lambda: load_cached_data(), fallback_on=[TimeoutError])
def fetch_live_data():
    """Request ``url`` via ``requests.get`` and return the result of ``.json()``."""
    return requests.get(url).json()

Initialize fallback handler.

Parameters:

Name Type Description Default
fallback_func Callable

Function to call as fallback

required
fallback_on list[type[Exception]] | None

Exceptions that trigger fallback

None
max_attempts int

Maximum fallback attempts

1
Source code in flowyml/core/error_handling.py
def __init__(
    self,
    fallback_func: Callable,
    fallback_on: list[type[Exception]] | None = None,
    max_attempts: int = 1,
):
    """Build the fallback configuration and zero the usage counter.

    Args:
        fallback_func: Function to call as fallback
        fallback_on: Exceptions that trigger fallback; a falsy value
            (``None`` or an empty list) is replaced by ``[Exception]``
        max_attempts: Maximum fallback attempts
    """
    triggers = fallback_on if fallback_on else [Exception]
    self.config = FallbackConfig(
        max_fallback_attempts=max_attempts,
        fallback_on=triggers,
        fallback_func=fallback_func,
    )
    # Number of times the fallback has been used so far.
    self.fallback_attempts = 0

Functions

call(func: Callable, *args, **kwargs) -> Any

Call function with fallback protection.

Parameters:

Name Type Description Default
func Callable

Primary function to call

required
*args

Positional arguments

()
**kwargs

Keyword arguments

{}

Returns:

Type Description
Any

Function result or fallback result

Source code in flowyml/core/error_handling.py
def call(self, func: Callable, *args, **kwargs) -> Any:
    """Run ``func`` and substitute the fallback result if it fails.

    Args:
        func: Primary function to call
        *args: Positional arguments forwarded to ``func``
        **kwargs: Keyword arguments forwarded to ``func``

    Returns:
        The result of ``func``, or the result of the configured fallback
        function (called without arguments) when ``func`` raised

    Raises:
        Exception: The original exception, when ``_should_fallback()``
            rejects it or the fallback attempt limit has been reached
    """
    try:
        return func(*args, **kwargs)
    except Exception as error:
        if not self._should_fallback(error):
            raise
        if self.fallback_attempts >= self.config.max_fallback_attempts:
            # Fallback budget used up: surface the original error.
            raise
        self.fallback_attempts += 1
        return self.config.fallback_func()
reset() -> None

Reset fallback attempts counter.

Source code in flowyml/core/error_handling.py
def reset(self) -> None:
    """Set the count of used fallback attempts back to zero."""
    self.fallback_attempts = 0

OnFailureConfig(action: str = 'log', recipients: list[str] = list(), include_logs: bool = True, include_traceback: bool = True) dataclass

Configuration for failure handling.

Attributes

action: str = 'log' class-attribute instance-attribute

Action to take (log, email, slack, webhook)

include_logs: bool = True class-attribute instance-attribute

Include logs in notification

include_traceback: bool = True class-attribute instance-attribute

Include full traceback

recipients: list[str] = field(default_factory=list) class-attribute instance-attribute

Recipients for notifications

RetryConfig(max_attempts: int = 3, backoff: ExponentialBackoff | None = None, retry_on: list[type[Exception]] = (lambda: [Exception])(), not_retry_on: list[type[Exception]] = list()) dataclass

Configuration for retry logic.

Attributes

backoff: ExponentialBackoff | None = None class-attribute instance-attribute

Backoff strategy

max_attempts: int = 3 class-attribute instance-attribute

Maximum number of retry attempts

not_retry_on: list[type[Exception]] = field(default_factory=list) class-attribute instance-attribute

Exceptions NOT to retry on

retry_on: list[type[Exception]] = field(default_factory=(lambda: [Exception])) class-attribute instance-attribute

Exceptions to retry on

Functions

execute_with_retry(func: Callable, retry_config: RetryConfig, *args, **kwargs: Any) -> Any

Execute function with retry logic.

Parameters:

Name Type Description Default
func Callable

Function to execute

required
retry_config RetryConfig

Retry configuration

required
*args

Positional arguments

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
Any

Function result

Source code in flowyml/core/error_handling.py
def execute_with_retry(
    func: Callable,
    retry_config: RetryConfig,
    *args,
    **kwargs: Any,
) -> Any:
    """Execute function with retry logic.

    ``func`` is attempted up to ``retry_config.max_attempts`` times. An
    exception is re-raised immediately when it matches ``not_retry_on`` or
    does not match ``retry_on``; otherwise the call is retried, sleeping
    for ``backoff.get_delay()`` seconds between attempts when a backoff is
    configured.

    Args:
        func: Function to execute
        retry_config: Retry configuration
        *args: Positional arguments forwarded to ``func``
        **kwargs: Keyword arguments forwarded to ``func``

    Returns:
        Function result

    Raises:
        Exception: The last exception raised by ``func`` if all retries
            fail, or a non-retryable exception as soon as it occurs
        RuntimeError: If no attempt was made (``max_attempts`` below 1)
    """
    backoff = retry_config.backoff
    if backoff:
        # The backoff object lives on the reusable config and keeps its own
        # attempt counter. Start each execution from the initial delay
        # instead of continuing where a previous execution left off.
        backoff.reset()

    non_retryable = tuple(retry_config.not_retry_on)
    retryable = tuple(retry_config.retry_on)
    final_attempt = retry_config.max_attempts - 1
    last_exception = None

    for attempt in range(retry_config.max_attempts):
        try:
            return func(*args, **kwargs)

        except Exception as e:
            # The exclusion list wins over the inclusion list.
            if isinstance(e, non_retryable) or not isinstance(e, retryable):
                raise

            last_exception = e

            # Don't sleep after the last attempt
            if backoff and attempt < final_attempt:
                time.sleep(backoff.get_delay())

    # All retries failed
    if last_exception is not None:
        raise last_exception
    raise RuntimeError("Retry failed with no exception captured")

on_failure(action: str = 'log', recipients: list[str] | None = None, include_logs: bool = True, include_traceback: bool = True) -> OnFailureConfig

Create failure handling configuration.

Parameters:

Name Type Description Default
action str

Action to take on failure

'log'
recipients list[str] | None

Recipients for notifications

None
include_logs bool

Include logs in notification

True
include_traceback bool

Include full traceback

True

Returns:

Type Description
OnFailureConfig

OnFailureConfig instance

Source code in flowyml/core/error_handling.py
def on_failure(
    action: str = "log",
    recipients: list[str] | None = None,
    include_logs: bool = True,
    include_traceback: bool = True,
) -> OnFailureConfig:
    """Build an ``OnFailureConfig`` from the given options.

    Args:
        action: Action to take on failure
        recipients: Recipients for notifications; a falsy value (``None``
            or an empty list) becomes a new empty list
        include_logs: Include logs in notification
        include_traceback: Include full traceback

    Returns:
        OnFailureConfig instance
    """
    notify = recipients if recipients else []
    return OnFailureConfig(
        include_traceback=include_traceback,
        include_logs=include_logs,
        recipients=notify,
        action=action,
    )

retry(max_attempts: int = 3, backoff: ExponentialBackoff | None = None, on: list[type[Exception]] | None = None, not_on: list[type[Exception]] | None = None) -> RetryConfig

Create retry configuration.

Parameters:

Name Type Description Default
max_attempts int

Maximum retry attempts

3
backoff ExponentialBackoff | None

Backoff strategy

None
on list[type[Exception]] | None

Exceptions to retry on

None
not_on list[type[Exception]] | None

Exceptions not to retry on

None

Returns:

Type Description
RetryConfig

RetryConfig instance

Source code in flowyml/core/error_handling.py
def retry(
    max_attempts: int = 3,
    backoff: ExponentialBackoff | None = None,
    on: list[type[Exception]] | None = None,
    not_on: list[type[Exception]] | None = None,
) -> RetryConfig:
    """Build a ``RetryConfig``, filling in defaults for omitted options.

    Args:
        max_attempts: Maximum retry attempts
        backoff: Backoff strategy; a falsy value is replaced by a new
            ``ExponentialBackoff()`` with its default settings
        on: Exceptions to retry on; a falsy value becomes ``[Exception]``
        not_on: Exceptions not to retry on; a falsy value becomes ``[]``

    Returns:
        RetryConfig instance
    """
    strategy = backoff if backoff else ExponentialBackoff()
    retryable = on if on else [Exception]
    excluded = not_on if not_on else []
    return RetryConfig(
        max_attempts=max_attempts,
        backoff=strategy,
        retry_on=retryable,
        not_retry_on=excluded,
    )

See Also