fix(state_511_atis): paginate /List/GetData to fix 100-row truncation (v0.9.7)

Castle Rock caps each DataTables page at 100 rows regardless of `length`, so the
single-request fetcher silently dropped rows on any layer over 100 (confirmed
live: Construction recordsFiltered=114, returned 100 -> 14 rows invisible).
Backports the v0.9.6 cameras pagination loop into _fetch_details.

- _LIST_PAGE_LENGTH 1000 -> 100; new _list_body(start) builder; new @retry
  _fetch_page(base_url, layer, start).
- _fetch_details loops start+=100 until recordsFiltered collected or an empty
  page, with a _MAX_PAGES=50 ceiling that warns+breaks. Mid-pagination failure
  returns rows-so-far (retried next poll).
- Incidents (1) / Closures (29) are under 100 today but pagination applies
  uniformly; future-proof.

central-supervisor restart only (no stream, migration, template, or dep change).

Full suite: 833 passed, 1 skipped (central and unprivileged zvx, 3x each).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-26 02:40:58 +00:00
commit 52b9ae0bbd
2 changed files with 131 additions and 25 deletions

View file

@ -47,14 +47,20 @@ LAYER_EVENT_TYPE: dict[str, str] = {
"Construction": "work_zone", "Construction": "work_zone",
} }
# DataTables server-side body. POST is required (GET returns an empty data array); # DataTables server-side body. POST is required (GET returns an empty data array).
# length covers Idaho's largest layer today (~114) with headroom — warn if exceeded. # Castle Rock caps each page at 100 rows regardless of `length`, so we paginate.
_LIST_PAGE_LENGTH = 1000 _LIST_PAGE_LENGTH = 100
_LIST_BODY = { _MAX_PAGES = 50 # defensive ceiling (~5,000 rows/layer)
"draw": "1", "start": "0", "length": str(_LIST_PAGE_LENGTH),
def _list_body(start: int) -> dict[str, str]:
return {
"draw": "1", "start": str(start), "length": str(_LIST_PAGE_LENGTH),
"columns[0][data]": "0", "order[0][column]": "0", "columns[0][data]": "0", "order[0][column]": "0",
"order[0][dir]": "asc", "search[value]": "", "order[0][dir]": "asc", "search[value]": "",
} }
_XHR = {"X-Requested-With": "XMLHttpRequest"} _XHR = {"X-Requested-With": "XMLHttpRequest"}
_FETCH_CONCURRENCY = 4 _FETCH_CONCURRENCY = 4
@ -175,24 +181,50 @@ class State511ATISAdapter(SourceAdapter):
out[str(m["itemId"])] = (float(loc[0]), float(loc[1])) out[str(m["itemId"])] = (float(loc[0]), float(loc[1]))
return out return out
async def _fetch_details(self, base_url: str, layer: str) -> list[dict[str, Any]]: @retry(
"""POST /List/GetData/<Layer> (DataTables) -> rich rows. [] on failure.""" stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch_page(self, base_url: str, layer: str, start: int) -> dict[str, Any]:
assert self._session is not None assert self._session is not None
try:
async with self._session.post( async with self._session.post(
f"{base_url}/List/GetData/{layer}", data=_LIST_BODY, headers=_XHR f"{base_url}/List/GetData/{layer}", data=_list_body(start), headers=_XHR
) as resp: ) as resp:
resp.raise_for_status() resp.raise_for_status()
doc = await resp.json(content_type=None) return await resp.json(content_type=None)
async def _fetch_details(self, base_url: str, layer: str) -> list[dict[str, Any]]:
"""POST /List/GetData/<Layer> (DataTables), paginated -> all rows. [] on failure.
Castle Rock caps each page at 100 rows regardless of `length`, so we loop
until recordsFiltered is collected or a page returns empty, with a
defensive _MAX_PAGES ceiling. A mid-pagination failure returns the rows
gathered so far (retried next poll).
"""
rows: list[dict[str, Any]] = []
start = 0
total: int | None = None
for _ in range(_MAX_PAGES):
try:
doc = await self._fetch_page(base_url, layer, start)
except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc: except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc:
logger.warning("state_511_atis detail fetch failed", logger.warning("state_511_atis detail fetch failed",
extra={"layer": layer, "base_url": base_url, "error": str(exc)}) extra={"layer": layer, "base_url": base_url, "start": start, "error": str(exc)})
return [] break
total = doc.get("recordsTotal") or 0 if total is None:
rows = doc.get("data") or [] total = doc.get("recordsFiltered") or doc.get("recordsTotal") or 0
if total > _LIST_PAGE_LENGTH: page = doc.get("data") or []
logger.warning("state_511_atis layer exceeds page length; add pagination", if not page:
extra={"layer": layer, "recordsTotal": total, "length": _LIST_PAGE_LENGTH}) break
rows.extend(page)
start += _LIST_PAGE_LENGTH
if len(rows) >= total:
break
else:
logger.warning("state_511_atis pagination hit max_pages cap",
extra={"layer": layer, "max_pages": _MAX_PAGES,
"collected": len(rows), "recordsFiltered": total})
return rows return rows
def _build_event( def _build_event(

View file

@ -131,3 +131,77 @@ def test_inherits_dedup_mixin():
for m in ("is_published", "mark_published", "sweep_old_ids"): for m in ("is_published", "mark_published", "sweep_old_ids"):
assert m not in State511ATISAdapter.__dict__, f"redefines {m}" assert m not in State511ATISAdapter.__dict__, f"redefines {m}"
assert getattr(State511ATISAdapter, m) is getattr(SourceAdapter, m) assert getattr(State511ATISAdapter, m) is getattr(SourceAdapter, m)
# --- v0.9.7 pagination ------------------------------------------------------
def _rec(i):
return {"id": i, "type": "Roadwork", "roadwayName": "SH-1", "location": f"loc {i}"}
def _page(records, records_filtered):
return {"draw": 1, "recordsTotal": records_filtered,
"recordsFiltered": records_filtered, "data": records}
@pytest.mark.asyncio
async def test_pagination_collects_all_pages(adapter):
await adapter.startup()
pages = {0: _page([_rec(i) for i in range(100)], 114),
100: _page([_rec(i) for i in range(100, 114)], 114)}
async def fake_page(base_url, layer, start):
return pages[start]
adapter._fetch_page = fake_page
rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
await adapter.shutdown()
assert len(rows) == 114 # 100 + 14, not truncated at the 100-row cap
assert {r["id"] for r in rows} == set(range(114))
@pytest.mark.asyncio
async def test_pagination_handles_short_final_page(adapter):
await adapter.startup()
pages = {0: _page([_rec(i) for i in range(100)], 130),
100: _page([_rec(i) for i in range(100, 130)], 130)}
async def fake_page(base_url, layer, start):
return pages[start]
adapter._fetch_page = fake_page
rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
await adapter.shutdown()
assert len(rows) == 130 # short 30-row final page collected
@pytest.mark.asyncio
async def test_pagination_empty_page_breaks(adapter):
# recordsFiltered overstates the set; an empty page must stop the loop (no hang).
await adapter.startup()
pages = {0: _page([_rec(i) for i in range(100)], 250), 100: _page([], 250)}
async def fake_page(base_url, layer, start):
return pages.get(start, _page([], 250))
adapter._fetch_page = fake_page
rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
await adapter.shutdown()
assert len(rows) == 100 # empty page 2 terminates cleanly
@pytest.mark.asyncio
async def test_pagination_loop_cap(adapter, caplog):
# recordsFiltered never satisfied -> loop must stop at _MAX_PAGES and warn.
await adapter.startup()
async def fake_page(base_url, layer, start):
return _page([_rec(start + i) for i in range(100)], 999_999)
adapter._fetch_page = fake_page
with caplog.at_level("WARNING"):
rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
await adapter.shutdown()
from central.adapters.state_511_atis import _MAX_PAGES
assert len(rows) == _MAX_PAGES * 100 # capped, not infinite
assert "max_pages" in caplog.text