### What problem does this PR solve?

This PR adds **Seafile** as a new data source connector for RAGFlow. [Seafile](https://www.seafile.com/) is an open-source, self-hosted file sync and share platform widely used by enterprises, universities, and organizations that require data sovereignty and privacy. Users who store documents in Seafile currently have no way to index and search that content through RAGFlow.

This connector enables RAGFlow users to:

- Connect to self-hosted Seafile servers via API token or username/password
- Index documents from personal and shared libraries
- Poll incrementally for updated files
- Integrate Seafile-stored documents into their RAG pipelines

A minimal usage sketch follows this description.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

### Changes included

- `SeaFileConnector` implementing the `LoadConnector` and `PollConnector` interfaces
- Support for API token and username/password authentication
- Recursive file traversal across libraries
- Time-based filtering for incremental updates
- Seafile logo (sourced from Simple Icons, CC0)
- Connector configuration and registration

### Testing

- Tested against self-hosted Seafile Community Edition
- Verified authentication (token)
- Verified document ingestion from personal and shared libraries
- Verified incremental polling with time filters
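For reviewers, here is a minimal driving sketch. The class and its methods (`SeaFileConnector`, `load_credentials`, `validate_connector_settings`, `load_from_state`, `poll_source`) come from the file below; the import path, server URL, and token are illustrative assumptions, not part of the change:

```python
import time

# Hypothetical import path, shown for illustration only.
from common.data_source.seafile_connector import SeaFileConnector

connector = SeaFileConnector(seafile_url="https://seafile.example.com", include_shared=True)
connector.load_credentials({"seafile_token": "<api-token>"})  # or {"username": ..., "password": ...}
connector.validate_connector_settings()

# Full load: every accessible file, yielded in batches.
for batch in connector.load_from_state():
    for doc in batch:
        print(doc.id, doc.semantic_identifier, doc.size_bytes)

# Incremental poll: only files modified in the last hour.
now = time.time()
for batch in connector.poll_source(start=now - 3600, end=now):
    for doc in batch:
        print("updated:", doc.semantic_identifier)
```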
391 lines · 14 KiB · Python
"""SeaFile connector"""
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
from retry import retry
|
|
|
|
from common.data_source.utils import (
|
|
get_file_ext,
|
|
rl_requests,
|
|
)
|
|
from common.data_source.config import (
|
|
DocumentSource,
|
|
INDEX_BATCH_SIZE,
|
|
BLOB_STORAGE_SIZE_THRESHOLD,
|
|
)
|
|
from common.data_source.exceptions import (
|
|
ConnectorMissingCredentialError,
|
|
ConnectorValidationError,
|
|
CredentialExpiredError,
|
|
InsufficientPermissionsError,
|
|
)
|
|
from common.data_source.interfaces import LoadConnector, PollConnector
|
|
from common.data_source.models import (
|
|
Document,
|
|
SecondsSinceUnixEpoch,
|
|
GenerateDocumentsOutput,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SeaFileConnector(LoadConnector, PollConnector):
    """SeaFile connector for syncing files from SeaFile servers"""

    def __init__(
        self,
        seafile_url: str,
        batch_size: int = INDEX_BATCH_SIZE,
        include_shared: bool = True,
    ) -> None:
        """Initialize SeaFile connector.

        Args:
            seafile_url: Base URL of the SeaFile server (e.g., https://seafile.example.com)
            batch_size: Number of documents to yield per batch
            include_shared: Whether to include shared libraries
        """
        self.seafile_url = seafile_url.rstrip("/")
        self.api_url = f"{self.seafile_url}/api2"
        self.batch_size = batch_size
        self.include_shared = include_shared
        self.token: Optional[str] = None
        self.current_user_email: Optional[str] = None
        self.size_threshold: int = BLOB_STORAGE_SIZE_THRESHOLD

    def _get_headers(self) -> dict[str, str]:
        """Get authorization headers for API requests"""
        if not self.token:
            raise ConnectorMissingCredentialError("SeaFile token not set")
        return {
            "Authorization": f"Token {self.token}",
            "Accept": "application/json",
        }

    def _make_get_request(self, endpoint: str, params: Optional[dict] = None):
        """Make authenticated GET request"""
        url = f"{self.api_url}/{endpoint.lstrip('/')}"
        response = rl_requests.get(
            url,
            headers=self._get_headers(),
            params=params,
            timeout=60,
        )
        return response

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Load and validate SeaFile credentials.

        Args:
            credentials: Dictionary containing 'seafile_token' or 'username'/'password'

        Returns:
            None

        Raises:
            ConnectorMissingCredentialError: If required credentials are missing
        """
        logger.debug(f"Loading credentials for SeaFile server {self.seafile_url}")

        token = credentials.get("seafile_token")
        username = credentials.get("username")
        password = credentials.get("password")

        if token:
            self.token = token
        elif username and password:
            self.token = self._authenticate_with_password(username, password)
        else:
            raise ConnectorMissingCredentialError(
                "SeaFile requires 'seafile_token' or 'username'/'password' credentials"
            )

        # Validate the token and cache the current user's email
        try:
            self._validate_token()
        except Exception as e:
            raise CredentialExpiredError(f"SeaFile token validation failed: {e}") from e

        return None

    def _authenticate_with_password(self, username: str, password: str) -> str:
        """Authenticate with username/password and return API token"""
        try:
            response = rl_requests.post(
                f"{self.api_url}/auth-token/",
                data={"username": username, "password": password},
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            raise ConnectorMissingCredentialError(
                f"Failed to authenticate with SeaFile: {e}"
            ) from e

        token = data.get("token")
        if not token:
            # A 200 response without a token means the server rejected the login;
            # raise this outside the try block so it isn't re-wrapped above.
            raise CredentialExpiredError("No token returned from SeaFile")
        return token

    def _validate_token(self) -> dict:
        """Validate token by fetching account info"""
        response = self._make_get_request("/account/info/")
        response.raise_for_status()
        account_info = response.json()
        self.current_user_email = account_info.get("email")
        logger.info(f"SeaFile authenticated as: {self.current_user_email}")
        return account_info

    def validate_connector_settings(self) -> None:
        """Validate SeaFile connector settings"""
        if self.token is None:
            raise ConnectorMissingCredentialError("SeaFile credentials not loaded.")

        if not self.seafile_url:
            raise ConnectorValidationError("No SeaFile URL was provided.")

        try:
            account_info = self._validate_token()
            if not account_info.get("email"):
                raise InsufficientPermissionsError("Invalid SeaFile API response")

            # Check if we can list libraries
            libraries = self._get_libraries()
            logger.info(f"SeaFile connection validated. Found {len(libraries)} libraries.")

        except (CredentialExpiredError, InsufficientPermissionsError):
            # Don't re-wrap errors we raised deliberately
            raise
        except Exception as e:
            # Map HTTP auth errors to the connector's credential/permission exceptions
            status = None
            resp = getattr(e, "response", None)
            if resp is not None:
                status = getattr(resp, "status_code", None)

            if status == 401:
                raise CredentialExpiredError("SeaFile token is invalid or expired.") from e
            if status == 403:
                raise InsufficientPermissionsError(
                    "Insufficient permissions to access SeaFile API."
                ) from e
            raise ConnectorValidationError(f"SeaFile validation failed: {repr(e)}") from e

    @retry(tries=3, delay=1, backoff=2)
    def _get_libraries(self) -> list[dict]:
        """Fetch all accessible libraries (repos)"""
        response = self._make_get_request("/repos/")
        response.raise_for_status()
        libraries = response.json()

        logger.debug(f"Found {len(libraries)} total libraries")

        if not self.include_shared and self.current_user_email:
            # Filter to only owned libraries
            owned_libraries = [
                lib
                for lib in libraries
                if lib.get("owner") == self.current_user_email
                or lib.get("owner_email") == self.current_user_email
            ]
            logger.debug(
                f"Filtered to {len(owned_libraries)} owned libraries "
                f"(excluded {len(libraries) - len(owned_libraries)} shared)"
            )
            return owned_libraries

        return libraries

    @retry(tries=3, delay=1, backoff=2)
    def _get_directory_entries(self, repo_id: str, path: str = "/") -> list[dict]:
        """Fetch directory entries for a given path"""
        try:
            response = self._make_get_request(
                f"/repos/{repo_id}/dir/",
                params={"p": path},
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.warning(f"Error fetching directory {path} in repo {repo_id}: {e}")
            return []

    @retry(tries=3, delay=1, backoff=2)
    def _get_file_download_link(self, repo_id: str, path: str) -> Optional[str]:
        """Get download link for a file"""
        try:
            # reuse=1 asks the server for a link that stays valid for more than one request
            response = self._make_get_request(
                f"/repos/{repo_id}/file/",
                params={"p": path, "reuse": 1},
            )
            response.raise_for_status()
            # The API returns the link as a JSON-encoded string, so strip the quotes
            return response.text.strip('"')
        except Exception as e:
            logger.warning(f"Error getting download link for {path}: {e}")
            return None

    def _list_files_recursive(
        self,
        repo_id: str,
        repo_name: str,
        path: str,
        start: datetime,
        end: datetime,
    ) -> list[tuple[str, dict, dict]]:
        """Recursively list all files in the given path within time range.

        Returns:
            List of tuples: (file_path, file_entry, library_info)
        """
        files = []
        entries = self._get_directory_entries(repo_id, path)

        for entry in entries:
            entry_type = entry.get("type")
            entry_name = entry.get("name", "")
            entry_path = f"{path.rstrip('/')}/{entry_name}"

            if entry_type == "dir":
                # Recursively process subdirectories
                files.extend(
                    self._list_files_recursive(repo_id, repo_name, entry_path, start, end)
                )
            elif entry_type == "file":
                # Keep only files modified within (start, end]; entries without
                # an mtime are skipped
                mtime = entry.get("mtime", 0)
                if mtime:
                    modified = datetime.fromtimestamp(mtime, tz=timezone.utc)
                    if start < modified <= end:
                        files.append((entry_path, entry, {"id": repo_id, "name": repo_name}))

        return files

    def _yield_seafile_documents(
        self,
        start: datetime,
        end: datetime,
    ) -> GenerateDocumentsOutput:
        """Generate documents from SeaFile server.

        Args:
            start: Start datetime for filtering
            end: End datetime for filtering

        Yields:
            Batches of documents
        """
        logger.info(f"Searching for files between {start} and {end}")

        libraries = self._get_libraries()
        logger.info(f"Processing {len(libraries)} libraries")

        all_files = []
        for lib in libraries:
            repo_id = lib.get("id")
            repo_name = lib.get("name", "Unknown")

            if not repo_id:
                continue

            logger.debug(f"Scanning library: {repo_name}")
            try:
                files = self._list_files_recursive(repo_id, repo_name, "/", start, end)
                all_files.extend(files)
                logger.debug(f"Found {len(files)} files in {repo_name}")
            except Exception as e:
                logger.error(f"Error processing library {repo_name}: {e}")

        logger.info(f"Found {len(all_files)} total files matching time criteria")

        batch: list[Document] = []
        for file_path, file_entry, library in all_files:
            file_name = file_entry.get("name", "")
            file_size = file_entry.get("size", 0)
            file_id = file_entry.get("id", "")
            mtime = file_entry.get("mtime", 0)
            repo_id = library["id"]
            repo_name = library["name"]

            # Skip files that are too large
            if file_size > self.size_threshold:
                logger.warning(f"Skipping large file: {file_path} ({file_size} bytes)")
                continue

            try:
                # Get download link
                download_link = self._get_file_download_link(repo_id, file_path)
                if not download_link:
                    logger.warning(f"Could not get download link for {file_path}")
                    continue

                # Download file content
                logger.debug(f"Downloading: {file_path}")
                response = rl_requests.get(download_link, timeout=120)
                response.raise_for_status()
                blob = response.content

                if not blob:
                    logger.warning(f"Downloaded content is empty for {file_path}")
                    continue

                # Build a semantic identifier such as "LibraryName/path/to/file"
                semantic_id = f"{repo_name}{file_path}"

                # Use the entry's modification time, falling back to now if absent
                modified = datetime.fromtimestamp(mtime, tz=timezone.utc) if mtime else datetime.now(timezone.utc)

                batch.append(
                    Document(
                        id=f"seafile:{repo_id}:{file_id}",
                        blob=blob,
                        source=DocumentSource.SEAFILE,
                        semantic_identifier=semantic_id,
                        extension=get_file_ext(file_name),
                        doc_updated_at=modified,
                        size_bytes=len(blob),
                    )
                )

                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []

            except Exception as e:
                logger.error(f"Error downloading file {file_path}: {e}")

        if batch:
            yield batch

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Load all documents from SeaFile server.

        Yields:
            Batches of documents
        """
        logger.info(f"Loading all documents from SeaFile server {self.seafile_url}")
        return self._yield_seafile_documents(
            start=datetime(1970, 1, 1, tzinfo=timezone.utc),
            end=datetime.now(timezone.utc),
        )

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Poll SeaFile server for updated documents.

        Args:
            start: Start timestamp (seconds since Unix epoch)
            end: End timestamp (seconds since Unix epoch)

        Yields:
            Batches of documents
        """
        start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)

        logger.info(f"Polling SeaFile for updates from {start_datetime} to {end_datetime}")

        yield from self._yield_seafile_documents(start_datetime, end_datetime)