chore(meshai): v0.5.5 -- cleanup bundle (gitignore env anchor, ducting health event_count, mesh_sources secret stripping, delete unused SeverityRouter)

Four independent low-risk fixes from the deferred list. Bundled in a single
commit because none are large enough to warrant their own tag and none
touch the safe-mode-sensitive paths (dispatcher / consumer / toggle config).

1) .gitignore: change bare `env/` to `/env/` so the rule anchors at the
   repo root only. The unanchored form was matching `meshai/env/` (the
   adapter package directory) and forced `git add -f` workarounds during
   2.14 / 2.16.1. Verified post-fix: `git check-ignore -vn meshai/env/test.py`
   reports no pattern match; `git check-ignore -v env/foo` still matches
   the new `/env/` rule.

2) meshai/env/ducting.py: health_status.event_count was hardcoded `0`
   from before Phase 2.13 added real event emission. Replaced with
   `len(self._events)`, which is the pattern every other env adapter
   already uses (fires/firms/nws/swpc/traffic/roads511/usgs/usgs_quake/
   avalanche). Flows through env.store.health_status → /api/env/status
   so the dashboard counter starts reflecting reality.

3) meshai/config_loader.py save_section: list-section secret stripping.
   The path landed in C.2.1 fed list items into check_secrets() with
   path="" or with `<field>[<i>]` syntax, neither of which matched the
   `mesh_sources.*.api_token` / `notifications.rules.*.smtp_password`
   regexes in SECRET_FIELDS (where `*` matches a single dotted token).
   Result: a raw secret submitted on a list-section save could slip
   through to the YAML file. Fix uses dotted-index form `<field>.<i>.<key>`
   for both nested-list (notifications.rules) and top-level-list
   (mesh_sources) paths. Also extended _raw_section construction +
   _ondisk_ref to walk list-shaped on-disk YAML by integer index so
   the C.3.1 ${VAR}-placeholder preservation now works for list sections
   too. Three new tests round-trip the mesh_sources placeholder case,
   the mesh_sources raw-secret rejection, and the nested-list
   notifications.rules placeholder case.

4) meshai/notifications/pipeline/severity_router.py: deleted.
   The fork-by-severity routing it implemented was never wired in
   production -- _tee in build_pipeline does the dispatcher+digest
   fanout directly. The class had two test references in
   tests/test_pipeline_skeleton.py that exercised "no matching rule"
   and "unknown severity" paths; those guarantees are now covered by
   tests/test_v052_dispatcher.py (stats counters) and the existing
   Dispatcher-class tests. Removed the file, the __init__.py imports
   and __all__ entries (SeverityRouter + StubDigestQueue both), the
   two test methods, and the docstring mention.

Verification:
- py_compile clean on all four touched modules.
- `grep -rn SeverityRouter meshai/ tests/` returns zero.
- pytest 328 passed (was 327 at v0.5.4; net: -2 SeverityRouter tests,
  +3 secret-preservation tests = +1).
- .gitignore anchor diagnosed via `git check-ignore -vn`.

Safe-mode preserved -- no toggle enabled, no master enabled, no central
enabled, no adapter feed_source flipped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
matt+claude 2026-06-04 02:50:45 +00:00
commit c211d34060
7 changed files with 111 additions and 173 deletions

4
.gitignore vendored
View file

@ -33,7 +33,9 @@ wheels/
# Virtual environments
venv/
ENV/
env/
# v0.5.5: anchor to repo root only -- bare `env/` matched meshai/env/ (the
# adapter package directory) and forced `git add -f` workarounds in 2.14/2.16.1.
/env/
.venv/
# IDE

View file

