feat(central): v0.5.3 -- roads511 + FIRMS central feeds with sub-adapter routing (recover stashed v0.5.1)

C.2 (v0.4) marked roads511 and FIRMS native-only in the dashboard despite the
Central event bus shipping both stream families. This recovers the v0.5.1
work paused before the v0.5.2 spam fix and lands it cleanly on top of v0.5.2.

Backend (meshai/central/consumer.py):
- ADAPTER_SUBJECTS: roads511 now subscribes to central.traffic.> (shared
  with the existing traffic adapter); firms widened from
  central.fire.hotspot.> to central.fire.>.
- CENTRAL_ADAPTER_TO_SOURCE: three new sub-adapter remaps so the inner
  envelope.adapter routes correctly inside a shared subject --
  tomtom_incidents -> traffic, state_511_atis -> roads511, firms -> firms.
- New _subject_owned() -> dict[subject_filter, set[meshai_source]]:
  builds the per-subject ownership set so a single central.traffic.>
  subscription can be owned by {traffic, roads511} simultaneously.
- subjects() now derives from _subject_owned().keys().
- _make_cb(owned) binds the owned set per-subscription; _on_message
  forwards it.
- _handle(subject, raw, owned=None) drops events whose remapped source
  isn't in the owned set (silent debug log). Enabling roads511 alone
  no longer accidentally consumes wzdx; enabling traffic alone no
  longer consumes state_511_atis.
- start() subscribes per (subject, owned) tuple; per-subject log line
  records the owned sources at startup.
- Removed roads511 from the "no Central mapping" warning loop now that
  it has one.

Frontend (dashboard-frontend/src/pages/Environment.tsx):
- roads511 META entry: hasCentral false -> true, nativeOnly true -> false
  (the FIRMS entry was already correct).
- Static bundle rebuilt via npm run build; old index-CfYlhn4e.js dropped,
  new index-DCFmSeOM.js + index-DjhQa8Mv.css land under static/assets;
  index.html updated to the new bundle hash.

Tests (tests/test_central_sub_adapter_routing.py, 8 new):
- roads511-only drops wzdx; emits state_511_atis as source=roads511.
- traffic+roads511 both central: wzdx -> traffic, state_511_atis -> roads511.
- firms-only drops wfigs_incidents; emits hotspots as source=firms.
- tomtom_incidents remaps to traffic.
- _subject_owned() shares central.traffic.> across {traffic, roads511}.

Orthogonal to v0.5.2's dispatcher guards: cooldown / dedup / staleness still
apply downstream of the consumer; the owned-sources filter operates one
layer up at message ingest. No changes to the dispatcher path.

Verified: pytest 318 passed (310 prior + 8 new routing tests); py_compile clean.
Safe-mode preserved -- no toggle enabled, no master enabled, no central enabled.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
matt+claude 2026-06-04 02:16:54 +00:00
commit ded2156024
5 changed files with 128 additions and 24 deletions

View file

