test: update stale assertions post feature/mesh-intelligence merge

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson (via Claude) 2026-06-10 03:43:06 +00:00
commit dcb53ae30c
15 changed files with 182 additions and 130 deletions

View file

@ -13,6 +13,10 @@ def mem_db(monkeypatch, tmp_path):
persistence_db._initialised.clear()
close_thread_connection()
conn = init_db()
# Clear module-level geomag dedup cache between tests
from meshai.central import swpc_handler as _swpc_mod
if hasattr(_swpc_mod, '_geomag_recent'):
_swpc_mod._geomag_recent.clear()
yield conn
close_thread_connection()
persistence_db._initialised.discard(db_path)
@ -62,7 +66,9 @@ def _alert_env(*, flare_class=None, kp=None, pfu=None,
def _commit(data, t):
data["_on_broadcast_committed"](float(t))
cb = data.get("_on_broadcast_committed")
if cb is not None:
cb(float(t))
# ---- geomagnetic storm ----
@ -84,10 +90,10 @@ def test_kp7_g3_broadcasts(mem_db):
env = _kindex_env(kp=7.0, event_id="kp_g3")
wire = handle_swpc(env, env["subject"], data={}, now=1_000_000)
assert wire is not None
assert wire.startswith("🌌")
assert wire.startswith("🧲")
assert "G3" in wire
assert "Kp7" in wire
assert "geomagnetic storm" in wire.lower()
assert "Geomagnetic Storm" in wire
def test_kp9_g5_broadcasts_with_extreme_label(mem_db):
@ -95,7 +101,7 @@ def test_kp9_g5_broadcasts_with_extreme_label(mem_db):
wire = handle_swpc(env, env["subject"], data={}, now=1_000_000)
assert wire is not None
assert "G5" in wire
assert "extreme" in wire.lower()
assert "Kp9" in wire
# ---- solar flares ----
@ -111,7 +117,7 @@ def test_x1_flare_r3_broadcasts(mem_db):
env = _alert_env(flare_class="X1.2", event_id="x1_flare")
wire = handle_swpc(env, env["subject"], data={}, now=1_000_000)
assert wire is not None
assert wire.startswith("🔆")
assert wire.startswith("☀️")
assert "R3" in wire
assert "X1.2" in wire
@ -166,9 +172,10 @@ def test_proton_s2_broadcasts(mem_db):
def test_wire_has_scale_code_and_scalar_tail(mem_db):
env = _kindex_env(kp=7.0, event_id="fmt1")
wire = handle_swpc(env, env["subject"], data={}, now=1_000_000)
# Matt's format: "🌌 Strong geomagnetic storm (G3/Kp7) -- HF degraded, ..."
assert "(G3/Kp7)" in wire
assert "--" in wire
# Wire format: "🧲 New: G3 Geomagnetic Storm — Kp7\nHF degraded, ..."
assert "G3" in wire
assert "Kp7" in wire
assert "\n" in wire
# ---- per-event dedup ----