diff --git a/common/data_source/config.py b/common/data_source/config.py index 196d9ed3e..02684dbac 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -170,6 +170,10 @@ CONFLUENCE_TIMEZONE_OFFSET = float( os.environ.get("CONFLUENCE_TIMEZONE_OFFSET", get_current_tz_offset()) ) +CONFLUENCE_SYNC_TIME_BUFFER_SECONDS = int( + os.environ.get("CONFLUENCE_SYNC_TIME_BUFFER_SECONDS", ONE_DAY) +) + GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD = int( os.environ.get("GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024) ) diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py index 56cf7865a..aed16ad2b 100644 --- a/common/data_source/confluence_connector.py +++ b/common/data_source/confluence_connector.py @@ -20,6 +20,7 @@ from requests.exceptions import HTTPError from common.data_source.config import INDEX_BATCH_SIZE, DocumentSource, CONTINUE_ON_CONNECTOR_FAILURE, \ CONFLUENCE_CONNECTOR_LABELS_TO_SKIP, CONFLUENCE_TIMEZONE_OFFSET, CONFLUENCE_CONNECTOR_USER_PROFILES_OVERRIDE, \ + CONFLUENCE_SYNC_TIME_BUFFER_SECONDS, \ OAUTH_CONFLUENCE_CLOUD_CLIENT_ID, OAUTH_CONFLUENCE_CLOUD_CLIENT_SECRET, _DEFAULT_PAGINATION_LIMIT, \ _PROBLEMATIC_EXPANSIONS, _REPLACEMENT_EXPANSIONS, _USER_NOT_FOUND, _COMMENT_EXPANSION_FIELDS, \ _ATTACHMENT_EXPANSION_FIELDS, _PAGE_EXPANSION_FIELDS, ONE_DAY, ONE_HOUR, _RESTRICTIONS_EXPANSION_FIELDS, \ @@ -1289,6 +1290,7 @@ class ConfluenceConnector( # pages. labels_to_skip: list[str] = CONFLUENCE_CONNECTOR_LABELS_TO_SKIP, timezone_offset: float = CONFLUENCE_TIMEZONE_OFFSET, + time_buffer_seconds: int = CONFLUENCE_SYNC_TIME_BUFFER_SECONDS, scoped_token: bool = False, ) -> None: self.wiki_base = wiki_base @@ -1300,6 +1302,7 @@ class ConfluenceConnector( self.batch_size = batch_size self.labels_to_skip = labels_to_skip self.timezone_offset = timezone_offset + self.time_buffer_seconds = max(0, time_buffer_seconds) self.scoped_token = scoped_token self._confluence_client: OnyxConfluence | None = None self._low_timeout_confluence_client: OnyxConfluence | None = None @@ -1356,6 +1359,24 @@ class ConfluenceConnector( logging.info(f"Setting allow_images to {value}.") self.allow_images = value + def _adjust_start_for_query( + self, start: SecondsSinceUnixEpoch | None + ) -> SecondsSinceUnixEpoch | None: + if not start or start <= 0: + return start + if self.time_buffer_seconds <= 0: + return start + return max(0.0, start - self.time_buffer_seconds) + + def _is_newer_than_start( + self, doc_time: datetime | None, start: SecondsSinceUnixEpoch | None + ) -> bool: + if not start or start <= 0: + return True + if doc_time is None: + return True + return doc_time.timestamp() > start + @property def confluence_client(self) -> OnyxConfluence: if self._confluence_client is None: @@ -1414,9 +1435,10 @@ class ConfluenceConnector( """ page_query = self.base_cql_page_query + self.cql_label_filter # Add time filters - if start: + query_start = self._adjust_start_for_query(start) + if query_start: formatted_start_time = datetime.fromtimestamp( - start, tz=self.timezone + query_start, tz=self.timezone ).strftime("%Y-%m-%d %H:%M") page_query += f" and lastmodified >= '{formatted_start_time}'" if end: @@ -1436,10 +1458,12 @@ class ConfluenceConnector( ) -> str: attachment_query = f"type=attachment and container='{confluence_page_id}'" attachment_query += self.cql_label_filter + # Add time filters to avoid reprocessing unchanged attachments during refresh - if start: + query_start = self._adjust_start_for_query(start) + if query_start: formatted_start_time = datetime.fromtimestamp( - start, tz=self.timezone + query_start, tz=self.timezone ).strftime("%Y-%m-%d %H:%M") attachment_query += f" and lastmodified >= '{formatted_start_time}'" if end: @@ -1447,6 +1471,7 @@ class ConfluenceConnector( "%Y-%m-%d %H:%M" ) attachment_query += f" and lastmodified <= '{formatted_end_time}'" + attachment_query += " order by lastmodified asc" return attachment_query @@ -1668,7 +1693,8 @@ class ConfluenceConnector( ), primary_owners=primary_owners, ) - attachment_docs.append(attachment_doc) + if self._is_newer_than_start(attachment_doc.doc_updated_at, start): + attachment_docs.append(attachment_doc) except Exception as e: logging.error( f"Failed to extract/summarize attachment {attachment['title']}", @@ -1729,7 +1755,8 @@ class ConfluenceConnector( continue # yield completed document (or failure) - yield doc_or_failure + if self._is_newer_than_start(doc_or_failure.doc_updated_at, start): + yield doc_or_failure # Now get attachments for that page: attachment_docs, attachment_failures = self._fetch_page_attachments(