Pipeline Versioning 🔄

Track changes to your pipelines like you track code with Git.

What you'll learn

How to version your pipelines and compare changes over time. "What changed between v1 and v2?" is a question you'll ask every week; versioning answers it instantly.

Why Versioning Matters

Without versioning:

- No audit trail: "Who changed the data loader last month?"
- Risky deployments: You don't know what changed
- Lost history: You can't roll back to a working version

With flowyml versioning:

- Full history: See exactly what changed and when
- Safe releases: Compare new version with production before deploying
- Easy rollback: Restore a previous version instantly

When to Version

| Scenario | Action |
| --- | --- |
| Before deploying to production | Always save a version |
| After adding/removing steps | Save with descriptive metadata |
| Major logic changes | Bump version number (v1.0 → v2.0) |
| Daily experiments | Not necessary; only version what you'll deploy |

Overview ℹ️

The VersionedPipeline class lets you:

- Save pipeline versions with metadata
- Compare versions to see what changed
- Track pipeline evolution over time
- Maintain a history of pipeline configurations

Basic Usage 🚀

from flowyml import VersionedPipeline, step, context

# Create a versioned pipeline with context
ctx = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training_pipeline", context=ctx, version="v1.0.0")

# Add steps (load_from_source and train_model are placeholders for your own functions)
@step(outputs=["data"])
def load_data():
    return load_from_source()

@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    return train_model(data, learning_rate, epochs)

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})

Using Pipeline with Version Parameter

You can also create a versioned pipeline directly using the Pipeline class with a version parameter:

from flowyml import Pipeline, step, context

ctx = context(learning_rate=0.001, epochs=10)

# Automatically creates a VersionedPipeline when version is provided
pipeline = Pipeline("training_pipeline", context=ctx, version="v1.0.1")

@step(outputs=["data"])
def load_data():
    return load_from_source()

@step(inputs=["data"], outputs=["model"])
def train(data, learning_rate: float, epochs: int):
    return train_model(data, learning_rate, epochs)

pipeline.add_step(load_data)
pipeline.add_step(train)

# Save the version
pipeline.save_version(metadata={"description": "Initial training pipeline"})

Comparing Versions 🔍

# Make changes to the pipeline
@step(inputs=["model"], outputs=["metrics"])
def evaluate(model):
    return evaluate_model(model)

pipeline.add_step(evaluate)
pipeline.version = "v1.1.0"
pipeline.save_version(metadata={"description": "Added evaluation step"})

# Compare versions
diff = pipeline.compare_with("v1.0.0")
print(diff)
# Output:
# {
#   'current_version': 'v1.1.0',
#   'compared_to': 'v1.0.0',
#   'added_steps': ['evaluate'],
#   'removed_steps': [],
#   'modified_steps': [],
#   'step_order_changed': True,
#   'context_changes': {...}
# }

# Display comparison in readable format
pipeline.display_comparison("v1.0.0")

Listing Versions 📋

# Get all saved versions
versions = pipeline.list_versions()
print(versions)  # ['v1.0.0', 'v1.1.0']

# Get specific version details
version_info = pipeline.get_version("v1.0.0")
print(version_info.steps)
print(version_info.created_at)
print(version_info.metadata)

Version Storage 💾

Versions are stored as JSON files in .flowyml/versions/{pipeline_name}/ by default. You can customize the storage location:

pipeline = VersionedPipeline(
    "my_pipeline",
    versions_dir="/custom/path/versions"
)
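
Because versions are plain JSON files, they can be inspected without importing flowyml. The following is a minimal sketch, assuming one JSON file per saved version inside the pipeline's directory; the file naming and the JSON keys are assumptions, and `read_saved_versions` is a hypothetical helper, not part of the flowyml API.

```python
import json
from pathlib import Path


def read_saved_versions(versions_dir: str, pipeline_name: str) -> dict:
    """Load every *.json file under <versions_dir>/<pipeline_name>/.

    Hypothetical helper: assumes one JSON file per saved version.
    Returns a mapping of file stem -> parsed JSON content.
    """
    pipeline_dir = Path(versions_dir) / pipeline_name
    if not pipeline_dir.is_dir():
        # Nothing has been saved for this pipeline yet
        return {}
    return {
        path.stem: json.loads(path.read_text(encoding="utf-8"))
        for path in sorted(pipeline_dir.glob("*.json"))
    }
```

For example, `read_saved_versions(".flowyml/versions", "my_pipeline")` would return an empty dict until at least one version has been saved.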

Best Practices 💡

1. Use Semantic Versioning

pipeline.version = "v1.0.0"  # Major.Minor.Patch
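
To keep version strings consistent, it can help to compute the next version rather than type it by hand. This is a minimal sketch, assuming versions always follow the `vMAJOR.MINOR.PATCH` form used in this guide; `parse_version` and `bump_version` are hypothetical helpers, not flowyml functions.

```python
import re
from typing import Tuple

# Matches strings such as "v1.0.0" (assumed format; no pre-release suffixes)
_VERSION_RE = re.compile(r"^v(\d+)\.(\d+)\.(\d+)$")


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split 'vMAJOR.MINOR.PATCH' into a tuple of integers."""
    match = _VERSION_RE.match(version)
    if match is None:
        raise ValueError(f"Expected 'vMAJOR.MINOR.PATCH', got {version!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def bump_version(version: str, part: str) -> str:
    """Return the next version, resetting the lower-order parts to zero."""
    major, minor, patch = parse_version(version)
    if part == "major":
        return f"v{major + 1}.0.0"
    if part == "minor":
        return f"v{major}.{minor + 1}.0"
    if part == "patch":
        return f"v{major}.{minor}.{patch + 1}"
    raise ValueError(f"Unknown part {part!r}; use 'major', 'minor', or 'patch'")
```

Parsing into integer tuples also gives a correct sort order: `sorted(versions, key=parse_version)` places `v1.10.0` after `v1.9.0`, which plain string sorting does not.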

2. Add Descriptive Metadata

pipeline.save_version(metadata={
    "description": "Added data validation step",
    "author": "data-team",
    "impact": "improved data quality",
    "breaking_changes": False
})

3. Compare Before Deploying

# Always compare with production version before deploying
if pipeline.version != "v1.0.0":  # production version
    diff = pipeline.compare_with("v1.0.0")
    if diff['removed_steps'] or diff['modified_steps']:
        print("⚠️ Breaking changes detected!")
        pipeline.display_comparison("v1.0.0")
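
The same check can be wrapped in a small reusable function that reports why a change counts as breaking. This is a sketch only: it assumes the diff is a dict with the `removed_steps` and `modified_steps` keys shown in the comparison output earlier, and `breaking_change_reasons` is a hypothetical helper name.

```python
from typing import Any, Dict, List


def breaking_change_reasons(diff: Dict[str, Any]) -> List[str]:
    """Return one human-readable reason per removed or modified step.

    An empty list means no breaking change was found under this policy.
    """
    reasons = []
    for step_name in diff.get("removed_steps", []):
        reasons.append(f"step removed: {step_name}")
    for step_name in diff.get("modified_steps", []):
        reasons.append(f"step modified: {step_name}")
    return reasons
```

Added steps are deliberately ignored here, mirroring the check above, which only flags removals and modifications.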

Advanced Features ⚡

Hash-Based Change Detection

The versioning system uses content hashing to detect changes in step implementations:

# Same step name, different implementation
@step(outputs=["data"])
def load_data():
    # Modified implementation
    return load_from_new_source()  # Changed!

pipeline.save_version()
# Will detect that load_data was modified
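
To illustrate the general idea of content hashing, here is a minimal sketch that hashes a step's source text and compares two name-to-hash mappings, similar in shape to the `step_hashes` attribute listed in the API reference. It is not flowyml's actual implementation; the normalization rules and both function names are assumptions.

```python
import hashlib
import textwrap
from typing import Dict, List


def hash_source(source: str) -> str:
    """Return a SHA-256 hex digest of dedented, stripped source text."""
    normalized = textwrap.dedent(source).strip()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def modified_steps(old: Dict[str, str], new: Dict[str, str]) -> List[str]:
    """Names present in both mappings whose hashes differ."""
    return sorted(
        name for name in old.keys() & new.keys() if old[name] != new[name]
    )


# Same step name, different body: the hashes differ
v1 = {"load_data": hash_source("def load_data():\n    return load_from_source()")}
v2 = {"load_data": hash_source("def load_data():\n    return load_from_new_source()")}
print(modified_steps(v1, v2))
```

For a live function, the source text could be obtained with `inspect.getsource`, which requires the function to be defined in a file rather than typed into an interactive session.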

Context Parameter Tracking

Changes to context parameters are automatically tracked. You can pass context when creating the pipeline:

from flowyml import VersionedPipeline, context

ctx1 = context(learning_rate=0.001, epochs=10)
pipeline = VersionedPipeline("training", context=ctx1, version="v1.0.0")
pipeline.save_version()

# Change context
ctx2 = context(learning_rate=0.01, epochs=20)  # Changed!
pipeline = VersionedPipeline("training", context=ctx2, version="v1.1.0")
pipeline.save_version()

diff = pipeline.compare_with("v1.0.0")
# Will show context_changes with added, removed, and modified parameters
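
The added/removed/modified breakdown can be pictured as a comparison of two parameter dictionaries. The sketch below shows one way to compute it; the exact structure flowyml returns in `context_changes` may differ, and `diff_params` is a hypothetical helper.

```python
from typing import Any, Dict


def diff_params(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Classify parameters as added, removed, or modified between two dicts."""
    added = {key: new[key] for key in new.keys() - old.keys()}
    removed = {key: old[key] for key in old.keys() - new.keys()}
    modified = {
        key: {"old": old[key], "new": new[key]}
        for key in old.keys() & new.keys()
        if old[key] != new[key]
    }
    return {"added": added, "removed": removed, "modified": modified}


changes = diff_params(
    {"learning_rate": 0.001, "epochs": 10},
    {"learning_rate": 0.01, "epochs": 20},
)
print(changes["modified"])
```

With the two contexts from the example above, both `learning_rate` and `epochs` land under `modified`, while `added` and `removed` stay empty.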

Integration with CI/CD 🚀

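# In your CI pipeline
# get_production_version() and run_integration_tests() stand in for your own helpers
from flowyml import VersionedPipeline

def verify_version_changes():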
    pipeline = VersionedPipeline.load("production_pipeline")

    # Get current production version
    prod_version = get_production_version()

    # Compare
    diff = pipeline.compare_with(prod_version)

    # Enforce policies
    if diff['removed_steps']:
        raise ValueError("Cannot remove steps in minor version update")

    if diff['modified_steps']:
        # Require integration tests
        run_integration_tests()

    return diff
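
The policy above rejects step removal outright. A version-aware variant might allow removals only when the major version increases. The sketch below works on the diff dict alone, assuming it carries the `current_version` and `compared_to` keys shown earlier and that versions follow `vMAJOR.MINOR.PATCH`; both function names are hypothetical.

```python
from typing import Any, Dict, List


def major_of(version: str) -> int:
    """Extract MAJOR from a 'vMAJOR.MINOR.PATCH' string."""
    return int(version.lstrip("v").split(".")[0])


def policy_violations(diff: Dict[str, Any]) -> List[str]:
    """Flag removed steps unless the major version was bumped."""
    is_major_bump = major_of(diff["current_version"]) > major_of(diff["compared_to"])
    violations = []
    if diff.get("removed_steps") and not is_major_bump:
        removed = ", ".join(diff["removed_steps"])
        violations.append(f"steps removed without a major version bump: {removed}")
    return violations
```

A CI job could print the returned messages and exit with a non-zero status whenever the list is non-empty.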

API Reference 📚

VersionedPipeline

Constructor:

VersionedPipeline(
    name: str,
    version: str = "v0.1.0",
    versions_dir: str = ".flowyml/versions",
    context: Context | None = None,
    **kwargs  # Additional Pipeline parameters (project_name, executor, etc.)
)

Methods:

- save_version(metadata: Optional[Dict] = None) - Save current version
- list_versions() -> List[str] - List all saved versions
- get_version(version: str) -> PipelineVersion - Get version details
- compare_with(other_version: str) -> Dict - Compare with another version
- display_comparison(other_version: str) - Pretty print comparison
- run(*args, **kwargs) - Run the pipeline (inherited from Pipeline)

PipelineVersion

Attributes:

- version: str - Version identifier
- pipeline_name: str - Pipeline name
- created_at: str - Creation timestamp
- steps: List[str] - List of step names
- step_hashes: Dict[str, str] - Step content hashes
- context_params: Dict - Context parameters
- metadata: Dict - Custom metadata