Skip to content

Orchestrators API 🎼

Orchestrators manage the execution of pipeline steps.

Base Executor

Base executor for running pipeline steps.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default', all_outputs: dict[str, Any] | None = None) -> ExecutionResult

Execute a single step.

Parameters:

Name Type Description Default
step

Step to execute

required
inputs dict[str, Any]

Input data for the step

required
context_params dict[str, Any]

Parameters from context

required
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for logging results

None
run_id str | None

Unique ID for this pipeline run

None
project_name str

Name of the project

'default'
all_outputs dict[str, Any] | None

Collection of all step outputs for conditional evaluation

None

Returns:

Type Description
ExecutionResult

ExecutionResult with output or error

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
    all_outputs: dict[str, Any] | None = None,
) -> ExecutionResult:
    """Execute a single step.

    Args:
        step: Step to execute
        inputs: Input data for the step
        context_params: Parameters from context
        cache_store: Cache store for caching
        artifact_store: Artifact store for logging results
        run_id: Unique ID for this pipeline run
        project_name: Name of the project
        all_outputs: Collection of all step outputs for conditional evaluation

    Returns:
        ExecutionResult with output or error
    """
    raise NotImplementedError

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together.

Parameters:

Name Type Description Default
step_group

StepGroup to execute

required
inputs dict[str, Any]

Input data available to the group

required
context Any | None

Context object for per-step parameter injection (preferred)

None
context_params dict[str, Any] | None

Parameters from context (deprecated, use context instead)

None
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for materialization

None
run_id str | None

Run identifier

None
project_name str

Project name

'default'

Returns:

Type Description
list[ExecutionResult]

