Creating Custom Stack Components for flowyml
This guide shows you how to create custom stack components and integrate them seamlessly into flowyml.
Quick Start
1. Create a Custom Component
# my_components.py
from flowyml.stacks.components import Orchestrator
from flowyml.stacks.plugins import register_component
@register_component
class MyOrchestrator(Orchestrator):
"""My custom orchestrator."""
def __init__(self, name: str = "my_orchestrator", **kwargs):
super().__init__(name)
# Your initialization
def validate(self) -> bool:
# Validation logic
return True
def run_pipeline(self, pipeline, **kwargs):
# Execution logic
pass
def get_run_status(self, run_id: str) -> str:
# Status checking
return "SUCCESS"
def to_dict(self):
return {"name": self.name, "type": "my_orchestrator"}
2. Use in Configuration
# flowyml.yaml
stacks:
my_stack:
type: local
orchestrator:
type: my_orchestrator
# Your custom config
3. Load Component
# Auto-loads from my_components.py if in PYTHONPATH
flowyml run pipeline.py --stack my_stack
# Or explicitly load
flowyml component load my_components
Creating Components
Base Classes
flowyml provides these base classes:
from flowyml.stacks.components import (
Orchestrator, # For pipeline orchestration
ArtifactStore, # For artifact storage
ContainerRegistry, # For Docker images
StackComponent, # Generic component
)
Orchestrator Example
from flowyml.stacks.components import Orchestrator
from flowyml.stacks.plugins import register_component
@register_component
class AirflowOrchestrator(Orchestrator):
def __init__(self, name="airflow", dag_folder="~/airflow/dags"):
super().__init__(name)
self.dag_folder = dag_folder
def validate(self) -> bool:
try:
import airflow
return True
except ImportError:
raise ImportError("pip install apache-airflow")
def run_pipeline(self, pipeline, **kwargs):
# Convert to Airflow DAG
from airflow import DAG
# ... implementation
return "dag_run_id"
def get_run_status(self, run_id: str) -> str:
# Check Airflow DAG run status
return "RUNNING"
def to_dict(self):
return {
"type": "airflow",
"dag_folder": self.dag_folder
}
Artifact Store Example
from flowyml.stacks.components import ArtifactStore
from flowyml.stacks.plugins import register_component
@register_component
class MinIOArtifactStore(ArtifactStore):
def __init__(
self,
name="minio",
endpoint="localhost:9000",
bucket="flowyml"
):
super().__init__(name)
self.endpoint = endpoint
self.bucket = bucket
def validate(self) -> bool:
from minio import Minio
client = Minio(self.endpoint)
return client.bucket_exists(self.bucket)
def save(self, artifact, path: str) -> str:
# Save to MinIO
return f"s3://{self.bucket}/{path}"
def load(self, path: str):
# Load from MinIO
pass
def exists(self, path: str) -> bool:
# Check existence
return True
def to_dict(self):
return {
"type": "minio",
"endpoint": self.endpoint,
"bucket": self.bucket
}
Registration Methods
Method 1: Decorator (Recommended)
from flowyml.stacks.plugins import register_component
@register_component
class MyComponent(Orchestrator):
pass
# Or with custom name
@register_component(name="custom_name")
class MyComponent(Orchestrator):
pass
Method 2: Manual Registration
from flowyml.stacks.plugins import get_component_registry
class MyComponent(Orchestrator):
pass
registry = get_component_registry()
registry.register(MyComponent, "my_component")
Method 3: Entry Points (for packages)
In your pyproject.toml:
[project.entry-points."flowyml.stack_components"]
my_orchestrator = "my_package.components:MyOrchestrator"
my_artifact_store = "my_package.components:MyArtifactStore"
Then components auto-load when package is installed!
Loading Components
From Module
from flowyml.stacks.plugins import load_component
# Load all components from module
load_component("my_package.components")
From File
From Configuration
# flowyml.yaml
components:
- module: my_package.components
- file: /path/to/custom.py:CustomComponent
Component Discovery
flowyml automatically discovers components from:
- Installed packages with entry points
PYTHONPATH- any module in path- Current directory -
./components/folder - Configuration -
flowyml.yamlcomponents section
Publishing Components
As Python Package
# setup.py or pyproject.toml
[project.entry-points."flowyml.stack_components"]
my_orchestrator = "my_flowyml_plugin:MyOrchestrator"
As Module
# Add to PYTHONPATH
export PYTHONPATH=/path/to/components:$PYTHONPATH
# Or install in development mode
pip install -e /path/to/my-components
Community Components
Using Existing Components
# Install from PyPI
pip install flowyml-airflow-orchestrator
pip install flowyml-minio-store
# Automatically available!
Creating Shareable Components
my-flowyml-components/
├── pyproject.toml
├── README.md
└── my_flowyml_components/
├── __init__.py
├── airflow_orchestrator.py
└── minio_store.py
# pyproject.toml
[project]
name = "my-flowyml-components"
version = "0.1.0"
[project.entry-points."flowyml.stack_components"]
airflow = "my_flowyml_components.airflow_orchestrator:AirflowOrchestrator"
minio = "my_flowyml_components.minio_store:MinIOArtifactStore"
Examples
See:
- examples/custom_components/my_components.py - Complete examples
- examples/custom_components/airflow_integration.py - Airflow integration
- examples/custom_components/minio_integration.py - MinIO integration
Best Practices
- ✅ Use
@register_componentdecorator - ✅ Implement all required methods
- ✅ Add comprehensive validation
- ✅ Include good documentation
- ✅ Provide configuration examples
- ✅ Test thoroughly
- ✅ Publish as package for reuse
Testing Custom Components
import unittest
from my_components import MyOrchestrator
class TestMyOrchestrator(unittest.TestCase):
def test_validation(self):
orch = MyOrchestrator()
self.assertTrue(orch.validate())
def test_run_pipeline(self):
# Test pipeline execution
pass
Next Steps
- Create your custom component
- Test locally
- Publish to PyPI
- Share with community!