Skip to content

Utils API 🛠️

Helper functions.

Debug Utilities

Pipeline and step debugging tools.

Classes

PipelineDebugger(pipeline)

Debug entire pipelines.

Features:

- Step-by-step execution
- DAG visualization
- Execution replay
- Error analysis

Source code in flowyml/utils/debug.py
def __init__(self, pipeline):
    self.pipeline = pipeline
    self.execution_log = []

Functions

analyze_errors(run_id: str) -> None

Analyze errors from a failed run.

Source code in flowyml/utils/debug.py
def analyze_errors(self, run_id: str) -> None:
    """Walk the failed steps recorded for a run.

    Loads the run's metadata through ``self.pipeline.metadata_store``, keeps
    the steps whose ``"success"`` flag is falsy (a missing flag counts as
    success) and iterates over the first ten lines of each failed step's
    ``"source_code"``. The loop bodies are currently empty, so nothing is
    returned, stored or displayed.

    Args:
        run_id: Identifier handed to ``metadata_store.load_run``.
    """
    run_record = self.pipeline.metadata_store.load_run(run_id)
    if not run_record:
        return

    failures = [
        (name, info)
        for name, info in run_record.get("steps", {}).items()
        if not info.get("success", True)
    ]
    if not failures:
        return

    for _name, info in failures:
        if not info.get("source_code"):
            continue
        # Only the first ten source lines of a failed step are visited.
        for _line in info["source_code"].split("\n")[:10]:
            pass
replay_run(run_id: str, start_from: str | None = None) -> None

Replay a previous run, optionally starting from a specific step.

Source code in flowyml/utils/debug.py
def replay_run(self, run_id: str, start_from: str | None = None) -> None:
    """Replay a previous run, optionally starting from a specific step."""
    if start_from:
        pass
    _ = run_id  # Unused in placeholder
step_through() -> None

Execute pipeline step-by-step with breaks.

Source code in flowyml/utils/debug.py
def step_through(self) -> None:
    """Prompt before each step of the pipeline, in topological order.

    Builds the pipeline, asks the DAG for its topological order and shows a
    ``[Y/n/q]`` prompt once per step. ``q`` stops prompting; ``n`` moves on
    to the next step, and so does any other answer, because no step is
    executed here yet.
    """
    pipeline = self.pipeline
    pipeline.build()
    execution_order = pipeline.dag.topological_sort()

    for _step in execution_order:
        answer = input("\nExecute this step? [Y/n/q]: ").lower()

        if answer == "q":
            return
        if answer == "n":
            continue
visualize_dag() -> None

Visualize the pipeline DAG.

Source code in flowyml/utils/debug.py
def visualize_dag(self) -> None:
    """Build the pipeline in preparation for visualizing its DAG.

    Only ``pipeline.build()`` is called; nothing is rendered yet.
    """
    target = self.pipeline
    target.build()

StepDebugger()

Debug individual pipeline steps.

Features:

- Breakpoints
- Input/output inspection
- Exception debugging
- Step profiling

Examples:

1
2
3
4
5
6
7
>>> from flowyml import step, StepDebugger
>>> debugger = StepDebugger()
>>> @step(outputs=["processed"])
... @debugger.break_at()
... def process_data(data):
...     # Debugger will stop here
...     return data * 2
Source code in flowyml/utils/debug.py
def __init__(self):
    self.breakpoints = set()
    self.step_history = []
    self.enabled = True

Functions

break_at(condition: Callable | None = None)

Add a breakpoint to a step.

Parameters:

Name Type Description Default
condition Callable | None

Optional condition function. Break only if returns True.

