
Pipeline Versioning 🔄

Track changes to your pipelines like you track code with Git.

[!NOTE] What you'll learn: How to version your pipelines and compare changes over time

Key insight: "What changed between v1 and v2?" is a question you'll ask every week. Versioning answers it instantly.

Why Versioning Matters

Without versioning:
- No audit trail: "Who changed the data loader last month?"
- Risky deployments: you don't know what changed
- Lost history: you can't roll back to a working version

With flowyml versioning:
- Full history: see exactly what changed and when
- Safe releases: compare a new version with production before deploying
- Easy rollback: restore a previous version instantly

When to Version

| Scenario | Action |
|---|---|
| Before deploying to production | Always save a version |
| After adding/removing steps | Save with descriptive metadata |
| Major logic changes | Bump the version number (v1.0 → v2.0) |
| Daily experiments | Not necessary; only version what you'll deploy |
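Bumping a version is easier when the next version string is computed rather than typed by hand. The minimal sketch below does that for the Major.Minor.Patch strings used throughout this page. `bump_version` is a hypothetical helper, not part of flowyml, and it assumes every version looks like `vMAJOR.MINOR.PATCH`.

```python
import re

# Hypothetical helper, not part of flowyml.
# Assumes version strings of the form "vMAJOR.MINOR.PATCH", e.g. "v1.0.0".
_VERSION_RE = re.compile(r"^v(\d+)\.(\d+)\.(\d+)$")


def bump_version(version: str, part: str) -> str:
    """Return the next version after bumping "major", "minor", or "patch"."""
    match = _VERSION_RE.match(version)
    if match is None:
        raise ValueError(f"not a vMAJOR.MINOR.PATCH version: {version!r}")
    major, minor, patch = (int(g) for g in match.groups())
    if part == "major":
        return f"v{major + 1}.0.0"  # a major bump resets minor and patch
    if part == "minor":
        return f"v{major}.{minor + 1}.0"  # a minor bump resets patch
    if part == "patch":
        return f"v{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")


print(bump_version("v1.0.0", "minor"))  # v1.1.0
print(bump_version("v1.4.2", "major"))  # v2.0.0
```

The result can then be assigned to `pipeline.version` before calling `save_version`.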

Overview ℹ️

The VersionedPipeline class lets you:
- Save pipeline versions with metadata
- Compare versions to see what changed
- Track pipeline evolution over time
- Maintain a history of pipeline configurations

Basic Usage 🚀

from flowyml import VersionedPipeline, step, context

# Create a versioned pipeline with context
ctx = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training_pipeline", context=ctx, version="v1.0.0")

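# Define steps (load_from_source and train_model stand in for your own code)
@step(outputs=["data"])
def load_data():
    return load_from_source()

# learning_rate and epochs are taken from the pipeline context (ctx)
@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    return train_model(data, learning_rate, epochs)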

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})

Using Pipeline with Version Parameter

You can also create a versioned pipeline directly using the Pipeline class with a version parameter:

from flowyml import Pipeline, step, context

ctx = context(learning_rate=0.001, epochs=10)

# Automatically creates a VersionedPipeline when version is provided
pipeline = Pipeline("training_pipeline", context=ctx, version="v1.0.1")

@step(outputs=["data"])
def load_data():
    return load_from_source()

@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    return train_model(data, learning_rate, epochs)

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})

Comparing Versions 🔍

# Make changes to the pipeline
@step(inputs=["model"], outputs=["metrics"])
def evaluate(model):
    return evaluate_model(model)

pipeline.add_step(evaluate)
pipeline.version = "v1.1.0"
pipeline.save_version(metadata={"description": "Added evaluation step"})

# Compare versions
diff = pipeline.compare_with("v1.0.0")
print(diff)
# Output:
# {
#   'current_version': 'v1.1.0',
#   'compared_to': 'v1.0.0',
#   'added_steps': ['evaluate'],
#   'removed_steps': [],
#   'modified_steps': [],
#   'step_order_changed': True,
#   'context_changes': {...}
# }

# Display comparison in readable format
pipeline.display_comparison("v1.0.0")
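The step fields in the comparison output above are easier to reason about with a small, self-contained sketch of how they could be derived. This is not flowyml's implementation. It assumes each saved version is a plain dict with `steps` (ordered step names) and `step_hashes` (step name to content hash), mirroring the `PipelineVersion` attributes in the API reference. `diff_steps` is a hypothetical name, and the sketch covers only added, removed, and modified steps.

```python
def diff_steps(old: dict, new: dict) -> dict:
    """Compare two version records shaped like {"steps": [...], "step_hashes": {...}}.

    Hypothetical helper: covers only added, removed, and modified steps.
    """
    old_names, new_names = set(old["steps"]), set(new["steps"])
    return {
        # Keep each list in pipeline order for readable output
        "added_steps": [s for s in new["steps"] if s not in old_names],
        "removed_steps": [s for s in old["steps"] if s not in new_names],
        # Present in both versions, but the content hash differs
        "modified_steps": [
            s for s in new["steps"]
            if s in old_names and old["step_hashes"][s] != new["step_hashes"][s]
        ],
    }


# Toy records; the hash strings are made up for the example
v1 = {"steps": ["load_data", "train"],
      "step_hashes": {"load_data": "a1", "train": "b1"}}
v2 = {"steps": ["load_data", "train", "evaluate"],
      "step_hashes": {"load_data": "a1", "train": "b2", "evaluate": "c1"}}
print(diff_steps(v1, v2))
# {'added_steps': ['evaluate'], 'removed_steps': [], 'modified_steps': ['train']}
```

Matching steps by name and then comparing hashes is what separates a modified step from a removed one followed by an added one with the same name.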

Listing Versions 📋

# Get all saved versions
versions = pipeline.list_versions()
print(versions)  # ['v1.0.0', 'v1.1.0']

# Get specific version details
version_info = pipeline.get_version("v1.0.0")
print(version_info.steps)
print(version_info.created_at)
print(version_info.metadata)
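To pick the newest entry from a list of version strings, plain string sorting is not enough. It breaks once a component reaches two digits: `sorted(["v1.9.0", "v1.10.0"])` puts `"v1.10.0"` first. The minimal sketch below uses a numeric sort key instead. It assumes every version has the `vMAJOR.MINOR.PATCH` form used on this page, and `version_key` is a hypothetical helper.

```python
def version_key(version: str) -> tuple[int, int, int]:
    """Turn "v1.10.0" into (1, 10, 0) so versions compare numerically."""
    major, minor, patch = version.lstrip("v").split(".")
    return int(major), int(minor), int(patch)


versions = ["v1.0.0", "v1.10.0", "v1.9.0", "v1.1.0"]
print(sorted(versions))                   # ['v1.0.0', 'v1.1.0', 'v1.10.0', 'v1.9.0']
print(sorted(versions, key=version_key))  # ['v1.0.0', 'v1.1.0', 'v1.9.0', 'v1.10.0']
print(max(versions, key=version_key))     # v1.10.0
```

The same key works for the output of `pipeline.list_versions()`, whatever order that method returns.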

Version Storage 💾

Versions are stored as JSON files in .flowyml/versions/{pipeline_name}/ by default. You can customize the storage location:

pipeline = VersionedPipeline(
    "my_pipeline",
    versions_dir="/custom/path/versions"
)
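The on-disk layout is easy to picture with a small standalone sketch. It writes one JSON file per version under `{versions_dir}/{pipeline_name}/`, following the default layout described above. The `{version}.json` file name and the record fields are assumptions for illustration, not flowyml's documented format. The sketch also uses a temporary directory, so it never touches a real `.flowyml` folder.

```python
import json
import tempfile
from pathlib import Path


def save_version_record(versions_dir: Path, pipeline_name: str, record: dict) -> Path:
    """Write one version record to {versions_dir}/{pipeline_name}/{version}.json."""
    pipeline_dir = versions_dir / pipeline_name
    pipeline_dir.mkdir(parents=True, exist_ok=True)
    path = pipeline_dir / f"{record['version']}.json"
    path.write_text(json.dumps(record, indent=2))
    return path


def list_version_records(versions_dir: Path, pipeline_name: str) -> list[str]:
    """Return the saved version names for one pipeline, sorted alphabetically."""
    pipeline_dir = versions_dir / pipeline_name
    if not pipeline_dir.is_dir():
        return []
    # Path("v1.0.0.json").stem == "v1.0.0"
    return sorted(p.stem for p in pipeline_dir.glob("*.json"))


# Use a throwaway directory instead of a real .flowyml/versions folder
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    save_version_record(root, "my_pipeline", {"version": "v1.0.0", "steps": ["load_data"]})
    save_version_record(root, "my_pipeline", {"version": "v1.1.0", "steps": ["load_data", "train"]})
    print(list_version_records(root, "my_pipeline"))  # ['v1.0.0', 'v1.1.0']
```

One file per version keeps each save independent and makes the history easy to inspect or commit alongside code.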

Best Practices 💡

1. Use Semantic Versioning

pipeline.version = "v1.0.0"  # Major.Minor.Patch

2. Add Descriptive Metadata

pipeline.save_version(metadata={
    "description": "Added data validation step",
    "author": "data-team",
    "impact": "improved data quality",
    "breaking_changes": False
})

3. Compare Before Deploying

# Always compare with production version before deploying
if pipeline.version != "v1.0.0":  # production version
    diff = pipeline.compare_with("v1.0.0")
    if diff['removed_steps'] or diff['modified_steps']:
        print("⚠️ Breaking changes detected!")
        pipeline.display_comparison("v1.0.0")

Advanced Features ⚡

Hash-Based Change Detection

The versioning system uses content hashing to detect changes in step implementations:

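# Same step name, different implementation
@step(outputs=["data"])
def load_data():
    # Modified implementation
    return load_from_new_source()  # Changed!

# Rebuild the pipeline with the modified step and save it as a new version
pipeline = VersionedPipeline("training_pipeline", context=ctx, version="v1.2.0")
pipeline.add_step(load_data)
pipeline.add_step(train)
pipeline.save_version()

# load_data's content hash differs from the saved one, so it is reported as modified
diff = pipeline.compare_with("v1.0.0")
print(diff["modified_steps"])  # should include 'load_data'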
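Content hashing can tell two implementations of the same step apart even when their names match. The sketch below shows one way to do it, and it is not flowyml's algorithm. The hypothetical `step_hash` helper hashes a function's compiled bytecode, constants, and referenced names with SHA-256, so it works even where the source file is unavailable. A real implementation might hash the source text instead.

```python
import hashlib
from typing import Callable


def step_hash(fn: Callable) -> str:
    """Fingerprint a plain Python function (hypothetical helper, not flowyml's).

    Hashes the compiled bytecode, the constants (string/number literals), and
    the global or attribute names the function refers to. Good enough for
    simple functions; nested functions or lambdas inside `fn` would make the
    constants' repr include memory addresses.
    """
    code = fn.__code__
    digest = hashlib.sha256()
    digest.update(code.co_code)
    digest.update(repr(code.co_consts).encode())
    digest.update(repr(code.co_names).encode())
    return digest.hexdigest()[:12]


# read_source is undefined on purpose: these functions are hashed, never called
def load_data_v1():
    return read_source("old_source")


def load_data_v1_again():
    return read_source("old_source")


def load_data_v2():
    return read_source("new_source")


print(step_hash(load_data_v1) == step_hash(load_data_v1_again))  # True
print(step_hash(load_data_v1) == step_hash(load_data_v2))        # False
```

The function name is deliberately left out of the hash, so only the body matters. Storing one such fingerprint per step name is what makes "same name, different implementation" detectable.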

Context Parameter Tracking

Changes to context parameters are automatically tracked. You can pass context when creating the pipeline:

from flowyml import VersionedPipeline, context

ctx1 = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training", context=ctx1, version="v1.0.0")
pipeline.save_version()

# Change context
ctx2 = context(learning_rate=0.01, epochs=20)  # Changed!
pipeline = VersionedPipeline("training", context=ctx2, version="v1.1.0")
pipeline.save_version()

diff = pipeline.compare_with("v1.0.0")
# Will show context_changes with added, removed, and modified parameters
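Context changes fall into three groups: added, removed, and modified parameters. The minimal sketch below applies that classification to two plain parameter dicts. `diff_context` and its output shape are assumptions for illustration, not flowyml's exact `context_changes` format, and `batch_size` is a made-up parameter.

```python
def diff_context(old: dict, new: dict) -> dict:
    """Classify context parameters as added, removed, or modified (hypothetical helper)."""
    return {
        "added": {k: new[k] for k in sorted(new.keys() - old.keys())},
        "removed": {k: old[k] for k in sorted(old.keys() - new.keys())},
        # Modified entries keep both values as (old, new)
        "modified": {
            k: (old[k], new[k])
            for k in sorted(old.keys() & new.keys())
            if old[k] != new[k]
        },
    }


v1_params = {"learning_rate": 0.001, "epochs": 10}
v2_params = {"learning_rate": 0.01, "epochs": 20, "batch_size": 32}
changes = diff_context(v1_params, v2_params)
print(changes["added"])     # {'batch_size': 32}
print(changes["modified"])  # {'epochs': (10, 20), 'learning_rate': (0.001, 0.01)}
```

Sorting the keys keeps the output stable from run to run, which helps when diffs are logged or compared in CI.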

Integration with CI/CD 🚀

# In your CI pipeline
from flowyml import VersionedPipeline

def verify_version_changes(pipeline: VersionedPipeline):
    # get_production_version() and run_integration_tests() are your own helpers
    prod_version = get_production_version()

    # Compare the candidate pipeline with the production version
    diff = pipeline.compare_with(prod_version)

    # Enforce policies
    if diff['removed_steps']:
        raise ValueError("Cannot remove steps in minor version update")

    if diff['modified_steps']:
        # Require integration tests
        run_integration_tests()

    return diff
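CI policies like these can also be tied to the version number itself, so that removing steps is only allowed in a major release. The sketch below is a hypothetical, standalone check. It takes a diff dict with the `added_steps`/`removed_steps`/`modified_steps` keys from the comparison output, plus two `vMAJOR.MINOR.PATCH` strings. The specific rules are examples, not flowyml defaults.

```python
def release_violations(diff: dict, prod_version: str, new_version: str) -> list[str]:
    """Return policy violations for a release; an empty list means it may ship.

    Hypothetical policy: steps may only be removed in a major release, and
    adding steps needs at least a minor release.
    """
    def parse(v: str) -> tuple[int, ...]:
        # "v1.2.3" -> (1, 2, 3); assumes the vMAJOR.MINOR.PATCH form
        return tuple(int(part) for part in v.lstrip("v").split("."))

    old, new = parse(prod_version), parse(new_version)
    problems = []
    if new <= old:
        problems.append(f"{new_version} is not newer than {prod_version}")
    if diff.get("removed_steps") and new[0] <= old[0]:
        problems.append("removing steps requires a major version bump")
    if diff.get("added_steps") and new[:2] <= old[:2]:
        problems.append("adding steps requires at least a minor version bump")
    return problems


diff = {"added_steps": [], "removed_steps": ["validate"], "modified_steps": []}
print(release_violations(diff, "v1.0.0", "v1.1.0"))
# ['removing steps requires a major version bump']
print(release_violations(diff, "v1.0.0", "v2.0.0"))  # []
```

In a CI job, a non-empty result would typically fail the build, for example by raising with the joined messages.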

API Reference 📚

VersionedPipeline

Constructor:

VersionedPipeline(
    name: str,
    version: str = "v0.1.0",
    versions_dir: str = ".flowyml/versions",
    context: Context | None = None,
    **kwargs  # Additional Pipeline parameters (project_name, executor, etc.)
)

Methods:
- save_version(metadata: Optional[Dict] = None): Save the current version
- list_versions() -> List[str]: List all saved versions
- get_version(version: str) -> PipelineVersion: Get the details of a version
- compare_with(other_version: str) -> Dict: Compare with another version
- display_comparison(other_version: str): Pretty-print the comparison
- run(*args, **kwargs): Run the pipeline (inherited from Pipeline)

PipelineVersion

Attributes:
- version (str): Version identifier
- pipeline_name (str): Pipeline name
- created_at (str): Creation timestamp
- steps (List[str]): List of step names
- step_hashes (Dict[str, str]): Step content hashes
- context_params (Dict): Context parameters
- metadata (Dict): Custom metadata
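Tooling that reads saved versions outside flowyml, such as a CI script, may want a typed record to work with. The sketch below is a small dataclass mirroring the attributes above. It is a hypothetical stand-in, not the library's `PipelineVersion` class. The field names follow the attribute list, and the JSON round-trip assumes a stored record uses the same keys.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass
class VersionRecord:
    """Hypothetical mirror of the PipelineVersion attributes listed above."""
    version: str
    pipeline_name: str
    created_at: str
    steps: list[str] = field(default_factory=list)
    step_hashes: dict[str, str] = field(default_factory=dict)
    context_params: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)

    def to_json(self) -> str:
        # asdict() recursively converts the dataclass into plain dicts/lists
        return json.dumps(asdict(self), indent=2)

    @classmethod
    def from_json(cls, text: str) -> "VersionRecord":
        return cls(**json.loads(text))


record = VersionRecord(
    version="v1.0.0",
    pipeline_name="training_pipeline",
    created_at=datetime.now(timezone.utc).isoformat(),
    steps=["load_data", "train"],
    metadata={"description": "Initial training pipeline"},
)
restored = VersionRecord.from_json(record.to_json())
print(restored == record)  # True
```

Using `default_factory` for the list and dict fields avoids sharing one mutable default across instances.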