🎒 Pipeline API

The central orchestrator that wires steps into a directed acyclic graph and manages execution.

🔧 Build ▶️ Execute 📅 Schedule

Constructor

from flowyml import Pipeline

pipeline = Pipeline(
    name="my_pipeline",
    context=ctx,           # optional
    description="...",     # optional
)
Parameter    Type            Default   Description
name         str             required  Unique identifier for the pipeline. Used in UI, logs, and artifact paths.
context      Context | None  None      A context() object for parameter injection into steps.
description  str | None      None      Human-readable description shown in the UI dashboard.
tags         list[str]       []        Arbitrary tags for filtering and grouping pipeline runs.
version      str | None      None      Explicit version string. If provided, a VersionedPipeline is created instead of a regular Pipeline.

Key Methods

Method     Signature                                 Description
add_step   add_step(step) → Pipeline                 Register a decorated @step function. Returns the pipeline, so calls can be chained.
run        run(**runtime_ctx) → PipelineResult       Execute all steps in topological order. Returns a result object with .success, .outputs, .duration.
build      build() → None                            Compile the pipeline into a DAG without executing; the result is stored on pipeline.dag. Useful for validation and visualization.
schedule   schedule(cron: str, **kwargs)             Register a cron-style schedule for recurring execution.
dry_run    dry_run(**runtime_ctx) → PipelineResult   Simulate execution: resolves the DAG and validates inputs/outputs without running step bodies.
visualize  visualize(format="mermaid")               Render the pipeline DAG as Mermaid, DOT, or ASCII art.

Usage Examples

1️⃣ Basic Pipeline

from flowyml import Pipeline, step

@step(outputs=["data"])
def load_data():
    return [1, 2, 3]

@step(inputs=["data"], outputs=["result"])
def transform(data):
    return [x * 2 for x in data]

pipeline = Pipeline("basic")
pipeline.add_step(load_data)
pipeline.add_step(transform)

result = pipeline.run()
print(result.outputs)  # {"result": [2, 4, 6]}
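
To make "topological order" concrete, here is a minimal stdlib sketch of how named inputs and outputs can imply an execution order. It does not use flowyml: the `STEPS` table and the `execution_order` helper are hypothetical stand-ins, and the library's real DAG builder may resolve edges differently.

```python
from graphlib import TopologicalSorter

# Hypothetical step table: name -> (inputs, outputs), mirroring the example above.
# The steps are deliberately listed out of order.
STEPS = {
    "transform": (["data"], ["result"]),
    "load_data": ([], ["data"]),
}

def execution_order(steps):
    """Order steps so that every producer runs before its consumers."""
    # Map each output name to the step that produces it.
    producer = {out: name for name, (_, outs) in steps.items() for out in outs}
    # For each step, collect the steps it depends on (its predecessors).
    graph = {
        name: {producer[i] for i in ins if i in producer}
        for name, (ins, _) in steps.items()
    }
    return list(TopologicalSorter(graph).static_order())

print(execution_order(STEPS))  # ['load_data', 'transform']
```

Registration order does not matter in this sketch; only the input and output names do.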

2️⃣ Pipeline with Context

import pandas as pd

from flowyml import Pipeline, step, context

@step(outputs=["data"])
def load_data(dataset_path: str = "data.csv"):
    return pd.read_csv(dataset_path)

ctx = context(dataset_path="gs://bucket/train.csv")
pipeline = Pipeline("with_context", context=ctx)
pipeline.add_step(load_data)

result = pipeline.run()

Runtime Overrides

You can also pass overrides directly to run():

result = pipeline.run(dataset_path="local/test.csv")
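
The sketch below illustrates one way parameter injection with runtime overrides can work: values are matched to a step's parameter names, with overrides winning over context values and context values winning over function defaults. It is a stdlib-only illustration. The `resolve_params` helper and the precedence order are assumptions, not confirmed flowyml behavior.

```python
import inspect

def resolve_params(fn, context, overrides):
    """Pick a value for each parameter: override, then context, then default."""
    resolved = {}
    for name, param in inspect.signature(fn).parameters.items():
        if name in overrides:
            resolved[name] = overrides[name]
        elif name in context:
            resolved[name] = context[name]
        elif param.default is not inspect.Parameter.empty:
            resolved[name] = param.default
        else:
            raise TypeError(f"no value available for parameter {name!r}")
    return resolved

def load_data(dataset_path: str = "data.csv"):
    return dataset_path

ctx = {"dataset_path": "gs://bucket/train.csv"}
print(resolve_params(load_data, ctx, {}))
print(resolve_params(load_data, ctx, {"dataset_path": "local/test.csv"}))
```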

3️⃣ Sub-Pipeline Composition

from flowyml import Pipeline, step
from flowyml.core.subpipeline import SubPipelineStep

# Define a reusable sub-pipeline
preprocess = Pipeline("preprocess")
preprocess.add_step(clean_data)
preprocess.add_step(normalize)

# Embed it inside a larger pipeline
main = Pipeline("training")
main.add_step(SubPipelineStep(preprocess))
main.add_step(train_model)
main.add_step(evaluate)

result = main.run()

4️⃣ Scheduled Execution

from flowyml import Pipeline, step

pipeline = Pipeline("nightly_retrain")
pipeline.add_step(fetch_latest_data)
pipeline.add_step(retrain_model)
pipeline.add_step(deploy_if_better)

# Run every day at 2 AM UTC
pipeline.schedule("0 2 * * *")
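
For readers unfamiliar with cron syntax, the expression passed to `schedule()` has five space-separated fields. The helper below is a hypothetical stdlib sketch that only labels those fields. It is not how flowyml parses schedules.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

def label_cron(expr):
    """Split a five-field cron expression into named fields."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))

fields = label_cron("0 2 * * *")
print(fields["minute"], fields["hour"])  # 0 2
```

Minute `0` and hour `2` with wildcards elsewhere means "02:00 every day".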

5️⃣ Dry Run & Validation

pipeline = Pipeline("validate_me")
pipeline.add_step(load_data)
pipeline.add_step(transform)

# Validate without executing step bodies
result = pipeline.dry_run()
print(result.dag)       # Inspect resolved DAG
print(result.outputs)   # Expected output keys

Dry Run Limitations

Dry runs validate the DAG topology and type compatibility but cannot catch runtime errors inside step functions.
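
The limitation can be illustrated without flowyml. In this hypothetical sketch, `validate_wiring` checks only that every input is produced by an earlier step, so a step whose body contains a bug still passes validation. The helper and the step tuples are stand-ins, not library API.

```python
def validate_wiring(steps):
    """Return error strings for inputs that no earlier step produces."""
    available, errors = set(), []
    for name, inputs, outputs, _fn in steps:
        for needed in inputs:
            if needed not in available:
                errors.append(f"{name}: missing input {needed!r}")
        available.update(outputs)
    return errors

def load_data():
    return [1, 2, 3]

def average(data):
    return sum(data) / 0  # bug: only surfaces when the body actually runs

steps = [
    ("load_data", [], ["data"], load_data),
    ("average", ["data"], ["mean"], average),
]

print(validate_wiring(steps))  # wiring is fine, so no errors are reported
try:
    average(load_data())
except ZeroDivisionError as exc:
    print(f"runtime failure: {exc}")
```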

PipelineResult

The object returned by run() and dry_run():

Attribute  Type              Description
success    bool              Whether all steps completed without error.
outputs    dict              Final outputs keyed by artifact name.
duration   float             Total wall-clock time in seconds.
steps      list[StepResult]  Per-step results with timing and status.
dag        DAG               The compiled directed acyclic graph.

Autodoc

Main pipeline class for orchestrating ML workflows.

Example

from flowyml import Pipeline, step, context

ctx = context(learning_rate=0.001, epochs=10)

@step(outputs=["model/trained"])
def train(learning_rate: float, epochs: int):
    return train_model(learning_rate, epochs)

# Option 1: Auto-discover all @step-decorated functions
pipeline = Pipeline("my_pipeline", context=ctx, auto_discover=True)
result = pipeline.run()

# Option 2: Concise explicit selection
pipeline = Pipeline.from_steps(train, name="my_pipeline", context=ctx)

# Option 3: Batch add
pipeline = Pipeline("my_pipeline", context=ctx)
pipeline.add_steps([train])

# Option 4: Manual add_step (existing, still works)
pipeline = Pipeline("my_pipeline", context=ctx)
pipeline.add_step(train)
result = pipeline.run()

# With project_name, automatically creates/attaches to project
pipeline = Pipeline("my_pipeline", context=ctx, project_name="ml_project")

# With version parameter, automatically creates VersionedPipeline
pipeline = Pipeline("my_pipeline", context=ctx, version="v1.0.1", project_name="ml_project")

Initialize pipeline.

Parameters:

Name                        Type             Default   Description
name                        str              required  Name of the pipeline.
context                     Context | None   None      Optional context for parameter injection.
executor                    Executor | None  None      Optional executor (defaults to LocalExecutor).
enable_cache                bool             True      Whether to enable caching.
enable_checkpointing        bool | None      None      Whether to enable checkpointing (defaults to config setting, True by default).
enable_experiment_tracking  bool | None      None      Whether to enable automatic experiment tracking (defaults to config.auto_log_metrics, True by default).
cache_dir                   str | None       None      Optional directory for cache.
stack                       Any | None       None      Stack to run on. Accepts a str (a stack name such as "local" or "aml_cpu_small", or a URI such as "github://org/repo@v1#stack_name"), an existing runtime Stack instance, or a StackDefinition (enterprise Pydantic stack definition).
env                         str | None       None      Environment name from flowyml.yaml (e.g. "dev", "staging", "prod"). Resolves the stack from the project config's environments section. If both stack and env are provided, stack takes priority.
project                     str | None       None      Optional project name to attach this pipeline to (deprecated, use project_name).
project_name                str | None       None      Optional project name to attach this pipeline to. If the project doesn't exist, it will be created automatically.
version                     str | None       None      Optional version string. If provided, a VersionedPipeline instance will be created instead of a regular Pipeline.
auto_discover               bool             False     If True, automatically discover all @step-decorated functions from the global registry at build time. Steps with a matching pipeline tag are preferred.
**kwargs                    Any              {}        Additional keyword arguments passed to the pipeline.
Source code in flowyml/core/pipeline.py
def __init__(
    self,
    name: str,
    context: Context | None = None,
    executor: Executor | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool | None = None,  # None means use config default
    enable_experiment_tracking: bool | None = None,  # None means use config default (True)
    cache_dir: str | None = None,
    stack: Any | None = None,  # Stack name (str), Stack instance, or StackDefinition
    env: str | None = None,  # Environment name from flowyml.yaml (e.g. 'dev', 'staging', 'prod')
    project: str | None = None,  # Project name to attach to (deprecated, use project_name)
    project_name: str | None = None,  # Project name to attach to (creates if doesn't exist)
    version: str | None = None,  # If provided, VersionedPipeline is created via __new__
    auto_discover: bool = False,  # Auto-discover @step-decorated functions
    **kwargs: Any,
):
    """Initialize pipeline.

    Args:
        name: Name of the pipeline
        context: Optional context for parameter injection
        executor: Optional executor (defaults to LocalExecutor)
        enable_cache: Whether to enable caching
        enable_checkpointing: Whether to enable checkpointing (defaults to config setting, True by default)
        enable_experiment_tracking: Whether to enable automatic experiment tracking (defaults to config.auto_log_metrics, True by default)
        cache_dir: Optional directory for cache
        stack: Stack to run on. Accepts:
            - ``str``: Stack name (e.g. ``"local"``, ``"aml_cpu_small"``),
              a URI (e.g. ``"github://org/repo@v1#stack_name"``), or ``"local"``.
            - ``Stack``: Existing runtime Stack instance.
            - ``StackDefinition``: Enterprise Pydantic stack definition.
        env: Environment name from ``flowyml.yaml`` (e.g. ``"dev"``, ``"staging"``,
            ``"prod"``). Resolves the stack from the project config's environments
            section. If both ``stack`` and ``env`` are provided, ``stack`` takes priority.
        project: Optional project name to attach this pipeline to (deprecated, use project_name)
        project_name: Optional project name to attach this pipeline to.
            If the project doesn't exist, it will be created automatically.
        version: Optional version string. If provided, a VersionedPipeline
            instance will be created instead of a regular Pipeline.
        auto_discover: If True, automatically discover all ``@step``-decorated
            functions from the global registry at build time. Steps with a
            matching ``pipeline`` tag are preferred. Defaults to False.
        **kwargs: Additional keyword arguments passed to the pipeline.
    """
    from flowyml.utils.config import get_config

    self.name = name
    self.context = context or Context()
    self.enable_cache = enable_cache

    # Set checkpointing (use config default if not specified)
    config = get_config()
    self.enable_checkpointing = (
        enable_checkpointing if enable_checkpointing is not None else config.enable_checkpointing
    )

    # Set experiment tracking (use config default if not specified, default: True)
    # Can be set via enable_experiment_tracking parameter or defaults to config.auto_log_metrics
    self.enable_experiment_tracking = (
        enable_experiment_tracking if enable_experiment_tracking is not None else config.auto_log_metrics
    )
    self.stack = None  # Will be assigned via _apply_stack
    self._stack_locked = stack is not None
    self._provided_executor = executor

    self.steps: list[Step] = []
    self.dag = DAG()

    # Storage
    if cache_dir is None:
        from flowyml.utils.config import get_config

        cache_dir = str(get_config().cache_dir)

    self.cache_store = CacheStore(cache_dir) if enable_cache else None

    from flowyml.utils.config import get_config

    self.runs_dir = get_config().runs_dir
    self.runs_dir.mkdir(parents=True, exist_ok=True)

    # Initialize components from stack or defaults
    self.executor = executor or LocalExecutor()
    # Metadata store for UI integration - use same store as UI
    from flowyml.storage.metadata import SQLiteMetadataStore
    from flowyml.utils.config import get_config
    import os

    config = get_config()
    # Use simple environment variable check to allow connecting to shared DB
    db_url = os.environ.get("FLOWYML_DATABASE_URL")

    if db_url:
        self.metadata_store = SQLiteMetadataStore(db_url=db_url)
    else:
        # Use the same metadata database path as the UI to ensure visibility
        self.metadata_store = SQLiteMetadataStore(db_path=str(config.metadata_db))

    # --- Unified Stack Resolution ---
    # Priority: explicit stack arg → env arg → env var → project config → default
    self._env = env
    self._stack_definition = None  # Enterprise StackDefinition (if resolved)

    resolved_stack = self._resolve_stack_arg(stack, env)
    if resolved_stack:
        self._apply_stack(resolved_stack, locked=stack is not None)
    else:
        # Fallback: auto-resolve active stack from flowyml.yaml / FLOWYML_STACK env var
        try:
            from flowyml.plugins.stack_config import get_active_stack as _get_yaml_stack

            yaml_stack = _get_yaml_stack()
            if yaml_stack is not None:
                live_stack = yaml_stack.to_stack()
                self._apply_stack(live_stack, locked=False)
        except Exception:
            pass  # No config file or parse error; continue with defaults

    # Handle Project Attachment
    # Support both project_name (preferred) and project (for backward compatibility)
    project_to_use = project_name or project
    if project_to_use:
        from flowyml.core.project import ProjectManager

        manager = ProjectManager()
        # Get or create project
        proj = manager.get_project(project_to_use)
        if not proj:
            proj = manager.create_project(project_to_use)

        # Configure pipeline with project settings
        self.runs_dir = proj.runs_dir
        self.metadata_store = proj.metadata_store

        # Register pipeline with project
        if name not in proj.metadata["pipelines"]:
            proj.metadata["pipelines"].append(name)
            proj._save_metadata()

        # Store project name for later use (e.g., in _save_run)
        self.project_name = project_to_use
    else:
        self.project_name = None

    # State
    self._built = False
    self._auto_discover = auto_discover
    self.step_groups: list[Any] = []  # Will hold StepGroup objects
    self.control_flows: list[Any] = []  # Store conditional control flows (If, Switch, etc.)
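
The constructor uses `None` to mean "fall back to the config default" for `enable_checkpointing` and `enable_experiment_tracking`. The sketch below isolates that pattern with stdlib stand-ins (`resolve_flag` and the `SimpleNamespace` config are hypothetical) and shows why the check is `is not None` and not a plain `or`.

```python
from types import SimpleNamespace

def resolve_flag(explicit, config_default):
    """Use the explicit value unless it is None, in which case fall back."""
    return explicit if explicit is not None else config_default

config = SimpleNamespace(enable_checkpointing=True)  # stand-in for get_config()

print(resolve_flag(None, config.enable_checkpointing))   # True: falls back to config
print(resolve_flag(False, config.enable_checkpointing))  # False: explicit value wins
print(False or config.enable_checkpointing)              # True: `or` would ignore the explicit False
```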

Functions

__new__(name: str, version: str | None = None, project_name: str | None = None, project: str | None = None, **kwargs: Any)

Create a Pipeline or VersionedPipeline instance.

If version is provided, automatically returns a VersionedPipeline instance. Otherwise, returns a regular Pipeline instance.

Source code in flowyml/core/pipeline.py
def __new__(
    cls,
    name: str,
    version: str | None = None,
    project_name: str | None = None,
    project: str | None = None,  # For backward compatibility
    **kwargs: Any,
):
    """Create a Pipeline or VersionedPipeline instance.

    If version is provided, automatically returns a VersionedPipeline instance.
    Otherwise, returns a regular Pipeline instance.
    """
    if version is not None:
        from flowyml.core.versioning import VersionedPipeline

        # Pass project_name or project to VersionedPipeline
        vp_kwargs = kwargs.copy()
        if project_name:
            vp_kwargs["project_name"] = project_name
        elif project:
            vp_kwargs["project"] = project
        return VersionedPipeline(name=name, version=version, **vp_kwargs)
    return super().__new__(cls)
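
The dispatch in `__new__` can be hard to picture, so here is a self-contained sketch of the same pattern using two hypothetical stand-in classes. `Versioned` is deliberately unrelated to `Plain`. Whether the real VersionedPipeline subclasses Pipeline is not shown on this page, so treat that part as an assumption.

```python
class Versioned:
    """Stand-in for VersionedPipeline; not a subclass of Plain in this sketch."""
    def __init__(self, name, version):
        self.name, self.version = name, version

class Plain:
    """Stand-in for Pipeline."""
    def __new__(cls, name, version=None, **kwargs):
        if version is not None:
            # Returning an object that is not an instance of cls means
            # Python skips Plain.__init__ for this call.
            return Versioned(name=name, version=version)
        return super().__new__(cls)

    def __init__(self, name, version=None, **kwargs):
        self.name = name

a = Plain("my_pipeline")
b = Plain("my_pipeline", version="v1.0.1")
print(type(a).__name__, type(b).__name__)  # Plain Versioned
```

With this pattern the caller always writes `Plain(...)`, but the returned object's type depends on the arguments, so `isinstance` checks against the requested class may not hold.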

add_control_flow(control_flow: Any) -> Pipeline

Add conditional control flow to the pipeline.

Parameters:

Name          Type  Default   Description
control_flow  Any   required  Control flow object (If, Switch, etc.)

Returns:

Type      Description
Pipeline  Self for chaining

Example
from flowyml import If

pipeline.add_control_flow(
    If(
        condition=lambda ctx: ctx.steps["evaluate_model"].outputs["accuracy"] > 0.9,
        then_step=deploy_model,
        else_step=retrain_model,
    )
)
Source code in flowyml/core/pipeline.py
def add_control_flow(self, control_flow: Any) -> "Pipeline":
    """Add conditional control flow to the pipeline.

    Args:
        control_flow: Control flow object (If, Switch, etc.)

    Returns:
        Self for chaining

    Example:
        ```python
        from flowyml import If

        pipeline.add_control_flow(
            If(
                condition=lambda ctx: ctx.steps["evaluate_model"].outputs["accuracy"] > 0.9,
                then_step=deploy_model,
                else_step=retrain_model,
            )
        )
        ```
    """
    self.control_flows.append(control_flow)
    self._built = False
    return self

add_step(step: Step) -> Pipeline

Add a step to the pipeline.

Parameters:

Name  Type  Default   Description
step  Step  required  Step to add

Returns:

Type      Description
Pipeline  Self for chaining

Source code in flowyml/core/pipeline.py
def add_step(self, step: Step) -> "Pipeline":
    """Add a step to the pipeline.

    Args:
        step: Step to add

    Returns:
        Self for chaining
    """
    self.steps.append(step)
    self._built = False
    return self
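
Two details of `add_step()` are worth seeing in isolation: it returns `self` so calls can be chained, and it resets `_built` so the next `build()` recompiles the DAG. The toy class below models only that bookkeeping. `TinyPipeline` and `build_count` are hypothetical and exist purely for illustration.

```python
class TinyPipeline:
    """Toy model of the add_step()/build() bookkeeping shown above."""
    def __init__(self):
        self.steps = []
        self._built = False
        self.build_count = 0

    def add_step(self, step):
        self.steps.append(step)
        self._built = False  # any change invalidates the compiled DAG
        return self          # returning self enables chaining

    def build(self):
        if self._built:
            return           # nothing changed since the last build
        self.build_count += 1
        self._built = True

p = TinyPipeline().add_step("load").add_step("train")
p.build()
p.build()               # no-op: already built
p.add_step("evaluate")
p.build()               # rebuilds because a step was added
print(p.steps, p.build_count)  # ['load', 'train', 'evaluate'] 2
```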

add_steps(steps: list[Step]) -> Pipeline

Add multiple steps to the pipeline at once.

Parameters:

Name   Type        Default   Description
steps  list[Step]  required  List of Step instances to add

Returns:

Type      Description
Pipeline  Self for chaining

Example

pipeline.add_steps([load_data, train_model, evaluate])

Source code in flowyml/core/pipeline.py
def add_steps(self, steps: list[Step]) -> "Pipeline":
    """Add multiple steps to the pipeline at once.

    Args:
        steps: List of Step instances to add

    Returns:
        Self for chaining

    Example:
        >>> pipeline.add_steps([load_data, train_model, evaluate])
    """
    for s in steps:
        self.steps.append(s)
    self._built = False
    return self

add_sub_pipeline(pipeline: Any, name: str | None = None, inputs: list[str] | None = None, outputs: list[str] | None = None, input_mapping: dict[str, str] | None = None, output_mapping: dict[str, str] | None = None, **kwargs: Any) -> Pipeline

Add a sub-pipeline as a step in this pipeline.

The sub-pipeline's steps will execute as a single unit within this pipeline's execution flow.

Parameters:

Name            Type                   Default   Description
pipeline        Any                    required  Pipeline to nest as a step
name            str | None             None      Optional step name (defaults to sub_pipeline.name)
inputs          list[str] | None       None      Input asset names from this pipeline
outputs         list[str] | None       None      Output asset names exposed to this pipeline
input_mapping   dict[str, str] | None  None      Maps this pipeline's output names to child input names
output_mapping  dict[str, str] | None  None      Maps child output names to this pipeline's input names
**kwargs        Any                    {}        Additional SubPipelineStep configuration

Returns:

Type      Description
Pipeline  Self for chaining

Example

preprocess = Pipeline("preprocessing")
preprocess.add_step(clean).add_step(normalize)

parent = Pipeline("training")
parent.add_sub_pipeline(preprocess, inputs=["raw"], outputs=["clean"])
parent.add_step(train_model)

Source code in flowyml/core/pipeline.py
def add_sub_pipeline(
    self,
    pipeline: Any,
    name: str | None = None,
    inputs: list[str] | None = None,
    outputs: list[str] | None = None,
    input_mapping: dict[str, str] | None = None,
    output_mapping: dict[str, str] | None = None,
    **kwargs: Any,
) -> "Pipeline":
    """Add a sub-pipeline as a step in this pipeline.

    The sub-pipeline's steps will execute as a single unit within
    this pipeline's execution flow.

    Args:
        pipeline: Pipeline to nest as a step
        name: Optional step name (defaults to sub_pipeline.name)
        inputs: Input asset names from this pipeline
        outputs: Output asset names exposed to this pipeline
        input_mapping: Maps this pipeline's output names to child input names
        output_mapping: Maps child output names to this pipeline's input names
        **kwargs: Additional SubPipelineStep configuration

    Returns:
        Self for chaining

    Example:
        >>> preprocess = Pipeline("preprocessing")
        >>> preprocess.add_step(clean).add_step(normalize)
        >>>
        >>> parent = Pipeline("training")
        >>> parent.add_sub_pipeline(preprocess, inputs=["raw"], outputs=["clean"])
        >>> parent.add_step(train_model)
    """
    from flowyml.core.subpipeline import SubPipelineStep

    sub_step = SubPipelineStep(
        sub_pipeline=pipeline,
        name=name,
        inputs=inputs,
        outputs=outputs,
        input_mapping=input_mapping,
        output_mapping=output_mapping,
        **kwargs,
    )
    return self.add_step(sub_step)
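
The `input_mapping` and `output_mapping` arguments are name translations between the parent and the child pipeline. The sketch below shows the idea with a hypothetical `remap` helper. The asset names are invented, and passing unmapped keys through unchanged is an assumption: SubPipelineStep itself is not shown on this page.

```python
def remap(values, mapping):
    """Rename keys in values according to mapping; unmapped keys pass through."""
    mapping = mapping or {}
    return {mapping.get(key, key): value for key, value in values.items()}

# input_mapping: parent asset name -> child input name
parent_outputs = {"raw": [3, 1, 2]}
child_inputs = remap(parent_outputs, {"raw": "dataset"})

# output_mapping: child output name -> name exposed to the parent
child_outputs = {"normalized": [1, 2, 3]}
exposed = remap(child_outputs, {"normalized": "clean"})

print(child_inputs, exposed)
```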

build() -> None

Build the execution DAG with type validation.

Source code in flowyml/core/pipeline.py
def build(self) -> None:
    """Build the execution DAG with type validation."""
    if self._built:
        return

    # Auto-discover steps from global registry if enabled
    if self._auto_discover and not self.steps:
        from flowyml.core.step import get_registered_steps

        discovered = get_registered_steps(pipeline=self.name)
        if not discovered:
            discovered = get_registered_steps()
        self.steps = list(discovered)

    # Clear previous DAG
    self.dag = DAG()

    # Add nodes
    for step in self.steps:
        node = Node(
            name=step.name,
            step=step,
            inputs=step.inputs,
            outputs=step.outputs,
        )
        self.dag.add_node(node)

    # Build edges
    self.dag.build_edges()

    # Validate DAG structure (now returns errors + warnings)
    validation_result = self.dag.validate()

    # Handle both old (list) and new (tuple) return formats
    if isinstance(validation_result, tuple):
        errors, warnings = validation_result
    else:
        errors = validation_result
        warnings = []

    # Log warnings (don't fail the build)
    if warnings:
        import logging

        logger = logging.getLogger(__name__)
        for w in warnings:
            logger.warning(f"Pipeline '{self.name}': {w}")

    if errors:
        raise ValueError("Pipeline validation failed:\n" + "\n".join(errors))

    # Type validation across connections
    try:
        from flowyml.core.type_validator import validate_pipeline

        type_errors, type_warnings = validate_pipeline(self.dag, self.steps)

        if type_warnings:
            import logging

            logger = logging.getLogger(__name__)
            for tw in type_warnings:
                logger.warning(f"Pipeline '{self.name}': {tw}")

        if type_errors:
            error_messages = [str(e) for e in type_errors]
            raise ValueError(
                "Pipeline type validation failed:\n" + "\n".join(error_messages),
            )
    except ImportError:
        pass  # type_validator not available, skip

    # Analyze step groups
    from flowyml.core.step_grouping import StepGroupAnalyzer

    analyzer = StepGroupAnalyzer()
    self.step_groups = analyzer.analyze_groups(self.dag, self.steps)

    self._built = True
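
`build()` accepts two return shapes from `dag.validate()`: a bare list of errors or an `(errors, warnings)` tuple. The helper below is a hypothetical, stdlib-only restatement of that branch, with invented messages, to show the normalization on its own.

```python
def normalize_validation(result):
    """Accept either a list of errors or an (errors, warnings) tuple."""
    if isinstance(result, tuple):
        errors, warnings = result
    else:
        errors, warnings = result, []
    return list(errors), list(warnings)

print(normalize_validation(["cycle detected"]))
print(normalize_validation(([], ["unused output 'tmp'"])))
```

Only errors stop the build in the source above; warnings are logged and the build continues.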

cache_stats() -> dict[str, Any]

Get cache statistics.

Source code in flowyml/core/pipeline.py
def cache_stats(self) -> dict[str, Any]:
    """Get cache statistics."""
    if self.cache_store:
        return self.cache_store.stats()
    return {}

check_cache() -> dict[str, Any] | None

Check if a successful run of this pipeline already exists.

Returns:

Type                   Description
dict[str, Any] | None  Metadata of the last successful run, or None if not found.

Source code in flowyml/core/pipeline.py
def check_cache(self) -> dict[str, Any] | None:
    """Check if a successful run of this pipeline already exists.

    Returns:
        Metadata of the last successful run, or None if not found.
    """
    # Query metadata store for successful runs of this pipeline
    try:
        runs = self.metadata_store.query(
            pipeline_name=self.name,
            status="completed",
        )

        if runs:
            # Return the most recent one (query returns ordered by created_at DESC)
            return runs[0]
    except Exception as e:
        # Don't fail if metadata store is not available or errors
        print(f"Warning: Failed to check cache: {e}")

    return None

dry_run(inputs: dict[str, Any] | None = None, stack: Any | None = None, env: str | None = None, **kwargs: Any) -> PipelineResult

Validate the pipeline without executing it.

Resolves the stack, validates policies, and displays the execution plan without running any steps.

Parameters:

Name      Type                   Default  Description
inputs    dict[str, Any] | None  None     Optional input data for the pipeline.
stack     Any | None             None     Stack override (name, URI, instance, or StackDefinition).
env       str | None             None     Environment name from project config.
**kwargs  Any                    {}       Additional arguments.

Returns:

Type            Description
PipelineResult  PipelineResult with validation info but no execution.

Source code in flowyml/core/pipeline.py
def dry_run(
    self,
    inputs: dict[str, Any] | None = None,
    stack: Any | None = None,
    env: str | None = None,
    **kwargs: Any,
) -> PipelineResult:
    """Validate the pipeline without executing it.

    Resolves the stack, validates policies, and displays the execution
    plan without running any steps.

    Args:
        inputs: Optional input data for the pipeline.
        stack: Stack override (name, URI, instance, or StackDefinition).
        env: Environment name from project config.
        **kwargs: Additional arguments.

    Returns:
        PipelineResult with validation info but no execution.
    """
    return self.run(
        inputs=inputs,
        stack=stack,
        env=env,
        dry_run=True,
        auto_start_ui=False,
        **kwargs,
    )

from_definition(definition: dict, context: Context | None = None) -> Pipeline classmethod

Reconstruct pipeline from stored definition.

This creates a "ghost" pipeline that can be executed but uses the stored step structure. Actual step logic must still be available in the codebase.

Parameters:

Name        Type            Default   Description
definition  dict            required  Pipeline definition from to_definition()
context     Context | None  None      Optional context for execution

Returns:

Type      Description
Pipeline  Reconstructed Pipeline instance

Source code in flowyml/core/pipeline.py
@classmethod
def from_definition(cls, definition: dict, context: Context | None = None) -> "Pipeline":
    """Reconstruct pipeline from stored definition.

    This creates a "ghost" pipeline that can be executed but uses
    the stored step structure. Actual step logic must still be
    available in the codebase.

    Args:
        definition: Pipeline definition from to_definition()
        context: Optional context for execution

    Returns:
        Reconstructed Pipeline instance
    """
    from flowyml.core.step import step as step_decorator

    # Create pipeline instance
    pipeline = cls(
        name=definition["name"],
        context=context or Context(),
    )

    # Reconstruct steps
    for step_def in definition["steps"]:
        # Create a generic step function that can be called
        # In a real implementation, we'd need to either:
        # 1. Store serialized functions (using cloudpickle)
        # 2. Import functions by name from codebase
        # 3. Use placeholder functions

        # For now, we'll create a placeholder that logs execution
        def generic_step_func(*args, **kwargs):
            """Generic step function for reconstructed pipeline."""
            print(f"Executing reconstructed step with args={args}, kwargs={kwargs}")
            return

        # Apply step decorator with stored metadata
        decorated = step_decorator(
            name=step_def["name"],
            inputs=step_def["inputs"],
            outputs=step_def["outputs"],
            tags=step_def.get("tags", []),
        )(generic_step_func)

        # Add to pipeline
        pipeline.add_step(decorated)

    return pipeline
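
The shape of the `definition` dict can be inferred from the keys `from_definition()` reads: a top-level `name` and `steps`, and per step a required `name`, `inputs`, and `outputs` plus an optional `tags`. The sketch below checks a hand-written dict against those keys. The example values and `check_definition` are hypothetical, and the real `to_definition()` output may carry additional fields.

```python
definition = {
    "name": "training",
    "steps": [
        {"name": "load_data", "inputs": [], "outputs": ["data"]},
        {"name": "train", "inputs": ["data"], "outputs": ["model"], "tags": ["gpu"]},
    ],
}

def check_definition(definition):
    """Verify the keys that from_definition() reads; return (name, step names)."""
    names = []
    for step_def in definition["steps"]:
        for key in ("name", "inputs", "outputs"):
            if key not in step_def:
                raise KeyError(f"step definition is missing {key!r}")
        names.append(step_def["name"])
    return definition["name"], names

print(check_definition(definition))
```

Note that, per the source above, the reconstructed steps are placeholders that print their arguments and return None, so a pipeline rebuilt this way reproduces structure, not behavior.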

from_steps(*steps: Step, name: str, **kwargs: Any) -> Pipeline classmethod

Create a pipeline from an explicit list of steps.

Convenience constructor that avoids repetitive add_step() calls while still giving you full control over which steps are included.

Parameters:

Name Type Description Default
*steps Step

Step instances to include

()
name str

Pipeline name (keyword-only)

required
**kwargs Any

Additional arguments passed to Pipeline()

{}

Returns:

Type Description
Pipeline

Configured Pipeline instance

Example

pipeline = Pipeline.from_steps(
    load_data,
    train_model,
    evaluate,
    name="training",
    enable_cache=False,
)

Source code in flowyml/core/pipeline.py
@classmethod
def from_steps(
    cls,
    *steps: Step,
    name: str,
    **kwargs: Any,
) -> "Pipeline":
    """Create a pipeline from an explicit list of steps.

    Convenience constructor that avoids repetitive ``add_step()`` calls
    while still giving you full control over which steps are included.

    Args:
        *steps: Step instances to include
        name: Pipeline name (keyword-only)
        **kwargs: Additional arguments passed to Pipeline()

    Returns:
        Configured Pipeline instance

    Example:
        >>> pipeline = Pipeline.from_steps(
        ...     load_data,
        ...     train_model,
        ...     evaluate,
        ...     name="training",
        ...     enable_cache=False,
        ... )
    """
    pipeline = cls(name=name, **kwargs)
    pipeline.add_steps(list(steps))
    return pipeline
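
Because `name` follows `*steps` in the signature, it can only be passed by keyword. The toy function below has the same signature shape (it is a hypothetical stand-in, not the library method) and shows what happens when the keyword is omitted.

```python
def from_steps(*steps, name, **kwargs):
    """Toy stand-in mirroring the signature shape of Pipeline.from_steps."""
    return {"name": name, "steps": list(steps), "options": kwargs}

print(from_steps("load_data", "train_model", name="training", enable_cache=False))

try:
    from_steps("load_data", "training")  # name omitted: both args become steps
except TypeError as exc:
    print(f"TypeError: {exc}")
```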

invalidate_cache(step: str | None = None, before: str | None = None) -> None

Invalidate cache entries.

Parameters:

Name    Type        Default  Description
step    str | None  None     Invalidate cache for specific step
before  str | None  None     Invalidate cache entries before date
Source code in flowyml/core/pipeline.py
def invalidate_cache(
    self,
    step: str | None = None,
    before: str | None = None,
) -> None:
    """Invalidate cache entries.

    Args:
        step: Invalidate cache for specific step
        before: Invalidate cache entries before date
    """
    if self.cache_store:
        if step:
            self.cache_store.invalidate(step_name=step)
        else:
            self.cache_store.clear()
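
In the source shown above, the method body has two branches: invalidate one step when `step` is given, otherwise clear everything. The `before` argument is accepted but never read in that body. The sketch below mirrors the two branches against a hypothetical in-memory store. `ToyCacheStore` and its key layout are assumptions and say nothing about how the real CacheStore stores entries.

```python
class ToyCacheStore:
    """In-memory stand-in for CacheStore with the two calls used above."""
    def __init__(self):
        self.entries = {}  # (step_name, cache_key) -> cached value

    def invalidate(self, step_name):
        self.entries = {k: v for k, v in self.entries.items() if k[0] != step_name}

    def clear(self):
        self.entries.clear()

def invalidate_cache(store, step=None):
    """Mirror the branch above: one step if named, otherwise everything."""
    if store:
        if step:
            store.invalidate(step_name=step)
        else:
            store.clear()

store = ToyCacheStore()
store.entries = {("load", "k1"): 1, ("train", "k2"): 2}
invalidate_cache(store, step="load")
print(store.entries)  # {('train', 'k2'): 2}
invalidate_cache(store)
print(store.entries)  # {}
```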

rerun(run_id: str, from_step: str | None = None, **kwargs: Any) -> PipelineResult

Re-run a pipeline from a checkpoint, resuming from where it left off.

Parameters:

Name       Type        Default   Description
run_id     str         required  The run ID of the previous execution to resume from.
from_step  str | None  None      Optional step name to start re-execution from. If provided, all steps before this one are skipped. If not provided, resumes from the first non-completed step.
**kwargs   Any         {}        Additional arguments passed to Pipeline.run().

Returns:

Type            Description
PipelineResult  PipelineResult with outputs from the resumed run.

Examples:

>>> # Resume from last checkpoint
>>> result = pipeline.rerun(run_id="previous-run-id")
>>> # Resume from a specific step
>>> result = pipeline.rerun(run_id="previous-run-id", from_step="train_model")
Source code in flowyml/core/pipeline.py
def rerun(
    self,
    run_id: str,
    from_step: str | None = None,
    **kwargs: Any,
) -> "PipelineResult":
    """Re-run a pipeline from a checkpoint, resuming from where it left off.

    Args:
        run_id: The run ID of the previous execution to resume from.
        from_step: Optional step name to start re-execution from.
                   If provided, all steps before this one are skipped.
                   If not provided, resumes from the first non-completed step.
        **kwargs: Additional arguments passed to Pipeline.run().

    Returns:
        PipelineResult with outputs from the resumed run.

    Examples:
        >>> # Resume from last checkpoint
        >>> result = pipeline.rerun(run_id="previous-run-id")
        >>> # Resume from a specific step
        >>> result = pipeline.rerun(run_id="previous-run-id", from_step="train_model")
    """
    from flowyml.core.checkpoint import PipelineCheckpoint
    from flowyml.utils.config import get_config

    config = get_config()
    checkpoint = PipelineCheckpoint(
        run_id=run_id,
        checkpoint_dir=str(config.checkpoint_dir),
    )

    if not checkpoint.exists():
        raise ValueError(
            f"No checkpoint found for run_id='{run_id}'. "
            "Cannot resume \u2014 run the pipeline fresh with pipeline.run() instead.",
        )

    # Determine which steps to skip
    completed = checkpoint.get_completed_steps()

    if from_step:
        # Skip only steps that come before from_step
        steps_to_skip = set()
        for step_name in completed:
            if step_name == from_step:
                break
            steps_to_skip.add(step_name)
    else:
        # Skip all completed steps (resume from first non-completed)
        steps_to_skip = set(completed) - {"pipeline_complete"}

    # Set checkpoint state on pipeline for orchestrator to use
    self._checkpoint = checkpoint
    self._resume_from_checkpoint = True
    self._completed_steps_from_checkpoint = steps_to_skip

    import logging

    logger = logging.getLogger("flowyml.checkpoint")
    logger.info(
        f"Re-running pipeline '{self.name}' from run '{run_id}'. "
        f"Skipping {len(steps_to_skip)} completed step(s)."
        + (f" Starting from step '{from_step}'." if from_step else ""),
    )

    # Run with the checkpoint state already configured
    return self.run(run_id=run_id, **kwargs)
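
The skip-set computation in rerun() can be reproduced in isolation. The following is a minimal sketch rather than the library's code path: `compute_steps_to_skip` is a hypothetical helper name, and it assumes that `get_completed_steps()` returns step names in execution order, which the source above implies but does not state.

```python
def compute_steps_to_skip(completed, from_step=None):
    """Mirror the skip logic shown in rerun() (hypothetical helper)."""
    if from_step:
        # Skip only the completed steps recorded before from_step.
        skip = set()
        for name in completed:
            if name == from_step:
                break
            skip.add(name)
        return skip
    # No from_step: skip everything completed, minus the sentinel entry.
    return set(completed) - {"pipeline_complete"}


completed = ["load_data", "preprocess", "train_model", "pipeline_complete"]
print(sorted(compute_steps_to_skip(completed)))
# ['load_data', 'preprocess', 'train_model']
print(sorted(compute_steps_to_skip(completed, from_step="preprocess")))
# ['load_data']
```

One consequence worth knowing: if `from_step` names a step that does not appear in the completed list, the loop never breaks, so every completed entry ends up in the skip set.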

run(inputs: dict[str, Any] | None = None, debug: bool = False, stack: Any | None = None, env: str | None = None, dry_run: bool = False, orchestrator: Any | None = None, resources: Any | None = None, docker_config: Any | None = None, context: dict[str, Any] | None = None, auto_start_ui: bool = True, **kwargs: Any) -> PipelineResult

Execute the pipeline.

Parameters:

Name Type Description Default
inputs dict[str, Any] | None Optional input data for the pipeline None
debug bool Enable debug mode with detailed logging False
stack Any | None Stack to use for this run. Accepts a str (stack name or URI, e.g. "local", "github://org/repo@v1#name"), a Stack (runtime Stack instance), or a StackDefinition (enterprise Pydantic stack definition) None
env str | None Environment name from flowyml.yaml (e.g. "dev", "staging", "prod"). Resolves the stack from the project config's environments section. None
dry_run bool If True, resolve the stack, validate policies, and display the execution plan without actually running any steps. False
orchestrator Any | None Orchestrator override (takes precedence over stack orchestrator) None
resources Any | None Resource configuration for execution None
docker_config Any | None Docker configuration for containerized execution None
context dict[str, Any] | None Context variables override None
auto_start_ui bool Automatically start UI server if not running and display URL True
**kwargs Any Additional arguments passed to the orchestrator {}
Note

The orchestrator is determined in this priority order:

1. Explicit orchestrator parameter (if provided)
2. Stack's orchestrator (if stack is set/active)
3. Default LocalOrchestrator

When using a stack (e.g., GCPStack), the stack's orchestrator is automatically used unless explicitly overridden. This is the recommended approach for production deployments.
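
The priority order in the note can be sketched as a small standalone function. This is an illustration under stated assumptions, not flowyml's implementation: `resolve_orchestrator` and `FakeStack` are hypothetical names, and the `LocalOrchestrator` class below is only a stand-in for the real one.

```python
class LocalOrchestrator:
    """Stand-in for flowyml's default orchestrator (illustration only)."""


class FakeStack:
    """Hypothetical stack exposing an `orchestrator` attribute."""

    def __init__(self, orchestrator=None):
        self.orchestrator = orchestrator


def resolve_orchestrator(explicit=None, stack=None):
    # 1. An explicit orchestrator argument always wins.
    if explicit is not None:
        return explicit
    # 2. Otherwise fall back to the stack's orchestrator, if it has one.
    from_stack = getattr(stack, "orchestrator", None) if stack else None
    if from_stack is not None:
        return from_stack
    # 3. Finally, default to a local orchestrator.
    return LocalOrchestrator()


remote = object()    # placeholder for a stack-provided orchestrator
override = object()  # placeholder for an explicit override

print(resolve_orchestrator(stack=FakeStack(remote)) is remote)               # True
print(resolve_orchestrator(override, stack=FakeStack(remote)) is override)   # True
print(type(resolve_orchestrator()).__name__)                                 # LocalOrchestrator
```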

Returns:

Type Description
PipelineResult PipelineResult with outputs and execution info

Source code in flowyml/core/pipeline.py
def run(
    self,
    inputs: dict[str, Any] | None = None,
    debug: bool = False,
    stack: Any | None = None,  # Stack name (str), URI, Stack instance, or StackDefinition
    env: str | None = None,  # Environment from flowyml.yaml (e.g. 'dev', 'staging', 'prod')
    dry_run: bool = False,  # Validate without executing
    orchestrator: Any | None = None,  # Orchestrator override (takes precedence over stack orchestrator)
    resources: Any | None = None,  # ResourceConfig
    docker_config: Any | None = None,  # DockerConfig
    context: dict[str, Any] | None = None,  # Context vars override
    auto_start_ui: bool = True,  # Auto-start UI server
    **kwargs: Any,
) -> PipelineResult:
    """Execute the pipeline.

    Args:
        inputs: Optional input data for the pipeline
        debug: Enable debug mode with detailed logging
        stack: Stack to use for this run. Accepts:
            - ``str``: Stack name or URI (e.g. ``"local"``, ``"github://org/repo@v1#name"``)
            - ``Stack``: Runtime Stack instance
            - ``StackDefinition``: Enterprise Pydantic stack definition
        env: Environment name from ``flowyml.yaml`` (e.g. ``"dev"``, ``"staging"``,
            ``"prod"``). Resolves the stack from the project config's environments section.
        dry_run: If True, resolve the stack, validate policies, and display
            the execution plan without actually running any steps.
        orchestrator: Orchestrator override (takes precedence over stack orchestrator)
        resources: Resource configuration for execution
        docker_config: Docker configuration for containerized execution
        context: Context variables override
        auto_start_ui: Automatically start UI server if not running and display URL
        **kwargs: Additional arguments passed to the orchestrator

    Note:
        The orchestrator is determined in this priority order:
        1. Explicit `orchestrator` parameter (if provided)
        2. Stack's orchestrator (if stack is set/active)
        3. Default LocalOrchestrator

        When using a stack (e.g., GCPStack), the stack's orchestrator is automatically
        used unless explicitly overridden. This is the recommended approach for
        production deployments.

    Returns:
        PipelineResult with outputs and execution info
    """
    import uuid
    from flowyml.core.orchestrator import LocalOrchestrator
    from flowyml.core.checkpoint import PipelineCheckpoint
    from flowyml.utils.config import get_config
    from flowyml.plugins.integration import get_integration

    # Generate or use provided run_id
    run_id = kwargs.pop("run_id", None) or str(uuid.uuid4())

    # --- Transparent Experiment Tracking (dual-write) ---
    # Initialize plugin integration for external tracker forwarding.
    # The integration auto-resolves the tracker from stack/plugin config.
    # Internal FlowyML tracking is handled by _save_run() / _log_experiment_metrics().
    integration = None
    if self.enable_experiment_tracking:
        try:
            integration = get_integration()
            # Collect context params for tracker
            context_params = self.context.to_dict() if self.context else {}
            if context:
                context_params.update(context)
            # Build run tags
            run_tags = {}
            if self.project_name:
                run_tags["flowyml.project"] = self.project_name
            if self.stack:
                run_tags["flowyml.stack"] = getattr(self.stack, "name", str(type(self.stack).__name__))
            integration.on_pipeline_start(
                pipeline_name=self.name,
                run_id=run_id,
                context=context_params,
                tags=run_tags,
            )
        except Exception as e:
            import logging as _logging

            _logging.getLogger(__name__).debug(
                "External tracker integration start skipped: %s",
                e,
            )

    # Initialize checkpointing if enabled
    if self.enable_checkpointing:
        config = get_config()
        checkpoint = PipelineCheckpoint(
            run_id=run_id,
            checkpoint_dir=str(config.checkpoint_dir),
        )

        # Check if we should resume from checkpoint
        if checkpoint.exists():
            checkpoint_data = checkpoint.load()
            completed_steps = checkpoint_data.get("completed_steps", [])
            if completed_steps:
                # Auto-resume: use checkpoint state
                if hasattr(self, "_display") and self._display:
                    self._display.console.print(
                        f"[yellow]📦 Resuming from checkpoint: {len(completed_steps)} steps already completed[/yellow]",
                    )
                # Store checkpoint info for orchestrator
                self._checkpoint = checkpoint
                self._resume_from_checkpoint = True
                self._completed_steps_from_checkpoint = set(completed_steps)
            else:
                self._checkpoint = checkpoint
                self._resume_from_checkpoint = False
                self._completed_steps_from_checkpoint = set()
        else:
            self._checkpoint = checkpoint
            self._resume_from_checkpoint = False
            self._completed_steps_from_checkpoint = set()
    else:
        self._checkpoint = None
        self._resume_from_checkpoint = False
        self._completed_steps_from_checkpoint = set()

    # Auto-start UI server if requested
    ui_url = None
    run_url = None
    ui_start_failed = False
    if auto_start_ui:
        ui_url, run_url, ui_start_failed = self._ensure_ui_server(run_id)

    # --- Unified Stack Resolution for this run ---
    if stack is not None or env is not None:
        resolved = self._resolve_stack_arg(stack, env)
        if resolved:
            self._apply_stack(resolved, locked=True)
    elif not self._stack_locked:
        active_stack = None
        try:
            from flowyml.stacks.registry import get_active_stack
        except ImportError:
            get_active_stack = None
        if get_active_stack:
            active_stack = get_active_stack()
        if active_stack:
            self._apply_stack(active_stack, locked=False)

    # --- Enterprise Policy Validation ---
    if self._stack_definition:
        try:
            from flowyml.stacks.enterprise.policy import PolicyEngine, PolicyContext

            engine = PolicyEngine()
            policy_ctx = PolicyContext(
                stack=self._stack_definition,
                project_name=getattr(self, "project_name", None),
                environment=env or self._env,
            )
            engine.check(policy_ctx)  # Raises PolicyViolationError on failure
        except ImportError:
            pass  # Enterprise module not loaded

    # --- Dry Run: display plan and return early ---
    if dry_run:
        if not self._built:
            self.build()

        stack_info = "local (default)"
        if self._stack_definition:
            sd = self._stack_definition
            stack_info = f"{sd.name} v{sd.version} (backend: {sd.backend})"
        elif self.stack:
            stack_info = getattr(self.stack, "name", str(type(self.stack).__name__))

        import logging

        logger = logging.getLogger(__name__)
        logger.info(f"🔍 Dry run: Pipeline '{self.name}'")
        logger.info(f"   Stack: {stack_info}")
        logger.info(f"   Steps: {[s.name for s in self.steps]}")
        logger.info("   DAG validated: ✓")
        logger.info("   Policy check: ✓")

        result = PipelineResult(run_id, self.name)
        result.status = "dry_run"
        return result

    # Determine orchestrator
    # Priority: 1) Explicit orchestrator parameter, 2) Stack orchestrator, 3) Default LocalOrchestrator
    if orchestrator is None:
        # Use orchestrator from stack if available
        orchestrator = getattr(self.stack, "orchestrator", None) if self.stack else None
        if orchestrator is None:
            orchestrator = LocalOrchestrator()

    # Update context with provided values
    if context:
        self.context.update(context)

    # Build DAG if needed
    if not self._built:
        self.build()

    resource_config = self._coerce_resource_config(resources)
    docker_cfg = self._coerce_docker_config(docker_config)

    # Prepare Docker Image if running on a stack
    if self.stack and docker_cfg:
        try:
            # This handles building/pushing or validating the URI
            project_name = getattr(self, "project_name", None)
            docker_cfg.image = self.stack.prepare_docker_image(
                docker_cfg,
                pipeline_name=self.name,
                project_name=project_name,
            )
        except Exception as e:
            # If preparation fails (e.g. build error), we should probably fail the run
            # or at least warn. For now, we'll fail to prevent running with bad config
            raise RuntimeError(f"Failed to prepare docker image: {e}") from e

    # Initialize display system for beautiful CLI output
    display = None
    try:
        from flowyml.core.display import PipelineDisplay

        display = PipelineDisplay(
            pipeline_name=self.name,
            steps=self.steps,
            dag=self.dag,
            verbose=True,
            ui_url=ui_url,  # Pass UI URL for prominent display at start
            run_url=run_url,  # Pass run-specific URL for clickable link
        )
        display.show_header()
        display.show_execution_start()
    except Exception:
        # Silently fail if display system not available
        pass

    # Store display on pipeline for orchestrator to use
    self._display = display

    # Run the pipeline via orchestrator
    result = orchestrator.run_pipeline(
        self,
        run_id=run_id,
        resources=resource_config,
        docker_config=docker_cfg,
        inputs=inputs,
        context=context,
        **kwargs,
    )

    # Show summary (only if result is a PipelineResult, not a string or SubmissionResult)
    from flowyml.core.submission_result import SubmissionResult

    if isinstance(result, SubmissionResult):
        # Remote orchestrator returned a SubmissionResult; wrap it
        wrapper = PipelineResult(run_id, self.name)
        wrapper.attach_configs(resource_config, docker_cfg)
        wrapper.mark_submitted(result.job_id)
        wrapper.submission_result = result
        if display:
            meta = result.metadata or {}
            mode = meta.get("mode", "single_job")

            if mode == "group_orchestration":
                groups = meta.get("groups", [])
                total = meta.get("total_groups", len(groups))
                failed = meta.get("failed_group") or meta.get("failed_step")

                display.console.print(
                    "\n  [bold green]☁️  Group orchestration complete[/bold green]"
                    if not failed
                    else "\n  [bold red]☁️  Group orchestration failed[/bold red]",
                )
                display.console.print(
                    f"  Platform: [cyan]{meta.get('platform', 'vertex_ai')}[/cyan]"
                    f"  Project: [cyan]{meta.get('project', '')}[/cyan]"
                    f"  Region: [cyan]{meta.get('region', '')}[/cyan]",
                )
                display.console.print(f"  Execution units: [bold]{total}[/bold]\n")

                for g in groups:
                    status = g.get("status", "UNKNOWN")
                    icon = "✅" if status == "SUCCEEDED" else "❌"
                    steps_str = ", ".join(g.get("steps", []))
                    machine = g.get("machine_type", "auto")
                    display.console.print(
                        f"  {icon} [bold]{g['group_name']}[/bold]" f"  ({machine})" f"  → {steps_str}",
                    )
                if failed:
                    display.console.print(
                        f"\n  [red]Failed at: {failed}[/red]",
                    )
                    if meta.get("error"):
                        display.console.print(f"  [red]{meta['error']}[/red]")
            else:
                display.console.print(
                    "\n  [bold green]☁️  Job submitted to remote orchestrator[/bold green]",
                )
                display.console.print(f"  Job ID: {result.job_id}")
                if meta:
                    for k, v in meta.items():
                        if k != "groups":
                            display.console.print(f"  {k}: {v}")
        self._save_run(wrapper)
        self._save_pipeline_definition()
        return wrapper

    if display and not isinstance(result, str):
        display.show_summary(result, ui_url=ui_url, run_url=run_url)

    # If result is just a job ID (remote execution), wrap it in a basic result
    if isinstance(result, str):
        # Create a submitted result wrapper
        wrapper = PipelineResult(run_id, self.name)
        wrapper.attach_configs(resource_config, docker_cfg)
        wrapper.mark_submitted(result)
        self._save_run(wrapper)
        self._save_pipeline_definition()
        return wrapper

    # Auto-freeze pipeline snapshot for reproducibility
    try:
        from flowyml.core.versioning import freeze_pipeline

        snapshot = freeze_pipeline(self)
        if hasattr(result, "snapshot_hash"):
            result.snapshot_hash = snapshot.snapshot_hash
    except Exception:
        pass  # Don't fail run if snapshot fails

    # Ensure result has configs attached (in case orchestrator didn't do it)
    if hasattr(result, "attach_configs") and not hasattr(result, "resource_config"):
        result.attach_configs(resource_config, docker_cfg)

    # --- End External Tracker Run (dual-write) ---
    if integration is not None:
        try:
            is_success = getattr(result, "success", False)
            integration.on_pipeline_end(
                success=is_success,
                result=result,
            )
        except Exception as e:
            import logging as _logging

            _logging.getLogger(__name__).debug(
                "External tracker integration end skipped: %s",
                e,
            )

    return result
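
The tail of run() handles three possible return shapes from the orchestrator: a SubmissionResult, a bare job ID string, or a full PipelineResult. A rough sketch of that normalization follows. The two classes here are simplified stand-ins, `normalize_result` is a hypothetical name, and the `"pending"` and `"submitted"` status strings are assumptions made for illustration; the source does not show what `mark_submitted` records.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SubmissionResult:
    """Stand-in for flowyml.core.submission_result.SubmissionResult."""
    job_id: str
    metadata: dict = field(default_factory=dict)


@dataclass
class PipelineResult:
    """Stand-in result; the real class carries far more state."""
    run_id: str
    pipeline_name: str
    status: str = "pending"          # assumed status value
    job_id: Optional[str] = None

    def mark_submitted(self, job_id: str) -> None:
        # Assumed behavior: record the remote job ID and flag the run.
        self.status = "submitted"
        self.job_id = job_id


def normalize_result(raw, run_id, pipeline_name):
    """Wrap remote-submission return values; pass local results through."""
    if isinstance(raw, SubmissionResult):
        job_id = raw.job_id
    elif isinstance(raw, str):
        job_id = raw  # a bare job ID string
    else:
        return raw  # already a result object from local execution
    wrapper = PipelineResult(run_id, pipeline_name)
    wrapper.mark_submitted(job_id)
    return wrapper


wrapped = normalize_result("job-123", run_id="r1", pipeline_name="basic")
print(wrapped.status, wrapped.job_id)  # submitted job-123
```

In both remote cases the caller gets back a wrapper that identifies the submitted job rather than step outputs, so code that reads outputs should check which kind of result it received.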

schedule(schedule_type: str, value: str | int, **kwargs) -> Any

Schedule this pipeline to run automatically.

Parameters:

Name Type Description Default
schedule_type str Type of schedule: 'cron', 'interval', 'daily', or 'hourly' required
value str | int Schedule value: a cron expression for 'cron', a number of seconds for 'interval', an 'HH:MM' string for 'daily', or the minute of the hour for 'hourly' required
**kwargs Additional arguments for the scheduler {}

Returns:

Type Description
Any Schedule object

Source code in flowyml/core/pipeline.py
def schedule(
    self,
    schedule_type: str,
    value: str | int,
    **kwargs,
) -> Any:
    """Schedule this pipeline to run automatically.

    Args:
        schedule_type: Type of schedule ('cron', 'interval', 'daily', 'hourly')
        value: Schedule value (cron expression, seconds, 'HH:MM', or minute)
        **kwargs: Additional arguments for scheduler

    Returns:
        Schedule object
    """
    from flowyml.core.scheduler import PipelineScheduler

    scheduler = PipelineScheduler()

    if schedule_type == "cron":
        return scheduler.schedule_cron(self.name, self.run, str(value), **kwargs)
    elif schedule_type == "interval":
        return scheduler.schedule_interval(self.name, self.run, seconds=int(value), **kwargs)
    elif schedule_type == "daily":
        if isinstance(value, str) and ":" in value:
            h, m = map(int, value.split(":"))
            return scheduler.schedule_daily(self.name, self.run, hour=h, minute=m, **kwargs)
        else:
            raise ValueError("Daily schedule value must be 'HH:MM'")
    elif schedule_type == "hourly":
        return scheduler.schedule_hourly(self.name, self.run, minute=int(value), **kwargs)
    else:
        raise ValueError(f"Unknown schedule type: {schedule_type}")
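
For the 'daily' branch, the value must be an 'HH:MM' string that is split into an hour and a minute. The parsing step can be sketched on its own as below. `parse_daily` is a hypothetical helper, and the range check is an addition for this sketch that the source above does not perform.

```python
def parse_daily(value):
    """Split an 'HH:MM' string into (hour, minute) integers."""
    if isinstance(value, str) and ":" in value:
        hour, minute = map(int, value.split(":"))
        # Range check added for this sketch; not present in the source.
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            raise ValueError(f"Time out of range: {value!r}")
        return hour, minute
    raise ValueError("Daily schedule value must be 'HH:MM'")


print(parse_daily("09:30"))  # (9, 30)
```

Given the signature shown above, a daily schedule would then be requested with something like `pipeline.schedule("daily", "09:30")`, and an hourly one with `pipeline.schedule("hourly", 15)`.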

to_definition() -> dict

Serialize pipeline to definition for storage and reconstruction.

Source code in flowyml/core/pipeline.py
def to_definition(self) -> dict:
    """Serialize pipeline to definition for storage and reconstruction."""
    if not self._built:
        self.build()

    return {
        "name": self.name,
        "steps": [
            {
                "name": step.name,
                "inputs": step.inputs,
                "outputs": step.outputs,
                "source_code": step.source_code,
                "tags": step.tags,
                "execution_group": step.execution_group,
            }
            for step in self.steps
        ],
        "dag": {
            "nodes": [
                {
                    "name": node.name,
                    "inputs": node.inputs,
                    "outputs": node.outputs,
                }
                for node in self.dag.nodes.values()
            ],
            "edges": [
                {"source": dep, "target": node_name} for node_name, deps in self.dag.edges.items() for dep in deps
            ],
        },
    }
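
The "edges" entry flattens a mapping from each node to its dependencies into a list of source/target pairs. A minimal sketch of that comprehension on sample data follows. It assumes, as the source implies, that `dag.edges` maps a node name to an iterable of the names it depends on; the step names are made up for illustration.

```python
# Hypothetical dependency mapping: node name -> names it depends on.
edges = {
    "transform": ["load_data"],
    "train": ["transform", "load_data"],
}

# Same comprehension shape as in to_definition(): one dict per dependency.
edge_list = [
    {"source": dep, "target": node_name}
    for node_name, deps in edges.items()
    for dep in deps
]

for edge in edge_list:
    print(edge["source"], "->", edge["target"])
# load_data -> transform
# transform -> train
# load_data -> train
```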

visualize() -> str

Generate pipeline visualization.

Source code in flowyml/core/pipeline.py
def visualize(self) -> str:
    """Generate pipeline visualization."""
    if not self._built:
        self.build()
    return self.dag.visualize()

🚀 What's Next?

👣 Step API

Decorator options, caching, retries, resource requirements, and input/output contracts.

View Step API →

📜 Context API

Parameter injection, environment-specific configs, and runtime overrides.

View Context API →

📦 Assets API

Model, Dataset, and Metrics: first-class artifacts with lineage tracking.

View Assets API →