Compare commits

..

No commits in common. "9c5a106c9fe08b690a20c052d90685f2e21b1064" and "d6bc6b2b8973a5408c5267aa20d53161ccbd305e" have entirely different histories.

26 changed files with 1147 additions and 2622 deletions

View file

@ -484,14 +484,6 @@ class NotificationRuleConfig:
channel_ids: list = field(default_factory=list)
@dataclass
class TogglesConfig:
"""Master toggle filter settings."""
enabled: list[str] = field(default_factory=list) # Toggle names that are enabled (empty = all)
@dataclass
class DigestConfig:
"""Digest scheduler settings."""
@ -508,7 +500,6 @@ class NotificationsConfig:
quiet_hours_enabled: bool = True # Master toggle for quiet hours
quiet_hours_start: str = "22:00"
quiet_hours_end: str = "06:00"
toggles: TogglesConfig = field(default_factory=TogglesConfig)
digest: DigestConfig = field(default_factory=DigestConfig)
rules: list = field(default_factory=list) # List of NotificationRuleConfig
@ -681,8 +672,6 @@ def _dict_to_dataclass(cls, data: dict):
kwargs[key] = _dict_to_dataclass(FIRMSConfig, value)
elif key == "dashboard" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(DashboardConfig, value)
elif key == "toggles" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(TogglesConfig, value)
elif key == "digest" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(DigestConfig, value)
elif key == "notifications" and isinstance(value, dict):

60
meshai/env/firms.py vendored
View file

@ -3,12 +3,10 @@
import json
import logging
import time
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from meshai.notifications.events import Event, make_event
if TYPE_CHECKING:
from ..config import FIRMSConfig
@ -344,62 +342,6 @@ class FIRMSAdapter:
return (None, None)
def to_event(self, evt: dict) -> Optional["Event"]:
"""Translate a stored FIRMS event dict into a pipeline Event.
Args:
evt: Internal event dict from get_events()
Returns:
Event instance ready for EventBus emission, or None if
the dict is missing required fields (lat/lon).
"""
try:
lat = evt.get("lat")
lon = evt.get("lon")
if lat is None or lon is None:
return None # Can't make a useful Event without coords
props = evt.get("properties", {}) or {}
is_new_ignition = bool(props.get("new_ignition", False))
category = "new_ignition" if is_new_ignition else "wildfire_proximity"
severity = evt.get("severity", "routine")
title = evt.get("headline", "") or "Fire Hotspot"
# Build a richer summary including FRP, confidence, distance
summary_parts = [title]
if props.get("frp") is not None:
summary_parts.append(f"FRP {int(props['frp'])} MW")
if props.get("confidence"):
summary_parts.append(f"conf {props['confidence']}")
if props.get("distance_km") is not None and props.get("nearest_anchor"):
summary_parts.append(
f"{int(props['distance_km'])} km from {props['nearest_anchor']}"
)
summary = " | ".join(summary_parts)[:300]
spatial_key = f"firms:{round(lat, 2):.2f}:{round(lon, 2):.2f}"
return make_event(
source="firms",
category=category,
severity=severity,
title=title,
summary=summary,
timestamp=evt.get("fetched_at"),
expires=evt.get("expires"),
region=props.get("nearest_anchor"),
lat=lat,
lon=lon,
group_key=spatial_key,
inhibit_keys=[spatial_key],
)
except Exception:
logger.exception(f"FIRMS to_event failed for evt: {evt.get('event_id')}")
return None
def get_events(self) -> list:
"""Get current hotspot events."""
return self._events

69
meshai/env/nws.py vendored
View file

