Sub-Pipeline Composition
What you'll learn
How to nest entire pipelines as steps in other pipelines for modular, reusable, and testable workflow design.
Sub-pipelines let you compose complex workflows from smaller, tested units, much like function composition but applied to entire DAGs. Each child pipeline runs in its own execution context, while the parent manages the data flow between children.
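Here is a toy model in plain Python that makes the function-composition analogy concrete. It is not flowyml code, and `run_pipeline` and `as_step` are hypothetical helpers. A "pipeline" is a list of steps that read and write named values in a dict, and wrapping a whole pipeline as a single step of another pipeline is ordinary function composition.

```python
from typing import Any, Callable

# A toy step: reads the current namespace and returns new named values.
Step = Callable[[dict[str, Any]], dict[str, Any]]

def run_pipeline(steps: list[Step], namespace: dict[str, Any]) -> dict[str, Any]:
    """Run steps in order, merging each step's outputs into a copy of the namespace."""
    ns = dict(namespace)  # copy so the caller's dict is not mutated
    for step in steps:
        ns.update(step(ns))
    return ns

def as_step(steps: list[Step], inputs: list[str], outputs: list[str]) -> Step:
    """Wrap a whole pipeline so a parent can use it as one step."""
    def wrapped(parent_ns: dict[str, Any]) -> dict[str, Any]:
        # The child starts from only the declared inputs...
        child_ns = run_pipeline(steps, {k: parent_ns[k] for k in inputs})
        # ...and hands back only the declared outputs.
        return {k: child_ns[k] for k in outputs}
    return wrapped

# Child: drop missing values, then divide by the maximum.
child = [
    lambda ns: {"cleaned": [x for x in ns["raw_data"] if x is not None]},
    lambda ns: {"normalized": [x / max(ns["cleaned"]) for x in ns["cleaned"]]},
]

# Parent: the child pipeline is just another step.
parent = [
    as_step(child, inputs=["raw_data"], outputs=["normalized"]),
    lambda ns: {"total": sum(ns["normalized"])},
]

result = run_pipeline(parent, {"raw_data": [2, None, 4]})
```

The parent only sees the declared output `normalized`. The child's intermediate `cleaned` value never reaches the parent's namespace, which is the encapsulation benefit listed in the comparison table.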
Why Sub-Pipelines?
| Monolithic Pipelines | Sub-Pipeline Composition |
|---|---|
| One giant pipeline | Modular, reusable units |
| Hard to test in isolation | Test each child independently |
| Code duplication | Share preprocessing across projects |
| Everything visible | Encapsulate implementation details |
Basic Usage
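```python
import pandas as pd
from flowyml import Pipeline, step

# 1. Define a reusable child pipeline
preprocess = Pipeline("preprocessing")

@step(inputs=["raw_data"], outputs=["cleaned"])
def clean_data(raw_data):
    return raw_data.dropna()

@step(inputs=["cleaned"], outputs=["normalized"])
def normalize(cleaned):
    return (cleaned - cleaned.mean()) / cleaned.std()

preprocess.add_step(clean_data)
preprocess.add_step(normalize)

# 2. Compose it into a parent pipeline
@step(outputs=["raw_data"])
def load_raw():
    # Example loader so "raw_data" has a producer; the path is a placeholder
    return pd.read_csv("data.csv")

@step(inputs=["normalized"], outputs=["model"])
def train_model(normalized):
    ...  # example training step; your training logic goes here

parent = Pipeline("training")
parent.add_step(load_raw)
parent.add_sub_pipeline(
    preprocess,
    inputs=["raw_data"],
    outputs=["normalized"],  # must be a name the child actually produces
)
parent.add_step(train_model)
parent.run()
```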
Input / Output Mapping
Map parent output names to child input names (and vice versa) when they differ:
```python
# Assumes a child pipeline whose first step reads "input_df" instead of "raw_data"
parent.add_sub_pipeline(
    preprocess,
    inputs=["raw_data"],
    outputs=["clean_data"],
    input_mapping={"raw_data": "input_df"},       # parent name → child name
    output_mapping={"normalized": "clean_data"},  # child name → parent name
)
```
How Mapping Works
```mermaid
graph LR
    A["Parent: raw_data"] -- "input_mapping" --> B["Child: input_df"]
    B --> C["Child Pipeline<br/>clean → normalize"]
    C --> D["Child: normalized"]
    D -- "output_mapping" --> E["Parent: clean_data"]
```
- input_mapping: translates parent output names into the child pipeline's expected input names.
- output_mapping: translates child output names back into the parent's namespace.
- Unmapped keys are passed through as-is.
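The three rules above can be modeled as two small renaming functions. This is a plain-Python sketch of the documented behavior, not flowyml's implementation. `map_inputs` and `map_outputs` are hypothetical names. The sketch also assumes that only the names listed in `inputs` and `outputs` cross the boundary.

```python
from typing import Any

def map_inputs(parent_ns: dict[str, Any], inputs: list[str],
               input_mapping: dict[str, str]) -> dict[str, Any]:
    """Build the child's starting values from the parent's declared inputs.

    Each parent name is renamed via input_mapping; unmapped names pass through.
    """
    return {input_mapping.get(name, name): parent_ns[name] for name in inputs}

def map_outputs(child_ns: dict[str, Any], outputs: list[str],
                output_mapping: dict[str, str]) -> dict[str, Any]:
    """Expose the child's results to the parent under the declared output names.

    Child names are renamed via output_mapping; unmapped names pass through.
    """
    renamed = {output_mapping.get(name, name): value for name, value in child_ns.items()}
    return {name: renamed[name] for name in outputs}

# Names follow the mapping example; the string values stand in for DataFrames.
child_in = map_inputs({"raw_data": "df_raw", "unused": "x"},
                      ["raw_data"], {"raw_data": "input_df"})
child_ns = {"input_df": "df_raw", "cleaned": "df_clean", "normalized": "df_norm"}
parent_out = map_outputs(child_ns, ["clean_data"], {"normalized": "clean_data"})
```

Here `child_in` holds only `input_df`. `parent_out` exposes the child's `normalized` value as `clean_data`. With an empty mapping, a child name such as `cleaned` would pass through unchanged.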
Programmatic API
Use SubPipelineStep directly for full control:
```python
from flowyml.core.subpipeline import SubPipelineStep

sub_step = SubPipelineStep(
    sub_pipeline=preprocess,
    name="data_prep",
    inputs=["raw"],
    outputs=["clean"],
    input_mapping={"raw": "input_df"},
    output_mapping={"normalized": "clean"},
)
parent.add_step(sub_step)
```
Or use the convenience function:
```python
from flowyml import sub_pipeline

parent.add_step(
    sub_pipeline(
        preprocess,
        inputs=["raw"],
        outputs=["clean"],
        output_mapping={"normalized": "clean"},
    )
)
```
Real-World Examples
Shared Preprocessing Across Projects
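```python
# preprocessing.py: reusable module
from flowyml import Pipeline

# load_data, handle_missing, encode_categoricals, and scale_features
# are @step functions defined in this module (omitted here).

def create_preprocess_pipeline():
    """Build a new preprocessing pipeline on each call."""
    pipe = Pipeline("preprocess")
    pipe.add_step(load_data)
    pipe.add_step(handle_missing)
    pipe.add_step(encode_categoricals)
    pipe.add_step(scale_features)
    return pipe


# training.py
from flowyml import Pipeline
from preprocessing import create_preprocess_pipeline

# train_model and evaluate_model are @step functions defined elsewhere.
preprocess = create_preprocess_pipeline()

training = Pipeline("training")
training.add_sub_pipeline(preprocess, inputs=["raw"], outputs=["features"])
training.add_step(train_model)
training.add_step(evaluate_model)
```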
Multi-Stage ML Pipeline
```python
# Compose three child pipelines into one parent
parent = Pipeline("end_to_end")

parent.add_sub_pipeline(
    data_pipeline,
    inputs=["raw_data"],
    outputs=["features"],
)
parent.add_sub_pipeline(
    training_pipeline,
    inputs=["features"],
    outputs=["model"],
)
parent.add_sub_pipeline(
    evaluation_pipeline,
    inputs=["model", "features"],
    outputs=["report"],
)

parent.run()
```
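A DAG runner generally schedules children by their declared inputs and outputs. Assuming the parent here works the same way, you can check this wiring before anything runs. The sketch below is plain Python, not flowyml code. `execution_order` is a hypothetical helper that performs a simple topological ordering and fails fast when some input is never produced.

```python
def execution_order(stages: dict[str, tuple[list[str], list[str]]],
                    initial: set[str]) -> list[str]:
    """Order stages so every input is produced before it is consumed.

    stages maps a stage name to (inputs, outputs); initial holds the names
    available before any stage runs. Raises ValueError if a stage can never run.
    """
    available = set(initial)
    pending = dict(stages)
    order: list[str] = []
    while pending:
        # Every stage whose inputs are all available can run in this round.
        ready = [name for name, (ins, _) in pending.items() if set(ins) <= available]
        if not ready:
            raise ValueError(f"unsatisfied inputs for: {sorted(pending)}")
        for name in ready:
            order.append(name)
            available.update(pending.pop(name)[1])
    return order

# The three children from the example, deliberately listed out of order.
stages = {
    "evaluation_pipeline": (["model", "features"], ["report"]),
    "training_pipeline": (["features"], ["model"]),
    "data_pipeline": (["raw_data"], ["features"]),
}
order = execution_order(stages, {"raw_data"})
```

The ordering comes from data dependencies, not from listing order. `features` is consumed by two children but produced only once. If `raw_data` were missing from `initial`, the function would raise instead of returning a partial order.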
SubPipelineStep API Reference
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| sub_pipeline | Pipeline | required | The child pipeline to wrap |
| name | str \| None | "sub:{pipeline.name}" | Step name in the parent |
| inputs | list[str] | [] | Input names from the parent |
| outputs | list[str] | [] | Output names exposed to the parent |
| input_mapping | dict[str, str] | {} | Parent → child name mapping |
| output_mapping | dict[str, str] | {} | Child → parent name mapping |
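The defaults in this table can be mirrored in a plain dataclass, which shows how the name default is derived from the child pipeline. This is an illustrative sketch, not flowyml's source. `ToyPipeline` and `ToySubStep` are hypothetical stand-ins.

```python
from __future__ import annotations

from dataclasses import dataclass, field

@dataclass
class ToyPipeline:
    """Hypothetical stand-in for a pipeline; only its name is used here."""
    name: str

@dataclass
class ToySubStep:
    """Hypothetical mirror of the documented constructor parameters."""
    sub_pipeline: ToyPipeline
    name: str | None = None
    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)
    input_mapping: dict[str, str] = field(default_factory=dict)
    output_mapping: dict[str, str] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Documented default name: "sub:{pipeline.name}".
        if self.name is None:
            self.name = f"sub:{self.sub_pipeline.name}"

default_step = ToySubStep(ToyPipeline("preprocessing"))
named_step = ToySubStep(ToyPipeline("preprocessing"), name="data_prep", inputs=["raw"])
```

`default_factory` gives each instance its own list or dict. This avoids Python's shared-mutable-default pitfall, which a bare `[]` or `{}` default would otherwise invite.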
Best Practices
Design for reusability
Build child pipelines as standalone modules with their own tests. Import and compose them in parent pipelines.
Naming convention
Use descriptive names: parent.add_sub_pipeline(preprocess, name="feature_engineering") makes logs easier to read.
Context isolation
Child pipelines run with auto_start_ui=False. They share the parent's storage but have their own execution context.
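This isolation rule can be pictured with a toy model. The code is plain Python, not flowyml's actual classes. `Storage`, `Context`, and `child_context` are hypothetical names. The child gets a new context object with its own values and the UI flag turned off, but it holds a reference to the same storage object as the parent.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

@dataclass
class Storage:
    """Toy artifact store shared by parent and child (hypothetical)."""
    artifacts: dict[str, Any] = field(default_factory=dict)

@dataclass
class Context:
    """Toy per-run execution context (hypothetical)."""
    pipeline_name: str
    storage: Storage
    auto_start_ui: bool = True
    values: dict[str, Any] = field(default_factory=dict)

def child_context(parent: Context, child_name: str) -> Context:
    """New context for a child run: own values, shared storage, no UI."""
    return Context(
        pipeline_name=child_name,
        storage=parent.storage,  # same object, not a copy
        auto_start_ui=False,
    )

parent_ctx = Context("training", Storage(), values={"raw_data": [1, None, 3]})
child_ctx = child_context(parent_ctx, "preprocessing")

# Intermediate values stay in the child's own context...
child_ctx.values["cleaned"] = [1, 3]
# ...while persisted artifacts land in the shared store.
child_ctx.storage.artifacts["preprocessing/cleaned"] = [1, 3]
```

After the child writes, `cleaned` is absent from the parent's values but visible through the shared storage. The parent's own `auto_start_ui` setting is untouched.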