Pipeline Versioning
Track changes to your pipelines like you track code with Git.
> [!NOTE]
> What you'll learn: how to version your pipelines and compare changes over time.
>
> Key insight: "What changed between v1 and v2?" is a question you'll ask every week. Versioning answers it instantly.
Why Versioning Matters
Without versioning:
- No audit trail: "Who changed the data loader last month?"
- Risky deployments: you don't know what changed.
- Lost history: you can't roll back to a working version.

With flowyml versioning:
- Full history: see exactly what changed and when.
- Safe releases: compare a new version with production before deploying.
- Easy rollback: restore a previous version instantly.
When to Version
| Scenario | Action |
|---|---|
| Before deploying to production | Always save a version |
| After adding/removing steps | Save with descriptive metadata |
| Major logic changes | Bump version number (v1.0 → v2.0) |
| Daily experiments | Not necessary; only version what you'll deploy |
Overview
The `VersionedPipeline` class provides:
- Save pipeline versions with metadata
- Compare versions to see what changed
- Track pipeline evolution over time
- Maintain a history of pipeline configurations
Basic Usage
```python
from flowyml import VersionedPipeline, step, context

# Create a versioned pipeline with context
ctx = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training_pipeline", context=ctx, version="v1.0.0")

# Add steps (load_from_source and train_model stand in for your own code)
@step(outputs=["data"])
def load_data():
    return load_from_source()

@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    # learning_rate and epochs match the context keys defined above
    return train_model(data, learning_rate, epochs)

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})
```
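In the example above, `train` declares `learning_rate` and `epochs` as parameters, and those names match the keys passed to `context(...)`. The sketch below shows one way this kind of name-based injection can work, using only the standard library. `resolve_kwargs` is a hypothetical helper and not flowyml's implementation. It assumes that upstream step outputs take priority over context values, and that a parameter with a default value can be left unfilled.

```python
import inspect
from typing import Any, Callable


def resolve_kwargs(fn: Callable[..., Any], outputs: dict[str, Any],
                   context: dict[str, Any]) -> dict[str, Any]:
    """Build keyword arguments for fn by parameter name (hypothetical helper).

    Upstream step outputs are checked first, then the context. Parameters
    with a default may be omitted; anything else missing raises KeyError.
    """
    kwargs: dict[str, Any] = {}
    for name, param in inspect.signature(fn).parameters.items():
        if name in outputs:
            kwargs[name] = outputs[name]
        elif name in context:
            kwargs[name] = context[name]
        elif param.default is not inspect.Parameter.empty:
            continue  # let the function fall back to its own default
        else:
            raise KeyError(f"No value for parameter {name!r} of {fn.__name__}")
    return kwargs


def train(data, learning_rate: float, epochs: int):
    return f"model(data={data}, lr={learning_rate}, epochs={epochs})"


ctx = {"learning_rate": 0.001, "epochs": 10}
kwargs = resolve_kwargs(train, outputs={"data": [1, 2, 3]}, context=ctx)
print(train(**kwargs))
```

Matching by parameter name keeps step functions free of framework types, but it also means a typo in a context key is only caught when the step runs.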
Using Pipeline with Version Parameter
You can also create a versioned pipeline directly using the `Pipeline` class with a `version` parameter:
```python
from flowyml import Pipeline, step, context

ctx = context(learning_rate=0.001, epochs=10)

# Automatically creates a VersionedPipeline when version is provided
pipeline = Pipeline("training_pipeline", context=ctx, version="v1.0.1")

@step(outputs=["data"])
def load_data():
    return load_from_source()

@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    return train_model(data, learning_rate, epochs)

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})
```
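The comment in this example says that `Pipeline` returns a `VersionedPipeline` whenever `version` is passed. A common Python pattern for a constructor that returns a subclass is to override `__new__`. The toy classes below are a hypothetical sketch of that pattern. They are not flowyml's source, and flowyml may use a different mechanism, such as a factory function.

```python
from typing import Any, Optional


class Pipeline:
    def __new__(cls, name: str, *args: Any, version: Optional[str] = None, **kwargs: Any):
        # A plain Pipeline(...) call that carries a version becomes a VersionedPipeline.
        target = VersionedPipeline if (cls is Pipeline and version is not None) else cls
        return super().__new__(target)

    def __init__(self, name: str, *, context: Optional[dict] = None, **kwargs: Any) -> None:
        self.name = name
        self.context = context or {}
        self.steps: list = []


class VersionedPipeline(Pipeline):
    def __init__(self, name: str, *, version: str = "v0.1.0", **kwargs: Any) -> None:
        super().__init__(name, **kwargs)
        self.version = version


plain = Pipeline("training_pipeline")
versioned = Pipeline("training_pipeline", version="v1.0.1")
print(type(plain).__name__, type(versioned).__name__, versioned.version)
```

This works because Python only runs `__init__` on the object returned by `__new__` if that object is an instance of the class being called. Since `VersionedPipeline` is a subclass of `Pipeline`, its own `__init__` runs and receives the `version` argument.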
Comparing Versions
```python
# Make changes to the pipeline
@step(inputs=["model"], outputs=["metrics"])
def evaluate(model):
    return evaluate_model(model)

pipeline.add_step(evaluate)
pipeline.version = "v1.1.0"
pipeline.save_version(metadata={"description": "Added evaluation step"})

# Compare versions
diff = pipeline.compare_with("v1.0.0")
print(diff)
# Output:
# {
#     'current_version': 'v1.1.0',
#     'compared_to': 'v1.0.0',
#     'added_steps': ['evaluate'],
#     'removed_steps': [],
#     'modified_steps': [],
#     'step_order_changed': True,
#     'context_changes': {...}
# }

# Display comparison in readable format
pipeline.display_comparison("v1.0.0")
```
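To make the fields of this dictionary concrete, here is a minimal standard-library sketch that derives them from two version records. Each record holds an ordered step list and a mapping from step name to hash, mirroring the `steps` and `step_hashes` attributes in the API reference below. The `diff_steps` name and its rules are assumptions, not flowyml's code. In particular, `step_order_changed` is treated here as "the step lists are not identical", which reproduces the `True` in the output above. Context changes are left out of this sketch.

```python
from typing import Any


def diff_steps(current: dict[str, Any], other: dict[str, Any]) -> dict[str, Any]:
    """Compare two version records shaped like {"version", "steps", "step_hashes"}.

    Hypothetical helper for illustration; context changes are not handled here.
    """
    cur_steps, old_steps = current["steps"], other["steps"]
    cur_hashes, old_hashes = current["step_hashes"], other["step_hashes"]
    cur_set, old_set = set(cur_steps), set(old_steps)
    return {
        "current_version": current["version"],
        "compared_to": other["version"],
        "added_steps": [s for s in cur_steps if s not in old_set],
        "removed_steps": [s for s in old_steps if s not in cur_set],
        # Same step name in both versions, but a different content hash.
        "modified_steps": [
            s for s in cur_steps
            if s in old_set and cur_hashes.get(s) != old_hashes.get(s)
        ],
        # Plain list inequality, so appending a step also counts as a change.
        "step_order_changed": cur_steps != old_steps,
    }


# Made-up hashes, standing in for real step content hashes.
v1 = {"version": "v1.0.0", "steps": ["load_data", "train"],
      "step_hashes": {"load_data": "a1", "train": "b2"}}
v2 = {"version": "v1.1.0", "steps": ["load_data", "train", "evaluate"],
      "step_hashes": {"load_data": "a1", "train": "b2", "evaluate": "c3"}}
print(diff_steps(v2, v1))
```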
Listing Versions
```python
# Get all saved versions
versions = pipeline.list_versions()
print(versions)  # ['v1.0.0', 'v1.1.0']

# Get specific version details
version_info = pipeline.get_version("v1.0.0")
print(version_info.steps)
print(version_info.created_at)
print(version_info.metadata)
```
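If you sort version identifiers yourself, for example to find the latest one, plain string sorting breaks once a component reaches two digits. A numeric sort key avoids that. The hypothetical `version_key` helper below assumes the `vMAJOR.MINOR.PATCH` form used on this page. It makes no claim about how `list_versions()` orders its result.

```python
def version_key(version: str) -> tuple[int, ...]:
    """Map 'v1.10.0' to (1, 10, 0) so versions compare numerically (hypothetical helper)."""
    return tuple(int(part) for part in version.removeprefix("v").split("."))


versions = ["v1.10.0", "v1.2.0", "v1.9.1"]
print(sorted(versions))                   # ['v1.10.0', 'v1.2.0', 'v1.9.1'] (string order)
print(sorted(versions, key=version_key))  # ['v1.2.0', 'v1.9.1', 'v1.10.0'] (release order)
print(max(versions, key=version_key))     # v1.10.0
```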
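Version Storage
Versions are stored as JSON files in `.flowyml/versions/{pipeline_name}/` by default. To keep them somewhere else, pass the `versions_dir` constructor argument, for example `VersionedPipeline("training_pipeline", versions_dir="ml/versions")`.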
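As a rough picture of that layout, the sketch below writes one JSON file per version under `{versions_dir}/{pipeline_name}/` and lists the saved versions by scanning that directory. The `{version}.json` file name and both helpers are assumptions made for illustration. flowyml's actual file names and record fields may differ.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def save_version_file(versions_dir: str, pipeline_name: str, record: dict[str, Any]) -> Path:
    """Write record to {versions_dir}/{pipeline_name}/{version}.json (assumed layout)."""
    folder = Path(versions_dir) / pipeline_name
    folder.mkdir(parents=True, exist_ok=True)
    path = folder / f"{record['version']}.json"
    path.write_text(json.dumps(record, indent=2), encoding="utf-8")
    return path


def list_version_files(versions_dir: str, pipeline_name: str) -> list[str]:
    """Return version identifiers found on disk (lexical order, not release order)."""
    folder = Path(versions_dir) / pipeline_name
    if not folder.is_dir():
        return []
    # Path.stem drops only the final ".json", so "v1.0.0.json" -> "v1.0.0".
    return sorted(p.stem for p in folder.glob("*.json"))


with tempfile.TemporaryDirectory() as tmp:
    save_version_file(tmp, "training_pipeline", {"version": "v1.0.0", "steps": ["load_data", "train"]})
    save_version_file(tmp, "training_pipeline", {"version": "v1.1.0", "steps": ["load_data", "train", "evaluate"]})
    print(list_version_files(tmp, "training_pipeline"))
```

One file per version keeps each save independent and makes the history easy to inspect or commit alongside your code.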
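Best Practices
1. Use Semantic Versioning: name versions `vMAJOR.MINOR.PATCH` (for example `v1.0.0`, `v1.1.0`). Bump MAJOR for breaking changes such as removed steps, MINOR when you add steps, and PATCH for fixes that leave the step list unchanged.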
2. Add Descriptive Metadata
```python
pipeline.save_version(metadata={
    "description": "Added data validation step",
    "author": "data-team",
    "impact": "improved data quality",
    "breaking_changes": False,
})
```
3. Compare Before Deploying
```python
# Always compare with the production version before deploying
PRODUCTION_VERSION = "v1.0.0"

if pipeline.version != PRODUCTION_VERSION:
    diff = pipeline.compare_with(PRODUCTION_VERSION)
    if diff["removed_steps"] or diff["modified_steps"]:
        print("⚠️ Breaking changes detected!")
        pipeline.display_comparison(PRODUCTION_VERSION)
```
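Because versions are stored as JSON, the metadata from practice 2 should contain only JSON-friendly values: strings, numbers, booleans, lists, and dicts. The hypothetical `check_metadata` helper below is a minimal pre-save check built on the standard `json` module. This page does not say whether flowyml itself rejects or converts other types, so treat this as a defensive sketch.

```python
import json
from datetime import datetime, timezone
from typing import Any


def check_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
    """Raise TypeError naming the first value that json.dumps cannot encode."""
    for key, value in metadata.items():
        try:
            json.dumps(value)
        except TypeError as exc:
            raise TypeError(f"metadata[{key!r}] is not JSON-serializable: {exc}") from exc
    return metadata


ok = check_metadata({
    "description": "Added data validation step",
    "author": "data-team",
    "impact": "improved data quality",
    "breaking_changes": False,
})

try:
    check_metadata({"saved_at": datetime.now(timezone.utc)})
except TypeError as err:
    print(err)
```

Convert values such as datetimes before saving, for example with `datetime.isoformat()`.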
Advanced Features
Hash-Based Change Detection
The versioning system uses content hashing to detect changes in step implementations:
```python
# Same step name, different implementation
@step(outputs=["data"])
def load_data():
    # Modified implementation
    return load_from_new_source()  # Changed!

pipeline.save_version()
# Will detect that load_data was modified
```
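One way to compute such a content hash is sketched below. It hashes the function's compiled code object: the bytecode, the names it references, and its constants, recursing into nested code objects. As a result, comments do not change the hash, but calling a different function does. `step_hash` is a hypothetical helper, and flowyml may hash source text or other data instead. Bytecode also differs between Python versions, so hashes from this sketch are only comparable on the same interpreter version.

```python
import hashlib
import types
from typing import Callable


def _code_bytes(code: types.CodeType) -> bytes:
    """Serialize the parts of a code object that reflect its behavior."""
    parts = [code.co_code, repr(code.co_names).encode()]
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            parts.append(_code_bytes(const))  # nested functions and lambdas
        else:
            parts.append(repr(const).encode())
    return b"".join(parts)


def step_hash(fn: Callable) -> str:
    """Return a SHA-256 hex digest of fn's compiled code (hypothetical helper)."""
    return hashlib.sha256(_code_bytes(fn.__code__)).hexdigest()


# These functions are only hashed, never called, so the names they use need not exist.
def load_data_v1():
    return load_from_source()


def load_data_v1_commented():
    # Same logic as v1; only this comment is new
    return load_from_source()


def load_data_v2():
    # Modified implementation
    return load_from_new_source()


print(step_hash(load_data_v1) == step_hash(load_data_v1_commented))
print(step_hash(load_data_v1) == step_hash(load_data_v2))
```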
Context Parameter Tracking
Changes to context parameters are automatically tracked. You can pass context when creating the pipeline:
```python
from flowyml import VersionedPipeline, context

ctx1 = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training", context=ctx1, version="v1.0.0")
pipeline.save_version()

# Change context
ctx2 = context(learning_rate=0.01, epochs=20)  # Changed!
pipeline = VersionedPipeline("training", context=ctx2, version="v1.1.0")
pipeline.save_version()

diff = pipeline.compare_with("v1.0.0")
# Will show context_changes with added, removed, and modified parameters
```
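`context_changes` can be pictured as a comparison of two parameter dictionaries. Below is a minimal sketch with a hypothetical `diff_context` helper. The `added`/`removed`/`modified` grouping follows the comment above, but the exact output shape flowyml uses is an assumption. Keys are sorted so the result is deterministic.

```python
from typing import Any


def diff_context(current: dict[str, Any], previous: dict[str, Any]) -> dict[str, Any]:
    """Report parameters added, removed, or changed between two contexts."""
    return {
        "added": {k: current[k] for k in sorted(current.keys() - previous.keys())},
        "removed": {k: previous[k] for k in sorted(previous.keys() - current.keys())},
        # Each modified entry maps to an (old, new) pair.
        "modified": {
            k: (previous[k], current[k])
            for k in sorted(current.keys() & previous.keys())
            if current[k] != previous[k]
        },
    }


v1_params = {"learning_rate": 0.001, "epochs": 10}
v2_params = {"learning_rate": 0.01, "epochs": 20}
print(diff_context(v2_params, v1_params))
```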
Integration with CI/CD
```python
from flowyml import VersionedPipeline

# In your CI pipeline
def verify_version_changes():
    pipeline = VersionedPipeline.load("production_pipeline")

    # Get current production version (get_production_version is your own helper)
    prod_version = get_production_version()

    # Compare
    diff = pipeline.compare_with(prod_version)

    # Enforce policies
    if diff["removed_steps"]:
        raise ValueError("Cannot remove steps in minor version update")

    if diff["modified_steps"]:
        # Require integration tests (run_integration_tests is your own helper)
        run_integration_tests()

    return diff
```
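The policy in `verify_version_changes` (no removed steps in a minor update) can be generalized with semantic versioning: removing steps is allowed only with a MAJOR bump. Below is a standard-library sketch of such a gate that works on the diff dictionary. `check_version_policy` and its rules are illustrative assumptions, not flowyml behavior. A CI job would fail the build, for example with `sys.exit(1)`, whenever the returned list is non-empty.

```python
from typing import Any


def parse_version(version: str) -> tuple[int, int, int]:
    """Parse 'v1.2.3' into (1, 2, 3)."""
    major, minor, patch = (int(p) for p in version.removeprefix("v").split("."))
    return major, minor, patch


def check_version_policy(diff: dict[str, Any], prod_version: str, new_version: str) -> list[str]:
    """Return policy violations; an empty list means the change may ship (hypothetical rules)."""
    old, new = parse_version(prod_version), parse_version(new_version)
    problems = []
    if new <= old:
        problems.append(f"{new_version} must be greater than {prod_version}")
    if diff.get("removed_steps") and new[0] == old[0]:
        problems.append("Removing steps requires a MAJOR version bump")
    if diff.get("added_steps") and new[:2] == old[:2]:
        problems.append("Adding steps requires at least a MINOR version bump")
    return problems


diff = {"added_steps": ["evaluate"], "removed_steps": [], "modified_steps": []}
print(check_version_policy(diff, "v1.0.0", "v1.1.0"))  # []
```

Returning a list rather than raising on the first problem lets the CI log show every violation in a single run.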
API Reference
VersionedPipeline
Constructor:
```python
VersionedPipeline(
    name: str,
    version: str = "v0.1.0",
    versions_dir: str = ".flowyml/versions",
    context: Context | None = None,
    **kwargs  # Additional Pipeline parameters (project_name, executor, etc.)
)
```
Methods:
- `save_version(metadata: Optional[Dict] = None)` - Save the current version
- `list_versions() -> List[str]` - List all saved versions
- `get_version(version: str) -> PipelineVersion` - Get version details
- `compare_with(other_version: str) -> Dict` - Compare with another version
- `display_comparison(other_version: str)` - Pretty-print the comparison
- `run(*args, **kwargs)` - Run the pipeline (inherited from `Pipeline`)
PipelineVersion
Attributes:
- `version: str` - Version identifier
- `pipeline_name: str` - Pipeline name
- `created_at: str` - Creation timestamp
- `steps: List[str]` - List of step names
- `step_hashes: Dict[str, str]` - Step content hashes
- `context_params: Dict` - Context parameters
- `metadata: Dict` - Custom metadata
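For local tooling or tests, the attributes listed above map naturally onto a dataclass. The sketch below mirrors those field names and round-trips a record through JSON. It is a stand-in named `PipelineVersionRecord`, not flowyml's `PipelineVersion` class. The ISO-8601 `created_at` format is also an assumption.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class PipelineVersionRecord:
    """Hypothetical stand-in mirroring the PipelineVersion attributes above."""
    version: str
    pipeline_name: str
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    steps: list[str] = field(default_factory=list)
    step_hashes: dict[str, str] = field(default_factory=dict)
    context_params: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self), indent=2)

    @classmethod
    def from_json(cls, text: str) -> "PipelineVersionRecord":
        return cls(**json.loads(text))


record = PipelineVersionRecord(
    version="v1.1.0",
    pipeline_name="training_pipeline",
    steps=["load_data", "train", "evaluate"],
    context_params={"learning_rate": 0.001, "epochs": 10},
    metadata={"description": "Added evaluation step"},
)
restored = PipelineVersionRecord.from_json(record.to_json())
print(restored == record)
```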