Skip to content

Orchestrators API 🎼

Orchestrators form the execution layer of FlowyML. They decide where and how each step runs — locally in a subprocess, inside a Docker container, or remotely on managed infrastructure such as Vertex AI, SageMaker, or Kubernetes. Every orchestrator implements the Executor base class, ensuring a uniform interface for step dispatch, resource allocation, and failure handling regardless of the target environment.

Orchestrators manage the execution of pipeline steps.

Base Executor

Base executor for running pipeline steps.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default', all_outputs: dict[str, Any] | None = None) -> ExecutionResult

Execute a single step.

Parameters:

Name Type Description Default
step

Step to execute

required
inputs dict[str, Any]

Input data for the step

required
context_params dict[str, Any]

Parameters from context

required
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for logging results

None
run_id str | None

Unique ID for this pipeline run

None
project_name str

Name of the project

'default'
all_outputs dict[str, Any] | None

Collection of all step outputs for conditional evaluation

None

Returns:

Type Description
ExecutionResult

ExecutionResult with output or error

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
    all_outputs: dict[str, Any] | None = None,
) -> ExecutionResult:
    """Run one pipeline step and report its outcome.

    This base implementation only declares the interface: it performs no
    work and always raises ``NotImplementedError``.

    Args:
        step: Step to execute
        inputs: Input data for the step
        context_params: Parameters from context
        cache_store: Cache store for caching
        artifact_store: Artifact store for logging results
        run_id: Unique ID for this pipeline run
        project_name: Name of the project
        all_outputs: Collection of all step outputs for conditional evaluation

    Returns:
        ExecutionResult with output or error

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError()

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together.

Parameters:

Name Type Description Default
step_group

StepGroup to execute

required
inputs dict[str, Any]

Input data available to the group

required
context Any | None

Context object for per-step parameter injection (preferred)

None
context_params dict[str, Any] | None

Parameters from context (deprecated, use context instead)

None
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for materialization

None
run_id str | None

Run identifier

None
project_name str

Project name

'default'

Returns:

Type Description
list[ExecutionResult]

List of ExecutionResult (one per step)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Run every step of a group as one unit and report each outcome.

    This base implementation only declares the interface: it performs no
    work and always raises ``NotImplementedError``.

    Args:
        step_group: StepGroup to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step)

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError()

Local Executor

Bases: Executor

Local executor - runs steps in the current process.

Functions

execute_step(step, inputs: dict[str, Any], context_params: dict[str, Any], cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default', all_outputs: dict[str, Any] | None = None) -> ExecutionResult

Execute step locally with retry, caching, and materialization.

Parameters:

Name Type Description Default
step

Step to execute

required
inputs dict[str, Any]

Input data for the step

required
context_params dict[str, Any]

Parameters from context

required
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for logging results

None
run_id str | None

Unique ID for this pipeline run

None
project_name str

Name of the project

'default'
all_outputs dict[str, Any] | None

Collection of all step outputs for conditional evaluation

None

Returns:

Type Description
ExecutionResult

ExecutionResult with output or error

Source code in flowyml/core/executor.py
def execute_step(
    self,
    step,
    inputs: dict[str, Any],
    context_params: dict[str, Any],
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
    all_outputs: dict[str, Any] | None = None,
) -> ExecutionResult:
    """Execute step locally with retry, caching, and materialization.

    Processing order:

    1. If the step has a condition, evaluate it; a falsy result yields a
       skipped (successful) result, an exception yields a failed result.
    2. If caching is enabled and the cache holds a non-``None`` value for the
       step's cache key, return it as a cached result.
    3. Call ``step.func`` up to ``step.retry + 1`` times, sleeping
       ``2**attempt`` seconds between attempts. While a ``run_id`` is given,
       stdout/stderr are redirected into a ``LogCapture`` and a
       ``MonitorThread`` is running for the duration of the call.
    4. On success, best-effort materialize / route / catalog the output
       (failures there are logged at debug level and never fail the step),
       then store the output in the cache.

    Args:
        step: Step to execute
        inputs: Input data for the step
        context_params: Parameters from context
        cache_store: Cache store for caching
        artifact_store: Artifact store for logging results
        run_id: Unique ID for this pipeline run
        project_name: Name of the project
        all_outputs: Collection of all step outputs for conditional evaluation

    Returns:
        ExecutionResult with output or error
    """
    import logging
    import sys

    logger = logging.getLogger(__name__)
    start_time = time.time()
    retries = 0

    # Check condition
    if step.condition:
        try:
            # Prepare kwargs for condition: inputs + context_params + all_outputs
            sig = inspect.signature(step.condition)
            kwargs = {**context_params}

            # Add all outputs so far (paths like 'data/processed')
            if all_outputs:
                kwargs.update(all_outputs)
                # Also flatten dict outputs to allow access to keys like 'quality_score'
                for val in all_outputs.values():
                    if isinstance(val, dict):
                        kwargs.update({k: v for k, v in val.items() if k not in kwargs})

            # Add direct inputs (might override all_outputs if paths match)
            kwargs.update(inputs)

            # Filter kwargs to only what condition accepts
            cond_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters}

            should_run = step.condition(**cond_kwargs)

            if not should_run:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=True,
                    output=None,  # Skipped steps produce None
                    duration_seconds=duration,
                    skipped=True,
                )
        except Exception as e:
            # If condition check fails, treat as step failure
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"Condition check failed: {str(e)}",
                duration_seconds=duration,
            )

    # Check cache
    if cache_store and step.cache:
        cache_key = step.get_cache_key(inputs, context_params=context_params)
        cached_result = cache_store.get(cache_key)

        if cached_result is not None:
            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=cached_result,
                duration_seconds=duration,
                cached=True,
            )

    # Execute with retry
    max_retries = step.retry
    last_error = None

    for attempt in range(max_retries + 1):
        # Per-attempt state lives outside the try so the outer handler can
        # always read it, even when argument preparation itself raises.
        monitor_thread = None
        failure_report = None
        try:
            # Prepare arguments
            kwargs = {**inputs, **context_params}

            # Execute step
            log_capture = None
            original_stdout = None
            original_stderr = None
            streams_redirected = False
            try:
                # Start monitoring thread with log capture if run_id is present
                if run_id:
                    log_capture = LogCapture()
                    original_stdout = sys.stdout
                    original_stderr = sys.stderr
                    # Flag first: restoring the saved streams is harmless even
                    # if the swap below never completes.
                    streams_redirected = True
                    sys.stdout = log_capture
                    sys.stderr = log_capture

                    monitor_thread = MonitorThread(
                        run_id=run_id,
                        step_name=step.name,
                        target_tid=threading.get_ident(),
                        log_capture=log_capture,
                    )
                    monitor_thread.start()

                # Filter kwargs to only what the function accepts
                func_sig = inspect.signature(step.func)
                # Handle *args/**kwargs if needed, but for now strict matching is safer for steps
                filtered_kwargs = {k: v for k, v in kwargs.items() if k in func_sig.parameters}

                result = step.func(**filtered_kwargs)
            except StopExecution:
                duration = time.time() - start_time
                return ExecutionResult(
                    step_name=step.name,
                    success=False,
                    error="Execution stopped by user",
                    duration_seconds=duration,
                    retries=retries,
                )
            except Exception as exc:
                # The ``finally`` below runs BEFORE the outer ``except``, so the
                # error text for the monitor has to be captured here. Only the
                # final attempt is reported as an error; earlier attempts are
                # retried and their monitor is stopped normally.
                if attempt >= max_retries:
                    failure_report = f"{exc}\n{traceback.format_exc()}"
                raise
            finally:
                # Restore stdout/stderr whenever they were swapped, regardless
                # of what the saved stream objects evaluate to.
                if streams_redirected:
                    sys.stdout = original_stdout
                    sys.stderr = original_stderr

                # Stop the monitor exactly once, forwarding the failure (if any)
                if monitor_thread and not monitor_thread._stop_event.is_set():
                    if failure_report is not None:
                        monitor_thread.stop(error=failure_report)
                    else:
                        monitor_thread.stop()
                    monitor_thread.join()

            # Materialize output if artifact store is available
            # Only upload if the result is an Asset with upload=True
            artifact_uri = None
            if artifact_store and result is not None and run_id:
                # Check if result is an Asset and respects upload flag
                should_upload = True
                try:
                    from flowyml.assets.base import Asset

                    if isinstance(result, Asset):
                        should_upload = getattr(result, "upload", False)
                except ImportError:
                    pass

                if should_upload:
                    try:
                        artifact_uri = artifact_store.materialize(
                            obj=result,
                            name="output",  # Default name for single output
                            run_id=run_id,
                            step_name=step.name,
                            project_name=project_name,
                        )
                    except Exception:
                        # Best effort: a failed upload must not fail the step
                        logger.debug(
                            "Artifact materialization failed for step '%s'",
                            step.name,
                            exc_info=True,
                        )

            # Type-based artifact routing
            routing_result = None
            try:
                from flowyml.core.routing import route_artifact, should_route

                if should_route(result):
                    # Get return type annotation if available
                    return_type = None
                    try:
                        from flowyml.core.routing import get_step_return_type

                        return_type = get_step_return_type(step.func)
                    except Exception:
                        pass

                    routing_result = route_artifact(
                        output=result,
                        step_name=step.name,
                        run_id=run_id or "local",
                        return_type=return_type,
                        project_name=project_name,
                    )
                    if routing_result and routing_result.store_uri:
                        artifact_uri = routing_result.store_uri
            except ImportError:
                pass  # Routing module not available
            except Exception:
                # Routing failed, continue with normal flow
                logger.debug("Artifact routing failed for step '%s'", step.name, exc_info=True)

            # Auto-register in artifact catalog for lineage tracking
            try:
                from flowyml.storage.catalog.manager import ArtifactCatalog

                catalog = ArtifactCatalog()
                catalog.register(
                    name=f"{step.name}_output",
                    artifact_type=type(result).__name__,
                    data=result,
                    source_step=step.name,
                    source_pipeline=project_name,
                    source_run_id=run_id or "local",
                )
            except ImportError:
                pass  # Catalog not available
            except Exception:
                # Catalog registration failed, don't block execution
                logger.debug("Catalog registration failed for step '%s'", step.name, exc_info=True)

            # Cache result
            if cache_store and step.cache:
                cache_key = step.get_cache_key(inputs, context_params=context_params)
                cache_store.set_value(
                    cache_key,
                    result,
                    step.name,
                    step.get_code_hash(),
                )

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=True,
                output=result,
                duration_seconds=duration,
                retries=retries,
                artifact_uri=artifact_uri,
            )

        except Exception as e:
            last_error = str(e)
            error_traceback = traceback.format_exc()
            retries += 1

            if attempt < max_retries:
                # Wait before retry (exponential backoff)
                wait_time = 2**attempt
                time.sleep(wait_time)
                continue

            # All retries exhausted. A failure of the step call itself was
            # already forwarded to the monitor in the inner ``finally``; this
            # branch only covers failures raised after the step returned
            # (e.g. while caching), where the monitor was stopped without one.
            if monitor_thread and failure_report is None:
                monitor_thread.stop(error=f"{last_error}\n{error_traceback}")
                monitor_thread.join()

            duration = time.time() - start_time
            return ExecutionResult(
                step_name=step.name,
                success=False,
                error=f"{last_error}\n{error_traceback}",
                duration_seconds=duration,
                retries=retries,
            )

    # Only reachable when the retry range is empty (negative ``step.retry``)
    duration = time.time() - start_time
    return ExecutionResult(
        step_name=step.name,
        success=False,
        error=last_error,
        duration_seconds=duration,
        retries=retries,
    )

execute_step_group(step_group, inputs: dict[str, Any], context: Any | None = None, context_params: dict[str, Any] | None = None, cache_store: Any | None = None, artifact_store: Any | None = None, run_id: str | None = None, project_name: str = 'default') -> list[ExecutionResult]

Execute a group of steps together in the same environment.

For local execution, steps execute sequentially but share the same process.

Parameters:

Name Type Description Default
step_group

StepGroup containing steps to execute

required
inputs dict[str, Any]

Input data available to the group

required
context Any | None

Context object for per-step parameter injection (preferred)

None
context_params dict[str, Any] | None

Parameters from context (deprecated, use context instead)

None
cache_store Any | None

Cache store for caching

None
artifact_store Any | None

Artifact store for materialization

None
run_id str | None

Run identifier

None
project_name str

Project name

'default'

Returns:

Type Description
list[ExecutionResult]

List of ExecutionResult (one per step in execution order)

Source code in flowyml/core/executor.py
def execute_step_group(
    self,
    step_group,  # StepGroup from step_grouping module
    inputs: dict[str, Any],
    context: Any | None = None,  # Context object for per-step injection
    context_params: dict[str, Any] | None = None,  # Deprecated: use context instead
    cache_store: Any | None = None,
    artifact_store: Any | None = None,
    run_id: str | None = None,
    project_name: str = "default",
) -> list[ExecutionResult]:
    """Execute a group of steps together in the same environment.

    For local execution, steps execute sequentially but share the same process.
    Each step's output is stored under its declared output name(s) and becomes
    available as an input to the steps that follow. When a step fails, the
    group stops and every remaining step is reported as skipped.

    Args:
        step_group: StepGroup containing steps to execute
        inputs: Input data available to the group
        context: Context object for per-step parameter injection (preferred)
        context_params: Parameters from context (deprecated, use context instead)
        cache_store: Cache store for caching
        artifact_store: Artifact store for materialization
        run_id: Run identifier
        project_name: Project name

    Returns:
        List of ExecutionResult (one per step in execution order)

    Raises:
        ValueError: If the execution order names a step that is not part of
            the group.
    """
    results: list[ExecutionResult] = []
    step_outputs = dict(inputs)  # Copy initial inputs

    # Index the steps once instead of scanning the list for every name.
    # ``setdefault`` keeps the first step when a name is duplicated.
    steps_by_name: dict[str, Any] = {}
    for candidate in step_group.steps:
        steps_by_name.setdefault(candidate.name, candidate)

    execution_order = list(step_group.execution_order)

    # Execute steps in their defined order
    for position, step_name in enumerate(execution_order):
        step = steps_by_name.get(step_name)
        if step is None:
            # Fail with a descriptive error rather than a bare StopIteration
            raise ValueError(
                f"Step '{step_name}' is listed in the execution order but is not part of the step group",
            )

        # Prepare inputs for this step - map input names to function parameters
        step_inputs = {}

        # Get function signature to properly map inputs to parameters
        sig = inspect.signature(step.func)
        # Filter out self/cls
        params = [p for p in sig.parameters.values() if p.name not in ("self", "cls")]
        params_by_name = {p.name: p for p in params}
        assigned_params = set()

        if step.inputs:
            for i, input_name in enumerate(step.inputs):
                if input_name not in step_outputs:
                    continue

                val = step_outputs[input_name]

                # Check if input name matches a parameter directly
                param_match = params_by_name.get(input_name)

                if param_match:
                    step_inputs[param_match.name] = val
                    assigned_params.add(param_match.name)
                elif i < len(params):
                    # Positional fallback - use the parameter at the same position
                    target_param = params[i]
                    if target_param.name not in assigned_params:
                        step_inputs[target_param.name] = val
                        assigned_params.add(target_param.name)

        # Auto-map parameters from available outputs by name
        for param in params:
            if param.name in step_outputs and param.name not in step_inputs:
                step_inputs[param.name] = step_outputs[param.name]
                assigned_params.add(param.name)

        # Inject context parameters for this specific step
        if context is not None:
            # Use context object to inject params per step
            step_context_params = context.inject_params(step.func)
        elif context_params is not None:
            # Fallback to provided context_params (backward compatibility)
            step_context_params = context_params
        else:
            step_context_params = {}

        # Execute this step
        result = self.execute_step(
            step=step,
            inputs=step_inputs,
            context_params=step_context_params,
            cache_store=cache_store,
            artifact_store=artifact_store,
            run_id=run_id,
            project_name=project_name,
        )

        results.append(result)

        # If step failed, stop group execution
        if not result.success:
            # Mark remaining steps as skipped. Slicing by the loop position
            # (not by a name lookup) stays correct if a name appears twice.
            for remaining_name in execution_order[position + 1 :]:
                skip_result = ExecutionResult(
                    step_name=remaining_name,
                    success=True,  # Set to True since skipped steps technically don't fail
                    error="Skipped due to earlier failure in group",
                    skipped=True,
                )
                results.append(skip_result)
            break

        # Store outputs for next steps in group
        if result.output is not None:
            if len(step.outputs) == 1:
                step_outputs[step.outputs[0]] = result.output
            elif isinstance(result.output, (list, tuple)) and len(result.output) == len(
                step.outputs,
            ):
                # One value per declared output name, in order
                for name, val in zip(step.outputs, result.output, strict=False):
                    step_outputs[name] = val
            elif isinstance(result.output, dict):
                # Pick the declared names out of a dict-shaped result
                for name in step.outputs:
                    if name in result.output:
                        step_outputs[name] = result.output[name]
            else:
                if step.outputs:
                    step_outputs[step.outputs[0]] = result.output

    return results

Vertex AI Orchestrator

Bases: RemoteOrchestrator

Vertex AI orchestrator for running pipelines on Google Cloud.

This orchestrator submits pipeline jobs to Vertex AI Pipelines, allowing for scalable, managed execution in the cloud.

Example
from flowyml.stacks.gcp import VertexAIOrchestrator

orchestrator = VertexAIOrchestrator(
    project_id="my-gcp-project", region="us-central1", service_account="my-sa@my-project.iam.gserviceaccount.com"
)

Initialize Vertex AI orchestrator.

Parameters:

Name Type Description Default
name str

Name of the orchestrator

'vertex_ai'
project_id str | None

GCP project ID

None
region str

GCP region for Vertex AI

'europe-west1'
service_account str | None

Service account email for job execution

None
network str | None

VPC network for jobs

None
encryption_key str | None

Customer-managed encryption key

None
staging_bucket str | None

GCS staging bucket for job artifacts

None
Source code in flowyml/stacks/gcp.py
def __init__(
    self,
    name: str = "vertex_ai",
    project_id: str | None = None,
    region: str = "europe-west1",
    service_account: str | None = None,
    network: str | None = None,
    encryption_key: str | None = None,
    staging_bucket: str | None = None,
):
    """Set up a Vertex AI orchestrator.

    ``name`` is passed to the base-class initializer; every other argument
    is stored unchanged on the instance under the same attribute name.

    Args:
        name: Name of the orchestrator
        project_id: GCP project ID
        region: GCP region for Vertex AI
        service_account: Service account email for job execution
        network: VPC network for jobs
        encryption_key: Customer-managed encryption key
        staging_bucket: GCS staging bucket for job artifacts
    """
    super().__init__(name)

    # Target project and location
    self.project_id, self.region = project_id, region

    # Identity and networking used by submitted jobs
    self.service_account, self.network = service_account, network

    # Encryption and artifact staging
    self.encryption_key, self.staging_bucket = encryption_key, staging_bucket

Functions

get_run_details(job_id: str) -> dict

Get full details of a Vertex AI job.

Returns comprehensive job metadata including timing, resource usage, and error information for the FlowyML dashboard.

Parameters:

Name Type Description Default
job_id str

The job resource name.

required

Returns:

Type Description
dict

Dictionary with job details.

Source code in flowyml/stacks/gcp.py
def get_run_details(self, job_id: str) -> dict:
    """Get full details of a Vertex AI job.

    Returns comprehensive job metadata including timing, resource
    usage, and error information for the FlowyML dashboard.

    Any failure while looking the job up — including the Vertex AI SDK not
    being installed — is reported through the returned dictionary instead of
    being raised.

    Args:
        job_id: The job resource name.

    Returns:
        Dictionary with job details. On failure the dictionary contains only
        ``job_id`` (as passed in) and ``error`` (the exception text).
    """

    def _iso(moment):
        # Timestamps may be unset depending on how far the job has progressed
        return moment.isoformat() if moment else None

    try:
        # Imported inside the try so a missing SDK yields the same error
        # dictionary as any other lookup failure.
        from google.cloud import aiplatform

        job = aiplatform.CustomJob(job_id)
        return {
            "job_id": job.resource_name,
            "display_name": job.display_name,
            "state": job.state.name,
            "create_time": _iso(job.create_time),
            "start_time": _iso(job.start_time),
            "end_time": _iso(job.end_time),
            "error": str(job.error) if job.error else None,
            "labels": dict(job.labels) if job.labels else {},
            "platform": "vertex_ai",
            "project": self.project_id,
            "region": self.region,
            # Link built from the last path segment of the resource name
            "console_url": (
                f"https://console.cloud.google.com/vertex-ai/training/"
                f"{job.resource_name.split('/')[-1]}?project={self.project_id}"
            ),
        }
    except Exception as e:
        return {"job_id": job_id, "error": str(e)}

get_run_logs(job_id: str, max_entries: int = 200) -> str

Get logs for a Vertex AI job from Cloud Logging.

Parameters:

Name Type Description Default
job_id str

The job resource name.

required
max_entries int

Maximum number of log entries to fetch.

200

Returns:

Type Description
str

String containing the logs.

Source code in flowyml/stacks/gcp.py
def get_run_logs(self, job_id: str, max_entries: int = 200) -> str:
    """Fetch a Vertex AI job's log lines from Cloud Logging as one string.

    The newest ``max_entries`` entries are requested and then emitted in
    the reverse of the order they were returned, one formatted line per entry.
    Problems are reported in the returned text rather than raised.

    Args:
        job_id: The job resource name.
        max_entries: Maximum number of log entries to fetch.

    Returns:
        String containing the logs.
    """
    try:
        from google.cloud import logging as cloud_logging

        logging_client = cloud_logging.Client(project=self.project_id)

        # Only the last path segment of the resource name is used in the filter
        short_job_id = job_id.split("/")[-1]

        # Vertex AI custom jobs log under ml_job resource type
        log_filter = f'resource.type="ml_job" resource.labels.job_id="{short_job_id}"'

        newest_first = list(
            logging_client.list_entries(
                filter_=log_filter,
                order_by=cloud_logging.DESCENDING,
                max_results=max_entries,
            ),
        )

        lines = []
        for record in newest_first[::-1]:
            if record.timestamp:
                clock = record.timestamp.strftime("%H:%M:%S")
            else:
                clock = ""
            level = getattr(record, "severity", "INFO")
            # Fall back to the entry's string form when it carries no payload
            body = record.payload or str(record)
            lines.append(f"[{clock}] [{level}] {body}")

        if not lines:
            return "No logs found yet."
        return "\n".join(lines)

    except ImportError:
        return "google-cloud-logging is required for log retrieval. Install with: pip install google-cloud-logging"
    except Exception as e:
        return f"Failed to fetch logs: {e}"

get_run_status(job_id: str) -> ExecutionStatus

Get status of a Vertex AI job.

Parameters:

Name Type Description Default
job_id str

The job resource name.

required

Returns:

Type Description
ExecutionStatus

ExecutionStatus enum value.

Source code in flowyml/stacks/gcp.py
def get_run_status(self, job_id: str) -> "ExecutionStatus":
    """Translate a Vertex AI job's state into an ExecutionStatus.

    States without an explicit mapping are reported as ``RUNNING``; any
    exception raised while fetching or mapping the state is printed and
    reported as ``FAILED``.

    Args:
        job_id: The job resource name.

    Returns:
        ExecutionStatus enum value.
    """
    from google.cloud import aiplatform

    try:
        vertex_state = aiplatform.CustomJob(job_id).state.name

        # Map Vertex AI states to ExecutionStatus, grouped by target status
        vertex_to_flowyml = {
            **dict.fromkeys(
                ("JOB_STATE_QUEUED", "JOB_STATE_PENDING"),
                ExecutionStatus.PROVISIONING,
            ),
            **dict.fromkeys(
                ("JOB_STATE_RUNNING", "JOB_STATE_PAUSED", "JOB_STATE_UPDATING"),
                ExecutionStatus.RUNNING,
            ),
            "JOB_STATE_SUCCEEDED": ExecutionStatus.COMPLETED,
            **dict.fromkeys(
                ("JOB_STATE_FAILED", "JOB_STATE_EXPIRED"),
                ExecutionStatus.FAILED,
            ),
            "JOB_STATE_CANCELLING": ExecutionStatus.STOPPING,
            "JOB_STATE_CANCELLED": ExecutionStatus.CANCELLED,
        }

        if vertex_state in vertex_to_flowyml:
            return vertex_to_flowyml[vertex_state]
        return ExecutionStatus.RUNNING
    except Exception as e:
        print(f"Error fetching job status: {e}")
        return ExecutionStatus.FAILED

run_pipeline(pipeline: Any, run_id: str, resources: ResourceConfig | None = None, docker_config: DockerConfig | None = None, inputs: dict[str, Any] | None = None, context: dict[str, Any] | None = None, **kwargs: Any) -> SubmissionResult

Run pipeline on Vertex AI with per-group orchestration.

This is the core ZenML-parity feature: run a local command, and FlowyML automatically:

1. Analyzes the pipeline DAG into execution groups
2. Builds a Docker image for linux/amd64 (if needed)
3. Pushes it to Artifact Registry
4. Submits each group as a separate Vertex AI CustomJob
5. Each container runs `flowyml step-runner --steps ...`
6. Waits for each group before submitting the next
7. Streams logs back to the local console

Parameters:

Name Type Description Default
pipeline Any

Pipeline to run

required
run_id str

Run identifier

required
resources ResourceConfig | None

Default resource configuration (overridden per-group)

None
docker_config DockerConfig | None

Docker configuration (image, env vars)

None
inputs dict[str, Any] | None

Input data

None
context dict[str, Any] | None

Context variables

None
**kwargs Any

Additional arguments

{}

Returns:

Type Description
SubmissionResult

SubmissionResult with job metadata for all groups

Source code in flowyml/stacks/gcp.py
def run_pipeline(
    self,
    pipeline: Any,
    run_id: str,
    resources: ResourceConfig | None = None,
    docker_config: DockerConfig | None = None,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    **kwargs: Any,
) -> "SubmissionResult":
    """Run a pipeline on Vertex AI.

    The method works in the following stages:
      1. Builds the pipeline if needed and splits its DAG into execution
         units (step groups and standalone steps).
      2. Resolves ``pipeline_module`` from ``kwargs`` or, failing that, from
         the YAML file named by ``FLOWYML_CONFIG`` (default ``flowyml.yaml``).
      3. Picks the container image: a pre-built registry URI from
         ``docker_config.image``, or the result of
         ``self._auto_build_and_push(docker_config)``.
      4. If there are no step groups or no ``pipeline_module``, submits a
         single Vertex AI CustomJob and returns.
      5. Otherwise compiles a KFP pipeline with one container task per
         execution unit (each running ``flowyml step-runner``), wires task
         ordering from the FlowyML DAG, and submits it as one Vertex AI
         PipelineJob.

    Args:
        pipeline: Pipeline to run.
        run_id: Run identifier; its first 8 characters are used in job names.
        resources: Default resource configuration. Used for the single-job
            fallback and for standalone steps; step groups derive their own
            resources via ``self._resource_from_group``.
        docker_config: Docker configuration (image, env vars, command, args).
        inputs: Input data. Currently not read by this method.
        context: Context variables. Currently not read by this method.
        **kwargs: Additional arguments; ``pipeline_module`` is honoured.

    Returns:
        SubmissionResult carrying the job resource name, a blocking
        ``wait_for_completion`` callable and submission metadata
        (``mode`` is ``"single_job"`` or ``"kfp_pipeline"``).

    Raises:
        ImportError: If the KFP path is required but ``kfp`` is not installed.
        RuntimeError: If the Docker auto-build/push fails.
    """
    import contextlib
    import importlib.util
    import logging
    import os
    from pathlib import Path

    from google.cloud import aiplatform
    from flowyml.core.step_grouping import get_execution_units, StepGroup

    logger = logging.getLogger("flowyml.orchestrator.vertex_ai")

    # Initialize Vertex AI
    aiplatform.init(
        project=self.project_id,
        location=self.region,
        staging_bucket=self.staging_bucket,
    )

    # ── Analyze execution groups ────────────────────────────────
    if not pipeline._built:
        pipeline.build()

    execution_units = get_execution_units(pipeline.dag, pipeline.steps)

    # Classify units
    groups = [u for u in execution_units if isinstance(u, StepGroup)]
    has_groups = bool(groups)

    logger.info(
        "Pipeline '%s': %d execution units (%d groups, %d standalone steps)",
        pipeline.name,
        len(execution_units),
        len(groups),
        len(execution_units) - len(groups),
    )

    # ── Resolve pipeline_module for step runner ─────────────────
    pipeline_module = kwargs.get("pipeline_module", "")
    if not pipeline_module:
        import yaml

        # Try to read raw config from flowyml.yaml
        config_path = os.environ.get("FLOWYML_CONFIG", "flowyml.yaml")
        try:
            if os.path.exists(config_path):
                with open(config_path, encoding="utf-8") as f:
                    raw_config = yaml.safe_load(f)
                # An empty file parses to None and a scalar/list document has
                # no keys; treat both as "pipeline_module not configured".
                if isinstance(raw_config, dict):
                    pipeline_module = raw_config.get("pipeline_module", "") or ""
        except Exception as e:
            logger.warning("Could not read pipeline_module from config: %s", e)

    # The KFP path is taken only when there are groups AND a step-runner module.
    use_kfp = has_groups and bool(pipeline_module)

    # Fail fast: verify kfp is importable before spending time building and
    # pushing a Docker image that could not be submitted anyway.
    if use_kfp:
        try:
            has_kfp = importlib.util.find_spec("kfp") is not None
        except (ImportError, ValueError):
            has_kfp = False
        if not has_kfp:
            raise ImportError(
                "kfp is required for Vertex AI Pipelines orchestration. "
                "Install with: pip install kfp kfp-server-api",
            )

    # ── Auto-build & push Docker image ──────────────────────────
    image_uri = ""
    if docker_config and docker_config.image and "/" in docker_config.image:
        # Pre-built image URI provided — use as-is
        image_uri = docker_config.image
        logger.info("Using pre-built image: %s", image_uri)
    elif use_kfp:
        # Auto-build & push
        try:
            image_uri = self._auto_build_and_push(docker_config)
        except Exception as e:
            logger.error("Auto-build failed: %s", e)
            raise RuntimeError(
                f"Docker auto-build/push failed: {e}. "
                f"You can manually build and push, then set docker.image "
                f"in flowyml.yaml to the full registry URI.",
            ) from e
    elif docker_config and docker_config.image:
        image_uri = docker_config.image

    # ── Single-job fallback (no groups or no step runner) ───────
    if not use_kfp:
        job_name = f"{pipeline.name}-{run_id[:8]}"
        logger.info("Submitting single CustomJob: %s", job_name)

        single_docker = DockerConfig(
            image=image_uri or (docker_config.image if docker_config else ""),
            env_vars=docker_config.env_vars if docker_config else {},
        )

        job = self._submit_single_job(
            display_name=job_name,
            docker_config=single_docker,
            resources=resources,
            command=docker_config.command if docker_config and docker_config.command else None,
            args=docker_config.args if docker_config and docker_config.args else None,
        )

        job_id = job.resource_name

        def wait_single():
            self._wait_for_job(job)

        return SubmissionResult(
            job_id=job_id,
            wait_for_completion=wait_single,
            metadata={
                "platform": "vertex_ai",
                "project": self.project_id,
                "region": self.region,
                "job_name": job_name,
                "mode": "single_job",
            },
        )

    # ── KFP Vertex AI Pipelines Orchestration (ZenML-parity) ────
    # ZenML parity requires submitting the DAG via Kubeflow Pipelines (KFP)
    # so the entire topological graph is visualized natively in GCP Console.
    from kfp import dsl
    from kfp import compiler

    logger.info(
        "🚀 Compiling KFP Pipeline for %d execution units → Vertex AI Pipelines",
        len(execution_units),
    )
    logger.info("   Pipeline module: %s", pipeline_module)

    artifact_bucket = self.staging_bucket or f"gs://flowyml-{self.project_id}-artifacts"

    # Resolve Transparent Telemetry URL (Ngrok Tunneling).
    # An explicit FLOWYML_SERVER_URL always wins; otherwise ask the UI layer
    # and, if it only listens locally, try to expose it through a tunnel.
    server_url = os.getenv("FLOWYML_SERVER_URL", "")
    if not server_url:
        try:
            from flowyml.ui.utils import get_ui_server_url, get_ui_host_port

            server_url = get_ui_server_url()
            # If it's local, auto-tunnel it!
            if server_url and ("localhost" in server_url or "127.0.0.1" in server_url):
                try:
                    from flowyml.ui.tunnel import start_tunnel

                    _host, port = get_ui_host_port()
                    tunnel_url = start_tunnel(port)
                    if tunnel_url:
                        server_url = tunnel_url
                        logger.info(
                            "🌐 Transparent telemetry active! Remote pipelines will stream to: %s",
                            tunnel_url,
                        )
                except Exception as e:
                    logger.warning("Auto-tunneling failed: %s", e)
        except Exception as e:
            # Telemetry is best-effort: run without a server URL.
            logger.debug("Could not resolve UI server URL: %s", e)
            server_url = ""

    # Inject active stack so remote container uses gcp-prod, not local
    active_stack = os.environ.get("FLOWYML_STACK", "gcp-prod")

    # 1. Define KFP Container Component for Step Runner
    @dsl.container_component
    def flowyml_step_group():
        return dsl.ContainerSpec(
            image=image_uri,
            command=["flowyml", "step-runner"],
        )

    # 2. Build Pipeline DAG
    @dsl.pipeline(name=pipeline.name.replace("_", "-"), description="FlowyML Orchestrated Pipeline")
    def flowyml_pipeline():
        tasks = {}
        # step name -> task key of the (earlier) unit that runs that step
        step_owner = {}

        for unit in execution_units:
            if isinstance(unit, StepGroup):
                step_names = [s.name for s in unit.steps]
                group_name = unit.group_name

                # Derive resources
                group_resources = self._resource_from_group(unit)
            else:
                step_names = [unit.name]
                group_name = unit.name
                group_resources = resources or ResourceConfig(cpu="2", memory="4Gi")

            # Create KFP Task
            task = flowyml_step_group()

            # Set Env Vars
            env_vars = {
                "FLOWYML_PIPELINE_MODULE": pipeline_module,
                "FLOWYML_STEP_NAMES": ",".join(step_names),
                "FLOWYML_RUN_ID": run_id,
                "FLOWYML_EXECUTION_GROUP": group_name,
                "FLOWYML_PIPELINE_NAME": pipeline.name,
                "PYTHONUNBUFFERED": "1",
                "FLOWYML_ARTIFACT_DIR": "/tmp/flowyml_artifacts",  # noqa: S108
                "FLOWYML_STAGING_BUCKET": artifact_bucket,
                "FLOWYML_CONFIG": "flowyml.yaml",
                "FLOWYML_STACK": active_stack,
                # Platform identifier for log formatting
                "FLOWYML_PLATFORM": "gcp",
            }
            # Telemetry connection for remote UI monitoring
            if server_url:
                env_vars["FLOWYML_SERVER_URL"] = server_url
            for env_name, env_value in env_vars.items():
                task.set_env_variable(env_name, env_value)

            # Assign custom resources and names
            task.set_display_name(group_name)
            if group_resources:
                if group_resources.cpu:
                    task.set_cpu_limit(str(group_resources.cpu))
                if group_resources.memory:
                    task.set_memory_limit(group_resources.memory)

            # Determine topological dependencies using the FlowyML DAG.
            # Only steps owned by *earlier* units are in step_owner, so
            # dependencies inside the current unit never create an edge.
            upstream = []
            for s_name in step_names:
                for dep in pipeline.dag.get_dependencies(s_name):
                    owner = step_owner.get(dep)
                    if owner and owner != group_name and owner not in upstream:
                        upstream.append(owner)
            for owner in upstream:
                task.after(tasks[owner])

            tasks[group_name] = task
            for s_name in step_names:
                step_owner.setdefault(s_name, group_name)

    # 3. Compile Pipeline YAML and 4. Submit Vertex AI PipelineJob.
    # The YAML is a temporary artifact: remove it whether or not compilation
    # or submission succeeds.
    pipeline_yaml = f".flowyml_pipeline_{run_id[:8]}.yaml"
    job_display_name = f"{pipeline.name}-{run_id[:8]}"
    try:
        compiler.Compiler().compile(
            pipeline_func=flowyml_pipeline,
            package_path=pipeline_yaml,
        )

        logger.info("✅ Pipeline compiled to %s", pipeline_yaml)

        pipeline_job = aiplatform.PipelineJob(
            display_name=job_display_name,
            template_path=pipeline_yaml,
            job_id=job_display_name.replace("_", "-").lower(),
            project=self.project_id,
            location=self.region,
            enable_caching=False,
        )

        logger.info("📤 Submitting PipelineJob to Vertex AI...")

        submit_kwargs = {}
        if self.service_account:
            submit_kwargs["service_account"] = self.service_account
        if self.network:
            submit_kwargs["network"] = self.network

        pipeline_job.submit(**submit_kwargs)
    finally:
        # Clean up temporary YAML
        with contextlib.suppress(OSError):
            Path(pipeline_yaml).unlink(missing_ok=True)

    dashboard_uri = ""
    try:
        dashboard_uri = pipeline_job._dashboard_uri()
        logger.info("──────────────────────────────────────────────────")
        logger.info("✨ Vertex AI Pipelines DAG successfully submitted!")
        logger.info("🔗 View Live Execution Graph: %s", dashboard_uri)
        logger.info("──────────────────────────────────────────────────")
    except Exception:
        # The dashboard link is a convenience only; submission already succeeded.
        logger.info("✅ Pipeline submitted successfully!")

    def wait_pipeline():
        # Simply poll until complete, but don't stream logs because KFP
        # provides a beautiful UI for that natively.
        pipeline_job.wait()

    return SubmissionResult(
        job_id=pipeline_job.resource_name,
        wait_for_completion=wait_pipeline,
        metadata={
            "platform": "vertex_ai",
            "project": self.project_id,
            "region": self.region,
            "mode": "kfp_pipeline",
            "job_name": job_display_name,
            "dashboard_uri": dashboard_uri,
        },
    )

stop_run(job_id: str, graceful: bool = True) -> None

Cancel a Vertex AI job.

Parameters:

Name Type Description Default
job_id str

The job resource name.

required
graceful bool

Whether to wait for graceful shutdown.

True
Source code in flowyml/stacks/gcp.py
def stop_run(self, job_id: str, graceful: bool = True) -> None:
    """Cancel a Vertex AI job.

    Existing jobs are loaded with ``.get()``; calling ``CustomJob(...)``
    directly invokes the *creation* constructor, which requires worker pool
    specs and therefore cannot be used to look up a job by resource name.
    Resource names containing ``/pipelineJobs/`` (as returned when a run is
    submitted as a KFP pipeline) are cancelled as PipelineJobs; everything
    else is treated as a CustomJob.

    Args:
        job_id: The job resource name.
        graceful: Accepted for interface compatibility; not consulted by
            this implementation (cancellation is always just requested).

    Raises:
        Exception: Any error from loading or cancelling the job is printed
            and re-raised unchanged.
    """
    from google.cloud import aiplatform

    try:
        if "/pipelineJobs/" in job_id:
            job = aiplatform.PipelineJob.get(job_id)
        else:
            job = aiplatform.CustomJob.get(job_id)
        job.cancel()
        print(f"Cancellation requested for job: {job_id}")
    except Exception as e:
        print(f"Error cancelling job {job_id}: {e}")
        raise

stream_logs(job_id: str, poll_interval: float = 5.0)

Stream logs from a Vertex AI job in real-time.

Yields log lines as they become available, suitable for driving the FlowyML GUI's live log viewer.

Parameters:

Name Type Description Default
job_id str

The job resource name.

required
poll_interval float

Seconds between polling for new log entries.

5.0

Yields:

Type Description

dict with keys: timestamp, severity, message, is_finished

Source code in flowyml/stacks/gcp.py
def stream_logs(self, job_id: str, poll_interval: float = 5.0):
    """Stream logs from a Vertex AI job in real-time.

    Yields log lines as they become available, suitable for
    driving the FlowyML GUI's live log viewer.

    Each poll asks Cloud Logging only for entries at or after the newest
    timestamp already yielded, so jobs that emit more entries than one page
    keep streaming instead of re-reading the same first page forever. When a
    page comes back full, the next page is fetched immediately, and the
    backlog is drained before the final status record is emitted.

    Args:
        job_id: The job resource name.
        poll_interval: Seconds between polling for new log entries.

    Yields:
        dict with keys: timestamp, severity, message, is_finished.
        Polling failures are yielded as ``severity="ERROR"`` records rather
        than raised. The last record has ``is_finished=True``.
    """
    import time

    page_size = 500
    terminal_states = (
        ExecutionStatus.COMPLETED,
        ExecutionStatus.FAILED,
        ExecutionStatus.CANCELLED,
    )

    job_name = job_id.split("/")[-1]
    base_filter = f'resource.type="ml_job" AND resource.labels.job_id="{job_name}"'

    seen_entries = set()
    cursor = None  # ISO timestamp of the newest entry yielded so far
    client = None  # created lazily, then reused across polls

    while True:
        # Check job status
        status = self.get_run_status(job_id)
        job_finished = status in terminal_states

        # Fetch recent logs. Records are collected first and yielded outside
        # the try block so exceptions thrown into the generator by the
        # consumer are not mistaken for polling errors.
        records = []
        backlog = False
        try:
            from google.cloud import logging as cloud_logging

            if client is None:
                client = cloud_logging.Client(project=self.project_id)

            filter_str = base_filter
            if cursor is not None:
                # ">=" re-reads boundary entries; seen_entries filters them out.
                filter_str = f'{base_filter} AND timestamp>="{cursor}"'

            entries = list(
                client.list_entries(
                    filter_=filter_str,
                    order_by=cloud_logging.ASCENDING,
                    max_results=page_size,
                ),
            )

            for entry in entries:
                timestamp = entry.timestamp.isoformat() if entry.timestamp else None
                message = str(entry.payload) if entry.payload else str(entry)
                # Fall back to a content-based key: unlike id(entry) it is
                # stable when the same entry is fetched again on a later poll.
                entry_id = getattr(entry, "insert_id", None) or (timestamp, message)
                if entry_id in seen_entries:
                    continue
                seen_entries.add(entry_id)
                if timestamp is not None:
                    cursor = timestamp
                records.append(
                    {
                        "timestamp": timestamp,
                        "severity": getattr(entry, "severity", "INFO"),
                        "message": message,
                        "is_finished": False,
                    },
                )

            # A full page that produced something new means more entries are
            # probably waiting. Requiring new records avoids spinning when a
            # full page contains only already-seen entries.
            backlog = len(entries) >= page_size and bool(records)

        except Exception as e:
            records.append(
                {
                    "timestamp": None,
                    "severity": "ERROR",
                    "message": f"Log polling error: {e}",
                    "is_finished": False,
                },
            )

        yield from records

        if job_finished and not backlog:
            break
        if not backlog:
            time.sleep(poll_interval)

    # Final status yield
    yield {
        "timestamp": None,
        "severity": "INFO",
        "message": f"Job completed with status: {status.name}",
        "is_finished": True,
    }

to_dict() -> dict[str, Any]

Convert to dictionary.

Source code in flowyml/stacks/gcp.py
def to_dict(self) -> dict[str, Any]:
    """Return a plain-dict summary of this orchestrator.

    The result holds the orchestrator name, the fixed type tag
    ``"vertex_ai"``, the GCP settings stored on the instance, and a link to
    the Vertex AI training page of the Cloud Console for the project.
    """
    summary: dict[str, Any] = {"name": self.name, "type": "vertex_ai"}

    # Copy the GCP settings straight from the instance, in output order.
    for field_name in ("project_id", "region", "service_account", "network"):
        summary[field_name] = getattr(self, field_name)

    summary["console_url"] = f"https://console.cloud.google.com/vertex-ai/training?project={self.project_id}"
    return summary

validate() -> bool

Validate Vertex AI configuration.

Source code in flowyml/stacks/gcp.py
def validate(self) -> bool:
    """Validate Vertex AI configuration.

    Returns:
        True when a project id is set and the Vertex AI SDK can be located.

    Raises:
        ValueError: If ``project_id`` is empty.
        ImportError: If ``google-cloud-aiplatform`` is not installed.
    """
    if not self.project_id:
        raise ValueError("project_id is required for VertexAIOrchestrator")

    # Check if google-cloud-aiplatform is installed
    import importlib.util

    try:
        # For a dotted name find_spec imports the parent packages first, so a
        # missing ``google`` / ``google.cloud`` package raises
        # ModuleNotFoundError instead of returning None. Treat that as "not
        # installed" so the install hint below is always what the user sees.
        sdk_available = importlib.util.find_spec("google.cloud.aiplatform") is not None
    except (ImportError, ValueError):
        sdk_available = False

    if sdk_available:
        return True
    raise ImportError(
        "google-cloud-aiplatform is required for VertexAIOrchestrator. "
        "Install with: pip install google-cloud-aiplatform",
    )

See Also