mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-05-22 07:34:47 +02:00
fix(mesh): use configured offline threshold in data store
- Add offline_threshold_hours parameter to MeshDataStore.__init__ - Compute is_online in _normalize_node using configured threshold - Pass config.mesh_intelligence.offline_threshold_hours from main.py - Removes reliance on health engine for initial is_online computation Verification: - Unit test confirms 2h threshold marks 3h-old node offline - Unit test confirms 4h threshold marks same node online - Container starts healthy with no config errors - Health engine reports 16/16 infra online
This commit is contained in:
parent
21d6520ffd
commit
7a4bd4f38f
2 changed files with 65 additions and 59 deletions
109
meshai/main.py
109
meshai/main.py
|
|
@ -265,6 +265,7 @@ class MeshAI:
|
||||||
self.data_store = MeshDataStore(
|
self.data_store = MeshDataStore(
|
||||||
source_configs=enabled_sources,
|
source_configs=enabled_sources,
|
||||||
db_path="/data/mesh_history.db",
|
db_path="/data/mesh_history.db",
|
||||||
|
offline_threshold_hours=self.config.mesh_intelligence.offline_threshold_hours,
|
||||||
)
|
)
|
||||||
# Initial fetch and backfill
|
# Initial fetch and backfill
|
||||||
self.data_store.force_refresh()
|
self.data_store.force_refresh()
|
||||||
|
|
@ -338,18 +339,18 @@ class MeshAI:
|
||||||
)
|
)
|
||||||
logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})")
|
logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})")
|
||||||
|
|
||||||
|
|
||||||
# Notification router
|
# Notification router
|
||||||
if self.config.notifications.enabled:
|
if self.config.notifications.enabled:
|
||||||
from .notifications.router import NotificationRouter
|
from .notifications.router import NotificationRouter
|
||||||
self.notification_router = NotificationRouter(
|
self.notification_router = NotificationRouter(
|
||||||
config=self.config.notifications,
|
config=self.config.notifications,
|
||||||
connector=self.connector,
|
connector=self.connector,
|
||||||
llm_backend=self.llm,
|
llm_backend=self.llm,
|
||||||
timezone=self.config.timezone,
|
timezone=self.config.timezone,
|
||||||
)
|
)
|
||||||
logger.info("Notification router initialized")
|
logger.info("Notification router initialized")
|
||||||
|
|
||||||
# Environmental feeds
|
# Environmental feeds
|
||||||
env_cfg = self.config.environmental
|
env_cfg = self.config.environmental
|
||||||
if env_cfg.enabled:
|
if env_cfg.enabled:
|
||||||
|
|
@ -554,48 +555,48 @@ class MeshAI:
|
||||||
if pid_file.exists():
|
if pid_file.exists():
|
||||||
pid_file.unlink()
|
pid_file.unlink()
|
||||||
|
|
||||||
async def _dispatch_alerts(self, alerts: list[dict]) -> None:
|
async def _dispatch_alerts(self, alerts: list[dict]) -> None:
|
||||||
"""Dispatch alerts to subscribers and alert channel."""
|
"""Dispatch alerts to subscribers and alert channel."""
|
||||||
mi = self.config.mesh_intelligence
|
mi = self.config.mesh_intelligence
|
||||||
alert_channel = getattr(mi, 'alert_channel', -1)
|
alert_channel = getattr(mi, 'alert_channel', -1)
|
||||||
|
|
||||||
for alert in alerts:
|
for alert in alerts:
|
||||||
message = alert["message"]
|
message = alert["message"]
|
||||||
logger.info(f"ALERT: {message}")
|
logger.info(f"ALERT: {message}")
|
||||||
|
|
||||||
# Route through notification router if enabled
|
# Route through notification router if enabled
|
||||||
if self.notification_router:
|
if self.notification_router:
|
||||||
try:
|
try:
|
||||||
await self.notification_router.process_alert(alert)
|
await self.notification_router.process_alert(alert)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Notification router error: {e}")
|
logger.error(f"Notification router error: {e}")
|
||||||
|
|
||||||
# Fallback: Send to alert channel if no notification router
|
# Fallback: Send to alert channel if no notification router
|
||||||
elif alert_channel >= 0 and self.connector:
|
elif alert_channel >= 0 and self.connector:
|
||||||
try:
|
try:
|
||||||
self.connector.send_message(
|
self.connector.send_message(
|
||||||
text=message,
|
text=message,
|
||||||
destination=None,
|
destination=None,
|
||||||
channel=alert_channel,
|
channel=alert_channel,
|
||||||
)
|
)
|
||||||
logger.info(f"Alert sent to channel {alert_channel}")
|
logger.info(f"Alert sent to channel {alert_channel}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to send channel alert: {e}")
|
logger.error(f"Failed to send channel alert: {e}")
|
||||||
|
|
||||||
# Fallback: Send DMs to matching subscribers
|
# Fallback: Send DMs to matching subscribers
|
||||||
if self.alert_engine and self.subscription_manager:
|
if self.alert_engine and self.subscription_manager:
|
||||||
subscribers = self.alert_engine.get_subscribers_for_alert(alert)
|
subscribers = self.alert_engine.get_subscribers_for_alert(alert)
|
||||||
for sub in subscribers:
|
for sub in subscribers:
|
||||||
user_id = sub["user_id"]
|
user_id = sub["user_id"]
|
||||||
try:
|
try:
|
||||||
await self._send_sub_dm(user_id, message)
|
await self._send_sub_dm(user_id, message)
|
||||||
logger.info(f"Alert DM sent to {user_id}: {alert['type']}")
|
logger.info(f"Alert DM sent to {user_id}: {alert['type']}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to send alert DM to {user_id}: {e}")
|
logger.error(f"Failed to send alert DM to {user_id}: {e}")
|
||||||
|
|
||||||
if self.alert_engine:
|
if self.alert_engine:
|
||||||
self.alert_engine.clear_pending()
|
self.alert_engine.clear_pending()
|
||||||
|
|
||||||
async def _check_scheduled_subs(self) -> None:
|
async def _check_scheduled_subs(self) -> None:
|
||||||
"""Check for and deliver due scheduled reports."""
|
"""Check for and deliver due scheduled reports."""
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
|
||||||
|
|
@ -230,16 +230,19 @@ class MeshDataStore:
|
||||||
self,
|
self,
|
||||||
source_configs: list[MeshSourceConfig],
|
source_configs: list[MeshSourceConfig],
|
||||||
db_path: str = "/data/mesh_history.db",
|
db_path: str = "/data/mesh_history.db",
|
||||||
|
offline_threshold_hours: int = 2,
|
||||||
):
|
):
|
||||||
"""Initialize the data store.
|
"""Initialize the data store.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
source_configs: List of source configurations
|
source_configs: List of source configurations
|
||||||
db_path: Path to SQLite database for historical data
|
db_path: Path to SQLite database for historical data
|
||||||
|
offline_threshold_hours: Hours before a node is considered offline
|
||||||
"""
|
"""
|
||||||
self._sources: dict[str, MeshviewSource | MeshMonitorDataSource | MQTTSource] = {}
|
self._sources: dict[str, MeshviewSource | MeshMonitorDataSource | MQTTSource] = {}
|
||||||
self._db_path = db_path
|
self._db_path = db_path
|
||||||
self._db: Optional[sqlite3.Connection] = None
|
self._db: Optional[sqlite3.Connection] = None
|
||||||
|
self._offline_threshold_hours = offline_threshold_hours
|
||||||
|
|
||||||
# Live state
|
# Live state
|
||||||
self._nodes: dict[int, UnifiedNode] = {}
|
self._nodes: dict[int, UnifiedNode] = {}
|
||||||
|
|
@ -745,11 +748,13 @@ class MeshDataStore:
|
||||||
|
|
||||||
node.last_heard = ts or 0.0
|
node.last_heard = ts or 0.0
|
||||||
|
|
||||||
# NOTE: is_online is set by MeshHealthEngine.compute() using the
|
# Compute is_online based on configured threshold
|
||||||
# configured offline_threshold_hours. Don't set it here with a
|
# This ensures correct status immediately, before health engine runs
|
||||||
# hardcoded value - let the health engine determine online status.
|
if node.last_heard:
|
||||||
# The health engine runs on every refresh cycle and will set is_online
|
offline_threshold = time.time() - (self._offline_threshold_hours * 3600)
|
||||||
# based on: (now - last_heard) < (offline_threshold_hours * 3600)
|
node.is_online = node.last_heard > offline_threshold
|
||||||
|
else:
|
||||||
|
node.is_online = False
|
||||||
|
|
||||||
# Hops, SNR, RSSI (MM)
|
# Hops, SNR, RSSI (MM)
|
||||||
node.hops_away = raw.get("hopsAway")
|
node.hops_away = raw.get("hopsAway")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue