feat(notifications): alert routing with channels, rules, and delivery

- Notification pipeline: categories -> rules -> channels
- Channels: mesh broadcast, mesh DM, email (SMTP), webhook (generic)
- Per-rule severity threshold and category filtering
- Quiet hours with emergency override
- LLM summarization for mesh delivery over 200 chars only
- !subscribe shows available categories, easy mesh subscription
- Dashboard notification rules API endpoints
- Extensible channel system for future transports (Winlink, JS8Call)
- config.yaml notification section with examples

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-13 03:51:37 +00:00
commit 3bf5e3dfbc
12 changed files with 1215 additions and 105 deletions

View file

@ -216,6 +216,67 @@ environmental:
confidence_min: "nominal" # low, nominal, high confidence_min: "nominal" # low, nominal, high
proximity_km: 10.0 # km to match known fire perimeters proximity_km: 10.0 # km to match known fire perimeters
# === NOTIFICATION DELIVERY ===
# Route alerts to channels (mesh, email, webhook) based on rules.
# Categories match alert types from alert_engine.py.
# Severity levels: info, advisory, watch, warning, critical, emergency
#
notifications:
enabled: false
quiet_hours_start: "22:00" # Suppress non-emergency alerts during quiet hours
quiet_hours_end: "06:00"
dedup_seconds: 600 # Don't resend same alert within this window
# Notification channels
channels:
# Mesh broadcast - posts to mesh channel
- id: "mesh-channel"
type: mesh_broadcast
enabled: true
channel_index: 0 # Mesh channel to broadcast on
# Example: Email channel (uncomment to enable)
# - id: "email-admin"
# type: email
# smtp_host: "smtp.gmail.com"
# smtp_port: 587
# smtp_user: "you@gmail.com"
# smtp_password: "${SMTP_PASSWORD}" # Use env var
# smtp_tls: true
# from_address: "meshai@yourdomain.com"
# recipients: ["admin@yourdomain.com"]
# Example: Webhook (Discord, Slack, ntfy, Home Assistant)
# - id: "discord-alerts"
# type: webhook
# url: "https://discord.com/api/webhooks/..."
# - id: "ntfy-alerts"
# type: webhook
# url: "https://ntfy.sh/your-topic"
# Notification rules - match alerts to channels
rules:
# All emergencies -> mesh broadcast
- name: "emergencies"
categories: [] # Empty = all categories
min_severity: "emergency"
channel_ids: ["mesh-channel"]
override_quiet: true # Send even during quiet hours
# Example: Fire alerts at any severity
# - name: "fire-alerts"
# categories: ["wildfire_proximity", "new_ignition"]
# min_severity: "advisory"
# channel_ids: ["mesh-channel", "email-admin"]
# Example: Infrastructure alerts
# - name: "infra-alerts"
# categories: ["infra_offline", "critical_node_down", "battery_emergency"]
# min_severity: "warning"
# channel_ids: ["mesh-channel"]
# === WEB DASHBOARD === # === WEB DASHBOARD ===
dashboard: dashboard:
enabled: true enabled: true

View file

@ -162,6 +162,7 @@ def create_dispatcher(
health_engine=None, health_engine=None,
subscription_manager=None, subscription_manager=None,
env_store=None, env_store=None,
notification_router=None,
) -> CommandDispatcher: ) -> CommandDispatcher:
"""Create and populate command dispatcher with default commands. """Create and populate command dispatcher with default commands.
@ -224,24 +225,24 @@ def create_dispatcher(
dispatcher.register(alias_handler) dispatcher.register(alias_handler)
# Register subscription commands # Register subscription commands
sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store) sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store, notification_router)
dispatcher.register(sub_cmd) dispatcher.register(sub_cmd)
for alias in getattr(sub_cmd, 'aliases', []): for alias in getattr(sub_cmd, 'aliases', []):
alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store) alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store, notification_router)
alias_handler.name = alias alias_handler.name = alias
dispatcher.register(alias_handler) dispatcher.register(alias_handler)
unsub_cmd = UnsubCommand(subscription_manager) unsub_cmd = UnsubCommand(subscription_manager, notification_router)
dispatcher.register(unsub_cmd) dispatcher.register(unsub_cmd)
for alias in getattr(unsub_cmd, 'aliases', []): for alias in getattr(unsub_cmd, 'aliases', []):
alias_handler = UnsubCommand(subscription_manager) alias_handler = UnsubCommand(subscription_manager, notification_router)
alias_handler.name = alias alias_handler.name = alias
dispatcher.register(alias_handler) dispatcher.register(alias_handler)
mysubs_cmd = MySubsCommand(subscription_manager) mysubs_cmd = MySubsCommand(subscription_manager, notification_router)
dispatcher.register(mysubs_cmd) dispatcher.register(mysubs_cmd)
for alias in getattr(mysubs_cmd, 'aliases', []): for alias in getattr(mysubs_cmd, 'aliases', []):
alias_handler = MySubsCommand(subscription_manager) alias_handler = MySubsCommand(subscription_manager, notification_router)
alias_handler.name = alias alias_handler.name = alias
dispatcher.register(alias_handler) dispatcher.register(alias_handler)

View file

