flowyml provides interactive debugging tools that let you pause pipelines, inspect state, and fix issues without restarting from scratch.
What you'll learn
How to debug pipelines like standard Python code. flowyml brings the "IDE experience" to distributed pipelines.
Why Interactive Debugging Matters π€
The old way (Airflow/Kubeflow):
1. Push code
2. Wait 10 mins for container build
3. Wait 20 mins for pipeline to fail
4. Read logs: KeyError: 'x'
5. Add print statement, repeat
The flowyml way:
1. Set a breakpoint: debugger.add_breakpoint(...)
2. Run pipeline locally
3. Execution pauses at the error
4. Inspect variables, fix code, resume
Debugging Strategies π―
| Strategy |
Tool |
Use When |
| Interactive |
StepDebugger |
You need to inspect variables mid-execution |
| Tracing |
PipelineDebugger |
You need to see the sequence of events and timing |
| Post-Mortem |
analyze_errors |
A run already failed and you want to know why |
Overview βΉοΈ
flowyml provides comprehensive debugging tools:
- StepDebugger: Debug individual steps with breakpoints and inspection
- PipelineDebugger: Debug entire pipelines with execution tracing
- Utility Functions: Quick debugging helpers
Step Debugging π
Basic Usage
| from flowyml import StepDebugger, step
@step(outputs=["result"])
def process_data(data):
return [x * 2 for x in data]
# Create debugger
debugger = StepDebugger(process_data)
# Set breakpoint
debugger.add_breakpoint(
condition=lambda inputs: len(inputs['data']) > 100,
action=lambda step, inputs: print(f"Large dataset: {len(inputs['data'])} items")
)
### Real-World Pattern: Conditional Breakpoint
Stop execution only when data looks weird.
```python
from flowyml import StepDebugger, step
@step
def process_data(batch):
# ... complex logic ...
return result
# Debug specific edge case
debugger = StepDebugger(process_data)
# "Stop if we get an empty batch"
debugger.add_breakpoint(
condition=lambda inputs: len(inputs['batch']) == 0,
action=lambda step, inputs: print(f"β οΈ Empty batch detected in {step.name}!")
)
# Run pipeline - it will pause ONLY if the condition is met
pipeline.run(debugger=debugger)
|
| # Inspect inputs before execution
debugger.inspect_inputs(data=[1, 2, 3])
# Output: {'data': [1, 2, 3]}
# Execute and inspect output
result = debugger.debug_execute(data=[1, 2, 3])
debugger.inspect_output()
# Output: [2, 4, 6]
|
Breakpoints
| # Conditional breakpoint
debugger.add_breakpoint(
condition=lambda inputs: inputs.get('threshold', 0) > 10,
action=lambda step, inputs: print(f"β οΈ High threshold: {inputs['threshold']}")
)
# Always-trigger breakpoint
debugger.add_breakpoint(
action=lambda step, inputs: print(f"Executing {step.name}")
)
# Remove breakpoints
debugger.clear_breakpoints()
|
Exception Debugging
| @step(outputs=["result"])
def failing_step(data):
if len(data) == 0:
raise ValueError("Empty data")
return data[0]
debugger = StepDebugger(failing_step)
try:
result = debugger.debug_execute(data=[])
except Exception as e:
# Get detailed traceback
debugger.debug_exception(e)
# Shows: exception type, message, inputs that caused it
|
Pipeline Debugging πΈοΈ
Tracing Execution
| from flowyml import PipelineDebugger, Pipeline
pipeline = Pipeline("my_pipeline")
pipeline.add_step(load)
pipeline.add_step(transform)
pipeline.add_step(save)
# Create debugger
debugger = PipelineDebugger(pipeline)
# Enable tracing
debugger.enable_tracing()
# Run with tracing
result = pipeline.run()
# View execution trace
trace = debugger.get_trace()
for entry in trace:
print(f"{entry['timestamp']}: {entry['step_name']} - {entry['event']}")
|
Profiling
| # Profile pipeline execution
debugger.enable_profiling()
result = pipeline.run()
# Get profile report
profile = debugger.get_profile()
for step_name, metrics in profile.items():
print(f"{step_name}:")
print(f" Duration: {metrics['duration_seconds']:.2f}s")
print(f" Memory: {metrics['memory_mb']:.1f}MB")
print(f" CPU: {metrics['cpu_percent']:.1f}%")
|
DAG Visualization
| # Visualize pipeline structure
debugger.visualize_dag(output_path="pipeline_dag.png")
# Or get DOT format
dot = debugger.get_dag_dot()
print(dot)
|
Error Analysis
| # After failed run
if not result.success:
analysis = debugger.analyze_errors(result)
print(f"Failed steps: {analysis['failed_steps']}")
print(f"Error types: {analysis['error_types']}")
print(f"Common patterns: {analysis['patterns']}")
|
Execution Replay
| # Replay a previous run
debugger.replay_execution(run_id="abc123")
# Replay with modifications
debugger.replay_execution(
run_id="abc123",
override_inputs={"step1": {"new_param": "value"}}
)
|
Quick Debugging Utilities π οΈ
debug_step
| from flowyml.utils.debug import debug_step
@step(outputs=["result"])
def my_step(data):
return process(data)
# Quick debug - prints inputs/outputs
result = debug_step(my_step, data=[1, 2, 3])
# Output:
# π Debugging: my_step
# π₯ Inputs: {'data': [1, 2, 3]}
# π€ Output: [2, 4, 6]
# β±οΈ Duration: 0.001s
|
trace_step
| from flowyml.utils.debug import trace_step
# Trace step execution
@trace_step
@step(outputs=["result"])
def traced_step(data):
return process(data)
# Automatically prints:
# β Entering traced_step
# β Exiting traced_step (0.001s)
|
profile_step
| from flowyml.utils.debug import profile_step
# Profile step performance
stats = profile_step(my_step, data=[1, 2, 3])
print(f"Execution time: {stats['time']:.3f}s")
print(f"Memory used: {stats['memory_mb']:.1f}MB")
|
Best Practices π‘
1. Use Conditional Breakpoints
| # Only break on interesting cases
debugger.add_breakpoint(
condition=lambda inputs: inputs['value'] < 0,
action=lambda step, inputs: print(f"β οΈ Negative value: {inputs['value']}")
)
|
2. Enable Tracing for Production Issues
| # Enable tracing when issues occur
if environment == "production":
if detect_anomaly():
debugger.enable_tracing()
result = pipeline.run()
trace = debugger.get_trace()
send_to_monitoring(trace)
|
3. Profile Before Optimization
| # Identify bottlenecks
debugger.enable_profiling()
result = pipeline.run()
profile = debugger.get_profile()
slowest = max(profile.items(), key=lambda x: x[1]['duration_seconds'])
print(f"Bottleneck: {slowest[0]} ({slowest[1]['duration_seconds']:.2f}s)")
|
4. Combine with Logging
| import logging
logger = logging.getLogger(__name__)
debugger.add_breakpoint(
action=lambda step, inputs: logger.info(
f"Executing {step.name} with {len(inputs)} inputs"
)
)
|
Advanced Usage β‘
Custom Breakpoint Actions
| def save_snapshot(step, inputs):
"""Save inputs to file for later analysis"""
import pickle
with open(f"snapshot_{step.name}.pkl", "wb") as f:
pickle.dump(inputs, f)
print(f"πΎ Saved snapshot for {step.name}")
debugger.add_breakpoint(
condition=lambda inputs: should_save(inputs),
action=save_snapshot
)
|
Programmatic Error Handling
| class DebugHandler:
def __init__(self):
self.errors = []
def handle_error(self, step, inputs, exception):
self.errors.append({
'step': step.name,
'inputs': inputs,
'error': str(exception)
})
# Send alert
alert_team(f"Error in {step.name}: {exception}")
handler = DebugHandler()
debugger.add_breakpoint(
action=lambda step, inputs: handler.handle_error(step, inputs, None)
)
|
API Reference π
StepDebugger
Methods:
- debug_execute(**inputs) -> Any - Execute with debugging
- inspect_inputs(**inputs) -> Dict - Inspect inputs
- inspect_output() -> Any - Inspect last output
- add_breakpoint(action, condition=None) - Add breakpoint
- clear_breakpoints() - Remove all breakpoints
- debug_exception(exception) - Analyze exception
PipelineDebugger
| PipelineDebugger(pipeline: Pipeline)
|
Methods:
- enable_tracing() - Enable execution tracing
- enable_profiling() - Enable performance profiling
- get_trace() -> List[Dict] - Get execution trace
- get_profile() -> Dict - Get performance profile
- visualize_dag(output_path: str) - Visualize pipeline
- analyze_errors(result) -> Dict - Analyze failed run
- replay_execution(run_id, override_inputs=None) - Replay run
Utility Functions
debug_step(step, inputs)**:
- Executes step with detailed logging
- Returns: step output
trace_step(func):
- Decorator for automatic tracing
- Prints entry/exit with timing
profile_step(step, inputs)**:
- Profiles step execution
- Returns: Dict with timing and memory stats