
Flatten pipe/flow hierarchy to flow + category tag in registry model

Goal

Replace the two-level pipe/flow hierarchy with a flat flow list where pipe becomes an optional category tag on the FlowEntry. This simplifies the registry, makes refactoring easier, and aligns with the v2 architecture where one flow = one snakebids app = one derivative.

Context

See roadmap: docs/log/idea/idea-arash-20260330-174518-164647.md

Current state: PipelineRegistry in packages/pipeio/src/pipeio/registry.py stores flows keyed by pipe/flow with separate pipe and name (flow) fields on FlowEntry. All MCP tools accept pipe + flow as separate arguments. The pipe level is just a category — it doesn't own any state, config, or behavior.

Prompt

Flatten the pipe/flow hierarchy in pipeio's registry model. This is a research + design task — produce a migration plan, do NOT implement yet.

  1. Read packages/pipeio/src/pipeio/registry.py — understand PipelineRegistry, FlowEntry, ModEntry. Note how flows are keyed (pipe/flow), how pipe and name are used.

  2. Grep all MCP tools in packages/pipeio/src/pipeio/mcp.py for how they use pipe and flow arguments. Count how many tools take pipe as a required arg.

  3. Check the projio wrapper layer (src/projio/mcp/pipeio.py, src/projio/mcp/server.py) for the same pattern.

  4. Check packages/pipeio/src/pipeio/cli.py for CLI usage of pipe/flow.

  5. Read registry_scan to understand how pipe names are derived from filesystem paths.

  6. Design the new model:
      • FlowEntry gets a category: str = "" field (replaces pipe)
      • Registry keys become just the flow name (or a slug like preprocess-ecephys)
      • registry.get() accepts just flow (with optional category filter)
      • MCP tools accept flow as primary arg, category as optional filter
      • Backward compat: pipe/flow key format still parseable

  7. Produce a migration plan listing:
      • Every file that needs to change
      • Every function signature that changes
      • Backward compatibility strategy
      • Test impact

Write the plan into the Result section of this task note.

Acceptance Criteria

  • [ ] Complete inventory of pipe/flow usage across codebase
  • [ ] New registry model designed (FlowEntry with category)
  • [ ] Migration plan with file-by-file changes
  • [ ] Backward compatibility strategy documented

Result

Codebase Inventory

Current FlowEntry model (packages/pipeio/src/pipeio/registry.py)

class FlowEntry(BaseModel):
    name: str        # flow name, e.g. "ecephys"
    pipe: str        # pipe/category, e.g. "preprocess"
    code_path: str
    config_path: str | None = None
    doc_path: str | None = None
    mods: dict[str, ModEntry] = {}

Registry keys are either "pipe" (self-named single flow) or "pipe/flow" (e.g., "preprocess/ecephys").

registry.get(pipe: str, flow: str | None = None): pipe is the primary required arg. It auto-selects a flow when the pipe has exactly one flow or a flow named after the pipe.

scan() derives pipe from the first-level subdirectory name under pipelines_dir/. _find_doc_path() uses docs/pipe-<pipe>/flow-<flow>/ convention.

MCP tool pipe arg inventory (packages/pipeio/src/pipeio/mcp.py)

21 tools take pipe as a required (non-Optional) arg:

  • pipe and flow both required (9): mcp_flow_status, mcp_nb_update, mcp_nb_create, mcp_nb_sync, mcp_nb_publish, mcp_nb_analyze, mcp_nb_exec, mcp_mod_create, mcp_rule_stub
  • pipe required, flow: str | None = None (12): mcp_mod_list, mcp_mod_context, mcp_rule_list, mcp_rule_insert, mcp_rule_update, mcp_config_read, mcp_config_patch, mcp_config_init, mcp_dag, mcp_completion, mcp_log_parse, mcp_run

2 tools take pipe only as an optional filter:
  • mcp_flow_list(root, pipe: str | None = None)
  • mcp_cross_flow(root, pipe: str | None = None, flow: str | None = None)

Tools with no pipe/flow (no change needed): mcp_nb_status, mcp_mod_resolve, mcp_registry_scan, mcp_modkey_bib, mcp_docs_collect, mcp_docs_nav, mcp_contracts_validate, mcp_registry_validate, mcp_run_status, mcp_run_dashboard, mcp_run_kill.

Projio wrapper (src/projio/mcp/pipeio.py)

All 21 required-pipe tools and the 2 optional-pipe tools have wrapper functions with identical pipe/flow parameter shapes. The wrappers simply forward to pipeio.mcp.*.

Projio server (src/projio/mcp/server.py)

Each tool has a @server.tool("pipeio_*") registration with explicit pipe: str, flow: str parameters. All 21 required-pipe tools need signature updates.

CLI (packages/pipeio/src/pipeio/cli.py)

  • pipeio flow list [--pipe <name>] — optional filter
  • pipeio flow new <pipe> <flow> — both positional args
  • pipeio registry scan — no pipe/flow args (filesystem walk)
  • pipeio registry validate — no pipe/flow args

New Model Design

FlowEntry field rename

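from pydantic import BaseModel, Field, model_validator

# ModEntry is the existing model defined earlier in registry.py.
class FlowEntry(BaseModel):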
    name: str
    category: str = ""      # replaces pipe; "" means uncategorized
    code_path: str
    config_path: str | None = None
    doc_path: str | None = None
    mods: dict[str, ModEntry] = Field(default_factory=dict)

    @property
    def pipe(self) -> str:
        """Backward-compat alias for category."""
        return self.category

Use a model_validator(mode="before") to map the old pipe YAML field to category on load:

# Defined inside FlowEntry:
@model_validator(mode="before")
@classmethod
def _migrate_pipe(cls, data):
    if isinstance(data, dict) and "pipe" in data and "category" not in data:
        data = dict(data)  # copy so the caller's dict is not mutated
        data["category"] = data.pop("pipe")
    return data

Registry key format

New default key: just the flow name (e.g., "ecephys"). Collision rule: if two flows share a name across categories, scan() produces compound slug "{category}-{flow}" as the key (and sets name to the compound slug).

Backward-compat on load (from_yaml): if a key contains / (old pipe/flow format), split it, use flow part as the name-lookup key, and set category from the pipe part.

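# from_yaml migration step (sketch; FlowEntry as defined above):
flows: dict[str, FlowEntry] = {}
for key, entry_data in raw["flows"].items():
    entry_data = dict(entry_data)  # avoid mutating the parsed YAML
    if "/" in key:
        # Old "pipe/flow" key: the pipe part becomes the category.
        pipe_part, flow_part = key.split("/", 1)
        entry_data.setdefault("category", pipe_part)
        new_key = flow_part
    else:
        # Old self-named "pipe" key, or an already-migrated flow key.
        new_key = key
    if new_key in flows:
        # Assumes the scan() collision rule also applies on load.
        cat = entry_data.get("category") or entry_data.get("pipe", "")
        new_key = f"{cat}-{new_key}"
    flows[new_key] = FlowEntry(**entry_data)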

registry.get() new signature

# OLD
def get(self, pipe: str, flow: str | None = None) -> FlowEntry

# NEW
def get(self, flow: str, category: str | None = None) -> FlowEntry

Resolution logic (replacing the current pipe-first logic):
  1. Direct key lookup: self.flows[flow]
  2. If category is given and the key is not found, search by (entry.name == flow and entry.category == category)
  3. If still not found, search by entry.name == flow; if exactly one entry matches, return it
  4. If multiple entries match (same flow name in different categories), raise ValueError listing them (category required)
  5. Raise KeyError if nothing is found

Old call sites: registry.get(pipe, flow) → registry.get(flow, pipe). The argument order swaps.

registry.list_flows() / list_pipes() changes

# NEW
def list_flows(self, category: str | None = None, pipe: str | None = None) -> list[FlowEntry]:
    # pipe= accepted as alias for category= (compat)
    cat = category or pipe
    entries = list(self.flows.values())
    if cat:
        entries = [f for f in entries if f.category == cat]
    return entries

def list_categories(self) -> list[str]:
    return sorted({f.category for f in self.flows.values() if f.category})

def list_pipes(self) -> list[str]:
    """Deprecated alias for list_categories()."""
    return self.list_categories()

validate() changes

  • flow_id = f"{entry.pipe}/{entry.name}" → flow_id = f"{entry.category}/{entry.name}" (or just entry.name)
  • Slug check on entry.pipe → entry.category
  • Uniqueness check on entry.name (not on pipe/name)
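The validate() changes could look like the following sketch. The slug pattern is an assumption, since the current rule in registry.py is not shown here. A plain dataclass stands in for FlowEntry, and validate_flows is a hypothetical name.

```python
import re
from collections import Counter
from dataclasses import dataclass

# Hypothetical slug rule: lowercase alphanumerics joined by single hyphens.
SLUG_RE = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


@dataclass
class FlowEntry:
    # Minimal stand-in for the Pydantic model.
    name: str
    category: str = ""


def validate_flows(entries: list[FlowEntry]) -> list[str]:
    """Return a list of problems; an empty list means the registry is valid."""
    errors: list[str] = []
    for entry in entries:
        # flow_id keeps the "category/name" shape for messages, or just name.
        flow_id = f"{entry.category}/{entry.name}" if entry.category else entry.name
        # "" means uncategorized, so only non-empty categories are slug-checked.
        if entry.category and not SLUG_RE.fullmatch(entry.category):
            errors.append(f"{flow_id}: category {entry.category!r} is not a slug")
    # Uniqueness is checked on name alone, no longer on (pipe, name).
    for name, count in sorted(Counter(e.name for e in entries).items()):
        if count > 1:
            errors.append(f"flow name {name!r} is used by {count} entries")
    return errors
```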

scan() / _discover_flows() changes

  • FlowEntry(pipe=pipe, ...) → FlowEntry(category=pipe, ...)
  • Key generation: key = flow_name (not pipe/flow); collision → f"{pipe}-{flow}"
  • Self-named flow (flow == pipe): key = flow_name, category = pipe
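One way to implement the scan() key rule is a hypothetical helper that works on the (category, flow) pairs found by the directory walk. The sketch assumes that every flow involved in a name collision gets the compound slug, so the keys do not depend on walk order. It returns the pair instead of rewriting name and leaves that choice to the caller.

```python
from collections import Counter


def assign_keys(discovered: list[tuple[str, str]]) -> dict[str, tuple[str, str]]:
    """Map (category, flow) pairs from a filesystem walk to registry keys.

    The key is the bare flow name unless that name appears under more than
    one category; then each colliding flow gets "{category}-{flow}".
    A self-named flow (flow == category) simply keeps its flow name.
    """
    name_counts = Counter(flow for _, flow in discovered)
    keys: dict[str, tuple[str, str]] = {}
    for category, flow in discovered:
        key = flow if name_counts[flow] == 1 else f"{category}-{flow}"
        keys[key] = (category, flow)
    return keys
```

For pipelines/preprocess/ieeg/ and pipelines/spectral/ieeg/, this yields the keys preprocess-ieeg and spectral-ieeg. A lone pipelines/preprocess/ecephys/ stays ecephys.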

_find_doc_path() change

Rename parameter pipe → category (internal, no public API impact):

def _find_doc_path(docs_dir, category: str, flow: str) -> str | None:
    flow_doc = docs_dir / f"pipe-{category}" / f"flow-{flow}"  # path format unchanged


File-by-File Migration Plan

1. packages/pipeio/src/pipeio/registry.py

  • FlowEntry.pipe: str → category: str = ""; add a pipe property alias
  • Add model_validator: map pipe → category on load
  • PipelineRegistry.get(pipe, flow) → get(flow, category=None); the argument order swaps
  • list_flows(pipe=None) → list_flows(category=None, pipe=None)
  • list_pipes(): keep as an alias for list_categories()
  • Add list_categories(): new canonical method
  • validate(): entry.pipe → entry.category; update flow_id
  • from_yaml(): add key migration "pipe/flow" → "flow"
  • to_yaml(): write the category field (not pipe)
  • scan(): pipe=pipe → category=pipe; new key format
  • _discover_flows(): FlowEntry(pipe=...) → FlowEntry(category=...)
  • _find_doc_path(): rename param pipe → category

LOC impact: ~60 lines changed, ~20 lines added (validator, list_categories, compat logic).

2. packages/pipeio/src/pipeio/mcp.py

For all 21 tools with a required pipe:
  • pipe: str → category: str = "" (optional, used for disambiguation)
  • flow: str becomes the primary required arg (today it is required in 9 of the 21 tools and defaults to None in the other 12)
  • All registry.get(pipe, flow) call sites → registry.get(flow, category or None)
  • entry.pipe references → entry.category
  • Output dict keys: keep "pipe" alongside "category" during the transition
  • list_flows(pipe=pipe) → list_flows(category=category)

For the 2 optional-pipe tools:
  • mcp_flow_list(root, pipe=None) → mcp_flow_list(root, category=None, pipe=None)
  • mcp_cross_flow(root, pipe=None, flow=None) → mcp_cross_flow(root, category=None, flow=None)

LOC impact: ~21 function signatures + ~50 internal call sites.

3. packages/pipeio/src/pipeio/cli.py

  • pipeio flow list --pipe: add --category as the primary option; keep --pipe as a compat alias
  • pipeio flow new <pipe> <flow> → pipeio flow new <flow> [--category <cat>] (or keep both positional args for compat)
  • registry.get(pipe, flow) call sites → registry.get(flow, pipe or None)
  • f.pipe references → f.category
  • list_pipes() call → list_categories() (or keep the alias)
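This sketch assumes the CLI is built on argparse, which may not be the case, since cli.py could use another framework. Under that assumption, giving --category and --pipe the same dest turns --pipe into a compat alias at no extra cost. The parser layout, build_parser, and the help text are illustrative.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="pipeio")
    commands = parser.add_subparsers(dest="command", required=True)
    flow_cmds = commands.add_parser("flow").add_subparsers(dest="flow_command", required=True)

    # pipeio flow list [--category CAT]; --pipe writes to the same dest.
    flow_list = flow_cmds.add_parser("list")
    flow_list.add_argument(
        "--category", "--pipe", dest="category", default=None,
        help="only list flows in this category (--pipe is a legacy alias)",
    )

    # pipeio flow new <flow> [--category CAT]
    flow_new = flow_cmds.add_parser("new")
    flow_new.add_argument("flow")
    flow_new.add_argument("--category", "--pipe", dest="category", default="")
    return parser
```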

LOC impact: ~15 lines changed.

4. src/projio/mcp/pipeio.py

For all 21 wrapper functions:
  • pipe: str → category: str = ""
  • Update docstrings (Args section)
  • Update forwarded kwargs

LOC impact: ~21 function signatures + docstrings, ~100 lines total.

5. src/projio/mcp/server.py

For all @server.tool registrations with pipe:
  • pipe: str → category: str = ""
  • Update call forwarding

LOC impact: ~21 registration blocks, ~50 lines.


Backward Compatibility Strategy

Compatibility is handled at five layers. None of them requires editing existing registry.yml files:

  1. YAML loading (from_yaml): If a key contains /, split it and migrate to the new key + category. If an entry has a pipe: field (not category:), the Pydantic model_validator silently promotes it. Existing registry.yml files load without modification.

  2. Python API: FlowEntry.pipe is kept as a property alias for category, and list_pipes() is kept as an alias for list_categories(). The registry.get() argument order changes. This is a breaking Python API change that cannot be fixed silently, because positional callers break. Mitigation: provide a compat shim get_v1(pipe, flow) during the transition, or make category keyword-only.

  3. MCP tool API: Adding category as an optional kwarg is non-breaking on its own, but dropping pipe is not. Existing callers that pass pipe= will fail with an unknown-kwarg error, so this IS a breaking change at the MCP level. Mitigation options:
      • Accept both pipe and category in wrapper signatures during a transition period: def tool(flow: str, category: str = "", pipe: str = ""), where pipe overrides category if both are given.
      • Or bump the MCP schema version and update callers (agents) simultaneously.

  4. Filesystem convention: No change. pipelines/<category>/<flow>/ paths are unchanged, and scan() still reads the same directory structure.

  5. Modkey format (pipe-X_flow-Y_mod-Z): The modkey BibTeX format references pipe and flow by name. After the migration, pipe becomes category. The format string stays the same; only the internal field name changes. Existing BibTeX citekeys do not change.

Recommended transition approach:
  • Phase 1: Add category to FlowEntry as the canonical field, with pipe as a compat alias. Keep the old registry key format parseable. Do NOT change MCP signatures yet.
  • Phase 2: Change MCP tool signatures to accept flow as the primary arg and category as optional. Accept the legacy pipe= kwarg and warn. Update tests.
  • Phase 3: Remove the legacy pipe kwarg from the public API and MCP tools. Update CLAUDE.md, server.py, and docs.


Test Impact

packages/pipeio/tests/test_registry.py (~20 call sites)

  • FlowEntry(name=..., pipe=...) → FlowEntry(name=..., category=...)
  • entry.pipe → entry.category
  • reg.get("preprocess", "ieeg") → reg.get("ieeg", "preprocess")
  • reg.get("brainstate") stays reg.get("brainstate"); the single-arg call still works
  • reg.list_pipes() → reg.list_categories()
  • reg.list_flows(pipe=...) → reg.list_flows(category=...)
  • Scan test: key assertions change from "preprocess/ecephys" → "ecephys"

packages/pipeio/tests/test_mcp.py (~30 call sites)

  • mcp_rule_list(tmp_path, pipe="preproc", flow="denoise") → mcp_rule_list(tmp_path, flow="denoise", category="preproc")
  • result["pipe"] assertions may still pass if we keep pipe in output dicts during transition
  • All registry fixtures: "preproc/denoise" keys → "denoise" keys

Other test files

  • test_cli.py: --pipe CLI arg tests → --category
  • test_docs.py, test_mod_discovery.py, test_contracts.py, test_pipeio.py: grep for pipe= kwargs and registry key strings containing /

Estimated test changes: ~60–80 lines across 5 test files.


Summary Table

File: nature of change (effort)
  • registry.py: core model change, compat validator, key migration (Medium)
  • mcp.py: 21 signature changes + ~50 call sites (Large)
  • cli.py: ~15 lines, compat aliases (Small)
  • pipeio.py (projio wrapper): 21 signatures + docstrings (Medium)
  • server.py (projio MCP): 21 registrations (Small)
  • test_registry.py: ~20 assertion updates (Small)
  • test_mcp.py: ~30 call-site updates (Small)
  • Other tests: ~10 updates (Small)

Recommended implementation order: registry.py first (core + compat) → mcp.py → pipeio.py + server.py → cli.py → tests → docs.


Acceptance Criteria

  • [x] Complete inventory of pipe/flow usage across codebase
  • [x] New registry model designed (FlowEntry with category)
  • [x] Migration plan with file-by-file changes
  • [x] Backward compatibility strategy documented