Creating Custom Stack Components for flowyml
This guide shows you how to create custom stack components and integrate them seamlessly into flowyml.
Quick Start
1. Create a Custom Component
# my_components.py
from flowyml.stacks.components import Orchestrator
from flowyml.stacks.plugins import register_component
@register_component
class MyOrchestrator(Orchestrator):
"""My custom orchestrator."""
def __init__(self, name: str = "my_orchestrator", **kwargs):
super().__init__(name)
# Your initialization
def validate(self) -> bool:
# Validation logic
return True
def run_pipeline(self, pipeline, **kwargs):
# Execution logic
pass
def get_run_status(self, run_id: str) -> str:
# Status checking
return "SUCCESS"
def to_dict(self):
return {"name": self.name, "type": "my_orchestrator"}
2. Use in Configuration
# flowyml.yaml
stacks:
my_stack:
type: local
orchestrator:
type: my_orchestrator
# Your custom config
3. Load Component
# Auto-loads from my_components.py if in PYTHONPATH
flowyml run pipeline.py --stack my_stack
# Or explicitly load
flowyml component load my_components
Creating Components
Base Classes
flowyml provides these base classes:
from flowyml.stacks.components import (
Orchestrator, # For pipeline orchestration
ArtifactStore, # For artifact storage
ContainerRegistry, # For Docker images
StackComponent, # Generic component
)
Orchestrator Example
from flowyml.stacks.components import Orchestrator
from flowyml.stacks.plugins import register_component
@register_component
class AirflowOrchestrator(Orchestrator):
def __init__(self, name="airflow", dag_folder="~/airflow/dags"):
super().__init__(name)
self.dag_folder = dag_folder
def validate(self) -> bool:
try:
import airflow
return True
except ImportError:
raise ImportError("pip install apache-airflow")
def run_pipeline(self, pipeline, **kwargs):
# Convert to Airflow DAG
from airflow import DAG
# ... implementation
return "dag_run_id"
def get_run_status(self, run_id: str) -> str:
# Check Airflow DAG run status
return "RUNNING"
def to_dict(self):
return {
"type": "airflow",
"dag_folder": self.dag_folder
}
Artifact Store Example
from flowyml.stacks.components import ArtifactStore
from flowyml.stacks.plugins import register_component
@register_component
class MinIOArtifactStore(ArtifactStore):
def __init__(
self,
name="minio",
endpoint="localhost:9000",
bucket="flowyml"
):
super().__init__(name)
self.endpoint = endpoint
self.bucket = bucket
def validate(self) -> bool:
from minio import Minio
client = Minio(self.endpoint)
return client.bucket_exists(self.bucket)
def save(self, artifact, path: str) -> str:
# Save to MinIO
return f"s3://{self.bucket}/{path}"
def load(self, path: str):
# Load from MinIO
pass
def exists(self, path: str) -> bool:
# Check existence
return True
def to_dict(self):
return {
"type": "minio",
"endpoint": self.endpoint,
"bucket": self.bucket
}
Registration Methods
Method 1: Decorator (Recommended)
from flowyml.stacks.plugins import register_component
@register_component
class MyComponent(Orchestrator):
pass
# Or with custom name
@register_component(name="custom_name")
class MyComponent(Orchestrator):
pass
Method 2: Manual Registration
from flowyml.stacks.plugins import get_component_registry
class MyComponent(Orchestrator):
pass
registry = get_component_registry()
registry.register(MyComponent, "my_component")
Method 3: Entry Points (for packages)
In your pyproject.toml:
[project.entry-points."flowyml.stack_components"]
my_orchestrator = "my_package.components:MyOrchestrator"
my_artifact_store = "my_package.components:MyArtifactStore"
Then components auto-load when package is installed!
Loading Components
From Module
from flowyml.stacks.plugins import load_component
# Load all components from module
load_component("my_package.components")
From File
From Configuration
# flowyml.yaml
components:
- module: my_package.components
- file: /path/to/custom.py:CustomComponent
Using ZenML Components
Wrap ZenML Component
from flowyml.stacks.plugins import get_component_registry
# Load ZenML component
registry = get_component_registry()
registry.wrap_zenml_component(
zenml_component_class=KubernetesOrchestrator,
name="k8s"
)
Via Configuration
# flowyml.yaml
components:
- zenml: zenml.orchestrators.kubernetes.KubernetesOrchestrator
name: k8s
stacks:
k8s_stack:
orchestrator:
type: k8s
# ZenML config
Using the CLI
# Load ZenML component
flowyml component load zenml:zenml.orchestrators.kubernetes.KubernetesOrchestrator
# List available components
flowyml component list
Component Discovery
flowyml automatically discovers components from:
- Installed packages with entry points
PYTHONPATH- any module in path- Current directory -
./components/folder - Configuration -
flowyml.yamlcomponents section
Publishing Components
As Python Package
# setup.py or pyproject.toml
[project.entry-points."flowyml.stack_components"]
my_orchestrator = "my_flowyml_plugin:MyOrchestrator"
As Module
# Add to PYTHONPATH
export PYTHONPATH=/path/to/components:$PYTHONPATH
# Or install in development mode
pip install -e /path/to/my-components
Community Components
Using Existing Components
# Install from PyPI
pip install flowyml-airflow-orchestrator
pip install flowyml-minio-store
# Automatically available!
Creating Shareable Components
my-flowyml-components/
├── pyproject.toml
├── README.md
└── my_flowyml_components/
├── __init__.py
├── airflow_orchestrator.py
└── minio_store.py
# pyproject.toml
[project]
name = "my-flowyml-components"
version = "0.1.0"
[project.entry-points."flowyml.stack_components"]
airflow = "my_flowyml_components.airflow_orchestrator:AirflowOrchestrator"
minio = "my_flowyml_components.minio_store:MinIOArtifactStore"
Advanced: ZenML Integration
Complete ZenML Compatibility
from zenml.stack import Stack as ZenMLStack
from flowyml.stacks.plugins import get_component_registry
# Import all ZenML components
def import_zenml_stack(zenml_stack: ZenMLStack):
registry = get_component_registry()
# Wrap each component
registry.wrap_zenml_component(
zenml_stack.orchestrator,
"zenml_orchestrator"
)
registry.wrap_zenml_component(
zenml_stack.artifact_store,
"zenml_artifact_store"
)
# Now use in flowyml!
Gradual Migration from ZenML
# Keep using ZenML components
from zenml.integrations.kubernetes import KubernetesOrchestrator
# Use in flowyml
from flowyml.stacks import Stack
from flowyml.stacks.plugins import get_component_registry
registry = get_component_registry()
registry.wrap_zenml_component(KubernetesOrchestrator, "k8s")
# Create flowyml stack with ZenML component
stack = Stack(
name="hybrid",
orchestrator=registry.get_orchestrator("k8s"),
# ... other components
)
Examples
See:
- examples/custom_components/my_components.py - Complete examples
- examples/custom_components/airflow_integration.py - Airflow integration
- examples/custom_components/minio_integration.py - MinIO integration
Best Practices
- ✅ Use
@register_componentdecorator - ✅ Implement all required methods
- ✅ Add comprehensive validation
- ✅ Include good documentation
- ✅ Provide configuration examples
- ✅ Test thoroughly
- ✅ Publish as package for reuse
Testing Custom Components
import unittest
from my_components import MyOrchestrator
class TestMyOrchestrator(unittest.TestCase):
def test_validation(self):
orch = MyOrchestrator()
self.assertTrue(orch.validate())
def test_run_pipeline(self):
# Test pipeline execution
pass
Next Steps
- Create your custom component
- Test locally
- Publish to PyPI
- Share with community!