fix(notifications): align Phase 2.1 dispatcher with spec

The initial 2.1 dispatcher was a logging stub with manual backend
registration. The spec required integration with the existing
NotificationRuleConfig schema and channels.py create_channel factory.

- Dispatcher takes (config, channel_factory)
- _matching_rules iterates config.notifications.rules with severity ranking
- dispatch() builds alert dict and calls channel.deliver()
- build_pipeline(config) returns EventBus per spec
- build_pipeline_components(config) added for test introspection
This commit is contained in:
K7ZVX 2026-05-14 18:06:08 +00:00
commit 866c55a91c
2 changed files with 143 additions and 231 deletions

View file

@ -1,88 +1,66 @@
"""Notification pipeline package.
Phase 2.1 provides the bare skeleton:
- EventBus: Central pub/sub for all events
- SeverityRouter: Routes immediate vs digest events
- Dispatcher: Delivers immediate events to channels
- StubDigestQueue: Placeholder for Phase 2.3 aggregator
Usage:
from meshai.notifications.pipeline import build_pipeline
pipeline = build_pipeline(channel_config={
"mesh_health": ["discord"],
"weather": ["discord", "meshtastic"],
})
# Emit events through the bus
pipeline["bus"].emit(event)
"""
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue,
)
from meshai.notifications.pipeline.dispatcher import (
Dispatcher,
StubChannelBackend,
)
def build_pipeline(
channel_config: dict[str, list[str]] | None = None,
) -> dict:
"""Build and wire up the notification pipeline.
Creates all pipeline components and connects them:
- EventBus receives all events
- SeverityRouter subscribes to bus, routes by severity
- Dispatcher handles immediate events
- StubDigestQueue collects priority/routine events
Args:
channel_config: Mapping of toggle -> channel names for dispatch.
Example: {"mesh_health": ["discord"]}
Returns:
Dict with all pipeline components:
- bus: EventBus instance
- router: SeverityRouter instance
- dispatcher: Dispatcher instance
- digest_queue: StubDigestQueue instance
"""
# Create components
bus = EventBus()
dispatcher = Dispatcher(channel_config)
digest_queue = StubDigestQueue()
# Wire up the router
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest_queue.enqueue,
)
# Subscribe router to bus
bus.subscribe(router.handle)
return {
"bus": bus,
"router": router,
"dispatcher": dispatcher,
"digest_queue": digest_queue,
}
__all__ = [
# Core classes
"EventBus",
"SeverityRouter",
"Dispatcher",
# Stubs for testing/Phase 2.x
"StubDigestQueue",
"StubChannelBackend",
# Factory
"build_pipeline",
# Singleton accessor
"get_bus",
]
"""Notification pipeline package.
Phase 2.1 skeleton:
- EventBus: pub/sub for adapter ingress
- SeverityRouter: forks immediate vs digest paths
- Dispatcher: routes immediate events to channels via existing rules
- StubDigestQueue: placeholder for Phase 2.3 aggregator
Usage:
from meshai.notifications.pipeline import build_pipeline
bus = build_pipeline(config)
bus.emit(event)
"""
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue,
)
from meshai.notifications.pipeline.dispatcher import Dispatcher
def build_pipeline(config) -> EventBus:
"""Build the pipeline and return the EventBus.
Adapters emit events to this bus and they flow through the
severity router to either the dispatcher (immediate) or the
digest stub (priority/routine).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
digest = StubDigestQueue()
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(severity_router.handle)
return bus
def build_pipeline_components(config) -> tuple:
"""Like build_pipeline, but returns all components for test inspection.
Returns (bus, dispatcher, digest, severity_router).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
digest = StubDigestQueue()
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(severity_router.handle)
return bus, dispatcher, digest, severity_router
__all__ = [
"EventBus",
"SeverityRouter",
"StubDigestQueue",
"Dispatcher",
"build_pipeline",
"build_pipeline_components",
"get_bus",
]

View file

@ -1,143 +1,77 @@
"""Immediate event dispatcher.
The dispatcher routes immediate-severity events to configured delivery
channels based on the event's toggle category.
Phase 2.1 provides a stub that logs dispatch attempts. Phase 2.2 will
add real channel backends (Discord webhooks, Meshtastic broadcast, etc.).
Usage:
dispatcher = Dispatcher(channel_config)
dispatcher.dispatch(event) # Called by SeverityRouter for immediate events
"""
import logging
from typing import Callable, Optional
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
class Dispatcher:
"""Dispatches immediate events to configured channels.
Each toggle category can have multiple delivery channels configured.
The dispatcher looks up the toggle for an event's category and sends
to all channels registered for that toggle.
Phase 2.1: Stub implementation that logs but doesn't actually deliver.
Phase 2.2: Will add real channel backends.
"""
def __init__(
self,
channel_config: Optional[dict[str, list[str]]] = None,
):
"""Initialize the dispatcher.
Args:
channel_config: Mapping of toggle -> list of channel names.
Example: {"mesh_health": ["discord", "meshtastic"]}
If None, defaults to empty (no channels configured).
"""
self._channels = channel_config or {}
self._logger = logging.getLogger("meshai.pipeline.dispatcher")
self._backends: dict[str, Callable[[Event], None]] = {}
def register_backend(
self,
channel_name: str,
handler: Callable[[Event], None],
) -> None:
"""Register a delivery backend for a channel.
Args:
channel_name: Name of the channel (e.g., "discord", "meshtastic")
handler: Callable that delivers the event to the channel
"""
self._backends[channel_name] = handler
self._logger.debug(f"Registered backend: {channel_name}")
def dispatch(self, event: Event) -> None:
"""Dispatch an immediate event to configured channels.
Looks up the toggle for the event's category, then sends to
all channels configured for that toggle.
Args:
event: The immediate-severity Event to dispatch
"""
toggle = get_toggle(event.category)
if toggle is None:
self._logger.warning(
f"Unknown category {event.category!r} for event {event.id}, "
"defaulting to mesh_health"
)
toggle = "mesh_health"
channels = self._channels.get(toggle, [])
if not channels:
self._logger.info(
f"No channels configured for toggle {toggle!r}, "
f"event {event.id} not dispatched"
)
return
for channel in channels:
self._deliver_to_channel(event, channel, toggle)
def _deliver_to_channel(
self,
event: Event,
channel: str,
toggle: str,
) -> None:
"""Deliver event to a specific channel.
Args:
event: The Event to deliver
channel: Channel name
toggle: Toggle category (for logging)
"""
backend = self._backends.get(channel)
if backend is None:
# Phase 2.1: Log stub - no real backend yet
self._logger.info(
f"DISPATCH STUB [{toggle}] -> {channel}: {event.title}"
)
return
try:
backend(event)
self._logger.info(
f"DISPATCHED [{toggle}] -> {channel}: {event.title}"
)
except Exception:
self._logger.exception(
f"Failed to dispatch event {event.id} to {channel}"
)
class StubChannelBackend:
"""Stub channel backend for testing.
Collects all events "sent" to it for verification in tests.
"""
def __init__(self, name: str):
self.name = name
self.events: list[Event] = []
self._logger = logging.getLogger(f"meshai.pipeline.stub.{name}")
def send(self, event: Event) -> None:
"""Record an event as sent.
Args:
event: The Event to record
"""
self.events.append(event)
self._logger.info(f"STUB {self.name}: {event.title}")
def clear(self) -> None:
"""Clear recorded events."""
self.events = []
"""Immediate event dispatcher.
The dispatcher routes immediate-severity events through the existing
NotificationRuleConfig rules and delivers via channels.py. This is the
transitional bridge between the new Event pipeline and the existing
channel implementations.
"""
import logging
from typing import Callable
from meshai.notifications.events import Event
class Dispatcher:
"""Dispatches immediate events to channels matching configured rules."""
SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2}
def __init__(self, config, channel_factory: Callable):
"""Initialize.
Args:
config: The full Config object (provides config.notifications.rules)
channel_factory: Callable taking a NotificationRuleConfig and
returning a NotificationChannel. This is create_channel
from meshai/notifications/channels.py.
"""
self._config = config
self._channel_factory = channel_factory
self._logger = logging.getLogger("meshai.pipeline.dispatcher")
def dispatch(self, event: Event) -> None:
"""Deliver an immediate-severity event to all matching channels."""
rules = self._matching_rules(event)
if not rules:
self._logger.debug(
f"No matching rules for {event.source}/{event.category}, skipping"
)
return
for rule in rules:
try:
channel = self._channel_factory(rule)
alert = {
"category": event.category,
"severity": event.severity,
"message": event.summary or event.title,
"node_id": event.node_ids[0] if event.node_ids else None,
"region": event.region,
"timestamp": event.timestamp,
}
channel.deliver(alert)
self._logger.info(
f"Dispatched event {event.id} via {rule.delivery_type}"
)
except Exception:
self._logger.exception(
f"Channel delivery failed for rule {rule.name}"
)
def _matching_rules(self, event: Event) -> list:
"""Return enabled condition rules matching this event's category
and severity threshold."""
event_rank = self.SEVERITY_RANK.get(event.severity, 0)
matches = []
for rule in self._config.notifications.rules:
if not rule.enabled:
continue
if rule.trigger_type != "condition":
continue
if rule.categories and event.category not in rule.categories:
continue
min_rank = self.SEVERITY_RANK.get(rule.min_severity, 0)
if event_rank < min_rank:
continue
matches.append(rule)
return matches