Pipeline Versioning
Track changes to your pipelines like you track code with Git.
What you'll learn
How to version your pipelines and compare changes over time. "What changed between v1 and v2?" is a question you'll ask every week; versioning answers it instantly.
Why Versioning Matters
Without versioning:
- No audit trail: "Who changed the data loader last month?"
- Risky deployments: You don't know what changed
- Lost history: Can't roll back to a working version
With flowyml versioning:
- Full history: See exactly what changed and when
- Safe releases: Compare a new version with production before deploying
- Easy rollback: Restore a previous version instantly
When to Version
| Scenario | Action |
|---|---|
| Before deploying to production | Always save a version |
| After adding/removing steps | Save with descriptive metadata |
| Major logic changes | Bump version number (v1.0 → v2.0) |
| Daily experiments | Not necessary; only version what you'll deploy |
Overview
The VersionedPipeline class provides:
- Save pipeline versions with metadata
- Compare versions to see what changed
- Track pipeline evolution over time
- Maintain a history of pipeline configurations
Basic Usage
```python
from flowyml import VersionedPipeline, step, context

# Create a versioned pipeline with context
ctx = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training_pipeline", context=ctx, version="v1.0.0")

# Add steps (load_from_source and train_model are placeholders for your own code)
@step(outputs=["data"])
def load_data():
    return load_from_source()

@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    return train_model(data, learning_rate, epochs)

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})
```
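In the example above, train receives learning_rate and epochs even though only data is declared in inputs, which suggests that step parameters not listed as inputs are filled from the pipeline context by name. The sketch below illustrates that kind of name-based injection with inspect.signature. It is an assumption about the behavior, not flowyml's implementation, and call_step is a hypothetical helper.

```python
import inspect
from typing import Any, Callable, Dict


def call_step(fn: Callable[..., Any], inputs: Dict[str, Any], context: Dict[str, Any]) -> Any:
    """Call fn, taking each parameter from step inputs first, then from the context."""
    kwargs = {}
    for name, param in inspect.signature(fn).parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue  # *args / **kwargs are not injected in this sketch
        if name in inputs:
            kwargs[name] = inputs[name]
        elif name in context:
            kwargs[name] = context[name]
        elif param.default is inspect.Parameter.empty:
            raise TypeError(f"{fn.__name__}: no value for parameter {name!r}")
    return fn(**kwargs)


def train(data, learning_rate: float, epochs: int):
    # Stand-in for a real training step: echo what it received
    return {"n_rows": len(data), "lr": learning_rate, "epochs": epochs}


context = {"learning_rate": 0.001, "epochs": 10}
result = call_step(train, inputs={"data": [1, 2, 3]}, context=context)
print(result)  # {'n_rows': 3, 'lr': 0.001, 'epochs': 10}
```

Looking inputs up before the context means an upstream output would win over a context value with the same name; the real precedence rule in flowyml is not stated on this page.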
Using Pipeline with Version Parameter
You can also create a versioned pipeline directly using the Pipeline class with a version parameter:
```python
from flowyml import Pipeline, step, context

ctx = context(learning_rate=0.001, epochs=10)

# Automatically creates a VersionedPipeline when version is provided
pipeline = Pipeline("training_pipeline", context=ctx, version="v1.0.1")

# load_from_source and train_model are placeholders for your own code
@step(outputs=["data"])
def load_data():
    return load_from_source()

@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    return train_model(data, learning_rate, epochs)

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})
```
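The comment "Automatically creates a VersionedPipeline when version is provided" describes a factory-style constructor. One way to get that behavior in plain Python is to override __new__, as sketched below. These toy Pipeline and VersionedPipeline classes are hypothetical stand-ins written to show the pattern, not flowyml's code.

```python
from typing import Any, Optional


class Pipeline:
    """Toy pipeline that hands back a VersionedPipeline when a version is given."""

    def __new__(cls, name: str, *args: Any, version: Optional[str] = None, **kwargs: Any):
        # Only redirect plain Pipeline(...) calls; subclasses construct themselves
        target = VersionedPipeline if (cls is Pipeline and version is not None) else cls
        return super().__new__(target)

    def __init__(self, name: str, *args: Any, version: Optional[str] = None, **kwargs: Any):
        self.name = name
        self.steps: list = []


class VersionedPipeline(Pipeline):
    def __init__(self, name: str, *args: Any, version: str = "v0.1.0", **kwargs: Any):
        super().__init__(name, *args, **kwargs)
        self.version = version


p = Pipeline("training_pipeline", version="v1.0.1")
print(type(p).__name__, p.version)       # VersionedPipeline v1.0.1
print(type(Pipeline("plain")).__name__)  # Pipeline
```

Because the object returned by __new__ is an instance of Pipeline, Python then runs VersionedPipeline.__init__ with the same arguments, so the version keyword reaches the subclass.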
Comparing Versions
```python
# Make changes to the pipeline
@step(inputs=["model"], outputs=["metrics"])
def evaluate(model):
    return evaluate_model(model)  # evaluate_model is a placeholder for your own code

pipeline.add_step(evaluate)
pipeline.version = "v1.1.0"
pipeline.save_version(metadata={"description": "Added evaluation step"})

# Compare versions
diff = pipeline.compare_with("v1.0.0")
print(diff)
# Output:
# {
#     'current_version': 'v1.1.0',
#     'compared_to': 'v1.0.0',
#     'added_steps': ['evaluate'],
#     'removed_steps': [],
#     'modified_steps': [],
#     'step_order_changed': True,
#     'context_changes': {...}
# }

# Display comparison in readable format
pipeline.display_comparison("v1.0.0")
```
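To see how a comparison like compare_with can be derived, the sketch below computes the step-related fields from two saved versions, each reduced to an ordered step list plus a name-to-hash mapping (the steps and step_hashes attributes listed in the API reference). compare_steps is a hypothetical helper, and defining step_order_changed as "the ordered step lists differ" is an assumption chosen because it reproduces the sample output above, where adding evaluate alone sets it to True.

```python
from typing import Dict, List


def compare_steps(old_steps: List[str], old_hashes: Dict[str, str],
                  new_steps: List[str], new_hashes: Dict[str, str]) -> Dict[str, object]:
    """Diff two versions by step name, content hash, and ordering."""
    old_set, new_set = set(old_steps), set(new_steps)
    return {
        # Keep each list in pipeline order so the output is stable
        "added_steps": [s for s in new_steps if s not in old_set],
        "removed_steps": [s for s in old_steps if s not in new_set],
        # Same name in both versions, but the implementation hash differs
        "modified_steps": [s for s in new_steps
                           if s in old_set and old_hashes.get(s) != new_hashes.get(s)],
        # Any difference in the ordered step list, including additions
        "step_order_changed": old_steps != new_steps,
    }


v1 = (["load_data", "train"], {"load_data": "a1", "train": "b2"})
v2 = (["load_data", "train", "evaluate"],
      {"load_data": "a1", "train": "b2", "evaluate": "c3"})
result = compare_steps(*v1, *v2)
print(result)
# {'added_steps': ['evaluate'], 'removed_steps': [], 'modified_steps': [], 'step_order_changed': True}
```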
Listing Versions
```python
# Get all saved versions
versions = pipeline.list_versions()
print(versions)  # ['v1.0.0', 'v1.1.0']

# Get specific version details
version_info = pipeline.get_version("v1.0.0")
print(version_info.steps)
print(version_info.created_at)
print(version_info.metadata)
```
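list_versions() returns version strings. If you need them in release order, don't rely on plain string sorting, which puts "v1.10.0" before "v1.9.0". A small sort key that parses the vMAJOR.MINOR.PATCH convention used on this page avoids that; it assumes every tag follows that pattern, and version_key and sort_versions are hypothetical helpers.

```python
import re
from typing import List, Tuple

_SEMVER = re.compile(r"^v?(\d+)\.(\d+)\.(\d+)$")


def version_key(tag: str) -> Tuple[int, int, int]:
    """Turn 'v1.10.0' into (1, 10, 0) so tags compare numerically."""
    match = _SEMVER.match(tag)
    if match is None:
        raise ValueError(f"not a vMAJOR.MINOR.PATCH tag: {tag!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def sort_versions(tags: List[str]) -> List[str]:
    return sorted(tags, key=version_key)


tags = ["v1.10.0", "v1.9.0", "v1.0.0", "v2.0.0"]
print(sorted(tags))              # ['v1.0.0', 'v1.10.0', 'v1.9.0', 'v2.0.0']
print(sort_versions(tags))       # ['v1.0.0', 'v1.9.0', 'v1.10.0', 'v2.0.0']
print(sort_versions(tags)[-1])   # v2.0.0
```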
Version Storage
Versions are stored as JSON files in .flowyml/versions/{pipeline_name}/ by default. To store them elsewhere, pass versions_dir:
```python
pipeline = VersionedPipeline(
    "my_pipeline",
    versions_dir="/custom/path/versions",
)
```
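The storage layout described above (JSON files under {versions_dir}/{pipeline_name}/) can be approximated with the standard library, for example to inspect saved versions from a script. The sketch assumes one file per version named <version>.json with the record's fields at the top level; flowyml's actual file names and JSON fields may differ, and save_record and load_records are hypothetical helpers.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def save_record(versions_dir: str, record: Dict[str, Any]) -> Path:
    """Write one version record to {versions_dir}/{pipeline_name}/{version}.json."""
    folder = Path(versions_dir) / record["pipeline_name"]
    folder.mkdir(parents=True, exist_ok=True)
    path = folder / f"{record['version']}.json"
    path.write_text(json.dumps(record, indent=2, sort_keys=True))
    return path


def load_records(versions_dir: str, pipeline_name: str) -> List[Dict[str, Any]]:
    """Read every saved record for one pipeline (file-name order)."""
    folder = Path(versions_dir) / pipeline_name
    return [json.loads(p.read_text()) for p in sorted(folder.glob("*.json"))]


with tempfile.TemporaryDirectory() as tmp:
    save_record(tmp, {"pipeline_name": "my_pipeline", "version": "v1.0.0",
                      "steps": ["load_data", "train"], "metadata": {}})
    records = load_records(tmp, "my_pipeline")
    print([r["version"] for r in records])  # ['v1.0.0']
```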
Best Practices
1. Use Semantic Versioning
```python
pipeline.version = "v1.0.0"  # Major.Minor.Patch
```
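To apply Major.Minor.Patch consistently, you can derive the bump from a compare_with result. The policy below is one reasonable convention, not something flowyml enforces: removed or modified steps count as major (matching the pre-deploy check under "Compare Before Deploying"), added steps as minor, and anything else, such as context-only changes, as patch. The diff keys follow the sample output in "Comparing Versions"; suggest_bump and bump are hypothetical helpers.

```python
from typing import Any, Dict


def suggest_bump(diff: Dict[str, Any]) -> str:
    """Pick 'major', 'minor', or 'patch' from a compare_with()-style dict."""
    if diff.get("removed_steps") or diff.get("modified_steps"):
        return "major"  # treated as breaking
    if diff.get("added_steps"):
        return "minor"  # new functionality, existing steps untouched
    return "patch"      # e.g. context-only tweaks


def bump(version: str, part: str) -> str:
    """Increment one component of a 'vMAJOR.MINOR.PATCH' tag, resetting lower ones."""
    major, minor, patch = (int(x) for x in version.lstrip("v").split("."))
    if part == "major":
        return f"v{major + 1}.0.0"
    if part == "minor":
        return f"v{major}.{minor + 1}.0"
    if part == "patch":
        return f"v{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")


diff = {"added_steps": ["evaluate"], "removed_steps": [], "modified_steps": []}
print(bump("v1.0.0", suggest_bump(diff)))  # v1.1.0
```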
2. Add Descriptive Metadata
```python
pipeline.save_version(metadata={
    "description": "Added data validation step",
    "author": "data-team",
    "impact": "improved data quality",
    "breaking_changes": False,
})
```
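If your team agrees on required metadata keys, a small check before save_version keeps entries consistent. The required keys below simply mirror the example above and are a team convention, not a flowyml requirement; check_metadata is a hypothetical helper.

```python
from typing import Any, Dict, Iterable

REQUIRED_KEYS = ("description", "author")


def check_metadata(metadata: Dict[str, Any],
                   required: Iterable[str] = REQUIRED_KEYS) -> Dict[str, Any]:
    """Raise if a required key is missing or blank; otherwise return metadata unchanged."""
    missing = [key for key in required if not str(metadata.get(key, "")).strip()]
    if missing:
        raise ValueError(f"version metadata is missing: {', '.join(missing)}")
    return metadata


meta = check_metadata({
    "description": "Added data validation step",
    "author": "data-team",
    "impact": "improved data quality",
    "breaking_changes": False,
})
# pipeline.save_version(metadata=meta)  # then save as usual
```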
3. Compare Before Deploying
```python
# Always compare with the production version before deploying
PRODUCTION_VERSION = "v1.0.0"

if pipeline.version != PRODUCTION_VERSION:
    diff = pipeline.compare_with(PRODUCTION_VERSION)
    if diff["removed_steps"] or diff["modified_steps"]:
        print("⚠️ Breaking changes detected!")
        pipeline.display_comparison(PRODUCTION_VERSION)
```
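The check above only prints a warning. If a deploy script should stop instead, the same rule can be turned into a function that lists the problems and returns a non-zero exit code when there are any. breaking_changes and gate are hypothetical helpers operating on a compare_with()-style dict.

```python
import sys
from typing import Any, Dict, List


def breaking_changes(diff: Dict[str, Any]) -> List[str]:
    """Describe every removed or modified step in a compare_with()-style dict."""
    problems = [f"removed step: {name}" for name in diff.get("removed_steps", [])]
    problems += [f"modified step: {name}" for name in diff.get("modified_steps", [])]
    return problems


def gate(diff: Dict[str, Any]) -> int:
    """Return a process exit code: 0 to deploy, 1 to block."""
    problems = breaking_changes(diff)
    for problem in problems:
        print(f"Breaking change: {problem}", file=sys.stderr)
    return 1 if problems else 0


example = {"added_steps": ["evaluate"], "removed_steps": [], "modified_steps": ["load_data"]}
exit_code = gate(example)
# In a deploy script: sys.exit(gate(pipeline.compare_with(PRODUCTION_VERSION)))
```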
Advanced Features
Hash-Based Change Detection
The versioning system uses content hashing to detect changes in step implementations:
```python
# Same step name, different implementation
# (edit the step in your pipeline code so this definition is the one registered)
@step(outputs=["data"])
def load_data():
    # Modified implementation
    return load_from_new_source()  # Changed!

pipeline.save_version()
# Will detect that load_data was modified
```
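To illustrate how content hashing can flag a modified step, the sketch below fingerprints a function from its compiled bytecode, constants, and referenced names using hashlib.sha256. flowyml's actual hashing scheme is not documented on this page and may hash source text instead; step_fingerprint is a hypothetical helper, and it only suits simple, flat functions.

```python
import hashlib
from typing import Any, Callable


def step_fingerprint(fn: Callable[..., Any]) -> str:
    """Hash what a function does, ignoring its name and where it is defined."""
    code = fn.__code__
    # Bytecode alone misses renamed calls (load_from_source vs load_from_new_source),
    # so constants and referenced global/attribute names are hashed too.
    # Nested functions or lambdas would need recursive handling, because the repr
    # of a nested code object includes a memory address.
    payload = repr((code.co_code, code.co_consts, code.co_names)).encode()
    return hashlib.sha256(payload).hexdigest()[:16]


def load_data_v1():
    return load_from_source()  # noqa: F821 (never called, only hashed)


def load_data_v2():
    return load_from_new_source()  # noqa: F821


print(step_fingerprint(load_data_v1) == step_fingerprint(load_data_v1))  # True
print(step_fingerprint(load_data_v1) == step_fingerprint(load_data_v2))  # False
```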
Context Parameter Tracking
Changes to context parameters are automatically tracked. You can pass context when creating the pipeline:
```python
from flowyml import VersionedPipeline, context

ctx1 = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training", context=ctx1, version="v1.0.0")
pipeline.save_version()

# Change context
ctx2 = context(learning_rate=0.01, epochs=20)  # Changed!
pipeline = VersionedPipeline("training", context=ctx2, version="v1.1.0")
pipeline.save_version()

diff = pipeline.compare_with("v1.0.0")
# Will show context_changes with added, removed, and modified parameters
```
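The comment above says context_changes reports added, removed, and modified parameters. A dictionary diff along those lines is sketched below; the exact keys and nesting flowyml uses are an assumption, and diff_context is a hypothetical helper.

```python
from typing import Any, Dict


def diff_context(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Split parameter differences into added, removed, and modified (sorted by name)."""
    return {
        "added": {k: new[k] for k in sorted(new.keys() - old.keys())},
        "removed": {k: old[k] for k in sorted(old.keys() - new.keys())},
        # For changed values keep both sides as (old, new)
        "modified": {k: (old[k], new[k])
                     for k in sorted(old.keys() & new.keys()) if old[k] != new[k]},
    }


v1_params = {"learning_rate": 0.001, "epochs": 10}
v2_params = {"learning_rate": 0.01, "epochs": 20}
changes = diff_context(v1_params, v2_params)
print(changes)
# {'added': {}, 'removed': {}, 'modified': {'epochs': (10, 20), 'learning_rate': (0.001, 0.01)}}
```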
Integration with CI/CD
```python
from flowyml import VersionedPipeline

# In your CI pipeline
# (get_production_version and run_integration_tests are your own helpers)
def verify_version_changes():
    pipeline = VersionedPipeline.load("production_pipeline")

    # Get current production version
    prod_version = get_production_version()

    # Compare
    diff = pipeline.compare_with(prod_version)

    # Enforce policies
    if diff["removed_steps"]:
        raise ValueError("Cannot remove steps in a minor version update")
    if diff["modified_steps"]:
        # Require integration tests
        run_integration_tests()

    return diff
```
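Another policy that fits a CI check is to require a version bump whenever the pipeline changed, so two different pipelines never share a tag. The sketch assumes vMAJOR.MINOR.PATCH tags, the current_version and compared_to keys shown in the sample compare_with output, and a context_changes dict whose values are empty when nothing changed; parse and require_bump are hypothetical helpers.

```python
from typing import Any, Dict, Tuple


def parse(tag: str) -> Tuple[int, ...]:
    """'v1.2.3' -> (1, 2, 3)"""
    return tuple(int(part) for part in tag.lstrip("v").split("."))


def require_bump(diff: Dict[str, Any]) -> None:
    """Fail if steps or context changed but the version did not move forward."""
    step_keys = ("added_steps", "removed_steps", "modified_steps", "step_order_changed")
    changed = any(diff.get(key) for key in step_keys)
    ctx_changes = diff.get("context_changes") or {}
    changed = changed or any(ctx_changes.values())
    if changed and parse(diff["current_version"]) <= parse(diff["compared_to"]):
        raise ValueError(
            f"pipeline changed but {diff['current_version']} is not newer "
            f"than {diff['compared_to']}"
        )


require_bump({"current_version": "v1.1.0", "compared_to": "v1.0.0",
              "added_steps": ["evaluate"], "removed_steps": [], "modified_steps": []})  # passes
```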
API Reference
VersionedPipeline
Constructor:
```python
VersionedPipeline(
    name: str,
    version: str = "v0.1.0",
    versions_dir: str = ".flowyml/versions",
    context: Context | None = None,
    **kwargs  # Additional Pipeline parameters (project_name, executor, etc.)
)
```
Methods:
- save_version(metadata: Optional[Dict] = None) - Save current version
- list_versions() -> List[str] - List all saved versions
- get_version(version: str) -> PipelineVersion - Get version details
- compare_with(other_version: str) -> Dict - Compare with another version
- display_comparison(other_version: str) - Pretty print comparison
- run(*args, **kwargs) - Run the pipeline (inherited from Pipeline)
PipelineVersion
Attributes:
- version: str - Version identifier
- pipeline_name: str - Pipeline name
- created_at: str - Creation timestamp
- steps: List[str] - List of step names
- step_hashes: Dict[str, str] - Step content hashes
- context_params: Dict - Context parameters
- metadata: Dict - Custom metadata
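For type-checked code that reads saved versions outside flowyml (for example, a report script reading the JSON files), a dataclass mirroring the PipelineVersion attributes above is a convenient starting point. It assumes the stored JSON keys match the attribute names, which this page does not confirm; VersionRecord is a hypothetical name, not part of flowyml.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List


@dataclass
class VersionRecord:
    """Field-for-field mirror of the PipelineVersion attributes."""
    version: str
    pipeline_name: str
    created_at: str
    steps: List[str] = field(default_factory=list)
    step_hashes: Dict[str, str] = field(default_factory=dict)
    context_params: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "VersionRecord":
        # Ignore keys this mirror does not know about instead of failing
        names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in names})


raw = {"version": "v1.0.0", "pipeline_name": "training_pipeline",
       "created_at": "2024-01-01T00:00:00", "steps": ["load_data", "train"],
       "extra_field": "ignored"}
record = VersionRecord.from_dict(raw)
print(record.steps)  # ['load_data', 'train']
```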