List of ExecutionResult (one per step)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Execute a group of steps together.

    Args:
        step_group: StepGroup to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step)
    """
    raise NotImplementedError

Local Executor

Bases: Executor

Local executor - runs steps in the current process.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default', all_outputs: dict[str, Any] | None = None) -> ExecutionResult

Execute step locally with retry, caching, and materialization.

Parameters:

Name Type Description Default
step

Step to execute

required
inputs dict[str, Any]

Input data for the step

required
context_params dict[str, Any]

Parameters from context

required
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for logging results

None
run_id str | None

Unique ID for this pipeline run

None
project_name str

Name of the project

'default'
all_outputs dict[str, Any] | None

Collection of all step outputs for conditional evaluation

None

Returns:

Type Description
ExecutionResult

ExecutionResult with output or error

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
    all_outputs: dict[str, Any] | None = None,
) -> ExecutionResult:
    """Execute step locally with retry, caching, and materialization.

    Args:
        step: Step to execute
        inputs: Input data for the step
        context_params: Parameters from context
        cache_store: Cache store for caching
        artifact_store: Artifact store for logging results
        run_id: Unique ID for this pipeline run
        project_name: Name of the project
        all_outputs: Collection of all step outputs for conditional evaluation

    Returns:
        ExecutionResult with output or error
    """
    start_time = time.time()
    retries = 0

    # Check condition
    if step.condition:
        try:
            # Prepare kwargs for condition: inputs + context_params + all_outputs
            sig = inspect.signature(step.condition)
            kwargs = {**context_params}

            # Add all outputs so far (paths like 'data/processed')
            if all_outputs:
                kwargs.update(all_outputs)
                # Also flatten dict outputs to allow access to keys like 'quality_score'
                for val in all_outputs.values():
                    if isinstance(val, dict):
                        kwargs.update({k: v for k, v in val.items() if k not in kwargs})

            # Add direct inputs (might override all_outputs if paths match)
            kwargs.update(inputs)

            # Filter kwargs to only what condition accepts
            cond_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters}

            should_run = step.condition(**cond_kwargs)

            if not should_run:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=True,
                    output=None,  # Skipped steps produce None
                    duration_seconds=duration,
                    skipped=True,
                )
        except Exception as e:
            # If condition check fails, treat as step failure
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"Condition check failed: {str(e)}",
                duration_seconds=duration,
            )

    # Check cache
    if cache_store and step.cache:
        cache_key = step.get_cache_key(inputs)
        cached_result = cache_store.get(cache_key)

        if cached_result is not None:
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=cached_result,
                duration_seconds=duration,
                cached=True,
            )

    # Execute with retry
    max_retries = step.retry
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            # Prepare arguments
            kwargs = {**inputs, **context_params}

            # Execute step
            monitor_thread = None
            log_capture = None
            original_stdout = None
            original_stderr = None
            try:
                # Start monitoring thread with log capture if run_id is present
                if run_id:
                    import sys

                    log_capture = LogCapture()
                    original_stdout = sys.stdout
                    original_stderr = sys.stderr
                    sys.stdout = log_capture
                    sys.stderr = log_capture

                    monitor_thread = MonitorThread(
                        run_id=run_id,
                        step_name=step.name,
                        target_tid=threading.get_ident(),
                        log_capture=log_capture,
                    )
                    monitor_thread.start()

                # Filter kwargs to only what the function accepts
                func_sig = inspect.signature(step.func)
                # Handle *args/**kwargs if needed, but for now strict matching is safer for steps
                filtered_kwargs = {k: v for k, v in kwargs.items() if k in func_sig.parameters}

                result = step.func(**filtered_kwargs)
            except StopExecution:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=False,
                    error="Execution stopped by user",
                    duration_seconds=duration,
                    retries=retries,
                )
            finally:
                # Restore stdout/stderr
                if original_stdout:
                    import sys

                    sys.stdout = original_stdout
                if original_stderr:
                    import sys

                    sys.stderr = original_stderr

                # Stop monitor thread (only if not already stopped in exception handler)
                if monitor_thread and not monitor_thread._stop_event.is_set():
                    monitor_thread.stop()
                    monitor_thread.join()

            # Materialize output if artifact store is available
            # Only upload if the result is an Asset with upload=True
            artifact_uri = None
            if artifact_store and result is not None and run_id:
                # Check if result is an Asset and respects upload flag
                should_upload = True
                try:
                    from flowyml.assets.base import Asset

                    if isinstance(result, Asset):
                        should_upload = getattr(result, "upload", False)
                except ImportError:
                    pass

                if should_upload:
                    with contextlib.suppress(Exception):
                        artifact_uri = artifact_store.materialize(
                            obj=result,
                            name="output",  # Default name for single output
                            run_id=run_id,
                            step_name=step.name,
                            project_name=project_name,
                        )

            # Type-based artifact routing
            routing_result = None
            try:
                from flowyml.core.routing import route_artifact, should_route

                if should_route(result):
                    # Get return type annotation if available
                    return_type = None
                    try:
                        from flowyml.core.routing import get_step_return_type

                        return_type = get_step_return_type(step.func)
                    except Exception:
                        pass

                    routing_result = route_artifact(
                        output=result,
                        step_name=step.name,
                        run_id=run_id or "local",
                        return_type=return_type,
                        project_name=project_name,
                    )
                    if routing_result and routing_result.store_uri:
                        artifact_uri = routing_result.store_uri
            except ImportError:
                pass  # Routing module not available
            except Exception:
                pass  # Routing failed, continue with normal flow

            # Cache result
            if cache_store and step.cache:
                cache_key = step.get_cache_key(inputs)
                cache_store.set_value(
                    cache_key,
                    result,
                    step.name,
                    step.get_code_hash(),
                )

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=result,
                duration_seconds=duration,
                retries=retries,
                artifact_uri=artifact_uri,
            )

        except Exception as e:
            last_error = str(e)
            error_traceback = traceback.format_exc()
            retries += 1

            if attempt < max_retries:
                # Wait before retry (exponential backoff)
                wait_time = 2**attempt
                time.sleep(wait_time)
                continue

            # All retries exhausted - send error to logs
            if monitor_thread:
                monitor_thread.stop(error=f"{last_error}\n{error_traceback}")
                monitor_thread.join()
                monitor_thread = None  # Prevent double-stop in finally

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"{last_error}\n{error_traceback}",
                duration_seconds=duration,
                retries=retries,
            )

    # Should never reach here
    duration = time.time() - start_time
    return ExecutionResult(
        step_name=step.name,
        success=False,
        error=last_error,
        duration_seconds=duration,
        retries=retries,
    )

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together in the same environment.

For local execution, steps execute sequentially but share the same process.

Parameters:

Name Type Description Default
step_group

StepGroup containing steps to execute

required
inputs dict[str, Any]

Input data available to the group

required
context Any | None

Context object for per-step parameter injection (preferred)

None
context_params dict[str, Any] | None

Parameters from context (deprecated, use context instead)

None
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for materialization

None
run_id str | None

Run identifier

None
project_name str

Project name

'default'

Returns:

Type Description
list[ExecutionResult]

List of ExecutionResult (one per step in execution order)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup from step_grouping module
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Execute a group of steps together in the same environment.

    For local execution, steps execute sequentially but share the same process.

    Args:
        step_group: StepGroup containing steps to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step in execution order)
    """
    results: list[ExecutionResult] = []
    step_outputs = dict(inputs)  # Copy initial inputs

    # Execute steps in their defined order
    for step_name in step_group.execution_order:
        # Find the step object
        step = next(s for s in step_group.steps if s.name == step_name)

        # Prepare inputs for this step - map input names to function parameters
        step_inputs = {}

        # Get function signature to properly map inputs to parameters
        sig = inspect.signature(step.func)
        params = list(sig.parameters.values())
        # Filter out self/cls
        params = [p for p in params if p.name not in ("self", "cls")]
        assigned_params = set()

        if step.inputs:
            for i, input_name in enumerate(step.inputs):
                if input_name not in step_outputs:
                    continue

                val = step_outputs[input_name]

                # Check if input name matches a parameter directly
                param_match = next((p for p in params if p.name == input_name), None)

                if param_match:
                    step_inputs[param_match.name] = val
                    assigned_params.add(param_match.name)
                elif i < len(params):
                    # Positional fallback - use the parameter at the same position
                    target_param = params[i]
                    if target_param.name not in assigned_params:
                        step_inputs[target_param.name] = val
                        assigned_params.add(target_param.name)

        # Auto-map parameters from available outputs by name
        for param in params:
            if param.name in step_outputs and param.name not in step_inputs:
                step_inputs[param.name] = step_outputs[param.name]
                assigned_params.add(param.name)

        # Inject context parameters for this specific step
        if context is not None:
            # Use context object to inject params per step
            step_context_params = context.inject_params(step.func)
        elif context_params is not None:
            # Fallback to provided context_params (backward compatibility)
            step_context_params = context_params
        else:
            step_context_params = {}

        # Execute this step
        result = self.execute_step(
            step=step,
            inputs=step_inputs,
            context_params=step_context_params,
            cache_store=cache_store,
            artifact_store=artifact_store,
            run_id=run_id,
            project_name=project_name,
        )

        results.append(result)

        # If step failed, stop group execution
        if not result.success:
            # Mark remaining steps as skipped
            current_index = step_group.execution_order.index(step_name)
            remaining_steps = step_group.execution_order[current_index + 1 :]

            for remaining_name in remaining_steps:
                skip_result = ExecutionResult(
                    step_name=remaining_name,
                    success=True,  # Set to True since skipped steps technically don't fail
                    error="Skipped due to earlier failure in group",
                    skipped=True,
                )
                results.append(skip_result)
            break

        # Store outputs for next steps in group
        if result.output is not None:
            if len(step.outputs) == 1:
                step_outputs[step.outputs[0]] = result.output
            elif isinstance(result.output, (list, tuple)) and len(result.output) == len(step.outputs):
                for name, val in zip(step.outputs, result.output, strict=False):
                    step_outputs[name] = val
            elif isinstance(result.output, dict):
                for name in step.outputs:
                    if name in result.output:
                        step_outputs[name] = result.output[name]
            else:
                if step.outputs:
                    step_outputs[step.outputs[0]] = result.output

    return results

