meshai/tests/test_pipeline_digest.py
Matt Johnson 60e8e62e85 fix(fire): v0.5.7-fire -- FIRMS NATS pattern + WFIGS tombstone dedup + remove fire_proximity + categories audit
Third family of the v0.5.7 NATS-and-categories campaign. Fire is the heaviest of the campaign: four distinct fixes, the last of which includes a categories audit. Two of the four address bugs that were live in production: FIRMS subscribed to a syntactically invalid pattern, and WFIGS tombstones were silently dropped.

FIX 1 -- FIRMS NATS pattern (the canonical bug). Pre-v0.5.7-fire `_subjects_for("firms","us.id")` returned `["central.fire.hotspot.>.us.id"]`, which is INVALID NATS (the `>` multi-level wildcard is only legal at the tail token). It also wouldn't have matched anything Central publishes: per the Central v0.10.0 consumer integration guide §firms, the actual published pattern is `central.fire.hotspot.<satellite>.<confidence>` (5 tokens, no us.<state> suffix). The two slots after "hotspot" are satellite name and confidence band -- NOT tile coordinates or region tokens.

Note on prompt vs. guide discrepancy: the v0.5.7-fire task spec described a tile-coord/state pattern `central.fire.hotspot.*.*.us.id` (7 tokens with us.<state> tail). That's neither what Central v0.10.0 publishes nor what its guide documents. We follow the guide. Subscribing to the prompt's 7-token pattern would silently match zero messages in production (token-count mismatch). State filtering for FIRMS happens client-side via data.latitude / data.longitude against the configured region bbox.
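Below is a minimal sketch of that client-side bbox check. It assumes the hotspot payload carries `latitude`/`longitude` keys in decimal degrees and that the region is a simple min/max box that does not cross the antimeridian. `BBox`, `keep_hotspot`, and the Idaho coordinates are illustrative only; they are not meshai's actual names or configured values. Dropping hotspots that have no coordinates is also an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BBox:
    """Hypothetical region box in decimal degrees (no antimeridian crossing)."""
    min_lat: float
    min_lon: float
    max_lat: float
    max_lon: float

    def contains(self, lat: float, lon: float) -> bool:
        # Inclusive edges on both axes.
        return (self.min_lat <= lat <= self.max_lat
                and self.min_lon <= lon <= self.max_lon)


# Rough, illustrative box around Idaho (us.id); not meshai's configured value.
IDAHO = BBox(min_lat=42.0, min_lon=-117.3, max_lat=49.0, max_lon=-111.0)


def keep_hotspot(data: dict, region: BBox) -> bool:
    """Keep a FIRMS hotspot only if its coordinates fall inside the region."""
    lat, lon = data.get("latitude"), data.get("longitude")
    if lat is None or lon is None:
        return False  # cannot place the hotspot, so drop it (assumed policy)
    return region.contains(float(lat), float(lon))


print(keep_hotspot({"latitude": 43.6, "longitude": -116.2}, IDAHO))  # True (near Boise)
print(keep_hotspot({"latitude": 45.5, "longitude": -122.7}, IDAHO))  # False (near Portland, OR)
```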

New subscription: `central.fire.hotspot.>` -- tail-only `>`, NATS-legal, matches all <satellite>.<confidence> combinations.
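The following simplified model of NATS subject matching shows why the old filter could never work and why the new one does. In this model, `*` matches exactly one token, and `>` matches one or more trailing tokens and is only legal as the last token. It skips some server-side validation details. The satellite/confidence tokens in the example subject are hypothetical.

```python
def is_valid_filter(pattern: str) -> bool:
    """Reject empty tokens and any '>' that is not the final token."""
    tokens = pattern.split(".")
    if any(tok == "" for tok in tokens):
        return False
    return ">" not in tokens[:-1]


def matches(pattern: str, subject: str) -> bool:
    """Simplified NATS match: '*' = exactly one token, '>' = one or more tail tokens."""
    if not is_valid_filter(pattern):
        raise ValueError(f"invalid NATS filter: {pattern!r}")
    pat, sub = pattern.split("."), subject.split(".")
    for i, tok in enumerate(pat):
        if tok == ">":
            return len(sub) > i  # '>' must consume at least one token
        if i >= len(sub) or (tok != "*" and tok != sub[i]):
            return False
    return len(pat) == len(sub)  # no wildcard tail: token counts must agree


# Hypothetical published subject: central.fire.hotspot.<satellite>.<confidence>
published = "central.fire.hotspot.viirs_noaa20.high"

print(is_valid_filter("central.fire.hotspot.>.us.id"))       # False
print(matches("central.fire.hotspot.*.*.us.id", published))  # False (7 vs 5 tokens)
print(matches("central.fire.hotspot.>", published))          # True
```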

FIX 2 -- WFIGS tombstone subjects. Per guide §wfigs_incidents and §wfigs_perimeters, WFIGS publishes:

    active:    central.fire.incident.<state>.<county>     (Convention A, depth-3 state)
    active:    central.fire.perimeter.<state>.<county>
    tombstone: central.fire.incident.removed.<state>     (5 tokens, "removed" at depth-3)
    tombstone: central.fire.perimeter.removed.<state>

Pre-v0.5.7-fire `_subjects_for("fires","us.id")` subscribed only to the active subjects (`central.fire.incident.id.>` and `central.fire.perimeter.id.>`). The tombstone subjects have "removed" at depth-3 instead of the state token, so the active-subject `>` filters silently dropped EVERY tombstone. Fall-off signals never reached meshai's inhibitor, so old incidents stayed "live" in the pipeline indefinitely.

Added the two tombstone subjects to the subscription list. Both are 5-token literals with no wildcards -- trivially NATS-legal.
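Here is a sketch of the before/after subscription lists for `us.id`, built on a simplified NATS matching model. `wfigs_subjects` is a hypothetical helper, not the real `_subjects_for`, and the county token `ada` is made up.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Simplified NATS match: '*' = one token, '>' = one or more tail tokens."""
    pat, sub = pattern.split("."), subject.split(".")
    for i, tok in enumerate(pat):
        if tok == ">":
            return len(sub) > i
        if i >= len(sub) or (tok != "*" and tok != sub[i]):
            return False
    return len(pat) == len(sub)


def wfigs_subjects(state: str, include_tombstones: bool) -> list:
    """Hypothetical helper mirroring the WFIGS subject shapes listed above."""
    subs = [f"central.fire.incident.{state}.>",
            f"central.fire.perimeter.{state}.>"]
    if include_tombstones:
        # 5-token literals with "removed" at depth-3, no wildcards.
        subs += [f"central.fire.incident.removed.{state}",
                 f"central.fire.perimeter.removed.{state}"]
    return subs


def delivered(subs: list, subject: str) -> bool:
    return any(subject_matches(p, subject) for p in subs)


tombstone = "central.fire.incident.removed.id"
print(delivered(wfigs_subjects("id", include_tombstones=False), tombstone))  # False
print(delivered(wfigs_subjects("id", include_tombstones=True), tombstone))   # True
```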

FIX 3 -- WFIGS tombstone dedup. Per guide §wfigs_incidents removal semantics, the tombstone env_id has the shape `<IrwinID>:removed:<iso_now>` -- the `:removed:` is sandwiched in the middle, with a timestamp tail. Pre-v0.5.7-fire the consumer.py group_key recovery was `re.sub(r":removed$", "", group_key)` -- a literal trailing `:removed` match -- which DID NOT FIRE on the WFIGS form (the regex required `:removed` at the very end of the string, but the WFIGS form has `:<iso>` after it).

Consequence: WFIGS tombstones' group_key was the full `<IrwinID>:removed:<iso>` string instead of the bare `<IrwinID>`. The pipeline grouper/inhibitor never matched tombstones to their original incidents, so the lapse signal was lost.

Fixed by switching the regex to `re.sub(r":removed(:.*)?$", "", group_key)` -- handles both the WFIGS `<IrwinID>:removed:<iso>` form AND the legacy GDACS `<id>:removed` form. The `is_tombstone` detection also gained an explicit `":removed:" in env_id` check for the WFIGS shape.
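The following compares the old and new group_key recovery. The IDs are made up. `is_tombstone` here is an assumed approximation (legacy trailing `:removed` OR the new `:removed:` infix), not consumer.py's exact code.

```python
import re

OLD_STRIP = re.compile(r":removed$")        # pre-v0.5.7-fire
NEW_STRIP = re.compile(r":removed(:.*)?$")  # v0.5.7-fire


def recover_group_key(env_id: str, strip: re.Pattern = NEW_STRIP) -> str:
    """Strip the tombstone marker (and any :<iso> tail) to get the bare ID."""
    return strip.sub("", env_id)


def is_tombstone(env_id: str) -> bool:
    # Assumed shape of the check: legacy suffix OR the WFIGS infix.
    return env_id.endswith(":removed") or ":removed:" in env_id


# Made-up IDs for illustration.
wfigs = "8f1c2e4a-7b3d-4e21-9a0f-3c5d6e7f8a9b:removed:2026-06-04T06:25:42+00:00"
gdacs = "WF-1001234:removed"

print(recover_group_key(wfigs, OLD_STRIP) == wfigs)  # True: old regex never fires
print(recover_group_key(wfigs))                      # 8f1c2e4a-7b3d-4e21-9a0f-3c5d6e7f8a9b
print(recover_group_key(gdacs))                      # WF-1001234
```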

Per the guide: "the same incident can have one or more removal tombstones over its lifecycle" (it can re-enter and re-fall-off). To preserve per-tombstone distinctness for downstream lifecycle accounting, the full env_id is stashed on `Event.data["_central_tombstone_id"]` (the group_key collapses to the IrwinID by design, but the original env_id with the :<iso> tail survives on data).
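This sketch shows two tombstones for the same incident collapsing to one group_key while keeping distinct IDs on data. `Event` here is a hypothetical stand-in dataclass, not meshai's real event type.

```python
import re
from dataclasses import dataclass, field

_STRIP = re.compile(r":removed(:.*)?$")


@dataclass
class Event:
    """Hypothetical stand-in for meshai's Event; only the fields used here."""
    group_key: str
    data: dict = field(default_factory=dict)


def event_from_tombstone(env_id: str) -> Event:
    ev = Event(group_key=_STRIP.sub("", env_id))  # collapses to the IrwinID
    ev.data["_central_tombstone_id"] = env_id     # full env_id, :<iso> tail kept
    return ev


first = event_from_tombstone("IRWIN-A:removed:2026-06-01T00:00:00+00:00")
again = event_from_tombstone("IRWIN-A:removed:2026-06-03T12:00:00+00:00")
print(first.group_key == again.group_key)  # True: same incident
print(first.data["_central_tombstone_id"] != again.data["_central_tombstone_id"])  # True
```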

FIX 4 -- ALERT_CATEGORIES fire-family audit + removed parametric entries. Per Matt's direct feedback ("fire near mesh has its own set of parameters that I don't even know what they could be. like how far is near mesh? I don't know I can't set that."), the parametric `fire_proximity` and the duplicate-named `wildfire_proximity` (both labeled "Fire Near Mesh" with parametric radius-based descriptions) were unusable in the new Advanced Rules UI. Their descriptions imply a proximity radius, but the UI offers no way to set one. Removed both.

Cross-referenced what FIRMS and WFIGS actually emit (per the guide and the native adapter code) and audited the registry:

    Native emit:
      firms.py  -> new_ignition (when adapter flags new_ignition)
                or wildfire_hotspot (otherwise)  [v0.5.7-fire: was wildfire_proximity]
      fires.py  -> wildfire_incident
    Central path emit (via map_category):
      fire.hotspot.*    -> wildfire_hotspot
      fire.incident.*   -> wildfire_incident
      fire.perimeter.*  -> wildfire_incident (perimeters merge to the incident)
      fire.<other>      -> wildfire_incident (catchall)
    Registry after v0.5.7-fire:
      {new_ignition, wildfire_hotspot, wildfire_incident}
    Parity confirmed. No orphans, no missing.
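The parity audit above can be expressed as two set differences. "Orphans" are registry entries that nothing emits, and "missing" are emitted categories with no registry entry. This is a sketch: `map_fire_category` re-creates the central mapping table by hand (keyed on the token after `fire`), and the sample subjects are hypothetical.

```python
def map_fire_category(subject: str) -> str:
    """Hand re-creation of the central mapping table above (subject without 'central.')."""
    tokens = subject.split(".")
    kind = tokens[1] if len(tokens) > 1 else ""
    if kind == "hotspot":
        return "wildfire_hotspot"
    # incident, perimeter, and any other fire.<x> fall through to the catchall
    return "wildfire_incident"


NATIVE_EMITS = {
    "firms.py": {"new_ignition", "wildfire_hotspot"},
    "fires.py": {"wildfire_incident"},
}
REGISTRY = {"new_ignition", "wildfire_hotspot", "wildfire_incident"}
# Hypothetical sample subjects, one per row of the mapping table.
SAMPLE_SUBJECTS = ["fire.hotspot.viirs.high", "fire.incident.id.ada",
                   "fire.perimeter.id.ada", "fire.other"]


def audit(registry: set, native: dict, subjects: list) -> dict:
    emitted = set().union(*native.values())
    emitted |= {map_fire_category(s) for s in subjects}
    return {"orphans": registry - emitted, "missing": emitted - registry}


print(audit(REGISTRY, NATIVE_EMITS, SAMPLE_SUBJECTS))
# {'orphans': set(), 'missing': set()}
```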

Aligning firms.py to emit `wildfire_hotspot` (matching the central FIRMS map) means native + central FIRMS produce identical categories regardless of which feed path is enabled.

Composer (`_CATEGORY_EMOJI`, `_CATEGORY_LABEL`) and router (three source-attribution tables) updated to drop the removed categories and add the new ones.

Deferred to v0.5.8: a distance_max_km field on rules for actual proximity filtering. It will replace the parametric fire_proximity registry entry with a parameterized rule field that the user CAN configure ("alert me about wildfire_incident within 30 km" instead of an opaque "Fire Near Mesh" toggle).
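This is one way the deferred distance_max_km field could work. The sketch rests on several assumptions. Distance is great-circle (haversine) distance from a single configured mesh reference point. A missing limit means "no distance filter". `Rule` and `rule_matches` are hypothetical names, and nothing here reflects the actual v0.5.8 design.

```python
import math
from dataclasses import dataclass
from typing import Optional, Tuple

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius
LatLon = Tuple[float, float]


def haversine_km(a: LatLon, b: LatLon) -> float:
    """Great-circle distance between two (lat, lon) points in km."""
    phi1, phi2 = math.radians(a[0]), math.radians(b[0])
    dphi = phi2 - phi1
    dlmb = math.radians(b[1] - a[1])
    h = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


@dataclass
class Rule:
    """Hypothetical rule shape; distance_max_km=None means no distance filter."""
    category: str
    distance_max_km: Optional[float] = None


def rule_matches(rule: Rule, category: str, event_at: LatLon, mesh_at: LatLon) -> bool:
    if category != rule.category:
        return False
    if rule.distance_max_km is None:
        return True
    return haversine_km(event_at, mesh_at) <= rule.distance_max_km


# "alert me about wildfire_incident within 30 km" of an illustrative mesh point
rule = Rule("wildfire_incident", distance_max_km=30.0)
mesh = (43.615, -116.202)
print(rule_matches(rule, "wildfire_incident", (43.70, -116.30), mesh))  # True (~12 km)
print(rule_matches(rule, "wildfire_incident", (44.00, -116.20), mesh))  # False (~43 km)
```

Under this model, a user-facing rule is just the category plus an optional number. That is the configurability the removed "Fire Near Mesh" entries lacked.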

Tests
-----
PYTHONPATH=. pytest -q: 380 passed (was 366; +14 net).
  - tests/test_fire_v057.py (new): FIRMS subject is tail-only `>` with no mid-subject placement; WFIGS subjects cover both active subjects plus both tombstone subjects; WFIGS tombstone strips `:removed(:.*)?$` for group_key; two same-IrwinID tombstones both propagate through _handle and share group_key, with the original env_id preserved on data["_central_tombstone_id"]; legacy GDACS `:removed` shape still strips cleanly; fire_proximity / wildfire_proximity absent from ALERT_CATEGORIES; no "Fire Near Mesh" name duplicates; fire-family parity (native + central emit == registry); required-fields check on the three fire entries.
  - tests/test_central_region_routing.py: updated FIRMS test (tail-only `>`) and WFIGS test (includes tombstone subjects).
  - tests/test_pipeline_toggle_filter.py, tests/test_adapter_firms.py, tests/test_v052_dispatcher.py, tests/test_pipeline_digest.py: bulk-migrated obsolete category references (wildfire_proximity -> wildfire_hotspot, fire_proximity -> wildfire_incident) so the existing test suites continue to exercise the same routing/digest/dispatch paths with the new category names.

Safe-mode preserved (master off, all family toggles off, all adapters native, central disabled). No live toggle flipped. Not tagging yet -- v0.5.7 tag waits until all families ship.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 06:25:42 +00:00


"""Tests for Phase 2.4 DigestAccumulator with LLM summaries.

Updated from Phase 2.3a to reflect new behavior:
- No active/resolved tracking (just event log)
- LLM-summarized output per toggle
- render_digest is async
"""
import asyncio
import inspect
import time
from unittest.mock import MagicMock, AsyncMock, patch

import pytest

from meshai.notifications.events import make_event
from meshai.notifications.pipeline import (
    build_pipeline_components,
    DigestAccumulator,
    Digest,
)
from meshai.notifications.categories import get_toggle, ALERT_CATEGORIES
from meshai.config import Config


# ============================================================
# MOCK LLM BACKEND
# ============================================================

class MockLLMBackend:
    """Mock LLM backend for testing."""

    def __init__(self, response: str = "Mock summary of events."):
        self.response = response
        self.calls = []

    async def generate(self, messages, system_prompt, max_tokens=200):
        self.calls.append({
            "messages": messages,
            "system_prompt": system_prompt,
            "max_tokens": max_tokens,
        })
        return self.response


class FailingLLMBackend:
    """Mock LLM that raises exceptions."""

    async def generate(self, messages, system_prompt, max_tokens=200):
        raise RuntimeError("LLM unavailable")


def _make_mock_backend():
    """Create a standard mock LLM backend for tests."""
    mock = MagicMock()
    mock.generate = AsyncMock(return_value="stub summary")
    return mock


# ============================================================
# ACCUMULATOR EVENT LOGGING TESTS
# ============================================================

def test_enqueue_logs_event():
    """Enqueue adds event to the log."""
    acc = DigestAccumulator()
    event = make_event(
        source="test",
        category="weather_warning",
        severity="routine",
        title="Wind Advisory",
    )
    acc.enqueue(event)
    assert acc.event_count() == 1


def test_enqueue_multiple_events_same_toggle():
    """Multiple events for same toggle all logged."""
    acc = DigestAccumulator()
    for i in range(3):
        event = make_event(
            source="test",
            category="weather_warning",
            severity="routine",
            id=f"ev{i}",
            title=f"Event {i}",
        )
        acc.enqueue(event)
    assert acc.event_count() == 3
    assert acc.event_count("weather") == 3


def test_enqueue_multiple_toggles():
    """Events across multiple toggles all logged."""
    acc = DigestAccumulator()
    acc.enqueue(make_event(
        source="test",
        category="weather_warning",
        severity="routine",
        title="Weather",
    ))
    acc.enqueue(make_event(
        source="test",
        category="wildfire_hotspot",
        severity="priority",
        title="Fire",
    ))
    acc.enqueue(make_event(
        source="test",
        category="battery_warning",
        severity="immediate",
        title="Mesh",
    ))
    assert acc.event_count() == 3
    assert acc.event_count("weather") == 1
    assert acc.event_count("fire") == 1
    assert acc.event_count("mesh_health") == 1


def test_enqueue_skips_excluded_toggles():
    """Events for non-included toggles are dropped."""
    acc = DigestAccumulator(include_toggles=["weather"])
    acc.enqueue(make_event(
        source="test",
        category="weather_warning",
        severity="routine",
        title="Weather",
    ))
    acc.enqueue(make_event(
        source="test",
        category="wildfire_hotspot",
        severity="routine",
        title="Fire",
    ))
    assert acc.event_count() == 1
    assert acc.event_count("weather") == 1
    assert acc.event_count("fire") == 0


def test_tick_is_noop():
    """tick() does nothing in Phase 2.4+."""
    acc = DigestAccumulator()
    acc.enqueue(make_event(
        source="test",
        category="weather_warning",
        severity="routine",
        title="Event",
    ))
    result = acc.tick()
    assert result == 0
    assert acc.event_count() == 1


# ============================================================
# RENDER DIGEST TESTS
# ============================================================

def test_render_digest_is_async():
    """render_digest is an async coroutine function."""
    assert inspect.iscoroutinefunction(DigestAccumulator.render_digest)


def test_render_digest_clears_event_log():
    """render_digest clears the event log after rendering."""
    mock_llm = MockLLMBackend()
    acc = DigestAccumulator(llm_backend=mock_llm)
    acc.enqueue(make_event(
        source="test",
        category="weather_warning",
        severity="routine",
        title="Event",
    ))
    assert acc.event_count() == 1
    asyncio.run(acc.render_digest())
    assert acc.event_count() == 0


def test_render_digest_sets_last_digest_at():
    """render_digest updates last_digest_at timestamp."""
    mock_llm = MockLLMBackend()
    acc = DigestAccumulator(llm_backend=mock_llm)
    acc.enqueue(make_event(
        source="test",
        category="weather_warning",
        severity="routine",
        title="Event",
    ))
    now = 1234567890.0
    asyncio.run(acc.render_digest(now=now))
    assert acc.last_digest_at() == now


def test_render_digest_empty_shows_no_alerts():
    """Empty accumulator produces 'No alerts' message."""
    acc = DigestAccumulator()
    digest = asyncio.run(acc.render_digest())
    assert "No alerts since last digest" in digest.full
    assert "No alerts since last digest" in digest.mesh_chunks[0]


# ============================================================
# LLM INTEGRATION TESTS
# ============================================================

def test_digest_calls_llm_once_per_non_empty_toggle():
    """LLM is called once per toggle that has events."""
    mock_llm = MockLLMBackend(response="Summary for toggle.")
    acc = DigestAccumulator(llm_backend=mock_llm)
    # Add events to 3 different toggles
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="routine", title="Weather"))
    acc.enqueue(make_event(source="test", category="wildfire_hotspot",
                           severity="routine", title="Fire"))
    acc.enqueue(make_event(source="test", category="battery_warning",
                           severity="routine", title="Mesh"))
    asyncio.run(acc.render_digest())
    assert len(mock_llm.calls) == 3


def test_digest_line_uses_llm_output():
    """Digest lines contain the LLM's summary output."""
    mock_llm = MockLLMBackend(response="Severe storms moving through the area.")
    acc = DigestAccumulator(llm_backend=mock_llm)
    acc.enqueue(make_event(
        source="test",
        category="weather_warning",
        severity="priority",
        title="Storm Warning",
    ))
    digest = asyncio.run(acc.render_digest())
    assert "[Weather] Severe storms moving through the area." in digest.full
    assert "Severe storms moving through the area" in digest.mesh_compact


def test_digest_falls_back_to_count_when_llm_raises():
    """When the LLM fails, fall back to a count-based summary."""
    failing_llm = FailingLLMBackend()
    acc = DigestAccumulator(llm_backend=failing_llm)
    acc.enqueue(make_event(source="test", category="battery_warning",
                           severity="routine", title="Event 1"))
    acc.enqueue(make_event(source="test", category="battery_warning",
                           severity="routine", title="Event 2"))
    acc.enqueue(make_event(source="test", category="battery_warning",
                           severity="routine", title="Event 3"))
    digest = asyncio.run(acc.render_digest())
    assert "[Mesh]" in digest.full
    assert "3 event(s)" in digest.full
    assert "LLM unavailable" in digest.full


def test_digest_falls_back_when_no_llm():
    """When there is no LLM backend, fall back to a count-based summary."""
    acc = DigestAccumulator(llm_backend=None)
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="routine", title="Event"))
    digest = asyncio.run(acc.render_digest())
    assert "[Weather]" in digest.full
    assert "1 event(s)" in digest.full


def test_digest_input_orders_by_severity_then_time():
    """LLM input lists events by severity (immediate first) then timestamp."""
    mock_llm = MockLLMBackend()
    acc = DigestAccumulator(llm_backend=mock_llm)
    # Enqueue in wrong order: routine, then immediate, then priority
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="routine", title="Routine Event",
                           timestamp=10.0))
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="immediate", title="Immediate Event",
                           timestamp=20.0))
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="priority", title="Priority Event",
                           timestamp=30.0))
    asyncio.run(acc.render_digest())
    # Check the LLM input
    assert len(mock_llm.calls) == 1
    user_content = mock_llm.calls[0]["messages"][0]["content"]
    # Find positions of each event in the input
    immediate_pos = user_content.find("IMMEDIATE")
    priority_pos = user_content.find("PRIORITY")
    routine_pos = user_content.find("ROUTINE")
    assert immediate_pos < priority_pos, "Immediate should appear before priority"
    assert priority_pos < routine_pos, "Priority should appear before routine"


# ============================================================
# MESH CHUNKS TESTS
# ============================================================

def test_mesh_chunks_single_chunk_when_short():
    """Single short summary produces one chunk without counter."""
    mock_llm = MockLLMBackend(response="Brief summary.")
    acc = DigestAccumulator(llm_backend=mock_llm)
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="routine", title="Event"))
    digest = asyncio.run(acc.render_digest())
    assert len(digest.mesh_chunks) == 1
    assert digest.mesh_chunks[0].startswith("DIGEST ")
    assert "(1/" not in digest.mesh_chunks[0]


def test_mesh_chunks_under_char_limit():
    """Each mesh chunk stays near the 200-character limit.

    The assertion allows 10 characters of slack (<= 210).
    """
    mock_llm = MockLLMBackend(response="Summary of events for this category.")
    acc = DigestAccumulator(llm_backend=mock_llm)
    # Add events to multiple toggles
    for cat in ["weather_warning", "wildfire_hotspot", "battery_warning",
                "road_closure", "avalanche_warning"]:
        acc.enqueue(make_event(source="test", category=cat,
                               severity="routine", title="Event"))
    digest = asyncio.run(acc.render_digest())
    for chunk in digest.mesh_chunks:
        assert len(chunk) <= 210, f"Chunk exceeds limit: {len(chunk)} chars"


def test_mesh_chunks_splits_when_many_toggles():
    """Many toggle summaries split into multiple chunks."""
    # Longer summaries to force splitting
    mock_llm = MockLLMBackend(
        response="A fairly detailed summary of the events in this category."
    )
    acc = DigestAccumulator(llm_backend=mock_llm, mesh_char_limit=150)
    # Add events to multiple toggles
    for cat in ["weather_warning", "wildfire_hotspot", "battery_warning",
                "road_closure", "avalanche_warning"]:
        acc.enqueue(make_event(source="test", category=cat,
                               severity="routine", title="Event"))
    digest = asyncio.run(acc.render_digest())
    assert len(digest.mesh_chunks) >= 2
    # Check chunk counters
    total = len(digest.mesh_chunks)
    for i, chunk in enumerate(digest.mesh_chunks):
        assert f"({i+1}/{total})" in chunk


def test_mesh_chunks_empty_is_single_chunk():
    """Empty digest produces single chunk."""
    acc = DigestAccumulator()
    digest = asyncio.run(acc.render_digest())
    assert len(digest.mesh_chunks) == 1
    assert "No alerts since last digest" in digest.mesh_chunks[0]
    assert "(1/" not in digest.mesh_chunks[0]


def test_mesh_compact_joins_chunks():
    """mesh_compact joins chunks with separator when multiple."""
    mock_llm = MockLLMBackend(response="Summary of events.")
    acc = DigestAccumulator(llm_backend=mock_llm, mesh_char_limit=100)
    for cat in ["weather_warning", "wildfire_hotspot", "battery_warning",
                "road_closure"]:
        acc.enqueue(make_event(source="test", category=cat,
                               severity="routine", title="Event"))
    digest = asyncio.run(acc.render_digest())
    if len(digest.mesh_chunks) > 1:
        expected = "\n---\n".join(digest.mesh_chunks)
        assert digest.mesh_compact == expected
    else:
        assert digest.mesh_compact == digest.mesh_chunks[0]


# ============================================================
# INCLUDE TOGGLES TESTS
# ============================================================

def test_rf_propagation_excluded_by_default():
    """rf_propagation toggle is excluded by default."""
    acc = DigestAccumulator()
    # Find an rf_propagation category
    rf_category = None
    for cat_id, cat_info in ALERT_CATEGORIES.items():
        if cat_info.get("toggle") == "rf_propagation":
            rf_category = cat_id
            break
    if rf_category:
        acc.enqueue(make_event(source="test", category=rf_category,
                               severity="routine", title="RF Event"))
    assert acc.event_count() == 0


def test_include_toggles_overrides_default():
    """include_toggles parameter controls which toggles are tracked."""
    mock_llm = MockLLMBackend()
    # Find an rf_propagation category
    rf_category = None
    for cat_id, cat_info in ALERT_CATEGORIES.items():
        if cat_info.get("toggle") == "rf_propagation":
            rf_category = cat_id
            break
    acc = DigestAccumulator(
        llm_backend=mock_llm,
        include_toggles=["rf_propagation", "weather"]
    )
    if rf_category:
        acc.enqueue(make_event(source="test", category=rf_category,
                               severity="routine", title="RF Event"))
    acc.enqueue(make_event(source="test", category="wildfire_hotspot",
                           severity="routine", title="Fire Event"))
    # RF should be kept (in include list), fire should be dropped
    expected_count = 1 if rf_category else 0
    assert acc.event_count() == expected_count


def test_include_toggles_unknown_name_accepted():
    """Unknown toggle names don't crash."""
    acc = DigestAccumulator(include_toggles=["weather", "future_toggle"])
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="routine", title="Event"))
    assert acc.event_count() == 1


# ============================================================
# TOGGLE ORDER TESTS
# ============================================================

def test_digest_orders_toggles_correctly():
    """Toggle lines appear in TOGGLE_ORDER sequence."""
    mock_llm = MockLLMBackend(response="Summary.")
    acc = DigestAccumulator(llm_backend=mock_llm)
    # Add events in wrong order
    acc.enqueue(make_event(source="test", category="battery_warning",
                           severity="routine", title="Mesh"))
    acc.enqueue(make_event(source="test", category="wildfire_hotspot",
                           severity="routine", title="Fire"))
    acc.enqueue(make_event(source="test", category="weather_warning",
                           severity="routine", title="Weather"))
    digest = asyncio.run(acc.render_digest())
    # Check order in full output: weather, fire, ..., mesh_health
    weather_pos = digest.full.find("[Weather]")
    fire_pos = digest.full.find("[Fire]")
    mesh_pos = digest.full.find("[Mesh]")
    assert weather_pos < fire_pos, "Weather should appear before Fire"
    assert fire_pos < mesh_pos, "Fire should appear before Mesh"


# ============================================================
# PIPELINE INTEGRATION TESTS
# ============================================================

def test_pipeline_routes_event_to_accumulator():
    """Events via bus.emit end up in DigestAccumulator."""
    config = Config()
    bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = \
        build_pipeline_components(config, _make_mock_backend())
    event = make_event(
        source="test",
        category="weather_warning",
        severity="routine",
        title="Test event",
    )
    # Clear any pending groups, then emit and flush the event through the grouper
    grouper.flush_all()
    bus.emit(event)
    grouper.flush_all()
    assert accumulator.event_count() == 1


def test_pipeline_routes_immediate_to_both():
    """Immediate events go to both dispatcher and accumulator in Phase 2.4."""
    config = Config()
    bus, inhibitor, grouper, toggle_filter, dispatcher, accumulator = \
        build_pipeline_components(config, _make_mock_backend())
    event = make_event(
        source="test",
        category="weather_warning",
        severity="immediate",
        title="Immediate event",
    )
    grouper.flush_all()
    bus.emit(event)
    grouper.flush_all()
    # In Phase 2.4, all events go to accumulator
    assert accumulator.event_count() == 1