From 3bf5e3dfbc20dc5ad76656c431c87ef728023a42 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Wed, 13 May 2026 03:51:37 +0000 Subject: [PATCH] feat(notifications): alert routing with channels, rules, and delivery - Notification pipeline: categories -> rules -> channels - Channels: mesh broadcast, mesh DM, email (SMTP), webhook (generic) - Per-rule severity threshold and category filtering - Quiet hours with emergency override - LLM summarization for mesh delivery over 200 chars only - !subscribe shows available categories, easy mesh subscription - Dashboard notification rules API endpoints - Extensible channel system for future transports (Winlink, JS8Call) - config.yaml notification section with examples Co-Authored-By: Claude Opus 4.5 --- config.example.yaml | 61 ++++ meshai/commands/dispatcher.py | 13 +- meshai/commands/subscribe.py | 189 +++++++----- meshai/config.py | 49 ++++ meshai/dashboard/api/notification_routes.py | 113 +++++++ meshai/dashboard/server.py | 3 + meshai/main.py | 91 +++--- meshai/notifications/__init__.py | 6 + meshai/notifications/categories.py | 157 ++++++++++ meshai/notifications/channels.py | 308 ++++++++++++++++++++ meshai/notifications/router.py | 266 +++++++++++++++++ meshai/notifications/summarizer.py | 64 ++++ 12 files changed, 1215 insertions(+), 105 deletions(-) create mode 100644 meshai/dashboard/api/notification_routes.py create mode 100644 meshai/notifications/__init__.py create mode 100644 meshai/notifications/categories.py create mode 100644 meshai/notifications/channels.py create mode 100644 meshai/notifications/router.py create mode 100644 meshai/notifications/summarizer.py diff --git a/config.example.yaml b/config.example.yaml index ae713d8..d6e3b02 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -216,6 +216,67 @@ environmental: confidence_min: "nominal" # low, nominal, high proximity_km: 10.0 # km to match known fire perimeters + +# === NOTIFICATION DELIVERY === +# Route alerts to channels (mesh, email, webhook) based on rules. +# Categories match alert types from alert_engine.py. +# Severity levels: info, advisory, watch, warning, critical, emergency +# +notifications: + enabled: false + quiet_hours_start: "22:00" # Suppress non-emergency alerts during quiet hours + quiet_hours_end: "06:00" + dedup_seconds: 600 # Don't resend same alert within this window + + # Notification channels + channels: + # Mesh broadcast - posts to mesh channel + - id: "mesh-channel" + type: mesh_broadcast + enabled: true + channel_index: 0 # Mesh channel to broadcast on + + # Example: Email channel (uncomment to enable) + # - id: "email-admin" + # type: email + # smtp_host: "smtp.gmail.com" + # smtp_port: 587 + # smtp_user: "you@gmail.com" + # smtp_password: "${SMTP_PASSWORD}" # Use env var + # smtp_tls: true + # from_address: "meshai@yourdomain.com" + # recipients: ["admin@yourdomain.com"] + + # Example: Webhook (Discord, Slack, ntfy, Home Assistant) + # - id: "discord-alerts" + # type: webhook + # url: "https://discord.com/api/webhooks/..." + + # - id: "ntfy-alerts" + # type: webhook + # url: "https://ntfy.sh/your-topic" + + # Notification rules - match alerts to channels + rules: + # All emergencies -> mesh broadcast + - name: "emergencies" + categories: [] # Empty = all categories + min_severity: "emergency" + channel_ids: ["mesh-channel"] + override_quiet: true # Send even during quiet hours + + # Example: Fire alerts at any severity + # - name: "fire-alerts" + # categories: ["wildfire_proximity", "new_ignition"] + # min_severity: "advisory" + # channel_ids: ["mesh-channel", "email-admin"] + + # Example: Infrastructure alerts + # - name: "infra-alerts" + # categories: ["infra_offline", "critical_node_down", "battery_emergency"] + # min_severity: "warning" + # channel_ids: ["mesh-channel"] + # === WEB DASHBOARD === dashboard: enabled: true diff --git a/meshai/commands/dispatcher.py b/meshai/commands/dispatcher.py index 556e7f7..1275a91 100644 --- a/meshai/commands/dispatcher.py +++ b/meshai/commands/dispatcher.py @@ -162,6 +162,7 @@ def create_dispatcher( health_engine=None, subscription_manager=None, env_store=None, + notification_router=None, ) -> CommandDispatcher: """Create and populate command dispatcher with default commands. @@ -224,24 +225,24 @@ def create_dispatcher( dispatcher.register(alias_handler) # Register subscription commands - sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store) + sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store, notification_router) dispatcher.register(sub_cmd) for alias in getattr(sub_cmd, 'aliases', []): - alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store) + alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store, notification_router) alias_handler.name = alias dispatcher.register(alias_handler) - unsub_cmd = UnsubCommand(subscription_manager) + unsub_cmd = UnsubCommand(subscription_manager, notification_router) dispatcher.register(unsub_cmd) for alias in getattr(unsub_cmd, 'aliases', []): - alias_handler = UnsubCommand(subscription_manager) + alias_handler = UnsubCommand(subscription_manager, notification_router) alias_handler.name = alias dispatcher.register(alias_handler) - mysubs_cmd = MySubsCommand(subscription_manager) + mysubs_cmd = MySubsCommand(subscription_manager, notification_router) dispatcher.register(mysubs_cmd) for alias in getattr(mysubs_cmd, 'aliases', []): - alias_handler = MySubsCommand(subscription_manager) + alias_handler = MySubsCommand(subscription_manager, notification_router) alias_handler.name = alias dispatcher.register(alias_handler) diff --git a/meshai/commands/subscribe.py b/meshai/commands/subscribe.py index 5dbe705..36db916 100644 --- a/meshai/commands/subscribe.py +++ b/meshai/commands/subscribe.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from ..mesh_data_store import MeshDataStore from ..mesh_reporter import MeshReporter from ..subscriptions import SubscriptionManager + from ..notifications.router import NotificationRouter class SubCommand(CommandHandler): @@ -15,7 +16,7 @@ class SubCommand(CommandHandler): name = "sub" description = "Subscribe to reports or alerts" - usage = "!sub daily|weekly|alerts [time] [day] [scope]" + usage = "!sub daily|weekly|alerts| [time] [day] [scope]" aliases = ["subscribe"] def __init__( @@ -23,23 +24,35 @@ class SubCommand(CommandHandler): subscription_manager: "SubscriptionManager" = None, mesh_reporter: "MeshReporter" = None, data_store: "MeshDataStore" = None, + notification_router: "NotificationRouter" = None, ): self._sub_manager = subscription_manager self._reporter = mesh_reporter self._data_store = data_store + self._notification_router = notification_router async def execute(self, args: str, context: CommandContext) -> str: """Handle subscription command.""" - if not self._sub_manager: - return "Subscriptions not available." - parts = args.strip().split() + + # No args - show available alert categories if not parts: - return self._usage_help() + return self._show_categories() sub_type = parts[0].lower() + + # Check if it's a category subscription + if self._notification_router: + from ..notifications.categories import ALERT_CATEGORIES + if sub_type in ALERT_CATEGORIES or sub_type == "all": + return self._handle_category_subscription(sub_type, context) + + # Legacy subscription types if sub_type not in ("daily", "weekly", "alerts"): - return f"Invalid type '{sub_type}'. Use: daily, weekly, or alerts" + return self._show_categories() + + if not self._sub_manager: + return "Subscriptions not available." try: if sub_type == "daily": @@ -51,15 +64,55 @@ class SubCommand(CommandHandler): except ValueError as e: return f"Error: {e}" + def _show_categories(self) -> str: + """Show available alert categories.""" + try: + from ..notifications.categories import ALERT_CATEGORIES + except ImportError: + return self._usage_help() + + lines = ["Available alert categories:"] + for cat_id, cat_info in ALERT_CATEGORIES.items(): + lines.append(f" {cat_id} - {cat_info['description']}") + lines.append("") + lines.append("Usage:") + lines.append(" !sub - subscribe to a category") + lines.append(" !sub all - subscribe to all alerts") + lines.append(" !sub alerts - legacy mesh-wide alerts") + + return "\n".join(lines) + + def _handle_category_subscription(self, category: str, context: CommandContext) -> str: + """Handle category-based alert subscription.""" + node_id = self._get_user_id(context) + + if category == "all": + categories = [] # Empty = all categories + else: + categories = [category] + + # Add subscription via notification router + rule_name = self._notification_router.add_mesh_subscription( + node_id=node_id, + categories=categories, + ) + + if category == "all": + return "Subscribed to all alert categories. Use !unsub to remove." + else: + from ..notifications.categories import get_category + cat_info = get_category(category) + return f"Subscribed to {cat_info['name']} alerts. Use !unsub {category} to remove." + def _usage_help(self) -> str: """Return usage help.""" return """Usage: !sub daily 1830 - daily mesh report at 6:30 PM !sub daily 1830 region SCID - daily region report -!sub daily 1830 node MHR - daily node report !sub weekly 0800 sun - weekly digest Sunday 8 AM -!sub alerts - mesh-wide alerts -!sub alerts region SCID - alerts for a region""" +!sub alerts - mesh-wide alerts (legacy) +!sub - subscribe to alert category +!sub all - subscribe to all alerts""" def _handle_daily(self, args: list, context: CommandContext) -> str: """Handle daily subscription.""" @@ -68,11 +121,9 @@ class SubCommand(CommandHandler): schedule_time = args[0] scope_type, scope_value = self._parse_scope(args[1:]) - - # Validate scope scope_value = self._validate_scope(scope_type, scope_value) - result = self._sub_manager.add( + self._sub_manager.add( user_id=self._get_user_id(context), sub_type="daily", schedule_time=schedule_time, @@ -92,11 +143,9 @@ class SubCommand(CommandHandler): schedule_time = args[0] schedule_day = args[1].lower() scope_type, scope_value = self._parse_scope(args[2:]) - - # Validate scope scope_value = self._validate_scope(scope_type, scope_value) - result = self._sub_manager.add( + self._sub_manager.add( user_id=self._get_user_id(context), sub_type="weekly", schedule_time=schedule_time, @@ -111,13 +160,11 @@ class SubCommand(CommandHandler): return f"Subscribed: weekly {scope_desc}report at {time_fmt} {day_fmt}" def _handle_alerts(self, args: list, context: CommandContext) -> str: - """Handle alerts subscription.""" + """Handle alerts subscription (legacy).""" scope_type, scope_value = self._parse_scope(args) - - # Validate scope scope_value = self._validate_scope(scope_type, scope_value) - result = self._sub_manager.add( + self._sub_manager.add( user_id=self._get_user_id(context), sub_type="alerts", scope_type=scope_type, @@ -128,15 +175,10 @@ class SubCommand(CommandHandler): return f"Subscribed: alerts for {scope_desc.strip() or 'mesh'}" def _parse_scope(self, args: list) -> tuple[str, str]: - """Parse scope from remaining args. - - Returns: - (scope_type, scope_value) tuple - """ + """Parse scope from remaining args.""" if not args: return "mesh", None - # Look for 'region' or 'node' keyword scope_type = "mesh" scope_value = None @@ -144,26 +186,17 @@ class SubCommand(CommandHandler): arg_lower = arg.lower() if arg_lower == "region": scope_type = "region" - # Everything after 'region' is the region name scope_value = " ".join(args[i + 1:]) if i + 1 < len(args) else None break elif arg_lower == "node": scope_type = "node" - # Next arg is the node identifier scope_value = args[i + 1] if i + 1 < len(args) else None break return scope_type, scope_value def _validate_scope(self, scope_type: str, scope_value: str) -> str: - """Validate and resolve scope value. - - Returns: - Resolved scope_value (e.g., full region name) - - Raises: - ValueError: If scope not found - """ + """Validate and resolve scope value.""" if scope_type == "mesh": return None @@ -172,14 +205,9 @@ class SubCommand(CommandHandler): if scope_type == "region" and self._reporter: region = self._reporter._find_region(scope_value) - if not region: - # List available regions - health = self._reporter.health_engine.mesh_health - if health: - available = [r.name for r in health.regions if r.node_ids] - return scope_value # Use as-is, will fail at delivery if invalid - raise ValueError(f"Region '{scope_value}' not found") - return region.name # Return canonical name + if region: + return region.name + return scope_value if scope_type == "node" and self._reporter: node = self._reporter._find_node(scope_value) @@ -191,7 +219,6 @@ class SubCommand(CommandHandler): def _get_user_id(self, context: CommandContext) -> str: """Extract user ID from context.""" - # sender_id is like "!abcd1234" - convert to node_num sender_id = context.sender_id if sender_id.startswith("!"): return str(int(sender_id[1:], 16)) @@ -217,26 +244,40 @@ class UnsubCommand(CommandHandler): name = "unsub" description = "Remove subscription(s)" - usage = "!unsub daily|weekly|alerts|all" + usage = "!unsub daily|weekly|alerts||all" aliases = ["unsubscribe"] - def __init__(self, subscription_manager: "SubscriptionManager" = None): + def __init__( + self, + subscription_manager: "SubscriptionManager" = None, + notification_router: "NotificationRouter" = None, + ): self._sub_manager = subscription_manager + self._notification_router = notification_router async def execute(self, args: str, context: CommandContext) -> str: """Handle unsubscribe command.""" - if not self._sub_manager: - return "Subscriptions not available." - sub_type = args.strip().lower() if args else None if not sub_type: - return "Usage: !unsub daily|weekly|alerts|all" - - if sub_type not in ("daily", "weekly", "alerts", "all"): - return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, or all" + return "Usage: !unsub daily|weekly|alerts||all" user_id = self._get_user_id(context) + + # Check if it's a category unsubscription + if self._notification_router: + from ..notifications.categories import ALERT_CATEGORIES + if sub_type in ALERT_CATEGORIES or sub_type == "all": + self._notification_router.remove_mesh_subscription(user_id) + return "Removed alert subscriptions" + + # Legacy subscription types + if not self._sub_manager: + return "Subscriptions not available." + + if sub_type not in ("daily", "weekly", "alerts", "all"): + return f"Invalid type '{sub_type}'. Use: daily, weekly, alerts, , or all" + removed = self._sub_manager.remove(user_id, sub_type if sub_type != "all" else None) if removed == 0: @@ -260,26 +301,44 @@ class MySubsCommand(CommandHandler): name = "mysubs" description = "List your subscriptions" usage = "!mysubs" - aliases = ["subs"] + aliases = ["subs", "subscriptions"] - def __init__(self, subscription_manager: "SubscriptionManager" = None): + def __init__( + self, + subscription_manager: "SubscriptionManager" = None, + notification_router: "NotificationRouter" = None, + ): self._sub_manager = subscription_manager + self._notification_router = notification_router async def execute(self, args: str, context: CommandContext) -> str: """List user's subscriptions.""" - if not self._sub_manager: - return "Subscriptions not available." - user_id = self._get_user_id(context) - subs = self._sub_manager.get_user_subs(user_id) + lines = [] - if not subs: + # Check notification router subscriptions + if self._notification_router: + categories = self._notification_router.get_node_subscriptions(user_id) + if categories: + if categories == ["all"]: + lines.append("Alert subscriptions: all categories") + else: + lines.append(f"Alert subscriptions: {', '.join(categories)}") + + # Check legacy subscriptions + if self._sub_manager: + subs = self._sub_manager.get_user_subs(user_id) + if subs: + if not lines: + lines.append("Your subscriptions:") + else: + lines.append("\nScheduled reports:") + for i, sub in enumerate(subs, 1): + lines.append(f" {i}. {self._format_sub(sub)}") + + if not lines: return "No active subscriptions. Use !sub to subscribe." - lines = ["Your subscriptions:"] - for i, sub in enumerate(subs, 1): - lines.append(f" {i}. {self._format_sub(sub)}") - return "\n".join(lines) def _format_sub(self, sub: dict) -> str: @@ -301,7 +360,7 @@ class MySubsCommand(CommandHandler): time_str = self._format_time(sub.get("schedule_time", "0000")) day_str = (sub.get("schedule_day") or "").capitalize() return f"Weekly {scope_desc}report at {time_str} {day_str}" - else: # alerts + else: return f"Alerts for {scope_desc.strip() or 'mesh'}" def _format_time(self, hhmm: str) -> str: diff --git a/meshai/config.py b/meshai/config.py index 2f49ed3..77e124b 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -425,6 +425,47 @@ class EnvironmentalConfig: @dataclass +class NotificationChannelConfig: + """Configuration for a notification channel.""" + + id: str = "" + type: str = "" + enabled: bool = True + channel_index: int = 0 + node_ids: list = field(default_factory=list) + smtp_host: str = "" + smtp_port: int = 587 + smtp_user: str = "" + smtp_password: str = "" + smtp_tls: bool = True + from_address: str = "" + recipients: list = field(default_factory=list) + url: str = "" + headers: dict = field(default_factory=dict) + + +@dataclass +class NotificationRuleConfig: + """Configuration for a notification rule.""" + + name: str = "" + categories: list = field(default_factory=list) + min_severity: str = "warning" + channel_ids: list = field(default_factory=list) + override_quiet: bool = False + + +@dataclass +class NotificationsConfig: + """Notification system settings.""" + + enabled: bool = False + quiet_hours_start: str = "22:00" + quiet_hours_end: str = "06:00" + dedup_seconds: int = 600 + channels: list = field(default_factory=list) + rules: list = field(default_factory=list) + class DashboardConfig: """Web dashboard settings.""" @@ -462,6 +503,7 @@ class Config: mesh_intelligence: MeshIntelligenceConfig = field(default_factory=MeshIntelligenceConfig) environmental: EnvironmentalConfig = field(default_factory=EnvironmentalConfig) dashboard: DashboardConfig = field(default_factory=DashboardConfig) + notifications: NotificationsConfig = field(default_factory=NotificationsConfig) _config_path: Optional[Path] = field(default=None, repr=False) @@ -535,6 +577,13 @@ def _dict_to_dataclass(cls, data: dict): kwargs[key] = _dict_to_dataclass(Roads511Config, value) elif key == "firms" and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(FIRMSConfig, value) + elif key == "notifications" and isinstance(value, dict): + notifications = _dict_to_dataclass(NotificationsConfig, value) + if "channels" in value and isinstance(value["channels"], list): + notifications.channels = [_dict_to_dataclass(NotificationChannelConfig, c) if isinstance(c, dict) else c for c in value["channels"]] + if "rules" in value and isinstance(value["rules"], list): + notifications.rules = [_dict_to_dataclass(NotificationRuleConfig, r) if isinstance(r, dict) else r for r in value["rules"]] + kwargs[key] = notifications else: kwargs[key] = value diff --git a/meshai/dashboard/api/notification_routes.py b/meshai/dashboard/api/notification_routes.py new file mode 100644 index 0000000..e2b59da --- /dev/null +++ b/meshai/dashboard/api/notification_routes.py @@ -0,0 +1,113 @@ +"""Notification API routes.""" + +from fastapi import APIRouter, Request, HTTPException +from pydantic import BaseModel +from typing import Optional + +router = APIRouter(prefix="/notifications", tags=["notifications"]) + + +class ChannelCreate(BaseModel): + """Channel creation request.""" + id: str + type: str + enabled: bool = True + channel_index: int = 0 + node_ids: list[str] = [] + smtp_host: str = "" + smtp_port: int = 587 + smtp_user: str = "" + smtp_password: str = "" + smtp_tls: bool = True + from_address: str = "" + recipients: list[str] = [] + url: str = "" + headers: dict = {} + + +class RuleCreate(BaseModel): + """Rule creation request.""" + name: str + categories: list[str] = [] + min_severity: str = "warning" + channel_ids: list[str] = [] + override_quiet: bool = False + + +class QuietHoursUpdate(BaseModel): + """Quiet hours update request.""" + start: str + end: str + + +@router.get("/categories") +async def get_categories(): + """Get all alert categories with descriptions.""" + try: + from ...notifications.categories import list_categories + return list_categories() + except ImportError: + return [] + + +@router.get("/channels") +async def get_channels(request: Request): + """Get configured notification channels.""" + notification_router = getattr(request.app.state, "notification_router", None) + if not notification_router: + return [] + return notification_router.get_channels() + + +@router.post("/channels") +async def create_channel(request: Request, channel: ChannelCreate): + """Create a new notification channel.""" + # This would require runtime config modification + # For now, return not implemented + raise HTTPException(status_code=501, detail="Channel creation requires config file edit") + + +@router.post("/channels/{channel_id}/test") +async def test_channel(request: Request, channel_id: str): + """Send a test alert to a channel.""" + notification_router = getattr(request.app.state, "notification_router", None) + if not notification_router: + raise HTTPException(status_code=404, detail="Notification router not configured") + + success, message = await notification_router.test_channel(channel_id) + return {"success": success, "message": message} + + +@router.get("/rules") +async def get_rules(request: Request): + """Get configured notification rules.""" + notification_router = getattr(request.app.state, "notification_router", None) + if not notification_router: + return [] + return notification_router.get_rules() + + +@router.post("/rules") +async def create_rule(request: Request, rule: RuleCreate): + """Create a new notification rule.""" + # This would require runtime config modification + raise HTTPException(status_code=501, detail="Rule creation requires config file edit") + + +@router.get("/quiet-hours") +async def get_quiet_hours(request: Request): + """Get quiet hours configuration.""" + config = getattr(request.app.state, "config", None) + if not config or not hasattr(config, "notifications"): + return {"start": "22:00", "end": "06:00"} + return { + "start": config.notifications.quiet_hours_start, + "end": config.notifications.quiet_hours_end, + } + + +@router.put("/quiet-hours") +async def update_quiet_hours(request: Request, quiet_hours: QuietHoursUpdate): + """Update quiet hours configuration.""" + # This would require runtime config modification + raise HTTPException(status_code=501, detail="Quiet hours update requires config file edit") diff --git a/meshai/dashboard/server.py b/meshai/dashboard/server.py index 1302b61..0d2908d 100644 --- a/meshai/dashboard/server.py +++ b/meshai/dashboard/server.py @@ -52,6 +52,7 @@ def create_app() -> FastAPI: from .api.mesh_routes import router as mesh_router from .api.env_routes import router as env_router from .api.alert_routes import router as alert_router + from .api.notification_routes import router as notification_router app.include_router(system_router, prefix="/api") app.include_router(config_router, prefix="/api") @@ -59,6 +60,7 @@ def create_app() -> FastAPI: app.include_router(env_router, prefix="/api") app.include_router(alert_router, prefix="/api") + app.include_router(notification_router, prefix="/api") # WebSocket router (no prefix, path is /ws/live) app.include_router(ws_router) @@ -110,6 +112,7 @@ async def start_dashboard(meshai_instance: "MeshAI") -> DashboardBroadcaster: app.state.alert_engine = getattr(meshai_instance, "alert_engine", None) app.state.env_store = getattr(meshai_instance, "env_store", None) app.state.subscription_manager = meshai_instance.subscription_manager + app.state.notification_router = getattr(meshai_instance, "notification_router", None) # Create broadcaster and attach to app state broadcaster = DashboardBroadcaster() diff --git a/meshai/main.py b/meshai/main.py index df83d3b..930fe5a 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -44,6 +44,7 @@ class MeshAI: self.mesh_reporter = None self.subscription_manager = None self.alert_engine = None + self.notification_router = None self.env_store = None # Environmental feeds store self._last_sub_check: float = 0.0 self.router: Optional[MessageRouter] = None @@ -337,6 +338,18 @@ class MeshAI: ) logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})") + + # Notification router + if self.config.notifications.enabled: + from .notifications.router import NotificationRouter + self.notification_router = NotificationRouter( + config=self.config.notifications, + connector=self.connector, + llm_backend=self.llm, + timezone=self.config.timezone, + ) + logger.info("Notification router initialized") + # Environmental feeds env_cfg = self.config.environmental if env_cfg.enabled: @@ -394,6 +407,7 @@ class MeshAI: health_engine=self.health_engine, subscription_manager=self.subscription_manager, env_store=self.env_store, + notification_router=self.notification_router, ) # Message router @@ -406,6 +420,7 @@ class MeshAI: health_engine=self.health_engine, mesh_reporter=self.mesh_reporter, env_store=self.env_store, + notification_router=self.notification_router, ) # Responder @@ -539,40 +554,48 @@ class MeshAI: if pid_file.exists(): pid_file.unlink() - async def _dispatch_alerts(self, alerts: list[dict]) -> None: - """Dispatch alerts to subscribers and alert channel.""" - mi = self.config.mesh_intelligence - alert_channel = getattr(mi, 'alert_channel', -1) - - for alert in alerts: - message = alert["message"] - logger.info(f"ALERT: {message}") - - # Send to alert channel if configured - if alert_channel >= 0 and self.connector: - try: - self.connector.send_message( - text=message, - destination=None, # Broadcast - channel=alert_channel, - ) - logger.info(f"Alert sent to channel {alert_channel}") - except Exception as e: - logger.error(f"Failed to send channel alert: {e}") - - # Send DMs to matching subscribers - if self.alert_engine and self.subscription_manager: - subscribers = self.alert_engine.get_subscribers_for_alert(alert) - for sub in subscribers: - user_id = sub["user_id"] - try: - await self._send_sub_dm(user_id, message) - logger.info(f"Alert DM sent to {user_id}: {alert['type']}") - except Exception as e: - logger.error(f"Failed to send alert DM to {user_id}: {e}") - - self.alert_engine.clear_pending() - + async def _dispatch_alerts(self, alerts: list[dict]) -> None: + """Dispatch alerts to subscribers and alert channel.""" + mi = self.config.mesh_intelligence + alert_channel = getattr(mi, 'alert_channel', -1) + + for alert in alerts: + message = alert["message"] + logger.info(f"ALERT: {message}") + + # Route through notification router if enabled + if self.notification_router: + try: + await self.notification_router.process_alert(alert) + except Exception as e: + logger.error(f"Notification router error: {e}") + + # Fallback: Send to alert channel if no notification router + elif alert_channel >= 0 and self.connector: + try: + self.connector.send_message( + text=message, + destination=None, + channel=alert_channel, + ) + logger.info(f"Alert sent to channel {alert_channel}") + except Exception as e: + logger.error(f"Failed to send channel alert: {e}") + + # Fallback: Send DMs to matching subscribers + if self.alert_engine and self.subscription_manager: + subscribers = self.alert_engine.get_subscribers_for_alert(alert) + for sub in subscribers: + user_id = sub["user_id"] + try: + await self._send_sub_dm(user_id, message) + logger.info(f"Alert DM sent to {user_id}: {alert['type']}") + except Exception as e: + logger.error(f"Failed to send alert DM to {user_id}: {e}") + + if self.alert_engine: + self.alert_engine.clear_pending() + async def _check_scheduled_subs(self) -> None: """Check for and deliver due scheduled reports.""" from datetime import datetime diff --git a/meshai/notifications/__init__.py b/meshai/notifications/__init__.py new file mode 100644 index 0000000..57f2aad --- /dev/null +++ b/meshai/notifications/__init__.py @@ -0,0 +1,6 @@ +"""Notification system for MeshAI alerts.""" + +from .categories import ALERT_CATEGORIES, get_category, list_categories +from .router import NotificationRouter + +__all__ = ["ALERT_CATEGORIES", "get_category", "list_categories", "NotificationRouter"] diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py new file mode 100644 index 0000000..fb4c170 --- /dev/null +++ b/meshai/notifications/categories.py @@ -0,0 +1,157 @@ +"""Alert category registry. + +Defines all alertable conditions with human-readable names and descriptions. +""" + +ALERT_CATEGORIES = { + # Infrastructure alerts + "infra_offline": { + "name": "Infrastructure Offline", + "description": "An infrastructure node stopped responding", + "default_severity": "warning", + }, + "critical_node_down": { + "name": "Critical Node Down", + "description": "A node marked as critical went offline", + "default_severity": "critical", + }, + "infra_recovery": { + "name": "Infrastructure Recovery", + "description": "An infrastructure node came back online", + "default_severity": "info", + }, + "new_router": { + "name": "New Router", + "description": "A new router appeared on the mesh", + "default_severity": "info", + }, + + # Power alerts + "battery_warning": { + "name": "Battery Warning", + "description": "Infrastructure node battery below warning threshold", + "default_severity": "warning", + }, + "battery_critical": { + "name": "Battery Critical", + "description": "Infrastructure node battery below critical threshold", + "default_severity": "critical", + }, + "battery_emergency": { + "name": "Battery Emergency", + "description": "Infrastructure node battery critically low", + "default_severity": "emergency", + }, + "battery_trend": { + "name": "Battery Declining", + "description": "Battery showing declining trend over 7 days", + "default_severity": "warning", + }, + "power_source_change": { + "name": "Power Source Change", + "description": "Node switched from USB to battery (possible outage)", + "default_severity": "warning", + }, + "solar_not_charging": { + "name": "Solar Not Charging", + "description": "Solar panel not charging during daylight hours", + "default_severity": "warning", + }, + + # Utilization alerts + "sustained_high_util": { + "name": "High Utilization", + "description": "Channel utilization elevated for extended period", + "default_severity": "warning", + }, + "packet_flood": { + "name": "Packet Flood", + "description": "Node sending excessive packets", + "default_severity": "warning", + }, + + # Coverage alerts + "infra_single_gateway": { + "name": "Single Gateway", + "description": "Infrastructure node dropped to single gateway coverage", + "default_severity": "warning", + }, + "feeder_offline": { + "name": "Feeder Offline", + "description": "A feeder gateway stopped responding", + "default_severity": "warning", + }, + "region_total_blackout": { + "name": "Region Blackout", + "description": "All infrastructure in a region is offline", + "default_severity": "emergency", + }, + + # Health score alerts + "mesh_score_low": { + "name": "Mesh Health Low", + "description": "Overall mesh health score below threshold", + "default_severity": "warning", + }, + "region_score_low": { + "name": "Region Health Low", + "description": "A region's health score below threshold", + "default_severity": "warning", + }, + + # Environmental alerts + "weather_warning": { + "name": "Severe Weather", + "description": "NWS warning or advisory for mesh area", + "default_severity": "warning", + }, + "hf_blackout": { + "name": "HF Radio Blackout", + "description": "R3+ solar event degrading HF propagation", + "default_severity": "warning", + }, + "tropospheric_ducting": { + "name": "Tropospheric Ducting", + "description": "Atmospheric conditions extending VHF/UHF range", + "default_severity": "info", + }, + "wildfire_proximity": { + "name": "Fire Near Mesh", + "description": "Wildfire detected within configured distance", + "default_severity": "warning", + }, + "new_ignition": { + "name": "New Fire Ignition", + "description": "Satellite hotspot not matching any known fire", + "default_severity": "warning", + }, + "flood_warning": { + "name": "Flood Warning", + "description": "Stream gauge exceeds flood threshold", + "default_severity": "warning", + }, + "road_closure": { + "name": "Road Closure", + "description": "Full road closure on monitored corridor", + "default_severity": "warning", + }, +} + + +def get_category(category_id: str) -> dict: + """Get category info by ID, with fallback for unknown categories.""" + if category_id in ALERT_CATEGORIES: + return ALERT_CATEGORIES[category_id] + return { + "name": category_id.replace("_", " ").title(), + "description": f"Alert type: {category_id}", + "default_severity": "info", + } + + +def list_categories() -> list[dict]: + """List all categories with their IDs.""" + return [ + {"id": cat_id, **cat_info} + for cat_id, cat_info in ALERT_CATEGORIES.items() + ] diff --git a/meshai/notifications/channels.py b/meshai/notifications/channels.py new file mode 100644 index 0000000..8a83410 --- /dev/null +++ b/meshai/notifications/channels.py @@ -0,0 +1,308 @@ +"""Notification channel implementations.""" + +import asyncio +import logging +import smtplib +import ssl +import time +from abc import ABC, abstractmethod +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from typing import Optional, TYPE_CHECKING + +import httpx + +if TYPE_CHECKING: + from ..connector import MeshConnector + +logger = logging.getLogger(__name__) + + +class NotificationChannel(ABC): + """Base class for notification delivery channels.""" + + channel_type: str = "base" + + @abstractmethod + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert. Returns True on success.""" + raise NotImplementedError + + @abstractmethod + async def test(self) -> tuple[bool, str]: + """Send test message. Returns (success, message).""" + raise NotImplementedError + + +class MeshBroadcastChannel(NotificationChannel): + """Post alert to mesh channel.""" + + channel_type = "mesh_broadcast" + + def __init__(self, connector: "MeshConnector", channel_index: int = 0): + self._connector = connector + self._channel = channel_index + + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert to mesh channel.""" + if not self._connector: + logger.warning("No mesh connector available") + return False + + try: + message = alert.get("message", "") + self._connector.send_message( + text=message, + destination=None, + channel=self._channel, + ) + logger.info("Broadcast alert to channel %d", self._channel) + return True + except Exception as e: + logger.error("Failed to broadcast alert: %s", e) + return False + + async def test(self) -> tuple[bool, str]: + """Send test broadcast.""" + try: + self._connector.send_message( + text="[TEST] MeshAI notification system test", + destination=None, + channel=self._channel, + ) + return True, "Test message sent to channel %d" % self._channel + except Exception as e: + return False, "Failed to send test: %s" % e + + +class MeshDMChannel(NotificationChannel): + """DM alert to specific node IDs.""" + + channel_type = "mesh_dm" + + def __init__(self, connector: "MeshConnector", node_ids: list[str]): + self._connector = connector + self._node_ids = node_ids + + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert via DM to configured nodes.""" + if not self._connector: + return False + + message = alert.get("message", "") + success = True + + for node_id in self._node_ids: + try: + dest = int(node_id) if node_id.isdigit() else node_id + self._connector.send_message(text=message, destination=dest, channel=0) + except Exception as e: + logger.error("Failed to DM %s: %s", node_id, e) + success = False + + return success + + async def test(self) -> tuple[bool, str]: + """Send test DM to all configured nodes.""" + if not self._node_ids: + return False, "No node IDs configured" + try: + for node_id in self._node_ids: + dest = int(node_id) if node_id.isdigit() else node_id + self._connector.send_message( + text="[TEST] MeshAI notification test", + destination=dest, + channel=0, + ) + return True, "Test DMs sent to %d nodes" % len(self._node_ids) + except Exception as e: + return False, "Failed to send test DMs: %s" % e + + +class EmailChannel(NotificationChannel): + """Send alert via SMTP email.""" + + channel_type = "email" + + def __init__( + self, + smtp_host: str, + smtp_port: int, + smtp_user: str, + smtp_password: str, + smtp_tls: bool, + from_address: str, + recipients: list[str], + ): + self._host = smtp_host + self._port = smtp_port + self._user = smtp_user + self._password = smtp_password + self._tls = smtp_tls + self._from = from_address + self._recipients = recipients + + async def deliver(self, alert: dict, rule: dict) -> bool: + """Send alert via email.""" + if not self._recipients: + return False + + alert_type = alert.get("type", "alert") + severity = alert.get("severity", "info").upper() + message = alert.get("message", "") + subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title()) + body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % ( + alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message + ) + + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._send_email, subject, body) + return True + except Exception as e: + logger.error("Failed to send email: %s", e) + return False + + def _send_email(self, subject: str, body: str): + msg = MIMEMultipart() + msg["From"] = self._from + msg["To"] = ", ".join(self._recipients) + msg["Subject"] = subject + msg.attach(MIMEText(body, "plain")) + + if self._tls: + context = ssl.create_default_context() + with smtplib.SMTP(self._host, self._port) as server: + server.starttls(context=context) + if self._user and self._password: + server.login(self._user, self._password) + server.sendmail(self._from, self._recipients, msg.as_string()) + else: + with smtplib.SMTP(self._host, self._port) as server: + if self._user and self._password: + server.login(self._user, self._password) + server.sendmail(self._from, self._recipients, msg.as_string()) + + async def test(self) -> tuple[bool, str]: + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, + self._send_email, + "[MeshAI TEST] Notification Test", + "Test message from MeshAI.", + ) + return True, "Test email sent to %d recipients" % len(self._recipients) + except Exception as e: + return False, "Failed to send test email: %s" % e + + +class WebhookChannel(NotificationChannel): + """POST alert JSON to a URL.""" + + channel_type = "webhook" + + def __init__(self, url: str, headers: Optional[dict] = None): + self._url = url + self._headers = headers or {} + + async def deliver(self, alert: dict, rule: dict) -> bool: + """POST alert to webhook URL.""" + payload = { + "type": alert.get("type"), + "severity": alert.get("severity", "info"), + "message": alert.get("message", ""), + "timestamp": time.time(), + "node_name": alert.get("node_name"), + "region": alert.get("region"), + } + + # Discord/Slack format + if "discord.com" in self._url or "slack.com" in self._url: + severity = alert.get("severity", "info") + color = { + "emergency": 0xFF0000, + "critical": 0xFF4444, + "warning": 0xFFAA00, + "info": 0x0099FF, + }.get(severity, 0x888888) + payload = { + "embeds": [{ + "title": "MeshAI: %s" % alert.get("type", "unknown"), + "description": alert.get("message", ""), + "color": color, + }] + } + + # ntfy format + elif "ntfy" in self._url: + headers = { + **self._headers, + "Title": "MeshAI: %s" % alert.get("type", "alert"), + "Priority": "3", + } + try: + async with httpx.AsyncClient() as client: + resp = await client.post( + self._url, + content=alert.get("message", ""), + headers=headers, + timeout=10, + ) + return resp.status_code < 400 + except Exception as e: + logger.error("Webhook failed: %s", e) + return False + + try: + async with httpx.AsyncClient() as client: + resp = await client.post( + self._url, + json=payload, + headers={"Content-Type": "application/json", **self._headers}, + timeout=10, + ) + return resp.status_code < 400 + except Exception as e: + logger.error("Webhook failed: %s", e) + return False + + async def test(self) -> tuple[bool, str]: + test_alert = {"type": "test", "severity": "info", "message": "MeshAI test message"} + success = await self.deliver(test_alert, {}) + if success: + return True, "Test sent to %s" % self._url + return False, "Webhook failed" + + +def create_channel(config: dict, connector=None) -> NotificationChannel: + """Create a channel instance from config.""" + channel_type = config.get("type", "") + + if channel_type == "mesh_broadcast": + return MeshBroadcastChannel( + connector=connector, + channel_index=config.get("channel_index", 0), + ) + elif channel_type == "mesh_dm": + return MeshDMChannel( + connector=connector, + node_ids=config.get("node_ids", []), + ) + elif channel_type == "email": + return EmailChannel( + smtp_host=config.get("smtp_host", ""), + smtp_port=config.get("smtp_port", 587), + smtp_user=config.get("smtp_user", ""), + smtp_password=config.get("smtp_password", ""), + smtp_tls=config.get("smtp_tls", True), + from_address=config.get("from_address", ""), + recipients=config.get("recipients", []), + ) + elif channel_type == "webhook": + return WebhookChannel( + url=config.get("url", ""), + headers=config.get("headers", {}), + ) + else: + raise ValueError("Unknown channel type: %s" % channel_type) diff --git a/meshai/notifications/router.py b/meshai/notifications/router.py new file mode 100644 index 0000000..183950a --- /dev/null +++ b/meshai/notifications/router.py @@ -0,0 +1,266 @@ +"""Notification router - matches alerts to rules and delivers via channels.""" + +import logging +import time +from datetime import datetime +from typing import Optional, TYPE_CHECKING + +from .channels import create_channel, NotificationChannel +from .summarizer import MessageSummarizer + +if TYPE_CHECKING: + from ..connector import MeshConnector + +logger = logging.getLogger(__name__) + +# Severity levels in order +SEVERITY_ORDER = ["info", "advisory", "watch", "warning", "critical", "emergency"] + + +class NotificationRouter: + """Routes alerts through matching rules to notification channels.""" + + def __init__( + self, + config, + connector: Optional["MeshConnector"] = None, + llm_backend=None, + timezone: str = "America/Boise", + ): + self._channels: dict[str, NotificationChannel] = {} + self._rules: list[dict] = [] + self._quiet_start = getattr(config, "quiet_hours_start", "22:00") + self._quiet_end = getattr(config, "quiet_hours_end", "06:00") + self._timezone = timezone + self._dedup_window = getattr(config, "dedup_seconds", 600) + self._recent: dict[tuple, float] = {} # (category, event_key) -> last_sent_time + self._summarizer = MessageSummarizer(llm_backend) if llm_backend else None + self._connector = connector + + # Create channel instances from config + channels_config = getattr(config, "channels", []) + for ch_config in channels_config: + if hasattr(ch_config, "__dict__"): + ch_dict = {k: v for k, v in ch_config.__dict__.items() if not k.startswith("_")} + else: + ch_dict = ch_config + + if not ch_dict.get("enabled", True): + continue + + channel_id = ch_dict.get("id", "") + if not channel_id: + continue + + try: + channel = create_channel(ch_dict, connector) + self._channels[channel_id] = channel + logger.debug("Created notification channel: %s (%s)", channel_id, ch_dict.get("type")) + except Exception as e: + logger.warning("Failed to create channel %s: %s", channel_id, e) + + # Load rules + rules_config = getattr(config, "rules", []) + for rule in rules_config: + if hasattr(rule, "__dict__"): + rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")} + else: + rule_dict = rule + self._rules.append(rule_dict) + + logger.info( + "Notification router initialized: %d channels, %d rules", + len(self._channels), + len(self._rules), + ) + + async def process_alert(self, alert: dict) -> bool: + """Route an alert through matching rules to channels. + + Returns True if alert was delivered to at least one channel. + """ + category = alert.get("type", "") + severity = alert.get("severity", "info") + delivered = False + + for rule in self._rules: + # Check category match + rule_categories = rule.get("categories", []) + if rule_categories and category not in rule_categories: + continue + + # Check severity threshold + min_severity = rule.get("min_severity", "info") + if not self._severity_meets(severity, min_severity): + continue + + # Check quiet hours (emergencies and criticals override) + if self._in_quiet_hours() and severity not in ("emergency", "critical"): + if not rule.get("override_quiet", False): + continue + + # Check dedup + event_id = alert.get("event_id", alert.get("message", "")[:50]) + dedup_key = (category, event_id) + now = time.time() + if dedup_key in self._recent: + if now - self._recent[dedup_key] < self._dedup_window: + logger.debug("Skipping duplicate alert: %s", category) + continue + self._recent[dedup_key] = now + + # Deliver to each channel in the rule + channel_ids = rule.get("channel_ids", []) + for channel_id in channel_ids: + channel = self._channels.get(channel_id) + if not channel: + continue + + try: + # Summarize for mesh channels if over 200 chars + delivery_alert = alert + message = alert.get("message", "") + if channel.channel_type in ("mesh_broadcast", "mesh_dm"): + if len(message) > 200: + if self._summarizer: + summary = await self._summarizer.summarize(message, max_chars=195) + delivery_alert = {**alert, "message": summary} + else: + delivery_alert = {**alert, "message": message[:195] + "..."} + + success = await channel.deliver(delivery_alert, rule) + if success: + delivered = True + logger.info( + "Alert delivered via %s: %s", + channel_id, + category, + ) + except Exception as e: + logger.warning("Channel %s delivery failed: %s", channel_id, e) + + return delivered + + def _severity_meets(self, actual: str, required: str) -> bool: + """Check if actual severity meets or exceeds required severity.""" + try: + actual_idx = SEVERITY_ORDER.index(actual.lower()) + required_idx = SEVERITY_ORDER.index(required.lower()) + return actual_idx >= required_idx + except ValueError: + return True # Unknown severity, allow through + + def _in_quiet_hours(self) -> bool: + """Check if current time is within quiet hours.""" + try: + from zoneinfo import ZoneInfo + tz = ZoneInfo(self._timezone) + now = datetime.now(tz) + current_time = now.strftime("%H:%M") + + start = self._quiet_start + end = self._quiet_end + + if start <= end: + # Simple range (e.g., 01:00 to 06:00) + return start <= current_time <= end + else: + # Crosses midnight (e.g., 22:00 to 06:00) + return current_time >= start or current_time <= end + except Exception: + return False + + def get_channels(self) -> list[dict]: + """Get list of configured channels.""" + return [ + {"id": ch_id, "type": ch.channel_type} + for ch_id, ch in self._channels.items() + ] + + def get_rules(self) -> list[dict]: + """Get list of configured rules.""" + return self._rules + + async def test_channel(self, channel_id: str) -> tuple[bool, str]: + """Send a test alert to a specific channel.""" + channel = self._channels.get(channel_id) + if not channel: + return False, "Channel not found: %s" % channel_id + return await channel.test() + + def add_mesh_subscription( + self, + node_id: str, + categories: list[str], + rule_name: Optional[str] = None, + ) -> str: + """Add a mesh DM subscription for a node. + + Creates a channel and rule for the node to receive alerts. + Returns the rule name. + """ + # Create channel ID + channel_id = "mesh_dm_%s" % node_id + + # Create channel if it doesn't exist + if channel_id not in self._channels: + from .channels import MeshDMChannel + channel = MeshDMChannel( + connector=self._connector, + node_ids=[node_id], + ) + self._channels[channel_id] = channel + + # Create rule + if not rule_name: + rule_name = "sub_%s" % node_id + + # Check if rule already exists + for rule in self._rules: + if rule.get("name") == rule_name: + # Update existing rule + rule["categories"] = categories if categories else [] + rule["channel_ids"] = [channel_id] + return rule_name + + # Add new rule + self._rules.append({ + "name": rule_name, + "categories": categories if categories else [], # Empty = all + "min_severity": "warning", + "channel_ids": [channel_id], + "override_quiet": False, + }) + + return rule_name + + def remove_mesh_subscription(self, node_id: str) -> bool: + """Remove a mesh subscription for a node.""" + channel_id = "mesh_dm_%s" % node_id + rule_name = "sub_%s" % node_id + + # Remove channel + if channel_id in self._channels: + del self._channels[channel_id] + + # Remove rule + self._rules = [r for r in self._rules if r.get("name") != rule_name] + + return True + + def get_node_subscriptions(self, node_id: str) -> list[str]: + """Get categories a node is subscribed to.""" + rule_name = "sub_%s" % node_id + for rule in self._rules: + if rule.get("name") == rule_name: + categories = rule.get("categories", []) + return categories if categories else ["all"] + return [] + + def cleanup_recent(self, max_age: int = 3600): + """Clean up old entries from recent alerts cache.""" + now = time.time() + self._recent = { + k: v for k, v in self._recent.items() + if now - v < max_age + } diff --git a/meshai/notifications/summarizer.py b/meshai/notifications/summarizer.py new file mode 100644 index 0000000..0364cff --- /dev/null +++ b/meshai/notifications/summarizer.py @@ -0,0 +1,64 @@ +"""Message summarizer for mesh delivery.""" + +import logging +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from ..backends import LLMBackend + +logger = logging.getLogger(__name__) + + +class MessageSummarizer: + """Summarizes long messages for mesh delivery. + + Only used when: + - Delivering to mesh channels (broadcast or DM) + - Message exceeds max_chars (default 200) + - LLM backend is available + + Email and webhook channels receive full messages. + """ + + def __init__(self, llm_backend: Optional["LLMBackend"] = None): + self._llm = llm_backend + + async def summarize(self, message: str, max_chars: int = 195) -> str: + """Summarize a message to fit within max_chars. + + Args: + message: Original message text + max_chars: Maximum characters for summary + + Returns: + Summarized message, or truncated original if LLM unavailable + """ + if len(message) <= max_chars: + return message + + if not self._llm: + return message[:max_chars - 3] + "..." + + prompt = ( + "Summarize this alert in under %d characters. " + "Keep severity, location, and key facts. No preamble, just the summary:\n\n%s" + % (max_chars, message) + ) + + try: + # Use the LLM to generate a summary + response = await self._llm.generate( + prompt, + system_prompt="You are a concise alert summarizer. Output only the summary, no explanation.", + max_tokens=100, + ) + summary = response.strip() + + # Ensure it fits + if len(summary) <= max_chars: + return summary + return summary[:max_chars - 3] + "..." + + except Exception as e: + logger.debug("LLM summarization failed: %s", e) + return message[:max_chars - 3] + "..."