Core API Reference

Pipeline

flowyml.core.pipeline.Pipeline(name: str, context: Context | None = None, executor: Executor | None = None, enable_cache: bool = True, enable_checkpointing: bool | None = None, enable_experiment_tracking: bool | None = None, cache_dir: str | None = None, stack: Any | None = None, project: str | None = None, project_name: str | None = None, version: str | None = None, **kwargs)

Main pipeline class for orchestrating ML workflows.

Example

```python
from flowyml import Pipeline, step, context

ctx = context(learning_rate=0.001, epochs=10)

@step(outputs=["model/trained"])
def train(learning_rate: float, epochs: int):
    return train_model(learning_rate, epochs)

pipeline = Pipeline("my_pipeline", context=ctx)
pipeline.add_step(train)
result = pipeline.run()
```

With `project_name`, the pipeline automatically creates or attaches to a project:

```python
pipeline = Pipeline("my_pipeline", context=ctx, project_name="ml_project")
```

With the `version` parameter, a `VersionedPipeline` is created automatically:

```python
pipeline = Pipeline("my_pipeline", context=ctx, version="v1.0.1", project_name="ml_project")
```

Initialize pipeline.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Name of the pipeline | *required* |
| `context` | `Context \| None` | Optional context for parameter injection | `None` |
| `executor` | `Executor \| None` | Optional executor (defaults to `LocalExecutor`) | `None` |
| `enable_cache` | `bool` | Whether to enable caching | `True` |
| `enable_checkpointing` | `bool \| None` | Whether to enable checkpointing (defaults to config setting, `True` by default) | `None` |
| `enable_experiment_tracking` | `bool \| None` | Whether to enable automatic experiment tracking (defaults to `config.auto_log_metrics`, `True` by default) | `None` |
| `cache_dir` | `str \| None` | Optional directory for cache | `None` |
| `stack` | `Any \| None` | Optional stack instance to run on | `None` |
| `project` | `str \| None` | Optional project name to attach this pipeline to (deprecated, use `project_name`) | `None` |
| `project_name` | `str \| None` | Optional project name to attach this pipeline to. Created automatically if it does not exist. | `None` |
| `version` | `str \| None` | Optional version string. If provided, a `VersionedPipeline` instance is created instead of a regular `Pipeline`. | `None` |
| `**kwargs` | | Additional keyword arguments passed to the pipeline | `{}` |

Source code in flowyml/core/pipeline.py
def __init__(
    self,
    name: str,
    context: Context | None = None,
    executor: Executor | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool | None = None,  # None means use config default
    enable_experiment_tracking: bool | None = None,  # None means use config default (True)
    cache_dir: str | None = None,
    stack: Any | None = None,  # Stack instance
    project: str | None = None,  # Project name to attach to (deprecated, use project_name)
    project_name: str | None = None,  # Project name to attach to (creates if doesn't exist)
    version: str | None = None,  # If provided, VersionedPipeline is created via __new__
    **kwargs,
):
    """Initialize pipeline.

    Args:
        name: Name of the pipeline
        context: Optional context for parameter injection
        executor: Optional executor (defaults to LocalExecutor)
        enable_cache: Whether to enable caching
        enable_checkpointing: Whether to enable checkpointing (defaults to config setting, True by default)
        enable_experiment_tracking: Whether to enable automatic experiment tracking (defaults to config.auto_log_metrics, True by default)
        cache_dir: Optional directory for cache
        stack: Optional stack instance to run on
        project: Optional project name to attach this pipeline to (deprecated, use project_name)
        project_name: Optional project name to attach this pipeline to.
            If the project doesn't exist, it will be created automatically.
        version: Optional version string. If provided, a VersionedPipeline
            instance will be created instead of a regular Pipeline.
        **kwargs: Additional keyword arguments passed to the pipeline.
    """
    from flowyml.utils.config import get_config

    self.name = name
    self.context = context or Context()
    self.enable_cache = enable_cache

    # Set checkpointing (use config default if not specified)
    config = get_config()
    self.enable_checkpointing = (
        enable_checkpointing if enable_checkpointing is not None else config.enable_checkpointing
    )

    # Set experiment tracking (use config default if not specified, default: True)
    # Can be set via enable_experiment_tracking parameter or defaults to config.auto_log_metrics
    self.enable_experiment_tracking = (
        enable_experiment_tracking if enable_experiment_tracking is not None else config.auto_log_metrics
    )
    self.stack = None  # Will be assigned via _apply_stack
    self._stack_locked = stack is not None
    self._provided_executor = executor

    self.steps: list[Step] = []
    self.dag = DAG()

    # Storage
    if cache_dir is None:
        from flowyml.utils.config import get_config

        cache_dir = str(get_config().cache_dir)

    self.cache_store = CacheStore(cache_dir) if enable_cache else None

    from flowyml.utils.config import get_config

    self.runs_dir = get_config().runs_dir
    self.runs_dir.mkdir(parents=True, exist_ok=True)

    # Initialize components from stack or defaults
    self.executor = executor or LocalExecutor()
    # Metadata store for UI integration - use same store as UI
    from flowyml.storage.metadata import SQLiteMetadataStore
    from flowyml.utils.config import get_config

    config = get_config()
    # Use the same metadata database path as the UI to ensure visibility
    self.metadata_store = SQLiteMetadataStore(db_path=str(config.metadata_db))

    if stack:
        self._apply_stack(stack, locked=True)

    # Handle Project Attachment
    # Support both project_name (preferred) and project (for backward compatibility)
    project_to_use = project_name or project
    if project_to_use:
        from flowyml.core.project import ProjectManager

        manager = ProjectManager()
        # Get or create project
        proj = manager.get_project(project_to_use)
        if not proj:
            proj = manager.create_project(project_to_use)

        # Configure pipeline with project settings
        self.runs_dir = proj.runs_dir
        self.metadata_store = proj.metadata_store

        # Register pipeline with project
        if name not in proj.metadata["pipelines"]:
            proj.metadata["pipelines"].append(name)
            proj._save_metadata()

        # Store project name for later use (e.g., in _save_run)
        self.project_name = project_to_use
    else:
        self.project_name = None

    # State
    self._built = False
    self.step_groups: list[Any] = []  # Will hold StepGroup objects
    self.control_flows: list[Any] = []  # Store conditional control flows (If, Switch, etc.)

Functions

__new__(name: str, version: str | None = None, project_name: str | None = None, project: str | None = None, **kwargs)

Create a Pipeline or VersionedPipeline instance.

If version is provided, automatically returns a VersionedPipeline instance. Otherwise, returns a regular Pipeline instance.

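A usage sketch of this dispatch:

```python
from flowyml import Pipeline

p1 = Pipeline("my_pipeline")                    # regular Pipeline
p2 = Pipeline("my_pipeline", version="v1.0.0")  # VersionedPipeline
```
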
Source code in flowyml/core/pipeline.py
def __new__(
    cls,
    name: str,
    version: str | None = None,
    project_name: str | None = None,
    project: str | None = None,  # For backward compatibility
    **kwargs,
):
    """Create a Pipeline or VersionedPipeline instance.

    If version is provided, automatically returns a VersionedPipeline instance.
    Otherwise, returns a regular Pipeline instance.
    """
    if version is not None:
        from flowyml.core.versioning import VersionedPipeline

        # Pass project_name or project to VersionedPipeline
        vp_kwargs = kwargs.copy()
        if project_name:
            vp_kwargs["project_name"] = project_name
        elif project:
            vp_kwargs["project"] = project
        return VersionedPipeline(name=name, version=version, **vp_kwargs)
    return super().__new__(cls)

add_control_flow(control_flow: Any) -> Pipeline

Add conditional control flow to the pipeline.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `control_flow` | `Any` | Control flow object (`If`, `Switch`, etc.) | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Pipeline` | Self for chaining |

Example
from flowyml import If

pipeline.add_control_flow(
    If(
        condition=lambda ctx: ctx.steps["evaluate_model"].outputs["accuracy"] > 0.9,
        then_step=deploy_model,
        else_step=retrain_model,
    )
)
Source code in flowyml/core/pipeline.py
def add_control_flow(self, control_flow: Any) -> "Pipeline":
    """Add conditional control flow to the pipeline.

    Args:
        control_flow: Control flow object (If, Switch, etc.)

    Returns:
        Self for chaining

    Example:
        ```python
        from flowyml import If

        pipeline.add_control_flow(
            If(
                condition=lambda ctx: ctx.steps["evaluate_model"].outputs["accuracy"] > 0.9,
                then_step=deploy_model,
                else_step=retrain_model,
            )
        )
        ```
    """
    self.control_flows.append(control_flow)
    self._built = False
    return self

add_step(step: Step) -> Pipeline

Add a step to the pipeline.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `step` | `Step` | Step to add | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Pipeline` | Self for chaining |

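Because each call returns the pipeline itself, steps can be chained (the step names here are hypothetical):

```python
pipeline.add_step(load_data).add_step(train).add_step(evaluate)
```
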
Source code in flowyml/core/pipeline.py
def add_step(self, step: Step) -> "Pipeline":
    """Add a step to the pipeline.

    Args:
        step: Step to add

    Returns:
        Self for chaining
    """
    self.steps.append(step)
    self._built = False
    return self

build() -> None

Build the execution DAG.

Source code in flowyml/core/pipeline.py
def build(self) -> None:
    """Build the execution DAG."""
    if self._built:
        return

    # Clear previous DAG
    self.dag = DAG()

    # Add nodes
    for step in self.steps:
        node = Node(
            name=step.name,
            step=step,
            inputs=step.inputs,
            outputs=step.outputs,
        )
        self.dag.add_node(node)

    # Build edges
    self.dag.build_edges()

    # Validate
    errors = self.dag.validate()
    if errors:
        raise ValueError("Pipeline validation failed:\n" + "\n".join(errors))

    # Analyze step groups
    from flowyml.core.step_grouping import StepGroupAnalyzer

    analyzer = StepGroupAnalyzer()
    self.step_groups = analyzer.analyze_groups(self.dag, self.steps)

    self._built = True

cache_stats() -> dict[str, Any]

Get cache statistics.

Source code in flowyml/core/pipeline.py
def cache_stats(self) -> dict[str, Any]:
    """Get cache statistics."""
    if self.cache_store:
        return self.cache_store.stats()
    return {}

check_cache() -> dict[str, Any] | None

Check if a successful run of this pipeline already exists.

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any] \| None` | Metadata of the last successful run, or `None` if not found. |

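A short usage sketch:

```python
last_run = pipeline.check_cache()
if last_run is not None:
    print("Found a previous successful run:", last_run)
```
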
Source code in flowyml/core/pipeline.py
def check_cache(self) -> dict[str, Any] | None:
    """Check if a successful run of this pipeline already exists.

    Returns:
        Metadata of the last successful run, or None if not found.
    """
    # Query metadata store for successful runs of this pipeline
    try:
        runs = self.metadata_store.query(
            pipeline_name=self.name,
            status="completed",
        )

        if runs:
            # Return the most recent one (query returns ordered by created_at DESC)
            return runs[0]
    except Exception as e:
        # Don't fail if metadata store is not available or errors
        print(f"Warning: Failed to check cache: {e}")

    return None

from_definition(definition: dict, context: Context | None = None) -> Pipeline classmethod

Reconstruct pipeline from stored definition.

This creates a "ghost" pipeline that can be executed but uses the stored step structure. Actual step logic must still be available in the codebase.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `definition` | `dict` | Pipeline definition from `to_definition()` | *required* |
| `context` | `Context \| None` | Optional context for execution | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Pipeline` | Reconstructed `Pipeline` instance |

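A round-trip sketch, continuing the example above and assuming the definition is JSON-serializable:

```python
import json

# Serialize and persist the pipeline structure
with open("pipeline.json", "w") as f:
    json.dump(pipeline.to_definition(), f)

# Later: reconstruct a "ghost" pipeline from the stored definition
with open("pipeline.json") as f:
    ghost = Pipeline.from_definition(json.load(f), context=ctx)
```
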
Source code in flowyml/core/pipeline.py
@classmethod
def from_definition(cls, definition: dict, context: Context | None = None) -> "Pipeline":
    """Reconstruct pipeline from stored definition.

    This creates a "ghost" pipeline that can be executed but uses
    the stored step structure. Actual step logic must still be
    available in the codebase.

    Args:
        definition: Pipeline definition from to_definition()
        context: Optional context for execution

    Returns:
        Reconstructed Pipeline instance
    """
    from flowyml.core.step import step as step_decorator

    # Create pipeline instance
    pipeline = cls(
        name=definition["name"],
        context=context or Context(),
    )

    # Reconstruct steps
    for step_def in definition["steps"]:
        # Create a generic step function that can be called
        # In a real implementation, we'd need to either:
        # 1. Store serialized functions (using cloudpickle)
        # 2. Import functions by name from codebase
        # 3. Use placeholder functions

        # For now, we'll create a placeholder that logs execution
        def generic_step_func(*args, **kwargs):
            """Generic step function for reconstructed pipeline."""
            print(f"Executing reconstructed step with args={args}, kwargs={kwargs}")
            return

        # Apply step decorator with stored metadata
        decorated = step_decorator(
            name=step_def["name"],
            inputs=step_def["inputs"],
            outputs=step_def["outputs"],
            tags=step_def.get("tags", []),
        )(generic_step_func)

        # Add to pipeline
        pipeline.add_step(decorated)

    return pipeline

invalidate_cache(step: str | None = None, before: str | None = None) -> None

Invalidate cache entries.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `step` | `str \| None` | Invalidate cache for a specific step | `None` |
| `before` | `str \| None` | Invalidate cache entries before this date | `None` |
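
Usage sketch:

```python
pipeline.invalidate_cache(step="train")  # drop cache entries for one step
pipeline.invalidate_cache()              # clear the whole cache
```
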
Source code in flowyml/core/pipeline.py
def invalidate_cache(
    self,
    step: str | None = None,
    before: str | None = None,
) -> None:
    """Invalidate cache entries.

    Args:
        step: Invalidate cache for specific step
        before: Invalidate cache entries before date
    """
    if self.cache_store:
        if step:
            self.cache_store.invalidate(step_name=step)
        else:
            self.cache_store.clear()

run(inputs: dict[str, Any] | None = None, debug: bool = False, stack: Any | None = None, orchestrator: Any | None = None, resources: Any | None = None, docker_config: Any | None = None, context: dict[str, Any] | None = None, auto_start_ui: bool = True, **kwargs) -> PipelineResult

Execute the pipeline.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `inputs` | `dict[str, Any] \| None` | Optional input data for the pipeline | `None` |
| `debug` | `bool` | Enable debug mode with detailed logging | `False` |
| `stack` | `Any \| None` | Stack override (uses `self.stack` or the active stack if not provided) | `None` |
| `orchestrator` | `Any \| None` | Orchestrator override (takes precedence over the stack's orchestrator) | `None` |
| `resources` | `Any \| None` | Resource configuration for execution | `None` |
| `docker_config` | `Any \| None` | Docker configuration for containerized execution | `None` |
| `context` | `dict[str, Any] \| None` | Context variables override | `None` |
| `auto_start_ui` | `bool` | Automatically start the UI server if not running and display the URL | `True` |
| `**kwargs` | | Additional arguments passed to the orchestrator | `{}` |

Note

The orchestrator is determined in this priority order:

1. Explicit `orchestrator` parameter (if provided)
2. Stack's orchestrator (if a stack is set/active)
3. Default `LocalOrchestrator`

When using a stack (e.g., GCPStack), the stack's orchestrator is automatically used unless explicitly overridden. This is the recommended approach for production deployments.

Returns:

| Type | Description |
|------|-------------|
| `PipelineResult` | `PipelineResult` with outputs and execution info |

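A usage sketch of the override behavior:

```python
# Default: uses the stack's orchestrator if one is active, else LocalOrchestrator
result = pipeline.run()

# An explicit orchestrator always takes precedence
from flowyml.core.orchestrator import LocalOrchestrator

result = pipeline.run(
    orchestrator=LocalOrchestrator(),
    context={"epochs": 20},  # override context values for this run
    debug=True,
)
```
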
Source code in flowyml/core/pipeline.py
def run(
    self,
    inputs: dict[str, Any] | None = None,
    debug: bool = False,
    stack: Any | None = None,  # Stack override
    orchestrator: Any | None = None,  # Orchestrator override (takes precedence over stack orchestrator)
    resources: Any | None = None,  # ResourceConfig
    docker_config: Any | None = None,  # DockerConfig
    context: dict[str, Any] | None = None,  # Context vars override
    auto_start_ui: bool = True,  # Auto-start UI server
    **kwargs,
) -> PipelineResult:
    """Execute the pipeline.

    Args:
        inputs: Optional input data for the pipeline
        debug: Enable debug mode with detailed logging
        stack: Stack override (uses self.stack or active stack if not provided)
        orchestrator: Orchestrator override (takes precedence over stack orchestrator)
        resources: Resource configuration for execution
        docker_config: Docker configuration for containerized execution
        context: Context variables override
        auto_start_ui: Automatically start UI server if not running and display URL
        **kwargs: Additional arguments passed to the orchestrator

    Note:
        The orchestrator is determined in this priority order:
        1. Explicit `orchestrator` parameter (if provided)
        2. Stack's orchestrator (if stack is set/active)
        3. Default LocalOrchestrator

        When using a stack (e.g., GCPStack), the stack's orchestrator is automatically
        used unless explicitly overridden. This is the recommended approach for
        production deployments.

    Returns:
        PipelineResult with outputs and execution info
    """
    import uuid
    from flowyml.core.orchestrator import LocalOrchestrator
    from flowyml.core.checkpoint import PipelineCheckpoint
    from flowyml.utils.config import get_config

    # Generate or use provided run_id
    run_id = kwargs.pop("run_id", None) or str(uuid.uuid4())

    # Initialize checkpointing if enabled
    if self.enable_checkpointing:
        config = get_config()
        checkpoint = PipelineCheckpoint(
            run_id=run_id,
            checkpoint_dir=str(config.checkpoint_dir),
        )

        # Check if we should resume from checkpoint
        if checkpoint.exists():
            checkpoint_data = checkpoint.load()
            completed_steps = checkpoint_data.get("completed_steps", [])
            if completed_steps:
                # Auto-resume: use checkpoint state
                if hasattr(self, "_display") and self._display:
                    self._display.console.print(
                        f"[yellow]📦 Resuming from checkpoint: {len(completed_steps)} steps already completed[/yellow]",
                    )
                # Store checkpoint info for orchestrator
                self._checkpoint = checkpoint
                self._resume_from_checkpoint = True
                self._completed_steps_from_checkpoint = set(completed_steps)
            else:
                self._checkpoint = checkpoint
                self._resume_from_checkpoint = False
                self._completed_steps_from_checkpoint = set()
        else:
            self._checkpoint = checkpoint
            self._resume_from_checkpoint = False
            self._completed_steps_from_checkpoint = set()
    else:
        self._checkpoint = None
        self._resume_from_checkpoint = False
        self._completed_steps_from_checkpoint = set()

    # Auto-start UI server if requested
    ui_url = None
    run_url = None
    ui_start_failed = False
    if auto_start_ui:
        ui_url, run_url, ui_start_failed = self._ensure_ui_server(run_id)

    # Determine stack for this run
    if stack is not None:
        self._apply_stack(stack, locked=True)
    elif not self._stack_locked:
        active_stack = None
        try:
            from flowyml.stacks.registry import get_active_stack
        except ImportError:
            get_active_stack = None
        if get_active_stack:
            active_stack = get_active_stack()
        if active_stack:
            self._apply_stack(active_stack, locked=False)

    # Determine orchestrator
    # Priority: 1) Explicit orchestrator parameter, 2) Stack orchestrator, 3) Default LocalOrchestrator
    if orchestrator is None:
        # Use orchestrator from stack if available
        orchestrator = getattr(self.stack, "orchestrator", None) if self.stack else None
        if orchestrator is None:
            orchestrator = LocalOrchestrator()

    # Update context with provided values
    if context:
        self.context.update(context)

    # Build DAG if needed
    if not self._built:
        self.build()

    resource_config = self._coerce_resource_config(resources)
    docker_cfg = self._coerce_docker_config(docker_config)

    # Initialize display system for beautiful CLI output
    display = None
    try:
        from flowyml.core.display import PipelineDisplay

        display = PipelineDisplay(
            pipeline_name=self.name,
            steps=self.steps,
            dag=self.dag,
            verbose=True,
            ui_url=ui_url,  # Pass UI URL for prominent display at start
            run_url=run_url,  # Pass run-specific URL for clickable link
        )
        display.show_header()
        display.show_execution_start()
    except Exception:
        # Silently fail if display system not available
        pass

    # Store display on pipeline for orchestrator to use
    self._display = display

    # Run the pipeline via orchestrator
    result = orchestrator.run_pipeline(
        self,
        run_id=run_id,
        resources=resource_config,
        docker_config=docker_cfg,
        inputs=inputs,
        context=context,
        **kwargs,
    )

    # Show summary (only if result is a PipelineResult, not a string)
    if display and not isinstance(result, str):
        display.show_summary(result, ui_url=ui_url, run_url=run_url)

    # If result is just a job ID (remote execution), wrap it in a basic result
    if isinstance(result, str):
        # Create a submitted result wrapper
        wrapper = PipelineResult(run_id, self.name)
        wrapper.attach_configs(resource_config, docker_cfg)
        wrapper.mark_submitted(result)
        self._save_run(wrapper)
        self._save_pipeline_definition()
        return wrapper

    # Ensure result has configs attached (in case orchestrator didn't do it)
    if hasattr(result, "attach_configs") and not hasattr(result, "resource_config"):
        result.attach_configs(resource_config, docker_cfg)

    return result

schedule(schedule_type: str, value: str | int, **kwargs) -> Any

Schedule this pipeline to run automatically.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `schedule_type` | `str` | Type of schedule (`'cron'`, `'interval'`, `'daily'`, `'hourly'`) | *required* |
| `value` | `str \| int` | Schedule value (cron expression, seconds, `'HH:MM'`, or minute) | *required* |
| `**kwargs` | | Additional arguments for the scheduler | `{}` |

Returns:

| Type | Description |
|------|-------------|
| `Any` | Schedule object |

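One call per schedule type:

```python
pipeline.schedule("cron", "0 9 * * 1-5")  # cron expression: weekdays at 09:00
pipeline.schedule("interval", 300)        # every 300 seconds
pipeline.schedule("daily", "02:30")       # every day at 02:30
pipeline.schedule("hourly", 15)           # every hour at minute 15
```
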
Source code in flowyml/core/pipeline.py
def schedule(
    self,
    schedule_type: str,
    value: str | int,
    **kwargs,
) -> Any:
    """Schedule this pipeline to run automatically.

    Args:
        schedule_type: Type of schedule ('cron', 'interval', 'daily', 'hourly')
        value: Schedule value (cron expression, seconds, 'HH:MM', or minute)
        **kwargs: Additional arguments for scheduler

    Returns:
        Schedule object
    """
    from flowyml.core.scheduler import PipelineScheduler

    scheduler = PipelineScheduler()

    if schedule_type == "cron":
        return scheduler.schedule_cron(self.name, self.run, str(value), **kwargs)
    elif schedule_type == "interval":
        return scheduler.schedule_interval(self.name, self.run, seconds=int(value), **kwargs)
    elif schedule_type == "daily":
        if isinstance(value, str) and ":" in value:
            h, m = map(int, value.split(":"))
            return scheduler.schedule_daily(self.name, self.run, hour=h, minute=m, **kwargs)
        else:
            raise ValueError("Daily schedule value must be 'HH:MM'")
    elif schedule_type == "hourly":
        return scheduler.schedule_hourly(self.name, self.run, minute=int(value), **kwargs)
    else:
        raise ValueError(f"Unknown schedule type: {schedule_type}")

to_definition() -> dict

Serialize pipeline to definition for storage and reconstruction.

Source code in flowyml/core/pipeline.py
def to_definition(self) -> dict:
    """Serialize pipeline to definition for storage and reconstruction."""
    if not self._built:
        self.build()

    return {
        "name": self.name,
        "steps": [
            {
                "name": step.name,
                "inputs": step.inputs,
                "outputs": step.outputs,
                "source_code": step.source_code,
                "tags": step.tags,
                "execution_group": step.execution_group,
            }
            for step in self.steps
        ],
        "dag": {
            "nodes": [
                {
                    "name": node.name,
                    "inputs": node.inputs,
                    "outputs": node.outputs,
                }
                for node in self.dag.nodes.values()
            ],
            "edges": [
                {"source": dep, "target": node_name} for node_name, deps in self.dag.edges.items() for dep in deps
            ],
        },
    }

visualize() -> str

Generate pipeline visualization.

Source code in flowyml/core/pipeline.py
def visualize(self) -> str:
    """Generate pipeline visualization."""
    if not self._built:
        self.build()
    return self.dag.visualize()

Step

flowyml.core.step.Step(func: Callable, name: str | None = None, inputs: list[str] | None = None, outputs: list[str] | None = None, cache: bool | str | Callable = 'code_hash', retry: int = 0, timeout: int | None = None, resources: Union[dict[str, Any], ResourceRequirements, None] = None, tags: dict[str, str] | None = None, condition: Callable | None = None, execution_group: str | None = None)

A pipeline step that can be executed with automatic context injection.

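Steps are usually created through the `@step` decorator (as in the Pipeline example above), which is assumed here to forward these constructor arguments; `extract_features` is hypothetical:

```python
from flowyml import step

@step(
    outputs=["data/features"],
    cache="input_hash",  # re-run only when inputs change
    retry=2,             # retry up to twice on failure
    timeout=600,         # seconds
)
def featurize(raw_data):
    return extract_features(raw_data)
```
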
Source code in flowyml/core/step.py
def __init__(
    self,
    func: Callable,
    name: str | None = None,
    inputs: list[str] | None = None,
    outputs: list[str] | None = None,
    cache: bool | str | Callable = "code_hash",
    retry: int = 0,
    timeout: int | None = None,
    resources: Union[dict[str, Any], "ResourceRequirements", None] = None,
    tags: dict[str, str] | None = None,
    condition: Callable | None = None,
    execution_group: str | None = None,
):
    self.func = func
    self.name = name or func.__name__
    self.inputs = inputs or []
    self.outputs = outputs or []
    self.cache = cache
    self.retry = retry
    self.timeout = timeout

    # Store resources (accept both dict for backward compatibility and ResourceRequirements)
    self.resources = resources
    if self.resources and ResourceRequirements and not isinstance(self.resources, ResourceRequirements):
        if isinstance(self.resources, dict):
            resource_kwargs = dict(self.resources)
            gpu_value = resource_kwargs.get("gpu")
            if GPUConfig and gpu_value is not None:
                if isinstance(gpu_value, dict):
                    resource_kwargs["gpu"] = GPUConfig(
                        gpu_type=gpu_value.get("gpu_type") or gpu_value.get("type") or "generic",
                        count=int(gpu_value.get("count", 1)),
                        memory=gpu_value.get("memory"),
                    )
                elif isinstance(gpu_value, (int, float)):
                    resource_kwargs["gpu"] = GPUConfig(gpu_type="generic", count=int(gpu_value))
            with contextlib.suppress(TypeError):
                self.resources = ResourceRequirements(**resource_kwargs)

    self.tags = tags or {}
    self.condition = condition
    self.execution_group = execution_group

    # Capture source code for UI display
    try:
        self.source_code = inspect.getsource(func)
    except (OSError, TypeError):
        self.source_code = "# Source code not available"

    self.config = StepConfig(
        name=self.name,
        func=func,
        inputs=self.inputs,
        outputs=self.outputs,
        cache=self.cache,
        retry=self.retry,
        timeout=self.timeout,
        resources=self.resources,
        tags=self.tags,
        condition=self.condition,
        execution_group=self.execution_group,
    )

Functions

__call__(*args, **kwargs)

Execute the step function.

Source code in flowyml/core/step.py
def __call__(self, *args, **kwargs):
    """Execute the step function."""
    # Check condition if present
    if self.condition:
        # We might need to inject context into condition too,
        # but for now assume it takes no args or same args as step?
        # This is tricky without context injection logic here.
        # The executor handles execution, so maybe we just store it here.
        pass

    return self.func(*args, **kwargs)

get_cache_key(inputs: dict[str, Any] | None = None) -> str

Generate cache key based on caching strategy.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `inputs` | `dict[str, Any] \| None` | Input data for the step | `None` |

Returns:

| Type | Description |
|------|-------------|
| `str` | Cache key string |

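A sketch of the three strategies (`preprocess` is a hypothetical step function; hash values abbreviated):

```python
def preprocess(split: str):
    ...

s = Step(preprocess, cache="code_hash")
s.get_cache_key()                    # "preprocess:<md5 of source>"

s = Step(preprocess, cache="input_hash")
s.get_cache_key({"split": "train"})  # "preprocess:<sha256 prefix of inputs>"

s = Step(preprocess, cache=lambda inputs, config: "custom-key")
s.get_cache_key({"split": "train"})  # "custom-key"
```
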
Source code in flowyml/core/step.py
def get_cache_key(self, inputs: dict[str, Any] | None = None) -> str:
    """Generate cache key based on caching strategy.

    Args:
        inputs: Input data for the step

    Returns:
        Cache key string
    """
    if self.cache == "code_hash":
        return f"{self.name}:{self.get_code_hash()}"
    elif self.cache == "input_hash" and inputs:
        return f"{self.name}:{self.get_input_hash(inputs)}"
    elif callable(self.cache) and inputs:
        return self.cache(inputs, {})
    else:
        return f"{self.name}:no-cache"

get_code_hash() -> str

Compute hash of the step's source code.

Source code in flowyml/core/step.py
def get_code_hash(self) -> str:
    """Compute hash of the step's source code."""
    try:
        source = inspect.getsource(self.func)
        return hashlib.md5(source.encode()).hexdigest()
    except (OSError, TypeError):
        # Fallback for dynamically defined functions or when source is unavailable
        return hashlib.md5(self.name.encode()).hexdigest()[:16]

get_input_hash(inputs: dict[str, Any]) -> str

Generate hash of inputs for caching.

Source code in flowyml/core/step.py
def get_input_hash(self, inputs: dict[str, Any]) -> str:
    """Generate hash of inputs for caching."""
    input_str = json.dumps(inputs, sort_keys=True, default=str)
    return hashlib.sha256(input_str.encode()).hexdigest()[:16]

Context

flowyml.core.context.Context(**kwargs)

Pipeline context with automatic parameter injection.

Example

```python
ctx = Context(learning_rate=0.001, epochs=10, batch_size=32, device="cuda")
```

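Parameters can then be read via attribute access, item access, or `get` with a default:

```python
ctx.learning_rate         # 0.001 (dot notation)
ctx["epochs"]             # 10 (dict-style)
ctx.get("device", "cpu")  # "cuda" here; the default "cpu" if unset
```
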
Source code in flowyml/core/context.py
def __init__(self, **kwargs):
    self._params = kwargs
    self._parent = None
    self._metadata = {}

Functions

__getattr__(name: str) -> Any

Allow dot notation access to parameters.

Source code in flowyml/core/context.py
def __getattr__(self, name: str) -> Any:
    """Allow dot notation access to parameters."""
    if name.startswith("_"):
        return super().__getattribute__(name)

    if name in self._params:
        return self._params[name]

    if self._parent and name in self._parent._params:
        return self._parent._params[name]

    raise AttributeError(f"Context has no parameter '{name}'")

__getitem__(key: str) -> Any

Allow dict-style access to parameters.

Source code in flowyml/core/context.py
def __getitem__(self, key: str) -> Any:
    """Allow dict-style access to parameters."""
    if key in self._params:
        return self._params[key]

    if self._parent and key in self._parent._params:
        return self._parent._params[key]

    raise KeyError(f"Context has no parameter '{key}'")

get(key: str, default: Any = None) -> Any

Get parameter with default value.

Source code in flowyml/core/context.py
def get(self, key: str, default: Any = None) -> Any:
    """Get parameter with default value."""
    try:
        return self[key]
    except KeyError:
        return default

inherit(**overrides) -> Context

Create child context with inheritance.

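Lookups on the child fall back to the parent, so overrides stay local:

```python
base = Context(learning_rate=0.001, epochs=10)
child = base.inherit(epochs=20)

child.epochs         # 20 (override)
child.learning_rate  # 0.001 (inherited from parent)
base.epochs          # 10 (parent unchanged)
```
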
Source code in flowyml/core/context.py
def inherit(self, **overrides) -> "Context":
    """Create child context with inheritance."""
    child = Context(**overrides)
    child._parent = self
    return child

inject_params(func: callable) -> dict[str, Any]

Automatically inject parameters based on function signature.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `func` | `callable` | Function to analyze and inject parameters for | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | Dictionary of parameters to inject |

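Only parameters that exist in the context are injected:

```python
def train(learning_rate: float, epochs: int, verbose: bool = False):
    ...

ctx = Context(learning_rate=0.001, epochs=10)
ctx.inject_params(train)
# {"learning_rate": 0.001, "epochs": 10} -- "verbose" is not in the context
```
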
Source code in flowyml/core/context.py
def inject_params(self, func: callable) -> dict[str, Any]:
    """Automatically inject parameters based on function signature.

    Args:
        func: Function to analyze and inject parameters for

    Returns:
        Dictionary of parameters to inject
    """
    sig = inspect.signature(func)
    injected = {}

    for param_name, param in sig.parameters.items():
        # Skip self, cls, args, kwargs
        if param_name in ("self", "cls"):
            continue
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue

        # Check if parameter exists in context
        if param_name in self.keys():
            injected[param_name] = self[param_name]

    return injected

items() -> list[tuple]

Return all parameter items.

Source code in flowyml/core/context.py
def items(self) -> list[tuple]:
    """Return all parameter items."""
    result = {}
    if self._parent:
        result.update(dict(self._parent.items()))
    result.update(self._params)
    return list(result.items())

keys() -> set[str]

Return all parameter keys.

Source code in flowyml/core/context.py
def keys(self) -> set[str]:
    """Return all parameter keys."""
    keys = set(self._params.keys())
    if self._parent:
        keys.update(self._parent.keys())
    return keys

to_dict() -> dict[str, Any]

Convert context to dictionary.

Source code in flowyml/core/context.py
def to_dict(self) -> dict[str, Any]:
    """Convert context to dictionary."""
    result = {}
    if self._parent:
        result.update(self._parent.to_dict())
    result.update(self._params)
    return result

update(data: dict[str, Any]) -> None

Update context with new data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `dict[str, Any]` | Dictionary of key-value pairs to add to the context | *required* |
Source code in flowyml/core/context.py
def update(self, data: dict[str, Any]) -> None:
    """Update context with new data.

    Args:
        data: Dictionary of key-value pairs to add to context
    """
    self._params.update(data)

validate_for_step(step_func: callable, exclude: list[str] = None) -> list[str]

Validate that all required parameters are available.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `step_func` | `callable` | Step function to validate | *required* |
| `exclude` | `list[str]` | Parameter names to exclude from validation (e.g. inputs) | `None` |

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of missing required parameters |

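Parameters with defaults are skipped; required parameters missing from the context are reported:

```python
def train(learning_rate: float, epochs: int, data=None):
    ...

ctx = Context(learning_rate=0.001)
ctx.validate_for_step(train)                      # ["epochs"]
ctx.validate_for_step(train, exclude=["epochs"])  # []
```
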
Source code in flowyml/core/context.py
def validate_for_step(self, step_func: callable, exclude: list[str] = None) -> list[str]:
    """Validate that all required parameters are available.

    Args:
        step_func: Step function to validate
        exclude: List of parameter names to exclude from validation (e.g. inputs)

    Returns:
        List of missing required parameters
    """
    sig = inspect.signature(step_func)
    missing = []
    exclude = exclude or []

    for param_name, param in sig.parameters.items():
        # Skip optional parameters
        if param_name in ("self", "cls"):
            continue
        if param_name in exclude:
            continue
        if param.default != inspect.Parameter.empty:
            continue
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue

        # Check if required param is missing
        if param_name not in self.keys():
            missing.append(param_name)

    return missing