diff --git a/common/constants.py b/common/constants.py index 23a755059..d99c09952 100644 --- a/common/constants.py +++ b/common/constants.py @@ -133,6 +133,7 @@ class FileSource(StrEnum): GITHUB = "github" GITLAB = "gitlab" IMAP = "imap" + ZENDESK = "zendesk" class PipelineTaskType(StrEnum): PARSE = "Parse" diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index 2619e779d..9fed196ab 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -39,6 +39,7 @@ from .moodle_connector import MoodleConnector from .airtable_connector import AirtableConnector from .asana_connector import AsanaConnector from .imap_connector import ImapConnector +from .zendesk_connector import ZendeskConnector from .config import BlobType, DocumentSource from .models import Document, TextSection, ImageSection, BasicExpertInfo from .exceptions import ( @@ -76,5 +77,6 @@ __all__ = [ "UnexpectedValidationError", "AirtableConnector", "AsanaConnector", - "ImapConnector" + "ImapConnector", + "ZendeskConnector", ] diff --git a/common/data_source/config.py b/common/data_source/config.py index bca13b5be..64b30a051 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -58,8 +58,9 @@ class DocumentSource(str, Enum): GITHUB = "github" GITLAB = "gitlab" IMAP = "imap" + ZENDESK = "zendesk" + - class FileOrigin(str, Enum): """File origins""" CONNECTOR = "connector" @@ -271,6 +272,10 @@ IMAP_CONNECTOR_SIZE_THRESHOLD = int( os.environ.get("IMAP_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024) ) +ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS = os.environ.get( + "ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS", "" +).split(",") + _USER_NOT_FOUND = "Unknown Confluence User" _COMMENT_EXPANSION_FIELDS = ["body.storage.value"] diff --git a/common/data_source/utils.py b/common/data_source/utils.py index f69ecbd78..da500f055 100644 --- a/common/data_source/utils.py +++ b/common/data_source/utils.py @@ -1149,3 +1149,101 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R future_to_index[executor.submit(_next_or_none, ind, gens[ind])] = next_ind next_ind += 1 del future_to_index[future] + +F = TypeVar("F", bound=Callable[..., Any]) + +class _RateLimitDecorator: + """Builds a generic wrapper/decorator for calls to external APIs that + prevents making more than `max_calls` requests per `period` + + Implementation inspired by the `ratelimit` library: + https://github.com/tomasbasham/ratelimit. + + NOTE: is not thread safe. + """ + + def __init__( + self, + max_calls: int, + period: float, # in seconds + sleep_time: float = 2, # in seconds + sleep_backoff: float = 2, # applies exponential backoff + max_num_sleep: int = 0, + ): + self.max_calls = max_calls + self.period = period + self.sleep_time = sleep_time + self.sleep_backoff = sleep_backoff + self.max_num_sleep = max_num_sleep + + self.call_history: list[float] = [] + self.curr_calls = 0 + + def __call__(self, func: F) -> F: + @wraps(func) + def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any: + # cleanup calls which are no longer relevant + self._cleanup() + + # check if we've exceeded the rate limit + sleep_cnt = 0 + while len(self.call_history) == self.max_calls: + sleep_time = self.sleep_time * (self.sleep_backoff**sleep_cnt) + logging.warning( + f"Rate limit exceeded for function {func.__name__}. " + f"Waiting {sleep_time} seconds before retrying." 
+ ) + time.sleep(sleep_time) + sleep_cnt += 1 + if self.max_num_sleep != 0 and sleep_cnt >= self.max_num_sleep: + raise RateLimitTriedTooManyTimesError( + f"Exceeded '{self.max_num_sleep}' retries for function '{func.__name__}'" + ) + + self._cleanup() + + # add the current call to the call history + self.call_history.append(time.monotonic()) + return func(*args, **kwargs) + + return cast(F, wrapped_func) + + def _cleanup(self) -> None: + curr_time = time.monotonic() + time_to_expire_before = curr_time - self.period + self.call_history = [ + call_time + for call_time in self.call_history + if call_time > time_to_expire_before + ] + +rate_limit_builder = _RateLimitDecorator + +def retry_builder( + tries: int = 20, + delay: float = 0.1, + max_delay: float | None = 60, + backoff: float = 2, + jitter: tuple[float, float] | float = 1, + exceptions: type[Exception] | tuple[type[Exception], ...] = (Exception,), +) -> Callable[[F], F]: + """Builds a generic wrapper/decorator for calls to external APIs that + may fail due to rate limiting, flakes, or other reasons. Applies exponential + backoff with jitter to retry the call.""" + + def retry_with_default(func: F) -> F: + @retry( + tries=tries, + delay=delay, + max_delay=max_delay, + backoff=backoff, + jitter=jitter, + logger=logging.getLogger(__name__), + exceptions=exceptions, + ) + def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any: + return func(*args, **kwargs) + + return cast(F, wrapped_func) + + return retry_with_default \ No newline at end of file diff --git a/common/data_source/zendesk_connector.py b/common/data_source/zendesk_connector.py new file mode 100644 index 000000000..85b3426fe --- /dev/null +++ b/common/data_source/zendesk_connector.py @@ -0,0 +1,667 @@ +import copy +import logging +import time +from collections.abc import Callable +from collections.abc import Iterator +from typing import Any + +import requests +from pydantic import BaseModel +from requests.exceptions import HTTPError +from typing_extensions import override + +from common.data_source.config import ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS, DocumentSource +from common.data_source.exceptions import ConnectorValidationError, CredentialExpiredError, InsufficientPermissionsError +from common.data_source.html_utils import parse_html_page_basic +from common.data_source.interfaces import CheckpointOutput, CheckpointOutputWrapper, CheckpointedConnector, IndexingHeartbeatInterface, SlimConnectorWithPermSync +from common.data_source.models import BasicExpertInfo, ConnectorCheckpoint, ConnectorFailure, Document, DocumentFailure, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument +from common.data_source.utils import retry_builder, time_str_to_utc,rate_limit_builder + +MAX_PAGE_SIZE = 30 # Zendesk API maximum +MAX_AUTHOR_MAP_SIZE = 50_000 # Reset author map cache if it gets too large +_SLIM_BATCH_SIZE = 1000 + + +class ZendeskCredentialsNotSetUpError(PermissionError): + def __init__(self) -> None: + super().__init__( + "Zendesk Credentials are not set up, was load_credentials called?" 
+ ) + + +class ZendeskClient: + def __init__( + self, + subdomain: str, + email: str, + token: str, + calls_per_minute: int | None = None, + ): + self.base_url = f"https://{subdomain}.zendesk.com/api/v2" + self.auth = (f"{email}/token", token) + self.make_request = request_with_rate_limit(self, calls_per_minute) + + +def request_with_rate_limit( + client: ZendeskClient, max_calls_per_minute: int | None = None +) -> Callable[[str, dict[str, Any]], dict[str, Any]]: + @retry_builder() + @( + rate_limit_builder(max_calls=max_calls_per_minute, period=60) + if max_calls_per_minute + else lambda x: x + ) + def make_request(endpoint: str, params: dict[str, Any]) -> dict[str, Any]: + response = requests.get( + f"{client.base_url}/{endpoint}", auth=client.auth, params=params + ) + + if response.status_code == 429: + retry_after = response.headers.get("Retry-After") + if retry_after is not None: + # Sleep for the duration indicated by the Retry-After header + time.sleep(int(retry_after)) + + elif ( + response.status_code == 403 + and response.json().get("error") == "SupportProductInactive" + ): + return response.json() + + response.raise_for_status() + return response.json() + + return make_request + + +class ZendeskPageResponse(BaseModel): + data: list[dict[str, Any]] + meta: dict[str, Any] + has_more: bool + + +def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]: + content_tags: dict[str, str] = {} + params = {"page[size]": MAX_PAGE_SIZE} + + try: + while True: + data = client.make_request("guide/content_tags", params) + + for tag in data.get("records", []): + content_tags[tag["id"]] = tag["name"] + + # Check if there are more pages + if data.get("meta", {}).get("has_more", False): + params["page[after]"] = data["meta"]["after_cursor"] + else: + break + + return content_tags + except Exception as e: + raise Exception(f"Error fetching content tags: {str(e)}") + + +def _get_articles( + client: ZendeskClient, start_time: int | None = None, page_size: int = MAX_PAGE_SIZE +) -> Iterator[dict[str, Any]]: + params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"} + if start_time is not None: + params["start_time"] = start_time + + while True: + data = client.make_request("help_center/articles", params) + for article in data["articles"]: + yield article + + if not data.get("meta", {}).get("has_more"): + break + params["page[after]"] = data["meta"]["after_cursor"] + + +def _get_article_page( + client: ZendeskClient, + start_time: int | None = None, + after_cursor: str | None = None, + page_size: int = MAX_PAGE_SIZE, +) -> ZendeskPageResponse: + params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"} + if start_time is not None: + params["start_time"] = start_time + if after_cursor is not None: + params["page[after]"] = after_cursor + + data = client.make_request("help_center/articles", params) + return ZendeskPageResponse( + data=data["articles"], + meta=data["meta"], + has_more=bool(data["meta"].get("has_more", False)), + ) + + +def _get_tickets( + client: ZendeskClient, start_time: int | None = None +) -> Iterator[dict[str, Any]]: + params = {"start_time": start_time or 0} + + while True: + data = client.make_request("incremental/tickets.json", params) + for ticket in data["tickets"]: + yield ticket + + if not data.get("end_of_stream", False): + params["start_time"] = data["end_time"] + else: + break + + +# TODO: maybe these don't need to be their own functions? 
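+# _get_tickets_page pulls a single page of Zendesk's incremental ticket export:
+# the caller feeds the returned `end_time` back in as the next call's `start_time`,
+# and `end_of_stream` (surfaced here as `has_more`) marks the end of the export.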
+def _get_tickets_page( + client: ZendeskClient, start_time: int | None = None +) -> ZendeskPageResponse: + params = {"start_time": start_time or 0} + + # NOTE: for some reason zendesk doesn't seem to be respecting the start_time param + # in my local testing with very few tickets. We'll look into it if this becomes an + # issue in larger deployments + data = client.make_request("incremental/tickets.json", params) + if data.get("error") == "SupportProductInactive": + raise ValueError( + "Zendesk Support Product is not active for this account, No tickets to index" + ) + return ZendeskPageResponse( + data=data["tickets"], + meta={"end_time": data["end_time"]}, + has_more=not bool(data.get("end_of_stream", False)), + ) + + +def _fetch_author( + client: ZendeskClient, author_id: str | int +) -> BasicExpertInfo | None: + # Skip fetching if author_id is invalid + # cast to str to avoid issues with zendesk changing their types + if not author_id or str(author_id) == "-1": + return None + + try: + author_data = client.make_request(f"users/{author_id}", {}) + user = author_data.get("user") + return ( + BasicExpertInfo(display_name=user.get("name"), email=user.get("email")) + if user and user.get("name") and user.get("email") + else None + ) + except requests.exceptions.HTTPError: + # Handle any API errors gracefully + return None + + +def _article_to_document( + article: dict[str, Any], + content_tags: dict[str, str], + author_map: dict[str, BasicExpertInfo], + client: ZendeskClient, +) -> tuple[dict[str, BasicExpertInfo] | None, Document]: + author_id = article.get("author_id") + if not author_id: + author = None + else: + author = ( + author_map.get(author_id) + if author_id in author_map + else _fetch_author(client, author_id) + ) + + new_author_mapping = {author_id: author} if author_id and author else None + + updated_at = article.get("updated_at") + update_time = time_str_to_utc(updated_at) if updated_at else None + + text = parse_html_page_basic(article.get("body") or "") + blob = text.encode("utf-8", errors="replace") + # Build metadata + metadata: dict[str, str | list[str]] = { + "labels": [str(label) for label in article.get("label_names", []) if label], + "content_tags": [ + content_tags[tag_id] + for tag_id in article.get("content_tag_ids", []) + if tag_id in content_tags + ], + } + + # Remove empty values + metadata = {k: v for k, v in metadata.items() if v} + + return new_author_mapping, Document( + id=f"article:{article['id']}", + source=DocumentSource.ZENDESK, + semantic_identifier=article["title"], + extension=".txt", + blob=blob, + size_bytes=len(blob), + doc_updated_at=update_time, + primary_owners=[author] if author else None, + metadata=metadata, + ) + + +def _get_comment_text( + comment: dict[str, Any], + author_map: dict[str, BasicExpertInfo], + client: ZendeskClient, +) -> tuple[dict[str, BasicExpertInfo] | None, str]: + author_id = comment.get("author_id") + if not author_id: + author = None + else: + author = ( + author_map.get(author_id) + if author_id in author_map + else _fetch_author(client, author_id) + ) + + new_author_mapping = {author_id: author} if author_id and author else None + + comment_text = f"Comment{' by ' + author.display_name if author and author.display_name else ''}" + comment_text += f"{' at ' + comment['created_at'] if comment.get('created_at') else ''}:\n{comment['body']}" + + return new_author_mapping, comment_text + + +def _ticket_to_document( + ticket: dict[str, Any], + author_map: dict[str, BasicExpertInfo], + client: ZendeskClient, +) -> 
tuple[dict[str, BasicExpertInfo] | None, Document]: + submitter_id = ticket.get("submitter") + if not submitter_id: + submitter = None + else: + submitter = ( + author_map.get(submitter_id) + if submitter_id in author_map + else _fetch_author(client, submitter_id) + ) + + new_author_mapping = ( + {submitter_id: submitter} if submitter_id and submitter else None + ) + + updated_at = ticket.get("updated_at") + update_time = time_str_to_utc(updated_at) if updated_at else None + + metadata: dict[str, str | list[str]] = {} + if status := ticket.get("status"): + metadata["status"] = status + if priority := ticket.get("priority"): + metadata["priority"] = priority + if tags := ticket.get("tags"): + metadata["tags"] = tags + if ticket_type := ticket.get("type"): + metadata["ticket_type"] = ticket_type + + # Fetch comments for the ticket + comments_data = client.make_request(f"tickets/{ticket.get('id')}/comments", {}) + comments = comments_data.get("comments", []) + + comment_texts = [] + for comment in comments: + new_author_mapping, comment_text = _get_comment_text( + comment, author_map, client + ) + if new_author_mapping: + author_map.update(new_author_mapping) + comment_texts.append(comment_text) + + comments_text = "\n\n".join(comment_texts) + + subject = ticket.get("subject") + full_text = f"Ticket Subject:\n{subject}\n\nComments:\n{comments_text}" + + blob = full_text.encode("utf-8", errors="replace") + return new_author_mapping, Document( + id=f"zendesk_ticket_{ticket['id']}", + blob=blob, + extension=".txt", + size_bytes=len(blob), + source=DocumentSource.ZENDESK, + semantic_identifier=f"Ticket #{ticket['id']}: {subject or 'No Subject'}", + doc_updated_at=update_time, + primary_owners=[submitter] if submitter else None, + metadata=metadata, + ) + + +class ZendeskConnectorCheckpoint(ConnectorCheckpoint): + # We use cursor-based paginated retrieval for articles + after_cursor_articles: str | None + + # We use timestamp-based paginated retrieval for tickets + next_start_time_tickets: int | None + + cached_author_map: dict[str, BasicExpertInfo] | None + cached_content_tags: dict[str, str] | None + + +class ZendeskConnector( + SlimConnectorWithPermSync, CheckpointedConnector[ZendeskConnectorCheckpoint] +): + def __init__( + self, + content_type: str = "articles", + calls_per_minute: int | None = None, + ) -> None: + self.content_type = content_type + self.subdomain = "" + # Fetch all tags ahead of time + self.content_tags: dict[str, str] = {} + self.calls_per_minute = calls_per_minute + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + # Subdomain is actually the whole URL + subdomain = ( + credentials["zendesk_subdomain"] + .replace("https://", "") + .split(".zendesk.com")[0] + ) + self.subdomain = subdomain + + self.client = ZendeskClient( + subdomain, + credentials["zendesk_email"], + credentials["zendesk_token"], + calls_per_minute=self.calls_per_minute, + ) + return None + + @override + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ZendeskConnectorCheckpoint, + ) -> CheckpointOutput[ZendeskConnectorCheckpoint]: + if self.client is None: + raise ZendeskCredentialsNotSetUpError() + if checkpoint.cached_content_tags is None: + checkpoint.cached_content_tags = _get_content_tag_mapping(self.client) + return checkpoint # save the content tags to the checkpoint + self.content_tags = checkpoint.cached_content_tags + + if self.content_type == "articles": + checkpoint = yield from 
self._retrieve_articles(start, end, checkpoint) + return checkpoint + elif self.content_type == "tickets": + checkpoint = yield from self._retrieve_tickets(start, end, checkpoint) + return checkpoint + else: + raise ValueError(f"Unsupported content_type: {self.content_type}") + + def _retrieve_articles( + self, + start: SecondsSinceUnixEpoch | None, + end: SecondsSinceUnixEpoch | None, + checkpoint: ZendeskConnectorCheckpoint, + ) -> CheckpointOutput[ZendeskConnectorCheckpoint]: + checkpoint = copy.deepcopy(checkpoint) + # This one is built on the fly as there may be more many more authors than tags + author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {} + after_cursor = checkpoint.after_cursor_articles + doc_batch: list[Document] = [] + + response = _get_article_page( + self.client, + start_time=int(start) if start else None, + after_cursor=after_cursor, + ) + articles = response.data + has_more = response.has_more + after_cursor = response.meta.get("after_cursor") + for article in articles: + if ( + article.get("body") is None + or article.get("draft") + or any( + label in ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS + for label in article.get("label_names", []) + ) + ): + continue + + try: + new_author_map, document = _article_to_document( + article, self.content_tags, author_map, self.client + ) + except Exception as e: + logging.error(f"Error processing article {article['id']}: {e}") + yield ConnectorFailure( + failed_document=DocumentFailure( + document_id=f"{article.get('id')}", + document_link=article.get("html_url", ""), + ), + failure_message=str(e), + exception=e, + ) + continue + + if new_author_map: + author_map.update(new_author_map) + updated_at = document.doc_updated_at + updated_ts = updated_at.timestamp() if updated_at else None + if updated_ts is not None: + if start is not None and updated_ts <= start: + continue + if end is not None and updated_ts > end: + continue + + doc_batch.append(document) + + if not has_more: + yield from doc_batch + checkpoint.has_more = False + return checkpoint + + # Sometimes no documents are retrieved, but the cursor + # is still updated so the connector makes progress. 
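+ # Emit this page's documents and advance the article cursor so the next
+ # checkpoint run resumes from this page; `has_more` flips to False once the
+ # newest document in the batch is already past the requested `end` time.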
+ yield from doc_batch + checkpoint.after_cursor_articles = after_cursor + + last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None + checkpoint.has_more = bool( + end is None + or last_doc_updated_at is None + or last_doc_updated_at.timestamp() <= end + ) + checkpoint.cached_author_map = ( + author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None + ) + return checkpoint + + def _retrieve_tickets( + self, + start: SecondsSinceUnixEpoch | None, + end: SecondsSinceUnixEpoch | None, + checkpoint: ZendeskConnectorCheckpoint, + ) -> CheckpointOutput[ZendeskConnectorCheckpoint]: + checkpoint = copy.deepcopy(checkpoint) + if self.client is None: + raise ZendeskCredentialsNotSetUpError() + + author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {} + + doc_batch: list[Document] = [] + next_start_time = int(checkpoint.next_start_time_tickets or start or 0) + ticket_response = _get_tickets_page(self.client, start_time=next_start_time) + + tickets = ticket_response.data + has_more = ticket_response.has_more + next_start_time = ticket_response.meta["end_time"] + for ticket in tickets: + if ticket.get("status") == "deleted": + continue + + try: + new_author_map, document = _ticket_to_document( + ticket=ticket, + author_map=author_map, + client=self.client, + ) + except Exception as e: + logging.error(f"Error processing ticket {ticket['id']}: {e}") + yield ConnectorFailure( + failed_document=DocumentFailure( + document_id=f"{ticket.get('id')}", + document_link=ticket.get("url", ""), + ), + failure_message=str(e), + exception=e, + ) + continue + + if new_author_map: + author_map.update(new_author_map) + + updated_at = document.doc_updated_at + updated_ts = updated_at.timestamp() if updated_at else None + + if updated_ts is not None: + if start is not None and updated_ts <= start: + continue + if end is not None and updated_ts > end: + continue + + doc_batch.append(document) + + if not has_more: + yield from doc_batch + checkpoint.has_more = False + return checkpoint + + yield from doc_batch + checkpoint.next_start_time_tickets = next_start_time + last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None + checkpoint.has_more = bool( + end is None + or last_doc_updated_at is None + or last_doc_updated_at.timestamp() <= end + ) + checkpoint.cached_author_map = ( + author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None + ) + return checkpoint + + def retrieve_all_slim_docs_perm_sync( + self, + start: SecondsSinceUnixEpoch | None = None, + end: SecondsSinceUnixEpoch | None = None, + callback: IndexingHeartbeatInterface | None = None, + ) -> GenerateSlimDocumentOutput: + slim_doc_batch: list[SlimDocument] = [] + if self.content_type == "articles": + articles = _get_articles( + self.client, start_time=int(start) if start else None + ) + for article in articles: + slim_doc_batch.append( + SlimDocument( + id=f"article:{article['id']}", + ) + ) + if len(slim_doc_batch) >= _SLIM_BATCH_SIZE: + yield slim_doc_batch + slim_doc_batch = [] + elif self.content_type == "tickets": + tickets = _get_tickets( + self.client, start_time=int(start) if start else None + ) + for ticket in tickets: + slim_doc_batch.append( + SlimDocument( + id=f"zendesk_ticket_{ticket['id']}", + ) + ) + if len(slim_doc_batch) >= _SLIM_BATCH_SIZE: + yield slim_doc_batch + slim_doc_batch = [] + else: + raise ValueError(f"Unsupported content_type: {self.content_type}") + if slim_doc_batch: + yield slim_doc_batch + + @override + def validate_connector_settings(self) -> None: + if 
self.client is None: + raise ZendeskCredentialsNotSetUpError() + + try: + _get_article_page(self.client, start_time=0) + except HTTPError as e: + # Check for HTTP status codes + if e.response.status_code == 401: + raise CredentialExpiredError( + "Your Zendesk credentials appear to be invalid or expired (HTTP 401)." + ) from e + elif e.response.status_code == 403: + raise InsufficientPermissionsError( + "Your Zendesk token does not have sufficient permissions (HTTP 403)." + ) from e + elif e.response.status_code == 404: + raise ConnectorValidationError( + "Zendesk resource not found (HTTP 404)." + ) from e + else: + raise ConnectorValidationError( + f"Unexpected Zendesk error (status={e.response.status_code}): {e}" + ) from e + + @override + def validate_checkpoint_json( + self, checkpoint_json: str + ) -> ZendeskConnectorCheckpoint: + return ZendeskConnectorCheckpoint.model_validate_json(checkpoint_json) + + @override + def build_dummy_checkpoint(self) -> ZendeskConnectorCheckpoint: + return ZendeskConnectorCheckpoint( + after_cursor_articles=None, + next_start_time_tickets=None, + cached_author_map=None, + cached_content_tags=None, + has_more=True, + ) + + +if __name__ == "__main__": + import os + + connector = ZendeskConnector(content_type="articles") + connector.load_credentials( + { + "zendesk_subdomain": os.environ["ZENDESK_SUBDOMAIN"], + "zendesk_email": os.environ["ZENDESK_EMAIL"], + "zendesk_token": os.environ["ZENDESK_TOKEN"], + } + ) + + current = time.time() + one_day_ago = current - 24 * 60 * 60 # 1 day + + checkpoint = connector.build_dummy_checkpoint() + + while checkpoint.has_more: + gen = connector.load_from_checkpoint( + one_day_ago, current, checkpoint + ) + + wrapper = CheckpointOutputWrapper() + any_doc = False + + for document, failure, next_checkpoint in wrapper(gen): + if document: + print("got document:", document.id) + any_doc = True + + checkpoint = next_checkpoint + if any_doc: + break \ No newline at end of file diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index c8a2fa9e0..8a4a6ab8a 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -49,7 +49,8 @@ from common.data_source import ( WebDAVConnector, AirtableConnector, AsanaConnector, - ImapConnector + ImapConnector, + ZendeskConnector, ) from common.constants import FileSource, TaskStatus from common.data_source.config import INDEX_BATCH_SIZE @@ -915,7 +916,7 @@ class Github(SyncBase): ) return async_wrapper() - + class IMAP(SyncBase): SOURCE_NAME: str = FileSource.IMAP @@ -971,6 +972,10 @@ class IMAP(SyncBase): if pending_docs: yield pending_docs + async def async_wrapper(): + for batch in document_batches(): + yield batch + logging.info( "Connect to IMAP: host(%s) port(%s) user(%s) folder(%s) %s", self.conf["imap_host"], @@ -979,7 +984,87 @@ class IMAP(SyncBase): self.conf["imap_mailbox"], begin_info ) - return document_batches() + return async_wrapper() + +class Zendesk(SyncBase): + + SOURCE_NAME: str = FileSource.ZENDESK + async def _generate(self, task: dict): + self.connector = ZendeskConnector(content_type=self.conf.get("zendesk_content_type")) + self.connector.load_credentials(self.conf["credentials"]) + + end_time = datetime.now(timezone.utc).timestamp() + if task["reindex"] == "1" or not task.get("poll_range_start"): + start_time = 0 + begin_info = "totally" + else: + start_time = task["poll_range_start"].timestamp() + begin_info = f"from {task['poll_range_start']}" + + raw_batch_size = ( + self.conf.get("sync_batch_size") + or 
self.conf.get("batch_size") + or INDEX_BATCH_SIZE + ) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + def document_batches(): + checkpoint = self.connector.build_dummy_checkpoint() + pending_docs = [] + iterations = 0 + iteration_limit = 100_000 + + while checkpoint.has_more: + wrapper = CheckpointOutputWrapper() + doc_generator = wrapper( + self.connector.load_from_checkpoint( + start_time, end_time, checkpoint + ) + ) + + for document, failure, next_checkpoint in doc_generator: + if failure is not None: + logging.warning( + "Zendesk connector failure: %s", + getattr(failure, "failure_message", failure), + ) + continue + + if document is not None: + pending_docs.append(document) + if len(pending_docs) >= batch_size: + yield pending_docs + pending_docs = [] + + if next_checkpoint is not None: + checkpoint = next_checkpoint + + iterations += 1 + if iterations > iteration_limit: + raise RuntimeError( + "Too many iterations while loading Zendesk documents." + ) + + if pending_docs: + yield pending_docs + + async def async_wrapper(): + for batch in document_batches(): + yield batch + + logging.info( + "Connect to Zendesk: subdomain(%s) %s", + self.conf['credentials'].get("zendesk_subdomain"), + begin_info, + ) + + return async_wrapper() class Gitlab(SyncBase): @@ -1043,6 +1128,7 @@ func_factory = { FileSource.AIRTABLE: Airtable, FileSource.ASANA: Asana, FileSource.IMAP: IMAP, + FileSource.ZENDESK: Zendesk, FileSource.GITHUB: Github, FileSource.GITLAB: Gitlab, } diff --git a/web/src/assets/svg/data-source/zendesk.svg b/web/src/assets/svg/data-source/zendesk.svg new file mode 100644 index 000000000..cc7edc68c --- /dev/null +++ b/web/src/assets/svg/data-source/zendesk.svg @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 9538b5650..804ecc2eb 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -29,6 +29,7 @@ export enum DataSourceKey { ASANA = 'asana', IMAP = 'imap', GITHUB = 'github', + ZENDESK = 'zendesk', // SHAREPOINT = 'sharepoint', // SLACK = 'slack', // TEAMS = 'teams', @@ -133,6 +134,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.IMAP}Description`), icon: , }, + [DataSourceKey.ZENDESK]: { + name: 'Zendesk', + description: t(`setting.${DataSourceKey.ZENDESK}Description`), + icon: , + }, }; }; @@ -822,6 +828,36 @@ export const DataSourceFormFields = { required: false, }, ], + [DataSourceKey.ZENDESK]: [ + { + label: 'Zendesk Domain', + name: 'config.credentials.zendesk_subdomain', + type: FormFieldType.Text, + required: true, + }, + { + label: 'Zendesk Email', + name: 'config.credentials.zendesk_email', + type: FormFieldType.Text, + required: true, + }, + { + label: 'Zendesk Token', + name: 'config.credentials.zendesk_token', + type: FormFieldType.Password, + required: true, + }, + { + label: 'Content', + name: 'config.zendesk_content_type', + type: FormFieldType.Segmented, + required: true, + options: [ + { label: 'Articles', value: 'articles' }, + { label: 'Tickets', value: 'tickets' }, + ], + }, + ], }; export const DataSourceFormDefaultValues = { @@ -1076,4 +1112,17 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.ZENDESK]: { + name: '', + source: 
DataSourceKey.ZENDESK, + config: { + name: '', + zendesk_content_type: 'articles', + credentials: { + zendesk_subdomain: '', + zendesk_email: '', + zendesk_token: '', + }, + }, + }, };
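A minimal usage sketch of the new helpers added to common/data_source/utils.py, assuming a hypothetical `fetch_page` function, a placeholder base URL, and an illustrative budget of 100 calls per minute; it composes `retry_builder` and `rate_limit_builder` the same way `request_with_rate_limit` does in zendesk_connector.py:

    import requests

    from common.data_source.utils import rate_limit_builder, retry_builder

    # Hypothetical fetcher: at most 100 calls per rolling 60-second window,
    # with exponential-backoff retries on transient request failures.
    @retry_builder(tries=5, delay=1, exceptions=(requests.exceptions.RequestException,))
    @rate_limit_builder(max_calls=100, period=60)
    def fetch_page(base_url: str, endpoint: str, params: dict) -> dict:
        response = requests.get(f"{base_url}/{endpoint}", params=params, timeout=30)
        response.raise_for_status()
        return response.json()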