mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-11 01:14:45 +02:00
Three integrated pieces that ship together because they were designed as one safety story: (1) PERSISTENCE FOUNDATION -- new meshai/persistence/ module with SQLite db.py, schema migration framework (v1), 13 tables covering all adapter event shapes (traffic_events, fires, firms_pixels, quake_events, nws_alerts, gauge_readings, swpc_events) + mesh state (mesh_nodes, mesh_telemetry, mesh_positions, mesh_messages_in, mesh_broadcasts_out, mesh_health_events) + cross-cutting event_log + schema_meta. WAL mode for reader concurrency, single-writer pattern, MESHAI_DB_PATH env var, mounted at /data/meshai.sqlite via existing docker-compose meshai_data volume. .gitignore updated. (2) WFIGS HANDLER -- meshai/central/wfigs_handler.py implements the first per-adapter handler that uses the persistence layer. Format: MEDIUM style with town/landclass/county fallback chain, lat/lon at 3-decimal precision, New:/Update: prefix. 8h-rate-limited change-detection per IRWIN via fires.last_broadcast_at. Skips tombstones and perimeters silently (logged to event_log with handled=0). Acres fallback chain DailyAcres -> IncidentSize -> raw.DiscoveryAcres -> raw.FinalAcres -> N/A. Pass-through Initial Attack auto-numbered names (IA 1, IA 2). (3) UNIVERSAL COLD-START GRACE -- meshai/notifications/pipeline/dispatcher.py grows a configurable grace window (cold_start_grace_seconds, default 60s, GUI-editable per Rule 17). Anchored to first-event-seen (not container boot), so the grace activates the moment broadcasts could fire. Suppresses mesh delivery during the window; handler-side persistence (fires UPSERT, event_log) still happens normally. New _cold_start_dropped counter exposed in dispatch_stats(). Designed to protect against JetStream backlog spam at toggle-flip time, applies universally to ALL adapters. 
(4) WFIGS HANDLER CALLBACK REFACTOR -- New:/Update: prefix now keys on fires.last_broadcast_at IS NULL (not row-missing), and last_broadcast_* field updates moved to a post-broadcast commit callback that the dispatcher invokes ONLY on successful delivery. This means: cold-start-suppressed events leave fires.last_broadcast_at NULL, so when they eventually broadcast post-grace, they correctly render as New: (first ACTUAL delivery for that IRWIN), not Update:. event_log.handled and mesh_broadcasts_out audit row also gated on the same callback -- decoupling persistence rows from broadcast rows for an honest audit trail. New tests: 15 in test_wfigs_handler.py, 15 in test_persistence.py, additional cold-start grace tests in test_dispatcher.py (+4 WFIGS callback scenarios). Synthetic probes wfigs-cleaned-samples.md (initial) and wfigs-cleaned-samples-v2.md (cold-start verification) generated against isolated temp SQLite databases. CT108 /data/meshai.sqlite untouched during build. Master stays off. No live toggle flips. Test count: was 535 (v0.5.7 baseline) -> 566 (persistence) -> 581 (wfigs handler) -> 589 expected (cold-start grace). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
342 lines
13 KiB
Python
342 lines
13 KiB
Python
"""v0.5.2 — staleness filter, cooldown, dedup, friendly renderer, hydro family.
|
||
|
||
Spec: docs/v0.5.2-spec-cooldown-and-staleness.md (Sections 1–5).
|
||
|
||
Eight tests per spec Verification §C plus a couple of guards on
|
||
counter increments / stats exposure. We intentionally exercise both the
|
||
unit (`compose_mesh_message`) and the integration (dispatcher hands the
|
||
composed string into the channel payload).
|
||
"""
|
||
|
||
import asyncio
|
||
import time
|
||
|
||
import pytest
|
||
|
||
from meshai.config import Config, NotificationRuleConfig
|
||
from meshai.notifications.pipeline.dispatcher import Dispatcher
|
||
from meshai.notifications.events import make_event
|
||
from meshai.notifications.renderers.composer import (
|
||
compose_mesh_message,
|
||
_BYTE_BUDGET,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------- helpers
|
||
|
||
|
||
class RecChannel:
    """Fake delivery channel that records what it is asked to deliver.

    Each ``deliver`` call appends one dict to the shared ``rec`` list holding
    the rule's ``delivery_type`` and ``name`` plus the payload's ``message``,
    ``category`` and ``severity``.
    """

    def __init__(self, rec):
        # The list is owned by the caller and appended to in place.
        self.rec = rec

    async def deliver(self, payload, rule):
        """Record a snapshot of ``payload`` and ``rule``; always return True."""
        snapshot = {
            "delivery_type": rule.delivery_type,
            "name": rule.name,
            "message": payload.message,
            "category": payload.category,
            "severity": payload.severity,
        }
        self.rec.append(snapshot)
        return True
|
||
|
||
|
||
def _make_dispatcher(cfg):
    """Build a ``Dispatcher`` whose channels all record into one list.

    Returns ``(dispatcher, rec)``; ``rec`` is the list every ``RecChannel``
    produced by the channel factory appends to.
    """
    deliveries: list = []

    def channel_factory(rule, conn):
        # Both arguments are ignored: every channel shares the same recorder.
        return RecChannel(deliveries)

    dispatcher = Dispatcher(cfg, channel_factory, connector=None)
    return dispatcher, deliveries
|
||
|
||
|
||
def _dispatch_one(cfg, event):
    """Run a single event through a freshly built dispatcher.

    Returns the ``(dispatcher, rec)`` pair from ``_make_dispatcher`` once
    ``dispatch(event)`` has been awaited to completion.
    """
    dispatcher, deliveries = _make_dispatcher(cfg)
    pending = dispatcher.dispatch(event)
    asyncio.run(pending)
    return dispatcher, deliveries
|
||
|
||
|
||
def _cfg(toggle_name="weather", **kw):
    """Return a ``Config`` with one toggle enabled for mesh broadcast.

    Explicit rules are cleared and the cold-start grace window is set to 0
    (v0.5.8b) so the first event is expected to broadcast. Keyword overrides
    accepted for the chosen toggle: ``min_severity``, ``regions``,
    ``severity_channels``, ``freshness_seconds``, ``cooldown_seconds``.
    """
    broadcast_everywhere = {
        "routine": ["mesh_broadcast"],
        "priority": ["mesh_broadcast"],
        "immediate": ["mesh_broadcast"],
    }

    cfg = Config()
    notifications = cfg.notifications
    notifications.rules = []
    # These tests exercise the v0.5.2 guards in isolation, so the
    # cold-start grace must not swallow the first event.
    notifications.cold_start_grace_seconds = 0

    toggle = notifications.toggles[toggle_name]
    toggle.enabled = True
    # (attribute, default) pairs; the last two are the v0.5.2 fields that
    # individual tests override per case.
    overridable = (
        ("min_severity", "routine"),
        ("regions", []),
        ("severity_channels", broadcast_everywhere),
        ("freshness_seconds", 600),
        ("cooldown_seconds", 300),
    )
    for attr, default in overridable:
        setattr(toggle, attr, kw.get(attr, default))
    return cfg
|
||
|
||
|
||
def _ev(severity="priority", category="weather_warning",
        timestamp=None, region=None, source="nws", title="t", **kw):
    """Build an Event through ``make_event`` with test-friendly defaults.

    When ``timestamp`` is None it is omitted from the call entirely, leaving
    ``make_event`` to apply its own default. Any extra keyword arguments are
    forwarded unchanged.
    """
    if timestamp is None:
        extra = dict(kw)
    else:
        extra = {**kw, "timestamp": timestamp}
    return make_event(
        source=source,
        category=category,
        severity=severity,
        region=region,
        title=title,
        **extra,
    )
|
||
|
||
|
||
# ============================================================== Section 1
|
||
# Staleness filter
|
||
|
||
def test_staleness_drops_old_central_event():
    """Spec §1: an event stamped 7200 s in the past is dropped on entry.

    With ``freshness_seconds=600`` nothing may reach the channel and the
    ``stale_dropped`` counter must read 1.
    """
    config = _cfg(freshness_seconds=600)
    old_event = _ev(timestamp=time.time() - 7200)
    dispatcher, delivered = _dispatch_one(config, old_event)
    assert delivered == [], "stale event must not be dispatched"
    stats = dispatcher.dispatch_stats()
    assert stats["stale_dropped"] == 1
|
||
|
||
|
||
def test_staleness_passes_fresh_event():
    """Spec §1: an event stamped "now" is delivered and not counted as stale."""
    config = _cfg(freshness_seconds=600)
    current_event = _ev(timestamp=time.time())  # age is ~0 s
    dispatcher, delivered = _dispatch_one(config, current_event)
    assert len(delivered) == 1
    stats = dispatcher.dispatch_stats()
    assert stats["stale_dropped"] == 0
|
||
|
||
|
||
def test_staleness_applies_to_immediate_severity():
    """Spec §1 note: severity does not exempt an event from staleness.

    An ``immediate`` event that is 3600 s old is still dropped, the rationale
    being that the recipient has already seen it elsewhere.
    """
    config = _cfg(freshness_seconds=600)
    hour_old = time.time() - 3600
    old_immediate = _ev(severity="immediate", timestamp=hour_old)
    dispatcher, delivered = _dispatch_one(config, old_immediate)
    assert delivered == []
    stats = dispatcher.dispatch_stats()
    assert stats["stale_dropped"] == 1
|
||
|
||
|
||
# ============================================================== Section 2
|
||
# Per-toggle cooldown
|
||
|
||
def test_cooldown_throttles_same_category_region():
    """Spec §2: a repeat (toggle, category, region) inside the window is dropped.

    Two same-region events are dispatched back to back with a 300 s cooldown;
    only the first is delivered and ``cooldown_dropped`` reads 1.
    """
    dispatcher, delivered = _make_dispatcher(_cfg(cooldown_seconds=300))
    # Build both events before dispatching either, as the original does.
    pair = [_ev(region="Magic Valley"), _ev(region="Magic Valley")]
    for event in pair:
        asyncio.run(dispatcher.dispatch(event))
    assert len(delivered) == 1, "second event in cooldown window must be dropped"
    stats = dispatcher.dispatch_stats()
    assert stats["cooldown_dropped"] == 1
|
||
|
||
|
||
def test_cooldown_releases_after_window():
    """Spec §2: ``cooldown_seconds=0`` switches throttling off.

    Despite the name, no window is waited out here: the test only checks that
    with a zero cooldown two consecutive events are both delivered.
    """
    dispatcher, delivered = _make_dispatcher(_cfg(cooldown_seconds=0))
    # Distinct group_keys give distinct event ids, so dedup stays out of
    # the way and only the cooldown could drop the second event.
    for key in ("a", "b"):
        asyncio.run(dispatcher.dispatch(_ev(group_key=key)))
    assert len(delivered) == 2, "cooldown_seconds=0 must allow both"
|
||
|
||
|
||
def test_cooldown_different_region_not_throttled():
    """Spec §2: the cooldown key includes the region.

    Events for two different regions inside the same 300 s window are both
    delivered and nothing is counted as cooldown-dropped.
    """
    dispatcher, delivered = _make_dispatcher(_cfg(cooldown_seconds=300))
    regions_and_keys = (("Magic Valley", "mv"), ("Wood River", "wr"))
    for region, key in regions_and_keys:
        asyncio.run(dispatcher.dispatch(_ev(region=region, group_key=key)))
    assert len(delivered) == 2
    stats = dispatcher.dispatch_stats()
    assert stats["cooldown_dropped"] == 0
|
||
|
||
|
||
# ============================================================== Section 3
|
||
# (source, event.id) dedup
|
||
|
||
def test_dedup_catches_identical_source_event_id():
    """Spec §3: a second delivery with the same (source, id) is dropped.

    Both events are built with identical arguments and no ``group_key``; a
    preflight assertion confirms ``make_event`` gave them the same id.
    """
    # Cooldown is disabled so that dedup is the only guard able to drop.
    dispatcher, delivered = _make_dispatcher(_cfg(cooldown_seconds=0))
    first = _ev()
    second = _ev()
    assert first.id == second.id, "preflight: ids must match for this test"
    for event in (first, second):
        asyncio.run(dispatcher.dispatch(event))
    assert len(delivered) == 1
    stats = dispatcher.dispatch_stats()
    assert stats["dedup_dropped"] == 1
|
||
|
||
|
||
def test_dedup_lru_eviction_under_load():
    """Spec §3: the dedup LRU never grows past ``_DEDUP_LRU_MAX`` entries.

    Dispatches ``cap + 5`` events with distinct ``group_key`` values and
    checks that the reported ``dedup_lru_size`` equals the cap read from the
    dispatcher module.

    The cap is read, not shrunk, so this test really does dispatch
    ``cap + 5`` events. All dispatches therefore share a single event loop
    instead of paying for a fresh ``asyncio.run`` (new loop setup and
    teardown) per event.
    """
    from meshai.notifications.pipeline import dispatcher as disp_mod

    cfg = _cfg(cooldown_seconds=0, freshness_seconds=0)  # disable both guards
    # The recorder list is not inspected here; only the stats matter.
    d, _ = _make_dispatcher(cfg)
    cap = disp_mod._DEDUP_LRU_MAX

    async def _flood():
        # Events are still dispatched strictly one after another.
        for i in range(cap + 5):
            await d.dispatch(_ev(group_key=f"k{i}"))

    asyncio.run(_flood())
    assert d.dispatch_stats()["dedup_lru_size"] == cap
|
||
|
||
|
||
# ============================================================== Section 4
|
||
# Friendly renderer
|
||
|
||
def test_renderer_produces_friendly_string():
    """Spec §4: the composed mesh message is human-friendly and within budget.

    For a priority ``weather_warning`` the output must contain the warning
    emoji, the ``WX`` label, the event title and the severity word, and must
    encode to at most ``_BYTE_BUDGET`` bytes of UTF-8.
    """
    event = make_event(
        source="nws",
        category="weather_warning",
        severity="priority",
        title="Red Flag Warning",
        region="Twin Falls",
        timestamp=time.time(),
    )
    text = compose_mesh_message(event)
    assert "⚠" in text
    assert "WX" in text
    assert "Red Flag Warning" in text
    assert "priority" in text
    encoded = text.encode("utf-8")
    assert len(encoded) <= _BYTE_BUDGET
|
||
|
||
|
||
def test_renderer_byte_budget_drops_optional_segments():
    """Spec §4: over budget, optional segments are dropped before required ones.

    Per the spec the drop order is context, then distance, then quant, then
    region, while head, primary and severity always survive. A 200-character
    title forces the composer over budget; the test checks the head and
    severity word remain and the context segment (containment, cause) is gone.
    """
    fire_details = {
        "acres": 1500,
        "containment_pct": 25,
        "cause": "lightning",
        "distance_km": 8,
        "bearing": "W",
        "anchor": "Hailey",
    }
    oversized_title = "A" * 200
    event = make_event(
        source="nws",
        category="wildfire_incident",
        severity="immediate",
        title=oversized_title,
        region="Wood River Valley",
        timestamp=time.time(),
        data=fire_details,
    )
    text = compose_mesh_message(event)
    assert len(text.encode("utf-8")) <= _BYTE_BUDGET
    # Required pieces survive: the head and the severity word.
    assert text.startswith("🔥 FIRE:")
    assert "immediate" in text
    # The lowest-priority optional segment (context) must be gone.
    for dropped in ("% contained", "lightning"):
        assert dropped not in text
|
||
|
||
|
||
def test_renderer_never_mid_character_truncation():
    """The composer must not split a codepoint when shrinking to budget.

    The title is 200 four-byte emoji (800 bytes), so even the required
    segments exceed the budget. The output must survive a UTF-8
    encode/decode round trip and fit within ``_BYTE_BUDGET`` bytes.
    """
    emoji_title = "🔥" * 200  # 800 bytes of emoji
    event = make_event(
        source="nws",
        category="wildfire_hotspot",
        severity="priority",
        title=emoji_title,
        timestamp=time.time(),
    )
    text = compose_mesh_message(event)
    # The round trip raises if the string cannot be encoded as valid UTF-8.
    encoded = text.encode("utf-8")
    encoded.decode("utf-8")
    assert len(text.encode("utf-8")) <= _BYTE_BUDGET
|
||
|
||
|
||
def test_renderer_no_debug_fallback_for_central_prefixed_category():
    """Regression for the prod incident with ``central.<category>`` events.

    An event whose category is ``central.weather_warning`` and whose title is
    empty must not render in the ``[Family] central.category`` debug format.
    The raw category string must be absent and the output must still contain
    at least one uppercase character.
    """
    event = make_event(
        source="central",
        category="central.weather_warning",
        severity="priority",
        title="",  # deliberately empty
        timestamp=time.time(),
    )
    text = compose_mesh_message(event)
    assert "central.weather_warning" not in text
    # A meaningful label is still expected for an unrecognized category.
    has_uppercase = any(ch.isupper() for ch in text)
    assert has_uppercase
|
||
|
||
|
||
def test_renderer_message_lands_in_toggle_payload():
    """Integration: the composed text reaches the channel as ``payload.message``.

    With cooldown and freshness set to 0, one event is dispatched and the
    recorded message must carry the title and the weather-warning emoji, and
    fit within ``_BYTE_BUDGET`` bytes.
    """
    config = _cfg(cooldown_seconds=0, freshness_seconds=0)
    event = _ev(title="Red Flag Warning", region="Twin Falls")
    _, delivered = _dispatch_one(config, event)
    assert len(delivered) == 1
    message = delivered[0]["message"]
    assert "Red Flag Warning" in message
    assert "⚠" in message  # weather_warning emoji
    assert len(message.encode("utf-8")) <= _BYTE_BUDGET
|
||
|
||
|
||
# ============================================================== Section 5
|
||
# Hydro family routing
|
||
|
||
def test_hydro_event_maps_to_geohazards_toggle():
    """Spec §5: hydro events route to the Geohazards (``seismic``) toggle.

    Both the ``weather`` and ``seismic`` toggles are enabled with identical
    settings, so the recorded rule names show which family a
    ``stream_flood_warning`` was routed to: ``toggle:seismic`` must be
    present and ``toggle:weather`` must not.
    """
    cfg = Config()
    cfg.notifications.rules = []
    cfg.notifications.cold_start_grace_seconds = 0
    # Enable BOTH toggles identically so that routing is the only variable.
    for toggle_name in ("weather", "seismic"):
        toggle = cfg.notifications.toggles[toggle_name]
        toggle.enabled = True
        toggle.min_severity = "routine"
        toggle.severity_channels = {
            "routine": ["mesh_broadcast"],
            "priority": ["mesh_broadcast"],
        }
        toggle.cooldown_seconds = 0

    flood = make_event(
        source="usgs",
        category="stream_flood_warning",
        severity="priority",
        title="Snake River nr Twin Falls 12.8 ft",
        timestamp=time.time(),
    )
    _, delivered = _dispatch_one(cfg, flood)
    fired_rules = {entry["name"] for entry in delivered}
    assert "toggle:seismic" in fired_rules, "hydro must route to seismic family"
    assert "toggle:weather" not in fired_rules, "hydro must NOT route to weather"
|
||
|
||
|
||
def test_hydro_high_water_also_seismic():
    """Spec §5 sibling case: ``stream_high_water`` also routes to ``seismic``.

    Only the ``seismic`` toggle is enabled, for routine severity; the routine
    high-water event must produce exactly one delivery, under
    ``toggle:seismic``.
    """
    cfg = Config()
    cfg.notifications.rules = []
    cfg.notifications.cold_start_grace_seconds = 0
    seismic = cfg.notifications.toggles["seismic"]
    seismic.enabled = True
    seismic.min_severity = "routine"
    seismic.severity_channels = {"routine": ["mesh_broadcast"]}
    seismic.cooldown_seconds = 0

    high_water = make_event(
        source="usgs",
        category="stream_high_water",
        severity="routine",
        title="Snake River 9.8 ft",
        timestamp=time.time(),
    )
    _, delivered = _dispatch_one(cfg, high_water)
    assert len(delivered) == 1
    assert delivered[0]["name"] == "toggle:seismic"
|
||
|
||
|
||
# ============================================================== misc
|
||
|
||
def test_dispatch_stats_exposes_all_counters():
    """Pin the exact key set returned by ``dispatch_stats()``.

    The shape of the stats dict is part of the v0.5.2 contract for
    /api/health, so any added or removed key must fail this test.
    """
    expected_keys = {
        "stale_dropped",
        "cooldown_dropped",
        "dedup_dropped",
        "cold_start_dropped",
        "cold_start_anchor_at",
        "cooldown_keys",
        "dedup_lru_size",
    }
    dispatcher, _ = _make_dispatcher(_cfg())
    reported = dispatcher.dispatch_stats()
    assert set(reported.keys()) == expected_keys
|