From 074e020463ba921f018dc74a5800d04db7d855d3 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Wed, 27 May 2026 15:39:40 +0000 Subject: [PATCH] feat(notifications): Phase 2.6.5 wire EventBus into main.py runtime path Closes the dark store->bus path. The to_event() methods added in Phase 2.6 for NWS and FIRMS were exercised only by unit tests because main.py never built the pipeline or passed an EventBus to EnvironmentalStore. Insertion points (matching existing init/lifecycle conventions): - _init_components(): inside the notifications.enabled block, after the NotificationRouter init, build the v0.3 pipeline via build_pipeline() and stash it on self.event_bus; then construct EnvironmentalStore with event_bus=self.event_bus so newly-seen adapter events emit to the bus. - start(): after _write_pid(), await start_pipeline() to launch the digest scheduler now that the event loop is running; the scheduler is stored on self._pipeline_scheduler. - stop(): await stop_pipeline() during teardown. - env/store._emit_event(): emission log promoted DEBUG->INFO for runtime traceability of events crossing the bus. When notifications are disabled, self.event_bus stays None and the store receives None (emission no-ops), preserving prior behavior. Tests: 132 passing, no regressions. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/env/store.py | 7 ++++++- meshai/main.py | 25 ++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/meshai/env/store.py b/meshai/env/store.py index a6ea2fd..6f1543b 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -114,7 +114,12 @@ class EnvironmentalStore: try: event = adapter.to_event(raw_evt) self._event_bus.emit(event) - logger.debug("Emitted %s event %s to pipeline", event.source, event.id) + logger.info( + "Emitted %s event %s (%s) to pipeline bus", + event.source, + event.id, + event.category, + ) except Exception as e: logger.warning("Failed to emit event to pipeline: %s", e) diff --git a/meshai/main.py b/meshai/main.py index 6d1c41f..6a994dc 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -46,6 +46,8 @@ class MeshAI: self.subscription_manager = None self.alert_engine = None self.notification_router = None + self.event_bus = None # Notification pipeline EventBus (v0.3) + self._pipeline_scheduler = None # DigestScheduler from start_pipeline() self.env_store = None # Environmental feeds store self._last_sub_check: float = 0.0 self.router: Optional[MessageRouter] = None @@ -80,6 +82,14 @@ class MeshAI: # Write PID file self._write_pid() + # Start the notification pipeline's async components (digest scheduler). + # build_pipeline() ran in _init_components(); this starts its scheduler + # now that we are inside the running event loop. + if self.event_bus is not None: + from .notifications.pipeline import start_pipeline + self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config) + logger.info("Notification pipeline started") + logger.info("MeshAI started successfully") # Keep running @@ -169,6 +179,10 @@ class MeshAI: logger.info("Stopping MeshAI...") self._running = False + if self._pipeline_scheduler is not None: + from .notifications.pipeline import stop_pipeline + await stop_pipeline(self._pipeline_scheduler) + if self.connector: self.connector.disconnect() @@ -352,13 +366,22 @@ class MeshAI: ) logger.info("Notification router initialized") + # Notification pipeline (v0.3 EventBus). Built here so env + # adapters constructed below can emit Events into the live + # pipeline at runtime via EnvironmentalStore(event_bus=...). + from .notifications.pipeline import build_pipeline + self.event_bus = build_pipeline(self.config, self.llm, self.connector) + logger.info("Notification pipeline EventBus initialized") + # Environmental feeds env_cfg = self.config.environmental if env_cfg.enabled: from .env.store import EnvironmentalStore # Pass region anchors for fire proximity calculation region_anchors = self.config.mesh_intelligence.regions if self.config.mesh_intelligence.enabled else [] - self.env_store = EnvironmentalStore(config=env_cfg, region_anchors=region_anchors) + self.env_store = EnvironmentalStore( + config=env_cfg, region_anchors=region_anchors, event_bus=self.event_bus + ) logger.info(f"Environmental feeds enabled ({len(self.env_store._adapters)} adapters)") else: self.env_store = None