Skip to content

Artifact Stores API 📦

Artifact Stores manage the storage and retrieval of step outputs.

Base Artifact Store

Bases: ABC

Base class for artifact storage backends.

Functions

delete(path: str) -> None abstractmethod

Delete artifact at path.

Source code in flowyml/storage/artifacts.py
@abstractmethod
def delete(self, path: str) -> None:
    """Remove the artifact stored at ``path``.

    Args:
        path: Storage path of the artifact to delete
    """

exists(path: str) -> bool abstractmethod

Check if artifact exists at path.

Source code in flowyml/storage/artifacts.py
@abstractmethod
def exists(self, path: str) -> bool:
    """Report whether an artifact is present at ``path``.

    Args:
        path: Storage path to check
    """

list_artifacts(prefix: str = '') -> list[str] abstractmethod

List all artifacts with optional prefix filter.

Source code in flowyml/storage/artifacts.py
@abstractmethod
def list_artifacts(self, prefix: str = "") -> list[str]:
    """Enumerate stored artifacts, optionally restricted by ``prefix``.

    Args:
        prefix: Optional prefix filter; the empty string means no filter
    """

load(path: str) -> Any abstractmethod

Load an artifact from storage.

Parameters:

Name Type Description Default
path str

Storage path of the artifact

required

Returns:

Type Description
Any

The loaded artifact

Source code in flowyml/storage/artifacts.py
@abstractmethod
def load(self, path: str) -> Any:
    """Read an artifact back from storage.

    Args:
        path: Storage path of the artifact

    Returns:
        The loaded artifact
    """

materialize(obj: Any, name: str, run_id: str, step_name: str, project_name: str = 'default') -> str

Materialize artifact to structured storage.

Parameters:

Name Type Description Default
obj Any

Object to materialize

required
name str

Name of the artifact

required
run_id str

ID of the current run

required
step_name str

Name of the step producing the artifact

required
project_name str

Name of the project

'default'

Returns:

Type Description
str

Path where artifact was saved

Source code in flowyml/storage/artifacts.py
def materialize(
    self,
    obj: Any,
    name: str,
    run_id: str,
    step_name: str,
    project_name: str = "default",
) -> str:
    """Materialize artifact to structured storage.

    This base version has an empty body: it stores nothing and implicitly
    returns ``None``.

    Args:
        obj: Object to materialize
        name: Name of the artifact
        run_id: ID of the current run
        step_name: Name of the step producing the artifact
        project_name: Name of the project

    Returns:
        Path where artifact was saved
    """

save(artifact: Any, path: str, metadata: dict | None = None) -> str abstractmethod

Save an artifact to storage.

Parameters:

Name Type Description Default
artifact Any

The artifact to save

required
path str

Storage path for the artifact

required
metadata dict | None

Optional metadata dictionary

None

Returns:

Type Description
str

Full path where artifact was saved

Source code in flowyml/storage/artifacts.py
@abstractmethod
def save(self, artifact: Any, path: str, metadata: dict | None = None) -> str:
    """Save an artifact to storage.

    Args:
        artifact: The artifact to save
        path: Storage path for the artifact
        metadata: Optional metadata dictionary

    Returns:
        Full path where artifact was saved
    """
    pass

Local Artifact Store

Bases: ArtifactStore

Local filesystem artifact storage.

Initialize local artifact store.

Parameters:

Name Type Description Default
base_path str

Base directory for storing artifacts

'.flowyml/artifacts'
Source code in flowyml/storage/artifacts.py
def __init__(self, base_path: str = ".flowyml/artifacts"):
    """Set up a local artifact store rooted at ``base_path``.

    The directory, including any missing parents, is created if it does not
    already exist.

    Args:
        base_path: Base directory for storing artifacts
    """
    root = Path(base_path)
    self.base_path = root
    root.mkdir(parents=True, exist_ok=True)

Functions

delete(path: str) -> None

Delete artifact from filesystem.

Parameters:

Name Type Description Default
path str

Relative path to delete

required
Source code in flowyml/storage/artifacts.py
def delete(self, path: str) -> None:
    """Remove an artifact, and its metadata sidecar, from the filesystem.

    Does nothing when no file or directory exists at the resolved path; in
    that case a leftover metadata sidecar is not removed either.

    Args:
        path: Relative path to delete
    """
    target = self.base_path / path
    if not target.exists():
        return

    # Directories are removed recursively; anything else is unlinked.
    if target.is_dir():
        shutil.rmtree(target)
    else:
        target.unlink()

    # The sidecar lives next to the artifact with its suffix replaced.
    sidecar = target.with_suffix(".meta.json")
    if sidecar.exists():
        sidecar.unlink()

exists(path: str) -> bool

Check if artifact exists.

Parameters:

Name Type Description Default
path str

Relative path to check

required

Returns:

Type Description
bool

True if artifact exists, False otherwise

Source code in flowyml/storage/artifacts.py
def exists(self, path: str) -> bool:
    """Tell whether something is stored at ``path`` under the base directory.

    Args:
        path: Relative path to check

    Returns:
        True if artifact exists, False otherwise
    """
    return (self.base_path / path).exists()

get_metadata(path: str) -> dict | None

Get metadata for an artifact.

Parameters:

Name Type Description Default
path str

Relative path to the artifact

required

Returns:

Type Description
dict | None

Metadata dictionary or None if no metadata exists

Source code in flowyml/storage/artifacts.py
def get_metadata(self, path: str) -> dict | None:
    """Get metadata for an artifact.

    Args:
        path: Relative path to the artifact

    Returns:
        Metadata dictionary or None if no metadata exists
    """
    full_path = self.base_path / path
    metadata_path = full_path.with_suffix(".meta.json")

    if not metadata_path.exists():
        return None

    import json

    with open(metadata_path) as f:
        return json.load(f)

list_artifacts(prefix: str = '') -> list[str]

List all artifacts with optional prefix.

Parameters:

Name Type Description Default
prefix str

Optional prefix filter

''

Returns:

Type Description
list[str]

List of artifact paths

Source code in flowyml/storage/artifacts.py
def list_artifacts(self, prefix: str = "") -> list[str]:
    """Return the sorted relative paths of every stored artifact file.

    ``prefix`` is joined onto the base directory and the search starts from
    there, so it selects a sub-path rather than matching a string prefix.
    Files whose name ends in ``.meta.json`` are skipped.

    Args:
        prefix: Optional prefix filter

    Returns:
        List of artifact paths
    """
    search_root = self.base_path
    if prefix:
        search_root = search_root / prefix

    if not search_root.exists():
        return []

    found = [
        str(entry.relative_to(self.base_path))
        for entry in search_root.rglob("*")
        if entry.is_file() and not entry.name.endswith(".meta.json")
    ]
    found.sort()
    return found

load(path: str) -> Any

Load artifact from local filesystem.

Parameters:

Name Type Description Default
path str

Relative or absolute path to the artifact

required

Returns:

Type Description
Any

The loaded artifact

Source code in flowyml/storage/artifacts.py
def load(self, path: str) -> Any:
    """Load artifact from local filesystem.

    Two on-disk layouts are accepted: a single pickled file, as written by
    ``save``, and a directory containing ``data.pkl``, which is what the
    cloudpickle fallback of ``materialize`` writes. Directories without a
    ``data.pkl`` cannot be decoded here.

    Args:
        path: Relative or absolute path to the artifact

    Returns:
        The loaded artifact

    Raises:
        FileNotFoundError: If nothing exists at the resolved path
        IsADirectoryError: If the path is a directory without ``data.pkl``
    """
    requested = Path(path)
    full_path = requested if requested.is_absolute() else self.base_path / requested

    if not full_path.exists():
        raise FileNotFoundError(f"Artifact not found at {full_path}")

    if full_path.is_dir():
        # materialize() returns the artifact *directory*; its pickle fallback
        # stores the payload in "data.pkl" inside it.
        pickled = full_path / "data.pkl"
        if not pickled.is_file():
            raise IsADirectoryError(
                f"Artifact at {full_path} is a directory without data.pkl; cannot load it with pickle",
            )
        full_path = pickled

    # SECURITY: unpickling can execute arbitrary code. Only load artifacts
    # that were written by a trusted process.
    with open(full_path, "rb") as f:
        return pickle.load(f)

materialize(obj: Any, name: str, run_id: str, step_name: str, project_name: str = 'default') -> str

Materialize artifact to structured storage.

Source code in flowyml/storage/artifacts.py
def materialize(
    self,
    obj: Any,
    name: str,
    run_id: str,
    step_name: str,
    project_name: str = "default",
) -> str:
    """Write ``obj`` into a per-run directory under the base path.

    The artifact directory is
    ``<project_name>/<YYYY-MM-DD>/<run_id>/data/<step_name>/<name>``, using
    today's local date. Anything already at that location is removed first.
    If ``get_materializer`` returns a materializer it does the saving;
    otherwise the object is cloudpickled to ``data.pkl`` and a small
    ``metadata.json`` describing it is written alongside.

    Args:
        obj: Object to materialize
        name: Name of the artifact
        run_id: ID of the current run
        step_name: Name of the step producing the artifact
        project_name: Name of the project

    Returns:
        Path of the artifact directory
    """
    from datetime import datetime
    from flowyml.storage.materializers.base import get_materializer
    import shutil
    import cloudpickle
    import json

    today = datetime.now().strftime("%Y-%m-%d")
    target_dir = self.base_path.joinpath(project_name, today, run_id, "data", step_name, name)

    # Start from a clean slate so stale files never mix with the new ones.
    if target_dir.is_dir():
        shutil.rmtree(target_dir)
    elif target_dir.exists():
        target_dir.unlink()

    target_dir.mkdir(parents=True, exist_ok=True)

    handler = get_materializer(obj)
    if handler:
        handler.save(obj, target_dir)
        return str(target_dir)

    # No dedicated materializer: fall back to cloudpickle.
    payload_file = target_dir / "data.pkl"
    with open(payload_file, "wb") as sink:
        cloudpickle.dump(obj, sink)

    description = {
        "type": type(obj).__name__,
        "serializer": "cloudpickle",
        "format": "pickle",
        "file": payload_file.name,
    }
    with open(target_dir / "metadata.json", "w") as sink:
        json.dump(description, sink, indent=2)

    return str(target_dir)

save(artifact: Any, path: str, metadata: dict | None = None) -> str

Save artifact to local filesystem.

Parameters:

Name Type Description Default
artifact Any

The artifact to save

required
path str

Relative path for the artifact

required
metadata dict | None

Optional metadata dictionary

None

Returns:

Type Description
str

Full path where artifact was saved

Source code in flowyml/storage/artifacts.py
def save(self, artifact: Any, path: str, metadata: dict | None = None) -> str:
    """Save artifact to local filesystem.

    Args:
        artifact: The artifact to save
        path: Relative path for the artifact
        metadata: Optional metadata dictionary

    Returns:
        Full path where artifact was saved
    """
    full_path = self.base_path / path
    full_path.parent.mkdir(parents=True, exist_ok=True)

    # Save artifact using pickle by default
    with open(full_path, "wb") as f:
        pickle.dump(artifact, f)

    # Save metadata if provided
    if metadata:
        metadata_path = full_path.with_suffix(".meta.json")
        import json

        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=2)

    return str(full_path)

size(path: str) -> int

Get size of artifact in bytes.

Parameters:

Name Type Description Default
path str

Relative path to the artifact

required

Returns:

Type Description
int

Size in bytes

Source code in flowyml/storage/artifacts.py
def size(self, path: str) -> int:
    """Get size of artifact in bytes.

    For a single file this is the file size. For a directory artifact it is
    the combined size of every regular file found recursively inside it.

    Args:
        path: Relative path to the artifact

    Returns:
        Size in bytes, or 0 when nothing exists at the path
    """
    full_path = self.base_path / path

    if full_path.is_dir():
        # stat() on a directory reports the size of the directory entry, not
        # of its contents, so add up the contained files instead.
        return sum(entry.stat().st_size for entry in full_path.rglob("*") if entry.is_file())

    if full_path.exists():
        return full_path.stat().st_size
    return 0

GCS Artifact Store

Bases: ArtifactStore

Google Cloud Storage artifact store.

Stores pipeline artifacts in Google Cloud Storage buckets.

Example
1
2
3
from flowyml.stacks.gcp import GCSArtifactStore

artifact_store = GCSArtifactStore(bucket_name="my-flowyml-artifacts", project_id="my-gcp-project")

Initialize GCS artifact store.

Parameters:

Name Type Description Default
name str

Name of the artifact store

'gcs'
bucket_name str | None

GCS bucket name

None
project_id str | None

GCP project ID

None
prefix str

Prefix for all artifacts in bucket

