Files
ragflow/common/data_source/zendesk_connector.py
buua436 c2ee2bf7fe Feat: add Zendesk data source integration with configuration and sync capabilities (#12344)
### What problem does this PR solve?
issue:
#12313
change:
add Zendesk data source integration with configuration and sync
capabilities

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
2025-12-31 14:40:49 +08:00

667 lines
23 KiB
Python

import copy
import logging
import time
from collections.abc import Callable
from collections.abc import Iterator
from typing import Any
import requests
from pydantic import BaseModel
from requests.exceptions import HTTPError
from typing_extensions import override
from common.data_source.config import ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS, DocumentSource
from common.data_source.exceptions import ConnectorValidationError, CredentialExpiredError, InsufficientPermissionsError
from common.data_source.html_utils import parse_html_page_basic
from common.data_source.interfaces import CheckpointOutput, CheckpointOutputWrapper, CheckpointedConnector, IndexingHeartbeatInterface, SlimConnectorWithPermSync
from common.data_source.models import BasicExpertInfo, ConnectorCheckpoint, ConnectorFailure, Document, DocumentFailure, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument
from common.data_source.utils import retry_builder, time_str_to_utc,rate_limit_builder
# Value sent as ``page[size]`` when listing content tags and articles.
# NOTE(review): the original comment calls this the Zendesk API maximum — confirm against the API docs.
MAX_PAGE_SIZE = 30 # Zendesk API maximum
# Largest author cache that is persisted in the checkpoint; a bigger map is
# dropped (stored as None) instead of being saved.
MAX_AUTHOR_MAP_SIZE = 50_000 # Reset author map cache if it gets too large
# Number of SlimDocument entries accumulated before a batch is yielded.
_SLIM_BATCH_SIZE = 1000
class ZendeskCredentialsNotSetUpError(PermissionError):
    """Signals that the connector was used before a Zendesk client was created."""

    def __init__(self) -> None:
        # The message is fixed; callers never pass their own text.
        message = "Zendesk Credentials are not set up, was load_credentials called?"
        PermissionError.__init__(self, message)
class ZendeskClient:
    """Connection details for one Zendesk account plus a ready-to-use request callable.

    Attributes:
        base_url: Root of the account's v2 REST API, built from the subdomain.
        auth: ``(username, password)`` tuple in Zendesk's API-token form
            (``"<email>/token"``, token).
        make_request: Callable ``(endpoint, params) -> dict`` produced by
            ``request_with_rate_limit`` and bound to this client.
    """

    def __init__(
        self,
        subdomain: str,
        email: str,
        token: str,
        calls_per_minute: int | None = None,
    ):
        api_root = f"https://{subdomain}.zendesk.com/api/v2"
        token_user = f"{email}/token"

        self.base_url = api_root
        self.auth = (token_user, token)
        # Built last: the returned closure reads base_url/auth from this instance.
        self.make_request = request_with_rate_limit(self, calls_per_minute)
def request_with_rate_limit(
    client: ZendeskClient, max_calls_per_minute: int | None = None
) -> Callable[[str, dict[str, Any]], dict[str, Any]]:
    """Build the GET helper used for every Zendesk API call of ``client``.

    The returned callable is wrapped with ``retry_builder()`` and, when
    ``max_calls_per_minute`` is truthy, with
    ``rate_limit_builder(max_calls=..., period=60)``.

    Args:
        client: Supplies ``base_url`` and ``auth`` for each request.
        max_calls_per_minute: Optional client-side call budget per 60 seconds.

    Returns:
        ``make_request(endpoint, params)`` which returns the decoded JSON body.
        A 403 response whose JSON body carries
        ``{"error": "SupportProductInactive"}`` is returned instead of raised
        so callers can detect an account without the Support product.

    Raises:
        requests.exceptions.HTTPError: (from the returned callable) for any
            other non-success status, including 429 after the optional sleep.
    """
    # Bound every call so a stalled connection cannot hang the sync forever.
    request_timeout_seconds = 60

    @retry_builder()
    @(
        rate_limit_builder(max_calls=max_calls_per_minute, period=60)
        if max_calls_per_minute
        else lambda x: x
    )
    def make_request(endpoint: str, params: dict[str, Any]) -> dict[str, Any]:
        response = requests.get(
            f"{client.base_url}/{endpoint}",
            auth=client.auth,
            params=params,
            timeout=request_timeout_seconds,
        )

        if response.status_code == 429:
            retry_after = response.headers.get("Retry-After")
            if retry_after is not None:
                # Retry-After may be fractional or an HTTP date; only sleep
                # when it parses as a positive number of seconds.
                try:
                    delay = float(retry_after)
                except ValueError:
                    delay = None
                if delay is not None and delay > 0:
                    time.sleep(delay)
            # Falls through to raise_for_status(); the resulting HTTPError is
            # left to retry_builder (presumably it retries — verify).
        elif response.status_code == 403:
            # A 403 body is not guaranteed to be JSON (e.g. an HTML error
            # page); a decode failure must not mask the HTTP error below.
            try:
                payload = response.json()
            except ValueError:
                payload = None
            if (
                isinstance(payload, dict)
                and payload.get("error") == "SupportProductInactive"
            ):
                return payload

        response.raise_for_status()
        return response.json()

    return make_request
class ZendeskPageResponse(BaseModel):
    """One page of items fetched from Zendesk, normalised across endpoints."""

    # Raw item dicts of the page (articles or tickets).
    data: list[dict[str, Any]]
    # Paging details needed to request the following page.
    meta: dict[str, Any]
    # True when the endpoint reported further pages after this one.
    has_more: bool
def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]:
    """Fetch every Guide content tag and return an ``id -> name`` mapping.

    Pages through ``guide/content_tags`` with cursor pagination until the
    response no longer reports ``meta.has_more``.

    Args:
        client: Zendesk client whose ``make_request`` performs the calls.

    Returns:
        Mapping from content tag id to content tag name.

    Raises:
        Exception: If any request or response handling step fails; the
            original error is attached as ``__cause__``.
    """
    content_tags: dict[str, str] = {}
    params: dict[str, Any] = {"page[size]": MAX_PAGE_SIZE}

    try:
        while True:
            data = client.make_request("guide/content_tags", params)

            for tag in data.get("records") or []:
                content_tags[tag["id"]] = tag["name"]

            # Tolerate a missing or null "meta" object: treat it as the last page.
            meta = data.get("meta") or {}
            if not meta.get("has_more", False):
                break
            params["page[after]"] = meta["after_cursor"]
    except Exception as e:
        # Chain the cause so the underlying HTTP/parsing error stays visible.
        raise Exception(f"Error fetching content tags: {str(e)}") from e

    return content_tags
