Tutorial: Building an Extensible ML Pipeline
This step-by-step tutorial shows you how to build a production-ready ML pipeline with custom components, from development to deployment.
What You'll Build
- Production ML pipeline with TensorFlow
- Custom MinIO artifact store component
- Multi-environment configuration
- Automated CI/CD deployment
Prerequisites
- Python 3.8+
- Docker installed
- (Optional) GCP account for cloud deployment
- (Optional) MinIO server for custom storage
Step 1: Project Setup
Install flowyml
# Basic installation
pip install flowyml
# With ML and GCP support
pip install "flowyml[tensorflow,gcp]"
Initialize Project
# Create project directory
mkdir ml-pipeline-tutorial
cd ml-pipeline-tutorial
# Initialize flowyml
flowyml init
# Should create flowyml.yaml
Step 2: Write Your Pipeline
Create training_pipeline.py:
"""Clean ML training pipeline - infrastructure agnostic."""
from flowyml import Pipeline, step, context, Dataset, Model, Metrics
import tensorflow as tf
import pandas as pd
import numpy as np
@step(outputs=["dataset"])
def load_data(data_path: str):
"""Load and prepare training data."""
# In production, load from data_path
# For tutorial, generate synthetic data
np.random.seed(42)
X = np.random.randn(1000, 20)
y = (X[:, 0] + X[:, 1] > 0).astype(int)
df = pd.DataFrame(X)
df['label'] = y
return Dataset.create(
data=df,
name="training_data",
properties={
"rows": len(df),
"cols": len(df.columns),
"source": data_path
}
)
@step(inputs=["dataset"], outputs=["train_data", "val_data"])
def split_data(dataset: Dataset):
"""Split into train and validation sets."""
df = dataset.data
# 80-20 split
split_idx = int(len(df) * 0.8)
train_df = df.iloc[:split_idx]
val_df = df.iloc[split_idx:]
train_dataset = Dataset.create(
data=train_df,
name="train_data",
parent=dataset,
properties={"split": "train"}
)
val_dataset = Dataset.create(
data=val_df,
name="val_data",
parent=dataset,
properties={"split": "validation"}
)
return train_dataset, val_dataset
@step(inputs=["train_data"], outputs=["model"])
def train_model(train_data: Dataset, epochs: int):
"""Train TensorFlow model."""
df = train_data.data
# Prepare data
X_train = df.drop('label', axis=1).values
y_train = df['label'].values
# Build model
model = tf.keras.Sequential([
tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
# Train
history = model.fit(
X_train, y_train,
epochs=epochs,
batch_size=32,
validation_split=0.2,
verbose=1
)
return Model.create(
data=model,
name="binary_classifier",
framework="tensorflow",
parent=train_data
)
@step(inputs=["model", "val_data"], outputs=["metrics"])
def evaluate_model(model: Model, val_data: Dataset):
"""Evaluate on validation set."""
df = val_data.data
X_val = df.drop('label', axis=1).values
y_val = df['label'].values
# Evaluate
tf_model = model.data
loss, accuracy = tf_model.evaluate(X_val, y_val, verbose=0)
return Metrics.create(
loss=float(loss),
accuracy=float(accuracy),
name="validation_metrics",
parent=model
)
# Create pipeline - NO infrastructure code!
if __name__ == "__main__":
ctx = context(
data_path="data/train.csv",
epochs=10
)
pipeline = Pipeline("ml_training", context=ctx)
pipeline.add_step(load_data)
pipeline.add_step(split_data)
pipeline.add_step(train_model)
pipeline.add_step(evaluate_model)
result = pipeline.run()
if result.success:
print(f"✅ Training complete!")
print(f"Accuracy: {result.outputs['metrics'].accuracy:.2%}")
else:
print(f"❌ Training failed")
Step 3: Test Locally
Run with the default (local) stack:
flowyml run training_pipeline.py
Output:
🚀 Running pipeline: training_pipeline.py
📦 Stack: local
⚙️ Loading pipeline...
🏃 Running pipeline...
Epoch 1/10
...
✅ Pipeline completed successfully!
Verify Artifacts
# Check artifact storage
ls -R .flowyml/artifacts/
# Check metadata
sqlite3 .flowyml/metadata.db "SELECT * FROM runs;"
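If you prefer Python to the sqlite3 CLI, the same check works with the standard library. This is a minimal sketch that assumes the default SQLite metadata store and the runs table queried above; its columns may differ between flowyml versions.

import sqlite3

conn = sqlite3.connect(".flowyml/metadata.db")
for row in conn.execute("SELECT * FROM runs"):
    print(row)  # one tuple per recorded pipeline run
conn.close()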
Step 4: Create Custom Component
For this tutorial, we'll build a custom artifact store backed by MinIO, an S3-compatible object store.
Create custom_components/minio_store.py:
"""Custom MinIO artifact store for tutorial."""
from flowyml.stacks.components import ArtifactStore
from flowyml.stacks.plugins import register_component
from typing import Any
import pickle
import io
@register_component
class MinIOArtifactStore(ArtifactStore):
"""MinIO object storage integration."""
def __init__(
self,
name: str = "minio",
endpoint: str = "localhost:9000",
bucket: str = "ml-artifacts",
access_key: str = "minioadmin",
secret_key: str = "minioadmin",
secure: bool = False,
):
super().__init__(name)
self.endpoint = endpoint
self.bucket = bucket
self.access_key = access_key
self.secret_key = secret_key
self.secure = secure
self._client = None
@property
def client(self):
if self._client is None:
from minio import Minio
self._client = Minio(
self.endpoint,
access_key=self.access_key,
secret_key=self.secret_key,
secure=self.secure,
)
# Create bucket if needed
if not self._client.bucket_exists(self.bucket):
self._client.make_bucket(self.bucket)
return self._client
    def validate(self) -> bool:
        try:
            _ = self.client  # Imports minio, connects, and creates the bucket if needed
            return True
        except ImportError:
            raise ImportError(
                "The MinIO artifact store requires the 'minio' package: pip install minio"
            )
        except Exception as e:
            raise ConnectionError(f"Cannot connect to MinIO: {e}")
def save(self, artifact: Any, path: str) -> str:
data = pickle.dumps(artifact)
stream = io.BytesIO(data)
self.client.put_object(
self.bucket,
path,
stream,
length=len(data)
)
return f"s3://{self.bucket}/{path}"
def load(self, path: str) -> Any:
if path.startswith("s3://"):
path = path.replace(f"s3://{self.bucket}/", "")
response = self.client.get_object(self.bucket, path)
return pickle.loads(response.read())
    def exists(self, path: str) -> bool:
        try:
            # stat_object raises if the object is missing
            self.client.stat_object(self.bucket, path)
            return True
        except Exception:
            return False
def to_dict(self):
return {
"type": "minio",
"endpoint": self.endpoint,
"bucket": self.bucket
}
Test Custom Component
# Install MinIO client
pip install minio
# Start MinIO (Docker)
docker run -d \
-p 9000:9000 \
-p 9001:9001 \
--name minio \
minio/minio server /data --console-address ":9001"
# Load component
flowyml component load custom_components.minio_store
# Verify
flowyml component list
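You can also exercise the store directly from Python before wiring it into a stack. This smoke test only uses the MinIOArtifactStore defined above and assumes the MinIO container is running with the default minioadmin credentials:

from custom_components.minio_store import MinIOArtifactStore

store = MinIOArtifactStore(endpoint="localhost:9000", bucket="ml-dev")

# save() pickles the object, uploads it, and returns an s3:// URI
uri = store.save({"hello": "minio"}, "smoke-test/example.pkl")
print("saved to:", uri)

assert store.exists("smoke-test/example.pkl")
print("loaded:", store.load(uri))  # {'hello': 'minio'}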
Step 5: Multi-Environment Configuration
Update flowyml.yaml:
# Multi-environment configuration
stacks:
# Local development
local:
type: local
artifact_store:
path: .flowyml/artifacts
metadata_store:
path: .flowyml/metadata.db
# Development with MinIO
dev_minio:
type: local
artifact_store:
type: minio
endpoint: localhost:9000
bucket: ml-dev
metadata_store:
path: .flowyml/metadata.db
# Staging on GCP
staging:
type: gcp
project_id: ${GCP_PROJECT_ID}
region: us-central1
artifact_store:
type: gcs
bucket: ${GCP_STAGING_BUCKET}
orchestrator:
type: vertex_ai
# Production on GCP
production:
type: gcp
project_id: ${GCP_PROJECT_ID}
region: us-central1
artifact_store:
type: gcs
bucket: ${GCP_PROD_BUCKET}
orchestrator:
type: vertex_ai
service_account: ${GCP_SERVICE_ACCOUNT}
default_stack: local
resources:
default:
cpu: "2"
memory: "8Gi"
training:
cpu: "8"
memory: "32Gi"
gpu: "nvidia-tesla-v100"
gpu_count: 2
docker:
use_poetry: true
base_image: python:3.11-slim
components:
- module: custom_components.minio_store
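The ${VAR} placeholders are expected to be filled from environment variables (the CI job in Step 7 exports them for exactly this reason). To preview how a stack resolves on your machine, the sketch below uses PyYAML (pip install pyyaml) and os.path.expandvars, which handles this ${VAR} syntax; flowyml's own substitution may differ in edge cases.

import os
import yaml

with open("flowyml.yaml") as f:
    raw = f.read()

# expandvars substitutes ${GCP_PROJECT_ID}-style placeholders from the environment
config = yaml.safe_load(os.path.expandvars(raw))
print("stacks:", ", ".join(config["stacks"]))
print("staging bucket:", config["stacks"]["staging"]["artifact_store"]["bucket"])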
Test Different Stacks
# Local
flowyml run training_pipeline.py
# With MinIO
flowyml run training_pipeline.py --stack dev_minio
# Staging (dry run)
flowyml run training_pipeline.py --stack staging --dry-run
# Production with GPUs
flowyml run training_pipeline.py \
--stack production \
--resources training \
--context epochs=50
Step 6: Create Dockerfile
Create Dockerfile:
FROM python:3.11-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
git \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements
COPY pyproject.toml* requirements.txt* ./
# Install Python dependencies
RUN if [ -f pyproject.toml ]; then \
        pip install poetry && \
        poetry config virtualenvs.create false && \
        poetry install --no-root --without dev; \
    elif [ -f requirements.txt ]; then \
        pip install -r requirements.txt; \
    fi
# Install flowyml
RUN pip install flowyml[tensorflow,gcp]
# Copy code
COPY . .
# Set entrypoint
ENTRYPOINT ["python"]
CMD ["training_pipeline.py"]
Build and Test
# Build image
docker build -t ml-pipeline:latest .
# Test locally
docker run ml-pipeline:latest
# Push to registry
docker tag ml-pipeline:latest gcr.io/my-project/ml-pipeline:v1
docker push gcr.io/my-project/ml-pipeline:v1
Step 7: CI/CD Setup
Create .github/workflows/ml-pipeline.yml:
name: ML Training Pipeline
on:
push:
branches: [main]
workflow_dispatch:
inputs:
environment:
description: 'Environment to deploy to'
required: true
default: 'staging'
type: choice
options:
- staging
- production
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install flowyml[tensorflow]
pip install minio # For custom component
- name: Run tests
run: |
flowyml run training_pipeline.py --dry-run
deploy:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install flowyml
run: |
pip install flowyml[tensorflow,gcp]
- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v1
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
- name: Run pipeline on GCP
env:
GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
GCP_STAGING_BUCKET: ${{ secrets.GCP_STAGING_BUCKET }}
GCP_PROD_BUCKET: ${{ secrets.GCP_PROD_BUCKET }}
GCP_SERVICE_ACCOUNT: ${{ secrets.GCP_SERVICE_ACCOUNT }}
run: |
ENV=${{ github.event.inputs.environment || 'staging' }}
flowyml run training_pipeline.py \
--stack $ENV \
--resources training \
--context experiment_name=github-${{ github.run_id }}
Step 8: Production Deployment
Verify Configuration
# Check stack configuration
flowyml stack show production
# Dry run
flowyml run training_pipeline.py \
--stack production \
--resources training \
--dry-run
Deploy
# Run on production
flowyml run training_pipeline.py \
--stack production \
--resources training \
--context epochs=100 \
--context experiment_name=prod-v1
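Both the CI workflow and the command above pass --context experiment_name=..., but the tutorial pipeline never reads it. To use it, add a step that takes an experiment_name parameter, mirroring how epochs flows from the context into train_model. The step below is an illustrative sketch, not part of the tutorial pipeline:

@step(inputs=["metrics"], outputs=["report"])
def tag_run(metrics: Metrics, experiment_name: str):
    """Attach the experiment name from the run context to the final metrics."""
    return Metrics.create(
        accuracy=metrics.accuracy,
        name=f"report_{experiment_name}",
        parent=metrics,
    )

# Register it after evaluate_model:
# pipeline.add_step(tag_run)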
What You've Learned
✅ Clean pipeline code - no infrastructure coupling
✅ Custom components - MinIO artifact store
✅ Multi-environment setup - dev, staging, production
✅ Configuration-driven - same code, different infra
✅ Docker integration - containerized execution
✅ CI/CD automation - GitHub Actions deployment
Next Steps
- Add more custom components
  - Airflow orchestrator
  - Redis cache
  - Custom metrics tracker
- Enhance the pipeline
  - Hyperparameter tuning
  - Model registry integration
  - A/B testing
- Monitor and optimize
  - Add logging
  - Track metrics
  - Optimize resources
- Share components
  - Package as pip installable (see the packaging sketch below)
  - Publish to PyPI
  - Contribute to the community
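For the packaging item above, a minimal setuptools sketch could look like the following; the distribution name, version, and metadata are placeholders:

# setup.py - placeholder packaging sketch for the custom component
from setuptools import setup, find_packages

setup(
    name="flowyml-minio-store",
    version="0.1.0",
    description="MinIO artifact store component for flowyml",
    packages=find_packages(include=["custom_components", "custom_components.*"]),
    install_requires=["flowyml", "minio"],
    python_requires=">=3.8",
)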
Troubleshooting
MinIO Connection Issues
# Check MinIO is running
docker ps | grep minio
# Test connection
python -c "from minio import Minio; Minio('localhost:9000', 'minioadmin', 'minioadmin').list_buckets()"
GCP Authentication Issues
# Check authentication
gcloud auth list
# Re-authenticate
gcloud auth login
gcloud auth application-default login
Component Not Loading
# Check Python path
python -c "import sys; print('\n'.join(sys.path))"
# Load explicitly
flowyml component load custom_components.minio_store
# Verify
flowyml component list
Congratulations! You've built a production-ready, extensible ML pipeline! 🎉