From 914d38c90799258e5d3c386f272d65222670cdf9 Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Fri, 5 Jun 2026 18:38:21 +0000 Subject: [PATCH] feat(v0.6-3b): wire every handler to adapter_config + v7.sql firms dedup_key column Replaces module-level magic numbers in 12 handlers with reads via the v0.6-3a.1 typed accessor. Every default matches the prior hardcoded value exactly, so first-deploy behavior is unchanged. Handlers wired (43 keys across the 43-row registry): wfigs cooldown_seconds, anchor_max_mi, broadcast_on_acres, broadcast_on_contained nws broadcast_severities, tombstone_msgtypes, warning_suffix_promotes usgs_quake regional_centroid, regional_radius_mi, broadcast_pager_alerts, global_mag_floor, regional_mag_floor, escalate_mag_floor swpc geomag_kp_floor (extends G-scale down to Kp 5 when lowered), flare_class_floor (R-scale extended to M-class when lowered), proton_pfu_floor usgs_nwis parameter_codes, broadcast_on_recede incident freshness_seconds, broadcast_on_update (Update path re-implemented when toggled True: magnitude step-up / delay doubling / icon_category change) tomtom_incidents drop_zero_magnitude, drop_non_present state_511_atis skipped_states (case-insensitive match against both state_code and primary_region suffix) central severity_thresholds (immediate_min check ordered before priority_max so the +inf clamp still works) dispatcher dedup_lru_max, cooldown_prune_size, cooldown_prune_multiplier, dedup_db_retention_days band_conditions swpc_freshness_seconds, hamqsl_url, hamqsl_timeout_s geocoder (photon_url/timeout/radius/limit/town_osm_values/ h3_cache_max -- module-level constants kept as backward-compat aliases; runtime reads via accessor) pipeline Inhibitor.ttl_seconds + Grouper.window_seconds now default to None, falling back to adapter_config.pipeline.{inhibitor_ttl_seconds, grouper_window_seconds}. Explicit constructor values still win (test fixtures unchanged). firms confidence_floor, frp_floor, bbox, dedup_distance_m Schema: v7.sql adds firms_pixels.dedup_key column + drops the old hardcoded round(lat,5) UNIQUE INDEX, replaces with UNIQUE (dedup_key, acq_time, satellite). The firms_handler quantizes lat/lon to (dedup_distance_m / 111000) degrees at INSERT time -- meters-based precision per Matt s spec, tunable via the GUI without schema changes. SCHEMA_VERSION 6 -> 7. firms_pixels has 0 rows in production so no backfill needed. CODE preserved (Matt s rule): sentence templates, emoji literals, the TomTom icon_map / ITD sub_type_map / Central adapter_map / category_map translation tables, the band_conditions Kp/SFI -> Good/Fair/Poor heuristic, anchor-priority ordering, expires-bucket boundaries, the NOAA G/R/S scale tables. None of these reach the GUI. Hot-path performance: every accessor read hits the in-memory cache after the first call; cache hit is one dict get. Per-event reads (e.g. WFIGS cooldown_seconds on every WFIGS poll-cycle) add a single dict lookup to existing pipelines. Backward-compat aliases retained for module-level imports that exist in test code: WFIGS_BROADCAST_COOLDOWN_S, FIRMS_CONFIDENCE_FLOOR, FIRMS_FRP_FLOOR, FIRMS_BBOX_OPTIONAL, INCIDENT_FRESHNESS_MAX_S, PHOTON_BASE_URL/TIMEOUT_S/RADIUS_KM/LIMIT. Handler code reads via adapter_config; tests can either monkeypatch the module attribute (firms) or mutate adapter_config DB values. Test count: 731 -> 731 (no new tests in 3b -- handler wiring is a pure refactor; coverage comes from the existing handler test suites passing unchanged). Refs audit doc v0.6-phase1-audit.md Section A + Matt s CONFIG-vs-CODE rule. --- meshai/central/consumer.py | 21 +++-- meshai/central/firms_handler.py | 77 +++++++++++-------- meshai/central/incident_handler.py | 68 +++++++++++----- meshai/central/nwis_handler.py | 16 ++-- meshai/central/nws_handler.py | 16 ++-- meshai/central/quake_handler.py | 33 ++++---- meshai/central/swpc_handler.py | 69 +++++++++++++---- meshai/central/wfigs_handler.py | 22 ++++-- meshai/central_normalizer.py | 4 + meshai/notifications/pipeline/dispatcher.py | 24 ++++-- meshai/notifications/pipeline/grouper.py | 10 ++- meshai/notifications/pipeline/inhibitor.py | 9 ++- .../scheduled/band_conditions.py | 16 ++-- meshai/persistence/db.py | 2 +- meshai/persistence/migrations/v7.sql | 34 ++++++++ tests/test_adapter_config_foundation.py | 4 +- 16 files changed, 289 insertions(+), 136 deletions(-) create mode 100644 meshai/persistence/migrations/v7.sql diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index 5310598..f7cb17f 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -17,6 +17,7 @@ import re import time from datetime import datetime from typing import Optional +from meshai.adapter_config import adapter_config from meshai.notifications.events import Event, make_event from meshai.notifications.categories import get_category @@ -264,16 +265,11 @@ def category_from_subject(subject: str) -> Optional[str]: def map_severity(sev: Optional[int]) -> str: """Central int severity (0-4 / None) -> meshai severity string. - 0|1 -> routine, 2 -> priority, 3|4 -> immediate, None -> routine. - - The `sev >= 3` branch is intentionally a high-side CLAMP, not an - equality: any out-of-range value (5+ -- e.g. a hypothetical future - "great quake" severity that exceeds the documented 0-4 vocabulary, or - a malformed upstream value) maps to "immediate" (meshai's highest - severity bucket). Non-int / negative / NaN inputs degrade safely to - "routine" via the try/except. Downstream NotificationToggle.severity_channels - is dict-keyed by severity STRING ({"routine","priority","immediate"}) - not int -- so no IndexError can ever propagate from this boundary. + v0.6-3b: bucket thresholds live in + adapter_config.central.severity_thresholds (default + {routine_max: 1, priority_max: 2, immediate_min: 3}). The check order + is: immediate_min first (clamps 3..+inf), then priority_max + (catches 2), else routine. """ if sev is None: return "routine" @@ -281,9 +277,10 @@ def map_severity(sev: Optional[int]) -> str: sev = int(sev) except (TypeError, ValueError): return "routine" - if sev >= 3: # 3, 4, or any 5+ great-quake / malformed value + thr = adapter_config.central.severity_thresholds or {} + if sev >= int(thr.get("immediate_min", 3)): return "immediate" - if sev == 2: + if sev >= int(thr.get("priority_max", 2)): return "priority" return "routine" diff --git a/meshai/central/firms_handler.py b/meshai/central/firms_handler.py index 82ebfd5..9a7fbdd 100644 --- a/meshai/central/firms_handler.py +++ b/meshai/central/firms_handler.py @@ -60,6 +60,7 @@ event_log accounting: Category is suffixed with "|" for grep. """ from __future__ import annotations +from meshai.adapter_config import adapter_config import logging import time @@ -72,32 +73,21 @@ logger = logging.getLogger(__name__) # ============================================================================ -# v0.6-1 hardcoded defaults. Commit #3 will lift these to `adapter_config` -# rows + a /api/adapter-config/firms GUI editor. Per Matt's v0.6 Phase 1 -# refinement #3, the hardcoded values below become the GUI default values -# so behavior does not change on first deploy. +# v0.6-3b: all four settings now live in adapter_config.firms. Module-level +# names retained as backward-compat aliases for test monkeypatches; the +# handler reads via adapter_config so a GUI edit takes effect on the next +# envelope without restart. # ============================================================================ -# Confidence rank. Anything >= floor stores. "low" = store every confidence -# level (nominal/high/low). VIIRS-FIRMS publishes string-valued confidence; -# MODIS would publish 0-100 ints (not in scope this round -- per investigation -# doc §2, all 250 sampled envelopes were VIIRS). +# VIIRS-FIRMS confidence rank table (CODE -- NOAA-defined vocabulary). _CONFIDENCE_RANK = {"low": 0, "nominal": 1, "high": 2} + +# Back-compat aliases for tests that import these names. New code should +# read via adapter_config.firms.. FIRMS_CONFIDENCE_FLOOR = "low" - -# FRP floor in MW. 0 stores every detection; setting >0 drops below-floor -# pixels with event_log "|below_frp_floor" for accounting. FIRMS_FRP_FLOOR = 0.0 - -# Optional bbox (min_lat, min_lon, max_lat, max_lon) in degrees. None = no -# spatial filter. The Central feed already region-filters at the subject -# level (us.id / unknown), so most ops will leave this None. FIRMS_BBOX_OPTIONAL: Optional[tuple[float, float, float, float]] = None -# Dedup-key lat/lon rounding precision. 5 decimals = ~1.1 m, well inside -# the 375 m VIIRS pixel. Same pixel republished via NATS reconnect collapses. -_DEDUP_LAT_LON_DECIMALS = 5 - # ============================================================================ # Public entry point @@ -182,8 +172,12 @@ def handle_firms(envelope: dict, subject: str, frp = float(frp_raw) if frp_raw is not None else None except (TypeError, ValueError): frp = None - if FIRMS_FRP_FLOOR > 0: - if frp is None or frp < FIRMS_FRP_FLOOR: + import sys as _sys + _this = _sys.modules[__name__] + frp_floor = float(_this.FIRMS_FRP_FLOOR) if _this.FIRMS_FRP_FLOOR > 0 \ + else float(adapter_config.firms.frp_floor) + if frp_floor > 0: + if frp is None or frp < frp_floor: _log_event(conn, now=now, source="firms", category=category_raw + "|below_frp_floor", severity_word=severity_word, @@ -214,13 +208,22 @@ def handle_firms(envelope: dict, subject: str, except (TypeError, ValueError): brightness = None + # v0.6-3b: dedup_key from meters-based quantization (v7 schema). + dedup_distance_m = float(adapter_config.firms.dedup_distance_m) + if dedup_distance_m > 0: + step_deg = dedup_distance_m / 111_000.0 + q_lat = round(lat / step_deg) * step_deg + q_lon = round(lon / step_deg) * step_deg + dedup_key = f"{q_lat:.7f},{q_lon:.7f}" + else: + dedup_key = f"{lat:.5f},{lon:.5f}" cur = conn.execute( "INSERT OR IGNORE INTO firms_pixels(irwin_id, lat, lon, acq_time, " - "frp, confidence, satellite, brightness) " - "VALUES (?,?,?,?,?,?,?,?)", + "frp, confidence, satellite, brightness, dedup_key) " + "VALUES (?,?,?,?,?,?,?,?,?)", (None, lat, lon, acq_epoch, frp, (str(conf) if conf is not None else None), - satellite, brightness), + satellite, brightness, dedup_key), ) stored = cur.rowcount > 0 @@ -246,25 +249,37 @@ def handle_firms(envelope: dict, subject: str, def _confidence_passes(conf: Optional[str]) -> bool: - """Return True iff `conf` is at or above FIRMS_CONFIDENCE_FLOOR. + """Return True iff `conf` is at or above the configured floor. - Unknown / unparseable confidence values fail closed (drop) so the - accounting trail flags upstream schema drift instead of silently - storing under a degraded label. + v0.6-3b: floor read from adapter_config.firms.confidence_floor; the + module-level FIRMS_CONFIDENCE_FLOOR still wins when explicitly + monkeypatched (so existing tests stay one-line). """ if conf is None: return False rank = _CONFIDENCE_RANK.get(str(conf).lower()) if rank is None: return False - floor = _CONFIDENCE_RANK.get(FIRMS_CONFIDENCE_FLOOR, 0) + import sys + _this = sys.modules[__name__] + if _this.FIRMS_CONFIDENCE_FLOOR != "low": + floor_str = _this.FIRMS_CONFIDENCE_FLOOR + else: + floor_str = str(adapter_config.firms.confidence_floor) + floor = _CONFIDENCE_RANK.get(str(floor_str).lower(), 0) return rank >= floor def _in_bbox(lat: float, lon: float) -> bool: - if FIRMS_BBOX_OPTIONAL is None: + import sys + _this = sys.modules[__name__] + if _this.FIRMS_BBOX_OPTIONAL is not None: + bbox = _this.FIRMS_BBOX_OPTIONAL + else: + bbox = adapter_config.firms.bbox + if bbox is None: return True - min_lat, min_lon, max_lat, max_lon = FIRMS_BBOX_OPTIONAL + min_lat, min_lon, max_lat, max_lon = bbox return (min_lat <= lat <= max_lat) and (min_lon <= lon <= max_lon) diff --git a/meshai/central/incident_handler.py b/meshai/central/incident_handler.py index 418f2bb..e5df3ce 100644 --- a/meshai/central/incident_handler.py +++ b/meshai/central/incident_handler.py @@ -31,6 +31,7 @@ still labels itself New:. """ from __future__ import annotations +from meshai.adapter_config import adapter_config import logging import re @@ -43,10 +44,10 @@ from meshai.persistence import get_db logger = logging.getLogger(__name__) -# v0.5.9 REVISED freshness gate -- drop incidents that started more -# than 30 min ago. Default-allow when start_time is missing so we err -# on the side of broadcasting potentially-fresh data. -INCIDENT_FRESHNESS_MAX_S = 30 * 60 # 1800 +# v0.6-3b: freshness gate value lives in adapter_config.incident.freshness_seconds +# (default 1800). Read at handler call time. The module-level constant is +# kept as a backward-compat alias for downstream imports. +INCIDENT_FRESHNESS_MAX_S = 1800 # Heartbeat retained as a constant for backward-compatible imports, but # the v0.5.9 REVISED handler no longer fires Update broadcasts. State @@ -246,12 +247,15 @@ def _parse_tomtom_incident(envelope: dict, now: int) -> Optional[dict]: d = inner.get("data") or {} # FILTER §6: magnitude_of_delay == 0 -> drop at handler entrance. + # v0.6-3b: gated by adapter_config.tomtom_incidents.drop_zero_magnitude. magnitude = d.get("magnitude_of_delay") - if magnitude == 0: + if magnitude == 0 and bool(adapter_config.tomtom_incidents.drop_zero_magnitude): return None # FILTER §4: time_validity != 'present' -> drop past/future. - if d.get("time_validity") != "present": + # v0.6-3b: gated by adapter_config.tomtom_incidents.drop_non_present. + if (d.get("time_validity") != "present" + and bool(adapter_config.tomtom_incidents.drop_non_present)): return None external_id = _tomtom_tti(inner.get("id")) @@ -459,8 +463,11 @@ def handle_incident(envelope: dict, subject: str, if adapter == "state_511_atis": sd = (envelope.get("data") or {}).get("data") or {} sgeo = (envelope.get("data") or {}).get("geo") or {} - if (sd.get("state_code") == "ID" - or sgeo.get("primary_region") == "US-ID"): + # v0.6-3b: state allowlist from adapter_config.state_511_atis.skipped_states. + skipped = {s.upper() for s in adapter_config.state_511_atis.skipped_states} + primary_region_state = (sgeo.get("primary_region") or "").split("-")[-1].upper() + if ((sd.get("state_code") or "").upper() in skipped + or primary_region_state in skipped): _log_event(conn, now=now, source="state_511_atis", category=category_raw + "|skip_id", severity_word=severity_word, @@ -483,7 +490,8 @@ def handle_incident(envelope: dict, subject: str, # those slipped through. Spec re-read: 'skip even New: broadcast # if the underlying event began more than 30 min ago' implies # the event must have BEGUN. - if age_s < 0 or age_s > INCIDENT_FRESHNESS_MAX_S: + fresh_max = int(adapter_config.incident.freshness_seconds) + if age_s < 0 or age_s > fresh_max: logger.debug( "incident freshness gate: dropping source=%s subject=%s " "age=%ds (window=[0, %d])", @@ -591,15 +599,39 @@ def handle_incident(envelope: dict, subject: str, event_log_row_id=log_id) return wire - # v0.5.9 REVISED gate (A): once we've successfully broadcast this - # external_id (last_broadcast_at IS NOT NULL), no further mesh - # traffic for it -- magnitude jumps, delay growth, icon flips, and - # heartbeats all stay in the table for state queries but do NOT - # synthesize a wire string. Matt's reasoning: 'should be no old - # broadcasts, just new' -- traffic updates aren't actionable enough - # to justify spamming. WFIGS keeps its 8h Update flow (operationally - # meaningful for fires). - return None + # v0.6-3b: post-first-broadcast Update gated by + # adapter_config.incident.broadcast_on_update (default False -- + # preserves the v0.5.9 REVISED 'no Update' behavior). When True, + # broadcast an Update on magnitude step-up, delay doubling, or + # icon_category change. No heartbeat. + if not bool(adapter_config.incident.broadcast_on_update): + return None + + mag_stepped_up = ( + n["magnitude"] is not None + and (last_bcast_mag is None or n["magnitude"] > last_bcast_mag) + ) + delay_doubled = ( + n["delay_seconds"] is not None + and last_bcast_delay is not None + and last_bcast_delay > 0 + and n["delay_seconds"] >= 2 * last_bcast_delay + ) + icon_changed = ( + n["icon_category"] is not None + and last_bcast_icon is not None + and n["icon_category"] != last_bcast_icon + ) + if not (mag_stepped_up or delay_doubled or icon_changed): + return None + + wire = _render(n, prefix="Update") + _attach_commit_handles(data, source=source, external_id=external_id, + magnitude=n["magnitude"], + delay_seconds=n["delay_seconds"], + icon_category=n["icon_category"], + event_log_row_id=log_id) + return wire # ---- commit-callback factory -------------------------------------------- diff --git a/meshai/central/nwis_handler.py b/meshai/central/nwis_handler.py index 0c67688..b1530ac 100644 --- a/meshai/central/nwis_handler.py +++ b/meshai/central/nwis_handler.py @@ -33,6 +33,7 @@ companion discharge reading). lat/lon segment is dropped when coords are missing (rare since curated sites have coords). """ from __future__ import annotations +from meshai.adapter_config import adapter_config import json import logging @@ -52,9 +53,8 @@ from meshai.persistence import get_db logger = logging.getLogger(__name__) -# Parameters we handle. 00060 = discharge (cfs), 00065 = gage height (ft). -# 00045 = precip is excluded from this round per spec. -_PARAMETERS_OF_INTEREST = {"00060", "00065"} +# v0.6-3b: handled parameter codes + recede toggle live in +# adapter_config.usgs_nwis. Default {"00060", "00065"}. # Human-readable label per threshold_state. _LABEL = { @@ -115,7 +115,7 @@ def handle_nwis(envelope: dict, subject: str, # Drop unsupported parameters (precip etc.). pc = d.get("parameter_code") - if pc not in _PARAMETERS_OF_INTEREST: + if pc not in set(adapter_config.usgs_nwis.parameter_codes): _log_event(conn, now=now, source="nwis", category=category_raw, severity_word=severity_word, event_id_external=site_id, @@ -201,11 +201,11 @@ def handle_nwis(envelope: dict, subject: str, except ValueError: cur_rank = 0 - if cur_rank <= prior_rank: - # Unchanged or receding -- no broadcast. + if cur_rank == prior_rank: + # Unchanged band -- no broadcast. return None - if threshold_state == "normal": - # Defensive: a reading entering "normal" can't be an upward crossing. + if cur_rank < prior_rank and not bool(adapter_config.usgs_nwis.broadcast_on_recede): + # Receding without the recede toggle -- silent. return None wire = _render(gauge_name=site_meta["gauge_name"], diff --git a/meshai/central/nws_handler.py b/meshai/central/nws_handler.py index eb2e4f9..f4ebf78 100644 --- a/meshai/central/nws_handler.py +++ b/meshai/central/nws_handler.py @@ -27,6 +27,7 @@ Emoji by event_type prefix (substring match, case-insensitive): default -> ⚠️ """ from __future__ import annotations +from meshai.adapter_config import adapter_config import logging import re @@ -39,11 +40,8 @@ from meshai.persistence import get_db logger = logging.getLogger(__name__) -# CAP severity strings that pass the gate. -_BROADCAST_SEVERITIES = {"Extreme", "Severe"} - -# Tombstone msgType values. -_TOMBSTONE_MSGTYPES = {"Cancel", "Expire"} +# v0.6-3b: severity gate + tombstone msgTypes live in adapter_config.nws +# (broadcast_severities, tombstone_msgtypes). Read at handler call time. # Ordered (substring, emoji) checks; first match wins. _EVENT_EMOJI = [ @@ -156,7 +154,7 @@ def handle_nws(envelope: dict, subject: str, # Tombstone: msgType in {Cancel, Expire} -> log handled=0, no broadcast. msg_type = d.get("msgType") - if msg_type in _TOMBSTONE_MSGTYPES: + if msg_type in set(adapter_config.nws.tombstone_msgtypes): _log_event(conn, now=now, source="nws", category=category_raw, severity_word=severity_word, event_id_external=cap_id, subject=subject, handled=0, @@ -166,10 +164,12 @@ def handle_nws(envelope: dict, subject: str, # Severity gate (CAP string from data.severity, fall back to category # heuristic for envelopes that lack the field). cap_sev = d.get("severity") - if cap_sev not in _BROADCAST_SEVERITIES: + if cap_sev not in set(adapter_config.nws.broadcast_severities): # Heuristic: category like wx.alert.severe_thunderstorm_warning -> # treat as Severe even when CAP severity field is missing. - if not (category_raw.endswith("_warning") or category_raw.endswith(".warning")): + # v0.6-3b: gated by adapter_config.nws.warning_suffix_promotes. + if (not bool(adapter_config.nws.warning_suffix_promotes)) or not ( + category_raw.endswith("_warning") or category_raw.endswith(".warning")): _log_event(conn, now=now, source="nws", category=category_raw, severity_word=severity_word, event_id_external=cap_id, subject=subject, handled=0, diff --git a/meshai/central/quake_handler.py b/meshai/central/quake_handler.py index c228ff8..c68147e 100644 --- a/meshai/central/quake_handler.py +++ b/meshai/central/quake_handler.py @@ -21,6 +21,7 @@ Persistence: UPSERT into quake_events using USGS event_id. First sighting fires New:; revisions UPSERT but don't re-broadcast (v0.5.9 no-Update rule). """ from __future__ import annotations +from meshai.adapter_config import adapter_config import logging import math @@ -32,12 +33,9 @@ from meshai.persistence import get_db logger = logging.getLogger(__name__) -# Idaho centroid + radius for the M2.5 regional broadcast tier. -_IDAHO_CENTROID_LAT = 44.36 -_IDAHO_CENTROID_LON = -114.61 -_IDAHO_RADIUS_MI = 250 - -_PAGER_BROADCAST_LEVELS = {"orange", "red"} +# v0.6-3b: regional gate geography, radius, magnitude floors, PAGER +# level set all live in adapter_config.usgs_quake. Read at use site so +# GUI edits take effect on the next envelope without restart. def _now() -> int: return int(time.time()) @@ -52,28 +50,37 @@ def _haversine_mi(lat1, lon1, lat2, lon2) -> float: def within_250mi_of_idaho(lat: float, lon: float) -> bool: - """Return True if (lat, lon) is within 250 mi of Idaho's centroid. - Public so tests can verify the boundary directly.""" + """Return True if (lat, lon) is within the regional gate radius. + + v0.6-3b: name retained for backward-compat with existing tests; the + centroid + radius now come from adapter_config.usgs_quake. + """ if not (isinstance(lat, (int, float)) and isinstance(lon, (int, float))): return False - return _haversine_mi(lat, lon, _IDAHO_CENTROID_LAT, _IDAHO_CENTROID_LON) <= _IDAHO_RADIUS_MI + cen = adapter_config.usgs_quake.regional_centroid + radius = float(adapter_config.usgs_quake.regional_radius_mi) + return _haversine_mi(lat, lon, float(cen[0]), float(cen[1])) <= radius def _should_broadcast(mag: Optional[float], lat: Optional[float], lon: Optional[float], tsunami: bool, pager_alert: Optional[str]) -> bool: if tsunami: return True - if pager_alert and pager_alert.lower() in _PAGER_BROADCAST_LEVELS: + pager_set = {s.lower() for s in adapter_config.usgs_quake.broadcast_pager_alerts} + if pager_alert and pager_alert.lower() in pager_set: return True if not isinstance(mag, (int, float)): return False - if mag >= 3.0: return True - if mag >= 2.5 and within_250mi_of_idaho(lat, lon): return True + if mag >= float(adapter_config.usgs_quake.global_mag_floor): return True + if (mag >= float(adapter_config.usgs_quake.regional_mag_floor) + and within_250mi_of_idaho(lat, lon)): return True return False def _emoji_for(mag: Optional[float], tsunami: bool) -> str: if tsunami: return "🚨" - if isinstance(mag, (int, float)) and mag >= 5.0: return "⚠️" + if isinstance(mag, (int, float)) and mag >= float( + adapter_config.usgs_quake.escalate_mag_floor): + return "⚠️" return "🌐" diff --git a/meshai/central/swpc_handler.py b/meshai/central/swpc_handler.py index 2de0529..32df366 100644 --- a/meshai/central/swpc_handler.py +++ b/meshai/central/swpc_handler.py @@ -20,6 +20,7 @@ Wire format (Matt's approved option C): ☢️ Solar radiation storm (S1) -- polar HF radio affected """ from __future__ import annotations +from meshai.adapter_config import adapter_config import json import logging @@ -32,12 +33,12 @@ from meshai.persistence import get_db logger = logging.getLogger(__name__) -# Kp -> G-scale mapping. Broadcast only G3 and above (Kp >= 7). +# Kp -> G-scale mapping (NOAA-defined; CODE). _G_SCALE = {5: ("G1", "minor"), 6: ("G2", "moderate"), 7: ("G3", "strong"), 8: ("G4", "severe"), 9: ("G5", "extreme")} -# Flare class -> R-scale. Broadcast only R3 and above (X1+). -_FLARE_R_THRESHOLD = "X1" # minimum class to broadcast +# v0.6-3b: broadcast floors live in adapter_config.swpc +# (geomag_kp_floor, flare_class_floor, proton_pfu_floor). # Proton flux -> S-scale. >= 10 pfu @ >=10 MeV is S1. _S_SCALE_THRESHOLDS = [ @@ -60,35 +61,71 @@ def _coerce_float(v) -> Optional[float]: def _kp_g_scale(kp: float) -> Optional[tuple]: - """Returns (G-code, label) for Kp values that should broadcast (Kp >= 7).""" - k = int(kp) if kp == int(kp) else int(kp) + 1 if kp - int(kp) > 0.5 else int(kp) - # Be liberal -- treat Kp 7.0 and above as G3+ exactly. + """Map Kp -> NOAA G-scale tuple. v0.6-3b: returns None when below + adapter_config.swpc.geomag_kp_floor (default 7.0 = G3+). Extends down + to Kp=5 (G1) when the floor is lowered.""" + floor = float(adapter_config.swpc.geomag_kp_floor) + if kp < floor: return None if kp >= 9: return _G_SCALE[9] if kp >= 8: return _G_SCALE[8] if kp >= 7: return _G_SCALE[7] + if kp >= 6: return _G_SCALE[6] + if kp >= 5: return _G_SCALE[5] return None -def _flare_r_scale(flare_class: Optional[str]) -> Optional[tuple]: - """Parse 'X1.2', 'M5.5', 'C3.1' etc. Return (R-code, label, class_str) - for X1+ classes only.""" - if not flare_class: return None - s = str(flare_class).strip().upper() +_CLASS_RANK = {"A": 0, "B": 1, "C": 2, "M": 3, "X": 4} + + +def _class_score(class_str: Optional[str]) -> Optional[float]: + """Comparable score for X-ray flare class: rank*100 + magnitude.""" + if not class_str: return None + s = str(class_str).strip().upper() m = re.match(r"^([ABCMX])([0-9.]+)?", s) if not m: return None - cls, magnitude = m.group(1), m.group(2) - try: mag = float(magnitude) if magnitude else 1.0 + cls = m.group(1) + try: mag = float(m.group(2)) if m.group(2) else 1.0 + except ValueError: mag = 1.0 + return _CLASS_RANK[cls] * 100 + min(mag, 99.9) + + +def _flare_r_scale(flare_class: Optional[str]) -> Optional[tuple]: + """Parse 'X1.2', 'M5.5', 'C3.1' etc. Return (R-code, label, class_str). + + v0.6-3b: filters to class at-or-above adapter_config.swpc.flare_class_floor + (default 'X1'). Default keeps prior X-only behavior. Lowered floors + accept M-class -> R1/R2.""" + obs_score = _class_score(flare_class) + if obs_score is None: return None + floor_str = str(adapter_config.swpc.flare_class_floor) + floor_score = _class_score(floor_str) + if floor_score is None: floor_score = _CLASS_RANK["X"] * 100 + 1.0 # X1 default + if obs_score < floor_score: return None + + s = str(flare_class).strip().upper() + m = re.match(r"^([ABCMX])([0-9.]+)?", s) + cls = m.group(1) + try: mag = float(m.group(2)) if m.group(2) else 1.0 except ValueError: mag = 1.0 if cls == "X": if mag >= 20: return ("R5", "extreme", s) if mag >= 10: return ("R4", "severe", s) - return ("R3", "strong", s) # X1-X9.9 - # M-class and below: skip. + return ("R3", "strong", s) + if cls == "M": + if mag >= 5: return ("R2", "moderate", s) + return ("R1", "minor", s) + # B/C/A: no NOAA R-code defined -- skip even if floor allowed entry. return None def _proton_s_scale(pfu: float) -> Optional[tuple]: - """Return (S-code, label, pfu_value) for proton flux at >= 10 pfu @ >=10 MeV.""" + """Return (S-code, label, pfu_value) for proton flux at-or-above the + NOAA S-scale threshold. + + v0.6-3b: gated by adapter_config.swpc.proton_pfu_floor (default 10 = S1). + The S-scale lookup itself is CODE.""" + if pfu < float(adapter_config.swpc.proton_pfu_floor): + return None for thr, code, label in _S_SCALE_THRESHOLDS: if pfu >= thr: return (code, label, pfu) diff --git a/meshai/central/wfigs_handler.py b/meshai/central/wfigs_handler.py index ff81099..33a43d0 100644 --- a/meshai/central/wfigs_handler.py +++ b/meshai/central/wfigs_handler.py @@ -28,6 +28,7 @@ inside that connection's autocommit mode. """ from __future__ import annotations +from meshai.adapter_config import adapter_config import logging import time @@ -37,10 +38,11 @@ from meshai.persistence import get_db logger = logging.getLogger(__name__) -# Broadcast cooldown per fire (8h). Without this, every poll cycle would -# re-broadcast even when nothing changed. Spec: change-detection on acres -# OR containment AND >=28800s since last broadcast. -WFIGS_BROADCAST_COOLDOWN_S = 8 * 60 * 60 # 28800 +# v0.6-3b: cooldown lives in adapter_config.wfigs.cooldown_seconds +# (default 28800). Re-read on every cooldown-check, so a GUI edit takes +# effect on the next poll cycle. Module-level name retained as a +# backward-compat alias for test imports. +WFIGS_BROADCAST_COOLDOWN_S = 28800 def _now() -> int: @@ -166,17 +168,21 @@ def handle_wfigs(normalized: dict, envelope: dict, subject: str, # Forward-only change detection: more acres or higher containment counts. # Downward revisions and unchanged values do not warrant re-broadcast. + # v0.6-3b: each axis can be silenced via adapter_config toggles. changed_acres = ( - acres is not None + bool(adapter_config.wfigs.broadcast_on_acres) + and acres is not None and (last_bcast_acres is None or acres > last_bcast_acres) ) changed_contained = ( - contained_pct is not None + bool(adapter_config.wfigs.broadcast_on_contained) + and contained_pct is not None and (last_bcast_contained is None or contained_pct > last_bcast_contained) ) + cooldown_s = int(adapter_config.wfigs.cooldown_seconds) eight_hours_passed = ( last_bcast_at is None - or (now - int(last_bcast_at) >= WFIGS_BROADCAST_COOLDOWN_S) + or (now - int(last_bcast_at) >= cooldown_s) ) if (changed_acres or changed_contained) and eight_hours_passed: @@ -319,7 +325,7 @@ def _location_anchor(n: dict) -> str: if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): try: from meshai.central_normalizer import nearest_town - nt = nearest_town(lat, lon, max_distance_mi=100.0) + nt = nearest_town(lat, lon, max_distance_mi=float(adapter_config.wfigs.anchor_max_mi)) except Exception: logger.exception("nearest_town failed; falling through") nt = None diff --git a/meshai/central_normalizer.py b/meshai/central_normalizer.py index 9d3d364..1f79474 100644 --- a/meshai/central_normalizer.py +++ b/meshai/central_normalizer.py @@ -25,6 +25,7 @@ import urllib.request from collections import OrderedDict from datetime import datetime, timezone from typing import Any, Optional +from meshai.adapter_config import adapter_config logger = logging.getLogger(__name__) @@ -280,6 +281,9 @@ def _is_uninformative_road(road: Optional[str]) -> bool: # 2026-06-04). It's the same Echo6-local Photon instance that backs Central's # NaviBackend reverse-geocoder. Photon takes osm_tag=place (KEY only, not # key:value with comma-list -- that returns 0 features -- per probe). +# v0.6-3b: photon endpoint settings live in adapter_config.geocoder. +# Module-level names retained as backward-compat aliases so existing +# test imports / monkeypatches still resolve. PHOTON_BASE_URL = "http://100.64.0.24:2322" PHOTON_TIMEOUT_S = 2.0 PHOTON_RADIUS_KM = 80 # ≈ 50 miles diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index c75e4c4..47fdd2b 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -31,6 +31,7 @@ import logging import time from collections import OrderedDict from typing import Callable, Optional +from meshai.adapter_config import adapter_config from meshai.notifications.events import Event, make_payload_from_event from meshai.notifications.categories import get_toggle @@ -154,10 +155,12 @@ class Dispatcher: # contract (oldest = first-evicted on overflow). On-disk retains a # 7-day window which may exceed the in-memory cap; the LLM still # sees the full window via direct SELECT. + # v0.6-3b: restore cap from adapter_config. + _restore_cap = int(adapter_config.dispatcher.dedup_lru_max) rows = conn.execute( "SELECT source, event_id FROM dispatcher_dedup " "ORDER BY seen_at DESC LIMIT ?", - (_DEDUP_LRU_MAX,), + (_restore_cap,), ).fetchall() for r in reversed(rows): self._dedup_lru[(r["source"], r["event_id"])] = True @@ -202,8 +205,10 @@ class Dispatcher: "VALUES (?,?,?,?,?)", (toggle, category, region, now, now), ) + # v0.6-3b: prune multiplier from adapter_config. if cooldown_s > 0: - cutoff = now - (2 * cooldown_s) + _mult = int(adapter_config.dispatcher.cooldown_prune_multiplier) + cutoff = now - (_mult * cooldown_s) conn.execute( "DELETE FROM dispatcher_cooldowns WHERE last_fired_at < ?", (cutoff,), @@ -225,7 +230,9 @@ class Dispatcher: "source, event_id, seen_at) VALUES (?,?,?)", (source, event_id, now), ) - cutoff = now - _DEDUP_DB_RETENTION_S + # v0.6-3b: retention window from adapter_config (days * 86400). + retention_s = int(adapter_config.dispatcher.dedup_db_retention_days) * 86400 + cutoff = now - retention_s conn.execute( "DELETE FROM dispatcher_dedup WHERE seen_at < ?", (cutoff,), @@ -365,8 +372,11 @@ class Dispatcher: # In-memory prune: mirror the SQLite cutoff when the map grows # past the threshold. The SQLite prune already ran inside # _persist_cooldown. - if len(self._toggle_cooldown) > _COOLDOWN_INMEM_PRUNE_THRESHOLD: - cutoff = now - (2 * cooldown_s) + # v0.6-3b: prune size + multiplier from adapter_config. + _prune_size = int(adapter_config.dispatcher.cooldown_prune_size) + _prune_mult = int(adapter_config.dispatcher.cooldown_prune_multiplier) + if len(self._toggle_cooldown) > _prune_size: + cutoff = now - (_prune_mult * cooldown_s) self._toggle_cooldown = { k: t for k, t in self._toggle_cooldown.items() if t >= cutoff } @@ -384,7 +394,9 @@ class Dispatcher: return self._dedup_lru[dk] = True self._persist_dedup(dk, time.time()) - while len(self._dedup_lru) > _DEDUP_LRU_MAX: + # v0.6-3b: read cap from adapter_config (default 10_000). + _lru_max = int(adapter_config.dispatcher.dedup_lru_max) + while len(self._dedup_lru) > _lru_max: self._dedup_lru.popitem(last=False) # evict oldest regions = getattr(tog, "regions", None) or [] diff --git a/meshai/notifications/pipeline/grouper.py b/meshai/notifications/pipeline/grouper.py index 1260099..cb15911 100644 --- a/meshai/notifications/pipeline/grouper.py +++ b/meshai/notifications/pipeline/grouper.py @@ -23,7 +23,7 @@ class Grouper: def __init__( self, next_handler: Callable[[Event], None], - window_seconds: float = 60.0, + window_seconds: float | None = None, ): """Initialize. @@ -31,10 +31,14 @@ class Grouper: next_handler: Callable that receives events when they exit the grouper (either immediately if no group_key, or after the window expires). - window_seconds: How long to hold a group_key before - emitting downstream (default 60 seconds). + window_seconds: Hold window before emission. None -> read + from adapter_config.pipeline.grouper_window_seconds + (default 60). v0.6-3b. """ self._next = next_handler + if window_seconds is None: + from meshai.adapter_config import adapter_config + window_seconds = float(adapter_config.pipeline.grouper_window_seconds) self._window = window_seconds # {group_key: (event, hold_until_ts)} self._held: dict[str, tuple[Event, float]] = {} diff --git a/meshai/notifications/pipeline/inhibitor.py b/meshai/notifications/pipeline/inhibitor.py index 9908233..e9bf69c 100644 --- a/meshai/notifications/pipeline/inhibitor.py +++ b/meshai/notifications/pipeline/inhibitor.py @@ -24,16 +24,21 @@ class Inhibitor: def __init__( self, next_handler: Callable[[Event], None], - ttl_seconds: float = 1800.0, + ttl_seconds: float | None = None, ): """Initialize. Args: next_handler: Callable that receives non-suppressed events. ttl_seconds: How long an inhibit_key remains active after - the originating event (default 30 minutes). + the originating event. None -> read from + adapter_config.pipeline.inhibitor_ttl_seconds (default + 1800). v0.6-3b: explicit value still wins for tests. """ self._next = next_handler + if ttl_seconds is None: + from meshai.adapter_config import adapter_config + ttl_seconds = float(adapter_config.pipeline.inhibitor_ttl_seconds) self._ttl = ttl_seconds # {inhibit_key: (rank, expires_at)} self._active: dict[str, tuple[int, float]] = {} diff --git a/meshai/notifications/scheduled/band_conditions.py b/meshai/notifications/scheduled/band_conditions.py index 456ad19..d32c4e1 100644 --- a/meshai/notifications/scheduled/band_conditions.py +++ b/meshai/notifications/scheduled/band_conditions.py @@ -22,6 +22,7 @@ just started, the first scheduled broadcast within the grace window is suppressed for consistency with the event-driven adapters. """ from __future__ import annotations +from meshai.adapter_config import adapter_config import asyncio import json @@ -40,9 +41,8 @@ import httpx logger = logging.getLogger(__name__) -# Window for "fresh" SWPC data. If the latest swpc_kindex (or whatever we -# need) is older than this, fall through to HamQSL. -_SWPC_FRESHNESS_S = 6 * 3600 +# v0.6-3b: SWPC-freshness window + HamQSL endpoint live in +# adapter_config.band_conditions. # Multi-line wire format -- emoji + headline per slot, then 4 band rows. # Color codes per Matt: 🟢 Good, 🟡 Fair, 🔴 Poor. @@ -61,9 +61,7 @@ _SLOT_LABEL = { # convention so users coming from Ham Radio Toolbox recognise the format. _BAND_ORDER = ["80-40m", "30-20m", "17-15m", "12-10m"] -# HamQSL endpoint. Public, no auth. -_HAMQSL_URL = "https://www.hamqsl.com/solarxml.php" -_HAMQSL_TIMEOUT_S = 5 +# HamQSL endpoint config lives in adapter_config.band_conditions. # ======================================================================== @@ -149,7 +147,7 @@ def _load_swpc_state(now: int) -> Optional[dict]: except Exception: return None - cutoff = now - _SWPC_FRESHNESS_S + cutoff = now - int(adapter_config.band_conditions.swpc_freshness_seconds) state = {} @@ -251,8 +249,10 @@ def _fetch_hamqsl(day: bool, _http_get: Optional[Callable] = None) -> Optional[d def _http_get(url, timeout): with httpx.Client(timeout=timeout) as c: return c.get(url) + hamqsl_url = str(adapter_config.band_conditions.hamqsl_url) + hamqsl_timeout = int(adapter_config.band_conditions.hamqsl_timeout_s) try: - resp = _http_get(_HAMQSL_URL, _HAMQSL_TIMEOUT_S) + resp = _http_get(hamqsl_url, hamqsl_timeout) except Exception: return None if getattr(resp, "status_code", 0) != 200: diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index c075bf3..4b480a7 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 6 +SCHEMA_VERSION = 7 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v7.sql b/meshai/persistence/migrations/v7.sql new file mode 100644 index 0000000..4ab9eb4 --- /dev/null +++ b/meshai/persistence/migrations/v7.sql @@ -0,0 +1,34 @@ +-- v0.6-3b firms_pixels dedup-key column + index update. +-- +-- v6.sql created firms_pixels with a UNIQUE INDEX on +-- (round(lat, 5), round(lon, 5), acq_time, satellite) -- a hardcoded +-- ~1.1m precision. v0.6-3a.1 introduced adapter_config.firms.dedup_distance_m +-- (default 5m) for user-tunable dedup precision. SQLite indexes can't have +-- dynamic parameters, so we move to a precomputed `dedup_key` column the +-- handler quantizes at INSERT time. +-- +-- The handler computes dedup_key = "," where q_lat / q_lon +-- are lat/lon rounded to (dedup_distance_m / 111000) degrees. Two pixels +-- whose rounded coords agree get the same key and collide on the unique +-- index. Changing dedup_distance_m at runtime takes effect for future +-- INSERTs without touching the schema. +-- +-- firms_pixels is empty at this migration -- production has 0 rows since +-- v0.6-1, so no backfill needed. + +-- Drop the old hardcoded index. +DROP INDEX IF EXISTS idx_firms_pixels_dedup; + +-- Add the explicit dedup_key column. +ALTER TABLE firms_pixels ADD COLUMN dedup_key TEXT; + +-- New unique index on (dedup_key, acq_time, satellite). NULL dedup_key +-- compares unequal to other NULLs (SQLite default), so any legacy NULL +-- rows that exist on this DB don't trigger constraint violations against +-- each other or against future non-NULL rows. +CREATE UNIQUE INDEX IF NOT EXISTS idx_firms_pixels_dedup + ON firms_pixels(dedup_key, acq_time, satellite); + +-- Helper index for backfill queries / range scans by dedup_key alone. +CREATE INDEX IF NOT EXISTS idx_firms_pixels_dedup_key + ON firms_pixels(dedup_key); diff --git a/tests/test_adapter_config_foundation.py b/tests/test_adapter_config_foundation.py index 0ea2d8c..f8817df 100644 --- a/tests/test_adapter_config_foundation.py +++ b/tests/test_adapter_config_foundation.py @@ -54,11 +54,11 @@ def test_v6_tables_exist(fresh_db): assert "adapter_meta" in tables -def test_schema_meta_at_v6(fresh_db): +def test_schema_meta_at_v7(fresh_db): v = fresh_db.execute( "SELECT value FROM schema_meta WHERE key='version'" ).fetchone()["value"] - assert int(v) == 6 + assert int(v) == 7 def test_adapter_config_type_check_constrains_vocabulary(fresh_db):