def _get_articles(
    client: ZendeskClient, start_time: int | None = None, page_size: int = MAX_PAGE_SIZE
) -> Iterator[dict[str, Any]]:
    """Lazily yield every article from ``help_center/articles``.

    Articles are requested sorted by ``updated_at`` ascending; subsequent
    pages are fetched with the ``after_cursor`` of the previous response for
    as long as ``meta.has_more`` is truthy.

    Args:
        client: Zendesk client whose ``make_request`` performs the calls.
        start_time: Optional value sent as the ``start_time`` query parameter.
        page_size: Value sent as ``page[size]``.

    Yields:
        Raw article dicts in the order the API returns them.
    """
    query = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
    if start_time is not None:
        query["start_time"] = start_time

    more_pages = True
    while more_pages:
        page = client.make_request("help_center/articles", query)
        yield from page["articles"]

        more_pages = bool(page.get("meta", {}).get("has_more"))
        if more_pages:
            # Advance the cursor for the next request.
            query["page[after]"] = page["meta"]["after_cursor"]
def _get_article_page(
    client: ZendeskClient,
    start_time: int | None = None,
    after_cursor: str | None = None,
    page_size: int = MAX_PAGE_SIZE,
) -> ZendeskPageResponse:
    """Fetch a single page of ``help_center/articles``.

    Args:
        client: Zendesk client whose ``make_request`` performs the call.
        start_time: Optional value sent as the ``start_time`` query parameter.
        after_cursor: Cursor of the previous page; omitted for the first page.
        page_size: Value sent as ``page[size]``.

    Returns:
        The page's articles, its raw ``meta`` object and whether more pages
        exist according to ``meta.has_more``.
    """
    query: dict[str, Any] = {
        "page[size]": page_size,
        "sort_by": "updated_at",
        "sort_order": "asc",
    }
    # Optional parameters are only sent when the caller supplied them.
    for key, value in (("start_time", start_time), ("page[after]", after_cursor)):
        if value is not None:
            query[key] = value

    payload = client.make_request("help_center/articles", query)
    articles = payload["articles"]
    meta = payload["meta"]
    return ZendeskPageResponse(
        data=articles,
        meta=meta,
        has_more=bool(meta.get("has_more", False)),
    )