@ -8,6 +8,7 @@ if TYPE_CHECKING:
from ..mesh_data_store import MeshDataStore from ..mesh_data_store import MeshDataStore
from ..mesh_reporter import MeshReporter from ..mesh_reporter import MeshReporter
from ..subscriptions import SubscriptionManager from ..subscriptions import SubscriptionManager
from ..notifications.router import NotificationRouter
class SubCommand(CommandHandler): class SubCommand(CommandHandler):
@ -15,7 +16,7 @@ class SubCommand(CommandHandler):
name = "sub" name = "sub"
description = "Subscribe to reports or alerts" description = "Subscribe to reports or alerts"
usage = "!sub daily|weekly|alerts [time] [day] [scope]" usage = "!sub daily|weekly|alerts|<category> [time] [day] [scope]"
aliases = ["subscribe"] aliases = ["subscribe"]
def __init__( def __init__(
@ -23,23 +24,35 @@ class SubCommand(CommandHandler):
subscription_manager: "SubscriptionManager" = None, subscription_manager: "SubscriptionManager" = None,
mesh_reporter: "MeshReporter" = None, mesh_reporter: "MeshReporter" = None,
data_store: "MeshDataStore" = None, data_store: "MeshDataStore" = None,
notification_router: "NotificationRouter" = None,
): ):
self._sub_manager = subscription_manager self._sub_manager = subscription_manager
self._reporter = mesh_reporter self._reporter = mesh_reporter
self._data_store = data_store self._data_store = data_store
self._notification_router = notification_router
async def execute(self, args: str, context: CommandContext) -> str: async def execute(self, args: str, context: CommandContext) -> str:
"""Handle subscription command.""" """Handle subscription command."""
if not self._sub_manager:
return "Subscriptions not available."
parts = args.strip().split() parts = args.strip().split()
# No args - show available alert categories
if not parts: if not parts:
return self._usage_help() return self._show_categories()
sub_type = parts[0].lower() sub_type = parts[0].lower()
# Check if it's a category subscription
if self._notification_router:
from ..notifications.categories import ALERT_CATEGORIES
if sub_type in ALERT_CATEGORIES or sub_type == "all":
return self._handle_category_subscription(sub_type, context)
# Legacy subscription types
if sub_type not in ("daily", "weekly", "alerts"): if sub_type not in ("daily", "weekly", "alerts"):
return f"Invalid type '{sub_type}'. Use: daily, weekly, or alerts" return self._show_categories()
if not self._sub_manager:
return "Subscriptions not available."
try: try:
if sub_type == "daily": if sub_type == "daily":
@ -51,15 +64,55 @@ class SubCommand(CommandHandler):
except ValueError as e: except ValueError as e:
return f"Error: {e}" return f"Error: {e}"
def _show_categories(self) -> str:
"""Show available alert categories."""
try:
from ..notifications.categories import ALERT_CATEGORIES
except ImportError:
return self._usage_help()
lines = ["Available alert categories:"]
for cat_id, cat_info in ALERT_CATEGORIES.items():
lines.append(f" {cat_id} - {cat_info['description']}")
lines.append("")
lines.append("Usage:")
lines.append(" !sub <category> - subscribe to a category")
lines.append(" !sub all - subscribe to all alerts")
lines.append(" !sub alerts - legacy mesh-wide alerts")
return "\n".join(lines)
def _handle_category_subscription(self, category: str, context: CommandContext) -> str:
"""Handle category-based alert subscription."""
node_id = self._get_user_id(context)
if category == "all":
categories = [] # Empty = all categories
else:
categories = [category]
# Add subscription via notification router
rule_name = self._notification_router.add_mesh_subscription(
node_id=node_id,
categories=categories,
)
if category == "all":
return "Subscribed to all alert categories. Use !unsub to remove."
else:
from ..notifications.categories import get_category
cat_info = get_category(category)
return f"Subscribed to {cat_info['name']} alerts. Use !unsub {category} to remove."
def _usage_help(self) -> str: def _usage_help(self) -> str:
"""Return usage help.""" """Return usage help."""
return """Usage: return """Usage:
!sub daily 1830 - daily mesh report at 6:30 PM !sub daily 1830 - daily mesh report at 6:30 PM
!sub daily 1830 region SCID - daily region report !sub daily 1830 region SCID - daily region report
!sub daily 1830 node MHR - daily node report
!sub weekly 0800 sun - weekly digest Sunday 8 AM !sub weekly 0800 sun - weekly digest Sunday 8 AM
!sub alerts - mesh-wide alerts !sub alerts - mesh-wide alerts (legacy)
!sub alerts region SCID - alerts for a region""" !sub <category> - subscribe to alert category
!sub all - subscribe to all alerts"""
def _handle_daily(self, args: list, context: CommandContext) -> str: def _handle_daily(self, args: list, context: CommandContext) -> str:
"""Handle daily subscription.""" """Handle daily subscription."""
@ -68,11 +121,9 @@ class SubCommand(CommandHandler):
schedule_time = args[0] schedule_time = args[0]
scope_type, scope_value = self._parse_scope(args[1:]) scope_type, scope_value = self._parse_scope(args[1:])
# Validate scope
scope_value = self._validate_scope(scope_type, scope_value) scope_value = self._validate_scope(scope_type, scope_value)
result = self._sub_manager.add( self._sub_manager.add(
user_id=self._get_user_id(context), user_id=self._get_user_id(context),
sub_type="daily", sub_type="daily",
schedule_time=schedule_time, schedule_time=schedule_time,
@ -92,11 +143,9 @@ class SubCommand(CommandHandler):
schedule_time = args[0] schedule_time = args[0]
schedule_day = args[1].lower() schedule_day = args[1].lower()
scope_type, scope_value = self._parse_scope(args[2:]) scope_type, scope_value = self._parse_scope(args[2:])
# Validate scope
scope_value = self._validate_scope(scope_type, scope_value) scope_value = self._validate_scope(scope_type, scope_value)
result = self._sub_manager.add( self._sub_manager.add(
user_id=self._get_user_id(context), user_id=self._get_user_id(context),
sub_type="weekly", sub_type="weekly",
schedule_time=schedule_time, schedule_time=schedule_time,
@ -111,13 +160,11 @@ class SubCommand(CommandHandler):
return f"Subscribed: weekly {scope_desc}report at {time_fmt} {day_fmt}" return f"Subscribed: weekly {scope_desc}report at {time_fmt} {day_fmt}"
def _handle_alerts(self, args: list, context: CommandContext) -> str: def _handle_alerts(self, args: list, context: CommandContext) -> str:
"""Handle alerts subscription.""" """Handle alerts subscription (legacy)."""
scope_type, scope_value = self._parse_scope(args) scope_type, scope_value = self._parse_scope(args)
# Validate scope
scope_value = self._validate_scope(scope_type, scope_value) scope_value = self._validate_scope(scope_type, scope_value)
result = self._sub_manager.add( self._sub_manager.add(
user_id=self._get_user_id(context), user_id=self._get_user_id(context),
sub_type="alerts", sub_type="alerts",
scope_type=scope_type, scope_type=scope_type,
@ -128,15 +175,10 @@ class SubCommand(CommandHandler):
return f"Subscribed: alerts for {scope_desc.strip() or 'mesh'}" return f"Subscribed: alerts for {scope_desc.strip() or 'mesh'}"
def _parse_scope(self, args: list) -> tuple[str, str]: def _parse_scope(self, args: list) -> tuple[str, str]:
"""Parse scope from remaining args. """Parse scope from remaining args."""
Returns:
(scope_type, scope_value) tuple
"""
if not args: if not args:
return "mesh", None return "mesh", None
# Look for 'region' or 'node' keyword
scope_type = "mesh" scope_type = "mesh"
scope_value = None scope_value = None
@ -144,26 +186,17 @@ class SubCommand(CommandHandler):
arg_lower = arg.lower() arg_lower = arg.lower()
if arg_lower == "region": if arg_lower == "region":
scope_type = "region" scope_type = "region"
# Everything after 'region' is the region name
scope_value = " ".join(args[i + 1:]) if i + 1 < len(args) else None scope_value = " ".join(args[i + 1:]) if i + 1 < len(args) else None
break break
elif arg_lower == "node": elif arg_lower == "node":
scope_type = "node" scope_type = "node"
# Next arg is the node identifier
scope_value = args[i + 1] if i + 1 < len(args) else None scope_value = args[i + 1] if i + 1 < len(args) else None
break break
return scope_type, scope_value return scope_type, scope_value
def _validate_scope(self, scope_type: str, scope_value: str) -> str: def _validate_scope(self, scope_type: str, scope_value: str) -> str:
"""Validate and resolve scope value. """Validate and resolve scope value."""
Returns:
Resolved scope_value (e.g., full region name)
Raises:
ValueError: If scope not found
"""
if scope_type == "mesh": if scope_type == "mesh":
return None return None
@ -172,14 +205,9 @@ class SubCommand(CommandHandler):
if scope_type == "region" and self._reporter: if scope_type == "region" and self._reporter:
region = self._reporter._find_region(scope_value) region = self._reporter._find_region(scope_value)
if not region: if region:
# List available regions return region.name
health = self._reporter.health_engine.mesh_health return scope_value
if health:
available = [r.name for r in health.regions if r.node_ids]
return scope_value # Use as-is, will fail at delivery if invalid
raise ValueError(f"Region '{scope_value}' not found")
return region.name # Return canonical name
if scope_type == "node" and self._reporter: if scope_type == "node" and self._reporter:
node = self._reporter._find_node(scope_value) node = self._reporter._find_node(scope_value)
@ -191,7 +219,6 @@ class SubCommand(CommandHandler):
def _get_user_id(self, context: CommandContext) -> str: def _get_user_id(self, context: CommandContext) -> str:
"""Extract user ID from context.""" """Extract user ID from context."""
# sender_id is like "!abcd1234" - convert to node_num
sender_id = context.sender_id sender_id = context.sender_id
if sender_id.startswith("!"): if sender_id.startswith("!"):
return str(int(sender_id[1:], 16)) return str(int(sender_id[1:], 16))
@ -217,26 +244,40 @@ class UnsubCommand(CommandHandler):
name = "unsub" name = "unsub"
description = "Remove subscription(s)" description = "Remove subscription(s)"
usage = "!unsub daily|weekly|alerts|all" usage = "!unsub daily|weekly|alerts|<category>|all"
aliases = ["unsubscribe"] aliases = ["unsubscribe"]
def __init__(self, subscription_manager: "SubscriptionManager" = None): def __init__(
self,
subscription_manager: "SubscriptionManager" = None,
notification_router: "NotificationRouter" = None,
):
self._sub_manager = subscription_manager self._sub_manager = subscription_manager
self._notification_router = notification_router
async def execute(self, args: str, context: CommandContext) -> str: async def execute(self, args: str, context: CommandContext) -> str:
"""Handle unsubscribe command.""" """Handle unsubscribe command."""
if not self._sub_manager:
return "Subscriptions not available."
sub_type = args.strip().lower() if args else None sub_type = args.strip().lower() if args else None
if not sub_type: if not sub_type:
return "Usage: !unsub daily|weekly|alerts|all" return "Usage: !unsub daily|weekly|alerts|<category>|all"
if sub_type not in ("daily", "weekly", "alerts", "all"):
return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, or all"
user_id = self._get_user_id(context) user_id = self._get_user_id(context)
# Check if it's a category unsubscription
if self._notification_router:
from ..notifications.categories import ALERT_CATEGORIES
if sub_type in ALERT_CATEGORIES or sub_type == "all":
self._notification_router.remove_mesh_subscription(user_id)
return "Removed alert subscriptions"
# Legacy subscription types
if not self._sub_manager:
return "Subscriptions not available."
if sub_type not in ("daily", "weekly", "alerts", "all"):
return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, <category>, or all"
removed = self._sub_manager.remove(user_id, sub_type if sub_type != "all" else None) removed = self._sub_manager.remove(user_id, sub_type if sub_type != "all" else None)
if removed == 0: if removed == 0:
@ -260,26 +301,44 @@ class MySubsCommand(CommandHandler):
name = "mysubs" name = "mysubs"
description = "List your subscriptions" description = "List your subscriptions"
usage = "!mysubs" usage = "!mysubs"
aliases = ["subs"] aliases = ["subs", "subscriptions"]
def __init__(self, subscription_manager: "SubscriptionManager" = None): def __init__(
self,
subscription_manager: "SubscriptionManager" = None,
notification_router: "NotificationRouter" = None,
):
self._sub_manager = subscription_manager self._sub_manager = subscription_manager
self._notification_router = notification_router
async def execute(self, args: str, context: CommandContext) -> str: async def execute(self, args: str, context: CommandContext) -> str:
"""List user's subscriptions.""" """List user's subscriptions."""
if not self._sub_manager:
return "Subscriptions not available."
user_id = self._get_user_id(context) user_id = self._get_user_id(context)
subs = self._sub_manager.get_user_subs(user_id) lines = []
if not subs: # Check notification router subscriptions
if self._notification_router:
categories = self._notification_router.get_node_subscriptions(user_id)
if categories:
if categories == ["all"]:
lines.append("Alert subscriptions: all categories")
else:
lines.append(f"Alert subscriptions: {', '.join(categories)}")
# Check legacy subscriptions
if self._sub_manager:
subs = self._sub_manager.get_user_subs(user_id)
if subs:
if not lines:
lines.append("Your subscriptions:")
else:
lines.append("\nScheduled reports:")
for i, sub in enumerate(subs, 1):
lines.append(f" {i}. {self._format_sub(sub)}")
if not lines:
return "No active subscriptions. Use !sub to subscribe." return "No active subscriptions. Use !sub to subscribe."
lines = ["Your subscriptions:"]
for i, sub in enumerate(subs, 1):
lines.append(f" {i}. {self._format_sub(sub)}")
return "\n".join(lines) return "\n".join(lines)
def _format_sub(self, sub: dict) -> str: def _format_sub(self, sub: dict) -> str:
@ -301,7 +360,7 @@ class MySubsCommand(CommandHandler):
time_str = self._format_time(sub.get("schedule_time", "0000")) time_str = self._format_time(sub.get("schedule_time", "0000"))
day_str = (sub.get("schedule_day") or "").capitalize() day_str = (sub.get("schedule_day") or "").capitalize()
return f"Weekly {scope_desc}report at {time_str} {day_str}" return f"Weekly {scope_desc}report at {time_str} {day_str}"
else: # alerts else:
return f"Alerts for {scope_desc.strip() or 'mesh'}" return f"Alerts for {scope_desc.strip() or 'mesh'}"
def _format_time(self, hhmm: str) -> str: def _format_time(self, hhmm: str) -> str:

View file

@ -425,6 +425,47 @@ class EnvironmentalConfig:
@dataclass @dataclass
class NotificationChannelConfig:
"""Configuration for a notification channel."""
id: str = ""
type: str = ""
enabled: bool = True
channel_index: int = 0
node_ids: list = field(default_factory=list)
smtp_host: str = ""
smtp_port: int = 587
smtp_user: str = ""
smtp_password: str = ""
smtp_tls: bool = True
from_address: str = ""
recipients: list = field(default_factory=list)
url: str = ""
headers: dict = field(default_factory=dict)
@dataclass
class NotificationRuleConfig:
"""Configuration for a notification rule."""
name: str = ""
categories: list = field(default_factory=list)
min_severity: str = "warning"
channel_ids: list = field(default_factory=list)
override_quiet: bool = False
@dataclass
class NotificationsConfig:
"""Notification system settings."""
enabled: bool = False
quiet_hours_start: str = "22:00"
quiet_hours_end: str = "06:00"
dedup_seconds: int = 600
channels: list = field(default_factory=list)
rules: list = field(default_factory=list)
class DashboardConfig: class DashboardConfig:
"""Web dashboard settings.""" """Web dashboard settings."""
@ -462,6 +503,7 @@ class Config:
mesh_intelligence: MeshIntelligenceConfig = field(default_factory=MeshIntelligenceConfig) mesh_intelligence: MeshIntelligenceConfig = field(default_factory=MeshIntelligenceConfig)
environmental: EnvironmentalConfig = field(default_factory=EnvironmentalConfig) environmental: EnvironmentalConfig = field(default_factory=EnvironmentalConfig)
dashboard: DashboardConfig = field(default_factory=DashboardConfig) dashboard: DashboardConfig = field(default_factory=DashboardConfig)
notifications: NotificationsConfig = field(default_factory=NotificationsConfig)
_config_path: Optional[Path] = field(default=None, repr=False) _config_path: Optional[Path] = field(default=None, repr=False)
@ -535,6 +577,13 @@ def _dict_to_dataclass(cls, data: dict):
kwargs[key] = _dict_to_dataclass(Roads511Config, value) kwargs[key] = _dict_to_dataclass(Roads511Config, value)
elif key == "firms" and isinstance(value, dict): elif key == "firms" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(FIRMSConfig, value) kwargs[key] = _dict_to_dataclass(FIRMSConfig, value)
elif key == "notifications" and isinstance(value, dict):
notifications = _dict_to_dataclass(NotificationsConfig, value)
if "channels" in value and isinstance(value["channels"], list):
notifications.channels = [_dict_to_dataclass(NotificationChannelConfig, c) if isinstance(c, dict) else c for c in value["channels"]]
if "rules" in value and isinstance(value["rules"], list):
notifications.rules = [_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r for r in value["rules"]]
kwargs[key] = notifications
else: else:
kwargs[key] = value kwargs[key] = value

View file

@ -0,0 +1,113 @@
"""Notification API routes."""
from fastapi import APIRouter, Request, HTTPException
from pydantic import BaseModel
from typing import Optional
router = APIRouter(prefix="/notifications", tags=["notifications"])
class ChannelCreate(BaseModel):
"""Channel creation request."""
id: str
type: str
enabled: bool = True
channel_index: int = 0
node_ids: list[str] = []
smtp_host: str = ""
smtp_port: int = 587
smtp_user: str = ""
smtp_password: str = ""
smtp_tls: bool = True
from_address: str = ""
recipients: list[str] = []
url: str = ""
headers: dict = {}
class RuleCreate(BaseModel):
"""Rule creation request."""
name: str
categories: list[str] = []
min_severity: str = "warning"
channel_ids: list[str] = []
override_quiet: bool = False
class QuietHoursUpdate(BaseModel):
"""Quiet hours update request."""
start: str
end: str
@router.get("/categories")
async def get_categories():
"""Get all alert categories with descriptions."""
try:
from ...notifications.categories import list_categories
return list_categories()
except ImportError:
return []
@router.get("/channels")
async def get_channels(request: Request):
"""Get configured notification channels."""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
return []
return notification_router.get_channels()
@router.post("/channels")
async def create_channel(request: Request, channel: ChannelCreate):
"""Create a new notification channel."""
# This would require runtime config modification
# For now, return not implemented
raise HTTPException(status_code=501, detail="Channel creation requires config file edit")
@router.post("/channels/{channel_id}/test")
async def test_channel(request: Request, channel_id: str):
"""Send a test alert to a channel."""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
raise HTTPException(status_code=404, detail="Notification router not configured")
success, message = await notification_router.test_channel(channel_id)
return {"success": success, "message": message}
@router.get("/rules")
async def get_rules(request: Request):
"""Get configured notification rules."""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
return []
return notification_router.get_rules()
@router.post("/rules")
async def create_rule(request: Request, rule: RuleCreate):
"""Create a new notification rule."""
# This would require runtime config modification
raise HTTPException(status_code=501, detail="Rule creation requires config file edit")
@router.get("/quiet-hours")
async def get_quiet_hours(request: Request):
"""Get quiet hours configuration."""
config = getattr(request.app.state, "config", None)
if not config or not hasattr(config, "notifications"):
return {"start": "22:00", "end": "06:00"}
return {
"start": config.notifications.quiet_hours_start,
"end": config.notifications.quiet_hours_end,
}
@router.put("/quiet-hours")
async def update_quiet_hours(request: Request, quiet_hours: QuietHoursUpdate):
"""Update quiet hours configuration."""
# This would require runtime config modification
raise HTTPException(status_code=501, detail="Quiet hours update requires config file edit")

View file

@ -52,6 +52,7 @@ def create_app() -> FastAPI:
from .api.mesh_routes import router as mesh_router from .api.mesh_routes import router as mesh_router
from .api.env_routes import router as env_router from .api.env_routes import router as env_router
from .api.alert_routes import router as alert_router from .api.alert_routes import router as alert_router
from .api.notification_routes import router as notification_router
app.include_router(system_router, prefix="/api") app.include_router(system_router, prefix="/api")
app.include_router(config_router, prefix="/api") app.include_router(config_router, prefix="/api")
@ -59,6 +60,7 @@ def create_app() -> FastAPI:
app.include_router(env_router, prefix="/api") app.include_router(env_router, prefix="/api")
app.include_router(alert_router, prefix="/api") app.include_router(alert_router, prefix="/api")
app.include_router(notification_router, prefix="/api")
# WebSocket router (no prefix, path is /ws/live) # WebSocket router (no prefix, path is /ws/live)
app.include_router(ws_router) app.include_router(ws_router)
@ -110,6 +112,7 @@ async def start_dashboard(meshai_instance: "MeshAI") -> DashboardBroadcaster:
app.state.alert_engine = getattr(meshai_instance, "alert_engine", None) app.state.alert_engine = getattr(meshai_instance, "alert_engine", None)
app.state.env_store = getattr(meshai_instance, "env_store", None) app.state.env_store = getattr(meshai_instance, "env_store", None)
app.state.subscription_manager = meshai_instance.subscription_manager app.state.subscription_manager = meshai_instance.subscription_manager
app.state.notification_router = getattr(meshai_instance, "notification_router", None)
# Create broadcaster and attach to app state # Create broadcaster and attach to app state
broadcaster = DashboardBroadcaster() broadcaster = DashboardBroadcaster()

View file

@ -44,6 +44,7 @@ class MeshAI:
self.mesh_reporter = None self.mesh_reporter = None
self.subscription_manager = None self.subscription_manager = None
self.alert_engine = None self.alert_engine = None
self.notification_router = None
self.env_store = None # Environmental feeds store self.env_store = None # Environmental feeds store
self._last_sub_check: float = 0.0 self._last_sub_check: float = 0.0
self.router: Optional[MessageRouter] = None self.router: Optional[MessageRouter] = None
@ -337,6 +338,18 @@ class MeshAI:
) )
logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})") logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})")
# Notification router
if self.config.notifications.enabled:
from .notifications.router import NotificationRouter
self.notification_router = NotificationRouter(
config=self.config.notifications,
connector=self.connector,
llm_backend=self.llm,
timezone=self.config.timezone,
)
logger.info("Notification router initialized")
# Environmental feeds # Environmental feeds
env_cfg = self.config.environmental env_cfg = self.config.environmental
if env_cfg.enabled: if env_cfg.enabled:
@ -394,6 +407,7 @@ class MeshAI:
health_engine=self.health_engine, health_engine=self.health_engine,
subscription_manager=self.subscription_manager, subscription_manager=self.subscription_manager,
env_store=self.env_store, env_store=self.env_store,
notification_router=self.notification_router,
) )
# Message router # Message router
@ -406,6 +420,7 @@ class MeshAI:
health_engine=self.health_engine, health_engine=self.health_engine,
mesh_reporter=self.mesh_reporter, mesh_reporter=self.mesh_reporter,
env_store=self.env_store, env_store=self.env_store,
notification_router=self.notification_router,
) )
# Responder # Responder
@ -539,40 +554,48 @@ class MeshAI:
if pid_file.exists(): if pid_file.exists():
pid_file.unlink() pid_file.unlink()
async def _dispatch_alerts(self, alerts: list[dict]) -> None: async def _dispatch_alerts(self, alerts: list[dict]) -> None:
"""Dispatch alerts to subscribers and alert channel.""" """Dispatch alerts to subscribers and alert channel."""
mi = self.config.mesh_intelligence mi = self.config.mesh_intelligence
alert_channel = getattr(mi, 'alert_channel', -1) alert_channel = getattr(mi, 'alert_channel', -1)
for alert in alerts: for alert in alerts:
message = alert["message"] message = alert["message"]
logger.info(f"ALERT: {message}") logger.info(f"ALERT: {message}")
# Send to alert channel if configured # Route through notification router if enabled
if alert_channel >= 0 and self.connector: if self.notification_router:
try: try:
self.connector.send_message( await self.notification_router.process_alert(alert)
text=message, except Exception as e:
destination=None, # Broadcast logger.error(f"Notification router error: {e}")
channel=alert_channel,
) # Fallback: Send to alert channel if no notification router
logger.info(f"Alert sent to channel {alert_channel}") elif alert_channel >= 0 and self.connector:
except Exception as e: try:
logger.error(f"Failed to send channel alert: {e}") self.connector.send_message(
text=message,
# Send DMs to matching subscribers destination=None,
if self.alert_engine and self.subscription_manager: channel=alert_channel,
subscribers = self.alert_engine.get_subscribers_for_alert(alert) )
for sub in subscribers: logger.info(f"Alert sent to channel {alert_channel}")
user_id = sub["user_id"] except Exception as e:
try: logger.error(f"Failed to send channel alert: {e}")
await self._send_sub_dm(user_id, message)
logger.info(f"Alert DM sent to {user_id}: {alert['type']}") # Fallback: Send DMs to matching subscribers
except Exception as e: if self.alert_engine and self.subscription_manager:
logger.error(f"Failed to send alert DM to {user_id}: {e}") subscribers = self.alert_engine.get_subscribers_for_alert(alert)
for sub in subscribers:
self.alert_engine.clear_pending() user_id = sub["user_id"]
try:
await self._send_sub_dm(user_id, message)
logger.info(f"Alert DM sent to {user_id}: {alert['type']}")
except Exception as e:
logger.error(f"Failed to send alert DM to {user_id}: {e}")
if self.alert_engine:
self.alert_engine.clear_pending()
async def _check_scheduled_subs(self) -> None: async def _check_scheduled_subs(self) -> None:
"""Check for and deliver due scheduled reports.""" """Check for and deliver due scheduled reports."""
from datetime import datetime from datetime import datetime

