
Orchestrators API 🎼

Orchestrators and executors manage how pipeline steps run: executors execute individual steps or step groups, while orchestrators such as the Vertex AI orchestrator submit entire pipeline runs to a backend.

Base Executor

Base executor for running pipeline steps.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default', all_outputs: dict[str, Any] | None = None) -> ExecutionResult

Execute a single step.

Parameters:

step (required)
    Step to execute
inputs: dict[str, Any] (required)
    Input data for the step
context_params: dict[str, Any] (required)
    Parameters from context
cache_store: Any | None = None
    Cache store for caching
artifact_store: Any | None = None
    Artifact store for logging results
run_id: str | None = None
    Unique ID for this pipeline run
project_name: str = 'default'
    Name of the project
all_outputs: dict[str, Any] | None = None
    Collection of all step outputs for conditional evaluation

Returns:

ExecutionResult
    ExecutionResult with output or error

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
    all_outputs: dict[str, Any] | None = None,
) -> ExecutionResult:
    """Execute a single step.

    Args:
        step: Step to execute
        inputs: Input data for the step
        context_params: Parameters from context
        cache_store: Cache store for caching
        artifact_store: Artifact store for logging results
        run_id: Unique ID for this pipeline run
        project_name: Name of the project
        all_outputs: Collection of all step outputs for conditional evaluation

    Returns:
        ExecutionResult with output or error
    """
    raise NotImplementedError
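
Concrete executors subclass Executor and override this method (a full executor would also override execute_step_group below). The following is a minimal sketch of such an override, assuming Executor and ExecutionResult are importable from flowyml.core.executor, as the source path above suggests:

# Minimal sketch of a concrete executor; import path and constructor behaviour
# are assumptions, not part of the documented API.
import time
from typing import Any

from flowyml.core.executor import ExecutionResult, Executor


class InProcessExecutor(Executor):
    """Hypothetical executor: call the step function directly, no retries or caching."""

    def execute_step(
        self,
        step,
        inputs: dict[str, Any],
        context_params: dict[str, Any],
        cache_store: Any | None = None,
        artifact_store: Any | None = None,
        run_id: str | None = None,
        project_name: str = "default",
        all_outputs: dict[str, Any] | None = None,
    ) -> ExecutionResult:
        start = time.time()
        try:
            output = step.func(**inputs, **context_params)
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=output,
                duration_seconds=time.time() - start,
            )
        except Exception as exc:  # report the failure rather than raising
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=str(exc),
                duration_seconds=time.time() - start,
            )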

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together.

Parameters:

step_group (required)
    StepGroup to execute
inputs: dict[str, Any] (required)
    Input data available to the group
context: Any | None = None
    Context object for per-step parameter injection (preferred)
context_params: dict[str, Any] | None = None
    Parameters from context (deprecated, use context instead)
cache_store: Any | None = None
    Cache store for caching
artifact_store: Any | None = None
    Artifact store for materialization
run_id: str | None = None
    Run identifier
project_name: str = 'default'
    Project name

Returns:

list[ExecutionResult]
    List of ExecutionResult (one per step)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Execute a group of steps together.

    Args:
        step_group: StepGroup to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step)
    """
    raise NotImplementedError

Local Executor

Bases: Executor

Local executor - runs steps in the current process.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default', all_outputs: dict[str, Any] | None = None) -> ExecutionResult

Execute step locally with retry, caching, and materialization.

Parameters:

step (required)
    Step to execute
inputs: dict[str, Any] (required)
    Input data for the step
context_params: dict[str, Any] (required)
    Parameters from context
cache_store: Any | None = None
    Cache store for caching
artifact_store: Any | None = None
    Artifact store for logging results
run_id: str | None = None
    Unique ID for this pipeline run
project_name: str = 'default'
    Name of the project
all_outputs: dict[str, Any] | None = None
    Collection of all step outputs for conditional evaluation

Returns:

ExecutionResult
    ExecutionResult with output or error

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
    all_outputs: dict[str, Any] | None = None,
) -> ExecutionResult:
    """Execute step locally with retry, caching, and materialization.

    Args:
        step: Step to execute
        inputs: Input data for the step
        context_params: Parameters from context
        cache_store: Cache store for caching
        artifact_store: Artifact store for logging results
        run_id: Unique ID for this pipeline run
        project_name: Name of the project
        all_outputs: Collection of all step outputs for conditional evaluation

    Returns:
        ExecutionResult with output or error
    """
    start_time = time.time()
    retries = 0

    # Check condition
    if step.condition:
        try:
            # Prepare kwargs for condition: inputs + context_params + all_outputs
            sig = inspect.signature(step.condition)
            kwargs = {**context_params}

            # Add all outputs so far (paths like 'data/processed')
            if all_outputs:
                kwargs.update(all_outputs)
                # Also flatten dict outputs to allow access to keys like 'quality_score'
                for val in all_outputs.values():
                    if isinstance(val, dict):
                        kwargs.update({k: v for k, v in val.items() if k not in kwargs})

            # Add direct inputs (might override all_outputs if paths match)
            kwargs.update(inputs)

            # Filter kwargs to only what condition accepts
            cond_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters}

            should_run = step.condition(**cond_kwargs)

            if not should_run:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=True,
                    output=None,  # Skipped steps produce None
                    duration_seconds=duration,
                    skipped=True,
                )
        except Exception as e:
            # If condition check fails, treat as step failure
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"Condition check failed: {str(e)}",
                duration_seconds=duration,
            )

    # Check cache
    if cache_store and step.cache:
        cache_key = step.get_cache_key(inputs)
        cached_result = cache_store.get(cache_key)

        if cached_result is not None:
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=cached_result,
                duration_seconds=duration,
                cached=True,
            )

    # Execute with retry
    max_retries = step.retry
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            # Prepare arguments
            kwargs = {**inputs, **context_params}

            # Execute step
            monitor_thread = None
            log_capture = None
            original_stdout = None
            original_stderr = None
            try:
                # Start monitoring thread with log capture if run_id is present
                if run_id:
                    import sys

                    log_capture = LogCapture()
                    original_stdout = sys.stdout
                    original_stderr = sys.stderr
                    sys.stdout = log_capture
                    sys.stderr = log_capture

                    monitor_thread = MonitorThread(
                        run_id=run_id,
                        step_name=step.name,
                        target_tid=threading.get_ident(),
                        log_capture=log_capture,
                    )
                    monitor_thread.start()

                # Filter kwargs to only what the function accepts
                func_sig = inspect.signature(step.func)
                # Handle *args/**kwargs if needed, but for now strict matching is safer for steps
                filtered_kwargs = {k: v for k, v in kwargs.items() if k in func_sig.parameters}

                result = step.func(**filtered_kwargs)
            except StopExecution:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=False,
                    error="Execution stopped by user",
                    duration_seconds=duration,
                    retries=retries,
                )
            finally:
                # Restore stdout/stderr
                if original_stdout:
                    import sys

                    sys.stdout = original_stdout
                if original_stderr:
                    import sys

                    sys.stderr = original_stderr

                # Stop monitor thread (only if not already stopped in exception handler)
                if monitor_thread and not monitor_thread._stop_event.is_set():
                    monitor_thread.stop()
                    monitor_thread.join()

            # Materialize output if artifact store is available
            # Only upload if the result is an Asset with upload=True
            artifact_uri = None
            if artifact_store and result is not None and run_id:
                # Check if result is an Asset and respects upload flag
                should_upload = True
                try:
                    from flowyml.assets.base import Asset

                    if isinstance(result, Asset):
                        should_upload = getattr(result, "upload", False)
                except ImportError:
                    pass

                if should_upload:
                    with contextlib.suppress(Exception):
                        artifact_uri = artifact_store.materialize(
                            obj=result,
                            name="output",  # Default name for single output
                            run_id=run_id,
                            step_name=step.name,
                            project_name=project_name,
                        )

            # Type-based artifact routing
            routing_result = None
            try:
                from flowyml.core.routing import route_artifact, should_route

                if should_route(result):
                    # Get return type annotation if available
                    return_type = None
                    try:
                        from flowyml.core.routing import get_step_return_type

                        return_type = get_step_return_type(step.func)
                    except Exception:
                        pass

                    routing_result = route_artifact(
                        output=result,
                        step_name=step.name,
                        run_id=run_id or "local",
                        return_type=return_type,
                        project_name=project_name,
                    )
                    if routing_result and routing_result.store_uri:
                        artifact_uri = routing_result.store_uri
            except ImportError:
                pass  # Routing module not available
            except Exception:
                pass  # Routing failed, continue with normal flow

            # Auto-register in artifact catalog for lineage tracking
            try:
                from flowyml.storage.catalog.manager import ArtifactCatalog

                catalog = ArtifactCatalog()
                catalog.register(
                    name=f"{step.name}_output",
                    artifact_type=type(result).__name__,
                    data=result,
                    source_step=step.name,
                    source_pipeline=project_name,
                    source_run_id=run_id or "local",
                )
            except ImportError:
                pass  # Catalog not available
            except Exception:
                pass  # Catalog registration failed, don't block execution

            # Cache result
            if cache_store and step.cache:
                cache_key = step.get_cache_key(inputs)
                cache_store.set_value(
                    cache_key,
                    result,
                    step.name,
                    step.get_code_hash(),
                )

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=result,
                duration_seconds=duration,
                retries=retries,
                artifact_uri=artifact_uri,
            )

        except Exception as e:
            last_error = str(e)
            error_traceback = traceback.format_exc()
            retries += 1

            if attempt < max_retries:
                # Wait before retry (exponential backoff)
                wait_time = 2**attempt
                time.sleep(wait_time)
                continue

            # All retries exhausted - send error to logs
            if monitor_thread:
                monitor_thread.stop(error=f"{last_error}\n{error_traceback}")
                monitor_thread.join()
                monitor_thread = None  # Prevent double-stop in finally

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"{last_error}\n{error_traceback}",
                duration_seconds=duration,
                retries=retries,
            )

    # Should never reach here
    duration = time.time() - start_time
    return ExecutionResult(
        step_name=step.name,
        success=False,
        error=last_error,
        duration_seconds=duration,
        retries=retries,
    )
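
A hedged sketch of calling execute_step directly on a LocalExecutor. In normal use the pipeline runner builds real Step objects and supplies these arguments; here a small stand-in object exposes only the attributes the method reads (name, func, condition, cache, retry), and the import path is assumed from the source location above.

# Sketch only: a stand-in step object instead of flowyml's real Step class.
from dataclasses import dataclass
from typing import Any, Callable

from flowyml.core.executor import LocalExecutor  # assumed import path


@dataclass
class StandInStep:
    """Exposes just what execute_step reads for a non-cached, non-conditional step."""

    name: str
    func: Callable[..., Any]
    condition: Callable[..., bool] | None = None  # no condition -> always runs
    cache: bool = False                           # skip the cache_store branch
    retry: int = 0                                # no retries


def add(a: int, b: int) -> int:
    return a + b


executor = LocalExecutor()  # assumed to need no constructor arguments
result = executor.execute_step(
    step=StandInStep(name="add", func=add),
    inputs={"a": 1, "b": 2},
    context_params={},
)
print(result.success, result.output, result.duration_seconds)  # -> True 3 <elapsed seconds>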

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together in the same environment.

For local execution, steps execute sequentially but share the same process.

Parameters:

step_group (required)
    StepGroup containing steps to execute
inputs: dict[str, Any] (required)
    Input data available to the group
context: Any | None = None
    Context object for per-step parameter injection (preferred)
context_params: dict[str, Any] | None = None
    Parameters from context (deprecated, use context instead)
cache_store: Any | None = None
    Cache store for caching
artifact_store: Any | None = None
    Artifact store for materialization
run_id: str | None = None
    Run identifier
project_name: str = 'default'
    Project name

Returns:

list[ExecutionResult]
    List of ExecutionResult (one per step in execution order)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup from step_grouping module
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Execute a group of steps together in the same environment.

    For local execution, steps execute sequentially but share the same process.

    Args:
        step_group: StepGroup containing steps to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step in execution order)
    """
    results: list[ExecutionResult] = []
    step_outputs = dict(inputs)  # Copy initial inputs

    # Execute steps in their defined order
    for step_name in step_group.execution_order:
        # Find the step object
        step = next(s for s in step_group.steps if s.name == step_name)

        # Prepare inputs for this step - map input names to function parameters
        step_inputs = {}

        # Get function signature to properly map inputs to parameters
        sig = inspect.signature(step.func)
        params = list(sig.parameters.values())
        # Filter out self/cls
        params = [p for p in params if p.name not in ("self", "cls")]
        assigned_params = set()

        if step.inputs:
            for i, input_name in enumerate(step.inputs):
                if input_name not in step_outputs:
                    continue

                val = step_outputs[input_name]

                # Check if input name matches a parameter directly
                param_match = next((p for p in params if p.name == input_name), None)

                if param_match:
                    step_inputs[param_match.name] = val
                    assigned_params.add(param_match.name)
                elif i < len(params):
                    # Positional fallback - use the parameter at the same position
                    target_param = params[i]
                    if target_param.name not in assigned_params:
                        step_inputs[target_param.name] = val
                        assigned_params.add(target_param.name)

        # Auto-map parameters from available outputs by name
        for param in params:
            if param.name in step_outputs and param.name not in step_inputs:
                step_inputs[param.name] = step_outputs[param.name]
                assigned_params.add(param.name)

        # Inject context parameters for this specific step
        if context is not None:
            # Use context object to inject params per step
            step_context_params = context.inject_params(step.func)
        elif context_params is not None:
            # Fallback to provided context_params (backward compatibility)
            step_context_params = context_params
        else:
            step_context_params = {}

        # Execute this step
        result = self.execute_step(
            step=step,
            inputs=step_inputs,
            context_params=step_context_params,
            cache_store=cache_store,
            artifact_store=artifact_store,
            run_id=run_id,
            project_name=project_name,
        )

        results.append(result)

        # If step failed, stop group execution
        if not result.success:
            # Mark remaining steps as skipped
            current_index = step_group.execution_order.index(step_name)
            remaining_steps = step_group.execution_order[current_index + 1 :]

            for remaining_name in remaining_steps:
                skip_result = ExecutionResult(
                    step_name=remaining_name,
                    success=True,  # Set to True since skipped steps technically don't fail
                    error="Skipped due to earlier failure in group",
                    skipped=True,
                )
                results.append(skip_result)
            break

        # Store outputs for next steps in group
        if result.output is not None:
            if len(step.outputs) == 1:
                step_outputs[step.outputs[0]] = result.output
            elif isinstance(result.output, (list, tuple)) and len(result.output) == len(
                step.outputs,
            ):
                for name, val in zip(step.outputs, result.output, strict=False):
                    step_outputs[name] = val
            elif isinstance(result.output, dict):
                for name in step.outputs:
                    if name in result.output:
                        step_outputs[name] = result.output[name]
            else:
                if step.outputs:
                    step_outputs[step.outputs[0]] = result.output

    return results
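
The end of the loop above fans each step's return value out into the shared step_outputs mapping so later steps in the group can consume it by name: a single declared output takes the value directly, a tuple or list is zipped against the declared names, and a dict is matched key by key. A standalone illustration of those rules (plain Python, not a flowyml API):

# Mirrors the output-mapping rules used by execute_step_group; illustration only.
from typing import Any


def map_outputs(step_outputs: dict[str, Any], output_names: list[str], output: Any) -> None:
    if output is None or not output_names:
        return
    if len(output_names) == 1:
        step_outputs[output_names[0]] = output
    elif isinstance(output, (list, tuple)) and len(output) == len(output_names):
        for name, value in zip(output_names, output):
            step_outputs[name] = value
    elif isinstance(output, dict):
        for name in output_names:
            if name in output:
                step_outputs[name] = output[name]
    else:
        # Fallback: store the whole value under the first declared name
        step_outputs[output_names[0]] = output


shared: dict[str, Any] = {}
map_outputs(shared, ["features", "labels"], ([1, 2, 3], [0, 1, 1]))
print(shared)  # {'features': [1, 2, 3], 'labels': [0, 1, 1]}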

Vertex AI Orchestrator

Bases: RemoteOrchestrator

Vertex AI orchestrator for running pipelines on Google Cloud.

This orchestrator submits pipeline jobs to Vertex AI Pipelines, allowing for scalable, managed execution in the cloud.

Example
from flowyml.stacks.gcp import VertexAIOrchestrator

orchestrator = VertexAIOrchestrator(
    project_id="my-gcp-project",
    region="us-central1",
    service_account="my-sa@my-project.iam.gserviceaccount.com",
)

Initialize Vertex AI orchestrator.

Parameters:

name: str = 'vertex_ai'
    Name of the orchestrator
project_id: str | None = None
    GCP project ID
region: str = 'europe-west1'
    GCP region for Vertex AI
service_account: str | None = None
    Service account email for job execution
network: str | None = None
    VPC network for jobs
encryption_key: str | None = None
    Customer-managed encryption key

Source code in flowyml/stacks/gcp.py
def __init__(
    self,
    name: str = "vertex_ai",
    project_id: str | None = None,
    region: str = "europe-west1",
    service_account: str | None = None,
    network: str | None = None,
    encryption_key: str | None = None,
):
    """Initialize Vertex AI orchestrator.

    Args:
        name: Name of the orchestrator
        project_id: GCP project ID
        region: GCP region for Vertex AI
        service_account: Service account email for job execution
        network: VPC network for jobs
        encryption_key: Customer-managed encryption key
    """
    super().__init__(name)
    self.project_id = project_id
    self.region = region
    self.service_account = service_account
    self.network = network
    self.encryption_key = encryption_key

Functions

get_run_logs(job_id: str) -> str

Get logs for a Vertex AI job.

Parameters:

job_id: str (required)
    The job resource name.

Returns:

str
    String containing the logs.

Source code in flowyml/stacks/gcp.py
def get_run_logs(self, job_id: str) -> str:
    """Get logs for a Vertex AI job.

    Args:
        job_id: The job resource name.

    Returns:
        String containing the logs.
    """
    try:
        from google.cloud import logging

        client = logging.Client(project=self.project_id)
        job_name = job_id.split("/")[-1]

        # Filter logs for this job
        # Note: This is a simplified filter; exact filter depends on Vertex AI logging format
        filter_str = f'resource.type="ml_job" AND conversion_id="{job_name}"'

        entries = client.list_entries(
            filter_=filter_str,
            order_by=logging.DESCENDING,
            max_results=100,
        )
        logs = []
        for entry in entries:
            if entry.payload:
                logs.append(str(entry.payload))

        return "\n".join(reversed(logs)) if logs else "No logs found."

    except Exception as e:
        return f"Failed to fetch logs: {e}"

get_run_status(job_id: str) -> ExecutionStatus

Get status of a Vertex AI job.

Source code in flowyml/stacks/gcp.py
def get_run_status(self, job_id: str) -> "ExecutionStatus":
    """Get status of a Vertex AI job."""
    from google.cloud import aiplatform

    try:
        job = aiplatform.CustomJob(job_id)
        state = job.state.name

        # Map Vertex AI states to ExecutionStatus
        status_map = {
            "JOB_STATE_QUEUED": ExecutionStatus.PROVISIONING,
            "JOB_STATE_PENDING": ExecutionStatus.PROVISIONING,
            "JOB_STATE_RUNNING": ExecutionStatus.RUNNING,
            "JOB_STATE_SUCCEEDED": ExecutionStatus.COMPLETED,
            "JOB_STATE_FAILED": ExecutionStatus.FAILED,
            "JOB_STATE_CANCELLING": ExecutionStatus.STOPPING,
            "JOB_STATE_CANCELLED": ExecutionStatus.CANCELLED,
        }
        return status_map.get(state, ExecutionStatus.RUNNING)
    except Exception as e:
        print(f"Error fetching job status: {e}")
        return ExecutionStatus.FAILED

run_pipeline(pipeline: Any, run_id: str, resources: ResourceConfig | None = None, docker_config: DockerConfig | None = None, inputs: dict[str, Any] | None = None, context: dict[str, Any] | None = None, **kwargs: Any) -> SubmissionResult

Run pipeline on Vertex AI.

Parameters:

pipeline: Any (required)
    Pipeline to run
run_id: str (required)
    Run identifier
resources: ResourceConfig | None = None
    Resource configuration
docker_config: DockerConfig | None = None
    Docker configuration
inputs: dict[str, Any] | None = None
    Input data
context: dict[str, Any] | None = None
    Context variables
**kwargs: Any
    Additional arguments

Returns:

SubmissionResult
    SubmissionResult with job resource name

Source code in flowyml/stacks/gcp.py
def run_pipeline(
    self,
    pipeline: Any,
    run_id: str,
    resources: ResourceConfig | None = None,
    docker_config: DockerConfig | None = None,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    **kwargs: Any,
) -> "SubmissionResult":
    """Run pipeline on Vertex AI.

    Args:
        pipeline: Pipeline to run
        run_id: Run identifier
        resources: Resource configuration
        docker_config: Docker configuration
        inputs: Input data
        context: Context variables
        **kwargs: Additional arguments

    Returns:
        SubmissionResult with job resource name
    """
    from google.cloud import aiplatform
    import time

    # Initialize Vertex AI
    aiplatform.init(project=self.project_id, location=self.region)

    # Create custom job
    job_display_name = f"{pipeline.name}-{run_id[:8]}"

    # Build worker pool specs
    worker_pool_specs = self._build_worker_pool_specs(
        docker_config=docker_config,
        resources=resources,
    )

    # Create and run custom job
    job = aiplatform.CustomJob(
        display_name=job_display_name,
        worker_pool_specs=worker_pool_specs,
        service_account=self.service_account,
        network=self.network,
        encryption_spec_key_name=self.encryption_key,
    )

    # Submit job asynchronously
    job.submit()

    job_id = job.resource_name

    # Create wait function
    def wait_for_completion():
        """Poll job status until completion."""
        while True:
            status = self.get_run_status(job_id)
            if status.is_finished:
                if not status.is_successful:
                    raise RuntimeError(f"Vertex AI job {job_id} failed with status: {status}")
                break
            time.sleep(15)  # Poll every 15 seconds

    return SubmissionResult(
        job_id=job_id,
        wait_for_completion=wait_for_completion,
        metadata={
            "platform": "vertex_ai",
            "project": self.project_id,
            "region": self.region,
            "job_name": job_display_name,
        },
    )
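
A hedged end-to-end sketch: submit a job and block until it finishes. The pipeline argument is a placeholder stand-in here (run_pipeline only reads its name when building the display name), and real use requires GCP credentials plus google-cloud-aiplatform installed.

# Sketch: submit to Vertex AI, then wait; `pipeline` is a placeholder stand-in.
import uuid
from types import SimpleNamespace

from flowyml.stacks.gcp import VertexAIOrchestrator

orchestrator = VertexAIOrchestrator(
    project_id="my-gcp-project",
    region="us-central1",
    service_account="my-sa@my-project.iam.gserviceaccount.com",
)

pipeline = SimpleNamespace(name="training-pipeline")  # stand-in for a flowyml pipeline
submission = orchestrator.run_pipeline(pipeline, run_id=str(uuid.uuid4()))
print("Submitted job:", submission.job_id)

submission.wait_for_completion()  # polls get_run_status every 15 seconds
print(orchestrator.get_run_logs(submission.job_id))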

stop_run(job_id: str, graceful: bool = True) -> None

Cancel a Vertex AI job.

Source code in flowyml/stacks/gcp.py
def stop_run(self, job_id: str, graceful: bool = True) -> None:
    """Cancel a Vertex AI job."""
    from google.cloud import aiplatform

    try:
        job = aiplatform.CustomJob(job_id)
        job.cancel()
    except Exception as e:
        print(f"Error cancelling job {job_id}: {e}")
        raise

to_dict() -> dict[str, Any]

Convert to dictionary.

Source code in flowyml/stacks/gcp.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary."""
    return {
        "name": self.name,
        "type": "vertex_ai",
        "project_id": self.project_id,
        "region": self.region,
        "service_account": self.service_account,
        "network": self.network,
    }

validate() -> bool

Validate Vertex AI configuration.

Source code in flowyml/stacks/gcp.py
def validate(self) -> bool:
    """Validate Vertex AI configuration."""
    if not self.project_id:
        raise ValueError("project_id is required for VertexAIOrchestrator")

    # Check if google-cloud-aiplatform is installed
    import importlib.util

    if importlib.util.find_spec("google.cloud.aiplatform") is not None:
        return True
    raise ImportError(
        "google-cloud-aiplatform is required for VertexAIOrchestrator. "
        "Install with: pip install google-cloud-aiplatform",
    )
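
validate() either returns True or raises (ValueError for a missing project_id, ImportError when google-cloud-aiplatform is not installed), so configuration checks typically wrap it; to_dict() gives a serializable summary. A short sketch, assuming the import path used in the example above:

from flowyml.stacks.gcp import VertexAIOrchestrator

orchestrator = VertexAIOrchestrator(project_id="my-gcp-project")
try:
    orchestrator.validate()
except (ValueError, ImportError) as exc:
    print(f"Orchestrator is not usable: {exc}")
else:
    print(orchestrator.to_dict())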