Skip to content

Utils API 🛠️

The utilities module provides cross-cutting helpers used throughout FlowyML: content hashing for cache keys, serialization/deserialization routines, structured logging setup, input-validation helpers, and lightweight debug tools. While most users won't call these directly, they are essential for plugin authors and anyone extending the framework.

Helper functions.

Debug Utilities

Pipeline and step debugging tools.

Classes

PipelineDebugger(pipeline)

Debug entire pipelines.

Features: - Step-by-step execution - DAG visualization - Execution replay - Error analysis

Source code in flowyml/utils/debug.py
def __init__(self, pipeline):
    """Attach the debugger to ``pipeline`` and start with an empty execution log."""
    self.execution_log = []
    self.pipeline = pipeline

Functions

analyze_errors(run_id: str) -> None

Analyze errors from a failed run.

Source code in flowyml/utils/debug.py
def analyze_errors(self, run_id: str) -> None:
    """Walk the stored metadata of ``run_id`` looking for failed steps.

    The function returns early when the run has no metadata or no failed
    step. Otherwise it only iterates over the first ten source lines of each
    failed step; nothing is returned or reported.
    """
    run_record = self.pipeline.metadata_store.load_run(run_id)
    if not run_record:
        return

    # A step counts as failed only when it stores a falsy "success" value;
    # a missing key is treated as success.
    failures = [
        (name, info)
        for name, info in run_record.get("steps", {}).items()
        if not info.get("success", True)
    ]
    if not failures:
        return

    for _name, info in failures:
        source = info.get("source_code")
        if not source:
            continue
        for _line in source.split("\n")[:10]:
            pass
replay_run(run_id: str, start_from: str | None = None) -> None

Replay a previous run, optionally starting from a specific step.

Source code in flowyml/utils/debug.py
def replay_run(self, run_id: str, start_from: str | None = None) -> None:
    """Replay a previous run, optionally starting from a specific step."""
    if start_from:
        pass
    _ = run_id  # Unused in placeholder
step_through() -> None

Execute pipeline step-by-step with breaks.

Source code in flowyml/utils/debug.py
def step_through(self) -> None:
    """Prompt once per pipeline step, in topological order.

    The pipeline is built first. For every step the user is asked
    ``[Y/n/q]``: ``q`` ends the walk, any other answer moves on to the next
    prompt. This code does not execute the steps themselves.
    """
    self.pipeline.build()

    for _step in self.pipeline.dag.topological_sort():
        answer = input("\nExecute this step? [Y/n/q]: ").lower()
        if answer == "q":
            return
        # "n" skips the step; since nothing follows the prompt yet, every
        # other answer also just advances to the next step.
visualize_dag() -> None

Visualize the pipeline DAG.

Source code in flowyml/utils/debug.py
def visualize_dag(self) -> None:
    """Build the pipeline; no rendering of the DAG is performed here."""
    pipeline = self.pipeline
    pipeline.build()

StepDebugger()

Debug individual pipeline steps.

Features: - Breakpoints - Input/output inspection - Exception debugging - Step profiling

Examples:

>>> from flowyml import step, StepDebugger
>>> debugger = StepDebugger()
>>> @step(outputs=["processed"])
... @debugger.break_at()
... def process_data(data):
...     # Debugger will stop here
...     return data * 2
Source code in flowyml/utils/debug.py
def __init__(self):
    """Start enabled, with no breakpoints and an empty step history."""
    self.enabled = True
    self.step_history = []
    self.breakpoints = set()

Functions

break_at(condition: Callable | None = None)

Add a breakpoint to a step.

Parameters:

Name Type Description Default
condition Callable | None

Optional condition function. Break only if returns True.

