mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-05-21 15:14:45 +02:00
- Changed orchestrator to use meshtastic: include meshtastic.yaml - Added hoisting logic to extract connection/commands from wrapper - Fixes restart loop caused by connection.type defaulting to serial Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
688 lines
25 KiB
Python
688 lines
25 KiB
Python
"""Multi-file configuration loader for MeshAI v0.3.
|
|
|
|
This module provides:
|
|
- !include directive support for splitting config across files
|
|
- Environment variable interpolation (${VAR_NAME} and ${VAR_NAME:-default})
|
|
- Operator-local value merging from local.yaml
|
|
- Secret loading from .env files
|
|
- Section-aware save_section() for dashboard write-back
|
|
|
|
The loader produces the same Config dataclass shape as config.py,
|
|
ensuring backward compatibility with all existing consumers.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import yaml
|
|
from dotenv import dotenv_values
|
|
|
|
# Import existing dataclasses - shape must NOT change
|
|
from .config import (
|
|
Config,
|
|
_dict_to_dataclass,
|
|
_dataclass_to_dict,
|
|
)
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
# =============================================================================
|
|
# SECTION TO FILE MAPPING
|
|
# =============================================================================
|
|
|
|
SECTION_TO_FILE: dict[str, str] = {
|
|
# Inline in orchestrator config.yaml
|
|
"timezone": "config.yaml",
|
|
"bot": "config.yaml",
|
|
"response": "config.yaml",
|
|
"history": "config.yaml",
|
|
"memory": "config.yaml",
|
|
"context": "config.yaml",
|
|
"weather": "config.yaml",
|
|
"meshmonitor": "config.yaml",
|
|
"knowledge": "config.yaml",
|
|
|
|
# Domain files
|
|
"connection": "meshtastic.yaml",
|
|
"commands": "meshtastic.yaml",
|
|
"mesh_sources": "mesh_sources.yaml",
|
|
"mesh_intelligence": "mesh_intelligence.yaml",
|
|
"environmental": "env_feeds.yaml",
|
|
"notifications": "notifications.yaml",
|
|
"llm": "llm.yaml",
|
|
"dashboard": "dashboard.yaml",
|
|
}
|
|
|
|
# Fields that should be written to local.yaml instead of domain files
|
|
LOCAL_FIELDS: dict[str, str] = {
|
|
"bot.name": "identity.name",
|
|
"bot.owner": "identity.owner",
|
|
"connection.tcp_host": "infrastructure.tcp_host",
|
|
"knowledge.qdrant_host": "infrastructure.qdrant_host",
|
|
"knowledge.tei_host": "infrastructure.tei_host",
|
|
"knowledge.sparse_host": "infrastructure.sparse_host",
|
|
"meshmonitor.url": "mesh_sources.meshmonitor_url",
|
|
"mesh_intelligence.critical_nodes": "critical_nodes",
|
|
"environmental.ducting.latitude": "env_center.latitude",
|
|
"environmental.ducting.longitude": "env_center.longitude",
|
|
}
|
|
|
|
# Fields that contain secrets - NEVER written, must be in .env
|
|
SECRET_FIELDS: set[str] = {
|
|
"llm.api_key",
|
|
"mesh_sources.*.api_token",
|
|
"mesh_sources.*.password",
|
|
"environmental.traffic.api_key",
|
|
"environmental.firms.map_key",
|
|
"notifications.rules.*.smtp_password",
|
|
}
|
|
|
|
# Secret env var names expected in .env
|
|
EXPECTED_SECRETS: list[str] = [
|
|
"OPENAI_API_KEY",
|
|
"ANTHROPIC_API_KEY",
|
|
"GOOGLE_API_KEY",
|
|
"MESHMONITOR_API_TOKEN",
|
|
"MQTT_PASSWORD",
|
|
"TOMTOM_API_KEY",
|
|
"FIRMS_MAP_KEY",
|
|
"SMTP_PASSWORD",
|
|
]
|
|
|
|
|
|
# =============================================================================
|
|
# YAML !INCLUDE CONSTRUCTOR
|
|
# =============================================================================
|
|
|
|
# Global set for tracking files currently being loaded (cycle detection)
|
|
_loading_files: set[Path] = set()
|
|
|
|
|
|
def _make_include_loader(base_path: Path):
|
|
"""Create an IncludeLoader class with the given base path."""
|
|
|
|
class IncludeLoader(yaml.SafeLoader):
|
|
"""YAML loader with !include tag support."""
|
|
pass
|
|
|
|
def construct_include(loader: IncludeLoader, node: yaml.Node) -> Any:
|
|
"""Handle !include directive."""
|
|
relative_path = loader.construct_scalar(node)
|
|
include_path = (base_path / relative_path).resolve()
|
|
|
|
# Cycle detection using global set
|
|
if include_path in _loading_files:
|
|
raise yaml.YAMLError(
|
|
f"Circular include detected: {include_path} is already being loaded. "
|
|
f"Current loading chain: {[str(p) for p in _loading_files]}"
|
|
)
|
|
|
|
if not include_path.exists():
|
|
raise yaml.YAMLError(
|
|
f"Include file not found: {include_path} "
|
|
f"(referenced from {base_path / 'config.yaml'})"
|
|
)
|
|
|
|
_loading_files.add(include_path)
|
|
try:
|
|
with open(include_path, "r") as f:
|
|
# Recursively load with the include file's directory as new base
|
|
NestedLoader = _make_include_loader(include_path.parent)
|
|
return yaml.load(f, Loader=NestedLoader)
|
|
finally:
|
|
_loading_files.discard(include_path)
|
|
|
|
IncludeLoader.add_constructor("!include", construct_include)
|
|
return IncludeLoader
|
|
|
|
|
|
def _load_yaml_with_includes(file_path: Path) -> dict:
|
|
"""Load a YAML file with !include directive support."""
|
|
global _loading_files
|
|
_loading_files.clear() # Reset cycle detection
|
|
|
|
if not file_path.exists():
|
|
return {}
|
|
|
|
# Add the root file to loading set
|
|
file_path = file_path.resolve()
|
|
_loading_files.add(file_path)
|
|
|
|
try:
|
|
with open(file_path, "r") as f:
|
|
Loader = _make_include_loader(file_path.parent)
|
|
return yaml.load(f, Loader=Loader) or {}
|
|
finally:
|
|
_loading_files.discard(file_path)
|
|
|
|
|
|
# =============================================================================
|
|
# ENVIRONMENT VARIABLE INTERPOLATION
|
|
# =============================================================================
|
|
|
|
_ENV_PATTERN = re.compile(r"\$\{([A-Z_][A-Z0-9_]*)(?::-([^}]*))?\}")
|
|
|
|
|
|
def _interpolate_env_vars(value: Any, env: dict[str, str]) -> Any:
|
|
"""Recursively interpolate ${VAR_NAME} and ${VAR_NAME:-default} in strings.
|
|
|
|
Args:
|
|
value: The value to interpolate (can be string, dict, list, or other)
|
|
env: Combined environment (os.environ + .env file values)
|
|
|
|
Returns:
|
|
The value with environment variables resolved
|
|
"""
|
|
if isinstance(value, str):
|
|
def replace_match(match):
|
|
var_name = match.group(1)
|
|
default = match.group(2)
|
|
|
|
# os.environ takes precedence over .env file
|
|
resolved = os.environ.get(var_name)
|
|
if resolved is None:
|
|
resolved = env.get(var_name)
|
|
if resolved is None:
|
|
if default is not None:
|
|
return default
|
|
_logger.warning(
|
|
f"Environment variable ${{{var_name}}} not found and no default provided. "
|
|
"Using empty string."
|
|
)
|
|
return ""
|
|
return resolved
|
|
|
|
return _ENV_PATTERN.sub(replace_match, value)
|
|
|
|
elif isinstance(value, dict):
|
|
return {k: _interpolate_env_vars(v, env) for k, v in value.items()}
|
|
|
|
elif isinstance(value, list):
|
|
return [_interpolate_env_vars(item, env) for item in value]
|
|
|
|
return value
|
|
|
|
|
|
# =============================================================================
|
|
# LOCAL.YAML MERGING
|
|
# =============================================================================
|
|
|
|
def _merge_local_values(data: dict, local: dict) -> dict:
|
|
"""Merge operator-local values from local.yaml into the config data.
|
|
|
|
This handles:
|
|
- identity.name/owner -> bot.name/owner
|
|
- infrastructure.* -> connection/knowledge hosts
|
|
- regions.{name}.lat/lon -> mesh_intelligence.regions[name].lat/lon
|
|
- critical_nodes -> mesh_intelligence.critical_nodes
|
|
- mesh_sources.sources.{name}.* -> mesh_sources[name].*
|
|
- env_center.* -> environmental.ducting.*
|
|
- notification_targets.* -> notifications rules
|
|
|
|
Args:
|
|
data: The loaded config data (will be modified in place)
|
|
local: The local.yaml data
|
|
|
|
Returns:
|
|
The merged data dict
|
|
"""
|
|
if not local:
|
|
return data
|
|
|
|
# Identity -> bot
|
|
identity = local.get("identity", {})
|
|
if "bot" in data:
|
|
if identity.get("name"):
|
|
data["bot"]["name"] = identity["name"]
|
|
if identity.get("owner"):
|
|
data["bot"]["owner"] = identity["owner"]
|
|
|
|
# Infrastructure hosts
|
|
infra = local.get("infrastructure", {})
|
|
if infra.get("tcp_host") and "connection" in data:
|
|
data["connection"]["tcp_host"] = infra["tcp_host"]
|
|
if "knowledge" in data:
|
|
if infra.get("qdrant_host"):
|
|
data["knowledge"]["qdrant_host"] = infra["qdrant_host"]
|
|
if infra.get("tei_host"):
|
|
data["knowledge"]["tei_host"] = infra["tei_host"]
|
|
if infra.get("sparse_host"):
|
|
data["knowledge"]["sparse_host"] = infra["sparse_host"]
|
|
|
|
# Meshmonitor URL
|
|
mesh_sources_local = local.get("mesh_sources", {})
|
|
if mesh_sources_local.get("meshmonitor_url") and "meshmonitor" in data:
|
|
data["meshmonitor"]["url"] = mesh_sources_local["meshmonitor_url"]
|
|
|
|
# Mesh sources URLs
|
|
sources_local = mesh_sources_local.get("sources", {})
|
|
if "mesh_sources" in data and isinstance(data["mesh_sources"], list):
|
|
for source in data["mesh_sources"]:
|
|
if isinstance(source, dict):
|
|
source_name = source.get("name", "")
|
|
local_source = sources_local.get(source_name, {})
|
|
if local_source.get("url"):
|
|
source["url"] = local_source["url"]
|
|
if local_source.get("host"):
|
|
source["host"] = local_source["host"]
|
|
|
|
# Region coordinates
|
|
regions_local = local.get("regions", {})
|
|
if "mesh_intelligence" in data:
|
|
mi = data["mesh_intelligence"]
|
|
if "regions" in mi and isinstance(mi["regions"], list):
|
|
for region in mi["regions"]:
|
|
if isinstance(region, dict):
|
|
region_name = region.get("name", "")
|
|
local_coords = regions_local.get(region_name, {})
|
|
if "lat" in local_coords:
|
|
region["lat"] = local_coords["lat"]
|
|
if "lon" in local_coords:
|
|
region["lon"] = local_coords["lon"]
|
|
|
|
# Critical nodes
|
|
if local.get("critical_nodes"):
|
|
mi["critical_nodes"] = local["critical_nodes"]
|
|
|
|
# Environmental center point
|
|
env_center = local.get("env_center", {})
|
|
if "environmental" in data:
|
|
env = data["environmental"]
|
|
if "ducting" in env:
|
|
if env_center.get("latitude") is not None:
|
|
env["ducting"]["latitude"] = env_center["latitude"]
|
|
if env_center.get("longitude") is not None:
|
|
env["ducting"]["longitude"] = env_center["longitude"]
|
|
|
|
# NWS user agent from contact email
|
|
if identity.get("contact_email") and "nws" in env:
|
|
email = identity["contact_email"]
|
|
env["nws"]["user_agent"] = f"(meshai, {email})"
|
|
|
|
# Notification targets
|
|
notif_targets = local.get("notification_targets", {})
|
|
if "notifications" in data and "rules" in data["notifications"]:
|
|
alert_node_ids = notif_targets.get("alert_node_ids", [])
|
|
smtp_recipients = notif_targets.get("smtp_recipients", [])
|
|
|
|
for rule in data["notifications"]["rules"]:
|
|
if isinstance(rule, dict):
|
|
# Apply default node_ids if not set
|
|
if rule.get("delivery_type") == "mesh_dm" and not rule.get("node_ids"):
|
|
rule["node_ids"] = alert_node_ids
|
|
# Apply default recipients if not set
|
|
if rule.get("delivery_type") == "email" and not rule.get("recipients"):
|
|
rule["recipients"] = smtp_recipients
|
|
# Apply smtp_from
|
|
if notif_targets.get("smtp_from") and not rule.get("from_address"):
|
|
rule["from_address"] = notif_targets["smtp_from"]
|
|
|
|
return data
|
|
|
|
|
|
# =============================================================================
|
|
# VALIDATION
|
|
# =============================================================================
|
|
|
|
def _validate_config(data: dict, local: dict, env: dict[str, str]) -> None:
|
|
"""Validate config and log warnings for missing values.
|
|
|
|
This does NOT raise errors - MeshAI starts in degraded mode with missing values.
|
|
"""
|
|
# Check regions for missing coordinates
|
|
if "mesh_intelligence" in data:
|
|
mi = data["mesh_intelligence"]
|
|
if mi.get("enabled") and "regions" in mi:
|
|
regions_local = local.get("regions", {}) if local else {}
|
|
for region in mi["regions"]:
|
|
if isinstance(region, dict):
|
|
region_name = region.get("name", "unknown")
|
|
if not region.get("lat") or not region.get("lon"):
|
|
if region_name not in regions_local:
|
|
_logger.warning(
|
|
f"Region '{region_name}' has no coordinates in local.yaml - "
|
|
"geographic features disabled for this region"
|
|
)
|
|
|
|
# Check for missing secrets
|
|
missing_secrets = []
|
|
for secret in EXPECTED_SECRETS:
|
|
if not os.environ.get(secret) and not env.get(secret):
|
|
missing_secrets.append(secret)
|
|
|
|
if missing_secrets:
|
|
_logger.warning(
|
|
f"Missing secret environment variables: {', '.join(missing_secrets)}. "
|
|
"Some features may be disabled."
|
|
)
|
|
|
|
# Check LLM API key
|
|
if "llm" in data:
|
|
api_key = data["llm"].get("api_key", "")
|
|
if not api_key or (api_key.startswith("${") and api_key.endswith("}")):
|
|
# It's a reference, check if resolved
|
|
backend = data["llm"].get("backend", "openai").lower()
|
|
key_var = {
|
|
"openai": "OPENAI_API_KEY",
|
|
"anthropic": "ANTHROPIC_API_KEY",
|
|
"google": "GOOGLE_API_KEY",
|
|
}.get(backend, "LLM_API_KEY")
|
|
if not os.environ.get(key_var) and not env.get(key_var):
|
|
_logger.warning(
|
|
f"LLM backend '{backend}' configured but {key_var} not found. "
|
|
"LLM responses will fail."
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN LOADER
|
|
# =============================================================================
|
|
|
|
def load_config(config_dir: Path = Path("/data/config")) -> Config:
|
|
"""Load configuration from multi-file layout.
|
|
|
|
This function:
|
|
1. Reads config.yaml (orchestrator) with !include directives
|
|
2. Reads local.yaml if present (operator-local values)
|
|
3. Reads /data/secrets/.env if present (secret values)
|
|
4. Interpolates ${VAR_NAME} references
|
|
5. Merges local values into config
|
|
6. Validates and logs warnings for missing values
|
|
7. Returns the same Config dataclass shape
|
|
|
|
Args:
|
|
config_dir: Path to config directory (default: /data/config)
|
|
|
|
Returns:
|
|
Config dataclass instance
|
|
"""
|
|
config_dir = Path(config_dir)
|
|
|
|
# Determine config file path
|
|
# Support both new layout (/data/config/config.yaml) and legacy (/data/config.yaml)
|
|
orchestrator_path = config_dir / "config.yaml"
|
|
legacy_path = config_dir.parent / "config.yaml" if config_dir.name == "config" else None
|
|
|
|
if not orchestrator_path.exists():
|
|
if legacy_path and legacy_path.exists():
|
|
# Fall back to legacy single-file config
|
|
_logger.info(f"Using legacy config at {legacy_path}")
|
|
from .config import load_config as legacy_load
|
|
return legacy_load(legacy_path)
|
|
else:
|
|
_logger.warning(
|
|
f"Config file not found at {orchestrator_path}. "
|
|
"Using default configuration."
|
|
)
|
|
config = Config()
|
|
config._config_path = orchestrator_path
|
|
return config
|
|
|
|
# Load orchestrator with !include support
|
|
_logger.debug(f"Loading config from {orchestrator_path}")
|
|
data = _load_yaml_with_includes(orchestrator_path)
|
|
# Hoist meshtastic.connection and meshtastic.commands to top level
|
|
# meshtastic.yaml contains both sections under wrapper keys
|
|
if "meshtastic" in data and isinstance(data["meshtastic"], dict):
|
|
meshtastic = data.pop("meshtastic")
|
|
if "connection" in meshtastic:
|
|
data["connection"] = meshtastic["connection"]
|
|
if "commands" in meshtastic:
|
|
data["commands"] = meshtastic["commands"]
|
|
|
|
# Load local.yaml
|
|
local_path = config_dir / "local.yaml"
|
|
local_data = {}
|
|
if local_path.exists():
|
|
with open(local_path, "r") as f:
|
|
local_data = yaml.safe_load(f) or {}
|
|
_logger.debug(f"Loaded local config from {local_path}")
|
|
else:
|
|
_logger.warning(
|
|
f"No local.yaml found at {local_path}. "
|
|
"MeshAI is in no-location mode - geographic features disabled."
|
|
)
|
|
|
|
# Load secrets from .env
|
|
secrets_path = config_dir.parent / "secrets" / ".env"
|
|
env_data = {}
|
|
if secrets_path.exists():
|
|
env_data = dotenv_values(secrets_path)
|
|
_logger.debug(f"Loaded {len(env_data)} secrets from {secrets_path}")
|
|
else:
|
|
# Try alternate location
|
|
alt_secrets_path = Path("/data/secrets/.env")
|
|
if alt_secrets_path.exists():
|
|
env_data = dotenv_values(alt_secrets_path)
|
|
_logger.debug(f"Loaded {len(env_data)} secrets from {alt_secrets_path}")
|
|
else:
|
|
_logger.warning(
|
|
f"No .env file found at {secrets_path}. "
|
|
"API keys must be set via environment variables."
|
|
)
|
|
|
|
# Interpolate environment variables
|
|
data = _interpolate_env_vars(data, env_data)
|
|
|
|
# Merge local values
|
|
data = _merge_local_values(data, local_data)
|
|
|
|
# Validate and warn
|
|
_validate_config(data, local_data, env_data)
|
|
|
|
# Convert to Config dataclass
|
|
config = _dict_to_dataclass(Config, data)
|
|
config._config_path = orchestrator_path
|
|
|
|
return config
|
|
|
|
|
|
# =============================================================================
|
|
# SECTION SAVER
|
|
# =============================================================================
|
|
|
|
def _is_secret_field(section: str, field_path: str) -> bool:
|
|
"""Check if a field path matches a secret field pattern."""
|
|
full_path = f"{section}.{field_path}" if field_path else section
|
|
|
|
for pattern in SECRET_FIELDS:
|
|
# Convert pattern to regex
|
|
regex = pattern.replace(".", r"\.").replace("*", r"[^.]+")
|
|
if re.match(f"^{regex}$", full_path):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _extract_local_fields(section: str, data: dict) -> tuple[dict, dict]:
|
|
"""Extract local fields from data.
|
|
|
|
Returns:
|
|
(domain_data, local_data) - data split by destination
|
|
"""
|
|
domain_data = dict(data)
|
|
local_data = {}
|
|
|
|
# Check each LOCAL_FIELDS pattern
|
|
for field_pattern, local_path in LOCAL_FIELDS.items():
|
|
if not field_pattern.startswith(f"{section}."):
|
|
continue
|
|
|
|
# Extract field name from pattern
|
|
field_name = field_pattern[len(section) + 1:] # Remove "section."
|
|
|
|
if ".*." in field_name:
|
|
# Array field pattern - handle specially
|
|
continue
|
|
|
|
if field_name in domain_data:
|
|
# Move to local_data using the local_path
|
|
value = domain_data.pop(field_name)
|
|
# Build nested structure in local_data
|
|
parts = local_path.split(".")
|
|
current = local_data
|
|
for part in parts[:-1]:
|
|
if part not in current:
|
|
current[part] = {}
|
|
current = current[part]
|
|
current[parts[-1]] = value
|
|
|
|
return domain_data, local_data
|
|
|
|
|
|
def save_section(
|
|
section_name: str,
|
|
data: dict,
|
|
config_dir: Path = Path("/data/config"),
|
|
) -> dict:
|
|
"""Save a configuration section to the appropriate file(s).
|
|
|
|
This function:
|
|
1. Determines which file(s) the section belongs to
|
|
2. Extracts local-identifying fields to local.yaml
|
|
3. Rejects attempts to save secret fields
|
|
4. Writes domain data to the appropriate file
|
|
5. Writes local data to local.yaml
|
|
|
|
Args:
|
|
section_name: Name of the section (e.g., "notifications", "llm")
|
|
data: The section data as a dict
|
|
config_dir: Path to config directory
|
|
|
|
Returns:
|
|
Dict with status: {"saved": True, "files_written": [...], "rejected_secrets": [...]}
|
|
|
|
Raises:
|
|
ValueError: If section_name is not recognized
|
|
"""
|
|
config_dir = Path(config_dir)
|
|
|
|
if section_name not in SECTION_TO_FILE:
|
|
raise ValueError(
|
|
f"Unknown section '{section_name}'. "
|
|
f"Valid sections: {', '.join(sorted(SECTION_TO_FILE.keys()))}"
|
|
)
|
|
|
|
target_file = SECTION_TO_FILE[section_name]
|
|
target_path = config_dir / target_file
|
|
local_path = config_dir / "local.yaml"
|
|
|
|
files_written = []
|
|
rejected_secrets = []
|
|
|
|
# Check for secret fields and reject them
|
|
def check_secrets(d: dict, path: str = "") -> dict:
|
|
cleaned = {}
|
|
for key, value in d.items():
|
|
field_path = f"{path}.{key}" if path else key
|
|
if _is_secret_field(section_name, field_path):
|
|
rejected_secrets.append(field_path)
|
|
_logger.error(
|
|
f"Rejected attempt to save secret field '{section_name}.{field_path}'. "
|
|
"Secret fields must be set via /data/secrets/.env"
|
|
)
|
|
elif isinstance(value, dict):
|
|
cleaned[key] = check_secrets(value, field_path)
|
|
elif isinstance(value, list):
|
|
cleaned[key] = [
|
|
check_secrets(item, f"{field_path}[{i}]")
|
|
if isinstance(item, dict) else item
|
|
for i, item in enumerate(value)
|
|
]
|
|
else:
|
|
cleaned[key] = value
|
|
return cleaned
|
|
|
|
data = check_secrets(data)
|
|
|
|
# Extract local fields
|
|
domain_data, local_updates = _extract_local_fields(section_name, data)
|
|
|
|
# Load existing target file
|
|
if target_path.exists():
|
|
with open(target_path, "r") as f:
|
|
existing = yaml.safe_load(f) or {}
|
|
else:
|
|
existing = {}
|
|
|
|
# Handle sections that share a file (meshtastic.yaml has both connection and commands)
|
|
if target_file == "meshtastic.yaml":
|
|
existing[section_name] = domain_data
|
|
elif target_file == "config.yaml":
|
|
# For orchestrator, update the section in place
|
|
existing[section_name] = domain_data
|
|
else:
|
|
# For dedicated files, the whole file IS the section
|
|
existing = domain_data
|
|
|
|
# Write domain file
|
|
with open(target_path, "w") as f:
|
|
yaml.dump(existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
files_written.append(str(target_path))
|
|
_logger.info(f"Saved {section_name} to {target_path}")
|
|
|
|
# Update local.yaml if there are local fields
|
|
if local_updates:
|
|
if local_path.exists():
|
|
with open(local_path, "r") as f:
|
|
local_existing = yaml.safe_load(f) or {}
|
|
else:
|
|
local_existing = {}
|
|
|
|
# Deep merge local_updates into local_existing
|
|
def deep_merge(base: dict, updates: dict) -> dict:
|
|
for key, value in updates.items():
|
|
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
|
|
deep_merge(base[key], value)
|
|
else:
|
|
base[key] = value
|
|
return base
|
|
|
|
deep_merge(local_existing, local_updates)
|
|
|
|
with open(local_path, "w") as f:
|
|
yaml.dump(local_existing, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
|
|
# Set restrictive permissions on local.yaml
|
|
local_path.chmod(0o600)
|
|
files_written.append(str(local_path))
|
|
_logger.info(f"Updated local values in {local_path}")
|
|
|
|
return {
|
|
"saved": True,
|
|
"files_written": files_written,
|
|
"rejected_secrets": rejected_secrets,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# UTILITY FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def get_config_dir_from_path(config_path: Path) -> Path:
|
|
"""Determine config directory from a config file path.
|
|
|
|
Args:
|
|
config_path: Path to config.yaml (could be legacy or new layout)
|
|
|
|
Returns:
|
|
Path to config directory
|
|
"""
|
|
config_path = Path(config_path)
|
|
|
|
if config_path.is_dir():
|
|
return config_path
|
|
|
|
# If pointing to config.yaml in new layout
|
|
if config_path.name == "config.yaml" and config_path.parent.name == "config":
|
|
return config_path.parent
|
|
|
|
# If pointing to legacy /data/config.yaml
|
|
if config_path.name == "config.yaml":
|
|
new_layout = config_path.parent / "config"
|
|
if new_layout.exists() and (new_layout / "config.yaml").exists():
|
|
return new_layout
|
|
|
|
return config_path.parent
|