diff --git a/meshai/main.py b/meshai/main.py index c674275..48bc44b 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -265,6 +265,7 @@ class MeshAI: self.data_store = MeshDataStore( source_configs=enabled_sources, db_path="/data/mesh_history.db", + offline_threshold_hours=self.config.mesh_intelligence.offline_threshold_hours, ) # Initial fetch and backfill self.data_store.force_refresh() @@ -338,18 +339,18 @@ class MeshAI: ) logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})") - - # Notification router - if self.config.notifications.enabled: - from .notifications.router import NotificationRouter - self.notification_router = NotificationRouter( - config=self.config.notifications, - connector=self.connector, - llm_backend=self.llm, - timezone=self.config.timezone, - ) - logger.info("Notification router initialized") - + + # Notification router + if self.config.notifications.enabled: + from .notifications.router import NotificationRouter + self.notification_router = NotificationRouter( + config=self.config.notifications, + connector=self.connector, + llm_backend=self.llm, + timezone=self.config.timezone, + ) + logger.info("Notification router initialized") + # Environmental feeds env_cfg = self.config.environmental if env_cfg.enabled: @@ -554,48 +555,48 @@ class MeshAI: if pid_file.exists(): pid_file.unlink() - async def _dispatch_alerts(self, alerts: list[dict]) -> None: - """Dispatch alerts to subscribers and alert channel.""" - mi = self.config.mesh_intelligence - alert_channel = getattr(mi, 'alert_channel', -1) - - for alert in alerts: - message = alert["message"] - logger.info(f"ALERT: {message}") - - # Route through notification router if enabled - if self.notification_router: - try: - await self.notification_router.process_alert(alert) - except Exception as e: - logger.error(f"Notification router error: {e}") - - # Fallback: Send to alert channel if no notification router - elif alert_channel >= 0 and self.connector: - try: - self.connector.send_message( - text=message, - destination=None, - channel=alert_channel, - ) - logger.info(f"Alert sent to channel {alert_channel}") - except Exception as e: - logger.error(f"Failed to send channel alert: {e}") - - # Fallback: Send DMs to matching subscribers - if self.alert_engine and self.subscription_manager: - subscribers = self.alert_engine.get_subscribers_for_alert(alert) - for sub in subscribers: - user_id = sub["user_id"] - try: - await self._send_sub_dm(user_id, message) - logger.info(f"Alert DM sent to {user_id}: {alert['type']}") - except Exception as e: - logger.error(f"Failed to send alert DM to {user_id}: {e}") - - if self.alert_engine: - self.alert_engine.clear_pending() - + async def _dispatch_alerts(self, alerts: list[dict]) -> None: + """Dispatch alerts to subscribers and alert channel.""" + mi = self.config.mesh_intelligence + alert_channel = getattr(mi, 'alert_channel', -1) + + for alert in alerts: + message = alert["message"] + logger.info(f"ALERT: {message}") + + # Route through notification router if enabled + if self.notification_router: + try: + await self.notification_router.process_alert(alert) + except Exception as e: + logger.error(f"Notification router error: {e}") + + # Fallback: Send to alert channel if no notification router + elif alert_channel >= 0 and self.connector: + try: + self.connector.send_message( + text=message, + destination=None, + channel=alert_channel, + ) + logger.info(f"Alert sent to channel {alert_channel}") + except Exception as e: + logger.error(f"Failed to send channel alert: {e}") + + # Fallback: Send DMs to matching subscribers + if self.alert_engine and self.subscription_manager: + subscribers = self.alert_engine.get_subscribers_for_alert(alert) + for sub in subscribers: + user_id = sub["user_id"] + try: + await self._send_sub_dm(user_id, message) + logger.info(f"Alert DM sent to {user_id}: {alert['type']}") + except Exception as e: + logger.error(f"Failed to send alert DM to {user_id}: {e}") + + if self.alert_engine: + self.alert_engine.clear_pending() + async def _check_scheduled_subs(self) -> None: """Check for and deliver due scheduled reports.""" from datetime import datetime diff --git a/meshai/mesh_data_store.py b/meshai/mesh_data_store.py index 3c885d0..4c4351f 100644 --- a/meshai/mesh_data_store.py +++ b/meshai/mesh_data_store.py @@ -230,16 +230,19 @@ class MeshDataStore: self, source_configs: list[MeshSourceConfig], db_path: str = "/data/mesh_history.db", + offline_threshold_hours: int = 2, ): """Initialize the data store. Args: source_configs: List of source configurations db_path: Path to SQLite database for historical data + offline_threshold_hours: Hours before a node is considered offline """ self._sources: dict[str, MeshviewSource | MeshMonitorDataSource | MQTTSource] = {} self._db_path = db_path self._db: Optional[sqlite3.Connection] = None + self._offline_threshold_hours = offline_threshold_hours # Live state self._nodes: dict[int, UnifiedNode] = {} @@ -745,11 +748,13 @@ class MeshDataStore: node.last_heard = ts or 0.0 - # NOTE: is_online is set by MeshHealthEngine.compute() using the - # configured offline_threshold_hours. Don't set it here with a - # hardcoded value - let the health engine determine online status. - # The health engine runs on every refresh cycle and will set is_online - # based on: (now - last_heard) < (offline_threshold_hours * 3600) + # Compute is_online based on configured threshold + # This ensures correct status immediately, before health engine runs + if node.last_heard: + offline_threshold = time.time() - (self._offline_threshold_hours * 3600) + node.is_online = node.last_heard > offline_threshold + else: + node.is_online = False # Hops, SNR, RSSI (MM) node.hops_away = raw.get("hopsAway")