FlowyML Features Explorer
Discover everything FlowyML can do, from GenAI observability and LLM evaluation to pipeline scheduling, data drift detection, and beyond. Each feature is production-ready and batteries-included.
🤖 GenAI · Evaluations · ⏰ Scheduling · Drift Detection · Dynamic Workflows · 📦 Artifact Catalog
New Features Overview
1️⃣ GenAI & LLM Monitoring
Track LLM calls, tokens, and costs automatically:
import openai
from flowyml import trace_llm
@trace_llm(name="summarize")
def generate_summary(text):
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Summarize: {text}"}],
    )
    return response.choices[0].message.content
# Traces are automatically saved
result = generate_summary("Long text here...")
Features:
- Automatic input/output capture
- Token usage tracking
- Cost calculation
- Parent-child trace relationships
- View traces in UI at /api/traces
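For intuition about how parent-child trace relationships can be recorded, here is a minimal stdlib-only sketch of a tracing decorator. It is not FlowyML's implementation: `Span`, `traced`, and the in-memory `SPANS` list are hypothetical names, and a real tracer would persist spans rather than keep them in a list. A `contextvars.ContextVar` holds the active span, so any traced call made inside another traced call records the outer span as its parent.

```python
import contextvars
import functools
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Span:
    # One recorded call (hypothetical structure, for illustration only).
    name: str
    span_id: str
    parent_id: Optional[str]
    inputs: dict
    output: Any = None
    duration_s: float = 0.0

SPANS: list = []  # in-memory sink; a real tracer would persist spans
_current: contextvars.ContextVar = contextvars.ContextVar("current_span", default=None)

def traced(name: str):
    """Record inputs, output, latency and parent span of each call."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            parent = _current.get()
            span = Span(
                name=name,
                span_id=uuid.uuid4().hex,
                parent_id=parent.span_id if parent else None,
                inputs={"args": args, "kwargs": kwargs},
            )
            token = _current.set(span)  # traced calls inside fn become children
            start = time.perf_counter()
            try:
                span.output = fn(*args, **kwargs)
                return span.output
            finally:
                span.duration_s = time.perf_counter() - start
                _current.reset(token)  # restore the outer span
                SPANS.append(span)
        return wrapper
    return decorator

@traced("summarize")
def summarize(text: str) -> str:
    # Stand-in for an LLM call that delegates to a nested traced helper.
    return shorten(text)[:20]

@traced("shorten")
def shorten(text: str) -> str:
    return text.strip()
```

Calling `summarize("  Long text here...  ")` records two spans, with the `shorten` span pointing at the `summarize` span as its parent.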
2️⃣ GenAI Observability ⚡ NEW
Full-stack GenAI observability for any AI framework (LangGraph, LangChain, the OpenAI SDK, CrewAI, AutoGen) or custom code. Track every LLM call, tool invocation, chain execution, RAG pipeline step, token count, cost, artifact, and error with a single import.
Four integration entry points:
# LangGraph: @observe() decorator, zero-config
from flowyml import observe
@observe(name="my_agent", project="chatbot")
def handle_query(query, flowyml_session=None):
    # `graph` is your compiled LangGraph graph; the input shape depends on its state schema
    return graph.invoke({"messages": [("user", query)]}, config=flowyml_session.config)
# LangChain: trace_chain() for chains/runnables
from flowyml.integrations.langchain import trace_chain
with trace_chain("qa_chain") as session:
    # `chain` is any LangChain runnable; the input keys depend on the chain
    result = chain.invoke({"question": "What is FlowyML?"}, config=session.config)
# OpenAI SDK: drop-in replacement, no LangChain needed
from flowyml import TracedOpenAI
client = TracedOpenAI(project="my_app")
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello"}],
)
# Any framework: universal adapter
from flowyml import log_llm_call
log_llm_call(model="gpt-4o", prompt="Hello", response="Hi!", prompt_tokens=5, completion_tokens=2)
What Gets Tracked Automatically:
- 🤖 Every LLM call (model, prompts, and responses, saved as first-class artifacts)
- 🔧 Every tool invocation (input, output, duration)
- Chain/graph node execution with parent-child hierarchy
- RAG pipeline: retriever queries + retrieved documents as artifacts
- Token usage (prompt + completion + total per call and session)
- 💰 Cost estimation (OpenAI, Anthropic, Google, Mistral, Cohere models)
- ⏱ Latency per step and total session duration
- 📦 Artifacts: prompts, responses, documents, and configs, all first-class citizens
- 🎨 Canvas-ready DAG: full trace tree for FlowyML canvas visualization
- ❌ Error tracking with full context
- 🏷 Model identification, tagging, and multi-model sessions
- View everything in the FlowyML UI at /api/traces
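Session-level token and cost totals come down to simple arithmetic: tokens per call times a per-token price, summed over the session. The sketch below assumes a per-1K-token price table; the model names and prices in `PRICES_PER_1K` are placeholders rather than real provider rates, and `LLMCall` is a hypothetical record, not FlowyML's trace schema.

```python
from dataclasses import dataclass

# Placeholder prices in USD per 1,000 tokens: (prompt, completion).
# These are NOT real provider prices; substitute current published rates.
PRICES_PER_1K = {
    "model-a": (0.5, 1.5),
    "model-b": (0.1, 0.4),
}

@dataclass
class LLMCall:
    model: str
    prompt_tokens: int
    completion_tokens: int

    @property
    def total_tokens(self) -> int:
        return self.prompt_tokens + self.completion_tokens

    def cost(self) -> float:
        # Prompt and completion tokens are usually priced differently.
        prompt_price, completion_price = PRICES_PER_1K[self.model]
        return (self.prompt_tokens * prompt_price
                + self.completion_tokens * completion_price) / 1000

def session_summary(calls) -> dict:
    """Aggregate tokens and estimated cost across a multi-model session."""
    return {
        "calls": len(calls),
        "prompt_tokens": sum(c.prompt_tokens for c in calls),
        "completion_tokens": sum(c.completion_tokens for c in calls),
        "total_tokens": sum(c.total_tokens for c in calls),
        "cost_usd": round(sum(c.cost() for c in calls), 6),
    }
```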
3️⃣ Keras Integration
Automatic experiment tracking for Keras models:
from flowyml import flowymlKerasCallback
model.fit(
    x_train, y_train,
    epochs=10,
    callbacks=[
        flowymlKerasCallback(
            experiment_name="mnist_training",
            run_name="baseline_v1",
            log_model=True,
        )
    ],
)
Auto-logs:
- Training metrics (loss, accuracy, etc.)
- Model architecture & summary
- Optimizer configuration
- Training parameters
- Model checkpoints
4️⃣ Data Drift Detection
Monitor data distribution shifts:
from flowyml import detect_drift, compute_stats
# Detect drift
drift_result = detect_drift(
    reference_data=train_data['feature'],
    current_data=prod_data['feature'],
    threshold=0.1,  # PSI threshold
)
if drift_result['drift_detected']:
    print(f"⚠️ Drift detected! PSI: {drift_result['psi']:.4f}")
# Compute stats
stats = compute_stats(prod_data['feature'])
print(f"Mean: {stats['mean']}, Std: {stats['std']}")
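The `threshold=0.1` above is a Population Stability Index (PSI) cutoff. As a hedged illustration of what that number measures (not FlowyML's internal code), the sketch below bins both samples using quantiles of the reference data, compares the bin proportions, and sums (p_cur - p_ref) * ln(p_cur / p_ref) over the bins. Quantile binning, 10 bins, and the small `eps` floor for empty bins are assumptions of this sketch.

```python
import math
from bisect import bisect_right

def psi(reference, current, n_bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index of `current` relative to `reference`."""
    ref = sorted(reference)
    # Interior bin edges at reference quantiles, so each bin holds about
    # the same share of the reference data.
    edges = [ref[int(len(ref) * i / n_bins)] for i in range(1, n_bins)]

    def proportions(values):
        counts = [0] * n_bins
        for v in values:
            counts[bisect_right(edges, v)] += 1  # index in 0..n_bins-1
        total = len(values)
        # eps keeps empty bins from producing log(0) or division by zero
        return [max(c / total, eps) for c in counts]

    p_ref = proportions(reference)
    p_cur = proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(p_ref, p_cur))
```

A commonly quoted rule of thumb reads PSI below 0.1 as no significant shift, 0.1 to 0.25 as a moderate shift, and above 0.25 as a major shift.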
5️⃣ ZenML Auto-Integration ⚡ NEW
Import the entire ZenML ecosystem with one line:
from flowyml.stacks import import_all_zenml
# Import all ZenML components at once
components = import_all_zenml()
# Done! All ZenML orchestrators, artifact stores, etc. are ready
CLI commands:
# Check ZenML status
flowyml zenml status
# List and install integrations
flowyml zenml list
flowyml zenml install mlflow
flowyml zenml install kubernetes
# Import all at once
flowyml zenml import-all
Features:
- Automatic discovery of ZenML integrations
- Zero-configuration wrapping of components
- Full CLI for installation and import
- Works with 50+ ZenML integrations
6️⃣ Pipeline Scheduling
Run pipelines automatically on a schedule:
from flowyml import PipelineScheduler
scheduler = PipelineScheduler()
# Daily at 2am
scheduler.schedule_daily(
    name="daily_training",
    pipeline_func=lambda: my_pipeline.run(),
    hour=2, minute=0,
)
# Every 6 hours
scheduler.schedule_interval(
    name="data_refresh",
    pipeline_func=lambda: refresh_pipeline.run(),
    hours=6,
)
# Start scheduler
scheduler.start()  # Non-blocking
# scheduler.start(blocking=True)  # Blocking
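To show what `schedule_daily(hour=2, minute=0)` and `schedule_interval(hours=6)` imply for run times, here is a small sketch of next-run computation. It is an assumption about the semantics (the next matching wall-clock time strictly after now, and a fixed offset from the last run), not FlowyML's scheduler code, and it uses naive datetimes, so daylight-saving transitions are ignored.

```python
from datetime import datetime, timedelta

def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next wall-clock time at hour:minute strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so run tomorrow.
        candidate += timedelta(days=1)
    return candidate

def next_interval_run(last_run: datetime, hours: float) -> datetime:
    """Next run for a fixed-interval schedule such as every 6 hours."""
    return last_run + timedelta(hours=hours)
```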
7️⃣ Notifications
Get notified about pipeline events:
import os
from flowyml import configure_notifications, get_notifier
# Configure channels
configure_notifications(
    console=True,
    slack_webhook="https://hooks.slack.com/...",
    email_config={
        'smtp_host': 'smtp.gmail.com',
        'username': 'you@gmail.com',
        'password': os.environ['SMTP_PASSWORD'],  # example env var name; never hard-code secrets
        'from_addr': 'you@gmail.com',
        'to_addrs': ['team@company.com'],
    },
)
# Use in your code
notifier = get_notifier()
notifier.notify(
    title="Training Complete",
    message="Model achieved 95% accuracy",
    level="success",
)
# Or use event hooks
notifier.on_pipeline_success(pipeline.name, run_id, duration)
notifier.on_pipeline_failure(pipeline.name, run_id, error)
notifier.on_drift_detected(feature_name, psi_value)
8️⃣ Model Leaderboard
Compare and rank models:
from flowyml import ModelLeaderboard
leaderboard = ModelLeaderboard(
    metric="accuracy",
    higher_is_better=True,
)
# Add scores
leaderboard.add_score("bert-base", run_id="run_123", score=0.92)
leaderboard.add_score("distilbert", run_id="run_124", score=0.89)
# Display rankings
leaderboard.display(n=10)
# Get top models
top_5 = leaderboard.get_top(n=5)
Compare multiple runs:
from flowyml import compare_runs
comparison = compare_runs(
    run_ids=["run_123", "run_124", "run_125"],
    metrics=["accuracy", "f1_score", "latency"],
)
9️⃣ Pipeline Templates
Create pipelines from pre-built templates:
from flowyml import create_from_template, list_templates
# See available templates
print(list_templates())  # ['ml_training', 'etl', 'ab_test']
# Create from template
pipeline = create_from_template(
    'ml_training',
    name='my_training',
    data_loader=load_data,
    preprocessor=preprocess,
    trainer=train_model,
    evaluator=evaluate,
    model_saver=save_model,
)
# Run it
result = pipeline.run()
Available Templates:
- ml_training: Standard ML training workflow
- etl / data_pipeline: Extract-Transform-Load
- ab_test: A/B testing with model comparison
🔟 Checkpointing
Resume failed pipelines:
from flowyml import PipelineCheckpoint
checkpoint = PipelineCheckpoint(run_id="run_123")
# In your step
def expensive_computation():
    result = do_work()
    checkpoint.save_step_state("compute", result)
    return result
# Resume later
if checkpoint.exists():
    state = checkpoint.load()
    last_step = state['last_completed_step']
    output = checkpoint.load_step_state(last_step)
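The resume flow above can be sketched with plain JSON files. `StepCheckpoint` and `run_with_resume` are hypothetical stand-ins for `PipelineCheckpoint`, assuming each step's output is JSON-serializable and that a step counts as completed only after its output has been written.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional

class StepCheckpoint:
    """Hypothetical JSON-on-disk checkpoint store for one pipeline run."""

    def __init__(self, run_id: str, root: Optional[str] = None):
        # Default to a throwaway temp directory so the sketch runs anywhere.
        self.dir = Path(root or tempfile.mkdtemp()) / run_id
        self.dir.mkdir(parents=True, exist_ok=True)
        self.index = self.dir / "index.json"

    def completed_steps(self) -> list:
        if not self.index.exists():
            return []
        return json.loads(self.index.read_text())["completed"]

    def save_step_state(self, step: str, state) -> None:
        # Write the step output first, then record the step as completed.
        (self.dir / f"{step}.json").write_text(json.dumps(state))
        done = [s for s in self.completed_steps() if s != step] + [step]
        self.index.write_text(json.dumps({"completed": done}))

    def load_step_state(self, step: str):
        return json.loads((self.dir / f"{step}.json").read_text())

def run_with_resume(steps, ckpt: StepCheckpoint):
    """Run (name, fn) pairs in order, reusing outputs of checkpointed steps."""
    done = set(ckpt.completed_steps())
    result = None
    for name, fn in steps:
        if name in done:
            result = ckpt.load_step_state(name)  # skip work already done
            continue
        result = fn(result)  # each step receives the previous step's output
        ckpt.save_step_state(name, result)
    return result
```

Writing the output before updating the index means a crash between the two writes only causes that step to rerun, never a resume from a missing file.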
1️⃣1️⃣ Human-in-the-Loop
Add approval gates to pipelines:
import os
from flowyml import approval, Pipeline
pipeline = Pipeline("sensitive_operation")
# Add approval step
approval_step = approval(
    name="approve_deployment",
    approver="data-team",
    timeout_seconds=3600,
    auto_approve_if=lambda: os.getenv("AUTO_APPROVE") == "true",
)
pipeline.add_step(approval_step)
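One plausible reading of `timeout_seconds` and `auto_approve_if` is sketched below: the auto-approve predicate is checked first, then a decision source is polled until someone approves or rejects, or the deadline passes. Raising `ApprovalTimeout` on expiry is an assumption, since the text does not say what `approval()` does when the timeout elapses; `wait_for_approval` and its `check_decision` callback (returning True, False, or None for "no decision yet") are hypothetical.

```python
import time
from typing import Callable, Optional

class ApprovalTimeout(Exception):
    """Raised when no decision arrives before the deadline (assumed behavior)."""

def wait_for_approval(
    check_decision: Callable[[], Optional[bool]],
    timeout_seconds: float,
    auto_approve_if: Optional[Callable[[], bool]] = None,
    poll_interval: float = 1.0,
) -> bool:
    """Block until approved/rejected, auto-approved, or timed out."""
    if auto_approve_if is not None and auto_approve_if():
        return True  # e.g. AUTO_APPROVE=true in CI
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        decision = check_decision()  # None means "no decision yet"
        if decision is not None:
            return decision
        time.sleep(poll_interval)
    raise ApprovalTimeout(f"no decision within {timeout_seconds}s")
```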
1️⃣2️⃣ Evaluations Framework ⚡ NEW
Evaluate ML models and LLM outputs with 29+ built-in scorers:
from flowyml.evals import evaluate, EvalDataset, Accuracy, F1Score, EvalSuite
# Quick evaluation
data = EvalDataset.create_classical(
    "model_v2", predictions=[1, 0, 1, 1], targets=[1, 0, 0, 1]
)
result = evaluate(data=data, scorers=[Accuracy(threshold=0.9), F1Score()])
# Reusable suite
suite = EvalSuite("quality_gates", scorers=[Accuracy(), F1Score()])
result = suite.run(data=data, experiment="model_v2")
# GenAI (LLM-as-a-judge)
from flowyml.evals import Relevance, make_judge
judge = make_judge("quality", "Evaluate response accuracy", model="openai:/gpt-4o-mini")
result = evaluate(data=genai_data, scorers=[Relevance(), judge])
Features:
- 29+ built-in scorers (classification, regression, GenAI)
- Custom scorers via make_judge() and make_scorer()
- Automatic regression detection
- CI/CD quality gates (EvalAssert)
- Pipeline-native evaluations (EvalStep)
- Judge Arena (A/B test evaluators)
- Continuous evaluation via schedules
- Trace-to-evaluation bridge
- Full CLI: flowyml eval run/compare/assert/scorers
- See full guide: docs/evaluations.md
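As a worked example of what the classification scorers compute on the four-item dataset from the snippet above (predictions [1, 0, 1, 1], targets [1, 0, 0, 1]): three of four predictions match, so accuracy is 0.75; there are 2 true positives, 1 false positive and 0 false negatives, so precision is 2/3, recall is 1, and F1 is 0.8. The stdlib sketch below reproduces those numbers and applies a minimum-score gate; treating `Accuracy(threshold=0.9)` as "fail below 0.9" is an assumption about its semantics, and `quality_gate` is a hypothetical helper.

```python
def accuracy(predictions, targets) -> float:
    return sum(p == t for p, t in zip(predictions, targets)) / len(targets)

def f1_score(predictions, targets, positive=1) -> float:
    pairs = list(zip(predictions, targets))
    tp = sum(p == positive and t == positive for p, t in pairs)
    fp = sum(p == positive and t != positive for p, t in pairs)
    fn = sum(p != positive and t == positive for p, t in pairs)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

def quality_gate(scores: dict, thresholds: dict) -> list:
    """Names of metrics that fall below their minimum passing score."""
    return [name for name, minimum in thresholds.items() if scores[name] < minimum]

predictions, targets = [1, 0, 1, 1], [1, 0, 0, 1]
scores = {"accuracy": accuracy(predictions, targets), "f1": f1_score(predictions, targets)}
failures = quality_gate(scores, {"accuracy": 0.9})  # accuracy 0.75 < 0.9
```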
UI Features
Access the web UI at http://localhost:8080:
- Dashboard: Overview stats
- Runs: Pipeline execution history
- Pipelines: All registered pipelines
- Assets: Browse artifacts
- Experiments: Compare experiment runs
- Traces: View LLM call traces (NEW!)
🎯 Quick Start Example
Complete example using multiple features:
from flowyml import (
    Pipeline, step, context,
    configure_notifications,
    PipelineScheduler,
    ModelLeaderboard,
    flowymlKerasCallback,
)
# Assumes create_model() returns a Keras model compiled with metrics=["accuracy"],
# and that x_train / y_train are defined elsewhere
# 1. Configure notifications
configure_notifications(console=True)
# 2. Define pipeline
ctx = context(epochs=10, batch_size=32)
pipeline = Pipeline("training", context=ctx)
@step(outputs=["model", "metrics"])
def train(epochs: int, batch_size: int):
    model = create_model()
    history = model.fit(
        x_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.1,  # required so history contains 'val_accuracy'
        callbacks=[flowymlKerasCallback("mnist_exp")],
    )
    return model, history.history
pipeline.add_step(train)
# 3. Add to leaderboard
leaderboard = ModelLeaderboard("val_accuracy")
@step(inputs=["metrics"])
def log_to_leaderboard(metrics):
    accuracy = metrics['val_accuracy'][-1]
    leaderboard.add_score("my_model", run_id="...", score=accuracy)
pipeline.add_step(log_to_leaderboard)
# 4. Schedule to run daily
scheduler = PipelineScheduler()
scheduler.schedule_daily(
    name="daily_training",
    pipeline_func=lambda: pipeline.run(),
    hour=2,
)
scheduler.start()
Additional Resources
- Full documentation: Getting Started
- API Reference: /api/docs (when server is running)
- Examples: Examples
- Roadmap: Check the GitHub repository for the latest roadmap
Pipeline Engineering Features
1️⃣3️⃣ Build-Time Type Validation ⚡ NEW
Catch type mismatches between connected steps at Pipeline.build() time:
@step(outputs=["model"])
def train() -> Model:
    return Model(clf)
@step(inputs=["model"])
def evaluate(model: Dataset):  # ❌ Type mismatch! Expects Dataset, gets Model
    pass
pipeline.add_step(train).add_step(evaluate)
pipeline.build()  # Raises: Pipeline type validation failed
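A build-time check like this can be done by comparing type annotations across each connected pair of steps. The sketch below is a hypothetical stand-in, not FlowyML's validator: `Model` and `Dataset` are placeholder classes, `check_edge` is an invented helper, and it only handles plain classes (a producer is compatible if its return type is a subclass of the consumer's parameter type), ignoring generics such as `list[Model]`.

```python
import typing

class Model: ...    # placeholder asset types for this sketch
class Dataset: ...

def check_edge(producer, consumer, param: str) -> None:
    """Raise TypeError if producer's return type can't feed consumer's `param`."""
    produced = typing.get_type_hints(producer).get("return")
    expected = typing.get_type_hints(consumer).get(param)
    if produced is None or expected is None:
        return  # unannotated on either side: nothing to check
    if isinstance(produced, type) and isinstance(expected, type):
        if not issubclass(produced, expected):
            raise TypeError(
                f"{producer.__name__} returns {produced.__name__}, "
                f"but {consumer.__name__}({param}) expects {expected.__name__}"
            )

def train() -> Model:
    return Model()

def evaluate(model: Dataset) -> None:  # mismatched consumer
    pass

def score(model: Model) -> float:      # compatible consumer
    return 1.0
```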
1️⃣4️⃣ Map Tasks ⚡ NEW
Distribute work over collections with configurable concurrency and per-item retries:
from flowyml import map_task
@map_task(concurrency=8, retries=2, min_success_ratio=0.95)
def process_document(doc: dict) -> dict:
    return transform(doc)
result = process_document(documents)
print(f"Processed {result.successes}/{result.total}")
See full guide: docs/advanced/map-tasks.md
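The semantics of `concurrency`, `retries`, and `min_success_ratio` can be sketched with a thread pool. This is an assumption-laden illustration rather than FlowyML's executor: `map_with_retries` and `MapResult` are hypothetical, retries are immediate with no backoff, and the run fails as a whole only when the success ratio drops below the threshold.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

@dataclass
class MapResult:
    outputs: list                              # None where an item failed
    errors: dict = field(default_factory=dict)  # item index -> last exception

    @property
    def total(self) -> int:
        return len(self.outputs)

    @property
    def successes(self) -> int:
        return self.total - len(self.errors)

def map_with_retries(fn, items, concurrency=8, retries=2, min_success_ratio=1.0):
    """Apply fn to each item in a thread pool, retrying each item up to `retries` times."""
    def attempt(item):
        last_exc = None
        for _ in range(retries + 1):  # first try plus `retries` retries
            try:
                return fn(item), None
            except Exception as exc:
                last_exc = exc
        return None, last_exc

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        pairs = list(pool.map(attempt, items))  # preserves input order

    outputs = [out for out, _ in pairs]
    errors = {i: exc for i, (_, exc) in enumerate(pairs) if exc is not None}
    result = MapResult(outputs, errors)
    if result.total and result.successes / result.total < min_success_ratio:
        raise RuntimeError(
            f"only {result.successes}/{result.total} items succeeded "
            f"(< {min_success_ratio:.0%} required)"
        )
    return result
```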
1️⃣5️⃣ Dynamic Workflows ⚡ NEW
Generate sub-pipelines at runtime based on intermediate results:
from flowyml import dynamic, Pipeline, step
@dynamic(outputs=["best_model"])
def hyperparameter_search(config: dict):
    sub = Pipeline("hp_search")
    for lr in config["learning_rates"]:
        @step(outputs=[f"model_lr_{lr}"])
        def train(learning_rate=lr):  # default arg binds this iteration's lr
            return train_model(learning_rate)
        sub.add_step(train)
    return sub
See full guide: docs/advanced/dynamic-workflows.md
1️⃣6️⃣ Sub-Pipeline Composition ⚡ NEW
Nest entire pipelines as steps in other pipelines:
preprocess = Pipeline("preprocessing")
preprocess.add_step(clean_data).add_step(normalize)
parent = Pipeline("training")
parent.add_sub_pipeline(preprocess, inputs=["raw"], outputs=["clean"])
parent.add_step(train_model)
See full guide: docs/advanced/subpipelines.md
1️⃣7️⃣ Artifact Catalog with Lineage ⚡ NEW
Centralized artifact discovery, tagging, and lineage tracking:
from flowyml import ArtifactCatalog
catalog = ArtifactCatalog()  # Auto-selects local or remote backend
# Register with lineage
model_id = catalog.register(
    name="classifier", artifact_type="Model",
    parent_ids=[dataset_id],
    tags={"stage": "production"},
)
# Discover and search
models = catalog.search("classifier")
lineage = catalog.get_lineage(model_id)
See full guide: docs/advanced/artifact-catalog.md
1️⃣8️⃣ Immutable Pipeline Snapshots ⚡ NEW
Capture exact pipeline definitions at execution time for reproducibility:
from flowyml import freeze_pipeline
snapshot = freeze_pipeline(pipeline)
print(snapshot.snapshot_hash) # SHA-256 seal
print(snapshot.step_hashes) # Per-step code hashes
assert snapshot.verify() # Verify integrity
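A snapshot hash of this kind can be built from per-step hashes plus a combined seal. The sketch below is a hypothetical illustration, not `freeze_pipeline` itself: it hashes each function's bytecode, constants and names (instead of its source text, which is not always available), then seals the step hashes together with a JSON-serializable config. Bytecode differs across Python versions, and steps that define nested functions would need a recursive hash.

```python
import hashlib
import json

def step_hash(fn) -> str:
    """SHA-256 over a function's bytecode, constants and referenced names."""
    code = fn.__code__
    h = hashlib.sha256()
    h.update(code.co_code)
    h.update(repr(code.co_consts).encode())
    h.update(repr(code.co_names).encode())
    return h.hexdigest()

def _seal(step_hashes: dict, config: dict) -> str:
    # sort_keys makes the payload independent of dict insertion order
    payload = json.dumps({"steps": step_hashes, "config": config}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def freeze(steps: dict, config: dict) -> dict:
    """Capture per-step hashes plus a combined seal over steps and config."""
    step_hashes = {name: step_hash(fn) for name, fn in steps.items()}
    frozen_config = json.loads(json.dumps(config))  # deep copy of a JSON-safe config
    return {
        "step_hashes": step_hashes,
        "config": frozen_config,
        "snapshot_hash": _seal(step_hashes, frozen_config),
    }

def verify(snapshot: dict) -> bool:
    """True if nothing in the snapshot changed since it was sealed."""
    return _seal(snapshot["step_hashes"], snapshot["config"]) == snapshot["snapshot_hash"]
```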
1️⃣9️⃣ Enhanced DAG Validation ⚡ NEW
Pipeline.build() now detects:
- Dead outputs: assets produced but never consumed
- Unreachable nodes: steps that can't be reached from root nodes
- Type mismatches: incompatible types between connected steps
2️⃣0️⃣ Prompt Asset ⚡ NEW
First-class prompt management for LLM/GenAI workflows, with versioning, templating, and lineage tracking:
from flowyml import Prompt
# Text prompt with variable substitution
prompt = Prompt(
    name="summarize",
    template="Summarize the following text:\n\n{text}",
    model="gpt-4",
    temperature=0.7,
    max_tokens=500,
)
rendered = prompt.render(text="Long document here...")
# Chat-style prompt (OpenAI format)
chat_prompt = Prompt.create(
    template=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain {topic} in simple terms."},
    ],
    name="explain",
    model="gpt-4",
)
messages = chat_prompt.render(topic="neural networks")
# Lineage: track prompt evolution
v2 = Prompt(name="summarize_v2", template="...", parent=prompt)
# v2.parents → [prompt]
Features:
- Text and chat-style (multi-message) templates
- {variable} substitution with .render()
- Model config tracking (model, temperature, max_tokens)
- Automatic variable extraction from templates
- Full lineage tracking (parent → child prompt chains)
- .create() factory with auto-generated names
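Automatic variable extraction and `{variable}` rendering can both be built on the standard library's `string.Formatter`. The helpers below are a hypothetical sketch, not the `Prompt` class: `template_variables` and `render` accept either a text template or a list of OpenAI-style chat messages and raise `KeyError` when a variable is missing.

```python
from string import Formatter

def template_variables(template) -> list:
    """Names of {placeholders} in a text template or a list of chat messages."""
    texts = [template] if isinstance(template, str) else [m["content"] for m in template]
    names = []
    for text in texts:
        # parse() yields (literal_text, field_name, format_spec, conversion)
        for _, field_name, _, _ in Formatter().parse(text):
            if field_name and field_name not in names:
                names.append(field_name)
    return names

def render(template, **variables):
    """Fill placeholders; chat templates render to a new list of messages."""
    missing = [v for v in template_variables(template) if v not in variables]
    if missing:
        raise KeyError(f"missing template variables: {missing}")
    if isinstance(template, str):
        return template.format(**variables)
    # Copy each message so the stored template stays unrendered
    return [{**m, "content": m["content"].format(**variables)} for m in template]
```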
2️⃣1️⃣ Checkpoint Asset ⚡ NEW
Training checkpoint management with epoch/step tracking and framework-agnostic persistence:
from flowyml import Checkpoint
# Save training state
checkpoint = Checkpoint.create(
    data=model.state_dict(),
    name="resnet50_epoch_10",
    epoch=10,
    step=5000,
    metrics={"loss": 0.23, "accuracy": 0.91},
    is_best=True,
)
# Inspect checkpoint
print(checkpoint.epoch)  # 10
print(checkpoint.checkpoint_metrics)  # {'loss': 0.23, 'accuracy': 0.91}
print(checkpoint.is_best)  # True
# Save to disk (auto-detects PyTorch or falls back to pickle)
path = checkpoint.save("checkpoints/epoch_10.pt")
# Lineage: link to parent model
model_asset = Model(name="resnet50", data=model)
ckpt = Checkpoint.create(data=model.state_dict(), parent=model_asset)
Features:
- Epoch and global step tracking
- Metrics capture at checkpoint time
- is_best flag for best-model tracking
- Automatic state key extraction (for PyTorch state_dicts)
- Framework-agnostic save (PyTorch, with pickle fallback)
- Lineage tracking (checkpoint → model relationship)
2️⃣2️⃣ Stack Hydration from YAML ⚡ NEW
Define stacks in flowyml.yaml and hydrate them into live, fully-wired Stack objects:
# flowyml.yaml
stacks:
  local:
    orchestrator: { type: local }
    artifact_store: { type: local, path: "./artifacts" }
  gcp-prod:
    orchestrator: { type: vertex_ai, project: my-gcp-project }
    artifact_store: { type: gcs, bucket: ml-artifacts }
    model_registry: { type: vertex_model_registry }
    experiment_tracker: { type: mlflow }
    artifact_routing:
      Model: { store: gcs, register: true, deploy: true }
      Dataset: { store: gcs, path: "{run_id}/data/{step_name}" }
      Metrics: { log_to_tracker: true }
active_stack: local
from flowyml import Pipeline
from flowyml.plugins.config import PluginConfig
from flowyml.plugins.stack_config import StackManager
config = PluginConfig("flowyml.yaml")
manager = StackManager(config)
# Hydrate into a live stack (resolves components via ComponentRegistry)
live_stack = manager.get_stack("gcp-prod").to_stack()
# Use it directly
pipeline = Pipeline("train", stack=live_stack)
pipeline.run()
# Context-manager switching
with manager.use_stack("gcp-prod"):
    pipeline.run()  # Uses GCP stack
# Reverts to the previous active stack on exit
Features:
- StackConfig.to_stack() hydrates YAML → live Stack objects
- Automatic component resolution via ComponentRegistry
- Fallback to LocalOrchestrator / LocalArtifactStore for unknown types
- Artifact routing rules attached to hydrated stack
- use_stack() context manager for temporary stack switching
- FLOWYML_STACK environment variable override
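The hydration steps listed above (resolve each component type through a registry, fall back to local implementations for unknown types, honor a `FLOWYML_STACK` override) can be sketched in a few lines. Everything here is hypothetical: the component classes, `REGISTRY`, `hydrate`, and the precedence of an explicit stack name over the environment variable over `active_stack` are assumptions, and the YAML is shown already parsed into a dict so the sketch needs no YAML library.

```python
import os

class Component:
    """Base for hypothetical stack components; keeps its YAML settings."""
    def __init__(self, **settings):
        self.settings = settings

class LocalOrchestrator(Component): ...
class VertexAIOrchestrator(Component): ...
class LocalArtifactStore(Component): ...
class GCSArtifactStore(Component): ...

# Hypothetical registry: (component kind, YAML type) -> implementation
REGISTRY = {
    ("orchestrator", "local"): LocalOrchestrator,
    ("orchestrator", "vertex_ai"): VertexAIOrchestrator,
    ("artifact_store", "local"): LocalArtifactStore,
    ("artifact_store", "gcs"): GCSArtifactStore,
}
# Used when a component's type is not registered
FALLBACKS = {"orchestrator": LocalOrchestrator, "artifact_store": LocalArtifactStore}

def hydrate(config: dict, stack_name=None) -> dict:
    """Build live component objects from a parsed flowyml.yaml-style dict."""
    name = stack_name or os.environ.get("FLOWYML_STACK") or config["active_stack"]
    spec = config["stacks"][name]
    stack = {"name": name, "artifact_routing": spec.get("artifact_routing", {})}
    for kind, settings in spec.items():
        if kind == "artifact_routing":
            continue
        cls = REGISTRY.get((kind, settings.get("type")), FALLBACKS.get(kind))
        if cls is None:
            continue  # e.g. experiment_tracker: nothing registered in this sketch
        options = {k: v for k, v in settings.items() if k != "type"}
        stack[kind] = cls(**options)
    return stack

CONFIG = {
    "stacks": {
        "local": {"orchestrator": {"type": "local"},
                  "artifact_store": {"type": "local", "path": "./artifacts"}},
        "gcp-prod": {"orchestrator": {"type": "vertex_ai", "project": "my-gcp-project"},
                     "artifact_store": {"type": "gcs", "bucket": "ml-artifacts"},
                     "experiment_tracker": {"type": "mlflow"},
                     "artifact_routing": {"Metrics": {"log_to_tracker": True}}},
    },
    "active_stack": "local",
}
```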
2️⃣3️⃣ Docker Build & Push ⚡ NEW
Automatic Docker image lifecycle for remote pipeline execution. Data scientists never touch Docker, while platform teams control images through enterprise stacks:
# Zero-config build (auto-detects deps)
flowyml docker build
# GPU build with push to registry
flowyml docker build --gpu --push --registry myregistry.azurecr.io
# Preview generated Dockerfile
flowyml docker generate --deps poetry
# Inspect auto-detected configuration
flowyml docker inspect
Features:
- 7 dependency managers auto-detected: conda, uv, poetry, pipenv, setup.py, pip, freeze
- Multi-stage builds with BuildKit cache and non-root user
- GPU/CUDA support with one flag (--gpu)
- 4 registries: Docker Hub, ACR, ECR, GCR
- Content-hash tagging for deterministic, cache-friendly builds
- Image policies: enforce base images, required labels, and registry allowlists
- Enterprise integration: StackDefinition.to_docker_config() for governed stacks
- 5 CLI commands: docker build, push, generate, inspect, login
- See full guide: docs/guides/docker.md
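Dependency auto-detection and content-hash tagging can be illustrated with two small pure functions. This is a sketch under stated assumptions, not the `flowyml docker` implementation: the marker file for each manager is a guess (for example `poetry.lock` for Poetry), the priority order simply follows the list of dependency managers above, and the 12-character SHA-256 prefix is an arbitrary tag length.

```python
import hashlib

# Assumed marker files, checked in the detection order listed above.
DEPENDENCY_MARKERS = [
    ("conda", "environment.yml"),
    ("uv", "uv.lock"),
    ("poetry", "poetry.lock"),
    ("pipenv", "Pipfile"),
    ("setup.py", "setup.py"),
    ("pip", "requirements.txt"),
]

def detect_dependency_manager(filenames) -> str:
    """Pick the first manager whose marker file is present in the project."""
    present = set(filenames)
    for manager, marker in DEPENDENCY_MARKERS:
        if marker in present:
            return manager
    return "freeze"  # nothing found: snapshot the current environment instead

def content_hash_tag(files: dict, length: int = 12) -> str:
    """Deterministic tag from file names and contents, independent of dict order."""
    h = hashlib.sha256()
    for name in sorted(files):
        # Length-prefix each field so different name/content splits cannot collide.
        for chunk in (name.encode(), files[name]):
            h.update(len(chunk).to_bytes(8, "big"))
            h.update(chunk)
    return h.hexdigest()[:length]
```

Because the tag depends only on file contents, an unchanged project maps to the same tag and the existing image can be reused instead of rebuilt.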
Design Pipelines Visually with FlowyML Notebook
FlowyML Notebook: The Reactive Notebook That Ships to Production
FlowyML Notebook is a companion reactive notebook environment that replaces Jupyter for ML workflows. Write Python cells with automatic dependency tracking, then promote directly to FlowyML pipelines with one click.
Key features: Reactive DAG · Pure .py Storage · SmartPrep Advisor · Algorithm Matchmaker · 43 Recipes · GitHub Integration · AI Assistant · Publish as App
Learn more · GitHub · Docs
Explore the Full Ecosystem
Happy MLOps!