Skip to content

Pipeline API πŸ—οΈ

The Pipeline class is the main entry point for defining and running workflows in flowyml.

Usage

1
2
3
4
5
from flowyml import Pipeline

pipeline = Pipeline("my_pipeline")
pipeline.add_step(step_func)
pipeline.run()

Class Pipeline

Main pipeline class for orchestrating ML workflows.

Example

from flowyml import Pipeline, step, context

ctx = context(learning_rate=0.001, epochs=10)

@step(outputs=["model/trained"])
def train(learning_rate: float, epochs: int):
    return train_model(learning_rate, epochs)

Option 1: Auto-discover all @step-decorated functions

pipeline = Pipeline("my_pipeline", context=ctx, auto_discover=True)
result = pipeline.run()

Option 2: Concise explicit selection

pipeline = Pipeline.from_steps(train, name="my_pipeline", context=ctx)

Option 3: Batch add

pipeline = Pipeline("my_pipeline", context=ctx)
pipeline.add_steps([train])

Option 4: Manual add_step (existing, still works)

pipeline = Pipeline("my_pipeline", context=ctx)
pipeline.add_step(train)
result = pipeline.run()

With project_name, automatically creates/attaches to project

pipeline = Pipeline("my_pipeline", context=ctx, project_name="ml_project")

With version parameter, automatically creates VersionedPipeline

pipeline = Pipeline("my_pipeline", context=ctx, version="v1.0.1", project_name="ml_project")

Initialize pipeline.

Parameters:

Name Type Description Default
name str

Name of the pipeline

required
context Context | None

Optional context for parameter injection

None
executor Executor | None

Optional executor (defaults to LocalExecutor)

None
enable_cache bool

Whether to enable caching

True
enable_checkpointing bool | None

Whether to enable checkpointing (defaults to config setting, True by default)

None
enable_experiment_tracking bool | None

Whether to enable automatic experiment tracking (defaults to config.auto_log_metrics, True by default)

None
cache_dir str | None

Optional directory for cache

None
stack Any | None

Optional stack instance to run on

None
project str | None

Optional project name to attach this pipeline to (deprecated, use project_name)

None
project_name str | None

Optional project name to attach this pipeline to. If the project doesn't exist, it will be created automatically.

None
version str | None

Optional version string. If provided, a VersionedPipeline instance will be created instead of a regular Pipeline.

None
auto_discover bool

If True, automatically discover all @step-decorated functions from the global registry at build time. Steps with a matching pipeline tag are preferred. Defaults to False.

False
**kwargs Any

Additional keyword arguments passed to the pipeline.

