Agent Server¶
The Agent Server provides a JSON-RPC interface for external tools to interact with Pivot's watch mode.
Overview¶
When running in watch mode with the TUI, Pivot starts an Agent Server that exposes pipeline operations over a Unix socket. This enables IDE integrations, external automation tools, and custom interfaces to control pipeline execution.
Protocol¶
Transport: Unix domain socket (path provided by the TUI when starting the server)
Protocol: JSON-RPC 2.0
Security: Socket permissions 0o600 (owner-only read/write)
Example request:
Example response:
{
"jsonrpc": "2.0",
"result": {"run_id": "abc123def456", "status": "started", "stages_queued": ["train"]},
"id": 1
}
Available Methods¶
| Method | Type | Params | Description |
|---|---|---|---|
run |
Command | stages?, force? |
Queue execution request |
cancel |
Command | - | Request cancellation of current execution |
set_on_error |
Command | mode |
Set error handling ("fail" or "keep_going") |
status |
Query | - | Get current execution state |
stages |
Query | - | List all registered stages |
stage_info |
Query | stage |
Get detailed info for a specific stage |
explain |
Query | stage |
Get explanation of why a stage will run |
events_since |
Query | version |
Poll for events since a version cursor |
commit |
Query | - | Commit pending output changes |
diff_output |
Query | path, old_hash?, new_hash?, max_rows? |
Get diff of an output file |
run¶
Queue stages for execution.
Parameters:
stages(optional): List of stage names to run. If omitted, runs all stages.force(optional): If true, ignore cache and force re-execution.
Returns:
run_id: Unique identifier for this execution (12-character UUID prefix)status: Always"started"on successstages_queued: List of stage names that will execute
Example:
// Request
{"jsonrpc": "2.0", "method": "run", "params": {"stages": ["preprocess", "train"]}, "id": 1}
// Response
{"jsonrpc": "2.0", "result": {"run_id": "abc123def456", "status": "started", "stages_queued": ["preprocess", "train"]}, "id": 1}
status¶
Get the current state of the pipeline or a specific run.
Parameters:
run_id(optional): Specific run to query. If omitted, returns current state.
Returns:
state: Current state (idle,watching,running,completed,failed)run_id: Current or specified run ID (when applicable)stages_completed: List of completed stage namesstages_pending: List of pending stage namesran,skipped,failed: Counts of stage outcomes
Example:
// Request
{"jsonrpc": "2.0", "method": "status", "id": 2}
// Response
{
"jsonrpc": "2.0",
"result": {
"state": "running",
"run_id": "abc123def456",
"stages_completed": ["preprocess"],
"stages_pending": ["train", "evaluate"],
"ran": 1,
"skipped": 0,
"failed": 0
},
"id": 2
}
stages¶
List all registered stages and their metadata.
Parameters: None
Returns:
stages: List of stage objects withname,deps, andouts
Example:
// Request
{"jsonrpc": "2.0", "method": "stages", "id": 3}
// Response
{
"jsonrpc": "2.0",
"result": {
"stages": [
{"name": "preprocess", "deps": ["data/raw.csv"], "outs": ["data/clean.csv"]},
{"name": "train", "deps": ["data/clean.csv"], "outs": ["models/model.pkl"]}
]
},
"id": 3
}
cancel¶
Request cancellation of current execution.
Parameters: None
Returns:
cancelled:trueif a running execution was cancelled,falseif nothing was running
When called during an active execution, sets a threading.Event that the executor checks between stages. Cancellation is stage-level granularity: the currently running stage completes normally (ensuring outputs are in a consistent state), but no new stages are started. Pending stages are marked as skipped with reason "cancelled".
State Machine¶
The Agent Server tracks pipeline state through these transitions:
| State | Description |
|---|---|
IDLE |
Server started, not yet watching |
WATCHING |
Monitoring for file changes |
RUNNING |
Executing stages |
COMPLETED |
Last run succeeded |
FAILED |
Last run had failures |
After COMPLETED or FAILED, the state returns to WATCHING to await further changes or explicit run requests.
Thread Safety¶
The Engine uses locks to protect atomic state transitions. When a run request arrives during execution, it returns an error (-32001 EXECUTION_IN_PROGRESS) rather than queuing.
Error Codes¶
| Code | Name | Description |
|---|---|---|
-32700 |
Parse error | Invalid JSON |
-32600 |
Invalid request | Request structure invalid |
-32601 |
Method not found | Unknown RPC method |
-32602 |
Invalid params | Method parameters invalid |
-32603 |
Internal error | Server-side exception |
-32001 |
Execution in progress | A run is already executing |
-32002 |
Stage not found | Requested stage doesn't exist (includes suggestions for typos) |
Client Example¶
Using Python with raw sockets:
import socket
import json
def call_agent(method: str, params: dict | None = None) -> dict:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(".pivot/agent.sock")
request = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": 1
}
sock.sendall(json.dumps(request).encode() + b"\n")
response = sock.recv(4096).decode()
sock.close()
return json.loads(response)
# Trigger a run
result = call_agent("run", {"stages": ["train"], "force": True})
print(f"Run started: {result['result']['stages_queued']}")
# Check status
status = call_agent("status")
print(f"State: {status['result']['state']}")
Limitations¶
-
Cancellation granularity: Cancellation takes effect between stages, not mid-stage. The currently running stage will complete before cancellation is observed.
-
Single client: The server handles one request at a time. Concurrent requests are serialized.
-
Local only: Unix sockets are local-machine only. For remote access, use SSH tunneling or a proxy.
See Also¶
- Engine Architecture - Execution engine architecture
- Watch Mode - Watch mode and file monitoring
- TUI Architecture - Terminal UI communication