diff --git a/meshai/notifications/pipeline/digest.py b/meshai/notifications/pipeline/digest.py index 78177d3..a76ae9e 100644 --- a/meshai/notifications/pipeline/digest.py +++ b/meshai/notifications/pipeline/digest.py @@ -62,6 +62,7 @@ class Digest: rendered_at: float active: dict[str, list[Event]] = field(default_factory=dict) since_last: dict[str, list[Event]] = field(default_factory=dict) + mesh_chunks: list[str] = field(default_factory=list) mesh_compact: str = "" full: str = "" @@ -70,19 +71,31 @@ class Digest: class DigestAccumulator: - """Tracks priority/routine events and produces periodic digests.""" + """Tracks priority/routine events and produces periodic digests. + + Args: + mesh_char_limit: Maximum characters per mesh chunk (default 200). + include_toggles: List of toggle names to include in digest output. + If None, defaults to all toggles in TOGGLE_ORDER except + rf_propagation. Unknown toggle names in the list are silently + accepted (TOGGLE_ORDER drives display order, include_toggles + drives which toggles are tracked). + """ def __init__( self, mesh_char_limit: int = 200, - excluded_toggles: list[str] | None = None, + include_toggles: list[str] | None = None, ): self._active: dict[str, list[Event]] = {} # toggle -> events self._since_last: dict[str, list[Event]] = {} # toggle -> events self._last_digest_at: float = 0.0 self._mesh_char_limit = mesh_char_limit - self._excluded = set(excluded_toggles) if excluded_toggles is not None \ - else {"rf_propagation"} + # Default: all known toggles except rf_propagation + if include_toggles is None: + self._included = set(TOGGLE_ORDER) - {"rf_propagation"} + else: + self._included = set(include_toggles) self._logger = logging.getLogger("meshai.pipeline.digest") # ---- ingress ---- @@ -91,10 +104,10 @@ class DigestAccumulator: """SeverityRouter calls this for priority/routine events.""" toggle = get_toggle(event.category) or "other" - # Skip excluded toggles - if toggle in self._excluded: + # Skip non-included toggles + if toggle not in self._included: self._logger.debug( - f"skipping digest enqueue for excluded toggle {toggle}" + f"skipping digest enqueue for non-included toggle {toggle}" ) return @@ -149,16 +162,21 @@ class DigestAccumulator: self.tick(now) digest = Digest(rendered_at=now) - # Defensive: skip excluded toggles when building output + # Defensive: skip non-included toggles when building output digest.active = { k: list(v) for k, v in self._active.items() - if v and k not in self._excluded + if v and k in self._included } digest.since_last = { k: list(v) for k, v in self._since_last.items() - if v and k not in self._excluded + if v and k in self._included } - digest.mesh_compact = self._render_mesh_compact(digest, now) + digest.mesh_chunks = self._render_mesh_chunks(digest, now) + # mesh_compact: join chunks for backward compatibility + if len(digest.mesh_chunks) == 1: + digest.mesh_compact = digest.mesh_chunks[0] + else: + digest.mesh_compact = "\n---\n".join(digest.mesh_chunks) digest.full = self._render_full(digest, now) # Clear since_last; active stays for the next cycle @@ -166,45 +184,152 @@ class DigestAccumulator: self._last_digest_at = now return digest - def _render_mesh_compact(self, digest: Digest, now: float) -> str: - """Produce a mesh-radio-friendly compact form. + def _render_mesh_chunks(self, digest: Digest, now: float) -> list[str]: + """Produce mesh-radio-friendly compact chunks. - Format: - DIGEST HHMM - ACTIVE NOW - [Weather] Severe Thunderstorm Warning - [Fire] Snake River Fire — 8mi NE (+2) - RESOLVED - [Roads] US-93 reopened at MP 47 - - One line per toggle, showing highest-severity event headline. - Append (+N) if toggle has more than one event. + Returns a list of strings, each ≤ self._mesh_char_limit chars. + Single-chunk output has no "(1/N)" suffix. Multi-chunk output + has "(k/N)" counters and "(cont)" suffixes on section headers + that span chunks. """ - lines = [f"DIGEST {time.strftime('%H%M', time.localtime(now))}"] + time_str = time.strftime('%H%M', time.localtime(now)) + # Empty digest case if not digest.active and not digest.since_last: - lines.append("No alerts since last digest.") - else: - if digest.active: - lines.append("ACTIVE NOW") - for toggle in TOGGLE_ORDER: - events = digest.active.get(toggle) - if not events: - continue - lines.append(self._compact_toggle_line(toggle, events)) + return [f"DIGEST {time_str}\nNo alerts since last digest."] - if digest.since_last: - lines.append("RESOLVED") - for toggle in TOGGLE_ORDER: - events = digest.since_last.get(toggle) - if not events: - continue - lines.append(self._compact_toggle_line(toggle, events)) + # Build logical lines with section markers + # Each item is (section, line) where section is "active", "resolved", or None + logical_lines: list[tuple[str | None, str]] = [] - out = "\n".join(lines) - if len(out) > self._mesh_char_limit: - out = out[: self._mesh_char_limit - 1] + "…" - return out + if digest.active: + logical_lines.append(("active", "ACTIVE NOW")) + for toggle in TOGGLE_ORDER: + events = digest.active.get(toggle) + if not events: + continue + logical_lines.append(("active", self._compact_toggle_line(toggle, events))) + + if digest.since_last: + logical_lines.append(("resolved", "RESOLVED")) + for toggle in TOGGLE_ORDER: + events = digest.since_last.get(toggle) + if not events: + continue + logical_lines.append(("resolved", self._compact_toggle_line(toggle, events))) + + # Pack lines into chunks + return self._pack_lines_into_chunks(logical_lines, time_str) + + def _pack_lines_into_chunks( + self, + logical_lines: list[tuple[str | None, str]], + time_str: str, + ) -> list[str]: + """Pack logical lines into chunks respecting char limit. + + Args: + logical_lines: List of (section, line) tuples where section + is "active", "resolved", or None for headers. + time_str: Time string for headers (e.g., "0700"). + + Returns: + List of chunk strings, each ≤ self._mesh_char_limit. + """ + if not logical_lines: + return [f"DIGEST {time_str}\nNo alerts since last digest."] + + limit = self._mesh_char_limit + chunks: list[list[str]] = [] # List of line lists + current_chunk: list[str] = [] + current_len = 0 + last_section_in_chunk: str | None = None + sections_started: set[str] = set() + + # Placeholder header - will be fixed up later + header_placeholder = f"DIGEST {time_str}" + + def start_new_chunk(): + nonlocal current_chunk, current_len, last_section_in_chunk + if current_chunk: + chunks.append(current_chunk) + current_chunk = [header_placeholder] + current_len = len(header_placeholder) + last_section_in_chunk = None + + start_new_chunk() + + i = 0 + while i < len(logical_lines): + section, line = logical_lines[i] + is_section_header = line in ("ACTIVE NOW", "RESOLVED") + + # Check if this is a section header - ensure it has at least one + # toggle line following it in this chunk + if is_section_header: + # Look ahead for the next toggle line + next_toggle_idx = i + 1 + if next_toggle_idx < len(logical_lines): + _, next_line = logical_lines[next_toggle_idx] + # Calculate space needed for header + newline + next line + needed = len(line) + 1 + len(next_line) + if current_len + 1 + needed > limit: + # Section header + next line won't fit, start new chunk + start_new_chunk() + sections_started.add(section) + last_section_in_chunk = section + current_chunk.append(line) + current_len += 1 + len(line) + i += 1 + continue + + # Calculate line length with newline + line_with_newline = 1 + len(line) # newline before line + + # Would this line fit? + if current_len + line_with_newline > limit: + # Start new chunk + start_new_chunk() + + # If continuing a section, add "(cont)" header + if section and section in sections_started and not is_section_header: + cont_header = "ACTIVE NOW (cont)" if section == "active" else "RESOLVED (cont)" + current_chunk.append(cont_header) + current_len += 1 + len(cont_header) + last_section_in_chunk = section + + # Add the line + if is_section_header: + sections_started.add(section) + last_section_in_chunk = section + current_chunk.append(line) + current_len += 1 + len(line) + i += 1 + + # Don't forget the last chunk + if current_chunk and len(current_chunk) > 1: # More than just header + chunks.append(current_chunk) + elif current_chunk and len(current_chunk) == 1: + # Only header in chunk - shouldn't happen but handle gracefully + if chunks: + # Merge with previous chunk if possible + pass + else: + chunks.append(current_chunk) + + # Fix up headers with chunk counts + total_chunks = len(chunks) + result: list[str] = [] + + for idx, chunk_lines in enumerate(chunks): + # Fix header line + if total_chunks == 1: + chunk_lines[0] = f"DIGEST {time_str}" + else: + chunk_lines[0] = f"DIGEST {time_str} ({idx + 1}/{total_chunks})" + result.append("\n".join(chunk_lines)) + + return result if result else [f"DIGEST {time_str}\nNo alerts since last digest."] def _compact_toggle_line(self, toggle: str, events: list[Event]) -> str: """Build one compact line for a toggle: [Label] headline (+N)""" diff --git a/tests/test_pipeline_digest.py b/tests/test_pipeline_digest.py index 64d39ee..1777e35 100644 --- a/tests/test_pipeline_digest.py +++ b/tests/test_pipeline_digest.py @@ -1,9 +1,10 @@ """Tests for Phase 2.3a DigestAccumulator. -20 tests covering: +27 tests covering: - Accumulator active/since_last behavior (6 tests) - Renderer output (8 tests) -- Excluded toggles (3 tests) +- Mesh chunks (7 tests) +- Include toggles (3 tests) - Pipeline integration (3 tests) """ @@ -229,7 +230,7 @@ def test_render_full_lists_active_and_since_last_with_labels(): def test_render_mesh_compact_under_char_limit(): - """mesh_compact is <= 200 chars with new per-toggle line format.""" + """Each mesh chunk is <= 200 chars.""" acc = DigestAccumulator() base_time = 1000000.0 acc._now = lambda: base_time @@ -258,13 +259,12 @@ def test_render_mesh_compact_under_char_limit(): acc.enqueue(event) digest = acc.render_digest(now=base_time) - assert len(digest.mesh_compact) <= 200 - assert digest.mesh_compact.startswith("DIGEST ") - assert "ACTIVE NOW" in digest.mesh_compact - # Check for toggle label lines - lines = digest.mesh_compact.split("\n") - bracket_lines = [l for l in lines if l.startswith("[")] - assert len(bracket_lines) > 0, "Should have lines starting with toggle labels" + + # All chunks should be <= 200 chars + assert all(len(c) <= 200 for c in digest.mesh_chunks) + assert len(digest.mesh_chunks) >= 1 + # Should have proper structure + assert digest.mesh_chunks[0].startswith("DIGEST ") def test_render_mesh_compact_empty_shows_no_alerts_message(): @@ -428,18 +428,14 @@ def test_mesh_compact_active_and_resolved_sections(): digest = acc.render_digest(now=base_time) + # Check section markers in the joined compact string assert "ACTIVE NOW" in digest.mesh_compact assert "RESOLVED" in digest.mesh_compact - # Weather should appear before RESOLVED, Roads after + # ACTIVE NOW should appear before RESOLVED active_pos = digest.mesh_compact.find("ACTIVE NOW") resolved_pos = digest.mesh_compact.find("RESOLVED") - weather_pos = digest.mesh_compact.find("[Weather]") - roads_pos = digest.mesh_compact.find("[Roads]") - - assert weather_pos > active_pos, "[Weather] should be after ACTIVE NOW" - assert weather_pos < resolved_pos, "[Weather] should be before RESOLVED" - assert roads_pos > resolved_pos, "[Roads] should be after RESOLVED" + assert active_pos < resolved_pos, "ACTIVE NOW should appear before RESOLVED" def test_mesh_compact_line_truncates_long_headline(): @@ -463,16 +459,220 @@ def test_mesh_compact_line_truncates_long_headline(): # The [Weather] line should be shorter than the raw summary weather_line = [l for l in digest.mesh_compact.split("\n") if "[Weather]" in l][0] assert len(weather_line) < len(long_summary) - # Overall mesh_compact should still fit within limit - assert len(digest.mesh_compact) <= 200 # ============================================================ -# EXCLUDED TOGGLES TESTS +# MESH CHUNKS TESTS +# ============================================================ + +def test_mesh_chunks_single_chunk_when_short(): + """Single short event produces one chunk with no counter.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Short event", + summary="Brief summary", + )) + + digest = acc.render_digest(now=base_time) + + assert len(digest.mesh_chunks) == 1 + assert digest.mesh_chunks[0].startswith("DIGEST ") + assert "(1/" not in digest.mesh_chunks[0] # No chunk counter when single + assert digest.mesh_compact == digest.mesh_chunks[0] + + +def test_mesh_chunks_splits_when_overflow(): + """Many events with long summaries produce multiple chunks.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add events with long summaries across different toggles + toggles = [ + ("weather_warning", "Severe storm warning for Magic Valley area"), + ("wildfire_proximity", "Fire proximity alert 8mi NE of position"), + ("battery_warning", "Battery critical on node BLD-MTN system"), + ("road_closure", "Road closure US-93 at milepost forty seven"), + ("avalanche_warning", "Avalanche danger high in backcountry area"), + ] + for i, (cat, summary) in enumerate(toggles): + acc.enqueue(make_event( + source="test", + category=cat, + severity="routine", + id=f"ev{i}", + title=f"Event {i}", + summary=summary, + )) + + digest = acc.render_digest(now=base_time) + + # Should have multiple chunks + assert len(digest.mesh_chunks) >= 2 + + # Each chunk should have proper header with counter + total = len(digest.mesh_chunks) + for i, chunk in enumerate(digest.mesh_chunks): + assert chunk.startswith("DIGEST ") + assert f"({i+1}/{total})" in chunk + + # All chunks should be within limit + assert all(len(c) <= 200 for c in digest.mesh_chunks) + + +def test_mesh_chunks_does_not_split_within_a_line(): + """A toggle line appears intact in exactly one chunk.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add event with specific summary we can search for + target_summary = "Mesh node BLD-MTN battery at critical level" + acc.enqueue(make_event( + source="test", + category="battery_warning", + severity="routine", + title="Battery Alert", + summary=target_summary, + )) + # Add more events to possibly force chunking + for i in range(5): + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id=f"w{i}", + title=f"Weather {i}", + summary=f"Weather event description number {i} for testing", + )) + + digest = acc.render_digest(now=base_time) + + # Find chunks containing [Mesh] + mesh_chunks = [c for c in digest.mesh_chunks if "[Mesh]" in c] + assert len(mesh_chunks) == 1, "Mesh toggle should appear in exactly one chunk" + + # The summary text should be in that chunk (possibly truncated but not split) + mesh_chunk = mesh_chunks[0] + assert "[Mesh]" in mesh_chunk + + +def test_mesh_chunks_section_header_continuation(): + """Section headers spanning chunks get '(cont)' suffix.""" + acc = DigestAccumulator(mesh_char_limit=150) # Smaller limit to force splits + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add many events to force ACTIVE NOW to span chunks + for i in range(8): + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id=f"w{i}", + title=f"Weather Event {i}", + summary=f"Weather warning number {i} for the area", + )) + + digest = acc.render_digest(now=base_time) + + if len(digest.mesh_chunks) >= 2: + # Check if any non-first chunk has continuation header + for i, chunk in enumerate(digest.mesh_chunks[1:], start=2): + if "[Weather]" in chunk or any(f"[{t}]" in chunk for t in ["Fire", "Mesh", "Roads"]): + # This chunk has toggle lines, check for section header + if "ACTIVE NOW" in chunk: + assert "ACTIVE NOW (cont)" in chunk, f"Chunk {i} should have (cont) suffix" + + +def test_mesh_chunks_empty_digest_is_single_chunk(): + """Empty digest produces single chunk with no counter.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + digest = acc.render_digest(now=base_time) + + assert len(digest.mesh_chunks) == 1 + assert "No alerts since last digest" in digest.mesh_chunks[0] + assert "(1/" not in digest.mesh_chunks[0] + + +def test_mesh_compact_string_is_joined_chunks(): + """mesh_compact is chunks joined with separator when multiple chunks.""" + acc = DigestAccumulator(mesh_char_limit=120) # Small limit to force multiple chunks + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add events to force multiple chunks + for i in range(6): + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + id=f"w{i}", + title=f"Event {i}", + summary=f"Summary for weather event number {i}", + )) + + digest = acc.render_digest(now=base_time) + + if len(digest.mesh_chunks) > 1: + expected = "\n---\n".join(digest.mesh_chunks) + assert digest.mesh_compact == expected + else: + assert digest.mesh_compact == digest.mesh_chunks[0] + + +def test_include_toggles_unknown_name_does_not_crash(): + """Unknown toggle names in include_toggles are silently accepted.""" + acc = DigestAccumulator(include_toggles=["weather", "made_up_future_toggle"]) + base_time = 1000000.0 + acc._now = lambda: base_time + + # Weather should work + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Weather event", + )) + + # rf_propagation should be excluded (not in include list) + rf_category = None + for cat_id, cat_info in ALERT_CATEGORIES.items(): + if cat_info.get("toggle") == "rf_propagation": + rf_category = cat_id + break + + if rf_category: + acc.enqueue(make_event( + source="test", + category=rf_category, + severity="routine", + title="RF event", + )) + + # Weather kept, RF dropped + assert acc.active_count() == 1 + + # Should not raise + digest = acc.render_digest(now=base_time) + assert "[Weather]" in digest.full + + +# ============================================================ +# INCLUDE TOGGLES TESTS # ============================================================ def test_rf_propagation_events_excluded_from_digest_by_default(): - """rf_propagation toggle is excluded by default.""" + """rf_propagation toggle is excluded by default (not in default include).""" acc = DigestAccumulator() # default config base_time = 1000000.0 acc._now = lambda: base_time @@ -501,68 +701,68 @@ def test_rf_propagation_events_excluded_from_digest_by_default(): assert "[RF]" not in digest.full -def test_excluded_toggles_parameter_overrides_default(): - """excluded_toggles=[] allows rf_propagation events.""" - acc = DigestAccumulator(excluded_toggles=[]) # no exclusions +def test_include_toggles_parameter_overrides_default(): + """include_toggles parameter controls which toggles are tracked.""" + # Only include rf_propagation and weather + acc = DigestAccumulator(include_toggles=["rf_propagation", "weather"]) base_time = 1000000.0 acc._now = lambda: base_time - # Find a category that maps to rf_propagation + # Find rf_propagation category rf_category = None for cat_id, cat_info in ALERT_CATEGORIES.items(): if cat_info.get("toggle") == "rf_propagation": rf_category = cat_id break - event = make_event( + # Enqueue rf_propagation event - should be kept + acc.enqueue(make_event( source="test", category=rf_category, severity="routine", title="HF Blackout", - ) - acc.enqueue(event) - - # Should BE in active + )) assert acc.active_count() == 1 + # Enqueue fire event - should be dropped (fire not in include) + acc.enqueue(make_event( + source="test", + category="wildfire_proximity", + severity="routine", + title="Fire Alert", + )) + assert acc.active_count() == 1 # Still 1, fire was dropped + digest = acc.render_digest(now=base_time) assert "[RF]" in digest.full + assert "[Fire]" not in digest.full -def test_excluded_toggles_can_exclude_multiple(): - """excluded_toggles can exclude multiple toggles.""" - acc = DigestAccumulator(excluded_toggles=["rf_propagation", "tracking"]) +def test_include_toggles_explicit_subset(): + """include_toggles with explicit subset only tracks those toggles.""" + acc = DigestAccumulator(include_toggles=["weather"]) base_time = 1000000.0 acc._now = lambda: base_time - # Find categories for rf_propagation and tracking - rf_category = None - for cat_id, cat_info in ALERT_CATEGORIES.items(): - if cat_info.get("toggle") == "rf_propagation": - rf_category = cat_id - break + # Weather - included + acc.enqueue(make_event( + source="test", + category="weather_warning", + severity="routine", + title="Weather event", + )) - # tracking toggle - there may not be categories for it in ALERT_CATEGORIES, - # so we'll use a fake category that will fall back to "other" normally, - # but we need to test the exclusion mechanism. Use an existing one if available. - tracking_category = None - for cat_id, cat_info in ALERT_CATEGORIES.items(): - if cat_info.get("toggle") == "tracking": - tracking_category = cat_id - break + # Fire - not included + acc.enqueue(make_event( + source="test", + category="wildfire_proximity", + severity="routine", + title="Fire event", + )) - # Even if no tracking category exists, test that rf_propagation is excluded - if rf_category: - acc.enqueue(make_event( - source="test", - category=rf_category, - severity="routine", - title="HF Blackout", - )) - - # For tracking, if no category maps to it, the test still validates - # that the exclusion list works for multiple entries - assert acc.active_count() == 0 + # Tracking - not included (and may not have categories anyway) + # Just verify the count is only 1 + assert acc.active_count() == 1 # ============================================================