mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-10 17:04:45 +02:00
Compare commits
No commits in common. "9c5a106c9fe08b690a20c052d90685f2e21b1064" and "d6bc6b2b8973a5408c5267aa20d53161ccbd305e" have entirely different histories.
9c5a106c9f
...
d6bc6b2b89
26 changed files with 1147 additions and 2622 deletions
|
|
@ -484,14 +484,6 @@ class NotificationRuleConfig:
|
|||
channel_ids: list = field(default_factory=list)
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class TogglesConfig:
|
||||
"""Master toggle filter settings."""
|
||||
|
||||
enabled: list[str] = field(default_factory=list) # Toggle names that are enabled (empty = all)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DigestConfig:
|
||||
"""Digest scheduler settings."""
|
||||
|
|
@ -508,7 +500,6 @@ class NotificationsConfig:
|
|||
quiet_hours_enabled: bool = True # Master toggle for quiet hours
|
||||
quiet_hours_start: str = "22:00"
|
||||
quiet_hours_end: str = "06:00"
|
||||
toggles: TogglesConfig = field(default_factory=TogglesConfig)
|
||||
digest: DigestConfig = field(default_factory=DigestConfig)
|
||||
rules: list = field(default_factory=list) # List of NotificationRuleConfig
|
||||
|
||||
|
|
@ -681,8 +672,6 @@ def _dict_to_dataclass(cls, data: dict):
|
|||
kwargs[key] = _dict_to_dataclass(FIRMSConfig, value)
|
||||
elif key == "dashboard" and isinstance(value, dict):
|
||||
kwargs[key] = _dict_to_dataclass(DashboardConfig, value)
|
||||
elif key == "toggles" and isinstance(value, dict):
|
||||
kwargs[key] = _dict_to_dataclass(TogglesConfig, value)
|
||||
elif key == "digest" and isinstance(value, dict):
|
||||
kwargs[key] = _dict_to_dataclass(DigestConfig, value)
|
||||
elif key == "notifications" and isinstance(value, dict):
|
||||
|
|
|
|||
60
meshai/env/firms.py
vendored
60
meshai/env/firms.py
vendored
|
|
@ -3,12 +3,10 @@
|
|||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from meshai.notifications.events import Event, make_event
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..config import FIRMSConfig
|
||||
|
||||
|
|
@ -344,62 +342,6 @@ class FIRMSAdapter:
|
|||
|
||||
return (None, None)
|
||||
|
||||
def to_event(self, evt: dict) -> Optional["Event"]:
|
||||
"""Translate a stored FIRMS event dict into a pipeline Event.
|
||||
|
||||
Args:
|
||||
evt: Internal event dict from get_events()
|
||||
|
||||
Returns:
|
||||
Event instance ready for EventBus emission, or None if
|
||||
the dict is missing required fields (lat/lon).
|
||||
"""
|
||||
try:
|
||||
lat = evt.get("lat")
|
||||
lon = evt.get("lon")
|
||||
if lat is None or lon is None:
|
||||
return None # Can't make a useful Event without coords
|
||||
|
||||
props = evt.get("properties", {}) or {}
|
||||
is_new_ignition = bool(props.get("new_ignition", False))
|
||||
category = "new_ignition" if is_new_ignition else "wildfire_proximity"
|
||||
|
||||
severity = evt.get("severity", "routine")
|
||||
|
||||
title = evt.get("headline", "") or "Fire Hotspot"
|
||||
|
||||
# Build a richer summary including FRP, confidence, distance
|
||||
summary_parts = [title]
|
||||
if props.get("frp") is not None:
|
||||
summary_parts.append(f"FRP {int(props['frp'])} MW")
|
||||
if props.get("confidence"):
|
||||
summary_parts.append(f"conf {props['confidence']}")
|
||||
if props.get("distance_km") is not None and props.get("nearest_anchor"):
|
||||
summary_parts.append(
|
||||
f"{int(props['distance_km'])} km from {props['nearest_anchor']}"
|
||||
)
|
||||
summary = " | ".join(summary_parts)[:300]
|
||||
|
||||
spatial_key = f"firms:{round(lat, 2):.2f}:{round(lon, 2):.2f}"
|
||||
|
||||
return make_event(
|
||||
source="firms",
|
||||
category=category,
|
||||
severity=severity,
|
||||
title=title,
|
||||
summary=summary,
|
||||
timestamp=evt.get("fetched_at"),
|
||||
expires=evt.get("expires"),
|
||||
region=props.get("nearest_anchor"),
|
||||
lat=lat,
|
||||
lon=lon,
|
||||
group_key=spatial_key,
|
||||
inhibit_keys=[spatial_key],
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(f"FIRMS to_event failed for evt: {evt.get('event_id')}")
|
||||
return None
|
||||
|
||||
def get_events(self) -> list:
|
||||
"""Get current hotspot events."""
|
||||
return self._events
|
||||
|
|
|
|||
69
meshai/env/nws.py
vendored
69
meshai/env/nws.py
vendored
|
|
@ -4,12 +4,10 @@ import json
|
|||
import logging
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from meshai.notifications.events import Event, make_event
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..config import NWSConfig
|
||||
|
||||
|
|
@ -41,71 +39,6 @@ class NWSAlertsAdapter:
|
|||
else: # moderate, minor, unknown
|
||||
return "routine"
|
||||
|
||||
def _derive_category(self, event_type: str) -> str:
|
||||
"""Derive notification category from NWS event type suffix.
|
||||
|
||||
NWS event types like "Red Flag Warning", "Winter Storm Watch",
|
||||
"Wind Advisory" map to our fine-grained weather categories.
|
||||
|
||||
Args:
|
||||
event_type: NWS event type string (e.g., "Tornado Warning")
|
||||
|
||||
Returns:
|
||||
Category key: weather_warning, weather_watch, weather_advisory,
|
||||
or weather_statement
|
||||
"""
|
||||
event_type_lower = event_type.lower()
|
||||
if event_type_lower.endswith("warning"):
|
||||
return "weather_warning"
|
||||
elif event_type_lower.endswith("watch"):
|
||||
return "weather_watch"
|
||||
elif event_type_lower.endswith("advisory"):
|
||||
return "weather_advisory"
|
||||
else:
|
||||
# Covers "Special Weather Statement", "Short Term Forecast", etc.
|
||||
return "weather_statement"
|
||||
|
||||
def to_event(self, raw: dict) -> Event:
|
||||
"""Convert internal event dict to pipeline Event.
|
||||
|
||||
Args:
|
||||
raw: Internal event dict from get_events()
|
||||
|
||||
Returns:
|
||||
Event instance ready for EventBus emission
|
||||
"""
|
||||
event_type = raw.get("event_type", "Unknown")
|
||||
category = self._derive_category(event_type)
|
||||
nws_severity = raw.get("severity", "unknown")
|
||||
severity = self._map_nws_severity(nws_severity)
|
||||
|
||||
# Build group_key for dedup: same alert ID should merge
|
||||
group_key = raw.get("event_id", "")
|
||||
|
||||
# Build inhibit_keys: a Warning supersedes Watch/Advisory for same hazard
|
||||
inhibit_keys = []
|
||||
if category == "weather_warning":
|
||||
# Warning inhibits corresponding Watch/Advisory
|
||||
base = event_type.rsplit(" ", 1)[0] if " " in event_type else event_type
|
||||
inhibit_keys = [f"nws:{base} Watch", f"nws:{base} Advisory"]
|
||||
|
||||
return make_event(
|
||||
source="nws",
|
||||
category=category,
|
||||
severity=severity,
|
||||
title=raw.get("headline", event_type),
|
||||
summary=raw.get("headline", ""),
|
||||
body=raw.get("description", ""),
|
||||
effective=raw.get("onset") or None,
|
||||
expires=raw.get("expires") or None,
|
||||
lat=raw.get("lat"),
|
||||
lon=raw.get("lon"),
|
||||
nws_zones=raw.get("areas", []),
|
||||
group_key=group_key,
|
||||
inhibit_keys=inhibit_keys,
|
||||
data=raw,
|
||||
)
|
||||
|
||||
def tick(self) -> bool:
|
||||
"""Execute one polling tick.
|
||||
|
||||
|
|
|
|||
32
meshai/env/store.py
vendored
32
meshai/env/store.py
vendored
|
|
@ -2,11 +2,10 @@
|
|||
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..config import EnvironmentalConfig
|
||||
from ..notifications.pipeline import EventBus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -14,15 +13,9 @@ logger = logging.getLogger(__name__)
|
|||
class EnvironmentalStore:
|
||||
"""Cache and tick-driver for all environmental feed adapters."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: "EnvironmentalConfig",
|
||||
region_anchors: list = None,
|
||||
event_bus: Optional["EventBus"] = None,
|
||||
):
|
||||
def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None):
|
||||
self._adapters = {} # name -> adapter instance
|
||||
self._events = {} # (source, event_id) -> event dict
|
||||
self._event_bus = event_bus # Pipeline EventBus for emission
|
||||
self._swpc_status = {} # Kp/SFI/scales snapshot
|
||||
self._ducting_status = {} # tropo ducting assessment
|
||||
self._mesh_zones = config.nws_zones or []
|
||||
|
|
@ -94,29 +87,12 @@ class EnvironmentalStore:
|
|||
self._swpc_status = adapter.get_status()
|
||||
# Also ingest any alert events (R-scale >= 3)
|
||||
for evt in adapter.get_events():
|
||||
key = (evt["source"], evt["event_id"])
|
||||
is_new = key not in self._events
|
||||
self._events[key] = evt
|
||||
if is_new and self._event_bus and hasattr(adapter, "to_event"):
|
||||
self._emit_event(adapter, evt)
|
||||
self._events[(evt["source"], evt["event_id"])] = evt
|
||||
elif name == "ducting":
|
||||
self._ducting_status = adapter.get_status()
|
||||
else:
|
||||
for evt in adapter.get_events():
|
||||
key = (evt["source"], evt["event_id"])
|
||||
is_new = key not in self._events
|
||||
self._events[key] = evt
|
||||
if is_new and self._event_bus and hasattr(adapter, "to_event"):
|
||||
self._emit_event(adapter, evt)
|
||||
|
||||
def _emit_event(self, adapter, raw_evt: dict):
|
||||
"""Convert raw event to pipeline Event and emit to bus."""
|
||||
try:
|
||||
event = adapter.to_event(raw_evt)
|
||||
self._event_bus.emit(event)
|
||||
logger.debug("Emitted %s event %s to pipeline", event.source, event.id)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to emit event to pipeline: %s", e)
|
||||
self._events[(evt["source"], evt["event_id"])] = evt
|
||||
|
||||
def _purge_expired(self):
|
||||
"""Remove expired events."""
|
||||
|
|
|
|||
|
|
@ -174,33 +174,12 @@ ALERT_CATEGORIES = {
|
|||
|
||||
# Environmental - Weather
|
||||
"weather_warning": {
|
||||
"name": "Severe Weather Warning",
|
||||
"description": "NWS Warning affecting your mesh area — highest urgency weather alert",
|
||||
"name": "Severe Weather",
|
||||
"description": "NWS warning or advisory affecting your mesh area",
|
||||
"default_severity": "priority",
|
||||
"example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z",
|
||||
"toggle": "weather",
|
||||
},
|
||||
"weather_watch": {
|
||||
"name": "Weather Watch",
|
||||
"description": "NWS Watch affecting your mesh area — conditions favorable for hazardous weather",
|
||||
"default_severity": "routine",
|
||||
"example_message": "⏳ Winter Storm Watch — Wood River Valley. Heavy snow possible Thu night through Fri.",
|
||||
"toggle": "weather",
|
||||
},
|
||||
"weather_advisory": {
|
||||
"name": "Weather Advisory",
|
||||
"description": "NWS Advisory affecting your mesh area — weather may cause inconvenience",
|
||||
"default_severity": "routine",
|
||||
"example_message": "ℹ Wind Advisory — Magic Valley. SW winds 25-35 mph with gusts to 50 mph.",
|
||||
"toggle": "weather",
|
||||
},
|
||||
"weather_statement": {
|
||||
"name": "Weather Statement",
|
||||
"description": "NWS Special Weather Statement — general awareness, no specific hazard",
|
||||
"default_severity": "routine",
|
||||
"example_message": "📋 Special Weather Statement — Isolated thunderstorms possible this afternoon.",
|
||||
"toggle": "weather",
|
||||
},
|
||||
|
||||
# Environmental - Space Weather
|
||||
"hf_blackout": {
|
||||
|
|
|
|||
|
|
@ -14,10 +14,6 @@ import httpx
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from ..connector import MeshConnector
|
||||
from ..config import NotificationRuleConfig
|
||||
from .events import NotificationPayload
|
||||
|
||||
from meshai.notifications.renderers import MeshRenderer, EmailRenderer, WebhookRenderer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -28,7 +24,7 @@ class NotificationChannel(ABC):
|
|||
channel_type: str = "base"
|
||||
|
||||
@abstractmethod
|
||||
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
|
||||
async def deliver(self, alert: dict, rule: dict) -> bool:
|
||||
"""Send alert. Returns True on success."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
|
@ -63,34 +59,21 @@ class MeshBroadcastChannel(NotificationChannel):
|
|||
def __init__(self, connector: "MeshConnector", channel_index: int = 0):
|
||||
self._connector = connector
|
||||
self._channel = channel_index
|
||||
self._renderer = MeshRenderer()
|
||||
|
||||
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
|
||||
async def deliver(self, alert: dict, rule: dict) -> bool:
|
||||
"""Send alert to mesh channel."""
|
||||
if not self._connector:
|
||||
logger.warning("No mesh connector available")
|
||||
return False
|
||||
|
||||
try:
|
||||
# If payload already has chunk metadata (from digest), use message directly
|
||||
if alert.chunk_index is not None:
|
||||
message = alert.get("message", "")
|
||||
self._connector.send_message(
|
||||
text=alert.message or "",
|
||||
text=message,
|
||||
destination=None,
|
||||
channel=self._channel,
|
||||
)
|
||||
logger.info("Broadcast pre-chunked alert to channel %d", self._channel)
|
||||
return True
|
||||
|
||||
# Render to chunks for single-event delivery
|
||||
chunks = self._renderer.render(alert)
|
||||
for chunk in chunks:
|
||||
self._connector.send_message(
|
||||
text=chunk,
|
||||
destination=None,
|
||||
channel=self._channel,
|
||||
)
|
||||
logger.info("Broadcast %d chunk(s) to channel %d", len(chunks), self._channel)
|
||||
logger.info("Broadcast alert to channel %d", self._channel)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("Failed to broadcast alert: %s", e)
|
||||
|
|
@ -174,23 +157,16 @@ class MeshDMChannel(NotificationChannel):
|
|||
def __init__(self, connector: "MeshConnector", node_ids: list[str]):
|
||||
self._connector = connector
|
||||
self._node_ids = node_ids
|
||||
self._renderer = MeshRenderer()
|
||||
|
||||
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
|
||||
async def deliver(self, alert: dict, rule: dict) -> bool:
|
||||
"""Send alert via DM to configured nodes."""
|
||||
if not self._connector:
|
||||
return False
|
||||
|
||||
# If payload already has chunk metadata (from digest), use message directly
|
||||
if alert.chunk_index is not None:
|
||||
messages = [alert.message or ""]
|
||||
else:
|
||||
# Render to chunks for single-event delivery
|
||||
messages = self._renderer.render(alert)
|
||||
|
||||
message = alert.get("message", "")
|
||||
success = True
|
||||
|
||||
for node_id in self._node_ids:
|
||||
for message in messages:
|
||||
try:
|
||||
node_id = str(node_id)
|
||||
self._connector.send_message(text=message, destination=node_id, channel=0)
|
||||
|
|
@ -309,17 +285,19 @@ class EmailChannel(NotificationChannel):
|
|||
self._tls = smtp_tls
|
||||
self._from = from_address
|
||||
self._recipients = recipients
|
||||
self._renderer = EmailRenderer()
|
||||
|
||||
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
|
||||
async def deliver(self, alert: dict, rule: dict) -> bool:
|
||||
"""Send alert via email."""
|
||||
if not self._recipients:
|
||||
return False
|
||||
|
||||
# Use renderer for subject and body
|
||||
rendered = self._renderer.render(alert)
|
||||
subject = rendered["subject"]
|
||||
body = rendered["body"]
|
||||
alert_type = alert.get("type", "alert")
|
||||
severity = alert.get("severity", "routine").upper()
|
||||
message = alert.get("message", "")
|
||||
subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title())
|
||||
body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % (
|
||||
alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message
|
||||
)
|
||||
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
|
|
@ -535,16 +513,21 @@ class WebhookChannel(NotificationChannel):
|
|||
def __init__(self, url: str, headers: Optional[dict] = None):
|
||||
self._url = url
|
||||
self._headers = headers or {}
|
||||
self._renderer = WebhookRenderer()
|
||||
|
||||
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
|
||||
async def deliver(self, alert: dict, rule: dict) -> bool:
|
||||
"""POST alert to webhook URL."""
|
||||
# Use renderer for generic JSON payload
|
||||
payload = self._renderer.render(alert)
|
||||
payload = {
|
||||
"type": alert.get("type"),
|
||||
"severity": alert.get("severity", "routine"),
|
||||
"message": alert.get("message", ""),
|
||||
"timestamp": time.time(),
|
||||
"node_name": alert.get("node_name"),
|
||||
"region": alert.get("region"),
|
||||
}
|
||||
|
||||
# Discord/Slack format
|
||||
if "discord.com" in self._url or "slack.com" in self._url:
|
||||
severity = alert.severity or "routine"
|
||||
severity = alert.get("severity", "routine")
|
||||
color = {
|
||||
"immediate": 0xFF0000,
|
||||
"priority": 0xFFAA00,
|
||||
|
|
@ -552,8 +535,8 @@ class WebhookChannel(NotificationChannel):
|
|||
}.get(severity, 0x888888)
|
||||
payload = {
|
||||
"embeds": [{
|
||||
"title": "MeshAI: %s" % (alert.event_type or "unknown"),
|
||||
"description": alert.message or "",
|
||||
"title": "MeshAI: %s" % alert.get("type", "unknown"),
|
||||
"description": alert.get("message", ""),
|
||||
"color": color,
|
||||
}]
|
||||
}
|
||||
|
|
@ -562,14 +545,14 @@ class WebhookChannel(NotificationChannel):
|
|||
elif "ntfy" in self._url:
|
||||
headers = {
|
||||
**self._headers,
|
||||
"Title": "MeshAI: %s" % (alert.event_type or "alert"),
|
||||
"Title": "MeshAI: %s" % alert.get("type", "alert"),
|
||||
"Priority": "3",
|
||||
}
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.post(
|
||||
self._url,
|
||||
content=alert.message or "",
|
||||
content=alert.get("message", ""),
|
||||
headers=headers,
|
||||
timeout=10,
|
||||
)
|
||||
|
|
@ -762,52 +745,8 @@ class WebhookChannel(NotificationChannel):
|
|||
return False, f"Webhook failed: {e}"
|
||||
|
||||
|
||||
def create_channel(rule: "NotificationRuleConfig", connector=None) -> NotificationChannel:
|
||||
"""Create a channel instance from a NotificationRuleConfig.
|
||||
|
||||
Args:
|
||||
rule: NotificationRuleConfig with delivery_type and channel settings
|
||||
connector: MeshConnector instance (required for mesh channels)
|
||||
|
||||
Returns:
|
||||
NotificationChannel instance
|
||||
"""
|
||||
delivery_type = rule.delivery_type
|
||||
|
||||
if delivery_type == "mesh_broadcast":
|
||||
return MeshBroadcastChannel(
|
||||
connector=connector,
|
||||
channel_index=rule.broadcast_channel,
|
||||
)
|
||||
elif delivery_type == "mesh_dm":
|
||||
return MeshDMChannel(
|
||||
connector=connector,
|
||||
node_ids=rule.node_ids,
|
||||
)
|
||||
elif delivery_type == "email":
|
||||
return EmailChannel(
|
||||
smtp_host=rule.smtp_host,
|
||||
smtp_port=rule.smtp_port,
|
||||
smtp_user=rule.smtp_user,
|
||||
smtp_password=rule.smtp_password,
|
||||
smtp_tls=rule.smtp_tls,
|
||||
from_address=rule.from_address,
|
||||
recipients=rule.recipients,
|
||||
)
|
||||
elif delivery_type == "webhook":
|
||||
return WebhookChannel(
|
||||
url=rule.webhook_url,
|
||||
headers=rule.webhook_headers,
|
||||
)
|
||||
else:
|
||||
raise ValueError("Unknown delivery type: %s" % delivery_type)
|
||||
|
||||
|
||||
def create_channel_from_dict(config: dict, connector=None) -> NotificationChannel:
|
||||
"""Create a channel instance from a dict config (legacy interface).
|
||||
|
||||
Used by old router.py and test_channel API. Will be removed in Phase 2.7.
|
||||
"""
|
||||
def create_channel(config: dict, connector=None) -> NotificationChannel:
|
||||
"""Create a channel instance from config."""
|
||||
channel_type = config.get("type", "")
|
||||
|
||||
if channel_type == "mesh_broadcast":
|
||||
|
|
|
|||
|
|
@ -133,52 +133,6 @@ class Event:
|
|||
return cls(**d)
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class NotificationPayload:
|
||||
"""Per-delivery alert content handed to a NotificationChannel.
|
||||
|
||||
This is the runtime alert shape: derived from an Event (or
|
||||
built directly by the old router) and consumed by channels.py
|
||||
implementations.
|
||||
"""
|
||||
message: str # The rendered text to deliver
|
||||
category: str # e.g. "weather_warning"
|
||||
severity: str # "immediate" | "priority" | "routine"
|
||||
timestamp: float # Unix epoch when generated
|
||||
|
||||
# Optional context fields (None when not applicable)
|
||||
node_id: Optional[str] = None
|
||||
node_name: Optional[str] = None
|
||||
region: Optional[str] = None
|
||||
event_type: Optional[str] = None # Maps to old dict's "type" field
|
||||
|
||||
# Chunk metadata for mesh deliveries (set by scheduler/digest path)
|
||||
chunk_index: Optional[int] = None
|
||||
chunk_total: Optional[int] = None
|
||||
|
||||
# Source Event reference for advanced channel use (renderers in 2.5b)
|
||||
source_event: Optional["Event"] = None
|
||||
|
||||
|
||||
def make_payload_from_event(event: "Event", **overrides) -> NotificationPayload:
|
||||
"""Helper to convert an Event into a NotificationPayload."""
|
||||
p = NotificationPayload(
|
||||
message=event.summary or event.title or event.category,
|
||||
category=event.category,
|
||||
severity=event.severity,
|
||||
timestamp=event.timestamp,
|
||||
node_id=event.node_ids[0] if event.node_ids else None,
|
||||
region=event.region,
|
||||
event_type=event.category,
|
||||
source_event=event,
|
||||
)
|
||||
for k, v in overrides.items():
|
||||
setattr(p, k, v)
|
||||
return p
|
||||
|
||||
|
||||
|
||||
def make_event(
|
||||
source: str,
|
||||
category: str,
|
||||
|
|
|
|||
|
|
@ -1,18 +1,17 @@
|
|||
"""Notification pipeline package.
|
||||
|
||||
Phase 2.4:
|
||||
Phase 2.1 + 2.2 + 2.3a + 2.3b:
|
||||
- EventBus: pub/sub ingress
|
||||
- Inhibitor: suppresses redundant events by inhibit_keys
|
||||
- Grouper: coalesces events sharing group_key within a window
|
||||
- ToggleFilter: drops events whose toggle isn't enabled
|
||||
- Tee: sends events to both dispatcher and accumulator
|
||||
- Dispatcher: routes to channels based on rules
|
||||
- DigestAccumulator: logs events for LLM-summarized periodic digest
|
||||
- DigestScheduler: fires digest at configured time
|
||||
- SeverityRouter: forks immediate vs digest
|
||||
- Dispatcher: routes immediate via channels (existing rules schema)
|
||||
- DigestAccumulator: tracks priority/routine events for periodic digest
|
||||
- DigestScheduler: fires digest at configured time (Phase 2.3b)
|
||||
|
||||
Usage:
|
||||
from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline
|
||||
bus = build_pipeline(config, llm_backend) # llm_backend from main.py
|
||||
bus = build_pipeline(config)
|
||||
bus.emit(event)
|
||||
|
||||
# Async lifecycle
|
||||
|
|
@ -21,8 +20,6 @@ Usage:
|
|||
await stop_pipeline(scheduler)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
from meshai.notifications.channels import create_channel
|
||||
from meshai.notifications.pipeline.bus import EventBus, get_bus
|
||||
from meshai.notifications.pipeline.severity_router import (
|
||||
|
|
@ -32,25 +29,17 @@ from meshai.notifications.pipeline.severity_router import (
|
|||
from meshai.notifications.pipeline.dispatcher import Dispatcher
|
||||
from meshai.notifications.pipeline.inhibitor import Inhibitor
|
||||
from meshai.notifications.pipeline.grouper import Grouper
|
||||
from meshai.notifications.pipeline.toggle_filter import ToggleFilter
|
||||
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
|
||||
from meshai.notifications.pipeline.scheduler import DigestScheduler
|
||||
|
||||
|
||||
def build_pipeline(config, llm_backend, connector=None) -> EventBus:
|
||||
def build_pipeline(config) -> EventBus:
|
||||
"""Build the pipeline and return the EventBus.
|
||||
|
||||
Args:
|
||||
config: Full Config object.
|
||||
llm_backend: An already-constructed LLMBackend instance
|
||||
(from main.py or a test). Pipeline components share
|
||||
this single instance. May be None for fallback behavior.
|
||||
connector: Optional MeshtasticConnector for mesh channels.
|
||||
|
||||
Components are stashed on bus._pipeline_components for lifecycle use.
|
||||
"""
|
||||
bus = EventBus()
|
||||
dispatcher = Dispatcher(config, create_channel, connector=connector)
|
||||
dispatcher = Dispatcher(config, create_channel)
|
||||
|
||||
# Build include_toggles from config
|
||||
digest_cfg = getattr(config.notifications, "digest", None)
|
||||
|
|
@ -60,35 +49,12 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus:
|
|||
if include_list:
|
||||
include_toggles = list(include_list)
|
||||
|
||||
accumulator = DigestAccumulator(
|
||||
llm_backend=llm_backend,
|
||||
include_toggles=include_toggles,
|
||||
digest = DigestAccumulator(include_toggles=include_toggles)
|
||||
severity_router = SeverityRouter(
|
||||
immediate_handler=dispatcher.dispatch,
|
||||
digest_handler=digest.enqueue,
|
||||
)
|
||||
|
||||
# Tee closure: events go to BOTH dispatcher and accumulator
|
||||
# dispatcher.dispatch() is async, so fire-and-forget with create_task
|
||||
def _tee(event):
|
||||
try:
|
||||
asyncio.create_task(dispatcher.dispatch(event))
|
||||
except RuntimeError:
|
||||
# No running event loop (e.g. sync tests) - skip async dispatch
|
||||
pass
|
||||
accumulator.enqueue(event)
|
||||
|
||||
# Build enabled toggles set from config
|
||||
toggles_cfg = getattr(config.notifications, "toggles", None)
|
||||
enabled_toggles = None
|
||||
if toggles_cfg is not None:
|
||||
enabled_list = getattr(toggles_cfg, "enabled", None)
|
||||
if enabled_list:
|
||||
enabled_toggles = set(enabled_list)
|
||||
|
||||
toggle_filter = ToggleFilter(
|
||||
next_handler=_tee,
|
||||
enabled_toggles=enabled_toggles,
|
||||
)
|
||||
|
||||
grouper = Grouper(next_handler=toggle_filter.handle)
|
||||
grouper = Grouper(next_handler=severity_router.handle)
|
||||
inhibitor = Inhibitor(next_handler=grouper.handle)
|
||||
bus.subscribe(inhibitor.handle)
|
||||
|
||||
|
|
@ -96,30 +62,21 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus:
|
|||
bus._pipeline_components = {
|
||||
"inhibitor": inhibitor,
|
||||
"grouper": grouper,
|
||||
"toggle_filter": toggle_filter,
|
||||
"severity_router": severity_router,
|
||||
"dispatcher": dispatcher,
|
||||
"accumulator": accumulator,
|
||||
"connector": connector,
|
||||
"digest": digest,
|
||||
}
|
||||
|
||||
return bus
|
||||
|
||||
|
||||
def build_pipeline_components(config, llm_backend, connector=None) -> tuple:
|
||||
def build_pipeline_components(config) -> tuple:
|
||||
"""Like build_pipeline, but returns all components for tests.
|
||||
|
||||
Args:
|
||||
config: Full Config object.
|
||||
llm_backend: An already-constructed LLMBackend instance
|
||||
(from main.py or a test). Pipeline components share
|
||||
this single instance. May be None for fallback behavior.
|
||||
connector: Optional MeshtasticConnector for mesh channels.
|
||||
|
||||
Returns:
|
||||
(bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator).
|
||||
Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest).
|
||||
"""
|
||||
bus = EventBus()
|
||||
dispatcher = Dispatcher(config, create_channel, connector=connector)
|
||||
dispatcher = Dispatcher(config, create_channel)
|
||||
|
||||
# Build include_toggles from config
|
||||
digest_cfg = getattr(config.notifications, "digest", None)
|
||||
|
|
@ -129,39 +86,15 @@ def build_pipeline_components(config, llm_backend, connector=None) -> tuple:
|
|||
if include_list:
|
||||
include_toggles = list(include_list)
|
||||
|
||||
accumulator = DigestAccumulator(
|
||||
llm_backend=llm_backend,
|
||||
include_toggles=include_toggles,
|
||||
digest = DigestAccumulator(include_toggles=include_toggles)
|
||||
severity_router = SeverityRouter(
|
||||
immediate_handler=dispatcher.dispatch,
|
||||
digest_handler=digest.enqueue,
|
||||
)
|
||||
|
||||
# Tee closure: events go to BOTH dispatcher and accumulator
|
||||
# dispatcher.dispatch() is async, so fire-and-forget with create_task
|
||||
def _tee(event):
|
||||
try:
|
||||
asyncio.create_task(dispatcher.dispatch(event))
|
||||
except RuntimeError:
|
||||
# No running event loop (e.g. sync tests) - skip async dispatch
|
||||
pass
|
||||
accumulator.enqueue(event)
|
||||
|
||||
# Build enabled toggles set from config
|
||||
toggles_cfg = getattr(config.notifications, "toggles", None)
|
||||
enabled_toggles = None
|
||||
if toggles_cfg is not None:
|
||||
enabled_list = getattr(toggles_cfg, "enabled", None)
|
||||
if enabled_list:
|
||||
enabled_toggles = set(enabled_list)
|
||||
|
||||
toggle_filter = ToggleFilter(
|
||||
next_handler=_tee,
|
||||
enabled_toggles=enabled_toggles,
|
||||
)
|
||||
|
||||
grouper = Grouper(next_handler=toggle_filter.handle)
|
||||
grouper = Grouper(next_handler=severity_router.handle)
|
||||
inhibitor = Inhibitor(next_handler=grouper.handle)
|
||||
bus.subscribe(inhibitor.handle)
|
||||
|
||||
return bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator
|
||||
return bus, inhibitor, grouper, severity_router, dispatcher, digest
|
||||
|
||||
|
||||
async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
|
||||
|
|
@ -178,14 +111,12 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
|
|||
if components is None:
|
||||
raise RuntimeError("bus missing _pipeline_components; use build_pipeline()")
|
||||
|
||||
accumulator = components["accumulator"]
|
||||
digest = components["digest"]
|
||||
|
||||
connector = components.get("connector")
|
||||
scheduler = DigestScheduler(
|
||||
accumulator=accumulator,
|
||||
accumulator=digest,
|
||||
config=config,
|
||||
channel_factory=create_channel,
|
||||
connector=connector,
|
||||
)
|
||||
await scheduler.start()
|
||||
|
||||
|
|
@ -212,7 +143,6 @@ __all__ = [
|
|||
"Dispatcher",
|
||||
"Inhibitor",
|
||||
"Grouper",
|
||||
"ToggleFilter",
|
||||
"DigestAccumulator",
|
||||
"Digest",
|
||||
"DigestScheduler",
|
||||
|
|
|
|||
|
|
@ -1,24 +1,33 @@
|
|||
"""Digest accumulator and renderer for Phase 2.4.
|
||||
"""Digest accumulator and renderer for Phase 2.3a.
|
||||
|
||||
Logs all events between digest emissions and renders LLM-summarized
|
||||
digest output per toggle. No active/resolved tracking — just a
|
||||
chronological log that the LLM summarizes.
|
||||
Holds priority and routine events between digest emissions, tracks
|
||||
active vs recently-resolved events, and renders the two-section
|
||||
digest output (ACTIVE NOW + SINCE LAST DIGEST) when called.
|
||||
|
||||
render_digest() is async and calls the LLM once per non-empty toggle.
|
||||
No scheduling logic here. render_digest() is called explicitly by
|
||||
the future scheduler (Phase 2.3b) or by tests.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
from typing import Optional
|
||||
|
||||
from meshai.notifications.events import Event
|
||||
from meshai.notifications.categories import get_toggle
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from meshai.backends.base import LLMBackend
|
||||
|
||||
# Lowercase substrings in event.title that indicate the event is
|
||||
# a resolution of a prior alert. Conservative list — easy to extend.
|
||||
RESOLUTION_MARKERS = (
|
||||
"cleared",
|
||||
"reopened",
|
||||
"ended",
|
||||
"resolved",
|
||||
"back online",
|
||||
"recovered",
|
||||
"lifted",
|
||||
)
|
||||
|
||||
# Display labels per toggle (used in rendered output)
|
||||
TOGGLE_LABELS = {
|
||||
|
|
@ -46,23 +55,11 @@ TOGGLE_ORDER = [
|
|||
"other",
|
||||
]
|
||||
|
||||
# System prompt for digest summarization
|
||||
DIGEST_SYSTEM_PROMPT = (
|
||||
"You are summarizing a category of mesh-network alerts for a "
|
||||
"morning digest broadcast. Given a list of events in chronological "
|
||||
"order (immediate severity first, then priority, then routine), "
|
||||
"produce ONE SHORT LINE summarizing what happened. "
|
||||
"Be specific about node IDs, places, and counts when present. "
|
||||
"Aim for 80-140 characters. Do not use markdown. No bullet points. "
|
||||
"Plain prose. End with a period."
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Digest:
|
||||
"""Result of render_digest(). Carries sections and metadata."""
|
||||
"""Result of render_digest(). Carries both sections and metadata."""
|
||||
rendered_at: float
|
||||
# Keep these fields for type compatibility; populated empty in Phase 2.4+
|
||||
active: dict[str, list[Event]] = field(default_factory=dict)
|
||||
since_last: dict[str, list[Event]] = field(default_factory=dict)
|
||||
mesh_chunks: list[str] = field(default_factory=list)
|
||||
|
|
@ -70,31 +67,28 @@ class Digest:
|
|||
full: str = ""
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
return not self.mesh_chunks or (
|
||||
len(self.mesh_chunks) == 1 and "No alerts" in self.mesh_chunks[0]
|
||||
)
|
||||
return not self.active and not self.since_last
|
||||
|
||||
|
||||
class DigestAccumulator:
|
||||
"""Logs events and produces LLM-summarized periodic digests.
|
||||
"""Tracks priority/routine events and produces periodic digests.
|
||||
|
||||
Args:
|
||||
llm_backend: LLM backend for generating summaries. If None,
|
||||
falls back to count-based summaries.
|
||||
mesh_char_limit: Maximum characters per mesh chunk (default 200).
|
||||
include_toggles: List of toggle names to include in digest output.
|
||||
If None, defaults to all toggles in TOGGLE_ORDER except
|
||||
rf_propagation.
|
||||
mesh_char_limit: Maximum characters per mesh chunk (default 200).
|
||||
rf_propagation. Unknown toggle names in the list are silently
|
||||
accepted (TOGGLE_ORDER drives display order, include_toggles
|
||||
drives which toggles are tracked).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
llm_backend: Optional["LLMBackend"] = None,
|
||||
include_toggles: list[str] | None = None,
|
||||
mesh_char_limit: int = 200,
|
||||
include_toggles: list[str] | None = None,
|
||||
):
|
||||
self._llm = llm_backend
|
||||
self._events_since_last_digest: dict[str, list[Event]] = {}
|
||||
self._active: dict[str, list[Event]] = {} # toggle -> events
|
||||
self._since_last: dict[str, list[Event]] = {} # toggle -> events
|
||||
self._last_digest_at: float = 0.0
|
||||
self._mesh_char_limit = mesh_char_limit
|
||||
# Default: all known toggles except rf_propagation
|
||||
|
|
@ -107,7 +101,7 @@ class DigestAccumulator:
|
|||
# ---- ingress ----
|
||||
|
||||
def enqueue(self, event: Event) -> None:
|
||||
"""Log an event for the next digest."""
|
||||
"""SeverityRouter calls this for priority/routine events."""
|
||||
toggle = get_toggle(event.category) or "other"
|
||||
|
||||
# Skip non-included toggles
|
||||
|
|
@ -117,201 +111,348 @@ class DigestAccumulator:
|
|||
)
|
||||
return
|
||||
|
||||
# Append to the event log
|
||||
self._events_since_last_digest.setdefault(toggle, []).append(event)
|
||||
active_for_toggle = self._active.setdefault(toggle, [])
|
||||
|
||||
# Resolution detection
|
||||
if self._is_resolution(event, self._now()):
|
||||
self._move_to_since_last_by_group(event, toggle)
|
||||
return
|
||||
|
||||
# In-place update if same id
|
||||
for i, existing in enumerate(active_for_toggle):
|
||||
if existing.id == event.id:
|
||||
active_for_toggle[i] = event
|
||||
self._logger.debug(
|
||||
f"LOGGED event {event.id} ({toggle}/{event.category}/{event.severity})"
|
||||
f"UPDATED active event {event.id} in {toggle}"
|
||||
)
|
||||
return
|
||||
|
||||
# Otherwise it's a new active event
|
||||
active_for_toggle.append(event)
|
||||
self._logger.debug(
|
||||
f"ADDED active event {event.id} ({toggle}/{event.category})"
|
||||
)
|
||||
|
||||
def tick(self, now: Optional[float] = None) -> int:
|
||||
"""No-op in Phase 2.4+. Returns 0."""
|
||||
return 0
|
||||
"""Move expired events from active to since_last.
|
||||
|
||||
# ---- rendering ----
|
||||
|
||||
async def render_digest(self, now: Optional[float] = None) -> Digest:
|
||||
"""Produce a Digest with LLM-summarized lines per toggle.
|
||||
|
||||
Calls the LLM once per toggle that had activity. Empty toggles
|
||||
produce no line. Clears the event log after rendering.
|
||||
Returns the number of events moved.
|
||||
"""
|
||||
if now is None:
|
||||
now = self._now()
|
||||
moved = 0
|
||||
for toggle in list(self._active.keys()):
|
||||
still_active = []
|
||||
for ev in self._active[toggle]:
|
||||
if ev.expires is not None and ev.expires <= now:
|
||||
self._since_last.setdefault(toggle, []).append(ev)
|
||||
moved += 1
|
||||
else:
|
||||
still_active.append(ev)
|
||||
self._active[toggle] = still_active
|
||||
return moved
|
||||
|
||||
# ---- rendering ----
|
||||
|
||||
def render_digest(self, now: Optional[float] = None) -> Digest:
|
||||
"""Produce a Digest of current state, then clear since_last."""
|
||||
if now is None:
|
||||
now = self._now()
|
||||
# tick() first so expired actives roll into since_last
|
||||
self.tick(now)
|
||||
|
||||
digest = Digest(rendered_at=now)
|
||||
time_str = time.strftime('%H%M', time.localtime(now))
|
||||
|
||||
# Build summary lines per toggle
|
||||
summary_lines: list[str] = []
|
||||
|
||||
for toggle in TOGGLE_ORDER:
|
||||
events = self._events_since_last_digest.get(toggle, [])
|
||||
if not events:
|
||||
continue
|
||||
if toggle not in self._included:
|
||||
continue
|
||||
|
||||
label = TOGGLE_LABELS.get(toggle, toggle)
|
||||
summary = await self._summarize_toggle(toggle, events, now)
|
||||
summary_lines.append(f"[{label}] {summary}")
|
||||
|
||||
# Render outputs
|
||||
if summary_lines:
|
||||
digest.mesh_chunks = self._render_mesh_chunks(summary_lines, time_str)
|
||||
digest.full = self._render_full(summary_lines, time_str)
|
||||
else:
|
||||
digest.mesh_chunks = [f"DIGEST {time_str}\nNo alerts since last digest."]
|
||||
digest.full = f"--- {time_str} Digest ---\n\nNo alerts since last digest.\n"
|
||||
|
||||
# mesh_compact for backward compatibility
|
||||
# Defensive: skip non-included toggles when building output
|
||||
digest.active = {
|
||||
k: list(v) for k, v in self._active.items()
|
||||
if v and k in self._included
|
||||
}
|
||||
digest.since_last = {
|
||||
k: list(v) for k, v in self._since_last.items()
|
||||
if v and k in self._included
|
||||
}
|
||||
digest.mesh_chunks = self._render_mesh_chunks(digest, now)
|
||||
# mesh_compact: join chunks for backward compatibility
|
||||
if len(digest.mesh_chunks) == 1:
|
||||
digest.mesh_compact = digest.mesh_chunks[0]
|
||||
else:
|
||||
digest.mesh_compact = "\n---\n".join(digest.mesh_chunks)
|
||||
digest.full = self._render_full(digest, now)
|
||||
|
||||
# Clear event log
|
||||
self._events_since_last_digest.clear()
|
||||
# Clear since_last; active stays for the next cycle
|
||||
self._since_last.clear()
|
||||
self._last_digest_at = now
|
||||
|
||||
return digest
|
||||
|
||||
async def _summarize_toggle(
|
||||
def _render_mesh_chunks(self, digest: Digest, now: float) -> list[str]:
|
||||
"""Produce mesh-radio-friendly compact chunks.
|
||||
|
||||
Returns a list of strings, each ≤ self._mesh_char_limit chars.
|
||||
Single-chunk output has no "(1/N)" suffix. Multi-chunk output
|
||||
has "(k/N)" counters and "(cont)" suffixes on section headers
|
||||
that span chunks.
|
||||
"""
|
||||
time_str = time.strftime('%H%M', time.localtime(now))
|
||||
|
||||
# Empty digest case
|
||||
if not digest.active and not digest.since_last:
|
||||
return [f"DIGEST {time_str}\nNo alerts since last digest."]
|
||||
|
||||
# Build logical lines with section markers
|
||||
# Each item is (section, line) where section is "active", "resolved", or None
|
||||
logical_lines: list[tuple[str | None, str]] = []
|
||||
|
||||
if digest.active:
|
||||
logical_lines.append(("active", "ACTIVE NOW"))
|
||||
for toggle in TOGGLE_ORDER:
|
||||
events = digest.active.get(toggle)
|
||||
if not events:
|
||||
continue
|
||||
logical_lines.append(("active", self._compact_toggle_line(toggle, events)))
|
||||
|
||||
if digest.since_last:
|
||||
logical_lines.append(("resolved", "RESOLVED"))
|
||||
for toggle in TOGGLE_ORDER:
|
||||
events = digest.since_last.get(toggle)
|
||||
if not events:
|
||||
continue
|
||||
logical_lines.append(("resolved", self._compact_toggle_line(toggle, events)))
|
||||
|
||||
# Pack lines into chunks
|
||||
return self._pack_lines_into_chunks(logical_lines, time_str)
|
||||
|
||||
def _pack_lines_into_chunks(
|
||||
self,
|
||||
toggle: str,
|
||||
events: list[Event],
|
||||
now: float,
|
||||
) -> str:
|
||||
"""Generate a one-line summary for a toggle's events."""
|
||||
# Sort by severity (immediate=0, priority=1, routine=2), then timestamp
|
||||
severity_rank = {"immediate": 0, "priority": 1, "routine": 2}
|
||||
sorted_events = sorted(
|
||||
events,
|
||||
key=lambda e: (severity_rank.get(e.severity, 3), e.timestamp),
|
||||
)
|
||||
|
||||
# Build LLM input
|
||||
lines = [f"Category: {toggle}", "Events:"]
|
||||
for ev in sorted_events:
|
||||
lines.append(self._format_event_for_llm(ev))
|
||||
llm_input = "\n".join(lines)
|
||||
|
||||
# Try LLM summarization
|
||||
if self._llm is not None:
|
||||
try:
|
||||
response = await self._llm.generate(
|
||||
messages=[{"role": "user", "content": llm_input}],
|
||||
system_prompt=DIGEST_SYSTEM_PROMPT,
|
||||
max_tokens=200,
|
||||
)
|
||||
# Take first line only
|
||||
summary = response.strip().split("\n")[0].strip()
|
||||
if summary:
|
||||
return summary
|
||||
except Exception as e:
|
||||
self._logger.warning(f"LLM summarization failed for {toggle}: {e}")
|
||||
|
||||
# Fallback: count-based summary
|
||||
return f"{len(events)} event(s) (LLM unavailable)"
|
||||
|
||||
def _format_event_for_llm(self, event: Event) -> str:
|
||||
"""Format one event for LLM input."""
|
||||
ts = datetime.fromtimestamp(event.timestamp)
|
||||
time_str = ts.strftime("%H:%M")
|
||||
severity = event.severity.upper()
|
||||
|
||||
# Combine title and summary
|
||||
text = event.title or ""
|
||||
if event.summary and event.summary != event.title:
|
||||
if text:
|
||||
text = f"{text} — {event.summary}"
|
||||
else:
|
||||
text = event.summary
|
||||
if not text:
|
||||
text = event.category
|
||||
|
||||
# Truncate long text
|
||||
if len(text) > 120:
|
||||
text = text[:117] + "..."
|
||||
|
||||
return f"- [{severity} {time_str}] {text}"
|
||||
|
||||
def _render_mesh_chunks(
|
||||
self,
|
||||
summary_lines: list[str],
|
||||
logical_lines: list[tuple[str | None, str]],
|
||||
time_str: str,
|
||||
) -> list[str]:
|
||||
"""Pack summary lines into mesh-friendly chunks."""
|
||||
"""Pack logical lines into chunks respecting char limit.
|
||||
|
||||
Args:
|
||||
logical_lines: List of (section, line) tuples where section
|
||||
is "active", "resolved", or None for headers.
|
||||
time_str: Time string for headers (e.g., "0700").
|
||||
|
||||
Returns:
|
||||
List of chunk strings, each ≤ self._mesh_char_limit.
|
||||
"""
|
||||
if not logical_lines:
|
||||
return [f"DIGEST {time_str}\nNo alerts since last digest."]
|
||||
|
||||
limit = self._mesh_char_limit
|
||||
chunks: list[list[str]] = []
|
||||
chunks: list[list[str]] = [] # List of line lists
|
||||
current_chunk: list[str] = []
|
||||
current_len = 0
|
||||
last_section_in_chunk: str | None = None
|
||||
sections_started: set[str] = set()
|
||||
|
||||
# Placeholder header
|
||||
header = f"DIGEST {time_str}"
|
||||
# Placeholder header - will be fixed up later
|
||||
header_placeholder = f"DIGEST {time_str}"
|
||||
|
||||
def start_new_chunk():
|
||||
nonlocal current_chunk, current_len
|
||||
nonlocal current_chunk, current_len, last_section_in_chunk
|
||||
if current_chunk:
|
||||
chunks.append(current_chunk)
|
||||
current_chunk = [header]
|
||||
current_len = len(header)
|
||||
current_chunk = [header_placeholder]
|
||||
current_len = len(header_placeholder)
|
||||
last_section_in_chunk = None
|
||||
|
||||
start_new_chunk()
|
||||
|
||||
for line in summary_lines:
|
||||
line_len = 1 + len(line) # newline + line
|
||||
if current_len + line_len > limit:
|
||||
i = 0
|
||||
while i < len(logical_lines):
|
||||
section, line = logical_lines[i]
|
||||
is_section_header = line in ("ACTIVE NOW", "RESOLVED")
|
||||
|
||||
# Check if this is a section header - ensure it has at least one
|
||||
# toggle line following it in this chunk
|
||||
if is_section_header:
|
||||
# Look ahead for the next toggle line
|
||||
next_toggle_idx = i + 1
|
||||
if next_toggle_idx < len(logical_lines):
|
||||
_, next_line = logical_lines[next_toggle_idx]
|
||||
# Calculate space needed for header + newline + next line
|
||||
needed = len(line) + 1 + len(next_line)
|
||||
if current_len + 1 + needed > limit:
|
||||
# Section header + next line won't fit, start new chunk
|
||||
start_new_chunk()
|
||||
sections_started.add(section)
|
||||
last_section_in_chunk = section
|
||||
current_chunk.append(line)
|
||||
current_len += line_len
|
||||
current_len += 1 + len(line)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
# Calculate line length with newline
|
||||
line_with_newline = 1 + len(line) # newline before line
|
||||
|
||||
# Would this line fit?
|
||||
if current_len + line_with_newline > limit:
|
||||
# Start new chunk
|
||||
start_new_chunk()
|
||||
|
||||
# If continuing a section, add "(cont)" header
|
||||
if section and section in sections_started and not is_section_header:
|
||||
cont_header = "ACTIVE NOW (cont)" if section == "active" else "RESOLVED (cont)"
|
||||
current_chunk.append(cont_header)
|
||||
current_len += 1 + len(cont_header)
|
||||
last_section_in_chunk = section
|
||||
|
||||
# Add the line
|
||||
if is_section_header:
|
||||
sections_started.add(section)
|
||||
last_section_in_chunk = section
|
||||
current_chunk.append(line)
|
||||
current_len += 1 + len(line)
|
||||
i += 1
|
||||
|
||||
# Don't forget the last chunk
|
||||
if current_chunk and len(current_chunk) > 1:
|
||||
if current_chunk and len(current_chunk) > 1: # More than just header
|
||||
chunks.append(current_chunk)
|
||||
elif current_chunk and len(current_chunk) == 1:
|
||||
# Only header in chunk - shouldn't happen but handle gracefully
|
||||
if chunks:
|
||||
# Merge with previous chunk if possible
|
||||
pass
|
||||
else:
|
||||
chunks.append(current_chunk)
|
||||
|
||||
# Fix up headers with chunk counts
|
||||
total = len(chunks)
|
||||
total_chunks = len(chunks)
|
||||
result: list[str] = []
|
||||
|
||||
for idx, chunk_lines in enumerate(chunks):
|
||||
if total == 1:
|
||||
# Fix header line
|
||||
if total_chunks == 1:
|
||||
chunk_lines[0] = f"DIGEST {time_str}"
|
||||
else:
|
||||
chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total})"
|
||||
chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total_chunks})"
|
||||
result.append("\n".join(chunk_lines))
|
||||
|
||||
return result if result else [f"DIGEST {time_str}\nNo alerts since last digest."]
|
||||
|
||||
def _render_full(self, summary_lines: list[str], time_str: str) -> str:
|
||||
"""Produce full multi-line digest for email/webhook."""
|
||||
def _compact_toggle_line(self, toggle: str, events: list[Event]) -> str:
|
||||
"""Build one compact line for a toggle: [Label] headline (+N)"""
|
||||
label = TOGGLE_LABELS.get(toggle, toggle)
|
||||
sorted_events = self._sort_events(events)
|
||||
top_event = sorted_events[0]
|
||||
|
||||
# Get headline text
|
||||
headline = top_event.summary or top_event.title or top_event.category
|
||||
|
||||
# Truncate headline at ~60 chars to keep lines readable
|
||||
max_headline = 60
|
||||
if len(headline) > max_headline:
|
||||
headline = headline[:max_headline - 1] + "…"
|
||||
|
||||
# Append (+N) if more than one event
|
||||
overflow = len(events) - 1
|
||||
if overflow > 0:
|
||||
return f"[{label}] {headline} (+{overflow})"
|
||||
else:
|
||||
return f"[{label}] {headline}"
|
||||
|
||||
def _render_full(self, digest: Digest, now: float) -> str:
|
||||
"""Produce the full multi-line digest for email/webhook."""
|
||||
lines = [
|
||||
f"--- {time_str} Digest ---",
|
||||
f"--- {time.strftime('%H%M', time.localtime(now))} Digest ---",
|
||||
"",
|
||||
]
|
||||
lines.extend(summary_lines)
|
||||
|
||||
if not digest.active and not digest.since_last:
|
||||
lines.append("No alerts since last digest.")
|
||||
lines.append("")
|
||||
return "\n".join(lines)
|
||||
else:
|
||||
if digest.active:
|
||||
lines.append("ACTIVE NOW:")
|
||||
for toggle in TOGGLE_ORDER:
|
||||
events = digest.active.get(toggle)
|
||||
if not events:
|
||||
continue
|
||||
label = TOGGLE_LABELS.get(toggle, toggle)
|
||||
for ev in self._sort_events(events):
|
||||
lines.append(f" [{label}] {self._format_event_line(ev)}")
|
||||
lines.append("")
|
||||
|
||||
if digest.since_last:
|
||||
lines.append("SINCE LAST DIGEST:")
|
||||
for toggle in TOGGLE_ORDER:
|
||||
events = digest.since_last.get(toggle)
|
||||
if not events:
|
||||
continue
|
||||
label = TOGGLE_LABELS.get(toggle, toggle)
|
||||
for ev in self._sort_events(events):
|
||||
lines.append(f" [{label}] {self._format_event_line(ev)}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
|
||||
def _format_event_line(self, event: Event) -> str:
|
||||
"""Single-line summary of an event for digest output."""
|
||||
# Prefer event.summary if set, else fall back to title, then category
|
||||
text = event.summary or event.title or event.category
|
||||
# Trim runaway text — keep digest readable
|
||||
if len(text) > 140:
|
||||
text = text[:139] + "…"
|
||||
return text
|
||||
|
||||
def _sort_events(self, events: list[Event]) -> list[Event]:
|
||||
"""Sort within a toggle: immediate first, then priority,
|
||||
then routine, then by timestamp newest first."""
|
||||
rank = {"immediate": 0, "priority": 1, "routine": 2}
|
||||
return sorted(
|
||||
events,
|
||||
key=lambda e: (rank.get(e.severity, 3), -e.timestamp),
|
||||
)
|
||||
|
||||
# ---- helpers ----
|
||||
|
||||
def _is_resolution(self, event: Event, now: float) -> bool:
|
||||
if event.expires is not None and event.expires <= now:
|
||||
return True
|
||||
title_lc = (event.title or "").lower()
|
||||
return any(marker in title_lc for marker in RESOLUTION_MARKERS)
|
||||
|
||||
def _move_to_since_last_by_group(self, event: Event, toggle: str) -> None:
|
||||
"""Remove any active event matching event's group_key (or id)
|
||||
and place this resolution event into since_last.
|
||||
"""
|
||||
active_list = self._active.get(toggle, [])
|
||||
# Match by group_key if set, else by id
|
||||
match_key = event.group_key
|
||||
if match_key:
|
||||
self._active[toggle] = [
|
||||
e for e in active_list
|
||||
if e.group_key != match_key
|
||||
]
|
||||
else:
|
||||
self._active[toggle] = [
|
||||
e for e in active_list if e.id != event.id
|
||||
]
|
||||
self._since_last.setdefault(toggle, []).append(event)
|
||||
self._logger.debug(
|
||||
f"RESOLVED in {toggle}: {event.id} ({event.title!r})"
|
||||
)
|
||||
|
||||
def _now(self) -> float:
|
||||
return time.time()
|
||||
|
||||
# ---- inspection (for tests and scheduler) ----
|
||||
# ---- inspection (for tests and future scheduler) ----
|
||||
|
||||
def event_count(self, toggle: Optional[str] = None) -> int:
|
||||
"""Count events logged since last digest."""
|
||||
def active_count(self, toggle: Optional[str] = None) -> int:
|
||||
if toggle is not None:
|
||||
return len(self._events_since_last_digest.get(toggle, []))
|
||||
return sum(len(v) for v in self._events_since_last_digest.values())
|
||||
return len(self._active.get(toggle, []))
|
||||
return sum(len(v) for v in self._active.values())
|
||||
|
||||
def since_last_count(self, toggle: Optional[str] = None) -> int:
|
||||
if toggle is not None:
|
||||
return len(self._since_last.get(toggle, []))
|
||||
return sum(len(v) for v in self._since_last.values())
|
||||
|
||||
def last_digest_at(self) -> float:
|
||||
return self._last_digest_at
|
||||
|
||||
def clear(self) -> None:
|
||||
self._events_since_last_digest.clear()
|
||||
self._active.clear()
|
||||
self._since_last.clear()
|
||||
self._last_digest_at = 0.0
|
||||
|
||||
# Legacy compatibility — return 0 for old tests
|
||||
def active_count(self, toggle: Optional[str] = None) -> int:
|
||||
return 0
|
||||
|
||||
def since_last_count(self, toggle: Optional[str] = None) -> int:
|
||||
return 0
|
||||
|
|
|
|||
|
|
@ -4,15 +4,12 @@ The dispatcher routes immediate-severity events through the existing
|
|||
NotificationRuleConfig rules and delivers via channels.py. This is the
|
||||
transitional bridge between the new Event pipeline and the existing
|
||||
channel implementations.
|
||||
|
||||
Phase 2.5a: dispatch() is now async, takes a connector at construction,
|
||||
and properly awaits channel.deliver(payload, rule).
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Callable, Optional
|
||||
from typing import Callable
|
||||
|
||||
from meshai.notifications.events import Event, make_payload_from_event
|
||||
from meshai.notifications.events import Event
|
||||
|
||||
|
||||
class Dispatcher:
|
||||
|
|
@ -20,26 +17,21 @@ class Dispatcher:
|
|||
|
||||
SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2}
|
||||
|
||||
def __init__(self, config, channel_factory: Callable, connector=None):
|
||||
def __init__(self, config, channel_factory: Callable):
|
||||
"""Initialize.
|
||||
|
||||
Args:
|
||||
config: The full Config object (provides config.notifications.rules)
|
||||
channel_factory: Callable taking (rule, connector) and returning
|
||||
a NotificationChannel. This is create_channel from
|
||||
meshai/notifications/channels.py.
|
||||
connector: MeshConnector instance for mesh channel deliveries.
|
||||
channel_factory: Callable taking a NotificationRuleConfig and
|
||||
returning a NotificationChannel. This is create_channel
|
||||
from meshai/notifications/channels.py.
|
||||
"""
|
||||
self._config = config
|
||||
self._channel_factory = channel_factory
|
||||
self._connector = connector
|
||||
self._logger = logging.getLogger("meshai.pipeline.dispatcher")
|
||||
|
||||
async def dispatch(self, event: Event) -> None:
|
||||
"""Deliver an immediate-severity event to all matching channels.
|
||||
|
||||
This method is async and awaits each channel.deliver() call.
|
||||
"""
|
||||
def dispatch(self, event: Event) -> None:
|
||||
"""Deliver an immediate-severity event to all matching channels."""
|
||||
rules = self._matching_rules(event)
|
||||
if not rules:
|
||||
self._logger.debug(
|
||||
|
|
@ -48,17 +40,19 @@ class Dispatcher:
|
|||
return
|
||||
for rule in rules:
|
||||
try:
|
||||
channel = self._channel_factory(rule, self._connector)
|
||||
payload = make_payload_from_event(event)
|
||||
success = await channel.deliver(payload, rule)
|
||||
if success:
|
||||
channel = self._channel_factory(rule)
|
||||
alert = {
|
||||
"category": event.category,
|
||||
"severity": event.severity,
|
||||
"message": event.summary or event.title,
|
||||
"node_id": event.node_ids[0] if event.node_ids else None,
|
||||
"region": event.region,
|
||||
"timestamp": event.timestamp,
|
||||
}
|
||||
channel.deliver(alert)
|
||||
self._logger.info(
|
||||
f"Dispatched event {event.id} via {rule.delivery_type}"
|
||||
)
|
||||
else:
|
||||
self._logger.warning(
|
||||
f"Channel delivery returned False for rule {rule.name}"
|
||||
)
|
||||
except Exception:
|
||||
self._logger.exception(
|
||||
f"Channel delivery failed for rule {rule.name}"
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ from datetime import datetime, timedelta
|
|||
from typing import Callable, Optional
|
||||
|
||||
from meshai.notifications.pipeline.digest import DigestAccumulator
|
||||
from meshai.notifications.events import NotificationPayload
|
||||
|
||||
|
||||
class DigestScheduler:
|
||||
|
|
@ -24,14 +23,12 @@ class DigestScheduler:
|
|||
accumulator: DigestAccumulator,
|
||||
config,
|
||||
channel_factory: Callable,
|
||||
connector=None,
|
||||
clock: Optional[Callable[[], float]] = None,
|
||||
sleep: Optional[Callable[[float], "asyncio.Future"]] = None,
|
||||
):
|
||||
self._accumulator = accumulator
|
||||
self._config = config
|
||||
self._channel_factory = channel_factory
|
||||
self._connector = connector
|
||||
self._clock = clock or time.time
|
||||
self._sleep = sleep or asyncio.sleep
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
|
|
@ -101,8 +98,7 @@ class DigestScheduler:
|
|||
async def _fire(self, now: float) -> None:
|
||||
"""Render and deliver one digest."""
|
||||
self._logger.info(f"Firing digest at {datetime.fromtimestamp(now):%H:%M}")
|
||||
# render_digest is now async in Phase 2.4+
|
||||
digest = await self._accumulator.render_digest(now)
|
||||
digest = self._accumulator.render_digest(now)
|
||||
self._last_fire_at = now
|
||||
|
||||
rules = self._matching_rules()
|
||||
|
|
@ -123,7 +119,7 @@ class DigestScheduler:
|
|||
|
||||
async def _deliver_to_rule(self, rule, digest, now: float) -> None:
|
||||
"""Hand the rendered digest to a channel based on rule.delivery_type."""
|
||||
channel = self._channel_factory(rule, self._connector)
|
||||
channel = self._channel_factory(rule)
|
||||
delivery_type = rule.delivery_type
|
||||
|
||||
if delivery_type in ("mesh_broadcast", "mesh_dm"):
|
||||
|
|
@ -131,27 +127,31 @@ class DigestScheduler:
|
|||
chunks = digest.mesh_chunks
|
||||
total = len(chunks)
|
||||
for i, chunk in enumerate(chunks, start=1):
|
||||
payload = NotificationPayload(
|
||||
message=chunk,
|
||||
category="digest",
|
||||
severity="routine",
|
||||
timestamp=now,
|
||||
chunk_index=i,
|
||||
chunk_total=total,
|
||||
)
|
||||
await channel.deliver(payload, rule)
|
||||
payload = {
|
||||
"category": "digest",
|
||||
"severity": "routine",
|
||||
"message": chunk,
|
||||
"node_id": None,
|
||||
"region": None,
|
||||
"timestamp": now,
|
||||
"chunk_index": i,
|
||||
"chunk_total": total,
|
||||
}
|
||||
channel.deliver(payload)
|
||||
self._logger.info(
|
||||
f"Delivered {total} mesh chunk(s) to rule {rule.name!r}"
|
||||
)
|
||||
else:
|
||||
# Single full-form delivery
|
||||
payload = NotificationPayload(
|
||||
message=digest.full,
|
||||
category="digest",
|
||||
severity="routine",
|
||||
timestamp=now,
|
||||
)
|
||||
await channel.deliver(payload, rule)
|
||||
payload = {
|
||||
"category": "digest",
|
||||
"severity": "routine",
|
||||
"message": digest.full,
|
||||
"node_id": None,
|
||||
"region": None,
|
||||
"timestamp": now,
|
||||
}
|
||||
channel.deliver(payload)
|
||||
self._logger.info(
|
||||
f"Delivered digest to rule {rule.name!r} via {delivery_type}"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,48 +0,0 @@
|
|||
"""Master toggle filter.
|
||||
|
||||
Drops events whose category maps to a toggle that the operator has
|
||||
disabled. Distinct from DigestAccumulator.include_toggles, which
|
||||
only affects the digest recap — this filter drops events from the
|
||||
entire pipeline, so disabled toggles produce no live mesh delivery,
|
||||
no digest entry, nothing.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Callable
|
||||
|
||||
from meshai.notifications.events import Event
|
||||
from meshai.notifications.categories import get_toggle
|
||||
|
||||
|
||||
class ToggleFilter:
|
||||
"""Drop events whose toggle isn't in the enabled set."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
next_handler: Callable[[Event], None],
|
||||
enabled_toggles: set[str] | None = None,
|
||||
):
|
||||
"""Initialize.
|
||||
|
||||
Args:
|
||||
next_handler: Callable that receives non-dropped events.
|
||||
enabled_toggles: Set of toggle names that are enabled.
|
||||
If None, all toggles are enabled (filter is a no-op).
|
||||
"""
|
||||
self._next = next_handler
|
||||
self._enabled = enabled_toggles # None = no-op
|
||||
self._logger = logging.getLogger("meshai.pipeline.toggle_filter")
|
||||
|
||||
def handle(self, event: Event) -> None:
|
||||
"""Pass the event through, or drop it if its toggle is disabled."""
|
||||
if self._enabled is None:
|
||||
self._next(event)
|
||||
return
|
||||
|
||||
toggle = get_toggle(event.category) or "other"
|
||||
if toggle not in self._enabled:
|
||||
self._logger.debug(
|
||||
f"DROPPED event {event.id} — toggle {toggle!r} not enabled"
|
||||
)
|
||||
return
|
||||
self._next(event)
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
"""Channel-type-aware renderers.
|
||||
|
||||
Each renderer takes a NotificationPayload and produces a
|
||||
channel-type-appropriate output: mesh chunks, email subject/body,
|
||||
or webhook JSON dict.
|
||||
|
||||
Renderers are reusable beyond channels.py — the future MQTT event
|
||||
publisher (Phase 2.6.5) uses WebhookRenderer for its on-the-wire
|
||||
format.
|
||||
"""
|
||||
|
||||
from meshai.notifications.renderers.base import Renderer
|
||||
from meshai.notifications.renderers.mesh import MeshRenderer
|
||||
from meshai.notifications.renderers.email import EmailRenderer
|
||||
from meshai.notifications.renderers.webhook import WebhookRenderer
|
||||
|
||||
__all__ = [
|
||||
"Renderer",
|
||||
"MeshRenderer",
|
||||
"EmailRenderer",
|
||||
"WebhookRenderer",
|
||||
]
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
"""Abstract base for channel-type-aware renderers.
|
||||
|
||||
Each renderer takes a NotificationPayload and produces a
|
||||
channel-type-appropriate output (string, list, dict, etc.).
|
||||
Renderers are pure functions of their input — no network, no
|
||||
state, no side effects beyond logging. The channel that owns a
|
||||
renderer calls render() and then handles delivery.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any
|
||||
|
||||
from meshai.notifications.events import NotificationPayload
|
||||
|
||||
|
||||
class Renderer(ABC):
|
||||
"""Base class for all channel-type renderers."""
|
||||
|
||||
@abstractmethod
|
||||
def render(self, payload: NotificationPayload) -> Any:
|
||||
"""Produce the channel-type-appropriate output.
|
||||
|
||||
Subclasses define the concrete return type:
|
||||
- MeshRenderer returns list[str]
|
||||
- EmailRenderer returns dict (subject, body)
|
||||
- WebhookRenderer returns dict (JSON-serializable)
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
"""Email channel renderer.
|
||||
|
||||
Produces a dict with subject and body for SMTP delivery. Plain
|
||||
text body for now; HTML body is a future polish.
|
||||
|
||||
Subject format: "[MeshAI] <Severity> — <Event Type>"
|
||||
Body format: multi-line, with the message as the lead, followed
|
||||
by structured context fields (severity, region, node, timestamp,
|
||||
source category).
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from meshai.notifications.events import NotificationPayload
|
||||
from meshai.notifications.renderers.base import Renderer
|
||||
|
||||
|
||||
class EmailRenderer(Renderer):
|
||||
"""Produce email subject and body for a single payload."""
|
||||
|
||||
def __init__(self):
|
||||
self._logger = logging.getLogger("meshai.renderers.email")
|
||||
|
||||
def render(self, payload: NotificationPayload) -> dict:
|
||||
"""Render the payload as {subject, body}."""
|
||||
return {
|
||||
"subject": self._build_subject(payload),
|
||||
"body": self._build_body(payload),
|
||||
}
|
||||
|
||||
def _build_subject(self, p: NotificationPayload) -> str:
|
||||
sev = (p.severity or "routine").upper()
|
||||
type_label = self._type_label(p.event_type) or "Alert"
|
||||
return f"[MeshAI] {sev} — {type_label}"
|
||||
|
||||
def _build_body(self, p: NotificationPayload) -> str:
|
||||
lines: list[str] = []
|
||||
# Lead line
|
||||
lines.append(p.message or "(no message)")
|
||||
lines.append("") # blank separator
|
||||
|
||||
# Structured context
|
||||
lines.append(f"Severity: {p.severity or 'routine'}")
|
||||
if p.event_type:
|
||||
lines.append(f"Category: {p.event_type}")
|
||||
if p.region:
|
||||
lines.append(f"Region: {p.region}")
|
||||
if p.node_name:
|
||||
lines.append(f"Node: {p.node_name}")
|
||||
elif p.node_id:
|
||||
lines.append(f"Node: {p.node_id}")
|
||||
if p.timestamp:
|
||||
try:
|
||||
ts = datetime.fromtimestamp(p.timestamp)
|
||||
lines.append(f"Time: {ts.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
except (ValueError, OverflowError):
|
||||
pass
|
||||
|
||||
# Optional source event detail (if present)
|
||||
if p.source_event is not None:
|
||||
ev = p.source_event
|
||||
if hasattr(ev, "source") and ev.source:
|
||||
lines.append(f"Source: {ev.source}")
|
||||
if hasattr(ev, "title") and ev.title and ev.title != p.message:
|
||||
lines.append(f"Title: {ev.title}")
|
||||
|
||||
lines.append("")
|
||||
lines.append("--")
|
||||
lines.append("MeshAI notification")
|
||||
return "\n".join(lines)
|
||||
|
||||
def _type_label(self, event_type: Optional[str]) -> Optional[str]:
|
||||
"""Title-cased label for the event type (for subject line)."""
|
||||
if not event_type:
|
||||
return None
|
||||
return event_type.replace("_", " ").title()
|
||||
|
|
@ -1,127 +0,0 @@
|
|||
"""Mesh channel renderer.
|
||||
|
||||
Produces a list of short strings (each ≤200 chars by default)
|
||||
suitable for mesh radio broadcast. Reuses the digest's chunking
|
||||
pattern: never split a single "line" across chunks; add (k/N)
|
||||
counters when the rendered output produces more than one chunk.
|
||||
|
||||
The mesh renderer is symmetric with the digest accumulator's
|
||||
mesh chunk renderer — same chunk-packing algorithm, same char
|
||||
limit semantics.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from meshai.notifications.events import NotificationPayload
|
||||
from meshai.notifications.renderers.base import Renderer
|
||||
|
||||
|
||||
class MeshRenderer(Renderer):
|
||||
"""Produce mesh-compact chunks for a single payload."""
|
||||
|
||||
def __init__(self, char_limit: int = 200):
|
||||
self._limit = char_limit
|
||||
self._logger = logging.getLogger("meshai.renderers.mesh")
|
||||
|
||||
def render(self, payload: NotificationPayload) -> list[str]:
|
||||
"""Render the payload as 1+ mesh-compact chunks.
|
||||
|
||||
Algorithm:
|
||||
- Build the full message line (event_type, severity hint,
|
||||
and message text — see _format_one_line)
|
||||
- If the line fits in self._limit chars, return [line].
|
||||
- If the line exceeds self._limit, split across multiple
|
||||
chunks at word boundaries when possible, and add
|
||||
"(k/N)" counters at the start of each chunk.
|
||||
- If a single word exceeds the chunk limit, hard-split
|
||||
mid-word (rare — long URLs etc.)
|
||||
"""
|
||||
line = self._format_one_line(payload)
|
||||
if len(line) <= self._limit:
|
||||
return [line]
|
||||
|
||||
return self._chunk_long_line(line)
|
||||
|
||||
def _format_one_line(self, p: NotificationPayload) -> str:
|
||||
"""Build the headline for a payload.
|
||||
|
||||
Default format:
|
||||
"[<EventTypeTitle>] <message>"
|
||||
where EventTypeTitle is a short label derived from
|
||||
p.event_type (e.g. "weather_warning" → "Weather"). If
|
||||
p.event_type is None, omit the prefix.
|
||||
|
||||
Truncates the message at the limit only if the prefix
|
||||
is short enough; otherwise lets the chunker handle it.
|
||||
"""
|
||||
prefix = self._toggle_label(p.event_type)
|
||||
if prefix:
|
||||
return f"[{prefix}] {p.message}"
|
||||
return p.message
|
||||
|
||||
def _toggle_label(self, event_type: Optional[str]) -> Optional[str]:
|
||||
"""Map an event category to a short toggle label.
|
||||
|
||||
Looks up the toggle the category belongs to and returns
|
||||
the toggle's display label. If unknown, returns None
|
||||
(no prefix added).
|
||||
"""
|
||||
if not event_type:
|
||||
return None
|
||||
from meshai.notifications.categories import get_toggle
|
||||
toggle = get_toggle(event_type)
|
||||
if not toggle:
|
||||
return None
|
||||
# Same label set used by the digest renderer
|
||||
TOGGLE_LABELS = {
|
||||
"mesh_health": "Mesh",
|
||||
"weather": "Weather",
|
||||
"fire": "Fire",
|
||||
"rf_propagation": "RF",
|
||||
"roads": "Roads",
|
||||
"avalanche": "Avalanche",
|
||||
"seismic": "Seismic",
|
||||
"tracking": "Tracking",
|
||||
"other": "Other",
|
||||
}
|
||||
return TOGGLE_LABELS.get(toggle, toggle.title())
|
||||
|
||||
def _chunk_long_line(self, line: str) -> list[str]:
|
||||
"""Split a long line into chunks ≤ self._limit each.
|
||||
|
||||
Reserves ~10 chars per chunk for the "(k/N)" counter
|
||||
suffix. Splits at word boundaries; falls back to
|
||||
mid-word split if a single word exceeds the budget.
|
||||
"""
|
||||
# Reserve space for " (k/N)" — generous to 10 chars
|
||||
body_budget = self._limit - 10
|
||||
if body_budget <= 0:
|
||||
body_budget = self._limit # extreme small limit; ignore counter
|
||||
|
||||
words = line.split(" ")
|
||||
chunks: list[str] = []
|
||||
current = ""
|
||||
for word in words:
|
||||
# Hard-split words longer than body_budget (rare)
|
||||
while len(word) > body_budget:
|
||||
if current:
|
||||
chunks.append(current)
|
||||
current = ""
|
||||
chunks.append(word[:body_budget])
|
||||
word = word[body_budget:]
|
||||
# Add word to current chunk if it fits
|
||||
tentative = (current + " " + word) if current else word
|
||||
if len(tentative) <= body_budget:
|
||||
current = tentative
|
||||
else:
|
||||
chunks.append(current)
|
||||
current = word
|
||||
if current:
|
||||
chunks.append(current)
|
||||
|
||||
# Add counters: "DIGEST"-style "(k/N)" suffix
|
||||
total = len(chunks)
|
||||
if total == 1:
|
||||
return chunks
|
||||
return [f"{chunk} ({i+1}/{total})" for i, chunk in enumerate(chunks)]
|
||||
|
|
@ -1,67 +0,0 @@
|
|||
"""Webhook channel renderer.
|
||||
|
||||
Produces a JSON-serializable dict with stable field names. Also
|
||||
intended for reuse by the future MQTT event publisher (Phase
|
||||
2.6.5), which wants the same structured shape on the wire.
|
||||
|
||||
Field names use snake_case. Optional fields are omitted from the
|
||||
output when None, NOT set to null — keeps payloads compact and
|
||||
avoids ambiguity between "field absent" and "field explicitly
|
||||
null".
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from meshai.notifications.events import NotificationPayload
|
||||
from meshai.notifications.renderers.base import Renderer
|
||||
|
||||
|
||||
# Schema version for the wire format. Bump when making
|
||||
# breaking changes to the field shape. External consumers
|
||||
# can use this to handle multiple versions.
|
||||
WEBHOOK_SCHEMA_VERSION = "1.0"
|
||||
|
||||
|
||||
class WebhookRenderer(Renderer):
|
||||
"""Produce a JSON-serializable dict for a single payload."""
|
||||
|
||||
def __init__(self):
|
||||
self._logger = logging.getLogger("meshai.renderers.webhook")
|
||||
|
||||
def render(self, payload: NotificationPayload) -> dict:
|
||||
"""Render the payload as a structured dict."""
|
||||
out: dict[str, Any] = {
|
||||
"schema_version": WEBHOOK_SCHEMA_VERSION,
|
||||
"message": payload.message,
|
||||
"severity": payload.severity or "routine",
|
||||
"timestamp": payload.timestamp,
|
||||
}
|
||||
|
||||
# Optional fields — omit if None
|
||||
for src_attr, dst_key in (
|
||||
("category", "category"),
|
||||
("event_type", "event_type"),
|
||||
("node_id", "node_id"),
|
||||
("node_name", "node_name"),
|
||||
("region", "region"),
|
||||
("chunk_index", "chunk_index"),
|
||||
("chunk_total", "chunk_total"),
|
||||
):
|
||||
value = getattr(payload, src_attr, None)
|
||||
if value is not None:
|
||||
out[dst_key] = value
|
||||
|
||||
# Optional source event detail
|
||||
if payload.source_event is not None:
|
||||
ev = payload.source_event
|
||||
source_event: dict[str, Any] = {}
|
||||
for attr in ("id", "source", "title", "expires", "group_key"):
|
||||
if hasattr(ev, attr):
|
||||
value = getattr(ev, attr)
|
||||
if value is not None:
|
||||
source_event[attr] = value
|
||||
if source_event:
|
||||
out["source_event"] = source_event
|
||||
|
||||
return out
|
||||
|
|
@ -8,8 +8,7 @@ import time
|
|||
from datetime import datetime
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
|
||||
from .channels import create_channel_from_dict, NotificationChannel
|
||||
from .events import NotificationPayload
|
||||
from .channels import create_channel, NotificationChannel
|
||||
from .summarizer import MessageSummarizer
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -143,7 +142,7 @@ class NotificationRouter:
|
|||
return None
|
||||
|
||||
try:
|
||||
return create_channel_from_dict(config, self._connector)
|
||||
return create_channel(config, self._connector)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e)
|
||||
return None
|
||||
|
|
@ -200,20 +199,7 @@ class NotificationRouter:
|
|||
else:
|
||||
delivery_alert = {**alert, "message": message[:195] + "..."}
|
||||
|
||||
# Convert dict to NotificationPayload for channel interface
|
||||
payload = NotificationPayload(
|
||||
message=delivery_alert.get("message", ""),
|
||||
category=delivery_alert.get("type", "unknown"),
|
||||
severity=delivery_alert.get("severity", "routine"),
|
||||
timestamp=delivery_alert.get("timestamp", time.time()),
|
||||
node_id=delivery_alert.get("node_id"),
|
||||
node_name=delivery_alert.get("node_name"),
|
||||
region=delivery_alert.get("region"),
|
||||
event_type=delivery_alert.get("type"),
|
||||
)
|
||||
# Rule is a dict here; channels don't use it so we pass None
|
||||
# for the rule parameter (channels ignore it anyway)
|
||||
success = await channel.deliver(payload, None)
|
||||
success = await channel.deliver(delivery_alert, rule)
|
||||
if success:
|
||||
delivered = True
|
||||
self._record_fire(rule_name)
|
||||
|
|
@ -269,7 +255,7 @@ class NotificationRouter:
|
|||
{success, message, error, details}
|
||||
"""
|
||||
try:
|
||||
channel = create_channel_from_dict(channel_config, self._connector)
|
||||
channel = create_channel(channel_config, self._connector)
|
||||
return await channel.test_connection()
|
||||
except ValueError as e:
|
||||
return {
|
||||
|
|
|
|||
|
|
@ -1,212 +0,0 @@
|
|||
"""Tests for FIRMS adapter Phase 2.6 — to_event() method."""
|
||||
|
||||
import time
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from meshai.env.firms import FIRMSAdapter
|
||||
from meshai.notifications.events import Event
|
||||
|
||||
|
||||
# ============================================================
|
||||
# FIXTURES
|
||||
# ============================================================
|
||||
|
||||
@pytest.fixture
|
||||
def mock_config():
|
||||
"""Create a mock FIRMSConfig."""
|
||||
config = MagicMock()
|
||||
config.map_key = "test-key"
|
||||
config.source = "VIIRS_SNPP_NRT"
|
||||
config.bbox = [-117, 42, -114, 44]
|
||||
config.day_range = 1
|
||||
config.tick_seconds = 1800
|
||||
config.confidence_min = "nominal"
|
||||
config.proximity_km = 10.0
|
||||
return config
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def adapter(mock_config):
|
||||
"""Create a FIRMSAdapter with mocked dependencies."""
|
||||
return FIRMSAdapter(mock_config, region_anchors=[], fires_adapter=None)
|
||||
|
||||
|
||||
def make_firms_event(
|
||||
lat=42.5,
|
||||
lon=-114.5,
|
||||
new_ignition=False,
|
||||
severity="routine",
|
||||
headline="Test Hotspot",
|
||||
frp=None,
|
||||
confidence="n",
|
||||
distance_km=None,
|
||||
nearest_anchor=None,
|
||||
near_fire=None,
|
||||
):
|
||||
"""Helper to create a FIRMS event dict."""
|
||||
now = time.time()
|
||||
return {
|
||||
"source": "firms",
|
||||
"event_id": f"firms_{lat:.4f}_{lon:.4f}_2026-05-15_1200",
|
||||
"event_type": "Fire Hotspot",
|
||||
"severity": severity,
|
||||
"headline": headline,
|
||||
"lat": lat,
|
||||
"lon": lon,
|
||||
"expires": now + 21600,
|
||||
"fetched_at": now,
|
||||
"properties": {
|
||||
"new_ignition": new_ignition,
|
||||
"confidence": confidence,
|
||||
"frp": frp,
|
||||
"brightness": 350.0,
|
||||
"acq_date": "2026-05-15",
|
||||
"acq_time": "1200",
|
||||
"near_fire": near_fire,
|
||||
"distance_to_fire_km": 5.0 if near_fire else None,
|
||||
"distance_km": distance_km,
|
||||
"nearest_anchor": nearest_anchor,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# ============================================================
|
||||
# CATEGORY DECISION TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_to_event_new_ignition(adapter):
|
||||
"""New ignition maps to new_ignition category."""
|
||||
evt = make_firms_event(new_ignition=True)
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.category == "new_ignition"
|
||||
|
||||
|
||||
def test_to_event_near_known_fire(adapter):
|
||||
"""Hotspot near known fire maps to wildfire_proximity."""
|
||||
evt = make_firms_event(new_ignition=False, near_fire="Snake River Fire")
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.category == "wildfire_proximity"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# SEVERITY PASS-THROUGH TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_to_event_severity_passes_through(adapter):
|
||||
"""Severity from FIRMS event passes through unchanged."""
|
||||
for sev in ["routine", "priority", "immediate"]:
|
||||
evt = make_firms_event(severity=sev)
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.severity == sev
|
||||
|
||||
|
||||
# ============================================================
|
||||
# CONTENT TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_to_event_summary_includes_frp(adapter):
|
||||
"""Summary includes FRP when present."""
|
||||
evt = make_firms_event(frp=85.5)
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert "FRP 85" in event.summary
|
||||
|
||||
|
||||
def test_to_event_summary_handles_missing_frp(adapter):
|
||||
"""Missing FRP doesn't break to_event."""
|
||||
evt = make_firms_event(frp=None)
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert "FRP" not in event.summary
|
||||
|
||||
|
||||
def test_to_event_summary_includes_distance_when_present(adapter):
|
||||
"""Summary includes distance and anchor when present."""
|
||||
evt = make_firms_event(distance_km=12, nearest_anchor="TFL")
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert "12 km" in event.summary
|
||||
assert "TFL" in event.summary
|
||||
|
||||
|
||||
def test_to_event_region_uses_nearest_anchor(adapter):
|
||||
"""Region is set from nearest_anchor."""
|
||||
evt = make_firms_event(nearest_anchor="MHR")
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.region == "MHR"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# SPATIAL KEY TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_to_event_group_key_is_spatial_grid(adapter):
|
||||
"""Group key is spatial grid based on rounded lat/lon."""
|
||||
evt = make_firms_event(lat=42.5678, lon=-114.3456)
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.group_key == "firms:42.57:-114.35"
|
||||
|
||||
|
||||
def test_to_event_inhibit_keys_match_group_key(adapter):
|
||||
"""Inhibit keys contain the same spatial key as group_key."""
|
||||
evt = make_firms_event(lat=42.5678, lon=-114.3456)
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.group_key in event.inhibit_keys
|
||||
|
||||
|
||||
def test_two_nearby_detections_share_group_key(adapter):
|
||||
"""Two detections in same grid cell share group_key."""
|
||||
# Both round to 42.57:-114.35
|
||||
evt1 = make_firms_event(lat=42.571, lon=-114.351)
|
||||
evt2 = make_firms_event(lat=42.572, lon=-114.352)
|
||||
event1 = adapter.to_event(evt1)
|
||||
event2 = adapter.to_event(evt2)
|
||||
assert event1 is not None
|
||||
assert event2 is not None
|
||||
assert event1.group_key == event2.group_key
|
||||
|
||||
|
||||
# ============================================================
|
||||
# DEFENSIVE TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_to_event_missing_coords_returns_none(adapter):
|
||||
"""Missing coordinates returns None."""
|
||||
evt = make_firms_event()
|
||||
evt["lat"] = None
|
||||
event = adapter.to_event(evt)
|
||||
assert event is None
|
||||
|
||||
|
||||
def test_to_event_missing_properties_returns_event(adapter):
|
||||
"""Missing properties dict defaults to wildfire_proximity."""
|
||||
evt = {
|
||||
"source": "firms",
|
||||
"event_id": "test",
|
||||
"event_type": "Fire Hotspot",
|
||||
"severity": "routine",
|
||||
"headline": "Test",
|
||||
"lat": 42.5,
|
||||
"lon": -114.5,
|
||||
"fetched_at": time.time(),
|
||||
}
|
||||
# No "properties" key at all
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.category == "wildfire_proximity"
|
||||
|
||||
|
||||
def test_to_event_does_not_raise_on_corrupted_dict(adapter):
|
||||
"""Corrupted dict returns None without raising."""
|
||||
evt = {"garbage": True}
|
||||
# Should not raise
|
||||
event = adapter.to_event(evt)
|
||||
assert event is None
|
||||
|
|
@ -1,277 +0,0 @@
|
|||
"""Tests for NWS adapter Phase 2.6 — to_event() and _derive_category()."""
|
||||
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from meshai.env.nws import NWSAlertsAdapter
|
||||
from meshai.notifications.events import Event
|
||||
|
||||
|
||||
# ============================================================
|
||||
# FIXTURES
|
||||
# ============================================================
|
||||
|
||||
@pytest.fixture
|
||||
def mock_config():
|
||||
"""Create a mock NWSConfig."""
|
||||
config = MagicMock()
|
||||
config.areas = ["ID"]
|
||||
config.user_agent = "(test, test@example.com)"
|
||||
config.severity_min = "moderate"
|
||||
config.tick_seconds = 60
|
||||
return config
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def adapter(mock_config):
|
||||
"""Create an NWSAlertsAdapter with mocked config."""
|
||||
return NWSAlertsAdapter(mock_config)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# _derive_category TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_derive_category_warning(adapter):
|
||||
"""Warning suffix maps to weather_warning."""
|
||||
assert adapter._derive_category("Tornado Warning") == "weather_warning"
|
||||
assert adapter._derive_category("Red Flag Warning") == "weather_warning"
|
||||
assert adapter._derive_category("Winter Storm Warning") == "weather_warning"
|
||||
|
||||
|
||||
def test_derive_category_watch(adapter):
|
||||
"""Watch suffix maps to weather_watch."""
|
||||
assert adapter._derive_category("Tornado Watch") == "weather_watch"
|
||||
assert adapter._derive_category("Winter Storm Watch") == "weather_watch"
|
||||
assert adapter._derive_category("Fire Weather Watch") == "weather_watch"
|
||||
|
||||
|
||||
def test_derive_category_advisory(adapter):
|
||||
"""Advisory suffix maps to weather_advisory."""
|
||||
assert adapter._derive_category("Wind Advisory") == "weather_advisory"
|
||||
assert adapter._derive_category("Heat Advisory") == "weather_advisory"
|
||||
assert adapter._derive_category("Frost Advisory") == "weather_advisory"
|
||||
|
||||
|
||||
def test_derive_category_statement(adapter):
|
||||
"""Non-standard suffixes map to weather_statement."""
|
||||
assert adapter._derive_category("Special Weather Statement") == "weather_statement"
|
||||
assert adapter._derive_category("Short Term Forecast") == "weather_statement"
|
||||
assert adapter._derive_category("Hazardous Weather Outlook") == "weather_statement"
|
||||
|
||||
|
||||
def test_derive_category_case_insensitive(adapter):
|
||||
"""Category derivation is case-insensitive."""
|
||||
assert adapter._derive_category("TORNADO WARNING") == "weather_warning"
|
||||
assert adapter._derive_category("winter storm watch") == "weather_watch"
|
||||
assert adapter._derive_category("Wind ADVISORY") == "weather_advisory"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# to_event TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_to_event_returns_event_instance(adapter):
|
||||
"""to_event returns an Event instance."""
|
||||
raw = {
|
||||
"source": "nws",
|
||||
"event_id": "urn:oid:2.49.0.1.840.0.abc123",
|
||||
"event_type": "Tornado Warning",
|
||||
"severity": "extreme",
|
||||
"headline": "Tornado Warning issued for Ada County",
|
||||
"description": "A tornado warning has been issued...",
|
||||
"onset": time.time(),
|
||||
"expires": time.time() + 3600,
|
||||
"areas": ["IDZ016"],
|
||||
"lat": 43.615,
|
||||
"lon": -116.2023,
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert isinstance(event, Event)
|
||||
|
||||
|
||||
def test_to_event_sets_source_nws(adapter):
|
||||
"""to_event sets source to 'nws'."""
|
||||
raw = {
|
||||
"source": "nws",
|
||||
"event_id": "test-id",
|
||||
"event_type": "Wind Advisory",
|
||||
"severity": "moderate",
|
||||
"headline": "Wind Advisory",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.source == "nws"
|
||||
|
||||
|
||||
def test_to_event_derives_category_from_event_type(adapter):
|
||||
"""to_event uses _derive_category for category field."""
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Winter Storm Watch",
|
||||
"severity": "severe",
|
||||
"headline": "Winter Storm Watch",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.category == "weather_watch"
|
||||
|
||||
|
||||
def test_to_event_maps_severity(adapter):
|
||||
"""to_event maps NWS severity to 3-level system."""
|
||||
# Extreme -> immediate
|
||||
raw = {"event_id": "1", "event_type": "Tornado Warning", "severity": "extreme", "headline": "test"}
|
||||
assert adapter.to_event(raw).severity == "immediate"
|
||||
|
||||
# Severe -> priority
|
||||
raw = {"event_id": "2", "event_type": "Tornado Warning", "severity": "severe", "headline": "test"}
|
||||
assert adapter.to_event(raw).severity == "priority"
|
||||
|
||||
# Moderate -> routine
|
||||
raw = {"event_id": "3", "event_type": "Wind Advisory", "severity": "moderate", "headline": "test"}
|
||||
assert adapter.to_event(raw).severity == "routine"
|
||||
|
||||
|
||||
def test_to_event_sets_title_from_headline(adapter):
|
||||
"""to_event uses headline as title."""
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Heat Advisory",
|
||||
"severity": "moderate",
|
||||
"headline": "Heat Advisory issued for Magic Valley",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.title == "Heat Advisory issued for Magic Valley"
|
||||
|
||||
|
||||
def test_to_event_sets_body_from_description(adapter):
|
||||
"""to_event uses description as body."""
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Heat Advisory",
|
||||
"severity": "moderate",
|
||||
"headline": "Heat Advisory",
|
||||
"description": "Dangerously hot conditions expected...",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.body == "Dangerously hot conditions expected..."
|
||||
|
||||
|
||||
def test_to_event_sets_effective_and_expires(adapter):
|
||||
"""to_event sets effective and expires from onset/expires."""
|
||||
onset = time.time()
|
||||
expires = time.time() + 7200
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Wind Advisory",
|
||||
"severity": "moderate",
|
||||
"headline": "Wind Advisory",
|
||||
"onset": onset,
|
||||
"expires": expires,
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.effective == onset
|
||||
assert event.expires == expires
|
||||
|
||||
|
||||
def test_to_event_sets_lat_lon(adapter):
|
||||
"""to_event sets lat/lon from raw event."""
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Tornado Warning",
|
||||
"severity": "extreme",
|
||||
"headline": "Tornado Warning",
|
||||
"lat": 43.615,
|
||||
"lon": -116.2023,
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.lat == 43.615
|
||||
assert event.lon == -116.2023
|
||||
|
||||
|
||||
def test_to_event_sets_nws_zones(adapter):
|
||||
"""to_event sets nws_zones from areas."""
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Red Flag Warning",
|
||||
"severity": "severe",
|
||||
"headline": "Red Flag Warning",
|
||||
"areas": ["IDZ016", "IDZ030", "IDZ031"],
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.nws_zones == ["IDZ016", "IDZ030", "IDZ031"]
|
||||
|
||||
|
||||
def test_to_event_sets_group_key_from_event_id(adapter):
|
||||
"""to_event sets group_key to event_id for dedup."""
|
||||
raw = {
|
||||
"event_id": "urn:oid:2.49.0.1.840.0.abc123",
|
||||
"event_type": "Tornado Warning",
|
||||
"severity": "extreme",
|
||||
"headline": "Tornado Warning",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.group_key == "urn:oid:2.49.0.1.840.0.abc123"
|
||||
|
||||
|
||||
def test_to_event_warning_sets_inhibit_keys(adapter):
|
||||
"""Warnings set inhibit_keys for corresponding Watch/Advisory."""
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Winter Storm Warning",
|
||||
"severity": "severe",
|
||||
"headline": "Winter Storm Warning",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert "nws:Winter Storm Watch" in event.inhibit_keys
|
||||
assert "nws:Winter Storm Advisory" in event.inhibit_keys
|
||||
|
||||
|
||||
def test_to_event_watch_no_inhibit_keys(adapter):
|
||||
"""Watches do not set inhibit_keys."""
|
||||
raw = {
|
||||
"event_id": "test",
|
||||
"event_type": "Tornado Watch",
|
||||
"severity": "moderate",
|
||||
"headline": "Tornado Watch",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.inhibit_keys == []
|
||||
|
||||
|
||||
def test_to_event_preserves_raw_in_data(adapter):
|
||||
"""to_event preserves raw event dict in data field."""
|
||||
raw = {
|
||||
"event_id": "test-123",
|
||||
"event_type": "Wind Advisory",
|
||||
"severity": "moderate",
|
||||
"headline": "Wind Advisory",
|
||||
"custom_field": "custom_value",
|
||||
}
|
||||
event = adapter.to_event(raw)
|
||||
assert event.data == raw
|
||||
assert event.data["custom_field"] == "custom_value"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# INTEGRATION: _map_nws_severity
|
||||
# ============================================================
|
||||
|
||||
def test_map_nws_severity_extreme_to_immediate(adapter):
|
||||
"""Extreme NWS severity maps to immediate."""
|
||||
assert adapter._map_nws_severity("extreme") == "immediate"
|
||||
|
||||
|
||||
def test_map_nws_severity_severe_to_priority(adapter):
|
||||
"""Severe NWS severity maps to priority."""
|
||||
assert adapter._map_nws_severity("severe") == "priority"
|
||||
|
||||
|
||||
def test_map_nws_severity_moderate_to_routine(adapter):
|
||||
"""Moderate NWS severity maps to routine."""
|
||||
assert adapter._map_nws_severity("moderate") == "routine"
|
||||
|
||||
|
||||
def test_map_nws_severity_minor_to_routine(adapter):
|
||||
"""Minor NWS severity maps to routine."""
|
||||
assert adapter._map_nws_severity("minor") == "routine"
|
||||
|
|
@ -1,214 +0,0 @@
|
|||
"""Tests for channel-renderer integration (Phase 2.5b)."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from meshai.notifications.events import NotificationPayload
|
||||
from meshai.notifications.channels import (
|
||||
MeshBroadcastChannel,
|
||||
MeshDMChannel,
|
||||
EmailChannel,
|
||||
WebhookChannel,
|
||||
)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# MESH CHANNEL RENDERING TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_mesh_channel_uses_mesh_renderer():
|
||||
"""MeshBroadcastChannel renders long messages to multiple chunks."""
|
||||
mock_connector = MagicMock()
|
||||
|
||||
channel = MeshBroadcastChannel(
|
||||
connector=mock_connector,
|
||||
channel_index=0,
|
||||
)
|
||||
|
||||
# Build a long message that will require chunking
|
||||
long_message = "This is a very long alert message that exceeds the character limit. " * 5
|
||||
|
||||
payload = NotificationPayload(
|
||||
message=long_message,
|
||||
category="weather_warning",
|
||||
severity="priority",
|
||||
timestamp=time.time(),
|
||||
event_type="weather_warning",
|
||||
)
|
||||
|
||||
asyncio.run(channel.deliver(payload, None))
|
||||
|
||||
# Should have called send_message multiple times (once per chunk)
|
||||
assert mock_connector.send_message.call_count >= 2
|
||||
|
||||
# Each call's text should be <= 200 chars
|
||||
for call in mock_connector.send_message.call_args_list:
|
||||
text = call.kwargs.get("text", call.args[0] if call.args else "")
|
||||
assert len(text) <= 200
|
||||
|
||||
|
||||
def test_mesh_channel_uses_payload_message_directly_when_chunk_metadata_set():
|
||||
"""Pre-chunked payloads (from digest) skip re-rendering."""
|
||||
mock_connector = MagicMock()
|
||||
|
||||
channel = MeshBroadcastChannel(
|
||||
connector=mock_connector,
|
||||
channel_index=0,
|
||||
)
|
||||
|
||||
# Payload with chunk metadata set (from digest scheduler)
|
||||
payload = NotificationPayload(
|
||||
message="pre-chunked text",
|
||||
category="digest",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
chunk_index=1,
|
||||
chunk_total=3,
|
||||
)
|
||||
|
||||
asyncio.run(channel.deliver(payload, None))
|
||||
|
||||
# Should have called send_message exactly once
|
||||
assert mock_connector.send_message.call_count == 1
|
||||
# Should use the message directly
|
||||
call = mock_connector.send_message.call_args
|
||||
text = call.kwargs.get("text", call.args[0] if call.args else "")
|
||||
assert text == "pre-chunked text"
|
||||
|
||||
|
||||
def test_mesh_dm_channel_uses_mesh_renderer():
|
||||
"""MeshDMChannel renders long messages to chunks for each recipient."""
|
||||
mock_connector = MagicMock()
|
||||
|
||||
channel = MeshDMChannel(
|
||||
connector=mock_connector,
|
||||
node_ids=["!node1", "!node2"],
|
||||
)
|
||||
|
||||
long_message = "This is a long DM message that should be chunked. " * 4
|
||||
|
||||
payload = NotificationPayload(
|
||||
message=long_message,
|
||||
category="test",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
)
|
||||
|
||||
asyncio.run(channel.deliver(payload, None))
|
||||
|
||||
# Should have called send_message multiple times
|
||||
# (chunks * nodes)
|
||||
assert mock_connector.send_message.call_count >= 2
|
||||
|
||||
|
||||
def test_mesh_dm_channel_uses_payload_message_directly_when_chunk_metadata_set():
|
||||
"""Pre-chunked DM payloads skip re-rendering."""
|
||||
mock_connector = MagicMock()
|
||||
|
||||
channel = MeshDMChannel(
|
||||
connector=mock_connector,
|
||||
node_ids=["!node1"],
|
||||
)
|
||||
|
||||
payload = NotificationPayload(
|
||||
message="pre-chunked DM",
|
||||
category="digest",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
chunk_index=2,
|
||||
chunk_total=5,
|
||||
)
|
||||
|
||||
asyncio.run(channel.deliver(payload, None))
|
||||
|
||||
# Should use message directly, once per node
|
||||
assert mock_connector.send_message.call_count == 1
|
||||
call = mock_connector.send_message.call_args
|
||||
text = call.kwargs.get("text", call.args[0] if call.args else "")
|
||||
assert text == "pre-chunked DM"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# EMAIL CHANNEL RENDERING TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_email_channel_uses_email_renderer():
|
||||
"""EmailChannel uses renderer for subject and body."""
|
||||
channel = EmailChannel(
|
||||
smtp_host="localhost",
|
||||
smtp_port=25,
|
||||
smtp_user="",
|
||||
smtp_password="",
|
||||
smtp_tls=False,
|
||||
from_address="test@example.com",
|
||||
recipients=["user@example.com"],
|
||||
)
|
||||
|
||||
payload = NotificationPayload(
|
||||
message="Test alert message",
|
||||
category="weather_warning",
|
||||
severity="immediate",
|
||||
timestamp=time.time(),
|
||||
event_type="weather_warning",
|
||||
)
|
||||
|
||||
# Mock the _send_email method
|
||||
with patch.object(channel, "_send_email") as mock_send:
|
||||
asyncio.run(channel.deliver(payload, None))
|
||||
|
||||
# Should have been called with renderer output
|
||||
mock_send.assert_called_once()
|
||||
call_args = mock_send.call_args
|
||||
subject = call_args.args[0]
|
||||
body = call_args.args[1]
|
||||
|
||||
# Renderer format checks
|
||||
assert "[MeshAI]" in subject
|
||||
assert "IMMEDIATE" in subject
|
||||
assert "Test alert message" in body
|
||||
assert "Severity:" in body
|
||||
|
||||
|
||||
# ============================================================
|
||||
# WEBHOOK CHANNEL RENDERING TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_webhook_channel_uses_webhook_renderer():
|
||||
"""WebhookChannel uses renderer for JSON payload."""
|
||||
channel = WebhookChannel(
|
||||
url="https://example.com/webhook",
|
||||
headers={},
|
||||
)
|
||||
|
||||
payload = NotificationPayload(
|
||||
message="Test webhook message",
|
||||
category="test",
|
||||
severity="priority",
|
||||
timestamp=time.time(),
|
||||
event_type="battery_warning",
|
||||
)
|
||||
|
||||
# Mock httpx
|
||||
with patch("meshai.notifications.channels.httpx.AsyncClient") as mock_client_class:
|
||||
mock_client = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 200
|
||||
mock_client.post = AsyncMock(return_value=mock_response)
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=None)
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
asyncio.run(channel.deliver(payload, None))
|
||||
|
||||
# Check the POST was called
|
||||
mock_client.post.assert_called_once()
|
||||
call_kwargs = mock_client.post.call_args.kwargs
|
||||
|
||||
# Should have JSON payload with schema_version
|
||||
json_payload = call_kwargs.get("json", {})
|
||||
assert "schema_version" in json_payload
|
||||
assert json_payload["schema_version"] == "1.0"
|
||||
assert json_payload["message"] == "Test webhook message"
|
||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,9 +1,6 @@
|
|||
"""Tests for DigestScheduler (Phase 2.3b + 2.4).
|
||||
"""Tests for DigestScheduler (Phase 2.3b).
|
||||
|
||||
Uses asyncio.run() since pytest-asyncio is not available in the container.
|
||||
|
||||
Updated in Phase 2.4: render_digest is now async, accumulator mocks
|
||||
must return awaitables.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
|
@ -11,12 +8,12 @@ import time
|
|||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
from unittest.mock import MagicMock, AsyncMock, call
|
||||
from unittest.mock import MagicMock, call
|
||||
|
||||
import pytest
|
||||
|
||||
from meshai.notifications.events import make_event
|
||||
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
|
||||
from meshai.notifications.pipeline.digest import DigestAccumulator
|
||||
from meshai.notifications.pipeline.scheduler import DigestScheduler
|
||||
|
||||
|
||||
|
|
@ -60,15 +57,8 @@ class MockChannel:
|
|||
def __init__(self):
|
||||
self.deliveries = []
|
||||
|
||||
async def deliver(self, payload, rule=None):
|
||||
def deliver(self, payload: dict):
|
||||
self.deliveries.append(payload)
|
||||
return True
|
||||
|
||||
|
||||
class MockLLMBackend:
|
||||
"""Mock LLM backend for accumulator."""
|
||||
async def generate(self, messages, system_prompt, max_tokens=200):
|
||||
return "Mock summary."
|
||||
|
||||
|
||||
def make_scheduler(
|
||||
|
|
@ -94,14 +84,13 @@ def make_scheduler(
|
|||
|
||||
channels = {}
|
||||
|
||||
def channel_factory(rule, connector=None):
|
||||
def channel_factory(rule):
|
||||
ch = MockChannel()
|
||||
channels[rule.name] = ch
|
||||
return ch
|
||||
|
||||
if accumulator is None:
|
||||
# Use mock LLM backend for async render_digest
|
||||
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
|
||||
accumulator = DigestAccumulator()
|
||||
|
||||
scheduler = DigestScheduler(
|
||||
accumulator=accumulator,
|
||||
|
|
@ -135,31 +124,37 @@ class TestScheduleComputation:
|
|||
def test_parse_schedule_invalid_falls_back(self):
|
||||
"""Invalid schedules fall back to 07:00."""
|
||||
scheduler, _, _ = make_scheduler()
|
||||
# Bad format
|
||||
assert scheduler._parse_schedule("7:00:00") == (7, 0)
|
||||
assert scheduler._parse_schedule("invalid") == (7, 0)
|
||||
assert scheduler._parse_schedule("") == (7, 0)
|
||||
# Out of range
|
||||
assert scheduler._parse_schedule("25:00") == (7, 0)
|
||||
assert scheduler._parse_schedule("12:60") == (7, 0)
|
||||
|
||||
def test_next_fire_at_future_today(self):
|
||||
"""If schedule time is later today, returns today's timestamp."""
|
||||
# Set clock to 06:00 on a known date
|
||||
base_dt = datetime(2024, 6, 15, 6, 0, 0)
|
||||
base_ts = base_dt.timestamp()
|
||||
|
||||
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
|
||||
next_fire = scheduler._next_fire_at(base_ts)
|
||||
|
||||
# Should be 07:00 same day
|
||||
expected_dt = datetime(2024, 6, 15, 7, 0, 0)
|
||||
assert abs(next_fire - expected_dt.timestamp()) < 1
|
||||
|
||||
def test_next_fire_at_past_today_schedules_tomorrow(self):
|
||||
"""If schedule time has passed today, returns tomorrow's timestamp."""
|
||||
# Set clock to 08:00 on a known date
|
||||
base_dt = datetime(2024, 6, 15, 8, 0, 0)
|
||||
base_ts = base_dt.timestamp()
|
||||
|
||||
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
|
||||
next_fire = scheduler._next_fire_at(base_ts)
|
||||
|
||||
# Should be 07:00 next day
|
||||
expected_dt = datetime(2024, 6, 16, 7, 0, 0)
|
||||
assert abs(next_fire - expected_dt.timestamp()) < 1
|
||||
|
||||
|
|
@ -171,6 +166,7 @@ class TestScheduleComputation:
|
|||
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
|
||||
next_fire = scheduler._next_fire_at(base_ts)
|
||||
|
||||
# Should be 07:00 next day
|
||||
expected_dt = datetime(2024, 6, 16, 7, 0, 0)
|
||||
assert abs(next_fire - expected_dt.timestamp()) < 1
|
||||
|
||||
|
|
@ -185,7 +181,7 @@ class TestScheduleComputation:
|
|||
config.notifications.digest = None
|
||||
|
||||
scheduler = DigestScheduler(
|
||||
accumulator=DigestAccumulator(llm_backend=MockLLMBackend()),
|
||||
accumulator=DigestAccumulator(),
|
||||
config=config,
|
||||
channel_factory=lambda r: MockChannel(),
|
||||
)
|
||||
|
|
@ -199,7 +195,8 @@ class TestFireBehavior:
|
|||
|
||||
def test_fire_delivers_to_matching_rule(self):
|
||||
"""_fire() delivers digest to rules with schedule_match='digest'."""
|
||||
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
|
||||
accumulator = DigestAccumulator()
|
||||
# Add an event so digest has content
|
||||
accumulator.enqueue(make_event(
|
||||
source="test",
|
||||
category="weather_warning",
|
||||
|
|
@ -224,8 +221,9 @@ class TestFireBehavior:
|
|||
ch = channels["digest-mesh"]
|
||||
assert len(ch.deliveries) == 1
|
||||
payload = ch.deliveries[0]
|
||||
assert payload.category == "digest"
|
||||
assert payload.severity == "routine"
|
||||
assert payload["category"] == "digest"
|
||||
assert payload["severity"] == "routine"
|
||||
assert "Test alert" in payload["message"] or "Weather" in payload["message"]
|
||||
|
||||
def test_fire_skips_disabled_rules(self):
|
||||
"""Disabled rules are not delivered to."""
|
||||
|
|
@ -238,6 +236,7 @@ class TestFireBehavior:
|
|||
|
||||
asyncio.run(run_fire())
|
||||
|
||||
# Channel should not be created for disabled rule
|
||||
assert "disabled" not in channels
|
||||
|
||||
def test_fire_skips_non_schedule_rules(self):
|
||||
|
|
@ -266,10 +265,8 @@ class TestFireBehavior:
|
|||
|
||||
def test_fire_mesh_delivery_chunks(self):
|
||||
"""Mesh delivery types get per-chunk delivery."""
|
||||
accumulator = DigestAccumulator(
|
||||
llm_backend=MockLLMBackend(),
|
||||
mesh_char_limit=100,
|
||||
)
|
||||
accumulator = DigestAccumulator(mesh_char_limit=100)
|
||||
# Add multiple events to force chunking
|
||||
for i in range(5):
|
||||
accumulator.enqueue(make_event(
|
||||
source="test",
|
||||
|
|
@ -292,14 +289,16 @@ class TestFireBehavior:
|
|||
asyncio.run(run_fire())
|
||||
|
||||
ch = channels["mesh"]
|
||||
# Should have multiple deliveries (one per chunk)
|
||||
assert len(ch.deliveries) >= 1
|
||||
# Check chunk metadata
|
||||
for payload in ch.deliveries:
|
||||
assert payload.chunk_index is not None
|
||||
assert payload.chunk_total is not None
|
||||
assert "chunk_index" in payload
|
||||
assert "chunk_total" in payload
|
||||
|
||||
def test_fire_email_delivery_full_text(self):
|
||||
"""Email delivery type gets single full-text delivery."""
|
||||
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
|
||||
accumulator = DigestAccumulator()
|
||||
accumulator.enqueue(make_event(
|
||||
source="test",
|
||||
category="weather_warning",
|
||||
|
|
@ -321,8 +320,8 @@ class TestFireBehavior:
|
|||
ch = channels["email"]
|
||||
assert len(ch.deliveries) == 1
|
||||
payload = ch.deliveries[0]
|
||||
assert payload.chunk_index is None
|
||||
assert "--- " in payload.message
|
||||
assert "chunk_index" not in payload
|
||||
assert "--- " in payload["message"] # Full format has header
|
||||
|
||||
def test_fire_updates_last_fire_at(self):
|
||||
"""_fire() updates last_fire_at timestamp."""
|
||||
|
|
@ -351,7 +350,7 @@ class TestFireBehavior:
|
|||
|
||||
ch = channels["mesh"]
|
||||
assert len(ch.deliveries) == 1
|
||||
assert "No alerts" in ch.deliveries[0].message
|
||||
assert "No alerts" in ch.deliveries[0]["message"]
|
||||
|
||||
|
||||
# ---- Lifecycle Tests ----
|
||||
|
|
@ -403,7 +402,9 @@ class TestLifecycle:
|
|||
scheduler, _, _ = make_scheduler()
|
||||
|
||||
async def run_stop():
|
||||
# Never started
|
||||
await scheduler.stop()
|
||||
# Should not raise
|
||||
|
||||
asyncio.run(run_stop())
|
||||
|
||||
|
|
@ -413,8 +414,10 @@ class TestLifecycle:
|
|||
|
||||
async def fake_sleep(duration):
|
||||
sleep_calls.append(duration)
|
||||
# Actually sleep briefly so we can cancel
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
# Set clock far from schedule time to get long sleep
|
||||
base_dt = datetime(2024, 6, 15, 8, 0, 0)
|
||||
scheduler, _, _ = make_scheduler(
|
||||
schedule="07:00",
|
||||
|
|
@ -424,11 +427,14 @@ class TestLifecycle:
|
|||
|
||||
async def run_test():
|
||||
await scheduler.start()
|
||||
# Give task time to enter sleep
|
||||
await asyncio.sleep(0.05)
|
||||
await scheduler.stop()
|
||||
|
||||
asyncio.run(run_test())
|
||||
|
||||
# Task should have exited cleanly
|
||||
|
||||
|
||||
# ---- Integration Tests ----
|
||||
|
||||
|
|
@ -438,8 +444,9 @@ class TestIntegration:
|
|||
def test_scheduler_fires_on_schedule(self):
|
||||
"""Scheduler fires when schedule time arrives."""
|
||||
fire_times = []
|
||||
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
|
||||
accumulator = DigestAccumulator()
|
||||
|
||||
# Start at 06:59:59.95 (50ms before 07:00), delay will be ~50ms
|
||||
clock_time = [datetime(2024, 6, 15, 6, 59, 59, 950000).timestamp()]
|
||||
|
||||
def fake_clock():
|
||||
|
|
@ -451,27 +458,31 @@ class TestIntegration:
|
|||
accumulator=accumulator,
|
||||
)
|
||||
|
||||
# Track when fire happens
|
||||
original_fire = scheduler._fire
|
||||
|
||||
async def tracking_fire(now):
|
||||
fire_times.append(now)
|
||||
await original_fire(now)
|
||||
# After first fire, advance clock so next cycle has long delay
|
||||
clock_time[0] = datetime(2024, 6, 15, 8, 0, 0).timestamp()
|
||||
|
||||
scheduler._fire = tracking_fire
|
||||
|
||||
async def run_test():
|
||||
await scheduler.start()
|
||||
# Wait for the ~50ms delay plus some buffer
|
||||
await asyncio.sleep(0.2)
|
||||
await scheduler.stop()
|
||||
|
||||
asyncio.run(run_test())
|
||||
|
||||
# Should have fired once
|
||||
assert len(fire_times) >= 1
|
||||
|
||||
def test_scheduler_multiple_rules(self):
|
||||
"""Scheduler delivers to multiple matching rules."""
|
||||
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
|
||||
accumulator = DigestAccumulator()
|
||||
accumulator.enqueue(make_event(
|
||||
source="test",
|
||||
category="weather_warning",
|
||||
|
|
@ -496,6 +507,7 @@ class TestIntegration:
|
|||
|
||||
asyncio.run(run_fire())
|
||||
|
||||
# All three should have received deliveries
|
||||
assert "mesh1" in channels
|
||||
assert "mesh2" in channels
|
||||
assert "email" in channels
|
||||
|
|
@ -505,7 +517,7 @@ class TestIntegration:
|
|||
|
||||
def test_scheduler_handles_delivery_error(self):
|
||||
"""Scheduler continues after delivery error."""
|
||||
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
|
||||
accumulator = DigestAccumulator()
|
||||
accumulator.enqueue(make_event(
|
||||
source="test",
|
||||
category="weather_warning",
|
||||
|
|
@ -521,11 +533,11 @@ class TestIntegration:
|
|||
|
||||
call_order = []
|
||||
|
||||
def bad_channel_factory(rule, connector=None):
|
||||
def bad_channel_factory(rule):
|
||||
call_order.append(rule.name)
|
||||
if rule.name == "bad":
|
||||
ch = MagicMock()
|
||||
ch.deliver = AsyncMock(side_effect=RuntimeError("delivery failed"))
|
||||
ch.deliver.side_effect = RuntimeError("delivery failed")
|
||||
return ch
|
||||
return MockChannel()
|
||||
|
||||
|
|
@ -542,6 +554,7 @@ class TestIntegration:
|
|||
|
||||
asyncio.run(run_fire())
|
||||
|
||||
# Both rules should have been attempted
|
||||
assert "bad" in call_order
|
||||
assert "good" in call_order
|
||||
|
||||
|
|
|
|||
|
|
@ -2,16 +2,10 @@
|
|||
|
||||
These tests verify the core routing and dispatch behavior of the
|
||||
notification pipeline without requiring real channel backends.
|
||||
|
||||
Updated in Phase 2.4: Events now go to BOTH dispatcher and accumulator
|
||||
(no severity-based fork). SeverityRouter class kept for backward
|
||||
compatibility but not used in production wiring.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, AsyncMock, patch
|
||||
from unittest.mock import Mock, patch
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from meshai.notifications.events import Event, make_event
|
||||
|
|
@ -45,7 +39,6 @@ class ConfigStub:
|
|||
class TestImmediateDispatch:
|
||||
|
||||
def test_immediate_event_with_matching_rule_dispatches(self):
|
||||
"""Immediate events reach the dispatcher and get delivered."""
|
||||
rule = NotificationRuleConfigStub(
|
||||
enabled=True,
|
||||
trigger_type="condition",
|
||||
|
|
@ -57,10 +50,15 @@ class TestImmediateDispatch:
|
|||
notifications=NotificationsConfigStub(rules=[rule])
|
||||
)
|
||||
mock_channel = Mock()
|
||||
mock_channel.deliver = AsyncMock(return_value=True)
|
||||
mock_factory = Mock(return_value=mock_channel)
|
||||
bus = EventBus()
|
||||
dispatcher = Dispatcher(config, mock_factory)
|
||||
digest = StubDigestQueue()
|
||||
router = SeverityRouter(
|
||||
immediate_handler=dispatcher.dispatch,
|
||||
digest_handler=digest.enqueue,
|
||||
)
|
||||
bus.subscribe(router.handle)
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="test_cat",
|
||||
|
|
@ -68,99 +66,80 @@ class TestImmediateDispatch:
|
|||
title="Test Alert",
|
||||
summary="Test summary message",
|
||||
)
|
||||
# Run dispatch in async context since it's now async
|
||||
asyncio.run(dispatcher.dispatch(event))
|
||||
bus.emit(event)
|
||||
assert mock_channel.deliver.call_count == 1
|
||||
alert = mock_channel.deliver.call_args[0][0]
|
||||
assert alert.category == "test_cat"
|
||||
assert alert.severity == "immediate"
|
||||
assert alert.message
|
||||
assert alert["category"] == "test_cat"
|
||||
assert alert["severity"] == "immediate"
|
||||
assert alert["message"]
|
||||
|
||||
|
||||
class TestTeeRouting:
|
||||
"""Phase 2.4: Events go to BOTH dispatcher and accumulator."""
|
||||
class TestDigestRouting:
|
||||
|
||||
def test_routine_event_goes_to_both_dispatcher_and_accumulator(self):
|
||||
"""Routine events reach both dispatcher and accumulator in Phase 2.4."""
|
||||
def test_routine_event_goes_to_digest_not_dispatcher(self):
|
||||
rule = NotificationRuleConfigStub(
|
||||
enabled=True,
|
||||
trigger_type="condition",
|
||||
categories=["test_cat"],
|
||||
min_severity="routine",
|
||||
delivery_type="mesh_broadcast",
|
||||
)
|
||||
config = ConfigStub(
|
||||
notifications=NotificationsConfigStub(rules=[rule])
|
||||
)
|
||||
mock_channel = Mock()
|
||||
mock_channel.deliver = AsyncMock(return_value=True)
|
||||
mock_factory = Mock(return_value=mock_channel)
|
||||
|
||||
# Create dispatcher
|
||||
mock_factory = Mock()
|
||||
bus = EventBus()
|
||||
dispatcher = Dispatcher(config, mock_factory)
|
||||
|
||||
# Create accumulator mock
|
||||
accumulator_calls = []
|
||||
def mock_enqueue(event):
|
||||
accumulator_calls.append(event)
|
||||
|
||||
digest = StubDigestQueue()
|
||||
with patch.object(dispatcher, "dispatch", wraps=dispatcher.dispatch) as mock_dispatch:
|
||||
router = SeverityRouter(
|
||||
immediate_handler=mock_dispatch,
|
||||
digest_handler=digest.enqueue,
|
||||
)
|
||||
bus.subscribe(router.handle)
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="test_cat",
|
||||
severity="routine",
|
||||
title="Routine Alert",
|
||||
)
|
||||
bus.emit(event)
|
||||
assert len(digest) == 1
|
||||
mock_dispatch.assert_not_called()
|
||||
|
||||
# Run dispatch in async context
|
||||
asyncio.run(dispatcher.dispatch(event))
|
||||
mock_enqueue(event)
|
||||
|
||||
# Both paths received the event
|
||||
assert len(accumulator_calls) == 1
|
||||
# Dispatcher found a matching rule and delivered
|
||||
assert mock_channel.deliver.call_count == 1
|
||||
|
||||
def test_priority_event_goes_to_both_dispatcher_and_accumulator(self):
|
||||
"""Priority events reach both dispatcher and accumulator in Phase 2.4."""
|
||||
def test_priority_event_goes_to_digest_not_dispatcher(self):
|
||||
rule = NotificationRuleConfigStub(
|
||||
enabled=True,
|
||||
trigger_type="condition",
|
||||
categories=["test_cat"],
|
||||
min_severity="routine",
|
||||
delivery_type="mesh_broadcast",
|
||||
)
|
||||
config = ConfigStub(
|
||||
notifications=NotificationsConfigStub(rules=[rule])
|
||||
)
|
||||
mock_channel = Mock()
|
||||
mock_channel.deliver = AsyncMock(return_value=True)
|
||||
mock_factory = Mock(return_value=mock_channel)
|
||||
|
||||
mock_factory = Mock()
|
||||
bus = EventBus()
|
||||
dispatcher = Dispatcher(config, mock_factory)
|
||||
|
||||
accumulator_calls = []
|
||||
def mock_enqueue(event):
|
||||
accumulator_calls.append(event)
|
||||
|
||||
digest = StubDigestQueue()
|
||||
with patch.object(dispatcher, "dispatch", wraps=dispatcher.dispatch) as mock_dispatch:
|
||||
router = SeverityRouter(
|
||||
immediate_handler=mock_dispatch,
|
||||
digest_handler=digest.enqueue,
|
||||
)
|
||||
bus.subscribe(router.handle)
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="test_cat",
|
||||
severity="priority",
|
||||
title="Priority Alert",
|
||||
)
|
||||
|
||||
# Run dispatch in async context
|
||||
asyncio.run(dispatcher.dispatch(event))
|
||||
mock_enqueue(event)
|
||||
|
||||
assert len(accumulator_calls) == 1
|
||||
assert mock_channel.deliver.call_count == 1
|
||||
bus.emit(event)
|
||||
assert len(digest) == 1
|
||||
mock_dispatch.assert_not_called()
|
||||
|
||||
|
||||
class TestNoMatchingRule:
|
||||
|
||||
def test_immediate_event_with_no_matching_rule_skips_silently(self):
|
||||
"""Events with no matching rules don't crash."""
|
||||
config = ConfigStub(
|
||||
notifications=NotificationsConfigStub(rules=[])
|
||||
)
|
||||
|
|
@ -186,7 +165,6 @@ class TestNoMatchingRule:
|
|||
class TestSubscriberIsolation:
|
||||
|
||||
def test_subscriber_exception_isolation(self):
|
||||
"""Exceptions in one subscriber don't affect others."""
|
||||
bus = EventBus()
|
||||
|
||||
def failing_handler(event):
|
||||
|
|
@ -208,7 +186,6 @@ class TestSubscriberIsolation:
|
|||
class TestUnknownSeverity:
|
||||
|
||||
def test_unknown_severity_dropped_without_crash(self):
|
||||
"""Events with unknown severity are dropped gracefully."""
|
||||
config = ConfigStub(
|
||||
notifications=NotificationsConfigStub(rules=[])
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,132 +0,0 @@
|
|||
"""Tests for ToggleFilter (Phase 2.4)."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock
|
||||
|
||||
from meshai.notifications.events import make_event
|
||||
from meshai.notifications.pipeline.toggle_filter import ToggleFilter
|
||||
from meshai.notifications.pipeline import build_pipeline_components
|
||||
from meshai.config import Config
|
||||
|
||||
|
||||
class TestToggleFilter:
|
||||
"""Unit tests for ToggleFilter."""
|
||||
|
||||
def test_toggle_filter_passes_through_when_enabled_is_none(self):
|
||||
"""Filter with enabled_toggles=None passes all events."""
|
||||
received = []
|
||||
filter_ = ToggleFilter(
|
||||
next_handler=received.append,
|
||||
enabled_toggles=None,
|
||||
)
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="weather_warning",
|
||||
severity="priority",
|
||||
title="Test",
|
||||
)
|
||||
filter_.handle(event)
|
||||
assert len(received) == 1
|
||||
assert received[0] is event
|
||||
|
||||
def test_toggle_filter_drops_event_when_toggle_not_enabled(self):
|
||||
"""Filter drops events whose toggle isn't in enabled set."""
|
||||
received = []
|
||||
filter_ = ToggleFilter(
|
||||
next_handler=received.append,
|
||||
enabled_toggles={"weather"},
|
||||
)
|
||||
# wildfire_proximity maps to "fire" toggle
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="wildfire_proximity",
|
||||
severity="priority",
|
||||
title="Fire",
|
||||
)
|
||||
filter_.handle(event)
|
||||
assert len(received) == 0
|
||||
|
||||
def test_toggle_filter_passes_event_when_toggle_enabled(self):
|
||||
"""Filter passes events whose toggle is in enabled set."""
|
||||
received = []
|
||||
filter_ = ToggleFilter(
|
||||
next_handler=received.append,
|
||||
enabled_toggles={"weather"},
|
||||
)
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="weather_warning",
|
||||
severity="priority",
|
||||
title="Weather",
|
||||
)
|
||||
filter_.handle(event)
|
||||
assert len(received) == 1
|
||||
|
||||
def test_toggle_filter_drops_unknown_category_when_filter_active(self):
|
||||
"""Unknown category maps to 'other', dropped if 'other' not enabled."""
|
||||
received = []
|
||||
filter_ = ToggleFilter(
|
||||
next_handler=received.append,
|
||||
enabled_toggles={"weather"},
|
||||
)
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="bogus_category",
|
||||
severity="priority",
|
||||
title="Unknown",
|
||||
)
|
||||
filter_.handle(event)
|
||||
# "bogus_category" has no toggle mapping, falls back to "other"
|
||||
# "other" is not in enabled set
|
||||
assert len(received) == 0
|
||||
|
||||
def test_toggle_filter_passes_other_when_enabled(self):
|
||||
"""'other' toggle passes unknown categories when enabled."""
|
||||
received = []
|
||||
filter_ = ToggleFilter(
|
||||
next_handler=received.append,
|
||||
enabled_toggles={"other"},
|
||||
)
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="bogus_category",
|
||||
severity="priority",
|
||||
title="Unknown",
|
||||
)
|
||||
filter_.handle(event)
|
||||
assert len(received) == 1
|
||||
|
||||
|
||||
class TestToggleFilterPipelineWiring:
|
||||
"""Integration tests for toggle filter in pipeline."""
|
||||
|
||||
def test_toggle_filter_pipeline_drops_disabled_toggle(self):
|
||||
"""Events for disabled toggles don't reach dispatcher or accumulator."""
|
||||
config = Config()
|
||||
|
||||
# Pass mock LLM backend
|
||||
mock_backend = MagicMock()
|
||||
mock_backend.generate = AsyncMock(return_value="stub summary")
|
||||
|
||||
# Note: without toggles.enabled set, filter is a no-op
|
||||
# This test verifies the wiring is correct
|
||||
bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, mock_backend)
|
||||
|
||||
# Verify toggle_filter is in the chain
|
||||
assert toggle_filter is not None
|
||||
assert hasattr(toggle_filter, 'handle')
|
||||
|
||||
def test_build_pipeline_uses_provided_backend(self):
|
||||
"""build_pipeline_components uses the provided llm_backend."""
|
||||
config = Config()
|
||||
|
||||
# Sentinel backend with unique attribute
|
||||
sentinel = MagicMock()
|
||||
sentinel.unique_marker = "I_AM_THE_SENTINEL"
|
||||
sentinel.generate = AsyncMock(return_value="sentinel summary")
|
||||
|
||||
bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, sentinel)
|
||||
|
||||
# Accumulator should have the exact sentinel instance
|
||||
assert accumulator._llm is sentinel
|
||||
assert accumulator._llm.unique_marker == "I_AM_THE_SENTINEL"
|
||||
|
|
@ -1,317 +0,0 @@
|
|||
"""Tests for Phase 2.5b per-channel-type renderers."""
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from meshai.notifications.events import NotificationPayload, make_event, make_payload_from_event
|
||||
from meshai.notifications.renderers import MeshRenderer, EmailRenderer, WebhookRenderer
|
||||
|
||||
|
||||
# ============================================================
|
||||
# MESH RENDERER TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_mesh_render_short_message_single_chunk():
|
||||
"""Short message produces a single chunk."""
|
||||
payload = NotificationPayload(
|
||||
message="Test alert",
|
||||
category="test",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
)
|
||||
renderer = MeshRenderer()
|
||||
chunks = renderer.render(payload)
|
||||
assert len(chunks) == 1
|
||||
assert "Test alert" in chunks[0]
|
||||
|
||||
|
||||
def test_mesh_render_event_type_prefix():
|
||||
"""Known event type adds toggle label prefix."""
|
||||
payload = NotificationPayload(
|
||||
message="Severe storm",
|
||||
category="weather_warning",
|
||||
severity="priority",
|
||||
timestamp=time.time(),
|
||||
event_type="weather_warning",
|
||||
)
|
||||
renderer = MeshRenderer()
|
||||
chunks = renderer.render(payload)
|
||||
assert len(chunks) == 1
|
||||
assert chunks[0].startswith("[Weather]")
|
||||
|
||||
|
||||
def test_mesh_render_unknown_event_type_no_prefix():
|
||||
"""Unknown event type does not add a prefix."""
|
||||
payload = NotificationPayload(
|
||||
message="Hello",
|
||||
category="made_up_thing",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
event_type="made_up_thing",
|
||||
)
|
||||
renderer = MeshRenderer()
|
||||
chunks = renderer.render(payload)
|
||||
assert len(chunks) == 1
|
||||
assert not chunks[0].startswith("[")
|
||||
|
||||
|
||||
def test_mesh_render_long_message_chunks():
|
||||
"""Long message splits into multiple chunks with counters."""
|
||||
# Build a ~500 char message
|
||||
long_message = "This is a very long alert message that should definitely exceed the two hundred character limit. " * 5
|
||||
payload = NotificationPayload(
|
||||
message=long_message,
|
||||
category="weather_warning",
|
||||
severity="priority",
|
||||
timestamp=time.time(),
|
||||
event_type="weather_warning",
|
||||
)
|
||||
renderer = MeshRenderer()
|
||||
chunks = renderer.render(payload)
|
||||
|
||||
assert len(chunks) >= 2
|
||||
for chunk in chunks:
|
||||
assert len(chunk) <= 200, f"Chunk exceeds limit: {len(chunk)} chars"
|
||||
# Check each chunk ends with "(k/N)" counter
|
||||
for chunk in chunks:
|
||||
assert re.search(r"\(\d+/\d+\)$", chunk), f"Missing counter: {chunk}"
|
||||
|
||||
|
||||
def test_mesh_render_chunks_preserve_full_content():
|
||||
"""Chunking preserves all words from the original message."""
|
||||
# Build a message with distinct tokens
|
||||
words = ["alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf",
|
||||
"hotel", "india", "juliet", "kilo", "lima", "mike", "november",
|
||||
"oscar", "papa", "quebec", "romeo", "sierra", "tango", "uniform",
|
||||
"victor", "whiskey", "xray", "yankee", "zulu"]
|
||||
long_message = " ".join(words * 3) # Repeat to span multiple chunks
|
||||
|
||||
payload = NotificationPayload(
|
||||
message=long_message,
|
||||
category="test",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
)
|
||||
renderer = MeshRenderer()
|
||||
chunks = renderer.render(payload)
|
||||
|
||||
# Join all chunks and remove counters
|
||||
combined = " ".join(chunks)
|
||||
combined = re.sub(r"\s*\(\d+/\d+\)", "", combined)
|
||||
|
||||
# Every original word should appear
|
||||
for word in words:
|
||||
assert word in combined, f"Missing word: {word}"
|
||||
|
||||
|
||||
def test_mesh_render_no_event_type():
|
||||
"""Payload without event_type has no prefix."""
|
||||
payload = NotificationPayload(
|
||||
message="Plain message",
|
||||
category="test",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
event_type=None,
|
||||
)
|
||||
renderer = MeshRenderer()
|
||||
chunks = renderer.render(payload)
|
||||
assert len(chunks) == 1
|
||||
assert chunks[0] == "Plain message"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# EMAIL RENDERER TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_email_render_subject_includes_severity_and_type():
|
||||
"""Subject line includes severity and event type."""
|
||||
payload = NotificationPayload(
|
||||
message="Storm approaching",
|
||||
category="weather_warning",
|
||||
severity="immediate",
|
||||
timestamp=time.time(),
|
||||
event_type="weather_warning",
|
||||
)
|
||||
renderer = EmailRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
|
||||
assert "IMMEDIATE" in rendered["subject"]
|
||||
assert "Weather Warning" in rendered["subject"]
|
||||
|
||||
|
||||
def test_email_render_body_includes_message_and_context():
|
||||
"""Body includes message and structured context fields."""
|
||||
fixed_time = 1700000000.0 # Known timestamp
|
||||
payload = NotificationPayload(
|
||||
message="Test alert message",
|
||||
category="battery_warning",
|
||||
severity="priority",
|
||||
timestamp=fixed_time,
|
||||
event_type="battery_warning",
|
||||
region="Magic Valley",
|
||||
node_name="BLD-MTN",
|
||||
)
|
||||
renderer = EmailRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
body = rendered["body"]
|
||||
|
||||
assert "Test alert message" in body
|
||||
assert "Magic Valley" in body
|
||||
assert "BLD-MTN" in body
|
||||
assert "priority" in body
|
||||
assert "battery_warning" in body
|
||||
# Check formatted date
|
||||
assert "2023-11-14" in body # Date part of timestamp
|
||||
|
||||
|
||||
def test_email_render_omits_missing_context():
|
||||
"""Body omits lines for missing optional fields."""
|
||||
payload = NotificationPayload(
|
||||
message="Minimal alert",
|
||||
category="test",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
# No region, no node, no event_type
|
||||
)
|
||||
renderer = EmailRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
body = rendered["body"]
|
||||
|
||||
assert "Minimal alert" in body
|
||||
assert "Severity: routine" in body
|
||||
assert "Region:" not in body
|
||||
assert "Node:" not in body
|
||||
assert "Category:" not in body
|
||||
|
||||
|
||||
def test_email_render_includes_source_event():
|
||||
"""Body includes source event details when present."""
|
||||
event = make_event(
|
||||
source="weather_adapter",
|
||||
category="weather_warning",
|
||||
severity="priority",
|
||||
title="Severe Storm",
|
||||
summary="Severe storm expected",
|
||||
)
|
||||
payload = make_payload_from_event(event)
|
||||
|
||||
renderer = EmailRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
body = rendered["body"]
|
||||
|
||||
assert "weather_adapter" in body
|
||||
assert "Severe Storm" in body
|
||||
|
||||
|
||||
# ============================================================
|
||||
# WEBHOOK RENDERER TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_webhook_render_has_schema_version():
|
||||
"""Output includes schema_version field."""
|
||||
payload = NotificationPayload(
|
||||
message="Test",
|
||||
category="test",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
)
|
||||
renderer = WebhookRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
|
||||
assert rendered["schema_version"] == "1.0"
|
||||
|
||||
|
||||
def test_webhook_render_omits_none_fields():
|
||||
"""None optional fields are omitted, not set to null."""
|
||||
payload = NotificationPayload(
|
||||
message="Test message",
|
||||
category="test",
|
||||
severity="routine",
|
||||
timestamp=time.time(),
|
||||
# All optional fields default to None
|
||||
)
|
||||
renderer = WebhookRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
|
||||
# Required fields present
|
||||
assert "message" in rendered
|
||||
assert "severity" in rendered
|
||||
assert "timestamp" in rendered
|
||||
assert "schema_version" in rendered
|
||||
|
||||
# Optional fields omitted
|
||||
assert "node_id" not in rendered
|
||||
assert "region" not in rendered
|
||||
assert "chunk_index" not in rendered
|
||||
|
||||
|
||||
def test_webhook_render_includes_source_event_when_present():
|
||||
"""source_event dict included when payload has source event."""
|
||||
event = make_event(
|
||||
source="test_adapter",
|
||||
category="test",
|
||||
severity="routine",
|
||||
title="Test Event",
|
||||
)
|
||||
payload = make_payload_from_event(event)
|
||||
|
||||
renderer = WebhookRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
|
||||
assert "source_event" in rendered
|
||||
assert rendered["source_event"]["id"] == event.id
|
||||
assert rendered["source_event"]["source"] == "test_adapter"
|
||||
|
||||
|
||||
def test_webhook_render_is_json_serializable():
|
||||
"""Rendered output is JSON-serializable (no datetime objects)."""
|
||||
event = make_event(
|
||||
source="test",
|
||||
category="test",
|
||||
severity="routine",
|
||||
title="Test",
|
||||
)
|
||||
payload = make_payload_from_event(event)
|
||||
payload.chunk_index = 1
|
||||
payload.chunk_total = 3
|
||||
payload.region = "Test Region"
|
||||
payload.node_name = "Test-Node"
|
||||
|
||||
renderer = WebhookRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
|
||||
# Should not raise
|
||||
json_str = json.dumps(rendered)
|
||||
assert isinstance(json_str, str)
|
||||
# Verify round-trip
|
||||
parsed = json.loads(json_str)
|
||||
assert parsed["message"] == payload.message
|
||||
|
||||
|
||||
def test_webhook_render_includes_optional_fields_when_set():
|
||||
"""Optional fields included when they have values."""
|
||||
payload = NotificationPayload(
|
||||
message="Test",
|
||||
category="test_category",
|
||||
severity="priority",
|
||||
timestamp=time.time(),
|
||||
event_type="battery_warning",
|
||||
node_id="!abc123",
|
||||
node_name="Test-Node",
|
||||
region="Test Region",
|
||||
chunk_index=2,
|
||||
chunk_total=5,
|
||||
)
|
||||
renderer = WebhookRenderer()
|
||||
rendered = renderer.render(payload)
|
||||
|
||||
assert rendered["category"] == "test_category"
|
||||
assert rendered["event_type"] == "battery_warning"
|
||||
assert rendered["node_id"] == "!abc123"
|
||||
assert rendered["node_name"] == "Test-Node"
|
||||
assert rendered["region"] == "Test Region"
|
||||
assert rendered["chunk_index"] == 2
|
||||
assert rendered["chunk_total"] == 5
|
||||
Loading…
Add table
Add a link
Reference in a new issue