{}
Source code in flowyml/core/pipeline.py
def __init__(
    self,
    name: str,
    context: Context | None = None,
    executor: Executor | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool | None = None,  # None means use config default
    enable_experiment_tracking: bool | None = None,  # None means use config default (True)
    cache_dir: str | None = None,
    stack: Any | None = None,  # Stack instance
    project: str | None = None,  # Project name to attach to (deprecated, use project_name)
    project_name: str | None = None,  # Project name to attach to (creates if doesn't exist)
    version: str | None = None,  # If provided, VersionedPipeline is created via __new__
    auto_discover: bool = False,  # Auto-discover @step-decorated functions
    **kwargs: Any,
):
    """Initialize pipeline.

    Args:
        name: Name of the pipeline
        context: Optional context for parameter injection
        executor: Optional executor (defaults to LocalExecutor)
        enable_cache: Whether to enable caching
        enable_checkpointing: Whether to enable checkpointing (defaults to config setting, True by default)
        enable_experiment_tracking: Whether to enable automatic experiment tracking (defaults to config.auto_log_metrics, True by default)
        cache_dir: Optional directory for cache
        stack: Optional stack instance to run on
        project: Optional project name to attach this pipeline to (deprecated, use project_name)
        project_name: Optional project name to attach this pipeline to.
            If the project doesn't exist, it will be created automatically.
        version: Optional version string. If provided, a VersionedPipeline
            instance will be created instead of a regular Pipeline
            (handled in ``__new__``; unused here).
        auto_discover: If True, automatically discover all ``@step``-decorated
            functions from the global registry at build time. Steps with a
            matching ``pipeline`` tag are preferred. Defaults to False.
        **kwargs: Additional keyword arguments passed to the pipeline.
    """
    import os

    from flowyml.storage.metadata import SQLiteMetadataStore
    from flowyml.utils.config import get_config

    # Fetch the global config exactly once. Previously get_config was
    # re-imported and re-invoked four times within this constructor.
    config = get_config()

    self.name = name
    self.context = context or Context()
    self.enable_cache = enable_cache

    # Checkpointing: explicit argument wins, otherwise the config default.
    self.enable_checkpointing = (
        enable_checkpointing if enable_checkpointing is not None else config.enable_checkpointing
    )

    # Experiment tracking: explicit argument wins, otherwise
    # config.auto_log_metrics (True by default).
    self.enable_experiment_tracking = (
        enable_experiment_tracking if enable_experiment_tracking is not None else config.auto_log_metrics
    )
    self.stack = None  # Will be assigned via _apply_stack
    self._stack_locked = stack is not None
    self._provided_executor = executor

    self.steps: list[Step] = []
    self.dag = DAG()

    # Storage
    if cache_dir is None:
        cache_dir = str(config.cache_dir)

    self.cache_store = CacheStore(cache_dir) if enable_cache else None

    self.runs_dir = config.runs_dir
    self.runs_dir.mkdir(parents=True, exist_ok=True)

    # Initialize components from stack or defaults
    self.executor = executor or LocalExecutor()

    # Metadata store for UI integration - use same store as UI.
    # FLOWYML_DATABASE_URL allows connecting to a shared DB; otherwise use
    # the same metadata database path as the UI to ensure visibility.
    db_url = os.environ.get("FLOWYML_DATABASE_URL")

    if db_url:
        self.metadata_store = SQLiteMetadataStore(db_url=db_url)
    else:
        self.metadata_store = SQLiteMetadataStore(db_path=str(config.metadata_db))

    if stack:
        self._apply_stack(stack, locked=True)
    else:
        # Auto-resolve active stack from flowyml.yaml / FLOWYML_STACK env var
        try:
            from flowyml.plugins.stack_config import get_active_stack as _get_yaml_stack

            yaml_stack = _get_yaml_stack()
            if yaml_stack is not None:
                live_stack = yaml_stack.to_stack()
                self._apply_stack(live_stack, locked=False)
        except Exception:
            pass  # No config file or parse error β€” continue with defaults

    # Handle Project Attachment
    # Support both project_name (preferred) and project (for backward compatibility)
    project_to_use = project_name or project
    if project_to_use:
        from flowyml.core.project import ProjectManager

        manager = ProjectManager()
        # Get or create project
        proj = manager.get_project(project_to_use)
        if not proj:
            proj = manager.create_project(project_to_use)

        # Project attachment overrides the default runs dir / metadata store.
        self.runs_dir = proj.runs_dir
        self.metadata_store = proj.metadata_store

        # Register pipeline with project
        if name not in proj.metadata["pipelines"]:
            proj.metadata["pipelines"].append(name)
            proj._save_metadata()

        # Store project name for later use (e.g., in _save_run)
        self.project_name = project_to_use
    else:
        self.project_name = None

    # State
    self._built = False
    self._auto_discover = auto_discover
    self.step_groups: list[Any] = []  # Will hold StepGroup objects
    self.control_flows: list[Any] = []  # Store conditional control flows (If, Switch, etc.)

Functions

__new__(name: str, version: str | None = None, project_name: str | None = None, project: str | None = None, **kwargs: Any)

Create a Pipeline or VersionedPipeline instance.

If version is provided, automatically returns a VersionedPipeline instance. Otherwise, returns a regular Pipeline instance.

Source code in flowyml/core/pipeline.py
def __new__(
    cls,
    name: str,
    version: str | None = None,
    project_name: str | None = None,
    project: str | None = None,  # For backward compatibility
    **kwargs: Any,
):
    """Allocate a Pipeline, or build a VersionedPipeline when a version is given.

    A non-None ``version`` redirects construction to VersionedPipeline;
    otherwise ordinary object allocation proceeds.
    """
    if version is None:
        return super().__new__(cls)

    from flowyml.core.versioning import VersionedPipeline

    # Forward the project identifier under whichever keyword the caller used.
    forwarded = dict(kwargs)
    if project_name:
        forwarded["project_name"] = project_name
    elif project:
        forwarded["project"] = project
    return VersionedPipeline(name=name, version=version, **forwarded)

add_control_flow(control_flow: Any) -> Pipeline

Add conditional control flow to the pipeline.

Parameters:

Name Type Description Default
control_flow Any

Control flow object (If, Switch, etc.)

required

Returns:

Type Description
Pipeline

Self for chaining

Example
1
2
3
4
5
6
7
8
9
from flowyml import If

