Compare commits

...

9 commits

Author SHA1 Message Date
Ubuntu
4e4a837c5e feat(notifications): add Phase 1.3 + 2.1 pipeline skeleton
Phase 1.3:
- events.py: Event dataclass with ID generation and serialization
- region_tagger.py: Coordinate/NWS zone region tagging
- categories.py: Toggle field mapping for all 31 alert categories

Phase 2.1 Pipeline Skeleton:
- pipeline/bus.py: EventBus with subscribe/emit pattern
- pipeline/severity_router.py: Routes immediate->dispatch, routine->digest
- pipeline/dispatcher.py: Delivers immediate events to configured channels
- pipeline/__init__.py: build_pipeline() factory and exports

All components tested and verified in container.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 17:21:45 +00:00
0703d00d94 feat(notifications): map alert categories to v0.3 toggles
Adds toggle field to each ALERT_CATEGORIES entry:
- mesh_health: 18 categories (infra, power, utilization, coverage, health)
- weather: 3 categories (NWS warnings, stream flooding)
- fire: 3 categories (NIFC, FIRMS hotspots)
- rf_propagation: 3 categories (solar, geomag, ducting)
- roads: 2 categories (closures, congestion)
- avalanche: 2 categories (high danger, considerable)

Also adds helper functions:
- categories_for_toggle(toggle) -> list of category IDs
- get_toggle(category_name) -> toggle name or None

Note: seismic and tracking toggles defined but have no categories yet
(reserved for Phase 3 and Phase 7 respectively).

All toggle assignments are unambiguous - no categories defaulted to
mesh_health due to ambiguity.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 16:29:50 +00:00
e6897b3f33 feat(notifications): add region tagger with coordinate and NWS zone matching
Adds meshai/notifications/region_tagger.py with:
- haversine_distance() for great-circle distance calculation
- tag_by_coordinates() maps lat/lon to nearest region within radius
- tag_by_nws_zone() maps NWS zone codes to matching regions

Also adds nws_zones field to RegionAnchor in config.py to support
zone-based matching. Default is empty list for backward compatibility.

This is scaffolding for Phase 2 - not yet wired into any adapters.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 16:26:53 +00:00
dc52187c93 feat(notifications): add Event dataclass for v0.3 pipeline
Adds meshai/notifications/events.py with:
- Event dataclass with all fields for unified pipeline shape
- Stable ID generation via sha1 hash for deduplication
- make_event() factory with auto-timestamp and severity validation
- to_dict/from_dict for serialization round-trip

This is scaffolding for Phase 2 - not yet wired into any adapters.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 16:23:57 +00:00
5274933fa0 fix(migration): deep-equality verification gate for full Config
- Added deep_compare function for recursive dict comparison
- Replaced shallow key-list check with full Config dataclass comparison
- Uses dataclasses.asdict for consistent dict representation
- Reports full path of mismatches (e.g. connection.tcp_host)

The previous gate only checked inline sections and missed the
include-related bugs that caused the restart loop.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 16:14:27 +00:00
67ab2689fe fix(config): correct meshtastic include nesting
- Changed orchestrator to use meshtastic: include meshtastic.yaml
- Added hoisting logic to extract connection/commands from wrapper
- Fixes restart loop caused by connection.type defaulting to serial

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 16:14:18 +00:00
965a844b0d feat(config): split monolithic config + extract secrets
- Update .gitignore for v0.3 multi-file layout
- Add config/.env.example template for secrets
- Add config/local.yaml.example for operator values
- Wire main.py to use new config_loader
- Support both legacy and new layouts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 15:14:12 +00:00
2c11432bd8 feat(config): add migration script for v0.2 to v0.3 layout
- Backup original config before migration
- Split monolithic config into domain files
- Extract operator-identifying values to local.yaml
- Extract secrets to /data/secrets/.env
- Create orchestrator with !include directives
- Post-migration verification
- Safe to run multiple times (idempotent checks)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 15:14:05 +00:00
9e3f940a1b feat(config): add multi-file config loader with !include support
- Add config_loader.py with !include directive support
- Environment variable interpolation with default syntax
- local.yaml merging for operator-identifying values
- Secret loading from /data/secrets/.env
- save_section() for dashboard write-back
- Cycle detection for include directives
- Graceful degradation when files missing

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-14 15:13:56 +00:00
15 changed files with 2397 additions and 7 deletions

20
.gitignore vendored
View file

@ -1,3 +1,13 @@
# Operator-identifying config and secrets (v0.3 split)
/data/config/local.yaml
/data/config/secrets/
/data/secrets/
.env
.env.local
.env.*
!.env.example
local.yaml
!local.yaml.example
# Python
__pycache__/
*.py[cod]
@ -49,3 +59,13 @@ data/
# OS
.DS_Store
Thumbs.db
# Operator-identifying config and secrets (v0.3 split)
/data/config/local.yaml
/data/config/secrets/
/data/secrets/
.env
.env.local
.env.*
!.env.example
local.yaml
!local.yaml.example

19
config/.env.example Normal file
View file

@ -0,0 +1,19 @@
# MeshAI Secrets Template
# Copy to /data/secrets/.env and fill in your values
# This file is gitignored - never commit real secrets
# LLM API Keys (only one needed based on your backend choice)
OPENAI_API_KEY=
ANTHROPIC_API_KEY=
GOOGLE_API_KEY=
# Mesh Source Credentials
MESHMONITOR_API_TOKEN=
MQTT_PASSWORD=
# Environmental Feed Keys
TOMTOM_API_KEY=
FIRMS_MAP_KEY=
# Notification Credentials
SMTP_PASSWORD=

57
config/local.yaml.example Normal file
View file

@ -0,0 +1,57 @@
# MeshAI Local Configuration Template
# Copy to /data/config/local.yaml and customize for your deployment
# This file is gitignored - contains operator-identifying values
# Operator Identity
identity:
name: "" # Bot display name
owner: "" # Owner callsign/name
primary_node_id: "" # Your main mesh node ID
contact_email: "" # For NWS user_agent, SMTP from
# Region Coordinates
# Map your region names to their lat/lon center points
regions:
"Example Region":
lat: 0.0
lon: 0.0
# Add more regions as needed:
# "Another Region":
# lat: 42.5
# lon: -114.5
# Mesh Data Source URLs
mesh_sources:
meshmonitor_url: "" # Your MeshMonitor instance
sources:
# Per-source URL overrides (matches names in mesh_sources.yaml)
"My-Meshview":
url: ""
# "My-MeshMonitor":
# url: ""
# Infrastructure Hosts
infrastructure:
tcp_host: "" # Meshtastic TCP host (meshtasticd)
qdrant_host: "" # Qdrant vector DB (optional)
tei_host: "" # TEI embedding service (optional)
sparse_host: "" # Sparse embedding service (optional)
# Environmental Feed Center Point
env_center:
latitude: 0.0 # Center of your coverage area
longitude: 0.0
# Notification Targets
notification_targets:
smtp_from: "" # Email from address
smtp_recipients: [] # Default email recipients
webhook_urls: [] # Webhook endpoints
alert_node_ids: [] # Node IDs for mesh DM alerts
# Critical Infrastructure Nodes (short names)
critical_nodes: []
# Example:
# critical_nodes:
# - "MHR"
# - "HPR"

View file

@ -230,6 +230,7 @@ class RegionAnchor:
description: str = "" # e.g., "Twin Falls, Burley, Jerome along I-84/US-93"
aliases: list[str] = field(default_factory=list) # e.g., ["southern Idaho", "magic valley"]
cities: list[str] = field(default_factory=list) # e.g., ["Twin Falls", "Burley", "Jerome"]
nws_zones: list[str] = field(default_factory=list) # NWS zone codes (e.g., ["IDZ016", "IDZ030"])
@dataclass

688
meshai/config_loader.py Normal file
View file

