Skip to content

Decorators API 🎀

@step

Decorator to define a pipeline step with automatic context injection.

Can be used as @step or @step(inputs=...)

Every decorated function is automatically registered in a global StepRegistry, enabling Pipeline(auto_discover=True) to build the DAG without any manual add_step() calls.

Parameters:

Name Type Description Default
_func Callable | None

Function being decorated (when used as @step)

None
inputs list[str] | None

List of input asset names

None
outputs list[str] | None

List of output asset names

None
cache bool | str | Callable

Caching strategy ("code_hash", "input_hash", callable, or False)

'code_hash'
retry int

Number of retry attempts on failure

0
timeout int | None

Maximum execution time in seconds

None
resources Union[dict[str, Any], ResourceRequirements, None]

Resource requirements (ResourceRequirements object or dict for backward compat)

None
tags dict[str, str] | None

Metadata tags for the step

None
name str | None

Optional custom name for the step

None
condition Callable | None

Optional callable that returns True if step should run

None
execution_group str | None

Optional group name for executing multiple steps together

None
pipeline str | None

Optional pipeline name for scoped auto-discovery. When set, the step is only auto-discovered by pipelines that match this name (or by get_registered_steps(pipeline="...")).

None
register bool

If False, the step is NOT added to the global registry. Defaults to True. Set to False for helper/utility steps that should only be used via explicit add_step().

True
Example

@step
def simple_step():
    ...

@step(inputs=["data/train"], outputs=["model/trained"])
def train_model(train_data):
    ...

Scoped to a specific pipeline

@step(pipeline="training", outputs=["model"])
def train(data):
    ...

With resource requirements

from flowyml.core.resources import ResourceRequirements, GPUConfig

@step(resources=ResourceRequirements(cpu="4", memory="16Gi", gpu=GPUConfig(gpu_type="nvidia-v100", count=2)))
def gpu_train(data):
    ...

Source code in flowyml/core/step.py
def step(
    _func: Callable | None = None,
    *,
    inputs: list[str] | None = None,
    outputs: list[str] | None = None,
    cache: bool | str | Callable = "code_hash",
    retry: int = 0,
    timeout: int | None = None,
    resources: Union[dict[str, Any], "ResourceRequirements", None] = None,
    tags: dict[str, str] | None = None,
    name: str | None = None,
    condition: Callable | None = None,
    execution_group: str | None = None,
    pipeline: str | None = None,
    register: bool = True,
):
    """Decorator to define a pipeline step with automatic context injection.

    Can be used as @step or @step(inputs=...)

    Every decorated function is automatically registered in a global
    ``StepRegistry``, enabling ``Pipeline(auto_discover=True)`` to build
    the DAG without any manual ``add_step()`` calls.

    Args:
        _func: Function being decorated (when used as @step)
        inputs: List of input asset names
        outputs: List of output asset names
        cache: Caching strategy ("code_hash", "input_hash", callable, or False)
        retry: Number of retry attempts on failure
        timeout: Maximum execution time in seconds
        resources: Resource requirements (ResourceRequirements object or dict for backward compat)
        tags: Metadata tags for the step
        name: Optional custom name for the step
        condition: Optional callable that returns True if step should run
        execution_group: Optional group name for executing multiple steps together
        pipeline: Optional pipeline name for scoped auto-discovery.
            When set, the step is only auto-discovered by pipelines
            that match this name (or by ``get_registered_steps(pipeline="...")``).
        register: If False, the step is NOT added to the global registry.
            Defaults to True. Set to False for helper/utility steps that
            should only be used via explicit ``add_step()``.

    Example:
        >>> @step
        ... def simple_step():
        ...     ...
        >>> @step(inputs=["data/train"], outputs=["model/trained"])
        ... def train_model(train_data):
        ...     ...
        >>> # Scoped to a specific pipeline
        >>> @step(pipeline="training", outputs=["model"])
        ... def train(data):
        ...     ...
        >>> # With resource requirements
        >>> from flowyml.core.resources import ResourceRequirements, GPUConfig
        >>> @step(resources=ResourceRequirements(cpu="4", memory="16Gi", gpu=GPUConfig(gpu_type="nvidia-v100", count=2)))
        ... def gpu_train(data):
        ...     ...
    """

    def decorator(func: Callable) -> Step:
        # Merge pipeline tag into tags dict
        merged_tags = dict(tags) if tags else {}
        if pipeline is not None:
            merged_tags["pipeline"] = pipeline

        step_instance = Step(
            func=func,
            name=name,
            inputs=inputs,
            outputs=outputs,
            cache=cache,
            retry=retry,
            timeout=timeout,
            resources=resources,
            tags=merged_tags if merged_tags else None,
            condition=condition,
            execution_group=execution_group,
        )

        # Auto-register in global registry
        if register:
            _global_registry.register(step_instance)

        return step_instance

    if _func is None:
        return decorator
    else:
        return decorator(_func)

@trace_llm

Decorator to trace LLM calls.

Source code in flowyml/monitoring/llm.py
def trace_llm(name: str | None = None, event_type: str = "llm"):
    """Decorator to trace LLM calls."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            event_name = name or func.__name__

            # Capture inputs
            inputs = {
                "args": [str(a) for a in args],
                "kwargs": {k: str(v) for k, v in kwargs.items()},
            }

            tracer.start_event(event_name, event_type, inputs)

            try:
                result = func(*args, **kwargs)

                # Try to extract metrics if result has them (e.g. OpenAI response)
                metrics = {}
                if hasattr(result, "usage"):  # OpenAI style
                    metrics["prompt_tokens"] = getattr(result.usage, "prompt_tokens", 0)
                    metrics["completion_tokens"] = getattr(result.usage, "completion_tokens", 0)
                    metrics["total_tokens"] = getattr(result.usage, "total_tokens", 0)

                tracer.end_event(outputs={"result": str(result)}, metrics=metrics)
                return result
            except Exception as e:
                tracer.end_event(error=str(e))
                raise

        return wrapper

    return decorator