diff --git a/meshai/config.py b/meshai/config.py index 8c0cdca..96bd010 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -654,8 +654,21 @@ def _dict_to_dataclass(cls, data: dict): field_type = field_types[key] + # Notifications needs special rules/channels coercion -- must run + # BEFORE the generic nested-dataclass handler, which would otherwise + # shadow it and leave rules as raw dicts (Phase 2.16.1 fix). + if key == "notifications" and isinstance(value, dict): + notifications = _dict_to_dataclass(NotificationsConfig, value) + if "rules" in value and isinstance(value["rules"], list): + notifications.rules = [ + _dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r + for r in value["rules"] + ] + if "channels" in value and isinstance(value["channels"], list) and value["channels"]: + _migrate_legacy_channels(notifications, value) + kwargs[key] = notifications # Handle nested dataclasses - if hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict): + elif hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(field_type, value) # Handle list of MeshSourceConfig elif key == "mesh_sources" and isinstance(value, list): @@ -701,14 +714,6 @@ def _dict_to_dataclass(cls, data: dict): kwargs[key] = _dict_to_dataclass(TogglesConfig, value) elif key == "digest" and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(DigestConfig, value) - elif key == "notifications" and isinstance(value, dict): - notifications = _dict_to_dataclass(NotificationsConfig, value) - if "rules" in value and isinstance(value["rules"], list): - notifications.rules = [_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r for r in value["rules"]] - # Migrate old channels+rules format if present - if "channels" in value and isinstance(value["channels"], list) and value["channels"]: - _migrate_legacy_channels(notifications, value) - kwargs[key] = notifications else: kwargs[key] = value diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 05d96ef..b1a3e57 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -22,6 +22,7 @@ Usage: """ import asyncio +import logging from meshai.notifications.channels import create_channel from meshai.notifications.pipeline.bus import EventBus, get_bus @@ -37,6 +38,9 @@ from meshai.notifications.pipeline.digest import DigestAccumulator, Digest from meshai.notifications.pipeline.scheduler import DigestScheduler +_logger = logging.getLogger("meshai.pipeline") + + def build_pipeline(config, llm_backend, connector=None) -> EventBus: """Build the pipeline and return the EventBus. @@ -83,6 +87,12 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus: if enabled_list: enabled_toggles = set(enabled_list) + if not enabled_toggles: + _logger.warning( + "enabled_toggles is empty -- ToggleFilter passing all events. " + "Configure toggles to enable gating." + ) + toggle_filter = ToggleFilter( next_handler=_tee, enabled_toggles=enabled_toggles, @@ -189,6 +199,30 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: ) await scheduler.start() + # Phase 2.16.1: periodically flush the grouper so coalesced (routine/ + # priority) events are delivered within the window even when poll cadence + # is sparse. Immediate events bypass the grouper and don't need this. + grouper = components["grouper"] + flush_interval = getattr(config.notifications, "grouper_flush_seconds", 5.0) or 5.0 + flush_stop = asyncio.Event() + + async def _grouper_flush_loop(): + while not flush_stop.is_set(): + try: + await asyncio.wait_for(flush_stop.wait(), timeout=flush_interval) + return + except asyncio.TimeoutError: + pass + try: + grouper.tick() + except Exception: + _logger.exception("Grouper flush tick failed") + + flush_task = asyncio.create_task(_grouper_flush_loop(), name="grouper-flush") + scheduler._grouper_flush_task = flush_task + scheduler._grouper_flush_stop = flush_stop + _logger.info(f"Grouper flush task started (every {flush_interval:.0f}s)") + # Stash scheduler for stop_pipeline bus._pipeline_scheduler = scheduler @@ -202,6 +236,16 @@ async def stop_pipeline(scheduler: DigestScheduler) -> None: scheduler: DigestScheduler returned by start_pipeline() """ if scheduler is not None: + flush_stop = getattr(scheduler, "_grouper_flush_stop", None) + flush_task = getattr(scheduler, "_grouper_flush_task", None) + if flush_stop is not None: + flush_stop.set() + if flush_task is not None: + flush_task.cancel() + try: + await flush_task + except (asyncio.CancelledError, Exception): + pass await scheduler.stop() diff --git a/meshai/notifications/pipeline/grouper.py b/meshai/notifications/pipeline/grouper.py index 231d45c..1260099 100644 --- a/meshai/notifications/pipeline/grouper.py +++ b/meshai/notifications/pipeline/grouper.py @@ -51,6 +51,12 @@ class Grouper: event with the same group_key. The held event is emitted later via tick(). """ + # Immediate-severity events bypass the coalescing window entirely + # (Phase 2.16.1): they must be delivered without buffering latency. + if event.severity == "immediate": + self._next(event) + return + if not event.group_key: self._next(event) return diff --git a/tests/test_config_loader.py b/tests/test_config_loader.py new file mode 100644 index 0000000..727247f --- /dev/null +++ b/tests/test_config_loader.py @@ -0,0 +1,63 @@ +"""Phase 2.16.1: lock in notification-rule coercion in the config loader path. + +Regression guard for the bug where the generic nested-dataclass handler in +_dict_to_dataclass shadowed the explicit 'notifications' branch, leaving +cfg.notifications.rules as raw dicts (which crashed Dispatcher._matching_rules +on rule.enabled). config_loader.load_config uses this same _dict_to_dataclass. +""" + +from meshai.config import Config, NotificationRuleConfig, _dict_to_dataclass + + +def test_multifile_load_coerces_notification_rules(): + """notifications.rules dicts are coerced to NotificationRuleConfig.""" + data = { + "notifications": { + "enabled": True, + "rules": [ + { + "name": "Test Rule", + "enabled": True, + "trigger_type": "condition", + "categories": ["earthquake_event"], + "min_severity": "routine", + "delivery_type": "mesh_broadcast", + }, + { + "name": "Second Rule", + "enabled": False, + "trigger_type": "condition", + "categories": ["wildfire_incident"], + "delivery_type": "email", + }, + ], + } + } + cfg = _dict_to_dataclass(Config, data) + rules = cfg.notifications.rules + assert len(rules) == 2 + # Coerced to the dataclass, NOT left as dicts. + assert all(isinstance(r, NotificationRuleConfig) for r in rules) + # Attribute access (what Dispatcher._matching_rules needs) works. + assert rules[0].enabled is True + assert rules[0].name == "Test Rule" + assert rules[1].enabled is False + + +def test_rules_attribute_access_does_not_raise(): + """Dispatcher-style attribute access on every rule succeeds.""" + data = { + "notifications": { + "rules": [ + {"name": "R", "enabled": True, "trigger_type": "condition", + "categories": ["earthquake_event"], "min_severity": "immediate"}, + ] + } + } + cfg = _dict_to_dataclass(Config, data) + for r in cfg.notifications.rules: + # These are the accesses Dispatcher._matching_rules performs. + _ = r.enabled + _ = r.trigger_type + _ = r.categories + _ = r.min_severity diff --git a/tests/test_pipeline_grouper.py b/tests/test_pipeline_grouper.py new file mode 100644 index 0000000..e50e429 --- /dev/null +++ b/tests/test_pipeline_grouper.py @@ -0,0 +1,61 @@ +"""Phase 2.16.1 grouper tests: immediate bypass + periodic flush of routine.""" + +from meshai.notifications.pipeline.grouper import Grouper +from meshai.notifications.events import make_event + + +class Recorder: + def __init__(self): + self.received = [] + + def handle(self, event): + self.received.append(event) + + +def _ev(severity, group_key="gk1"): + return make_event( + source="usgs_quake", + category="earthquake_event", + severity=severity, + title=f"test {severity}", + lat=42.6, + lon=-114.5, + group_key=group_key, + inhibit_keys=[group_key], + ) + + +def test_immediate_severity_bypasses_grouper(): + """An immediate event with a group_key is delivered at once, not buffered.""" + rec = Recorder() + g = Grouper(next_handler=rec.handle, window_seconds=60.0) + g.handle(_ev("immediate")) + # Delivered immediately, nothing held. + assert len(rec.received) == 1 + assert rec.received[0].severity == "immediate" + assert g.held_count() == 0 + + +def test_periodic_flush_drains_routine(): + """A routine event is held, then released by tick() once its window passes.""" + rec = Recorder() + g = Grouper(next_handler=rec.handle, window_seconds=0.0) # 0s window -> tick drains now + g.handle(_ev("routine")) + # Held on arrival, not yet delivered. + assert g.held_count() == 1 + assert rec.received == [] + # The periodic flush task calls tick(); simulate one tick. + drained = g.tick() + assert drained == 1 + assert len(rec.received) == 1 + assert rec.received[0].severity == "routine" + assert g.held_count() == 0 + + +def test_priority_is_also_coalesced_not_bypassed(): + """Priority events still buffer (only immediate bypasses).""" + rec = Recorder() + g = Grouper(next_handler=rec.handle, window_seconds=60.0) + g.handle(_ev("priority")) + assert rec.received == [] + assert g.held_count() == 1 diff --git a/tests/test_pipeline_inhibitor_grouper.py b/tests/test_pipeline_inhibitor_grouper.py index dbb7a34..7121576 100644 --- a/tests/test_pipeline_inhibitor_grouper.py +++ b/tests/test_pipeline_inhibitor_grouper.py @@ -108,7 +108,7 @@ class TestGrouper: def test_event_with_group_key_is_held_not_emitted(self): next_handler = Mock() grouper = Grouper(next_handler) - event = make_event("e1", "immediate", group_key="fire:42") + event = make_event("e1", "priority", group_key="fire:42") grouper.handle(event) next_handler.assert_not_called() assert grouper.held_count() == 1 @@ -116,8 +116,8 @@ class TestGrouper: def test_second_same_group_key_replaces_first(self): next_handler = Mock() grouper = Grouper(next_handler) - ev1 = make_event("e1", "immediate", group_key="fire:42") - ev2 = make_event("e2", "immediate", group_key="fire:42") + ev1 = make_event("e1", "priority", group_key="fire:42") + ev2 = make_event("e2", "priority", group_key="fire:42") grouper.handle(ev1) grouper.handle(ev2) next_handler.assert_not_called() @@ -134,7 +134,7 @@ class TestGrouper: grouper._now = lambda: current_time[0] current_time[0] = 0.0 - event = make_event("e1", "immediate", group_key="g") + event = make_event("e1", "priority", group_key="g") grouper.handle(event) assert grouper.held_count() == 1 @@ -151,9 +151,9 @@ class TestGrouper: def test_flush_all_emits_everything_immediately(self): next_handler = Mock() grouper = Grouper(next_handler) - ev1 = make_event("e1", "immediate", group_key="g1") - ev2 = make_event("e2", "immediate", group_key="g2") - ev3 = make_event("e3", "immediate", group_key="g3") + ev1 = make_event("e1", "priority", group_key="g1") + ev2 = make_event("e2", "priority", group_key="g2") + ev3 = make_event("e3", "priority", group_key="g3") grouper.handle(ev1) grouper.handle(ev2) grouper.handle(ev3) @@ -172,8 +172,8 @@ class TestInhibitorGrouperChain: grouper = Grouper(next_handler=terminal) inhibitor = Inhibitor(next_handler=grouper.handle) - # Send immediate event with group_key and inhibit_keys - ev1 = make_event("e1", "immediate", group_key="g1", inhibit_keys=["k1"]) + # Send priority event with group_key and inhibit_keys (immediate would bypass) + ev1 = make_event("e1", "priority", group_key="g1", inhibit_keys=["k1"]) inhibitor.handle(ev1) # After inhibitor: passed (no prior key) # After grouper: held (group_key present) @@ -191,4 +191,4 @@ class TestInhibitorGrouperChain: grouper.flush_all() terminal.assert_called_once() emitted = terminal.call_args[0][0] - assert emitted.id == "e1" # the immediate, not suppressed routine + assert emitted.id == "e1" # the priority event, not the suppressed routine