
⚡ Advanced Features — Feature Encyclopedia

FlowyML isn't just for building DAGs; it's an enterprise-grade platform designed to handle the complexities of production machine learning. This guide is your gateway to every advanced capability.

🗺️ Master Every Feature

From intelligent caching to GenAI observability — these are the features that set FlowyML apart from every other ML framework.


📚 Feature Index

⚡ Execution & Performance

🧠 Intelligence & AI

🛡️ Reliability & Ops


⚡ Step Grouping

Step Grouping allows you to run multiple consecutive steps in the same execution environment (container/process). This is critical for optimizing performance when you have many small steps.

🎯 When to use

Use grouping for small, sequential tasks (clean → transform → validate) that don't need separate containers. Skip it for heavy compute steps that benefit from isolated resources.

from flowyml import step

# These three steps will execute in ONE container
@step(outputs=["raw"], execution_group="prep")
def load(): ...

@step(inputs=["raw"], outputs=["clean"], execution_group="prep")
def clean(raw): ...

@step(inputs=["clean"], outputs=["stats"], execution_group="prep")
def analyze(clean): ...

📊 Resource Aggregation

FlowyML calculates the resources needed for a group by taking the element-wise maximum across all participants. If Step A needs 1 GPU and Step B needs 2 GPUs, the entire group will provision 2 GPUs.
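That rule is easy to sketch. The `StepResources` type and its field names below are illustrative stand-ins, not FlowyML's actual resource API:

```python
from dataclasses import dataclass

@dataclass
class StepResources:
    cpus: int = 1
    gpus: int = 0
    memory_gb: int = 4

def aggregate(group: list[StepResources]) -> StepResources:
    # Take the element-wise maximum so the shared container
    # can satisfy the most demanding step in the group.
    return StepResources(
        cpus=max(r.cpus for r in group),
        gpus=max(r.gpus for r in group),
        memory_gb=max(r.memory_gb for r in group),
    )

step_a = StepResources(cpus=2, gpus=1)
step_b = StepResources(cpus=1, gpus=2, memory_gb=16)
print(aggregate([step_a, step_b]))  # StepResources(cpus=2, gpus=2, memory_gb=16)
```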

→ Deep Dive: Step Grouping Guide


🕵️ GenAI & LLM Observability

FlowyML provides deep tracing for LLM applications. Unlike generic loggers, we understand the structure of GenAI chains.

πŸ” Waterfall View

See nested calls (Chain β†’ Retrieval β†’ LLM) with per-step token counts and timing.

πŸ’° Auto-Costing

Automatic cost calculation for OpenAI, Anthropic, Cohere, and LlamaIndex models.
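Conceptually, auto-costing multiplies token counts by a per-model price table. A minimal sketch of that idea (the model names and prices below are illustrative placeholders, not live pricing):

```python
# Hypothetical per-1K-token prices in USD; real pricing changes often.
PRICES = {
    "gpt-4o": {"prompt": 0.0025, "completion": 0.01},
    "claude-sonnet": {"prompt": 0.003, "completion": 0.015},
}

def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    # Prices are quoted per 1,000 tokens, separately for prompt and completion.
    p = PRICES[model]
    return (prompt_tokens / 1000) * p["prompt"] \
         + (completion_tokens / 1000) * p["completion"]

print(round(estimate_cost("gpt-4o", 1200, 400), 6))  # 0.007
```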

🔗 Trace-to-Eval

Bridge production traces directly into an Evaluation Dataset for offline scoring.

from flowyml import trace_llm

@trace_llm(name="qa_system", model="gpt-4o")
def ask(question: str):
    # This entire execution, including tokens and cost,
    # is tracked and visible in the FlowyML Dashboard.
    return llm.invoke(question)

→ Deep Dive: LLM Tracing Guide · Eval Adapters


👀 Human-in-the-Loop

Some actions shouldn't be fully automated. FlowyML provides Approval Gates that pause pipeline execution and notify your team.

from flowyml import Pipeline, approval

pipeline = Pipeline("deploy-to-prod")
pipeline.add_step(train_model)

# The pipeline will PAUSE here and notify the team
pipeline.add_step(
    approval(
        name="release_gate",
        approver="senior-ds@company.com",
        timeout_seconds=3600
    )
)

pipeline.add_step(deploy_trigger)

⏱️ Timeout Behavior

If no approval is received within timeout_seconds, the pipeline will fail safely. Set this based on your team's SLA.
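Conceptually, an approval gate is a poll-until-decision loop with a deadline. A simplified sketch, assuming a hypothetical `check_approval` callback (returns `None` while pending) and an illustrative `ApprovalTimeout` exception; FlowyML's internals may differ:

```python
import time

class ApprovalTimeout(Exception):
    """Raised when no decision arrives before the deadline."""

def wait_for_approval(check_approval, timeout_seconds: float,
                      poll_interval: float = 1.0) -> bool:
    # Poll the decision source until approved/rejected or the deadline passes.
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        decision = check_approval()  # True, False, or None (still pending)
        if decision is not None:
            return decision
        time.sleep(poll_interval)
    raise ApprovalTimeout(f"no decision within {timeout_seconds}s")

# Example: a decision source that approves immediately
print(wait_for_approval(lambda: True, timeout_seconds=5))  # True
```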

→ Deep Dive: Human-in-the-Loop Guide


💾 Checkpointing

ML training is expensive and prone to transient failures (preemptible instances, OOM, network). FlowyML Checkpoints ensure you never lose progress.

  • πŸ”„ Automatic State Saving: Every artifact is saved to the ArtifactStore
  • ⚑ Intelligent Resumption: Use pipeline.rerun(run_id="...") to skip the 10-hour processing and jump straight to the training step that failed
  • πŸ”’ Immutable Snapshots: Pipeline snapshots guarantee reproducibility
# Resume from failure — skips all completed steps
result = pipeline.rerun(run_id="previous_failed_run")

# Resume from a specific step
result = pipeline.rerun(run_id="abc-123", from_step="train_model")

→ Deep Dive: Checkpointing & Experiment Tracking Guide


📊 Data Drift Monitoring

FlowyML includes high-performance statistical utilities to monitor your data distribution and detect model degradation before it reaches production.

from flowyml.monitoring import detect_drift

# Compare current production batch vs. historical training baseline
drift = detect_drift(
    reference_data=baseline_df['age'],
    current_data=production_df['age'],
    threshold=0.1
)

if drift['drift_detected']:
    # Route an alert (e.g. to Slack) through the FlowyML Notification System
    send_alert(f"Drift detected in 'age' (PSI: {drift['psi']})")
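The PSI value used above can be computed by binning both samples and summing (current% - reference%) * ln(current% / reference%) over the bins. A standalone NumPy sketch of that calculation, independent of FlowyML's implementation:

```python
import numpy as np

def psi(reference, current, bins: int = 10, eps: float = 1e-6) -> float:
    # Bin edges come from the reference (training-era) distribution.
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # eps guards against log(0) and division by zero in empty bins.
    ref_pct = np.clip(ref_pct, eps, None)
    cur_pct = np.clip(cur_pct, eps, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

rng = np.random.default_rng(0)
baseline = rng.normal(40, 10, 10_000)   # training-era 'age' values
shifted = rng.normal(45, 10, 10_000)    # production batch drifted upward
print(psi(baseline, shifted) > 0.1)     # True: above a common alert threshold
```

A common rule of thumb treats PSI below 0.1 as stable and above 0.2 as significant drift, but thresholds should be tuned per feature.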

→ Deep Dive: Data Drift Guide


🔔 Notification Hub

Connect your pipelines to the tools your team uses. Configure once — all pipelines inherit the channels.

from flowyml import configure_notifications

configure_notifications(
    slack_webhook="https://hooks.slack.com/...",
    email_config={...},
    console=True
)

# Steps will automatically report start/success/failure to these channels
| Channel | Setup | Best For |
| --- | --- | --- |
| 🖥️ Console | Always enabled | Development & debugging |
| 💬 Slack | slack_webhook URL | Real-time team alerts |
| 📧 Email | email_config dict | Daily summaries & reports |
| 🔧 Custom | Your subclass | Discord, PagerDuty, Teams |
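A custom channel is typically a small subclass that forwards pipeline events to your tool's webhook. A hypothetical Discord sketch, assuming a `NotificationChannel` base class with a single `send` hook (check the Plugin API for the real interface):

```python
import json
import urllib.request

class NotificationChannel:
    """Stand-in for the base class custom channels extend (illustrative)."""
    def send(self, event: dict) -> None:
        raise NotImplementedError

class DiscordChannel(NotificationChannel):
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def format_message(self, event: dict) -> str:
        # e.g. {"status": "failure", "pipeline": "deploy-to-prod"}
        return f"[{event['status']}] {event['pipeline']}"

    def send(self, event: dict) -> None:
        # Discord webhooks accept a JSON body with a "content" field.
        body = json.dumps({"content": self.format_message(event)}).encode()
        req = urllib.request.Request(
            self.webhook_url, data=body,
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req)
```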

→ Deep Dive: Notifications & Alerts Guide


📅 Scheduling & Automation

Run your pipelines on a schedule without needing a separate cron job or Airflow instance.

from flowyml import PipelineScheduler

scheduler = PipelineScheduler()
scheduler.schedule_daily(
    "model_refresh",
    lambda: my_pipeline.run(),
    hour=3
)
scheduler.start()
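Under the hood, daily scheduling reduces to computing the delay until the next occurrence of the target hour and sleeping until then. A stdlib sketch of that calculation (not the scheduler's actual code):

```python
from datetime import datetime, timedelta
from typing import Optional

def seconds_until(hour: int, now: Optional[datetime] = None) -> float:
    # Next run is today at `hour`:00, or tomorrow if that time has passed.
    now = now or datetime.now()
    target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)
    return (target - now).total_seconds()

# At 01:00, a 03:00 job is two hours away
print(seconds_until(3, datetime(2024, 5, 1, 1, 0)))  # 7200.0
```

Note this uses naive local time; a production scheduler also has to handle time zones and DST transitions.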

→ Deep Dive: Scheduling Guide


πŸ† Model Leaderboard & Comparisons

Keep track of your best experiments with an automatic leaderboard.

from flowyml import ModelLeaderboard

board = ModelLeaderboard(metric="val_accuracy", higher_is_better=True)
board.add_score("resnet-50", run_id="r1", score=0.94)
board.add_score("vit-base", run_id="r2", score=0.96)

# Prints a beautiful CLI table or renders in the Dashboard
board.display()
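Behind the convenience API, a leaderboard is just a list of entries sorted by one metric in a configurable direction. A minimal standalone sketch of the idea, independent of FlowyML's implementation:

```python
class Leaderboard:
    def __init__(self, metric: str, higher_is_better: bool = True):
        self.metric = metric
        self.higher_is_better = higher_is_better
        self.entries: list[dict] = []

    def add_score(self, model: str, run_id: str, score: float) -> None:
        self.entries.append({"model": model, "run_id": run_id, "score": score})

    def ranked(self) -> list[dict]:
        # Best first: descending for accuracy-like metrics,
        # ascending for loss-like ones.
        return sorted(self.entries, key=lambda e: e["score"],
                      reverse=self.higher_is_better)

board = Leaderboard(metric="val_accuracy")
board.add_score("resnet-50", run_id="r1", score=0.94)
board.add_score("vit-base", run_id="r2", score=0.96)
print(board.ranked()[0]["model"])  # vit-base
```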

→ Deep Dive: Model Leaderboard Guide


🧠 Dynamic Sub-Pipelines

For advanced users, FlowyML allows you to generate entire pipelines at runtime. This is perfect for Hyperparameter Search or Cross-Validation.

from flowyml import dynamic, Pipeline, step

@dynamic(outputs=["best_model"])
def hp_search(lrs: list):
    sub = Pipeline("sweep")
    for lr in lrs:
        # Bind lr as a default argument so each step captures its own value;
        # a plain closure would late-bind and reuse the last lr.
        @step(outputs=[f"model_{lr}"])
        def train(lr=lr): return train_with_lr(lr)
        sub.add_step(train)
    return sub

→ Deep Dive: Dynamic Workflows Guide · Sub-Pipelines Guide


πŸ“ What's Next?

🚀 Deploy

Learn how to deploy your pipelines as REST APIs in the Deployment Lab.

🔌 Extend

Explore the Plugin API to build your own custom integrations.

💻 Examples

Browse the Examples Gallery for production-ready pipeline templates.