From b36223146f6ed09f6ecff990c6cf1fb29bae8be8 Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Wed, 10 Jun 2026 20:25:50 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20harden=20EnvironmentalStore=20adapter=20?= =?UTF-8?q?init=20=E2=80=94=20isolate=20failures?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wrap each adapter registration in try/except via _register_adapter() - Track failed adapters in _failed_adapters dict with error message - Add get_status() method for /api/env/status to report failed adapters - No single adapter failure can abort loading of remaining adapters Co-Authored-By: Claude Opus 4.5 --- meshai/env/store.py | 120 +++++++++++++++++++++++++++++--------------- 1 file changed, 80 insertions(+), 40 deletions(-) diff --git a/meshai/env/store.py b/meshai/env/store.py index 953de1c..889a088 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -21,6 +21,7 @@ class EnvironmentalStore: event_bus: Optional["EventBus"] = None, ): self._adapters = {} # name -> adapter instance + self._failed_adapters = {} # name -> last_error string self._events = {} # (source, event_id) -> event dict self._event_bus = event_bus # Pipeline EventBus for emission self._swpc_status = {} # Kp/SFI/scales snapshot @@ -28,56 +29,60 @@ class EnvironmentalStore: self._mesh_zones = config.nws_zones or [] self._region_anchors = region_anchors or [] - # Create adapter instances based on config - if config.nws.enabled and config.nws.feed_source == "native": - from .nws import NWSAlertsAdapter - self._adapters["nws"] = NWSAlertsAdapter(config.nws) - - if config.swpc.enabled and config.swpc.feed_source == "native": - from .swpc import SWPCAdapter - self._adapters["swpc"] = SWPCAdapter(config.swpc) - - if config.ducting.enabled and config.ducting.feed_source == "native": - from .ducting import DuctingAdapter - self._adapters["ducting"] = DuctingAdapter(config.ducting) - - if config.fires.enabled and config.fires.feed_source == "native": - from .fires import NICFFiresAdapter - self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors) - - if config.avalanche.enabled and config.avalanche.feed_source == "native": - from .avalanche import AvalancheAdapter - self._adapters["avalanche"] = AvalancheAdapter(config.avalanche) - - if config.usgs.enabled and config.usgs.feed_source == "native": - from .usgs import USGSStreamsAdapter - self._adapters["usgs"] = USGSStreamsAdapter(config.usgs) - - if config.usgs_quake.enabled and config.usgs_quake.feed_source == "native": - from .usgs_quake import USGSQuakeAdapter - self._adapters["usgs_quake"] = USGSQuakeAdapter(config.usgs_quake) - - if config.traffic.enabled and config.traffic.feed_source == "native": - from .traffic import TomTomTrafficAdapter - self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic) - - if config.roads511.enabled and config.roads511.feed_source == "native": - from .roads511 import Roads511Adapter - self._adapters["roads511"] = Roads511Adapter(config.roads511) + # Create adapter instances with error isolation + self._register_adapter("nws", config.nws, ".nws", "NWSAlertsAdapter", + lambda cfg: (cfg,)) + self._register_adapter("swpc", config.swpc, ".swpc", "SWPCAdapter", + lambda cfg: (cfg,)) + self._register_adapter("ducting", config.ducting, ".ducting", "DuctingAdapter", + lambda cfg: (cfg,)) + self._register_adapter("nifc", config.fires, ".fires", "NICFFiresAdapter", + lambda cfg: (cfg, self._region_anchors)) + self._register_adapter("avalanche", config.avalanche, ".avalanche", "AvalancheAdapter", + lambda cfg: (cfg,)) + self._register_adapter("usgs", config.usgs, ".usgs", "USGSStreamsAdapter", + lambda cfg: (cfg,)) + self._register_adapter("usgs_quake", config.usgs_quake, ".usgs_quake", "USGSQuakeAdapter", + lambda cfg: (cfg,)) + self._register_adapter("traffic", config.traffic, ".traffic", "TomTomTrafficAdapter", + lambda cfg: (cfg,)) + self._register_adapter("roads511", config.roads511, ".roads511", "Roads511Adapter", + lambda cfg: (cfg,)) # FIRMS needs reference to NIFC adapter for cross-referencing if config.firms.enabled and config.firms.feed_source == "native": - from .firms import FIRMSAdapter - fires_adapter = self._adapters.get("nifc") - self._firms = FIRMSAdapter(config.firms, self._region_anchors, fires_adapter) - self._adapters["firms"] = self._firms + try: + from .firms import FIRMSAdapter + fires_adapter = self._adapters.get("nifc") + self._firms = FIRMSAdapter(config.firms, self._region_anchors, fires_adapter) + self._adapters["firms"] = self._firms + except Exception as e: + err_msg = f"{type(e).__name__}: {e}" + logger.warning("Failed to initialize firms adapter: %s", err_msg) + self._failed_adapters["firms"] = err_msg _central = [n for n in ("nws", "swpc", "ducting", "fires", "avalanche", "usgs", "usgs_quake", "traffic", "roads511", "firms") if getattr(getattr(config, n, None), "feed_source", "native") == "central"] if _central: logger.debug("Adapters sourced from Central (native skipped): %s", _central) + if self._failed_adapters: + logger.warning("Failed adapters: %s", list(self._failed_adapters.keys())) logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters") + + def _register_adapter(self, name: str, cfg, module_path: str, class_name: str, args_fn): + """Register a single adapter with error isolation.""" + if not cfg.enabled or cfg.feed_source != "native": + return + try: + module = __import__(f"meshai.env{module_path}", fromlist=[class_name]) + cls = getattr(module, class_name) + self._adapters[name] = cls(*args_fn(cfg)) + except Exception as e: + err_msg = f"{type(e).__name__}: {e}" + logger.warning("Failed to initialize %s adapter: %s", name, err_msg) + self._failed_adapters[name] = err_msg + def refresh(self) -> bool: """Called every second from main loop. Ticks each adapter. @@ -302,6 +307,41 @@ class EnvironmentalStore: return "\n".join(lines) + + def get_status(self) -> list: + """Get status of all adapters including failed ones.""" + status = [] + for name, adapter in self._adapters.items(): + try: + hs = adapter.health_status + status.append({ + "source": name, + "is_loaded": True, + "last_error": hs.get("last_error"), + "consecutive_errors": hs.get("consecutive_errors", 0), + "event_count": hs.get("event_count", 0), + "last_fetch": hs.get("last_fetch"), + }) + except Exception: + status.append({ + "source": name, + "is_loaded": True, + "last_error": None, + "consecutive_errors": 0, + "event_count": 0, + "last_fetch": None, + }) + for name, error in self._failed_adapters.items(): + status.append({ + "source": name, + "is_loaded": False, + "last_error": error, + "consecutive_errors": 0, + "event_count": 0, + "last_fetch": None, + }) + return status + def get_source_health(self) -> list: """Get health status for all adapters.""" return [a.health_status for a in self._adapters.values()]