@ -4,12 +4,10 @@ import json
import logging
import time
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from meshai.notifications.events import Event, make_event
if TYPE_CHECKING:
from ..config import NWSConfig
@ -41,71 +39,6 @@ class NWSAlertsAdapter:
else: # moderate, minor, unknown
return "routine"
def _derive_category(self, event_type: str) -> str:
"""Derive notification category from NWS event type suffix.
NWS event types like "Red Flag Warning", "Winter Storm Watch",
"Wind Advisory" map to our fine-grained weather categories.
Args:
event_type: NWS event type string (e.g., "Tornado Warning")
Returns:
Category key: weather_warning, weather_watch, weather_advisory,
or weather_statement
"""
event_type_lower = event_type.lower()
if event_type_lower.endswith("warning"):
return "weather_warning"
elif event_type_lower.endswith("watch"):
return "weather_watch"
elif event_type_lower.endswith("advisory"):
return "weather_advisory"
else:
# Covers "Special Weather Statement", "Short Term Forecast", etc.
return "weather_statement"
def to_event(self, raw: dict) -> Event:
"""Convert internal event dict to pipeline Event.
Args:
raw: Internal event dict from get_events()
Returns:
Event instance ready for EventBus emission
"""
event_type = raw.get("event_type", "Unknown")
category = self._derive_category(event_type)
nws_severity = raw.get("severity", "unknown")
severity = self._map_nws_severity(nws_severity)
# Build group_key for dedup: same alert ID should merge
group_key = raw.get("event_id", "")
# Build inhibit_keys: a Warning supersedes Watch/Advisory for same hazard
inhibit_keys = []
if category == "weather_warning":
# Warning inhibits corresponding Watch/Advisory
base = event_type.rsplit(" ", 1)[0] if " " in event_type else event_type
inhibit_keys = [f"nws:{base} Watch", f"nws:{base} Advisory"]
return make_event(
source="nws",
category=category,
severity=severity,
title=raw.get("headline", event_type),
summary=raw.get("headline", ""),
body=raw.get("description", ""),
effective=raw.get("onset") or None,
expires=raw.get("expires") or None,
lat=raw.get("lat"),
lon=raw.get("lon"),
nws_zones=raw.get("areas", []),
group_key=group_key,
inhibit_keys=inhibit_keys,
data=raw,
)
def tick(self) -> bool:
"""Execute one polling tick.

32
meshai/env/store.py vendored
View file

@ -2,11 +2,10 @@
import logging
import time
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import EnvironmentalConfig
from ..notifications.pipeline import EventBus
logger = logging.getLogger(__name__)
@ -14,15 +13,9 @@ logger = logging.getLogger(__name__)
class EnvironmentalStore:
"""Cache and tick-driver for all environmental feed adapters."""
def __init__(
self,
config: "EnvironmentalConfig",
region_anchors: list = None,
event_bus: Optional["EventBus"] = None,
):
def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None):
self._adapters = {} # name -> adapter instance
self._events = {} # (source, event_id) -> event dict
self._event_bus = event_bus # Pipeline EventBus for emission
self._swpc_status = {} # Kp/SFI/scales snapshot
self._ducting_status = {} # tropo ducting assessment
self._mesh_zones = config.nws_zones or []
@ -94,29 +87,12 @@ class EnvironmentalStore:
self._swpc_status = adapter.get_status()
# Also ingest any alert events (R-scale >= 3)
for evt in adapter.get_events():
key = (evt["source"], evt["event_id"])
is_new = key not in self._events
self._events[key] = evt
if is_new and self._event_bus and hasattr(adapter, "to_event"):
self._emit_event(adapter, evt)
self._events[(evt["source"], evt["event_id"])] = evt
elif name == "ducting":
self._ducting_status = adapter.get_status()
else:
for evt in adapter.get_events():
key = (evt["source"], evt["event_id"])
is_new = key not in self._events
self._events[key] = evt
if is_new and self._event_bus and hasattr(adapter, "to_event"):
self._emit_event(adapter, evt)
def _emit_event(self, adapter, raw_evt: dict):
"""Convert raw event to pipeline Event and emit to bus."""
try:
event = adapter.to_event(raw_evt)
self._event_bus.emit(event)
logger.debug("Emitted %s event %s to pipeline", event.source, event.id)
except Exception as e:
logger.warning("Failed to emit event to pipeline: %s", e)
self._events[(evt["source"], evt["event_id"])] = evt
def _purge_expired(self):
"""Remove expired events."""

View file

@ -174,33 +174,12 @@ ALERT_CATEGORIES = {
# Environmental - Weather
"weather_warning": {
"name": "Severe Weather Warning",
"description": "NWS Warning affecting your mesh area — highest urgency weather alert",
"name": "Severe Weather",
"description": "NWS warning or advisory affecting your mesh area",
"default_severity": "priority",
"example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z",
"toggle": "weather",
},
"weather_watch": {
"name": "Weather Watch",
"description": "NWS Watch affecting your mesh area — conditions favorable for hazardous weather",
"default_severity": "routine",
"example_message": "⏳ Winter Storm Watch — Wood River Valley. Heavy snow possible Thu night through Fri.",
"toggle": "weather",
},
"weather_advisory": {
"name": "Weather Advisory",
"description": "NWS Advisory affecting your mesh area — weather may cause inconvenience",
"default_severity": "routine",
"example_message": " Wind Advisory — Magic Valley. SW winds 25-35 mph with gusts to 50 mph.",
"toggle": "weather",
},
"weather_statement": {
"name": "Weather Statement",
"description": "NWS Special Weather Statement — general awareness, no specific hazard",
"default_severity": "routine",
"example_message": "📋 Special Weather Statement — Isolated thunderstorms possible this afternoon.",
"toggle": "weather",
},
# Environmental - Space Weather
"hf_blackout": {

View file

@ -14,10 +14,6 @@ import httpx
if TYPE_CHECKING:
from ..connector import MeshConnector
from ..config import NotificationRuleConfig
from .events import NotificationPayload
from meshai.notifications.renderers import MeshRenderer, EmailRenderer, WebhookRenderer
logger = logging.getLogger(__name__)
@ -28,7 +24,7 @@ class NotificationChannel(ABC):
channel_type: str = "base"
@abstractmethod
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert. Returns True on success."""
raise NotImplementedError
@ -63,34 +59,21 @@ class MeshBroadcastChannel(NotificationChannel):
def __init__(self, connector: "MeshConnector", channel_index: int = 0):
self._connector = connector
self._channel = channel_index
self._renderer = MeshRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert to mesh channel."""
if not self._connector:
logger.warning("No mesh connector available")
return False
try:
# If payload already has chunk metadata (from digest), use message directly
if alert.chunk_index is not None:
self._connector.send_message(
text=alert.message or "",
destination=None,
channel=self._channel,
)
logger.info("Broadcast pre-chunked alert to channel %d", self._channel)
return True
# Render to chunks for single-event delivery
chunks = self._renderer.render(alert)
for chunk in chunks:
self._connector.send_message(
text=chunk,
destination=None,
channel=self._channel,
)
logger.info("Broadcast %d chunk(s) to channel %d", len(chunks), self._channel)
message = alert.get("message", "")
self._connector.send_message(
text=message,
destination=None,
channel=self._channel,
)
logger.info("Broadcast alert to channel %d", self._channel)
return True
except Exception as e:
logger.error("Failed to broadcast alert: %s", e)
@ -174,29 +157,22 @@ class MeshDMChannel(NotificationChannel):
def __init__(self, connector: "MeshConnector", node_ids: list[str]):
self._connector = connector
self._node_ids = node_ids
self._renderer = MeshRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via DM to configured nodes."""
if not self._connector:
return False
# If payload already has chunk metadata (from digest), use message directly
if alert.chunk_index is not None:
messages = [alert.message or ""]
else:
# Render to chunks for single-event delivery
messages = self._renderer.render(alert)
message = alert.get("message", "")
success = True
for node_id in self._node_ids:
for message in messages:
try:
node_id = str(node_id)
self._connector.send_message(text=message, destination=node_id, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
success = False
try:
node_id = str(node_id)
self._connector.send_message(text=message, destination=node_id, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
success = False
return success
@ -309,17 +285,19 @@ class EmailChannel(NotificationChannel):
self._tls = smtp_tls
self._from = from_address
self._recipients = recipients
self._renderer = EmailRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
async def deliver(self, alert: dict, rule: dict) -> bool:
"""Send alert via email."""
if not self._recipients:
return False
# Use renderer for subject and body
rendered = self._renderer.render(alert)
subject = rendered["subject"]
body = rendered["body"]
alert_type = alert.get("type", "alert")
severity = alert.get("severity", "routine").upper()
message = alert.get("message", "")
subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title())
body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % (
alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message
)
try:
loop = asyncio.get_event_loop()
@ -535,16 +513,21 @@ class WebhookChannel(NotificationChannel):
def __init__(self, url: str, headers: Optional[dict] = None):
self._url = url
self._headers = headers or {}
self._renderer = WebhookRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
async def deliver(self, alert: dict, rule: dict) -> bool:
"""POST alert to webhook URL."""
# Use renderer for generic JSON payload
payload = self._renderer.render(alert)
payload = {
"type": alert.get("type"),
"severity": alert.get("severity", "routine"),
"message": alert.get("message", ""),
"timestamp": time.time(),
"node_name": alert.get("node_name"),
"region": alert.get("region"),
}
# Discord/Slack format
if "discord.com" in self._url or "slack.com" in self._url:
severity = alert.severity or "routine"
severity = alert.get("severity", "routine")
color = {
"immediate": 0xFF0000,
"priority": 0xFFAA00,
@ -552,8 +535,8 @@ class WebhookChannel(NotificationChannel):
}.get(severity, 0x888888)
payload = {
"embeds": [{
"title": "MeshAI: %s" % (alert.event_type or "unknown"),
"description": alert.message or "",
"title": "MeshAI: %s" % alert.get("type", "unknown"),
"description": alert.get("message", ""),
"color": color,
}]
}
@ -562,14 +545,14 @@ class WebhookChannel(NotificationChannel):
elif "ntfy" in self._url:
headers = {
**self._headers,
"Title": "MeshAI: %s" % (alert.event_type or "alert"),
"Title": "MeshAI: %s" % alert.get("type", "alert"),
"Priority": "3",
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
content=alert.message or "",
content=alert.get("message", ""),
headers=headers,
timeout=10,
)
@ -762,52 +745,8 @@ class WebhookChannel(NotificationChannel):
return False, f"Webhook failed: {e}"
def create_channel(rule: "NotificationRuleConfig", connector=None) -> NotificationChannel:
"""Create a channel instance from a NotificationRuleConfig.
Args:
rule: NotificationRuleConfig with delivery_type and channel settings
connector: MeshConnector instance (required for mesh channels)
Returns:
NotificationChannel instance
"""
delivery_type = rule.delivery_type
if delivery_type == "mesh_broadcast":
return MeshBroadcastChannel(
connector=connector,
channel_index=rule.broadcast_channel,
)
elif delivery_type == "mesh_dm":
return MeshDMChannel(
connector=connector,
node_ids=rule.node_ids,
)
elif delivery_type == "email":
return EmailChannel(
smtp_host=rule.smtp_host,
smtp_port=rule.smtp_port,
smtp_user=rule.smtp_user,
smtp_password=rule.smtp_password,
smtp_tls=rule.smtp_tls,
from_address=rule.from_address,
recipients=rule.recipients,
)
elif delivery_type == "webhook":
return WebhookChannel(
url=rule.webhook_url,
headers=rule.webhook_headers,
)
else:
raise ValueError("Unknown delivery type: %s" % delivery_type)
def create_channel_from_dict(config: dict, connector=None) -> NotificationChannel:
"""Create a channel instance from a dict config (legacy interface).
Used by old router.py and test_channel API. Will be removed in Phase 2.7.
"""
def create_channel(config: dict, connector=None) -> NotificationChannel:
"""Create a channel instance from config."""
channel_type = config.get("type", "")
if channel_type == "mesh_broadcast":

View file

@ -133,52 +133,6 @@ class Event:
return cls(**d)
@dataclass
class NotificationPayload:
"""Per-delivery alert content handed to a NotificationChannel.
This is the runtime alert shape: derived from an Event (or
built directly by the old router) and consumed by channels.py
implementations.
"""
message: str # The rendered text to deliver
category: str # e.g. "weather_warning"
severity: str # "immediate" | "priority" | "routine"
timestamp: float # Unix epoch when generated
# Optional context fields (None when not applicable)
node_id: Optional[str] = None
node_name: Optional[str] = None
region: Optional[str] = None
event_type: Optional[str] = None # Maps to old dict's "type" field
# Chunk metadata for mesh deliveries (set by scheduler/digest path)
chunk_index: Optional[int] = None
chunk_total: Optional[int] = None
# Source Event reference for advanced channel use (renderers in 2.5b)
source_event: Optional["Event"] = None
def make_payload_from_event(event: "Event", **overrides) -> NotificationPayload:
"""Helper to convert an Event into a NotificationPayload."""
p = NotificationPayload(
message=event.summary or event.title or event.category,
category=event.category,
severity=event.severity,
timestamp=event.timestamp,
node_id=event.node_ids[0] if event.node_ids else None,
region=event.region,
event_type=event.category,
source_event=event,
)
for k, v in overrides.items():
setattr(p, k, v)
return p
def make_event(
source: str,
category: str,

View file

@ -1,18 +1,17 @@
"""Notification pipeline package.
Phase 2.4:
Phase 2.1 + 2.2 + 2.3a + 2.3b:
- EventBus: pub/sub ingress
- Inhibitor: suppresses redundant events by inhibit_keys
- Grouper: coalesces events sharing group_key within a window
- ToggleFilter: drops events whose toggle isn't enabled
- Tee: sends events to both dispatcher and accumulator
- Dispatcher: routes to channels based on rules
- DigestAccumulator: logs events for LLM-summarized periodic digest
- DigestScheduler: fires digest at configured time
- SeverityRouter: forks immediate vs digest
- Dispatcher: routes immediate via channels (existing rules schema)
- DigestAccumulator: tracks priority/routine events for periodic digest
- DigestScheduler: fires digest at configured time (Phase 2.3b)
Usage:
from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline
bus = build_pipeline(config, llm_backend) # llm_backend from main.py
bus = build_pipeline(config)
bus.emit(event)
# Async lifecycle
@ -21,8 +20,6 @@ Usage:
await stop_pipeline(scheduler)
"""
import asyncio
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
@ -32,25 +29,17 @@ from meshai.notifications.pipeline.severity_router import (
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
from meshai.notifications.pipeline.toggle_filter import ToggleFilter
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
from meshai.notifications.pipeline.scheduler import DigestScheduler
def build_pipeline(config, llm_backend, connector=None) -> EventBus:
def build_pipeline(config) -> EventBus:
"""Build the pipeline and return the EventBus.
Args:
config: Full Config object.
llm_backend: An already-constructed LLMBackend instance
(from main.py or a test). Pipeline components share
this single instance. May be None for fallback behavior.
connector: Optional MeshtasticConnector for mesh channels.
Components are stashed on bus._pipeline_components for lifecycle use.
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel, connector=connector)
dispatcher = Dispatcher(config, create_channel)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
@ -60,35 +49,12 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus:
if include_list:
include_toggles = list(include_list)
accumulator = DigestAccumulator(
llm_backend=llm_backend,
include_toggles=include_toggles,
digest = DigestAccumulator(include_toggles=include_toggles)
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
# Tee closure: events go to BOTH dispatcher and accumulator
# dispatcher.dispatch() is async, so fire-and-forget with create_task
def _tee(event):
try:
asyncio.create_task(dispatcher.dispatch(event))
except RuntimeError:
# No running event loop (e.g. sync tests) - skip async dispatch
pass
accumulator.enqueue(event)
# Build enabled toggles set from config
toggles_cfg = getattr(config.notifications, "toggles", None)
enabled_toggles = None
if toggles_cfg is not None:
enabled_list = getattr(toggles_cfg, "enabled", None)
if enabled_list:
enabled_toggles = set(enabled_list)
toggle_filter = ToggleFilter(
next_handler=_tee,
enabled_toggles=enabled_toggles,
)
grouper = Grouper(next_handler=toggle_filter.handle)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
@ -96,30 +62,21 @@ def build_pipeline(config, llm_backend, connector=None) -> EventBus:
bus._pipeline_components = {
"inhibitor": inhibitor,
"grouper": grouper,
"toggle_filter": toggle_filter,
"severity_router": severity_router,
"dispatcher": dispatcher,
"accumulator": accumulator,
"connector": connector,
"digest": digest,
}
return bus
def build_pipeline_components(config, llm_backend, connector=None) -> tuple:
def build_pipeline_components(config) -> tuple:
"""Like build_pipeline, but returns all components for tests.
Args:
config: Full Config object.
llm_backend: An already-constructed LLMBackend instance
(from main.py or a test). Pipeline components share
this single instance. May be None for fallback behavior.
connector: Optional MeshtasticConnector for mesh channels.
Returns:
(bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator).
Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel, connector=connector)
dispatcher = Dispatcher(config, create_channel)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
@ -129,39 +86,15 @@ def build_pipeline_components(config, llm_backend, connector=None) -> tuple:
if include_list:
include_toggles = list(include_list)
accumulator = DigestAccumulator(
llm_backend=llm_backend,
include_toggles=include_toggles,
digest = DigestAccumulator(include_toggles=include_toggles)
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
# Tee closure: events go to BOTH dispatcher and accumulator
# dispatcher.dispatch() is async, so fire-and-forget with create_task
def _tee(event):
try:
asyncio.create_task(dispatcher.dispatch(event))
except RuntimeError:
# No running event loop (e.g. sync tests) - skip async dispatch
pass
accumulator.enqueue(event)
# Build enabled toggles set from config
toggles_cfg = getattr(config.notifications, "toggles", None)
enabled_toggles = None
if toggles_cfg is not None:
enabled_list = getattr(toggles_cfg, "enabled", None)
if enabled_list:
enabled_toggles = set(enabled_list)
toggle_filter = ToggleFilter(
next_handler=_tee,
enabled_toggles=enabled_toggles,
)
grouper = Grouper(next_handler=toggle_filter.handle)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator
return bus, inhibitor, grouper, severity_router, dispatcher, digest
async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
@ -178,14 +111,12 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
if components is None:
raise RuntimeError("bus missing _pipeline_components; use build_pipeline()")
accumulator = components["accumulator"]
digest = components["digest"]
connector = components.get("connector")
scheduler = DigestScheduler(
accumulator=accumulator,
accumulator=digest,
config=config,
channel_factory=create_channel,
connector=connector,
)
await scheduler.start()
@ -212,7 +143,6 @@ __all__ = [
"Dispatcher",
"Inhibitor",
"Grouper",
"ToggleFilter",
"DigestAccumulator",
"Digest",
"DigestScheduler",

View file

@ -1,24 +1,33 @@
"""Digest accumulator and renderer for Phase 2.4.
"""Digest accumulator and renderer for Phase 2.3a.
Logs all events between digest emissions and renders LLM-summarized
digest output per toggle. No active/resolved tracking just a
chronological log that the LLM summarizes.
Holds priority and routine events between digest emissions, tracks
active vs recently-resolved events, and renders the two-section
digest output (ACTIVE NOW + SINCE LAST DIGEST) when called.
render_digest() is async and calls the LLM once per non-empty toggle.
No scheduling logic here. render_digest() is called explicitly by
the future scheduler (Phase 2.3b) or by tests.
"""
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from typing import Optional
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
if TYPE_CHECKING:
from meshai.backends.base import LLMBackend
# Lowercase substrings in event.title that indicate the event is
# a resolution of a prior alert. Conservative list — easy to extend.
RESOLUTION_MARKERS = (
"cleared",
"reopened",
"ended",
"resolved",
"back online",
"recovered",
"lifted",
)
# Display labels per toggle (used in rendered output)
TOGGLE_LABELS = {
@ -46,23 +55,11 @@ TOGGLE_ORDER = [
"other",
]
# System prompt for digest summarization
DIGEST_SYSTEM_PROMPT = (
"You are summarizing a category of mesh-network alerts for a "
"morning digest broadcast. Given a list of events in chronological "
"order (immediate severity first, then priority, then routine), "
"produce ONE SHORT LINE summarizing what happened. "
"Be specific about node IDs, places, and counts when present. "
"Aim for 80-140 characters. Do not use markdown. No bullet points. "
"Plain prose. End with a period."
)
@dataclass
class Digest:
"""Result of render_digest(). Carries sections and metadata."""
"""Result of render_digest(). Carries both sections and metadata."""
rendered_at: float
# Keep these fields for type compatibility; populated empty in Phase 2.4+
active: dict[str, list[Event]] = field(default_factory=dict)
since_last: dict[str, list[Event]] = field(default_factory=dict)
mesh_chunks: list[str] = field(default_factory=list)
@ -70,31 +67,28 @@ class Digest:
full: str = ""
def is_empty(self) -> bool:
return not self.mesh_chunks or (
len(self.mesh_chunks) == 1 and "No alerts" in self.mesh_chunks[0]
)
return not self.active and not self.since_last
class DigestAccumulator:
"""Logs events and produces LLM-summarized periodic digests.
"""Tracks priority/routine events and produces periodic digests.
Args:
llm_backend: LLM backend for generating summaries. If None,
falls back to count-based summaries.
mesh_char_limit: Maximum characters per mesh chunk (default 200).
include_toggles: List of toggle names to include in digest output.
If None, defaults to all toggles in TOGGLE_ORDER except
rf_propagation.
mesh_char_limit: Maximum characters per mesh chunk (default 200).
rf_propagation. Unknown toggle names in the list are silently
accepted (TOGGLE_ORDER drives display order, include_toggles
drives which toggles are tracked).
"""
def __init__(
self,
llm_backend: Optional["LLMBackend"] = None,
include_toggles: list[str] | None = None,
mesh_char_limit: int = 200,
include_toggles: list[str] | None = None,
):
self._llm = llm_backend
self._events_since_last_digest: dict[str, list[Event]] = {}
self._active: dict[str, list[Event]] = {} # toggle -> events
self._since_last: dict[str, list[Event]] = {} # toggle -> events
self._last_digest_at: float = 0.0
self._mesh_char_limit = mesh_char_limit
# Default: all known toggles except rf_propagation
@ -107,7 +101,7 @@ class DigestAccumulator:
# ---- ingress ----
def enqueue(self, event: Event) -> None:
"""Log an event for the next digest."""
"""SeverityRouter calls this for priority/routine events."""
toggle = get_toggle(event.category) or "other"
# Skip non-included toggles
@ -117,201 +111,348 @@ class DigestAccumulator:
)
return
# Append to the event log
self._events_since_last_digest.setdefault(toggle, []).append(event)
active_for_toggle = self._active.setdefault(toggle, [])
# Resolution detection
if self._is_resolution(event, self._now()):
self._move_to_since_last_by_group(event, toggle)
return
# In-place update if same id
for i, existing in enumerate(active_for_toggle):
if existing.id == event.id:
active_for_toggle[i] = event
self._logger.debug(
f"UPDATED active event {event.id} in {toggle}"
)
return
# Otherwise it's a new active event
active_for_toggle.append(event)
self._logger.debug(
f"LOGGED event {event.id} ({toggle}/{event.category}/{event.severity})"
f"ADDED active event {event.id} ({toggle}/{event.category})"
)
def tick(self, now: Optional[float] = None) -> int:
"""No-op in Phase 2.4+. Returns 0."""
return 0
"""Move expired events from active to since_last.
# ---- rendering ----
async def render_digest(self, now: Optional[float] = None) -> Digest:
"""Produce a Digest with LLM-summarized lines per toggle.
Calls the LLM once per toggle that had activity. Empty toggles
produce no line. Clears the event log after rendering.
Returns the number of events moved.
"""
if now is None:
now = self._now()
moved = 0
for toggle in list(self._active.keys()):
still_active = []
for ev in self._active[toggle]:
if ev.expires is not None and ev.expires <= now:
self._since_last.setdefault(toggle, []).append(ev)
moved += 1
else:
still_active.append(ev)
self._active[toggle] = still_active
return moved
# ---- rendering ----
def render_digest(self, now: Optional[float] = None) -> Digest:
"""Produce a Digest of current state, then clear since_last."""
if now is None:
now = self._now()
# tick() first so expired actives roll into since_last
self.tick(now)
digest = Digest(rendered_at=now)
time_str = time.strftime('%H%M', time.localtime(now))
# Build summary lines per toggle
summary_lines: list[str] = []
for toggle in TOGGLE_ORDER:
events = self._events_since_last_digest.get(toggle, [])
if not events:
continue
if toggle not in self._included:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
summary = await self._summarize_toggle(toggle, events, now)
summary_lines.append(f"[{label}] {summary}")
# Render outputs
if summary_lines:
digest.mesh_chunks = self._render_mesh_chunks(summary_lines, time_str)
digest.full = self._render_full(summary_lines, time_str)
else:
digest.mesh_chunks = [f"DIGEST {time_str}\nNo alerts since last digest."]
digest.full = f"--- {time_str} Digest ---\n\nNo alerts since last digest.\n"
# mesh_compact for backward compatibility
# Defensive: skip non-included toggles when building output
digest.active = {
k: list(v) for k, v in self._active.items()
if v and k in self._included
}
digest.since_last = {
k: list(v) for k, v in self._since_last.items()
if v and k in self._included
}
digest.mesh_chunks = self._render_mesh_chunks(digest, now)
# mesh_compact: join chunks for backward compatibility
if len(digest.mesh_chunks) == 1:
digest.mesh_compact = digest.mesh_chunks[0]
else:
digest.mesh_compact = "\n---\n".join(digest.mesh_chunks)
digest.full = self._render_full(digest, now)
# Clear event log
self._events_since_last_digest.clear()
# Clear since_last; active stays for the next cycle
self._since_last.clear()
self._last_digest_at = now
return digest
async def _summarize_toggle(
def _render_mesh_chunks(self, digest: Digest, now: float) -> list[str]:
"""Produce mesh-radio-friendly compact chunks.
Returns a list of strings, each self._mesh_char_limit chars.
Single-chunk output has no "(1/N)" suffix. Multi-chunk output
has "(k/N)" counters and "(cont)" suffixes on section headers
that span chunks.
"""
time_str = time.strftime('%H%M', time.localtime(now))
# Empty digest case
if not digest.active and not digest.since_last:
return [f"DIGEST {time_str}\nNo alerts since last digest."]
# Build logical lines with section markers
# Each item is (section, line) where section is "active", "resolved", or None
logical_lines: list[tuple[str | None, str]] = []
if digest.active:
logical_lines.append(("active", "ACTIVE NOW"))
for toggle in TOGGLE_ORDER:
events = digest.active.get(toggle)
if not events:
continue
logical_lines.append(("active", self._compact_toggle_line(toggle, events)))
if digest.since_last:
logical_lines.append(("resolved", "RESOLVED"))
for toggle in TOGGLE_ORDER:
events = digest.since_last.get(toggle)
if not events:
continue
logical_lines.append(("resolved", self._compact_toggle_line(toggle, events)))
# Pack lines into chunks
return self._pack_lines_into_chunks(logical_lines, time_str)
def _pack_lines_into_chunks(
self,
toggle: str,
events: list[Event],
now: float,
) -> str:
"""Generate a one-line summary for a toggle's events."""
# Sort by severity (immediate=0, priority=1, routine=2), then timestamp
severity_rank = {"immediate": 0, "priority": 1, "routine": 2}
sorted_events = sorted(
events,
key=lambda e: (severity_rank.get(e.severity, 3), e.timestamp),
)
# Build LLM input
lines = [f"Category: {toggle}", "Events:"]
for ev in sorted_events:
lines.append(self._format_event_for_llm(ev))
llm_input = "\n".join(lines)
# Try LLM summarization
if self._llm is not None:
try:
response = await self._llm.generate(
messages=[{"role": "user", "content": llm_input}],
system_prompt=DIGEST_SYSTEM_PROMPT,
max_tokens=200,
)
# Take first line only
summary = response.strip().split("\n")[0].strip()
if summary:
return summary
except Exception as e:
self._logger.warning(f"LLM summarization failed for {toggle}: {e}")
# Fallback: count-based summary
return f"{len(events)} event(s) (LLM unavailable)"
def _format_event_for_llm(self, event: Event) -> str:
"""Format one event for LLM input."""
ts = datetime.fromtimestamp(event.timestamp)
time_str = ts.strftime("%H:%M")
severity = event.severity.upper()
# Combine title and summary
text = event.title or ""
if event.summary and event.summary != event.title:
if text:
text = f"{text}{event.summary}"
else:
text = event.summary
if not text:
text = event.category
# Truncate long text
if len(text) > 120:
text = text[:117] + "..."
return f"- [{severity} {time_str}] {text}"
def _render_mesh_chunks(
self,
summary_lines: list[str],
logical_lines: list[tuple[str | None, str]],
time_str: str,
) -> list[str]:
"""Pack summary lines into mesh-friendly chunks."""
"""Pack logical lines into chunks respecting char limit.
Args:
logical_lines: List of (section, line) tuples where section
is "active", "resolved", or None for headers.
time_str: Time string for headers (e.g., "0700").
Returns:
List of chunk strings, each self._mesh_char_limit.
"""
if not logical_lines:
return [f"DIGEST {time_str}\nNo alerts since last digest."]
limit = self._mesh_char_limit
chunks: list[list[str]] = []
chunks: list[list[str]] = [] # List of line lists
current_chunk: list[str] = []
current_len = 0
last_section_in_chunk: str | None = None
sections_started: set[str] = set()
# Placeholder header
header = f"DIGEST {time_str}"
# Placeholder header - will be fixed up later
header_placeholder = f"DIGEST {time_str}"
def start_new_chunk():
nonlocal current_chunk, current_len
nonlocal current_chunk, current_len, last_section_in_chunk
if current_chunk:
chunks.append(current_chunk)
current_chunk = [header]
current_len = len(header)
current_chunk = [header_placeholder]
current_len = len(header_placeholder)
last_section_in_chunk = None
start_new_chunk()
for line in summary_lines:
line_len = 1 + len(line) # newline + line
if current_len + line_len > limit:
i = 0
while i < len(logical_lines):
section, line = logical_lines[i]
is_section_header = line in ("ACTIVE NOW", "RESOLVED")
# Check if this is a section header - ensure it has at least one
# toggle line following it in this chunk
if is_section_header:
# Look ahead for the next toggle line
next_toggle_idx = i + 1
if next_toggle_idx < len(logical_lines):
_, next_line = logical_lines[next_toggle_idx]
# Calculate space needed for header + newline + next line
needed = len(line) + 1 + len(next_line)
if current_len + 1 + needed > limit:
# Section header + next line won't fit, start new chunk
start_new_chunk()
sections_started.add(section)
last_section_in_chunk = section
current_chunk.append(line)
current_len += 1 + len(line)
i += 1
continue
# Calculate line length with newline
line_with_newline = 1 + len(line) # newline before line
# Would this line fit?
if current_len + line_with_newline > limit:
# Start new chunk
start_new_chunk()
# If continuing a section, add "(cont)" header
if section and section in sections_started and not is_section_header:
cont_header = "ACTIVE NOW (cont)" if section == "active" else "RESOLVED (cont)"
current_chunk.append(cont_header)
current_len += 1 + len(cont_header)
last_section_in_chunk = section
# Add the line
if is_section_header:
sections_started.add(section)
last_section_in_chunk = section
current_chunk.append(line)
current_len += line_len
current_len += 1 + len(line)
i += 1
# Don't forget the last chunk
if current_chunk and len(current_chunk) > 1:
if current_chunk and len(current_chunk) > 1: # More than just header
chunks.append(current_chunk)
elif current_chunk and len(current_chunk) == 1:
# Only header in chunk - shouldn't happen but handle gracefully
if chunks:
# Merge with previous chunk if possible
pass
else:
chunks.append(current_chunk)
# Fix up headers with chunk counts
total = len(chunks)
total_chunks = len(chunks)
result: list[str] = []
for idx, chunk_lines in enumerate(chunks):
if total == 1:
# Fix header line
if total_chunks == 1:
chunk_lines[0] = f"DIGEST {time_str}"
else:
chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total})"
chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total_chunks})"
result.append("\n".join(chunk_lines))
return result if result else [f"DIGEST {time_str}\nNo alerts since last digest."]
def _render_full(self, summary_lines: list[str], time_str: str) -> str:
"""Produce full multi-line digest for email/webhook."""
def _compact_toggle_line(self, toggle: str, events: list[Event]) -> str:
"""Build one compact line for a toggle: [Label] headline (+N)"""
label = TOGGLE_LABELS.get(toggle, toggle)
sorted_events = self._sort_events(events)
top_event = sorted_events[0]
# Get headline text
headline = top_event.summary or top_event.title or top_event.category
# Truncate headline at ~60 chars to keep lines readable
max_headline = 60
if len(headline) > max_headline:
headline = headline[:max_headline - 1] + ""
# Append (+N) if more than one event
overflow = len(events) - 1
if overflow > 0:
return f"[{label}] {headline} (+{overflow})"
else:
return f"[{label}] {headline}"
def _render_full(self, digest: Digest, now: float) -> str:
"""Produce the full multi-line digest for email/webhook."""
lines = [
f"--- {time_str} Digest ---",
f"--- {time.strftime('%H%M', time.localtime(now))} Digest ---",
"",
]
lines.extend(summary_lines)
lines.append("")
return "\n".join(lines)
if not digest.active and not digest.since_last:
lines.append("No alerts since last digest.")
lines.append("")
else:
if digest.active:
lines.append("ACTIVE NOW:")
for toggle in TOGGLE_ORDER:
events = digest.active.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
if digest.since_last:
lines.append("SINCE LAST DIGEST:")
for toggle in TOGGLE_ORDER:
events = digest.since_last.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def _format_event_line(self, event: Event) -> str:
"""Single-line summary of an event for digest output."""
# Prefer event.summary if set, else fall back to title, then category
text = event.summary or event.title or event.category
# Trim runaway text — keep digest readable
if len(text) > 140:
text = text[:139] + ""
return text
def _sort_events(self, events: list[Event]) -> list[Event]:
"""Sort within a toggle: immediate first, then priority,
then routine, then by timestamp newest first."""
rank = {"immediate": 0, "priority": 1, "routine": 2}
return sorted(
events,
key=lambda e: (rank.get(e.severity, 3), -e.timestamp),
)
# ---- helpers ----
def _is_resolution(self, event: Event, now: float) -> bool:
if event.expires is not None and event.expires <= now:
return True
title_lc = (event.title or "").lower()
return any(marker in title_lc for marker in RESOLUTION_MARKERS)
def _move_to_since_last_by_group(self, event: Event, toggle: str) -> None:
"""Remove any active event matching event's group_key (or id)
and place this resolution event into since_last.
"""
active_list = self._active.get(toggle, [])
# Match by group_key if set, else by id
match_key = event.group_key
if match_key:
self._active[toggle] = [
e for e in active_list
if e.group_key != match_key
]
else:
self._active[toggle] = [
e for e in active_list if e.id != event.id
]
self._since_last.setdefault(toggle, []).append(event)
self._logger.debug(
f"RESOLVED in {toggle}: {event.id} ({event.title!r})"
)
def _now(self) -> float:
return time.time()
# ---- inspection (for tests and scheduler) ----
# ---- inspection (for tests and future scheduler) ----
def event_count(self, toggle: Optional[str] = None) -> int:
"""Count events logged since last digest."""
def active_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._events_since_last_digest.get(toggle, []))
return sum(len(v) for v in self._events_since_last_digest.values())
return len(self._active.get(toggle, []))
return sum(len(v) for v in self._active.values())
def since_last_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._since_last.get(toggle, []))
return sum(len(v) for v in self._since_last.values())
def last_digest_at(self) -> float:
return self._last_digest_at
def clear(self) -> None:
self._events_since_last_digest.clear()
self._active.clear()
self._since_last.clear()
self._last_digest_at = 0.0
# Legacy compatibility — return 0 for old tests
def active_count(self, toggle: Optional[str] = None) -> int:
return 0
def since_last_count(self, toggle: Optional[str] = None) -> int:
return 0

