diff --git a/meshai/config_loader.py b/meshai/config_loader.py new file mode 100644 index 0000000..ff6ddfe --- /dev/null +++ b/meshai/config_loader.py @@ -0,0 +1,680 @@ +"""Multi-file configuration loader for MeshAI v0.3. + +This module provides: +- !include directive support for splitting config across files +- Environment variable interpolation (${VAR_NAME} and ${VAR_NAME:-default}) +- Operator-local value merging from local.yaml +- Secret loading from .env files +- Section-aware save_section() for dashboard write-back + +The loader produces the same Config dataclass shape as config.py, +ensuring backward compatibility with all existing consumers. +""" + +import logging +import os +import re +from pathlib import Path +from typing import Any, Optional + +import yaml +from dotenv import dotenv_values + +# Import existing dataclasses - shape must NOT change +from .config import ( + Config, + _dict_to_dataclass, + _dataclass_to_dict, +) + +_logger = logging.getLogger(__name__) + +# ============================================================================= +# SECTION TO FILE MAPPING +# ============================================================================= + +SECTION_TO_FILE: dict[str, str] = { + # Inline in orchestrator config.yaml + "timezone": "config.yaml", + "bot": "config.yaml", + "response": "config.yaml", + "history": "config.yaml", + "memory": "config.yaml", + "context": "config.yaml", + "weather": "config.yaml", + "meshmonitor": "config.yaml", + "knowledge": "config.yaml", + + # Domain files + "connection": "meshtastic.yaml", + "commands": "meshtastic.yaml", + "mesh_sources": "mesh_sources.yaml", + "mesh_intelligence": "mesh_intelligence.yaml", + "environmental": "env_feeds.yaml", + "notifications": "notifications.yaml", + "llm": "llm.yaml", + "dashboard": "dashboard.yaml", +} + +# Fields that should be written to local.yaml instead of domain files +LOCAL_FIELDS: dict[str, str] = { + "bot.name": "identity.name", + "bot.owner": "identity.owner", + "connection.tcp_host": "infrastructure.tcp_host", + "knowledge.qdrant_host": "infrastructure.qdrant_host", + "knowledge.tei_host": "infrastructure.tei_host", + "knowledge.sparse_host": "infrastructure.sparse_host", + "meshmonitor.url": "mesh_sources.meshmonitor_url", + "mesh_intelligence.critical_nodes": "critical_nodes", + "environmental.ducting.latitude": "env_center.latitude", + "environmental.ducting.longitude": "env_center.longitude", +} + +# Fields that contain secrets - NEVER written, must be in .env +SECRET_FIELDS: set[str] = { + "llm.api_key", + "mesh_sources.*.api_token", + "mesh_sources.*.password", + "environmental.traffic.api_key", + "environmental.firms.map_key", + "notifications.rules.*.smtp_password", +} + +# Secret env var names expected in .env +EXPECTED_SECRETS: list[str] = [ + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "GOOGLE_API_KEY", + "MESHMONITOR_API_TOKEN", + "MQTT_PASSWORD", + "TOMTOM_API_KEY", + "FIRMS_MAP_KEY", + "SMTP_PASSWORD", +] + + +# ============================================================================= +# YAML !INCLUDE CONSTRUCTOR +# ============================================================================= + +# Global set for tracking files currently being loaded (cycle detection) +_loading_files: set[Path] = set() + + +def _make_include_loader(base_path: Path): + """Create an IncludeLoader class with the given base path.""" + + class IncludeLoader(yaml.SafeLoader): + """YAML loader with !include tag support.""" + pass + + def construct_include(loader: IncludeLoader, node: yaml.Node) -> Any: + """Handle !include directive.""" + relative_path = loader.construct_scalar(node) + include_path = (base_path / relative_path).resolve() + + # Cycle detection using global set + if include_path in _loading_files: + raise yaml.YAMLError( + f"Circular include detected: {include_path} is already being loaded. " + f"Current loading chain: {[str(p) for p in _loading_files]}" + ) + + if not include_path.exists(): + raise yaml.YAMLError( + f"Include file not found: {include_path} " + f"(referenced from {base_path / 'config.yaml'})" + ) + + _loading_files.add(include_path) + try: + with open(include_path, "r") as f: + # Recursively load with the include file's directory as new base + NestedLoader = _make_include_loader(include_path.parent) + return yaml.load(f, Loader=NestedLoader) + finally: + _loading_files.discard(include_path) + + IncludeLoader.add_constructor("!include", construct_include) + return IncludeLoader + + +def _load_yaml_with_includes(file_path: Path) -> dict: + """Load a YAML file with !include directive support.""" + global _loading_files + _loading_files.clear() # Reset cycle detection + + if not file_path.exists(): + return {} + + # Add the root file to loading set + file_path = file_path.resolve() + _loading_files.add(file_path) + + try: + with open(file_path, "r") as f: + Loader = _make_include_loader(file_path.parent) + return yaml.load(f, Loader=Loader) or {} + finally: + _loading_files.discard(file_path) + + +# ============================================================================= +# ENVIRONMENT VARIABLE INTERPOLATION +# ============================================================================= + +_ENV_PATTERN = re.compile(r"\$\{([A-Z_][A-Z0-9_]*)(?::-([^}]*))?\}") + + +def _interpolate_env_vars(value: Any, env: dict[str, str]) -> Any: + """Recursively interpolate ${VAR_NAME} and ${VAR_NAME:-default} in strings. + + Args: + value: The value to interpolate (can be string, dict, list, or other) + env: Combined environment (os.environ + .env file values) + + Returns: + The value with environment variables resolved + """ + if isinstance(value, str): + def replace_match(match): + var_name = match.group(1) + default = match.group(2) + + # os.environ takes precedence over .env file + resolved = os.environ.get(var_name) + if resolved is None: + resolved = env.get(var_name) + if resolved is None: + if default is not None: + return default + _logger.warning( + f"Environment variable ${{{var_name}}} not found and no default provided. " + "Using empty string." + ) + return "" + return resolved + + return _ENV_PATTERN.sub(replace_match, value) + + elif isinstance(value, dict): + return {k: _interpolate_env_vars(v, env) for k, v in value.items()} + + elif isinstance(value, list): + return [_interpolate_env_vars(item, env) for item in value] + + return value + + +# ============================================================================= +# LOCAL.YAML MERGING +# ============================================================================= + +def _merge_local_values(data: dict, local: dict) -> dict: + """Merge operator-local values from local.yaml into the config data. + + This handles: + - identity.name/owner -> bot.name/owner + - infrastructure.* -> connection/knowledge hosts + - regions.{name}.lat/lon -> mesh_intelligence.regions[name].lat/lon + - critical_nodes -> mesh_intelligence.critical_nodes + - mesh_sources.sources.{name}.* -> mesh_sources[name].* + - env_center.* -> environmental.ducting.* + - notification_targets.* -> notifications rules + + Args: + data: The loaded config data (will be modified in place) + local: The local.yaml data + + Returns: + The merged data dict + """ + if not local: + return data + + # Identity -> bot + identity = local.get("identity", {}) + if "bot" in data: + if identity.get("name"): + data["bot"]["name"] = identity["name"] + if identity.get("owner"): + data["bot"]["owner"] = identity["owner"] + + # Infrastructure hosts + infra = local.get("infrastructure", {}) + if infra.get("tcp_host") and "connection" in data: + data["connection"]["tcp_host"] = infra["tcp_host"] + if "knowledge" in data: + if infra.get("qdrant_host"): + data["knowledge"]["qdrant_host"] = infra["qdrant_host"] + if infra.get("tei_host"): + data["knowledge"]["tei_host"] = infra["tei_host"] + if infra.get("sparse_host"): + data["knowledge"]["sparse_host"] = infra["sparse_host"] + + # Meshmonitor URL + mesh_sources_local = local.get("mesh_sources", {}) + if mesh_sources_local.get("meshmonitor_url") and "meshmonitor" in data: + data["meshmonitor"]["url"] = mesh_sources_local["meshmonitor_url"] + + # Mesh sources URLs + sources_local = mesh_sources_local.get("sources", {}) + if "mesh_sources" in data and isinstance(data["mesh_sources"], list): + for source in data["mesh_sources"]: + if isinstance(source, dict): + source_name = source.get("name", "") + local_source = sources_local.get(source_name, {}) + if local_source.get("url"): + source["url"] = local_source["url"] + if local_source.get("host"): + source["host"] = local_source["host"] + + # Region coordinates + regions_local = local.get("regions", {}) + if "mesh_intelligence" in data: + mi = data["mesh_intelligence"] + if "regions" in mi and isinstance(mi["regions"], list): + for region in mi["regions"]: + if isinstance(region, dict): + region_name = region.get("name", "") + local_coords = regions_local.get(region_name, {}) + if "lat" in local_coords: + region["lat"] = local_coords["lat"] + if "lon" in local_coords: + region["lon"] = local_coords["lon"] + + # Critical nodes + if local.get("critical_nodes"): + mi["critical_nodes"] = local["critical_nodes"] + + # Environmental center point + env_center = local.get("env_center", {}) + if "environmental" in data: + env = data["environmental"] + if "ducting" in env: + if env_center.get("latitude") is not None: + env["ducting"]["latitude"] = env_center["latitude"] + if env_center.get("longitude") is not None: + env["ducting"]["longitude"] = env_center["longitude"] + + # NWS user agent from contact email + if identity.get("contact_email") and "nws" in env: + email = identity["contact_email"] + env["nws"]["user_agent"] = f"(meshai, {email})" + + # Notification targets + notif_targets = local.get("notification_targets", {}) + if "notifications" in data and "rules" in data["notifications"]: + alert_node_ids = notif_targets.get("alert_node_ids", []) + smtp_recipients = notif_targets.get("smtp_recipients", []) + + for rule in data["notifications"]["rules"]: + if isinstance(rule, dict): + # Apply default node_ids if not set + if rule.get("delivery_type") == "mesh_dm" and not rule.get("node_ids"): + rule["node_ids"] = alert_node_ids + # Apply default recipients if not set + if rule.get("delivery_type") == "email" and not rule.get("recipients"): + rule["recipients"] = smtp_recipients + # Apply smtp_from + if notif_targets.get("smtp_from") and not rule.get("from_address"): + rule["from_address"] = notif_targets["smtp_from"] + + return data + + +# ============================================================================= +# VALIDATION +# ============================================================================= + +def _validate_config(data: dict, local: dict, env: dict[str, str]) -> None: + """Validate config and log warnings for missing values. + + This does NOT raise errors - MeshAI starts in degraded mode with missing values. + """ + # Check regions for missing coordinates + if "mesh_intelligence" in data: + mi = data["mesh_intelligence"] + if mi.get("enabled") and "regions" in mi: + regions_local = local.get("regions", {}) if local else {} + for region in mi["regions"]: + if isinstance(region, dict): + region_name = region.get("name", "unknown") + if not region.get("lat") or not region.get("lon"): + if region_name not in regions_local: + _logger.warning( + f"Region '{region_name}' has no coordinates in local.yaml - " + "geographic features disabled for this region" + ) + + # Check for missing secrets + missing_secrets = [] + for secret in EXPECTED_SECRETS: + if not os.environ.get(secret) and not env.get(secret): + missing_secrets.append(secret) + + if missing_secrets: + _logger.warning( + f"Missing secret environment variables: {', '.join(missing_secrets)}. " + "Some features may be disabled." + ) + + # Check LLM API key + if "llm" in data: + api_key = data["llm"].get("api_key", "") + if not api_key or (api_key.startswith("${") and api_key.endswith("}")): + # It's a reference, check if resolved + backend = data["llm"].get("backend", "openai").lower() + key_var = { + "openai": "OPENAI_API_KEY", + "anthropic": "ANTHROPIC_API_KEY", + "google": "GOOGLE_API_KEY", + }.get(backend, "LLM_API_KEY") + if not os.environ.get(key_var) and not env.get(key_var): + _logger.warning( + f"LLM backend '{backend}' configured but {key_var} not found. " + "LLM responses will fail." + ) + + +# ============================================================================= +# MAIN LOADER +# ============================================================================= + +def load_config(config_dir: Path = Path("/data/config")) -> Config: + """Load configuration from multi-file layout. + + This function: + 1. Reads config.yaml (orchestrator) with !include directives + 2. Reads local.yaml if present (operator-local values) + 3. Reads /data/secrets/.env if present (secret values) + 4. Interpolates ${VAR_NAME} references + 5. Merges local values into config + 6. Validates and logs warnings for missing values + 7. Returns the same Config dataclass shape + + Args: + config_dir: Path to config directory (default: /data/config) + + Returns: + Config dataclass instance + """ + config_dir = Path(config_dir) + + # Determine config file path + # Support both new layout (/data/config/config.yaml) and legacy (/data/config.yaml) + orchestrator_path = config_dir / "config.yaml" + legacy_path = config_dir.parent / "config.yaml" if config_dir.name == "config" else None + + if not orchestrator_path.exists(): + if legacy_path and legacy_path.exists(): + # Fall back to legacy single-file config + _logger.info(f"Using legacy config at {legacy_path}") + from .config import load_config as legacy_load + return legacy_load(legacy_path) + else: + _logger.warning( + f"Config file not found at {orchestrator_path}. " + "Using default configuration." + ) + config = Config() + config._config_path = orchestrator_path + return config + + # Load orchestrator with !include support + _logger.debug(f"Loading config from {orchestrator_path}") + data = _load_yaml_with_includes(orchestrator_path) + + # Load local.yaml + local_path = config_dir / "local.yaml" + local_data = {} + if local_path.exists(): + with open(local_path, "r") as f: + local_data = yaml.safe_load(f) or {} + _logger.debug(f"Loaded local config from {local_path}") + else: + _logger.warning( + f"No local.yaml found at {local_path}. " + "MeshAI is in no-location mode - geographic features disabled." + ) + + # Load secrets from .env + secrets_path = config_dir.parent / "secrets" / ".env" + env_data = {} + if secrets_path.exists(): + env_data = dotenv_values(secrets_path) + _logger.debug(f"Loaded {len(env_data)} secrets from {secrets_path}") + else: + # Try alternate location + alt_secrets_path = Path("/data/secrets/.env") + if alt_secrets_path.exists(): + env_data = dotenv_values(alt_secrets_path) + _logger.debug(f"Loaded {len(env_data)} secrets from {alt_secrets_path}") + else: + _logger.warning( + f"No .env file found at {secrets_path}. " + "API keys must be set via environment variables." + ) + + # Interpolate environment variables + data = _interpolate_env_vars(data, env_data) + + # Merge local values + data = _merge_local_values(data, local_data) + + # Validate and warn + _validate_config(data, local_data, env_data) + + # Convert to Config dataclass + config = _dict_to_dataclass(Config, data) + config._config_path = orchestrator_path + + return config + + +# ============================================================================= +# SECTION SAVER +# ============================================================================= + +def _is_secret_field(section: str, field_path: str) -> bool: + """Check if a field path matches a secret field pattern.""" + full_path = f"{section}.{field_path}" if field_path else section + + for pattern in SECRET_FIELDS: + # Convert pattern to regex + regex = pattern.replace(".", r"\.").replace("*", r"[^.]+") + if re.match(f"^{regex}$", full_path): + return True + return False + + +def _extract_local_fields(section: str, data: dict) -> tuple[dict, dict]: + """Extract local fields from data. + + Returns: + (domain_data, local_data) - data split by destination + """ + domain_data = dict(data) + local_data = {} + + # Check each LOCAL_FIELDS pattern + for field_pattern, local_path in LOCAL_FIELDS.items(): + if not field_pattern.startswith(f"{section}."): + continue + + # Extract field name from pattern + field_name = field_pattern[len(section) + 1:] # Remove "section." + + if ".*." in field_name: + # Array field pattern - handle specially + continue + + if field_name in domain_data: + # Move to local_data using the local_path + value = domain_data.pop(field_name) + # Build nested structure in local_data + parts = local_path.split(".") + current = local_data + for part in parts[:-1]: + if part not in current: + current[part] = {} + current = current[part] + current[parts[-1]] = value + + return domain_data, local_data + + +def save_section( + section_name: str, + data: dict, + config_dir: Path = Path("/data/config"), +) -> dict: + """Save a configuration section to the appropriate file(s). + + This function: + 1. Determines which file(s) the section belongs to + 2. Extracts local-identifying fields to local.yaml + 3. Rejects attempts to save secret fields + 4. Writes domain data to the appropriate file + 5. Writes local data to local.yaml + + Args: + section_name: Name of the section (e.g., "notifications", "llm") + data: The section data as a dict + config_dir: Path to config directory + + Returns: + Dict with status: {"saved": True, "files_written": [...], "rejected_secrets": [...]} + + Raises: + ValueError: If section_name is not recognized + """ + config_dir = Path(config_dir) + + if section_name not in SECTION_TO_FILE: + raise ValueError( + f"Unknown section '{section_name}'. " + f"Valid sections: {', '.join(sorted(SECTION_TO_FILE.keys()))}" + ) + + target_file = SECTION_TO_FILE[section_name] + target_path = config_dir / target_file + local_path = config_dir / "local.yaml" + + files_written = [] + rejected_secrets = [] + + # Check for secret fields and reject them + def check_secrets(d: dict, path: str = "") -> dict: + cleaned = {} + for key, value in d.items(): + field_path = f"{path}.{key}" if path else key + if _is_secret_field(section_name, field_path): + rejected_secrets.append(field_path) + _logger.error( + f"Rejected attempt to save secret field '{section_name}.{field_path}'. " + "Secret fields must be set via /data/secrets/.env" + ) + elif isinstance(value, dict): + cleaned[key] = check_secrets(value, field_path) + elif isinstance(value, list): + cleaned[key] = [ + check_secrets(item, f"{field_path}[{i}]") + if isinstance(item, dict) else item + for i, item in enumerate(value) + ] + else: + cleaned[key] = value + return cleaned + + data = check_secrets(data) + + # Extract local fields + domain_data, local_updates = _extract_local_fields(section_name, data) + + # Load existing target file + if target_path.exists(): + with open(target_path, "r") as f: + existing = yaml.safe_load(f) or {} + else: + existing = {} + + # Handle sections that share a file (meshtastic.yaml has both connection and commands) + if target_file == "meshtastic.yaml": + existing[section_name] = domain_data + elif target_file == "config.yaml": + # For orchestrator, update the section in place + existing[section_name] = domain_data + else: + # For dedicated files, the whole file IS the section + existing = domain_data + + # Write domain file + with open(target_path, "w") as f: + yaml.dump(existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + files_written.append(str(target_path)) + _logger.info(f"Saved {section_name} to {target_path}") + + # Update local.yaml if there are local fields + if local_updates: + if local_path.exists(): + with open(local_path, "r") as f: + local_existing = yaml.safe_load(f) or {} + else: + local_existing = {} + + # Deep merge local_updates into local_existing + def deep_merge(base: dict, updates: dict) -> dict: + for key, value in updates.items(): + if key in base and isinstance(base[key], dict) and isinstance(value, dict): + deep_merge(base[key], value) + else: + base[key] = value + return base + + deep_merge(local_existing, local_updates) + + with open(local_path, "w") as f: + yaml.dump(local_existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + # Set restrictive permissions on local.yaml + local_path.chmod(0o600) + files_written.append(str(local_path)) + _logger.info(f"Updated local values in {local_path}") + + return { + "saved": True, + "files_written": files_written, + "rejected_secrets": rejected_secrets, + } + + +# ============================================================================= +# UTILITY FUNCTIONS +# ============================================================================= + +def get_config_dir_from_path(config_path: Path) -> Path: + """Determine config directory from a config file path. + + Args: + config_path: Path to config.yaml (could be legacy or new layout) + + Returns: + Path to config directory + """ + config_path = Path(config_path) + + if config_path.is_dir(): + return config_path + + # If pointing to config.yaml in new layout + if config_path.name == "config.yaml" and config_path.parent.name == "config": + return config_path.parent + + # If pointing to legacy /data/config.yaml + if config_path.name == "config.yaml": + new_layout = config_path.parent / "config" + if new_layout.exists() and (new_layout / "config.yaml").exists(): + return new_layout + + return config_path.parent