meshai/tests/test_central_consumer.py

165 lines
6.1 KiB
Python

feat(central): v0.4 C.1 Central connector backend (no-op until adapter source flipped)

Adds the backend for sourcing environmental feeds from Central's NATS JetStream firehose instead of (or alongside) meshai's native adapters. Architecture is Matt-approved Option 3' (dedicated package + per-adapter source switch surfaced on the existing Environmental config).

NO-OP POSTURE (intentional): every adapter defaults to feed_source="native" and environmental.central.enabled defaults false, so on a stock config the CentralConsumer starts and subscribes to nothing -- behavior is byte-for-byte v0.3. Live env_feeds.yaml is unchanged on disk; an operator who touches nothing sees no change. Flipping an adapter to central is Phase C.3; the dashboard UI for it is Phase C.2.

What landed:
- meshai/central/ package (CentralConsumer): async start()/stop(), JetStream durable subscribe to subjects derived from adapters with feed_source=central, and _on_message -> normalize -> bus.emit. nats-py is lazy-imported only on the connect path, so no-op boot has zero NATS dependency.
- Normalization (CloudEvents envelope -> Central Event -> upstream data):
    source            = inner Event.adapter
    category          = Central hierarchical string -> meshai flat, via a small table-driven prefix map (map_category)
    severity          = 0|1->routine, 2->priority, 3|4->immediate, null->routine
    lat/lon           = geo.centroid, swapped from GeoJSON [lon,lat] -> (lat,lon)
    group_key/inhibit = outer envelope id (dedup parity with native adapters)
    expires/timestamp parsed from ISO-8601
    Event.data        = upstream payload verbatim (generic _enriched merge, preserved as-is incl. hydro's extra usgs_site/usgs_stats bundles)
- Tombstone (`.removed.` subject or `:removed` id suffix) -> a "clear" Event carrying the ORIGINAL group_key (`:removed` stripped) + data._central_tombstone so the grouper/inhibitor lets the prior event lapse naturally.
- config.py: a `_SourcedFeed` mixin adds `feed_source: native|central` (validated in __post_init__) to all 10 adapter configs; new CentralConsumerConfig as environmental.central { enabled, url, durable, connect_timeout }. Both ride the generic _dict_to_dataclass coercion, so they are GUI-editable via PUT /config/environmental (Rule 17) -- frontend fields come in C.2.
- env/store.py: each adapter is instantiated only when enabled AND feed_source=="native"; a feed_source=central adapter is skipped natively (debug-logged) so Central can own it without a duplicate.
- main.py: CentralConsumer constructed + started after start_pipeline(), stopped in stop().

DEVIATION FROM SPEC (documented): the spec named the new field `source`, but FIRMSConfig already has a `source` field (the satellite product, "VIIRS_SNPP_NRT"). To avoid the collision the field is named **feed_source** across all adapters. Everything else follows the spec.

NETWORKING: zero infra change required. The meshai container already reaches the Central NATS server directly (TCP to 100.64.0.12:4222 OK) and resolves central.echo6.mesh via the Phase 2.6.6 MagicDNS fix. No docker-compose edit; default bridge works (LXC host masquerades to the Tailscale CGNAT range). The lighter bridge-route / host-net / sidecar fallbacks were not needed.

Tests: tests/test_central_consumer.py (11) + tests/test_config_source_field.py (6): no-op-when-native, subjects-when-central, source-gate skips native instantiation, normalize+emit, _enriched preserved verbatim, tombstone->clear, severity map (0-4/null), category map (>=4 strings), async _on_message emits+acks, start() no-op without NATS, feed_source default/validate/reject/dict-coercion. Full suite: 269 passed (was 253 + 16 new).

Verification:
(A) no bare self._x() in consumer.py.
(B) py_compile clean.
(C) 269 passed.
(D) rebuilt prod -- 8 native adapters, pipeline started, native nifc/traffic emissions still flowing, healthy, no errors, log "CentralConsumer started; 0 subjects subscribed -- no adapters set to central".
(E) in-container synthetic _on_message injection normalized correctly (usgs_quake/earthquake_event/immediate, centroid swapped, _enriched preserved) and reached the bus; ephemeral, no config change to roll back.

C.2 (dashboard frontend for the feed_source switch + central connection) is next.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 02:28:19 +00:00
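The normalization rules listed in the commit message above can be sketched roughly as follows. This is a minimal illustration, not the code in meshai/central/consumer.py: the function names are hypothetical, the fallback to "routine" for out-of-range severities is an assumption, and only the `:removed` id suffix (not the `.removed.` subject form) is handled.

```python
from typing import Optional, Sequence, Tuple

# Hypothetical table; values follow the severity line in the commit message.
_SEVERITY = {0: "routine", 1: "routine", 2: "priority", 3: "immediate", 4: "immediate"}


def sketch_severity(level: Optional[int]) -> str:
    """Map Central's 0-4 severity (or None) to a meshai severity label."""
    # Assumption: anything outside 0-4, including None, falls back to "routine".
    return _SEVERITY.get(level, "routine")


def sketch_latlon(centroid: Optional[Sequence[float]]) -> Tuple[Optional[float], Optional[float]]:
    """Swap a GeoJSON [lon, lat] centroid into a (lat, lon) pair."""
    if not centroid or len(centroid) < 2:
        return None, None
    lon, lat = centroid[0], centroid[1]
    return lat, lon


def sketch_group_key(envelope_id: str) -> Tuple[str, bool]:
    """Return (group_key, is_tombstone), stripping a ':removed' id suffix."""
    suffix = ":removed"
    if envelope_id.endswith(suffix):
        return envelope_id[: -len(suffix)], True
    return envelope_id, False
```

Stripping the suffix is what lets a tombstone carry the same group_key as the event it clears, which the tombstone test further down relies on.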
"""v0.4 C.1: Central connector backend — normalization, lifecycle, source gate."""
import asyncio
import json
import pytest
from meshai.config import EnvironmentalConfig
from meshai.central.consumer import CentralConsumer, map_category, map_severity
from meshai.notifications.pipeline.bus import EventBus


def make_consumer():
    env = EnvironmentalConfig()
    bus = EventBus()
    rec = []
    bus.subscribe(rec.append)
    return CentralConsumer(env, bus), env, rec


def envelope(adapter="usgs_quake", category="quake.event", severity=2,
             eid="us6000abcd", centroid=(-114.5, 42.6), upstream=None,
             time="2026-05-27T12:00:00Z", expires=None):
    return {
        "id": eid, "source": "central.echo6.co",
        "type": f"central.{category}.v1", "time": time,
        "centralcategory": category, "centralseverity": severity,
        "specversion": "1.0", "datacontenttype": "application/json",
        "data": {
            "id": eid, "adapter": adapter, "category": category,
            "time": time, "expires": expires, "severity": severity,
            "geo": {"centroid": list(centroid), "bbox": None,
                    "regions": ["US-ID"], "primary_region": "US-ID"},
            "data": upstream if upstream is not None else {"magnitude": 4.2, "place": "near Twin Falls"},
        },
    }


class FakeMsg:
    def __init__(self, subject, env):
        self.subject = subject
        self.data = json.dumps(env).encode()
        self.acked = False

    async def ack(self):
        self.acked = True


# ---- subject derivation / source gate ----
def test_no_subjects_when_all_native():
    c, env, rec = make_consumer()
    assert c.subjects() == []


def test_subjects_when_central():
    c, env, rec = make_consumer()
    env.usgs_quake.feed_source = "central"
    assert "central.quake.>" in c.subjects()


def test_source_central_skips_native_instantiation():
    from meshai.env.store import EnvironmentalStore
    env = EnvironmentalConfig()
    env.enabled = True
    env.usgs_quake.enabled = True
    env.usgs_quake.feed_source = "central"   # should be skipped natively
    env.nws.enabled = True                   # native -> present
    store = EnvironmentalStore(config=env, region_anchors=[], event_bus=None)
    assert "usgs_quake" not in store._adapters
    assert "nws" in store._adapters


# ---- normalization ----
def test_normalize_and_emit():
    c, env, rec = make_consumer()
    ev = c._handle("central.quake.event.moderate", json.dumps(envelope()).encode())
    assert ev is not None
    assert len(rec) == 1
    e = rec[0]
    assert e.source == "usgs_quake"
    assert e.category == "earthquake_event"
    assert e.severity == "priority"               # central severity 2
    assert e.lat == 42.6 and e.lon == -114.5      # [lon,lat] -> (lat,lon)
    assert e.group_key == "us6000abcd"
    assert e.region == "US-ID"
    assert e.data.get("magnitude") == 4.2         # upstream preserved verbatim


def test_enriched_preserved_verbatim():
    c, env, rec = make_consumer()
    up = {"magnitude": 5.1, "_enriched": {"geocoder": {"state": "Idaho"}, "usgs_stats": {"x": 1}}}
    ev = c._handle("central.quake.event.strong", json.dumps(envelope(severity=4, upstream=up)).encode())
    assert ev.severity == "immediate"
    assert ev.data["_enriched"]["geocoder"]["state"] == "Idaho"
    assert ev.data["_enriched"]["usgs_stats"] == {"x": 1}


def test_tombstone_translates_to_clear():
    c, env, rec = make_consumer()
    msg = envelope(adapter="gdacs", category="disaster.fl.removed", severity=0, eid="FL1103885:removed")
    ev = c._handle("central.disaster.fl.removed.austria", json.dumps(msg).encode())
    assert ev is not None
    assert ev.group_key == "FL1103885"            # ':removed' stripped -> matches original
    assert ev.data.get("_central_tombstone") is True


def test_severity_mapping():
    assert map_severity(0) == "routine"
    assert map_severity(1) == "routine"
    assert map_severity(2) == "priority"
    assert map_severity(3) == "immediate"
    assert map_severity(4) == "immediate"
    assert map_severity(None) == "routine"


def test_category_mapping():
    assert map_category("wx.alert.severe_thunderstorm_warning") == "weather_warning"
    assert map_category("quake.event") == "earthquake_event"
    assert map_category("fire.hotspot.viirs_noaa20.high") == "wildfire_hotspot"
    assert map_category("hydro.00060.usgs.06901250") == "stream_flow"


# ---- async callback path ----
def test_on_message_emits_and_acks():
    c, env, rec = make_consumer()
    msg = FakeMsg("central.quake.event.moderate", envelope())
    asyncio.run(c._on_message(msg))
    assert msg.acked is True
    assert len(rec) == 1


def test_start_no_op_when_all_native():
    """start() is a no-op (no NATS connect) when no adapter is central."""
    c, env, rec = make_consumer()
    asyncio.run(c.start())   # must not raise / must not require NATS
    assert c._nc is None
fix(central): v0.4 C.3.1 -- preserve secret refs in save_section + deliver_policy=NEW (no backlog flood)

Fixes the two real bugs C.3 surfaced when flipping usgs_quake to central.

BUG #1 -- GUI save dropped ${VAR} secret refs (config_loader.save_section).
  before: A GUI PUT round-trips the *interpolated* secret value (GET returns the resolved key string, e.g. the real TomTom key). save_section's check_secrets saw a literal string at a SECRET_FIELDS path, didn't recognize it as a ref, and DROPPED it -- losing the on-disk ${TOMTOM_API_KEY} placeholder. C.3's flip PUT stripped TomTom's key.
  after: check_secrets now reads the raw on-disk value (pre-interpolation) for each secret field and decides three ways:
    on-disk ${VAR} and new == resolved(VAR) -> keep the ${VAR} ref
    on-disk ${VAR} and new != resolved(VAR) -> intentional change, store it
    no on-disk ${VAR} ref                   -> reject (never write a raw secret to a domain file)
  ${VAR} resolution mirrors load: os.environ first, then /data/secrets/.env. The common case (GUI re-saves unchanged config) now preserves the placeholder instead of dropping it.

BUG #2 -- CentralConsumer replayed the entire retained backlog on first flip.
  before: js.subscribe(...) with no config -> default deliver_policy=all. Fine for quake (682 msgs) but would flood the bus with ~330k traffic_flow messages on first flip.
  after: consumer_config() -> ConsumerConfig(deliver_policy=DeliverPolicy.NEW): only messages published AFTER consumer creation. meshai won't see the backlog on first flip -- acceptable, Central is a live firehose for current events. (NOT geo-filtering -- that's a Central-side issue filed separately for the Central project.)

Files: meshai/config_loader.py (save_section secret preservation), meshai/central/consumer.py (consumer_config() + deliver_policy=NEW), tests/test_save_section_secret_preserve.py (new), tests/test_central_consumer.py (deliver_policy assertion).

Verification:
- (A) py_compile clean on config_loader.py + consumer.py.
- (C) pytest -q: 276 passed (272 + 4 new -- preserve-unchanged-ref, changed-value-written, no-placeholder-still-rejects, deliver_policy=NEW). The C.2.1 strip test still passes (no placeholder -> reject).
- (D) In-prod (rebuilt): GET+PUT /api/config/environmental round-trip -> {"saved":true}; on-disk traffic.api_key stayed '${TOMTOM_API_KEY}' (SECRET_REF_PRESERVED: True), not the literal key; disk restored to baseline. consumer_config().deliver_policy == DeliverPolicy.NEW in the built image.

Follow-up for D rollout: the durable 'meshai-v04-central_quake_' created during C.3 was made with deliver_policy=all; re-flipping a domain may need that stale durable deleted on the Central NATS server first (config mismatch on re-subscribe).

D rollout (remaining domains) is now safe: GUI flips preserve secret refs and new subscriptions don't replay huge backlogs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 04:55:20 +00:00
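The three-way decision described under BUG #1 above might look something like the sketch below. It is an illustration under stated assumptions, not the actual check_secrets code: `decide_secret` is a hypothetical name, and the `resolved` mapping stands in for the real lookup (os.environ first, then /data/secrets/.env).

```python
import re
from typing import Mapping, Optional

# Matches a whole-value placeholder such as ${TOMTOM_API_KEY}.
_REF = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")


def decide_secret(on_disk: Optional[str], new_value: str,
                  resolved: Mapping[str, str]) -> str:
    """Return the value to write for a secret field, or raise ValueError.

    on_disk   -- raw, pre-interpolation value currently in the config file
    new_value -- value submitted by the GUI PUT
    resolved  -- hypothetical stand-in for the environment / .env lookup
    """
    match = _REF.match(on_disk or "")
    if match is None:
        # No ${VAR} ref on disk: never write a raw secret to a domain file.
        raise ValueError("no ${VAR} ref on disk; refusing to write a raw secret")
    if new_value == resolved.get(match.group(1)):
        # The GUI echoed back the interpolated value: keep the placeholder.
        return on_disk
    # The operator typed something different: treat it as an intentional change.
    return new_value
```

With `resolved = {"TOMTOM_API_KEY": "abc123"}`, re-saving "abc123" over an on-disk `${TOMTOM_API_KEY}` keeps the placeholder, while any other value is stored as given.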


def test_consumer_config_uses_deliver_policy_new():
    """C.3.1: Central subscriptions use deliver_policy=NEW (no full-backlog replay)."""
    from meshai.central.consumer import consumer_config
    from nats.js.api import DeliverPolicy
    assert consumer_config().deliver_policy == DeliverPolicy.NEW
2026-05-28 05:05:12 +00:00


def test_subject_domain_fallback_for_unmapped_category():
    """D.1: an unmapped category (traffic 'work_zone.wzdx') falls back to the
    subject domain instead of returning 'other'."""
    import json
    from meshai.central.consumer import CentralConsumer, category_from_subject
    from meshai.config import EnvironmentalConfig
    from meshai.notifications.pipeline.bus import EventBus
    assert category_from_subject("central.traffic.work_zone.ok") == "traffic_congestion"
    rec = []
    bus = EventBus()
    bus.subscribe(rec.append)
    c = CentralConsumer(EnvironmentalConfig(), bus)
    env = {"id": "wz1", "data": {"id": "wz1", "adapter": "wzdx",
           "category": "work_zone.wzdx", "time": "2026-05-28T00:00:00Z", "severity": 1,
           "geo": {"centroid": [-96.2, 36.15], "primary_region": "US-OK", "regions": ["US-OK"]},
           "data": {"road": "I-44"}}}
    ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode())
    assert ev is not None and ev.category == "traffic_congestion"
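
The fallback exercised by the D.1 test above can be pictured as a table-driven prefix lookup followed by a subject-domain lookup. The sketch below is a guess at the shape only: the function names, both tables, and the prefix lengths are hypothetical, with entries taken from the assertions in this file rather than from meshai/central/consumer.py.

```python
from typing import Optional

# Hypothetical prefix table: Central hierarchical category -> meshai flat category.
_CATEGORY_PREFIXES = [
    ("wx.alert.severe_thunderstorm_warning", "weather_warning"),
    ("quake.event", "earthquake_event"),
    ("fire.hotspot", "wildfire_hotspot"),
    ("hydro.00060", "stream_flow"),
]

# Hypothetical subject-domain table, used only when the category is unmapped.
_SUBJECT_DOMAINS = {"traffic": "traffic_congestion"}


def sketch_map_category(category: str) -> Optional[str]:
    """Return the flat category for the first matching prefix, else None."""
    for prefix, flat in _CATEGORY_PREFIXES:
        # Match whole dotted segments so "quake.eventual" would not match "quake.event".
        if category == prefix or category.startswith(prefix + "."):
            return flat
    return None


def sketch_category_from_subject(subject: str) -> str:
    """Derive a category from the domain token of a 'central.<domain>...' subject."""
    parts = subject.split(".")
    domain = parts[1] if len(parts) > 1 and parts[0] == "central" else ""
    return _SUBJECT_DOMAINS.get(domain, "other")


def sketch_resolve(category: str, subject: str) -> str:
    """Prefer the category map; fall back to the subject domain, then 'other'."""
    return sketch_map_category(category) or sketch_category_from_subject(subject)
```

Keeping the subject lookup as a second step means the category table stays the primary source of truth, and only categories it does not know about pick up the coarser domain-level label.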