@ -594,7 +594,13 @@ def save_section(
if target_file in ("meshtastic.yaml", "config.yaml") and isinstance(_raw_on_disk, dict):
_raw_section = _raw_on_disk.get(section_name) or {}
else:
_raw_section = _raw_on_disk if isinstance(_raw_on_disk, dict) else {}
# v0.5.5: list-shaped sections (mesh_sources.yaml) load as a top-level
# list; carry the list through so _ondisk_ref can walk it by integer
# index. dict|list|None covered; anything else falls back to {}.
if isinstance(_raw_on_disk, (dict, list)):
_raw_section = _raw_on_disk
else:
_raw_section = {}
_secrets_path = config_dir.parent / "secrets" / ".env"
if not _secrets_path.exists():
@ -608,10 +614,18 @@ def save_section(
return v if v is not None else _env_file.get(name)
def _ondisk_ref(field_path: str):
# v0.5.5: walk dicts by key, lists by integer index so paths like
# `0.api_token` (mesh_sources) and `rules.0.smtp_password`
# (notifications) resolve to their on-disk ${VAR} ref correctly.
node = _raw_section
for part in field_path.split("."):
if isinstance(node, dict) and part in node:
node = node[part]
elif isinstance(node, list):
try:
node = node[int(part)]
except (ValueError, IndexError, TypeError):
return None
else:
return None
return node
@ -637,8 +651,12 @@ def save_section(
elif isinstance(value, dict):
cleaned[key] = check_secrets(value, field_path)
elif isinstance(value, list):
# v0.5.5: dotted-index form (`<field>.<i>.<key>`) so list-item
# secret paths match SECRET_FIELDS entries like
# `notifications.rules.*.smtp_password` — the `*` regex token
# matches a single dot-separated token, not a `[i]` suffix.
cleaned[key] = [
check_secrets(item, f"{field_path}[{i}]")
check_secrets(item, f"{field_path}.{i}")
if isinstance(item, dict) else item
for i, item in enumerate(value)
]
@ -648,8 +666,15 @@ def save_section(
# List sections (e.g. mesh_sources) have no top-level dict to scan for
# local fields; clean each item for secrets and write the list directly.
# v0.5.5: each item carries its index as the section-relative path root so
# `_is_secret_field("mesh_sources", "<i>.api_token")` matches the pattern
# `mesh_sources.*.api_token` (previously it stripped to bare `api_token`
# and let raw secrets through).
if isinstance(data, list):
domain_data = [check_secrets(item) if isinstance(item, dict) else item for item in data]
domain_data = [
check_secrets(item, str(i)) if isinstance(item, dict) else item
for i, item in enumerate(data)
]
local_updates = {}
else:
data = check_secrets(data)

View file

@ -453,6 +453,8 @@ class DuctingAdapter:
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": 0,
# v0.5.5: was hardcoded 0 -- since Phase 2.13 emits real events,
# mirror the pattern every other env adapter already uses.
"event_count": len(self._events),
"last_fetch": self._last_tick,
}

View file

@ -26,10 +26,6 @@ import logging
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue, # kept for Phase 2.1 backward-compat tests
)
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
@ -251,8 +247,6 @@ async def stop_pipeline(scheduler: DigestScheduler) -> None:
__all__ = [
"EventBus",
"SeverityRouter",
"StubDigestQueue",
"Dispatcher",
"Inhibitor",
"Grouper",

View file

@ -1,104 +0,0 @@
"""Severity-based event routing.
The severity router subscribes to the bus and forks each event into
one of two paths based on severity:
- immediate immediate_handler (dispatcher for live delivery)
- priority/routine digest_handler (queue for batched summaries)
Usage:
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest_queue.enqueue,
)
bus.subscribe(router.handle)
"""
import logging
from typing import Callable
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
class SeverityRouter:
"""Routes events to immediate or digest handlers based on severity.
Immediate-severity events go directly to live delivery channels.
Priority and routine events are queued for periodic digest summaries.
"""
def __init__(
self,
immediate_handler: Callable[[Event], None],
digest_handler: Callable[[Event], None],
):
"""Initialize the severity router.
Args:
immediate_handler: Called for severity="immediate" events
digest_handler: Called for severity in ("priority", "routine")
"""
self._immediate = immediate_handler
self._digest = digest_handler
self._logger = logging.getLogger("meshai.pipeline.severity_router")
def handle(self, event: Event) -> None:
"""Route an event based on its severity.
Args:
event: The Event to route
"""
if event.severity == "immediate":
self._logger.info(
f"IMMEDIATE: {event.source}/{event.category} {event.title}"
)
self._immediate(event)
elif event.severity in ("priority", "routine"):
self._logger.info(
f"DIGEST QUEUED [{event.severity}]: {event.title}"
)
self._digest(event)
else:
self._logger.warning(
f"Unknown severity {event.severity!r} on event {event.id}, dropping"
)
class StubDigestQueue:
"""Placeholder digest queue for Phase 2.1.
This is a stub that simply collects events in memory. Phase 2.3
will replace this with the real aggregator that renders and
delivers periodic digest summaries.
"""
def __init__(self):
self._queue: list[Event] = []
self._logger = logging.getLogger("meshai.pipeline.digest_stub")
def enqueue(self, event: Event) -> None:
"""Add an event to the digest queue.
Args:
event: The Event to queue for digest delivery
"""
self._queue.append(event)
toggle = get_toggle(event.category) or "unknown"
self._logger.info(f"DIGEST QUEUED [{toggle}]: {event.title}")
def drain(self) -> list[Event]:
"""Return and clear all queued events.
For tests and the future aggregator. Returns the current
queue contents and resets the queue to empty.
Returns:
List of all queued Events
"""
events, self._queue = self._queue, []
return events
def __len__(self) -> int:
"""Return the number of queued events."""
return len(self._queue)

View file

@ -3,9 +3,12 @@
These tests verify the core routing and dispatch behavior of the
notification pipeline without requiring real channel backends.
Updated in Phase 2.4: Events now go to BOTH dispatcher and accumulator
(no severity-based fork). SeverityRouter class kept for backward
compatibility but not used in production wiring.
Updated in Phase 2.4: Events go to BOTH dispatcher and accumulator
(no severity-based fork). v0.5.5 retired the unused fork-by-severity
module _tee in build_pipeline does the fanout directly. The two
tests in this file that relied on that module to drive a no-rule /
unknown-severity scenario are covered by tests/test_v052_dispatcher.py
(stats counters) and the Dispatcher-class tests below.
"""
import asyncio
@ -18,7 +21,6 @@ from meshai.notifications.events import Event, make_event
from meshai.notifications.pipeline import build_pipeline_components
from meshai.notifications.pipeline.bus import EventBus
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.severity_router import SeverityRouter, StubDigestQueue
# Minimal config stubs for testing
@ -157,32 +159,6 @@ class TestTeeRouting:
assert mock_channel.deliver.call_count == 1
class TestNoMatchingRule:
def test_immediate_event_with_no_matching_rule_skips_silently(self):
"""Events with no matching rules don't crash."""
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[])
)
mock_factory = Mock()
bus = EventBus()
dispatcher = Dispatcher(config, mock_factory)
digest = StubDigestQueue()
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(router.handle)
event = make_event(
source="test",
category="test_cat",
severity="immediate",
title="No Rule Alert",
)
bus.emit(event)
mock_factory.assert_not_called()
class TestSubscriberIsolation:
def test_subscriber_exception_isolation(self):
@ -205,31 +181,3 @@ class TestSubscriberIsolation:
second_handler.assert_called_once()
class TestUnknownSeverity:
def test_unknown_severity_dropped_without_crash(self):
"""Events with unknown severity are dropped gracefully."""
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[])
)
mock_factory = Mock()
bus = EventBus()
dispatcher = Dispatcher(config, mock_factory)
digest = StubDigestQueue()
mock_dispatch = Mock()
mock_enqueue = Mock()
router = SeverityRouter(
immediate_handler=mock_dispatch,
digest_handler=mock_enqueue,
)
bus.subscribe(router.handle)
event = Event(
id="test123",
source="test",
category="test_cat",
severity="bogus",
title="Bogus Severity",
)
bus.emit(event)
mock_dispatch.assert_not_called()
mock_enqueue.assert_not_called()