None
Source code in flowyml/utils/debug.py
def break_at(self, condition: Callable | None = None):
    """Add a breakpoint to a step.

    Args:
        condition: Optional condition function. Break only if returns True.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not self.enabled:
                return func(*args, **kwargs)

            # Check condition
            should_break = True
            if condition:
                should_break = condition(*args, **kwargs)

            if should_break:
                while True:
                    cmd = input("\n(debug) ").strip()

                    if cmd == "c":
                        break
                    if cmd == "i":
                        pass
                    elif cmd.startswith("p "):
                        expr = cmd[2:]
                        with contextlib.suppress(Exception):
                            # Evaluate in context
                            result = eval(expr, {"args": args, "kwargs": kwargs})
                    elif cmd == "pdb":
                        import pdb  # noqa: T100

                        pdb.set_trace()
                        break

            # Execute function
            try:
                result = func(*args, **kwargs)

                # Log execution
                self.step_history.append(
                    {
                        "step": func.__name__,
                        "inputs": {"args": args, "kwargs": kwargs},
                        "output": result,
                        "success": True,
                    },
                )

                return result
            except Exception as e:
                # Log error
                self.step_history.append(
                    {
                        "step": func.__name__,
                        "inputs": {"args": args, "kwargs": kwargs},
                        "error": str(e),
                        "success": False,
                    },
                )
                raise

        return wrapper

    return decorator
clear_history() -> None

Clear execution history.

Source code in flowyml/utils/debug.py
def clear_history(self) -> None:
    """Drop all recorded executions by rebinding the history to a new empty list.

    The previous list object is left untouched, so references handed out
    earlier keep their contents.
    """
    empty_history = []
    self.step_history = empty_history
get_history()

Get step execution history.

Source code in flowyml/utils/debug.py
def get_history(self):
    """Return the recorded step executions.

    The stored list object itself is returned, not a copy.
    """
    history = self.step_history
    return history
profile()

Profile step execution time.

Source code in flowyml/utils/debug.py
def profile(self):
    """Profile step execution time.

    Returns:
        A decorator. The wrapped step is called unchanged; once it returns
        or raises, its elapsed time is printed as
        ``[profile] <step name>: <seconds>s``.
    """
    import time

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter() is monotonic, so the measurement is not skewed
            # by system clock adjustments the way time.time() can be.
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Report in ``finally`` so failing steps are timed as well.
                elapsed = time.perf_counter() - start
                print(f"[profile] {func.__name__}: {elapsed:.6f}s")  # noqa: T201

        return wrapper

    return decorator
trace()

Enable step tracing (print inputs/outputs).

Source code in flowyml/utils/debug.py
def trace(self):
    """Enable step tracing (print inputs/outputs).

    Returns:
        A decorator. While ``self.enabled`` is true, the wrapped step prints
        its arguments before running and its return value afterwards. When
        the debugger is disabled the step is called directly and nothing is
        printed.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not self.enabled:
                return func(*args, **kwargs)

            print(f"[trace] {func.__name__} inputs: args={args!r} kwargs={kwargs!r}")  # noqa: T201
            result = func(*args, **kwargs)
            print(f"[trace] {func.__name__} output: {result!r}")  # noqa: T201

            return result

        return wrapper

    return decorator

Functions

debug_step(*args, **kwargs)

Convenience function to debug a step.

Source code in flowyml/utils/debug.py
def debug_step(*args, **kwargs):
    """Shortcut for ``break_at`` on the module-wide debugger; arguments are forwarded as-is."""
    breakpoint_decorator = _global_debugger.break_at(*args, **kwargs)
    return breakpoint_decorator

inspect_step(step) -> None

Inspect a step's metadata.

Parameters:

Name Type Description Default
step

Step to inspect

required
Source code in flowyml/utils/debug.py
def inspect_step(step) -> None:
    """Inspect a step's metadata.

    Only ``step.source_code`` is read at present; nothing is returned or
    reported.

    Args:
        step: Step to inspect
    """
    if not step.source_code:
        return

print_dag(pipeline) -> None

Pretty print pipeline DAG.

Source code in flowyml/utils/debug.py
def print_dag(pipeline) -> None:
    """Build ``pipeline``; no textual rendering of the DAG is produced here."""
    build = pipeline.build
    build()

profile_step()

Convenience function to profile a step.

Source code in flowyml/utils/debug.py
def profile_step():
    """Shortcut for ``profile`` on the module-wide debugger."""
    profiling_decorator = _global_debugger.profile()
    return profiling_decorator

trace_step()

Convenience function to trace a step.

Source code in flowyml/utils/debug.py
def trace_step():
    """Shortcut for ``trace`` on the module-wide debugger."""
    tracing_decorator = _global_debugger.trace()
    return tracing_decorator

Validation Utilities

Pydantic schemas and validation utilities for flowyml.

Classes

CacheStrategy

Bases: str, Enum

Cache strategy options.

ContextConfig

Bases: BaseModel

Configuration for pipeline context.

Functions

validate_device(v: str | None) -> str | None classmethod

Validate device string.

Source code in flowyml/utils/validation.py
@field_validator("device")
@classmethod
def validate_device(cls, v: str | None) -> str | None:
    """Validate device string."""
    if v is None:
        return v

    valid_devices = ["cpu", "cuda", "mps", "tpu"]
    if v.lower() not in valid_devices and not v.startswith("cuda:"):
        raise ValueError(
            f"Invalid device. Must be one of: {', '.join(valid_devices)} or 'cuda:N'",
        )

    return v.lower()

DatasetSchema

Bases: BaseModel

Schema for dataset validation.