View file

@ -0,0 +1,6 @@
"""Notification system for MeshAI alerts."""
from .categories import ALERT_CATEGORIES, get_category, list_categories
from .router import NotificationRouter
__all__ = ["ALERT_CATEGORIES", "get_category", "list_categories", "NotificationRouter"]

View file

@ -0,0 +1,157 @@
"""Alert category registry.
Defines all alertable conditions with human-readable names and descriptions.
"""
ALERT_CATEGORIES = {
# Infrastructure alerts
"infra_offline": {
"name": "Infrastructure Offline",
"description": "An infrastructure node stopped responding",
"default_severity": "warning",
},
"critical_node_down": {
"name": "Critical Node Down",
"description": "A node marked as critical went offline",
"default_severity": "critical",
},
"infra_recovery": {
"name": "Infrastructure Recovery",
"description": "An infrastructure node came back online",
"default_severity": "info",
},
"new_router": {
"name": "New Router",
"description": "A new router appeared on the mesh",
"default_severity": "info",
},
# Power alerts
"battery_warning": {
"name": "Battery Warning",
"description": "Infrastructure node battery below warning threshold",
"default_severity": "warning",
},
"battery_critical": {
"name": "Battery Critical",
"description": "Infrastructure node battery below critical threshold",
"default_severity": "critical",
},
"battery_emergency": {
"name": "Battery Emergency",
"description": "Infrastructure node battery critically low",
"default_severity": "emergency",
},
"battery_trend": {
"name": "Battery Declining",
"description": "Battery showing declining trend over 7 days",
"default_severity": "warning",
},
"power_source_change": {
"name": "Power Source Change",
"description": "Node switched from USB to battery (possible outage)",
"default_severity": "warning",
},
"solar_not_charging": {
"name": "Solar Not Charging",
"description": "Solar panel not charging during daylight hours",
"default_severity": "warning",
},
# Utilization alerts
"sustained_high_util": {
"name": "High Utilization",
"description": "Channel utilization elevated for extended period",
"default_severity": "warning",
},
"packet_flood": {
"name": "Packet Flood",
"description": "Node sending excessive packets",
"default_severity": "warning",
},
# Coverage alerts
"infra_single_gateway": {
"name": "Single Gateway",
"description": "Infrastructure node dropped to single gateway coverage",
"default_severity": "warning",
},
"feeder_offline": {
"name": "Feeder Offline",
"description": "A feeder gateway stopped responding",
"default_severity": "warning",
},
"region_total_blackout": {
"name": "Region Blackout",
"description": "All infrastructure in a region is offline",
"default_severity": "emergency",
},
# Health score alerts
"mesh_score_low": {
"name": "Mesh Health Low",
"description": "Overall mesh health score below threshold",
"default_severity": "warning",
},
"region_score_low": {
"name": "Region Health Low",
"description": "A region's health score below threshold",
"default_severity": "warning",
},
# Environmental alerts
"weather_warning": {
"name": "Severe Weather",
"description": "NWS warning or advisory for mesh area",
"default_severity": "warning",
},
"hf_blackout": {
"name": "HF Radio Blackout",
"description": "R3+ solar event degrading HF propagation",
"default_severity": "warning",
},
"tropospheric_ducting": {
"name": "Tropospheric Ducting",
"description": "Atmospheric conditions extending VHF/UHF range",
"default_severity": "info",
},
"wildfire_proximity": {
"name": "Fire Near Mesh",
"description": "Wildfire detected within configured distance",
"default_severity": "warning",
},
"new_ignition": {
"name": "New Fire Ignition",
"description": "Satellite hotspot not matching any known fire",
"default_severity": "warning",
},
"flood_warning": {
"name": "Flood Warning",
"description": "Stream gauge exceeds flood threshold",
"default_severity": "warning",
},
"road_closure": {
"name": "Road Closure",
"description": "Full road closure on monitored corridor",
"default_severity": "warning",
},
}
def get_category(category_id: str) -> dict:
"""Get category info by ID, with fallback for unknown categories."""
if category_id in ALERT_CATEGORIES:
return ALERT_CATEGORIES[category_id]
return {
"name": category_id.replace("_", " ").title(),
"description": f"Alert type: {category_id}",
"default_severity": "info",
}
def list_categories() -> list[dict]:
"""List all categories with their IDs."""
return [
{"id": cat_id, **cat_info}
for cat_id, cat_info in ALERT_CATEGORIES.items()
]

