diff --git a/src/central/adapters/avalanche_org.py b/src/central/adapters/avalanche_org.py
index 77c8513..16cd42c 100644
--- a/src/central/adapters/avalanche_org.py
+++ b/src/central/adapters/avalanche_org.py
@@ -76,13 +76,91 @@ _DEDUP_DDL = (
     "PRIMARY KEY (adapter, event_id))"
 )
 
+# v0.10.11: per-adapter state for tombstone emission. Tracks
+# (center_id, zone_name) of zones that PASSED the severity gate on the
+# last poll -- so we can diff against the next poll and emit retraction
+# envelopes when a zone falls below the threshold, flips off_season, or
+# disappears from the upstream feed entirely.
+_OBSERVED_DDL = (
+    "CREATE TABLE IF NOT EXISTS avalanche_org_observed ("
+    "center_id TEXT NOT NULL, zone_name TEXT NOT NULL, "
+    "state TEXT NOT NULL, "
+    "last_published_at TEXT NOT NULL, "
+    "PRIMARY KEY (center_id, zone_name))"
+)
+
+
+def _read_observed(
+    db: sqlite3.Connection,
+) -> dict[tuple[str, str], tuple[str, str]]:
+    """Return ``{(center_id, zone_name): (state, last_published_at)}``."""
+    cursor = db.execute(
+        "SELECT center_id, zone_name, state, last_published_at "
+        "FROM avalanche_org_observed"
+    )
+    return {(r[0], r[1]): (r[2], r[3]) for r in cursor.fetchall()}
+
+
+def _upsert_observed(
+    db: sqlite3.Connection,
+    current: dict[tuple[str, str], str],
+) -> None:
+    """Upsert ``current``: ``{(center_id, zone_name): state}``. Sets
+    ``last_published_at`` to now for each row."""
+    now_iso = datetime.now(timezone.utc).isoformat()
+    for (cid, zn), state in current.items():
+        db.execute(
+            "INSERT OR REPLACE INTO avalanche_org_observed "
+            "(center_id, zone_name, state, last_published_at) "
+            "VALUES (?, ?, ?, ?)",
+            (cid, zn, state, now_iso),
+        )
+    db.commit()
+
+
+def _delete_observed(
+    db: sqlite3.Connection,
+    keys: set[tuple[str, str]],
+) -> None:
+    """Remove the listed ``(center_id, zone_name)`` rows. Idempotent."""
+    for (cid, zn) in keys:
+        db.execute(
+            "DELETE FROM avalanche_org_observed "
+            "WHERE center_id = ? AND zone_name = ?",
+            (cid, zn),
+        )
+    db.commit()
+
+
+def _removal_reason(upstream_props: dict[str, Any] | None) -> str:
+    """Classify why a previously-published zone is no longer publishing.
+
+    - ``off_season``: zone is still in the feed but ``off_season=true``.
+    - ``below_threshold``: zone is still in the feed but ``danger_level < 3``
+      (or any other gate-failing value like ``-1``).
+    - ``fallen_off_feed``: zone is absent from this poll's response entirely
+      (forecast center reorganised, zone deprecated). meshai's renderer is
+      expected to treat this the same as ``below_threshold`` -- both are
+      retractions, only the upstream signal differs.
+    """
+    if upstream_props is None:
+        return "fallen_off_feed"
+    if upstream_props.get("off_season"):
+        return "off_season"
+    return "below_threshold"
+
 
 def _slug(text: str) -> str:
-    """Lowercase + collapse runs of non-alphanum to single underscores.
+    """Lowercase + collapse runs of non-alphanum to single hyphens.
 
