mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-10 17:04:45 +02:00
feat(central): v0.4 C.1 Central connector backend (no-op until adapter source flipped)
Adds the backend for sourcing environmental feeds from Central's NATS
JetStream firehose instead of (or alongside) meshai's native adapters.
Architecture is Matt-approved Option 3' (dedicated package + per-adapter
source switch surfaced on the existing Environmental config).
NO-OP POSTURE (intentional): every adapter defaults to feed_source="native"
and environmental.central.enabled defaults false, so on a stock config the
CentralConsumer starts and subscribes to nothing -- behavior is byte-for-byte
v0.3. Live env_feeds.yaml is unchanged on disk; an operator who touches
nothing sees no change. Flipping an adapter to central is Phase C.3; the
dashboard UI for it is Phase C.2.
What landed:
- meshai/central/ package (CentralConsumer): async start()/stop(), JetStream
durable subscribe to subjects derived from adapters with feed_source=central,
and _on_message -> normalize -> bus.emit. nats-py is lazy-imported only on
the connect path, so no-op boot has zero NATS dependency.
- Normalization (CloudEvents envelope -> Central Event -> upstream data):
source = inner Event.adapter
category = Central hierarchical string -> meshai flat, via a small
table-driven prefix map (map_category)
severity = 0|1->routine, 2->priority, 3|4->immediate, null->routine
lat/lon = geo.centroid, swapped from GeoJSON [lon,lat] -> (lat,lon)
group_key/inhibit = outer envelope id (dedup parity with native adapters)
expires/timestamp parsed from ISO-8601
Event.data = upstream payload verbatim (generic _enriched merge, preserved
as-is incl. hydro's extra usgs_site/usgs_stats bundles)
- Tombstone (`.removed.` subject or `:removed` id suffix) -> a "clear" Event
carrying the ORIGINAL group_key (`:removed` stripped) + data._central_tombstone
so the grouper/inhibitor lets the prior event lapse naturally.
- config.py: a `_SourcedFeed` mixin adds `feed_source: native|central`
(validated in __post_init__) to all 10 adapter configs; new
CentralConsumerConfig as environmental.central { enabled, url, durable,
connect_timeout }. Both ride the generic _dict_to_dataclass coercion, so
they are GUI-editable via PUT /config/environmental (Rule 17) -- frontend
fields come in C.2.
- env/store.py: each adapter is instantiated only when
enabled AND feed_source=="native"; a feed_source=central adapter is skipped
natively (debug-logged) so Central can own it without a duplicate.
- main.py: CentralConsumer constructed + started after start_pipeline(),
stopped in stop().
DEVIATION FROM SPEC (documented): the spec named the new field `source`, but
FIRMSConfig already has a `source` field (the satellite product,
"VIIRS_SNPP_NRT"). To avoid the collision the field is named **feed_source**
across all adapters. Everything else follows the spec.
NETWORKING: zero infra change required. The meshai container already reaches
the Central NATS server directly (TCP to 100.64.0.12:4222 OK) and resolves
central.echo6.mesh via the Phase 2.6.6 MagicDNS fix. No docker-compose edit;
default bridge works (LXC host masquerades to the Tailscale CGNAT range). The
lighter bridge-route / host-net / sidecar fallbacks were not needed.
Tests: tests/test_central_consumer.py (11) + tests/test_config_source_field.py
(6): no-op-when-native, subjects-when-central, source-gate skips native
instantiation, normalize+emit, _enriched preserved verbatim, tombstone->clear,
severity map (0-4/null), category map (>=4 strings), async _on_message
emits+acks, start() no-op without NATS, feed_source default/validate/reject/
dict-coercion. Full suite: 269 passed (was 253 + 16 new).
Verification: (A) no bare self._x() in consumer.py. (B) py_compile clean.
(C) 269 passed. (D) rebuilt prod -- 8 native adapters, pipeline started,
native nifc/traffic emissions still flowing, healthy, no errors, log
"CentralConsumer started; 0 subjects subscribed -- no adapters set to central".
(E) in-container synthetic _on_message injection normalized correctly
(usgs_quake/earthquake_event/immediate, centroid swapped, _enriched preserved)
and reached the bus; ephemeral, no config change to roll back.
C.2 (dashboard frontend for the feed_source switch + central connection) is next.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
20e0dec28a
commit
73c007d227
8 changed files with 500 additions and 20 deletions
6
meshai/central/__init__.py
Normal file
6
meshai/central/__init__.py
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
"""Central connector package (v0.4) — consumes Central's NATS JetStream
|
||||||
|
firehose and normalizes it into meshai pipeline Events."""
|
||||||
|
|
||||||
|
from meshai.central.consumer import CentralConsumer
|
||||||
|
|
||||||
|
__all__ = ["CentralConsumer"]
|
||||||
249
meshai/central/consumer.py
Normal file
249
meshai/central/consumer.py
Normal file
|
|
@ -0,0 +1,249 @@
|
||||||
|
"""Central connector — consumes Central's NATS JetStream firehose and
|
||||||
|
normalizes CloudEvents envelopes into meshai pipeline Events.
|
||||||
|
|
||||||
|
v0.4 C.1: backend only. The consumer subscribes only to subjects derived from
|
||||||
|
adapters whose config `source == "central"`. With every adapter defaulting to
|
||||||
|
`native`, it starts as a no-op (0 subscriptions) and introduces no NATS
|
||||||
|
dependency at boot. Flipping an adapter to central is Phase C.3.
|
||||||
|
|
||||||
|
Wire format (see Central CONSUMER-INTEGRATION guide, confirmed in v0.4 Phase A):
|
||||||
|
envelope (CloudEvents v1.0) -> envelope["data"] (Central Event)
|
||||||
|
-> Event["data"] (upstream payload, verbatim, incl `_enriched`)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from meshai.notifications.events import Event, make_event
|
||||||
|
|
||||||
|
logger = logging.getLogger("meshai.central.consumer")
|
||||||
|
|
||||||
|
|
||||||
|
# meshai adapter (env-config attr) -> Central subject filters it consumes.
|
||||||
|
# Adapters with no Central equivalent (avalanche, ducting, roads511) are absent;
|
||||||
|
# setting source=central on those subscribes to nothing (logged).
|
||||||
|
ADAPTER_SUBJECTS: dict[str, list[str]] = {
|
||||||
|
"nws": ["central.wx.>"],
|
||||||
|
"fires": ["central.fire.incident.>", "central.fire.perimeter.>"],
|
||||||
|
"firms": ["central.fire.hotspot.>"],
|
||||||
|
"usgs_quake": ["central.quake.>"],
|
||||||
|
"usgs": ["central.hydro.>"],
|
||||||
|
"swpc": ["central.space.>"],
|
||||||
|
"traffic": ["central.traffic.>"],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Central hierarchical category prefix -> meshai flat category.
|
||||||
|
# First matching prefix wins; order matters (most specific first).
|
||||||
|
_CATEGORY_MAP: list[tuple[str, str]] = [
|
||||||
|
("wx.alert", "weather_warning"),
|
||||||
|
("wx.", "weather_statement"),
|
||||||
|
("fire.hotspot", "wildfire_hotspot"),
|
||||||
|
("fire.incident", "wildfire_incident"),
|
||||||
|
("fire.perimeter", "wildfire_incident"),
|
||||||
|
("fire.", "wildfire_incident"),
|
||||||
|
("quake.", "earthquake_event"),
|
||||||
|
("hydro.", "stream_flow"),
|
||||||
|
("space.alert", "rf_propagation_alert"),
|
||||||
|
("space.kindex", "geomagnetic_storm"),
|
||||||
|
("space.proton", "solar_radiation_storm"),
|
||||||
|
("space.", "geomagnetic_storm"),
|
||||||
|
("disaster.", "disaster_event"),
|
||||||
|
("traffic_flow", "traffic_flow"),
|
||||||
|
("traffic_cameras", "traffic_camera"),
|
||||||
|
("traffic.", "traffic_congestion"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def map_category(central_category: str) -> str:
|
||||||
|
"""Map Central's hierarchical category string to a meshai flat category."""
|
||||||
|
cat = central_category or ""
|
||||||
|
for prefix, flat in _CATEGORY_MAP:
|
||||||
|
if cat.startswith(prefix):
|
||||||
|
return flat
|
||||||
|
return "other"
|
||||||
|
|
||||||
|
|
||||||
|
def map_severity(sev: Optional[int]) -> str:
|
||||||
|
"""Central int severity (0-4 / None) -> meshai severity string.
|
||||||
|
|
||||||
|
0|1 -> routine, 2 -> priority, 3|4 -> immediate, None -> routine.
|
||||||
|
"""
|
||||||
|
if sev is None:
|
||||||
|
return "routine"
|
||||||
|
try:
|
||||||
|
sev = int(sev)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return "routine"
|
||||||
|
if sev >= 3:
|
||||||
|
return "immediate"
|
||||||
|
if sev == 2:
|
||||||
|
return "priority"
|
||||||
|
return "routine"
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_time(s) -> Optional[float]:
|
||||||
|
"""Parse a Central ISO-8601 timestamp to epoch seconds."""
|
||||||
|
if not s or not isinstance(s, str):
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp()
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class CentralConsumer:
|
||||||
|
"""Subscribes to Central JetStream subjects and emits normalized Events."""
|
||||||
|
|
||||||
|
def __init__(self, env_config, event_bus):
|
||||||
|
"""Args:
|
||||||
|
env_config: the EnvironmentalConfig (provides .central + per-adapter .source)
|
||||||
|
event_bus: the pipeline EventBus to emit normalized Events onto
|
||||||
|
"""
|
||||||
|
self._env = env_config
|
||||||
|
self._central = getattr(env_config, "central", None)
|
||||||
|
self._bus = event_bus
|
||||||
|
self._nc = None
|
||||||
|
self._js = None
|
||||||
|
self._subs: list = []
|
||||||
|
|
||||||
|
# ---- subject derivation ----
|
||||||
|
def subjects(self) -> list[str]:
|
||||||
|
"""Subject filters for adapters whose source == 'central'."""
|
||||||
|
out: list[str] = []
|
||||||
|
for attr, subjects in ADAPTER_SUBJECTS.items():
|
||||||
|
cfg = getattr(self._env, attr, None)
|
||||||
|
if cfg is not None and getattr(cfg, "feed_source", "native") == "central":
|
||||||
|
out.extend(subjects)
|
||||||
|
# Warn about centralized adapters that have no Central mapping.
|
||||||
|
for attr in ("avalanche", "ducting", "roads511"):
|
||||||
|
cfg = getattr(self._env, attr, None)
|
||||||
|
if cfg is not None and getattr(cfg, "feed_source", "native") == "central":
|
||||||
|
logger.warning("Adapter %r set to source=central but Central has no "
|
||||||
|
"matching stream; nothing will be consumed for it.", attr)
|
||||||
|
return out
|
||||||
|
|
||||||
|
# ---- normalization ----
|
||||||
|
def _normalize(self, subject: str, envelope: dict) -> Optional[Event]:
|
||||||
|
"""CloudEvents envelope -> meshai Event (None if unusable)."""
|
||||||
|
inner = envelope.get("data") or {}
|
||||||
|
env_id = envelope.get("id") or inner.get("id")
|
||||||
|
if not env_id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
is_tombstone = (".removed." in (subject or "")) or str(env_id).endswith(":removed")
|
||||||
|
# The clear event shares the ORIGINAL event's group_key so the grouper/
|
||||||
|
# inhibitor lets the prior event lapse naturally.
|
||||||
|
group_key = str(env_id)
|
||||||
|
if is_tombstone:
|
||||||
|
group_key = re.sub(r":removed$", "", group_key)
|
||||||
|
|
||||||
|
cat_raw = inner.get("category") or envelope.get("centralcategory") or ""
|
||||||
|
|
||||||
|
geo = inner.get("geo") or {}
|
||||||
|
lat = lon = None
|
||||||
|
centroid = geo.get("centroid")
|
||||||
|
if isinstance(centroid, (list, tuple)) and len(centroid) >= 2:
|
||||||
|
lon, lat = centroid[0], centroid[1] # GeoJSON [lon, lat] -> (lat, lon)
|
||||||
|
|
||||||
|
# Preserve the upstream payload verbatim (incl. `_enriched`) in Event.data.
|
||||||
|
data = dict(inner.get("data") or {})
|
||||||
|
if is_tombstone:
|
||||||
|
data["_central_tombstone"] = True
|
||||||
|
|
||||||
|
title = (data.get("title") or data.get("headline")
|
||||||
|
or cat_raw or f"{inner.get('adapter', 'central')} event")
|
||||||
|
|
||||||
|
kwargs = dict(
|
||||||
|
title=str(title)[:200],
|
||||||
|
summary="",
|
||||||
|
lat=lat,
|
||||||
|
lon=lon,
|
||||||
|
region=geo.get("primary_region"),
|
||||||
|
regions=geo.get("regions") or [],
|
||||||
|
group_key=group_key,
|
||||||
|
inhibit_keys=[group_key],
|
||||||
|
data=data,
|
||||||
|
)
|
||||||
|
ts = _parse_time(inner.get("time"))
|
||||||
|
if ts is not None:
|
||||||
|
kwargs["timestamp"] = ts
|
||||||
|
exp = _parse_time(inner.get("expires"))
|
||||||
|
if exp is not None:
|
||||||
|
kwargs["expires"] = exp
|
||||||
|
|
||||||
|
return make_event(
|
||||||
|
source=inner.get("adapter") or "central",
|
||||||
|
category=map_category(cat_raw),
|
||||||
|
severity=map_severity(inner.get("severity")),
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _handle(self, subject: str, raw: bytes) -> Optional[Event]:
|
||||||
|
"""Normalize a raw message body and emit to the bus. Returns the Event."""
|
||||||
|
try:
|
||||||
|
envelope = json.loads(raw)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("CentralConsumer: bad JSON on %s", subject)
|
||||||
|
return None
|
||||||
|
event = self._normalize(subject, envelope)
|
||||||
|
if event is not None and self._bus is not None:
|
||||||
|
self._bus.emit(event)
|
||||||
|
return event
|
||||||
|
|
||||||
|
async def _on_message(self, msg) -> None:
|
||||||
|
"""JetStream callback: normalize + emit, then ack."""
|
||||||
|
try:
|
||||||
|
self._handle(msg.subject, msg.data)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("CentralConsumer: handler failed on %s",
|
||||||
|
getattr(msg, "subject", "?"))
|
||||||
|
finally:
|
||||||
|
ack = getattr(msg, "ack", None)
|
||||||
|
if ack is not None:
|
||||||
|
try:
|
||||||
|
await ack()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ---- lifecycle ----
|
||||||
|
async def start(self) -> None:
|
||||||
|
subjects = self.subjects()
|
||||||
|
if not subjects:
|
||||||
|
logger.info("CentralConsumer started; 0 subjects subscribed -- "
|
||||||
|
"no adapters set to central")
|
||||||
|
return
|
||||||
|
if self._central is None or not getattr(self._central, "enabled", False):
|
||||||
|
logger.warning("CentralConsumer: %d adapter(s) want source=central but "
|
||||||
|
"environmental.central.enabled is false; not subscribing: %s",
|
||||||
|
len(subjects), subjects)
|
||||||
|
return
|
||||||
|
|
||||||
|
import nats # lazy: no NATS dependency at boot unless actually consuming
|
||||||
|
self._nc = await nats.connect(
|
||||||
|
self._central.url,
|
||||||
|
connect_timeout=getattr(self._central, "connect_timeout", 10.0),
|
||||||
|
)
|
||||||
|
self._js = self._nc.jetstream()
|
||||||
|
for subj in subjects:
|
||||||
|
durable = self._central.durable + "-" + re.sub(r"[^a-z0-9]+", "_", subj.lower())
|
||||||
|
sub = await self._js.subscribe(subj, durable=durable, cb=self._on_message)
|
||||||
|
self._subs.append(sub)
|
||||||
|
logger.info("CentralConsumer started; %d subjects subscribed: %s",
|
||||||
|
len(subjects), subjects)
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
if self._nc is not None:
|
||||||
|
try:
|
||||||
|
await self._nc.drain()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
await self._nc.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._nc = None
|
||||||
|
self._js = None
|
||||||
|
self._subs = []
|
||||||
|
|
@ -297,7 +297,19 @@ class MeshIntelligenceConfig:
|
||||||
|
|
||||||
# Environmental feed configs
|
# Environmental feed configs
|
||||||
@dataclass
|
@dataclass
|
||||||
class NWSConfig:
|
class _SourcedFeed:
|
||||||
|
"""Mixin: an environmental feed is sourced 'native' (local adapter) or
|
||||||
|
'central' (Central NATS firehose). Default 'native' preserves v0.3 behavior."""
|
||||||
|
|
||||||
|
feed_source: str = "native"
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if self.feed_source not in ("native", "central"):
|
||||||
|
raise ValueError(f"feed_source must be 'native' or 'central', got {self.feed_source!r}")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class NWSConfig(_SourcedFeed):
|
||||||
"""NWS weather alerts settings."""
|
"""NWS weather alerts settings."""
|
||||||
|
|
||||||
enabled: bool = True
|
enabled: bool = True
|
||||||
|
|
@ -316,7 +328,7 @@ class NWSConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class SWPCConfig:
|
class SWPCConfig(_SourcedFeed):
|
||||||
"""NOAA Space Weather settings."""
|
"""NOAA Space Weather settings."""
|
||||||
|
|
||||||
enabled: bool = True
|
enabled: bool = True
|
||||||
|
|
@ -331,7 +343,7 @@ class SWPCConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DuctingConfig:
|
class DuctingConfig(_SourcedFeed):
|
||||||
"""Tropospheric ducting settings."""
|
"""Tropospheric ducting settings."""
|
||||||
|
|
||||||
enabled: bool = True
|
enabled: bool = True
|
||||||
|
|
@ -349,7 +361,7 @@ class DuctingConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class NICFFiresConfig:
|
class NICFFiresConfig(_SourcedFeed):
|
||||||
"""NIFC fire perimeters settings (Phase 2)."""
|
"""NIFC fire perimeters settings (Phase 2)."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
|
@ -358,7 +370,7 @@ class NICFFiresConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class AvalancheConfig:
|
class AvalancheConfig(_SourcedFeed):
|
||||||
"""Avalanche advisory settings (Phase 2)."""
|
"""Avalanche advisory settings (Phase 2)."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
|
@ -368,7 +380,7 @@ class AvalancheConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class USGSConfig:
|
class USGSConfig(_SourcedFeed):
|
||||||
"""USGS stream gauge settings."""
|
"""USGS stream gauge settings."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
|
@ -378,7 +390,7 @@ class USGSConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class USGSQuakeConfig:
|
class USGSQuakeConfig(_SourcedFeed):
|
||||||
"""USGS earthquake feed settings (Phase 2.14)."""
|
"""USGS earthquake feed settings (Phase 2.14)."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
|
@ -391,7 +403,7 @@ class USGSQuakeConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TomTomConfig:
|
class TomTomConfig(_SourcedFeed):
|
||||||
"""TomTom traffic flow settings."""
|
"""TomTom traffic flow settings."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
|
@ -401,7 +413,7 @@ class TomTomConfig:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Roads511Config:
|
class Roads511Config(_SourcedFeed):
|
||||||
"""511 road conditions settings."""
|
"""511 road conditions settings."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
|
@ -413,7 +425,7 @@ class Roads511Config:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class FIRMSConfig:
|
class FIRMSConfig(_SourcedFeed):
|
||||||
"""NASA FIRMS satellite fire hotspot settings."""
|
"""NASA FIRMS satellite fire hotspot settings."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
|
@ -426,6 +438,16 @@ class FIRMSConfig:
|
||||||
proximity_km: float = 10.0 # km to match known fire
|
proximity_km: float = 10.0 # km to match known fire
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CentralConsumerConfig:
|
||||||
|
"""Connection settings for the Central NATS JetStream consumer (v0.4)."""
|
||||||
|
|
||||||
|
enabled: bool = False
|
||||||
|
url: str = "nats://central.echo6.mesh:4222"
|
||||||
|
durable: str = "meshai-consumer"
|
||||||
|
connect_timeout: float = 10.0
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class EnvironmentalConfig:
|
class EnvironmentalConfig:
|
||||||
"""Environmental feeds settings."""
|
"""Environmental feeds settings."""
|
||||||
|
|
@ -442,6 +464,7 @@ class EnvironmentalConfig:
|
||||||
traffic: TomTomConfig = field(default_factory=TomTomConfig)
|
traffic: TomTomConfig = field(default_factory=TomTomConfig)
|
||||||
roads511: Roads511Config = field(default_factory=Roads511Config)
|
roads511: Roads511Config = field(default_factory=Roads511Config)
|
||||||
firms: FIRMSConfig = field(default_factory=FIRMSConfig)
|
firms: FIRMSConfig = field(default_factory=FIRMSConfig)
|
||||||
|
central: CentralConsumerConfig = field(default_factory=CentralConsumerConfig)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
|
||||||
24
meshai/env/store.py
vendored
24
meshai/env/store.py
vendored
|
|
@ -29,49 +29,53 @@ class EnvironmentalStore:
|
||||||
self._region_anchors = region_anchors or []
|
self._region_anchors = region_anchors or []
|
||||||
|
|
||||||
# Create adapter instances based on config
|
# Create adapter instances based on config
|
||||||
if config.nws.enabled:
|
if config.nws.enabled and config.nws.feed_source == "native":
|
||||||
from .nws import NWSAlertsAdapter
|
from .nws import NWSAlertsAdapter
|
||||||
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
|
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
|
||||||
|
|
||||||
if config.swpc.enabled:
|
if config.swpc.enabled and config.swpc.feed_source == "native":
|
||||||
from .swpc import SWPCAdapter
|
from .swpc import SWPCAdapter
|
||||||
self._adapters["swpc"] = SWPCAdapter(config.swpc)
|
self._adapters["swpc"] = SWPCAdapter(config.swpc)
|
||||||
|
|
||||||
if config.ducting.enabled:
|
if config.ducting.enabled and config.ducting.feed_source == "native":
|
||||||
from .ducting import DuctingAdapter
|
from .ducting import DuctingAdapter
|
||||||
self._adapters["ducting"] = DuctingAdapter(config.ducting)
|
self._adapters["ducting"] = DuctingAdapter(config.ducting)
|
||||||
|
|
||||||
if config.fires.enabled:
|
if config.fires.enabled and config.fires.feed_source == "native":
|
||||||
from .fires import NICFFiresAdapter
|
from .fires import NICFFiresAdapter
|
||||||
self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors)
|
self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors)
|
||||||
|
|
||||||
if config.avalanche.enabled:
|
if config.avalanche.enabled and config.avalanche.feed_source == "native":
|
||||||
from .avalanche import AvalancheAdapter
|
from .avalanche import AvalancheAdapter
|
||||||
self._adapters["avalanche"] = AvalancheAdapter(config.avalanche)
|
self._adapters["avalanche"] = AvalancheAdapter(config.avalanche)
|
||||||
|
|
||||||
if config.usgs.enabled:
|
if config.usgs.enabled and config.usgs.feed_source == "native":
|
||||||
from .usgs import USGSStreamsAdapter
|
from .usgs import USGSStreamsAdapter
|
||||||
self._adapters["usgs"] = USGSStreamsAdapter(config.usgs)
|
self._adapters["usgs"] = USGSStreamsAdapter(config.usgs)
|
||||||
|
|
||||||
if config.usgs_quake.enabled:
|
if config.usgs_quake.enabled and config.usgs_quake.feed_source == "native":
|
||||||
from .usgs_quake import USGSQuakeAdapter
|
from .usgs_quake import USGSQuakeAdapter
|
||||||
self._adapters["usgs_quake"] = USGSQuakeAdapter(config.usgs_quake)
|
self._adapters["usgs_quake"] = USGSQuakeAdapter(config.usgs_quake)
|
||||||
|
|
||||||
if config.traffic.enabled:
|
if config.traffic.enabled and config.traffic.feed_source == "native":
|
||||||
from .traffic import TomTomTrafficAdapter
|
from .traffic import TomTomTrafficAdapter
|
||||||
self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic)
|
self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic)
|
||||||
|
|
||||||
if config.roads511.enabled:
|
if config.roads511.enabled and config.roads511.feed_source == "native":
|
||||||
from .roads511 import Roads511Adapter
|
from .roads511 import Roads511Adapter
|
||||||
self._adapters["roads511"] = Roads511Adapter(config.roads511)
|
self._adapters["roads511"] = Roads511Adapter(config.roads511)
|
||||||
|
|
||||||
# FIRMS needs reference to NIFC adapter for cross-referencing
|
# FIRMS needs reference to NIFC adapter for cross-referencing
|
||||||
if config.firms.enabled:
|
if config.firms.enabled and config.firms.feed_source == "native":
|
||||||
from .firms import FIRMSAdapter
|
from .firms import FIRMSAdapter
|
||||||
fires_adapter = self._adapters.get("nifc")
|
fires_adapter = self._adapters.get("nifc")
|
||||||
self._firms = FIRMSAdapter(config.firms, self._region_anchors, fires_adapter)
|
self._firms = FIRMSAdapter(config.firms, self._region_anchors, fires_adapter)
|
||||||
self._adapters["firms"] = self._firms
|
self._adapters["firms"] = self._firms
|
||||||
|
|
||||||
|
_central = [n for n in ("nws", "swpc", "ducting", "fires", "avalanche", "usgs", "usgs_quake", "traffic", "roads511", "firms")
|
||||||
|
if getattr(getattr(config, n, None), "feed_source", "native") == "central"]
|
||||||
|
if _central:
|
||||||
|
logger.debug("Adapters sourced from Central (native skipped): %s", _central)
|
||||||
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
|
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
|
||||||
|
|
||||||
def refresh(self) -> bool:
|
def refresh(self) -> bool:
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@ class MeshAI:
|
||||||
self.event_bus = None # Notification pipeline EventBus (v0.3)
|
self.event_bus = None # Notification pipeline EventBus (v0.3)
|
||||||
self._pipeline_scheduler = None # DigestScheduler from start_pipeline()
|
self._pipeline_scheduler = None # DigestScheduler from start_pipeline()
|
||||||
self.env_store = None # Environmental feeds store
|
self.env_store = None # Environmental feeds store
|
||||||
|
self._central_consumer = None # Central NATS consumer (v0.4)
|
||||||
self._last_sub_check: float = 0.0
|
self._last_sub_check: float = 0.0
|
||||||
self.router: Optional[MessageRouter] = None
|
self.router: Optional[MessageRouter] = None
|
||||||
self.responder: Optional[Responder] = None
|
self.responder: Optional[Responder] = None
|
||||||
|
|
@ -90,6 +91,10 @@ class MeshAI:
|
||||||
self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config)
|
self._pipeline_scheduler = await start_pipeline(self.event_bus, self.config)
|
||||||
logger.info("Notification pipeline started")
|
logger.info("Notification pipeline started")
|
||||||
|
|
||||||
|
from .central.consumer import CentralConsumer
|
||||||
|
self._central_consumer = CentralConsumer(self.config.environmental, self.event_bus)
|
||||||
|
await self._central_consumer.start()
|
||||||
|
|
||||||
logger.info("MeshAI started successfully")
|
logger.info("MeshAI started successfully")
|
||||||
|
|
||||||
# Keep running
|
# Keep running
|
||||||
|
|
@ -183,6 +188,9 @@ class MeshAI:
|
||||||
from .notifications.pipeline import stop_pipeline
|
from .notifications.pipeline import stop_pipeline
|
||||||
await stop_pipeline(self._pipeline_scheduler)
|
await stop_pipeline(self._pipeline_scheduler)
|
||||||
|
|
||||||
|
if self._central_consumer is not None:
|
||||||
|
await self._central_consumer.stop()
|
||||||
|
|
||||||
if self.connector:
|
if self.connector:
|
||||||
self.connector.disconnect()
|
self.connector.disconnect()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ dependencies = [
|
||||||
"fastapi>=0.110.0",
|
"fastapi>=0.110.0",
|
||||||
"uvicorn[standard]>=0.27.0",
|
"uvicorn[standard]>=0.27.0",
|
||||||
"aiomqtt>=2.0.0",
|
"aiomqtt>=2.0.0",
|
||||||
|
"nats-py>=2.0.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|
|
||||||
139
tests/test_central_consumer.py
Normal file
139
tests/test_central_consumer.py
Normal file
|
|
@ -0,0 +1,139 @@
|
||||||
|
"""v0.4 C.1: Central connector backend — normalization, lifecycle, source gate."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from meshai.config import EnvironmentalConfig
|
||||||
|
from meshai.central.consumer import CentralConsumer, map_category, map_severity
|
||||||
|
from meshai.notifications.pipeline.bus import EventBus
|
||||||
|
|
||||||
|
|
||||||
|
def make_consumer():
|
||||||
|
env = EnvironmentalConfig()
|
||||||
|
bus = EventBus()
|
||||||
|
rec = []
|
||||||
|
bus.subscribe(rec.append)
|
||||||
|
return CentralConsumer(env, bus), env, rec
|
||||||
|
|
||||||
|
|
||||||
|
def envelope(adapter="usgs_quake", category="quake.event", severity=2,
|
||||||
|
eid="us6000abcd", centroid=(-114.5, 42.6), upstream=None,
|
||||||
|
time="2026-05-27T12:00:00Z", expires=None):
|
||||||
|
return {
|
||||||
|
"id": eid, "source": "central.echo6.co",
|
||||||
|
"type": f"central.{category}.v1", "time": time,
|
||||||
|
"centralcategory": category, "centralseverity": severity,
|
||||||
|
"specversion": "1.0", "datacontenttype": "application/json",
|
||||||
|
"data": {
|
||||||
|
"id": eid, "adapter": adapter, "category": category,
|
||||||
|
"time": time, "expires": expires, "severity": severity,
|
||||||
|
"geo": {"centroid": list(centroid), "bbox": None,
|
||||||
|
"regions": ["US-ID"], "primary_region": "US-ID"},
|
||||||
|
"data": upstream if upstream is not None else {"magnitude": 4.2, "place": "near Twin Falls"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class FakeMsg:
|
||||||
|
def __init__(self, subject, env):
|
||||||
|
self.subject = subject
|
||||||
|
self.data = json.dumps(env).encode()
|
||||||
|
self.acked = False
|
||||||
|
|
||||||
|
async def ack(self):
|
||||||
|
self.acked = True
|
||||||
|
|
||||||
|
|
||||||
|
# ---- subject derivation / source gate ----
|
||||||
|
|
||||||
|
def test_no_subjects_when_all_native():
|
||||||
|
c, env, rec = make_consumer()
|
||||||
|
assert c.subjects() == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_subjects_when_central():
|
||||||
|
c, env, rec = make_consumer()
|
||||||
|
env.usgs_quake.feed_source = "central"
|
||||||
|
assert "central.quake.>" in c.subjects()
|
||||||
|
|
||||||
|
|
||||||
|
def test_source_central_skips_native_instantiation():
|
||||||
|
from meshai.env.store import EnvironmentalStore
|
||||||
|
env = EnvironmentalConfig()
|
||||||
|
env.enabled = True
|
||||||
|
env.usgs_quake.enabled = True
|
||||||
|
env.usgs_quake.feed_source = "central" # should be skipped natively
|
||||||
|
env.nws.enabled = True # native -> present
|
||||||
|
store = EnvironmentalStore(config=env, region_anchors=[], event_bus=None)
|
||||||
|
assert "usgs_quake" not in store._adapters
|
||||||
|
assert "nws" in store._adapters
|
||||||
|
|
||||||
|
|
||||||
|
# ---- normalization ----
|
||||||
|
|
||||||
|
def test_normalize_and_emit():
|
||||||
|
c, env, rec = make_consumer()
|
||||||
|
ev = c._handle("central.quake.event.moderate", json.dumps(envelope()).encode())
|
||||||
|
assert ev is not None
|
||||||
|
assert len(rec) == 1
|
||||||
|
e = rec[0]
|
||||||
|
assert e.source == "usgs_quake"
|
||||||
|
assert e.category == "earthquake_event"
|
||||||
|
assert e.severity == "priority" # central severity 2
|
||||||
|
assert e.lat == 42.6 and e.lon == -114.5 # [lon,lat] -> (lat,lon)
|
||||||
|
assert e.group_key == "us6000abcd"
|
||||||
|
assert e.region == "US-ID"
|
||||||
|
assert e.data.get("magnitude") == 4.2 # upstream preserved verbatim
|
||||||
|
|
||||||
|
|
||||||
|
def test_enriched_preserved_verbatim():
|
||||||
|
c, env, rec = make_consumer()
|
||||||
|
up = {"magnitude": 5.1, "_enriched": {"geocoder": {"state": "Idaho"}, "usgs_stats": {"x": 1}}}
|
||||||
|
ev = c._handle("central.quake.event.strong", json.dumps(envelope(severity=4, upstream=up)).encode())
|
||||||
|
assert ev.severity == "immediate"
|
||||||
|
assert ev.data["_enriched"]["geocoder"]["state"] == "Idaho"
|
||||||
|
assert ev.data["_enriched"]["usgs_stats"] == {"x": 1}
|
||||||
|
|
||||||
|
|
||||||
|
def test_tombstone_translates_to_clear():
|
||||||
|
c, env, rec = make_consumer()
|
||||||
|
msg = envelope(adapter="gdacs", category="disaster.fl.removed", severity=0, eid="FL1103885:removed")
|
||||||
|
ev = c._handle("central.disaster.fl.removed.austria", json.dumps(msg).encode())
|
||||||
|
assert ev is not None
|
||||||
|
assert ev.group_key == "FL1103885" # ':removed' stripped -> matches original
|
||||||
|
assert ev.data.get("_central_tombstone") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_severity_mapping():
|
||||||
|
assert map_severity(0) == "routine"
|
||||||
|
assert map_severity(1) == "routine"
|
||||||
|
assert map_severity(2) == "priority"
|
||||||
|
assert map_severity(3) == "immediate"
|
||||||
|
assert map_severity(4) == "immediate"
|
||||||
|
assert map_severity(None) == "routine"
|
||||||
|
|
||||||
|
|
||||||
|
def test_category_mapping():
|
||||||
|
assert map_category("wx.alert.severe_thunderstorm_warning") == "weather_warning"
|
||||||
|
assert map_category("quake.event") == "earthquake_event"
|
||||||
|
assert map_category("fire.hotspot.viirs_noaa20.high") == "wildfire_hotspot"
|
||||||
|
assert map_category("hydro.00060.usgs.06901250") == "stream_flow"
|
||||||
|
|
||||||
|
|
||||||
|
# ---- async callback path ----
|
||||||
|
|
||||||
|
def test_on_message_emits_and_acks():
|
||||||
|
c, env, rec = make_consumer()
|
||||||
|
msg = FakeMsg("central.quake.event.moderate", envelope())
|
||||||
|
asyncio.run(c._on_message(msg))
|
||||||
|
assert msg.acked is True
|
||||||
|
assert len(rec) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_start_no_op_when_all_native():
|
||||||
|
"""start() is a no-op (no NATS connect) when no adapter is central."""
|
||||||
|
c, env, rec = make_consumer()
|
||||||
|
asyncio.run(c.start()) # must not raise / must not require NATS
|
||||||
|
assert c._nc is None
|
||||||
50
tests/test_config_source_field.py
Normal file
50
tests/test_config_source_field.py
Normal file
|
|
@ -0,0 +1,50 @@
|
||||||
|
"""v0.4 C.1: per-adapter `source` field + CentralConsumerConfig."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from meshai.config import (
|
||||||
|
NWSConfig, FIRMSConfig, USGSQuakeConfig,
|
||||||
|
EnvironmentalConfig, CentralConsumerConfig,
|
||||||
|
)
|
||||||
|
|
||||||
|
_ADAPTERS = ("nws", "swpc", "ducting", "fires", "avalanche",
|
||||||
|
"usgs", "usgs_quake", "traffic", "roads511", "firms")
|
||||||
|
|
||||||
|
|
||||||
|
def test_source_defaults_native():
|
||||||
|
assert NWSConfig().feed_source == "native"
|
||||||
|
assert FIRMSConfig().feed_source == "native"
|
||||||
|
assert USGSQuakeConfig().feed_source == "native"
|
||||||
|
|
||||||
|
|
||||||
|
def test_all_adapters_default_native():
|
||||||
|
env = EnvironmentalConfig()
|
||||||
|
for attr in _ADAPTERS:
|
||||||
|
assert getattr(env, attr).feed_source == "native", attr
|
||||||
|
|
||||||
|
|
||||||
|
def test_source_central_validates():
|
||||||
|
assert NWSConfig(feed_source="central").feed_source == "central"
|
||||||
|
assert USGSQuakeConfig(feed_source="central").feed_source == "central"
|
||||||
|
|
||||||
|
|
||||||
|
def test_source_garbage_rejects():
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
NWSConfig(feed_source="garbage")
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
FIRMSConfig(feed_source="")
|
||||||
|
|
||||||
|
|
||||||
|
def test_environmental_has_central_default():
|
||||||
|
env = EnvironmentalConfig()
|
||||||
|
assert isinstance(env.central, CentralConsumerConfig)
|
||||||
|
assert env.central.enabled is False
|
||||||
|
assert env.central.url.startswith("nats://")
|
||||||
|
|
||||||
|
|
||||||
|
def test_source_field_survives_dict_coercion():
|
||||||
|
"""A `source` in yaml/dict is coerced onto the adapter config."""
|
||||||
|
from meshai.config import Config, _dict_to_dataclass
|
||||||
|
cfg = _dict_to_dataclass(Config, {"environmental": {"usgs_quake": {"enabled": True, "feed_source": "central"}}})
|
||||||
|
assert cfg.environmental.usgs_quake.feed_source == "central"
|
||||||
|
assert cfg.environmental.nws.feed_source == "native" # untouched default
|
||||||
Loading…
Add table
Add a link
Reference in a new issue