View file

@ -0,0 +1,308 @@
"""Notification channel implementations."""
import asyncio
import logging
import smtplib
import ssl
import time
from abc import ABC, abstractmethod
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional, TYPE_CHECKING
import httpx
if TYPE_CHECKING:
from ..connector import MeshConnector
logger = logging.getLogger(__name__)
class NotificationChannel(ABC):
"""Base class for notification delivery channels."""
channel_type: str = "base"
@abstractmethod
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert. Returns True on success."""
raise NotImplementedError
@abstractmethod
async def test(self) -> tuple[bool, str]:
"""Send test message. Returns (success, message)."""
raise NotImplementedError
class MeshBroadcastChannel(NotificationChannel):
"""Post alert to mesh channel."""
channel_type = "mesh_broadcast"
def __init__(self, connector: "MeshConnector", channel_index: int = 0):
self._connector = connector
self._channel = channel_index
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert to mesh channel."""
if not self._connector:
logger.warning("No mesh connector available")
return False
try:
message = alert.get("message", "")
self._connector.send_message(
text=message,
destination=None,
channel=self._channel,
)
logger.info("Broadcast alert to channel %d", self._channel)
return True
except Exception as e:
logger.error("Failed to broadcast alert: %s", e)
return False
async def test(self) -> tuple[bool, str]:
"""Send test broadcast."""
try:
self._connector.send_message(
text="[TEST] MeshAI notification system test",
destination=None,
channel=self._channel,
)
return True, "Test message sent to channel %d" % self._channel
except Exception as e:
return False, "Failed to send test: %s" % e
class MeshDMChannel(NotificationChannel):
"""DM alert to specific node IDs."""
channel_type = "mesh_dm"
def __init__(self, connector: "MeshConnector", node_ids: list[str]):
self._connector = connector
self._node_ids = node_ids
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via DM to configured nodes."""
if not self._connector:
return False
message = alert.get("message", "")
success = True
for node_id in self._node_ids:
try:
dest = int(node_id) if node_id.isdigit() else node_id
self._connector.send_message(text=message, destination=dest, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
success = False
return success
async def test(self) -> tuple[bool, str]:
"""Send test DM to all configured nodes."""
if not self._node_ids:
return False, "No node IDs configured"
try:
for node_id in self._node_ids:
dest = int(node_id) if node_id.isdigit() else node_id
self._connector.send_message(
text="[TEST] MeshAI notification test",
destination=dest,
channel=0,
)
return True, "Test DMs sent to %d nodes" % len(self._node_ids)
except Exception as e:
return False, "Failed to send test DMs: %s" % e
class EmailChannel(NotificationChannel):
"""Send alert via SMTP email."""
channel_type = "email"
def __init__(
self,
smtp_host: str,
smtp_port: int,
smtp_user: str,
smtp_password: str,
smtp_tls: bool,
from_address: str,
recipients: list[str],
):
self._host = smtp_host
self._port = smtp_port
self._user = smtp_user
self._password = smtp_password
self._tls = smtp_tls
self._from = from_address
self._recipients = recipients
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via email."""
if not self._recipients:
return False
alert_type = alert.get("type", "alert")
severity = alert.get("severity", "info").upper()
message = alert.get("message", "")
subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title())
body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % (
alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message
)
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._send_email, subject, body)
return True
except Exception as e:
logger.error("Failed to send email: %s", e)
return False
def _send_email(self, subject: str, body: str):
msg = MIMEMultipart()
msg["From"] = self._from
msg["To"] = ", ".join(self._recipients)
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
if self._tls:
context = ssl.create_default_context()
with smtplib.SMTP(self._host, self._port) as server:
server.starttls(context=context)
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
else:
with smtplib.SMTP(self._host, self._port) as server:
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
async def test(self) -> tuple[bool, str]:
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._send_email,
"[MeshAI TEST] Notification Test",
"Test message from MeshAI.",
)
return True, "Test email sent to %d recipients" % len(self._recipients)
except Exception as e:
return False, "Failed to send test email: %s" % e
class WebhookChannel(NotificationChannel):
"""POST alert JSON to a URL."""
channel_type = "webhook"
def __init__(self, url: str, headers: Optional[dict] = None):
self._url = url
self._headers = headers or {}
async def deliver(self, alert: dict, rule: dict) -> bool:
"""POST alert to webhook URL."""
payload = {
"type": alert.get("type"),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"timestamp": time.time(),
"node_name": alert.get("node_name"),
"region": alert.get("region"),
}
# Discord/Slack format
if "discord.com" in self._url or "slack.com" in self._url:
severity = alert.get("severity", "info")
color = {
"emergency": 0xFF0000,
"critical": 0xFF4444,
"warning": 0xFFAA00,
"info": 0x0099FF,
}.get(severity, 0x888888)
payload = {
"embeds": [{
"title": "MeshAI: %s" % alert.get("type", "unknown"),
"description": alert.get("message", ""),
"color": color,
}]
}
# ntfy format
elif "ntfy" in self._url:
headers = {
**self._headers,
"Title": "MeshAI: %s" % alert.get("type", "alert"),
"Priority": "3",
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
content=alert.get("message", ""),
headers=headers,
timeout=10,
)
return resp.status_code < 400
except Exception as e:
logger.error("Webhook failed: %s", e)
return False
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
json=payload,
headers={"Content-Type": "application/json", **self._headers},
timeout=10,
)
return resp.status_code < 400
except Exception as e:
logger.error("Webhook failed: %s", e)
return False
async def test(self) -> tuple[bool, str]:
test_alert = {"type": "test", "severity": "info", "message": "MeshAI test message"}
success = await self.deliver(test_alert, {})
if success:
return True, "Test sent to %s" % self._url
return False, "Webhook failed"
def create_channel(config: dict, connector=None) -> NotificationChannel:
"""Create a channel instance from config."""
channel_type = config.get("type", "")
if channel_type == "mesh_broadcast":
return MeshBroadcastChannel(
connector=connector,
channel_index=config.get("channel_index", 0),
)
elif channel_type == "mesh_dm":
return MeshDMChannel(
connector=connector,
node_ids=config.get("node_ids", []),
)
elif channel_type == "email":
return EmailChannel(
smtp_host=config.get("smtp_host", ""),
smtp_port=config.get("smtp_port", 587),
smtp_user=config.get("smtp_user", ""),
smtp_password=config.get("smtp_password", ""),
smtp_tls=config.get("smtp_tls", True),
from_address=config.get("from_address", ""),
recipients=config.get("recipients", []),
)
elif channel_type == "webhook":
return WebhookChannel(
url=config.get("url", ""),
headers=config.get("headers", {}),
)
else:
raise ValueError("Unknown channel type: %s" % channel_type)