Vertex AI Orchestrator

Bases: RemoteOrchestrator

Vertex AI orchestrator for running pipelines on Google Cloud.

This orchestrator submits pipeline jobs to Vertex AI Pipelines, allowing for scalable, managed execution in the cloud.

Example
from flowyml.stacks.gcp import VertexAIOrchestrator

orchestrator = VertexAIOrchestrator(
    project_id="my-gcp-project", region="us-central1", service_account="my-sa@my-project.iam.gserviceaccount.com"
)

Initialize Vertex AI orchestrator.

Parameters:

Name Type Description Default
name str

Name of the orchestrator

'vertex_ai'
project_id str | None

GCP project ID

None
region str

GCP region for Vertex AI

'europe-west1'
service_account str | None

Service account email for job execution

None
network str | None

VPC network for jobs

None
encryption_key str | None

Customer-managed encryption key

None
Source code in flowyml/stacks/gcp.py
def __init__(
    self,
    name: str = "vertex_ai",
    project_id: str | None = None,
    region: str = "europe-west1",
    service_account: str | None = None,
    network: str | None = None,
    encryption_key: str | None = None,
):
    """Initialize Vertex AI orchestrator.

    Args:
        name: Name of the orchestrator
        project_id: GCP project ID
        region: GCP region for Vertex AI
        service_account: Service account email for job execution
        network: VPC network for jobs
        encryption_key: Customer-managed encryption key
    """
    super().__init__(name)
    self.project_id = project_id
    self.region = region
    self.service_account = service_account
    self.network = network
    self.encryption_key = encryption_key

Functions

get_run_logs(job_id: str) -> str

Get logs for a Vertex AI job.

Parameters:

Name Type Description Default
job_id str

The job resource name.

required

Returns:

Type Description
str

String containing the logs.

Source code in flowyml/stacks/gcp.py
def get_run_logs(self, job_id: str) -> str:
    """Get logs for a Vertex AI job.

    Args:
        job_id: The job resource name.

    Returns:
        String containing the logs.
    """
    try:
        from google.cloud import logging

        client = logging.Client(project=self.project_id)
        job_name = job_id.split("/")[-1]

        # Filter logs for this job
        # Note: This is a simplified filter; exact filter depends on Vertex AI logging format
        filter_str = f'resource.type="ml_job" AND conversion_id="{job_name}"'

        entries = client.list_entries(filter_=filter_str, order_by=logging.DESCENDING, max_results=100)
        logs = []
        for entry in entries:
            if entry.payload:
                logs.append(str(entry.payload))

        return "\n".join(reversed(logs)) if logs else "No logs found."

    except Exception as e:
        return f"Failed to fetch logs: {e}"

get_run_status(job_id: str) -> ExecutionStatus

Get status of a Vertex AI job.

Source code in flowyml/stacks/gcp.py
def get_run_status(self, job_id: str) -> "ExecutionStatus":
    """Get status of a Vertex AI job."""
    from google.cloud import aiplatform

    try:
        job = aiplatform.CustomJob(job_id)
        state = job.state.name

        # Map Vertex AI states to ExecutionStatus
        status_map = {
            "JOB_STATE_QUEUED": ExecutionStatus.PROVISIONING,
            "JOB_STATE_PENDING": ExecutionStatus.PROVISIONING,
            "JOB_STATE_RUNNING": ExecutionStatus.RUNNING,
            "JOB_STATE_SUCCEEDED": ExecutionStatus.COMPLETED,
            "JOB_STATE_FAILED": ExecutionStatus.FAILED,
            "JOB_STATE_CANCELLING": ExecutionStatus.STOPPING,
            "JOB_STATE_CANCELLED": ExecutionStatus.CANCELLED,
        }
        return status_map.get(state, ExecutionStatus.RUNNING)
    except Exception as e:
        print(f"Error fetching job status: {e}")
        return ExecutionStatus.FAILED

