
Audit pipeio run tools — datalad run migration design

1. Current Implementation Audit

mcp_run (mcp.py:3147)

Launches Snakemake in a detached screen session with custom state tracking.

Mechanism:
  • Resolves the flow from the registry → gets code_path and the Snakefile location
  • Generates a UUID-based run_id and a timestamp-based log path under <flow_dir>/.snakemake/log/
  • Builds a bare snakemake --snakefile <path> --cores N command (no snakebids run.py)
  • Wraps it in screen -dmS pipeio-<pipe>-<flow>-<id> bash -c "... 2>&1 | tee <log>"
  • Records run state to .pipeio/runs.json: id, pipe, flow, screen name, log path, status, timestamp
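The wrapping described above amounts to roughly the following. This is an illustrative sketch of the pattern, not the actual mcp.py code; the function and argument names are assumptions:

```python
import shlex
import uuid

def build_screen_command(pipe: str, flow: str, snakefile: str,
                         cores: int, log_path: str) -> list[str]:
    """Sketch of the screen-wrapped launch pattern (illustrative, not the real mcp.py)."""
    run_id = uuid.uuid4().hex[:8]
    session = f"pipeio-{pipe}-{flow}-{run_id}"
    # Bare snakemake invocation -- note: snakebids run.py is not in the loop
    snakemake = f"snakemake --snakefile {shlex.quote(snakefile)} --cores {cores}"
    # tee stdout+stderr into an ephemeral log under .snakemake/log/
    inner = f"{snakemake} 2>&1 | tee {shlex.quote(log_path)}"
    return ["screen", "-dmS", session, "bash", "-c", inner]

cmd = build_screen_command("anat", "preproc", "workflow/Snakefile", 4,
                           ".snakemake/log/run.log")
```

The sketch makes the fragility concrete: everything after launch lives only in the screen session and the tee'd log, with no structured exit handling.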

Problems:
  1. Bypasses snakebids run.py — misses analysis_level routing, participant filtering, BIDS validation
  2. No provenance — runs leave no structured record in git history
  3. The custom state file (runs.json) duplicates what git commits would provide
  4. Screen sessions are fragile — no recovery after reboot, no structured exit handling
  5. The log path is inside .snakemake/, which is gitignored — logs are ephemeral

mcp_run_status (mcp.py:3254)

Checks run state by combining runs.json with screen -ls output.

Mechanism:
  • Loads runs.json and filters by run_id/pipe/flow
  • Checks whether the screen session name appears in screen -ls output
  • Parses the last 2000 chars of the log file for: EXIT_CODE=N, progress (N of M steps), Error in rule X
  • Updates status: running → finished (if the screen session is gone), ok/error (if an exit code is found)
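The heuristic log-tail parsing described above boils down to regexes like these (an illustrative sketch; the exact patterns in mcp.py may differ):

```python
import re

def infer_status_from_log_tail(tail: str) -> dict:
    """Sketch of heuristic status inference from a Snakemake log tail."""
    status = {"state": "running", "exit_code": None, "progress": None, "failed_rule": None}
    if m := re.search(r"EXIT_CODE=(\d+)", tail):
        status["exit_code"] = int(m.group(1))
        status["state"] = "ok" if status["exit_code"] == 0 else "error"
    if m := re.search(r"(\d+) of (\d+) steps", tail):
        status["progress"] = (int(m.group(1)), int(m.group(2)))
    if m := re.search(r"Error in rule (\w+)", tail):
        status["failed_rule"] = m.group(1)
    # Fragile by construction: any change to Snakemake's log wording breaks these patterns.
    return status
```

This is exactly the fragility listed under Problems: the status model is only as stable as Snakemake's log phrasing.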

Problems:
  1. Status is inferred heuristically from the log tail — fragile regex parsing
  2. The screen session check is a name-based string match — not robust
  3. No persistent status — if runs.json is lost, all history is gone

mcp_run_dashboard (mcp.py:3343)

Aggregates run stats by calling mcp_run_status and bucketing by flow.

Mechanism:
  • Calls mcp_run_status(root) to refresh all run states
  • Groups by pipe/flow and counts active/completed/failed runs

Problems:
  1. Entirely dependent on runs.json — no historical data beyond the current state file
  2. No way to see runs from before the state file was created

mcp_run_kill (mcp.py:3388)

Kills a screen session by run_id.

Mechanism:
  • Looks up the screen name in runs.json
  • Runs screen -S <name> -X quit
  • Updates the status to "killed" in runs.json

Problems:
  1. screen -X quit may not cleanly stop Snakemake (no SIGTERM → SIGKILL escalation)
  2. Snakemake's own lock files (.snakemake/locks/) may be left behind

2. Existing datalad infrastructure (projio/mcp/datalad.py)

Available tools: datalad_save, datalad_status, datalad_push, datalad_pull, datalad_siblings, git_status.

Key patterns:
  • All commands resolve via _resolve_datalad_cmd(), which reads DATALAD from the Makefile vars
  • Conda wrapping: if the binary lives in a conda env, it is auto-wrapped with conda run -n <env>
  • Subdataset support via _resolve_dataset(root, dataset) → a -C <path> prefix
  • Standard _run() helper: captures stdout/stderr, 120 s timeout, structured return

Missing: No datalad_run tool exists yet. This is the key gap.

3. How datalad run works

Command structure

datalad run \
  --input raw/sub-01 \
  --output derivatives/preprocess/sub-01 \
  --message "preprocess sub-01" \
  --explicit \
  -- python run.py <flow_dir> participant --participant_label 01

Key semantics

  • --input: files to datalad get before execution (ensures data availability)
  • --output: files to unlock before execution (allows writing to annexed files)
  • --explicit: only save --output paths (don't auto-detect changes) — critical for clean provenance
  • --message: commit message for the run record
  • The command after -- is recorded verbatim in the commit metadata

Run records

  • Stored as git commits, with a structured, machine-readable run record embedded in the commit message
  • Machine-readable: git log --format=%H + datalad rerun --report to inspect
  • datalad rerun <commit> re-executes the recorded command with the same inputs/outputs

Interaction with Snakemake

  • Snakemake manages its own job graph and parallelism
  • datalad run wraps the entire Snakemake invocation as one atomic operation
  • The --output must cover the derivative directory (can be a directory path)
  • Snakemake locks (.snakemake/locks/) should be excluded from tracking

4. New pipeio_run interface design

Signature

def pipeio_run(
    pipe: str,
    flow: str = "",
    analysis_level: str = "participant",
    participant_label: str = "",
    cores: int = 1,
    dryrun: bool = False,
    extra_args: list[str] | None = None,
) -> dict:

Implementation plan

  1. Resolve the flow from the registry → get code_path, config_path
  2. Read contract data from config.yml:
    • input_dir → --input for datalad run
    • output_dir (derivative dir) → --output for datalad run
    • If pybids_inputs has specific paths, use those for finer-grained --input declarations
  3. Build the snakebids command:
    python run.py <flow_dir> <analysis_level> --participant_label <label> --cores <N>
    
  4. Wrap with datalad run:
    datalad run \
      --input <input_dir> \
      --output <output_dir> \
      --message "pipeio: <pipe>/<flow> <analysis_level> [<participant>]" \
      --explicit \
      -- python run.py <flow_dir> <analysis_level> [--participant_label <label>] [--cores <N>]
    
  5. Return: the commit hash from the resulting datalad save, the derivative dir, and the flow metadata
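The steps above can be sketched as a command builder. The config keys (pipe, flow, input_dir, output_dir) are assumptions based on the contract description, not existing code:

```python
def build_pipeio_run_cmd(flow_config: dict, flow_dir: str,
                         analysis_level: str = "participant",
                         participant_label: str = "", cores: int = 1) -> list[str]:
    """Assemble the datalad run argv from a flow's contract data (sketch; keys assumed)."""
    # Inner snakebids invocation
    inner = ["python", "run.py", flow_dir, analysis_level, "--cores", str(cores)]
    if participant_label:
        inner += ["--participant_label", participant_label]
    # Commit message follows the "pipeio: <pipe>/<flow> ..." convention from the design
    message = f"pipeio: {flow_config['pipe']}/{flow_config['flow']} {analysis_level}"
    if participant_label:
        message += f" [{participant_label}]"
    return [
        "datalad", "run",
        "--input", flow_config["input_dir"],
        "--output", flow_config["output_dir"],
        "--message", message,
        "--explicit",
        "--",
        *inner,
    ]

cmd = build_pipeio_run_cmd(
    {"pipe": "anat", "flow": "preproc", "input_dir": "raw",
     "output_dir": "derivatives/preproc"},
    "flows/anat/preproc", participant_label="01",
)
```

Building the argv as a list (rather than a shell string) sidesteps quoting bugs and keeps the recorded command reproducible verbatim.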

Dry run support

  • dryrun=True → append -n to the snakemake args (via extra_args or snakebids passthrough)
  • Could also use datalad run with --dry-run (though datalad's dry-run is about the run record, not snakemake's)

Background execution

Open question. Options:
  • (a) Synchronous only — simplest; the MCP call blocks until done. Works for short runs but not for hours-long jobs.
  • (b) Screen + datalad run — wrap datalad run ... in screen. But then provenance depends on the screen session completing cleanly.
  • (c) Worklog queue — use projio's existing enqueue_task mechanism. The task note contains the run parameters; the queue runner executes datalad run. This integrates with the existing notification infrastructure.

Recommendation: Option (c) for long runs, option (a) for dry runs and short participant-level runs. The MCP tool should be synchronous; background execution is a worklog concern.

5. Replacements for run_status / dashboard / kill

Status → git log of datalad run records

def pipeio_run_history(
    pipe: str = "",
    flow: str = "",
    limit: int = 20,
) -> dict:
  • Parse git log --format=... for commits with datalad run metadata
  • Filter by flow/pipe using commit message conventions
  • Extract: commit hash, timestamp, command, inputs, outputs, exit status
  • Optionally augment with snakemake --summary for detailed rule status
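One way to back pipeio_run_history with git log: list commits via a tab-separated format string and filter on the message convention. The `pipeio: <pipe>/<flow>` convention is an assumption from the design above; the parser works on the output of `git log --format=%H%x09%aI%x09%s`:

```python
def parse_run_history(log_output: str, pipe: str = "", flow: str = "",
                      limit: int = 20) -> list[dict]:
    """Parse tab-separated git log output and keep pipeio run commits (sketch)."""
    runs = []
    for line in log_output.splitlines():
        commit, timestamp, subject = line.split("\t", 2)
        if "pipeio:" not in subject:
            continue  # not one of our run records
        # Extract the "<pipe>/<flow>" tag after the convention marker
        tag = subject.split("pipeio:", 1)[1].strip().split()[0]
        run_pipe, _, run_flow = tag.partition("/")
        if pipe and run_pipe != pipe:
            continue
        if flow and run_flow != flow:
            continue
        runs.append({"commit": commit, "timestamp": timestamp, "subject": subject})
    return runs[:limit]

sample = (
    "abc123\t2024-05-01T10:00:00+00:00\t[DATALAD RUNCMD] pipeio: anat/preproc participant [01]\n"
    "def456\t2024-05-01T09:00:00+00:00\tdocs: update README\n"
)
history = parse_run_history(sample, pipe="anat")
```

Filtering on the commit subject keeps the fast path cheap; the full JSON run record only needs parsing for commits that match.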

Dashboard → aggregate from git history

def pipeio_run_summary(pipe: str = "") -> dict:
  • Count run records per flow from git history
  • Show last run date, status (success/fail based on whether a revert commit follows)
  • No custom state file needed — git is the source of truth

Kill → process management

If background execution is via screen:
  • Same mechanism as the current tool (find and kill the screen session)
  • But this is now a generic concern, not pipeio-specific

If via the worklog queue:
  • Use worklog_run_kill(queue_id) — it already exists

Recommendation: Remove pipeio_run_kill entirely. Background job management belongs to the worklog/queue layer.

6. Blockers and open questions

Does the derivative dir need to be a DataLad subdataset?

Probably not for v1. datalad run --output derivatives/<flow>/ works on tracked directories within the main dataset. Making derivatives a subdataset would add complexity (separate save/push) but enable independent versioning. Defer to v2.

How does datalad run interact with Snakemake's job management?

  • Snakemake manages internal parallelism (--cores N) and its own job scheduler
  • datalad run wraps the entire invocation as one command — it doesn't know about individual Snakemake jobs
  • Snakemake's temp files (.snakemake/) should be in .gitignore to avoid polluting run records
  • Lock files: if a previous run was interrupted, .snakemake/locks/ may block the next run. The new tool should detect this and offer --unlock passthrough.
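Detecting stale locks before launching could look like this. A sketch: the locks path follows Snakemake's default on-disk layout, and the hint wording is illustrative:

```python
from pathlib import Path

def detect_stale_locks(flow_dir: str) -> dict:
    """Check for leftover Snakemake lock files that would block a new run (sketch)."""
    lock_dir = Path(flow_dir) / ".snakemake" / "locks"
    locks = sorted(str(p) for p in lock_dir.glob("*")) if lock_dir.is_dir() else []
    return {
        "locked": bool(locks),
        "locks": locks,
        # Surface the passthrough rather than unlocking automatically:
        # an active run may legitimately hold the lock.
        "hint": "pass --unlock via extra_args to clear stale locks" if locks else "",
    }
```

Reporting rather than auto-unlocking is deliberate: a lock may belong to a run that is still alive, so the decision should stay with the caller.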

What happens with long-running jobs (hours/days)?

  • Synchronous datalad run blocks the calling process — fine for queue-based execution but not for interactive MCP calls
  • Solution: For interactive use, support dryrun=True to preview the command. For actual execution, route through the worklog queue which handles timeouts, notifications, and session management.
  • The datalad run command itself is recorded — if interrupted, datalad rerun can re-execute.

How to handle partial runs / restarts?

  • Snakemake natively handles restarts — it checks which outputs exist and only re-runs missing rules
  • After an interrupted datalad run, the working tree may have uncommitted changes in the output dir
  • Pattern: datalad save -m "partial run" to checkpoint, then re-run. Or datalad run again with the same command — Snakemake's incrementality handles the rest.
  • --explicit flag is important here: only declared outputs are saved, preventing accidental commits of temp files.

conda environment wrapping

  • The snakebids run.py likely needs to run in a specific conda env (e.g. cogpy for snakemake+snakebids)
  • The datalad run command itself needs to run via the labpy env (for datalad+git-annex)
  • Pattern: datalad run -- conda run -n cogpy python run.py ...
  • This is already the pattern in projio's datalad.py — extend it.

7. Migration plan

Phase 1: Add datalad_run MCP tool (projio layer)

  • Add datalad_run(inputs, outputs, message, command) to src/projio/mcp/datalad.py
  • Uses same _resolve_datalad_cmd() and conda wrapping patterns
  • Generic tool — not pipeio-specific
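Phase 1 could start as a thin argv builder over the existing helpers. A sketch: in projio the binary would come from _resolve_datalad_cmd() and execution would go through _run(), so only the assembly step is shown here:

```python
def build_datalad_run_argv(inputs: list[str], outputs: list[str], message: str,
                           command: list[str], datalad_bin: str = "datalad") -> list[str]:
    """Assemble the argv for a generic datalad_run tool (sketch; execution via _run())."""
    argv = [datalad_bin, "run", "--explicit", "--message", message]
    for path in inputs:
        argv += ["--input", path]
    for path in outputs:
        argv += ["--output", path]
    # Everything after -- is recorded verbatim in the run record
    argv += ["--", *command]
    return argv

argv = build_datalad_run_argv(
    ["raw/sub-01"], ["derivatives/preproc/sub-01"], "preprocess sub-01",
    ["python", "run.py", "flows/preproc", "participant"],
)
```

Keeping the builder separate from execution also makes the tool trivially testable without a DataLad installation.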

Phase 2: New pipeio_run using datalad_run

  • Reads contract data from flow config to compute --input/--output
  • Constructs the snakebids command
  • Calls the new datalad_run tool
  • Returns structured result with commit hash

Phase 3: Deprecate old tools

  • Mark pipeio_run, pipeio_run_status, pipeio_run_dashboard, pipeio_run_kill as deprecated
  • Replace run_status/dashboard with pipeio_run_history/pipeio_run_summary backed by git log
  • Remove screen-based execution entirely

Phase 4: Clean up

  • Remove runs.json state file logic
  • Remove screen dependency
  • Update all documentation and agent instructions