
Caching ⚡

flowyml's intelligent caching system eliminates redundant work, saving you time and compute costs.

[!NOTE] What you'll learn: How to skip expensive steps that have already run

Key insight: The fastest code is the code you don't run. flowyml automatically detects when inputs and code haven't changed.

Why Caching Matters

Without caching:

- Wasted time: Re-running data loading (10 mins) just to fix a typo in plotting
- Wasted money: Retraining models (GPU hours) when only the evaluation metric changed
- Slow iteration: Feedback loops take hours instead of seconds

With flowyml caching:

- Instant feedback: Skip straight to the step you're working on
- Cost savings: Reduce cloud compute bills by 40-60%
- Resume capability: Crash in step 5? Fix it and resume instantly; steps 1-4 are cached

How Caching Works

flowyml calculates a Cache Key for every step before it runs. If a matching key exists, the step is skipped.

The key combines:

1. Code Hash: The source code of the function
2. Input Hash: The values of all input arguments
3. Configuration: Parameters injected from context
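The three ingredients above can be folded into a single digest. The following is a minimal sketch of the idea, not flowyml's actual implementation: the function name `make_cache_key`, the JSON serialization, and the choice of SHA-256 are assumptions made for illustration. A real implementation would typically obtain the source text with something like `inspect.getsource`.

```python
import hashlib
import json


def _digest(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def make_cache_key(source: str, inputs: dict, config: dict) -> str:
    """Combine code, inputs, and configuration into one hex digest (illustrative only)."""
    code_hash = _digest(source)
    # sort_keys makes the serialization independent of dict insertion order;
    # default=repr is a fallback for values JSON cannot encode
    input_hash = _digest(json.dumps(inputs, sort_keys=True, default=repr))
    config_hash = _digest(json.dumps(config, sort_keys=True, default=repr))
    return _digest(f"{code_hash}:{input_hash}:{config_hash}")


source = "def double(data):\n    return [x * 2 for x in data]\n"
key_a = make_cache_key(source, {"data": [1, 2, 3]}, {"seed": 42})
key_b = make_cache_key(source, {"data": [1, 2, 3]}, {"seed": 42})
key_c = make_cache_key(source, {"data": [1, 2, 4]}, {"seed": 42})

print(key_a == key_b)  # True: same code, inputs, and config
print(key_a == key_c)  # False: one input value changed
```

Any change to the source text, an input value, or a configuration parameter produces a different key, which is what makes the lookup safe.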

graph TD
    A[Start Step] --> B{Cache Key Exists?}
    B -- Yes --> C[Load Output from Disk]
    B -- No --> D[Execute Step]
    D --> E[Save Output to Disk]
    C --> F[Return Result]
    E --> F
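The flow in the diagram can be sketched in a few lines of plain Python. This is an illustration under stated assumptions rather than flowyml's internals: `run_with_cache` is a hypothetical helper, and storing each output as `<key>.pkl` simply mirrors the idea of saving pickled outputs to disk.

```python
import pickle
import tempfile
from pathlib import Path


def run_with_cache(cache_dir: Path, key: str, func, *args):
    """Return (result, was_cached) following the check / load / execute / save flow."""
    path = cache_dir / f"{key}.pkl"
    if path.exists():
        # Cache hit: load the stored output instead of executing the step
        with path.open("rb") as fh:
            return pickle.load(fh), True
    # Cache miss: execute the step, then persist its output for next time
    result = func(*args)
    with path.open("wb") as fh:
        pickle.dump(result, fh)
    return result, False


calls = []


def double(data):
    calls.append(data)  # record each real execution
    return [x * 2 for x in data]


with tempfile.TemporaryDirectory() as tmp:
    first = run_with_cache(Path(tmp), "demo-key", double, [1, 2, 3])
    second = run_with_cache(Path(tmp), "demo-key", double, [1, 2, 3])

print(first)       # ([2, 4, 6], False)
print(second)      # ([2, 4, 6], True)
print(len(calls))  # 1
```

The second call never reaches `double`, which is the behavior that makes a cached step effectively free.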

Configuration ⚙️

Pipeline-Level Caching

Enable or disable caching for an entire pipeline:

from flowyml import Pipeline

# Enable caching (default)
pipeline = Pipeline("my_pipeline", enable_cache=True)

# Disable caching
pipeline = Pipeline("my_pipeline", enable_cache=False)

# Custom cache directory
pipeline = Pipeline("my_pipeline", cache_dir="./custom_cache")

Step-Level Caching

Control caching for individual steps:

from flowyml import step

# Use default caching (code_hash)
@step(outputs=["result"])
def cached_step(data):
    return expensive_computation(data)

# Disable caching for this step
@step(cache=False, outputs=["fresh_data"])
def always_run():
    return fetch_latest_data()

Decision Guide: Caching Strategies

| Strategy | Behavior | Use When |
| --- | --- | --- |
| code_hash (Default) | Invalidate if code OR inputs change | Development: You're tweaking logic and need safety. |
| input_hash | Invalidate ONLY if inputs change | Production/Training: Code is stable, but you're tuning hyperparameters. |
| cache=False | Always run | Side Effects: Sending emails, writing to DB, fetching live data. |

1. cache="code_hash" (Safest)

The default. If you change anything in the function (even a comment), it re-runs.

@step(cache="code_hash")
def process(data):
    # Changing this comment triggers a re-run!
    return [x * 2 for x in data]

2. cache="input_hash" (Fastest)

Ignores code changes. Only re-runs if the data passed to it changes.

@step(cache="input_hash")
def train_model(dataset, epochs):
    # You can refactor this code without triggering a 4-hour training run
    # It only re-runs if 'dataset' or 'epochs' change
    return model.fit(dataset, epochs=epochs)

[!WARNING] Use input_hash carefully! If you change the logic (e.g., fix a bug) but inputs stay the same, flowyml won't know to re-run it. You'll get the old, buggy result.
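The difference between the two strategies, and the reason for the warning, can be shown with a small standalone sketch. It assumes keys are plain SHA-256 digests and that `strategy_key` is a hypothetical stand-in for whatever flowyml does internally.

```python
import hashlib


def digest(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def strategy_key(strategy: str, source: str, inputs: dict) -> str:
    """Illustrative key: 'code_hash' covers code and inputs, 'input_hash' covers inputs only."""
    input_part = digest(repr(sorted(inputs.items())))
    if strategy == "input_hash":
        return input_part
    return digest(digest(source) + input_part)


buggy = "def mean(xs):\n    return sum(xs) / (len(xs) + 1)\n"  # off-by-one bug
fixed = "def mean(xs):\n    return sum(xs) / len(xs)\n"
inputs = {"xs": (1, 2, 3)}

code_hash_changed = (
    strategy_key("code_hash", buggy, inputs) != strategy_key("code_hash", fixed, inputs)
)
input_hash_changed = (
    strategy_key("input_hash", buggy, inputs) != strategy_key("input_hash", fixed, inputs)
)

print(code_hash_changed)   # True: the fix invalidates the cache
print(input_hash_changed)  # False: the stale, buggy result would be reused
```

With `input_hash`, the bug fix leaves the key untouched, so the cached output of the buggy version keeps being served until the cache is cleared.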

3. cache=False (Side Effects)

For steps that must always run.

@step(cache=False)
def send_slack_alert(metrics):
    # Always send the alert, even if metrics haven't changed
    slack.send(f"Accuracy: {metrics['accuracy']}")

Practical Examples

Example 1: Data Pipeline with Mixed Caching

import pandas as pd

from flowyml import Pipeline, step

@step(cache="code_hash", outputs=["raw"])
def load_data(file_path: str):
    """Cache invalidated if code OR file_path changes."""
    return pd.read_csv(file_path)

@step(inputs=["raw"], cache="input_hash", outputs=["clean"])
def clean_data(raw):
    """Cache invalidated only if raw data changes."""
    return raw.dropna().reset_index(drop=True)

@step(inputs=["clean"], cache=False, outputs=["result"])
def upload_to_db(clean):
    """Never cached - always uploads."""
    database.save(clean)
    return {"uploaded": len(clean)}

pipeline = Pipeline("etl", enable_cache=True)
pipeline.add_step(load_data)
pipeline.add_step(clean_data)
pipeline.add_step(upload_to_db)

Example 2: Machine Learning Training

@step(cache="code_hash", outputs=["dataset"])
def prepare_dataset(data_path: str, test_split: float):
    """Recompute if code or parameters change."""
    return load_and_split(data_path, test_split)

@step(inputs=["dataset"], cache="input_hash", outputs=["model"])
def train_model(dataset, learning_rate: float, epochs: int):
    """Expensive training - cache based on inputs only."""
    # Training logic doesn't change often, but we try different hyperparameters
    model = Model()
    model.fit(dataset, lr=learning_rate, epochs=epochs)
    return model

@step(inputs=["model"], cache=False, outputs=["metrics"])
def evaluate_fresh(model):
    """Always evaluate to get latest metrics."""
    # test_data is assumed to be defined elsewhere (e.g. a held-out split)
    return model.evaluate(test_data)

Example 3: Debugging

# During development
pipeline = Pipeline("debug_pipeline", enable_cache=False)
# All steps run fresh every time

# After development
pipeline = Pipeline("prod_pipeline", enable_cache=True)
# Benefit from caching

Cache Performance 📊

Cache Hits Save Time

import time

@step(cache="code_hash")
def expensive_step():
    time.sleep(5)  # Simulate expensive computation
    return "result"

pipeline = Pipeline("perf_test", enable_cache=True)
pipeline.add_step(expensive_step)

# First run: 5+ seconds
result1 = pipeline.run()  # ⏱️ 5.2s

# Second run: milliseconds!
result2 = pipeline.run()  # ⏱️ 0.05s ⚡ (cached)

Monitoring Cache Usage

Check which steps were cached:

result = pipeline.run()

for step_name, step_result in result.step_results.items():
    status = "CACHED" if step_result.cached else "EXECUTED"
    print(f"{step_name}: {status} ({step_result.duration_seconds:.2f}s)")

# Output:
# load_data: CACHED (0.01s)
# process: EXECUTED (2.34s)
# train: CACHED (0.02s)

Cache Storage 💾

Default Location

By default, cache is stored in .flowyml/cache:

.flowyml/
├── cache/
│   ├── cache.db       # SQLite database tracking cache entries
│   └── objects/       # Pickled cached objects
│       ├── abc123.pkl
│       └── def456.pkl

Custom Cache Directory

pipeline = Pipeline(
    "my_pipeline",
    enable_cache=True,
    cache_dir="/path/to/custom/cache"
)
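Whichever directory is used, it can be useful to check how much disk space the cache occupies. The sketch below relies only on the layout described earlier (pickled outputs under `objects/`); `cache_usage` is a hypothetical helper written for this example, not a flowyml function.

```python
import tempfile
from pathlib import Path


def cache_usage(cache_dir: Path) -> tuple[int, int]:
    """Return (number of pickled objects, total size in bytes) under cache_dir/objects."""
    files = list((cache_dir / "objects").glob("*.pkl"))
    return len(files), sum(f.stat().st_size for f in files)


# Build a throwaway directory that mimics the layout, so the example runs anywhere
with tempfile.TemporaryDirectory() as tmp:
    objects = Path(tmp) / "objects"
    objects.mkdir()
    (objects / "abc123.pkl").write_bytes(b"x" * 100)
    (objects / "def456.pkl").write_bytes(b"y" * 50)
    count, size = cache_usage(Path(tmp))

print(count, size)  # 2 150
```

Against a real project you would call `cache_usage(Path(".flowyml/cache"))` or pass your custom `cache_dir`.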

Cache Entry Structure

Each cache entry contains:

- Key: Hash of code + inputs + step name
- Value: Serialized step output
- Metadata: Timestamp, step name, code hash
- TTL: Optional time-to-live
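One way to picture such an entry is as a small record type. The dataclass below is only a sketch: the class name `CacheEntry`, its field names, and the `is_expired` method are assumptions for illustration and do not describe flowyml's real schema.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class CacheEntry:
    """Illustrative shape of a cache entry; field names are assumptions."""
    key: str
    value: bytes  # serialized step output
    step_name: str
    code_hash: str
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    ttl: Optional[timedelta] = None  # None means the entry never expires

    def is_expired(self, now: Optional[datetime] = None) -> bool:
        if self.ttl is None:
            return False
        now = now or datetime.now(timezone.utc)
        return now - self.created_at > self.ttl


created = datetime(2024, 1, 1, tzinfo=timezone.utc)
entry = CacheEntry(
    "abc123", b"...", "load_data", "9f2c", created_at=created, ttl=timedelta(hours=24)
)

print(entry.is_expired(now=created + timedelta(hours=1)))   # False
print(entry.is_expired(now=created + timedelta(hours=25)))  # True
```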

Managing the Cache 🧹

Clear Cache via Code

from flowyml.core.cache import CacheStore

cache = CacheStore(".flowyml/cache")

# Clear all cache
cache.clear()

# Clear specific step
cache.clear_step("my_step_name")

Cache Invalidation Patterns

Manual Invalidation

# Before running, clear cache for specific step
cache.clear_step("data_loading")

# Then run pipeline
result = pipeline.run()

Automatic Invalidation

Cache is automatically invalidated when:

- Function code changes (for code_hash strategy)
- Input values change
- Dependencies are updated

Time-Based Invalidation

Implement custom TTL:

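from datetime import timedelta

# get_cache_age() is a placeholder for your own helper (not a flowyml API).
# It should return how old the cached result of a step is, or None if absent.
cache_age = get_cache_age("load_data")

# A cached step is skipped entirely, so the age check must happen before the run
if cache_age is not None and cache_age > timedelta(hours=24):
    cache.clear_step("load_data")  # Force re-execution on the next run

result = pipeline.run()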

Debugging Cache Issues 🔧

Verify Cache is Working

# Run with debug output
result = pipeline.run(debug=True)

# Check cache status for each step
for step_name, step_result in result.step_results.items():
    if step_result.cached:
        print(f"✓ {step_name} was cached")
    else:
        print(f"⚠ {step_name} was executed (not cached)")

Common Cache Issues

Issue: Cache Not Hitting When Expected

Causes:

- Input values changed slightly (floating point precision)
- Code changed (even whitespace with code_hash)
- Cache was cleared
- Different function object (re-imported module)

Solutions:

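# Use input_hash if code changes shouldn't matter
@step(cache="input_hash")
def stable_step(data):
    ...

# Round values in the step that produces them. The cache key is computed from
# the inputs before the step body runs, so rounding inside the consumer is too late.
@step(outputs=["value"])
def produce_value(raw: float):
    return round(raw, 6)  # Normalize precision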
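To see why a tiny floating-point difference causes a miss, and why rounding restores the hit, consider this standalone sketch. It assumes inputs are hashed from their textual representation; `input_digest` is a hypothetical helper, not a flowyml function.

```python
import hashlib


def input_digest(value: float) -> str:
    # repr() gives the shortest string that round-trips the float exactly
    return hashlib.sha256(repr(value).encode("utf-8")).hexdigest()


a = 0.1 + 0.2  # 0.30000000000000004
b = 0.3

same_value = a == b
same_raw_digest = input_digest(a) == input_digest(b)
same_rounded_digest = input_digest(round(a, 6)) == input_digest(round(b, 6))

print(same_value)           # False
print(same_raw_digest)      # False: the cache would miss
print(same_rounded_digest)  # True: the cache would hit
```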

Issue: Stale Cache

Cause: Using input_hash but logic changed

Solution:

# Temporarily disable cache to force refresh
@step(cache=False)  # Remove after verified
def updated_logic(data):
    # New implementation
    ...

Or manually clear:

cache.clear_step("updated_logic")

When to Use Each Strategy

| Strategy | Development | Production | Stable Logic | Changing Logic | Side Effects |
| --- | --- | --- | --- | --- | --- |
| code_hash (default) | ✅ Best | ✅ Safe | ✅ Good | ✅ Best | ⚠️ Use with care |
| input_hash | ⚠️ Risky | ✅ Efficient | ✅ Best | ❌ Dangerous | ⚠️ Use with care |
| False | ✅ Debugging | ✅ Real-time | ❌ Wasteful | ✅ Fine | ✅ Required |

Best Practices 💡

1. Enable Caching in Development

# ✅ Good - iterates faster
pipeline = Pipeline("dev", enable_cache=True)

# Steps whose code and inputs are unchanged don't re-run

2. Be Careful with Side Effects

# ❌ Bad - side effect might be cached!
@step
def send_email(results):
    email_service.send(results)  # Might not run if cached!
    return "sent"

# ✅ Good - disable cache for side effects
@step(cache=False)
def send_email(results):
    email_service.send(results)  # Always runs
    return "sent"

3. Use Input Hash for Stable, Expensive Operations

# ✅ Good - training is expensive and logic is stable
@step(cache="input_hash")
def train_model(data, hyperparams):
    # 2 hours of training
    model = expensive_training(data, hyperparams)
    return model

4. Clear Cache Between Major Changes

# When refactoring or major updates
# (cache is the CacheStore instance created under "Clear Cache via Code")
cache.clear()

# Or via CLI
# flowyml cache clear

5. Monitor Cache Hit Rates

result = pipeline.run()

total_steps = len(result.step_results)
cached_steps = sum(1 for r in result.step_results.values() if r.cached)
hit_rate = cached_steps / total_steps * 100

print(f"Cache hit rate: {hit_rate:.1f}%")
# Target: > 50% in development, 20-40% in production

Advanced: Custom Cache Keys

For complete control, provide a custom cache key function:

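import hashlib

def my_cache_key(step, inputs, context):
    # Use a stable digest: the built-in hash() of a str changes between
    # interpreter runs, so keys built from it would not match on the next run
    digest = hashlib.sha256(repr(inputs).encode("utf-8")).hexdigest()
    return f"{step.name}:{digest}"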

@step(cache=my_cache_key)
def custom_cached_step(data):
    ...
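A typical reason to write a custom key is to ignore an argument that does not affect the result. The sketch below follows the `(step, inputs, context)` signature shown above and assumes that `inputs` is a dict mapping argument names to values and that the step object exposes a `.name` attribute; the `verbose` argument and `key_ignoring_verbose` are hypothetical.

```python
import hashlib
import json
from types import SimpleNamespace


def key_ignoring_verbose(step, inputs, context):
    """Custom key that ignores the 'verbose' flag, so toggling logging keeps the cache warm."""
    relevant = {name: value for name, value in inputs.items() if name != "verbose"}
    payload = json.dumps(relevant, sort_keys=True, default=repr)
    return f"{step.name}:{hashlib.sha256(payload.encode('utf-8')).hexdigest()}"


# Stand-in for the step object; only a .name attribute is assumed here
fake_step = SimpleNamespace(name="custom_cached_step")

quiet = key_ignoring_verbose(fake_step, {"data": [1, 2], "verbose": False}, context=None)
loud = key_ignoring_verbose(fake_step, {"data": [1, 2], "verbose": True}, context=None)
other = key_ignoring_verbose(fake_step, {"data": [1, 3], "verbose": True}, context=None)

print(quiet == loud)   # True: only the ignored flag differs
print(quiet == other)  # False: the data differs
```

Be conservative about what you exclude: leaving out an argument that does influence the output reintroduces the stale-cache problem described for `input_hash`.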

Next Steps 📚