View file

@ -0,0 +1,266 @@
"""Notification router - matches alerts to rules and delivers via channels."""
import logging
import time
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from .channels import create_channel, NotificationChannel
from .summarizer import MessageSummarizer
if TYPE_CHECKING:
from ..connector import MeshConnector
logger = logging.getLogger(__name__)
# Severity levels in order
SEVERITY_ORDER = ["info", "advisory", "watch", "warning", "critical", "emergency"]
class NotificationRouter:
"""Routes alerts through matching rules to notification channels."""
def __init__(
self,
config,
connector: Optional["MeshConnector"] = None,
llm_backend=None,
timezone: str = "America/Boise",
):
self._channels: dict[str, NotificationChannel] = {}
self._rules: list[dict] = []
self._quiet_start = getattr(config, "quiet_hours_start", "22:00")
self._quiet_end = getattr(config, "quiet_hours_end", "06:00")
self._timezone = timezone
self._dedup_window = getattr(config, "dedup_seconds", 600)
self._recent: dict[tuple, float] = {} # (category, event_key) -> last_sent_time
self._summarizer = MessageSummarizer(llm_backend) if llm_backend else None
self._connector = connector
# Create channel instances from config
channels_config = getattr(config, "channels", [])
for ch_config in channels_config:
if hasattr(ch_config, "__dict__"):
ch_dict = {k: v for k, v in ch_config.__dict__.items() if not k.startswith("_")}
else:
ch_dict = ch_config
if not ch_dict.get("enabled", True):
continue
channel_id = ch_dict.get("id", "")
if not channel_id:
continue
try:
channel = create_channel(ch_dict, connector)
self._channels[channel_id] = channel
logger.debug("Created notification channel: %s (%s)", channel_id, ch_dict.get("type"))
except Exception as e:
logger.warning("Failed to create channel %s: %s", channel_id, e)
# Load rules
rules_config = getattr(config, "rules", [])
for rule in rules_config:
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = rule
self._rules.append(rule_dict)
logger.info(
"Notification router initialized: %d channels, %d rules",
len(self._channels),
len(self._rules),
)
async def process_alert(self, alert: dict) -> bool:
"""Route an alert through matching rules to channels.
Returns True if alert was delivered to at least one channel.
"""
category = alert.get("type", "")
severity = alert.get("severity", "info")
delivered = False
for rule in self._rules:
# Check category match
rule_categories = rule.get("categories", [])
if rule_categories and category not in rule_categories:
continue
# Check severity threshold
min_severity = rule.get("min_severity", "info")
if not self._severity_meets(severity, min_severity):
continue
# Check quiet hours (emergencies and criticals override)
if self._in_quiet_hours() and severity not in ("emergency", "critical"):
if not rule.get("override_quiet", False):
continue
# Check dedup
event_id = alert.get("event_id", alert.get("message", "")[:50])
dedup_key = (category, event_id)
now = time.time()
if dedup_key in self._recent:
if now - self._recent[dedup_key] < self._dedup_window:
logger.debug("Skipping duplicate alert: %s", category)
continue
self._recent[dedup_key] = now
# Deliver to each channel in the rule
channel_ids = rule.get("channel_ids", [])
for channel_id in channel_ids:
channel = self._channels.get(channel_id)
if not channel:
continue
try:
# Summarize for mesh channels if over 200 chars
delivery_alert = alert
message = alert.get("message", "")
if channel.channel_type in ("mesh_broadcast", "mesh_dm"):
if len(message) > 200:
if self._summarizer:
summary = await self._summarizer.summarize(message, max_chars=195)
delivery_alert = {**alert, "message": summary}
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
logger.info(
"Alert delivered via %s: %s",
channel_id,
category,
)
except Exception as e:
logger.warning("Channel %s delivery failed: %s", channel_id, e)
return delivered
def _severity_meets(self, actual: str, required: str) -> bool:
"""Check if actual severity meets or exceeds required severity."""
try:
actual_idx = SEVERITY_ORDER.index(actual.lower())
required_idx = SEVERITY_ORDER.index(required.lower())
return actual_idx >= required_idx
except ValueError:
return True # Unknown severity, allow through
def _in_quiet_hours(self) -> bool:
"""Check if current time is within quiet hours."""
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(self._timezone)
now = datetime.now(tz)
current_time = now.strftime("%H:%M")
start = self._quiet_start
end = self._quiet_end
if start <= end:
# Simple range (e.g., 01:00 to 06:00)
return start <= current_time <= end
else:
# Crosses midnight (e.g., 22:00 to 06:00)
return current_time >= start or current_time <= end
except Exception:
return False
def get_channels(self) -> list[dict]:
"""Get list of configured channels."""
return [
{"id": ch_id, "type": ch.channel_type}
for ch_id, ch in self._channels.items()
]
def get_rules(self) -> list[dict]:
"""Get list of configured rules."""
return self._rules
async def test_channel(self, channel_id: str) -> tuple[bool, str]:
"""Send a test alert to a specific channel."""
channel = self._channels.get(channel_id)
if not channel:
return False, "Channel not found: %s" % channel_id
return await channel.test()
def add_mesh_subscription(
self,
node_id: str,
categories: list[str],
rule_name: Optional[str] = None,
) -> str:
"""Add a mesh DM subscription for a node.
Creates a channel and rule for the node to receive alerts.
Returns the rule name.
"""
# Create channel ID
channel_id = "mesh_dm_%s" % node_id
# Create channel if it doesn't exist
if channel_id not in self._channels:
from .channels import MeshDMChannel
channel = MeshDMChannel(
connector=self._connector,
node_ids=[node_id],
)
self._channels[channel_id] = channel
# Create rule
if not rule_name:
rule_name = "sub_%s" % node_id
# Check if rule already exists
for rule in self._rules:
if rule.get("name") == rule_name:
# Update existing rule
rule["categories"] = categories if categories else []
rule["channel_ids"] = [channel_id]
return rule_name
# Add new rule
self._rules.append({
"name": rule_name,
"categories": categories if categories else [], # Empty = all
"min_severity": "warning",
"channel_ids": [channel_id],
"override_quiet": False,
})
return rule_name
def remove_mesh_subscription(self, node_id: str) -> bool:
"""Remove a mesh subscription for a node."""
channel_id = "mesh_dm_%s" % node_id
rule_name = "sub_%s" % node_id
# Remove channel
if channel_id in self._channels:
del self._channels[channel_id]
# Remove rule
self._rules = [r for r in self._rules if r.get("name") != rule_name]
return True
def get_node_subscriptions(self, node_id: str) -> list[str]:
"""Get categories a node is subscribed to."""
rule_name = "sub_%s" % node_id
for rule in self._rules:
if rule.get("name") == rule_name:
categories = rule.get("categories", [])
return categories if categories else ["all"]
return []
def cleanup_recent(self, max_age: int = 3600):
"""Clean up old entries from recent alerts cache."""
now = time.time()
self._recent = {
k: v for k, v in self._recent.items()
if now - v < max_age
}

