Skip to content

FlowyML Integration API Reference 📖

Complete API reference for MLPotion's FlowyML integration — auto-generated from source code docstrings.

Auto-Generated Documentation

This page is automatically populated with API documentation from the source code. See the FlowyML Integration Guide for usage examples and tutorials.


Core Adapter

mlpotion.integrations.flowyml.adapters

FlowyML Adapter — Bridge MLPotion protocols to FlowyML steps.

Provides factory methods that wrap MLPotion's protocol-compliant components (DataLoader, ModelTrainer, ModelEvaluator) into fully-configured FlowyML Step objects with asset outputs, caching, retry, resource specs, and tags.

Classes

FlowyMLAdapter

Adapt MLPotion components into FlowyML pipeline steps.

Unlike the ZenML adapter which returns raw objects, these steps return FlowyML Asset objects (Dataset, Model, Metrics) with automatic metadata extraction and lineage tracking.

Example::

from mlpotion.frameworks.keras.data.loaders import CSVDataLoader
from mlpotion.integrations.flowyml import FlowyMLAdapter

loader = CSVDataLoader(file_path="data.csv", batch_size=32)
load_step = FlowyMLAdapter.create_data_loader_step(loader)

# Use in a FlowyML pipeline
pipeline = Pipeline("my_pipeline")
pipeline.add_step(load_step)
Functions
create_data_loader_step staticmethod
create_data_loader_step(
    loader: DataLoader[DatasetT],
    *,
    name: str | None = None,
    cache: bool | str | Callable = "code_hash",
    retry: int = 0,
    resources: Any | None = None,
    tags: dict[str, str] | None = None
) -> Step

Wrap a DataLoader as a FlowyML step returning a Dataset asset.

Parameters:

Name Type Description Default
loader DataLoader[DatasetT]

Any MLPotion DataLoader protocol implementation.

required
name str | None

Step name (defaults to 'load_data').

None
cache bool | str | Callable

Caching strategy ('code_hash', 'input_hash', False).

'code_hash'
retry int

Number of retry attempts on failure.

0
resources Any | None

ResourceRequirements for this step.

None
tags dict[str, str] | None

Metadata tags for observability.

None

Returns:

Type Description
Step

A FlowyML Step that loads data and returns a Dataset asset.

Source code in mlpotion/integrations/flowyml/adapters.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@staticmethod
def create_data_loader_step(
    loader: DataLoader[DatasetT],
    *,
    name: str | None = None,
    cache: bool | str | Callable = "code_hash",
    retry: int = 0,
    resources: Any | None = None,
    tags: dict[str, str] | None = None,
) -> Step:
    """Build a FlowyML step that runs *loader* and emits a Dataset asset.

    Args:
        loader: Any MLPotion DataLoader protocol implementation.
        name: Step name (defaults to 'load_data').
        cache: Caching strategy ('code_hash', 'input_hash', False).
        retry: Number of retry attempts on failure.
        resources: ResourceRequirements for this step.
        tags: Metadata tags for observability.

    Returns:
        A FlowyML Step that loads data and returns a Dataset asset.
    """
    step_name = name or "load_data"
    step_tags = tags or {"component": "data_loader"}

    @step(
        name=step_name,
        outputs=["dataset"],
        cache=cache,
        retry=retry,
        resources=resources,
        tags=step_tags,
    )
    def load_data() -> Dataset:
        # Delegate to the protocol implementation, then wrap the payload
        # so FlowyML can attach metadata and lineage to it.
        loaded = loader.load()
        return Dataset.create(
            data=loaded,
            name="loaded_dataset",
            tags={"source": type(loader).__name__},
        )

    return load_data
create_training_step staticmethod
create_training_step(
    trainer: ModelTrainer[ModelT, DatasetT],
    *,
    name: str | None = None,
    cache: bool | str | Callable = False,
    retry: int = 0,
    resources: Any | None = None,
    tags: dict[str, str] | None = None
) -> Step

Wrap a ModelTrainer as a FlowyML step returning a Model asset.

Parameters:

Name Type Description Default
trainer ModelTrainer[ModelT, DatasetT]

Any MLPotion ModelTrainer protocol implementation.

required
name str | None

Step name (defaults to 'train_model').

None
cache bool | str | Callable

Caching strategy (default: False for training).

False
retry int

Number of retry attempts on failure.

0
resources Any | None

ResourceRequirements (e.g., GPU config).

None
tags dict[str, str] | None

Metadata tags for observability.

None

Returns:

Type Description
Step

A FlowyML Step that trains a model and returns a Model asset.

Source code in mlpotion/integrations/flowyml/adapters.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@staticmethod
def create_training_step(
    trainer: ModelTrainer[ModelT, DatasetT],
    *,
    name: str | None = None,
    cache: bool | str | Callable = False,
    retry: int = 0,
    resources: Any | None = None,
    tags: dict[str, str] | None = None,
) -> Step:
    """Build a FlowyML step that runs *trainer* and emits a Model asset.

    Args:
        trainer: Any MLPotion ModelTrainer protocol implementation.
        name: Step name (defaults to 'train_model').
        cache: Caching strategy (default: False for training).
        retry: Number of retry attempts on failure.
        resources: ResourceRequirements (e.g., GPU config).
        tags: Metadata tags for observability.

    Returns:
        A FlowyML Step that trains a model and returns a Model asset.
    """
    step_name = name or "train_model"
    step_tags = tags or {"component": "model_trainer"}

    @step(
        name=step_name,
        inputs=["model", "dataset", "config"],
        outputs=["trained_model"],
        cache=cache,
        retry=retry,
        resources=resources,
        tags=step_tags,
    )
    def train_model(
        model: ModelT,
        dataset: DatasetT,
        config: Any,
        validation_dataset: DatasetT | None = None,
    ) -> Model:
        # Run the training protocol, then wrap the trained model so
        # FlowyML can record lineage and (when present) training history.
        outcome = trainer.train(model, dataset, config, validation_dataset)
        return Model.create(
            data=outcome.model,
            name="trained_model",
            training_history=getattr(outcome, "history", None),
            tags={"trainer": type(trainer).__name__},
        )

    return train_model
create_evaluation_step staticmethod
create_evaluation_step(
    evaluator: ModelEvaluator[ModelT, DatasetT],
    *,
    name: str | None = None,
    cache: bool | str | Callable = "input_hash",
    retry: int = 0,
    resources: Any | None = None,
    tags: dict[str, str] | None = None
) -> Step

Wrap a ModelEvaluator as a FlowyML step returning a Metrics asset.

Parameters:

Name Type Description Default
evaluator ModelEvaluator[ModelT, DatasetT]

Any MLPotion ModelEvaluator protocol implementation.

required
name str | None

Step name (defaults to 'evaluate_model').

None
cache bool | str | Callable

Caching strategy (default: 'input_hash').

'input_hash'
retry int

Number of retry attempts on failure.

0
resources Any | None

ResourceRequirements for this step.

None
tags dict[str, str] | None

Metadata tags for observability.

None

Returns:

Type Description
Step

A FlowyML Step that evaluates a model and returns a Metrics asset.

Source code in mlpotion/integrations/flowyml/adapters.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@staticmethod
def create_evaluation_step(
    evaluator: ModelEvaluator[ModelT, DatasetT],
    *,
    name: str | None = None,
    cache: bool | str | Callable = "input_hash",
    retry: int = 0,
    resources: Any | None = None,
    tags: dict[str, str] | None = None,
) -> Step:
    """Build a FlowyML step that runs *evaluator* and emits a Metrics asset.

    Args:
        evaluator: Any MLPotion ModelEvaluator protocol implementation.
        name: Step name (defaults to 'evaluate_model').
        cache: Caching strategy (default: 'input_hash').
        retry: Number of retry attempts on failure.
        resources: ResourceRequirements for this step.
        tags: Metadata tags for observability.

    Returns:
        A FlowyML Step that evaluates a model and returns a Metrics asset.
    """
    step_name = name or "evaluate_model"
    step_tags = tags or {"component": "model_evaluator"}

    @step(
        name=step_name,
        inputs=["model", "dataset"],
        outputs=["metrics"],
        cache=cache,
        retry=retry,
        resources=resources,
        tags=step_tags,
    )
    def evaluate_model(
        model: ModelT,
        dataset: DatasetT,
        config: Any | None = None,
    ) -> Metrics:
        outcome = evaluator.evaluate(model, dataset, config)
        # Evaluators may return a result object exposing .metrics, or the
        # metrics mapping itself — accept either shape.
        payload = getattr(outcome, "metrics", outcome)
        return Metrics.create(
            name="evaluation_metrics",
            metrics=payload,
            tags={"evaluator": type(evaluator).__name__},
        )

    return evaluate_model

Keras Integration

Steps

mlpotion.integrations.flowyml.keras.steps

FlowyML Keras steps — Full-featured pipeline steps for Keras workflows.

Each step leverages FlowyML's native capabilities:

  • Artifact-centric design: returns Dataset, Model, Metrics with auto-extraction
  • Supports caching, retry, GPU resources, tags, DAG wiring, and execution groups
  • train_model integrates FlowymlKerasCallback for automatic tracking

Classes

Functions

load_data

load_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str | None = None,
    column_names: list[str] | None = None,
    shuffle: bool = True,
    dtype: str = "float32",
) -> Dataset

Load CSV data into a Keras-compatible CSVSequence, wrapped as a Dataset asset.

Automatic metadata extraction captures batch count, batch size, source path, column names, and label information.

Parameters:

Name Type Description Default
file_path str

Glob pattern for CSV files (e.g., "data/*.csv").

required
batch_size int

Batch size for the sequence.

32
label_name str | None

Name of the label/target column.

None
column_names list[str] | None

Specific columns to load (None = all).

None
shuffle bool

Whether to shuffle the data.

True
dtype str

Data type for numeric conversion.

'float32'

Returns:

Type Description
Dataset

Dataset asset wrapping the CSVSequence with auto-extracted metadata.

Source code in mlpotion/integrations/flowyml/keras/steps.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@step(
    name="keras_load_data",
    outputs=["dataset"],
    cache="code_hash",
    tags={"framework": "keras", "component": "data_loader"},
)
def load_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str | None = None,
    column_names: list[str] | None = None,
    shuffle: bool = True,
    dtype: str = "float32",
) -> Dataset:
    """Load CSV files into a Keras CSVSequence and publish it as a Dataset asset.

    Batch count, batch size, source path, column names, and label information
    are recorded on the asset for automatic metadata extraction.

    Args:
        file_path: Glob pattern for CSV files (e.g., "data/*.csv").
        batch_size: Batch size for the sequence.
        label_name: Name of the label/target column.
        column_names: Specific columns to load (None = all).
        shuffle: Whether to shuffle the data.
        dtype: Data type for numeric conversion.

    Returns:
        Dataset asset wrapping the CSVSequence with auto-extracted metadata.
    """
    loading_config = DataLoadingConfig(
        file_pattern=file_path,
        batch_size=batch_size,
        column_names=column_names,
        label_name=label_name,
        shuffle=shuffle,
        dtype=dtype,
    )
    # NOTE(review): .dict() is the pydantic-v1 spelling — confirm the
    # project's pydantic version before migrating to model_dump().
    csv_sequence = CSVDataLoader(**loading_config.dict()).load()

    dataset_asset = Dataset.create(
        data=csv_sequence,
        name="keras_csv_dataset",
        properties={
            "source": file_path,
            "batch_size": batch_size,
            "batches": len(csv_sequence),
            "label_name": label_name,
            "shuffle": shuffle,
            "dtype": dtype,
        },
        source=file_path,
        loader="CSVDataLoader",
        framework="keras",
    )

    logger.info(
        f"📦 Loaded dataset: {len(csv_sequence)} batches, "
        f"batch_size={batch_size}, source={file_path}"
    )
    return dataset_asset

transform_data

transform_data(
    dataset: Dataset,
    model: keras.Model,
    data_output_path: str,
    data_output_per_batch: bool = False,
    batch_size: int | None = None,
    feature_names: list[str] | None = None,
    input_columns: list[str] | None = None,
) -> Dataset

Transform data using a Keras model and save predictions to CSV.

Returns a Dataset asset with lineage linked to the input dataset.

Parameters:

Name Type Description Default
dataset Dataset

Input Dataset asset wrapping a CSVSequence.

required
model keras.Model

Keras model for generating predictions.

required
data_output_path str

Output path for transformed data.

required
data_output_per_batch bool

If True, output one file per batch.

False
batch_size int | None

Optional batch size override.

None
feature_names list[str] | None

Optional feature names for output CSV.

None
input_columns list[str] | None

Optional input columns to pass to model.

None

Returns:

Type Description
Dataset

Dataset asset pointing to the output CSV with parent lineage.

Source code in mlpotion/integrations/flowyml/keras/steps.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@step(
    name="keras_transform_data",
    inputs=["dataset"],
    outputs=["transformed"],
    cache="code_hash",
    tags={"framework": "keras", "component": "data_transformer"},
)
def transform_data(
    dataset: Dataset,
    model: keras.Model,
    data_output_path: str,
    data_output_per_batch: bool = False,
    batch_size: int | None = None,
    feature_names: list[str] | None = None,
    input_columns: list[str] | None = None,
) -> Dataset:
    """Run model predictions over a dataset and save them to CSV.

    The returned Dataset asset carries lineage back to the input dataset
    (when the input is a Dataset asset rather than a raw sequence).

    Args:
        dataset: Input Dataset asset wrapping a CSVSequence.
        model: Keras model for generating predictions.
        data_output_path: Output path for transformed data.
        data_output_per_batch: If True, output one file per batch.
        batch_size: Optional batch size override.
        feature_names: Optional feature names for output CSV.
        input_columns: Optional input columns to pass to model.

    Returns:
        Dataset asset pointing to the output CSV with parent lineage.
    """
    # Unwrap the asset (if it is one) and remember it as the lineage parent.
    if isinstance(dataset, Dataset):
        raw_dataset = dataset.data
        parent = dataset
    else:
        raw_dataset = dataset
        parent = None

    config = DataTransformationConfig(
        data_output_path=data_output_path,
        data_output_per_batch=data_output_per_batch,
        batch_size=batch_size,
        feature_names=feature_names,
        input_columns=input_columns,
    )
    transformer = CSVDataTransformer(
        dataset=raw_dataset,
        model=model,
        data_output_path=data_output_path,
        data_output_per_batch=data_output_per_batch,
        batch_size=batch_size,
        feature_names=feature_names,
        input_columns=input_columns,
    )
    transformer.transform(dataset=raw_dataset, model=model, config=config)

    transformed = Dataset.create(
        data={"output_path": data_output_path},
        name="keras_transformed_data",
        parent=parent,
        properties={
            "output_path": data_output_path,
            "per_batch": data_output_per_batch,
            "feature_names": feature_names,
        },
        source=data_output_path,
        transformer="CSVDataTransformer",
    )

    logger.info(f"🔄 Transformed data saved to: {data_output_path}")
    return transformed