@ -165,7 +165,7 @@ const META: Record<AdapterKey, AdapterMeta> = {
swpc: { label: 'NOAA Space Weather (SWPC)', subtitle: 'Solar indices, geomagnetic storms', health: 'swpc', hasCentral: true, nativeOnly: false, hasKey: true },
ducting: { label: 'Tropospheric Ducting', subtitle: 'VHF/UHF extended-range conditions', health: 'ducting', hasCentral: false, nativeOnly: true, hasKey: true },
traffic: { label: 'TomTom Traffic', subtitle: 'Traffic flow on monitored corridors', health: 'traffic', hasCentral: true, nativeOnly: false, hasKey: true },
roads511: { label: '511 Road Conditions', subtitle: 'State DOT road events and closures', health: 'roads511', hasCentral: false, nativeOnly: true, hasKey: false },
roads511: { label: '511 Road Conditions', subtitle: 'State DOT road events and closures', health: 'roads511', hasCentral: true, nativeOnly: false, hasKey: false },
usgs_quake: { label: 'USGS Earthquakes', subtitle: 'Seismic events from the USGS feed', health: 'usgs_quake', hasCentral: true, nativeOnly: false, hasKey: true },
usgs: { label: 'USGS Stream Gauges', subtitle: 'River and stream water levels', health: 'usgs', hasCentral: true, nativeOnly: false, hasKey: true },
avalanche: { label: 'Avalanche Advisories', subtitle: 'Backcountry avalanche danger ratings', health: 'avalanche', hasCentral: false, nativeOnly: true, hasKey: true },

View file

@ -39,11 +39,12 @@ def consumer_config():
ADAPTER_SUBJECTS: dict[str, list[str]] = {
"nws": ["central.wx.>"],
"fires": ["central.fire.incident.>", "central.fire.perimeter.>"],
"firms": ["central.fire.hotspot.>"],
"firms": ["central.fire.>"],
"usgs_quake": ["central.quake.>"],
"usgs": ["central.hydro.>"],
"swpc": ["central.space.>"],
"traffic": ["central.traffic.>"],
"roads511": ["central.traffic.>"],
}
# Bridge between Central's adapter taxonomy and meshai's family-tab source names.
@ -59,6 +60,9 @@ CENTRAL_ADAPTER_TO_SOURCE: dict[str, str] = {
"swpc_kindex": "swpc",
"swpc_protons": "swpc",
"wzdx": "traffic",
"tomtom_incidents": "traffic",
"state_511_atis": "roads511",
"firms": "firms",
}
# Central hierarchical category prefix -> meshai flat category.
@ -160,20 +164,32 @@ class CentralConsumer:
self._subs: list = []
# ---- subject derivation ----
def subjects(self) -> list[str]:
"""Subject filters for adapters whose source == 'central'."""
out: list[str] = []
def _subject_owned(self) -> dict:
"""Map each Central subject filter -> set of meshai source names (adapter
attrs) that are feed_source=central and consume it. A shared subject
(central.traffic.> for both traffic and roads511) carries multiple owned
sources; _handle drops events whose remapped source isn't in the set."""
owned: dict = {}
for attr, subjects in ADAPTER_SUBJECTS.items():
cfg = getattr(self._env, attr, None)
if cfg is not None and getattr(cfg, "feed_source", "native") == "central":
out.extend(subjects)
# Warn about centralized adapters that have no Central mapping.
for attr in ("avalanche", "ducting", "roads511"):
for subj in subjects:
owned.setdefault(subj, set()).add(attr)
for attr in ("avalanche", "ducting"):
cfg = getattr(self._env, attr, None)
if cfg is not None and getattr(cfg, "feed_source", "native") == "central":
logger.warning("Adapter %r set to source=central but Central has no "
"matching stream; nothing will be consumed for it.", attr)
return out
return owned
def subjects(self) -> list[str]:
"""Unique Central subject filters for adapters set to central."""
return sorted(self._subject_owned().keys())
def _make_cb(self, owned):
async def _cb(msg):
await self._on_message(msg, owned)
return _cb
# ---- normalization ----
def _normalize(self, subject: str, envelope: dict) -> Optional[Event]:
@ -238,22 +254,32 @@ class CentralConsumer:
**kwargs,
)
def _handle(self, subject: str, raw: bytes) -> Optional[Event]:
"""Normalize a raw message body and emit to the bus. Returns the Event."""
def _handle(self, subject: str, raw: bytes, owned=None) -> Optional[Event]:
"""Normalize a raw message body and emit to the bus. Returns the Event.
owned: set of meshai source names this subscription may emit (sub-adapter
routing for shared subjects); None = no filtering.
"""
try:
envelope = json.loads(raw)
except Exception:
logger.exception("CentralConsumer: bad JSON on %s", subject)
return None
event = self._normalize(subject, envelope)
if event is not None and self._bus is not None:
if event is None:
return None
if owned is not None and event.source not in owned:
logger.debug("CentralConsumer: dropping %s source=%s -- not owned by "
"subscription %s", subject, event.source, sorted(owned))
return None
if self._bus is not None:
self._bus.emit(event)
return event
async def _on_message(self, msg) -> None:
async def _on_message(self, msg, owned=None) -> None:
"""JetStream callback: normalize + emit, then ack."""
try:
self._handle(msg.subject, msg.data)
self._handle(msg.subject, msg.data, owned)
except Exception:
logger.exception("CentralConsumer: handler failed on %s",
getattr(msg, "subject", "?"))
@ -267,15 +293,15 @@ class CentralConsumer:
# ---- lifecycle ----
async def start(self) -> None:
subjects = self.subjects()
if not subjects:
subject_owned = self._subject_owned()
if not subject_owned:
logger.info("CentralConsumer started; 0 subjects subscribed -- "
"no adapters set to central")
return
if self._central is None or not getattr(self._central, "enabled", False):
logger.warning("CentralConsumer: %d adapter(s) want source=central but "
logger.warning("CentralConsumer: adapter(s) want source=central but "
"environmental.central.enabled is false; not subscribing: %s",
len(subjects), subjects)
sorted(subject_owned))
return
import nats # lazy: no NATS dependency at boot unless actually consuming
@ -284,13 +310,13 @@ class CentralConsumer:
connect_timeout=getattr(self._central, "connect_timeout", 10.0),
)
self._js = self._nc.jetstream()
for subj in subjects:
for subj, owned in subject_owned.items():
durable = self._central.durable + "-" + re.sub(r"[^a-z0-9]+", "_", subj.lower())
sub = await self._js.subscribe(
subj, durable=durable, cb=self._on_message, config=consumer_config())
subj, durable=durable, cb=self._make_cb(owned), config=consumer_config())
self._subs.append(sub)
logger.info("CentralConsumer started; %d subjects subscribed: %s",
len(subjects), subjects)
logger.info("CentralConsumer subscribed %s owned-sources=%s", subj, sorted(owned))
logger.info("CentralConsumer started; %d subjects subscribed", len(subject_owned))
async def stop(self) -> None:
if self._nc is not None:

View file

@ -8,7 +8,7 @@
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&display=swap" rel="stylesheet">
<script type="module" crossorigin src="/assets/index-CfYlhn4e.js"></script>
<script type="module" crossorigin src="/assets/index-DCFmSeOM.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-DjhQa8Mv.css">
</head>
<body>

View file

@ -0,0 +1,78 @@
"""v0.5.1: sub-adapter (owned-sources) routing for shared Central subjects."""
import json
from meshai.config import EnvironmentalConfig
from meshai.central.consumer import CentralConsumer
from meshai.notifications.pipeline.bus import EventBus
def _envelope(adapter, category="x.y", eid="e1"):
return {"id": eid, "data": {
"id": eid, "adapter": adapter, "category": category,
"time": "2026-05-28T00:00:00Z", "severity": 1,
"geo": {"centroid": [-114.0, 42.0], "primary_region": "US-ID", "regions": ["US-ID"]},
"data": {}}}
def _route(central, adapter, subject, category="x.y"):
"""Simulate a message arriving on the subscription that matches `subject`,
with that subscription's owned-sources, and return the emitted Event (or None)."""
env = EnvironmentalConfig()
for a in central:
getattr(env, a).feed_source = "central"
rec = []
bus = EventBus(); bus.subscribe(rec.append)
c = CentralConsumer(env, bus)
so = c._subject_owned()
owned = None
for filt, o in so.items():
prefix = filt[:-1] if filt.endswith(">") else filt
if subject == filt or subject.startswith(prefix):
owned = o
break
ev = c._handle(subject, json.dumps(_envelope(adapter, category)).encode(), owned)
return ev, rec
def test_roads511_only_drops_wzdx():
ev, rec = _route(["roads511"], "wzdx", "central.traffic.work_zone.ok")
assert ev is None and rec == []
def test_roads511_only_emits_state_511_atis():
ev, rec = _route(["roads511"], "state_511_atis", "central.traffic.event.id.1")
assert ev is not None and ev.source == "roads511" and len(rec) == 1
def test_both_central_wzdx_routes_to_traffic():
ev, rec = _route(["traffic", "roads511"], "wzdx", "central.traffic.work_zone.ok")
assert ev is not None and ev.source == "traffic"
def test_both_central_state511_routes_to_roads511():
ev, rec = _route(["traffic", "roads511"], "state_511_atis", "central.traffic.event.id.1")
assert ev is not None and ev.source == "roads511"
def test_firms_only_drops_wfigs():
ev, rec = _route(["firms"], "wfigs_incidents", "central.fire.incident.mt.x")
assert ev is None and rec == []
def test_firms_only_emits_firms():
ev, rec = _route(["firms"], "firms", "central.fire.hotspot.viirs_noaa20.high")
assert ev is not None and ev.source == "firms" and len(rec) == 1
def test_tomtom_incidents_remaps_to_traffic():
ev, rec = _route(["traffic"], "tomtom_incidents", "central.traffic.incident.x")
assert ev is not None and ev.source == "traffic"
def test_subject_owned_shares_traffic_subject():
env = EnvironmentalConfig()
env.traffic.feed_source = "central"
env.roads511.feed_source = "central"
so = CentralConsumer(env, None)._subject_owned()
assert so.get("central.traffic.>") == {"traffic", "roads511"}