ExperimentConfig

Bases: BaseModel

Configuration for experiments.

MetricsSchema

Bases: BaseModel

Schema for metrics validation.

ModelSchema

Bases: BaseModel

Schema for model validation.

PipelineConfig

Bases: BaseModel

Configuration for pipelines.

ResourceRequirements

Bases: BaseModel

Resource requirements for step execution.

Functions

validate_size_format(v: str | None) -> str | None classmethod

Validate memory/disk size format.

Source code in flowyml/utils/validation.py
@field_validator("memory", "disk")
@classmethod
def validate_size_format(cls, v: str | None) -> str | None:
    """Validate memory/disk size format."""
    if v is None:
        return v

    valid_units = ["B", "KB", "MB", "GB", "TB"]
    v_upper = v.upper()

    for unit in valid_units:
        if v_upper.endswith(unit):
            try:
                size_val = v_upper[: -len(unit)]
                float(size_val)
                return v
            except ValueError:
                raise ValueError(
                    f"Invalid size format: {v}. Expected format: <number><unit> (e.g., '4GB')",
                )

    raise ValueError(f"Invalid size unit. Must be one of: {', '.join(valid_units)}")

RetryConfig

Bases: BaseModel

Retry configuration for step execution.

Functions

max_delay_greater_than_initial(v: float, info) -> float classmethod

Validate max_delay is greater than or equal to initial_delay.

Source code in flowyml/utils/validation.py
@field_validator("max_delay")
@classmethod
def max_delay_greater_than_initial(cls, v: float, info) -> float:
    """Ensure ``max_delay`` is not smaller than ``initial_delay``.

    Equal values are accepted. The comparison is skipped when
    ``initial_delay`` is absent from ``info.data``.
    """
    validated = info.data
    if "initial_delay" not in validated:
        return v
    if v < validated["initial_delay"]:
        raise ValueError("max_delay must be greater than or equal to initial_delay")
    return v

StackConfig

Bases: BaseModel

Configuration for execution stacks.

StepConfig

Bases: BaseModel

Configuration for pipeline steps.

Functions

validate_context_config(config: dict[str, Any]) -> ContextConfig

Validate context configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Context configuration dictionary

required

Returns:

Type Description
ContextConfig

Validated ContextConfig instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_context_config(config: dict[str, Any]) -> ContextConfig:
    """Build a ``ContextConfig`` from a plain dictionary.

    Args:
        config: Context configuration dictionary; its items are passed to
            ``ContextConfig`` as keyword arguments.

    Returns:
        Validated ContextConfig instance

    Raises:
        ValidationError: If validation fails
    """
    validated = ContextConfig(**config)
    return validated

validate_metrics(metrics: dict[str, Any]) -> MetricsSchema

Validate metrics.

Parameters:

Name Type Description Default
metrics dict[str, Any]

Metrics dictionary

required

Returns:

Type Description
MetricsSchema

Validated MetricsSchema instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_metrics(metrics: dict[str, Any]) -> MetricsSchema:
    """Build a ``MetricsSchema`` from a plain dictionary.

    Args:
        metrics: Metrics dictionary; its items are passed to
            ``MetricsSchema`` as keyword arguments.

    Returns:
        Validated MetricsSchema instance

    Raises:
        ValidationError: If validation fails
    """
    validated = MetricsSchema(**metrics)
    return validated

validate_pipeline_config(config: dict[str, Any]) -> PipelineConfig

Validate pipeline configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Pipeline configuration dictionary

required

Returns:

Type Description
PipelineConfig

Validated PipelineConfig instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_pipeline_config(config: dict[str, Any]) -> PipelineConfig:
    """Build a ``PipelineConfig`` from a plain dictionary.

    Args:
        config: Pipeline configuration dictionary; its items are passed to
            ``PipelineConfig`` as keyword arguments.

    Returns:
        Validated PipelineConfig instance

    Raises:
        ValidationError: If validation fails
    """
    validated = PipelineConfig(**config)
    return validated

validate_step_config(config: dict[str, Any]) -> StepConfig

Validate step configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Step configuration dictionary

required

Returns:

Type Description
StepConfig

Validated StepConfig instance

Raises:

Type Description
ValidationError

If validation fails

Source code in flowyml/utils/validation.py
def validate_step_config(config: dict[str, Any]) -> StepConfig:
    """Build a ``StepConfig`` from a plain dictionary.

    Args:
        config: Step configuration dictionary; its items are passed to
            ``StepConfig`` as keyword arguments.

    Returns:
        Validated StepConfig instance

    Raises:
        ValidationError: If validation fails
    """
    validated = StepConfig(**config)
    return validated

See Also