def _get_tickets(
    client: ZendeskClient, start_time: int | None = None
) -> Iterator[dict[str, Any]]:
    """Lazily yield every ticket from the incremental ticket export.

    Each response's ``end_time`` becomes the next request's ``start_time``
    until the API reports ``end_of_stream``.

    Args:
        client: Zendesk client whose ``make_request`` performs the calls.
        start_time: Unix timestamp to export from; ``None`` is sent as ``0``.

    Yields:
        Raw ticket dicts in the order the API returns them.

    Raises:
        ValueError: If the account reports ``SupportProductInactive`` (same
            error as ``_get_tickets_page`` raises for that payload).
    """
    params: dict[str, Any] = {"start_time": start_time or 0}

    while True:
        data = client.make_request("incremental/tickets.json", params)

        # make_request returns (rather than raises) this specific 403 payload;
        # surface it clearly instead of failing with a KeyError on "tickets".
        if data.get("error") == "SupportProductInactive":
            raise ValueError(
                "Zendesk Support Product is not active for this account, No tickets to index"
            )

        yield from data["tickets"]

        if data.get("end_of_stream", False):
            break

        next_start_time = data.get("end_time")
        # Without a new cursor the next request would be identical to this
        # one; stop instead of looping forever.
        if next_start_time is None or next_start_time == params["start_time"]:
            break
        params["start_time"] = next_start_time
# TODO: maybe these don't need to be their own functions?
def _get_tickets_page(
    client: ZendeskClient, start_time: int | None = None
) -> ZendeskPageResponse:
    """Fetch a single page of the incremental ticket export.

    Args:
        client: Zendesk client whose ``make_request`` performs the call.
        start_time: Unix timestamp to export from; ``None`` is sent as ``0``.

    Returns:
        The page's tickets, ``meta={"end_time": ...}`` for the next request,
        and ``has_more`` set to the negation of ``end_of_stream``.

    Raises:
        ValueError: If the response reports ``SupportProductInactive``.
    """
    query = {"start_time": start_time or 0}

    # NOTE: for some reason zendesk doesn't seem to be respecting the start_time param
    # in my local testing with very few tickets. We'll look into it if this becomes an
    # issue in larger deployments
    payload = client.make_request("incremental/tickets.json", query)

    support_inactive = payload.get("error") == "SupportProductInactive"
    if support_inactive:
        raise ValueError(
            "Zendesk Support Product is not active for this account, No tickets to index"
        )

    tickets = payload["tickets"]
    end_time = payload["end_time"]
    stream_finished = bool(payload.get("end_of_stream", False))
    return ZendeskPageResponse(
        data=tickets,
        meta={"end_time": end_time},
        has_more=not stream_finished,
    )
def _fetch_author(
    client: ZendeskClient, author_id: str | int
) -> BasicExpertInfo | None:
    """Look up a Zendesk user and convert it to ``BasicExpertInfo``.

    Args:
        client: Zendesk client whose ``make_request`` performs the call.
        author_id: User id as delivered by the API (string or integer).

    Returns:
        The author when the user record has both a name and an email;
        ``None`` for a falsy id, the placeholder id ``-1``, an incomplete
        user record, or an HTTP error from the lookup.
    """
    # Falsy ids are never looked up.
    if not author_id:
        return None
    # Compared as a string so it matches whether the API sends an int or a str.
    if str(author_id) == "-1":
        return None

    try:
        payload = client.make_request(f"users/{author_id}", {})
        user = payload.get("user")
        if not user:
            return None
        if user.get("name") and user.get("email"):
            return BasicExpertInfo(
                display_name=user.get("name"), email=user.get("email")
            )
        return None
    except requests.exceptions.HTTPError:
        # A failed lookup is not fatal; the document is simply left without an owner.
        return None
