🌊 FlowyML Features Explorer

Discover everything FlowyML can do, from GenAI observability and LLM evaluation to pipeline scheduling, data drift detection, and beyond. Each feature is production-ready and batteries-included.

🤖 GenAI 📊 Evaluations ⏰ Scheduling 📉 Drift Detection 🔀 Dynamic Workflows 📦 Artifact Catalog

New Features Overview

1️⃣ GenAI & LLM Monitoring

Track LLM calls, tokens, and costs automatically:

import openai  # OpenAI Python SDK (v1+ module-level client)
from flowyml import trace_llm

@trace_llm(name="summarize")
def generate_summary(text):
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Summarize: {text}"}]
    )
    return response.choices[0].message.content

# Traces are automatically saved
result = generate_summary("Long text here...")

Features:

  • Automatic input/output capture
  • Token usage tracking
  • Cost calculation
  • Parent-child trace relationships
  • View traces in UI at /api/traces
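
As a rough illustration of how cost calculation from token counts can work, the sketch below multiplies prompt and completion tokens by per-1K-token prices. The `PRICES_PER_1K` table and the `estimate_cost` helper are hypothetical names, not part of FlowyML, and the prices are placeholder values rather than real provider rates.

```python
# Hypothetical per-1K-token prices in USD; placeholder values for illustration only.
PRICES_PER_1K = {
    "example-model": {"prompt": 0.01, "completion": 0.03},
}


def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Estimate the cost of one LLM call from its token counts."""
    price = PRICES_PER_1K[model]
    return (
        prompt_tokens / 1000 * price["prompt"]
        + completion_tokens / 1000 * price["completion"]
    )


cost = estimate_cost("example-model", prompt_tokens=1500, completion_tokens=500)
print(f"Estimated cost: ${cost:.4f}")
```

Prompt and completion tokens are priced separately because providers commonly charge different rates for each.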


2️⃣ 🔗 GenAI Observability ⚡ NEW

Full-stack GenAI observability for any AI framework: LangGraph, LangChain, OpenAI SDK, CrewAI, AutoGen, or custom code. Track every LLM call, tool invocation, chain execution, RAG pipeline, tokens, costs, artifacts, and errors with a single import.

4 Supported Frameworks:

# LangGraph: @observe() decorator, zero-config
from flowyml import observe
@observe(name="my_agent", project="chatbot")
def handle_query(query, flowyml_session=None):
    return graph.invoke(query, config=flowyml_session.config)

# LangChain: trace_chain() for chains/runnables
from flowyml.integrations.langchain import trace_chain
with trace_chain("qa_chain") as session:
    result = chain.invoke(input, config=session.config)

# OpenAI SDK: drop-in replacement, no LangChain needed
from flowyml import TracedOpenAI
client = TracedOpenAI(project="my_app")
response = client.chat.completions.create(model="gpt-4o-mini", messages=[...])

# Any Framework: universal adapter
from flowyml import log_llm_call
log_llm_call(model="gpt-4o", prompt="Hello", response="Hi!", prompt_tokens=5, completion_tokens=2)

What Gets Tracked Automatically:

  • 🤖 Every LLM call (model, prompts, responses; saved as first-class artifacts)
  • 🔧 Every tool invocation (input, output, duration)
  • 🔗 Chain/graph node execution with parent-child hierarchy
  • 📚 RAG pipeline: retriever queries + retrieved documents as artifacts
  • 📊 Token usage (prompt + completion + total per call and session)
  • 💰 Cost estimation (OpenAI, Anthropic, Google, Mistral, Cohere models)
  • ⏱ Latency per step and total session duration
  • 📦 Artifacts: prompts, responses, documents, configs, all first-class citizens
  • 🎨 Canvas-ready DAG: full trace tree for FlowyML canvas visualization
  • ❌ Error tracking with full context
  • 🏷 Model identification, tagging, and multi-model sessions
  • 🔍 View everything in FlowyML UI at /api/traces


3️⃣ Keras Integration

Automatic experiment tracking for Keras models:

from flowyml import flowymlKerasCallback

model.fit(
    x_train, y_train,
    epochs=10,
    callbacks=[
        flowymlKerasCallback(
            experiment_name="mnist_training",
            run_name="baseline_v1",
            log_model=True
        )
    ]
)

Auto-logs:

  • Training metrics (loss, accuracy, etc.)
  • Model architecture & summary
  • Optimizer configuration
  • Training parameters
  • Model checkpoints


4️⃣ Data Drift Detection

Monitor data distribution shifts:

from flowyml import detect_drift, compute_stats

# Detect drift
drift_result = detect_drift(
    reference_data=train_data['feature'],
    current_data=prod_data['feature'],
    threshold=0.1  # PSI threshold
)

if drift_result['drift_detected']:
    print(f"⚠️ Drift detected! PSI: {drift_result['psi']:.4f}")

# Compute stats
stats = compute_stats(data)
print(f"Mean: {stats['mean']}, Std: {stats['std']}")
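
For readers unfamiliar with the Population Stability Index (PSI) that the threshold above refers to, here is a minimal standard-library sketch of the usual formula, PSI = Σ (cᵢ − rᵢ) · ln(cᵢ / rᵢ) over histogram bins. The `psi` function, its binning strategy, and the `eps` smoothing are assumptions made for illustration; how `detect_drift` computes its value internally is not specified here.

```python
import math
import random


def psi(reference, current, bins=10, eps=1e-6):
    """Population Stability Index between two numeric samples."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # Guard against a constant reference sample.

    def proportions(values):
        counts = [0] * bins
        for v in values:
            # Clamp out-of-range values into the first or last bin.
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1
        # eps avoids log(0) and division by zero for empty bins.
        return [max(c / len(values), eps) for c in counts]

    ref_p, cur_p = proportions(reference), proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_p, cur_p))


rng = random.Random(0)
train = [rng.gauss(0.0, 1.0) for _ in range(5000)]
same = [rng.gauss(0.0, 1.0) for _ in range(5000)]
shifted = [rng.gauss(1.0, 1.0) for _ in range(5000)]

psi_same = psi(train, same)
psi_shifted = psi(train, shifted)
print(f"same distribution: {psi_same:.4f}, shifted: {psi_shifted:.4f}")
```

A sample drawn from the same distribution scores close to zero, while the mean-shifted sample scores well above the 0.1 threshold used in the example.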

5️⃣ ZenML Auto-Integration ⚡NEW

Import the entire ZenML ecosystem with one line:

from flowyml.stacks import import_all_zenml

# Import all ZenML components at once
components = import_all_zenml()
# Done! All ZenML orchestrators, artifact stores, etc. are ready

CLI commands:

# Check ZenML status
flowyml zenml status

# List and install integrations
flowyml zenml list
flowyml zenml install mlflow
flowyml zenml install kubernetes

# Import all at once
flowyml zenml import-all

Features:

  • Automatic discovery of ZenML integrations
  • Zero configuration wrapping of components
  • Full CLI for installation and import
  • Works with 50+ ZenML integrations


6️⃣ Pipeline Scheduling

Run pipelines automatically on a schedule:

from flowyml import PipelineScheduler

scheduler = PipelineScheduler()

# Daily at 2am
scheduler.schedule_daily(
    name="daily_training",
    pipeline_func=lambda: my_pipeline.run(),
    hour=2, minute=0
)

# Every 6 hours
scheduler.schedule_interval(
    name="data_refresh",
    pipeline_func=lambda: refresh_pipeline.run(),
    hours=6
)

# Start scheduler
scheduler.start()  # Non-blocking
# scheduler.start(blocking=True)  # Blocking
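
To make the daily and interval semantics concrete, the following sketch computes when the next run would fall using only `datetime`. The helpers `next_daily_run` and `next_interval_run` are hypothetical and only illustrate the arithmetic; they are not the scheduler's actual internals.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Return the next occurrence of hour:minute strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # Today's slot has passed; use tomorrow's.
    return candidate


def next_interval_run(last_run: datetime, hours: int) -> datetime:
    """Return the run time that follows `last_run` by a fixed interval."""
    return last_run + timedelta(hours=hours)


morning = datetime(2024, 1, 1, 1, 30)
evening = datetime(2024, 1, 1, 18, 0)
print(next_daily_run(morning, hour=2))      # Later the same day
print(next_daily_run(evening, hour=2))      # Rolls over to the next day
print(next_interval_run(evening, hours=6))  # Six hours after the last run
```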

7️⃣ Notifications

Get notified about pipeline events:

from flowyml import configure_notifications, get_notifier

# Configure channels
configure_notifications(
    console=True,
    slack_webhook="https://hooks.slack.com/...",
    email_config={
        'smtp_host': 'smtp.gmail.com',
        'username': 'you@gmail.com',
        'password': 'your-password',
        'from_addr': 'you@gmail.com',
        'to_addrs': ['team@company.com']
    }
)

# Use in your code
notifier = get_notifier()
notifier.notify(
    title="Training Complete",
    message="Model achieved 95% accuracy",
    level="success"
)

# Or use event hooks
notifier.on_pipeline_success(pipeline.name, run_id, duration)
notifier.on_pipeline_failure(pipeline.name, run_id, error)
notifier.on_drift_detected(feature_name, psi_value)

8️⃣ Model Leaderboard

Compare and rank models:

from flowyml import ModelLeaderboard

leaderboard = ModelLeaderboard(
    metric="accuracy",
    higher_is_better=True
)

# Add scores
leaderboard.add_score("bert-base", run_id="run_123", score=0.92)
leaderboard.add_score("distilbert", run_id="run_124", score=0.89)

# Display rankings
leaderboard.display(n=10)

# Get top models
top_5 = leaderboard.get_top(n=5)

Compare multiple runs:

from flowyml import compare_runs

comparison = compare_runs(
    run_ids=["run_123", "run_124", "run_125"],
    metrics=["accuracy", "f1_score", "latency"]
)

9️⃣ Pipeline Templates

Create pipelines from pre-built templates:

from flowyml import create_from_template, list_templates

# See available templates
print(list_templates())  # ['ml_training', 'etl', 'ab_test']

# Create from template
pipeline = create_from_template(
    'ml_training',
    name='my_training',
    data_loader=load_data,
    preprocessor=preprocess,
    trainer=train_model,
    evaluator=evaluate,
    model_saver=save_model
)

# Run it
result = pipeline.run()

Available Templates:

  • ml_training: Standard ML training workflow
  • etl / data_pipeline: Extract-Transform-Load
  • ab_test: A/B testing with model comparison


🔟 Checkpointing

Resume failed pipelines:

from flowyml import PipelineCheckpoint

checkpoint = PipelineCheckpoint(run_id="run_123")

# In your step
def expensive_computation():
    result = do_work()
    checkpoint.save_step_state("compute", result)
    return result

# Resume later
if checkpoint.exists():
    state = checkpoint.load()
    last_step = state['last_completed_step']
    output = checkpoint.load_step_state(last_step)
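
The save-then-resume flow above can be pictured with a small file-backed stand-in. `FileCheckpoint` below is a hypothetical class that mirrors the method names used in the example; the JSON layout and the temporary directory are assumptions, not FlowyML's storage format.

```python
import json
import tempfile
from pathlib import Path


class FileCheckpoint:
    """Hypothetical stand-in for a checkpoint store; not FlowyML's implementation."""

    def __init__(self, directory: Path, run_id: str):
        self.path = directory / f"{run_id}.json"

    def exists(self) -> bool:
        return self.path.exists()

    def load(self) -> dict:
        return json.loads(self.path.read_text())

    def save_step_state(self, step_name: str, state) -> None:
        data = self.load() if self.exists() else {"steps": {}}
        data["steps"][step_name] = state
        data["last_completed_step"] = step_name
        self.path.write_text(json.dumps(data))

    def load_step_state(self, step_name: str):
        return self.load()["steps"][step_name]


with tempfile.TemporaryDirectory() as tmp:
    first = FileCheckpoint(Path(tmp), "run_123")
    first.save_step_state("load", {"rows": 100})
    first.save_step_state("compute", {"total": 42})

    # A fresh object simulates a new process resuming the same run.
    resumed = FileCheckpoint(Path(tmp), "run_123")
    last_step = resumed.load()["last_completed_step"]
    output = resumed.load_step_state(last_step)

print(last_step, output)
```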

1️⃣1️⃣ Human-in-the-Loop

Add approval gates to pipelines:

import os

from flowyml import approval, Pipeline

pipeline = Pipeline("sensitive_operation")

# Add approval step
approval_step = approval(
    name="approve_deployment",
    approver="data-team",
    timeout_seconds=3600,
    auto_approve_if=lambda: os.getenv("AUTO_APPROVE") == "true"
)

pipeline.add_step(approval_step)

1️⃣2️⃣ Evaluations Framework ⚡NEW

Evaluate ML models and LLM outputs with 29+ built-in scorers:

from flowyml.evals import evaluate, EvalDataset, Accuracy, F1Score, EvalSuite

# Quick evaluation
data = EvalDataset.create_classical(
    "model_v2", predictions=[1, 0, 1, 1], targets=[1, 0, 0, 1]
)
result = evaluate(data=data, scorers=[Accuracy(threshold=0.9), F1Score()])

# Reusable suite
suite = EvalSuite("quality_gates", scorers=[Accuracy(), F1Score()])
result = suite.run(data=data, experiment="model_v2")

# GenAI (LLM-as-a-judge)
from flowyml.evals import Relevance, make_judge

judge = make_judge("quality", "Evaluate response accuracy", model="openai:/gpt-4o-mini")
result = evaluate(data=genai_data, scorers=[Relevance(), judge])

Features:

  • 29+ built-in scorers (classification, regression, GenAI)
  • Custom scorers via make_judge() and make_scorer()
  • Automatic regression detection
  • CI/CD quality gates (EvalAssert)
  • Pipeline-native evaluations (EvalStep)
  • Judge Arena (A/B test evaluators)
  • Continuous evaluation via schedules
  • Trace-to-evaluation bridge
  • Full CLI: flowyml eval run/compare/assert/scorers
  • See full guide: docs/evaluations.md
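
To show what the classification scorers in the quick evaluation measure, here is a plain-Python calculation of accuracy and F1 on the same predictions and targets. The functions are illustrative stand-ins, and treating `threshold=0.9` as a minimum passing score is an assumption about how the scorer behaves.

```python
def accuracy(predictions, targets):
    """Fraction of predictions that match their targets."""
    correct = sum(p == t for p, t in zip(predictions, targets))
    return correct / len(targets)


def f1_score(predictions, targets, positive=1):
    """Harmonic mean of precision and recall for the positive class."""
    pairs = list(zip(predictions, targets))
    tp = sum(p == positive and t == positive for p, t in pairs)
    fp = sum(p == positive and t != positive for p, t in pairs)
    fn = sum(p != positive and t == positive for p, t in pairs)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


predictions, targets = [1, 0, 1, 1], [1, 0, 0, 1]
acc = accuracy(predictions, targets)  # 3 of 4 predictions are correct
f1 = f1_score(predictions, targets)   # precision 2/3, recall 1
passed = acc >= 0.9                   # Assumed meaning of threshold=0.9
print(f"accuracy={acc:.2f}, f1={f1:.2f}, passes gate: {passed}")
```

On this data the accuracy is 0.75 and F1 is 0.8, so a 0.9 accuracy gate would not pass.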


📊 UI Features

Access the web UI at http://localhost:8080:

  • Dashboard: Overview stats
  • Runs: Pipeline execution history
  • Pipelines: All registered pipelines
  • Assets: Browse artifacts
  • Experiments: Compare experiment runs
  • Traces: View LLM call traces (NEW!)

🎯 Quick Start Example

Complete example using multiple features:

from flowyml import (
    Pipeline, step, context,
    configure_notifications,
    PipelineScheduler,
    ModelLeaderboard,
    flowymlKerasCallback
)

# 1. Configure notifications
configure_notifications(console=True)

# 2. Define pipeline
ctx = context(epochs=10, batch_size=32)
pipeline = Pipeline("training", context=ctx)

@step(outputs=["model", "metrics"])
def train(epochs: int, batch_size: int):
    model = create_model()
    history = model.fit(
        x_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[flowymlKerasCallback("mnist_exp")]
    )
    return model, history.history

pipeline.add_step(train)

# 3. Add to leaderboard
leaderboard = ModelLeaderboard("val_accuracy")

@step(inputs=["metrics"])
def log_to_leaderboard(metrics):
    accuracy = metrics['val_accuracy'][-1]
    leaderboard.add_score("my_model", run_id="...", score=accuracy)

pipeline.add_step(log_to_leaderboard)

# 4. Schedule to run daily
scheduler = PipelineScheduler()
scheduler.schedule_daily(
    name="daily_training",
    pipeline_func=lambda: pipeline.run(),
    hour=2
)
scheduler.start()

📚 Additional Resources


🆕 Pipeline Engineering Features

1️⃣3️⃣ Build-Time Type Validation ⚡NEW

Catch type mismatches between connected steps at Pipeline.build() time:

@step(outputs=["model"])
def train() -> Model:
    return Model(clf)

@step(inputs=["model"])
def evaluate(model: Dataset):  # ❌ Type mismatch! Expects Dataset, gets Model
    pass

pipeline.add_step(train).add_step(evaluate)
pipeline.build()  # Raises: Pipeline type validation failed
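
One way such a check can be done is by comparing function annotations with `inspect`. The sketch below is a guess at the general technique, not FlowyML's validator; `check_connection` and the placeholder `Model` and `Dataset` classes are hypothetical.

```python
import inspect


class Model: ...
class Dataset: ...


def train() -> Model:
    return Model()


def evaluate(model: Dataset) -> None:
    pass


def check_connection(producer, consumer, param: str) -> list:
    """Compare the producer's return annotation with the consumer's parameter annotation."""
    produced = inspect.signature(producer).return_annotation
    expected = inspect.signature(consumer).parameters[param].annotation
    if produced is inspect.Signature.empty or expected is inspect.Parameter.empty:
        return []  # Unannotated on one side: nothing to check.
    if not issubclass(produced, expected):
        return [
            f"{consumer.__name__}({param}) expects {expected.__name__}, "
            f"got {produced.__name__} from {producer.__name__}"
        ]
    return []


errors = check_connection(train, evaluate, "model")
print(errors)
```

Because the check only reads annotations, it can run before any step executes, which is what makes build-time validation possible.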

1️⃣4️⃣ Map Tasks ⚡NEW

Distribute work over collections with configurable concurrency and per-item retries:

from flowyml import map_task

@map_task(concurrency=8, retries=2, min_success_ratio=0.95)
def process_document(doc: dict) -> dict:
    return transform(doc)

result = process_document(documents)
print(f"Processed {result.successes}/{result.total}")

See full guide: docs/advanced/map-tasks.md
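
The interplay of concurrency, per-item retries, and a minimum success ratio can be sketched with `concurrent.futures`. `run_map` and `MapResult` below are hypothetical helpers written for illustration; they only approximate the behavior described above and say nothing about how `map_task` is implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field


@dataclass
class MapResult:
    total: int
    successes: int
    outputs: list = field(default_factory=list)


def run_map(func, items, concurrency=4, retries=2, min_success_ratio=0.95):
    def attempt(item):
        # One initial try plus `retries` further tries per item.
        for _ in range(retries + 1):
            try:
                return True, func(item)
            except Exception:
                continue
        return False, None

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        results = list(pool.map(attempt, items))  # Preserves input order.

    outputs = [value for ok, value in results if ok]
    result = MapResult(total=len(results), successes=len(outputs), outputs=outputs)
    if result.total and result.successes / result.total < min_success_ratio:
        raise RuntimeError(f"only {result.successes}/{result.total} items succeeded")
    return result


def parse(doc):
    return int(doc["value"])  # Raises ValueError for non-numeric input.


docs = [{"value": "1"}, {"value": "2"}, {"value": "oops"}, {"value": "4"}]
result = run_map(parse, docs, concurrency=2, retries=1, min_success_ratio=0.5)
print(f"Processed {result.successes}/{result.total}")
```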


1️⃣5️⃣ Dynamic Workflows ⚡NEW

Generate sub-pipelines at runtime based on intermediate results:

from flowyml import dynamic, Pipeline, step

@dynamic(outputs=["best_model"])
def hyperparameter_search(config: dict):
    sub = Pipeline("hp_search")
    for lr in config["learning_rates"]:
        @step(outputs=[f"model_lr_{lr}"])
        def train(learning_rate=lr):
            return train_model(learning_rate)
        sub.add_step(train)
    return sub

See full guide: docs/advanced/dynamic-workflows.md
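
The `learning_rate=lr` default argument in the loop above matters: Python closures look up loop variables when they are called, not when they are defined. The standalone snippet below, which does not use FlowyML, contrasts the two behaviors.

```python
late, bound = [], []
for lr in [0.1, 0.01, 0.001]:
    def train_late():
        return lr  # Looks up `lr` when called, after the loop has finished.

    def train_bound(learning_rate=lr):
        return learning_rate  # Default value was fixed when the function was defined.

    late.append(train_late)
    bound.append(train_bound)

late_values = [f() for f in late]
bound_values = [f() for f in bound]
print(late_values)   # Every function sees the final loop value
print(bound_values)  # Each function keeps its own learning rate
```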


1️⃣6️⃣ Sub-Pipeline Composition ⚡NEW

Nest entire pipelines as steps in other pipelines:

preprocess = Pipeline("preprocessing")
preprocess.add_step(clean_data).add_step(normalize)

parent = Pipeline("training")
parent.add_sub_pipeline(preprocess, inputs=["raw"], outputs=["clean"])
parent.add_step(train_model)

See full guide: docs/advanced/subpipelines.md


1️⃣7️⃣ Artifact Catalog with Lineage ⚡NEW

Centralized artifact discovery, tagging, and lineage tracking:

from flowyml import ArtifactCatalog

catalog = ArtifactCatalog()  # Auto-selects local or remote backend

# Register with lineage
model_id = catalog.register(
    name="classifier", artifact_type="Model",
    parent_ids=[dataset_id],
    tags={"stage": "production"},
)

# Discover and search
models = catalog.search("classifier")
lineage = catalog.get_lineage(model_id)

See full guide: docs/advanced/artifact-catalog.md


1️⃣8️⃣ Immutable Pipeline Snapshots ⚡NEW

Capture exact pipeline definitions at execution time for reproducibility:

from flowyml import freeze_pipeline

snapshot = freeze_pipeline(pipeline)
print(snapshot.snapshot_hash)    # SHA-256 seal
print(snapshot.step_hashes)     # Per-step code hashes
assert snapshot.verify()        # Verify integrity
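
Conceptually, a sealed snapshot can be built by hashing each step's code and then hashing the collection of step hashes. The sketch below uses `hashlib` with step sources given as plain strings; `freeze`, `verify`, and the dictionary layout are hypothetical and are not the structure returned by `freeze_pipeline`.

```python
import hashlib
import json


def hash_text(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def seal(name: str, step_hashes: dict) -> str:
    # sort_keys makes the serialized payload independent of dict ordering.
    payload = json.dumps({"name": name, "steps": step_hashes}, sort_keys=True)
    return hash_text(payload)


def freeze(name: str, step_sources: dict) -> dict:
    step_hashes = {step: hash_text(src) for step, src in step_sources.items()}
    return {
        "name": name,
        "step_hashes": step_hashes,
        "snapshot_hash": seal(name, step_hashes),
    }


def verify(snapshot: dict) -> bool:
    """Recompute the seal and compare it with the stored one."""
    return seal(snapshot["name"], snapshot["step_hashes"]) == snapshot["snapshot_hash"]


snapshot = freeze("training", {"load": "def load(): ...", "train": "def train(): ..."})
tampered = dict(
    snapshot,
    step_hashes={**snapshot["step_hashes"], "train": hash_text("changed")},
)
print(verify(snapshot), verify(tampered))
```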

1️⃣9️⃣ Enhanced DAG Validation ⚡NEW

Pipeline.build() now detects:

  • Dead outputs: assets produced but never consumed
  • Unreachable nodes: steps that can't be reached from root nodes
  • Type mismatches: incompatible types between connected steps
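
The first two checks can be illustrated on a toy graph. The step dictionary, the `final_outputs` set, and both helper functions below are assumptions made for this sketch; the real validator may define "dead" and "unreachable" differently.

```python
from collections import deque

# Each hypothetical step lists the assets it consumes and produces.
steps = {
    "load": {"inputs": [], "outputs": ["raw"]},
    "clean": {"inputs": ["raw"], "outputs": ["clean", "report"]},
    "train": {"inputs": ["clean"], "outputs": ["model"]},
    "orphan": {"inputs": ["missing"], "outputs": ["extra"]},
}
final_outputs = {"model"}  # Assets the pipeline is expected to deliver.


def dead_outputs(steps, final_outputs):
    """Assets that are produced but neither consumed nor delivered."""
    consumed = {name for spec in steps.values() for name in spec["inputs"]}
    produced = {name for spec in steps.values() for name in spec["outputs"]}
    return sorted(produced - consumed - final_outputs)


def unreachable_steps(steps):
    """Steps that cannot be reached from any root (a step with no inputs)."""
    producer = {out: name for name, spec in steps.items() for out in spec["outputs"]}
    roots = [name for name, spec in steps.items() if not spec["inputs"]]
    reachable, queue = set(roots), deque(roots)
    while queue:
        current = queue.popleft()
        for name, spec in steps.items():
            feeds = any(producer.get(i) == current for i in spec["inputs"])
            if name not in reachable and feeds:
                reachable.add(name)
                queue.append(name)
    return sorted(set(steps) - reachable)


dead = dead_outputs(steps, final_outputs)
unreachable = unreachable_steps(steps)
print(dead, unreachable)
```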


2️⃣0️⃣ Prompt Asset ⚡NEW

First-class prompt management for LLM/GenAI workflows (versioned, templated, and lineage-tracked):

from flowyml import Prompt

# Text prompt with variable substitution
prompt = Prompt(
    name="summarize",
    template="Summarize the following text:\n\n{text}",
    model="gpt-4",
    temperature=0.7,
    max_tokens=500,
)
rendered = prompt.render(text="Long document here...")

# Chat-style prompt (OpenAI format)
chat_prompt = Prompt.create(
    template=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain {topic} in simple terms."},
    ],
    name="explain",
    model="gpt-4",
)
messages = chat_prompt.render(topic="neural networks")

# Lineage: track prompt evolution
v2 = Prompt(name="summarize_v2", template="...", parent=prompt)
# v2.parents → [prompt]

Features:

  • Text and chat-style (multi-message) templates
  • {variable} substitution with .render()
  • Model config tracking (model, temperature, max_tokens)
  • Automatic variable extraction from templates
  • Full lineage tracking (parent → child prompt chains)
  • .create() factory with auto-generated names
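
Variable extraction and substitution for `{variable}` templates can be done with the standard library alone. The sketch below uses `string.Formatter` and `str.format`; `extract_variables` and `render_chat` are hypothetical helpers and may differ from how `Prompt` handles templates.

```python
from string import Formatter


def extract_variables(template: str) -> list:
    """Return the distinct {placeholder} names in order of first appearance."""
    names = []
    for _, field_name, _, _ in Formatter().parse(template):
        if field_name and field_name not in names:
            names.append(field_name)
    return names


def render_chat(messages, **variables):
    """Substitute variables into the content of every chat message."""
    return [{**m, "content": m["content"].format(**variables)} for m in messages]


text_template = "Summarize the following text:\n\n{text}"
chat_template = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain {topic} in simple terms."},
]

variables = extract_variables(text_template)
messages = render_chat(chat_template, topic="neural networks")
print(variables)
print(messages[1]["content"])
```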


2️⃣1️⃣ Checkpoint Asset ⚡NEW

Training checkpoint management with epoch/step tracking and framework-agnostic persistence:

from flowyml import Checkpoint

# Save training state
checkpoint = Checkpoint.create(
    data=model.state_dict(),
    name="resnet50_epoch_10",
    epoch=10,
    step=5000,
    metrics={"loss": 0.23, "accuracy": 0.91},
    is_best=True,
)

# Inspect checkpoint
print(checkpoint.epoch)              # 10
print(checkpoint.checkpoint_metrics)  # {"loss": 0.23, "accuracy": 0.91}
print(checkpoint.is_best)            # True

# Save to disk (auto-detects PyTorch or falls back to pickle)
path = checkpoint.save("checkpoints/epoch_10.pt")

# Lineage: link to parent model
model_asset = Model(name="resnet50", data=model)
ckpt = Checkpoint.create(data=state, parent=model_asset)

Features:

  • Epoch and global step tracking
  • Metrics capture at checkpoint time
  • is_best flag for best-model tracking
  • Automatic state key extraction (for PyTorch state_dicts)
  • Framework-agnostic save (PyTorch → pickle fallback)
  • Lineage tracking (checkpoint → model relationship)


2️⃣2️⃣ Stack Hydration from YAML ⚡NEW

Define stacks in flowyml.yaml and hydrate them into live, fully-wired Stack objects:

# flowyml.yaml
stacks:
  local:
    orchestrator: { type: local }
    artifact_store: { type: local, path: "./artifacts" }

  gcp-prod:
    orchestrator: { type: vertex_ai, project: my-gcp-project }
    artifact_store: { type: gcs, bucket: ml-artifacts }
    model_registry: { type: vertex_model_registry }
    experiment_tracker: { type: mlflow }
    artifact_routing:
      Model:   { store: gcs, register: true, deploy: true }
      Dataset: { store: gcs, path: "{run_id}/data/{step_name}" }
      Metrics: { log_to_tracker: true }

active_stack: local

from flowyml import Pipeline
from flowyml.plugins.config import PluginConfig
from flowyml.plugins.stack_config import StackManager

config = PluginConfig("flowyml.yaml")
manager = StackManager(config)

# Hydrate into a live stack (resolves components via ComponentRegistry)
live_stack = manager.get_stack("gcp-prod").to_stack()

# Use it directly
pipeline = Pipeline("train", stack=live_stack)
pipeline.run()

# Context-manager switching
with manager.use_stack("gcp-prod"):
    pipeline.run()  # Uses GCP stack
# Reverts to previous active stack

Features:

  • StackConfig.to_stack() hydrates YAML → live Stack objects
  • Automatic component resolution via ComponentRegistry
  • Fallback to LocalOrchestrator / LocalArtifactStore for unknown types
  • Artifact routing rules attached to hydrated stack
  • use_stack() context manager for temporary stack switching
  • FLOWYML_STACK environment variable override
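
Temporary switching with guaranteed revert is a natural fit for `contextlib.contextmanager`. `StackSwitcher` below is a hypothetical miniature that shows the pattern only; it omits the FLOWYML_STACK override and is not the `StackManager` implementation.

```python
from contextlib import contextmanager


class StackSwitcher:
    """Hypothetical miniature of a stack manager with temporary switching."""

    def __init__(self, stacks: dict, default: str):
        self.stacks = stacks
        self.active = default

    @contextmanager
    def use_stack(self, name: str):
        if name not in self.stacks:
            raise KeyError(f"unknown stack: {name}")
        previous = self.active
        self.active = name
        try:
            yield self.stacks[name]
        finally:
            # Runs even if the body raises, so the previous stack is always restored.
            self.active = previous


manager = StackSwitcher(
    {"local": {"orchestrator": "local"}, "gcp-prod": {"orchestrator": "vertex_ai"}},
    default="local",
)

with manager.use_stack("gcp-prod") as stack:
    inside = manager.active
after = manager.active
print(inside, after)
```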


2️⃣3️⃣ Docker Build & Push ⚡NEW

Automatic Docker image lifecycle for remote pipeline execution. Data scientists never touch Docker, while platform teams control images via enterprise stacks:

# Zero-config build (auto-detects deps)
flowyml docker build

# GPU build with push to registry
flowyml docker build --gpu --push --registry myregistry.azurecr.io

# Preview generated Dockerfile
flowyml docker generate --deps poetry

# Inspect auto-detected configuration
flowyml docker inspect

Features:

  • 7 dependency managers auto-detected: conda, uv, poetry, pipenv, setup.py, pip, freeze
  • Multi-stage builds with BuildKit cache and non-root user
  • GPU/CUDA support with one flag (--gpu)
  • 4 registries: Docker Hub, ACR, ECR, GCR
  • Content-hash tagging for deterministic, cache-friendly builds
  • Image policies: enforce base images, required labels, and registry allowlists
  • Enterprise integration: StackDefinition.to_docker_config() for governed stacks
  • 5 CLI commands: docker build, push, generate, inspect, login
  • See full guide: docs/guides/docker.md
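
Content-hash tagging means the image tag is derived from the build inputs, so identical inputs always map to the same tag. The sketch below shows the idea with `hashlib`; the `content_tag` helper, the choice of inputs, and the 12-character tag length are assumptions and not the CLI's actual scheme.

```python
import hashlib


def content_tag(files: dict, length: int = 12) -> str:
    """Derive a short, deterministic tag from file paths and their contents."""
    digest = hashlib.sha256()
    for path in sorted(files):  # Sort so file order does not affect the tag.
        digest.update(path.encode("utf-8"))
        digest.update(b"\0")    # Separator keeps path and content boundaries unambiguous.
        digest.update(files[path])
        digest.update(b"\0")
    return digest.hexdigest()[:length]


inputs = {"requirements.txt": b"numpy\n", "Dockerfile": b"FROM python:3.11\n"}
reordered = {"Dockerfile": b"FROM python:3.11\n", "requirements.txt": b"numpy\n"}
changed = {**inputs, "requirements.txt": b"numpy\npandas\n"}

tag = content_tag(inputs)
print(tag, content_tag(reordered) == tag, content_tag(changed) == tag)
```

Unchanged inputs reuse the same tag, which lets a registry or build cache skip redundant builds.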

📓 Design Pipelines Visually with FlowyML Notebook

🌊 FlowyML Notebook: The Reactive Notebook That Ships to Production

FlowyML Notebook is a companion reactive notebook environment that replaces Jupyter for ML workflows. Write Python cells with automatic dependency tracking, then promote directly to FlowyML pipelines with one click.

pip install flowyml-notebook
fml-notebook dev  # 🔥 Launch with hot-reload

Key features: Reactive DAG · Pure .py Storage · SmartPrep Advisor · Algorithm Matchmaker · 43 Recipes · GitHub Integration · AI Assistant · Publish as App


🌐 Explore the Full Ecosystem

📓 FlowyML Notebook

Reactive notebook for designing and shipping pipelines visually.

🔌 Integrations

Keras, PyTorch, MLflow, W&B, GCP, AWS, Azure, and more.

🌊 Ecosystem

The complete FlowyML Universe and how it all fits together.


Happy MLOps! 🌊