fix(central): v0.4 C.3.1 -- preserve secret refs in save_section + deliver_policy=NEW (no backlog flood)

Fixes the two real bugs C.3 surfaced when flipping usgs_quake to central.

BUG #1 -- GUI save dropped ${VAR} secret refs (config_loader.save_section).
  before: A GUI PUT round-trips the *interpolated* secret value (GET returns the
          resolved key string, e.g. the real TomTom key). save_section's
          check_secrets saw a literal string at a SECRET_FIELDS path, didn't
          recognize it as a ref, and DROPPED it -- losing the on-disk
          ${TOMTOM_API_KEY} placeholder. C.3's flip PUT stripped TomTom's key.
  after:  check_secrets now reads the raw on-disk value (pre-interpolation) for
          each secret field and decides three ways:
            on-disk ${VAR} and new == resolved(VAR)  -> keep the ${VAR} ref
            on-disk ${VAR} and new != resolved(VAR)  -> intentional change, store it
            no on-disk ${VAR} ref                    -> reject (never write a raw
                                                        secret to a domain file)
          ${VAR} resolution mirrors load: os.environ first, then /data/secrets/.env.
          The common case (GUI re-saves unchanged config) now preserves the
          placeholder instead of dropping it.

BUG #2 -- CentralConsumer replayed the entire retained backlog on first flip.
  before: js.subscribe(...) with no config -> default deliver_policy=all. Fine
          for quake (682 msgs) but would flood the bus with ~330k traffic_flow
          messages on first flip.
  after:  consumer_config() -> ConsumerConfig(deliver_policy=DeliverPolicy.NEW):
          only messages published AFTER consumer creation. meshai won't see the
          backlog on first flip -- acceptable, Central is a live firehose for
          current events. (NOT geo-filtering -- that's a Central-side issue filed
          separately for the Central project.)

Files: meshai/config_loader.py (save_section secret preservation),
meshai/central/consumer.py (consumer_config() + deliver_policy=NEW),
tests/test_save_section_secret_preserve.py (new),
tests/test_central_consumer.py (deliver_policy assertion).

Verification:
- (A) py_compile clean on config_loader.py + consumer.py.
- (C) pytest -q: 276 passed (272 + 4 new -- preserve-unchanged-ref,
  changed-value-written, no-placeholder-still-rejects, deliver_policy=NEW).
  The C.2.1 strip test still passes (no placeholder -> reject).
- (D) In-prod (rebuilt): GET+PUT /api/config/environmental round-trip ->
  {"saved":true}; on-disk traffic.api_key stayed '${TOMTOM_API_KEY}'
  (SECRET_REF_PRESERVED: True), not the literal key; disk restored to baseline.
  consumer_config().deliver_policy == DeliverPolicy.NEW in the built image.

Follow-up for D rollout: the durable 'meshai-v04-central_quake_' created during
C.3 was made with deliver_policy=all; re-flipping a domain may need that stale
durable deleted on the Central NATS server first (config mismatch on re-subscribe).

D rollout (remaining domains) is now safe: GUI flips preserve secret refs and
new subscriptions don't replay huge backlogs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-28 04:55:20 +00:00
commit a491684861
4 changed files with 132 additions and 6 deletions

View file

@ -22,6 +22,17 @@ from meshai.notifications.events import Event, make_event
logger = logging.getLogger("meshai.central.consumer")
def consumer_config():
"""JetStream consumer config for Central subscriptions.
deliver_policy=NEW: subscribe to messages published AFTER consumer creation.
Avoids replaying the entire retained backlog on first flip (could be 330k+
msgs for high-volume streams like traffic_flow).
"""
from nats.js.api import ConsumerConfig, DeliverPolicy
return ConsumerConfig(deliver_policy=DeliverPolicy.NEW)
# meshai adapter (env-config attr) -> Central subject filters it consumes.
# Adapters with no Central equivalent (avalanche, ducting, roads511) are absent;
# setting source=central on those subscribes to nothing (logged).
@ -229,7 +240,8 @@ class CentralConsumer:
self._js = self._nc.jetstream()
for subj in subjects:
durable = self._central.durable + "-" + re.sub(r"[^a-z0-9]+", "_", subj.lower())
sub = await self._js.subscribe(subj, durable=durable, cb=self._on_message)
sub = await self._js.subscribe(
subj, durable=durable, cb=self._on_message, config=consumer_config())
self._subs.append(sub)
logger.info("CentralConsumer started; %d subjects subscribed: %s",
len(subjects), subjects)

View file