View file

@ -4,15 +4,12 @@ The dispatcher routes immediate-severity events through the existing
NotificationRuleConfig rules and delivers via channels.py. This is the
transitional bridge between the new Event pipeline and the existing
channel implementations.
Phase 2.5a: dispatch() is now async, takes a connector at construction,
and properly awaits channel.deliver(payload, rule).
"""
import logging
from typing import Callable, Optional
from typing import Callable
from meshai.notifications.events import Event, make_payload_from_event
from meshai.notifications.events import Event
class Dispatcher:
@ -20,26 +17,21 @@ class Dispatcher:
SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2}
def __init__(self, config, channel_factory: Callable, connector=None):
def __init__(self, config, channel_factory: Callable):
"""Initialize.
Args:
config: The full Config object (provides config.notifications.rules)
channel_factory: Callable taking (rule, connector) and returning
a NotificationChannel. This is create_channel from
meshai/notifications/channels.py.
connector: MeshConnector instance for mesh channel deliveries.
channel_factory: Callable taking a NotificationRuleConfig and
returning a NotificationChannel. This is create_channel
from meshai/notifications/channels.py.
"""
self._config = config
self._channel_factory = channel_factory
self._connector = connector
self._logger = logging.getLogger("meshai.pipeline.dispatcher")
async def dispatch(self, event: Event) -> None:
"""Deliver an immediate-severity event to all matching channels.
This method is async and awaits each channel.deliver() call.
"""
def dispatch(self, event: Event) -> None:
"""Deliver an immediate-severity event to all matching channels."""
rules = self._matching_rules(event)
if not rules:
self._logger.debug(
@ -48,17 +40,19 @@ class Dispatcher:
return
for rule in rules:
try:
channel = self._channel_factory(rule, self._connector)
payload = make_payload_from_event(event)
success = await channel.deliver(payload, rule)
if success:
self._logger.info(
f"Dispatched event {event.id} via {rule.delivery_type}"
)
else:
self._logger.warning(
f"Channel delivery returned False for rule {rule.name}"
)
channel = self._channel_factory(rule)
alert = {
"category": event.category,
"severity": event.severity,
"message": event.summary or event.title,
"node_id": event.node_ids[0] if event.node_ids else None,
"region": event.region,
"timestamp": event.timestamp,
}
channel.deliver(alert)
self._logger.info(
f"Dispatched event {event.id} via {rule.delivery_type}"
)
except Exception:
self._logger.exception(
f"Channel delivery failed for rule {rule.name}"

View file

@ -13,7 +13,6 @@ from datetime import datetime, timedelta
from typing import Callable, Optional
from meshai.notifications.pipeline.digest import DigestAccumulator
from meshai.notifications.events import NotificationPayload
class DigestScheduler:
@ -24,14 +23,12 @@ class DigestScheduler:
accumulator: DigestAccumulator,
config,
channel_factory: Callable,
connector=None,
clock: Optional[Callable[[], float]] = None,
sleep: Optional[Callable[[float], "asyncio.Future"]] = None,
):
self._accumulator = accumulator
self._config = config
self._channel_factory = channel_factory
self._connector = connector
self._clock = clock or time.time
self._sleep = sleep or asyncio.sleep
self._task: Optional[asyncio.Task] = None
@ -101,8 +98,7 @@ class DigestScheduler:
async def _fire(self, now: float) -> None:
"""Render and deliver one digest."""
self._logger.info(f"Firing digest at {datetime.fromtimestamp(now):%H:%M}")
# render_digest is now async in Phase 2.4+
digest = await self._accumulator.render_digest(now)
digest = self._accumulator.render_digest(now)
self._last_fire_at = now
rules = self._matching_rules()
@ -123,7 +119,7 @@ class DigestScheduler:
async def _deliver_to_rule(self, rule, digest, now: float) -> None:
"""Hand the rendered digest to a channel based on rule.delivery_type."""
channel = self._channel_factory(rule, self._connector)
channel = self._channel_factory(rule)
delivery_type = rule.delivery_type
if delivery_type in ("mesh_broadcast", "mesh_dm"):
@ -131,27 +127,31 @@ class DigestScheduler:
chunks = digest.mesh_chunks
total = len(chunks)
for i, chunk in enumerate(chunks, start=1):
payload = NotificationPayload(
message=chunk,
category="digest",
severity="routine",
timestamp=now,
chunk_index=i,
chunk_total=total,
)
await channel.deliver(payload, rule)
payload = {
"category": "digest",
"severity": "routine",
"message": chunk,
"node_id": None,
"region": None,
"timestamp": now,
"chunk_index": i,
"chunk_total": total,
}
channel.deliver(payload)
self._logger.info(
f"Delivered {total} mesh chunk(s) to rule {rule.name!r}"
)
else:
# Single full-form delivery
payload = NotificationPayload(
message=digest.full,
category="digest",
severity="routine",
timestamp=now,
)
await channel.deliver(payload, rule)
payload = {
"category": "digest",
"severity": "routine",
"message": digest.full,
"node_id": None,
"region": None,
"timestamp": now,
}
channel.deliver(payload)
self._logger.info(
f"Delivered digest to rule {rule.name!r} via {delivery_type}"
)

View file

@ -1,48 +0,0 @@
"""Master toggle filter.
Drops events whose category maps to a toggle that the operator has
disabled. Distinct from DigestAccumulator.include_toggles, which
only affects the digest recap this filter drops events from the
entire pipeline, so disabled toggles produce no live mesh delivery,
no digest entry, nothing.
"""
import logging
from typing import Callable
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
class ToggleFilter:
"""Drop events whose toggle isn't in the enabled set."""
def __init__(
self,
next_handler: Callable[[Event], None],
enabled_toggles: set[str] | None = None,
):
"""Initialize.
Args:
next_handler: Callable that receives non-dropped events.
enabled_toggles: Set of toggle names that are enabled.
If None, all toggles are enabled (filter is a no-op).
"""
self._next = next_handler
self._enabled = enabled_toggles # None = no-op
self._logger = logging.getLogger("meshai.pipeline.toggle_filter")
def handle(self, event: Event) -> None:
"""Pass the event through, or drop it if its toggle is disabled."""
if self._enabled is None:
self._next(event)
return
toggle = get_toggle(event.category) or "other"
if toggle not in self._enabled:
self._logger.debug(
f"DROPPED event {event.id} — toggle {toggle!r} not enabled"
)
return
self._next(event)

