From a4cb29002dbbcb79a79612e732f1460bb046be2f Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Fri, 15 May 2026 03:08:31 +0000 Subject: [PATCH] fix(notifications): inject llm_backend into build_pipeline build_pipeline previously constructed its own LLMBackend from config.llm, which: - duplicated main.py's already-running backend instance - failed to inherit env-loaded LLM_API_KEY when called from short-lived scripts (eyeball checks, tests), forcing fallback - prevented pipeline components from sharing the live backend build_pipeline and build_pipeline_components now require an llm_backend parameter. main.py passes the same instance it constructed for its primary responder. Tests pass mocks. The digest accumulator now uses the live, authenticated backend. Added test_build_pipeline_uses_provided_backend to lock in the injection contract. Co-Authored-By: Claude Opus 4.5 --- meshai/notifications/pipeline/__init__.py | 50 ++++++++--------------- tests/test_pipeline_digest.py | 11 ++++- tests/test_pipeline_toggle_filter.py | 26 +++++++++--- 3 files changed, 47 insertions(+), 40 deletions(-) diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index b606942..32a50b6 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -12,7 +12,7 @@ Phase 2.4: Usage: from meshai.notifications.pipeline import build_pipeline, start_pipeline, stop_pipeline - bus = build_pipeline(config) + bus = build_pipeline(config, llm_backend) # llm_backend from main.py bus.emit(event) # Async lifecycle @@ -35,40 +35,20 @@ from meshai.notifications.pipeline.digest import DigestAccumulator, Digest from meshai.notifications.pipeline.scheduler import DigestScheduler -def _create_llm_backend(config): - """Create an LLM backend from config, or return None if unavailable.""" - try: - from meshai.backends import OpenAIBackend, AnthropicBackend, GoogleBackend - - api_key = config.resolve_api_key() - if not api_key: - return None - - backend_name = config.llm.backend.lower() - # Use minimal memory settings for digest summaries - if backend_name == "openai": - return OpenAIBackend(config.llm, api_key, 0, 0) - elif backend_name == "anthropic": - return AnthropicBackend(config.llm, api_key, 0, 0) - elif backend_name == "google": - return GoogleBackend(config.llm, api_key, 0, 0) - else: - return OpenAIBackend(config.llm, api_key, 0, 0) - except Exception: - return None - - -def build_pipeline(config) -> EventBus: +def build_pipeline(config, llm_backend) -> EventBus: """Build the pipeline and return the EventBus. + Args: + config: Full Config object. + llm_backend: An already-constructed LLMBackend instance + (from main.py or a test). Pipeline components share + this single instance. May be None for fallback behavior. + Components are stashed on bus._pipeline_components for lifecycle use. """ bus = EventBus() dispatcher = Dispatcher(config, create_channel) - # Build LLM backend for digest summarization - llm_backend = _create_llm_backend(config) - # Build include_toggles from config digest_cfg = getattr(config.notifications, "digest", None) include_toggles = None @@ -116,17 +96,21 @@ def build_pipeline(config) -> EventBus: return bus -def build_pipeline_components(config) -> tuple: +def build_pipeline_components(config, llm_backend) -> tuple: """Like build_pipeline, but returns all components for tests. - Returns (bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator). + Args: + config: Full Config object. + llm_backend: An already-constructed LLMBackend instance + (from main.py or a test). Pipeline components share + this single instance. May be None for fallback behavior. + + Returns: + (bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator). """ bus = EventBus() dispatcher = Dispatcher(config, create_channel) - # Build LLM backend for digest summarization - llm_backend = _create_llm_backend(config) - # Build include_toggles from config digest_cfg = getattr(config.notifications, "digest", None) include_toggles = None diff --git a/tests/test_pipeline_digest.py b/tests/test_pipeline_digest.py index bbf8289..0ec26b5 100644 --- a/tests/test_pipeline_digest.py +++ b/tests/test_pipeline_digest.py @@ -50,6 +50,13 @@ class FailingLLMBackend: raise RuntimeError("LLM unavailable") +def _make_mock_backend(): + """Create a standard mock LLM backend for tests.""" + mock = MagicMock() + mock.generate = AsyncMock(return_value="stub summary") + return mock + + # ============================================================ # ACCUMULATOR EVENT LOGGING TESTS # ============================================================ @@ -478,7 +485,7 @@ def test_pipeline_routes_event_to_accumulator(): """Events via bus.emit end up in DigestAccumulator.""" config = Config() bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = \ - build_pipeline_components(config) + build_pipeline_components(config, _make_mock_backend()) event = make_event( source="test", @@ -499,7 +506,7 @@ def test_pipeline_routes_immediate_to_both(): """Immediate events go to both dispatcher and accumulator in Phase 2.4.""" config = Config() bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = \ - build_pipeline_components(config) + build_pipeline_components(config, _make_mock_backend()) event = make_event( source="test", diff --git a/tests/test_pipeline_toggle_filter.py b/tests/test_pipeline_toggle_filter.py index 4074798..9afa93e 100644 --- a/tests/test_pipeline_toggle_filter.py +++ b/tests/test_pipeline_toggle_filter.py @@ -1,6 +1,7 @@ """Tests for ToggleFilter (Phase 2.4).""" import pytest +from unittest.mock import MagicMock, AsyncMock from meshai.notifications.events import make_event from meshai.notifications.pipeline.toggle_filter import ToggleFilter @@ -101,16 +102,31 @@ class TestToggleFilterPipelineWiring: def test_toggle_filter_pipeline_drops_disabled_toggle(self): """Events for disabled toggles don't reach dispatcher or accumulator.""" - # Create config with only weather enabled config = Config() - # We'll check by using build_pipeline_components and inspecting - # In Phase 2.4, build_pipeline_components returns toggle_filter + + # Pass mock LLM backend + mock_backend = MagicMock() + mock_backend.generate = AsyncMock(return_value="stub summary") # Note: without toggles.enabled set, filter is a no-op # This test verifies the wiring is correct - bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = \ - build_pipeline_components(config) + bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, mock_backend) # Verify toggle_filter is in the chain assert toggle_filter is not None assert hasattr(toggle_filter, 'handle') + + def test_build_pipeline_uses_provided_backend(self): + """build_pipeline_components uses the provided llm_backend.""" + config = Config() + + # Sentinel backend with unique attribute + sentinel = MagicMock() + sentinel.unique_marker = "I_AM_THE_SENTINEL" + sentinel.generate = AsyncMock(return_value="sentinel summary") + + bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = build_pipeline_components(config, sentinel) + + # Accumulator should have the exact sentinel instance + assert accumulator._llm is sentinel + assert accumulator._llm.unique_marker == "I_AM_THE_SENTINEL"