Adding Loaders¶
Guide for adding new data loaders to Pivot.
Loader Architecture¶
Pivot provides three base classes for data I/O, allowing you to implement exactly the capabilities you need:
import dataclasses
import pathlib
from abc import ABC, abstractmethod

@dataclasses.dataclass(frozen=True)
class Reader[R](ABC):
    """Read-only - can load data from a file.

    Use for dependencies where you only need to read, not write.
    """

    @abstractmethod
    def load(self, path: pathlib.Path) -> R:
        """Load data from file path."""
        ...

@dataclasses.dataclass(frozen=True)
class Writer[W](ABC):
    """Write-only - can save data to a file.

    Use for outputs where you only need to write, not read.
    """

    @abstractmethod
    def save(self, data: W, path: pathlib.Path) -> None:
        """Save data to file path."""
        ...

@dataclasses.dataclass(frozen=True)
class Loader[W, R = W](Writer[W], Reader[R], ABC):
    """Bidirectional - can both save and load data.

    W is the write type (what save() accepts).
    R is the read type (what load() returns).
    For symmetric loaders where W == R, use a single type parameter: Loader[T].
    """
    ...
When to Use Each Base Class¶
| Base Class | Use Case | Example |
|---|---|---|
| Reader[R] | Dependencies only (read from disk) | Reading config files, external data |
| Writer[W] | Outputs only (write to disk) | MatplotlibFigure - images can't be read back as Figures |
| Loader[T] | Both directions (most common) | CSV, JSON, Pickle - symmetric read/write |
| Loader[W, R] | Asymmetric bidirectional | Write one type, read back a different type |
Note: Loader[T] uses PEP 696 type parameter defaults, so Loader[T] is equivalent to Loader[T, T].
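The asymmetric case is the least obvious of the four, so here is a minimal sketch of it. Everything in it is an assumption made for illustration: `JsonLines` and `roundtrip_demo` are hypothetical names, and the base classes are local stand-ins written with `typing.Generic` so the example runs without Pivot installed. In a real project you would import `Loader` from `pivot.loaders` instead.

```python
import dataclasses
import json
import pathlib
import tempfile
from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import Any, Generic, TypeVar

W = TypeVar("W")
R = TypeVar("R")


# Local stand-ins so the sketch runs without Pivot installed.
# They mirror the shape of the base classes above but are not Pivot's code.
@dataclasses.dataclass(frozen=True)
class Reader(ABC, Generic[R]):
    @abstractmethod
    def load(self, path: pathlib.Path) -> R: ...


@dataclasses.dataclass(frozen=True)
class Writer(ABC, Generic[W]):
    @abstractmethod
    def save(self, data: W, path: pathlib.Path) -> None: ...


@dataclasses.dataclass(frozen=True)
class Loader(Writer[W], Reader[R]):
    pass


Record = dict[str, Any]


# Hypothetical asymmetric loader: save() accepts any iterable of records,
# load() always returns a concrete list.
@dataclasses.dataclass(frozen=True)
class JsonLines(Loader[Iterable[Record], list[Record]]):
    encoding: str = "utf-8"

    def load(self, path: pathlib.Path) -> list[Record]:
        with open(path, encoding=self.encoding) as f:
            return [json.loads(line) for line in f if line.strip()]

    def save(self, data: Iterable[Record], path: pathlib.Path) -> None:
        with open(path, "w", encoding=self.encoding) as f:
            for record in data:
                f.write(json.dumps(record) + "\n")


def roundtrip_demo() -> list[Record]:
    """Write a generator of records, then read them back as a list."""
    with tempfile.TemporaryDirectory() as tmp:
        path = pathlib.Path(tmp) / "rows.jsonl"
        loader = JsonLines()
        loader.save(({"n": i} for i in range(3)), path)  # a generator goes in
        return loader.load(path)  # a list comes out
```

The write type is deliberately wider than the read type here: callers can hand over a generator without materializing it, while consumers get a list they can index.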
Creating a Custom Loader¶
Step 1: Choose the Right Base Class¶
- Reader[R] - Use when you only need to read data (e.g., for Dep() only)
- Writer[W] - Use when you only need to write data (e.g., for Out() only, like plots)
- Loader[T] - Use when you need both read and write (most common, required for IncrementalOut)
Step 2: Define the Loader Class¶
Create a frozen dataclass that extends the appropriate base class:
# src/pivot/loaders.py (or your own module)
import dataclasses
import pathlib

import pandas

from pivot.loaders import Loader

# Bidirectional loader (most common) - Loader[T] defaults R=W via PEP 696
@dataclasses.dataclass(frozen=True)
class Parquet(Loader[pandas.DataFrame]):
    """Parquet file loader."""

    compression: str = "snappy"
    engine: str = "auto"

    def load(self, path: pathlib.Path) -> pandas.DataFrame:
        return pandas.read_parquet(path, engine=self.engine)

    def save(self, data: pandas.DataFrame, path: pathlib.Path) -> None:
        data.to_parquet(path, compression=self.compression, engine=self.engine)
Here are examples using each base class:
import dataclasses
import json
import pathlib

import pandas
from matplotlib.figure import Figure

from pivot.loaders import Reader, Writer, Loader

# Reader-only: for dependencies you only read
@dataclasses.dataclass(frozen=True)
class ConfigReader(Reader[dict]):
    """Read-only config loader."""

    def load(self, path: pathlib.Path) -> dict:
        with open(path) as f:
            return json.load(f)

# Writer-only: for outputs you only write (like plots)
@dataclasses.dataclass(frozen=True)
class PlotWriter(Writer[Figure]):
    """Write-only plot saver."""

    dpi: int = 150

    def save(self, data: Figure, path: pathlib.Path) -> None:
        data.savefig(path, dpi=self.dpi)

# Bidirectional: for read/write (required for IncrementalOut)
@dataclasses.dataclass(frozen=True)
class Parquet(Loader[pandas.DataFrame]):
    """Parquet file loader - Loader[T] is shorthand for Loader[T, T]."""
    ...
Step 3: Add Type Hints¶
Annotate every option field, and make the load() return type and the save() data parameter match the type argument passed to the base class:
@dataclasses.dataclass(frozen=True)
class Parquet(Loader[pandas.DataFrame]):
    """Parquet file loader."""

    compression: str = "snappy"

    def load(self, path: pathlib.Path) -> pandas.DataFrame:
        return pandas.read_parquet(path)

    def save(self, data: pandas.DataFrame, path: pathlib.Path) -> None:
        data.to_parquet(path, compression=self.compression)
Step 4: Add Tests¶
# tests/unit/test_loaders.py
import pathlib

import pandas

from pivot.loaders import Parquet

def test_parquet_roundtrip(tmp_path: pathlib.Path) -> None:
    """Test Parquet loader can save and load data."""
    loader = Parquet()
    path = tmp_path / "test.parquet"
    # Create test data
    original = pandas.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    # Save
    loader.save(original, path)
    assert path.exists()
    # Load
    loaded = loader.load(path)
    # Verify
    pandas.testing.assert_frame_equal(original, loaded)

def test_parquet_compression(tmp_path: pathlib.Path) -> None:
    """Test Parquet loader respects compression option."""
    loader = Parquet(compression="gzip")
    path = tmp_path / "test.parquet"
    data = pandas.DataFrame({"a": [1, 2, 3]})
    loader.save(data, path)
    # Smoke check: the file is written; this does not inspect which codec was used
    assert path.exists()
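Write-only loaders have no load() to round-trip through, so one way to test them is to save and then inspect the file directly. The sketch below assumes a hypothetical `TextReport` writer and a local stand-in for `Writer` (not Pivot's real class) so that it runs on its own. In a pytest suite, the `tmp_path` fixture would replace the temporary directory.

```python
import dataclasses
import pathlib
import tempfile
from abc import ABC, abstractmethod


# Local stand-in so the sketch runs without Pivot installed (not Pivot's class).
@dataclasses.dataclass(frozen=True)
class Writer(ABC):
    @abstractmethod
    def save(self, data, path: pathlib.Path) -> None: ...


# Hypothetical write-only loader used only for this illustration.
@dataclasses.dataclass(frozen=True)
class TextReport(Writer):
    header: str = "# report"

    def save(self, data: str, path: pathlib.Path) -> None:
        path.write_text(f"{self.header}\n{data}\n", encoding="utf-8")


def check_text_report_writes_header(tmp_dir: pathlib.Path) -> str:
    """Save through the writer, then read the file back without the writer."""
    path = tmp_dir / "report.txt"
    TextReport(header="# nightly").save("all good", path)
    return path.read_text(encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    written = check_text_report_writes_header(pathlib.Path(tmp))
```

Checking the file contents also exercises the `header` option, which a bare `path.exists()` assertion would not.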
Requirements¶
Custom loaders must be:
1. Immutable¶
Use @dataclasses.dataclass(frozen=True):
# Good - frozen
@dataclasses.dataclass(frozen=True)
class MyLoader(Loader[T]):
    option: str = "default"

# Bad - mutable
class MyLoader(Loader[T]):
    def __init__(self, option: str = "default"):
        self.option = option  # Mutable!
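What freezing buys you can be seen with the standard library alone. In this small sketch, `ExampleOptions` is a hypothetical stand-in for a loader's option fields, not a Pivot class. Assigning to a field raises `dataclasses.FrozenInstanceError`, and `dataclasses.replace` derives a changed copy instead.

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class ExampleOptions:
    """Hypothetical stand-in for a loader's option fields."""

    compression: str = "snappy"


opts = ExampleOptions()

try:
    opts.compression = "gzip"  # type: ignore[misc]
    mutated = True
except dataclasses.FrozenInstanceError:
    # Frozen dataclasses reject attribute assignment after construction.
    mutated = False

# Derive a variant instead of mutating in place; the original is untouched.
gzip_opts = dataclasses.replace(opts, compression="gzip")

# Frozen dataclasses with default eq=True compare and hash by field values.
same_value = ExampleOptions(compression="snappy")
```

Because equality and hashing follow field values, two instances configured the same way are interchangeable.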
2. Module-Level¶
Define loaders at module level so they can be pickled and sent to worker processes:
# Good - module level
@dataclasses.dataclass(frozen=True)
class MyLoader(Loader[T]):
    ...

# Bad - nested in function
def create_loader():
    @dataclasses.dataclass(frozen=True)
    class MyLoader(Loader[T]):  # Can't be pickled!
        ...
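The reason is that pickle stores a class by reference (module plus qualified name) rather than by value, so a class created inside a function cannot be found again on the receiving side. The sketch below demonstrates this with plain dataclasses. `ModuleLevelLoader`, `make_nested_loader`, and `can_pickle` are hypothetical names for illustration and do not subclass Pivot's base classes.

```python
import dataclasses
import pickle


@dataclasses.dataclass(frozen=True)
class ModuleLevelLoader:
    """Hypothetical stand-in for a loader defined at module level."""

    option: str = "default"


def make_nested_loader() -> object:
    """Return an instance of a class defined inside a function."""

    @dataclasses.dataclass(frozen=True)
    class NestedLoader:
        option: str = "default"

    return NestedLoader()


def can_pickle(obj: object) -> bool:
    """Report whether obj survives pickle.dumps."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        # The exception type for local classes differs across Python versions.
        return False
    return True


module_level_ok = can_pickle(ModuleLevelLoader())
nested_ok = can_pickle(make_nested_loader())
```

When run as a script, the module-level instance pickles and the nested one does not. The same reference-based lookup applies to lambdas and to classes defined in interactive cells that workers cannot import.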
3. Fingerprinted¶
Loader code is automatically fingerprinted. Changes to loader code trigger stage re-runs.
Option values are part of the fingerprint, so changing an option such as compression also changes the fingerprint.
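How option values can feed into a fingerprint is sketched below with a toy hash. This is not Pivot's algorithm: `toy_fingerprint` and `ExampleLoader` are hypothetical, and the toy version hashes only the class name and option values, whereas Pivot's real fingerprint also covers loader code.

```python
import dataclasses
import hashlib
import json


@dataclasses.dataclass(frozen=True)
class ExampleLoader:
    """Hypothetical stand-in for a loader with one option."""

    compression: str = "snappy"


def toy_fingerprint(loader: ExampleLoader) -> str:
    """Illustrative only: hash the class name plus the option values."""
    payload = {
        "class": type(loader).__qualname__,
        "options": dataclasses.asdict(loader),
    }
    encoded = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


default_fp = toy_fingerprint(ExampleLoader())
same_fp = toy_fingerprint(ExampleLoader(compression="snappy"))
gzip_fp = toy_fingerprint(ExampleLoader(compression="gzip"))
```

Here `default_fp` and `same_fp` are identical because the option values match, while `gzip_fp` differs. That is the property the requirement relies on, whatever the real hashing scheme looks like.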
Usage Example¶
from typing import Annotated, TypedDict

import pandas

from pivot import outputs
from myproject.loaders import Parquet

class ProcessOutputs(TypedDict):
    data: Annotated[pandas.DataFrame, outputs.Out("data.parquet", Parquet())]

def process(
    raw: Annotated[pandas.DataFrame, outputs.Dep("raw.parquet", Parquet())],
) -> ProcessOutputs:
    return {"data": raw.dropna()}
Adding to Built-in Loaders¶
To add a loader to Pivot's built-in set:
- Add to src/pivot/loaders.py
- Export in __all__
- Add documentation to docs/concepts/dependencies.md
- Add tests
Checklist¶
- [ ] Uses @dataclasses.dataclass(frozen=True)
- [ ] Defined at module level
- [ ] Extends the appropriate base class (Reader, Writer, or Loader)
- [ ] Implements required methods (load() for Reader, save() for Writer, both for Loader)
- [ ] Has proper type hints
- [ ] Has unit tests for roundtrip (for bidirectional loaders)
- [ ] Has tests for all options
- [ ] Documented in README or docs (if adding to core)
See Also¶
- Dependencies - Using dependencies and loaders
- Code Style - Coding conventions