Skip to content

MLPotion: Brew Your ML Magic! ๐Ÿงชโœจ

MLPotion Logo

Provided and maintained by ๐Ÿฆ„ UnicoLab

Welcome, fellow alchemist! ๐Ÿง™โ€โ™‚๏ธ Ready to brew some machine learning magic without getting locked in a cauldron?

MLPotion is your chest of modular, mix-and-match ML building blocks that work across Keras, TensorFlow, and PyTorch. Think of it as LEGOยฎ for ML pipelines, but with fewer foot injuries and more flexibility!

Why MLPotion? ๐Ÿค”

Ever felt trapped by a framework that forces you to do things "their way"? We've been there. That's why we created MLPotion:

  • ๐ŸŽฏ Framework Agnostic: Write once, run anywhere (well, on Keras, TensorFlow, or PyTorch)
  • ๐Ÿงฑ Modular by Design: Pick the pieces you need, leave the rest in the box
  • ๐Ÿ”ฌ Type-Safe: Python 3.10+ typing that actually helps you (mypy approved!)
  • ๐Ÿš€ Production Ready: Built for the real world, not just notebooks
  • ๐ŸŽจ Orchestration Flexible: Works standalone OR with ZenML, Prefect, Airflow - your choice!
  • ๐Ÿ“ฆ Install What You Need: Core package works without any ML frameworks (you only install what you need)!
  • ๐Ÿค Community-Driven: Missing something? Contribute it back - we love community additions!

What's in the Potion? ๐Ÿงช

โš—๏ธ Core Ingredients

  • Type-safe protocols for all components
  • Framework-agnostic result types
  • Consistent error handling
  • Zero-dependency core package

๐Ÿ”ง Framework Support

  • Keras 3.0+ - The friendly one
  • TensorFlow 2.15+ - The production workhorse
  • PyTorch 2.0+ - The researcher's favorite

๐Ÿ“Š Data Processing

  • CSV loaders for all frameworks
  • Dataset optimization utilities
  • Data transformers
  • Preprocessing pipelines

๐ŸŽ“ Training & Evaluation

  • Unified training interface
  • Comprehensive evaluation tools
  • Rich result objects
  • Training history tracking

๐Ÿ’พ Model Management

  • Save/load model checkpoints
  • Export to production formats
  • Model inspection utilities
  • Multiple export formats

๐Ÿ”„ Orchestration Integration

  • ZenML integration built-in
  • Extensible to Prefect, Airflow, etc.
  • Works standalone (no orchestration needed!)
  • Community contributions welcome

The MLPotion Philosophy ๐ŸŽญ

"A good potion doesn't force you to drink it a certain way. It just... works."

โ€” Ancient ML Alchemist Proverb (we just made that up)

We believe in:

  1. Flexibility > Convention: Your project, your rules
  2. Simplicity > Complexity: If it's hard to use, we failed
  3. Type Safety > Runtime Surprises: Catch errors before they bite
  4. Modularity > Monoliths: Use what you need, ignore the rest
  5. Consistency > Chaos: Same patterns across all frameworks
  6. Community > Corporate: Built by the community, for the community

Extensibility & Community Contributions ๐ŸŒŸ

MLPotion is designed to be extensible. While we provide ZenML integration out-of-the-box, you can easily integrate with:

  • Prefect: Wrap components as Prefect tasks
  • Airflow: Use as operators in DAGs
  • Kubeflow: Deploy in Kubeflow pipelines
  • Your Custom Orchestrator: The building blocks work anywhere!

Missing a feature? We actively encourage community contributions! Whether it's:

  • A new data loader (Parquet, Avro, databases)
  • Integration with another orchestration framework
  • Framework-specific optimizations
  • New export formats

Your contributions help everyone. Check out our Contributing Guide to get started!

Who's This For? ๐ŸŽฏ

