diff --git a/.gitignore b/.gitignore index 4aae211..87706b6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,13 +1,3 @@ -# Operator-identifying config and secrets (v0.3 split) -/data/config/local.yaml -/data/config/secrets/ -/data/secrets/ -.env -.env.local -.env.* -!.env.example -local.yaml -!local.yaml.example # Python __pycache__/ *.py[cod] @@ -59,13 +49,3 @@ data/ # OS .DS_Store Thumbs.db -# Operator-identifying config and secrets (v0.3 split) -/data/config/local.yaml -/data/config/secrets/ -/data/secrets/ -.env -.env.local -.env.* -!.env.example -local.yaml -!local.yaml.example diff --git a/config/.env.example b/config/.env.example deleted file mode 100644 index cc43131..0000000 --- a/config/.env.example +++ /dev/null @@ -1,19 +0,0 @@ -# MeshAI Secrets Template -# Copy to /data/secrets/.env and fill in your values -# This file is gitignored - never commit real secrets - -# LLM API Keys (only one needed based on your backend choice) -OPENAI_API_KEY= -ANTHROPIC_API_KEY= -GOOGLE_API_KEY= - -# Mesh Source Credentials -MESHMONITOR_API_TOKEN= -MQTT_PASSWORD= - -# Environmental Feed Keys -TOMTOM_API_KEY= -FIRMS_MAP_KEY= - -# Notification Credentials -SMTP_PASSWORD= diff --git a/config/local.yaml.example b/config/local.yaml.example deleted file mode 100644 index 8da62a9..0000000 --- a/config/local.yaml.example +++ /dev/null @@ -1,57 +0,0 @@ -# MeshAI Local Configuration Template -# Copy to /data/config/local.yaml and customize for your deployment -# This file is gitignored - contains operator-identifying values - -# Operator Identity -identity: - name: "" # Bot display name - owner: "" # Owner callsign/name - primary_node_id: "" # Your main mesh node ID - contact_email: "" # For NWS user_agent, SMTP from - -# Region Coordinates -# Map your region names to their lat/lon center points -regions: - "Example Region": - lat: 0.0 - lon: 0.0 - # Add more regions as needed: - # "Another Region": - # lat: 42.5 - # lon: -114.5 - -# Mesh Data Source URLs -mesh_sources: - meshmonitor_url: "" # Your MeshMonitor instance - sources: - # Per-source URL overrides (matches names in mesh_sources.yaml) - "My-Meshview": - url: "" - # "My-MeshMonitor": - # url: "" - -# Infrastructure Hosts -infrastructure: - tcp_host: "" # Meshtastic TCP host (meshtasticd) - qdrant_host: "" # Qdrant vector DB (optional) - tei_host: "" # TEI embedding service (optional) - sparse_host: "" # Sparse embedding service (optional) - -# Environmental Feed Center Point -env_center: - latitude: 0.0 # Center of your coverage area - longitude: 0.0 - -# Notification Targets -notification_targets: - smtp_from: "" # Email from address - smtp_recipients: [] # Default email recipients - webhook_urls: [] # Webhook endpoints - alert_node_ids: [] # Node IDs for mesh DM alerts - -# Critical Infrastructure Nodes (short names) -critical_nodes: [] -# Example: -# critical_nodes: -# - "MHR" -# - "HPR" diff --git a/meshai/config.py b/meshai/config.py index 441f3dc..fb05317 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -230,7 +230,6 @@ class RegionAnchor: description: str = "" # e.g., "Twin Falls, Burley, Jerome along I-84/US-93" aliases: list[str] = field(default_factory=list) # e.g., ["southern Idaho", "magic valley"] cities: list[str] = field(default_factory=list) # e.g., ["Twin Falls", "Burley", "Jerome"] - nws_zones: list[str] = field(default_factory=list) # NWS zone codes (e.g., ["IDZ016", "IDZ030"]) @dataclass diff --git a/meshai/config_loader.py b/meshai/config_loader.py deleted file mode 100644 index 74d2f3e..0000000 --- a/meshai/config_loader.py +++ /dev/null @@ -1,688 +0,0 @@ -"""Multi-file configuration loader for MeshAI v0.3. - -This module provides: -- !include directive support for splitting config across files -- Environment variable interpolation (${VAR_NAME} and ${VAR_NAME:-default}) -- Operator-local value merging from local.yaml -- Secret loading from .env files -- Section-aware save_section() for dashboard write-back - -The loader produces the same Config dataclass shape as config.py, -ensuring backward compatibility with all existing consumers. -""" - -import logging -import os -import re -from pathlib import Path -from typing import Any, Optional - -import yaml -from dotenv import dotenv_values - -# Import existing dataclasses - shape must NOT change -from .config import ( - Config, - _dict_to_dataclass, - _dataclass_to_dict, -) - -_logger = logging.getLogger(__name__) - -# ============================================================================= -# SECTION TO FILE MAPPING -# ============================================================================= - -SECTION_TO_FILE: dict[str, str] = { - # Inline in orchestrator config.yaml - "timezone": "config.yaml", - "bot": "config.yaml", - "response": "config.yaml", - "history": "config.yaml", - "memory": "config.yaml", - "context": "config.yaml", - "weather": "config.yaml", - "meshmonitor": "config.yaml", - "knowledge": "config.yaml", - - # Domain files - "connection": "meshtastic.yaml", - "commands": "meshtastic.yaml", - "mesh_sources": "mesh_sources.yaml", - "mesh_intelligence": "mesh_intelligence.yaml", - "environmental": "env_feeds.yaml", - "notifications": "notifications.yaml", - "llm": "llm.yaml", - "dashboard": "dashboard.yaml", -} - -# Fields that should be written to local.yaml instead of domain files -LOCAL_FIELDS: dict[str, str] = { - "bot.name": "identity.name", - "bot.owner": "identity.owner", - "connection.tcp_host": "infrastructure.tcp_host", - "knowledge.qdrant_host": "infrastructure.qdrant_host", - "knowledge.tei_host": "infrastructure.tei_host", - "knowledge.sparse_host": "infrastructure.sparse_host", - "meshmonitor.url": "mesh_sources.meshmonitor_url", - "mesh_intelligence.critical_nodes": "critical_nodes", - "environmental.ducting.latitude": "env_center.latitude", - "environmental.ducting.longitude": "env_center.longitude", -} - -# Fields that contain secrets - NEVER written, must be in .env -SECRET_FIELDS: set[str] = { - "llm.api_key", - "mesh_sources.*.api_token", - "mesh_sources.*.password", - "environmental.traffic.api_key", - "environmental.firms.map_key", - "notifications.rules.*.smtp_password", -} - -# Secret env var names expected in .env -EXPECTED_SECRETS: list[str] = [ - "OPENAI_API_KEY", - "ANTHROPIC_API_KEY", - "GOOGLE_API_KEY", - "MESHMONITOR_API_TOKEN", - "MQTT_PASSWORD", - "TOMTOM_API_KEY", - "FIRMS_MAP_KEY", - "SMTP_PASSWORD", -] - - -# ============================================================================= -# YAML !INCLUDE CONSTRUCTOR -# ============================================================================= - -# Global set for tracking files currently being loaded (cycle detection) -_loading_files: set[Path] = set() - - -def _make_include_loader(base_path: Path): - """Create an IncludeLoader class with the given base path.""" - - class IncludeLoader(yaml.SafeLoader): - """YAML loader with !include tag support.""" - pass - - def construct_include(loader: IncludeLoader, node: yaml.Node) -> Any: - """Handle !include directive.""" - relative_path = loader.construct_scalar(node) - include_path = (base_path / relative_path).resolve() - - # Cycle detection using global set - if include_path in _loading_files: - raise yaml.YAMLError( - f"Circular include detected: {include_path} is already being loaded. " - f"Current loading chain: {[str(p) for p in _loading_files]}" - ) - - if not include_path.exists(): - raise yaml.YAMLError( - f"Include file not found: {include_path} " - f"(referenced from {base_path / 'config.yaml'})" - ) - - _loading_files.add(include_path) - try: - with open(include_path, "r") as f: - # Recursively load with the include file's directory as new base - NestedLoader = _make_include_loader(include_path.parent) - return yaml.load(f, Loader=NestedLoader) - finally: - _loading_files.discard(include_path) - - IncludeLoader.add_constructor("!include", construct_include) - return IncludeLoader - - -def _load_yaml_with_includes(file_path: Path) -> dict: - """Load a YAML file with !include directive support.""" - global _loading_files - _loading_files.clear() # Reset cycle detection - - if not file_path.exists(): - return {} - - # Add the root file to loading set - file_path = file_path.resolve() - _loading_files.add(file_path) - - try: - with open(file_path, "r") as f: - Loader = _make_include_loader(file_path.parent) - return yaml.load(f, Loader=Loader) or {} - finally: - _loading_files.discard(file_path) - - -# ============================================================================= -# ENVIRONMENT VARIABLE INTERPOLATION -# ============================================================================= - -_ENV_PATTERN = re.compile(r"\$\{([A-Z_][A-Z0-9_]*)(?::-([^}]*))?\}") - - -def _interpolate_env_vars(value: Any, env: dict[str, str]) -> Any: - """Recursively interpolate ${VAR_NAME} and ${VAR_NAME:-default} in strings. - - Args: - value: The value to interpolate (can be string, dict, list, or other) - env: Combined environment (os.environ + .env file values) - - Returns: - The value with environment variables resolved - """ - if isinstance(value, str): - def replace_match(match): - var_name = match.group(1) - default = match.group(2) - - # os.environ takes precedence over .env file - resolved = os.environ.get(var_name) - if resolved is None: - resolved = env.get(var_name) - if resolved is None: - if default is not None: - return default - _logger.warning( - f"Environment variable ${{{var_name}}} not found and no default provided. " - "Using empty string." - ) - return "" - return resolved - - return _ENV_PATTERN.sub(replace_match, value) - - elif isinstance(value, dict): - return {k: _interpolate_env_vars(v, env) for k, v in value.items()} - - elif isinstance(value, list): - return [_interpolate_env_vars(item, env) for item in value] - - return value - - -# ============================================================================= -# LOCAL.YAML MERGING -# ============================================================================= - -def _merge_local_values(data: dict, local: dict) -> dict: - """Merge operator-local values from local.yaml into the config data. - - This handles: - - identity.name/owner -> bot.name/owner - - infrastructure.* -> connection/knowledge hosts - - regions.{name}.lat/lon -> mesh_intelligence.regions[name].lat/lon - - critical_nodes -> mesh_intelligence.critical_nodes - - mesh_sources.sources.{name}.* -> mesh_sources[name].* - - env_center.* -> environmental.ducting.* - - notification_targets.* -> notifications rules - - Args: - data: The loaded config data (will be modified in place) - local: The local.yaml data - - Returns: - The merged data dict - """ - if not local: - return data - - # Identity -> bot - identity = local.get("identity", {}) - if "bot" in data: - if identity.get("name"): - data["bot"]["name"] = identity["name"] - if identity.get("owner"): - data["bot"]["owner"] = identity["owner"] - - # Infrastructure hosts - infra = local.get("infrastructure", {}) - if infra.get("tcp_host") and "connection" in data: - data["connection"]["tcp_host"] = infra["tcp_host"] - if "knowledge" in data: - if infra.get("qdrant_host"): - data["knowledge"]["qdrant_host"] = infra["qdrant_host"] - if infra.get("tei_host"): - data["knowledge"]["tei_host"] = infra["tei_host"] - if infra.get("sparse_host"): - data["knowledge"]["sparse_host"] = infra["sparse_host"] - - # Meshmonitor URL - mesh_sources_local = local.get("mesh_sources", {}) - if mesh_sources_local.get("meshmonitor_url") and "meshmonitor" in data: - data["meshmonitor"]["url"] = mesh_sources_local["meshmonitor_url"] - - # Mesh sources URLs - sources_local = mesh_sources_local.get("sources", {}) - if "mesh_sources" in data and isinstance(data["mesh_sources"], list): - for source in data["mesh_sources"]: - if isinstance(source, dict): - source_name = source.get("name", "") - local_source = sources_local.get(source_name, {}) - if local_source.get("url"): - source["url"] = local_source["url"] - if local_source.get("host"): - source["host"] = local_source["host"] - - # Region coordinates - regions_local = local.get("regions", {}) - if "mesh_intelligence" in data: - mi = data["mesh_intelligence"] - if "regions" in mi and isinstance(mi["regions"], list): - for region in mi["regions"]: - if isinstance(region, dict): - region_name = region.get("name", "") - local_coords = regions_local.get(region_name, {}) - if "lat" in local_coords: - region["lat"] = local_coords["lat"] - if "lon" in local_coords: - region["lon"] = local_coords["lon"] - - # Critical nodes - if local.get("critical_nodes"): - mi["critical_nodes"] = local["critical_nodes"] - - # Environmental center point - env_center = local.get("env_center", {}) - if "environmental" in data: - env = data["environmental"] - if "ducting" in env: - if env_center.get("latitude") is not None: - env["ducting"]["latitude"] = env_center["latitude"] - if env_center.get("longitude") is not None: - env["ducting"]["longitude"] = env_center["longitude"] - - # NWS user agent from contact email - if identity.get("contact_email") and "nws" in env: - email = identity["contact_email"] - env["nws"]["user_agent"] = f"(meshai, {email})" - - # Notification targets - notif_targets = local.get("notification_targets", {}) - if "notifications" in data and "rules" in data["notifications"]: - alert_node_ids = notif_targets.get("alert_node_ids", []) - smtp_recipients = notif_targets.get("smtp_recipients", []) - - for rule in data["notifications"]["rules"]: - if isinstance(rule, dict): - # Apply default node_ids if not set - if rule.get("delivery_type") == "mesh_dm" and not rule.get("node_ids"): - rule["node_ids"] = alert_node_ids - # Apply default recipients if not set - if rule.get("delivery_type") == "email" and not rule.get("recipients"): - rule["recipients"] = smtp_recipients - # Apply smtp_from - if notif_targets.get("smtp_from") and not rule.get("from_address"): - rule["from_address"] = notif_targets["smtp_from"] - - return data - - -# ============================================================================= -# VALIDATION -# ============================================================================= - -def _validate_config(data: dict, local: dict, env: dict[str, str]) -> None: - """Validate config and log warnings for missing values. - - This does NOT raise errors - MeshAI starts in degraded mode with missing values. - """ - # Check regions for missing coordinates - if "mesh_intelligence" in data: - mi = data["mesh_intelligence"] - if mi.get("enabled") and "regions" in mi: - regions_local = local.get("regions", {}) if local else {} - for region in mi["regions"]: - if isinstance(region, dict): - region_name = region.get("name", "unknown") - if not region.get("lat") or not region.get("lon"): - if region_name not in regions_local: - _logger.warning( - f"Region '{region_name}' has no coordinates in local.yaml - " - "geographic features disabled for this region" - ) - - # Check for missing secrets - missing_secrets = [] - for secret in EXPECTED_SECRETS: - if not os.environ.get(secret) and not env.get(secret): - missing_secrets.append(secret) - - if missing_secrets: - _logger.warning( - f"Missing secret environment variables: {', '.join(missing_secrets)}. " - "Some features may be disabled." - ) - - # Check LLM API key - if "llm" in data: - api_key = data["llm"].get("api_key", "") - if not api_key or (api_key.startswith("${") and api_key.endswith("}")): - # It's a reference, check if resolved - backend = data["llm"].get("backend", "openai").lower() - key_var = { - "openai": "OPENAI_API_KEY", - "anthropic": "ANTHROPIC_API_KEY", - "google": "GOOGLE_API_KEY", - }.get(backend, "LLM_API_KEY") - if not os.environ.get(key_var) and not env.get(key_var): - _logger.warning( - f"LLM backend '{backend}' configured but {key_var} not found. " - "LLM responses will fail." - ) - - -# ============================================================================= -# MAIN LOADER -# ============================================================================= - -def load_config(config_dir: Path = Path("/data/config")) -> Config: - """Load configuration from multi-file layout. - - This function: - 1. Reads config.yaml (orchestrator) with !include directives - 2. Reads local.yaml if present (operator-local values) - 3. Reads /data/secrets/.env if present (secret values) - 4. Interpolates ${VAR_NAME} references - 5. Merges local values into config - 6. Validates and logs warnings for missing values - 7. Returns the same Config dataclass shape - - Args: - config_dir: Path to config directory (default: /data/config) - - Returns: - Config dataclass instance - """ - config_dir = Path(config_dir) - - # Determine config file path - # Support both new layout (/data/config/config.yaml) and legacy (/data/config.yaml) - orchestrator_path = config_dir / "config.yaml" - legacy_path = config_dir.parent / "config.yaml" if config_dir.name == "config" else None - - if not orchestrator_path.exists(): - if legacy_path and legacy_path.exists(): - # Fall back to legacy single-file config - _logger.info(f"Using legacy config at {legacy_path}") - from .config import load_config as legacy_load - return legacy_load(legacy_path) - else: - _logger.warning( - f"Config file not found at {orchestrator_path}. " - "Using default configuration." - ) - config = Config() - config._config_path = orchestrator_path - return config - - # Load orchestrator with !include support - _logger.debug(f"Loading config from {orchestrator_path}") - data = _load_yaml_with_includes(orchestrator_path) - # Hoist meshtastic.connection and meshtastic.commands to top level - # meshtastic.yaml contains both sections under wrapper keys - if "meshtastic" in data and isinstance(data["meshtastic"], dict): - meshtastic = data.pop("meshtastic") - if "connection" in meshtastic: - data["connection"] = meshtastic["connection"] - if "commands" in meshtastic: - data["commands"] = meshtastic["commands"] - - # Load local.yaml - local_path = config_dir / "local.yaml" - local_data = {} - if local_path.exists(): - with open(local_path, "r") as f: - local_data = yaml.safe_load(f) or {} - _logger.debug(f"Loaded local config from {local_path}") - else: - _logger.warning( - f"No local.yaml found at {local_path}. " - "MeshAI is in no-location mode - geographic features disabled." - ) - - # Load secrets from .env - secrets_path = config_dir.parent / "secrets" / ".env" - env_data = {} - if secrets_path.exists(): - env_data = dotenv_values(secrets_path) - _logger.debug(f"Loaded {len(env_data)} secrets from {secrets_path}") - else: - # Try alternate location - alt_secrets_path = Path("/data/secrets/.env") - if alt_secrets_path.exists(): - env_data = dotenv_values(alt_secrets_path) - _logger.debug(f"Loaded {len(env_data)} secrets from {alt_secrets_path}") - else: - _logger.warning( - f"No .env file found at {secrets_path}. " - "API keys must be set via environment variables." - ) - - # Interpolate environment variables - data = _interpolate_env_vars(data, env_data) - - # Merge local values - data = _merge_local_values(data, local_data) - - # Validate and warn - _validate_config(data, local_data, env_data) - - # Convert to Config dataclass - config = _dict_to_dataclass(Config, data) - config._config_path = orchestrator_path - - return config - - -# ============================================================================= -# SECTION SAVER -# ============================================================================= - -def _is_secret_field(section: str, field_path: str) -> bool: - """Check if a field path matches a secret field pattern.""" - full_path = f"{section}.{field_path}" if field_path else section - - for pattern in SECRET_FIELDS: - # Convert pattern to regex - regex = pattern.replace(".", r"\.").replace("*", r"[^.]+") - if re.match(f"^{regex}$", full_path): - return True - return False - - -def _extract_local_fields(section: str, data: dict) -> tuple[dict, dict]: - """Extract local fields from data. - - Returns: - (domain_data, local_data) - data split by destination - """ - domain_data = dict(data) - local_data = {} - - # Check each LOCAL_FIELDS pattern - for field_pattern, local_path in LOCAL_FIELDS.items(): - if not field_pattern.startswith(f"{section}."): - continue - - # Extract field name from pattern - field_name = field_pattern[len(section) + 1:] # Remove "section." - - if ".*." in field_name: - # Array field pattern - handle specially - continue - - if field_name in domain_data: - # Move to local_data using the local_path - value = domain_data.pop(field_name) - # Build nested structure in local_data - parts = local_path.split(".") - current = local_data - for part in parts[:-1]: - if part not in current: - current[part] = {} - current = current[part] - current[parts[-1]] = value - - return domain_data, local_data - - -def save_section( - section_name: str, - data: dict, - config_dir: Path = Path("/data/config"), -) -> dict: - """Save a configuration section to the appropriate file(s). - - This function: - 1. Determines which file(s) the section belongs to - 2. Extracts local-identifying fields to local.yaml - 3. Rejects attempts to save secret fields - 4. Writes domain data to the appropriate file - 5. Writes local data to local.yaml - - Args: - section_name: Name of the section (e.g., "notifications", "llm") - data: The section data as a dict - config_dir: Path to config directory - - Returns: - Dict with status: {"saved": True, "files_written": [...], "rejected_secrets": [...]} - - Raises: - ValueError: If section_name is not recognized - """ - config_dir = Path(config_dir) - - if section_name not in SECTION_TO_FILE: - raise ValueError( - f"Unknown section '{section_name}'. " - f"Valid sections: {', '.join(sorted(SECTION_TO_FILE.keys()))}" - ) - - target_file = SECTION_TO_FILE[section_name] - target_path = config_dir / target_file - local_path = config_dir / "local.yaml" - - files_written = [] - rejected_secrets = [] - - # Check for secret fields and reject them - def check_secrets(d: dict, path: str = "") -> dict: - cleaned = {} - for key, value in d.items(): - field_path = f"{path}.{key}" if path else key - if _is_secret_field(section_name, field_path): - rejected_secrets.append(field_path) - _logger.error( - f"Rejected attempt to save secret field '{section_name}.{field_path}'. " - "Secret fields must be set via /data/secrets/.env" - ) - elif isinstance(value, dict): - cleaned[key] = check_secrets(value, field_path) - elif isinstance(value, list): - cleaned[key] = [ - check_secrets(item, f"{field_path}[{i}]") - if isinstance(item, dict) else item - for i, item in enumerate(value) - ] - else: - cleaned[key] = value - return cleaned - - data = check_secrets(data) - - # Extract local fields - domain_data, local_updates = _extract_local_fields(section_name, data) - - # Load existing target file - if target_path.exists(): - with open(target_path, "r") as f: - existing = yaml.safe_load(f) or {} - else: - existing = {} - - # Handle sections that share a file (meshtastic.yaml has both connection and commands) - if target_file == "meshtastic.yaml": - existing[section_name] = domain_data - elif target_file == "config.yaml": - # For orchestrator, update the section in place - existing[section_name] = domain_data - else: - # For dedicated files, the whole file IS the section - existing = domain_data - - # Write domain file - with open(target_path, "w") as f: - yaml.dump(existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True) - files_written.append(str(target_path)) - _logger.info(f"Saved {section_name} to {target_path}") - - # Update local.yaml if there are local fields - if local_updates: - if local_path.exists(): - with open(local_path, "r") as f: - local_existing = yaml.safe_load(f) or {} - else: - local_existing = {} - - # Deep merge local_updates into local_existing - def deep_merge(base: dict, updates: dict) -> dict: - for key, value in updates.items(): - if key in base and isinstance(base[key], dict) and isinstance(value, dict): - deep_merge(base[key], value) - else: - base[key] = value - return base - - deep_merge(local_existing, local_updates) - - with open(local_path, "w") as f: - yaml.dump(local_existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True) - - # Set restrictive permissions on local.yaml - local_path.chmod(0o600) - files_written.append(str(local_path)) - _logger.info(f"Updated local values in {local_path}") - - return { - "saved": True, - "files_written": files_written, - "rejected_secrets": rejected_secrets, - } - - -# ============================================================================= -# UTILITY FUNCTIONS -# ============================================================================= - -def get_config_dir_from_path(config_path: Path) -> Path: - """Determine config directory from a config file path. - - Args: - config_path: Path to config.yaml (could be legacy or new layout) - - Returns: - Path to config directory - """ - config_path = Path(config_path) - - if config_path.is_dir(): - return config_path - - # If pointing to config.yaml in new layout - if config_path.name == "config.yaml" and config_path.parent.name == "config": - return config_path.parent - - # If pointing to legacy /data/config.yaml - if config_path.name == "config.yaml": - new_layout = config_path.parent / "config" - if new_layout.exists() and (new_layout / "config.yaml").exists(): - return new_layout - - return config_path.parent diff --git a/meshai/main.py b/meshai/main.py index 6d1c41f..48bc44b 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -16,8 +16,7 @@ from .cli import run_configurator from .commands import CommandDispatcher from .commands.dispatcher import create_dispatcher from .commands.status import set_start_time -from .config import Config -from .config_loader import load_config, get_config_dir_from_path +from .config import Config, load_config from .connector import MeshConnector, MeshMessage from .context import MeshContext from .history import ConversationHistory @@ -713,21 +712,12 @@ def main() -> None: run_configurator(args.config_file) return - # Load config - support both old (/data/config.yaml) and new (/data/config/) layouts - config_path = args.config_file - config_dir = get_config_dir_from_path(config_path) + # Load config + config = load_config(args.config_file) - # Check for new multi-file layout first - if (config_dir / "config.yaml").exists(): - logger.info(f"Loading config from multi-file layout: {config_dir}") - config = load_config(config_dir) - elif config_path.exists(): - # Fall back to legacy single-file loading - logger.info(f"Loading legacy config: {config_path}") - from .config import load_config as legacy_load - config = legacy_load(config_path) - else: - logger.warning(f"Config not found at {config_path} or {config_dir}") + # Check if config exists + if not args.config_file.exists(): + logger.warning(f"Config file not found: {args.config_file}") logger.info("Run 'meshai --config' to create one, or copy config.example.yaml") sys.exit(1) diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index 0ac0e88..2379a15 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -7,34 +7,8 @@ Severity levels (military/intelligence precedence): routine - Informational, no time pressure priority - Needs attention soon immediate - Act now, drop everything - -Toggle categories (for v0.3 notification routing): - mesh_health - infrastructure, power, utilization, coverage, health-score - weather - NWS-sourced alerts, stream flooding - fire - NIFC perimeters, FIRMS hotspots - rf_propagation - solar, geomagnetic, ducting, band conditions - roads - 511, TomTom traffic - avalanche - avalanche advisories - seismic - USGS quakes (Phase 3) - tracking - ADS-B, AIS, satellite passes (Phase 7) """ -from typing import Optional - - -# Valid toggle values for v0.3 pipeline -VALID_TOGGLES = frozenset({ - "mesh_health", - "weather", - "fire", - "rf_propagation", - "roads", - "avalanche", - "seismic", - "tracking", -}) - - ALERT_CATEGORIES = { # Infrastructure alerts "infra_offline": { @@ -42,28 +16,24 @@ ALERT_CATEGORIES = { "description": "An infrastructure node (router/repeater) stopped responding", "default_severity": "priority", "example_message": "⚠ Infrastructure Offline: MHR — Mountain Harrison Rptr has not been heard for 2 hours", - "toggle": "mesh_health", }, "critical_node_down": { "name": "Critical Node Down", "description": "A node you marked as critical went offline", "default_severity": "immediate", "example_message": "🚨 Critical Node Down: HPR — Hayden Peak Rptr offline for 1 hour", - "toggle": "mesh_health", }, "infra_recovery": { "name": "Infrastructure Recovery", "description": "An offline infrastructure node came back online", "default_severity": "routine", "example_message": "✅ Recovery: MHR — Mountain Harrison Rptr back online after 2h outage", - "toggle": "mesh_health", }, "new_router": { "name": "New Router", "description": "A new router appeared on the mesh", "default_severity": "routine", "example_message": "📡 New Router: Snake River Relay appeared in Wood River Valley", - "toggle": "mesh_health", }, # Power alerts @@ -72,42 +42,36 @@ ALERT_CATEGORIES = { "description": "Infrastructure node battery below 30% (3.60V)", "default_severity": "routine", "example_message": "🔋 Battery Warning: BLD-MTN at 28% (3.58V), solar not charging", - "toggle": "mesh_health", }, "battery_critical": { "name": "Battery Critical", "description": "Infrastructure node battery below 15% (3.50V)", "default_severity": "priority", "example_message": "🔋 Battery Critical: BLD-MTN at 12% (3.48V) — shutdown in hours", - "toggle": "mesh_health", }, "battery_emergency": { "name": "Battery Emergency", "description": "Infrastructure node battery below 5% (3.40V) — shutdown imminent", "default_severity": "immediate", "example_message": "🚨 Battery Emergency: BLD-MTN at 4% (3.38V) — shutdown imminent", - "toggle": "mesh_health", }, "battery_trend": { "name": "Battery Declining", "description": "Battery showing declining trend over 7 days — possible solar or charging issue", "default_severity": "routine", "example_message": "🔋 Battery Trend: HPR declining 85% → 62% over 7 days (-3.3%/day)", - "toggle": "mesh_health", }, "power_source_change": { "name": "Power Source Change", "description": "Node switched from USB to battery — possible power outage at site", "default_severity": "priority", "example_message": "⚡ Power Source: MHR switched from USB to battery — possible outage", - "toggle": "mesh_health", }, "solar_not_charging": { "name": "Solar Not Charging", "description": "Solar panel not charging during daylight hours — panel issue or obstruction", "default_severity": "priority", "example_message": "☀️ Solar Issue: BLD-MTN not charging during daylight (12:00 MDT)", - "toggle": "mesh_health", }, # Utilization alerts @@ -116,21 +80,18 @@ ALERT_CATEGORIES = { "description": "LoRa channel airtime exceeding threshold — mesh congestion", "default_severity": "routine", "example_message": "📊 Channel Airtime: 47% utilization (threshold: 40%). Reliability may degrade.", - "toggle": "mesh_health", }, "sustained_high_util": { "name": "Sustained High Utilization", "description": "Channel airtime elevated for extended period — ongoing congestion", "default_severity": "priority", "example_message": "📊 Sustained Congestion: 45% channel utilization for 2+ hours. Consider reducing telemetry.", - "toggle": "mesh_health", }, "packet_flood": { "name": "Packet Flood", "description": "A single node sending excessive radio packets (NOT water flooding) — possible firmware bug or stuck transmitter", "default_severity": "priority", "example_message": "📻 Packet Flood: Node 'BKBS' transmitting 42 packets/min (threshold: 10/min). Firmware bug?", - "toggle": "mesh_health", }, # Coverage alerts @@ -139,21 +100,18 @@ ALERT_CATEGORIES = { "description": "Infrastructure node dropped to single gateway coverage — reduced redundancy", "default_severity": "priority", "example_message": "📶 Reduced Coverage: HPR dropped to single gateway. Previously had 3 paths.", - "toggle": "mesh_health", }, "feeder_offline": { "name": "Feeder Offline", "description": "A feeder gateway stopped responding — coverage gap possible", "default_severity": "priority", "example_message": "📡 Feeder Offline: AIDA-N2 gateway not responding. 5 nodes may lose uplink.", - "toggle": "mesh_health", }, "region_total_blackout": { "name": "Region Blackout", "description": "All infrastructure in a region is offline — complete coverage loss", "default_severity": "immediate", "example_message": "🚨 REGION BLACKOUT: All infrastructure in Magic Valley offline!", - "toggle": "mesh_health", }, # Health score alerts @@ -162,14 +120,12 @@ ALERT_CATEGORIES = { "description": "Overall mesh health score dropped below threshold — multiple issues likely", "default_severity": "priority", "example_message": "📉 Mesh Health: Score 62/100 (threshold: 65). Infrastructure: 71, Connectivity: 58.", - "toggle": "mesh_health", }, "region_score_low": { "name": "Region Health Low", "description": "A region's health score below threshold — localized issues", "default_severity": "priority", "example_message": "📉 Region Health: Magic Valley at 55/100 (threshold: 60). 2 nodes offline.", - "toggle": "mesh_health", }, # Environmental - Weather @@ -178,7 +134,6 @@ ALERT_CATEGORIES = { "description": "NWS warning or advisory affecting your mesh area", "default_severity": "priority", "example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z", - "toggle": "weather", }, # Environmental - Space Weather @@ -187,14 +142,12 @@ ALERT_CATEGORIES = { "description": "R3+ solar flare degrading HF propagation on sunlit side", "default_severity": "priority", "example_message": "⚠ R3 Strong Radio Blackout — X1.2 flare. Wide-area HF blackout ~1 hour on sunlit side.", - "toggle": "rf_propagation", }, "geomagnetic_storm": { "name": "Geomagnetic Storm", "description": "G2+ geomagnetic storm — HF degraded at higher latitudes, aurora possible", "default_severity": "priority", "example_message": "🌐 G2 Moderate Geomagnetic Storm — Kp=6. HF fades at high latitudes, aurora to ~55°.", - "toggle": "rf_propagation", }, # Environmental - Tropospheric @@ -203,7 +156,6 @@ ALERT_CATEGORIES = { "description": "Atmospheric conditions trapping VHF/UHF signals — extended range", "default_severity": "routine", "example_message": "📡 Tropospheric Ducting: Surface duct detected, dM/dz -45 M-units/km, ~120m thick. VHF/UHF extended range.", - "toggle": "rf_propagation", }, # Environmental - Fire @@ -212,21 +164,18 @@ ALERT_CATEGORIES = { "description": "Active wildfire within alert radius of mesh infrastructure", "default_severity": "priority", "example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR. Monitor closely.", - "toggle": "fire", }, "wildfire_proximity": { "name": "Fire Near Mesh", "description": "Active wildfire within alert radius of mesh infrastructure", "default_severity": "priority", "example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR.", - "toggle": "fire", }, "new_ignition": { "name": "New Fire Ignition", "description": "Satellite hotspot detected NOT near any known fire — potential new wildfire", "default_severity": "priority", "example_message": "🛰 New Ignition: Satellite fire at 42.32°N, 114.30°W — high confidence, 47 MW FRP. Not near any known fire.", - "toggle": "fire", }, # Environmental - Flood @@ -235,14 +184,12 @@ ALERT_CATEGORIES = { "description": "River gauge exceeds NWS flood stage threshold", "default_severity": "priority", "example_message": "🌊 Stream Flood Warning: Snake River nr Twin Falls at 12.8 ft — Minor Flood Stage is 10.5 ft.", - "toggle": "weather", }, "stream_high_water": { "name": "Stream High Water", "description": "River gauge approaching flood stage — monitoring recommended", "default_severity": "routine", "example_message": "🌊 High Water: Snake River at 9.8 ft — Action Stage is 9.0 ft. Monitor conditions.", - "toggle": "weather", }, # Environmental - Roads @@ -251,14 +198,12 @@ ALERT_CATEGORIES = { "description": "Full road closure on a monitored corridor", "default_severity": "priority", "example_message": "🚧 Road Closure: I-84 EB at MP 173 — full closure, construction. Detour via US-30.", - "toggle": "roads", }, "traffic_congestion": { "name": "Traffic Congestion", "description": "Traffic speed dropped below congestion threshold on a monitored corridor", "default_severity": "routine", "example_message": "🚗 Traffic Congestion: I-84 Twin Falls — 35 mph (free-flow 70 mph), 50% speed ratio", - "toggle": "roads", }, # Environmental - Avalanche @@ -267,14 +212,12 @@ ALERT_CATEGORIES = { "description": "Avalanche danger level 4 (High) or 5 (Extreme) in your area", "default_severity": "priority", "example_message": "⛷ Avalanche Danger HIGH: Sawtooth Zone — avoid avalanche terrain. Natural avalanches likely.", - "toggle": "avalanche", }, "avalanche_considerable": { "name": "Avalanche Danger Considerable", "description": "Avalanche danger level 3 (Considerable) — most fatalities occur at this level", "default_severity": "routine", "example_message": "⛷ Avalanche Danger CONSIDERABLE: Sawtooth Zone — dangerous conditions on steep slopes.", - "toggle": "avalanche", }, } @@ -288,7 +231,6 @@ def get_category(category_id: str) -> dict: "description": f"Alert type: {category_id}", "default_severity": "routine", "example_message": f"Alert: {category_id}", - "toggle": "mesh_health", # Default unknown to mesh_health } @@ -298,37 +240,3 @@ def list_categories() -> list[dict]: {"id": cat_id, **cat_info} for cat_id, cat_info in ALERT_CATEGORIES.items() ] - - -def categories_for_toggle(toggle: str) -> list[str]: - """Return all category names that route to this toggle. - - Args: - toggle: Toggle name (e.g., "mesh_health", "weather") - - Returns: - List of category IDs that have this toggle assigned - """ - if toggle not in VALID_TOGGLES: - return [] - - return [ - cat_id - for cat_id, cat_info in ALERT_CATEGORIES.items() - if cat_info.get("toggle") == toggle - ] - - -def get_toggle(category_name: str) -> Optional[str]: - """Return the toggle name for a category, or None if unknown. - - Args: - category_name: Category ID (e.g., "infra_offline") - - Returns: - Toggle name (e.g., "mesh_health") or None if category unknown - """ - cat_info = ALERT_CATEGORIES.get(category_name) - if cat_info: - return cat_info.get("toggle") - return None diff --git a/meshai/notifications/events.py b/meshai/notifications/events.py deleted file mode 100644 index 7099f0e..0000000 --- a/meshai/notifications/events.py +++ /dev/null @@ -1,186 +0,0 @@ -"""Event dataclass for the v0.3 notification pipeline. - -This module defines the unified Event shape that flows through the -notification routing pipeline. All adapters emit Events, and the -router consumes them. - -Usage: - from meshai.notifications.events import Event, make_event - - # Create an event - event = make_event( - source="nws", - category="tornado_warning", - severity="immediate", - title="Tornado Warning for Ada County", - summary="A tornado warning has been issued...", - lat=43.615, - lon=-116.2023, - ) - - # Serialize for storage/webhook - data = event.to_dict() - - # Restore from storage - event2 = Event.from_dict(data) -""" - -import hashlib -import time -from dataclasses import dataclass, field, asdict -from typing import Optional, Any - - -# Valid severity levels -SEVERITY_LEVELS = frozenset({"routine", "priority", "immediate"}) - - -@dataclass -class Event: - """Unified event shape for the notification pipeline. - - All adapters (NWS, FIRMS, alert_engine, etc.) emit Events. - The router consumes Events and dispatches them to channels. - """ - - # Identity - id: str = "" # stable hash for dedup, computed if not provided - source: str = "" # adapter name: "nws", "firms", "alert_engine", etc. - category: str = "" # specific event type within source - - # Severity - severity: str = "routine" # "routine" | "priority" | "immediate" - - # Geography - region: Optional[str] = None # primary region name, set by region tagger - regions: list[str] = field(default_factory=list) # all regions touched - lat: Optional[float] = None - lon: Optional[float] = None - nws_zones: list[str] = field(default_factory=list) # NWS zone codes - - # Content - title: str = "" # one-line summary for digest headers - summary: str = "" # 1-3 sentence summary for immediate/mesh delivery - body: str = "" # full content for email/webhook delivery - - # Affected entities (for mesh health events) - node_ids: list[str] = field(default_factory=list) - short_names: list[str] = field(default_factory=list) - - # Timing - timestamp: float = 0.0 # event creation time - effective: Optional[float] = None # event start (NWS-style) - expires: Optional[float] = None # event end (NWS-style) - - # Routing hints - group_key: Optional[str] = None # events with same key get merged - inhibit_keys: list[str] = field(default_factory=list) # suppression keys - - # Raw adapter data (preserved for advanced rendering) - data: dict = field(default_factory=dict) - - @staticmethod - def compute_id( - source: str, - category: str, - group_key: Optional[str] = None, - lat: Optional[float] = None, - lon: Optional[float] = None, - ) -> str: - """Compute a stable dedup ID for an event. - - Two events with the same source+category+group_key+location - will have the same ID and can be deduplicated. - - Args: - source: Adapter name - category: Event category - group_key: Optional grouping key - lat: Optional latitude - lon: Optional longitude - - Returns: - 16-character hex ID - """ - key_parts = [ - source, - category, - group_key or "", - str(lat) if lat is not None else "", - str(lon) if lon is not None else "", - ] - key_string = ":".join(key_parts) - return hashlib.sha1(key_string.encode()).hexdigest()[:16] - - def to_dict(self) -> dict[str, Any]: - """Serialize event to a dict for JSON storage/webhook. - - Returns: - Dict representation of the event - """ - return asdict(self) - - @classmethod - def from_dict(cls, d: dict[str, Any]) -> "Event": - """Restore an Event from a dict. - - Args: - d: Dict representation (from to_dict or JSON load) - - Returns: - Event instance - """ - return cls(**d) - - -def make_event( - source: str, - category: str, - severity: str, - **kwargs: Any, -) -> Event: - """Create an Event with automatic ID and timestamp. - - This is the primary factory function for creating events. - It auto-computes the ID if not provided and sets timestamp - to the current time if not provided. - - Args: - source: Adapter name (e.g., "nws", "firms", "alert_engine") - category: Event category (e.g., "tornado_warning", "infra_offline") - severity: One of "routine", "priority", "immediate" - **kwargs: Additional Event fields - - Returns: - Event instance - - Raises: - ValueError: If severity is not valid - """ - # Validate severity - if severity not in SEVERITY_LEVELS: - raise ValueError( - f"Invalid severity '{severity}'. " - f"Must be one of: {', '.join(sorted(SEVERITY_LEVELS))}" - ) - - # Auto-set timestamp if not provided - if "timestamp" not in kwargs or kwargs["timestamp"] == 0.0: - kwargs["timestamp"] = time.time() - - # Auto-compute ID if not provided - if "id" not in kwargs or not kwargs["id"]: - kwargs["id"] = Event.compute_id( - source=source, - category=category, - group_key=kwargs.get("group_key"), - lat=kwargs.get("lat"), - lon=kwargs.get("lon"), - ) - - return Event( - source=source, - category=category, - severity=severity, - **kwargs, - ) diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py deleted file mode 100644 index 49d6c2b..0000000 --- a/meshai/notifications/pipeline/__init__.py +++ /dev/null @@ -1,88 +0,0 @@ -"""Notification pipeline package. - -Phase 2.1 provides the bare skeleton: -- EventBus: Central pub/sub for all events -- SeverityRouter: Routes immediate vs digest events -- Dispatcher: Delivers immediate events to channels -- StubDigestQueue: Placeholder for Phase 2.3 aggregator - -Usage: - from meshai.notifications.pipeline import build_pipeline - - pipeline = build_pipeline(channel_config={ - "mesh_health": ["discord"], - "weather": ["discord", "meshtastic"], - }) - - # Emit events through the bus - pipeline["bus"].emit(event) -""" - -from meshai.notifications.pipeline.bus import EventBus, get_bus -from meshai.notifications.pipeline.severity_router import ( - SeverityRouter, - StubDigestQueue, -) -from meshai.notifications.pipeline.dispatcher import ( - Dispatcher, - StubChannelBackend, -) - - -def build_pipeline( - channel_config: dict[str, list[str]] | None = None, -) -> dict: - """Build and wire up the notification pipeline. - - Creates all pipeline components and connects them: - - EventBus receives all events - - SeverityRouter subscribes to bus, routes by severity - - Dispatcher handles immediate events - - StubDigestQueue collects priority/routine events - - Args: - channel_config: Mapping of toggle -> channel names for dispatch. - Example: {"mesh_health": ["discord"]} - - Returns: - Dict with all pipeline components: - - bus: EventBus instance - - router: SeverityRouter instance - - dispatcher: Dispatcher instance - - digest_queue: StubDigestQueue instance - """ - # Create components - bus = EventBus() - dispatcher = Dispatcher(channel_config) - digest_queue = StubDigestQueue() - - # Wire up the router - router = SeverityRouter( - immediate_handler=dispatcher.dispatch, - digest_handler=digest_queue.enqueue, - ) - - # Subscribe router to bus - bus.subscribe(router.handle) - - return { - "bus": bus, - "router": router, - "dispatcher": dispatcher, - "digest_queue": digest_queue, - } - - -__all__ = [ - # Core classes - "EventBus", - "SeverityRouter", - "Dispatcher", - # Stubs for testing/Phase 2.x - "StubDigestQueue", - "StubChannelBackend", - # Factory - "build_pipeline", - # Singleton accessor - "get_bus", -] diff --git a/meshai/notifications/pipeline/bus.py b/meshai/notifications/pipeline/bus.py deleted file mode 100644 index b2cda6d..0000000 --- a/meshai/notifications/pipeline/bus.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Event bus for the notification pipeline. - -The bus is the entry point for all events flowing through the pipeline. -Adapters call bus.emit(event) to push Events into the system. - -Usage: - from meshai.notifications.pipeline import get_bus - from meshai.notifications.events import make_event - - bus = get_bus() - event = make_event(source="nws", category="weather_warning", severity="immediate", ...) - bus.emit(event) -""" - -import logging -from typing import Callable, Iterable - -from meshai.notifications.events import Event - - -class EventBus: - """Central event bus for the notification pipeline. - - Subscribers register handlers that receive every emitted event. - Errors in one subscriber do not prevent other subscribers from - receiving the event. - """ - - def __init__(self): - self._subscribers: list[Callable[[Event], None]] = [] - self._logger = logging.getLogger("meshai.pipeline.bus") - - def subscribe(self, handler: Callable[[Event], None]) -> None: - """Register a handler that receives every emitted event. - - Args: - handler: Callable that takes an Event and returns None - """ - self._subscribers.append(handler) - self._logger.debug(f"Subscribed handler: {handler}") - - def emit(self, event: Event) -> None: - """Push an event to all subscribers. - - Errors in one subscriber do not stop others from receiving - the event. Exceptions are logged but not re-raised. - - Args: - event: The Event to deliver to all subscribers - """ - for handler in self._subscribers: - try: - handler(event) - except Exception: - self._logger.exception( - f"Subscriber {handler} failed on event {event.id}" - ) - - def emit_many(self, events: Iterable[Event]) -> None: - """Emit multiple events in sequence. - - Args: - events: Iterable of Events to emit - """ - for event in events: - self.emit(event) - - -# Module-level singleton for application-wide use -_bus: EventBus | None = None - - -def get_bus() -> EventBus: - """Get the global EventBus singleton. - - This is the primary way adapters access the bus. Tests should - construct a fresh EventBus() directly to avoid shared state. - - Returns: - The global EventBus instance - """ - global _bus - if _bus is None: - _bus = EventBus() - return _bus diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py deleted file mode 100644 index 858a8c5..0000000 --- a/meshai/notifications/pipeline/dispatcher.py +++ /dev/null @@ -1,143 +0,0 @@ -"""Immediate event dispatcher. - -The dispatcher routes immediate-severity events to configured delivery -channels based on the event's toggle category. - -Phase 2.1 provides a stub that logs dispatch attempts. Phase 2.2 will -add real channel backends (Discord webhooks, Meshtastic broadcast, etc.). - -Usage: - dispatcher = Dispatcher(channel_config) - dispatcher.dispatch(event) # Called by SeverityRouter for immediate events -""" - -import logging -from typing import Callable, Optional - -from meshai.notifications.events import Event -from meshai.notifications.categories import get_toggle - - -class Dispatcher: - """Dispatches immediate events to configured channels. - - Each toggle category can have multiple delivery channels configured. - The dispatcher looks up the toggle for an event's category and sends - to all channels registered for that toggle. - - Phase 2.1: Stub implementation that logs but doesn't actually deliver. - Phase 2.2: Will add real channel backends. - """ - - def __init__( - self, - channel_config: Optional[dict[str, list[str]]] = None, - ): - """Initialize the dispatcher. - - Args: - channel_config: Mapping of toggle -> list of channel names. - Example: {"mesh_health": ["discord", "meshtastic"]} - If None, defaults to empty (no channels configured). - """ - self._channels = channel_config or {} - self._logger = logging.getLogger("meshai.pipeline.dispatcher") - self._backends: dict[str, Callable[[Event], None]] = {} - - def register_backend( - self, - channel_name: str, - handler: Callable[[Event], None], - ) -> None: - """Register a delivery backend for a channel. - - Args: - channel_name: Name of the channel (e.g., "discord", "meshtastic") - handler: Callable that delivers the event to the channel - """ - self._backends[channel_name] = handler - self._logger.debug(f"Registered backend: {channel_name}") - - def dispatch(self, event: Event) -> None: - """Dispatch an immediate event to configured channels. - - Looks up the toggle for the event's category, then sends to - all channels configured for that toggle. - - Args: - event: The immediate-severity Event to dispatch - """ - toggle = get_toggle(event.category) - if toggle is None: - self._logger.warning( - f"Unknown category {event.category!r} for event {event.id}, " - "defaulting to mesh_health" - ) - toggle = "mesh_health" - - channels = self._channels.get(toggle, []) - if not channels: - self._logger.info( - f"No channels configured for toggle {toggle!r}, " - f"event {event.id} not dispatched" - ) - return - - for channel in channels: - self._deliver_to_channel(event, channel, toggle) - - def _deliver_to_channel( - self, - event: Event, - channel: str, - toggle: str, - ) -> None: - """Deliver event to a specific channel. - - Args: - event: The Event to deliver - channel: Channel name - toggle: Toggle category (for logging) - """ - backend = self._backends.get(channel) - if backend is None: - # Phase 2.1: Log stub - no real backend yet - self._logger.info( - f"DISPATCH STUB [{toggle}] -> {channel}: {event.title}" - ) - return - - try: - backend(event) - self._logger.info( - f"DISPATCHED [{toggle}] -> {channel}: {event.title}" - ) - except Exception: - self._logger.exception( - f"Failed to dispatch event {event.id} to {channel}" - ) - - -class StubChannelBackend: - """Stub channel backend for testing. - - Collects all events "sent" to it for verification in tests. - """ - - def __init__(self, name: str): - self.name = name - self.events: list[Event] = [] - self._logger = logging.getLogger(f"meshai.pipeline.stub.{name}") - - def send(self, event: Event) -> None: - """Record an event as sent. - - Args: - event: The Event to record - """ - self.events.append(event) - self._logger.info(f"STUB {self.name}: {event.title}") - - def clear(self) -> None: - """Clear recorded events.""" - self.events = [] diff --git a/meshai/notifications/pipeline/severity_router.py b/meshai/notifications/pipeline/severity_router.py deleted file mode 100644 index 089e670..0000000 --- a/meshai/notifications/pipeline/severity_router.py +++ /dev/null @@ -1,104 +0,0 @@ -"""Severity-based event routing. - -The severity router subscribes to the bus and forks each event into -one of two paths based on severity: - -- immediate → immediate_handler (dispatcher for live delivery) -- priority/routine → digest_handler (queue for batched summaries) - -Usage: - router = SeverityRouter( - immediate_handler=dispatcher.dispatch, - digest_handler=digest_queue.enqueue, - ) - bus.subscribe(router.handle) -""" - -import logging -from typing import Callable - -from meshai.notifications.events import Event -from meshai.notifications.categories import get_toggle - - -class SeverityRouter: - """Routes events to immediate or digest handlers based on severity. - - Immediate-severity events go directly to live delivery channels. - Priority and routine events are queued for periodic digest summaries. - """ - - def __init__( - self, - immediate_handler: Callable[[Event], None], - digest_handler: Callable[[Event], None], - ): - """Initialize the severity router. - - Args: - immediate_handler: Called for severity="immediate" events - digest_handler: Called for severity in ("priority", "routine") - """ - self._immediate = immediate_handler - self._digest = digest_handler - self._logger = logging.getLogger("meshai.pipeline.severity_router") - - def handle(self, event: Event) -> None: - """Route an event based on its severity. - - Args: - event: The Event to route - """ - if event.severity == "immediate": - self._logger.info( - f"IMMEDIATE: {event.source}/{event.category} {event.title}" - ) - self._immediate(event) - elif event.severity in ("priority", "routine"): - self._logger.info( - f"DIGEST QUEUED [{event.severity}]: {event.title}" - ) - self._digest(event) - else: - self._logger.warning( - f"Unknown severity {event.severity!r} on event {event.id}, dropping" - ) - - -class StubDigestQueue: - """Placeholder digest queue for Phase 2.1. - - This is a stub that simply collects events in memory. Phase 2.3 - will replace this with the real aggregator that renders and - delivers periodic digest summaries. - """ - - def __init__(self): - self._queue: list[Event] = [] - self._logger = logging.getLogger("meshai.pipeline.digest_stub") - - def enqueue(self, event: Event) -> None: - """Add an event to the digest queue. - - Args: - event: The Event to queue for digest delivery - """ - self._queue.append(event) - toggle = get_toggle(event.category) or "unknown" - self._logger.info(f"DIGEST QUEUED [{toggle}]: {event.title}") - - def drain(self) -> list[Event]: - """Return and clear all queued events. - - For tests and the future aggregator. Returns the current - queue contents and resets the queue to empty. - - Returns: - List of all queued Events - """ - events, self._queue = self._queue, [] - return events - - def __len__(self) -> int: - """Return the number of queued events.""" - return len(self._queue) diff --git a/meshai/notifications/region_tagger.py b/meshai/notifications/region_tagger.py deleted file mode 100644 index faf4fac..0000000 --- a/meshai/notifications/region_tagger.py +++ /dev/null @@ -1,160 +0,0 @@ -"""Region tagger for mapping coordinates and NWS zones to regions. - -This module provides functions to: -- Map lat/lon coordinates to the nearest configured region -- Map NWS zone codes to matching regions - -Usage: - from meshai.notifications.region_tagger import tag_by_coordinates, tag_by_nws_zone - from meshai.config import RegionAnchor - - regions = [ - RegionAnchor(name="South Western ID", lat=43.615, lon=-116.2023, - nws_zones=["IDZ016", "IDZ030"]), - RegionAnchor(name="Magic Valley", lat=42.5558, lon=-114.4701, - nws_zones=["IDZ031"]), - ] - - # Find region by coordinates - region = tag_by_coordinates(43.6, -116.2, regions) - # Returns: "South Western ID" - - # Find regions by NWS zone - regions = tag_by_nws_zone("IDZ016", regions) - # Returns: ["South Western ID"] -""" - -import math -from typing import Optional - -# Import RegionAnchor type for annotations -# Actual import happens at function call time to avoid circular imports -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from meshai.config import RegionAnchor - - -# Earth radius in miles (mean radius) -EARTH_RADIUS_MILES = 3958.8 - - -def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: - """Calculate the great-circle distance between two points on Earth. - - Uses the haversine formula for accuracy on a spherical Earth model. - - Args: - lat1: Latitude of first point in degrees - lon1: Longitude of first point in degrees - lat2: Latitude of second point in degrees - lon2: Longitude of second point in degrees - - Returns: - Distance in miles - """ - # Convert to radians - lat1_rad = math.radians(lat1) - lat2_rad = math.radians(lat2) - lon1_rad = math.radians(lon1) - lon2_rad = math.radians(lon2) - - # Differences - dlat = lat2_rad - lat1_rad - dlon = lon2_rad - lon1_rad - - # Haversine formula - a = math.sin(dlat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2) ** 2 - c = 2 * math.asin(math.sqrt(a)) - - return EARTH_RADIUS_MILES * c - - -def tag_by_coordinates( - lat: float, - lon: float, - regions: list, # list[RegionAnchor] - radius_miles: float = 25.0, -) -> Optional[str]: - """Return the name of the nearest region within radius_miles. - - Finds the closest region anchor to the given coordinates. If the - closest anchor is within radius_miles, returns its name. Otherwise - returns None. - - Args: - lat: Latitude of the point to tag - lon: Longitude of the point to tag - regions: List of RegionAnchor objects to search - radius_miles: Maximum distance to consider (default 25 miles) - - Returns: - Name of the nearest region within range, or None if no match - """ - if not regions: - return None - - closest_region = None - closest_distance = float("inf") - - for region in regions: - # Skip regions without valid coordinates - region_lat = getattr(region, "lat", None) - region_lon = getattr(region, "lon", None) - - if region_lat is None or region_lon is None: - continue - if region_lat == 0.0 and region_lon == 0.0: - # Treat (0, 0) as unset coordinates - continue - - distance = haversine_distance(lat, lon, region_lat, region_lon) - - if distance < closest_distance: - closest_distance = distance - closest_region = region - - # Check if closest is within radius - if closest_region is not None and closest_distance <= radius_miles: - return getattr(closest_region, "name", None) - - return None - - -def tag_by_nws_zone( - zone_code: str, - regions: list, # list[RegionAnchor] -) -> list[str]: - """Return all region names whose nws_zones list contains zone_code. - - Multiple regions can match the same zone (a zone may span multiple - configured regions). - - Args: - zone_code: NWS zone code to match (e.g., "IDZ016") - regions: List of RegionAnchor objects to search - - Returns: - List of region names that contain this zone, empty if no matches - """ - if not zone_code or not regions: - return [] - - # Normalize zone code to uppercase for case-insensitive matching - zone_upper = zone_code.upper().strip() - - matching_regions = [] - - for region in regions: - region_zones = getattr(region, "nws_zones", None) - if not region_zones: - continue - - # Check if zone matches any in this region's list (case-insensitive) - for rz in region_zones: - if rz.upper().strip() == zone_upper: - region_name = getattr(region, "name", None) - if region_name: - matching_regions.append(region_name) - break # Don't add same region twice - - return matching_regions diff --git a/meshai/scripts/__init__.py b/meshai/scripts/__init__.py deleted file mode 100644 index 00d71f6..0000000 --- a/meshai/scripts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# MeshAI scripts package diff --git a/meshai/scripts/migrate_config_v03.py b/meshai/scripts/migrate_config_v03.py deleted file mode 100644 index 35239d8..0000000 --- a/meshai/scripts/migrate_config_v03.py +++ /dev/null @@ -1,736 +0,0 @@ -#!/usr/bin/env python3 -"""Migration script for MeshAI config v0.2 → v0.3. - -This script converts the monolithic /data/config.yaml into the new -multi-file layout under /data/config/. - -Run once: python -m meshai.scripts.migrate_config_v03 - -The migration: -1. Backs up the original config -2. Splits sections into domain files -3. Extracts operator-identifying values to local.yaml -4. Extracts literal secrets to /data/secrets/.env -5. Creates orchestrator config.yaml with !include directives -6. Verifies the new layout loads identically -""" - -import hashlib -import logging -import os -import re -import shutil -import sys -from dataclasses import asdict, fields -from pathlib import Path -from typing import Any - -import yaml - -# Setup logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - datefmt="%H:%M:%S", -) -logger = logging.getLogger(__name__) - - -# ============================================================================= -# DEEP COMPARISON FOR VERIFICATION -# ============================================================================= - -def deep_compare(old, new, path=""): - """Deep compare two values, returning list of difference descriptions.""" - differences = [] - if type(old) != type(new): - differences.append(f"{path}: type {type(old).__name__} vs {type(new).__name__}") - return differences - if isinstance(old, dict): - for key in sorted(set(old.keys()) | set(new.keys())): - child_path = f"{path}.{key}" if path else key - if key not in old: - differences.append(f"{child_path}: missing in backup") - elif key not in new: - differences.append(f"{child_path}: missing in new") - else: - differences.extend(deep_compare(old[key], new[key], child_path)) - elif isinstance(old, list): - if len(old) != len(new): - differences.append(f"{path}: list length {len(old)} vs {len(new)}") - else: - for i, (o, n) in enumerate(zip(old, new)): - differences.extend(deep_compare(o, n, f"{path}[{i}]")) - elif old != new: - differences.append(f"{path}: {repr(old)[:50]} vs {repr(new)[:50]}") - return differences - -# ============================================================================= -# CONFIGURATION -# ============================================================================= - -SOURCE_FILE = Path("/data/config.yaml") -TARGET_DIR = Path("/data/config") -BACKUP_FILE = Path("/data/config.yaml.pre-v03-backup") -SECRETS_DIR = Path("/data/secrets") - -# Section to file mapping -SECTION_TO_FILE = { - "connection": "meshtastic.yaml", - "commands": "meshtastic.yaml", - "mesh_sources": "mesh_sources.yaml", - "mesh_intelligence": "mesh_intelligence.yaml", - "environmental": "env_feeds.yaml", - "notifications": "notifications.yaml", - "llm": "llm.yaml", - "dashboard": "dashboard.yaml", -} - -# Sections that stay inline in orchestrator config.yaml -INLINE_SECTIONS = [ - "timezone", - "bot", - "response", - "history", - "memory", - "context", - "weather", - "meshmonitor", - "knowledge", -] - -# Fields to extract to local.yaml -LOCAL_EXTRACTIONS = { - "bot.name": "identity.name", - "bot.owner": "identity.owner", - "connection.tcp_host": "infrastructure.tcp_host", - "knowledge.qdrant_host": "infrastructure.qdrant_host", - "knowledge.tei_host": "infrastructure.tei_host", - "knowledge.sparse_host": "infrastructure.sparse_host", - "meshmonitor.url": "mesh_sources.meshmonitor_url", - "mesh_intelligence.critical_nodes": "critical_nodes", - "environmental.ducting.latitude": "env_center.latitude", - "environmental.ducting.longitude": "env_center.longitude", - "environmental.nws.user_agent": "identity.contact_email", # Extract email from user_agent -} - -# Secret fields - if found as literals, extract to .env -SECRET_PATTERNS = { - "llm.api_key": "LLM_API_KEY", # Will be renamed based on backend - "mesh_sources.*.api_token": "MESHMONITOR_API_TOKEN", - "mesh_sources.*.password": "MQTT_PASSWORD", - "environmental.traffic.api_key": "TOMTOM_API_KEY", - "environmental.firms.map_key": "FIRMS_MAP_KEY", - "notifications.rules.*.smtp_password": "SMTP_PASSWORD", -} - - -# ============================================================================= -# UTILITY FUNCTIONS -# ============================================================================= - -def get_nested(data: dict, path: str) -> Any: - """Get a value from nested dict using dot notation.""" - parts = path.split(".") - current = data - for part in parts: - if isinstance(current, dict) and part in current: - current = current[part] - else: - return None - return current - - -def set_nested(data: dict, path: str, value: Any) -> None: - """Set a value in nested dict using dot notation, creating dicts as needed.""" - parts = path.split(".") - current = data - for part in parts[:-1]: - if part not in current: - current[part] = {} - current = current[part] - current[parts[-1]] = value - - -def remove_nested(data: dict, path: str) -> bool: - """Remove a value from nested dict. Returns True if removed.""" - parts = path.split(".") - current = data - for part in parts[:-1]: - if isinstance(current, dict) and part in current: - current = current[part] - else: - return False - if parts[-1] in current: - del current[parts[-1]] - return True - return False - - -def file_hash(path: Path) -> str: - """Calculate SHA256 hash of a file.""" - h = hashlib.sha256() - with open(path, "rb") as f: - for chunk in iter(lambda: f.read(8192), b""): - h.update(chunk) - return h.hexdigest() - - -def is_env_var_ref(value: str) -> bool: - """Check if a string is an env var reference like ${VAR_NAME}.""" - if not isinstance(value, str): - return False - return bool(re.match(r"^\$\{[A-Z_][A-Z0-9_]*\}$", value)) - - -def extract_email_from_user_agent(user_agent: str) -> str: - """Extract email from NWS user_agent format: (app, email@domain.com)""" - if not user_agent: - return "" - match = re.search(r"[\w.+-]+@[\w.-]+\.\w+", user_agent) - return match.group(0) if match else "" - - -# ============================================================================= -# PRE-FLIGHT CHECKS -# ============================================================================= - -def preflight_checks() -> bool: - """Run pre-flight checks before migration.""" - logger.info("Running pre-flight checks...") - - # Check source exists - if not SOURCE_FILE.exists(): - logger.error(f"Source file not found: {SOURCE_FILE}") - return False - logger.info(f" Source file exists: {SOURCE_FILE}") - - # Check target directory state - if TARGET_DIR.exists(): - contents = list(TARGET_DIR.iterdir()) - if contents: - logger.error( - f"Target directory {TARGET_DIR} already populated with {len(contents)} items. " - "Manual intervention needed - remove existing files or restore from backup." - ) - return False - logger.info(f" Target directory exists but is empty: {TARGET_DIR}") - else: - logger.info(f" Target directory does not exist: {TARGET_DIR}") - - # Check backup doesn't already exist (indicates previous migration) - if BACKUP_FILE.exists(): - logger.warning( - f"Backup file already exists: {BACKUP_FILE}. " - "This may indicate a previous migration attempt." - ) - # Continue anyway - user may be re-running after fixing issues - - logger.info("Pre-flight checks passed.") - return True - - -# ============================================================================= -# BACKUP -# ============================================================================= - -def create_backup() -> bool: - """Create backup of original config.""" - logger.info(f"Creating backup: {SOURCE_FILE} → {BACKUP_FILE}") - - shutil.copy2(SOURCE_FILE, BACKUP_FILE) - - # Verify backup - source_hash = file_hash(SOURCE_FILE) - backup_hash = file_hash(BACKUP_FILE) - - if source_hash != backup_hash: - logger.error( - f"Backup verification failed! Hashes don't match:\n" - f" Source: {source_hash}\n" - f" Backup: {backup_hash}" - ) - return False - - source_size = SOURCE_FILE.stat().st_size - backup_size = BACKUP_FILE.stat().st_size - - if source_size != backup_size: - logger.error( - f"Backup verification failed! Sizes don't match:\n" - f" Source: {source_size}\n" - f" Backup: {backup_size}" - ) - return False - - logger.info(f" Backup verified: {backup_size} bytes, hash {backup_hash[:12]}...") - return True - - -# ============================================================================= -# EXTRACTION LOGIC -# ============================================================================= - -def extract_local_values(data: dict) -> dict: - """Extract operator-identifying values to local.yaml structure.""" - local = {} - - for source_path, dest_path in LOCAL_EXTRACTIONS.items(): - value = get_nested(data, source_path) - if value is not None and value != "" and value != 0: - # Special handling for user_agent -> email extraction - if source_path == "environmental.nws.user_agent": - value = extract_email_from_user_agent(str(value)) - if not value: - continue - - set_nested(local, dest_path, value) - logger.debug(f" Extracted {source_path} → local.{dest_path}") - - # Extract region coordinates - mi = data.get("mesh_intelligence", {}) - regions = mi.get("regions", []) - if regions: - local["regions"] = {} - for region in regions: - if isinstance(region, dict): - name = region.get("name", "") - lat = region.get("lat", 0) - lon = region.get("lon", 0) - if name and (lat != 0 or lon != 0): - local["regions"][name] = {"lat": lat, "lon": lon} - - # Extract mesh source URLs - mesh_sources = data.get("mesh_sources", []) - if mesh_sources: - local["mesh_sources"] = {"sources": {}} - for source in mesh_sources: - if isinstance(source, dict): - name = source.get("name", "") - url = source.get("url", "") - host = source.get("host", "") - if name and (url or host): - local["mesh_sources"]["sources"][name] = {} - if url: - local["mesh_sources"]["sources"][name]["url"] = url - if host: - local["mesh_sources"]["sources"][name]["host"] = host - - # Extract notification targets - notifications = data.get("notifications", {}) - rules = notifications.get("rules", []) - if rules: - node_ids = set() - recipients = set() - for rule in rules: - if isinstance(rule, dict): - for nid in rule.get("node_ids", []): - node_ids.add(nid) - for rcpt in rule.get("recipients", []): - recipients.add(rcpt) - if node_ids: - local["notification_targets"] = local.get("notification_targets", {}) - local["notification_targets"]["alert_node_ids"] = list(node_ids) - if recipients: - local["notification_targets"] = local.get("notification_targets", {}) - local["notification_targets"]["smtp_recipients"] = list(recipients) - - return local - - -def extract_secrets(data: dict) -> dict: - """Extract literal secrets to .env format.""" - secrets = {} - - # LLM API key - llm = data.get("llm", {}) - api_key = llm.get("api_key", "") - if api_key and not is_env_var_ref(api_key): - backend = llm.get("backend", "openai").upper() - key_name = f"{backend}_API_KEY" - if backend == "GOOGLE": - key_name = "GOOGLE_API_KEY" - secrets[key_name] = api_key - logger.info(f" Extracted llm.api_key → {key_name}") - - # Mesh source tokens/passwords - for i, source in enumerate(data.get("mesh_sources", [])): - if isinstance(source, dict): - token = source.get("api_token", "") - if token and not is_env_var_ref(token): - secrets["MESHMONITOR_API_TOKEN"] = token - logger.info(f" Extracted mesh_sources[{i}].api_token → MESHMONITOR_API_TOKEN") - - password = source.get("password", "") - if password and not is_env_var_ref(password): - secrets["MQTT_PASSWORD"] = password - logger.info(f" Extracted mesh_sources[{i}].password → MQTT_PASSWORD") - - # Environmental API keys - env = data.get("environmental", {}) - traffic = env.get("traffic", {}) - if traffic.get("api_key") and not is_env_var_ref(traffic["api_key"]): - secrets["TOMTOM_API_KEY"] = traffic["api_key"] - logger.info(" Extracted environmental.traffic.api_key → TOMTOM_API_KEY") - - firms = env.get("firms", {}) - if firms.get("map_key") and not is_env_var_ref(firms["map_key"]): - secrets["FIRMS_MAP_KEY"] = firms["map_key"] - logger.info(" Extracted environmental.firms.map_key → FIRMS_MAP_KEY") - - # Notification SMTP passwords - for i, rule in enumerate(data.get("notifications", {}).get("rules", [])): - if isinstance(rule, dict): - smtp_pass = rule.get("smtp_password", "") - if smtp_pass and not is_env_var_ref(smtp_pass): - secrets["SMTP_PASSWORD"] = smtp_pass - logger.info(f" Extracted notifications.rules[{i}].smtp_password → SMTP_PASSWORD") - - return secrets - - -def strip_local_values_from_domain(data: dict) -> dict: - """Remove local values from domain data, replacing with placeholders.""" - # Remove operator values that went to local.yaml - # These get merged back at load time - - # Strip bot name/owner (will come from local.yaml) - if "bot" in data: - data["bot"].pop("name", None) - data["bot"].pop("owner", None) - - # Strip connection tcp_host - if "connection" in data: - data["connection"].pop("tcp_host", None) - - # Strip knowledge hosts - if "knowledge" in data: - data["knowledge"].pop("qdrant_host", None) - data["knowledge"].pop("tei_host", None) - data["knowledge"].pop("sparse_host", None) - - # Strip meshmonitor url - if "meshmonitor" in data: - data["meshmonitor"].pop("url", None) - - # Strip critical_nodes (comes from local.yaml) - if "mesh_intelligence" in data: - data["mesh_intelligence"].pop("critical_nodes", None) - - # Strip region lat/lon (comes from local.yaml) - if "mesh_intelligence" in data: - for region in data["mesh_intelligence"].get("regions", []): - if isinstance(region, dict): - region.pop("lat", None) - region.pop("lon", None) - - # Strip mesh source URLs (comes from local.yaml) - for source in data.get("mesh_sources", []): - if isinstance(source, dict): - source.pop("url", None) - source.pop("host", None) - - # Strip ducting lat/lon (comes from local.yaml) - if "environmental" in data: - ducting = data["environmental"].get("ducting", {}) - ducting.pop("latitude", None) - ducting.pop("longitude", None) - # Strip nws user_agent (comes from local.yaml identity.contact_email) - nws = data["environmental"].get("nws", {}) - nws.pop("user_agent", None) - - # Strip notification node_ids and recipients (comes from local.yaml) - if "notifications" in data: - for rule in data["notifications"].get("rules", []): - if isinstance(rule, dict): - rule.pop("node_ids", None) - rule.pop("recipients", None) - rule.pop("from_address", None) - - return data - - -def replace_secrets_with_refs(data: dict, secrets: dict) -> dict: - """Replace literal secrets with ${VAR_NAME} references.""" - # LLM API key - if "llm" in data and data["llm"].get("api_key"): - backend = data["llm"].get("backend", "openai").upper() - key_name = f"{backend}_API_KEY" - if backend == "GOOGLE": - key_name = "GOOGLE_API_KEY" - data["llm"]["api_key"] = f"${{{key_name}}}" - - # Mesh sources - for source in data.get("mesh_sources", []): - if isinstance(source, dict): - if source.get("api_token") and not is_env_var_ref(source["api_token"]): - source["api_token"] = "${MESHMONITOR_API_TOKEN}" - if source.get("password") and not is_env_var_ref(source["password"]): - source["password"] = "${MQTT_PASSWORD}" - - # Environmental - if "environmental" in data: - traffic = data["environmental"].get("traffic", {}) - if traffic.get("api_key") and not is_env_var_ref(traffic["api_key"]): - traffic["api_key"] = "${TOMTOM_API_KEY}" - - firms = data["environmental"].get("firms", {}) - if firms.get("map_key") and not is_env_var_ref(firms["map_key"]): - firms["map_key"] = "${FIRMS_MAP_KEY}" - - # Notifications - for rule in data.get("notifications", {}).get("rules", []): - if isinstance(rule, dict): - if rule.get("smtp_password") and not is_env_var_ref(rule["smtp_password"]): - rule["smtp_password"] = "${SMTP_PASSWORD}" - - return data - - -# ============================================================================= -# FILE WRITING -# ============================================================================= - -def write_domain_file(path: Path, data: dict) -> None: - """Write a domain YAML file.""" - with open(path, "w") as f: - yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) - logger.info(f" Wrote {path} ({path.stat().st_size} bytes)") - - -def write_orchestrator(path: Path, data: dict) -> None: - """Write the orchestrator config.yaml with !include directives.""" - # Build the orchestrator content manually to preserve !include syntax - lines = [ - "# MeshAI Configuration v0.3", - "# Multi-file layout with !include directives", - "", - ] - - # Add inline sections - for section in INLINE_SECTIONS: - if section in data: - section_yaml = yaml.dump({section: data[section]}, default_flow_style=False, sort_keys=False) - lines.append(section_yaml.rstrip()) - lines.append("") - - # Add !include directives for domain files - lines.append("# Domain files (use !include)") - for section, target_file in SECTION_TO_FILE.items(): - if section in data and section not in ["commands"]: # commands shares file with connection - lines.append(f"{section}: !include {target_file}") - - content = "\n".join(lines) + "\n" - - with open(path, "w") as f: - f.write(content) - logger.info(f" Wrote orchestrator {path} ({path.stat().st_size} bytes)") - - -def write_local_yaml(path: Path, data: dict) -> None: - """Write local.yaml with restricted permissions.""" - header = """# LOCAL OPERATOR CONFIGURATION -# This file is gitignored - contains operator-identifying values -# Edit this file to customize for your deployment - -""" - with open(path, "w") as f: - f.write(header) - yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) - - path.chmod(0o600) - logger.info(f" Wrote {path} ({path.stat().st_size} bytes, mode 600)") - - -def write_env_file(path: Path, secrets: dict) -> None: - """Write .env file with restricted permissions.""" - header = """# MeshAI Secrets -# This file is gitignored - contains API keys and passwords -# Never commit this file to version control - -""" - lines = [header] - for key, value in sorted(secrets.items()): - lines.append(f"{key}={value}") - - content = "\n".join(lines) + "\n" - - with open(path, "w") as f: - f.write(content) - - path.chmod(0o600) - logger.info(f" Wrote {path} ({len(secrets)} secrets, mode 600)") - - -# ============================================================================= -# MAIN MIGRATION -# ============================================================================= - -def run_migration() -> bool: - """Run the full migration process.""" - logger.info("=" * 60) - logger.info("MeshAI Config Migration v0.2 → v0.3") - logger.info("=" * 60) - - # Pre-flight - if not preflight_checks(): - return False - - # Backup - if not create_backup(): - return False - - # Load original config - logger.info(f"Loading original config: {SOURCE_FILE}") - with open(SOURCE_FILE, "r") as f: - original_data = yaml.safe_load(f) - - if not original_data: - logger.error("Original config is empty or invalid!") - return False - - # Make a working copy - import copy - data = copy.deepcopy(original_data) - - # Extract local values - logger.info("Extracting operator-local values...") - local_data = extract_local_values(data) - local_count = sum(1 for _ in _count_values(local_data)) - logger.info(f" Extracted {local_count} local values") - - # Extract secrets - logger.info("Extracting secrets...") - secrets = extract_secrets(data) - logger.info(f" Extracted {len(secrets)} secrets") - - # Replace secrets with env var references - data = replace_secrets_with_refs(data, secrets) - - # Strip local values from domain data - data = strip_local_values_from_domain(data) - - # Create directories - logger.info("Creating directories...") - TARGET_DIR.mkdir(parents=True, exist_ok=True) - SECRETS_DIR.mkdir(parents=True, exist_ok=True) - SECRETS_DIR.chmod(0o700) - logger.info(f" Created {TARGET_DIR}") - logger.info(f" Created {SECRETS_DIR} (mode 700)") - - # Write domain files - logger.info("Writing domain files...") - - # Group sections by target file - file_contents: dict[str, dict] = {} - for section, target_file in SECTION_TO_FILE.items(): - if section in data: - if target_file not in file_contents: - file_contents[target_file] = {} - # For meshtastic.yaml, wrap in section name - if target_file == "meshtastic.yaml": - file_contents[target_file][section] = data[section] - else: - # For dedicated files, the whole file IS the section content - file_contents[target_file] = data[section] - - # Handle meshtastic.yaml specially (has both connection and commands) - for target_file, content in file_contents.items(): - write_domain_file(TARGET_DIR / target_file, content) - - # Write orchestrator - write_orchestrator(TARGET_DIR / "config.yaml", data) - - # Write local.yaml - write_local_yaml(TARGET_DIR / "local.yaml", local_data) - - # Write .env - if secrets: - write_env_file(SECRETS_DIR / ".env", secrets) - else: - logger.info(" No secrets to write (all were already env var refs)") - - # Verification - logger.info("=" * 60) - logger.info("Verifying migration...") - - try: - # Import here to use the newly deployed module - sys.path.insert(0, "/app") - from meshai.config_loader import load_config as new_load - from meshai.config import load_config as old_load, _dataclass_to_dict - - # Load with new loader - new_config = new_load(TARGET_DIR) - - # Load backup with old loader - old_config = old_load(BACKUP_FILE) - # Deep compare all fields using asdict() - old_dict = asdict(old_config) - new_dict = asdict(new_config) - # Remove internal fields that will differ - for d in [old_dict, new_dict]: - d.pop("_config_path", None) - - differences = deep_compare(old_dict, new_dict) - - if differences: - logger.error("Verification FAILED! Differences found:") - for diff in differences: - logger.error(f" {diff}") - return False - logger.info(" Verification PASSED - config loads correctly") - - except Exception as e: - logger.error(f"Verification failed with exception: {e}") - import traceback - traceback.print_exc() - return False - - # Summary - logger.info("=" * 60) - logger.info("MIGRATION COMPLETE") - logger.info("=" * 60) - logger.info("") - logger.info("Files created:") - for f in sorted(TARGET_DIR.iterdir()): - logger.info(f" {f} ({f.stat().st_size} bytes)") - if (SECRETS_DIR / ".env").exists(): - logger.info(f" {SECRETS_DIR / '.env'} ({len(secrets)} secrets)") - logger.info("") - logger.info(f"Local values extracted: {local_count}") - logger.info(f"Secrets extracted: {len(secrets)} ({', '.join(secrets.keys()) if secrets else 'none'})") - logger.info("") - logger.info(f"Backup at: {BACKUP_FILE}") - logger.info("Delete the backup manually after verifying things work.") - logger.info("") - logger.info("ROLLBACK COMMAND (if needed):") - logger.info(f" rm -rf {TARGET_DIR} {SECRETS_DIR}") - logger.info(f" cp {BACKUP_FILE} {SOURCE_FILE}") - logger.info(" # Then revert main.py loader wiring if changed") - - return True - - -def _count_values(d: dict, prefix: str = "") -> Any: - """Generator to count leaf values in a nested dict.""" - for key, value in d.items(): - if isinstance(value, dict): - yield from _count_values(value, f"{prefix}.{key}") - elif isinstance(value, list): - for i, item in enumerate(value): - if isinstance(item, dict): - yield from _count_values(item, f"{prefix}.{key}[{i}]") - else: - yield f"{prefix}.{key}[{i}]" - else: - yield f"{prefix}.{key}" - - -# ============================================================================= -# ENTRY POINT -# ============================================================================= - -if __name__ == "__main__": - success = run_migration() - sys.exit(0 if success else 1)