None
Source code in flowyml/utils/debug.py
def break_at(self, condition: Callable | None = None):
    """Add a breakpoint to a step.

    Args:
        condition: Optional condition function. Break only if returns True.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not self.enabled:
                return func(*args, **kwargs)

            # Check condition
            should_break = True
            if condition:
                should_break = condition(*args, **kwargs)

            if should_break:
                while True:
                    cmd = input("\n(debug) ").strip()

                    if cmd == "c":
                        break
                    if cmd == "i":
                        pass
                    elif cmd.startswith("p "):
                        expr = cmd[2:]
                        with contextlib.suppress(Exception):
                            # Evaluate in context
                            result = eval(expr, {"args": args, "kwargs": kwargs})
                    elif cmd == "pdb":
                        import pdb  # noqa: T100

                        pdb.set_trace()
                        break

            # Execute function
            try:
                result = func(*args, **kwargs)

                # Log execution
                self.step_history.append(
                    {
                        "step": func.__name__,
                        "inputs": {"args": args, "kwargs": kwargs},
                        "output": result,
                        "success": True,
                    },
                )

                return result
            except Exception as e:
                # Log error
                self.step_history.append(
                    {
                        "step": func.__name__,
                        "inputs": {"args": args, "kwargs": kwargs},
                        "error": str(e),
                        "success": False,
                    },
                )
                raise

        return wrapper

    return decorator
clear_history() -> None

Clear execution history.

Source code in flowyml/utils/debug.py
def clear_history(self) -> None:
    """Discard all recorded step executions."""
    # Rebind to a new list rather than clearing in place, so a list handed
    # out earlier keeps its contents.
    self.step_history = list()
get_history()

Get step execution history.

Source code in flowyml/utils/debug.py
def get_history(self):
    """Return the recorded step executions.

    The live list is returned, not a copy, so changes made by the caller
    are visible to the debugger.
    """
    history = self.step_history
    return history
profile()

Profile step execution time.

Source code in flowyml/utils/debug.py
def profile(self):
    """Build a decorator that times every call of a step.

    The elapsed time of the most recent call is stored, in seconds, on the
    wrapper as ``last_duration`` (``None`` until the first call). The step's
    return value and any exception it raises pass through unchanged.

    Returns:
        A decorator that wraps the step function.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            import time

            # perf_counter is monotonic, so a system clock adjustment during
            # the call cannot distort the measurement.
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Keep the measurement even when the step raises.
                wrapper.last_duration = time.perf_counter() - start

        wrapper.last_duration = None
        return wrapper

    return decorator
trace()

Enable step tracing (print inputs/outputs).