def _article_to_document(
    article: dict[str, Any],
    content_tags: dict[str, str],
    author_map: dict[str, BasicExpertInfo],
    client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
    """Convert a Help Center article dict into a ``Document``.

    Args:
        article: Raw article as returned by the API; ``id`` and ``title``
            are required, every other field is optional.
        content_tags: ``id -> name`` mapping used to resolve
            ``content_tag_ids``.
        author_map: Cache of already resolved authors. It is read but not
            modified here.
        client: Used to fetch the author when it is not cached.

    Returns:
        ``(new_author_mapping, document)`` where ``new_author_mapping`` is a
        single-entry ``{str(author_id): author}`` dict to merge into the
        cache, or ``None`` when no author could be resolved.
    """
    author_id = article.get("author_id")
    author: BasicExpertInfo | None = None
    author_key: str | None = None
    if author_id:
        # Cache keys are normalised to str: the map is declared dict[str, ...]
        # in the checkpoint, while the API may deliver the id as an int. The
        # raw id is still checked so entries stored by older code keep hitting.
        author_key = str(author_id)
        if author_key in author_map:
            author = author_map[author_key]
        elif author_id in author_map:
            author = author_map[author_id]
        else:
            author = _fetch_author(client, author_id)

    new_author_mapping = {author_key: author} if author_key and author else None

    updated_at = article.get("updated_at")
    update_time = time_str_to_utc(updated_at) if updated_at else None

    text = parse_html_page_basic(article.get("body") or "")
    blob = text.encode("utf-8", errors="replace")

    # Build metadata; "or []" guards against the API sending null lists.
    metadata: dict[str, str | list[str]] = {
        "labels": [
            str(label) for label in (article.get("label_names") or []) if label
        ],
        "content_tags": [
            content_tags[tag_id]
            for tag_id in (article.get("content_tag_ids") or [])
            if tag_id in content_tags
        ],
    }
    # Remove empty values
    metadata = {k: v for k, v in metadata.items() if v}

    return new_author_mapping, Document(
        id=f"article:{article['id']}",
        source=DocumentSource.ZENDESK,
        semantic_identifier=article["title"],
        extension=".txt",
        blob=blob,
        size_bytes=len(blob),
        doc_updated_at=update_time,
        primary_owners=[author] if author else None,
        metadata=metadata,
    )
def _get_comment_text(
    comment: dict[str, Any],
    author_map: dict[str, BasicExpertInfo],
    client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, str]:
    """Render one ticket comment as text and resolve its author.

    Args:
        comment: Raw comment dict; ``author_id``, ``created_at`` and ``body``
            are all optional.
        author_map: Cache of already resolved authors. It is read but not
            modified here.
        client: Used to fetch the author when it is not cached.

    Returns:
        ``(new_author_mapping, text)`` where ``text`` has the form
        ``"Comment[ by <name>][ at <created_at>]:\\n<body>"`` and
        ``new_author_mapping`` is a single-entry ``{str(author_id): author}``
        dict, or ``None`` when no author could be resolved.
    """
    author_id = comment.get("author_id")
    author: BasicExpertInfo | None = None
    author_key: str | None = None
    if author_id:
        # Cache keys are normalised to str (the checkpointed map is
        # dict[str, ...]); the raw id is also checked for older entries.
        author_key = str(author_id)
        if author_key in author_map:
            author = author_map[author_key]
        elif author_id in author_map:
            author = author_map[author_id]
        else:
            author = _fetch_author(client, author_id)

    new_author_mapping = {author_key: author} if author_key and author else None

    header = "Comment"
    if author and author.display_name:
        header += f" by {author.display_name}"
    if comment.get("created_at"):
        header += f" at {comment['created_at']}"

    # A missing or null body becomes an empty string rather than raising
    # KeyError or rendering the literal text "None".
    body = comment.get("body") or ""
    return new_author_mapping, f"{header}:\n{body}"
def _ticket_to_document(
    ticket: dict[str, Any],
    author_map: dict[str, BasicExpertInfo],
    client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
    """Convert a ticket dict (plus its comments) into a ``Document``.

    The ticket's comments are fetched with one extra request to
    ``tickets/<id>/comments`` and appended to the document text.

    Args:
        ticket: Raw ticket dict; ``id`` is required.
        author_map: Cache of already resolved users. It is updated in place
            with every user resolved while processing this ticket.
        client: Used to fetch comments and any user not yet cached.

    Returns:
        ``(new_author_mapping, document)`` where ``new_author_mapping`` holds
        every user resolved for this ticket (submitter and comment authors),
        keyed by ``str(user_id)``, or ``None`` when none was resolved.
    """
    # Zendesk's ticket JSON names this field "submitter_id"; the legacy
    # "submitter" key is still honoured as a fallback.
    submitter_id = ticket.get("submitter_id") or ticket.get("submitter")
    submitter: BasicExpertInfo | None = None
    # Everything resolved for this ticket. Kept separate from the per-comment
    # result so the submitter entry is not overwritten by the comment loop.
    resolved_authors: dict[str, BasicExpertInfo] = {}
    if submitter_id:
        # Cache keys are normalised to str (the checkpointed map is
        # dict[str, ...]); the raw id is also checked for older entries.
        submitter_key = str(submitter_id)
        if submitter_key in author_map:
            submitter = author_map[submitter_key]
        elif submitter_id in author_map:
            submitter = author_map[submitter_id]
        else:
            submitter = _fetch_author(client, submitter_id)
        if submitter:
            resolved_authors[submitter_key] = submitter
            # Make the submitter available to the comment lookups below.
            author_map[submitter_key] = submitter

    updated_at = ticket.get("updated_at")
    update_time = time_str_to_utc(updated_at) if updated_at else None

    metadata: dict[str, str | list[str]] = {}
    if status := ticket.get("status"):
        metadata["status"] = status
    if priority := ticket.get("priority"):
        metadata["priority"] = priority
    if tags := ticket.get("tags"):
        metadata["tags"] = tags
    if ticket_type := ticket.get("type"):
        metadata["ticket_type"] = ticket_type

    # Fetch comments for the ticket
    # NOTE(review): only the first response page is read; confirm whether
    # long comment threads need pagination here.
    comments_data = client.make_request(f"tickets/{ticket.get('id')}/comments", {})
    comments = comments_data.get("comments") or []

    comment_texts = []
    for comment in comments:
        comment_author_mapping, comment_text = _get_comment_text(
            comment, author_map, client
        )
        if comment_author_mapping:
            author_map.update(comment_author_mapping)
            resolved_authors.update(comment_author_mapping)
        comment_texts.append(comment_text)

    comments_text = "\n\n".join(comment_texts)

    subject = ticket.get("subject")
    full_text = f"Ticket Subject:\n{subject}\n\nComments:\n{comments_text}"
    blob = full_text.encode("utf-8", errors="replace")

    return resolved_authors or None, Document(
        id=f"zendesk_ticket_{ticket['id']}",
        blob=blob,
        extension=".txt",
        size_bytes=len(blob),
        source=DocumentSource.ZENDESK,
        semantic_identifier=f"Ticket #{ticket['id']}: {subject or 'No Subject'}",
        doc_updated_at=update_time,
        primary_owners=[submitter] if submitter else None,
        metadata=metadata,
    )
class ZendeskConnectorCheckpoint(ConnectorCheckpoint):
    """Resumable state carried between ``load_from_checkpoint`` calls."""

    # We use cursor-based paginated retrieval for articles
    after_cursor_articles: str | None

    # We use timestamp-based paginated retrieval for tickets
    next_start_time_tickets: int | None

    # Users resolved so far; stored as None when the map grew past
    # MAX_AUTHOR_MAP_SIZE.
    cached_author_map: dict[str, BasicExpertInfo] | None

    # Content tag id -> name mapping; None until it has been fetched once.
    cached_content_tags: dict[str, str] | None
class ZendeskConnector(
    SlimConnectorWithPermSync, CheckpointedConnector[ZendeskConnectorCheckpoint]
):
    """Checkpointed connector for Zendesk Help Center articles or Support tickets.

    ``content_type`` selects what is indexed (``"articles"`` or ``"tickets"``).
    ``load_credentials`` must be called before any retrieval or validation
    method; until then ``self.client`` is ``None`` and those methods raise
    ``ZendeskCredentialsNotSetUpError``.
    """

    def __init__(
        self,
        content_type: str = "articles",
        calls_per_minute: int | None = None,
    ) -> None:
        """Store configuration; no network access happens here.

        Args:
            content_type: ``"articles"`` or ``"tickets"``.
            calls_per_minute: Optional client-side rate limit handed to
                ``ZendeskClient``.
        """
        self.content_type = content_type
        self.subdomain = ""
        # Fetch all tags ahead of time
        self.content_tags: dict[str, str] = {}
        self.calls_per_minute = calls_per_minute
        # Created by load_credentials(). Initialised here so the
        # "self.client is None" guards raise ZendeskCredentialsNotSetUpError
        # instead of AttributeError.
        self.client: ZendeskClient | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Create the Zendesk client from stored credentials.

        Args:
            credentials: Must contain ``zendesk_subdomain`` (a bare subdomain
                or the full account URL), ``zendesk_email`` and
                ``zendesk_token``.

        Returns:
            Always ``None``.
        """
        # Subdomain is actually the whole URL
        subdomain = (
            credentials["zendesk_subdomain"]
            .strip()
            .replace("https://", "")
            .replace("http://", "")
            .split(".zendesk.com")[0]
        )
        self.subdomain = subdomain

        self.client = ZendeskClient(
            subdomain,
            credentials["zendesk_email"],
            credentials["zendesk_token"],
            calls_per_minute=self.calls_per_minute,
        )
        return None

    @override
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ZendeskConnectorCheckpoint,
    ) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
        """Yield one page of documents/failures and return the next checkpoint.

        The first call for a fresh checkpoint only fetches the content tag
        mapping and returns it inside the checkpoint without yielding.

        Raises:
            ZendeskCredentialsNotSetUpError: If ``load_credentials`` was not called.
            ValueError: If ``content_type`` is neither ``"articles"`` nor ``"tickets"``.
        """
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()

        if checkpoint.cached_content_tags is None:
            # Work on a copy, like the retrieval helpers do, so the caller's
            # checkpoint object is not modified behind its back.
            checkpoint = copy.deepcopy(checkpoint)
            checkpoint.cached_content_tags = _get_content_tag_mapping(self.client)
            return checkpoint  # save the content tags to the checkpoint

        self.content_tags = checkpoint.cached_content_tags

        if self.content_type == "articles":
            checkpoint = yield from self._retrieve_articles(start, end, checkpoint)
            return checkpoint
        elif self.content_type == "tickets":
            checkpoint = yield from self._retrieve_tickets(start, end, checkpoint)
            return checkpoint
        else:
            raise ValueError(f"Unsupported content_type: {self.content_type}")

    @staticmethod
    def _outside_time_window(
        document: Document,
        start: SecondsSinceUnixEpoch | None,
        end: SecondsSinceUnixEpoch | None,
    ) -> bool:
        """Return True when the document's timestamp lies outside ``(start, end]``.

        Documents without a timestamp are never considered outside the window.
        """
        updated_at = document.doc_updated_at
        if updated_at is None:
            return False
        updated_ts = updated_at.timestamp()
        if start is not None and updated_ts <= start:
            return True
        return end is not None and updated_ts > end

    def _retrieve_articles(
        self,
        start: SecondsSinceUnixEpoch | None,
        end: SecondsSinceUnixEpoch | None,
        checkpoint: ZendeskConnectorCheckpoint,
    ) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
        """Process one page of articles starting at the checkpoint's cursor.

        Yields a ``ConnectorFailure`` for each article that cannot be
        converted, then the page's documents; returns the updated checkpoint.
        """
        checkpoint = copy.deepcopy(checkpoint)
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()

        # This one is built on the fly as there may be more many more authors than tags
        author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
        after_cursor = checkpoint.after_cursor_articles
        doc_batch: list[Document] = []

        response = _get_article_page(
            self.client,
            start_time=int(start) if start else None,
            after_cursor=after_cursor,
        )
        articles = response.data
        has_more = response.has_more
        after_cursor = response.meta.get("after_cursor")

        for article in articles:
            # Skip articles without a body, drafts, and articles carrying a
            # label that is configured to be skipped.
            if (
                article.get("body") is None
                or article.get("draft")
                or any(
                    label in ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS
                    for label in (article.get("label_names") or [])
                )
            ):
                continue

            try:
                new_author_map, document = _article_to_document(
                    article, self.content_tags, author_map, self.client
                )
            except Exception as e:
                # .get() keeps the handler itself from raising when the
                # article has no "id".
                logging.error(
                    "Error processing article %s: %s", article.get("id"), e
                )
                yield ConnectorFailure(
                    failed_document=DocumentFailure(
                        document_id=f"{article.get('id')}",
                        document_link=article.get("html_url", ""),
                    ),
                    failure_message=str(e),
                    exception=e,
                )
                continue

            if new_author_map:
                author_map.update(new_author_map)

            if self._outside_time_window(document, start, end):
                continue

            doc_batch.append(document)

        if not has_more:
            yield from doc_batch
            checkpoint.has_more = False
            return checkpoint

        # Sometimes no documents are retrieved, but the cursor
        # is still updated so the connector makes progress.
        yield from doc_batch
        checkpoint.after_cursor_articles = after_cursor

        last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
        # NOTE(review): documents newer than `end` were filtered out above, so
        # the last batched document can never exceed `end` and this expression
        # is always True when reached; an early stop would need its own signal.
        checkpoint.has_more = bool(
            end is None
            or last_doc_updated_at is None
            or last_doc_updated_at.timestamp() <= end
        )
        checkpoint.cached_author_map = (
            author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
        )
        return checkpoint

    def _retrieve_tickets(
        self,
        start: SecondsSinceUnixEpoch | None,
        end: SecondsSinceUnixEpoch | None,
        checkpoint: ZendeskConnectorCheckpoint,
    ) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
        """Process one page of the incremental ticket export.

        Yields a ``ConnectorFailure`` for each ticket that cannot be
        converted, then the page's documents; returns the updated checkpoint.
        """
        checkpoint = copy.deepcopy(checkpoint)
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()

        author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
        doc_batch: list[Document] = []

        # Resume from the checkpoint when present, otherwise from `start`.
        next_start_time = int(checkpoint.next_start_time_tickets or start or 0)
        ticket_response = _get_tickets_page(self.client, start_time=next_start_time)
        tickets = ticket_response.data
        has_more = ticket_response.has_more
        next_start_time = ticket_response.meta["end_time"]

        for ticket in tickets:
            if ticket.get("status") == "deleted":
                continue

            try:
                new_author_map, document = _ticket_to_document(
                    ticket=ticket,
                    author_map=author_map,
                    client=self.client,
                )
            except Exception as e:
                # .get() keeps the handler itself from raising when the
                # ticket has no "id".
                logging.error("Error processing ticket %s: %s", ticket.get("id"), e)
                yield ConnectorFailure(
                    failed_document=DocumentFailure(
                        document_id=f"{ticket.get('id')}",
                        document_link=ticket.get("url", ""),
                    ),
                    failure_message=str(e),
                    exception=e,
                )
                continue

            if new_author_map:
                author_map.update(new_author_map)

            if self._outside_time_window(document, start, end):
                continue

            doc_batch.append(document)

        if not has_more:
            yield from doc_batch
            checkpoint.has_more = False
            return checkpoint

        yield from doc_batch
        checkpoint.next_start_time_tickets = next_start_time

        last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
        # NOTE(review): as with articles, documents newer than `end` are
        # filtered out above, so this is always True when reached.
        checkpoint.has_more = bool(
            end is None
            or last_doc_updated_at is None
            or last_doc_updated_at.timestamp() <= end
        )
        checkpoint.cached_author_map = (
            author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
        )
        return checkpoint

    def retrieve_all_slim_docs_perm_sync(
        self,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
        callback: IndexingHeartbeatInterface | None = None,
    ) -> GenerateSlimDocumentOutput:
        """Yield batches of ``SlimDocument`` ids for all articles or tickets.

        Ids use the same format as the full documents (``article:<id>`` and
        ``zendesk_ticket_<id>``). ``end`` and ``callback`` are accepted for
        interface compatibility but are not used.

        Raises:
            ZendeskCredentialsNotSetUpError: If ``load_credentials`` was not called.
            ValueError: If ``content_type`` is neither ``"articles"`` nor ``"tickets"``.
        """
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()

        start_time = int(start) if start else None
        if self.content_type == "articles":
            items = _get_articles(self.client, start_time=start_time)
            id_template = "article:{}"
        elif self.content_type == "tickets":
            items = _get_tickets(self.client, start_time=start_time)
            id_template = "zendesk_ticket_{}"
        else:
            raise ValueError(f"Unsupported content_type: {self.content_type}")

        slim_doc_batch: list[SlimDocument] = []
        for item in items:
            slim_doc_batch.append(SlimDocument(id=id_template.format(item["id"])))
            if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
                yield slim_doc_batch
                slim_doc_batch = []

        # Flush the final, partially filled batch.
        if slim_doc_batch:
            yield slim_doc_batch

    @override
    def validate_connector_settings(self) -> None:
        """Verify the credentials by fetching one page of articles.

        Raises:
            ZendeskCredentialsNotSetUpError: If ``load_credentials`` was not called.
            CredentialExpiredError: On HTTP 401.
            InsufficientPermissionsError: On HTTP 403.
            ConnectorValidationError: On HTTP 404 or any other HTTP error.
        """
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()

        try:
            _get_article_page(self.client, start_time=0)
        except HTTPError as e:
            # An HTTPError is not guaranteed to carry a response object.
            status_code = e.response.status_code if e.response is not None else None
            # Check for HTTP status codes
            if status_code == 401:
                raise CredentialExpiredError(
                    "Your Zendesk credentials appear to be invalid or expired (HTTP 401)."
                ) from e
            elif status_code == 403:
                raise InsufficientPermissionsError(
                    "Your Zendesk token does not have sufficient permissions (HTTP 403)."
                ) from e
            elif status_code == 404:
                raise ConnectorValidationError(
                    "Zendesk resource not found (HTTP 404)."
                ) from e
            else:
                raise ConnectorValidationError(
                    f"Unexpected Zendesk error (status={status_code}): {e}"
                ) from e

    @override
    def validate_checkpoint_json(
        self, checkpoint_json: str
    ) -> ZendeskConnectorCheckpoint:
        """Parse and validate a serialized checkpoint."""
        return ZendeskConnectorCheckpoint.model_validate_json(checkpoint_json)

    @override
    def build_dummy_checkpoint(self) -> ZendeskConnectorCheckpoint:
        """Return the initial checkpoint: nothing cached, ``has_more=True``."""
        return ZendeskConnectorCheckpoint(
            after_cursor_articles=None,
            next_start_time_tickets=None,
            cached_author_map=None,
            cached_content_tags=None,
            has_more=True,
        )
if __name__ == "__main__":
    # Manual smoke test: index articles updated during the last 24 hours and
    # stop after the first page that produced at least one document.
    # Credentials are read from the environment.
    import os

    env_credentials = {
        "zendesk_subdomain": os.environ["ZENDESK_SUBDOMAIN"],
        "zendesk_email": os.environ["ZENDESK_EMAIL"],
        "zendesk_token": os.environ["ZENDESK_TOKEN"],
    }
    connector = ZendeskConnector(content_type="articles")
    connector.load_credentials(env_credentials)

    current = time.time()
    one_day_ago = current - 24 * 60 * 60  # 1 day

    checkpoint = connector.build_dummy_checkpoint()
    while checkpoint.has_more:
        page_output = connector.load_from_checkpoint(one_day_ago, current, checkpoint)
        unwrap = CheckpointOutputWrapper()

        any_doc = False
        for document, failure, next_checkpoint in unwrap(page_output):
            if document:
                print("got document:", document.id)
                any_doc = True
            # Track the most recent value; the checkpoint that matters is the
            # one left after the loop finishes.
            checkpoint = next_checkpoint

        if any_doc:
            break