Pipeline Scheduling
Automate pipeline execution so you never miss a deadline.
What you'll learn
How to schedule pipelines for recurring execution with zero manual intervention. Manual pipeline execution doesn't scale; scheduling turns ad-hoc jobs into reliable automation.
Why Scheduling Matters
Without scheduling:
- Manual overhead: "Did someone run the daily ETL?"
- Missed deadlines: Forgetting to run the weekly report
- No reliability: Pipelines run only when someone remembers
With flowyml scheduling:
- Zero manual work: Pipelines run automatically
- Multi-timezone: Run at 9 AM local time for each region
- Fault-tolerant: Survives restarts, prevents duplicate runs
Decision Guide: Strategy
| Use Case | Schedule Type | Example |
|---|---|---|
| Business Reports | Daily at a specific time | "Run sales report at 8 AM" |
| Data Sync | Interval (minutes/hours) | "Poll API every 15 minutes" |
| Complex Timing | Cron expression | "Every weekday at 9 AM" (0 9 * * 1-5) |
| High Frequency | Hourly at a specific minute | "Update cache at :00 past each hour" |
Overview
The PipelineScheduler provides:
- Cron schedules: Complex schedules using standard cron syntax
- Daily schedules: Run at a specific time each day
- Hourly schedules: Run at a specific minute each hour
- Interval schedules: Run at regular intervals
- Timezone support: Schedule in any timezone
- Persistence: Schedules survive restarts (SQLAlchemy-backed SQLite database)
- Distributed: Coordinate across multiple servers (Redis or file-based locking)
- Execution history: Track every scheduled execution with its metadata
Quick Start
Schedule Types
Cron Schedule
Use standard cron expressions for complex schedules. Requires croniter.
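To show what the five cron fields (minute, hour, day of month, month, day of week) encode, here is a minimal matcher. It is a simplified, hypothetical helper written for illustration. It is not croniter's API and not flowyml's parser. It supports `*`, single values, ranges (`1-5`), lists (`1,15`) and steps (`*/15`, `5/15`). It does not handle names such as `MON` or macros such as `@daily`.

```python
from datetime import datetime
from typing import Set


def _parse_field(field: str, low: int, high: int) -> Set[int]:
    """Expand one cron field into the set of integer values it allows."""
    values: Set[int] = set()
    for part in field.split(","):
        step = 1
        has_step = "/" in part
        if has_step:
            part, step_text = part.split("/", 1)
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(part)
            end = high if has_step else start  # "5/15" means 5, 20, 35, 50
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` (minute resolution) satisfies a 5-field cron expression."""
    minute, hour, dom, month, dow = expression.split()
    # Cron numbers Sunday as 0 (and also 7); Python's weekday() numbers Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    dow_values = {d % 7 for d in _parse_field(dow, 0, 7)}
    dom_ok = moment.day in _parse_field(dom, 1, 31)
    dow_ok = cron_weekday in dow_values
    # Classic cron rule: when both day fields are restricted, matching either one is enough.
    if dom != "*" and dow != "*":
        day_ok = dom_ok or dow_ok
    else:
        day_ok = dom_ok and dow_ok
    return (
        moment.minute in _parse_field(minute, 0, 59)
        and moment.hour in _parse_field(hour, 0, 23)
        and moment.month in _parse_field(month, 1, 12)
        and day_ok
    )
```

For example, `cron_matches("0 9 * * 1-5", datetime(2024, 1, 1, 9, 0))` is true because 1 January 2024 is a Monday. The day-of-month/day-of-week "or" rule is the most common cron surprise. `0 0 1 * 1` fires on the 1st of the month *and* on every Monday, not only on Mondays that fall on the 1st.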
Daily Schedule
Run at a specific time each day.
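The core of a daily schedule is answering "when is the next hour:minute after now?" The following is a minimal sketch of that calculation. The function name is hypothetical and does not come from flowyml. It mirrors the `hour` and `minute` parameters of schedule_daily.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Return the first time at hour:minute that is strictly after `now`.

    `now` may be naive or timezone-aware; the result keeps the same tzinfo.
    """
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate
```

The comparison is strict, so a scheduler that checks exactly at 08:00 gets tomorrow's 08:00 as the next run. With a `zoneinfo` timezone attached, adding `timedelta(days=1)` shifts the wall-clock date. As a result, 8:00 local stays 8:00 local across DST changes.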
Hourly Schedule
Run at a specific minute each hour.
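An hourly schedule follows the same pattern one level down. It looks for the next time whose minute matches, rolling into the next hour (or the next day) when needed. This is a minimal sketch with a hypothetical function name, not flowyml's implementation.

```python
from datetime import datetime, timedelta


def next_hourly_run(now: datetime, minute: int) -> datetime:
    """Return the next time whose minute equals `minute`, strictly after `now`."""
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(hours=1)  # this hour's slot has passed
    return candidate
```

Rolling over from 23:45 to 00:00 is handled by `timedelta`, which carries into the next day. For timezone-aware inputs, doing this arithmetic in UTC avoids surprises around DST transitions.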
Interval Schedule
Run at regular intervals.
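One common way to compute interval runs is to count whole intervals from a fixed anchor, such as the moment the schedule was created. This avoids adding the interval to the end of the previous run, which drifts by however long each run takes. The sketch below assumes that design. It is not confirmed as flowyml's behavior, and the function name is hypothetical. The `hours`, `minutes` and `seconds` parameters of schedule_interval map naturally onto a `timedelta`.

```python
from datetime import datetime, timedelta


def next_interval_run(anchor: datetime, now: datetime, interval: timedelta) -> datetime:
    """Return the first time anchor + k * interval (k >= 1) that is strictly after `now`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if now < anchor:
        return anchor + interval
    elapsed_steps = (now - anchor) // interval  # whole intervals already elapsed
    return anchor + (elapsed_steps + 1) * interval
```

For example, with `interval = timedelta(minutes=15)`, a check at 12:20 against a 12:00 anchor yields 12:30. A side effect of this design is that runs missed while the scheduler was down are skipped rather than replayed.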
Advanced Features
Persistence
Schedules are automatically persisted to a local SQLite database (.flowyml_scheduler.db) using SQLAlchemy, so they are not lost when the application restarts. Going through SQLAlchemy rather than raw SQL also adds type safety and makes it easier to move to another database.
Technical details:
- Uses the SQLAlchemy ORM for all database operations (no raw SQL queries)
- Supports SQLite by default, with easy migration to PostgreSQL if needed
- Automatic schema creation and migration
- Transaction-safe operations
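The following sketch shows the persistence idea of storing each schedule as a row, upserting it transactionally, and reading everything back on startup. flowyml itself uses SQLAlchemy. To stay self-contained, this sketch swaps in Python's built-in `sqlite3` module. The table name and columns are hypothetical and are not flowyml's schema.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a schedule store; the table layout here is hypothetical."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS schedules (
               name TEXT PRIMARY KEY,
               schedule_type TEXT NOT NULL,
               spec TEXT NOT NULL,
               timezone TEXT NOT NULL DEFAULT 'UTC',
               enabled INTEGER NOT NULL DEFAULT 1
           )"""
    )
    return conn


def save_schedule(conn, name, schedule_type, spec, timezone="UTC", enabled=True):
    """Insert or replace one schedule inside a transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute(
            "INSERT OR REPLACE INTO schedules (name, schedule_type, spec, timezone, enabled) "
            "VALUES (?, ?, ?, ?, ?)",
            (name, schedule_type, spec, timezone, int(enabled)),
        )


def load_schedules(conn):
    """Return every stored schedule as a dict, e.g. after a restart."""
    rows = conn.execute(
        "SELECT name, schedule_type, spec, timezone, enabled FROM schedules ORDER BY name"
    )
    return [
        {"name": n, "type": t, "spec": s, "timezone": tz, "enabled": bool(e)}
        for n, t, s, tz, e in rows
    ]
```

If you pass a file path such as `".flowyml_scheduler.db"` instead of `":memory:"`, the rows survive a process restart. Keying the table on the schedule name means saving the same name twice updates it instead of duplicating it.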
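To configure persistence, set persist_schedules and db_path on SchedulerConfig, or use the flowyml_SCHEDULER_PERSIST and flowyml_SCHEDULER_DB_PATH environment variables (see Environment Config).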
Distributed Scheduling
For multi-server deployments, flowyml supports distributed locking to prevent duplicate executions.
- File-based locking (default): Good for single-machine, multi-process setups.
- Redis locking: Recommended for multi-server setups.
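File-based locking can be sketched with an atomic create. Each process tries to create a lock file named after the schedule and the time slot it is about to run. `os.O_CREAT | os.O_EXCL` guarantees that only one caller on the same filesystem succeeds. This is a hypothetical illustration, not flowyml's lock implementation. The function name and file-naming scheme are assumptions.

```python
import os
import tempfile


def try_claim_run(schedule_name: str, slot: str, lock_dir: str = None) -> bool:
    """Atomically claim one scheduled slot; return False if another process already has it.

    `slot` should be filename-safe, e.g. "20240101T0800" rather than an ISO string with colons.
    """
    lock_dir = lock_dir or tempfile.gettempdir()
    lock_path = os.path.join(lock_dir, f"{schedule_name}.{slot}.lock")
    try:
        # O_CREAT | O_EXCL fails if the file already exists, so exactly one caller wins.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as handle:
        handle.write(str(os.getpid()))  # record the owner to help debugging
    return True
```

Because the lock lives on a local filesystem, it only coordinates processes on one machine, which is why Redis is recommended across servers. The Redis counterpart is typically a `SET key value NX EX <seconds>`, which also gives the lock an expiry. This sketch never cleans up its lock files, and a real implementation would need that.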
Timezone Support
All schedule methods accept a timezone argument. Requires pytz.
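"9 AM local time for each region" means a different absolute instant per region. The sketch below computes the next local 9:00 for a given timezone and converts it to UTC. It uses standard-library `tzinfo` objects instead of pytz. The function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def next_local_run_utc(now_utc: datetime, tz: tzinfo, hour: int, minute: int = 0) -> datetime:
    """Next hour:minute wall-clock time in `tz` after `now_utc`, returned in UTC.

    `now_utc` must be timezone-aware; a naive value would be treated as system local time.
    """
    local_now = now_utc.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local_now:
        candidate += timedelta(days=1)  # wall-clock day step keeps 9:00 local across DST
    return candidate.astimezone(timezone.utc)
```

For real regions, pass `zoneinfo.ZoneInfo("America/New_York")` (Python 3.9+) as `tz` so DST is applied. At 12:00 UTC on 15 January 2024, a UTC-5 region's next 9:00 is 14:00 UTC the same day. A UTC+1 region has already passed 9:00, so its next run is 08:00 UTC the following day.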
Monitoring & Health
The scheduler tracks metrics and health status, which are available through health_check() and the GET /api/scheduler/health endpoint.
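The following sketch shows one way health metrics can be derived from run counters. The field names, the returned dictionary and the 50% failure threshold are all assumptions made for illustration. They do not describe the actual output of health_check().

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SchedulerMetrics:
    """Hypothetical counters a scheduler could keep; field names are illustrative."""

    total_runs: int = 0
    failed_runs: int = 0
    last_error: Optional[str] = None

    def record(self, success: bool, error: Optional[str] = None) -> None:
        """Update the counters after one scheduled run."""
        self.total_runs += 1
        if not success:
            self.failed_runs += 1
            self.last_error = error

    def health(self, running: bool, max_failure_rate: float = 0.5) -> dict:
        """Summarize metrics; 'unhealthy' if stopped or failing too often (threshold assumed)."""
        failure_rate = self.failed_runs / self.total_runs if self.total_runs else 0.0
        status = "healthy" if running and failure_rate <= max_failure_rate else "unhealthy"
        return {
            "status": status,
            "running": running,
            "total_runs": self.total_runs,
            "failed_runs": self.failed_runs,
            "failure_rate": failure_rate,
            "last_error": self.last_error,
        }
```

Reporting a failure rate alongside raw counts lets a monitor alert on sustained failures without paging on a single transient error.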
Execution History
The scheduler automatically tracks execution history for all scheduled runs, including:
- Start and completion times
- Success/failure status
- Duration
- Error messages (if any)
- Run IDs (for linking to pipeline runs in the UI)
Retrieve it with get_history(schedule_name, limit=50).
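The next sketch shows how such a history record can be captured around each run. It records start and completion times, success or failure, duration, the error message and a run ID, and it stops a failing job from crashing the scheduler loop. The record class and helper functions are hypothetical. The lookup helper is named after the documented get_history(schedule_name, limit=50) but is a standalone function, not flowyml's method.

```python
import time
import uuid
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class ExecutionRecord:
    """One history entry with the fields listed above; names are hypothetical."""

    schedule_name: str
    run_id: str
    started_at: float
    completed_at: Optional[float] = None
    success: bool = False
    error: Optional[str] = None

    @property
    def duration(self) -> Optional[float]:
        if self.completed_at is None:
            return None
        return self.completed_at - self.started_at


def run_and_record(name: str, func: Callable[[], object], history: List[ExecutionRecord]) -> ExecutionRecord:
    """Run `func`, append an ExecutionRecord to `history`, and keep errors from escaping."""
    record = ExecutionRecord(schedule_name=name, run_id=uuid.uuid4().hex, started_at=time.time())
    try:
        func()
        record.success = True
    except Exception as exc:  # a failing job should not crash the scheduler loop
        record.error = f"{type(exc).__name__}: {exc}"
    finally:
        record.completed_at = time.time()
        history.append(record)
    return record


def get_history(history: List[ExecutionRecord], schedule_name: str, limit: int = 50) -> List[ExecutionRecord]:
    """Most recent records first, capped at `limit`."""
    matching = [r for r in history if r.schedule_name == schedule_name]
    return list(reversed(matching))[:limit]
```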
Managing Schedules
List All Schedules
Enable/Disable/Remove
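The difference between disabling and removing a schedule is easiest to see in a toy registry. This in-memory class is a hypothetical stand-in written for illustration. It borrows the method names list_schedules(), enable(), disable(), unschedule() and clear() from the API Reference, but it is not flowyml's PipelineScheduler.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Schedule:
    """Hypothetical stand-in for a registered schedule."""

    name: str
    func: Callable[[], object]
    enabled: bool = True


class ScheduleRegistry:
    """Toy registry showing list/enable/disable/unschedule semantics."""

    def __init__(self) -> None:
        self._schedules: Dict[str, Schedule] = {}

    def add(self, name: str, func: Callable[[], object]) -> Schedule:
        if name in self._schedules:
            raise ValueError(f"schedule {name!r} already exists")
        self._schedules[name] = Schedule(name, func)
        return self._schedules[name]

    def list_schedules(self) -> List[Schedule]:
        return list(self._schedules.values())

    def enable(self, name: str) -> None:
        self._schedules[name].enabled = True  # raises KeyError for unknown names

    def disable(self, name: str) -> None:
        # Disabled schedules stay registered but are skipped when they come due.
        self._schedules[name].enabled = False

    def unschedule(self, name: str) -> None:
        del self._schedules[name]  # removed entirely, unlike disable()

    def clear(self) -> None:
        self._schedules.clear()

    def due_jobs(self) -> List[Schedule]:
        """Only enabled schedules are eligible to run."""
        return [s for s in self._schedules.values() if s.enabled]
```

Disabling is the reversible choice, for example to pause a job during maintenance. Unscheduling discards the definition, so re-adding it requires the original schedule parameters.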
API Integration
The scheduler is fully integrated with the flowyml Backend API.
Endpoints:
- GET /api/schedules: List all schedules
- POST /api/schedules: Create a new schedule
- GET /api/scheduler/health: Get scheduler health metrics
- POST /api/schedules/{name}/enable: Enable schedule
- POST /api/schedules/{name}/disable: Disable schedule
- DELETE /api/schedules/{name}: Delete schedule
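The endpoints above can be called with any HTTP client. This sketch uses only the standard library. It builds `urllib.request.Request` objects for a few of the listed endpoints without sending them. The base URL is a hypothetical placeholder. The request body for POST /api/schedules is not documented here, so no create helper is shown.

```python
import json
from typing import Optional
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8080"  # hypothetical; point this at your flowyml backend


def build_request(method: str, path: str, payload: Optional[dict] = None) -> Request:
    """Build (but do not send) a request for one of the scheduler endpoints."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return Request(BASE_URL + path, data=data, headers=headers, method=method)


def enable_request(name: str) -> Request:
    # Quote the name so spaces or slashes cannot break the URL path.
    return build_request("POST", f"/api/schedules/{quote(name, safe='')}/enable")


def disable_request(name: str) -> Request:
    return build_request("POST", f"/api/schedules/{quote(name, safe='')}/disable")


def delete_request(name: str) -> Request:
    return build_request("DELETE", f"/api/schedules/{quote(name, safe='')}")


def health_request() -> Request:
    return build_request("GET", "/api/scheduler/health")
```

To actually send one, pass it to `urllib.request.urlopen(req)` and read the JSON response body.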
Deployment
Docker Deployment
Mount a volume at the scheduler database path (flowyml_SCHEDULER_DB_PATH) so schedules persist across container restarts.
Environment Config
Configure the scheduler via environment variables:
- flowyml_SCHEDULER_PERSIST: "true"/"false"
- flowyml_SCHEDULER_DB_PATH: Path to the SQLite database
- flowyml_SCHEDULER_DISTRIBUTED: "true"/"false"
- flowyml_SCHEDULER_REDIS_URL: Redis connection string
- flowyml_SCHEDULER_TIMEZONE: Default timezone
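The following sketch maps those variables onto a settings object. The dataclass is a hypothetical mirror of SchedulerConfig's fields, and the defaults are assumptions. The database path and file-based lock default are taken from this page, and "UTC" comes from the method signatures. The rule that a Redis URL selects the Redis lock backend is also an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class SchedulerSettings:
    """Hypothetical mirror of SchedulerConfig's fields; defaults here are assumptions."""

    persist_schedules: bool = True
    db_path: str = ".flowyml_scheduler.db"
    distributed: bool = False
    lock_backend: str = "file"
    redis_url: Optional[str] = None
    timezone: str = "UTC"


def _as_bool(value: str) -> bool:
    return value.strip().lower() in {"1", "true", "yes", "on"}


def settings_from_env(env: Mapping[str, str] = os.environ) -> SchedulerSettings:
    """Read the flowyml_SCHEDULER_* variables, falling back to the defaults above."""
    settings = SchedulerSettings()
    if "flowyml_SCHEDULER_PERSIST" in env:
        settings.persist_schedules = _as_bool(env["flowyml_SCHEDULER_PERSIST"])
    settings.db_path = env.get("flowyml_SCHEDULER_DB_PATH", settings.db_path)
    if "flowyml_SCHEDULER_DISTRIBUTED" in env:
        settings.distributed = _as_bool(env["flowyml_SCHEDULER_DISTRIBUTED"])
    settings.redis_url = env.get("flowyml_SCHEDULER_REDIS_URL", settings.redis_url)
    settings.timezone = env.get("flowyml_SCHEDULER_TIMEZONE", settings.timezone)
    # Assumption: distributed mode with a Redis URL selects the Redis lock backend.
    if settings.distributed and settings.redis_url:
        settings.lock_backend = "redis"
    return settings
```

Accepting any mapping, not only `os.environ`, keeps the parser easy to test and lets a deployment script validate its settings before starting the scheduler.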
API Reference
PipelineScheduler
Methods:
- schedule_cron(name, func, cron_expression, timezone="UTC") - Schedule using cron expression
- schedule_daily(name, func, hour, minute, timezone="UTC") - Schedule daily at specific time
- schedule_hourly(name, func, minute, timezone="UTC") - Schedule hourly at specific minute
- schedule_interval(name, func, hours, minutes, seconds, timezone="UTC") - Schedule at intervals
- health_check() -> Dict - Get scheduler health and metrics
- get_history(schedule_name, limit=50) -> List[Dict] - Get execution history for a schedule
- list_schedules() -> List[Schedule] - List all schedules
- enable(name) - Enable a schedule
- disable(name) - Disable a schedule
- unschedule(name) - Remove a schedule
- clear() - Remove all schedules
- start() - Start the scheduler
- stop() - Stop the scheduler
SchedulerConfig
Configuration object for the scheduler.
- persist_schedules: bool
- db_path: str
- distributed: bool
- lock_backend: str
- redis_url: str
- timezone: str