Creating Custom Stack Components for flowyml
This guide shows you how to create custom stack components and integrate them seamlessly into flowyml.
Quick Start
1. Create a Custom Component
| # my_components.py
from flowyml.stacks.components import Orchestrator
from flowyml.stacks.plugins import register_component
@register_component
class MyOrchestrator(Orchestrator):
"""My custom orchestrator."""
def __init__(self, name: str = "my_orchestrator", **kwargs):
super().__init__(name)
# Your initialization
def validate(self) -> bool:
# Validation logic
return True
def run_pipeline(self, pipeline, **kwargs):
# Execution logic
pass
def get_run_status(self, run_id: str) -> str:
# Status checking
return "SUCCESS"
def to_dict(self):
return {"name": self.name, "type": "my_orchestrator"}
|
2. Use in Configuration
| # flowyml.yaml
stacks:
my_stack:
type: local
orchestrator:
type: my_orchestrator
# Your custom config
|
3. Load Component
| # Auto-loads from my_components.py if in PYTHONPATH
flowyml run pipeline.py --stack my_stack
# Or explicitly load
flowyml component load my_components
|
Creating Components
Base Classes
flowyml provides these base classes:
| from flowyml.stacks.components import (
Orchestrator, # For pipeline orchestration
ArtifactStore, # For artifact storage
ContainerRegistry, # For Docker images
StackComponent, # Generic component
)
|
Orchestrator Example
| from flowyml.stacks.components import Orchestrator
from flowyml.stacks.plugins import register_component
@register_component
class AirflowOrchestrator(Orchestrator):
def __init__(self, name="airflow", dag_folder="~/airflow/dags"):
super().__init__(name)
self.dag_folder = dag_folder
def validate(self) -> bool:
try:
import airflow
return True
except ImportError:
raise ImportError("pip install apache-airflow")
def run_pipeline(self, pipeline, **kwargs):
# Convert to Airflow DAG
from airflow import DAG
# ... implementation
return "dag_run_id"
def get_run_status(self, run_id: str) -> str:
# Check Airflow DAG run status
return "RUNNING"
def to_dict(self):
return {
"type": "airflow",
"dag_folder": self.dag_folder
}
|
Artifact Store Example
| from flowyml.stacks.components import ArtifactStore
from flowyml.stacks.plugins import register_component
@register_component
class MinIOArtifactStore(ArtifactStore):
def __init__(
self,
name="minio",
endpoint="localhost:9000",
bucket="flowyml"
):
super().__init__(name)
self.endpoint = endpoint
self.bucket = bucket
def validate(self) -> bool:
from minio import Minio
client = Minio(self.endpoint)
return client.bucket_exists(self.bucket)
def save(self, artifact, path: str) -> str:
# Save to MinIO
return f"s3://{self.bucket}/{path}"
def load(self, path: str):
# Load from MinIO
pass
def exists(self, path: str) -> bool:
# Check existence
return True
def to_dict(self):
return {
"type": "minio",
"endpoint": self.endpoint,
"bucket": self.bucket
}
|
Registration Methods
Method 1: Decorator (Recommended)
| from flowyml.stacks.plugins import register_component
@register_component
class MyComponent(Orchestrator):
pass
# Or with custom name
@register_component(name="custom_name")
class MyComponent(Orchestrator):
pass
|
Method 2: Manual Registration
| from flowyml.stacks.plugins import get_component_registry
class MyComponent(Orchestrator):
pass
registry = get_component_registry()
registry.register(MyComponent, "my_component")
|
Method 3: Entry Points (for packages)
In your pyproject.toml:
| [project.entry-points."flowyml.stack_components"]
my_orchestrator = "my_package.components:MyOrchestrator"
my_artifact_store = "my_package.components:MyArtifactStore"
|
Then components auto-load when package is installed!
Loading Components
From Module
| from flowyml.stacks.plugins import load_component
# Load all components from module
load_component("my_package.components")
|
From File
| # Load specific class from file
load_component("/path/to/component.py:MyOrchestrator")
|
From Configuration
| # flowyml.yaml
components:
- module: my_package.components
- file: /path/to/custom.py:CustomComponent
|
Component Discovery
flowyml automatically discovers components from:
- Installed packages with entry points
PYTHONPATH - any module in path
- Current directory -
./components/ folder
- Configuration -
flowyml.yaml components section
Publishing Components
As Python Package
| # setup.py or pyproject.toml
[project.entry-points."flowyml.stack_components"]
my_orchestrator = "my_flowyml_plugin:MyOrchestrator"
|
| pip install my-flowyml-plugin
# Auto-available in flowyml!
flowyml component list
|
As Module
| # Add to PYTHONPATH
export PYTHONPATH=/path/to/components:$PYTHONPATH
# Or install in development mode
pip install -e /path/to/my-components
|
Using Existing Components
| # Install from PyPI
pip install flowyml-airflow-orchestrator
pip install flowyml-minio-store
# Automatically available!
|
Creating Shareable Components
| my-flowyml-components/
βββ pyproject.toml
βββ README.md
βββ my_flowyml_components/
βββ __init__.py
βββ airflow_orchestrator.py
βββ minio_store.py
|
| # pyproject.toml
[project]
name = "my-flowyml-components"
version = "0.1.0"
[project.entry-points."flowyml.stack_components"]
airflow = "my_flowyml_components.airflow_orchestrator:AirflowOrchestrator"
minio = "my_flowyml_components.minio_store:MinIOArtifactStore"
|
Examples
See:
- examples/custom_components/my_components.py - Complete examples
- examples/custom_components/airflow_integration.py - Airflow integration
- examples/custom_components/minio_integration.py - MinIO integration
Best Practices
- β
Use
@register_component decorator
- β
Implement all required methods
- β
Add comprehensive validation
- β
Include good documentation
- β
Provide configuration examples
- β
Test thoroughly
- β
Publish as package for reuse
Testing Custom Components
| import unittest
from my_components import MyOrchestrator
class TestMyOrchestrator(unittest.TestCase):
def test_validation(self):
orch = MyOrchestrator()
self.assertTrue(orch.validate())
def test_run_pipeline(self):
# Test pipeline execution
pass
|
Next Steps
- Create your custom component
- Test locally
- Publish to PyPI
- Share with community!