mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-11 01:14:45 +02:00
Fifth family of the v0.5.7 NATS-and-categories campaign. Water/hydro = USGS NWIS adapter (Central calls it `nwis`, meshai calls it `usgs` via CENTRAL_ADAPTER_TO_SOURCE remap). USGS quake stays in the seismic phase that already shipped; this phase fixes the hydro/water subscription and audits the water-side of the seismic toggle.
The Central v0.10.0 consumer integration guide was the starting point but the §nwis section text turned out to be STALE w.r.t. the regional subject suffix. Ground-truthed against the v0.10.0-itd-511 nwis.py producer subject_for() body and used the CODE as the source of truth. Documented below.
FIX 1 -- USGS NWIS hydro NATS pattern. Pre-v0.5.7-water `_subjects_for("usgs","us.id")` returned `["central.hydro.>.us.id", "central.hydro.>.unknown"]`. Both subjects are invalid NATS (`>` is only legal at the tail token).
The producer code at v0.10.0-itd-511 src/central/adapters/nwis.py:223 publishes:
central.hydro.<param>.<agency>.<site>.<region>
where <region> is either `us.<state>` (2 tokens) or `unknown` (1 token). So the live subjects on the broker are 7 tokens (per-state) or 6 tokens (unknown). The doc §nwis section text shows only the 4-token category stem `central.hydro.<parameter_code>.<agency>.<bare_site_no>` -- that text is stale; it predates the regional-routing roll-out.
Fixed by using three single-token `*` wildcards in the param/agency/site slots plus the bare region tail. Preserves the v0.5.4 INTENT (server-side regional filtering + .unknown workaround for gauges whose state Central can't resolve) while restoring NATS syntax legality:
central.hydro.*.*.*.us.id (7 tokens, per-state)
central.hydro.*.*.*.unknown (6 tokens, .unknown workaround)
Bare-form fallback (`central.hydro.>`) is unchanged for empty/None region (pre-v0.5.3 backward compat path).
FIX 2 -- ALERT_CATEGORIES water/hydro audit. Pre-v0.5.7-water registry had `stream_flood_warning` and `stream_high_water` (both toggle="seismic" from the v0.5.2 USGS-water -> Geohazards migration). Audit findings:
- Native usgs.py applies NWPS flood-stage thresholds client-side and emits
`stream_flood_warning` (reading at/above flood stage) or `stream_high_water`
(Action Stage reading). Routine gauge readings below action stage are
silently dropped on the native path (no spam).
- Central path: every NWIS reading arrives with category=`hydro.<pcode>.
<agency>.<site>` at severity=0. consumer._CATEGORY_MAP maps `hydro.*`
to `stream_flow` (added in earlier work). But `stream_flow` was MISSING
from ALERT_CATEGORIES -- routing worked via the `("stream", "seismic")`
prefix fallback, but the Advanced Rules editor couldn't target raw
central-delivered gauge readings.
Added `stream_flow` to ALERT_CATEGORIES under toggle="seismic", default_severity="routine", with an example_message that reflects the raw-reading shape. The existing `stream_flood_warning` / `stream_high_water` entries are unchanged.
NOTE on parity gap (deferred to v0.5.8+): meshai does NOT currently re-apply NWPS threshold logic to central-delivered NWIS readings. So flipping `usgs.feed_source=central` today produces a stream of routine `stream_flow` events without the flood-stage classification the native path provides. Bringing the central path to parity (apply threshold logic AFTER receiving central-delivered raw readings) is queued as future work -- intentionally out of scope here per Matt's one-fix-per-family rule.
Audit table after v0.5.7-water:
Native emit: stream_flood_warning, stream_high_water (threshold-triggered)
Central path: every hydro.* -> stream_flow (routine; no threshold)
Registry: {stream_flow, stream_flood_warning, stream_high_water} (toggle=seismic)
Quake side: earthquake_event (toggle=seismic, added v0.5.7-seismic) -- unchanged
Parity confirmed. No orphans, no missing.
Tests
-----
PYTHONPATH=. pytest -q: 413 passed (was 400; +13 net).
- tests/test_water_v057.py (new): usgs subscription is NATS-legal (no `>` anywhere, all single-token `*`); token-count matches the producer-published shape (7-token us-state + 6-token .unknown); per-region substitution (Montana sanity); bare-form backward compat; `stream_flow` present under toggle="seismic"; `stream_flood_warning` / `stream_high_water` unchanged; native + central emit set matches registry water-side subset; threshold categories still emitted by usgs.py; all realistic central pcodes fold to `stream_flow`; required-fields check; severity=0 -> "routine" sanity.
- tests/test_central_region_routing.py: updated `test_subjects_for_usgs_includes_unknown_workaround` to reflect the v0.5.7-water fix (single-token `*` wildcards instead of mid-subject `>`).
Safe-mode preserved (master off, all family toggles off, all adapters native, central disabled). No live toggle flipped. Not tagging yet -- v0.5.7 tag waits until all families ship.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
121 lines
5.6 KiB
Python
121 lines
5.6 KiB
Python
"""v0.5.4: Central v0.9.20 region-aware subject building.
|
|
|
|
Exercises `_subjects_for(adapter, region)` and the wiring through
|
|
`CentralConsumer._subject_owned()`. The spec is hard-coded in the test
|
|
strings on purpose so a future drift in the v0.9.20 subject scheme
|
|
fails noisily here instead of silently shipping wrong filters.
|
|
"""
|
|
|
|
from meshai.central.consumer import (
|
|
CentralConsumer,
|
|
_subjects_for,
|
|
_SUBJECTS_BARE,
|
|
)
|
|
from meshai.config import EnvironmentalConfig
|
|
|
|
|
|
# --------------------------------------------------------------------- per-adapter
|
|
|
|
def test_subjects_for_nws_us_id():
|
|
"""NWS: region BEFORE wildcard (matches alert.<region>.<...>)."""
|
|
assert _subjects_for("nws", "us.id") == ["central.wx.alert.us.id.>"]
|
|
|
|
|
|
def test_subjects_for_usgs_quake_us_id_uses_tail_only_wildcard():
|
|
"""v0.5.7-seismic: USGS quake publishes `central.quake.event.<tier>` with
|
|
NO region in the subject (per Central v0.10.0 guide §usgs_quake; same
|
|
situation as FIRMS). The pre-v0.5.7-seismic `central.quake.event.>.us.id`
|
|
was syntactically invalid (`>` mid-subject) AND wouldn't have matched
|
|
anything Central publishes (only 4 tokens, no us.<state>). Region
|
|
filtering for quakes now happens client-side via data.latitude/longitude.
|
|
Subscription uses tail-only `>` (NATS-legal)."""
|
|
assert _subjects_for("usgs_quake", "us.id") == ["central.quake.event.>"]
|
|
|
|
|
|
def test_subjects_for_firms_us_id_uses_tail_only_wildcard():
|
|
"""v0.5.7-fire: FIRMS publishes `central.fire.hotspot.<satellite>.<confidence>`
|
|
with NO region in the subject (per Central v0.10.0 guide §firms). The
|
|
pre-v0.5.7-fire `central.fire.hotspot.>.us.id` was syntactically invalid
|
|
(`>` mid-subject) AND wouldn't have matched anything Central actually
|
|
publishes. Region filtering for FIRMS now happens client-side via
|
|
data.latitude/longitude. Subscription uses tail-only `>` (NATS-legal)."""
|
|
assert _subjects_for("firms", "us.id") == ["central.fire.hotspot.>"]
|
|
|
|
|
|
def test_subjects_for_fires_us_id_includes_tombstones():
|
|
"""v0.5.7-fire: WFIGS subjects -- active state-token at depth-3 + the
|
|
removal-tombstone subjects (`central.fire.{incident,perimeter}.removed.<state>`)
|
|
per Central v0.10.0 guide §wfigs_incidents §wfigs_perimeters. Pre-v0.5.7-fire
|
|
we only subscribed to active subjects, silently dropping fall-off signals."""
|
|
assert _subjects_for("fires", "us.id") == [
|
|
"central.fire.incident.id.>",
|
|
"central.fire.perimeter.id.>",
|
|
"central.fire.incident.removed.id",
|
|
"central.fire.perimeter.removed.id",
|
|
]
|
|
|
|
|
|
def test_subjects_for_traffic_uses_convention_b():
|
|
"""v0.5.7-traffic: traffic adapter -> bare-state Convention B with `*`
|
|
in the event_type slot. Pre-v0.5.7-traffic this was `>.{state}` which
|
|
is invalid NATS (`>` must be at the tail). The bare-state subject is
|
|
shared with roads511 (sub-adapter routing picks the right meshai source)."""
|
|
assert _subjects_for("traffic", "us.id") == ["central.traffic.*.id"]
|
|
|
|
|
|
def test_subjects_for_roads511_dual_subscribes_convention_a_and_b():
|
|
"""v0.5.7-traffic: roads511 owns BOTH the shared bare-state subject
|
|
(Convention B, shared with traffic) AND the us.<state> subject
|
|
(Convention A) where the new Idaho-only itd_511 adapter publishes."""
|
|
assert _subjects_for("roads511", "us.id") == [
|
|
"central.traffic.*.id",
|
|
"central.traffic.*.us.id",
|
|
]
|
|
|
|
|
|
def test_subjects_for_usgs_includes_unknown_workaround():
|
|
"""v0.5.7-water: USGS NWIS hydro subscribes to BOTH the region-tagged
|
|
filter and the .unknown filter. Per the v0.10.0-itd-511 nwis.py
|
|
producer, the actual published subject is
|
|
`central.hydro.<param>.<agency>.<site>.<region>` where <region> is
|
|
either `us.<state>` (7 tokens) or `unknown` (6 tokens). The
|
|
pre-v0.5.7-water shape `central.hydro.>.<state>` was invalid NATS
|
|
(`>` mid-subject). Fixed by using three single-token `*` wildcards
|
|
in the parameter/agency/site slots."""
|
|
assert _subjects_for("usgs", "us.id") == [
|
|
"central.hydro.*.*.*.us.id",
|
|
"central.hydro.*.*.*.unknown",
|
|
]
|
|
|
|
|
|
def test_subjects_for_swpc_stays_global():
|
|
"""SWPC: space weather is planetary; region argument is ignored."""
|
|
assert _subjects_for("swpc", "us.id") == ["central.space.>"]
|
|
assert _subjects_for("swpc", "us.mt") == ["central.space.>"] # same regardless
|
|
assert _subjects_for("swpc", "") == ["central.space.>"]
|
|
|
|
|
|
# --------------------------------------------------------------------- backward compat
|
|
|
|
def test_subjects_for_empty_region_falls_back_to_bare_wildcards():
|
|
"""Empty/None region = pre-v0.9.20 behaviour for every adapter, byte-identical
|
|
to the legacy _SUBJECTS_BARE map. Adapters absent from the map return []."""
|
|
for adapter, expected in _SUBJECTS_BARE.items():
|
|
assert _subjects_for(adapter, "") == expected, f"empty region mismatch for {adapter}"
|
|
assert _subjects_for(adapter, None) == expected, f"None region mismatch for {adapter}"
|
|
# Unknown adapters return empty regardless of region.
|
|
assert _subjects_for("ducting", "us.id") == []
|
|
assert _subjects_for("avalanche", "") == []
|
|
|
|
|
|
# --------------------------------------------------------------------- integration
|
|
|
|
def test_central_region_default_propagates_to_consumer_subjects():
|
|
"""Default region = 'us.id': flipping nws to central → consumer subscribes
|
|
to the region-aware subject, not the bare wildcard."""
|
|
env = EnvironmentalConfig()
|
|
assert env.central.region == "us.id" # spec default
|
|
env.nws.feed_source = "central"
|
|
so = CentralConsumer(env, None)._subject_owned()
|
|
assert list(so.keys()) == ["central.wx.alert.us.id.>"]
|
|
assert so["central.wx.alert.us.id.>"] == {"nws"}
|