mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
fix(2-C): wire dedup into poll loop, add conditional fetch
Bug fixes: 1. Wire is_published/mark_published/bump_last_seen into poll() loop - Skip already-published items, bump TTL to prevent sweep - Mark published after yield to track new items 2. Add conditional fetch support (If-Modified-Since, If-None-Match) - Store Last-Modified/ETag from responses - Send conditional headers on subsequent requests - Handle 304 Not Modified gracefully (return empty list) 3. Document state parsing rationale in docstring - Description has structured State: field vs unreliable title prefixes Tests added: - test_dedup_in_poll_loop: verify second poll yields 0 for same items - test_conditional_304_yields_zero: verify 304 returns empty list - test_conditional_headers_sent_after_first_poll: verify headers sent Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
8751264f8c
commit
1ef19508a1
2 changed files with 180 additions and 0 deletions
|
|
@ -101,6 +101,12 @@ def parse_state_from_description(description: str) -> str | None:
|
||||||
|
|
||||||
Format: "State: Minnesota" or "State: New Mexico"
|
Format: "State: Minnesota" or "State: New Mexico"
|
||||||
Returns 2-letter state code or None if not found.
|
Returns 2-letter state code or None if not found.
|
||||||
|
|
||||||
|
Design note: State is parsed from the description rather than the title
|
||||||
|
because InciWeb titles use unit code prefixes (e.g., "MNMNS Stewart Trail",
|
||||||
|
"CACNP Santa Rosa Island Fire") which are not reliable state indicators.
|
||||||
|
The description has a structured "State: <name>" field that reliably
|
||||||
|
identifies the state for all incidents.
|
||||||
"""
|
"""
|
||||||
pattern = r"State:\s*([A-Za-z\s]+?)(?:\n|---|$)"
|
pattern = r"State:\s*([A-Za-z\s]+?)(?:\n|---|$)"
|
||||||
match = re.search(pattern, description)
|
match = re.search(pattern, description)
|
||||||
|
|
@ -176,6 +182,10 @@ class InciWebAdapter(SourceAdapter):
|
||||||
self._session: aiohttp.ClientSession | None = None
|
self._session: aiohttp.ClientSession | None = None
|
||||||
self._db: sqlite3.Connection | None = None
|
self._db: sqlite3.Connection | None = None
|
||||||
|
|
||||||
|
# Conditional fetch state
|
||||||
|
self._last_modified: str | None = None
|
||||||
|
self._etag: str | None = None
|
||||||
|
|
||||||
# Parse region from settings
|
# Parse region from settings
|
||||||
region_dict = config.settings.get("region")
|
region_dict = config.settings.get("region")
|
||||||
if region_dict:
|
if region_dict:
|
||||||
|
|
@ -300,10 +310,25 @@ class InciWebAdapter(SourceAdapter):
|
||||||
if not self._session:
|
if not self._session:
|
||||||
raise RuntimeError("Session not initialized")
|
raise RuntimeError("Session not initialized")
|
||||||
|
|
||||||
|
# Build request headers with conditional fetch support
|
||||||
headers = {"User-Agent": "Central/0.4"}
|
headers = {"User-Agent": "Central/0.4"}
|
||||||
|
if self._last_modified:
|
||||||
|
headers["If-Modified-Since"] = self._last_modified
|
||||||
|
if self._etag:
|
||||||
|
headers["If-None-Match"] = self._etag
|
||||||
|
|
||||||
async with self._session.get(INCIWEB_RSS_URL, headers=headers) as resp:
|
async with self._session.get(INCIWEB_RSS_URL, headers=headers) as resp:
|
||||||
|
# Handle 304 Not Modified
|
||||||
|
if resp.status == 304:
|
||||||
|
logger.info("InciWeb not modified")
|
||||||
|
return []
|
||||||
|
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
# Capture conditional fetch headers for next request
|
||||||
|
self._last_modified = resp.headers.get("Last-Modified")
|
||||||
|
self._etag = resp.headers.get("ETag")
|
||||||
|
|
||||||
content = await resp.text()
|
content = await resp.text()
|
||||||
|
|
||||||
# Parse RSS XML
|
# Parse RSS XML
|
||||||
|
|
@ -367,6 +392,11 @@ class InciWebAdapter(SourceAdapter):
|
||||||
if not guid:
|
if not guid:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Dedup: skip if already published
|
||||||
|
if self.is_published(guid):
|
||||||
|
self.bump_last_seen(guid)
|
||||||
|
continue
|
||||||
|
|
||||||
description_html = item.get("description", "")
|
description_html = item.get("description", "")
|
||||||
|
|
||||||
# Parse coordinates from description
|
# Parse coordinates from description
|
||||||
|
|
@ -435,6 +465,7 @@ class InciWebAdapter(SourceAdapter):
|
||||||
)
|
)
|
||||||
|
|
||||||
yield event
|
yield event
|
||||||
|
self.mark_published(guid)
|
||||||
events_yielded += 1
|
events_yielded += 1
|
||||||
|
|
||||||
# Periodic cleanup of old entries
|
# Periodic cleanup of old entries
|
||||||
|
|
|
||||||
|
|
@ -448,3 +448,152 @@ class TestInciWebAdapter:
|
||||||
|
|
||||||
assert adapter.region.north == 50.0
|
assert adapter.region.north == 50.0
|
||||||
assert adapter.region.south == 35.0
|
assert adapter.region.south == 35.0
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_dedup_in_poll_loop(
|
||||||
|
self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
|
||||||
|
):
|
||||||
|
"""Dedup integration: second poll with same items yields zero events."""
|
||||||
|
from central.adapters.inciweb import InciWebAdapter
|
||||||
|
|
||||||
|
adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
|
||||||
|
await adapter.startup()
|
||||||
|
|
||||||
|
# Single-item RSS for clarity
|
||||||
|
single_item_rss = """<?xml version="1.0" encoding="utf-8"?>
|
||||||
|
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" version="2.0">
|
||||||
|
<channel>
|
||||||
|
<title>InciWeb</title>
|
||||||
|
<item>
|
||||||
|
<title>Test Fire</title>
|
||||||
|
<link>http://inciweb.wildfire.gov/test</link>
|
||||||
|
<description>State: California</description>
|
||||||
|
<pubDate>Mon, 18 May 2026 09:00:00 EDT</pubDate>
|
||||||
|
<guid isPermaLink="false">DEDUP-TEST-001</guid>
|
||||||
|
</item>
|
||||||
|
</channel>
|
||||||
|
</rss>"""
|
||||||
|
|
||||||
|
def make_mock_response():
|
||||||
|
mock_response = AsyncMock()
|
||||||
|
mock_response.status = 200
|
||||||
|
mock_response.raise_for_status = MagicMock()
|
||||||
|
mock_response.text = AsyncMock(return_value=single_item_rss)
|
||||||
|
mock_response.headers = {"Last-Modified": None, "ETag": None}
|
||||||
|
return mock_response
|
||||||
|
|
||||||
|
# First poll: should yield 1 event
|
||||||
|
with patch.object(
|
||||||
|
adapter._session, "get",
|
||||||
|
return_value=AsyncMock(
|
||||||
|
__aenter__=AsyncMock(return_value=make_mock_response()),
|
||||||
|
__aexit__=AsyncMock()
|
||||||
|
)
|
||||||
|
):
|
||||||
|
events_first = [e async for e in adapter.poll()]
|
||||||
|
|
||||||
|
assert len(events_first) == 1
|
||||||
|
assert events_first[0].data["guid"] == "DEDUP-TEST-001"
|
||||||
|
|
||||||
|
# Verify mark_published was called
|
||||||
|
assert adapter.is_published("DEDUP-TEST-001") is True
|
||||||
|
|
||||||
|
# Second poll: same item should be skipped (dedup)
|
||||||
|
with patch.object(
|
||||||
|
adapter._session, "get",
|
||||||
|
return_value=AsyncMock(
|
||||||
|
__aenter__=AsyncMock(return_value=make_mock_response()),
|
||||||
|
__aexit__=AsyncMock()
|
||||||
|
)
|
||||||
|
):
|
||||||
|
events_second = [e async for e in adapter.poll()]
|
||||||
|
|
||||||
|
assert len(events_second) == 0 # Dedup prevents re-yield
|
||||||
|
|
||||||
|
await adapter.shutdown()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_conditional_304_yields_zero(
|
||||||
|
self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
|
||||||
|
):
|
||||||
|
"""HTTP 304 Not Modified returns empty list and yields zero events."""
|
||||||
|
from central.adapters.inciweb import InciWebAdapter
|
||||||
|
|
||||||
|
adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
|
||||||
|
await adapter.startup()
|
||||||
|
|
||||||
|
# Mock 304 response
|
||||||
|
mock_response = AsyncMock()
|
||||||
|
mock_response.status = 304
|
||||||
|
mock_response.raise_for_status = MagicMock()
|
||||||
|
|
||||||
|
with patch.object(
|
||||||
|
adapter._session, "get",
|
||||||
|
return_value=AsyncMock(
|
||||||
|
__aenter__=AsyncMock(return_value=mock_response),
|
||||||
|
__aexit__=AsyncMock()
|
||||||
|
)
|
||||||
|
):
|
||||||
|
events = [e async for e in adapter.poll()]
|
||||||
|
|
||||||
|
assert len(events) == 0
|
||||||
|
|
||||||
|
await adapter.shutdown()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_conditional_headers_sent_after_first_poll(
|
||||||
|
self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
|
||||||
|
):
|
||||||
|
"""Conditional fetch headers sent on second poll after first captures them."""
|
||||||
|
from central.adapters.inciweb import InciWebAdapter
|
||||||
|
|
||||||
|
adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
|
||||||
|
await adapter.startup()
|
||||||
|
|
||||||
|
# First response with Last-Modified and ETag
|
||||||
|
first_response = AsyncMock()
|
||||||
|
first_response.status = 200
|
||||||
|
first_response.raise_for_status = MagicMock()
|
||||||
|
first_response.text = AsyncMock(return_value="""<?xml version="1.0"?>
|
||||||
|
<rss version="2.0"><channel><title>Test</title></channel></rss>""")
|
||||||
|
first_response.headers = {
|
||||||
|
"Last-Modified": "Tue, 19 May 2026 03:00:00 GMT",
|
||||||
|
"ETag": "\"abc123\"",
|
||||||
|
}
|
||||||
|
|
||||||
|
# Track headers sent on second request
|
||||||
|
captured_headers = {}
|
||||||
|
|
||||||
|
def capture_get(*args, **kwargs):
|
||||||
|
captured_headers.update(kwargs.get("headers", {}))
|
||||||
|
second_response = AsyncMock()
|
||||||
|
second_response.status = 304
|
||||||
|
second_response.raise_for_status = MagicMock()
|
||||||
|
return AsyncMock(
|
||||||
|
__aenter__=AsyncMock(return_value=second_response),
|
||||||
|
__aexit__=AsyncMock()
|
||||||
|
)
|
||||||
|
|
||||||
|
# First poll
|
||||||
|
with patch.object(
|
||||||
|
adapter._session, "get",
|
||||||
|
return_value=AsyncMock(
|
||||||
|
__aenter__=AsyncMock(return_value=first_response),
|
||||||
|
__aexit__=AsyncMock()
|
||||||
|
)
|
||||||
|
):
|
||||||
|
[e async for e in adapter.poll()]
|
||||||
|
|
||||||
|
# Verify adapter captured the headers
|
||||||
|
assert adapter._last_modified == "Tue, 19 May 2026 03:00:00 GMT"
|
||||||
|
assert adapter._etag == "\"abc123\""
|
||||||
|
|
||||||
|
# Second poll with header capture
|
||||||
|
with patch.object(adapter._session, "get", side_effect=capture_get):
|
||||||
|
[e async for e in adapter.poll()]
|
||||||
|
|
||||||
|
# Verify conditional headers were sent
|
||||||
|
assert captured_headers.get("If-Modified-Since") == "Tue, 19 May 2026 03:00:00 GMT"
|
||||||
|
assert captured_headers.get("If-None-Match") == "\"abc123\""
|
||||||
|
|
||||||
|
await adapter.shutdown()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue