From deeae8dba4ccf96c9426add0c78cc164a929ee84 Mon Sep 17 00:00:00 2001 From: Yesid Cano Castro <46203884+yesidc@users.noreply.github.com> Date: Tue, 3 Feb 2026 06:42:05 +0100 Subject: [PATCH] feat(connector): add Seafile as data source (#12945) ### What problem does this PR solve? This PR adds **Seafile** as a new data source connector for RAGFlow. [Seafile](https://www.seafile.com/) is an open-source, self-hosted file sync and share platform widely used by enterprises, universities, and organizations that require data sovereignty and privacy. Users who store documents in Seafile currently have no way to index and search their content through RAGFlow. This connector enables RAGFlow users to: - Connect to self-hosted Seafile servers via API token - Index documents from personal and shared libraries - Support incremental polling for updated files - Seamlessly integrate Seafile-stored documents into their RAG pipelines ### Type of change - [x] New Feature (non-breaking change which adds functionality) ### Changes included - `SeaFileConnector` implementing `LoadConnector` and `PollConnector` interfaces - Support for API token - Recursive file traversal across libraries - Time-based filtering for incremental updates - Seafile logo (sourced from Simple Icons, CC0) - Connector configuration and registration ### Testing - Tested against self-hosted Seafile Community Edition - Verified authentication (token) - Verified document ingestion from personal and shared libraries - Verified incremental polling with time filters --- common/constants.py | 1 + common/data_source/__init__.py | 2 + common/data_source/config.py | 1 + common/data_source/seafile_connector.py | 390 ++++++++++++++++++ rag/svr/sync_data_source.py | 36 ++ web/src/assets/svg/data-source/seafile.svg | 1 + web/src/locales/de.ts | 10 + web/src/locales/en.ts | 10 + .../data-source/constant/index.tsx | 52 +++ 9 files changed, 503 insertions(+) create mode 100644 common/data_source/seafile_connector.py create mode 100644 web/src/assets/svg/data-source/seafile.svg diff --git a/common/constants.py b/common/constants.py index de228bbe8..533e087f9 100644 --- a/common/constants.py +++ b/common/constants.py @@ -135,6 +135,7 @@ class FileSource(StrEnum): IMAP = "imap" BITBUCKET = "bitbucket" ZENDESK = "zendesk" + SEAFILE = "seafile" class PipelineTaskType(StrEnum): diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index 97ce3f18e..a8509d532 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -39,6 +39,7 @@ from .airtable_connector import AirtableConnector from .asana_connector import AsanaConnector from .imap_connector import ImapConnector from .zendesk_connector import ZendeskConnector +from .seafile_connector import SeaFileConnector from .config import BlobType, DocumentSource from .models import Document, TextSection, ImageSection, BasicExpertInfo from .exceptions import ( @@ -77,4 +78,5 @@ __all__ = [ "AsanaConnector", "ImapConnector", "ZendeskConnector", + "SeaFileConnector", ] diff --git a/common/data_source/config.py b/common/data_source/config.py index 843423294..74e37c815 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -63,6 +63,7 @@ class DocumentSource(str, Enum): IMAP = "imap" BITBUCKET = "bitbucket" ZENDESK = "zendesk" + SEAFILE = "seafile" class FileOrigin(str, Enum): diff --git a/common/data_source/seafile_connector.py b/common/data_source/seafile_connector.py new file mode 100644 index 000000000..0181269e8 --- /dev/null +++ b/common/data_source/seafile_connector.py @@ -0,0 +1,390 @@ +"""SeaFile connector""" +import logging +from datetime import datetime, timezone +from typing import Any, Optional + +from retry import retry + +from common.data_source.utils import ( + get_file_ext, + rl_requests, +) +from common.data_source.config import ( + DocumentSource, + INDEX_BATCH_SIZE, + BLOB_STORAGE_SIZE_THRESHOLD, +) +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + CredentialExpiredError, + InsufficientPermissionsError, +) +from common.data_source.interfaces import LoadConnector, PollConnector +from common.data_source.models import ( + Document, + SecondsSinceUnixEpoch, + GenerateDocumentsOutput, +) + +logger = logging.getLogger(__name__) + + +class SeaFileConnector(LoadConnector, PollConnector): + """SeaFile connector for syncing files from SeaFile servers""" + + def __init__( + self, + seafile_url: str, + batch_size: int = INDEX_BATCH_SIZE, + include_shared: bool = True, + ) -> None: + """Initialize SeaFile connector. + + Args: + seafile_url: Base URL of the SeaFile server (e.g., https://seafile.example.com) + batch_size: Number of documents to yield per batch + include_shared: Whether to include shared libraries + """ + + self.seafile_url = seafile_url.rstrip("/") + self.api_url = f"{self.seafile_url}/api2" + self.batch_size = batch_size + self.include_shared = include_shared + self.token: Optional[str] = None + self.current_user_email: Optional[str] = None + self.size_threshold: int = BLOB_STORAGE_SIZE_THRESHOLD + + def _get_headers(self) -> dict[str, str]: + """Get authorization headers for API requests""" + if not self.token: + raise ConnectorMissingCredentialError("SeaFile token not set") + return { + "Authorization": f"Token {self.token}", + "Accept": "application/json", + } + + def _make_get_request(self, endpoint: str, params: Optional[dict] = None): + """Make authenticated GET request""" + url = f"{self.api_url}/{endpoint.lstrip('/')}" + response = rl_requests.get( + url, + headers=self._get_headers(), + params=params, + timeout=60, + ) + return response + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + """Load and validate SeaFile credentials. + + Args: + credentials: Dictionary containing 'seafile_token' or 'username'/'password' + + Returns: + None + + Raises: + ConnectorMissingCredentialError: If required credentials are missing + """ + logger.debug(f"Loading credentials for SeaFile server {self.seafile_url}") + + token = credentials.get("seafile_token") + username = credentials.get("username") + password = credentials.get("password") + + if token: + self.token = token + elif username and password: + self.token = self._authenticate_with_password(username, password) + else: + raise ConnectorMissingCredentialError( + "SeaFile requires 'seafile_token' or 'username'/'password' credentials" + ) + + # Validate token and get current user info + try: + self._validate_token() + except Exception as e: + raise CredentialExpiredError(f"SeaFile token validation failed: {e}") + + return None + + def _authenticate_with_password(self, username: str, password: str) -> str: + """Authenticate with username/password and return API token""" + try: + response = rl_requests.post( + f"{self.api_url}/auth-token/", + data={"username": username, "password": password}, + timeout=30, + ) + response.raise_for_status() + data = response.json() + token = data.get("token") + if not token: + raise CredentialExpiredError("No token returned from SeaFile") + return token + except Exception as e: + raise ConnectorMissingCredentialError( + f"Failed to authenticate with SeaFile: {e}" + ) + + def _validate_token(self) -> dict: + """Validate token by fetching account info""" + response = self._make_get_request("/account/info/") + response.raise_for_status() + account_info = response.json() + self.current_user_email = account_info.get("email") + logger.info(f"SeaFile authenticated as: {self.current_user_email}") + return account_info + + def validate_connector_settings(self) -> None: + """Validate SeaFile connector settings""" + if self.token is None: + raise ConnectorMissingCredentialError("SeaFile credentials not loaded.") + + if not self.seafile_url: + raise ConnectorValidationError("No SeaFile URL was provided.") + + try: + account_info = self._validate_token() + if not account_info.get("email"): + raise InsufficientPermissionsError("Invalid SeaFile API response") + + # Check if we can list libraries + libraries = self._get_libraries() + logger.info(f"SeaFile connection validated. Found {len(libraries)} libraries.") + + except Exception as e: + status = None + resp = getattr(e, "response", None) + if resp is not None: + status = getattr(resp, "status_code", None) + + if status == 401: + raise CredentialExpiredError("SeaFile token is invalid or expired.") + if status == 403: + raise InsufficientPermissionsError( + "Insufficient permissions to access SeaFile API." + ) + raise ConnectorValidationError(f"SeaFile validation failed: {repr(e)}") + + @retry(tries=3, delay=1, backoff=2) + def _get_libraries(self) -> list[dict]: + """Fetch all accessible libraries (repos)""" + response = self._make_get_request("/repos/") + response.raise_for_status() + libraries = response.json() + + logger.debug(f"Found {len(libraries)} total libraries") + + if not self.include_shared and self.current_user_email: + # Filter to only owned libraries + owned_libraries = [ + lib for lib in libraries + if lib.get("owner") == self.current_user_email + or lib.get("owner_email") == self.current_user_email + ] + logger.debug( + f"Filtered to {len(owned_libraries)} owned libraries " + f"(excluded {len(libraries) - len(owned_libraries)} shared)" + ) + return owned_libraries + + return libraries + + @retry(tries=3, delay=1, backoff=2) + def _get_directory_entries(self, repo_id: str, path: str = "/") -> list[dict]: + """Fetch directory entries for a given path""" + try: + response = self._make_get_request( + f"/repos/{repo_id}/dir/", + params={"p": path}, + ) + response.raise_for_status() + return response.json() + except Exception as e: + logger.warning(f"Error fetching directory {path} in repo {repo_id}: {e}") + return [] + + @retry(tries=3, delay=1, backoff=2) + def _get_file_download_link(self, repo_id: str, path: str) -> Optional[str]: + """Get download link for a file""" + try: + response = self._make_get_request( + f"/repos/{repo_id}/file/", + params={"p": path, "reuse": 1}, + ) + response.raise_for_status() + return response.text.strip('"') + except Exception as e: + logger.warning(f"Error getting download link for {path}: {e}") + return None + + def _list_files_recursive( + self, + repo_id: str, + repo_name: str, + path: str, + start: datetime, + end: datetime, + ) -> list[tuple[str, dict, dict]]: + """Recursively list all files in the given path within time range. + + Returns: + List of tuples: (file_path, file_entry, library_info) + """ + files = [] + entries = self._get_directory_entries(repo_id, path) + + for entry in entries: + entry_type = entry.get("type") + entry_name = entry.get("name", "") + entry_path = f"{path.rstrip('/')}/{entry_name}" + + if entry_type == "dir": + # Recursively process subdirectories + files.extend( + self._list_files_recursive(repo_id, repo_name, entry_path, start, end) + ) + elif entry_type == "file": + # Check modification time + mtime = entry.get("mtime", 0) + if mtime: + modified = datetime.fromtimestamp(mtime, tz=timezone.utc) + if start < modified <= end: + files.append((entry_path, entry, {"id": repo_id, "name": repo_name})) + + return files + + def _yield_seafile_documents( + self, + start: datetime, + end: datetime, + ) -> GenerateDocumentsOutput: + """Generate documents from SeaFile server. + + Args: + start: Start datetime for filtering + end: End datetime for filtering + + Yields: + Batches of documents + """ + logger.info(f"Searching for files between {start} and {end}") + + libraries = self._get_libraries() + logger.info(f"Processing {len(libraries)} libraries") + + all_files = [] + for lib in libraries: + repo_id = lib.get("id") + repo_name = lib.get("name", "Unknown") + + if not repo_id: + continue + + logger.debug(f"Scanning library: {repo_name}") + try: + files = self._list_files_recursive(repo_id, repo_name, "/", start, end) + all_files.extend(files) + logger.debug(f"Found {len(files)} files in {repo_name}") + except Exception as e: + logger.error(f"Error processing library {repo_name}: {e}") + + logger.info(f"Found {len(all_files)} total files matching time criteria") + + batch: list[Document] = [] + for file_path, file_entry, library in all_files: + file_name = file_entry.get("name", "") + file_size = file_entry.get("size", 0) + file_id = file_entry.get("id", "") + mtime = file_entry.get("mtime", 0) + repo_id = library["id"] + repo_name = library["name"] + + # Skip files that are too large + if file_size > self.size_threshold: + logger.warning( + f"Skipping large file: {file_path} ({file_size} bytes)" + ) + continue + + try: + # Get download link + download_link = self._get_file_download_link(repo_id, file_path) + if not download_link: + logger.warning(f"Could not get download link for {file_path}") + continue + + # Download file content + logger.debug(f"Downloading: {file_path}") + response = rl_requests.get(download_link, timeout=120) + response.raise_for_status() + blob = response.content + + if not blob: + logger.warning(f"Downloaded content is empty for {file_path}") + continue + + # Build semantic identifier + semantic_id = f"{repo_name}{file_path}" + + # Get modification time + modified = datetime.fromtimestamp(mtime, tz=timezone.utc) if mtime else datetime.now(timezone.utc) + + batch.append( + Document( + id=f"seafile:{repo_id}:{file_id}", + blob=blob, + source=DocumentSource.SEAFILE, + semantic_identifier=semantic_id, + extension=get_file_ext(file_name), + doc_updated_at=modified, + size_bytes=len(blob), + ) + ) + + if len(batch) >= self.batch_size: + yield batch + batch = [] + + except Exception as e: + logger.error(f"Error downloading file {file_path}: {e}") + + if batch: + yield batch + + def load_from_state(self) -> GenerateDocumentsOutput: + """Load all documents from SeaFile server. + + Yields: + Batches of documents + """ + logger.info(f"Loading all documents from SeaFile server {self.seafile_url}") + return self._yield_seafile_documents( + start=datetime(1970, 1, 1, tzinfo=timezone.utc), + end=datetime.now(timezone.utc), + ) + + def poll_source( + self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch + ) -> GenerateDocumentsOutput: + """Poll SeaFile server for updated documents. + + Args: + start: Start timestamp (seconds since Unix epoch) + end: End timestamp (seconds since Unix epoch) + + Yields: + Batches of documents + """ + start_datetime = datetime.fromtimestamp(start, tz=timezone.utc) + end_datetime = datetime.fromtimestamp(end, tz=timezone.utc) + + logger.info(f"Polling SeaFile for updates from {start_datetime} to {end_datetime}") + + for batch in self._yield_seafile_documents(start_datetime, end_datetime): + yield batch + + diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index b6cce38b2..a9735e3dd 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -53,6 +53,7 @@ from common.data_source import ( AsanaConnector, ImapConnector, ZendeskConnector, + SeaFileConnector, ) from common.constants import FileSource, TaskStatus from common.data_source.config import INDEX_BATCH_SIZE @@ -1178,6 +1179,40 @@ class Bitbucket(SyncBase): return wrapper() +class SeaFile(SyncBase): + SOURCE_NAME: str = FileSource.SEAFILE + + async def _generate(self, task: dict): + self.connector = SeaFileConnector( + seafile_url=self.conf["seafile_url"], + batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE), + include_shared=self.conf.get("include_shared", True) + ) + + self.connector.load_credentials(self.conf["credentials"]) + + # Determine the time range for synchronization based on reindex or poll_range_start + poll_start = task.get("poll_range_start") + + if task["reindex"] == "1" or poll_start is None: + document_generator = self.connector.load_from_state() + begin_info = "totally" + else: + document_generator = self.connector.poll_source( + poll_start.timestamp(), + datetime.now(timezone.utc).timestamp(), + ) + begin_info = f"from {poll_start}" + + logging.info( + "Connect to SeaFile: {} (include_shared: {}) {}".format( + self.conf["seafile_url"], + self.conf.get("include_shared", True), + begin_info + ) + ) + return document_generator + func_factory = { FileSource.S3: S3, FileSource.R2: R2, @@ -1203,6 +1238,7 @@ func_factory = { FileSource.GITHUB: Github, FileSource.GITLAB: Gitlab, FileSource.BITBUCKET: Bitbucket, + FileSource.SEAFILE: SeaFile, } diff --git a/web/src/assets/svg/data-source/seafile.svg b/web/src/assets/svg/data-source/seafile.svg new file mode 100644 index 000000000..8fd594ce8 --- /dev/null +++ b/web/src/assets/svg/data-source/seafile.svg @@ -0,0 +1 @@ +Seafile \ No newline at end of file diff --git a/web/src/locales/de.ts b/web/src/locales/de.ts index fff1ffe5b..196615d82 100644 --- a/web/src/locales/de.ts +++ b/web/src/locales/de.ts @@ -1026,6 +1026,16 @@ Beispiel: Virtual Hosted Style`, 'Die Basis-URL Ihrer Moodle-Instanz (z.B. https://moodle.university.edu). Fügen Sie nicht /webservice oder /login hinzu.', moodleTokenTip: 'Generieren Sie ein Web-Service-Token in Moodle: Gehen Sie zu Website-Administration → Server → Web-Services → Token verwalten. Der Benutzer muss in den Kursen eingeschrieben sein, die Sie synchronisieren möchten.', + seafileDescription: + 'Verbinden Sie sich mit Ihrem SeaFile-Server, um Dateien und Dokumente aus Ihren Bibliotheken zu synchronisieren.', + seafileUrlTip: + 'Die Basis-URL Ihres SeaFile-Servers (z.B. https://seafile.example.com). Fügen Sie kein /api2 oder andere Pfade hinzu.', + seafileTokenTip: + 'Erstellen Sie ein API-Token in SeaFile: Gehen Sie zu Einstellungen → API-Token → Token erstellen. Das Token ermöglicht den Zugriff auf alle für Ihr Konto sichtbaren Bibliotheken.', + seafileIncludeSharedTip: + 'Wenn aktiviert, werden auch Bibliotheken synchronisiert, die andere Benutzer mit Ihnen geteilt haben.', + seafileBatchSizeTip: + 'Anzahl der Dokumente, die pro Batch verarbeitet werden. Höhere Werte können die Leistung verbessern, benötigen aber mehr Arbeitsspeicher. Standard: 100.', jiraDescription: 'Verbinden Sie Ihren Jira-Arbeitsbereich, um Vorgänge, Kommentare und Anhänge zu synchronisieren.', jiraBaseUrlTip: diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index a7749c00e..205e7dc52 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -998,6 +998,16 @@ Example: Virtual Hosted Style`, 'The base URL of your Moodle instance (e.g., https://moodle.university.edu). Do not include /webservice or /login.', moodleTokenTip: 'Generate a web service token in Moodle: Go to Site administration → Server → Web services → Manage tokens. The user must be enrolled in the courses you want to sync.', + seafileDescription: + 'Connect to your SeaFile server to sync files and documents from your libraries.', + seafileUrlTip: + 'The base URL of your SeaFile server (e.g., https://seafile.example.com). Do not include /api2 or other paths.', + seafileTokenTip: + 'Generate an API token in SeaFile: Go to Settings → API Token → Generate Token. The token provides access to all libraries visible to your account.', + seafileIncludeSharedTip: + 'When enabled, libraries shared with you by other users will also be synced.', + seafileBatchSizeTip: + 'Number of documents to process per batch. Higher values may improve performance but use more memory. Default: 100.', jiraDescription: 'Connect your Jira workspace to sync issues, comments, and attachments.', jiraBaseUrlTip: diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 199443178..ade53c36e 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -35,6 +35,7 @@ export enum DataSourceKey { GITHUB = 'github', BITBUCKET = 'bitbucket', ZENDESK = 'zendesk', + SEAFILE = 'seafile', // SHAREPOINT = 'sharepoint', // SLACK = 'slack', // TEAMS = 'teams', @@ -155,6 +156,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.ZENDESK}Description`), icon: , }, + [DataSourceKey.SEAFILE]: { + name: 'SeaFile', + description: t(`setting.${DataSourceKey.SEAFILE}Description`), + icon: , + }, }; }; @@ -815,6 +821,39 @@ export const DataSourceFormFields = { ], }, ], + [DataSourceKey.SEAFILE]: [ + { + label: 'SeaFile Server URL', + name: 'config.seafile_url', + type: FormFieldType.Text, + required: true, + placeholder: 'https://seafile.example.com', + tooltip: t('setting.seafileUrlTip'), + }, + { + label: 'API Token', + name: 'config.credentials.seafile_token', + type: FormFieldType.Password, + required: true, + tooltip: t('setting.seafileTokenTip'), + }, + { + label: 'Include Shared Libraries', + name: 'config.include_shared', + type: FormFieldType.Checkbox, + required: false, + defaultValue: true, + tooltip: t('setting.seafileIncludeSharedTip'), + }, + { + label: 'Batch Size', + name: 'config.batch_size', + type: FormFieldType.Number, + required: false, + placeholder: '100', + tooltip: t('setting.seafileBatchSizeTip'), + }, + ], }; export const DataSourceFormDefaultValues = { @@ -1096,4 +1135,17 @@ export const DataSourceFormDefaultValues = { }, }, }, + + [DataSourceKey.SEAFILE]: { + name: '', + source: DataSourceKey.SEAFILE, + config: { + seafile_url: '', + include_shared: true, + batch_size: 100, + credentials: { + seafile_token: '', + }, + }, + }, };