ZenML Pipeline Tutorial 🔄

Transform your MLPotion pipeline into a production-ready MLOps workflow with ZenML tracking, versioning, and reproducibility!

Note: ZenML is just one integration example. MLPotion is designed to be orchestrator-agnostic and works with Prefect, Airflow, Kubeflow, and any other orchestration platform. See the ZenML Integration Guide for guidance on extending to other orchestrators.
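
For example, the same components drop into a Prefect flow unchanged. A rough sketch, assuming only the CSVDataLoader API used later on this page and Prefect's standard flow/task decorators:

from prefect import flow, task

from mlpotion.frameworks.tensorflow import CSVDataLoader


@task
def load_dataset(file_pattern: str, label_name: str):
    """Load a CSV dataset with the same MLPotion component used below."""
    loader = CSVDataLoader(file_pattern=file_pattern, label_name=label_name, batch_size=1)
    return loader.load()


@flow
def training_flow(file_pattern: str = "examples/data/sample.csv") -> None:
    dataset = load_dataset(file_pattern, label_name="target")
    # ...optimize, train, evaluate, and save exactly as in the standalone
    # example below; only the orchestration wrapper changes.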

Prerequisites 📋

poetry add mlpotion -E tensorflow -E zenml
zenml init
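
Both examples below read examples/data/sample.csv with ten numeric feature columns (feature_0 through feature_9) and a target column. If you need a compatible file, here is a quick generator; note the schema is inferred from the examples on this page, not prescribed by MLPotion:

import csv
import os
import random

os.makedirs("examples/data", exist_ok=True)
with open("examples/data/sample.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow([f"feature_{i}" for i in range(10)] + ["target"])
    for _ in range(200):
        features = [random.gauss(0.0, 1.0) for _ in range(10)]
        # Toy target: a noisy linear combination of the features.
        writer.writerow(features + [sum(features) + random.gauss(0.0, 0.1)])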

Converting Your Pipeline 🔄

Before: Standalone Pipeline

"""Basic TensorFlow usage WITHOUT ZenML.

This example demonstrates the core MLPotion TensorFlow workflow:
1. Load data from CSV
2. Optimize dataset for performance
3. Create a TensorFlow model
4. Train the model
5. Evaluate the model
6. Save and export the model
"""

import tensorflow as tf

from mlpotion.frameworks.tensorflow import (
    CSVDataLoader,
    DatasetOptimizer,
    ModelEvaluationConfig,
    ModelEvaluator,
    ModelPersistence,
    ModelTrainer,
    ModelTrainingConfig,
)


def main() -> None:
    """Run basic TensorFlow training pipeline."""
    print("=" * 60)
    print("MLPotion - TensorFlow Basic Usage")
    print("=" * 60)

    # 1. Load data
    print("\n1. Loading data...")
    loader = CSVDataLoader(
        file_pattern="examples/data/sample.csv",
        label_name="target",
        batch_size=1,  # Load unbatched, let DatasetOptimizer handle batching
    )
    dataset = loader.load()
    print(f"Dataset: {dataset}")

    # Unbatch the dataset first (since CSVDataLoader batches by default)
    dataset = dataset.unbatch()

    # Transform OrderedDict to single tensor
    def prepare_features(features, label):
        """Convert OrderedDict of features to single tensor."""
        feature_list = [features[key] for key in sorted(features.keys())]
        stacked_features = tf.stack(feature_list, axis=-1)
        return stacked_features, label

    dataset = dataset.map(prepare_features)

    # 2. Optimize dataset
    print("\n2. Optimizing dataset...")
    optimizer = DatasetOptimizer(batch_size=8, shuffle_buffer_size=100)
    dataset = optimizer.optimize(dataset)

    # 3. Create model
    print("\n3. Creating model...")
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Dense(64, activation="relu", input_shape=(10,)),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(32, activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(1),
        ]
    )
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="mse",
        metrics=["mae", "mse"],
    )
    model.summary()  # prints the architecture itself; it returns None, so wrapping it in print() is redundant

    # 4. Train model
    print("\n4. Training model...")
    trainer = ModelTrainer()
    config = ModelTrainingConfig(
        epochs=10,
        batch_size=8,
        learning_rate=0.001,
        verbose=1,
    )

    result = trainer.train(
        model=model,
        data=dataset,
        config=config,
    )

    print("\nTraining completed!")
    print(f"{result=}")

    # 5. Evaluate model (on the training data, purely for demonstration)
    print("\n5. Evaluating model...")
    evaluator = ModelEvaluator()
    eval_config = ModelEvaluationConfig(batch_size=8, verbose=1)
    eval_result = evaluator.evaluate(
        model=model,
        data=dataset,
        config=eval_config,
    )
    print(f"{eval_result=}")

    # 6. Save model
    print("\n6. Saving model...")
    model_path = "/tmp/tensorflow_model.keras"
    persistence = ModelPersistence(
        path=model_path,
        model=model,
    )
    persistence.save(
        save_format=".keras",
    )
    print(f"Model saved to: {model_path}")

    # 7. Load model
    print("\n7. Loading model...")
    loaded_model, metadata = persistence.load()
    print(f"Model loaded successfully: {type(loaded_model)}")

    print("\n" + "=" * 60)
    print("Complete!")
    print("=" * 60)


if __name__ == "__main__":
    main()
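
For reference, the DatasetOptimizer call above most likely wraps the standard tf.data performance recipe. A rough plain-TensorFlow equivalent (an assumption about its internals, not MLPotion's documented behavior):

# Approximate stand-in for DatasetOptimizer(batch_size=8, shuffle_buffer_size=100):
dataset = (
    dataset.shuffle(buffer_size=100)
    .batch(8)
    .prefetch(tf.data.AUTOTUNE)
)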

After: ZenML Pipeline

"""TensorFlow training pipeline WITH ZenML orchestration.

This example demonstrates how to use MLPotion's TensorFlow components
within a ZenML pipeline for reproducible and tracked ML workflows.

Requirements:
    pip install zenml

Setup:
    zenml init  # Initialize ZenML repository
    export ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK=true  # For testing without full stack
"""

import tensorflow as tf
from zenml import pipeline, step

from mlpotion.frameworks.tensorflow import ModelTrainingConfig
from mlpotion.integrations.zenml.tensorflow.steps import (
    evaluate_model,
    export_model,
    load_data,
    optimize_data,
    save_model,
    train_model,
)


@step(enable_cache=False)  # Disable caching to ensure fresh model
def create_model() -> tf.keras.Model:
    """Create and compile a TensorFlow model that accepts dict inputs.

    Returns:
        Compiled TensorFlow/Keras model ready for training.
    """
    # Create one Input per feature column (feature_0 .. feature_9).
    # After batching and the materializer round trip, each scalar CSV column
    # arrives as a tensor of shape (batch_size, 1), so each Input is declared
    # with a per-sample shape of (1,).
    inputs = {}
    feature_list = []

    for i in range(10):
        inp = tf.keras.Input(shape=(1,), name=f"feature_{i}", dtype=tf.float32)
        inputs[f"feature_{i}"] = inp
        feature_list.append(inp)  # already (batch_size, 1); no reshape needed

    # Concatenate all features along the last axis
    # This will create shape (batch_size, 10)
    concatenated = tf.keras.layers.Concatenate(axis=-1)(feature_list)

    # Build the model architecture
    x = tf.keras.layers.Dense(64, activation="relu")(concatenated)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.Dense(32, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    outputs = tf.keras.layers.Dense(1)(x)

    # Create the functional model
    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="mse",
        metrics=["mae", "mse"],
    )

    return model


@step
def create_training_config() -> ModelTrainingConfig:
    """Create training configuration.

    Returns:
        Training configuration with hyperparameters.
    """
    return ModelTrainingConfig(
        epochs=10,
        batch_size=8,
        learning_rate=0.001,
        verbose=1,
    )
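
# NOTE: create_training_config is not wired into the pipeline below, which
# passes its hyperparameters to train_model directly. Keep this step if you
# want the training config tracked and versioned as its own artifact.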


@pipeline(enable_cache=False)
def tensorflow_training_pipeline(
    file_path: str = "examples/data/sample.csv",
    label_name: str = "target",
    model_save_path: str = "/tmp/tensorflow_model.keras",
    export_path: str = "/tmp/tensorflow_model_export",
):
    """Complete TensorFlow training pipeline with ZenML.

    This pipeline orchestrates the entire ML workflow:
    1. Load data from CSV
    2. Optimize dataset for performance
    3. Create and configure model
    4. Train model
    5. Evaluate model
    6. Save model
    7. Export model for deployment

    Args:
        file_path: Path to CSV data file.
        label_name: Name of the target column.
        model_save_path: Path to save the trained model.
        export_path: Path to export the model for serving.
    """
    # Step 1: Load data
    dataset = load_data(
        file_path=file_path,
        batch_size=1,
        label_name=label_name,
    )

    # Step 2: Optimize dataset
    optimized_dataset = optimize_data(
        dataset=dataset,
        batch_size=8,
        shuffle_buffer_size=100,
    )

    # Step 3: Create model
    model = create_model()

    # Step 4: Train model
    _config_train = {
        "epochs": 10,
        "learning_rate": 0.001,
        "verbose": 1,
    }
    trained_model, training_metrics = train_model(
        model=model,
        dataset=optimized_dataset,
        **_config_train,
    )

    # Step 5: Evaluate model
    evaluation_metrics = evaluate_model(
        model=trained_model,
        dataset=optimized_dataset,
    )

    # Step 6: Save model
    save_model(
        model=trained_model,
        save_path=model_save_path,
    )

    # Step 7: Export model for serving
    export_model(
        model=trained_model,
        export_path=export_path,
        export_format="keras",
    )

    return trained_model, training_metrics, evaluation_metrics


if __name__ == "__main__":
    """Run the TensorFlow ZenML pipeline."""
    print("=" * 60)
    print("MLPotion - TensorFlow ZenML Pipeline")
    print("=" * 60)

    # Run the pipeline
    print("\nRunning ZenML pipeline...")
    tensorflow_training_pipeline()

    print("\n" + "=" * 60)
    print("Pipeline completed successfully!")

Benefits You Get 🌟

  • ✅ Automatic artifact versioning
  • ✅ Full pipeline lineage tracking
  • ✅ Experiment comparison
  • ✅ Model registry integration
  • ✅ Reproducible runs
  • ✅ Caching of unchanged steps
  • ✅ Step output tracking with metadata
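
The pipeline above sets enable_cache=False, so every run retrains from scratch. To take advantage of the step caching listed here, re-enable it per run; a minimal sketch, assuming ZenML's with_options API:

# Re-run with caching on: steps whose code and inputs are unchanged are skipped.
cached_pipeline = tensorflow_training_pipeline.with_options(enable_cache=True)
cached_pipeline()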

Viewing Pipeline Runs 🔍

# List all pipeline runs
zenml pipeline runs list

# View details of a specific run
zenml pipeline runs describe <RUN_ID>

# Compare different runs
zenml pipeline runs compare <RUN_ID_1> <RUN_ID_2>
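
The same information is available from Python; a minimal sketch, assuming the ZenML client (attribute names may vary slightly by version):

from zenml.client import Client

# List the most recent pipeline runs, newest first.
for run in Client().list_pipeline_runs(size=10):
    print(run.name, run.status)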

See the ZenML Integration Guide for complete documentation and examples with PyTorch and Keras!


Ready for production MLOps! 🚀