Feat: enriches Notion connector (#11414)

### What problem does this PR solve?

Enriches rich text (links, mentions, equations), flags to-do blocks with
[x]/[ ], captures block-level equations, builds table HTML, downloads
attachments.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Yongteng Lei
2025-11-20 19:51:37 +08:00
committed by GitHub
parent 820934fc77
commit 065917bf1c

View File

@ -1,38 +1,45 @@
import html
import logging import logging
from collections.abc import Generator from collections.abc import Generator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional from typing import Any, Optional
from urllib.parse import urlparse
from retry import retry from retry import retry
from common.data_source.config import ( from common.data_source.config import (
INDEX_BATCH_SIZE, INDEX_BATCH_SIZE,
DocumentSource, NOTION_CONNECTOR_DISABLE_RECURSIVE_PAGE_LOOKUP NOTION_CONNECTOR_DISABLE_RECURSIVE_PAGE_LOOKUP,
DocumentSource,
)
from common.data_source.exceptions import (
ConnectorMissingCredentialError,
ConnectorValidationError,
CredentialExpiredError,
InsufficientPermissionsError,
UnexpectedValidationError,
) )
from common.data_source.interfaces import ( from common.data_source.interfaces import (
LoadConnector, LoadConnector,
PollConnector, PollConnector,
SecondsSinceUnixEpoch SecondsSinceUnixEpoch,
) )
from common.data_source.models import ( from common.data_source.models import (
Document, Document,
TextSection, GenerateDocumentsOutput GenerateDocumentsOutput,
)
from common.data_source.exceptions import (
ConnectorValidationError,
CredentialExpiredError,
InsufficientPermissionsError,
UnexpectedValidationError, ConnectorMissingCredentialError
)
from common.data_source.models import (
NotionPage,
NotionBlock, NotionBlock,
NotionSearchResponse NotionPage,
NotionSearchResponse,
TextSection,
) )
from common.data_source.utils import ( from common.data_source.utils import (
rl_requests,
batch_generator, batch_generator,
datetime_from_string,
fetch_notion_data, fetch_notion_data,
filter_pages_by_time,
properties_to_str, properties_to_str,
filter_pages_by_time, datetime_from_string rl_requests,
) )
@ -61,11 +68,9 @@ class NotionConnector(LoadConnector, PollConnector):
self.recursive_index_enabled = recursive_index_enabled or bool(root_page_id) self.recursive_index_enabled = recursive_index_enabled or bool(root_page_id)
@retry(tries=3, delay=1, backoff=2) @retry(tries=3, delay=1, backoff=2)
def _fetch_child_blocks( def _fetch_child_blocks(self, block_id: str, cursor: Optional[str] = None) -> dict[str, Any] | None:
self, block_id: str, cursor: Optional[str] = None
) -> dict[str, Any] | None:
"""Fetch all child blocks via the Notion API.""" """Fetch all child blocks via the Notion API."""
logging.debug(f"Fetching children of block with ID '{block_id}'") logging.debug(f"[Notion]: Fetching children of block with ID {block_id}")
block_url = f"https://api.notion.com/v1/blocks/{block_id}/children" block_url = f"https://api.notion.com/v1/blocks/{block_id}/children"
query_params = {"start_cursor": cursor} if cursor else None query_params = {"start_cursor": cursor} if cursor else None
@ -79,49 +84,42 @@ class NotionConnector(LoadConnector, PollConnector):
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
except Exception as e: except Exception as e:
if hasattr(e, 'response') and e.response.status_code == 404: if hasattr(e, "response") and e.response.status_code == 404:
logging.error( logging.error(f"[Notion]: Unable to access block with ID {block_id}. This is likely due to the block not being shared with the integration.")
f"Unable to access block with ID '{block_id}'. "
f"This is likely due to the block not being shared with the integration."
)
return None return None
else: else:
logging.exception(f"Error fetching blocks: {e}") logging.exception(f"[Notion]: Error fetching blocks: {e}")
raise raise
@retry(tries=3, delay=1, backoff=2) @retry(tries=3, delay=1, backoff=2)
def _fetch_page(self, page_id: str) -> NotionPage: def _fetch_page(self, page_id: str) -> NotionPage:
"""Fetch a page from its ID via the Notion API.""" """Fetch a page from its ID via the Notion API."""
logging.debug(f"Fetching page for ID '{page_id}'") logging.debug(f"[Notion]: Fetching page for ID {page_id}")
page_url = f"https://api.notion.com/v1/pages/{page_id}" page_url = f"https://api.notion.com/v1/pages/{page_id}"
try: try:
data = fetch_notion_data(page_url, self.headers, "GET") data = fetch_notion_data(page_url, self.headers, "GET")
return NotionPage(**data) return NotionPage(**data)
except Exception as e: except Exception as e:
logging.warning(f"Failed to fetch page, trying database for ID '{page_id}': {e}") logging.warning(f"[Notion]: Failed to fetch page, trying database for ID {page_id}: {e}")
return self._fetch_database_as_page(page_id) return self._fetch_database_as_page(page_id)
@retry(tries=3, delay=1, backoff=2) @retry(tries=3, delay=1, backoff=2)
def _fetch_database_as_page(self, database_id: str) -> NotionPage: def _fetch_database_as_page(self, database_id: str) -> NotionPage:
"""Attempt to fetch a database as a page.""" """Attempt to fetch a database as a page."""
logging.debug(f"Fetching database for ID '{database_id}' as a page") logging.debug(f"[Notion]: Fetching database for ID {database_id} as a page")
database_url = f"https://api.notion.com/v1/databases/{database_id}" database_url = f"https://api.notion.com/v1/databases/{database_id}"
data = fetch_notion_data(database_url, self.headers, "GET") data = fetch_notion_data(database_url, self.headers, "GET")
database_name = data.get("title") database_name = data.get("title")
database_name = ( database_name = database_name[0].get("text", {}).get("content") if database_name else None
database_name[0].get("text", {}).get("content") if database_name else None
)
return NotionPage(**data, database_name=database_name) return NotionPage(**data, database_name=database_name)
@retry(tries=3, delay=1, backoff=2) @retry(tries=3, delay=1, backoff=2)
def _fetch_database( def _fetch_database(self, database_id: str, cursor: Optional[str] = None) -> dict[str, Any]:
self, database_id: str, cursor: Optional[str] = None
) -> dict[str, Any]:
"""Fetch a database from its ID via the Notion API.""" """Fetch a database from its ID via the Notion API."""
logging.debug(f"Fetching database for ID '{database_id}'") logging.debug(f"[Notion]: Fetching database for ID {database_id}")
block_url = f"https://api.notion.com/v1/databases/{database_id}/query" block_url = f"https://api.notion.com/v1/databases/{database_id}/query"
body = {"start_cursor": cursor} if cursor else None body = {"start_cursor": cursor} if cursor else None
@ -129,17 +127,12 @@ class NotionConnector(LoadConnector, PollConnector):
data = fetch_notion_data(block_url, self.headers, "POST", body) data = fetch_notion_data(block_url, self.headers, "POST", body)
return data return data
except Exception as e: except Exception as e:
if hasattr(e, 'response') and e.response.status_code in [404, 400]: if hasattr(e, "response") and e.response.status_code in [404, 400]:
logging.error( logging.error(f"[Notion]: Unable to access database with ID {database_id}. This is likely due to the database not being shared with the integration.")
f"Unable to access database with ID '{database_id}'. "
f"This is likely due to the database not being shared with the integration."
)
return {"results": [], "next_cursor": None} return {"results": [], "next_cursor": None}
raise raise
def _read_pages_from_database( def _read_pages_from_database(self, database_id: str) -> tuple[list[NotionBlock], list[str]]:
self, database_id: str
) -> tuple[list[NotionBlock], list[str]]:
"""Returns a list of top level blocks and all page IDs in the database.""" """Returns a list of top level blocks and all page IDs in the database."""
result_blocks: list[NotionBlock] = [] result_blocks: list[NotionBlock] = []
result_pages: list[str] = [] result_pages: list[str] = []
@ -158,10 +151,10 @@ class NotionConnector(LoadConnector, PollConnector):
if self.recursive_index_enabled: if self.recursive_index_enabled:
if obj_type == "page": if obj_type == "page":
logging.debug(f"Found page with ID '{obj_id}' in database '{database_id}'") logging.debug(f"[Notion]: Found page with ID {obj_id} in database {database_id}")
result_pages.append(result["id"]) result_pages.append(result["id"])
elif obj_type == "database": elif obj_type == "database":
logging.debug(f"Found database with ID '{obj_id}' in database '{database_id}'") logging.debug(f"[Notion]: Found database with ID {obj_id} in database {database_id}")
_, child_pages = self._read_pages_from_database(obj_id) _, child_pages = self._read_pages_from_database(obj_id)
result_pages.extend(child_pages) result_pages.extend(child_pages)
@ -172,44 +165,229 @@ class NotionConnector(LoadConnector, PollConnector):
return result_blocks, result_pages return result_blocks, result_pages
def _read_blocks(self, base_block_id: str) -> tuple[list[NotionBlock], list[str]]: def _extract_rich_text(self, rich_text_array: list[dict[str, Any]]) -> str:
"""Reads all child blocks for the specified block, returns blocks and child page ids.""" collected_text: list[str] = []
for rich_text in rich_text_array:
content = ""
r_type = rich_text.get("type")
if r_type == "equation":
expr = rich_text.get("equation", {}).get("expression")
if expr:
content = expr
elif r_type == "mention":
mention = rich_text.get("mention", {}) or {}
mention_type = mention.get("type")
mention_value = mention.get(mention_type, {}) if mention_type else {}
if mention_type == "date":
start = mention_value.get("start")
end = mention_value.get("end")
if start and end:
content = f"{start} - {end}"
elif start:
content = start
elif mention_type in {"page", "database"}:
content = mention_value.get("id", rich_text.get("plain_text", ""))
elif mention_type == "link_preview":
content = mention_value.get("url", rich_text.get("plain_text", ""))
else:
content = rich_text.get("plain_text", "") or str(mention_value)
else:
if rich_text.get("plain_text"):
content = rich_text["plain_text"]
elif "text" in rich_text and rich_text["text"].get("content"):
content = rich_text["text"]["content"]
href = rich_text.get("href")
if content and href:
content = f"{content} ({href})"
if content:
collected_text.append(content)
return "".join(collected_text).strip()
def _build_table_html(self, table_block_id: str) -> str | None:
rows: list[str] = []
cursor = None
while True:
data = self._fetch_child_blocks(table_block_id, cursor)
if data is None:
break
for result in data["results"]:
if result.get("type") != "table_row":
continue
cells_html: list[str] = []
for cell in result["table_row"].get("cells", []):
cell_text = self._extract_rich_text(cell)
cell_html = html.escape(cell_text) if cell_text else ""
cells_html.append(f"<td>{cell_html}</td>")
rows.append(f"<tr>{''.join(cells_html)}</tr>")
if data.get("next_cursor") is None:
break
cursor = data["next_cursor"]
if not rows:
return None
return "<table>\n" + "\n".join(rows) + "\n</table>"
def _download_file(self, url: str) -> bytes | None:
try:
response = rl_requests.get(url, timeout=60)
response.raise_for_status()
return response.content
except Exception as exc:
logging.warning(f"[Notion]: Failed to download Notion file from {url}: {exc}")
return None
def _extract_file_metadata(self, result_obj: dict[str, Any], block_id: str) -> tuple[str | None, str, str | None]:
file_source_type = result_obj.get("type")
file_source = result_obj.get(file_source_type, {}) if file_source_type else {}
url = file_source.get("url")
name = result_obj.get("name") or file_source.get("name")
if url and not name:
parsed_name = Path(urlparse(url).path).name
name = parsed_name or f"notion_file_{block_id}"
elif not name:
name = f"notion_file_{block_id}"
caption = self._extract_rich_text(result_obj.get("caption", [])) if "caption" in result_obj else None
return url, name, caption
def _build_attachment_document(
self,
block_id: str,
url: str,
name: str,
caption: Optional[str],
page_last_edited_time: Optional[str],
) -> Document | None:
file_bytes = self._download_file(url)
if file_bytes is None:
return None
extension = Path(name).suffix or Path(urlparse(url).path).suffix or ".bin"
if extension and not extension.startswith("."):
extension = f".{extension}"
if not extension:
extension = ".bin"
updated_at = datetime_from_string(page_last_edited_time) if page_last_edited_time else datetime.now(timezone.utc)
semantic_identifier = caption or name or f"Notion file {block_id}"
return Document(
id=block_id,
blob=file_bytes,
source=DocumentSource.NOTION,
semantic_identifier=semantic_identifier,
extension=extension,
size_bytes=len(file_bytes),
doc_updated_at=updated_at,
)
def _read_blocks(self, base_block_id: str, page_last_edited_time: Optional[str] = None) -> tuple[list[NotionBlock], list[str], list[Document]]:
result_blocks: list[NotionBlock] = [] result_blocks: list[NotionBlock] = []
child_pages: list[str] = [] child_pages: list[str] = []
attachments: list[Document] = []
cursor = None cursor = None
while True: while True:
data = self._fetch_child_blocks(base_block_id, cursor) data = self._fetch_child_blocks(base_block_id, cursor)
if data is None: if data is None:
return result_blocks, child_pages return result_blocks, child_pages, attachments
for result in data["results"]: for result in data["results"]:
logging.debug(f"Found child block for block with ID '{base_block_id}': {result}") logging.debug(f"[Notion]: Found child block for block with ID {base_block_id}: {result}")
result_block_id = result["id"] result_block_id = result["id"]
result_type = result["type"] result_type = result["type"]
result_obj = result[result_type] result_obj = result[result_type]
if result_type in ["ai_block", "unsupported", "external_object_instance_page"]: if result_type in ["ai_block", "unsupported", "external_object_instance_page"]:
logging.warning(f"Skipping unsupported block type '{result_type}'") logging.warning(f"[Notion]: Skipping unsupported block type {result_type}")
continue
if result_type == "table":
table_html = self._build_table_html(result_block_id)
if table_html:
result_blocks.append(
NotionBlock(
id=result_block_id,
text=table_html,
prefix="\n\n",
)
)
continue
if result_type == "equation":
expr = result_obj.get("expression")
if expr:
result_blocks.append(
NotionBlock(
id=result_block_id,
text=expr,
prefix="\n",
)
)
continue continue
cur_result_text_arr = [] cur_result_text_arr = []
if "rich_text" in result_obj: if "rich_text" in result_obj:
for rich_text in result_obj["rich_text"]: text = self._extract_rich_text(result_obj["rich_text"])
if "text" in rich_text: if text:
text = rich_text["text"]["content"] cur_result_text_arr.append(text)
cur_result_text_arr.append(text)
if result_type == "bulleted_list_item":
if cur_result_text_arr:
cur_result_text_arr[0] = f"- {cur_result_text_arr[0]}"
else:
cur_result_text_arr = ["- "]
if result_type == "numbered_list_item":
if cur_result_text_arr:
cur_result_text_arr[0] = f"1. {cur_result_text_arr[0]}"
else:
cur_result_text_arr = ["1. "]
if result_type == "to_do":
checked = result_obj.get("checked")
checkbox_prefix = "[x]" if checked else "[ ]"
if cur_result_text_arr:
cur_result_text_arr = [f"{checkbox_prefix} {cur_result_text_arr[0]}"] + cur_result_text_arr[1:]
else:
cur_result_text_arr = [checkbox_prefix]
if result_type in {"file", "image", "pdf", "video", "audio"}:
file_url, file_name, caption = self._extract_file_metadata(result_obj, result_block_id)
if file_url:
attachment_doc = self._build_attachment_document(
block_id=result_block_id,
url=file_url,
name=file_name,
caption=caption,
page_last_edited_time=page_last_edited_time,
)
if attachment_doc:
attachments.append(attachment_doc)
attachment_label = caption or file_name
if attachment_label:
cur_result_text_arr.append(f"{result_type.capitalize()}: {attachment_label}")
if result["has_children"]: if result["has_children"]:
if result_type == "child_page": if result_type == "child_page":
child_pages.append(result_block_id) child_pages.append(result_block_id)
else: else:
logging.debug(f"Entering sub-block: {result_block_id}") logging.debug(f"[Notion]: Entering sub-block: {result_block_id}")
subblocks, subblock_child_pages = self._read_blocks(result_block_id) subblocks, subblock_child_pages, subblock_attachments = self._read_blocks(result_block_id, page_last_edited_time)
logging.debug(f"Finished sub-block: {result_block_id}") logging.debug(f"[Notion]: Finished sub-block: {result_block_id}")
result_blocks.extend(subblocks) result_blocks.extend(subblocks)
child_pages.extend(subblock_child_pages) child_pages.extend(subblock_child_pages)
attachments.extend(subblock_attachments)
if result_type == "child_database": if result_type == "child_database":
inner_blocks, inner_child_pages = self._read_pages_from_database(result_block_id) inner_blocks, inner_child_pages = self._read_pages_from_database(result_block_id)
@ -231,7 +409,7 @@ class NotionConnector(LoadConnector, PollConnector):
cursor = data["next_cursor"] cursor = data["next_cursor"]
return result_blocks, child_pages return result_blocks, child_pages, attachments
def _read_page_title(self, page: NotionPage) -> Optional[str]: def _read_page_title(self, page: NotionPage) -> Optional[str]:
"""Extracts the title from a Notion page.""" """Extracts the title from a Notion page."""
@ -245,9 +423,7 @@ class NotionConnector(LoadConnector, PollConnector):
return None return None
def _read_pages( def _read_pages(self, pages: list[NotionPage], start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None) -> Generator[Document, None, None]:
self, pages: list[NotionPage]
) -> Generator[Document, None, None]:
"""Reads pages for rich text content and generates Documents.""" """Reads pages for rich text content and generates Documents."""
all_child_page_ids: list[str] = [] all_child_page_ids: list[str] = []
@ -255,11 +431,17 @@ class NotionConnector(LoadConnector, PollConnector):
if isinstance(page, dict): if isinstance(page, dict):
page = NotionPage(**page) page = NotionPage(**page)
if page.id in self.indexed_pages: if page.id in self.indexed_pages:
logging.debug(f"Already indexed page with ID '{page.id}'. Skipping.") logging.debug(f"[Notion]: Already indexed page with ID {page.id}. Skipping.")
continue continue
logging.info(f"Reading page with ID '{page.id}', with url {page.url}") if start is not None and end is not None:
page_blocks, child_page_ids = self._read_blocks(page.id) page_ts = datetime_from_string(page.last_edited_time).timestamp()
if not (page_ts > start and page_ts <= end):
logging.debug(f"[Notion]: Skipping page {page.id} outside polling window.")
continue
logging.info(f"[Notion]: Reading page with ID {page.id}, with url {page.url}")
page_blocks, child_page_ids, attachment_docs = self._read_blocks(page.id, page.last_edited_time)
all_child_page_ids.extend(child_page_ids) all_child_page_ids.extend(child_page_ids)
self.indexed_pages.add(page.id) self.indexed_pages.add(page.id)
@ -268,14 +450,12 @@ class NotionConnector(LoadConnector, PollConnector):
if not page_blocks: if not page_blocks:
if not raw_page_title: if not raw_page_title:
logging.warning(f"No blocks OR title found for page with ID '{page.id}'. Skipping.") logging.warning(f"[Notion]: No blocks OR title found for page with ID {page.id}. Skipping.")
continue continue
text = page_title text = page_title
if page.properties: if page.properties:
text += "\n\n" + "\n".join( text += "\n\n" + "\n".join([f"{key}: {value}" for key, value in page.properties.items()])
[f"{key}: {value}" for key, value in page.properties.items()]
)
sections = [TextSection(link=page.url, text=text)] sections = [TextSection(link=page.url, text=text)]
else: else:
sections = [ sections = [
@ -286,45 +466,39 @@ class NotionConnector(LoadConnector, PollConnector):
for block in page_blocks for block in page_blocks
] ]
blob = ("\n".join([sec.text for sec in sections])).encode("utf-8") joined_text = "\n".join(sec.text for sec in sections)
blob = joined_text.encode("utf-8")
yield Document( yield Document(
id=page.id, id=page.id, blob=blob, source=DocumentSource.NOTION, semantic_identifier=page_title, extension=".txt", size_bytes=len(blob), doc_updated_at=datetime_from_string(page.last_edited_time)
blob=blob,
source=DocumentSource.NOTION,
semantic_identifier=page_title,
extension=".txt",
size_bytes=len(blob),
doc_updated_at=datetime_from_string(page.last_edited_time)
) )
for attachment_doc in attachment_docs:
yield attachment_doc
if self.recursive_index_enabled and all_child_page_ids: if self.recursive_index_enabled and all_child_page_ids:
for child_page_batch_ids in batch_generator(all_child_page_ids, INDEX_BATCH_SIZE): for child_page_batch_ids in batch_generator(all_child_page_ids, INDEX_BATCH_SIZE):
child_page_batch = [ child_page_batch = [self._fetch_page(page_id) for page_id in child_page_batch_ids if page_id not in self.indexed_pages]
self._fetch_page(page_id) yield from self._read_pages(child_page_batch, start, end)
for page_id in child_page_batch_ids
if page_id not in self.indexed_pages
]
yield from self._read_pages(child_page_batch)
@retry(tries=3, delay=1, backoff=2) @retry(tries=3, delay=1, backoff=2)
def _search_notion(self, query_dict: dict[str, Any]) -> NotionSearchResponse: def _search_notion(self, query_dict: dict[str, Any]) -> NotionSearchResponse:
"""Search for pages from a Notion database.""" """Search for pages from a Notion database."""
logging.debug(f"Searching for pages in Notion with query_dict: {query_dict}") logging.debug(f"[Notion]: Searching for pages in Notion with query_dict: {query_dict}")
data = fetch_notion_data("https://api.notion.com/v1/search", self.headers, "POST", query_dict) data = fetch_notion_data("https://api.notion.com/v1/search", self.headers, "POST", query_dict)
return NotionSearchResponse(**data) return NotionSearchResponse(**data)
def _recursive_load(self) -> Generator[list[Document], None, None]: def _recursive_load(self, start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None) -> Generator[list[Document], None, None]:
"""Recursively load pages starting from root page ID.""" """Recursively load pages starting from root page ID."""
if self.root_page_id is None or not self.recursive_index_enabled: if self.root_page_id is None or not self.recursive_index_enabled:
raise RuntimeError("Recursive page lookup is not enabled") raise RuntimeError("Recursive page lookup is not enabled")
logging.info(f"Recursively loading pages from Notion based on root page with ID: {self.root_page_id}") logging.info(f"[Notion]: Recursively loading pages from Notion based on root page with ID: {self.root_page_id}")
pages = [self._fetch_page(page_id=self.root_page_id)] pages = [self._fetch_page(page_id=self.root_page_id)]
yield from batch_generator(self._read_pages(pages), self.batch_size) yield from batch_generator(self._read_pages(pages, start, end), self.batch_size)
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
"""Applies integration token to headers.""" """Applies integration token to headers."""
self.headers["Authorization"] = f'Bearer {credentials["notion_integration_token"]}' self.headers["Authorization"] = f"Bearer {credentials['notion_integration_token']}"
return None return None
def load_from_state(self) -> GenerateDocumentsOutput: def load_from_state(self) -> GenerateDocumentsOutput:
@ -348,12 +522,10 @@ class NotionConnector(LoadConnector, PollConnector):
else: else:
break break
def poll_source( def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput:
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
"""Poll Notion for updated pages within a time period.""" """Poll Notion for updated pages within a time period."""
if self.recursive_index_enabled and self.root_page_id: if self.recursive_index_enabled and self.root_page_id:
yield from self._recursive_load() yield from self._recursive_load(start, end)
return return
query_dict = { query_dict = {
@ -367,7 +539,7 @@ class NotionConnector(LoadConnector, PollConnector):
pages = filter_pages_by_time(db_res.results, start, end, "last_edited_time") pages = filter_pages_by_time(db_res.results, start, end, "last_edited_time")
if pages: if pages:
yield from batch_generator(self._read_pages(pages), self.batch_size) yield from batch_generator(self._read_pages(pages, start, end), self.batch_size)
if db_res.has_more: if db_res.has_more:
query_dict["start_cursor"] = db_res.next_cursor query_dict["start_cursor"] = db_res.next_cursor
else: else: