Compare commits


6 Commits

Author SHA1 Message Date
6a664fea3b Docs: Updated v0.23.0 release notes (#12374)
### What problem does this PR solve?


### Type of change


- [x] Documentation Update
2025-12-31 17:10:15 +08:00
dcdc1b0ec7 Fix urls for basic docs (#12372)
### Type of change

- [x] Documentation Update
2025-12-31 17:02:34 +08:00
4af4c36e60 Docs: Added v0.23.1 release notes (#12371)
### What problem does this PR solve?


### Type of change

- [x] Documentation Update
2025-12-31 16:43:56 +08:00
05e5244d94 Refactor docs of RAGFlow admin (#12361)
### What problem does this PR solve?

as title

### Type of change

- [x] Documentation Update

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
2025-12-31 14:42:53 +08:00
c2ee2bf7fe Feat: add Zendesk data source integration with configuration and sync capabilities (#12344)
### What problem does this PR solve?
issue:
#12313
change:
add Zendesk data source integration with configuration and sync
capabilities

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
2025-12-31 14:40:49 +08:00
461c81e14a Fix: KG search issue. (#12364)
### What problem does this PR solve?

Close #12347

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
2025-12-31 14:40:27 +08:00
22 changed files with 1005 additions and 65 deletions

View File

@ -202,7 +202,7 @@ class Retrieval(ToolBase, ABC):
kbinfos["chunks"] = settings.retriever.retrieval_by_children(kbinfos["chunks"],
[kb.tenant_id for kb in kbs])
if self._param.use_kg:
ck = settings.kg_retriever.retrieval(query,
ck = await settings.kg_retriever.retrieval(query,
[kb.tenant_id for kb in kbs],
kb_ids,
embd_mdl,
@ -215,7 +215,7 @@ class Retrieval(ToolBase, ABC):
kbinfos = {"chunks": [], "doc_aggs": []}
if self._param.use_kg and kbs:
ck = settings.kg_retriever.retrieval(query, [kb.tenant_id for kb in kbs], filtered_kb_ids, embd_mdl,
ck = await settings.kg_retriever.retrieval(query, [kb.tenant_id for kb in kbs], filtered_kb_ids, embd_mdl,
LLMBundle(kbs[0].tenant_id, LLMType.CHAT))
if self.check_if_canceled("Retrieval processing"):
return

View File

@ -381,7 +381,7 @@ async def retrieval_test():
rank_feature=labels
)
if use_kg:
ck = settings.kg_retriever.retrieval(_question,
ck = await settings.kg_retriever.retrieval(_question,
tenant_ids,
kb_ids,
embd_mdl,

View File

@ -150,7 +150,7 @@ async def retrieval(tenant_id):
)
if use_kg:
ck = settings.kg_retriever.retrieval(question,
ck = await settings.kg_retriever.retrieval(question,
[tenant_id],
[kb_id],
embd_mdl,

View File

@ -1579,7 +1579,7 @@ async def retrieval_test(tenant_id):
if cks:
ranks["chunks"] = cks
if use_kg:
ck = settings.kg_retriever.retrieval(question, [k.tenant_id for k in kbs], kb_ids, embd_mdl, LLMBundle(kb.tenant_id, LLMType.CHAT))
ck = await settings.kg_retriever.retrieval(question, [k.tenant_id for k in kbs], kb_ids, embd_mdl, LLMBundle(kb.tenant_id, LLMType.CHAT))
if ck["content_with_weight"]:
ranks["chunks"].insert(0, ck)

View File

@ -1116,7 +1116,7 @@ async def retrieval_test_embedded():
local_doc_ids, rerank_mdl=rerank_mdl, highlight=req.get("highlight"), rank_feature=labels
)
if use_kg:
ck = settings.kg_retriever.retrieval(_question, tenant_ids, kb_ids, embd_mdl,
ck = await settings.kg_retriever.retrieval(_question, tenant_ids, kb_ids, embd_mdl,
LLMBundle(kb.tenant_id, LLMType.CHAT))
if ck["content_with_weight"]:
ranks["chunks"].insert(0, ck)

View File

@ -421,7 +421,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs):
kbinfos["chunks"].extend(tav_res["chunks"])
kbinfos["doc_aggs"].extend(tav_res["doc_aggs"])
if prompt_config.get("use_kg"):
ck = settings.kg_retriever.retrieval(" ".join(questions), tenant_ids, dialog.kb_ids, embd_mdl,
ck = await settings.kg_retriever.retrieval(" ".join(questions), tenant_ids, dialog.kb_ids, embd_mdl,
LLMBundle(dialog.tenant_id, LLMType.CHAT))
if ck["content_with_weight"]:
kbinfos["chunks"].insert(0, ck)

View File

@ -133,6 +133,7 @@ class FileSource(StrEnum):
GITHUB = "github"
GITLAB = "gitlab"
IMAP = "imap"
ZENDESK = "zendesk"
class PipelineTaskType(StrEnum):
PARSE = "Parse"

View File

@ -39,6 +39,7 @@ from .moodle_connector import MoodleConnector
from .airtable_connector import AirtableConnector
from .asana_connector import AsanaConnector
from .imap_connector import ImapConnector
from .zendesk_connector import ZendeskConnector
from .config import BlobType, DocumentSource
from .models import Document, TextSection, ImageSection, BasicExpertInfo
from .exceptions import (
@ -76,5 +77,6 @@ __all__ = [
"UnexpectedValidationError",
"AirtableConnector",
"AsanaConnector",
"ImapConnector"
"ImapConnector",
"ZendeskConnector",
]

View File

@ -58,8 +58,9 @@ class DocumentSource(str, Enum):
GITHUB = "github"
GITLAB = "gitlab"
IMAP = "imap"
ZENDESK = "zendesk"
class FileOrigin(str, Enum):
"""File origins"""
CONNECTOR = "connector"
@ -271,6 +272,10 @@ IMAP_CONNECTOR_SIZE_THRESHOLD = int(
os.environ.get("IMAP_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024)
)
ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS = os.environ.get(
"ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS", ""
).split(",")
_USER_NOT_FOUND = "Unknown Confluence User"
_COMMENT_EXPANSION_FIELDS = ["body.storage.value"]
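For reference, a minimal sketch of how the new `ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS` setting behaves: the value is split on commas exactly as above, and the resulting list is matched against each article's `label_names` in the connector later in this diff. The environment value and article payload here are illustrative only.

```python
import os

# Illustrative value; in a real deployment this comes from the environment.
os.environ["ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS"] = "internal,archived"

# Same parsing as the config module: a plain comma split, no whitespace stripping.
skip_labels = os.environ.get("ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS", "").split(",")

# Hypothetical article payload; real articles come from the Help Center API.
article = {"label_names": ["how-to", "internal"]}

# Mirrors the filter in ZendeskConnector._retrieve_articles: skip on any match.
print(any(label in skip_labels for label in article.get("label_names", [])))  # True
```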

View File

@ -1149,3 +1149,101 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R
future_to_index[executor.submit(_next_or_none, ind, gens[ind])] = next_ind
next_ind += 1
del future_to_index[future]
F = TypeVar("F", bound=Callable[..., Any])
class _RateLimitDecorator:
"""Builds a generic wrapper/decorator for calls to external APIs that
prevents making more than `max_calls` requests per `period`
Implementation inspired by the `ratelimit` library:
https://github.com/tomasbasham/ratelimit.
NOTE: is not thread safe.
"""
def __init__(
self,
max_calls: int,
period: float, # in seconds
sleep_time: float = 2, # in seconds
sleep_backoff: float = 2, # applies exponential backoff
max_num_sleep: int = 0,
):
self.max_calls = max_calls
self.period = period
self.sleep_time = sleep_time
self.sleep_backoff = sleep_backoff
self.max_num_sleep = max_num_sleep
self.call_history: list[float] = []
self.curr_calls = 0
def __call__(self, func: F) -> F:
@wraps(func)
def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any:
# cleanup calls which are no longer relevant
self._cleanup()
# check if we've exceeded the rate limit
sleep_cnt = 0
while len(self.call_history) == self.max_calls:
sleep_time = self.sleep_time * (self.sleep_backoff**sleep_cnt)
logging.warning(
f"Rate limit exceeded for function {func.__name__}. "
f"Waiting {sleep_time} seconds before retrying."
)
time.sleep(sleep_time)
sleep_cnt += 1
if self.max_num_sleep != 0 and sleep_cnt >= self.max_num_sleep:
raise RateLimitTriedTooManyTimesError(
f"Exceeded '{self.max_num_sleep}' retries for function '{func.__name__}'"
)
self._cleanup()
# add the current call to the call history
self.call_history.append(time.monotonic())
return func(*args, **kwargs)
return cast(F, wrapped_func)
def _cleanup(self) -> None:
curr_time = time.monotonic()
time_to_expire_before = curr_time - self.period
self.call_history = [
call_time
for call_time in self.call_history
if call_time > time_to_expire_before
]
rate_limit_builder = _RateLimitDecorator
def retry_builder(
tries: int = 20,
delay: float = 0.1,
max_delay: float | None = 60,
backoff: float = 2,
jitter: tuple[float, float] | float = 1,
exceptions: type[Exception] | tuple[type[Exception], ...] = (Exception,),
) -> Callable[[F], F]:
"""Builds a generic wrapper/decorator for calls to external APIs that
may fail due to rate limiting, flakes, or other reasons. Applies exponential
backoff with jitter to retry the call."""
def retry_with_default(func: F) -> F:
@retry(
tries=tries,
delay=delay,
max_delay=max_delay,
backoff=backoff,
jitter=jitter,
logger=logging.getLogger(__name__),
exceptions=exceptions,
)
def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any:
return func(*args, **kwargs)
return cast(F, wrapped_func)
return retry_with_default
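Taken together, `retry_builder` and `rate_limit_builder` are meant to be stacked on outbound API calls, which is how the Zendesk client below uses them. A minimal sketch of that composition, assuming the import path shown in the connector; `fetch_page` and its return value are hypothetical:

```python
import time

from common.data_source.utils import rate_limit_builder, retry_builder


# Hypothetical endpoint wrapper; only the decorator stacking is the point here.
@retry_builder(tries=3, delay=0.5)
@rate_limit_builder(max_calls=2, period=1.0)
def fetch_page(page: int) -> dict:
    # Stand-in for a real HTTP request to an external API.
    return {"page": page, "at": time.monotonic()}


for i in range(5):
    # Once two calls land inside any 1-second window, the rate limiter sleeps
    # before allowing the next one; retry_builder retries if the call raises.
    print(fetch_page(i))
```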

View File

@ -0,0 +1,667 @@
import copy
import logging
import time
from collections.abc import Callable
from collections.abc import Iterator
from typing import Any
import requests
from pydantic import BaseModel
from requests.exceptions import HTTPError
from typing_extensions import override
from common.data_source.config import ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS, DocumentSource
from common.data_source.exceptions import ConnectorValidationError, CredentialExpiredError, InsufficientPermissionsError
from common.data_source.html_utils import parse_html_page_basic
from common.data_source.interfaces import CheckpointOutput, CheckpointOutputWrapper, CheckpointedConnector, IndexingHeartbeatInterface, SlimConnectorWithPermSync
from common.data_source.models import BasicExpertInfo, ConnectorCheckpoint, ConnectorFailure, Document, DocumentFailure, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument
from common.data_source.utils import retry_builder, time_str_to_utc, rate_limit_builder
MAX_PAGE_SIZE = 30 # Zendesk API maximum
MAX_AUTHOR_MAP_SIZE = 50_000 # Reset author map cache if it gets too large
_SLIM_BATCH_SIZE = 1000
class ZendeskCredentialsNotSetUpError(PermissionError):
def __init__(self) -> None:
super().__init__(
"Zendesk Credentials are not set up, was load_credentials called?"
)
class ZendeskClient:
def __init__(
self,
subdomain: str,
email: str,
token: str,
calls_per_minute: int | None = None,
):
self.base_url = f"https://{subdomain}.zendesk.com/api/v2"
self.auth = (f"{email}/token", token)
self.make_request = request_with_rate_limit(self, calls_per_minute)
def request_with_rate_limit(
client: ZendeskClient, max_calls_per_minute: int | None = None
) -> Callable[[str, dict[str, Any]], dict[str, Any]]:
@retry_builder()
@(
rate_limit_builder(max_calls=max_calls_per_minute, period=60)
if max_calls_per_minute
else lambda x: x
)
def make_request(endpoint: str, params: dict[str, Any]) -> dict[str, Any]:
response = requests.get(
f"{client.base_url}/{endpoint}", auth=client.auth, params=params
)
if response.status_code == 429:
retry_after = response.headers.get("Retry-After")
if retry_after is not None:
# Sleep for the duration indicated by the Retry-After header
time.sleep(int(retry_after))
elif (
response.status_code == 403
and response.json().get("error") == "SupportProductInactive"
):
return response.json()
response.raise_for_status()
return response.json()
return make_request
class ZendeskPageResponse(BaseModel):
data: list[dict[str, Any]]
meta: dict[str, Any]
has_more: bool
def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]:
content_tags: dict[str, str] = {}
params = {"page[size]": MAX_PAGE_SIZE}
try:
while True:
data = client.make_request("guide/content_tags", params)
for tag in data.get("records", []):
content_tags[tag["id"]] = tag["name"]
# Check if there are more pages
if data.get("meta", {}).get("has_more", False):
params["page[after]"] = data["meta"]["after_cursor"]
else:
break
return content_tags
except Exception as e:
raise Exception(f"Error fetching content tags: {str(e)}")
def _get_articles(
client: ZendeskClient, start_time: int | None = None, page_size: int = MAX_PAGE_SIZE
) -> Iterator[dict[str, Any]]:
params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
if start_time is not None:
params["start_time"] = start_time
while True:
data = client.make_request("help_center/articles", params)
for article in data["articles"]:
yield article
if not data.get("meta", {}).get("has_more"):
break
params["page[after]"] = data["meta"]["after_cursor"]
def _get_article_page(
client: ZendeskClient,
start_time: int | None = None,
after_cursor: str | None = None,
page_size: int = MAX_PAGE_SIZE,
) -> ZendeskPageResponse:
params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
if start_time is not None:
params["start_time"] = start_time
if after_cursor is not None:
params["page[after]"] = after_cursor
data = client.make_request("help_center/articles", params)
return ZendeskPageResponse(
data=data["articles"],
meta=data["meta"],
has_more=bool(data["meta"].get("has_more", False)),
)
def _get_tickets(
client: ZendeskClient, start_time: int | None = None
) -> Iterator[dict[str, Any]]:
params = {"start_time": start_time or 0}
while True:
data = client.make_request("incremental/tickets.json", params)
for ticket in data["tickets"]:
yield ticket
if not data.get("end_of_stream", False):
params["start_time"] = data["end_time"]
else:
break
# TODO: maybe these don't need to be their own functions?
def _get_tickets_page(
client: ZendeskClient, start_time: int | None = None
) -> ZendeskPageResponse:
params = {"start_time": start_time or 0}
# NOTE: for some reason zendesk doesn't seem to be respecting the start_time param
# in my local testing with very few tickets. We'll look into it if this becomes an
# issue in larger deployments
data = client.make_request("incremental/tickets.json", params)
if data.get("error") == "SupportProductInactive":
raise ValueError(
"Zendesk Support Product is not active for this account, No tickets to index"
)
return ZendeskPageResponse(
data=data["tickets"],
meta={"end_time": data["end_time"]},
has_more=not bool(data.get("end_of_stream", False)),
)
def _fetch_author(
client: ZendeskClient, author_id: str | int
) -> BasicExpertInfo | None:
# Skip fetching if author_id is invalid
# cast to str to avoid issues with zendesk changing their types
if not author_id or str(author_id) == "-1":
return None
try:
author_data = client.make_request(f"users/{author_id}", {})
user = author_data.get("user")
return (
BasicExpertInfo(display_name=user.get("name"), email=user.get("email"))
if user and user.get("name") and user.get("email")
else None
)
except requests.exceptions.HTTPError:
# Handle any API errors gracefully
return None
def _article_to_document(
article: dict[str, Any],
content_tags: dict[str, str],
author_map: dict[str, BasicExpertInfo],
client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
author_id = article.get("author_id")
if not author_id:
author = None
else:
author = (
author_map.get(author_id)
if author_id in author_map
else _fetch_author(client, author_id)
)
new_author_mapping = {author_id: author} if author_id and author else None
updated_at = article.get("updated_at")
update_time = time_str_to_utc(updated_at) if updated_at else None
text = parse_html_page_basic(article.get("body") or "")
blob = text.encode("utf-8", errors="replace")
# Build metadata
metadata: dict[str, str | list[str]] = {
"labels": [str(label) for label in article.get("label_names", []) if label],
"content_tags": [
content_tags[tag_id]
for tag_id in article.get("content_tag_ids", [])
if tag_id in content_tags
],
}
# Remove empty values
metadata = {k: v for k, v in metadata.items() if v}
return new_author_mapping, Document(
id=f"article:{article['id']}",
source=DocumentSource.ZENDESK,
semantic_identifier=article["title"],
extension=".txt",
blob=blob,
size_bytes=len(blob),
doc_updated_at=update_time,
primary_owners=[author] if author else None,
metadata=metadata,
)
def _get_comment_text(
comment: dict[str, Any],
author_map: dict[str, BasicExpertInfo],
client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, str]:
author_id = comment.get("author_id")
if not author_id:
author = None
else:
author = (
author_map.get(author_id)
if author_id in author_map
else _fetch_author(client, author_id)
)
new_author_mapping = {author_id: author} if author_id and author else None
comment_text = f"Comment{' by ' + author.display_name if author and author.display_name else ''}"
comment_text += f"{' at ' + comment['created_at'] if comment.get('created_at') else ''}:\n{comment['body']}"
return new_author_mapping, comment_text
def _ticket_to_document(
ticket: dict[str, Any],
author_map: dict[str, BasicExpertInfo],
client: ZendeskClient,
) -> tuple[dict[str, BasicExpertInfo] | None, Document]:
submitter_id = ticket.get("submitter")
if not submitter_id:
submitter = None
else:
submitter = (
author_map.get(submitter_id)
if submitter_id in author_map
else _fetch_author(client, submitter_id)
)
new_author_mapping = (
{submitter_id: submitter} if submitter_id and submitter else None
)
updated_at = ticket.get("updated_at")
update_time = time_str_to_utc(updated_at) if updated_at else None
metadata: dict[str, str | list[str]] = {}
if status := ticket.get("status"):
metadata["status"] = status
if priority := ticket.get("priority"):
metadata["priority"] = priority
if tags := ticket.get("tags"):
metadata["tags"] = tags
if ticket_type := ticket.get("type"):
metadata["ticket_type"] = ticket_type
# Fetch comments for the ticket
comments_data = client.make_request(f"tickets/{ticket.get('id')}/comments", {})
comments = comments_data.get("comments", [])
comment_texts = []
for comment in comments:
new_author_mapping, comment_text = _get_comment_text(
comment, author_map, client
)
if new_author_mapping:
author_map.update(new_author_mapping)
comment_texts.append(comment_text)
comments_text = "\n\n".join(comment_texts)
subject = ticket.get("subject")
full_text = f"Ticket Subject:\n{subject}\n\nComments:\n{comments_text}"
blob = full_text.encode("utf-8", errors="replace")
return new_author_mapping, Document(
id=f"zendesk_ticket_{ticket['id']}",
blob=blob,
extension=".txt",
size_bytes=len(blob),
source=DocumentSource.ZENDESK,
semantic_identifier=f"Ticket #{ticket['id']}: {subject or 'No Subject'}",
doc_updated_at=update_time,
primary_owners=[submitter] if submitter else None,
metadata=metadata,
)
class ZendeskConnectorCheckpoint(ConnectorCheckpoint):
# We use cursor-based paginated retrieval for articles
after_cursor_articles: str | None
# We use timestamp-based paginated retrieval for tickets
next_start_time_tickets: int | None
cached_author_map: dict[str, BasicExpertInfo] | None
cached_content_tags: dict[str, str] | None
class ZendeskConnector(
SlimConnectorWithPermSync, CheckpointedConnector[ZendeskConnectorCheckpoint]
):
def __init__(
self,
content_type: str = "articles",
calls_per_minute: int | None = None,
) -> None:
self.content_type = content_type
self.subdomain = ""
# Fetch all tags ahead of time
self.content_tags: dict[str, str] = {}
self.calls_per_minute = calls_per_minute
# Set by load_credentials(); the None checks below expect this attribute to exist.
self.client: ZendeskClient | None = None
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
# Subdomain is actually the whole URL
subdomain = (
credentials["zendesk_subdomain"]
.replace("https://", "")
.split(".zendesk.com")[0]
)
self.subdomain = subdomain
self.client = ZendeskClient(
subdomain,
credentials["zendesk_email"],
credentials["zendesk_token"],
calls_per_minute=self.calls_per_minute,
)
return None
@override
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: ZendeskConnectorCheckpoint,
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
if self.client is None:
raise ZendeskCredentialsNotSetUpError()
if checkpoint.cached_content_tags is None:
checkpoint.cached_content_tags = _get_content_tag_mapping(self.client)
return checkpoint # save the content tags to the checkpoint
self.content_tags = checkpoint.cached_content_tags
if self.content_type == "articles":
checkpoint = yield from self._retrieve_articles(start, end, checkpoint)
return checkpoint
elif self.content_type == "tickets":
checkpoint = yield from self._retrieve_tickets(start, end, checkpoint)
return checkpoint
else:
raise ValueError(f"Unsupported content_type: {self.content_type}")
def _retrieve_articles(
self,
start: SecondsSinceUnixEpoch | None,
end: SecondsSinceUnixEpoch | None,
checkpoint: ZendeskConnectorCheckpoint,
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
checkpoint = copy.deepcopy(checkpoint)
# This one is built on the fly as there may be many more authors than tags
author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
after_cursor = checkpoint.after_cursor_articles
doc_batch: list[Document] = []
response = _get_article_page(
self.client,
start_time=int(start) if start else None,
after_cursor=after_cursor,
)
articles = response.data
has_more = response.has_more
after_cursor = response.meta.get("after_cursor")
for article in articles:
if (
article.get("body") is None
or article.get("draft")
or any(
label in ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS
for label in article.get("label_names", [])
)
):
continue
try:
new_author_map, document = _article_to_document(
article, self.content_tags, author_map, self.client
)
except Exception as e:
logging.error(f"Error processing article {article['id']}: {e}")
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=f"{article.get('id')}",
document_link=article.get("html_url", ""),
),
failure_message=str(e),
exception=e,
)
continue
if new_author_map:
author_map.update(new_author_map)
updated_at = document.doc_updated_at
updated_ts = updated_at.timestamp() if updated_at else None
if updated_ts is not None:
if start is not None and updated_ts <= start:
continue
if end is not None and updated_ts > end:
continue
doc_batch.append(document)
if not has_more:
yield from doc_batch
checkpoint.has_more = False
return checkpoint
# Sometimes no documents are retrieved, but the cursor
# is still updated so the connector makes progress.
yield from doc_batch
checkpoint.after_cursor_articles = after_cursor
last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
checkpoint.has_more = bool(
end is None
or last_doc_updated_at is None
or last_doc_updated_at.timestamp() <= end
)
checkpoint.cached_author_map = (
author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
)
return checkpoint
def _retrieve_tickets(
self,
start: SecondsSinceUnixEpoch | None,
end: SecondsSinceUnixEpoch | None,
checkpoint: ZendeskConnectorCheckpoint,
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
checkpoint = copy.deepcopy(checkpoint)
if self.client is None:
raise ZendeskCredentialsNotSetUpError()
author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
doc_batch: list[Document] = []
next_start_time = int(checkpoint.next_start_time_tickets or start or 0)
ticket_response = _get_tickets_page(self.client, start_time=next_start_time)
tickets = ticket_response.data
has_more = ticket_response.has_more
next_start_time = ticket_response.meta["end_time"]
for ticket in tickets:
if ticket.get("status") == "deleted":
continue
try:
new_author_map, document = _ticket_to_document(
ticket=ticket,
author_map=author_map,
client=self.client,
)
except Exception as e:
logging.error(f"Error processing ticket {ticket['id']}: {e}")
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=f"{ticket.get('id')}",
document_link=ticket.get("url", ""),
),
failure_message=str(e),
exception=e,
)
continue
if new_author_map:
author_map.update(new_author_map)
updated_at = document.doc_updated_at
updated_ts = updated_at.timestamp() if updated_at else None
if updated_ts is not None:
if start is not None and updated_ts <= start:
continue
if end is not None and updated_ts > end:
continue
doc_batch.append(document)
if not has_more:
yield from doc_batch
checkpoint.has_more = False
return checkpoint
yield from doc_batch
checkpoint.next_start_time_tickets = next_start_time
last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
checkpoint.has_more = bool(
end is None
or last_doc_updated_at is None
or last_doc_updated_at.timestamp() <= end
)
checkpoint.cached_author_map = (
author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
)
return checkpoint
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
slim_doc_batch: list[SlimDocument] = []
if self.content_type == "articles":
articles = _get_articles(
self.client, start_time=int(start) if start else None
)
for article in articles:
slim_doc_batch.append(
SlimDocument(
id=f"article:{article['id']}",
)
)
if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
yield slim_doc_batch
slim_doc_batch = []
elif self.content_type == "tickets":
tickets = _get_tickets(
self.client, start_time=int(start) if start else None
)
for ticket in tickets:
slim_doc_batch.append(
SlimDocument(
id=f"zendesk_ticket_{ticket['id']}",
)
)
if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
yield slim_doc_batch
slim_doc_batch = []
else:
raise ValueError(f"Unsupported content_type: {self.content_type}")
if slim_doc_batch:
yield slim_doc_batch
@override
def validate_connector_settings(self) -> None:
if self.client is None:
raise ZendeskCredentialsNotSetUpError()
try:
_get_article_page(self.client, start_time=0)
except HTTPError as e:
# Check for HTTP status codes
if e.response.status_code == 401:
raise CredentialExpiredError(
"Your Zendesk credentials appear to be invalid or expired (HTTP 401)."
) from e
elif e.response.status_code == 403:
raise InsufficientPermissionsError(
"Your Zendesk token does not have sufficient permissions (HTTP 403)."
) from e
elif e.response.status_code == 404:
raise ConnectorValidationError(
"Zendesk resource not found (HTTP 404)."
) from e
else:
raise ConnectorValidationError(
f"Unexpected Zendesk error (status={e.response.status_code}): {e}"
) from e
@override
def validate_checkpoint_json(
self, checkpoint_json: str
) -> ZendeskConnectorCheckpoint:
return ZendeskConnectorCheckpoint.model_validate_json(checkpoint_json)
@override
def build_dummy_checkpoint(self) -> ZendeskConnectorCheckpoint:
return ZendeskConnectorCheckpoint(
after_cursor_articles=None,
next_start_time_tickets=None,
cached_author_map=None,
cached_content_tags=None,
has_more=True,
)
if __name__ == "__main__":
import os
connector = ZendeskConnector(content_type="articles")
connector.load_credentials(
{
"zendesk_subdomain": os.environ["ZENDESK_SUBDOMAIN"],
"zendesk_email": os.environ["ZENDESK_EMAIL"],
"zendesk_token": os.environ["ZENDESK_TOKEN"],
}
)
current = time.time()
one_day_ago = current - 24 * 60 * 60 # 1 day
checkpoint = connector.build_dummy_checkpoint()
while checkpoint.has_more:
gen = connector.load_from_checkpoint(
one_day_ago, current, checkpoint
)
wrapper = CheckpointOutputWrapper()
any_doc = False
for document, failure, next_checkpoint in wrapper(gen):
if document:
print("got document:", document.id)
any_doc = True
checkpoint = next_checkpoint
if any_doc:
break

View File

@ -1,6 +1,6 @@
---
sidebar_position: 2
slug: /what_is_agent_context_engine
slug: /what-is-agent-context-engine
---
# What is Agent context engine?
@ -58,4 +58,4 @@ We left behind the label of “yet another RAG system” long ago. From DeepDoc
We believe tomorrow's enterprise AI advantage will hinge not on who owns the largest model, but on who can feed that model the highest-quality, most real-time, and most relevant context. An Agentic Context Engine is the critical infrastructure that turns this vision into reality.
In the paradigm shift from “hand-crafted prompts” to “intelligent context,” RAGFlow is determined to be the most steadfast propeller and enabler. We invite every developer, enterprise, and researcher who cares about the future of AI agents to follow RAGFlow's journey—so together we can witness and build the cornerstone of the next-generation AI stack.
In the paradigm shift from “hand-crafted prompts” to “intelligent context,” RAGFlow is determined to be the most steadfast propeller and enabler. We invite every developer, enterprise, and researcher who cares about the future of AI agents to follow RAGFlow's journey—so together we can witness and build the cornerstone of the next-generation AI stack.

View File

@ -1,6 +1,6 @@
---
sidebar_position: 1
slug: /what_is_rag
slug: /what-is-rag
---
# What is Retrieval-Augmented Generation (RAG)?
@ -104,4 +104,4 @@ The evolution of RAG is unfolding along several clear paths:
3. Towards context engineering 2.0
Current RAG can be viewed as Context Engineering 1.0, whose core is assembling static knowledge context for single Q&A tasks. The forthcoming Context Engineering 2.0 will extend with RAG technology at its core, becoming a system that automatically and dynamically assembles comprehensive context for agents. The context fused by this system will come not only from documents but also include interaction memory, available tools/skills, and real-time environmental information. This marks the transition of agent development from a "handicraft workshop" model to the industrial starting point of automated context engineering.
The essence of RAG is to build a dedicated, efficient, and trustworthy external data interface for large language models; its core is Retrieval, not Generation. Starting from the practical need to solve private data access, its technical depth is reflected in the optimization of retrieval for complex unstructured data. With its deep integration into agent architectures and its development towards automated context engineering, RAG is evolving from a technology that improves Q&A quality into the core infrastructure for building the next generation of trustworthy, controllable, and scalable intelligent applications.
The essence of RAG is to build a dedicated, efficient, and trustworthy external data interface for large language models; its core is Retrieval, not Generation. Starting from the practical need to solve private data access, its technical depth is reflected in the optimization of retrieval for complex unstructured data. With its deep integration into agent architectures and its development towards automated context engineering, RAG is evolving from a technology that improves Q&A quality into the core infrastructure for building the next generation of trustworthy, controllable, and scalable intelligent applications.

View File

@ -0,0 +1,8 @@
{
"label": "Administration",
"position": 6,
"link": {
"type": "generated-index",
"description": "RAGFlow administration"
}
}

View File

@ -1,43 +1,11 @@
---
sidebar_position: 6
slug: /manage_users_and_services
sidebar_position: 2
slug: /admin_cli
---
# Admin CLI
# Admin CLI and Admin Service
The Admin CLI and Admin Service form a client-server architectural suite for RAGFlow system administration. The Admin CLI serves as an interactive command-line interface that receives instructions and displays execution results from the Admin Service in real time. This duo enables real-time monitoring of system operational status, supporting visibility into RAGFlow Server services and dependent components including MySQL, Elasticsearch, Redis, and MinIO. In administrator mode, they provide user management capabilities that allow viewing users and performing critical operations—such as user creation, password updates, activation status changes, and comprehensive user data deletion—even when corresponding web interface functionalities are disabled.
## Starting the Admin Service
### Launching from source code
1. Before starting the Admin Service, please make sure the RAGFlow system is already started.
2. Launch from source code:
```bash
python admin/server/admin_server.py
```
The service will start and listen for incoming connections from the CLI on the configured port.
### Using the Docker image
1. Before startup, please configure the `docker_compose.yml` file to enable the admin server:
```yaml
command:
- --enable-adminserver
```
2. Start the containers. The service will start and listen for incoming connections from the CLI on the configured port.
The RAGFlow Admin CLI is a command-line-based system administration tool that offers administrators an efficient and flexible method for system interaction and control. Operating on a client-server architecture, it communicates in real time with the Admin Service, receiving administrator commands and dynamically returning execution results.
## Using the Admin CLI

View File

@ -0,0 +1,39 @@
---
sidebar_position: 0
slug: /admin_service
---
# Admin Service
The Admin Service is the core backend management service of the RAGFlow system, providing comprehensive system administration capabilities through centralized API interfaces for managing and controlling the entire platform. Adopting a client-server architecture, it supports access and operations via both a Web UI and an Admin CLI, ensuring flexible and efficient execution of administrative tasks.
The core functions of the Admin Service include real-time monitoring of the operational status of the RAGFlow server and its critical dependent components—such as MySQL, Elasticsearch, Redis, and MinIO—along with full-featured user management. In administrator mode, it enables key operations such as viewing user information, creating users, updating passwords, modifying activation status, and performing complete user data deletion. These functions remain accessible via the Admin CLI even when the web management interface is disabled, ensuring the system stays under control at all times.
With its unified interface design, the Admin Service combines the convenience of visual administration with the efficiency and stability of command-line operations, serving as a crucial foundation for the reliable operation and secure management of the RAGFlow system.
## Starting the Admin Service
### Launching from source code
1. Before starting the Admin Service, please make sure the RAGFlow system is already started.
2. Launch from source code:
```bash
python admin/server/admin_server.py
```
The service will start and listen for incoming connections from the CLI on the configured port.
### Using the Docker image
1. Before startup, please configure the `docker_compose.yml` file to enable the admin server:
```yaml
command:
- --enable-adminserver
```
2. Start the containers. The service will start and listen for incoming connections from the CLI on the configured port.

View File

@ -1,6 +1,6 @@
---
sidebar_position: 7
slug: /accessing_admin_ui
sidebar_position: 1
slug: /admin_ui
---
# Admin UI

View File

@ -12,11 +12,18 @@ Key features, improvements and bug fixes in the latest releases.
Released on December 31, 2025.
### Improvements
- Memory: Enhances the stability of memory extraction when all memory types are selected.
- RAG: Refines the context window extraction strategy for images and tables.
### Fixed issues
- Resolved an issue where the RAGFlow Server would fail to start if an empty memory object existed, and corrected the inability to delete a newly created empty Memory.
- Improved the stability of memory extraction across all memory types after selection.
- Fixed MDX file parsing support.
- Memory:
- The RAGFlow server failed to start if an empty memory object existed.
- Unable to delete a newly created empty Memory.
- RAG: MDX file parsing was not supported.
### Data sources
@ -50,6 +57,7 @@ Released on December 27, 2025.
### Improvements
- RAG: Accelerates GraphRAG generation significantly.
- Bumps RAGFlow's document engine, [Infinity](https://github.com/infiniflow/infinity), to v0.6.15 (backward compatible).
### Data sources

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import json
import logging
from collections import defaultdict
@ -32,21 +33,21 @@ from common.doc_store.doc_store_base import OrderByExpr
class KGSearch(Dealer):
def _chat(self, llm_bdl, system, history, gen_conf):
async def _chat(self, llm_bdl, system, history, gen_conf):
response = get_llm_cache(llm_bdl.llm_name, system, history, gen_conf)
if response:
return response
response = llm_bdl.chat(system, history, gen_conf)
response = await llm_bdl.async_chat(system, history, gen_conf)
if response.find("**ERROR**") >= 0:
raise Exception(response)
set_llm_cache(llm_bdl.llm_name, system, response, history, gen_conf)
return response
def query_rewrite(self, llm, question, idxnms, kb_ids):
async def query_rewrite(self, llm, question, idxnms, kb_ids):
ty2ents = get_entity_type2samples(idxnms, kb_ids)
hint_prompt = PROMPTS["minirag_query2kwd"].format(query=question,
TYPE_POOL=json.dumps(ty2ents, ensure_ascii=False, indent=2))
result = self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {})
result = await self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {})
try:
keywords_data = json_repair.loads(result)
type_keywords = keywords_data.get("answer_type_keywords", [])
@ -138,7 +139,7 @@ class KGSearch(Dealer):
idxnms, kb_ids)
return self._ent_info_from_(es_res, 0)
def retrieval(self, question: str,
async def retrieval(self, question: str,
tenant_ids: str | list[str],
kb_ids: list[str],
emb_mdl,
@ -158,7 +159,7 @@ class KGSearch(Dealer):
idxnms = [index_name(tid) for tid in tenant_ids]
ty_kwds = []
try:
ty_kwds, ents = self.query_rewrite(llm, qst, [index_name(tid) for tid in tenant_ids], kb_ids)
ty_kwds, ents = await self.query_rewrite(llm, qst, [index_name(tid) for tid in tenant_ids], kb_ids)
logging.info(f"Q: {qst}, Types: {ty_kwds}, Entities: {ents}")
except Exception as e:
logging.exception(e)
@ -334,5 +335,5 @@ if __name__ == "__main__":
embed_bdl = LLMBundle(args.tenant_id, LLMType.EMBEDDING, kb.embd_id)
kg = KGSearch(settings.docStoreConn)
print(kg.retrieval({"question": args.question, "kb_ids": [kb_id]},
search.index_name(kb.tenant_id), [kb_id], embed_bdl, llm_bdl))
print(asyncio.run(kg.retrieval({"question": args.question, "kb_ids": [kb_id]},
search.index_name(kb.tenant_id), [kb_id], embed_bdl, llm_bdl)))

View File

@ -49,7 +49,8 @@ from common.data_source import (
WebDAVConnector,
AirtableConnector,
AsanaConnector,
ImapConnector
ImapConnector,
ZendeskConnector,
)
from common.constants import FileSource, TaskStatus
from common.data_source.config import INDEX_BATCH_SIZE
@ -915,7 +916,7 @@ class Github(SyncBase):
)
return async_wrapper()
class IMAP(SyncBase):
SOURCE_NAME: str = FileSource.IMAP
@ -971,6 +972,10 @@ class IMAP(SyncBase):
if pending_docs:
yield pending_docs
async def async_wrapper():
for batch in document_batches():
yield batch
logging.info(
"Connect to IMAP: host(%s) port(%s) user(%s) folder(%s) %s",
self.conf["imap_host"],
@ -979,7 +984,87 @@ class IMAP(SyncBase):
self.conf["imap_mailbox"],
begin_info
)
return document_batches()
return async_wrapper()
class Zendesk(SyncBase):
SOURCE_NAME: str = FileSource.ZENDESK
async def _generate(self, task: dict):
self.connector = ZendeskConnector(content_type=self.conf.get("zendesk_content_type"))
self.connector.load_credentials(self.conf["credentials"])
end_time = datetime.now(timezone.utc).timestamp()
if task["reindex"] == "1" or not task.get("poll_range_start"):
start_time = 0
begin_info = "totally"
else:
start_time = task["poll_range_start"].timestamp()
begin_info = f"from {task['poll_range_start']}"
raw_batch_size = (
self.conf.get("sync_batch_size")
or self.conf.get("batch_size")
or INDEX_BATCH_SIZE
)
try:
batch_size = int(raw_batch_size)
except (TypeError, ValueError):
batch_size = INDEX_BATCH_SIZE
if batch_size <= 0:
batch_size = INDEX_BATCH_SIZE
def document_batches():
checkpoint = self.connector.build_dummy_checkpoint()
pending_docs = []
iterations = 0
iteration_limit = 100_000
while checkpoint.has_more:
wrapper = CheckpointOutputWrapper()
doc_generator = wrapper(
self.connector.load_from_checkpoint(
start_time, end_time, checkpoint
)
)
for document, failure, next_checkpoint in doc_generator:
if failure is not None:
logging.warning(
"Zendesk connector failure: %s",
getattr(failure, "failure_message", failure),
)
continue
if document is not None:
pending_docs.append(document)
if len(pending_docs) >= batch_size:
yield pending_docs
pending_docs = []
if next_checkpoint is not None:
checkpoint = next_checkpoint
iterations += 1
if iterations > iteration_limit:
raise RuntimeError(
"Too many iterations while loading Zendesk documents."
)
if pending_docs:
yield pending_docs
async def async_wrapper():
for batch in document_batches():
yield batch
logging.info(
"Connect to Zendesk: subdomain(%s) %s",
self.conf['credentials'].get("zendesk_subdomain"),
begin_info,
)
return async_wrapper()
class Gitlab(SyncBase):
@ -1043,6 +1128,7 @@ func_factory = {
FileSource.AIRTABLE: Airtable,
FileSource.ASANA: Asana,
FileSource.IMAP: IMAP,
FileSource.ZENDESK: Zendesk,
FileSource.GITHUB: Github,
FileSource.GITLAB: Gitlab,
}

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Uploaded to: SVG Repo, www.svgrepo.com, Generator: SVG Repo Mixer Tools -->
<svg width="800px" height="800px" viewBox="0 -30.5 256 256" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" preserveAspectRatio="xMidYMid">
<g>
<path d="M118.249172,51.2326115 L118.249172,194.005605 L0,194.005605 L118.249172,51.2326115 Z M118.249172,2.84217094e-14 C118.249172,32.6440764 91.7686624,59.124586 59.124586,59.124586 C26.4805096,59.124586 0,32.6440764 0,2.84217094e-14 L118.249172,2.84217094e-14 Z M137.750828,194.005605 C137.750828,161.328917 164.198726,134.881019 196.875414,134.881019 C229.552102,134.881019 256,161.361529 256,194.005605 L137.750828,194.005605 Z M137.750828,142.740382 L137.750828,0 L256,0 L137.750828,142.740382 Z" fill="#03363D">
</path>
</g>


View File

@ -29,6 +29,7 @@ export enum DataSourceKey {
ASANA = 'asana',
IMAP = 'imap',
GITHUB = 'github',
ZENDESK = 'zendesk',
// SHAREPOINT = 'sharepoint',
// SLACK = 'slack',
// TEAMS = 'teams',
@ -133,6 +134,11 @@ export const generateDataSourceInfo = (t: TFunction) => {
description: t(`setting.${DataSourceKey.IMAP}Description`),
icon: <SvgIcon name={'data-source/imap'} width={38} />,
},
[DataSourceKey.ZENDESK]: {
name: 'Zendesk',
description: t(`setting.${DataSourceKey.ZENDESK}Description`),
icon: <SvgIcon name={'data-source/zendesk'} width={38} />,
},
};
};
@ -822,6 +828,36 @@ export const DataSourceFormFields = {
required: false,
},
],
[DataSourceKey.ZENDESK]: [
{
label: 'Zendesk Domain',
name: 'config.credentials.zendesk_subdomain',
type: FormFieldType.Text,
required: true,
},
{
label: 'Zendesk Email',
name: 'config.credentials.zendesk_email',
type: FormFieldType.Text,
required: true,
},
{
label: 'Zendesk Token',
name: 'config.credentials.zendesk_token',
type: FormFieldType.Password,
required: true,
},
{
label: 'Content',
name: 'config.zendesk_content_type',
type: FormFieldType.Segmented,
required: true,
options: [
{ label: 'Articles', value: 'articles' },
{ label: 'Tickets', value: 'tickets' },
],
},
],
};
export const DataSourceFormDefaultValues = {
@ -1076,4 +1112,17 @@ export const DataSourceFormDefaultValues = {
},
},
},
[DataSourceKey.ZENDESK]: {
name: '',
source: DataSourceKey.ZENDESK,
config: {
name: '',
zendesk_content_type: 'articles',
credentials: {
zendesk_subdomain: '',
zendesk_email: '',
zendesk_token: '',
},
},
},
};