meshai/tests/test_v052_dispatcher.py
Matt Johnson 60e8e62e85 fix(fire): v0.5.7-fire -- FIRMS NATS pattern + WFIGS tombstone dedup + remove fire_proximity + categories audit
Third family of the v0.5.7 NATS-and-categories campaign. Fire is the heaviest of the campaign -- four distinct fixes plus a category audit. Two of the four were broken in production: FIRMS subscribed to a syntactically invalid pattern, and WFIGS tombstones were silently dropped.

FIX 1 -- FIRMS NATS pattern (the canonical bug). Pre-v0.5.7-fire `_subjects_for("firms","us.id")` returned `["central.fire.hotspot.>.us.id"]`, which is INVALID NATS (the `>` multi-level wildcard is only legal at the tail token). It also wouldn't have matched anything Central publishes: per the Central v0.10.0 consumer integration guide §firms, the actual published pattern is `central.fire.hotspot.<satellite>.<confidence>` (5 tokens, no us.<state> suffix). The two slots after "hotspot" are satellite name and confidence band -- NOT tile coordinates or region tokens.

Note on prompt vs. guide discrepancy: the v0.5.7-fire task spec described a tile-coord/state pattern `central.fire.hotspot.*.*.us.id` (7 tokens with us.<state> tail). That's neither what Central v0.10.0 publishes nor what its guide documents. We follow the guide. Subscribing to the prompt's 7-token pattern would silently match zero messages in production (token-count mismatch). State filtering for FIRMS happens client-side via data.latitude / data.longitude against the configured region bbox.
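
The client-side state filter described above can be sketched as a plain bounding-box test. This is a minimal illustration, not meshai's code: the `in_region` helper, the `(min_lat, min_lon, max_lat, max_lon)` bbox layout, and the rough Idaho bounds are all assumptions; only the `latitude` / `longitude` field names come from the text.

```python
from typing import Mapping, Optional, Tuple

# Assumed bbox layout: (min_lat, min_lon, max_lat, max_lon).
BBox = Tuple[float, float, float, float]

# Rough illustrative bounds for Idaho; not taken from meshai's config.
IDAHO_BBOX: BBox = (41.9, -117.3, 49.1, -111.0)


def in_region(data: Mapping[str, Optional[float]], bbox: BBox) -> bool:
    """Return True when the hotspot's lat/lon falls inside the bbox."""
    lat = data.get("latitude")
    lon = data.get("longitude")
    if lat is None or lon is None:
        return False  # no coordinates: cannot attribute the hotspot to the region
    min_lat, min_lon, max_lat, max_lon = bbox
    return min_lat <= lat <= max_lat and min_lon <= lon <= max_lon
```

Because the subject carries no state token, every hotspot arrives and the bbox decides whether to keep it.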

New subscription: `central.fire.hotspot.>` -- tail-only `>`, NATS-legal, matches all <satellite>.<confidence> combinations.
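
To illustrate why the old pattern was rejected, here is a simplified validity check for subscription subjects. It is a sketch of the tail-only `>` rule, not the NATS server's validator and not meshai's code; `is_valid_subscription` is a hypothetical helper, and `sat` / `conf` are placeholder tokens standing in for real satellite and confidence values.

```python
def is_valid_subscription(subject: str) -> bool:
    """Simplified check: non-empty tokens, and `>` only as the last token."""
    tokens = subject.split(".")
    for index, token in enumerate(tokens):
        if not token or any(ch.isspace() for ch in token):
            return False  # empty token or embedded whitespace
        if token == ">" and index != len(tokens) - 1:
            return False  # multi-level wildcard must be the tail token
    return True


OLD_FIRMS = "central.fire.hotspot.>.us.id"    # pre-v0.5.7-fire pattern
NEW_FIRMS = "central.fire.hotspot.>"          # v0.5.7-fire pattern
SPEC_FIRMS = "central.fire.hotspot.*.*.us.id"  # pattern from the task spec

# A published subject has 5 tokens; placeholder satellite/confidence tokens.
PUBLISHED = "central.fire.hotspot.sat.conf"
```

Note that the task-spec pattern passes this check: it is legal NATS, but its 7 tokens can never line up with a 5-token published subject.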

FIX 2 -- WFIGS tombstone subjects. Per guide §wfigs_incidents and §wfigs_perimeters, WFIGS publishes:

    active:    central.fire.incident.<state>.<county>     (Convention A, depth-3 state)
    active:    central.fire.perimeter.<state>.<county>
    tombstone: central.fire.incident.removed.<state>     (5 tokens, "removed" at depth-3)
    tombstone: central.fire.perimeter.removed.<state>

Pre-v0.5.7-fire `_subjects_for("fires","us.id")` subscribed only to the active subjects (`central.fire.incident.id.>` and `central.fire.perimeter.id.>`). The tombstone subjects have "removed" at depth-3 instead of the state token, so the active-subject `>` filters silently dropped EVERY tombstone. Fall-off signals never reached meshai's inhibitor, so old incidents stayed "live" in the pipeline indefinitely.

Added the two tombstone subjects to the subscription list. Both are 5-token literals with no wildcards -- trivially NATS-legal.
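
The drop can be reproduced with a simplified subject matcher. This is a sketch under stated assumptions: `subject_matches` and `delivered` are hypothetical helpers that approximate NATS token matching, and the county token `ada` is only an example value.

```python
from typing import List


def subject_matches(pattern: str, subject: str) -> bool:
    """Simplified NATS matching: `*` is one token, tail `>` is one or more."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # Tail wildcard needs at least one remaining subject token.
            return len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def delivered(subscriptions: List[str], subject: str) -> bool:
    """True when at least one subscription matches the published subject."""
    return any(subject_matches(sub, subject) for sub in subscriptions)


OLD_SUBJECTS = ["central.fire.incident.id.>", "central.fire.perimeter.id.>"]
NEW_SUBJECTS = OLD_SUBJECTS + [
    "central.fire.incident.removed.id",
    "central.fire.perimeter.removed.id",
]

ACTIVE = "central.fire.incident.id.ada"         # example county token
TOMBSTONE = "central.fire.incident.removed.id"  # "removed" sits where the state was
```

With `OLD_SUBJECTS` the active subject is delivered and the tombstone is not, because the fourth token is `removed` rather than `id`.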

FIX 3 -- WFIGS tombstone dedup. Per guide §wfigs_incidents removal semantics, the tombstone env_id has the shape `<IrwinID>:removed:<iso_now>` -- the `:removed:` is sandwiched in the middle, with a timestamp tail. Pre-v0.5.7-fire the consumer.py group_key recovery was `re.sub(r":removed$", "", group_key)` -- a literal trailing `:removed` match -- which DID NOT FIRE on the WFIGS form (the regex required `:removed` at the very end of the string, but the WFIGS form has `:<iso>` after it).

Consequence: WFIGS tombstones' group_key was the full `<IrwinID>:removed:<iso>` string instead of the bare `<IrwinID>`. The pipeline grouper/inhibitor never matched tombstones to their original incidents, so the lapse signal was lost.

Fixed by switching the regex to `re.sub(r":removed(:.*)?$", "", group_key)` -- handles both the WFIGS `<IrwinID>:removed:<iso>` form AND the legacy GDACS `<id>:removed` form. The `is_tombstone` detection also gained an explicit `":removed:" in env_id` check for the WFIGS shape.

Per the guide: "the same incident can have one or more removal tombstones over its lifecycle" (it can re-enter and re-fall-off). To preserve per-tombstone distinctness for downstream lifecycle accounting, the full env_id is stashed on `Event.data["_central_tombstone_id"]` (the group_key collapses to the IrwinID by design, but the original env_id with the :<iso> tail survives on data).
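
The two regexes can be compared directly. The sketch below is not the consumer.py code: `recover_group_key` is a hypothetical helper, the `IRWIN-123` and `gdacs-42` ids are made-up placeholders, and the trailing-`:removed` half of the tombstone check is an assumption about the pre-existing detection.

```python
import re
from typing import Dict, Tuple

OLD_PATTERN = re.compile(r":removed$")         # pre-v0.5.7-fire
NEW_PATTERN = re.compile(r":removed(:.*)?$")   # v0.5.7-fire


def recover_group_key(env_id: str) -> Tuple[str, bool, Dict[str, str]]:
    """Return (group_key, is_tombstone, data) for an envelope id."""
    # Assumed detection: legacy trailing form OR the WFIGS sandwiched form.
    is_tombstone = env_id.endswith(":removed") or ":removed:" in env_id
    group_key = NEW_PATTERN.sub("", env_id)
    data: Dict[str, str] = {}
    if is_tombstone:
        # Keep the full id so repeated tombstones stay distinguishable.
        data["_central_tombstone_id"] = env_id
    return group_key, is_tombstone, data


WFIGS_FIRST = "IRWIN-123:removed:2026-06-01T00:00:00Z"   # placeholder id
WFIGS_SECOND = "IRWIN-123:removed:2026-06-03T00:00:00Z"  # same incident, later
GDACS_LEGACY = "gdacs-42:removed"                         # placeholder id
```

Both WFIGS tombstones collapse to the same group_key while their `_central_tombstone_id` values differ, which is the per-tombstone distinctness described above.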

FIX 4 -- ALERT_CATEGORIES fire-family audit + removed parametric entries. Per Matt's direct feedback ("fire near mesh has its own set of parameters that I don't even know what they could be. like how far is near mesh? I don't know I can't set that."), the parametric `fire_proximity` and the duplicate-named `wildfire_proximity` (both labeled "Fire Near Mesh" with parametric radius-based descriptions) were unselectable in the new Advanced Rules UI. Removed both.

Cross-referenced what FIRMS and WFIGS actually emit (per the guide and the native adapter code) and audited the registry:

    Native emit:
      firms.py  -> new_ignition (when adapter flags new_ignition)
                or wildfire_hotspot (otherwise)  [v0.5.7-fire: was wildfire_proximity]
      fires.py  -> wildfire_incident
    Central path emit (via map_category):
      fire.hotspot.*    -> wildfire_hotspot
      fire.incident.*   -> wildfire_incident
      fire.perimeter.*  -> wildfire_incident (perimeters merge to the incident)
      fire.<other>      -> wildfire_incident (catchall)
    Registry after v0.5.7-fire:
      {new_ignition, wildfire_hotspot, wildfire_incident}
    Parity confirmed. No orphans, no missing.

Aligning firms.py to emit `wildfire_hotspot` (matching the central FIRMS map) means native + central FIRMS produce identical categories regardless of which feed path is enabled.
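
The parity claim can be expressed as a set comparison. This is an illustrative sketch only: `sketch_map_category` is a hypothetical stand-in that mirrors the mapping table above, not the real `map_category`, and the sets are copied from this message rather than read from the registry.

```python
from typing import Dict, Set

# Categories each native adapter can emit, per the audit table.
NATIVE_EMIT: Dict[str, Set[str]] = {
    "firms": {"new_ignition", "wildfire_hotspot"},
    "fires": {"wildfire_incident"},
}

# Fire-family registry after v0.5.7-fire, per the audit table.
REGISTRY: Set[str] = {"new_ignition", "wildfire_hotspot", "wildfire_incident"}


def sketch_map_category(subject_tail: str) -> str:
    """Map a `fire.<kind>...` subject tail to a category (stand-in only)."""
    tokens = subject_tail.split(".")
    if tokens[0] != "fire":
        raise ValueError(f"not a fire-family subject: {subject_tail!r}")
    if len(tokens) > 1 and tokens[1] == "hotspot":
        return "wildfire_hotspot"
    # incident, perimeter, and any other fire.<kind> fall through here.
    return "wildfire_incident"


central_emit = {
    sketch_map_category(tail)
    for tail in ("fire.hotspot.x", "fire.incident.x", "fire.perimeter.x", "fire.other")
}
native_emit = set().union(*NATIVE_EMIT.values())
```

Parity holds when `native_emit | central_emit` equals `REGISTRY`, with no orphans on either side.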

Composer (`_CATEGORY_EMOJI`, `_CATEGORY_LABEL`) and router (three source-attribution tables) updated to drop the removed categories and add the new ones.

Deferred to v0.5.8: distance_max_km field on rules for actual proximity filtering. Replaces the parametric fire_proximity registry entry with a parameterized rule field that the user CAN configure ("alert me about wildfire_incident within 30 km" instead of an opaque "Fire Near Mesh" toggle).

Tests
-----
PYTHONPATH=. pytest -q: 380 passed (was 366; +14 net).
  - tests/test_fire_v057.py (new): FIRMS subject is tail-only `>` with no mid-subject placement; WFIGS subjects cover active + both tombstone subjects; WFIGS tombstone strips `:removed(:.*)?$` for group_key; two same-IrwinID tombstones both propagate through _handle and share group_key, with the original env_id preserved on data["_central_tombstone_id"]; legacy GDACS `:removed` shape still strips cleanly; fire_proximity / wildfire_proximity absent from ALERT_CATEGORIES; no "Fire Near Mesh" name duplicates; fire-family parity (native + central emit == registry); required-fields check on the three fire entries.
  - tests/test_central_region_routing.py: updated FIRMS test (tail-only `>`) and WFIGS test (includes tombstone subjects).
  - tests/test_pipeline_toggle_filter.py, tests/test_adapter_firms.py, tests/test_v052_dispatcher.py, tests/test_pipeline_digest.py: bulk-migrated obsolete category references (wildfire_proximity -> wildfire_hotspot, fire_proximity -> wildfire_incident) so the existing test suites continue to exercise the same routing/digest/dispatch paths with the new category names.

Safe-mode preserved (master off, all family toggles off, all adapters native, central disabled). No live toggle flipped. Not tagging yet -- v0.5.7 tag waits until all families ship.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 06:25:42 +00:00

"""v0.5.2 — staleness filter, cooldown, dedup, friendly renderer, hydro family.
Spec: docs/v0.5.2-spec-cooldown-and-staleness.md (Sections 1-5).
Eight tests per spec Verification §C plus a couple of guards on
counter increments / stats exposure. We intentionally exercise both the
unit (`compose_mesh_message`) and the integration (dispatcher hands the
composed string into the channel payload).
"""
import asyncio
import time

import pytest

from meshai.config import Config, NotificationRuleConfig
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.events import make_event
from meshai.notifications.renderers.composer import (
    compose_mesh_message,
    _BYTE_BUDGET,
)

# ---------------------------------------------------------------- helpers
class RecChannel:
    """Channel recorder that captures rule + full payload (including .message)."""

    def __init__(self, rec):
        self.rec = rec

    async def deliver(self, payload, rule):
        self.rec.append({
            "delivery_type": rule.delivery_type,
            "name": rule.name,
            "message": payload.message,
            "category": payload.category,
            "severity": payload.severity,
        })
        return True


def _make_dispatcher(cfg):
    rec: list = []
    d = Dispatcher(cfg, lambda rule, conn: RecChannel(rec), connector=None)
    return d, rec


def _dispatch_one(cfg, event):
    d, rec = _make_dispatcher(cfg)
    asyncio.run(d.dispatch(event))
    return d, rec


def _cfg(toggle_name="weather", **kw):
    """Default config: one toggle enabled with mesh_broadcast on priority."""
    cfg = Config()
    cfg.notifications.rules = []
    t = cfg.notifications.toggles[toggle_name]
    t.enabled = True
    t.min_severity = kw.get("min_severity", "routine")
    t.regions = kw.get("regions", [])
    t.severity_channels = kw.get("severity_channels", {
        "routine": ["mesh_broadcast"],
        "priority": ["mesh_broadcast"],
        "immediate": ["mesh_broadcast"],
    })
    # v0.5.2 fields -- tests override per-case as needed
    t.freshness_seconds = kw.get("freshness_seconds", 600)
    t.cooldown_seconds = kw.get("cooldown_seconds", 300)
    return cfg


def _ev(severity="priority", category="weather_warning",
        timestamp=None, region=None, source="nws", title="t", **kw):
    """Build an Event. timestamp=None means "now" via make_event auto-set."""
    extra = dict(kw)
    if timestamp is not None:
        extra["timestamp"] = timestamp
    return make_event(
        source=source, category=category, severity=severity,
        region=region, title=title, **extra,
    )


# ============================================================== Section 1
# Staleness filter
def test_staleness_drops_old_central_event():
    """Spec §1: event with timestamp = now - 7200s must be dropped at entrance."""
    cfg = _cfg(freshness_seconds=600)
    stale = _ev(timestamp=time.time() - 7200)
    d, rec = _dispatch_one(cfg, stale)
    assert rec == [], "stale event must not be dispatched"
    assert d.dispatch_stats()["stale_dropped"] == 1


def test_staleness_passes_fresh_event():
    """Spec §1: a fresh event (now) flows through normally."""
    cfg = _cfg(freshness_seconds=600)
    fresh = _ev(timestamp=time.time())  # 0s old
    d, rec = _dispatch_one(cfg, fresh)
    assert len(rec) == 1
    assert d.dispatch_stats()["stale_dropped"] == 0


def test_staleness_applies_to_immediate_severity():
    """Spec §1 note: stale-immediate also drops -- recipient saw it elsewhere."""
    cfg = _cfg(freshness_seconds=600)
    stale_imm = _ev(severity="immediate", timestamp=time.time() - 3600)
    d, rec = _dispatch_one(cfg, stale_imm)
    assert rec == []
    assert d.dispatch_stats()["stale_dropped"] == 1


# ============================================================== Section 2
# Per-toggle cooldown
def test_cooldown_throttles_same_category_region():
    """Spec §2: two events with same (toggle, category, region) within window
    → only the first fires; second is silently throttled."""
    cfg = _cfg(cooldown_seconds=300)
    d, rec = _make_dispatcher(cfg)
    e1 = _ev(region="Magic Valley")
    e2 = _ev(region="Magic Valley")
    asyncio.run(d.dispatch(e1))
    asyncio.run(d.dispatch(e2))
    assert len(rec) == 1, "second event in cooldown window must be dropped"
    assert d.dispatch_stats()["cooldown_dropped"] == 1


def test_cooldown_releases_after_window():
    """Spec §2: cooldown_seconds=0 disables throttling → both fire."""
    cfg = _cfg(cooldown_seconds=0)
    d, rec = _make_dispatcher(cfg)
    # Different event IDs (so dedup doesn't catch us) -- vary group_key.
    asyncio.run(d.dispatch(_ev(group_key="a")))
    asyncio.run(d.dispatch(_ev(group_key="b")))
    assert len(rec) == 2, "cooldown_seconds=0 must allow both"


def test_cooldown_different_region_not_throttled():
    """Spec §2: cooldown is keyed on region -- different regions don't share."""
    cfg = _cfg(cooldown_seconds=300)
    d, rec = _make_dispatcher(cfg)
    asyncio.run(d.dispatch(_ev(region="Magic Valley", group_key="mv")))
    asyncio.run(d.dispatch(_ev(region="Wood River", group_key="wr")))
    assert len(rec) == 2
    assert d.dispatch_stats()["cooldown_dropped"] == 0


# ============================================================== Section 3
# (source, event.id) dedup
def test_dedup_catches_identical_source_event_id():
    """Spec §3: same (source, id) on consecutive deliveries -- second dropped.
    Uses two events constructed with the same identity (no group_key)."""
    cfg = _cfg(cooldown_seconds=0)  # disable cooldown so only dedup can drop
    d, rec = _make_dispatcher(cfg)
    e1 = _ev()
    e2 = _ev()
    # make_event auto-computes the same id for identical source+category+geo
    assert e1.id == e2.id, "preflight: ids must match for this test"
    asyncio.run(d.dispatch(e1))
    asyncio.run(d.dispatch(e2))
    assert len(rec) == 1
    assert d.dispatch_stats()["dedup_dropped"] == 1


def test_dedup_lru_eviction_under_load():
    """Spec §3: bounded LRU at 10k entries -- distinct ids don't crash, and
    after 10k+ entries the size stabilizes. We assert just the cap behavior
    using a private constant so we don't churn 10k events in the test."""
    from meshai.notifications.pipeline import dispatcher as disp_mod

    cfg = _cfg(cooldown_seconds=0, freshness_seconds=0)  # disable both guards
    d, rec = _make_dispatcher(cfg)
    cap = disp_mod._DEDUP_LRU_MAX
    # Fire cap + 5 distinct events; the LRU should hold exactly cap.
    for i in range(cap + 5):
        asyncio.run(d.dispatch(_ev(group_key=f"k{i}")))
    assert d.dispatch_stats()["dedup_lru_size"] == cap


# ============================================================== Section 4
# Friendly renderer
def test_renderer_produces_friendly_string():
    """Spec §4: compose_mesh_message yields a string with severity emoji +
    UPPERCASE label + primary identifier + severity word; ≤150 bytes UTF-8."""
    e = make_event(
        source="nws", category="weather_warning", severity="priority",
        title="Red Flag Warning", region="Twin Falls", timestamp=time.time(),
    )
    s = compose_mesh_message(e)
    assert "" in s and "WX" in s
    assert "Red Flag Warning" in s
    assert "priority" in s
    assert len(s.encode("utf-8")) <= _BYTE_BUDGET


def test_renderer_byte_budget_drops_optional_segments():
    """Spec §4: when over budget, optional segments drop FIRST (context, then
    distance, then quant, then region). Required segments (head + primary +
    severity) always survive."""
    big_title = "A" * 200
    e = make_event(
        source="nws", category="wildfire_incident", severity="immediate",
        title=big_title, region="Wood River Valley",
        timestamp=time.time(),
        data={
            "acres": 1500,
            "containment_pct": 25,
            "cause": "lightning",
            "distance_km": 8,
            "bearing": "W",
            "anchor": "Hailey",
        },
    )
    s = compose_mesh_message(e)
    assert len(s.encode("utf-8")) <= _BYTE_BUDGET
    # Head + severity word still present:
    assert s.startswith("🔥 FIRE:")
    assert "immediate" in s
    # Lowest-priority optional (context) must have been dropped:
    assert "% contained" not in s
    assert "lightning" not in s


def test_renderer_never_mid_character_truncation():
    """The composer must never emit a UTF-8 byte sequence that splits a
    codepoint. Even with required-only over budget, we drop wholesale or
    shrink by codepoints + ellipsis."""
    # All four-byte emoji glyphs in a row, primary forced super long.
    e = make_event(
        source="nws", category="wildfire_hotspot", severity="priority",
        title="🔥" * 200,  # 800 bytes of emoji
        timestamp=time.time(),
    )
    s = compose_mesh_message(e)
    # Must be valid UTF-8 (no UnicodeDecodeError on round-trip).
    s.encode("utf-8").decode("utf-8")
    assert len(s.encode("utf-8")) <= _BYTE_BUDGET


def test_renderer_no_debug_fallback_for_central_prefixed_category():
    """Regression -- the prod incident: central.<category> event with empty
    title/summary must NOT yield `[Family] central.category` debug format."""
    e = make_event(
        source="central", category="central.weather_warning",
        severity="priority",
        title="",  # explicitly empty
        timestamp=time.time(),
    )
    s = compose_mesh_message(e)
    assert "central.weather_warning" not in s
    # Must still carry a meaningful label even though category is unrecognized.
    assert any(c.isupper() for c in s)


def test_renderer_message_lands_in_toggle_payload():
    """Integration: composer output must reach the channel as payload.message."""
    cfg = _cfg(cooldown_seconds=0, freshness_seconds=0)
    e = _ev(title="Red Flag Warning", region="Twin Falls")
    _, rec = _dispatch_one(cfg, e)
    assert len(rec) == 1
    msg = rec[0]["message"]
    assert "Red Flag Warning" in msg
    assert "" in msg  # weather_warning emoji
    assert len(msg.encode("utf-8")) <= _BYTE_BUDGET


# ============================================================== Section 5
# Hydro family routing
def test_hydro_event_maps_to_geohazards_toggle():
    """Spec §5: stream_flood_warning + stream_high_water route to the
    canonical Geohazards toggle (`seismic` in VALID_TOGGLES). Weather
    toggle alone must NOT fire on them anymore."""
    cfg = Config()
    cfg.notifications.rules = []
    # Enable BOTH weather and seismic toggles so we can prove routing.
    cfg.notifications.toggles["weather"].enabled = True
    cfg.notifications.toggles["weather"].min_severity = "routine"
    cfg.notifications.toggles["weather"].severity_channels = {
        "routine": ["mesh_broadcast"], "priority": ["mesh_broadcast"],
    }
    cfg.notifications.toggles["weather"].cooldown_seconds = 0
    cfg.notifications.toggles["seismic"].enabled = True
    cfg.notifications.toggles["seismic"].min_severity = "routine"
    cfg.notifications.toggles["seismic"].severity_channels = {
        "routine": ["mesh_broadcast"], "priority": ["mesh_broadcast"],
    }
    cfg.notifications.toggles["seismic"].cooldown_seconds = 0
    e = make_event(
        source="usgs", category="stream_flood_warning", severity="priority",
        title="Snake River nr Twin Falls 12.8 ft", timestamp=time.time(),
    )
    _, rec = _dispatch_one(cfg, e)
    names = {r["name"] for r in rec}
    assert "toggle:seismic" in names, "hydro must route to seismic family"
    assert "toggle:weather" not in names, "hydro must NOT route to weather"


def test_hydro_high_water_also_seismic():
    """Same as above for stream_high_water (the lower-severity sibling)."""
    cfg = Config()
    cfg.notifications.rules = []
    cfg.notifications.toggles["seismic"].enabled = True
    cfg.notifications.toggles["seismic"].min_severity = "routine"
    cfg.notifications.toggles["seismic"].severity_channels = {
        "routine": ["mesh_broadcast"],
    }
    cfg.notifications.toggles["seismic"].cooldown_seconds = 0
    e = make_event(
        source="usgs", category="stream_high_water", severity="routine",
        title="Snake River 9.8 ft", timestamp=time.time(),
    )
    _, rec = _dispatch_one(cfg, e)
    assert len(rec) == 1 and rec[0]["name"] == "toggle:seismic"


# ============================================================== misc
def test_dispatch_stats_exposes_all_counters():
    """Stats dict shape is part of the v0.5.2 contract for /api/health."""
    cfg = _cfg()
    d, _ = _make_dispatcher(cfg)
    stats = d.dispatch_stats()
    assert set(stats.keys()) == {
        "stale_dropped", "cooldown_dropped", "dedup_dropped",
        "cooldown_keys", "dedup_lru_size",
    }