๐ Using External Frameworks (ZenML, Airflow, etc.)
FlowyML's plugin system allows you to use components from any ML framework without modification. This guide shows practical examples.
Supported Frameworks
| Framework | Components Available | Installation |
|---|---|---|
| ZenML | Orchestrators, Artifact Stores, Model Registries, Experiment Trackers | pip install zenml |
| Airflow | Operators, Sensors (coming soon) | pip install apache-airflow |
| MLflow | Tracking, Model Registry | pip install mlflow |
| Custom | Any Python class | Your package |
ZenML Integration
Quick Start with ZenML
1. Install ZenML and integrations:
2. Load ZenML components:
Via CLI:
flowyml component load zenml:zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator --name k8s
Via Python:
from flowyml.stacks.plugins import load_component
load_component(
"zenml:zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator",
name="k8s"
)
3. Use in your pipeline:
from flowyml import Pipeline, step
from flowyml.stacks.plugins import get_component_registry
registry = get_component_registry()
k8s_orch = registry.get_orchestrator("k8s")
@step(resources={"cpu": "4", "memory": "16Gi"})
def train():
# This runs on Kubernetes via ZenML!
return train_model()
pipeline = Pipeline("k8s_pipeline")
pipeline.stack.orchestrator = k8s_orch()
result = pipeline.run()
Example 1: Kubernetes Orchestration with ZenML
from flowyml.stacks.plugins import load_component
from flowyml import Pipeline, step
# Load ZenML's Kubernetes orchestrator
load_component(
"zenml:zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator",
name="k8s"
)
@step(resources={"cpu": "8", "memory": "32Gi", "gpu": "1"})
def train_on_k8s():
"""This step runs on Kubernetes cluster"""
model = train_large_model()
return model
pipeline = Pipeline("production_training")
pipeline.add_step(train_on_k8s)
# Runs on Kubernetes!
pipeline.run(stack="kubernetes")
Example 2: MLflow Experiment Tracking
from flowyml.stacks.plugins import load_component
# Load MLflow tracker from ZenML
load_component(
"zenml:zenml.integrations.mlflow.experiment_trackers.MLflowExperimentTracker",
name="mlflow"
)
@step
def train_with_tracking():
import mlflow
with mlflow.start_run():
# Log parameters
mlflow.log_param("learning_rate", 0.001)
mlflow.log_param("batch_size", 32)
# Training
model, metrics = train_model()
# Log metrics
mlflow.log_metric("accuracy", metrics["accuracy"])
mlflow.log_metric("f1_score", metrics["f1"])
# Log model
mlflow.sklearn.log_model(model, "model")
return model
Example 3: Cloud Artifact Storage (S3, GCS, Azure)
from flowyml.stacks.plugins import load_component
# Load S3 artifact store from ZenML
load_component(
"zenml:zenml.integrations.s3.artifact_stores.S3ArtifactStore",
name="s3_store"
)
# Or GCS
load_component(
"zenml:zenml.integrations.gcp.artifact_stores.GCPArtifactStore",
name="gcs_store"
)
# Or Azure
load_component(
"zenml:zenml.integrations.azure.artifact_stores.AzureBlobArtifactStore",
name="azure_store"
)
Configure in flowyml.yaml:
stacks:
production:
orchestrator: k8s
artifact_store:
plugin: s3_store
config:
bucket: my-ml-artifacts
region: us-west-2
Example 4: Vertex AI on GCP
from flowyml.stacks.plugins import load_component
# Load Vertex AI orchestrator
load_component(
"zenml:zenml.integrations.gcp.orchestrators.VertexOrchestrator",
name="vertex"
)
# Load GCS artifact store
load_component(
"zenml:zenml.integrations.gcp.artifact_stores.GCPArtifactStore",
name="gcs"
)
# flowyml.yaml
stacks:
gcp_production:
orchestrator:
plugin: vertex
config:
project: my-gcp-project
location: us-central1
artifact_store:
plugin: gcs
config:
bucket: gs://my-ml-bucket
Migrating Existing ZenML Stacks
If you already use ZenML, you can import your stacks directly:
Step 1: List ZenML Stacks
Output:
โโโโโโโโโโโโโโโโณโโโโโโโโโโโโโโโณโโโโโโโโโโโโโโโ
โ Name โ Orchestrator โ Artifact Store โ
โกโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโฉ
โ default โ local โ local โ
โ production โ kubernetes โ s3 โ
โ staging โ kubeflow โ gcs โ
โโโโโโโโโโโโโโโโดโโโโโโโโโโโโโโโดโโโโโโโโโโโโโโโ
Step 2: Import Stack to FlowyML
This generates flowyml.yaml:
# Auto-generated from ZenML stack 'production'
plugins:
- name: kubernetes_orchestrator
source: zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator
component_type: orchestrator
- name: s3_artifact_store
source: zenml.integrations.s3.artifact_stores.S3ArtifactStore
component_type: artifact_store
stacks:
production:
orchestrator: kubernetes_orchestrator
artifact_store: s3_artifact_store
Step 3: Use Imported Stack
Available ZenML Integrations
Orchestrators
- Kubernetes -
zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator - Kubeflow -
zenml.integrations.kubeflow.orchestrators.KubeflowOrchestrator - Vertex AI -
zenml.integrations.gcp.orchestrators.VertexOrchestrator - SageMaker -
zenml.integrations.aws.orchestrators.SagemakerOrchestrator - Airflow -
zenml.integrations.airflow.orchestrators.AirflowOrchestrator - Azure ML -
zenml.integrations.azure.orchestrators.AzureMLOrchestrator
Artifact Stores
- S3 -
zenml.integrations.s3.artifact_stores.S3ArtifactStore - GCS -
zenml.integrations.gcp.artifact_stores.GCPArtifactStore - Azure Blob -
zenml.integrations.azure.artifact_stores.AzureBlobArtifactStore - MinIO -
zenml.integrations.minio.artifact_stores.MinIOArtifactStore
Experiment Trackers
- MLflow -
zenml.integrations.mlflow.experiment_trackers.MLflowExperimentTracker - Weights & Biases -
zenml.integrations.wandb.experiment_trackers.WandbExperimentTracker - Neptune -
zenml.integrations.neptune.experiment_trackers.NeptuneExperimentTracker
Model Registries
- MLflow -
zenml.integrations.mlflow.model_registries.MLflowModelRegistry - Vertex AI -
zenml.integrations.gcp.model_registries.VertexModelRegistry
Advanced: Custom Bridge Rules
For fine control over component adaptation:
from flowyml.stacks.bridge import GenericBridge, AdaptationRule
from flowyml.stacks.components import ComponentType
# Define custom adaptation rules
rules = [
AdaptationRule(
source_type="zenml.orchestrators.custom.MyOrchestrator",
target_type=ComponentType.ORCHESTRATOR,
method_mapping={
"run_pipeline": "execute",
"get_orchestrator_run_id": "get_run_id"
},
attribute_mapping={
"config": "settings"
}
)
]
# Register custom bridge
from flowyml.stacks.plugins import get_component_registry
registry = get_component_registry()
registry.register_bridge("custom", GenericBridge(rules=rules))
# Use custom bridge
load_component("custom:path.to.MyOrchestrator")
Comparison: Pure ZenML vs FlowyML + ZenML
| Feature | Pure ZenML | FlowyML + ZenML Plugin |
|---|---|---|
| UI | Basic CLI | โ Modern Web UI |
| Orchestrators | Many | โ Same + FlowyML's |
| Project Management | Basic | โ Rich structure |
| Metrics Dashboard | External | โ Built-in |
| Pipeline Syntax | Complex | โ Simpler |
| Ecosystem | ZenML only | โ ZenML + Airflow + more |
Troubleshooting
Component Not Loading
# Check ZenML integration
zenml integration list
# Install if needed
zenml integration install kubernetes
# Verify import
python -c "from zenml.integrations.kubernetes.orchestrators import KubernetesOrchestrator"
Missing Dependencies
Best Practices
-
Pin Versions - Specify ZenML version in requirements
-
Use YAML Config - Define stacks in
flowyml.yamlfor reproducibility -
Test Locally - Verify components before cloud deployment
-
Secure Credentials - Use environment variables for secrets