fix(notifications): test button sends real data preview, not generic string

- Tests check current conditions against rule categories/severity
- Shows actual alert messages that would fire right now
- Falls back to example messages from category registry if no matches
- Preview mode shows without sending, Send Test delivers with [TEST] prefix
- Mesh delivery applies real summarization so preview matches actual output
- Added test dialog UI showing conditions matched and preview messages

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-13 23:32:22 +00:00
commit 0ad37e55d9
8 changed files with 1198 additions and 816 deletions

View file

@ -1098,7 +1098,17 @@ export default function Notifications() {
const [saving, setSaving] = useState(false)
const [error, setError] = useState<string | null>(null)
const [success, setSuccess] = useState<string | null>(null)
const [testResult, setTestResult] = useState<{ success: boolean; message: string } | null>(null)
const [testResult, setTestResult] = useState<{
success?: boolean;
message?: string;
conditions_matched?: number;
preview_messages?: string[];
is_example?: boolean;
delivered?: boolean;
delivery_method?: string;
delivery_result?: string;
} | null>(null)
const [testDialog, setTestDialog] = useState<{ open: boolean; ruleIndex: number; loading: boolean }>({ open: false, ruleIndex: -1, loading: false })
const [showTemplates, setShowTemplates] = useState(false)
const [hasChanges, setHasChanges] = useState(false)
@ -1223,17 +1233,46 @@ export default function Notifications() {
}
const testRule = async (index: number) => {
// Open dialog and show preview first
setTestDialog({ open: true, ruleIndex: index, loading: true })
try {
const res = await fetch(`/api/notifications/rules/${index}/test`, { method: 'POST' })
const res = await fetch(`/api/notifications/rules/${index}/test`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ send: false }) // Preview only
})
const result = await res.json()
setTestResult(result)
setTimeout(() => setTestResult(null), 5000)
setTestDialog(d => ({ ...d, loading: false }))
} catch {
setTestResult({ success: false, message: 'Test failed' })
setTimeout(() => setTestResult(null), 5000)
setTestResult({ success: false, message: 'Failed to get preview' })
setTestDialog(d => ({ ...d, loading: false }))
}
}
const sendTest = async () => {
const index = testDialog.ruleIndex
setTestDialog(d => ({ ...d, loading: true }))
try {
const res = await fetch(`/api/notifications/rules/${index}/test`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ send: true }) // Actually send
})
const result = await res.json()
setTestResult(result)
setTestDialog(d => ({ ...d, loading: false }))
} catch {
setTestResult({ success: false, message: 'Failed to send test' })
setTestDialog(d => ({ ...d, loading: false }))
}
}
const closeTestDialog = () => {
setTestDialog({ open: false, ruleIndex: -1, loading: false })
setTestResult(null)
}
if (loading) {
return (
<div className="flex items-center justify-center h-64">
@ -1252,6 +1291,85 @@ export default function Notifications() {
return (
<div className="max-w-4xl mx-auto space-y-6">
{/* Test Dialog */}
{testDialog.open && (
<div className="fixed inset-0 z-50 flex items-center justify-center bg-black/50">
<div className="bg-[#1a2332] border border-[#2a3a4a] rounded-lg shadow-xl max-w-lg w-full mx-4 max-h-[80vh] overflow-auto">
<div className="p-4 border-b border-[#2a3a4a] flex items-center justify-between">
<h3 className="text-lg font-semibold">Test Notification Rule</h3>
<button onClick={closeTestDialog} className="text-slate-500 hover:text-slate-300">
<X size={20} />
</button>
</div>
<div className="p-4 space-y-4">
{testDialog.loading ? (
<div className="flex items-center justify-center py-8">
<div className="text-slate-400">Checking conditions...</div>
</div>
) : testResult ? (
<>
{/* Conditions summary */}
<div className="flex items-center gap-2">
{testResult.conditions_matched && testResult.conditions_matched > 0 ? (
<span className="px-2 py-1 bg-green-500/20 text-green-400 rounded text-sm">
{testResult.conditions_matched} condition{testResult.conditions_matched !== 1 ? 's' : ''} match
</span>
) : (
<span className="px-2 py-1 bg-yellow-500/20 text-yellow-400 rounded text-sm">
No current matches showing examples
</span>
)}
</div>
{/* Preview messages */}
<div className="space-y-2">
<div className="text-sm text-slate-500">
{testResult.is_example ? 'Example messages:' : 'Current alerts that would fire:'}
</div>
{testResult.preview_messages?.map((msg, i) => (
<div key={i} className="p-3 bg-slate-800 rounded text-sm font-mono break-words">
{msg}
</div>
))}
</div>
{/* Delivery result */}
{testResult.delivered && (
<div className="p-3 bg-green-500/10 border border-green-500/30 rounded text-green-400 text-sm">
{testResult.delivery_result}
</div>
)}
{/* Legacy format support */}
{testResult.message && !testResult.preview_messages && (
<div className={`p-3 rounded text-sm ${testResult.success ? 'bg-green-500/10 text-green-400' : 'bg-red-500/10 text-red-400'}`}>
{testResult.message}
</div>
)}
</>
) : null}
</div>
<div className="p-4 border-t border-[#2a3a4a] flex justify-end gap-2">
<button
onClick={closeTestDialog}
className="px-4 py-2 text-slate-400 hover:text-slate-200"
>
Close
</button>
{testResult && !testResult.delivered && testResult.delivery_method && (
<button
onClick={sendTest}
disabled={testDialog.loading}
className="px-4 py-2 bg-accent hover:bg-accent/80 rounded disabled:opacity-50"
>
{testDialog.loading ? 'Sending...' : 'Send Test'}
</button>
)}
</div>
</div>
</div>
)}
{/* Header */}
<div className="flex items-center justify-between">
<div>

View file

@ -1,35 +1,84 @@
"""Notification API routes."""
from fastapi import APIRouter, Request, HTTPException
router = APIRouter(prefix="/notifications", tags=["notifications"])
@router.get("/categories")
async def get_categories():
"""Get all alert categories with descriptions."""
try:
from ...notifications.categories import list_categories
return list_categories()
except ImportError:
return []
@router.get("/rules")
async def get_rules(request: Request):
"""Get configured notification rules."""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
return []
return notification_router.get_rules()
@router.post("/rules/{rule_index}/test")
async def test_rule(request: Request, rule_index: int):
"""Send a test alert through a specific rule."""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
raise HTTPException(status_code=404, detail="Notification router not configured")
success, message = await notification_router.test_rule(rule_index)
return {"success": success, "message": message}
"""Notification API routes."""
from fastapi import APIRouter, Request, HTTPException
from pydantic import BaseModel
from typing import Optional
router = APIRouter(prefix="/notifications", tags=["notifications"])
class TestRequest(BaseModel):
"""Request body for test endpoint."""
send: bool = False # True = actually deliver, False = preview only
@router.get("/categories")
async def get_categories():
"""Get all alert categories with descriptions."""
try:
from ...notifications.categories import list_categories
return list_categories()
except ImportError:
return []
@router.get("/rules")
async def get_rules(request: Request):
"""Get configured notification rules."""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
return []
return notification_router.get_rules()
@router.post("/rules/{rule_index}/test")
async def test_rule(request: Request, rule_index: int, body: Optional[TestRequest] = None):
"""Test a notification rule against current conditions.
Returns:
{
"conditions_matched": int, # Number of matching alerts
"preview_messages": list[str], # Messages that would send
"is_example": bool, # True if using example messages
"delivered": bool, # True if actually sent
"delivery_method": str, # e.g. "mesh_broadcast"
"delivery_result": str, # Result message
}
"""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
raise HTTPException(status_code=404, detail="Notification router not configured")
alert_engine = getattr(request.app.state, "alert_engine", None)
env_store = getattr(request.app.state, "env_store", None)
send = body.send if body else False
result = await notification_router.test_rule_with_conditions(
rule_index,
alert_engine=alert_engine,
env_store=env_store,
send=send,
)
return result
@router.post("/rules/{rule_index}/preview")
async def preview_rule(request: Request, rule_index: int):
"""Preview what a rule would match right now (without sending)."""
notification_router = getattr(request.app.state, "notification_router", None)
if not notification_router:
raise HTTPException(status_code=404, detail="Notification router not configured")
alert_engine = getattr(request.app.state, "alert_engine", None)
env_store = getattr(request.app.state, "env_store", None)
result = await notification_router.test_rule_with_conditions(
rule_index,
alert_engine=alert_engine,
env_store=env_store,
send=False,
)
return result

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -8,8 +8,8 @@
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&display=swap" rel="stylesheet">
<script type="module" crossorigin src="/assets/index-CYCOCObI.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-DbmGQdf0.css">
<script type="module" crossorigin src="/assets/index-BXyt_EfK.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-CtFYHJy4.css">
</head>
<body>
<div id="root"></div>

View file

@ -1,308 +1,317 @@
"""Notification channel implementations."""
import asyncio
import logging
import smtplib
import ssl
import time
from abc import ABC, abstractmethod
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional, TYPE_CHECKING
import httpx
if TYPE_CHECKING:
from ..connector import MeshConnector
logger = logging.getLogger(__name__)
class NotificationChannel(ABC):
"""Base class for notification delivery channels."""
channel_type: str = "base"
@abstractmethod
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert. Returns True on success."""
raise NotImplementedError
@abstractmethod
async def test(self) -> tuple[bool, str]:
"""Send test message. Returns (success, message)."""
raise NotImplementedError
class MeshBroadcastChannel(NotificationChannel):
"""Post alert to mesh channel."""
channel_type = "mesh_broadcast"
def __init__(self, connector: "MeshConnector", channel_index: int = 0):
self._connector = connector
self._channel = channel_index
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert to mesh channel."""
if not self._connector:
logger.warning("No mesh connector available")
return False
try:
message = alert.get("message", "")
self._connector.send_message(
text=message,
destination=None,
channel=self._channel,
)
logger.info("Broadcast alert to channel %d", self._channel)
return True
except Exception as e:
logger.error("Failed to broadcast alert: %s", e)
return False
async def test(self) -> tuple[bool, str]:
"""Send test broadcast."""
try:
self._connector.send_message(
text="[TEST] MeshAI notification system test",
destination=None,
channel=self._channel,
)
return True, "Test message sent to channel %d" % self._channel
except Exception as e:
return False, "Failed to send test: %s" % e
class MeshDMChannel(NotificationChannel):
"""DM alert to specific node IDs."""
channel_type = "mesh_dm"
def __init__(self, connector: "MeshConnector", node_ids: list[str]):
self._connector = connector
self._node_ids = node_ids
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via DM to configured nodes."""
if not self._connector:
return False
message = alert.get("message", "")
success = True
for node_id in self._node_ids:
try:
dest = int(node_id) if node_id.isdigit() else node_id
self._connector.send_message(text=message, destination=dest, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
success = False
return success
async def test(self) -> tuple[bool, str]:
"""Send test DM to all configured nodes."""
if not self._node_ids:
return False, "No node IDs configured"
try:
for node_id in self._node_ids:
dest = int(node_id) if node_id.isdigit() else node_id
self._connector.send_message(
text="[TEST] MeshAI notification test",
destination=dest,
channel=0,
)
return True, "Test DMs sent to %d nodes" % len(self._node_ids)
except Exception as e:
return False, "Failed to send test DMs: %s" % e
class EmailChannel(NotificationChannel):
"""Send alert via SMTP email."""
channel_type = "email"
def __init__(
self,
smtp_host: str,
smtp_port: int,
smtp_user: str,
smtp_password: str,
smtp_tls: bool,
from_address: str,
recipients: list[str],
):
self._host = smtp_host
self._port = smtp_port
self._user = smtp_user
self._password = smtp_password
self._tls = smtp_tls
self._from = from_address
self._recipients = recipients
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via email."""
if not self._recipients:
return False
alert_type = alert.get("type", "alert")
severity = alert.get("severity", "info").upper()
message = alert.get("message", "")
subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title())
body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % (
alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message
)
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._send_email, subject, body)
return True
except Exception as e:
logger.error("Failed to send email: %s", e)
return False
def _send_email(self, subject: str, body: str):
msg = MIMEMultipart()
msg["From"] = self._from
msg["To"] = ", ".join(self._recipients)
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
if self._tls:
context = ssl.create_default_context()
with smtplib.SMTP(self._host, self._port) as server:
server.starttls(context=context)
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
else:
with smtplib.SMTP(self._host, self._port) as server:
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
async def test(self) -> tuple[bool, str]:
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._send_email,
"[MeshAI TEST] Notification Test",
"Test message from MeshAI.",
)
return True, "Test email sent to %d recipients" % len(self._recipients)
except Exception as e:
return False, "Failed to send test email: %s" % e
class WebhookChannel(NotificationChannel):
"""POST alert JSON to a URL."""
channel_type = "webhook"
def __init__(self, url: str, headers: Optional[dict] = None):
self._url = url
self._headers = headers or {}
async def deliver(self, alert: dict, rule: dict) -> bool:
"""POST alert to webhook URL."""
payload = {
"type": alert.get("type"),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"timestamp": time.time(),
"node_name": alert.get("node_name"),
"region": alert.get("region"),
}
# Discord/Slack format
if "discord.com" in self._url or "slack.com" in self._url:
severity = alert.get("severity", "info")
color = {
"emergency": 0xFF0000,
"critical": 0xFF4444,
"warning": 0xFFAA00,
"info": 0x0099FF,
}.get(severity, 0x888888)
payload = {
"embeds": [{
"title": "MeshAI: %s" % alert.get("type", "unknown"),
"description": alert.get("message", ""),
"color": color,
}]
}
# ntfy format
elif "ntfy" in self._url:
headers = {
**self._headers,
"Title": "MeshAI: %s" % alert.get("type", "alert"),
"Priority": "3",
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
content=alert.get("message", ""),
headers=headers,
timeout=10,
)
return resp.status_code < 400
except Exception as e:
logger.error("Webhook failed: %s", e)
return False
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
json=payload,
headers={"Content-Type": "application/json", **self._headers},
timeout=10,
)
return resp.status_code < 400
except Exception as e:
logger.error("Webhook failed: %s", e)
return False
async def test(self) -> tuple[bool, str]:
test_alert = {"type": "test", "severity": "info", "message": "MeshAI test message"}
success = await self.deliver(test_alert, {})
if success:
return True, "Test sent to %s" % self._url
return False, "Webhook failed"
def create_channel(config: dict, connector=None) -> NotificationChannel:
"""Create a channel instance from config."""
channel_type = config.get("type", "")
if channel_type == "mesh_broadcast":
return MeshBroadcastChannel(
connector=connector,
channel_index=config.get("channel_index", 0),
)
elif channel_type == "mesh_dm":
return MeshDMChannel(
connector=connector,
node_ids=config.get("node_ids", []),
)
elif channel_type == "email":
return EmailChannel(
smtp_host=config.get("smtp_host", ""),
smtp_port=config.get("smtp_port", 587),
smtp_user=config.get("smtp_user", ""),
smtp_password=config.get("smtp_password", ""),
smtp_tls=config.get("smtp_tls", True),
from_address=config.get("from_address", ""),
recipients=config.get("recipients", []),
)
elif channel_type == "webhook":
return WebhookChannel(
url=config.get("url", ""),
headers=config.get("headers", {}),
)
else:
raise ValueError("Unknown channel type: %s" % channel_type)
"""Notification channel implementations."""
import asyncio
import logging
import smtplib
import ssl
import time
from abc import ABC, abstractmethod
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional, TYPE_CHECKING
import httpx
if TYPE_CHECKING:
from ..connector import MeshConnector
logger = logging.getLogger(__name__)
class NotificationChannel(ABC):
"""Base class for notification delivery channels."""
channel_type: str = "base"
@abstractmethod
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert. Returns True on success."""
raise NotImplementedError
@abstractmethod
async def test(self) -> tuple[bool, str]:
"""Send test message. Returns (success, message)."""
raise NotImplementedError
class MeshBroadcastChannel(NotificationChannel):
"""Post alert to mesh channel."""
channel_type = "mesh_broadcast"
def __init__(self, connector: "MeshConnector", channel_index: int = 0):
self._connector = connector
self._channel = channel_index
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert to mesh channel."""
if not self._connector:
logger.warning("No mesh connector available")
return False
try:
message = alert.get("message", "")
self._connector.send_message(
text=message,
destination=None,
channel=self._channel,
)
logger.info("Broadcast alert to channel %d", self._channel)
return True
except Exception as e:
logger.error("Failed to broadcast alert: %s", e)
return False
async def test(self) -> tuple[bool, str]:
"""Send test broadcast."""
try:
self._connector.send_message(
text="[TEST] MeshAI notification system test",
destination=None,
channel=self._channel,
)
return True, "Test message sent to channel %d" % self._channel
except Exception as e:
return False, "Failed to send test: %s" % e
class MeshDMChannel(NotificationChannel):
"""DM alert to specific node IDs."""
channel_type = "mesh_dm"
def __init__(self, connector: "MeshConnector", node_ids: list[str]):
self._connector = connector
self._node_ids = node_ids
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via DM to configured nodes."""
if not self._connector:
return False
message = alert.get("message", "")
success = True
for node_id in self._node_ids:
try:
dest = int(node_id) if node_id.isdigit() else node_id
self._connector.send_message(text=message, destination=dest, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
success = False
return success
async def test(self) -> tuple[bool, str]:
"""Send test DM to all configured nodes."""
if not self._node_ids:
return False, "No node IDs configured"
try:
for node_id in self._node_ids:
dest = int(node_id) if node_id.isdigit() else node_id
self._connector.send_message(
text="[TEST] MeshAI notification test",
destination=dest,
channel=0,
)
return True, "Test DMs sent to %d nodes" % len(self._node_ids)
except Exception as e:
return False, "Failed to send test DMs: %s" % e
class EmailChannel(NotificationChannel):
"""Send alert via SMTP email."""
channel_type = "email"
def __init__(
self,
smtp_host: str,
smtp_port: int,
smtp_user: str,
smtp_password: str,
smtp_tls: bool,
from_address: str,
recipients: list[str],
):
self._host = smtp_host
self._port = smtp_port
self._user = smtp_user
self._password = smtp_password
self._tls = smtp_tls
self._from = from_address
self._recipients = recipients
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via email."""
if not self._recipients:
return False
alert_type = alert.get("type", "alert")
severity = alert.get("severity", "info").upper()
message = alert.get("message", "")
subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title())
body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % (
alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message
)
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._send_email, subject, body)
return True
except Exception as e:
logger.error("Failed to send email: %s", e)
return False
def _send_email(self, subject: str, body: str):
msg = MIMEMultipart()
msg["From"] = self._from
msg["To"] = ", ".join(self._recipients)
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
if self._tls:
context = ssl.create_default_context()
with smtplib.SMTP(self._host, self._port) as server:
server.starttls(context=context)
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
else:
with smtplib.SMTP(self._host, self._port) as server:
if self._user and self._password:
server.login(self._user, self._password)
server.sendmail(self._from, self._recipients, msg.as_string())
async def test(self) -> tuple[bool, str]:
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._send_email,
"[MeshAI TEST] Notification Test",
"Test message from MeshAI.",
)
return True, "Test email sent to %d recipients" % len(self._recipients)
except Exception as e:
return False, "Failed to send test email: %s" % e
class WebhookChannel(NotificationChannel):
"""POST alert JSON to a URL."""
channel_type = "webhook"
def __init__(self, url: str, headers: Optional[dict] = None):
self._url = url
self._headers = headers or {}
async def deliver(self, alert: dict, rule: dict) -> bool:
"""POST alert to webhook URL."""
payload = {
"type": alert.get("type"),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"timestamp": time.time(),
"node_name": alert.get("node_name"),
"region": alert.get("region"),
}
# Discord/Slack format
if "discord.com" in self._url or "slack.com" in self._url:
severity = alert.get("severity", "info")
color = {
"emergency": 0xFF0000,
"critical": 0xFF4444,
"warning": 0xFFAA00,
"info": 0x0099FF,
}.get(severity, 0x888888)
payload = {
"embeds": [{
"title": "MeshAI: %s" % alert.get("type", "unknown"),
"description": alert.get("message", ""),
"color": color,
}]
}
# ntfy format
elif "ntfy" in self._url:
headers = {
**self._headers,
"Title": "MeshAI: %s" % alert.get("type", "alert"),
"Priority": "3",
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
content=alert.get("message", ""),
headers=headers,
timeout=10,
)
return resp.status_code < 400
except Exception as e:
logger.error("Webhook failed: %s", e)
return False
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
json=payload,
headers={"Content-Type": "application/json", **self._headers},
timeout=10,
)
return resp.status_code < 400
except Exception as e:
logger.error("Webhook failed: %s", e)
return False
async def test(self) -> tuple[bool, str]:
test_alert = {"type": "test", "severity": "info", "message": "MeshAI test message"}
success = await self.deliver(test_alert, {})
if success:
return True, "Test sent to %s" % self._url
async def deliver_test(self, message: str) -> bool:
"""Deliver a specific test message via webhook."""
try:
test_alert = {"type": "test", "severity": "info", "message": message}
return await self.deliver(test_alert, {})
except Exception as e:
logger.warning("Webhook test failed: %s", e)
return False
return False, "Webhook failed"
def create_channel(config: dict, connector=None) -> NotificationChannel:
"""Create a channel instance from config."""
channel_type = config.get("type", "")
if channel_type == "mesh_broadcast":
return MeshBroadcastChannel(
connector=connector,
channel_index=config.get("channel_index", 0),
)
elif channel_type == "mesh_dm":
return MeshDMChannel(
connector=connector,
node_ids=config.get("node_ids", []),
)
elif channel_type == "email":
return EmailChannel(
smtp_host=config.get("smtp_host", ""),
smtp_port=config.get("smtp_port", 587),
smtp_user=config.get("smtp_user", ""),
smtp_password=config.get("smtp_password", ""),
smtp_tls=config.get("smtp_tls", True),
from_address=config.get("from_address", ""),
recipients=config.get("recipients", []),
)
elif channel_type == "webhook":
return WebhookChannel(
url=config.get("url", ""),
headers=config.get("headers", {}),
)
else:
raise ValueError("Unknown channel type: %s" % channel_type)

