
Welcome to FlowyML 🌊

The Enterprise-Grade ML Pipeline Framework for Humans


FlowyML is a production-ready ML pipeline orchestration framework that bridges the gap between rapid experimentation and enterprise deployment, treating assets as first-class citizens. Write pipelines as simple Python scripts, then scale them to production without rewriting a single line.

[!TIP] The promise: Go from notebook to production in hours, not weeks. FlowyML handles orchestration, caching, versioning, and monitoring — so you can focus on ML, not infrastructure.

🎯 Why FlowyML?

The Problem FlowyML Solves

Most ML teams face the same painful trade-offs:

  • Quick prototyping tools (notebooks, scripts) don't scale to production
  • Enterprise MLOps platforms have steep learning curves, complex configuration, or vendor lock-in
  • Orchestration frameworks force you into rigid patterns or obscure DSLs
  • Experiment tracking is disconnected from execution and deployment

FlowyML eliminates these trade-offs. It's designed for the way data scientists actually work, in pure Python, while providing enterprise-grade capabilities when you need them: extensibility through plugins, assets as first-class citizens, and integrated, versioned experiment tracking and storage.

What Makes FlowyML Different

  • 🚀 Zero Boilerplate --- Define steps as pure Python functions. No YAML, no DSLs, no complex wiring. Just @step decorators and you're done.

    Why this matters: Reduces pipeline development time by 70% compared to traditional frameworks. Your team spends time on ML, not on orchestration syntax.

  • 🧠 Auto-Context Injection --- Parameters are automatically injected into steps based on type hints. Change a hyperparameter once, it flows everywhere.

    Why this matters: Eliminates configuration management headaches. Run the same pipeline with different configs for dev/staging/prod without code changes (see the sketch after this list).

  • ⚡ Smart Caching --- Intelligent caching strategies (code hash, input hash) skip expensive recomputation. Only re-run what actually changed.

    Why this matters: Saves compute costs by 40-60% in iterative development. A 3-hour pipeline becomes 20 minutes when you're tweaking one step.

  • 👁 Real-Time UI --- Beautiful, dark-mode dashboard to monitor pipelines, visualize DAGs, and inspect artifacts as they execute.

    Why this matters: Debug production issues in minutes instead of hours. See exactly what's happening, when it's happening.

  • 🔌 Extensive Plugin Ecosystem --- FlowyML is built on a plugin architecture, so you can extend it with custom plugins and with components from other frameworks.

    Why this matters: FlowyML stays flexible and composable, letting you adopt tools from other ecosystems instead of being locked into a single stack.

  • 🔀 Step Grouping, Branching, and Conditions --- Group consecutive steps to run in the same container/executor, reducing overhead while keeping step boundaries clear. Branching and conditions let you build control flow into your pipelines natively.

    Why this matters: You keep a clean separation of concerns while composing real pipeline logic directly in Python.

  • 🏢 Centralized Hub & Docker --- Run locally per project or deploy as a centralized hub for the whole company. Backend and frontend are fully dockerized.

    Why this matters: Start small on your laptop, then scale to a company-wide platform without changing your code.

  • 📁 Project-Based Organization --- Built-in multi-tenancy for managing multiple teams and initiatives. Scoped runs, artifacts, and metadata.

    Why this matters: Keep your work organized and secure as your team grows.

  • 📝 Pipeline Templates --- Stop reinventing the wheel. Use pre-built templates for common ML patterns like Training, ETL, and A/B Testing.

    Why this matters: Standardize your team's workflows and get started in seconds.
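
To make auto-context injection concrete (as referenced above), here is a minimal sketch using the same `context` and `@step` API shown in the Quick Start below; only the context changes between environments, never the step code:

from flowyml import Pipeline, step, context

@step(outputs=["model"])
def train_model(learning_rate: float = 0.01):
    # 'learning_rate' is injected from the pipeline's context via the type hint
    return f"model(lr={learning_rate})"

# Same step, different environments: swap the context, not the code
dev_pipeline = Pipeline("training_dev", context=context(learning_rate=0.01))
prod_pipeline = Pipeline("training_prod", context=context(learning_rate=0.001))

dev_pipeline.add_step(train_model)
prod_pipeline.add_step(train_model)

dev_pipeline.run()
prod_pipeline.run()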

💡 Real-World Impact

Here's what FlowyML delivers in practice:

| Challenge | Without FlowyML | With FlowyML |
| --- | --- | --- |
| Dev → Production | 4-8 weeks of rewriting | Hours, using the same code |
| Pipeline iteration | Full re-runs (hours) | Cached steps (minutes) |
| Debugging failures | Parse logs, guess state | Visual DAG + artifact inspection |
| Team collaboration | Divergent scripts | Standardized, versioned pipelines |
| Multi-environment | Rewrite for each | Single pipeline, multiple stacks |
| Experiment tracking | Manual logging | Automatic lineage + versioning |

[!IMPORTANT] Production-Ready from Day One: FlowyML isn't just for prototypes. It's built for regulated industries, multi-tenant deployments, and enterprise scale. But you can start simple and grow into those features.

🚀 Feature Showcase

FlowyML isn't just another orchestrator. It's a complete toolkit for building, debugging, and deploying ML applications.

1. Zero-Boilerplate Orchestration

Write pipelines as standard Python functions. No YAML, no DSLs, no complex wiring.

from flowyml import Pipeline, step

@step(outputs=["data"])
def load():
    return [1, 2, 3]

@step(inputs=["data"], outputs=["model"])
def train(data):
    return Model.train(data)  # stand-in for your own training code

# It's just Python!
pipeline = Pipeline("simple").add_step(load).add_step(train)
pipeline.run()

2. 🧠 Intelligent Caching

Don't waste time re-running successful steps. FlowyML's multi-strategy caching understands your code and data.

  • Code Hash: Re-runs only when you change the code.
  • Input Hash: Re-runs only when input data changes.
  • Time-based: Re-runs after a specific duration.

from flowyml import step

# Only re-runs if 'data' changes, ignoring code changes
@step(cache="input_hash", outputs=["processed"])
def expensive_processing(data):
    return process(data)
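
The time-based strategy follows the same pattern. A hypothetical sketch, where the `cache` value and TTL parameter name are illustrative assumptions rather than confirmed FlowyML API:

from flowyml import step

# Hypothetical sketch: the exact cache/TTL parameter names may differ
@step(cache="time", cache_ttl_seconds=3600, outputs=["snapshot"])
def hourly_snapshot():
    return fetch_latest()  # placeholder data source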

3. 🤖 LLM & GenAI Ready

Built-in tools for the new era of AI. Trace token usage, latency, and costs automatically with built-in observability:

import openai
from flowyml import step, trace_llm

@step
@trace_llm(model="gpt-4", tags=["production"])
def generate_summary(text: str):
    # flowyml automatically tracks:
    # - Token usage (prompt/completion)
    # - Cost estimation
    # - Latency & Success/Failure rates
    return openai.ChatCompletion.create(...)

4. ⚡ Efficient Step Grouping & Separation of Concerns

Group consecutive steps to run in the same container/executor. Perfect for reducing overhead while maintaining clear step boundaries and keeping logic separate from configuration.

from flowyml import step
from flowyml.core.resources import ResourceRequirements, GPUConfig

# Group preprocessing steps - they'll share the same container
@step(outputs=["raw"], execution_group="preprocessing",
      resources=ResourceRequirements(cpu="2", memory="4Gi"))
def load_data():
    return fetch_from_source()

@step(inputs=["raw"], outputs=["clean"], execution_group="preprocessing",
      resources=ResourceRequirements(cpu="4", memory="8Gi"))
def clean_data(raw):
    return preprocess(raw)

# FlowyML automatically:
# ✅ Analyzes DAG for consecutive steps
# ✅ Aggregates resources (cpu="4", memory="8Gi")
# ✅ Executes in same environment (no container restart)
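
The aggregation rule can be pictured with a tiny generic sketch (not FlowyML's actual code): a shared container must satisfy the most demanding step it hosts, which is why the group above resolves to cpu="4" and memory="8Gi".

def aggregate_group(requirements):
    # A shared container must satisfy the most demanding step in the group,
    # so take the maximum of each resource across grouped steps
    cpus = max(int(r["cpu"]) for r in requirements)
    mem = max(int(r["memory"].rstrip("Gi")) for r in requirements)
    return {"cpu": str(cpus), "memory": f"{mem}Gi"}

print(aggregate_group([
    {"cpu": "2", "memory": "4Gi"},   # load_data
    {"cpu": "4", "memory": "8Gi"},   # clean_data
]))
# -> {'cpu': '4', 'memory': '8Gi'}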

[!TIP] Why this matters: Traditional frameworks (like ZenML) run each step in a separate container, creating unnecessary overhead. FlowyML's intelligent grouping lets you maintain clean step separation while optimizing execution.

5. 🔀 Dynamic Workflows

Real-world ML isn't linear. Build complex, adaptive workflows with conditional logic and branching.

from flowyml import If, Switch

# Run 'deploy' only if model accuracy > 0.9
pipeline.add_step(
    If(condition=lambda ctx: ctx["accuracy"] > 0.9)
    .then(deploy_model)
    .else_(notify_team)
)

# Multi-way branching
pipeline.add_step(
    Switch(selector=lambda ctx: ctx["model_type"])
    .case("classification", train_classifier)
    .case("regression", train_regressor)
    .default(train_generic)
)

6. 🧩 Universal Plugin System

Extend with any tool. Even wrap and reuse ZenML components!

from flowyml.stacks.plugins import load_component

# Load any ZenML orchestrator
k8s_orch = load_component(
    "zenml:zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator"
)

# Use ZenML integrations
mlflow_tracker = load_component("zenml:zenml.integrations.mlflow.MLflowExperimentTracker")
great_expectations = load_component("zenml:zenml.integrations.great_expectations.DataValidator")

[!IMPORTANT] Best of Both Worlds: FlowyML's plugin system gives you access to ZenML's entire ecosystem while maintaining FlowyML's superior developer experience.

7. 👤 Human-in-the-Loop

Pause pipelines for manual approval, review, or intervention.

import os
from flowyml import approval

pipeline.add_step(train_model)
pipeline.add_step(
    approval(
        name="approve_deployment",
        approver="ml-team",
        timeout_seconds=3600,
        auto_approve_if=lambda: os.getenv("CI") == "true"
    )
)
pipeline.add_step(deploy_model)

8. 📊 Built-in Experiment Tracking

No external tools needed. Tracking is built-in and automatic.

from flowyml.tracking import Experiment

exp = Experiment(
    name="baseline_training",
    description="Baseline model experiments",
    tags={"framework": "pytorch", "version": "v1"}
)

exp.log_run(
    run_id="run_001",
    metrics={"accuracy": 0.95, "loss": 0.12},
    parameters={"lr": 0.01, "batch_size": 32}
)

# Get best performing run
best = exp.get_best_run("accuracy", maximize=True)

9. 🏆 Model Leaderboard & Registry

Track, compare, version, and stage your models.

from flowyml import ModelLeaderboard
from flowyml.core import Model

# Leaderboard for model comparison
leaderboard = ModelLeaderboard(metric="accuracy", higher_is_better=True)
leaderboard.add_score(model_name="bert-base", run_id="run_123", score=0.92)
leaderboard.display()  # Beautiful console output

# Model registry with stages
model = Model.create(artifact=trained_model, score=0.95)
model.register(name="text_classifier", stage="production", version="v1.2.0")

10. 📅 Built-in Scheduling

Schedule recurring jobs without external orchestrators.

from flowyml import PipelineScheduler

scheduler = PipelineScheduler()

# Daily at 2 AM
scheduler.schedule_daily(
    name="daily_training",
    pipeline_func=lambda: pipeline.run(),
    hour=2, minute=0
)

# Every 6 hours
scheduler.schedule_interval(
    name="data_refresh",
    pipeline_func=lambda: refresh_pipeline.run(),
    hours=6
)

scheduler.start()  # Non-blocking

11. 🔔 Smart Notifications

Slack, Email, and custom alerts. All built-in.

from flowyml import configure_notifications, get_notifier

configure_notifications(
    console=True,
    slack_webhook="https://hooks.slack.com/services/YOUR/WEBHOOK",
    email_config={...}
)

# Automatic notifications
# - Pipeline start/success/failure
# - Data drift detection
# - Manual triggers available

notifier = get_notifier()
notifier.notify("Model deployed!", level="success")

12. 🎯 Interactive Debugging

Set breakpoints, inspect state, and debug like regular Python code.

from flowyml import StepDebugger

debugger = StepDebugger()
debugger.set_breakpoint("train_model")

# Run with debugging enabled
pipeline.run(debug=True)

# When breakpoint hits:
# - Inspect variables
# - Check intermediate outputs
# - Step through execution

13. 📦 First-Class Asset Types

Assets are not just files; they are first-class citizens with lineage, metadata, and versioning, plus specialized types for ML workflows.

from flowyml import Dataset, Model, Metrics, FeatureSet

# Type-safe ML assets with metadata
dataset = Dataset.create(
    data=df,
    name="training_data",
    metadata={"source": "postgres", "rows": 10000}
)

model = Model.create(
    artifact=trained_model,
    score=0.95,
    metadata={"framework": "pytorch", "params": {...}}
)

metrics = Metrics.create(values={"accuracy": 0.95, "f1": 0.93})

14. 🔄 Smart Retries & Circuit Breakers

Handle failures gracefully. Production-ready from day one.

from flowyml import step

# Automatic retries
@step(retry=3, timeout=300)
def flaky_api_call():
    return external_api.fetch()

# Circuit breakers prevent cascading failures
# Automatically stops calling failing dependencies
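
For intuition, here is the circuit-breaker pattern itself as a generic Python sketch (the concept FlowyML applies, not its internal implementation): after a threshold of consecutive failures, further calls are short-circuited until a cool-down elapses.

import time

class CircuitBreaker:
    """Generic circuit breaker: open after N failures, retry after a cool-down."""

    def __init__(self, max_failures=3, reset_seconds=60):
        self.max_failures = max_failures
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_seconds:
                raise RuntimeError("Circuit open: skipping call to failing dependency")
            self.opened_at = None  # cool-down elapsed, try again
            self.failures = 0
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.time()  # trip the breaker
            raise
        self.failures = 0  # success resets the count
        return result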

15. 📈 Data Drift Detection

Monitor distribution shifts. Trigger retraining automatically.

from flowyml import detect_drift

drift_result = detect_drift(
    reference_data=train_feature,
    current_data=prod_feature,
    threshold=0.1  # PSI threshold
)

if drift_result['drift_detected']:
    print(f"⚠️ Drift detected! PSI: {drift_result['psi']:.4f}")
    trigger_retraining()
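
PSI here is the standard Population Stability Index. A minimal NumPy sketch of the computation (the standard formula; FlowyML's internal binning details may differ):

import numpy as np

def psi(reference, current, bins=10):
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    lo, hi = reference.min(), reference.max()
    edges = np.linspace(lo, hi, bins + 1)
    # Clip current data into the reference range so outliers land in edge bins
    cur = np.clip(current, lo, hi)
    ref_pct = np.histogram(reference, bins=edges)[0] / reference.size
    cur_pct = np.histogram(cur, bins=edges)[0] / current.size
    # Guard against log(0) in empty bins
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    # PSI = sum((cur - ref) * ln(cur / ref)); values above ~0.1 often flag drift
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))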

16. 🌐 Pipeline Versioning

Git-like versioning for pipelines. Track changes, compare, rollback.

from flowyml import Pipeline, VersionedPipeline, context

ctx = context(learning_rate=0.001, epochs=10)

# Method 1: Use Pipeline with version parameter (auto-creates VersionedPipeline)
pipeline = Pipeline("training", context=ctx, version="v1.0.0", project_name="ml_project")
pipeline.add_step(load_data)
pipeline.save_version()

# ... make changes ...
pipeline.version = "v1.1.0"
pipeline.save_version()

# Method 2: Use VersionedPipeline directly
pipeline = VersionedPipeline("training", context=ctx, version="v1.0.0", project_name="ml_project")
pipeline.add_step(load_data)
pipeline.save_version()

# Compare versions to see exactly what changed (steps, code, context)
diff = pipeline.compare_with("v1.0.0")
print(diff["modified_steps"])  # ['train_model']
print(diff["context_changes"]) # {'learning_rate': {'old': 0.01, 'new': 0.001}}

17. 🏭 Enterprise Production Features

Everything you need to run mission-critical workloads.

  • 🔄 Automatic Retries: Handle transient failures gracefully.
  • ⏰ Scheduling: Built-in cron scheduler for recurring jobs.
  • 🔔 Notifications: Slack/Email alerts on success or failure.
  • 🛡️ Circuit Breakers: Stop cascading failures automatically.

18. 🏢 Centralized Hub & Docker

Ready for the enterprise. Run locally per project or deploy as a centralized hub for the whole company.

  • Docker Ready: Backend and frontend are fully dockerized.
  • Centralized Hub: Share pipelines, artifacts, and experiments across the team.
  • Remote Execution: Configure local clients to execute on the remote hub.
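
A hypothetical sketch of pointing a local client at the hub; the setting name below is an illustrative assumption, not a documented API (see the User Guide for the real interface):

import os

# Hypothetical: the environment variable name is an illustrative assumption
os.environ["FLOWYML_HUB_URL"] = "https://flowyml.hub.internal.example.com"

from flowyml import Pipeline

# With a hub configured, runs would be tracked and executed on the shared instance
pipeline = Pipeline("training")
pipeline.run()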

19. 🔌 Universal Integrations

Works with your existing stack.

  • ML Frameworks: PyTorch, TensorFlow, Keras, Scikit-learn, HuggingFace.
  • Cloud Providers: AWS, GCP, Azure (via plugins).
  • Tools: MLflow, Weights & Biases, Great Expectations.

20. 📂 Project-Based Organization

Built-in multi-tenancy for managing multiple teams and initiatives.

from flowyml import Project

project = Project("recommendation_system")
pipeline = project.create_pipeline("training")

# All runs, artifacts, and metadata are automatically scoped to the project
runs = project.list_runs()
stats = project.get_stats()

21. 📝 Pipeline Templates

Stop reinventing the wheel. Use pre-built templates for common ML patterns.

from flowyml.core.templates import create_from_template

# Create a standard training pipeline in one line
pipeline = create_from_template(
    "ml_training",
    data_loader=my_loader,
    trainer=my_trainer,
    evaluator=my_evaluator
)

⚡️ Quick Start

See how simple it is — this is a complete, runnable ML pipeline:

from flowyml import Pipeline, step, context

@step(outputs=["dataset"])
def load_data():
    return [1, 2, 3, 4, 5]

@step(inputs=["dataset"], outputs=["model"])
def train_model(dataset, learning_rate: float = 0.01):
    # 'learning_rate' is automatically injected from context!
    print(f"Training on {len(dataset)} items with lr={learning_rate}")
    return "my_trained_model"

# Run it!
ctx = context(learning_rate=0.05)
pipeline = Pipeline("quickstart", context=ctx)
pipeline.add_step(load_data)
pipeline.add_step(train_model)

pipeline.run()

That's it. No YAML. No config files. No boilerplate. Just Python.

🚀 Ready to Build Better Pipelines?

  • 🚀 Getting Started --- Build your first pipeline in 5 minutes. No prior MLOps experience required.

  • 📖 Core Concepts --- Understand pipelines, steps, context, and assets — the building blocks of FlowyML.

  • ⚡ Advanced Features --- Deep dive into caching, parallelism, debugging, and production patterns.

  • 📈 User Guide --- Master projects, versioning, scheduling, and multi-tenant deployments.

  • 🔌 Integrations --- Connect with Keras, GCP, and your existing ML stack.

  • 🛠 API Reference --- Complete API documentation for every class and function.


Questions? Issues? Open an issue on GitHub or check out the Resources page for community links.