diff --git a/meshai/config.py b/meshai/config.py index 0f2d09f..193d652 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -484,14 +484,6 @@ class NotificationRuleConfig: channel_ids: list = field(default_factory=list) - -@dataclass -class TogglesConfig: - """Master toggle filter settings.""" - - enabled: list[str] = field(default_factory=list) # Toggle names that are enabled (empty = all) - - @dataclass class DigestConfig: """Digest scheduler settings.""" @@ -508,7 +500,6 @@ class NotificationsConfig: quiet_hours_enabled: bool = True # Master toggle for quiet hours quiet_hours_start: str = "22:00" quiet_hours_end: str = "06:00" - toggles: TogglesConfig = field(default_factory=TogglesConfig) digest: DigestConfig = field(default_factory=DigestConfig) rules: list = field(default_factory=list) # List of NotificationRuleConfig @@ -681,8 +672,6 @@ def _dict_to_dataclass(cls, data: dict): kwargs[key] = _dict_to_dataclass(FIRMSConfig, value) elif key == "dashboard" and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(DashboardConfig, value) - elif key == "toggles" and isinstance(value, dict): - kwargs[key] = _dict_to_dataclass(TogglesConfig, value) elif key == "digest" and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(DigestConfig, value) elif key == "notifications" and isinstance(value, dict): diff --git a/meshai/env/firms.py b/meshai/env/firms.py index 94be0d3..9c2e50a 100644 --- a/meshai/env/firms.py +++ b/meshai/env/firms.py @@ -3,12 +3,10 @@ import json import logging import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen -from meshai.notifications.events import Event, make_event - if TYPE_CHECKING: from ..config import FIRMSConfig @@ -344,62 +342,6 @@ class FIRMSAdapter: return (None, None) - def to_event(self, evt: dict) -> Optional["Event"]: - """Translate a stored FIRMS event dict into a pipeline Event. - - Args: - evt: Internal event dict from get_events() - - Returns: - Event instance ready for EventBus emission, or None if - the dict is missing required fields (lat/lon). - """ - try: - lat = evt.get("lat") - lon = evt.get("lon") - if lat is None or lon is None: - return None # Can't make a useful Event without coords - - props = evt.get("properties", {}) or {} - is_new_ignition = bool(props.get("new_ignition", False)) - category = "new_ignition" if is_new_ignition else "wildfire_proximity" - - severity = evt.get("severity", "routine") - - title = evt.get("headline", "") or "Fire Hotspot" - - # Build a richer summary including FRP, confidence, distance - summary_parts = [title] - if props.get("frp") is not None: - summary_parts.append(f"FRP {int(props['frp'])} MW") - if props.get("confidence"): - summary_parts.append(f"conf {props['confidence']}") - if props.get("distance_km") is not None and props.get("nearest_anchor"): - summary_parts.append( - f"{int(props['distance_km'])} km from {props['nearest_anchor']}" - ) - summary = " | ".join(summary_parts)[:300] - - spatial_key = f"firms:{round(lat, 2):.2f}:{round(lon, 2):.2f}" - - return make_event( - source="firms", - category=category, - severity=severity, - title=title, - summary=summary, - timestamp=evt.get("fetched_at"), - expires=evt.get("expires"), - region=props.get("nearest_anchor"), - lat=lat, - lon=lon, - group_key=spatial_key, - inhibit_keys=[spatial_key], - ) - except Exception: - logger.exception(f"FIRMS to_event failed for evt: {evt.get('event_id')}") - return None - def get_events(self) -> list: """Get current hotspot events.""" return self._events diff --git a/meshai/env/nws.py b/meshai/env/nws.py index 041dc47..af37a84 100644 --- a/meshai/env/nws.py +++ b/meshai/env/nws.py @@ -4,12 +4,10 @@ import json import logging import time from datetime import datetime -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen -from meshai.notifications.events import Event, make_event - if TYPE_CHECKING: from ..config import NWSConfig @@ -41,71 +39,6 @@ class NWSAlertsAdapter: else: # moderate, minor, unknown return "routine" - def _derive_category(self, event_type: str) -> str: - """Derive notification category from NWS event type suffix. - - NWS event types like "Red Flag Warning", "Winter Storm Watch", - "Wind Advisory" map to our fine-grained weather categories. - - Args: - event_type: NWS event type string (e.g., "Tornado Warning") - - Returns: - Category key: weather_warning, weather_watch, weather_advisory, - or weather_statement - """ - event_type_lower = event_type.lower() - if event_type_lower.endswith("warning"): - return "weather_warning" - elif event_type_lower.endswith("watch"): - return "weather_watch" - elif event_type_lower.endswith("advisory"): - return "weather_advisory" - else: - # Covers "Special Weather Statement", "Short Term Forecast", etc. - return "weather_statement" - - def to_event(self, raw: dict) -> Event: - """Convert internal event dict to pipeline Event. - - Args: - raw: Internal event dict from get_events() - - Returns: - Event instance ready for EventBus emission - """ - event_type = raw.get("event_type", "Unknown") - category = self._derive_category(event_type) - nws_severity = raw.get("severity", "unknown") - severity = self._map_nws_severity(nws_severity) - - # Build group_key for dedup: same alert ID should merge - group_key = raw.get("event_id", "") - - # Build inhibit_keys: a Warning supersedes Watch/Advisory for same hazard - inhibit_keys = [] - if category == "weather_warning": - # Warning inhibits corresponding Watch/Advisory - base = event_type.rsplit(" ", 1)[0] if " " in event_type else event_type - inhibit_keys = [f"nws:{base} Watch", f"nws:{base} Advisory"] - - return make_event( - source="nws", - category=category, - severity=severity, - title=raw.get("headline", event_type), - summary=raw.get("headline", ""), - body=raw.get("description", ""), - effective=raw.get("onset") or None, - expires=raw.get("expires") or None, - lat=raw.get("lat"), - lon=raw.get("lon"), - nws_zones=raw.get("areas", []), - group_key=group_key, - inhibit_keys=inhibit_keys, - data=raw, - ) - def tick(self) -> bool: """Execute one polling tick. diff --git a/meshai/env/store.py b/meshai/env/store.py index a6ea2fd..cb05569 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -2,11 +2,10 @@ import logging import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING if TYPE_CHECKING: from ..config import EnvironmentalConfig - from ..notifications.pipeline import EventBus logger = logging.getLogger(__name__) @@ -14,15 +13,9 @@ logger = logging.getLogger(__name__) class EnvironmentalStore: """Cache and tick-driver for all environmental feed adapters.""" - def __init__( - self, - config: "EnvironmentalConfig", - region_anchors: list = None, - event_bus: Optional["EventBus"] = None, - ): + def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None): self._adapters = {} # name -> adapter instance self._events = {} # (source, event_id) -> event dict - self._event_bus = event_bus # Pipeline EventBus for emission self._swpc_status = {} # Kp/SFI/scales snapshot self._ducting_status = {} # tropo ducting assessment self._mesh_zones = config.nws_zones or [] @@ -94,29 +87,12 @@ class EnvironmentalStore: self._swpc_status = adapter.get_status() # Also ingest any alert events (R-scale >= 3) for evt in adapter.get_events(): - key = (evt["source"], evt["event_id"]) - is_new = key not in self._events - self._events[key] = evt - if is_new and self._event_bus and hasattr(adapter, "to_event"): - self._emit_event(adapter, evt) + self._events[(evt["source"], evt["event_id"])] = evt elif name == "ducting": self._ducting_status = adapter.get_status() else: for evt in adapter.get_events(): - key = (evt["source"], evt["event_id"]) - is_new = key not in self._events - self._events[key] = evt - if is_new and self._event_bus and hasattr(adapter, "to_event"): - self._emit_event(adapter, evt) - - def _emit_event(self, adapter, raw_evt: dict): - """Convert raw event to pipeline Event and emit to bus.""" - try: - event = adapter.to_event(raw_evt) - self._event_bus.emit(event) - logger.debug("Emitted %s event %s to pipeline", event.source, event.id) - except Exception as e: - logger.warning("Failed to emit event to pipeline: %s", e) + self._events[(evt["source"], evt["event_id"])] = evt def _purge_expired(self): """Remove expired events.""" diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index 156a5e5..7275c38 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -174,33 +174,12 @@ ALERT_CATEGORIES = { # Environmental - Weather "weather_warning": { - "name": "Severe Weather Warning", - "description": "NWS Warning affecting your mesh area — highest urgency weather alert", + "name": "Severe Weather", + "description": "NWS warning or advisory affecting your mesh area", "default_severity": "priority", "example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z", "toggle": "weather", }, - "weather_watch": { - "name": "Weather Watch", - "description": "NWS Watch affecting your mesh area — conditions favorable for hazardous weather", - "default_severity": "routine", - "example_message": "⏳ Winter Storm Watch — Wood River Valley. Heavy snow possible Thu night through Fri.", - "toggle": "weather", - }, - "weather_advisory": { - "name": "Weather Advisory", - "description": "NWS Advisory affecting your mesh area — weather may cause inconvenience", - "default_severity": "routine", - "example_message": "ℹ Wind Advisory — Magic Valley. SW winds 25-35 mph with gusts to 50 mph.", - "toggle": "weather", - }, - "weather_statement": { - "name": "Weather Statement", - "description": "NWS Special Weather Statement — general awareness, no specific hazard", - "default_severity": "routine", - "example_message": "📋 Special Weather Statement — Isolated thunderstorms possible this afternoon.", - "toggle": "weather", - }, # Environmental - Space Weather "hf_blackout": { diff --git a/meshai/notifications/channels.py b/meshai/notifications/channels.py index 6ac39ff..8c6f917 100644 --- a/meshai/notifications/channels.py +++ b/meshai/notifications/channels.py @@ -14,10 +14,6 @@ import httpx if TYPE_CHECKING: from ..connector import MeshConnector - from ..config import NotificationRuleConfig - from .events import NotificationPayload - -from meshai.notifications.renderers import MeshRenderer, EmailRenderer, WebhookRenderer logger = logging.getLogger(__name__) @@ -28,7 +24,7 @@ class NotificationChannel(ABC): channel_type: str = "base" @abstractmethod - async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: + async def deliver(self, alert: dict, rule: dict) -> bool: """Send alert. Returns True on success.""" raise NotImplementedError @@ -63,34 +59,21 @@ class MeshBroadcastChannel(NotificationChannel): def __init__(self, connector: "MeshConnector", channel_index: int = 0): self._connector = connector self._channel = channel_index - self._renderer = MeshRenderer() - async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: + async def deliver(self, alert: dict, rule: dict) -> bool: """Send alert to mesh channel.""" if not self._connector: logger.warning("No mesh connector available") return False try: - # If payload already has chunk metadata (from digest), use message directly - if alert.chunk_index is not None: - self._connector.send_message( - text=alert.message or "", - destination=None, - channel=self._channel, - ) - logger.info("Broadcast pre-chunked alert to channel %d", self._channel) - return True - - # Render to chunks for single-event delivery - chunks = self._renderer.render(alert) - for chunk in chunks: - self._connector.send_message( - text=chunk, - destination=None, - channel=self._channel, - ) - logger.info("Broadcast %d chunk(s) to channel %d", len(chunks), self._channel) + message = alert.get("message", "") + self._connector.send_message( + text=message, + destination=None, + channel=self._channel, + ) + logger.info("Broadcast alert to channel %d", self._channel) return True except Exception as e: logger.error("Failed to broadcast alert: %s", e) @@ -174,29 +157,22 @@ class MeshDMChannel(NotificationChannel): def __init__(self, connector: "MeshConnector", node_ids: list[str]): self._connector = connector self._node_ids = node_ids - self._renderer = MeshRenderer() - async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: + async def deliver(self, alert: dict, rule: dict) -> bool: """Send alert via DM to configured nodes.""" if not self._connector: return False - # If payload already has chunk metadata (from digest), use message directly - if alert.chunk_index is not None: - messages = [alert.message or ""] - else: - # Render to chunks for single-event delivery - messages = self._renderer.render(alert) - + message = alert.get("message", "") success = True + for node_id in self._node_ids: - for message in messages: - try: - node_id = str(node_id) - self._connector.send_message(text=message, destination=node_id, channel=0) - except Exception as e: - logger.error("Failed to DM %s: %s", node_id, e) - success = False + try: + node_id = str(node_id) + self._connector.send_message(text=message, destination=node_id, channel=0) + except Exception as e: + logger.error("Failed to DM %s: %s", node_id, e) + success = False return success @@ -309,17 +285,19 @@ class EmailChannel(NotificationChannel): self._tls = smtp_tls self._from = from_address self._recipients = recipients - self._renderer = EmailRenderer() - async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: + async def deliver(self, alert: dict, rule: dict) -> bool: """Send alert via email.""" if not self._recipients: return False - # Use renderer for subject and body - rendered = self._renderer.render(alert) - subject = rendered["subject"] - body = rendered["body"] + alert_type = alert.get("type", "alert") + severity = alert.get("severity", "routine").upper() + message = alert.get("message", "") + subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title()) + body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % ( + alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message + ) try: loop = asyncio.get_event_loop() @@ -535,16 +513,21 @@ class WebhookChannel(NotificationChannel): def __init__(self, url: str, headers: Optional[dict] = None): self._url = url self._headers = headers or {} - self._renderer = WebhookRenderer() - async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: + async def deliver(self, alert: dict, rule: dict) -> bool: """POST alert to webhook URL.""" - # Use renderer for generic JSON payload - payload = self._renderer.render(alert) + payload = { + "type": alert.get("type"), + "severity": alert.get("severity", "routine"), + "message": alert.get("message", ""), + "timestamp": time.time(), + "node_name": alert.get("node_name"), + "region": alert.get("region"), + } # Discord/Slack format if "discord.com" in self._url or "slack.com" in self._url: - severity = alert.severity or "routine" + severity = alert.get("severity", "routine") color = { "immediate": 0xFF0000, "priority": 0xFFAA00, @@ -552,8 +535,8 @@ class WebhookChannel(NotificationChannel): }.get(severity, 0x888888) payload = { "embeds": [{ - "title": "MeshAI: %s" % (alert.event_type or "unknown"), - "description": alert.message or "", + "title": "MeshAI: %s" % alert.get("type", "unknown"), + "description": alert.get("message", ""), "color": color, }] } @@ -562,14 +545,14 @@ class WebhookChannel(NotificationChannel): elif "ntfy" in self._url: headers = { **self._headers, - "Title": "MeshAI: %s" % (alert.event_type or "alert"), + "Title": "MeshAI: %s" % alert.get("type", "alert"), "Priority": "3", } try: async with httpx.AsyncClient() as client: resp = await client.post( self._url, - content=alert.message or "", + content=alert.get("message", ""), headers=headers, timeout=10, ) @@ -762,52 +745,8 @@ class WebhookChannel(NotificationChannel): return False, f"Webhook failed: {e}" -def create_channel(rule: "NotificationRuleConfig", connector=None) -> NotificationChannel: - """Create a channel instance from a NotificationRuleConfig. - - Args: - rule: NotificationRuleConfig with delivery_type and channel settings - connector: MeshConnector instance (required for mesh channels) - - Returns: - NotificationChannel instance - """ - delivery_type = rule.delivery_type - - if delivery_type == "mesh_broadcast": - return MeshBroadcastChannel( - connector=connector, - channel_index=rule.broadcast_channel, - ) - elif delivery_type == "mesh_dm": - return MeshDMChannel( - connector=connector, - node_ids=rule.node_ids, - ) - elif delivery_type == "email": - return EmailChannel( - smtp_host=rule.smtp_host, - smtp_port=rule.smtp_port, - smtp_user=rule.smtp_user, - smtp_password=rule.smtp_password, - smtp_tls=rule.smtp_tls, - from_address=rule.from_address, - recipients=rule.recipients, - ) - elif delivery_type == "webhook": - return WebhookChannel( - url=rule.webhook_url, - headers=rule.webhook_headers, - ) - else: - raise ValueError("Unknown delivery type: %s" % delivery_type) - - -def create_channel_from_dict(config: dict, connector=None) -> NotificationChannel: - """Create a channel instance from a dict config (legacy interface). - - Used by old router.py and test_channel API. Will be removed in Phase 2.7. - """ +def create_channel(config: dict, connector=None) -> NotificationChannel: + """Create a channel instance from config.""" channel_type = config.get("type", "") if channel_type == "mesh_broadcast": diff --git a/meshai/notifications/events.py b/meshai/notifications/events.py index 82ad5f6..72d322c 100644 --- a/meshai/notifications/events.py +++ b/meshai/notifications/events.py @@ -133,52 +133,6 @@ class Event: return cls(**d) - -@dataclass -class NotificationPayload: - """Per-delivery alert content handed to a NotificationChannel. - - This is the runtime alert shape: derived from an Event (or - built directly by the old router) and consumed by channels.py - implementations. - """ - message: str # The rendered text to deliver - category: str # e.g. "weather_warning" - severity: str # "immediate" | "priority" | "routine" - timestamp: float # Unix epoch when generated - - # Optional context fields (None when not applicable) - node_id: Optional[str] = None - node_name: Optional[str] = None - region: Optional[str] = None - event_type: Optional[str] = None # Maps to old dict's "type" field - - # Chunk metadata for mesh deliveries (set by scheduler/digest path) - chunk_index: Optional[int] = None - chunk_total: Optional[int] = None - - # Source Event reference for advanced channel use (renderers in 2.5b) - source_event: Optional["Event"] = None - - -def make_payload_from_event(event: "Event", **overrides) -> NotificationPayload: - """Helper to convert an Event into a NotificationPayload.""" - p = NotificationPayload( - message=event.summary or event.title or event.category, - category=event.category, - severity=event.severity, - timestamp=event.timestamp, - node_id=event.node_ids[0] if event.node_ids else None, - region=event.region, - event_type=event.category, - source_event=event, - ) - for k, v in overrides.items(): - setattr(p, k, v) - return p - - - def make_event( source: str, category: str, diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 05d96ef..1ee22ba 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -1,18 +1,17 @@ """Notification pipeline package. -Phase 2.4: +Phase 2.1 + 2.2 + 2.3a + 2.3b: - EventBus: pub/sub ingress - Inhibitor: suppresses redundant events by inhibit_keys - Grouper: coalesces events sharing group_key within a window - - ToggleFilter: drops events whose toggle isn't enabled - - Tee: sends events to both dispatcher and accumulator - - Dispatcher: routes to channels based on rules - - DigestAccumulator: logs events for LLM-summarized periodic digest - - DigestScheduler: fires digest at configured time + - SeverityRouter: forks immediate vs digest + - Dispatcher: routes immediate via channels (existing rules schema) + - DigestAccumulator: tracks priority/routine events for periodic digest + - DigestScheduler: fires digest at configured time (Phase 2.3b) Usage: from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline - bus = build_pipeline(config, llm_backend) # llm_backend from main.py + bus = build_pipeline(config) bus.emit(event) # Async lifecycle @@ -21,8 +20,6 @@ Usage: await stop_pipeline(scheduler) """ -import asyncio - from meshai.notifications.channels import create_channel from meshai.notifications.pipeline.bus import EventBus, get_bus from meshai.notifications.pipeline.severity_router import ( @@ -32,25 +29,17 @@ from meshai.notifications.pipeline.severity_router import ( from meshai.notifications.pipeline.dispatcher import Dispatcher from meshai.notifications.pipeline.inhibitor import Inhibitor from meshai.notifications.pipeline.grouper import Grouper -from meshai.notifications.pipeline.toggle_filter import ToggleFilter from meshai.notifications.pipeline.digest import DigestAccumulator, Digest from meshai.notifications.pipeline.scheduler import DigestScheduler -def build_pipeline(config, llm_backend, connector=None) -> EventBus: +def build_pipeline(config) -> EventBus: """Build the pipeline and return the EventBus. - Args: - config: Full Config object. - llm_backend: An already-constructed LLMBackend instance - (from main.py or a test). Pipeline components share - this single instance. May be None for fallback behavior. - connector: Optional MeshtasticConnector for mesh channels. - Components are stashed on bus._pipeline_components for lifecycle use. """ bus = EventBus() - dispatcher = Dispatcher(config, create_channel, connector=connector) + dispatcher = Dispatcher(config, create_channel) # Build include_toggles from config digest_cfg = getattr(config.notifications, "digest", None) @@ -60,35 +49,12 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus: if include_list: include_toggles = list(include_list) - accumulator = DigestAccumulator( - llm_backend=llm_backend, - include_toggles=include_toggles, + digest = DigestAccumulator(include_toggles=include_toggles) + severity_router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest.enqueue, ) - - # Tee closure: events go to BOTH dispatcher and accumulator - # dispatcher.dispatch() is async, so fire-and-forget with create_task - def _tee(event): - try: - asyncio.create_task(dispatcher.dispatch(event)) - except RuntimeError: - # No running event loop (e.g. sync tests) - skip async dispatch - pass - accumulator.enqueue(event) - - # Build enabled toggles set from config - toggles_cfg = getattr(config.notifications, "toggles", None) - enabled_toggles = None - if toggles_cfg is not None: - enabled_list = getattr(toggles_cfg, "enabled", None) - if enabled_list: - enabled_toggles = set(enabled_list) - - toggle_filter = ToggleFilter( - next_handler=_tee, - enabled_toggles=enabled_toggles, - ) - - grouper = Grouper(next_handler=toggle_filter.handle) + grouper = Grouper(next_handler=severity_router.handle) inhibitor = Inhibitor(next_handler=grouper.handle) bus.subscribe(inhibitor.handle) @@ -96,30 +62,21 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus: bus._pipeline_components = { "inhibitor": inhibitor, "grouper": grouper, - "toggle_filter": toggle_filter, + "severity_router": severity_router, "dispatcher": dispatcher, - "accumulator": accumulator, - "connector": connector, + "digest": digest, } return bus -def build_pipeline_components(config, llm_backend, connector=None) -> tuple: +def build_pipeline_components(config) -> tuple: """Like build_pipeline, but returns all components for tests. - Args: - config: Full Config object. - llm_backend: An already-constructed LLMBackend instance - (from main.py or a test). Pipeline components share - this single instance. May be None for fallback behavior. - connector: Optional MeshtasticConnector for mesh channels. - - Returns: - (bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator). + Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest). """ bus = EventBus() - dispatcher = Dispatcher(config, create_channel, connector=connector) + dispatcher = Dispatcher(config, create_channel) # Build include_toggles from config digest_cfg = getattr(config.notifications, "digest", None) @@ -129,39 +86,15 @@ def build_pipeline_components(config, llm_backend, connector=None) -> tuple: if include_list: include_toggles = list(include_list) - accumulator = DigestAccumulator( - llm_backend=llm_backend, - include_toggles=include_toggles, + digest = DigestAccumulator(include_toggles=include_toggles) + severity_router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest.enqueue, ) - - # Tee closure: events go to BOTH dispatcher and accumulator - # dispatcher.dispatch() is async, so fire-and-forget with create_task - def _tee(event): - try: - asyncio.create_task(dispatcher.dispatch(event)) - except RuntimeError: - # No running event loop (e.g. sync tests) - skip async dispatch - pass - accumulator.enqueue(event) - - # Build enabled toggles set from config - toggles_cfg = getattr(config.notifications, "toggles", None) - enabled_toggles = None - if toggles_cfg is not None: - enabled_list = getattr(toggles_cfg, "enabled", None) - if enabled_list: - enabled_toggles = set(enabled_list) - - toggle_filter = ToggleFilter( - next_handler=_tee, - enabled_toggles=enabled_toggles, - ) - - grouper = Grouper(next_handler=toggle_filter.handle) + grouper = Grouper(next_handler=severity_router.handle) inhibitor = Inhibitor(next_handler=grouper.handle) bus.subscribe(inhibitor.handle) - - return bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator + return bus, inhibitor, grouper, severity_router, dispatcher, digest async def start_pipeline(bus: EventBus, config) -> DigestScheduler: @@ -178,14 +111,12 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: if components is None: raise RuntimeError("bus missing _pipeline_components; use build_pipeline()") - accumulator = components["accumulator"] + digest = components["digest"] - connector = components.get("connector") scheduler = DigestScheduler( - accumulator=accumulator, + accumulator=digest, config=config, channel_factory=create_channel, - connector=connector, ) await scheduler.start() @@ -212,7 +143,6 @@ __all__ = [ "Dispatcher", "Inhibitor", "Grouper", - "ToggleFilter", "DigestAccumulator", "Digest", "DigestScheduler", diff --git a/meshai/notifications/pipeline/digest.py b/meshai/notifications/pipeline/digest.py index 0b19f9d..e518a25 100644 --- a/meshai/notifications/pipeline/digest.py +++ b/meshai/notifications/pipeline/digest.py @@ -1,24 +1,33 @@ -"""Digest accumulator and renderer for Phase 2.4. +"""Digest accumulator and renderer for Phase 2.3a. -Logs all events between digest emissions and renders LLM-summarized -digest output per toggle. No active/resolved tracking — just a -chronological log that the LLM summarizes. +Holds priority and routine events between digest emissions, tracks +active vs recently-resolved events, and renders the two-section +digest output (ACTIVE NOW + SINCE LAST DIGEST) when called. -render_digest() is async and calls the LLM once per non-empty toggle. +No scheduling logic here. render_digest() is called explicitly by +the future scheduler (Phase 2.3b) or by tests. """ import logging import time from dataclasses import dataclass, field -from datetime import datetime -from typing import Optional, TYPE_CHECKING +from typing import Optional from meshai.notifications.events import Event from meshai.notifications.categories import get_toggle -if TYPE_CHECKING: - from meshai.backends.base import LLMBackend +# Lowercase substrings in event.title that indicate the event is +# a resolution of a prior alert. Conservative list — easy to extend. +RESOLUTION_MARKERS = ( + "cleared", + "reopened", + "ended", + "resolved", + "back online", + "recovered", + "lifted", +) # Display labels per toggle (used in rendered output) TOGGLE_LABELS = { @@ -46,23 +55,11 @@ TOGGLE_ORDER = [ "other", ] -# System prompt for digest summarization -DIGEST_SYSTEM_PROMPT = ( - "You are summarizing a category of mesh-network alerts for a " - "morning digest broadcast. Given a list of events in chronological " - "order (immediate severity first, then priority, then routine), " - "produce ONE SHORT LINE summarizing what happened. " - "Be specific about node IDs, places, and counts when present. " - "Aim for 80-140 characters. Do not use markdown. No bullet points. " - "Plain prose. End with a period." -) - @dataclass class Digest: - """Result of render_digest(). Carries sections and metadata.""" + """Result of render_digest(). Carries both sections and metadata.""" rendered_at: float - # Keep these fields for type compatibility; populated empty in Phase 2.4+ active: dict[str, list[Event]] = field(default_factory=dict) since_last: dict[str, list[Event]] = field(default_factory=dict) mesh_chunks: list[str] = field(default_factory=list) @@ -70,31 +67,28 @@ class Digest: full: str = "" def is_empty(self) -> bool: - return not self.mesh_chunks or ( - len(self.mesh_chunks) == 1 and "No alerts" in self.mesh_chunks[0] - ) + return not self.active and not self.since_last class DigestAccumulator: - """Logs events and produces LLM-summarized periodic digests. + """Tracks priority/routine events and produces periodic digests. Args: - llm_backend: LLM backend for generating summaries. If None, - falls back to count-based summaries. + mesh_char_limit: Maximum characters per mesh chunk (default 200). include_toggles: List of toggle names to include in digest output. If None, defaults to all toggles in TOGGLE_ORDER except - rf_propagation. - mesh_char_limit: Maximum characters per mesh chunk (default 200). + rf_propagation. Unknown toggle names in the list are silently + accepted (TOGGLE_ORDER drives display order, include_toggles + drives which toggles are tracked). """ def __init__( self, - llm_backend: Optional["LLMBackend"] = None, - include_toggles: list[str] | None = None, mesh_char_limit: int = 200, + include_toggles: list[str] | None = None, ): - self._llm = llm_backend - self._events_since_last_digest: dict[str, list[Event]] = {} + self._active: dict[str, list[Event]] = {} # toggle -> events + self._since_last: dict[str, list[Event]] = {} # toggle -> events self._last_digest_at: float = 0.0 self._mesh_char_limit = mesh_char_limit # Default: all known toggles except rf_propagation @@ -107,7 +101,7 @@ class DigestAccumulator: # ---- ingress ---- def enqueue(self, event: Event) -> None: - """Log an event for the next digest.""" + """SeverityRouter calls this for priority/routine events.""" toggle = get_toggle(event.category) or "other" # Skip non-included toggles @@ -117,201 +111,348 @@ class DigestAccumulator: ) return - # Append to the event log - self._events_since_last_digest.setdefault(toggle, []).append(event) + active_for_toggle = self._active.setdefault(toggle, []) + + # Resolution detection + if self._is_resolution(event, self._now()): + self._move_to_since_last_by_group(event, toggle) + return + + # In-place update if same id + for i, existing in enumerate(active_for_toggle): + if existing.id == event.id: + active_for_toggle[i] = event + self._logger.debug( + f"UPDATED active event {event.id} in {toggle}" + ) + return + + # Otherwise it's a new active event + active_for_toggle.append(event) self._logger.debug( - f"LOGGED event {event.id} ({toggle}/{event.category}/{event.severity})" + f"ADDED active event {event.id} ({toggle}/{event.category})" ) def tick(self, now: Optional[float] = None) -> int: - """No-op in Phase 2.4+. Returns 0.""" - return 0 + """Move expired events from active to since_last. - # ---- rendering ---- - - async def render_digest(self, now: Optional[float] = None) -> Digest: - """Produce a Digest with LLM-summarized lines per toggle. - - Calls the LLM once per toggle that had activity. Empty toggles - produce no line. Clears the event log after rendering. + Returns the number of events moved. """ if now is None: now = self._now() + moved = 0 + for toggle in list(self._active.keys()): + still_active = [] + for ev in self._active[toggle]: + if ev.expires is not None and ev.expires <= now: + self._since_last.setdefault(toggle, []).append(ev) + moved += 1 + else: + still_active.append(ev) + self._active[toggle] = still_active + return moved + + # ---- rendering ---- + + def render_digest(self, now: Optional[float] = None) -> Digest: + """Produce a Digest of current state, then clear since_last.""" + if now is None: + now = self._now() + # tick() first so expired actives roll into since_last + self.tick(now) digest = Digest(rendered_at=now) - time_str = time.strftime('%H%M', time.localtime(now)) - - # Build summary lines per toggle - summary_lines: list[str] = [] - - for toggle in TOGGLE_ORDER: - events = self._events_since_last_digest.get(toggle, []) - if not events: - continue - if toggle not in self._included: - continue - - label = TOGGLE_LABELS.get(toggle, toggle) - summary = await self._summarize_toggle(toggle, events, now) - summary_lines.append(f"[{label}] {summary}") - - # Render outputs - if summary_lines: - digest.mesh_chunks = self._render_mesh_chunks(summary_lines, time_str) - digest.full = self._render_full(summary_lines, time_str) - else: - digest.mesh_chunks = [f"DIGEST {time_str}\nNo alerts since last digest."] - digest.full = f"--- {time_str} Digest ---\n\nNo alerts since last digest.\n" - - # mesh_compact for backward compatibility + # Defensive: skip non-included toggles when building output + digest.active = { + k: list(v) for k, v in self._active.items() + if v and k in self._included + } + digest.since_last = { + k: list(v) for k, v in self._since_last.items() + if v and k in self._included + } + digest.mesh_chunks = self._render_mesh_chunks(digest, now) + # mesh_compact: join chunks for backward compatibility if len(digest.mesh_chunks) == 1: digest.mesh_compact = digest.mesh_chunks[0] else: digest.mesh_compact = "\n---\n".join(digest.mesh_chunks) + digest.full = self._render_full(digest, now) - # Clear event log - self._events_since_last_digest.clear() + # Clear since_last; active stays for the next cycle + self._since_last.clear() self._last_digest_at = now - return digest - async def _summarize_toggle( + def _render_mesh_chunks(self, digest: Digest, now: float) -> list[str]: + """Produce mesh-radio-friendly compact chunks. + + Returns a list of strings, each ≤ self._mesh_char_limit chars. + Single-chunk output has no "(1/N)" suffix. Multi-chunk output + has "(k/N)" counters and "(cont)" suffixes on section headers + that span chunks. + """ + time_str = time.strftime('%H%M', time.localtime(now)) + + # Empty digest case + if not digest.active and not digest.since_last: + return [f"DIGEST {time_str}\nNo alerts since last digest."] + + # Build logical lines with section markers + # Each item is (section, line) where section is "active", "resolved", or None + logical_lines: list[tuple[str | None, str]] = [] + + if digest.active: + logical_lines.append(("active", "ACTIVE NOW")) + for toggle in TOGGLE_ORDER: + events = digest.active.get(toggle) + if not events: + continue + logical_lines.append(("active", self._compact_toggle_line(toggle, events))) + + if digest.since_last: + logical_lines.append(("resolved", "RESOLVED")) + for toggle in TOGGLE_ORDER: + events = digest.since_last.get(toggle) + if not events: + continue + logical_lines.append(("resolved", self._compact_toggle_line(toggle, events))) + + # Pack lines into chunks + return self._pack_lines_into_chunks(logical_lines, time_str) + + def _pack_lines_into_chunks( self, - toggle: str, - events: list[Event], - now: float, - ) -> str: - """Generate a one-line summary for a toggle's events.""" - # Sort by severity (immediate=0, priority=1, routine=2), then timestamp - severity_rank = {"immediate": 0, "priority": 1, "routine": 2} - sorted_events = sorted( - events, - key=lambda e: (severity_rank.get(e.severity, 3), e.timestamp), - ) - - # Build LLM input - lines = [f"Category: {toggle}", "Events:"] - for ev in sorted_events: - lines.append(self._format_event_for_llm(ev)) - llm_input = "\n".join(lines) - - # Try LLM summarization - if self._llm is not None: - try: - response = await self._llm.generate( - messages=[{"role": "user", "content": llm_input}], - system_prompt=DIGEST_SYSTEM_PROMPT, - max_tokens=200, - ) - # Take first line only - summary = response.strip().split("\n")[0].strip() - if summary: - return summary - except Exception as e: - self._logger.warning(f"LLM summarization failed for {toggle}: {e}") - - # Fallback: count-based summary - return f"{len(events)} event(s) (LLM unavailable)" - - def _format_event_for_llm(self, event: Event) -> str: - """Format one event for LLM input.""" - ts = datetime.fromtimestamp(event.timestamp) - time_str = ts.strftime("%H:%M") - severity = event.severity.upper() - - # Combine title and summary - text = event.title or "" - if event.summary and event.summary != event.title: - if text: - text = f"{text} — {event.summary}" - else: - text = event.summary - if not text: - text = event.category - - # Truncate long text - if len(text) > 120: - text = text[:117] + "..." - - return f"- [{severity} {time_str}] {text}" - - def _render_mesh_chunks( - self, - summary_lines: list[str], + logical_lines: list[tuple[str | None, str]], time_str: str, ) -> list[str]: - """Pack summary lines into mesh-friendly chunks.""" + """Pack logical lines into chunks respecting char limit. + + Args: + logical_lines: List of (section, line) tuples where section + is "active", "resolved", or None for headers. + time_str: Time string for headers (e.g., "0700"). + + Returns: + List of chunk strings, each ≤ self._mesh_char_limit. + """ + if not logical_lines: + return [f"DIGEST {time_str}\nNo alerts since last digest."] + limit = self._mesh_char_limit - chunks: list[list[str]] = [] + chunks: list[list[str]] = [] # List of line lists current_chunk: list[str] = [] current_len = 0 + last_section_in_chunk: str | None = None + sections_started: set[str] = set() - # Placeholder header - header = f"DIGEST {time_str}" + # Placeholder header - will be fixed up later + header_placeholder = f"DIGEST {time_str}" def start_new_chunk(): - nonlocal current_chunk, current_len + nonlocal current_chunk, current_len, last_section_in_chunk if current_chunk: chunks.append(current_chunk) - current_chunk = [header] - current_len = len(header) + current_chunk = [header_placeholder] + current_len = len(header_placeholder) + last_section_in_chunk = None start_new_chunk() - for line in summary_lines: - line_len = 1 + len(line) # newline + line - if current_len + line_len > limit: + i = 0 + while i < len(logical_lines): + section, line = logical_lines[i] + is_section_header = line in ("ACTIVE NOW", "RESOLVED") + + # Check if this is a section header - ensure it has at least one + # toggle line following it in this chunk + if is_section_header: + # Look ahead for the next toggle line + next_toggle_idx = i + 1 + if next_toggle_idx < len(logical_lines): + _, next_line = logical_lines[next_toggle_idx] + # Calculate space needed for header + newline + next line + needed = len(line) + 1 + len(next_line) + if current_len + 1 + needed > limit: + # Section header + next line won't fit, start new chunk + start_new_chunk() + sections_started.add(section) + last_section_in_chunk = section + current_chunk.append(line) + current_len += 1 + len(line) + i += 1 + continue + + # Calculate line length with newline + line_with_newline = 1 + len(line) # newline before line + + # Would this line fit? + if current_len + line_with_newline > limit: + # Start new chunk start_new_chunk() + + # If continuing a section, add "(cont)" header + if section and section in sections_started and not is_section_header: + cont_header = "ACTIVE NOW (cont)" if section == "active" else "RESOLVED (cont)" + current_chunk.append(cont_header) + current_len += 1 + len(cont_header) + last_section_in_chunk = section + + # Add the line + if is_section_header: + sections_started.add(section) + last_section_in_chunk = section current_chunk.append(line) - current_len += line_len + current_len += 1 + len(line) + i += 1 # Don't forget the last chunk - if current_chunk and len(current_chunk) > 1: + if current_chunk and len(current_chunk) > 1: # More than just header chunks.append(current_chunk) + elif current_chunk and len(current_chunk) == 1: + # Only header in chunk - shouldn't happen but handle gracefully + if chunks: + # Merge with previous chunk if possible + pass + else: + chunks.append(current_chunk) # Fix up headers with chunk counts - total = len(chunks) + total_chunks = len(chunks) result: list[str] = [] + for idx, chunk_lines in enumerate(chunks): - if total == 1: + # Fix header line + if total_chunks == 1: chunk_lines[0] = f"DIGEST {time_str}" else: - chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total})" + chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total_chunks})" result.append("\n".join(chunk_lines)) return result if result else [f"DIGEST {time_str}\nNo alerts since last digest."] - def _render_full(self, summary_lines: list[str], time_str: str) -> str: - """Produce full multi-line digest for email/webhook.""" + def _compact_toggle_line(self, toggle: str, events: list[Event]) -> str: + """Build one compact line for a toggle: [Label] headline (+N)""" + label = TOGGLE_LABELS.get(toggle, toggle) + sorted_events = self._sort_events(events) + top_event = sorted_events[0] + + # Get headline text + headline = top_event.summary or top_event.title or top_event.category + + # Truncate headline at ~60 chars to keep lines readable + max_headline = 60 + if len(headline) > max_headline: + headline = headline[:max_headline - 1] + "…" + + # Append (+N) if more than one event + overflow = len(events) - 1 + if overflow > 0: + return f"[{label}] {headline} (+{overflow})" + else: + return f"[{label}] {headline}" + + def _render_full(self, digest: Digest, now: float) -> str: + """Produce the full multi-line digest for email/webhook.""" lines = [ - f"--- {time_str} Digest ---", + f"--- {time.strftime('%H%M', time.localtime(now))} Digest ---", "", ] - lines.extend(summary_lines) - lines.append("") - return "\n".join(lines) + + if not digest.active and not digest.since_last: + lines.append("No alerts since last digest.") + lines.append("") + else: + if digest.active: + lines.append("ACTIVE NOW:") + for toggle in TOGGLE_ORDER: + events = digest.active.get(toggle) + if not events: + continue + label = TOGGLE_LABELS.get(toggle, toggle) + for ev in self._sort_events(events): + lines.append(f" [{label}] {self._format_event_line(ev)}") + lines.append("") + + if digest.since_last: + lines.append("SINCE LAST DIGEST:") + for toggle in TOGGLE_ORDER: + events = digest.since_last.get(toggle) + if not events: + continue + label = TOGGLE_LABELS.get(toggle, toggle) + for ev in self._sort_events(events): + lines.append(f" [{label}] {self._format_event_line(ev)}") + lines.append("") + + return "\n".join(lines).rstrip() + "\n" + + def _format_event_line(self, event: Event) -> str: + """Single-line summary of an event for digest output.""" + # Prefer event.summary if set, else fall back to title, then category + text = event.summary or event.title or event.category + # Trim runaway text — keep digest readable + if len(text) > 140: + text = text[:139] + "…" + return text + + def _sort_events(self, events: list[Event]) -> list[Event]: + """Sort within a toggle: immediate first, then priority, + then routine, then by timestamp newest first.""" + rank = {"immediate": 0, "priority": 1, "routine": 2} + return sorted( + events, + key=lambda e: (rank.get(e.severity, 3), -e.timestamp), + ) + + # ---- helpers ---- + + def _is_resolution(self, event: Event, now: float) -> bool: + if event.expires is not None and event.expires <= now: + return True + title_lc = (event.title or "").lower() + return any(marker in title_lc for marker in RESOLUTION_MARKERS) + + def _move_to_since_last_by_group(self, event: Event, toggle: str) -> None: + """Remove any active event matching event's group_key (or id) + and place this resolution event into since_last. + """ + active_list = self._active.get(toggle, []) + # Match by group_key if set, else by id + match_key = event.group_key + if match_key: + self._active[toggle] = [ + e for e in active_list + if e.group_key != match_key + ] + else: + self._active[toggle] = [ + e for e in active_list if e.id != event.id + ] + self._since_last.setdefault(toggle, []).append(event) + self._logger.debug( + f"RESOLVED in {toggle}: {event.id} ({event.title!r})" + ) def _now(self) -> float: return time.time() - # ---- inspection (for tests and scheduler) ---- + # ---- inspection (for tests and future scheduler) ---- - def event_count(self, toggle: Optional[str] = None) -> int: - """Count events logged since last digest.""" + def active_count(self, toggle: Optional[str] = None) -> int: if toggle is not None: - return len(self._events_since_last_digest.get(toggle, [])) - return sum(len(v) for v in self._events_since_last_digest.values()) + return len(self._active.get(toggle, [])) + return sum(len(v) for v in self._active.values()) + + def since_last_count(self, toggle: Optional[str] = None) -> int: + if toggle is not None: + return len(self._since_last.get(toggle, [])) + return sum(len(v) for v in self._since_last.values()) def last_digest_at(self) -> float: return self._last_digest_at def clear(self) -> None: - self._events_since_last_digest.clear() + self._active.clear() + self._since_last.clear() self._last_digest_at = 0.0 - - # Legacy compatibility — return 0 for old tests - def active_count(self, toggle: Optional[str] = None) -> int: - return 0 - - def since_last_count(self, toggle: Optional[str] = None) -> int: - return 0 diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index adc8ad1..1df1eb2 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -4,15 +4,12 @@ The dispatcher routes immediate-severity events through the existing NotificationRuleConfig rules and delivers via channels.py. This is the transitional bridge between the new Event pipeline and the existing channel implementations. - -Phase 2.5a: dispatch() is now async, takes a connector at construction, -and properly awaits channel.deliver(payload, rule). """ import logging -from typing import Callable, Optional +from typing import Callable -from meshai.notifications.events import Event, make_payload_from_event +from meshai.notifications.events import Event class Dispatcher: @@ -20,26 +17,21 @@ class Dispatcher: SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2} - def __init__(self, config, channel_factory: Callable, connector=None): + def __init__(self, config, channel_factory: Callable): """Initialize. Args: config: The full Config object (provides config.notifications.rules) - channel_factory: Callable taking (rule, connector) and returning - a NotificationChannel. This is create_channel from - meshai/notifications/channels.py. - connector: MeshConnector instance for mesh channel deliveries. + channel_factory: Callable taking a NotificationRuleConfig and + returning a NotificationChannel. This is create_channel + from meshai/notifications/channels.py. """ self._config = config self._channel_factory = channel_factory - self._connector = connector self._logger = logging.getLogger("meshai.pipeline.dispatcher") - async def dispatch(self, event: Event) -> None: - """Deliver an immediate-severity event to all matching channels. - - This method is async and awaits each channel.deliver() call. - """ + def dispatch(self, event: Event) -> None: + """Deliver an immediate-severity event to all matching channels.""" rules = self._matching_rules(event) if not rules: self._logger.debug( @@ -48,17 +40,19 @@ class Dispatcher: return for rule in rules: try: - channel = self._channel_factory(rule, self._connector) - payload = make_payload_from_event(event) - success = await channel.deliver(payload, rule) - if success: - self._logger.info( - f"Dispatched event {event.id} via {rule.delivery_type}" - ) - else: - self._logger.warning( - f"Channel delivery returned False for rule {rule.name}" - ) + channel = self._channel_factory(rule) + alert = { + "category": event.category, + "severity": event.severity, + "message": event.summary or event.title, + "node_id": event.node_ids[0] if event.node_ids else None, + "region": event.region, + "timestamp": event.timestamp, + } + channel.deliver(alert) + self._logger.info( + f"Dispatched event {event.id} via {rule.delivery_type}" + ) except Exception: self._logger.exception( f"Channel delivery failed for rule {rule.name}" diff --git a/meshai/notifications/pipeline/scheduler.py b/meshai/notifications/pipeline/scheduler.py index 66f2512..617a00e 100644 --- a/meshai/notifications/pipeline/scheduler.py +++ b/meshai/notifications/pipeline/scheduler.py @@ -13,7 +13,6 @@ from datetime import datetime, timedelta from typing import Callable, Optional from meshai.notifications.pipeline.digest import DigestAccumulator -from meshai.notifications.events import NotificationPayload class DigestScheduler: @@ -24,14 +23,12 @@ class DigestScheduler: accumulator: DigestAccumulator, config, channel_factory: Callable, - connector=None, clock: Optional[Callable[[], float]] = None, sleep: Optional[Callable[[float], "asyncio.Future"]] = None, ): self._accumulator = accumulator self._config = config self._channel_factory = channel_factory - self._connector = connector self._clock = clock or time.time self._sleep = sleep or asyncio.sleep self._task: Optional[asyncio.Task] = None @@ -101,8 +98,7 @@ class DigestScheduler: async def _fire(self, now: float) -> None: """Render and deliver one digest.""" self._logger.info(f"Firing digest at {datetime.fromtimestamp(now):%H:%M}") - # render_digest is now async in Phase 2.4+ - digest = await self._accumulator.render_digest(now) + digest = self._accumulator.render_digest(now) self._last_fire_at = now rules = self._matching_rules() @@ -123,7 +119,7 @@ class DigestScheduler: async def _deliver_to_rule(self, rule, digest, now: float) -> None: """Hand the rendered digest to a channel based on rule.delivery_type.""" - channel = self._channel_factory(rule, self._connector) + channel = self._channel_factory(rule) delivery_type = rule.delivery_type if delivery_type in ("mesh_broadcast", "mesh_dm"): @@ -131,27 +127,31 @@ class DigestScheduler: chunks = digest.mesh_chunks total = len(chunks) for i, chunk in enumerate(chunks, start=1): - payload = NotificationPayload( - message=chunk, - category="digest", - severity="routine", - timestamp=now, - chunk_index=i, - chunk_total=total, - ) - await channel.deliver(payload, rule) + payload = { + "category": "digest", + "severity": "routine", + "message": chunk, + "node_id": None, + "region": None, + "timestamp": now, + "chunk_index": i, + "chunk_total": total, + } + channel.deliver(payload) self._logger.info( f"Delivered {total} mesh chunk(s) to rule {rule.name!r}" ) else: # Single full-form delivery - payload = NotificationPayload( - message=digest.full, - category="digest", - severity="routine", - timestamp=now, - ) - await channel.deliver(payload, rule) + payload = { + "category": "digest", + "severity": "routine", + "message": digest.full, + "node_id": None, + "region": None, + "timestamp": now, + } + channel.deliver(payload) self._logger.info( f"Delivered digest to rule {rule.name!r} via {delivery_type}" ) diff --git a/meshai/notifications/pipeline/toggle_filter.py b/meshai/notifications/pipeline/toggle_filter.py deleted file mode 100644 index 1813990..0000000 --- a/meshai/notifications/pipeline/toggle_filter.py +++ /dev/null @@ -1,48 +0,0 @@ -"""Master toggle filter. - -Drops events whose category maps to a toggle that the operator has -disabled. Distinct from DigestAccumulator.include_toggles, which -only affects the digest recap — this filter drops events from the -entire pipeline, so disabled toggles produce no live mesh delivery, -no digest entry, nothing. -""" - -import logging -from typing import Callable - -from meshai.notifications.events import Event -from meshai.notifications.categories import get_toggle - - -class ToggleFilter: - """Drop events whose toggle isn't in the enabled set.""" - - def __init__( - self, - next_handler: Callable[[Event], None], - enabled_toggles: set[str] | None = None, - ): - """Initialize. - - Args: - next_handler: Callable that receives non-dropped events. - enabled_toggles: Set of toggle names that are enabled. - If None, all toggles are enabled (filter is a no-op). - """ - self._next = next_handler - self._enabled = enabled_toggles # None = no-op - self._logger = logging.getLogger("meshai.pipeline.toggle_filter") - - def handle(self, event: Event) -> None: - """Pass the event through, or drop it if its toggle is disabled.""" - if self._enabled is None: - self._next(event) - return - - toggle = get_toggle(event.category) or "other" - if toggle not in self._enabled: - self._logger.debug( - f"DROPPED event {event.id} — toggle {toggle!r} not enabled" - ) - return - self._next(event) diff --git a/meshai/notifications/renderers/__init__.py b/meshai/notifications/renderers/__init__.py deleted file mode 100644 index cdad6cd..0000000 --- a/meshai/notifications/renderers/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -"""Channel-type-aware renderers. - -Each renderer takes a NotificationPayload and produces a -channel-type-appropriate output: mesh chunks, email subject/body, -or webhook JSON dict. - -Renderers are reusable beyond channels.py — the future MQTT event -publisher (Phase 2.6.5) uses WebhookRenderer for its on-the-wire -format. -""" - -from meshai.notifications.renderers.base import Renderer -from meshai.notifications.renderers.mesh import MeshRenderer -from meshai.notifications.renderers.email import EmailRenderer -from meshai.notifications.renderers.webhook import WebhookRenderer - -__all__ = [ - "Renderer", - "MeshRenderer", - "EmailRenderer", - "WebhookRenderer", -] diff --git a/meshai/notifications/renderers/base.py b/meshai/notifications/renderers/base.py deleted file mode 100644 index b3f4dfe..0000000 --- a/meshai/notifications/renderers/base.py +++ /dev/null @@ -1,28 +0,0 @@ -"""Abstract base for channel-type-aware renderers. - -Each renderer takes a NotificationPayload and produces a -channel-type-appropriate output (string, list, dict, etc.). -Renderers are pure functions of their input — no network, no -state, no side effects beyond logging. The channel that owns a -renderer calls render() and then handles delivery. -""" - -from abc import ABC, abstractmethod -from typing import Any - -from meshai.notifications.events import NotificationPayload - - -class Renderer(ABC): - """Base class for all channel-type renderers.""" - - @abstractmethod - def render(self, payload: NotificationPayload) -> Any: - """Produce the channel-type-appropriate output. - - Subclasses define the concrete return type: - - MeshRenderer returns list[str] - - EmailRenderer returns dict (subject, body) - - WebhookRenderer returns dict (JSON-serializable) - """ - raise NotImplementedError diff --git a/meshai/notifications/renderers/email.py b/meshai/notifications/renderers/email.py deleted file mode 100644 index 988e75b..0000000 --- a/meshai/notifications/renderers/email.py +++ /dev/null @@ -1,78 +0,0 @@ -"""Email channel renderer. - -Produces a dict with subject and body for SMTP delivery. Plain -text body for now; HTML body is a future polish. - -Subject format: "[MeshAI] " -Body format: multi-line, with the message as the lead, followed -by structured context fields (severity, region, node, timestamp, -source category). -""" - -import logging -from datetime import datetime -from typing import Optional - -from meshai.notifications.events import NotificationPayload -from meshai.notifications.renderers.base import Renderer - - -class EmailRenderer(Renderer): - """Produce email subject and body for a single payload.""" - - def __init__(self): - self._logger = logging.getLogger("meshai.renderers.email") - - def render(self, payload: NotificationPayload) -> dict: - """Render the payload as {subject, body}.""" - return { - "subject": self._build_subject(payload), - "body": self._build_body(payload), - } - - def _build_subject(self, p: NotificationPayload) -> str: - sev = (p.severity or "routine").upper() - type_label = self._type_label(p.event_type) or "Alert" - return f"[MeshAI] {sev} — {type_label}" - - def _build_body(self, p: NotificationPayload) -> str: - lines: list[str] = [] - # Lead line - lines.append(p.message or "(no message)") - lines.append("") # blank separator - - # Structured context - lines.append(f"Severity: {p.severity or 'routine'}") - if p.event_type: - lines.append(f"Category: {p.event_type}") - if p.region: - lines.append(f"Region: {p.region}") - if p.node_name: - lines.append(f"Node: {p.node_name}") - elif p.node_id: - lines.append(f"Node: {p.node_id}") - if p.timestamp: - try: - ts = datetime.fromtimestamp(p.timestamp) - lines.append(f"Time: {ts.strftime('%Y-%m-%d %H:%M:%S')}") - except (ValueError, OverflowError): - pass - - # Optional source event detail (if present) - if p.source_event is not None: - ev = p.source_event - if hasattr(ev, "source") and ev.source: - lines.append(f"Source: {ev.source}") - if hasattr(ev, "title") and ev.title and ev.title != p.message: - lines.append(f"Title: {ev.title}") - - lines.append("") - lines.append("--") - lines.append("MeshAI notification") - return "\n".join(lines) - - def _type_label(self, event_type: Optional[str]) -> Optional[str]: - """Title-cased label for the event type (for subject line).""" - if not event_type: - return None - return event_type.replace("_", " ").title() diff --git a/meshai/notifications/renderers/mesh.py b/meshai/notifications/renderers/mesh.py deleted file mode 100644 index 935a92b..0000000 --- a/meshai/notifications/renderers/mesh.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Mesh channel renderer. - -Produces a list of short strings (each ≤200 chars by default) -suitable for mesh radio broadcast. Reuses the digest's chunking -pattern: never split a single "line" across chunks; add (k/N) -counters when the rendered output produces more than one chunk. - -The mesh renderer is symmetric with the digest accumulator's -mesh chunk renderer — same chunk-packing algorithm, same char -limit semantics. -""" - -import logging -from typing import Optional - -from meshai.notifications.events import NotificationPayload -from meshai.notifications.renderers.base import Renderer - - -class MeshRenderer(Renderer): - """Produce mesh-compact chunks for a single payload.""" - - def __init__(self, char_limit: int = 200): - self._limit = char_limit - self._logger = logging.getLogger("meshai.renderers.mesh") - - def render(self, payload: NotificationPayload) -> list[str]: - """Render the payload as 1+ mesh-compact chunks. - - Algorithm: - - Build the full message line (event_type, severity hint, - and message text — see _format_one_line) - - If the line fits in self._limit chars, return [line]. - - If the line exceeds self._limit, split across multiple - chunks at word boundaries when possible, and add - "(k/N)" counters at the start of each chunk. - - If a single word exceeds the chunk limit, hard-split - mid-word (rare — long URLs etc.) - """ - line = self._format_one_line(payload) - if len(line) <= self._limit: - return [line] - - return self._chunk_long_line(line) - - def _format_one_line(self, p: NotificationPayload) -> str: - """Build the headline for a payload. - - Default format: - "[] " - where EventTypeTitle is a short label derived from - p.event_type (e.g. "weather_warning" → "Weather"). If - p.event_type is None, omit the prefix. - - Truncates the message at the limit only if the prefix - is short enough; otherwise lets the chunker handle it. - """ - prefix = self._toggle_label(p.event_type) - if prefix: - return f"[{prefix}] {p.message}" - return p.message - - def _toggle_label(self, event_type: Optional[str]) -> Optional[str]: - """Map an event category to a short toggle label. - - Looks up the toggle the category belongs to and returns - the toggle's display label. If unknown, returns None - (no prefix added). - """ - if not event_type: - return None - from meshai.notifications.categories import get_toggle - toggle = get_toggle(event_type) - if not toggle: - return None - # Same label set used by the digest renderer - TOGGLE_LABELS = { - "mesh_health": "Mesh", - "weather": "Weather", - "fire": "Fire", - "rf_propagation": "RF", - "roads": "Roads", - "avalanche": "Avalanche", - "seismic": "Seismic", - "tracking": "Tracking", - "other": "Other", - } - return TOGGLE_LABELS.get(toggle, toggle.title()) - - def _chunk_long_line(self, line: str) -> list[str]: - """Split a long line into chunks ≤ self._limit each. - - Reserves ~10 chars per chunk for the "(k/N)" counter - suffix. Splits at word boundaries; falls back to - mid-word split if a single word exceeds the budget. - """ - # Reserve space for " (k/N)" — generous to 10 chars - body_budget = self._limit - 10 - if body_budget <= 0: - body_budget = self._limit # extreme small limit; ignore counter - - words = line.split(" ") - chunks: list[str] = [] - current = "" - for word in words: - # Hard-split words longer than body_budget (rare) - while len(word) > body_budget: - if current: - chunks.append(current) - current = "" - chunks.append(word[:body_budget]) - word = word[body_budget:] - # Add word to current chunk if it fits - tentative = (current + " " + word) if current else word - if len(tentative) <= body_budget: - current = tentative - else: - chunks.append(current) - current = word - if current: - chunks.append(current) - - # Add counters: "DIGEST"-style "(k/N)" suffix - total = len(chunks) - if total == 1: - return chunks - return [f"{chunk} ({i+1}/{total})" for i, chunk in enumerate(chunks)] diff --git a/meshai/notifications/renderers/webhook.py b/meshai/notifications/renderers/webhook.py deleted file mode 100644 index d9fafb3..0000000 --- a/meshai/notifications/renderers/webhook.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Webhook channel renderer. - -Produces a JSON-serializable dict with stable field names. Also -intended for reuse by the future MQTT event publisher (Phase -2.6.5), which wants the same structured shape on the wire. - -Field names use snake_case. Optional fields are omitted from the -output when None, NOT set to null — keeps payloads compact and -avoids ambiguity between "field absent" and "field explicitly -null". -""" - -import logging -from typing import Any - -from meshai.notifications.events import NotificationPayload -from meshai.notifications.renderers.base import Renderer - - -# Schema version for the wire format. Bump when making -# breaking changes to the field shape. External consumers -# can use this to handle multiple versions. -WEBHOOK_SCHEMA_VERSION = "1.0" - - -class WebhookRenderer(Renderer): - """Produce a JSON-serializable dict for a single payload.""" - - def __init__(self): - self._logger = logging.getLogger("meshai.renderers.webhook") - - def render(self, payload: NotificationPayload) -> dict: - """Render the payload as a structured dict.""" - out: dict[str, Any] = { - "schema_version": WEBHOOK_SCHEMA_VERSION, - "message": payload.message, - "severity": payload.severity or "routine", - "timestamp": payload.timestamp, - } - - # Optional fields — omit if None - for src_attr, dst_key in ( - ("category", "category"), - ("event_type", "event_type"), - ("node_id", "node_id"), - ("node_name", "node_name"), - ("region", "region"), - ("chunk_index", "chunk_index"), - ("chunk_total", "chunk_total"), - ): - value = getattr(payload, src_attr, None) - if value is not None: - out[dst_key] = value - - # Optional source event detail - if payload.source_event is not None: - ev = payload.source_event - source_event: dict[str, Any] = {} - for attr in ("id", "source", "title", "expires", "group_key"): - if hasattr(ev, attr): - value = getattr(ev, attr) - if value is not None: - source_event[attr] = value - if source_event: - out["source_event"] = source_event - - return out diff --git a/meshai/notifications/router.py b/meshai/notifications/router.py index 64efa1d..f58a185 100644 --- a/meshai/notifications/router.py +++ b/meshai/notifications/router.py @@ -8,8 +8,7 @@ import time from datetime import datetime from typing import Optional, TYPE_CHECKING -from .channels import create_channel_from_dict, NotificationChannel -from .events import NotificationPayload +from .channels import create_channel, NotificationChannel from .summarizer import MessageSummarizer if TYPE_CHECKING: @@ -143,7 +142,7 @@ class NotificationRouter: return None try: - return create_channel_from_dict(config, self._connector) + return create_channel(config, self._connector) except Exception as e: logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e) return None @@ -200,20 +199,7 @@ class NotificationRouter: else: delivery_alert = {**alert, "message": message[:195] + "..."} - # Convert dict to NotificationPayload for channel interface - payload = NotificationPayload( - message=delivery_alert.get("message", ""), - category=delivery_alert.get("type", "unknown"), - severity=delivery_alert.get("severity", "routine"), - timestamp=delivery_alert.get("timestamp", time.time()), - node_id=delivery_alert.get("node_id"), - node_name=delivery_alert.get("node_name"), - region=delivery_alert.get("region"), - event_type=delivery_alert.get("type"), - ) - # Rule is a dict here; channels don't use it so we pass None - # for the rule parameter (channels ignore it anyway) - success = await channel.deliver(payload, None) + success = await channel.deliver(delivery_alert, rule) if success: delivered = True self._record_fire(rule_name) @@ -269,7 +255,7 @@ class NotificationRouter: {success, message, error, details} """ try: - channel = create_channel_from_dict(channel_config, self._connector) + channel = create_channel(channel_config, self._connector) return await channel.test_connection() except ValueError as e: return { diff --git a/tests/test_adapter_firms.py b/tests/test_adapter_firms.py deleted file mode 100644 index fb564e6..0000000 --- a/tests/test_adapter_firms.py +++ /dev/null @@ -1,212 +0,0 @@ -"""Tests for FIRMS adapter Phase 2.6 — to_event() method.""" - -import time -from unittest.mock import MagicMock - -import pytest - -from meshai.env.firms import FIRMSAdapter -from meshai.notifications.events import Event - - -# ============================================================ -# FIXTURES -# ============================================================ - -@pytest.fixture -def mock_config(): - """Create a mock FIRMSConfig.""" - config = MagicMock() - config.map_key = "test-key" - config.source = "VIIRS_SNPP_NRT" - config.bbox = [-117, 42, -114, 44] - config.day_range = 1 - config.tick_seconds = 1800 - config.confidence_min = "nominal" - config.proximity_km = 10.0 - return config - - -@pytest.fixture -def adapter(mock_config): - """Create a FIRMSAdapter with mocked dependencies.""" - return FIRMSAdapter(mock_config, region_anchors=[], fires_adapter=None) - - -def make_firms_event( - lat=42.5, - lon=-114.5, - new_ignition=False, - severity="routine", - headline="Test Hotspot", - frp=None, - confidence="n", - distance_km=None, - nearest_anchor=None, - near_fire=None, -): - """Helper to create a FIRMS event dict.""" - now = time.time() - return { - "source": "firms", - "event_id": f"firms_{lat:.4f}_{lon:.4f}_2026-05-15_1200", - "event_type": "Fire Hotspot", - "severity": severity, - "headline": headline, - "lat": lat, - "lon": lon, - "expires": now + 21600, - "fetched_at": now, - "properties": { - "new_ignition": new_ignition, - "confidence": confidence, - "frp": frp, - "brightness": 350.0, - "acq_date": "2026-05-15", - "acq_time": "1200", - "near_fire": near_fire, - "distance_to_fire_km": 5.0 if near_fire else None, - "distance_km": distance_km, - "nearest_anchor": nearest_anchor, - }, - } - - -# ============================================================ -# CATEGORY DECISION TESTS -# ============================================================ - -def test_to_event_new_ignition(adapter): - """New ignition maps to new_ignition category.""" - evt = make_firms_event(new_ignition=True) - event = adapter.to_event(evt) - assert event is not None - assert event.category == "new_ignition" - - -def test_to_event_near_known_fire(adapter): - """Hotspot near known fire maps to wildfire_proximity.""" - evt = make_firms_event(new_ignition=False, near_fire="Snake River Fire") - event = adapter.to_event(evt) - assert event is not None - assert event.category == "wildfire_proximity" - - -# ============================================================ -# SEVERITY PASS-THROUGH TESTS -# ============================================================ - -def test_to_event_severity_passes_through(adapter): - """Severity from FIRMS event passes through unchanged.""" - for sev in ["routine", "priority", "immediate"]: - evt = make_firms_event(severity=sev) - event = adapter.to_event(evt) - assert event is not None - assert event.severity == sev - - -# ============================================================ -# CONTENT TESTS -# ============================================================ - -def test_to_event_summary_includes_frp(adapter): - """Summary includes FRP when present.""" - evt = make_firms_event(frp=85.5) - event = adapter.to_event(evt) - assert event is not None - assert "FRP 85" in event.summary - - -def test_to_event_summary_handles_missing_frp(adapter): - """Missing FRP doesn't break to_event.""" - evt = make_firms_event(frp=None) - event = adapter.to_event(evt) - assert event is not None - assert "FRP" not in event.summary - - -def test_to_event_summary_includes_distance_when_present(adapter): - """Summary includes distance and anchor when present.""" - evt = make_firms_event(distance_km=12, nearest_anchor="TFL") - event = adapter.to_event(evt) - assert event is not None - assert "12 km" in event.summary - assert "TFL" in event.summary - - -def test_to_event_region_uses_nearest_anchor(adapter): - """Region is set from nearest_anchor.""" - evt = make_firms_event(nearest_anchor="MHR") - event = adapter.to_event(evt) - assert event is not None - assert event.region == "MHR" - - -# ============================================================ -# SPATIAL KEY TESTS -# ============================================================ - -def test_to_event_group_key_is_spatial_grid(adapter): - """Group key is spatial grid based on rounded lat/lon.""" - evt = make_firms_event(lat=42.5678, lon=-114.3456) - event = adapter.to_event(evt) - assert event is not None - assert event.group_key == "firms:42.57:-114.35" - - -def test_to_event_inhibit_keys_match_group_key(adapter): - """Inhibit keys contain the same spatial key as group_key.""" - evt = make_firms_event(lat=42.5678, lon=-114.3456) - event = adapter.to_event(evt) - assert event is not None - assert event.group_key in event.inhibit_keys - - -def test_two_nearby_detections_share_group_key(adapter): - """Two detections in same grid cell share group_key.""" - # Both round to 42.57:-114.35 - evt1 = make_firms_event(lat=42.571, lon=-114.351) - evt2 = make_firms_event(lat=42.572, lon=-114.352) - event1 = adapter.to_event(evt1) - event2 = adapter.to_event(evt2) - assert event1 is not None - assert event2 is not None - assert event1.group_key == event2.group_key - - -# ============================================================ -# DEFENSIVE TESTS -# ============================================================ - -def test_to_event_missing_coords_returns_none(adapter): - """Missing coordinates returns None.""" - evt = make_firms_event() - evt["lat"] = None - event = adapter.to_event(evt) - assert event is None - - -def test_to_event_missing_properties_returns_event(adapter): - """Missing properties dict defaults to wildfire_proximity.""" - evt = { - "source": "firms", - "event_id": "test", - "event_type": "Fire Hotspot", - "severity": "routine", - "headline": "Test", - "lat": 42.5, - "lon": -114.5, - "fetched_at": time.time(), - } - # No "properties" key at all - event = adapter.to_event(evt) - assert event is not None - assert event.category == "wildfire_proximity" - - -def test_to_event_does_not_raise_on_corrupted_dict(adapter): - """Corrupted dict returns None without raising.""" - evt = {"garbage": True} - # Should not raise - event = adapter.to_event(evt) - assert event is None diff --git a/tests/test_adapter_nws.py b/tests/test_adapter_nws.py deleted file mode 100644 index ca60181..0000000 --- a/tests/test_adapter_nws.py +++ /dev/null @@ -1,277 +0,0 @@ -"""Tests for NWS adapter Phase 2.6 — to_event() and _derive_category().""" - -import time -from unittest.mock import MagicMock, patch - -import pytest - -from meshai.env.nws import NWSAlertsAdapter -from meshai.notifications.events import Event - - -# ============================================================ -# FIXTURES -# ============================================================ - -@pytest.fixture -def mock_config(): - """Create a mock NWSConfig.""" - config = MagicMock() - config.areas = ["ID"] - config.user_agent = "(test, test@example.com)" - config.severity_min = "moderate" - config.tick_seconds = 60 - return config - - -@pytest.fixture -def adapter(mock_config): - """Create an NWSAlertsAdapter with mocked config.""" - return NWSAlertsAdapter(mock_config) - - -# ============================================================ -# _derive_category TESTS -# ============================================================ - -def test_derive_category_warning(adapter): - """Warning suffix maps to weather_warning.""" - assert adapter._derive_category("Tornado Warning") == "weather_warning" - assert adapter._derive_category("Red Flag Warning") == "weather_warning" - assert adapter._derive_category("Winter Storm Warning") == "weather_warning" - - -def test_derive_category_watch(adapter): - """Watch suffix maps to weather_watch.""" - assert adapter._derive_category("Tornado Watch") == "weather_watch" - assert adapter._derive_category("Winter Storm Watch") == "weather_watch" - assert adapter._derive_category("Fire Weather Watch") == "weather_watch" - - -def test_derive_category_advisory(adapter): - """Advisory suffix maps to weather_advisory.""" - assert adapter._derive_category("Wind Advisory") == "weather_advisory" - assert adapter._derive_category("Heat Advisory") == "weather_advisory" - assert adapter._derive_category("Frost Advisory") == "weather_advisory" - - -def test_derive_category_statement(adapter): - """Non-standard suffixes map to weather_statement.""" - assert adapter._derive_category("Special Weather Statement") == "weather_statement" - assert adapter._derive_category("Short Term Forecast") == "weather_statement" - assert adapter._derive_category("Hazardous Weather Outlook") == "weather_statement" - - -def test_derive_category_case_insensitive(adapter): - """Category derivation is case-insensitive.""" - assert adapter._derive_category("TORNADO WARNING") == "weather_warning" - assert adapter._derive_category("winter storm watch") == "weather_watch" - assert adapter._derive_category("Wind ADVISORY") == "weather_advisory" - - -# ============================================================ -# to_event TESTS -# ============================================================ - -def test_to_event_returns_event_instance(adapter): - """to_event returns an Event instance.""" - raw = { - "source": "nws", - "event_id": "urn:oid:2.49.0.1.840.0.abc123", - "event_type": "Tornado Warning", - "severity": "extreme", - "headline": "Tornado Warning issued for Ada County", - "description": "A tornado warning has been issued...", - "onset": time.time(), - "expires": time.time() + 3600, - "areas": ["IDZ016"], - "lat": 43.615, - "lon": -116.2023, - } - event = adapter.to_event(raw) - assert isinstance(event, Event) - - -def test_to_event_sets_source_nws(adapter): - """to_event sets source to 'nws'.""" - raw = { - "source": "nws", - "event_id": "test-id", - "event_type": "Wind Advisory", - "severity": "moderate", - "headline": "Wind Advisory", - } - event = adapter.to_event(raw) - assert event.source == "nws" - - -def test_to_event_derives_category_from_event_type(adapter): - """to_event uses _derive_category for category field.""" - raw = { - "event_id": "test", - "event_type": "Winter Storm Watch", - "severity": "severe", - "headline": "Winter Storm Watch", - } - event = adapter.to_event(raw) - assert event.category == "weather_watch" - - -def test_to_event_maps_severity(adapter): - """to_event maps NWS severity to 3-level system.""" - # Extreme -> immediate - raw = {"event_id": "1", "event_type": "Tornado Warning", "severity": "extreme", "headline": "test"} - assert adapter.to_event(raw).severity == "immediate" - - # Severe -> priority - raw = {"event_id": "2", "event_type": "Tornado Warning", "severity": "severe", "headline": "test"} - assert adapter.to_event(raw).severity == "priority" - - # Moderate -> routine - raw = {"event_id": "3", "event_type": "Wind Advisory", "severity": "moderate", "headline": "test"} - assert adapter.to_event(raw).severity == "routine" - - -def test_to_event_sets_title_from_headline(adapter): - """to_event uses headline as title.""" - raw = { - "event_id": "test", - "event_type": "Heat Advisory", - "severity": "moderate", - "headline": "Heat Advisory issued for Magic Valley", - } - event = adapter.to_event(raw) - assert event.title == "Heat Advisory issued for Magic Valley" - - -def test_to_event_sets_body_from_description(adapter): - """to_event uses description as body.""" - raw = { - "event_id": "test", - "event_type": "Heat Advisory", - "severity": "moderate", - "headline": "Heat Advisory", - "description": "Dangerously hot conditions expected...", - } - event = adapter.to_event(raw) - assert event.body == "Dangerously hot conditions expected..." - - -def test_to_event_sets_effective_and_expires(adapter): - """to_event sets effective and expires from onset/expires.""" - onset = time.time() - expires = time.time() + 7200 - raw = { - "event_id": "test", - "event_type": "Wind Advisory", - "severity": "moderate", - "headline": "Wind Advisory", - "onset": onset, - "expires": expires, - } - event = adapter.to_event(raw) - assert event.effective == onset - assert event.expires == expires - - -def test_to_event_sets_lat_lon(adapter): - """to_event sets lat/lon from raw event.""" - raw = { - "event_id": "test", - "event_type": "Tornado Warning", - "severity": "extreme", - "headline": "Tornado Warning", - "lat": 43.615, - "lon": -116.2023, - } - event = adapter.to_event(raw) - assert event.lat == 43.615 - assert event.lon == -116.2023 - - -def test_to_event_sets_nws_zones(adapter): - """to_event sets nws_zones from areas.""" - raw = { - "event_id": "test", - "event_type": "Red Flag Warning", - "severity": "severe", - "headline": "Red Flag Warning", - "areas": ["IDZ016", "IDZ030", "IDZ031"], - } - event = adapter.to_event(raw) - assert event.nws_zones == ["IDZ016", "IDZ030", "IDZ031"] - - -def test_to_event_sets_group_key_from_event_id(adapter): - """to_event sets group_key to event_id for dedup.""" - raw = { - "event_id": "urn:oid:2.49.0.1.840.0.abc123", - "event_type": "Tornado Warning", - "severity": "extreme", - "headline": "Tornado Warning", - } - event = adapter.to_event(raw) - assert event.group_key == "urn:oid:2.49.0.1.840.0.abc123" - - -def test_to_event_warning_sets_inhibit_keys(adapter): - """Warnings set inhibit_keys for corresponding Watch/Advisory.""" - raw = { - "event_id": "test", - "event_type": "Winter Storm Warning", - "severity": "severe", - "headline": "Winter Storm Warning", - } - event = adapter.to_event(raw) - assert "nws:Winter Storm Watch" in event.inhibit_keys - assert "nws:Winter Storm Advisory" in event.inhibit_keys - - -def test_to_event_watch_no_inhibit_keys(adapter): - """Watches do not set inhibit_keys.""" - raw = { - "event_id": "test", - "event_type": "Tornado Watch", - "severity": "moderate", - "headline": "Tornado Watch", - } - event = adapter.to_event(raw) - assert event.inhibit_keys == [] - - -def test_to_event_preserves_raw_in_data(adapter): - """to_event preserves raw event dict in data field.""" - raw = { - "event_id": "test-123", - "event_type": "Wind Advisory", - "severity": "moderate", - "headline": "Wind Advisory", - "custom_field": "custom_value", - } - event = adapter.to_event(raw) - assert event.data == raw - assert event.data["custom_field"] == "custom_value" - - -# ============================================================ -# INTEGRATION: _map_nws_severity -# ============================================================ - -def test_map_nws_severity_extreme_to_immediate(adapter): - """Extreme NWS severity maps to immediate.""" - assert adapter._map_nws_severity("extreme") == "immediate" - - -def test_map_nws_severity_severe_to_priority(adapter): - """Severe NWS severity maps to priority.""" - assert adapter._map_nws_severity("severe") == "priority" - - -def test_map_nws_severity_moderate_to_routine(adapter): - """Moderate NWS severity maps to routine.""" - assert adapter._map_nws_severity("moderate") == "routine" - - -def test_map_nws_severity_minor_to_routine(adapter): - """Minor NWS severity maps to routine.""" - assert adapter._map_nws_severity("minor") == "routine" diff --git a/tests/test_channel_rendering.py b/tests/test_channel_rendering.py deleted file mode 100644 index c69c91a..0000000 --- a/tests/test_channel_rendering.py +++ /dev/null @@ -1,214 +0,0 @@ -"""Tests for channel-renderer integration (Phase 2.5b).""" - -import asyncio -import time -from unittest.mock import MagicMock, AsyncMock, patch - -import pytest - -from meshai.notifications.events import NotificationPayload -from meshai.notifications.channels import ( - MeshBroadcastChannel, - MeshDMChannel, - EmailChannel, - WebhookChannel, -) - - -# ============================================================ -# MESH CHANNEL RENDERING TESTS -# ============================================================ - -def test_mesh_channel_uses_mesh_renderer(): - """MeshBroadcastChannel renders long messages to multiple chunks.""" - mock_connector = MagicMock() - - channel = MeshBroadcastChannel( - connector=mock_connector, - channel_index=0, - ) - - # Build a long message that will require chunking - long_message = "This is a very long alert message that exceeds the character limit. " * 5 - - payload = NotificationPayload( - message=long_message, - category="weather_warning", - severity="priority", - timestamp=time.time(), - event_type="weather_warning", - ) - - asyncio.run(channel.deliver(payload, None)) - - # Should have called send_message multiple times (once per chunk) - assert mock_connector.send_message.call_count >= 2 - - # Each call's text should be <= 200 chars - for call in mock_connector.send_message.call_args_list: - text = call.kwargs.get("text", call.args[0] if call.args else "") - assert len(text) <= 200 - - -def test_mesh_channel_uses_payload_message_directly_when_chunk_metadata_set(): - """Pre-chunked payloads (from digest) skip re-rendering.""" - mock_connector = MagicMock() - - channel = MeshBroadcastChannel( - connector=mock_connector, - channel_index=0, - ) - - # Payload with chunk metadata set (from digest scheduler) - payload = NotificationPayload( - message="pre-chunked text", - category="digest", - severity="routine", - timestamp=time.time(), - chunk_index=1, - chunk_total=3, - ) - - asyncio.run(channel.deliver(payload, None)) - - # Should have called send_message exactly once - assert mock_connector.send_message.call_count == 1 - # Should use the message directly - call = mock_connector.send_message.call_args - text = call.kwargs.get("text", call.args[0] if call.args else "") - assert text == "pre-chunked text" - - -def test_mesh_dm_channel_uses_mesh_renderer(): - """MeshDMChannel renders long messages to chunks for each recipient.""" - mock_connector = MagicMock() - - channel = MeshDMChannel( - connector=mock_connector, - node_ids=["!node1", "!node2"], - ) - - long_message = "This is a long DM message that should be chunked. " * 4 - - payload = NotificationPayload( - message=long_message, - category="test", - severity="routine", - timestamp=time.time(), - ) - - asyncio.run(channel.deliver(payload, None)) - - # Should have called send_message multiple times - # (chunks * nodes) - assert mock_connector.send_message.call_count >= 2 - - -def test_mesh_dm_channel_uses_payload_message_directly_when_chunk_metadata_set(): - """Pre-chunked DM payloads skip re-rendering.""" - mock_connector = MagicMock() - - channel = MeshDMChannel( - connector=mock_connector, - node_ids=["!node1"], - ) - - payload = NotificationPayload( - message="pre-chunked DM", - category="digest", - severity="routine", - timestamp=time.time(), - chunk_index=2, - chunk_total=5, - ) - - asyncio.run(channel.deliver(payload, None)) - - # Should use message directly, once per node - assert mock_connector.send_message.call_count == 1 - call = mock_connector.send_message.call_args - text = call.kwargs.get("text", call.args[0] if call.args else "") - assert text == "pre-chunked DM" - - -# ============================================================ -# EMAIL CHANNEL RENDERING TESTS -# ============================================================ - -def test_email_channel_uses_email_renderer(): - """EmailChannel uses renderer for subject and body.""" - channel = EmailChannel( - smtp_host="localhost", - smtp_port=25, - smtp_user="", - smtp_password="", - smtp_tls=False, - from_address="test@example.com", - recipients=["user@example.com"], - ) - - payload = NotificationPayload( - message="Test alert message", - category="weather_warning", - severity="immediate", - timestamp=time.time(), - event_type="weather_warning", - ) - - # Mock the _send_email method - with patch.object(channel, "_send_email") as mock_send: - asyncio.run(channel.deliver(payload, None)) - - # Should have been called with renderer output - mock_send.assert_called_once() - call_args = mock_send.call_args - subject = call_args.args[0] - body = call_args.args[1] - - # Renderer format checks - assert "[MeshAI]" in subject - assert "IMMEDIATE" in subject - assert "Test alert message" in body - assert "Severity:" in body - - -# ============================================================ -# WEBHOOK CHANNEL RENDERING TESTS -# ============================================================ - -def test_webhook_channel_uses_webhook_renderer(): - """WebhookChannel uses renderer for JSON payload.""" - channel = WebhookChannel( - url="https://example.com/webhook", - headers={}, - ) - - payload = NotificationPayload( - message="Test webhook message", - category="test", - severity="priority", - timestamp=time.time(), - event_type="battery_warning", - ) - - # Mock httpx - with patch("meshai.notifications.channels.httpx.AsyncClient") as mock_client_class: - mock_client = MagicMock() - mock_response = MagicMock() - mock_response.status_code = 200 - mock_client.post = AsyncMock(return_value=mock_response) - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client - - asyncio.run(channel.deliver(payload, None)) - - # Check the POST was called - mock_client.post.assert_called_once() - call_kwargs = mock_client.post.call_args.kwargs - - # Should have JSON payload with schema_version - json_payload = call_kwargs.get("json", {}) - assert "schema_version" in json_payload - assert json_payload["schema_version"] == "1.0" - assert json_payload["message"] == "Test webhook message" diff --git a/tests/test_pipeline_digest.py b/tests/test_pipeline_digest.py index 0ec26b5..4c9832c 100644 --- a/tests/test_pipeline_digest.py +++ b/tests/test_pipeline_digest.py @@ -1,15 +1,15 @@ -"""Tests for Phase 2.4 DigestAccumulator with LLM summaries. +"""Tests for Phase 2.3a DigestAccumulator. -Updated from Phase 2.3a to reflect new behavior: -- No active/resolved tracking (just event log) -- LLM-summarized output per toggle -- render_digest is async +27 tests covering: +- Accumulator active/since_last behavior (6 tests) +- Renderer output (8 tests) +- Mesh chunks (7 tests) +- Include toggles (3 tests) +- Pipeline integration (3 tests) """ -import asyncio -import inspect import time -from unittest.mock import MagicMock, AsyncMock, patch +from unittest.mock import MagicMock, patch import pytest @@ -24,45 +24,11 @@ from meshai.config import Config # ============================================================ -# MOCK LLM BACKEND +# ACCUMULATOR ACTIVE/SINCE_LAST TESTS # ============================================================ -class MockLLMBackend: - """Mock LLM backend for testing.""" - - def __init__(self, response: str = "Mock summary of events."): - self.response = response - self.calls = [] - - async def generate(self, messages, system_prompt, max_tokens=200): - self.calls.append({ - "messages": messages, - "system_prompt": system_prompt, - "max_tokens": max_tokens, - }) - return self.response - - -class FailingLLMBackend: - """Mock LLM that raises exceptions.""" - - async def generate(self, messages, system_prompt, max_tokens=200): - raise RuntimeError("LLM unavailable") - - -def _make_mock_backend(): - """Create a standard mock LLM backend for tests.""" - mock = MagicMock() - mock.generate = AsyncMock(return_value="stub summary") - return mock - - -# ============================================================ -# ACCUMULATOR EVENT LOGGING TESTS -# ============================================================ - -def test_enqueue_logs_event(): - """Enqueue adds event to the log.""" +def test_enqueue_adds_to_active(): + """Enqueue one routine Event with no expires → active_count == 1.""" acc = DigestAccumulator() event = make_event( source="test", @@ -71,404 +37,294 @@ def test_enqueue_logs_event(): title="Wind Advisory", ) acc.enqueue(event) - assert acc.event_count() == 1 + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 -def test_enqueue_multiple_events_same_toggle(): - """Multiple events for same toggle all logged.""" +def test_enqueue_same_id_updates_in_place(): + """Enqueue same id twice → still 1 active, title updated.""" acc = DigestAccumulator() - for i in range(3): + event1 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="abc", + title="initial", + ) + event2 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="abc", + title="updated", + ) + acc.enqueue(event1) + acc.enqueue(event2) + assert acc.active_count() == 1 + # Check the held event's title + toggle = "weather" + events = acc._active.get(toggle, []) + assert len(events) == 1 + assert events[0].title == "updated" + + +def test_two_different_ids_both_active(): + """Two different routine events → both active.""" + acc = DigestAccumulator() + event1 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="ev1", + title="Event 1", + ) + event2 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="ev2", + title="Event 2", + ) + acc.enqueue(event1) + acc.enqueue(event2) + assert acc.active_count() == 2 + + +def test_resolution_marker_in_title_moves_active_to_since_last(): + """Resolution marker in title moves matching active to since_last.""" + acc = DigestAccumulator() + event1 = make_event( + source="test", + category="wildfire_proximity", + severity="priority", + group_key="fire:42", + title="Snake River Fire", + ) + acc.enqueue(event1) + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 + + event2 = make_event( + source="test", + category="wildfire_proximity", + severity="priority", + group_key="fire:42", + title="Snake River Fire ended", + ) + acc.enqueue(event2) + assert acc.active_count() == 0 + assert acc.since_last_count() == 1 + + +def test_expired_event_via_tick_moves_to_since_last(): + """tick() moves expired events from active to since_last.""" + acc = DigestAccumulator() + base_time = 1000000.0 + + # Monkeypatch _now to control time + acc._now = lambda: base_time + + event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Temporary Warning", + expires=base_time + 60, # expires in 60 seconds + ) + acc.enqueue(event) + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 + + # Tick at base_time + 30 → still active + moved = acc.tick(now=base_time + 30) + assert moved == 0 + assert acc.active_count() == 1 + + # Tick at base_time + 120 → expired, moved to since_last + moved = acc.tick(now=base_time + 120) + assert moved == 1 + assert acc.active_count() == 0 + assert acc.since_last_count() == 1 + + +def test_render_digest_clears_since_last_but_keeps_active(): + """render_digest() clears since_last but preserves active.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add an active event + active_event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Ongoing Storm", + ) + acc.enqueue(active_event) + + # Add an event that becomes since_last via resolution marker + resolved_event = make_event( + source="test", + category="road_closure", + severity="routine", + group_key="roads:99", + title="US-93 reopened at MP 47", + ) + acc.enqueue(resolved_event) + + # Now we should have 1 active, 1 since_last + assert acc.active_count() == 1 + assert acc.since_last_count() == 1 + + # Render digest + digest = acc.render_digest(now=base_time) + assert len(digest.active) > 0 + assert len(digest.since_last) > 0 + + # After render: active preserved, since_last cleared + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 + + # Second render has only active + digest2 = acc.render_digest(now=base_time + 10) + assert len(digest2.active) > 0 + assert len(digest2.since_last) == 0 + + +# ============================================================ +# RENDERER TESTS +# ============================================================ + +def test_render_full_lists_active_and_since_last_with_labels(): + """Full render includes ACTIVE NOW, SINCE LAST DIGEST, toggle labels.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Weather event (active) + weather_event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Wind Advisory until 21:00", + ) + acc.enqueue(weather_event) + + # Roads event with resolution marker → since_last + roads_event = make_event( + source="test", + category="road_closure", + severity="routine", + title="US-93 reopened at MP 47", + ) + acc.enqueue(roads_event) + + digest = acc.render_digest(now=base_time) + + assert "ACTIVE NOW:" in digest.full + assert "[Weather]" in digest.full + assert "Wind Advisory" in digest.full + assert "SINCE LAST DIGEST:" in digest.full + assert "[Roads]" in digest.full + assert "US-93" in digest.full + + +def test_render_mesh_compact_under_char_limit(): + """Each mesh chunk is <= 200 chars.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add 10 events across 4 toggles + categories = [ + ("weather_warning", "Weather Event"), + ("weather_warning", "Weather Event 2"), + ("weather_warning", "Weather Event 3"), + ("wildfire_proximity", "Fire Event"), + ("wildfire_proximity", "Fire Event 2"), + ("battery_warning", "Mesh Event"), + ("battery_warning", "Mesh Event 2"), + ("battery_warning", "Mesh Event 3"), + ("road_closure", "Road Event"), + ("road_closure", "Road Event 2"), + ] + for i, (cat, title) in enumerate(categories): event = make_event( source="test", - category="weather_warning", + category=cat, severity="routine", id=f"ev{i}", - title=f"Event {i}", + title=title, ) acc.enqueue(event) - assert acc.event_count() == 3 - assert acc.event_count("weather") == 3 + digest = acc.render_digest(now=base_time) -def test_enqueue_multiple_toggles(): - """Events across multiple toggles all logged.""" - acc = DigestAccumulator() - acc.enqueue(make_event( - source="test", - category="weather_warning", - severity="routine", - title="Weather", - )) - acc.enqueue(make_event( - source="test", - category="wildfire_proximity", - severity="priority", - title="Fire", - )) - acc.enqueue(make_event( - source="test", - category="battery_warning", - severity="immediate", - title="Mesh", - )) - assert acc.event_count() == 3 - assert acc.event_count("weather") == 1 - assert acc.event_count("fire") == 1 - assert acc.event_count("mesh_health") == 1 - - -def test_enqueue_skips_excluded_toggles(): - """Events for non-included toggles are dropped.""" - acc = DigestAccumulator(include_toggles=["weather"]) - acc.enqueue(make_event( - source="test", - category="weather_warning", - severity="routine", - title="Weather", - )) - acc.enqueue(make_event( - source="test", - category="wildfire_proximity", - severity="routine", - title="Fire", - )) - assert acc.event_count() == 1 - assert acc.event_count("weather") == 1 - assert acc.event_count("fire") == 0 - - -def test_tick_is_noop(): - """tick() does nothing in Phase 2.4+.""" - acc = DigestAccumulator() - acc.enqueue(make_event( - source="test", - category="weather_warning", - severity="routine", - title="Event", - )) - result = acc.tick() - assert result == 0 - assert acc.event_count() == 1 - - -# ============================================================ -# RENDER DIGEST TESTS -# ============================================================ - -def test_render_digest_is_async(): - """render_digest is an async coroutine function.""" - assert inspect.iscoroutinefunction(DigestAccumulator.render_digest) - - -def test_render_digest_clears_event_log(): - """render_digest clears the event log after rendering.""" - mock_llm = MockLLMBackend() - acc = DigestAccumulator(llm_backend=mock_llm) - acc.enqueue(make_event( - source="test", - category="weather_warning", - severity="routine", - title="Event", - )) - assert acc.event_count() == 1 - - asyncio.run(acc.render_digest()) - assert acc.event_count() == 0 - - -def test_render_digest_sets_last_digest_at(): - """render_digest updates last_digest_at timestamp.""" - mock_llm = MockLLMBackend() - acc = DigestAccumulator(llm_backend=mock_llm) - acc.enqueue(make_event( - source="test", - category="weather_warning", - severity="routine", - title="Event", - )) - - now = 1234567890.0 - asyncio.run(acc.render_digest(now=now)) - assert acc.last_digest_at() == now - - -def test_render_digest_empty_shows_no_alerts(): - """Empty accumulator produces 'No alerts' message.""" - acc = DigestAccumulator() - digest = asyncio.run(acc.render_digest()) - - assert "No alerts since last digest" in digest.full - assert "No alerts since last digest" in digest.mesh_chunks[0] - - -# ============================================================ -# LLM INTEGRATION TESTS -# ============================================================ - -def test_digest_calls_llm_once_per_non_empty_toggle(): - """LLM is called once per toggle that has events.""" - mock_llm = MockLLMBackend(response="Summary for toggle.") - acc = DigestAccumulator(llm_backend=mock_llm) - - # Add events to 3 different toggles - acc.enqueue(make_event(source="test", category="weather_warning", - severity="routine", title="Weather")) - acc.enqueue(make_event(source="test", category="wildfire_proximity", - severity="routine", title="Fire")) - acc.enqueue(make_event(source="test", category="battery_warning", - severity="routine", title="Mesh")) - - asyncio.run(acc.render_digest()) - - assert len(mock_llm.calls) == 3 - - -def test_digest_line_uses_llm_output(): - """Digest lines contain the LLM's summary output.""" - mock_llm = MockLLMBackend(response="Severe storms moving through the area.") - acc = DigestAccumulator(llm_backend=mock_llm) - - acc.enqueue(make_event( - source="test", - category="weather_warning", - severity="priority", - title="Storm Warning", - )) - - digest = asyncio.run(acc.render_digest()) - - assert "[Weather] Severe storms moving through the area." in digest.full - assert "Severe storms moving through the area" in digest.mesh_compact - - -def test_digest_falls_back_to_count_when_llm_raises(): - """When LLM fails, fallback to count-based summary.""" - failing_llm = FailingLLMBackend() - acc = DigestAccumulator(llm_backend=failing_llm) - - acc.enqueue(make_event(source="test", category="battery_warning", - severity="routine", title="Event 1")) - acc.enqueue(make_event(source="test", category="battery_warning", - severity="routine", title="Event 2")) - acc.enqueue(make_event(source="test", category="battery_warning", - severity="routine", title="Event 3")) - - digest = asyncio.run(acc.render_digest()) - - assert "[Mesh]" in digest.full - assert "3 event(s)" in digest.full - assert "LLM unavailable" in digest.full - - -def test_digest_falls_back_when_no_llm(): - """When no LLM backend, fallback to count-based summary.""" - acc = DigestAccumulator(llm_backend=None) - - acc.enqueue(make_event(source="test", category="weather_warning", - severity="routine", title="Event")) - - digest = asyncio.run(acc.render_digest()) - - assert "[Weather]" in digest.full - assert "1 event(s)" in digest.full - - -def test_digest_input_orders_by_severity_then_time(): - """LLM input lists events by severity (immediate first) then timestamp.""" - mock_llm = MockLLMBackend() - acc = DigestAccumulator(llm_backend=mock_llm) - - # Enqueue in wrong order: routine, then immediate, then priority - acc.enqueue(make_event(source="test", category="weather_warning", - severity="routine", title="Routine Event", - timestamp=10.0)) - acc.enqueue(make_event(source="test", category="weather_warning", - severity="immediate", title="Immediate Event", - timestamp=20.0)) - acc.enqueue(make_event(source="test", category="weather_warning", - severity="priority", title="Priority Event", - timestamp=30.0)) - - asyncio.run(acc.render_digest()) - - # Check the LLM input - assert len(mock_llm.calls) == 1 - user_content = mock_llm.calls[0]["messages"][0]["content"] - - # Find positions of each event in the input - immediate_pos = user_content.find("IMMEDIATE") - priority_pos = user_content.find("PRIORITY") - routine_pos = user_content.find("ROUTINE") - - assert immediate_pos < priority_pos, "Immediate should appear before priority" - assert priority_pos < routine_pos, "Priority should appear before routine" - - -# ============================================================ -# MESH CHUNKS TESTS -# ============================================================ - -def test_mesh_chunks_single_chunk_when_short(): - """Single short summary produces one chunk without counter.""" - mock_llm = MockLLMBackend(response="Brief summary.") - acc = DigestAccumulator(llm_backend=mock_llm) - - acc.enqueue(make_event(source="test", category="weather_warning", - severity="routine", title="Event")) - - digest = asyncio.run(acc.render_digest()) - - assert len(digest.mesh_chunks) == 1 + # All chunks should be <= 200 chars + assert all(len(c) <= 200 for c in digest.mesh_chunks) + assert len(digest.mesh_chunks) >= 1 + # Should have proper structure assert digest.mesh_chunks[0].startswith("DIGEST ") - assert "(1/" not in digest.mesh_chunks[0] -def test_mesh_chunks_under_char_limit(): - """Each mesh chunk is <= 200 characters.""" - mock_llm = MockLLMBackend(response="Summary of events for this category.") - acc = DigestAccumulator(llm_backend=mock_llm) - - # Add events to multiple toggles - for cat in ["weather_warning", "wildfire_proximity", "battery_warning", - "road_closure", "avalanche_warning"]: - acc.enqueue(make_event(source="test", category=cat, - severity="routine", title="Event")) - - digest = asyncio.run(acc.render_digest()) - - for chunk in digest.mesh_chunks: - assert len(chunk) <= 210, f"Chunk exceeds limit: {len(chunk)} chars" - - -def test_mesh_chunks_splits_when_many_toggles(): - """Many toggle summaries split into multiple chunks.""" - # Longer summaries to force splitting - mock_llm = MockLLMBackend( - response="A fairly detailed summary of the events in this category." - ) - acc = DigestAccumulator(llm_backend=mock_llm, mesh_char_limit=150) - - # Add events to multiple toggles - for cat in ["weather_warning", "wildfire_proximity", "battery_warning", - "road_closure", "avalanche_warning"]: - acc.enqueue(make_event(source="test", category=cat, - severity="routine", title="Event")) - - digest = asyncio.run(acc.render_digest()) - - assert len(digest.mesh_chunks) >= 2 - - # Check chunk counters - total = len(digest.mesh_chunks) - for i, chunk in enumerate(digest.mesh_chunks): - assert f"({i+1}/{total})" in chunk - - -def test_mesh_chunks_empty_is_single_chunk(): - """Empty digest produces single chunk.""" +def test_render_mesh_compact_empty_shows_no_alerts_message(): + """Empty accumulator renders 'No alerts since last digest' in mesh_compact.""" acc = DigestAccumulator() - digest = asyncio.run(acc.render_digest()) + base_time = 1000000.0 + acc._now = lambda: base_time - assert len(digest.mesh_chunks) == 1 - assert "No alerts since last digest" in digest.mesh_chunks[0] - assert "(1/" not in digest.mesh_chunks[0] + digest = acc.render_digest(now=base_time) + assert "No alerts since last digest" in digest.mesh_compact + assert "DIGEST " in digest.mesh_compact + assert "All quiet" not in digest.mesh_compact -def test_mesh_compact_joins_chunks(): - """mesh_compact joins chunks with separator when multiple.""" - mock_llm = MockLLMBackend(response="Summary of events.") - acc = DigestAccumulator(llm_backend=mock_llm, mesh_char_limit=100) - - for cat in ["weather_warning", "wildfire_proximity", "battery_warning", - "road_closure"]: - acc.enqueue(make_event(source="test", category=cat, - severity="routine", title="Event")) - - digest = asyncio.run(acc.render_digest()) - - if len(digest.mesh_chunks) > 1: - expected = "\n---\n".join(digest.mesh_chunks) - assert digest.mesh_compact == expected - else: - assert digest.mesh_compact == digest.mesh_chunks[0] - - -# ============================================================ -# INCLUDE TOGGLES TESTS -# ============================================================ - -def test_rf_propagation_excluded_by_default(): - """rf_propagation toggle is excluded by default.""" +def test_render_full_handles_empty_accumulator(): + """Empty accumulator → is_empty() True, shows 'No alerts since last digest'.""" acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time - # Find an rf_propagation category - rf_category = None - for cat_id, cat_info in ALERT_CATEGORIES.items(): - if cat_info.get("toggle") == "rf_propagation": - rf_category = cat_id - break - - if rf_category: - acc.enqueue(make_event(source="test", category=rf_category, - severity="routine", title="RF Event")) - assert acc.event_count() == 0 + digest = acc.render_digest(now=base_time) + assert digest.is_empty() is True + assert "No alerts since last digest" in digest.full + assert "ACTIVE NOW" not in digest.full + assert "ACTIVE NOW: nothing" not in digest.full -def test_include_toggles_overrides_default(): - """include_toggles parameter controls which toggles are tracked.""" - mock_llm = MockLLMBackend() +def test_render_orders_toggles_by_priority(): + """Toggles appear in TOGGLE_ORDER sequence in full output.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time - # Find an rf_propagation category - rf_category = None - for cat_id, cat_info in ALERT_CATEGORIES.items(): - if cat_info.get("toggle") == "rf_propagation": - rf_category = cat_id - break - - acc = DigestAccumulator( - llm_backend=mock_llm, - include_toggles=["rf_propagation", "weather"] + # Add one event each for weather, mesh_health, and fire + # (intentionally out of order) + mesh_event = make_event( + source="test", + category="battery_warning", # maps to mesh_health toggle + severity="routine", + title="Mesh battery low", ) + fire_event = make_event( + source="test", + category="wildfire_proximity", + severity="routine", + title="Fire nearby", + ) + weather_event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Storm coming", + ) + acc.enqueue(mesh_event) + acc.enqueue(fire_event) + acc.enqueue(weather_event) - if rf_category: - acc.enqueue(make_event(source="test", category=rf_category, - severity="routine", title="RF Event")) - acc.enqueue(make_event(source="test", category="wildfire_proximity", - severity="routine", title="Fire Event")) + digest = acc.render_digest(now=base_time) - # RF should be kept (in include list), fire should be dropped - expected_count = 1 if rf_category else 0 - assert acc.event_count() == expected_count - - -def test_include_toggles_unknown_name_accepted(): - """Unknown toggle names don't crash.""" - acc = DigestAccumulator(include_toggles=["weather", "future_toggle"]) - acc.enqueue(make_event(source="test", category="weather_warning", - severity="routine", title="Event")) - assert acc.event_count() == 1 - - -# ============================================================ -# TOGGLE ORDER TESTS -# ============================================================ - -def test_digest_orders_toggles_correctly(): - """Toggle lines appear in TOGGLE_ORDER sequence.""" - mock_llm = MockLLMBackend(response="Summary.") - acc = DigestAccumulator(llm_backend=mock_llm) - - # Add events in wrong order - acc.enqueue(make_event(source="test", category="battery_warning", - severity="routine", title="Mesh")) - acc.enqueue(make_event(source="test", category="wildfire_proximity", - severity="routine", title="Fire")) - acc.enqueue(make_event(source="test", category="weather_warning", - severity="routine", title="Weather")) - - digest = asyncio.run(acc.render_digest()) - - # Check order in full output: weather, fire, ..., mesh_health + # In TOGGLE_ORDER: weather, fire, ..., mesh_health weather_pos = digest.full.find("[Weather]") fire_pos = digest.full.find("[Fire]") mesh_pos = digest.full.find("[Mesh]") @@ -477,21 +333,453 @@ def test_digest_orders_toggles_correctly(): assert fire_pos < mesh_pos, "Fire should appear before Mesh" -# ============================================================ -# PIPELINE INTEGRATION TESTS -# ============================================================ - -def test_pipeline_routes_event_to_accumulator(): - """Events via bus.emit end up in DigestAccumulator.""" - config = Config() - bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = \ - build_pipeline_components(config, _make_mock_backend()) +def test_format_event_line_does_not_append_expires_hint(): + """_format_event_line() does NOT append '(until HH:MM)' anymore.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time event = make_event( source="test", category="weather_warning", severity="routine", - title="Test event", + title="Severe Thunderstorm Warning", + expires=base_time + 3600, # 1 hour in future + ) + + line = acc._format_event_line(event) + assert "until " not in line + assert "(" not in line + + +def test_mesh_compact_shows_one_line_per_toggle(): + """Each toggle gets exactly one line, with (+N) for overflow.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add 2 weather events, 1 fire event, 1 mesh event + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id="w1", + title="Weather Event 1", + )) + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id="w2", + title="Weather Event 2", + )) + acc.enqueue(make_event( + source="test", + category="wildfire_proximity", + severity="routine", + id="f1", + title="Fire Event", + )) + acc.enqueue(make_event( + source="test", + category="battery_warning", + severity="routine", + id="m1", + title="Mesh Event", + )) + + digest = acc.render_digest(now=base_time) + + # Count occurrences of each toggle label + weather_count = digest.mesh_compact.count("[Weather]") + fire_count = digest.mesh_compact.count("[Fire]") + mesh_count = digest.mesh_compact.count("[Mesh]") + + assert weather_count == 1, "Should have exactly one [Weather] line" + assert fire_count == 1, "Should have exactly one [Fire] line" + assert mesh_count == 1, "Should have exactly one [Mesh] line" + + # Weather line should have (+1) since there are 2 weather events + weather_line = [l for l in digest.mesh_compact.split("\n") if "[Weather]" in l][0] + assert "(+1)" in weather_line + + +def test_mesh_compact_active_and_resolved_sections(): + """mesh_compact has ACTIVE NOW and RESOLVED sections when both present.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add 1 active weather event + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Storm Warning", + )) + + # Add 1 resolution event for roads (contains "reopened") + acc.enqueue(make_event( + source="test", + category="road_closure", + severity="routine", + title="US-93 reopened at MP 47", + )) + + digest = acc.render_digest(now=base_time) + + # Check section markers in the joined compact string + assert "ACTIVE NOW" in digest.mesh_compact + assert "RESOLVED" in digest.mesh_compact + + # ACTIVE NOW should appear before RESOLVED + active_pos = digest.mesh_compact.find("ACTIVE NOW") + resolved_pos = digest.mesh_compact.find("RESOLVED") + assert active_pos < resolved_pos, "ACTIVE NOW should appear before RESOLVED" + + +def test_mesh_compact_line_truncates_long_headline(): + """Long headlines are truncated in mesh_compact.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Create a 200-char summary + long_summary = "A" * 200 + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Weather Event", + summary=long_summary, + )) + + digest = acc.render_digest(now=base_time) + + # The [Weather] line should be shorter than the raw summary + weather_line = [l for l in digest.mesh_compact.split("\n") if "[Weather]" in l][0] + assert len(weather_line) < len(long_summary) + + +# ============================================================ +# MESH CHUNKS TESTS +# ============================================================ + +def test_mesh_chunks_single_chunk_when_short(): + """Single short event produces one chunk with no counter.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Short event", + summary="Brief summary", + )) + + digest = acc.render_digest(now=base_time) + + assert len(digest.mesh_chunks) == 1 + assert digest.mesh_chunks[0].startswith("DIGEST ") + assert "(1/" not in digest.mesh_chunks[0] # No chunk counter when single + assert digest.mesh_compact == digest.mesh_chunks[0] + + +def test_mesh_chunks_splits_when_overflow(): + """Many events with long summaries produce multiple chunks.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add events with long summaries across different toggles + toggles = [ + ("weather_warning", "Severe storm warning for Magic Valley area"), + ("wildfire_proximity", "Fire proximity alert 8mi NE of position"), + ("battery_warning", "Battery critical on node BLD-MTN system"), + ("road_closure", "Road closure US-93 at milepost forty seven"), + ("avalanche_warning", "Avalanche danger high in backcountry area"), + ] + for i, (cat, summary) in enumerate(toggles): + acc.enqueue(make_event( + source="test", + category=cat, + severity="routine", + id=f"ev{i}", + title=f"Event {i}", + summary=summary, + )) + + digest = acc.render_digest(now=base_time) + + # Should have multiple chunks + assert len(digest.mesh_chunks) >= 2 + + # Each chunk should have proper header with counter + total = len(digest.mesh_chunks) + for i, chunk in enumerate(digest.mesh_chunks): + assert chunk.startswith("DIGEST ") + assert f"({i+1}/{total})" in chunk + + # All chunks should be within limit + assert all(len(c) <= 200 for c in digest.mesh_chunks) + + +def test_mesh_chunks_does_not_split_within_a_line(): + """A toggle line appears intact in exactly one chunk.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add event with specific summary we can search for + target_summary = "Mesh node BLD-MTN battery at critical level" + acc.enqueue(make_event( + source="test", + category="battery_warning", + severity="routine", + title="Battery Alert", + summary=target_summary, + )) + # Add more events to possibly force chunking + for i in range(5): + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id=f"w{i}", + title=f"Weather {i}", + summary=f"Weather event description number {i} for testing", + )) + + digest = acc.render_digest(now=base_time) + + # Find chunks containing [Mesh] + mesh_chunks = [c for c in digest.mesh_chunks if "[Mesh]" in c] + assert len(mesh_chunks) == 1, "Mesh toggle should appear in exactly one chunk" + + # The summary text should be in that chunk (possibly truncated but not split) + mesh_chunk = mesh_chunks[0] + assert "[Mesh]" in mesh_chunk + + +def test_mesh_chunks_section_header_continuation(): + """Section headers spanning chunks get '(cont)' suffix.""" + acc = DigestAccumulator(mesh_char_limit=150) # Smaller limit to force splits + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add many events to force ACTIVE NOW to span chunks + for i in range(8): + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id=f"w{i}", + title=f"Weather Event {i}", + summary=f"Weather warning number {i} for the area", + )) + + digest = acc.render_digest(now=base_time) + + if len(digest.mesh_chunks) >= 2: + # Check if any non-first chunk has continuation header + for i, chunk in enumerate(digest.mesh_chunks[1:], start=2): + if "[Weather]" in chunk or any(f"[{t}]" in chunk for t in ["Fire", "Mesh", "Roads"]): + # This chunk has toggle lines, check for section header + if "ACTIVE NOW" in chunk: + assert "ACTIVE NOW (cont)" in chunk, f"Chunk {i} should have (cont) suffix" + + +def test_mesh_chunks_empty_digest_is_single_chunk(): + """Empty digest produces single chunk with no counter.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + digest = acc.render_digest(now=base_time) + + assert len(digest.mesh_chunks) == 1 + assert "No alerts since last digest" in digest.mesh_chunks[0] + assert "(1/" not in digest.mesh_chunks[0] + + +def test_mesh_compact_string_is_joined_chunks(): + """mesh_compact is chunks joined with separator when multiple chunks.""" + acc = DigestAccumulator(mesh_char_limit=120) # Small limit to force multiple chunks + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add events to force multiple chunks + for i in range(6): + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id=f"w{i}", + title=f"Event {i}", + summary=f"Summary for weather event number {i}", + )) + + digest = acc.render_digest(now=base_time) + + if len(digest.mesh_chunks) > 1: + expected = "\n---\n".join(digest.mesh_chunks) + assert digest.mesh_compact == expected + else: + assert digest.mesh_compact == digest.mesh_chunks[0] + + +def test_include_toggles_unknown_name_does_not_crash(): + """Unknown toggle names in include_toggles are silently accepted.""" + acc = DigestAccumulator(include_toggles=["weather", "made_up_future_toggle"]) + base_time = 1000000.0 + acc._now = lambda: base_time + + # Weather should work + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Weather event", + )) + + # rf_propagation should be excluded (not in include list) + rf_category = None + for cat_id, cat_info in ALERT_CATEGORIES.items(): + if cat_info.get("toggle") == "rf_propagation": + rf_category = cat_id + break + + if rf_category: + acc.enqueue(make_event( + source="test", + category=rf_category, + severity="routine", + title="RF event", + )) + + # Weather kept, RF dropped + assert acc.active_count() == 1 + + # Should not raise + digest = acc.render_digest(now=base_time) + assert "[Weather]" in digest.full + + +# ============================================================ +# INCLUDE TOGGLES TESTS +# ============================================================ + +def test_rf_propagation_events_excluded_from_digest_by_default(): + """rf_propagation toggle is excluded by default (not in default include).""" + acc = DigestAccumulator() # default config + base_time = 1000000.0 + acc._now = lambda: base_time + + # Find a category that maps to rf_propagation + rf_category = None + for cat_id, cat_info in ALERT_CATEGORIES.items(): + if cat_info.get("toggle") == "rf_propagation": + rf_category = cat_id + break + + assert rf_category is not None, "Should find an rf_propagation category" + + event = make_event( + source="test", + category=rf_category, + severity="routine", + title="HF Blackout", + ) + acc.enqueue(event) + + # Should NOT be in active + assert acc.active_count() == 0 + + digest = acc.render_digest(now=base_time) + assert "[RF]" not in digest.full + + +def test_include_toggles_parameter_overrides_default(): + """include_toggles parameter controls which toggles are tracked.""" + # Only include rf_propagation and weather + acc = DigestAccumulator(include_toggles=["rf_propagation", "weather"]) + base_time = 1000000.0 + acc._now = lambda: base_time + + # Find rf_propagation category + rf_category = None + for cat_id, cat_info in ALERT_CATEGORIES.items(): + if cat_info.get("toggle") == "rf_propagation": + rf_category = cat_id + break + + # Enqueue rf_propagation event - should be kept + acc.enqueue(make_event( + source="test", + category=rf_category, + severity="routine", + title="HF Blackout", + )) + assert acc.active_count() == 1 + + # Enqueue fire event - should be dropped (fire not in include) + acc.enqueue(make_event( + source="test", + category="wildfire_proximity", + severity="routine", + title="Fire Alert", + )) + assert acc.active_count() == 1 # Still 1, fire was dropped + + digest = acc.render_digest(now=base_time) + assert "[RF]" in digest.full + assert "[Fire]" not in digest.full + + +def test_include_toggles_explicit_subset(): + """include_toggles with explicit subset only tracks those toggles.""" + acc = DigestAccumulator(include_toggles=["weather"]) + base_time = 1000000.0 + acc._now = lambda: base_time + + # Weather - included + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Weather event", + )) + + # Fire - not included + acc.enqueue(make_event( + source="test", + category="wildfire_proximity", + severity="routine", + title="Fire event", + )) + + # Tracking - not included (and may not have categories anyway) + # Just verify the count is only 1 + assert acc.active_count() == 1 + + +# ============================================================ +# PIPELINE INTEGRATION TESTS +# ============================================================ + +def test_pipeline_routes_routine_event_to_accumulator(): + """Routine event via bus.emit ends up in DigestAccumulator.""" + config = Config() + bus, inhibitor, grouper, severity_router, dispatcher, digest = \ + build_pipeline_components(config) + + event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Test routine event", ) # Flush through grouper @@ -499,25 +787,31 @@ def test_pipeline_routes_event_to_accumulator(): bus.emit(event) grouper.flush_all() - assert accumulator.event_count() == 1 + assert digest.active_count() == 1 -def test_pipeline_routes_immediate_to_both(): - """Immediate events go to both dispatcher and accumulator in Phase 2.4.""" +def test_pipeline_routes_immediate_event_to_dispatcher_not_accumulator(): + """Immediate event goes to dispatcher, not accumulator.""" config = Config() - bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = \ - build_pipeline_components(config, _make_mock_backend()) + bus, inhibitor, grouper, severity_router, dispatcher, digest = \ + build_pipeline_components(config) + + # Mock the severity_router's immediate handler (already bound to dispatcher.dispatch) + mock_immediate = MagicMock() + severity_router._immediate = mock_immediate event = make_event( source="test", category="weather_warning", severity="immediate", - title="Immediate event", + title="Test immediate event", ) grouper.flush_all() bus.emit(event) grouper.flush_all() - # In Phase 2.4, all events go to accumulator - assert accumulator.event_count() == 1 + # Immediate handler should have been called + assert mock_immediate.called + # Accumulator should have nothing + assert digest.active_count() == 0 diff --git a/tests/test_pipeline_scheduler.py b/tests/test_pipeline_scheduler.py index 6c118f3..4606e93 100644 --- a/tests/test_pipeline_scheduler.py +++ b/tests/test_pipeline_scheduler.py @@ -1,9 +1,6 @@ -"""Tests for DigestScheduler (Phase 2.3b + 2.4). +"""Tests for DigestScheduler (Phase 2.3b). Uses asyncio.run() since pytest-asyncio is not available in the container. - -Updated in Phase 2.4: render_digest is now async, accumulator mocks -must return awaitables. """ import asyncio @@ -11,12 +8,12 @@ import time from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Optional -from unittest.mock import MagicMock, AsyncMock, call +from unittest.mock import MagicMock, call import pytest from meshai.notifications.events import make_event -from meshai.notifications.pipeline.digest import DigestAccumulator, Digest +from meshai.notifications.pipeline.digest import DigestAccumulator from meshai.notifications.pipeline.scheduler import DigestScheduler @@ -60,15 +57,8 @@ class MockChannel: def __init__(self): self.deliveries = [] - async def deliver(self, payload, rule=None): + def deliver(self, payload: dict): self.deliveries.append(payload) - return True - - -class MockLLMBackend: - """Mock LLM backend for accumulator.""" - async def generate(self, messages, system_prompt, max_tokens=200): - return "Mock summary." def make_scheduler( @@ -94,14 +84,13 @@ def make_scheduler( channels = {} - def channel_factory(rule, connector=None): + def channel_factory(rule): ch = MockChannel() channels[rule.name] = ch return ch if accumulator is None: - # Use mock LLM backend for async render_digest - accumulator = DigestAccumulator(llm_backend=MockLLMBackend()) + accumulator = DigestAccumulator() scheduler = DigestScheduler( accumulator=accumulator, @@ -135,31 +124,37 @@ class TestScheduleComputation: def test_parse_schedule_invalid_falls_back(self): """Invalid schedules fall back to 07:00.""" scheduler, _, _ = make_scheduler() + # Bad format assert scheduler._parse_schedule("7:00:00") == (7, 0) assert scheduler._parse_schedule("invalid") == (7, 0) assert scheduler._parse_schedule("") == (7, 0) + # Out of range assert scheduler._parse_schedule("25:00") == (7, 0) assert scheduler._parse_schedule("12:60") == (7, 0) def test_next_fire_at_future_today(self): """If schedule time is later today, returns today's timestamp.""" + # Set clock to 06:00 on a known date base_dt = datetime(2024, 6, 15, 6, 0, 0) base_ts = base_dt.timestamp() scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts) next_fire = scheduler._next_fire_at(base_ts) + # Should be 07:00 same day expected_dt = datetime(2024, 6, 15, 7, 0, 0) assert abs(next_fire - expected_dt.timestamp()) < 1 def test_next_fire_at_past_today_schedules_tomorrow(self): """If schedule time has passed today, returns tomorrow's timestamp.""" + # Set clock to 08:00 on a known date base_dt = datetime(2024, 6, 15, 8, 0, 0) base_ts = base_dt.timestamp() scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts) next_fire = scheduler._next_fire_at(base_ts) + # Should be 07:00 next day expected_dt = datetime(2024, 6, 16, 7, 0, 0) assert abs(next_fire - expected_dt.timestamp()) < 1 @@ -171,6 +166,7 @@ class TestScheduleComputation: scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts) next_fire = scheduler._next_fire_at(base_ts) + # Should be 07:00 next day expected_dt = datetime(2024, 6, 16, 7, 0, 0) assert abs(next_fire - expected_dt.timestamp()) < 1 @@ -185,7 +181,7 @@ class TestScheduleComputation: config.notifications.digest = None scheduler = DigestScheduler( - accumulator=DigestAccumulator(llm_backend=MockLLMBackend()), + accumulator=DigestAccumulator(), config=config, channel_factory=lambda r: MockChannel(), ) @@ -199,7 +195,8 @@ class TestFireBehavior: def test_fire_delivers_to_matching_rule(self): """_fire() delivers digest to rules with schedule_match='digest'.""" - accumulator = DigestAccumulator(llm_backend=MockLLMBackend()) + accumulator = DigestAccumulator() + # Add an event so digest has content accumulator.enqueue(make_event( source="test", category="weather_warning", @@ -224,8 +221,9 @@ class TestFireBehavior: ch = channels["digest-mesh"] assert len(ch.deliveries) == 1 payload = ch.deliveries[0] - assert payload.category == "digest" - assert payload.severity == "routine" + assert payload["category"] == "digest" + assert payload["severity"] == "routine" + assert "Test alert" in payload["message"] or "Weather" in payload["message"] def test_fire_skips_disabled_rules(self): """Disabled rules are not delivered to.""" @@ -238,6 +236,7 @@ class TestFireBehavior: asyncio.run(run_fire()) + # Channel should not be created for disabled rule assert "disabled" not in channels def test_fire_skips_non_schedule_rules(self): @@ -266,10 +265,8 @@ class TestFireBehavior: def test_fire_mesh_delivery_chunks(self): """Mesh delivery types get per-chunk delivery.""" - accumulator = DigestAccumulator( - llm_backend=MockLLMBackend(), - mesh_char_limit=100, - ) + accumulator = DigestAccumulator(mesh_char_limit=100) + # Add multiple events to force chunking for i in range(5): accumulator.enqueue(make_event( source="test", @@ -292,14 +289,16 @@ class TestFireBehavior: asyncio.run(run_fire()) ch = channels["mesh"] + # Should have multiple deliveries (one per chunk) assert len(ch.deliveries) >= 1 + # Check chunk metadata for payload in ch.deliveries: - assert payload.chunk_index is not None - assert payload.chunk_total is not None + assert "chunk_index" in payload + assert "chunk_total" in payload def test_fire_email_delivery_full_text(self): """Email delivery type gets single full-text delivery.""" - accumulator = DigestAccumulator(llm_backend=MockLLMBackend()) + accumulator = DigestAccumulator() accumulator.enqueue(make_event( source="test", category="weather_warning", @@ -321,8 +320,8 @@ class TestFireBehavior: ch = channels["email"] assert len(ch.deliveries) == 1 payload = ch.deliveries[0] - assert payload.chunk_index is None - assert "--- " in payload.message + assert "chunk_index" not in payload + assert "--- " in payload["message"] # Full format has header def test_fire_updates_last_fire_at(self): """_fire() updates last_fire_at timestamp.""" @@ -351,7 +350,7 @@ class TestFireBehavior: ch = channels["mesh"] assert len(ch.deliveries) == 1 - assert "No alerts" in ch.deliveries[0].message + assert "No alerts" in ch.deliveries[0]["message"] # ---- Lifecycle Tests ---- @@ -403,7 +402,9 @@ class TestLifecycle: scheduler, _, _ = make_scheduler() async def run_stop(): + # Never started await scheduler.stop() + # Should not raise asyncio.run(run_stop()) @@ -413,8 +414,10 @@ class TestLifecycle: async def fake_sleep(duration): sleep_calls.append(duration) + # Actually sleep briefly so we can cancel await asyncio.sleep(0.01) + # Set clock far from schedule time to get long sleep base_dt = datetime(2024, 6, 15, 8, 0, 0) scheduler, _, _ = make_scheduler( schedule="07:00", @@ -424,11 +427,14 @@ class TestLifecycle: async def run_test(): await scheduler.start() + # Give task time to enter sleep await asyncio.sleep(0.05) await scheduler.stop() asyncio.run(run_test()) + # Task should have exited cleanly + # ---- Integration Tests ---- @@ -438,8 +444,9 @@ class TestIntegration: def test_scheduler_fires_on_schedule(self): """Scheduler fires when schedule time arrives.""" fire_times = [] - accumulator = DigestAccumulator(llm_backend=MockLLMBackend()) + accumulator = DigestAccumulator() + # Start at 06:59:59.95 (50ms before 07:00), delay will be ~50ms clock_time = [datetime(2024, 6, 15, 6, 59, 59, 950000).timestamp()] def fake_clock(): @@ -451,27 +458,31 @@ class TestIntegration: accumulator=accumulator, ) + # Track when fire happens original_fire = scheduler._fire async def tracking_fire(now): fire_times.append(now) await original_fire(now) + # After first fire, advance clock so next cycle has long delay clock_time[0] = datetime(2024, 6, 15, 8, 0, 0).timestamp() scheduler._fire = tracking_fire async def run_test(): await scheduler.start() + # Wait for the ~50ms delay plus some buffer await asyncio.sleep(0.2) await scheduler.stop() asyncio.run(run_test()) + # Should have fired once assert len(fire_times) >= 1 def test_scheduler_multiple_rules(self): """Scheduler delivers to multiple matching rules.""" - accumulator = DigestAccumulator(llm_backend=MockLLMBackend()) + accumulator = DigestAccumulator() accumulator.enqueue(make_event( source="test", category="weather_warning", @@ -496,6 +507,7 @@ class TestIntegration: asyncio.run(run_fire()) + # All three should have received deliveries assert "mesh1" in channels assert "mesh2" in channels assert "email" in channels @@ -505,7 +517,7 @@ class TestIntegration: def test_scheduler_handles_delivery_error(self): """Scheduler continues after delivery error.""" - accumulator = DigestAccumulator(llm_backend=MockLLMBackend()) + accumulator = DigestAccumulator() accumulator.enqueue(make_event( source="test", category="weather_warning", @@ -521,11 +533,11 @@ class TestIntegration: call_order = [] - def bad_channel_factory(rule, connector=None): + def bad_channel_factory(rule): call_order.append(rule.name) if rule.name == "bad": ch = MagicMock() - ch.deliver = AsyncMock(side_effect=RuntimeError("delivery failed")) + ch.deliver.side_effect = RuntimeError("delivery failed") return ch return MockChannel() @@ -542,6 +554,7 @@ class TestIntegration: asyncio.run(run_fire()) + # Both rules should have been attempted assert "bad" in call_order assert "good" in call_order diff --git a/tests/test_pipeline_skeleton.py b/tests/test_pipeline_skeleton.py index d554967..aea59b0 100644 --- a/tests/test_pipeline_skeleton.py +++ b/tests/test_pipeline_skeleton.py @@ -2,16 +2,10 @@ These tests verify the core routing and dispatch behavior of the notification pipeline without requiring real channel backends. - -Updated in Phase 2.4: Events now go to BOTH dispatcher and accumulator -(no severity-based fork). SeverityRouter class kept for backward -compatibility but not used in production wiring. """ -import asyncio - import pytest -from unittest.mock import Mock, AsyncMock, patch +from unittest.mock import Mock, patch from dataclasses import dataclass, field from meshai.notifications.events import Event, make_event @@ -45,7 +39,6 @@ class ConfigStub: class TestImmediateDispatch: def test_immediate_event_with_matching_rule_dispatches(self): - """Immediate events reach the dispatcher and get delivered.""" rule = NotificationRuleConfigStub( enabled=True, trigger_type="condition", @@ -57,10 +50,15 @@ class TestImmediateDispatch: notifications=NotificationsConfigStub(rules=[rule]) ) mock_channel = Mock() - mock_channel.deliver = AsyncMock(return_value=True) mock_factory = Mock(return_value=mock_channel) bus = EventBus() dispatcher = Dispatcher(config, mock_factory) + digest = StubDigestQueue() + router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest.enqueue, + ) + bus.subscribe(router.handle) event = make_event( source="test", category="test_cat", @@ -68,99 +66,80 @@ class TestImmediateDispatch: title="Test Alert", summary="Test summary message", ) - # Run dispatch in async context since it's now async - asyncio.run(dispatcher.dispatch(event)) + bus.emit(event) assert mock_channel.deliver.call_count == 1 alert = mock_channel.deliver.call_args[0][0] - assert alert.category == "test_cat" - assert alert.severity == "immediate" - assert alert.message + assert alert["category"] == "test_cat" + assert alert["severity"] == "immediate" + assert alert["message"] -class TestTeeRouting: - """Phase 2.4: Events go to BOTH dispatcher and accumulator.""" +class TestDigestRouting: - def test_routine_event_goes_to_both_dispatcher_and_accumulator(self): - """Routine events reach both dispatcher and accumulator in Phase 2.4.""" + def test_routine_event_goes_to_digest_not_dispatcher(self): rule = NotificationRuleConfigStub( enabled=True, trigger_type="condition", categories=["test_cat"], min_severity="routine", - delivery_type="mesh_broadcast", ) config = ConfigStub( notifications=NotificationsConfigStub(rules=[rule]) ) - mock_channel = Mock() - mock_channel.deliver = AsyncMock(return_value=True) - mock_factory = Mock(return_value=mock_channel) - - # Create dispatcher + mock_factory = Mock() + bus = EventBus() dispatcher = Dispatcher(config, mock_factory) + digest = StubDigestQueue() + with patch.object(dispatcher, "dispatch", wraps=dispatcher.dispatch) as mock_dispatch: + router = SeverityRouter( + immediate_handler=mock_dispatch, + digest_handler=digest.enqueue, + ) + bus.subscribe(router.handle) + event = make_event( + source="test", + category="test_cat", + severity="routine", + title="Routine Alert", + ) + bus.emit(event) + assert len(digest) == 1 + mock_dispatch.assert_not_called() - # Create accumulator mock - accumulator_calls = [] - def mock_enqueue(event): - accumulator_calls.append(event) - - event = make_event( - source="test", - category="test_cat", - severity="routine", - title="Routine Alert", - ) - - # Run dispatch in async context - asyncio.run(dispatcher.dispatch(event)) - mock_enqueue(event) - - # Both paths received the event - assert len(accumulator_calls) == 1 - # Dispatcher found a matching rule and delivered - assert mock_channel.deliver.call_count == 1 - - def test_priority_event_goes_to_both_dispatcher_and_accumulator(self): - """Priority events reach both dispatcher and accumulator in Phase 2.4.""" + def test_priority_event_goes_to_digest_not_dispatcher(self): rule = NotificationRuleConfigStub( enabled=True, trigger_type="condition", categories=["test_cat"], min_severity="routine", - delivery_type="mesh_broadcast", ) config = ConfigStub( notifications=NotificationsConfigStub(rules=[rule]) ) - mock_channel = Mock() - mock_channel.deliver = AsyncMock(return_value=True) - mock_factory = Mock(return_value=mock_channel) - + mock_factory = Mock() + bus = EventBus() dispatcher = Dispatcher(config, mock_factory) - - accumulator_calls = [] - def mock_enqueue(event): - accumulator_calls.append(event) - - event = make_event( - source="test", - category="test_cat", - severity="priority", - title="Priority Alert", - ) - - # Run dispatch in async context - asyncio.run(dispatcher.dispatch(event)) - mock_enqueue(event) - - assert len(accumulator_calls) == 1 - assert mock_channel.deliver.call_count == 1 + digest = StubDigestQueue() + with patch.object(dispatcher, "dispatch", wraps=dispatcher.dispatch) as mock_dispatch: + router = SeverityRouter( + immediate_handler=mock_dispatch, + digest_handler=digest.enqueue, + ) + bus.subscribe(router.handle) + event = make_event( + source="test", + category="test_cat", + severity="priority", + title="Priority Alert", + ) + bus.emit(event) + assert len(digest) == 1 + mock_dispatch.assert_not_called() class TestNoMatchingRule: def test_immediate_event_with_no_matching_rule_skips_silently(self): - """Events with no matching rules don't crash.""" config = ConfigStub( notifications=NotificationsConfigStub(rules=[]) ) @@ -186,7 +165,6 @@ class TestNoMatchingRule: class TestSubscriberIsolation: def test_subscriber_exception_isolation(self): - """Exceptions in one subscriber don't affect others.""" bus = EventBus() def failing_handler(event): @@ -208,7 +186,6 @@ class TestSubscriberIsolation: class TestUnknownSeverity: def test_unknown_severity_dropped_without_crash(self): - """Events with unknown severity are dropped gracefully.""" config = ConfigStub( notifications=NotificationsConfigStub(rules=[]) ) diff --git a/tests/test_pipeline_toggle_filter.py b/tests/test_pipeline_toggle_filter.py deleted file mode 100644 index 9afa93e..0000000 --- a/tests/test_pipeline_toggle_filter.py +++ /dev/null @@ -1,132 +0,0 @@ -"""Tests for ToggleFilter (Phase 2.4).""" - -import pytest -from unittest.mock import MagicMock, AsyncMock - -from meshai.notifications.events import make_event -from meshai.notifications.pipeline.toggle_filter import ToggleFilter -from meshai.notifications.pipeline import build_pipeline_components -from meshai.config import Config - - -class TestToggleFilter: - """Unit tests for ToggleFilter.""" - - def test_toggle_filter_passes_through_when_enabled_is_none(self): - """Filter with enabled_toggles=None passes all events.""" - received = [] - filter_ = ToggleFilter( - next_handler=received.append, - enabled_toggles=None, - ) - event = make_event( - source="test", - category="weather_warning", - severity="priority", - title="Test", - ) - filter_.handle(event) - assert len(received) == 1 - assert received[0] is event - - def test_toggle_filter_drops_event_when_toggle_not_enabled(self): - """Filter drops events whose toggle isn't in enabled set.""" - received = [] - filter_ = ToggleFilter( - next_handler=received.append, - enabled_toggles={"weather"}, - ) - # wildfire_proximity maps to "fire" toggle - event = make_event( - source="test", - category="wildfire_proximity", - severity="priority", - title="Fire", - ) - filter_.handle(event) - assert len(received) == 0 - - def test_toggle_filter_passes_event_when_toggle_enabled(self): - """Filter passes events whose toggle is in enabled set.""" - received = [] - filter_ = ToggleFilter( - next_handler=received.append, - enabled_toggles={"weather"}, - ) - event = make_event( - source="test", - category="weather_warning", - severity="priority", - title="Weather", - ) - filter_.handle(event) - assert len(received) == 1 - - def test_toggle_filter_drops_unknown_category_when_filter_active(self): - """Unknown category maps to 'other', dropped if 'other' not enabled.""" - received = [] - filter_ = ToggleFilter( - next_handler=received.append, - enabled_toggles={"weather"}, - ) - event = make_event( - source="test", - category="bogus_category", - severity="priority", - title="Unknown", - ) - filter_.handle(event) - # "bogus_category" has no toggle mapping, falls back to "other" - # "other" is not in enabled set - assert len(received) == 0 - - def test_toggle_filter_passes_other_when_enabled(self): - """'other' toggle passes unknown categories when enabled.""" - received = [] - filter_ = ToggleFilter( - next_handler=received.append, - enabled_toggles={"other"}, - ) - event = make_event( - source="test", - category="bogus_category", - severity="priority", - title="Unknown", - ) - filter_.handle(event) - assert len(received) == 1 - - -class TestToggleFilterPipelineWiring: - """Integration tests for toggle filter in pipeline.""" - - def test_toggle_filter_pipeline_drops_disabled_toggle(self): - """Events for disabled toggles don't reach dispatcher or accumulator.""" - config = Config() - - # Pass mock LLM backend - mock_backend = MagicMock() - mock_backend.generate = AsyncMock(return_value="stub summary") - - # Note: without toggles.enabled set, filter is a no-op - # This test verifies the wiring is correct - bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, mock_backend) - - # Verify toggle_filter is in the chain - assert toggle_filter is not None - assert hasattr(toggle_filter, 'handle') - - def test_build_pipeline_uses_provided_backend(self): - """build_pipeline_components uses the provided llm_backend.""" - config = Config() - - # Sentinel backend with unique attribute - sentinel = MagicMock() - sentinel.unique_marker = "I_AM_THE_SENTINEL" - sentinel.generate = AsyncMock(return_value="sentinel summary") - - bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, sentinel) - - # Accumulator should have the exact sentinel instance - assert accumulator._llm is sentinel - assert accumulator._llm.unique_marker == "I_AM_THE_SENTINEL" diff --git a/tests/test_renderers.py b/tests/test_renderers.py deleted file mode 100644 index b898dc0..0000000 --- a/tests/test_renderers.py +++ /dev/null @@ -1,317 +0,0 @@ -"""Tests for Phase 2.5b per-channel-type renderers.""" - -import json -import re -import time - -import pytest - -from meshai.notifications.events import NotificationPayload, make_event, make_payload_from_event -from meshai.notifications.renderers import MeshRenderer, EmailRenderer, WebhookRenderer - - -# ============================================================ -# MESH RENDERER TESTS -# ============================================================ - -def test_mesh_render_short_message_single_chunk(): - """Short message produces a single chunk.""" - payload = NotificationPayload( - message="Test alert", - category="test", - severity="routine", - timestamp=time.time(), - ) - renderer = MeshRenderer() - chunks = renderer.render(payload) - assert len(chunks) == 1 - assert "Test alert" in chunks[0] - - -def test_mesh_render_event_type_prefix(): - """Known event type adds toggle label prefix.""" - payload = NotificationPayload( - message="Severe storm", - category="weather_warning", - severity="priority", - timestamp=time.time(), - event_type="weather_warning", - ) - renderer = MeshRenderer() - chunks = renderer.render(payload) - assert len(chunks) == 1 - assert chunks[0].startswith("[Weather]") - - -def test_mesh_render_unknown_event_type_no_prefix(): - """Unknown event type does not add a prefix.""" - payload = NotificationPayload( - message="Hello", - category="made_up_thing", - severity="routine", - timestamp=time.time(), - event_type="made_up_thing", - ) - renderer = MeshRenderer() - chunks = renderer.render(payload) - assert len(chunks) == 1 - assert not chunks[0].startswith("[") - - -def test_mesh_render_long_message_chunks(): - """Long message splits into multiple chunks with counters.""" - # Build a ~500 char message - long_message = "This is a very long alert message that should definitely exceed the two hundred character limit. " * 5 - payload = NotificationPayload( - message=long_message, - category="weather_warning", - severity="priority", - timestamp=time.time(), - event_type="weather_warning", - ) - renderer = MeshRenderer() - chunks = renderer.render(payload) - - assert len(chunks) >= 2 - for chunk in chunks: - assert len(chunk) <= 200, f"Chunk exceeds limit: {len(chunk)} chars" - # Check each chunk ends with "(k/N)" counter - for chunk in chunks: - assert re.search(r"\(\d+/\d+\)$", chunk), f"Missing counter: {chunk}" - - -def test_mesh_render_chunks_preserve_full_content(): - """Chunking preserves all words from the original message.""" - # Build a message with distinct tokens - words = ["alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", - "hotel", "india", "juliet", "kilo", "lima", "mike", "november", - "oscar", "papa", "quebec", "romeo", "sierra", "tango", "uniform", - "victor", "whiskey", "xray", "yankee", "zulu"] - long_message = " ".join(words * 3) # Repeat to span multiple chunks - - payload = NotificationPayload( - message=long_message, - category="test", - severity="routine", - timestamp=time.time(), - ) - renderer = MeshRenderer() - chunks = renderer.render(payload) - - # Join all chunks and remove counters - combined = " ".join(chunks) - combined = re.sub(r"\s*\(\d+/\d+\)", "", combined) - - # Every original word should appear - for word in words: - assert word in combined, f"Missing word: {word}" - - -def test_mesh_render_no_event_type(): - """Payload without event_type has no prefix.""" - payload = NotificationPayload( - message="Plain message", - category="test", - severity="routine", - timestamp=time.time(), - event_type=None, - ) - renderer = MeshRenderer() - chunks = renderer.render(payload) - assert len(chunks) == 1 - assert chunks[0] == "Plain message" - - -# ============================================================ -# EMAIL RENDERER TESTS -# ============================================================ - -def test_email_render_subject_includes_severity_and_type(): - """Subject line includes severity and event type.""" - payload = NotificationPayload( - message="Storm approaching", - category="weather_warning", - severity="immediate", - timestamp=time.time(), - event_type="weather_warning", - ) - renderer = EmailRenderer() - rendered = renderer.render(payload) - - assert "IMMEDIATE" in rendered["subject"] - assert "Weather Warning" in rendered["subject"] - - -def test_email_render_body_includes_message_and_context(): - """Body includes message and structured context fields.""" - fixed_time = 1700000000.0 # Known timestamp - payload = NotificationPayload( - message="Test alert message", - category="battery_warning", - severity="priority", - timestamp=fixed_time, - event_type="battery_warning", - region="Magic Valley", - node_name="BLD-MTN", - ) - renderer = EmailRenderer() - rendered = renderer.render(payload) - body = rendered["body"] - - assert "Test alert message" in body - assert "Magic Valley" in body - assert "BLD-MTN" in body - assert "priority" in body - assert "battery_warning" in body - # Check formatted date - assert "2023-11-14" in body # Date part of timestamp - - -def test_email_render_omits_missing_context(): - """Body omits lines for missing optional fields.""" - payload = NotificationPayload( - message="Minimal alert", - category="test", - severity="routine", - timestamp=time.time(), - # No region, no node, no event_type - ) - renderer = EmailRenderer() - rendered = renderer.render(payload) - body = rendered["body"] - - assert "Minimal alert" in body - assert "Severity: routine" in body - assert "Region:" not in body - assert "Node:" not in body - assert "Category:" not in body - - -def test_email_render_includes_source_event(): - """Body includes source event details when present.""" - event = make_event( - source="weather_adapter", - category="weather_warning", - severity="priority", - title="Severe Storm", - summary="Severe storm expected", - ) - payload = make_payload_from_event(event) - - renderer = EmailRenderer() - rendered = renderer.render(payload) - body = rendered["body"] - - assert "weather_adapter" in body - assert "Severe Storm" in body - - -# ============================================================ -# WEBHOOK RENDERER TESTS -# ============================================================ - -def test_webhook_render_has_schema_version(): - """Output includes schema_version field.""" - payload = NotificationPayload( - message="Test", - category="test", - severity="routine", - timestamp=time.time(), - ) - renderer = WebhookRenderer() - rendered = renderer.render(payload) - - assert rendered["schema_version"] == "1.0" - - -def test_webhook_render_omits_none_fields(): - """None optional fields are omitted, not set to null.""" - payload = NotificationPayload( - message="Test message", - category="test", - severity="routine", - timestamp=time.time(), - # All optional fields default to None - ) - renderer = WebhookRenderer() - rendered = renderer.render(payload) - - # Required fields present - assert "message" in rendered - assert "severity" in rendered - assert "timestamp" in rendered - assert "schema_version" in rendered - - # Optional fields omitted - assert "node_id" not in rendered - assert "region" not in rendered - assert "chunk_index" not in rendered - - -def test_webhook_render_includes_source_event_when_present(): - """source_event dict included when payload has source event.""" - event = make_event( - source="test_adapter", - category="test", - severity="routine", - title="Test Event", - ) - payload = make_payload_from_event(event) - - renderer = WebhookRenderer() - rendered = renderer.render(payload) - - assert "source_event" in rendered - assert rendered["source_event"]["id"] == event.id - assert rendered["source_event"]["source"] == "test_adapter" - - -def test_webhook_render_is_json_serializable(): - """Rendered output is JSON-serializable (no datetime objects).""" - event = make_event( - source="test", - category="test", - severity="routine", - title="Test", - ) - payload = make_payload_from_event(event) - payload.chunk_index = 1 - payload.chunk_total = 3 - payload.region = "Test Region" - payload.node_name = "Test-Node" - - renderer = WebhookRenderer() - rendered = renderer.render(payload) - - # Should not raise - json_str = json.dumps(rendered) - assert isinstance(json_str, str) - # Verify round-trip - parsed = json.loads(json_str) - assert parsed["message"] == payload.message - - -def test_webhook_render_includes_optional_fields_when_set(): - """Optional fields included when they have values.""" - payload = NotificationPayload( - message="Test", - category="test_category", - severity="priority", - timestamp=time.time(), - event_type="battery_warning", - node_id="!abc123", - node_name="Test-Node", - region="Test Region", - chunk_index=2, - chunk_total=5, - ) - renderer = WebhookRenderer() - rendered = renderer.render(payload) - - assert rendered["category"] == "test_category" - assert rendered["event_type"] == "battery_warning" - assert rendered["node_id"] == "!abc123" - assert rendered["node_name"] == "Test-Node" - assert rendered["region"] == "Test Region" - assert rendered["chunk_index"] == 2 - assert rendered["chunk_total"] == 5