View file

@ -1,22 +0,0 @@
"""Channel-type-aware renderers.
Each renderer takes a NotificationPayload and produces a
channel-type-appropriate output: mesh chunks, email subject/body,
or webhook JSON dict.
Renderers are reusable beyond channels.py the future MQTT event
publisher (Phase 2.6.5) uses WebhookRenderer for its on-the-wire
format.
"""
from meshai.notifications.renderers.base import Renderer
from meshai.notifications.renderers.mesh import MeshRenderer
from meshai.notifications.renderers.email import EmailRenderer
from meshai.notifications.renderers.webhook import WebhookRenderer
__all__ = [
"Renderer",
"MeshRenderer",
"EmailRenderer",
"WebhookRenderer",
]

View file

@ -1,28 +0,0 @@
"""Abstract base for channel-type-aware renderers.
Each renderer takes a NotificationPayload and produces a
channel-type-appropriate output (string, list, dict, etc.).
Renderers are pure functions of their input no network, no
state, no side effects beyond logging. The channel that owns a
renderer calls render() and then handles delivery.
"""
from abc import ABC, abstractmethod
from typing import Any
from meshai.notifications.events import NotificationPayload
class Renderer(ABC):
"""Base class for all channel-type renderers."""
@abstractmethod
def render(self, payload: NotificationPayload) -> Any:
"""Produce the channel-type-appropriate output.
Subclasses define the concrete return type:
- MeshRenderer returns list[str]
- EmailRenderer returns dict (subject, body)
- WebhookRenderer returns dict (JSON-serializable)
"""
raise NotImplementedError

View file

@ -1,78 +0,0 @@
"""Email channel renderer.
Produces a dict with subject and body for SMTP delivery. Plain
text body for now; HTML body is a future polish.
Subject format: "[MeshAI] <Severity> — <Event Type>"
Body format: multi-line, with the message as the lead, followed
by structured context fields (severity, region, node, timestamp,
source category).
"""
import logging
from datetime import datetime
from typing import Optional
from meshai.notifications.events import NotificationPayload
from meshai.notifications.renderers.base import Renderer
class EmailRenderer(Renderer):
"""Produce email subject and body for a single payload."""
def __init__(self):
self._logger = logging.getLogger("meshai.renderers.email")
def render(self, payload: NotificationPayload) -> dict:
"""Render the payload as {subject, body}."""
return {
"subject": self._build_subject(payload),
"body": self._build_body(payload),
}
def _build_subject(self, p: NotificationPayload) -> str:
sev = (p.severity or "routine").upper()
type_label = self._type_label(p.event_type) or "Alert"
return f"[MeshAI] {sev}{type_label}"
def _build_body(self, p: NotificationPayload) -> str:
lines: list[str] = []
# Lead line
lines.append(p.message or "(no message)")
lines.append("") # blank separator
# Structured context
lines.append(f"Severity: {p.severity or 'routine'}")
if p.event_type:
lines.append(f"Category: {p.event_type}")
if p.region:
lines.append(f"Region: {p.region}")
if p.node_name:
lines.append(f"Node: {p.node_name}")
elif p.node_id:
lines.append(f"Node: {p.node_id}")
if p.timestamp:
try:
ts = datetime.fromtimestamp(p.timestamp)
lines.append(f"Time: {ts.strftime('%Y-%m-%d %H:%M:%S')}")
except (ValueError, OverflowError):
pass
# Optional source event detail (if present)
if p.source_event is not None:
ev = p.source_event
if hasattr(ev, "source") and ev.source:
lines.append(f"Source: {ev.source}")
if hasattr(ev, "title") and ev.title and ev.title != p.message:
lines.append(f"Title: {ev.title}")
lines.append("")
lines.append("--")
lines.append("MeshAI notification")
return "\n".join(lines)
def _type_label(self, event_type: Optional[str]) -> Optional[str]:
"""Title-cased label for the event type (for subject line)."""
if not event_type:
return None
return event_type.replace("_", " ").title()

View file

