feat(notifications): Phase 2.5a channel interface unification

- Switch channels.py from dict-based to dataclass-based interfaces
- Add NotificationPayload dataclass and make_payload_from_event helper
- Update channel.deliver() to be async with (payload, rule) signature
- Add connector parameter to Dispatcher, DigestScheduler, and pipeline builders
- Update pipeline tee to use asyncio.create_task for async dispatch
- Add create_channel_from_dict for legacy router.py compatibility
- Update tests for new async interfaces

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-15 03:45:27 +00:00
commit c9d9a9925c
8 changed files with 235 additions and 129 deletions

View file

@ -14,6 +14,8 @@ import httpx
if TYPE_CHECKING:
from ..connector import MeshConnector
from ..config import NotificationRuleConfig
from .events import NotificationPayload
logger = logging.getLogger(__name__)
@ -24,7 +26,7 @@ class NotificationChannel(ABC):
channel_type: str = "base"
@abstractmethod
async def deliver(self, alert: dict, rule: dict) -> bool:
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""Send alert. Returns True on success."""
raise NotImplementedError
@ -60,14 +62,14 @@ class MeshBroadcastChannel(NotificationChannel):
self._connector = connector
self._channel = channel_index
async def deliver(self, alert: dict, rule: dict) -> bool:
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""Send alert to mesh channel."""
if not self._connector:
logger.warning("No mesh connector available")
return False
try:
message = alert.get("message", "")
message = alert.message or ""
self._connector.send_message(
text=message,
destination=None,
@ -158,12 +160,12 @@ class MeshDMChannel(NotificationChannel):
self._connector = connector
self._node_ids = node_ids
async def deliver(self, alert: dict, rule: dict) -> bool:
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""Send alert via DM to configured nodes."""
if not self._connector:
return False
message = alert.get("message", "")
message = alert.message or ""
success = True
for node_id in self._node_ids:
@ -286,14 +288,14 @@ class EmailChannel(NotificationChannel):
self._from = from_address
self._recipients = recipients
async def deliver(self, alert: dict, rule: dict) -> bool:
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""Send alert via email."""
if not self._recipients:
return False
alert_type = alert.get("type", "alert")
severity = alert.get("severity", "routine").upper()
message = alert.get("message", "")
alert_type = alert.event_type or "alert"
severity = (alert.severity or "routine").upper()
message = alert.message or ""
subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title())
body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % (
alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message
@ -514,20 +516,20 @@ class WebhookChannel(NotificationChannel):
self._url = url
self._headers = headers or {}
async def deliver(self, alert: dict, rule: dict) -> bool:
async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool:
"""POST alert to webhook URL."""
payload = {
"type": alert.get("type"),
"severity": alert.get("severity", "routine"),
"message": alert.get("message", ""),
"timestamp": time.time(),
"node_name": alert.get("node_name"),
"region": alert.get("region"),
"type": alert.event_type,
"severity": alert.severity or "routine",
"message": alert.message or "",
"timestamp": alert.timestamp or time.time(),
"node_name": alert.node_name,
"region": alert.region,
}
# Discord/Slack format
if "discord.com" in self._url or "slack.com" in self._url:
severity = alert.get("severity", "routine")
severity = alert.severity or "routine"
color = {
"immediate": 0xFF0000,
"priority": 0xFFAA00,
@ -535,8 +537,8 @@ class WebhookChannel(NotificationChannel):
}.get(severity, 0x888888)
payload = {
"embeds": [{
"title": "MeshAI: %s" % alert.get("type", "unknown"),
"description": alert.get("message", ""),
"title": "MeshAI: %s" % (alert.event_type or "unknown"),
"description": alert.message or "",
"color": color,
}]
}
@ -545,14 +547,14 @@ class WebhookChannel(NotificationChannel):
elif "ntfy" in self._url:
headers = {
**self._headers,
"Title": "MeshAI: %s" % alert.get("type", "alert"),
"Title": "MeshAI: %s" % (alert.event_type or "alert"),
"Priority": "3",
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self._url,
content=alert.get("message", ""),
content=alert.message or "",
headers=headers,
timeout=10,
)
@ -745,8 +747,52 @@ class WebhookChannel(NotificationChannel):
return False, f"Webhook failed: {e}"
def create_channel(config: dict, connector=None) -> NotificationChannel:
"""Create a channel instance from config."""
def create_channel(rule: "NotificationRuleConfig", connector=None) -> NotificationChannel:
"""Create a channel instance from a NotificationRuleConfig.
Args:
rule: NotificationRuleConfig with delivery_type and channel settings
connector: MeshConnector instance (required for mesh channels)
Returns:
NotificationChannel instance
"""
delivery_type = rule.delivery_type
if delivery_type == "mesh_broadcast":
return MeshBroadcastChannel(
connector=connector,
channel_index=rule.broadcast_channel,
)
elif delivery_type == "mesh_dm":
return MeshDMChannel(
connector=connector,
node_ids=rule.node_ids,
)
elif delivery_type == "email":
return EmailChannel(
smtp_host=rule.smtp_host,
smtp_port=rule.smtp_port,
smtp_user=rule.smtp_user,
smtp_password=rule.smtp_password,
smtp_tls=rule.smtp_tls,
from_address=rule.from_address,
recipients=rule.recipients,
)
elif delivery_type == "webhook":
return WebhookChannel(
url=rule.webhook_url,
headers=rule.webhook_headers,
)
else:
raise ValueError("Unknown delivery type: %s" % delivery_type)
def create_channel_from_dict(config: dict, connector=None) -> NotificationChannel:
"""Create a channel instance from a dict config (legacy interface).
Used by old router.py and test_channel API. Will be removed in Phase 2.7.
"""
channel_type = config.get("type", "")
if channel_type == "mesh_broadcast":

View file

@ -133,6 +133,52 @@ class Event:
return cls(**d)
@dataclass
class NotificationPayload:
"""Per-delivery alert content handed to a NotificationChannel.
This is the runtime alert shape: derived from an Event (or
built directly by the old router) and consumed by channels.py
implementations.
"""
message: str # The rendered text to deliver
category: str # e.g. "weather_warning"
severity: str # "immediate" | "priority" | "routine"
timestamp: float # Unix epoch when generated
# Optional context fields (None when not applicable)
node_id: Optional[str] = None
node_name: Optional[str] = None
region: Optional[str] = None
event_type: Optional[str] = None # Maps to old dict's "type" field
# Chunk metadata for mesh deliveries (set by scheduler/digest path)
chunk_index: Optional[int] = None
chunk_total: Optional[int] = None
# Source Event reference for advanced channel use (renderers in 2.5b)
source_event: Optional["Event"] = None
def make_payload_from_event(event: "Event", **overrides) -> NotificationPayload:
"""Helper to convert an Event into a NotificationPayload."""
p = NotificationPayload(
message=event.summary or event.title or event.category,
category=event.category,
severity=event.severity,
timestamp=event.timestamp,
node_id=event.node_ids[0] if event.node_ids else None,
region=event.region,
event_type=event.category,
source_event=event,
)
for k, v in overrides.items():
setattr(p, k, v)
return p
def make_event(
source: str,
category: str,

View file

@ -21,6 +21,8 @@ Usage:
await stop_pipeline(scheduler)
"""
import asyncio
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
@ -35,7 +37,7 @@ from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
from meshai.notifications.pipeline.scheduler import DigestScheduler
def build_pipeline(config, llm_backend) -> EventBus:
def build_pipeline(config, llm_backend, connector=None) -> EventBus:
"""Build the pipeline and return the EventBus.
Args:
@ -43,11 +45,12 @@ def build_pipeline(config, llm_backend) -> EventBus:
llm_backend: An already-constructed LLMBackend instance
(from main.py or a test). Pipeline components share
this single instance. May be None for fallback behavior.
connector: Optional MeshtasticConnector for mesh channels.
Components are stashed on bus._pipeline_components for lifecycle use.
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
dispatcher = Dispatcher(config, create_channel, connector=connector)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
@ -63,8 +66,13 @@ def build_pipeline(config, llm_backend) -> EventBus:
)
# Tee closure: events go to BOTH dispatcher and accumulator
# dispatcher.dispatch() is async, so fire-and-forget with create_task
def _tee(event):
dispatcher.dispatch(event)
try:
asyncio.create_task(dispatcher.dispatch(event))
except RuntimeError:
# No running event loop (e.g. sync tests) - skip async dispatch
pass
accumulator.enqueue(event)
# Build enabled toggles set from config
@ -91,12 +99,13 @@ def build_pipeline(config, llm_backend) -> EventBus:
"toggle_filter": toggle_filter,
"dispatcher": dispatcher,
"accumulator": accumulator,
"connector": connector,
}
return bus
def build_pipeline_components(config, llm_backend) -> tuple:
def build_pipeline_components(config, llm_backend, connector=None) -> tuple:
"""Like build_pipeline, but returns all components for tests.
Args:
@ -104,12 +113,13 @@ def build_pipeline_components(config, llm_backend) -> tuple:
llm_backend: An already-constructed LLMBackend instance
(from main.py or a test). Pipeline components share
this single instance. May be None for fallback behavior.
connector: Optional MeshtasticConnector for mesh channels.
Returns:
(bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
dispatcher = Dispatcher(config, create_channel, connector=connector)
# Build include_toggles from config
digest_cfg = getattr(config.notifications, "digest", None)
@ -125,8 +135,13 @@ def build_pipeline_components(config, llm_backend) -> tuple:
)
# Tee closure: events go to BOTH dispatcher and accumulator
# dispatcher.dispatch() is async, so fire-and-forget with create_task
def _tee(event):
dispatcher.dispatch(event)
try:
asyncio.create_task(dispatcher.dispatch(event))
except RuntimeError:
# No running event loop (e.g. sync tests) - skip async dispatch
pass
accumulator.enqueue(event)
# Build enabled toggles set from config
@ -165,10 +180,12 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
accumulator = components["accumulator"]
connector = components.get("connector")
scheduler = DigestScheduler(
accumulator=accumulator,
config=config,
channel_factory=create_channel,
connector=connector,
)
await scheduler.start()

View file

@ -4,12 +4,15 @@ The dispatcher routes immediate-severity events through the existing
NotificationRuleConfig rules and delivers via channels.py. This is the
transitional bridge between the new Event pipeline and the existing
channel implementations.
Phase 2.5a: dispatch() is now async, takes a connector at construction,
and properly awaits channel.deliver(payload, rule).
"""
import logging
from typing import Callable
from typing import Callable, Optional
from meshai.notifications.events import Event
from meshai.notifications.events import Event, make_payload_from_event
class Dispatcher:
@ -17,21 +20,26 @@ class Dispatcher:
SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2}
def __init__(self, config, channel_factory: Callable):
def __init__(self, config, channel_factory: Callable, connector=None):
"""Initialize.
Args:
config: The full Config object (provides config.notifications.rules)
channel_factory: Callable taking a NotificationRuleConfig and
returning a NotificationChannel. This is create_channel
from meshai/notifications/channels.py.
channel_factory: Callable taking (rule, connector) and returning
a NotificationChannel. This is create_channel from
meshai/notifications/channels.py.
connector: MeshConnector instance for mesh channel deliveries.
"""
self._config = config
self._channel_factory = channel_factory
self._connector = connector
self._logger = logging.getLogger("meshai.pipeline.dispatcher")
def dispatch(self, event: Event) -> None:
"""Deliver an immediate-severity event to all matching channels."""
async def dispatch(self, event: Event) -> None:
"""Deliver an immediate-severity event to all matching channels.
This method is async and awaits each channel.deliver() call.
"""
rules = self._matching_rules(event)
if not rules:
self._logger.debug(
@ -40,19 +48,17 @@ class Dispatcher:
return
for rule in rules:
try:
channel = self._channel_factory(rule)
alert = {
"category": event.category,
"severity": event.severity,
"message": event.summary or event.title,
"node_id": event.node_ids[0] if event.node_ids else None,
"region": event.region,
"timestamp": event.timestamp,
}
channel.deliver(alert)
self._logger.info(
f"Dispatched event {event.id} via {rule.delivery_type}"
)
channel = self._channel_factory(rule, self._connector)
payload = make_payload_from_event(event)
success = await channel.deliver(payload, rule)
if success:
self._logger.info(
f"Dispatched event {event.id} via {rule.delivery_type}"
)
else:
self._logger.warning(
f"Channel delivery returned False for rule {rule.name}"
)
except Exception:
self._logger.exception(
f"Channel delivery failed for rule {rule.name}"

View file

@ -13,6 +13,7 @@ from datetime import datetime, timedelta
from typing import Callable, Optional
from meshai.notifications.pipeline.digest import DigestAccumulator
from meshai.notifications.events import NotificationPayload
class DigestScheduler:
@ -23,12 +24,14 @@ class DigestScheduler:
accumulator: DigestAccumulator,
config,
channel_factory: Callable,
connector=None,
clock: Optional[Callable[[], float]] = None,
sleep: Optional[Callable[[float], "asyncio.Future"]] = None,
):
self._accumulator = accumulator
self._config = config
self._channel_factory = channel_factory
self._connector = connector
self._clock = clock or time.time
self._sleep = sleep or asyncio.sleep
self._task: Optional[asyncio.Task] = None
@ -120,7 +123,7 @@ class DigestScheduler:
async def _deliver_to_rule(self, rule, digest, now: float) -> None:
"""Hand the rendered digest to a channel based on rule.delivery_type."""
channel = self._channel_factory(rule)
channel = self._channel_factory(rule, self._connector)
delivery_type = rule.delivery_type
if delivery_type in ("mesh_broadcast", "mesh_dm"):
@ -128,31 +131,27 @@ class DigestScheduler:
chunks = digest.mesh_chunks
total = len(chunks)
for i, chunk in enumerate(chunks, start=1):
payload = {
"category": "digest",
"severity": "routine",
"message": chunk,
"node_id": None,
"region": None,
"timestamp": now,
"chunk_index": i,
"chunk_total": total,
}
channel.deliver(payload)
payload = NotificationPayload(
message=chunk,
category="digest",
severity="routine",
timestamp=now,
chunk_index=i,
chunk_total=total,
)
await channel.deliver(payload, rule)
self._logger.info(
f"Delivered {total} mesh chunk(s) to rule {rule.name!r}"
)
else:
# Single full-form delivery
payload = {
"category": "digest",
"severity": "routine",
"message": digest.full,
"node_id": None,
"region": None,
"timestamp": now,
}
channel.deliver(payload)
payload = NotificationPayload(
message=digest.full,
category="digest",
severity="routine",
timestamp=now,
)
await channel.deliver(payload, rule)
self._logger.info(
f"Delivered digest to rule {rule.name!r} via {delivery_type}"
)

View file

@ -8,7 +8,8 @@ import time
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from .channels import create_channel, NotificationChannel
from .channels import create_channel_from_dict, NotificationChannel
from .events import NotificationPayload
from .summarizer import MessageSummarizer
if TYPE_CHECKING:
@ -142,7 +143,7 @@ class NotificationRouter:
return None
try:
return create_channel(config, self._connector)
return create_channel_from_dict(config, self._connector)
except Exception as e:
logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e)
return None
@ -199,7 +200,20 @@ class NotificationRouter:
else:
delivery_alert = {**alert, "message": message[:195] + "..."}
success = await channel.deliver(delivery_alert, rule)
# Convert dict to NotificationPayload for channel interface
payload = NotificationPayload(
message=delivery_alert.get("message", ""),
category=delivery_alert.get("type", "unknown"),
severity=delivery_alert.get("severity", "routine"),
timestamp=delivery_alert.get("timestamp", time.time()),
node_id=delivery_alert.get("node_id"),
node_name=delivery_alert.get("node_name"),
region=delivery_alert.get("region"),
event_type=delivery_alert.get("type"),
)
# Rule is a dict here; channels don't use it so we pass None
# for the rule parameter (channels ignore it anyway)
success = await channel.deliver(payload, None)
if success:
delivered = True
self._record_fire(rule_name)
@ -255,7 +269,7 @@ class NotificationRouter:
{success, message, error, details}
"""
try:
channel = create_channel(channel_config, self._connector)
channel = create_channel_from_dict(channel_config, self._connector)
return await channel.test_connection()
except ValueError as e:
return {

View file

@ -60,8 +60,9 @@ class MockChannel:
def __init__(self):
self.deliveries = []
def deliver(self, payload: dict):
async def deliver(self, payload, rule=None):
self.deliveries.append(payload)
return True
class MockLLMBackend:
@ -93,7 +94,7 @@ def make_scheduler(
channels = {}
def channel_factory(rule):
def channel_factory(rule, connector=None):
ch = MockChannel()
channels[rule.name] = ch
return ch
@ -223,8 +224,8 @@ class TestFireBehavior:
ch = channels["digest-mesh"]
assert len(ch.deliveries) == 1
payload = ch.deliveries[0]
assert payload["category"] == "digest"
assert payload["severity"] == "routine"
assert payload.category == "digest"
assert payload.severity == "routine"
def test_fire_skips_disabled_rules(self):
"""Disabled rules are not delivered to."""
@ -293,8 +294,8 @@ class TestFireBehavior:
ch = channels["mesh"]
assert len(ch.deliveries) >= 1
for payload in ch.deliveries:
assert "chunk_index" in payload
assert "chunk_total" in payload
assert payload.chunk_index is not None
assert payload.chunk_total is not None
def test_fire_email_delivery_full_text(self):
"""Email delivery type gets single full-text delivery."""
@ -320,8 +321,8 @@ class TestFireBehavior:
ch = channels["email"]
assert len(ch.deliveries) == 1
payload = ch.deliveries[0]
assert "chunk_index" not in payload
assert "--- " in payload["message"]
assert payload.chunk_index is None
assert "--- " in payload.message
def test_fire_updates_last_fire_at(self):
"""_fire() updates last_fire_at timestamp."""
@ -350,7 +351,7 @@ class TestFireBehavior:
ch = channels["mesh"]
assert len(ch.deliveries) == 1
assert "No alerts" in ch.deliveries[0]["message"]
assert "No alerts" in ch.deliveries[0].message
# ---- Lifecycle Tests ----
@ -520,11 +521,11 @@ class TestIntegration:
call_order = []
def bad_channel_factory(rule):
def bad_channel_factory(rule, connector=None):
call_order.append(rule.name)
if rule.name == "bad":
ch = MagicMock()
ch.deliver.side_effect = RuntimeError("delivery failed")
ch.deliver = AsyncMock(side_effect=RuntimeError("delivery failed"))
return ch
return MockChannel()

View file

@ -8,8 +8,10 @@ Updated in Phase 2.4: Events now go to BOTH dispatcher and accumulator
compatibility but not used in production wiring.
"""
import asyncio
import pytest
from unittest.mock import Mock, patch
from unittest.mock import Mock, AsyncMock, patch
from dataclasses import dataclass, field
from meshai.notifications.events import Event, make_event
@ -55,15 +57,10 @@ class TestImmediateDispatch:
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
bus = EventBus()
dispatcher = Dispatcher(config, mock_factory)
digest = StubDigestQueue()
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
bus.subscribe(router.handle)
event = make_event(
source="test",
category="test_cat",
@ -71,12 +68,13 @@ class TestImmediateDispatch:
title="Test Alert",
summary="Test summary message",
)
bus.emit(event)
# Run dispatch in async context since it's now async
asyncio.run(dispatcher.dispatch(event))
assert mock_channel.deliver.call_count == 1
alert = mock_channel.deliver.call_args[0][0]
assert alert["category"] == "test_cat"
assert alert["severity"] == "immediate"
assert alert["message"]
assert alert.category == "test_cat"
assert alert.severity == "immediate"
assert alert.message
class TestTeeRouting:
@ -95,40 +93,29 @@ class TestTeeRouting:
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
# Create dispatcher and track calls
# Create dispatcher
dispatcher = Dispatcher(config, mock_factory)
dispatch_calls = []
original_dispatch = dispatcher.dispatch
def tracking_dispatch(event):
dispatch_calls.append(event)
original_dispatch(event)
dispatcher.dispatch = tracking_dispatch
# Create accumulator mock
accumulator_calls = []
def mock_enqueue(event):
accumulator_calls.append(event)
# Tee closure (Phase 2.4 pattern)
def tee(event):
dispatcher.dispatch(event)
mock_enqueue(event)
bus = EventBus()
bus.subscribe(tee)
event = make_event(
source="test",
category="test_cat",
severity="routine",
title="Routine Alert",
)
bus.emit(event)
# Run dispatch in async context
asyncio.run(dispatcher.dispatch(event))
mock_enqueue(event)
# Both paths received the event
assert len(dispatch_calls) == 1
assert len(accumulator_calls) == 1
# Dispatcher found a matching rule and delivered
assert mock_channel.deliver.call_count == 1
@ -146,36 +133,26 @@ class TestTeeRouting:
notifications=NotificationsConfigStub(rules=[rule])
)
mock_channel = Mock()
mock_channel.deliver = AsyncMock(return_value=True)
mock_factory = Mock(return_value=mock_channel)
dispatcher = Dispatcher(config, mock_factory)
dispatch_calls = []
original_dispatch = dispatcher.dispatch
def tracking_dispatch(event):
dispatch_calls.append(event)
original_dispatch(event)
dispatcher.dispatch = tracking_dispatch
accumulator_calls = []
def mock_enqueue(event):
accumulator_calls.append(event)
def tee(event):
dispatcher.dispatch(event)
mock_enqueue(event)
bus = EventBus()
bus.subscribe(tee)
event = make_event(
source="test",
category="test_cat",
severity="priority",
title="Priority Alert",
)
bus.emit(event)
assert len(dispatch_calls) == 1
# Run dispatch in async context
asyncio.run(dispatcher.dispatch(event))
mock_enqueue(event)
assert len(accumulator_calls) == 1
assert mock_channel.deliver.call_count == 1