fix: harden EnvironmentalStore adapter init — isolate failures

- Wrap each adapter registration in try/except via _register_adapter()
- Track failed adapters in _failed_adapters dict with error message
- Add get_status() method for /api/env/status to report failed adapters
- No single adapter failure can abort loading of remaining adapters

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson (via Claude) 2026-06-10 20:25:50 +00:00
commit b36223146f

112
meshai/env/store.py vendored
View file

@ -21,6 +21,7 @@ class EnvironmentalStore:
event_bus: Optional["EventBus"] = None,
):
self._adapters = {} # name -> adapter instance
self._failed_adapters = {} # name -> last_error string
self._events = {} # (source, event_id) -> event dict
self._event_bus = event_bus # Pipeline EventBus for emission
self._swpc_status = {} # Kp/SFI/scales snapshot
@ -28,56 +29,60 @@ class EnvironmentalStore:
self._mesh_zones = config.nws_zones or []
self._region_anchors = region_anchors or []
# Create adapter instances based on config
if config.nws.enabled and config.nws.feed_source == "native":
from .nws import NWSAlertsAdapter
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
if config.swpc.enabled and config.swpc.feed_source == "native":
from .swpc import SWPCAdapter
self._adapters["swpc"] = SWPCAdapter(config.swpc)
if config.ducting.enabled and config.ducting.feed_source == "native":
from .ducting import DuctingAdapter
self._adapters["ducting"] = DuctingAdapter(config.ducting)
if config.fires.enabled and config.fires.feed_source == "native":
from .fires import NICFFiresAdapter
self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors)
if config.avalanche.enabled and config.avalanche.feed_source == "native":
from .avalanche import AvalancheAdapter
self._adapters["avalanche"] = AvalancheAdapter(config.avalanche)
if config.usgs.enabled and config.usgs.feed_source == "native":
from .usgs import USGSStreamsAdapter
self._adapters["usgs"] = USGSStreamsAdapter(config.usgs)
if config.usgs_quake.enabled and config.usgs_quake.feed_source == "native":
from .usgs_quake import USGSQuakeAdapter
self._adapters["usgs_quake"] = USGSQuakeAdapter(config.usgs_quake)
if config.traffic.enabled and config.traffic.feed_source == "native":
from .traffic import TomTomTrafficAdapter
self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic)
if config.roads511.enabled and config.roads511.feed_source == "native":
from .roads511 import Roads511Adapter
self._adapters["roads511"] = Roads511Adapter(config.roads511)
# Create adapter instances with error isolation
self._register_adapter("nws", config.nws, ".nws", "NWSAlertsAdapter",
lambda cfg: (cfg,))
self._register_adapter("swpc", config.swpc, ".swpc", "SWPCAdapter",
lambda cfg: (cfg,))
self._register_adapter("ducting", config.ducting, ".ducting", "DuctingAdapter",
lambda cfg: (cfg,))
self._register_adapter("nifc", config.fires, ".fires", "NICFFiresAdapter",
lambda cfg: (cfg, self._region_anchors))
self._register_adapter("avalanche", config.avalanche, ".avalanche", "AvalancheAdapter",
lambda cfg: (cfg,))
self._register_adapter("usgs", config.usgs, ".usgs", "USGSStreamsAdapter",
lambda cfg: (cfg,))
self._register_adapter("usgs_quake", config.usgs_quake, ".usgs_quake", "USGSQuakeAdapter",
lambda cfg: (cfg,))
self._register_adapter("traffic", config.traffic, ".traffic", "TomTomTrafficAdapter",
lambda cfg: (cfg,))
self._register_adapter("roads511", config.roads511, ".roads511", "Roads511Adapter",
lambda cfg: (cfg,))
# FIRMS needs reference to NIFC adapter for cross-referencing
if config.firms.enabled and config.firms.feed_source == "native":
try:
from .firms import FIRMSAdapter
fires_adapter = self._adapters.get("nifc")
self._firms = FIRMSAdapter(config.firms, self._region_anchors, fires_adapter)
self._adapters["firms"] = self._firms
except Exception as e:
err_msg = f"{type(e).__name__}: {e}"
logger.warning("Failed to initialize firms adapter: %s", err_msg)
self._failed_adapters["firms"] = err_msg
_central = [n for n in ("nws", "swpc", "ducting", "fires", "avalanche", "usgs", "usgs_quake", "traffic", "roads511", "firms")
if getattr(getattr(config, n, None), "feed_source", "native") == "central"]
if _central:
logger.debug("Adapters sourced from Central (native skipped): %s", _central)
if self._failed_adapters:
logger.warning("Failed adapters: %s", list(self._failed_adapters.keys()))
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
def _register_adapter(self, name: str, cfg, module_path: str, class_name: str, args_fn):
"""Register a single adapter with error isolation."""
if not cfg.enabled or cfg.feed_source != "native":
return
try:
module = __import__(f"meshai.env{module_path}", fromlist=[class_name])
cls = getattr(module, class_name)
self._adapters[name] = cls(*args_fn(cfg))
except Exception as e:
err_msg = f"{type(e).__name__}: {e}"
logger.warning("Failed to initialize %s adapter: %s", name, err_msg)
self._failed_adapters[name] = err_msg
def refresh(self) -> bool:
"""Called every second from main loop. Ticks each adapter.
@ -302,6 +307,41 @@ class EnvironmentalStore:
return "\n".join(lines)
def get_status(self) -> list:
"""Get status of all adapters including failed ones."""
status = []
for name, adapter in self._adapters.items():
try:
hs = adapter.health_status
status.append({
"source": name,
"is_loaded": True,
"last_error": hs.get("last_error"),
"consecutive_errors": hs.get("consecutive_errors", 0),
"event_count": hs.get("event_count", 0),
"last_fetch": hs.get("last_fetch"),
})
except Exception:
status.append({
"source": name,
"is_loaded": True,
"last_error": None,
"consecutive_errors": 0,
"event_count": 0,
"last_fetch": None,
})
for name, error in self._failed_adapters.items():
status.append({
"source": name,
"is_loaded": False,
"last_error": error,
"consecutive_errors": 0,
"event_count": 0,
"last_fetch": None,
})
return status
def get_source_health(self) -> list:
"""Get health status for all adapters."""
return [a.health_status for a in self._adapters.values()]