View file

@ -1,354 +1,560 @@
"""Notification router - matches alerts to rules and delivers via channels."""
import asyncio
import logging
import time
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from .channels import create_channel, NotificationChannel
from .summarizer import MessageSummarizer
if TYPE_CHECKING:
from ..connector import MeshConnector
logger = logging.getLogger(__name__)
# Severity levels in order
SEVERITY_ORDER = ["info", "advisory", "watch", "warning", "critical", "emergency"]
class NotificationRouter:
"""Routes alerts through matching rules to notification channels."""
def __init__(
self,
config,
connector: Optional["MeshConnector"] = None,
llm_backend=None,
timezone: str = "America/Boise",
):
self._rules: list[dict] = []
self._quiet_enabled = getattr(config, "quiet_hours_enabled", True)
self._quiet_start = getattr(config, "quiet_hours_start", "22:00")
self._quiet_end = getattr(config, "quiet_hours_end", "06:00")
self._timezone = timezone
self._recent: dict[tuple, float] = {} # (rule_name, category, event_key) -> last_sent_time
self._summarizer = MessageSummarizer(llm_backend) if llm_backend else None
self._connector = connector
self._config = config
# Load rules from config
rules_config = getattr(config, "rules", [])
for rule in rules_config:
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule) if isinstance(rule, dict) else {}
# Skip disabled rules
if not rule_dict.get("enabled", True):
continue
# Only load condition-triggered rules (scheduled rules handled by scheduler)
if rule_dict.get("trigger_type", "condition") == "condition":
self._rules.append(rule_dict)
logger.info("Notification router initialized: %d condition rules", len(self._rules))
def _create_channel_for_rule(self, rule: dict) -> Optional[NotificationChannel]:
"""Create a channel instance from a rule's inline delivery config.
Returns None if delivery_type is empty or invalid.
"""
delivery_type = rule.get("delivery_type", "")
# Empty delivery type is valid - rule exists but doesn't deliver
if not delivery_type:
return None
if delivery_type == "mesh_broadcast":
config = {
"type": "mesh_broadcast",
"channel_index": rule.get("broadcast_channel", 0),
}
elif delivery_type == "mesh_dm":
config = {
"type": "mesh_dm",
"node_ids": rule.get("node_ids", []),
}
elif delivery_type == "email":
config = {
"type": "email",
"smtp_host": rule.get("smtp_host", ""),
"smtp_port": rule.get("smtp_port", 587),
"smtp_user": rule.get("smtp_user", ""),
"smtp_password": rule.get("smtp_password", ""),
"smtp_tls": rule.get("smtp_tls", True),
"from_address": rule.get("from_address", ""),
"recipients": rule.get("recipients", []),
}
elif delivery_type == "webhook":
config = {
"type": "webhook",
"url": rule.get("webhook_url", ""),
"headers": rule.get("webhook_headers", {}),
}
else:
logger.warning("Unknown delivery type '%s' in rule '%s'", delivery_type, rule.get("name"))
return None
try:
return create_channel(config, self._connector)
except Exception as e:
logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e)
return None
async def process_alert(self, alert: dict) -> bool:
"""Route an alert through matching rules.
Returns True if alert was delivered to at least one channel.
"""
category = alert.get("type", "")
severity = alert.get("severity", "info")
delivered = False
for rule in self._rules:
rule_name = rule.get("name", "unnamed")
# Check category match
rule_categories = rule.get("categories", [])
if rule_categories and category not in rule_categories:
continue
# Check severity threshold
min_severity = rule.get("min_severity", "info")
if not self._severity_meets(severity, min_severity):
continue
# Check quiet hours (only if quiet hours are enabled globally)
if self._quiet_enabled and self._in_quiet_hours():
# Emergencies and criticals always go through
if severity not in ("emergency", "critical"):
# Check if rule overrides quiet hours
if not rule.get("override_quiet", False):
logger.debug("Skipping alert (quiet hours): %s via %s", category, rule_name)
continue
# Check cooldown
cooldown = rule.get("cooldown_minutes", 10) * 60
event_id = alert.get("event_id", alert.get("message", "")[:50])
dedup_key = (rule_name, category, event_id)
now = time.time()
if dedup_key in self._recent:
if now - self._recent[dedup_key] < cooldown:
logger.debug("Skipping alert (cooldown): %s via %s", category, rule_name)
continue
self._recent[dedup_key] = now
# Log rule match
logger.info("Rule '%s' matched alert: %s (%s)", rule_name, category, severity)
# Check if rule has delivery configured
delivery_type = rule.get("delivery_type", "")
if not delivery_type:
logger.info("Rule '%s' matched but has no delivery configured", rule_name)
continue
# Create channel and deliver
channel = self._create_channel_for_rule(rule)
if not channel:
logger.warning("Rule '%s' failed to create delivery channel", rule_name)
continue
try:
# Summarize for mesh channels if over 200 chars
delivery_alert = alert
message = alert.get("message", "")
if channel.channel_type in ("mesh_broadcast", "mesh_dm"):
if len(message) > 200:
if self._summarizer:
summary = await self._summarizer.summarize(message, max_chars=195)
delivery_alert = {**alert, "message": summary}
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
logger.info("Alert delivered via rule '%s': %s", rule_name, category)
except Exception as e:
logger.warning("Rule '%s' delivery failed: %s", rule_name, e)
return delivered
def _severity_meets(self, actual: str, required: str) -> bool:
"""Check if actual severity meets or exceeds required severity."""
try:
actual_idx = SEVERITY_ORDER.index(actual.lower())
required_idx = SEVERITY_ORDER.index(required.lower())
return actual_idx >= required_idx
except ValueError:
return True # Unknown severity, allow through
def _in_quiet_hours(self) -> bool:
"""Check if current time is within quiet hours."""
if not self._quiet_enabled:
return False
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(self._timezone)
now = datetime.now(tz)
current_time = now.strftime("%H:%M")
start = self._quiet_start
end = self._quiet_end
if start <= end:
# Simple range (e.g., 01:00 to 06:00)
return start <= current_time <= end
else:
# Crosses midnight (e.g., 22:00 to 06:00)
return current_time >= start or current_time <= end
except Exception:
return False
def get_rules(self) -> list[dict]:
"""Get list of configured rules."""
return self._rules
async def test_rule(self, rule_index: int) -> tuple[bool, str]:
"""Send a test alert through a specific rule."""
rules_config = getattr(self._config, "rules", [])
if rule_index < 0 or rule_index >= len(rules_config):
return False, "Rule index out of range"
rule = rules_config[rule_index]
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule)
# Check if delivery is configured
if not rule_dict.get("delivery_type"):
return False, "No delivery method configured for this rule"
channel = self._create_channel_for_rule(rule_dict)
if not channel:
return False, "Failed to create delivery channel"
return await channel.test()
async def preview_rule(self, rule_index: int) -> dict:
"""Preview what a rule would match right now.
Returns:
{
"matches": bool,
"conditions": [...], # Current conditions that match
"preview": str, # Example message
}
"""
rules_config = getattr(self._config, "rules", [])
if rule_index < 0 or rule_index >= len(rules_config):
return {"matches": False, "conditions": [], "preview": "Invalid rule index"}
rule = rules_config[rule_index]
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule)
# For condition rules, show example based on categories
if rule_dict.get("trigger_type", "condition") == "condition":
from .categories import get_category
categories = rule_dict.get("categories", [])
if not categories:
# All categories - show first example
example = get_category("infra_offline")
return {
"matches": True,
"conditions": ["All alert categories"],
"preview": example.get("example_message", "Alert notification"),
}
else:
# Show example from first category
cat_info = get_category(categories[0])
return {
"matches": True,
"conditions": [get_category(c)["name"] for c in categories],
"preview": cat_info.get("example_message", f"Alert: {categories[0]}"),
}
# For schedule rules, generate preview report
elif rule_dict.get("trigger_type") == "schedule":
message_type = rule_dict.get("message_type", "mesh_health_summary")
return {
"matches": True,
"conditions": [f"Scheduled: {rule_dict.get('schedule_frequency', 'daily')}"],
"preview": f"[{message_type}] Report content would appear here",
}
return {"matches": False, "conditions": [], "preview": "Unknown rule type"}
def add_mesh_subscription(
self,
node_id: str,
categories: list[str],
rule_name: Optional[str] = None,
) -> str:
"""Add a mesh DM subscription for a node.
Creates a rule for the node to receive alerts.
Returns the rule name.
"""
if not rule_name:
rule_name = "sub_%s" % node_id
# Check if rule already exists
for rule in self._rules:
if rule.get("name") == rule_name:
# Update existing rule
rule["categories"] = categories if categories else []
rule["node_ids"] = [node_id]
return rule_name
# Add new rule
self._rules.append({
"name": rule_name,
"enabled": True,
"trigger_type": "condition",
"categories": categories if categories else [], # Empty = all
"min_severity": "warning",
"delivery_type": "mesh_dm",
"node_ids": [node_id],
"cooldown_minutes": 10,
"override_quiet": False,
})
return rule_name
def remove_mesh_subscription(self, node_id: str) -> bool:
"""Remove a mesh subscription for a node."""
rule_name = "sub_%s" % node_id
self._rules = [r for r in self._rules if r.get("name") != rule_name]
return True
def get_node_subscriptions(self, node_id: str) -> list[str]:
"""Get categories a node is subscribed to."""
rule_name = "sub_%s" % node_id
for rule in self._rules:
if rule.get("name") == rule_name:
categories = rule.get("categories", [])
return categories if categories else ["all"]
return []
def cleanup_recent(self, max_age: int = 3600):
"""Clean up old entries from recent alerts cache."""
now = time.time()
self._recent = {
k: v for k, v in self._recent.items()
if now - v < max_age
}
"""Notification router - matches alerts to rules and delivers via channels."""
import asyncio
import logging
import time
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from .channels import create_channel, NotificationChannel
from .summarizer import MessageSummarizer
if TYPE_CHECKING:
from ..connector import MeshConnector
logger = logging.getLogger(__name__)
# Severity levels in order
SEVERITY_ORDER = ["info", "advisory", "watch", "warning", "critical", "emergency"]
class NotificationRouter:
"""Routes alerts through matching rules to notification channels."""
def __init__(
self,
config,
connector: Optional["MeshConnector"] = None,
llm_backend=None,
timezone: str = "America/Boise",
):
self._rules: list[dict] = []
self._quiet_enabled = getattr(config, "quiet_hours_enabled", True)
self._quiet_start = getattr(config, "quiet_hours_start", "22:00")
self._quiet_end = getattr(config, "quiet_hours_end", "06:00")
self._timezone = timezone
self._recent: dict[tuple, float] = {} # (rule_name, category, event_key) -> last_sent_time
self._summarizer = MessageSummarizer(llm_backend) if llm_backend else None
self._connector = connector
self._config = config
# Load rules from config
rules_config = getattr(config, "rules", [])
for rule in rules_config:
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule) if isinstance(rule, dict) else {}
# Skip disabled rules
if not rule_dict.get("enabled", True):
continue
# Only load condition-triggered rules (scheduled rules handled by scheduler)
if rule_dict.get("trigger_type", "condition") == "condition":
self._rules.append(rule_dict)
logger.info("Notification router initialized: %d condition rules", len(self._rules))
def _create_channel_for_rule(self, rule: dict) -> Optional[NotificationChannel]:
"""Create a channel instance from a rule's inline delivery config.
Returns None if delivery_type is empty or invalid.
"""
delivery_type = rule.get("delivery_type", "")
# Empty delivery type is valid - rule exists but doesn't deliver
if not delivery_type:
return None
if delivery_type == "mesh_broadcast":
config = {
"type": "mesh_broadcast",
"channel_index": rule.get("broadcast_channel", 0),
}
elif delivery_type == "mesh_dm":
config = {
"type": "mesh_dm",
"node_ids": rule.get("node_ids", []),
}
elif delivery_type == "email":
config = {
"type": "email",
"smtp_host": rule.get("smtp_host", ""),
"smtp_port": rule.get("smtp_port", 587),
"smtp_user": rule.get("smtp_user", ""),
"smtp_password": rule.get("smtp_password", ""),
"smtp_tls": rule.get("smtp_tls", True),
"from_address": rule.get("from_address", ""),
"recipients": rule.get("recipients", []),
}
elif delivery_type == "webhook":
config = {
"type": "webhook",
"url": rule.get("webhook_url", ""),
"headers": rule.get("webhook_headers", {}),
}
else:
logger.warning("Unknown delivery type '%s' in rule '%s'", delivery_type, rule.get("name"))
return None
try:
return create_channel(config, self._connector)
except Exception as e:
logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e)
return None
async def process_alert(self, alert: dict) -> bool:
"""Route an alert through matching rules.
Returns True if alert was delivered to at least one channel.
"""
category = alert.get("type", "")
severity = alert.get("severity", "info")
delivered = False
for rule in self._rules:
rule_name = rule.get("name", "unnamed")
# Check category match
rule_categories = rule.get("categories", [])
if rule_categories and category not in rule_categories:
continue
# Check severity threshold
min_severity = rule.get("min_severity", "info")
if not self._severity_meets(severity, min_severity):
continue
# Check quiet hours (only if quiet hours are enabled globally)
if self._quiet_enabled and self._in_quiet_hours():
# Emergencies and criticals always go through
if severity not in ("emergency", "critical"):
# Check if rule overrides quiet hours
if not rule.get("override_quiet", False):
logger.debug("Skipping alert (quiet hours): %s via %s", category, rule_name)
continue
# Check cooldown
cooldown = rule.get("cooldown_minutes", 10) * 60
event_id = alert.get("event_id", alert.get("message", "")[:50])
dedup_key = (rule_name, category, event_id)
now = time.time()
if dedup_key in self._recent:
if now - self._recent[dedup_key] < cooldown:
logger.debug("Skipping alert (cooldown): %s via %s", category, rule_name)
continue
self._recent[dedup_key] = now
# Log rule match
logger.info("Rule '%s' matched alert: %s (%s)", rule_name, category, severity)
# Check if rule has delivery configured
delivery_type = rule.get("delivery_type", "")
if not delivery_type:
logger.info("Rule '%s' matched but has no delivery configured", rule_name)
continue
# Create channel and deliver
channel = self._create_channel_for_rule(rule)
if not channel:
logger.warning("Rule '%s' failed to create delivery channel", rule_name)
continue
try:
# Summarize for mesh channels if over 200 chars
delivery_alert = alert
message = alert.get("message", "")
if channel.channel_type in ("mesh_broadcast", "mesh_dm"):
if len(message) > 200:
if self._summarizer:
summary = await self._summarizer.summarize(message, max_chars=195)
delivery_alert = {**alert, "message": summary}
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
logger.info("Alert delivered via rule '%s': %s", rule_name, category)
except Exception as e:
logger.warning("Rule '%s' delivery failed: %s", rule_name, e)
return delivered
def _severity_meets(self, actual: str, required: str) -> bool:
"""Check if actual severity meets or exceeds required severity."""
try:
actual_idx = SEVERITY_ORDER.index(actual.lower())
required_idx = SEVERITY_ORDER.index(required.lower())
return actual_idx >= required_idx
except ValueError:
return True # Unknown severity, allow through
def _in_quiet_hours(self) -> bool:
"""Check if current time is within quiet hours."""
if not self._quiet_enabled:
return False
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(self._timezone)
now = datetime.now(tz)
current_time = now.strftime("%H:%M")
start = self._quiet_start
end = self._quiet_end
if start <= end:
# Simple range (e.g., 01:00 to 06:00)
return start <= current_time <= end
else:
# Crosses midnight (e.g., 22:00 to 06:00)
return current_time >= start or current_time <= end
except Exception:
return False
def get_rules(self) -> list[dict]:
"""Get list of configured rules."""
return self._rules
async def test_rule(self, rule_index: int) -> tuple[bool, str]:
"""Send a test alert through a specific rule (legacy method)."""
result = await self.test_rule_with_conditions(rule_index, send=True)
return result.get("delivered", False), result.get("delivery_result", "Unknown")
async def test_rule_with_conditions(
self,
rule_index: int,
alert_engine=None,
env_store=None,
send: bool = False,
) -> dict:
"""Test a rule against current conditions.
Args:
rule_index: Index of the rule to test
alert_engine: AlertEngine instance for pending alerts
env_store: EnvStore instance for environmental events
send: Whether to actually deliver (True) or just preview (False)
Returns:
{
"conditions_matched": int,
"preview_messages": list[str],
"is_example": bool,
"delivered": bool,
"delivery_method": str,
"delivery_result": str,
}
"""
from .categories import get_category
rules_config = getattr(self._config, "rules", [])
if rule_index < 0 or rule_index >= len(rules_config):
return {
"conditions_matched": 0,
"preview_messages": [],
"is_example": False,
"delivered": False,
"delivery_method": "",
"delivery_result": "Rule index out of range",
}
rule = rules_config[rule_index]
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule)
rule_categories = rule_dict.get("categories", [])
min_severity = rule_dict.get("min_severity", "info")
delivery_type = rule_dict.get("delivery_type", "")
# Collect matching alerts from alert_engine
matching_alerts = []
if alert_engine and hasattr(alert_engine, "get_pending_alerts"):
try:
for alert in alert_engine.get_pending_alerts():
category = alert.get("type", "")
severity = alert.get("severity", "info")
# Check category match
if rule_categories and category not in rule_categories:
continue
# Check severity threshold
if not self._severity_meets(severity, min_severity):
continue
matching_alerts.append(alert)
except Exception as e:
logger.warning("Error getting pending alerts: %s", e)
# Collect matching env events
if env_store and hasattr(env_store, "get_active"):
try:
# Map category prefixes to env sources
source_map = {
"weather_": "nws",
"fire_": "nifc",
"wildfire_": "nifc",
"new_ignition": "firms",
"stream_": "usgs",
"road_": "here",
"traffic_": "here",
"avalanche_": "avy",
"hf_blackout": "swpc",
"geomagnetic_": "swpc",
"tropospheric_": "ducting",
}
# Get all active events
all_events = env_store.get_active()
for event in all_events:
event_type = event.get("type", "")
severity = event.get("severity", "info")
# Try to match to a category
matched_category = None
for cat in rule_categories if rule_categories else list(source_map.keys()):
if event_type.startswith(cat.rstrip("_")) or cat in event_type:
matched_category = cat
break
if rule_categories and not matched_category:
continue
# Check severity
if not self._severity_meets(severity, min_severity):
continue
# Convert to alert format
matching_alerts.append({
"type": event_type,
"severity": severity,
"message": event.get("message", event.get("summary", str(event))),
})
except Exception as e:
logger.warning("Error getting env events: %s", e)
# Build preview messages
preview_messages = []
is_example = False
if matching_alerts:
# Use real alerts
for alert in matching_alerts[:5]: # Limit to 5
msg = alert.get("message", "")
if len(msg) > 200 and delivery_type in ("mesh_broadcast", "mesh_dm"):
# Truncate for mesh delivery preview
msg = msg[:195] + "..."
preview_messages.append(msg)
else:
# No matches - use example messages
is_example = True
if rule_categories:
for cat_id in rule_categories[:3]:
cat_info = get_category(cat_id)
example = cat_info.get("example_message", f"Alert: {cat_id}")
preview_messages.append(f"[EXAMPLE] {example}")
else:
# Rule matches all categories - show generic example
cat_info = get_category("infra_offline")
preview_messages.append(f"[EXAMPLE] {cat_info.get('example_message', 'Alert notification')}")
# Check if delivery is configured
if not delivery_type:
return {
"conditions_matched": len(matching_alerts),
"preview_messages": preview_messages,
"is_example": is_example,
"delivered": False,
"delivery_method": "",
"delivery_result": "No delivery method configured for this rule",
}
# Create channel
channel = self._create_channel_for_rule(rule_dict)
if not channel:
return {
"conditions_matched": len(matching_alerts),
"preview_messages": preview_messages,
"is_example": is_example,
"delivered": False,
"delivery_method": delivery_type,
"delivery_result": "Failed to create delivery channel",
}
# If not sending, just return preview
if not send:
return {
"conditions_matched": len(matching_alerts),
"preview_messages": preview_messages,
"is_example": is_example,
"delivered": False,
"delivery_method": delivery_type,
"delivery_result": "Preview only - use send=true to deliver",
}
# Actually send the test message
try:
# Pick the first message to send
if preview_messages:
test_message = preview_messages[0]
if not test_message.startswith("["):
test_message = f"[TEST] {test_message}"
elif test_message.startswith("[EXAMPLE]"):
test_message = test_message.replace("[EXAMPLE]", "[TEST]")
else:
test_message = "[TEST] MeshAI notification test"
# Send through channel with the real message
success = await channel.deliver_test(test_message)
if success:
delivery_result = f"Sent to {delivery_type}"
if delivery_type == "mesh_broadcast":
delivery_result = f"Sent to channel {rule_dict.get('broadcast_channel', 0)}"
elif delivery_type == "mesh_dm":
node_count = len(rule_dict.get("node_ids", []))
delivery_result = f"Sent DM to {node_count} node(s)"
elif delivery_type == "email":
recipient_count = len(rule_dict.get("recipients", []))
delivery_result = f"Sent to {recipient_count} recipient(s)"
else:
delivery_result = "Delivery failed"
return {
"conditions_matched": len(matching_alerts),
"preview_messages": preview_messages,
"is_example": is_example,
"delivered": success,
"delivery_method": delivery_type,
"delivery_result": delivery_result,
}
except Exception as e:
logger.warning("Test delivery failed: %s", e)
return {
"conditions_matched": len(matching_alerts),
"preview_messages": preview_messages,
"is_example": is_example,
"delivered": False,
"delivery_method": delivery_type,
"delivery_result": f"Delivery error: {e}",
}
async def preview_rule(self, rule_index: int) -> dict:
"""Preview what a rule would match right now.
Returns:
{
"matches": bool,
"conditions": [...], # Current conditions that match
"preview": str, # Example message
}
"""
rules_config = getattr(self._config, "rules", [])
if rule_index < 0 or rule_index >= len(rules_config):
return {"matches": False, "conditions": [], "preview": "Invalid rule index"}
rule = rules_config[rule_index]
if hasattr(rule, "__dict__"):
rule_dict = {k: v for k, v in rule.__dict__.items() if not k.startswith("_")}
else:
rule_dict = dict(rule)
# For condition rules, show example based on categories
if rule_dict.get("trigger_type", "condition") == "condition":
from .categories import get_category
categories = rule_dict.get("categories", [])
if not categories:
# All categories - show first example
example = get_category("infra_offline")
return {
"matches": True,
"conditions": ["All alert categories"],
"preview": example.get("example_message", "Alert notification"),
}
else:
# Show example from first category
cat_info = get_category(categories[0])
return {
"matches": True,
"conditions": [get_category(c)["name"] for c in categories],
"preview": cat_info.get("example_message", f"Alert: {categories[0]}"),
}
# For schedule rules, generate preview report
elif rule_dict.get("trigger_type") == "schedule":
message_type = rule_dict.get("message_type", "mesh_health_summary")
return {
"matches": True,
"conditions": [f"Scheduled: {rule_dict.get('schedule_frequency', 'daily')}"],
"preview": f"[{message_type}] Report content would appear here",
}
return {"matches": False, "conditions": [], "preview": "Unknown rule type"}
def add_mesh_subscription(
self,
node_id: str,
categories: list[str],
rule_name: Optional[str] = None,
) -> str:
"""Add a mesh DM subscription for a node.
Creates a rule for the node to receive alerts.
Returns the rule name.
"""
if not rule_name:
rule_name = "sub_%s" % node_id
# Check if rule already exists
for rule in self._rules:
if rule.get("name") == rule_name:
# Update existing rule
rule["categories"] = categories if categories else []
rule["node_ids"] = [node_id]
return rule_name
# Add new rule
self._rules.append({
"name": rule_name,
"enabled": True,
"trigger_type": "condition",
"categories": categories if categories else [], # Empty = all
"min_severity": "warning",
"delivery_type": "mesh_dm",
"node_ids": [node_id],
"cooldown_minutes": 10,
"override_quiet": False,
})
return rule_name
def remove_mesh_subscription(self, node_id: str) -> bool:
"""Remove a mesh subscription for a node."""
rule_name = "sub_%s" % node_id
self._rules = [r for r in self._rules if r.get("name") != rule_name]
return True
def get_node_subscriptions(self, node_id: str) -> list[str]:
"""Get categories a node is subscribed to."""
rule_name = "sub_%s" % node_id
for rule in self._rules:
if rule.get("name") == rule_name:
categories = rule.get("categories", [])
return categories if categories else ["all"]
return []
def cleanup_recent(self, max_age: int = 3600):
"""Clean up old entries from recent alerts cache."""
now = time.time()
self._recent = {
k: v for k, v in self._recent.items()
if now - v < max_age
}