# Custom Pipelines with FlowyML 🧩

Learn how to compose custom pipelines by mixing and matching individual MLPotion steps. Go beyond the pre-built templates and build exactly the workflow your project needs.

**Time:** ~15 minutes · **Level:** Intermediate · **Prerequisites:** Completed the FlowyML Quick Start
## What We'll Build 🎯

Three custom pipelines that are not available as pre-built templates:

- 🔍 Train → Inspect → Export: inspect the architecture before deploying
- 📊 Multi-Dataset Evaluation: evaluate one model against multiple datasets
- 🧪 Custom Step Pipeline: add your own business logic as a FlowyML step
## Pipeline 1: Train → Inspect → Export 🔍

Sometimes you want to inspect the model architecture before exporting. No template for this? No problem: build it from individual steps.
```python
# custom_inspect_pipeline.py
import keras

from flowyml.core.context import Context
from flowyml.core.pipeline import Pipeline
from mlpotion.integrations.flowyml.keras import (
    load_data,
    train_model,
    inspect_model,
    export_model,
)


def create_model() -> keras.Model:
    model = keras.Sequential([
        keras.layers.Dense(256, activation="relu", input_shape=(4,)),
        keras.layers.BatchNormalization(),
        keras.layers.Dense(128, activation="relu"),
        keras.layers.Dropout(0.3),
        keras.layers.Dense(64, activation="relu"),
        keras.layers.Dense(1),
    ])
    model.compile(optimizer="adam", loss="mse", metrics=["mae"])
    return model


def main():
    ctx = Context(
        file_path="train.csv",
        label_name="price",
        batch_size=32,
        epochs=30,
        experiment_name="inspect-before-export",
        export_path="models/inspected/",
        export_format="keras",
    )

    # Build the custom pipeline from individual steps
    pipeline = Pipeline(
        name="train_inspect_export",
        context=ctx,
        enable_cache=True,
        enable_checkpointing=True,
    )
    pipeline.add_step(load_data)      # → Dataset artifact
    pipeline.add_step(train_model)    # → (Model, Metrics) artifacts
    pipeline.add_step(inspect_model)  # → Metrics artifact (architecture)
    pipeline.add_step(export_model)   # → Model artifact (exported)

    result = pipeline.run()
    print("✅ Custom pipeline complete!")


if __name__ == "__main__":
    main()
```
DAG:

```
load_data → train_model → inspect_model → export_model
```

FlowyML auto-wires this because:

- `train_model` outputs `"model"`, and `inspect_model` takes input `"model"`
- `inspect_model` does NOT consume the model; it passes it through
- `export_model` takes input `"model"` from `train_model`
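Conceptually, this kind of name-based auto-wiring is a topological ordering over declared inputs and outputs: a step becomes runnable once every input name it declares has been produced by an earlier step. The following pure-Python sketch illustrates the idea; it is not FlowyML's actual implementation, just a toy resolver over `(name, inputs, outputs)` tuples:

```python
# Toy sketch of name-based DAG wiring (NOT FlowyML's real code).
# A step is "ready" once all of its declared input names have been
# produced by previously scheduled steps.

def resolve_order(steps):
    """steps: list of (name, inputs, outputs) tuples. Returns a run order."""
    produced = set()           # artifact names available so far
    remaining = list(steps)
    order = []
    while remaining:
        ready = [s for s in remaining if set(s[1]) <= produced]
        if not ready:
            raise ValueError(f"Cannot wire steps: {[s[0] for s in remaining]}")
        for name, inputs, outputs in ready:
            order.append(name)
            produced |= set(outputs)
            remaining.remove((name, inputs, outputs))
    return order

# Declaration order doesn't matter; the resolver finds a valid order.
steps = [
    ("export_model", ["model"], ["exported_model"]),
    ("inspect_model", ["model"], ["architecture_report"]),
    ("train_model", ["dataset"], ["model", "metrics"]),
    ("load_data", [], ["dataset"]),
]
print(resolve_order(steps))
```

`load_data` runs first (no inputs), then `train_model`, and only then the two steps that consume `"model"`.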
## Pipeline 2: Multi-Dataset Evaluation 📊

Evaluate an existing model against a data split by composing `load_model`, `load_data`, and a shared `evaluate_model` step. Rerun the pipeline with a different `Context` per split to cover multiple datasets:
```python
# multi_eval_pipeline.py
from flowyml.core.context import Context
from flowyml.core.pipeline import Pipeline
from mlpotion.integrations.flowyml.keras import (
    load_data,
    load_model,
    evaluate_model,
)


def main():
    # Evaluate an existing model against test data
    ctx = Context(
        model_path="models/production/model.keras",
        file_path="test.csv",
        label_name="price",
        batch_size=64,
    )

    pipeline = Pipeline(
        name="comprehensive_evaluation",
        context=ctx,
        enable_cache=True,
    )
    pipeline.add_step(load_model)      # → Model artifact
    pipeline.add_step(load_data)       # → Dataset artifact
    pipeline.add_step(evaluate_model)  # → Metrics artifact

    result = pipeline.run()
    print("✅ Multi-evaluation complete!")


if __name__ == "__main__":
    main()
```
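To cover several datasets, one option is a small driver that rebuilds the context per split and reruns the same pipeline. The sketch below shows only the bookkeeping side: plain dicts stand in for FlowyML's `Context`, and `run_evaluation` is a hypothetical placeholder for building and running the evaluation pipeline, since the exact loop depends on your setup:

```python
# Driver sketch: evaluate one model against several splits by reusing
# the same evaluation logic with a different context per split.
# Plain dicts stand in for flowyml's Context; run_evaluation is a
# placeholder for "build the Pipeline and call pipeline.run()".

SPLITS = {
    "validation": "val.csv",
    "test": "test.csv",
    "holdout": "holdout.csv",
}

def run_evaluation(ctx: dict) -> dict:
    # Placeholder: a real driver would do Context(**ctx), build the
    # load_model → load_data → evaluate_model pipeline, and return
    # the resulting Metrics. Here we just echo the split evaluated.
    return {"evaluated_on": ctx["file_path"]}

def evaluate_all_splits(model_path: str) -> dict:
    results = {}
    for split_name, file_path in SPLITS.items():
        ctx = {
            "model_path": model_path,
            "file_path": file_path,
            "label_name": "price",
            "batch_size": 64,
        }
        results[split_name] = run_evaluation(ctx)
    return results

reports = evaluate_all_splits("models/production/model.keras")
```

With caching enabled, `load_model` should hit the cache on every rerun, so only the data-dependent steps repeat per split.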
## Pipeline 3: Custom Step Pipeline 🧪

Add your own business logic as a proper FlowyML step, then compose it with pre-built MLPotion steps:
```python
# custom_step_pipeline.py
from flowyml import Metrics
from flowyml.core.context import Context
from flowyml.core.pipeline import Pipeline
from flowyml.core.step import step
from mlpotion.integrations.flowyml.keras import (
    load_data,
    train_model,
    export_model,
)


# ─── Your Custom Step ─────────────────────────────────────
@step(
    name="validate_metrics",
    inputs=["metrics"],
    outputs=["validation_report"],
    tags={"stage": "validation", "custom": "true"},
)
def validate_metrics(
    metrics: Metrics,
    max_acceptable_loss: float = 1000.0,
    max_acceptable_mae: float = 200.0,
) -> Metrics:
    """Custom business logic: validate that metrics meet your criteria."""
    loss = metrics.get_metric("loss", float("inf"))
    mae = metrics.get_metric("mae", float("inf"))

    report = {
        "loss": loss,
        "mae": mae,
        "loss_acceptable": loss <= max_acceptable_loss,
        "mae_acceptable": mae <= max_acceptable_mae,
        "overall_pass": loss <= max_acceptable_loss and mae <= max_acceptable_mae,
    }
    return Metrics.create(
        metrics=report,
        name="validation_report",
        tags={"stage": "validation"},
        properties=report,
    )


# ─── Another Custom Step ──────────────────────────────────
@step(
    name="log_to_slack",
    inputs=["validation_report"],
    outputs=["notification_status"],
    tags={"stage": "notification"},
)
def log_to_slack(validation_report: Metrics) -> Metrics:
    """Send validation results to Slack (mock for demo)."""
    passed = validation_report.get_metric("overall_pass", False)
    status = "✅ PASSED" if passed else "❌ FAILED"
    print(f"\n📢 Slack notification: Model validation {status}")
    print(f"   Loss: {validation_report.get_metric('loss'):.2f}")
    print(f"   MAE:  {validation_report.get_metric('mae'):.2f}")
    return Metrics.create(
        metrics={"notified": True, "status": status},
        name="notification_status",
    )


def main():
    ctx = Context(
        file_path="train.csv",
        label_name="price",
        batch_size=32,
        epochs=20,
        learning_rate=0.001,
        export_path="models/validated/",
        export_format="keras",
    )

    pipeline = Pipeline(
        name="custom_validated_pipeline",
        context=ctx,
        enable_cache=True,
    )
    # Mix pre-built + custom steps
    pipeline.add_step(load_data)         # MLPotion step
    pipeline.add_step(train_model)       # MLPotion step
    pipeline.add_step(validate_metrics)  # Your custom step
    pipeline.add_step(log_to_slack)      # Your custom step
    pipeline.add_step(export_model)      # MLPotion step

    result = pipeline.run()
    print("\n✅ Custom validated pipeline complete!")


if __name__ == "__main__":
    main()
```
DAG:

```
load_data → train_model → validate_metrics → log_to_slack
                       ↘ export_model
```

Both `validate_metrics` and `export_model` consume outputs of `train_model`, so `export_model` branches off after training.
## Using the FlowyMLAdapter for Custom Components 🔌

If you have a custom data loader, trainer, or evaluator that implements MLPotion's protocol interface, wrap it with `FlowyMLAdapter`:
```python
from flowyml.core.pipeline import Pipeline
from mlpotion.core.protocols import DataLoader
from mlpotion.integrations.flowyml import FlowyMLAdapter


class S3DataLoader:
    """Custom loader that reads from S3 (satisfies the DataLoader protocol)."""

    def load(self):
        # Your S3 loading logic (pandas reads s3:// paths via s3fs)
        import pandas as pd
        return pd.read_csv("s3://my-bucket/data.csv")


# Wrap it as a FlowyML step
s3_load_step = FlowyMLAdapter.create_data_loader_step(
    S3DataLoader(),
    name="s3_data_load",
    cache="code_hash",
    tags={"source": "s3", "env": "production"},
)

# Use it in a pipeline alongside pre-built steps
from mlpotion.integrations.flowyml.keras import train_model, evaluate_model

pipeline = Pipeline("s3_pipeline")
pipeline.add_step(s3_load_step)    # Your custom adapter step
pipeline.add_step(train_model)     # Pre-built MLPotion step
pipeline.add_step(evaluate_model)  # Pre-built MLPotion step

result = pipeline.run()
```
## Tips for Custom Pipelines 💡

### 1. Always Declare `inputs` and `outputs`

FlowyML needs these to resolve the DAG. If you omit them, steps won't wire automatically:

```python
# ✅ Good: FlowyML can wire this
@step(name="my_step", inputs=["model"], outputs=["processed_model"])
def my_step(model: Model) -> Model: ...

# ❌ Bad: FlowyML can't auto-wire this
@step(name="my_step")
def my_step(model: Model) -> Model: ...
```
### 2. Accept Artifacts OR Raw Objects

Follow the pattern used by all MLPotion steps and unwrap artifacts if present:

```python
@step(name="my_step", inputs=["model", "dataset"], outputs=["result"])
def my_step(model, data):
    # Unwrap if needed
    raw_model = model.data if isinstance(model, Model) else model
    raw_data = data.data if isinstance(data, Dataset) else data
    # ...
```
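The unwrap idiom itself is plain Python. Here is a self-contained sketch using a hypothetical stand-in artifact class (so it runs without flowyml installed); the assumption is that the real `Model`/`Dataset` artifacts expose their payload via `.data` the same way:

```python
from dataclasses import dataclass

# Stand-in for a flowyml artifact that wraps a payload in `.data`
# (hypothetical; the real Model/Dataset classes are assumed similar).
@dataclass
class FakeArtifact:
    data: object

def unwrap(obj, artifact_type=FakeArtifact):
    """Return the raw payload whether obj is wrapped or not."""
    return obj.data if isinstance(obj, artifact_type) else obj

wrapped = FakeArtifact(data=[1, 2, 3])
print(unwrap(wrapped))    # the raw list
print(unwrap([1, 2, 3]))  # already raw, returned unchanged
```

Writing steps this way lets the same function work inside a pipeline (where FlowyML passes artifacts) and in plain unit tests (where you pass raw objects).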
### 3. Use Tags for Observability

Tags make it easy to filter and search steps in the FlowyML dashboard:

```python
@step(
    name="my_step",
    tags={
        "stage": "preprocessing",
        "framework": "keras",
        "team": "ml-platform",
        "priority": "high",
    },
)
def my_step(data): ...
```
### 4. Use `Metrics.create()` for Any Custom Metrics

Any dict of values can be wrapped as a Metrics artifact:

```python
from flowyml import Metrics

report = Metrics.create(
    metrics={"f1": 0.92, "precision": 0.95, "recall": 0.89},
    name="custom_classification_metrics",
    tags={"model_version": "v2"},
    properties={"threshold": 0.5},
)
```
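The dict you wrap can come from anywhere. As a quick refresher (plain Python, independent of FlowyML), here is how precision, recall, and F1 fall out of raw confusion counts before being handed to `Metrics.create()`:

```python
# Derive precision, recall, and F1 from confusion counts; the returned
# dict is exactly the shape Metrics.create(metrics=...) expects.
def classification_metrics(tp: int, fp: int, fn: int) -> dict:
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return {"precision": precision, "recall": recall, "f1": f1}

print(classification_metrics(tp=90, fp=10, fn=20))
```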
## What You Learned 🎓

- ✅ How to compose custom pipelines from individual steps
- ✅ How to create your own custom FlowyML steps
- ✅ How to mix pre-built and custom steps in the same pipeline
- ✅ How to use `FlowyMLAdapter` for custom protocol components
- ✅ Best practices for DAG wiring and artifact handling
## Next Steps 🚀

- Experiment Tracking → Conditional deploy + metrics thresholds
- Scheduled Retraining → Cron pipelines
- FlowyML Integration Guide → Full API reference

You're now a FlowyML pipeline architect! 🏗️