Skip to content

Step API 👣

Steps are the building blocks of flowyml pipelines.

Usage

1
2
3
4
5
from flowyml import step

@step(cache=True)
def my_step(data):
    return process(data)

Decorator @step

Decorator to define a pipeline step with automatic context injection.

Can be used as @step or @step(inputs=...)

Every decorated function is automatically registered in a global StepRegistry, enabling Pipeline(auto_discover=True) to build the DAG without any manual add_step() calls.

Parameters:

Name Type Description Default
_func Callable | None

Function being decorated (when used as @step)

None
inputs list[str] | None

List of input asset names

None
outputs list[str] | None

List of output asset names

None
cache bool | str | Callable

Caching strategy ("code_hash", "input_hash", callable, or False)

'code_hash'
retry int

Number of retry attempts on failure

0
timeout int | None

Maximum execution time in seconds

None
resources Union[dict[str, Any], ResourceRequirements, None]

Resource requirements (ResourceRequirements object or dict for backward compat)

None
tags dict[str, str] | None

Metadata tags for the step

None
name str | None

Optional custom name for the step

None
condition Callable | None

Optional callable that returns True if step should run

None
execution_group str | None

Optional group name for executing multiple steps together

None
pipeline str | None

Optional pipeline name for scoped auto-discovery. When set, the step is only auto-discovered by pipelines that match this name (or by get_registered_steps(pipeline="...")).

None
register bool

If False, the step is NOT added to the global registry. Defaults to True. Set to False for helper/utility steps that should only be used via explicit add_step().

True
Example

>>> @step
... def simple_step():
...     ...
>>> @step(inputs=["data/train"], outputs=["model/trained"])
... def train_model(train_data):
...     ...

Scoped to a specific pipeline

>>> @step(pipeline="training", outputs=["model"])
... def train(data):
...     ...

With resource requirements

>>> from flowyml.core.resources import ResourceRequirements, GPUConfig
>>> @step(resources=ResourceRequirements(cpu="4", memory="16Gi", gpu=GPUConfig(gpu_type="nvidia-v100", count=2)))
... def gpu_train(data):
...     ...

Source code in flowyml/core/step.py
def step(
    _func: Callable | None = None,
    *,
    inputs: list[str] | None = None,
    outputs: list[str] | None = None,
    cache: bool | str | Callable = "code_hash",
    retry: int = 0,
    timeout: int | None = None,
    resources: Union[dict[str, Any], "ResourceRequirements", None] = None,
    tags: dict[str, str] | None = None,
    name: str | None = None,
    condition: Callable | None = None,
    execution_group: str | None = None,
    pipeline: str | None = None,
    register: bool = True,
):
    """Turn a function into a pipeline ``Step``.

    Works both as a bare decorator (``@step``) and with keyword
    arguments (``@step(inputs=...)``).  Unless ``register=False``, the
    resulting step is recorded in the global ``StepRegistry`` so that
    ``Pipeline(auto_discover=True)`` can assemble the DAG without any
    manual ``add_step()`` calls.

    Args:
        _func: The decorated function when used without parentheses.
        inputs: Input asset names consumed by the step.
        outputs: Output asset names produced by the step.
        cache: Caching strategy ("code_hash", "input_hash", a callable,
            or False).
        retry: Number of retry attempts on failure.
        timeout: Maximum execution time in seconds (None = unlimited).
        resources: Resource requirements (``ResourceRequirements``
            object, or a dict for backward compatibility).
        tags: Metadata tags attached to the step.
        name: Custom step name (defaults to the function name).
        condition: Optional callable returning True if the step should run.
        execution_group: Group name for executing multiple steps together.
        pipeline: Scope auto-discovery to pipelines matching this name
            (or to ``get_registered_steps(pipeline="...")``).
        register: When False, the step is NOT added to the global
            registry; use for helper steps wired in via explicit
            ``add_step()``.

    Example:
        >>> @step
        ... def simple_step():
        ...     ...
        >>> @step(inputs=["data/train"], outputs=["model/trained"])
        ... def train_model(train_data):
        ...     ...
        >>> # Scoped to a specific pipeline
        >>> @step(pipeline="training", outputs=["model"])
        ... def train(data):
        ...     ...
        >>> # With resource requirements
        >>> from flowyml.core.resources import ResourceRequirements, GPUConfig
        >>> @step(resources=ResourceRequirements(cpu="4", memory="16Gi", gpu=GPUConfig(gpu_type="nvidia-v100", count=2)))
        ... def gpu_train(data):
        ...     ...
    """

    def decorator(func: Callable) -> Step:
        # Fold the pipeline scope into the tag dict without mutating the
        # caller-supplied mapping.
        scoped_tags: dict[str, str] = {**tags} if tags else {}
        if pipeline is not None:
            scoped_tags["pipeline"] = pipeline

        wrapped = Step(
            func=func,
            name=name,
            inputs=inputs,
            outputs=outputs,
            cache=cache,
            retry=retry,
            timeout=timeout,
            resources=resources,
            tags=scoped_tags or None,
            condition=condition,
            execution_group=execution_group,
        )

        # Make the step discoverable by auto-discovery pipelines.
        if register:
            _global_registry.register(wrapped)

        return wrapped

    # Bare-decorator form (@step) receives the function directly.
    return decorator(_func) if _func is not None else decorator

Class Step

A pipeline step that can be executed with automatic context injection.

