⚡ Databricks Integration
Run FlowyML pipelines on Databricks with automatic cluster management, job submission, and native MLflow integration.
Spark · MLflow · Unity Catalog · Multi-Cloud
What you'll learn
How to run FlowyML pipelines on Databricks, from cluster provisioning through job submission, log streaming, and secret management, without writing infrastructure code.
FlowyML concepts map onto Databricks primitives as follows:
| FlowyML Concept | Databricks Primitive |
|---|---|
| Stack | Cluster + Runtime |
| Pipeline | Workflow / Job Run |
| Artifacts | MLflow / DBFS |
| Secrets | Databricks Secrets / Vault |
Prerequisites
```bash
# Install the Databricks SDK (optional dependency)
pip install "flowyml[databricks]"
# or
pip install databricks-sdk
```
Authentication
The Databricks adapter uses the Databricks SDK's unified authentication, so no credentials are hard-coded. It respects DATABRICKS_HOST and DATABRICKS_TOKEN, Azure AD credentials, Databricks configuration profiles, and the SDK's other native credential providers.
Stack Configuration
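Configure a Databricks stack in your flowyml.yaml. Note that node_type_id values are cloud-specific: i3.xlarge is an AWS instance type, so on Azure Databricks use an Azure VM size such as Standard_DS3_v2 instead.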
```yaml
stacks:
  databricks-prod:
    compute:
      backend: databricks
      cluster_name: flowyml-training
      node_type_id: i3.xlarge
      num_workers: 4
      spark_version: "14.3.x-scala2.12"
    runtime:
      python_version: "3.11"
    secrets:
      provider: vault
      vault_url: https://vault.company.com
```
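To make the mapping from a stack's compute block to a Databricks cluster concrete, here is a minimal sketch of how such a block could be translated into a Clusters API `create` request body. The output keys (`cluster_name`, `spark_version`, `node_type_id`, `num_workers`, `autoscale`) are Databricks Clusters API fields; the function `to_cluster_spec` is hypothetical and is not FlowyML's actual adapter code.

```python
from typing import Any, Dict


def to_cluster_spec(compute: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper (not part of FlowyML): compute block -> Clusters API body."""
    if compute.get("backend") != "databricks":
        raise ValueError("not a Databricks compute block")
    spec: Dict[str, Any] = {
        "cluster_name": compute["cluster_name"],
        "node_type_id": compute["node_type_id"],
    }
    # The Clusters API requires spark_version; this sketch only passes it through
    if "spark_version" in compute:
        spec["spark_version"] = compute["spark_version"]
    if "autoscale" in compute:
        # Autoscaling clusters send a worker range instead of a fixed count
        spec["autoscale"] = {
            "min_workers": compute["autoscale"]["min_workers"],
            "max_workers": compute["autoscale"]["max_workers"],
        }
    else:
        spec["num_workers"] = compute.get("num_workers", 0)
    return spec
```

Feeding it the compute block of databricks-prod yields a fixed four-worker spec; a block with an autoscale section yields an `autoscale` range and no `num_workers`.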
Credentials and the active stack can also be supplied through environment variables:
```bash
export DATABRICKS_HOST=https://adb-xxxx.azuredatabricks.net
export DATABRICKS_TOKEN=dapi-xxxxx
export FLOWYML_STACK=databricks-prod
```
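A quick pre-flight check can catch a missing variable before any cluster is touched. This is a minimal sketch assuming the token-via-environment setup shown above; `missing_env` and `select_stack` are hypothetical helpers, not FlowyML APIs. Because the Databricks SDK also supports OAuth, Azure identities and configuration profiles, an unset DATABRICKS_TOKEN is only a problem if you rely on this particular setup.

```python
from typing import Any, Dict, List, Mapping, Optional

# Variables needed for the personal-access-token setup shown above
REQUIRED_ENV = ("DATABRICKS_HOST", "DATABRICKS_TOKEN")


def missing_env(env: Mapping[str, str]) -> List[str]:
    """Return the required variables that are unset or empty."""
    return [name for name in REQUIRED_ENV if not env.get(name)]


def select_stack(
    config: Dict[str, Any], env: Mapping[str, str], name: Optional[str] = None
) -> Dict[str, Any]:
    """Pick a stack from a parsed flowyml.yaml; an explicit name beats FLOWYML_STACK."""
    stack_name = name or env.get("FLOWYML_STACK")
    if not stack_name:
        raise LookupError("no stack given and FLOWYML_STACK is unset")
    try:
        return config["stacks"][stack_name]
    except KeyError:
        raise LookupError(f"stack {stack_name!r} not found in flowyml.yaml") from None
```

In practice you would call `missing_env(os.environ)` and pass the parsed YAML to `select_stack`.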
Execution Flow
The DatabricksBackendAdapter manages the full lifecycle:
```mermaid
graph LR
    A[prepare] --> B[submit]
    B --> C[status]
    C --> D[logs]
    C --> E[cancel]
    C -->|poll| C
```
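The lifecycle in the graph can be sketched as a small driver loop. This is an illustration under assumptions: the `BackendAdapter` protocol and `run_to_completion` are hypothetical names, the statuses are plain strings taken from the status table on this page, logs are fetched once after the run finishes, and cancelling the remote run on Ctrl-C is this sketch's own choice rather than documented FlowyML behavior.

```python
import time
from typing import Callable, Iterable, Protocol

# Terminal FlowyML statuses, taken from the status table on this page
TERMINAL = {"SUCCEEDED", "FAILED", "CANCELLED"}


class BackendAdapter(Protocol):
    """Hypothetical interface mirroring prepare/submit/status/logs/cancel."""

    def prepare(self) -> None: ...
    def submit(self) -> str: ...
    def status(self, run_id: str) -> str: ...
    def logs(self, run_id: str) -> Iterable[str]: ...
    def cancel(self, run_id: str) -> None: ...


def run_to_completion(
    adapter: BackendAdapter,
    poll_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Drive one run through the lifecycle and return its final status."""
    adapter.prepare()          # reuse, restart, or create the cluster
    run_id = adapter.submit()  # submit the job run
    try:
        # The "poll" self-edge on the status node
        state = adapter.status(run_id)
        while state not in TERMINAL:
            sleep(poll_seconds)
            state = adapter.status(run_id)
    except KeyboardInterrupt:
        # Assumption: a local interrupt should cancel the remote run
        adapter.cancel(run_id)
        raise
    for line in adapter.logs(run_id):  # fetch output once the run has finished
        print(line)
    return state
```

Injecting `sleep` keeps the loop testable with a fake adapter and no real waiting.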
1. Prepare: Cluster Management
```python
from flowyml import Pipeline

pipeline = Pipeline("training", stack="databricks-prod")
pipeline.run()  # prepare() is called automatically
```
What happens under the hood:
- Reuse: finds an existing cluster with a matching name in the RUNNING or RESIZING state.
- Restart: restarts a TERMINATED cluster instead of creating a new one.
- Create: provisions a new cluster from the stack spec if none exists.
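The reuse/restart/create decision above can be sketched as a pure function over a cluster listing. It is a hypothetical helper, not FlowyML's code; you could feed it dicts built from `WorkspaceClient().clusters.list()`, whose items expose `cluster_id`, `cluster_name` and `state`. Note that in the Clusters API the call that brings a terminated cluster back is `start`, while `restart` applies to a running cluster. Treating every other state (PENDING, TERMINATING, ERROR, and so on) as "create" is an assumption of this sketch; a real adapter may prefer to wait instead.

```python
from typing import Iterable, Mapping, Optional, Tuple

# Cluster states the page says can be reused as-is
REUSABLE_STATES = {"RUNNING", "RESIZING"}


def plan_cluster_action(
    clusters: Iterable[Mapping[str, str]], name: str
) -> Tuple[str, Optional[str]]:
    """Hypothetical helper: decide whether to reuse, restart, or create a cluster.

    Returns (action, cluster_id); cluster_id is None when a new cluster is needed.
    """
    matches = [c for c in clusters if c.get("cluster_name") == name]
    # Prefer a cluster that is already up, so no startup time is paid
    for c in matches:
        if c.get("state") in REUSABLE_STATES:
            return "reuse", c["cluster_id"]
    # Otherwise bring back a terminated cluster with the same name
    for c in matches:
        if c.get("state") == "TERMINATED":
            return "restart", c["cluster_id"]
    # Assumption: any other state falls through to creating a new cluster
    return "create", None
```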
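2. Submit: Job Submission
The whole pipeline graph runs inside a single Databricks job run task, which uses python_wheel_task to call the run_pipeline entry point of the flowyml package on the prepared cluster: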
```python
# The adapter builds a Databricks job definition:
{
    "run_name": "flowyml-training-a1b2c3d4",
    "tasks": [{
        "task_key": "flowyml_pipeline",
        "python_wheel_task": {
            "package_name": "flowyml",
            "entry_point": "run_pipeline",
            "parameters": ["--pipeline-name", "training", "--run-id", "..."]
        },
        "existing_cluster_id": "0123-456789-abc"
    }]
}
```
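A payload of this shape matches the Jobs API one-time run endpoint (`POST /api/2.1/jobs/runs/submit`). The sketch below rebuilds it from its inputs; `build_run_payload` is a hypothetical name, and deriving the `a1b2c3d4` suffix from a random UUID is an assumption about how the adapter names runs. Because the payload attaches no `libraries`, it also assumes the flowyml wheel is already installed on the target cluster.

```python
import uuid
from typing import Any, Dict, Optional


def build_run_payload(
    pipeline_name: str, run_id: str, cluster_id: str, suffix: Optional[str] = None
) -> Dict[str, Any]:
    """Hypothetical helper: build a runs/submit body for one FlowyML pipeline run."""
    # Assumption: an 8-hex-character random suffix keeps run names unique
    suffix = suffix or uuid.uuid4().hex[:8]
    return {
        "run_name": f"flowyml-{pipeline_name}-{suffix}",
        "tasks": [{
            "task_key": "flowyml_pipeline",
            "python_wheel_task": {
                "package_name": "flowyml",
                "entry_point": "run_pipeline",
                "parameters": ["--pipeline-name", pipeline_name, "--run-id", run_id],
            },
            # Run on the cluster chosen during prepare()
            "existing_cluster_id": cluster_id,
        }],
    }
```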
3. Status Monitoring
Databricks lifecycle states are mapped to FlowyML's RunStatus:
| Databricks Life-Cycle State | Result State | FlowyML Status |
|---|---|---|
| PENDING / RUNNING | n/a | RUNNING |
| TERMINATED | SUCCESS | SUCCEEDED |
| TERMINATED | FAILED / TIMEDOUT | FAILED |
| TERMINATED | CANCELED | CANCELLED |
| SKIPPED | n/a | CANCELLED |
| INTERNAL_ERROR | n/a | FAILED |
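The table translates directly into a lookup function. This sketch mirrors the rows above and nothing more: the name `to_flowyml_status` and the plain-string statuses are assumptions (FlowyML's RunStatus enum may differ), and combinations the table does not cover, such as TERMINATING or QUEUED, raise ValueError rather than guessing.

```python
from typing import Optional

# TERMINATED runs are resolved by their result state (Databricks spells it CANCELED)
_TERMINATED_RESULTS = {
    "SUCCESS": "SUCCEEDED",
    "FAILED": "FAILED",
    "TIMEDOUT": "FAILED",
    "CANCELED": "CANCELLED",
}


def to_flowyml_status(life_cycle_state: str, result_state: Optional[str] = None) -> str:
    """Map a Databricks (life-cycle state, result state) pair to a FlowyML status string."""
    if life_cycle_state in ("PENDING", "RUNNING"):
        return "RUNNING"
    if life_cycle_state == "SKIPPED":
        return "CANCELLED"
    if life_cycle_state == "INTERNAL_ERROR":
        return "FAILED"
    if life_cycle_state == "TERMINATED" and result_state in _TERMINATED_RESULTS:
        return _TERMINATED_RESULTS[result_state]
    raise ValueError(f"unmapped state: {life_cycle_state}/{result_state}")
```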
4. Log Streaming
Logs are streamed from the Databricks run output:
- Notebook output results
- Job stdout/stderr logs
- Error traces (if any)
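Those three sources correspond to fields of the Jobs API `runs/get-output` response: `notebook_output.result`, `logs` (stdout/stderr from tasks such as python_wheel_task), and `error` / `error_trace`. The sketch below flattens such a response into log lines; `collect_run_output` and its line prefixes are hypothetical. For multi-task runs, the Jobs API expects the run ID of an individual task when fetching output.

```python
from typing import Any, Dict, List


def collect_run_output(output: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: flatten a runs/get-output style response into log lines."""
    lines: List[str] = []
    # Value returned by a notebook task, if any
    notebook = output.get("notebook_output") or {}
    if notebook.get("result"):
        lines.append(f"[notebook] {notebook['result']}")
    # stdout/stderr captured from tasks such as python_wheel_task
    if output.get("logs"):
        lines.extend(output["logs"].splitlines())
    # Error message and stack trace, present only when the run failed
    if output.get("error"):
        lines.append(f"[error] {output['error']}")
    if output.get("error_trace"):
        lines.extend(output["error_trace"].splitlines())
    return lines
```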
5. Cancellation
Runs can be cancelled at any time via WorkspaceClient.jobs.cancel_run().
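In the Databricks SDK for Python, `jobs.cancel_run(run_id)` returns a waiter whose `result()` blocks until the run reaches a terminal state. The wrapper below is a hypothetical sketch; it takes the client as a parameter so that it can be exercised with a stand-in object instead of a live workspace.

```python
from typing import Any


def cancel_pipeline_run(client: Any, run_id: int, wait: bool = False) -> None:
    """Hypothetical wrapper: request cancellation, optionally blocking until it completes.

    `client` is expected to behave like databricks.sdk.WorkspaceClient.
    """
    waiter = client.jobs.cancel_run(run_id=run_id)
    if wait:
        # Block until Databricks reports the run as finished
        waiter.result()


# Usage against a real workspace (requires databricks-sdk and credentials):
#   from databricks.sdk import WorkspaceClient
#   cancel_pipeline_run(WorkspaceClient(), run_id=123, wait=True)
```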
Cluster Configuration
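Fixed Workers: set num_workers to a constant, as in the stack configuration above.
Autoscaling: replace num_workers with an autoscale range: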
```yaml
compute:
  backend: databricks
  cluster_name: flowyml-autoscale
  node_type_id: i3.xlarge
  autoscale:
    min_workers: 1
    max_workers: 8
```
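GPU Instances
```yaml
compute:
  backend: databricks
  cluster_name: flowyml-gpu
  node_type_id: p3.2xlarge  # AWS instance type with one NVIDIA V100
  spark_version: "14.3.x-gpu-ml-scala2.12"  # GPU nodes need a GPU-enabled ML runtime
  num_workers: 2
```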
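Since a compute block can describe either a fixed-size or an autoscaling cluster, a quick validation pass can catch mix-ups before a cluster is requested. This is a hypothetical sketch: the rule that exactly one of `num_workers` or `autoscale` should be set is this sketch's assumption, while the strict `min_workers < max_workers` check follows the Databricks Clusters API documentation for `autoscale`.

```python
from typing import Any, Dict, List


def worker_config_errors(compute: Dict[str, Any]) -> List[str]:
    """Hypothetical check: return human-readable problems with the worker settings."""
    errors: List[str] = []
    has_fixed = "num_workers" in compute
    autoscale = compute.get("autoscale")
    # Assumption: a block should pick one sizing mode
    if has_fixed and autoscale:
        errors.append("set either num_workers or autoscale, not both")
    if not has_fixed and not autoscale:
        errors.append("set num_workers or autoscale")
    if autoscale:
        lo, hi = autoscale.get("min_workers"), autoscale.get("max_workers")
        if lo is None or hi is None:
            errors.append("autoscale needs min_workers and max_workers")
        elif not 0 <= lo < hi:
            # The Clusters API requires max_workers to be strictly greater
            errors.append("autoscale requires 0 <= min_workers < max_workers")
    elif has_fixed and compute["num_workers"] < 0:
        errors.append("num_workers must be >= 0")
    return errors
```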
Full Example
```python
from flowyml import Pipeline, step, Context
from flowyml.assets import Dataset, Model, Metrics


@step
def load_data(ctx: Context) -> Dataset:
    import pandas as pd

    # Read through the /dbfs FUSE mount on the cluster
    df = pd.read_csv("/dbfs/data/training.csv")
    return Dataset(df, name="training_data")


@step
def train(ctx: Context, data: Dataset) -> Model:
    from sklearn.ensemble import RandomForestClassifier

    clf = RandomForestClassifier(n_estimators=100)
    clf.fit(data.data.drop("target", axis=1), data.data["target"])
    return Model(clf, name="fraud_detector", framework="sklearn")


@step
def evaluate(ctx: Context, model: Model, data: Dataset) -> Metrics:
    # Scores the training data, so accuracy is optimistic;
    # use a held-out split for a realistic estimate.
    preds = model.data.predict(data.data.drop("target", axis=1))
    accuracy = float((preds == data.data["target"]).mean())
    return Metrics({"accuracy": accuracy, "n_samples": len(data.data)})


# Run on Databricks: the cluster is auto-managed
pipeline = Pipeline("fraud_detection", stack="databricks-prod")
pipeline.add_steps([load_data, train, evaluate])
pipeline.run()
```
Best Practices
Use autoscaling for variable workloads
Autoscaling clusters optimize cost by scaling down during idle periods and up during heavy computation.
Pin Spark versions
Always specify spark_version in your stack config to ensure reproducible environments across runs.
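A CI check can enforce this rule before a stack config is merged. The sketch treats a value as pinned when it is a full runtime key of the form `<major>.<minor>.x-[variant-]scala<version>`, like the keys used on this page; that pattern is an assumption based on those examples, and `is_pinned_spark_version` is a hypothetical name. The `.x` in a key still picks up maintenance updates within that runtime version.

```python
import re
from typing import Any, Dict

# Assumed key shape, e.g. "14.3.x-scala2.12" or "14.3.x-gpu-ml-scala2.12"
_PINNED = re.compile(r"^\d+\.\d+\.x-(?:[a-z0-9-]+-)?scala\d+\.\d+$")


def is_pinned_spark_version(compute: Dict[str, Any]) -> bool:
    """Return True if the compute block names an explicit Databricks runtime key."""
    version = compute.get("spark_version")
    return isinstance(version, str) and bool(_PINNED.match(version))
```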
Cluster startup time
New clusters typically take 5 to 10 minutes to start. Reuse existing clusters by keeping cluster_name consistent across runs.
What's Next?
Secrets Management
Configure Vault, Azure Key Vault, or AWS Secrets Manager for secure credential access.
Experiment Tracking
Write tracking data to FlowyML and to MLflow or Weights & Biases at the same time.