Skip to content

🎯 Plugin System: Practical Examples

This guide provides copy-paste ready examples for common plugin use cases.

Example 1: Run on Kubernetes in 5 Minutes

# install.sh
pip install flowyml zenml
zenml integration install kubernetes

# pipeline.py
from flowyml import Pipeline, step
from flowyml.stacks.plugins import load_component

# Load Kubernetes orchestrator from ZenML
load_component(
    "zenml:zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator",
    name="k8s"
)

@step(resources={"cpu": "4", "memory": "16Gi"})
def train_model():
    """Runs on Kubernetes!"""
    import time
    print("Training on K8s...")
    time.sleep(5)
    return {"accuracy": 0.95}

# Create pipeline
pipeline = Pipeline("k8s_example")
pipeline.add_step(train_model)

# Run on Kubernetes
result = pipeline.run(stack="kubernetes")
print(f"Result: {result}")
# flowyml.yaml
stacks:
  kubernetes:
    orchestrator:
      plugin: k8s
      config:
        kubernetes_context: my-cluster
        kubernetes_namespace: ml-pipelines

Run it:

python pipeline.py

Example 2: MLflow + S3 Production Stack

# setup.sh
pip install flowyml zenml mlflow boto3
zenml integration install mlflow s3

# production_pipeline.py
from flowyml import Pipeline, step
from flowyml.stacks.plugins import load_component

# Load components
load_component(
    "zenml:zenml.integrations.mlflow.experiment_trackers.MLflowExperimentTracker",
    name="mlflow"
)
load_component(
    "zenml:zenml.integrations.s3.artifact_stores.S3ArtifactStore",
    name="s3"
)

@step
def prepare_data():
    import pandas as pd
    df = pd.DataFrame({"feature": [1, 2, 3], "label": [0, 1, 0]})
    return df

@step
def train_with_mlflow(data):
    import mlflow
    from sklearn.ensemble import RandomForestClassifier

    with mlflow.start_run():
        # Log parameters
        mlflow.log_param("n_estimators", 100)
        mlflow.log_param("max_depth", 5)

        # Train
        model = RandomForestClassifier(n_estimators=100, max_depth=5)
        X = data[["feature"]]
        y = data["label"]
        model.fit(X, y)

        # Log metrics
        accuracy = model.score(X,y)
        mlflow.log_metric("accuracy", accuracy)

        # Log model
        mlflow.sklearn.log_model(model, "model")

    return model

pipeline = Pipeline("production")
pipeline.add_step(prepare_data)
pipeline.add_step(train_with_mlflow)
# flowyml.yaml
stacks:
  production:
    artifact_store:
      plugin: s3
      config:
        bucket: my-ml-artifacts
        region: us-west-2

# Set AWS credentials
# export AWS_ACCESS_KEY_ID=xxx
# export AWS_SECRET_ACCESS_KEY=xxx

Example 3: Vertex AI on GCP

# gcp_pipeline.py
from flowyml import Pipeline, step
from flowyml.stacks.plugins import load_component

# Load GCP components
load_component(
    "zenml:zenml.integrations.gcp.orchestrators.VertexOrchestrator",
    name="vertex"
)
load_component(
    "zenml:zenml.integrations.gcp.artifact_stores.GCPArtifactStore",
    name="gcs"
)

@step(resources={"cpu": "8", "memory": "32Gi", "accelerator_type": "NVIDIA_TESLA_T4", "accelerator_count": 1})
def train_on_vertex():
    """Runs on Vertex AI with GPU"""
    import tensorflow as tf

    # Use GPU if available
    print(f"GPUs Available: {len(tf.config.list_physical_devices('GPU'))}")

    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    return model

pipeline = Pipeline("vertex_training")
pipeline.add_step(train_on_vertex)
result = pipeline.run(stack="gcp_production")
# flowyml.yaml
stacks:
  gcp_production:
    orchestrator:
      plugin: vertex
      config:
        project: my-gcp-project
        location: us-central1
        service_account: ml-pipeline@my-project.iam.gserviceaccount.com
    artifact_store:
      plugin: gcs
      config:
        bucket: gs://my-ml-bucket

Example 4: Hybrid Stack (Multiple Frameworks)

Combine components from different sources:

# hybrid_pipeline.py
from flowyml import Pipeline, step
from flowyml.stacks import Stack
from flowyml.stacks.plugins import get_component_registry, load_component
from flowyml.storage.artifacts import LocalArtifactStore
from flowyml.storage.metadata import SQLiteMetadataStore

# Load ZenML Kubernetes orchestrator
load_component(
    "zenml:zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator",
    name="k8s"
)

