feat(notifications): Phase 2.2 inhibitor and grouper

Adds inline pipeline stages between the bus and the severity router:

- Inhibitor: suppresses lower-or-equal severity events when a key
  in event.inhibit_keys is already active. TTL configurable, default
  30 minutes.
- Grouper: coalesces events sharing group_key within a time window
  (default 60s). Most recent event wins. tick() and flush_all()
  drive emission; no background timers in Phase 2.2.
- build_pipeline now wires: bus -> inhibitor -> grouper -> severity_router

Phase 2.1 dispatcher tests continue to pass unchanged.
This commit is contained in:
K7ZVX 2026-05-14 18:53:03 +00:00
commit e67e2cd6a0
4 changed files with 403 additions and 12 deletions

View file

@ -1,10 +1,12 @@
"""Notification pipeline package.
Phase 2.1 skeleton:
- EventBus: pub/sub for adapter ingress
- SeverityRouter: forks immediate vs digest paths
- Dispatcher: routes immediate events to channels via existing rules
- StubDigestQueue: placeholder for Phase 2.3 aggregator
Phase 2.1 + 2.2:
- EventBus: pub/sub ingress
- Inhibitor: suppresses redundant events by inhibit_keys
- Grouper: coalesces events sharing group_key within a window
- SeverityRouter: forks immediate vs digest
- Dispatcher: routes immediate via channels (existing rules schema)
- StubDigestQueue: placeholder for Phase 2.3
Usage:
from meshai.notifications.pipeline import build_pipeline
@ -19,14 +21,15 @@ from meshai.notifications.pipeline.severity_router import (
StubDigestQueue,
)
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
def build_pipeline(config) -> EventBus:
"""Build the pipeline and return the EventBus.
Adapters emit events to this bus and they flow through the
severity router to either the dispatcher (immediate) or the
digest stub (priority/routine).
Wiring:
bus -> inhibitor -> grouper -> severity_router -> (dispatcher | digest_stub)
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
@ -35,14 +38,16 @@ def build_pipeline(config) -> EventBus:
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(severity_router.handle)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus
def build_pipeline_components(config) -> tuple:
"""Like build_pipeline, but returns all components for test inspection.
Returns (bus, dispatcher, digest, severity_router).
Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
@ -51,8 +56,10 @@ def build_pipeline_components(config) -> tuple:
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(severity_router.handle)
return bus, dispatcher, digest, severity_router
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus, inhibitor, grouper, severity_router, dispatcher, digest
__all__ = [
@ -60,6 +67,8 @@ __all__ = [
"SeverityRouter",
"StubDigestQueue",
"Dispatcher",
"Inhibitor",
"Grouper",
"build_pipeline",
"build_pipeline_components",
"get_bus",

View file

@ -0,0 +1,96 @@
"""Event grouper.
Coalesces events sharing a group_key inside a time window. The most
recent event for a group_key wins; older versions are replaced.
Events without a group_key pass through immediately.
The grouper holds events in a window. tick() flushes events whose
window has expired. For Phase 2.2, tick() is called explicitly by
tests; Phase 2.3+ will integrate with the digest scheduler for live
operation.
"""
import logging
import time
from typing import Callable
from meshai.notifications.events import Event
class Grouper:
"""Coalesce same-group_key events inside a window."""
def __init__(
self,
next_handler: Callable[[Event], None],
window_seconds: float = 60.0,
):
"""Initialize.
Args:
next_handler: Callable that receives events when they
exit the grouper (either immediately if no group_key,
or after the window expires).
window_seconds: How long to hold a group_key before
emitting downstream (default 60 seconds).
"""
self._next = next_handler
self._window = window_seconds
# {group_key: (event, hold_until_ts)}
self._held: dict[str, tuple[Event, float]] = {}
self._logger = logging.getLogger("meshai.pipeline.grouper")
def _now(self) -> float:
return time.time()
def handle(self, event: Event) -> None:
"""Process an event.
Events without group_key pass through immediately.
Events with group_key are held, replacing any prior held
event with the same group_key. The held event is emitted
later via tick().
"""
if not event.group_key:
self._next(event)
return
now = self._now()
hold_until = now + self._window
prior = self._held.get(event.group_key)
if prior is not None:
self._logger.info(
f"COALESCED event {event.id} into group {event.group_key!r}, "
f"replacing prior event {prior[0].id}"
)
self._held[event.group_key] = (event, hold_until)
def tick(self) -> int:
"""Flush events whose window has expired.
Returns the number of events emitted.
"""
now = self._now()
to_emit = [
(gk, ev) for gk, (ev, hu) in self._held.items() if hu <= now
]
for gk, _ in to_emit:
del self._held[gk]
for _, ev in to_emit:
self._next(ev)
return len(to_emit)
def flush_all(self) -> int:
"""Immediately emit every held event, regardless of window.
Used at shutdown and by tests. Returns count emitted.
"""
events = [ev for ev, _ in self._held.values()]
self._held.clear()
for ev in events:
self._next(ev)
return len(events)
def held_count(self) -> int:
"""For tests: number of events currently held."""
return len(self._held)

View file

@ -0,0 +1,92 @@
"""Severity-based event inhibitor.
Suppresses lower-severity events when a higher-severity event for the
same logical incident (matching inhibit_keys) is already active.
Inhibit keys are operator-defined strings on the Event that identify
the underlying incident. Two events sharing an inhibit_key refer to
the same situation. If a critical event for "battery:BLD-MTN" fired
recently, a subsequent warning event with the same key gets suppressed.
"""
import logging
import time
from typing import Callable
from meshai.notifications.events import Event
class Inhibitor:
"""Suppress lower-severity events when higher-severity is active."""
SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2}
def __init__(
self,
next_handler: Callable[[Event], None],
ttl_seconds: float = 1800.0,
):
"""Initialize.
Args:
next_handler: Callable that receives non-suppressed events.
ttl_seconds: How long an inhibit_key remains active after
the originating event (default 30 minutes).
"""
self._next = next_handler
self._ttl = ttl_seconds
# {inhibit_key: (rank, expires_at)}
self._active: dict[str, tuple[int, float]] = {}
self._logger = logging.getLogger("meshai.pipeline.inhibitor")
def _now(self) -> float:
# Hookable for tests
return time.time()
def _prune_expired(self, now: float) -> None:
expired = [k for k, (_, exp) in self._active.items() if exp <= now]
for k in expired:
del self._active[k]
def handle(self, event: Event) -> None:
"""Process an event: either suppress it or pass it on.
If any of the event's inhibit_keys is currently active at a
higher-or-equal rank, the event is suppressed. Otherwise, the
event's inhibit_keys are recorded/upgraded, and the event is
passed to the next handler.
"""
now = self._now()
self._prune_expired(now)
event_rank = self.SEVERITY_RANK.get(event.severity, 0)
# Check suppression
for key in event.inhibit_keys:
entry = self._active.get(key)
if entry is not None:
active_rank, _ = entry
if active_rank >= event_rank:
self._logger.info(
f"SUPPRESSED event {event.id} ({event.severity}) "
f"by active key {key!r} at rank {active_rank}"
)
return
# Record / upgrade entries
new_expires = now + self._ttl
for key in event.inhibit_keys:
existing = self._active.get(key)
if existing is None or existing[0] < event_rank:
self._active[key] = (event_rank, new_expires)
# Pass through
self._next(event)
def active_keys(self) -> dict[str, tuple[int, float]]:
"""For tests: snapshot of currently-active inhibit keys."""
return dict(self._active)
def clear(self) -> None:
"""For tests: reset state."""
self._active.clear()

View file

@ -0,0 +1,194 @@
"""Tests for Phase 2.2 inhibitor and grouper."""
import pytest
from unittest.mock import Mock
from meshai.notifications.events import Event
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
def make_event(id, severity, inhibit_keys=None, group_key=None):
return Event(
id=id,
source="test",
category="test_cat",
severity=severity,
title=f"Event {id}",
inhibit_keys=inhibit_keys or [],
group_key=group_key,
)
# ===================== INHIBITOR TESTS =====================
class TestInhibitor:
def test_event_without_inhibit_keys_passes_through(self):
next_handler = Mock()
inhibitor = Inhibitor(next_handler)
event = make_event("e1", "immediate", inhibit_keys=[])
inhibitor.handle(event)
next_handler.assert_called_once_with(event)
def test_lower_severity_after_higher_is_suppressed(self):
next_handler = Mock()
inhibitor = Inhibitor(next_handler)
ev1 = make_event("e1", "immediate", inhibit_keys=["battery:NODE1"])
ev2 = make_event("e2", "priority", inhibit_keys=["battery:NODE1"])
inhibitor.handle(ev1)
inhibitor.handle(ev2)
assert next_handler.call_count == 1
next_handler.assert_called_once_with(ev1)
def test_equal_severity_is_suppressed(self):
next_handler = Mock()
inhibitor = Inhibitor(next_handler)
ev1 = make_event("e1", "priority", inhibit_keys=["key1"])
ev2 = make_event("e2", "priority", inhibit_keys=["key1"])
inhibitor.handle(ev1)
inhibitor.handle(ev2)
assert next_handler.call_count == 1
def test_higher_severity_after_lower_passes_and_upgrades(self):
next_handler = Mock()
inhibitor = Inhibitor(next_handler)
ev1 = make_event("e1", "priority", inhibit_keys=["key1"])
ev2 = make_event("e2", "immediate", inhibit_keys=["key1"])
inhibitor.handle(ev1)
inhibitor.handle(ev2)
assert next_handler.call_count == 2
keys = inhibitor.active_keys()
assert keys["key1"][0] == 2 # immediate rank
def test_inhibit_key_expires_after_ttl(self):
next_handler = Mock()
inhibitor = Inhibitor(next_handler, ttl_seconds=10)
current_time = [0.0]
inhibitor._now = lambda: current_time[0]
ev1 = make_event("e1", "immediate", inhibit_keys=["key1"])
current_time[0] = 0.0
inhibitor.handle(ev1)
current_time[0] = 15.0
ev2 = make_event("e2", "routine", inhibit_keys=["key1"])
inhibitor.handle(ev2)
assert next_handler.call_count == 2
def test_multiple_keys_any_active_suppresses(self):
next_handler = Mock()
inhibitor = Inhibitor(next_handler)
ev1 = make_event("e1", "immediate", inhibit_keys=["a", "b"])
inhibitor.handle(ev1)
assert next_handler.call_count == 1
ev2 = make_event("e2", "routine", inhibit_keys=["b", "c"])
inhibitor.handle(ev2)
assert next_handler.call_count == 1 # suppressed by "b"
ev3 = make_event("e3", "routine", inhibit_keys=["c", "d"])
inhibitor.handle(ev3)
assert next_handler.call_count == 2 # passes, no active key
# ===================== GROUPER TESTS =====================
class TestGrouper:
def test_event_without_group_key_emits_immediately(self):
next_handler = Mock()
grouper = Grouper(next_handler)
event = make_event("e1", "immediate", group_key=None)
grouper.handle(event)
next_handler.assert_called_once_with(event)
def test_event_with_group_key_is_held_not_emitted(self):
next_handler = Mock()
grouper = Grouper(next_handler)
event = make_event("e1", "immediate", group_key="fire:42")
grouper.handle(event)
next_handler.assert_not_called()
assert grouper.held_count() == 1
def test_second_same_group_key_replaces_first(self):
next_handler = Mock()
grouper = Grouper(next_handler)
ev1 = make_event("e1", "immediate", group_key="fire:42")
ev2 = make_event("e2", "immediate", group_key="fire:42")
grouper.handle(ev1)
grouper.handle(ev2)
next_handler.assert_not_called()
assert grouper.held_count() == 1
grouper.flush_all()
assert next_handler.call_count == 1
emitted_event = next_handler.call_args[0][0]
assert emitted_event.id == "e2"
def test_tick_emits_when_window_expired(self):
next_handler = Mock()
grouper = Grouper(next_handler, window_seconds=5)
current_time = [0.0]
grouper._now = lambda: current_time[0]
current_time[0] = 0.0
event = make_event("e1", "immediate", group_key="g")
grouper.handle(event)
assert grouper.held_count() == 1
current_time[0] = 3.0
grouper.tick()
next_handler.assert_not_called()
assert grouper.held_count() == 1
current_time[0] = 10.0
grouper.tick()
next_handler.assert_called_once()
assert grouper.held_count() == 0
def test_flush_all_emits_everything_immediately(self):
next_handler = Mock()
grouper = Grouper(next_handler)
ev1 = make_event("e1", "immediate", group_key="g1")
ev2 = make_event("e2", "immediate", group_key="g2")
ev3 = make_event("e3", "immediate", group_key="g3")
grouper.handle(ev1)
grouper.handle(ev2)
grouper.handle(ev3)
assert grouper.held_count() == 3
grouper.flush_all()
assert next_handler.call_count == 3
assert grouper.held_count() == 0
# ===================== INTEGRATION TEST =====================
class TestInhibitorGrouperChain:
def test_inhibitor_then_grouper_chain(self):
terminal = Mock()
grouper = Grouper(next_handler=terminal)
inhibitor = Inhibitor(next_handler=grouper.handle)
# Send immediate event with group_key and inhibit_keys
ev1 = make_event("e1", "immediate", group_key="g1", inhibit_keys=["k1"])
inhibitor.handle(ev1)
# After inhibitor: passed (no prior key)
# After grouper: held (group_key present)
terminal.assert_not_called()
assert grouper.held_count() == 1
# Send routine event with same group_key and inhibit_keys
ev2 = make_event("e2", "routine", group_key="g1", inhibit_keys=["k1"])
inhibitor.handle(ev2)
# After inhibitor: SUPPRESSED (k1 active at higher rank)
terminal.assert_not_called()
assert grouper.held_count() == 1 # still 1, not 2
# Flush grouper
grouper.flush_all()
terminal.assert_called_once()
emitted = terminal.call_args[0][0]
assert emitted.id == "e1" # the immediate, not suppressed routine