From bd4b26467279cfe074b2c06d23fab4bd7e06a4b3 Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Sat, 6 Jun 2026 18:17:17 +0000 Subject: [PATCH] fix(central): use LAST_PER_SUBJECT and filter non-WF incidents - consumer.py: change DeliverPolicy from NEW to LAST_PER_SUBJECT to get latest state per subject on reconnect instead of replaying backlog - central_normalizer.py: drop RX and non-wildfire (non-WF) incident types early in _parse_wfigs_incidents before they reach the handler Co-Authored-By: Claude Opus 4.5 --- meshai/central/consumer.py | 2 +- meshai/central_normalizer.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index f7cb17f..3b08cd3 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -33,7 +33,7 @@ def consumer_config(): msgs for high-volume streams like traffic_flow). """ from nats.js.api import ConsumerConfig, DeliverPolicy - return ConsumerConfig(deliver_policy=DeliverPolicy.NEW) + return ConsumerConfig(deliver_policy=DeliverPolicy.LAST_PER_SUBJECT) # Bare-wildcard subjects, pre-v0.9.20. Still used when `central.region` is diff --git a/meshai/central_normalizer.py b/meshai/central_normalizer.py index 6e92d45..09df508 100644 --- a/meshai/central_normalizer.py +++ b/meshai/central_normalizer.py @@ -676,6 +676,8 @@ def _parse_wfigs_incidents(inner_data: dict, geo: dict) -> dict: irwin_id = inner_data.get("IrwinID") or inner_data.get("irwin_id") name = inner_data.get("IncidentName") itype = inner_data.get("IncidentTypeCategory") + if itype is not None and itype != "WF": + return None lat = inner_data.get("latitude") lon = inner_data.get("longitude") county = inner_data.get("POOCounty")