feat(notifications): Phase 2.5b per-channel-type renderers

Adds dedicated renderer classes per channel type:

- MeshRenderer produces 1+ chunks <=200 chars with (k/N) counters
  when the payload overflows. Reuses the toggle-label vocabulary
  from the digest. Mesh channels skip re-chunking when the payload
  already carries chunk_index metadata (digest path).
- EmailRenderer produces {subject, body} with structured context
  lines. Plain text only; HTML body is a future polish.
- WebhookRenderer produces a JSON-serializable dict with stable
  schema_version 1.0. Optional fields omitted (not nulled) for
  compactness. Designed for reuse by Phase 2.6.5's MQTT event
  publisher.
- All four channel implementations (MeshBroadcast, MeshDM, Email,
  Webhook) now call their renderer in deliver() before transport.
- New renderer tests cover each renderer in isolation; new channel
  integration tests confirm channels actually call their renderer.

Renderers are pure functions of the payload - no network, no
state, fully testable without mocking I/O. The future MQTT
publisher will instantiate WebhookRenderer directly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-15 04:25:44 +00:00
commit b2bb7f7a95
8 changed files with 898 additions and 30 deletions

View file

@ -17,6 +17,8 @@ if TYPE_CHECKING:
from ..config import NotificationRuleConfig
from .events import NotificationPayload
from meshai.notifications.renderers import MeshRenderer, EmailRenderer, WebhookRenderer
logger = logging.getLogger(__name__)
@ -61,6 +63,7 @@ class MeshBroadcastChannel(NotificationChannel):
def __init__(self, connector: "MeshConnector", channel_index: int = 0):
self._connector = connector
self._channel = channel_index
self._renderer = MeshRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""Send alert to mesh channel."""
@ -69,13 +72,25 @@ class MeshBroadcastChannel(NotificationChannel):
return False
try:
message = alert.message or ""
self._connector.send_message(
text=message,
destination=None,
channel=self._channel,
)
logger.info("Broadcast alert to channel %d", self._channel)
# If payload already has chunk metadata (from digest), use message directly
if alert.chunk_index is not None:
self._connector.send_message(
text=alert.message or "",
destination=None,
channel=self._channel,
)
logger.info("Broadcast pre-chunked alert to channel %d", self._channel)
return True
# Render to chunks for single-event delivery
chunks = self._renderer.render(alert)
for chunk in chunks:
self._connector.send_message(
text=chunk,
destination=None,
channel=self._channel,
)
logger.info("Broadcast %d chunk(s) to channel %d", len(chunks), self._channel)
return True
except Exception as e:
logger.error("Failed to broadcast alert: %s", e)
@ -159,22 +174,29 @@ class MeshDMChannel(NotificationChannel):
def __init__(self, connector: "MeshConnector", node_ids: list[str]):
self._connector = connector
self._node_ids = node_ids
self._renderer = MeshRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""Send alert via DM to configured nodes."""
if not self._connector:
return False
message = alert.message or ""
success = True
# If payload already has chunk metadata (from digest), use message directly
if alert.chunk_index is not None:
messages = [alert.message or ""]
else:
# Render to chunks for single-event delivery
messages = self._renderer.render(alert)
success = True
for node_id in self._node_ids:
try:
node_id = str(node_id)
self._connector.send_message(text=message, destination=node_id, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
success = False
for message in messages:
try:
node_id = str(node_id)
self._connector.send_message(text=message, destination=node_id, channel=0)
except Exception as e:
logger.error("Failed to DM %s: %s", node_id, e)
success = False
return success
@ -287,19 +309,17 @@ class EmailChannel(NotificationChannel):
self._tls = smtp_tls
self._from = from_address
self._recipients = recipients
self._renderer = EmailRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""Send alert via email."""
if not self._recipients:
return False
alert_type = alert.event_type or "alert"
severity = (alert.severity or "routine").upper()
message = alert.message or ""
subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title())
body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % (
alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message
)
# Use renderer for subject and body
rendered = self._renderer.render(alert)
subject = rendered["subject"]
body = rendered["body"]
try:
loop = asyncio.get_event_loop()
@ -515,17 +535,12 @@ class WebhookChannel(NotificationChannel):
def __init__(self, url: str, headers: Optional[dict] = None):
self._url = url
self._headers = headers or {}
self._renderer = WebhookRenderer()
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""POST alert to webhook URL."""
payload = {
"type": alert.event_type,
"severity": alert.severity or "routine",
"message": alert.message or "",
"timestamp": alert.timestamp or time.time(),
"node_name": alert.node_name,
"region": alert.region,
}
# Use renderer for generic JSON payload
payload = self._renderer.render(alert)
# Discord/Slack format
if "discord.com" in self._url or "slack.com" in self._url:

View file

@ -0,0 +1,22 @@
"""Channel-type-aware renderers.
Each renderer takes a NotificationPayload and produces a
channel-type-appropriate output: mesh chunks, email subject/body,
or webhook JSON dict.
Renderers are reusable beyond channels.py the future MQTT event
publisher (Phase 2.6.5) uses WebhookRenderer for its on-the-wire
format.
"""
from meshai.notifications.renderers.base import Renderer
from meshai.notifications.renderers.mesh import MeshRenderer
from meshai.notifications.renderers.email import EmailRenderer
from meshai.notifications.renderers.webhook import WebhookRenderer
__all__ = [
"Renderer",
"MeshRenderer",
"EmailRenderer",
"WebhookRenderer",
]

View file

@ -0,0 +1,28 @@
"""Abstract base for channel-type-aware renderers.
Each renderer takes a NotificationPayload and produces a
channel-type-appropriate output (string, list, dict, etc.).
Renderers are pure functions of their input no network, no
state, no side effects beyond logging. The channel that owns a
renderer calls render() and then handles delivery.
"""
from abc import ABC, abstractmethod
from typing import Any
from meshai.notifications.events import NotificationPayload
class Renderer(ABC):
"""Base class for all channel-type renderers."""
@abstractmethod
def render(self, payload: NotificationPayload) -> Any:
"""Produce the channel-type-appropriate output.
Subclasses define the concrete return type:
- MeshRenderer returns list[str]
- EmailRenderer returns dict (subject, body)
- WebhookRenderer returns dict (JSON-serializable)
"""
raise NotImplementedError

View file

@ -0,0 +1,78 @@
"""Email channel renderer.
Produces a dict with subject and body for SMTP delivery. Plain
text body for now; HTML body is a future polish.
Subject format: "[MeshAI] <Severity> — <Event Type>"
Body format: multi-line, with the message as the lead, followed
by structured context fields (severity, region, node, timestamp,
source category).
"""
import logging
from datetime import datetime
from typing import Optional
from meshai.notifications.events import NotificationPayload
from meshai.notifications.renderers.base import Renderer
class EmailRenderer(Renderer):
"""Produce email subject and body for a single payload."""
def __init__(self):
self._logger = logging.getLogger("meshai.renderers.email")
def render(self, payload: NotificationPayload) -> dict:
"""Render the payload as {subject, body}."""
return {
"subject": self._build_subject(payload),
"body": self._build_body(payload),
}
def _build_subject(self, p: NotificationPayload) -> str:
sev = (p.severity or "routine").upper()
type_label = self._type_label(p.event_type) or "Alert"
return f"[MeshAI] {sev}{type_label}"
def _build_body(self, p: NotificationPayload) -> str:
lines: list[str] = []
# Lead line
lines.append(p.message or "(no message)")
lines.append("") # blank separator
# Structured context
lines.append(f"Severity: {p.severity or 'routine'}")
if p.event_type:
lines.append(f"Category: {p.event_type}")
if p.region:
lines.append(f"Region: {p.region}")
if p.node_name:
lines.append(f"Node: {p.node_name}")
elif p.node_id:
lines.append(f"Node: {p.node_id}")
if p.timestamp:
try:
ts = datetime.fromtimestamp(p.timestamp)
lines.append(f"Time: {ts.strftime('%Y-%m-%d %H:%M:%S')}")
except (ValueError, OverflowError):
pass
# Optional source event detail (if present)
if p.source_event is not None:
ev = p.source_event
if hasattr(ev, "source") and ev.source:
lines.append(f"Source: {ev.source}")
if hasattr(ev, "title") and ev.title and ev.title != p.message:
lines.append(f"Title: {ev.title}")
lines.append("")
lines.append("--")
lines.append("MeshAI notification")
return "\n".join(lines)
def _type_label(self, event_type: Optional[str]) -> Optional[str]:
"""Title-cased label for the event type (for subject line)."""
if not event_type:
return None
return event_type.replace("_", " ").title()

View file

@ -0,0 +1,127 @@
"""Mesh channel renderer.
Produces a list of short strings (each 200 chars by default)
suitable for mesh radio broadcast. Reuses the digest's chunking
pattern: never split a single "line" across chunks; add (k/N)
counters when the rendered output produces more than one chunk.
The mesh renderer is symmetric with the digest accumulator's
mesh chunk renderer same chunk-packing algorithm, same char
limit semantics.
"""
import logging
from typing import Optional
from meshai.notifications.events import NotificationPayload
from meshai.notifications.renderers.base import Renderer
class MeshRenderer(Renderer):
"""Produce mesh-compact chunks for a single payload."""
def __init__(self, char_limit: int = 200):
self._limit = char_limit
self._logger = logging.getLogger("meshai.renderers.mesh")
def render(self, payload: NotificationPayload) -> list[str]:
"""Render the payload as 1+ mesh-compact chunks.
Algorithm:
- Build the full message line (event_type, severity hint,
and message text see _format_one_line)
- If the line fits in self._limit chars, return [line].
- If the line exceeds self._limit, split across multiple
chunks at word boundaries when possible, and add
"(k/N)" counters at the start of each chunk.
- If a single word exceeds the chunk limit, hard-split
mid-word (rare long URLs etc.)
"""
line = self._format_one_line(payload)
if len(line) <= self._limit:
return [line]
return self._chunk_long_line(line)
def _format_one_line(self, p: NotificationPayload) -> str:
"""Build the headline for a payload.
Default format:
"[<EventTypeTitle>] <message>"
where EventTypeTitle is a short label derived from
p.event_type (e.g. "weather_warning" "Weather"). If
p.event_type is None, omit the prefix.
Truncates the message at the limit only if the prefix
is short enough; otherwise lets the chunker handle it.
"""
prefix = self._toggle_label(p.event_type)
if prefix:
return f"[{prefix}] {p.message}"
return p.message
def _toggle_label(self, event_type: Optional[str]) -> Optional[str]:
"""Map an event category to a short toggle label.
Looks up the toggle the category belongs to and returns
the toggle's display label. If unknown, returns None
(no prefix added).
"""
if not event_type:
return None
from meshai.notifications.categories import get_toggle
toggle = get_toggle(event_type)
if not toggle:
return None
# Same label set used by the digest renderer
TOGGLE_LABELS = {
"mesh_health": "Mesh",
"weather": "Weather",
"fire": "Fire",
"rf_propagation": "RF",
"roads": "Roads",
"avalanche": "Avalanche",
"seismic": "Seismic",
"tracking": "Tracking",
"other": "Other",
}
return TOGGLE_LABELS.get(toggle, toggle.title())
def _chunk_long_line(self, line: str) -> list[str]:
"""Split a long line into chunks ≤ self._limit each.
Reserves ~10 chars per chunk for the "(k/N)" counter
suffix. Splits at word boundaries; falls back to
mid-word split if a single word exceeds the budget.
"""
# Reserve space for " (k/N)" — generous to 10 chars
body_budget = self._limit - 10
if body_budget <= 0:
body_budget = self._limit # extreme small limit; ignore counter
words = line.split(" ")
chunks: list[str] = []
current = ""
for word in words:
# Hard-split words longer than body_budget (rare)
while len(word) > body_budget:
if current:
chunks.append(current)
current = ""
chunks.append(word[:body_budget])
word = word[body_budget:]
# Add word to current chunk if it fits
tentative = (current + " " + word) if current else word
if len(tentative) <= body_budget:
current = tentative
else:
chunks.append(current)
current = word
if current:
chunks.append(current)
# Add counters: "DIGEST"-style "(k/N)" suffix
total = len(chunks)
if total == 1:
return chunks
return [f"{chunk} ({i+1}/{total})" for i, chunk in enumerate(chunks)]

View file

@ -0,0 +1,67 @@
"""Webhook channel renderer.
Produces a JSON-serializable dict with stable field names. Also
intended for reuse by the future MQTT event publisher (Phase
2.6.5), which wants the same structured shape on the wire.
Field names use snake_case. Optional fields are omitted from the
output when None, NOT set to null keeps payloads compact and
avoids ambiguity between "field absent" and "field explicitly
null".
"""
import logging
from typing import Any
from meshai.notifications.events import NotificationPayload
from meshai.notifications.renderers.base import Renderer
# Schema version for the wire format. Bump when making
# breaking changes to the field shape. External consumers
# can use this to handle multiple versions.
WEBHOOK_SCHEMA_VERSION = "1.0"
class WebhookRenderer(Renderer):
"""Produce a JSON-serializable dict for a single payload."""
def __init__(self):
self._logger = logging.getLogger("meshai.renderers.webhook")
def render(self, payload: NotificationPayload) -> dict:
"""Render the payload as a structured dict."""
out: dict[str, Any] = {
"schema_version": WEBHOOK_SCHEMA_VERSION,
"message": payload.message,
"severity": payload.severity or "routine",
"timestamp": payload.timestamp,
}
# Optional fields — omit if None
for src_attr, dst_key in (
("category", "category"),
("event_type", "event_type"),
("node_id", "node_id"),
("node_name", "node_name"),
("region", "region"),
("chunk_index", "chunk_index"),
("chunk_total", "chunk_total"),
):
value = getattr(payload, src_attr, None)
if value is not None:
out[dst_key] = value
# Optional source event detail
if payload.source_event is not None:
ev = payload.source_event
source_event: dict[str, Any] = {}
for attr in ("id", "source", "title", "expires", "group_key"):
if hasattr(ev, attr):
value = getattr(ev, attr)
if value is not None:
source_event[attr] = value
if source_event:
out["source_event"] = source_event
return out