🚀 Production Pipeline Tutorial
A complete, end-to-end guide for deploying FlowyML pipelines to production. By the end of this tutorial, you'll have a fully containerized pipeline running on a remote orchestrator with tracking, artifact storage, and monitoring.
🎯 What You'll Build
A complete ML pipeline with Docker containerization, GPU resources, cloud storage, experiment tracking, scheduling, and monitoring, ready for production.
📋 Prerequisites
Before you start, make sure you have:
- ✅ Python 3.9+ installed
- ✅ Docker installed and running
- ✅ A cloud account (GCP or AWS); we'll use GCP in this tutorial
- ✅ gcloud CLI configured (or aws CLI for AWS)
| # Install FlowyML with all extras
pip install "flowyml[all]"
# Verify installation
flowyml --version
|
1️⃣ Project Setup
Create a New Project
| # Create project
flowyml init production-demo
cd production-demo
# Project structure
tree
|
| production-demo/
├── flowyml.yaml          # Stack configuration
├── requirements.txt      # Python dependencies
├── Dockerfile            # (Optional) Custom Docker image
├── README.md
└── src/
    ├── __init__.py
    ├── pipeline.py       # Your pipeline code
    ├── steps/
    │   ├── __init__.py
    │   ├── data_loader.py
    │   ├── preprocessor.py
    │   └── trainer.py
    └── utils/
        └── helpers.py
|
💡 Best practice
Keep your steps in separate files under src/steps/. This makes them independently testable and reusable across pipelines.
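For example, a step module can be unit-tested without running the whole pipeline. A minimal sketch, assuming @step-decorated functions remain directly callable as plain Python (the test file and sample data are hypothetical):
| # tests/test_preprocessor.py
import pandas as pd
from src.steps.preprocessor import preprocess

def test_preprocess_splits_data():
    # Tiny synthetic frame with the "target" column the step expects
    df = pd.DataFrame({"feature": range(10), "target": [0, 1] * 5})
    train, test = preprocess(df, test_size=0.2)
    assert len(train) == 8
    assert len(test) == 2
|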
2️⃣ Dependency Management
Option A: requirements.txt (Simple)
| # requirements.txt
flowyml[all]>=1.9.0
scikit-learn>=1.3.0
pandas>=2.0.0
numpy>=1.24.0
mlflow>=2.8.0
google-cloud-aiplatform>=1.38.0
google-cloud-storage>=2.10.0
|
Option B: pyproject.toml (Poetry)
| # pyproject.toml
[tool.poetry]
name = "production-demo"
version = "0.1.0"
description = "Production ML pipeline with FlowyML"

[tool.poetry.dependencies]
python = "^3.11"
flowyml = {version = "^1.9.0", extras = ["all"]}
scikit-learn = "^1.3.0"
pandas = "^2.0.0"
mlflow = "^2.8.0"
google-cloud-aiplatform = "^1.38.0"
|
| # If using Poetry
poetry install
poetry export -f requirements.txt -o requirements.txt # For Docker
|
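Note: in recent Poetry releases the export command ships as the poetry-plugin-export plugin; if poetry export reports an unknown command, install the plugin first (for example, pipx inject poetry poetry-plugin-export).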
💡 Which one to use?
requirements.txt: simpler, universally supported, works with any Docker setup.
pyproject.toml (Poetry): better dependency resolution, lockfiles for reproducibility.
For production, we recommend starting with requirements.txt. Use Poetry if your team already has a Poetry workflow.
3️⃣ Stack Configuration
Local Development Stack
Start with a local stack for development and testing:
| # flowyml.yaml - local development
stacks:
local:
orchestrator: { type: local }
artifact_store: { type: local, path: "./artifacts" }
experiment_tracker:
type: mlflow
tracking_uri: http://localhost:5000
active_stack: local
|
Cloud Production Stack (GCP)
Add a production stack that uses Vertex AI:
| # flowyml.yaml - full multi-stack config
stacks:
local:
orchestrator: { type: local }
artifact_store: { type: local, path: "./artifacts" }
experiment_tracker:
type: mlflow
tracking_uri: http://localhost:5000
gcp-prod:
orchestrator:
type: vertex_ai
project: ${GCP_PROJECT}
location: us-central1
staging_bucket: gs://my-staging-bucket
artifact_store:
type: gcs
bucket: ${GCS_BUCKET}
prefix: production/
container_registry:
type: gcr
project: ${GCP_PROJECT}
location: us-central1
repository: ml-pipelines
experiment_tracker:
type: mlflow
tracking_uri: ${MLFLOW_TRACKING_URI}
model_registry:
type: vertex_model_registry
model_deployer:
type: vertex_endpoint
artifact_routing:
Model:
store: gcs
register: true
deploy: true
deploy_condition: manual
Dataset:
store: gcs
path: "{run_id}/datasets/{step_name}"
Metrics:
log_to_tracker: true
# Optional: AWS alternative
aws-staging:
orchestrator:
type: sagemaker
region: us-east-1
role_arn: ${SAGEMAKER_ROLE_ARN}
artifact_store:
type: s3
bucket: ${S3_BUCKET}
active_stack: local
|
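The ${...} placeholders above are resolved from environment variables (see the export commands under Running the Full Pipeline below). A small preflight check in plain Python, not a FlowyML API, fails fast if one is missing; the script name is hypothetical:
| # scripts/check_env.py
import os

REQUIRED = ["GCP_PROJECT", "GCS_BUCKET", "MLFLOW_TRACKING_URI"]

missing = [name for name in REQUIRED if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing env vars for the gcp-prod stack: {missing}")
print("✅ All gcp-prod environment variables are set")
|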
4️⃣ Write the Pipeline
Step 1: Data Loader
| # src/steps/data_loader.py
from flowyml import step
import pandas as pd
@step(
outputs=["raw_data"],
resources={"cpu": "2", "memory": "4Gi"}
)
def load_data(source_path: str = "s3://data-lake/training.csv") -> pd.DataFrame:
"""Load raw data from cloud storage.
    Resources: 2 CPU cores, 4GB RAM; suitable for data loading.
"""
    print(f"📥 Loading data from {source_path}")
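    # NOTE: pandas reads s3:// or gs:// URLs only if s3fs / gcsfs is installed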
df = pd.read_csv(source_path)
    print(f"✅ Loaded {len(df)} rows, {len(df.columns)} columns")
return df
|
Step 2: Preprocessor
| # src/steps/preprocessor.py
from flowyml import step
import pandas as pd
from sklearn.preprocessing import StandardScaler
@step(
inputs=["raw_data"],
outputs=["train_data", "test_data"],
resources={"cpu": "4", "memory": "8Gi"}
)
def preprocess(raw_data: pd.DataFrame, test_size: float = 0.2):
"""Clean, transform, and split data.
    Resources: 4 CPU cores, 8GB RAM; feature engineering is CPU-heavy.
"""
from sklearn.model_selection import train_test_split
# Feature engineering
scaler = StandardScaler()
features = raw_data.drop("target", axis=1)
scaled = pd.DataFrame(scaler.fit_transform(features), columns=features.columns)
scaled["target"] = raw_data["target"].values
# Split
train, test = train_test_split(scaled, test_size=test_size, random_state=42)
    print(f"📊 Train: {len(train)} rows, Test: {len(test)} rows")
return train, test
|
Step 3: Trainer (with GPU)
| # src/steps/trainer.py
from flowyml import step
from flowyml.core import Model, Metrics
import pandas as pd
@step(
inputs=["train_data", "test_data"],
outputs=["model", "metrics"],
resources={
"cpu": "8",
"memory": "32Gi",
"accelerator_type": "NVIDIA_TESLA_T4",
"accelerator_count": 1,
}
)
def train_and_evaluate(
train_data: pd.DataFrame,
test_data: pd.DataFrame,
n_estimators: int = 100,
max_depth: int = 10,
):
"""Train and evaluate a model.
Resources: 8 CPUs, 32GB RAM, 1x T4 GPU.
GPU allocation via the `resources` dict in the @step decorator.
"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score
# Train
X_train = train_data.drop("target", axis=1)
y_train = train_data["target"]
X_test = test_data.drop("target", axis=1)
y_test = test_data["target"]
clf = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42,
)
clf.fit(X_train, y_train)
# Evaluate
predictions = clf.predict(X_test)
acc = accuracy_score(y_test, predictions)
f1 = f1_score(y_test, predictions, average="weighted")
    print(f"🎯 Accuracy: {acc:.4f}, F1: {f1:.4f}")
    # Return typed artifacts; auto-routed based on flowyml.yaml
model = Model(data=clf, name="classifier", version="1.0.0", framework="sklearn")
    metrics = Metrics({
        "accuracy": acc,
        "f1_score": f1,
        "precision": precision_score(y_test, predictions, average="weighted"),
    })
return model, metrics
|
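Note: the resources block reserves a T4, but scikit-learn's RandomForestClassifier runs on CPU only. Treat the accelerator fields as a template for a genuinely GPU-bound trainer (e.g. a PyTorch model); for this sklearn example you can drop them to avoid paying for an idle GPU.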
Step 4: Assemble the Pipeline
| # src/pipeline.py
from flowyml import Pipeline, context
import os

from steps.data_loader import load_data
from steps.preprocessor import preprocess
from steps.trainer import train_and_evaluate

def create_pipeline(env: str = "local"):
    """Create a production-ready pipeline for the given stack."""
    # `env` selects the stack via the documented FLOWYML_STACK variable
    # (assumes FlowyML reads it at run time; an explicitly exported value
    # still wins because we only set a default here).
    os.environ.setdefault("FLOWYML_STACK", env)
    # Context with hyperparameters
    ctx = context(
        source_path="gs://my-data-lake/training_v2.csv",
        test_size=0.2,
        n_estimators=200,
        max_depth=15,
    )
# Build pipeline
pipeline = Pipeline("production_classifier", context=ctx)
pipeline.add_step(load_data)
pipeline.add_step(preprocess)
pipeline.add_step(train_and_evaluate)
return pipeline
if __name__ == "__main__":
pipeline = create_pipeline()
result = pipeline.run()
if result.success:
        print("✅ Pipeline completed successfully!")
        print(f"📊 Metrics: {result.outputs.get('metrics')}")
else:
        print(f"❌ Pipeline failed: {result.error}")
|
5️⃣ Docker & Containerization
When FlowyML Builds Images for You
With auto_build: true (the default for remote orchestrators), FlowyML:
- Generates a Dockerfile from your config
- Installs dependencies from requirements.txt
- Copies your source code into the image
- Builds the image locally
- Pushes it to your configured container registry
- Submits the pipeline with the image URI
You don't need to do anything extra; just run:
| FLOWYML_STACK=gcp-prod python src/pipeline.py
|
When You Need a Custom Dockerfile
Create a custom Dockerfile for:
- Custom system packages (OpenCV, ffmpeg, CUDA)
- Multi-stage builds for smaller images
- Pre-baked model weights or data
| # Dockerfile
FROM python:3.11-slim AS builder
# System dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
libopencv-dev \
&& rm -rf /var/lib/apt/lists/*
# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# --- Runtime stage ---
FROM python:3.11-slim
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Your code
COPY src/ /app/src/
COPY flowyml.yaml /app/flowyml.yaml
WORKDIR /app
|
Build and push:
| # Build
docker build -t us-central1-docker.pkg.dev/my-project/ml-pipelines/classifier:v1.0 .
# Push
docker push us-central1-docker.pkg.dev/my-project/ml-pipelines/classifier:v1.0
|
Reference in config:
| plugins:
docker:
auto_build: false
image: us-central1-docker.pkg.dev/my-project/ml-pipelines/classifier:v1.0
|
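Note: with auto_build: false, FlowyML no longer rebuilds the image for you. Rebuild and push (manually or in CI) whenever src/ or requirements.txt changes, or remote runs will execute stale code.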
6️⃣ Step Grouping for Efficiency
Co-locate related steps in the same container to avoid cold-start overhead:
| # Steps in the same execution_group share a container
@step(outputs=["raw"], execution_group="preprocessing")
def load(): ...
@step(inputs=["raw"], outputs=["clean"], execution_group="preprocessing")
def clean(raw): ...
@step(inputs=["clean"], outputs=["features"], execution_group="preprocessing")
def featurize(clean): ...
# This step runs in its own container (heavy GPU workload)
@step(
inputs=["features"],
outputs=["model"],
resources={"cpu": "8", "memory": "32Gi", "accelerator_type": "NVIDIA_TESLA_T4", "accelerator_count": 1}
)
def train(features): ...
|
💡 Resource aggregation
For grouped steps, FlowyML allocates the maximum of all participants' resources. If step A needs 2 CPUs and step B needs 4 CPUs, the group gets 4 CPUs.
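Illustrated with the @step API from above (the step names and resource numbers here are made up):
| # Both steps share the "prep" group, so the group container is
# allocated the element-wise maximum: cpu="4", memory="4Gi".
@step(outputs=["a"], execution_group="prep", resources={"cpu": "2", "memory": "4Gi"})
def step_a(): ...

@step(inputs=["a"], outputs=["b"], execution_group="prep", resources={"cpu": "4", "memory": "2Gi"})
def step_b(a): ...
|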
7️⃣ Tracking & Observability
MLflow Setup
| # Start local MLflow server (for development)
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --port 5000
|
| # flowyml.yaml - tracking config
stacks:
local:
experiment_tracker:
type: mlflow
tracking_uri: http://localhost:5000
|
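Because runs land in MLflow, you can also inspect them programmatically with the standard MLflow client. A minimal sketch; the experiment name is an assumption (check what FlowyML actually registers):
| # inspect_runs.py
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")
# Experiment name assumed to match the pipeline name; adjust as needed.
runs = mlflow.search_runs(experiment_names=["production_classifier"], max_results=5)
print(runs[["run_id", "metrics.accuracy", "metrics.f1_score"]])
|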
FlowyML Dashboard
| # Start the FlowyML UI
flowyml ui start
# Access at http://localhost:8080
|
The dashboard provides:
- 📊 Pipeline DAG: Visual graph showing step dependencies
- ⚡ Real-time execution: Steps highlight as they run
- 🔍 Artifact inspection: Click any step to see inputs/outputs
- 📜 Run history: Compare different runs side-by-side
- 📈 Metrics over time: Track model performance across runs
8️⃣ Scheduling & Automation
Cron Scheduling
| from flowyml import PipelineScheduler
scheduler = PipelineScheduler()
# Retrain model daily at 3 AM
scheduler.schedule_daily(
"daily_retrain",
lambda: create_pipeline("gcp-prod").run(),
hour=3
)
# Run data quality checks every 6 hours
# (assumes a `data_quality_pipeline` Pipeline object defined elsewhere)
scheduler.schedule_interval(
    "data_quality",
    lambda: data_quality_pipeline.run(),
    hours=6
)
scheduler.start()
|
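Note: as written, these schedules live inside this Python process, so it must stay alive (e.g. run it as its own container or service). For unattended production schedules, the CLI approach below is the safer default.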
CLI Scheduling
| # Schedule via CLI
flowyml schedule add \
--name "daily_retrain" \
--pipeline src/pipeline.py \
--stack gcp-prod \
--cron "0 3 * * *"
# List schedules
flowyml schedule list
# Disable a schedule
flowyml schedule disable daily_retrain
|
9️⃣ Running the Full Pipeline
Local Development
| # Run locally (default stack)
python src/pipeline.py
# Or via CLI
flowyml run src/pipeline.py
|
Cloud Production
| # Set environment variables
export GCP_PROJECT=my-project
export GCS_BUCKET=my-ml-artifacts
export MLFLOW_TRACKING_URI=https://mlflow.company.com
# Option 1: Environment variable
FLOWYML_STACK=gcp-prod python src/pipeline.py
# Option 2: CLI with stack flag
flowyml run src/pipeline.py --stack gcp-prod
# Option 3: Set default stack
flowyml stack set gcp-prod
flowyml run src/pipeline.py
|
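The same production run can also be triggered from Python (for example in CI). A minimal sketch; the file name is hypothetical and it relies on create_pipeline() selecting the stack via FLOWYML_STACK as shown in src/pipeline.py above:
| # run_prod.py
import sys

sys.path.insert(0, "src")  # so pipeline.py's `from steps...` imports resolve
from pipeline import create_pipeline

result = create_pipeline(env="gcp-prod").run()
print("✅ success" if result.success else f"❌ failed: {result.error}")
|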
What Happens When You Run on a Remote Orchestrator
1. FlowyML reads flowyml.yaml → selects "gcp-prod" stack
2. Generates a Dockerfile (or uses your custom one)
3. Builds the Docker image locally
4. Pushes to Google Artifact Registry
5. Submits the pipeline to Vertex AI Pipelines
6. Each step runs as a container on Vertex AI:
   - load_data → 2 CPU, 4GB RAM
   - preprocess → 4 CPU, 8GB RAM
   - train_and_evaluate → 8 CPU, 32GB RAM, 1x T4 GPU
7. Artifacts are saved to GCS
8. Metrics are logged to MLflow
9. Model is registered in Vertex AI Model Registry
10. You monitor via FlowyML UI or GCP Console
|
📚 Complete flowyml.yaml Reference
| # flowyml.yaml - Production-ready configuration
stacks:
  # ── Local Development ──
local:
orchestrator: { type: local }
artifact_store: { type: local, path: "./artifacts" }
experiment_tracker:
type: mlflow
tracking_uri: http://localhost:5000
  # ── GCP Production ──
gcp-prod:
orchestrator:
type: vertex_ai
project: ${GCP_PROJECT}
location: us-central1
staging_bucket: gs://${GCS_BUCKET}-staging
artifact_store:
type: gcs
bucket: ${GCS_BUCKET}
prefix: production/
container_registry:
type: gcr
project: ${GCP_PROJECT}
location: us-central1
repository: ml-pipelines
experiment_tracker:
type: mlflow
tracking_uri: ${MLFLOW_TRACKING_URI}
model_registry:
type: vertex_model_registry
model_deployer:
type: vertex_endpoint
artifact_routing:
Model:
store: gcs
register: true
deploy: true
deploy_condition: manual
Dataset:
store: gcs
path: "{run_id}/datasets/{step_name}"
Metrics:
log_to_tracker: true
  # ── AWS Alternative ──
aws-prod:
orchestrator:
type: sagemaker
region: us-east-1
role_arn: ${SAGEMAKER_ROLE_ARN}
artifact_store:
type: s3
bucket: ${S3_BUCKET}
region: us-east-1
container_registry:
type: ecr
repository: ml-pipelines
region: us-east-1
# ── Docker Configuration ──
docker:
auto_build: true
base_image: python:3.11-slim
requirements_file: requirements.txt
include_files:
- src/
- configs/
# ── Pipeline Defaults ──
pipeline_defaults:
resources:
cpu: "2"
memory: "8Gi"
timeout: 3600
active_stack: local
|
✅ Production Checklist
Before deploying to production, verify:
🚀 Next Steps