diff --git a/common/constants.py b/common/constants.py index d99c09952..4aea764b2 100644 --- a/common/constants.py +++ b/common/constants.py @@ -133,6 +133,7 @@ class FileSource(StrEnum): GITHUB = "github" GITLAB = "gitlab" IMAP = "imap" + BITBUCKET = "bitbucket" ZENDESK = "zendesk" class PipelineTaskType(StrEnum): diff --git a/common/data_source/bitbucket/__init__.py b/common/data_source/bitbucket/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/common/data_source/bitbucket/connector.py b/common/data_source/bitbucket/connector.py new file mode 100644 index 000000000..f355a8945 --- /dev/null +++ b/common/data_source/bitbucket/connector.py @@ -0,0 +1,388 @@ +from __future__ import annotations + +import copy +from collections.abc import Callable +from collections.abc import Iterator +from datetime import datetime +from datetime import timezone +from typing import Any +from typing import TYPE_CHECKING + +from typing_extensions import override + +from common.data_source.config import INDEX_BATCH_SIZE +from common.data_source.config import DocumentSource +from common.data_source.config import REQUEST_TIMEOUT_SECONDS +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + CredentialExpiredError, + InsufficientPermissionsError, + UnexpectedValidationError, +) +from common.data_source.interfaces import CheckpointedConnector +from common.data_source.interfaces import CheckpointOutput +from common.data_source.interfaces import IndexingHeartbeatInterface +from common.data_source.interfaces import SecondsSinceUnixEpoch +from common.data_source.interfaces import SlimConnectorWithPermSync +from common.data_source.models import ConnectorCheckpoint +from common.data_source.models import ConnectorFailure +from common.data_source.models import DocumentFailure +from common.data_source.models import SlimDocument +from common.data_source.bitbucket.utils import ( + build_auth_client, + list_repositories, + map_pr_to_document, + paginate, + PR_LIST_RESPONSE_FIELDS, + SLIM_PR_LIST_RESPONSE_FIELDS, +) + +if TYPE_CHECKING: + import httpx + + +class BitbucketConnectorCheckpoint(ConnectorCheckpoint): + """Checkpoint state for resumable Bitbucket PR indexing. + + Fields: + repos_queue: Materialized list of repository slugs to process. + current_repo_index: Index of the repository currently being processed. + next_url: Bitbucket "next" URL for continuing pagination within the current repo. + """ + + repos_queue: list[str] = [] + current_repo_index: int = 0 + next_url: str | None = None + + +class BitbucketConnector( + CheckpointedConnector[BitbucketConnectorCheckpoint], + SlimConnectorWithPermSync, +): + """Connector for indexing Bitbucket Cloud pull requests. + + Args: + workspace: Bitbucket workspace ID. + repositories: Comma-separated list of repository slugs to index. + projects: Comma-separated list of project keys to index all repositories within. + batch_size: Max number of documents to yield per batch. 
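+
+    Example (illustrative sketch; the workspace, slugs, and credential values
+    below are placeholders):
+        connector = BitbucketConnector(workspace="my-workspace", repositories="repo-one,repo-two")
+        connector.load_credentials({"bitbucket_email": "me@example.com", "bitbucket_api_token": "<token>"})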
+ """ + + def __init__( + self, + workspace: str, + repositories: str | None = None, + projects: str | None = None, + batch_size: int = INDEX_BATCH_SIZE, + ) -> None: + self.workspace = workspace + self._repositories = ( + [s.strip() for s in repositories.split(",") if s.strip()] + if repositories + else None + ) + self._projects: list[str] | None = ( + [s.strip() for s in projects.split(",") if s.strip()] if projects else None + ) + self.batch_size = batch_size + self.email: str | None = None + self.api_token: str | None = None + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + """Load API token-based credentials. + + Expects a dict with keys: `bitbucket_email`, `bitbucket_api_token`. + """ + self.email = credentials.get("bitbucket_email") + self.api_token = credentials.get("bitbucket_api_token") + if not self.email or not self.api_token: + raise ConnectorMissingCredentialError("Bitbucket") + return None + + def _client(self) -> httpx.Client: + """Build an authenticated HTTP client or raise if credentials missing.""" + if not self.email or not self.api_token: + raise ConnectorMissingCredentialError("Bitbucket") + return build_auth_client(self.email, self.api_token) + + def _iter_pull_requests_for_repo( + self, + client: httpx.Client, + repo_slug: str, + params: dict[str, Any] | None = None, + start_url: str | None = None, + on_page: Callable[[str | None], None] | None = None, + ) -> Iterator[dict[str, Any]]: + base = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}/{repo_slug}/pullrequests" + yield from paginate( + client, + base, + params, + start_url=start_url, + on_page=on_page, + ) + + def _build_params( + self, + fields: str = PR_LIST_RESPONSE_FIELDS, + start: SecondsSinceUnixEpoch | None = None, + end: SecondsSinceUnixEpoch | None = None, + ) -> dict[str, Any]: + """Build Bitbucket fetch params. + + Always include OPEN, MERGED, and DECLINED PRs. If both ``start`` and + ``end`` are provided, apply a single updated_on time window. + """ + + def _iso(ts: SecondsSinceUnixEpoch) -> str: + return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() + + def _tc_epoch( + lower_epoch: SecondsSinceUnixEpoch | None, + upper_epoch: SecondsSinceUnixEpoch | None, + ) -> str | None: + if lower_epoch is not None and upper_epoch is not None: + lower_iso = _iso(lower_epoch) + upper_iso = _iso(upper_epoch) + return f'(updated_on > "{lower_iso}" AND updated_on <= "{upper_iso}")' + return None + + params: dict[str, Any] = {"fields": fields, "pagelen": 50} + time_clause = _tc_epoch(start, end) + q = '(state = "OPEN" OR state = "MERGED" OR state = "DECLINED")' + if time_clause: + q = f"{q} AND {time_clause}" + params["q"] = q + return params + + def _iter_target_repositories(self, client: httpx.Client) -> Iterator[str]: + """Yield repository slugs based on configuration. 
+
+        Priority:
+        - repositories list
+        - projects list (list repos by project key)
+        - workspace (all repos)
+        """
+        if self._repositories:
+            for slug in self._repositories:
+                yield slug
+            return
+        if self._projects:
+            for project_key in self._projects:
+                for repo in list_repositories(client, self.workspace, project_key):
+                    slug_val = repo.get("slug")
+                    if isinstance(slug_val, str) and slug_val:
+                        yield slug_val
+            return
+        for repo in list_repositories(client, self.workspace, None):
+            slug_val = repo.get("slug")
+            if isinstance(slug_val, str) and slug_val:
+                yield slug_val
+
+    @override
+    def load_from_checkpoint(
+        self,
+        start: SecondsSinceUnixEpoch,
+        end: SecondsSinceUnixEpoch,
+        checkpoint: BitbucketConnectorCheckpoint,
+    ) -> CheckpointOutput[BitbucketConnectorCheckpoint]:
+        """Resumable PR ingestion across repos and pages within a time window.
+
+        Yields Documents (or ConnectorFailure for per-PR mapping failures) and returns
+        an updated checkpoint that records repo position and next page URL.
+        """
+        new_checkpoint = copy.deepcopy(checkpoint)
+
+        with self._client() as client:
+            # Materialize target repositories once
+            if not new_checkpoint.repos_queue:
+                # Deduplicate and sort so the processing order is deterministic
+                repos_list = list(self._iter_target_repositories(client))
+                new_checkpoint.repos_queue = sorted(set(repos_list))
+                new_checkpoint.current_repo_index = 0
+                new_checkpoint.next_url = None
+
+            repos = new_checkpoint.repos_queue
+            if not repos or new_checkpoint.current_repo_index >= len(repos):
+                new_checkpoint.has_more = False
+                return new_checkpoint
+
+            repo_slug = repos[new_checkpoint.current_repo_index]
+
+            first_page_params = self._build_params(
+                fields=PR_LIST_RESPONSE_FIELDS,
+                start=start,
+                end=end,
+            )
+
+            def _on_page(next_url: str | None) -> None:
+                new_checkpoint.next_url = next_url
+
+            for pr in self._iter_pull_requests_for_repo(
+                client,
+                repo_slug,
+                params=first_page_params,
+                start_url=new_checkpoint.next_url,
+                on_page=_on_page,
+            ):
+                try:
+                    document = map_pr_to_document(pr, self.workspace, repo_slug)
+                    yield document
+                except Exception as e:
+                    pr_id = pr.get("id")
+                    pr_link = (
+                        f"https://bitbucket.org/{self.workspace}/{repo_slug}/pull-requests/{pr_id}"
+                        if pr_id is not None
+                        else None
+                    )
+                    yield ConnectorFailure(
+                        failed_document=DocumentFailure(
+                            document_id=(
+                                f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{repo_slug}:pr:{pr_id}"
+                                if pr_id is not None
+                                else f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{repo_slug}:pr:unknown"
+                            ),
+                            document_link=pr_link,
+                        ),
+                        failure_message=f"Failed to process Bitbucket PR: {e}",
+                        exception=e,
+                    )
+
+            # Advance to next repository (if any) and set has_more accordingly
+            new_checkpoint.current_repo_index += 1
+            new_checkpoint.next_url = None
+            new_checkpoint.has_more = new_checkpoint.current_repo_index < len(repos)
+
+            return new_checkpoint
+
+    @override
+    def build_dummy_checkpoint(self) -> BitbucketConnectorCheckpoint:
+        """Create an initial checkpoint with work remaining."""
+        return BitbucketConnectorCheckpoint(has_more=True)
+
+    @override
+    def validate_checkpoint_json(
+        self, checkpoint_json: str
+    ) -> BitbucketConnectorCheckpoint:
+        """Validate and deserialize a checkpoint instance from JSON."""
+        return BitbucketConnectorCheckpoint.model_validate_json(checkpoint_json)
+
+    def retrieve_all_slim_docs_perm_sync(
+        self,
+        start: SecondsSinceUnixEpoch | None = None,
+        end: SecondsSinceUnixEpoch | None = None,
+        callback: IndexingHeartbeatInterface | None = None,
+    ) -> 
Iterator[list[SlimDocument]]: + """Return only document IDs for all existing pull requests.""" + batch: list[SlimDocument] = [] + params = self._build_params( + fields=SLIM_PR_LIST_RESPONSE_FIELDS, + start=start, + end=end, + ) + with self._client() as client: + for slug in self._iter_target_repositories(client): + for pr in self._iter_pull_requests_for_repo( + client, slug, params=params + ): + pr_id = pr["id"] + doc_id = f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{slug}:pr:{pr_id}" + batch.append(SlimDocument(id=doc_id)) + if len(batch) >= self.batch_size: + yield batch + batch = [] + if callback: + if callback.should_stop(): + # Note: this is not actually used for permission sync yet, just pruning + raise RuntimeError( + "bitbucket_pr_sync: Stop signal detected" + ) + callback.progress("bitbucket_pr_sync", len(batch)) + if batch: + yield batch + + def validate_connector_settings(self) -> None: + """Validate Bitbucket credentials and workspace access by probing a lightweight endpoint. + + Raises: + CredentialExpiredError: on HTTP 401 + InsufficientPermissionsError: on HTTP 403 + UnexpectedValidationError: on any other failure + """ + try: + with self._client() as client: + url = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}" + resp = client.get( + url, + params={"pagelen": 1, "fields": "pagelen"}, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + if resp.status_code == 401: + raise CredentialExpiredError( + "Invalid or expired Bitbucket credentials (HTTP 401)." + ) + if resp.status_code == 403: + raise InsufficientPermissionsError( + "Insufficient permissions to access Bitbucket workspace (HTTP 403)." + ) + if resp.status_code < 200 or resp.status_code >= 300: + raise UnexpectedValidationError( + f"Unexpected Bitbucket error (status={resp.status_code})." 
+ ) + except Exception as e: + # Network or other unexpected errors + if isinstance( + e, + ( + CredentialExpiredError, + InsufficientPermissionsError, + UnexpectedValidationError, + ConnectorMissingCredentialError, + ), + ): + raise + raise UnexpectedValidationError( + f"Unexpected error while validating Bitbucket settings: {e}" + ) + +if __name__ == "__main__": + bitbucket = BitbucketConnector( + workspace="" + ) + + bitbucket.load_credentials({ + "bitbucket_email": "", + "bitbucket_api_token": "", + }) + + bitbucket.validate_connector_settings() + print("Credentials validated successfully.") + + start_time = datetime.fromtimestamp(0, tz=timezone.utc) + end_time = datetime.now(timezone.utc) + + for doc_batch in bitbucket.retrieve_all_slim_docs_perm_sync( + start=start_time.timestamp(), + end=end_time.timestamp(), + ): + for doc in doc_batch: + print(doc) + + + bitbucket_checkpoint = bitbucket.build_dummy_checkpoint() + + while bitbucket_checkpoint.has_more: + gen = bitbucket.load_from_checkpoint( + start=start_time.timestamp(), + end=end_time.timestamp(), + checkpoint=bitbucket_checkpoint, + ) + + while True: + try: + doc = next(gen) + print(doc) + except StopIteration as e: + bitbucket_checkpoint = e.value + break + \ No newline at end of file diff --git a/common/data_source/bitbucket/utils.py b/common/data_source/bitbucket/utils.py new file mode 100644 index 000000000..4667a9600 --- /dev/null +++ b/common/data_source/bitbucket/utils.py @@ -0,0 +1,288 @@ +from __future__ import annotations + +import time +from collections.abc import Callable +from collections.abc import Iterator +from datetime import datetime +from datetime import timezone +from typing import Any + +import httpx + +from common.data_source.config import REQUEST_TIMEOUT_SECONDS, DocumentSource +from common.data_source.cross_connector_utils.rate_limit_wrapper import ( + rate_limit_builder, +) +from common.data_source.utils import sanitize_filename +from common.data_source.models import BasicExpertInfo, Document +from common.data_source.cross_connector_utils.retry_wrapper import retry_builder + +# Fields requested from Bitbucket PR list endpoint to ensure rich PR data +PR_LIST_RESPONSE_FIELDS: str = ",".join( + [ + "next", + "page", + "pagelen", + "values.author", + "values.close_source_branch", + "values.closed_by", + "values.comment_count", + "values.created_on", + "values.description", + "values.destination", + "values.draft", + "values.id", + "values.links", + "values.merge_commit", + "values.participants", + "values.reason", + "values.rendered", + "values.reviewers", + "values.source", + "values.state", + "values.summary", + "values.task_count", + "values.title", + "values.type", + "values.updated_on", + ] +) + +# Minimal fields for slim retrieval (IDs only) +SLIM_PR_LIST_RESPONSE_FIELDS: str = ",".join( + [ + "next", + "page", + "pagelen", + "values.id", + ] +) + + +# Minimal fields for repository list calls +REPO_LIST_RESPONSE_FIELDS: str = ",".join( + [ + "next", + "page", + "pagelen", + "values.slug", + "values.full_name", + "values.project.key", + ] +) + + +class BitbucketRetriableError(Exception): + """Raised for retriable Bitbucket conditions (429, 5xx).""" + + +class BitbucketNonRetriableError(Exception): + """Raised for non-retriable Bitbucket client errors (4xx except 429).""" + + +@retry_builder( + tries=6, + delay=1, + backoff=2, + max_delay=30, + exceptions=(BitbucketRetriableError, httpx.RequestError), +) +@rate_limit_builder(max_calls=60, period=60) +def bitbucket_get( + client: httpx.Client, url: str, 
params: dict[str, Any] | None = None +) -> httpx.Response: + """Perform a GET against Bitbucket with retry and rate limiting. + + Retries on 429 and 5xx responses, and on transport errors. Honors + `Retry-After` header for 429 when present by sleeping before retrying. + """ + try: + response = client.get(url, params=params, timeout=REQUEST_TIMEOUT_SECONDS) + except httpx.RequestError: + # Allow retry_builder to handle retries of transport errors + raise + + try: + response.raise_for_status() + except httpx.HTTPStatusError as e: + status = e.response.status_code if e.response is not None else None + if status == 429: + retry_after = e.response.headers.get("Retry-After") if e.response else None + if retry_after is not None: + try: + time.sleep(int(retry_after)) + except (TypeError, ValueError): + pass + raise BitbucketRetriableError("Bitbucket rate limit exceeded (429)") from e + if status is not None and 500 <= status < 600: + raise BitbucketRetriableError(f"Bitbucket server error: {status}") from e + if status is not None and 400 <= status < 500: + raise BitbucketNonRetriableError(f"Bitbucket client error: {status}") from e + # Unknown status, propagate + raise + + return response + + +def build_auth_client(email: str, api_token: str) -> httpx.Client: + """Create an authenticated httpx client for Bitbucket Cloud API.""" + return httpx.Client(auth=(email, api_token), http2=True) + + +def paginate( + client: httpx.Client, + url: str, + params: dict[str, Any] | None = None, + start_url: str | None = None, + on_page: Callable[[str | None], None] | None = None, +) -> Iterator[dict[str, Any]]: + """Iterate over paginated Bitbucket API responses yielding individual values. + + Args: + client: Authenticated HTTP client. + url: Base collection URL (first page when start_url is None). + params: Query params for the first page. + start_url: If provided, start from this absolute URL (ignores params). + on_page: Optional callback invoked after each page with the next page URL. 
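+
+    Example (sketch; assumes valid credentials and a reachable workspace):
+        client = build_auth_client("me@example.com", "api-token")
+        url = "https://api.bitbucket.org/2.0/repositories/my-workspace"
+        for repo in paginate(client, url, {"pagelen": 100}):
+            print(repo.get("slug"))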
+ """ + next_url = start_url or url + # If resuming from a next URL, do not pass params again + query = params.copy() if params else None + query = None if start_url else query + while next_url: + resp = bitbucket_get(client, next_url, params=query) + data = resp.json() + values = data.get("values", []) + for item in values: + yield item + next_url = data.get("next") + if on_page is not None: + on_page(next_url) + # only include params on first call, next_url will contain all necessary params + query = None + + +def list_repositories( + client: httpx.Client, workspace: str, project_key: str | None = None +) -> Iterator[dict[str, Any]]: + """List repositories in a workspace, optionally filtered by project key.""" + base_url = f"https://api.bitbucket.org/2.0/repositories/{workspace}" + params: dict[str, Any] = { + "fields": REPO_LIST_RESPONSE_FIELDS, + "pagelen": 100, + # Ensure deterministic ordering + "sort": "full_name", + } + if project_key: + params["q"] = f'project.key="{project_key}"' + yield from paginate(client, base_url, params) + + +def map_pr_to_document(pr: dict[str, Any], workspace: str, repo_slug: str) -> Document: + """Map a Bitbucket pull request JSON to Onyx Document.""" + pr_id = pr["id"] + title = pr.get("title") or f"PR {pr_id}" + description = pr.get("description") or "" + state = pr.get("state") + draft = pr.get("draft", False) + author = pr.get("author", {}) + reviewers = pr.get("reviewers", []) + participants = pr.get("participants", []) + + link = pr.get("links", {}).get("html", {}).get("href") or ( + f"https://bitbucket.org/{workspace}/{repo_slug}/pull-requests/{pr_id}" + ) + + created_on = pr.get("created_on") + updated_on = pr.get("updated_on") + updated_dt = ( + datetime.fromisoformat(updated_on.replace("Z", "+00:00")).astimezone( + timezone.utc + ) + if isinstance(updated_on, str) + else None + ) + + source_branch = pr.get("source", {}).get("branch", {}).get("name", "") + destination_branch = pr.get("destination", {}).get("branch", {}).get("name", "") + + approved_by = [ + _get_user_name(p.get("user", {})) for p in participants if p.get("approved") + ] + + primary_owner = None + if author: + primary_owner = BasicExpertInfo( + display_name=_get_user_name(author), + ) + + # secondary_owners = [ + # BasicExpertInfo(display_name=_get_user_name(r)) for r in reviewers + # ] or None + + reviewer_names = [_get_user_name(r) for r in reviewers] + + # Create a concise summary of key PR info + created_date = created_on.split("T")[0] if created_on else "N/A" + updated_date = updated_on.split("T")[0] if updated_on else "N/A" + content_text = ( + "Pull Request Information:\n" + f"- Pull Request ID: {pr_id}\n" + f"- Title: {title}\n" + f"- State: {state or 'N/A'} {'(Draft)' if draft else ''}\n" + ) + if state == "DECLINED": + content_text += f"- Reason: {pr.get('reason', 'N/A')}\n" + content_text += ( + f"- Author: {_get_user_name(author) if author else 'N/A'}\n" + f"- Reviewers: {', '.join(reviewer_names) if reviewer_names else 'N/A'}\n" + f"- Branch: {source_branch} -> {destination_branch}\n" + f"- Created: {created_date}\n" + f"- Updated: {updated_date}" + ) + if description: + content_text += f"\n\nDescription:\n{description}" + + metadata: dict[str, str | list[str]] = { + "object_type": "PullRequest", + "workspace": workspace, + "repository": repo_slug, + "pr_key": f"{workspace}/{repo_slug}#{pr_id}", + "id": str(pr_id), + "title": title, + "state": state or "", + "draft": str(bool(draft)), + "link": link, + "author": _get_user_name(author) if author else "", + 
"reviewers": reviewer_names, + "approved_by": approved_by, + "comment_count": str(pr.get("comment_count", "")), + "task_count": str(pr.get("task_count", "")), + "created_on": created_on or "", + "updated_on": updated_on or "", + "source_branch": source_branch, + "destination_branch": destination_branch, + "closed_by": ( + _get_user_name(pr.get("closed_by", {})) if pr.get("closed_by") else "" + ), + "close_source_branch": str(bool(pr.get("close_source_branch", False))), + } + + name = sanitize_filename(title, "md") + + return Document( + id=f"{DocumentSource.BITBUCKET.value}:{workspace}:{repo_slug}:pr:{pr_id}", + blob=content_text.encode("utf-8"), + source=DocumentSource.BITBUCKET, + extension=".md", + semantic_identifier=f"#{pr_id}: {name}", + size_bytes=len(content_text.encode("utf-8")), + doc_updated_at=updated_dt, + primary_owners=[primary_owner] if primary_owner else None, + # secondary_owners=secondary_owners, + metadata=metadata, + ) + + +def _get_user_name(user: dict[str, Any]) -> str: + return user.get("display_name") or user.get("nickname") or "unknown" \ No newline at end of file diff --git a/common/data_source/config.py b/common/data_source/config.py index 64b30a051..843423294 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -13,6 +13,9 @@ def get_current_tz_offset() -> int: return round(time_diff.total_seconds() / 3600) +# Default request timeout, mostly used by connectors +REQUEST_TIMEOUT_SECONDS = int(os.environ.get("REQUEST_TIMEOUT_SECONDS") or 60) + ONE_MINUTE = 60 ONE_HOUR = 3600 ONE_DAY = ONE_HOUR * 24 @@ -58,6 +61,7 @@ class DocumentSource(str, Enum): GITHUB = "github" GITLAB = "gitlab" IMAP = "imap" + BITBUCKET = "bitbucket" ZENDESK = "zendesk" diff --git a/common/data_source/cross_connector_utils/__init__.py b/common/data_source/cross_connector_utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/common/data_source/cross_connector_utils/rate_limit_wrapper.py b/common/data_source/cross_connector_utils/rate_limit_wrapper.py new file mode 100644 index 000000000..bc0e0b470 --- /dev/null +++ b/common/data_source/cross_connector_utils/rate_limit_wrapper.py @@ -0,0 +1,126 @@ +import time +import logging +from collections.abc import Callable +from functools import wraps +from typing import Any +from typing import cast +from typing import TypeVar + +import requests + +F = TypeVar("F", bound=Callable[..., Any]) + + +class RateLimitTriedTooManyTimesError(Exception): + pass + + +class _RateLimitDecorator: + """Builds a generic wrapper/decorator for calls to external APIs that + prevents making more than `max_calls` requests per `period` + + Implementation inspired by the `ratelimit` library: + https://github.com/tomasbasham/ratelimit. + + NOTE: is not thread safe. 
+ """ + + def __init__( + self, + max_calls: int, + period: float, # in seconds + sleep_time: float = 2, # in seconds + sleep_backoff: float = 2, # applies exponential backoff + max_num_sleep: int = 0, + ): + self.max_calls = max_calls + self.period = period + self.sleep_time = sleep_time + self.sleep_backoff = sleep_backoff + self.max_num_sleep = max_num_sleep + + self.call_history: list[float] = [] + self.curr_calls = 0 + + def __call__(self, func: F) -> F: + @wraps(func) + def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any: + # cleanup calls which are no longer relevant + self._cleanup() + + # check if we've exceeded the rate limit + sleep_cnt = 0 + while len(self.call_history) == self.max_calls: + sleep_time = self.sleep_time * (self.sleep_backoff**sleep_cnt) + logging.warning( + f"Rate limit exceeded for function {func.__name__}. " + f"Waiting {sleep_time} seconds before retrying." + ) + time.sleep(sleep_time) + sleep_cnt += 1 + if self.max_num_sleep != 0 and sleep_cnt >= self.max_num_sleep: + raise RateLimitTriedTooManyTimesError( + f"Exceeded '{self.max_num_sleep}' retries for function '{func.__name__}'" + ) + + self._cleanup() + + # add the current call to the call history + self.call_history.append(time.monotonic()) + return func(*args, **kwargs) + + return cast(F, wrapped_func) + + def _cleanup(self) -> None: + curr_time = time.monotonic() + time_to_expire_before = curr_time - self.period + self.call_history = [ + call_time + for call_time in self.call_history + if call_time > time_to_expire_before + ] + + +rate_limit_builder = _RateLimitDecorator + + +"""If you want to allow the external service to tell you when you've hit the rate limit, +use the following instead""" + +R = TypeVar("R", bound=Callable[..., requests.Response]) + + +def wrap_request_to_handle_ratelimiting( + request_fn: R, default_wait_time_sec: int = 30, max_waits: int = 30 +) -> R: + def wrapped_request(*args: list, **kwargs: dict[str, Any]) -> requests.Response: + for _ in range(max_waits): + response = request_fn(*args, **kwargs) + if response.status_code == 429: + try: + wait_time = int( + response.headers.get("Retry-After", default_wait_time_sec) + ) + except ValueError: + wait_time = default_wait_time_sec + + time.sleep(wait_time) + continue + + return response + + raise RateLimitTriedTooManyTimesError(f"Exceeded '{max_waits}' retries") + + return cast(R, wrapped_request) + + +_rate_limited_get = wrap_request_to_handle_ratelimiting(requests.get) +_rate_limited_post = wrap_request_to_handle_ratelimiting(requests.post) + + +class _RateLimitedRequest: + get = _rate_limited_get + post = _rate_limited_post + + +rl_requests = _RateLimitedRequest \ No newline at end of file diff --git a/common/data_source/cross_connector_utils/retry_wrapper.py b/common/data_source/cross_connector_utils/retry_wrapper.py new file mode 100644 index 000000000..a05584797 --- /dev/null +++ b/common/data_source/cross_connector_utils/retry_wrapper.py @@ -0,0 +1,88 @@ +from collections.abc import Callable +import logging +from logging import Logger +from typing import Any +from typing import cast +from typing import TypeVar +import requests +from retry import retry + +from common.data_source.config import REQUEST_TIMEOUT_SECONDS + + +F = TypeVar("F", bound=Callable[..., Any]) +logger = logging.getLogger(__name__) + +def retry_builder( + tries: int = 20, + delay: float = 0.1, + max_delay: float | None = 60, + backoff: float = 2, + jitter: tuple[float, float] | float = 1, + exceptions: type[Exception] | tuple[type[Exception], 
...] = (Exception,), +) -> Callable[[F], F]: + """Builds a generic wrapper/decorator for calls to external APIs that + may fail due to rate limiting, flakes, or other reasons. Applies exponential + backoff with jitter to retry the call.""" + + def retry_with_default(func: F) -> F: + @retry( + tries=tries, + delay=delay, + max_delay=max_delay, + backoff=backoff, + jitter=jitter, + logger=cast(Logger, logger), + exceptions=exceptions, + ) + def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any: + return func(*args, **kwargs) + + return cast(F, wrapped_func) + + return retry_with_default + + +def request_with_retries( + method: str, + url: str, + *, + data: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + timeout: int = REQUEST_TIMEOUT_SECONDS, + stream: bool = False, + tries: int = 8, + delay: float = 1, + backoff: float = 2, +) -> requests.Response: + @retry(tries=tries, delay=delay, backoff=backoff, logger=cast(Logger, logger)) + def _make_request() -> requests.Response: + response = requests.request( + method=method, + url=url, + data=data, + headers=headers, + params=params, + timeout=timeout, + stream=stream, + ) + try: + response.raise_for_status() + except requests.exceptions.HTTPError: + logging.exception( + "Request failed:\n%s", + { + "method": method, + "url": url, + "data": data, + "headers": headers, + "params": params, + "timeout": timeout, + "stream": stream, + }, + ) + raise + return response + + return _make_request() \ No newline at end of file diff --git a/common/data_source/github/connector.py b/common/data_source/github/connector.py index 2e6d5f2af..6a9b96740 100644 --- a/common/data_source/github/connector.py +++ b/common/data_source/github/connector.py @@ -19,7 +19,7 @@ from github.PaginatedList import PaginatedList from github.PullRequest import PullRequest from pydantic import BaseModel from typing_extensions import override -from common.data_source.google_util.util import sanitize_filename +from common.data_source.utils import sanitize_filename from common.data_source.config import DocumentSource, GITHUB_CONNECTOR_BASE_URL from common.data_source.exceptions import ( ConnectorMissingCredentialError, diff --git a/common/data_source/gmail_connector.py b/common/data_source/gmail_connector.py index e64db9847..1421f9f4b 100644 --- a/common/data_source/gmail_connector.py +++ b/common/data_source/gmail_connector.py @@ -8,10 +8,10 @@ from common.data_source.config import INDEX_BATCH_SIZE, SLIM_BATCH_SIZE, Documen from common.data_source.google_util.auth import get_google_creds from common.data_source.google_util.constant import DB_CREDENTIALS_PRIMARY_ADMIN_KEY, MISSING_SCOPES_ERROR_STR, SCOPE_INSTRUCTIONS, USER_FIELDS from common.data_source.google_util.resource import get_admin_service, get_gmail_service -from common.data_source.google_util.util import _execute_single_retrieval, execute_paginated_retrieval, sanitize_filename, clean_string +from common.data_source.google_util.util import _execute_single_retrieval, execute_paginated_retrieval, clean_string from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch, SlimConnectorWithPermSync from common.data_source.models import BasicExpertInfo, Document, ExternalAccess, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SlimDocument, TextSection -from common.data_source.utils import build_time_range_query, clean_email_and_extract_name, get_message_body, is_mail_service_disabled_error, gmail_time_str_to_utc +from 
common.data_source.utils import build_time_range_query, clean_email_and_extract_name, get_message_body, is_mail_service_disabled_error, gmail_time_str_to_utc, sanitize_filename # Constants for Gmail API fields THREAD_LIST_FIELDS = "nextPageToken, threads(id)" diff --git a/common/data_source/google_util/util.py b/common/data_source/google_util/util.py index b1f0162a4..187c06d6d 100644 --- a/common/data_source/google_util/util.py +++ b/common/data_source/google_util/util.py @@ -191,42 +191,6 @@ def get_credentials_from_env(email: str, oauth: bool = False, source="drive") -> DB_CREDENTIALS_AUTHENTICATION_METHOD: "uploaded", } -def sanitize_filename(name: str, extension: str = "txt") -> str: - """ - Soft sanitize for MinIO/S3: - - Replace only prohibited characters with a space. - - Preserve readability (no ugly underscores). - - Collapse multiple spaces. - """ - if name is None: - return f"file.{extension}" - - name = str(name).strip() - - # Characters that MUST NOT appear in S3/MinIO object keys - # Replace them with a space (not underscore) - forbidden = r'[\\\?\#\%\*\:\|\<\>"]' - name = re.sub(forbidden, " ", name) - - # Replace slashes "/" (S3 interprets as folder) with space - name = name.replace("/", " ") - - # Collapse multiple spaces into one - name = re.sub(r"\s+", " ", name) - - # Trim both ends - name = name.strip() - - # Enforce reasonable max length - if len(name) > 200: - base, ext = os.path.splitext(name) - name = base[:180].rstrip() + ext - - if not os.path.splitext(name)[1]: - name += f".{extension}" - - return name - def clean_string(text: str | None) -> str | None: """ diff --git a/common/data_source/utils.py b/common/data_source/utils.py index da500f055..1866f9497 100644 --- a/common/data_source/utils.py +++ b/common/data_source/utils.py @@ -1150,6 +1150,42 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R next_ind += 1 del future_to_index[future] + +def sanitize_filename(name: str, extension: str = "txt") -> str: + """ + Soft sanitize for MinIO/S3: + - Replace only prohibited characters with a space. + - Preserve readability (no ugly underscores). + - Collapse multiple spaces. 
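+
+    Examples (expected results given the rules above):
+        sanitize_filename('fix: "weird" name?')  -> 'fix weird name.txt'
+        sanitize_filename('notes/2024', 'md')    -> 'notes 2024.md'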
+ """ + if name is None: + return f"file.{extension}" + + name = str(name).strip() + + # Characters that MUST NOT appear in S3/MinIO object keys + # Replace them with a space (not underscore) + forbidden = r'[\\\?\#\%\*\:\|\<\>"]' + name = re.sub(forbidden, " ", name) + + # Replace slashes "/" (S3 interprets as folder) with space + name = name.replace("/", " ") + + # Collapse multiple spaces into one + name = re.sub(r"\s+", " ", name) + + # Trim both ends + name = name.strip() + + # Enforce reasonable max length + if len(name) > 200: + base, ext = os.path.splitext(name) + name = base[:180].rstrip() + ext + + if not os.path.splitext(name)[1]: + name += f".{extension}" + + return name F = TypeVar("F", bound=Callable[..., Any]) class _RateLimitDecorator: @@ -1246,4 +1282,4 @@ def retry_builder( return cast(F, wrapped_func) - return retry_with_default \ No newline at end of file + return retry_with_default diff --git a/rag/res/synonym.json b/rag/res/synonym.json index 047303155..ea61b9e1c 100644 --- a/rag/res/synonym.json +++ b/rag/res/synonym.json @@ -10542,6 +10542,5 @@ "周五": ["礼拜五", "星期五"], "周六": ["礼拜六", "星期六"], "周日": ["礼拜日", "星期日", "星期天", "礼拜天"], -"上班": "办公", -"HELO":"agn" +"上班": "办公" } diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 8a4a6ab8a..81478ab9b 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -54,11 +54,13 @@ from common.data_source import ( ) from common.constants import FileSource, TaskStatus from common.data_source.config import INDEX_BATCH_SIZE +from common.data_source.models import ConnectorFailure from common.data_source.confluence_connector import ConfluenceConnector from common.data_source.gmail_connector import GmailConnector from common.data_source.box_connector import BoxConnector from common.data_source.github.connector import GithubConnector from common.data_source.gitlab_connector import GitlabConnector +from common.data_source.bitbucket.connector import BitbucketConnector from common.data_source.interfaces import CheckpointOutputWrapper from common.log_utils import init_root_logger from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc @@ -1107,6 +1109,67 @@ class Gitlab(SyncBase): logging.info("Connect to Gitlab: ({}) {}".format(self.conf["project_name"], begin_info)) return document_generator + +class Bitbucket(SyncBase): + SOURCE_NAME: str = FileSource.BITBUCKET + + async def _generate(self, task: dict): + self.connector = BitbucketConnector( + workspace=self.conf.get("workspace"), + repositories=self.conf.get("repository_slugs"), + projects=self.conf.get("projects"), + ) + + self.connector.load_credentials( + { + "bitbucket_email": self.conf["credentials"].get("bitbucket_account_email"), + "bitbucket_api_token": self.conf["credentials"].get("bitbucket_api_token"), + } + ) + + if task["reindex"] == "1" or not task["poll_range_start"]: + start_time = datetime.fromtimestamp(0, tz=timezone.utc) + begin_info = "totally" + else: + start_time = task.get("poll_range_start") + begin_info = f"from {start_time}" + + end_time = datetime.now(timezone.utc) + + def document_batches(): + checkpoint = self.connector.build_dummy_checkpoint() + + while checkpoint.has_more: + gen = self.connector.load_from_checkpoint( + start=start_time.timestamp(), + end=end_time.timestamp(), + checkpoint=checkpoint) + + while True: + try: + item = next(gen) + if isinstance(item, ConnectorFailure): + logging.exception( + "Bitbucket connector failure: %s", + item.failure_message) + break + yield [item] + except 
StopIteration as e: + checkpoint = e.value + break + + async def async_wrapper(): + for batch in document_batches(): + yield batch + + logging.info( + "Connect to Bitbucket: workspace(%s), %s", + self.conf.get("workspace"), + begin_info, + ) + + return async_wrapper() + func_factory = { FileSource.S3: S3, FileSource.R2: R2, @@ -1131,6 +1194,7 @@ func_factory = { FileSource.ZENDESK: Zendesk, FileSource.GITHUB: Github, FileSource.GITLAB: Gitlab, + FileSource.BITBUCKET: Bitbucket, } diff --git a/web/src/assets/svg/data-source/bitbucket.svg b/web/src/assets/svg/data-source/bitbucket.svg new file mode 100644 index 000000000..894ed83bf --- /dev/null +++ b/web/src/assets/svg/data-source/bitbucket.svg @@ -0,0 +1,7 @@ + + \ No newline at end of file diff --git a/web/src/locales/de.ts b/web/src/locales/de.ts index c9cd09b52..4297ad73e 100644 --- a/web/src/locales/de.ts +++ b/web/src/locales/de.ts @@ -947,6 +947,19 @@ Beispiel: Virtual Hosted Style`, 'Laden Sie das OAuth-JSON hoch, das von der Google Console generiert wurde. Wenn es nur Client-Anmeldeinformationen enthält, führen Sie die browserbasierte Überprüfung einmal durch, um langlebige Refresh-Token zu erstellen.', dropboxDescription: 'Verbinden Sie Ihre Dropbox, um Dateien und Ordner von einem ausgewählten Konto zu synchronisieren.', + bitbucketDescription: + 'Bitbucket verbinden, um PR-Inhalte zu synchronisieren.', + zendeskDescription: + 'Verbinden Sie Ihr Zendesk, um Tickets, Artikel und andere Inhalte zu synchronisieren.', + bitbucketTopWorkspaceTip: + 'Der zu indizierende Bitbucket-Workspace (z. B. "atlassian" aus https://bitbucket.org/atlassian/workspace )', + bitbucketWorkspaceTip: + 'Dieser Connector indiziert alle Repositories im Workspace.', + bitbucketProjectsTip: 'Kommagetrennte Projekt-Keys, z. B.: PROJ1,PROJ2', + bitbucketRepositorySlugsTip: + 'Kommagetrennte Repository-Slugs, z. B.: repo-one,repo-two', + connectorNameTip: + 'Geben Sie einen aussagekräftigen Namen für den Connector an', boxDescription: 'Verbinden Sie Ihr Box-Laufwerk, um Dateien und Ordner zu synchronisieren.', githubDescription: diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 3915660ac..385dae580 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -879,6 +879,7 @@ This auto-tagging feature enhances retrieval by adding another layer of domain-s cropImage: 'Crop image', selectModelPlaceholder: 'Select model', configureModelTitle: 'Configure model', + connectorNameTip: 'A descriptive name for the connector', confluenceIsCloudTip: 'Check if this is a Confluence Cloud instance, uncheck for Confluence Server/Data Center', confluenceWikiBaseUrlTip: @@ -923,7 +924,9 @@ Example: Virtual Hosted Style`, google_driveTokenTip: 'Upload the OAuth token JSON generated from the OAuth helper or Google Cloud Console. You may also upload a client_secret JSON from an "installed" or "web" application. If this is your first sync, a browser window will open to complete the OAuth consent. 
If the JSON already contains a refresh token, it will be reused automatically.',
   google_drivePrimaryAdminTip:
-    'Email address that has access to the Drive content being synced.',
+    'Email address that has access to the Drive content being synced',
+  zendeskDescription:
+    'Connect your Zendesk to sync tickets, articles, and other content.',
   google_driveMyDriveEmailsTip:
     'Comma-separated emails whose "My Drive" contents should be indexed (include the primary admin).',
   google_driveSharedFoldersTip:
@@ -934,7 +937,16 @@ Example: Virtual Hosted Style`,
     'Upload the OAuth JSON generated from Google Console. If it only contains client credentials, run the browser-based verification once to mint long-lived refresh tokens.',
   dropboxDescription:
     'Connect your Dropbox to sync files and folders from a chosen account.',
+  bitbucketDescription: 'Connect Bitbucket to sync PR content.',
+  bitbucketTopWorkspaceTip:
+    'The Bitbucket workspace to index (e.g., "atlassian" from https://bitbucket.org/atlassian/workspace ).',
+  bitbucketRepositorySlugsTip:
+    'Comma separated repository slugs. E.g., repo-one,repo-two',
+  bitbucketProjectsTip: 'Comma separated project keys. E.g., PROJ1,PROJ2',
+  bitbucketWorkspaceTip:
+    'This connector will index all repositories in the workspace.',
   boxDescription: 'Connect your Box drive to sync files and folders.',
+
   githubDescription:
     'Connect GitHub to sync pull requests and issues for retrieval.',
   airtableDescription:
diff --git a/web/src/locales/ru.ts b/web/src/locales/ru.ts
index 37ff431a9..2744b10d4 100644
--- a/web/src/locales/ru.ts
+++ b/web/src/locales/ru.ts
@@ -731,6 +731,7 @@ export default {
   newDocs: 'Новые документы',
   timeStarted: 'Время начала',
   log: 'Лог',
+  connectorNameTip: 'Укажите понятное имя для коннектора',
   confluenceDescription:
     'Интегрируйте ваше рабочее пространство Confluence для поиска документации.',
   s3Description:
@@ -747,6 +748,18 @@ export default {
     'Синхронизируйте страницы и базы данных из Notion для извлечения знаний.',
   boxDescription: 'Подключите ваш диск Box для синхронизации файлов и папок.',
+  bitbucketDescription:
+    'Подключите Bitbucket для синхронизации содержимого PR.',
+  zendeskDescription:
+    'Подключите Zendesk для синхронизации тикетов, статей и другого контента.',
+  bitbucketTopWorkspaceTip:
+    'Рабочее пространство Bitbucket для индексации (например, "atlassian" из https://bitbucket.org/atlassian/workspace )',
+  bitbucketWorkspaceTip:
+    'Этот коннектор проиндексирует все репозитории в рабочем пространстве.',
+  bitbucketProjectsTip:
+    'Ключи проектов через запятую, например: PROJ1,PROJ2',
+  bitbucketRepositorySlugsTip:
+    'Слаги (slug) репозиториев через запятую, например: repo-one,repo-two',
   githubDescription:
     'Подключите GitHub для синхронизации содержимого Pull Request и Issue для поиска.',
   airtableDescription:
diff --git a/web/src/locales/zh-traditional.ts b/web/src/locales/zh-traditional.ts
index e72449f4d..8eb147351 100644
--- a/web/src/locales/zh-traditional.ts
+++ b/web/src/locales/zh-traditional.ts
@@ -726,6 +726,16 @@ export default {
   view: '查看',
   modelsToBeAddedTooltip:
    '若您的模型供應商未列於此處,但宣稱與 OpenAI 相容,可透過選擇「OpenAI-API-compatible」卡片來設定相關模型。',
+  dropboxDescription: '連接 Dropbox,同步指定帳號下的文件與文件夾。',
+  bitbucketDescription: '連接 Bitbucket,同步 PR 內容。',
+  zendeskDescription: '連接 Zendesk,同步工單、文章及其他內容。',
+  bitbucketTopWorkspaceTip:
+    '要索引的 Bitbucket 工作區(例如:https://bitbucket.org/atlassian/workspace 中的 "atlassian")',
+  bitbucketWorkspaceTip: '此連接器將索引工作區下的所有倉庫。',
+  bitbucketRepositorySlugsTip:
+    '以英文逗號分隔的倉庫 slug,例如:repo-one,repo-two',
+ 
bitbucketProjectsTip: '以英文逗號分隔的項目鍵,例如:PROJ1,PROJ2', + connectorNameTip: '為連接器填寫一個有意義的名稱', }, message: { registered: '註冊成功', diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index 903987e2d..96d36a312 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -53,6 +53,7 @@ export default { noData: '暂无数据', bedrockCredentialsHint: '提示:Access Key / Secret Key 可留空,以启用 AWS IAM 自动验证。', + zendeskDescription: '连接 Zendesk,同步工单、文章及其他内容。', promptPlaceholder: '请输入或使用 / 快速插入变量。', selected: '已选择', }, @@ -864,6 +865,14 @@ General:实体和关系提取提示来自 GitHub - microsoft/graphrag:基于 '请上传由 Google Console 生成的 OAuth JSON。如果仅包含 client credentials,请通过浏览器授权一次以获取长期有效的刷新 Token。', dropboxDescription: '连接 Dropbox,同步指定账号下的文件与文件夹。', boxDescription: '连接你的 Box 云盘以同步文件和文件夹。', + bitbucketDescription: '连接 Bitbucket,同步 PR 内容。', + bitbucketTopWorkspaceTip: + '要索引的 Bitbucket 工作区(例如:https://bitbucket.org/atlassian/workspace 中的 "atlassian")', + bitbucketWorkspaceTip: '该连接器将索引工作区下的所有仓库。', + bitbucketProjectsTip: '用英文逗号分隔的项目 key,例如:PROJ1,PROJ2', + bitbucketRepositorySlugsTip: + '用英文逗号分隔的仓库 slug,例如:repo-one,repo-two', + connectorNameTip: '为连接器命名', githubDescription: '连接 GitHub,可同步 Pull Request 与 Issue 内容用于检索。', airtableDescription: '连接 Airtable,同步指定工作区下指定表格中的文件。', diff --git a/web/src/pages/user-setting/data-source/component/blob-token-field.tsx b/web/src/pages/user-setting/data-source/component/blob-token-field.tsx deleted file mode 100644 index 11fe22804..000000000 --- a/web/src/pages/user-setting/data-source/component/blob-token-field.tsx +++ /dev/null @@ -1,247 +0,0 @@ -import { useEffect, useMemo, useState } from 'react'; -import { useFormContext } from 'react-hook-form'; - -import { SelectWithSearch } from '@/components/originui/select-with-search'; -import { RAGFlowFormItem } from '@/components/ragflow-form'; -import { Input } from '@/components/ui/input'; -import { Segmented } from '@/components/ui/segmented'; -import { t } from 'i18next'; - -// UI-only auth modes for S3 -// access_key: Access Key ID + Secret -// iam_role: only Role ARN -// assume_role: no input fields (uses environment role) -type AuthMode = 'access_key' | 'iam_role' | 'assume_role'; -type BlobMode = 's3' | 's3_compatible'; - -const modeOptions = [ - { label: 'S3', value: 's3' }, - { label: 'S3 Compatible', value: 's3_compatible' }, -]; - -const authOptions = [ - { label: 'Access Key', value: 'access_key' }, - { label: 'IAM Role', value: 'iam_role' }, - { label: 'Assume Role', value: 'assume_role' }, -]; - -const addressingOptions = [ - { label: 'Virtual Hosted Style', value: 'virtual' }, - { label: 'Path Style', value: 'path' }, -]; - -const deriveInitialAuthMode = (credentials: any): AuthMode => { - const authMethod = credentials?.authentication_method; - if (authMethod === 'iam_role') return 'iam_role'; - if (authMethod === 'assume_role') return 'assume_role'; - if (credentials?.aws_role_arn) return 'iam_role'; - if (credentials?.aws_access_key_id || credentials?.aws_secret_access_key) - return 'access_key'; - return 'access_key'; -}; - -const deriveInitialMode = (bucketType?: string): BlobMode => - bucketType === 's3_compatible' ? 
's3_compatible' : 's3'; - -const BlobTokenField = () => { - const form = useFormContext(); - const credentials = form.watch('config.credentials'); - const watchedBucketType = form.watch('config.bucket_type'); - - const [mode, setMode] = useState( - deriveInitialMode(watchedBucketType), - ); - const [authMode, setAuthMode] = useState(() => - deriveInitialAuthMode(credentials), - ); - - // Keep bucket_type in sync with UI mode - useEffect(() => { - const nextMode = deriveInitialMode(watchedBucketType); - setMode((prev) => (prev === nextMode ? prev : nextMode)); - }, [watchedBucketType]); - - useEffect(() => { - form.setValue('config.bucket_type', mode, { shouldDirty: true }); - // Default addressing style for compatible mode - if ( - mode === 's3_compatible' && - !form.getValues('config.credentials.addressing_style') - ) { - form.setValue('config.credentials.addressing_style', 'virtual', { - shouldDirty: false, - }); - } - if (mode === 's3_compatible' && authMode !== 'access_key') { - setAuthMode('access_key'); - } - // Persist authentication_method for backend - const nextAuthMethod: AuthMode = - mode === 's3_compatible' ? 'access_key' : authMode; - form.setValue('config.credentials.authentication_method', nextAuthMethod, { - shouldDirty: true, - }); - // Clear errors for fields that are not relevant in the current mode/auth selection - const inactiveFields: string[] = []; - if (mode === 's3_compatible') { - inactiveFields.push('config.credentials.aws_role_arn'); - } else { - if (authMode === 'iam_role') { - inactiveFields.push('config.credentials.aws_access_key_id'); - inactiveFields.push('config.credentials.aws_secret_access_key'); - } - if (authMode === 'assume_role') { - inactiveFields.push('config.credentials.aws_access_key_id'); - inactiveFields.push('config.credentials.aws_secret_access_key'); - inactiveFields.push('config.credentials.aws_role_arn'); - } - } - if (inactiveFields.length) { - form.clearErrors(inactiveFields as any); - } - }, [form, mode, authMode]); - - const isS3 = mode === 's3'; - const requiresAccessKey = - authMode === 'access_key' || mode === 's3_compatible'; - const requiresRoleArn = isS3 && authMode === 'iam_role'; - - // Help text for assume role (no inputs) - const assumeRoleNote = useMemo( - () => t('No credentials required. Uses the default environment role.'), - [t], - ); - - return ( -
-
-
Mode
- setMode(val as BlobMode)} - className="w-full" - itemClassName="flex-1 justify-center" - /> -
- - {isS3 && ( -
-
Authentication
- setAuthMode(val as AuthMode)} - className="w-full" - itemClassName="flex-1 justify-center" - /> -
- )} - - {requiresAccessKey && ( - - requiresAccessKey - ? Boolean(val) || 'Access Key ID is required' - : true, - }} - > - {(field) => ( - - )} - - )} - - {requiresAccessKey && ( - - requiresAccessKey - ? Boolean(val) || 'Secret Access Key is required' - : true, - }} - > - {(field) => ( - - )} - - )} - - {requiresRoleArn && ( - - requiresRoleArn ? Boolean(val) || 'Role ARN is required' : true, - }} - > - {(field) => ( - - )} - - )} - - {isS3 && authMode === 'assume_role' && ( -
- {assumeRoleNote} -
- )} - - {mode === 's3_compatible' && ( -
- - {(field) => ( - field.onChange(val)} - /> - )} - - - - {(field) => ( - - )} - -
- )} -
- ); -}; - -export default BlobTokenField; diff --git a/web/src/pages/user-setting/data-source/component/box-token-field.tsx b/web/src/pages/user-setting/data-source/component/box-token-field.tsx index 3bb805868..b482c25aa 100644 --- a/web/src/pages/user-setting/data-source/component/box-token-field.tsx +++ b/web/src/pages/user-setting/data-source/component/box-token-field.tsx @@ -131,7 +131,6 @@ const BoxTokenField = ({ value, onChange }: BoxTokenFieldProps) => { const finalValue: Record = { ...rest, - // 确保客户端配置字段有值(优先后端返回,其次当前输入) client_id: rest.client_id ?? clientId.trim(), client_secret: rest.client_secret ?? clientSecret.trim(), }; @@ -146,8 +145,6 @@ const BoxTokenField = ({ value, onChange }: BoxTokenFieldProps) => { finalValue.authorization_code = code; } - // access_token / refresh_token 由后端返回,已在 ...rest 中带上,无需额外 state - onChange(JSON.stringify(finalValue)); message.success('Box authorization completed.'); clearWebState(); diff --git a/web/src/pages/user-setting/data-source/component/confluence-token-field.tsx b/web/src/pages/user-setting/data-source/component/confluence-token-field.tsx deleted file mode 100644 index 6c7e201d4..000000000 --- a/web/src/pages/user-setting/data-source/component/confluence-token-field.tsx +++ /dev/null @@ -1,200 +0,0 @@ -import { useCallback, useEffect, useMemo, useState } from 'react'; -import { ControllerRenderProps, useFormContext } from 'react-hook-form'; - -import { Checkbox } from '@/components/ui/checkbox'; -import { Input } from '@/components/ui/input'; -import { cn } from '@/lib/utils'; -import { debounce } from 'lodash'; - -/* ---------------- Token Field ---------------- */ - -export type ConfluenceTokenFieldProps = ControllerRenderProps & { - fieldType: 'username' | 'token'; - placeholder?: string; - disabled?: boolean; -}; - -const ConfluenceTokenField = ({ - fieldType, - value, - onChange, - placeholder, - disabled, - ...rest -}: ConfluenceTokenFieldProps) => { - return ( -
- onChange(e.target.value)} - placeholder={ - placeholder || - (fieldType === 'token' - ? 'Enter your Confluence access token' - : 'Confluence username or email') - } - disabled={disabled} - {...rest} - /> -
- ); -}; - -/* ---------------- Indexing Mode Field ---------------- */ - -type ConfluenceIndexingMode = 'everything' | 'space' | 'page'; - -export type ConfluenceIndexingModeFieldProps = ControllerRenderProps; - -export const ConfluenceIndexingModeField = ( - fieldProps: ControllerRenderProps, -) => { - const { value, onChange, disabled } = fieldProps; - const [mode, setMode] = useState( - value || 'everything', - ); - const { watch, setValue } = useFormContext(); - - useEffect(() => setMode(value), [value]); - - const spaceValue = watch('config.space'); - const pageIdValue = watch('config.page_id'); - const indexRecursively = watch('config.index_recursively'); - - useEffect(() => { - if (!value) onChange('everything'); - }, [value, onChange]); - - const handleModeChange = useCallback( - (nextMode?: string) => { - let normalized: ConfluenceIndexingMode = 'everything'; - if (nextMode) { - normalized = nextMode as ConfluenceIndexingMode; - setMode(normalized); - onChange(normalized); - } else { - setMode(mode); - normalized = mode; - onChange(mode); - // onChange(mode); - } - if (normalized === 'everything') { - setValue('config.space', ''); - setValue('config.page_id', ''); - setValue('config.index_recursively', false); - } else if (normalized === 'space') { - setValue('config.page_id', ''); - setValue('config.index_recursively', false); - } else if (normalized === 'page') { - setValue('config.space', ''); - } - }, - [mode, onChange, setValue], - ); - - const debouncedHandleChange = useMemo( - () => - debounce(() => { - handleModeChange(); - }, 300), - [handleModeChange], - ); - - return ( -
-
- {INDEX_MODE_OPTIONS.map((option) => { - const isActive = option.value === mode; - return ( - - ); - })} -
- - {mode === 'everything' && ( -

- This connector will index all pages the provided credentials have - access to. -

- )} - - {mode === 'space' && ( -
-
- Space Key -
- { - const value = e.target.value; - setValue('config.space', value); - debouncedHandleChange(); - }} - placeholder="e.g. KB" - disabled={disabled} - /> -

- The Confluence space key to index. -

-
- )} - - {mode === 'page' && ( -
-
Page ID
- { - setValue('config.page_id', e.target.value); - debouncedHandleChange(); - }} - placeholder="e.g. 123456" - disabled={disabled} - /> -

- The Confluence page ID to index. -

- -
- { - setValue('config.index_recursively', Boolean(checked)); - debouncedHandleChange(); - }} - disabled={disabled} - /> - - Index child pages recursively - -
-
- )} -
- );
-};
-
-const INDEX_MODE_OPTIONS = [
-  { label: 'Everything', value: 'everything' },
-  { label: 'Space', value: 'space' },
-  { label: 'Page', value: 'page' },
-];
-
-export default ConfluenceTokenField;
diff --git a/web/src/pages/user-setting/data-source/constant/bitbucket-constant.tsx b/web/src/pages/user-setting/data-source/constant/bitbucket-constant.tsx
new file mode 100644
index 000000000..83f33c07f
--- /dev/null
+++ b/web/src/pages/user-setting/data-source/constant/bitbucket-constant.tsx
@@ -0,0 +1,82 @@
+import { FilterFormField, FormFieldType } from '@/components/dynamic-form';
+import { TFunction } from 'i18next';
+
+export const bitbucketConstant = (t: TFunction) => [
+  {
+    label: 'Bitbucket Account Email',
+    name: 'config.credentials.bitbucket_account_email',
+    type: FormFieldType.Email,
+    required: true,
+  },
+  {
+    label: 'Bitbucket API Token',
+    name: 'config.credentials.bitbucket_api_token',
+    type: FormFieldType.Password,
+    required: true,
+  },
+  {
+    label: 'Workspace',
+    name: 'config.workspace',
+    type: FormFieldType.Text,
+    required: true,
+    tooltip: t('setting.bitbucketTopWorkspaceTip'),
+  },
+  {
+    label: 'Index Mode',
+    name: 'config.index_mode',
+    type: FormFieldType.Segmented,
+    options: [
+      { label: 'Repositories', value: 'repositories' },
+      { label: 'Project(s)', value: 'projects' },
+      { label: 'Workspace', value: 'workspace' },
+    ],
+  },
+  {
+    label: 'Repository Slugs',
+    name: 'config.repository_slugs',
+    type: FormFieldType.Text,
+    customValidate: (val: string, formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      if (!val && index_mode === 'repositories') {
+        return 'Repository Slugs is required';
+      }
+      return true;
+    },
+    shouldRender: (formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      return index_mode === 'repositories';
+    },
+    tooltip: t('setting.bitbucketRepositorySlugsTip'),
+  },
+  {
+    label: 'Projects',
+    name: 'config.projects',
+    type: FormFieldType.Text,
+    customValidate: (val: string, formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      if (!val && index_mode === 'projects') {
+        return 'Projects is required';
+      }
+      return true;
+    },
+    shouldRender: (formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      return index_mode === 'projects';
+    },
+    tooltip: t('setting.bitbucketProjectsTip'),
+  },
+  {
+    name: FilterFormField + '.tip',
+    label: ' ',
+    type: FormFieldType.Custom,
+    shouldRender: (formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      return index_mode === 'workspace';
+    },
+    render: () => (
+      <div>
+        {t('setting.bitbucketWorkspaceTip')}
+      </div>
+    ),
+  },
+];
diff --git a/web/src/pages/user-setting/data-source/constant/confluence-constant.tsx b/web/src/pages/user-setting/data-source/constant/confluence-constant.tsx
new file mode 100644
index 000000000..48e2da47c
--- /dev/null
+++ b/web/src/pages/user-setting/data-source/constant/confluence-constant.tsx
@@ -0,0 +1,120 @@
+import { FilterFormField, FormFieldType } from '@/components/dynamic-form';
+import { TFunction } from 'i18next';
+
+export const confluenceConstant = (t: TFunction) => [
+  {
+    label: 'Confluence Username',
+    name: 'config.credentials.confluence_username',
+    type: FormFieldType.Text,
+    required: true,
+    tooltip: t('setting.connectorNameTip'),
+  },
+  {
+    label: 'Confluence Access Token',
+    name: 'config.credentials.confluence_access_token',
+    type: FormFieldType.Password,
+    required: true,
+  },
+  {
+    label: 'Wiki Base URL',
+    name: 'config.wiki_base',
+    type: FormFieldType.Text,
+    required: false,
+    tooltip: t('setting.confluenceWikiBaseUrlTip'),
+  },
+  {
+    label: 'Is Cloud',
+    name: 'config.is_cloud',
+    type: FormFieldType.Checkbox,
+    required: false,
+    tooltip: t('setting.confluenceIsCloudTip'),
+  },
+  {
+    label: 'Index Mode',
+    name: 'config.index_mode',
+    type: FormFieldType.Segmented,
+    options: [
+      { label: 'Everything', value: 'everything' },
+      { label: 'Space', value: 'space' },
+      { label: 'Page', value: 'page' },
+    ],
+  },
+  {
+    name: 'config.page_id',
+    label: 'Page ID',
+    type: FormFieldType.Text,
+    customValidate: (val: string, formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      if (!val && index_mode === 'page') {
+        return 'Page ID is required';
+      }
+      return true;
+    },
+    shouldRender: (formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      return index_mode === 'page';
+    },
+  },
+  {
+    name: 'config.space',
+    label: 'Space Key',
+    type: FormFieldType.Text,
+    customValidate: (val: string, formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      if (!val && index_mode === 'space') {
+        return 'Space Key is required';
+      }
+      return true;
+    },
+    shouldRender: (formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      return index_mode === 'space';
+    },
+  },
+  {
+    name: 'config.index_recursively',
+    label: 'Index Recursively',
+    type: FormFieldType.Checkbox,
+    shouldRender: (formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      return index_mode === 'page';
+    },
+  },
+  {
+    name: FilterFormField + '.tip',
+    label: ' ',
+    type: FormFieldType.Custom,
+    shouldRender: (formValues: any) => {
+      const index_mode = formValues?.config?.index_mode;
+      return index_mode === 'everything';
+    },
+    render: () => (
+      <div>
+        {
+          'This choice will index all pages the provided credentials have access to.'
+        }
+      </div>
+ ), + }, + { + label: 'Space Key', + name: 'config.space', + type: FormFieldType.Text, + required: false, + hidden: true, + }, + { + label: 'Page ID', + name: 'config.page_id', + type: FormFieldType.Text, + required: false, + hidden: true, + }, + { + label: 'Index Recursively', + name: 'config.index_recursively', + type: FormFieldType.Checkbox, + required: false, + hidden: true, + }, +]; diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 804ecc2eb..8b9193c18 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -4,11 +4,13 @@ import { t, TFunction } from 'i18next'; import { useEffect, useState } from 'react'; import { useTranslation } from 'react-i18next'; import BoxTokenField from '../component/box-token-field'; -import { ConfluenceIndexingModeField } from '../component/confluence-token-field'; import GmailTokenField from '../component/gmail-token-field'; import GoogleDriveTokenField from '../component/google-drive-token-field'; import { IDataSourceInfoMap } from '../interface'; +import { bitbucketConstant } from './bitbucket-constant'; +import { confluenceConstant } from './confluence-constant'; import { S3Constant } from './s3-constant'; + export enum DataSourceKey { CONFLUENCE = 'confluence', S3 = 's3', @@ -29,6 +31,7 @@ export enum DataSourceKey { ASANA = 'asana', IMAP = 'imap', GITHUB = 'github', + BITBUCKET = 'bitbucket', ZENDESK = 'zendesk', // SHAREPOINT = 'sharepoint', // SLACK = 'slack', @@ -134,6 +137,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.IMAP}Description`), icon: , }, + [DataSourceKey.BITBUCKET]: { + name: 'Bitbucket', + description: t(`setting.${DataSourceKey.BITBUCKET}Description`), + icon: , + }, [DataSourceKey.ZENDESK]: { name: 'Zendesk', description: t(`setting.${DataSourceKey.ZENDESK}Description`), @@ -294,67 +302,7 @@ export const DataSourceFormFields = { }, ], - [DataSourceKey.CONFLUENCE]: [ - { - label: 'Confluence Username', - name: 'config.credentials.confluence_username', - type: FormFieldType.Text, - required: true, - tooltip: 'A descriptive name for the connector.', - }, - { - label: 'Confluence Access Token', - name: 'config.credentials.confluence_access_token', - type: FormFieldType.Password, - required: true, - }, - { - label: 'Wiki Base URL', - name: 'config.wiki_base', - type: FormFieldType.Text, - required: false, - tooltip: t('setting.confluenceWikiBaseUrlTip'), - }, - { - label: 'Is Cloud', - name: 'config.is_cloud', - type: FormFieldType.Checkbox, - required: false, - tooltip: t('setting.confluenceIsCloudTip'), - }, - { - label: 'Index Method', - name: 'config.index_mode', - type: FormFieldType.Text, - required: false, - horizontal: true, - labelClassName: 'self-start pt-4', - render: (fieldProps: any) => ( - - ), - }, - { - label: 'Space Key', - name: 'config.space', - type: FormFieldType.Text, - required: false, - hidden: true, - }, - { - label: 'Page ID', - name: 'config.page_id', - type: FormFieldType.Text, - required: false, - hidden: true, - }, - { - label: 'Index Recursively', - name: 'config.index_recursively', - type: FormFieldType.Checkbox, - required: false, - hidden: true, - }, - ], + [DataSourceKey.CONFLUENCE]: confluenceConstant(t), [DataSourceKey.GOOGLE_DRIVE]: [ { label: 'Primary Admin Email', @@ -828,6 +776,7 @@ export const DataSourceFormFields = { required: false, }, ], + [DataSourceKey.BITBUCKET]: 
bitbucketConstant(t),
   [DataSourceKey.ZENDESK]: [
     {
       label: 'Zendesk Domain',
@@ -919,6 +868,7 @@ export const DataSourceFormDefaultValues = {
       wiki_base: '',
       is_cloud: true,
       space: '',
+      page_id: '',
       credentials: {
         confluence_username: '',
         confluence_access_token: '',
@@ -1112,6 +1062,20 @@
       },
     },
   },
+  [DataSourceKey.BITBUCKET]: {
+    name: '',
+    source: DataSourceKey.BITBUCKET,
+    config: {
+      workspace: '',
+      index_mode: 'workspace',
+      repository_slugs: '',
+      projects: '',
+      credentials: {
+        bitbucket_account_email: '',
+        bitbucket_api_token: '',
+      },
+    },
+  },
   [DataSourceKey.ZENDESK]: {
     name: '',
     source: DataSourceKey.ZENDESK,
     config: {