feat(notifications): end-to-end verification system

- Channel connectivity test: SMTP, webhook, mesh with real errors
- Rule test shows live data from feeds, not canned examples
- Near-miss detection: shows events filtered by threshold
- Three send actions: current conditions, example alert, live alert
- Rule status indicators: last fired, data source health
- All errors show actual error messages
- Disabled feed detection with clear warnings

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
zvx-echo6 2026-05-13 18:40:18 -06:00
commit e35c0f5553
4 changed files with 3294 additions and 233 deletions

View file

@ -1,4 +1,4 @@
"""Notification channel implementations."""
"""Notification channel implementations with connectivity testing."""
import asyncio
import logging
@ -29,8 +29,25 @@ class NotificationChannel(ABC):
raise NotImplementedError
@abstractmethod
async def test(self) -> tuple[bool, str]:
"""Send test message. Returns (success, message)."""
async def test_connection(self) -> dict:
"""Test channel connectivity without sending actual content.
Returns:
{
"success": bool,
"message": str, # Human-readable result
"error": str, # Detailed error if failed
"details": {} # Channel-specific details
}
"""
raise NotImplementedError
async def deliver_test(self, message: str) -> tuple[bool, str]:
"""Deliver a specific test message through the channel.
Returns:
(success, result_message)
"""
raise NotImplementedError
@ -62,17 +79,74 @@ class MeshBroadcastChannel(NotificationChannel):
logger.error("Failed to broadcast alert: %s", e)
return False
async def test(self) -> tuple[bool, str]:
"""Send test broadcast."""
async def test_connection(self) -> dict:
"""Test mesh radio connectivity."""
if not self._connector:
return {
"success": False,
"message": "Not connected to radio",
"error": "MeshConnector not initialized. Check that meshtastic is connected.",
"details": {"channel": self._channel}
}
try:
# Check if interface is connected
interface = getattr(self._connector, '_interface', None)
if not interface:
return {
"success": False,
"message": "Radio interface not available",
"error": "Meshtastic interface not initialized",
"details": {"channel": self._channel}
}
# Get channel info
channels = getattr(interface, 'channels', [])
channel_name = "Unknown"
if self._channel < len(channels):
ch = channels[self._channel]
channel_name = getattr(ch, 'settings', {}).get('name', f'Channel {self._channel}')
if hasattr(ch, 'settings') and hasattr(ch.settings, 'name'):
channel_name = ch.settings.name or f'Channel {self._channel}'
# Send actual test message
self._connector.send_message(
text="[TEST] MeshAI notification system test",
text="MeshAI channel test - if you see this, delivery works",
destination=None,
channel=self._channel,
)
return True, "Test message sent to channel %d" % self._channel
return {
"success": True,
"message": f"Sent to channel {self._channel}: {channel_name}",
"error": "",
"details": {
"channel": self._channel,
"channel_name": channel_name,
}
}
except Exception as e:
return False, "Failed to send test: %s" % e
return {
"success": False,
"message": f"Failed to send to channel {self._channel}",
"error": str(e),
"details": {"channel": self._channel}
}
async def deliver_test(self, message: str) -> tuple[bool, str]:
"""Deliver a specific test message."""
if not self._connector:
return False, "Not connected to radio"
try:
self._connector.send_message(
text=message,
destination=None,
channel=self._channel,
)
return True, f"Sent to mesh channel {self._channel}"
except Exception as e:
return False, f"Mesh broadcast failed: {e}"
class MeshDMChannel(NotificationChannel):
@ -94,7 +168,7 @@ class MeshDMChannel(NotificationChannel):
for node_id in self._node_ids:
try:
dest = int(node_id) if node_id.isdigit() else node_id
dest = int(node_id, 16) if node_id.startswith("!") else (int(node_id) if node_id.isdigit() else node_id)
self._connector.send_message(text=message, destination=dest, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
@ -102,21 +176,91 @@ class MeshDMChannel(NotificationChannel):
return success
async def test(self) -> tuple[bool, str]:
"""Send test DM to all configured nodes."""
async def test_connection(self) -> dict:
"""Test DM delivery to configured nodes."""
if not self._connector:
return {
"success": False,
"message": "Not connected to radio",
"error": "MeshConnector not initialized",
"details": {"node_ids": self._node_ids}
}
if not self._node_ids:
return False, "No node IDs configured"
try:
for node_id in self._node_ids:
dest = int(node_id) if node_id.isdigit() else node_id
return {
"success": False,
"message": "No recipient nodes configured",
"error": "Add at least one node ID to receive DMs",
"details": {"node_ids": []}
}
results = []
all_success = True
for node_id in self._node_ids:
try:
dest = int(node_id, 16) if node_id.startswith("!") else (int(node_id) if node_id.isdigit() else node_id)
self._connector.send_message(
text="[TEST] MeshAI notification test",
text="MeshAI DM test",
destination=dest,
channel=0,
)
return True, "Test DMs sent to %d nodes" % len(self._node_ids)
except Exception as e:
return False, "Failed to send test DMs: %s" % e
results.append({"node": node_id, "success": True})
except Exception as e:
results.append({"node": node_id, "success": False, "error": str(e)})
all_success = False
success_nodes = [r["node"] for r in results if r["success"]]
failed_nodes = [r for r in results if not r["success"]]
if all_success:
node_names = ", ".join(self._node_ids)
return {
"success": True,
"message": f"Sent DM to {node_names}",
"error": "",
"details": {"results": results}
}
elif success_nodes:
return {
"success": False,
"message": f"Partial: sent to {len(success_nodes)}, failed {len(failed_nodes)}",
"error": "; ".join([f"{r['node']}: {r.get('error', 'unknown')}" for r in failed_nodes]),
"details": {"results": results}
}
else:
return {
"success": False,
"message": "All DMs failed",
"error": "; ".join([f"{r['node']}: {r.get('error', 'unknown')}" for r in failed_nodes]),
"details": {"results": results}
}
async def deliver_test(self, message: str) -> tuple[bool, str]:
"""Deliver a specific test message via DM."""
if not self._connector:
return False, "Not connected to radio"
if not self._node_ids:
return False, "No recipient nodes configured"
success_count = 0
errors = []
for node_id in self._node_ids:
try:
dest = int(node_id, 16) if node_id.startswith("!") else (int(node_id) if node_id.isdigit() else node_id)
self._connector.send_message(text=message, destination=dest, channel=0)
success_count += 1
except Exception as e:
errors.append(f"{node_id}: {e}")
if success_count == len(self._node_ids):
return True, f"Sent DM to {success_count} node(s)"
elif success_count > 0:
return True, f"Sent to {success_count}/{len(self._node_ids)} nodes. Errors: {'; '.join(errors)}"
else:
return False, f"All DMs failed: {'; '.join(errors)}"
class EmailChannel(NotificationChannel):
@ -172,29 +316,193 @@ class EmailChannel(NotificationChannel):
if self._tls:
context = ssl.create_default_context()
with smtplib.SMTP(self._host, self._port) as server:
with smtplib.SMTP(self._host, self._port, timeout=15) as server:
server.starttls(context=context)
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
else:
with smtplib.SMTP(self._host, self._port) as server:
with smtplib.SMTP(self._host, self._port, timeout=15) as server:
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
async def test(self) -> tuple[bool, str]:
async def test_connection(self) -> dict:
"""Test SMTP connectivity and authentication."""
if not self._host:
return {
"success": False,
"message": "SMTP host not configured",
"error": "Set smtp_host in email configuration",
"details": {}
}
if not self._recipients:
return {
"success": False,
"message": "No recipients configured",
"error": "Add at least one email recipient",
"details": {}
}
if not self._from:
return {
"success": False,
"message": "From address not configured",
"error": "Set from_address in email configuration",
"details": {}
}
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self._test_smtp_connection)
return result
except Exception as e:
return {
"success": False,
"message": "SMTP test failed",
"error": str(e),
"details": {"host": self._host, "port": self._port}
}
def _test_smtp_connection(self) -> dict:
"""Actually test SMTP connection (blocking)."""
try:
if self._tls:
context = ssl.create_default_context()
with smtplib.SMTP(self._host, self._port, timeout=15) as server:
server.starttls(context=context)
if self._user and self._password:
server.login(self._user, self._password)
# Send actual test email
msg = MIMEMultipart()
msg["From"] = self._from
msg["To"] = ", ".join(self._recipients)
msg["Subject"] = "[MeshAI] Channel connectivity test"
msg.attach(MIMEText(
"This is a test message from MeshAI to verify email delivery is working.\n\n"
f"Sent: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"SMTP: {self._host}:{self._port} (TLS)\n\n"
"If you received this, email delivery is working correctly.",
"plain"
))
server.sendmail(self._from, self._recipients, msg.as_string())
else:
with smtplib.SMTP(self._host, self._port, timeout=15) as server:
if self._user and self._password:
server.login(self._user, self._password)
msg = MIMEMultipart()
msg["From"] = self._from
msg["To"] = ", ".join(self._recipients)
msg["Subject"] = "[MeshAI] Channel connectivity test"
msg.attach(MIMEText(
"This is a test message from MeshAI to verify email delivery is working.\n\n"
f"Sent: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"SMTP: {self._host}:{self._port}\n\n"
"If you received this, email delivery is working correctly.",
"plain"
))
server.sendmail(self._from, self._recipients, msg.as_string())
recipient_str = self._recipients[0]
if len(self._recipients) > 1:
recipient_str += f" +{len(self._recipients) - 1} more"
return {
"success": True,
"message": f"Email sent to {recipient_str} via {self._host}:{self._port}",
"error": "",
"details": {
"host": self._host,
"port": self._port,
"tls": self._tls,
"recipients": self._recipients,
}
}
except smtplib.SMTPAuthenticationError as e:
return {
"success": False,
"message": "SMTP authentication failed",
"error": f"Authentication failed with username '{self._user}'. Check username/password. For Gmail, use an App Password.",
"details": {"host": self._host, "port": self._port, "user": self._user}
}
except smtplib.SMTPConnectError as e:
return {
"success": False,
"message": "Connection refused",
"error": f"Could not connect to {self._host}:{self._port}. Check host and port.",
"details": {"host": self._host, "port": self._port}
}
except smtplib.SMTPServerDisconnected as e:
return {
"success": False,
"message": "Server disconnected",
"error": f"Server {self._host} disconnected unexpectedly. May need TLS.",
"details": {"host": self._host, "port": self._port, "tls": self._tls}
}
except ssl.SSLError as e:
return {
"success": False,
"message": "SSL/TLS error",
"error": f"SSL error connecting to {self._host}:{self._port}. Try toggling TLS setting.",
"details": {"host": self._host, "port": self._port, "tls": self._tls}
}
except TimeoutError:
return {
"success": False,
"message": "Connection timeout",
"error": f"Connection to {self._host}:{self._port} timed out. Check host/port and firewall.",
"details": {"host": self._host, "port": self._port}
}
except OSError as e:
if "Name or service not known" in str(e) or "getaddrinfo failed" in str(e):
return {
"success": False,
"message": "Host not found",
"error": f"Cannot resolve hostname '{self._host}'. Check the SMTP host.",
"details": {"host": self._host}
}
elif "Connection refused" in str(e):
return {
"success": False,
"message": "Connection refused",
"error": f"Connection refused at {self._host}:{self._port}. Check port number.",
"details": {"host": self._host, "port": self._port}
}
else:
return {
"success": False,
"message": "Network error",
"error": str(e),
"details": {"host": self._host, "port": self._port}
}
async def deliver_test(self, message: str) -> tuple[bool, str]:
"""Deliver a specific test message via email."""
if not self._recipients:
return False, "No recipients configured"
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._send_email,
"[MeshAI TEST] Notification Test",
"Test message from MeshAI.",
message,
)
return True, "Test email sent to %d recipients" % len(self._recipients)
recipient_str = self._recipients[0]
if len(self._recipients) > 1:
recipient_str += f" +{len(self._recipients) - 1}"
return True, f"Email sent to {recipient_str}"
except smtplib.SMTPAuthenticationError:
return False, f"SMTP auth failed for {self._user}"
except smtplib.SMTPConnectError:
return False, f"Cannot connect to {self._host}:{self._port}"
except Exception as e:
return False, "Failed to send test email: %s" % e
return False, f"Email failed: {e}"
class WebhookChannel(NotificationChannel):
@ -267,12 +575,175 @@ class WebhookChannel(NotificationChannel):
logger.error("Webhook failed: %s", e)
return False
async def test(self) -> tuple[bool, str]:
test_alert = {"type": "test", "severity": "info", "message": "MeshAI test message"}
success = await self.deliver(test_alert, {})
if success:
return True, "Test sent to %s" % self._url
return False, "Webhook failed"
async def test_connection(self) -> dict:
"""Test webhook connectivity."""
if not self._url:
return {
"success": False,
"message": "Webhook URL not configured",
"error": "Set webhook_url in configuration",
"details": {}
}
# Validate URL format
try:
from urllib.parse import urlparse
parsed = urlparse(self._url)
if not parsed.scheme or not parsed.netloc:
return {
"success": False,
"message": "Invalid URL format",
"error": f"URL must include scheme (https://) and host",
"details": {"url": self._url}
}
except Exception:
return {
"success": False,
"message": "Invalid URL",
"error": "Could not parse webhook URL",
"details": {"url": self._url}
}
# Build test payload based on webhook type
if "discord.com" in self._url:
payload = {
"embeds": [{
"title": "MeshAI: Channel Test",
"description": "This is a connectivity test from MeshAI. If you see this, webhook delivery is working.",
"color": 0x00FF00,
}]
}
elif "slack.com" in self._url:
payload = {
"text": "MeshAI: Channel connectivity test - webhook delivery is working"
}
elif "ntfy" in self._url:
# ntfy uses plain text body
try:
async with httpx.AsyncClient() as client:
headers = {
**self._headers,
"Title": "MeshAI Channel Test",
"Priority": "3",
}
resp = await client.post(
self._url,
content="Channel connectivity test - if you see this, webhook delivery works",
headers=headers,
timeout=10,
)
if resp.status_code < 400:
return {
"success": True,
"message": f"Webhook returned {resp.status_code} OK",
"error": "",
"details": {"url": self._url, "status": resp.status_code}
}
else:
return {
"success": False,
"message": f"Webhook returned {resp.status_code}",
"error": f"HTTP {resp.status_code}: {resp.text[:200]}",
"details": {"url": self._url, "status": resp.status_code}
}
except httpx.ConnectError as e:
return {
"success": False,
"message": "Connection failed",
"error": f"Cannot connect to {parsed.netloc}. Check URL.",
"details": {"url": self._url}
}
except httpx.TimeoutException:
return {
"success": False,
"message": "Connection timeout",
"error": f"Request to {parsed.netloc} timed out",
"details": {"url": self._url}
}
except Exception as e:
return {
"success": False,
"message": "Request failed",
"error": str(e),
"details": {"url": self._url}
}
else:
payload = {
"type": "test",
"severity": "info",
"message": "MeshAI channel connectivity test",
"timestamp": time.time(),
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
json=payload,
headers={"Content-Type": "application/json", **self._headers},
timeout=10,
)
if resp.status_code < 400:
return {
"success": True,
"message": f"Webhook returned {resp.status_code} OK",
"error": "",
"details": {"url": self._url, "status": resp.status_code}
}
else:
# Try to get error details from response
error_body = ""
try:
error_body = resp.text[:200]
except Exception:
pass
return {
"success": False,
"message": f"Webhook returned {resp.status_code}",
"error": f"HTTP {resp.status_code}{': ' + error_body if error_body else ''}",
"details": {"url": self._url, "status": resp.status_code}
}
except httpx.ConnectError as e:
return {
"success": False,
"message": "Connection failed",
"error": f"Cannot connect to {parsed.netloc}. Check URL and network.",
"details": {"url": self._url}
}
except httpx.TimeoutException:
return {
"success": False,
"message": "Connection timeout",
"error": f"Request to {parsed.netloc} timed out after 10s",
"details": {"url": self._url}
}
except Exception as e:
return {
"success": False,
"message": "Request failed",
"error": str(e),
"details": {"url": self._url}
}
async def deliver_test(self, message: str) -> tuple[bool, str]:
"""Deliver a specific test message via webhook."""
try:
test_alert = {"type": "test", "severity": "info", "message": message}
success = await self.deliver(test_alert, {})
if success:
try:
from urllib.parse import urlparse
host = urlparse(self._url).netloc
return True, f"Sent to {host}"
except Exception:
return True, "Webhook delivered"
else:
return False, "Webhook returned error"
except Exception as e:
return False, f"Webhook failed: {e}"
def create_channel(config: dict, connector=None) -> NotificationChannel:

