diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py
index c5ebe02..ed20b79 100644
--- a/meshai/central/consumer.py
+++ b/meshai/central/consumer.py
@@ -64,8 +64,18 @@ def _subjects_for(adapter: str, region: Optional[str]) -> list[str]:
- region BEFORE the wildcard (nws):
central.wx.alert.us.id.>
- - region AFTER the wildcard (usgs hydro only):
- central.hydro.>.us.id (+ ".unknown" workaround, see below)
+ - USGS NWIS hydro — three single-token wildcards + bare region tail:
+ central.hydro.*.*.*.us.id (per-state, e.g. Idaho)
+ central.hydro.*.*.*.unknown (gauges whose state Central
+ couldn't resolve; documented
+ workaround until backfill)
+ Per Central v0.10.0 nwis.py producer code, the actual published
+ subject is `central.hydro....` where
+ is `us.` (7 tokens) or `unknown` (6 tokens). The
+ doc §nwis text shows only the 4-token category-shape stem and is
+ stale w.r.t. the regional suffix. v0.5.7-water fixes the
+ pre-v0.5.7-water `central.hydro.>.` shape, which was
+ invalid NATS (`>` mid-subject).
- USGS quake — no region in subject (per Central v0.10.0 guide §usgs_quake):
central.quake.event.
4 tokens total. is one of {minor, light, moderate, strong,
@@ -106,8 +116,9 @@ def _subjects_for(adapter: str, region: Optional[str]) -> list[str]:
attribute to roads511 in meshai.
The .unknown workaround: v0.9.20 leaves USGS hydro events whose gauge
- state can't be inferred on `central.hydro.>.unknown`. Subscribing to
- both avoids losing those rows until v0.9.20.1 backfills the state tag.
+ state can't be inferred at the `central.hydro.*.*.*.unknown` subject
+ (6 tokens). Subscribing to both the per-state and the unknown filters
+ avoids losing those rows until the upstream NWIS state-tag backfill.
Empty/None region returns the bare-wildcard form (v0.5.3 behaviour).
Adapters without a Central equivalent (avalanche, ducting) return [].
@@ -131,8 +142,13 @@ def _subjects_for(adapter: str, region: Optional[str]) -> list[str]:
# region in the subject (per guide §usgs_quake). Same situation as
# FIRMS -- tail-only `>` is the legal form; client-side filters lat/lon.
"usgs_quake": ["central.quake.event.>"],
- "usgs": [f"central.hydro.>.{region}",
- "central.hydro.>.unknown"],
+ # USGS NWIS hydro: 3 single-token wildcards for ..
+ # + bare region tail. Pre-v0.5.7-water shipped `central.hydro.>.`
+ # which is invalid NATS (`>` only legal at the tail). Verified against
+ # the v0.10.0-itd-511 nwis.py producer subject_for() body which
+ # publishes `central.hydro....`.
+ "usgs": [f"central.hydro.*.*.*.{region}",
+ "central.hydro.*.*.*.unknown"],
"swpc": ["central.space.>"],
# Convention B (bare state) — shared by traffic family (wzdx,
# tomtom_incidents, state_511_atis). Single-token `*` matches the
diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py
index 025a3ce..3cecc6b 100644
--- a/meshai/notifications/categories.py
+++ b/meshai/notifications/categories.py
@@ -303,7 +303,27 @@ ALERT_CATEGORIES = {
"toggle": "seismic",
},
- # Environmental - Flood
+ # Environmental - Flood / Water (Geohazards family per v0.5.2 migration)
+ # v0.5.7-water audit (test_alert_categories_water_complete enforces parity):
+ # Native: usgs.py applies NWPS flood-stage thresholds CLIENT-SIDE and emits
+ # - stream_flood_warning (reading at/above flood stage)
+ # - stream_high_water (reading at action stage)
+ # Routine gauge readings below action stage are silently dropped on the
+ # native path (no spam).
+ # Central path: every NWIS reading arrives with category="hydro..
+ # ." at severity=0. consumer._CATEGORY_MAP maps `hydro.*` to
+ # `stream_flow` (added below in v0.5.7-water so the rule editor can target
+ # raw central readings). NOTE: meshai does NOT currently re-apply NWPS
+ # threshold logic to central-delivered readings; future work to bring
+ # the central path to parity with the native threshold-classification
+ # is queued for v0.5.8+.
+ "stream_flow": {
+ "name": "Stream Gauge Reading",
+ "description": "Raw USGS NWIS stream gauge reading (discharge, gage height, water temp) — no threshold classification. Use stream_flood_warning / stream_high_water for threshold-triggered alerts.",
+ "default_severity": "routine",
+ "example_message": "🌊 Stream Reading: Snake River nr Twin Falls — 7,420 ft³/s discharge at 2026-06-04T12:00Z (provisional).",
+ "toggle": "seismic",
+ },
"stream_flood_warning": {
"name": "Stream Flood Warning",
"description": "River gauge exceeds NWS flood stage threshold",
diff --git a/tests/test_central_region_routing.py b/tests/test_central_region_routing.py
index fb4c0f3..a8b7bf2 100644
--- a/tests/test_central_region_routing.py
+++ b/tests/test_central_region_routing.py
@@ -74,12 +74,17 @@ def test_subjects_for_roads511_dual_subscribes_convention_a_and_b():
def test_subjects_for_usgs_includes_unknown_workaround():
- """USGS hydro: subscribes to BOTH the region-tagged filter and the
- ".unknown" filter to cover gauges whose state Central v0.9.20 can't
- infer yet (workaround until v0.9.20.1 backfills the tag)."""
+ """v0.5.7-water: USGS NWIS hydro subscribes to BOTH the region-tagged
+ filter and the .unknown filter. Per the v0.10.0-itd-511 nwis.py
+ producer, the actual published subject is
+ `central.hydro....` where is
+ either `us.` (7 tokens) or `unknown` (6 tokens). The
+ pre-v0.5.7-water shape `central.hydro.>.` was invalid NATS
+ (`>` mid-subject). Fixed by using three single-token `*` wildcards
+ in the parameter/agency/site slots."""
assert _subjects_for("usgs", "us.id") == [
- "central.hydro.>.us.id",
- "central.hydro.>.unknown",
+ "central.hydro.*.*.*.us.id",
+ "central.hydro.*.*.*.unknown",
]
diff --git a/tests/test_water_v057.py b/tests/test_water_v057.py
new file mode 100644
index 0000000..899e255
--- /dev/null
+++ b/tests/test_water_v057.py
@@ -0,0 +1,197 @@
+"""v0.5.7-water: USGS NWIS hydro NATS pattern + water/hydro categories audit.
+
+Covers two things shipped in v0.5.7-water:
+
+1. USGS NWIS hydro subject pattern -- per Central v0.10.0-itd-511 nwis.py
+ producer subject_for() body, the actual published subject is
+ `central.hydro....` where is
+ `us.` (7 tokens) or `unknown` (6 tokens). The pre-v0.5.7-water
+ `central.hydro.>.us.id` was invalid NATS (`>` mid-subject) -- replaced
+ with three single-token `*` wildcards in the param/agency/site slots
+ plus the bare region tail.
+
+ Note on guide vs code: the Central guide §nwis text shows only the
+ 4-token category-shape stem `central.hydro...
+ ` without the regional suffix. That doc text is stale
+ w.r.t. the producer code. The producer code is the ground truth (it's
+ what NATS actually delivers); we follow the code.
+
+2. ALERT_CATEGORIES water/hydro audit -- pre-v0.5.7-water the registry had
+ `stream_flood_warning` and `stream_high_water` (both toggle=seismic from
+ the v0.5.2 geohazards migration). The central path's
+ `("hydro.", "stream_flow")` _CATEGORY_MAP entry produced a category
+ `stream_flow` that had no registry entry -- the rule editor couldn't
+ target it. Added `stream_flow` (toggle=seismic) so central-delivered
+ raw gauge readings are UI-selectable. The native usgs.py threshold-
+ classified categories are unchanged.
+"""
+
+import inspect
+import re
+
+import pytest
+
+from meshai.central.consumer import (
+ _SUBJECTS_BARE,
+ _subjects_for,
+ map_category,
+ map_severity,
+)
+from meshai.notifications.categories import ALERT_CATEGORIES
+
+
+def _assert_legal_nats(subject: str) -> None:
+ tokens = subject.split(".")
+ if ">" in tokens:
+ assert tokens[-1] == ">", f"`>` not at tail in {subject!r}"
+ assert tokens.count(">") == 1, f"multiple `>` in {subject!r}"
+ for tok in tokens:
+ assert tok, f"empty token in {subject!r}"
+ if tok not in {"*", ">"}:
+ assert "*" not in tok and ">" not in tok, f"mixed wildcard in token {tok!r}"
+
+
+# ---------- FIX 1: USGS NWIS hydro subject pattern ------------------------
+
+
+def test_usgs_subjects_are_nats_legal():
+ """No `>` mid-subject; all wildcards are single-token `*`."""
+ subs = _subjects_for("usgs", "us.id")
+ assert subs == [
+ "central.hydro.*.*.*.us.id",
+ "central.hydro.*.*.*.unknown",
+ ]
+ for s in subs:
+ _assert_legal_nats(s)
+ # Per-state filter has 7 tokens; .unknown has 6.
+ assert ">" not in s, f"`>` should not appear in fixed-token form: {s!r}"
+
+
+def test_usgs_subjects_match_producer_published_shape():
+ """Sanity: the subscription patterns match what nwis.py actually
+ publishes. Producer publishes:
+ central.hydro....
+ where is us. (2 tokens) or unknown (1 token).
+ """
+ sub_state, sub_unknown = _subjects_for("usgs", "us.id")
+ # Per-state form: matches a 7-token published subject.
+ sample_published_state = "central.hydro.00060.usgs.06898000.us.id"
+ sample_published_unknown = "central.hydro.00060.usgs.06898000.unknown"
+ # Token-count check (NATS `*` matches exactly one token).
+ assert len(sub_state.split(".")) == len(sample_published_state.split("."))
+ assert len(sub_unknown.split(".")) == len(sample_published_unknown.split("."))
+ # Per-state must end with the requested region, .unknown with literal.
+ assert sub_state.endswith(".us.id")
+ assert sub_unknown.endswith(".unknown")
+
+
+def test_usgs_bare_form_unchanged():
+ """Empty region falls back to the bare wildcard (backward compat)."""
+ assert _subjects_for("usgs", "") == ["central.hydro.>"]
+ assert _subjects_for("usgs", None) == ["central.hydro.>"]
+
+
+def test_usgs_per_state_filter_does_not_match_wrong_state():
+ """Sanity: a Montana-region subscription wouldn't match an Idaho subject.
+ (Just verifies the substitution flows through cleanly per region.)"""
+ mt_subs = _subjects_for("usgs", "us.mt")
+ assert mt_subs == [
+ "central.hydro.*.*.*.us.mt",
+ "central.hydro.*.*.*.unknown",
+ ]
+
+
+# ---------- FIX 2: ALERT_CATEGORIES water/hydro audit ---------------------
+
+
+def test_stream_flow_in_registry():
+ """v0.5.7-water: central path's `hydro.* -> stream_flow` mapping now has
+ a corresponding ALERT_CATEGORIES entry under toggle='seismic'."""
+ assert "stream_flow" in ALERT_CATEGORIES
+ assert ALERT_CATEGORIES["stream_flow"]["toggle"] == "seismic"
+ assert ALERT_CATEGORIES["stream_flow"]["default_severity"] == "routine"
+
+
+def test_existing_hydro_entries_unchanged():
+ """v0.5.2 USGS-water -> toggle='seismic' migration must survive."""
+ for cat in ("stream_flood_warning", "stream_high_water"):
+ assert cat in ALERT_CATEGORIES
+ assert ALERT_CATEGORIES[cat]["toggle"] == "seismic"
+
+
+def _native_emitted_water_categories() -> set[str]:
+ """Walk usgs.py for category= literals routing to toggle=seismic."""
+ from meshai.env import usgs as usgs_mod
+ src = inspect.getsource(usgs_mod)
+ emitted = set(re.findall(r'category\s*=\s*"([a-z_]+)"', src))
+ return {c for c in emitted if c in ALERT_CATEGORIES
+ and ALERT_CATEGORIES[c].get("toggle") == "seismic"}
+
+
+def _central_path_water_categories() -> set[str]:
+ """Map a representative set of central hydro category strings through
+ map_category() to see what meshai categories we'd emit downstream.
+ Per the guide §nwis, every NWIS event has category
+ `hydro...`."""
+ central_inputs = [
+ "hydro.00060.usgs.06898000", # discharge
+ "hydro.00065.usgs.06898000", # gage height
+ "hydro.00010.usgs.06898000", # water temperature
+ "hydro.00060.mo005.0000123", # cooperator agency
+ ]
+ return {map_category(c) for c in central_inputs}
+
+
+def test_alert_categories_water_complete():
+ """Native + central-path water emit must equal registry's water-side
+ subset of toggle='seismic'. (The quake-side earthquake_event added in
+ v0.5.7-seismic is also under toggle='seismic' but emitted by a
+ different adapter — exclude it from this water-only audit.)"""
+ registry_water = {
+ cid for cid, info in ALERT_CATEGORIES.items()
+ if info.get("toggle") == "seismic"
+ and (cid.startswith("stream_") or cid == "stream_flow")
+ }
+ native = _native_emitted_water_categories()
+ central = _central_path_water_categories()
+ emitted = native | central
+ missing = emitted - registry_water
+ orphans = registry_water - emitted
+ assert not missing, f"water emit set missing from ALERT_CATEGORIES: {missing}"
+ assert not orphans, f"ALERT_CATEGORIES has orphan water entries: {orphans}"
+
+
+def test_native_threshold_categories_still_emitted():
+ """Spot-check that usgs.py still has the two threshold-classified
+ categories (regression guard against accidental removal)."""
+ native = _native_emitted_water_categories()
+ assert "stream_flood_warning" in native
+ assert "stream_high_water" in native
+
+
+def test_central_hydro_pcode_strings_all_map_to_stream_flow():
+ """Every realistic central hydro category collapses to stream_flow
+ via the catchall `("hydro.", "stream_flow")` _CATEGORY_MAP entry."""
+ for pcode in ("00060", "00065", "00010", "00045", "00095"):
+ assert map_category(f"hydro.{pcode}.usgs.12345678") == "stream_flow"
+
+
+@pytest.mark.parametrize(
+ "cat", ["stream_flow", "stream_flood_warning", "stream_high_water"],
+)
+def test_water_categories_have_required_fields(cat):
+ info = ALERT_CATEGORIES[cat]
+ assert info["toggle"] == "seismic"
+ assert info["name"]
+ assert info["description"]
+ assert info["default_severity"] in {"routine", "priority", "immediate"}
+ assert info["example_message"]
+
+
+# ---------- Severity sanity for central NWIS events -----------------------
+
+
+def test_central_nwis_severity_zero_routes_to_routine():
+ """Central NWIS publishes severity=0 (no threshold classification).
+ Confirm that becomes 'routine' in meshai's three-level scale."""
+ assert map_severity(0) == "routine"