pipeline.add_control_flow(
    If(
        condition=lambda ctx: ctx.steps["evaluate_model"].outputs["accuracy"] > 0.9,
        then_step=deploy_model,
        else_step=retrain_model,
    )
)
Source code in flowyml/core/pipeline.py
def add_control_flow(self, control_flow: Any) -> "Pipeline":
    """Register a conditional control flow (If, Switch, etc.) on this pipeline.

    Args:
        control_flow: Control flow object (If, Switch, etc.)

    Returns:
        This pipeline, enabling fluent chaining.

    Example:
        ```python
        from flowyml import If

        pipeline.add_control_flow(
            If(
                condition=lambda ctx: ctx.steps["evaluate_model"].outputs["accuracy"] > 0.9,
                then_step=deploy_model,
                else_step=retrain_model,
            )
        )
        ```
    """
    # Any structural change invalidates the previously built DAG.
    self._built = False
    self.control_flows.append(control_flow)
    return self

add_step(step: Step) -> Pipeline

Add a step to the pipeline.

Parameters:

Name Type Description Default
step Step

Step to add

required

Returns:

Type Description
Pipeline

Self for chaining

Source code in flowyml/core/pipeline.py
def add_step(self, step: Step) -> "Pipeline":
    """Append a single step and mark the DAG as stale.

    Args:
        step: Step to add

    Returns:
        This pipeline, enabling fluent chaining.
    """
    # New steps mean the cached DAG no longer reflects the pipeline.
    self._built = False
    self.steps.append(step)
    return self

add_steps(steps: list[Step]) -> Pipeline

Add multiple steps to the pipeline at once.

Parameters:

Name Type Description Default
steps list[Step]

List of Step instances to add

required

Returns:

Type Description
Pipeline

Self for chaining

Example

pipeline.add_steps([load_data, train_model, evaluate])

Source code in flowyml/core/pipeline.py
def add_steps(self, steps: list[Step]) -> "Pipeline":
    """Add multiple steps to the pipeline at once.

    Args:
        steps: List of Step instances to add

    Returns:
        Self for chaining

    Example:
        >>> pipeline.add_steps([load_data, train_model, evaluate])
    """
    # list.extend replaces the manual append loop (same order, one call).
    self.steps.extend(steps)
    # Adding steps invalidates any previously built DAG.
    self._built = False
    return self

add_sub_pipeline(pipeline: Any, name: str | None = None, inputs: list[str] | None = None, outputs: list[str] | None = None, input_mapping: dict[str, str] | None = None, output_mapping: dict[str, str] | None = None, **kwargs: Any) -> Pipeline

Add a sub-pipeline as a step in this pipeline.

The sub-pipeline's steps will execute as a single unit within this pipeline's execution flow.

Parameters:

Name Type Description Default
pipeline Any

Pipeline to nest as a step

required
name str | None

Optional step name (defaults to sub_pipeline.name)

None
inputs list[str] | None

Input asset names from this pipeline

None
outputs list[str] | None

Output asset names exposed to this pipeline

None
input_mapping dict[str, str] | None

Maps this pipeline's output names to child input names

None
output_mapping dict[str, str] | None

Maps child output names to this pipeline's input names

None
**kwargs Any

Additional SubPipelineStep configuration

{}

Returns:

Type Description
Pipeline

Self for chaining

Example

preprocess = Pipeline("preprocessing")
preprocess.add_step(clean).add_step(normalize)

parent = Pipeline("training")
parent.add_sub_pipeline(preprocess, inputs=["raw"], outputs=["clean"])
parent.add_step(train_model)

Source code in flowyml/core/pipeline.py
def add_sub_pipeline(
    self,
    pipeline: Any,
    name: str | None = None,
    inputs: list[str] | None = None,
    outputs: list[str] | None = None,
    input_mapping: dict[str, str] | None = None,
    output_mapping: dict[str, str] | None = None,
    **kwargs: Any,
) -> "Pipeline":
    """Nest another pipeline inside this one as a single step.

    The nested pipeline's steps run as one unit within this pipeline's
    execution flow.

    Args:
        pipeline: Pipeline to nest as a step
        name: Optional step name (defaults to sub_pipeline.name)
        inputs: Input asset names from this pipeline
        outputs: Output asset names exposed to this pipeline
        input_mapping: Maps this pipeline's output names to child input names
        output_mapping: Maps child output names to this pipeline's input names
        **kwargs: Additional SubPipelineStep configuration

    Returns:
        Self for chaining

    Example:
        >>> preprocess = Pipeline("preprocessing")
        >>> preprocess.add_step(clean).add_step(normalize)
        >>>
        >>> parent = Pipeline("training")
        >>> parent.add_sub_pipeline(preprocess, inputs=["raw"], outputs=["clean"])
        >>> parent.add_step(train_model)
    """
    from flowyml.core.subpipeline import SubPipelineStep

    # Wrap the child pipeline in a step adapter, then register it like any
    # other step so DAG building and chaining behave uniformly.
    nested = SubPipelineStep(
        sub_pipeline=pipeline,
        name=name,
        inputs=inputs,
        outputs=outputs,
        input_mapping=input_mapping,
        output_mapping=output_mapping,
        **kwargs,
    )
    return self.add_step(nested)

