feat(central): v0.5.3 -- roads511 + FIRMS central feeds with sub-adapter routing (recover stashed v0.5.1)

C.2 (v0.4) marked roads511 and FIRMS native-only in the dashboard despite the
Central event bus shipping both stream families. This recovers the v0.5.1
work paused before the v0.5.2 spam fix and lands it cleanly on top of v0.5.2.

Backend (meshai/central/consumer.py):
- ADAPTER_SUBJECTS: roads511 now subscribes to central.traffic.> (shared
  with the existing traffic adapter); firms widened from
  central.fire.hotspot.> to central.fire.>.
- CENTRAL_ADAPTER_TO_SOURCE: three new sub-adapter remaps so the inner
  envelope.adapter routes correctly inside a shared subject --
  tomtom_incidents -> traffic, state_511_atis -> roads511, firms -> firms.
- New _subject_owned() -> dict[subject_filter, set[meshai_source]]:
  builds the per-subject ownership set so a single central.traffic.>
  subscription can be owned by {traffic, roads511} simultaneously.
- subjects() now derives from _subject_owned().keys().
- _make_cb(owned) binds the owned set per-subscription; _on_message
  forwards it.
- _handle(subject, raw, owned=None) drops events whose remapped source
  isn't in the owned set (silent debug log). Enabling roads511 alone
  no longer accidentally consumes wzdx; enabling traffic alone no
  longer consumes state_511_atis.
- start() subscribes per (subject, owned) tuple; per-subject log line
  records the owned sources at startup.
- Removed roads511 from the "no Central mapping" warning loop now that
  it has one.

Frontend (dashboard-frontend/src/pages/Environment.tsx):
- roads511 META entry: hasCentral false -> true, nativeOnly true -> false
  (the FIRMS entry was already correct).
- Static bundle rebuilt via npm run build; old index-CfYlhn4e.js dropped,
  new index-DCFmSeOM.js + index-DjhQa8Mv.css land under static/assets;
  index.html updated to the new bundle hash.

Tests (tests/test_central_sub_adapter_routing.py, 8 new):
- roads511-only drops wzdx; emits state_511_atis as source=roads511.
- traffic+roads511 both central: wzdx -> traffic, state_511_atis -> roads511.
- firms-only drops wfigs_incidents; emits hotspots as source=firms.
- tomtom_incidents remaps to traffic.
- _subject_owned() shares central.traffic.> across {traffic, roads511}.

Orthogonal to v0.5.2's dispatcher guards: cooldown / dedup / staleness still
apply downstream of the consumer; the owned-sources filter operates one
layer up at message ingest. No changes to the dispatcher path.

Verified: pytest 318 passed (310 prior + 8 new routing tests); py_compile clean.
Safe-mode preserved -- no toggle enabled, no master enabled, no central enabled.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 02:16:54 +00:00
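
The owned-sources filter described in the v0.5.3 message can be sketched roughly as below. This is a minimal illustration, not the code in meshai/central/consumer.py: the function name route_source and the table name ADAPTER_TO_SOURCE are hypothetical, the wzdx -> traffic remap is inferred from the tests rather than stated in the message, and treating owned=None as "no filter" is an assumption based on the _handle default.

```python
from typing import Optional, Set

# Remaps named in the commit message. "wzdx" -> "traffic" is inferred from
# the tests in this file, not stated in the message itself.
ADAPTER_TO_SOURCE = {
    "tomtom_incidents": "traffic",
    "wzdx": "traffic",
    "state_511_atis": "roads511",
    "firms": "firms",
}


def route_source(adapter: str, owned: Optional[Set[str]]) -> Optional[str]:
    """Return the meshai source for an inner adapter, or None if dropped."""
    # Assumption: adapters without a remap keep their own name as the source.
    source = ADAPTER_TO_SOURCE.get(adapter, adapter)
    # Assumption: owned=None disables the ownership filter entirely.
    if owned is not None and source not in owned:
        return None
    return source
```

Under these assumptions, route_source("wzdx", {"roads511"}) is dropped (returns None) while route_source("state_511_atis", {"roads511"}) yields "roads511", which is the behaviour the roads511-only tests assert.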

"""v0.5.1: sub-adapter (owned-sources) routing for shared Central subjects."""

import json

from meshai.config import EnvironmentalConfig
from meshai.central.consumer import CentralConsumer
from meshai.notifications.pipeline.bus import EventBus


def _envelope(adapter, category="x.y", eid="e1"):
    return {"id": eid, "data": {
        "id": eid, "adapter": adapter, "category": category,
        "time": "2026-05-28T00:00:00Z", "severity": 1,
        "geo": {"centroid": [-114.0, 42.0], "primary_region": "US-ID", "regions": ["US-ID"]},
        "data": {}}}


def _route(central, adapter, subject, category="x.y"):
    """Simulate a message arriving on the subscription that matches `subject`,

feat(central): v0.5.4 -- region-aware subscriptions using Central v0.9.20 regional subjects

Pre-v0.5.4 every Central subscription used a bare wildcard (central.wx.>,
central.fire.>, central.traffic.>, central.quake.>, central.hydro.>,
central.space.>), so a Magic Valley operator flipping nws -> central was
in fact subscribing to the all-US firehose and discarding 95% of events
locally. Central v0.9.20 (2026-05-28) added per-region subject suffixes
so the firehose can be filtered server-side. This wires meshai to use them.

Backend (meshai/central/consumer.py):
- New _subjects_for(adapter, region) replaces the static ADAPTER_SUBJECTS
  dict. ADAPTER_SUBJECTS is retained as an alias to _SUBJECTS_BARE for any
  legacy importers; the dispatcher path is unchanged.
- Per-adapter subject patterns (region='us.id' default):
    nws        -> central.wx.alert.us.id.>      (region BEFORE wildcard)
    usgs_quake -> central.quake.event.>.us.id   (region AFTER wildcard)
    firms      -> central.fire.hotspot.>.us.id
    fires      -> central.fire.incident.id.>    (state token at fixed depth)
                  central.fire.perimeter.id.>
    traffic    -> central.traffic.>.id          (bare state, no us. prefix)
    roads511   -> central.traffic.>.id          (shared with traffic, sub-adapter routing)
    usgs       -> central.hydro.>.us.id
                  central.hydro.>.unknown       (workaround until v0.9.20.1)
    swpc       -> central.space.>               (planetary; region ignored)
- Empty/None region falls back to bare wildcards (pre-v0.9.20 behaviour).
- _subject_owned() pulls region from env.central.region and routes through
  _subjects_for; v0.5.3 sub-adapter routing (owned-sources set) still
  applies on shared subjects like central.traffic.>.id.
- start() logs the active region at connect-time for ops visibility.

Config (meshai/config.py):
- CentralConsumerConfig.region: str = "us.id". One region per consumer
  applies to every central-flipped adapter; per-adapter overrides can
  land in v0.6 when there is a real use case.

Frontend (dashboard-frontend/src/pages/Environment.tsx):
- Central Connection panel gets a Region text input next to URL/Durable.
- EnvConfig.central type extended with region: string.
- Static bundle rebuilt; index-DCFmSeOM.js -> index-B24tHcYj.js.

Tests:
- tests/test_central_region_routing.py (new, 9 cases): asserts the exact
  v0.9.20 subject string for each adapter at region='us.id', the SWPC
  global-stays-global rule, the USGS .unknown workaround, the empty-region
  backward-compat fallback for all 8 adapters, and integration through
  CentralConsumer._subject_owned() with the default region.
- tests/test_central_consumer.py + tests/test_central_sub_adapter_routing.py:
  the two tests that asserted bare-wildcard subjects now set
  env.central.region = "" explicitly to preserve their original concern
  (no region semantics -- backward-compat path only).

Why swpc stays global: space weather is planetary -- a CME is detected on
the sun, the geomagnetic response is hemispheric. There is no Idaho-only
solar event; subscribing per-region would only drop events we want.

Why hydro has the .unknown workaround: Central v0.9.20 leaves gauges
whose USGS state can't be inferred on central.hydro.>.unknown. Until
v0.9.20.1 backfills the state tag we subscribe to both filters to
avoid silently losing those rows. Idaho downstream-filtering on
data['_enriched']['usgs_site']['state'] is future v0.6 work.

Orthogonal to v0.5.2 dispatcher guards (staleness / cooldown / dedup)
and v0.5.3 sub-adapter routing: the region filter operates at the NATS
subscription layer (server-side), upstream of everything else.

Verified: pytest 327 passed (318 prior + 9 new region-routing tests);
py_compile clean; frontend build clean. Safe-mode preserved -- no toggle
enabled, no master enabled, no central enabled.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 02:30:33 +00:00

    with that subscription's owned-sources, and return (event, recorded):
    the emitted Event (or None) plus the list of events published on the bus.

    v0.5.4: this helper deliberately clears central.region so sub-adapter
    routing is exercised against bare wildcards (its concern is the
    owned-sources filter, not the region-aware subject shape; those are
    tested in test_central_region_routing.py).
    """
    env = EnvironmentalConfig()
    env.central.region = ""
    for a in central:
        getattr(env, a).feed_source = "central"
    rec = []
    bus = EventBus()
    bus.subscribe(rec.append)
    c = CentralConsumer(env, bus)
    so = c._subject_owned()
    owned = None
    for filt, o in so.items():
        prefix = filt[:-1] if filt.endswith(">") else filt
        if subject == filt or subject.startswith(prefix):
            owned = o
            break
    ev = c._handle(subject, json.dumps(_envelope(adapter, category)).encode(), owned)
    return ev, rec


def test_roads511_only_drops_wzdx():
    ev, rec = _route(["roads511"], "wzdx", "central.traffic.work_zone.ok")
    assert ev is None and rec == []


def test_roads511_only_emits_state_511_atis():
    ev, rec = _route(["roads511"], "state_511_atis", "central.traffic.event.id.1")
    assert ev is not None and ev.source == "roads511" and len(rec) == 1


def test_both_central_wzdx_routes_to_traffic():
    ev, rec = _route(["traffic", "roads511"], "wzdx", "central.traffic.work_zone.ok")
    assert ev is not None and ev.source == "traffic"


def test_both_central_state511_routes_to_roads511():
    ev, rec = _route(["traffic", "roads511"], "state_511_atis", "central.traffic.event.id.1")
    assert ev is not None and ev.source == "roads511"


def test_firms_only_drops_wfigs():
    ev, rec = _route(["firms"], "wfigs_incidents", "central.fire.incident.mt.x")
    assert ev is None and rec == []


def test_firms_only_emits_firms():
    ev, rec = _route(["firms"], "firms", "central.fire.hotspot.viirs_noaa20.high")
    assert ev is not None and ev.source == "firms" and len(rec) == 1


def test_tomtom_incidents_remaps_to_traffic():
    ev, rec = _route(["traffic"], "tomtom_incidents", "central.traffic.incident.x")
    assert ev is not None and ev.source == "traffic"


def test_subject_owned_shares_traffic_subject():
    # v0.5.4: assert the legacy bare-wildcard shape by clearing region.
    # Region-aware shared-subject behaviour ('central.traffic.>.id' for both
    # traffic and roads511) is covered in test_central_region_routing.py.
    env = EnvironmentalConfig()
    env.central.region = ""
    env.traffic.feed_source = "central"
    env.roads511.feed_source = "central"
    so = CentralConsumer(env, None)._subject_owned()
    assert so.get("central.traffic.>") == {"traffic", "roads511"}
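
The per-adapter subject patterns listed in the v0.5.4 commit message can be sketched as a small lookup function. This is an illustrative reconstruction under stated assumptions, not the _subjects_for in meshai/central/consumer.py: the names subjects_for and BARE are hypothetical, the bare fallbacks for each adapter are guessed from the wildcard list in the message, and deriving the state token from the last dot-separated part of the region ("us.id" -> "id") is an assumption. The subject strings themselves are copied from the message as written.

```python
from typing import Dict, List, Optional

# Assumed bare-wildcard fallbacks (pre-v0.9.20 behaviour), one entry per adapter.
BARE: Dict[str, List[str]] = {
    "nws": ["central.wx.>"],
    "usgs_quake": ["central.quake.>"],
    "firms": ["central.fire.>"],
    "fires": ["central.fire.>"],
    "traffic": ["central.traffic.>"],
    "roads511": ["central.traffic.>"],
    "usgs": ["central.hydro.>"],
    "swpc": ["central.space.>"],
}


def subjects_for(adapter: str, region: Optional[str]) -> List[str]:
    """Return the subject filters for one adapter in one region."""
    # swpc is planetary, and an empty/None region means "no region filter".
    if adapter == "swpc" or not region:
        return list(BARE[adapter])
    # Assumption: the bare state token is the last part of the region.
    state = region.rsplit(".", 1)[-1]
    patterns: Dict[str, List[str]] = {
        "nws": [f"central.wx.alert.{region}.>"],
        "usgs_quake": [f"central.quake.event.>.{region}"],
        "firms": [f"central.fire.hotspot.>.{region}"],
        "fires": [
            f"central.fire.incident.{state}.>",
            f"central.fire.perimeter.{state}.>",
        ],
        "traffic": [f"central.traffic.>.{state}"],
        "roads511": [f"central.traffic.>.{state}"],
        # Second filter is the .unknown workaround named in the message.
        "usgs": [f"central.hydro.>.{region}", "central.hydro.>.unknown"],
    }
    return patterns[adapter]
```

With region "us.id" this sketch gives traffic and roads511 the same filter, central.traffic.>.id, so the owned-sources set is still what separates them. With region "" both fall back to central.traffic.>, which is the shape the last test in this file asserts.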