feat(central): v0.4 C.1 Central connector backend (no-op until adapter source flipped)
Adds the backend for sourcing environmental feeds from Central's NATS
JetStream firehose instead of (or alongside) meshai's native adapters.
Architecture is Matt-approved Option 3' (dedicated package + per-adapter
source switch surfaced on the existing Environmental config).
NO-OP POSTURE (intentional): every adapter defaults to feed_source="native"
and environmental.central.enabled defaults false, so on a stock config the
CentralConsumer starts and subscribes to nothing -- behavior is byte-for-byte
v0.3. Live env_feeds.yaml is unchanged on disk; an operator who touches
nothing sees no change. Flipping an adapter to central is Phase C.3; the
dashboard UI for it is Phase C.2.
What landed:
- meshai/central/ package (CentralConsumer): async start()/stop(), JetStream
durable subscribe to subjects derived from adapters with feed_source=central,
and _on_message -> normalize -> bus.emit. nats-py is lazy-imported only on
the connect path, so no-op boot has zero NATS dependency.
- Normalization (CloudEvents envelope -> Central Event -> upstream data):
source = inner Event.adapter
category = Central hierarchical string -> meshai flat, via a small
table-driven prefix map (map_category)
severity = 0|1->routine, 2->priority, 3|4->immediate, null->routine
lat/lon = geo.centroid, swapped from GeoJSON [lon,lat] -> (lat,lon)
group_key/inhibit = outer envelope id (dedup parity with native adapters)
expires/timestamp parsed from ISO-8601
Event.data = upstream payload verbatim (generic _enriched merge, preserved
as-is incl. hydro's extra usgs_site/usgs_stats bundles)
- Tombstone (`.removed.` subject or `:removed` id suffix) -> a "clear" Event
carrying the ORIGINAL group_key (`:removed` stripped) + data._central_tombstone
so the grouper/inhibitor lets the prior event lapse naturally.
- config.py: a `_SourcedFeed` mixin adds `feed_source: native|central`
(validated in __post_init__) to all 10 adapter configs; new
CentralConsumerConfig as environmental.central { enabled, url, durable,
connect_timeout }. Both ride the generic _dict_to_dataclass coercion, so
they are GUI-editable via PUT /config/environmental (Rule 17) -- frontend
fields come in C.2.
- env/store.py: each adapter is instantiated only when
enabled AND feed_source=="native"; a feed_source=central adapter is skipped
natively (debug-logged) so Central can own it without a duplicate.
- main.py: CentralConsumer constructed + started after start_pipeline(),
stopped in stop().
DEVIATION FROM SPEC (documented): the spec named the new field `source`, but
FIRMSConfig already has a `source` field (the satellite product,
"VIIRS_SNPP_NRT"). To avoid the collision the field is named **feed_source**
across all adapters. Everything else follows the spec.
NETWORKING: zero infra change required. The meshai container already reaches
the Central NATS server directly (TCP to 100.64.0.12:4222 OK) and resolves
central.echo6.mesh via the Phase 2.6.6 MagicDNS fix. No docker-compose edit;
default bridge works (LXC host masquerades to the Tailscale CGNAT range). The
lighter bridge-route / host-net / sidecar fallbacks were not needed.
Tests: tests/test_central_consumer.py (11) + tests/test_config_source_field.py
(6): no-op-when-native, subjects-when-central, source-gate skips native
instantiation, normalize+emit, _enriched preserved verbatim, tombstone->clear,
severity map (0-4/null), category map (>=4 strings), async _on_message
emits+acks, start() no-op without NATS, feed_source default/validate/reject/
dict-coercion. Full suite: 269 passed (was 253 + 16 new).
Verification: (A) no bare self._x() in consumer.py. (B) py_compile clean.
(C) 269 passed. (D) rebuilt prod -- 8 native adapters, pipeline started,
native nifc/traffic emissions still flowing, healthy, no errors, log
"CentralConsumer started; 0 subjects subscribed -- no adapters set to central".
(E) in-container synthetic _on_message injection normalized correctly
(usgs_quake/earthquake_event/immediate, centroid swapped, _enriched preserved)
and reached the bus; ephemeral, no config change to roll back.
C.2 (dashboard frontend for the feed_source switch + central connection) is next.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 02:28:19 +00:00
|
|
|
"""v0.4 C.1: Central connector backend — normalization, lifecycle, source gate."""
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
from meshai.config import EnvironmentalConfig
|
|
|
|
|
from meshai.central.consumer import CentralConsumer, map_category, map_severity
|
|
|
|
|
from meshai.notifications.pipeline.bus import EventBus
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_consumer():
|
|
|
|
|
env = EnvironmentalConfig()
|
|
|
|
|
bus = EventBus()
|
|
|
|
|
rec = []
|
|
|
|
|
bus.subscribe(rec.append)
|
|
|
|
|
return CentralConsumer(env, bus), env, rec
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def envelope(adapter="usgs_quake", category="quake.event", severity=2,
|
|
|
|
|
eid="us6000abcd", centroid=(-114.5, 42.6), upstream=None,
|
|
|
|
|
time="2026-05-27T12:00:00Z", expires=None):
|
|
|
|
|
return {
|
|
|
|
|
"id": eid, "source": "central.echo6.co",
|
|
|
|
|
"type": f"central.{category}.v1", "time": time,
|
|
|
|
|
"centralcategory": category, "centralseverity": severity,
|
|
|
|
|
"specversion": "1.0", "datacontenttype": "application/json",
|
|
|
|
|
"data": {
|
|
|
|
|
"id": eid, "adapter": adapter, "category": category,
|
|
|
|
|
"time": time, "expires": expires, "severity": severity,
|
|
|
|
|
"geo": {"centroid": list(centroid), "bbox": None,
|
|
|
|
|
"regions": ["US-ID"], "primary_region": "US-ID"},
|
|
|
|
|
"data": upstream if upstream is not None else {"magnitude": 4.2, "place": "near Twin Falls"},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FakeMsg:
|
|
|
|
|
def __init__(self, subject, env):
|
|
|
|
|
self.subject = subject
|
|
|
|
|
self.data = json.dumps(env).encode()
|
|
|
|
|
self.acked = False
|
|
|
|
|
|
|
|
|
|
async def ack(self):
|
|
|
|
|
self.acked = True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---- subject derivation / source gate ----
|
|
|
|
|
|
|
|
|
|
def test_no_subjects_when_all_native():
|
|
|
|
|
c, env, rec = make_consumer()
|
|
|
|
|
assert c.subjects() == []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_subjects_when_central():
|
feat(central): v0.5.4 -- region-aware subscriptions using Central v0.9.20 regional subjects
Pre-v0.5.4 every Central subscription used a bare wildcard (central.wx.>,
central.fire.>, central.traffic.>, central.quake.>, central.hydro.>,
central.space.>), so a Magic Valley operator flipping nws -> central was
in fact subscribing to the all-US firehose and discarding 95% of events
locally. Central v0.9.20 (2026-05-28) added per-region subject suffixes
so the firehose can be filtered server-side. This wires meshai to use them.
Backend (meshai/central/consumer.py):
- New _subjects_for(adapter, region) replaces the static ADAPTER_SUBJECTS
dict. ADAPTER_SUBJECTS is retained as an alias to _SUBJECTS_BARE for any
legacy importers; the dispatcher path is unchanged.
- Per-adapter subject patterns (region='us.id' default):
nws -> central.wx.alert.us.id.> (region BEFORE wildcard)
usgs_quake -> central.quake.event.>.us.id (region AFTER wildcard)
firms -> central.fire.hotspot.>.us.id
fires -> central.fire.incident.id.> (state token at fixed depth)
central.fire.perimeter.id.>
traffic -> central.traffic.>.id (bare state, no us. prefix)
roads511 -> central.traffic.>.id (shared with traffic, sub-adapter routing)
usgs -> central.hydro.>.us.id
central.hydro.>.unknown (workaround until v0.9.20.1)
swpc -> central.space.> (planetary; region ignored)
- Empty/None region falls back to bare wildcards (pre-v0.9.20 behaviour).
- _subject_owned() pulls region from env.central.region and routes through
_subjects_for; v0.5.3 sub-adapter routing (owned-sources set) still
applies on shared subjects like central.traffic.>.id.
- start() logs the active region at connect-time for ops visibility.
Config (meshai/config.py):
- CentralConsumerConfig.region: str = "us.id". One region per consumer
applies to every central-flipped adapter; per-adapter overrides can
land in v0.6 when there is a real use case.
Frontend (dashboard-frontend/src/pages/Environment.tsx):
- Central Connection panel gets a Region text input next to URL/Durable.
- EnvConfig.central type extended with region: string.
- Static bundle rebuilt; index-DCFmSeOM.js -> index-B24tHcYj.js.
Tests:
- tests/test_central_region_routing.py (new, 9 cases): asserts the exact
v0.9.20 subject string for each adapter at region='us.id', the SWPC
global-stays-global rule, the USGS .unknown workaround, the empty-region
backward-compat fallback for all 8 adapters, and integration through
CentralConsumer._subject_owned() with the default region.
- tests/test_central_consumer.py + tests/test_central_sub_adapter_routing.py:
the two tests that asserted bare-wildcard subjects now set
env.central.region = "" explicitly to preserve their original concern
(no region semantics — backward-compat path only).
Why swpc stays global: space weather is planetary -- a CME is detected on
the sun, the geomagnetic response is hemispheric. There is no Idaho-only
solar event; subscribing per-region would only drop events we want.
Why hydro has the .unknown workaround: Central v0.9.20 leaves gauges
whose USGS state can't be inferred on central.hydro.>.unknown. Until
v0.9.20.1 backfills the state tag we subscribe to both filters to
avoid silently losing those rows. Idaho downstream-filtering on
data['_enriched']['usgs_site']['state'] is future v0.6 work.
Orthogonal to v0.5.2 dispatcher guards (staleness / cooldown / dedup)
and v0.5.3 sub-adapter routing: the region filter operates at the NATS
subscription layer (server-side), upstream of everything else.
Verified: pytest 327 passed (318 prior + 9 new region-routing tests);
py_compile clean; frontend build clean. Safe-mode preserved -- no toggle
enabled, no master enabled, no central enabled.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 02:30:33 +00:00
|
|
|
# v0.5.4: assert the legacy bare-wildcard form by clearing region.
|
|
|
|
|
# Region-aware subject shapes are covered by test_central_region_routing.py.
|
feat(central): v0.4 C.1 Central connector backend (no-op until adapter source flipped)
Adds the backend for sourcing environmental feeds from Central's NATS
JetStream firehose instead of (or alongside) meshai's native adapters.
Architecture is Matt-approved Option 3' (dedicated package + per-adapter
source switch surfaced on the existing Environmental config).
NO-OP POSTURE (intentional): every adapter defaults to feed_source="native"
and environmental.central.enabled defaults false, so on a stock config the
CentralConsumer starts and subscribes to nothing -- behavior is byte-for-byte
v0.3. Live env_feeds.yaml is unchanged on disk; an operator who touches
nothing sees no change. Flipping an adapter to central is Phase C.3; the
dashboard UI for it is Phase C.2.
What landed:
- meshai/central/ package (CentralConsumer): async start()/stop(), JetStream
durable subscribe to subjects derived from adapters with feed_source=central,
and _on_message -> normalize -> bus.emit. nats-py is lazy-imported only on
the connect path, so no-op boot has zero NATS dependency.
- Normalization (CloudEvents envelope -> Central Event -> upstream data):
source = inner Event.adapter
category = Central hierarchical string -> meshai flat, via a small
table-driven prefix map (map_category)
severity = 0|1->routine, 2->priority, 3|4->immediate, null->routine
lat/lon = geo.centroid, swapped from GeoJSON [lon,lat] -> (lat,lon)
group_key/inhibit = outer envelope id (dedup parity with native adapters)
expires/timestamp parsed from ISO-8601
Event.data = upstream payload verbatim (generic _enriched merge, preserved
as-is incl. hydro's extra usgs_site/usgs_stats bundles)
- Tombstone (`.removed.` subject or `:removed` id suffix) -> a "clear" Event
carrying the ORIGINAL group_key (`:removed` stripped) + data._central_tombstone
so the grouper/inhibitor lets the prior event lapse naturally.
- config.py: a `_SourcedFeed` mixin adds `feed_source: native|central`
(validated in __post_init__) to all 10 adapter configs; new
CentralConsumerConfig as environmental.central { enabled, url, durable,
connect_timeout }. Both ride the generic _dict_to_dataclass coercion, so
they are GUI-editable via PUT /config/environmental (Rule 17) -- frontend
fields come in C.2.
- env/store.py: each adapter is instantiated only when
enabled AND feed_source=="native"; a feed_source=central adapter is skipped
natively (debug-logged) so Central can own it without a duplicate.
- main.py: CentralConsumer constructed + started after start_pipeline(),
stopped in stop().
DEVIATION FROM SPEC (documented): the spec named the new field `source`, but
FIRMSConfig already has a `source` field (the satellite product,
"VIIRS_SNPP_NRT"). To avoid the collision the field is named **feed_source**
across all adapters. Everything else follows the spec.
NETWORKING: zero infra change required. The meshai container already reaches
the Central NATS server directly (TCP to 100.64.0.12:4222 OK) and resolves
central.echo6.mesh via the Phase 2.6.6 MagicDNS fix. No docker-compose edit;
default bridge works (LXC host masquerades to the Tailscale CGNAT range). The
lighter bridge-route / host-net / sidecar fallbacks were not needed.
Tests: tests/test_central_consumer.py (11) + tests/test_config_source_field.py
(6): no-op-when-native, subjects-when-central, source-gate skips native
instantiation, normalize+emit, _enriched preserved verbatim, tombstone->clear,
severity map (0-4/null), category map (>=4 strings), async _on_message
emits+acks, start() no-op without NATS, feed_source default/validate/reject/
dict-coercion. Full suite: 269 passed (was 253 + 16 new).
Verification: (A) no bare self._x() in consumer.py. (B) py_compile clean.
(C) 269 passed. (D) rebuilt prod -- 8 native adapters, pipeline started,
native nifc/traffic emissions still flowing, healthy, no errors, log
"CentralConsumer started; 0 subjects subscribed -- no adapters set to central".
(E) in-container synthetic _on_message injection normalized correctly
(usgs_quake/earthquake_event/immediate, centroid swapped, _enriched preserved)
and reached the bus; ephemeral, no config change to roll back.
C.2 (dashboard frontend for the feed_source switch + central connection) is next.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 02:28:19 +00:00
|
|
|
c, env, rec = make_consumer()
|
feat(central): v0.5.4 -- region-aware subscriptions using Central v0.9.20 regional subjects
Pre-v0.5.4 every Central subscription used a bare wildcard (central.wx.>,
central.fire.>, central.traffic.>, central.quake.>, central.hydro.>,
central.space.>), so a Magic Valley operator flipping nws -> central was
in fact subscribing to the all-US firehose and discarding 95% of events
locally. Central v0.9.20 (2026-05-28) added per-region subject suffixes
so the firehose can be filtered server-side. This wires meshai to use them.
Backend (meshai/central/consumer.py):
- New _subjects_for(adapter, region) replaces the static ADAPTER_SUBJECTS
dict. ADAPTER_SUBJECTS is retained as an alias to _SUBJECTS_BARE for any
legacy importers; the dispatcher path is unchanged.
- Per-adapter subject patterns (region='us.id' default):
nws -> central.wx.alert.us.id.> (region BEFORE wildcard)
usgs_quake -> central.quake.event.>.us.id (region AFTER wildcard)
firms -> central.fire.hotspot.>.us.id
fires -> central.fire.incident.id.> (state token at fixed depth)
central.fire.perimeter.id.>
traffic -> central.traffic.>.id (bare state, no us. prefix)
roads511 -> central.traffic.>.id (shared with traffic, sub-adapter routing)
usgs -> central.hydro.>.us.id
central.hydro.>.unknown (workaround until v0.9.20.1)
swpc -> central.space.> (planetary; region ignored)
- Empty/None region falls back to bare wildcards (pre-v0.9.20 behaviour).
- _subject_owned() pulls region from env.central.region and routes through
_subjects_for; v0.5.3 sub-adapter routing (owned-sources set) still
applies on shared subjects like central.traffic.>.id.
- start() logs the active region at connect-time for ops visibility.
Config (meshai/config.py):
- CentralConsumerConfig.region: str = "us.id". One region per consumer
applies to every central-flipped adapter; per-adapter overrides can
land in v0.6 when there is a real use case.
Frontend (dashboard-frontend/src/pages/Environment.tsx):
- Central Connection panel gets a Region text input next to URL/Durable.
- EnvConfig.central type extended with region: string.
- Static bundle rebuilt; index-DCFmSeOM.js -> index-B24tHcYj.js.
Tests:
- tests/test_central_region_routing.py (new, 9 cases): asserts the exact
v0.9.20 subject string for each adapter at region='us.id', the SWPC
global-stays-global rule, the USGS .unknown workaround, the empty-region
backward-compat fallback for all 8 adapters, and integration through
CentralConsumer._subject_owned() with the default region.
- tests/test_central_consumer.py + tests/test_central_sub_adapter_routing.py:
the two tests that asserted bare-wildcard subjects now set
env.central.region = "" explicitly to preserve their original concern
(no region semantics — backward-compat path only).
Why swpc stays global: space weather is planetary -- a CME is detected on
the sun, the geomagnetic response is hemispheric. There is no Idaho-only
solar event; subscribing per-region would only drop events we want.
Why hydro has the .unknown workaround: Central v0.9.20 leaves gauges
whose USGS state can't be inferred on central.hydro.>.unknown. Until
v0.9.20.1 backfills the state tag we subscribe to both filters to
avoid silently losing those rows. Idaho downstream-filtering on
data['_enriched']['usgs_site']['state'] is future v0.6 work.
Orthogonal to v0.5.2 dispatcher guards (staleness / cooldown / dedup)
and v0.5.3 sub-adapter routing: the region filter operates at the NATS
subscription layer (server-side), upstream of everything else.
Verified: pytest 327 passed (318 prior + 9 new region-routing tests);
py_compile clean; frontend build clean. Safe-mode preserved -- no toggle
enabled, no master enabled, no central enabled.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 02:30:33 +00:00
|
|
|
env.central.region = ""
|
feat(central): v0.4 C.1 Central connector backend (no-op until adapter source flipped)
Adds the backend for sourcing environmental feeds from Central's NATS
JetStream firehose instead of (or alongside) meshai's native adapters.
Architecture is Matt-approved Option 3' (dedicated package + per-adapter
source switch surfaced on the existing Environmental config).
NO-OP POSTURE (intentional): every adapter defaults to feed_source="native"
and environmental.central.enabled defaults false, so on a stock config the
CentralConsumer starts and subscribes to nothing -- behavior is byte-for-byte
v0.3. Live env_feeds.yaml is unchanged on disk; an operator who touches
nothing sees no change. Flipping an adapter to central is Phase C.3; the
dashboard UI for it is Phase C.2.
What landed:
- meshai/central/ package (CentralConsumer): async start()/stop(), JetStream
durable subscribe to subjects derived from adapters with feed_source=central,
and _on_message -> normalize -> bus.emit. nats-py is lazy-imported only on
the connect path, so no-op boot has zero NATS dependency.
- Normalization (CloudEvents envelope -> Central Event -> upstream data):
source = inner Event.adapter
category = Central hierarchical string -> meshai flat, via a small
table-driven prefix map (map_category)
severity = 0|1->routine, 2->priority, 3|4->immediate, null->routine
lat/lon = geo.centroid, swapped from GeoJSON [lon,lat] -> (lat,lon)
group_key/inhibit = outer envelope id (dedup parity with native adapters)
expires/timestamp parsed from ISO-8601
Event.data = upstream payload verbatim (generic _enriched merge, preserved
as-is incl. hydro's extra usgs_site/usgs_stats bundles)
- Tombstone (`.removed.` subject or `:removed` id suffix) -> a "clear" Event
carrying the ORIGINAL group_key (`:removed` stripped) + data._central_tombstone
so the grouper/inhibitor lets the prior event lapse naturally.
- config.py: a `_SourcedFeed` mixin adds `feed_source: native|central`
(validated in __post_init__) to all 10 adapter configs; new
CentralConsumerConfig as environmental.central { enabled, url, durable,
connect_timeout }. Both ride the generic _dict_to_dataclass coercion, so
they are GUI-editable via PUT /config/environmental (Rule 17) -- frontend
fields come in C.2.
- env/store.py: each adapter is instantiated only when
enabled AND feed_source=="native"; a feed_source=central adapter is skipped
natively (debug-logged) so Central can own it without a duplicate.
- main.py: CentralConsumer constructed + started after start_pipeline(),
stopped in stop().
DEVIATION FROM SPEC (documented): the spec named the new field `source`, but
FIRMSConfig already has a `source` field (the satellite product,
"VIIRS_SNPP_NRT"). To avoid the collision the field is named **feed_source**
across all adapters. Everything else follows the spec.
NETWORKING: zero infra change required. The meshai container already reaches
the Central NATS server directly (TCP to 100.64.0.12:4222 OK) and resolves
central.echo6.mesh via the Phase 2.6.6 MagicDNS fix. No docker-compose edit;
default bridge works (LXC host masquerades to the Tailscale CGNAT range). The
lighter bridge-route / host-net / sidecar fallbacks were not needed.
Tests: tests/test_central_consumer.py (11) + tests/test_config_source_field.py
(6): no-op-when-native, subjects-when-central, source-gate skips native
instantiation, normalize+emit, _enriched preserved verbatim, tombstone->clear,
severity map (0-4/null), category map (>=4 strings), async _on_message
emits+acks, start() no-op without NATS, feed_source default/validate/reject/
dict-coercion. Full suite: 269 passed (was 253 + 16 new).
Verification: (A) no bare self._x() in consumer.py. (B) py_compile clean.
(C) 269 passed. (D) rebuilt prod -- 8 native adapters, pipeline started,
native nifc/traffic emissions still flowing, healthy, no errors, log
"CentralConsumer started; 0 subjects subscribed -- no adapters set to central".
(E) in-container synthetic _on_message injection normalized correctly
(usgs_quake/earthquake_event/immediate, centroid swapped, _enriched preserved)
and reached the bus; ephemeral, no config change to roll back.
C.2 (dashboard frontend for the feed_source switch + central connection) is next.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 02:28:19 +00:00
|
|
|
env.usgs_quake.feed_source = "central"
|
|
|
|
|
assert "central.quake.>" in c.subjects()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_source_central_skips_native_instantiation():
|
|
|
|
|
from meshai.env.store import EnvironmentalStore
|
|
|
|
|
env = EnvironmentalConfig()
|
|
|
|
|
env.enabled = True
|
|
|
|
|
env.usgs_quake.enabled = True
|
|
|
|
|
env.usgs_quake.feed_source = "central" # should be skipped natively
|
|
|
|
|
env.nws.enabled = True # native -> present
|
|
|
|
|
store = EnvironmentalStore(config=env, region_anchors=[], event_bus=None)
|
|
|
|
|
assert "usgs_quake" not in store._adapters
|
|
|
|
|
assert "nws" in store._adapters
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---- normalization ----
|
|
|
|
|
|
|
|
|
|
def test_normalize_and_emit():
|
|
|
|
|
c, env, rec = make_consumer()
|
|
|
|
|
ev = c._handle("central.quake.event.moderate", json.dumps(envelope()).encode())
|
|
|
|
|
assert ev is not None
|
|
|
|
|
assert len(rec) == 1
|
|
|
|
|
e = rec[0]
|
|
|
|
|
assert e.source == "usgs_quake"
|
|
|
|
|
assert e.category == "earthquake_event"
|
|
|
|
|
assert e.severity == "priority" # central severity 2
|
|
|
|
|
assert e.lat == 42.6 and e.lon == -114.5 # [lon,lat] -> (lat,lon)
|
|
|
|
|
assert e.group_key == "us6000abcd"
|
|
|
|
|
assert e.region == "US-ID"
|
|
|
|
|
assert e.data.get("magnitude") == 4.2 # upstream preserved verbatim
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_enriched_preserved_verbatim():
|
|
|
|
|
c, env, rec = make_consumer()
|
|
|
|
|
up = {"magnitude": 5.1, "_enriched": {"geocoder": {"state": "Idaho"}, "usgs_stats": {"x": 1}}}
|
|
|
|
|
ev = c._handle("central.quake.event.strong", json.dumps(envelope(severity=4, upstream=up)).encode())
|
|
|
|
|
assert ev.severity == "immediate"
|
|
|
|
|
assert ev.data["_enriched"]["geocoder"]["state"] == "Idaho"
|
|
|
|
|
assert ev.data["_enriched"]["usgs_stats"] == {"x": 1}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_tombstone_translates_to_clear():
|
|
|
|
|
c, env, rec = make_consumer()
|
|
|
|
|
msg = envelope(adapter="gdacs", category="disaster.fl.removed", severity=0, eid="FL1103885:removed")
|
|
|
|
|
ev = c._handle("central.disaster.fl.removed.austria", json.dumps(msg).encode())
|
|
|
|
|
assert ev is not None
|
|
|
|
|
assert ev.group_key == "FL1103885" # ':removed' stripped -> matches original
|
|
|
|
|
assert ev.data.get("_central_tombstone") is True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_severity_mapping():
|
|
|
|
|
assert map_severity(0) == "routine"
|
|
|
|
|
assert map_severity(1) == "routine"
|
|
|
|
|
assert map_severity(2) == "priority"
|
|
|
|
|
assert map_severity(3) == "immediate"
|
|
|
|
|
assert map_severity(4) == "immediate"
|
|
|
|
|
assert map_severity(None) == "routine"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_category_mapping():
|
|
|
|
|
assert map_category("wx.alert.severe_thunderstorm_warning") == "weather_warning"
|
|
|
|
|
assert map_category("quake.event") == "earthquake_event"
|
|
|
|
|
assert map_category("fire.hotspot.viirs_noaa20.high") == "wildfire_hotspot"
|
|
|
|
|
assert map_category("hydro.00060.usgs.06901250") == "stream_flow"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---- async callback path ----
|
|
|
|
|
|
|
|
|
|
def test_on_message_emits_and_acks():
|
|
|
|
|
c, env, rec = make_consumer()
|
|
|
|
|
msg = FakeMsg("central.quake.event.moderate", envelope())
|
|
|
|
|
asyncio.run(c._on_message(msg))
|
|
|
|
|
assert msg.acked is True
|
|
|
|
|
assert len(rec) == 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_start_no_op_when_all_native():
|
|
|
|
|
"""start() is a no-op (no NATS connect) when no adapter is central."""
|
|
|
|
|
c, env, rec = make_consumer()
|
|
|
|
|
asyncio.run(c.start()) # must not raise / must not require NATS
|
|
|
|
|
assert c._nc is None
|
fix(central): v0.4 C.3.1 -- preserve secret refs in save_section + deliver_policy=NEW (no backlog flood)
Fixes the two real bugs C.3 surfaced when flipping usgs_quake to central.
BUG #1 -- GUI save dropped ${VAR} secret refs (config_loader.save_section).
before: A GUI PUT round-trips the *interpolated* secret value (GET returns the
resolved key string, e.g. the real TomTom key). save_section's
check_secrets saw a literal string at a SECRET_FIELDS path, didn't
recognize it as a ref, and DROPPED it -- losing the on-disk
${TOMTOM_API_KEY} placeholder. C.3's flip PUT stripped TomTom's key.
after: check_secrets now reads the raw on-disk value (pre-interpolation) for
each secret field and decides three ways:
on-disk ${VAR} and new == resolved(VAR) -> keep the ${VAR} ref
on-disk ${VAR} and new != resolved(VAR) -> intentional change, store it
no on-disk ${VAR} ref -> reject (never write a raw
secret to a domain file)
${VAR} resolution mirrors load: os.environ first, then /data/secrets/.env.
The common case (GUI re-saves unchanged config) now preserves the
placeholder instead of dropping it.
BUG #2 -- CentralConsumer replayed the entire retained backlog on first flip.
before: js.subscribe(...) with no config -> default deliver_policy=all. Fine
for quake (682 msgs) but would flood the bus with ~330k traffic_flow
messages on first flip.
after: consumer_config() -> ConsumerConfig(deliver_policy=DeliverPolicy.NEW):
only messages published AFTER consumer creation. meshai won't see the
backlog on first flip -- acceptable, Central is a live firehose for
current events. (NOT geo-filtering -- that's a Central-side issue filed
separately for the Central project.)
Files: meshai/config_loader.py (save_section secret preservation),
meshai/central/consumer.py (consumer_config() + deliver_policy=NEW),
tests/test_save_section_secret_preserve.py (new),
tests/test_central_consumer.py (deliver_policy assertion).
Verification:
- (A) py_compile clean on config_loader.py + consumer.py.
- (C) pytest -q: 276 passed (272 + 4 new -- preserve-unchanged-ref,
changed-value-written, no-placeholder-still-rejects, deliver_policy=NEW).
The C.2.1 strip test still passes (no placeholder -> reject).
- (D) In-prod (rebuilt): GET+PUT /api/config/environmental round-trip ->
{"saved":true}; on-disk traffic.api_key stayed '${TOMTOM_API_KEY}'
(SECRET_REF_PRESERVED: True), not the literal key; disk restored to baseline.
consumer_config().deliver_policy == DeliverPolicy.NEW in the built image.
Follow-up for D rollout: the durable 'meshai-v04-central_quake_' created during
C.3 was made with deliver_policy=all; re-flipping a domain may need that stale
durable deleted on the Central NATS server first (config mismatch on re-subscribe).
D rollout (remaining domains) is now safe: GUI flips preserve secret refs and
new subscriptions don't replay huge backlogs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 04:55:20 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_consumer_config_uses_deliver_policy_new():
|
|
|
|
|
"""C.3.1: Central subscriptions use deliver_policy=NEW (no full-backlog replay)."""
|
|
|
|
|
from meshai.central.consumer import consumer_config
|
|
|
|
|
from nats.js.api import DeliverPolicy
|
|
|
|
|
assert consumer_config().deliver_policy == DeliverPolicy.NEW
|
fix(central): v0.4 D.1 -- subject-domain category fallback (traffic 'work_zone.wzdx' was mapping to 'other')
Surfaced during the Phase D rollout flipping all five remaining domains to
central. Central's traffic categories are NOT domain-prefixed -- the inner
Event.category for a work zone is "work_zone.wzdx", not "traffic.work_zone".
The prefix table in map_category therefore missed and returned "other", which
would break category-based routing/digest grouping for central-sourced traffic.
before: map_category("work_zone.wzdx") -> "other"
after: when the category table misses, fall back to the stable subject domain
token (central.<domain>.<...>): central.traffic.* -> traffic_congestion.
Added category_from_subject() + a domain->category map (wx, fire, quake,
hydro, space, disaster, traffic, traffic_flow, traffic_cameras). The
well-prefixed domains (wx.alert, fire.incident, hydro., space.alert)
still match the primary table; the fallback only fires on a miss, so a
known domain never yields "other" again.
Test: tests/test_central_consumer.py gains test_subject_domain_fallback_for_unmapped_category
(category_from_subject + a 'work_zone.wzdx' message -> traffic_congestion).
Full suite: 277 passed.
Verified in prod (rebuilt, all 5 flipped to central): the per-domain
LAST_PER_SUBJECT normalize probe now shows traffic -> category=traffic_congestion
(was 'other'); the other four domains unchanged and clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 05:05:12 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_subject_domain_fallback_for_unmapped_category():
|
|
|
|
|
"""D.1: an unmapped category (traffic 'work_zone.wzdx') falls back to the
|
|
|
|
|
subject domain instead of returning 'other'."""
|
|
|
|
|
import json
|
|
|
|
|
from meshai.central.consumer import CentralConsumer, category_from_subject
|
|
|
|
|
from meshai.config import EnvironmentalConfig
|
|
|
|
|
from meshai.notifications.pipeline.bus import EventBus
|
|
|
|
|
assert category_from_subject("central.traffic.work_zone.ok") == "traffic_congestion"
|
|
|
|
|
rec = []
|
|
|
|
|
bus = EventBus(); bus.subscribe(rec.append)
|
|
|
|
|
c = CentralConsumer(EnvironmentalConfig(), bus)
|
|
|
|
|
env = {"id": "wz1", "data": {"id": "wz1", "adapter": "wzdx",
|
|
|
|
|
"category": "work_zone.wzdx", "time": "2026-05-28T00:00:00Z", "severity": 1,
|
|
|
|
|
"geo": {"centroid": [-96.2, 36.15], "primary_region": "US-OK", "regions": ["US-OK"]},
|
|
|
|
|
"data": {"road": "I-44"}}}
|
|
|
|
|
ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode())
|
|
|
|
|
assert ev is not None and ev.category == "traffic_congestion"
|
fix(central): v0.4 D.2 -- remap Central adapter names to meshai source for consistent dashboard attribution
Phase D catalogued a source-name divergence: central-sourced events carried
Central's adapter name (wfigs_incidents, nwis, swpc_alerts, wzdx) rather than
meshai's native source (fires, usgs, swpc, traffic), so the C.2 family-tab
per-adapter event filtering (which keys on the native source name) wouldn't
group central events under the right adapter.
Fix: CENTRAL_ADAPTER_TO_SOURCE table in consumer.py; normalize() now remaps
inner Event.adapter -> meshai source, falling back to the literal adapter name
for anything not in the table (logged at DEBUG when a translation happens).
before -> after (Event.source):
wfigs_incidents / wfigs_perimeters -> fires
nwis -> usgs
swpc_alerts / swpc_kindex / swpc_protons -> swpc
wzdx -> traffic
nws, usgs_quake, firms -> unchanged (1:1, omitted from table)
unknown (e.g. experimental_foo) -> passthrough as-is
Tests: tests/test_central_consumer.py parametrized test_central_adapter_source_remap
(6 cases: 4 remaps + nws passthrough + unknown passthrough). Full suite: 283 passed.
In-prod verify (rebuilt, ephemeral probe over real Central data): the four
observed adapters now normalize to source=fires/usgs/swpc/traffic; nws passes
through. No live flip needed; container stays native baseline + healthy.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 06:12:47 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("adapter,expected", [
|
|
|
|
|
("wfigs_incidents", "fires"),
|
|
|
|
|
("nwis", "usgs"),
|
|
|
|
|
("swpc_alerts", "swpc"),
|
|
|
|
|
("wzdx", "traffic"),
|
|
|
|
|
("nws", "nws"), # 1:1 passthrough
|
|
|
|
|
("experimental_foo", "experimental_foo"), # unknown -> passthrough
|
|
|
|
|
])
|
|
|
|
|
def test_central_adapter_source_remap(adapter, expected):
|
|
|
|
|
"""D.2: Central adapter names map to meshai source names (unknown passes through)."""
|
|
|
|
|
import json
|
|
|
|
|
from meshai.central.consumer import CentralConsumer
|
|
|
|
|
from meshai.config import EnvironmentalConfig
|
|
|
|
|
from meshai.notifications.pipeline.bus import EventBus
|
|
|
|
|
rec = []
|
|
|
|
|
bus = EventBus(); bus.subscribe(rec.append)
|
|
|
|
|
c = CentralConsumer(EnvironmentalConfig(), bus)
|
|
|
|
|
env = {"id": "e1", "data": {"id": "e1", "adapter": adapter, "category": "wx.alert.x",
|
|
|
|
|
"time": "2026-05-28T00:00:00Z", "severity": 1,
|
|
|
|
|
"geo": {"centroid": [-114.0, 42.0], "primary_region": "US-ID", "regions": ["US-ID"]},
|
|
|
|
|
"data": {}}}
|
|
|
|
|
ev = c._handle("central.wx.alert.x", json.dumps(env).encode())
|
|
|
|
|
assert ev is not None and ev.source == expected
|