build() -> None

Build the execution DAG with type validation.

Source code in flowyml/core/pipeline.py
def build(self) -> None:
    """Build the execution DAG with type validation.

    Idempotent: returns immediately when ``self._built`` is set; adding
    steps or control flows resets that flag elsewhere in this class.

    Raises:
        ValueError: If DAG structure validation or cross-step type
            validation reports errors. Warnings are logged, not raised.
    """
    if self._built:
        return

    # Auto-discover steps from global registry if enabled.
    # Steps tagged for this pipeline are preferred; if none match, fall
    # back to every registered step.
    if self._auto_discover and not self.steps:
        from flowyml.core.step import get_registered_steps

        discovered = get_registered_steps(pipeline=self.name)
        if not discovered:
            discovered = get_registered_steps()
        self.steps = list(discovered)

    # Clear previous DAG (a rebuild always starts from scratch)
    self.dag = DAG()

    # Add one node per step, wiring the step's declared inputs/outputs
    for step in self.steps:
        node = Node(
            name=step.name,
            step=step,
            inputs=step.inputs,
            outputs=step.outputs,
        )
        self.dag.add_node(node)

    # Build edges
    self.dag.build_edges()

    # Validate DAG structure (now returns errors + warnings)
    validation_result = self.dag.validate()

    # Handle both old (list) and new (tuple) return formats
    if isinstance(validation_result, tuple):
        errors, warnings = validation_result
    else:
        errors = validation_result
        warnings = []

    # Log warnings (don't fail the build)
    if warnings:
        import logging

        logger = logging.getLogger(__name__)
        for w in warnings:
            logger.warning(f"Pipeline '{self.name}': {w}")

    if errors:
        raise ValueError("Pipeline validation failed:\n" + "\n".join(errors))

    # Type validation across connections.
    # NOTE: the ValueError raised below is NOT swallowed by the
    # except ImportError clause; any ImportError raised inside this try
    # (normally a missing type_validator module) is treated as "skip".
    try:
        from flowyml.core.type_validator import validate_pipeline

        type_errors, type_warnings = validate_pipeline(self.dag, self.steps)

        if type_warnings:
            import logging

            logger = logging.getLogger(__name__)
            for tw in type_warnings:
                logger.warning(f"Pipeline '{self.name}': {tw}")

        if type_errors:
            error_messages = [str(e) for e in type_errors]
            raise ValueError(
                "Pipeline type validation failed:\n" + "\n".join(error_messages),
            )
    except ImportError:
        pass  # type_validator not available, skip

    # Analyze step groups (used later for grouped execution/UI display)
    from flowyml.core.step_grouping import StepGroupAnalyzer

    analyzer = StepGroupAnalyzer()
    self.step_groups = analyzer.analyze_groups(self.dag, self.steps)

    self._built = True

cache_stats() -> dict[str, Any]

Get cache statistics.

Source code in flowyml/core/pipeline.py
def cache_stats(self) -> dict[str, Any]:
    """Return cache statistics, or an empty dict when caching is disabled."""
    return self.cache_store.stats() if self.cache_store else {}

check_cache() -> dict[str, Any] | None

Check if a successful run of this pipeline already exists.

Returns:

Type Description
dict[str, Any] | None

Metadata of the last successful run, or None if not found.

Source code in flowyml/core/pipeline.py
def check_cache(self) -> dict[str, Any] | None:
    """Check if a successful run of this pipeline already exists.

    Returns:
        Metadata of the last successful run, or None if not found.
    """
    # Query metadata store for successful runs of this pipeline
    try:
        runs = self.metadata_store.query(
            pipeline_name=self.name,
            status="completed",
        )

        if runs:
            # Return the most recent one (query returns ordered by created_at DESC)
            return runs[0]
    except Exception as e:
        # Don't fail if metadata store is not available or errors
        print(f"Warning: Failed to check cache: {e}")

    return None

from_definition(definition: dict, context: Context | None = None) -> Pipeline classmethod

Reconstruct pipeline from stored definition.

This creates a "ghost" pipeline that can be executed but uses the stored step structure. Actual step logic must still be available in the codebase.

Parameters:

Name Type Description Default
definition dict

Pipeline definition from to_definition()

required
context Context | None

Optional context for execution

None

Returns:

Type Description
Pipeline