Source code in flowyml/core/step.py
def __init__(
    self,
    func: Callable,
    name: str | None = None,
    inputs: list[str] | None = None,
    outputs: list[str] | None = None,
    cache: bool | str | Callable = "code_hash",
    retry: int = 0,
    timeout: int | None = None,
    resources: Union[dict[str, Any], "ResourceRequirements", None] = None,
    tags: dict[str, str] | None = None,
    condition: Callable | None = None,
    execution_group: str | None = None,
):
    """Initialize a pipeline step wrapping ``func``.

    Args:
        func: The callable executed when the step runs.
        name: Step name; defaults to ``func.__name__``.
        inputs: Input asset names (defaults to an empty list).
        outputs: Output asset names (defaults to an empty list).
        cache: Caching strategy ("code_hash", "input_hash", a callable,
            or a bool). Stored as-is.
        retry: Number of retry attempts on failure.
        timeout: Maximum execution time in seconds, or None for no limit.
        resources: ``ResourceRequirements`` instance, or a plain dict
            (coerced below for backward compatibility).
        tags: Metadata tags for the step.
        condition: Optional callable gating whether the step runs.
        execution_group: Optional group name for steps executed together.
    """
    self.func = func
    self.name = name or func.__name__
    self.inputs = inputs or []
    self.outputs = outputs or []
    self.cache = cache
    self.retry = retry
    self.timeout = timeout

    # Store resources (accept both dict for backward compatibility and ResourceRequirements)
    self.resources = resources
    if self.resources and ResourceRequirements and not isinstance(self.resources, ResourceRequirements):
        if isinstance(self.resources, dict):
            # Work on a copy so the caller's dict is never mutated.
            resource_kwargs = dict(self.resources)
            gpu_value = resource_kwargs.get("gpu")
            if GPUConfig and gpu_value is not None:
                if isinstance(gpu_value, dict):
                    # Accept either the "gpu_type" key or the legacy "type" key.
                    resource_kwargs["gpu"] = GPUConfig(
                        gpu_type=gpu_value.get("gpu_type") or gpu_value.get("type") or "generic",
                        count=int(gpu_value.get("count", 1)),
                        memory=gpu_value.get("memory"),
                    )
                elif isinstance(gpu_value, (int, float)):
                    # A bare number is interpreted as a count of generic GPUs.
                    resource_kwargs["gpu"] = GPUConfig(gpu_type="generic", count=int(gpu_value))
            # Best-effort coercion: if the dict carries keys that
            # ResourceRequirements does not accept, keep the raw dict
            # instead of failing construction.
            with contextlib.suppress(TypeError):
                self.resources = ResourceRequirements(**resource_kwargs)

    self.tags = tags or {}
    self.condition = condition
    self.execution_group = execution_group

    # Capture source code and location for UI display
    try:
        self.source_code = inspect.getsource(func)
        self.source_file = inspect.getsourcefile(func)
        _, self.source_line = inspect.getsourcelines(func)
    except (OSError, TypeError):
        # Dynamically defined functions (REPL, exec) have no retrievable source.
        self.source_code = "# Source code not available"
        self.source_file = None
        self.source_line = None

    # Bundle the resolved settings into a StepConfig snapshot.
    self.config = StepConfig(
        name=self.name,
        func=func,
        inputs=self.inputs,
        outputs=self.outputs,
        cache=self.cache,
        retry=self.retry,
        timeout=self.timeout,
        resources=self.resources,
        tags=self.tags,
        condition=self.condition,
        execution_group=self.execution_group,
        source_file=self.source_file,
        source_line=self.source_line,
    )

Functions

__call__(*args: Any, **kwargs: Any) -> Any

Execute the step function.

Source code in flowyml/core/step.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Invoke the wrapped step function and return its result."""
    # NOTE: the condition is NOT enforced here — condition evaluation
    # (and any context injection it needs) is owned by the executor.
    # We still evaluate the attribute's truthiness to match the
    # original placeholder check exactly.
    if self.condition:
        # Intentionally a no-op; see note above.
        pass

    return self.func(*args, **kwargs)

get_cache_key(inputs: dict[str, Any] | None = None) -> str

Generate cache key based on caching strategy.

Parameters:

Name Type Description Default
inputs dict[str, Any] | None

Input data for the step

None

Returns:

Type Description
str

Cache key string

Source code in flowyml/core/step.py
def get_cache_key(self, inputs: dict[str, Any] | None = None) -> str:
    """Generate cache key based on caching strategy.

    Args:
        inputs: Input data for the step

    Returns:
        Cache key string
    """
    if self.cache == "code_hash":
        return f"{self.name}:{self.get_code_hash()}"
    elif self.cache == "input_hash" and inputs:
        return f"{self.name}:{self.get_input_hash(inputs)}"
    elif callable(self.cache) and inputs:
        return self.cache(inputs, {})
    else:
        return f"{self.name}:no-cache"

get_code_hash() -> str

Compute hash of the step's source code.

Source code in flowyml/core/step.py
def get_code_hash(self) -> str:
    """Compute hash of the step's source code."""
    try:
        body = inspect.getsource(self.func)
    except (OSError, TypeError):
        # Source unavailable (builtins, dynamically created callables):
        # hash the step name so the key is still deterministic.
        return hashlib.md5(self.name.encode()).hexdigest()[:16]
    return hashlib.md5(body.encode()).hexdigest()

get_input_hash(inputs: dict[str, Any]) -> str

Generate hash of inputs for caching.

Source code in flowyml/core/step.py
def get_input_hash(self, inputs: dict[str, Any]) -> str:
    """Generate hash of inputs for caching."""
    # Canonical JSON (sorted keys, str() fallback for non-serializable
    # values) yields a stable digest for equal inputs.
    serialized = json.dumps(inputs, sort_keys=True, default=str).encode()
    return hashlib.sha256(serialized).hexdigest()[:16]