feat(notifications): Phase 2.6.5 wire EventBus into main.py runtime path

Closes the dark store->bus path. The to_event() methods added in Phase
2.6 for NWS and FIRMS were exercised only by unit tests because main.py
never built the pipeline or passed an EventBus to EnvironmentalStore.

Insertion points (matching existing init/lifecycle conventions):
- _init_components(): inside the notifications.enabled block, after the
  NotificationRouter init, build the v0.3 pipeline via build_pipeline()
  and stash it on self.event_bus; then construct EnvironmentalStore with
  event_bus=self.event_bus so newly-seen adapter events emit to the bus.
- start(): after _write_pid(), await start_pipeline() to launch the
  digest scheduler now that the event loop is running; the scheduler is
  stored on self._pipeline_scheduler.
- stop(): await stop_pipeline() during teardown.
- env/store._emit_event(): emission log promoted DEBUG->INFO for runtime
  traceability of events crossing the bus.

When notifications are disabled, self.event_bus stays None and the store
receives None (emission no-ops), preserving prior behavior.

Tests: 132 passing, no regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-27 15:39:40 +00:00
commit 074e020463
2 changed files with 30 additions and 2 deletions

7
meshai/env/store.py vendored
View file

@ -114,7 +114,12 @@ class EnvironmentalStore:
try: try:
event = adapter.to_event(raw_evt) event = adapter.to_event(raw_evt)
self._event_bus.emit(event) self._event_bus.emit(event)
logger.debug("Emitted %s event %s to pipeline", event.source, event.id) logger.info(
"Emitted %s event %s (%s) to pipeline bus",
event.source,
event.id,
event.category,
)
except Exception as e: except Exception as e:
logger.warning("Failed to emit event to pipeline: %s", e) logger.warning("Failed to emit event to pipeline: %s", e)

View file

@ -46,6 +46,8 @@ class MeshAI:
self.subscription_manager = None self.subscription_manager = None
self.alert_engine = None self.alert_engine = None
self.notification_router = None self.notification_router = None
self.event_bus = None # Notification pipeline EventBus (v0.3)
self._pipeline_scheduler = None # DigestScheduler from start_pipeline()
self.env_store = None # Environmental feeds store self.env_store = None # Environmental feeds store
self._last_sub_check: float = 0.0 self._last_sub_check: float = 0.0
self.router: Optional[MessageRouter] = None self.router: Optional[MessageRouter] = None
@ -80,6 +82,14 @@ class MeshAI:
# Write PID file # Write PID file
self._write_pid() self._write_pid()
# Start the notification pipeline's async components (digest scheduler).
# build_pipeline() ran in _init_components(); this starts its scheduler
# now that we are inside the running event loop.
if self.event_bus is not None:
from .notifications.pipeline import start_pipeline
self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config)
logger.info("Notification pipeline started")
logger.info("MeshAI started successfully") logger.info("MeshAI started successfully")
# Keep running # Keep running
@ -169,6 +179,10 @@ class MeshAI:
logger.info("Stopping MeshAI...") logger.info("Stopping MeshAI...")
self._running = False self._running = False
if self._pipeline_scheduler is not None:
from .notifications.pipeline import stop_pipeline
await stop_pipeline(self._pipeline_scheduler)
if self.connector: if self.connector:
self.connector.disconnect() self.connector.disconnect()
@ -352,13 +366,22 @@ class MeshAI:
) )
logger.info("Notification router initialized") logger.info("Notification router initialized")
# Notification pipeline (v0.3 EventBus). Built here so env
# adapters constructed below can emit Events into the live
# pipeline at runtime via EnvironmentalStore(event_bus=...).
from .notifications.pipeline import build_pipeline
self.event_bus = build_pipeline(self.config, self.llm, self.connector)
logger.info("Notification pipeline EventBus initialized")
# Environmental feeds # Environmental feeds
env_cfg = self.config.environmental env_cfg = self.config.environmental
if env_cfg.enabled: if env_cfg.enabled:
from .env.store import EnvironmentalStore from .env.store import EnvironmentalStore
# Pass region anchors for fire proximity calculation # Pass region anchors for fire proximity calculation
region_anchors = self.config.mesh_intelligence.regions if self.config.mesh_intelligence.enabled else [] region_anchors = self.config.mesh_intelligence.regions if self.config.mesh_intelligence.enabled else []
self.env_store = EnvironmentalStore(config=env_cfg, region_anchors=region_anchors) self.env_store = EnvironmentalStore(
config=env_cfg, region_anchors=region_anchors, event_bus=self.event_bus
)
logger.info(f"Environmental feeds enabled ({len(self.env_store._adapters)} adapters)") logger.info(f"Environmental feeds enabled ({len(self.env_store._adapters)} adapters)")
else: else:
self.env_store = None self.env_store = None