From 4e4a837c5e97838cd4b1e1ac0c24add21b2d07c7 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 14 May 2026 17:21:20 +0000 Subject: [PATCH] feat(notifications): add Phase 1.3 + 2.1 pipeline skeleton Phase 1.3: - events.py: Event dataclass with ID generation and serialization - region_tagger.py: Coordinate/NWS zone region tagging - categories.py: Toggle field mapping for all 31 alert categories Phase 2.1 Pipeline Skeleton: - pipeline/bus.py: EventBus with subscribe/emit pattern - pipeline/severity_router.py: Routes immediate->dispatch, routine->digest - pipeline/dispatcher.py: Delivers immediate events to configured channels - pipeline/__init__.py: build_pipeline() factory and exports All components tested and verified in container. Co-Authored-By: Claude Opus 4.5 --- meshai/notifications/pipeline/__init__.py | 88 +++++++++++ meshai/notifications/pipeline/bus.py | 85 +++++++++++ meshai/notifications/pipeline/dispatcher.py | 143 ++++++++++++++++++ .../notifications/pipeline/severity_router.py | 104 +++++++++++++ 4 files changed, 420 insertions(+) create mode 100644 meshai/notifications/pipeline/__init__.py create mode 100644 meshai/notifications/pipeline/bus.py create mode 100644 meshai/notifications/pipeline/dispatcher.py create mode 100644 meshai/notifications/pipeline/severity_router.py diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py new file mode 100644 index 0000000..49d6c2b --- /dev/null +++ b/meshai/notifications/pipeline/__init__.py @@ -0,0 +1,88 @@ +"""Notification pipeline package. + +Phase 2.1 provides the bare skeleton: +- EventBus: Central pub/sub for all events +- SeverityRouter: Routes immediate vs digest events +- Dispatcher: Delivers immediate events to channels +- StubDigestQueue: Placeholder for Phase 2.3 aggregator + +Usage: + from meshai.notifications.pipeline import build_pipeline + + pipeline = build_pipeline(channel_config={ + "mesh_health": ["discord"], + "weather": ["discord", "meshtastic"], + }) + + # Emit events through the bus + pipeline["bus"].emit(event) +""" + +from meshai.notifications.pipeline.bus import EventBus, get_bus +from meshai.notifications.pipeline.severity_router import ( + SeverityRouter, + StubDigestQueue, +) +from meshai.notifications.pipeline.dispatcher import ( + Dispatcher, + StubChannelBackend, +) + + +def build_pipeline( + channel_config: dict[str, list[str]] | None = None, +) -> dict: + """Build and wire up the notification pipeline. + + Creates all pipeline components and connects them: + - EventBus receives all events + - SeverityRouter subscribes to bus, routes by severity + - Dispatcher handles immediate events + - StubDigestQueue collects priority/routine events + + Args: + channel_config: Mapping of toggle -> channel names for dispatch. + Example: {"mesh_health": ["discord"]} + + Returns: + Dict with all pipeline components: + - bus: EventBus instance + - router: SeverityRouter instance + - dispatcher: Dispatcher instance + - digest_queue: StubDigestQueue instance + """ + # Create components + bus = EventBus() + dispatcher = Dispatcher(channel_config) + digest_queue = StubDigestQueue() + + # Wire up the router + router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest_queue.enqueue, + ) + + # Subscribe router to bus + bus.subscribe(router.handle) + + return { + "bus": bus, + "router": router, + "dispatcher": dispatcher, + "digest_queue": digest_queue, + } + + +__all__ = [ + # Core classes + "EventBus", + "SeverityRouter", + "Dispatcher", + # Stubs for testing/Phase 2.x + "StubDigestQueue", + "StubChannelBackend", + # Factory + "build_pipeline", + # Singleton accessor + "get_bus", +] diff --git a/meshai/notifications/pipeline/bus.py b/meshai/notifications/pipeline/bus.py new file mode 100644 index 0000000..b2cda6d --- /dev/null +++ b/meshai/notifications/pipeline/bus.py @@ -0,0 +1,85 @@ +"""Event bus for the notification pipeline. + +The bus is the entry point for all events flowing through the pipeline. +Adapters call bus.emit(event) to push Events into the system. + +Usage: + from meshai.notifications.pipeline import get_bus + from meshai.notifications.events import make_event + + bus = get_bus() + event = make_event(source="nws", category="weather_warning", severity="immediate", ...) + bus.emit(event) +""" + +import logging +from typing import Callable, Iterable + +from meshai.notifications.events import Event + + +class EventBus: + """Central event bus for the notification pipeline. + + Subscribers register handlers that receive every emitted event. + Errors in one subscriber do not prevent other subscribers from + receiving the event. + """ + + def __init__(self): + self._subscribers: list[Callable[[Event], None]] = [] + self._logger = logging.getLogger("meshai.pipeline.bus") + + def subscribe(self, handler: Callable[[Event], None]) -> None: + """Register a handler that receives every emitted event. + + Args: + handler: Callable that takes an Event and returns None + """ + self._subscribers.append(handler) + self._logger.debug(f"Subscribed handler: {handler}") + + def emit(self, event: Event) -> None: + """Push an event to all subscribers. + + Errors in one subscriber do not stop others from receiving + the event. Exceptions are logged but not re-raised. + + Args: + event: The Event to deliver to all subscribers + """ + for handler in self._subscribers: + try: + handler(event) + except Exception: + self._logger.exception( + f"Subscriber {handler} failed on event {event.id}" + ) + + def emit_many(self, events: Iterable[Event]) -> None: + """Emit multiple events in sequence. + + Args: + events: Iterable of Events to emit + """ + for event in events: + self.emit(event) + + +# Module-level singleton for application-wide use +_bus: EventBus | None = None + + +def get_bus() -> EventBus: + """Get the global EventBus singleton. + + This is the primary way adapters access the bus. Tests should + construct a fresh EventBus() directly to avoid shared state. + + Returns: + The global EventBus instance + """ + global _bus + if _bus is None: + _bus = EventBus() + return _bus diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py new file mode 100644 index 0000000..858a8c5 --- /dev/null +++ b/meshai/notifications/pipeline/dispatcher.py @@ -0,0 +1,143 @@ +"""Immediate event dispatcher. + +The dispatcher routes immediate-severity events to configured delivery +channels based on the event's toggle category. + +Phase 2.1 provides a stub that logs dispatch attempts. Phase 2.2 will +add real channel backends (Discord webhooks, Meshtastic broadcast, etc.). + +Usage: + dispatcher = Dispatcher(channel_config) + dispatcher.dispatch(event) # Called by SeverityRouter for immediate events +""" + +import logging +from typing import Callable, Optional + +from meshai.notifications.events import Event +from meshai.notifications.categories import get_toggle + + +class Dispatcher: + """Dispatches immediate events to configured channels. + + Each toggle category can have multiple delivery channels configured. + The dispatcher looks up the toggle for an event's category and sends + to all channels registered for that toggle. + + Phase 2.1: Stub implementation that logs but doesn't actually deliver. + Phase 2.2: Will add real channel backends. + """ + + def __init__( + self, + channel_config: Optional[dict[str, list[str]]] = None, + ): + """Initialize the dispatcher. + + Args: + channel_config: Mapping of toggle -> list of channel names. + Example: {"mesh_health": ["discord", "meshtastic"]} + If None, defaults to empty (no channels configured). + """ + self._channels = channel_config or {} + self._logger = logging.getLogger("meshai.pipeline.dispatcher") + self._backends: dict[str, Callable[[Event], None]] = {} + + def register_backend( + self, + channel_name: str, + handler: Callable[[Event], None], + ) -> None: + """Register a delivery backend for a channel. + + Args: + channel_name: Name of the channel (e.g., "discord", "meshtastic") + handler: Callable that delivers the event to the channel + """ + self._backends[channel_name] = handler + self._logger.debug(f"Registered backend: {channel_name}") + + def dispatch(self, event: Event) -> None: + """Dispatch an immediate event to configured channels. + + Looks up the toggle for the event's category, then sends to + all channels configured for that toggle. + + Args: + event: The immediate-severity Event to dispatch + """ + toggle = get_toggle(event.category) + if toggle is None: + self._logger.warning( + f"Unknown category {event.category!r} for event {event.id}, " + "defaulting to mesh_health" + ) + toggle = "mesh_health" + + channels = self._channels.get(toggle, []) + if not channels: + self._logger.info( + f"No channels configured for toggle {toggle!r}, " + f"event {event.id} not dispatched" + ) + return + + for channel in channels: + self._deliver_to_channel(event, channel, toggle) + + def _deliver_to_channel( + self, + event: Event, + channel: str, + toggle: str, + ) -> None: + """Deliver event to a specific channel. + + Args: + event: The Event to deliver + channel: Channel name + toggle: Toggle category (for logging) + """ + backend = self._backends.get(channel) + if backend is None: + # Phase 2.1: Log stub - no real backend yet + self._logger.info( + f"DISPATCH STUB [{toggle}] -> {channel}: {event.title}" + ) + return + + try: + backend(event) + self._logger.info( + f"DISPATCHED [{toggle}] -> {channel}: {event.title}" + ) + except Exception: + self._logger.exception( + f"Failed to dispatch event {event.id} to {channel}" + ) + + +class StubChannelBackend: + """Stub channel backend for testing. + + Collects all events "sent" to it for verification in tests. + """ + + def __init__(self, name: str): + self.name = name + self.events: list[Event] = [] + self._logger = logging.getLogger(f"meshai.pipeline.stub.{name}") + + def send(self, event: Event) -> None: + """Record an event as sent. + + Args: + event: The Event to record + """ + self.events.append(event) + self._logger.info(f"STUB {self.name}: {event.title}") + + def clear(self) -> None: + """Clear recorded events.""" + self.events = [] diff --git a/meshai/notifications/pipeline/severity_router.py b/meshai/notifications/pipeline/severity_router.py new file mode 100644 index 0000000..089e670 --- /dev/null +++ b/meshai/notifications/pipeline/severity_router.py @@ -0,0 +1,104 @@ +"""Severity-based event routing. + +The severity router subscribes to the bus and forks each event into +one of two paths based on severity: + +- immediate → immediate_handler (dispatcher for live delivery) +- priority/routine → digest_handler (queue for batched summaries) + +Usage: + router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest_queue.enqueue, + ) + bus.subscribe(router.handle) +""" + +import logging +from typing import Callable + +from meshai.notifications.events import Event +from meshai.notifications.categories import get_toggle + + +class SeverityRouter: + """Routes events to immediate or digest handlers based on severity. + + Immediate-severity events go directly to live delivery channels. + Priority and routine events are queued for periodic digest summaries. + """ + + def __init__( + self, + immediate_handler: Callable[[Event], None], + digest_handler: Callable[[Event], None], + ): + """Initialize the severity router. + + Args: + immediate_handler: Called for severity="immediate" events + digest_handler: Called for severity in ("priority", "routine") + """ + self._immediate = immediate_handler + self._digest = digest_handler + self._logger = logging.getLogger("meshai.pipeline.severity_router") + + def handle(self, event: Event) -> None: + """Route an event based on its severity. + + Args: + event: The Event to route + """ + if event.severity == "immediate": + self._logger.info( + f"IMMEDIATE: {event.source}/{event.category} {event.title}" + ) + self._immediate(event) + elif event.severity in ("priority", "routine"): + self._logger.info( + f"DIGEST QUEUED [{event.severity}]: {event.title}" + ) + self._digest(event) + else: + self._logger.warning( + f"Unknown severity {event.severity!r} on event {event.id}, dropping" + ) + + +class StubDigestQueue: + """Placeholder digest queue for Phase 2.1. + + This is a stub that simply collects events in memory. Phase 2.3 + will replace this with the real aggregator that renders and + delivers periodic digest summaries. + """ + + def __init__(self): + self._queue: list[Event] = [] + self._logger = logging.getLogger("meshai.pipeline.digest_stub") + + def enqueue(self, event: Event) -> None: + """Add an event to the digest queue. + + Args: + event: The Event to queue for digest delivery + """ + self._queue.append(event) + toggle = get_toggle(event.category) or "unknown" + self._logger.info(f"DIGEST QUEUED [{toggle}]: {event.title}") + + def drain(self) -> list[Event]: + """Return and clear all queued events. + + For tests and the future aggregator. Returns the current + queue contents and resets the queue to empty. + + Returns: + List of all queued Events + """ + events, self._queue = self._queue, [] + return events + + def __len__(self) -> int: + """Return the number of queued events.""" + return len(self._queue)