From a4916848616fa35bc7bde47f84db8aba87de6147 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 28 May 2026 04:55:20 +0000 Subject: [PATCH] fix(central): v0.4 C.3.1 -- preserve secret refs in save_section + deliver_policy=NEW (no backlog flood) Fixes the two real bugs C.3 surfaced when flipping usgs_quake to central. BUG #1 -- GUI save dropped ${VAR} secret refs (config_loader.save_section). before: A GUI PUT round-trips the *interpolated* secret value (GET returns the resolved key string, e.g. the real TomTom key). save_section's check_secrets saw a literal string at a SECRET_FIELDS path, didn't recognize it as a ref, and DROPPED it -- losing the on-disk ${TOMTOM_API_KEY} placeholder. C.3's flip PUT stripped TomTom's key. after: check_secrets now reads the raw on-disk value (pre-interpolation) for each secret field and decides three ways: on-disk ${VAR} and new == resolved(VAR) -> keep the ${VAR} ref on-disk ${VAR} and new != resolved(VAR) -> intentional change, store it no on-disk ${VAR} ref -> reject (never write a raw secret to a domain file) ${VAR} resolution mirrors load: os.environ first, then /data/secrets/.env. The common case (GUI re-saves unchanged config) now preserves the placeholder instead of dropping it. BUG #2 -- CentralConsumer replayed the entire retained backlog on first flip. before: js.subscribe(...) with no config -> default deliver_policy=all. Fine for quake (682 msgs) but would flood the bus with ~330k traffic_flow messages on first flip. after: consumer_config() -> ConsumerConfig(deliver_policy=DeliverPolicy.NEW): only messages published AFTER consumer creation. meshai won't see the backlog on first flip -- acceptable, Central is a live firehose for current events. (NOT geo-filtering -- that's a Central-side issue filed separately for the Central project.) Files: meshai/config_loader.py (save_section secret preservation), meshai/central/consumer.py (consumer_config() + deliver_policy=NEW), tests/test_save_section_secret_preserve.py (new), tests/test_central_consumer.py (deliver_policy assertion). Verification: - (A) py_compile clean on config_loader.py + consumer.py. - (C) pytest -q: 276 passed (272 + 4 new -- preserve-unchanged-ref, changed-value-written, no-placeholder-still-rejects, deliver_policy=NEW). The C.2.1 strip test still passes (no placeholder -> reject). - (D) In-prod (rebuilt): GET+PUT /api/config/environmental round-trip -> {"saved":true}; on-disk traffic.api_key stayed '${TOMTOM_API_KEY}' (SECRET_REF_PRESERVED: True), not the literal key; disk restored to baseline. consumer_config().deliver_policy == DeliverPolicy.NEW in the built image. Follow-up for D rollout: the durable 'meshai-v04-central_quake_' created during C.3 was made with deliver_policy=all; re-flipping a domain may need that stale durable deleted on the Central NATS server first (config mismatch on re-subscribe). D rollout (remaining domains) is now safe: GUI flips preserve secret refs and new subscriptions don't replay huge backlogs. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/central/consumer.py | 14 ++++- meshai/config_loader.py | 60 ++++++++++++++++++++-- tests/test_central_consumer.py | 7 +++ tests/test_save_section_secret_preserve.py | 57 ++++++++++++++++++++ 4 files changed, 132 insertions(+), 6 deletions(-) create mode 100644 tests/test_save_section_secret_preserve.py diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index 1547bb8..ce8003d 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -22,6 +22,17 @@ from meshai.notifications.events import Event, make_event logger = logging.getLogger("meshai.central.consumer") +def consumer_config(): + """JetStream consumer config for Central subscriptions. + + deliver_policy=NEW: subscribe to messages published AFTER consumer creation. + Avoids replaying the entire retained backlog on first flip (could be 330k+ + msgs for high-volume streams like traffic_flow). + """ + from nats.js.api import ConsumerConfig, DeliverPolicy + return ConsumerConfig(deliver_policy=DeliverPolicy.NEW) + + # meshai adapter (env-config attr) -> Central subject filters it consumes. # Adapters with no Central equivalent (avalanche, ducting, roads511) are absent; # setting source=central on those subscribes to nothing (logged). @@ -229,7 +240,8 @@ class CentralConsumer: self._js = self._nc.jetstream() for subj in subjects: durable = self._central.durable + "-" + re.sub(r"[^a-z0-9]+", "_", subj.lower()) - sub = await self._js.subscribe(subj, durable=durable, cb=self._on_message) + sub = await self._js.subscribe( + subj, durable=durable, cb=self._on_message, config=consumer_config()) self._subs.append(sub) logger.info("CentralConsumer started; %d subjects subscribed: %s", len(subjects), subjects) diff --git a/meshai/config_loader.py b/meshai/config_loader.py index 0a5d019..44809ad 100644 --- a/meshai/config_loader.py +++ b/meshai/config_loader.py @@ -573,16 +573,66 @@ def save_section( rejected_secrets = [] # Check for secret fields and reject them + # --- secret-ref preservation (v0.4 C.3.1) ------------------------------- + # A GUI save round-trips the *interpolated* value of a ${VAR} secret (the + # GET returns the resolved key string). Without this, save_section would + # drop the on-disk ${VAR} placeholder and lose the secret reference. So we + # read the raw on-disk values (pre-interpolation) and, for each secret + # field, decide: + # on-disk ${VAR} and new value == resolved(VAR) -> keep the ${VAR} ref + # on-disk ${VAR} and new value != resolved(VAR) -> intentional change, store it + # no on-disk ${VAR} ref -> reject (never write a raw + # secret to a domain file) + _raw_on_disk = {} + if target_path.exists(): + try: + with open(target_path) as _rf: + _raw_on_disk = yaml.safe_load(_rf) or {} + except Exception: + _raw_on_disk = {} + if target_file in ("meshtastic.yaml", "config.yaml") and isinstance(_raw_on_disk, dict): + _raw_section = _raw_on_disk.get(section_name) or {} + else: + _raw_section = _raw_on_disk if isinstance(_raw_on_disk, dict) else {} + + _secrets_path = config_dir.parent / "secrets" / ".env" + if not _secrets_path.exists(): + _secrets_path = Path("/data/secrets/.env") + _env_file = dotenv_values(_secrets_path) if _secrets_path.exists() else {} + + _VAR_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$") + + def _resolve_var(name: str): + v = os.environ.get(name) + return v if v is not None else _env_file.get(name) + + def _ondisk_ref(field_path: str): + node = _raw_section + for part in field_path.split("."): + if isinstance(node, dict) and part in node: + node = node[part] + else: + return None + return node + def check_secrets(d: dict, path: str = "") -> dict: cleaned = {} for key, value in d.items(): field_path = f"{path}.{key}" if path else key if _is_secret_field(section_name, field_path): - rejected_secrets.append(field_path) - _logger.error( - f"Rejected attempt to save secret field '{section_name}.{field_path}'. " - "Secret fields must be set via /data/secrets/.env" - ) + ref = _ondisk_ref(field_path) + m = _VAR_RE.match(ref) if isinstance(ref, str) else None + if m: + if _resolve_var(m.group(1)) == (value if isinstance(value, str) else str(value)): + cleaned[key] = ref # unchanged secret -> preserve ${VAR} placeholder + else: + cleaned[key] = value # intentional change -> store new value + else: + rejected_secrets.append(field_path) + _logger.error( + f"Rejected attempt to save secret field '{section_name}.{field_path}'. " + "Secret fields must be set via /data/secrets/.env" + ) elif isinstance(value, dict): cleaned[key] = check_secrets(value, field_path) elif isinstance(value, list): diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py index 6295553..a3c716a 100644 --- a/tests/test_central_consumer.py +++ b/tests/test_central_consumer.py @@ -137,3 +137,10 @@ def test_start_no_op_when_all_native(): c, env, rec = make_consumer() asyncio.run(c.start()) # must not raise / must not require NATS assert c._nc is None + + +def test_consumer_config_uses_deliver_policy_new(): + """C.3.1: Central subscriptions use deliver_policy=NEW (no full-backlog replay).""" + from meshai.central.consumer import consumer_config + from nats.js.api import DeliverPolicy + assert consumer_config().deliver_policy == DeliverPolicy.NEW diff --git a/tests/test_save_section_secret_preserve.py b/tests/test_save_section_secret_preserve.py new file mode 100644 index 0000000..50adfcd --- /dev/null +++ b/tests/test_save_section_secret_preserve.py @@ -0,0 +1,57 @@ +"""v0.4 C.3.1: save_section preserves on-disk ${VAR} secret refs instead of +dropping them when a GUI save round-trips the interpolated value.""" + +import yaml + +from meshai.config_loader import save_section + + +def _setup(tmp_path, env_yaml, dotenv): + cfg = tmp_path / "config" + cfg.mkdir() + sec = tmp_path / "secrets" + sec.mkdir() + (cfg / "env_feeds.yaml").write_text(env_yaml) + (sec / ".env").write_text(dotenv) + return cfg + + +def test_preserves_unchanged_secret_ref(tmp_path): + # on-disk has ${C31_TEST_KEY}; GUI submits the resolved value -> keep the ref + cfg = _setup( + tmp_path, + "enabled: true\ntraffic:\n enabled: true\n api_key: ${C31_TEST_KEY}\n", + "C31_TEST_KEY=realkey123\n", + ) + res = save_section("environmental", + {"enabled": True, "traffic": {"enabled": True, "api_key": "realkey123"}}, + cfg) + written = yaml.safe_load((cfg / "env_feeds.yaml").read_text()) + assert written["traffic"]["api_key"] == "${C31_TEST_KEY}" # placeholder preserved + assert "traffic.api_key" not in res["rejected_secrets"] + + +def test_changed_secret_value_is_written(tmp_path): + # on-disk ${C31_TEST_KEY}; GUI submits a DIFFERENT value -> intentional change stored + cfg = _setup( + tmp_path, + "enabled: true\ntraffic:\n enabled: true\n api_key: ${C31_TEST_KEY}\n", + "C31_TEST_KEY=oldkey\n", + ) + save_section("environmental", + {"enabled": True, "traffic": {"enabled": True, "api_key": "NEWKEY999"}}, + cfg) + written = yaml.safe_load((cfg / "env_feeds.yaml").read_text()) + assert written["traffic"]["api_key"] == "NEWKEY999" + + +def test_no_placeholder_still_rejects(tmp_path): + # no on-disk ${VAR} ref -> a raw secret must be rejected, never written + cfg = tmp_path / "config" + cfg.mkdir() + res = save_section("environmental", + {"enabled": True, "traffic": {"enabled": True, "api_key": "RAWSECRET"}}, + cfg) + written = yaml.safe_load((cfg / "env_feeds.yaml").read_text()) + assert "api_key" not in written.get("traffic", {}) + assert "traffic.api_key" in res["rejected_secrets"]