Reconstructed Pipeline instance

Source code in flowyml/core/pipeline.py
@classmethod
def from_definition(cls, definition: dict, context: Context | None = None) -> "Pipeline":
    """Rebuild a pipeline object from a stored definition.

    The result is a "ghost" pipeline: its step graph mirrors the stored
    structure, but the step bodies are placeholders β€” the actual step
    logic must still be available in the codebase.

    Args:
        definition: Pipeline definition from to_definition()
        context: Optional context for execution

    Returns:
        Reconstructed Pipeline instance
    """
    from flowyml.core.step import step as step_decorator

    rebuilt = cls(
        name=definition["name"],
        context=context or Context(),
    )

    for spec in definition["steps"]:
        # Placeholder body that just logs the call. A real implementation
        # would resolve the original function instead β€” e.g. serialized
        # functions (cloudpickle) or import-by-name from the codebase.
        def generic_step_func(*args, **kwargs):
            """Generic step function for reconstructed pipeline."""
            print(f"Executing reconstructed step with args={args}, kwargs={kwargs}")
            return

        # Re-apply the step decorator with the stored metadata, then
        # register the resulting step on the rebuilt pipeline.
        decorated = step_decorator(
            name=spec["name"],
            inputs=spec["inputs"],
            outputs=spec["outputs"],
            tags=spec.get("tags", []),
        )(generic_step_func)
        rebuilt.add_step(decorated)

    return rebuilt

from_steps(*steps: Step, name: str, **kwargs: Any) -> Pipeline classmethod

Create a pipeline from an explicit list of steps.

Convenience constructor that avoids repetitive add_step() calls while still giving you full control over which steps are included.

Parameters:

Name Type Description Default
*steps Step

Step instances to include

()
name str

Pipeline name (keyword-only)

required
**kwargs Any

Additional arguments passed to Pipeline()

{}

Returns:

Type Description
Pipeline

Configured Pipeline instance

Example

pipeline = Pipeline.from_steps(
    load_data,
    train_model,
    evaluate,
    name="training",
    enable_cache=False,
)

Source code in flowyml/core/pipeline.py
@classmethod
def from_steps(
    cls,
    *steps: Step,
    name: str,
    **kwargs: Any,
) -> "Pipeline":
    """Build a pipeline directly from an explicit set of steps.

    A convenience constructor that removes the need for repeated
    ``add_step()`` calls while keeping full control over step selection.

    Args:
        *steps: Step instances to include
        name: Pipeline name (keyword-only)
        **kwargs: Additional arguments passed to Pipeline()

    Returns:
        Configured Pipeline instance

    Example:
        >>> pipeline = Pipeline.from_steps(
        ...     load_data,
        ...     train_model,
        ...     evaluate,
        ...     name="training",
        ...     enable_cache=False,
        ... )
    """
    instance = cls(name=name, **kwargs)
    instance.add_steps(list(steps))
    return instance

invalidate_cache(step: str | None = None, before: str | None = None) -> None

Invalidate cache entries.

Parameters:

Name Type Description Default
step str | None

Invalidate cache for specific step

None
before str | None

Invalidate cache entries before date

None
Source code in flowyml/core/pipeline.py
def invalidate_cache(
    self,
    step: str | None = None,
    before: str | None = None,
) -> None:
    """Invalidate cache entries.

    Args:
        step: Invalidate cache for specific step
        before: Invalidate cache entries before date
    """
    if self.cache_store:
        if step:
            self.cache_store.invalidate(step_name=step)
        else:
            self.cache_store.clear()

rerun(run_id: str, from_step: str | None = None, **kwargs: Any) -> PipelineResult

Re-run a pipeline from a checkpoint, resuming from where it left off.

Parameters:

Name Type Description Default
run_id str

The run ID of the previous execution to resume from.

required
from_step str | None

Optional step name to start re-execution from. If provided, all steps before this one are skipped. If not provided, resumes from the first non-completed step.

None
**kwargs Any

Additional arguments passed to Pipeline.run().

{}

Returns:

Type Description
PipelineResult

PipelineResult with outputs from the resumed run.

Examples:

