Skip to content

Types API πŸ“

Type definitions used in flowyml.

Resource Requirements

Resource requirements for a pipeline step.

Orchestrator-agnostic resource specification that can be translated to platform-specific formats (Kubernetes, Vertex AI, SageMaker, etc.).

Parameters:

Name Type Description Default
cpu str | None

CPU cores (e.g., "2", "500m", "2.5")

None
memory str | None

RAM amount (e.g., "4Gi", "8192Mi", "16G")

None
storage str | None

Ephemeral storage (e.g., "100Gi", "50G")

None
gpu GPUConfig | None

GPU configuration

None
node_affinity NodeAffinity | None

Node selection rules

None

Examples:

>>> # Simple CPU/memory
>>> resources = ResourceRequirements(cpu="2", memory="4Gi")
>>> # With GPU
>>> resources = ResourceRequirements(cpu="4", memory="16Gi", gpu=GPUConfig(gpu_type="nvidia-tesla-v100", count=2))
1
2
3
4
5
6
7
8
>>> # With node affinity
>>> resources = ResourceRequirements(
...     cpu="8",
...     memory="32Gi",
...     node_affinity=NodeAffinity(
...         required={"gpu": "true"}, tolerations=[{"key": "nvidia.com/gpu", "operator": "Exists"}]
...     ),
... )

Functions

__getitem__(key: str) -> Any

Provide dict-style access for backwards compatibility.

Source code in flowyml/core/resources.py
def __getitem__(self, key: str) -> Any:
    """Provide dict-style access for backwards compatibility."""
    if not hasattr(self, key):
        raise KeyError(key)
    value = getattr(self, key)
    if key == "gpu" and isinstance(value, GPUConfig):
        return value.count
    return value

__post_init__()

Validate resource specifications.

Source code in flowyml/core/resources.py
def __post_init__(self):
    """Validate resource specifications."""
    if self.cpu and not self._is_valid_cpu(self.cpu):
        msg = f"Invalid CPU format: {self.cpu}"
        raise ValueError(msg)
    if self.memory and not self._is_valid_memory(self.memory):
        msg = f"Invalid memory format: {self.memory}"
        raise ValueError(msg)
    if self.storage and not self._is_valid_memory(self.storage):
        msg = f"Invalid storage format: {self.storage}"
        raise ValueError(msg)

get_gpu_count() -> int

Get total number of GPUs requested.

Source code in flowyml/core/resources.py
def get_gpu_count(self) -> int:
    """Get total number of GPUs requested."""
    return self.gpu.count if self.gpu else 0

has_gpu() -> bool

Check if GPU resources are requested.

Source code in flowyml/core/resources.py
def has_gpu(self) -> bool:
    """Check if GPU resources are requested."""
    return self.gpu is not None

merge_with(other: ResourceRequirements) -> ResourceRequirements

Merge with another ResourceRequirements, taking maximum of each.

This is used when grouping steps to aggregate their resource needs. Strategy: - CPU: Take maximum - Memory: Take maximum - Storage: Take maximum - GPU: Merge configs (max count, best type) - Node affinity: Merge constraints

Parameters:

Name Type Description Default
other ResourceRequirements

Another ResourceRequirements to merge with

required

Returns:

Type Description
ResourceRequirements

New ResourceRequirements with merged specifications

Source code in flowyml/core/resources.py
def merge_with(self, other: "ResourceRequirements") -> "ResourceRequirements":
    """Merge with another ResourceRequirements, taking maximum of each.

    This is used when grouping steps to aggregate their resource needs.
    Strategy:
    - CPU: Take maximum
    - Memory: Take maximum
    - Storage: Take maximum
    - GPU: Merge configs (max count, best type)
    - Node affinity: Merge constraints

    Args:
        other: Another ResourceRequirements to merge with

    Returns:
        New ResourceRequirements with merged specifications
    """
    # Merge CPU
    merged_cpu = None
    if self.cpu and other.cpu:
        merged_cpu = self._compare_cpu(self.cpu, other.cpu)
    elif self.cpu:
        merged_cpu = self.cpu
    elif other.cpu:
        merged_cpu = other.cpu

    # Merge memory
    merged_memory = None
    if self.memory and other.memory:
        merged_memory = self._compare_memory(self.memory, other.memory)
    elif self.memory:
        merged_memory = self.memory
    elif other.memory:
        merged_memory = other.memory

    # Merge storage
    merged_storage = None
    if self.storage and other.storage:
        merged_storage = self._compare_memory(self.storage, other.storage)
    elif self.storage:
        merged_storage = self.storage
    elif other.storage:
        merged_storage = other.storage

    # Merge GPU
    merged_gpu = None
    if self.gpu and other.gpu:
        merged_gpu = self.gpu.merge_with(other.gpu)
    elif self.gpu:
        merged_gpu = self.gpu
    elif other.gpu:
        merged_gpu = other.gpu

    # Merge node affinity
    merged_affinity = None
    if self.node_affinity and other.node_affinity:
        merged_affinity = self.node_affinity.merge_with(other.node_affinity)
    elif self.node_affinity:
        merged_affinity = self.node_affinity
    elif other.node_affinity:
        merged_affinity = other.node_affinity

    return ResourceRequirements(
        cpu=merged_cpu,
        memory=merged_memory,
        storage=merged_storage,
        gpu=merged_gpu,
        node_affinity=merged_affinity,
    )

to_dict() -> dict[str, Any]

Convert to dictionary representation.

Source code in flowyml/core/resources.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary representation."""
    result = {}
    if self.cpu:
        result["cpu"] = self.cpu
    if self.memory:
        result["memory"] = self.memory
    if self.storage:
        result["storage"] = self.storage
    if self.gpu:
        result["gpu"] = self.gpu.to_dict()
    if self.node_affinity:
        result["node_affinity"] = self.node_affinity.to_dict()
    return result

Scheduler Config

Bases: BaseModel

Scheduler configuration.

Functions

from_env() -> SchedulerConfig classmethod

Load from environment variables.

Source code in flowyml/core/scheduler_config.py
@classmethod
def from_env(cls) -> "SchedulerConfig":
    """Load from environment variables."""
    return cls(
        persist_schedules=os.getenv("FLOWYML_SCHEDULER_PERSIST", "true").lower() == "true",
        db_path=os.getenv("FLOWYML_SCHEDULER_DB_PATH"),
        distributed=os.getenv("FLOWYML_SCHEDULER_DISTRIBUTED", "false").lower() == "true",
        lock_backend=os.getenv("FLOWYML_SCHEDULER_LOCK_BACKEND", "file"),
        redis_url=os.getenv("FLOWYML_SCHEDULER_REDIS_URL"),
        check_interval_seconds=int(os.getenv("FLOWYML_SCHEDULER_CHECK_INTERVAL", "10")),
        max_concurrent_runs=int(os.getenv("FLOWYML_SCHEDULER_MAX_CONCURRENT", "5")),
        timezone=os.getenv("FLOWYML_SCHEDULER_TIMEZONE", "UTC"),
    )