Mirror of https://github.com/infiniflow/ragflow.git (synced 2026-01-04 03:25:30 +08:00)

Compare commits: v0.23.1...6a664fea3b (6 commits)

| SHA1 |
|---|
| 6a664fea3b |
| dcdc1b0ec7 |
| 4af4c36e60 |
| 05e5244d94 |
| c2ee2bf7fe |
| 461c81e14a |
@@ -202,7 +202,7 @@ class Retrieval(ToolBase, ABC):
                 kbinfos["chunks"] = settings.retriever.retrieval_by_children(kbinfos["chunks"],
                                                                              [kb.tenant_id for kb in kbs])
             if self._param.use_kg:
-                ck = settings.kg_retriever.retrieval(query,
+                ck = await settings.kg_retriever.retrieval(query,
                                                      [kb.tenant_id for kb in kbs],
                                                      kb_ids,
                                                      embd_mdl,

@@ -215,7 +215,7 @@ class Retrieval(ToolBase, ABC):
             kbinfos = {"chunks": [], "doc_aggs": []}

         if self._param.use_kg and kbs:
-            ck = settings.kg_retriever.retrieval(query, [kb.tenant_id for kb in kbs], filtered_kb_ids, embd_mdl,
+            ck = await settings.kg_retriever.retrieval(query, [kb.tenant_id for kb in kbs], filtered_kb_ids, embd_mdl,
                                                  LLMBundle(kbs[0].tenant_id, LLMType.CHAT))
             if self.check_if_canceled("Retrieval processing"):
                 return

@@ -381,7 +381,7 @@ async def retrieval_test():
         rank_feature=labels
     )
     if use_kg:
-        ck = settings.kg_retriever.retrieval(_question,
+        ck = await settings.kg_retriever.retrieval(_question,
                                              tenant_ids,
                                              kb_ids,
                                              embd_mdl,

@@ -150,7 +150,7 @@ async def retrieval(tenant_id):
     )

     if use_kg:
-        ck = settings.kg_retriever.retrieval(question,
+        ck = await settings.kg_retriever.retrieval(question,
                                              [tenant_id],
                                              [kb_id],
                                              embd_mdl,

@@ -1579,7 +1579,7 @@ async def retrieval_test(tenant_id):
         if cks:
             ranks["chunks"] = cks
         if use_kg:
-            ck = settings.kg_retriever.retrieval(question, [k.tenant_id for k in kbs], kb_ids, embd_mdl, LLMBundle(kb.tenant_id, LLMType.CHAT))
+            ck = await settings.kg_retriever.retrieval(question, [k.tenant_id for k in kbs], kb_ids, embd_mdl, LLMBundle(kb.tenant_id, LLMType.CHAT))
             if ck["content_with_weight"]:
                 ranks["chunks"].insert(0, ck)

@@ -1116,7 +1116,7 @@ async def retrieval_test_embedded():
         local_doc_ids, rerank_mdl=rerank_mdl, highlight=req.get("highlight"), rank_feature=labels
     )
     if use_kg:
-        ck = settings.kg_retriever.retrieval(_question, tenant_ids, kb_ids, embd_mdl,
+        ck = await settings.kg_retriever.retrieval(_question, tenant_ids, kb_ids, embd_mdl,
                                              LLMBundle(kb.tenant_id, LLMType.CHAT))
         if ck["content_with_weight"]:
             ranks["chunks"].insert(0, ck)

@@ -421,7 +421,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs):
             kbinfos["chunks"].extend(tav_res["chunks"])
             kbinfos["doc_aggs"].extend(tav_res["doc_aggs"])
         if prompt_config.get("use_kg"):
-            ck = settings.kg_retriever.retrieval(" ".join(questions), tenant_ids, dialog.kb_ids, embd_mdl,
+            ck = await settings.kg_retriever.retrieval(" ".join(questions), tenant_ids, dialog.kb_ids, embd_mdl,
                                                  LLMBundle(dialog.tenant_id, LLMType.CHAT))
             if ck["content_with_weight"]:
                 kbinfos["chunks"].insert(0, ck)
@@ -133,6 +133,7 @@ class FileSource(StrEnum):
     GITHUB = "github"
     GITLAB = "gitlab"
     IMAP = "imap"
+    ZENDESK = "zendesk"

 class PipelineTaskType(StrEnum):
     PARSE = "Parse"

@@ -39,6 +39,7 @@ from .moodle_connector import MoodleConnector
 from .airtable_connector import AirtableConnector
 from .asana_connector import AsanaConnector
 from .imap_connector import ImapConnector
+from .zendesk_connector import ZendeskConnector
 from .config import BlobType, DocumentSource
 from .models import Document, TextSection, ImageSection, BasicExpertInfo
 from .exceptions import (

@@ -76,5 +77,6 @@ __all__ = [
     "UnexpectedValidationError",
     "AirtableConnector",
     "AsanaConnector",
-    "ImapConnector"
+    "ImapConnector",
+    "ZendeskConnector",
 ]

@@ -58,8 +58,9 @@ class DocumentSource(str, Enum):
     GITHUB = "github"
     GITLAB = "gitlab"
     IMAP = "imap"
+    ZENDESK = "zendesk"


 class FileOrigin(str, Enum):
     """File origins"""
     CONNECTOR = "connector"

@@ -271,6 +272,10 @@ IMAP_CONNECTOR_SIZE_THRESHOLD = int(
     os.environ.get("IMAP_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024)
 )

+ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS = os.environ.get(
+    "ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS", ""
+).split(",")

 _USER_NOT_FOUND = "Unknown Confluence User"

 _COMMENT_EXPANSION_FIELDS = ["body.storage.value"]
@@ -1149,3 +1149,101 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R]:
             future_to_index[executor.submit(_next_or_none, ind, gens[ind])] = next_ind
             next_ind += 1
         del future_to_index[future]
+
+
+F = TypeVar("F", bound=Callable[..., Any])
+
+
+class _RateLimitDecorator:
+    """Builds a generic wrapper/decorator for calls to external APIs that
+    prevents making more than `max_calls` requests per `period`.
+
+    Implementation inspired by the `ratelimit` library:
+    https://github.com/tomasbasham/ratelimit.
+
+    NOTE: is not thread safe.
+    """
+
+    def __init__(
+        self,
+        max_calls: int,
+        period: float,  # in seconds
+        sleep_time: float = 2,  # in seconds
+        sleep_backoff: float = 2,  # applies exponential backoff
+        max_num_sleep: int = 0,
+    ):
+        self.max_calls = max_calls
+        self.period = period
+        self.sleep_time = sleep_time
+        self.sleep_backoff = sleep_backoff
+        self.max_num_sleep = max_num_sleep
+
+        self.call_history: list[float] = []
+        self.curr_calls = 0
+
+    def __call__(self, func: F) -> F:
+        @wraps(func)
+        def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any:
+            # cleanup calls which are no longer relevant
+            self._cleanup()
+
+            # check if we've exceeded the rate limit
+            sleep_cnt = 0
+            while len(self.call_history) == self.max_calls:
+                sleep_time = self.sleep_time * (self.sleep_backoff**sleep_cnt)
+                logging.warning(
+                    f"Rate limit exceeded for function {func.__name__}. "
+                    f"Waiting {sleep_time} seconds before retrying."
+                )
+                time.sleep(sleep_time)
+                sleep_cnt += 1
+                if self.max_num_sleep != 0 and sleep_cnt >= self.max_num_sleep:
+                    raise RateLimitTriedTooManyTimesError(
+                        f"Exceeded '{self.max_num_sleep}' retries for function '{func.__name__}'"
+                    )
+
+                self._cleanup()
+
+            # add the current call to the call history
+            self.call_history.append(time.monotonic())
+            return func(*args, **kwargs)
+
+        return cast(F, wrapped_func)
+
+    def _cleanup(self) -> None:
+        curr_time = time.monotonic()
+        time_to_expire_before = curr_time - self.period
+        self.call_history = [
+            call_time
+            for call_time in self.call_history
+            if call_time > time_to_expire_before
+        ]
+
+
+rate_limit_builder = _RateLimitDecorator
+
+
+def retry_builder(
+    tries: int = 20,
+    delay: float = 0.1,
+    max_delay: float | None = 60,
+    backoff: float = 2,
+    jitter: tuple[float, float] | float = 1,
+    exceptions: type[Exception] | tuple[type[Exception], ...] = (Exception,),
+) -> Callable[[F], F]:
+    """Builds a generic wrapper/decorator for calls to external APIs that
+    may fail due to rate limiting, flakes, or other reasons. Applies exponential
+    backoff with jitter to retry the call."""
+
+    def retry_with_default(func: F) -> F:
+        @retry(
+            tries=tries,
+            delay=delay,
+            max_delay=max_delay,
+            backoff=backoff,
+            jitter=jitter,
+            logger=logging.getLogger(__name__),
+            exceptions=exceptions,
+        )
+        def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any:
+            return func(*args, **kwargs)
+
+        return cast(F, wrapped_func)
+
+    return retry_with_default
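The two builders are designed to stack on the raw HTTP call, which is how the Zendesk client below wires them up. A minimal usage sketch (the `fetch` helper and its URL are hypothetical; only `retry_builder` and `rate_limit_builder` come from the code above):

```python
# Hypothetical usage sketch: `fetch` is illustrative; only retry_builder and
# rate_limit_builder come from the utils module above.
import requests

from common.data_source.utils import rate_limit_builder, retry_builder


@retry_builder(tries=5, delay=0.5)            # outer layer: retry failed calls
@rate_limit_builder(max_calls=30, period=60)  # inner layer: at most 30 calls per minute
def fetch(url: str) -> dict:
    resp = requests.get(url)
    resp.raise_for_status()  # raising here hands control to the retry layer
    return resp.json()
```

Ordering matters: with the retry layer on the outside, every retry attempt still passes through the sliding-window rate limit, which is the same stacking `request_with_rate_limit` uses in the new connector below.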
common/data_source/zendesk_connector.py (new file, 667 lines)
@@ -0,0 +1,667 @@
import copy
import logging
import time
from collections.abc import Callable
from collections.abc import Iterator
from typing import Any

import requests
from pydantic import BaseModel
from requests.exceptions import HTTPError
from typing_extensions import override

from common.data_source.config import ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS, DocumentSource
from common.data_source.exceptions import ConnectorValidationError, CredentialExpiredError, InsufficientPermissionsError
from common.data_source.html_utils import parse_html_page_basic
from common.data_source.interfaces import CheckpointOutput, CheckpointOutputWrapper, CheckpointedConnector, IndexingHeartbeatInterface, SlimConnectorWithPermSync
from common.data_source.models import BasicExpertInfo, ConnectorCheckpoint, ConnectorFailure, Document, DocumentFailure, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument
from common.data_source.utils import retry_builder, time_str_to_utc, rate_limit_builder

MAX_PAGE_SIZE = 30  # Zendesk API maximum
MAX_AUTHOR_MAP_SIZE = 50_000  # Reset author map cache if it gets too large
_SLIM_BATCH_SIZE = 1000


class ZendeskCredentialsNotSetUpError(PermissionError):
    def __init__(self) -> None:
        super().__init__(
            "Zendesk Credentials are not set up, was load_credentials called?"
        )


class ZendeskClient:
    def __init__(
        self,
        subdomain: str,
        email: str,
        token: str,
        calls_per_minute: int | None = None,
    ):
        self.base_url = f"https://{subdomain}.zendesk.com/api/v2"
        self.auth = (f"{email}/token", token)
        self.make_request = request_with_rate_limit(self, calls_per_minute)


def request_with_rate_limit(
    client: ZendeskClient, max_calls_per_minute: int | None = None
) -> Callable[[str, dict[str, Any]], dict[str, Any]]:
    @retry_builder()
    @(
        rate_limit_builder(max_calls=max_calls_per_minute, period=60)
        if max_calls_per_minute
        else lambda x: x
    )
    def make_request(endpoint: str, params: dict[str, Any]) -> dict[str, Any]:
        response = requests.get(
            f"{client.base_url}/{endpoint}", auth=client.auth, params=params
        )

        if response.status_code == 429:
            retry_after = response.headers.get("Retry-After")
            if retry_after is not None:
                # Sleep for the duration indicated by the Retry-After header
                time.sleep(int(retry_after))

        elif (
            response.status_code == 403
            and response.json().get("error") == "SupportProductInactive"
        ):
            return response.json()

        response.raise_for_status()
        return response.json()

    return make_request
class ZendeskPageResponse(BaseModel):
    data: list[dict[str, Any]]
    meta: dict[str, Any]
    has_more: bool


def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]:
    content_tags: dict[str, str] = {}
    params = {"page[size]": MAX_PAGE_SIZE}

    try:
        while True:
            data = client.make_request("guide/content_tags", params)

            for tag in data.get("records", []):
                content_tags[tag["id"]] = tag["name"]

            # Check if there are more pages
            if data.get("meta", {}).get("has_more", False):
                params["page[after]"] = data["meta"]["after_cursor"]
            else:
                break

        return content_tags
    except Exception as e:
        raise Exception(f"Error fetching content tags: {str(e)}")


def _get_articles(
    client: ZendeskClient, start_time: int | None = None, page_size: int = MAX_PAGE_SIZE
) -> Iterator[dict[str, Any]]:
    params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
    if start_time is not None:
        params["start_time"] = start_time

    while True:
        data = client.make_request("help_center/articles", params)
        for article in data["articles"]:
            yield article

        if not data.get("meta", {}).get("has_more"):
            break
        params["page[after]"] = data["meta"]["after_cursor"]


def _get_article_page(
    client: ZendeskClient,
    start_time: int | None = None,
    after_cursor: str | None = None,
    page_size: int = MAX_PAGE_SIZE,
) -> ZendeskPageResponse:
    params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
    if start_time is not None:
        params["start_time"] = start_time
    if after_cursor is not None:
        params["page[after]"] = after_cursor

    data = client.make_request("help_center/articles", params)
    return ZendeskPageResponse(
        data=data["articles"],
        meta=data["meta"],
        has_more=bool(data["meta"].get("has_more", False)),
    )


def _get_tickets(
    client: ZendeskClient, start_time: int | None = None
) -> Iterator[dict[str, Any]]:
    params = {"start_time": start_time or 0}

    while True:
        data = client.make_request("incremental/tickets.json", params)
        for ticket in data["tickets"]:
            yield ticket

        if not data.get("end_of_stream", False):
            params["start_time"] = data["end_time"]
        else:
            break


# TODO: maybe these don't need to be their own functions?
def _get_tickets_page(
    client: ZendeskClient, start_time: int | None = None
) -> ZendeskPageResponse:
    params = {"start_time": start_time or 0}

    # NOTE: for some reason zendesk doesn't seem to be respecting the start_time param
    # in my local testing with very few tickets. We'll look into it if this becomes an
    # issue in larger deployments
    data = client.make_request("incremental/tickets.json", params)
    if data.get("error") == "SupportProductInactive":
        raise ValueError(
            "Zendesk Support Product is not active for this account, No tickets to index"
        )
    return ZendeskPageResponse(
        data=data["tickets"],
        meta={"end_time": data["end_time"]},
        has_more=not bool(data.get("end_of_stream", False)),
    )
def _fetch_author(
    client: ZendeskClient, author_id: str | int
) -> BasicExpertInfo | None:
    # Skip fetching if author_id is invalid
    # cast to str to avoid issues with zendesk changing their types
    if not author_id or str(author_id) == "-1":
        return None

    try:
        author_data = client.make_request(f"users/{author_id}", {})
        user = author_data.get("user")
        return (
            BasicExpertInfo(display_name=user.get("name"), email=user.get("email"))
            if user and user.get("name") and user.get("email")
            else None
        )
    except requests.exceptions.HTTPError:
        # Handle any API errors gracefully
        return None
def _article_to_document(
    article: dict[str, Any],
    content_tags: dict[str, str],
    author_map: dict[str, BasicExpertInfo],
    client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
    author_id = article.get("author_id")
    if not author_id:
        author = None
    else:
        author = (
            author_map.get(author_id)
            if author_id in author_map
            else _fetch_author(client, author_id)
        )

    new_author_mapping = {author_id: author} if author_id and author else None

    updated_at = article.get("updated_at")
    update_time = time_str_to_utc(updated_at) if updated_at else None

    text = parse_html_page_basic(article.get("body") or "")
    blob = text.encode("utf-8", errors="replace")
    # Build metadata
    metadata: dict[str, str | list[str]] = {
        "labels": [str(label) for label in article.get("label_names", []) if label],
        "content_tags": [
            content_tags[tag_id]
            for tag_id in article.get("content_tag_ids", [])
            if tag_id in content_tags
        ],
    }

    # Remove empty values
    metadata = {k: v for k, v in metadata.items() if v}

    return new_author_mapping, Document(
        id=f"article:{article['id']}",
        source=DocumentSource.ZENDESK,
        semantic_identifier=article["title"],
        extension=".txt",
        blob=blob,
        size_bytes=len(blob),
        doc_updated_at=update_time,
        primary_owners=[author] if author else None,
        metadata=metadata,
    )


def _get_comment_text(
    comment: dict[str, Any],
    author_map: dict[str, BasicExpertInfo],
    client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, str]:
    author_id = comment.get("author_id")
    if not author_id:
        author = None
    else:
        author = (
            author_map.get(author_id)
            if author_id in author_map
            else _fetch_author(client, author_id)
        )

    new_author_mapping = {author_id: author} if author_id and author else None

    comment_text = f"Comment{' by ' + author.display_name if author and author.display_name else ''}"
    comment_text += f"{' at ' + comment['created_at'] if comment.get('created_at') else ''}:\n{comment['body']}"

    return new_author_mapping, comment_text
def _ticket_to_document(
    ticket: dict[str, Any],
    author_map: dict[str, BasicExpertInfo],
    client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
    submitter_id = ticket.get("submitter")
    if not submitter_id:
        submitter = None
    else:
        submitter = (
            author_map.get(submitter_id)
            if submitter_id in author_map
            else _fetch_author(client, submitter_id)
        )

    new_author_mapping = (
        {submitter_id: submitter} if submitter_id and submitter else None
    )

    updated_at = ticket.get("updated_at")
    update_time = time_str_to_utc(updated_at) if updated_at else None

    metadata: dict[str, str | list[str]] = {}
    if status := ticket.get("status"):
        metadata["status"] = status
    if priority := ticket.get("priority"):
        metadata["priority"] = priority
    if tags := ticket.get("tags"):
        metadata["tags"] = tags
    if ticket_type := ticket.get("type"):
        metadata["ticket_type"] = ticket_type

    # Fetch comments for the ticket
    comments_data = client.make_request(f"tickets/{ticket.get('id')}/comments", {})
    comments = comments_data.get("comments", [])

    comment_texts = []
    for comment in comments:
        new_author_mapping, comment_text = _get_comment_text(
            comment, author_map, client
        )
        if new_author_mapping:
            author_map.update(new_author_mapping)
        comment_texts.append(comment_text)

    comments_text = "\n\n".join(comment_texts)

    subject = ticket.get("subject")
    full_text = f"Ticket Subject:\n{subject}\n\nComments:\n{comments_text}"

    blob = full_text.encode("utf-8", errors="replace")
    return new_author_mapping, Document(
        id=f"zendesk_ticket_{ticket['id']}",
        blob=blob,
        extension=".txt",
        size_bytes=len(blob),
        source=DocumentSource.ZENDESK,
        semantic_identifier=f"Ticket #{ticket['id']}: {subject or 'No Subject'}",
        doc_updated_at=update_time,
        primary_owners=[submitter] if submitter else None,
        metadata=metadata,
    )
class ZendeskConnectorCheckpoint(ConnectorCheckpoint):
    # We use cursor-based paginated retrieval for articles
    after_cursor_articles: str | None

    # We use timestamp-based paginated retrieval for tickets
    next_start_time_tickets: int | None

    cached_author_map: dict[str, BasicExpertInfo] | None
    cached_content_tags: dict[str, str] | None


class ZendeskConnector(
    SlimConnectorWithPermSync, CheckpointedConnector[ZendeskConnectorCheckpoint]
):
    def __init__(
        self,
        content_type: str = "articles",
        calls_per_minute: int | None = None,
    ) -> None:
        self.content_type = content_type
        self.subdomain = ""
        # Fetch all tags ahead of time
        self.content_tags: dict[str, str] = {}
        self.calls_per_minute = calls_per_minute
        # Set by load_credentials(); checked before any API access
        self.client: ZendeskClient | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        # Subdomain is actually the whole URL
        subdomain = (
            credentials["zendesk_subdomain"]
            .replace("https://", "")
            .split(".zendesk.com")[0]
        )
        self.subdomain = subdomain

        self.client = ZendeskClient(
            subdomain,
            credentials["zendesk_email"],
            credentials["zendesk_token"],
            calls_per_minute=self.calls_per_minute,
        )
        return None

    @override
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ZendeskConnectorCheckpoint,
    ) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()
        if checkpoint.cached_content_tags is None:
            checkpoint.cached_content_tags = _get_content_tag_mapping(self.client)
            return checkpoint  # save the content tags to the checkpoint
        self.content_tags = checkpoint.cached_content_tags

        if self.content_type == "articles":
            checkpoint = yield from self._retrieve_articles(start, end, checkpoint)
            return checkpoint
        elif self.content_type == "tickets":
            checkpoint = yield from self._retrieve_tickets(start, end, checkpoint)
            return checkpoint
        else:
            raise ValueError(f"Unsupported content_type: {self.content_type}")
    def _retrieve_articles(
        self,
        start: SecondsSinceUnixEpoch | None,
        end: SecondsSinceUnixEpoch | None,
        checkpoint: ZendeskConnectorCheckpoint,
    ) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
        checkpoint = copy.deepcopy(checkpoint)
        # This one is built on the fly as there may be many more authors than tags
        author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
        after_cursor = checkpoint.after_cursor_articles
        doc_batch: list[Document] = []

        response = _get_article_page(
            self.client,
            start_time=int(start) if start else None,
            after_cursor=after_cursor,
        )
        articles = response.data
        has_more = response.has_more
        after_cursor = response.meta.get("after_cursor")
        for article in articles:
            if (
                article.get("body") is None
                or article.get("draft")
                or any(
                    label in ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS
                    for label in article.get("label_names", [])
                )
            ):
                continue

            try:
                new_author_map, document = _article_to_document(
                    article, self.content_tags, author_map, self.client
                )
            except Exception as e:
                logging.error(f"Error processing article {article['id']}: {e}")
                yield ConnectorFailure(
                    failed_document=DocumentFailure(
                        document_id=f"{article.get('id')}",
                        document_link=article.get("html_url", ""),
                    ),
                    failure_message=str(e),
                    exception=e,
                )
                continue

            if new_author_map:
                author_map.update(new_author_map)
            updated_at = document.doc_updated_at
            updated_ts = updated_at.timestamp() if updated_at else None
            if updated_ts is not None:
                if start is not None and updated_ts <= start:
                    continue
                if end is not None and updated_ts > end:
                    continue

            doc_batch.append(document)

        if not has_more:
            yield from doc_batch
            checkpoint.has_more = False
            return checkpoint

        # Sometimes no documents are retrieved, but the cursor
        # is still updated so the connector makes progress.
        yield from doc_batch
        checkpoint.after_cursor_articles = after_cursor

        last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
        checkpoint.has_more = bool(
            end is None
            or last_doc_updated_at is None
            or last_doc_updated_at.timestamp() <= end
        )
        checkpoint.cached_author_map = (
            author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
        )
        return checkpoint
    def _retrieve_tickets(
        self,
        start: SecondsSinceUnixEpoch | None,
        end: SecondsSinceUnixEpoch | None,
        checkpoint: ZendeskConnectorCheckpoint,
    ) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
        checkpoint = copy.deepcopy(checkpoint)
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()

        author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}

        doc_batch: list[Document] = []
        next_start_time = int(checkpoint.next_start_time_tickets or start or 0)
        ticket_response = _get_tickets_page(self.client, start_time=next_start_time)

        tickets = ticket_response.data
        has_more = ticket_response.has_more
        next_start_time = ticket_response.meta["end_time"]
        for ticket in tickets:
            if ticket.get("status") == "deleted":
                continue

            try:
                new_author_map, document = _ticket_to_document(
                    ticket=ticket,
                    author_map=author_map,
                    client=self.client,
                )
            except Exception as e:
                logging.error(f"Error processing ticket {ticket['id']}: {e}")
                yield ConnectorFailure(
                    failed_document=DocumentFailure(
                        document_id=f"{ticket.get('id')}",
                        document_link=ticket.get("url", ""),
                    ),
                    failure_message=str(e),
                    exception=e,
                )
                continue

            if new_author_map:
                author_map.update(new_author_map)

            updated_at = document.doc_updated_at
            updated_ts = updated_at.timestamp() if updated_at else None

            if updated_ts is not None:
                if start is not None and updated_ts <= start:
                    continue
                if end is not None and updated_ts > end:
                    continue

            doc_batch.append(document)

        if not has_more:
            yield from doc_batch
            checkpoint.has_more = False
            return checkpoint

        yield from doc_batch
        checkpoint.next_start_time_tickets = next_start_time
        last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
        checkpoint.has_more = bool(
            end is None
            or last_doc_updated_at is None
            or last_doc_updated_at.timestamp() <= end
        )
        checkpoint.cached_author_map = (
            author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
        )
        return checkpoint
    def retrieve_all_slim_docs_perm_sync(
        self,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
        callback: IndexingHeartbeatInterface | None = None,
    ) -> GenerateSlimDocumentOutput:
        slim_doc_batch: list[SlimDocument] = []
        if self.content_type == "articles":
            articles = _get_articles(
                self.client, start_time=int(start) if start else None
            )
            for article in articles:
                slim_doc_batch.append(
                    SlimDocument(
                        id=f"article:{article['id']}",
                    )
                )
                if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
                    yield slim_doc_batch
                    slim_doc_batch = []
        elif self.content_type == "tickets":
            tickets = _get_tickets(
                self.client, start_time=int(start) if start else None
            )
            for ticket in tickets:
                slim_doc_batch.append(
                    SlimDocument(
                        id=f"zendesk_ticket_{ticket['id']}",
                    )
                )
                if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
                    yield slim_doc_batch
                    slim_doc_batch = []
        else:
            raise ValueError(f"Unsupported content_type: {self.content_type}")
        if slim_doc_batch:
            yield slim_doc_batch

    @override
    def validate_connector_settings(self) -> None:
        if self.client is None:
            raise ZendeskCredentialsNotSetUpError()

        try:
            _get_article_page(self.client, start_time=0)
        except HTTPError as e:
            # Check for HTTP status codes
            if e.response.status_code == 401:
                raise CredentialExpiredError(
                    "Your Zendesk credentials appear to be invalid or expired (HTTP 401)."
                ) from e
            elif e.response.status_code == 403:
                raise InsufficientPermissionsError(
                    "Your Zendesk token does not have sufficient permissions (HTTP 403)."
                ) from e
            elif e.response.status_code == 404:
                raise ConnectorValidationError(
                    "Zendesk resource not found (HTTP 404)."
                ) from e
            else:
                raise ConnectorValidationError(
                    f"Unexpected Zendesk error (status={e.response.status_code}): {e}"
                ) from e

    @override
    def validate_checkpoint_json(
        self, checkpoint_json: str
    ) -> ZendeskConnectorCheckpoint:
        return ZendeskConnectorCheckpoint.model_validate_json(checkpoint_json)

    @override
    def build_dummy_checkpoint(self) -> ZendeskConnectorCheckpoint:
        return ZendeskConnectorCheckpoint(
            after_cursor_articles=None,
            next_start_time_tickets=None,
            cached_author_map=None,
            cached_content_tags=None,
            has_more=True,
        )


if __name__ == "__main__":
    import os

    connector = ZendeskConnector(content_type="articles")
    connector.load_credentials(
        {
            "zendesk_subdomain": os.environ["ZENDESK_SUBDOMAIN"],
            "zendesk_email": os.environ["ZENDESK_EMAIL"],
            "zendesk_token": os.environ["ZENDESK_TOKEN"],
        }
    )

    current = time.time()
    one_day_ago = current - 24 * 60 * 60  # 1 day

    checkpoint = connector.build_dummy_checkpoint()

    while checkpoint.has_more:
        gen = connector.load_from_checkpoint(
            one_day_ago, current, checkpoint
        )

        wrapper = CheckpointOutputWrapper()
        any_doc = False

        for document, failure, next_checkpoint in wrapper(gen):
            if document:
                print("got document:", document.id)
                any_doc = True

            checkpoint = next_checkpoint
        if any_doc:
            break
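Since `ZendeskConnectorCheckpoint` is a Pydantic model, a caller can persist it between runs and resume an interrupted sync instead of restarting from scratch. A minimal sketch under that assumption (the `checkpoint.json` path and the surrounding loop are illustrative, not part of the commit; `build_dummy_checkpoint`, `validate_checkpoint_json`, and `CheckpointOutputWrapper` come from the code above):

```python
# Hypothetical resume-from-disk loop; the file path and control flow are
# illustrative only. Connector and wrapper names come from the new file above.
import os
import time

from common.data_source.interfaces import CheckpointOutputWrapper
from common.data_source.zendesk_connector import ZendeskConnector

CHECKPOINT_PATH = "checkpoint.json"  # hypothetical location

connector = ZendeskConnector(content_type="articles")
connector.load_credentials({
    "zendesk_subdomain": os.environ["ZENDESK_SUBDOMAIN"],
    "zendesk_email": os.environ["ZENDESK_EMAIL"],
    "zendesk_token": os.environ["ZENDESK_TOKEN"],
})

# Resume from the previous run's checkpoint if one was saved.
if os.path.exists(CHECKPOINT_PATH):
    with open(CHECKPOINT_PATH) as f:
        checkpoint = connector.validate_checkpoint_json(f.read())
else:
    checkpoint = connector.build_dummy_checkpoint()

end = time.time()
while checkpoint.has_more:
    gen = connector.load_from_checkpoint(0, end, checkpoint)
    for document, failure, next_checkpoint in CheckpointOutputWrapper()(gen):
        if next_checkpoint is not None:
            checkpoint = next_checkpoint
    # Persist progress so a crash or restart resumes here instead of from zero.
    with open(CHECKPOINT_PATH, "w") as f:
        f.write(checkpoint.model_dump_json())
```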
@@ -1,6 +1,6 @@
 ---
 sidebar_position: 2
-slug: /what_is_agent_context_engine
+slug: /what-is-agent-context-engine
 ---

 # What is Agent context engine?

@@ -58,4 +58,4 @@ We left behind the label of “yet another RAG system” long ago. From DeepDoc

 We believe tomorrow’s enterprise AI advantage will hinge not on who owns the largest model, but on who can feed that model the highest-quality, most real-time, and most relevant context. An Agentic Context Engine is the critical infrastructure that turns this vision into reality.

-In the paradigm shift from “hand-crafted prompts” to “intelligent context,” RAGFlow is determined to be the most steadfast propeller and enabler. We invite every developer, enterprise, and researcher who cares about the future of AI agents to follow RAGFlow’s journey—so together we can witness and build the cornerstone of the next-generation AI stack.
+In the paradigm shift from “hand-crafted prompts” to “intelligent context,” RAGFlow is determined to be the most steadfast propeller and enabler. We invite every developer, enterprise, and researcher who cares about the future of AI agents to follow RAGFlow’s journey—so together we can witness and build the cornerstone of the next-generation AI stack.

@@ -1,6 +1,6 @@
 ---
 sidebar_position: 1
-slug: /what_is_rag
+slug: /what-is-rag
 ---

 # What is Retrieval-Augmented Generation (RAG)?

@@ -104,4 +104,4 @@ The evolution of RAG is unfolding along several clear paths:

 3. Towards context engineering 2.0
 Current RAG can be viewed as Context Engineering 1.0, whose core is assembling static knowledge context for single Q&A tasks. The forthcoming Context Engineering 2.0 will extend with RAG technology at its core, becoming a system that automatically and dynamically assembles comprehensive context for agents. The context fused by this system will come not only from documents but also include interaction memory, available tools/skills, and real-time environmental information. This marks the transition of agent development from a "handicraft workshop" model to the industrial starting point of automated context engineering.

-The essence of RAG is to build a dedicated, efficient, and trustworthy external data interface for large language models; its core is Retrieval, not Generation. Starting from the practical need to solve private data access, its technical depth is reflected in the optimization of retrieval for complex unstructured data. With its deep integration into agent architectures and its development towards automated context engineering, RAG is evolving from a technology that improves Q&A quality into the core infrastructure for building the next generation of trustworthy, controllable, and scalable intelligent applications.
+The essence of RAG is to build a dedicated, efficient, and trustworthy external data interface for large language models; its core is Retrieval, not Generation. Starting from the practical need to solve private data access, its technical depth is reflected in the optimization of retrieval for complex unstructured data. With its deep integration into agent architectures and its development towards automated context engineering, RAG is evolving from a technology that improves Q&A quality into the core infrastructure for building the next generation of trustworthy, controllable, and scalable intelligent applications.
docs/guides/admin/_category_.json (new file, 8 lines)
@@ -0,0 +1,8 @@
{
  "label": "Administration",
  "position": 6,
  "link": {
    "type": "generated-index",
    "description": "RAGFlow administration"
  }
}
@@ -1,43 +1,11 @@
 ---
-sidebar_position: 6
-slug: /manage_users_and_services
+sidebar_position: 2
+slug: /admin_cli
 ---

-# Admin CLI and Admin Service
+# Admin CLI

-The Admin CLI and Admin Service form a client-server architectural suite for RAGFlow system administration. The Admin CLI serves as an interactive command-line interface that receives instructions and displays execution results from the Admin Service in real-time. This duo enables real-time monitoring of system operational status, supporting visibility into RAGFlow Server services and dependent components including MySQL, Elasticsearch, Redis, and MinIO. In administrator mode, they provide user management capabilities that allow viewing users and performing critical operations—such as user creation, password updates, activation status changes, and comprehensive user data deletion—even when corresponding web interface functionalities are disabled.
-
-## Starting the Admin Service
-
-### Launching from source code
-
-1. Before start Admin Service, please make sure RAGFlow system is already started.
-
-2. Launch from source code:
-
-   ```bash
-   python admin/server/admin_server.py
-   ```
-
-   The service will start and listen for incoming connections from the CLI on the configured port.
-
-### Using docker image
-
-1. Before startup, please configure the `docker_compose.yml` file to enable admin server:
-
-   ```bash
-   command:
-     - --enable-adminserver
-   ```
-
-2. Start the containers, the service will start and listen for incoming connections from the CLI on the configured port.
-
 The RAGFlow Admin CLI is a command-line-based system administration tool that offers administrators an efficient and flexible method for system interaction and control. Operating on a client-server architecture, it communicates in real-time with the Admin Service, receiving administrator commands and dynamically returning execution results.

 ## Using the Admin CLI
docs/guides/admin/admin_service.md (new file, 39 lines)
@@ -0,0 +1,39 @@
---
sidebar_position: 0
slug: /admin_service
---

# Admin Service

The Admin Service is the core backend management service of the RAGFlow system, providing comprehensive system administration capabilities through centralized API interfaces for managing and controlling the entire platform. Adopting a client-server architecture, it supports access and operations via both a Web UI and an Admin CLI, ensuring flexible and efficient execution of administrative tasks.

The core functions of the Admin Service include real-time monitoring of the operational status of the RAGFlow server and its critical dependent components—such as MySQL, Elasticsearch, Redis, and MinIO—along with full-featured user management. In administrator mode, it enables key operations such as viewing user information, creating users, updating passwords, modifying activation status, and performing complete user data deletion. These functions remain accessible via the Admin CLI even when the web management interface is disabled, ensuring the system stays under control at all times.

With its unified interface design, the Admin Service combines the convenience of visual administration with the efficiency and stability of command-line operations, serving as a crucial foundation for the reliable operation and secure management of the RAGFlow system.

## Starting the Admin Service

### Launching from source code

1. Before starting the Admin Service, make sure the RAGFlow system is already running.

2. Launch from source code:

   ```bash
   python admin/server/admin_server.py
   ```

   The service will start and listen for incoming connections from the CLI on the configured port.

### Using the Docker image

1. Before startup, configure the `docker_compose.yml` file to enable the admin server:

   ```bash
   command:
     - --enable-adminserver
   ```

2. Start the containers; the service will start and listen for incoming connections from the CLI on the configured port.
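Starting the containers then follows the usual Compose workflow; a minimal sketch, assuming the repository's standard `docker/` directory layout and the default `ragflow-server` container name (both assumptions, adjust to your deployment):

```bash
# Assumes you are in the repository's docker/ directory.
docker compose up -d
# Follow the server logs to confirm the admin server is listening:
docker logs -f ragflow-server
```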
@@ -1,6 +1,6 @@
 ---
-sidebar_position: 7
-slug: /accessing_admin_ui
+sidebar_position: 1
+slug: /admin_ui
 ---

 # Admin UI
@@ -12,11 +12,18 @@ Key features, improvements and bug fixes in the latest releases.

 Released on December 31, 2025.

+### Improvements
+
+- Memory: Enhances the stability of memory extraction when all memory types are selected.
+- RAG: Refines the context window extraction strategy for images and tables.
+
 ### Fixed issues

-- Resolved an issue where the RAGFlow Server would fail to start if an empty memory object existed, and corrected the inability to delete a newly created empty Memory.
-- Improved the stability of memory extraction across all memory types after selection.
-- Fixed MDX file parsing support.
+- Memory:
+  - The RAGFlow server failed to start if an empty memory object existed.
+  - Unable to delete a newly created empty Memory.
+- RAG: MDX file parsing was not supported.

 ### Data sources

@@ -50,6 +57,7 @@ Released on December 27, 2025.

 ### Improvements

 - RAG: Accelerates GraphRAG generation significantly.
+- Bumps RAGFlow's document engine, [Infinity](https://github.com/infiniflow/infinity), to v0.6.15 (backward compatible).

 ### Data sources
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import asyncio
 import json
 import logging
 from collections import defaultdict

@@ -32,21 +33,21 @@ from common.doc_store.doc_store_base import OrderByExpr


 class KGSearch(Dealer):
-    def _chat(self, llm_bdl, system, history, gen_conf):
+    async def _chat(self, llm_bdl, system, history, gen_conf):
         response = get_llm_cache(llm_bdl.llm_name, system, history, gen_conf)
         if response:
             return response
-        response = llm_bdl.chat(system, history, gen_conf)
+        response = await llm_bdl.async_chat(system, history, gen_conf)
         if response.find("**ERROR**") >= 0:
             raise Exception(response)
         set_llm_cache(llm_bdl.llm_name, system, response, history, gen_conf)
         return response

-    def query_rewrite(self, llm, question, idxnms, kb_ids):
+    async def query_rewrite(self, llm, question, idxnms, kb_ids):
         ty2ents = get_entity_type2samples(idxnms, kb_ids)
         hint_prompt = PROMPTS["minirag_query2kwd"].format(query=question,
                                                           TYPE_POOL=json.dumps(ty2ents, ensure_ascii=False, indent=2))
-        result = self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {})
+        result = await self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {})
         try:
             keywords_data = json_repair.loads(result)
             type_keywords = keywords_data.get("answer_type_keywords", [])

@@ -138,7 +139,7 @@ class KGSearch(Dealer):
                                  idxnms, kb_ids)
         return self._ent_info_from_(es_res, 0)

-    def retrieval(self, question: str,
+    async def retrieval(self, question: str,
                   tenant_ids: str | list[str],
                   kb_ids: list[str],
                   emb_mdl,

@@ -158,7 +159,7 @@ class KGSearch(Dealer):
         idxnms = [index_name(tid) for tid in tenant_ids]
         ty_kwds = []
         try:
-            ty_kwds, ents = self.query_rewrite(llm, qst, [index_name(tid) for tid in tenant_ids], kb_ids)
+            ty_kwds, ents = await self.query_rewrite(llm, qst, [index_name(tid) for tid in tenant_ids], kb_ids)
             logging.info(f"Q: {qst}, Types: {ty_kwds}, Entities: {ents}")
         except Exception as e:
             logging.exception(e)

@@ -334,5 +335,5 @@ if __name__ == "__main__":
     embed_bdl = LLMBundle(args.tenant_id, LLMType.EMBEDDING, kb.embd_id)

     kg = KGSearch(settings.docStoreConn)
-    print(kg.retrieval({"question": args.question, "kb_ids": [kb_id]},
-          search.index_name(kb.tenant_id), [kb_id], embed_bdl, llm_bdl))
+    print(asyncio.run(kg.retrieval({"question": args.question, "kb_ids": [kb_id]},
+          search.index_name(kb.tenant_id), [kb_id], embed_bdl, llm_bdl)))
@@ -49,7 +49,8 @@ from common.data_source import (
     WebDAVConnector,
     AirtableConnector,
     AsanaConnector,
-    ImapConnector
+    ImapConnector,
+    ZendeskConnector,
 )
 from common.constants import FileSource, TaskStatus
 from common.data_source.config import INDEX_BATCH_SIZE

@@ -915,7 +916,7 @@ class Github(SyncBase):
         )

         return async_wrapper()


 class IMAP(SyncBase):
     SOURCE_NAME: str = FileSource.IMAP

@@ -971,6 +972,10 @@ class IMAP(SyncBase):
             if pending_docs:
                 yield pending_docs

+        async def async_wrapper():
+            for batch in document_batches():
+                yield batch
+
         logging.info(
             "Connect to IMAP: host(%s) port(%s) user(%s) folder(%s) %s",
             self.conf["imap_host"],

@@ -979,7 +984,87 @@ class IMAP(SyncBase):
             self.conf["imap_mailbox"],
             begin_info
         )
-        return document_batches()
+        return async_wrapper()
+
+
+class Zendesk(SyncBase):
+    SOURCE_NAME: str = FileSource.ZENDESK
+
+    async def _generate(self, task: dict):
+        self.connector = ZendeskConnector(content_type=self.conf.get("zendesk_content_type"))
+        self.connector.load_credentials(self.conf["credentials"])
+
+        end_time = datetime.now(timezone.utc).timestamp()
+        if task["reindex"] == "1" or not task.get("poll_range_start"):
+            start_time = 0
+            begin_info = "totally"
+        else:
+            start_time = task["poll_range_start"].timestamp()
+            begin_info = f"from {task['poll_range_start']}"
+
+        raw_batch_size = (
+            self.conf.get("sync_batch_size")
+            or self.conf.get("batch_size")
+            or INDEX_BATCH_SIZE
+        )
+        try:
+            batch_size = int(raw_batch_size)
+        except (TypeError, ValueError):
+            batch_size = INDEX_BATCH_SIZE
+
+        if batch_size <= 0:
+            batch_size = INDEX_BATCH_SIZE
+
+        def document_batches():
+            checkpoint = self.connector.build_dummy_checkpoint()
+            pending_docs = []
+            iterations = 0
+            iteration_limit = 100_000
+
+            while checkpoint.has_more:
+                wrapper = CheckpointOutputWrapper()
+                doc_generator = wrapper(
+                    self.connector.load_from_checkpoint(
+                        start_time, end_time, checkpoint
+                    )
+                )
+
+                for document, failure, next_checkpoint in doc_generator:
+                    if failure is not None:
+                        logging.warning(
+                            "Zendesk connector failure: %s",
+                            getattr(failure, "failure_message", failure),
+                        )
+                        continue
+
+                    if document is not None:
+                        pending_docs.append(document)
+                        if len(pending_docs) >= batch_size:
+                            yield pending_docs
+                            pending_docs = []
+
+                    if next_checkpoint is not None:
+                        checkpoint = next_checkpoint
+
+                iterations += 1
+                if iterations > iteration_limit:
+                    raise RuntimeError(
+                        "Too many iterations while loading Zendesk documents."
+                    )
+
+            if pending_docs:
+                yield pending_docs
+
+        async def async_wrapper():
+            for batch in document_batches():
+                yield batch
+
+        logging.info(
+            "Connect to Zendesk: subdomain(%s) %s",
+            self.conf['credentials'].get("zendesk_subdomain"),
+            begin_info,
+        )
+
+        return async_wrapper()


 class Gitlab(SyncBase):

@@ -1043,6 +1128,7 @@ func_factory = {
     FileSource.AIRTABLE: Airtable,
     FileSource.ASANA: Asana,
     FileSource.IMAP: IMAP,
+    FileSource.ZENDESK: Zendesk,
     FileSource.GITHUB: Github,
     FileSource.GITLAB: Gitlab,
 }
web/src/assets/svg/data-source/zendesk.svg (new file, 8 lines, 864 B)
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Uploaded to: SVG Repo, www.svgrepo.com, Generator: SVG Repo Mixer Tools -->
<svg width="800px" height="800px" viewBox="0 -30.5 256 256" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" preserveAspectRatio="xMidYMid">
  <g>
    <path d="M118.249172,51.2326115 L118.249172,194.005605 L0,194.005605 L118.249172,51.2326115 Z M118.249172,2.84217094e-14 C118.249172,32.6440764 91.7686624,59.124586 59.124586,59.124586 C26.4805096,59.124586 0,32.6440764 0,2.84217094e-14 L118.249172,2.84217094e-14 Z M137.750828,194.005605 C137.750828,161.328917 164.198726,134.881019 196.875414,134.881019 C229.552102,134.881019 256,161.361529 256,194.005605 L137.750828,194.005605 Z M137.750828,142.740382 L137.750828,0 L256,0 L137.750828,142.740382 Z" fill="#03363D">
    </path>
  </g>
</svg>
@@ -29,6 +29,7 @@ export enum DataSourceKey {
   ASANA = 'asana',
   IMAP = 'imap',
   GITHUB = 'github',
+  ZENDESK = 'zendesk',
   // SHAREPOINT = 'sharepoint',
   // SLACK = 'slack',
   // TEAMS = 'teams',

@@ -133,6 +134,11 @@ export const generateDataSourceInfo = (t: TFunction) => {
       description: t(`setting.${DataSourceKey.IMAP}Description`),
       icon: <SvgIcon name={'data-source/imap'} width={38} />,
     },
+    [DataSourceKey.ZENDESK]: {
+      name: 'Zendesk',
+      description: t(`setting.${DataSourceKey.ZENDESK}Description`),
+      icon: <SvgIcon name={'data-source/zendesk'} width={38} />,
+    },
   };
 };

@@ -822,6 +828,36 @@ export const DataSourceFormFields = {
       required: false,
     },
   ],
+  [DataSourceKey.ZENDESK]: [
+    {
+      label: 'Zendesk Domain',
+      name: 'config.credentials.zendesk_subdomain',
+      type: FormFieldType.Text,
+      required: true,
+    },
+    {
+      label: 'Zendesk Email',
+      name: 'config.credentials.zendesk_email',
+      type: FormFieldType.Text,
+      required: true,
+    },
+    {
+      label: 'Zendesk Token',
+      name: 'config.credentials.zendesk_token',
+      type: FormFieldType.Password,
+      required: true,
+    },
+    {
+      label: 'Content',
+      name: 'config.zendesk_content_type',
+      type: FormFieldType.Segmented,
+      required: true,
+      options: [
+        { label: 'Articles', value: 'articles' },
+        { label: 'Tickets', value: 'tickets' },
+      ],
+    },
+  ],
 };

 export const DataSourceFormDefaultValues = {

@@ -1076,4 +1112,17 @@ export const DataSourceFormDefaultValues = {
       },
     },
   },
+  [DataSourceKey.ZENDESK]: {
+    name: '',
+    source: DataSourceKey.ZENDESK,
+    config: {
+      name: '',
+      zendesk_content_type: 'articles',
+      credentials: {
+        zendesk_subdomain: '',
+        zendesk_email: '',
+        zendesk_token: '',
+      },
+    },
+  },
 };