-    'Sawtooth & Western Smoky Mtns' → 'sawtooth_western_smoky_mtns'.
+    'Sawtooth & Western Smoky Mtns' → 'sawtooth-western-smoky-mtns'.
+
+    v0.10.11: meshai requested hyphens instead of underscores so the slug
+    matches the rest of their renderer's url-style key conventions. Safe
+    to swap because off-season `published_ids` for avalanche_org was 0 at
+    the time of this change -- no live event.id values to invalidate.
     """
-    return re.sub(r"[^a-zA-Z0-9]+", "_", text or "").strip("_").lower()
+    return re.sub(r"[^a-zA-Z0-9]+", "-", text or "").strip("-").lower()
 
 
 def _parse_iso(value: Any) -> datetime | None:
@@ -165,6 +243,7 @@ class AvalancheOrgAdapter(SourceAdapter):
             "CREATE INDEX IF NOT EXISTS published_ids_last_seen "
             "ON published_ids (last_seen)"
         )
+        self._db.execute(_OBSERVED_DDL)
         self._db.commit()
         logger.info(
             "avalanche_org adapter started",
@@ -235,6 +314,10 @@ class AvalancheOrgAdapter(SourceAdapter):
         if centroid is None:
             return None
         lon, lat = centroid
+        try:
+            bbox = shapely_shape(geometry).bounds  # (minx, miny, maxx, maxy)
+        except Exception:
+            bbox = None
 
         valid_dt = _parse_iso(props.get("start_date")) or datetime.now(timezone.utc)
         advice = (props.get("travel_advice") or "")[:_TRAVEL_ADVICE_MAX_CHARS]
@@ -248,6 +331,7 @@ class AvalancheOrgAdapter(SourceAdapter):
             severity=severity,
             geo=Geo(
                 centroid=(lon, lat),
+                bbox=bbox,
                 geometry=geometry,
                 regions=[f"US-{state}"],
                 primary_region=f"US-{state}",
@@ -270,6 +354,7 @@ class AvalancheOrgAdapter(SourceAdapter):
     async def poll(self) -> AsyncIterator[Event]:
         if not self._session:
             raise RuntimeError("Session not initialized")
+        assert self._db is not None  # for type narrowing
         results = await asyncio.gather(
             *[self._fetch(c) for c in self._center_ids],
             return_exceptions=True,
@@ -277,6 +362,18 @@ class AvalancheOrgAdapter(SourceAdapter):
         yielded = 0
         omitted = 0
 
+        # Map every upstream feature this poll by (center_id, zone_name) so
+        # the tombstone phase can classify why a previously-published zone
+        # is no longer publishing (off_season vs below_threshold vs absent).
+        upstream_by_key: dict[tuple[str, str], dict[str, Any]] = {}
+        # Zones that PASSED the gate this poll, with their state for the
+        # tombstone subject derivation if they fall off next time.
+        current_published: dict[tuple[str, str], str] = {}
+        # Centers whose fetch failed this poll. Their zones are unobserved,
+        # not withdrawn, so the tombstone phase must leave them alone.
+        failed_centers: set[str] = set()
+
         for center_id, result in zip(self._center_ids, results):
             if isinstance(result, BaseException) or result is None:
+                failed_centers.add(center_id)
                 if isinstance(result, BaseException):
@@ -287,6 +384,10 @@ class AvalancheOrgAdapter(SourceAdapter):
                 continue
             features = result.get("features") or []
             for feat in features:
+                props = feat.get("properties") or {}
+                zone_name = props.get("name")
+                if zone_name:
+                    upstream_by_key[(center_id, zone_name)] = props
                 try:
                     ev = self._build_event_record(feat, center_id)
                 except Exception:
@@ -299,16 +400,61 @@ class AvalancheOrgAdapter(SourceAdapter):
                 if ev is None:
                     omitted += 1
                     continue
+                current_published[(center_id, ev.data["zone_name"])] = ev.data["state"]
                 yield ev
                 yielded += 1
 
+        # Tombstone phase: zones in the observed table but not in this poll's
+        # passing set get a retraction Event. Reason determined from the
+        # upstream feature's current state (if still in feed) or marked
+        # 'fallen_off_feed' if absent entirely.
+        observed_before = _read_observed(self._db)
+        # Skip centers whose fetch failed: absence of data is not a
+        # retraction, and a transient outage must not tombstone (and then
+        # forget) every zone that center had published.
+        removed: set[tuple[str, str]] = {
+            key
+            for key in observed_before
+            if key not in current_published and key[0] not in failed_centers
+        }
+        tombstones = 0
+        for key in removed:
+            cid, zone_name = key
+            last_state, last_published_at = observed_before[key]
+            reason = _removal_reason(upstream_by_key.get(key))
+            now = datetime.now(timezone.utc)
+            tomb_id = f"{cid}_{_slug(zone_name)}:removed:{now.isoformat()}"
+            yield Event(
+                id=tomb_id,
+                adapter=self.name,
+                category=f"avy.advisory.removed.{cid.lower()}",
+                time=now,
+                severity=0,
+                geo=Geo(),
+                data={
+                    "center_id": cid,
+                    "zone_name": zone_name,
+                    "state": last_state,
+                    "reason": reason,
+                    "last_published_at": last_published_at,
+                },
+            )
+            tombstones += 1
+
+        # Update state AFTER yielding so a supervisor crash mid-publish causes
+        # at-least-once retry on the next poll (matches wfigs convention).
+        _upsert_observed(self._db, current_published)
+        _delete_observed(self._db, removed)
         self.sweep_old_ids()
         logger.info(
             "avalanche_org poll completed",
             extra={"centers": self._center_ids,
-                   "events_yielded": yielded, "events_omitted": omitted},
+                   "events_yielded": yielded, "events_omitted": omitted,
+                   "tombstones_emitted": tombstones},
         )
 
     def subject_for(self, event: Event) -> str:
         state = (event.data.get("state") or "").lower() or "unknown"
+        if event.category.startswith("avy.advisory.removed"):
+            return f"central.avy.advisory.removed.us.{state}"
         return f"central.avy.advisory.us.{state}"
diff --git a/tests/test_avalanche_org.py b/tests/test_avalanche_org.py
index 4be1ed5..baf3cfd 100644
--- a/tests/test_avalanche_org.py
+++ b/tests/test_avalanche_org.py
@@ -16,6 +16,8 @@ from central.adapters.avalanche_org import (
     AvalancheOrgSettings,
     _centroid,
     _parse_iso,
+    _read_observed,
+    _removal_reason,
     _slug,
 )
 from central.config_models import AdapterConfig
@@ -46,16 +48,17 @@ def adapter(tmp_path: Path) -> AvalancheOrgAdapter:
 
 
 @pytest.mark.parametrize("text, expected", [
-    ("Banner Summit", "banner_summit"),
-    ("Sawtooth & Western Smoky Mtns", "sawtooth_western_smoky_mtns"),
-    ("Galena Summit & Eastern Mtns", "galena_summit_eastern_mtns"),
-    ("Soldier & Wood River Valley Mtns", "soldier_wood_river_valley_mtns"),
-    ("ALL CAPS", "all_caps"),
-    (" leading/trailing ", "leading_trailing"),
-    ("hyphens-and_underscores", "hyphens_and_underscores"),
+    ("Banner Summit", "banner-summit"),
+    ("Sawtooth & Western Smoky Mtns", "sawtooth-western-smoky-mtns"),
+    ("Galena Summit & Eastern Mtns", "galena-summit-eastern-mtns"),
+    ("Soldier & Wood River Valley Mtns", "soldier-wood-river-valley-mtns"),
+    ("ALL CAPS", "all-caps"),
+    (" leading/trailing ", "leading-trailing"),
+    ("hyphens-and_underscores", "hyphens-and-underscores"),
     ("", ""),
 ])
-def test_slug(text, expected):
+def test_slug_uses_hyphens(text, expected):
+    """v0.10.11: slug switched from underscore-join to hyphen-join."""
     assert _slug(text) == expected
 
 
@@ -146,10 +149,14 @@ def test_publishable_danger_levels_yield_event_with_mapped_severity(
     assert ev.data["center_id"] == "SNFAC"
     assert ev.data["zone_name"] == "Banner Summit"
     assert ev.data["off_season"] is False
-    assert ev.id == "SNFAC_banner_summit"
+    assert ev.id == "SNFAC_banner-summit"  # v0.10.11: hyphenated slug
     assert ev.category == "avy.advisory.snfac"
     assert ev.geo.primary_region == "US-ID"
     assert ev.geo.geometry is not None
+    # v0.10.11: bbox is computed from polygon bounds (W, S, E, N).
+    assert ev.geo.bbox is not None
+    west, south, east, north = ev.geo.bbox
+    assert west < east and south < north
 
 
 @pytest.mark.parametrize("danger_level", [-1, 0, 1, 2])
@@ -307,3 +314,220 @@ async def test_poll_yields_only_publishable_features(adapter, snfac_response, mo
         assert events[0].severity == 3  # danger 4 → severity 3
     finally:
         await adapter.shutdown()
+
+
+# --- v0.10.11: tombstone emission tests -------------------------------------
+
+
+@pytest.mark.parametrize("upstream, expected_reason", [
+    ({"off_season": True, "danger_level": -1}, "off_season"),
+    ({"off_season": False, "danger_level": 1}, "below_threshold"),
+    ({"off_season": False, "danger_level": 2}, "below_threshold"),
+    ({"off_season": False, "danger_level": -1}, "below_threshold"),
+    (None, "fallen_off_feed"),
+])
+def test_removal_reason_classification(upstream, expected_reason):
+    """The _removal_reason helper distinguishes off_season vs below_threshold
+    vs absent-from-feed. meshai treats fallen_off_feed the same as
+    below_threshold for retraction rendering -- documented in the PR body."""
+    assert _removal_reason(upstream) == expected_reason
+
+
+def _winter_feature(zone_name: str, danger_level: int = 3, *, off_season: bool = False) -> dict:
+    """Build a feature with overridable severity for state-transition tests."""
+    return {
+        "type": "Feature", "id": 1,
+        "properties": {
+            "name": zone_name, "state": "ID",
+            "off_season": off_season, "danger_level": danger_level,
+            "danger": "Considerable", "travel_advice": "x",
+            "start_date": "2026-12-15T17:59:00", "end_date": "2026-12-16T19:00:00",
+        },
+        "geometry": {
+            "type": "Polygon",
+            "coordinates": [[[-115.0, 44.0], [-114.0, 44.0],
+                             [-114.0, 45.0], [-115.0, 45.0], [-115.0, 44.0]]],
+        },
+    }
+
+
+async def _poll_with(adapter, features):
+    """Run one poll with a mocked _fetch returning the given features."""
+    async def _fake_fetch(center_id):
+        return {"features": features}
+    adapter._fetch = _fake_fetch
+    return [e async for e in adapter.poll()]
+
+
+@pytest.mark.asyncio
+async def test_tombstone_when_zone_drops_below_threshold(adapter):
+    """P1 publishes Considerable; P2 sees same zone at Low → tombstone."""
+    await adapter.startup()
+    try:
+        p1 = await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        assert len(p1) == 1 and p1[0].category.startswith("avy.advisory.")
+        assert "removed" not in p1[0].category
+
+        p2 = await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        assert len(p2) == 1
+        tomb = p2[0]
+        assert tomb.category == "avy.advisory.removed.snfac"
+        assert tomb.severity == 0
+        assert tomb.data["reason"] == "below_threshold"
+        assert tomb.data["zone_name"] == "Banner Summit"
+        assert tomb.data["state"] == "ID"
+        assert adapter.subject_for(tomb) == "central.avy.advisory.removed.us.id"
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_tombstone_when_zone_goes_off_season(adapter):
+    await adapter.startup()
+    try:
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        p2 = await _poll_with(
+            adapter, [_winter_feature("Banner Summit", -1, off_season=True)]
+        )
+        assert len(p2) == 1
+        assert p2[0].data["reason"] == "off_season"
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_tombstone_when_zone_absent_from_feed(adapter):
+    """Zone falls off the response entirely → reason='fallen_off_feed'."""
+    await adapter.startup()
+    try:
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        p2 = await _poll_with(adapter, [])  # zone gone
+        assert len(p2) == 1
+        assert p2[0].data["reason"] == "fallen_off_feed"
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_no_tombstone_when_zone_stays_above_threshold(adapter):
+    """Repeat Considerable across polls → live publish only, no tombstone."""
+    await adapter.startup()
+    try:
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        p2 = await _poll_with(adapter, [_winter_feature("Banner Summit", 4)])
+        assert len(p2) == 1
+        assert "removed" not in p2[0].category
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_no_tombstone_for_never_published_zone(adapter):
+    """Zone has been below threshold the whole time → no tombstone ever."""
+    await adapter.startup()
+    try:
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        p2 = await _poll_with(adapter, [_winter_feature("Banner Summit", -1, off_season=True)])
+        assert p2 == []
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_no_duplicate_tombstone_across_consecutive_polls(adapter):
+    """Load-bearing correctness case: tombstone emitted ONCE on the transition,
+    then the zone is removed from the observed-published table so subsequent
+    polls under the same below-threshold condition do NOT re-emit. This is the
+    bug class meshai is exposed to if the diff logic gets it wrong.
+    """
+    await adapter.startup()
+    try:
+        # P1: publish at Considerable -> observed table has the zone.
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        obs_after_p1 = _read_observed(adapter._db)
+        assert ("SNFAC", "Banner Summit") in obs_after_p1
+
+        # P2: zone drops to Low -> tombstone emitted AND observed table cleared.
+        p2 = await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        assert len(p2) == 1 and p2[0].category == "avy.advisory.removed.snfac"
+        obs_after_p2 = _read_observed(adapter._db)
+        assert ("SNFAC", "Banner Summit") not in obs_after_p2, (
+            "tombstone emitted but observed row not deleted — next poll would "
+            "re-emit, which is the bug we are guarding against"
+        )
+
+        # P3: still Low -> no second tombstone (observed table is empty).
+        p3 = await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        assert p3 == [], (
+            f"duplicate tombstone emitted on P3 ({len(p3)} events); "
+            f"diff logic is not removing zones from the observed table"
+        )
+
+        # P4: zone recovers to Considerable -> normal live publish, no tombstone.
+        p4 = await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        assert len(p4) == 1 and "removed" not in p4[0].category
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_subject_for_routes_removed_category_correctly(adapter):
+    """Tombstone subject is `central.avy.advisory.removed.us.<state>`."""
+    await adapter.startup()
+    try:
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        p2 = await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        assert len(p2) == 1
+        tomb = p2[0]
+        assert adapter.subject_for(tomb) == "central.avy.advisory.removed.us.id"
+        # Sanity: the live-publish subject still works.
+        live = adapter._build_event_record(_winter_feature("Other Zone", 4), "SNFAC")
+        assert adapter.subject_for(live) == "central.avy.advisory.us.id"
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_tombstone_id_is_unique_per_emission(adapter):
+    """Each tombstone gets a fresh `:removed:<iso-timestamp>` suffix so JetStream
+    doesn't dedup re-issued tombstones for the same zone across cycles."""
+    import asyncio as _asyncio
+    await adapter.startup()
+    try:
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        p2 = await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        # publish, drop -> tombstone with one timestamp
+        first_id = p2[0].id
+
+        # zone recovers and drops again later with a different now()
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+        await _asyncio.sleep(0.01)  # ensure new ISO timestamp
+        p4 = await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        second_id = p4[0].id
+
+        assert first_id != second_id
+        assert ":removed:" in first_id and ":removed:" in second_id
+    finally:
+        await adapter.shutdown()
+
+
+@pytest.mark.asyncio
+async def test_no_tombstone_when_center_fetch_fails(adapter):
+    """A failed fetch is missing data, not a retraction: no tombstone is
+    emitted and the zone stays in the observed table until the feed is back."""
+    await adapter.startup()
+    try:
+        await _poll_with(adapter, [_winter_feature("Banner Summit", 3)])
+
+        async def _failed_fetch(center_id):
+            return None
+        adapter._fetch = _failed_fetch
+        p2 = [e async for e in adapter.poll()]
+        assert p2 == []
+        assert ("SNFAC", "Banner Summit") in _read_observed(adapter._db)
+
+        # Feed is back with the zone at Low -> the retraction goes out now.
+        p3 = await _poll_with(adapter, [_winter_feature("Banner Summit", 1)])
+        assert len(p3) == 1 and p3[0].data["reason"] == "below_threshold"
+    finally:
+        await adapter.shutdown()