Skip to content

Commit d867715

Browse files
Abel Milash and claude committed
Add prefetch_pages to _get_multiple for overlapped page fetching
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5b2f6e9 commit d867715

3 files changed

Lines changed: 153 additions & 12 deletions

File tree

src/PowerPlatform/Dataverse/data/_odata.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ def _get_multiple(
840840
page_size: Optional[int] = None,
841841
count: bool = False,
842842
include_annotations: Optional[str] = None,
843+
prefetch_pages: int = 0,
843844
) -> Iterable[List[Dict[str, Any]]]:
844845
"""Iterate records from an entity set, yielding one page (list of dicts) at a time.
845846
@@ -861,6 +862,11 @@ def _get_multiple(
861862
:type count: ``bool``
862863
:param include_annotations: OData annotation pattern for the ``Prefer: odata.include-annotations`` header (e.g. ``"*"`` or ``"OData.Community.Display.V1.FormattedValue"``), or ``None``.
863864
:type include_annotations: ``str`` | ``None``
865+
:param prefetch_pages: Number of pages to pre-fetch ahead of the caller. ``0`` (default) is
866+
fully sequential. ``1`` submits the next HTTP request immediately after receiving the
867+
current page, overlapping network I/O with the caller's processing time. Values above
868+
``1`` are capped at ``1``.
869+
:type prefetch_pages: ``int``
864870
865871
:return: Iterator yielding pages (each page is a ``list`` of record dicts).
866872
:rtype: ``Iterable[list[dict[str, Any]]]``
@@ -905,21 +911,38 @@ def _do_request(url: str, *, params: Optional[Dict[str, Any]] = None) -> Dict[st
905911
if count:
906912
params["$count"] = "true"
907913

908-
data = _do_request(base_url, params=params)
909-
items = data.get("value") if isinstance(data, dict) else None
910-
if isinstance(items, list) and items:
911-
yield [x for x in items if isinstance(x, dict)]
912-
913-
next_link = None
914-
if isinstance(data, dict):
915-
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink")
916-
917-
while next_link:
918-
data = _do_request(next_link)
914+
if prefetch_pages <= 0:
915+
data = _do_request(base_url, params=params)
919916
items = data.get("value") if isinstance(data, dict) else None
920917
if isinstance(items, list) and items:
921918
yield [x for x in items if isinstance(x, dict)]
922-
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink") if isinstance(data, dict) else None
919+
920+
next_link = None
921+
if isinstance(data, dict):
922+
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink")
923+
924+
while next_link:
925+
data = _do_request(next_link)
926+
items = data.get("value") if isinstance(data, dict) else None
927+
if isinstance(items, list) and items:
928+
yield [x for x in items if isinstance(x, dict)]
929+
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink") if isinstance(data, dict) else None
930+
else:
931+
# Submit the next request before yielding the current page so
932+
# network I/O overlaps with the caller's processing time.
933+
ctx = copy_context()
934+
executor = ThreadPoolExecutor(max_workers=1)
935+
try:
936+
pending = executor.submit(ctx.run, _do_request, base_url, params=params)
937+
while pending is not None:
938+
data = pending.result()
939+
items = data.get("value") if isinstance(data, dict) else None
940+
next_link = (data.get("@odata.nextLink") or data.get("odata.nextLink")) if isinstance(data, dict) else None
941+
pending = executor.submit(ctx.run, _do_request, next_link) if next_link else None
942+
if isinstance(items, list) and items:
943+
yield [x for x in items if isinstance(x, dict)]
944+
finally:
945+
executor.shutdown(wait=False, cancel_futures=True)
923946

924947
# --------------------------- SQL Custom API -------------------------
925948
def _query_sql(self, sql: str) -> list[dict[str, Any]]:

src/PowerPlatform/Dataverse/operations/records.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ def get(
301301
page_size: Optional[int] = None,
302302
count: bool = False,
303303
include_annotations: Optional[str] = None,
304+
prefetch_pages: int = 0,
304305
) -> Iterable[List[Record]]:
305306
"""Fetch multiple records from a Dataverse table with pagination.
306307
@@ -338,6 +339,11 @@ def get(
338339
``Prefer: odata.include-annotations`` header (e.g. ``"*"`` or
339340
``"OData.Community.Display.V1.FormattedValue"``), or ``None``.
340341
:type include_annotations: :class:`str` or None
342+
:param prefetch_pages: When ``1``, the next page is fetched in a
343+
background thread while the caller processes the current page,
344+
overlapping network I/O with processing. ``0`` (default) is fully
345+
sequential. Values above ``1`` are capped at ``1``.
346+
:type prefetch_pages: :class:`int`
341347
342348
:return: Generator yielding pages, where each page is a list of
343349
:class:`~PowerPlatform.Dataverse.models.record.Record` objects.
@@ -370,6 +376,7 @@ def get(
370376
page_size: Optional[int] = None,
371377
count: bool = False,
372378
include_annotations: Optional[str] = None,
379+
prefetch_pages: int = 0,
373380
) -> Union[Record, Iterable[List[Record]]]:
374381
"""Fetch a single record by ID, or fetch multiple records with pagination.
375382
@@ -485,6 +492,7 @@ def _paged() -> Iterable[List[Record]]:
485492
page_size=page_size,
486493
count=count,
487494
include_annotations=include_annotations,
495+
**({"prefetch_pages": prefetch_pages} if prefetch_pages else {}),
488496
):
489497
yield [Record.from_api_response(table, row) for row in page]
490498

tests/unit/data/test_odata_internal.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,116 @@ def test_empty_value_list_yields_nothing(self):
10861086
self.assertEqual(pages, [])
10871087

10881088

1089+
class TestGetMultiplePrefetch(unittest.TestCase):
1090+
"""Behavioral tests for _get_multiple with prefetch_pages=1."""
1091+
1092+
def setUp(self):
1093+
self.od = _make_odata_client()
1094+
self.od._entity_set_from_schema_name = MagicMock(return_value="accounts")
1095+
1096+
def _two_page_responses(self):
1097+
page1 = _mock_response(
1098+
json_data={
1099+
"value": [{"accountid": "id-1"}],
1100+
"@odata.nextLink": "https://example.crm.dynamics.com/api/data/v9.2/accounts?$skiptoken=2",
1101+
},
1102+
text="...",
1103+
)
1104+
page2 = _mock_response(
1105+
json_data={"value": [{"accountid": "id-2"}]},
1106+
text="...",
1107+
)
1108+
self.od._request.side_effect = [page1, page2]
1109+
1110+
def _three_page_responses(self):
1111+
"""Three pages with 2 records each — exercises multi-record pages and full pagination."""
1112+
base = "https://example.crm.dynamics.com/api/data/v9.2/accounts"
1113+
pages = [
1114+
{"value": [{"accountid": "id-1"}, {"accountid": "id-2"}], "@odata.nextLink": f"{base}?$skiptoken=3"},
1115+
{"value": [{"accountid": "id-3"}, {"accountid": "id-4"}], "@odata.nextLink": f"{base}?$skiptoken=5"},
1116+
{"value": [{"accountid": "id-5"}, {"accountid": "id-6"}]},
1117+
]
1118+
self.od._request.side_effect = [_mock_response(json_data=p, text="...") for p in pages]
1119+
1120+
def test_prefetch_returns_same_pages_as_sequential(self):
1121+
"""prefetch_pages=1 yields the same records in the same order as sequential mode."""
1122+
self._two_page_responses()
1123+
pages = list(self.od._get_multiple("account", prefetch_pages=1))
1124+
self.assertEqual(len(pages), 2)
1125+
self.assertEqual(pages[0][0]["accountid"], "id-1")
1126+
self.assertEqual(pages[1][0]["accountid"], "id-2")
1127+
1128+
def test_prefetch_matches_sequential_values_exactly(self):
1129+
"""prefetch_pages=1 returns identical page count, record count, and record order to prefetch_pages=0."""
1130+
# Run sequential first
1131+
self._three_page_responses()
1132+
seq_pages = list(self.od._get_multiple("account", prefetch_pages=0))
1133+
1134+
# Run prefetch with identical mock data
1135+
self._three_page_responses()
1136+
pre_pages = list(self.od._get_multiple("account", prefetch_pages=1))
1137+
1138+
self.assertEqual(len(pre_pages), len(seq_pages), "Page count mismatch")
1139+
seq_ids = [r["accountid"] for page in seq_pages for r in page]
1140+
pre_ids = [r["accountid"] for page in pre_pages for r in page]
1141+
self.assertEqual(pre_ids, seq_ids, "Record order or values differ")
1142+
self.assertEqual(pre_ids, ["id-1", "id-2", "id-3", "id-4", "id-5", "id-6"])
1143+
1144+
def test_prefetch_values_above_one_capped_at_one(self):
1145+
"""prefetch_pages=2 (or any value > 1) behaves identically to prefetch_pages=1."""
1146+
self._two_page_responses()
1147+
pages = list(self.od._get_multiple("account", prefetch_pages=2))
1148+
self.assertEqual(len(pages), 2)
1149+
self.assertEqual(pages[0][0]["accountid"], "id-1")
1150+
self.assertEqual(pages[1][0]["accountid"], "id-2")
1151+
# Both pages should have been fetched (max_workers=1 means sequential requests)
1152+
self.assertEqual(self.od._request.call_count, 2)
1153+
1154+
def test_prefetch_exception_propagates_to_caller(self):
1155+
"""An exception raised by the background request surfaces when the caller consumes the next page."""
1156+
page1 = _mock_response(
1157+
json_data={
1158+
"value": [{"accountid": "id-1"}],
1159+
"@odata.nextLink": "https://example.crm.dynamics.com/api/data/v9.2/accounts?$skiptoken=2",
1160+
},
1161+
text="...",
1162+
)
1163+
self.od._request.side_effect = [page1, RuntimeError("network failure")]
1164+
gen = self.od._get_multiple("account", prefetch_pages=1)
1165+
first = next(gen)
1166+
self.assertEqual(first[0]["accountid"], "id-1")
1167+
with self.assertRaises(RuntimeError):
1168+
next(gen)
1169+
1170+
def test_prefetch_early_close_does_not_hang(self):
1171+
"""Closing the generator early (break) does not deadlock or raise an error."""
1172+
page1 = _mock_response(
1173+
json_data={
1174+
"value": [{"accountid": "id-1"}],
1175+
"@odata.nextLink": "https://example.crm.dynamics.com/api/data/v9.2/accounts?$skiptoken=2",
1176+
},
1177+
text="...",
1178+
)
1179+
page2 = _mock_response(
1180+
json_data={"value": [{"accountid": "id-2"}]},
1181+
text="...",
1182+
)
1183+
self.od._request.side_effect = [page1, page2]
1184+
gen = self.od._get_multiple("account", prefetch_pages=1)
1185+
first = next(gen)
1186+
self.assertEqual(first[0]["accountid"], "id-1")
1187+
gen.close() # should not raise or hang
1188+
1189+
def test_prefetch_single_page_no_nextlink(self):
1190+
"""prefetch_pages=1 with a single page (no nextLink) yields exactly one page."""
1191+
data = {"value": [{"accountid": "id-1"}]}
1192+
self.od._request.return_value = _mock_response(json_data=data, text=str(data))
1193+
pages = list(self.od._get_multiple("account", prefetch_pages=1))
1194+
self.assertEqual(len(pages), 1)
1195+
self.assertEqual(pages[0][0]["accountid"], "id-1")
1196+
self.od._request.assert_called_once()
1197+
1198+
10891199
class TestQuerySql(unittest.TestCase):
10901200
"""Unit tests for _ODataClient._query_sql."""
10911201

0 commit comments

Comments
 (0)