View file

@ -1,6 +1,9 @@
"""Notification router - matches alerts to rules and delivers via channels."""
import asyncio
import json
import logging
import os
import time
from datetime import datetime
from typing import Optional, TYPE_CHECKING
@ -16,6 +19,9 @@ logger = logging.getLogger(__name__)
# Severity levels in order
SEVERITY_ORDER = ["info", "advisory", "watch", "warning", "critical", "emergency"]
# State file for rule statistics
RULE_STATS_FILE = "/opt/meshai/data/rule_stats.json"
class NotificationRouter:
"""Routes alerts through matching rules to notification channels."""
@ -27,117 +33,177 @@ class NotificationRouter:
llm_backend=None,
timezone: str = "America/Boise",
):
self._channels: dict[str, NotificationChannel] = {}
self._rules: list[dict] = []
self._quiet_enabled = getattr(config, "quiet_hours_enabled", True)
self._quiet_start = getattr(config, "quiet_hours_start", "22:00")
self._quiet_end = getattr(config, "quiet_hours_end", "06:00")
self._timezone = timezone
self._dedup_window = getattr(config, "dedup_seconds", 600)
self._recent: dict[tuple, float] = {} # (category, event_key) -> last_sent_time
self._recent: dict[tuple, float] = {} # (rule_name, category, event_key) -> last_sent_time
self._summarizer = MessageSummarizer(llm_backend) if llm_backend else None
self._connector = connector
self._config = config
# Create channel instances from config
channels_config = getattr(config, "channels", [])
for ch_config in channels_config:
if hasattr(ch_config, "__dict__"):
ch_dict = {k: v for k, v in ch_config.__dict__.items() if not k.startswith("_")}
else:
ch_dict = ch_config
# Rule statistics: {rule_name: {last_fired, last_test, fire_count}}
self._rule_stats = self._load_rule_stats()
if not ch_dict.get("enabled", True):
continue
channel_id = ch_dict.get("id", "")
if not channel_id:
continue
try:
channel = create_channel(ch_dict, connector)
self._channels[channel_id] = channel
logger.debug("Created notification channel: %s (%s)", channel_id, ch_dict.get("type"))
except Exception as e:
logger.warning("Failed to create channel %s: %s", channel_id, e)
# Load rules
# Load rules from config
rules_config = getattr(config, "rules", [])
for rule in rules_config:
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = rule
self._rules.append(rule_dict)
rule_dict = dict(rule) if isinstance(rule, dict) else {}
logger.info(
"Notification router initialized: %d channels, %d rules",
len(self._channels),
len(self._rules),
)
# Skip disabled rules
if not rule_dict.get("enabled", True):
continue
# Only load condition-triggered rules (scheduled rules handled by scheduler)
if rule_dict.get("trigger_type", "condition") == "condition":
self._rules.append(rule_dict)
logger.info("Notification router initialized: %d condition rules", len(self._rules))
def _load_rule_stats(self) -> dict:
"""Load rule statistics from persistent storage."""
try:
if os.path.exists(RULE_STATS_FILE):
with open(RULE_STATS_FILE, "r") as f:
return json.load(f)
except Exception as e:
logger.warning("Failed to load rule stats: %s", e)
return {}
def _save_rule_stats(self):
"""Save rule statistics to persistent storage."""
try:
os.makedirs(os.path.dirname(RULE_STATS_FILE), exist_ok=True)
with open(RULE_STATS_FILE, "w") as f:
json.dump(self._rule_stats, f, indent=2)
except Exception as e:
logger.warning("Failed to save rule stats: %s", e)
def _record_fire(self, rule_name: str):
"""Record that a rule fired."""
if rule_name not in self._rule_stats:
self._rule_stats[rule_name] = {"last_fired": None, "last_test": None, "fire_count": 0}
self._rule_stats[rule_name]["last_fired"] = time.time()
self._rule_stats[rule_name]["fire_count"] = self._rule_stats[rule_name].get("fire_count", 0) + 1
self._save_rule_stats()
def _record_test(self, rule_name: str):
"""Record that a rule was tested."""
if rule_name not in self._rule_stats:
self._rule_stats[rule_name] = {"last_fired": None, "last_test": None, "fire_count": 0}
self._rule_stats[rule_name]["last_test"] = time.time()
self._save_rule_stats()
def get_rule_stats(self, rule_name: str) -> dict:
"""Get statistics for a rule."""
return self._rule_stats.get(rule_name, {"last_fired": None, "last_test": None, "fire_count": 0})
def _create_channel_for_rule(self, rule: dict) -> Optional[NotificationChannel]:
"""Create a channel instance from a rule's inline delivery config."""
delivery_type = rule.get("delivery_type", "")
if not delivery_type:
return None
if delivery_type == "mesh_broadcast":
config = {
"type": "mesh_broadcast",
"channel_index": rule.get("broadcast_channel", 0),
}
elif delivery_type == "mesh_dm":
config = {
"type": "mesh_dm",
"node_ids": rule.get("node_ids", []),
}
elif delivery_type == "email":
config = {
"type": "email",
"smtp_host": rule.get("smtp_host", ""),
"smtp_port": rule.get("smtp_port", 587),
"smtp_user": rule.get("smtp_user", ""),
"smtp_password": rule.get("smtp_password", ""),
"smtp_tls": rule.get("smtp_tls", True),
"from_address": rule.get("from_address", ""),
"recipients": rule.get("recipients", []),
}
elif delivery_type == "webhook":
config = {
"type": "webhook",
"url": rule.get("webhook_url", ""),
"headers": rule.get("webhook_headers", {}),
}
else:
logger.warning("Unknown delivery type '%s' in rule '%s'", delivery_type, rule.get("name"))
return None
try:
return create_channel(config, self._connector)
except Exception as e:
logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e)
return None
async def process_alert(self, alert: dict) -> bool:
"""Route an alert through matching rules to channels.
Returns True if alert was delivered to at least one channel.
"""
"""Route an alert through matching rules."""
category = alert.get("type", "")
severity = alert.get("severity", "info")
delivered = False
for rule in self._rules:
# Check category match
rule_name = rule.get("name", "unnamed")
rule_categories = rule.get("categories", [])
if rule_categories and category not in rule_categories:
continue
# Check severity threshold
min_severity = rule.get("min_severity", "info")
if not self._severity_meets(severity, min_severity):
continue
# Check quiet hours (emergencies and criticals override)
if self._in_quiet_hours() and severity not in ("emergency", "critical"):
if not rule.get("override_quiet", False):
continue
if self._quiet_enabled and self._in_quiet_hours():
if severity not in ("emergency", "critical"):
if not rule.get("override_quiet", False):
continue
# Check dedup
cooldown = rule.get("cooldown_minutes", 10) * 60
event_id = alert.get("event_id", alert.get("message", "")[:50])
dedup_key = (category, event_id)
dedup_key = (rule_name, category, event_id)
now = time.time()
if dedup_key in self._recent:
if now - self._recent[dedup_key] < self._dedup_window:
logger.debug("Skipping duplicate alert: %s", category)
if now - self._recent[dedup_key] < cooldown:
continue
self._recent[dedup_key] = now
# Deliver to each channel in the rule
channel_ids = rule.get("channel_ids", [])
for channel_id in channel_ids:
channel = self._channels.get(channel_id)
if not channel:
continue
logger.info("Rule '%s' matched alert: %s (%s)", rule_name, category, severity)
try:
# Summarize for mesh channels if over 200 chars
delivery_alert = alert
message = alert.get("message", "")
if channel.channel_type in ("mesh_broadcast", "mesh_dm"):
if len(message) > 200:
if self._summarizer:
summary = await self._summarizer.summarize(message, max_chars=195)
delivery_alert = {**alert, "message": summary}
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
delivery_type = rule.get("delivery_type", "")
if not delivery_type:
continue
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
logger.info(
"Alert delivered via %s: %s",
channel_id,
category,
)
except Exception as e:
logger.warning("Channel %s delivery failed: %s", channel_id, e)
channel = self._create_channel_for_rule(rule)
if not channel:
continue
try:
delivery_alert = alert
message = alert.get("message", "")
if channel.channel_type in ("mesh_broadcast", "mesh_dm"):
if len(message) > 200:
if self._summarizer:
summary = await self._summarizer.summarize(message, max_chars=195)
delivery_alert = {**alert, "message": summary}
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
self._record_fire(rule_name)
except Exception as e:
logger.warning("Rule '%s' delivery failed: %s", rule_name, e)
return delivered
@ -148,87 +214,548 @@ class NotificationRouter:
required_idx = SEVERITY_ORDER.index(required.lower())
return actual_idx >= required_idx
except ValueError:
return True # Unknown severity, allow through
return True
def _in_quiet_hours(self) -> bool:
"""Check if current time is within quiet hours."""
if not self._quiet_enabled:
return False
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(self._timezone)
now = datetime.now(tz)
current_time = now.strftime("%H:%M")
start = self._quiet_start
end = self._quiet_end
if start <= end:
# Simple range (e.g., 01:00 to 06:00)
return start <= current_time <= end
else:
# Crosses midnight (e.g., 22:00 to 06:00)
return current_time >= start or current_time <= end
except Exception:
return False
def get_channels(self) -> list[dict]:
"""Get list of configured channels."""
return [
{"id": ch_id, "type": ch.channel_type}
for ch_id, ch in self._channels.items()
]
def get_rules(self) -> list[dict]:
"""Get list of configured rules."""
return self._rules
"""Get list of configured rules with stats."""
rules_with_stats = []
for rule in self._rules:
rule_copy = dict(rule)
stats = self.get_rule_stats(rule.get("name", ""))
rule_copy["_stats"] = stats
rules_with_stats.append(rule_copy)
return rules_with_stats
async def test_channel(self, channel_id: str) -> tuple[bool, str]:
"""Send a test alert to a specific channel."""
channel = self._channels.get(channel_id)
if not channel:
return False, "Channel not found: %s" % channel_id
return await channel.test()
async def test_channel(self, channel_config: dict) -> dict:
"""Test a channel's connectivity.
def add_mesh_subscription(
self,
node_id: str,
categories: list[str],
rule_name: Optional[str] = None,
) -> str:
"""Add a mesh DM subscription for a node.
Args:
channel_config: Channel configuration dict with type and settings
Creates a channel and rule for the node to receive alerts.
Returns the rule name.
Returns:
{success, message, error, details}
"""
# Create channel ID
channel_id = "mesh_dm_%s" % node_id
try:
channel = create_channel(channel_config, self._connector)
return await channel.test_connection()
except ValueError as e:
return {
"success": False,
"message": "Invalid channel configuration",
"error": str(e),
"details": {}
}
except Exception as e:
return {
"success": False,
"message": "Channel test failed",
"error": str(e),
"details": {}
}
# Create channel if it doesn't exist
if channel_id not in self._channels:
from .channels import MeshDMChannel
channel = MeshDMChannel(
connector=self._connector,
node_ids=[node_id],
)
self._channels[channel_id] = channel
def get_source_health(self, rule_categories: list, env_store=None) -> dict:
"""Get health status of data sources for a rule's categories.
# Create rule
Returns:
{
category_id: {
"enabled": bool,
"active_events": int,
"source": str,
"status": "ok" | "disabled" | "no_data"
}
}
"""
# Map categories to their data sources
category_sources = {
"hf_blackout": "swpc",
"geomagnetic_storm": "swpc",
"tropospheric_ducting": "ducting",
"weather_warning": "nws",
"fire_proximity": "nifc",
"wildfire_proximity": "nifc",
"new_ignition": "firms",
"stream_flood_warning": "usgs",
"stream_high_water": "usgs",
"road_closure": "roads511",
"traffic_congestion": "traffic",
"avalanche_warning": "avalanche",
"avalanche_considerable": "avalanche",
"infra_offline": "health",
"critical_node_down": "health",
"battery_warning": "health",
"battery_critical": "health",
"battery_emergency": "health",
"mesh_score_low": "health",
"high_utilization": "health",
"infra_recovery": "health",
"packet_flood": "health",
}
result = {}
for cat_id in rule_categories:
source = category_sources.get(cat_id, "unknown")
if source == "health":
# Mesh health is always available
result[cat_id] = {
"enabled": True,
"active_events": 0, # Would need health_engine to check
"source": "mesh_health",
"status": "ok"
}
elif env_store is None:
result[cat_id] = {
"enabled": False,
"active_events": 0,
"source": source,
"status": "disabled"
}
else:
# Check if source has an adapter
adapters = getattr(env_store, '_adapters', {})
if source in adapters:
events = env_store.get_active(source=source)
result[cat_id] = {
"enabled": True,
"active_events": len(events) if events else 0,
"source": source,
"status": "ok"
}
else:
result[cat_id] = {
"enabled": False,
"active_events": 0,
"source": source,
"status": "disabled"
}
return result
async def test_rule_with_conditions(
self,
rule_index: int,
alert_engine=None,
env_store=None,
health_engine=None,
send: bool = False,
action: str = "preview",
) -> dict:
"""Test a rule against current conditions with live data.
Args:
rule_index: Index of the rule to test
alert_engine: AlertEngine instance for pending alerts
env_store: EnvStore instance for environmental events
health_engine: MeshHealthEngine for mesh status
send: Legacy param - use action instead
action: "preview", "send_test", "send_status", "send_live"
"""
from .categories import get_category
rules_config = getattr(self._config, "rules", [])
if rule_index < 0 or rule_index >= len(rules_config):
return {
"conditions_matched": 0,
"preview_messages": [],
"is_example": False,
"delivered": False,
"delivery_method": "",
"delivery_result": "Rule index out of range",
}
rule = rules_config[rule_index]
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule)
rule_name = rule_dict.get("name", f"Rule {rule_index}")
rule_categories = rule_dict.get("categories", [])
min_severity = rule_dict.get("min_severity", "info")
delivery_type = rule_dict.get("delivery_type", "")
# Legacy support
if send and action == "preview":
action = "send_test"
# ============================================================
# SECTION 1: Collect LIVE DATA for rule's categories
# ============================================================
live_data_lines = []
feeds_not_enabled = []
category_sources = {
"hf_blackout": "swpc", "geomagnetic_storm": "swpc",
"tropospheric_ducting": "ducting",
"weather_warning": "nws",
"fire_proximity": "nifc", "wildfire_proximity": "nifc", "new_ignition": "firms",
"stream_flood_warning": "usgs", "stream_high_water": "usgs",
"road_closure": "roads511", "traffic_congestion": "traffic",
"avalanche_warning": "avalanche", "avalanche_considerable": "avalanche",
"infra_offline": "health", "critical_node_down": "health",
"battery_warning": "health", "battery_critical": "health",
"mesh_score_low": "health", "high_utilization": "health",
}
sources_needed = set()
for cat in rule_categories if rule_categories else []:
if cat in category_sources:
sources_needed.add(category_sources[cat])
# Check which sources are available
if env_store:
adapters = getattr(env_store, '_adapters', {})
if "swpc" in sources_needed:
if "swpc" in adapters and hasattr(env_store, 'get_swpc_status'):
swpc = env_store.get_swpc_status()
if swpc:
kp = swpc.get("kp_current", "?")
sfi = swpc.get("sfi", "?")
r = swpc.get("r_scale", 0)
s = swpc.get("s_scale", 0)
g = swpc.get("g_scale", 0)
live_data_lines.append(f"RF: SFI {sfi}, Kp {kp}, R{r}/S{s}/G{g}")
else:
feeds_not_enabled.append("SWPC")
if "ducting" in sources_needed:
if "ducting" in adapters and hasattr(env_store, 'get_ducting_status'):
ducting = env_store.get_ducting_status()
if ducting:
condition = ducting.get("condition", "unknown")
gradient = ducting.get("min_gradient", "?")
live_data_lines.append(f"Tropo: {condition}, dM/dz {gradient}")
else:
feeds_not_enabled.append("Ducting")
if "nws" in sources_needed:
if "nws" in adapters:
nws = env_store.get_active(source="nws")
if nws:
live_data_lines.append(f"NWS: {len(nws)} active alert(s)")
for a in nws[:2]:
headline = a.get('headline', a.get('message', 'Alert'))[:60]
live_data_lines.append(f" - {headline}")
else:
live_data_lines.append("NWS: No active alerts")
else:
feeds_not_enabled.append("NWS")
if "nifc" in sources_needed:
if "nifc" in adapters:
fires = env_store.get_active(source="nifc")
if fires:
live_data_lines.append(f"Fires: {len(fires)} active")
else:
live_data_lines.append("Fires: None active")
else:
feeds_not_enabled.append("NIFC")
if "firms" in sources_needed:
if "firms" in adapters:
hotspots = env_store.get_active(source="firms")
if hotspots:
live_data_lines.append(f"Hotspots: {len(hotspots)} detected")
else:
live_data_lines.append("Hotspots: None detected")
else:
feeds_not_enabled.append("FIRMS")
if "usgs" in sources_needed:
if "usgs" in adapters:
streams = env_store.get_active(source="usgs")
if streams:
live_data_lines.append(f"Streams: {len(streams)} gauge(s) reporting")
else:
live_data_lines.append("Streams: No alerts")
else:
feeds_not_enabled.append("USGS")
if "traffic" in sources_needed:
if "traffic" in adapters:
traffic = env_store.get_active(source="traffic")
if traffic:
live_data_lines.append(f"Traffic: {len(traffic)} corridor(s)")
else:
live_data_lines.append("Traffic: Normal")
else:
feeds_not_enabled.append("Traffic")
if "roads511" in sources_needed:
if "roads511" in adapters:
roads = env_store.get_active(source="roads511")
if roads:
live_data_lines.append(f"Roads: {len(roads)} event(s)")
else:
live_data_lines.append("Roads: No closures")
else:
feeds_not_enabled.append("511 Roads")
elif sources_needed - {"health"}:
feeds_not_enabled.append("Environmental feeds")
if health_engine and "health" in sources_needed:
mesh_health = getattr(health_engine, 'mesh_health', None)
if mesh_health:
score = mesh_health.score
live_data_lines.append(f"Mesh: {score.composite:.0f}/100, {score.infra_online}/{score.infra_total} infra")
# Add warning if feeds not enabled
if feeds_not_enabled:
live_data_lines.append(f"[!] Not enabled: {', '.join(feeds_not_enabled)}")
# ============================================================
# SECTION 2: Check for MATCHING and NEAR-MISS events
# ============================================================
matching_alerts = []
below_threshold = []
all_events = []
if alert_engine and hasattr(alert_engine, "get_pending_alerts"):
try:
for alert in alert_engine.get_pending_alerts():
all_events.append({
"type": alert.get("type", ""),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"headline": alert.get("message", "")[:80],
})
except Exception:
pass
if env_store and hasattr(env_store, "get_active"):
try:
for event in env_store.get_active():
all_events.append({
"type": event.get("type", event.get("category", "")),
"severity": event.get("severity", "info"),
"message": event.get("message", event.get("headline", str(event))),
"headline": event.get("headline", event.get("message", "Event"))[:80],
})
except Exception:
pass
for event in all_events:
event_type = event["type"]
severity = event["severity"]
category_match = not rule_categories
if not category_match:
for cat in rule_categories:
if event_type.startswith(cat.rstrip("_")) or cat in event_type or event_type == cat:
category_match = True
break
if category_match:
if self._severity_meets(severity, min_severity):
matching_alerts.append(event)
else:
below_threshold.append(event)
# ============================================================
# SECTION 3: Build response
# ============================================================
preview_messages = []
is_example = False
below_threshold_summary = ""
below_threshold_events = []
suggestion = ""
if matching_alerts:
for alert in matching_alerts[:5]:
msg = alert.get("message", "")
if len(msg) > 200 and delivery_type in ("mesh_broadcast", "mesh_dm"):
msg = msg[:195] + "..."
preview_messages.append(msg)
else:
is_example = True
if below_threshold:
severity_counts = {}
for evt in below_threshold:
sev = evt["severity"]
severity_counts[sev] = severity_counts.get(sev, 0) + 1
parts = [f"{count} at '{sev}'" for sev, count in severity_counts.items()]
below_threshold_summary = f"{len(below_threshold)} event(s) filtered by severity: {', '.join(parts)}. Rule requires '{min_severity}' or higher."
suggestion = f"Lower severity threshold to '{list(severity_counts.keys())[0]}' to match these events"
below_threshold_events = [{"headline": e["headline"], "severity": e["severity"]} for e in below_threshold[:5]]
if rule_categories:
for cat_id in rule_categories[:3]:
cat_info = get_category(cat_id)
preview_messages.append(f"[EXAMPLE] {cat_info.get('example_message', f'Alert: {cat_id}')}")
else:
cat_info = get_category("infra_offline")
preview_messages.append(f"[EXAMPLE] {cat_info.get('example_message', 'Alert notification')}")
# Get source health
source_health = self.get_source_health(rule_categories, env_store)
# ============================================================
# SECTION 4: Handle delivery actions
# ============================================================
delivered = False
delivery_result = "Preview only"
delivery_error = ""
if action != "preview":
if not delivery_type:
delivery_result = "No delivery method configured"
delivery_error = "Configure a delivery method to send test messages"
else:
channel = self._create_channel_for_rule(rule_dict)
if channel:
try:
if action == "send_status" and live_data_lines:
# Filter out the warning line for status message
data_lines = [l for l in live_data_lines if not l.startswith("[!]")]
status_msg = "[STATUS] " + " | ".join(data_lines[:4])
if len(status_msg) > 200:
status_msg = status_msg[:195] + "..."
success, result = await channel.deliver_test(status_msg)
delivered = success
delivery_result = result if success else f"Failed: {result}"
if not success:
delivery_error = result
elif action == "send_live" and matching_alerts:
live_msg = f"[LIVE TEST] {matching_alerts[0].get('message', '')}"
if len(live_msg) > 200:
live_msg = live_msg[:195] + "..."
success, result = await channel.deliver_test(live_msg)
delivered = success
delivery_result = result if success else f"Failed: {result}"
if not success:
delivery_error = result
elif action == "send_test":
if preview_messages:
test_msg = preview_messages[0]
if test_msg.startswith("[EXAMPLE]"):
test_msg = test_msg.replace("[EXAMPLE]", "[TEST]")
elif not test_msg.startswith("["):
test_msg = f"[TEST] {test_msg}"
else:
test_msg = "[TEST] MeshAI notification test"
success, result = await channel.deliver_test(test_msg)
delivered = success
delivery_result = result if success else f"Failed: {result}"
if not success:
delivery_error = result
# Record test
if action != "preview":
self._record_test(rule_name)
except Exception as e:
delivery_result = f"Delivery error"
delivery_error = str(e)
# Get rule stats
stats = self.get_rule_stats(rule_name)
return {
"live_data_summary": live_data_lines,
"conditions_matched": len(matching_alerts),
"preview_messages": preview_messages,
"is_example": is_example,
"conditions_below_threshold": len(below_threshold),
"below_threshold_summary": below_threshold_summary,
"below_threshold_events": below_threshold_events,
"suggestion": suggestion,
"delivered": delivered,
"delivery_method": delivery_type,
"delivery_result": delivery_result,
"delivery_error": delivery_error,
"can_send_live": len(matching_alerts) > 0,
"source_health": source_health,
"rule_stats": stats,
}
async def test_rule(self, rule_index: int) -> tuple[bool, str]:
"""Send a test alert through a specific rule (legacy method)."""
result = await self.test_rule_with_conditions(rule_index, action="send_test")
return result.get("delivered", False), result.get("delivery_result", "Unknown")
async def preview_rule(self, rule_index: int) -> dict:
"""Preview what a rule would match right now."""
rules_config = getattr(self._config, "rules", [])
if rule_index < 0 or rule_index >= len(rules_config):
return {"matches": False, "conditions": [], "preview": "Invalid rule index"}
rule = rules_config[rule_index]
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule)
if rule_dict.get("trigger_type", "condition") == "condition":
from .categories import get_category
categories = rule_dict.get("categories", [])
if not categories:
example = get_category("infra_offline")
return {
"matches": True,
"conditions": ["All alert categories"],
"preview": example.get("example_message", "Alert notification"),
}
else:
cat_info = get_category(categories[0])
return {
"matches": True,
"conditions": [get_category(c)["name"] for c in categories],
"preview": cat_info.get("example_message", f"Alert: {categories[0]}"),
}
elif rule_dict.get("trigger_type") == "schedule":
message_type = rule_dict.get("message_type", "mesh_health_summary")
return {
"matches": True,
"conditions": [f"Scheduled: {rule_dict.get('schedule_frequency', 'daily')}"],
"preview": f"[{message_type}] Report content would appear here",
}
return {"matches": False, "conditions": [], "preview": "Unknown rule type"}
def add_mesh_subscription(self, node_id: str, categories: list[str], rule_name: Optional[str] = None) -> str:
"""Add a mesh DM subscription for a node."""
if not rule_name:
rule_name = "sub_%s" % node_id
# Check if rule already exists
for rule in self._rules:
if rule.get("name") == rule_name:
# Update existing rule
rule["categories"] = categories if categories else []
rule["channel_ids"] = [channel_id]
rule["node_ids"] = [node_id]
return rule_name
# Add new rule
self._rules.append({
"name": rule_name,
"categories": categories if categories else [], # Empty = all
"enabled": True,
"trigger_type": "condition",
"categories": categories if categories else [],
"min_severity": "warning",
"channel_ids": [channel_id],
"delivery_type": "mesh_dm",
"node_ids": [node_id],
"cooldown_minutes": 10,
"override_quiet": False,
})
@ -236,16 +763,8 @@ class NotificationRouter:
def remove_mesh_subscription(self, node_id: str) -> bool:
"""Remove a mesh subscription for a node."""
channel_id = "mesh_dm_%s" % node_id
rule_name = "sub_%s" % node_id
# Remove channel
if channel_id in self._channels:
del self._channels[channel_id]
# Remove rule
self._rules = [r for r in self._rules if r.get("name") != rule_name]
return True
def get_node_subscriptions(self, node_id: str) -> list[str]:
@ -260,7 +779,4 @@ class NotificationRouter:
def cleanup_recent(self, max_age: int = 3600):
"""Clean up old entries from recent alerts cache."""
now = time.time()
self._recent = {
k: v for k, v in self._recent.items()
if now - v < max_age
}
self._recent = {k: v for k, v in self._recent.items() if now - v < max_age}