Orchestrators API 🎼

Orchestrators manage the execution of pipeline steps.

Base Executor

Base executor for running pipeline steps.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None) -> ExecutionResult

Execute a single step.

Parameters:

Name            Type            Description              Default
step                            Step to execute          required
inputs          dict[str, Any]  Input data for the step  required
context_params  dict[str, Any]  Parameters from context  required
cache_store     Any | None      Cache store for caching  None

Returns:

Type             Description
ExecutionResult  ExecutionResult with output or error

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
) -> ExecutionResult:
    """Execute a single step.

    Args:
        step: Step to execute
        inputs: Input data for the step
        context_params: Parameters from context
        cache_store: Cache store for caching

    Returns:
        ExecutionResult with output or error
    """
    raise NotImplementedError
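
Concrete executors subclass this base and implement both methods. Below is a minimal sketch of a custom executor, assuming Executor and ExecutionResult are importable from flowyml.core.executor and that ExecutionResult accepts the fields used in the sources on this page:

import time
from typing import Any

from flowyml.core.executor import ExecutionResult, Executor  # assumed import path


class InlineExecutor(Executor):
    """Hypothetical executor that runs each step inline, with no retry or caching."""

    def execute_step(
        self,
        step,
        inputs: dict[str, Any],
        context_params: dict[str, Any],
        cache_store: Any | None = None,
    ) -> ExecutionResult:
        start = time.time()
        try:
            # Merge step inputs with context parameters, as LocalExecutor does
            output = step.func(**{**inputs, **context_params})
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=output,
                duration_seconds=time.time() - start,
            )
        except Exception as e:
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=str(e),
                duration_seconds=time.time() - start,
            )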

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together.

Parameters:

Name            Type                   Description                                                   Default
step_group                             StepGroup to execute                                          required
inputs          dict[str, Any]         Input data available to the group                             required
context         Any | None             Context object for per-step parameter injection (preferred)  None
context_params  dict[str, Any] | None  Parameters from context (deprecated, use context instead)    None
cache_store     Any | None             Cache store for caching                                       None
artifact_store  Any | None             Artifact store for materialization                           None
run_id          str | None             Run identifier                                                None
project_name    str                    Project name                                                  'default'

Returns:

Type                   Description
list[ExecutionResult]  List of ExecutionResult (one per step)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Execute a group of steps together.

    Args:
        step_group: StepGroup to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step)
    """
    raise NotImplementedError
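
The context argument is the preferred way to supply parameters; context_params is kept only for backward compatibility. A hedged usage sketch on a concrete executor such as LocalExecutor (executor, group, data, and ctx are hypothetical placeholders):

# Preferred: pass the context object; parameters are injected per step.
results = executor.execute_step_group(step_group=group, inputs=data, context=ctx)

# Deprecated: a single flat parameter dict shared by every step in the group.
results = executor.execute_step_group(step_group=group, inputs=data, context_params={"learning_rate": 0.01})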

Local Executor

Bases: Executor

Local executor - runs steps in the current process.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> ExecutionResult

Execute step locally with retry, caching, and materialization.

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> ExecutionResult:
    """Execute step locally with retry, caching, and materialization."""
    start_time = time.time()
    retries = 0

    # Check condition
    if step.condition:
        try:
            # Inspect the condition's signature and pass only the
            # inputs/context params it actually accepts.
            sig = inspect.signature(step.condition)
            kwargs = {**inputs, **context_params}

            # Filter kwargs to only what condition accepts
            cond_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters}

            should_run = step.condition(**cond_kwargs)

            if not should_run:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=True,
                    output=None,  # Skipped steps produce None
                    duration_seconds=duration,
                    skipped=True,
                )
        except Exception as e:
            # If condition check fails, treat as step failure
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"Condition check failed: {str(e)}",
                duration_seconds=duration,
            )

    # Check cache
    if cache_store and step.cache:
        cache_key = step.get_cache_key(inputs)
        cached_result = cache_store.get(cache_key)

        if cached_result is not None:
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=cached_result,
                duration_seconds=duration,
                cached=True,
            )

    # Execute with retry
    max_retries = step.retry
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            # Prepare arguments
            kwargs = {**inputs, **context_params}

            # Execute step
            monitor_thread = None
            log_capture = None
            original_stdout = None
            original_stderr = None
            try:
                # Start monitoring thread with log capture if run_id is present
                if run_id:
                    import sys

                    log_capture = LogCapture()
                    original_stdout = sys.stdout
                    original_stderr = sys.stderr
                    sys.stdout = log_capture
                    sys.stderr = log_capture

                    monitor_thread = MonitorThread(
                        run_id=run_id,
                        step_name=step.name,
                        target_tid=threading.get_ident(),
                        log_capture=log_capture,
                    )
                    monitor_thread.start()

                result = step.func(**kwargs)
            except StopExecution:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=False,
                    error="Execution stopped by user",
                    duration_seconds=duration,
                    retries=retries,
                )
            finally:
                # Restore stdout/stderr if they were redirected
                if original_stdout or original_stderr:
                    import sys

                    if original_stdout:
                        sys.stdout = original_stdout
                    if original_stderr:
                        sys.stderr = original_stderr

                # Stop monitor thread (only if not already stopped in exception handler)
                if monitor_thread and not monitor_thread._stop_event.is_set():
                    monitor_thread.stop()
                    monitor_thread.join()

            # Materialize output if artifact store is available
            artifact_uri = None
            if artifact_store and result is not None and run_id:
                with contextlib.suppress(Exception):
                    artifact_uri = artifact_store.materialize(
                        obj=result,
                        name="output",  # Default name for single output
                        run_id=run_id,
                        step_name=step.name,
                        project_name=project_name,
                    )

            # Cache result
            if cache_store and step.cache:
                cache_key = step.get_cache_key(inputs)
                cache_store.set_value(
                    cache_key,
                    result,
                    step.name,
                    step.get_code_hash(),
                )

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=result,
                duration_seconds=duration,
                retries=retries,
                artifact_uri=artifact_uri,
            )

        except Exception as e:
            last_error = str(e)
            error_traceback = traceback.format_exc()
            retries += 1

            if attempt < max_retries:
                # Wait before retry (exponential backoff)
                wait_time = 2**attempt
                time.sleep(wait_time)
                continue

            # All retries exhausted - send error to logs
            if monitor_thread:
                monitor_thread.stop(error=f"{last_error}\n{error_traceback}")
                monitor_thread.join()
                monitor_thread = None  # Prevent double-stop in finally

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"{last_error}\n{error_traceback}",
                duration_seconds=duration,
                retries=retries,
            )

    # Should never reach here
    duration = time.time() - start_time
    return ExecutionResult(
        step_name=step.name,
        success=False,
        error=last_error,
        duration_seconds=duration,
        retries=retries,
    )
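
The ExecutionResult flags record how the step finished, and retries back off exponentially (with step.retry = 3, the waits between attempts are 1 s, 2 s, and 4 s). A sketch of inspecting a result, where my_step is a hypothetical step object:

executor = LocalExecutor()
result = executor.execute_step(step=my_step, inputs={"x": 1}, context_params={})

if result.skipped:        # the condition returned falsy; output is None
    ...
elif result.cached:       # served from cache_store without re-running the function
    ...
elif result.success:
    print(result.output, result.artifact_uri, result.retries)
else:
    print(result.error)   # includes the traceback once retries are exhausted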

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together in the same environment.

For local execution, steps execute sequentially but share the same process.

Parameters:

Name            Type                   Description                                                   Default
step_group                             StepGroup containing steps to execute                         required
inputs          dict[str, Any]         Input data available to the group                             required
context         Any | None             Context object for per-step parameter injection (preferred)  None
context_params  dict[str, Any] | None  Parameters from context (deprecated, use context instead)    None
cache_store     Any | None             Cache store for caching                                       None
artifact_store  Any | None             Artifact store for materialization                           None
run_id          str | None             Run identifier                                                None
project_name    str                    Project name                                                  'default'

Returns:

Type                   Description
list[ExecutionResult]  List of ExecutionResult (one per step in execution order)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup from step_grouping module
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Execute a group of steps together in the same environment.

    For local execution, steps execute sequentially but share the same process.

    Args:
        step_group: StepGroup containing steps to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step in execution order)
    """
    results: list[ExecutionResult] = []
    step_outputs = dict(inputs)  # Copy initial inputs

    # Execute steps in their defined order
    for step_name in step_group.execution_order:
        # Find the step object
        step = next(s for s in step_group.steps if s.name == step_name)

        # Prepare inputs for this step - map input names to function parameters
        step_inputs = {}

        # Get function signature to properly map inputs to parameters
        sig = inspect.signature(step.func)
        params = list(sig.parameters.values())
        # Filter out self/cls
        params = [p for p in params if p.name not in ("self", "cls")]
        assigned_params = set()

        if step.inputs:
            for i, input_name in enumerate(step.inputs):
                if input_name not in step_outputs:
                    continue

                val = step_outputs[input_name]

                # Check if input name matches a parameter directly
                param_match = next((p for p in params if p.name == input_name), None)

                if param_match:
                    step_inputs[param_match.name] = val
                    assigned_params.add(param_match.name)
                elif i < len(params):
                    # Positional fallback - use the parameter at the same position
                    target_param = params[i]
                    if target_param.name not in assigned_params:
                        step_inputs[target_param.name] = val
                        assigned_params.add(target_param.name)

        # Auto-map parameters from available outputs by name
        for param in params:
            if param.name in step_outputs and param.name not in step_inputs:
                step_inputs[param.name] = step_outputs[param.name]
                assigned_params.add(param.name)

        # Inject context parameters for this specific step
        if context is not None:
            # Use context object to inject params per step
            step_context_params = context.inject_params(step.func)
        elif context_params is not None:
            # Fallback to provided context_params (backward compatibility)
            step_context_params = context_params
        else:
            step_context_params = {}

        # Execute this step
        result = self.execute_step(
            step=step,
            inputs=step_inputs,
            context_params=step_context_params,
            cache_store=cache_store,
            artifact_store=artifact_store,
            run_id=run_id,
            project_name=project_name,
        )

        results.append(result)

        # If step failed, stop group execution
        if not result.success:
            # Mark remaining steps as skipped
            current_index = step_group.execution_order.index(step_name)
            remaining_steps = step_group.execution_order[current_index + 1 :]

            for remaining_name in remaining_steps:
                skip_result = ExecutionResult(
                    step_name=remaining_name,
                success=True,  # skipped steps are not counted as failures
                    error="Skipped due to earlier failure in group",
                    skipped=True,
                )
                results.append(skip_result)
            break

        # Store outputs for next steps in group
        if result.output is not None:
            if len(step.outputs) == 1:
                step_outputs[step.outputs[0]] = result.output
            elif isinstance(result.output, (list, tuple)) and len(result.output) == len(step.outputs):
                for name, val in zip(step.outputs, result.output, strict=False):
                    step_outputs[name] = val
            elif isinstance(result.output, dict):
                for name in step.outputs:
                    if name in result.output:
                        step_outputs[name] = result.output[name]
            else:
                if step.outputs:
                    step_outputs[step.outputs[0]] = result.output

    return results
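
The mapping at the end of the loop fans a step's return value out to its declared outputs by shape. Hypothetical step functions illustrating the three cases:

def split(data):
    # outputs=["train", "test"]: a matching-length tuple is unpacked positionally
    return data[:80], data[80:]


def fit(train):
    # outputs=["model"]: a single declared output stores the value as-is
    return {"weights": [0.1, 0.2]}


def evaluate(model, test):
    # outputs=["accuracy", "report"]: a dict return is mapped by key
    return {"accuracy": 0.9, "report": "ok"}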

Vertex AI Orchestrator

Bases: RemoteOrchestrator

Vertex AI orchestrator for running pipelines on Google Cloud.

This orchestrator submits pipeline jobs to Vertex AI Pipelines, allowing for scalable, managed execution in the cloud.

Example
from flowyml.stacks.gcp import VertexAIOrchestrator

orchestrator = VertexAIOrchestrator(
    project_id="my-gcp-project", region="us-central1", service_account="my-sa@my-project.iam.gserviceaccount.com"
)

Initialize Vertex AI orchestrator.

Parameters:

Name             Type        Description                               Default
name             str         Name of the orchestrator                  'vertex_ai'
project_id       str | None  GCP project ID                            None
region           str         GCP region for Vertex AI                  'us-central1'
service_account  str | None  Service account email for job execution   None
network          str | None  VPC network for jobs                      None
encryption_key   str | None  Customer-managed encryption key           None

Source code in flowyml/stacks/gcp.py
def __init__(
    self,
    name: str = "vertex_ai",
    project_id: str | None = None,
    region: str = "us-central1",
    service_account: str | None = None,
    network: str | None = None,
    encryption_key: str | None = None,
):
    """Initialize Vertex AI orchestrator.

    Args:
        name: Name of the orchestrator
        project_id: GCP project ID
        region: GCP region for Vertex AI
        service_account: Service account email for job execution
        network: VPC network for jobs
        encryption_key: Customer-managed encryption key
    """
    super().__init__(name)
    self.project_id = project_id
    self.region = region
    self.service_account = service_account
    self.network = network
    self.encryption_key = encryption_key

Functions

get_run_status(job_id: str) -> ExecutionStatus

Get status of a Vertex AI job.

Source code in flowyml/stacks/gcp.py
def get_run_status(self, job_id: str) -> "ExecutionStatus":
    """Get status of a Vertex AI job."""
    from google.cloud import aiplatform

    try:
        job = aiplatform.CustomJob(job_id)
        state = job.state.name

        # Map Vertex AI states to ExecutionStatus
        status_map = {
            "JOB_STATE_QUEUED": ExecutionStatus.PROVISIONING,
            "JOB_STATE_PENDING": ExecutionStatus.PROVISIONING,
            "JOB_STATE_RUNNING": ExecutionStatus.RUNNING,
            "JOB_STATE_SUCCEEDED": ExecutionStatus.COMPLETED,
            "JOB_STATE_FAILED": ExecutionStatus.FAILED,
            "JOB_STATE_CANCELLING": ExecutionStatus.STOPPING,
            "JOB_STATE_CANCELLED": ExecutionStatus.CANCELLED,
        }
        return status_map.get(state, ExecutionStatus.RUNNING)
    except Exception as e:
        print(f"Error fetching job status: {e}")
        return ExecutionStatus.FAILED
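
A hedged polling sketch built on get_run_status; job_id is the resource name returned by run_pipeline below:

import time

status = orchestrator.get_run_status(job_id)
while not status.is_finished:  # is_finished is used the same way by wait_for_completion
    time.sleep(15)
    status = orchestrator.get_run_status(job_id)

print(status)  # e.g. ExecutionStatus.COMPLETED or ExecutionStatus.FAILED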

run_pipeline(pipeline: Any, run_id: str, resources: ResourceConfig | None = None, docker_config: DockerConfig | None = None, inputs: dict[str, Any] | None = None, context: dict[str, Any] | None = None, **kwargs) -> SubmissionResult

Run pipeline on Vertex AI.

Parameters:

Name           Type                    Description             Default
pipeline       Any                     Pipeline to run         required
run_id         str                     Run identifier          required
resources      ResourceConfig | None   Resource configuration  None
docker_config  DockerConfig | None     Docker configuration    None
inputs         dict[str, Any] | None   Input data              None
context        dict[str, Any] | None   Context variables       None
**kwargs                               Additional arguments    {}

Returns:

Type              Description
SubmissionResult  SubmissionResult with job resource name

Source code in flowyml/stacks/gcp.py
def run_pipeline(
    self,
    pipeline: Any,
    run_id: str,
    resources: ResourceConfig | None = None,
    docker_config: DockerConfig | None = None,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    **kwargs,
) -> "SubmissionResult":
    """Run pipeline on Vertex AI.

    Args:
        pipeline: Pipeline to run
        run_id: Run identifier
        resources: Resource configuration
        docker_config: Docker configuration
        inputs: Input data
        context: Context variables
        **kwargs: Additional arguments

    Returns:
        SubmissionResult with job resource name
    """
    import time

    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=self.project_id, location=self.region)

    # Create custom job
    job_display_name = f"{pipeline.name}-{run_id[:8]}"

    # Build worker pool specs
    worker_pool_specs = self._build_worker_pool_specs(
        docker_config=docker_config,
        resources=resources,
    )

    # Create and run custom job
    job = aiplatform.CustomJob(
        display_name=job_display_name,
        worker_pool_specs=worker_pool_specs,
        service_account=self.service_account,
        network=self.network,
        encryption_spec_key_name=self.encryption_key,
    )

    # Submit job asynchronously
    job.submit()

    job_id = job.resource_name

    # Create wait function
    def wait_for_completion():
        """Poll job status until completion."""
        while True:
            status = self.get_run_status(job_id)
            if status.is_finished:
                if not status.is_successful:
                    raise RuntimeError(f"Vertex AI job {job_id} failed with status: {status}")
                break
            time.sleep(15)  # Poll every 15 seconds

    return SubmissionResult(
        job_id=job_id,
        wait_for_completion=wait_for_completion,
        metadata={
            "platform": "vertex_ai",
            "project": self.project_id,
            "region": self.region,
            "job_name": job_display_name,
        },
    )
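
Submission is asynchronous: run_pipeline returns as soon as the job is submitted, and the SubmissionResult carries both the job resource name and a blocking wait function. A usage sketch, assuming my_pipeline is an existing pipeline object:

submission = orchestrator.run_pipeline(pipeline=my_pipeline, run_id="a1b2c3d4e5f6")
print(submission.job_id)                # Vertex AI job resource name
print(submission.metadata["job_name"])  # "<pipeline.name>-a1b2c3d4"

submission.wait_for_completion()  # polls every 15 s; raises RuntimeError if the job fails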

stop_run(job_id: str, graceful: bool = True) -> None

Cancel a Vertex AI job.

Source code in flowyml/stacks/gcp.py
def stop_run(self, job_id: str, graceful: bool = True) -> None:
    """Cancel a Vertex AI job."""
    from google.cloud import aiplatform

    try:
        job = aiplatform.CustomJob(job_id)
        job.cancel()
    except Exception as e:
        print(f"Error cancelling job {job_id}: {e}")
        raise

to_dict() -> dict[str, Any]

Convert to dictionary.

Source code in flowyml/stacks/gcp.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary."""
    return {
        "name": self.name,
        "type": "vertex_ai",
        "project_id": self.project_id,
        "region": self.region,
        "service_account": self.service_account,
        "network": self.network,
    }

validate() -> bool

Validate Vertex AI configuration.

Source code in flowyml/stacks/gcp.py
def validate(self) -> bool:
    """Validate Vertex AI configuration."""
    if not self.project_id:
        raise ValueError("project_id is required for VertexAIOrchestrator")

    # Check if google-cloud-aiplatform is installed
    import importlib.util

    if importlib.util.find_spec("google.cloud.aiplatform") is not None:
        return True
    raise ImportError(
        "google-cloud-aiplatform is required for VertexAIOrchestrator. "
        "Install with: pip install google-cloud-aiplatform",
    )
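
Because validate raises rather than returning False, it is natural to call it defensively before submitting work. A brief sketch:

orchestrator = VertexAIOrchestrator(project_id="my-gcp-project")

try:
    orchestrator.validate()
except (ValueError, ImportError) as exc:
    raise SystemExit(f"Vertex AI stack is not ready: {exc}")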