meshai/tests/test_pipeline_skeleton.py

183 lines
5.8 KiB
Python
Raw Normal View History

"""Test cases for Phase 2.1 notification pipeline skeleton.
These tests verify the core routing and dispatch behavior of the
notification pipeline without requiring real channel backends.
chore(meshai): v0.5.5 -- cleanup bundle (gitignore env anchor, ducting health event_count, mesh_sources secret stripping, delete unused SeverityRouter) Four independent low-risk fixes from the deferred list. Bundled in a single commit because none are large enough to warrant their own tag and none touch the safe-mode-sensitive paths (dispatcher / consumer / toggle config). 1) .gitignore: change bare `env/` to `/env/` so the rule anchors at the repo root only. The unanchored form was matching `meshai/env/` (the adapter package directory) and forced `git add -f` workarounds during 2.14 / 2.16.1. Verified post-fix: `git check-ignore -vn meshai/env/test.py` reports no pattern match; `git check-ignore -v env/foo` still matches the new `/env/` rule. 2) meshai/env/ducting.py: health_status.event_count was hardcoded `0` from before Phase 2.13 added real event emission. Replaced with `len(self._events)`, which is the pattern every other env adapter already uses (fires/firms/nws/swpc/traffic/roads511/usgs/usgs_quake/ avalanche). Flows through env.store.health_status → /api/env/status so the dashboard counter starts reflecting reality. 3) meshai/config_loader.py save_section: list-section secret stripping. The path landed in C.2.1 fed list items into check_secrets() with path="" or with `<field>[<i>]` syntax, neither of which matched the `mesh_sources.*.api_token` / `notifications.rules.*.smtp_password` regexes in SECRET_FIELDS (where `*` matches a single dotted token). Result: a raw secret submitted on a list-section save could slip through to the YAML file. Fix uses dotted-index form `<field>.<i>.<key>` for both nested-list (notifications.rules) and top-level-list (mesh_sources) paths. Also extended _raw_section construction + _ondisk_ref to walk list-shaped on-disk YAML by integer index so the C.3.1 ${VAR}-placeholder preservation now works for list sections too. Three new tests round-trip the mesh_sources placeholder case, the mesh_sources raw-secret rejection, and the nested-list notifications.rules placeholder case. 4) meshai/notifications/pipeline/severity_router.py: deleted. The fork-by-severity routing it implemented was never wired in production -- _tee in build_pipeline does the dispatcher+digest fanout directly. The class had two test references in tests/test_pipeline_skeleton.py that exercised "no matching rule" and "unknown severity" paths; those guarantees are now covered by tests/test_v052_dispatcher.py (stats counters) and the existing Dispatcher-class tests. Removed the file, the __init__.py imports and __all__ entries (SeverityRouter + StubDigestQueue both), the two test methods, and the docstring mention. Verification: - py_compile clean on all four touched modules. - `grep -rn SeverityRouter meshai/ tests/` returns zero. - pytest 328 passed (was 327 at v0.5.4; net: -2 SeverityRouter tests, +3 secret-preservation tests = +1). - .gitignore anchor diagnosed via `git check-ignore -vn`. Safe-mode preserved -- no toggle enabled, no master enabled, no central enabled, no adapter feed_source flipped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 02:50:45 +00:00
Updated in Phase 2.4: Events go to BOTH dispatcher and accumulator
(no severity-based fork). v0.5.5 retired the unused fork-by-severity
module _tee in build_pipeline does the fanout directly. The two
tests in this file that relied on that module to drive a no-rule /
unknown-severity scenario are covered by tests/test_v052_dispatcher.py
(stats counters) and the Dispatcher-class tests below.
"""
import asyncio
import pytest
from unittest.mock import Mock, AsyncMock, patch
from dataclasses import dataclass, field
from meshai.notifications.events import Event, make_event
from meshai.notifications.pipeline import build_pipeline_components
from meshai.notifications.pipeline.bus import EventBus
from meshai.notifications.pipeline.dispatcher import Dispatcher
# Minimal config stubs for testing
@dataclass
class NotificationRuleConfigStub:
name: str = "test_rule"
enabled: bool = True
trigger_type: str = "condition"
categories: list = field(default_factory=list)
min_severity: str = "routine"
delivery_type: str = "mesh_broadcast"
@dataclass
class NotificationsConfigStub:
rules: list = field(default_factory=list)
@dataclass
class ConfigStub:
notifications: NotificationsConfigStub = field(default_factory=NotificationsConfigStub)
class TestImmediateDispatch:
def test_immediate_event_with_matching_rule_dispatches(self):
"""Immediate events reach the dispatcher and get delivered."""
rule = NotificationRuleConfigStub(
enabled=True,
trigger_type="condition",
categories=["test_cat"],
min_severity="routine",
delivery_type="mesh_broadcast",
)
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
bus = EventBus()
dispatcher = Dispatcher(config, mock_factory)
event = make_event(
source="test",
category="test_cat",
severity="immediate",
title="Test Alert",
summary="Test summary message",
)
# Run dispatch in async context since it's now async
asyncio.run(dispatcher.dispatch(event))
assert mock_channel.deliver.call_count == 1
alert = mock_channel.deliver.call_args[0][0]
assert alert.category == "test_cat"
assert alert.severity == "immediate"
assert alert.message
class TestTeeRouting:
"""Phase 2.4: Events go to BOTH dispatcher and accumulator."""
def test_routine_event_goes_to_both_dispatcher_and_accumulator(self):
"""Routine events reach both dispatcher and accumulator in Phase 2.4."""
rule = NotificationRuleConfigStub(
enabled=True,
trigger_type="condition",
categories=["test_cat"],
min_severity="routine",
delivery_type="mesh_broadcast",
)
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
# Create dispatcher
dispatcher = Dispatcher(config, mock_factory)
# Create accumulator mock
accumulator_calls = []
def mock_enqueue(event):
accumulator_calls.append(event)
event = make_event(
source="test",
category="test_cat",
severity="routine",
title="Routine Alert",
)
# Run dispatch in async context
asyncio.run(dispatcher.dispatch(event))
mock_enqueue(event)
# Both paths received the event
assert len(accumulator_calls) == 1
# Dispatcher found a matching rule and delivered
assert mock_channel.deliver.call_count == 1
def test_priority_event_goes_to_both_dispatcher_and_accumulator(self):
"""Priority events reach both dispatcher and accumulator in Phase 2.4."""
rule = NotificationRuleConfigStub(
enabled=True,
trigger_type="condition",
categories=["test_cat"],
min_severity="routine",
delivery_type="mesh_broadcast",
)
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
dispatcher = Dispatcher(config, mock_factory)
accumulator_calls = []
def mock_enqueue(event):
accumulator_calls.append(event)
event = make_event(
source="test",
category="test_cat",
severity="priority",
title="Priority Alert",
)
# Run dispatch in async context
asyncio.run(dispatcher.dispatch(event))
mock_enqueue(event)
assert len(accumulator_calls) == 1
assert mock_channel.deliver.call_count == 1
class TestSubscriberIsolation:
def test_subscriber_exception_isolation(self):
"""Exceptions in one subscriber don't affect others."""
bus = EventBus()
def failing_handler(event):
raise RuntimeError("Handler failed")
second_handler = Mock()
bus.subscribe(failing_handler)
bus.subscribe(second_handler)
event = make_event(
source="test",
category="test_cat",
severity="immediate",
title="Test Event",
)
bus.emit(event)
second_handler.assert_called_once()