You'll love MLPotion if you:

  • Switch between frameworks and hate rewriting everything
  • Value heavily tested code that you can reuse
  • Value type safety and IDE autocomplete (who doesn't?)
  • Want production-ready code without enterprise bloat
  • Believe ML pipelines should be composable and testable

You might want something else if you:

  • Do not like modularity
  • Do not like reusability
  • Are too lazy to contribute something that you can't already find here

Getting Started ๐Ÿš€

Ready to start brewing? Here's your path:

1

๐Ÿ“ฅ Install MLPotion

Choose your framework flavor

Installation Guide โ†’
2

โšก Quick Start

Get up and running in 5 minutes

Quick Start โ†’
3

๐Ÿง  Learn Concepts

Understand the architecture

Core Concepts โ†’
4

๐ŸŽจ Build Pipelines

Create your first pipeline

First Pipeline โ†’

Show Me the Code! ๐Ÿ’ป

Standalone Usage (Framework-Only)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""Basic Keras usage WITHOUT ZenML.

This example demonstrates the core MLPotion Keras workflow:
1. Load data from CSV
2. Create a Keras model
3. Train the model
4. Evaluate the model
5. Save and export the model
"""

import tensorflow as tf
from loguru import logger

from mlpotion.frameworks.keras import (
    CSVDataLoader,
    ModelEvaluator,
    ModelPersistence,
    ModelTrainer,
    ModelTrainingConfig,
)


# =================== METHODS ===========================
def create_model(input_dim: int = 10) -> tf.keras.Model:
    """Create a simple feedforward neural network.

    Args:
        input_dim: Number of input features.

    Returns:
        Compiled Keras model.
    """
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Dense(64, activation="relu", input_shape=(input_dim,)),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(32, activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(1),
        ]
    )

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="mse",
        metrics=["mae", "mse"],
    )
    return model


# =================== MAIN ===========================

# 1. Load data (โ™ป๏ธ REUSABLE)
logger.info("\n1. Loading data from CSV...")
loader = CSVDataLoader(
    file_pattern="docs/examples/data/sample.csv",
    label_name="target",
    batch_size=8,
    shuffle=True,
)
dataset = loader.load()
logger.info(f"Dataset created: {dataset}")

# 2. Create model (CUSTOM)
logger.info("\n2. Creating Keras model...")
model = create_model(input_dim=10)
logger.info(model.summary())

# 3. Train model (โ™ป๏ธ REUSABLE)
logger.info("\n3. Training model...")
trainer = ModelTrainer()
config = ModelTrainingConfig(
    epochs=10,
    batch_size=8,
    learning_rate=0.001,
    validation_split=0.2,
    verbose=1,
)

result = trainer.train(
    model=model,
    data=dataset,
    config=config,
)

logger.info("\nTraining completed!")
logger.info(f"Final training results: {result}")

# 4. Evaluate model (โ™ป๏ธ REUSABLE)
logger.info("\n4. Evaluating model...")
evaluator = ModelEvaluator()
eval_result = evaluator.evaluate(
    model=model,
    data=dataset,
    config=config,
)

logger.info("Evaluation completed!")
logger.info(f"Evaluation results: {eval_result}")

# 5. Save model (โ™ป๏ธ REUSABLE)
logger.info("\n5. Saving model...")
model_path = "/tmp/keras_model.keras"

persistence = ModelPersistence(
    path=model_path,
    model=model,
)
persistence.save(
    save_format="keras",
)
logger.info(f"Model saved to: {model_path}")

# 6. Load model (โ™ป๏ธ REUSABLE)
logger.info("\n6. Loading model...")
loaded_model, metadata = persistence.load()
logger.info(f"Model loaded successfully: {type(loaded_model)}")

logger.info("\n" + "=" * 60)
logger.info("Complete!")
logger.info("=" * 60)
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Basic TensorFlow usage WITHOUT ZenML.

This example demonstrates the core MLPotion TensorFlow workflow:
1. Load data from CSV
2. Optimize dataset for performance
3. Create a TensorFlow model
4. Train the model
5. Evaluate the model
6. Save and export the model
"""

import tensorflow as tf

from mlpotion.frameworks.tensorflow import (
    CSVDataLoader,
    DatasetOptimizer,
    ModelEvaluator,
    ModelPersistence,
    ModelTrainer,
    ModelTrainingConfig,
)


def main() -> None:
    """Run basic TensorFlow training pipeline."""
    print("=" * 60)
    print("MLPotion - TensorFlow Basic Usage")
    print("=" * 60)

    # 1. Load data
    print("\n1. Loading data...")
    loader = CSVDataLoader(
        file_pattern="examples/data/sample.csv",
        label_name="target",
        batch_size=1,  # Load unbatched, let DatasetOptimizer handle batching
    )
    dataset = loader.load()
    print(f"Dataset: {dataset}")

    # Unbatch the dataset first (since CSVDataLoader batches by default)
    dataset = dataset.unbatch()

    # Transform OrderedDict to single tensor
    def prepare_features(features, label):
        """Convert OrderedDict of features to single tensor."""
        feature_list = [features[key] for key in sorted(features.keys())]
        stacked_features = tf.stack(feature_list, axis=-1)
        return stacked_features, label

    dataset = dataset.map(prepare_features)

    # 2. Optimize dataset
    print("\n2. Optimizing dataset...")
    optimizer = DatasetOptimizer(batch_size=8, shuffle_buffer_size=100)
    dataset = optimizer.optimize(dataset)

    # 3. Create model
    print("\n3. Creating model...")
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Dense(64, activation="relu", input_shape=(10,)),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(32, activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(1),
        ]
    )
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="mse",
        metrics=["mae", "mse"],
    )
    print(model.summary())

    # 4. Train model
    print("\n4. Training model...")
    trainer = ModelTrainer()
    config = ModelTrainingConfig(
        epochs=10,
        batch_size=8,
        learning_rate=0.001,
        verbose=1,
    )

    result = trainer.train(
        model=model,
        data=dataset,
        config=config,
    )

    print("\nTraining completed!")
    print(f"{result=}")

    # 5. Evaluate model
    print("\n5. Evaluating model...")
    evaluator = ModelEvaluator()
    from mlpotion.frameworks.tensorflow import ModelEvaluationConfig

    eval_config = ModelEvaluationConfig(batch_size=8, verbose=1)
    eval_result = evaluator.evaluate(
        model=model,
        data=dataset,
        config=eval_config,
    )
    print(f"{eval_result=}")

    # 6. Save model
    print("\n6. Saving model...")
    model_path = "/tmp/tensorflow_model.keras"
    persistence = ModelPersistence(
        path=model_path,
        model=model,
    )
    persistence.save(
        save_format=".keras",
    )
    print(f"Model saved to: {model_path}")

    # 7. Load model
    print("\n7. Loading model...")
    loaded_model, metadata = persistence.load()
    print(f"Model loaded successfully: {type(loaded_model)}")

    print("\n" + "=" * 60)
    print("Complete!")
    print("=" * 60)


