diff --git a/common/constants.py b/common/constants.py index de228bbe8..533e087f9 100644 --- a/common/constants.py +++ b/common/constants.py @@ -135,6 +135,7 @@ class FileSource(StrEnum): IMAP = "imap" BITBUCKET = "bitbucket" ZENDESK = "zendesk" + SEAFILE = "seafile" class PipelineTaskType(StrEnum): diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index 97ce3f18e..a8509d532 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -39,6 +39,7 @@ from .airtable_connector import AirtableConnector from .asana_connector import AsanaConnector from .imap_connector import ImapConnector from .zendesk_connector import ZendeskConnector +from .seafile_connector import SeaFileConnector from .config import BlobType, DocumentSource from .models import Document, TextSection, ImageSection, BasicExpertInfo from .exceptions import ( @@ -77,4 +78,5 @@ __all__ = [ "AsanaConnector", "ImapConnector", "ZendeskConnector", + "SeaFileConnector", ] diff --git a/common/data_source/config.py b/common/data_source/config.py index 843423294..74e37c815 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -63,6 +63,7 @@ class DocumentSource(str, Enum): IMAP = "imap" BITBUCKET = "bitbucket" ZENDESK = "zendesk" + SEAFILE = "seafile" class FileOrigin(str, Enum): diff --git a/common/data_source/seafile_connector.py b/common/data_source/seafile_connector.py new file mode 100644 index 000000000..0181269e8 --- /dev/null +++ b/common/data_source/seafile_connector.py @@ -0,0 +1,390 @@ +"""SeaFile connector""" +import logging +from datetime import datetime, timezone +from typing import Any, Optional + +from retry import retry + +from common.data_source.utils import ( + get_file_ext, + rl_requests, +) +from common.data_source.config import ( + DocumentSource, + INDEX_BATCH_SIZE, + BLOB_STORAGE_SIZE_THRESHOLD, +) +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + CredentialExpiredError, + InsufficientPermissionsError, +) +from common.data_source.interfaces import LoadConnector, PollConnector +from common.data_source.models import ( + Document, + SecondsSinceUnixEpoch, + GenerateDocumentsOutput, +) + +logger = logging.getLogger(__name__) + + +class SeaFileConnector(LoadConnector, PollConnector): + """SeaFile connector for syncing files from SeaFile servers""" + + def __init__( + self, + seafile_url: str, + batch_size: int = INDEX_BATCH_SIZE, + include_shared: bool = True, + ) -> None: + """Initialize SeaFile connector. + + Args: + seafile_url: Base URL of the SeaFile server (e.g., https://seafile.example.com) + batch_size: Number of documents to yield per batch + include_shared: Whether to include shared libraries + """ + + self.seafile_url = seafile_url.rstrip("/") + self.api_url = f"{self.seafile_url}/api2" + self.batch_size = batch_size + self.include_shared = include_shared + self.token: Optional[str] = None + self.current_user_email: Optional[str] = None + self.size_threshold: int = BLOB_STORAGE_SIZE_THRESHOLD + + def _get_headers(self) -> dict[str, str]: + """Get authorization headers for API requests""" + if not self.token: + raise ConnectorMissingCredentialError("SeaFile token not set") + return { + "Authorization": f"Token {self.token}", + "Accept": "application/json", + } + + def _make_get_request(self, endpoint: str, params: Optional[dict] = None): + """Make authenticated GET request""" + url = f"{self.api_url}/{endpoint.lstrip('/')}" + response = rl_requests.get( + url, + headers=self._get_headers(), + params=params, + timeout=60, + ) + return response + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + """Load and validate SeaFile credentials. + + Args: + credentials: Dictionary containing 'seafile_token' or 'username'/'password' + + Returns: + None + + Raises: + ConnectorMissingCredentialError: If required credentials are missing + """ + logger.debug(f"Loading credentials for SeaFile server {self.seafile_url}") + + token = credentials.get("seafile_token") + username = credentials.get("username") + password = credentials.get("password") + + if token: + self.token = token + elif username and password: + self.token = self._authenticate_with_password(username, password) + else: + raise ConnectorMissingCredentialError( + "SeaFile requires 'seafile_token' or 'username'/'password' credentials" + ) + + # Validate token and get current user info + try: + self._validate_token() + except Exception as e: + raise CredentialExpiredError(f"SeaFile token validation failed: {e}") + + return None + + def _authenticate_with_password(self, username: str, password: str) -> str: + """Authenticate with username/password and return API token""" + try: + response = rl_requests.post( + f"{self.api_url}/auth-token/", + data={"username": username, "password": password}, + timeout=30, + ) + response.raise_for_status() + data = response.json() + token = data.get("token") + if not token: + raise CredentialExpiredError("No token returned from SeaFile") + return token + except Exception as e: + raise ConnectorMissingCredentialError( + f"Failed to authenticate with SeaFile: {e}" + ) + + def _validate_token(self) -> dict: + """Validate token by fetching account info""" + response = self._make_get_request("/account/info/") + response.raise_for_status() + account_info = response.json() + self.current_user_email = account_info.get("email") + logger.info(f"SeaFile authenticated as: {self.current_user_email}") + return account_info + + def validate_connector_settings(self) -> None: + """Validate SeaFile connector settings""" + if self.token is None: + raise ConnectorMissingCredentialError("SeaFile credentials not loaded.") + + if not self.seafile_url: + raise ConnectorValidationError("No SeaFile URL was provided.") + + try: + account_info = self._validate_token() + if not account_info.get("email"): + raise InsufficientPermissionsError("Invalid SeaFile API response") + + # Check if we can list libraries + libraries = self._get_libraries() + logger.info(f"SeaFile connection validated. Found {len(libraries)} libraries.") + + except Exception as e: + status = None + resp = getattr(e, "response", None) + if resp is not None: + status = getattr(resp, "status_code", None) + + if status == 401: + raise CredentialExpiredError("SeaFile token is invalid or expired.") + if status == 403: + raise InsufficientPermissionsError( + "Insufficient permissions to access SeaFile API." + ) + raise ConnectorValidationError(f"SeaFile validation failed: {repr(e)}") + + @retry(tries=3, delay=1, backoff=2) + def _get_libraries(self) -> list[dict]: + """Fetch all accessible libraries (repos)""" + response = self._make_get_request("/repos/") + response.raise_for_status() + libraries = response.json() + + logger.debug(f"Found {len(libraries)} total libraries") + + if not self.include_shared and self.current_user_email: + # Filter to only owned libraries + owned_libraries = [ + lib for lib in libraries + if lib.get("owner") == self.current_user_email + or lib.get("owner_email") == self.current_user_email + ] + logger.debug( + f"Filtered to {len(owned_libraries)} owned libraries " + f"(excluded {len(libraries) - len(owned_libraries)} shared)" + ) + return owned_libraries + + return libraries + + @retry(tries=3, delay=1, backoff=2) + def _get_directory_entries(self, repo_id: str, path: str = "/") -> list[dict]: + """Fetch directory entries for a given path""" + try: + response = self._make_get_request( + f"/repos/{repo_id}/dir/", + params={"p": path}, + ) + response.raise_for_status() + return response.json() + except Exception as e: + logger.warning(f"Error fetching directory {path} in repo {repo_id}: {e}") + return [] + + @retry(tries=3, delay=1, backoff=2) + def _get_file_download_link(self, repo_id: str, path: str) -> Optional[str]: + """Get download link for a file""" + try: + response = self._make_get_request( + f"/repos/{repo_id}/file/", + params={"p": path, "reuse": 1}, + ) + response.raise_for_status() + return response.text.strip('"') + except Exception as e: + logger.warning(f"Error getting download link for {path}: {e}") + return None + + def _list_files_recursive( + self, + repo_id: str, + repo_name: str, + path: str, + start: datetime, + end: datetime, + ) -> list[tuple[str, dict, dict]]: + """Recursively list all files in the given path within time range. + + Returns: + List of tuples: (file_path, file_entry, library_info) + """ + files = [] + entries = self._get_directory_entries(repo_id, path) + + for entry in entries: + entry_type = entry.get("type") + entry_name = entry.get("name", "") + entry_path = f"{path.rstrip('/')}/{entry_name}" + + if entry_type == "dir": + # Recursively process subdirectories + files.extend( + self._list_files_recursive(repo_id, repo_name, entry_path, start, end) + ) + elif entry_type == "file": + # Check modification time + mtime = entry.get("mtime", 0) + if mtime: + modified = datetime.fromtimestamp(mtime, tz=timezone.utc) + if start < modified <= end: + files.append((entry_path, entry, {"id": repo_id, "name": repo_name})) + + return files + + def _yield_seafile_documents( + self, + start: datetime, + end: datetime, + ) -> GenerateDocumentsOutput: + """Generate documents from SeaFile server. + + Args: + start: Start datetime for filtering + end: End datetime for filtering + + Yields: + Batches of documents + """ + logger.info(f"Searching for files between {start} and {end}") + + libraries = self._get_libraries() + logger.info(f"Processing {len(libraries)} libraries") + + all_files = [] + for lib in libraries: + repo_id = lib.get("id") + repo_name = lib.get("name", "Unknown") + + if not repo_id: + continue + + logger.debug(f"Scanning library: {repo_name}") + try: + files = self._list_files_recursive(repo_id, repo_name, "/", start, end) + all_files.extend(files) + logger.debug(f"Found {len(files)} files in {repo_name}") + except Exception as e: + logger.error(f"Error processing library {repo_name}: {e}") + + logger.info(f"Found {len(all_files)} total files matching time criteria") + + batch: list[Document] = [] + for file_path, file_entry, library in all_files: + file_name = file_entry.get("name", "") + file_size = file_entry.get("size", 0) + file_id = file_entry.get("id", "") + mtime = file_entry.get("mtime", 0) + repo_id = library["id"] + repo_name = library["name"] + + # Skip files that are too large + if file_size > self.size_threshold: + logger.warning( + f"Skipping large file: {file_path} ({file_size} bytes)" + ) + continue + + try: + # Get download link + download_link = self._get_file_download_link(repo_id, file_path) + if not download_link: + logger.warning(f"Could not get download link for {file_path}") + continue + + # Download file content + logger.debug(f"Downloading: {file_path}") + response = rl_requests.get(download_link, timeout=120) + response.raise_for_status() + blob = response.content + + if not blob: + logger.warning(f"Downloaded content is empty for {file_path}") + continue + + # Build semantic identifier + semantic_id = f"{repo_name}{file_path}" + + # Get modification time + modified = datetime.fromtimestamp(mtime, tz=timezone.utc) if mtime else datetime.now(timezone.utc) + + batch.append( + Document( + id=f"seafile:{repo_id}:{file_id}", + blob=blob, + source=DocumentSource.SEAFILE, + semantic_identifier=semantic_id, + extension=get_file_ext(file_name), + doc_updated_at=modified, + size_bytes=len(blob), + ) + ) + + if len(batch) >= self.batch_size: + yield batch + batch = [] + + except Exception as e: + logger.error(f"Error downloading file {file_path}: {e}") + + if batch: + yield batch + + def load_from_state(self) -> GenerateDocumentsOutput: + """Load all documents from SeaFile server. + + Yields: + Batches of documents + """ + logger.info(f"Loading all documents from SeaFile server {self.seafile_url}") + return self._yield_seafile_documents( + start=datetime(1970, 1, 1, tzinfo=timezone.utc), + end=datetime.now(timezone.utc), + ) + + def poll_source( + self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch + ) -> GenerateDocumentsOutput: + """Poll SeaFile server for updated documents. + + Args: + start: Start timestamp (seconds since Unix epoch) + end: End timestamp (seconds since Unix epoch) + + Yields: + Batches of documents + """ + start_datetime = datetime.fromtimestamp(start, tz=timezone.utc) + end_datetime = datetime.fromtimestamp(end, tz=timezone.utc) + + logger.info(f"Polling SeaFile for updates from {start_datetime} to {end_datetime}") + + for batch in self._yield_seafile_documents(start_datetime, end_datetime): + yield batch + + diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index b6cce38b2..a9735e3dd 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -53,6 +53,7 @@ from common.data_source import ( AsanaConnector, ImapConnector, ZendeskConnector, + SeaFileConnector, ) from common.constants import FileSource, TaskStatus from common.data_source.config import INDEX_BATCH_SIZE @@ -1178,6 +1179,40 @@ class Bitbucket(SyncBase): return wrapper() +class SeaFile(SyncBase): + SOURCE_NAME: str = FileSource.SEAFILE + + async def _generate(self, task: dict): + self.connector = SeaFileConnector( + seafile_url=self.conf["seafile_url"], + batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE), + include_shared=self.conf.get("include_shared", True) + ) + + self.connector.load_credentials(self.conf["credentials"]) + + # Determine the time range for synchronization based on reindex or poll_range_start + poll_start = task.get("poll_range_start") + + if task["reindex"] == "1" or poll_start is None: + document_generator = self.connector.load_from_state() + begin_info = "totally" + else: + document_generator = self.connector.poll_source( + poll_start.timestamp(), + datetime.now(timezone.utc).timestamp(), + ) + begin_info = f"from {poll_start}" + + logging.info( + "Connect to SeaFile: {} (include_shared: {}) {}".format( + self.conf["seafile_url"], + self.conf.get("include_shared", True), + begin_info + ) + ) + return document_generator + func_factory = { FileSource.S3: S3, FileSource.R2: R2, @@ -1203,6 +1238,7 @@ func_factory = { FileSource.GITHUB: Github, FileSource.GITLAB: Gitlab, FileSource.BITBUCKET: Bitbucket, + FileSource.SEAFILE: SeaFile, } diff --git a/web/src/assets/svg/data-source/seafile.svg b/web/src/assets/svg/data-source/seafile.svg new file mode 100644 index 000000000..8fd594ce8 --- /dev/null +++ b/web/src/assets/svg/data-source/seafile.svg @@ -0,0 +1 @@ +Seafile \ No newline at end of file diff --git a/web/src/locales/de.ts b/web/src/locales/de.ts index fff1ffe5b..196615d82 100644 --- a/web/src/locales/de.ts +++ b/web/src/locales/de.ts @@ -1026,6 +1026,16 @@ Beispiel: Virtual Hosted Style`, 'Die Basis-URL Ihrer Moodle-Instanz (z.B. https://moodle.university.edu). Fügen Sie nicht /webservice oder /login hinzu.', moodleTokenTip: 'Generieren Sie ein Web-Service-Token in Moodle: Gehen Sie zu Website-Administration → Server → Web-Services → Token verwalten. Der Benutzer muss in den Kursen eingeschrieben sein, die Sie synchronisieren möchten.', + seafileDescription: + 'Verbinden Sie sich mit Ihrem SeaFile-Server, um Dateien und Dokumente aus Ihren Bibliotheken zu synchronisieren.', + seafileUrlTip: + 'Die Basis-URL Ihres SeaFile-Servers (z.B. https://seafile.example.com). Fügen Sie kein /api2 oder andere Pfade hinzu.', + seafileTokenTip: + 'Erstellen Sie ein API-Token in SeaFile: Gehen Sie zu Einstellungen → API-Token → Token erstellen. Das Token ermöglicht den Zugriff auf alle für Ihr Konto sichtbaren Bibliotheken.', + seafileIncludeSharedTip: + 'Wenn aktiviert, werden auch Bibliotheken synchronisiert, die andere Benutzer mit Ihnen geteilt haben.', + seafileBatchSizeTip: + 'Anzahl der Dokumente, die pro Batch verarbeitet werden. Höhere Werte können die Leistung verbessern, benötigen aber mehr Arbeitsspeicher. Standard: 100.', jiraDescription: 'Verbinden Sie Ihren Jira-Arbeitsbereich, um Vorgänge, Kommentare und Anhänge zu synchronisieren.', jiraBaseUrlTip: diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index a7749c00e..205e7dc52 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -998,6 +998,16 @@ Example: Virtual Hosted Style`, 'The base URL of your Moodle instance (e.g., https://moodle.university.edu). Do not include /webservice or /login.', moodleTokenTip: 'Generate a web service token in Moodle: Go to Site administration → Server → Web services → Manage tokens. The user must be enrolled in the courses you want to sync.', + seafileDescription: + 'Connect to your SeaFile server to sync files and documents from your libraries.', + seafileUrlTip: + 'The base URL of your SeaFile server (e.g., https://seafile.example.com). Do not include /api2 or other paths.', + seafileTokenTip: + 'Generate an API token in SeaFile: Go to Settings → API Token → Generate Token. The token provides access to all libraries visible to your account.', + seafileIncludeSharedTip: + 'When enabled, libraries shared with you by other users will also be synced.', + seafileBatchSizeTip: + 'Number of documents to process per batch. Higher values may improve performance but use more memory. Default: 100.', jiraDescription: 'Connect your Jira workspace to sync issues, comments, and attachments.', jiraBaseUrlTip: diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 199443178..ade53c36e 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -35,6 +35,7 @@ export enum DataSourceKey { GITHUB = 'github', BITBUCKET = 'bitbucket', ZENDESK = 'zendesk', + SEAFILE = 'seafile', // SHAREPOINT = 'sharepoint', // SLACK = 'slack', // TEAMS = 'teams', @@ -155,6 +156,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.ZENDESK}Description`), icon: , }, + [DataSourceKey.SEAFILE]: { + name: 'SeaFile', + description: t(`setting.${DataSourceKey.SEAFILE}Description`), + icon: , + }, }; }; @@ -815,6 +821,39 @@ export const DataSourceFormFields = { ], }, ], + [DataSourceKey.SEAFILE]: [ + { + label: 'SeaFile Server URL', + name: 'config.seafile_url', + type: FormFieldType.Text, + required: true, + placeholder: 'https://seafile.example.com', + tooltip: t('setting.seafileUrlTip'), + }, + { + label: 'API Token', + name: 'config.credentials.seafile_token', + type: FormFieldType.Password, + required: true, + tooltip: t('setting.seafileTokenTip'), + }, + { + label: 'Include Shared Libraries', + name: 'config.include_shared', + type: FormFieldType.Checkbox, + required: false, + defaultValue: true, + tooltip: t('setting.seafileIncludeSharedTip'), + }, + { + label: 'Batch Size', + name: 'config.batch_size', + type: FormFieldType.Number, + required: false, + placeholder: '100', + tooltip: t('setting.seafileBatchSizeTip'), + }, + ], }; export const DataSourceFormDefaultValues = { @@ -1096,4 +1135,17 @@ export const DataSourceFormDefaultValues = { }, }, }, + + [DataSourceKey.SEAFILE]: { + name: '', + source: DataSourceKey.SEAFILE, + config: { + seafile_url: '', + include_shared: true, + batch_size: 100, + credentials: { + seafile_token: '', + }, + }, + }, };