Outputs¶
Outputs declare the files a stage writes. They form the other half of the DAG — every output path can become a dependency for downstream stages.
The return-type pattern¶
Outputs are declared in the stage function's return type, not its parameters.
For multiple outputs, use a TypedDict where each field is annotated with an
output spec:
from typing import Annotated, TypedDict
from pandas import DataFrame
import pivot
class TrainOutputs(TypedDict):
    predictions: Annotated[DataFrame, pivot.Out("predictions.csv", pivot.loaders.CSV())]
    metrics: Annotated[dict, pivot.Metric("metrics.json")]
def train(
    data: Annotated[DataFrame, pivot.Dep("features.csv", pivot.loaders.CSV())],
) -> TrainOutputs:
    model = fit(data)
    return {
        "predictions": model.predict(data),
        "metrics": {"accuracy": 0.95, "f1": 0.91},
    }
The stage returns a plain dict matching the TypedDict keys. Pivot serializes each value to its declared path using the specified loader.
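To make the return-type pattern concrete, here is a minimal sketch of how output specs attached with Annotated can be read back from a function's return annotation using only the standard library. OutSpec and collect_outputs are hypothetical names invented for this illustration; the sketch does not claim to show how Pivot itself discovers outputs.

```python
from dataclasses import dataclass
from typing import Annotated, TypedDict, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class OutSpec:
    """Hypothetical stand-in for an output spec such as pivot.Out."""
    path: str


class TrainOutputs(TypedDict):
    predictions: Annotated[list, OutSpec("predictions.csv")]
    metrics: Annotated[dict, OutSpec("metrics.json")]


def train() -> TrainOutputs:
    return {"predictions": [0, 1, 1], "metrics": {"accuracy": 0.95}}


def collect_outputs(func) -> dict:
    """Map each return field name to the path declared in its annotation."""
    return_type = get_type_hints(func, include_extras=True)["return"]
    fields = get_type_hints(return_type, include_extras=True)
    outputs = {}
    for name, hint in fields.items():
        if get_origin(hint) is Annotated:
            # get_args yields (base_type, *metadata); keep the first OutSpec found.
            for meta in get_args(hint)[1:]:
                if isinstance(meta, OutSpec):
                    outputs[name] = meta.path
                    break
    return outputs


declared = collect_outputs(train)
```

Because the specs live in the annotation, they can be inspected without calling the stage function.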
Single-output shorthand¶
When a stage produces exactly one file, skip the TypedDict and annotate the return type directly:
def clean(
    raw: Annotated[DataFrame, pivot.Dep("raw.csv", pivot.loaders.CSV())],
) -> Annotated[DataFrame, pivot.Out("clean.csv", pivot.loaders.CSV())]:
    return raw.dropna()
The stage returns the value directly (not wrapped in a dict). Pivot saves it to the declared path.
Output types at a glance¶
| Type | Syntax | Cached | Use case |
|---|---|---|---|
| Out[W] | Out(path, writer, cache=True) | Yes | Standard file output |
| Metric | Metric(path) | No | JSON metrics (git-tracked) |
| Plot[W] | Plot(path, writer) | Yes | Visualization files |
| IncrementalOut[W,R] | IncrementalOut(path, loader) | Yes | State carried across runs |
| DirectoryOut[T] | DirectoryOut(path + "/", writer) | Yes | Dynamic file sets |
All output types are frozen dataclasses. They are immutable, picklable, and their code is fingerprinted for change detection.
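As a rough illustration of what "frozen dataclass" implies in practice, the sketch below uses a hypothetical OutSpec stand-in (not Pivot's class) to show immutability and value-based equality and hashing. Pickling and code fingerprinting are not shown.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class OutSpec:
    """Hypothetical stand-in for an output spec; not Pivot's class."""
    path: str
    cache: bool = True


spec = OutSpec("predictions.csv")

# Immutable: assigning to a field raises FrozenInstanceError.
try:
    spec.path = "other.csv"
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True

# Value semantics: equal fields give equal, equally hashed specs,
# so specs can serve as dict keys or set members.
same = OutSpec("predictions.csv")
unique_specs = {spec, same}
```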
Out — standard file output¶
The workhorse output type. Writes a single file (or multiple files) using a Writer:
class Results(TypedDict):
    report: Annotated[dict, pivot.Out("report.json", pivot.loaders.JSON())]
    data: Annotated[DataFrame, pivot.Out("processed.csv", pivot.loaders.CSV())]
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| path | str \| list[str] \| tuple[str, ...] | required | Output file path(s) |
| loader | Writer[W] | required | Serializer for the data (named loader for consistency with Dep; accepts any Writer) |
| cache | bool | True | Whether to cache in .pivot/cache |
Multi-file outputs¶
A single output key can write multiple files:
class ShardOutputs(TypedDict):
    shards: Annotated[list[dict], pivot.Out(["shard_a.json", "shard_b.json"], pivot.loaders.JSON())]
def split(...) -> ShardOutputs:
    return {"shards": [data_a, data_b]}  # list matches path list
The return value must be a list/tuple with the same length as the path list. Each element is written to the corresponding path.
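The length rule can be pictured with a small sketch. pair_paths_with_values is a hypothetical helper written for this page, and its error messages are invented; it only mirrors the documented requirement that the returned list or tuple lines up one-to-one with the declared paths.

```python
def pair_paths_with_values(paths, values):
    """Pair each declared path with the value to write there.

    Hypothetical helper: mirrors the documented rule that the returned
    list/tuple must have the same length as the path list.
    """
    if not isinstance(values, (list, tuple)):
        raise TypeError(f"expected list or tuple, got {type(values).__name__}")
    if len(values) != len(paths):
        raise ValueError(f"{len(paths)} paths declared but {len(values)} values returned")
    return list(zip(paths, values))


pairs = pair_paths_with_values(["shard_a.json", "shard_b.json"], [{"id": 1}, {"id": 2}])

# Returning too few values is rejected instead of silently skipping a path.
try:
    pair_paths_with_values(["shard_a.json", "shard_b.json"], [{"id": 1}])
    length_error = None
except ValueError as exc:
    length_error = str(exc)
```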
Disabling caching¶
Set cache=False, as in Out(path, writer, cache=False), to skip caching for outputs
that shouldn't be stored (e.g., temporary files or files you want to always regenerate).
Metric — git-tracked JSON metrics¶
Metric is a specialized Out for small JSON values that should be
version-controlled rather than cached, such as the metrics field declared with
pivot.Metric("metrics.json") in TrainOutputs above.
Defaults that differ from Out:
| Parameter | Out default | Metric default |
|---|---|---|
| loader | required | JSON() (automatic) |
| cache | True | False |
Metrics are not cached by default because they're small and belong in git for
tracking experiment history. The loader defaults to JSON() — you typically
don't need to specify it.
Metric accepts any JSON-serializable value: dicts, lists, strings, numbers,
booleans, or None.
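One quick way to check whether a value qualifies before returning it from a stage is to try serializing it with the standard json module. This is a sketch under the assumption that "JSON-serializable" means what json.dumps accepts; the source does not say how Pivot itself validates metric values, and is_json_serializable is a hypothetical helper.

```python
import json


def is_json_serializable(value) -> bool:
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


metrics_ok = is_json_serializable({"accuracy": 0.95, "f1": 0.91})
none_ok = is_json_serializable(None)
list_ok = is_json_serializable([1, "two", 3.0, True])
set_ok = is_json_serializable({1, 2, 3})  # sets have no JSON representation
```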
Plot — visualization output¶
Plot extends Out with optional metadata for visualization tools:
import pathlib
from typing import Annotated, TypedDict
from matplotlib.figure import Figure
import pivot
class TrainOutputs(TypedDict):
    # Automatic: Pivot saves the figure and closes it
    loss_curve: Annotated[Figure, pivot.Plot("plots/loss.png", pivot.loaders.MatplotlibFigure())]
    # Manual: stage creates the file, Pivot just tracks it
    roc_plot: Annotated[pathlib.Path, pivot.Plot("plots/roc.png", pivot.loaders.PathOnly())]
Extra parameters (beyond Out):
| Parameter | Type | Default | Description |
|---|---|---|---|
| x | str \| None | None | X-axis column name (for structured plot data) |
| y | str \| None | None | Y-axis column name (for structured plot data) |
| template | str \| None | None | Plot template identifier |
MatplotlibFigure() is a write-only loader that calls fig.savefig() and then
plt.close(fig) to prevent memory leaks. Format is inferred from the file
extension (.png, .pdf, .svg).
IncrementalOut — state across runs¶
IncrementalOut is for outputs that accumulate state across runs. Before the
stage executes, Pivot restores the previous output from cache so the stage can
read, modify, and write it back.
from typing import Annotated, TypedDict
from pandas import DataFrame
import pivot
class CacheOutputs(TypedDict):
    cache: Annotated[dict, pivot.IncrementalOut("cache.json", pivot.loaders.JSON())]
def update_cache(
    cache: Annotated[dict, pivot.IncrementalOut("cache.json", pivot.loaders.JSON())],
    new_data: Annotated[DataFrame, pivot.Dep("new_data.csv", pivot.loaders.CSV())],
) -> CacheOutputs:
    # `cache` contains the previous run's output (or {} on first run)
    cache["latest"] = process(new_data)
    return {"cache": cache}
How it works¶
- Before execution, Pivot checks for a cached previous output
- If found, restores it and injects as the parameter value
- If not found (first run), uses the loader's empty() method (e.g., {} for JSON, empty DataFrame for CSV)
- Stage reads, modifies, and returns the updated value
- Pivot writes and re-caches the result
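The restore, modify, and write-back cycle above can be sketched with the standard library alone. In this illustration a plain JSON file stands in for Pivot's cache, and run_incremental and count_runs are hypothetical names; the real mechanism may differ.

```python
import json
import tempfile
from pathlib import Path


def run_incremental(path: Path, update) -> dict:
    """Restore previous state (or start empty), apply update, write back.

    Hypothetical helper; a plain file on disk stands in for Pivot's cache.
    """
    # Restore the previous output if one exists, otherwise start from an empty dict.
    state = json.loads(path.read_text()) if path.exists() else {}
    # The stage modifies and returns the state.
    state = update(state)
    # Persist the result so the next run can restore it.
    path.write_text(json.dumps(state))
    return state


def count_runs(state: dict) -> dict:
    state["runs"] = state.get("runs", 0) + 1
    return state


with tempfile.TemporaryDirectory() as tmp:
    state_path = Path(tmp) / "cache.json"
    first = run_incremental(state_path, count_runs)   # no previous output yet
    second = run_incremental(state_path, count_runs)  # sees the first run's output
```

The second call starts from what the first call wrote, which is the behavior an accumulating output relies on.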
Rules¶
- The IncrementalOut must appear on both a parameter and a return field
- Path and loader must match between input and output
- For TypedDict returns, the parameter name must match the return field name
- For single-output stages, only one IncrementalOut parameter is allowed
- The input does not create a DAG edge (it's self-referential)
- Requires a Loader (not just Reader or Writer) since it reads and writes
DirectoryOut — dynamic file sets¶
When the number or names of output files aren't known until runtime, use
DirectoryOut:
from typing import Annotated, TypedDict
import pivot
class TaskMetrics(TypedDict):
    accuracy: float
    loss: float
class ProcessOutputs(TypedDict):
    results: Annotated[dict[str, TaskMetrics], pivot.DirectoryOut("metrics/tasks/", pivot.loaders.YAML())]
def process_tasks(...) -> ProcessOutputs:
    return {
        "results": {
            "task_a.yaml": TaskMetrics(accuracy=0.95, loss=0.12),
            "task_b.yaml": TaskMetrics(accuracy=0.87, loss=0.31),
        }
    }
How it works:
- Path must end with / (enforced at construction time)
- Return value is dict[str, T] where keys are relative paths within the directory and values are the data to serialize
- Each key must include a file extension
- Keys are validated: no absolute paths, no .. traversal, no empty names
- Duplicate keys after normalization are rejected
- Case collisions are detected (for cross-platform safety)
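The key rules listed above could be checked along the following lines. This is a sketch only: validate_keys and is_rejected are hypothetical helpers, normalization is assumed to mean what pathlib.PurePosixPath does (collapsing "." segments and repeated slashes), and Pivot's actual checks and error messages may differ.

```python
from pathlib import PurePosixPath


def validate_keys(keys):
    """Return normalized keys, raising ValueError on any rule violation.

    Hypothetical helper illustrating the documented checks.
    """
    seen = {}        # normalized key -> original key
    seen_lower = {}  # lowercased normalized key -> original key
    for key in keys:
        path = PurePosixPath(key)
        if not key or path.name == "":
            raise ValueError(f"empty name: {key!r}")
        if path.is_absolute():
            raise ValueError(f"absolute path: {key!r}")
        if ".." in path.parts:
            raise ValueError(f"'..' traversal: {key!r}")
        if path.suffix == "":
            raise ValueError(f"missing file extension: {key!r}")
        normalized = str(path)
        if normalized in seen:
            raise ValueError(f"duplicate key: {key!r} and {seen[normalized]!r}")
        if normalized.lower() in seen_lower:
            raise ValueError(f"case collision: {key!r} and {seen_lower[normalized.lower()]!r}")
        seen[normalized] = key
        seen_lower[normalized.lower()] = key
    return list(seen)


def is_rejected(keys) -> bool:
    """Convenience wrapper: True if validate_keys raises for these keys."""
    try:
        validate_keys(keys)
    except ValueError:
        return True
    return False


accepted = validate_keys(["task_a.yaml", "sub/task_b.yaml"])
```

Checking lowercased keys separately is what catches pairs such as Task.yaml and task.yaml, which would collide on a case-insensitive filesystem.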
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| path | str (ending with /) | required | Output directory |
| loader | Writer[T] | required | Serializer for each file |
| cache | bool | True | Whether to cache files |
Each file in the directory is cached individually by content hash. Downstream
stages can depend on the directory (Dep("metrics/tasks/")) or on specific
files within it (Dep("metrics/tasks/task_a.yaml")).
Overriding output paths¶
Like dependencies, output paths can be overridden at registration time:
# Simple path override
pipeline.register(train, out_path_overrides={
    "predictions": "v2/predictions.csv",
})
# Override with options
pipeline.register(train, out_path_overrides={
    "predictions": {"path": "v2/predictions.csv", "cache": False},
})
For single-output stages, the override key can be any string, since there is only one output to override.
IncrementalOut paths cannot be overridden (input and output paths must match).
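Since output specs are immutable, one plausible way to picture an override is as the creation of a new spec with some fields replaced. The sketch below assumes that model; OutSpec and apply_override are hypothetical stand-ins, and only the two override shapes (a bare path string, or a dict of options) come from the examples above.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class OutSpec:
    """Hypothetical stand-in for an output spec; not Pivot's class."""
    path: str
    cache: bool = True


def apply_override(spec: OutSpec, override) -> OutSpec:
    """Return a new spec with the override applied; the original is untouched."""
    if isinstance(override, str):
        # Simple form: the override is just a new path.
        return replace(spec, path=override)
    # Options form: a dict whose keys name the fields to replace.
    return replace(spec, **override)


base = OutSpec("predictions.csv")
moved = apply_override(base, "v2/predictions.csv")
uncached = apply_override(base, {"path": "v2/predictions.csv", "cache": False})
```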
Decision tree: which output type?¶
Is this a JSON metric you want in git?
└─ Yes → Metric("path.json")
Is this a visualization?
└─ Yes → Plot("path.png", MatplotlibFigure())
Does this output carry state between runs?
└─ Yes → IncrementalOut("path", loader)
Are the output files determined at runtime?
└─ Yes → DirectoryOut("dir/", writer)
Everything else:
└─ Out("path", writer)
How Pivot writes outputs¶
After a stage returns, Pivot:
1. Validates all TypedDict keys are present in the return value
2. Resolves each output path relative to the project root
3. Creates parent directories as needed
4. Writes each file sequentially using the output's loader
5. Hashes the written files for cache storage
6. Updates the stage's lock file with output hashes
Validation (step 1) happens before any writes begin, so missing keys or type mismatches are caught early. However, there is no rollback — if the third of five writes fails, the first two are already on disk.
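The practical consequence of validating first but never rolling back can be seen in a small sketch. write_outputs is a hypothetical helper, JSON stands in for every loader, and hashing and lock-file updates are left out; it illustrates the ordering described above, not Pivot's code.

```python
import json
import tempfile
from pathlib import Path


def write_outputs(root: Path, declared_paths: dict, returned: dict) -> list:
    """Validate keys, then write each value as JSON under root (hypothetical)."""
    # Validation runs before any write, so a missing key leaves the disk untouched.
    missing = sorted(set(declared_paths) - set(returned))
    if missing:
        raise KeyError(f"missing output keys: {missing}")
    written = []
    for key, rel_path in declared_paths.items():
        target = root / rel_path
        target.parent.mkdir(parents=True, exist_ok=True)
        # If serialization fails here, files from earlier iterations stay on disk.
        target.write_text(json.dumps(returned[key]))
        written.append(target)
    return written


def list_files(root: Path) -> list:
    return sorted(p.name for p in root.rglob("*") if p.is_file())


declared_paths = {"report": "out/report.json", "bad": "out/bad.json", "late": "out/late.json"}

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    try:
        write_outputs(root, declared_paths, {"report": {"ok": True}})
        missing_key_caught = False
    except KeyError:
        missing_key_caught = True
    files_after_validation_failure = list_files(root)

    try:
        # A set is not JSON-serializable, so the second write fails.
        write_outputs(root, declared_paths, {"report": {"ok": True}, "bad": {1, 2}, "late": {}})
        write_failed = False
    except TypeError:
        write_failed = True
    files_after_write_failure = list_files(root)
```

After the failed second write, report.json remains on disk while late.json was never attempted, so a stage that fails partway can leave a partial set of outputs behind.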
Multi-file paths and expansion¶
Internally, Pivot expands multi-file outputs (Out(["a.csv", "b.csv"], ...))
into individual single-path output specs for DAG construction and caching. Each
file is tracked and cached independently. This means fine-grained cache hits —
if only one file in a multi-file output changes, only that file is re-cached.
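Per-file tracking can be illustrated by comparing content hashes between runs. The sketch assumes SHA-256 as the content hash and an in-memory dict as the stored state; the source does not state which hash Pivot uses, and content_hash and files_to_recache are hypothetical helpers.

```python
import hashlib


def content_hash(data: bytes) -> str:
    # SHA-256 is an assumption made for this sketch.
    return hashlib.sha256(data).hexdigest()


def files_to_recache(previous: dict, current: dict) -> list:
    """Return paths whose content hash differs from the previously stored hash."""
    return [
        path for path, data in current.items()
        if previous.get(path) != content_hash(data)
    ]


# First run: both files are new, so both hashes are stored.
run_1 = {"a.csv": b"x,y\n1,2\n", "b.csv": b"x,y\n3,4\n"}
stored = {path: content_hash(data) for path, data in run_1.items()}

# Second run: only b.csv changes.
run_2 = {"a.csv": b"x,y\n1,2\n", "b.csv": b"x,y\n5,6\n"}
changed = files_to_recache(stored, run_2)
```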
Summary¶
| Pattern | When to use |
|---|---|
| TypedDict with Out fields | Multiple outputs from one stage |
| Annotated[T, Out(...)] return | Single output |
| Metric("path.json") | Small JSON values for git tracking |
| Plot("path.png", writer) | Visualizations |
| IncrementalOut("path", loader) | State that accumulates across runs |
| DirectoryOut("dir/", writer) | Runtime-determined file sets |
| cache=False | Outputs that shouldn't be cached |
See also: Artifacts & the DAG | Dependencies | Loaders