From 73c007d227ded5c03d26002d0e88e68ae679cbbf Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 28 May 2026 02:28:19 +0000 Subject: [PATCH] feat(central): v0.4 C.1 Central connector backend (no-op until adapter source flipped) Adds the backend for sourcing environmental feeds from Central's NATS JetStream firehose instead of (or alongside) meshai's native adapters. Architecture is Matt-approved Option 3' (dedicated package + per-adapter source switch surfaced on the existing Environmental config). NO-OP POSTURE (intentional): every adapter defaults to feed_source="native" and environmental.central.enabled defaults false, so on a stock config the CentralConsumer starts and subscribes to nothing -- behavior is byte-for-byte v0.3. Live env_feeds.yaml is unchanged on disk; an operator who touches nothing sees no change. Flipping an adapter to central is Phase C.3; the dashboard UI for it is Phase C.2. What landed: - meshai/central/ package (CentralConsumer): async start()/stop(), JetStream durable subscribe to subjects derived from adapters with feed_source=central, and _on_message -> normalize -> bus.emit. nats-py is lazy-imported only on the connect path, so no-op boot has zero NATS dependency. - Normalization (CloudEvents envelope -> Central Event -> upstream data): source = inner Event.adapter category = Central hierarchical string -> meshai flat, via a small table-driven prefix map (map_category) severity = 0|1->routine, 2->priority, 3|4->immediate, null->routine lat/lon = geo.centroid, swapped from GeoJSON [lon,lat] -> (lat,lon) group_key/inhibit = outer envelope id (dedup parity with native adapters) expires/timestamp parsed from ISO-8601 Event.data = upstream payload verbatim (generic _enriched merge, preserved as-is incl. hydro's extra usgs_site/usgs_stats bundles) - Tombstone (`.removed.` subject or `:removed` id suffix) -> a "clear" Event carrying the ORIGINAL group_key (`:removed` stripped) + data._central_tombstone so the grouper/inhibitor lets the prior event lapse naturally. - config.py: a `_SourcedFeed` mixin adds `feed_source: native|central` (validated in __post_init__) to all 10 adapter configs; new CentralConsumerConfig as environmental.central { enabled, url, durable, connect_timeout }. Both ride the generic _dict_to_dataclass coercion, so they are GUI-editable via PUT /config/environmental (Rule 17) -- frontend fields come in C.2. - env/store.py: each adapter is instantiated only when enabled AND feed_source=="native"; a feed_source=central adapter is skipped natively (debug-logged) so Central can own it without a duplicate. - main.py: CentralConsumer constructed + started after start_pipeline(), stopped in stop(). DEVIATION FROM SPEC (documented): the spec named the new field `source`, but FIRMSConfig already has a `source` field (the satellite product, "VIIRS_SNPP_NRT"). To avoid the collision the field is named **feed_source** across all adapters. Everything else follows the spec. NETWORKING: zero infra change required. The meshai container already reaches the Central NATS server directly (TCP to 100.64.0.12:4222 OK) and resolves central.echo6.mesh via the Phase 2.6.6 MagicDNS fix. No docker-compose edit; default bridge works (LXC host masquerades to the Tailscale CGNAT range). The lighter bridge-route / host-net / sidecar fallbacks were not needed. Tests: tests/test_central_consumer.py (11) + tests/test_config_source_field.py (6): no-op-when-native, subjects-when-central, source-gate skips native instantiation, normalize+emit, _enriched preserved verbatim, tombstone->clear, severity map (0-4/null), category map (>=4 strings), async _on_message emits+acks, start() no-op without NATS, feed_source default/validate/reject/ dict-coercion. Full suite: 269 passed (was 253 + 16 new). Verification: (A) no bare self._x() in consumer.py. (B) py_compile clean. (C) 269 passed. (D) rebuilt prod -- 8 native adapters, pipeline started, native nifc/traffic emissions still flowing, healthy, no errors, log "CentralConsumer started; 0 subjects subscribed -- no adapters set to central". (E) in-container synthetic _on_message injection normalized correctly (usgs_quake/earthquake_event/immediate, centroid swapped, _enriched preserved) and reached the bus; ephemeral, no config change to roll back. C.2 (dashboard frontend for the feed_source switch + central connection) is next. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/central/__init__.py | 6 + meshai/central/consumer.py | 249 ++++++++++++++++++++++++++++++ meshai/config.py | 43 ++++-- meshai/env/store.py | 24 +-- meshai/main.py | 8 + pyproject.toml | 1 + tests/test_central_consumer.py | 139 +++++++++++++++++ tests/test_config_source_field.py | 50 ++++++ 8 files changed, 500 insertions(+), 20 deletions(-) create mode 100644 meshai/central/__init__.py create mode 100644 meshai/central/consumer.py create mode 100644 tests/test_central_consumer.py create mode 100644 tests/test_config_source_field.py diff --git a/meshai/central/__init__.py b/meshai/central/__init__.py new file mode 100644 index 0000000..524a913 --- /dev/null +++ b/meshai/central/__init__.py @@ -0,0 +1,6 @@ +"""Central connector package (v0.4) — consumes Central's NATS JetStream +firehose and normalizes it into meshai pipeline Events.""" + +from meshai.central.consumer import CentralConsumer + +__all__ = ["CentralConsumer"] diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py new file mode 100644 index 0000000..1547bb8 --- /dev/null +++ b/meshai/central/consumer.py @@ -0,0 +1,249 @@ +"""Central connector — consumes Central's NATS JetStream firehose and +normalizes CloudEvents envelopes into meshai pipeline Events. + +v0.4 C.1: backend only. The consumer subscribes only to subjects derived from +adapters whose config `source == "central"`. With every adapter defaulting to +`native`, it starts as a no-op (0 subscriptions) and introduces no NATS +dependency at boot. Flipping an adapter to central is Phase C.3. + +Wire format (see Central CONSUMER-INTEGRATION guide, confirmed in v0.4 Phase A): + envelope (CloudEvents v1.0) -> envelope["data"] (Central Event) + -> Event["data"] (upstream payload, verbatim, incl `_enriched`) +""" + +import json +import logging +import re +from datetime import datetime +from typing import Optional + +from meshai.notifications.events import Event, make_event + +logger = logging.getLogger("meshai.central.consumer") + + +# meshai adapter (env-config attr) -> Central subject filters it consumes. +# Adapters with no Central equivalent (avalanche, ducting, roads511) are absent; +# setting source=central on those subscribes to nothing (logged). +ADAPTER_SUBJECTS: dict[str, list[str]] = { + "nws": ["central.wx.>"], + "fires": ["central.fire.incident.>", "central.fire.perimeter.>"], + "firms": ["central.fire.hotspot.>"], + "usgs_quake": ["central.quake.>"], + "usgs": ["central.hydro.>"], + "swpc": ["central.space.>"], + "traffic": ["central.traffic.>"], +} + +# Central hierarchical category prefix -> meshai flat category. +# First matching prefix wins; order matters (most specific first). +_CATEGORY_MAP: list[tuple[str, str]] = [ + ("wx.alert", "weather_warning"), + ("wx.", "weather_statement"), + ("fire.hotspot", "wildfire_hotspot"), + ("fire.incident", "wildfire_incident"), + ("fire.perimeter", "wildfire_incident"), + ("fire.", "wildfire_incident"), + ("quake.", "earthquake_event"), + ("hydro.", "stream_flow"), + ("space.alert", "rf_propagation_alert"), + ("space.kindex", "geomagnetic_storm"), + ("space.proton", "solar_radiation_storm"), + ("space.", "geomagnetic_storm"), + ("disaster.", "disaster_event"), + ("traffic_flow", "traffic_flow"), + ("traffic_cameras", "traffic_camera"), + ("traffic.", "traffic_congestion"), +] + + +def map_category(central_category: str) -> str: + """Map Central's hierarchical category string to a meshai flat category.""" + cat = central_category or "" + for prefix, flat in _CATEGORY_MAP: + if cat.startswith(prefix): + return flat + return "other" + + +def map_severity(sev: Optional[int]) -> str: + """Central int severity (0-4 / None) -> meshai severity string. + + 0|1 -> routine, 2 -> priority, 3|4 -> immediate, None -> routine. + """ + if sev is None: + return "routine" + try: + sev = int(sev) + except (TypeError, ValueError): + return "routine" + if sev >= 3: + return "immediate" + if sev == 2: + return "priority" + return "routine" + + +def _parse_time(s) -> Optional[float]: + """Parse a Central ISO-8601 timestamp to epoch seconds.""" + if not s or not isinstance(s, str): + return None + try: + return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp() + except Exception: + return None + + +class CentralConsumer: + """Subscribes to Central JetStream subjects and emits normalized Events.""" + + def __init__(self, env_config, event_bus): + """Args: + env_config: the EnvironmentalConfig (provides .central + per-adapter .source) + event_bus: the pipeline EventBus to emit normalized Events onto + """ + self._env = env_config + self._central = getattr(env_config, "central", None) + self._bus = event_bus + self._nc = None + self._js = None + self._subs: list = [] + + # ---- subject derivation ---- + def subjects(self) -> list[str]: + """Subject filters for adapters whose source == 'central'.""" + out: list[str] = [] + for attr, subjects in ADAPTER_SUBJECTS.items(): + cfg = getattr(self._env, attr, None) + if cfg is not None and getattr(cfg, "feed_source", "native") == "central": + out.extend(subjects) + # Warn about centralized adapters that have no Central mapping. + for attr in ("avalanche", "ducting", "roads511"): + cfg = getattr(self._env, attr, None) + if cfg is not None and getattr(cfg, "feed_source", "native") == "central": + logger.warning("Adapter %r set to source=central but Central has no " + "matching stream; nothing will be consumed for it.", attr) + return out + + # ---- normalization ---- + def _normalize(self, subject: str, envelope: dict) -> Optional[Event]: + """CloudEvents envelope -> meshai Event (None if unusable).""" + inner = envelope.get("data") or {} + env_id = envelope.get("id") or inner.get("id") + if not env_id: + return None + + is_tombstone = (".removed." in (subject or "")) or str(env_id).endswith(":removed") + # The clear event shares the ORIGINAL event's group_key so the grouper/ + # inhibitor lets the prior event lapse naturally. + group_key = str(env_id) + if is_tombstone: + group_key = re.sub(r":removed$", "", group_key) + + cat_raw = inner.get("category") or envelope.get("centralcategory") or "" + + geo = inner.get("geo") or {} + lat = lon = None + centroid = geo.get("centroid") + if isinstance(centroid, (list, tuple)) and len(centroid) >= 2: + lon, lat = centroid[0], centroid[1] # GeoJSON [lon, lat] -> (lat, lon) + + # Preserve the upstream payload verbatim (incl. `_enriched`) in Event.data. + data = dict(inner.get("data") or {}) + if is_tombstone: + data["_central_tombstone"] = True + + title = (data.get("title") or data.get("headline") + or cat_raw or f"{inner.get('adapter', 'central')} event") + + kwargs = dict( + title=str(title)[:200], + summary="", + lat=lat, + lon=lon, + region=geo.get("primary_region"), + regions=geo.get("regions") or [], + group_key=group_key, + inhibit_keys=[group_key], + data=data, + ) + ts = _parse_time(inner.get("time")) + if ts is not None: + kwargs["timestamp"] = ts + exp = _parse_time(inner.get("expires")) + if exp is not None: + kwargs["expires"] = exp + + return make_event( + source=inner.get("adapter") or "central", + category=map_category(cat_raw), + severity=map_severity(inner.get("severity")), + **kwargs, + ) + + def _handle(self, subject: str, raw: bytes) -> Optional[Event]: + """Normalize a raw message body and emit to the bus. Returns the Event.""" + try: + envelope = json.loads(raw) + except Exception: + logger.exception("CentralConsumer: bad JSON on %s", subject) + return None + event = self._normalize(subject, envelope) + if event is not None and self._bus is not None: + self._bus.emit(event) + return event + + async def _on_message(self, msg) -> None: + """JetStream callback: normalize + emit, then ack.""" + try: + self._handle(msg.subject, msg.data) + except Exception: + logger.exception("CentralConsumer: handler failed on %s", + getattr(msg, "subject", "?")) + finally: + ack = getattr(msg, "ack", None) + if ack is not None: + try: + await ack() + except Exception: + pass + + # ---- lifecycle ---- + async def start(self) -> None: + subjects = self.subjects() + if not subjects: + logger.info("CentralConsumer started; 0 subjects subscribed -- " + "no adapters set to central") + return + if self._central is None or not getattr(self._central, "enabled", False): + logger.warning("CentralConsumer: %d adapter(s) want source=central but " + "environmental.central.enabled is false; not subscribing: %s", + len(subjects), subjects) + return + + import nats # lazy: no NATS dependency at boot unless actually consuming + self._nc = await nats.connect( + self._central.url, + connect_timeout=getattr(self._central, "connect_timeout", 10.0), + ) + self._js = self._nc.jetstream() + for subj in subjects: + durable = self._central.durable + "-" + re.sub(r"[^a-z0-9]+", "_", subj.lower()) + sub = await self._js.subscribe(subj, durable=durable, cb=self._on_message) + self._subs.append(sub) + logger.info("CentralConsumer started; %d subjects subscribed: %s", + len(subjects), subjects) + + async def stop(self) -> None: + if self._nc is not None: + try: + await self._nc.drain() + except Exception: + pass + try: + await self._nc.close() + except Exception: + pass + self._nc = None + self._js = None + self._subs = [] diff --git a/meshai/config.py b/meshai/config.py index 96bd010..df67582 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -297,7 +297,19 @@ class MeshIntelligenceConfig: # Environmental feed configs @dataclass -class NWSConfig: +class _SourcedFeed: + """Mixin: an environmental feed is sourced 'native' (local adapter) or + 'central' (Central NATS firehose). Default 'native' preserves v0.3 behavior.""" + + feed_source: str = "native" + + def __post_init__(self): + if self.feed_source not in ("native", "central"): + raise ValueError(f"feed_source must be 'native' or 'central', got {self.feed_source!r}") + + +@dataclass +class NWSConfig(_SourcedFeed): """NWS weather alerts settings.""" enabled: bool = True @@ -316,7 +328,7 @@ class NWSConfig: @dataclass -class SWPCConfig: +class SWPCConfig(_SourcedFeed): """NOAA Space Weather settings.""" enabled: bool = True @@ -331,7 +343,7 @@ class SWPCConfig: @dataclass -class DuctingConfig: +class DuctingConfig(_SourcedFeed): """Tropospheric ducting settings.""" enabled: bool = True @@ -349,7 +361,7 @@ class DuctingConfig: @dataclass -class NICFFiresConfig: +class NICFFiresConfig(_SourcedFeed): """NIFC fire perimeters settings (Phase 2).""" enabled: bool = False @@ -358,7 +370,7 @@ class NICFFiresConfig: @dataclass -class AvalancheConfig: +class AvalancheConfig(_SourcedFeed): """Avalanche advisory settings (Phase 2).""" enabled: bool = False @@ -368,7 +380,7 @@ class AvalancheConfig: @dataclass -class USGSConfig: +class USGSConfig(_SourcedFeed): """USGS stream gauge settings.""" enabled: bool = False @@ -378,7 +390,7 @@ class USGSConfig: @dataclass -class USGSQuakeConfig: +class USGSQuakeConfig(_SourcedFeed): """USGS earthquake feed settings (Phase 2.14).""" enabled: bool = False @@ -391,7 +403,7 @@ class USGSQuakeConfig: @dataclass -class TomTomConfig: +class TomTomConfig(_SourcedFeed): """TomTom traffic flow settings.""" enabled: bool = False @@ -401,7 +413,7 @@ class TomTomConfig: @dataclass -class Roads511Config: +class Roads511Config(_SourcedFeed): """511 road conditions settings.""" enabled: bool = False @@ -413,7 +425,7 @@ class Roads511Config: @dataclass -class FIRMSConfig: +class FIRMSConfig(_SourcedFeed): """NASA FIRMS satellite fire hotspot settings.""" enabled: bool = False @@ -426,6 +438,16 @@ class FIRMSConfig: proximity_km: float = 10.0 # km to match known fire +@dataclass +class CentralConsumerConfig: + """Connection settings for the Central NATS JetStream consumer (v0.4).""" + + enabled: bool = False + url: str = "nats://central.echo6.mesh:4222" + durable: str = "meshai-consumer" + connect_timeout: float = 10.0 + + @dataclass class EnvironmentalConfig: """Environmental feeds settings.""" @@ -442,6 +464,7 @@ class EnvironmentalConfig: traffic: TomTomConfig = field(default_factory=TomTomConfig) roads511: Roads511Config = field(default_factory=Roads511Config) firms: FIRMSConfig = field(default_factory=FIRMSConfig) + central: CentralConsumerConfig = field(default_factory=CentralConsumerConfig) @dataclass diff --git a/meshai/env/store.py b/meshai/env/store.py index 3aa11a5..bff873f 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -29,49 +29,53 @@ class EnvironmentalStore: self._region_anchors = region_anchors or [] # Create adapter instances based on config - if config.nws.enabled: + if config.nws.enabled and config.nws.feed_source == "native": from .nws import NWSAlertsAdapter self._adapters["nws"] = NWSAlertsAdapter(config.nws) - if config.swpc.enabled: + if config.swpc.enabled and config.swpc.feed_source == "native": from .swpc import SWPCAdapter self._adapters["swpc"] = SWPCAdapter(config.swpc) - if config.ducting.enabled: + if config.ducting.enabled and config.ducting.feed_source == "native": from .ducting import DuctingAdapter self._adapters["ducting"] = DuctingAdapter(config.ducting) - if config.fires.enabled: + if config.fires.enabled and config.fires.feed_source == "native": from .fires import NICFFiresAdapter self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors) - if config.avalanche.enabled: + if config.avalanche.enabled and config.avalanche.feed_source == "native": from .avalanche import AvalancheAdapter self._adapters["avalanche"] = AvalancheAdapter(config.avalanche) - if config.usgs.enabled: + if config.usgs.enabled and config.usgs.feed_source == "native": from .usgs import USGSStreamsAdapter self._adapters["usgs"] = USGSStreamsAdapter(config.usgs) - if config.usgs_quake.enabled: + if config.usgs_quake.enabled and config.usgs_quake.feed_source == "native": from .usgs_quake import USGSQuakeAdapter self._adapters["usgs_quake"] = USGSQuakeAdapter(config.usgs_quake) - if config.traffic.enabled: + if config.traffic.enabled and config.traffic.feed_source == "native": from .traffic import TomTomTrafficAdapter self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic) - if config.roads511.enabled: + if config.roads511.enabled and config.roads511.feed_source == "native": from .roads511 import Roads511Adapter self._adapters["roads511"] = Roads511Adapter(config.roads511) # FIRMS needs reference to NIFC adapter for cross-referencing - if config.firms.enabled: + if config.firms.enabled and config.firms.feed_source == "native": from .firms import FIRMSAdapter fires_adapter = self._adapters.get("nifc") self._firms = FIRMSAdapter(config.firms, self._region_anchors, fires_adapter) self._adapters["firms"] = self._firms + _central = [n for n in ("nws", "swpc", "ducting", "fires", "avalanche", "usgs", "usgs_quake", "traffic", "roads511", "firms") + if getattr(getattr(config, n, None), "feed_source", "native") == "central"] + if _central: + logger.debug("Adapters sourced from Central (native skipped): %s", _central) logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters") def refresh(self) -> bool: diff --git a/meshai/main.py b/meshai/main.py index 6a994dc..f613f7b 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -49,6 +49,7 @@ class MeshAI: self.event_bus = None # Notification pipeline EventBus (v0.3) self._pipeline_scheduler = None # DigestScheduler from start_pipeline() self.env_store = None # Environmental feeds store + self._central_consumer = None # Central NATS consumer (v0.4) self._last_sub_check: float = 0.0 self.router: Optional[MessageRouter] = None self.responder: Optional[Responder] = None @@ -90,6 +91,10 @@ class MeshAI: self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config) logger.info("Notification pipeline started") + from .central.consumer import CentralConsumer + self._central_consumer = CentralConsumer(self.config.environmental, self.event_bus) + await self._central_consumer.start() + logger.info("MeshAI started successfully") # Keep running @@ -183,6 +188,9 @@ class MeshAI: from .notifications.pipeline import stop_pipeline await stop_pipeline(self._pipeline_scheduler) + if self._central_consumer is not None: + await self._central_consumer.stop() + if self.connector: self.connector.disconnect() diff --git a/pyproject.toml b/pyproject.toml index 6bdd4b0..8c45cbb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "fastapi>=0.110.0", "uvicorn[standard]>=0.27.0", "aiomqtt>=2.0.0", + "nats-py>=2.0.0", ] [project.optional-dependencies] diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py new file mode 100644 index 0000000..6295553 --- /dev/null +++ b/tests/test_central_consumer.py @@ -0,0 +1,139 @@ +"""v0.4 C.1: Central connector backend — normalization, lifecycle, source gate.""" + +import asyncio +import json + +import pytest + +from meshai.config import EnvironmentalConfig +from meshai.central.consumer import CentralConsumer, map_category, map_severity +from meshai.notifications.pipeline.bus import EventBus + + +def make_consumer(): + env = EnvironmentalConfig() + bus = EventBus() + rec = [] + bus.subscribe(rec.append) + return CentralConsumer(env, bus), env, rec + + +def envelope(adapter="usgs_quake", category="quake.event", severity=2, + eid="us6000abcd", centroid=(-114.5, 42.6), upstream=None, + time="2026-05-27T12:00:00Z", expires=None): + return { + "id": eid, "source": "central.echo6.co", + "type": f"central.{category}.v1", "time": time, + "centralcategory": category, "centralseverity": severity, + "specversion": "1.0", "datacontenttype": "application/json", + "data": { + "id": eid, "adapter": adapter, "category": category, + "time": time, "expires": expires, "severity": severity, + "geo": {"centroid": list(centroid), "bbox": None, + "regions": ["US-ID"], "primary_region": "US-ID"}, + "data": upstream if upstream is not None else {"magnitude": 4.2, "place": "near Twin Falls"}, + }, + } + + +class FakeMsg: + def __init__(self, subject, env): + self.subject = subject + self.data = json.dumps(env).encode() + self.acked = False + + async def ack(self): + self.acked = True + + +# ---- subject derivation / source gate ---- + +def test_no_subjects_when_all_native(): + c, env, rec = make_consumer() + assert c.subjects() == [] + + +def test_subjects_when_central(): + c, env, rec = make_consumer() + env.usgs_quake.feed_source = "central" + assert "central.quake.>" in c.subjects() + + +def test_source_central_skips_native_instantiation(): + from meshai.env.store import EnvironmentalStore + env = EnvironmentalConfig() + env.enabled = True + env.usgs_quake.enabled = True + env.usgs_quake.feed_source = "central" # should be skipped natively + env.nws.enabled = True # native -> present + store = EnvironmentalStore(config=env, region_anchors=[], event_bus=None) + assert "usgs_quake" not in store._adapters + assert "nws" in store._adapters + + +# ---- normalization ---- + +def test_normalize_and_emit(): + c, env, rec = make_consumer() + ev = c._handle("central.quake.event.moderate", json.dumps(envelope()).encode()) + assert ev is not None + assert len(rec) == 1 + e = rec[0] + assert e.source == "usgs_quake" + assert e.category == "earthquake_event" + assert e.severity == "priority" # central severity 2 + assert e.lat == 42.6 and e.lon == -114.5 # [lon,lat] -> (lat,lon) + assert e.group_key == "us6000abcd" + assert e.region == "US-ID" + assert e.data.get("magnitude") == 4.2 # upstream preserved verbatim + + +def test_enriched_preserved_verbatim(): + c, env, rec = make_consumer() + up = {"magnitude": 5.1, "_enriched": {"geocoder": {"state": "Idaho"}, "usgs_stats": {"x": 1}}} + ev = c._handle("central.quake.event.strong", json.dumps(envelope(severity=4, upstream=up)).encode()) + assert ev.severity == "immediate" + assert ev.data["_enriched"]["geocoder"]["state"] == "Idaho" + assert ev.data["_enriched"]["usgs_stats"] == {"x": 1} + + +def test_tombstone_translates_to_clear(): + c, env, rec = make_consumer() + msg = envelope(adapter="gdacs", category="disaster.fl.removed", severity=0, eid="FL1103885:removed") + ev = c._handle("central.disaster.fl.removed.austria", json.dumps(msg).encode()) + assert ev is not None + assert ev.group_key == "FL1103885" # ':removed' stripped -> matches original + assert ev.data.get("_central_tombstone") is True + + +def test_severity_mapping(): + assert map_severity(0) == "routine" + assert map_severity(1) == "routine" + assert map_severity(2) == "priority" + assert map_severity(3) == "immediate" + assert map_severity(4) == "immediate" + assert map_severity(None) == "routine" + + +def test_category_mapping(): + assert map_category("wx.alert.severe_thunderstorm_warning") == "weather_warning" + assert map_category("quake.event") == "earthquake_event" + assert map_category("fire.hotspot.viirs_noaa20.high") == "wildfire_hotspot" + assert map_category("hydro.00060.usgs.06901250") == "stream_flow" + + +# ---- async callback path ---- + +def test_on_message_emits_and_acks(): + c, env, rec = make_consumer() + msg = FakeMsg("central.quake.event.moderate", envelope()) + asyncio.run(c._on_message(msg)) + assert msg.acked is True + assert len(rec) == 1 + + +def test_start_no_op_when_all_native(): + """start() is a no-op (no NATS connect) when no adapter is central.""" + c, env, rec = make_consumer() + asyncio.run(c.start()) # must not raise / must not require NATS + assert c._nc is None diff --git a/tests/test_config_source_field.py b/tests/test_config_source_field.py new file mode 100644 index 0000000..028701c --- /dev/null +++ b/tests/test_config_source_field.py @@ -0,0 +1,50 @@ +"""v0.4 C.1: per-adapter `source` field + CentralConsumerConfig.""" + +import pytest + +from meshai.config import ( + NWSConfig, FIRMSConfig, USGSQuakeConfig, + EnvironmentalConfig, CentralConsumerConfig, +) + +_ADAPTERS = ("nws", "swpc", "ducting", "fires", "avalanche", + "usgs", "usgs_quake", "traffic", "roads511", "firms") + + +def test_source_defaults_native(): + assert NWSConfig().feed_source == "native" + assert FIRMSConfig().feed_source == "native" + assert USGSQuakeConfig().feed_source == "native" + + +def test_all_adapters_default_native(): + env = EnvironmentalConfig() + for attr in _ADAPTERS: + assert getattr(env, attr).feed_source == "native", attr + + +def test_source_central_validates(): + assert NWSConfig(feed_source="central").feed_source == "central" + assert USGSQuakeConfig(feed_source="central").feed_source == "central" + + +def test_source_garbage_rejects(): + with pytest.raises(ValueError): + NWSConfig(feed_source="garbage") + with pytest.raises(ValueError): + FIRMSConfig(feed_source="") + + +def test_environmental_has_central_default(): + env = EnvironmentalConfig() + assert isinstance(env.central, CentralConsumerConfig) + assert env.central.enabled is False + assert env.central.url.startswith("nats://") + + +def test_source_field_survives_dict_coercion(): + """A `source` in yaml/dict is coerced onto the adapter config.""" + from meshai.config import Config, _dict_to_dataclass + cfg = _dict_to_dataclass(Config, {"environmental": {"usgs_quake": {"enabled": True, "feed_source": "central"}}}) + assert cfg.environmental.usgs_quake.feed_source == "central" + assert cfg.environmental.nws.feed_source == "native" # untouched default