From 6ea7bd70f124bc321b480b3bc69be8ebbf9dc227 Mon Sep 17 00:00:00 2001 From: malice Date: Wed, 27 May 2026 23:50:30 -0600 Subject: [PATCH] v0.9.20: regional subject routing on quake / fire / hydro / disaster adapters * v0.9.20: regional subject routing on quake / fire / hydro / disaster adapters Adds regional subject tokens to four adapters that previously published without location-based routing: - usgs_quake: central.quake.event.{tier}.us.{state} (US) or .{country} (intl) - firms: central.fire.hotspot.{satellite}.{confidence}.us.{state} or .{country} - nwis: central.hydro.{parameter}.{agency}.{site}.us.{state} (always US) - eonet: central.disaster.eonet.{category}.{country} (replaces hardcoded .global) The us.{state} pattern (two tokens for US events) matches the NWS precedent and resolves the ISO-2 collision between Idaho (id) and Indonesia. New shared helper module: src/central/adapters/_subject_helpers.py - US_STATE_NAME_TO_CODE: 50 states + DC + territories - subject_for_country(): normalized country token - subject_for_region(): returns us.{state}, {country}, or unknown gdacs.py refactored to import subject_for_country from shared module. 
Fixes: meshai v0.4 Phase C.3 bug (M4.1 Nevada quake routed globally) Co-Authored-By: Claude Opus 4.5 * v0.9.20: stale docstring nit on renamed test Co-Authored-By: Claude Opus 4.5 --------- Co-authored-by: Matt Johnson Co-authored-by: Claude Opus 4.5 --- src/central/adapters/_subject_helpers.py | 155 ++++++++++++++++++++ src/central/adapters/eonet.py | 13 +- src/central/adapters/firms.py | 8 +- src/central/adapters/gdacs.py | 11 +- src/central/adapters/nwis.py | 11 +- src/central/adapters/usgs_quake.py | 10 +- tests/test_eonet.py | 14 +- tests/test_firms.py | 4 +- tests/test_nwis.py | 6 +- tests/test_subject_helpers.py | 177 +++++++++++++++++++++++ tests/test_usgs_quake.py | 12 +- 11 files changed, 383 insertions(+), 38 deletions(-) create mode 100644 src/central/adapters/_subject_helpers.py create mode 100644 tests/test_subject_helpers.py diff --git a/src/central/adapters/_subject_helpers.py b/src/central/adapters/_subject_helpers.py new file mode 100644 index 0000000..411e09d --- /dev/null +++ b/src/central/adapters/_subject_helpers.py @@ -0,0 +1,155 @@ +"""Shared helpers for regional subject routing (v0.9.20). + +Provides subject-token builders for adapters that route events by +state (US) or country (international). The us.{state} / {country} +pattern matches the NWS precedent and avoids ISO-2 collisions +(e.g., 'id' = Idaho vs Indonesia). +""" + +from typing import Any + +# US state name -> 2-letter code mapping. Photon returns full names; +# USGS site metadata returns full names. This is the canonical lookup. +# Includes 50 states + DC + territories (PR, VI, GU, AS, MP). 
+US_STATE_NAME_TO_CODE: dict[str, str] = { + "alabama": "al", + "alaska": "ak", + "arizona": "az", + "arkansas": "ar", + "california": "ca", + "colorado": "co", + "connecticut": "ct", + "delaware": "de", + "florida": "fl", + "georgia": "ga", + "hawaii": "hi", + "idaho": "id", + "illinois": "il", + "indiana": "in", + "iowa": "ia", + "kansas": "ks", + "kentucky": "ky", + "louisiana": "la", + "maine": "me", + "maryland": "md", + "massachusetts": "ma", + "michigan": "mi", + "minnesota": "mn", + "mississippi": "ms", + "missouri": "mo", + "montana": "mt", + "nebraska": "ne", + "nevada": "nv", + "new hampshire": "nh", + "new jersey": "nj", + "new mexico": "nm", + "new york": "ny", + "north carolina": "nc", + "north dakota": "nd", + "ohio": "oh", + "oklahoma": "ok", + "oregon": "or", + "pennsylvania": "pa", + "rhode island": "ri", + "south carolina": "sc", + "south dakota": "sd", + "tennessee": "tn", + "texas": "tx", + "utah": "ut", + "vermont": "vt", + "virginia": "va", + "washington": "wa", + "west virginia": "wv", + "wisconsin": "wi", + "wyoming": "wy", + # DC + "district of columbia": "dc", + "washington dc": "dc", + "washington d.c.": "dc", + # Territories + "puerto rico": "pr", + "virgin islands": "vi", + "u.s. virgin islands": "vi", + "guam": "gu", + "american samoa": "as", + "northern mariana islands": "mp", +} + +# Exact country strings that route to the US branch. +# Excludes "United States Virgin Islands" etc. which have their own territory codes. +_US_COUNTRY_NAMES = frozenset({"united states", "united states of america"}) + + +def subject_for_country(country: str | None) -> str: + """Lowercase, hyphenate spaces. 'unknown' for missing/empty. + + Takes first segment if comma-separated (handles Photon multi-value). + Extracted from gdacs.py for shared use. 
+ """ + if not country: + return "unknown" + first = country.split(",")[0].strip() + if not first: + return "unknown" + return first.lower().replace(" ", "-") + + +def _state_name_to_code(state_name: str | None) -> str | None: + """Convert full state name to 2-letter code. Returns None if not found.""" + if not state_name: + return None + normalized = state_name.strip().lower() + # Direct lookup + if normalized in US_STATE_NAME_TO_CODE: + return US_STATE_NAME_TO_CODE[normalized] + # Already a 2-letter code? Validate against known codes. + if len(normalized) == 2 and normalized in US_STATE_NAME_TO_CODE.values(): + return normalized + return None + + +def _country_from_geocoder(event_data: dict[str, Any]) -> str | None: + """Extract country from geocoder enrichment, null-safe. + + Handles all null-bundle variants: _enriched missing, geocoder missing, + geocoder=None, country missing. + """ + enriched = event_data.get("_enriched") or {} + geocoder = enriched.get("geocoder") or {} + return geocoder.get("country") + + +def subject_for_region(event_data: dict[str, Any]) -> str: + """Build the regional subject token from enriched geocoder data. + + Returns: + "us.{state}" for US events with resolved state (e.g., "us.id") + "us.unknown" for US events with unresolved state + "{country}" for international events (e.g., "japan", "mexico") + "unknown" if no country signal at all + + The distinction: "unknown" = geocoder returned nothing; "us.unknown" = + known US event but state lookup failed. Consumers subscribing to + central.>.us.> will match us.unknown but not bare unknown. + + Reads from event.data["_enriched"]["geocoder"] which is populated + by the Photon enrichment pipeline. + """ + enriched = event_data.get("_enriched") or {} + geocoder = enriched.get("geocoder") or {} + + country = geocoder.get("country") + state = geocoder.get("state") + + # US event: use us.{state} 
pattern + if country and country.lower().strip() in _US_COUNTRY_NAMES: + state_code = _state_name_to_code(state) + if state_code: + return f"us.{state_code}" + return "us.unknown" + + # International event: use country token + if country: + return subject_for_country(country) + + return "unknown" diff --git a/src/central/adapters/eonet.py b/src/central/adapters/eonet.py index d759f69..36d9226 100644 --- a/src/central/adapters/eonet.py +++ b/src/central/adapters/eonet.py @@ -22,6 +22,7 @@ from central.adapter import SourceAdapter from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo +from central.adapters._subject_helpers import subject_for_country, _country_from_geocoder logger = logging.getLogger(__name__) @@ -222,12 +223,20 @@ class EONETAdapter(SourceAdapter): ) def subject_for(self, event: Event) -> str: + """Compute NATS subject for an EONET event. + + Subject format: central.disaster.eonet.{category}.{country} + EONET is mostly international; uses country token (not state). 
+ """ # event.category is "disaster.eonet.[.removed]" parts = event.category.split(".") subj_cat = parts[2] if len(parts) >= 3 else "unknown" + country = subject_for_country( + _country_from_geocoder(event.data) + ) if len(parts) >= 4 and parts[-1] == "removed": - return f"central.disaster.eonet.{subj_cat}.removed.global" - return f"central.disaster.eonet.{subj_cat}.global" + return f"central.disaster.eonet.{subj_cat}.removed.{country}" + return f"central.disaster.eonet.{subj_cat}.{country}" @retry( stop=stop_after_attempt(3), diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 2fda0fb..5d7cbc0 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -23,6 +23,7 @@ from pydantic import BaseModel from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo +from central.adapters._subject_helpers import subject_for_region logger = logging.getLogger(__name__) @@ -139,10 +140,11 @@ class FIRMSAdapter(SourceAdapter): def subject_for(self, event: Event) -> str: """Compute NATS subject for a fire hotspot event. - Subject format: central.fire.hotspot.. - The category already contains this structure. + Subject format: central.fire.hotspot... + Region: us. for US events, for intl, unknown if missing. 
""" - return f"central.{event.category}" + region = subject_for_region(event.data) + return f"central.{event.category}.{region}" async def startup(self) -> None: diff --git a/src/central/adapters/gdacs.py b/src/central/adapters/gdacs.py index 53475ce..907984e 100644 --- a/src/central/adapters/gdacs.py +++ b/src/central/adapters/gdacs.py @@ -22,6 +22,7 @@ from central.adapter import SourceAdapter from central.config_models import AdapterConfig from central.config_store import ConfigStore from central.models import Event, Geo +from central.adapters._subject_helpers import subject_for_country logger = logging.getLogger(__name__) @@ -46,16 +47,6 @@ def severity_from_alertlevel(level: str | None) -> int: return _ALERTLEVEL_TO_SEVERITY.get(level.strip().capitalize(), 0) -def subject_for_country(country: str | None) -> str: - """Lowercase, hyphenate spaces. 'unknown' for missing/empty. Takes first if comma-separated.""" - if not country: - return "unknown" - first = country.split(",")[0].strip() - if not first: - return "unknown" - return first.lower().replace(" ", "-") - - def parse_rfc822_utc(raw: str | None) -> datetime | None: """Parse an RFC 822 datetime string to UTC datetime.""" if not raw: diff --git a/src/central/adapters/nwis.py b/src/central/adapters/nwis.py index 435a6e2..2c3667c 100644 --- a/src/central/adapters/nwis.py +++ b/src/central/adapters/nwis.py @@ -28,6 +28,7 @@ from central.adapters.nwis_enrich import ( from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo +from central.adapters._subject_helpers import subject_for_region logger = logging.getLogger(__name__) @@ -220,11 +221,17 @@ class NWISAdapter(SourceAdapter): ) def subject_for(self, event: Event) -> str: + """Compute NATS subject for a water data event. + + Subject format: central.hydro.... + NWIS is always US (USGS data), so region is us. or unknown. + """ # event.category is "hydro..." 
parts = event.category.split(".") + region = subject_for_region(event.data) if len(parts) >= 4: - return f"central.hydro.{parts[1]}.{parts[2]}.{parts[3]}" - return "central.hydro.unknown.unknown.unknown" + return f"central.hydro.{parts[1]}.{parts[2]}.{parts[3]}.{region}" + return f"central.hydro.unknown.unknown.unknown.{region}" def _initial_url(self, parameter_code: str) -> str: params: dict[str, str] = { diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 28bd973..2dac629 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -22,6 +22,7 @@ from pydantic import BaseModel from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo +from central.adapters._subject_helpers import subject_for_region logger = logging.getLogger(__name__) @@ -379,7 +380,10 @@ class USGSQuakeAdapter(SourceAdapter): logger.info("USGS quake yielded events", extra={"count": new_count}) def subject_for(self, event: Event) -> str: - """Return NATS subject for quake event.""" - return f"central.{event.category}" - + """Return NATS subject for quake event. + Subject format: central.quake.event.. + Region: us. for US events, for intl, unknown if missing. 
+ """ + region = subject_for_region(event.data) + return f"central.{event.category}.{region}" diff --git a/tests/test_eonet.py b/tests/test_eonet.py index 324a147..3f80de7 100644 --- a/tests/test_eonet.py +++ b/tests/test_eonet.py @@ -54,7 +54,7 @@ class TestEONETHelpers: assert _subject_category(None) == "unknown" assert _subject_category("") == "unknown" - # Through subject_for: a category with no upstream component yields .unknown.global + # Through subject_for: a category with no upstream component yields .unknown.unknown adapter = EONETAdapter(_config(), MagicMock(), Path("/tmp/never_used.db")) event = Event( id="X", @@ -65,7 +65,7 @@ class TestEONETHelpers: geo=Geo(), data={}, ) - assert adapter.subject_for(event).endswith(".unknown.global") + assert adapter.subject_for(event).endswith(".unknown.unknown") def test_dedup_key_includes_latest_geometry_date(self): from central.adapters.eonet import _dedup_key @@ -148,8 +148,8 @@ class TestEONETAdapter: assert out_lat == lat_in, "second centroid element must equal upstream lat (no swap)" @pytest.mark.asyncio - async def test_country_always_global(self, tmp_path: Path): - """Every emitted event has subject suffix '.global' (no country resolution in v1).""" + async def test_country_unknown_when_no_geocoder(self, tmp_path: Path): + """No geocoder enrichment -> subject ends with '.unknown'.""" from central.adapters.eonet import EONETAdapter adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db") @@ -160,7 +160,7 @@ class TestEONETAdapter: assert events, "fixture should produce at least one emitted event" for e in events: - assert adapter.subject_for(e).endswith(".global"), e.category + assert adapter.subject_for(e).endswith(".unknown"), e.category @pytest.mark.asyncio async def test_magnitude_value_surfaced(self, tmp_path: Path): @@ -244,9 +244,9 @@ class TestEONETAdapter: # Subject pattern: subtype BEFORE 'removed' per ยง8 canonical pattern. 
# Subscriber filtering on central.disaster.eonet..> must match the - # removal subject central.disaster.eonet..removed.global. + # removal subject central.disaster.eonet..removed.unknown. expected_cat = _subject_category(first_event["categories"][0]["id"]) subj = adapter.subject_for(ts) assert subj.startswith(f"central.disaster.eonet.{expected_cat}.") assert ".removed." in subj - assert subj.endswith(".global") + assert subj.endswith(".unknown") diff --git a/tests/test_firms.py b/tests/test_firms.py index 6a974e2..aafead2 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -303,7 +303,7 @@ class TestSubjectGeneration: ) subject = adapter.subject_for(event) - assert subject == "central.fire.hotspot.viirs_snpp.high" + assert subject == "central.fire.hotspot.viirs_snpp.high.unknown" @pytest.mark.asyncio async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store): @@ -324,7 +324,7 @@ class TestSubjectGeneration: ) subject = adapter.subject_for(event) - assert subject == "central.fire.hotspot.viirs_noaa20.nominal" + assert subject == "central.fire.hotspot.viirs_noaa20.nominal.unknown" class TestUrlBuilding: diff --git a/tests/test_nwis.py b/tests/test_nwis.py index 90399d1..31def7b 100644 --- a/tests/test_nwis.py +++ b/tests/test_nwis.py @@ -51,7 +51,7 @@ class TestNWISHelpers: geo=Geo(), data={}, ) - assert adapter.subject_for(event).endswith(".00060.usgs.05420500") + assert adapter.subject_for(event).endswith(".00060.usgs.05420500.unknown") def test_subject_decomposes_non_usgs(self): """MO005-400105093591601 -> agency='mo005', bare='400105093591601'; subject reflects both.""" @@ -71,7 +71,7 @@ class TestNWISHelpers: geo=Geo(), data={}, ) - assert adapter.subject_for(event).endswith(".00060.mo005.400105093591601") + assert adapter.subject_for(event).endswith(".00060.mo005.400105093591601.unknown") def test_subject_unprefixed_id_falls_back(self): """ID with no dash falls back to agency='unknown'.""" @@ -93,7 +93,7 @@ class TestNWISHelpers: 
data={}, ) subj = adapter.subject_for(event) - assert subj.endswith(f".00060.unknown.{bare}") + assert subj.endswith(f".00060.unknown.{bare}.unknown") def test_dedup_key_composite(self): """Same id+param+time -> same key; different time -> different key.""" diff --git a/tests/test_subject_helpers.py b/tests/test_subject_helpers.py new file mode 100644 index 0000000..bbcd8a9 --- /dev/null +++ b/tests/test_subject_helpers.py @@ -0,0 +1,177 @@ +"""Tests for the _subject_helpers module (v0.9.20). + +Tests cover: +- US_STATE_NAME_TO_CODE mapping coverage +- subject_for_country normalization +- subject_for_region US/intl/unknown paths +""" + + +from central.adapters._subject_helpers import ( + US_STATE_NAME_TO_CODE, + subject_for_country, + subject_for_region, + _state_name_to_code, +) + + +class TestUSStateNameToCode: + """US state name mapping tests.""" + + def test_all_50_states_present(self): + """All 50 states should be in the mapping.""" + # Count unique 2-letter codes (excluding DC and territories) + state_codes = {v for k, v in US_STATE_NAME_TO_CODE.items() + if v not in ('dc', 'pr', 'vi', 'gu', 'as', 'mp')} + assert len(state_codes) == 50 + + def test_dc_present(self): + assert US_STATE_NAME_TO_CODE.get('district of columbia') == 'dc' + assert US_STATE_NAME_TO_CODE.get('washington dc') == 'dc' + + def test_territories_present(self): + assert US_STATE_NAME_TO_CODE.get('puerto rico') == 'pr' + assert US_STATE_NAME_TO_CODE.get('guam') == 'gu' + + def test_idaho(self): + assert US_STATE_NAME_TO_CODE.get('idaho') == 'id' + + def test_nevada(self): + assert US_STATE_NAME_TO_CODE.get('nevada') == 'nv' + + +class TestStateNameToCode: + """_state_name_to_code helper tests.""" + + def test_full_name_lookup(self): + assert _state_name_to_code('Idaho') == 'id' + assert _state_name_to_code('UTAH') == 'ut' + assert _state_name_to_code('new york') == 'ny' + + def test_already_2char_code(self): + """Some enrichers return 2-char codes, pass through.""" + assert 
_state_name_to_code('ID') == 'id' + assert _state_name_to_code('nv') == 'nv' + + def test_whitespace_handling(self): + assert _state_name_to_code(' Idaho ') == 'id' + + def test_unknown_name_returns_none(self): + assert _state_name_to_code('Narnia') is None + assert _state_name_to_code('') is None + assert _state_name_to_code(None) is None + + +class TestSubjectForCountry: + """subject_for_country normalization tests.""" + + def test_basic_country(self): + assert subject_for_country('Japan') == 'japan' + assert subject_for_country('Mexico') == 'mexico' + + def test_multi_word_hyphenated(self): + assert subject_for_country('United States') == 'united-states' + assert subject_for_country('New Zealand') == 'new-zealand' + + def test_comma_separated_takes_first(self): + """Photon may return multiple values.""" + assert subject_for_country('Japan, Asia') == 'japan' + + def test_empty_returns_unknown(self): + assert subject_for_country('') == 'unknown' + assert subject_for_country(None) == 'unknown' + assert subject_for_country(' ') == 'unknown' + + +class TestSubjectForRegion: + """subject_for_region integration tests.""" + + def test_us_event_with_state(self): + """US event returns us..""" + data = { + '_enriched': { + 'geocoder': { + 'country': 'United States', + 'state': 'Idaho', + } + } + } + assert subject_for_region(data) == 'us.id' + + def test_us_event_nevada(self): + """Nevada quake (the bug report example).""" + data = { + '_enriched': { + 'geocoder': { + 'country': 'United States', + 'state': 'Nevada', + } + } + } + assert subject_for_region(data) == 'us.nv' + + def test_us_event_with_2char_state(self): + """Some enrichers return 2-char codes.""" + data = { + '_enriched': { + 'geocoder': { + 'country': 'United States', + 'state': 'ID', + } + } + } + assert subject_for_region(data) == 'us.id' + + def test_us_event_no_state(self): + """US event with no state returns us.unknown (known US, unresolved state).""" + data = { + '_enriched': { + 'geocoder': { + 
'country': 'United States', + 'state': None, + } + } + } + assert subject_for_region(data) == 'us.unknown' + + def test_intl_event_japan(self): + """International event returns country token.""" + data = { + '_enriched': { + 'geocoder': { + 'country': 'Japan', + 'state': 'Tokyo', # Ignored for intl + } + } + } + assert subject_for_region(data) == 'japan' + + def test_intl_event_multi_word(self): + data = { + '_enriched': { + 'geocoder': { + 'country': 'New Zealand', + } + } + } + assert subject_for_region(data) == 'new-zealand' + + def test_no_enriched_data(self): + """Missing enrichment returns unknown.""" + assert subject_for_region({}) == 'unknown' + assert subject_for_region({'_enriched': None}) == 'unknown' + assert subject_for_region({'_enriched': {}}) == 'unknown' + + def test_no_geocoder(self): + data = {'_enriched': {'other': 'stuff'}} + assert subject_for_region(data) == 'unknown' + + def test_empty_country(self): + data = { + '_enriched': { + 'geocoder': { + 'country': '', + } + } + } + assert subject_for_region(data) == 'unknown' diff --git a/tests/test_usgs_quake.py b/tests/test_usgs_quake.py index 690f7da..40ab52b 100644 --- a/tests/test_usgs_quake.py +++ b/tests/test_usgs_quake.py @@ -503,7 +503,7 @@ class TestSubjectFor: geo=Geo(centroid=(-116.0, 45.0)), data={}, ) - assert adapter.subject_for(event) == "central.quake.event.minor" + assert adapter.subject_for(event) == "central.quake.event.minor.unknown" @pytest.mark.asyncio async def test_subject_light(self, temp_db_path, mock_config_store): @@ -522,7 +522,7 @@ class TestSubjectFor: geo=Geo(centroid=(-116.0, 45.0)), data={}, ) - assert adapter.subject_for(event) == "central.quake.event.light" + assert adapter.subject_for(event) == "central.quake.event.light.unknown" @pytest.mark.asyncio async def test_subject_moderate(self, temp_db_path, mock_config_store): @@ -541,7 +541,7 @@ class TestSubjectFor: geo=Geo(centroid=(-116.0, 45.0)), data={}, ) - assert adapter.subject_for(event) == 
"central.quake.event.moderate" + assert adapter.subject_for(event) == "central.quake.event.moderate.unknown" @pytest.mark.asyncio async def test_subject_strong(self, temp_db_path, mock_config_store): @@ -560,7 +560,7 @@ class TestSubjectFor: geo=Geo(centroid=(-116.0, 45.0)), data={}, ) - assert adapter.subject_for(event) == "central.quake.event.strong" + assert adapter.subject_for(event) == "central.quake.event.strong.unknown" @pytest.mark.asyncio async def test_subject_major(self, temp_db_path, mock_config_store): @@ -579,7 +579,7 @@ class TestSubjectFor: geo=Geo(centroid=(-116.0, 45.0)), data={}, ) - assert adapter.subject_for(event) == "central.quake.event.major" + assert adapter.subject_for(event) == "central.quake.event.major.unknown" @pytest.mark.asyncio async def test_subject_great(self, temp_db_path, mock_config_store): @@ -598,4 +598,4 @@ class TestSubjectFor: geo=Geo(centroid=(-116.0, 45.0)), data={}, ) - assert adapter.subject_for(event) == "central.quake.event.great" + assert adapter.subject_for(event) == "central.quake.event.great.unknown"