feat: Full alert engine — 17 conditions, scaling cooldown, per-condition TUI toggles

Alert conditions across all 5 pillars:
  Infrastructure: offline, recovery, new router
  Power: battery 50/25/10%, 7-day trend, USB→battery, solar not charging
  Utilization: sustained >20% for 6h, packet flood >500/24h
  Coverage: infra single gateway, feeder offline, region blackout
  Scores: mesh <70, region <60

Scaling cooldown: immediate → 12h → 24h → 48h → stop
Recovery notifications when conditions resolve
Per-condition on/off toggles in TUI
Battery trend queries SQLite node_snapshots for 7-day history

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-06 05:39:11 +00:00
commit 736d6a313a
4 changed files with 712 additions and 194 deletions

View file

@ -1,16 +1,62 @@
"""Alert engine detects mesh state changes and dispatches alerts."""
"""Alert engine - detects mesh state changes and dispatches alerts."""
import logging
import time
from datetime import datetime
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from .config import AlertRulesConfig, MeshIntelligenceConfig
from .mesh_health import MeshHealthEngine
from .mesh_reporter import MeshReporter
from .subscriptions import SubscriptionManager
logger = logging.getLogger(__name__)
# Scaling cooldown schedule (seconds after first alert)
# Alert 1: immediate, Alert 2: +12h, Alert 3: +24h more, Alert 4: +48h more, then stop
ESCALATION_SCHEDULE = [0, 12 * 3600, 24 * 3600, 48 * 3600]
class AlertState:
"""Tracks escalation state for a single condition."""
def __init__(self):
self.first_fired: float = 0
self.alert_count: int = 0
self.last_fired: float = 0
self.resolved: bool = False
def should_fire(self, now: float) -> bool:
"""Check if this condition should fire based on scaling cooldown."""
if self.resolved:
return False
if self.alert_count == 0:
return True
if self.alert_count >= len(ESCALATION_SCHEDULE):
return False
elapsed_since_last = now - self.last_fired
required_wait = ESCALATION_SCHEDULE[self.alert_count]
return elapsed_since_last >= required_wait
def fire(self, now: float):
"""Record that an alert was fired."""
if self.alert_count == 0:
self.first_fired = now
self.last_fired = now
self.alert_count += 1
def resolve(self):
"""Mark condition as resolved."""
self.resolved = True
def reset(self):
"""Full reset for new occurrence."""
self.first_fired = 0
self.alert_count = 0
self.last_fired = 0
self.resolved = False
class AlertEngine:
"""Detects mesh state changes and dispatches alerts."""
@ -20,139 +66,498 @@ class AlertEngine:
health_engine: "MeshHealthEngine",
reporter: "MeshReporter",
subscription_manager: "SubscriptionManager",
critical_nodes: list[str] = None,
alert_cooldown_minutes: int = 30,
config: "MeshIntelligenceConfig",
db_path: str = "",
):
self._health = health_engine
self._reporter = reporter
self._subs = subscription_manager
self._critical_nodes = set(n.upper() for n in (critical_nodes or []))
self._cooldown_seconds = alert_cooldown_minutes * 60
self._rules = config.alert_rules
self._critical_nodes = set(n.upper() for n in (config.critical_nodes or []))
self._db_path = db_path
# Previous state snapshot for change detection
self._prev_infra_online: dict[int, bool] = {} # node_num -> was_online
self._prev_battery: dict[int, float] = {} # node_num -> battery_percent
# Cooldown tracker: condition_key -> last_alert_time
self._cooldowns: dict[str, float] = {}
# Queued alerts for delivery
self._states: dict[str, AlertState] = {}
self._prev_infra_online: dict[int, bool] = {}
self._prev_battery: dict[int, float] = {}
self._prev_power_source: dict[int, str] = {}
self._prev_gateways: dict[int, float] = {}
self._prev_mesh_score: Optional[float] = None
self._prev_region_scores: dict[str, float] = {}
self._prev_feeder_gateways: set[str] = set()
self._known_routers: set[int] = set()
self._util_exceeded_since: dict[int, float] = {}
self._first_run = True
self._pending_alerts: list[dict] = []
def check(self) -> list[dict]:
"""Compare current health to previous state. Returns list of alert dicts.
def _get_state(self, key: str) -> AlertState:
if key not in self._states:
self._states[key] = AlertState()
return self._states[key]
Each alert dict: {
"type": "infra_offline" | "infra_recovery" | "battery_critical" | "critical_node_down",
"node_name": str,
"node_short": str,
"node_num": int,
"region": str,
"message": str,
"scope_type": "mesh" | "region" | "node",
"scope_value": str,
"is_critical": bool,
}
"""
def check(self) -> list[dict]:
"""Run all alert checks. Returns list of alert dicts."""
health = self._health.mesh_health
if not health:
return []
now = time.time()
alerts = []
alerts.extend(self._check_infrastructure(health, now))
alerts.extend(self._check_power(health, now))
alerts.extend(self._check_utilization(health, now))
alerts.extend(self._check_coverage(health, now))
alerts.extend(self._check_health_scores(health, now))
self._first_run = False
self._pending_alerts = alerts
return alerts
def _check_infrastructure(self, health, now: float) -> list[dict]:
alerts = []
for node in health.nodes.values():
if not node.is_infrastructure:
continue
node_num = node.node_num
name = node.long_name or node.short_name or str(node_num)
short = node.short_name or str(node_num)
short = (node.short_name or str(node_num)).upper()
region = node.region or "Unknown"
is_critical = short.upper() in self._critical_nodes
is_critical = short in self._critical_nodes
region_display = self._get_region_display(region)
# --- Infrastructure offline detection ---
was_online = self._prev_infra_online.get(node_num)
is_online = node.is_online
if was_online is not None: # Skip first run (no previous state)
if was_online and not is_online:
# Node went OFFLINE
if not self._first_run and was_online is not None:
if was_online and not is_online and self._rules.infra_offline:
key = f"offline_{node_num}"
state = self._get_state(key)
state.resolved = False
if state.should_fire(now):
alert_type = "critical_node_down" if is_critical else "infra_offline"
cooldown_key = f"offline_{node_num}"
emoji = "\U0001F6A8" if is_critical else "\u274C"
escalation = f" (alert {state.alert_count + 1}/4)" if state.alert_count > 0 else ""
alerts.append(self._make_alert(
alert_type, name, short, node_num, region,
f"{emoji} {name} went offline in {region_display}.{escalation}",
is_critical,
))
state.fire(now)
if self._check_cooldown(cooldown_key, now):
emoji = "\U0001F6A8" if is_critical else "\u274C" # 🚨 or ❌
elif not was_online and is_online and self._rules.infra_recovery:
key = f"offline_{node_num}"
state = self._get_state(key)
if state.alert_count > 0:
alerts.append(self._make_alert(
"infra_recovery", name, short, node_num, region,
f"\u2705 {name} is back online in {region_display}.",
is_critical,
))
state.resolve()
if self._rules.new_router and not self._first_run:
if node_num not in self._known_routers:
alerts.append(self._make_alert(
"new_router", name, short, node_num, region,
f"\U0001F4E1 New router appeared: {name} in {region_display}.",
False,
))
self._prev_infra_online[node_num] = is_online
self._known_routers.add(node_num)
return alerts
def _check_power(self, health, now: float) -> list[dict]:
alerts = []
for node in health.nodes.values():
if not node.is_infrastructure:
continue
if node.battery_percent is None:
continue
node_num = node.node_num
name = node.long_name or node.short_name or str(node_num)
short = (node.short_name or str(node_num)).upper()
region = node.region or "Unknown"
is_critical = short in self._critical_nodes
region_display = self._get_region_display(region)
bat = node.battery_percent
if self._rules.power_source_change and not self._first_run:
current_source = "usb" if bat > 100 else "battery"
prev_source = self._prev_power_source.get(node_num)
if prev_source == "usb" and current_source == "battery":
key = f"power_change_{node_num}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append(self._make_alert(
"power_source_change", name, short, node_num, region,
f"\u26A1 {name} switched from USB to battery in {region_display}. Possible power outage.",
is_critical,
))
state.fire(now)
elif prev_source == "battery" and current_source == "usb":
key = f"power_change_{node_num}"
state = self._get_state(key)
if state.alert_count > 0:
state.resolve()
self._prev_power_source[node_num] = current_source
if 0 < bat <= 100 and not self._first_run:
prev_bat = self._prev_battery.get(node_num)
if self._rules.battery_emergency and bat < self._rules.battery_emergency_threshold:
if prev_bat is None or prev_bat >= self._rules.battery_emergency_threshold:
key = f"bat_emergency_{node_num}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append(self._make_alert(
"battery_emergency", name, short, node_num, region,
f"\U0001F6A8 {name} battery EMERGENCY at {bat:.0f}% in {region_display}.",
is_critical,
))
state.fire(now)
elif self._rules.battery_critical and bat < self._rules.battery_critical_threshold:
if prev_bat is None or prev_bat >= self._rules.battery_critical_threshold:
key = f"bat_critical_{node_num}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append(self._make_alert(
"battery_critical", name, short, node_num, region,
f"\U0001F50B {name} battery critical at {bat:.0f}% in {region_display}.",
is_critical,
))
state.fire(now)
elif self._rules.battery_warning and bat < self._rules.battery_warning_threshold:
if prev_bat is None or prev_bat >= self._rules.battery_warning_threshold:
key = f"bat_warning_{node_num}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append(self._make_alert(
"battery_warning", name, short, node_num, region,
f"\U0001F50B {name} battery low at {bat:.0f}% in {region_display}.",
is_critical,
))
state.fire(now)
if prev_bat is not None and bat > prev_bat + 5:
for prefix in ["bat_emergency", "bat_critical", "bat_warning"]:
key = f"{prefix}_{node_num}"
state = self._get_state(key)
if state.alert_count > 0:
state.resolve()
if self._rules.battery_trend_declining and 0 < bat <= 100:
trend = self._get_battery_trend(node_num, days=7)
if trend and trend["direction"] == "declining" and trend["total_drop"] > 10:
key = f"bat_trend_{node_num}"
state = self._get_state(key)
if state.alert_count == 0 and state.should_fire(now):
alerts.append(self._make_alert(
"battery_trend", name, short, node_num, region,
f"\U0001F50B {name} battery declining: {trend['start']:.0f}% \u2192 {trend['end']:.0f}% over 7 days ({trend['rate']:.1f}%/day) in {region_display}.",
is_critical,
))
state.fire(now)
if self._rules.solar_not_charging and getattr(node, "has_solar", False) and 0 < bat <= 100:
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo("America/Boise")
hour = datetime.now(tz).hour
if 8 <= hour <= 18:
prev_bat = self._prev_battery.get(node_num)
if prev_bat is not None and bat < prev_bat - 2:
key = f"solar_{node_num}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append(self._make_alert(
"solar_not_charging", name, short, node_num, region,
f"\u2600\uFE0F {name} solar not charging in {region_display}.",
is_critical,
))
state.fire(now)
except Exception:
pass
self._prev_battery[node_num] = bat
return alerts
def _check_utilization(self, health, now: float) -> list[dict]:
alerts = []
for node in health.nodes.values():
node_num = node.node_num
name = node.long_name or node.short_name or str(node_num)
short = (node.short_name or str(node_num)).upper()
region = node.region or "Unknown"
region_display = self._get_region_display(region)
if self._rules.sustained_high_util and node.channel_utilization is not None:
threshold = self._rules.high_util_threshold
required_hours = self._rules.high_util_hours
if node.channel_utilization > threshold:
if node_num not in self._util_exceeded_since:
self._util_exceeded_since[node_num] = now
else:
duration_hours = (now - self._util_exceeded_since[node_num]) / 3600
if duration_hours >= required_hours:
key = f"util_{node_num}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append(self._make_alert(
"sustained_high_util", name, short, node_num, region,
f"\U0001F525 {name} at {node.channel_utilization:.0f}% util for {duration_hours:.0f}+ hours in {region_display}.",
False,
))
state.fire(now)
else:
if node_num in self._util_exceeded_since:
del self._util_exceeded_since[node_num]
key = f"util_{node_num}"
state = self._get_state(key)
if state.alert_count > 0:
state.resolve()
if self._rules.packet_flood and not self._first_run:
if getattr(node, "packets_sent_24h", 0) > self._rules.packet_flood_threshold:
key = f"flood_{node_num}"
state = self._get_state(key)
if state.alert_count == 0:
alerts.append(self._make_alert(
"packet_flood", name, short, node_num, region,
f"\U0001F4E1 {name} sent {node.packets_sent_24h} packets in 24h (threshold: {self._rules.packet_flood_threshold}) in {region_display}.",
False,
))
state.fire(now)
return alerts
def _check_coverage(self, health, now: float) -> list[dict]:
alerts = []
for node in health.nodes.values():
if not node.is_infrastructure:
continue
node_num = node.node_num
name = node.long_name or node.short_name or str(node_num)
short = (node.short_name or str(node_num)).upper()
region = node.region or "Unknown"
is_critical = short in self._critical_nodes
region_display = self._get_region_display(region)
if self._rules.infra_single_gateway and node.avg_gateways is not None and not self._first_run:
prev_gw = self._prev_gateways.get(node_num)
if prev_gw is not None and prev_gw > 1.0 and node.avg_gateways <= 1.0:
key = f"single_gw_{node_num}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append(self._make_alert(
"infra_single_gateway", name, short, node_num, region,
f"\u26A0\uFE0F {name} dropped to single gateway in {region_display}. At risk if gateway fails.",
is_critical,
))
state.fire(now)
elif prev_gw is not None and prev_gw <= 1.0 and node.avg_gateways > 1.0:
key = f"single_gw_{node_num}"
state = self._get_state(key)
if state.alert_count > 0:
state.resolve()
self._prev_gateways[node_num] = node.avg_gateways
if self._rules.feeder_offline and not self._first_run:
current_feeders = set()
for node in health.nodes.values():
for gw in getattr(node, "feeder_gateways", []):
gw_name = gw.get("gateway_name") or gw.get("gateway_id", "")
if gw_name:
current_feeders.add(gw_name)
if self._prev_feeder_gateways:
lost_feeders = self._prev_feeder_gateways - current_feeders
for feeder in lost_feeders:
key = f"feeder_{feeder}"
state = self._get_state(key)
if state.should_fire(now):
alerts.append({
"type": "feeder_offline",
"node_name": feeder,
"node_short": feeder,
"node_num": 0,
"region": "",
"message": f"\U0001F4E1 Feeder gateway {feeder} stopped responding.",
"scope_type": "mesh",
"scope_value": None,
"is_critical": False,
})
state.fire(now)
recovered_feeders = current_feeders - self._prev_feeder_gateways
for feeder in recovered_feeders:
key = f"feeder_{feeder}"
state = self._get_state(key)
if state.alert_count > 0:
state.resolve()
self._prev_feeder_gateways = current_feeders
if self._rules.region_total_blackout and not self._first_run:
for region in health.regions:
if not region.node_ids:
continue
infra_in_region = []
for nid_str in region.node_ids:
try:
nid = int(nid_str)
except (ValueError, TypeError):
continue
node = health.nodes.get(nid)
if node and node.is_infrastructure:
infra_in_region.append(node)
if infra_in_region and all(not n.is_online for n in infra_in_region):
key = f"blackout_{region.name}"
state = self._get_state(key)
if state.should_fire(now):
region_display = self._get_region_display(region.name)
alerts.append({
"type": "region_total_blackout",
"node_name": region.name,
"node_short": region.name,
"node_num": 0,
"region": region.name,
"message": f"\U0001F6A8 TOTAL BLACKOUT: All infrastructure in {region_display} is offline!",
"scope_type": "region",
"scope_value": region.name,
"is_critical": True,
})
state.fire(now)
return alerts
def _check_health_scores(self, health, now: float) -> list[dict]:
alerts = []
if self._first_run:
self._prev_mesh_score = health.score.composite
for region in health.regions:
self._prev_region_scores[region.name] = region.score.composite
return alerts
if self._rules.mesh_score_alert:
current = health.score.composite
threshold = self._rules.mesh_score_threshold
if current < threshold and (self._prev_mesh_score is None or self._prev_mesh_score >= threshold):
key = "mesh_score"
state = self._get_state(key)
if state.should_fire(now):
alerts.append({
"type": "mesh_score_low",
"node_name": "Mesh",
"node_short": "MESH",
"node_num": 0,
"region": "",
"message": f"\U0001F4C9 Mesh health dropped to {current:.0f}/100 (threshold: {threshold}).",
"scope_type": "mesh",
"scope_value": None,
"is_critical": False,
})
state.fire(now)
elif current >= threshold:
key = "mesh_score"
state = self._get_state(key)
if state.alert_count > 0:
state.resolve()
self._prev_mesh_score = current
if self._rules.region_score_alert:
threshold = self._rules.region_score_threshold
for region in health.regions:
current = region.score.composite
prev = self._prev_region_scores.get(region.name)
if current < threshold and (prev is None or prev >= threshold):
key = f"region_score_{region.name}"
state = self._get_state(key)
if state.should_fire(now):
region_display = self._get_region_display(region.name)
alerts.append({
"type": "region_score_low",
"node_name": region.name,
"node_short": region.name,
"node_num": 0,
"region": region.name,
"message": f"\U0001F4C9 {region_display} health dropped to {current:.0f}/100 (threshold: {threshold}).",
"scope_type": "region",
"scope_value": region.name,
"is_critical": False,
})
state.fire(now)
elif current >= threshold:
key = f"region_score_{region.name}"
state = self._get_state(key)
if state.alert_count > 0:
state.resolve()
self._prev_region_scores[region.name] = current
return alerts
def _get_battery_trend(self, node_num: int, days: int = 7) -> Optional[dict]:
"""Query SQLite for battery trend over N days."""
if not self._db_path:
return None
try:
import sqlite3
conn = sqlite3.connect(self._db_path)
cursor = conn.cursor()
cutoff = time.time() - (days * 86400)
rows = cursor.execute("""
SELECT battery_percent, timestamp
FROM node_snapshots
WHERE node_num = ? AND timestamp > ? AND battery_percent IS NOT NULL
AND battery_percent > 0 AND battery_percent <= 100
ORDER BY timestamp ASC
""", (node_num, cutoff)).fetchall()
conn.close()
if len(rows) < 10:
return None
start_bat = rows[0][0]
end_bat = rows[-1][0]
total_drop = start_bat - end_bat
duration_days = (rows[-1][1] - rows[0][1]) / 86400
if duration_days < 1:
return None
rate = total_drop / duration_days
return {
"start": start_bat,
"end": end_bat,
"total_drop": total_drop,
"duration_days": duration_days,
"rate": rate,
"direction": "declining" if rate > 1.0 else "stable" if abs(rate) < 1.0 else "charging",
}
except Exception as e:
logger.debug(f"Battery trend query error: {e}")
return None
def _make_alert(self, alert_type, name, short, node_num, region, message, is_critical):
return {
"type": alert_type,
"node_name": name,
"node_short": short,
"node_num": node_num,
"region": region,
"message": f"{emoji} {name} went offline in {region_display}.",
"scope_type": "region",
"scope_value": region,
"message": message,
"scope_type": "region" if region and region != "Unknown" else "mesh",
"scope_value": region if region and region != "Unknown" else None,
"is_critical": is_critical,
})
self._cooldowns[cooldown_key] = now
elif not was_online and is_online:
# Node came BACK ONLINE
cooldown_key = f"recovery_{node_num}"
if self._check_cooldown(cooldown_key, now):
region_display = self._get_region_display(region)
alerts.append({
"type": "infra_recovery",
"node_name": name,
"node_short": short,
"node_num": node_num,
"region": region,
"message": f"\u2705 {name} is back online in {region_display}.", # ✅
"scope_type": "region",
"scope_value": region,
"is_critical": is_critical,
})
self._cooldowns[cooldown_key] = now
# --- Battery critical detection (infra only) ---
if node.battery_percent is not None and 0 < node.battery_percent <= 100:
prev_bat = self._prev_battery.get(node_num)
current_bat = node.battery_percent
if current_bat < 10 and (prev_bat is None or prev_bat >= 10):
# Battery just dropped below 10%
cooldown_key = f"battery_{node_num}"
if self._check_cooldown(cooldown_key, now):
region_display = self._get_region_display(region)
alerts.append({
"type": "battery_critical",
"node_name": name,
"node_short": short,
"node_num": node_num,
"region": region,
"message": f"\U0001F50B {name} battery critical at {current_bat:.0f}% in {region_display}.", # 🔋
"scope_type": "region",
"scope_value": region,
"is_critical": is_critical,
})
self._cooldowns[cooldown_key] = now
self._prev_battery[node_num] = current_bat
# Update state snapshot
self._prev_infra_online[node_num] = is_online
self._pending_alerts = alerts
return alerts
}
def _get_region_display(self, region: str) -> str:
"""Get display name for region."""
if not self._reporter:
return region
try:
@ -163,28 +568,15 @@ class AlertEngine:
pass
return region
def _check_cooldown(self, key: str, now: float) -> bool:
"""Check if enough time has passed since last alert for this condition."""
last = self._cooldowns.get(key, 0)
return (now - last) >= self._cooldown_seconds
def get_pending_alerts(self) -> list[dict]:
"""Get alerts pending delivery."""
return self._pending_alerts
def clear_pending(self):
"""Clear pending alerts after delivery."""
self._pending_alerts = []
def get_subscribers_for_alert(self, alert: dict) -> list[dict]:
"""Find subscribers matching an alert's scope."""
if not self._subs:
return []
# Get all alert subscribers
# mesh-scope subscribers get everything
# region-scope subscribers get alerts for their region
# node-scope subscribers get alerts for their specific node
return self._subs.get_alert_subscribers(
scope_type=alert.get("scope_type"),
scope_value=alert.get("scope_value"),

View file

@ -1042,6 +1042,7 @@ class Configurator:
table.add_row("7", "Critical Nodes", ", ".join(crit_nodes) if crit_nodes else "[dim]none[/dim]")
table.add_row("8", "Alert Channel", f"Channel {alert_ch}" if alert_ch >= 0 else "[dim]disabled[/dim]")
table.add_row("9", "Alert Cooldown (min)", str(alert_cd))
table.add_row("10", "Alert Rules", "Configure conditions")
table.add_row("0", "Back", "")
console.print(table)
@ -1096,6 +1097,8 @@ class Configurator:
self.modified = True
except ValueError:
pass
elif choice == 10:
self._edit_alert_rules()
def _edit_critical_nodes(self) -> None:
"""Edit critical node list."""
@ -1139,6 +1142,86 @@ class Configurator:
except (ValueError, IndexError):
pass
def _edit_alert_rules(self) -> None:
"""Edit per-condition alert toggles."""
mi = self.config.mesh_intelligence
rules = mi.alert_rules
while True:
self._clear()
console.print("[bold]Alert Rules[/bold]")
console.print("[dim]Toggle individual alert conditions on/off.[/dim]")
console.print()
table = Table(box=box.ROUNDED)
table.add_column("#", style="cyan", width=3)
table.add_column("Category", style="white")
table.add_column("Condition", style="white")
table.add_column("Status", style="green")
# Infrastructure
table.add_row("1", "Infra", "Router offline", self._status_icon(rules.infra_offline))
table.add_row("2", "Infra", "Router recovery", self._status_icon(rules.infra_recovery))
table.add_row("3", "Infra", "New router appears", self._status_icon(rules.new_router))
# Power
table.add_row("4", "Power", f"Battery warning (<{rules.battery_warning_threshold}%)", self._status_icon(rules.battery_warning))
table.add_row("5", "Power", f"Battery critical (<{rules.battery_critical_threshold}%)", self._status_icon(rules.battery_critical))
table.add_row("6", "Power", f"Battery emergency (<{rules.battery_emergency_threshold}%)", self._status_icon(rules.battery_emergency))
table.add_row("7", "Power", "7-day declining trend", self._status_icon(rules.battery_trend_declining))
table.add_row("8", "Power", "USB to battery (power outage)", self._status_icon(rules.power_source_change))
table.add_row("9", "Power", "Solar not charging", self._status_icon(rules.solar_not_charging))
# Utilization
table.add_row("10", "Util", f"Sustained >{rules.high_util_threshold}% for {rules.high_util_hours}h", self._status_icon(rules.sustained_high_util))
table.add_row("11", "Util", f"Packet flood (>{rules.packet_flood_threshold}/24h)", self._status_icon(rules.packet_flood))
# Coverage
table.add_row("12", "Coverage", "Infra drops to 1 gateway", self._status_icon(rules.infra_single_gateway))
table.add_row("13", "Coverage", "Feeder gateway offline", self._status_icon(rules.feeder_offline))
table.add_row("14", "Coverage", "Region total blackout", self._status_icon(rules.region_total_blackout))
# Health scores
table.add_row("15", "Scores", f"Mesh score <{rules.mesh_score_threshold}", self._status_icon(rules.mesh_score_alert))
table.add_row("16", "Scores", f"Region score <{rules.region_score_threshold}", self._status_icon(rules.region_score_alert))
table.add_row("", "", "", "")
table.add_row("0", "", "Back", "")
console.print(table)
console.print()
choice = IntPrompt.ask("Toggle option (0 to go back)", default=0)
if choice == 0:
return
# Map choice to field toggle
toggles = {
1: "infra_offline",
2: "infra_recovery",
3: "new_router",
4: "battery_warning",
5: "battery_critical",
6: "battery_emergency",
7: "battery_trend_declining",
8: "power_source_change",
9: "solar_not_charging",
10: "sustained_high_util",
11: "packet_flood",
12: "infra_single_gateway",
13: "feeder_offline",
14: "region_total_blackout",
15: "mesh_score_alert",
16: "region_score_alert",
}
field = toggles.get(choice)
if field and hasattr(rules, field):
current = getattr(rules, field)
setattr(rules, field, not current)
self.modified = True
def _edit_regions(self) -> None:
"""Edit region anchor points."""
from ..config import RegionAnchor

View file

@ -187,6 +187,45 @@ class RegionAnchor:
cities: list[str] = field(default_factory=list) # e.g., ["Twin Falls", "Burley", "Jerome"]
@dataclass
class AlertRulesConfig:
"""Per-condition alert toggles and thresholds."""
# Infrastructure
infra_offline: bool = True
infra_recovery: bool = True
new_router: bool = True
# Power
battery_trend_declining: bool = True
battery_warning: bool = True
battery_critical: bool = True
battery_emergency: bool = True
battery_warning_threshold: int = 50
battery_critical_threshold: int = 25
battery_emergency_threshold: int = 10
power_source_change: bool = True
solar_not_charging: bool = True
# Utilization
sustained_high_util: bool = True
high_util_threshold: float = 20.0
high_util_hours: int = 6
packet_flood: bool = True
packet_flood_threshold: int = 500
# Coverage
infra_single_gateway: bool = True
feeder_offline: bool = True
region_total_blackout: bool = True
# Health Scores
mesh_score_alert: bool = True
mesh_score_threshold: int = 70
region_score_alert: bool = True
region_score_threshold: int = 60
@dataclass
class MeshIntelligenceConfig:
"""Mesh intelligence and health scoring settings."""
@ -202,6 +241,7 @@ class MeshIntelligenceConfig:
critical_nodes: list[str] = field(default_factory=list) # Short names of critical nodes (e.g., ["MHR", "HPR"])
alert_channel: int = -1 # Channel to broadcast alerts on. -1 = disabled, 0+ = channel index
alert_cooldown_minutes: int = 30 # Min minutes between repeated alerts for same condition
alert_rules: AlertRulesConfig = field(default_factory=AlertRulesConfig)
@dataclass
@ -272,6 +312,9 @@ def _dict_to_dataclass(cls, data: dict):
if isinstance(item, dict) else item
for item in value
]
# Handle AlertRulesConfig
elif key == "alert_rules" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(AlertRulesConfig, value)
else:
kwargs[key] = value

View file

@ -279,10 +279,10 @@ class MeshAI:
health_engine=self.health_engine,
reporter=self.mesh_reporter,
subscription_manager=self.subscription_manager,
critical_nodes=getattr(mi, 'critical_nodes', []),
alert_cooldown_minutes=getattr(mi, 'alert_cooldown_minutes', 30),
config=mi,
db_path="/data/mesh_history.db",
)
logger.info(f"Alert engine initialized (critical nodes: {getattr(mi, 'critical_nodes', [])})")
logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})")
# Knowledge base (optional - gracefully degrade if deps missing)
kb_cfg = self.config.knowledge