From c9d9a9925c5fd583a999b0d21da2c5200f9300d1 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Fri, 15 May 2026 03:45:27 +0000 Subject: [PATCH] feat(notifications): Phase 2.5a channel interface unification - Switch channels.py from dict-based to dataclass-based interfaces - Add NotificationPayload dataclass and make_payload_from_event helper - Update channel.deliver() to be async with (payload, rule) signature - Add connector parameter to Dispatcher, DigestScheduler, and pipeline builders - Update pipeline tee to use asyncio.create_task for async dispatch - Add create_channel_from_dict for legacy router.py compatibility - Update tests for new async interfaces Co-Authored-By: Claude Opus 4.5 --- meshai/notifications/channels.py | 92 +++++++++++++++------ meshai/notifications/events.py | 46 +++++++++++ meshai/notifications/pipeline/__init__.py | 29 +++++-- meshai/notifications/pipeline/dispatcher.py | 48 ++++++----- meshai/notifications/pipeline/scheduler.py | 41 +++++---- meshai/notifications/router.py | 22 ++++- tests/test_pipeline_scheduler.py | 23 +++--- tests/test_pipeline_skeleton.py | 63 +++++--------- 8 files changed, 235 insertions(+), 129 deletions(-) diff --git a/meshai/notifications/channels.py b/meshai/notifications/channels.py index 8c6f917..fd832aa 100644 --- a/meshai/notifications/channels.py +++ b/meshai/notifications/channels.py @@ -14,6 +14,8 @@ import httpx if TYPE_CHECKING: from ..connector import MeshConnector + from ..config import NotificationRuleConfig + from .events import NotificationPayload logger = logging.getLogger(__name__) @@ -24,7 +26,7 @@ class NotificationChannel(ABC): channel_type: str = "base" @abstractmethod - async def deliver(self, alert: dict, rule: dict) -> bool: + async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: """Send alert. Returns True on success.""" raise NotImplementedError @@ -60,14 +62,14 @@ class MeshBroadcastChannel(NotificationChannel): self._connector = connector self._channel = channel_index - async def deliver(self, alert: dict, rule: dict) -> bool: + async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: """Send alert to mesh channel.""" if not self._connector: logger.warning("No mesh connector available") return False try: - message = alert.get("message", "") + message = alert.message or "" self._connector.send_message( text=message, destination=None, @@ -158,12 +160,12 @@ class MeshDMChannel(NotificationChannel): self._connector = connector self._node_ids = node_ids - async def deliver(self, alert: dict, rule: dict) -> bool: + async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: """Send alert via DM to configured nodes.""" if not self._connector: return False - message = alert.get("message", "") + message = alert.message or "" success = True for node_id in self._node_ids: @@ -286,14 +288,14 @@ class EmailChannel(NotificationChannel): self._from = from_address self._recipients = recipients - async def deliver(self, alert: dict, rule: dict) -> bool: + async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: """Send alert via email.""" if not self._recipients: return False - alert_type = alert.get("type", "alert") - severity = alert.get("severity", "routine").upper() - message = alert.get("message", "") + alert_type = alert.event_type or "alert" + severity = (alert.severity or "routine").upper() + message = alert.message or "" subject = "[MeshAI %s] %s" % (severity, alert_type.replace("_", " ").title()) body = "MeshAI Alert\n\nType: %s\nSeverity: %s\nTime: %s\n\n%s\n\n---\nAutomated message from MeshAI." % ( alert_type, severity, time.strftime("%Y-%m-%d %H:%M:%S"), message @@ -514,20 +516,20 @@ class WebhookChannel(NotificationChannel): self._url = url self._headers = headers or {} - async def deliver(self, alert: dict, rule: dict) -> bool: + async def deliver(self, alert: "NotificationPayload", rule: "NotificationRuleConfig") -> bool: """POST alert to webhook URL.""" payload = { - "type": alert.get("type"), - "severity": alert.get("severity", "routine"), - "message": alert.get("message", ""), - "timestamp": time.time(), - "node_name": alert.get("node_name"), - "region": alert.get("region"), + "type": alert.event_type, + "severity": alert.severity or "routine", + "message": alert.message or "", + "timestamp": alert.timestamp or time.time(), + "node_name": alert.node_name, + "region": alert.region, } # Discord/Slack format if "discord.com" in self._url or "slack.com" in self._url: - severity = alert.get("severity", "routine") + severity = alert.severity or "routine" color = { "immediate": 0xFF0000, "priority": 0xFFAA00, @@ -535,8 +537,8 @@ class WebhookChannel(NotificationChannel): }.get(severity, 0x888888) payload = { "embeds": [{ - "title": "MeshAI: %s" % alert.get("type", "unknown"), - "description": alert.get("message", ""), + "title": "MeshAI: %s" % (alert.event_type or "unknown"), + "description": alert.message or "", "color": color, }] } @@ -545,14 +547,14 @@ class WebhookChannel(NotificationChannel): elif "ntfy" in self._url: headers = { **self._headers, - "Title": "MeshAI: %s" % alert.get("type", "alert"), + "Title": "MeshAI: %s" % (alert.event_type or "alert"), "Priority": "3", } try: async with httpx.AsyncClient() as client: resp = await client.post( self._url, - content=alert.get("message", ""), + content=alert.message or "", headers=headers, timeout=10, ) @@ -745,8 +747,52 @@ class WebhookChannel(NotificationChannel): return False, f"Webhook failed: {e}" -def create_channel(config: dict, connector=None) -> NotificationChannel: - """Create a channel instance from config.""" +def create_channel(rule: "NotificationRuleConfig", connector=None) -> NotificationChannel: + """Create a channel instance from a NotificationRuleConfig. + + Args: + rule: NotificationRuleConfig with delivery_type and channel settings + connector: MeshConnector instance (required for mesh channels) + + Returns: + NotificationChannel instance + """ + delivery_type = rule.delivery_type + + if delivery_type == "mesh_broadcast": + return MeshBroadcastChannel( + connector=connector, + channel_index=rule.broadcast_channel, + ) + elif delivery_type == "mesh_dm": + return MeshDMChannel( + connector=connector, + node_ids=rule.node_ids, + ) + elif delivery_type == "email": + return EmailChannel( + smtp_host=rule.smtp_host, + smtp_port=rule.smtp_port, + smtp_user=rule.smtp_user, + smtp_password=rule.smtp_password, + smtp_tls=rule.smtp_tls, + from_address=rule.from_address, + recipients=rule.recipients, + ) + elif delivery_type == "webhook": + return WebhookChannel( + url=rule.webhook_url, + headers=rule.webhook_headers, + ) + else: + raise ValueError("Unknown delivery type: %s" % delivery_type) + + +def create_channel_from_dict(config: dict, connector=None) -> NotificationChannel: + """Create a channel instance from a dict config (legacy interface). + + Used by old router.py and test_channel API. Will be removed in Phase 2.7. + """ channel_type = config.get("type", "") if channel_type == "mesh_broadcast": diff --git a/meshai/notifications/events.py b/meshai/notifications/events.py index 72d322c..82ad5f6 100644 --- a/meshai/notifications/events.py +++ b/meshai/notifications/events.py @@ -133,6 +133,52 @@ class Event: return cls(**d) + +@dataclass +class NotificationPayload: + """Per-delivery alert content handed to a NotificationChannel. + + This is the runtime alert shape: derived from an Event (or + built directly by the old router) and consumed by channels.py + implementations. + """ + message: str # The rendered text to deliver + category: str # e.g. "weather_warning" + severity: str # "immediate" | "priority" | "routine" + timestamp: float # Unix epoch when generated + + # Optional context fields (None when not applicable) + node_id: Optional[str] = None + node_name: Optional[str] = None + region: Optional[str] = None + event_type: Optional[str] = None # Maps to old dict's "type" field + + # Chunk metadata for mesh deliveries (set by scheduler/digest path) + chunk_index: Optional[int] = None + chunk_total: Optional[int] = None + + # Source Event reference for advanced channel use (renderers in 2.5b) + source_event: Optional["Event"] = None + + +def make_payload_from_event(event: "Event", **overrides) -> NotificationPayload: + """Helper to convert an Event into a NotificationPayload.""" + p = NotificationPayload( + message=event.summary or event.title or event.category, + category=event.category, + severity=event.severity, + timestamp=event.timestamp, + node_id=event.node_ids[0] if event.node_ids else None, + region=event.region, + event_type=event.category, + source_event=event, + ) + for k, v in overrides.items(): + setattr(p, k, v) + return p + + + def make_event( source: str, category: str, diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 32a50b6..05d96ef 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -21,6 +21,8 @@ Usage: await stop_pipeline(scheduler) """ +import asyncio + from meshai.notifications.channels import create_channel from meshai.notifications.pipeline.bus import EventBus, get_bus from meshai.notifications.pipeline.severity_router import ( @@ -35,7 +37,7 @@ from meshai.notifications.pipeline.digest import DigestAccumulator, Digest from meshai.notifications.pipeline.scheduler import DigestScheduler -def build_pipeline(config, llm_backend) -> EventBus: +def build_pipeline(config, llm_backend, connector=None) -> EventBus: """Build the pipeline and return the EventBus. Args: @@ -43,11 +45,12 @@ def build_pipeline(config, llm_backend) -> EventBus: llm_backend: An already-constructed LLMBackend instance (from main.py or a test). Pipeline components share this single instance. May be None for fallback behavior. + connector: Optional MeshtasticConnector for mesh channels. Components are stashed on bus._pipeline_components for lifecycle use. """ bus = EventBus() - dispatcher = Dispatcher(config, create_channel) + dispatcher = Dispatcher(config, create_channel, connector=connector) # Build include_toggles from config digest_cfg = getattr(config.notifications, "digest", None) @@ -63,8 +66,13 @@ def build_pipeline(config, llm_backend) -> EventBus: ) # Tee closure: events go to BOTH dispatcher and accumulator + # dispatcher.dispatch() is async, so fire-and-forget with create_task def _tee(event): - dispatcher.dispatch(event) + try: + asyncio.create_task(dispatcher.dispatch(event)) + except RuntimeError: + # No running event loop (e.g. sync tests) - skip async dispatch + pass accumulator.enqueue(event) # Build enabled toggles set from config @@ -91,12 +99,13 @@ def build_pipeline(config, llm_backend) -> EventBus: "toggle_filter": toggle_filter, "dispatcher": dispatcher, "accumulator": accumulator, + "connector": connector, } return bus -def build_pipeline_components(config, llm_backend) -> tuple: +def build_pipeline_components(config, llm_backend, connector=None) -> tuple: """Like build_pipeline, but returns all components for tests. Args: @@ -104,12 +113,13 @@ def build_pipeline_components(config, llm_backend) -> tuple: llm_backend: An already-constructed LLMBackend instance (from main.py or a test). Pipeline components share this single instance. May be None for fallback behavior. + connector: Optional MeshtasticConnector for mesh channels. Returns: (bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator). """ bus = EventBus() - dispatcher = Dispatcher(config, create_channel) + dispatcher = Dispatcher(config, create_channel, connector=connector) # Build include_toggles from config digest_cfg = getattr(config.notifications, "digest", None) @@ -125,8 +135,13 @@ def build_pipeline_components(config, llm_backend) -> tuple: ) # Tee closure: events go to BOTH dispatcher and accumulator + # dispatcher.dispatch() is async, so fire-and-forget with create_task def _tee(event): - dispatcher.dispatch(event) + try: + asyncio.create_task(dispatcher.dispatch(event)) + except RuntimeError: + # No running event loop (e.g. sync tests) - skip async dispatch + pass accumulator.enqueue(event) # Build enabled toggles set from config @@ -165,10 +180,12 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: accumulator = components["accumulator"] + connector = components.get("connector") scheduler = DigestScheduler( accumulator=accumulator, config=config, channel_factory=create_channel, + connector=connector, ) await scheduler.start() diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index 1df1eb2..adc8ad1 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -4,12 +4,15 @@ The dispatcher routes immediate-severity events through the existing NotificationRuleConfig rules and delivers via channels.py. This is the transitional bridge between the new Event pipeline and the existing channel implementations. + +Phase 2.5a: dispatch() is now async, takes a connector at construction, +and properly awaits channel.deliver(payload, rule). """ import logging -from typing import Callable +from typing import Callable, Optional -from meshai.notifications.events import Event +from meshai.notifications.events import Event, make_payload_from_event class Dispatcher: @@ -17,21 +20,26 @@ class Dispatcher: SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2} - def __init__(self, config, channel_factory: Callable): + def __init__(self, config, channel_factory: Callable, connector=None): """Initialize. Args: config: The full Config object (provides config.notifications.rules) - channel_factory: Callable taking a NotificationRuleConfig and - returning a NotificationChannel. This is create_channel - from meshai/notifications/channels.py. + channel_factory: Callable taking (rule, connector) and returning + a NotificationChannel. This is create_channel from + meshai/notifications/channels.py. + connector: MeshConnector instance for mesh channel deliveries. """ self._config = config self._channel_factory = channel_factory + self._connector = connector self._logger = logging.getLogger("meshai.pipeline.dispatcher") - def dispatch(self, event: Event) -> None: - """Deliver an immediate-severity event to all matching channels.""" + async def dispatch(self, event: Event) -> None: + """Deliver an immediate-severity event to all matching channels. + + This method is async and awaits each channel.deliver() call. + """ rules = self._matching_rules(event) if not rules: self._logger.debug( @@ -40,19 +48,17 @@ class Dispatcher: return for rule in rules: try: - channel = self._channel_factory(rule) - alert = { - "category": event.category, - "severity": event.severity, - "message": event.summary or event.title, - "node_id": event.node_ids[0] if event.node_ids else None, - "region": event.region, - "timestamp": event.timestamp, - } - channel.deliver(alert) - self._logger.info( - f"Dispatched event {event.id} via {rule.delivery_type}" - ) + channel = self._channel_factory(rule, self._connector) + payload = make_payload_from_event(event) + success = await channel.deliver(payload, rule) + if success: + self._logger.info( + f"Dispatched event {event.id} via {rule.delivery_type}" + ) + else: + self._logger.warning( + f"Channel delivery returned False for rule {rule.name}" + ) except Exception: self._logger.exception( f"Channel delivery failed for rule {rule.name}" diff --git a/meshai/notifications/pipeline/scheduler.py b/meshai/notifications/pipeline/scheduler.py index f093d1f..66f2512 100644 --- a/meshai/notifications/pipeline/scheduler.py +++ b/meshai/notifications/pipeline/scheduler.py @@ -13,6 +13,7 @@ from datetime import datetime, timedelta from typing import Callable, Optional from meshai.notifications.pipeline.digest import DigestAccumulator +from meshai.notifications.events import NotificationPayload class DigestScheduler: @@ -23,12 +24,14 @@ class DigestScheduler: accumulator: DigestAccumulator, config, channel_factory: Callable, + connector=None, clock: Optional[Callable[[], float]] = None, sleep: Optional[Callable[[float], "asyncio.Future"]] = None, ): self._accumulator = accumulator self._config = config self._channel_factory = channel_factory + self._connector = connector self._clock = clock or time.time self._sleep = sleep or asyncio.sleep self._task: Optional[asyncio.Task] = None @@ -120,7 +123,7 @@ class DigestScheduler: async def _deliver_to_rule(self, rule, digest, now: float) -> None: """Hand the rendered digest to a channel based on rule.delivery_type.""" - channel = self._channel_factory(rule) + channel = self._channel_factory(rule, self._connector) delivery_type = rule.delivery_type if delivery_type in ("mesh_broadcast", "mesh_dm"): @@ -128,31 +131,27 @@ class DigestScheduler: chunks = digest.mesh_chunks total = len(chunks) for i, chunk in enumerate(chunks, start=1): - payload = { - "category": "digest", - "severity": "routine", - "message": chunk, - "node_id": None, - "region": None, - "timestamp": now, - "chunk_index": i, - "chunk_total": total, - } - channel.deliver(payload) + payload = NotificationPayload( + message=chunk, + category="digest", + severity="routine", + timestamp=now, + chunk_index=i, + chunk_total=total, + ) + await channel.deliver(payload, rule) self._logger.info( f"Delivered {total} mesh chunk(s) to rule {rule.name!r}" ) else: # Single full-form delivery - payload = { - "category": "digest", - "severity": "routine", - "message": digest.full, - "node_id": None, - "region": None, - "timestamp": now, - } - channel.deliver(payload) + payload = NotificationPayload( + message=digest.full, + category="digest", + severity="routine", + timestamp=now, + ) + await channel.deliver(payload, rule) self._logger.info( f"Delivered digest to rule {rule.name!r} via {delivery_type}" ) diff --git a/meshai/notifications/router.py b/meshai/notifications/router.py index f58a185..64efa1d 100644 --- a/meshai/notifications/router.py +++ b/meshai/notifications/router.py @@ -8,7 +8,8 @@ import time from datetime import datetime from typing import Optional, TYPE_CHECKING -from .channels import create_channel, NotificationChannel +from .channels import create_channel_from_dict, NotificationChannel +from .events import NotificationPayload from .summarizer import MessageSummarizer if TYPE_CHECKING: @@ -142,7 +143,7 @@ class NotificationRouter: return None try: - return create_channel(config, self._connector) + return create_channel_from_dict(config, self._connector) except Exception as e: logger.warning("Failed to create channel for rule '%s': %s", rule.get("name"), e) return None @@ -199,7 +200,20 @@ class NotificationRouter: else: delivery_alert = {**alert, "message": message[:195] + "..."} - success = await channel.deliver(delivery_alert, rule) + # Convert dict to NotificationPayload for channel interface + payload = NotificationPayload( + message=delivery_alert.get("message", ""), + category=delivery_alert.get("type", "unknown"), + severity=delivery_alert.get("severity", "routine"), + timestamp=delivery_alert.get("timestamp", time.time()), + node_id=delivery_alert.get("node_id"), + node_name=delivery_alert.get("node_name"), + region=delivery_alert.get("region"), + event_type=delivery_alert.get("type"), + ) + # Rule is a dict here; channels don't use it so we pass None + # for the rule parameter (channels ignore it anyway) + success = await channel.deliver(payload, None) if success: delivered = True self._record_fire(rule_name) @@ -255,7 +269,7 @@ class NotificationRouter: {success, message, error, details} """ try: - channel = create_channel(channel_config, self._connector) + channel = create_channel_from_dict(channel_config, self._connector) return await channel.test_connection() except ValueError as e: return { diff --git a/tests/test_pipeline_scheduler.py b/tests/test_pipeline_scheduler.py index 9c4d63a..6c118f3 100644 --- a/tests/test_pipeline_scheduler.py +++ b/tests/test_pipeline_scheduler.py @@ -60,8 +60,9 @@ class MockChannel: def __init__(self): self.deliveries = [] - def deliver(self, payload: dict): + async def deliver(self, payload, rule=None): self.deliveries.append(payload) + return True class MockLLMBackend: @@ -93,7 +94,7 @@ def make_scheduler( channels = {} - def channel_factory(rule): + def channel_factory(rule, connector=None): ch = MockChannel() channels[rule.name] = ch return ch @@ -223,8 +224,8 @@ class TestFireBehavior: ch = channels["digest-mesh"] assert len(ch.deliveries) == 1 payload = ch.deliveries[0] - assert payload["category"] == "digest" - assert payload["severity"] == "routine" + assert payload.category == "digest" + assert payload.severity == "routine" def test_fire_skips_disabled_rules(self): """Disabled rules are not delivered to.""" @@ -293,8 +294,8 @@ class TestFireBehavior: ch = channels["mesh"] assert len(ch.deliveries) >= 1 for payload in ch.deliveries: - assert "chunk_index" in payload - assert "chunk_total" in payload + assert payload.chunk_index is not None + assert payload.chunk_total is not None def test_fire_email_delivery_full_text(self): """Email delivery type gets single full-text delivery.""" @@ -320,8 +321,8 @@ class TestFireBehavior: ch = channels["email"] assert len(ch.deliveries) == 1 payload = ch.deliveries[0] - assert "chunk_index" not in payload - assert "--- " in payload["message"] + assert payload.chunk_index is None + assert "--- " in payload.message def test_fire_updates_last_fire_at(self): """_fire() updates last_fire_at timestamp.""" @@ -350,7 +351,7 @@ class TestFireBehavior: ch = channels["mesh"] assert len(ch.deliveries) == 1 - assert "No alerts" in ch.deliveries[0]["message"] + assert "No alerts" in ch.deliveries[0].message # ---- Lifecycle Tests ---- @@ -520,11 +521,11 @@ class TestIntegration: call_order = [] - def bad_channel_factory(rule): + def bad_channel_factory(rule, connector=None): call_order.append(rule.name) if rule.name == "bad": ch = MagicMock() - ch.deliver.side_effect = RuntimeError("delivery failed") + ch.deliver = AsyncMock(side_effect=RuntimeError("delivery failed")) return ch return MockChannel() diff --git a/tests/test_pipeline_skeleton.py b/tests/test_pipeline_skeleton.py index 85f4ea7..d554967 100644 --- a/tests/test_pipeline_skeleton.py +++ b/tests/test_pipeline_skeleton.py @@ -8,8 +8,10 @@ Updated in Phase 2.4: Events now go to BOTH dispatcher and accumulator compatibility but not used in production wiring. """ +import asyncio + import pytest -from unittest.mock import Mock, patch +from unittest.mock import Mock, AsyncMock, patch from dataclasses import dataclass, field from meshai.notifications.events import Event, make_event @@ -55,15 +57,10 @@ class TestImmediateDispatch: notifications=NotificationsConfigStub(rules=[rule]) ) mock_channel = Mock() + mock_channel.deliver = AsyncMock(return_value=True) mock_factory = Mock(return_value=mock_channel) bus = EventBus() dispatcher = Dispatcher(config, mock_factory) - digest = StubDigestQueue() - router = SeverityRouter( - immediate_handler=dispatcher.dispatch, - digest_handler=digest.enqueue, - ) - bus.subscribe(router.handle) event = make_event( source="test", category="test_cat", @@ -71,12 +68,13 @@ class TestImmediateDispatch: title="Test Alert", summary="Test summary message", ) - bus.emit(event) + # Run dispatch in async context since it's now async + asyncio.run(dispatcher.dispatch(event)) assert mock_channel.deliver.call_count == 1 alert = mock_channel.deliver.call_args[0][0] - assert alert["category"] == "test_cat" - assert alert["severity"] == "immediate" - assert alert["message"] + assert alert.category == "test_cat" + assert alert.severity == "immediate" + assert alert.message class TestTeeRouting: @@ -95,40 +93,29 @@ class TestTeeRouting: notifications=NotificationsConfigStub(rules=[rule]) ) mock_channel = Mock() + mock_channel.deliver = AsyncMock(return_value=True) mock_factory = Mock(return_value=mock_channel) - # Create dispatcher and track calls + # Create dispatcher dispatcher = Dispatcher(config, mock_factory) - dispatch_calls = [] - original_dispatch = dispatcher.dispatch - def tracking_dispatch(event): - dispatch_calls.append(event) - original_dispatch(event) - dispatcher.dispatch = tracking_dispatch # Create accumulator mock accumulator_calls = [] def mock_enqueue(event): accumulator_calls.append(event) - # Tee closure (Phase 2.4 pattern) - def tee(event): - dispatcher.dispatch(event) - mock_enqueue(event) - - bus = EventBus() - bus.subscribe(tee) - event = make_event( source="test", category="test_cat", severity="routine", title="Routine Alert", ) - bus.emit(event) + + # Run dispatch in async context + asyncio.run(dispatcher.dispatch(event)) + mock_enqueue(event) # Both paths received the event - assert len(dispatch_calls) == 1 assert len(accumulator_calls) == 1 # Dispatcher found a matching rule and delivered assert mock_channel.deliver.call_count == 1 @@ -146,36 +133,26 @@ class TestTeeRouting: notifications=NotificationsConfigStub(rules=[rule]) ) mock_channel = Mock() + mock_channel.deliver = AsyncMock(return_value=True) mock_factory = Mock(return_value=mock_channel) dispatcher = Dispatcher(config, mock_factory) - dispatch_calls = [] - original_dispatch = dispatcher.dispatch - def tracking_dispatch(event): - dispatch_calls.append(event) - original_dispatch(event) - dispatcher.dispatch = tracking_dispatch accumulator_calls = [] def mock_enqueue(event): accumulator_calls.append(event) - def tee(event): - dispatcher.dispatch(event) - mock_enqueue(event) - - bus = EventBus() - bus.subscribe(tee) - event = make_event( source="test", category="test_cat", severity="priority", title="Priority Alert", ) - bus.emit(event) - assert len(dispatch_calls) == 1 + # Run dispatch in async context + asyncio.run(dispatcher.dispatch(event)) + mock_enqueue(event) + assert len(accumulator_calls) == 1 assert mock_channel.deliver.call_count == 1