From 9e3f940a1b9cf26f669be6ce2ce3c91e8ed8d134 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 15:13:56 +0000 Subject: [PATCH 1/9] feat(config): add multi-file config loader with !include support - Add config_loader.py with !include directive support - Environment variable interpolation with default syntax - local.yaml merging for operator-identifying values - Secret loading from /data/secrets/.env - save_section() for dashboard write-back - Cycle detection for include directives - Graceful degradation when files missing Co-Authored-By: Claude Opus 4.5 --- meshai/config_loader.py | 680 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 680 insertions(+) create mode 100644 meshai/config_loader.py diff --git a/meshai/config_loader.py b/meshai/config_loader.py new file mode 100644 index 0000000..ff6ddfe --- /dev/null +++ b/meshai/config_loader.py @@ -0,0 +1,680 @@ +"""Multi-file configuration loader for MeshAI v0.3. + +This module provides: +- !include directive support for splitting config across files +- Environment variable interpolation (${VAR_NAME} and ${VAR_NAME:-default}) +- Operator-local value merging from local.yaml +- Secret loading from .env files +- Section-aware save_section() for dashboard write-back + +The loader produces the same Config dataclass shape as config.py, +ensuring backward compatibility with all existing consumers. +""" + +import logging +import os +import re +from pathlib import Path +from typing import Any, Optional + +import yaml +from dotenv import dotenv_values + +# Import existing dataclasses - shape must NOT change +from .config import ( + Config, + _dict_to_dataclass, + _dataclass_to_dict, +) + +_logger = logging.getLogger(__name__) + +# ============================================================================= +# SECTION TO FILE MAPPING +# ============================================================================= + +SECTION_TO_FILE: dict[str, str] = { + # Inline in orchestrator config.yaml + "timezone": "config.yaml", + "bot": "config.yaml", + "response": "config.yaml", + "history": "config.yaml", + "memory": "config.yaml", + "context": "config.yaml", + "weather": "config.yaml", + "meshmonitor": "config.yaml", + "knowledge": "config.yaml", + + # Domain files + "connection": "meshtastic.yaml", + "commands": "meshtastic.yaml", + "mesh_sources": "mesh_sources.yaml", + "mesh_intelligence": "mesh_intelligence.yaml", + "environmental": "env_feeds.yaml", + "notifications": "notifications.yaml", + "llm": "llm.yaml", + "dashboard": "dashboard.yaml", +} + +# Fields that should be written to local.yaml instead of domain files +LOCAL_FIELDS: dict[str, str] = { + "bot.name": "identity.name", + "bot.owner": "identity.owner", + "connection.tcp_host": "infrastructure.tcp_host", + "knowledge.qdrant_host": "infrastructure.qdrant_host", + "knowledge.tei_host": "infrastructure.tei_host", + "knowledge.sparse_host": "infrastructure.sparse_host", + "meshmonitor.url": "mesh_sources.meshmonitor_url", + "mesh_intelligence.critical_nodes": "critical_nodes", + "environmental.ducting.latitude": "env_center.latitude", + "environmental.ducting.longitude": "env_center.longitude", +} + +# Fields that contain secrets - NEVER written, must be in .env +SECRET_FIELDS: set[str] = { + "llm.api_key", + "mesh_sources.*.api_token", + "mesh_sources.*.password", + "environmental.traffic.api_key", + "environmental.firms.map_key", + "notifications.rules.*.smtp_password", +} + +# Secret env var names expected in .env +EXPECTED_SECRETS: list[str] = [ + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "GOOGLE_API_KEY", + "MESHMONITOR_API_TOKEN", + "MQTT_PASSWORD", + "TOMTOM_API_KEY", + "FIRMS_MAP_KEY", + "SMTP_PASSWORD", +] + + +# ============================================================================= +# YAML !INCLUDE CONSTRUCTOR +# ============================================================================= + +# Global set for tracking files currently being loaded (cycle detection) +_loading_files: set[Path] = set() + + +def _make_include_loader(base_path: Path): + """Create an IncludeLoader class with the given base path.""" + + class IncludeLoader(yaml.SafeLoader): + """YAML loader with !include tag support.""" + pass + + def construct_include(loader: IncludeLoader, node: yaml.Node) -> Any: + """Handle !include directive.""" + relative_path = loader.construct_scalar(node) + include_path = (base_path / relative_path).resolve() + + # Cycle detection using global set + if include_path in _loading_files: + raise yaml.YAMLError( + f"Circular include detected: {include_path} is already being loaded. " + f"Current loading chain: {[str(p) for p in _loading_files]}" + ) + + if not include_path.exists(): + raise yaml.YAMLError( + f"Include file not found: {include_path} " + f"(referenced from {base_path / 'config.yaml'})" + ) + + _loading_files.add(include_path) + try: + with open(include_path, "r") as f: + # Recursively load with the include file's directory as new base + NestedLoader = _make_include_loader(include_path.parent) + return yaml.load(f, Loader=NestedLoader) + finally: + _loading_files.discard(include_path) + + IncludeLoader.add_constructor("!include", construct_include) + return IncludeLoader + + +def _load_yaml_with_includes(file_path: Path) -> dict: + """Load a YAML file with !include directive support.""" + global _loading_files + _loading_files.clear() # Reset cycle detection + + if not file_path.exists(): + return {} + + # Add the root file to loading set + file_path = file_path.resolve() + _loading_files.add(file_path) + + try: + with open(file_path, "r") as f: + Loader = _make_include_loader(file_path.parent) + return yaml.load(f, Loader=Loader) or {} + finally: + _loading_files.discard(file_path) + + +# ============================================================================= +# ENVIRONMENT VARIABLE INTERPOLATION +# ============================================================================= + +_ENV_PATTERN = re.compile(r"\$\{([A-Z_][A-Z0-9_]*)(?::-([^}]*))?\}") + + +def _interpolate_env_vars(value: Any, env: dict[str, str]) -> Any: + """Recursively interpolate ${VAR_NAME} and ${VAR_NAME:-default} in strings. + + Args: + value: The value to interpolate (can be string, dict, list, or other) + env: Combined environment (os.environ + .env file values) + + Returns: + The value with environment variables resolved + """ + if isinstance(value, str): + def replace_match(match): + var_name = match.group(1) + default = match.group(2) + + # os.environ takes precedence over .env file + resolved = os.environ.get(var_name) + if resolved is None: + resolved = env.get(var_name) + if resolved is None: + if default is not None: + return default + _logger.warning( + f"Environment variable ${{{var_name}}} not found and no default provided. " + "Using empty string." + ) + return "" + return resolved + + return _ENV_PATTERN.sub(replace_match, value) + + elif isinstance(value, dict): + return {k: _interpolate_env_vars(v, env) for k, v in value.items()} + + elif isinstance(value, list): + return [_interpolate_env_vars(item, env) for item in value] + + return value + + +# ============================================================================= +# LOCAL.YAML MERGING +# ============================================================================= + +def _merge_local_values(data: dict, local: dict) -> dict: + """Merge operator-local values from local.yaml into the config data. + + This handles: + - identity.name/owner -> bot.name/owner + - infrastructure.* -> connection/knowledge hosts + - regions.{name}.lat/lon -> mesh_intelligence.regions[name].lat/lon + - critical_nodes -> mesh_intelligence.critical_nodes + - mesh_sources.sources.{name}.* -> mesh_sources[name].* + - env_center.* -> environmental.ducting.* + - notification_targets.* -> notifications rules + + Args: + data: The loaded config data (will be modified in place) + local: The local.yaml data + + Returns: + The merged data dict + """ + if not local: + return data + + # Identity -> bot + identity = local.get("identity", {}) + if "bot" in data: + if identity.get("name"): + data["bot"]["name"] = identity["name"] + if identity.get("owner"): + data["bot"]["owner"] = identity["owner"] + + # Infrastructure hosts + infra = local.get("infrastructure", {}) + if infra.get("tcp_host") and "connection" in data: + data["connection"]["tcp_host"] = infra["tcp_host"] + if "knowledge" in data: + if infra.get("qdrant_host"): + data["knowledge"]["qdrant_host"] = infra["qdrant_host"] + if infra.get("tei_host"): + data["knowledge"]["tei_host"] = infra["tei_host"] + if infra.get("sparse_host"): + data["knowledge"]["sparse_host"] = infra["sparse_host"] + + # Meshmonitor URL + mesh_sources_local = local.get("mesh_sources", {}) + if mesh_sources_local.get("meshmonitor_url") and "meshmonitor" in data: + data["meshmonitor"]["url"] = mesh_sources_local["meshmonitor_url"] + + # Mesh sources URLs + sources_local = mesh_sources_local.get("sources", {}) + if "mesh_sources" in data and isinstance(data["mesh_sources"], list): + for source in data["mesh_sources"]: + if isinstance(source, dict): + source_name = source.get("name", "") + local_source = sources_local.get(source_name, {}) + if local_source.get("url"): + source["url"] = local_source["url"] + if local_source.get("host"): + source["host"] = local_source["host"] + + # Region coordinates + regions_local = local.get("regions", {}) + if "mesh_intelligence" in data: + mi = data["mesh_intelligence"] + if "regions" in mi and isinstance(mi["regions"], list): + for region in mi["regions"]: + if isinstance(region, dict): + region_name = region.get("name", "") + local_coords = regions_local.get(region_name, {}) + if "lat" in local_coords: + region["lat"] = local_coords["lat"] + if "lon" in local_coords: + region["lon"] = local_coords["lon"] + + # Critical nodes + if local.get("critical_nodes"): + mi["critical_nodes"] = local["critical_nodes"] + + # Environmental center point + env_center = local.get("env_center", {}) + if "environmental" in data: + env = data["environmental"] + if "ducting" in env: + if env_center.get("latitude") is not None: + env["ducting"]["latitude"] = env_center["latitude"] + if env_center.get("longitude") is not None: + env["ducting"]["longitude"] = env_center["longitude"] + + # NWS user agent from contact email + if identity.get("contact_email") and "nws" in env: + email = identity["contact_email"] + env["nws"]["user_agent"] = f"(meshai, {email})" + + # Notification targets + notif_targets = local.get("notification_targets", {}) + if "notifications" in data and "rules" in data["notifications"]: + alert_node_ids = notif_targets.get("alert_node_ids", []) + smtp_recipients = notif_targets.get("smtp_recipients", []) + + for rule in data["notifications"]["rules"]: + if isinstance(rule, dict): + # Apply default node_ids if not set + if rule.get("delivery_type") == "mesh_dm" and not rule.get("node_ids"): + rule["node_ids"] = alert_node_ids + # Apply default recipients if not set + if rule.get("delivery_type") == "email" and not rule.get("recipients"): + rule["recipients"] = smtp_recipients + # Apply smtp_from + if notif_targets.get("smtp_from") and not rule.get("from_address"): + rule["from_address"] = notif_targets["smtp_from"] + + return data + + +# ============================================================================= +# VALIDATION +# ============================================================================= + +def _validate_config(data: dict, local: dict, env: dict[str, str]) -> None: + """Validate config and log warnings for missing values. + + This does NOT raise errors - MeshAI starts in degraded mode with missing values. + """ + # Check regions for missing coordinates + if "mesh_intelligence" in data: + mi = data["mesh_intelligence"] + if mi.get("enabled") and "regions" in mi: + regions_local = local.get("regions", {}) if local else {} + for region in mi["regions"]: + if isinstance(region, dict): + region_name = region.get("name", "unknown") + if not region.get("lat") or not region.get("lon"): + if region_name not in regions_local: + _logger.warning( + f"Region '{region_name}' has no coordinates in local.yaml - " + "geographic features disabled for this region" + ) + + # Check for missing secrets + missing_secrets = [] + for secret in EXPECTED_SECRETS: + if not os.environ.get(secret) and not env.get(secret): + missing_secrets.append(secret) + + if missing_secrets: + _logger.warning( + f"Missing secret environment variables: {', '.join(missing_secrets)}. " + "Some features may be disabled." + ) + + # Check LLM API key + if "llm" in data: + api_key = data["llm"].get("api_key", "") + if not api_key or (api_key.startswith("${") and api_key.endswith("}")): + # It's a reference, check if resolved + backend = data["llm"].get("backend", "openai").lower() + key_var = { + "openai": "OPENAI_API_KEY", + "anthropic": "ANTHROPIC_API_KEY", + "google": "GOOGLE_API_KEY", + }.get(backend, "LLM_API_KEY") + if not os.environ.get(key_var) and not env.get(key_var): + _logger.warning( + f"LLM backend '{backend}' configured but {key_var} not found. " + "LLM responses will fail." + ) + + +# ============================================================================= +# MAIN LOADER +# ============================================================================= + +def load_config(config_dir: Path = Path("/data/config")) -> Config: + """Load configuration from multi-file layout. + + This function: + 1. Reads config.yaml (orchestrator) with !include directives + 2. Reads local.yaml if present (operator-local values) + 3. Reads /data/secrets/.env if present (secret values) + 4. Interpolates ${VAR_NAME} references + 5. Merges local values into config + 6. Validates and logs warnings for missing values + 7. Returns the same Config dataclass shape + + Args: + config_dir: Path to config directory (default: /data/config) + + Returns: + Config dataclass instance + """ + config_dir = Path(config_dir) + + # Determine config file path + # Support both new layout (/data/config/config.yaml) and legacy (/data/config.yaml) + orchestrator_path = config_dir / "config.yaml" + legacy_path = config_dir.parent / "config.yaml" if config_dir.name == "config" else None + + if not orchestrator_path.exists(): + if legacy_path and legacy_path.exists(): + # Fall back to legacy single-file config + _logger.info(f"Using legacy config at {legacy_path}") + from .config import load_config as legacy_load + return legacy_load(legacy_path) + else: + _logger.warning( + f"Config file not found at {orchestrator_path}. " + "Using default configuration." + ) + config = Config() + config._config_path = orchestrator_path + return config + + # Load orchestrator with !include support + _logger.debug(f"Loading config from {orchestrator_path}") + data = _load_yaml_with_includes(orchestrator_path) + + # Load local.yaml + local_path = config_dir / "local.yaml" + local_data = {} + if local_path.exists(): + with open(local_path, "r") as f: + local_data = yaml.safe_load(f) or {} + _logger.debug(f"Loaded local config from {local_path}") + else: + _logger.warning( + f"No local.yaml found at {local_path}. " + "MeshAI is in no-location mode - geographic features disabled." + ) + + # Load secrets from .env + secrets_path = config_dir.parent / "secrets" / ".env" + env_data = {} + if secrets_path.exists(): + env_data = dotenv_values(secrets_path) + _logger.debug(f"Loaded {len(env_data)} secrets from {secrets_path}") + else: + # Try alternate location + alt_secrets_path = Path("/data/secrets/.env") + if alt_secrets_path.exists(): + env_data = dotenv_values(alt_secrets_path) + _logger.debug(f"Loaded {len(env_data)} secrets from {alt_secrets_path}") + else: + _logger.warning( + f"No .env file found at {secrets_path}. " + "API keys must be set via environment variables." + ) + + # Interpolate environment variables + data = _interpolate_env_vars(data, env_data) + + # Merge local values + data = _merge_local_values(data, local_data) + + # Validate and warn + _validate_config(data, local_data, env_data) + + # Convert to Config dataclass + config = _dict_to_dataclass(Config, data) + config._config_path = orchestrator_path + + return config + + +# ============================================================================= +# SECTION SAVER +# ============================================================================= + +def _is_secret_field(section: str, field_path: str) -> bool: + """Check if a field path matches a secret field pattern.""" + full_path = f"{section}.{field_path}" if field_path else section + + for pattern in SECRET_FIELDS: + # Convert pattern to regex + regex = pattern.replace(".", r"\.").replace("*", r"[^.]+") + if re.match(f"^{regex}$", full_path): + return True + return False + + +def _extract_local_fields(section: str, data: dict) -> tuple[dict, dict]: + """Extract local fields from data. + + Returns: + (domain_data, local_data) - data split by destination + """ + domain_data = dict(data) + local_data = {} + + # Check each LOCAL_FIELDS pattern + for field_pattern, local_path in LOCAL_FIELDS.items(): + if not field_pattern.startswith(f"{section}."): + continue + + # Extract field name from pattern + field_name = field_pattern[len(section) + 1:] # Remove "section." + + if ".*." in field_name: + # Array field pattern - handle specially + continue + + if field_name in domain_data: + # Move to local_data using the local_path + value = domain_data.pop(field_name) + # Build nested structure in local_data + parts = local_path.split(".") + current = local_data + for part in parts[:-1]: + if part not in current: + current[part] = {} + current = current[part] + current[parts[-1]] = value + + return domain_data, local_data + + +def save_section( + section_name: str, + data: dict, + config_dir: Path = Path("/data/config"), +) -> dict: + """Save a configuration section to the appropriate file(s). + + This function: + 1. Determines which file(s) the section belongs to + 2. Extracts local-identifying fields to local.yaml + 3. Rejects attempts to save secret fields + 4. Writes domain data to the appropriate file + 5. Writes local data to local.yaml + + Args: + section_name: Name of the section (e.g., "notifications", "llm") + data: The section data as a dict + config_dir: Path to config directory + + Returns: + Dict with status: {"saved": True, "files_written": [...], "rejected_secrets": [...]} + + Raises: + ValueError: If section_name is not recognized + """ + config_dir = Path(config_dir) + + if section_name not in SECTION_TO_FILE: + raise ValueError( + f"Unknown section '{section_name}'. " + f"Valid sections: {', '.join(sorted(SECTION_TO_FILE.keys()))}" + ) + + target_file = SECTION_TO_FILE[section_name] + target_path = config_dir / target_file + local_path = config_dir / "local.yaml" + + files_written = [] + rejected_secrets = [] + + # Check for secret fields and reject them + def check_secrets(d: dict, path: str = "") -> dict: + cleaned = {} + for key, value in d.items(): + field_path = f"{path}.{key}" if path else key + if _is_secret_field(section_name, field_path): + rejected_secrets.append(field_path) + _logger.error( + f"Rejected attempt to save secret field '{section_name}.{field_path}'. " + "Secret fields must be set via /data/secrets/.env" + ) + elif isinstance(value, dict): + cleaned[key] = check_secrets(value, field_path) + elif isinstance(value, list): + cleaned[key] = [ + check_secrets(item, f"{field_path}[{i}]") + if isinstance(item, dict) else item + for i, item in enumerate(value) + ] + else: + cleaned[key] = value + return cleaned + + data = check_secrets(data) + + # Extract local fields + domain_data, local_updates = _extract_local_fields(section_name, data) + + # Load existing target file + if target_path.exists(): + with open(target_path, "r") as f: + existing = yaml.safe_load(f) or {} + else: + existing = {} + + # Handle sections that share a file (meshtastic.yaml has both connection and commands) + if target_file == "meshtastic.yaml": + existing[section_name] = domain_data + elif target_file == "config.yaml": + # For orchestrator, update the section in place + existing[section_name] = domain_data + else: + # For dedicated files, the whole file IS the section + existing = domain_data + + # Write domain file + with open(target_path, "w") as f: + yaml.dump(existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + files_written.append(str(target_path)) + _logger.info(f"Saved {section_name} to {target_path}") + + # Update local.yaml if there are local fields + if local_updates: + if local_path.exists(): + with open(local_path, "r") as f: + local_existing = yaml.safe_load(f) or {} + else: + local_existing = {} + + # Deep merge local_updates into local_existing + def deep_merge(base: dict, updates: dict) -> dict: + for key, value in updates.items(): + if key in base and isinstance(base[key], dict) and isinstance(value, dict): + deep_merge(base[key], value) + else: + base[key] = value + return base + + deep_merge(local_existing, local_updates) + + with open(local_path, "w") as f: + yaml.dump(local_existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + # Set restrictive permissions on local.yaml + local_path.chmod(0o600) + files_written.append(str(local_path)) + _logger.info(f"Updated local values in {local_path}") + + return { + "saved": True, + "files_written": files_written, + "rejected_secrets": rejected_secrets, + } + + +# ============================================================================= +# UTILITY FUNCTIONS +# ============================================================================= + +def get_config_dir_from_path(config_path: Path) -> Path: + """Determine config directory from a config file path. + + Args: + config_path: Path to config.yaml (could be legacy or new layout) + + Returns: + Path to config directory + """ + config_path = Path(config_path) + + if config_path.is_dir(): + return config_path + + # If pointing to config.yaml in new layout + if config_path.name == "config.yaml" and config_path.parent.name == "config": + return config_path.parent + + # If pointing to legacy /data/config.yaml + if config_path.name == "config.yaml": + new_layout = config_path.parent / "config" + if new_layout.exists() and (new_layout / "config.yaml").exists(): + return new_layout + + return config_path.parent From 2c11432bd8e3ec1624da7ccb2da2479b55cdb678 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 15:14:05 +0000 Subject: [PATCH 2/9] feat(config): add migration script for v0.2 to v0.3 layout - Backup original config before migration - Split monolithic config into domain files - Extract operator-identifying values to local.yaml - Extract secrets to /data/secrets/.env - Create orchestrator with !include directives - Post-migration verification - Safe to run multiple times (idempotent checks) Co-Authored-By: Claude Opus 4.5 --- meshai/scripts/__init__.py | 1 + meshai/scripts/migrate_config_v03.py | 708 +++++++++++++++++++++++++++ 2 files changed, 709 insertions(+) create mode 100644 meshai/scripts/__init__.py create mode 100644 meshai/scripts/migrate_config_v03.py diff --git a/meshai/scripts/__init__.py b/meshai/scripts/__init__.py new file mode 100644 index 0000000..00d71f6 --- /dev/null +++ b/meshai/scripts/__init__.py @@ -0,0 +1 @@ +# MeshAI scripts package diff --git a/meshai/scripts/migrate_config_v03.py b/meshai/scripts/migrate_config_v03.py new file mode 100644 index 0000000..312df00 --- /dev/null +++ b/meshai/scripts/migrate_config_v03.py @@ -0,0 +1,708 @@ +#!/usr/bin/env python3 +"""Migration script for MeshAI config v0.2 → v0.3. + +This script converts the monolithic /data/config.yaml into the new +multi-file layout under /data/config/. + +Run once: python -m meshai.scripts.migrate_config_v03 + +The migration: +1. Backs up the original config +2. Splits sections into domain files +3. Extracts operator-identifying values to local.yaml +4. Extracts literal secrets to /data/secrets/.env +5. Creates orchestrator config.yaml with !include directives +6. Verifies the new layout loads identically +""" + +import hashlib +import logging +import os +import re +import shutil +import sys +from dataclasses import fields +from pathlib import Path +from typing import Any + +import yaml + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger(__name__) + + +# ============================================================================= +# CONFIGURATION +# ============================================================================= + +SOURCE_FILE = Path("/data/config.yaml") +TARGET_DIR = Path("/data/config") +BACKUP_FILE = Path("/data/config.yaml.pre-v03-backup") +SECRETS_DIR = Path("/data/secrets") + +# Section to file mapping +SECTION_TO_FILE = { + "connection": "meshtastic.yaml", + "commands": "meshtastic.yaml", + "mesh_sources": "mesh_sources.yaml", + "mesh_intelligence": "mesh_intelligence.yaml", + "environmental": "env_feeds.yaml", + "notifications": "notifications.yaml", + "llm": "llm.yaml", + "dashboard": "dashboard.yaml", +} + +# Sections that stay inline in orchestrator config.yaml +INLINE_SECTIONS = [ + "timezone", + "bot", + "response", + "history", + "memory", + "context", + "weather", + "meshmonitor", + "knowledge", +] + +# Fields to extract to local.yaml +LOCAL_EXTRACTIONS = { + "bot.name": "identity.name", + "bot.owner": "identity.owner", + "connection.tcp_host": "infrastructure.tcp_host", + "knowledge.qdrant_host": "infrastructure.qdrant_host", + "knowledge.tei_host": "infrastructure.tei_host", + "knowledge.sparse_host": "infrastructure.sparse_host", + "meshmonitor.url": "mesh_sources.meshmonitor_url", + "mesh_intelligence.critical_nodes": "critical_nodes", + "environmental.ducting.latitude": "env_center.latitude", + "environmental.ducting.longitude": "env_center.longitude", + "environmental.nws.user_agent": "identity.contact_email", # Extract email from user_agent +} + +# Secret fields - if found as literals, extract to .env +SECRET_PATTERNS = { + "llm.api_key": "LLM_API_KEY", # Will be renamed based on backend + "mesh_sources.*.api_token": "MESHMONITOR_API_TOKEN", + "mesh_sources.*.password": "MQTT_PASSWORD", + "environmental.traffic.api_key": "TOMTOM_API_KEY", + "environmental.firms.map_key": "FIRMS_MAP_KEY", + "notifications.rules.*.smtp_password": "SMTP_PASSWORD", +} + + +# ============================================================================= +# UTILITY FUNCTIONS +# ============================================================================= + +def get_nested(data: dict, path: str) -> Any: + """Get a value from nested dict using dot notation.""" + parts = path.split(".") + current = data + for part in parts: + if isinstance(current, dict) and part in current: + current = current[part] + else: + return None + return current + + +def set_nested(data: dict, path: str, value: Any) -> None: + """Set a value in nested dict using dot notation, creating dicts as needed.""" + parts = path.split(".") + current = data + for part in parts[:-1]: + if part not in current: + current[part] = {} + current = current[part] + current[parts[-1]] = value + + +def remove_nested(data: dict, path: str) -> bool: + """Remove a value from nested dict. Returns True if removed.""" + parts = path.split(".") + current = data + for part in parts[:-1]: + if isinstance(current, dict) and part in current: + current = current[part] + else: + return False + if parts[-1] in current: + del current[parts[-1]] + return True + return False + + +def file_hash(path: Path) -> str: + """Calculate SHA256 hash of a file.""" + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() + + +def is_env_var_ref(value: str) -> bool: + """Check if a string is an env var reference like ${VAR_NAME}.""" + if not isinstance(value, str): + return False + return bool(re.match(r"^\$\{[A-Z_][A-Z0-9_]*\}$", value)) + + +def extract_email_from_user_agent(user_agent: str) -> str: + """Extract email from NWS user_agent format: (app, email@domain.com)""" + if not user_agent: + return "" + match = re.search(r"[\w.+-]+@[\w.-]+\.\w+", user_agent) + return match.group(0) if match else "" + + +# ============================================================================= +# PRE-FLIGHT CHECKS +# ============================================================================= + +def preflight_checks() -> bool: + """Run pre-flight checks before migration.""" + logger.info("Running pre-flight checks...") + + # Check source exists + if not SOURCE_FILE.exists(): + logger.error(f"Source file not found: {SOURCE_FILE}") + return False + logger.info(f" Source file exists: {SOURCE_FILE}") + + # Check target directory state + if TARGET_DIR.exists(): + contents = list(TARGET_DIR.iterdir()) + if contents: + logger.error( + f"Target directory {TARGET_DIR} already populated with {len(contents)} items. " + "Manual intervention needed - remove existing files or restore from backup." + ) + return False + logger.info(f" Target directory exists but is empty: {TARGET_DIR}") + else: + logger.info(f" Target directory does not exist: {TARGET_DIR}") + + # Check backup doesn't already exist (indicates previous migration) + if BACKUP_FILE.exists(): + logger.warning( + f"Backup file already exists: {BACKUP_FILE}. " + "This may indicate a previous migration attempt." + ) + # Continue anyway - user may be re-running after fixing issues + + logger.info("Pre-flight checks passed.") + return True + + +# ============================================================================= +# BACKUP +# ============================================================================= + +def create_backup() -> bool: + """Create backup of original config.""" + logger.info(f"Creating backup: {SOURCE_FILE} → {BACKUP_FILE}") + + shutil.copy2(SOURCE_FILE, BACKUP_FILE) + + # Verify backup + source_hash = file_hash(SOURCE_FILE) + backup_hash = file_hash(BACKUP_FILE) + + if source_hash != backup_hash: + logger.error( + f"Backup verification failed! Hashes don't match:\n" + f" Source: {source_hash}\n" + f" Backup: {backup_hash}" + ) + return False + + source_size = SOURCE_FILE.stat().st_size + backup_size = BACKUP_FILE.stat().st_size + + if source_size != backup_size: + logger.error( + f"Backup verification failed! Sizes don't match:\n" + f" Source: {source_size}\n" + f" Backup: {backup_size}" + ) + return False + + logger.info(f" Backup verified: {backup_size} bytes, hash {backup_hash[:12]}...") + return True + + +# ============================================================================= +# EXTRACTION LOGIC +# ============================================================================= + +def extract_local_values(data: dict) -> dict: + """Extract operator-identifying values to local.yaml structure.""" + local = {} + + for source_path, dest_path in LOCAL_EXTRACTIONS.items(): + value = get_nested(data, source_path) + if value is not None and value != "" and value != 0: + # Special handling for user_agent -> email extraction + if source_path == "environmental.nws.user_agent": + value = extract_email_from_user_agent(str(value)) + if not value: + continue + + set_nested(local, dest_path, value) + logger.debug(f" Extracted {source_path} → local.{dest_path}") + + # Extract region coordinates + mi = data.get("mesh_intelligence", {}) + regions = mi.get("regions", []) + if regions: + local["regions"] = {} + for region in regions: + if isinstance(region, dict): + name = region.get("name", "") + lat = region.get("lat", 0) + lon = region.get("lon", 0) + if name and (lat != 0 or lon != 0): + local["regions"][name] = {"lat": lat, "lon": lon} + + # Extract mesh source URLs + mesh_sources = data.get("mesh_sources", []) + if mesh_sources: + local["mesh_sources"] = {"sources": {}} + for source in mesh_sources: + if isinstance(source, dict): + name = source.get("name", "") + url = source.get("url", "") + host = source.get("host", "") + if name and (url or host): + local["mesh_sources"]["sources"][name] = {} + if url: + local["mesh_sources"]["sources"][name]["url"] = url + if host: + local["mesh_sources"]["sources"][name]["host"] = host + + # Extract notification targets + notifications = data.get("notifications", {}) + rules = notifications.get("rules", []) + if rules: + node_ids = set() + recipients = set() + for rule in rules: + if isinstance(rule, dict): + for nid in rule.get("node_ids", []): + node_ids.add(nid) + for rcpt in rule.get("recipients", []): + recipients.add(rcpt) + if node_ids: + local["notification_targets"] = local.get("notification_targets", {}) + local["notification_targets"]["alert_node_ids"] = list(node_ids) + if recipients: + local["notification_targets"] = local.get("notification_targets", {}) + local["notification_targets"]["smtp_recipients"] = list(recipients) + + return local + + +def extract_secrets(data: dict) -> dict: + """Extract literal secrets to .env format.""" + secrets = {} + + # LLM API key + llm = data.get("llm", {}) + api_key = llm.get("api_key", "") + if api_key and not is_env_var_ref(api_key): + backend = llm.get("backend", "openai").upper() + key_name = f"{backend}_API_KEY" + if backend == "GOOGLE": + key_name = "GOOGLE_API_KEY" + secrets[key_name] = api_key + logger.info(f" Extracted llm.api_key → {key_name}") + + # Mesh source tokens/passwords + for i, source in enumerate(data.get("mesh_sources", [])): + if isinstance(source, dict): + token = source.get("api_token", "") + if token and not is_env_var_ref(token): + secrets["MESHMONITOR_API_TOKEN"] = token + logger.info(f" Extracted mesh_sources[{i}].api_token → MESHMONITOR_API_TOKEN") + + password = source.get("password", "") + if password and not is_env_var_ref(password): + secrets["MQTT_PASSWORD"] = password + logger.info(f" Extracted mesh_sources[{i}].password → MQTT_PASSWORD") + + # Environmental API keys + env = data.get("environmental", {}) + traffic = env.get("traffic", {}) + if traffic.get("api_key") and not is_env_var_ref(traffic["api_key"]): + secrets["TOMTOM_API_KEY"] = traffic["api_key"] + logger.info(" Extracted environmental.traffic.api_key → TOMTOM_API_KEY") + + firms = env.get("firms", {}) + if firms.get("map_key") and not is_env_var_ref(firms["map_key"]): + secrets["FIRMS_MAP_KEY"] = firms["map_key"] + logger.info(" Extracted environmental.firms.map_key → FIRMS_MAP_KEY") + + # Notification SMTP passwords + for i, rule in enumerate(data.get("notifications", {}).get("rules", [])): + if isinstance(rule, dict): + smtp_pass = rule.get("smtp_password", "") + if smtp_pass and not is_env_var_ref(smtp_pass): + secrets["SMTP_PASSWORD"] = smtp_pass + logger.info(f" Extracted notifications.rules[{i}].smtp_password → SMTP_PASSWORD") + + return secrets + + +def strip_local_values_from_domain(data: dict) -> dict: + """Remove local values from domain data, replacing with placeholders.""" + # Remove operator values that went to local.yaml + # These get merged back at load time + + # Strip bot name/owner (will come from local.yaml) + if "bot" in data: + data["bot"].pop("name", None) + data["bot"].pop("owner", None) + + # Strip connection tcp_host + if "connection" in data: + data["connection"].pop("tcp_host", None) + + # Strip knowledge hosts + if "knowledge" in data: + data["knowledge"].pop("qdrant_host", None) + data["knowledge"].pop("tei_host", None) + data["knowledge"].pop("sparse_host", None) + + # Strip meshmonitor url + if "meshmonitor" in data: + data["meshmonitor"].pop("url", None) + + # Strip critical_nodes (comes from local.yaml) + if "mesh_intelligence" in data: + data["mesh_intelligence"].pop("critical_nodes", None) + + # Strip region lat/lon (comes from local.yaml) + if "mesh_intelligence" in data: + for region in data["mesh_intelligence"].get("regions", []): + if isinstance(region, dict): + region.pop("lat", None) + region.pop("lon", None) + + # Strip mesh source URLs (comes from local.yaml) + for source in data.get("mesh_sources", []): + if isinstance(source, dict): + source.pop("url", None) + source.pop("host", None) + + # Strip ducting lat/lon (comes from local.yaml) + if "environmental" in data: + ducting = data["environmental"].get("ducting", {}) + ducting.pop("latitude", None) + ducting.pop("longitude", None) + # Strip nws user_agent (comes from local.yaml identity.contact_email) + nws = data["environmental"].get("nws", {}) + nws.pop("user_agent", None) + + # Strip notification node_ids and recipients (comes from local.yaml) + if "notifications" in data: + for rule in data["notifications"].get("rules", []): + if isinstance(rule, dict): + rule.pop("node_ids", None) + rule.pop("recipients", None) + rule.pop("from_address", None) + + return data + + +def replace_secrets_with_refs(data: dict, secrets: dict) -> dict: + """Replace literal secrets with ${VAR_NAME} references.""" + # LLM API key + if "llm" in data and data["llm"].get("api_key"): + backend = data["llm"].get("backend", "openai").upper() + key_name = f"{backend}_API_KEY" + if backend == "GOOGLE": + key_name = "GOOGLE_API_KEY" + data["llm"]["api_key"] = f"${{{key_name}}}" + + # Mesh sources + for source in data.get("mesh_sources", []): + if isinstance(source, dict): + if source.get("api_token") and not is_env_var_ref(source["api_token"]): + source["api_token"] = "${MESHMONITOR_API_TOKEN}" + if source.get("password") and not is_env_var_ref(source["password"]): + source["password"] = "${MQTT_PASSWORD}" + + # Environmental + if "environmental" in data: + traffic = data["environmental"].get("traffic", {}) + if traffic.get("api_key") and not is_env_var_ref(traffic["api_key"]): + traffic["api_key"] = "${TOMTOM_API_KEY}" + + firms = data["environmental"].get("firms", {}) + if firms.get("map_key") and not is_env_var_ref(firms["map_key"]): + firms["map_key"] = "${FIRMS_MAP_KEY}" + + # Notifications + for rule in data.get("notifications", {}).get("rules", []): + if isinstance(rule, dict): + if rule.get("smtp_password") and not is_env_var_ref(rule["smtp_password"]): + rule["smtp_password"] = "${SMTP_PASSWORD}" + + return data + + +# ============================================================================= +# FILE WRITING +# ============================================================================= + +def write_domain_file(path: Path, data: dict) -> None: + """Write a domain YAML file.""" + with open(path, "w") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + logger.info(f" Wrote {path} ({path.stat().st_size} bytes)") + + +def write_orchestrator(path: Path, data: dict) -> None: + """Write the orchestrator config.yaml with !include directives.""" + # Build the orchestrator content manually to preserve !include syntax + lines = [ + "# MeshAI Configuration v0.3", + "# Multi-file layout with !include directives", + "", + ] + + # Add inline sections + for section in INLINE_SECTIONS: + if section in data: + section_yaml = yaml.dump({section: data[section]}, default_flow_style=False, sort_keys=False) + lines.append(section_yaml.rstrip()) + lines.append("") + + # Add !include directives for domain files + lines.append("# Domain files (use !include)") + for section, target_file in SECTION_TO_FILE.items(): + if section in data and section not in ["commands"]: # commands shares file with connection + lines.append(f"{section}: !include {target_file}") + + content = "\n".join(lines) + "\n" + + with open(path, "w") as f: + f.write(content) + logger.info(f" Wrote orchestrator {path} ({path.stat().st_size} bytes)") + + +def write_local_yaml(path: Path, data: dict) -> None: + """Write local.yaml with restricted permissions.""" + header = """# LOCAL OPERATOR CONFIGURATION +# This file is gitignored - contains operator-identifying values +# Edit this file to customize for your deployment + +""" + with open(path, "w") as f: + f.write(header) + yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + path.chmod(0o600) + logger.info(f" Wrote {path} ({path.stat().st_size} bytes, mode 600)") + + +def write_env_file(path: Path, secrets: dict) -> None: + """Write .env file with restricted permissions.""" + header = """# MeshAI Secrets +# This file is gitignored - contains API keys and passwords +# Never commit this file to version control + +""" + lines = [header] + for key, value in sorted(secrets.items()): + lines.append(f"{key}={value}") + + content = "\n".join(lines) + "\n" + + with open(path, "w") as f: + f.write(content) + + path.chmod(0o600) + logger.info(f" Wrote {path} ({len(secrets)} secrets, mode 600)") + + +# ============================================================================= +# MAIN MIGRATION +# ============================================================================= + +def run_migration() -> bool: + """Run the full migration process.""" + logger.info("=" * 60) + logger.info("MeshAI Config Migration v0.2 → v0.3") + logger.info("=" * 60) + + # Pre-flight + if not preflight_checks(): + return False + + # Backup + if not create_backup(): + return False + + # Load original config + logger.info(f"Loading original config: {SOURCE_FILE}") + with open(SOURCE_FILE, "r") as f: + original_data = yaml.safe_load(f) + + if not original_data: + logger.error("Original config is empty or invalid!") + return False + + # Make a working copy + import copy + data = copy.deepcopy(original_data) + + # Extract local values + logger.info("Extracting operator-local values...") + local_data = extract_local_values(data) + local_count = sum(1 for _ in _count_values(local_data)) + logger.info(f" Extracted {local_count} local values") + + # Extract secrets + logger.info("Extracting secrets...") + secrets = extract_secrets(data) + logger.info(f" Extracted {len(secrets)} secrets") + + # Replace secrets with env var references + data = replace_secrets_with_refs(data, secrets) + + # Strip local values from domain data + data = strip_local_values_from_domain(data) + + # Create directories + logger.info("Creating directories...") + TARGET_DIR.mkdir(parents=True, exist_ok=True) + SECRETS_DIR.mkdir(parents=True, exist_ok=True) + SECRETS_DIR.chmod(0o700) + logger.info(f" Created {TARGET_DIR}") + logger.info(f" Created {SECRETS_DIR} (mode 700)") + + # Write domain files + logger.info("Writing domain files...") + + # Group sections by target file + file_contents: dict[str, dict] = {} + for section, target_file in SECTION_TO_FILE.items(): + if section in data: + if target_file not in file_contents: + file_contents[target_file] = {} + # For meshtastic.yaml, wrap in section name + if target_file == "meshtastic.yaml": + file_contents[target_file][section] = data[section] + else: + # For dedicated files, the whole file IS the section content + file_contents[target_file] = data[section] + + # Handle meshtastic.yaml specially (has both connection and commands) + for target_file, content in file_contents.items(): + write_domain_file(TARGET_DIR / target_file, content) + + # Write orchestrator + write_orchestrator(TARGET_DIR / "config.yaml", data) + + # Write local.yaml + write_local_yaml(TARGET_DIR / "local.yaml", local_data) + + # Write .env + if secrets: + write_env_file(SECRETS_DIR / ".env", secrets) + else: + logger.info(" No secrets to write (all were already env var refs)") + + # Verification + logger.info("=" * 60) + logger.info("Verifying migration...") + + try: + # Import here to use the newly deployed module + sys.path.insert(0, "/app") + from meshai.config_loader import load_config as new_load + from meshai.config import load_config as old_load, _dataclass_to_dict + + # Load with new loader + new_config = new_load(TARGET_DIR) + new_dict = _dataclass_to_dict(new_config) + + # Load backup with old loader + old_config = old_load(BACKUP_FILE) + old_dict = _dataclass_to_dict(old_config) + + # Compare key fields (some will differ due to local/secret extraction) + differences = [] + for key in ["timezone", "response", "history", "memory", "context"]: + if new_dict.get(key) != old_dict.get(key): + differences.append(f"{key}: {new_dict.get(key)} != {old_dict.get(key)}") + + if differences: + logger.error("Verification FAILED! Differences found:") + for diff in differences: + logger.error(f" {diff}") + return False + + logger.info(" Verification PASSED - config loads correctly") + + except Exception as e: + logger.error(f"Verification failed with exception: {e}") + import traceback + traceback.print_exc() + return False + + # Summary + logger.info("=" * 60) + logger.info("MIGRATION COMPLETE") + logger.info("=" * 60) + logger.info("") + logger.info("Files created:") + for f in sorted(TARGET_DIR.iterdir()): + logger.info(f" {f} ({f.stat().st_size} bytes)") + if (SECRETS_DIR / ".env").exists(): + logger.info(f" {SECRETS_DIR / '.env'} ({len(secrets)} secrets)") + logger.info("") + logger.info(f"Local values extracted: {local_count}") + logger.info(f"Secrets extracted: {len(secrets)} ({', '.join(secrets.keys()) if secrets else 'none'})") + logger.info("") + logger.info(f"Backup at: {BACKUP_FILE}") + logger.info("Delete the backup manually after verifying things work.") + logger.info("") + logger.info("ROLLBACK COMMAND (if needed):") + logger.info(f" rm -rf {TARGET_DIR} {SECRETS_DIR}") + logger.info(f" cp {BACKUP_FILE} {SOURCE_FILE}") + logger.info(" # Then revert main.py loader wiring if changed") + + return True + + +def _count_values(d: dict, prefix: str = "") -> Any: + """Generator to count leaf values in a nested dict.""" + for key, value in d.items(): + if isinstance(value, dict): + yield from _count_values(value, f"{prefix}.{key}") + elif isinstance(value, list): + for i, item in enumerate(value): + if isinstance(item, dict): + yield from _count_values(item, f"{prefix}.{key}[{i}]") + else: + yield f"{prefix}.{key}[{i}]" + else: + yield f"{prefix}.{key}" + + +# ============================================================================= +# ENTRY POINT +# ============================================================================= + +if __name__ == "__main__": + success = run_migration() + sys.exit(0 if success else 1) From 965a844b0d50ee4ac42d62f84548e1a629096d0c Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 15:14:12 +0000 Subject: [PATCH 3/9] feat(config): split monolithic config + extract secrets - Update .gitignore for v0.3 multi-file layout - Add config/.env.example template for secrets - Add config/local.yaml.example for operator values - Wire main.py to use new config_loader - Support both legacy and new layouts Co-Authored-By: Claude Opus 4.5 --- .gitignore | 20 ++++++++++++++ config/.env.example | 19 +++++++++++++ config/local.yaml.example | 57 +++++++++++++++++++++++++++++++++++++++ meshai/main.py | 22 ++++++++++----- 4 files changed, 112 insertions(+), 6 deletions(-) create mode 100644 config/.env.example create mode 100644 config/local.yaml.example diff --git a/.gitignore b/.gitignore index 87706b6..4aae211 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,13 @@ +# Operator-identifying config and secrets (v0.3 split) +/data/config/local.yaml +/data/config/secrets/ +/data/secrets/ +.env +.env.local +.env.* +!.env.example +local.yaml +!local.yaml.example # Python __pycache__/ *.py[cod] @@ -49,3 +59,13 @@ data/ # OS .DS_Store Thumbs.db +# Operator-identifying config and secrets (v0.3 split) +/data/config/local.yaml +/data/config/secrets/ +/data/secrets/ +.env +.env.local +.env.* +!.env.example +local.yaml +!local.yaml.example diff --git a/config/.env.example b/config/.env.example new file mode 100644 index 0000000..cc43131 --- /dev/null +++ b/config/.env.example @@ -0,0 +1,19 @@ +# MeshAI Secrets Template +# Copy to /data/secrets/.env and fill in your values +# This file is gitignored - never commit real secrets + +# LLM API Keys (only one needed based on your backend choice) +OPENAI_API_KEY= +ANTHROPIC_API_KEY= +GOOGLE_API_KEY= + +# Mesh Source Credentials +MESHMONITOR_API_TOKEN= +MQTT_PASSWORD= + +# Environmental Feed Keys +TOMTOM_API_KEY= +FIRMS_MAP_KEY= + +# Notification Credentials +SMTP_PASSWORD= diff --git a/config/local.yaml.example b/config/local.yaml.example new file mode 100644 index 0000000..8da62a9 --- /dev/null +++ b/config/local.yaml.example @@ -0,0 +1,57 @@ +# MeshAI Local Configuration Template +# Copy to /data/config/local.yaml and customize for your deployment +# This file is gitignored - contains operator-identifying values + +# Operator Identity +identity: + name: "" # Bot display name + owner: "" # Owner callsign/name + primary_node_id: "" # Your main mesh node ID + contact_email: "" # For NWS user_agent, SMTP from + +# Region Coordinates +# Map your region names to their lat/lon center points +regions: + "Example Region": + lat: 0.0 + lon: 0.0 + # Add more regions as needed: + # "Another Region": + # lat: 42.5 + # lon: -114.5 + +# Mesh Data Source URLs +mesh_sources: + meshmonitor_url: "" # Your MeshMonitor instance + sources: + # Per-source URL overrides (matches names in mesh_sources.yaml) + "My-Meshview": + url: "" + # "My-MeshMonitor": + # url: "" + +# Infrastructure Hosts +infrastructure: + tcp_host: "" # Meshtastic TCP host (meshtasticd) + qdrant_host: "" # Qdrant vector DB (optional) + tei_host: "" # TEI embedding service (optional) + sparse_host: "" # Sparse embedding service (optional) + +# Environmental Feed Center Point +env_center: + latitude: 0.0 # Center of your coverage area + longitude: 0.0 + +# Notification Targets +notification_targets: + smtp_from: "" # Email from address + smtp_recipients: [] # Default email recipients + webhook_urls: [] # Webhook endpoints + alert_node_ids: [] # Node IDs for mesh DM alerts + +# Critical Infrastructure Nodes (short names) +critical_nodes: [] +# Example: +# critical_nodes: +# - "MHR" +# - "HPR" diff --git a/meshai/main.py b/meshai/main.py index 48bc44b..6d1c41f 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -16,7 +16,8 @@ from .cli import run_configurator from .commands import CommandDispatcher from .commands.dispatcher import create_dispatcher from .commands.status import set_start_time -from .config import Config, load_config +from .config import Config +from .config_loader import load_config, get_config_dir_from_path from .connector import MeshConnector, MeshMessage from .context import MeshContext from .history import ConversationHistory @@ -712,12 +713,21 @@ def main() -> None: run_configurator(args.config_file) return - # Load config - config = load_config(args.config_file) + # Load config - support both old (/data/config.yaml) and new (/data/config/) layouts + config_path = args.config_file + config_dir = get_config_dir_from_path(config_path) - # Check if config exists - if not args.config_file.exists(): - logger.warning(f"Config file not found: {args.config_file}") + # Check for new multi-file layout first + if (config_dir / "config.yaml").exists(): + logger.info(f"Loading config from multi-file layout: {config_dir}") + config = load_config(config_dir) + elif config_path.exists(): + # Fall back to legacy single-file loading + logger.info(f"Loading legacy config: {config_path}") + from .config import load_config as legacy_load + config = legacy_load(config_path) + else: + logger.warning(f"Config not found at {config_path} or {config_dir}") logger.info("Run 'meshai --config' to create one, or copy config.example.yaml") sys.exit(1) From 67ab2689fe577f53e29e1226f526b42f39c408aa Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 16:14:18 +0000 Subject: [PATCH 4/9] fix(config): correct meshtastic include nesting - Changed orchestrator to use meshtastic: include meshtastic.yaml - Added hoisting logic to extract connection/commands from wrapper - Fixes restart loop caused by connection.type defaulting to serial Co-Authored-By: Claude Opus 4.5 --- meshai/config_loader.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/meshai/config_loader.py b/meshai/config_loader.py index ff6ddfe..74d2f3e 100644 --- a/meshai/config_loader.py +++ b/meshai/config_loader.py @@ -424,6 +424,14 @@ def load_config(config_dir: Path = Path("/data/config")) -> Config: # Load orchestrator with !include support _logger.debug(f"Loading config from {orchestrator_path}") data = _load_yaml_with_includes(orchestrator_path) + # Hoist meshtastic.connection and meshtastic.commands to top level + # meshtastic.yaml contains both sections under wrapper keys + if "meshtastic" in data and isinstance(data["meshtastic"], dict): + meshtastic = data.pop("meshtastic") + if "connection" in meshtastic: + data["connection"] = meshtastic["connection"] + if "commands" in meshtastic: + data["commands"] = meshtastic["commands"] # Load local.yaml local_path = config_dir / "local.yaml" From 5274933fa02f0251527d13817d64ea1a4766f53a Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 16:14:27 +0000 Subject: [PATCH 5/9] fix(migration): deep-equality verification gate for full Config - Added deep_compare function for recursive dict comparison - Replaced shallow key-list check with full Config dataclass comparison - Uses dataclasses.asdict for consistent dict representation - Reports full path of mismatches (e.g. connection.tcp_host) The previous gate only checked inline sections and missed the include-related bugs that caused the restart loop. Co-Authored-By: Claude Opus 4.5 --- meshai/scripts/migrate_config_v03.py | 64 ++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 18 deletions(-) diff --git a/meshai/scripts/migrate_config_v03.py b/meshai/scripts/migrate_config_v03.py index 312df00..35239d8 100644 --- a/meshai/scripts/migrate_config_v03.py +++ b/meshai/scripts/migrate_config_v03.py @@ -21,7 +21,7 @@ import os import re import shutil import sys -from dataclasses import fields +from dataclasses import asdict, fields from pathlib import Path from typing import Any @@ -36,6 +36,35 @@ logging.basicConfig( logger = logging.getLogger(__name__) +# ============================================================================= +# DEEP COMPARISON FOR VERIFICATION +# ============================================================================= + +def deep_compare(old, new, path=""): + """Deep compare two values, returning list of difference descriptions.""" + differences = [] + if type(old) != type(new): + differences.append(f"{path}: type {type(old).__name__} vs {type(new).__name__}") + return differences + if isinstance(old, dict): + for key in sorted(set(old.keys()) | set(new.keys())): + child_path = f"{path}.{key}" if path else key + if key not in old: + differences.append(f"{child_path}: missing in backup") + elif key not in new: + differences.append(f"{child_path}: missing in new") + else: + differences.extend(deep_compare(old[key], new[key], child_path)) + elif isinstance(old, list): + if len(old) != len(new): + differences.append(f"{path}: list length {len(old)} vs {len(new)}") + else: + for i, (o, n) in enumerate(zip(old, new)): + differences.extend(deep_compare(o, n, f"{path}[{i}]")) + elif old != new: + differences.append(f"{path}: {repr(old)[:50]} vs {repr(new)[:50]}") + return differences + # ============================================================================= # CONFIGURATION # ============================================================================= @@ -633,24 +662,23 @@ def run_migration() -> bool: # Load with new loader new_config = new_load(TARGET_DIR) - new_dict = _dataclass_to_dict(new_config) - - # Load backup with old loader - old_config = old_load(BACKUP_FILE) - old_dict = _dataclass_to_dict(old_config) - - # Compare key fields (some will differ due to local/secret extraction) - differences = [] - for key in ["timezone", "response", "history", "memory", "context"]: - if new_dict.get(key) != old_dict.get(key): - differences.append(f"{key}: {new_dict.get(key)} != {old_dict.get(key)}") - - if differences: - logger.error("Verification FAILED! Differences found:") - for diff in differences: - logger.error(f" {diff}") - return False + # Load backup with old loader + old_config = old_load(BACKUP_FILE) + # Deep compare all fields using asdict() + old_dict = asdict(old_config) + new_dict = asdict(new_config) + # Remove internal fields that will differ + for d in [old_dict, new_dict]: + d.pop("_config_path", None) + + differences = deep_compare(old_dict, new_dict) + + if differences: + logger.error("Verification FAILED! Differences found:") + for diff in differences: + logger.error(f" {diff}") + return False logger.info(" Verification PASSED - config loads correctly") except Exception as e: From dc52187c93f7d7ea44f10c0b4eebe2ae2fbf3aa1 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 16:23:57 +0000 Subject: [PATCH 6/9] feat(notifications): add Event dataclass for v0.3 pipeline Adds meshai/notifications/events.py with: - Event dataclass with all fields for unified pipeline shape - Stable ID generation via sha1 hash for deduplication - make_event() factory with auto-timestamp and severity validation - to_dict/from_dict for serialization round-trip This is scaffolding for Phase 2 - not yet wired into any adapters. Co-Authored-By: Claude Opus 4.5 --- meshai/notifications/events.py | 186 +++++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 meshai/notifications/events.py diff --git a/meshai/notifications/events.py b/meshai/notifications/events.py new file mode 100644 index 0000000..7099f0e --- /dev/null +++ b/meshai/notifications/events.py @@ -0,0 +1,186 @@ +"""Event dataclass for the v0.3 notification pipeline. + +This module defines the unified Event shape that flows through the +notification routing pipeline. All adapters emit Events, and the +router consumes them. + +Usage: + from meshai.notifications.events import Event, make_event + + # Create an event + event = make_event( + source="nws", + category="tornado_warning", + severity="immediate", + title="Tornado Warning for Ada County", + summary="A tornado warning has been issued...", + lat=43.615, + lon=-116.2023, + ) + + # Serialize for storage/webhook + data = event.to_dict() + + # Restore from storage + event2 = Event.from_dict(data) +""" + +import hashlib +import time +from dataclasses import dataclass, field, asdict +from typing import Optional, Any + + +# Valid severity levels +SEVERITY_LEVELS = frozenset({"routine", "priority", "immediate"}) + + +@dataclass +class Event: + """Unified event shape for the notification pipeline. + + All adapters (NWS, FIRMS, alert_engine, etc.) emit Events. + The router consumes Events and dispatches them to channels. + """ + + # Identity + id: str = "" # stable hash for dedup, computed if not provided + source: str = "" # adapter name: "nws", "firms", "alert_engine", etc. + category: str = "" # specific event type within source + + # Severity + severity: str = "routine" # "routine" | "priority" | "immediate" + + # Geography + region: Optional[str] = None # primary region name, set by region tagger + regions: list[str] = field(default_factory=list) # all regions touched + lat: Optional[float] = None + lon: Optional[float] = None + nws_zones: list[str] = field(default_factory=list) # NWS zone codes + + # Content + title: str = "" # one-line summary for digest headers + summary: str = "" # 1-3 sentence summary for immediate/mesh delivery + body: str = "" # full content for email/webhook delivery + + # Affected entities (for mesh health events) + node_ids: list[str] = field(default_factory=list) + short_names: list[str] = field(default_factory=list) + + # Timing + timestamp: float = 0.0 # event creation time + effective: Optional[float] = None # event start (NWS-style) + expires: Optional[float] = None # event end (NWS-style) + + # Routing hints + group_key: Optional[str] = None # events with same key get merged + inhibit_keys: list[str] = field(default_factory=list) # suppression keys + + # Raw adapter data (preserved for advanced rendering) + data: dict = field(default_factory=dict) + + @staticmethod + def compute_id( + source: str, + category: str, + group_key: Optional[str] = None, + lat: Optional[float] = None, + lon: Optional[float] = None, + ) -> str: + """Compute a stable dedup ID for an event. + + Two events with the same source+category+group_key+location + will have the same ID and can be deduplicated. + + Args: + source: Adapter name + category: Event category + group_key: Optional grouping key + lat: Optional latitude + lon: Optional longitude + + Returns: + 16-character hex ID + """ + key_parts = [ + source, + category, + group_key or "", + str(lat) if lat is not None else "", + str(lon) if lon is not None else "", + ] + key_string = ":".join(key_parts) + return hashlib.sha1(key_string.encode()).hexdigest()[:16] + + def to_dict(self) -> dict[str, Any]: + """Serialize event to a dict for JSON storage/webhook. + + Returns: + Dict representation of the event + """ + return asdict(self) + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "Event": + """Restore an Event from a dict. + + Args: + d: Dict representation (from to_dict or JSON load) + + Returns: + Event instance + """ + return cls(**d) + + +def make_event( + source: str, + category: str, + severity: str, + **kwargs: Any, +) -> Event: + """Create an Event with automatic ID and timestamp. + + This is the primary factory function for creating events. + It auto-computes the ID if not provided and sets timestamp + to the current time if not provided. + + Args: + source: Adapter name (e.g., "nws", "firms", "alert_engine") + category: Event category (e.g., "tornado_warning", "infra_offline") + severity: One of "routine", "priority", "immediate" + **kwargs: Additional Event fields + + Returns: + Event instance + + Raises: + ValueError: If severity is not valid + """ + # Validate severity + if severity not in SEVERITY_LEVELS: + raise ValueError( + f"Invalid severity '{severity}'. " + f"Must be one of: {', '.join(sorted(SEVERITY_LEVELS))}" + ) + + # Auto-set timestamp if not provided + if "timestamp" not in kwargs or kwargs["timestamp"] == 0.0: + kwargs["timestamp"] = time.time() + + # Auto-compute ID if not provided + if "id" not in kwargs or not kwargs["id"]: + kwargs["id"] = Event.compute_id( + source=source, + category=category, + group_key=kwargs.get("group_key"), + lat=kwargs.get("lat"), + lon=kwargs.get("lon"), + ) + + return Event( + source=source, + category=category, + severity=severity, + **kwargs, + ) From e6897b3f33b519bf2c5893a2e1e641943afc397a Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 16:26:53 +0000 Subject: [PATCH 7/9] feat(notifications): add region tagger with coordinate and NWS zone matching Adds meshai/notifications/region_tagger.py with: - haversine_distance() for great-circle distance calculation - tag_by_coordinates() maps lat/lon to nearest region within radius - tag_by_nws_zone() maps NWS zone codes to matching regions Also adds nws_zones field to RegionAnchor in config.py to support zone-based matching. Default is empty list for backward compatibility. This is scaffolding for Phase 2 - not yet wired into any adapters. Co-Authored-By: Claude Opus 4.5 --- meshai/config.py | 1 + meshai/notifications/region_tagger.py | 160 ++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 meshai/notifications/region_tagger.py diff --git a/meshai/config.py b/meshai/config.py index fb05317..441f3dc 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -230,6 +230,7 @@ class RegionAnchor: description: str = "" # e.g., "Twin Falls, Burley, Jerome along I-84/US-93" aliases: list[str] = field(default_factory=list) # e.g., ["southern Idaho", "magic valley"] cities: list[str] = field(default_factory=list) # e.g., ["Twin Falls", "Burley", "Jerome"] + nws_zones: list[str] = field(default_factory=list) # NWS zone codes (e.g., ["IDZ016", "IDZ030"]) @dataclass diff --git a/meshai/notifications/region_tagger.py b/meshai/notifications/region_tagger.py new file mode 100644 index 0000000..faf4fac --- /dev/null +++ b/meshai/notifications/region_tagger.py @@ -0,0 +1,160 @@ +"""Region tagger for mapping coordinates and NWS zones to regions. + +This module provides functions to: +- Map lat/lon coordinates to the nearest configured region +- Map NWS zone codes to matching regions + +Usage: + from meshai.notifications.region_tagger import tag_by_coordinates, tag_by_nws_zone + from meshai.config import RegionAnchor + + regions = [ + RegionAnchor(name="South Western ID", lat=43.615, lon=-116.2023, + nws_zones=["IDZ016", "IDZ030"]), + RegionAnchor(name="Magic Valley", lat=42.5558, lon=-114.4701, + nws_zones=["IDZ031"]), + ] + + # Find region by coordinates + region = tag_by_coordinates(43.6, -116.2, regions) + # Returns: "South Western ID" + + # Find regions by NWS zone + regions = tag_by_nws_zone("IDZ016", regions) + # Returns: ["South Western ID"] +""" + +import math +from typing import Optional + +# Import RegionAnchor type for annotations +# Actual import happens at function call time to avoid circular imports +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from meshai.config import RegionAnchor + + +# Earth radius in miles (mean radius) +EARTH_RADIUS_MILES = 3958.8 + + +def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """Calculate the great-circle distance between two points on Earth. + + Uses the haversine formula for accuracy on a spherical Earth model. + + Args: + lat1: Latitude of first point in degrees + lon1: Longitude of first point in degrees + lat2: Latitude of second point in degrees + lon2: Longitude of second point in degrees + + Returns: + Distance in miles + """ + # Convert to radians + lat1_rad = math.radians(lat1) + lat2_rad = math.radians(lat2) + lon1_rad = math.radians(lon1) + lon2_rad = math.radians(lon2) + + # Differences + dlat = lat2_rad - lat1_rad + dlon = lon2_rad - lon1_rad + + # Haversine formula + a = math.sin(dlat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2) ** 2 + c = 2 * math.asin(math.sqrt(a)) + + return EARTH_RADIUS_MILES * c + + +def tag_by_coordinates( + lat: float, + lon: float, + regions: list, # list[RegionAnchor] + radius_miles: float = 25.0, +) -> Optional[str]: + """Return the name of the nearest region within radius_miles. + + Finds the closest region anchor to the given coordinates. If the + closest anchor is within radius_miles, returns its name. Otherwise + returns None. + + Args: + lat: Latitude of the point to tag + lon: Longitude of the point to tag + regions: List of RegionAnchor objects to search + radius_miles: Maximum distance to consider (default 25 miles) + + Returns: + Name of the nearest region within range, or None if no match + """ + if not regions: + return None + + closest_region = None + closest_distance = float("inf") + + for region in regions: + # Skip regions without valid coordinates + region_lat = getattr(region, "lat", None) + region_lon = getattr(region, "lon", None) + + if region_lat is None or region_lon is None: + continue + if region_lat == 0.0 and region_lon == 0.0: + # Treat (0, 0) as unset coordinates + continue + + distance = haversine_distance(lat, lon, region_lat, region_lon) + + if distance < closest_distance: + closest_distance = distance + closest_region = region + + # Check if closest is within radius + if closest_region is not None and closest_distance <= radius_miles: + return getattr(closest_region, "name", None) + + return None + + +def tag_by_nws_zone( + zone_code: str, + regions: list, # list[RegionAnchor] +) -> list[str]: + """Return all region names whose nws_zones list contains zone_code. + + Multiple regions can match the same zone (a zone may span multiple + configured regions). + + Args: + zone_code: NWS zone code to match (e.g., "IDZ016") + regions: List of RegionAnchor objects to search + + Returns: + List of region names that contain this zone, empty if no matches + """ + if not zone_code or not regions: + return [] + + # Normalize zone code to uppercase for case-insensitive matching + zone_upper = zone_code.upper().strip() + + matching_regions = [] + + for region in regions: + region_zones = getattr(region, "nws_zones", None) + if not region_zones: + continue + + # Check if zone matches any in this region's list (case-insensitive) + for rz in region_zones: + if rz.upper().strip() == zone_upper: + region_name = getattr(region, "name", None) + if region_name: + matching_regions.append(region_name) + break # Don't add same region twice + + return matching_regions From 0703d00d948ec8ede74f966ffb7876fcc64b58f1 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 16:29:50 +0000 Subject: [PATCH 8/9] feat(notifications): map alert categories to v0.3 toggles Adds toggle field to each ALERT_CATEGORIES entry: - mesh_health: 18 categories (infra, power, utilization, coverage, health) - weather: 3 categories (NWS warnings, stream flooding) - fire: 3 categories (NIFC, FIRMS hotspots) - rf_propagation: 3 categories (solar, geomag, ducting) - roads: 2 categories (closures, congestion) - avalanche: 2 categories (high danger, considerable) Also adds helper functions: - categories_for_toggle(toggle) -> list of category IDs - get_toggle(category_name) -> toggle name or None Note: seismic and tracking toggles defined but have no categories yet (reserved for Phase 3 and Phase 7 respectively). All toggle assignments are unambiguous - no categories defaulted to mesh_health due to ambiguity. Co-Authored-By: Claude Opus 4.5 --- meshai/notifications/categories.py | 92 ++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index 2379a15..0ac0e88 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -7,8 +7,34 @@ Severity levels (military/intelligence precedence): routine - Informational, no time pressure priority - Needs attention soon immediate - Act now, drop everything + +Toggle categories (for v0.3 notification routing): + mesh_health - infrastructure, power, utilization, coverage, health-score + weather - NWS-sourced alerts, stream flooding + fire - NIFC perimeters, FIRMS hotspots + rf_propagation - solar, geomagnetic, ducting, band conditions + roads - 511, TomTom traffic + avalanche - avalanche advisories + seismic - USGS quakes (Phase 3) + tracking - ADS-B, AIS, satellite passes (Phase 7) """ +from typing import Optional + + +# Valid toggle values for v0.3 pipeline +VALID_TOGGLES = frozenset({ + "mesh_health", + "weather", + "fire", + "rf_propagation", + "roads", + "avalanche", + "seismic", + "tracking", +}) + + ALERT_CATEGORIES = { # Infrastructure alerts "infra_offline": { @@ -16,24 +42,28 @@ ALERT_CATEGORIES = { "description": "An infrastructure node (router/repeater) stopped responding", "default_severity": "priority", "example_message": "⚠ Infrastructure Offline: MHR — Mountain Harrison Rptr has not been heard for 2 hours", + "toggle": "mesh_health", }, "critical_node_down": { "name": "Critical Node Down", "description": "A node you marked as critical went offline", "default_severity": "immediate", "example_message": "🚨 Critical Node Down: HPR — Hayden Peak Rptr offline for 1 hour", + "toggle": "mesh_health", }, "infra_recovery": { "name": "Infrastructure Recovery", "description": "An offline infrastructure node came back online", "default_severity": "routine", "example_message": "✅ Recovery: MHR — Mountain Harrison Rptr back online after 2h outage", + "toggle": "mesh_health", }, "new_router": { "name": "New Router", "description": "A new router appeared on the mesh", "default_severity": "routine", "example_message": "📡 New Router: Snake River Relay appeared in Wood River Valley", + "toggle": "mesh_health", }, # Power alerts @@ -42,36 +72,42 @@ ALERT_CATEGORIES = { "description": "Infrastructure node battery below 30% (3.60V)", "default_severity": "routine", "example_message": "🔋 Battery Warning: BLD-MTN at 28% (3.58V), solar not charging", + "toggle": "mesh_health", }, "battery_critical": { "name": "Battery Critical", "description": "Infrastructure node battery below 15% (3.50V)", "default_severity": "priority", "example_message": "🔋 Battery Critical: BLD-MTN at 12% (3.48V) — shutdown in hours", + "toggle": "mesh_health", }, "battery_emergency": { "name": "Battery Emergency", "description": "Infrastructure node battery below 5% (3.40V) — shutdown imminent", "default_severity": "immediate", "example_message": "🚨 Battery Emergency: BLD-MTN at 4% (3.38V) — shutdown imminent", + "toggle": "mesh_health", }, "battery_trend": { "name": "Battery Declining", "description": "Battery showing declining trend over 7 days — possible solar or charging issue", "default_severity": "routine", "example_message": "🔋 Battery Trend: HPR declining 85% → 62% over 7 days (-3.3%/day)", + "toggle": "mesh_health", }, "power_source_change": { "name": "Power Source Change", "description": "Node switched from USB to battery — possible power outage at site", "default_severity": "priority", "example_message": "⚡ Power Source: MHR switched from USB to battery — possible outage", + "toggle": "mesh_health", }, "solar_not_charging": { "name": "Solar Not Charging", "description": "Solar panel not charging during daylight hours — panel issue or obstruction", "default_severity": "priority", "example_message": "☀️ Solar Issue: BLD-MTN not charging during daylight (12:00 MDT)", + "toggle": "mesh_health", }, # Utilization alerts @@ -80,18 +116,21 @@ ALERT_CATEGORIES = { "description": "LoRa channel airtime exceeding threshold — mesh congestion", "default_severity": "routine", "example_message": "📊 Channel Airtime: 47% utilization (threshold: 40%). Reliability may degrade.", + "toggle": "mesh_health", }, "sustained_high_util": { "name": "Sustained High Utilization", "description": "Channel airtime elevated for extended period — ongoing congestion", "default_severity": "priority", "example_message": "📊 Sustained Congestion: 45% channel utilization for 2+ hours. Consider reducing telemetry.", + "toggle": "mesh_health", }, "packet_flood": { "name": "Packet Flood", "description": "A single node sending excessive radio packets (NOT water flooding) — possible firmware bug or stuck transmitter", "default_severity": "priority", "example_message": "📻 Packet Flood: Node 'BKBS' transmitting 42 packets/min (threshold: 10/min). Firmware bug?", + "toggle": "mesh_health", }, # Coverage alerts @@ -100,18 +139,21 @@ ALERT_CATEGORIES = { "description": "Infrastructure node dropped to single gateway coverage — reduced redundancy", "default_severity": "priority", "example_message": "📶 Reduced Coverage: HPR dropped to single gateway. Previously had 3 paths.", + "toggle": "mesh_health", }, "feeder_offline": { "name": "Feeder Offline", "description": "A feeder gateway stopped responding — coverage gap possible", "default_severity": "priority", "example_message": "📡 Feeder Offline: AIDA-N2 gateway not responding. 5 nodes may lose uplink.", + "toggle": "mesh_health", }, "region_total_blackout": { "name": "Region Blackout", "description": "All infrastructure in a region is offline — complete coverage loss", "default_severity": "immediate", "example_message": "🚨 REGION BLACKOUT: All infrastructure in Magic Valley offline!", + "toggle": "mesh_health", }, # Health score alerts @@ -120,12 +162,14 @@ ALERT_CATEGORIES = { "description": "Overall mesh health score dropped below threshold — multiple issues likely", "default_severity": "priority", "example_message": "📉 Mesh Health: Score 62/100 (threshold: 65). Infrastructure: 71, Connectivity: 58.", + "toggle": "mesh_health", }, "region_score_low": { "name": "Region Health Low", "description": "A region's health score below threshold — localized issues", "default_severity": "priority", "example_message": "📉 Region Health: Magic Valley at 55/100 (threshold: 60). 2 nodes offline.", + "toggle": "mesh_health", }, # Environmental - Weather @@ -134,6 +178,7 @@ ALERT_CATEGORIES = { "description": "NWS warning or advisory affecting your mesh area", "default_severity": "priority", "example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z", + "toggle": "weather", }, # Environmental - Space Weather @@ -142,12 +187,14 @@ ALERT_CATEGORIES = { "description": "R3+ solar flare degrading HF propagation on sunlit side", "default_severity": "priority", "example_message": "⚠ R3 Strong Radio Blackout — X1.2 flare. Wide-area HF blackout ~1 hour on sunlit side.", + "toggle": "rf_propagation", }, "geomagnetic_storm": { "name": "Geomagnetic Storm", "description": "G2+ geomagnetic storm — HF degraded at higher latitudes, aurora possible", "default_severity": "priority", "example_message": "🌐 G2 Moderate Geomagnetic Storm — Kp=6. HF fades at high latitudes, aurora to ~55°.", + "toggle": "rf_propagation", }, # Environmental - Tropospheric @@ -156,6 +203,7 @@ ALERT_CATEGORIES = { "description": "Atmospheric conditions trapping VHF/UHF signals — extended range", "default_severity": "routine", "example_message": "📡 Tropospheric Ducting: Surface duct detected, dM/dz -45 M-units/km, ~120m thick. VHF/UHF extended range.", + "toggle": "rf_propagation", }, # Environmental - Fire @@ -164,18 +212,21 @@ ALERT_CATEGORIES = { "description": "Active wildfire within alert radius of mesh infrastructure", "default_severity": "priority", "example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR. Monitor closely.", + "toggle": "fire", }, "wildfire_proximity": { "name": "Fire Near Mesh", "description": "Active wildfire within alert radius of mesh infrastructure", "default_severity": "priority", "example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR.", + "toggle": "fire", }, "new_ignition": { "name": "New Fire Ignition", "description": "Satellite hotspot detected NOT near any known fire — potential new wildfire", "default_severity": "priority", "example_message": "🛰 New Ignition: Satellite fire at 42.32°N, 114.30°W — high confidence, 47 MW FRP. Not near any known fire.", + "toggle": "fire", }, # Environmental - Flood @@ -184,12 +235,14 @@ ALERT_CATEGORIES = { "description": "River gauge exceeds NWS flood stage threshold", "default_severity": "priority", "example_message": "🌊 Stream Flood Warning: Snake River nr Twin Falls at 12.8 ft — Minor Flood Stage is 10.5 ft.", + "toggle": "weather", }, "stream_high_water": { "name": "Stream High Water", "description": "River gauge approaching flood stage — monitoring recommended", "default_severity": "routine", "example_message": "🌊 High Water: Snake River at 9.8 ft — Action Stage is 9.0 ft. Monitor conditions.", + "toggle": "weather", }, # Environmental - Roads @@ -198,12 +251,14 @@ ALERT_CATEGORIES = { "description": "Full road closure on a monitored corridor", "default_severity": "priority", "example_message": "🚧 Road Closure: I-84 EB at MP 173 — full closure, construction. Detour via US-30.", + "toggle": "roads", }, "traffic_congestion": { "name": "Traffic Congestion", "description": "Traffic speed dropped below congestion threshold on a monitored corridor", "default_severity": "routine", "example_message": "🚗 Traffic Congestion: I-84 Twin Falls — 35 mph (free-flow 70 mph), 50% speed ratio", + "toggle": "roads", }, # Environmental - Avalanche @@ -212,12 +267,14 @@ ALERT_CATEGORIES = { "description": "Avalanche danger level 4 (High) or 5 (Extreme) in your area", "default_severity": "priority", "example_message": "⛷ Avalanche Danger HIGH: Sawtooth Zone — avoid avalanche terrain. Natural avalanches likely.", + "toggle": "avalanche", }, "avalanche_considerable": { "name": "Avalanche Danger Considerable", "description": "Avalanche danger level 3 (Considerable) — most fatalities occur at this level", "default_severity": "routine", "example_message": "⛷ Avalanche Danger CONSIDERABLE: Sawtooth Zone — dangerous conditions on steep slopes.", + "toggle": "avalanche", }, } @@ -231,6 +288,7 @@ def get_category(category_id: str) -> dict: "description": f"Alert type: {category_id}", "default_severity": "routine", "example_message": f"Alert: {category_id}", + "toggle": "mesh_health", # Default unknown to mesh_health } @@ -240,3 +298,37 @@ def list_categories() -> list[dict]: {"id": cat_id, **cat_info} for cat_id, cat_info in ALERT_CATEGORIES.items() ] + + +def categories_for_toggle(toggle: str) -> list[str]: + """Return all category names that route to this toggle. + + Args: + toggle: Toggle name (e.g., "mesh_health", "weather") + + Returns: + List of category IDs that have this toggle assigned + """ + if toggle not in VALID_TOGGLES: + return [] + + return [ + cat_id + for cat_id, cat_info in ALERT_CATEGORIES.items() + if cat_info.get("toggle") == toggle + ] + + +def get_toggle(category_name: str) -> Optional[str]: + """Return the toggle name for a category, or None if unknown. + + Args: + category_name: Category ID (e.g., "infra_offline") + + Returns: + Toggle name (e.g., "mesh_health") or None if category unknown + """ + cat_info = ALERT_CATEGORIES.get(category_name) + if cat_info: + return cat_info.get("toggle") + return None From 4e4a837c5e97838cd4b1e1ac0c24add21b2d07c7 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 14 May 2026 17:21:20 +0000 Subject: [PATCH 9/9] feat(notifications): add Phase 1.3 + 2.1 pipeline skeleton Phase 1.3: - events.py: Event dataclass with ID generation and serialization - region_tagger.py: Coordinate/NWS zone region tagging - categories.py: Toggle field mapping for all 31 alert categories Phase 2.1 Pipeline Skeleton: - pipeline/bus.py: EventBus with subscribe/emit pattern - pipeline/severity_router.py: Routes immediate->dispatch, routine->digest - pipeline/dispatcher.py: Delivers immediate events to configured channels - pipeline/__init__.py: build_pipeline() factory and exports All components tested and verified in container. Co-Authored-By: Claude Opus 4.5 --- meshai/notifications/pipeline/__init__.py | 88 +++++++++++ meshai/notifications/pipeline/bus.py | 85 +++++++++++ meshai/notifications/pipeline/dispatcher.py | 143 ++++++++++++++++++ .../notifications/pipeline/severity_router.py | 104 +++++++++++++ 4 files changed, 420 insertions(+) create mode 100644 meshai/notifications/pipeline/__init__.py create mode 100644 meshai/notifications/pipeline/bus.py create mode 100644 meshai/notifications/pipeline/dispatcher.py create mode 100644 meshai/notifications/pipeline/severity_router.py diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py new file mode 100644 index 0000000..49d6c2b --- /dev/null +++ b/meshai/notifications/pipeline/__init__.py @@ -0,0 +1,88 @@ +"""Notification pipeline package. + +Phase 2.1 provides the bare skeleton: +- EventBus: Central pub/sub for all events +- SeverityRouter: Routes immediate vs digest events +- Dispatcher: Delivers immediate events to channels +- StubDigestQueue: Placeholder for Phase 2.3 aggregator + +Usage: + from meshai.notifications.pipeline import build_pipeline + + pipeline = build_pipeline(channel_config={ + "mesh_health": ["discord"], + "weather": ["discord", "meshtastic"], + }) + + # Emit events through the bus + pipeline["bus"].emit(event) +""" + +from meshai.notifications.pipeline.bus import EventBus, get_bus +from meshai.notifications.pipeline.severity_router import ( + SeverityRouter, + StubDigestQueue, +) +from meshai.notifications.pipeline.dispatcher import ( + Dispatcher, + StubChannelBackend, +) + + +def build_pipeline( + channel_config: dict[str, list[str]] | None = None, +) -> dict: + """Build and wire up the notification pipeline. + + Creates all pipeline components and connects them: + - EventBus receives all events + - SeverityRouter subscribes to bus, routes by severity + - Dispatcher handles immediate events + - StubDigestQueue collects priority/routine events + + Args: + channel_config: Mapping of toggle -> channel names for dispatch. + Example: {"mesh_health": ["discord"]} + + Returns: + Dict with all pipeline components: + - bus: EventBus instance + - router: SeverityRouter instance + - dispatcher: Dispatcher instance + - digest_queue: StubDigestQueue instance + """ + # Create components + bus = EventBus() + dispatcher = Dispatcher(channel_config) + digest_queue = StubDigestQueue() + + # Wire up the router + router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest_queue.enqueue, + ) + + # Subscribe router to bus + bus.subscribe(router.handle) + + return { + "bus": bus, + "router": router, + "dispatcher": dispatcher, + "digest_queue": digest_queue, + } + + +__all__ = [ + # Core classes + "EventBus", + "SeverityRouter", + "Dispatcher", + # Stubs for testing/Phase 2.x + "StubDigestQueue", + "StubChannelBackend", + # Factory + "build_pipeline", + # Singleton accessor + "get_bus", +] diff --git a/meshai/notifications/pipeline/bus.py b/meshai/notifications/pipeline/bus.py new file mode 100644 index 0000000..b2cda6d --- /dev/null +++ b/meshai/notifications/pipeline/bus.py @@ -0,0 +1,85 @@ +"""Event bus for the notification pipeline. + +The bus is the entry point for all events flowing through the pipeline. +Adapters call bus.emit(event) to push Events into the system. + +Usage: + from meshai.notifications.pipeline import get_bus + from meshai.notifications.events import make_event + + bus = get_bus() + event = make_event(source="nws", category="weather_warning", severity="immediate", ...) + bus.emit(event) +""" + +import logging +from typing import Callable, Iterable + +from meshai.notifications.events import Event + + +class EventBus: + """Central event bus for the notification pipeline. + + Subscribers register handlers that receive every emitted event. + Errors in one subscriber do not prevent other subscribers from + receiving the event. + """ + + def __init__(self): + self._subscribers: list[Callable[[Event], None]] = [] + self._logger = logging.getLogger("meshai.pipeline.bus") + + def subscribe(self, handler: Callable[[Event], None]) -> None: + """Register a handler that receives every emitted event. + + Args: + handler: Callable that takes an Event and returns None + """ + self._subscribers.append(handler) + self._logger.debug(f"Subscribed handler: {handler}") + + def emit(self, event: Event) -> None: + """Push an event to all subscribers. + + Errors in one subscriber do not stop others from receiving + the event. Exceptions are logged but not re-raised. + + Args: + event: The Event to deliver to all subscribers + """ + for handler in self._subscribers: + try: + handler(event) + except Exception: + self._logger.exception( + f"Subscriber {handler} failed on event {event.id}" + ) + + def emit_many(self, events: Iterable[Event]) -> None: + """Emit multiple events in sequence. + + Args: + events: Iterable of Events to emit + """ + for event in events: + self.emit(event) + + +# Module-level singleton for application-wide use +_bus: EventBus | None = None + + +def get_bus() -> EventBus: + """Get the global EventBus singleton. + + This is the primary way adapters access the bus. Tests should + construct a fresh EventBus() directly to avoid shared state. + + Returns: + The global EventBus instance + """ + global _bus + if _bus is None: + _bus = EventBus() + return _bus diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py new file mode 100644 index 0000000..858a8c5 --- /dev/null +++ b/meshai/notifications/pipeline/dispatcher.py @@ -0,0 +1,143 @@ +"""Immediate event dispatcher. + +The dispatcher routes immediate-severity events to configured delivery +channels based on the event's toggle category. + +Phase 2.1 provides a stub that logs dispatch attempts. Phase 2.2 will +add real channel backends (Discord webhooks, Meshtastic broadcast, etc.). + +Usage: + dispatcher = Dispatcher(channel_config) + dispatcher.dispatch(event) # Called by SeverityRouter for immediate events +""" + +import logging +from typing import Callable, Optional + +from meshai.notifications.events import Event +from meshai.notifications.categories import get_toggle + + +class Dispatcher: + """Dispatches immediate events to configured channels. + + Each toggle category can have multiple delivery channels configured. + The dispatcher looks up the toggle for an event's category and sends + to all channels registered for that toggle. + + Phase 2.1: Stub implementation that logs but doesn't actually deliver. + Phase 2.2: Will add real channel backends. + """ + + def __init__( + self, + channel_config: Optional[dict[str, list[str]]] = None, + ): + """Initialize the dispatcher. + + Args: + channel_config: Mapping of toggle -> list of channel names. + Example: {"mesh_health": ["discord", "meshtastic"]} + If None, defaults to empty (no channels configured). + """ + self._channels = channel_config or {} + self._logger = logging.getLogger("meshai.pipeline.dispatcher") + self._backends: dict[str, Callable[[Event], None]] = {} + + def register_backend( + self, + channel_name: str, + handler: Callable[[Event], None], + ) -> None: + """Register a delivery backend for a channel. + + Args: + channel_name: Name of the channel (e.g., "discord", "meshtastic") + handler: Callable that delivers the event to the channel + """ + self._backends[channel_name] = handler + self._logger.debug(f"Registered backend: {channel_name}") + + def dispatch(self, event: Event) -> None: + """Dispatch an immediate event to configured channels. + + Looks up the toggle for the event's category, then sends to + all channels configured for that toggle. + + Args: + event: The immediate-severity Event to dispatch + """ + toggle = get_toggle(event.category) + if toggle is None: + self._logger.warning( + f"Unknown category {event.category!r} for event {event.id}, " + "defaulting to mesh_health" + ) + toggle = "mesh_health" + + channels = self._channels.get(toggle, []) + if not channels: + self._logger.info( + f"No channels configured for toggle {toggle!r}, " + f"event {event.id} not dispatched" + ) + return + + for channel in channels: + self._deliver_to_channel(event, channel, toggle) + + def _deliver_to_channel( + self, + event: Event, + channel: str, + toggle: str, + ) -> None: + """Deliver event to a specific channel. + + Args: + event: The Event to deliver + channel: Channel name + toggle: Toggle category (for logging) + """ + backend = self._backends.get(channel) + if backend is None: + # Phase 2.1: Log stub - no real backend yet + self._logger.info( + f"DISPATCH STUB [{toggle}] -> {channel}: {event.title}" + ) + return + + try: + backend(event) + self._logger.info( + f"DISPATCHED [{toggle}] -> {channel}: {event.title}" + ) + except Exception: + self._logger.exception( + f"Failed to dispatch event {event.id} to {channel}" + ) + + +class StubChannelBackend: + """Stub channel backend for testing. + + Collects all events "sent" to it for verification in tests. + """ + + def __init__(self, name: str): + self.name = name + self.events: list[Event] = [] + self._logger = logging.getLogger(f"meshai.pipeline.stub.{name}") + + def send(self, event: Event) -> None: + """Record an event as sent. + + Args: + event: The Event to record + """ + self.events.append(event) + self._logger.info(f"STUB {self.name}: {event.title}") + + def clear(self) -> None: + """Clear recorded events.""" + self.events = [] diff --git a/meshai/notifications/pipeline/severity_router.py b/meshai/notifications/pipeline/severity_router.py new file mode 100644 index 0000000..089e670 --- /dev/null +++ b/meshai/notifications/pipeline/severity_router.py @@ -0,0 +1,104 @@ +"""Severity-based event routing. + +The severity router subscribes to the bus and forks each event into +one of two paths based on severity: + +- immediate → immediate_handler (dispatcher for live delivery) +- priority/routine → digest_handler (queue for batched summaries) + +Usage: + router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest_queue.enqueue, + ) + bus.subscribe(router.handle) +""" + +import logging +from typing import Callable + +from meshai.notifications.events import Event +from meshai.notifications.categories import get_toggle + + +class SeverityRouter: + """Routes events to immediate or digest handlers based on severity. + + Immediate-severity events go directly to live delivery channels. + Priority and routine events are queued for periodic digest summaries. + """ + + def __init__( + self, + immediate_handler: Callable[[Event], None], + digest_handler: Callable[[Event], None], + ): + """Initialize the severity router. + + Args: + immediate_handler: Called for severity="immediate" events + digest_handler: Called for severity in ("priority", "routine") + """ + self._immediate = immediate_handler + self._digest = digest_handler + self._logger = logging.getLogger("meshai.pipeline.severity_router") + + def handle(self, event: Event) -> None: + """Route an event based on its severity. + + Args: + event: The Event to route + """ + if event.severity == "immediate": + self._logger.info( + f"IMMEDIATE: {event.source}/{event.category} {event.title}" + ) + self._immediate(event) + elif event.severity in ("priority", "routine"): + self._logger.info( + f"DIGEST QUEUED [{event.severity}]: {event.title}" + ) + self._digest(event) + else: + self._logger.warning( + f"Unknown severity {event.severity!r} on event {event.id}, dropping" + ) + + +class StubDigestQueue: + """Placeholder digest queue for Phase 2.1. + + This is a stub that simply collects events in memory. Phase 2.3 + will replace this with the real aggregator that renders and + delivers periodic digest summaries. + """ + + def __init__(self): + self._queue: list[Event] = [] + self._logger = logging.getLogger("meshai.pipeline.digest_stub") + + def enqueue(self, event: Event) -> None: + """Add an event to the digest queue. + + Args: + event: The Event to queue for digest delivery + """ + self._queue.append(event) + toggle = get_toggle(event.category) or "unknown" + self._logger.info(f"DIGEST QUEUED [{toggle}]: {event.title}") + + def drain(self) -> list[Event]: + """Return and clear all queued events. + + For tests and the future aggregator. Returns the current + queue contents and resets the queue to empty. + + Returns: + List of all queued Events + """ + events, self._queue = self._queue, [] + return events + + def __len__(self) -> int: + """Return the number of queued events.""" + return len(self._queue)