@ -573,16 +573,66 @@ def save_section(
rejected_secrets = []
# Check for secret fields and reject them
# --- secret-ref preservation (v0.4 C.3.1) -------------------------------
# A GUI save round-trips the *interpolated* value of a ${VAR} secret (the
# GET returns the resolved key string). Without this, save_section would
# drop the on-disk ${VAR} placeholder and lose the secret reference. So we
# read the raw on-disk values (pre-interpolation) and, for each secret
# field, decide:
# on-disk ${VAR} and new value == resolved(VAR) -> keep the ${VAR} ref
# on-disk ${VAR} and new value != resolved(VAR) -> intentional change, store it
# no on-disk ${VAR} ref -> reject (never write a raw
# secret to a domain file)
_raw_on_disk = {}
if target_path.exists():
try:
with open(target_path) as _rf:
_raw_on_disk = yaml.safe_load(_rf) or {}
except Exception:
_raw_on_disk = {}
if target_file in ("meshtastic.yaml", "config.yaml") and isinstance(_raw_on_disk, dict):
_raw_section = _raw_on_disk.get(section_name) or {}
else:
_raw_section = _raw_on_disk if isinstance(_raw_on_disk, dict) else {}
_secrets_path = config_dir.parent / "secrets" / ".env"
if not _secrets_path.exists():
_secrets_path = Path("/data/secrets/.env")
_env_file = dotenv_values(_secrets_path) if _secrets_path.exists() else {}
_VAR_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")
def _resolve_var(name: str):
v = os.environ.get(name)
return v if v is not None else _env_file.get(name)
def _ondisk_ref(field_path: str):
node = _raw_section
for part in field_path.split("."):
if isinstance(node, dict) and part in node:
node = node[part]
else:
return None
return node
def check_secrets(d: dict, path: str = "") -> dict:
cleaned = {}
for key, value in d.items():
field_path = f"{path}.{key}" if path else key
if _is_secret_field(section_name, field_path):
rejected_secrets.append(field_path)
_logger.error(
f"Rejected attempt to save secret field '{section_name}.{field_path}'. "
"Secret fields must be set via /data/secrets/.env"
)
ref = _ondisk_ref(field_path)
m = _VAR_RE.match(ref) if isinstance(ref, str) else None
if m:
if _resolve_var(m.group(1)) == (value if isinstance(value, str) else str(value)):
cleaned[key] = ref # unchanged secret -> preserve ${VAR} placeholder
else:
cleaned[key] = value # intentional change -> store new value
else:
rejected_secrets.append(field_path)
_logger.error(
f"Rejected attempt to save secret field '{section_name}.{field_path}'. "
"Secret fields must be set via /data/secrets/.env"
)
elif isinstance(value, dict):
cleaned[key] = check_secrets(value, field_path)
elif isinstance(value, list):

View file

@ -137,3 +137,10 @@ def test_start_no_op_when_all_native():
c, env, rec = make_consumer()
asyncio.run(c.start()) # must not raise / must not require NATS
assert c._nc is None
def test_consumer_config_uses_deliver_policy_new():
"""C.3.1: Central subscriptions use deliver_policy=NEW (no full-backlog replay)."""
from meshai.central.consumer import consumer_config
from nats.js.api import DeliverPolicy
assert consumer_config().deliver_policy == DeliverPolicy.NEW

View file

@ -0,0 +1,57 @@
"""v0.4 C.3.1: save_section preserves on-disk ${VAR} secret refs instead of
dropping them when a GUI save round-trips the interpolated value."""
import yaml
from meshai.config_loader import save_section
def _setup(tmp_path, env_yaml, dotenv):
cfg = tmp_path / "config"
cfg.mkdir()
sec = tmp_path / "secrets"
sec.mkdir()
(cfg / "env_feeds.yaml").write_text(env_yaml)
(sec / ".env").write_text(dotenv)
return cfg
def test_preserves_unchanged_secret_ref(tmp_path):
# on-disk has ${C31_TEST_KEY}; GUI submits the resolved value -> keep the ref
cfg = _setup(
tmp_path,
"enabled: true\ntraffic:\n enabled: true\n api_key: ${C31_TEST_KEY}\n",
"C31_TEST_KEY=realkey123\n",
)
res = save_section("environmental",
{"enabled": True, "traffic": {"enabled": True, "api_key": "realkey123"}},
cfg)
written = yaml.safe_load((cfg / "env_feeds.yaml").read_text())
assert written["traffic"]["api_key"] == "${C31_TEST_KEY}" # placeholder preserved
assert "traffic.api_key" not in res["rejected_secrets"]
def test_changed_secret_value_is_written(tmp_path):
# on-disk ${C31_TEST_KEY}; GUI submits a DIFFERENT value -> intentional change stored
cfg = _setup(
tmp_path,
"enabled: true\ntraffic:\n enabled: true\n api_key: ${C31_TEST_KEY}\n",
"C31_TEST_KEY=oldkey\n",
)
save_section("environmental",
{"enabled": True, "traffic": {"enabled": True, "api_key": "NEWKEY999"}},
cfg)
written = yaml.safe_load((cfg / "env_feeds.yaml").read_text())
assert written["traffic"]["api_key"] == "NEWKEY999"
def test_no_placeholder_still_rejects(tmp_path):
# no on-disk ${VAR} ref -> a raw secret must be rejected, never written
cfg = tmp_path / "config"
cfg.mkdir()
res = save_section("environmental",
{"enabled": True, "traffic": {"enabled": True, "api_key": "RAWSECRET"}},
cfg)
written = yaml.safe_load((cfg / "env_feeds.yaml").read_text())
assert "api_key" not in written.get("traffic", {})
assert "traffic.api_key" in res["rejected_secrets"]