diff --git a/common/constants.py b/common/constants.py index d9e75f66a..a09832eed 100644 --- a/common/constants.py +++ b/common/constants.py @@ -118,6 +118,7 @@ class FileSource(StrEnum): SHAREPOINT = "sharepoint" SLACK = "slack" TEAMS = "teams" + WEBDAV = "webdav" MOODLE = "moodle" DROPBOX = "dropbox" diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index e6480e65f..66d393884 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -14,6 +14,7 @@ from .google_drive.connector import GoogleDriveConnector from .jira.connector import JiraConnector from .sharepoint_connector import SharePointConnector from .teams_connector import TeamsConnector +from .webdav_connector import WebDAVConnector from .moodle_connector import MoodleConnector from .config import BlobType, DocumentSource from .models import Document, TextSection, ImageSection, BasicExpertInfo @@ -37,6 +38,7 @@ __all__ = [ "JiraConnector", "SharePointConnector", "TeamsConnector", + "WebDAVConnector", "MoodleConnector", "BlobType", "DocumentSource", diff --git a/common/data_source/config.py b/common/data_source/config.py index 751d1f33c..a643e3d41 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -48,6 +48,7 @@ class DocumentSource(str, Enum): GOOGLE_DRIVE = "google_drive" GMAIL = "gmail" DISCORD = "discord" + WEBDAV = "webdav" MOODLE = "moodle" S3_COMPATIBLE = "s3_compatible" DROPBOX = "dropbox" diff --git a/common/data_source/webdav_connector.py b/common/data_source/webdav_connector.py new file mode 100644 index 000000000..6f96b500d --- /dev/null +++ b/common/data_source/webdav_connector.py @@ -0,0 +1,370 @@ +"""WebDAV connector""" +import logging +import os +from datetime import datetime, timezone +from typing import Any, Optional + +from webdav4.client import Client as WebDAVClient + +from common.data_source.utils import ( + get_file_ext, +) +from common.data_source.config import DocumentSource, INDEX_BATCH_SIZE, 
class WebDAVConnector(LoadConnector, PollConnector):
    """WebDAV connector for syncing files from WebDAV servers."""

    def __init__(
        self,
        base_url: str,
        remote_path: str = "/",
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        """Initialize WebDAV connector.

        Args:
            base_url: Base URL of the WebDAV server (e.g., "https://webdav.example.com")
            remote_path: Remote path to sync from (default: "/")
            batch_size: Number of documents per batch
        """
        self.base_url = base_url.rstrip("/")
        # Normalize the remote path to the form "/a/b" (leading slash, no
        # trailing slash), keeping bare "/" as-is.
        if not remote_path:
            remote_path = "/"
        if not remote_path.startswith("/"):
            remote_path = f"/{remote_path}"
        if remote_path.endswith("/") and remote_path != "/":
            remote_path = remote_path.rstrip("/")
        self.remote_path = remote_path
        self.batch_size = batch_size
        self.client: Optional[WebDAVClient] = None
        self._allow_images: bool | None = None
        # Files larger than this many bytes are skipped during sync.
        self.size_threshold: int | None = BLOB_STORAGE_SIZE_THRESHOLD

    def set_allow_images(self, allow_images: bool) -> None:
        """Set whether to process images."""
        logging.info(f"Setting allow_images to {allow_images}.")
        self._allow_images = allow_images

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Load credentials and initialize the WebDAV client.

        Args:
            credentials: Dictionary containing 'username' and 'password'

        Returns:
            None

        Raises:
            ConnectorMissingCredentialError: If required credentials are missing
                or authentication against the server fails.
        """
        logging.debug(f"Loading credentials for WebDAV server {self.base_url}")

        username = credentials.get("username")
        password = credentials.get("password")

        if not username or not password:
            raise ConnectorMissingCredentialError(
                "WebDAV requires 'username' and 'password' credentials"
            )

        try:
            self.client = WebDAVClient(
                base_url=self.base_url,
                auth=(username, password)
            )
            # Probe the server once so bad credentials fail fast here
            # instead of at first sync.
            self.client.exists(self.remote_path)
        except Exception as e:
            logging.error(f"Failed to connect to WebDAV server: {e}")
            raise ConnectorMissingCredentialError(
                f"Failed to authenticate with WebDAV server: {e}"
            ) from e

        return None

    @staticmethod
    def _parse_modified_time(modified_time: Any, item_path: str) -> datetime:
        """Best-effort conversion of a WebDAV 'modified' value to an aware UTC datetime.

        webdav4 may return a datetime object, an RFC-1123 string
        ("Mon, 01 Jan 2024 00:00:00 GMT"), or an ISO-8601 string depending on
        the server. Falls back to now() when the value is missing or
        unparseable so a file is never dropped purely because of an odd
        timestamp format. Always returns a timezone-aware datetime so
        comparisons against aware start/end bounds cannot raise TypeError.
        """
        if isinstance(modified_time, datetime):
            if modified_time.tzinfo is None:
                return modified_time.replace(tzinfo=timezone.utc)
            return modified_time
        if isinstance(modified_time, str):
            try:
                parsed = datetime.strptime(modified_time, '%a, %d %b %Y %H:%M:%S %Z')
                return parsed.replace(tzinfo=timezone.utc)
            except (ValueError, TypeError):
                pass
            try:
                parsed = datetime.fromisoformat(modified_time.replace('Z', '+00:00'))
                # An ISO string without an offset parses as naive; force UTC.
                if parsed.tzinfo is None:
                    parsed = parsed.replace(tzinfo=timezone.utc)
                return parsed
            except (ValueError, TypeError):
                logging.warning(f"Could not parse modified time for {item_path}: {modified_time}")
        return datetime.now(timezone.utc)

    def _list_files_recursive(
        self,
        path: str,
        start: datetime,
        end: datetime,
    ) -> list[tuple[str, dict]]:
        """Recursively list all files under path modified within (start, end].

        Args:
            path: Path to list files from
            start: Start datetime for filtering (exclusive)
            end: End datetime for filtering (inclusive)

        Returns:
            List of tuples containing (file_path, file_info)
        """
        if self.client is None:
            raise ConnectorMissingCredentialError("WebDAV client not initialized")

        files: list[tuple[str, dict]] = []

        try:
            logging.debug(f"Listing directory: {path}")
            for item in self.client.ls(path, detail=True):
                item_path = item['name']

                # Some servers echo the listed directory itself; skip it to
                # avoid infinite recursion.
                if item_path == path or item_path == path + '/':
                    continue

                logging.debug(f"Found item: {item_path}, type: {item.get('type')}")

                if item.get('type') == 'directory':
                    try:
                        files.extend(self._list_files_recursive(item_path, start, end))
                    except Exception as e:
                        logging.error(f"Error recursing into directory {item_path}: {e}")
                    continue

                try:
                    modified = self._parse_modified_time(item.get('modified'), item_path)
                    logging.debug(
                        f"File {item_path}: modified={modified}, start={start}, end={end}, "
                        f"include={start < modified <= end}"
                    )
                    if start < modified <= end:
                        files.append((item_path, item))
                    else:
                        logging.debug(f"File {item_path} filtered out by time range")
                except Exception as e:
                    logging.error(f"Error processing file {item_path}: {e}")
        except Exception as e:
            # A failed directory listing is logged, not fatal: the rest of the
            # tree may still be reachable.
            logging.error(f"Error listing directory {path}: {e}")

        return files

    def _yield_webdav_documents(
        self,
        start: datetime,
        end: datetime,
    ) -> GenerateDocumentsOutput:
        """Generate documents from the WebDAV server.

        Args:
            start: Start datetime for filtering
            end: End datetime for filtering

        Yields:
            Batches of documents
        """
        from io import BytesIO  # local import keeps module-level deps unchanged

        if self.client is None:
            raise ConnectorMissingCredentialError("WebDAV client not initialized")

        logging.info(f"Searching for files in {self.remote_path} between {start} and {end}")
        files = self._list_files_recursive(self.remote_path, start, end)
        logging.info(f"Found {len(files)} files matching time criteria")

        batch: list[Document] = []
        for file_path, file_info in files:
            file_name = os.path.basename(file_path)

            size_bytes = file_info.get('size', 0)
            if (
                self.size_threshold is not None
                and isinstance(size_bytes, int)
                and size_bytes > self.size_threshold
            ):
                logging.warning(
                    f"{file_name} exceeds size threshold of {self.size_threshold}. Skipping."
                )
                continue

            try:
                logging.debug(f"Downloading file: {file_path}")
                buffer = BytesIO()
                self.client.download_fileobj(file_path, buffer)
                blob = buffer.getvalue()

                if not blob:
                    logging.warning(f"Downloaded content is empty for {file_path}")
                    continue

                modified = self._parse_modified_time(file_info.get('modified'), file_path)

                batch.append(
                    Document(
                        id=f"webdav:{self.base_url}:{file_path}",
                        blob=blob,
                        source=DocumentSource.WEBDAV,
                        semantic_identifier=file_name,
                        extension=get_file_ext(file_name),
                        doc_updated_at=modified,
                        size_bytes=size_bytes if size_bytes else 0
                    )
                )

                if len(batch) == self.batch_size:
                    yield batch
                    batch = []
            except Exception as e:
                # Best-effort sync: a single failed download should not abort
                # the whole run.
                logging.exception(f"Error downloading file {file_path}: {e}")

        if batch:
            yield batch

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Load all documents from the WebDAV server (full sync).

        Yields:
            Batches of documents
        """
        logging.debug(f"Loading documents from WebDAV server {self.base_url}")
        return self._yield_webdav_documents(
            start=datetime(1970, 1, 1, tzinfo=timezone.utc),
            end=datetime.now(timezone.utc),
        )

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Poll the WebDAV server for documents updated within a time window.

        Args:
            start: Start timestamp (seconds since Unix epoch)
            end: End timestamp (seconds since Unix epoch)

        Yields:
            Batches of documents
        """
        if self.client is None:
            raise ConnectorMissingCredentialError("WebDAV client not initialized")

        start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)

        yield from self._yield_webdav_documents(start_datetime, end_datetime)

    def validate_connector_settings(self) -> None:
        """Validate WebDAV connector settings.

        Raises:
            ConnectorMissingCredentialError: If credentials are not loaded
            CredentialExpiredError: If the server rejects the credentials
            InsufficientPermissionsError: If the path is not accessible
            ConnectorValidationError: If settings are otherwise invalid
        """
        if self.client is None:
            raise ConnectorMissingCredentialError(
                "WebDAV credentials not loaded."
            )

        if not self.base_url:
            raise ConnectorValidationError(
                "No base URL was provided in connector settings."
            )

        try:
            path_exists = self.client.exists(self.remote_path)
        except Exception as e:
            error_message = str(e)

            if "401" in error_message or "unauthorized" in error_message.lower():
                raise CredentialExpiredError(
                    "WebDAV credentials appear invalid or expired."
                ) from e

            if "403" in error_message or "forbidden" in error_message.lower():
                raise InsufficientPermissionsError(
                    f"Insufficient permissions to access path '{self.remote_path}' on WebDAV server."
                ) from e

            if "404" in error_message or "not found" in error_message.lower():
                raise ConnectorValidationError(
                    f"Remote path '{self.remote_path}' does not exist on WebDAV server."
                ) from e

            raise ConnectorValidationError(
                f"Unexpected WebDAV client error: {e}"
            ) from e

        # Checked outside the try block so this validation error is not
        # re-wrapped as an "Unexpected WebDAV client error".
        if not path_exists:
            raise ConnectorValidationError(
                f"Remote path '{self.remote_path}' does not exist on WebDAV server."
            )


if __name__ == "__main__":
    credentials_dict = {
        "username": os.environ.get("WEBDAV_USERNAME"),
        "password": os.environ.get("WEBDAV_PASSWORD"),
    }

    connector = WebDAVConnector(
        base_url=os.environ.get("WEBDAV_URL") or "https://webdav.example.com",
        remote_path=os.environ.get("WEBDAV_PATH") or "/",
    )

    try:
        connector.load_credentials(credentials_dict)
        connector.validate_connector_settings()

        document_batch_generator = connector.load_from_state()
        for document_batch in document_batch_generator:
            print("First batch of documents:")
            for doc in document_batch:
                print(f"Document ID: {doc.id}")
                print(f"Semantic Identifier: {doc.semantic_identifier}")
                print(f"Source: {doc.source}")
                print(f"Updated At: {doc.doc_updated_at}")
                print("---")
            break

    except ConnectorMissingCredentialError as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
class WebDAV(SyncBase):
    """Synchronizes documents from a WebDAV server into the knowledge base."""

    SOURCE_NAME: str = FileSource.WEBDAV

    async def _generate(self, task: dict):
        # Build the connector from connector config and load its credentials.
        remote_path = self.conf.get("remote_path", "/")
        self.connector = WebDAVConnector(
            base_url=self.conf["base_url"],
            remote_path=remote_path,
        )
        self.connector.load_credentials(self.conf["credentials"])

        logging.info(f"Task info: reindex={task['reindex']}, poll_range_start={task['poll_range_start']}")

        # A reindex request, or a task that has never polled, triggers a full
        # sync; otherwise only poll the window since the last successful run.
        full_sync = task["reindex"] == "1" or not task["poll_range_start"]
        if full_sync:
            logging.info("Using load_from_state (full sync)")
            batch_generator = self.connector.load_from_state()
            begin_info = "totally"
        else:
            poll_start = task["poll_range_start"].timestamp()
            poll_end = datetime.now(timezone.utc).timestamp()
            logging.info(f"Polling WebDAV from {task['poll_range_start']} (ts: {poll_start}) to now (ts: {poll_end})")
            batch_generator = self.connector.poll_source(poll_start, poll_end)
            begin_info = f"from {task['poll_range_start']}"

        logging.info("Connect to WebDAV: {}(path: {}) {}".format(
            self.conf["base_url"],
            remote_path,
            begin_info,
        ))
        return batch_generator
"webdav4" }, { name = "webdriver-manager" }, { name = "werkzeug" }, { name = "wikipedia" }, @@ -5637,6 +5638,7 @@ requires-dist = [ { name = "vertexai", specifier = "==1.70.0" }, { name = "volcengine", specifier = "==1.0.194" }, { name = "voyageai", specifier = "==0.2.3" }, + { name = "webdav4", specifier = ">=0.10.0,<0.11.0" }, { name = "webdriver-manager", specifier = "==4.0.1" }, { name = "werkzeug", specifier = "==3.0.6" }, { name = "wikipedia", specifier = "==1.4.0" }, @@ -7165,6 +7167,19 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166, upload-time = "2024-01-06T02:10:55.763Z" }, ] +[[package]] +name = "webdav4" +version = "0.10.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "httpx" }, + { name = "python-dateutil" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/08/3d/d604f9d5195689e578f124f196a5d7e80f3106c8404f5c19b2181691de19/webdav4-0.10.0.tar.gz", hash = "sha256:387da6f0ee384e77149dddd9bcfd434afa155882f6c440a529a7cb458624407f", size = 229195, upload-time = "2024-07-13T19:42:42.593Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/60/02/1b77232297fa52f7bedcf70f3ebe3817e9295f302389fb57dd0e6c077329/webdav4-0.10.0-py3-none-any.whl", hash = "sha256:8f915d72483e572089a3af0a2ad20c7e12d04eee9b9134eb718dbfa37af221d8", size = 36350, upload-time = "2024-07-13T19:42:41.087Z" }, +] + [[package]] name = "webdriver-manager" version = "4.0.1" diff --git a/web/src/assets/svg/data-source/webdav.svg b/web/src/assets/svg/data-source/webdav.svg new file mode 100644 index 000000000..a970d38fe --- /dev/null +++ b/web/src/assets/svg/data-source/webdav.svg @@ -0,0 +1,15 @@ + + + + + + + + + + \ No newline at end of file diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts 
index fa3db20ee..906d1b9ec 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -736,6 +736,9 @@ Example: Virtual Hosted Style`, 'Sync pages and databases from Notion for knowledge retrieval.', google_driveDescription: 'Connect your Google Drive via OAuth and sync specific folders or drives.', + webdavDescription: 'Connect to WebDAV servers to sync files.', + webdavRemotePathTip: + 'Optional: Specify a folder path on the WebDAV server (e.g., /Documents). Leave empty to sync from root.', google_driveTokenTip: 'Upload the OAuth token JSON generated from the OAuth helper or Google Cloud Console. You may also upload a client_secret JSON from an "installed" or "web" application. If this is your first sync, a browser window will open to complete the OAuth consent. If the JSON already contains a refresh token, it will be reused automatically.', google_drivePrimaryAdminTip: diff --git a/web/src/pages/user-setting/data-source/contant.tsx b/web/src/pages/user-setting/data-source/contant.tsx index 6bbb64c6c..2ce1c7553 100644 --- a/web/src/pages/user-setting/data-source/contant.tsx +++ b/web/src/pages/user-setting/data-source/contant.tsx @@ -12,6 +12,7 @@ export enum DataSourceKey { MOODLE = 'moodle', // GMAIL = 'gmail', JIRA = 'jira', + WEBDAV = 'webdav', DROPBOX = 'dropbox', // SHAREPOINT = 'sharepoint', // SLACK = 'slack', @@ -54,6 +55,11 @@ export const DataSourceInfo = { description: t(`setting.${DataSourceKey.JIRA}Description`), icon: , }, + [DataSourceKey.WEBDAV]: { + name: 'WebDAV', + description: t(`setting.${DataSourceKey.WEBDAV}Description`), + icon: , + }, [DataSourceKey.DROPBOX]: { name: 'Dropbox', description: t(`setting.${DataSourceKey.DROPBOX}Description`), @@ -429,6 +435,35 @@ export const DataSourceFormFields = { tooltip: t('setting.jiraPasswordTip'), }, ], + [DataSourceKey.WEBDAV]: [ + { + label: 'WebDAV Server URL', + name: 'config.base_url', + type: FormFieldType.Text, + required: true, + placeholder: 'https://webdav.example.com', + }, + { + 
label: 'Username', + name: 'config.credentials.username', + type: FormFieldType.Text, + required: true, + }, + { + label: 'Password', + name: 'config.credentials.password', + type: FormFieldType.Password, + required: true, + }, + { + label: 'Remote Path', + name: 'config.remote_path', + type: FormFieldType.Text, + required: false, + placeholder: '/', + tooltip: t('setting.webdavRemotePathTip'), + }, + ], [DataSourceKey.DROPBOX]: [ { label: 'Access Token', @@ -546,6 +581,18 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.WEBDAV]: { + name: '', + source: DataSourceKey.WEBDAV, + config: { + base_url: '', + remote_path: '/', + credentials: { + username: '', + password: '', + }, + }, + }, [DataSourceKey.DROPBOX]: { name: '', source: DataSourceKey.DROPBOX,