Source code in flowyml/utils/debug.py
def trace(self):
    """Build a pass-through decorator intended for step tracing.

    The wrapper checks ``self.enabled`` on every call, but both paths
    currently just call the step and return its result; nothing is emitted
    yet.

    Returns:
        A decorator that wraps the step function.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if self.enabled:
                # Tracing path: nothing is emitted around the call yet.
                outcome = func(*args, **kwargs)
                return outcome

            return func(*args, **kwargs)

        return wrapper

    return decorator

Functions

debug_step(*args, **kwargs)

Convenience function to debug a step.

Source code in flowyml/utils/debug.py
def debug_step(*args, **kwargs):
    """Create a breakpoint decorator from the module-level debugger.

    All arguments are forwarded to ``_global_debugger.break_at`` and its
    result is returned.
    """
    make_breakpoint = _global_debugger.break_at
    return make_breakpoint(*args, **kwargs)

inspect_step(step) -> None

Inspect a step's metadata.

Parameters:

Name Type Description Default
step

Step to inspect

required
Source code in flowyml/utils/debug.py
def inspect_step(step) -> None:
    """Inspect a step's metadata.

    Only ``step.source_code`` is read, and nothing is displayed yet.

    Args:
        step: Step to inspect; must expose a ``source_code`` attribute.
    """
    has_source = bool(step.source_code)
    if has_source:
        # Placeholder: the source is not shown yet.
        return

print_dag(pipeline) -> None

Pretty print pipeline DAG.

Source code in flowyml/utils/debug.py
def print_dag(pipeline) -> None:
    """Build the pipeline in preparation for printing its DAG.

    Only ``pipeline.build()`` is called; nothing is printed yet.

    Args:
        pipeline: Pipeline object exposing a ``build()`` method.
    """
    build = pipeline.build
    build()

profile_step()

Convenience function to profile a step.

Source code in flowyml/utils/debug.py
def profile_step():
    """Create a profiling decorator from the module-level debugger.

    Returns whatever ``_global_debugger.profile()`` returns.
    """
    profiling_decorator = _global_debugger.profile()
    return profiling_decorator

trace_step()

Convenience function to trace a step.

Source code in flowyml/utils/debug.py
def trace_step():
    """Create a tracing decorator from the module-level debugger.

    Returns whatever ``_global_debugger.trace()`` returns.
    """
    tracing_decorator = _global_debugger.trace()
    return tracing_decorator

Validation Utilities

Pydantic schemas and validation utilities for flowyml.

Classes

CacheStrategy

Bases: str, Enum

Cache strategy options.

ContextConfig

Bases: BaseModel

Configuration for pipeline context.

Functions

validate_device(v: str | None) -> str | None classmethod

Validate device string.

Source code in flowyml/utils/validation.py
@field_validator("device")
@classmethod
def validate_device(cls, v: str | None) -> str | None:
    """Validate device string."""
    if v is None:
        return v

    valid_devices = ["cpu", "cuda", "mps", "tpu"]
    if v.lower() not in valid_devices and not v.startswith("cuda:"):
        raise ValueError(
            f"Invalid device. Must be one of: {', '.join(valid_devices)} or 'cuda:N'",
        )

    return v.lower()

DatasetSchema

Bases: BaseModel

Schema for dataset validation.

ExperimentConfig

Bases: BaseModel

Configuration for experiments.

MetricsSchema

Bases: BaseModel

Schema for metrics validation.

ModelSchema

Bases: BaseModel

Schema for model validation.

PipelineConfig

Bases: BaseModel

Configuration for pipelines.

ResourceRequirements

Bases: BaseModel

Resource requirements for step execution.

Functions

validate_size_format(v: str | None) -> str | None classmethod

Validate memory/disk size format.

Source code in flowyml/utils/validation.py
@field_validator("memory", "disk")
@classmethod
def validate_size_format(cls, v: str | None) -> str | None:
    """Validate memory/disk size format."""
    if v is None:
        return v

    valid_units = ["B", "KB", "MB", "GB", "TB"]
    v_upper = v.upper()

    for unit in valid_units:
        if v_upper.endswith(unit):
            try:
                size_val = v_upper[: -len(unit)]
                float(size_val)
                return v
            except ValueError:
                raise ValueError(
                    f"Invalid size format: {v}. Expected format: <number><unit> (e.g., '4GB')",
                )

    raise ValueError(f"Invalid size unit. Must be one of: {', '.join(valid_units)}")

RetryConfig

Bases: BaseModel

Retry configuration for step execution.

Functions

max_delay_greater_than_initial(v: float, info) -> float classmethod

Validate max_delay is greater than or equal to initial_delay.

Source code in flowyml/utils/validation.py
@field_validator("max_delay")
@classmethod
def max_delay_greater_than_initial(cls, v: float, info) -> float:
    """Ensure ``max_delay`` is not smaller than ``initial_delay``.

    Args:
        v: Candidate ``max_delay`` value.
        info: Validation context; ``info.data`` holds the previously
            collected field values.

    Returns:
        ``v`` unchanged when it is acceptable.

    Raises:
        ValueError: If ``v`` is less than the recorded ``initial_delay``.
    """
    try:
        initial_delay = info.data["initial_delay"]
    except KeyError:
        # No initial_delay recorded, so there is nothing to compare against.
        return v

    if v < initial_delay:
        raise ValueError("max_delay must be greater than or equal to initial_delay")
    return v

StackConfig

Bases: BaseModel

Configuration for execution stacks.

StepConfig

Bases: BaseModel

Configuration for pipeline steps.

Functions

validate_context_config(config: dict[str, Any]) -> ContextConfig

Validate context configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Context configuration dictionary

required

Returns:

Type Description
ContextConfig

Validated ContextConfig instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_context_config(config: dict[str, Any]) -> ContextConfig:
    """Build a ``ContextConfig`` from a plain dictionary.

    Args:
        config: Context settings; each key is passed to ``ContextConfig``
            as a keyword argument.

    Returns:
        The constructed ``ContextConfig`` instance.

    Raises:
        ValidationError: If validation fails
    """
    validated = ContextConfig(**config)
    return validated

validate_metrics(metrics: dict[str, Any]) -> MetricsSchema

Validate metrics.

Parameters:

Name Type Description Default
metrics dict[str, Any]

Metrics dictionary

required

Returns:

Type Description
MetricsSchema

Validated MetricsSchema instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_metrics(metrics: dict[str, Any]) -> MetricsSchema:
    """Build a ``MetricsSchema`` from a plain dictionary.

    Args:
        metrics: Metric values; each key is passed to ``MetricsSchema`` as
            a keyword argument.

    Returns:
        The constructed ``MetricsSchema`` instance.

    Raises:
        ValidationError: If validation fails
    """
    validated = MetricsSchema(**metrics)
    return validated

validate_pipeline_config(config: dict[str, Any]) -> PipelineConfig

Validate pipeline configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Pipeline configuration dictionary

required

Returns:

Type Description
PipelineConfig

Validated PipelineConfig instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_pipeline_config(config: dict[str, Any]) -> PipelineConfig:
    """Build a ``PipelineConfig`` from a plain dictionary.

    Args:
        config: Pipeline settings; each key is passed to ``PipelineConfig``
            as a keyword argument.

    Returns:
        The constructed ``PipelineConfig`` instance.

    Raises:
        ValidationError: If validation fails
    """
    validated = PipelineConfig(**config)
    return validated

validate_step_config(config: dict[str, Any]) -> StepConfig

Validate step configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Step configuration dictionary

required

Returns:

Type Description
StepConfig

Validated StepConfig instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_step_config(config: dict[str, Any]) -> StepConfig:
    """Build a ``StepConfig`` from a plain dictionary.

    Args:
        config: Step settings; each key is passed to ``StepConfig`` as a
            keyword argument.

    Returns:
        The constructed ``StepConfig`` instance.

    Raises:
        ValidationError: If validation fails
    """
    validated = StepConfig(**config)
    return validated