@ -1,127 +0,0 @@
"""Mesh channel renderer.
Produces a list of short strings (each 200 chars by default)
suitable for mesh radio broadcast. Reuses the digest's chunking
pattern: never split a single "line" across chunks; add (k/N)
counters when the rendered output produces more than one chunk.
The mesh renderer is symmetric with the digest accumulator's
mesh chunk renderer same chunk-packing algorithm, same char
limit semantics.
"""
import logging
from typing import Optional
from meshai.notifications.events import NotificationPayload
from meshai.notifications.renderers.base import Renderer
class MeshRenderer(Renderer):
"""Produce mesh-compact chunks for a single payload."""
def __init__(self, char_limit: int = 200):
self._limit = char_limit
self._logger = logging.getLogger("meshai.renderers.mesh")
def render(self, payload: NotificationPayload) -> list[str]:
"""Render the payload as 1+ mesh-compact chunks.
Algorithm:
- Build the full message line (event_type, severity hint,
and message text see _format_one_line)
- If the line fits in self._limit chars, return [line].
- If the line exceeds self._limit, split across multiple
chunks at word boundaries when possible, and add
"(k/N)" counters at the start of each chunk.
- If a single word exceeds the chunk limit, hard-split
mid-word (rare long URLs etc.)
"""
line = self._format_one_line(payload)
if len(line) <= self._limit:
return [line]
return self._chunk_long_line(line)
def _format_one_line(self, p: NotificationPayload) -> str:
"""Build the headline for a payload.
Default format:
"[<EventTypeTitle>] <message>"
where EventTypeTitle is a short label derived from
p.event_type (e.g. "weather_warning" "Weather"). If
p.event_type is None, omit the prefix.
Truncates the message at the limit only if the prefix
is short enough; otherwise lets the chunker handle it.
"""
prefix = self._toggle_label(p.event_type)
if prefix:
return f"[{prefix}] {p.message}"
return p.message
def _toggle_label(self, event_type: Optional[str]) -> Optional[str]:
"""Map an event category to a short toggle label.
Looks up the toggle the category belongs to and returns
the toggle's display label. If unknown, returns None
(no prefix added).
"""
if not event_type:
return None
from meshai.notifications.categories import get_toggle
toggle = get_toggle(event_type)
if not toggle:
return None
# Same label set used by the digest renderer
TOGGLE_LABELS = {
"mesh_health": "Mesh",
"weather": "Weather",
"fire": "Fire",
"rf_propagation": "RF",
"roads": "Roads",
"avalanche": "Avalanche",
"seismic": "Seismic",
"tracking": "Tracking",
"other": "Other",
}
return TOGGLE_LABELS.get(toggle, toggle.title())
def _chunk_long_line(self, line: str) -> list[str]:
"""Split a long line into chunks ≤ self._limit each.
Reserves ~10 chars per chunk for the "(k/N)" counter
suffix. Splits at word boundaries; falls back to
mid-word split if a single word exceeds the budget.
"""
# Reserve space for " (k/N)" — generous to 10 chars
body_budget = self._limit - 10
if body_budget <= 0:
body_budget = self._limit # extreme small limit; ignore counter
words = line.split(" ")
chunks: list[str] = []
current = ""
for word in words:
# Hard-split words longer than body_budget (rare)
while len(word) > body_budget:
if current:
chunks.append(current)
current = ""
chunks.append(word[:body_budget])
word = word[body_budget:]
# Add word to current chunk if it fits
tentative = (current + " " + word) if current else word
if len(tentative) <= body_budget:
current = tentative
else:
chunks.append(current)
current = word
if current:
chunks.append(current)
# Add counters: "DIGEST"-style "(k/N)" suffix
total = len(chunks)
if total == 1:
return chunks
return [f"{chunk} ({i+1}/{total})" for i, chunk in enumerate(chunks)]

View file

@ -1,67 +0,0 @@
"""Webhook channel renderer.
Produces a JSON-serializable dict with stable field names. Also
intended for reuse by the future MQTT event publisher (Phase
2.6.5), which wants the same structured shape on the wire.
Field names use snake_case. Optional fields are omitted from the
output when None, NOT set to null keeps payloads compact and
avoids ambiguity between "field absent" and "field explicitly
null".
"""
import logging
from typing import Any
from meshai.notifications.events import NotificationPayload
from meshai.notifications.renderers.base import Renderer
# Schema version for the wire format. Bump when making
# breaking changes to the field shape. External consumers
# can use this to handle multiple versions.
WEBHOOK_SCHEMA_VERSION = "1.0"
class WebhookRenderer(Renderer):
"""Produce a JSON-serializable dict for a single payload."""
def __init__(self):
self._logger = logging.getLogger("meshai.renderers.webhook")
def render(self, payload: NotificationPayload) -> dict:
"""Render the payload as a structured dict."""
out: dict[str, Any] = {
"schema_version": WEBHOOK_SCHEMA_VERSION,
"message": payload.message,
"severity": payload.severity or "routine",
"timestamp": payload.timestamp,
}
# Optional fields — omit if None
for src_attr, dst_key in (
("category", "category"),
("event_type", "event_type"),
("node_id", "node_id"),
("node_name", "node_name"),
("region", "region"),
("chunk_index", "chunk_index"),
("chunk_total", "chunk_total"),
):
value = getattr(payload, src_attr, None)
if value is not None:
out[dst_key] = value
# Optional source event detail
if payload.source_event is not None:
ev = payload.source_event
source_event: dict[str, Any] = {}
for attr in ("id", "source", "title", "expires", "group_key"):
if hasattr(ev, attr):
value = getattr(ev, attr)
if value is not None:
source_event[attr] = value
if source_event:
out["source_event"] = source_event
return out

View file

@ -8,8 +8,7 @@ import time
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from .channels import create_channel_from_dict, NotificationChannel
from .events import NotificationPayload
from .channels import create_channel, NotificationChannel
from .summarizer import MessageSummarizer
if TYPE_CHECKING:
@ -143,7 +142,7 @@ class NotificationRouter:
return None
try:
return create_channel_from_dict(config, self._connector)
return create_channel(config, self._connector)
except Exception as e:
logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e)
return None
@ -200,20 +199,7 @@ class NotificationRouter:
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
# Convert dict to NotificationPayload for channel interface
payload = NotificationPayload(
message=delivery_alert.get("message", ""),
category=delivery_alert.get("type", "unknown"),
severity=delivery_alert.get("severity", "routine"),
timestamp=delivery_alert.get("timestamp", time.time()),
node_id=delivery_alert.get("node_id"),
node_name=delivery_alert.get("node_name"),
region=delivery_alert.get("region"),
event_type=delivery_alert.get("type"),
)
# Rule is a dict here; channels don't use it so we pass None
# for the rule parameter (channels ignore it anyway)
success = await channel.deliver(payload, None)
success = await channel.deliver(delivery_alert, rule)
if success:
delivered = True
self._record_fire(rule_name)
@ -269,7 +255,7 @@ class NotificationRouter:
{success, message, error, details}
"""
try:
channel = create_channel_from_dict(channel_config, self._connector)
channel = create_channel(channel_config, self._connector)
return await channel.test_connection()
except ValueError as e:
return {

View file

@ -1,212 +0,0 @@
"""Tests for FIRMS adapter Phase 2.6 — to_event() method."""
import time
from unittest.mock import MagicMock
import pytest
from meshai.env.firms import FIRMSAdapter
from meshai.notifications.events import Event
# ============================================================
# FIXTURES
# ============================================================
@pytest.fixture
def mock_config():
"""Create a mock FIRMSConfig."""
config = MagicMock()
config.map_key = "test-key"
config.source = "VIIRS_SNPP_NRT"
config.bbox = [-117, 42, -114, 44]
config.day_range = 1
config.tick_seconds = 1800
config.confidence_min = "nominal"
config.proximity_km = 10.0
return config
@pytest.fixture
def adapter(mock_config):
"""Create a FIRMSAdapter with mocked dependencies."""
return FIRMSAdapter(mock_config, region_anchors=[], fires_adapter=None)
def make_firms_event(
lat=42.5,
lon=-114.5,
new_ignition=False,
severity="routine",
headline="Test Hotspot",
frp=None,
confidence="n",
distance_km=None,
nearest_anchor=None,
near_fire=None,
):
"""Helper to create a FIRMS event dict."""
now = time.time()
return {
"source": "firms",
"event_id": f"firms_{lat:.4f}_{lon:.4f}_2026-05-15_1200",
"event_type": "Fire Hotspot",
"severity": severity,
"headline": headline,
"lat": lat,
"lon": lon,
"expires": now + 21600,
"fetched_at": now,
"properties": {
"new_ignition": new_ignition,
"confidence": confidence,
"frp": frp,
"brightness": 350.0,
"acq_date": "2026-05-15",
"acq_time": "1200",
"near_fire": near_fire,
"distance_to_fire_km": 5.0 if near_fire else None,
"distance_km": distance_km,
"nearest_anchor": nearest_anchor,
},
}
# ============================================================
# CATEGORY DECISION TESTS
# ============================================================
def test_to_event_new_ignition(adapter):
"""New ignition maps to new_ignition category."""
evt = make_firms_event(new_ignition=True)
event = adapter.to_event(evt)
assert event is not None
assert event.category == "new_ignition"
def test_to_event_near_known_fire(adapter):
"""Hotspot near known fire maps to wildfire_proximity."""
evt = make_firms_event(new_ignition=False, near_fire="Snake River Fire")
event = adapter.to_event(evt)
assert event is not None
assert event.category == "wildfire_proximity"
# ============================================================
# SEVERITY PASS-THROUGH TESTS
# ============================================================
def test_to_event_severity_passes_through(adapter):
"""Severity from FIRMS event passes through unchanged."""
for sev in ["routine", "priority", "immediate"]:
evt = make_firms_event(severity=sev)
event = adapter.to_event(evt)
assert event is not None
assert event.severity == sev
# ============================================================
# CONTENT TESTS
# ============================================================
def test_to_event_summary_includes_frp(adapter):
"""Summary includes FRP when present."""
evt = make_firms_event(frp=85.5)
event = adapter.to_event(evt)
assert event is not None
assert "FRP 85" in event.summary
def test_to_event_summary_handles_missing_frp(adapter):
"""Missing FRP doesn't break to_event."""
evt = make_firms_event(frp=None)
event = adapter.to_event(evt)
assert event is not None
assert "FRP" not in event.summary
def test_to_event_summary_includes_distance_when_present(adapter):
"""Summary includes distance and anchor when present."""
evt = make_firms_event(distance_km=12, nearest_anchor="TFL")
event = adapter.to_event(evt)
assert event is not None
assert "12 km" in event.summary
assert "TFL" in event.summary
def test_to_event_region_uses_nearest_anchor(adapter):
"""Region is set from nearest_anchor."""
evt = make_firms_event(nearest_anchor="MHR")
event = adapter.to_event(evt)
assert event is not None
assert event.region == "MHR"
# ============================================================
# SPATIAL KEY TESTS
# ============================================================
def test_to_event_group_key_is_spatial_grid(adapter):
"""Group key is spatial grid based on rounded lat/lon."""
evt = make_firms_event(lat=42.5678, lon=-114.3456)
event = adapter.to_event(evt)
assert event is not None
assert event.group_key == "firms:42.57:-114.35"
def test_to_event_inhibit_keys_match_group_key(adapter):
"""Inhibit keys contain the same spatial key as group_key."""
evt = make_firms_event(lat=42.5678, lon=-114.3456)
event = adapter.to_event(evt)
assert event is not None
assert event.group_key in event.inhibit_keys
def test_two_nearby_detections_share_group_key(adapter):
"""Two detections in same grid cell share group_key."""
# Both round to 42.57:-114.35
evt1 = make_firms_event(lat=42.571, lon=-114.351)
evt2 = make_firms_event(lat=42.572, lon=-114.352)
event1 = adapter.to_event(evt1)
event2 = adapter.to_event(evt2)
assert event1 is not None
assert event2 is not None
assert event1.group_key == event2.group_key
# ============================================================
# DEFENSIVE TESTS
# ============================================================
def test_to_event_missing_coords_returns_none(adapter):
"""Missing coordinates returns None."""
evt = make_firms_event()
evt["lat"] = None
event = adapter.to_event(evt)
assert event is None
def test_to_event_missing_properties_returns_event(adapter):
"""Missing properties dict defaults to wildfire_proximity."""
evt = {
"source": "firms",
"event_id": "test",
"event_type": "Fire Hotspot",
"severity": "routine",
"headline": "Test",
"lat": 42.5,
"lon": -114.5,
"fetched_at": time.time(),
}
# No "properties" key at all
event = adapter.to_event(evt)
assert event is not None
assert event.category == "wildfire_proximity"
def test_to_event_does_not_raise_on_corrupted_dict(adapter):
"""Corrupted dict returns None without raising."""
evt = {"garbage": True}
# Should not raise
event = adapter.to_event(evt)
assert event is None

View file

