Audit pipeio run tools — datalad run migration design¶
1. Current Implementation Audit¶
mcp_run (mcp.py:3147)¶
Launches Snakemake in a detached screen session with custom state tracking.
Mechanism:
- Resolves flow from registry → gets code_path and Snakefile location
- Generates a UUID-based run_id and timestamp-based log path under <flow_dir>/.snakemake/log/
- Builds a bare snakemake --snakefile <path> --cores N command (no snakebids run.py)
- Wraps in screen -dmS pipeio-<pipe>-<flow>-<id> bash -c "... 2>&1 | tee <log>"
- Records run state to .pipeio/runs.json: id, pipe, flow, screen name, log path, status, timestamp
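For reference, the current launch mechanism can be approximated as below (the function name, helper structure, and the EXIT_CODE sentinel are illustrative reconstructions, not the literal mcp.py code):

```python
import shlex
import uuid
from datetime import datetime, timezone

def build_screen_command(pipe: str, flow: str, flow_dir: str, cores: int = 1) -> dict:
    """Sketch of the current launch: wrap a bare snakemake call in a
    detached screen session and tee output to an ephemeral log file."""
    run_id = uuid.uuid4().hex[:8]
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    # Log lands under .snakemake/, which is gitignored (problem 5 below)
    log_path = f"{flow_dir}/.snakemake/log/{stamp}.log"
    snakemake = f"snakemake --snakefile {flow_dir}/Snakefile --cores {cores}"
    screen_name = f"pipeio-{pipe}-{flow}-{run_id}"
    # EXIT_CODE sentinel is what run_status later greps for in the log tail
    inner = f"{snakemake} 2>&1 | tee {shlex.quote(log_path)}; echo EXIT_CODE=$?"
    argv = ["screen", "-dmS", screen_name, "bash", "-c", inner]
    return {"id": run_id, "screen": screen_name, "log": log_path, "argv": argv}
```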
Problems:
1. Bypasses snakebids run.py — misses analysis_level routing, participant filtering, BIDS validation
2. No provenance — runs leave no structured record in git history
3. Custom state file (runs.json) duplicates what git commits would provide
4. Screen sessions are fragile — no recovery after reboot, no structured exit handling
5. Log path is inside .snakemake/ which is gitignored — logs are ephemeral
mcp_run_status (mcp.py:3254)¶
Checks run state by combining runs.json with screen -ls output.
Mechanism:
- Loads runs.json, filters by run_id/pipe/flow
- Checks if screen session name appears in screen -ls output
- Parses last 2000 chars of log file for: EXIT_CODE=N, progress (N of M steps), Error in rule X
- Updates status: running → finished (if screen gone), ok/error (if exit code found)
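A minimal sketch of this heuristic inference (the regexes are assumptions inferred from the log markers listed above, not the exact patterns in mcp.py):

```python
import re

# Illustrative reimplementations of the log-tail heuristics
EXIT_RE = re.compile(r"EXIT_CODE=(\d+)")
PROGRESS_RE = re.compile(r"(\d+) of (\d+) steps")
RULE_ERR_RE = re.compile(r"Error in rule (\w+)")

def infer_status(log_tail: str, screen_alive: bool) -> dict:
    """Infer run state from the last chunk of the log plus screen liveness."""
    out: dict = {"status": "running" if screen_alive else "finished"}
    if m := EXIT_RE.search(log_tail):
        code = int(m.group(1))
        out["status"] = "ok" if code == 0 else "error"
        out["exit_code"] = code
    if hits := PROGRESS_RE.findall(log_tail):
        done, total = hits[-1]  # last progress line wins
        out["progress"] = f"{done}/{total}"
    if m := RULE_ERR_RE.search(log_tail):
        out["failed_rule"] = m.group(1)
    return out
```

The fragility is visible: any change to Snakemake's log wording silently breaks status detection.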
Problems:
1. Status is inferred heuristically from log tail — fragile regex parsing
2. Screen session check is name-based string match — not robust
3. No persistent status — if runs.json is lost, all history is gone
mcp_run_dashboard (mcp.py:3343)¶
Aggregates run stats by calling mcp_run_status and bucketing by flow.
Mechanism:
- Calls mcp_run_status(root) to refresh all run states
- Groups by pipe/flow, counts active/completed/failed
Problems:
1. Entirely dependent on runs.json — no historical data beyond current state file
2. No way to see runs from before the state file was created
mcp_run_kill (mcp.py:3388)¶
Kills a screen session by run_id.
Mechanism:
- Looks up screen name in runs.json
- Runs screen -S <name> -X quit
- Updates status to "killed" in runs.json
Problems:
1. screen -X quit may not cleanly stop Snakemake (no SIGTERM → SIGKILL escalation)
2. Snakemake's own lock files (.snakemake/locks/) may be left behind
2. Existing datalad infrastructure (projio/mcp/datalad.py)¶
Available tools: datalad_save, datalad_status, datalad_push, datalad_pull, datalad_siblings, git_status.
Key patterns:
- All commands resolve via _resolve_datalad_cmd() which reads DATALAD from Makefile vars
- Conda wrapping: if the binary is in a conda env, auto-wraps with conda run -n <env>
- Subdataset support via _resolve_dataset(root, dataset) → -C <path> prefix
- Standard _run() helper: captures stdout/stderr, 120s timeout, structured return
Missing: No datalad_run tool exists yet. This is the key gap.
3. How datalad run works¶
Command structure¶
datalad run \
--input raw/sub-01 \
--output derivatives/preprocess/sub-01 \
--message "preprocess sub-01" \
--explicit \
-- python run.py <flow_dir> participant --participant_label 01
Key semantics¶
- --input: files to datalad get before execution (ensures data availability)
- --output: files to unlock before execution (allows writing to annexed files)
- --explicit: only save --output paths (don't auto-detect changes) — critical for clean provenance
- --message: commit message for the run record
- The command after -- is recorded verbatim in the commit metadata
Run records¶
- Stored as git commits with structured metadata in .datalad/config and the commit message
- Machine-readable: git log --format=%H plus datalad rerun --report to inspect
- datalad rerun <commit> re-executes the recorded command with the same inputs/outputs
Interaction with Snakemake¶
- Snakemake manages its own job graph and parallelism
- datalad run wraps the entire Snakemake invocation as one atomic operation
- The --output must cover the derivative directory (can be a directory path)
- Snakemake locks (.snakemake/locks/) should be excluded from tracking
4. New pipeio_run interface design¶
Signature¶
def pipeio_run(
pipe: str,
flow: str = "",
analysis_level: str = "participant",
participant_label: str = "",
cores: int = 1,
dryrun: bool = False,
extra_args: list[str] | None = None,
) -> dict:
Implementation plan¶
- Resolve flow from registry → get code_path, config_path
- Read contract data from config.yml:
  - input_dir → --input for datalad run
  - output_dir (derivative dir) → --output for datalad run
- If pybids_inputs has specific paths, use those for finer --input declarations
- Build snakebids command: python run.py <flow_dir> <analysis_level> --participant_label <label> --cores <N>
- Wrap with datalad run:
  datalad run \
    --input <input_dir> \
    --output <output_dir> \
    --message "pipeio: <pipe>/<flow> <analysis_level> [<participant>]" \
    --explicit \
    -- python run.py <flow_dir> <analysis_level> [--participant_label <label>] [--cores <N>]
- Return: commit hash from the resulting datalad save, derivative dir, flow metadata
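The steps above can be sketched as a pure command builder (the config keys and message convention follow this plan; all names are assumptions, not existing code):

```python
def build_pipeio_run_argv(
    flow_dir: str,
    config: dict,
    analysis_level: str = "participant",
    participant_label: str = "",
    cores: int = 1,
) -> list[str]:
    """Assemble the datalad run argv wrapping a snakebids invocation.
    config is the flow's contract data (input_dir, output_dir, pipe, flow)."""
    cmd = ["python", "run.py", flow_dir, analysis_level, "--cores", str(cores)]
    if participant_label:
        cmd += ["--participant_label", participant_label]
    message = f"pipeio: {config['pipe']}/{config['flow']} {analysis_level}"
    if participant_label:
        message += f" [{participant_label}]"
    return [
        "datalad", "run",
        "--input", config["input_dir"],
        "--output", config["output_dir"],
        "--message", message,
        "--explicit",          # only save declared outputs
        "--", *cmd,
    ]
```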
Dry run support¶
- dryrun=True → append -n to the snakemake args (via extra_args or snakebids passthrough)
- Could also use datalad run with --dry-run (though datalad's dry-run previews the run record, not snakemake's execution plan)
Background execution¶
Open question. Options:
- (a) Synchronous only — simplest, the MCP call blocks until done. Works for short runs but not hours-long jobs.
- (b) Screen + datalad run — wrap datalad run ... in screen. But then provenance depends on the screen session completing cleanly.
- (c) Worklog queue — use projio's existing enqueue_task mechanism. The task note contains the run parameters; the queue runner executes datalad run. This integrates with existing notification infrastructure.
Recommendation: Option (c) for long runs, option (a) for dry runs and short participant-level runs. The MCP tool should be synchronous; background execution is a worklog concern.
5. Replacements for run_status / dashboard / kill¶
Status → git log of datalad run records¶
def pipeio_run_history(
pipe: str = "",
flow: str = "",
limit: int = 20,
) -> dict:
- Parse git log --format=... for commits with datalad run metadata
- Filter by flow/pipe using commit message conventions
- Extract: commit hash, timestamp, command, inputs, outputs, exit status
- Optionally augment with snakemake --summary for detailed rule status
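A sketch of the history tool, split so the parsing is testable separately from the git call (the "pipeio:" message convention and field layout are assumptions from this design, not an existing API):

```python
import subprocess

# hash, author date (ISO), subject; \x1f separator avoids collisions with message text
FMT = "%H%x1f%aI%x1f%s"

def parse_history(log_output: str, flow: str = "") -> list[dict]:
    """Turn formatted git log output into run-record dicts, optionally filtered by flow."""
    runs = []
    for line in log_output.splitlines():
        sha, date, subject = line.split("\x1f")
        if flow and f"/{flow} " not in subject:
            continue
        runs.append({"commit": sha, "date": date, "subject": subject})
    return runs

def pipeio_run_history(repo: str, flow: str = "", limit: int = 20) -> list[dict]:
    """List pipeio run commits by message convention ("pipeio: <pipe>/<flow> ...")."""
    out = subprocess.run(
        ["git", "-C", repo, "log", f"--max-count={limit}",
         "--grep=pipeio:", f"--format={FMT}"],
        capture_output=True, text=True, check=True,
    ).stdout
    return parse_history(out, flow)
```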
Dashboard → aggregate from git history¶
def pipeio_run_summary(pipe: str = "") -> dict:
- Count run records per flow from git history
- Show last run date, status (success/fail based on whether a revert commit follows)
- No custom state file needed — git is the source of truth
Kill → process management¶
If background execution is via screen:
- Same mechanism as current (find and kill the screen session)
- But this is now a generic concern, not pipeio-specific
If via worklog queue:
- Use worklog_run_kill(queue_id) — already exists
Recommendation: Remove pipeio_run_kill entirely. Background job management belongs to the worklog/queue layer.
6. Blockers and open questions¶
Does the derivative dir need to be a DataLad subdataset?¶
Probably not for v1. datalad run --output derivatives/<flow>/ works on tracked directories within the main dataset. Making derivatives a subdataset would add complexity (separate save/push) but enable independent versioning. Defer to v2.
How does datalad run interact with Snakemake's job management?¶
- Snakemake manages internal parallelism (--cores N) and its own job scheduler
- datalad run wraps the entire invocation as one command — it doesn't know about individual Snakemake jobs
- Snakemake's temp files (.snakemake/) should be in .gitignore to avoid polluting run records
- Lock files: if a previous run was interrupted, .snakemake/locks/ may block. The new tool should detect this and offer --unlock passthrough.
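Lock detection could be a simple pre-flight check before building the command (a sketch; the path layout is Snakemake's standard .snakemake/locks/ directory):

```python
from pathlib import Path

def stale_locks(flow_dir: str) -> list[str]:
    """Return leftover Snakemake lock files from an interrupted run.
    If non-empty, the caller can offer a `snakemake --unlock` pass first."""
    lock_dir = Path(flow_dir) / ".snakemake" / "locks"
    if not lock_dir.is_dir():
        return []
    return sorted(str(p) for p in lock_dir.iterdir())
```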
What happens with long-running jobs (hours/days)?¶
- Synchronous datalad run blocks the calling process — fine for queue-based execution but not for interactive MCP calls
- Solution: for interactive use, support dryrun=True to preview the command; for actual execution, route through the worklog queue, which handles timeouts, notifications, and session management
- The datalad run command itself is recorded — if interrupted, datalad rerun can re-execute it
How to handle partial runs / restarts?¶
- Snakemake natively handles restarts — it checks which outputs exist and only re-runs missing rules
- After an interrupted datalad run, the working tree may have uncommitted changes in the output dir
- Pattern: datalad save -m "partial run" to checkpoint, then re-run. Or datalad run again with the same command — Snakemake's incrementality handles the rest.
- The --explicit flag is important here: only declared outputs are saved, preventing accidental commits of temp files
conda environment wrapping¶
- The snakebids run.py likely needs to run in a specific conda env (e.g. cogpy for snakemake+snakebids)
- The datalad run command itself needs to run via the labpy env (for datalad+git-annex)
- Pattern: datalad run -- conda run -n cogpy python run.py ...
- This is already the pattern in projio's datalad.py — extend it
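The double wrapping reduces to nesting two conda run prefixes (the env names follow the examples above and are assumptions):

```python
def nested_conda_argv(flow_dir: str, datalad_env: str = "labpy", job_env: str = "cogpy") -> list[str]:
    """Sketch of the double-wrapping pattern: datalad runs under one env,
    while the recorded snakebids command runs under another."""
    inner = ["conda", "run", "-n", job_env, "python", "run.py", flow_dir, "participant"]
    return ["conda", "run", "-n", datalad_env, "datalad", "run", "--", *inner]
```

Note the inner `conda run` is part of the recorded command, so `datalad rerun` replays it in the same env.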
7. Migration plan¶
Phase 1: Add datalad_run MCP tool (projio layer)¶
- Add datalad_run(inputs, outputs, message, command) to src/projio/mcp/datalad.py
- Uses the same _resolve_datalad_cmd() and conda wrapping patterns
- Generic tool — not pipeio-specific
Phase 2: New pipeio_run using datalad_run¶
- Reads contract data from flow config to compute --input / --output
- Constructs the snakebids command
- Calls the new datalad_run tool
- Returns structured result with commit hash
Phase 3: Deprecate old tools¶
- Mark pipeio_run, pipeio_run_status, pipeio_run_dashboard, pipeio_run_kill as deprecated
- Replace run_status/dashboard with pipeio_run_history/pipeio_run_summary backed by git log
- Remove screen-based execution entirely
Phase 4: Clean up¶
- Remove runs.json state file logic
- Remove screen dependency
- Update all documentation and agent instructions