From 20e0dec28a5574b945c789170d642640f1461f93 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 28 May 2026 00:36:13 +0000 Subject: [PATCH] fix(notifications): Phase 2.16.1 unblock pipeline -- grouper flush + rules coercion + toggle warning Phase 2.16 found the live notification pipeline never delivered any environmental event. Two independent blocking bugs, both fixed here. BUG A -- grouper held events forever (nothing drove tick()). Every adapter event sets a group_key, so all were buffered in the Grouper and never flushed (start_pipeline only started the DigestScheduler; no tick driver existed). Fixes (per Matt's decisions): - Grouper.handle(): immediate-severity events now BYPASS the window entirely (delivered straight to next_handler), no buffering latency. routine/priority still coalesce. - start_pipeline(): schedules an asyncio flush task that calls grouper.tick() every `grouper_flush_seconds` (default 5s) so coalesced events drain within the window even when poll cadence is sparse. stop_pipeline() signals + cancels it. before/after (grouper held_count): an immediate+group_key event used to sit held (count 1) forever; now held_count==0 on arrival (bypassed). A routine event is held (count 1) then drained to 0 by tick()/flush. BUG B -- notification rules loaded as dicts, crashing the dispatcher. Root cause (more precise than 2.16's guess): the rules coercion is NOT missing from the multi-file loader -- it lives in _dict_to_dataclass's explicit `elif key == "notifications"` branch, but that branch was DEAD CODE, shadowed by the generic `if hasattr(field_type, "__dataclass_fields__")` handler that runs first for every dataclass field (including notifications). So Config.notifications.rules stayed a list of dicts on ALL load paths, and Dispatcher._matching_rules threw `AttributeError: 'dict' object has no attribute 'enabled'`. Fix: hoist the notifications special-handling ahead of the generic handler (and drop the now-truly-dead duplicate elif). before/after (cfg.notifications.rules[0] type): dict -> NotificationRuleConfig. OBS C -- empty enabled_toggles. Left as 'pass all' for v0.3 (per Matt); added a startup WARNING in build_pipeline so operators see gating is off: "enabled_toggles is empty -- ToggleFilter passing all events. Configure toggles to enable gating." (confirmed firing live). Tests: - tests/test_pipeline_grouper.py (new): test_immediate_severity_bypasses_grouper, test_periodic_flush_drains_routine, test_priority_is_also_coalesced_not_bypassed. - tests/test_config_loader.py (new): test_multifile_load_coerces_notification_rules, test_rules_attribute_access_does_not_raise (regression guards for Bug B). - tests/test_pipeline_inhibitor_grouper.py (updated): 5 existing grouper hold/coalesce/flush tests primed the grouper with immediate+group_key events expecting them to be held; switched those to 'priority' (still buffered; still outranks the routine event in the inhibitor-chain test) to match the intended immediate-bypass behavior. Full suite: 253 passed (was 248 + 5 new; 5 existing updated, none lost). VERIFICATION (rebuilt prod, traced end-to-end via in-process build_pipeline probe with a recording channel + live config): - rules[0] type: NotificationRuleConfig (Bug B fixed). - IMMEDIATE event: held_count==0 on emit (bypassed) -> reached channel.deliver(): delivered=[('PROBE_RULE','E2E IMMEDIATE')]. - ROUTINE event: held_count==1 -> after flush 0 -> reached channel.deliver(): delivered+=[('PROBE_RULE','E2E ROUTINE')]. - Natural Summit-Creek-shaped nifc wildfire_incident (routine, no matching dispatch rule): held 1 -> after flush -> landed in the digest accumulator (1 event). End-to-end channel.deliver evidence = the RecChannel.deliver() calls above. - Live container: 8 adapters, healthy, "Grouper flush task started (every 5s)", the enabled_toggles warning fired, and NO dispatcher AttributeError/traceback. Follow-up (non-blocking): several Phase 2.7-2.14 categories (e.g. wildfire_incident, earthquake_event) aren't in the category->toggle map, so they fall to toggle 'other'. Harmless while enabled_toggles is empty (pass-all), but should be mapped before toggle gating is turned on. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/config.py | 23 +++++---- meshai/notifications/pipeline/__init__.py | 44 ++++++++++++++++ meshai/notifications/pipeline/grouper.py | 6 +++ tests/test_config_loader.py | 63 +++++++++++++++++++++++ tests/test_pipeline_grouper.py | 61 ++++++++++++++++++++++ tests/test_pipeline_inhibitor_grouper.py | 20 +++---- 6 files changed, 198 insertions(+), 19 deletions(-) create mode 100644 tests/test_config_loader.py create mode 100644 tests/test_pipeline_grouper.py diff --git a/meshai/config.py b/meshai/config.py index 8c0cdca..96bd010 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -654,8 +654,21 @@ def _dict_to_dataclass(cls, data: dict): field_type = field_types[key] + # Notifications needs special rules/channels coercion -- must run + # BEFORE the generic nested-dataclass handler, which would otherwise + # shadow it and leave rules as raw dicts (Phase 2.16.1 fix). + if key == "notifications" and isinstance(value, dict): + notifications = _dict_to_dataclass(NotificationsConfig, value) + if "rules" in value and isinstance(value["rules"], list): + notifications.rules = [ + _dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r + for r in value["rules"] + ] + if "channels" in value and isinstance(value["channels"], list) and value["channels"]: + _migrate_legacy_channels(notifications, value) + kwargs[key] = notifications # Handle nested dataclasses - if hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict): + elif hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(field_type, value) # Handle list of MeshSourceConfig elif key == "mesh_sources" and isinstance(value, list): @@ -701,14 +714,6 @@ def _dict_to_dataclass(cls, data: dict): kwargs[key] = _dict_to_dataclass(TogglesConfig, value) elif key == "digest" and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(DigestConfig, value) - elif key == "notifications" and isinstance(value, dict): - notifications = _dict_to_dataclass(NotificationsConfig, value) - if "rules" in value and isinstance(value["rules"], list): - notifications.rules = [_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r for r in value["rules"]] - # Migrate old channels+rules format if present - if "channels" in value and isinstance(value["channels"], list) and value["channels"]: - _migrate_legacy_channels(notifications, value) - kwargs[key] = notifications else: kwargs[key] = value diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 05d96ef..b1a3e57 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -22,6 +22,7 @@ Usage: """ import asyncio +import logging from meshai.notifications.channels import create_channel from meshai.notifications.pipeline.bus import EventBus, get_bus @@ -37,6 +38,9 @@ from meshai.notifications.pipeline.digest import DigestAccumulator, Digest from meshai.notifications.pipeline.scheduler import DigestScheduler +_logger = logging.getLogger("meshai.pipeline") + + def build_pipeline(config, llm_backend, connector=None) -> EventBus: """Build the pipeline and return the EventBus. @@ -83,6 +87,12 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus: if enabled_list: enabled_toggles = set(enabled_list) + if not enabled_toggles: + _logger.warning( + "enabled_toggles is empty -- ToggleFilter passing all events. " + "Configure toggles to enable gating." + ) + toggle_filter = ToggleFilter( next_handler=_tee, enabled_toggles=enabled_toggles, @@ -189,6 +199,30 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: ) await scheduler.start() + # Phase 2.16.1: periodically flush the grouper so coalesced (routine/ + # priority) events are delivered within the window even when poll cadence + # is sparse. Immediate events bypass the grouper and don't need this. + grouper = components["grouper"] + flush_interval = getattr(config.notifications, "grouper_flush_seconds", 5.0) or 5.0 + flush_stop = asyncio.Event() + + async def _grouper_flush_loop(): + while not flush_stop.is_set(): + try: + await asyncio.wait_for(flush_stop.wait(), timeout=flush_interval) + return + except asyncio.TimeoutError: + pass + try: + grouper.tick() + except Exception: + _logger.exception("Grouper flush tick failed") + + flush_task = asyncio.create_task(_grouper_flush_loop(), name="grouper-flush") + scheduler._grouper_flush_task = flush_task + scheduler._grouper_flush_stop = flush_stop + _logger.info(f"Grouper flush task started (every {flush_interval:.0f}s)") + # Stash scheduler for stop_pipeline bus._pipeline_scheduler = scheduler @@ -202,6 +236,16 @@ async def stop_pipeline(scheduler: DigestScheduler) -> None: scheduler: DigestScheduler returned by start_pipeline() """ if scheduler is not None: + flush_stop = getattr(scheduler, "_grouper_flush_stop", None) + flush_task = getattr(scheduler, "_grouper_flush_task", None) + if flush_stop is not None: + flush_stop.set() + if flush_task is not None: + flush_task.cancel() + try: + await flush_task + except (asyncio.CancelledError, Exception): + pass await scheduler.stop() diff --git a/meshai/notifications/pipeline/grouper.py b/meshai/notifications/pipeline/grouper.py index 231d45c..1260099 100644 --- a/meshai/notifications/pipeline/grouper.py +++ b/meshai/notifications/pipeline/grouper.py @@ -51,6 +51,12 @@ class Grouper: event with the same group_key. The held event is emitted later via tick(). """ + # Immediate-severity events bypass the coalescing window entirely + # (Phase 2.16.1): they must be delivered without buffering latency. + if event.severity == "immediate": + self._next(event) + return + if not event.group_key: self._next(event) return diff --git a/tests/test_config_loader.py b/tests/test_config_loader.py new file mode 100644 index 0000000..727247f --- /dev/null +++ b/tests/test_config_loader.py @@ -0,0 +1,63 @@ +"""Phase 2.16.1: lock in notification-rule coercion in the config loader path. + +Regression guard for the bug where the generic nested-dataclass handler in +_dict_to_dataclass shadowed the explicit 'notifications' branch, leaving +cfg.notifications.rules as raw dicts (which crashed Dispatcher._matching_rules +on rule.enabled). config_loader.load_config uses this same _dict_to_dataclass. +""" + +from meshai.config import Config, NotificationRuleConfig, _dict_to_dataclass + + +def test_multifile_load_coerces_notification_rules(): + """notifications.rules dicts are coerced to NotificationRuleConfig.""" + data = { + "notifications": { + "enabled": True, + "rules": [ + { + "name": "Test Rule", + "enabled": True, + "trigger_type": "condition", + "categories": ["earthquake_event"], + "min_severity": "routine", + "delivery_type": "mesh_broadcast", + }, + { + "name": "Second Rule", + "enabled": False, + "trigger_type": "condition", + "categories": ["wildfire_incident"], + "delivery_type": "email", + }, + ], + } + } + cfg = _dict_to_dataclass(Config, data) + rules = cfg.notifications.rules + assert len(rules) == 2 + # Coerced to the dataclass, NOT left as dicts. + assert all(isinstance(r, NotificationRuleConfig) for r in rules) + # Attribute access (what Dispatcher._matching_rules needs) works. + assert rules[0].enabled is True + assert rules[0].name == "Test Rule" + assert rules[1].enabled is False + + +def test_rules_attribute_access_does_not_raise(): + """Dispatcher-style attribute access on every rule succeeds.""" + data = { + "notifications": { + "rules": [ + {"name": "R", "enabled": True, "trigger_type": "condition", + "categories": ["earthquake_event"], "min_severity": "immediate"}, + ] + } + } + cfg = _dict_to_dataclass(Config, data) + for r in cfg.notifications.rules: + # These are the accesses Dispatcher._matching_rules performs. + _ = r.enabled + _ = r.trigger_type + _ = r.categories + _ = r.min_severity diff --git a/tests/test_pipeline_grouper.py b/tests/test_pipeline_grouper.py new file mode 100644 index 0000000..e50e429 --- /dev/null +++ b/tests/test_pipeline_grouper.py @@ -0,0 +1,61 @@ +"""Phase 2.16.1 grouper tests: immediate bypass + periodic flush of routine.""" + +from meshai.notifications.pipeline.grouper import Grouper +from meshai.notifications.events import make_event + + +class Recorder: + def __init__(self): + self.received = [] + + def handle(self, event): + self.received.append(event) + + +def _ev(severity, group_key="gk1"): + return make_event( + source="usgs_quake", + category="earthquake_event", + severity=severity, + title=f"test {severity}", + lat=42.6, + lon=-114.5, + group_key=group_key, + inhibit_keys=[group_key], + ) + + +def test_immediate_severity_bypasses_grouper(): + """An immediate event with a group_key is delivered at once, not buffered.""" + rec = Recorder() + g = Grouper(next_handler=rec.handle, window_seconds=60.0) + g.handle(_ev("immediate")) + # Delivered immediately, nothing held. + assert len(rec.received) == 1 + assert rec.received[0].severity == "immediate" + assert g.held_count() == 0 + + +def test_periodic_flush_drains_routine(): + """A routine event is held, then released by tick() once its window passes.""" + rec = Recorder() + g = Grouper(next_handler=rec.handle, window_seconds=0.0) # 0s window -> tick drains now + g.handle(_ev("routine")) + # Held on arrival, not yet delivered. + assert g.held_count() == 1 + assert rec.received == [] + # The periodic flush task calls tick(); simulate one tick. + drained = g.tick() + assert drained == 1 + assert len(rec.received) == 1 + assert rec.received[0].severity == "routine" + assert g.held_count() == 0 + + +def test_priority_is_also_coalesced_not_bypassed(): + """Priority events still buffer (only immediate bypasses).""" + rec = Recorder() + g = Grouper(next_handler=rec.handle, window_seconds=60.0) + g.handle(_ev("priority")) + assert rec.received == [] + assert g.held_count() == 1 diff --git a/tests/test_pipeline_inhibitor_grouper.py b/tests/test_pipeline_inhibitor_grouper.py index dbb7a34..7121576 100644 --- a/tests/test_pipeline_inhibitor_grouper.py +++ b/tests/test_pipeline_inhibitor_grouper.py @@ -108,7 +108,7 @@ class TestGrouper: def test_event_with_group_key_is_held_not_emitted(self): next_handler = Mock() grouper = Grouper(next_handler) - event = make_event("e1", "immediate", group_key="fire:42") + event = make_event("e1", "priority", group_key="fire:42") grouper.handle(event) next_handler.assert_not_called() assert grouper.held_count() == 1 @@ -116,8 +116,8 @@ class TestGrouper: def test_second_same_group_key_replaces_first(self): next_handler = Mock() grouper = Grouper(next_handler) - ev1 = make_event("e1", "immediate", group_key="fire:42") - ev2 = make_event("e2", "immediate", group_key="fire:42") + ev1 = make_event("e1", "priority", group_key="fire:42") + ev2 = make_event("e2", "priority", group_key="fire:42") grouper.handle(ev1) grouper.handle(ev2) next_handler.assert_not_called() @@ -134,7 +134,7 @@ class TestGrouper: grouper._now = lambda: current_time[0] current_time[0] = 0.0 - event = make_event("e1", "immediate", group_key="g") + event = make_event("e1", "priority", group_key="g") grouper.handle(event) assert grouper.held_count() == 1 @@ -151,9 +151,9 @@ class TestGrouper: def test_flush_all_emits_everything_immediately(self): next_handler = Mock() grouper = Grouper(next_handler) - ev1 = make_event("e1", "immediate", group_key="g1") - ev2 = make_event("e2", "immediate", group_key="g2") - ev3 = make_event("e3", "immediate", group_key="g3") + ev1 = make_event("e1", "priority", group_key="g1") + ev2 = make_event("e2", "priority", group_key="g2") + ev3 = make_event("e3", "priority", group_key="g3") grouper.handle(ev1) grouper.handle(ev2) grouper.handle(ev3) @@ -172,8 +172,8 @@ class TestInhibitorGrouperChain: grouper = Grouper(next_handler=terminal) inhibitor = Inhibitor(next_handler=grouper.handle) - # Send immediate event with group_key and inhibit_keys - ev1 = make_event("e1", "immediate", group_key="g1", inhibit_keys=["k1"]) + # Send priority event with group_key and inhibit_keys (immediate would bypass) + ev1 = make_event("e1", "priority", group_key="g1", inhibit_keys=["k1"]) inhibitor.handle(ev1) # After inhibitor: passed (no prior key) # After grouper: held (group_key present) @@ -191,4 +191,4 @@ class TestInhibitorGrouperChain: grouper.flush_all() terminal.assert_called_once() emitted = terminal.call_args[0][0] - assert emitted.id == "e1" # the immediate, not suppressed routine + assert emitted.id == "e1" # the priority event, not the suppressed routine