@ -1,277 +0,0 @@
"""Tests for NWS adapter Phase 2.6 — to_event() and _derive_category()."""
import time
from unittest.mock import MagicMock, patch
import pytest
from meshai.env.nws import NWSAlertsAdapter
from meshai.notifications.events import Event
# ============================================================
# FIXTURES
# ============================================================
@pytest.fixture
def mock_config():
"""Create a mock NWSConfig."""
config = MagicMock()
config.areas = ["ID"]
config.user_agent = "(test, test@example.com)"
config.severity_min = "moderate"
config.tick_seconds = 60
return config
@pytest.fixture
def adapter(mock_config):
"""Create an NWSAlertsAdapter with mocked config."""
return NWSAlertsAdapter(mock_config)
# ============================================================
# _derive_category TESTS
# ============================================================
def test_derive_category_warning(adapter):
"""Warning suffix maps to weather_warning."""
assert adapter._derive_category("Tornado Warning") == "weather_warning"
assert adapter._derive_category("Red Flag Warning") == "weather_warning"
assert adapter._derive_category("Winter Storm Warning") == "weather_warning"
def test_derive_category_watch(adapter):
"""Watch suffix maps to weather_watch."""
assert adapter._derive_category("Tornado Watch") == "weather_watch"
assert adapter._derive_category("Winter Storm Watch") == "weather_watch"
assert adapter._derive_category("Fire Weather Watch") == "weather_watch"
def test_derive_category_advisory(adapter):
"""Advisory suffix maps to weather_advisory."""
assert adapter._derive_category("Wind Advisory") == "weather_advisory"
assert adapter._derive_category("Heat Advisory") == "weather_advisory"
assert adapter._derive_category("Frost Advisory") == "weather_advisory"
def test_derive_category_statement(adapter):
"""Non-standard suffixes map to weather_statement."""
assert adapter._derive_category("Special Weather Statement") == "weather_statement"
assert adapter._derive_category("Short Term Forecast") == "weather_statement"
assert adapter._derive_category("Hazardous Weather Outlook") == "weather_statement"
def test_derive_category_case_insensitive(adapter):
"""Category derivation is case-insensitive."""
assert adapter._derive_category("TORNADO WARNING") == "weather_warning"
assert adapter._derive_category("winter storm watch") == "weather_watch"
assert adapter._derive_category("Wind ADVISORY") == "weather_advisory"
# ============================================================
# to_event TESTS
# ============================================================
def test_to_event_returns_event_instance(adapter):
"""to_event returns an Event instance."""
raw = {
"source": "nws",
"event_id": "urn:oid:2.49.0.1.840.0.abc123",
"event_type": "Tornado Warning",
"severity": "extreme",
"headline": "Tornado Warning issued for Ada County",
"description": "A tornado warning has been issued...",
"onset": time.time(),
"expires": time.time() + 3600,
"areas": ["IDZ016"],
"lat": 43.615,
"lon": -116.2023,
}
event = adapter.to_event(raw)
assert isinstance(event, Event)
def test_to_event_sets_source_nws(adapter):
"""to_event sets source to 'nws'."""
raw = {
"source": "nws",
"event_id": "test-id",
"event_type": "Wind Advisory",
"severity": "moderate",
"headline": "Wind Advisory",
}
event = adapter.to_event(raw)
assert event.source == "nws"
def test_to_event_derives_category_from_event_type(adapter):
"""to_event uses _derive_category for category field."""
raw = {
"event_id": "test",
"event_type": "Winter Storm Watch",
"severity": "severe",
"headline": "Winter Storm Watch",
}
event = adapter.to_event(raw)
assert event.category == "weather_watch"
def test_to_event_maps_severity(adapter):
"""to_event maps NWS severity to 3-level system."""
# Extreme -> immediate
raw = {"event_id": "1", "event_type": "Tornado Warning", "severity": "extreme", "headline": "test"}
assert adapter.to_event(raw).severity == "immediate"
# Severe -> priority
raw = {"event_id": "2", "event_type": "Tornado Warning", "severity": "severe", "headline": "test"}
assert adapter.to_event(raw).severity == "priority"
# Moderate -> routine
raw = {"event_id": "3", "event_type": "Wind Advisory", "severity": "moderate", "headline": "test"}
assert adapter.to_event(raw).severity == "routine"
def test_to_event_sets_title_from_headline(adapter):
"""to_event uses headline as title."""
raw = {
"event_id": "test",
"event_type": "Heat Advisory",
"severity": "moderate",
"headline": "Heat Advisory issued for Magic Valley",
}
event = adapter.to_event(raw)
assert event.title == "Heat Advisory issued for Magic Valley"
def test_to_event_sets_body_from_description(adapter):
"""to_event uses description as body."""
raw = {
"event_id": "test",
"event_type": "Heat Advisory",
"severity": "moderate",
"headline": "Heat Advisory",
"description": "Dangerously hot conditions expected...",
}
event = adapter.to_event(raw)
assert event.body == "Dangerously hot conditions expected..."
def test_to_event_sets_effective_and_expires(adapter):
"""to_event sets effective and expires from onset/expires."""
onset = time.time()
expires = time.time() + 7200
raw = {
"event_id": "test",
"event_type": "Wind Advisory",
"severity": "moderate",
"headline": "Wind Advisory",
"onset": onset,
"expires": expires,
}
event = adapter.to_event(raw)
assert event.effective == onset
assert event.expires == expires
def test_to_event_sets_lat_lon(adapter):
"""to_event sets lat/lon from raw event."""
raw = {
"event_id": "test",
"event_type": "Tornado Warning",
"severity": "extreme",
"headline": "Tornado Warning",
"lat": 43.615,
"lon": -116.2023,
}
event = adapter.to_event(raw)
assert event.lat == 43.615
assert event.lon == -116.2023
def test_to_event_sets_nws_zones(adapter):
"""to_event sets nws_zones from areas."""
raw = {
"event_id": "test",
"event_type": "Red Flag Warning",
"severity": "severe",
"headline": "Red Flag Warning",
"areas": ["IDZ016", "IDZ030", "IDZ031"],
}
event = adapter.to_event(raw)
assert event.nws_zones == ["IDZ016", "IDZ030", "IDZ031"]
def test_to_event_sets_group_key_from_event_id(adapter):
"""to_event sets group_key to event_id for dedup."""
raw = {
"event_id": "urn:oid:2.49.0.1.840.0.abc123",
"event_type": "Tornado Warning",
"severity": "extreme",
"headline": "Tornado Warning",
}
event = adapter.to_event(raw)
assert event.group_key == "urn:oid:2.49.0.1.840.0.abc123"
def test_to_event_warning_sets_inhibit_keys(adapter):
"""Warnings set inhibit_keys for corresponding Watch/Advisory."""
raw = {
"event_id": "test",
"event_type": "Winter Storm Warning",
"severity": "severe",
"headline": "Winter Storm Warning",
}
event = adapter.to_event(raw)
assert "nws:Winter Storm Watch" in event.inhibit_keys
assert "nws:Winter Storm Advisory" in event.inhibit_keys
def test_to_event_watch_no_inhibit_keys(adapter):
"""Watches do not set inhibit_keys."""
raw = {
"event_id": "test",
"event_type": "Tornado Watch",
"severity": "moderate",
"headline": "Tornado Watch",
}
event = adapter.to_event(raw)
assert event.inhibit_keys == []
def test_to_event_preserves_raw_in_data(adapter):
"""to_event preserves raw event dict in data field."""
raw = {
"event_id": "test-123",
"event_type": "Wind Advisory",
"severity": "moderate",
"headline": "Wind Advisory",
"custom_field": "custom_value",
}
event = adapter.to_event(raw)
assert event.data == raw
assert event.data["custom_field"] == "custom_value"
# ============================================================
# INTEGRATION: _map_nws_severity
# ============================================================
def test_map_nws_severity_extreme_to_immediate(adapter):
"""Extreme NWS severity maps to immediate."""
assert adapter._map_nws_severity("extreme") == "immediate"
def test_map_nws_severity_severe_to_priority(adapter):
"""Severe NWS severity maps to priority."""
assert adapter._map_nws_severity("severe") == "priority"
def test_map_nws_severity_moderate_to_routine(adapter):
"""Moderate NWS severity maps to routine."""
assert adapter._map_nws_severity("moderate") == "routine"
def test_map_nws_severity_minor_to_routine(adapter):
"""Minor NWS severity maps to routine."""
assert adapter._map_nws_severity("minor") == "routine"

View file

@ -1,214 +0,0 @@
"""Tests for channel-renderer integration (Phase 2.5b)."""
import asyncio
import time
from unittest.mock import MagicMock, AsyncMock, patch
import pytest
from meshai.notifications.events import NotificationPayload
from meshai.notifications.channels import (
MeshBroadcastChannel,
MeshDMChannel,
EmailChannel,
WebhookChannel,
)
# ============================================================
# MESH CHANNEL RENDERING TESTS
# ============================================================
def test_mesh_channel_uses_mesh_renderer():
"""MeshBroadcastChannel renders long messages to multiple chunks."""
mock_connector = MagicMock()
channel = MeshBroadcastChannel(
connector=mock_connector,
channel_index=0,
)
# Build a long message that will require chunking
long_message = "This is a very long alert message that exceeds the character limit. " * 5
payload = NotificationPayload(
message=long_message,
category="weather_warning",
severity="priority",
timestamp=time.time(),
event_type="weather_warning",
)
asyncio.run(channel.deliver(payload, None))
# Should have called send_message multiple times (once per chunk)
assert mock_connector.send_message.call_count >= 2
# Each call's text should be <= 200 chars
for call in mock_connector.send_message.call_args_list:
text = call.kwargs.get("text", call.args[0] if call.args else "")
assert len(text) <= 200
def test_mesh_channel_uses_payload_message_directly_when_chunk_metadata_set():
"""Pre-chunked payloads (from digest) skip re-rendering."""
mock_connector = MagicMock()
channel = MeshBroadcastChannel(
connector=mock_connector,
channel_index=0,
)
# Payload with chunk metadata set (from digest scheduler)
payload = NotificationPayload(
message="pre-chunked text",
category="digest",
severity="routine",
timestamp=time.time(),
chunk_index=1,
chunk_total=3,
)
asyncio.run(channel.deliver(payload, None))
# Should have called send_message exactly once
assert mock_connector.send_message.call_count == 1
# Should use the message directly
call = mock_connector.send_message.call_args
text = call.kwargs.get("text", call.args[0] if call.args else "")
assert text == "pre-chunked text"
def test_mesh_dm_channel_uses_mesh_renderer():
"""MeshDMChannel renders long messages to chunks for each recipient."""
mock_connector = MagicMock()
channel = MeshDMChannel(
connector=mock_connector,
node_ids=["!node1", "!node2"],
)
long_message = "This is a long DM message that should be chunked. " * 4
payload = NotificationPayload(
message=long_message,
category="test",
severity="routine",
timestamp=time.time(),
)
asyncio.run(channel.deliver(payload, None))
# Should have called send_message multiple times
# (chunks * nodes)
assert mock_connector.send_message.call_count >= 2
def test_mesh_dm_channel_uses_payload_message_directly_when_chunk_metadata_set():
"""Pre-chunked DM payloads skip re-rendering."""
mock_connector = MagicMock()
channel = MeshDMChannel(
connector=mock_connector,
node_ids=["!node1"],
)
payload = NotificationPayload(
message="pre-chunked DM",
category="digest",
severity="routine",
timestamp=time.time(),
chunk_index=2,
chunk_total=5,
)
asyncio.run(channel.deliver(payload, None))
# Should use message directly, once per node
assert mock_connector.send_message.call_count == 1
call = mock_connector.send_message.call_args
text = call.kwargs.get("text", call.args[0] if call.args else "")
assert text == "pre-chunked DM"
# ============================================================
# EMAIL CHANNEL RENDERING TESTS
# ============================================================
def test_email_channel_uses_email_renderer():
"""EmailChannel uses renderer for subject and body."""
channel = EmailChannel(
smtp_host="localhost",
smtp_port=25,
smtp_user="",
smtp_password="",
smtp_tls=False,
from_address="test@example.com",
recipients=["user@example.com"],
)
payload = NotificationPayload(
message="Test alert message",
category="weather_warning",
severity="immediate",
timestamp=time.time(),
event_type="weather_warning",
)
# Mock the _send_email method
with patch.object(channel, "_send_email") as mock_send:
asyncio.run(channel.deliver(payload, None))
# Should have been called with renderer output
mock_send.assert_called_once()
call_args = mock_send.call_args
subject = call_args.args[0]
body = call_args.args[1]
# Renderer format checks
assert "[MeshAI]" in subject
assert "IMMEDIATE" in subject
assert "Test alert message" in body
assert "Severity:" in body
# ============================================================
# WEBHOOK CHANNEL RENDERING TESTS
# ============================================================
def test_webhook_channel_uses_webhook_renderer():
"""WebhookChannel uses renderer for JSON payload."""
channel = WebhookChannel(
url="https://example.com/webhook",
headers={},
)
payload = NotificationPayload(
message="Test webhook message",
category="test",
severity="priority",
timestamp=time.time(),
event_type="battery_warning",
)
# Mock httpx
with patch("meshai.notifications.channels.httpx.AsyncClient") as mock_client_class:
mock_client = MagicMock()
mock_response = MagicMock()
mock_response.status_code = 200
mock_client.post = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=None)
mock_client_class.return_value = mock_client
asyncio.run(channel.deliver(payload, None))
# Check the POST was called
mock_client.post.assert_called_once()
call_kwargs = mock_client.post.call_args.kwargs
# Should have JSON payload with schema_version
json_payload = call_kwargs.get("json", {})
assert "schema_version" in json_payload
assert json_payload["schema_version"] == "1.0"
assert json_payload["message"] == "Test webhook message"

File diff suppressed because it is too large Load diff

View file