1
2
3
4
>>> # Resume from last checkpoint
>>> result = pipeline.rerun(run_id="previous-run-id")
>>> # Resume from a specific step
>>> result = pipeline.rerun(run_id="previous-run-id", from_step="train_model")
Source code in flowyml/core/pipeline.py
def rerun(
    self,
    run_id: str,
    from_step: str | None = None,
    **kwargs: Any,
) -> "PipelineResult":
    """Re-run a pipeline from a checkpoint, resuming from where it left off.

    Args:
        run_id: The run ID of the previous execution to resume from.
        from_step: Optional step name to start re-execution from.
                   If provided, all steps before this one are skipped.
                   If not provided, resumes from the first non-completed step.
        **kwargs: Additional arguments passed to Pipeline.run().

    Returns:
        PipelineResult with outputs from the resumed run.

    Raises:
        ValueError: If no checkpoint exists for ``run_id``.

    Examples:
        >>> # Resume from last checkpoint
        >>> result = pipeline.rerun(run_id="previous-run-id")
        >>> # Resume from a specific step
        >>> result = pipeline.rerun(run_id="previous-run-id", from_step="train_model")
    """
    from flowyml.core.checkpoint import PipelineCheckpoint
    from flowyml.utils.config import get_config

    config = get_config()
    checkpoint = PipelineCheckpoint(
        run_id=run_id,
        checkpoint_dir=str(config.checkpoint_dir),
    )

    if not checkpoint.exists():
        raise ValueError(
            f"No checkpoint found for run_id='{run_id}'. "
            "Cannot resume β€” run the pipeline fresh with pipeline.run() instead.",
        )

    # Determine which steps to skip
    completed = checkpoint.get_completed_steps()

    if from_step:
        # Skip only steps that come before from_step.
        # NOTE(review): if from_step is not present in the completed list,
        # every completed step ends up skipped β€” confirm this is intended.
        steps_to_skip = set()
        for step_name in completed:
            if step_name == from_step:
                break
            steps_to_skip.add(step_name)
    else:
        # Skip all completed steps (resume from first non-completed).
        # "pipeline_complete" is excluded β€” presumably a completion marker
        # rather than a real step; verify against checkpoint writer.
        steps_to_skip = set(completed) - {"pipeline_complete"}

    # Set checkpoint state on pipeline for orchestrator to use
    self._checkpoint = checkpoint
    self._resume_from_checkpoint = True
    self._completed_steps_from_checkpoint = steps_to_skip

    import logging

    logger = logging.getLogger("flowyml.checkpoint")
    logger.info(
        f"Re-running pipeline '{self.name}' from run '{run_id}'. "
        f"Skipping {len(steps_to_skip)} completed step(s)."
        + (f" Starting from step '{from_step}'." if from_step else ""),
    )

    # Run with the checkpoint state already configured; reusing run_id keeps
    # the resumed execution associated with the original run.
    return self.run(run_id=run_id, **kwargs)

run(inputs: dict[str, Any] | None = None, debug: bool = False, stack: Any | None = None, orchestrator: Any | None = None, resources: Any | None = None, docker_config: Any | None = None, context: dict[str, Any] | None = None, auto_start_ui: bool = True, **kwargs: Any) -> PipelineResult

Execute the pipeline.

Parameters:

Name Type Description Default
inputs dict[str, Any] | None

Optional input data for the pipeline

None
debug bool

Enable debug mode with detailed logging

False
stack Any | None

Stack override (uses self.stack or active stack if not provided)

None
orchestrator Any | None

Orchestrator override (takes precedence over stack orchestrator)

None
resources Any | None

Resource configuration for execution

None
docker_config Any | None

Docker configuration for containerized execution

None
context dict[str, Any] | None

Context variables override

None
auto_start_ui bool

Automatically start UI server if not running and display URL

True
**kwargs Any

Additional arguments passed to the orchestrator

{}
Note

The orchestrator is determined in this priority order: 1. Explicit orchestrator parameter (if provided) 2. Stack's orchestrator (if stack is set/active) 3. Default LocalOrchestrator

When using a stack (e.g., GCPStack), the stack's orchestrator is automatically used unless explicitly overridden. This is the recommended approach for production deployments.

Returns:

Type Description
PipelineResult

PipelineResult with outputs and execution info