View file

@ -0,0 +1,64 @@
"""Message summarizer for mesh delivery."""
import logging
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from ..backends import LLMBackend
logger = logging.getLogger(__name__)
class MessageSummarizer:
"""Summarizes long messages for mesh delivery.
Only used when:
- Delivering to mesh channels (broadcast or DM)
- Message exceeds max_chars (default 200)
- LLM backend is available
Email and webhook channels receive full messages.
"""
def __init__(self, llm_backend: Optional["LLMBackend"] = None):
self._llm = llm_backend
async def summarize(self, message: str, max_chars: int = 195) -> str:
"""Summarize a message to fit within max_chars.
Args:
message: Original message text
max_chars: Maximum characters for summary
Returns:
Summarized message, or truncated original if LLM unavailable
"""
if len(message) <= max_chars:
return message
if not self._llm:
return message[:max_chars - 3] + "..."
prompt = (
"Summarize this alert in under %d characters. "
"Keep severity, location, and key facts. No preamble, just the summary:\n\n%s"
% (max_chars, message)
)
try:
# Use the LLM to generate a summary
response = await self._llm.generate(
prompt,
system_prompt="You are a concise alert summarizer. Output only the summary, no explanation.",
max_tokens=100,
)
summary = response.strip()
# Ensure it fits
if len(summary) <= max_chars:
return summary
return summary[:max_chars - 3] + "..."
except Exception as e:
logger.debug("LLM summarization failed: %s", e)
return message[:max_chars - 3] + "..."