mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-01-01 17:45:28 +08:00
Feat: add Zendesk data source integration with configuration and sync capabilities (#12344)
### What problem does this PR solve? issue: #12313 change: add Zendesk data source integration with configuration and sync capabilities ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@ -133,6 +133,7 @@ class FileSource(StrEnum):
|
||||
GITHUB = "github"
|
||||
GITLAB = "gitlab"
|
||||
IMAP = "imap"
|
||||
ZENDESK = "zendesk"
|
||||
|
||||
class PipelineTaskType(StrEnum):
|
||||
PARSE = "Parse"
|
||||
|
||||
@ -39,6 +39,7 @@ from .moodle_connector import MoodleConnector
|
||||
from .airtable_connector import AirtableConnector
|
||||
from .asana_connector import AsanaConnector
|
||||
from .imap_connector import ImapConnector
|
||||
from .zendesk_connector import ZendeskConnector
|
||||
from .config import BlobType, DocumentSource
|
||||
from .models import Document, TextSection, ImageSection, BasicExpertInfo
|
||||
from .exceptions import (
|
||||
@ -76,5 +77,6 @@ __all__ = [
|
||||
"UnexpectedValidationError",
|
||||
"AirtableConnector",
|
||||
"AsanaConnector",
|
||||
"ImapConnector"
|
||||
"ImapConnector",
|
||||
"ZendeskConnector",
|
||||
]
|
||||
|
||||
@ -58,8 +58,9 @@ class DocumentSource(str, Enum):
|
||||
GITHUB = "github"
|
||||
GITLAB = "gitlab"
|
||||
IMAP = "imap"
|
||||
ZENDESK = "zendesk"
|
||||
|
||||
|
||||
|
||||
class FileOrigin(str, Enum):
|
||||
"""File origins"""
|
||||
CONNECTOR = "connector"
|
||||
@ -271,6 +272,10 @@ IMAP_CONNECTOR_SIZE_THRESHOLD = int(
|
||||
os.environ.get("IMAP_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024)
|
||||
)
|
||||
|
||||
ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS = os.environ.get(
|
||||
"ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS", ""
|
||||
).split(",")
|
||||
|
||||
_USER_NOT_FOUND = "Unknown Confluence User"
|
||||
|
||||
_COMMENT_EXPANSION_FIELDS = ["body.storage.value"]
|
||||
|
||||
@ -1149,3 +1149,101 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R
|
||||
future_to_index[executor.submit(_next_or_none, ind, gens[ind])] = next_ind
|
||||
next_ind += 1
|
||||
del future_to_index[future]
|
||||
|
||||
F = TypeVar("F", bound=Callable[..., Any])
|
||||
|
||||
class _RateLimitDecorator:
|
||||
"""Builds a generic wrapper/decorator for calls to external APIs that
|
||||
prevents making more than `max_calls` requests per `period`
|
||||
|
||||
Implementation inspired by the `ratelimit` library:
|
||||
https://github.com/tomasbasham/ratelimit.
|
||||
|
||||
NOTE: is not thread safe.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_calls: int,
|
||||
period: float, # in seconds
|
||||
sleep_time: float = 2, # in seconds
|
||||
sleep_backoff: float = 2, # applies exponential backoff
|
||||
max_num_sleep: int = 0,
|
||||
):
|
||||
self.max_calls = max_calls
|
||||
self.period = period
|
||||
self.sleep_time = sleep_time
|
||||
self.sleep_backoff = sleep_backoff
|
||||
self.max_num_sleep = max_num_sleep
|
||||
|
||||
self.call_history: list[float] = []
|
||||
self.curr_calls = 0
|
||||
|
||||
def __call__(self, func: F) -> F:
|
||||
@wraps(func)
|
||||
def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any:
|
||||
# cleanup calls which are no longer relevant
|
||||
self._cleanup()
|
||||
|
||||
# check if we've exceeded the rate limit
|
||||
sleep_cnt = 0
|
||||
while len(self.call_history) == self.max_calls:
|
||||
sleep_time = self.sleep_time * (self.sleep_backoff**sleep_cnt)
|
||||
logging.warning(
|
||||
f"Rate limit exceeded for function {func.__name__}. "
|
||||
f"Waiting {sleep_time} seconds before retrying."
|
||||
)
|
||||
time.sleep(sleep_time)
|
||||
sleep_cnt += 1
|
||||
if self.max_num_sleep != 0 and sleep_cnt >= self.max_num_sleep:
|
||||
raise RateLimitTriedTooManyTimesError(
|
||||
f"Exceeded '{self.max_num_sleep}' retries for function '{func.__name__}'"
|
||||
)
|
||||
|
||||
self._cleanup()
|
||||
|
||||
# add the current call to the call history
|
||||
self.call_history.append(time.monotonic())
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return cast(F, wrapped_func)
|
||||
|
||||
def _cleanup(self) -> None:
|
||||
curr_time = time.monotonic()
|
||||
time_to_expire_before = curr_time - self.period
|
||||
self.call_history = [
|
||||
call_time
|
||||
for call_time in self.call_history
|
||||
if call_time > time_to_expire_before
|
||||
]
|
||||
|
||||
rate_limit_builder = _RateLimitDecorator
|
||||
|
||||
def retry_builder(
|
||||
tries: int = 20,
|
||||
delay: float = 0.1,
|
||||
max_delay: float | None = 60,
|
||||
backoff: float = 2,
|
||||
jitter: tuple[float, float] | float = 1,
|
||||
exceptions: type[Exception] | tuple[type[Exception], ...] = (Exception,),
|
||||
) -> Callable[[F], F]:
|
||||
"""Builds a generic wrapper/decorator for calls to external APIs that
|
||||
may fail due to rate limiting, flakes, or other reasons. Applies exponential
|
||||
backoff with jitter to retry the call."""
|
||||
|
||||
def retry_with_default(func: F) -> F:
|
||||
@retry(
|
||||
tries=tries,
|
||||
delay=delay,
|
||||
max_delay=max_delay,
|
||||
backoff=backoff,
|
||||
jitter=jitter,
|
||||
logger=logging.getLogger(__name__),
|
||||
exceptions=exceptions,
|
||||
)
|
||||
def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any:
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return cast(F, wrapped_func)
|
||||
|
||||
return retry_with_default
|
||||
667
common/data_source/zendesk_connector.py
Normal file
667
common/data_source/zendesk_connector.py
Normal file
@ -0,0 +1,667 @@
|
||||
import copy
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
from pydantic import BaseModel
|
||||
from requests.exceptions import HTTPError
|
||||
from typing_extensions import override
|
||||
|
||||
from common.data_source.config import ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS, DocumentSource
|
||||
from common.data_source.exceptions import ConnectorValidationError, CredentialExpiredError, InsufficientPermissionsError
|
||||
from common.data_source.html_utils import parse_html_page_basic
|
||||
from common.data_source.interfaces import CheckpointOutput, CheckpointOutputWrapper, CheckpointedConnector, IndexingHeartbeatInterface, SlimConnectorWithPermSync
|
||||
from common.data_source.models import BasicExpertInfo, ConnectorCheckpoint, ConnectorFailure, Document, DocumentFailure, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument
|
||||
from common.data_source.utils import retry_builder, time_str_to_utc,rate_limit_builder
|
||||
|
||||
MAX_PAGE_SIZE = 30 # Zendesk API maximum
|
||||
MAX_AUTHOR_MAP_SIZE = 50_000 # Reset author map cache if it gets too large
|
||||
_SLIM_BATCH_SIZE = 1000
|
||||
|
||||
|
||||
class ZendeskCredentialsNotSetUpError(PermissionError):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
"Zendesk Credentials are not set up, was load_credentials called?"
|
||||
)
|
||||
|
||||
|
||||
class ZendeskClient:
|
||||
def __init__(
|
||||
self,
|
||||
subdomain: str,
|
||||
email: str,
|
||||
token: str,
|
||||
calls_per_minute: int | None = None,
|
||||
):
|
||||
self.base_url = f"https://{subdomain}.zendesk.com/api/v2"
|
||||
self.auth = (f"{email}/token", token)
|
||||
self.make_request = request_with_rate_limit(self, calls_per_minute)
|
||||
|
||||
|
||||
def request_with_rate_limit(
|
||||
client: ZendeskClient, max_calls_per_minute: int | None = None
|
||||
) -> Callable[[str, dict[str, Any]], dict[str, Any]]:
|
||||
@retry_builder()
|
||||
@(
|
||||
rate_limit_builder(max_calls=max_calls_per_minute, period=60)
|
||||
if max_calls_per_minute
|
||||
else lambda x: x
|
||||
)
|
||||
def make_request(endpoint: str, params: dict[str, Any]) -> dict[str, Any]:
|
||||
response = requests.get(
|
||||
f"{client.base_url}/{endpoint}", auth=client.auth, params=params
|
||||
)
|
||||
|
||||
if response.status_code == 429:
|
||||
retry_after = response.headers.get("Retry-After")
|
||||
if retry_after is not None:
|
||||
# Sleep for the duration indicated by the Retry-After header
|
||||
time.sleep(int(retry_after))
|
||||
|
||||
elif (
|
||||
response.status_code == 403
|
||||
and response.json().get("error") == "SupportProductInactive"
|
||||
):
|
||||
return response.json()
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
return make_request
|
||||
|
||||
|
||||
class ZendeskPageResponse(BaseModel):
|
||||
data: list[dict[str, Any]]
|
||||
meta: dict[str, Any]
|
||||
has_more: bool
|
||||
|
||||
|
||||
def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]:
|
||||
content_tags: dict[str, str] = {}
|
||||
params = {"page[size]": MAX_PAGE_SIZE}
|
||||
|
||||
try:
|
||||
while True:
|
||||
data = client.make_request("guide/content_tags", params)
|
||||
|
||||
for tag in data.get("records", []):
|
||||
content_tags[tag["id"]] = tag["name"]
|
||||
|
||||
# Check if there are more pages
|
||||
if data.get("meta", {}).get("has_more", False):
|
||||
params["page[after]"] = data["meta"]["after_cursor"]
|
||||
else:
|
||||
break
|
||||
|
||||
return content_tags
|
||||
except Exception as e:
|
||||
raise Exception(f"Error fetching content tags: {str(e)}")
|
||||
|
||||
|
||||
def _get_articles(
|
||||
client: ZendeskClient, start_time: int | None = None, page_size: int = MAX_PAGE_SIZE
|
||||
) -> Iterator[dict[str, Any]]:
|
||||
params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
|
||||
if start_time is not None:
|
||||
params["start_time"] = start_time
|
||||
|
||||
while True:
|
||||
data = client.make_request("help_center/articles", params)
|
||||
for article in data["articles"]:
|
||||
yield article
|
||||
|
||||
if not data.get("meta", {}).get("has_more"):
|
||||
break
|
||||
params["page[after]"] = data["meta"]["after_cursor"]
|
||||
|
||||
|
||||
def _get_article_page(
|
||||
client: ZendeskClient,
|
||||
start_time: int | None = None,
|
||||
after_cursor: str | None = None,
|
||||
page_size: int = MAX_PAGE_SIZE,
|
||||
) -> ZendeskPageResponse:
|
||||
params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
|
||||
if start_time is not None:
|
||||
params["start_time"] = start_time
|
||||
if after_cursor is not None:
|
||||
params["page[after]"] = after_cursor
|
||||
|
||||
data = client.make_request("help_center/articles", params)
|
||||
return ZendeskPageResponse(
|
||||
data=data["articles"],
|
||||
meta=data["meta"],
|
||||
has_more=bool(data["meta"].get("has_more", False)),
|
||||
)
|
||||
|
||||
|
||||
def _get_tickets(
|
||||
client: ZendeskClient, start_time: int | None = None
|
||||
) -> Iterator[dict[str, Any]]:
|
||||
params = {"start_time": start_time or 0}
|
||||
|
||||
while True:
|
||||
data = client.make_request("incremental/tickets.json", params)
|
||||
for ticket in data["tickets"]:
|
||||
yield ticket
|
||||
|
||||
if not data.get("end_of_stream", False):
|
||||
params["start_time"] = data["end_time"]
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
# TODO: maybe these don't need to be their own functions?
|
||||
def _get_tickets_page(
|
||||
client: ZendeskClient, start_time: int | None = None
|
||||
) -> ZendeskPageResponse:
|
||||
params = {"start_time": start_time or 0}
|
||||
|
||||
# NOTE: for some reason zendesk doesn't seem to be respecting the start_time param
|
||||
# in my local testing with very few tickets. We'll look into it if this becomes an
|
||||
# issue in larger deployments
|
||||
data = client.make_request("incremental/tickets.json", params)
|
||||
if data.get("error") == "SupportProductInactive":
|
||||
raise ValueError(
|
||||
"Zendesk Support Product is not active for this account, No tickets to index"
|
||||
)
|
||||
return ZendeskPageResponse(
|
||||
data=data["tickets"],
|
||||
meta={"end_time": data["end_time"]},
|
||||
has_more=not bool(data.get("end_of_stream", False)),
|
||||
)
|
||||
|
||||
|
||||
def _fetch_author(
|
||||
client: ZendeskClient, author_id: str | int
|
||||
) -> BasicExpertInfo | None:
|
||||
# Skip fetching if author_id is invalid
|
||||
# cast to str to avoid issues with zendesk changing their types
|
||||
if not author_id or str(author_id) == "-1":
|
||||
return None
|
||||
|
||||
try:
|
||||
author_data = client.make_request(f"users/{author_id}", {})
|
||||
user = author_data.get("user")
|
||||
return (
|
||||
BasicExpertInfo(display_name=user.get("name"), email=user.get("email"))
|
||||
if user and user.get("name") and user.get("email")
|
||||
else None
|
||||
)
|
||||
except requests.exceptions.HTTPError:
|
||||
# Handle any API errors gracefully
|
||||
return None
|
||||
|
||||
|
||||
def _article_to_document(
|
||||
article: dict[str, Any],
|
||||
content_tags: dict[str, str],
|
||||
author_map: dict[str, BasicExpertInfo],
|
||||
client: ZendeskClient,
|
||||
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
|
||||
author_id = article.get("author_id")
|
||||
if not author_id:
|
||||
author = None
|
||||
else:
|
||||
author = (
|
||||
author_map.get(author_id)
|
||||
if author_id in author_map
|
||||
else _fetch_author(client, author_id)
|
||||
)
|
||||
|
||||
new_author_mapping = {author_id: author} if author_id and author else None
|
||||
|
||||
updated_at = article.get("updated_at")
|
||||
update_time = time_str_to_utc(updated_at) if updated_at else None
|
||||
|
||||
text = parse_html_page_basic(article.get("body") or "")
|
||||
blob = text.encode("utf-8", errors="replace")
|
||||
# Build metadata
|
||||
metadata: dict[str, str | list[str]] = {
|
||||
"labels": [str(label) for label in article.get("label_names", []) if label],
|
||||
"content_tags": [
|
||||
content_tags[tag_id]
|
||||
for tag_id in article.get("content_tag_ids", [])
|
||||
if tag_id in content_tags
|
||||
],
|
||||
}
|
||||
|
||||
# Remove empty values
|
||||
metadata = {k: v for k, v in metadata.items() if v}
|
||||
|
||||
return new_author_mapping, Document(
|
||||
id=f"article:{article['id']}",
|
||||
source=DocumentSource.ZENDESK,
|
||||
semantic_identifier=article["title"],
|
||||
extension=".txt",
|
||||
blob=blob,
|
||||
size_bytes=len(blob),
|
||||
doc_updated_at=update_time,
|
||||
primary_owners=[author] if author else None,
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
|
||||
def _get_comment_text(
|
||||
comment: dict[str, Any],
|
||||
author_map: dict[str, BasicExpertInfo],
|
||||
client: ZendeskClient,
|
||||
) -> tuple[dict[str, BasicExpertInfo] | None, str]:
|
||||
author_id = comment.get("author_id")
|
||||
if not author_id:
|
||||
author = None
|
||||
else:
|
||||
author = (
|
||||
author_map.get(author_id)
|
||||
if author_id in author_map
|
||||
else _fetch_author(client, author_id)
|
||||
)
|
||||
|
||||
new_author_mapping = {author_id: author} if author_id and author else None
|
||||
|
||||
comment_text = f"Comment{' by ' + author.display_name if author and author.display_name else ''}"
|
||||
comment_text += f"{' at ' + comment['created_at'] if comment.get('created_at') else ''}:\n{comment['body']}"
|
||||
|
||||
return new_author_mapping, comment_text
|
||||
|
||||
|
||||
def _ticket_to_document(
|
||||
ticket: dict[str, Any],
|
||||
author_map: dict[str, BasicExpertInfo],
|
||||
client: ZendeskClient,
|
||||
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
|
||||
submitter_id = ticket.get("submitter")
|
||||
if not submitter_id:
|
||||
submitter = None
|
||||
else:
|
||||
submitter = (
|
||||
author_map.get(submitter_id)
|
||||
if submitter_id in author_map
|
||||
else _fetch_author(client, submitter_id)
|
||||
)
|
||||
|
||||
new_author_mapping = (
|
||||
{submitter_id: submitter} if submitter_id and submitter else None
|
||||
)
|
||||
|
||||
updated_at = ticket.get("updated_at")
|
||||
update_time = time_str_to_utc(updated_at) if updated_at else None
|
||||
|
||||
metadata: dict[str, str | list[str]] = {}
|
||||
if status := ticket.get("status"):
|
||||
metadata["status"] = status
|
||||
if priority := ticket.get("priority"):
|
||||
metadata["priority"] = priority
|
||||
if tags := ticket.get("tags"):
|
||||
metadata["tags"] = tags
|
||||
if ticket_type := ticket.get("type"):
|
||||
metadata["ticket_type"] = ticket_type
|
||||
|
||||
# Fetch comments for the ticket
|
||||
comments_data = client.make_request(f"tickets/{ticket.get('id')}/comments", {})
|
||||
comments = comments_data.get("comments", [])
|
||||
|
||||
comment_texts = []
|
||||
for comment in comments:
|
||||
new_author_mapping, comment_text = _get_comment_text(
|
||||
comment, author_map, client
|
||||
)
|
||||
if new_author_mapping:
|
||||
author_map.update(new_author_mapping)
|
||||
comment_texts.append(comment_text)
|
||||
|
||||
comments_text = "\n\n".join(comment_texts)
|
||||
|
||||
subject = ticket.get("subject")
|
||||
full_text = f"Ticket Subject:\n{subject}\n\nComments:\n{comments_text}"
|
||||
|
||||
blob = full_text.encode("utf-8", errors="replace")
|
||||
return new_author_mapping, Document(
|
||||
id=f"zendesk_ticket_{ticket['id']}",
|
||||
blob=blob,
|
||||
extension=".txt",
|
||||
size_bytes=len(blob),
|
||||
source=DocumentSource.ZENDESK,
|
||||
semantic_identifier=f"Ticket #{ticket['id']}: {subject or 'No Subject'}",
|
||||
doc_updated_at=update_time,
|
||||
primary_owners=[submitter] if submitter else None,
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
|
||||
class ZendeskConnectorCheckpoint(ConnectorCheckpoint):
|
||||
# We use cursor-based paginated retrieval for articles
|
||||
after_cursor_articles: str | None
|
||||
|
||||
# We use timestamp-based paginated retrieval for tickets
|
||||
next_start_time_tickets: int | None
|
||||
|
||||
cached_author_map: dict[str, BasicExpertInfo] | None
|
||||
cached_content_tags: dict[str, str] | None
|
||||
|
||||
|
||||
class ZendeskConnector(
|
||||
SlimConnectorWithPermSync, CheckpointedConnector[ZendeskConnectorCheckpoint]
|
||||
):
|
||||
def __init__(
|
||||
self,
|
||||
content_type: str = "articles",
|
||||
calls_per_minute: int | None = None,
|
||||
) -> None:
|
||||
self.content_type = content_type
|
||||
self.subdomain = ""
|
||||
# Fetch all tags ahead of time
|
||||
self.content_tags: dict[str, str] = {}
|
||||
self.calls_per_minute = calls_per_minute
|
||||
|
||||
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
|
||||
# Subdomain is actually the whole URL
|
||||
subdomain = (
|
||||
credentials["zendesk_subdomain"]
|
||||
.replace("https://", "")
|
||||
.split(".zendesk.com")[0]
|
||||
)
|
||||
self.subdomain = subdomain
|
||||
|
||||
self.client = ZendeskClient(
|
||||
subdomain,
|
||||
credentials["zendesk_email"],
|
||||
credentials["zendesk_token"],
|
||||
calls_per_minute=self.calls_per_minute,
|
||||
)
|
||||
return None
|
||||
|
||||
@override
|
||||
def load_from_checkpoint(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch,
|
||||
end: SecondsSinceUnixEpoch,
|
||||
checkpoint: ZendeskConnectorCheckpoint,
|
||||
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
|
||||
if self.client is None:
|
||||
raise ZendeskCredentialsNotSetUpError()
|
||||
if checkpoint.cached_content_tags is None:
|
||||
checkpoint.cached_content_tags = _get_content_tag_mapping(self.client)
|
||||
return checkpoint # save the content tags to the checkpoint
|
||||
self.content_tags = checkpoint.cached_content_tags
|
||||
|
||||
if self.content_type == "articles":
|
||||
checkpoint = yield from self._retrieve_articles(start, end, checkpoint)
|
||||
return checkpoint
|
||||
elif self.content_type == "tickets":
|
||||
checkpoint = yield from self._retrieve_tickets(start, end, checkpoint)
|
||||
return checkpoint
|
||||
else:
|
||||
raise ValueError(f"Unsupported content_type: {self.content_type}")
|
||||
|
||||
def _retrieve_articles(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch | None,
|
||||
end: SecondsSinceUnixEpoch | None,
|
||||
checkpoint: ZendeskConnectorCheckpoint,
|
||||
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
|
||||
checkpoint = copy.deepcopy(checkpoint)
|
||||
# This one is built on the fly as there may be more many more authors than tags
|
||||
author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
|
||||
after_cursor = checkpoint.after_cursor_articles
|
||||
doc_batch: list[Document] = []
|
||||
|
||||
response = _get_article_page(
|
||||
self.client,
|
||||
start_time=int(start) if start else None,
|
||||
after_cursor=after_cursor,
|
||||
)
|
||||
articles = response.data
|
||||
has_more = response.has_more
|
||||
after_cursor = response.meta.get("after_cursor")
|
||||
for article in articles:
|
||||
if (
|
||||
article.get("body") is None
|
||||
or article.get("draft")
|
||||
or any(
|
||||
label in ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS
|
||||
for label in article.get("label_names", [])
|
||||
)
|
||||
):
|
||||
continue
|
||||
|
||||
try:
|
||||
new_author_map, document = _article_to_document(
|
||||
article, self.content_tags, author_map, self.client
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing article {article['id']}: {e}")
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=f"{article.get('id')}",
|
||||
document_link=article.get("html_url", ""),
|
||||
),
|
||||
failure_message=str(e),
|
||||
exception=e,
|
||||
)
|
||||
continue
|
||||
|
||||
if new_author_map:
|
||||
author_map.update(new_author_map)
|
||||
updated_at = document.doc_updated_at
|
||||
updated_ts = updated_at.timestamp() if updated_at else None
|
||||
if updated_ts is not None:
|
||||
if start is not None and updated_ts <= start:
|
||||
continue
|
||||
if end is not None and updated_ts > end:
|
||||
continue
|
||||
|
||||
doc_batch.append(document)
|
||||
|
||||
if not has_more:
|
||||
yield from doc_batch
|
||||
checkpoint.has_more = False
|
||||
return checkpoint
|
||||
|
||||
# Sometimes no documents are retrieved, but the cursor
|
||||
# is still updated so the connector makes progress.
|
||||
yield from doc_batch
|
||||
checkpoint.after_cursor_articles = after_cursor
|
||||
|
||||
last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
|
||||
checkpoint.has_more = bool(
|
||||
end is None
|
||||
or last_doc_updated_at is None
|
||||
or last_doc_updated_at.timestamp() <= end
|
||||
)
|
||||
checkpoint.cached_author_map = (
|
||||
author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
|
||||
)
|
||||
return checkpoint
|
||||
|
||||
def _retrieve_tickets(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch | None,
|
||||
end: SecondsSinceUnixEpoch | None,
|
||||
checkpoint: ZendeskConnectorCheckpoint,
|
||||
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
|
||||
checkpoint = copy.deepcopy(checkpoint)
|
||||
if self.client is None:
|
||||
raise ZendeskCredentialsNotSetUpError()
|
||||
|
||||
author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
|
||||
|
||||
doc_batch: list[Document] = []
|
||||
next_start_time = int(checkpoint.next_start_time_tickets or start or 0)
|
||||
ticket_response = _get_tickets_page(self.client, start_time=next_start_time)
|
||||
|
||||
tickets = ticket_response.data
|
||||
has_more = ticket_response.has_more
|
||||
next_start_time = ticket_response.meta["end_time"]
|
||||
for ticket in tickets:
|
||||
if ticket.get("status") == "deleted":
|
||||
continue
|
||||
|
||||
try:
|
||||
new_author_map, document = _ticket_to_document(
|
||||
ticket=ticket,
|
||||
author_map=author_map,
|
||||
client=self.client,
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing ticket {ticket['id']}: {e}")
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=f"{ticket.get('id')}",
|
||||
document_link=ticket.get("url", ""),
|
||||
),
|
||||
failure_message=str(e),
|
||||
exception=e,
|
||||
)
|
||||
continue
|
||||
|
||||
if new_author_map:
|
||||
author_map.update(new_author_map)
|
||||
|
||||
updated_at = document.doc_updated_at
|
||||
updated_ts = updated_at.timestamp() if updated_at else None
|
||||
|
||||
if updated_ts is not None:
|
||||
if start is not None and updated_ts <= start:
|
||||
continue
|
||||
if end is not None and updated_ts > end:
|
||||
continue
|
||||
|
||||
doc_batch.append(document)
|
||||
|
||||
if not has_more:
|
||||
yield from doc_batch
|
||||
checkpoint.has_more = False
|
||||
return checkpoint
|
||||
|
||||
yield from doc_batch
|
||||
checkpoint.next_start_time_tickets = next_start_time
|
||||
last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
|
||||
checkpoint.has_more = bool(
|
||||
end is None
|
||||
or last_doc_updated_at is None
|
||||
or last_doc_updated_at.timestamp() <= end
|
||||
)
|
||||
checkpoint.cached_author_map = (
|
||||
author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
|
||||
)
|
||||
return checkpoint
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
slim_doc_batch: list[SlimDocument] = []
|
||||
if self.content_type == "articles":
|
||||
articles = _get_articles(
|
||||
self.client, start_time=int(start) if start else None
|
||||
)
|
||||
for article in articles:
|
||||
slim_doc_batch.append(
|
||||
SlimDocument(
|
||||
id=f"article:{article['id']}",
|
||||
)
|
||||
)
|
||||
if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
|
||||
yield slim_doc_batch
|
||||
slim_doc_batch = []
|
||||
elif self.content_type == "tickets":
|
||||
tickets = _get_tickets(
|
||||
self.client, start_time=int(start) if start else None
|
||||
)
|
||||
for ticket in tickets:
|
||||
slim_doc_batch.append(
|
||||
SlimDocument(
|
||||
id=f"zendesk_ticket_{ticket['id']}",
|
||||
)
|
||||
)
|
||||
if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
|
||||
yield slim_doc_batch
|
||||
slim_doc_batch = []
|
||||
else:
|
||||
raise ValueError(f"Unsupported content_type: {self.content_type}")
|
||||
if slim_doc_batch:
|
||||
yield slim_doc_batch
|
||||
|
||||
@override
|
||||
def validate_connector_settings(self) -> None:
|
||||
if self.client is None:
|
||||
raise ZendeskCredentialsNotSetUpError()
|
||||
|
||||
try:
|
||||
_get_article_page(self.client, start_time=0)
|
||||
except HTTPError as e:
|
||||
# Check for HTTP status codes
|
||||
if e.response.status_code == 401:
|
||||
raise CredentialExpiredError(
|
||||
"Your Zendesk credentials appear to be invalid or expired (HTTP 401)."
|
||||
) from e
|
||||
elif e.response.status_code == 403:
|
||||
raise InsufficientPermissionsError(
|
||||
"Your Zendesk token does not have sufficient permissions (HTTP 403)."
|
||||
) from e
|
||||
elif e.response.status_code == 404:
|
||||
raise ConnectorValidationError(
|
||||
"Zendesk resource not found (HTTP 404)."
|
||||
) from e
|
||||
else:
|
||||
raise ConnectorValidationError(
|
||||
f"Unexpected Zendesk error (status={e.response.status_code}): {e}"
|
||||
) from e
|
||||
|
||||
@override
|
||||
def validate_checkpoint_json(
|
||||
self, checkpoint_json: str
|
||||
) -> ZendeskConnectorCheckpoint:
|
||||
return ZendeskConnectorCheckpoint.model_validate_json(checkpoint_json)
|
||||
|
||||
@override
|
||||
def build_dummy_checkpoint(self) -> ZendeskConnectorCheckpoint:
|
||||
return ZendeskConnectorCheckpoint(
|
||||
after_cursor_articles=None,
|
||||
next_start_time_tickets=None,
|
||||
cached_author_map=None,
|
||||
cached_content_tags=None,
|
||||
has_more=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
|
||||
connector = ZendeskConnector(content_type="articles")
|
||||
connector.load_credentials(
|
||||
{
|
||||
"zendesk_subdomain": os.environ["ZENDESK_SUBDOMAIN"],
|
||||
"zendesk_email": os.environ["ZENDESK_EMAIL"],
|
||||
"zendesk_token": os.environ["ZENDESK_TOKEN"],
|
||||
}
|
||||
)
|
||||
|
||||
current = time.time()
|
||||
one_day_ago = current - 24 * 60 * 60 # 1 day
|
||||
|
||||
checkpoint = connector.build_dummy_checkpoint()
|
||||
|
||||
while checkpoint.has_more:
|
||||
gen = connector.load_from_checkpoint(
|
||||
one_day_ago, current, checkpoint
|
||||
)
|
||||
|
||||
wrapper = CheckpointOutputWrapper()
|
||||
any_doc = False
|
||||
|
||||
for document, failure, next_checkpoint in wrapper(gen):
|
||||
if document:
|
||||
print("got document:", document.id)
|
||||
any_doc = True
|
||||
|
||||
checkpoint = next_checkpoint
|
||||
if any_doc:
|
||||
break
|
||||
Reference in New Issue
Block a user