diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index 1547bb8..ce8003d 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -22,6 +22,17 @@ from meshai.notifications.events import Event, make_event logger = logging.getLogger("meshai.central.consumer") +def consumer_config(): + """JetStream consumer config for Central subscriptions. + + deliver_policy=NEW: subscribe to messages published AFTER consumer creation. + Avoids replaying the entire retained backlog on first flip (could be 330k+ + msgs for high-volume streams like traffic_flow). + """ + from nats.js.api import ConsumerConfig, DeliverPolicy + return ConsumerConfig(deliver_policy=DeliverPolicy.NEW) + + # meshai adapter (env-config attr) -> Central subject filters it consumes. # Adapters with no Central equivalent (avalanche, ducting, roads511) are absent; # setting source=central on those subscribes to nothing (logged). @@ -229,7 +240,8 @@ class CentralConsumer: self._js = self._nc.jetstream() for subj in subjects: durable = self._central.durable + "-" + re.sub(r"[^a-z0-9]+", "_", subj.lower()) - sub = await self._js.subscribe(subj, durable=durable, cb=self._on_message) + sub = await self._js.subscribe( + subj, durable=durable, cb=self._on_message, config=consumer_config()) self._subs.append(sub) logger.info("CentralConsumer started; %d subjects subscribed: %s", len(subjects), subjects) diff --git a/meshai/config_loader.py b/meshai/config_loader.py index 0a5d019..44809ad 100644 --- a/meshai/config_loader.py +++ b/meshai/config_loader.py @@ -573,16 +573,66 @@ def save_section( rejected_secrets = [] # Check for secret fields and reject them + # --- secret-ref preservation (v0.4 C.3.1) ------------------------------- + # A GUI save round-trips the *interpolated* value of a ${VAR} secret (the + # GET returns the resolved key string). Without this, save_section would + # drop the on-disk ${VAR} placeholder and lose the secret reference. So we + # read the raw on-disk values (pre-interpolation) and, for each secret + # field, decide: + # on-disk ${VAR} and new value == resolved(VAR) -> keep the ${VAR} ref + # on-disk ${VAR} and new value != resolved(VAR) -> intentional change, store it + # no on-disk ${VAR} ref -> reject (never write a raw + # secret to a domain file) + _raw_on_disk = {} + if target_path.exists(): + try: + with open(target_path) as _rf: + _raw_on_disk = yaml.safe_load(_rf) or {} + except Exception: + _raw_on_disk = {} + if target_file in ("meshtastic.yaml", "config.yaml") and isinstance(_raw_on_disk, dict): + _raw_section = _raw_on_disk.get(section_name) or {} + else: + _raw_section = _raw_on_disk if isinstance(_raw_on_disk, dict) else {} + + _secrets_path = config_dir.parent / "secrets" / ".env" + if not _secrets_path.exists(): + _secrets_path = Path("/data/secrets/.env") + _env_file = dotenv_values(_secrets_path) if _secrets_path.exists() else {} + + _VAR_RE = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$") + + def _resolve_var(name: str): + v = os.environ.get(name) + return v if v is not None else _env_file.get(name) + + def _ondisk_ref(field_path: str): + node = _raw_section + for part in field_path.split("."): + if isinstance(node, dict) and part in node: + node = node[part] + else: + return None + return node + def check_secrets(d: dict, path: str = "") -> dict: cleaned = {} for key, value in d.items(): field_path = f"{path}.{key}" if path else key if _is_secret_field(section_name, field_path): - rejected_secrets.append(field_path) - _logger.error( - f"Rejected attempt to save secret field '{section_name}.{field_path}'. " - "Secret fields must be set via /data/secrets/.env" - ) + ref = _ondisk_ref(field_path) + m = _VAR_RE.match(ref) if isinstance(ref, str) else None + if m: + if _resolve_var(m.group(1)) == (value if isinstance(value, str) else str(value)): + cleaned[key] = ref # unchanged secret -> preserve ${VAR} placeholder + else: + cleaned[key] = value # intentional change -> store new value + else: + rejected_secrets.append(field_path) + _logger.error( + f"Rejected attempt to save secret field '{section_name}.{field_path}'. " + "Secret fields must be set via /data/secrets/.env" + ) elif isinstance(value, dict): cleaned[key] = check_secrets(value, field_path) elif isinstance(value, list): diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py index 6295553..a3c716a 100644 --- a/tests/test_central_consumer.py +++ b/tests/test_central_consumer.py @@ -137,3 +137,10 @@ def test_start_no_op_when_all_native(): c, env, rec = make_consumer() asyncio.run(c.start()) # must not raise / must not require NATS assert c._nc is None + + +def test_consumer_config_uses_deliver_policy_new(): + """C.3.1: Central subscriptions use deliver_policy=NEW (no full-backlog replay).""" + from meshai.central.consumer import consumer_config + from nats.js.api import DeliverPolicy + assert consumer_config().deliver_policy == DeliverPolicy.NEW diff --git a/tests/test_save_section_secret_preserve.py b/tests/test_save_section_secret_preserve.py new file mode 100644 index 0000000..50adfcd --- /dev/null +++ b/tests/test_save_section_secret_preserve.py @@ -0,0 +1,57 @@ +"""v0.4 C.3.1: save_section preserves on-disk ${VAR} secret refs instead of +dropping them when a GUI save round-trips the interpolated value.""" + +import yaml + +from meshai.config_loader import save_section + + +def _setup(tmp_path, env_yaml, dotenv): + cfg = tmp_path / "config" + cfg.mkdir() + sec = tmp_path / "secrets" + sec.mkdir() + (cfg / "env_feeds.yaml").write_text(env_yaml) + (sec / ".env").write_text(dotenv) + return cfg + + +def test_preserves_unchanged_secret_ref(tmp_path): + # on-disk has ${C31_TEST_KEY}; GUI submits the resolved value -> keep the ref + cfg = _setup( + tmp_path, + "enabled: true\ntraffic:\n enabled: true\n api_key: ${C31_TEST_KEY}\n", + "C31_TEST_KEY=realkey123\n", + ) + res = save_section("environmental", + {"enabled": True, "traffic": {"enabled": True, "api_key": "realkey123"}}, + cfg) + written = yaml.safe_load((cfg / "env_feeds.yaml").read_text()) + assert written["traffic"]["api_key"] == "${C31_TEST_KEY}" # placeholder preserved + assert "traffic.api_key" not in res["rejected_secrets"] + + +def test_changed_secret_value_is_written(tmp_path): + # on-disk ${C31_TEST_KEY}; GUI submits a DIFFERENT value -> intentional change stored + cfg = _setup( + tmp_path, + "enabled: true\ntraffic:\n enabled: true\n api_key: ${C31_TEST_KEY}\n", + "C31_TEST_KEY=oldkey\n", + ) + save_section("environmental", + {"enabled": True, "traffic": {"enabled": True, "api_key": "NEWKEY999"}}, + cfg) + written = yaml.safe_load((cfg / "env_feeds.yaml").read_text()) + assert written["traffic"]["api_key"] == "NEWKEY999" + + +def test_no_placeholder_still_rejects(tmp_path): + # no on-disk ${VAR} ref -> a raw secret must be rejected, never written + cfg = tmp_path / "config" + cfg.mkdir() + res = save_section("environmental", + {"enabled": True, "traffic": {"enabled": True, "api_key": "RAWSECRET"}}, + cfg) + written = yaml.safe_load((cfg / "env_feeds.yaml").read_text()) + assert "api_key" not in written.get("traffic", {}) + assert "traffic.api_key" in res["rejected_secrets"]