Utils API π οΈ
The utilities module provides cross-cutting helpers used throughout FlowyML: content hashing for cache keys, serialization/deserialization routines, structured logging setup, input-validation helpers, and lightweight debug tools. While most users won't call these directly, they are essential for plugin authors and anyone extending the framework.
Helper functions.
Debug Utilities
Pipeline and step debugging tools.
Classes
PipelineDebugger(pipeline)
Debug entire pipelines.
Features: - Step-by-step execution - DAG visualization - Execution replay - Error analysis
Source code in flowyml/utils/debug.py
Functions
analyze_errors(run_id: str) -> None
Analyze errors from a failed run.
Source code in flowyml/utils/debug.py
replay_run(run_id: str, start_from: str | None = None) -> None
Replay a previous run, optionally starting from a specific step.
step_through() -> None
Execute pipeline step-by-step with breaks.
Source code in flowyml/utils/debug.py
StepDebugger()
Debug individual pipeline steps.
Features: - Breakpoints - Input/output inspection - Exception debugging - Step profiling
Examples:
>>> from flowyml import step, StepDebugger
>>> debugger = StepDebugger()
>>> @step(outputs=["processed"])
... @debugger.breakpoint()
... def process_data(data):
... # Debugger will stop here
... return data * 2
Source code in flowyml/utils/debug.py
Functions
break_at(condition: Callable | None = None)
Add a breakpoint to a step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
condition
|
Callable | None
|
Optional condition function. Break only if returns True. |
None
|
Source code in flowyml/utils/debug.py
clear_history() -> None
get_history()
profile()
Profile step execution time.
Source code in flowyml/utils/debug.py
trace()
Enable step tracing (print inputs/outputs).
Source code in flowyml/utils/debug.py
Functions
debug_step(*args, **kwargs)
inspect_step(step) -> None
print_dag(pipeline) -> None
profile_step()
Validation Utilities
Pydantic schemas and validation utilities for flowyml.
Classes
CacheStrategy
Bases: str, Enum
Cache strategy options.
ContextConfig
Bases: BaseModel
Configuration for pipeline context.
Functions
validate_device(v: str | None) -> str | None
classmethod
Validate device string.
Source code in flowyml/utils/validation.py
DatasetSchema
Bases: BaseModel
Schema for dataset validation.
ExperimentConfig
Bases: BaseModel
Configuration for experiments.
MetricsSchema
Bases: BaseModel
Schema for metrics validation.
ModelSchema
Bases: BaseModel
Schema for model validation.
PipelineConfig
Bases: BaseModel
Configuration for pipelines.
ResourceRequirements
Bases: BaseModel
Resource requirements for step execution.
Functions
validate_size_format(v: str | None) -> str | None
classmethod
Validate memory/disk size format.
Source code in flowyml/utils/validation.py
RetryConfig
Bases: BaseModel
Retry configuration for step execution.
Functions
max_delay_greater_than_initial(v: float, info) -> float
classmethod
Validate max_delay is greater than initial_delay.
Source code in flowyml/utils/validation.py
StackConfig
Bases: BaseModel
Configuration for execution stacks.
StepConfig
Bases: BaseModel
Configuration for pipeline steps.
Functions
validate_context_config(config: dict[str, Any]) -> ContextConfig
Validate context configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict[str, Any]
|
Context configuration dictionary |
required |
Returns:
| Type | Description |
|---|---|
ContextConfig
|
Validated ContextConfig instance |
Raises:
| Type | Description |
|---|---|
ValidationError
|
If validation fails |
Source code in flowyml/utils/validation.py
validate_metrics(metrics: dict[str, Any]) -> MetricsSchema
Validate metrics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metrics
|
dict[str, Any]
|
Metrics dictionary |
required |
Returns:
| Type | Description |
|---|---|
MetricsSchema
|
Validated MetricsSchema instance |
Raises:
| Type | Description |
|---|---|
ValidationError
|
If validation fails |
Source code in flowyml/utils/validation.py
validate_pipeline_config(config: dict[str, Any]) -> PipelineConfig
Validate pipeline configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict[str, Any]
|
Pipeline configuration dictionary |
required |
Returns:
| Type | Description |
|---|---|
PipelineConfig
|
Validated PipelineConfig instance |
Raises:
| Type | Description |
|---|---|
ValidationError
|
If validation fails |
Source code in flowyml/utils/validation.py
validate_step_config(config: dict[str, Any]) -> StepConfig
Validate step configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict[str, Any]
|
Step configuration dictionary |
required |
Returns:
| Type | Description |
|---|---|
StepConfig
|
Validated StepConfig instance |
Raises:
| Type | Description |
|---|---|
ValidationError
|
If validation fails |
Source code in flowyml/utils/validation.py
See Also
- Debugging Guide β how to diagnose and troubleshoot pipeline issues