@ -0,0 +1,688 @@
"""Multi-file configuration loader for MeshAI v0.3.
This module provides:
- !include directive support for splitting config across files
- Environment variable interpolation (${VAR_NAME} and ${VAR_NAME:-default})
- Operator-local value merging from local.yaml
- Secret loading from .env files
- Section-aware save_section() for dashboard write-back
The loader produces the same Config dataclass shape as config.py,
ensuring backward compatibility with all existing consumers.
"""
import logging
import os
import re
from pathlib import Path
from typing import Any, Optional
import yaml
from dotenv import dotenv_values
# Import existing dataclasses - shape must NOT change
from .config import (
Config,
_dict_to_dataclass,
_dataclass_to_dict,
)
_logger = logging.getLogger(__name__)
# =============================================================================
# SECTION TO FILE MAPPING
# =============================================================================
SECTION_TO_FILE: dict[str, str] = {
# Inline in orchestrator config.yaml
"timezone": "config.yaml",
"bot": "config.yaml",
"response": "config.yaml",
"history": "config.yaml",
"memory": "config.yaml",
"context": "config.yaml",
"weather": "config.yaml",
"meshmonitor": "config.yaml",
"knowledge": "config.yaml",
# Domain files
"connection": "meshtastic.yaml",
"commands": "meshtastic.yaml",
"mesh_sources": "mesh_sources.yaml",
"mesh_intelligence": "mesh_intelligence.yaml",
"environmental": "env_feeds.yaml",
"notifications": "notifications.yaml",
"llm": "llm.yaml",
"dashboard": "dashboard.yaml",
}
# Fields that should be written to local.yaml instead of domain files
LOCAL_FIELDS: dict[str, str] = {
"bot.name": "identity.name",
"bot.owner": "identity.owner",
"connection.tcp_host": "infrastructure.tcp_host",
"knowledge.qdrant_host": "infrastructure.qdrant_host",
"knowledge.tei_host": "infrastructure.tei_host",
"knowledge.sparse_host": "infrastructure.sparse_host",
"meshmonitor.url": "mesh_sources.meshmonitor_url",
"mesh_intelligence.critical_nodes": "critical_nodes",
"environmental.ducting.latitude": "env_center.latitude",
"environmental.ducting.longitude": "env_center.longitude",
}
# Fields that contain secrets - NEVER written, must be in .env
SECRET_FIELDS: set[str] = {
"llm.api_key",
"mesh_sources.*.api_token",
"mesh_sources.*.password",
"environmental.traffic.api_key",
"environmental.firms.map_key",
"notifications.rules.*.smtp_password",
}
# Secret env var names expected in .env
EXPECTED_SECRETS: list[str] = [
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
"GOOGLE_API_KEY",
"MESHMONITOR_API_TOKEN",
"MQTT_PASSWORD",
"TOMTOM_API_KEY",
"FIRMS_MAP_KEY",
"SMTP_PASSWORD",
]
# =============================================================================
# YAML !INCLUDE CONSTRUCTOR
# =============================================================================
# Global set for tracking files currently being loaded (cycle detection)
_loading_files: set[Path] = set()
def _make_include_loader(base_path: Path):
"""Create an IncludeLoader class with the given base path."""
class IncludeLoader(yaml.SafeLoader):
"""YAML loader with !include tag support."""
pass
def construct_include(loader: IncludeLoader, node: yaml.Node) -> Any:
"""Handle !include directive."""
relative_path = loader.construct_scalar(node)
include_path = (base_path / relative_path).resolve()
# Cycle detection using global set
if include_path in _loading_files:
raise yaml.YAMLError(
f"Circular include detected: {include_path} is already being loaded. "
f"Current loading chain: {[str(p) for p in _loading_files]}"
)
if not include_path.exists():
raise yaml.YAMLError(
f"Include file not found: {include_path} "
f"(referenced from {base_path / 'config.yaml'})"
)
_loading_files.add(include_path)
try:
with open(include_path, "r") as f:
# Recursively load with the include file's directory as new base
NestedLoader = _make_include_loader(include_path.parent)
return yaml.load(f, Loader=NestedLoader)
finally:
_loading_files.discard(include_path)
IncludeLoader.add_constructor("!include", construct_include)
return IncludeLoader
def _load_yaml_with_includes(file_path: Path) -> dict:
"""Load a YAML file with !include directive support."""
global _loading_files
_loading_files.clear() # Reset cycle detection
if not file_path.exists():
return {}
# Add the root file to loading set
file_path = file_path.resolve()
_loading_files.add(file_path)
try:
with open(file_path, "r") as f:
Loader = _make_include_loader(file_path.parent)
return yaml.load(f, Loader=Loader) or {}
finally:
_loading_files.discard(file_path)
# =============================================================================
# ENVIRONMENT VARIABLE INTERPOLATION
# =============================================================================
_ENV_PATTERN = re.compile(r"\$\{([A-Z_][A-Z0-9_]*)(?::-([^}]*))?\}")
def _interpolate_env_vars(value: Any, env: dict[str, str]) -> Any:
"""Recursively interpolate ${VAR_NAME} and ${VAR_NAME:-default} in strings.
Args:
value: The value to interpolate (can be string, dict, list, or other)
env: Combined environment (os.environ + .env file values)
Returns:
The value with environment variables resolved
"""
if isinstance(value, str):
def replace_match(match):
var_name = match.group(1)
default = match.group(2)
# os.environ takes precedence over .env file
resolved = os.environ.get(var_name)
if resolved is None:
resolved = env.get(var_name)
if resolved is None:
if default is not None:
return default
_logger.warning(
f"Environment variable ${{{var_name}}} not found and no default provided. "
"Using empty string."
)
return ""
return resolved
return _ENV_PATTERN.sub(replace_match, value)
elif isinstance(value, dict):
return {k: _interpolate_env_vars(v, env) for k, v in value.items()}
elif isinstance(value, list):
return [_interpolate_env_vars(item, env) for item in value]
return value
# =============================================================================
# LOCAL.YAML MERGING
# =============================================================================
def _merge_local_values(data: dict, local: dict) -> dict:
"""Merge operator-local values from local.yaml into the config data.
This handles:
- identity.name/owner -> bot.name/owner
- infrastructure.* -> connection/knowledge hosts
- regions.{name}.lat/lon -> mesh_intelligence.regions[name].lat/lon
- critical_nodes -> mesh_intelligence.critical_nodes
- mesh_sources.sources.{name}.* -> mesh_sources[name].*
- env_center.* -> environmental.ducting.*
- notification_targets.* -> notifications rules
Args:
data: The loaded config data (will be modified in place)
local: The local.yaml data
Returns:
The merged data dict
"""
if not local:
return data
# Identity -> bot
identity = local.get("identity", {})
if "bot" in data:
if identity.get("name"):
data["bot"]["name"] = identity["name"]
if identity.get("owner"):
data["bot"]["owner"] = identity["owner"]
# Infrastructure hosts
infra = local.get("infrastructure", {})
if infra.get("tcp_host") and "connection" in data:
data["connection"]["tcp_host"] = infra["tcp_host"]
if "knowledge" in data:
if infra.get("qdrant_host"):
data["knowledge"]["qdrant_host"] = infra["qdrant_host"]
if infra.get("tei_host"):
data["knowledge"]["tei_host"] = infra["tei_host"]
if infra.get("sparse_host"):
data["knowledge"]["sparse_host"] = infra["sparse_host"]
# Meshmonitor URL
mesh_sources_local = local.get("mesh_sources", {})
if mesh_sources_local.get("meshmonitor_url") and "meshmonitor" in data:
data["meshmonitor"]["url"] = mesh_sources_local["meshmonitor_url"]
# Mesh sources URLs
sources_local = mesh_sources_local.get("sources", {})
if "mesh_sources" in data and isinstance(data["mesh_sources"], list):
for source in data["mesh_sources"]:
if isinstance(source, dict):
source_name = source.get("name", "")
local_source = sources_local.get(source_name, {})
if local_source.get("url"):
source["url"] = local_source["url"]
if local_source.get("host"):
source["host"] = local_source["host"]
# Region coordinates
regions_local = local.get("regions", {})
if "mesh_intelligence" in data:
mi = data["mesh_intelligence"]
if "regions" in mi and isinstance(mi["regions"], list):
for region in mi["regions"]:
if isinstance(region, dict):
region_name = region.get("name", "")
local_coords = regions_local.get(region_name, {})
if "lat" in local_coords:
region["lat"] = local_coords["lat"]
if "lon" in local_coords:
region["lon"] = local_coords["lon"]
# Critical nodes
if local.get("critical_nodes"):
mi["critical_nodes"] = local["critical_nodes"]
# Environmental center point
env_center = local.get("env_center", {})
if "environmental" in data:
env = data["environmental"]
if "ducting" in env:
if env_center.get("latitude") is not None:
env["ducting"]["latitude"] = env_center["latitude"]
if env_center.get("longitude") is not None:
env["ducting"]["longitude"] = env_center["longitude"]
# NWS user agent from contact email
if identity.get("contact_email") and "nws" in env:
email = identity["contact_email"]
env["nws"]["user_agent"] = f"(meshai, {email})"
# Notification targets
notif_targets = local.get("notification_targets", {})
if "notifications" in data and "rules" in data["notifications"]:
alert_node_ids = notif_targets.get("alert_node_ids", [])
smtp_recipients = notif_targets.get("smtp_recipients", [])
for rule in data["notifications"]["rules"]:
if isinstance(rule, dict):
# Apply default node_ids if not set
if rule.get("delivery_type") == "mesh_dm" and not rule.get("node_ids"):
rule["node_ids"] = alert_node_ids
# Apply default recipients if not set
if rule.get("delivery_type") == "email" and not rule.get("recipients"):
rule["recipients"] = smtp_recipients
# Apply smtp_from
if notif_targets.get("smtp_from") and not rule.get("from_address"):
rule["from_address"] = notif_targets["smtp_from"]
return data
# =============================================================================
# VALIDATION
# =============================================================================
def _validate_config(data: dict, local: dict, env: dict[str, str]) -> None:
"""Validate config and log warnings for missing values.
This does NOT raise errors - MeshAI starts in degraded mode with missing values.
"""
# Check regions for missing coordinates
if "mesh_intelligence" in data:
mi = data["mesh_intelligence"]
if mi.get("enabled") and "regions" in mi:
regions_local = local.get("regions", {}) if local else {}
for region in mi["regions"]:
if isinstance(region, dict):
region_name = region.get("name", "unknown")
if not region.get("lat") or not region.get("lon"):
if region_name not in regions_local:
_logger.warning(
f"Region '{region_name}' has no coordinates in local.yaml - "
"geographic features disabled for this region"
)
# Check for missing secrets
missing_secrets = []
for secret in EXPECTED_SECRETS:
if not os.environ.get(secret) and not env.get(secret):
missing_secrets.append(secret)
if missing_secrets:
_logger.warning(
f"Missing secret environment variables: {', '.join(missing_secrets)}. "
"Some features may be disabled."
)
# Check LLM API key
if "llm" in data:
api_key = data["llm"].get("api_key", "")
if not api_key or (api_key.startswith("${") and api_key.endswith("}")):
# It's a reference, check if resolved
backend = data["llm"].get("backend", "openai").lower()
key_var = {
"openai": "OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"google": "GOOGLE_API_KEY",
}.get(backend, "LLM_API_KEY")
if not os.environ.get(key_var) and not env.get(key_var):
_logger.warning(
f"LLM backend '{backend}' configured but {key_var} not found. "
"LLM responses will fail."
)
# =============================================================================
# MAIN LOADER
# =============================================================================
def load_config(config_dir: Path = Path("/data/config")) -> Config:
"""Load configuration from multi-file layout.
This function:
1. Reads config.yaml (orchestrator) with !include directives
2. Reads local.yaml if present (operator-local values)
3. Reads /data/secrets/.env if present (secret values)
4. Interpolates ${VAR_NAME} references
5. Merges local values into config
6. Validates and logs warnings for missing values
7. Returns the same Config dataclass shape
Args:
config_dir: Path to config directory (default: /data/config)
Returns:
Config dataclass instance
"""
config_dir = Path(config_dir)
# Determine config file path
# Support both new layout (/data/config/config.yaml) and legacy (/data/config.yaml)
orchestrator_path = config_dir / "config.yaml"
legacy_path = config_dir.parent / "config.yaml" if config_dir.name == "config" else None
if not orchestrator_path.exists():
if legacy_path and legacy_path.exists():
# Fall back to legacy single-file config
_logger.info(f"Using legacy config at {legacy_path}")
from .config import load_config as legacy_load
return legacy_load(legacy_path)
else:
_logger.warning(
f"Config file not found at {orchestrator_path}. "
"Using default configuration."
)
config = Config()
config._config_path = orchestrator_path
return config
# Load orchestrator with !include support
_logger.debug(f"Loading config from {orchestrator_path}")
data = _load_yaml_with_includes(orchestrator_path)
# Hoist meshtastic.connection and meshtastic.commands to top level
# meshtastic.yaml contains both sections under wrapper keys
if "meshtastic" in data and isinstance(data["meshtastic"], dict):
meshtastic = data.pop("meshtastic")
if "connection" in meshtastic:
data["connection"] = meshtastic["connection"]
if "commands" in meshtastic:
data["commands"] = meshtastic["commands"]
# Load local.yaml
local_path = config_dir / "local.yaml"
local_data = {}
if local_path.exists():
with open(local_path, "r") as f:
local_data = yaml.safe_load(f) or {}
_logger.debug(f"Loaded local config from {local_path}")
else:
_logger.warning(
f"No local.yaml found at {local_path}. "
"MeshAI is in no-location mode - geographic features disabled."
)
# Load secrets from .env
secrets_path = config_dir.parent / "secrets" / ".env"
env_data = {}
if secrets_path.exists():
env_data = dotenv_values(secrets_path)
_logger.debug(f"Loaded {len(env_data)} secrets from {secrets_path}")
else:
# Try alternate location
alt_secrets_path = Path("/data/secrets/.env")
if alt_secrets_path.exists():
env_data = dotenv_values(alt_secrets_path)
_logger.debug(f"Loaded {len(env_data)} secrets from {alt_secrets_path}")
else:
_logger.warning(
f"No .env file found at {secrets_path}. "
"API keys must be set via environment variables."
)
# Interpolate environment variables
data = _interpolate_env_vars(data, env_data)
# Merge local values
data = _merge_local_values(data, local_data)
# Validate and warn
_validate_config(data, local_data, env_data)
# Convert to Config dataclass
config = _dict_to_dataclass(Config, data)
config._config_path = orchestrator_path
return config
# =============================================================================
# SECTION SAVER
# =============================================================================
def _is_secret_field(section: str, field_path: str) -> bool:
"""Check if a field path matches a secret field pattern."""
full_path = f"{section}.{field_path}" if field_path else section
for pattern in SECRET_FIELDS:
# Convert pattern to regex
regex = pattern.replace(".", r"\.").replace("*", r"[^.]+")
if re.match(f"^{regex}$", full_path):
return True
return False
def _extract_local_fields(section: str, data: dict) -> tuple[dict, dict]:
"""Extract local fields from data.
Returns:
(domain_data, local_data) - data split by destination
"""
domain_data = dict(data)
local_data = {}
# Check each LOCAL_FIELDS pattern
for field_pattern, local_path in LOCAL_FIELDS.items():
if not field_pattern.startswith(f"{section}."):
continue
# Extract field name from pattern
field_name = field_pattern[len(section) + 1:] # Remove "section."
if ".*." in field_name:
# Array field pattern - handle specially
continue
if field_name in domain_data:
# Move to local_data using the local_path
value = domain_data.pop(field_name)
# Build nested structure in local_data
parts = local_path.split(".")
current = local_data
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = value
return domain_data, local_data
def save_section(
section_name: str,
data: dict,
config_dir: Path = Path("/data/config"),
) -> dict:
"""Save a configuration section to the appropriate file(s).
This function:
1. Determines which file(s) the section belongs to
2. Extracts local-identifying fields to local.yaml
3. Rejects attempts to save secret fields
4. Writes domain data to the appropriate file
5. Writes local data to local.yaml
Args:
section_name: Name of the section (e.g., "notifications", "llm")
data: The section data as a dict
config_dir: Path to config directory
Returns:
Dict with status: {"saved": True, "files_written": [...], "rejected_secrets": [...]}
Raises:
ValueError: If section_name is not recognized
"""
config_dir = Path(config_dir)
if section_name not in SECTION_TO_FILE:
raise ValueError(
f"Unknown section '{section_name}'. "
f"Valid sections: {', '.join(sorted(SECTION_TO_FILE.keys()))}"
)
target_file = SECTION_TO_FILE[section_name]
target_path = config_dir / target_file
local_path = config_dir / "local.yaml"
files_written = []
rejected_secrets = []
# Check for secret fields and reject them
def check_secrets(d: dict, path: str = "") -> dict:
cleaned = {}
for key, value in d.items():
field_path = f"{path}.{key}" if path else key
if _is_secret_field(section_name, field_path):
rejected_secrets.append(field_path)
_logger.error(
f"Rejected attempt to save secret field '{section_name}.{field_path}'. "
"Secret fields must be set via /data/secrets/.env"
)
elif isinstance(value, dict):
cleaned[key] = check_secrets(value, field_path)
elif isinstance(value, list):
cleaned[key] = [
check_secrets(item, f"{field_path}[{i}]")
if isinstance(item, dict) else item
for i, item in enumerate(value)
]
else:
cleaned[key] = value
return cleaned
data = check_secrets(data)
# Extract local fields
domain_data, local_updates = _extract_local_fields(section_name, data)
# Load existing target file
if target_path.exists():
with open(target_path, "r") as f:
existing = yaml.safe_load(f) or {}
else:
existing = {}
# Handle sections that share a file (meshtastic.yaml has both connection and commands)
if target_file == "meshtastic.yaml":
existing[section_name] = domain_data
elif target_file == "config.yaml":
# For orchestrator, update the section in place
existing[section_name] = domain_data
else:
# For dedicated files, the whole file IS the section
existing = domain_data
# Write domain file
with open(target_path, "w") as f:
yaml.dump(existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
files_written.append(str(target_path))
_logger.info(f"Saved {section_name} to {target_path}")
# Update local.yaml if there are local fields
if local_updates:
if local_path.exists():
with open(local_path, "r") as f:
local_existing = yaml.safe_load(f) or {}
else:
local_existing = {}
# Deep merge local_updates into local_existing
def deep_merge(base: dict, updates: dict) -> dict:
for key, value in updates.items():
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
deep_merge(base[key], value)
else:
base[key] = value
return base
deep_merge(local_existing, local_updates)
with open(local_path, "w") as f:
yaml.dump(local_existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
# Set restrictive permissions on local.yaml
local_path.chmod(0o600)
files_written.append(str(local_path))
_logger.info(f"Updated local values in {local_path}")
return {
"saved": True,
"files_written": files_written,
"rejected_secrets": rejected_secrets,
}
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def get_config_dir_from_path(config_path: Path) -> Path:
"""Determine config directory from a config file path.
Args:
config_path: Path to config.yaml (could be legacy or new layout)
Returns:
Path to config directory
"""
config_path = Path(config_path)
if config_path.is_dir():
return config_path
# If pointing to config.yaml in new layout
if config_path.name == "config.yaml" and config_path.parent.name == "config":
return config_path.parent
# If pointing to legacy /data/config.yaml
if config_path.name == "config.yaml":
new_layout = config_path.parent / "config"
if new_layout.exists() and (new_layout / "config.yaml").exists():
return new_layout
return config_path.parent

View file

@ -16,7 +16,8 @@ from .cli import run_configurator
from .commands import CommandDispatcher
from .commands.dispatcher import create_dispatcher
from .commands.status import set_start_time
from .config import Config, load_config
from .config import Config
from .config_loader import load_config, get_config_dir_from_path
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
@ -712,12 +713,21 @@ def main() -> None:
run_configurator(args.config_file)
return
# Load config
config = load_config(args.config_file)
# Load config - support both old (/data/config.yaml) and new (/data/config/) layouts
config_path = args.config_file
config_dir = get_config_dir_from_path(config_path)
# Check if config exists
if not args.config_file.exists():
logger.warning(f"Config file not found: {args.config_file}")
# Check for new multi-file layout first
if (config_dir / "config.yaml").exists():
logger.info(f"Loading config from multi-file layout: {config_dir}")
config = load_config(config_dir)
elif config_path.exists():
# Fall back to legacy single-file loading
logger.info(f"Loading legacy config: {config_path}")
from .config import load_config as legacy_load
config = legacy_load(config_path)
else:
logger.warning(f"Config not found at {config_path} or {config_dir}")
logger.info("Run 'meshai --config' to create one, or copy config.example.yaml")
sys.exit(1)

View file

@ -7,8 +7,34 @@ Severity levels (military/intelligence precedence):
routine - Informational, no time pressure
priority - Needs attention soon
immediate - Act now, drop everything
Toggle categories (for v0.3 notification routing):
mesh_health - infrastructure, power, utilization, coverage, health-score
weather - NWS-sourced alerts, stream flooding
fire - NIFC perimeters, FIRMS hotspots
rf_propagation - solar, geomagnetic, ducting, band conditions
roads - 511, TomTom traffic
avalanche - avalanche advisories
seismic - USGS quakes (Phase 3)
tracking - ADS-B, AIS, satellite passes (Phase 7)
"""
from typing import Optional
# Valid toggle values for v0.3 pipeline
VALID_TOGGLES = frozenset({
"mesh_health",
"weather",
"fire",
"rf_propagation",
"roads",
"avalanche",
"seismic",
"tracking",
})
ALERT_CATEGORIES = {
# Infrastructure alerts
"infra_offline": {
@ -16,24 +42,28 @@ ALERT_CATEGORIES = {
"description": "An infrastructure node (router/repeater) stopped responding",
"default_severity": "priority",
"example_message": "⚠ Infrastructure Offline: MHR — Mountain Harrison Rptr has not been heard for 2 hours",
"toggle": "mesh_health",
},
"critical_node_down": {
"name": "Critical Node Down",
"description": "A node you marked as critical went offline",
"default_severity": "immediate",
"example_message": "🚨 Critical Node Down: HPR — Hayden Peak Rptr offline for 1 hour",
"toggle": "mesh_health",
},
"infra_recovery": {
"name": "Infrastructure Recovery",
"description": "An offline infrastructure node came back online",
"default_severity": "routine",
"example_message": "✅ Recovery: MHR — Mountain Harrison Rptr back online after 2h outage",
"toggle": "mesh_health",
},
"new_router": {
"name": "New Router",
"description": "A new router appeared on the mesh",
"default_severity": "routine",
"example_message": "📡 New Router: Snake River Relay appeared in Wood River Valley",
"toggle": "mesh_health",
},
# Power alerts
@ -42,36 +72,42 @@ ALERT_CATEGORIES = {
"description": "Infrastructure node battery below 30% (3.60V)",
"default_severity": "routine",
"example_message": "🔋 Battery Warning: BLD-MTN at 28% (3.58V), solar not charging",
"toggle": "mesh_health",
},
"battery_critical": {
"name": "Battery Critical",
"description": "Infrastructure node battery below 15% (3.50V)",
"default_severity": "priority",
"example_message": "🔋 Battery Critical: BLD-MTN at 12% (3.48V) — shutdown in hours",
"toggle": "mesh_health",
},
"battery_emergency": {
"name": "Battery Emergency",
"description": "Infrastructure node battery below 5% (3.40V) — shutdown imminent",
"default_severity": "immediate",
"example_message": "🚨 Battery Emergency: BLD-MTN at 4% (3.38V) — shutdown imminent",
"toggle": "mesh_health",
},
"battery_trend": {
"name": "Battery Declining",
"description": "Battery showing declining trend over 7 days — possible solar or charging issue",
"default_severity": "routine",
"example_message": "🔋 Battery Trend: HPR declining 85% → 62% over 7 days (-3.3%/day)",
"toggle": "mesh_health",
},
"power_source_change": {
"name": "Power Source Change",
"description": "Node switched from USB to battery — possible power outage at site",
"default_severity": "priority",
"example_message": "⚡ Power Source: MHR switched from USB to battery — possible outage",
"toggle": "mesh_health",
},
"solar_not_charging": {
"name": "Solar Not Charging",
"description": "Solar panel not charging during daylight hours — panel issue or obstruction",
"default_severity": "priority",
"example_message": "☀️ Solar Issue: BLD-MTN not charging during daylight (12:00 MDT)",
"toggle": "mesh_health",
},
# Utilization alerts
@ -80,18 +116,21 @@ ALERT_CATEGORIES = {
"description": "LoRa channel airtime exceeding threshold — mesh congestion",
"default_severity": "routine",
"example_message": "📊 Channel Airtime: 47% utilization (threshold: 40%). Reliability may degrade.",
"toggle": "mesh_health",
},
"sustained_high_util": {
"name": "Sustained High Utilization",
"description": "Channel airtime elevated for extended period — ongoing congestion",
"default_severity": "priority",
"example_message": "📊 Sustained Congestion: 45% channel utilization for 2+ hours. Consider reducing telemetry.",
"toggle": "mesh_health",
},
"packet_flood": {
"name": "Packet Flood",
"description": "A single node sending excessive radio packets (NOT water flooding) — possible firmware bug or stuck transmitter",
"default_severity": "priority",
"example_message": "📻 Packet Flood: Node 'BKBS' transmitting 42 packets/min (threshold: 10/min). Firmware bug?",
"toggle": "mesh_health",
},
# Coverage alerts
@ -100,18 +139,21 @@ ALERT_CATEGORIES = {
"description": "Infrastructure node dropped to single gateway coverage — reduced redundancy",
"default_severity": "priority",
"example_message": "📶 Reduced Coverage: HPR dropped to single gateway. Previously had 3 paths.",
"toggle": "mesh_health",
},
"feeder_offline": {
"name": "Feeder Offline",
"description": "A feeder gateway stopped responding — coverage gap possible",
"default_severity": "priority",
"example_message": "📡 Feeder Offline: AIDA-N2 gateway not responding. 5 nodes may lose uplink.",
"toggle": "mesh_health",
},
"region_total_blackout": {
"name": "Region Blackout",
"description": "All infrastructure in a region is offline — complete coverage loss",
"default_severity": "immediate",
"example_message": "🚨 REGION BLACKOUT: All infrastructure in Magic Valley offline!",
"toggle": "mesh_health",
},
# Health score alerts
@ -120,12 +162,14 @@ ALERT_CATEGORIES = {
"description": "Overall mesh health score dropped below threshold — multiple issues likely",
"default_severity": "priority",
"example_message": "📉 Mesh Health: Score 62/100 (threshold: 65). Infrastructure: 71, Connectivity: 58.",
"toggle": "mesh_health",
},
"region_score_low": {
"name": "Region Health Low",
"description": "A region's health score below threshold — localized issues",
"default_severity": "priority",
"example_message": "📉 Region Health: Magic Valley at 55/100 (threshold: 60). 2 nodes offline.",
"toggle": "mesh_health",
},
# Environmental - Weather
@ -134,6 +178,7 @@ ALERT_CATEGORIES = {
"description": "NWS warning or advisory affecting your mesh area",
"default_severity": "priority",
"example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z",
"toggle": "weather",
},
# Environmental - Space Weather
@ -142,12 +187,14 @@ ALERT_CATEGORIES = {
"description": "R3+ solar flare degrading HF propagation on sunlit side",
"default_severity": "priority",
"example_message": "⚠ R3 Strong Radio Blackout — X1.2 flare. Wide-area HF blackout ~1 hour on sunlit side.",
"toggle": "rf_propagation",
},
"geomagnetic_storm": {
"name": "Geomagnetic Storm",
"description": "G2+ geomagnetic storm — HF degraded at higher latitudes, aurora possible",
"default_severity": "priority",
"example_message": "🌐 G2 Moderate Geomagnetic Storm — Kp=6. HF fades at high latitudes, aurora to ~55°.",
"toggle": "rf_propagation",
},
# Environmental - Tropospheric
@ -156,6 +203,7 @@ ALERT_CATEGORIES = {
"description": "Atmospheric conditions trapping VHF/UHF signals — extended range",
"default_severity": "routine",
"example_message": "📡 Tropospheric Ducting: Surface duct detected, dM/dz -45 M-units/km, ~120m thick. VHF/UHF extended range.",
"toggle": "rf_propagation",
},
# Environmental - Fire
@ -164,18 +212,21 @@ ALERT_CATEGORIES = {
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "priority",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR. Monitor closely.",
"toggle": "fire",
},
"wildfire_proximity": {
"name": "Fire Near Mesh",
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "priority",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR.",
"toggle": "fire",
},
"new_ignition": {
"name": "New Fire Ignition",
"description": "Satellite hotspot detected NOT near any known fire — potential new wildfire",
"default_severity": "priority",
"example_message": "🛰 New Ignition: Satellite fire at 42.32°N, 114.30°W — high confidence, 47 MW FRP. Not near any known fire.",
"toggle": "fire",
},
# Environmental - Flood
@ -184,12 +235,14 @@ ALERT_CATEGORIES = {
"description": "River gauge exceeds NWS flood stage threshold",
"default_severity": "priority",
"example_message": "🌊 Stream Flood Warning: Snake River nr Twin Falls at 12.8 ft — Minor Flood Stage is 10.5 ft.",
"toggle": "weather",
},
"stream_high_water": {
"name": "Stream High Water",
"description": "River gauge approaching flood stage — monitoring recommended",
"default_severity": "routine",
"example_message": "🌊 High Water: Snake River at 9.8 ft — Action Stage is 9.0 ft. Monitor conditions.",
"toggle": "weather",
},
# Environmental - Roads
@ -198,12 +251,14 @@ ALERT_CATEGORIES = {
"description": "Full road closure on a monitored corridor",
"default_severity": "priority",
"example_message": "🚧 Road Closure: I-84 EB at MP 173 — full closure, construction. Detour via US-30.",
"toggle": "roads",
},
"traffic_congestion": {
"name": "Traffic Congestion",
"description": "Traffic speed dropped below congestion threshold on a monitored corridor",
"default_severity": "routine",
"example_message": "🚗 Traffic Congestion: I-84 Twin Falls — 35 mph (free-flow 70 mph), 50% speed ratio",
"toggle": "roads",
},
# Environmental - Avalanche
@ -212,12 +267,14 @@ ALERT_CATEGORIES = {
"description": "Avalanche danger level 4 (High) or 5 (Extreme) in your area",
"default_severity": "priority",
"example_message": "⛷ Avalanche Danger HIGH: Sawtooth Zone — avoid avalanche terrain. Natural avalanches likely.",
"toggle": "avalanche",
},
"avalanche_considerable": {
"name": "Avalanche Danger Considerable",
"description": "Avalanche danger level 3 (Considerable) — most fatalities occur at this level",
"default_severity": "routine",
"example_message": "⛷ Avalanche Danger CONSIDERABLE: Sawtooth Zone — dangerous conditions on steep slopes.",
"toggle": "avalanche",
},
}
@ -231,6 +288,7 @@ def get_category(category_id: str) -> dict:
"description": f"Alert type: {category_id}",
"default_severity": "routine",
"example_message": f"Alert: {category_id}",
"toggle": "mesh_health", # Default unknown to mesh_health
}
@ -240,3 +298,37 @@ def list_categories() -> list[dict]:
{"id": cat_id, **cat_info}
for cat_id, cat_info in ALERT_CATEGORIES.items()
]
def categories_for_toggle(toggle: str) -> list[str]:
"""Return all category names that route to this toggle.
Args:
toggle: Toggle name (e.g., "mesh_health", "weather")
Returns:
List of category IDs that have this toggle assigned
"""
if toggle not in VALID_TOGGLES:
return []
return [
cat_id
for cat_id, cat_info in ALERT_CATEGORIES.items()
if cat_info.get("toggle") == toggle
]
def get_toggle(category_name: str) -> Optional[str]:
"""Return the toggle name for a category, or None if unknown.
Args:
category_name: Category ID (e.g., "infra_offline")
Returns:
Toggle name (e.g., "mesh_health") or None if category unknown
"""
cat_info = ALERT_CATEGORIES.get(category_name)
if cat_info:
return cat_info.get("toggle")
return None

View file

@ -0,0 +1,186 @@
"""Event dataclass for the v0.3 notification pipeline.
This module defines the unified Event shape that flows through the
notification routing pipeline. All adapters emit Events, and the
router consumes them.
Usage:
from meshai.notifications.events import Event, make_event
# Create an event
event = make_event(
source="nws",
category="tornado_warning",
severity="immediate",
title="Tornado Warning for Ada County",
summary="A tornado warning has been issued...",
lat=43.615,
lon=-116.2023,
)
# Serialize for storage/webhook
data = event.to_dict()
# Restore from storage
event2 = Event.from_dict(data)
"""
import hashlib
import time
from dataclasses import dataclass, field, asdict
from typing import Optional, Any
# Valid severity levels
SEVERITY_LEVELS = frozenset({"routine", "priority", "immediate"})
@dataclass
class Event:
"""Unified event shape for the notification pipeline.
All adapters (NWS, FIRMS, alert_engine, etc.) emit Events.
The router consumes Events and dispatches them to channels.
"""
# Identity
id: str = "" # stable hash for dedup, computed if not provided
source: str = "" # adapter name: "nws", "firms", "alert_engine", etc.
category: str = "" # specific event type within source
# Severity
severity: str = "routine" # "routine" | "priority" | "immediate"
# Geography
region: Optional[str] = None # primary region name, set by region tagger
regions: list[str] = field(default_factory=list) # all regions touched
lat: Optional[float] = None
lon: Optional[float] = None
nws_zones: list[str] = field(default_factory=list) # NWS zone codes
# Content
title: str = "" # one-line summary for digest headers
summary: str = "" # 1-3 sentence summary for immediate/mesh delivery
body: str = "" # full content for email/webhook delivery
# Affected entities (for mesh health events)
node_ids: list[str] = field(default_factory=list)
short_names: list[str] = field(default_factory=list)
# Timing
timestamp: float = 0.0 # event creation time
effective: Optional[float] = None # event start (NWS-style)
expires: Optional[float] = None # event end (NWS-style)
# Routing hints
group_key: Optional[str] = None # events with same key get merged
inhibit_keys: list[str] = field(default_factory=list) # suppression keys
# Raw adapter data (preserved for advanced rendering)
data: dict = field(default_factory=dict)
@staticmethod
def compute_id(
source: str,
category: str,
group_key: Optional[str] = None,
lat: Optional[float] = None,
lon: Optional[float] = None,
) -> str:
"""Compute a stable dedup ID for an event.
Two events with the same source+category+group_key+location
will have the same ID and can be deduplicated.
Args:
source: Adapter name
category: Event category
group_key: Optional grouping key
lat: Optional latitude
lon: Optional longitude
Returns:
16-character hex ID
"""
key_parts = [
source,
category,
group_key or "",
str(lat) if lat is not None else "",
str(lon) if lon is not None else "",
]
key_string = ":".join(key_parts)
return hashlib.sha1(key_string.encode()).hexdigest()[:16]
def to_dict(self) -> dict[str, Any]:
"""Serialize event to a dict for JSON storage/webhook.
Returns:
Dict representation of the event
"""
return asdict(self)
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Event":
"""Restore an Event from a dict.
Args:
d: Dict representation (from to_dict or JSON load)
Returns:
Event instance
"""
return cls(**d)
def make_event(
source: str,
category: str,
severity: str,
**kwargs: Any,
) -> Event:
"""Create an Event with automatic ID and timestamp.
This is the primary factory function for creating events.
It auto-computes the ID if not provided and sets timestamp
to the current time if not provided.
Args:
source: Adapter name (e.g., "nws", "firms", "alert_engine")
category: Event category (e.g., "tornado_warning", "infra_offline")
severity: One of "routine", "priority", "immediate"
**kwargs: Additional Event fields
Returns:
Event instance
Raises:
ValueError: If severity is not valid
"""
# Validate severity
if severity not in SEVERITY_LEVELS:
raise ValueError(
f"Invalid severity '{severity}'. "
f"Must be one of: {', '.join(sorted(SEVERITY_LEVELS))}"
)
# Auto-set timestamp if not provided
if "timestamp" not in kwargs or kwargs["timestamp"] == 0.0:
kwargs["timestamp"] = time.time()
# Auto-compute ID if not provided
if "id" not in kwargs or not kwargs["id"]:
kwargs["id"] = Event.compute_id(
source=source,
category=category,
group_key=kwargs.get("group_key"),
lat=kwargs.get("lat"),
lon=kwargs.get("lon"),
)
return Event(
source=source,
category=category,
severity=severity,
**kwargs,
)

View file

@ -0,0 +1,88 @@
"""Notification pipeline package.
Phase 2.1 provides the bare skeleton:
- EventBus: Central pub/sub for all events
- SeverityRouter: Routes immediate vs digest events
- Dispatcher: Delivers immediate events to channels
- StubDigestQueue: Placeholder for Phase 2.3 aggregator
Usage:
from meshai.notifications.pipeline import build_pipeline
pipeline = build_pipeline(channel_config={
"mesh_health": ["discord"],
"weather": ["discord", "meshtastic"],
})
# Emit events through the bus
pipeline["bus"].emit(event)
"""
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue,
)
from meshai.notifications.pipeline.dispatcher import (
Dispatcher,
StubChannelBackend,
)
def build_pipeline(
channel_config: dict[str, list[str]] | None = None,
) -> dict:
"""Build and wire up the notification pipeline.
Creates all pipeline components and connects them:
- EventBus receives all events
- SeverityRouter subscribes to bus, routes by severity
- Dispatcher handles immediate events
- StubDigestQueue collects priority/routine events
Args:
channel_config: Mapping of toggle -> channel names for dispatch.
Example: {"mesh_health": ["discord"]}
Returns:
Dict with all pipeline components:
- bus: EventBus instance
- router: SeverityRouter instance
- dispatcher: Dispatcher instance
- digest_queue: StubDigestQueue instance
"""
# Create components
bus = EventBus()
dispatcher = Dispatcher(channel_config)
digest_queue = StubDigestQueue()
# Wire up the router
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest_queue.enqueue,
)
# Subscribe router to bus
bus.subscribe(router.handle)
return {
"bus": bus,
"router": router,
"dispatcher": dispatcher,
"digest_queue": digest_queue,
}
__all__ = [
# Core classes
"EventBus",
"SeverityRouter",
"Dispatcher",
# Stubs for testing/Phase 2.x
"StubDigestQueue",
"StubChannelBackend",
# Factory
"build_pipeline",
# Singleton accessor
"get_bus",
]

View file

@ -0,0 +1,85 @@
"""Event bus for the notification pipeline.
The bus is the entry point for all events flowing through the pipeline.
Adapters call bus.emit(event) to push Events into the system.
Usage:
from meshai.notifications.pipeline import get_bus
from meshai.notifications.events import make_event
bus = get_bus()
event = make_event(source="nws", category="weather_warning", severity="immediate", ...)
bus.emit(event)
"""
import logging
from typing import Callable, Iterable
from meshai.notifications.events import Event
class EventBus:
"""Central event bus for the notification pipeline.
Subscribers register handlers that receive every emitted event.
Errors in one subscriber do not prevent other subscribers from
receiving the event.
"""
def __init__(self):
self._subscribers: list[Callable[[Event], None]] = []
self._logger = logging.getLogger("meshai.pipeline.bus")
def subscribe(self, handler: Callable[[Event], None]) -> None:
"""Register a handler that receives every emitted event.
Args:
handler: Callable that takes an Event and returns None
"""
self._subscribers.append(handler)
self._logger.debug(f"Subscribed handler: {handler}")
def emit(self, event: Event) -> None:
"""Push an event to all subscribers.
Errors in one subscriber do not stop others from receiving
the event. Exceptions are logged but not re-raised.
Args:
event: The Event to deliver to all subscribers
"""
for handler in self._subscribers:
try:
handler(event)
except Exception:
self._logger.exception(
f"Subscriber {handler} failed on event {event.id}"
)
def emit_many(self, events: Iterable[Event]) -> None:
"""Emit multiple events in sequence.
Args:
events: Iterable of Events to emit
"""
for event in events:
self.emit(event)
# Module-level singleton for application-wide use
_bus: EventBus | None = None
def get_bus() -> EventBus:
"""Get the global EventBus singleton.
This is the primary way adapters access the bus. Tests should
construct a fresh EventBus() directly to avoid shared state.
Returns:
The global EventBus instance
"""
global _bus
if _bus is None:
_bus = EventBus()
return _bus

View file

@ -0,0 +1,143 @@
"""Immediate event dispatcher.
The dispatcher routes immediate-severity events to configured delivery
channels based on the event's toggle category.
Phase 2.1 provides a stub that logs dispatch attempts. Phase 2.2 will
add real channel backends (Discord webhooks, Meshtastic broadcast, etc.).
Usage:
dispatcher = Dispatcher(channel_config)
dispatcher.dispatch(event) # Called by SeverityRouter for immediate events
"""
import logging
from typing import Callable, Optional
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
class Dispatcher:
"""Dispatches immediate events to configured channels.
Each toggle category can have multiple delivery channels configured.
The dispatcher looks up the toggle for an event's category and sends
to all channels registered for that toggle.
Phase 2.1: Stub implementation that logs but doesn't actually deliver.
Phase 2.2: Will add real channel backends.
"""
def __init__(
self,
channel_config: Optional[dict[str, list[str]]] = None,
):
"""Initialize the dispatcher.
Args:
channel_config: Mapping of toggle -> list of channel names.
Example: {"mesh_health": ["discord", "meshtastic"]}
If None, defaults to empty (no channels configured).
"""
self._channels = channel_config or {}
self._logger = logging.getLogger("meshai.pipeline.dispatcher")
self._backends: dict[str, Callable[[Event], None]] = {}
def register_backend(
self,
channel_name: str,
handler: Callable[[Event], None],
) -> None:
"""Register a delivery backend for a channel.
Args:
channel_name: Name of the channel (e.g., "discord", "meshtastic")
handler: Callable that delivers the event to the channel
"""
self._backends[channel_name] = handler
self._logger.debug(f"Registered backend: {channel_name}")
def dispatch(self, event: Event) -> None:
"""Dispatch an immediate event to configured channels.
Looks up the toggle for the event's category, then sends to
all channels configured for that toggle.
Args:
event: The immediate-severity Event to dispatch
"""
toggle = get_toggle(event.category)
if toggle is None:
self._logger.warning(
f"Unknown category {event.category!r} for event {event.id}, "
"defaulting to mesh_health"
)
toggle = "mesh_health"
channels = self._channels.get(toggle, [])
if not channels:
self._logger.info(
f"No channels configured for toggle {toggle!r}, "
f"event {event.id} not dispatched"
)
return
for channel in channels:
self._deliver_to_channel(event, channel, toggle)
def _deliver_to_channel(
self,
event: Event,
channel: str,
toggle: str,
) -> None:
"""Deliver event to a specific channel.
Args:
event: The Event to deliver
channel: Channel name
toggle: Toggle category (for logging)
"""
backend = self._backends.get(channel)
if backend is None:
# Phase 2.1: Log stub - no real backend yet
self._logger.info(
f"DISPATCH STUB [{toggle}] -> {channel}: {event.title}"
)
return
try:
backend(event)
self._logger.info(
f"DISPATCHED [{toggle}] -> {channel}: {event.title}"
)
except Exception:
self._logger.exception(
f"Failed to dispatch event {event.id} to {channel}"
)
class StubChannelBackend:
"""Stub channel backend for testing.
Collects all events "sent" to it for verification in tests.
"""
def __init__(self, name: str):
self.name = name
self.events: list[Event] = []
self._logger = logging.getLogger(f"meshai.pipeline.stub.{name}")
def send(self, event: Event) -> None:
"""Record an event as sent.
Args:
event: The Event to record
"""
self.events.append(event)
self._logger.info(f"STUB {self.name}: {event.title}")
def clear(self) -> None:
"""Clear recorded events."""
self.events = []

View file

@ -0,0 +1,104 @@
"""Severity-based event routing.
The severity router subscribes to the bus and forks each event into
one of two paths based on severity:
- immediate immediate_handler (dispatcher for live delivery)
- priority/routine digest_handler (queue for batched summaries)
Usage:
router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest_queue.enqueue,
)
bus.subscribe(router.handle)
"""
import logging
from typing import Callable
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
class SeverityRouter:
"""Routes events to immediate or digest handlers based on severity.
Immediate-severity events go directly to live delivery channels.
Priority and routine events are queued for periodic digest summaries.
"""
def __init__(
self,
immediate_handler: Callable[[Event], None],
digest_handler: Callable[[Event], None],
):
"""Initialize the severity router.
Args:
immediate_handler: Called for severity="immediate" events
digest_handler: Called for severity in ("priority", "routine")
"""
self._immediate = immediate_handler
self._digest = digest_handler
self._logger = logging.getLogger("meshai.pipeline.severity_router")
def handle(self, event: Event) -> None:
"""Route an event based on its severity.
Args:
event: The Event to route
"""
if event.severity == "immediate":
self._logger.info(
f"IMMEDIATE: {event.source}/{event.category} {event.title}"
)
self._immediate(event)
elif event.severity in ("priority", "routine"):
self._logger.info(
f"DIGEST QUEUED [{event.severity}]: {event.title}"
)
self._digest(event)
else:
self._logger.warning(
f"Unknown severity {event.severity!r} on event {event.id}, dropping"
)
class StubDigestQueue:
"""Placeholder digest queue for Phase 2.1.
This is a stub that simply collects events in memory. Phase 2.3
will replace this with the real aggregator that renders and
delivers periodic digest summaries.
"""
def __init__(self):
self._queue: list[Event] = []
self._logger = logging.getLogger("meshai.pipeline.digest_stub")
def enqueue(self, event: Event) -> None:
"""Add an event to the digest queue.
Args:
event: The Event to queue for digest delivery
"""
self._queue.append(event)
toggle = get_toggle(event.category) or "unknown"
self._logger.info(f"DIGEST QUEUED [{toggle}]: {event.title}")
def drain(self) -> list[Event]:
"""Return and clear all queued events.
For tests and the future aggregator. Returns the current
queue contents and resets the queue to empty.
Returns:
List of all queued Events
"""
events, self._queue = self._queue, []
return events
def __len__(self) -> int:
"""Return the number of queued events."""
return len(self._queue)

View file

@ -0,0 +1,160 @@
"""Region tagger for mapping coordinates and NWS zones to regions.
This module provides functions to:
- Map lat/lon coordinates to the nearest configured region
- Map NWS zone codes to matching regions
Usage:
from meshai.notifications.region_tagger import tag_by_coordinates, tag_by_nws_zone
from meshai.config import RegionAnchor
regions = [
RegionAnchor(name="South Western ID", lat=43.615, lon=-116.2023,
nws_zones=["IDZ016", "IDZ030"]),
RegionAnchor(name="Magic Valley", lat=42.5558, lon=-114.4701,
nws_zones=["IDZ031"]),
]
# Find region by coordinates
region = tag_by_coordinates(43.6, -116.2, regions)
# Returns: "South Western ID"
# Find regions by NWS zone
regions = tag_by_nws_zone("IDZ016", regions)
# Returns: ["South Western ID"]
"""
import math
from typing import Optional
# Import RegionAnchor type for annotations
# Actual import happens at function call time to avoid circular imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from meshai.config import RegionAnchor
# Earth radius in miles (mean radius)
EARTH_RADIUS_MILES = 3958.8
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate the great-circle distance between two points on Earth.
Uses the haversine formula for accuracy on a spherical Earth model.
Args:
lat1: Latitude of first point in degrees
lon1: Longitude of first point in degrees
lat2: Latitude of second point in degrees
lon2: Longitude of second point in degrees
Returns:
Distance in miles
"""
# Convert to radians
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
lon1_rad = math.radians(lon1)
lon2_rad = math.radians(lon2)
# Differences
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
# Haversine formula
a = math.sin(dlat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2) ** 2
c = 2 * math.asin(math.sqrt(a))
return EARTH_RADIUS_MILES * c
def tag_by_coordinates(
lat: float,
lon: float,
regions: list, # list[RegionAnchor]
radius_miles: float = 25.0,
) -> Optional[str]:
"""Return the name of the nearest region within radius_miles.
Finds the closest region anchor to the given coordinates. If the
closest anchor is within radius_miles, returns its name. Otherwise
returns None.
Args:
lat: Latitude of the point to tag
lon: Longitude of the point to tag
regions: List of RegionAnchor objects to search
radius_miles: Maximum distance to consider (default 25 miles)
Returns:
Name of the nearest region within range, or None if no match
"""
if not regions:
return None
closest_region = None
closest_distance = float("inf")
for region in regions:
# Skip regions without valid coordinates
region_lat = getattr(region, "lat", None)
region_lon = getattr(region, "lon", None)
if region_lat is None or region_lon is None:
continue
if region_lat == 0.0 and region_lon == 0.0:
# Treat (0, 0) as unset coordinates
continue
distance = haversine_distance(lat, lon, region_lat, region_lon)
if distance < closest_distance:
closest_distance = distance
closest_region = region
# Check if closest is within radius
if closest_region is not None and closest_distance <= radius_miles:
return getattr(closest_region, "name", None)
return None
def tag_by_nws_zone(
zone_code: str,
regions: list, # list[RegionAnchor]
) -> list[str]:
"""Return all region names whose nws_zones list contains zone_code.
Multiple regions can match the same zone (a zone may span multiple
configured regions).
Args:
zone_code: NWS zone code to match (e.g., "IDZ016")
regions: List of RegionAnchor objects to search
Returns:
List of region names that contain this zone, empty if no matches
"""
if not zone_code or not regions:
return []
# Normalize zone code to uppercase for case-insensitive matching
zone_upper = zone_code.upper().strip()
matching_regions = []
for region in regions:
region_zones = getattr(region, "nws_zones", None)
if not region_zones:
continue
# Check if zone matches any in this region's list (case-insensitive)
for rz in region_zones:
if rz.upper().strip() == zone_upper:
region_name = getattr(region, "name", None)
if region_name:
matching_regions.append(region_name)
break # Don't add same region twice
return matching_regions

View file

@ -0,0 +1 @@
# MeshAI scripts package

View file

@ -0,0 +1,736 @@
#!/usr/bin/env python3
"""Migration script for MeshAI config v0.2 → v0.3.
This script converts the monolithic /data/config.yaml into the new
multi-file layout under /data/config/.
Run once: python -m meshai.scripts.migrate_config_v03
The migration:
1. Backs up the original config
2. Splits sections into domain files
3. Extracts operator-identifying values to local.yaml
4. Extracts literal secrets to /data/secrets/.env
5. Creates orchestrator config.yaml with !include directives
6. Verifies the new layout loads identically
"""
import hashlib
import logging
import os
import re
import shutil
import sys
from dataclasses import asdict, fields
from pathlib import Path
from typing import Any
import yaml
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
# =============================================================================
# DEEP COMPARISON FOR VERIFICATION
# =============================================================================
def deep_compare(old, new, path=""):
"""Deep compare two values, returning list of difference descriptions."""
differences = []
if type(old) != type(new):
differences.append(f"{path}: type {type(old).__name__} vs {type(new).__name__}")
return differences
if isinstance(old, dict):
for key in sorted(set(old.keys()) | set(new.keys())):
child_path = f"{path}.{key}" if path else key
if key not in old:
differences.append(f"{child_path}: missing in backup")
elif key not in new:
differences.append(f"{child_path}: missing in new")
else:
differences.extend(deep_compare(old[key], new[key], child_path))
elif isinstance(old, list):
if len(old) != len(new):
differences.append(f"{path}: list length {len(old)} vs {len(new)}")
else:
for i, (o, n) in enumerate(zip(old, new)):
differences.extend(deep_compare(o, n, f"{path}[{i}]"))
elif old != new:
differences.append(f"{path}: {repr(old)[:50]} vs {repr(new)[:50]}")
return differences
# =============================================================================
# CONFIGURATION
# =============================================================================
SOURCE_FILE = Path("/data/config.yaml")
TARGET_DIR = Path("/data/config")
BACKUP_FILE = Path("/data/config.yaml.pre-v03-backup")
SECRETS_DIR = Path("/data/secrets")
# Section to file mapping
SECTION_TO_FILE = {
"connection": "meshtastic.yaml",
"commands": "meshtastic.yaml",
"mesh_sources": "mesh_sources.yaml",
"mesh_intelligence": "mesh_intelligence.yaml",
"environmental": "env_feeds.yaml",
"notifications": "notifications.yaml",
"llm": "llm.yaml",
"dashboard": "dashboard.yaml",
}
# Sections that stay inline in orchestrator config.yaml
INLINE_SECTIONS = [
"timezone",
"bot",
"response",
"history",
"memory",
"context",
"weather",
"meshmonitor",
"knowledge",
]
# Fields to extract to local.yaml
LOCAL_EXTRACTIONS = {
"bot.name": "identity.name",
"bot.owner": "identity.owner",
"connection.tcp_host": "infrastructure.tcp_host",
"knowledge.qdrant_host": "infrastructure.qdrant_host",
"knowledge.tei_host": "infrastructure.tei_host",
"knowledge.sparse_host": "infrastructure.sparse_host",
"meshmonitor.url": "mesh_sources.meshmonitor_url",
"mesh_intelligence.critical_nodes": "critical_nodes",
"environmental.ducting.latitude": "env_center.latitude",
"environmental.ducting.longitude": "env_center.longitude",
"environmental.nws.user_agent": "identity.contact_email", # Extract email from user_agent
}
# Secret fields - if found as literals, extract to .env
SECRET_PATTERNS = {
"llm.api_key": "LLM_API_KEY", # Will be renamed based on backend
"mesh_sources.*.api_token": "MESHMONITOR_API_TOKEN",
"mesh_sources.*.password": "MQTT_PASSWORD",
"environmental.traffic.api_key": "TOMTOM_API_KEY",
"environmental.firms.map_key": "FIRMS_MAP_KEY",
"notifications.rules.*.smtp_password": "SMTP_PASSWORD",
}
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def get_nested(data: dict, path: str) -> Any:
"""Get a value from nested dict using dot notation."""
parts = path.split(".")
current = data
for part in parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
return current
def set_nested(data: dict, path: str, value: Any) -> None:
"""Set a value in nested dict using dot notation, creating dicts as needed."""
parts = path.split(".")
current = data
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = value
def remove_nested(data: dict, path: str) -> bool:
"""Remove a value from nested dict. Returns True if removed."""
parts = path.split(".")
current = data
for part in parts[:-1]:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return False
if parts[-1] in current:
del current[parts[-1]]
return True
return False
def file_hash(path: Path) -> str:
"""Calculate SHA256 hash of a file."""
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def is_env_var_ref(value: str) -> bool:
"""Check if a string is an env var reference like ${VAR_NAME}."""
if not isinstance(value, str):
return False
return bool(re.match(r"^\$\{[A-Z_][A-Z0-9_]*\}$", value))
def extract_email_from_user_agent(user_agent: str) -> str:
"""Extract email from NWS user_agent format: (app, email@domain.com)"""
if not user_agent:
return ""
match = re.search(r"[\w.+-]+@[\w.-]+\.\w+", user_agent)
return match.group(0) if match else ""
# =============================================================================
# PRE-FLIGHT CHECKS
# =============================================================================
def preflight_checks() -> bool:
"""Run pre-flight checks before migration."""
logger.info("Running pre-flight checks...")
# Check source exists
if not SOURCE_FILE.exists():
logger.error(f"Source file not found: {SOURCE_FILE}")
return False
logger.info(f" Source file exists: {SOURCE_FILE}")
# Check target directory state
if TARGET_DIR.exists():
contents = list(TARGET_DIR.iterdir())
if contents:
logger.error(
f"Target directory {TARGET_DIR} already populated with {len(contents)} items. "
"Manual intervention needed - remove existing files or restore from backup."
)
return False
logger.info(f" Target directory exists but is empty: {TARGET_DIR}")
else:
logger.info(f" Target directory does not exist: {TARGET_DIR}")
# Check backup doesn't already exist (indicates previous migration)
if BACKUP_FILE.exists():
logger.warning(
f"Backup file already exists: {BACKUP_FILE}. "
"This may indicate a previous migration attempt."
)
# Continue anyway - user may be re-running after fixing issues
logger.info("Pre-flight checks passed.")
return True
# =============================================================================
# BACKUP
# =============================================================================
def create_backup() -> bool:
"""Create backup of original config."""
logger.info(f"Creating backup: {SOURCE_FILE}{BACKUP_FILE}")
shutil.copy2(SOURCE_FILE, BACKUP_FILE)
# Verify backup
source_hash = file_hash(SOURCE_FILE)
backup_hash = file_hash(BACKUP_FILE)
if source_hash != backup_hash:
logger.error(
f"Backup verification failed! Hashes don't match:\n"
f" Source: {source_hash}\n"
f" Backup: {backup_hash}"
)
return False
source_size = SOURCE_FILE.stat().st_size
backup_size = BACKUP_FILE.stat().st_size
if source_size != backup_size:
logger.error(
f"Backup verification failed! Sizes don't match:\n"
f" Source: {source_size}\n"
f" Backup: {backup_size}"
)
return False
logger.info(f" Backup verified: {backup_size} bytes, hash {backup_hash[:12]}...")
return True
# =============================================================================
# EXTRACTION LOGIC
# =============================================================================
def extract_local_values(data: dict) -> dict:
"""Extract operator-identifying values to local.yaml structure."""
local = {}
for source_path, dest_path in LOCAL_EXTRACTIONS.items():
value = get_nested(data, source_path)
if value is not None and value != "" and value != 0:
# Special handling for user_agent -> email extraction
if source_path == "environmental.nws.user_agent":
value = extract_email_from_user_agent(str(value))
if not value:
continue
set_nested(local, dest_path, value)
logger.debug(f" Extracted {source_path} → local.{dest_path}")
# Extract region coordinates
mi = data.get("mesh_intelligence", {})
regions = mi.get("regions", [])
if regions:
local["regions"] = {}
for region in regions:
if isinstance(region, dict):
name = region.get("name", "")
lat = region.get("lat", 0)
lon = region.get("lon", 0)
if name and (lat != 0 or lon != 0):
local["regions"][name] = {"lat": lat, "lon": lon}
# Extract mesh source URLs
mesh_sources = data.get("mesh_sources", [])
if mesh_sources:
local["mesh_sources"] = {"sources": {}}
for source in mesh_sources:
if isinstance(source, dict):
name = source.get("name", "")
url = source.get("url", "")
host = source.get("host", "")
if name and (url or host):
local["mesh_sources"]["sources"][name] = {}
if url:
local["mesh_sources"]["sources"][name]["url"] = url
if host:
local["mesh_sources"]["sources"][name]["host"] = host
# Extract notification targets
notifications = data.get("notifications", {})
rules = notifications.get("rules", [])
if rules:
node_ids = set()
recipients = set()
for rule in rules:
if isinstance(rule, dict):
for nid in rule.get("node_ids", []):
node_ids.add(nid)
for rcpt in rule.get("recipients", []):
recipients.add(rcpt)
if node_ids:
local["notification_targets"] = local.get("notification_targets", {})
local["notification_targets"]["alert_node_ids"] = list(node_ids)
if recipients:
local["notification_targets"] = local.get("notification_targets", {})
local["notification_targets"]["smtp_recipients"] = list(recipients)
return local
def extract_secrets(data: dict) -> dict:
"""Extract literal secrets to .env format."""
secrets = {}
# LLM API key
llm = data.get("llm", {})
api_key = llm.get("api_key", "")
if api_key and not is_env_var_ref(api_key):
backend = llm.get("backend", "openai").upper()
key_name = f"{backend}_API_KEY"
if backend == "GOOGLE":
key_name = "GOOGLE_API_KEY"
secrets[key_name] = api_key
logger.info(f" Extracted llm.api_key → {key_name}")
# Mesh source tokens/passwords
for i, source in enumerate(data.get("mesh_sources", [])):
if isinstance(source, dict):
token = source.get("api_token", "")
if token and not is_env_var_ref(token):
secrets["MESHMONITOR_API_TOKEN"] = token
logger.info(f" Extracted mesh_sources[{i}].api_token → MESHMONITOR_API_TOKEN")
password = source.get("password", "")
if password and not is_env_var_ref(password):
secrets["MQTT_PASSWORD"] = password
logger.info(f" Extracted mesh_sources[{i}].password → MQTT_PASSWORD")
# Environmental API keys
env = data.get("environmental", {})
traffic = env.get("traffic", {})
if traffic.get("api_key") and not is_env_var_ref(traffic["api_key"]):
secrets["TOMTOM_API_KEY"] = traffic["api_key"]
logger.info(" Extracted environmental.traffic.api_key → TOMTOM_API_KEY")
firms = env.get("firms", {})
if firms.get("map_key") and not is_env_var_ref(firms["map_key"]):
secrets["FIRMS_MAP_KEY"] = firms["map_key"]
logger.info(" Extracted environmental.firms.map_key → FIRMS_MAP_KEY")
# Notification SMTP passwords
for i, rule in enumerate(data.get("notifications", {}).get("rules", [])):
if isinstance(rule, dict):
smtp_pass = rule.get("smtp_password", "")
if smtp_pass and not is_env_var_ref(smtp_pass):
secrets["SMTP_PASSWORD"] = smtp_pass
logger.info(f" Extracted notifications.rules[{i}].smtp_password → SMTP_PASSWORD")
return secrets
def strip_local_values_from_domain(data: dict) -> dict:
"""Remove local values from domain data, replacing with placeholders."""
# Remove operator values that went to local.yaml
# These get merged back at load time
# Strip bot name/owner (will come from local.yaml)
if "bot" in data:
data["bot"].pop("name", None)
data["bot"].pop("owner", None)
# Strip connection tcp_host
if "connection" in data:
data["connection"].pop("tcp_host", None)
# Strip knowledge hosts
if "knowledge" in data:
data["knowledge"].pop("qdrant_host", None)
data["knowledge"].pop("tei_host", None)
data["knowledge"].pop("sparse_host", None)
# Strip meshmonitor url
if "meshmonitor" in data:
data["meshmonitor"].pop("url", None)
# Strip critical_nodes (comes from local.yaml)
if "mesh_intelligence" in data:
data["mesh_intelligence"].pop("critical_nodes", None)
# Strip region lat/lon (comes from local.yaml)
if "mesh_intelligence" in data:
for region in data["mesh_intelligence"].get("regions", []):
if isinstance(region, dict):
region.pop("lat", None)
region.pop("lon", None)
# Strip mesh source URLs (comes from local.yaml)
for source in data.get("mesh_sources", []):
if isinstance(source, dict):
source.pop("url", None)
source.pop("host", None)
# Strip ducting lat/lon (comes from local.yaml)
if "environmental" in data:
ducting = data["environmental"].get("ducting", {})
ducting.pop("latitude", None)
ducting.pop("longitude", None)
# Strip nws user_agent (comes from local.yaml identity.contact_email)
nws = data["environmental"].get("nws", {})
nws.pop("user_agent", None)
# Strip notification node_ids and recipients (comes from local.yaml)
if "notifications" in data:
for rule in data["notifications"].get("rules", []):
if isinstance(rule, dict):
rule.pop("node_ids", None)
rule.pop("recipients", None)
rule.pop("from_address", None)
return data
def replace_secrets_with_refs(data: dict, secrets: dict) -> dict:
"""Replace literal secrets with ${VAR_NAME} references."""
# LLM API key
if "llm" in data and data["llm"].get("api_key"):
backend = data["llm"].get("backend", "openai").upper()
key_name = f"{backend}_API_KEY"
if backend == "GOOGLE":
key_name = "GOOGLE_API_KEY"
data["llm"]["api_key"] = f"${{{key_name}}}"
# Mesh sources
for source in data.get("mesh_sources", []):
if isinstance(source, dict):
if source.get("api_token") and not is_env_var_ref(source["api_token"]):
source["api_token"] = "${MESHMONITOR_API_TOKEN}"
if source.get("password") and not is_env_var_ref(source["password"]):
source["password"] = "${MQTT_PASSWORD}"
# Environmental
if "environmental" in data:
traffic = data["environmental"].get("traffic", {})
if traffic.get("api_key") and not is_env_var_ref(traffic["api_key"]):
traffic["api_key"] = "${TOMTOM_API_KEY}"
firms = data["environmental"].get("firms", {})
if firms.get("map_key") and not is_env_var_ref(firms["map_key"]):
firms["map_key"] = "${FIRMS_MAP_KEY}"
# Notifications
for rule in data.get("notifications", {}).get("rules", []):
if isinstance(rule, dict):
if rule.get("smtp_password") and not is_env_var_ref(rule["smtp_password"]):
rule["smtp_password"] = "${SMTP_PASSWORD}"
return data
# =============================================================================
# FILE WRITING
# =============================================================================
def write_domain_file(path: Path, data: dict) -> None:
"""Write a domain YAML file."""
with open(path, "w") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
logger.info(f" Wrote {path} ({path.stat().st_size} bytes)")
def write_orchestrator(path: Path, data: dict) -> None:
"""Write the orchestrator config.yaml with !include directives."""
# Build the orchestrator content manually to preserve !include syntax
lines = [
"# MeshAI Configuration v0.3",
"# Multi-file layout with !include directives",
"",
]
# Add inline sections
for section in INLINE_SECTIONS:
if section in data:
section_yaml = yaml.dump({section: data[section]}, default_flow_style=False, sort_keys=False)
lines.append(section_yaml.rstrip())
lines.append("")
# Add !include directives for domain files
lines.append("# Domain files (use !include)")
for section, target_file in SECTION_TO_FILE.items():
if section in data and section not in ["commands"]: # commands shares file with connection
lines.append(f"{section}: !include {target_file}")
content = "\n".join(lines) + "\n"
with open(path, "w") as f:
f.write(content)
logger.info(f" Wrote orchestrator {path} ({path.stat().st_size} bytes)")
def write_local_yaml(path: Path, data: dict) -> None:
"""Write local.yaml with restricted permissions."""
header = """# LOCAL OPERATOR CONFIGURATION
# This file is gitignored - contains operator-identifying values
# Edit this file to customize for your deployment
"""
with open(path, "w") as f:
f.write(header)
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
path.chmod(0o600)
logger.info(f" Wrote {path} ({path.stat().st_size} bytes, mode 600)")
def write_env_file(path: Path, secrets: dict) -> None:
"""Write .env file with restricted permissions."""
header = """# MeshAI Secrets
# This file is gitignored - contains API keys and passwords
# Never commit this file to version control
"""
lines = [header]
for key, value in sorted(secrets.items()):
lines.append(f"{key}={value}")
content = "\n".join(lines) + "\n"
with open(path, "w") as f:
f.write(content)
path.chmod(0o600)
logger.info(f" Wrote {path} ({len(secrets)} secrets, mode 600)")
# =============================================================================
# MAIN MIGRATION
# =============================================================================
def run_migration() -> bool:
"""Run the full migration process."""
logger.info("=" * 60)
logger.info("MeshAI Config Migration v0.2 → v0.3")
logger.info("=" * 60)
# Pre-flight
if not preflight_checks():
return False
# Backup
if not create_backup():
return False
# Load original config
logger.info(f"Loading original config: {SOURCE_FILE}")
with open(SOURCE_FILE, "r") as f:
original_data = yaml.safe_load(f)
if not original_data:
logger.error("Original config is empty or invalid!")
return False
# Make a working copy
import copy
data = copy.deepcopy(original_data)
# Extract local values
logger.info("Extracting operator-local values...")
local_data = extract_local_values(data)
local_count = sum(1 for _ in _count_values(local_data))
logger.info(f" Extracted {local_count} local values")
# Extract secrets
logger.info("Extracting secrets...")
secrets = extract_secrets(data)
logger.info(f" Extracted {len(secrets)} secrets")
# Replace secrets with env var references
data = replace_secrets_with_refs(data, secrets)
# Strip local values from domain data
data = strip_local_values_from_domain(data)
# Create directories
logger.info("Creating directories...")
TARGET_DIR.mkdir(parents=True, exist_ok=True)
SECRETS_DIR.mkdir(parents=True, exist_ok=True)
SECRETS_DIR.chmod(0o700)
logger.info(f" Created {TARGET_DIR}")
logger.info(f" Created {SECRETS_DIR} (mode 700)")
# Write domain files
logger.info("Writing domain files...")
# Group sections by target file
file_contents: dict[str, dict] = {}
for section, target_file in SECTION_TO_FILE.items():
if section in data:
if target_file not in file_contents:
file_contents[target_file] = {}
# For meshtastic.yaml, wrap in section name
if target_file == "meshtastic.yaml":
file_contents[target_file][section] = data[section]
else:
# For dedicated files, the whole file IS the section content
file_contents[target_file] = data[section]
# Handle meshtastic.yaml specially (has both connection and commands)
for target_file, content in file_contents.items():
write_domain_file(TARGET_DIR / target_file, content)
# Write orchestrator
write_orchestrator(TARGET_DIR / "config.yaml", data)
# Write local.yaml
write_local_yaml(TARGET_DIR / "local.yaml", local_data)
# Write .env
if secrets:
write_env_file(SECRETS_DIR / ".env", secrets)
else:
logger.info(" No secrets to write (all were already env var refs)")
# Verification
logger.info("=" * 60)
logger.info("Verifying migration...")
try:
# Import here to use the newly deployed module
sys.path.insert(0, "/app")
from meshai.config_loader import load_config as new_load
from meshai.config import load_config as old_load, _dataclass_to_dict
# Load with new loader
new_config = new_load(TARGET_DIR)
# Load backup with old loader
old_config = old_load(BACKUP_FILE)
# Deep compare all fields using asdict()
old_dict = asdict(old_config)
new_dict = asdict(new_config)
# Remove internal fields that will differ
for d in [old_dict, new_dict]:
d.pop("_config_path", None)
differences = deep_compare(old_dict, new_dict)
if differences:
logger.error("Verification FAILED! Differences found:")
for diff in differences:
logger.error(f" {diff}")
return False
logger.info(" Verification PASSED - config loads correctly")
except Exception as e:
logger.error(f"Verification failed with exception: {e}")
import traceback
traceback.print_exc()
return False
# Summary
logger.info("=" * 60)
logger.info("MIGRATION COMPLETE")
logger.info("=" * 60)
logger.info("")
logger.info("Files created:")
for f in sorted(TARGET_DIR.iterdir()):
logger.info(f" {f} ({f.stat().st_size} bytes)")
if (SECRETS_DIR / ".env").exists():
logger.info(f" {SECRETS_DIR / '.env'} ({len(secrets)} secrets)")
logger.info("")
logger.info(f"Local values extracted: {local_count}")
logger.info(f"Secrets extracted: {len(secrets)} ({', '.join(secrets.keys()) if secrets else 'none'})")
logger.info("")
logger.info(f"Backup at: {BACKUP_FILE}")
logger.info("Delete the backup manually after verifying things work.")
logger.info("")
logger.info("ROLLBACK COMMAND (if needed):")
logger.info(f" rm -rf {TARGET_DIR} {SECRETS_DIR}")
logger.info(f" cp {BACKUP_FILE} {SOURCE_FILE}")
logger.info(" # Then revert main.py loader wiring if changed")
return True
def _count_values(d: dict, prefix: str = "") -> Any:
"""Generator to count leaf values in a nested dict."""
for key, value in d.items():
if isinstance(value, dict):
yield from _count_values(value, f"{prefix}.{key}")
elif isinstance(value, list):
for i, item in enumerate(value):
if isinstance(item, dict):
yield from _count_values(item, f"{prefix}.{key}[{i}]")
else:
yield f"{prefix}.{key}[{i}]"
else:
yield f"{prefix}.{key}"
# =============================================================================
# ENTRY POINT
# =============================================================================
if __name__ == "__main__":
success = run_migration()
sys.exit(0 if success else 1)