Source code in flowyml/core/pipeline.py
def run(
    self,
    inputs: dict[str, Any] | None = None,
    debug: bool = False,
    stack: Any | None = None,  # Stack override
    orchestrator: Any | None = None,  # Orchestrator override (takes precedence over stack orchestrator)
    resources: Any | None = None,  # ResourceConfig
    docker_config: Any | None = None,  # DockerConfig
    context: dict[str, Any] | None = None,  # Context vars override
    auto_start_ui: bool = True,  # Auto-start UI server
    **kwargs: Any,
) -> PipelineResult:
    """Execute the pipeline.

    Args:
        inputs: Optional input data for the pipeline
        debug: Enable debug mode with detailed logging
        stack: Stack override (uses self.stack or active stack if not provided)
        orchestrator: Orchestrator override (takes precedence over stack orchestrator)
        resources: Resource configuration for execution
        docker_config: Docker configuration for containerized execution
        context: Context variables override
        auto_start_ui: Automatically start UI server if not running and display URL
        **kwargs: Additional arguments passed to the orchestrator

    Note:
        The orchestrator is determined in this priority order:
        1. Explicit `orchestrator` parameter (if provided)
        2. Stack's orchestrator (if stack is set/active)
        3. Default LocalOrchestrator

        When using a stack (e.g., GCPStack), the stack's orchestrator is automatically
        used unless explicitly overridden. This is the recommended approach for
        production deployments.

    Returns:
        PipelineResult with outputs and execution info
    """
    import uuid
    from flowyml.core.orchestrator import LocalOrchestrator
    from flowyml.core.checkpoint import PipelineCheckpoint
    from flowyml.utils.config import get_config

    # Generate or use provided run_id
    run_id = kwargs.pop("run_id", None) or str(uuid.uuid4())

    # Initialize checkpointing if enabled
    if self.enable_checkpointing:
        config = get_config()
        checkpoint = PipelineCheckpoint(
            run_id=run_id,
            checkpoint_dir=str(config.checkpoint_dir),
        )

        # Check if we should resume from checkpoint
        if checkpoint.exists():
            checkpoint_data = checkpoint.load()
            completed_steps = checkpoint_data.get("completed_steps", [])
            if completed_steps:
                # Auto-resume: use checkpoint state
                if hasattr(self, "_display") and self._display:
                    self._display.console.print(
                        f"[yellow]πŸ“¦ Resuming from checkpoint: {len(completed_steps)} steps already completed[/yellow]",
                    )
                # Store checkpoint info for orchestrator
                self._checkpoint = checkpoint
                self._resume_from_checkpoint = True
                self._completed_steps_from_checkpoint = set(completed_steps)
            else:
                self._checkpoint = checkpoint
                self._resume_from_checkpoint = False
                self._completed_steps_from_checkpoint = set()
        else:
            self._checkpoint = checkpoint
            self._resume_from_checkpoint = False
            self._completed_steps_from_checkpoint = set()
    else:
        self._checkpoint = None
        self._resume_from_checkpoint = False
        self._completed_steps_from_checkpoint = set()

    # Auto-start UI server if requested
    ui_url = None
    run_url = None
    ui_start_failed = False
    if auto_start_ui:
        ui_url, run_url, ui_start_failed = self._ensure_ui_server(run_id)

    # Determine stack for this run
    if stack is not None:
        self._apply_stack(stack, locked=True)
    elif not self._stack_locked:
        active_stack = None
        try:
            from flowyml.stacks.registry import get_active_stack
        except ImportError:
            get_active_stack = None
        if get_active_stack:
            active_stack = get_active_stack()
        if active_stack:
            self._apply_stack(active_stack, locked=False)

    # Determine orchestrator
    # Priority: 1) Explicit orchestrator parameter, 2) Stack orchestrator, 3) Default LocalOrchestrator
    if orchestrator is None:
        # Use orchestrator from stack if available
        orchestrator = getattr(self.stack, "orchestrator", None) if self.stack else None
        if orchestrator is None:
            orchestrator = LocalOrchestrator()

    # Update context with provided values
    if context:
        self.context.update(context)

    # Build DAG if needed
    if not self._built:
        self.build()

    resource_config = self._coerce_resource_config(resources)
    docker_cfg = self._coerce_docker_config(docker_config)

    # Prepare Docker Image if running on a stack
    if self.stack and docker_cfg:
        try:
            # This handles building/pushing or validating the URI
            project_name = getattr(self, "project_name", None)
            docker_cfg.image = self.stack.prepare_docker_image(
                docker_cfg,
                pipeline_name=self.name,
                project_name=project_name,
            )
        except Exception as e:
            # If preparation fails (e.g. build error), we should probably fail the run
            # or at least warn. For now, we'll fail to prevent running with bad config
            raise RuntimeError(f"Failed to prepare docker image: {e}") from e

    # Initialize display system for beautiful CLI output
    display = None
    try:
        from flowyml.core.display import PipelineDisplay

        display = PipelineDisplay(
            pipeline_name=self.name,
            steps=self.steps,
            dag=self.dag,
            verbose=True,
            ui_url=ui_url,  # Pass UI URL for prominent display at start
            run_url=run_url,  # Pass run-specific URL for clickable link
        )
        display.show_header()
        display.show_execution_start()
    except Exception:
        # Silently fail if display system not available
        pass

    # Store display on pipeline for orchestrator to use
    self._display = display

    # Run the pipeline via orchestrator
    result = orchestrator.run_pipeline(
        self,
        run_id=run_id,
        resources=resource_config,
        docker_config=docker_cfg,
        inputs=inputs,
        context=context,
        **kwargs,
    )

    # Show summary (only if result is a PipelineResult, not a string)
    if display and not isinstance(result, str):
        display.show_summary(result, ui_url=ui_url, run_url=run_url)

    # If result is just a job ID (remote execution), wrap it in a basic result
    if isinstance(result, str):
        # Create a submitted result wrapper
        wrapper = PipelineResult(run_id, self.name)
        wrapper.attach_configs(resource_config, docker_cfg)
        wrapper.mark_submitted(result)
        self._save_run(wrapper)
        self._save_pipeline_definition()
        return wrapper

    # Auto-freeze pipeline snapshot for reproducibility
    try:
        from flowyml.core.versioning import freeze_pipeline

        snapshot = freeze_pipeline(self)
        if hasattr(result, "snapshot_hash"):
            result.snapshot_hash = snapshot.snapshot_hash
    except Exception:
        pass  # Don't fail run if snapshot fails

    # Ensure result has configs attached (in case orchestrator didn't do it)
    if hasattr(result, "attach_configs") and not hasattr(result, "resource_config"):
        result.attach_configs(resource_config, docker_cfg)

    return result