train_model

train_model(
    model: keras.Model,
    data: CSVSequence | Dataset,
    epochs: int = 10,
    learning_rate: float = 0.001,
    verbose: int = 1,
    validation_data: CSVSequence | Dataset | None = None,
    callbacks: list[keras.callbacks.Callback] | None = None,
    experiment_name: str | None = None,
    project: str | None = None,
    log_model: bool = True,
) -> tuple[Model, Metrics]

Train a Keras model with FlowyML tracking integration.

Automatically attaches a FlowymlKerasCallback for:

  • Dynamic capture of ALL training metrics
  • Live dashboard updates
  • Model artifact logging

Returns a Model asset (via Model.from_keras with auto-extracted metadata) and a Metrics asset with training history.

Parameters:

Name Type Description Default
model keras.Model

Compiled Keras model.

required
data CSVSequence | Dataset

Training data as CSVSequence or Dataset asset.

required
epochs int

Number of training epochs.

10
learning_rate float

Learning rate.

0.001
verbose int

Keras verbosity level.

1
validation_data CSVSequence | Dataset | None

Optional validation CSVSequence or Dataset.

None
callbacks list[keras.callbacks.Callback] | None

Additional Keras callbacks (FlowyML callback auto-added).

None
experiment_name str | None

Experiment name for FlowyML tracking.

None
project str | None

Project name for FlowyML dashboard.

None
log_model bool

Whether to save model artifact after training.

True

Returns:

Type Description
tuple[Model, Metrics]

Tuple of (Model asset, Metrics asset).

Source code in mlpotion/integrations/flowyml/keras/steps.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
@step(
    name="keras_train_model",
    inputs=["dataset"],
    outputs=["model", "training_metrics"],
    cache=False,
    retry=1,
    tags={"framework": "keras", "component": "model_trainer"},
)
def train_model(
    model: keras.Model,
    data: CSVSequence | Dataset,
    epochs: int = 10,
    learning_rate: float = 0.001,
    verbose: int = 1,
    validation_data: CSVSequence | Dataset | None = None,
    callbacks: list[keras.callbacks.Callback] | None = None,
    experiment_name: str | None = None,
    project: str | None = None,
    log_model: bool = True,
) -> tuple[Model, Metrics]:
    """Train a Keras model with FlowyML tracking integration.

    Automatically attaches a FlowymlKerasCallback for:
    - Dynamic capture of ALL training metrics
    - Live dashboard updates
    - Model artifact logging

    Returns a Model asset (via Model.from_keras with auto-extracted metadata)
    and a Metrics asset with training history.

    Args:
        model: Compiled Keras model.
        data: Training data as CSVSequence or Dataset asset.
        epochs: Number of training epochs.
        learning_rate: Learning rate.
        verbose: Keras verbosity level.
        validation_data: Optional validation CSVSequence or Dataset.
        callbacks: Additional Keras callbacks (FlowyML callback auto-added).
        experiment_name: Experiment name for FlowyML tracking.
        project: Project name for FlowyML dashboard.
        log_model: Whether to save model artifact after training.

    Returns:
        Tuple of (Model asset, Metrics asset).
    """
    # Extract raw data from Dataset if wrapped
    raw_data = data.data if isinstance(data, Dataset) else data
    raw_val = (
        validation_data.data
        if isinstance(validation_data, Dataset)
        else validation_data
    )

    all_callbacks = list(callbacks or [])

    # Auto-attach FlowyML callback for tracking
    flowyml_callback = FlowymlKerasCallback(
        experiment_name=experiment_name or f"keras_train_{uuid.uuid4()}",
        project=project,
        log_model=log_model,
    )
    all_callbacks.append(flowyml_callback)

    config = ModelTrainingConfig(
        epochs=epochs,
        learning_rate=learning_rate,
        verbose=verbose,
        optimizer="adam",
        loss="mse",
        metrics=["mae"],
        # all_callbacks always contains at least the FlowyML callback, so
        # framework_options always carries the callbacks list.
        framework_options={"callbacks": all_callbacks} if all_callbacks else {},
    )

    trainer = ModelTrainer()
    result = trainer.train(
        model=model,
        dataset=raw_data,
        config=config,
        validation_dataset=raw_val,
    )

    # Copy the trainer's metrics before augmenting them. Previously this
    # aliased result.metrics and mutated it in place, leaking step-local
    # keys ("history", "epochs_completed", "learning_rate") into the
    # trainer's result object.
    raw_metrics: dict[str, Any] = (
        dict(result.metrics) if hasattr(result, "metrics") else {}
    )
    if hasattr(result, "history") and result.history:
        raw_metrics["history"] = result.history
    raw_metrics["epochs_completed"] = epochs
    raw_metrics["learning_rate"] = learning_rate

    # Wrap as Model asset using from_keras for full auto-extraction
    model_asset = Model.from_keras(
        model,
        name="keras_trained_model",
        callback=flowyml_callback,
        epochs_requested=epochs,
        batch_size=getattr(raw_data, "batch_size", None),
    )

    # Wrap as Metrics asset (history excluded from flat properties)
    metrics_asset = Metrics.create(
        metrics=raw_metrics,
        name="keras_training_metrics",
        tags={"stage": "training", "framework": "keras"},
        properties={
            "epochs": epochs,
            "learning_rate": learning_rate,
            **{k: v for k, v in raw_metrics.items() if k != "history"},
        },
    )

    logger.info(
        f"🎯 Training complete: {epochs} epochs, "
        f"metrics captured: {list(raw_metrics.keys())}"
    )
    return model_asset, metrics_asset

evaluate_model

evaluate_model(
    model: keras.Model | Model,
    data: CSVSequence | Dataset,
    verbose: int = 0,
) -> Metrics

Evaluate a Keras model and return a Metrics asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Trained Keras model or Model asset.

required
data CSVSequence | Dataset

Evaluation data as CSVSequence or Dataset asset.

required
verbose int

Keras verbosity level.

0

Returns:

Type Description
Metrics

Metrics asset with evaluation results.

Source code in mlpotion/integrations/flowyml/keras/steps.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
@step(
    name="keras_evaluate_model",
    inputs=["model", "dataset"],
    outputs=["metrics"],
    cache="input_hash",
    tags={"framework": "keras", "component": "model_evaluator"},
)
def evaluate_model(
    model: keras.Model | Model,
    data: CSVSequence | Dataset,
    verbose: int = 0,
) -> Metrics:
    """Evaluate a Keras model and publish the results as a Metrics asset.

    Args:
        model: Trained Keras model or Model asset.
        data: Evaluation data as CSVSequence or Dataset asset.
        verbose: Keras verbosity level.

    Returns:
        Metrics asset with evaluation results.
    """
    # Unwrap assets down to the raw Keras model / sequence.
    raw_model = model.data if isinstance(model, Model) else model
    raw_data = data.data if isinstance(data, Dataset) else data

    result = ModelEvaluator().evaluate(
        model=raw_model,
        dataset=raw_data,
        config=ModelEvaluationConfig(verbose=verbose),
    )

    evaluation_metrics = getattr(result, "metrics", {})

    metrics_asset = Metrics.create(
        metrics=evaluation_metrics,
        name="keras_evaluation_metrics",
        tags={"stage": "evaluation", "framework": "keras"},
        properties=evaluation_metrics,
    )

    logger.info(f"📊 Evaluation: {evaluation_metrics}")
    return metrics_asset

export_model

export_model(
    model: keras.Model | Model,
    export_path: str,
    export_format: str | None = None,
) -> Model

Export a Keras model to the specified format, returned as a Model asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Keras model or Model asset to export.

required
export_path str

Destination path.

required
export_format str | None

Format ('keras', 'saved_model', 'tflite').

None

Returns:

Type Description
Model

Model asset with export metadata.

Source code in mlpotion/integrations/flowyml/keras/steps.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
@step(
    name="keras_export_model",
    inputs=["model"],
    outputs=["exported_model"],
    cache="code_hash",
    tags={"framework": "keras", "component": "model_exporter"},
)
def export_model(
    model: keras.Model | Model,
    export_path: str,
    export_format: str | None = None,
) -> Model:
    """Export a Keras model to the specified format, returned as a Model asset.

    Args:
        model: Keras model or Model asset to export.
        export_path: Destination path.
        export_format: Format ('keras', 'saved_model', 'tflite').

    Returns:
        Model asset with export metadata.
    """
    raw_model = model.data if isinstance(model, Model) else model

    # Only forward export_format when one was requested; otherwise let the
    # exporter apply its own default.
    extra_kwargs = {"export_format": export_format} if export_format else {}
    ModelExporter().export(model=raw_model, path=export_path, **extra_kwargs)

    exported_asset = Model.from_keras(
        raw_model,
        name="keras_exported_model",
        export_path=export_path,
        export_format=export_format or "keras",
    )

    logger.info(f"📤 Exported model to: {export_path}")
    return exported_asset

save_model

save_model(
    model: keras.Model | Model, save_path: str
) -> Model

Save a Keras model to disk, returned as a Model asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Keras model or Model asset to save.

required
save_path str

Destination file path.

required

Returns:

Type Description
Model

Model asset with save location metadata.

Source code in mlpotion/integrations/flowyml/keras/steps.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
@step(
    name="keras_save_model",
    inputs=["model"],
    outputs=["saved_model"],
    tags={"framework": "keras", "component": "model_persistence"},
)
def save_model(
    model: keras.Model | Model,
    save_path: str,
) -> Model:
    """Persist a Keras model to disk, returned as a Model asset.

    Args:
        model: Keras model or Model asset to save.
        save_path: Destination file path.

    Returns:
        Model asset with save location metadata.
    """
    raw_model = model.data if isinstance(model, Model) else model

    ModelPersistence(path=save_path, model=raw_model).save()

    saved_asset = Model.from_keras(
        raw_model,
        name="keras_saved_model",
        save_path=save_path,
    )

    logger.info(f"💾 Saved model to: {save_path}")
    return saved_asset

load_model

load_model(model_path: str, inspect: bool = False) -> Model

Load a Keras model from disk, returned as a Model asset.

Parameters:

Name Type Description Default
model_path str

Path to the saved model.

required
inspect bool

If True, log model inspection info.

False

Returns:

Type Description
Model

Model asset wrapping the loaded Keras model.

Source code in mlpotion/integrations/flowyml/keras/steps.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
@step(
    name="keras_load_model",
    outputs=["model"],
    cache="code_hash",
    tags={"framework": "keras", "component": "model_persistence"},
)
def load_model(
    model_path: str,
    inspect: bool = False,
) -> Model:
    """Load a Keras model from disk, returned as a Model asset.

    Args:
        model_path: Path to the saved model.
        inspect: If True, log model inspection info.

    Returns:
        Model asset wrapping the loaded Keras model.
    """
    raw_model, inspection = ModelPersistence(path=model_path).load(inspect=inspect)

    loaded_asset = Model.from_keras(
        raw_model,
        name="keras_loaded_model",
        source_path=model_path,
    )

    # Inspection details are only logged when requested AND produced.
    if inspect and inspection:
        logger.info(f"🔍 Loaded model from: {model_path}, inspection: {inspection}")
    else:
        logger.info(f"🔍 Loaded model from: {model_path}")

    return loaded_asset

inspect_model

inspect_model(
    model: keras.Model | Model,
    include_layers: bool = True,
    include_signatures: bool = True,
) -> Metrics

Inspect a Keras model and return detailed metadata as a Metrics asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Keras model or Model asset to inspect.

required
include_layers bool

Include per-layer information.

True
include_signatures bool

Include input/output signatures.

True

Returns:

Type Description
Metrics

Metrics asset with model inspection details.

Source code in mlpotion/integrations/flowyml/keras/steps.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
@step(
    name="keras_inspect_model",
    inputs=["model"],
    outputs=["inspection"],
    tags={"framework": "keras", "component": "model_inspector"},
)
def inspect_model(
    model: keras.Model | Model,
    include_layers: bool = True,
    include_signatures: bool = True,
) -> Metrics:
    """Inspect a Keras model and return detailed metadata as a Metrics asset.

    Args:
        model: Keras model or Model asset to inspect.
        include_layers: Include per-layer information.
        include_signatures: Include input/output signatures.

    Returns:
        Metrics asset with model inspection details.
    """
    raw_model = model.data if isinstance(model, Model) else model

    details = ModelInspector(
        include_layers=include_layers,
        include_signatures=include_signatures,
    ).inspect(raw_model)

    param_info = details.get("parameters", {})
    metrics_asset = Metrics.create(
        metrics=details,
        name="keras_model_inspection",
        tags={"stage": "inspection", "framework": "keras"},
        properties={
            "model_name": details.get("name", "unknown"),
            "total_params": param_info.get("total"),
        },
    )

    logger.info(
        f"🔎 Model: {details.get('name', 'unknown')}, "
        f"params: {param_info.get('total', '?')}"
    )
    return metrics_asset

Pipelines

mlpotion.integrations.flowyml.keras.pipelines

Pre-built FlowyML pipeline templates for Keras workflows.

Provides ready-to-run pipelines that wire MLPotion Keras steps together with proper DAG dependencies, context injection, experiment tracking, and optional scheduling.