@ -1,9 +1,6 @@
"""Tests for DigestScheduler (Phase 2.3b + 2.4).
"""Tests for DigestScheduler (Phase 2.3b).
Uses asyncio.run() since pytest-asyncio is not available in the container.
Updated in Phase 2.4: render_digest is now async, accumulator mocks
must return awaitables.
"""
import asyncio
@ -11,12 +8,12 @@ import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
from unittest.mock import MagicMock, AsyncMock, call
from unittest.mock import MagicMock, call
import pytest
from meshai.notifications.events import make_event
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
from meshai.notifications.pipeline.digest import DigestAccumulator
from meshai.notifications.pipeline.scheduler import DigestScheduler
@ -60,15 +57,8 @@ class MockChannel:
def __init__(self):
self.deliveries = []
async def deliver(self, payload, rule=None):
def deliver(self, payload: dict):
self.deliveries.append(payload)
return True
class MockLLMBackend:
"""Mock LLM backend for accumulator."""
async def generate(self, messages, system_prompt, max_tokens=200):
return "Mock summary."
def make_scheduler(
@ -94,14 +84,13 @@ def make_scheduler(
channels = {}
def channel_factory(rule, connector=None):
def channel_factory(rule):
ch = MockChannel()
channels[rule.name] = ch
return ch
if accumulator is None:
# Use mock LLM backend for async render_digest
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
accumulator = DigestAccumulator()
scheduler = DigestScheduler(
accumulator=accumulator,
@ -135,31 +124,37 @@ class TestScheduleComputation:
def test_parse_schedule_invalid_falls_back(self):
"""Invalid schedules fall back to 07:00."""
scheduler, _, _ = make_scheduler()
# Bad format
assert scheduler._parse_schedule("7:00:00") == (7, 0)
assert scheduler._parse_schedule("invalid") == (7, 0)
assert scheduler._parse_schedule("") == (7, 0)
# Out of range
assert scheduler._parse_schedule("25:00") == (7, 0)
assert scheduler._parse_schedule("12:60") == (7, 0)
def test_next_fire_at_future_today(self):
"""If schedule time is later today, returns today's timestamp."""
# Set clock to 06:00 on a known date
base_dt = datetime(2024, 6, 15, 6, 0, 0)
base_ts = base_dt.timestamp()
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
next_fire = scheduler._next_fire_at(base_ts)
# Should be 07:00 same day
expected_dt = datetime(2024, 6, 15, 7, 0, 0)
assert abs(next_fire - expected_dt.timestamp()) < 1
def test_next_fire_at_past_today_schedules_tomorrow(self):
"""If schedule time has passed today, returns tomorrow's timestamp."""
# Set clock to 08:00 on a known date
base_dt = datetime(2024, 6, 15, 8, 0, 0)
base_ts = base_dt.timestamp()
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
next_fire = scheduler._next_fire_at(base_ts)
# Should be 07:00 next day
expected_dt = datetime(2024, 6, 16, 7, 0, 0)
assert abs(next_fire - expected_dt.timestamp()) < 1
@ -171,6 +166,7 @@ class TestScheduleComputation:
scheduler, _, _ = make_scheduler(schedule="07:00", clock=lambda: base_ts)
next_fire = scheduler._next_fire_at(base_ts)
# Should be 07:00 next day
expected_dt = datetime(2024, 6, 16, 7, 0, 0)
assert abs(next_fire - expected_dt.timestamp()) < 1
@ -185,7 +181,7 @@ class TestScheduleComputation:
config.notifications.digest = None
scheduler = DigestScheduler(
accumulator=DigestAccumulator(llm_backend=MockLLMBackend()),
accumulator=DigestAccumulator(),
config=config,
channel_factory=lambda r: MockChannel(),
)
@ -199,7 +195,8 @@ class TestFireBehavior:
def test_fire_delivers_to_matching_rule(self):
"""_fire() delivers digest to rules with schedule_match='digest'."""
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
accumulator = DigestAccumulator()
# Add an event so digest has content
accumulator.enqueue(make_event(
source="test",
category="weather_warning",
@ -224,8 +221,9 @@ class TestFireBehavior:
ch = channels["digest-mesh"]
assert len(ch.deliveries) == 1
payload = ch.deliveries[0]
assert payload.category == "digest"
assert payload.severity == "routine"
assert payload["category"] == "digest"
assert payload["severity"] == "routine"
assert "Test alert" in payload["message"] or "Weather" in payload["message"]
def test_fire_skips_disabled_rules(self):
"""Disabled rules are not delivered to."""
@ -238,6 +236,7 @@ class TestFireBehavior:
asyncio.run(run_fire())
# Channel should not be created for disabled rule
assert "disabled" not in channels
def test_fire_skips_non_schedule_rules(self):
@ -266,10 +265,8 @@ class TestFireBehavior:
def test_fire_mesh_delivery_chunks(self):
"""Mesh delivery types get per-chunk delivery."""
accumulator = DigestAccumulator(
llm_backend=MockLLMBackend(),
mesh_char_limit=100,
)
accumulator = DigestAccumulator(mesh_char_limit=100)
# Add multiple events to force chunking
for i in range(5):
accumulator.enqueue(make_event(
source="test",
@ -292,14 +289,16 @@ class TestFireBehavior:
asyncio.run(run_fire())
ch = channels["mesh"]
# Should have multiple deliveries (one per chunk)
assert len(ch.deliveries) >= 1
# Check chunk metadata
for payload in ch.deliveries:
assert payload.chunk_index is not None
assert payload.chunk_total is not None
assert "chunk_index" in payload
assert "chunk_total" in payload
def test_fire_email_delivery_full_text(self):
"""Email delivery type gets single full-text delivery."""
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
accumulator = DigestAccumulator()
accumulator.enqueue(make_event(
source="test",
category="weather_warning",
@ -321,8 +320,8 @@ class TestFireBehavior:
ch = channels["email"]
assert len(ch.deliveries) == 1
payload = ch.deliveries[0]
assert payload.chunk_index is None
assert "--- " in payload.message
assert "chunk_index" not in payload
assert "--- " in payload["message"] # Full format has header
def test_fire_updates_last_fire_at(self):
"""_fire() updates last_fire_at timestamp."""
@ -351,7 +350,7 @@ class TestFireBehavior:
ch = channels["mesh"]
assert len(ch.deliveries) == 1
assert "No alerts" in ch.deliveries[0].message
assert "No alerts" in ch.deliveries[0]["message"]
# ---- Lifecycle Tests ----
@ -403,7 +402,9 @@ class TestLifecycle:
scheduler, _, _ = make_scheduler()
async def run_stop():
# Never started
await scheduler.stop()
# Should not raise
asyncio.run(run_stop())
@ -413,8 +414,10 @@ class TestLifecycle:
async def fake_sleep(duration):
sleep_calls.append(duration)
# Actually sleep briefly so we can cancel
await asyncio.sleep(0.01)
# Set clock far from schedule time to get long sleep
base_dt = datetime(2024, 6, 15, 8, 0, 0)
scheduler, _, _ = make_scheduler(
schedule="07:00",
@ -424,11 +427,14 @@ class TestLifecycle:
async def run_test():
await scheduler.start()
# Give task time to enter sleep
await asyncio.sleep(0.05)
await scheduler.stop()
asyncio.run(run_test())
# Task should have exited cleanly
# ---- Integration Tests ----
@ -438,8 +444,9 @@ class TestIntegration:
def test_scheduler_fires_on_schedule(self):
"""Scheduler fires when schedule time arrives."""
fire_times = []
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
accumulator = DigestAccumulator()
# Start at 06:59:59.95 (50ms before 07:00), delay will be ~50ms
clock_time = [datetime(2024, 6, 15, 6, 59, 59, 950000).timestamp()]
def fake_clock():
@ -451,27 +458,31 @@ class TestIntegration:
accumulator=accumulator,
)
# Track when fire happens
original_fire = scheduler._fire
async def tracking_fire(now):
fire_times.append(now)
await original_fire(now)
# After first fire, advance clock so next cycle has long delay
clock_time[0] = datetime(2024, 6, 15, 8, 0, 0).timestamp()
scheduler._fire = tracking_fire
async def run_test():
await scheduler.start()
# Wait for the ~50ms delay plus some buffer
await asyncio.sleep(0.2)
await scheduler.stop()
asyncio.run(run_test())
# Should have fired once
assert len(fire_times) >= 1
def test_scheduler_multiple_rules(self):
"""Scheduler delivers to multiple matching rules."""
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
accumulator = DigestAccumulator()
accumulator.enqueue(make_event(
source="test",
category="weather_warning",
@ -496,6 +507,7 @@ class TestIntegration:
asyncio.run(run_fire())
# All three should have received deliveries
assert "mesh1" in channels
assert "mesh2" in channels
assert "email" in channels
@ -505,7 +517,7 @@ class TestIntegration:
def test_scheduler_handles_delivery_error(self):
"""Scheduler continues after delivery error."""
accumulator = DigestAccumulator(llm_backend=MockLLMBackend())
accumulator = DigestAccumulator()
accumulator.enqueue(make_event(
source="test",
category="weather_warning",
@ -521,11 +533,11 @@ class TestIntegration:
call_order = []
def bad_channel_factory(rule, connector=None):
def bad_channel_factory(rule):
call_order.append(rule.name)
if rule.name == "bad":
ch = MagicMock()
ch.deliver = AsyncMock(side_effect=RuntimeError("delivery failed"))
ch.deliver.side_effect = RuntimeError("delivery failed")
return ch
return MockChannel()
@ -542,6 +554,7 @@ class TestIntegration:
asyncio.run(run_fire())
# Both rules should have been attempted
assert "bad" in call_order
assert "good" in call_order

View file

@ -2,16 +2,10 @@
These tests verify the core routing and dispatch behavior of the
notification pipeline without requiring real channel backends.
Updated in Phase 2.4: Events now go to BOTH dispatcher and accumulator
(no severity-based fork). SeverityRouter class kept for backward
compatibility but not used in production wiring.
"""
import asyncio
import pytest
from unittest.mock import Mock, AsyncMock, patch
from unittest.mock import Mock, patch
from dataclasses import dataclass, field
from meshai.notifications.events import Event, make_event
@ -45,7 +39,6 @@ class ConfigStub:
class TestImmediateDispatch:
def test_immediate_event_with_matching_rule_dispatches(self):
"""Immediate events reach the dispatcher and get delivered."""
rule = NotificationRuleConfigStub(
enabled=True,
trigger_type="condition",
@ -57,10 +50,15 @@ class TestImmediateDispatch:
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
bus = EventBus()
dispatcher = Dispatcher(config, mock_factory)
digest = StubDigestQueue()
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(router.handle)
event = make_event(
source="test",
category="test_cat",
@ -68,99 +66,80 @@ class TestImmediateDispatch:
title="Test Alert",
summary="Test summary message",
)
# Run dispatch in async context since it's now async
asyncio.run(dispatcher.dispatch(event))
bus.emit(event)
assert mock_channel.deliver.call_count == 1
alert = mock_channel.deliver.call_args[0][0]
assert alert.category == "test_cat"
assert alert.severity == "immediate"
assert alert.message
assert alert["category"] == "test_cat"
assert alert["severity"] == "immediate"
assert alert["message"]
class TestTeeRouting:
"""Phase 2.4: Events go to BOTH dispatcher and accumulator."""
class TestDigestRouting:
def test_routine_event_goes_to_both_dispatcher_and_accumulator(self):
"""Routine events reach both dispatcher and accumulator in Phase 2.4."""
def test_routine_event_goes_to_digest_not_dispatcher(self):
rule = NotificationRuleConfigStub(
enabled=True,
trigger_type="condition",
categories=["test_cat"],
min_severity="routine",
delivery_type="mesh_broadcast",
)
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
# Create dispatcher
mock_factory = Mock()
bus = EventBus()
dispatcher = Dispatcher(config, mock_factory)
digest = StubDigestQueue()
with patch.object(dispatcher, "dispatch", wraps=dispatcher.dispatch) as mock_dispatch:
router = SeverityRouter(
immediate_handler=mock_dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(router.handle)
event = make_event(
source="test",
category="test_cat",
severity="routine",
title="Routine Alert",
)
bus.emit(event)
assert len(digest) == 1
mock_dispatch.assert_not_called()
# Create accumulator mock
accumulator_calls = []
def mock_enqueue(event):
accumulator_calls.append(event)
event = make_event(
source="test",
category="test_cat",
severity="routine",
title="Routine Alert",
)
# Run dispatch in async context
asyncio.run(dispatcher.dispatch(event))
mock_enqueue(event)
# Both paths received the event
assert len(accumulator_calls) == 1
# Dispatcher found a matching rule and delivered
assert mock_channel.deliver.call_count == 1
def test_priority_event_goes_to_both_dispatcher_and_accumulator(self):
"""Priority events reach both dispatcher and accumulator in Phase 2.4."""
def test_priority_event_goes_to_digest_not_dispatcher(self):
rule = NotificationRuleConfigStub(
enabled=True,
trigger_type="condition",
categories=["test_cat"],
min_severity="routine",
delivery_type="mesh_broadcast",
)
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
mock_factory = Mock()
bus = EventBus()
dispatcher = Dispatcher(config, mock_factory)
accumulator_calls = []
def mock_enqueue(event):
accumulator_calls.append(event)
event = make_event(
source="test",
category="test_cat",
severity="priority",
title="Priority Alert",
)
# Run dispatch in async context
asyncio.run(dispatcher.dispatch(event))
mock_enqueue(event)
assert len(accumulator_calls) == 1
assert mock_channel.deliver.call_count == 1
digest = StubDigestQueue()
with patch.object(dispatcher, "dispatch", wraps=dispatcher.dispatch) as mock_dispatch:
router = SeverityRouter(
immediate_handler=mock_dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(router.handle)
event = make_event(
source="test",
category="test_cat",
severity="priority",
title="Priority Alert",
)
bus.emit(event)
assert len(digest) == 1
mock_dispatch.assert_not_called()
class TestNoMatchingRule:
def test_immediate_event_with_no_matching_rule_skips_silently(self):
"""Events with no matching rules don't crash."""
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[])
)
@ -186,7 +165,6 @@ class TestNoMatchingRule:
class TestSubscriberIsolation:
def test_subscriber_exception_isolation(self):
"""Exceptions in one subscriber don't affect others."""
bus = EventBus()
def failing_handler(event):
@ -208,7 +186,6 @@ class TestSubscriberIsolation:
class TestUnknownSeverity:
def test_unknown_severity_dropped_without_crash(self):
"""Events with unknown severity are dropped gracefully."""
config = ConfigStub(
notifications=NotificationsConfigStub(rules=[])
)

View file

@ -1,132 +0,0 @@
"""Tests for ToggleFilter (Phase 2.4)."""
import pytest
from unittest.mock import MagicMock, AsyncMock
from meshai.notifications.events import make_event
from meshai.notifications.pipeline.toggle_filter import ToggleFilter
from meshai.notifications.pipeline import build_pipeline_components
from meshai.config import Config
class TestToggleFilter:
"""Unit tests for ToggleFilter."""
def test_toggle_filter_passes_through_when_enabled_is_none(self):
"""Filter with enabled_toggles=None passes all events."""
received = []
filter_ = ToggleFilter(
next_handler=received.append,
enabled_toggles=None,
)
event = make_event(
source="test",
category="weather_warning",
severity="priority",
title="Test",
)
filter_.handle(event)
assert len(received) == 1
assert received[0] is event
def test_toggle_filter_drops_event_when_toggle_not_enabled(self):
"""Filter drops events whose toggle isn't in enabled set."""
received = []
filter_ = ToggleFilter(
next_handler=received.append,
enabled_toggles={"weather"},
)
# wildfire_proximity maps to "fire" toggle
event = make_event(
source="test",
category="wildfire_proximity",
severity="priority",
title="Fire",
)
filter_.handle(event)
assert len(received) == 0
def test_toggle_filter_passes_event_when_toggle_enabled(self):
"""Filter passes events whose toggle is in enabled set."""
received = []
filter_ = ToggleFilter(
next_handler=received.append,
enabled_toggles={"weather"},
)
event = make_event(
source="test",
category="weather_warning",
severity="priority",
title="Weather",
)
filter_.handle(event)
assert len(received) == 1
def test_toggle_filter_drops_unknown_category_when_filter_active(self):
"""Unknown category maps to 'other', dropped if 'other' not enabled."""
received = []
filter_ = ToggleFilter(
next_handler=received.append,
enabled_toggles={"weather"},
)
event = make_event(
source="test",
category="bogus_category",
severity="priority",
title="Unknown",
)
filter_.handle(event)
# "bogus_category" has no toggle mapping, falls back to "other"
# "other" is not in enabled set
assert len(received) == 0
def test_toggle_filter_passes_other_when_enabled(self):
"""'other' toggle passes unknown categories when enabled."""
received = []
filter_ = ToggleFilter(
next_handler=received.append,
enabled_toggles={"other"},
)
event = make_event(
source="test",
category="bogus_category",
severity="priority",
title="Unknown",
)
filter_.handle(event)
assert len(received) == 1
class TestToggleFilterPipelineWiring:
"""Integration tests for toggle filter in pipeline."""
def test_toggle_filter_pipeline_drops_disabled_toggle(self):
"""Events for disabled toggles don't reach dispatcher or accumulator."""
config = Config()
# Pass mock LLM backend
mock_backend = MagicMock()
mock_backend.generate = AsyncMock(return_value="stub summary")
# Note: without toggles.enabled set, filter is a no-op
# This test verifies the wiring is correct
bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, mock_backend)
# Verify toggle_filter is in the chain
assert toggle_filter is not None
assert hasattr(toggle_filter, 'handle')
def test_build_pipeline_uses_provided_backend(self):
"""build_pipeline_components uses the provided llm_backend."""
config = Config()
# Sentinel backend with unique attribute
sentinel = MagicMock()
sentinel.unique_marker = "I_AM_THE_SENTINEL"
sentinel.generate = AsyncMock(return_value="sentinel summary")
bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, sentinel)
# Accumulator should have the exact sentinel instance
assert accumulator._llm is sentinel
assert accumulator._llm.unique_marker == "I_AM_THE_SENTINEL"

View file

@ -1,317 +0,0 @@
"""Tests for Phase 2.5b per-channel-type renderers."""
import json
import re
import time
import pytest
from meshai.notifications.events import NotificationPayload, make_event, make_payload_from_event
from meshai.notifications.renderers import MeshRenderer, EmailRenderer, WebhookRenderer
# ============================================================
# MESH RENDERER TESTS
# ============================================================
def test_mesh_render_short_message_single_chunk():
"""Short message produces a single chunk."""
payload = NotificationPayload(
message="Test alert",
category="test",
severity="routine",
timestamp=time.time(),
)
renderer = MeshRenderer()
chunks = renderer.render(payload)
assert len(chunks) == 1
assert "Test alert" in chunks[0]
def test_mesh_render_event_type_prefix():
"""Known event type adds toggle label prefix."""
payload = NotificationPayload(
message="Severe storm",
category="weather_warning",
severity="priority",
timestamp=time.time(),
event_type="weather_warning",
)
renderer = MeshRenderer()
chunks = renderer.render(payload)
assert len(chunks) == 1
assert chunks[0].startswith("[Weather]")
def test_mesh_render_unknown_event_type_no_prefix():
"""Unknown event type does not add a prefix."""
payload = NotificationPayload(
message="Hello",
category="made_up_thing",
severity="routine",
timestamp=time.time(),
event_type="made_up_thing",
)
renderer = MeshRenderer()
chunks = renderer.render(payload)
assert len(chunks) == 1
assert not chunks[0].startswith("[")
def test_mesh_render_long_message_chunks():
"""Long message splits into multiple chunks with counters."""
# Build a ~500 char message
long_message = "This is a very long alert message that should definitely exceed the two hundred character limit. " * 5
payload = NotificationPayload(
message=long_message,
category="weather_warning",
severity="priority",
timestamp=time.time(),
event_type="weather_warning",
)
renderer = MeshRenderer()
chunks = renderer.render(payload)
assert len(chunks) >= 2
for chunk in chunks:
assert len(chunk) <= 200, f"Chunk exceeds limit: {len(chunk)} chars"
# Check each chunk ends with "(k/N)" counter
for chunk in chunks:
assert re.search(r"\(\d+/\d+\)$", chunk), f"Missing counter: {chunk}"
def test_mesh_render_chunks_preserve_full_content():
"""Chunking preserves all words from the original message."""
# Build a message with distinct tokens
words = ["alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf",
"hotel", "india", "juliet", "kilo", "lima", "mike", "november",
"oscar", "papa", "quebec", "romeo", "sierra", "tango", "uniform",
"victor", "whiskey", "xray", "yankee", "zulu"]
long_message = " ".join(words * 3) # Repeat to span multiple chunks
payload = NotificationPayload(
message=long_message,
category="test",
severity="routine",
timestamp=time.time(),
)
renderer = MeshRenderer()
chunks = renderer.render(payload)
# Join all chunks and remove counters
combined = " ".join(chunks)
combined = re.sub(r"\s*\(\d+/\d+\)", "", combined)
# Every original word should appear
for word in words:
assert word in combined, f"Missing word: {word}"
def test_mesh_render_no_event_type():
"""Payload without event_type has no prefix."""
payload = NotificationPayload(
message="Plain message",
category="test",
severity="routine",
timestamp=time.time(),
event_type=None,
)
renderer = MeshRenderer()
chunks = renderer.render(payload)
assert len(chunks) == 1
assert chunks[0] == "Plain message"
# ============================================================
# EMAIL RENDERER TESTS
# ============================================================
def test_email_render_subject_includes_severity_and_type():
"""Subject line includes severity and event type."""
payload = NotificationPayload(
message="Storm approaching",
category="weather_warning",
severity="immediate",
timestamp=time.time(),
event_type="weather_warning",
)
renderer = EmailRenderer()
rendered = renderer.render(payload)
assert "IMMEDIATE" in rendered["subject"]
assert "Weather Warning" in rendered["subject"]
def test_email_render_body_includes_message_and_context():
"""Body includes message and structured context fields."""
fixed_time = 1700000000.0 # Known timestamp
payload = NotificationPayload(
message="Test alert message",
category="battery_warning",
severity="priority",
timestamp=fixed_time,
event_type="battery_warning",
region="Magic Valley",
node_name="BLD-MTN",
)
renderer = EmailRenderer()
rendered = renderer.render(payload)
body = rendered["body"]
assert "Test alert message" in body
assert "Magic Valley" in body
assert "BLD-MTN" in body
assert "priority" in body
assert "battery_warning" in body
# Check formatted date
assert "2023-11-14" in body # Date part of timestamp
def test_email_render_omits_missing_context():
"""Body omits lines for missing optional fields."""
payload = NotificationPayload(
message="Minimal alert",
category="test",
severity="routine",
timestamp=time.time(),
# No region, no node, no event_type
)
renderer = EmailRenderer()
rendered = renderer.render(payload)
body = rendered["body"]
assert "Minimal alert" in body
assert "Severity: routine" in body
assert "Region:" not in body
assert "Node:" not in body
assert "Category:" not in body
def test_email_render_includes_source_event():
"""Body includes source event details when present."""
event = make_event(
source="weather_adapter",
category="weather_warning",
severity="priority",
title="Severe Storm",
summary="Severe storm expected",
)
payload = make_payload_from_event(event)
renderer = EmailRenderer()
rendered = renderer.render(payload)
body = rendered["body"]
assert "weather_adapter" in body
assert "Severe Storm" in body
# ============================================================
# WEBHOOK RENDERER TESTS
# ============================================================
def test_webhook_render_has_schema_version():
"""Output includes schema_version field."""
payload = NotificationPayload(
message="Test",
category="test",
severity="routine",
timestamp=time.time(),
)
renderer = WebhookRenderer()
rendered = renderer.render(payload)
assert rendered["schema_version"] == "1.0"
def test_webhook_render_omits_none_fields():
"""None optional fields are omitted, not set to null."""
payload = NotificationPayload(
message="Test message",
category="test",
severity="routine",
timestamp=time.time(),
# All optional fields default to None
)
renderer = WebhookRenderer()
rendered = renderer.render(payload)
# Required fields present
assert "message" in rendered
assert "severity" in rendered
assert "timestamp" in rendered
assert "schema_version" in rendered
# Optional fields omitted
assert "node_id" not in rendered
assert "region" not in rendered
assert "chunk_index" not in rendered
def test_webhook_render_includes_source_event_when_present():
"""source_event dict included when payload has source event."""
event = make_event(
source="test_adapter",
category="test",
severity="routine",
title="Test Event",
)
payload = make_payload_from_event(event)
renderer = WebhookRenderer()
rendered = renderer.render(payload)
assert "source_event" in rendered
assert rendered["source_event"]["id"] == event.id
assert rendered["source_event"]["source"] == "test_adapter"
def test_webhook_render_is_json_serializable():
"""Rendered output is JSON-serializable (no datetime objects)."""
event = make_event(
source="test",
category="test",
severity="routine",
title="Test",
)
payload = make_payload_from_event(event)
payload.chunk_index = 1
payload.chunk_total = 3
payload.region = "Test Region"
payload.node_name = "Test-Node"
renderer = WebhookRenderer()
rendered = renderer.render(payload)
# Should not raise
json_str = json.dumps(rendered)
assert isinstance(json_str, str)
# Verify round-trip
parsed = json.loads(json_str)
assert parsed["message"] == payload.message
def test_webhook_render_includes_optional_fields_when_set():
"""Optional fields included when they have values."""
payload = NotificationPayload(
message="Test",
category="test_category",
severity="priority",
timestamp=time.time(),
event_type="battery_warning",
node_id="!abc123",
node_name="Test-Node",
region="Test Region",
chunk_index=2,
chunk_total=5,
)
renderer = WebhookRenderer()
rendered = renderer.render(payload)
assert rendered["category"] == "test_category"
assert rendered["event_type"] == "battery_warning"
assert rendered["node_id"] == "!abc123"
assert rendered["node_name"] == "Test-Node"
assert rendered["region"] == "Test Region"
assert rendered["chunk_index"] == 2
assert rendered["chunk_total"] == 5