refactor(notifications): self-contained rules, remove abstract channels

- Each notification rule contains its own delivery config inline
- No more separate channels with abstract IDs to cross-reference
- Delivery type selector (Mesh Broadcast/DM/Email/Webhook) with
  inline config fields per type
- Follows MeshMonitor trigger-action UX pattern
- Channel picker from radio for mesh broadcast
- Node picker for mesh DM
- Collapsed rule cards show readable one-line summary
- Trigger type: condition (alerts) or schedule (daily reports)
- Schedule triggers support daily, weekly, custom cron
- Message types: mesh health, RF propagation, alerts digest, custom
- Migrates old channels+rules config to new flat format on load

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-13 07:31:59 +00:00
commit b4f7e24c26
9 changed files with 1248 additions and 1095 deletions

View file

@ -1,5 +1,6 @@
"""Notification router - matches alerts to rules and delivers via channels."""
import asyncio
import logging
import time
from datetime import datetime
@ -27,55 +28,76 @@ class NotificationRouter:
llm_backend=None,
timezone: str = "America/Boise",
):
self._channels: dict[str, NotificationChannel] = {}
self._rules: list[dict] = []
self._quiet_start = getattr(config, "quiet_hours_start", "22:00")
self._quiet_end = getattr(config, "quiet_hours_end", "06:00")
self._timezone = timezone
self._dedup_window = getattr(config, "dedup_seconds", 600)
self._recent: dict[tuple, float] = {} # (category, event_key) -> last_sent_time
self._summarizer = MessageSummarizer(llm_backend) if llm_backend else None
self._connector = connector
self._config = config
# Create channel instances from config
channels_config = getattr(config, "channels", [])
for ch_config in channels_config:
if hasattr(ch_config, "__dict__"):
ch_dict = {k: v for k, v in ch_config.__dict__.items() if not k.startswith("_")}
else:
ch_dict = ch_config
if not ch_dict.get("enabled", True):
continue
channel_id = ch_dict.get("id", "")
if not channel_id:
continue
try:
channel = create_channel(ch_dict, connector)
self._channels[channel_id] = channel
logger.debug("Created notification channel: %s (%s)", channel_id, ch_dict.get("type"))
except Exception as e:
logger.warning("Failed to create channel %s: %s", channel_id, e)
# Load rules
# Load rules from config
rules_config = getattr(config, "rules", [])
for rule in rules_config:
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = rule
self._rules.append(rule_dict)
rule_dict = dict(rule) if isinstance(rule, dict) else {}
logger.info(
"Notification router initialized: %d channels, %d rules",
len(self._channels),
len(self._rules),
)
# Skip disabled rules
if not rule_dict.get("enabled", True):
continue
# Only load condition-triggered rules (scheduled rules handled by scheduler)
if rule_dict.get("trigger_type", "condition") == "condition":
self._rules.append(rule_dict)
logger.info("Notification router initialized: %d condition rules", len(self._rules))
def _create_channel_for_rule(self, rule: dict) -> Optional[NotificationChannel]:
"""Create a channel instance from a rule's inline delivery config."""
delivery_type = rule.get("delivery_type", "")
if delivery_type == "mesh_broadcast":
config = {
"type": "mesh_broadcast",
"channel_index": rule.get("broadcast_channel", 0),
}
elif delivery_type == "mesh_dm":
config = {
"type": "mesh_dm",
"node_ids": rule.get("node_ids", []),
}
elif delivery_type == "email":
config = {
"type": "email",
"smtp_host": rule.get("smtp_host", ""),
"smtp_port": rule.get("smtp_port", 587),
"smtp_user": rule.get("smtp_user", ""),
"smtp_password": rule.get("smtp_password", ""),
"smtp_tls": rule.get("smtp_tls", True),
"from_address": rule.get("from_address", ""),
"recipients": rule.get("recipients", []),
}
elif delivery_type == "webhook":
config = {
"type": "webhook",
"url": rule.get("webhook_url", ""),
"headers": rule.get("webhook_headers", {}),
}
else:
logger.warning("Unknown delivery type: %s", delivery_type)
return None
try:
return create_channel(config, self._connector)
except Exception as e:
logger.warning("Failed to create channel for rule %s: %s", rule.get("name"), e)
return None
async def process_alert(self, alert: dict) -> bool:
"""Route an alert through matching rules to channels.
"""Route an alert through matching rules.
Returns True if alert was delivered to at least one channel.
"""
@ -99,45 +121,41 @@ class NotificationRouter:
if not rule.get("override_quiet", False):
continue
# Check dedup
# Check cooldown
cooldown = rule.get("cooldown_minutes", 10) * 60
event_id = alert.get("event_id", alert.get("message", "")[:50])
dedup_key = (category, event_id)
rule_name = rule.get("name", "unknown")
dedup_key = (rule_name, category, event_id)
now = time.time()
if dedup_key in self._recent:
if now - self._recent[dedup_key] < self._dedup_window:
logger.debug("Skipping duplicate alert: %s", category)
if now - self._recent[dedup_key] < cooldown:
logger.debug("Skipping alert (cooldown): %s via %s", category, rule_name)
continue
self._recent[dedup_key] = now
# Deliver to each channel in the rule
channel_ids = rule.get("channel_ids", [])
for channel_id in channel_ids:
channel = self._channels.get(channel_id)
if not channel:
continue
# Create channel and deliver
channel = self._create_channel_for_rule(rule)
if not channel:
continue
try:
# Summarize for mesh channels if over 200 chars
delivery_alert = alert
message = alert.get("message", "")
if channel.channel_type in ("mesh_broadcast", "mesh_dm"):
if len(message) > 200:
if self._summarizer:
summary = await self._summarizer.summarize(message, max_chars=195)
delivery_alert = {**alert, "message": summary}
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
try:
# Summarize for mesh channels if over 200 chars
delivery_alert = alert
message = alert.get("message", "")
if channel.channel_type in ("mesh_broadcast", "mesh_dm"):
if len(message) > 200:
if self._summarizer:
summary = await self._summarizer.summarize(message, max_chars=195)
delivery_alert = {**alert, "message": summary}
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
logger.info(
"Alert delivered via %s: %s",
channel_id,
category,
)
except Exception as e:
logger.warning("Channel %s delivery failed: %s", channel_id, e)
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
logger.info("Alert delivered via %s: %s", rule_name, category)
except Exception as e:
logger.warning("Rule %s delivery failed: %s", rule_name, e)
return delivered
@ -170,22 +188,26 @@ class NotificationRouter:
except Exception:
return False
def get_channels(self) -> list[dict]:
"""Get list of configured channels."""
return [
{"id": ch_id, "type": ch.channel_type}
for ch_id, ch in self._channels.items()
]
def get_rules(self) -> list[dict]:
"""Get list of configured rules."""
return self._rules
async def test_channel(self, channel_id: str) -> tuple[bool, str]:
"""Send a test alert to a specific channel."""
channel = self._channels.get(channel_id)
async def test_rule(self, rule_index: int) -> tuple[bool, str]:
"""Send a test alert through a specific rule."""
rules_config = getattr(self._config, "rules", [])
if rule_index < 0 or rule_index >= len(rules_config):
return False, "Rule index out of range"
rule = rules_config[rule_index]
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule)
channel = self._create_channel_for_rule(rule_dict)
if not channel:
return False, "Channel not found: %s" % channel_id
return False, "Failed to create delivery channel"
return await channel.test()
def add_mesh_subscription(
@ -196,22 +218,9 @@ class NotificationRouter:
) -> str:
"""Add a mesh DM subscription for a node.
Creates a channel and rule for the node to receive alerts.
Creates a rule for the node to receive alerts.
Returns the rule name.
"""
# Create channel ID
channel_id = "mesh_dm_%s" % node_id
# Create channel if it doesn't exist
if channel_id not in self._channels:
from .channels import MeshDMChannel
channel = MeshDMChannel(
connector=self._connector,
node_ids=[node_id],
)
self._channels[channel_id] = channel
# Create rule
if not rule_name:
rule_name = "sub_%s" % node_id
@ -220,15 +229,19 @@ class NotificationRouter:
if rule.get("name") == rule_name:
# Update existing rule
rule["categories"] = categories if categories else []
rule["channel_ids"] = [channel_id]
rule["node_ids"] = [node_id]
return rule_name
# Add new rule
self._rules.append({
"name": rule_name,
"enabled": True,
"trigger_type": "condition",
"categories": categories if categories else [], # Empty = all
"min_severity": "warning",
"channel_ids": [channel_id],
"delivery_type": "mesh_dm",
"node_ids": [node_id],
"cooldown_minutes": 10,
"override_quiet": False,
})
@ -236,16 +249,8 @@ class NotificationRouter:
def remove_mesh_subscription(self, node_id: str) -> bool:
"""Remove a mesh subscription for a node."""
channel_id = "mesh_dm_%s" % node_id
rule_name = "sub_%s" % node_id
# Remove channel
if channel_id in self._channels:
del self._channels[channel_id]
# Remove rule
self._rules = [r for r in self._rules if r.get("name") != rule_name]
return True
def get_node_subscriptions(self, node_id: str) -> list[str]: