diff --git a/meshai/env/store.py b/meshai/env/store.py index a6ea2fd..6f1543b 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -114,7 +114,12 @@ class EnvironmentalStore: try: event = adapter.to_event(raw_evt) self._event_bus.emit(event) - logger.debug("Emitted %s event %s to pipeline", event.source, event.id) + logger.info( + "Emitted %s event %s (%s) to pipeline bus", + event.source, + event.id, + event.category, + ) except Exception as e: logger.warning("Failed to emit event to pipeline: %s", e) diff --git a/meshai/main.py b/meshai/main.py index 6d1c41f..6a994dc 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -46,6 +46,8 @@ class MeshAI: self.subscription_manager = None self.alert_engine = None self.notification_router = None + self.event_bus = None # Notification pipeline EventBus (v0.3) + self._pipeline_scheduler = None # DigestScheduler from start_pipeline() self.env_store = None # Environmental feeds store self._last_sub_check: float = 0.0 self.router: Optional[MessageRouter] = None @@ -80,6 +82,14 @@ class MeshAI: # Write PID file self._write_pid() + # Start the notification pipeline's async components (digest scheduler). + # build_pipeline() ran in _init_components(); this starts its scheduler + # now that we are inside the running event loop. + if self.event_bus is not None: + from .notifications.pipeline import start_pipeline + self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config) + logger.info("Notification pipeline started") + logger.info("MeshAI started successfully") # Keep running @@ -169,6 +179,10 @@ class MeshAI: logger.info("Stopping MeshAI...") self._running = False + if self._pipeline_scheduler is not None: + from .notifications.pipeline import stop_pipeline + await stop_pipeline(self._pipeline_scheduler) + if self.connector: self.connector.disconnect() @@ -352,13 +366,22 @@ class MeshAI: ) logger.info("Notification router initialized") + # Notification pipeline (v0.3 EventBus). Built here so env + # adapters constructed below can emit Events into the live + # pipeline at runtime via EnvironmentalStore(event_bus=...). + from .notifications.pipeline import build_pipeline + self.event_bus = build_pipeline(self.config, self.llm, self.connector) + logger.info("Notification pipeline EventBus initialized") + # Environmental feeds env_cfg = self.config.environmental if env_cfg.enabled: from .env.store import EnvironmentalStore # Pass region anchors for fire proximity calculation region_anchors = self.config.mesh_intelligence.regions if self.config.mesh_intelligence.enabled else [] - self.env_store = EnvironmentalStore(config=env_cfg, region_anchors=region_anchors) + self.env_store = EnvironmentalStore( + config=env_cfg, region_anchors=region_anchors, event_bus=self.event_bus + ) logger.info(f"Environmental feeds enabled ({len(self.env_store._adapters)} adapters)") else: self.env_store = None