run_pipeline(pipeline: Any, run_id: str, resources: ResourceConfig | None = None, docker_config: DockerConfig | None = None, inputs: dict[str, Any] | None = None, context: dict[str, Any] | None = None, **kwargs) -> SubmissionResult

Run pipeline on Vertex AI.

Parameters:

Name Type Description Default
pipeline Any

Pipeline to run

required
run_id str

Run identifier

required
resources ResourceConfig | None

Resource configuration

None
docker_config DockerConfig | None

Docker configuration

None
inputs dict[str, Any] | None

Input data

None
context dict[str, Any] | None

Context variables

None
**kwargs

Additional arguments

{}

Returns:

Type Description
SubmissionResult

SubmissionResult with job resource name

Source code in flowyml/stacks/gcp.py
def run_pipeline(
    self,
    pipeline: Any,
    run_id: str,
    resources: ResourceConfig | None = None,
    docker_config: DockerConfig | None = None,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    **kwargs,
) -> "SubmissionResult":
    """Run pipeline on Vertex AI.

    Args:
        pipeline: Pipeline to run
        run_id: Run identifier
        resources: Resource configuration
        docker_config: Docker configuration
        inputs: Input data
        context: Context variables
        **kwargs: Additional arguments

    Returns:
        SubmissionResult with job resource name
    """
    from google.cloud import aiplatform
    import time

    # Initialize Vertex AI
    aiplatform.init(project=self.project_id, location=self.region)

    # Create custom job
    job_display_name = f"{pipeline.name}-{run_id[:8]}"

    # Build worker pool specs
    worker_pool_specs = self._build_worker_pool_specs(
        docker_config=docker_config,
        resources=resources,
    )

    # Create and run custom job
    job = aiplatform.CustomJob(
        display_name=job_display_name,
        worker_pool_specs=worker_pool_specs,
        service_account=self.service_account,
        network=self.network,
        encryption_spec_key_name=self.encryption_key,
    )

    # Submit job asynchronously
    job.submit()

    job_id = job.resource_name

    # Create wait function
    def wait_for_completion():
        """Poll job status until completion."""
        while True:
            status = self.get_run_status(job_id)
            if status.is_finished:
                if not status.is_successful:
                    raise RuntimeError(f"Vertex AI job {job_id} failed with status: {status}")
                break
            time.sleep(15)  # Poll every 15 seconds

    return SubmissionResult(
        job_id=job_id,
        wait_for_completion=wait_for_completion,
        metadata={
            "platform": "vertex_ai",
            "project": self.project_id,
            "region": self.region,
            "job_name": job_display_name,
        },
    )

stop_run(job_id: str, graceful: bool = True) -> None

Cancel a Vertex AI job.

Source code in flowyml/stacks/gcp.py
def stop_run(self, job_id: str, graceful: bool = True) -> None:
    """Cancel a Vertex AI job."""
    from google.cloud import aiplatform

    try:
        job = aiplatform.CustomJob(job_id)
        job.cancel()
    except Exception as e:
        print(f"Error cancelling job {job_id}: {e}")
        raise

to_dict() -> dict[str, Any]

Convert to dictionary.

Source code in flowyml/stacks/gcp.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary."""
    return {
        "name": self.name,
        "type": "vertex_ai",
        "project_id": self.project_id,
        "region": self.region,
        "service_account": self.service_account,
        "network": self.network,
    }

validate() -> bool

Validate Vertex AI configuration.

Source code in flowyml/stacks/gcp.py
def validate(self) -> bool:
    """Validate Vertex AI configuration."""
    if not self.project_id:
        raise ValueError("project_id is required for VertexAIOrchestrator")

    # Check if google-cloud-aiplatform is installed
    import importlib.util

    if importlib.util.find_spec("google.cloud.aiplatform") is not None:
        return True
    raise ImportError(
        "google-cloud-aiplatform is required for VertexAIOrchestrator. "
        "Install with: pip install google-cloud-aiplatform",
    )