diff --git a/src/central/adapters/state_511_atis.py b/src/central/adapters/state_511_atis.py
index fecefa6..4150a7e 100644
--- a/src/central/adapters/state_511_atis.py
+++ b/src/central/adapters/state_511_atis.py
@@ -47,14 +47,20 @@ LAYER_EVENT_TYPE: dict[str, str] = {
     "Construction": "work_zone",
 }
 
-# DataTables server-side body. POST is required (GET returns an empty data array);
-# length covers Idaho's largest layer today (~114) with headroom — warn if exceeded.
-_LIST_PAGE_LENGTH = 1000
-_LIST_BODY = {
-    "draw": "1", "start": "0", "length": str(_LIST_PAGE_LENGTH),
-    "columns[0][data]": "0", "order[0][column]": "0",
-    "order[0][dir]": "asc", "search[value]": "",
-}
+# DataTables server-side body. POST is required (GET returns an empty data array).
+# Castle Rock caps each page at 100 rows regardless of `length`, so we paginate.
+_LIST_PAGE_LENGTH = 100
+_MAX_PAGES = 50  # defensive ceiling (~5,000 rows/layer)
+
+
+def _list_body(start: int) -> dict[str, str]:
+    return {
+        "draw": "1", "start": str(start), "length": str(_LIST_PAGE_LENGTH),
+        "columns[0][data]": "0", "order[0][column]": "0",
+        "order[0][dir]": "asc", "search[value]": "",
+    }
+
+
 _XHR = {"X-Requested-With": "XMLHttpRequest"}
 _FETCH_CONCURRENCY = 4
 
@@ -175,24 +181,56 @@ class State511ATISAdapter(SourceAdapter):
             out[str(m["itemId"])] = (float(loc[0]), float(loc[1]))
         return out
 
-    async def _fetch_details(self, base_url: str, layer: str) -> list[dict[str, Any]]:
-        """POST /List/GetData/ (DataTables) -> rich rows. [] on failure."""
+    # reraise=True: after the final attempt surface the original aiohttp/timeout
+    # error (not tenacity.RetryError) so the except clause in _fetch_details matches.
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential_jitter(initial=1, max=30),
+        retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
+        reraise=True,
+    )
+    async def _fetch_page(self, base_url: str, layer: str, start: int) -> dict[str, Any]:
+        """POST one DataTables page at row offset ``start``; transient errors are retried."""
         assert self._session is not None
