Pipelines
Pipelines are the core abstraction in flowyml: they represent workflows that orchestrate your ML operations from data to deployment.
> [!NOTE]
> What you'll learn: How to design, build, and run production-grade pipelines
Key insight: A well-designed pipeline is infrastructure-agnostic. Write it once, run it anywhere (local, staging, production) without code changes.
Why Pipelines Matter
Without pipelines, ML workflows are often:
- Scripts scattered across notebooks → hard to reproduce, impossible to version
- Tightly coupled to infrastructure → rewritten for every environment
- Manually orchestrated → prone to human error, doesn't scale
- Opaque → can't see what's running, debug failures, or track lineage
flowyml pipelines solve this by providing:
- Declarative workflows → define what to do, not how to execute it
- Automatic dependency resolution → steps run in the right order
- Built-in observability → track every run, inspect every artifact
- Environment portability → same code, different stacks
Pipeline Design Principles
Before diving into code, understand these design principles:
1. Steps Should Be Pure Functions
# ✅ Good: Pure function, testable in isolation
@step(outputs=["processed"])
def clean_data(raw_data):
    return raw_data.dropna().reset_index(drop=True)
# ❌ Bad: Side effects, hard to test
@step
def clean_data():
    global df  # Don't do this!
    df = df.dropna()
Why: Pure functions are testable, cacheable, and parallelizable.
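Because a pure step has no hidden state, you can exercise it directly in a unit test without building a pipeline. A minimal sketch, assuming the `@step`-decorated function can still be called as a plain function, with pandas used purely for illustration:

```python
import pandas as pd

def test_clean_data_drops_missing_rows():
    raw = pd.DataFrame({"feature": [1.0, None, 3.0]})
    cleaned = clean_data(raw)  # call the step body directly
    assert cleaned["feature"].isnull().sum() == 0
    assert list(cleaned.index) == [0, 1]  # index was reset
```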
2. One Step, One Responsibility
# ✅ Good: Focused steps
@step(outputs=["split_data"])
def split_data(data): ...

@step(inputs=["split_data"], outputs=["model"])
def train_model(split_data): ...
# ❌ Bad: Doing too much
@step(outputs=["model"])
def split_and_train(data):
    # Splitting and training in one step = can't cache independently
    split = split_data(data)
    model = train(split)
    return model
Why: Granular steps enable better caching, debugging, and reuse.
3. Configuration Belongs in Context
# ✅ Good: Context injection
ctx = context(learning_rate=0.001, epochs=10)
pipeline = Pipeline("training", context=ctx)

@step(outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    # Parameters injected automatically
    ...
# ❌ Bad: Hardcoded configuration
@step(outputs=["model"])
def train(data):
    learning_rate = 0.001  # Can't change without code edit
    epochs = 10
Why: Separation of code and config enables dev/staging/prod with one codebase.
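One way to picture the injection: parameter names on the step are matched against keys in the context, and missing arguments are filled in at call time. The sketch below only illustrates that idea and is not flowyml's actual implementation; `train` here is a plain stand-in function:

```python
import inspect

def inject_params(func, context_values, **runtime_args):
    """Illustrative only: fill missing keyword arguments from a context dict."""
    kwargs = dict(runtime_args)
    for name in inspect.signature(func).parameters:
        if name not in kwargs and name in context_values:
            kwargs[name] = context_values[name]
    return func(**kwargs)

def train(data, learning_rate: float, epochs: int):
    return f"{len(data)} rows, lr={learning_rate}, epochs={epochs}"

print(inject_params(train, {"learning_rate": 0.001, "epochs": 10}, data=[1, 2, 3]))
# -> 3 rows, lr=0.001, epochs=10
```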
Creating Your First Pipeline
Here's a complete, runnable example:
from flowyml import Pipeline, step, context
# Define steps
@step(outputs=["raw_data"])
def extract():
    return [1, 2, 3, 4, 5]

@step(inputs=["raw_data"], outputs=["processed_data"])
def transform(raw_data):
    return [x * 2 for x in raw_data]
# Create pipeline
pipeline = Pipeline("etl_pipeline")
# Add steps in execution order
pipeline.add_step(extract)
pipeline.add_step(transform)
# Run the pipeline
result = pipeline.run()
if result.success:
    print(f"✅ Processed data: {result.outputs['processed_data']}")
What just happened: flowyml built a DAG, determined execution order, and ran your steps. No Airflow DAG files, no Kubeflow YAML, just Python.
Pipeline Configuration
The Pipeline class accepts several configuration options:
from flowyml import Pipeline, context
ctx = context(
learning_rate=0.001,
batch_size=32
)
pipeline = Pipeline(
name="training_pipeline",
context=ctx, # Context for parameter injection
enable_cache=True, # Enable intelligent caching
cache_dir="./my_cache", # Custom cache directory
stack=my_stack, # Execution stack (local, cloud, etc.)
project_name="ml_project", # Automatically creates/attaches to project
version="v1.0.0", # Optional: creates VersionedPipeline automatically
enable_checkpointing=True, # Enable automatic checkpointing (default: True)
enable_experiment_tracking=True # Enable automatic experiment tracking (default: True)
)
Configuration Options
| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Pipeline name (required) | - |
| `context` | `Context` | Context object for parameter injection | `Context()` |
| `executor` | `Executor` | Custom executor | `LocalExecutor()` |
| `enable_cache` | `bool` | Enable step caching | `True` |
| `cache_dir` | `str` | Cache storage directory | `.flowyml/cache` |
| `stack` | `Stack` | Execution stack (local/cloud) | `None` |
| `project_name` | `str` | Project name (creates/attaches automatically) | `None` |
| `version` | `str` | Version string (creates VersionedPipeline) | `None` |
| `enable_checkpointing` | `bool` | Enable automatic checkpointing | `True` |
| `enable_experiment_tracking` | `bool` | Enable automatic experiment tracking | `True` (config default) |
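In practice you rarely need every option: `name` is the only required argument, and everything else falls back to the defaults above. A minimal construction (the pipeline name here is illustrative) might look like this:

```python
from flowyml import Pipeline

# Only the name is required; caching is switched off for this quick experiment.
pipeline = Pipeline("quick_experiment", enable_cache=False)
```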
Execution Graph (DAG)
When you add steps to a pipeline, flowyml builds a Directed Acyclic Graph (DAG):
- Nodes: Steps in the pipeline
- Edges: Data dependencies between steps
flowyml analyzes the inputs and outputs of each step to determine the execution order automatically.
@step(outputs=["data"])
def step_a():
    return [1, 2, 3]

@step(inputs=["data"], outputs=["result"])
def step_b(data):
    return sum(data)
# flowyml automatically determines step_a must run before step_b
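To make the ordering concrete, here is a small, self-contained sketch of the general idea: map each output to the step that produces it, treat inputs as edges, and topologically sort. This only illustrates how declared inputs and outputs imply an order; it is not flowyml's internal code:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Declared inputs/outputs for the two steps above
steps = {
    "step_a": {"inputs": [], "outputs": ["data"]},
    "step_b": {"inputs": ["data"], "outputs": ["result"]},
}

# Which step produces each named output
producers = {out: name for name, spec in steps.items() for out in spec["outputs"]}

# A step depends on the producers of its inputs
graph = {name: {producers[i] for i in spec["inputs"]} for name, spec in steps.items()}

print(list(TopologicalSorter(graph).static_order()))  # ['step_a', 'step_b']
```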
Running Pipelines
Basic Execution
# Run the pipeline
result = pipeline.run()
# Check success
if result.success:
    print("✅ Pipeline completed successfully!")
    print(f"Outputs: {result.outputs}")
else:
    print("❌ Pipeline failed")
With Runtime Overrides
You can override configuration at runtime:
# Override context
result = pipeline.run(context={"learning_rate": 0.05})
# Use different stack
result = pipeline.run(stack=my_production_stack)
# Enable debug mode
result = pipeline.run(debug=True)
The PipelineResult Object
The result of a pipeline execution is a PipelineResult object:
result = pipeline.run()
# Properties
result.run_id # Unique run identifier
result.pipeline_name # Pipeline name
result.success # Boolean: overall success
result.outputs # Dict: all step outputs
result.step_results # Dict: detailed results per step
result.duration_seconds # Total execution time
# Methods
result.summary() # Human-readable summary
result.to_dict() # Convert to dictionary
result["step_name"] # Access specific output
Example: Inspecting Results
result = pipeline.run()

if result.success:
    print(result.summary())

# Access step results
for step_name, step_result in result.step_results.items():
    print(f"Step: {step_name}")
    print(f"  Duration: {step_result.duration_seconds:.2f}s")
    print(f"  Cached: {step_result.cached}")
    if step_result.artifact_uri:
        print(f"  Artifact: {step_result.artifact_uri}")
Pipeline Building
Adding Steps
Steps are registered with the pipeline using `add_step`:
pipeline = Pipeline("ml_workflow")
# Add steps
pipeline.add_step(load_data)
pipeline.add_step(preprocess)
pipeline.add_step(train_model)
pipeline.add_step(evaluate)
# add_step returns the pipeline, so calls can be chained
pipeline = (Pipeline("workflow")
.add_step(step1)
.add_step(step2)
.add_step(step3)
)
DAG Visualization
For the `ml_workflow` pipeline built above, the resolved execution order looks like this:
Step Execution Order:
1. load_data
2. preprocess (depends on: load_data)
3. train_model (depends on: preprocess)
4. evaluate (depends on: train_model)
Working with Stacks
Stacks define where and how your pipeline executes:
from flowyml import LocalStack
# Create a local stack with custom paths
stack = LocalStack(
artifact_path=".flowyml/artifacts",
metadata_path=".flowyml/metadata.db"
)
# Use stack in pipeline
pipeline = Pipeline("my_pipeline", stack=stack)
# Or set at runtime
result = pipeline.run(stack=stack)
See the Stack Architecture guide for more details on stacks.
Advanced Features
Conditional Execution
from flowyml import when
@step(outputs=["data"])
def load():
    return {"quality": 0.95}

@step(inputs=["data"], outputs=["model"])
@when(lambda data: data["quality"] > 0.9)
def train(data):
    return "trained_model"

# train() only executes if quality > 0.9
Parallel Execution
from flowyml import parallel_map
@step
def process_batch(items):
    # expensive_function is any per-item function defined elsewhere
    return parallel_map(expensive_function, items, num_workers=4)
Error Handling
from flowyml import retry, on_failure
@step
@retry(max_attempts=3, backoff=2.0)
@on_failure(fallback_function)  # fallback_function: failure handler defined elsewhere
def flaky_step():
    # Retries up to 3 times with exponential backoff before falling back
    return fetch_external_data()
Pipeline Patterns & Anti-Patterns
✅ Pattern: Environment-Agnostic Design
# Same pipeline code works everywhere
ctx_dev = context(data_path="./local_data.csv", batch_size=32)
ctx_prod = context(data_path="gs://bucket/data.csv", batch_size=512)
# Development
pipeline = Pipeline("ml_training", context=ctx_dev)
pipeline.run()
# Production (same code!)
pipeline = Pipeline("ml_training", context=ctx_prod, stack=prod_stack)
pipeline.run()
Why this works: Zero code changes from dev to prod. Just swap context and stack.
❌ Anti-Pattern: Environment-Specific Branches
# Don't do this!
@step(outputs=["data"])
def load_data():
    if os.getenv("ENV") == "production":
        return load_from_gcs()
    else:
        return load_from_local()
Why it's bad: Logic pollution, hard to test, environments drift apart.
Fix: Use context and stacks to handle environment differences.
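As a sketch of that fix, the loader takes the location as a parameter and each environment supplies its own value through context. The paths and the CSV reader below are placeholders for illustration:

```python
from flowyml import Pipeline, step, context
import pandas as pd

@step(outputs=["data"])
def load_data(data_path: str):
    # Same code for a local file or an object-store URI; no branching on ENV
    return pd.read_csv(data_path)

ctx_dev = context(data_path="./local_data.csv")
ctx_prod = context(data_path="gs://bucket/data.csv")  # swap the context, not the code
```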
✅ Pattern: Composition Over Inheritance
# Reusable, composable steps
@step(outputs=["data"])
def load_csv(path: str):
    return pd.read_csv(path)

@step(inputs=["data"], outputs=["clean"])
def clean(data):
    return data.dropna()

# Build multiple pipelines from the same steps
etl_pipeline = Pipeline("etl").add_step(load_csv).add_step(clean)
validation_pipeline = Pipeline("validation").add_step(load_csv).add_step(validate)
Why this works: Steps are building blocks. Mix and match for different workflows.
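The `validate` step used in `validation_pipeline` above isn't shown; a minimal sketch of what it might look like (the check and the output name are hypothetical):

```python
@step(inputs=["data"], outputs=["validated"])
def validate(data):
    # Hypothetical check: fail fast on an empty frame
    if len(data) == 0:
        raise ValueError("No rows to validate")
    return data
```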
❌ Anti-Pattern: Monolithic Pipelines
# Don't do this!
@step(outputs=["everything"])
def do_everything():
    data = load()
    clean = process(data)
    model = train(clean)
    metrics = evaluate(model)
    deploy(model)
    return metrics
Why it's bad: Can't cache parts, can't parallelize, can't reuse, hard to debug.
✅ Pattern: Fail Fast with Validation Steps
@step(outputs=["data"])
def load_data():
    return fetch_data()

@step(inputs=["data"])
def validate_data(data):
    if len(data) < 100:
        raise ValueError("Insufficient data")
    if data['target'].isnull().any():
        raise ValueError("Missing target values")
    # Validation passes, no output needed

@step(inputs=["data"], outputs=["model"])
def train_model(data):
    # Only runs if validation passed
    return train(data)
Why this works: Catch problems early, save expensive compute on bad data.
Decision Guide: When to Split Steps
| Scenario | Split? | Reason |
|---|---|---|
| Step takes >5 minutes to run | ✅ Yes | Better caching granularity |
| Might want to run parts separately | ✅ Yes | Enables reuse |
| Operation is expensive (GPU, API calls) | ✅ Yes | Cache results independently |
| Tightly coupled operations (save + load same artifact) | ❌ No | Keep together for atomicity |
| Fast operations (<1 second) | ⚠️ Maybe | Balance overhead vs. benefit |
Real-World Pipeline Examples
ML Training Pipeline
from flowyml import Pipeline, step, context
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
ctx = context(
data_path="data/train.csv",
test_size=0.2,
n_estimators=100,
random_state=42
)
@step(outputs=["raw_data"])
def load_data(data_path: str):
    return pd.read_csv(data_path)

@step(inputs=["raw_data"], outputs=["X_train", "X_test", "y_train", "y_test"])
def split_data(raw_data, test_size: float, random_state: int):
    X = raw_data.drop('target', axis=1)
    y = raw_data['target']
    return train_test_split(X, y, test_size=test_size, random_state=random_state)

@step(inputs=["X_train", "y_train"], outputs=["model"])
def train(X_train, y_train, n_estimators: int):
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X_train, y_train)
    return model

@step(inputs=["model", "X_test", "y_test"], outputs=["accuracy"])
def evaluate(model, X_test, y_test):
    return model.score(X_test, y_test)
# Create pipeline with project (will create project if it doesn't exist)
pipeline = Pipeline("ml_training", context=ctx, project_name="ml_platform")
pipeline.add_step(load_data)
pipeline.add_step(split_data)
pipeline.add_step(train)
pipeline.add_step(evaluate)
result = pipeline.run()
print(f"Model accuracy: {result.outputs['accuracy']:.2%}")
Why this design works:
- Each step is independently cacheable
- Parameters live in context, not hardcoded
- Easy to add more steps (preprocessing, feature engineering)
- Testable components
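Because the hyperparameters live in context, you can also try a different setting at run time without touching step code, using the runtime override shown earlier (the value here is illustrative):

```python
# Override a single context value for this run
result = pipeline.run(context={"n_estimators": 500})
```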
Best Practices Summary
- Name descriptively: `customer_churn_prediction`, not `pipeline1`
- Explicit dependencies: Always declare `inputs` and `outputs`
- One responsibility per step: Split complex logic into focused steps
- Context for configuration: Never hardcode parameters
- Enable caching in dev: Speed up iteration, disable in prod if needed
- Fail fast: Validate early, don't waste compute on bad data
- Test in isolation: Each step should be unit-testable
- Document assumptions: Use docstrings to explain step requirements
- Version your pipelines: Use Git for pipeline code versioning
- Monitor in production: Use the UI to track runs and catch failures
Next Steps
Master the building blocks:
- Steps: Deep dive into step configuration, decorators, and best practices
- Context: Learn parameter injection patterns and environment management
- Assets: Work with typed artifacts (Datasets, Models, Metrics)

Level up your pipelines:
- Caching: Optimize iteration speed with intelligent caching
- Conditional Execution: Build adaptive workflows
- Parallel Execution: Speed up independent operations
- Error Handling: Build resilient production pipelines

Deploy to production:
- Stack Architecture: Understand local vs. cloud execution
- Projects: Organize multi-tenant deployments
- Scheduling: Automate recurring pipelines