fix(notifications): Phase 2.16.1 unblock pipeline -- grouper flush + rules coercion + toggle warning

Phase 2.16 found the live notification pipeline never delivered any
environmental event. Two independent blocking bugs, both fixed here.

BUG A -- grouper held events forever (nothing drove tick()).
Every adapter event sets a group_key, so all were buffered in the Grouper
and never flushed (start_pipeline only started the DigestScheduler; no
tick driver existed). Fixes (per Matt's decisions):
  - Grouper.handle(): immediate-severity events now BYPASS the window
    entirely (delivered straight to next_handler), no buffering latency.
    routine/priority still coalesce.
  - start_pipeline(): schedules an asyncio flush task that calls
    grouper.tick() every `grouper_flush_seconds` (default 5s) so
    coalesced events drain within the window even when poll cadence is
    sparse. stop_pipeline() signals + cancels it.
  before/after (grouper held_count): an immediate+group_key event used to
  sit held (count 1) forever; now held_count==0 on arrival (bypassed). A
  routine event is held (count 1) then drained to 0 by tick()/flush.

BUG B -- notification rules loaded as dicts, crashing the dispatcher.
Root cause (more precise than 2.16's guess): the rules coercion is NOT
missing from the multi-file loader -- it lives in _dict_to_dataclass's
explicit `elif key == "notifications"` branch, but that branch was DEAD
CODE, shadowed by the generic `if hasattr(field_type,
"__dataclass_fields__")` handler that runs first for every dataclass
field (including notifications). So Config.notifications.rules stayed a
list of dicts on ALL load paths, and Dispatcher._matching_rules threw
`AttributeError: 'dict' object has no attribute 'enabled'`. Fix: hoist
the notifications special-handling ahead of the generic handler (and drop
the now-truly-dead duplicate elif).
  before/after (cfg.notifications.rules[0] type): dict -> NotificationRuleConfig.

OBS C -- empty enabled_toggles. Left as 'pass all' for v0.3 (per Matt);
added a startup WARNING in build_pipeline so operators see gating is off:
"enabled_toggles is empty -- ToggleFilter passing all events. Configure
toggles to enable gating." (confirmed firing live).

Tests:
  - tests/test_pipeline_grouper.py (new): test_immediate_severity_bypasses_grouper,
    test_periodic_flush_drains_routine, test_priority_is_also_coalesced_not_bypassed.
  - tests/test_config_loader.py (new): test_multifile_load_coerces_notification_rules,
    test_rules_attribute_access_does_not_raise (regression guards for Bug B).
  - tests/test_pipeline_inhibitor_grouper.py (updated): 5 existing grouper
    hold/coalesce/flush tests primed the grouper with immediate+group_key
    events expecting them to be held; switched those to 'priority' (still
    buffered; still outranks the routine event in the inhibitor-chain test)
    to match the intended immediate-bypass behavior.
  Full suite: 253 passed (was 248 + 5 new; 5 existing updated, none lost).

VERIFICATION (rebuilt prod, traced end-to-end via in-process build_pipeline
probe with a recording channel + live config):
  - rules[0] type: NotificationRuleConfig (Bug B fixed).
  - IMMEDIATE event: held_count==0 on emit (bypassed) -> reached
    channel.deliver(): delivered=[('PROBE_RULE','E2E IMMEDIATE')].
  - ROUTINE event: held_count==1 -> after flush 0 -> reached
    channel.deliver(): delivered+=[('PROBE_RULE','E2E ROUTINE')].
  - Natural Summit-Creek-shaped nifc wildfire_incident (routine, no
    matching dispatch rule): held 1 -> after flush -> landed in the digest
    accumulator (1 event). End-to-end channel.deliver evidence = the
    RecChannel.deliver() calls above.
  - Live container: 8 adapters, healthy, "Grouper flush task started
    (every 5s)", the enabled_toggles warning fired, and NO dispatcher
    AttributeError/traceback.

Follow-up (non-blocking): several Phase 2.7-2.14 categories (e.g.
wildfire_incident, earthquake_event) aren't in the category->toggle map,
so they fall to toggle 'other'. Harmless while enabled_toggles is empty
(pass-all), but should be mapped before toggle gating is turned on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-28 00:36:13 +00:00
commit 20e0dec28a
6 changed files with 198 additions and 19 deletions

View file

@ -654,8 +654,21 @@ def _dict_to_dataclass(cls, data: dict):
field_type = field_types[key]
# Notifications needs special rules/channels coercion -- must run
# BEFORE the generic nested-dataclass handler, which would otherwise
# shadow it and leave rules as raw dicts (Phase 2.16.1 fix).
if key == "notifications" and isinstance(value, dict):
notifications = _dict_to_dataclass(NotificationsConfig, value)
if "rules" in value and isinstance(value["rules"], list):
notifications.rules = [
_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r
for r in value["rules"]
]
if "channels" in value and isinstance(value["channels"], list) and value["channels"]:
_migrate_legacy_channels(notifications, value)
kwargs[key] = notifications
# Handle nested dataclasses
if hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict):
elif hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(field_type, value)
# Handle list of MeshSourceConfig
elif key == "mesh_sources" and isinstance(value, list):
@ -701,14 +714,6 @@ def _dict_to_dataclass(cls, data: dict):
kwargs[key] = _dict_to_dataclass(TogglesConfig, value)
elif key == "digest" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(DigestConfig, value)
elif key == "notifications" and isinstance(value, dict):
notifications = _dict_to_dataclass(NotificationsConfig, value)
if "rules" in value and isinstance(value["rules"], list):
notifications.rules = [_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r for r in value["rules"]]
# Migrate old channels+rules format if present
if "channels" in value and isinstance(value["channels"], list) and value["channels"]:
_migrate_legacy_channels(notifications, value)
kwargs[key] = notifications
else:
kwargs[key] = value

View file

@ -22,6 +22,7 @@ Usage:
"""
import asyncio
import logging
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
@ -37,6 +38,9 @@ from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
from meshai.notifications.pipeline.scheduler import DigestScheduler
_logger = logging.getLogger("meshai.pipeline")
def build_pipeline(config, llm_backend, connector=None) -> EventBus:
"""Build the pipeline and return the EventBus.
@ -83,6 +87,12 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus:
if enabled_list:
enabled_toggles = set(enabled_list)
if not enabled_toggles:
_logger.warning(
"enabled_toggles is empty -- ToggleFilter passing all events. "
"Configure toggles to enable gating."
)
toggle_filter = ToggleFilter(
next_handler=_tee,
enabled_toggles=enabled_toggles,
@ -189,6 +199,30 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
)
await scheduler.start()
# Phase 2.16.1: periodically flush the grouper so coalesced (routine/
# priority) events are delivered within the window even when poll cadence
# is sparse. Immediate events bypass the grouper and don't need this.
grouper = components["grouper"]
flush_interval = getattr(config.notifications, "grouper_flush_seconds", 5.0) or 5.0
flush_stop = asyncio.Event()
async def _grouper_flush_loop():
while not flush_stop.is_set():
try:
await asyncio.wait_for(flush_stop.wait(), timeout=flush_interval)
return
except asyncio.TimeoutError:
pass
try:
grouper.tick()
except Exception:
_logger.exception("Grouper flush tick failed")
flush_task = asyncio.create_task(_grouper_flush_loop(), name="grouper-flush")
scheduler._grouper_flush_task = flush_task
scheduler._grouper_flush_stop = flush_stop
_logger.info(f"Grouper flush task started (every {flush_interval:.0f}s)")
# Stash scheduler for stop_pipeline
bus._pipeline_scheduler = scheduler
@ -202,6 +236,16 @@ async def stop_pipeline(scheduler: DigestScheduler) -> None:
scheduler: DigestScheduler returned by start_pipeline()
"""
if scheduler is not None:
flush_stop = getattr(scheduler, "_grouper_flush_stop", None)
flush_task = getattr(scheduler, "_grouper_flush_task", None)
if flush_stop is not None:
flush_stop.set()
if flush_task is not None:
flush_task.cancel()
try:
await flush_task
except (asyncio.CancelledError, Exception):
pass
await scheduler.stop()

View file

@ -51,6 +51,12 @@ class Grouper:
event with the same group_key. The held event is emitted
later via tick().
"""
# Immediate-severity events bypass the coalescing window entirely
# (Phase 2.16.1): they must be delivered without buffering latency.
if event.severity == "immediate":
self._next(event)
return
if not event.group_key:
self._next(event)
return

View file

@ -0,0 +1,63 @@
"""Phase 2.16.1: lock in notification-rule coercion in the config loader path.
Regression guard for the bug where the generic nested-dataclass handler in
_dict_to_dataclass shadowed the explicit 'notifications' branch, leaving
cfg.notifications.rules as raw dicts (which crashed Dispatcher._matching_rules
on rule.enabled). config_loader.load_config uses this same _dict_to_dataclass.
"""
from meshai.config import Config, NotificationRuleConfig, _dict_to_dataclass
def test_multifile_load_coerces_notification_rules():
"""notifications.rules dicts are coerced to NotificationRuleConfig."""
data = {
"notifications": {
"enabled": True,
"rules": [
{
"name": "Test Rule",
"enabled": True,
"trigger_type": "condition",
"categories": ["earthquake_event"],
"min_severity": "routine",
"delivery_type": "mesh_broadcast",
},
{
"name": "Second Rule",
"enabled": False,
"trigger_type": "condition",
"categories": ["wildfire_incident"],
"delivery_type": "email",
},
],
}
}
cfg = _dict_to_dataclass(Config, data)
rules = cfg.notifications.rules
assert len(rules) == 2
# Coerced to the dataclass, NOT left as dicts.
assert all(isinstance(r, NotificationRuleConfig) for r in rules)
# Attribute access (what Dispatcher._matching_rules needs) works.
assert rules[0].enabled is True
assert rules[0].name == "Test Rule"
assert rules[1].enabled is False
def test_rules_attribute_access_does_not_raise():
"""Dispatcher-style attribute access on every rule succeeds."""
data = {
"notifications": {
"rules": [
{"name": "R", "enabled": True, "trigger_type": "condition",
"categories": ["earthquake_event"], "min_severity": "immediate"},
]
}
}
cfg = _dict_to_dataclass(Config, data)
for r in cfg.notifications.rules:
# These are the accesses Dispatcher._matching_rules performs.
_ = r.enabled
_ = r.trigger_type
_ = r.categories
_ = r.min_severity

View file

@ -0,0 +1,61 @@
"""Phase 2.16.1 grouper tests: immediate bypass + periodic flush of routine."""
from meshai.notifications.pipeline.grouper import Grouper
from meshai.notifications.events import make_event
class Recorder:
def __init__(self):
self.received = []
def handle(self, event):
self.received.append(event)
def _ev(severity, group_key="gk1"):
return make_event(
source="usgs_quake",
category="earthquake_event",
severity=severity,
title=f"test {severity}",
lat=42.6,
lon=-114.5,
group_key=group_key,
inhibit_keys=[group_key],
)
def test_immediate_severity_bypasses_grouper():
"""An immediate event with a group_key is delivered at once, not buffered."""
rec = Recorder()
g = Grouper(next_handler=rec.handle, window_seconds=60.0)
g.handle(_ev("immediate"))
# Delivered immediately, nothing held.
assert len(rec.received) == 1
assert rec.received[0].severity == "immediate"
assert g.held_count() == 0
def test_periodic_flush_drains_routine():
"""A routine event is held, then released by tick() once its window passes."""
rec = Recorder()
g = Grouper(next_handler=rec.handle, window_seconds=0.0) # 0s window -> tick drains now
g.handle(_ev("routine"))
# Held on arrival, not yet delivered.
assert g.held_count() == 1
assert rec.received == []
# The periodic flush task calls tick(); simulate one tick.
drained = g.tick()
assert drained == 1
assert len(rec.received) == 1
assert rec.received[0].severity == "routine"
assert g.held_count() == 0
def test_priority_is_also_coalesced_not_bypassed():
"""Priority events still buffer (only immediate bypasses)."""
rec = Recorder()
g = Grouper(next_handler=rec.handle, window_seconds=60.0)
g.handle(_ev("priority"))
assert rec.received == []
assert g.held_count() == 1

View file

@ -108,7 +108,7 @@ class TestGrouper:
def test_event_with_group_key_is_held_not_emitted(self):
next_handler = Mock()
grouper = Grouper(next_handler)
event = make_event("e1", "immediate", group_key="fire:42")
event = make_event("e1", "priority", group_key="fire:42")
grouper.handle(event)
next_handler.assert_not_called()
assert grouper.held_count() == 1
@ -116,8 +116,8 @@ class TestGrouper:
def test_second_same_group_key_replaces_first(self):
next_handler = Mock()
grouper = Grouper(next_handler)
ev1 = make_event("e1", "immediate", group_key="fire:42")
ev2 = make_event("e2", "immediate", group_key="fire:42")
ev1 = make_event("e1", "priority", group_key="fire:42")
ev2 = make_event("e2", "priority", group_key="fire:42")
grouper.handle(ev1)
grouper.handle(ev2)
next_handler.assert_not_called()
@ -134,7 +134,7 @@ class TestGrouper:
grouper._now = lambda: current_time[0]
current_time[0] = 0.0
event = make_event("e1", "immediate", group_key="g")
event = make_event("e1", "priority", group_key="g")
grouper.handle(event)
assert grouper.held_count() == 1
@ -151,9 +151,9 @@ class TestGrouper:
def test_flush_all_emits_everything_immediately(self):
next_handler = Mock()
grouper = Grouper(next_handler)
ev1 = make_event("e1", "immediate", group_key="g1")
ev2 = make_event("e2", "immediate", group_key="g2")
ev3 = make_event("e3", "immediate", group_key="g3")
ev1 = make_event("e1", "priority", group_key="g1")
ev2 = make_event("e2", "priority", group_key="g2")
ev3 = make_event("e3", "priority", group_key="g3")
grouper.handle(ev1)
grouper.handle(ev2)
grouper.handle(ev3)
@ -172,8 +172,8 @@ class TestInhibitorGrouperChain:
grouper = Grouper(next_handler=terminal)
inhibitor = Inhibitor(next_handler=grouper.handle)
# Send immediate event with group_key and inhibit_keys
ev1 = make_event("e1", "immediate", group_key="g1", inhibit_keys=["k1"])
# Send priority event with group_key and inhibit_keys (immediate would bypass)
ev1 = make_event("e1", "priority", group_key="g1", inhibit_keys=["k1"])
inhibitor.handle(ev1)
# After inhibitor: passed (no prior key)
# After grouper: held (group_key present)
@ -191,4 +191,4 @@ class TestInhibitorGrouperChain:
grouper.flush_all()
terminal.assert_called_once()
emitted = terminal.call_args[0][0]
assert emitted.id == "e1" # the immediate, not suppressed routine
assert emitted.id == "e1" # the priority event, not the suppressed routine