diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 49d6c2b..fcc5314 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -1,88 +1,66 @@ -"""Notification pipeline package. - -Phase 2.1 provides the bare skeleton: -- EventBus: Central pub/sub for all events -- SeverityRouter: Routes immediate vs digest events -- Dispatcher: Delivers immediate events to channels -- StubDigestQueue: Placeholder for Phase 2.3 aggregator - -Usage: - from meshai.notifications.pipeline import build_pipeline - - pipeline = build_pipeline(channel_config={ - "mesh_health": ["discord"], - "weather": ["discord", "meshtastic"], - }) - - # Emit events through the bus - pipeline["bus"].emit(event) -""" - -from meshai.notifications.pipeline.bus import EventBus, get_bus -from meshai.notifications.pipeline.severity_router import ( - SeverityRouter, - StubDigestQueue, -) -from meshai.notifications.pipeline.dispatcher import ( - Dispatcher, - StubChannelBackend, -) - - -def build_pipeline( - channel_config: dict[str, list[str]] | None = None, -) -> dict: - """Build and wire up the notification pipeline. - - Creates all pipeline components and connects them: - - EventBus receives all events - - SeverityRouter subscribes to bus, routes by severity - - Dispatcher handles immediate events - - StubDigestQueue collects priority/routine events - - Args: - channel_config: Mapping of toggle -> channel names for dispatch. - Example: {"mesh_health": ["discord"]} - - Returns: - Dict with all pipeline components: - - bus: EventBus instance - - router: SeverityRouter instance - - dispatcher: Dispatcher instance - - digest_queue: StubDigestQueue instance - """ - # Create components - bus = EventBus() - dispatcher = Dispatcher(channel_config) - digest_queue = StubDigestQueue() - - # Wire up the router - router = SeverityRouter( - immediate_handler=dispatcher.dispatch, - digest_handler=digest_queue.enqueue, - ) - - # Subscribe router to bus - bus.subscribe(router.handle) - - return { - "bus": bus, - "router": router, - "dispatcher": dispatcher, - "digest_queue": digest_queue, - } - - -__all__ = [ - # Core classes - "EventBus", - "SeverityRouter", - "Dispatcher", - # Stubs for testing/Phase 2.x - "StubDigestQueue", - "StubChannelBackend", - # Factory - "build_pipeline", - # Singleton accessor - "get_bus", -] +"""Notification pipeline package. + +Phase 2.1 skeleton: + - EventBus: pub/sub for adapter ingress + - SeverityRouter: forks immediate vs digest paths + - Dispatcher: routes immediate events to channels via existing rules + - StubDigestQueue: placeholder for Phase 2.3 aggregator + +Usage: + from meshai.notifications.pipeline import build_pipeline + bus = build_pipeline(config) + bus.emit(event) +""" + +from meshai.notifications.channels import create_channel +from meshai.notifications.pipeline.bus import EventBus, get_bus +from meshai.notifications.pipeline.severity_router import ( + SeverityRouter, + StubDigestQueue, +) +from meshai.notifications.pipeline.dispatcher import Dispatcher + + +def build_pipeline(config) -> EventBus: + """Build the pipeline and return the EventBus. + + Adapters emit events to this bus and they flow through the + severity router to either the dispatcher (immediate) or the + digest stub (priority/routine). + """ + bus = EventBus() + dispatcher = Dispatcher(config, create_channel) + digest = StubDigestQueue() + severity_router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest.enqueue, + ) + bus.subscribe(severity_router.handle) + return bus + + +def build_pipeline_components(config) -> tuple: + """Like build_pipeline, but returns all components for test inspection. + + Returns (bus, dispatcher, digest, severity_router). + """ + bus = EventBus() + dispatcher = Dispatcher(config, create_channel) + digest = StubDigestQueue() + severity_router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest.enqueue, + ) + bus.subscribe(severity_router.handle) + return bus, dispatcher, digest, severity_router + + +__all__ = [ + "EventBus", + "SeverityRouter", + "StubDigestQueue", + "Dispatcher", + "build_pipeline", + "build_pipeline_components", + "get_bus", +] diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index 858a8c5..1df1eb2 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -1,143 +1,77 @@ -"""Immediate event dispatcher. - -The dispatcher routes immediate-severity events to configured delivery -channels based on the event's toggle category. - -Phase 2.1 provides a stub that logs dispatch attempts. Phase 2.2 will -add real channel backends (Discord webhooks, Meshtastic broadcast, etc.). - -Usage: - dispatcher = Dispatcher(channel_config) - dispatcher.dispatch(event) # Called by SeverityRouter for immediate events -""" - -import logging -from typing import Callable, Optional - -from meshai.notifications.events import Event -from meshai.notifications.categories import get_toggle - - -class Dispatcher: - """Dispatches immediate events to configured channels. - - Each toggle category can have multiple delivery channels configured. - The dispatcher looks up the toggle for an event's category and sends - to all channels registered for that toggle. - - Phase 2.1: Stub implementation that logs but doesn't actually deliver. - Phase 2.2: Will add real channel backends. - """ - - def __init__( - self, - channel_config: Optional[dict[str, list[str]]] = None, - ): - """Initialize the dispatcher. - - Args: - channel_config: Mapping of toggle -> list of channel names. - Example: {"mesh_health": ["discord", "meshtastic"]} - If None, defaults to empty (no channels configured). - """ - self._channels = channel_config or {} - self._logger = logging.getLogger("meshai.pipeline.dispatcher") - self._backends: dict[str, Callable[[Event], None]] = {} - - def register_backend( - self, - channel_name: str, - handler: Callable[[Event], None], - ) -> None: - """Register a delivery backend for a channel. - - Args: - channel_name: Name of the channel (e.g., "discord", "meshtastic") - handler: Callable that delivers the event to the channel - """ - self._backends[channel_name] = handler - self._logger.debug(f"Registered backend: {channel_name}") - - def dispatch(self, event: Event) -> None: - """Dispatch an immediate event to configured channels. - - Looks up the toggle for the event's category, then sends to - all channels configured for that toggle. - - Args: - event: The immediate-severity Event to dispatch - """ - toggle = get_toggle(event.category) - if toggle is None: - self._logger.warning( - f"Unknown category {event.category!r} for event {event.id}, " - "defaulting to mesh_health" - ) - toggle = "mesh_health" - - channels = self._channels.get(toggle, []) - if not channels: - self._logger.info( - f"No channels configured for toggle {toggle!r}, " - f"event {event.id} not dispatched" - ) - return - - for channel in channels: - self._deliver_to_channel(event, channel, toggle) - - def _deliver_to_channel( - self, - event: Event, - channel: str, - toggle: str, - ) -> None: - """Deliver event to a specific channel. - - Args: - event: The Event to deliver - channel: Channel name - toggle: Toggle category (for logging) - """ - backend = self._backends.get(channel) - if backend is None: - # Phase 2.1: Log stub - no real backend yet - self._logger.info( - f"DISPATCH STUB [{toggle}] -> {channel}: {event.title}" - ) - return - - try: - backend(event) - self._logger.info( - f"DISPATCHED [{toggle}] -> {channel}: {event.title}" - ) - except Exception: - self._logger.exception( - f"Failed to dispatch event {event.id} to {channel}" - ) - - -class StubChannelBackend: - """Stub channel backend for testing. - - Collects all events "sent" to it for verification in tests. - """ - - def __init__(self, name: str): - self.name = name - self.events: list[Event] = [] - self._logger = logging.getLogger(f"meshai.pipeline.stub.{name}") - - def send(self, event: Event) -> None: - """Record an event as sent. - - Args: - event: The Event to record - """ - self.events.append(event) - self._logger.info(f"STUB {self.name}: {event.title}") - - def clear(self) -> None: - """Clear recorded events.""" - self.events = [] +"""Immediate event dispatcher. + +The dispatcher routes immediate-severity events through the existing +NotificationRuleConfig rules and delivers via channels.py. This is the +transitional bridge between the new Event pipeline and the existing +channel implementations. +""" + +import logging +from typing import Callable + +from meshai.notifications.events import Event + + +class Dispatcher: + """Dispatches immediate events to channels matching configured rules.""" + + SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2} + + def __init__(self, config, channel_factory: Callable): + """Initialize. + + Args: + config: The full Config object (provides config.notifications.rules) + channel_factory: Callable taking a NotificationRuleConfig and + returning a NotificationChannel. This is create_channel + from meshai/notifications/channels.py. + """ + self._config = config + self._channel_factory = channel_factory + self._logger = logging.getLogger("meshai.pipeline.dispatcher") + + def dispatch(self, event: Event) -> None: + """Deliver an immediate-severity event to all matching channels.""" + rules = self._matching_rules(event) + if not rules: + self._logger.debug( + f"No matching rules for {event.source}/{event.category}, skipping" + ) + return + for rule in rules: + try: + channel = self._channel_factory(rule) + alert = { + "category": event.category, + "severity": event.severity, + "message": event.summary or event.title, + "node_id": event.node_ids[0] if event.node_ids else None, + "region": event.region, + "timestamp": event.timestamp, + } + channel.deliver(alert) + self._logger.info( + f"Dispatched event {event.id} via {rule.delivery_type}" + ) + except Exception: + self._logger.exception( + f"Channel delivery failed for rule {rule.name}" + ) + + def _matching_rules(self, event: Event) -> list: + """Return enabled condition rules matching this event's category + and severity threshold.""" + event_rank = self.SEVERITY_RANK.get(event.severity, 0) + matches = [] + for rule in self._config.notifications.rules: + if not rule.enabled: + continue + if rule.trigger_type != "condition": + continue + if rule.categories and event.category not in rule.categories: + continue + min_rank = self.SEVERITY_RANK.get(rule.min_severity, 0) + if event_rank < min_rank: + continue + matches.append(rule) + return matches