Caching ⚡
flowyml's intelligent caching system eliminates redundant work, saving you time and compute costs.
[!NOTE] What you'll learn: How to skip expensive steps that have already run
Key insight: The fastest code is the code you don't run. flowyml automatically detects when inputs and code haven't changed.
Why Caching Matters
Without caching:

- Wasted time: Re-running data loading (10 minutes) just to fix a typo in plotting
- Wasted money: Retraining models (GPU hours) when only the evaluation metric changed
- Slow iteration: Feedback loops take hours instead of seconds
With flowyml caching:

- Instant feedback: Skip straight to the step you're working on
- Cost savings: Reduce cloud compute bills by 40-60%
- Resume capability: Crash in step 5? Fix it and resume instantly; steps 1-4 are cached
How Caching Works
flowyml calculates a Cache Key for every step before it runs. If a matching key already exists, the step is skipped and its stored output is loaded from the cache instead.
The key combines:

1. Code Hash: The source code of the function
2. Input Hash: The values of all input arguments
3. Configuration: Parameters injected from context

The lookup flow looks like this (a conceptual sketch of the key computation follows the diagram):
graph TD
A[Start Step] --> B{Cache Key Exists?}
B -- Yes --> C[Load Output from Disk]
B -- No --> D[Execute Step]
D --> E[Save Output to Disk]
C --> F[Return Result]
E --> F
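As a mental model, the key computation works roughly like the sketch below. This is an illustration built on the standard library (hashlib, inspect, pickle), not flowyml's actual implementation:

import hashlib
import inspect
import pickle

def conceptual_cache_key(func, inputs: dict) -> str:
    # Code Hash: digest of the function's source code
    code_hash = hashlib.sha256(inspect.getsource(func).encode()).hexdigest()
    # Input Hash: digest of the (pickled, order-independent) input values
    input_hash = hashlib.sha256(pickle.dumps(sorted(inputs.items()))).hexdigest()
    # Combine with the step name to form the lookup key
    return f"{func.__name__}:{code_hash[:12]}:{input_hash[:12]}"

If the combined key matches a previous run, the stored output is loaded; otherwise the step executes and its output is saved under that key.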
Configuration ⚙️
Pipeline-Level Caching
Enable or disable caching for an entire pipeline:
from flowyml import Pipeline
# Enable caching (default)
pipeline = Pipeline("my_pipeline", enable_cache=True)
# Disable caching
pipeline = Pipeline("my_pipeline", enable_cache=False)
# Custom cache directory
pipeline = Pipeline("my_pipeline", cache_dir="./custom_cache")
Step-Level Caching
Control caching for individual steps:
from flowyml import step
# Use default caching (code_hash)
@step(outputs=["result"])
def cached_step(data):
return expensive_computation(data)
# Disable caching for this step
@step(cache=False, outputs=["fresh_data"])
def always_run():
return fetch_latest_data()
Decision Guide: Caching Strategies
| Strategy | Behavior | Use When |
|---|---|---|
| code_hash (Default) | Invalidate if code OR inputs change | Development: You're tweaking logic and need safety. |
| input_hash | Invalidate ONLY if inputs change | Production/Training: Code is stable, but you're tuning hyperparameters. |
| cache=False | Always run | Side Effects: Sending emails, writing to DB, fetching live data. |
1. cache="code_hash" (Safest)
The default. If you change anything in the function (even a comment), it re-runs.
@step(cache="code_hash")
def process(data):
# Changing this comment triggers a re-run!
return [x * 2 for x in data]
2. cache="input_hash" (Fastest)
Ignores code changes. Only re-runs if the data passed to it changes.
@step(cache="input_hash")
def train_model(dataset, epochs):
# You can refactor this code without triggering a 4-hour training run
# It only re-runs if 'dataset' or 'epochs' change
return model.fit(dataset, epochs=epochs)
[!WARNING] Use input_hash carefully! If you change the logic (e.g., fix a bug) but inputs stay the same, flowyml won't know to re-run it. You'll get the old, buggy result.
3. cache=False (Side Effects)
For steps that must always run.
@step(cache=False)
def send_slack_alert(metrics):
# Always send the alert, even if metrics haven't changed
slack.send(f"Accuracy: {metrics['accuracy']}")
Practical Examples
Example 1: Data Pipeline with Mixed Caching
import pandas as pd

from flowyml import Pipeline, step
@step(cache="code_hash", outputs=["raw"])
def load_data(file_path: str):
"""Cache invalidated if code OR file_path changes."""
return pd.read_csv(file_path)
@step(inputs=["raw"], cache="input_hash", outputs=["clean"])
def clean_data(raw):
"""Cache invalidated only if raw data changes."""
return raw.dropna().reset_index(drop=True)
@step(inputs=["clean"], cache=False, outputs=["result"])
def upload_to_db(clean):
"""Never cached - always uploads."""
database.save(clean)
return {"uploaded": len(clean)}
pipeline = Pipeline("etl", enable_cache=True)
pipeline.add_step(load_data)
pipeline.add_step(clean_data)
pipeline.add_step(upload_to_db)
Example 2: Machine Learning Training
@step(cache="code_hash", outputs=["dataset"])
def prepare_dataset(data_path: str, test_split: float):
"""Recompute if code or parameters change."""
return load_and_split(data_path, test_split)
@step(inputs=["dataset"], cache="input_hash", outputs=["model"])
def train_model(dataset, learning_rate: float, epochs: int):
"""Expensive training - cache based on inputs only."""
# Training logic doesn't change often, but we try different hyperparameters
model = Model()
model.fit(dataset, lr=learning_rate, epochs=epochs)
return model
@step(inputs=["model"], cache=False, outputs=["metrics"])
def evaluate_fresh(model):
"""Always evaluate to get latest metrics."""
return model.evaluate(test_data)
Example 3: Debugging
# While debugging - run everything fresh
pipeline = Pipeline("debug_pipeline", enable_cache=False)
# All steps run fresh every time

# Once the bug is fixed - re-enable caching
pipeline = Pipeline("prod_pipeline", enable_cache=True)
# Benefit from caching again
Cache Performance 📊
Cache Hits Save Time
import time

from flowyml import Pipeline, step
@step(cache="code_hash")
def expensive_step():
time.sleep(5) # Simulate expensive computation
return "result"
pipeline = Pipeline("perf_test", enable_cache=True)
pipeline.add_step(expensive_step)
# First run: 5+ seconds
result1 = pipeline.run()  # ⏱️ 5.2s

# Second run: milliseconds!
result2 = pipeline.run()  # ⏱️ 0.05s ⚡ (cached)
Monitoring Cache Usage
Check which steps were cached:
result = pipeline.run()
for step_name, step_result in result.step_results.items():
status = "CACHED" if step_result.cached else "EXECUTED"
print(f"{step_name}: {status} ({step_result.duration_seconds:.2f}s)")
# Output:
# load_data: CACHED (0.01s)
# process: EXECUTED (2.34s)
# train: CACHED (0.02s)
Cache Storage 💾
Default Location
By default, cache is stored in .flowyml/cache:
.flowyml/
└── cache/
    ├── cache.db     # SQLite database tracking cache entries
    └── objects/     # Pickled cached objects
        ├── abc123.pkl
        └── def456.pkl
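To see what a run has actually stored, you can inspect that directory with the standard library alone (a minimal sketch; no flowyml API assumed):

from pathlib import Path

cache_root = Path(".flowyml/cache")
for pkl in sorted((cache_root / "objects").glob("*.pkl")):
    # Each .pkl file is one cached step output
    print(f"{pkl.name}: {pkl.stat().st_size / 1024:.1f} KB")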
Custom Cache Directory
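As shown in the configuration section, pass cache_dir to relocate the cache, e.g. to a shared volume (path is an example):

from flowyml import Pipeline

pipeline = Pipeline("my_pipeline", cache_dir="/mnt/shared/flowyml_cache")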
Cache Entry Structure
Each cache entry contains:

- Key: Hash of code + inputs + step name
- Value: Serialized step output
- Metadata: Timestamp, step name, code hash
- TTL: Optional time-to-live
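As a mental model only (field names taken from the list above, not flowyml's actual schema), an entry might look like:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class CacheEntry:
    key: str                         # hash of code + inputs + step name
    value_path: str                  # pickled output stored under objects/
    step_name: str
    code_hash: str
    created_at: datetime             # timestamp metadata
    ttl: Optional[timedelta] = None  # optional time-to-live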
Managing the Cache 🧹
Clear Cache via Code
from flowyml.core.cache import CacheStore
cache = CacheStore(".flowyml/cache")
# Clear all cache
cache.clear()
# Clear specific step
cache.clear_step("my_step_name")
Cache Invalidation Patterns
Manual Invalidation
# Before running, clear cache for specific step
cache.clear_step("data_loading")
# Then run pipeline
result = pipeline.run()
Automatic Invalidation
Cache is automatically invalidated when:
- Function code changes (for code_hash strategy)
- Input values change
- Dependencies are updated
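You can watch this happen by re-running a pipeline and checking the cached flag described under Monitoring Cache Usage below:

pipeline.run()           # first run executes every step
result = pipeline.run()  # unchanged steps now hit the cache
assert result.step_results["load_data"].cached

# Edit load_data's body (code_hash strategy) or change its inputs,
# run again, and the flag flips back to False.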
Time-Based Invalidation
Implement custom TTL:
from datetime import timedelta

from flowyml import step

@step(cache=False)  # must always run so the age check executes
def load_with_ttl():
    # Custom logic to check cache age. get_cache_age, fetch_fresh_data,
    # and load_cached_data stand in for helpers you would implement.
    if get_cache_age("load_with_ttl") > timedelta(hours=24):
        # Cache too old - force re-execution
        return fetch_fresh_data()
    return load_cached_data()
Debugging Cache Issues 🔧
Verify Cache is Working
# Run with debug output
result = pipeline.run(debug=True)
# Check cache status for each step
for step_name, step_result in result.step_results.items():
    if step_result.cached:
        print(f"✅ {step_name} was cached")
    else:
        print(f"❌ {step_name} was executed (not cached)")
Common Cache Issues
Issue: Cache Not Hitting When Expected
Causes:
- Input values changed slightly (floating point precision)
- Code changed (even whitespace with code_hash)
- Cache was cleared
- Different function object (re-imported module)
Solutions:
# Solution 1: use input_hash if code changes shouldn't matter
@step(cache="input_hash")
def process(data):
    ...

# Solution 2: round float inputs to avoid precision-based cache misses
@step
def scale(value: float):
    value = round(value, 6)  # Normalize precision
    ...
Issue: Stale Cache
Cause: Using input_hash but logic changed
Solution:
# Temporarily disable cache to force refresh
@step(cache=False)  # Remove once the new logic is verified
def updated_logic(data):
# New implementation
...
Or manually clear:
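from flowyml.core.cache import CacheStore

cache = CacheStore(".flowyml/cache")
cache.clear_step("updated_logic")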
When to Use Each Strategy
| Strategy | Development | Production | Stable Logic | Changing Logic | Side Effects |
|---|---|---|---|---|---|
| code_hash (default) | ✅ Best | ✅ Safe | ✅ Good | ✅ Best | ⚠️ Use with care |
| input_hash | ⚠️ Risky | ✅ Efficient | ✅ Best | ❌ Dangerous | ⚠️ Use with care |
| False | ✅ Debugging | ✅ Real-time | ❌ Wasteful | ✅ Fine | ✅ Required |
Best Practices 💡
1. Enable Caching in Development
# ✅ Good - iterates faster
pipeline = Pipeline("dev", enable_cache=True)
# Unchanged steps are skipped on re-runs
2. Be Careful with Side Effects
# ❌ Bad - side effect might be cached!
@step
def send_email(results):
email_service.send(results) # Might not run if cached!
return "sent"
# ✅ Good - disable cache for side effects
@step(cache=False)
def send_email(results):
email_service.send(results) # Always runs
return "sent"
3. Use Input Hash for Stable, Expensive Operations
# ✅ Good - training is expensive and logic is stable
@step(cache="input_hash")
def train_model(data, hyperparams):
# 2 hours of training
model = expensive_training(data, hyperparams)
return model
4. Clear Cache Between Major Changes
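After a sweeping refactor (especially one touching input_hash steps), wipe the cache so stale results can't survive, using the CacheStore API from Managing the Cache:

from flowyml.core.cache import CacheStore

cache = CacheStore(".flowyml/cache")
cache.clear()  # Start fresh after major changes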
5. Monitor Cache Hit Rates
result = pipeline.run()
total_steps = len(result.step_results)
cached_steps = sum(1 for r in result.step_results.values() if r.cached)
hit_rate = cached_steps / total_steps * 100
print(f"Cache hit rate: {hit_rate:.1f}%")
# Target: > 50% in development, 20-40% in production
Advanced: Custom Cache Keys
For complete control, provide a custom cache key function:
import hashlib

def my_cache_key(step, inputs, context):
    # Use a stable digest: Python's built-in hash() is salted per process
    return f"{step.name}:{hashlib.sha256(repr(inputs).encode()).hexdigest()}"
@step(cache=my_cache_key)
def custom_cached_step(data):
...
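Custom keys are handy when only some inputs should matter. A sketch with the same assumed signature, excluding a hypothetical verbose flag so toggling logging never invalidates the cache:

import hashlib

def key_ignoring_verbose(step, inputs, context):
    # 'verbose' is a hypothetical flag that shouldn't affect the result
    relevant = sorted((k, v) for k, v in inputs.items() if k != "verbose")
    return f"{step.name}:{hashlib.sha256(repr(relevant).encode()).hexdigest()}"

@step(cache=key_ignoring_verbose)
def summarize(data, verbose: bool = False):
    ...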
Next Steps 🚀
- Steps: Configure step caching strategies
- Pipelines: Enable caching at pipeline level
- Configuration: Set cache directory in config