# Get components
registry = get_component_registry()
k8s_orch = registry.get_orchestrator("k8s")

# Create hybrid stack
hybrid_stack = Stack(
    name="hybrid",
    orchestrator=k8s_orch(),  # From ZenML
    artifact_store=LocalArtifactStore(),  # From FlowyML
    metadata_store=SQLiteMetadataStore()  # From FlowyML
)

@step
def process():
    return {"status": "processed"}

pipeline = Pipeline("hybrid_example", stack=hybrid_stack)
pipeline.add_step(process)
result = pipeline.run()

Example 5: Import Existing ZenML Stack

# Step 1: List your ZenML stacks
zenml stack list

# Step 2: Import to FlowyML
flowyml plugin import-zenml-stack production

# Step 3: Verify
cat flowyml.yaml

# Step 4: Run with imported stack
flowyml run my_pipeline.py --stack production

Example 6: Custom Component from Local File

# my_components.py
from flowyml.stacks.components import Orchestrator

class MyCustomOrchestrator(Orchestrator):
    def run_pipeline(self, pipeline, **kwargs):
        print(f"Running {pipeline.name} with custom orchestrator")
        # Your orchestration logic
        return pipeline.execute_locally()

# Load from file
from flowyml.stacks.plugins import load_component
load_component("/path/to/my_components.py:MyCustomOrchestrator")

Example 7: Load from PyPI Package

# If you published a flowyml plugin to PyPI

# Install
# pip install flowyml-my-plugin

# Use (auto-discovered via entry points!)
from flowyml.stacks.plugins import get_component_registry

registry = get_component_registry()
my_component = registry.get_orchestrator("my_custom_orch")

Example 8: Multi-Cloud Setup

# flowyml.yaml
stacks:
  aws_production:
    orchestrator:
      plugin: sagemaker
      config:
        role_arn: arn:aws:iam::123456789:role/SageMakerRole
    artifact_store:
      plugin: s3
      config:
        bucket: ml-artifacts-aws
        region: us-east-1

  gcp_production:
    orchestrator:
      plugin: vertex
      config:
        project: my-gcp-project
        location: us-central1
    artifact_store:
      plugin: gcs
      config:
        bucket: gs://ml-artifacts-gcp

  azure_production:
    orchestrator:
      plugin: azureml
      config:
        subscription_id: xxx
        resource_group: ml-resources
    artifact_store:
      plugin: azure_blob
      config:
        container: ml-artifacts
# Switch between clouds easily
pipeline.run(stack="aws_production")  # Run on AWS
pipeline.run(stack="gcp_production")  # Run on GCP
pipeline.run(stack="azure_production")  # Run on Azure

Common Patterns

Pattern 1: Development → Staging → Production

stacks:
  dev:
    orchestrator: local
    artifact_store:
      type: local
      path: .flowyml/artifacts

  staging:
    orchestrator:
      plugin: k8s
      config:
        kubernetes_namespace: ml-staging
    artifact_store:
      plugin: s3
      config:
        bucket: ml-staging-artifacts

  production:
    orchestrator:
      plugin: k8s
      config:
        kubernetes_namespace: ml-production
    artifact_store:
      plugin: s3
      config:
        bucket: ml-production-artifacts

Pattern 2: Experiment Tracking + Model Registry

from flowyml.stacks.plugins import load_component

# Load experiment tracker
load_component(
    "zenml:zenml.integrations.mlflow.experiment_trackers.MLflowExperimentTracker",
    name="mlflow_tracker"
)

# Load model registry
load_component(
    "zenml:zenml.integrations.mlflow.model_registries.MLflowModelRegistry",
    name="mlflow_registry"
)

@step
def train_and_register():
    import mlflow

    # Track experiment
    with mlflow.start_run() as run:
        model = train_model()
        mlflow.sklearn.log_model(model, "model")

        # Register model
        model_uri = f"runs:/{run.info.run_id}/model"
        mlflow.register_model(model_uri, "MyModel")

    return model

Quick Reference

# Plugin Management
flowyml plugin list                    # List installed plugins
flowyml plugin search kubernetes       # Search for plugins
flowyml plugin install zenml-k8s       # Install a plugin

# Component Management
flowyml component list                 # List all components
flowyml component load SOURCE         # Load a component

# Stack Management
flowyml stack list                    # List stacks
flowyml stack show STACK_NAME         # Show stack details
flowyml plugin import-zenml-stack NAME # Import ZenML stack

# Running Pipelines
flowyml run pipeline.py --stack production
flowyml run pipeline.py --stack kubernetes --resources gpu_training

Next Steps