-        try:
-            async with self._session.post(
-                f"{base_url}/List/GetData/{layer}", data=_LIST_BODY, headers=_XHR
-            ) as resp:
-                resp.raise_for_status()
-                doc = await resp.json(content_type=None)
-        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc:
-            logger.warning("state_511_atis detail fetch failed",
-                           extra={"layer": layer, "base_url": base_url, "error": str(exc)})
-            return []
-        total = doc.get("recordsTotal") or 0
-        rows = doc.get("data") or []
-        if total > _LIST_PAGE_LENGTH:
-            logger.warning("state_511_atis layer exceeds page length; add pagination",
-                           extra={"layer": layer, "recordsTotal": total, "length": _LIST_PAGE_LENGTH})
+        async with self._session.post(
+            f"{base_url}/List/GetData/{layer}", data=_list_body(start), headers=_XHR
+        ) as resp:
+            resp.raise_for_status()
+            return await resp.json(content_type=None)
+
+    async def _fetch_details(self, base_url: str, layer: str) -> list[dict[str, Any]]:
+        """POST /List/GetData/ (DataTables), paginated -> all rows. [] on failure.
+
+        Castle Rock caps each page at 100 rows regardless of `length`, so we loop
+        until recordsFiltered is collected or a page returns empty, with a
+        defensive _MAX_PAGES ceiling. A mid-pagination failure returns the rows
+        gathered so far (retried next poll).
+        """
+        rows: list[dict[str, Any]] = []
+        start = 0
+        total: int | None = None
+        for _ in range(_MAX_PAGES):
+            try:
+                doc = await self._fetch_page(base_url, layer, start)
+            except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc:
+                logger.warning("state_511_atis detail fetch failed",
+                               extra={"layer": layer, "base_url": base_url, "start": start, "error": str(exc)})
+                break
+            if total is None:
+                total = doc.get("recordsFiltered") or doc.get("recordsTotal") or 0
+            page = doc.get("data") or []
+            if not page:
+                break
+            rows.extend(page)
+            # Advance by the rows actually returned: the server, not `length`,
+            # decides the page size, and a fixed stride would skip rows.
+            start += len(page)
+            if len(rows) >= total:
+                break
+        else:
+            logger.warning("state_511_atis pagination hit max_pages cap",
+                           extra={"layer": layer, "max_pages": _MAX_PAGES,
+                                  "collected": len(rows), "recordsFiltered": total})
         return rows
 
     def _build_event(
diff --git a/tests/test_state_511_atis.py b/tests/test_state_511_atis.py
index 829b3bf..f87ee4b 100644
--- a/tests/test_state_511_atis.py
+++ b/tests/test_state_511_atis.py
@@ -131,3 +131,77 @@ def test_inherits_dedup_mixin():
     for m in ("is_published", "mark_published", "sweep_old_ids"):
         assert m not in State511ATISAdapter.__dict__, f"redefines {m}"
         assert getattr(State511ATISAdapter, m) is getattr(SourceAdapter, m)
+
+
+# --- v0.9.7 pagination ------------------------------------------------------
+
+def _rec(i):
+    return {"id": i, "type": "Roadwork", "roadwayName": "SH-1", "location": f"loc {i}"}
+
+
+def _page(records, records_filtered):
+    return {"draw": 1, "recordsTotal": records_filtered,
+            "recordsFiltered": records_filtered, "data": records}
+
+
+@pytest.mark.asyncio
+async def test_pagination_collects_all_pages(adapter):
+    await adapter.startup()
+    pages = {0: _page([_rec(i) for i in range(100)], 114),
+             100: _page([_rec(i) for i in range(100, 114)], 114)}
+
+    async def fake_page(base_url, layer, start):
+        return pages[start]
+
+    adapter._fetch_page = fake_page
+    rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
+    await adapter.shutdown()
+    assert len(rows) == 114  # 100 + 14, not truncated at the 100-row cap
+    assert {r["id"] for r in rows} == set(range(114))
+
+
+@pytest.mark.asyncio
+async def test_pagination_handles_short_final_page(adapter):
+    await adapter.startup()
+    pages = {0: _page([_rec(i) for i in range(100)], 130),
+             100: _page([_rec(i) for i in range(100, 130)], 130)}
+
+    async def fake_page(base_url, layer, start):
+        return pages[start]
+
+    adapter._fetch_page = fake_page
+    rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
+    await adapter.shutdown()
+    assert len(rows) == 130  # short 30-row final page collected
+
+
+@pytest.mark.asyncio
+async def test_pagination_empty_page_breaks(adapter):
+    # recordsFiltered overstates the set; an empty page must stop the loop (no hang).
+    await adapter.startup()
+    pages = {0: _page([_rec(i) for i in range(100)], 250), 100: _page([], 250)}
+
+    async def fake_page(base_url, layer, start):
+        return pages.get(start, _page([], 250))
+
+    adapter._fetch_page = fake_page
+    rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
+    await adapter.shutdown()
+    assert len(rows) == 100  # empty page 2 terminates cleanly
+
+
+@pytest.mark.asyncio
+async def test_pagination_loop_cap(adapter, caplog):
+    # recordsFiltered never satisfied -> loop must stop at _MAX_PAGES and warn.
+    await adapter.startup()
+
+    async def fake_page(base_url, layer, start):
+        return _page([_rec(start + i) for i in range(100)], 999_999)
+
+    adapter._fetch_page = fake_page
+    with caplog.at_level("WARNING"):
+        rows = await adapter._fetch_details("https://511.idaho.gov", "Construction")
+    await adapter.shutdown()
+    from central.adapters.state_511_atis import _MAX_PAGES
+    assert len(rows) == _MAX_PAGES * 100  # capped, not infinite
+    assert "max_pages" in caplog.text