'flowyml'
Source code in flowyml/stacks/gcp.py
def __init__(
    self,
    name: str = "gcs",
    bucket_name: str | None = None,
    project_id: str | None = None,
    prefix: str = "flowyml",
):
    """Create a GCS-backed artifact store.

    Only stores the configuration on the instance after delegating ``name``
    to the parent initializer.

    Args:
        name: Name of the artifact store
        bucket_name: GCS bucket name
        project_id: GCP project ID
        prefix: Prefix for all artifacts in bucket
    """
    super().__init__(name)
    self.prefix = prefix
    self.project_id = project_id
    self.bucket_name = bucket_name

Functions

exists(path: str) -> bool

Check if artifact exists in GCS.

Source code in flowyml/stacks/gcp.py
def exists(self, path: str) -> bool:
    """Ask GCS whether a blob exists at ``<prefix>/<path>`` in the bucket."""
    from google.cloud import storage

    gcs_client = storage.Client(project=self.project_id)
    return gcs_client.bucket(self.bucket_name).blob(f"{self.prefix}/{path}").exists()

load(path: str) -> Any

Load artifact from GCS.

Source code in flowyml/stacks/gcp.py
def load(self, path: str) -> Any:
    """Load artifact from GCS.

    Args:
        path: Either a full ``gs://<bucket>/<blob>`` URI, such as the value
            returned by ``save``, or a path relative to the store's prefix.
            For a URI, the bucket named in it is used; if the URI carries no
            bucket, the store's configured bucket is used.

    Returns:
        The unpickled artifact
    """
    from google.cloud import storage
    import pickle

    scheme = "gs://"
    if path.startswith(scheme):
        # Strip only the leading scheme (a blob name may itself contain
        # "gs://") and honour the bucket named in the URI rather than
        # silently reading from the configured one.
        uri_bucket, _, blob_path = path[len(scheme) :].partition("/")
        bucket_name = uri_bucket or self.bucket_name
    else:
        bucket_name = self.bucket_name
        blob_path = f"{self.prefix}/{path}"

    client = storage.Client(project=self.project_id)
    blob = client.bucket(bucket_name).blob(blob_path)
    data = blob.download_as_bytes()

    # SECURITY: unpickling can execute arbitrary code. Only load blobs from
    # buckets whose writers are trusted.
    return pickle.loads(data)

save(artifact: Any, path: str) -> str

Save artifact to GCS.

Source code in flowyml/stacks/gcp.py
def save(self, artifact: Any, path: str) -> str:
    """Pickle an artifact and upload it to ``<prefix>/<path>`` in the bucket.

    Args:
        artifact: The artifact to save
        path: Path of the artifact relative to the store's prefix

    Returns:
        The ``gs://`` URI of the uploaded blob
    """
    from google.cloud import storage
    import pickle

    gcs_client = storage.Client(project=self.project_id)
    target_bucket = gcs_client.bucket(self.bucket_name)

    blob_name = f"{self.prefix}/{path}"
    target_blob = target_bucket.blob(blob_name)
    target_blob.upload_from_string(pickle.dumps(artifact))

    return f"gs://{self.bucket_name}/{blob_name}"

to_dict() -> dict[str, Any]

Convert to dictionary.

Source code in flowyml/stacks/gcp.py
def to_dict(self) -> dict[str, Any]:
    """Return the store's configuration as a plain dictionary.

    Returns:
        Mapping with the keys ``name``, ``type`` (always ``"gcs"``),
        ``bucket_name``, ``project_id`` and ``prefix``
    """
    config: dict[str, Any] = {"name": self.name, "type": "gcs"}
    for field in ("bucket_name", "project_id", "prefix"):
        config[field] = getattr(self, field)
    return config

validate() -> bool

Validate GCS configuration.

Source code in flowyml/stacks/gcp.py
def validate(self) -> bool:
    """Validate GCS configuration.

    Returns:
        True when a bucket name is configured and ``google.cloud.storage``
        can be located

    Raises:
        ValueError: If ``bucket_name`` is not set
        ImportError: If the ``google-cloud-storage`` package is not installed
    """
    if not self.bucket_name:
        raise ValueError("bucket_name is required for GCSArtifactStore")

    # Check if google-cloud-storage is installed
    import importlib.util

    try:
        spec = importlib.util.find_spec("google.cloud.storage")
    except ModuleNotFoundError:
        # For a dotted name find_spec imports the parent packages first, so a
        # missing ``google`` or ``google.cloud`` raises instead of returning
        # None. Treat that the same as "not installed".
        spec = None

    if spec is None:
        raise ImportError(
            "google-cloud-storage is required for GCSArtifactStore. Install with: pip install google-cloud-storage",
        )
    return True