Pipelines
Pipelines are the core abstraction in flowyml: they represent workflows that orchestrate your ML operations from data to deployment.
What you'll learn
How to design, build, and run production-grade pipelines. A well-designed pipeline is infrastructure-agnostic: write it once, run it anywhere.
Why Pipelines Matter
Without pipelines, ML workflows are often:
- Scripts scattered across notebooks – Hard to reproduce, impossible to version
- Tightly coupled to infrastructure – Rewrite for every environment
- Manually orchestrated – Prone to human error, doesn't scale
- Opaque – Can't see what's running, debug failures, or track lineage
flowyml pipelines solve this by providing:
- Declarative workflows – Define what to do, not how to execute it
- Automatic dependency resolution – Steps run in the right order
- Built-in observability – Track every run, inspect every artifact
- Environment portability – Same code, different stacks
Pipeline Design Principles
Before diving into code, understand these design principles:
1. Pure Functions
| # ✅ Good: Pure function, testable in isolation
@step(outputs=["processed"])
def clean_data(raw_data):
    return raw_data.dropna().reset_index(drop=True)

# ❌ Bad: Side effects, hard to test
@step
def clean_data():
    global df  # Don't do this!
    df = df.dropna()
|
Why: Pure functions are testable, cacheable, and parallelizable.
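Because a pure step is just a function of its inputs, it can be unit-tested without running a pipeline at all. A minimal sketch, assuming @step leaves the decorated function callable as plain Python (the test below is illustrative):
| import pandas as pd

def test_clean_data_drops_missing_rows():
    raw = pd.DataFrame({"val": [1.0, None, 3.0]})
    cleaned = clean_data(raw)              # call the step like a normal function
    assert cleaned["val"].tolist() == [1.0, 3.0]
    assert list(cleaned.index) == [0, 1]   # index was reset by the step
|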
2. Single Responsibility
| # ✅ Good: Focused steps
@step(outputs=["split_data"])
def split_data(data): ...

@step(inputs=["split_data"], outputs=["model"])
def train_model(split_data): ...

# ❌ Bad: Doing too much
@step(outputs=["model"])
def split_and_train(data):
    # Splitting and training in one step = can't cache independently
    split = split_data(data)
    model = train(split)
    return model
|
Why: Granular steps enable better caching, debugging, and reuse.
3. Context Injection
| # ✅ Good: Context injection
ctx = context(learning_rate=0.001, epochs=10)
pipeline = Pipeline("training", context=ctx)

@step(outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    # Parameters injected automatically
    ...

# ❌ Bad: Hardcoded configuration
@step(outputs=["model"])
def train(data):
    learning_rate = 0.001  # Can't change without code edit
    epochs = 10
|
Why: Separation of code and config enables dev/staging/prod with one codebase.
4. Type-Based Routing
Instead of manual wiring, use return type hints to let FlowyML route your assets.
| from flowyml import Model

# ✅ Good: Auto-routed to your Model Registry & Artifact Store
@step
def train(...) -> Model:
    return Model(obj, name="my_model")
|
Why: Decouples your ML code from infrastructure. You define what it is, FlowyML handles where it goes.
The Pipeline Lifecycle
Understanding the lifecycle of a FlowyML pipeline helps you build better production systems:
- Define: Create your steps using the @step decorator and return typed Assets.
- Configure: Create a context() with your hyperparameters and choose a Stack.
- Build: Assemble your DAG using pipeline.add_step(). FlowyML validates your data flow here.
- Execute: Call pipeline.run(). FlowyML handles caching, routing, and lineage.
- Observe: Use the UI to monitor the run in real time and inspect the resulting artifacts.
Creating Your First Pipeline
Here's a complete, runnable example:
| import pandas as pd
from flowyml import Pipeline, step, Dataset

# Define steps
@step(outputs=["raw_data"])
def extract() -> Dataset:
    df = pd.DataFrame({"val": [1, 2, 3, 4, 5]})
    return Dataset(df, name="raw_numbers")

@step(inputs=["raw_data"], outputs=["processed_data"])
def transform(raw_data: pd.DataFrame):
    return raw_data * 2

# Create pipeline
pipeline = Pipeline("etl_pipeline")

# Chaining steps
pipeline.add_step(extract).add_step(transform)

# Run the pipeline
result = pipeline.run()
if result.success:
    print(f"✅ Pipeline run complete: {result.run_id}")
|
Tip
You can view the DAG of any pipeline before running it by calling pipeline.build() followed by print(pipeline.dag.visualize()).
Pipeline Configuration
The Pipeline class accepts several configuration options:
| from flowyml import Pipeline, context

ctx = context(
    learning_rate=0.001,
    batch_size=32
)

pipeline = Pipeline(
    name="training_pipeline",
    context=ctx,                       # Context for parameter injection
    enable_cache=True,                 # Enable intelligent caching
    cache_dir="./my_cache",            # Custom cache directory
    stack=my_stack,                    # Execution stack (local, cloud, etc.)
    project_name="ml_project",         # Automatically creates/attaches to project
    version="v1.0.0",                  # Optional: creates VersionedPipeline automatically
    enable_checkpointing=True,         # Enable automatic checkpointing (default: True)
    enable_experiment_tracking=True    # Enable automatic experiment tracking (default: True)
)
|
Configuration Options
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Pipeline name (required) | - |
| context | Context | Context object for parameter injection | Context() |
| executor | Executor | Custom executor | LocalExecutor() |
| enable_cache | bool | Enable step caching | True |
| cache_dir | str | Cache storage directory | .flowyml/cache |
| stack | Stack | Execution stack (local/cloud) | None |
| project_name | str | Project name (creates/attaches automatically) | None |
| version | str | Version string (creates VersionedPipeline) | None |
| enable_checkpointing | bool | Enable automatic checkpointing | True |
| enable_experiment_tracking | bool | Enable automatic experiment tracking | True (config default) |
| auto_discover | bool | Auto-discover @step-decorated functions from the global registry at build time | False |
Execution Graph (DAG)
When you add steps to a pipeline, flowyml builds a Directed Acyclic Graph (DAG):
- Nodes: Steps in the pipeline
- Edges: Data dependencies between steps
flowyml analyzes the inputs and outputs of each step to determine the execution order automatically.
| @step(outputs=["data"])
def step_a():
    return [1, 2, 3]

@step(inputs=["data"], outputs=["result"])
def step_b(data):
    return sum(data)

# flowyml automatically determines step_a must run before step_b
|
Running Pipelines
Basic Execution
| # Run the pipeline
result = pipeline.run()

# Check success
if result.success:
    print("✅ Pipeline completed successfully!")
    print(f"Outputs: {result.outputs}")
else:
    print("❌ Pipeline failed")
|
With Runtime Overrides
You can override configuration at runtime:
| # Override context
result = pipeline.run(context={"learning_rate": 0.05})
# Use different stack
result = pipeline.run(stack=my_production_stack)
# Enable debug mode
result = pipeline.run(debug=True)
|
The PipelineResult Object
The result of a pipeline execution is a PipelineResult object:
| result = pipeline.run()
# Properties
result.run_id # Unique run identifier
result.pipeline_name # Pipeline name
result.success # Boolean: overall success
result.outputs # Dict: all step outputs
result.step_results # Dict: detailed results per step
result.duration_seconds # Total execution time
# Methods
result.summary() # Human-readable summary
result.to_dict() # Convert to dictionary
result["step_name"] # Access specific output
|
Example: Inspecting Results
| result = pipeline.run()

if result.success:
    print(result.summary())

# Access step results
for step_name, step_result in result.step_results.items():
    print(f"Step: {step_name}")
    print(f"  Duration: {step_result.duration_seconds:.2f}s")
    print(f"  Cached: {step_result.cached}")
    if step_result.artifact_uri:
        print(f"  Artifact: {step_result.artifact_uri}")
|
Pipeline Building
Adding Steps
Steps are registered with the pipeline via add_step(); their execution order is resolved automatically from declared inputs and outputs:
| pipeline = Pipeline("ml_workflow")

# Add steps
pipeline.add_step(load_data)
pipeline.add_step(preprocess)
pipeline.add_step(train_model)
pipeline.add_step(evaluate)

# add_step() returns the pipeline, so calls can be chained
pipeline = (Pipeline("workflow")
    .add_step(step1)
    .add_step(step2)
    .add_step(step3)
)
|
DAG Visualization
| # Build the DAG
pipeline.build()
# Visualize the execution graph
print(pipeline.dag.visualize())
|
Output:
| Step Execution Order:
1. load_data
2. preprocess (depends on: load_data)
3. train_model (depends on: preprocess)
4. evaluate (depends on: train_model)
|
Working with Stacks
Stacks define where and how your pipeline executes:
| from flowyml import LocalStack

# Create a local stack with custom paths
stack = LocalStack(
    artifact_path=".flowyml/artifacts",
    metadata_path=".flowyml/metadata.db"
)

# Use stack in pipeline
pipeline = Pipeline("my_pipeline", stack=stack)

# Or set at runtime
result = pipeline.run(stack=stack)
|
See the Stack Architecture guide for more details on stacks.
Advanced Features
Pipeline Versioning (VersionedPipeline)
Create a versioned pipeline simply by passing version= to the Pipeline constructor:
| from flowyml import Pipeline
# Automatically creates a VersionedPipeline under the hood
pipeline = Pipeline("training", version="v1.0.0")
pipeline.add_step(train_model).add_step(evaluate)
# Compare versions
pipeline.compare_with("v0.9.0")
# Save immutable snapshots
from flowyml import freeze_pipeline
snapshot = freeze_pipeline(pipeline)
print(snapshot.snapshot_hash) # SHA-256 seal
assert snapshot.verify() # Verify integrity
|
Pipeline from Steps (Factory)
Create a pipeline without repetitive add_step() calls:
| pipeline = Pipeline.from_steps(
    load_data,
    preprocess,
    train_model,
    evaluate,
    name="training",
    enable_cache=False,
)
|
Sub-Pipeline Composition
Nest entire pipelines as steps, with input/output mapping:
| # Create reusable sub-pipeline
preprocess = Pipeline("preprocessing")
preprocess.add_step(clean_data).add_step(normalize)

# Use as a step in parent pipeline
parent = Pipeline("training")
parent.add_sub_pipeline(
    preprocess,
    inputs=["raw_data"],
    outputs=["clean_data"],
    input_mapping={"raw_data": "input"},      # Maps parent -> child names
    output_mapping={"output": "clean_data"},  # Maps child -> parent names
)
parent.add_step(train_model)
|
See the full guide: Sub-Pipelines
Selective Re-Execution (Rerun)
Resume a failed pipeline from its checkpoint:
| # Resume from last checkpoint
result = pipeline.rerun(run_id="previous-run-id")
# Resume from a specific step (skip steps before it)
result = pipeline.rerun(run_id="previous-run-id", from_step="train_model")
|
Tip
rerun() requires checkpointing to be enabled. Set enable_checkpointing=True in the Pipeline constructor (it's True by default via config).
See the full guide: Checkpointing
Build-Time Type Validation
pipeline.build() automatically validates type compatibility between connected steps:
| @step(outputs=["model"])
def train() -> Model:
    return Model(clf)

@step(inputs=["model"])
def evaluate(model: Dataset):  # ❌ Type mismatch! Gets Model, expects Dataset
    pass

pipeline.add_step(train).add_step(evaluate)
pipeline.build()  # Raises: Pipeline type validation failed
|
The validator understands Optional, Union, list[T], and dict[K,V] compatibility, and reports both errors (hard mismatches) and warnings (possible issues); a compatible pairing is sketched below.
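A minimal sketch (the step names fetch_records and count_records are hypothetical): a producer returning list[dict] feeding a consumer annotated Optional[list[dict]] should pass the check, since Optional[T] is treated as compatible with T.
| from typing import Optional

@step(outputs=["records"])
def fetch_records() -> list[dict]:
    return [{"id": 1}, {"id": 2}]

@step(inputs=["records"], outputs=["count"])
def count_records(records: Optional[list[dict]]) -> int:
    return len(records) if records else 0

pipeline = Pipeline("validation_demo")
pipeline.add_step(fetch_records).add_step(count_records)
pipeline.build()  # Expected to pass: Optional[list[dict]] accepts list[dict]
|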
Conditional Execution
| from flowyml import when

@step(outputs=["data"])
def load():
    return {"quality": 0.95}

@step(inputs=["data"], outputs=["model"])
@when(lambda data: data["quality"] > 0.9)
def train(data):
    return "trained_model"

# train() only executes if quality > 0.9
|
Parallel Execution
| from flowyml import parallel_map

@step
def process_batch(items):
    return parallel_map(expensive_function, items, num_workers=4)
|
Map Tasks
Distribute work over collections with concurrency controls and per-item retries:
| from flowyml import map_task

@map_task(concurrency=8, retries=2, min_success_ratio=0.95)
def process_document(doc: dict) -> dict:
    return transform(doc)

result = process_document(documents)
print(f"Processed {result.successes}/{result.total}")
|
See the full guide: Map Tasks
Dynamic Workflows
Generate sub-pipelines at runtime based on intermediate results:
| from flowyml import dynamic, Pipeline, step

@dynamic(outputs=["best_model"])
def hyperparameter_search(config: dict):
    sub = Pipeline("hp_search")
    for lr in config["learning_rates"]:
        @step(outputs=[f"model_lr_{lr}"])
        def train(learning_rate=lr):
            return train_model(learning_rate)
        sub.add_step(train)
    return sub
|
See the full guide: Dynamic Workflows
Error Handling
| from flowyml import retry, on_failure

@step
@retry(max_attempts=3, backoff=2.0)
@on_failure(fallback_function)
def flaky_step():
    # Retries up to 3 times with exponential backoff
    return fetch_external_data()
|
Patterns & Anti-Patterns
✅ Pattern: Environment-Agnostic Design
| # Same pipeline code works everywhere
ctx_dev = context(data_path="./local_data.csv", batch_size=32)
ctx_prod = context(data_path="gs://bucket/data.csv", batch_size=512)
# Development
pipeline = Pipeline("ml_training", context=ctx_dev)
pipeline.run()
# Production (same code!)
pipeline = Pipeline("ml_training", context=ctx_prod, stack=prod_stack)
pipeline.run()
|
Why this works: Zero code changes from dev to prod. Just swap context and stack.
❌ Anti-Pattern: Environment-Specific Branches
| # Don't do this!
@step(outputs=["data"])
def load_data():
    if os.getenv("ENV") == "production":
        return load_from_gcs()
    else:
        return load_from_local()
|
Why it's bad: Logic pollution, hard to test, environments drift apart.
Fix: Use context and stacks to handle environment differences.
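A minimal sketch of that fix (paths and prod_stack reused from the pattern above for illustration): the step reads its location from context, so switching environments is purely a configuration change.
| import pandas as pd
from flowyml import Pipeline, step, context

@step(outputs=["data"])
def load_data(data_path: str):
    # No environment branching: the path is injected from context
    return pd.read_csv(data_path)

ctx_dev = context(data_path="./local_data.csv")
ctx_prod = context(data_path="gs://bucket/data.csv")

Pipeline("ingest", context=ctx_dev).add_step(load_data).run()                      # development
Pipeline("ingest", context=ctx_prod, stack=prod_stack).add_step(load_data).run()   # production
|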
✅ Pattern: Composition Over Inheritance
| # Reusable, composable steps
@step(outputs=["data"])
def load_csv(path: str):
    return pd.read_csv(path)

@step(inputs=["data"], outputs=["clean"])
def clean(data):
    return data.dropna()

# Build multiple pipelines from same steps
etl_pipeline = Pipeline("etl").add_step(load_csv).add_step(clean)
validation_pipeline = Pipeline("validation").add_step(load_csv).add_step(validate)
|
Why this works: Steps are building blocks. Mix and match for different workflows.
❌ Anti-Pattern: Monolithic Pipelines
| # Don't do this!
@step(outputs=["everything"])
def do_everything():
    data = load()
    clean = process(data)
    model = train(clean)
    metrics = evaluate(model)
    deploy(model)
    return metrics
|
Why it's bad: Can't cache parts, can't parallelize, can't reuse, hard to debug.
✅ Pattern: Fail Fast with Validation Steps
| @step(outputs=["data"])
def load_data():
    return fetch_data()

@step(inputs=["data"])
def validate_data(data):
    if len(data) < 100:
        raise ValueError("Insufficient data")
    if data['target'].isnull().any():
        raise ValueError("Missing target values")
    # Validation passes, no output needed

@step(inputs=["data"], outputs=["model"])
def train_model(data):
    # Only runs if validation passed
    return train(data)
|
Why this works: Catch problems early, save expensive compute on bad data.
Decision Guide
| Scenario | Split? | Reason |
|---|---|---|
| Step takes >5 minutes to run | ✅ Yes | Better caching granularity |
| Might want to run parts separately | ✅ Yes | Enables reuse |
| Operation is expensive (GPU, API calls) | ✅ Yes | Cache results independently |
| Tightly coupled operations (save + load same artifact) | ❌ No | Keep together for atomicity |
| Fast operations (<1 second) | ❌ Maybe | Balance overhead vs. benefit |
Real-World Examples
ML Training Pipeline
| import pandas as pd
from flowyml import Pipeline, step, context
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

ctx = context(
    data_path="data/train.csv",
    test_size=0.2,
    n_estimators=100,
    random_state=42
)

@step(outputs=["raw_data"])
def load_data(data_path: str):
    return pd.read_csv(data_path)

@step(inputs=["raw_data"], outputs=["X_train", "X_test", "y_train", "y_test"])
def split_data(raw_data, test_size: float, random_state: int):
    X = raw_data.drop('target', axis=1)
    y = raw_data['target']
    return train_test_split(X, y, test_size=test_size, random_state=random_state)

@step(inputs=["X_train", "y_train"], outputs=["model"])
def train(X_train, y_train, n_estimators: int):
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X_train, y_train)
    return model

@step(inputs=["model", "X_test", "y_test"], outputs=["accuracy"])
def evaluate(model, X_test, y_test):
    return model.score(X_test, y_test)

# Create pipeline with project (will create project if it doesn't exist)
pipeline = Pipeline("ml_training", context=ctx, project_name="ml_platform")
pipeline.add_step(load_data)
pipeline.add_step(split_data)
pipeline.add_step(train)
pipeline.add_step(evaluate)

result = pipeline.run()
print(f"Model accuracy: {result.outputs['accuracy']:.2%}")
|
Why this design works:
- Each step is independently cacheable (see the sketch after this list)
- Parameters in context, not hardcoded
- Can easily add more steps (preprocessing, feature engineering)
- Testable components
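To see independent caching in action, a minimal sketch (field names as documented in the PipelineResult section): run the pipeline twice and inspect which steps were served from cache on the second run.
| first = pipeline.run()    # cold run, populates the cache
second = pipeline.run()    # unchanged context, so cached results can be reused

for step_name, step_result in second.step_results.items():
    # With enable_cache=True (the default) and unchanged inputs/parameters,
    # unchanged steps are expected to report cached=True here
    print(f"{step_name}: cached={step_result.cached}")
|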
Best Practices
- Name descriptively: customer_churn_prediction > pipeline1
- Explicit dependencies: Always declare inputs and outputs
- One responsibility per step: Split complex logic into focused steps
- Context for configuration: Never hardcode parameters
- Enable caching in dev: Speed up iteration, disable in prod if needed
- Fail fast: Validate early, don't waste compute on bad data
- Test in isolation: Each step should be unit-testable
- Document assumptions: Use docstrings to explain step requirements
- Version your pipelines: Use Git for pipeline code versioning
- Monitor in production: Use the UI to track runs and catch failures
Next Steps
Master the building blocks:
- Steps: Deep dive into step configuration, decorators, and best practices
- Context: Learn parameter injection patterns and environment management
- Assets: Work with typed artifacts (Datasets, Models, Metrics)
Level up your pipelines:
- Caching: Optimize iteration speed with intelligent caching
- Conditional Execution: Build adaptive workflows
- Parallel Execution: Speed up independent operations
- Error Handling: Build resilient production pipelines
- Map Tasks: Distribute work with concurrency and retries
- Dynamic Workflows: Runtime-generated sub-pipelines
- Sub-Pipelines: Compose pipelines hierarchically
- Checkpointing: Resume failed runs from where they left off
- Artifact Catalog: Centralized artifact discovery and lineage
Deploy to production:
- Stack Architecture: Understand local vs. cloud execution and YAML-driven stack hydration
- Projects: Organize multi-tenant deployments
- Scheduling: Automate recurring pipelines