View file

@ -55,3 +55,74 @@ def test_no_placeholder_still_rejects(tmp_path):
written = yaml.safe_load((cfg / "env_feeds.yaml").read_text())
assert "api_key" not in written.get("traffic", {})
assert "traffic.api_key" in res["rejected_secrets"]
# v0.5.5: secret stripping for LIST-shaped sections.
# C.2.1 added save_section's list-section branch but field paths inside list
# items used `<key>` instead of `<index>.<key>`, so SECRET_FIELDS patterns
# like `mesh_sources.*.api_token` never matched on the list-section save
# path. The fix lands a dotted-index form (mesh_sources.0.api_token).
def test_mesh_sources_list_preserves_secret_ref(tmp_path):
"""List section (mesh_sources): an item's ${VAR} api_token must survive a
GUI round-trip that submits the resolved value."""
cfg = _setup(
tmp_path,
"", # env_feeds.yaml unused; we write mesh_sources.yaml directly below
"MS_TEST_TOKEN=realtoken-xyz\n",
)
(cfg / "mesh_sources.yaml").write_text(
"- name: src-a\n url: http://a.local\n api_token: ${MS_TEST_TOKEN}\n"
)
res = save_section(
"mesh_sources",
[{"name": "src-a", "url": "http://a.local", "api_token": "realtoken-xyz"}],
cfg,
)
written = yaml.safe_load((cfg / "mesh_sources.yaml").read_text())
assert written[0]["api_token"] == "${MS_TEST_TOKEN}" # placeholder preserved
# rejected_secrets stores section-relative paths (no section prefix),
# matching the convention asserted by test_no_placeholder_still_rejects.
assert "0.api_token" not in res["rejected_secrets"]
def test_mesh_sources_list_rejects_raw_when_no_placeholder(tmp_path):
"""List section: raw secret without an on-disk placeholder must be rejected
(same negative case the object-section path already enforces)."""
cfg = tmp_path / "config"
cfg.mkdir()
res = save_section(
"mesh_sources",
[{"name": "src-a", "url": "http://a.local", "api_token": "RAW_LEAK"}],
cfg,
)
written = yaml.safe_load((cfg / "mesh_sources.yaml").read_text())
assert "api_token" not in (written[0] if written else {})
assert "0.api_token" in res["rejected_secrets"]
def test_notifications_rules_nested_list_preserves_secret_ref(tmp_path):
"""Object section with a nested list (notifications.rules) -- same dotted-
index path semantics; smtp_password placeholder must survive round-trip."""
cfg = _setup(
tmp_path,
"",
"C55_SMTP_PASS=letmein\n",
)
# notifications lives in its own file per SECTION_TO_FILE.
(cfg / "notifications.yaml").write_text(
"enabled: true\n"
"rules:\n"
" - name: r0\n enabled: true\n smtp_password: ${C55_SMTP_PASS}\n"
)
res = save_section(
"notifications",
{
"enabled": True,
"rules": [{"name": "r0", "enabled": True, "smtp_password": "letmein"}],
},
cfg,
)
written = yaml.safe_load((cfg / "notifications.yaml").read_text())
assert written["rules"][0]["smtp_password"] == "${C55_SMTP_PASS}"
assert "rules.0.smtp_password" not in res["rejected_secrets"]