Available pipelines:

  • create_keras_training_pipeline — Load → Train → Evaluate
  • create_keras_full_pipeline — Load → Transform → Train → Evaluate → Export
  • create_keras_evaluation_pipeline — Load model + data → Evaluate → Inspect
  • create_keras_export_pipeline — Load model → Export + Save
  • create_keras_experiment_pipeline — Full pipeline with experiment tracking & conditional deploy

Functions

create_keras_training_pipeline

create_keras_training_pipeline(
    name: str = "keras_training",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline

Create a ready-to-run Keras training pipeline.

DAG: load_data → train_model → evaluate_model

Provide hyperparameters via the context object::

from flowyml.core.context import Context

ctx = Context(
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    epochs=10,
    learning_rate=0.001,
    experiment_name="my-experiment",
)

pipeline = create_keras_training_pipeline(
    name="my_training",
    context=ctx,
    project_name="my_project",
)
result = pipeline.run()

Parameters:

Name Type Description Default
name str

Pipeline name.

'keras_training'
context Context | None

FlowyML Context with parameters to inject.

None
enable_cache bool

Whether to enable step caching.

True
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string for versioned pipeline.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/keras/pipelines.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def create_keras_training_pipeline(
    name: str = "keras_training",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline:
    """Build the standard three-stage Keras training pipeline.

    **DAG**: ``load_data → train_model → evaluate_model``

    Hyperparameters are supplied through the FlowyML context object::

        from flowyml.core.context import Context

        ctx = Context(
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            epochs=10,
            learning_rate=0.001,
            experiment_name="my-experiment",
        )

        pipeline = create_keras_training_pipeline(
            name="my_training",
            context=ctx,
            project_name="my_project",
        )
        result = pipeline.run()

    Args:
        name: Pipeline name.
        context: FlowyML Context carrying the parameters to inject.
        enable_cache: Enable per-step caching.
        project_name: Optional project the pipeline belongs to.
        version: Optional version string for a versioned pipeline.

    Returns:
        A configured FlowyML Pipeline; call ``.run()`` to execute it.
    """
    training_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        project_name=project_name,
        version=version,
    )

    # Register the three stages in DAG execution order.
    for stage in (load_data, train_model, evaluate_model):
        training_pipeline.add_step(stage)

    return training_pipeline

create_keras_full_pipeline

create_keras_full_pipeline(
    name: str = "keras_full",
    context: Context | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline

Create a full Keras pipeline covering the entire ML lifecycle.

DAG: load_data → transform_data → train_model → evaluate_model → export_model

Includes checkpointing for long-running training steps so the pipeline can resume from the last checkpoint on failure.

Context parameters::

ctx = Context(
    # Data loading
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    # Transformation
    data_output_path="data/transformed/",
    # Training
    epochs=50,
    learning_rate=0.001,
    experiment_name="full-run",
    project="my_project",
    # Export
    export_path="models/production/",
    export_format="keras",
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'keras_full'
context Context | None

FlowyML Context with parameters.

None
enable_cache bool

Whether to enable step caching.

True
enable_checkpointing bool

Whether to enable checkpointing.

True
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/keras/pipelines.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def create_keras_full_pipeline(
    name: str = "keras_full",
    context: Context | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline:
    """Build an end-to-end Keras pipeline spanning the full ML lifecycle.

    **DAG**: ``load_data → transform_data → train_model → evaluate_model → export_model``

    Checkpointing is available for the long-running training stage so a
    failed run can resume from the last checkpoint.

    Context parameters::

        ctx = Context(
            # Data loading
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            # Transformation
            data_output_path="data/transformed/",
            # Training
            epochs=50,
            learning_rate=0.001,
            experiment_name="full-run",
            project="my_project",
            # Export
            export_path="models/production/",
            export_format="keras",
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context carrying the parameters.
        enable_cache: Enable per-step caching.
        enable_checkpointing: Enable checkpoint/resume support.
        project_name: Optional project the pipeline belongs to.
        version: Optional version string.

    Returns:
        A configured FlowyML Pipeline; call ``.run()`` to execute it.
    """
    full_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        enable_checkpointing=enable_checkpointing,
        project_name=project_name,
        version=version,
    )

    # Register the five lifecycle stages in DAG execution order.
    stages = (load_data, transform_data, train_model, evaluate_model, export_model)
    for stage in stages:
        full_pipeline.add_step(stage)

    return full_pipeline

create_keras_evaluation_pipeline

create_keras_evaluation_pipeline(
    name: str = "keras_evaluation",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
) -> Pipeline

Create a pipeline for evaluating an existing Keras model.

DAG: load_model → load_data → evaluate_model → inspect_model

Useful for model validation, A/B testing, and periodic evaluation against new data without retraining.

Context parameters::

ctx = Context(
    model_path="models/production/model.keras",
    file_path="data/test.csv",
    label_name="target",
    batch_size=64,
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'keras_evaluation'
context Context | None

FlowyML Context with parameters.

None
enable_cache bool

Whether to enable step caching.

True
project_name str | None

Project to attach this pipeline to.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/keras/pipelines.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def create_keras_evaluation_pipeline(
    name: str = "keras_evaluation",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
) -> Pipeline:
    """Build a pipeline that evaluates an already-trained Keras model.

    **DAG**: ``load_model → load_data → evaluate_model → inspect_model``

    Handy for model validation, A/B testing, and periodic evaluation
    against fresh data without retraining.

    Context parameters::

        ctx = Context(
            model_path="models/production/model.keras",
            file_path="data/test.csv",
            label_name="target",
            batch_size=64,
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context carrying the parameters.
        enable_cache: Enable per-step caching.
        project_name: Optional project the pipeline belongs to.

    Returns:
        A configured FlowyML Pipeline; call ``.run()`` to execute it.
    """
    evaluation_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        project_name=project_name,
    )

    # Register the evaluation stages in DAG execution order.
    for stage in (load_model, load_data, evaluate_model, inspect_model):
        evaluation_pipeline.add_step(stage)

    return evaluation_pipeline

create_keras_export_pipeline

create_keras_export_pipeline(
    name: str = "keras_export",
    context: Context | None = None,
    project_name: str | None = None,
) -> Pipeline

Create a pipeline for exporting and saving an existing model.

DAG: load_model → export_model, save_model

Useful for converting a trained model to multiple formats (SavedModel, TFLite, Keras) and persisting to different locations.

Context parameters::

ctx = Context(
    model_path="models/trained/model.keras",
    export_path="models/exported/",
    export_format="saved_model",
    save_path="models/backup/model.keras",
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'keras_export'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/keras/pipelines.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def create_keras_export_pipeline(
    name: str = "keras_export",
    context: Context | None = None,
    project_name: str | None = None,
) -> Pipeline:
    """Build a pipeline that exports and saves an existing model.

    **DAG**: ``load_model → export_model, save_model``

    Handy for converting a trained model to multiple formats
    (SavedModel, TFLite, Keras) and persisting it to several locations.

    Context parameters::

        ctx = Context(
            model_path="models/trained/model.keras",
            export_path="models/exported/",
            export_format="saved_model",
            save_path="models/backup/model.keras",
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context carrying the parameters.
        project_name: Optional project the pipeline belongs to.

    Returns:
        A configured FlowyML Pipeline; call ``.run()`` to execute it.
    """
    # Caching is disabled so every run performs a fresh export.
    export_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        project_name=project_name,
    )

    # Register the export stages in DAG execution order.
    for stage in (load_model, export_model, save_model):
        export_pipeline.add_step(stage)

    return export_pipeline

create_keras_experiment_pipeline

create_keras_experiment_pipeline(
    name: str = "keras_experiment",
    context: Context | None = None,
    project_name: str | None = None,
    version: str | None = None,
    deploy_threshold: float = 0.8,
    threshold_metric: str = "accuracy",
) -> Pipeline

Create a full experiment pipeline with tracking and conditional deployment.

DAG::

load_data → train_model → evaluate_model
                               ↓
                      [if metric ≥ threshold]
                               ↓
                      export_model → save_model

Integrates FlowyML experiment tracking and conditionally exports the model only if validation metrics meet or exceed the given threshold.

Context parameters::

ctx = Context(
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    epochs=30,
    learning_rate=0.001,
    experiment_name="experiment-v1",
    project="my_project",
    export_path="models/production/",
    save_path="models/checkpoints/model.keras",
)

pipeline = create_keras_experiment_pipeline(
    context=ctx,
    project_name="my_project",
    deploy_threshold=0.85,
    threshold_metric="accuracy",
)
result = pipeline.run()

Parameters:

Name Type Description Default
name str

Pipeline name.

'keras_experiment'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string.

None
deploy_threshold float

Minimum metric value to trigger deployment.

0.8
threshold_metric str

Which metric to check against the threshold.

'accuracy'

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/keras/pipelines.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def create_keras_experiment_pipeline(
    name: str = "keras_experiment",
    context: Context | None = None,
    project_name: str | None = None,
    version: str | None = None,
    deploy_threshold: float = 0.8,
    threshold_metric: str = "accuracy",
) -> Pipeline:
    """Create a full experiment pipeline with tracking and conditional deployment.

    **DAG**::

        load_data → train_model → evaluate_model
                           ↓
                  [if metric ≥ threshold]
                           ↓
                  export_model → save_model

    Integrates FlowyML experiment tracking and conditionally exports the
    model only if validation metrics meet or exceed the given threshold.

    Context parameters::

        ctx = Context(
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            epochs=30,
            learning_rate=0.001,
            experiment_name="experiment-v1",
            project="my_project",
            export_path="models/production/",
            save_path="models/checkpoints/model.keras",
        )

        pipeline = create_keras_experiment_pipeline(
            context=ctx,
            project_name="my_project",
            deploy_threshold=0.85,
            threshold_metric="accuracy",
        )
        result = pipeline.run()

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        project_name: Project to attach this pipeline to.
        version: Optional version string.
        deploy_threshold: Minimum metric value (inclusive, ``>=``) to trigger deployment.
        threshold_metric: Which metric to check against the threshold.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    from flowyml.core.conditional import If

    pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        enable_experiment_tracking=True,
        enable_checkpointing=True,
        project_name=project_name,
        version=version,
    )

    # Core training DAG
    pipeline.add_step(load_data)
    pipeline.add_step(train_model)
    pipeline.add_step(evaluate_model)

    # Conditional deployment: export only when the chosen metric meets or
    # exceeds the threshold. The predicate accepts either a FlowyML Metrics
    # asset (which exposes ``get_metric``) or a plain dict of metric values;
    # a missing metric defaults to 0 and therefore fails the gate.
    deploy_condition = If(
        condition=lambda metrics: (
            metrics.get_metric(threshold_metric, 0) >= deploy_threshold
            if hasattr(metrics, "get_metric")
            else metrics.get(threshold_metric, 0) >= deploy_threshold
        ),
        then_steps=[export_model, save_model],
        name=f"deploy_if_{threshold_metric}_above_{deploy_threshold}",
    )
    pipeline.control_flows.append(deploy_condition)

    return pipeline

create_keras_scheduled_pipeline

create_keras_scheduled_pipeline(
    name: str = "keras_scheduled_retraining",
    context: Context | None = None,
    project_name: str | None = None,
    schedule: str = "0 2 * * 0",
    timezone: str = "UTC",
) -> dict[str, Any]

Create a scheduled retraining pipeline.

Returns both the pipeline and a configured scheduler so you can register periodic retraining (e.g., weekly) with a single call.

DAG: load_data → train_model → evaluate_model → export_model

Schedule format uses cron syntax (default: every Sunday at 2 AM)::

pipeline_info = create_keras_scheduled_pipeline(
    context=ctx,
    project_name="my_project",
    schedule="0 2 * * 0",  # Weekly
)

# Access the components
pipeline = pipeline_info["pipeline"]
scheduler = pipeline_info["scheduler"]

# Run once immediately
result = pipeline.run()

# Or start the scheduler for automatic retraining
scheduler.start()

Parameters:

Name Type Description Default
name str

Pipeline name.

'keras_scheduled_retraining'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None
schedule str

Cron expression for scheduling.

'0 2 * * 0'
timezone str

Timezone for the schedule.

'UTC'

Returns:

Type Description
dict[str, Any]

Dict with pipeline and scheduler keys.

Source code in mlpotion/integrations/flowyml/keras/pipelines.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def create_keras_scheduled_pipeline(
    name: str = "keras_scheduled_retraining",
    context: Context | None = None,
    project_name: str | None = None,
    schedule: str = "0 2 * * 0",
    timezone: str = "UTC",
) -> dict[str, Any]:
    """Build a retraining pipeline together with a configured cron scheduler.

    A single call yields both the pipeline and a scheduler so periodic
    retraining (e.g. weekly) can be registered immediately.

    **DAG**: ``load_data → train_model → evaluate_model → export_model``

    The schedule uses cron syntax (default: every Sunday at 2 AM)::

        pipeline_info = create_keras_scheduled_pipeline(
            context=ctx,
            project_name="my_project",
            schedule="0 2 * * 0",  # Weekly
        )

        # Access the components
        pipeline = pipeline_info["pipeline"]
        scheduler = pipeline_info["scheduler"]

        # Run once immediately
        result = pipeline.run()

        # Or start the scheduler for automatic retraining
        scheduler.start()

    Args:
        name: Pipeline name.
        context: FlowyML Context carrying the parameters.
        project_name: Optional project the pipeline belongs to.
        schedule: Cron expression controlling when runs trigger.
        timezone: Timezone the cron expression is evaluated in.

    Returns:
        Dict with ``pipeline`` and ``scheduler`` keys.
    """
    from flowyml.core.scheduler import PipelineScheduler

    # Caching stays off so each scheduled run sees fresh data;
    # checkpointing lets an interrupted training run resume.
    retraining_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        enable_checkpointing=True,
        project_name=project_name,
    )

    for stage in (load_data, train_model, evaluate_model, export_model):
        retraining_pipeline.add_step(stage)

    # Attach the pipeline to a scheduler driven by the cron expression.
    retraining_scheduler = PipelineScheduler()
    retraining_scheduler.schedule(
        pipeline=retraining_pipeline,
        cron=schedule,
        timezone=timezone,
    )

    return {"pipeline": retraining_pipeline, "scheduler": retraining_scheduler}

PyTorch Integration

Steps

mlpotion.integrations.flowyml.pytorch.steps

FlowyML PyTorch steps — Full-featured pipeline steps for PyTorch workflows.

Each step leverages FlowyML's native capabilities:

- Artifact-centric design: returns Dataset, Model, Metrics with auto-extraction
- Supports caching, retry, GPU resources, tags, DAG wiring, and execution groups
- Returns framework-native objects wrapped as FlowyML assets

Classes

Functions

load_csv_data

load_csv_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str | None = None,
    column_names: list[str] | None = None,
    shuffle: bool = True,
    num_workers: int = 0,
    pin_memory: bool = False,
    drop_last: bool = False,
    dtype: str = "float32",
) -> Dataset

Load CSV data into a PyTorch DataLoader, wrapped as a Dataset asset.

Automatic metadata extraction captures batch size, source path, column names, and worker configuration.

Parameters:

Name Type Description Default
file_path str

Glob pattern for CSV files.

required
batch_size int

Batch size.

32
label_name str | None

Target column name.

None
column_names list[str] | None

Specific columns to load.

None
shuffle bool

Whether to shuffle.

True
num_workers int

Number of data loading workers.

0
pin_memory bool

Pin memory for faster GPU transfer.

False
drop_last bool

Drop the last incomplete batch.

False
dtype str

Data type for tensors.

'float32'

Returns:

Type Description
Dataset

Dataset asset wrapping the PyTorch DataLoader.

Source code in mlpotion/integrations/flowyml/pytorch/steps.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@step(
    name="pytorch_load_csv_data",
    outputs=["dataset"],
    cache="code_hash",
    tags={"framework": "pytorch", "component": "data_loader"},
)
def load_csv_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str | None = None,
    column_names: list[str] | None = None,
    shuffle: bool = True,
    num_workers: int = 0,
    pin_memory: bool = False,
    drop_last: bool = False,
    dtype: str = "float32",
) -> Dataset:
    """Load CSV data into a PyTorch DataLoader, wrapped as a Dataset asset.

    Automatic metadata extraction captures batch size, source path,
    column names, and worker configuration.

    Args:
        file_path: Glob pattern for CSV files.
        batch_size: Batch size.
        label_name: Target column name.
        column_names: Specific columns to load.
        shuffle: Whether to shuffle.
        num_workers: Number of data loading workers.
        pin_memory: Pin memory for faster GPU transfer.
        drop_last: Drop the last incomplete batch.
        dtype: Data type for tensors (e.g. ``'float32'``, ``'float64'``).

    Returns:
        Dataset asset wrapping the PyTorch DataLoader.

    Raises:
        ValueError: If ``dtype`` does not name a ``torch.dtype``.
    """
    # Resolve and validate the dtype string. A bare getattr(torch, dtype)
    # would happily return a module or function for a bad name (e.g. "nn"),
    # surfacing as a confusing failure deep inside the dataset instead of
    # a clear error here.
    torch_dtype = getattr(torch, dtype, None)
    if not isinstance(torch_dtype, torch.dtype):
        raise ValueError(f"dtype {dtype!r} does not name a torch.dtype")

    # Create dataset
    csv_dataset = CSVDataset(
        file_pattern=file_path,
        column_names=column_names,
        label_name=label_name,
        dtype=torch_dtype,
    )

    # Create DataLoader config
    config = DataLoadingConfig(
        file_pattern=file_path,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=drop_last,
    )

    # Create DataLoader using factory; exclude config fields the factory
    # constructor does not accept.
    loader_factory = CSVDataLoader(**config.dict(exclude={"file_pattern", "config"}))
    dataloader = loader_factory.load(csv_dataset)

    dataset_asset = Dataset.create(
        data=dataloader,
        name="pytorch_csv_dataset",
        properties={
            "source": file_path,
            "batch_size": batch_size,
            "label_name": label_name,
            "shuffle": shuffle,
            "num_workers": num_workers,
            "pin_memory": pin_memory,
            "dtype": dtype,
        },
        source=file_path,
        loader="CSVDataLoader",
        framework="pytorch",
    )

    logger.info(
        f"📦 Loaded PyTorch DataLoader: batch_size={batch_size}, source={file_path}"
    )
    return dataset_asset

load_streaming_csv_data

load_streaming_csv_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str | None = None,
    column_names: list[str] | None = None,
    num_workers: int = 0,
    pin_memory: bool = False,
    chunksize: int = 10000,
    dtype: str = "float32",
) -> Dataset

Load large CSV data via streaming into a PyTorch DataLoader, wrapped as a Dataset asset.

Uses chunked reading for datasets that don't fit in memory.

Parameters:

Name Type Description Default
file_path str

Glob pattern for CSV files.

required
batch_size int

Batch size.

32
label_name str | None

Target column name.

None
column_names list[str] | None

Specific columns.

None
num_workers int

Number of data loading workers.

0
pin_memory bool

Pin memory for faster GPU transfer.

False
chunksize int

Number of rows per chunk.

10000
dtype str

Data type for tensors.

'float32'

Returns:

Type Description
Dataset

Dataset asset wrapping the streaming PyTorch DataLoader.

Source code in mlpotion/integrations/flowyml/pytorch/steps.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@step(
    name="pytorch_load_streaming_csv_data",
    outputs=["dataset"],
    cache=False,
    tags={"framework": "pytorch", "component": "data_loader", "mode": "streaming"},
)
def load_streaming_csv_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str | None = None,
    column_names: list[str] | None = None,
    num_workers: int = 0,
    pin_memory: bool = False,
    chunksize: int = 10000,
    dtype: str = "float32",
) -> Dataset:
    """Load large CSV data via streaming into a PyTorch DataLoader, wrapped as a Dataset asset.

    Uses chunked reading for datasets that don't fit in memory.

    Args:
        file_path: Glob pattern for CSV files.
        batch_size: Batch size.
        label_name: Target column name.
        column_names: Specific columns.
        num_workers: Number of data loading workers.
        pin_memory: Pin memory for faster GPU transfer.
        chunksize: Number of rows per chunk.
        dtype: Data type for tensors (e.g. ``'float32'``, ``'float64'``).

    Returns:
        Dataset asset wrapping the streaming PyTorch DataLoader.

    Raises:
        ValueError: If ``dtype`` does not name a ``torch.dtype``.
    """
    # Resolve and validate the dtype string (same guard as load_csv_data):
    # getattr(torch, dtype) can return a module or function for a bad name,
    # which would only fail later inside the dataset with an opaque error.
    torch_dtype = getattr(torch, dtype, None)
    if not isinstance(torch_dtype, torch.dtype):
        raise ValueError(f"dtype {dtype!r} does not name a torch.dtype")

    # Create streaming dataset
    streaming_dataset = StreamingCSVDataset(
        file_pattern=file_path,
        column_names=column_names,
        label_name=label_name,
        chunksize=chunksize,
        dtype=torch_dtype,
    )

    # Create DataLoader config (no shuffle for streaming — an IterableDataset
    # cannot be shuffled by the DataLoader)
    config = DataLoadingConfig(
        file_pattern=file_path,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=False,
    )

    loader_factory = CSVDataLoader(**config.dict(exclude={"file_pattern", "config"}))
    dataloader = loader_factory.load(streaming_dataset)

    dataset_asset = Dataset.create(
        data=dataloader,
        name="pytorch_streaming_dataset",
        properties={
            "source": file_path,
            "batch_size": batch_size,
            "chunksize": chunksize,
            "label_name": label_name,
            "num_workers": num_workers,
            "dtype": dtype,
            "mode": "streaming",
        },
        source=file_path,
        loader="StreamingCSVDataLoader",
        framework="pytorch",
    )

    logger.info(
        f"📦 Streaming DataLoader: batch_size={batch_size}, "
        f"chunksize={chunksize}, source={file_path}"
    )
    return dataset_asset

train_model

train_model(
    model: nn.Module,
    data: DataLoader | Dataset,
    epochs: int = 10,
    learning_rate: float = 0.001,
    optimizer: str = "adam",
    loss_fn: str = "mse",
    device: str = "cpu",
    validation_data: DataLoader | Dataset | None = None,
    verbose: bool = True,
    max_batches_per_epoch: int | None = None,
) -> tuple[Model, Metrics]

Train a PyTorch model with full configuration.

Returns a Model asset (via Model.from_pytorch with auto-extracted metadata) and a Metrics asset with training history.

Parameters:

Name Type Description Default
model nn.Module

PyTorch model (nn.Module).

required
data DataLoader | Dataset

Training DataLoader or Dataset asset.

required
epochs int

Number of training epochs.

10
learning_rate float

Learning rate.

0.001
optimizer str

Optimizer type ('adam', 'sgd', 'adamw').

'adam'
loss_fn str

Loss function name ('mse', 'cross_entropy').

'mse'
device str

Device to train on ('cuda', 'cpu').

'cpu'
validation_data DataLoader | Dataset | None

Optional validation DataLoader or Dataset.

None
verbose bool

Whether to log per-epoch metrics.

True
max_batches_per_epoch int | None

Limit batches per epoch (for debugging).

None

Returns:

Type Description
tuple[Model, Metrics]

Tuple of (Model asset, Metrics asset).

Source code in mlpotion/integrations/flowyml/pytorch/steps.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
@step(
    name="pytorch_train_model",
    inputs=["dataset"],
    outputs=["model", "training_metrics"],
    cache=False,
    retry=1,
    tags={"framework": "pytorch", "component": "model_trainer"},
)
def train_model(
    model: nn.Module,
    data: DataLoader | Dataset,
    epochs: int = 10,
    learning_rate: float = 0.001,
    optimizer: str = "adam",
    loss_fn: str = "mse",
    device: str = "cpu",
    validation_data: DataLoader | Dataset | None = None,
    verbose: bool = True,
    max_batches_per_epoch: int | None = None,
) -> tuple[Model, Metrics]:
    """Train a PyTorch model with full configuration.

    Returns a Model asset (via Model.from_pytorch with auto-extracted metadata)
    and a Metrics asset with training history.

    Args:
        model: PyTorch model (nn.Module).
        data: Training DataLoader or Dataset asset.
        epochs: Number of training epochs.
        learning_rate: Learning rate.
        optimizer: Optimizer type ('adam', 'sgd', 'adamw').
        loss_fn: Loss function name ('mse', 'cross_entropy').
        device: Device to train on ('cuda', 'cpu').
        validation_data: Optional validation DataLoader or Dataset.
        verbose: Whether to log per-epoch metrics.
        max_batches_per_epoch: Limit batches per epoch (for debugging).

    Returns:
        Tuple of (Model asset, Metrics asset).
    """
    # Extract raw data from Dataset assets if wrapped.
    raw_data = data.data if isinstance(data, Dataset) else data
    raw_val = (
        validation_data.data
        if isinstance(validation_data, Dataset)
        else validation_data
    )

    config = ModelTrainingConfig(
        epochs=epochs,
        learning_rate=learning_rate,
        optimizer=optimizer,
        loss_fn=loss_fn,
        device=device,
        verbose=verbose,
        max_batches_per_epoch=max_batches_per_epoch,
    )

    trainer = ModelTrainer()
    result = trainer.train(
        model=model,
        dataloader=raw_data,
        config=config,
        validation_dataloader=raw_val,
    )

    # Copy the trainer's metrics before augmenting them: writing the run
    # parameters directly into ``result.metrics`` would mutate the result
    # object's own dict in place, which other consumers may still hold.
    raw_metrics: dict[str, Any] = (
        dict(result.metrics) if hasattr(result, "metrics") else {}
    )
    raw_metrics["epochs_completed"] = epochs
    raw_metrics["learning_rate"] = learning_rate
    raw_metrics["optimizer"] = optimizer
    raw_metrics["device"] = device

    # Wrap as Model asset using from_pytorch for full auto-extraction.
    model_asset = Model.from_pytorch(
        result.model,
        name="pytorch_trained_model",
        training_history=raw_metrics,
        epochs_requested=epochs,
        optimizer_type=optimizer,
        loss_function=loss_fn,
    )

    # Wrap as Metrics asset.
    metrics_asset = Metrics.create(
        metrics=raw_metrics,
        name="pytorch_training_metrics",
        tags={"stage": "training", "framework": "pytorch"},
        properties={
            "epochs": epochs,
            "learning_rate": learning_rate,
            "optimizer": optimizer,
            "device": device,
        },
    )

    logger.info(f"🎯 PyTorch training complete: {epochs} epochs, device={device}")
    return model_asset, metrics_asset

evaluate_model

evaluate_model(
    model: nn.Module | Model,
    data: DataLoader | Dataset,
    loss_fn: str = "mse",
    device: str = "cpu",
    verbose: bool = True,
    max_batches: int | None = None,
) -> Metrics

Evaluate a PyTorch model and return a Metrics asset.

Parameters:

Name Type Description Default
model nn.Module | Model

Trained PyTorch model or Model asset.

required
data DataLoader | Dataset

Evaluation DataLoader or Dataset asset.

required
loss_fn str

Loss function name.

'mse'
device str

Device for evaluation.

'cpu'
verbose bool

Whether to log metrics.

True
max_batches int | None

Limit batches to evaluate.

None

Returns:

Type Description
Metrics

Metrics asset with evaluation results.

Source code in mlpotion/integrations/flowyml/pytorch/steps.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
@step(
    name="pytorch_evaluate_model",
    inputs=["model", "dataset"],
    outputs=["metrics"],
    cache="input_hash",
    tags={"framework": "pytorch", "component": "model_evaluator"},
)
def evaluate_model(
    model: nn.Module | Model,
    data: DataLoader | Dataset,
    loss_fn: str = "mse",
    device: str = "cpu",
    verbose: bool = True,
    max_batches: int | None = None,
) -> Metrics:
    """Evaluate a PyTorch model and return a Metrics asset.

    Args:
        model: Trained PyTorch model or Model asset.
        data: Evaluation DataLoader or Dataset asset.
        loss_fn: Loss function name.
        device: Device for evaluation.
        verbose: Whether to log metrics.
        max_batches: Limit batches to evaluate.

    Returns:
        Metrics asset with evaluation results.
    """
    # Extract raw objects from assets.
    raw_model = model.data if isinstance(model, Model) else model
    raw_data = data.data if isinstance(data, Dataset) else data

    # A torch DataLoader exposes ``batch_size`` but a plain torch Dataset
    # does not; ``getattr`` avoids an AttributeError for the Dataset case
    # and falls back to 32 (also covers DataLoader.batch_size being None).
    config = ModelEvaluationConfig(
        batch_size=getattr(raw_data, "batch_size", None) or 32,
        verbose=verbose,
        device=device,
        framework_options={"loss_fn": loss_fn, "max_batches": max_batches},
    )

    evaluator = ModelEvaluator()
    result = evaluator.evaluate(
        model=raw_model,
        dataloader=raw_data,
        config=config,
    )

    raw_metrics = result.metrics if hasattr(result, "metrics") else {}

    metrics_asset = Metrics.create(
        metrics=raw_metrics,
        name="pytorch_evaluation_metrics",
        tags={"stage": "evaluation", "framework": "pytorch"},
        properties=raw_metrics,
    )

    logger.info(f"📊 PyTorch evaluation: {raw_metrics}")
    return metrics_asset

export_model

export_model(
    model: nn.Module | Model,
    export_path: str,
    export_format: str = "torchscript",
    sample_input: torch.Tensor | None = None,
) -> Model

Export a PyTorch model to the specified format, returned as a Model asset.

Parameters:

Name Type Description Default
model nn.Module | Model

PyTorch model or Model asset to export.

required
export_path str

Destination path.

required
export_format str

Format ('torchscript', 'onnx').

'torchscript'
sample_input torch.Tensor | None

Sample input tensor (required for ONNX).

None

Returns:

Type Description
Model

Model asset with export metadata.

Source code in mlpotion/integrations/flowyml/pytorch/steps.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
@step(
    name="pytorch_export_model",
    inputs=["model"],
    outputs=["exported_model"],
    cache="code_hash",
    tags={"framework": "pytorch", "component": "model_exporter"},
)
def export_model(
    model: nn.Module | Model,
    export_path: str,
    export_format: str = "torchscript",
    sample_input: torch.Tensor | None = None,
) -> Model:
    """Export a PyTorch model and wrap the result as a Model asset.

    Args:
        model: PyTorch model or Model asset to export.
        export_path: Destination path.
        export_format: Format ('torchscript', 'onnx').
        sample_input: Sample input tensor (required for ONNX).

    Returns:
        Model asset with export metadata.
    """
    # Unwrap a Model asset down to the underlying nn.Module.
    if isinstance(model, Model):
        raw_model = model.data
    else:
        raw_model = model

    export_config = ModelExportConfig(
        export_path=export_path,
        format=export_format,
    )
    ModelExporter().export(
        model=raw_model,
        config=export_config,
        sample_input=sample_input,
    )

    exported_asset = Model.from_pytorch(
        raw_model,
        name="pytorch_exported_model",
        export_path=export_path,
        export_format=export_format,
    )

    logger.info(f"📤 Exported PyTorch model to: {export_path}")
    return exported_asset

save_model

save_model(
    model: nn.Module | Model, save_path: str
) -> Model

Save a PyTorch model to disk, returned as a Model asset.

Parameters:

Name Type Description Default
model nn.Module | Model

PyTorch model or Model asset to save.

required
save_path str

Destination file path.

required

Returns:

Type Description
Model

Model asset with save location metadata.

Source code in mlpotion/integrations/flowyml/pytorch/steps.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
@step(
    name="pytorch_save_model",
    inputs=["model"],
    outputs=["saved_model"],
    tags={"framework": "pytorch", "component": "model_persistence"},
)
def save_model(
    model: nn.Module | Model,
    save_path: str,
) -> Model:
    """Persist a PyTorch model to disk and wrap it as a Model asset.

    Args:
        model: PyTorch model or Model asset to save.
        save_path: Destination file path.

    Returns:
        Model asset with save location metadata.
    """
    # Unwrap a Model asset down to the underlying nn.Module.
    if isinstance(model, Model):
        raw_model = model.data
    else:
        raw_model = model

    ModelPersistence().save(model=raw_model, path=save_path)

    saved_asset = Model.from_pytorch(
        raw_model,
        name="pytorch_saved_model",
        save_path=save_path,
    )

    logger.info(f"💾 Saved PyTorch model to: {save_path}")
    return saved_asset

load_model

load_model(
    model_path: str,
    model_class: type | None = None,
    device: str | None = None,
) -> Model

Load a PyTorch model from disk, returned as a Model asset.

Parameters:

Name Type Description Default
model_path str

Path to the saved model.

required
model_class type | None

Model class for state_dict loading.

None
device str | None

Device to load model onto.

None

Returns:

Type Description
Model

Model asset wrapping the loaded PyTorch model.

Source code in mlpotion/integrations/flowyml/pytorch/steps.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
@step(
    name="pytorch_load_model",
    outputs=["model"],
    cache="code_hash",
    tags={"framework": "pytorch", "component": "model_persistence"},
)
def load_model(
    model_path: str,
    model_class: type | None = None,
    device: str | None = None,
) -> Model:
    """Load a PyTorch model from disk and wrap it as a Model asset.

    Args:
        model_path: Path to the saved model.
        model_class: Model class for state_dict loading.
        device: Device to load model onto.

    Returns:
        Model asset wrapping the loaded PyTorch model.
    """
    loaded_model = ModelPersistence().load(
        path=model_path,
        model_class=model_class,
        device=device,
    )

    loaded_asset = Model.from_pytorch(
        loaded_model,
        name="pytorch_loaded_model",
        source_path=model_path,
    )

    logger.info(f"🔍 Loaded PyTorch model from: {model_path}")
    return loaded_asset

Pipelines

mlpotion.integrations.flowyml.pytorch.pipelines

Pre-built FlowyML pipeline templates for PyTorch workflows.

Provides ready-to-run pipelines that wire MLPotion PyTorch steps together with proper DAG dependencies, context injection, experiment tracking, and optional scheduling.

Available pipelines:

  • create_pytorch_training_pipeline — Load → Train → Evaluate
  • create_pytorch_full_pipeline — Load → Train → Evaluate → Export → Save
  • create_pytorch_evaluation_pipeline — Load model + data → Evaluate
  • create_pytorch_export_pipeline — Load model → Export + Save
  • create_pytorch_experiment_pipeline — Full pipeline with conditional deploy
  • create_pytorch_scheduled_pipeline — Scheduled retraining with cron

Functions

create_pytorch_training_pipeline

create_pytorch_training_pipeline(
    name: str = "pytorch_training",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline

Create a ready-to-run PyTorch training pipeline.

DAG: load_csv_data → train_model → evaluate_model

Provide hyperparameters via the context object::

from flowyml.core.context import Context

ctx = Context(
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    epochs=20,
    learning_rate=0.001,
    optimizer="adam",
    loss_fn="cross_entropy",
    device="cuda",
)

pipeline = create_pytorch_training_pipeline(
    name="my_training",
    context=ctx,
    project_name="my_project",
)
result = pipeline.run()

Parameters:

Name Type Description Default
name str

Pipeline name.

'pytorch_training'
context Context | None

FlowyML Context with parameters to inject.

None
enable_cache bool

Whether to enable step caching.

True
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string for versioned pipeline.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/pytorch/pipelines.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def create_pytorch_training_pipeline(
    name: str = "pytorch_training",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline:
    """Build a ready-to-run PyTorch training pipeline.

    **DAG**: ``load_csv_data → train_model → evaluate_model``

    Hyperparameters are supplied through the context object::

        from flowyml.core.context import Context

        ctx = Context(
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            epochs=20,
            learning_rate=0.001,
            optimizer="adam",
            loss_fn="cross_entropy",
            device="cuda",
        )

        pipeline = create_pytorch_training_pipeline(
            name="my_training",
            context=ctx,
            project_name="my_project",
        )
        result = pipeline.run()

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters to inject.
        enable_cache: Whether to enable step caching.
        project_name: Project to attach this pipeline to.
        version: Optional version string for versioned pipeline.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    training_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        project_name=project_name,
        version=version,
    )

    # Wire the three-stage training DAG in execution order.
    for training_step in (load_csv_data, train_model, evaluate_model):
        training_pipeline.add_step(training_step)

    return training_pipeline

create_pytorch_full_pipeline

create_pytorch_full_pipeline(
    name: str = "pytorch_full",
    context: Context | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline

Create a full PyTorch pipeline covering the entire ML lifecycle.

DAG: load_csv_data → train_model → evaluate_model → export_model → save_model

Includes checkpointing for long-running training steps so the pipeline can resume from the last checkpoint on failure.

Context parameters::

ctx = Context(
    # Data loading
    file_path="data/train.csv",
    label_name="target",
    batch_size=64,
    # Training
    epochs=100,
    learning_rate=0.001,
    optimizer="adamw",
    loss_fn="cross_entropy",
    device="cuda",
    # Export
    export_path="models/production/model.pt",
    export_format="torchscript",
    save_path="models/backup/model.pt",
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'pytorch_full'
context Context | None

FlowyML Context with parameters.

None
enable_cache bool

Whether to enable step caching.

True
enable_checkpointing bool

Whether to enable checkpointing.

True
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/pytorch/pipelines.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def create_pytorch_full_pipeline(
    name: str = "pytorch_full",
    context: Context | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline:
    """Build a full PyTorch pipeline covering the entire ML lifecycle.

    **DAG**: ``load_csv_data → train_model → evaluate_model → export_model → save_model``

    Checkpointing is available for long-running training steps so the
    pipeline can resume from the last checkpoint on failure.

    Context parameters::

        ctx = Context(
            # Data loading
            file_path="data/train.csv",
            label_name="target",
            batch_size=64,
            # Training
            epochs=100,
            learning_rate=0.001,
            optimizer="adamw",
            loss_fn="cross_entropy",
            device="cuda",
            # Export
            export_path="models/production/model.pt",
            export_format="torchscript",
            save_path="models/backup/model.pt",
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        enable_cache: Whether to enable step caching.
        enable_checkpointing: Whether to enable checkpointing.
        project_name: Project to attach this pipeline to.
        version: Optional version string.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    lifecycle_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        enable_checkpointing=enable_checkpointing,
        project_name=project_name,
        version=version,
    )

    # Full lifecycle: load → train → evaluate → export → save.
    for lifecycle_step in (
        load_csv_data,
        train_model,
        evaluate_model,
        export_model,
        save_model,
    ):
        lifecycle_pipeline.add_step(lifecycle_step)

    return lifecycle_pipeline

create_pytorch_evaluation_pipeline

create_pytorch_evaluation_pipeline(
    name: str = "pytorch_evaluation",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
) -> Pipeline

Create a pipeline for evaluating an existing PyTorch model.

DAG: load_model → load_csv_data → evaluate_model

Useful for model validation, A/B testing, and periodic evaluation against new data without retraining.

Context parameters::

ctx = Context(
    model_path="models/production/model.pt",
    file_path="data/test.csv",
    label_name="target",
    batch_size=64,
    device="cuda",
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'pytorch_evaluation'
context Context | None

FlowyML Context with parameters.

None
enable_cache bool

Whether to enable step caching.

True
project_name str | None

Project to attach this pipeline to.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/pytorch/pipelines.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def create_pytorch_evaluation_pipeline(
    name: str = "pytorch_evaluation",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
) -> Pipeline:
    """Build a pipeline that evaluates an existing PyTorch model.

    **DAG**: ``load_model → load_csv_data → evaluate_model``

    Useful for model validation, A/B testing, and periodic evaluation
    against new data without retraining.

    Context parameters::

        ctx = Context(
            model_path="models/production/model.pt",
            file_path="data/test.csv",
            label_name="target",
            batch_size=64,
            device="cuda",
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        enable_cache: Whether to enable step caching.
        project_name: Project to attach this pipeline to.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    evaluation_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        project_name=project_name,
    )

    # Bring in both the persisted model and fresh data, then evaluate.
    for evaluation_step in (load_model, load_csv_data, evaluate_model):
        evaluation_pipeline.add_step(evaluation_step)

    return evaluation_pipeline

create_pytorch_export_pipeline

create_pytorch_export_pipeline(
    name: str = "pytorch_export",
    context: Context | None = None,
    project_name: str | None = None,
) -> Pipeline

Create a pipeline for exporting and saving an existing model.

DAG: load_model → export_model, save_model

Useful for converting a trained model to TorchScript or ONNX and persisting to different locations.

Context parameters::

ctx = Context(
    model_path="models/trained/model.pt",
    export_path="models/exported/model.ts",
    export_format="torchscript",
    save_path="models/backup/model.pt",
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'pytorch_export'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/pytorch/pipelines.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def create_pytorch_export_pipeline(
    name: str = "pytorch_export",
    context: Context | None = None,
    project_name: str | None = None,
) -> Pipeline:
    """Build a pipeline that exports and saves an existing model.

    **DAG**: ``load_model → export_model, save_model``

    Useful for converting a trained model to TorchScript or ONNX
    and persisting to different locations.

    Context parameters::

        ctx = Context(
            model_path="models/trained/model.pt",
            export_path="models/exported/model.ts",
            export_format="torchscript",
            save_path="models/backup/model.pt",
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        project_name: Project to attach this pipeline to.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    # Caching is disabled on purpose: exports must always be re-run so the
    # artifacts on disk reflect the model currently being loaded.
    export_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        project_name=project_name,
    )

    for export_step in (load_model, export_model, save_model):
        export_pipeline.add_step(export_step)

    return export_pipeline

create_pytorch_experiment_pipeline

create_pytorch_experiment_pipeline(
    name: str = "pytorch_experiment",
    context: Context | None = None,
    project_name: str | None = None,
    version: str | None = None,
    deploy_threshold: float = 0.8,
    threshold_metric: str = "accuracy",
) -> Pipeline

Create a full experiment pipeline with conditional deployment.

DAG::

load_csv_data → train_model → evaluate_model
                                   ↓
                          [if metric >= threshold]
                                   ↓
                          export_model → save_model

Conditionally exports the model only if validation metrics exceed the given threshold.

Context parameters::

ctx = Context(
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    epochs=50,
    learning_rate=0.001,
    optimizer="adamw",
    device="cuda",
    export_path="models/production/model.pt",
    save_path="models/checkpoints/model.pt",
)

pipeline = create_pytorch_experiment_pipeline(
    context=ctx,
    deploy_threshold=0.85,
    threshold_metric="accuracy",
)
result = pipeline.run()

Parameters:

Name Type Description Default
name str

Pipeline name.

'pytorch_experiment'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string.

None
deploy_threshold float

Minimum metric value to trigger deployment.

0.8
threshold_metric str

Which metric to check against the threshold.

'accuracy'

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/pytorch/pipelines.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def create_pytorch_experiment_pipeline(
    name: str = "pytorch_experiment",
    context: Context | None = None,
    project_name: str | None = None,
    version: str | None = None,
    deploy_threshold: float = 0.8,
    threshold_metric: str = "accuracy",
) -> Pipeline:
    """Build a full experiment pipeline with conditional deployment.

    **DAG**::

        load_csv_data → train_model → evaluate_model
                                           ↓
                                  [if metric >= threshold]
                                           ↓
                                  export_model → save_model

    The model is exported only when the chosen validation metric
    reaches the given threshold.

    Context parameters::

        ctx = Context(
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            epochs=50,
            learning_rate=0.001,
            optimizer="adamw",
            device="cuda",
            export_path="models/production/model.pt",
            save_path="models/checkpoints/model.pt",
        )

        pipeline = create_pytorch_experiment_pipeline(
            context=ctx,
            deploy_threshold=0.85,
            threshold_metric="accuracy",
        )
        result = pipeline.run()

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        project_name: Project to attach this pipeline to.
        version: Optional version string.
        deploy_threshold: Minimum metric value to trigger deployment.
        threshold_metric: Which metric to check against the threshold.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    from flowyml.core.conditional import If

    experiment_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        enable_experiment_tracking=True,
        enable_checkpointing=True,
        project_name=project_name,
        version=version,
    )

    # Core training DAG.
    for experiment_step in (load_csv_data, train_model, evaluate_model):
        experiment_pipeline.add_step(experiment_step)

    def _meets_deploy_threshold(metrics) -> bool:
        # Metrics assets expose ``get_metric``; plain dicts use ``get``.
        if hasattr(metrics, "get_metric"):
            return metrics.get_metric(threshold_metric, 0) >= deploy_threshold
        return metrics.get(threshold_metric, 0) >= deploy_threshold

    # Conditional deployment: export/save only above the metric threshold.
    deploy_condition = If(
        condition=_meets_deploy_threshold,
        then_steps=[export_model, save_model],
        name=f"deploy_if_{threshold_metric}_above_{deploy_threshold}",
    )
    experiment_pipeline.control_flows.append(deploy_condition)

    return experiment_pipeline

create_pytorch_scheduled_pipeline

create_pytorch_scheduled_pipeline(
    name: str = "pytorch_scheduled_retraining",
    context: Context | None = None,
    project_name: str | None = None,
    schedule: str = "0 2 * * 0",
    timezone: str = "UTC",
) -> dict[str, Any]

Create a scheduled retraining pipeline.

Returns both the pipeline and a configured scheduler so you can register periodic retraining (e.g., weekly) with a single call.

DAG: load_csv_data → train_model → evaluate_model → export_model

Schedule format uses cron syntax (default: every Sunday at 2 AM)::

pipeline_info = create_pytorch_scheduled_pipeline(
    context=ctx,
    project_name="my_project",
    schedule="0 2 * * 0",  # Weekly
)

# Access the components
pipeline = pipeline_info["pipeline"]
scheduler = pipeline_info["scheduler"]

# Run once immediately
result = pipeline.run()

# Or start the scheduler for automatic retraining
scheduler.start()

Parameters:

Name Type Description Default
name str

Pipeline name.

'pytorch_scheduled_retraining'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None
schedule str

Cron expression for scheduling.

'0 2 * * 0'
timezone str

Timezone for the schedule.

'UTC'

Returns:

Type Description
dict[str, Any]

Dict with pipeline and scheduler keys.

Source code in mlpotion/integrations/flowyml/pytorch/pipelines.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def create_pytorch_scheduled_pipeline(
    name: str = "pytorch_scheduled_retraining",
    context: Context | None = None,
    project_name: str | None = None,
    schedule: str = "0 2 * * 0",
    timezone: str = "UTC",
) -> dict[str, Any]:
    """Build a scheduled retraining pipeline.

    Returns both the pipeline and a configured scheduler so periodic
    retraining (e.g., weekly) can be registered with a single call.

    **DAG**: ``load_csv_data → train_model → evaluate_model → export_model``

    Schedule format uses cron syntax (default: every Sunday at 2 AM)::

        pipeline_info = create_pytorch_scheduled_pipeline(
            context=ctx,
            project_name="my_project",
            schedule="0 2 * * 0",  # Weekly
        )

        # Access the components
        pipeline = pipeline_info["pipeline"]
        scheduler = pipeline_info["scheduler"]

        # Run once immediately
        result = pipeline.run()

        # Or start the scheduler for automatic retraining
        scheduler.start()

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        project_name: Project to attach this pipeline to.
        schedule: Cron expression for scheduling.
        timezone: Timezone for the schedule.

    Returns:
        Dict with ``pipeline`` and ``scheduler`` keys.
    """
    from flowyml.core.scheduler import PipelineScheduler

    # Caching is off so every scheduled run retrains on fresh data.
    retraining_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        enable_checkpointing=True,
        project_name=project_name,
    )

    for retraining_step in (
        load_csv_data,
        train_model,
        evaluate_model,
        export_model,
    ):
        retraining_pipeline.add_step(retraining_step)

    # Register the pipeline with a cron-driven scheduler.
    retraining_scheduler = PipelineScheduler()
    retraining_scheduler.schedule(
        pipeline=retraining_pipeline,
        cron=schedule,
        timezone=timezone,
    )

    return {
        "pipeline": retraining_pipeline,
        "scheduler": retraining_scheduler,
    }

TensorFlow Integration

Steps

mlpotion.integrations.flowyml.tensorflow.steps

FlowyML TensorFlow steps — Full-featured pipeline steps for TF/Keras workflows.

Each step leverages FlowyML's native capabilities:

  • Artifact-centric design: returns Dataset, Model, Metrics with auto-extraction
  • Supports caching, retry, GPU resources, tags, DAG wiring, and execution groups
  • train_model integrates FlowymlKerasCallback for automatic tracking

Classes

Functions

load_data

load_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str = "target",
    column_names: list[str] | None = None,
) -> Dataset

Load CSV data into a tf.data.Dataset, wrapped as a Dataset asset.

Automatic metadata extraction captures batch size, source path, label name, and column configuration.

Parameters:

Name Type Description Default
file_path str

Glob pattern for CSV files.

required
batch_size int

Batch size.

32
label_name str

Target column name.

'target'
column_names list[str] | None

Specific columns to load.

None

Returns:

Type Description
Dataset

Dataset asset wrapping the tf.data.Dataset.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@step(
    name="tf_load_data",
    outputs=["dataset"],
    cache="code_hash",
    tags={"framework": "tensorflow", "component": "data_loader"},
)
def load_data(
    file_path: str,
    batch_size: int = 32,
    label_name: str = "target",
    column_names: list[str] | None = None,
) -> Dataset:
    """Read CSV files into a tf.data.Dataset and wrap it as a Dataset asset.

    The returned asset records the source path, batch size, and label name
    as properties so FlowyML's metadata extraction and lineage tracking
    work automatically.

    Args:
        file_path: Glob pattern for CSV files.
        batch_size: Batch size.
        label_name: Target column name.
        column_names: Specific columns to load.

    Returns:
        Dataset asset wrapping the tf.data.Dataset.
    """
    loading_config = DataLoadingConfig(
        file_pattern=file_path,
        batch_size=batch_size,
        label_name=label_name,
        column_names=column_names,
    )
    raw_dataset = CSVDataLoader(**loading_config.dict()).load()

    asset = Dataset.create(
        data=raw_dataset,
        name="tf_csv_dataset",
        properties={
            "source": file_path,
            "batch_size": batch_size,
            "label_name": label_name,
        },
        source=file_path,
        loader="CSVDataLoader",
        framework="tensorflow",
    )

    logger.info(f"📦 Loaded TF Dataset: batch_size={batch_size}, source={file_path}")
    return asset

optimize_data

optimize_data(
    dataset: tf.data.Dataset | Dataset,
    batch_size: int = 32,
    shuffle_buffer_size: int | None = None,
    prefetch: bool = True,
    cache: bool = False,
) -> Dataset

Optimize a tf.data.Dataset with caching and prefetching, returned as Dataset asset.

Returns a Dataset asset with lineage linked to the input dataset.

Parameters:

Name Type Description Default
dataset tf.data.Dataset | Dataset

Input tf.data.Dataset or Dataset asset.

required
batch_size int

Batch size.

32
shuffle_buffer_size int | None

Shuffle buffer size.

None
prefetch bool

Enable prefetching.

True
cache bool

Enable dataset caching.

False

Returns:

Type Description
Dataset

Dataset asset wrapping the optimized tf.data.Dataset.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@step(
    name="tf_optimize_data",
    inputs=["dataset"],
    outputs=["optimized_dataset"],
    cache="code_hash",
    tags={"framework": "tensorflow", "component": "data_optimizer"},
)
def optimize_data(
    dataset: tf.data.Dataset | Dataset,
    batch_size: int = 32,
    shuffle_buffer_size: int | None = None,
    prefetch: bool = True,
    cache: bool = False,
) -> Dataset:
    """Apply cache/prefetch/shuffle optimizations and return a Dataset asset.

    When the input is a Dataset asset, the returned asset links to it as
    its lineage parent.

    Args:
        dataset: Input tf.data.Dataset or Dataset asset.
        batch_size: Batch size.
        shuffle_buffer_size: Shuffle buffer size.
        prefetch: Enable prefetching.
        cache: Enable dataset caching.

    Returns:
        Dataset asset wrapping the optimized tf.data.Dataset.
    """
    # Unwrap the asset (if any) while remembering it for lineage.
    if isinstance(dataset, Dataset):
        parent_asset: Dataset | None = dataset
        source_ds = dataset.data
    else:
        parent_asset = None
        source_ds = dataset

    opt_config = DataOptimizationConfig(
        batch_size=batch_size,
        shuffle_buffer_size=shuffle_buffer_size,
        prefetch=prefetch,
        cache=cache,
    )
    optimized_ds = DatasetOptimizer(**opt_config.dict()).optimize(source_ds)

    asset = Dataset.create(
        data=optimized_ds,
        name="tf_optimized_dataset",
        parent=parent_asset,
        properties={
            "batch_size": batch_size,
            "prefetch": prefetch,
            "cache": cache,
            "shuffle_buffer_size": shuffle_buffer_size,
        },
        optimizer="DatasetOptimizer",
        framework="tensorflow",
    )

    logger.info(f"⚡ Optimized TF Dataset: cache={cache}, prefetch={prefetch}")
    return asset

transform_data

transform_data(
    dataset: tf.data.Dataset | Dataset,
    model: keras.Model | Model,
    data_output_path: str,
    data_output_per_batch: bool = False,
) -> Dataset

Transform data using a model and save predictions to CSV, returned as a Dataset asset.

Returns a Dataset asset with lineage linked to the input dataset.

Parameters:

Name Type Description Default
dataset tf.data.Dataset | Dataset

Input tf.data.Dataset or Dataset asset.

required
model keras.Model | Model

Keras/TF model or Model asset for generating predictions.

required
data_output_path str

Output path for transformed data.

required
data_output_per_batch bool

If True, output one file per batch.

False

Returns:

Type Description
Dataset

Dataset asset pointing to the output CSV with parent lineage.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@step(
    name="tf_transform_data",
    inputs=["dataset"],
    outputs=["transformed"],
    tags={"framework": "tensorflow", "component": "data_transformer"},
)
def transform_data(
    dataset: tf.data.Dataset | Dataset,
    model: keras.Model | Model,
    data_output_path: str,
    data_output_per_batch: bool = False,
) -> Dataset:
    """Run a model over a dataset and persist the predictions as CSV.

    The returned Dataset asset points at the CSV output and records the
    input dataset (when supplied as an asset) as its lineage parent.

    Args:
        dataset: Input tf.data.Dataset or Dataset asset.
        model: Keras/TF model or Model asset for generating predictions.
        data_output_path: Output path for transformed data.
        data_output_per_batch: If True, output one file per batch.

    Returns:
        Dataset asset pointing to the output CSV with parent lineage.
    """
    # Unwrap assets; keep the dataset asset around for lineage.
    if isinstance(dataset, Dataset):
        parent_asset: Dataset | None = dataset
        raw_ds = dataset.data
    else:
        parent_asset = None
        raw_ds = dataset
    raw_model = model.data if isinstance(model, Model) else model

    transformer = DataToCSVTransformer(
        dataset=raw_ds,
        model=raw_model,
        data_output_path=data_output_path,
        data_output_per_batch=data_output_per_batch,
    )

    # The transformer already holds the dataset/model, so the config's
    # pattern/path/signature fields are placeholders only.
    placeholder_config = DataTransformationConfig(
        file_pattern="",  # Not used since dataset is provided
        model_path="",  # Not used since model is provided
        model_input_signature={},  # Empty dict as model is provided directly
        data_output_path=data_output_path,
        data_output_per_batch=data_output_per_batch,
    )

    transformer.transform(dataset=None, model=None, config=placeholder_config)

    result_asset = Dataset.create(
        data={"output_path": data_output_path},
        name="tf_transformed_data",
        parent=parent_asset,
        properties={
            "output_path": data_output_path,
            "per_batch": data_output_per_batch,
        },
        source=data_output_path,
        transformer="DataToCSVTransformer",
    )

    logger.info(f"🔄 Transformed data saved to: {data_output_path}")
    return result_asset

train_model

train_model(
    model: keras.Model,
    data: tf.data.Dataset | Dataset,
    epochs: int = 10,
    learning_rate: float = 0.001,
    verbose: int = 1,
    validation_data: tf.data.Dataset
    | Dataset
    | None = None,
    callbacks: list[keras.callbacks.Callback] | None = None,
    experiment_name: str | None = None,
    project: str | None = None,
    log_model: bool = True,
) -> tuple[Model, Metrics]

Train a TF/Keras model with FlowyML tracking integration.

Automatically attaches a FlowymlKerasCallback for:

  • Dynamic capture of ALL training metrics
  • Live dashboard updates
  • Model artifact logging

Returns a Model asset (via Model.from_keras with auto-extracted metadata) and a Metrics asset with training history.

Returns a Model asset (via Model.from_keras with auto-extracted metadata) and a Metrics asset with training history.

Parameters:

Name Type Description Default
model keras.Model

Compiled Keras model.

required
data tf.data.Dataset | Dataset

Training tf.data.Dataset or Dataset asset.

required
epochs int

Number of training epochs.

10
learning_rate float

Learning rate.

0.001
verbose int

Keras verbosity level.

1
validation_data tf.data.Dataset | Dataset | None

Optional validation dataset or Dataset asset.

None
callbacks list[keras.callbacks.Callback] | None

Additional Keras callbacks.

None
experiment_name str | None

Experiment name for FlowyML tracking.

None
project str | None

Project name for FlowyML dashboard.

None
log_model bool

Whether to save model artifact after training.

True

Returns:

Type Description
tuple[Model, Metrics]

Tuple of (Model asset, Metrics asset).

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
@step(
    name="tf_train_model",
    inputs=["dataset"],
    outputs=["model", "training_metrics"],
    cache=False,
    retry=1,
    tags={"framework": "tensorflow", "component": "model_trainer"},
)
def train_model(
    model: keras.Model,
    data: tf.data.Dataset | Dataset,
    epochs: int = 10,
    learning_rate: float = 0.001,
    verbose: int = 1,
    validation_data: tf.data.Dataset | Dataset | None = None,
    callbacks: list[keras.callbacks.Callback] | None = None,
    experiment_name: str | None = None,
    project: str | None = None,
    log_model: bool = True,
) -> tuple[Model, Metrics]:
    """Train a TF/Keras model with FlowyML tracking integration.

    Automatically attaches a FlowymlKerasCallback for:
    - Dynamic capture of ALL training metrics
    - Live dashboard updates
    - Model artifact logging

    Returns a Model asset (via Model.from_keras with auto-extracted metadata)
    and a Metrics asset with training history.

    Args:
        model: Compiled Keras model.
        data: Training tf.data.Dataset or Dataset asset.
        epochs: Number of training epochs.
        learning_rate: Learning rate.
        verbose: Keras verbosity level.
        validation_data: Optional validation dataset or Dataset asset.
        callbacks: Additional Keras callbacks.
        experiment_name: Experiment name for FlowyML tracking.
        project: Project name for FlowyML dashboard.
        log_model: Whether to save model artifact after training.

    Returns:
        Tuple of (Model asset, Metrics asset).
    """
    # Extract raw data from Dataset if wrapped
    raw_data = data.data if isinstance(data, Dataset) else data
    raw_val = (
        validation_data.data
        if isinstance(validation_data, Dataset)
        else validation_data
    )

    all_callbacks = list(callbacks or [])

    # Auto-attach FlowyML callback only when tracking is requested.
    flowyml_callback = None
    if experiment_name:
        flowyml_callback = FlowymlKerasCallback(
            experiment_name=experiment_name,
            project=project,
            log_model=log_model,
        )
        all_callbacks.append(flowyml_callback)

    # NOTE(review): ``all_callbacks`` is assembled but never handed to
    # ``trainer.train`` — confirm whether ModelTrainer picks callbacks up
    # elsewhere (e.g. from the config) or whether they are silently dropped.
    config = ModelTrainingConfig(
        epochs=epochs,
        learning_rate=learning_rate,
        verbose=verbose,
        optimizer="adam",
        loss="mse",
        metrics=["mae"],
    )

    trainer = ModelTrainer()
    result = trainer.train(
        model=model,
        dataset=raw_data,
        config=config,
        validation_dataset=raw_val,
    )

    # Copy the result's metrics before augmenting them: mutating
    # ``result.metrics`` in place would silently alter the trainer's
    # result object for any other consumer.
    raw_metrics: dict[str, Any] = (
        dict(result.metrics) if hasattr(result, "metrics") else {}
    )
    raw_metrics["epochs_completed"] = epochs
    raw_metrics["learning_rate"] = learning_rate

    # Wrap as Model asset using from_keras for full auto-extraction
    model_asset = Model.from_keras(
        model,
        name="tf_trained_model",
        callback=flowyml_callback,
        epochs_requested=epochs,
    )

    # Wrap as Metrics asset; the (potentially large) "history" entry is
    # excluded from the flat properties mapping.
    metrics_asset = Metrics.create(
        metrics=raw_metrics,
        name="tf_training_metrics",
        tags={"stage": "training", "framework": "tensorflow"},
        properties={
            "epochs": epochs,
            "learning_rate": learning_rate,
            **{k: v for k, v in raw_metrics.items() if k != "history"},
        },
    )

    logger.info(
        f"🎯 TF training complete: {epochs} epochs, "
        f"metrics captured: {list(raw_metrics.keys())}"
    )
    return model_asset, metrics_asset

evaluate_model

evaluate_model(
    model: keras.Model | Model,
    data: tf.data.Dataset | Dataset,
    verbose: int = 0,
) -> Metrics

Evaluate a TF/Keras model and return a Metrics asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Trained Keras model or Model asset.

required
data tf.data.Dataset | Dataset

Evaluation tf.data.Dataset or Dataset asset.

required
verbose int

Keras verbosity level.

0

Returns:

Type Description
Metrics

Metrics asset with evaluation results.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
@step(
    name="tf_evaluate_model",
    inputs=["model", "dataset"],
    outputs=["metrics"],
    cache="input_hash",
    tags={"framework": "tensorflow", "component": "model_evaluator"},
)
def evaluate_model(
    model: keras.Model | Model,
    data: tf.data.Dataset | Dataset,
    verbose: int = 0,
) -> Metrics:
    """Run model evaluation and package the results as a Metrics asset.

    Args:
        model: Trained Keras model or Model asset.
        data: Evaluation tf.data.Dataset or Dataset asset.
        verbose: Keras verbosity level.

    Returns:
        Metrics asset with evaluation results.
    """
    # Unwrap assets to their raw Keras/TF objects.
    keras_model = model.data if isinstance(model, Model) else model
    eval_ds = data.data if isinstance(data, Dataset) else data

    evaluation = ModelEvaluator().evaluate(
        model=keras_model,
        dataset=eval_ds,
        config=ModelEvaluationConfig(verbose=verbose),
    )

    eval_metrics = evaluation.metrics if hasattr(evaluation, "metrics") else {}

    asset = Metrics.create(
        metrics=eval_metrics,
        name="tf_evaluation_metrics",
        tags={"stage": "evaluation", "framework": "tensorflow"},
        properties=eval_metrics,
    )

    logger.info(f"📊 TF evaluation: {eval_metrics}")
    return asset

export_model

export_model(
    model: keras.Model | Model,
    export_path: str,
    export_format: str = "keras",
) -> Model

Export a TF/Keras model, returned as a Model asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Keras model or Model asset to export.

required
export_path str

Destination path.

required
export_format str

Format ('saved_model', 'tflite', 'keras').

'keras'

Returns:

Type Description
Model

Model asset with export metadata.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
@step(
    name="tf_export_model",
    inputs=["model"],
    outputs=["exported_model"],
    cache="code_hash",
    tags={"framework": "tensorflow", "component": "model_exporter"},
)
def export_model(
    model: keras.Model | Model,
    export_path: str,
    export_format: str = "keras",
) -> Model:
    """Export a Keras model to the requested format, wrapped as a Model asset.

    Args:
        model: Keras model or Model asset to export.
        export_path: Destination path.
        export_format: Format ('saved_model', 'tflite', 'keras').

    Returns:
        Model asset with export metadata.
    """
    keras_model = model.data if isinstance(model, Model) else model

    ModelExporter().export(
        model=keras_model,
        path=export_path,
        export_format=export_format,
    )

    exported_asset = Model.from_keras(
        keras_model,
        name="tf_exported_model",
        export_path=export_path,
        export_format=export_format,
    )

    logger.info(f"📤 Exported TF model to: {export_path}")
    return exported_asset

save_model

save_model(
    model: keras.Model | Model, save_path: str
) -> Model

Save a TF/Keras model to disk, returned as a Model asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Keras model or Model asset to save.

required
save_path str

Destination file path.

required

Returns:

Type Description
Model

Model asset with save location metadata.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
@step(
    name="tf_save_model",
    inputs=["model"],
    outputs=["saved_model"],
    tags={"framework": "tensorflow", "component": "model_persistence"},
)
def save_model(
    model: keras.Model | Model,
    save_path: str,
) -> Model:
    """Persist a Keras model to disk and return it wrapped as a Model asset.

    Args:
        model: Keras model or Model asset to save.
        save_path: Destination file path.

    Returns:
        Model asset with save location metadata.
    """
    keras_model = model.data if isinstance(model, Model) else model

    ModelPersistence(path=save_path, model=keras_model).save()

    saved_asset = Model.from_keras(
        keras_model,
        name="tf_saved_model",
        save_path=save_path,
    )

    logger.info(f"💾 Saved TF model to: {save_path}")
    return saved_asset

load_model

load_model(model_path: str, inspect: bool = False) -> Model

Load a TF/Keras model from disk, returned as a Model asset.

Parameters:

Name Type Description Default
model_path str

Path to the saved model.

required
inspect bool

If True, log model inspection info.

False

Returns:

Type Description
Model

Model asset wrapping the loaded Keras model.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
@step(
    name="tf_load_model",
    outputs=["model"],
    cache="code_hash",
    tags={"framework": "tensorflow", "component": "model_persistence"},
)
def load_model(
    model_path: str,
    inspect: bool = False,
) -> Model:
    """Load a Keras model from disk and return it wrapped as a Model asset.

    Args:
        model_path: Path to the saved model.
        inspect: If True, log model inspection info.

    Returns:
        Model asset wrapping the loaded Keras model.
    """
    keras_model, inspection_info = ModelPersistence(path=model_path).load(
        inspect=inspect
    )

    asset = Model.from_keras(
        keras_model,
        name="tf_loaded_model",
        source_path=model_path,
    )

    if inspect and inspection_info:
        logger.info(
            f"🔍 Loaded TF model from: {model_path}, inspection: {inspection_info}"
        )
    else:
        logger.info(f"🔍 Loaded TF model from: {model_path}")

    return asset

inspect_model

inspect_model(
    model: keras.Model | Model,
    include_layers: bool = True,
    include_signatures: bool = True,
) -> Metrics

Inspect a TF/Keras model and return detailed metadata as a Metrics asset.

Parameters:

Name Type Description Default
model keras.Model | Model

Keras model or Model asset to inspect.

required
include_layers bool

Include per-layer information.

True
include_signatures bool

Include input/output signatures.

True

Returns:

Type Description
Metrics

Metrics asset with model inspection details.

Source code in mlpotion/integrations/flowyml/tensorflow/steps.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
@step(
    name="tf_inspect_model",
    inputs=["model"],
    outputs=["inspection"],
    tags={"framework": "tensorflow", "component": "model_inspector"},
)
def inspect_model(
    model: keras.Model | Model,
    include_layers: bool = True,
    include_signatures: bool = True,
) -> Metrics:
    """Produce detailed model metadata, packaged as a Metrics asset.

    Args:
        model: Keras model or Model asset to inspect.
        include_layers: Include per-layer information.
        include_signatures: Include input/output signatures.

    Returns:
        Metrics asset with model inspection details.
    """
    keras_model = model.data if isinstance(model, Model) else model

    report = ModelInspector(
        include_layers=include_layers,
        include_signatures=include_signatures,
    ).inspect(keras_model)

    param_info = report.get("parameters", {})
    asset = Metrics.create(
        metrics=report,
        name="tf_model_inspection",
        tags={"stage": "inspection", "framework": "tensorflow"},
        properties={
            "model_name": report.get("name", "unknown"),
            "total_params": param_info.get("total"),
        },
    )

    logger.info(
        f"🔎 TF Model: {report.get('name', 'unknown')}, "
        f"params: {param_info.get('total', '?')}"
    )
    return asset

Pipelines

mlpotion.integrations.flowyml.tensorflow.pipelines

Pre-built FlowyML pipeline templates for TensorFlow/Keras workflows.

Provides ready-to-run pipelines that wire MLPotion TensorFlow steps together with proper DAG dependencies, context injection, experiment tracking, and optional scheduling.

Available pipelines:

  • create_tf_training_pipeline — Load → Train → Evaluate
  • create_tf_full_pipeline — Load → Optimize → Transform → Train → Evaluate → Export
  • create_tf_evaluation_pipeline — Load model + data → Evaluate → Inspect
  • create_tf_export_pipeline — Load model → Export + Save
  • create_tf_experiment_pipeline — Full pipeline with conditional deploy
  • create_tf_scheduled_pipeline — Scheduled retraining with cron

Functions

create_tf_training_pipeline

create_tf_training_pipeline(
    name: str = "tf_training",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline

Create a ready-to-run TensorFlow training pipeline.

DAG: load_data → train_model → evaluate_model

Provide hyperparameters via the context object::

from flowyml.core.context import Context

ctx = Context(
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    epochs=10,
    learning_rate=0.001,
    experiment_name="my-experiment",
)

pipeline = create_tf_training_pipeline(
    name="my_training",
    context=ctx,
    project_name="my_project",
)
result = pipeline.run()

Parameters:

Name Type Description Default
name str

Pipeline name.

'tf_training'
context Context | None

FlowyML Context with parameters to inject.

None
enable_cache bool

Whether to enable step caching.

True
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string for versioned pipeline.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/tensorflow/pipelines.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def create_tf_training_pipeline(
    name: str = "tf_training",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline:
    """Create a ready-to-run TensorFlow training pipeline.

    **DAG**: ``load_data → train_model → evaluate_model``

    Provide hyperparameters via the context object::

        from flowyml.core.context import Context

        ctx = Context(
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            epochs=10,
            learning_rate=0.001,
            experiment_name="my-experiment",
        )

        pipeline = create_tf_training_pipeline(
            name="my_training",
            context=ctx,
            project_name="my_project",
        )
        result = pipeline.run()

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters to inject.
        enable_cache: Whether to enable step caching.
        project_name: Project to attach this pipeline to.
        version: Optional version string for versioned pipeline.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        project_name=project_name,
        version=version,
    )

    # Steps are registered in DAG order.
    for pipeline_step in (load_data, train_model, evaluate_model):
        pipeline.add_step(pipeline_step)

    return pipeline

create_tf_full_pipeline

create_tf_full_pipeline(
    name: str = "tf_full",
    context: Context | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline

Create a full TensorFlow pipeline covering the entire ML lifecycle.

DAG: load_data → optimize_data → train_model → evaluate_model → export_model

Includes data optimization (prefetch/cache/shuffle) and checkpointing for long-running training steps.

Context parameters::

ctx = Context(
    # Data loading
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    # Optimization
    shuffle_buffer_size=10000,
    prefetch=True,
    cache=True,
    # Training
    epochs=50,
    learning_rate=0.001,
    experiment_name="full-run",
    project="my_project",
    # Export
    export_path="models/production/",
    export_format="saved_model",
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'tf_full'
context Context | None

FlowyML Context with parameters.

None
enable_cache bool

Whether to enable step caching.

True
enable_checkpointing bool

Whether to enable checkpointing.

True
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/tensorflow/pipelines.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def create_tf_full_pipeline(
    name: str = "tf_full",
    context: Context | None = None,
    enable_cache: bool = True,
    enable_checkpointing: bool = True,
    project_name: str | None = None,
    version: str | None = None,
) -> Pipeline:
    """Create a full TensorFlow pipeline covering the entire ML lifecycle.

    **DAG**: ``load_data → optimize_data → train_model → evaluate_model → export_model``

    Includes data optimization (prefetch/cache/shuffle) and checkpointing
    for long-running training steps.

    Context parameters::

        ctx = Context(
            # Data loading
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            # Optimization
            shuffle_buffer_size=10000,
            prefetch=True,
            cache=True,
            # Training
            epochs=50,
            learning_rate=0.001,
            experiment_name="full-run",
            project="my_project",
            # Export
            export_path="models/production/",
            export_format="saved_model",
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        enable_cache: Whether to enable step caching.
        enable_checkpointing: Whether to enable checkpointing.
        project_name: Project to attach this pipeline to.
        version: Optional version string.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        enable_checkpointing=enable_checkpointing,
        project_name=project_name,
        version=version,
    )

    # Steps are registered in DAG order.
    for pipeline_step in (
        load_data,
        optimize_data,
        train_model,
        evaluate_model,
        export_model,
    ):
        pipeline.add_step(pipeline_step)

    return pipeline

create_tf_evaluation_pipeline

create_tf_evaluation_pipeline(
    name: str = "tf_evaluation",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
) -> Pipeline

Create a pipeline for evaluating an existing TF/Keras model.

DAG: load_model → load_data → evaluate_model → inspect_model

Useful for model validation, A/B testing, and periodic evaluation against new data without retraining.

Context parameters::

ctx = Context(
    model_path="models/production/model.keras",
    file_path="data/test.csv",
    label_name="target",
    batch_size=64,
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'tf_evaluation'
context Context | None

FlowyML Context with parameters.

None
enable_cache bool

Whether to enable step caching.

True
project_name str | None

Project to attach this pipeline to.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/tensorflow/pipelines.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def create_tf_evaluation_pipeline(
    name: str = "tf_evaluation",
    context: Context | None = None,
    enable_cache: bool = True,
    project_name: str | None = None,
) -> Pipeline:
    """Build an evaluation-only pipeline for an already-trained TF/Keras model.

    **DAG**: ``load_model → load_data → evaluate_model → inspect_model``

    Suited to model validation, A/B comparisons, and recurring evaluation
    runs against fresh data when no retraining is required.

    Context parameters::

        ctx = Context(
            model_path="models/production/model.keras",
            file_path="data/test.csv",
            label_name="target",
            batch_size=64,
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        enable_cache: Whether to enable step caching.
        project_name: Project to attach this pipeline to.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    evaluation = Pipeline(
        name=name,
        context=context,
        enable_cache=enable_cache,
        project_name=project_name,
    )

    # Register steps in DAG order: restore the model, stream the data,
    # score it, then surface model details.
    for step in (load_model, load_data, evaluate_model, inspect_model):
        evaluation.add_step(step)

    return evaluation

create_tf_export_pipeline

create_tf_export_pipeline(
    name: str = "tf_export",
    context: Context | None = None,
    project_name: str | None = None,
) -> Pipeline

Create a pipeline for exporting and saving an existing model.

DAG: load_model → export_model, save_model

Useful for converting a trained model to multiple formats (SavedModel, TFLite, Keras) and persisting to different locations.

Context parameters::

ctx = Context(
    model_path="models/trained/model.keras",
    export_path="models/exported/",
    export_format="saved_model",
    save_path="models/backup/model.keras",
)

Parameters:

Name Type Description Default
name str

Pipeline name.

'tf_export'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/tensorflow/pipelines.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def create_tf_export_pipeline(
    name: str = "tf_export",
    context: Context | None = None,
    project_name: str | None = None,
) -> Pipeline:
    """Build a pipeline that exports and persists a previously trained model.

    **DAG**: ``load_model → export_model, save_model``

    Handy when a trained model must be converted to one or more formats
    (SavedModel, TFLite, Keras) and written to different locations.

    Context parameters::

        ctx = Context(
            model_path="models/trained/model.keras",
            export_path="models/exported/",
            export_format="saved_model",
            save_path="models/backup/model.keras",
        )

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        project_name: Project to attach this pipeline to.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    # Caching is deliberately off: an export run should always
    # regenerate its artifacts.
    export_pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        project_name=project_name,
    )

    for step in (load_model, export_model, save_model):
        export_pipeline.add_step(step)

    return export_pipeline

create_tf_experiment_pipeline

create_tf_experiment_pipeline(
    name: str = "tf_experiment",
    context: Context | None = None,
    project_name: str | None = None,
    version: str | None = None,
    deploy_threshold: float = 0.8,
    threshold_metric: str = "accuracy",
) -> Pipeline

Create a full experiment pipeline with conditional deployment.

DAG::

load_data → train_model → evaluate_model
                               ↓
                      [if metric ≥ threshold]
                               ↓
                      export_model → save_model

Integrates FlowyML experiment tracking and conditionally exports the model only if validation metrics meet or exceed the given threshold.

Context parameters::

ctx = Context(
    file_path="data/train.csv",
    label_name="target",
    batch_size=32,
    epochs=30,
    learning_rate=0.001,
    experiment_name="experiment-v1",
    project="my_project",
    export_path="models/production/",
    save_path="models/checkpoints/model.keras",
)

pipeline = create_tf_experiment_pipeline(
    context=ctx,
    deploy_threshold=0.85,
    threshold_metric="accuracy",
)
result = pipeline.run()

Parameters:

Name Type Description Default
name str

Pipeline name.

'tf_experiment'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None
version str | None

Optional version string.

None
deploy_threshold float

Minimum metric value to trigger deployment.

0.8
threshold_metric str

Which metric to check against the threshold.

'accuracy'

Returns:

Type Description
Pipeline

Configured FlowyML Pipeline ready for .run().

Source code in mlpotion/integrations/flowyml/tensorflow/pipelines.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def create_tf_experiment_pipeline(
    name: str = "tf_experiment",
    context: Context | None = None,
    project_name: str | None = None,
    version: str | None = None,
    deploy_threshold: float = 0.8,
    threshold_metric: str = "accuracy",
) -> Pipeline:
    """Create a full experiment pipeline with conditional deployment.

    **DAG**::

        load_data → train_model → evaluate_model
                                       ↓
                            [if metric >= threshold]
                                       ↓
                            export_model → save_model

    Integrates FlowyML experiment tracking and conditionally exports the
    model only if the validation metric meets or exceeds the given
    threshold.

    Context parameters::

        ctx = Context(
            file_path="data/train.csv",
            label_name="target",
            batch_size=32,
            epochs=30,
            learning_rate=0.001,
            experiment_name="experiment-v1",
            project="my_project",
            export_path="models/production/",
            save_path="models/checkpoints/model.keras",
        )

        pipeline = create_tf_experiment_pipeline(
            context=ctx,
            deploy_threshold=0.85,
            threshold_metric="accuracy",
        )
        result = pipeline.run()

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        project_name: Project to attach this pipeline to.
        version: Optional version string.
        deploy_threshold: Minimum metric value to trigger deployment
            (inclusive — deployment fires when metric >= threshold).
        threshold_metric: Which metric to check against the threshold.

    Returns:
        Configured FlowyML Pipeline ready for ``.run()``.
    """
    from flowyml.core.conditional import If

    pipeline = Pipeline(
        name=name,
        context=context,
        enable_cache=False,  # Experiments want fresh metrics every run.
        enable_experiment_tracking=True,
        enable_checkpointing=True,
        project_name=project_name,
        version=version,
    )

    # Core training DAG
    pipeline.add_step(load_data)
    pipeline.add_step(train_model)
    pipeline.add_step(evaluate_model)

    # Conditional deployment: export/save only when the chosen metric
    # meets or exceeds the threshold. The metrics payload may be either
    # a FlowyML Metrics asset (exposes get_metric) or a plain mapping
    # (exposes get); a missing metric defaults to 0 and blocks deploy.
    deploy_condition = If(
        condition=lambda metrics: (
            metrics.get_metric(threshold_metric, 0) >= deploy_threshold
            if hasattr(metrics, "get_metric")
            else metrics.get(threshold_metric, 0) >= deploy_threshold
        ),
        then_steps=[export_model, save_model],
        name=f"deploy_if_{threshold_metric}_above_{deploy_threshold}",
    )
    # NOTE(review): appends to the Pipeline's control_flows list directly;
    # assumes FlowyML exposes this as a public mutable attribute — confirm.
    pipeline.control_flows.append(deploy_condition)

    return pipeline

create_tf_scheduled_pipeline

create_tf_scheduled_pipeline(
    name: str = "tf_scheduled_retraining",
    context: Context | None = None,
    project_name: str | None = None,
    schedule: str = "0 2 * * 0",
    timezone: str = "UTC",
) -> dict[str, Any]

Create a scheduled retraining pipeline.

Returns both the pipeline and a configured scheduler so you can register periodic retraining (e.g., weekly) with a single call.

DAG: load_data → train_model → evaluate_model → export_model

Schedule format uses cron syntax (default: every Sunday at 2 AM)::

pipeline_info = create_tf_scheduled_pipeline(
    context=ctx,
    project_name="my_project",
    schedule="0 2 * * 0",  # Weekly
)

# Access the components
pipeline = pipeline_info["pipeline"]
scheduler = pipeline_info["scheduler"]

# Run once immediately
result = pipeline.run()

# Or start the scheduler for automatic retraining
scheduler.start()

Parameters:

Name Type Description Default
name str

Pipeline name.

'tf_scheduled_retraining'
context Context | None

FlowyML Context with parameters.

None
project_name str | None

Project to attach this pipeline to.

None
schedule str

Cron expression for scheduling.

'0 2 * * 0'
timezone str

Timezone for the schedule.

'UTC'

Returns:

Type Description
dict[str, Any]

Dict with pipeline and scheduler keys.

Source code in mlpotion/integrations/flowyml/tensorflow/pipelines.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def create_tf_scheduled_pipeline(
    name: str = "tf_scheduled_retraining",
    context: Context | None = None,
    project_name: str | None = None,
    schedule: str = "0 2 * * 0",
    timezone: str = "UTC",
) -> dict[str, Any]:
    """Build a retraining pipeline together with a configured scheduler.

    Returns both the pipeline and a scheduler so periodic retraining
    (e.g., weekly) can be registered with a single call.

    **DAG**: ``load_data → train_model → evaluate_model → export_model``

    Schedule format uses cron syntax (default: every Sunday at 2 AM)::

        pipeline_info = create_tf_scheduled_pipeline(
            context=ctx,
            project_name="my_project",
            schedule="0 2 * * 0",  # Weekly
        )

        # Access the components
        pipeline = pipeline_info["pipeline"]
        scheduler = pipeline_info["scheduler"]

        # Run once immediately
        result = pipeline.run()

        # Or start the scheduler for automatic retraining
        scheduler.start()

    Args:
        name: Pipeline name.
        context: FlowyML Context with parameters.
        project_name: Project to attach this pipeline to.
        schedule: Cron expression for scheduling.
        timezone: Timezone for the schedule.

    Returns:
        Dict with ``pipeline`` and ``scheduler`` keys.
    """
    from flowyml.core.scheduler import PipelineScheduler

    # Caching off so every scheduled run trains on fresh data;
    # checkpointing on so long runs can resume.
    retraining = Pipeline(
        name=name,
        context=context,
        enable_cache=False,
        enable_checkpointing=True,
        project_name=project_name,
    )

    for step in (load_data, train_model, evaluate_model, export_model):
        retraining.add_step(step)

    # Bind the pipeline to the requested cron schedule.
    cron_scheduler = PipelineScheduler()
    cron_scheduler.schedule(
        pipeline=retraining,
        cron=schedule,
        timezone=timezone,
    )

    return {
        "pipeline": retraining,
        "scheduler": cron_scheduler,
    }

See the FlowyML Integration Guide for complete usage documentation and tutorials