
Tutorial: Building an Extensible ML Pipeline

This step-by-step tutorial shows you how to build a production-ready ML pipeline with custom components, from development to deployment.

What You'll Build

  • Production ML pipeline with TensorFlow
  • Custom MinIO artifact store component
  • Multi-environment configuration
  • Automated CI/CD deployment

Prerequisites

  • Python 3.8+
  • Docker installed
  • (Optional) GCP account for cloud deployment
  • (Optional) MinIO server for custom storage

Step 1: Project Setup

Install flowyml

# Basic installation
pip install flowyml

# With ML and GCP support
pip install "flowyml[tensorflow,gcp]"

Initialize Project

# Create project directory
mkdir ml-pipeline-tutorial
cd ml-pipeline-tutorial

# Initialize flowyml
flowyml init

# Should create flowyml.yaml

Step 2: Write Your Pipeline

Create training_pipeline.py:

"""Clean ML training pipeline - infrastructure agnostic."""

from flowyml import Pipeline, step, context, Dataset, Model, Metrics
import tensorflow as tf
import pandas as pd
import numpy as np


@step(outputs=["dataset"])
def load_data(data_path: str):
    """Load and prepare training data."""
    # In production, load from data_path
    # For tutorial, generate synthetic data
    np.random.seed(42)
    X = np.random.randn(1000, 20)
    y = (X[:, 0] + X[:, 1] > 0).astype(int)

    df = pd.DataFrame(X)
    df['label'] = y

    return Dataset.create(
        data=df,
        name="training_data",
        properties={
            "rows": len(df),
            "cols": len(df.columns),
            "source": data_path
        }
    )


@step(inputs=["dataset"], outputs=["train_data", "val_data"])
def split_data(dataset: Dataset):
    """Split into train and validation sets."""
    df = dataset.data

    # 80-20 split
    split_idx = int(len(df) * 0.8)
    train_df = df.iloc[:split_idx]
    val_df = df.iloc[split_idx:]

    train_dataset = Dataset.create(
        data=train_df,
        name="train_data",
        parent=dataset,
        properties={"split": "train"}
    )
    val_dataset = Dataset.create(
        data=val_df,
        name="val_data",
        parent=dataset,
        properties={"split": "validation"}
    )

    return train_dataset, val_dataset


@step(inputs=["train_data"], outputs=["model"])
def train_model(train_data: Dataset, epochs: int):
    """Train TensorFlow model."""
    df = train_data.data

    # Prepare data
    X_train = df.drop('label', axis=1).values
    y_train = df['label'].values

    # Build model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    # Train
    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=32,
        validation_split=0.2,
        verbose=1
    )

    return Model.create(
        data=model,
        name="binary_classifier",
        framework="tensorflow",
        parent=train_data
    )


@step(inputs=["model", "val_data"], outputs=["metrics"])
def evaluate_model(model: Model, val_data: Dataset):
    """Evaluate on validation set."""
    df = val_data.data

    X_val = df.drop('label', axis=1).values
    y_val = df['label'].values

    # Evaluate
    tf_model = model.data
    loss, accuracy = tf_model.evaluate(X_val, y_val, verbose=0)

    return Metrics.create(
        loss=float(loss),
        accuracy=float(accuracy),
        name="validation_metrics",
        parent=model
    )


# Create pipeline - NO infrastructure code!
if __name__ == "__main__":
    ctx = context(
        data_path="data/train.csv",
        epochs=10
    )

    pipeline = Pipeline("ml_training", context=ctx)
    pipeline.add_step(load_data)
    pipeline.add_step(split_data)
    pipeline.add_step(train_model)
    pipeline.add_step(evaluate_model)

    result = pipeline.run()

    if result.success:
        print("✅ Training complete!")
        print(f"Accuracy: {result.outputs['metrics'].accuracy:.2%}")
    else:
        print("❌ Training failed")
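
Before wiring everything into the CLI, you can sanity-check individual steps with pytest. A minimal sketch, assuming the @step decorator leaves the underlying functions directly callable (flowyml may wrap them differently):

# test_pipeline.py - assumes @step-decorated functions remain plain callables
from training_pipeline import load_data, split_data


def test_split_is_80_20():
    dataset = load_data("data/train.csv")  # this tutorial's step generates 1000 synthetic rows
    train, val = split_data(dataset)
    assert len(train.data) == 800
    assert len(val.data) == 200

Run it with pytest test_pipeline.py before moving on.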

Step 3: Test Locally

Run with default (local) stack

flowyml run training_pipeline.py

Output:

🚀 Running pipeline: training_pipeline.py
📦 Stack: local
⚙️  Loading pipeline...
🏃 Running pipeline...

Epoch 1/10
...
✅ Pipeline completed successfully!

Verify artifacts

# Check artifact storage
ls -R .flowyml/artifacts/

# Check metadata
sqlite3 .flowyml/metadata.db "SELECT * FROM runs;"
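
The same metadata can be inspected from Python with the standard-library sqlite3 module. This assumes the .flowyml/metadata.db path and runs table from the command above; the exact schema may vary between flowyml versions:

# inspect_runs.py - quick look at recorded pipeline runs
import sqlite3

conn = sqlite3.connect(".flowyml/metadata.db")
try:
    # List tables first, in case the schema differs from what this tutorial assumes
    tables = [t[0] for t in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")]
    print("tables:", tables)

    for row in conn.execute("SELECT * FROM runs"):
        print(row)
finally:
    conn.close()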

Step 4: Create Custom Component

For this tutorial, we'll create a custom artifact store backed by MinIO, an S3-compatible object store you can run locally with Docker.

Create custom_components/minio_store.py:

"""Custom MinIO artifact store for tutorial."""

from flowyml.stacks.components import ArtifactStore
from flowyml.stacks.plugins import register_component
from typing import Any
import pickle
import io


@register_component
class MinIOArtifactStore(ArtifactStore):
    """MinIO object storage integration."""

    def __init__(
        self,
        name: str = "minio",
        endpoint: str = "localhost:9000",
        bucket: str = "ml-artifacts",
        access_key: str = "minioadmin",
        secret_key: str = "minioadmin",
        secure: bool = False,
    ):
        super().__init__(name)
        self.endpoint = endpoint
        self.bucket = bucket
        self.access_key = access_key
        self.secret_key = secret_key
        self.secure = secure
        self._client = None

    @property
    def client(self):
        if self._client is None:
            from minio import Minio

            self._client = Minio(
                self.endpoint,
                access_key=self.access_key,
                secret_key=self.secret_key,
                secure=self.secure,
            )

            # Create bucket if needed
            if not self._client.bucket_exists(self.bucket):
                self._client.make_bucket(self.bucket)

        return self._client

    def validate(self) -> bool:
        try:
            from minio import Minio
            _ = self.client  # Test connection
            return True
        except ImportError:
            raise ImportError("minio package not installed. Run: pip install minio")
        except Exception as e:
            raise ConnectionError(f"Cannot connect to MinIO: {e}")

    def save(self, artifact: Any, path: str) -> str:
        data = pickle.dumps(artifact)
        stream = io.BytesIO(data)

        self.client.put_object(
            self.bucket,
            path,
            stream,
            length=len(data)
        )

        return f"s3://{self.bucket}/{path}"

    def load(self, path: str) -> Any:
        if path.startswith("s3://"):
            path = path.replace(f"s3://{self.bucket}/", "")

        response = self.client.get_object(self.bucket, path)
        return pickle.loads(response.read())

    def exists(self, path: str) -> bool:
        try:
            self.client.stat_object(self.bucket, path)
            return True
        except Exception:
            return False

    def to_dict(self):
        return {
            "type": "minio",
            "endpoint": self.endpoint,
            "bucket": self.bucket
        }

Test Custom Component

# Install MinIO client
pip install minio

# Start MinIO (Docker)
docker run -d \
  -p 9000:9000 \
  -p 9001:9001 \
  --name minio \
  minio/minio server /data --console-address ":9001"

# Load component
flowyml component load custom_components.minio_store

# Verify
flowyml component list
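
You can also exercise the store directly from Python before handing it to flowyml. This round-trip uses only the methods defined above and MinIO's default local credentials:

# check_minio_store.py - round-trip sanity check for the custom store
from custom_components.minio_store import MinIOArtifactStore

store = MinIOArtifactStore()          # localhost:9000, bucket "ml-artifacts"
store.validate()                      # raises if minio isn't installed or reachable

uri = store.save({"hello": "minio"}, "tutorial/smoke-test.pkl")
print("saved to", uri)

assert store.load(uri) == {"hello": "minio"}
assert store.exists("tutorial/smoke-test.pkl")
print("round-trip OK")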

Step 5: Multi-Environment Configuration

Update flowyml.yaml:

# Multi-environment configuration

stacks:
  # Local development
  local:
    type: local
    artifact_store:
      path: .flowyml/artifacts
    metadata_store:
      path: .flowyml/metadata.db

  # Development with MinIO
  dev_minio:
    type: local
    artifact_store:
      type: minio
      endpoint: localhost:9000
      bucket: ml-dev
    metadata_store:
      path: .flowyml/metadata.db

  # Staging on GCP
  staging:
    type: gcp
    project_id: ${GCP_PROJECT_ID}
    region: us-central1
    artifact_store:
      type: gcs
      bucket: ${GCP_STAGING_BUCKET}
    orchestrator:
      type: vertex_ai

  # Production on GCP
  production:
    type: gcp
    project_id: ${GCP_PROJECT_ID}
    region: us-central1
    artifact_store:
      type: gcs
      bucket: ${GCP_PROD_BUCKET}
    orchestrator:
      type: vertex_ai
      service_account: ${GCP_SERVICE_ACCOUNT}

default_stack: local

resources:
  default:
    cpu: "2"
    memory: "8Gi"

  training:
    cpu: "8"
    memory: "32Gi"
    gpu: "nvidia-tesla-v100"
    gpu_count: 2

docker:
  use_poetry: true
  base_image: python:3.11-slim

components:
  - module: custom_components.minio_store
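
The staging and production stacks reference values like ${GCP_PROJECT_ID}, which must be supplied as environment variables (the CI workflow in Step 7 sets them from repository secrets). As a rough sketch of the mechanism only, not flowyml's actual loader, such placeholders are typically expanded from the environment when the config is read:

# expand_config.py - illustrative sketch, not flowyml's own config loader
import os
import yaml  # pip install pyyaml

with open("flowyml.yaml") as f:
    raw = f.read()

# ${VAR} placeholders are substituted from the environment; unset ones are left as-is
config = yaml.safe_load(os.path.expandvars(raw))
print(config["stacks"]["staging"]["project_id"])

Either way, make sure GCP_PROJECT_ID, GCP_STAGING_BUCKET, GCP_PROD_BUCKET, and GCP_SERVICE_ACCOUNT are exported in your shell (or CI job) before running the staging or production stacks.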

Test Different Stacks

# Local
flowyml run training_pipeline.py

# With MinIO
flowyml run training_pipeline.py --stack dev_minio

# Staging (dry run)
flowyml run training_pipeline.py --stack staging --dry-run

# Production with GPUs
flowyml run training_pipeline.py \
  --stack production \
  --resources training \
  --context epochs=50

Step 6: Create Dockerfile

Create Dockerfile:

FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements
COPY pyproject.toml* requirements.txt* ./

# Install Python dependencies
RUN if [ -f pyproject.toml ]; then \
        pip install poetry && poetry install --no-dev; \
    elif [ -f requirements.txt ]; then \
        pip install -r requirements.txt; \
    fi

# Install flowyml
RUN pip install flowyml[tensorflow,gcp]

# Copy code
COPY . .

# Set entrypoint
ENTRYPOINT ["python"]
CMD ["training_pipeline.py"]

Build and Test

# Build image
docker build -t ml-pipeline:latest .

# Test locally
docker run ml-pipeline:latest

# Push to registry
docker tag ml-pipeline:latest gcr.io/my-project/ml-pipeline:v1
docker push gcr.io/my-project/ml-pipeline:v1

Step 7: CI/CD Setup

Create .github/workflows/ml-pipeline.yml:

name: ML Training Pipeline

on:
  push:
    branches: [main]
  workflow_dispatch:
    inputs:
      environment:
        description: 'Environment to deploy to'
        required: true
        default: 'staging'
        type: choice
        options:
          - staging
          - production

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install flowyml[tensorflow]
          pip install minio  # For custom component

      - name: Run tests
        run: |
          flowyml run training_pipeline.py --dry-run

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install flowyml
        run: |
          pip install flowyml[tensorflow,gcp]

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v1
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}

      - name: Run pipeline on GCP
        env:
          GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
          GCP_STAGING_BUCKET: ${{ secrets.GCP_STAGING_BUCKET }}
          GCP_PROD_BUCKET: ${{ secrets.GCP_PROD_BUCKET }}
          GCP_SERVICE_ACCOUNT: ${{ secrets.GCP_SERVICE_ACCOUNT }}
        run: |
          ENV=${{ github.event.inputs.environment || 'staging' }}

          flowyml run training_pipeline.py \
            --stack $ENV \
            --resources training \
            --context experiment_name=github-${{ github.run_id }}

Step 8: Production Deployment

Verify Configuration

# Check stack configuration
flowyml stack show production

# Dry run
flowyml run training_pipeline.py \
  --stack production \
  --resources training \
  --dry-run

Deploy

# Run on production
flowyml run training_pipeline.py \
  --stack production \
  --resources training \
  --context epochs=100 \
  --context experiment_name=prod-v1

What You've Learned

  • ✅ Clean pipeline code - no infrastructure coupling
  • ✅ Custom components - MinIO artifact store
  • ✅ Multi-environment setup - dev, staging, production
  • ✅ Configuration-driven - same code, different infra
  • ✅ Docker integration - containerized execution
  • ✅ CI/CD automation - GitHub Actions deployment

Next Steps

  1. Add more custom components
     • Airflow orchestrator
     • Redis cache
     • Custom metrics tracker

  2. Enhance pipeline
     • Hyperparameter tuning
     • Model registry integration
     • A/B testing

  3. Monitor and optimize
     • Add logging
     • Track metrics
     • Optimize resources

  4. Share components
     • Package as pip installable
     • Publish to PyPI
     • Contribute to community

Troubleshooting

MinIO Connection Issues

# Check MinIO is running
docker ps | grep minio

# Test connection
python -c "from minio import Minio; print(Minio('localhost:9000', 'minioadmin', 'minioadmin', secure=False).list_buckets())"

GCP Authentication Issues

# Check authentication
gcloud auth list

# Re-authenticate
gcloud auth login
gcloud auth application-default login
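
If gcloud looks healthy but the pipeline still can't authenticate, check which application default credentials Python client libraries will actually pick up (requires the google-auth package):

# check_gcp_auth.py - confirm application default credentials are discoverable
import google.auth

credentials, project = google.auth.default()
print("project:", project)
print("credentials:", type(credentials).__name__)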

Component Not Loading

# Check Python path
python -c "import sys; print('\n'.join(sys.path))"

# Load explicitly
flowyml component load custom_components.minio_store

# Verify
flowyml component list
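
If the CLI still can't find the component, confirm the module imports cleanly from the project root; this check is independent of flowyml and uses only the class defined in Step 4:

# check_import.py - verify the component module is importable
import importlib

module = importlib.import_module("custom_components.minio_store")
print(module.MinIOArtifactStore)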

Congratulations! You've built a production-ready, extensible ML pipeline! 🎉