if __name__ == "__main__":
    main()
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Basic PyTorch usage WITHOUT ZenML.

This example demonstrates the core MLPotion PyTorch workflow:
1. Load data from CSV
2. Create a PyTorch model
3. Train the model
4. Evaluate the model
5. Save and export the model
"""

import torch
import torch.nn as nn

from mlpotion.frameworks.pytorch import (
    CSVDataset,
    CSVDataLoader,
    ModelEvaluator,
    ModelPersistence,
    ModelTrainer,
    ModelTrainingConfig,
)


class SimpleModel(nn.Module):
    """Simple feedforward neural network.

    Args:
        input_dim: Number of input features.
        hidden_dim: Size of hidden layer.
    """

    def __init__(self, input_dim: int = 10, hidden_dim: int = 64) -> None:
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(hidden_dim, 32)
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(32, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass through the network."""
        x = torch.relu(self.fc1(x))
        x = self.dropout1(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout2(x)
        return self.fc3(x)


def main() -> None:
    """Run basic PyTorch training pipeline."""
    print("=" * 60)
    print("MLPotion - PyTorch Basic Usage")
    print("=" * 60)

    # 1. Load data
    print("\n1. Loading data...")
    dataset = CSVDataset(
        file_pattern="examples/data/sample.csv",
        label_name="target",
    )
    print(f"Dataset size: {len(dataset)}")

    # 2. Create DataLoader
    print("\n2. Creating DataLoader...")
    factory = CSVDataLoader(batch_size=8, shuffle=True)
    dataloader = factory.load(dataset)

    # 3. Create model
    print("\n3. Creating model...")
    model = SimpleModel(input_dim=10, hidden_dim=64)
    print(model)

    # 4. Train model
    print("\n4. Training model...")
    trainer = ModelTrainer()
    config = ModelTrainingConfig(
        epochs=10,
        learning_rate=0.001,
        device="cuda" if torch.cuda.is_available() else "cpu",
        verbose=1,
    )
    result = trainer.train(
        model=model,
        dataloader=dataloader,
        config=config,
    )

    print("\nTraining completed!")
    print(f"Training time: {result.training_time:.2f}s")
    print(f"Final loss: {result.metrics['loss']:.4f}")

    # 5. Evaluate model
    print("\n5. Evaluating model...")
    evaluator = ModelEvaluator()
    eval_result = evaluator.evaluate(model, dataloader, config)

    print(f"Evaluation completed in {eval_result.evaluation_time:.2f}s")
    print("Evaluation metrics:")
    for metric_name, metric_value in eval_result.metrics.items():
        print(f"  - {metric_name}: {metric_value:.4f}")

    # 6. Save model
    print("\n6. Saving model...")
    persistence = ModelPersistence(
        path="/tmp/pytorch_model.pth",
        model=model,
    )
    model_path = "/tmp/pytorch_model.pth"
    persistence.save()
    print(f"Model saved to: {model_path}")

    # 7. Load model
    print("\n7. Loading model...")
    loaded_model, metadata = persistence.load()
    print(f"Model loaded successfully: {type(loaded_model)}")

    print("\n" + "=" * 60)
    print("Complete!")
    print("=" * 60)


if __name__ == "__main__":
    main()

ZenML Pipeline Examples (MLOps Mode)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""Keras training pipeline WITH ZenML orchestration.

This example demonstrates how to use MLPotion's Keras components
within a ZenML pipeline for reproducible and tracked ML workflows.

Requirements:
    pip install zenml

Setup:
    zenml init  # Initialize ZenML repository
    export ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK=true  # For testing without full stack
"""
import keras
from zenml import pipeline, step

from mlpotion.integrations.zenml.keras.steps import (
    evaluate_model,
    export_model,
    load_data,
    save_model,
    train_model,
)


@step
def create_model() -> keras.Model:
    """Create and compile a Keras model.

    Returns:
        Compiled Keras model ready for training.
    """
    model = keras.Sequential(
        [
            keras.layers.Dense(64, activation="relu", input_shape=(10,)),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(32, activation="relu"),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(1),
        ]
    )

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss="mse",
        metrics=["mae", "mse"],
    )

    return model


@pipeline(enable_cache=False)
def keras_training_pipeline(
    file_path: str = "examples/data/sample.csv",
    label_name: str = "target",
    model_save_path: str = "/tmp/keras_model.keras",
    export_path: str = "/tmp/keras_model_export",
):
    """Complete Keras training pipeline with ZenML.

    This pipeline orchestrates the entire ML workflow:
    1. Load data from CSV
    2. Create and configure model
    3. Train model
    4. Evaluate model
    5. Save model
    6. Export model for deployment

    Args:
        file_path: Path to CSV data file.
        label_name: Name of the target column.
        model_save_path: Path to save the trained model.
        export_path: Path to export the model for serving.
    """
    # Step 1: Load data
    dataset = load_data(
        file_path=file_path,
        label_name=label_name,
        batch_size=8,
        shuffle=True,
    )

    # Step 2: Create model and config
    model = create_model()

    _config_train = {
        "epochs": 10,
        "learning_rate": 0.001,
        "verbose": 1,
    }
    # Step 3: Train model
    trained_model, training_metrics = train_model(
        model=model,
        data=dataset,
        **_config_train,
    )
    # Step 4: Evaluate model
    evaluation_metrics = evaluate_model(
        model=trained_model,
        data=dataset,
        verbose=1,
    )

    # # Step 5: Save model
    save_model(
        model=trained_model,
        save_path=model_save_path,
    )

    # # Step 6: Export model for serving
    export_model(
        model=trained_model,
        export_path=export_path,
        export_format="tf",
    )
    return trained_model, training_metrics, evaluation_metrics


if __name__ == "__main__":
    """Run the Keras ZenML pipeline."""
    print("=" * 60)
    print("MLPotion - Keras ZenML Pipeline")
    print("=" * 60)

    # Initialize ZenML (if not already initialized)
    try:
        from zenml.client import Client

        client = Client()
        print(f"โœ… ZenML initialized. Active stack: {client.active_stack_model.name}")
    except Exception as e:
        print(f"โš ๏ธ  ZenML client error: {e}")
        print("Run 'zenml init' if you haven't already")

    # Run the pipeline
    print("\nRunning ZenML pipeline...")
    result = keras_training_pipeline()

    print("\n" + "=" * 60)
    print("Pipeline completed successfully!")
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""TensorFlow training pipeline WITH ZenML orchestration.

This example demonstrates how to use MLPotion's TensorFlow components
within a ZenML pipeline for reproducible and tracked ML workflows.

Requirements:
    pip install zenml

Setup:
    zenml init  # Initialize ZenML repository
    export ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK=true  # For testing without full stack
"""

import tensorflow as tf
from zenml import pipeline, step

from mlpotion.frameworks.tensorflow import ModelTrainingConfig
from mlpotion.integrations.zenml.tensorflow.steps import (
    evaluate_model,
    export_model,
    load_data,
    optimize_data,
    save_model,
    train_model,
)


@step(enable_cache=False)  # Disable caching to ensure fresh model
def create_model() -> tf.keras.Model:
    """Create and compile a TensorFlow model that accepts dict inputs.

    Returns:
        Compiled TensorFlow/Keras model ready for training.
    """
    # Create inputs for each feature (10 features: feature_0 to feature_9)
    # After batching, make_csv_dataset produces tensors with shape (batch_size,) for each scalar feature
    # The materializer now correctly preserves this shape as (None,) where None is the batch dimension
    inputs = {}
    feature_list = []

    for i in range(10):
        # Each input has shape (1,) per sample after batching and materializer roundtrip
        # The materializer preserves the concrete shape (batch_size, 1)
        inp = tf.keras.Input(shape=(1,), name=f"feature_{i}", dtype=tf.float32)
        inputs[f"feature_{i}"] = inp
        # Already shape (batch_size, 1), no need to reshape
        feature_list.append(inp)

    # Concatenate all features along the last axis
    # This will create shape (batch_size, 10)
    concatenated = tf.keras.layers.Concatenate(axis=-1)(feature_list)

    # Build the model architecture
    x = tf.keras.layers.Dense(64, activation="relu")(concatenated)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.Dense(32, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    outputs = tf.keras.layers.Dense(1)(x)

    # Create the functional model
    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="mse",
        metrics=["mae", "mse"],
    )

    return model


@step
def create_training_config() -> ModelTrainingConfig:
    """Create training configuration.

    Returns:
        Training configuration with hyperparameters.
    """
    return ModelTrainingConfig(
        epochs=10,
        batch_size=8,
        learning_rate=0.001,
        verbose=1,
    )


@pipeline(enable_cache=False)
def tensorflow_training_pipeline(
    file_path: str = "examples/data/sample.csv",
    label_name: str = "target",
    model_save_path: str = "/tmp/tensorflow_model.keras",
    export_path: str = "/tmp/tensorflow_model_export",
):
    """Complete TensorFlow training pipeline with ZenML.

    This pipeline orchestrates the entire ML workflow:
    1. Load data from CSV
    2. Optimize dataset for performance
    3. Create and configure model
    4. Train model
    5. Evaluate model
    6. Save model
    7. Export model for deployment

    Args:
        file_path: Path to CSV data file.
        label_name: Name of the target column.
        model_save_path: Path to save the trained model.
        export_path: Path to export the model for serving.
    """
    # Step 1: Load data
    dataset = load_data(
        file_path=file_path,
        batch_size=1,
        label_name=label_name,
    )

    # Step 2: Optimize dataset
    optimized_dataset = optimize_data(
        dataset=dataset,
        batch_size=8,
        shuffle_buffer_size=100,
    )

    # Step 3: Create model and config
    model = create_model()

    # Step 4: Train model
    _config_train = {
        "epochs": 10,
        "learning_rate": 0.001,
        "verbose": 1,
    }
    trained_model, training_metrics = train_model(
        model=model,
        dataset=optimized_dataset,
        **_config_train,
    )

    # Step 5: Evaluate model
    evaluation_metrics = evaluate_model(
        model=trained_model,
        dataset=optimized_dataset,
    )

    # Step 6: Save model
    save_model(
        model=trained_model,
        save_path=model_save_path,
    )

    # Step 7: Export model for serving
    export_model(
        model=trained_model,
        export_path=export_path,
        export_format="keras",
    )

    return trained_model, training_metrics, evaluation_metrics


if __name__ == "__main__":
    """Run the TensorFlow ZenML pipeline."""
    print("=" * 60)
    print("MLPotion - TensorFlow ZenML Pipeline")
    print("=" * 60)

    # Run the pipeline
    print("\nRunning ZenML pipeline...")
    result = tensorflow_training_pipeline()

    print("\n" + "=" * 60)
    print("Pipeline completed successfully!")
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""PyTorch training pipeline WITH ZenML orchestration.

This example demonstrates how to use MLPotion's PyTorch components
within a ZenML pipeline for reproducible and tracked ML workflows.

Requirements:
    pip install zenml

Setup:
    zenml init  # Initialize ZenML repository
    export ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK=true  # For testing without full stack
"""

import torch
import torch.nn as nn
from zenml import pipeline, step

from mlpotion.integrations.zenml.pytorch.steps import (
    evaluate_model,
    export_model,
    load_csv_data,
    save_model,
    train_model,
)


class SimpleModel(nn.Module):
    """Simple feedforward neural network.

    Args:
        input_dim: Number of input features.
        hidden_dim: Size of hidden layer.
    """

    def __init__(self, input_dim: int = 10, hidden_dim: int = 64) -> None:
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(hidden_dim, 32)
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(32, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass through the network."""
        x = torch.relu(self.fc1(x))
        x = self.dropout1(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout2(x)
        return self.fc3(x)


@step
def create_model() -> nn.Module:
    """Create a PyTorch model.

    Returns:
        PyTorch model ready for training.
    """
    model = SimpleModel(input_dim=10, hidden_dim=64)
    return model


@pipeline(enable_cache=False)
def pytorch_training_pipeline(
    file_path: str = "examples/data/sample.csv",
    label_name: str = "target",
    model_save_path: str = "/tmp/pytorch_model.pth",
    export_path: str = "/tmp/pytorch_model_export.pt",
):
    """Complete PyTorch training pipeline with ZenML.

    This pipeline orchestrates the entire ML workflow:
    1. Load data from CSV
    2. Create and configure model
    3. Train model
    4. Evaluate model
    5. Save model
    6. Export model for deployment

    Args:
        file_path: Path to CSV data file.
        label_name: Name of the target column.
        model_save_path: Path to save the trained model.
        export_path: Path to export the model for serving.
    """
    # Step 1: Load data
    dataloader = load_csv_data(
        file_path=file_path,
        label_name=label_name,
        batch_size=8,
        shuffle=True,
    )

    # Step 2: Create model and config
    model = create_model()

    # Step 3: Train model
    _config_train = {
        "epochs": 10,
        "learning_rate": 0.001,
        "verbose": 1,
    }
    model, metrics = train_model(
        model=model,
        dataloader=dataloader,
        **_config_train,
    )

    # Step 4: Evaluate model
    evaluation_metrics = evaluate_model(
        model=model,
        dataloader=dataloader,
    )

    # Step 5: Save model
    save_model(
        model=model,
        save_path=model_save_path,
    )

    # # Step 6: Export model for serving (TorchScript)
    export_model(
        model=model,
        export_path=export_path,
        export_format="torchscript",
    )
    return model, metrics, evaluation_metrics


if __name__ == "__main__":
    """Run the PyTorch ZenML pipeline."""
    print("=" * 60)
    print("MLPotion - PyTorch ZenML Pipeline")
    print("=" * 60)

    # Initialize ZenML (if not already initialized)
    try:
        from zenml.client import Client

        client = Client()
        print(f"โœ… ZenML initialized. Active stack: {client.active_stack_model.name}")
    except Exception as e:
        print(f"โš ๏ธ  ZenML client error: {e}")
        print("Run 'zenml init' if you haven't already")

    # Run the pipeline
    print("\nRunning ZenML pipeline...")
    result = pytorch_training_pipeline()

    print("\n" + "=" * 60)
    print("Pipeline completed successfully!")

Feature Comparison ๐Ÿ“Š

Feature MLPotion Framework-Only All-in-One Solutions
Multi-framework โœ… Yes โŒ No โš ๏ธ Limited
Type Safety โœ… Full โš ๏ธ Partial โš ๏ธ Partial
Modular Install โœ… Yes โŒ No โŒ No
ZenML Native โœ… Yes โŒ Manual โš ๏ธ Adapters
Learning Curve ๐Ÿ“ˆ Gentle ๐Ÿ“ˆ Framework-specific ๐Ÿ“ˆ Steep
Production Ready โœ… Yes โš ๏ธ DIY โœ… Yes
Flexibility ๐ŸŒŸ๐ŸŒŸ๐ŸŒŸ๐ŸŒŸ๐ŸŒŸ ๐ŸŒŸ๐ŸŒŸ๐ŸŒŸ๐ŸŒŸ๐ŸŒŸ ๐ŸŒŸ๐ŸŒŸ

Community & Support ๐Ÿค

What's Next? ๐Ÿ—บ๏ธ

๐Ÿ“š Learn the Basics

New to MLPotion? Start here!

๐Ÿ”ง Framework Guides

Deep dive into your framework

๐ŸŽ“ Tutorials

Learn by building

๐Ÿ“– API Reference

Detailed documentation


Ready to brew some ML magic? Let's get started! ๐Ÿงชโœจ
Built with โค๏ธ for the ML community by ๐Ÿฆ„ UnicoLab