schedule(schedule_type: str, value: str | int, **kwargs) -> Any

Schedule this pipeline to run automatically.

Parameters:

Name Type Description Default
schedule_type str

Type of schedule ('cron', 'interval', 'daily', 'hourly')

required
value str | int

Schedule value (cron expression, seconds, 'HH:MM', or minute)

required
**kwargs

Additional arguments for scheduler

{}

Returns:

Type Description
Any

Schedule object

Source code in flowyml/core/pipeline.py
def schedule(
    self,
    schedule_type: str,
    value: str | int,
    **kwargs,
) -> Any:
    """Schedule this pipeline to run automatically.

    Args:
        schedule_type: Type of schedule ('cron', 'interval', 'daily', 'hourly')
        value: Schedule value (cron expression, seconds, 'HH:MM', or minute)
        **kwargs: Additional arguments for scheduler

    Returns:
        Schedule object
    """
    from flowyml.core.scheduler import PipelineScheduler

    scheduler = PipelineScheduler()

    # Guard-clause dispatch: each recognized type returns immediately.
    if schedule_type == "cron":
        return scheduler.schedule_cron(self.name, self.run, str(value), **kwargs)

    if schedule_type == "interval":
        return scheduler.schedule_interval(self.name, self.run, seconds=int(value), **kwargs)

    if schedule_type == "daily":
        # Daily schedules must be expressed as a wall-clock 'HH:MM' string.
        if not (isinstance(value, str) and ":" in value):
            raise ValueError("Daily schedule value must be 'HH:MM'")
        hour_part, minute_part = (int(piece) for piece in value.split(":"))
        return scheduler.schedule_daily(self.name, self.run, hour=hour_part, minute=minute_part, **kwargs)

    if schedule_type == "hourly":
        return scheduler.schedule_hourly(self.name, self.run, minute=int(value), **kwargs)

    raise ValueError(f"Unknown schedule type: {schedule_type}")

to_definition() -> dict

Serialize pipeline to definition for storage and reconstruction.

Source code in flowyml/core/pipeline.py
def to_definition(self) -> dict:
    """Serialize pipeline to definition for storage and reconstruction."""
    # Make sure the DAG exists before serializing it.
    if not self._built:
        self.build()

    # One spec dict per step, preserving declaration order.
    step_specs = []
    for current in self.steps:
        step_specs.append(
            {
                "name": current.name,
                "inputs": current.inputs,
                "outputs": current.outputs,
                "source_code": current.source_code,
                "tags": current.tags,
                "execution_group": current.execution_group,
            },
        )

    node_specs = [
        {
            "name": node.name,
            "inputs": node.inputs,
            "outputs": node.outputs,
        }
        for node in self.dag.nodes.values()
    ]

    # Flatten the adjacency mapping {target: [sources]} into edge records.
    edge_specs = []
    for target, sources in self.dag.edges.items():
        for source in sources:
            edge_specs.append({"source": source, "target": target})

    return {
        "name": self.name,
        "steps": step_specs,
        "dag": {
            "nodes": node_specs,
            "edges": edge_specs,
        },
    }

visualize() -> str

Generate pipeline visualization.

Source code in flowyml/core/pipeline.py
def visualize(self) -> str:
    """Generate pipeline visualization.

    Builds the DAG first if it has not been built yet, then delegates
    rendering to the DAG itself.
    """
    if not self._built:
        self.build()
    dag = self.dag
    return dag.visualize()