Skip to content

Plugins API Reference 🔌

Base classes and utilities for the FlowyML plugin system.

Base Plugin

flowyml.plugins.base.BasePlugin(name: str = None, **config)

Bases: ABC

Base class for all FlowyML plugins.

All plugins must inherit from this class and implement the required methods.

Initialize the plugin.

Parameters:

Name Type Description Default
name str

Optional name override for this instance.

None
**config

Configuration parameters for the plugin.

{}
Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Attributes

config: dict[str, Any] property

Get the plugin configuration.

name: str property

Get the plugin instance name.

plugin_type: PluginType abstractmethod property

Return the type of this plugin.

Functions

cleanup() -> None

Cleanup resources used by the plugin.

Override this method to close connections, flush buffers, etc.

Source code in flowyml/plugins/base.py
def cleanup(self) -> None:
    """Release resources held by the plugin.

    The base implementation only clears the initialized flag.
    Subclasses override this to close connections, flush buffers, etc.
    """
    self._is_initialized = False
    return None

from_dict(data: dict[str, Any]) -> BasePlugin classmethod

Create a plugin instance from a dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary containing plugin configuration.

required

Returns:

Type Description
BasePlugin

A new plugin instance.

Source code in flowyml/plugins/base.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BasePlugin":
    """Create a plugin instance from a dictionary.

    Only the ``"name"`` and ``"config"`` keys are read; any other keys
    (such as the ``"type"`` entry written by ``to_dict``) are ignored.

    Args:
        data: Dictionary containing plugin configuration. ``"config"``
            may be missing or ``None``; both are treated as an empty
            configuration.

    Returns:
        A new plugin instance.
    """
    # ``data.get("config", {})`` hands back an explicit ``None`` unchanged,
    # and ``**None`` raises TypeError, so normalize falsy values to ``{}``.
    config = data.get("config") or {}
    return cls(name=data.get("name"), **config)

initialize() -> None

Initialize the plugin. Called before first use.

Override this method to perform setup operations like connecting to external services.

Source code in flowyml/plugins/base.py
def initialize(self) -> None:
    """Prepare the plugin for use; called before first use.

    The base implementation only sets the initialized flag. Subclasses
    override this to do setup work such as connecting to external
    services.
    """
    self._is_initialized = True
    return None

to_dict() -> dict[str, Any]

Serialize the plugin configuration to a dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary representation of the plugin.

Source code in flowyml/plugins/base.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the plugin configuration to a dictionary.

    Returns:
        Dictionary with three keys: ``"name"`` (the instance name),
        ``"type"`` (the ``value`` of ``plugin_type``), and ``"config"``
        (a shallow copy of the configuration).
    """
    return {
        "name": self._name,
        "type": self.plugin_type.value,
        # Copy so that callers editing the serialized form do not
        # silently change this plugin's own configuration.
        "config": dict(self._config),
    }

validate() -> bool

Validate that the plugin is properly configured.

Returns:

Type Description
bool

True if the plugin is valid and ready to use.

Source code in flowyml/plugins/base.py
def validate(self) -> bool:
    """Report whether the plugin is properly configured.

    The base implementation performs no checks and always succeeds.

    Returns:
        True if the plugin is valid and ready to use.
    """
    is_valid = True
    return is_valid

Plugin Metadata

flowyml.plugins.base.PluginMetadata(name: str, description: str, plugin_type: PluginType, version: str = '1.0.0', author: str = 'FlowyML', packages: list[str] = list(), documentation_url: str = '', tags: list[str] = list()) dataclass

Metadata for a plugin.

Plugin Type

flowyml.plugins.base.PluginType

Bases: Enum

Types of plugins available in FlowyML.

Experiment Tracker

flowyml.plugins.base.ExperimentTracker(name: str = None, **config)

Bases: BasePlugin

Base class for experiment tracking plugins.

Experiment trackers record parameters, metrics, and artifacts from ML experiments for reproducibility and comparison.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Functions

end_run(status: str = 'FINISHED') -> None abstractmethod

End the current run.

Parameters:

Name Type Description Default
status str

Final status of the run (FINISHED, FAILED, etc.)

'FINISHED'
Source code in flowyml/plugins/base.py
@abstractmethod
def end_run(self, status: str = "FINISHED") -> None:
    """Close the current run.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        status: Final status of the run (FINISHED, FAILED, etc.)
    """

get_tracking_uri() -> str

Get the tracking URI for this tracker.

Returns:

Type Description
str

The tracking URI.

Source code in flowyml/plugins/base.py
def get_tracking_uri(self) -> str:
    """Look up the tracking URI in the plugin configuration.

    Returns:
        The ``"tracking_uri"`` configuration value, or an empty string
        when that key is not configured.
    """
    settings = self._config
    return settings.get("tracking_uri", "")

log_artifact(local_path: str, artifact_path: str = None) -> None

Log an artifact (file) for the current run.

Parameters:

Name Type Description Default
local_path str

Path to the local file to log.

required
artifact_path str

Optional path within the artifact store.

None
Source code in flowyml/plugins/base.py
def log_artifact(self, local_path: str, artifact_path: str = None) -> None:
    """Record a file as an artifact of the current run.

    The base implementation is a no-op; trackers that support artifacts
    override it.

    Args:
        local_path: Path to the local file to log.
        artifact_path: Optional path within the artifact store.
    """
    return None

log_metrics(metrics: dict[str, float], step: int = None) -> None abstractmethod

Log metrics for the current run.

Parameters:

Name Type Description Default
metrics dict[str, float]

Dictionary of metric names and values.

required
step int

Optional step number for the metrics.

None
Source code in flowyml/plugins/base.py
@abstractmethod
def log_metrics(self, metrics: dict[str, float], step: int = None) -> None:
    """Record metric values for the current run.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        metrics: Dictionary of metric names and values.
        step: Optional step number for the metrics.
    """

log_model(model: Any, artifact_path: str, model_type: str = None) -> None

Log a model for the current run.

Parameters:

Name Type Description Default
model Any

The model object to log.

required
artifact_path str

Path within the artifact store.

required
model_type str

Type of model (sklearn, pytorch, tensorflow, etc.)

None
Source code in flowyml/plugins/base.py
def log_model(self, model: Any, artifact_path: str, model_type: str = None) -> None:
    """Record a model as part of the current run.

    The base implementation is a no-op; trackers that support model
    logging override it.

    Args:
        model: The model object to log.
        artifact_path: Path within the artifact store.
        model_type: Type of model (sklearn, pytorch, tensorflow, etc.)
    """
    return None

log_params(params: dict[str, Any]) -> None abstractmethod

Log parameters for the current run.

Parameters:

Name Type Description Default
params dict[str, Any]

Dictionary of parameter names and values.

required
Source code in flowyml/plugins/base.py
@abstractmethod
def log_params(self, params: dict[str, Any]) -> None:
    """Record parameters for the current run.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        params: Dictionary of parameter names and values.
    """

start_run(run_name: str, experiment_name: str = None, tags: dict = None) -> str abstractmethod

Start a new experiment run.

Parameters:

Name Type Description Default
run_name str

Name for this run.

required
experiment_name str

Name of the experiment to log to.

None
tags dict

Optional tags for the run.

None

Returns:

Type Description
str

Run ID.

Source code in flowyml/plugins/base.py
@abstractmethod
def start_run(
    self,
    run_name: str,
    experiment_name: str = None,
    tags: dict = None,
) -> str:
    """Begin a new experiment run.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        run_name: Name for this run.
        experiment_name: Name of the experiment to log to.
        tags: Optional tags for the run.

    Returns:
        Run ID.
    """

Artifact Store Plugin

flowyml.plugins.base.ArtifactStorePlugin(name: str = None, **config)

Bases: BasePlugin

Base class for artifact storage plugins.

Artifact stores handle the persistence of artifacts like datasets, models, and other files.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Attributes

root_path: str property

Get the root path of the artifact store.

Functions

delete(path: str) -> bool

Delete an artifact from the store.

Parameters:

Name Type Description Default
path str

Path to delete.

required

Returns:

Type Description
bool

True if deletion was successful.

Source code in flowyml/plugins/base.py
def delete(self, path: str) -> bool:
    """Remove an artifact from the store.

    The base implementation deletes nothing and always reports failure;
    stores that support deletion override it.

    Args:
        path: Path to delete.

    Returns:
        True if deletion was successful.
    """
    was_deleted = False
    return was_deleted

exists(path: str) -> bool abstractmethod

Check if an artifact exists.

Parameters:

Name Type Description Default
path str

Path to check.

required

Returns:

Type Description
bool

True if the artifact exists.

Source code in flowyml/plugins/base.py
@abstractmethod
def exists(self, path: str) -> bool:
    """Tell whether an artifact is present in the store.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        path: Path to check.

    Returns:
        True if the artifact exists.
    """

list(path: str = '') -> list[str]

List artifacts in a directory.

Parameters:

Name Type Description Default
path str

Directory path to list.

''

Returns:

Type Description
list[str]

List of artifact paths.

Source code in flowyml/plugins/base.py
def list(self, path: str = "") -> list[str]:  # noqa: A003
    """Enumerate the artifacts under a directory.

    The base implementation always reports an empty listing; stores
    that support listing override it.

    Args:
        path: Directory path to list.

    Returns:
        List of artifact paths.
    """
    found = []
    return found

load(path: str) -> Any abstractmethod

Load an artifact from the store.

Parameters:

Name Type Description Default
path str

Path to the artifact.

required

Returns:

Type Description
Any

The loaded artifact.

Source code in flowyml/plugins/base.py
@abstractmethod
def load(self, path: str) -> Any:
    """Fetch an artifact from the store.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        path: Path to the artifact.

    Returns:
        The loaded artifact.
    """

save(artifact: Any, path: str) -> str abstractmethod

Save an artifact to the store.

Parameters:

Name Type Description Default
artifact Any

The artifact to save.

required
path str

Path within the store.

required

Returns:

Type Description
str

Full URI of the saved artifact.

Source code in flowyml/plugins/base.py
@abstractmethod
def save(self, artifact: Any, path: str) -> str:
    """Persist an artifact in the store.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        artifact: The artifact to save.
        path: Path within the store.

    Returns:
        Full URI of the saved artifact.
    """

Orchestrator Plugin

flowyml.plugins.base.OrchestratorPlugin(name: str = None, **config)

Bases: BasePlugin

Base class for orchestrator plugins.

Orchestrators manage the execution of pipeline steps, handling scheduling, resource allocation, and monitoring.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Functions

cancel_run(run_id: str) -> bool

Cancel a running pipeline.

Parameters:

Name Type Description Default
run_id str

The run identifier.

required

Returns:

Type Description
bool

True if cancellation was successful.

Source code in flowyml/plugins/base.py
def cancel_run(self, run_id: str) -> bool:
    """Request cancellation of a running pipeline.

    The base implementation cancels nothing and always reports failure;
    orchestrators that support cancellation override it.

    Args:
        run_id: The run identifier.

    Returns:
        True if cancellation was successful.
    """
    was_cancelled = False
    return was_cancelled

get_run_status(run_id: str) -> str

Get the status of a pipeline run.

Parameters:

Name Type Description Default
run_id str

The run identifier.

required

Returns:

Type Description
str

Run status string.

Source code in flowyml/plugins/base.py
def get_run_status(self, run_id: str) -> str:
    """Report the status of a pipeline run.

    The base implementation does not track runs and always answers
    ``"unknown"``.

    Args:
        run_id: The run identifier.

    Returns:
        Run status string.
    """
    status = "unknown"
    return status

list_runs(pipeline_name: str = None, limit: int = 100) -> list[dict]

List pipeline runs.

Parameters:

Name Type Description Default
pipeline_name str

Optional filter by pipeline name.

None
limit int

Maximum number of runs to return.

100

Returns:

Type Description
list[dict]

List of run dictionaries.

Source code in flowyml/plugins/base.py
def list_runs(self, pipeline_name: str = None, limit: int = 100) -> list[dict]:
    """Enumerate pipeline runs.

    The base implementation ignores both arguments and always returns
    an empty list.

    Args:
        pipeline_name: Optional filter by pipeline name.
        limit: Maximum number of runs to return.

    Returns:
        List of run dictionaries.
    """
    runs = []
    return runs

run_pipeline(pipeline: Any, run_id: str, context: dict[str, Any] = None, **kwargs) -> Any abstractmethod

Run a pipeline.

Parameters:

Name Type Description Default
pipeline Any

The pipeline to run.

required
run_id str

Unique identifier for this run.

required
context dict[str, Any]

Optional context dictionary.

None
**kwargs

Additional orchestrator-specific arguments.

{}

Returns:

Type Description
Any

Run result or status.

Source code in flowyml/plugins/base.py
@abstractmethod
def run_pipeline(self, pipeline: Any, run_id: str, context: dict[str, Any] = None, **kwargs) -> Any:
    """Execute a pipeline.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        pipeline: The pipeline to run.
        run_id: Unique identifier for this run.
        context: Optional context dictionary.
        **kwargs: Additional orchestrator-specific arguments.

    Returns:
        Run result or status.
    """

Container Registry Plugin

flowyml.plugins.base.ContainerRegistryPlugin(name: str = None, **config)

Bases: BasePlugin

Base class for container registry plugins.

Container registries store and manage Docker images for pipeline execution.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Attributes

registry_uri: str property

Get the registry URI.

Functions

get_image_uri(image_name: str, tag: str = 'latest') -> str abstractmethod

Get the full URI for an image.

Parameters:

Name Type Description Default
image_name str

Name of the image.

required
tag str

Image tag.

'latest'

Returns:

Type Description
str

Full image URI.

Source code in flowyml/plugins/base.py
@abstractmethod
def get_image_uri(self, image_name: str, tag: str = "latest") -> str:
    """Build the full URI for an image.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        image_name: Name of the image.
        tag: Image tag.

    Returns:
        Full image URI.
    """

pull_image(image_name: str, tag: str = 'latest') -> None

Pull an image from the registry.

Parameters:

Name Type Description Default
image_name str

Name of the image.

required
tag str

Image tag.

'latest'
Source code in flowyml/plugins/base.py
def pull_image(self, image_name: str, tag: str = "latest") -> None:
    """Fetch an image from the registry.

    The base implementation is a no-op; registries that support pulling
    override it.

    Args:
        image_name: Name of the image.
        tag: Image tag.
    """
    return None

push_image(image_name: str, tag: str = 'latest') -> str abstractmethod

Push an image to the registry.

Parameters:

Name Type Description Default
image_name str

Name of the image.

required
tag str

Image tag.

'latest'

Returns:

Type Description
str

Full image URI.

Source code in flowyml/plugins/base.py
@abstractmethod
def push_image(self, image_name: str, tag: str = "latest") -> str:
    """Upload an image to the registry.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        image_name: Name of the image.
        tag: Image tag.

    Returns:
        Full image URI.
    """

Model Registry Plugin

flowyml.plugins.base.ModelRegistryPlugin(name: str = None, **config)

Bases: BasePlugin

Base class for model registry plugins.

Model registries track, version, and manage ML models throughout their lifecycle.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Functions

get_model(name: str, version: str = None) -> Any abstractmethod

Get a model by name.

Parameters:

Name Type Description Default
name str

Model name.

required
version str

Optional version (defaults to latest).

None

Returns:

Type Description
Any

The model.

Source code in flowyml/plugins/base.py
@abstractmethod
def get_model(self, name: str, version: str = None) -> Any:
    """Retrieve a registered model by name.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        name: Model name.
        version: Optional version (defaults to latest).

    Returns:
        The model.
    """

list_models(name: str = None) -> list[dict]

List registered models.

Parameters:

Name Type Description Default
name str

Optional filter by model name.

None

Returns:

Type Description
list[dict]

List of model metadata dictionaries.

Source code in flowyml/plugins/base.py
def list_models(self, name: str = None) -> list[dict]:
    """Enumerate registered models.

    The base implementation ignores the filter and always returns an
    empty list.

    Args:
        name: Optional filter by model name.

    Returns:
        List of model metadata dictionaries.
    """
    models = []
    return models

register_model(name: str, model_uri: str, version: str = None, metadata: dict = None) -> str abstractmethod

Register a model.

Parameters:

Name Type Description Default
name str

Model name.

required
model_uri str

URI to the model artifact.

required
version str

Optional version string.

None
metadata dict

Optional metadata dictionary.

None

Returns:

Type Description
str

Model version identifier.

Source code in flowyml/plugins/base.py
@abstractmethod
def register_model(self, name: str, model_uri: str, version: str = None, metadata: dict = None) -> str:
    """Add a model to the registry.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        name: Model name.
        model_uri: URI to the model artifact.
        version: Optional version string.
        metadata: Optional metadata dictionary.

    Returns:
        Model version identifier.
    """

transition_model_stage(name: str, version: str, stage: str) -> None

Transition a model to a new stage.

Parameters:

Name Type Description Default
name str

Model name.

required
version str

Model version.

required
stage str

Target stage (staging, production, archived).

required
Source code in flowyml/plugins/base.py
def transition_model_stage(self, name: str, version: str, stage: str) -> None:
    """Move a model version to a new lifecycle stage.

    The base implementation is a no-op; registries that support stages
    override it.

    Args:
        name: Model name.
        version: Model version.
        stage: Target stage (staging, production, archived).
    """
    return None

Model Deployer Plugin

flowyml.plugins.base.ModelDeployerPlugin(name: str = None, **config)

Bases: BasePlugin

Base class for model deployment plugins.

Model deployers handle deploying ML models to inference endpoints for serving predictions.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Functions

deploy(model: Any, endpoint_name: str, model_name: str = None, **config) -> str abstractmethod

Deploy a model to an endpoint.

Parameters:

Name Type Description Default
model Any

The model to deploy (URI, artifact, or object).

required
endpoint_name str

Name for the endpoint.

required
model_name str

Optional model name in registry.

None
**config

Deployment configuration.

{}

Returns:

Type Description
str

Endpoint URI or identifier.

Source code in flowyml/plugins/base.py
@abstractmethod
def deploy(self, model: Any, endpoint_name: str, model_name: str = None, **config) -> str:
    """Publish a model to a serving endpoint.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        model: The model to deploy (URI, artifact, or object).
        endpoint_name: Name for the endpoint.
        model_name: Optional model name in registry.
        **config: Deployment configuration.

    Returns:
        Endpoint URI or identifier.
    """

get_endpoint_status(endpoint: str) -> dict[str, Any]

Get the status of an endpoint.

Parameters:

Name Type Description Default
endpoint str

Endpoint URI or identifier.

required

Returns:

Type Description
dict[str, Any]

Status dictionary.

Source code in flowyml/plugins/base.py
def get_endpoint_status(self, endpoint: str) -> dict[str, Any]:
    """Report the status of an endpoint.

    The base implementation does not query anything and always answers
    with a ``"status"`` of ``"unknown"``.

    Args:
        endpoint: Endpoint URI or identifier.

    Returns:
        Status dictionary.
    """
    report = {}
    report["status"] = "unknown"
    return report

list_endpoints() -> list[dict]

List all deployed endpoints.

Returns:

Type Description
list[dict]

List of endpoint metadata dictionaries.

Source code in flowyml/plugins/base.py
def list_endpoints(self) -> list[dict]:
    """Enumerate all deployed endpoints.

    The base implementation always returns an empty list.

    Returns:
        List of endpoint metadata dictionaries.
    """
    endpoints = []
    return endpoints

predict(endpoint: str, data: Any) -> Any abstractmethod

Make predictions using a deployed model.

Parameters:

Name Type Description Default
endpoint str

Endpoint URI or identifier.

required
data Any

Input data for prediction.

required

Returns:

Type Description
Any

Prediction results.

Source code in flowyml/plugins/base.py
@abstractmethod
def predict(self, endpoint: str, data: Any) -> Any:
    """Run inference against a deployed model.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        endpoint: Endpoint URI or identifier.
        data: Input data for prediction.

    Returns:
        Prediction results.
    """

undeploy(endpoint: str) -> bool

Undeploy a model endpoint.

Parameters:

Name Type Description Default
endpoint str

Endpoint URI or identifier.

required

Returns:

Type Description
bool

True if successful.

Source code in flowyml/plugins/base.py
def undeploy(self, endpoint: str) -> bool:
    """Take down a model endpoint.

    The base implementation removes nothing and always reports failure;
    deployers that support undeploying override it.

    Args:
        endpoint: Endpoint URI or identifier.

    Returns:
        True if successful.
    """
    was_removed = False
    return was_removed

Alerter Plugin

flowyml.plugins.base.AlerterPlugin(name: str = None, **config)

Bases: BasePlugin

Base class for alerter plugins.

Alerters send notifications about pipeline events, errors, and other important occurrences.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Functions

send_alert(title: str, message: str, level: str = 'info', **kwargs) -> bool abstractmethod

Send an alert notification.

Parameters:

Name Type Description Default
title str

Alert title.

required
message str

Alert message body.

required
level str

Alert level (info, warning, error, critical; send_success passes "success").

'info'
**kwargs

Additional alerter-specific parameters.

{}

Returns:

Type Description
bool

True if alert was sent successfully.

Source code in flowyml/plugins/base.py
@abstractmethod
def send_alert(self, title: str, message: str, level: str = "info", **kwargs) -> bool:
    """Deliver an alert notification.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        title: Alert title.
        message: Alert message body.
        level: Alert level (info, warning, error, critical). Note that
            ``send_success`` calls this with ``level="success"``.
        **kwargs: Additional alerter-specific parameters.

    Returns:
        True if alert was sent successfully.
    """

send_error(title: str, message: str) -> bool

Send an error notification.

Source code in flowyml/plugins/base.py
def send_error(self, title: str, message: str, **kwargs) -> bool:
    """Send an error notification.

    Convenience wrapper around ``send_alert`` with the level fixed to
    ``"error"``.

    Args:
        title: Alert title.
        message: Alert message body.
        **kwargs: Additional alerter-specific parameters, forwarded
            unchanged to ``send_alert``. Must not include ``level``.

    Returns:
        Whatever ``send_alert`` returns.
    """
    # Forward extra keyword arguments so alerter-specific options that
    # ``send_alert`` accepts are not lost when using this shortcut.
    return self.send_alert(title, message, level="error", **kwargs)

send_success(title: str, message: str) -> bool

Send a success notification.

Source code in flowyml/plugins/base.py
def send_success(self, title: str, message: str, **kwargs) -> bool:
    """Send a success notification.

    Convenience wrapper around ``send_alert`` with the level fixed to
    ``"success"``.

    Args:
        title: Alert title.
        message: Alert message body.
        **kwargs: Additional alerter-specific parameters, forwarded
            unchanged to ``send_alert``. Must not include ``level``.

    Returns:
        Whatever ``send_alert`` returns.
    """
    # Forward extra keyword arguments so alerter-specific options that
    # ``send_alert`` accepts are not lost when using this shortcut.
    return self.send_alert(title, message, level="success", **kwargs)

Feature Store Plugin

flowyml.plugins.base.FeatureStorePlugin(name: str = None, **config)

Bases: BasePlugin

Base class for feature store plugins.

Feature stores manage ML features for training and inference, providing versioning, serving, and discovery capabilities.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Functions

get_feature_view(name: str, version: str = None) -> Any abstractmethod

Get a feature view by name.

Parameters:

Name Type Description Default
name str

Name of the feature view.

required
version str

Optional version.

None

Returns:

Type Description
Any

The feature view.

Source code in flowyml/plugins/base.py
@abstractmethod
def get_feature_view(self, name: str, version: str = None) -> Any:
    """Retrieve a feature view by name.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        name: Name of the feature view.
        version: Optional version.

    Returns:
        The feature view.
    """

get_historical_features(feature_refs: list[str], entity_df: Any) -> Any

Get historical features for training.

Parameters:

Name Type Description Default
feature_refs list[str]

List of feature references.

required
entity_df Any

DataFrame with entity keys and timestamps.

required

Returns:

Type Description
Any

DataFrame with features.

Source code in flowyml/plugins/base.py
def get_historical_features(self, feature_refs: list[str], entity_df: Any) -> Any:
    """Fetch historical features for training.

    This method is not abstract: the base implementation does nothing
    and returns ``None``, so stores that support historical retrieval
    must override it.

    Args:
        feature_refs: List of feature references.
        entity_df: DataFrame with entity keys and timestamps.

    Returns:
        DataFrame with features.
    """
    return None

get_online_features(feature_refs: list[str], entity_rows: list[dict]) -> dict[str, list] abstractmethod

Get online (real-time) features.

Parameters:

Name Type Description Default
feature_refs list[str]

List of feature references (feature_view:feature).

required
entity_rows list[dict]

Entity key-value pairs.

required

Returns:

Type Description
dict[str, list]

Dictionary of feature names to value lists.

Source code in flowyml/plugins/base.py
@abstractmethod
def get_online_features(self, feature_refs: list[str], entity_rows: list[dict]) -> dict[str, list]:
    """Fetch online (real-time) features.

    Abstract: the base body does nothing; subclasses must implement it.

    Args:
        feature_refs: List of feature references (feature_view:feature).
        entity_rows: Entity key-value pairs.

    Returns:
        Dictionary of feature names to value lists.
    """

Data Validator Plugin

flowyml.plugins.base.DataValidatorPlugin(name: str = None, **config)

Bases: BasePlugin

Base class for data validation plugins.

Data validators check data quality, schema conformance, and detect anomalies in datasets.

Source code in flowyml/plugins/base.py
def __init__(self, name: str = None, **config):
    """Set up the plugin's name, configuration, and lifecycle flag.

    Args:
        name: Instance name. When falsy (``None`` or an empty string),
            the class name is used instead.
        **config: Arbitrary keyword settings, kept as a dict.
    """
    # A freshly constructed plugin always starts out uninitialized.
    self._is_initialized = False
    self._config = config
    self._name = name if name else self.__class__.__name__

Functions

get_data_profile(data: Any) -> dict[str, Any]

Generate a profile of the data.

Parameters:

Name Type Description Default
data Any

The data to profile.

required

Returns:

Type Description
dict[str, Any]

Data profile dictionary.

Source code in flowyml/plugins/base.py
def get_data_profile(self, data: Any) -> dict[str, Any]:
    """Produce a profile of the data.

    The base implementation does not inspect the data and always
    returns an empty profile.

    Args:
        data: The data to profile.

    Returns:
        Data profile dictionary.
    """
    profile = {}
    return profile

validate(data: Any, expectations: Any = None) -> dict[str, Any] abstractmethod

Validate data against expectations.

Parameters:

Name Type Description Default
data Any

The data to validate.

required
expectations Any

Validation rules/expectations.

None

Returns:

Type Description
dict[str, Any]

Validation results dictionary.

Source code in flowyml/plugins/base.py
@abstractmethod
def validate(self, data: Any, expectations: Any = None) -> dict[str, Any]:
    """Check data against a set of expectations.

    Abstract: the base body does nothing; subclasses must implement it.
    Note that this takes different arguments, and returns a different
    type, than the no-argument ``validate`` shown for ``BasePlugin``.

    Args:
        data: The data to validate.
        expectations: Validation rules/expectations.

    Returns:
        Validation results dictionary.
    """