Flatten pipe/flow hierarchy to flow + category tag in registry model
Goal
Replace the two-level pipe/flow hierarchy with a flat flow list where pipe becomes an optional category tag on the FlowEntry. This simplifies the registry, makes refactoring easier, and aligns with the v2 architecture where one flow = one snakebids app = one derivative.
Context
See roadmap: docs/log/idea/idea-arash-20260330-174518-164647.md
Current state: PipelineRegistry in packages/pipeio/src/pipeio/registry.py stores flows keyed by pipe/flow with separate pipe and name (flow) fields on FlowEntry. All MCP tools accept pipe + flow as separate arguments. The pipe level is just a category — it doesn't own any state, config, or behavior.
Prompt
Flatten the pipe/flow hierarchy in pipeio's registry model. This is a research + design task: produce a migration plan, do NOT implement yet.
1. Read `packages/pipeio/src/pipeio/registry.py` to understand `PipelineRegistry`, `FlowEntry` and `ModEntry`. Note how flows are keyed (`pipe/flow`) and how `pipe` and `name` are used.
2. Grep all MCP tools in `packages/pipeio/src/pipeio/mcp.py` for how they use the `pipe` and `flow` arguments. Count how many tools take `pipe` as a required arg.
3. Check the projio wrapper layer (`src/projio/mcp/pipeio.py`, `src/projio/mcp/server.py`) for the same pattern.
4. Check `packages/pipeio/src/pipeio/cli.py` for CLI usage of pipe/flow.
5. Read `registry_scan` to understand how pipe names are derived from filesystem paths.
6. Design the new model:
   - `FlowEntry` gets a `category: str = ""` field (replaces `pipe`)
   - Registry keys become just the flow name (or a slug like `preprocess-ecephys`)
   - `registry.get()` accepts just `flow` (with an optional `category` filter)
   - MCP tools accept `flow` as the primary arg and `category` as an optional filter
   - Backward compat: the `pipe/flow` key format is still parseable
7. Produce a migration plan listing:
   - Every file that needs to change
   - Every function signature that changes
   - Backward compatibility strategy
   - Test impact
8. Write the plan into the Result section of this task note.
Acceptance Criteria
- [ ] Complete inventory of pipe/flow usage across codebase
- [ ] New registry model designed (FlowEntry with category)
- [ ] Migration plan with file-by-file changes
- [ ] Backward compatibility strategy documented
Result
Codebase Inventory
Current FlowEntry model (packages/pipeio/src/pipeio/registry.py)
```python
class FlowEntry(BaseModel):
    name: str  # flow name, e.g. "ecephys"
    pipe: str  # pipe/category, e.g. "preprocess"
    code_path: str
    config_path: str | None = None
    doc_path: str | None = None
    mods: dict[str, ModEntry] = {}
```
Registry keys are either "pipe" (self-named single flow) or "pipe/flow" (e.g., "preprocess/ecephys").
registry.get(pipe: str, flow: str | None = None) — pipe is the primary required arg. Auto-selects when pipe has exactly one flow or a flow named after the pipe.
scan() derives pipe from the first-level subdirectory name under pipelines_dir/.
_find_doc_path() uses docs/pipe-<pipe>/flow-<flow>/ convention.
MCP tool pipe arg inventory (packages/pipeio/src/pipeio/mcp.py)
21 tools with pipe as a required (non-Optional) arg:

| Tool | `pipe` | `flow` |
|---|---|---|
| `mcp_flow_status` | required | required |
| `mcp_nb_update` | required | required |
| `mcp_nb_create` | required | required |
| `mcp_nb_sync` | required | required |
| `mcp_nb_publish` | required | required |
| `mcp_nb_analyze` | required | required |
| `mcp_nb_exec` | required | required |
| `mcp_mod_create` | required | required |
| `mcp_rule_stub` | required | required |
| `mcp_mod_list` | required | `str \| None = None` |
| `mcp_mod_context` | required | `str \| None = None` |
| `mcp_rule_list` | required | `str \| None = None` |
| `mcp_rule_insert` | required | `str \| None = None` |
| `mcp_rule_update` | required | `str \| None = None` |
| `mcp_config_read` | required | `str \| None = None` |
| `mcp_config_patch` | required | `str \| None = None` |
| `mcp_config_init` | required | `str \| None = None` |
| `mcp_dag` | required | `str \| None = None` |
| `mcp_completion` | required | `str \| None = None` |
| `mcp_log_parse` | required | `str \| None = None` |
| `mcp_run` | required | `str \| None = None` |

2 tools with pipe as optional filter only:
- mcp_flow_list(root, pipe: str | None = None)
- mcp_cross_flow(root, pipe: str | None = None, flow: str | None = None)
Tools with no pipe/flow (no change needed): mcp_nb_status, mcp_mod_resolve, mcp_registry_scan, mcp_modkey_bib, mcp_docs_collect, mcp_docs_nav, mcp_contracts_validate, mcp_registry_validate, mcp_run_status, mcp_run_dashboard, mcp_run_kill.
Projio wrapper (src/projio/mcp/pipeio.py)
All 21 tools above + 2 optional tools have wrapper functions with identical pipe/flow parameter shapes. Wrappers simply forward to pipeio.mcp.*.
Projio server (src/projio/mcp/server.py)
Each tool has a @server.tool("pipeio_*") registration with explicit pipe: str, flow: str parameters. All 21 required-pipe tools need signature updates.
CLI (packages/pipeio/src/pipeio/cli.py)
- `pipeio flow list [--pipe <name>]`: optional filter
- `pipeio flow new <pipe> <flow>`: both positional args
- `pipeio registry scan`: no pipe/flow args (filesystem walk)
- `pipeio registry validate`: no pipe/flow args
New Model Design
FlowEntry field rename
```python
from typing import Any

from pydantic import BaseModel, Field, model_validator


class FlowEntry(BaseModel):
    name: str
    category: str = ""  # replaces pipe; "" means uncategorized
    code_path: str
    config_path: str | None = None
    doc_path: str | None = None
    mods: dict[str, ModEntry] = Field(default_factory=dict)

    @property
    def pipe(self) -> str:
        """Backward-compat alias for category."""
        return self.category
```
Use a model_validator(mode="before") on FlowEntry to map the old pipe YAML field to category on load:
```python
class FlowEntry(BaseModel):
    # ... fields and pipe property as above ...

    @model_validator(mode="before")
    @classmethod
    def _migrate_pipe(cls, data: Any) -> Any:
        if isinstance(data, dict) and "pipe" in data and "category" not in data:
            data = dict(data)  # copy so the caller's YAML-loaded dict is not mutated
            data["category"] = data.pop("pipe")
        return data
```
Registry key format
New default key: just the flow name (e.g., "ecephys").
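Collision rule: if two or more flows share a name across categories, scan() keys each of them by the compound slug "{category}-{flow}" (e.g. "preprocess-ecephys"). The entry's name keeps the bare flow name, so get(flow, category) can still match on name + category and the docs/pipe-<category>/flow-<flow>/ lookup is unaffected.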
Backward-compat on load (from_yaml): if a key contains / (old pipe/flow format), split it, use flow part as the name-lookup key, and set category from the pipe part.
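```python
from collections import Counter

# from_yaml migration step: old "pipe/flow" keys -> bare flow-name keys
def _migrate_flow_keys(raw_flows: dict[str, dict]) -> dict[str, FlowEntry]:
    parsed: list[tuple[str, dict]] = []
    for key, entry_data in raw_flows.items():
        data = dict(entry_data)  # copy: don't mutate the loaded YAML
        if "/" in key:
            pipe_part, key = key.split("/", 1)
            data.setdefault("category", data.pop("pipe", pipe_part))
        parsed.append((key, data))
    counts = Counter(key for key, _ in parsed)
    flows: dict[str, FlowEntry] = {}
    for key, data in parsed:
        cat = data.get("category") or data.get("pipe", "")
        # same collision rule as scan(): repeated names get "{category}-{flow}"
        flows[f"{cat}-{key}" if counts[key] > 1 else key] = FlowEntry(**data)
    return flows
```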
registry.get() new signature
```python
# OLD
def get(self, pipe: str, flow: str | None = None) -> FlowEntry: ...

# NEW
def get(self, flow: str, category: str | None = None) -> FlowEntry: ...
```
Resolution logic (replacing the current pipe-first logic):
1. Direct key lookup: self.flows[flow]; when category is given, accept the hit only if entry.category == category
2. If category given and key not found, search by (entry.name == flow and entry.category == category)
3. If still not found, search by entry.name == flow — if exactly one match, return it
4. If multiple matches (same flow name in different categories), raise ValueError listing them (category required)
5. Raise KeyError if nothing found
Old call sites registry.get(pipe, flow) → registry.get(flow, pipe) — argument order swaps.
registry.list_flows() / list_pipes() changes
```python
# NEW
def list_flows(self, category: str | None = None, pipe: str | None = None) -> list[FlowEntry]:
    # pipe= accepted as alias for category= (compat)
    cat = category or pipe
    entries = list(self.flows.values())
    if cat:
        entries = [f for f in entries if f.category == cat]
    return entries

def list_categories(self) -> list[str]:
    return sorted({f.category for f in self.flows.values() if f.category})

def list_pipes(self) -> list[str]:
    """Deprecated alias for list_categories()."""
    return self.list_categories()
```
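validate() changes
- `flow_id = f"{entry.pipe}/{entry.name}"` → `flow_id = f"{entry.category}/{entry.name}"` (or just `entry.name`, which avoids a leading "/" for uncategorized flows)
- Slug check on `entry.pipe` → `entry.category`, skipping the empty (uncategorized) value
- Uniqueness check on `(entry.category, entry.name)` pairs rather than on `pipe/name`: bare names may repeat across categories, and registry keys are unique by construction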
scan() / _discover_flows() changes
- `FlowEntry(pipe=pipe, ...)` → `FlowEntry(category=pipe, ...)`
- Key generation: `key = flow_name` (not `pipe/flow`); on collision → `f"{pipe}-{flow}"`
- Self-named flow (`flow == pipe`): key = `flow_name`, category = `pipe`
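The key-generation rule can be sketched as a pure function over the (category, flow) pairs that scan() discovers. `assign_keys` is a hypothetical helper, the directory walk itself is omitted, and a self-named flow is passed as (pipe, pipe). Applying the compound slug to every flow that shares a name, rather than only to the second one found, keeps the keys independent of scan order; the sketch also refuses a compound slug that clashes with a real flow name.

```python
from collections import Counter


def assign_keys(discovered: list[tuple[str, str]]) -> dict[str, tuple[str, str]]:
    """Return {registry_key: (category, flow)} for the pairs found by scan()."""
    counts = Counter(flow for _, flow in discovered)
    keys: dict[str, tuple[str, str]] = {}
    for category, flow in discovered:
        # Bare flow name, unless the same name appears under another category.
        key = flow if counts[flow] == 1 else f"{category}-{flow}"
        if key in keys:
            # e.g. a flow literally named "preprocess-ecephys" next to a collision slug
            raise ValueError(f"registry key collision: {key!r}")
        keys[key] = (category, flow)
    return keys


assign_keys([
    ("preprocess", "ecephys"),
    ("analysis", "ecephys"),
    ("preprocess", "ieeg"),
    ("brainstate", "brainstate"),  # self-named flow
])
```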
_find_doc_path() change
Rename parameter pipe → category (internal, no public API impact):
```python
def _find_doc_path(docs_dir, category: str, flow: str) -> str | None:
    flow_doc = docs_dir / f"pipe-{category}" / f"flow-{flow}"  # path format unchanged
    ...
```
File-by-File Migration Plan
1. packages/pipeio/src/pipeio/registry.py

| Item | Change |
|---|---|
| `FlowEntry.pipe: str` | → `category: str = ""`; add `pipe` property alias |
| Add `model_validator` | Map `pipe` → `category` on load |
| `PipelineRegistry.get(pipe, flow)` | → `get(flow, category=None)`; swap arg order |
| `list_flows(pipe=None)` | → `list_flows(category=None, pipe=None)` |
| `list_pipes()` | Keep as alias for `list_categories()` |
| Add `list_categories()` | New canonical method |
| `validate()` | `entry.pipe` → `entry.category`; update `flow_id` |
| `from_yaml()` | Add key migration: `"pipe/flow"` → `"flow"` |
| `to_yaml()` | Write `category` field (not `pipe`) |
| `scan()` | `pipe=pipe` → `category=pipe`; new key format |
| `_discover_flows()` | `FlowEntry(pipe=...)` → `FlowEntry(category=...)` |
| `_find_doc_path()` | Rename param `pipe` → `category` |

LOC impact: ~60 lines changed, ~20 lines added (validator, list_categories, compat logic).
2. packages/pipeio/src/pipeio/mcp.py
For all 21 tools with required pipe:
- pipe: str → category: str = "" (optional, for disambiguation)
- flow: str becomes the primary required arg (today it is required in 9 of these tools and str | None = None in the other 12)
- All registry.get(pipe, flow) call sites → registry.get(flow, category or None)
- entry.pipe references → entry.category
- Output dict keys: keep "pipe" alongside "category" during transition
- list_flows(pipe=pipe) → list_flows(category=category)
For the 2 optional-pipe tools:
- mcp_flow_list(root, pipe=None) → mcp_flow_list(root, category=None, pipe=None)
- mcp_cross_flow(root, pipe=None, flow=None) → mcp_cross_flow(root, category=None, flow=None)
LOC impact: ~21 function signatures + ~50 internal call sites.
3. packages/pipeio/src/pipeio/cli.py

| Item | Change |
|---|---|
| `pipeio flow list --pipe` | Add `--category` as primary; keep `--pipe` as compat alias |
| `pipeio flow new <pipe> <flow>` | → `pipeio flow new <flow> [--category <cat>]` (or keep both positional for compat) |
| `registry.get(pipe, flow)` call sites | → `registry.get(flow, pipe or None)` |
| `f.pipe` references | → `f.category` |
| `list_pipes()` call | → `list_categories()` (or keep as alias) |

LOC impact: ~15 lines changed.
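If the CLI is built on argparse (an assumption; the inventory above does not say whether cli.py uses argparse, click or typer), the `--pipe` alias needs no extra code: a second option string on the same argument writes to the same `dest`. The sketch below, with a hypothetical `build_parser`, picks the `--category` option form for `flow new` rather than keeping two positionals.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="pipeio")
    sub = parser.add_subparsers(dest="command", required=True)
    flow = sub.add_parser("flow").add_subparsers(dest="flow_command", required=True)

    ls = flow.add_parser("list")
    # --pipe is a compat alias: both spellings store into args.category.
    ls.add_argument("--category", "--pipe", dest="category", default=None)

    new = flow.add_parser("new")
    new.add_argument("flow")  # flow name is now the only positional
    new.add_argument("--category", "--pipe", dest="category", default="")
    return parser
```

A plain alias cannot emit a deprecation warning for `--pipe`; that would need a custom `argparse.Action` or a check on the raw argv.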
4. src/projio/mcp/pipeio.py
For all 21 wrapper functions:
- pipe: str → category: str = ""
- Docstrings updated (Args section)
- Forwarded kwargs updated
LOC impact: ~21 function signatures + docstrings, ~100 lines total.
5. src/projio/mcp/server.py
For all @server.tool registrations with pipe:
- pipe: str → category: str = ""
- Update call forwarding
LOC impact: ~21 registration blocks, ~50 lines.
Backward Compatibility Strategy
Existing registry.yml files keep loading without modification; the Python and MCP layers contain breaking changes that need an explicit mitigation.
1. YAML loading (`from_yaml`): if a key contains `/`, split it and migrate to the new key + category. If an entry has a `pipe:` field (not `category:`), the Pydantic `model_validator` silently promotes it. Existing registry.yml files load without modification.
2. Python API: `FlowEntry.pipe` is kept as a property alias for `category`, and `list_pipes()` as an alias for `list_categories()`. The `registry.get()` argument-order change is a breaking Python API change that cannot be fixed silently: a positional call such as `get("preprocess", "ecephys")` would be read as `flow="preprocess", category="ecephys"`. Mitigation: provide a compat shim `get_v1(pipe, flow)` during the transition, or make `category` keyword-only so that old two-positional-arg calls fail loudly with a TypeError instead of silently swapping roles.
3. MCP tool API: adding `category` as an optional kwarg is non-breaking by itself, but dropping `pipe` is breaking: existing callers passing `pipe=` fail with an unknown-kwarg error. Mitigation options:
   - Accept both `pipe` and `category` in wrapper signatures during a transition period: `def tool(flow: str, category: str = "", pipe: str = "")`, where `pipe` overrides `category` if both are given.
   - Or bump the MCP schema version and update callers (agents) simultaneously.
4. Filesystem convention: no change. `pipelines/<category>/<flow>/` paths are unchanged, and `scan()` still reads the same directory structure.
5. Modkey format (`pipe-X_flow-Y_mod-Z`): the modkey BibTeX format references pipe and flow by name. After migration `pipe` becomes `category`, but the format string stays the same; only the internal field name changes. No change to existing BibTeX citekeys.
Recommended transition approach:
- Phase 1: Add category to FlowEntry as the canonical field with pipe as compat alias. Keep old registry key format parseable. Do NOT change MCP signatures yet.
- Phase 2: Change MCP tool signatures to accept flow primary + category optional. Accept legacy pipe= kwarg and warn. Update tests.
- Phase 3: Remove legacy pipe kwarg from public API and MCP tools. Update CLAUDE.md, server.py, docs.
Test Impact
packages/pipeio/tests/test_registry.py (~20 call sites)
- `FlowEntry(name=..., pipe=...)` → `FlowEntry(name=..., category=...)`
- `entry.pipe` → `entry.category`
- `reg.get("preprocess", "ieeg")` → `reg.get("ieeg", "preprocess")`
- `reg.get("brainstate")` → `reg.get("brainstate")` (unchanged; single arg still works)
- `reg.list_pipes()` → `reg.list_categories()`
- `reg.list_flows(pipe=...)` → `reg.list_flows(category=...)`
- Scan test: key assertions change from `"preprocess/ecephys"` → `"ecephys"`

packages/pipeio/tests/test_mcp.py (~30 call sites)
- `mcp_rule_list(tmp_path, pipe="preproc", flow="denoise")` → `mcp_rule_list(tmp_path, flow="denoise", category="preproc")`
- `result["pipe"]` assertions may still pass if we keep `pipe` in output dicts during the transition
- All registry fixtures: `"preproc/denoise"` keys → `"denoise"` keys

Other test files
- `test_cli.py`: `--pipe` CLI arg tests → `--category`
- `test_docs.py`, `test_mod_discovery.py`, `test_contracts.py`, `test_pipeio.py`: grep for `pipe=` kwargs and registry key strings containing `/`
Estimated test changes: ~60–80 lines across 5 test files.
Summary Table

| File | Nature | Effort |
|---|---|---|
| registry.py | Core model change, compat validator, key migration | Medium |
| mcp.py | 21 signature changes + ~50 call sites | Large |
| cli.py | ~15 lines, compat aliases | Small |
| pipeio.py (projio wrapper) | 21 signatures + docstrings | Medium |
| server.py (projio MCP) | 21 registrations | Small |
| test_registry.py | ~20 assertion updates | Small |
| test_mcp.py | ~30 call site updates | Small |
| Other tests | ~10 updates | Small |

Recommended implementation order: registry.py first (core + compat) → mcp.py → pipeio.py + server.py → cli.py → tests → docs.
Acceptance Criteria
- [x] Complete inventory of pipe/flow usage across codebase
- [x] New registry model designed (FlowEntry with category)
- [x] Migration plan with file-by-file changes
- [x] Backward compatibility strategy documented