diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py
index 0802a5285..611c3c61a 100644
--- a/common/data_source/__init__.py
+++ b/common/data_source/__init__.py
@@ -11,7 +11,7 @@ from .confluence_connector import ConfluenceConnector
from .discord_connector import DiscordConnector
from .dropbox_connector import DropboxConnector
from .google_drive.connector import GoogleDriveConnector
-from .jira_connector import JiraConnector
+from .jira.connector import JiraConnector
from .sharepoint_connector import SharePointConnector
from .teams_connector import TeamsConnector
from .config import BlobType, DocumentSource
diff --git a/common/data_source/config.py b/common/data_source/config.py
index 02684dbac..e4040f85e 100644
--- a/common/data_source/config.py
+++ b/common/data_source/config.py
@@ -13,6 +13,7 @@ def get_current_tz_offset() -> int:
return round(time_diff.total_seconds() / 3600)
+ONE_MINUTE = 60
ONE_HOUR = 3600
ONE_DAY = ONE_HOUR * 24
@@ -42,6 +43,7 @@ class DocumentSource(str, Enum):
OCI_STORAGE = "oci_storage"
SLACK = "slack"
CONFLUENCE = "confluence"
+ JIRA = "jira"
GOOGLE_DRIVE = "google_drive"
GMAIL = "gmail"
DISCORD = "discord"
@@ -178,6 +180,21 @@ GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD = int(
os.environ.get("GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024)
)
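+# Jira connector settings (all optional, read from the environment):
+#   JIRA_CONNECTOR_LABELS_TO_SKIP  - comma-separated issue labels whose issues are skipped
+#   JIRA_CONNECTOR_MAX_TICKET_SIZE - maximum rendered ticket size in bytes (default 100 KiB)
+#   JIRA_SYNC_TIME_BUFFER_SECONDS  - time buffer in seconds applied to sync windows (default one minute)
+#   JIRA_TIMEZONE_OFFSET           - offset in hours used for Jira timestamps (default: local offset)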
+JIRA_CONNECTOR_LABELS_TO_SKIP = [
+ ignored_tag
+ for ignored_tag in os.environ.get("JIRA_CONNECTOR_LABELS_TO_SKIP", "").split(",")
+ if ignored_tag
+]
+JIRA_CONNECTOR_MAX_TICKET_SIZE = int(
+ os.environ.get("JIRA_CONNECTOR_MAX_TICKET_SIZE", 100 * 1024)
+)
+JIRA_SYNC_TIME_BUFFER_SECONDS = int(
+ os.environ.get("JIRA_SYNC_TIME_BUFFER_SECONDS", ONE_MINUTE)
+)
+JIRA_TIMEZONE_OFFSET = float(
+ os.environ.get("JIRA_TIMEZONE_OFFSET", get_current_tz_offset())
+)
+
OAUTH_SLACK_CLIENT_ID = os.environ.get("OAUTH_SLACK_CLIENT_ID", "")
OAUTH_SLACK_CLIENT_SECRET = os.environ.get("OAUTH_SLACK_CLIENT_SECRET", "")
OAUTH_CONFLUENCE_CLOUD_CLIENT_ID = os.environ.get(
diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py
index aed16ad2b..821f79862 100644
--- a/common/data_source/confluence_connector.py
+++ b/common/data_source/confluence_connector.py
@@ -1788,6 +1788,7 @@ class ConfluenceConnector(
cql_url = self.confluence_client.build_cql_url(
page_query, expand=",".join(_PAGE_EXPANSION_FIELDS)
)
+ logging.info(f"[Confluence Connector] Building CQL URL {cql_url}")
return update_param_in_path(cql_url, "limit", str(limit))
@override
diff --git a/common/data_source/jira/__init__.py b/common/data_source/jira/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/common/data_source/jira/connector.py b/common/data_source/jira/connector.py
new file mode 100644
index 000000000..4635d72f3
--- /dev/null
+++ b/common/data_source/jira/connector.py
@@ -0,0 +1,973 @@
+"""Checkpointed Jira connector that emits markdown blobs for each issue."""
+
+from __future__ import annotations
+
+import argparse
+import copy
+import logging
+import os
+import re
+from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
+from datetime import datetime, timedelta, timezone
+from typing import Any
+from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
+
+from jira import JIRA
+from jira.resources import Issue
+from pydantic import Field
+
+from common.data_source.config import (
+ INDEX_BATCH_SIZE,
+ JIRA_CONNECTOR_LABELS_TO_SKIP,
+ JIRA_CONNECTOR_MAX_TICKET_SIZE,
+ JIRA_TIMEZONE_OFFSET,
+ ONE_HOUR,
+ DocumentSource,
+)
+from common.data_source.exceptions import (
+ ConnectorMissingCredentialError,
+ ConnectorValidationError,
+ InsufficientPermissionsError,
+ UnexpectedValidationError,
+)
+from common.data_source.interfaces import (
+ CheckpointedConnectorWithPermSync,
+ CheckpointOutputWrapper,
+ SecondsSinceUnixEpoch,
+ SlimConnectorWithPermSync,
+)
+from common.data_source.jira.utils import (
+ JIRA_CLOUD_API_VERSION,
+ JIRA_SERVER_API_VERSION,
+ build_issue_url,
+ extract_body_text,
+ extract_named_value,
+ extract_user,
+ format_attachments,
+ format_comments,
+ parse_jira_datetime,
+ should_skip_issue,
+)
+from common.data_source.models import (
+ ConnectorCheckpoint,
+ ConnectorFailure,
+ Document,
+ DocumentFailure,
+ SlimDocument,
+)
+from common.data_source.utils import is_atlassian_cloud_url, is_atlassian_date_error, scoped_url
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_FIELDS = "summary,description,updated,created,status,priority,assignee,reporter,labels,issuetype,project,comment,attachment"
+_SLIM_FIELDS = "key,project"
+_MAX_RESULTS_FETCH_IDS = 5000
+_JIRA_SLIM_PAGE_SIZE = 500
+_JIRA_FULL_PAGE_SIZE = 50
+_DEFAULT_ATTACHMENT_SIZE_LIMIT = 10 * 1024 * 1024 # 10MB
+
+
+class JiraCheckpoint(ConnectorCheckpoint):
+ """Checkpoint that tracks which slice of the current JQL result set was emitted."""
+
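+    # start_at: offset of the next issue to emit (Server/Data Center offset pagination)
+    # cursor: nextPageToken returned by the Cloud enhanced search, if any
+    # ids_done: True once every matching issue id has been collected (Cloud)
+    # all_issue_ids: queued id batches still waiting to be bulk-fetched (Cloud)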
+ start_at: int = 0
+ cursor: str | None = None
+ ids_done: bool = False
+ all_issue_ids: list[list[str]] = Field(default_factory=list)
+
+
+_TZ_OFFSET_PATTERN = re.compile(r"([+-])(\d{2})(:?)(\d{2})$")
+
+
+class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync):
+ """Retrieve Jira issues and emit them as markdown documents."""
+
+ def __init__(
+ self,
+ jira_base_url: str,
+ project_key: str | None = None,
+ jql_query: str | None = None,
+ batch_size: int = INDEX_BATCH_SIZE,
+ include_comments: bool = True,
+ include_attachments: bool = False,
+ labels_to_skip: Sequence[str] | None = None,
+ comment_email_blacklist: Sequence[str] | None = None,
+ scoped_token: bool = False,
+ attachment_size_limit: int | None = None,
+ timezone_offset: float | None = None,
+ ) -> None:
+ if not jira_base_url:
+ raise ConnectorValidationError("Jira base URL must be provided.")
+
+ self.jira_base_url = jira_base_url.rstrip("/")
+ self.project_key = project_key
+ self.jql_query = jql_query
+ self.batch_size = batch_size
+ self.include_comments = include_comments
+ self.include_attachments = include_attachments
+ configured_labels = labels_to_skip or JIRA_CONNECTOR_LABELS_TO_SKIP
+ self.labels_to_skip = {label.lower() for label in configured_labels}
+ self.comment_email_blacklist = {email.lower() for email in comment_email_blacklist or []}
+ self.scoped_token = scoped_token
+ self.jira_client: JIRA | None = None
+
+ self.max_ticket_size = JIRA_CONNECTOR_MAX_TICKET_SIZE
+ self.attachment_size_limit = attachment_size_limit if attachment_size_limit and attachment_size_limit > 0 else _DEFAULT_ATTACHMENT_SIZE_LIMIT
+ self._fields_param = _DEFAULT_FIELDS
+ self._slim_fields = _SLIM_FIELDS
+
+ tz_offset_value = float(timezone_offset) if timezone_offset is not None else float(JIRA_TIMEZONE_OFFSET)
+ self.timezone_offset = tz_offset_value
+ self.timezone = timezone(offset=timedelta(hours=tz_offset_value))
+ self._timezone_overridden = timezone_offset is not None
+
+ # -------------------------------------------------------------------------
+ # Connector lifecycle helpers
+ # -------------------------------------------------------------------------
+
+ def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
+ """Instantiate the Jira client using either an API token or username/password."""
+ jira_url_for_client = self.jira_base_url
+ if self.scoped_token:
+ if is_atlassian_cloud_url(self.jira_base_url):
+ try:
+ jira_url_for_client = scoped_url(self.jira_base_url, "jira")
+ except ValueError as exc:
+ raise ConnectorValidationError(str(exc)) from exc
+ else:
+ logger.warning(f"[Jira] Scoped token requested but Jira base URL {self.jira_base_url} does not appear to be an Atlassian Cloud domain; scoped token ignored.")
+
+ user_email = credentials.get("jira_user_email") or credentials.get("username")
+ api_token = credentials.get("jira_api_token") or credentials.get("token") or credentials.get("api_token")
+ password = credentials.get("jira_password") or credentials.get("password")
+ rest_api_version = credentials.get("rest_api_version")
+
+ if not rest_api_version:
+ rest_api_version = JIRA_CLOUD_API_VERSION if api_token else JIRA_SERVER_API_VERSION
+ options: dict[str, Any] = {"rest_api_version": rest_api_version}
+
+ try:
+ if user_email and api_token:
+ self.jira_client = JIRA(
+ server=jira_url_for_client,
+ basic_auth=(user_email, api_token),
+ options=options,
+ )
+ elif api_token:
+ self.jira_client = JIRA(
+ server=jira_url_for_client,
+ token_auth=api_token,
+ options=options,
+ )
+ elif user_email and password:
+ self.jira_client = JIRA(
+ server=jira_url_for_client,
+ basic_auth=(user_email, password),
+ options=options,
+ )
+ else:
+ raise ConnectorMissingCredentialError("Jira credentials must include either an API token or username/password.")
+ except Exception as exc: # pragma: no cover - jira lib raises many types
+ raise ConnectorMissingCredentialError(f"Jira: {exc}") from exc
+ self._sync_timezone_from_server()
+ return None
+
+ def validate_connector_settings(self) -> None:
+ """Validate connectivity by fetching basic Jira info."""
+ if not self.jira_client:
+ raise ConnectorMissingCredentialError("Jira")
+
+ try:
+ if self.jql_query:
+ dummy_checkpoint = self.build_dummy_checkpoint()
+ checkpoint_callback = self._make_checkpoint_callback(dummy_checkpoint)
+ iterator = self._perform_jql_search(
+ jql=self.jql_query,
+ start=0,
+ max_results=1,
+ fields="key",
+ all_issue_ids=dummy_checkpoint.all_issue_ids,
+ checkpoint_callback=checkpoint_callback,
+ next_page_token=dummy_checkpoint.cursor,
+ ids_done=dummy_checkpoint.ids_done,
+ )
+ next(iter(iterator), None)
+ elif self.project_key:
+ self.jira_client.project(self.project_key)
+ else:
+ self.jira_client.projects()
+ except Exception as exc: # pragma: no cover - dependent on Jira responses
+ self._handle_validation_error(exc)
+
+ # -------------------------------------------------------------------------
+ # Checkpointed connector implementation
+ # -------------------------------------------------------------------------
+
+ def load_from_checkpoint(
+ self,
+ start: SecondsSinceUnixEpoch,
+ end: SecondsSinceUnixEpoch,
+ checkpoint: JiraCheckpoint,
+ ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]:
+ """Load Jira issues, emitting a Document per issue."""
+ try:
+ return (yield from self._load_with_retry(start, end, checkpoint))
+ except Exception as exc:
+ logger.exception(f"[Jira] Jira query ultimately failed: {exc}")
+ yield ConnectorFailure(
+ failure_message=f"Failed to query Jira: {exc}",
+ exception=exc,
+ )
+ return JiraCheckpoint(has_more=False, start_at=checkpoint.start_at)
+
+ def load_from_checkpoint_with_perm_sync(
+ self,
+ start: SecondsSinceUnixEpoch,
+ end: SecondsSinceUnixEpoch,
+ checkpoint: JiraCheckpoint,
+ ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]:
+ """Permissions are not synced separately, so reuse the standard loader."""
+ return (yield from self.load_from_checkpoint(start=start, end=end, checkpoint=checkpoint))
+
+ def _load_with_retry(
+ self,
+ start: SecondsSinceUnixEpoch,
+ end: SecondsSinceUnixEpoch,
+ checkpoint: JiraCheckpoint,
+ ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]:
+ if not self.jira_client:
+ raise ConnectorMissingCredentialError("Jira")
+
+ attempt_start = start
+ retried_with_buffer = False
+ attempt = 0
+
+ while True:
+ attempt += 1
+ jql = self._build_jql(attempt_start, end)
+ logger.info(f"[Jira] Executing Jira JQL attempt {attempt} (start={attempt_start}, end={end}, buffered_retry={retried_with_buffer}): {jql}")
+ try:
+ return (yield from self._load_from_checkpoint_internal(jql, checkpoint, start_filter=start))
+ except Exception as exc:
+ if attempt_start is not None and not retried_with_buffer and is_atlassian_date_error(exc):
+ attempt_start = attempt_start - ONE_HOUR
+ retried_with_buffer = True
+ logger.info(f"[Jira] Atlassian date error detected; retrying with start={attempt_start}.")
+ continue
+ raise
+
+ def _handle_validation_error(self, exc: Exception) -> None:
+ status_code = getattr(exc, "status_code", None)
+ if status_code == 401:
+ raise InsufficientPermissionsError("Jira credential appears to be invalid or expired (HTTP 401).") from exc
+ if status_code == 403:
+ raise InsufficientPermissionsError("Jira token does not have permission to access the requested resources (HTTP 403).") from exc
+ if status_code == 404:
+ raise ConnectorValidationError("Jira resource not found (HTTP 404).") from exc
+ if status_code == 429:
+ raise ConnectorValidationError("Jira rate limit exceeded during validation (HTTP 429).") from exc
+
+ message = getattr(exc, "text", str(exc))
+ if not message:
+ raise UnexpectedValidationError("Unexpected Jira validation error.") from exc
+
+ raise ConnectorValidationError(f"Jira validation failed: {message}") from exc
+
+ def _load_from_checkpoint_internal(
+ self,
+ jql: str,
+ checkpoint: JiraCheckpoint,
+ start_filter: SecondsSinceUnixEpoch | None = None,
+ ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]:
+ assert self.jira_client, "load_credentials must be called before loading issues."
+
+ page_size = self._full_page_size()
+ new_checkpoint = copy.deepcopy(checkpoint)
+ starting_offset = new_checkpoint.start_at or 0
+ current_offset = starting_offset
+ checkpoint_callback = self._make_checkpoint_callback(new_checkpoint)
+
+ issue_iter = self._perform_jql_search(
+ jql=jql,
+ start=current_offset,
+ max_results=page_size,
+ fields=self._fields_param,
+ all_issue_ids=new_checkpoint.all_issue_ids,
+ checkpoint_callback=checkpoint_callback,
+ next_page_token=new_checkpoint.cursor,
+ ids_done=new_checkpoint.ids_done,
+ )
+
+ start_cutoff = float(start_filter) if start_filter is not None else None
+
+ for issue in issue_iter:
+ current_offset += 1
+ issue_key = getattr(issue, "key", "unknown")
+ if should_skip_issue(issue, self.labels_to_skip):
+ continue
+
+ issue_updated = parse_jira_datetime(issue.raw.get("fields", {}).get("updated"))
+ if start_cutoff is not None and issue_updated is not None and issue_updated.timestamp() <= start_cutoff:
+ # Jira JQL only supports minute precision, so we discard already-processed
+ # issues here based on the original second-level cutoff.
+ continue
+
+ try:
+ document = self._issue_to_document(issue)
+ except Exception as exc: # pragma: no cover - defensive
+ logger.exception(f"[Jira] Failed to convert Jira issue {issue_key}: {exc}")
+ yield ConnectorFailure(
+ failure_message=f"Failed to convert Jira issue {issue_key}: {exc}",
+ failed_document=DocumentFailure(
+ document_id=issue_key,
+ document_link=build_issue_url(self.jira_base_url, issue_key),
+ ),
+ exception=exc,
+ )
+ continue
+
+ if document is not None:
+ yield document
+ if self.include_attachments:
+ for attachment_document in self._attachment_documents(issue):
+ if attachment_document is not None:
+ yield attachment_document
+
+ self._update_checkpoint_for_next_run(
+ checkpoint=new_checkpoint,
+ current_offset=current_offset,
+ starting_offset=starting_offset,
+ page_size=page_size,
+ )
+ new_checkpoint.start_at = current_offset
+ return new_checkpoint
+
+ def build_dummy_checkpoint(self) -> JiraCheckpoint:
+ """Create an empty checkpoint used to kick off ingestion."""
+ return JiraCheckpoint(has_more=True, start_at=0)
+
+ def validate_checkpoint_json(self, checkpoint_json: str) -> JiraCheckpoint:
+ """Validate a serialized checkpoint."""
+ return JiraCheckpoint.model_validate_json(checkpoint_json)
+
+ # -------------------------------------------------------------------------
+ # Slim connector implementation
+ # -------------------------------------------------------------------------
+
+ def retrieve_all_slim_docs_perm_sync(
+ self,
+ start: SecondsSinceUnixEpoch | None = None,
+ end: SecondsSinceUnixEpoch | None = None,
+ callback: Any = None, # noqa: ARG002 - maintained for interface compatibility
+ ) -> Generator[list[SlimDocument], None, None]:
+ """Return lightweight references to Jira issues (used for permission syncing)."""
+ if not self.jira_client:
+ raise ConnectorMissingCredentialError("Jira")
+
+ start_ts = start if start is not None else 0
+ end_ts = end if end is not None else datetime.now(timezone.utc).timestamp()
+ jql = self._build_jql(start_ts, end_ts)
+
+ checkpoint = self.build_dummy_checkpoint()
+ checkpoint_callback = self._make_checkpoint_callback(checkpoint)
+ prev_offset = 0
+ current_offset = 0
+ slim_batch: list[SlimDocument] = []
+
+ while checkpoint.has_more:
+ for issue in self._perform_jql_search(
+ jql=jql,
+ start=current_offset,
+ max_results=_JIRA_SLIM_PAGE_SIZE,
+ fields=self._slim_fields,
+ all_issue_ids=checkpoint.all_issue_ids,
+ checkpoint_callback=checkpoint_callback,
+ next_page_token=checkpoint.cursor,
+ ids_done=checkpoint.ids_done,
+ ):
+ current_offset += 1
+ if should_skip_issue(issue, self.labels_to_skip):
+ continue
+
+ doc_id = build_issue_url(self.jira_base_url, issue.key)
+ slim_batch.append(SlimDocument(id=doc_id))
+
+ if len(slim_batch) >= _JIRA_SLIM_PAGE_SIZE:
+ yield slim_batch
+ slim_batch = []
+
+ self._update_checkpoint_for_next_run(
+ checkpoint=checkpoint,
+ current_offset=current_offset,
+ starting_offset=prev_offset,
+ page_size=_JIRA_SLIM_PAGE_SIZE,
+ )
+ prev_offset = current_offset
+
+ if slim_batch:
+ yield slim_batch
+
+ # -------------------------------------------------------------------------
+ # Internal helpers
+ # -------------------------------------------------------------------------
+
+ def _build_jql(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> str:
+ clauses: list[str] = []
+ if self.jql_query:
+ clauses.append(f"({self.jql_query})")
+ elif self.project_key:
+ clauses.append(f'project = "{self.project_key}"')
+ else:
+ raise ConnectorValidationError("Either project_key or jql_query must be provided for Jira connector.")
+
+ if self.labels_to_skip:
+ labels = ", ".join(f'"{label}"' for label in self.labels_to_skip)
+ clauses.append(f"labels NOT IN ({labels})")
+
+ if start is not None:
+ clauses.append(f'updated >= "{self._format_jql_time(start)}"')
+ if end is not None:
+ clauses.append(f'updated <= "{self._format_jql_time(end)}"')
+
+ if not clauses:
+ raise ConnectorValidationError("Unable to build Jira JQL query.")
+
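+        # Example result (hypothetical project "ENG" over a one-day window):
+        #   project = "ENG" AND updated >= "2024-05-01 00:00" AND updated <= "2024-05-02 00:00"
+        #   ORDER BY updated ASC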
+ jql = " AND ".join(clauses)
+ if "order by" not in jql.lower():
+ jql = f"{jql} ORDER BY updated ASC"
+ return jql
+
+ def _format_jql_time(self, timestamp: SecondsSinceUnixEpoch) -> str:
+ dt_utc = datetime.fromtimestamp(float(timestamp), tz=timezone.utc)
+ dt_local = dt_utc.astimezone(self.timezone)
+ # Jira only accepts minute-precision timestamps in JQL, so we format accordingly
+ # and rely on a post-query second-level filter to avoid duplicates.
+ return dt_local.strftime("%Y-%m-%d %H:%M")
+
+ def _issue_to_document(self, issue: Issue) -> Document | None:
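+        # Render the issue as a markdown blob: a front-matter style metadata block followed
+        # by Description, Comments and Attachments sections.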
+ fields = issue.raw.get("fields", {})
+ summary = fields.get("summary") or ""
+ description_text = extract_body_text(fields.get("description"))
+ comments_text = (
+ format_comments(
+ fields.get("comment"),
+ blacklist=self.comment_email_blacklist,
+ )
+ if self.include_comments
+ else ""
+ )
+ attachments_text = format_attachments(fields.get("attachment"))
+
+ reporter_name, reporter_email = extract_user(fields.get("reporter"))
+ assignee_name, assignee_email = extract_user(fields.get("assignee"))
+ status = extract_named_value(fields.get("status"))
+ priority = extract_named_value(fields.get("priority"))
+ issue_type = extract_named_value(fields.get("issuetype"))
+ project = fields.get("project") or {}
+
+ issue_url = build_issue_url(self.jira_base_url, issue.key)
+
+ metadata_lines = [
+ f"key: {issue.key}",
+ f"url: {issue_url}",
+ f"summary: {summary}",
+ f"status: {status or 'Unknown'}",
+ f"priority: {priority or 'Unspecified'}",
+ f"issue_type: {issue_type or 'Unknown'}",
+ f"project: {project.get('name') or ''}",
+ f"project_key: {project.get('key') or self.project_key or ''}",
+ ]
+
+ if reporter_name:
+ metadata_lines.append(f"reporter: {reporter_name}")
+ if reporter_email:
+ metadata_lines.append(f"reporter_email: {reporter_email}")
+ if assignee_name:
+ metadata_lines.append(f"assignee: {assignee_name}")
+ if assignee_email:
+ metadata_lines.append(f"assignee_email: {assignee_email}")
+ if fields.get("labels"):
+ metadata_lines.append(f"labels: {', '.join(fields.get('labels'))}")
+
+ created_dt = parse_jira_datetime(fields.get("created"))
+ updated_dt = parse_jira_datetime(fields.get("updated")) or created_dt or datetime.now(timezone.utc)
+ metadata_lines.append(f"created: {created_dt.isoformat() if created_dt else ''}")
+ metadata_lines.append(f"updated: {updated_dt.isoformat() if updated_dt else ''}")
+
+ sections: list[str] = [
+ "---",
+ "\n".join(filter(None, metadata_lines)),
+ "---",
+ "",
+ "## Description",
+ description_text or "No description provided.",
+ ]
+
+ if comments_text:
+ sections.extend(["", "## Comments", comments_text])
+ if attachments_text:
+ sections.extend(["", "## Attachments", attachments_text])
+
+ blob_text = "\n".join(sections).strip() + "\n"
+ blob = blob_text.encode("utf-8")
+
+ if len(blob) > self.max_ticket_size:
+ logger.info(f"[Jira] Skipping {issue.key} because it exceeds the maximum size of {self.max_ticket_size} bytes.")
+ return None
+
+ semantic_identifier = f"{issue.key}: {summary}" if summary else issue.key
+
+ return Document(
+ id=issue_url,
+ source=DocumentSource.JIRA,
+ semantic_identifier=semantic_identifier,
+ extension=".md",
+ blob=blob,
+ doc_updated_at=updated_dt,
+ size_bytes=len(blob),
+ )
+
+ def _attachment_documents(self, issue: Issue) -> Iterable[Document]:
+ attachments = issue.raw.get("fields", {}).get("attachment") or []
+ for attachment in attachments:
+ try:
+ document = self._attachment_to_document(issue, attachment)
+ if document is not None:
+ yield document
+ except Exception as exc: # pragma: no cover - defensive
+ failed_id = attachment.get("id") or attachment.get("filename")
+ issue_key = getattr(issue, "key", "unknown")
+ logger.warning(f"[Jira] Failed to process attachment {failed_id} for issue {issue_key}: {exc}")
+
+ def _attachment_to_document(self, issue: Issue, attachment: dict[str, Any]) -> Document | None:
+ if not self.include_attachments:
+ return None
+
+ filename = attachment.get("filename")
+ content_url = attachment.get("content")
+ if not filename or not content_url:
+ return None
+
+ try:
+ attachment_size = int(attachment.get("size", 0))
+ except (TypeError, ValueError):
+ attachment_size = 0
+ if attachment_size and attachment_size > self.attachment_size_limit:
+ logger.info(f"[Jira] Skipping attachment {filename} on {issue.key} because reported size exceeds limit ({self.attachment_size_limit} bytes).")
+ return None
+
+ blob = self._download_attachment(content_url)
+ if blob is None:
+ return None
+
+ if len(blob) > self.attachment_size_limit:
+ logger.info(f"[Jira] Skipping attachment {filename} on {issue.key} because it exceeds the size limit ({self.attachment_size_limit} bytes).")
+ return None
+
+ attachment_time = parse_jira_datetime(attachment.get("created")) or parse_jira_datetime(attachment.get("updated"))
+ updated_dt = attachment_time or parse_jira_datetime(issue.raw.get("fields", {}).get("updated")) or datetime.now(timezone.utc)
+
+ extension = os.path.splitext(filename)[1] or ""
+ document_id = f"{issue.key}::attachment::{attachment.get('id') or filename}"
+ semantic_identifier = f"{issue.key} attachment: {filename}"
+
+ return Document(
+ id=document_id,
+ source=DocumentSource.JIRA,
+ semantic_identifier=semantic_identifier,
+ extension=extension,
+ blob=blob,
+ doc_updated_at=updated_dt,
+ size_bytes=len(blob),
+ )
+
+ def _download_attachment(self, url: str) -> bytes | None:
+ if not self.jira_client:
+ raise ConnectorMissingCredentialError("Jira")
+ response = self.jira_client._session.get(url)
+ response.raise_for_status()
+ return response.content
+
+ def _sync_timezone_from_server(self) -> None:
+ if self._timezone_overridden or not self.jira_client:
+ return
+ try:
+ server_info = self.jira_client.server_info()
+ except Exception as exc: # pragma: no cover - defensive
+ logger.info(f"[Jira] Unable to determine timezone from server info; continuing with offset {self.timezone_offset}. Error: {exc}")
+ return
+
+ detected_offset = self._extract_timezone_offset(server_info)
+ if detected_offset is None or detected_offset == self.timezone_offset:
+ return
+
+ self.timezone_offset = detected_offset
+ self.timezone = timezone(offset=timedelta(hours=detected_offset))
+ logger.info(f"[Jira] Timezone offset adjusted to {detected_offset} hours using Jira server info.")
+
+ def _extract_timezone_offset(self, server_info: dict[str, Any]) -> float | None:
+ server_time_raw = server_info.get("serverTime")
+ if isinstance(server_time_raw, str):
+ offset = self._parse_offset_from_datetime_string(server_time_raw)
+ if offset is not None:
+ return offset
+
+ tz_name = server_info.get("timeZone")
+ if isinstance(tz_name, str):
+ offset = self._offset_from_zone_name(tz_name)
+ if offset is not None:
+ return offset
+ return None
+
+ @staticmethod
+ def _parse_offset_from_datetime_string(value: str) -> float | None:
+ normalized = JiraConnector._normalize_datetime_string(value)
+ try:
+ dt = datetime.fromisoformat(normalized)
+ except ValueError:
+ return None
+
+ if dt.tzinfo is None:
+ return 0.0
+
+ offset = dt.tzinfo.utcoffset(dt)
+ if offset is None:
+ return None
+ return offset.total_seconds() / 3600.0
+
+ @staticmethod
+ def _normalize_datetime_string(value: str) -> str:
+ trimmed = (value or "").strip()
+ if trimmed.endswith("Z"):
+ return f"{trimmed[:-1]}+00:00"
+
+ match = _TZ_OFFSET_PATTERN.search(trimmed)
+ if match and match.group(3) != ":":
+ sign, hours, _, minutes = match.groups()
+ trimmed = f"{trimmed[: match.start()]}{sign}{hours}:{minutes}"
+ return trimmed
+
+ @staticmethod
+ def _offset_from_zone_name(name: str) -> float | None:
+ try:
+ tz = ZoneInfo(name)
+ except (ZoneInfoNotFoundError, ValueError):
+ return None
+ reference = datetime.now(tz)
+ offset = reference.utcoffset()
+ if offset is None:
+ return None
+ return offset.total_seconds() / 3600.0
+
+ def _is_cloud_client(self) -> bool:
+ if not self.jira_client:
+ return False
+ rest_version = str(self.jira_client._options.get("rest_api_version", "")).strip()
+ return rest_version == str(JIRA_CLOUD_API_VERSION)
+
+ def _full_page_size(self) -> int:
+ return max(1, min(self.batch_size, _JIRA_FULL_PAGE_SIZE))
+
+ def _perform_jql_search(
+ self,
+ *,
+ jql: str,
+ start: int,
+ max_results: int,
+ fields: str | None = None,
+ all_issue_ids: list[list[str]] | None = None,
+ checkpoint_callback: Callable[[Iterable[list[str]], str | None], None] | None = None,
+ next_page_token: str | None = None,
+ ids_done: bool = False,
+ ) -> Iterable[Issue]:
+ assert self.jira_client, "Jira client not initialized."
+ is_cloud = self._is_cloud_client()
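+        # Jira Cloud (REST v3) is queried via the enhanced "search/jql" + "issue/bulkfetch"
+        # endpoints; Server/Data Center (REST v2) falls back to classic offset pagination.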
+ if is_cloud:
+ if all_issue_ids is None:
+ raise ValueError("all_issue_ids is required for Jira Cloud searches.")
+ yield from self._perform_jql_search_v3(
+ jql=jql,
+ max_results=max_results,
+ fields=fields,
+ all_issue_ids=all_issue_ids,
+ checkpoint_callback=checkpoint_callback,
+ next_page_token=next_page_token,
+ ids_done=ids_done,
+ )
+ else:
+ yield from self._perform_jql_search_v2(
+ jql=jql,
+ start=start,
+ max_results=max_results,
+ fields=fields,
+ )
+
+ def _perform_jql_search_v3(
+ self,
+ *,
+ jql: str,
+ max_results: int,
+ all_issue_ids: list[list[str]],
+ fields: str | None = None,
+ checkpoint_callback: Callable[[Iterable[list[str]], str | None], None] | None = None,
+ next_page_token: str | None = None,
+ ids_done: bool = False,
+ ) -> Iterable[Issue]:
+ assert self.jira_client, "Jira client not initialized."
+
+ if not ids_done:
+ new_ids, page_token = self._enhanced_search_ids(jql, next_page_token)
+ if checkpoint_callback is not None and new_ids:
+ checkpoint_callback(
+ self._chunk_issue_ids(new_ids, max_results),
+ page_token,
+ )
+ elif checkpoint_callback is not None:
+ checkpoint_callback([], page_token)
+
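+        # Consume at most one queued id batch per call; the remaining batches stay on the
+        # checkpoint so a later run can resume where this one stopped.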
+ if all_issue_ids:
+ issue_ids = all_issue_ids.pop()
+ if issue_ids:
+ yield from self._bulk_fetch_issues(issue_ids, fields)
+
+ def _perform_jql_search_v2(
+ self,
+ *,
+ jql: str,
+ start: int,
+ max_results: int,
+ fields: str | None = None,
+ ) -> Iterable[Issue]:
+ assert self.jira_client, "Jira client not initialized."
+
+ issues = self.jira_client.search_issues(
+ jql_str=jql,
+ startAt=start,
+ maxResults=max_results,
+ fields=fields or self._fields_param,
+ expand="renderedFields",
+ )
+ for issue in issues:
+ yield issue
+
+ def _enhanced_search_ids(
+ self,
+ jql: str,
+ next_page_token: str | None,
+ ) -> tuple[list[str], str | None]:
+ assert self.jira_client, "Jira client not initialized."
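+        # Cloud "enhanced" JQL search: returns matching issue ids plus a nextPageToken
+        # cursor instead of startAt/maxResults offsets.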
+ enhanced_search_path = self.jira_client._get_url("search/jql")
+ params: dict[str, str | int | None] = {
+ "jql": jql,
+ "maxResults": _MAX_RESULTS_FETCH_IDS,
+ "nextPageToken": next_page_token,
+ "fields": "id",
+ }
+ response = self.jira_client._session.get(enhanced_search_path, params=params)
+ response.raise_for_status()
+ data = response.json()
+ return [str(issue["id"]) for issue in data.get("issues", [])], data.get("nextPageToken")
+
+ def _bulk_fetch_issues(
+ self,
+ issue_ids: list[str],
+ fields: str | None,
+ ) -> Iterable[Issue]:
+ assert self.jira_client, "Jira client not initialized."
+ if not issue_ids:
+ return []
+
+ bulk_fetch_path = self.jira_client._get_url("issue/bulkfetch")
+ payload: dict[str, Any] = {"issueIdsOrKeys": issue_ids}
+ payload["fields"] = fields.split(",") if fields else ["*all"]
+
+ response = self.jira_client._session.post(bulk_fetch_path, json=payload)
+ response.raise_for_status()
+ data = response.json()
+ return [Issue(self.jira_client._options, self.jira_client._session, raw=issue) for issue in data.get("issues", [])]
+
+ @staticmethod
+ def _chunk_issue_ids(issue_ids: list[str], chunk_size: int) -> Iterable[list[str]]:
+ if chunk_size <= 0:
+ chunk_size = _JIRA_FULL_PAGE_SIZE
+
+ for idx in range(0, len(issue_ids), chunk_size):
+ yield issue_ids[idx : idx + chunk_size]
+
+ def _make_checkpoint_callback(self, checkpoint: JiraCheckpoint) -> Callable[[Iterable[list[str]], str | None], None]:
+ def checkpoint_callback(
+ issue_ids: Iterable[list[str]] | list[list[str]],
+ page_token: str | None,
+ ) -> None:
+ for id_batch in issue_ids:
+ checkpoint.all_issue_ids.append(list(id_batch))
+ checkpoint.cursor = page_token
+ checkpoint.ids_done = page_token is None
+
+ return checkpoint_callback
+
+ def _update_checkpoint_for_next_run(
+ self,
+ *,
+ checkpoint: JiraCheckpoint,
+ current_offset: int,
+ starting_offset: int,
+ page_size: int,
+ ) -> None:
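+        # Cloud: more work remains while id batches are queued or the id scan is unfinished.
+        # Server/DC: a full page implies another page may exist at the next offset.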
+ if self._is_cloud_client():
+ checkpoint.has_more = bool(checkpoint.all_issue_ids) or not checkpoint.ids_done
+ else:
+ checkpoint.has_more = current_offset - starting_offset == page_size
+ checkpoint.cursor = None
+ checkpoint.ids_done = True
+ checkpoint.all_issue_ids = []
+
+
+def iterate_jira_documents(
+ connector: "JiraConnector",
+ start: SecondsSinceUnixEpoch,
+ end: SecondsSinceUnixEpoch,
+ iteration_limit: int = 100_000,
+) -> Iterator[Document]:
+ """Yield documents without materializing the entire result set."""
+
+ checkpoint = connector.build_dummy_checkpoint()
+ iterations = 0
+
+ while checkpoint.has_more:
+ wrapper = CheckpointOutputWrapper[JiraCheckpoint]()
+ generator = wrapper(connector.load_from_checkpoint(start=start, end=end, checkpoint=checkpoint))
+
+ for document, failure, next_checkpoint in generator:
+ if failure is not None:
+ failure_message = getattr(failure, "failure_message", str(failure))
+ raise RuntimeError(f"Failed to load Jira documents: {failure_message}")
+ if document is not None:
+ yield document
+ if next_checkpoint is not None:
+ checkpoint = next_checkpoint
+
+ iterations += 1
+ if iterations > iteration_limit:
+ raise RuntimeError("Too many iterations while loading Jira documents.")
+
+
+def test_jira(
+ *,
+ base_url: str,
+ project_key: str | None = None,
+ jql_query: str | None = None,
+ credentials: dict[str, Any],
+ batch_size: int = INDEX_BATCH_SIZE,
+ start_ts: float | None = None,
+ end_ts: float | None = None,
+ connector_options: dict[str, Any] | None = None,
+) -> list[Document]:
+ """Programmatic entry point that mirrors the CLI workflow."""
+
+ connector_kwargs = connector_options.copy() if connector_options else {}
+ connector = JiraConnector(
+ jira_base_url=base_url,
+ project_key=project_key,
+ jql_query=jql_query,
+ batch_size=batch_size,
+ **connector_kwargs,
+ )
+ connector.load_credentials(credentials)
+ connector.validate_connector_settings()
+
+ now_ts = datetime.now(timezone.utc).timestamp()
+ start = start_ts if start_ts is not None else 0.0
+ end = end_ts if end_ts is not None else now_ts
+
+ documents = list(iterate_jira_documents(connector, start=start, end=end))
+ logger.info(f"[Jira] Fetched {len(documents)} Jira documents.")
+ for doc in documents[:5]:
+ logger.info(f"[Jira] Document {doc.semantic_identifier} ({doc.id}) size={doc.size_bytes} bytes")
+ return documents
+
+
+def _build_arg_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(description="Fetch Jira issues and print summary statistics.")
+ parser.add_argument("--base-url", dest="base_url", default=os.environ.get("JIRA_BASE_URL"))
+ parser.add_argument("--project", dest="project_key", default=os.environ.get("JIRA_PROJECT_KEY"))
+ parser.add_argument("--jql", dest="jql_query", default=os.environ.get("JIRA_JQL"))
+ parser.add_argument("--email", dest="user_email", default=os.environ.get("JIRA_USER_EMAIL"))
+ parser.add_argument("--token", dest="api_token", default=os.environ.get("JIRA_API_TOKEN"))
+ parser.add_argument("--password", dest="password", default=os.environ.get("JIRA_PASSWORD"))
+ parser.add_argument("--batch-size", dest="batch_size", type=int, default=int(os.environ.get("JIRA_BATCH_SIZE", INDEX_BATCH_SIZE)))
+ parser.add_argument("--include_comments", dest="include_comments", type=bool, default=True)
+ parser.add_argument("--include_attachments", dest="include_attachments", type=bool, default=True)
+ parser.add_argument("--attachment_size_limit", dest="attachment_size_limit", type=float, default=_DEFAULT_ATTACHMENT_SIZE_LIMIT)
+ parser.add_argument("--start-ts", dest="start_ts", type=float, default=None, help="Epoch seconds inclusive lower bound for updated issues.")
+ parser.add_argument("--end-ts", dest="end_ts", type=float, default=9999999999, help="Epoch seconds inclusive upper bound for updated issues.")
+ return parser
+
+
+def main(config: dict[str, Any] | None = None) -> None:
+ if config is None:
+ args = _build_arg_parser().parse_args()
+ config = {
+ "base_url": args.base_url,
+ "project_key": args.project_key,
+ "jql_query": args.jql_query,
+ "batch_size": args.batch_size,
+ "start_ts": args.start_ts,
+ "end_ts": args.end_ts,
+ "include_comments": args.include_comments,
+ "include_attachments": args.include_attachments,
+ "attachment_size_limit": args.attachment_size_limit,
+ "credentials": {
+ "jira_user_email": args.user_email,
+ "jira_api_token": args.api_token,
+ "jira_password": args.password,
+ },
+ }
+
+ base_url = config.get("base_url")
+ credentials = config.get("credentials", {})
+
+ print(f"[Jira] {config=}", flush=True)
+ print(f"[Jira] {credentials=}", flush=True)
+
+ if not base_url:
+ raise RuntimeError("Jira base URL must be provided via config or CLI arguments.")
+ if not (credentials.get("jira_api_token") or (credentials.get("jira_user_email") and credentials.get("jira_password"))):
+ raise RuntimeError("Provide either an API token or both email/password for Jira authentication.")
+
+ connector_options = {
+ key: value
+ for key, value in (
+ ("include_comments", config.get("include_comments")),
+ ("include_attachments", config.get("include_attachments")),
+ ("attachment_size_limit", config.get("attachment_size_limit")),
+ ("labels_to_skip", config.get("labels_to_skip")),
+ ("comment_email_blacklist", config.get("comment_email_blacklist")),
+ ("scoped_token", config.get("scoped_token")),
+ ("timezone_offset", config.get("timezone_offset")),
+ )
+ if value is not None
+ }
+
+ documents = test_jira(
+ base_url=base_url,
+ project_key=config.get("project_key"),
+ jql_query=config.get("jql_query"),
+ credentials=credentials,
+ batch_size=config.get("batch_size", INDEX_BATCH_SIZE),
+ start_ts=config.get("start_ts"),
+ end_ts=config.get("end_ts"),
+ connector_options=connector_options,
+ )
+
+ preview_count = min(len(documents), 5)
+ for idx in range(preview_count):
+ doc = documents[idx]
+ print(f"[Jira] [Sample {idx + 1}] {doc.semantic_identifier} | id={doc.id} | size={doc.size_bytes} bytes")
+
+ print(f"[Jira] Jira connector test completed. Documents fetched: {len(documents)}")
+
+
+if __name__ == "__main__": # pragma: no cover - manual execution path
+ logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(name)s %(message)s")
+ main()
diff --git a/common/data_source/jira/utils.py b/common/data_source/jira/utils.py
new file mode 100644
index 000000000..62219d36d
--- /dev/null
+++ b/common/data_source/jira/utils.py
@@ -0,0 +1,149 @@
+"""Helper utilities for the Jira connector."""
+
+from __future__ import annotations
+
+import os
+from collections.abc import Collection
+from datetime import datetime, timezone
+from typing import Any, Iterable
+
+from jira.resources import Issue
+
+from common.data_source.utils import datetime_from_string
+
+JIRA_SERVER_API_VERSION = os.environ.get("JIRA_SERVER_API_VERSION", "2")
+JIRA_CLOUD_API_VERSION = os.environ.get("JIRA_CLOUD_API_VERSION", "3")
+
+
+def build_issue_url(base_url: str, issue_key: str) -> str:
+ """Return the canonical UI URL for a Jira issue."""
+ return f"{base_url.rstrip('/')}/browse/{issue_key}"
+
+
+def parse_jira_datetime(value: Any) -> datetime | None:
+ """Best-effort parse of Jira datetime values to aware UTC datetimes."""
+ if value is None:
+ return None
+ if isinstance(value, datetime):
+ return value.astimezone(timezone.utc) if value.tzinfo else value.replace(tzinfo=timezone.utc)
+ if isinstance(value, str):
+ return datetime_from_string(value)
+ return None
+
+
+def extract_named_value(value: Any) -> str | None:
+ """Extract a readable string out of Jira's typed objects."""
+ if value is None:
+ return None
+ if isinstance(value, str):
+ return value
+ if isinstance(value, dict):
+ return value.get("name") or value.get("value")
+ return getattr(value, "name", None)
+
+
+def extract_user(value: Any) -> tuple[str | None, str | None]:
+ """Return display name + email tuple for a Jira user blob."""
+ if value is None:
+ return None, None
+ if isinstance(value, dict):
+ return value.get("displayName"), value.get("emailAddress")
+
+ display = getattr(value, "displayName", None)
+ email = getattr(value, "emailAddress", None)
+ return display, email
+
+
+def extract_text_from_adf(adf: Any) -> str:
+ """Flatten Atlassian Document Format (ADF) structures to text."""
+ texts: list[str] = []
+
+ def _walk(node: Any) -> None:
+ if node is None:
+ return
+ if isinstance(node, dict):
+ node_type = node.get("type")
+ if node_type == "text":
+ texts.append(node.get("text", ""))
+ for child in node.get("content", []):
+ _walk(child)
+ elif isinstance(node, list):
+ for child in node:
+ _walk(child)
+
+ _walk(adf)
+ return "\n".join(part for part in texts if part)
+
+
+def extract_body_text(value: Any) -> str:
+ """Normalize Jira description/comments (raw/adf/str) into plain text."""
+ if value is None:
+ return ""
+ if isinstance(value, str):
+ return value.strip()
+ if isinstance(value, dict):
+ return extract_text_from_adf(value).strip()
+ return str(value).strip()
+
+
+def format_comments(
+ comment_block: Any,
+ *,
+ blacklist: Collection[str],
+) -> str:
+ """Convert Jira comments into a markdown-ish bullet list."""
+ if not isinstance(comment_block, dict):
+ return ""
+
+ comments = comment_block.get("comments") or []
+ lines: list[str] = []
+ normalized_blacklist = {email.lower() for email in blacklist if email}
+
+ for comment in comments:
+ author = comment.get("author") or {}
+ author_email = (author.get("emailAddress") or "").lower()
+ if author_email and author_email in normalized_blacklist:
+ continue
+
+ author_name = author.get("displayName") or author.get("name") or author_email or "Unknown"
+ created = parse_jira_datetime(comment.get("created"))
+ created_str = created.isoformat() if created else "Unknown time"
+ body = extract_body_text(comment.get("body"))
+ if not body:
+ continue
+
+ lines.append(f"- {author_name} ({created_str}):\n{body}")
+
+ return "\n\n".join(lines)
+
+
+def format_attachments(attachments: Any) -> str:
+ """List Jira attachments as bullet points."""
+ if not isinstance(attachments, list):
+ return ""
+
+ attachment_lines: list[str] = []
+ for attachment in attachments:
+ filename = attachment.get("filename")
+ if not filename:
+ continue
+ size = attachment.get("size")
+ size_text = f" ({size} bytes)" if isinstance(size, int) else ""
+ content_url = attachment.get("content") or ""
+ url_suffix = f" -> {content_url}" if content_url else ""
+ attachment_lines.append(f"- {filename}{size_text}{url_suffix}")
+
+ return "\n".join(attachment_lines)
+
+
+def should_skip_issue(issue: Issue, labels_to_skip: set[str]) -> bool:
+ """Return True if the issue contains any label from the skip list."""
+ if not labels_to_skip:
+ return False
+
+ fields = getattr(issue, "raw", {}).get("fields", {})
+ labels: Iterable[str] = fields.get("labels") or []
+ for label in labels:
+ if (label or "").lower() in labels_to_skip:
+ return True
+ return False
diff --git a/common/data_source/jira_connector.py b/common/data_source/jira_connector.py
deleted file mode 100644
index 4d6f1160e..000000000
--- a/common/data_source/jira_connector.py
+++ /dev/null
@@ -1,112 +0,0 @@
-"""Jira connector"""
-
-from typing import Any
-
-from jira import JIRA
-
-from common.data_source.config import INDEX_BATCH_SIZE
-from common.data_source.exceptions import (
- ConnectorValidationError,
- InsufficientPermissionsError,
- UnexpectedValidationError, ConnectorMissingCredentialError
-)
-from common.data_source.interfaces import (
- CheckpointedConnectorWithPermSync,
- SecondsSinceUnixEpoch,
- SlimConnectorWithPermSync
-)
-from common.data_source.models import (
- ConnectorCheckpoint
-)
-
-
-class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync):
- """Jira connector for accessing Jira issues and projects"""
-
- def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
- self.batch_size = batch_size
- self.jira_client: JIRA | None = None
-
- def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
- """Load Jira credentials"""
- try:
- url = credentials.get("url")
- username = credentials.get("username")
- password = credentials.get("password")
- token = credentials.get("token")
-
- if not url:
- raise ConnectorMissingCredentialError("Jira URL is required")
-
- if token:
- # API token authentication
- self.jira_client = JIRA(server=url, token_auth=token)
- elif username and password:
- # Basic authentication
- self.jira_client = JIRA(server=url, basic_auth=(username, password))
- else:
- raise ConnectorMissingCredentialError("Jira credentials are incomplete")
-
- return None
- except Exception as e:
- raise ConnectorMissingCredentialError(f"Jira: {e}")
-
- def validate_connector_settings(self) -> None:
- """Validate Jira connector settings"""
- if not self.jira_client:
- raise ConnectorMissingCredentialError("Jira")
-
- try:
- # Test connection by getting server info
- self.jira_client.server_info()
- except Exception as e:
- if "401" in str(e) or "403" in str(e):
- raise InsufficientPermissionsError("Invalid credentials or insufficient permissions")
- elif "404" in str(e):
- raise ConnectorValidationError("Jira instance not found")
- else:
- raise UnexpectedValidationError(f"Jira validation error: {e}")
-
- def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Any:
- """Poll Jira for recent issues"""
- # Simplified implementation - in production this would handle actual polling
- return []
-
- def load_from_checkpoint(
- self,
- start: SecondsSinceUnixEpoch,
- end: SecondsSinceUnixEpoch,
- checkpoint: ConnectorCheckpoint,
- ) -> Any:
- """Load documents from checkpoint"""
- # Simplified implementation
- return []
-
- def load_from_checkpoint_with_perm_sync(
- self,
- start: SecondsSinceUnixEpoch,
- end: SecondsSinceUnixEpoch,
- checkpoint: ConnectorCheckpoint,
- ) -> Any:
- """Load documents from checkpoint with permission sync"""
- # Simplified implementation
- return []
-
- def build_dummy_checkpoint(self) -> ConnectorCheckpoint:
- """Build dummy checkpoint"""
- return ConnectorCheckpoint()
-
- def validate_checkpoint_json(self, checkpoint_json: str) -> ConnectorCheckpoint:
- """Validate checkpoint JSON"""
- # Simplified implementation
- return ConnectorCheckpoint()
-
- def retrieve_all_slim_docs_perm_sync(
- self,
- start: SecondsSinceUnixEpoch | None = None,
- end: SecondsSinceUnixEpoch | None = None,
- callback: Any = None,
- ) -> Any:
- """Retrieve all simplified documents with permission sync"""
- # Simplified implementation
- return []
\ No newline at end of file
diff --git a/common/data_source/utils.py b/common/data_source/utils.py
index 7c2cdf898..b42c3833b 100644
--- a/common/data_source/utils.py
+++ b/common/data_source/utils.py
@@ -48,17 +48,35 @@ from common.data_source.exceptions import RateLimitTriedTooManyTimesError
from common.data_source.interfaces import CT, CheckpointedConnector, CheckpointOutputWrapper, ConfluenceUser, LoadFunction, OnyxExtensionType, SecondsSinceUnixEpoch, TokenResponse
from common.data_source.models import BasicExpertInfo, Document
+_TZ_SUFFIX_PATTERN = re.compile(r"([+-])([\d:]+)$")
+
def datetime_from_string(datetime_string: str) -> datetime:
datetime_string = datetime_string.strip()
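+    # Jira-style offsets such as "+0530" (or "+05:30") are normalized to "+HH:MM" so that
+    # datetime.fromisoformat accepts them on Python versions predating 3.11's lenient parser.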
+ match_jira_format = _TZ_SUFFIX_PATTERN.search(datetime_string)
+ if match_jira_format:
+ sign, tz_field = match_jira_format.groups()
+ digits = tz_field.replace(":", "")
+
+        # Only treat 3-4 trailing digits as a UTC offset ("+0530", "+05:30"); shorter runs
+        # would also match the "-DD" tail of date-only strings such as "2024-01-01".
+        if digits.isdigit() and 3 <= len(digits) <= 4:
+            hours = digits[:-2].rjust(2, "0")
+            minutes = digits[-2:]
+
+ normalized = f"{sign}{hours}:{minutes}"
+ datetime_string = f"{datetime_string[: match_jira_format.start()]}{normalized}"
+
# Handle the case where the datetime string ends with 'Z' (Zulu time)
- if datetime_string.endswith('Z'):
- datetime_string = datetime_string[:-1] + '+00:00'
+ if datetime_string.endswith("Z"):
+ datetime_string = datetime_string[:-1] + "+00:00"
# Handle timezone format "+0000" -> "+00:00"
- if datetime_string.endswith('+0000'):
- datetime_string = datetime_string[:-5] + '+00:00'
+ if datetime_string.endswith("+0000"):
+ datetime_string = datetime_string[:-5] + "+00:00"
datetime_object = datetime.fromisoformat(datetime_string)
@@ -480,7 +498,7 @@ def get_file_ext(file_name: str) -> str:
def is_accepted_file_ext(file_ext: str, extension_type: OnyxExtensionType) -> bool:
- image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
+ image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"}
text_extensions = {".txt", ".md", ".mdx", ".conf", ".log", ".json", ".csv", ".tsv", ".xml", ".yml", ".yaml", ".sql"}
document_extensions = {".pdf", ".docx", ".pptx", ".xlsx", ".eml", ".epub", ".html"}
@@ -902,6 +920,18 @@ def load_all_docs_from_checkpoint_connector(
)
+_ATLASSIAN_CLOUD_DOMAINS = (".atlassian.net", ".jira.com", ".jira-dev.com")
+
+
+def is_atlassian_cloud_url(url: str) -> bool:
+ try:
+ host = urlparse(url).hostname or ""
+ except ValueError:
+ return False
+ host = host.lower()
+ return any(host.endswith(domain) for domain in _ATLASSIAN_CLOUD_DOMAINS)
+
+
def get_cloudId(base_url: str) -> str:
tenant_info_url = urljoin(base_url, "/_edge/tenant_info")
response = requests.get(tenant_info_url, timeout=10)
diff --git a/common/log_utils.py b/common/log_utils.py
index e2110ebeb..abbcd286b 100644
--- a/common/log_utils.py
+++ b/common/log_utils.py
@@ -80,4 +80,4 @@ def log_exception(e, *args):
raise Exception(a.text)
else:
logging.error(str(a))
- raise e
\ No newline at end of file
+ raise e
diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py
index 3dc9a7a3c..6925eb5f7 100644
--- a/rag/svr/sync_data_source.py
+++ b/rag/svr/sync_data_source.py
@@ -20,33 +20,40 @@
import copy
+import faulthandler
+import logging
+import os
+import signal
import sys
import threading
import time
import traceback
+from datetime import datetime, timezone
from typing import Any
+import trio
+
from api.db.services.connector_service import ConnectorService, SyncLogsService
from api.db.services.knowledgebase_service import KnowledgebaseService
-from common.log_utils import init_root_logger
-from common.config_utils import show_configs
-from common.data_source import BlobStorageConnector, NotionConnector, DiscordConnector, GoogleDriveConnector
-import logging
-import os
-from datetime import datetime, timezone
-import signal
-import trio
-import faulthandler
-from common.constants import FileSource, TaskStatus
from common import settings
-from common.versions import get_ragflow_version
+from common.config_utils import show_configs
+from common.constants import FileSource, TaskStatus
+from common.data_source import (
+ BlobStorageConnector,
+ DiscordConnector,
+ GoogleDriveConnector,
+ JiraConnector,
+ NotionConnector,
+)
+from common.data_source.config import INDEX_BATCH_SIZE
from common.data_source.confluence_connector import ConfluenceConnector
from common.data_source.interfaces import CheckpointOutputWrapper
from common.data_source.utils import load_all_docs_from_checkpoint_connector
-from common.data_source.config import INDEX_BATCH_SIZE
+from common.log_utils import init_root_logger
from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc
+from common.versions import get_ragflow_version
-MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5"))
+MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5"))
task_limiter = trio.Semaphore(MAX_CONCURRENT_TASKS)
@@ -72,31 +79,32 @@ class SyncBase:
min_update = min([doc.doc_updated_at for doc in document_batch])
max_update = max([doc.doc_updated_at for doc in document_batch])
next_update = max([next_update, max_update])
- docs = [{
- "id": doc.id,
- "connector_id": task["connector_id"],
- "source": self.SOURCE_NAME,
- "semantic_identifier": doc.semantic_identifier,
- "extension": doc.extension,
- "size_bytes": doc.size_bytes,
- "doc_updated_at": doc.doc_updated_at,
- "blob": doc.blob
- } for doc in document_batch]
+ docs = [
+ {
+ "id": doc.id,
+ "connector_id": task["connector_id"],
+ "source": self.SOURCE_NAME,
+ "semantic_identifier": doc.semantic_identifier,
+ "extension": doc.extension,
+ "size_bytes": doc.size_bytes,
+ "doc_updated_at": doc.doc_updated_at,
+ "blob": doc.blob,
+ }
+ for doc in document_batch
+ ]
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"])
SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err))
doc_num += len(docs)
- logging.info("{} docs synchronized till {}".format(doc_num, next_update))
+ prefix = "[Jira] " if self.SOURCE_NAME == FileSource.JIRA else ""
+ logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}")
SyncLogsService.done(task["id"], task["connector_id"])
task["poll_range_start"] = next_update
except Exception as ex:
- msg = '\n'.join([
- ''.join(traceback.format_exception_only(None, ex)).strip(),
- ''.join(traceback.format_exception(None, ex, ex.__traceback__)).strip()
- ])
+ msg = "\n".join(["".join(traceback.format_exception_only(None, ex)).strip(), "".join(traceback.format_exception(None, ex, ex.__traceback__)).strip()])
SyncLogsService.update_by_id(task["id"], {"status": TaskStatus.FAIL, "full_exception_trace": msg, "error_msg": str(ex)})
SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"])
@@ -109,21 +117,16 @@ class S3(SyncBase):
SOURCE_NAME: str = FileSource.S3
async def _generate(self, task: dict):
- self.connector = BlobStorageConnector(
- bucket_type=self.conf.get("bucket_type", "s3"),
- bucket_name=self.conf["bucket_name"],
- prefix=self.conf.get("prefix", "")
- )
+ self.connector = BlobStorageConnector(bucket_type=self.conf.get("bucket_type", "s3"), bucket_name=self.conf["bucket_name"], prefix=self.conf.get("prefix", ""))
self.connector.load_credentials(self.conf["credentials"])
- document_batch_generator = self.connector.load_from_state() if task["reindex"]=="1" or not task["poll_range_start"] \
- else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp())
+ document_batch_generator = (
+ self.connector.load_from_state()
+ if task["reindex"] == "1" or not task["poll_range_start"]
+ else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp())
+ )
- begin_info = "totally" if task["reindex"]=="1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"])
- logging.info("Connect to {}: {}(prefix/{}) {}".format(self.conf.get("bucket_type", "s3"),
- self.conf["bucket_name"],
- self.conf.get("prefix", ""),
- begin_info
- ))
+ begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"])
+ logging.info("Connect to {}: {}(prefix/{}) {}".format(self.conf.get("bucket_type", "s3"), self.conf["bucket_name"], self.conf.get("prefix", ""), begin_info))
return document_batch_generator
@@ -131,8 +134,8 @@ class Confluence(SyncBase):
SOURCE_NAME: str = FileSource.CONFLUENCE
async def _generate(self, task: dict):
- from common.data_source.interfaces import StaticCredentialsProvider
from common.data_source.config import DocumentSource
+ from common.data_source.interfaces import StaticCredentialsProvider
self.connector = ConfluenceConnector(
wiki_base=self.conf["wiki_base"],
@@ -141,11 +144,7 @@ class Confluence(SyncBase):
# page_id=self.conf.get("page_id", ""),
)
- credentials_provider = StaticCredentialsProvider(
- tenant_id=task["tenant_id"],
- connector_name=DocumentSource.CONFLUENCE,
- credential_json=self.conf["credentials"]
- )
+ credentials_provider = StaticCredentialsProvider(tenant_id=task["tenant_id"], connector_name=DocumentSource.CONFLUENCE, credential_json=self.conf["credentials"])
self.connector.set_credentials_provider(credentials_provider)
# Determine the time range for synchronization based on reindex or poll_range_start
@@ -174,10 +173,13 @@ class Notion(SyncBase):
async def _generate(self, task: dict):
self.connector = NotionConnector(root_page_id=self.conf["root_page_id"])
self.connector.load_credentials(self.conf["credentials"])
- document_generator = self.connector.load_from_state() if task["reindex"]=="1" or not task["poll_range_start"] \
- else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp())
+ document_generator = (
+ self.connector.load_from_state()
+ if task["reindex"] == "1" or not task["poll_range_start"]
+ else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp())
+ )
- begin_info = "totally" if task["reindex"]=="1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"])
+ begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"])
logging.info("Connect to Notion: root({}) {}".format(self.conf["root_page_id"], begin_info))
return document_generator
@@ -194,13 +196,16 @@ class Discord(SyncBase):
server_ids=server_ids.split(",") if server_ids else [],
channel_names=channel_names.split(",") if channel_names else [],
start_date=datetime(1970, 1, 1, tzinfo=timezone.utc).strftime("%Y-%m-%d"),
- batch_size=self.conf.get("batch_size", 1024)
+ batch_size=self.conf.get("batch_size", 1024),
)
self.connector.load_credentials(self.conf["credentials"])
- document_generator = self.connector.load_from_state() if task["reindex"]=="1" or not task["poll_range_start"] \
- else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp())
+ document_generator = (
+ self.connector.load_from_state()
+ if task["reindex"] == "1" or not task["poll_range_start"]
+ else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp())
+ )
- begin_info = "totally" if task["reindex"]=="1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"])
+ begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"])
logging.info("Connect to Discord: servers({}), channel({}) {}".format(server_ids, channel_names, begin_info))
return document_generator
@@ -285,7 +290,7 @@ class GoogleDrive(SyncBase):
admin_email = self.connector.primary_admin_email
except RuntimeError:
admin_email = "unknown"
- logging.info("Connect to Google Drive as %s %s", admin_email, begin_info)
+ logging.info(f"Connect to Google Drive as {admin_email} {begin_info}")
return document_batches()
def _persist_rotated_credentials(self, connector_id: str, credentials: dict[str, Any]) -> None:
@@ -303,7 +308,93 @@ class Jira(SyncBase):
SOURCE_NAME: str = FileSource.JIRA
async def _generate(self, task: dict):
- pass
+ connector_kwargs = {
+ "jira_base_url": self.conf["base_url"],
+ "project_key": self.conf.get("project_key"),
+ "jql_query": self.conf.get("jql_query"),
+ "batch_size": self.conf.get("batch_size", INDEX_BATCH_SIZE),
+ "include_comments": self.conf.get("include_comments", True),
+ "include_attachments": self.conf.get("include_attachments", False),
+ "labels_to_skip": self._normalize_list(self.conf.get("labels_to_skip")),
+ "comment_email_blacklist": self._normalize_list(self.conf.get("comment_email_blacklist")),
+ "scoped_token": self.conf.get("scoped_token", False),
+ "attachment_size_limit": self.conf.get("attachment_size_limit"),
+ "timezone_offset": self.conf.get("timezone_offset"),
+ }
+
+ self.connector = JiraConnector(**connector_kwargs)
+
+ credentials = self.conf.get("credentials")
+ if not credentials:
+ raise ValueError("Jira connector is missing credentials.")
+
+ self.connector.load_credentials(credentials)
+ self.connector.validate_connector_settings()
+
+ if task["reindex"] == "1" or not task["poll_range_start"]:
+ start_time = 0.0
+ begin_info = "totally"
+ else:
+ start_time = task["poll_range_start"].timestamp()
+ begin_info = f"from {task['poll_range_start']}"
+
+ end_time = datetime.now(timezone.utc).timestamp()
+
+ raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
+ try:
+ batch_size = int(raw_batch_size)
+ except (TypeError, ValueError):
+ batch_size = INDEX_BATCH_SIZE
+ if batch_size <= 0:
+ batch_size = INDEX_BATCH_SIZE
+
+ def document_batches():
+ checkpoint = self.connector.build_dummy_checkpoint()
+ pending_docs = []
+ iterations = 0
+ iteration_limit = 100_000
+
+ while checkpoint.has_more:
+ wrapper = CheckpointOutputWrapper()
+ generator = wrapper(
+ self.connector.load_from_checkpoint(
+ start_time,
+ end_time,
+ checkpoint,
+ )
+ )
+ for document, failure, next_checkpoint in generator:
+ if failure is not None:
+ logging.warning(
+ f"[Jira] Jira connector failure: {getattr(failure, 'failure_message', failure)}"
+ )
+ continue
+ if document is not None:
+ pending_docs.append(document)
+ if len(pending_docs) >= batch_size:
+ yield pending_docs
+ pending_docs = []
+ if next_checkpoint is not None:
+ checkpoint = next_checkpoint
+
+ iterations += 1
+ if iterations > iteration_limit:
+ logging.error(f"[Jira] Task {task.get('id')} exceeded iteration limit ({iteration_limit}).")
+ raise RuntimeError("Too many iterations while loading Jira documents.")
+
+ if pending_docs:
+ yield pending_docs
+
+ logging.info(f"[Jira] Connect to Jira {connector_kwargs['jira_base_url']} {begin_info}")
+ return document_batches()
+
+ @staticmethod
+ def _normalize_list(values: Any) -> list[str] | None:
+ if values is None:
+ return None
+ if isinstance(values, str):
+ values = [item.strip() for item in values.split(",")]
+ return [str(value).strip() for value in values if value is not None and str(value).strip()]
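For reference, the helper above accepts either a comma-separated string or an iterable; a few illustrative calls (not part of the diff) that follow directly from its body:

    # Illustrative only: expected behaviour of Jira._normalize_list as defined above.
    assert Jira._normalize_list(None) is None
    assert Jira._normalize_list("bug, wontfix, ") == ["bug", "wontfix"]
    assert Jira._normalize_list(["bug", "  ", None, 42]) == ["bug", "42"]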
class SharePoint(SyncBase):
@@ -337,9 +428,10 @@ func_factory = {
FileSource.JIRA: Jira,
FileSource.SHAREPOINT: SharePoint,
FileSource.SLACK: Slack,
- FileSource.TEAMS: Teams
+ FileSource.TEAMS: Teams,
}
+
async def dispatch_tasks():
async with trio.open_nursery() as nursery:
while True:
@@ -385,7 +477,7 @@ async def main():
__/ |
|___/
""")
- logging.info(f'RAGFlow version: {get_ragflow_version()}')
+ logging.info(f"RAGFlow version: {get_ragflow_version()}")
show_configs()
settings.init_settings()
if sys.platform != "win32":
diff --git a/web/src/assets/svg/data-source/jira.svg b/web/src/assets/svg/data-source/jira.svg
new file mode 100644
index 000000000..8f9cd8b97
--- /dev/null
+++ b/web/src/assets/svg/data-source/jira.svg
@@ -0,0 +1,16 @@
+
diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts
index e2035a378..350a64db8 100644
--- a/web/src/locales/en.ts
+++ b/web/src/locales/en.ts
@@ -732,6 +732,33 @@ Example: general/v2/`,
'Comma-separated emails whose “My Drive” contents should be indexed (include the primary admin).',
google_driveSharedFoldersTip:
'Comma-separated Google Drive folder links to crawl.',
+ jiraDescription:
+ 'Connect your Jira workspace to sync issues, comments, and attachments.',
+ jiraBaseUrlTip:
+ 'Base URL of your Jira site (e.g., https://your-domain.atlassian.net).',
+ jiraProjectKeyTip:
+ 'Optional: limit syncing to a single project key (e.g., ENG).',
+ jiraJqlTip:
+ 'Optional JQL filter. Leave blank to rely on project/time filters.',
+ jiraBatchSizeTip:
+ 'Maximum number of issues requested from Jira per batch.',
+ jiraCommentsTip:
+ 'Include Jira comments in the generated markdown document.',
+ jiraAttachmentsTip:
+ 'Download attachments as separate documents during sync.',
+ jiraAttachmentSizeTip:
+ 'Attachments larger than this number of bytes will be skipped.',
+ jiraLabelsTip:
+ 'Labels that should be skipped while indexing (comma separated).',
+ jiraBlacklistTip:
+ 'Comments whose author email matches these entries will be ignored.',
+ jiraScopedTokenTip:
+ 'Enable this when using scoped Atlassian tokens (api.atlassian.com).',
+ jiraEmailTip: 'Email associated with the Jira account/API token.',
+ jiraTokenTip:
+ 'API token generated from https://id.atlassian.com/manage-profile/security/api-tokens.',
+ jiraPasswordTip:
+ 'Optional password for Jira Server/Data Center environments.',
availableSourcesDescription: 'Select a data source to add',
availableSources: 'Available sources',
datasourceDescription: 'Manage your data source and connections',
diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts
index 301719117..b6d25dc1f 100644
--- a/web/src/locales/zh.ts
+++ b/web/src/locales/zh.ts
@@ -716,6 +716,23 @@ General:实体和关系提取提示来自 GitHub - microsoft/graphrag:基于
'需要索引其 “我的云端硬盘” 的邮箱,多个邮箱用逗号分隔(建议包含管理员)。',
google_driveSharedFoldersTip:
'需要同步的 Google Drive 文件夹链接,多个链接用逗号分隔。',
+ jiraDescription: '接入 Jira 工作区,持续同步Issues、评论与附件。',
+ jiraBaseUrlTip:
+ 'Jira 的 Base URL,例如:https://your-domain.atlassian.net。',
+ jiraProjectKeyTip: '可选:仅同步指定的项目(如 RAG)。',
+ jiraJqlTip: '可选:自定义 JQL 过滤条件,留空则使用项目 / 时间范围。',
+ jiraBatchSizeTip: '每次向 Jira 请求的 Issue 数量上限。',
+ jiraCommentsTip: '同步评论。',
+ jiraAttachmentsTip: '开启后会将附件下载为独立文档。',
+ jiraAttachmentSizeTip: '超过该字节阈值的附件会被跳过。',
+ jiraLabelsTip: '需要跳过的标签(逗号分隔)。',
+ jiraBlacklistTip: '这些邮箱作者的评论会被忽略。',
+ jiraScopedTokenTip:
+ '仅当凭证为 Atlassian scoped token(api.atlassian.com)时生效。',
+ jiraEmailTip: '与 API Token 对应的 Jira 账户邮箱。',
+ jiraTokenTip:
+ '在 https://id.atlassian.com/manage-profile/security/api-tokens 生成的 API Token。(Cloud only)',
+ jiraPasswordTip: '可选:仅 Jira Server/Data Center 环境需要的密码字段。',
availableSourcesDescription: '选择要添加的数据源',
availableSources: '可用数据源',
datasourceDescription: '管理您的数据源和连接',
diff --git a/web/src/pages/user-setting/data-source/contant.tsx b/web/src/pages/user-setting/data-source/contant.tsx
index 7acf3036d..3c8c55826 100644
--- a/web/src/pages/user-setting/data-source/contant.tsx
+++ b/web/src/pages/user-setting/data-source/contant.tsx
@@ -9,8 +9,8 @@ export enum DataSourceKey {
NOTION = 'notion',
DISCORD = 'discord',
GOOGLE_DRIVE = 'google_drive',
- // GMAIL = 'gmail',
- // JIRA = 'jira',
+ // GMAIL = 'gmail',
+ JIRA = 'jira',
// SHAREPOINT = 'sharepoint',
// SLACK = 'slack',
// TEAMS = 'teams',
@@ -42,6 +42,11 @@ export const DataSourceInfo = {
description: t(`setting.${DataSourceKey.GOOGLE_DRIVE}Description`),
icon: ,
},
+ [DataSourceKey.JIRA]: {
+ name: 'Jira',
+ description: t(`setting.${DataSourceKey.JIRA}Description`),
+ icon: ,
+ },
};
export const DataSourceFormBaseFields = [
@@ -270,6 +275,106 @@ export const DataSourceFormFields = {
defaultValue: 'uploaded',
},
],
+ [DataSourceKey.JIRA]: [
+ {
+ label: 'Jira Base URL',
+ name: 'config.base_url',
+ type: FormFieldType.Text,
+ required: true,
+ placeholder: 'https://your-domain.atlassian.net',
+ tooltip: t('setting.jiraBaseUrlTip'),
+ },
+ {
+ label: 'Project Key',
+ name: 'config.project_key',
+ type: FormFieldType.Text,
+ required: false,
+ placeholder: 'RAGFlow',
+ tooltip: t('setting.jiraProjectKeyTip'),
+ },
+ {
+ label: 'Custom JQL',
+ name: 'config.jql_query',
+ type: FormFieldType.Textarea,
+ required: false,
+ placeholder: 'project = RAG AND updated >= -7d',
+ tooltip: t('setting.jiraJqlTip'),
+ },
+ {
+ label: 'Batch Size',
+ name: 'config.batch_size',
+ type: FormFieldType.Number,
+ required: false,
+ tooltip: t('setting.jiraBatchSizeTip'),
+ },
+ {
+ label: 'Include Comments',
+ name: 'config.include_comments',
+ type: FormFieldType.Checkbox,
+ required: false,
+ defaultValue: true,
+ tooltip: t('setting.jiraCommentsTip'),
+ },
+ {
+ label: 'Include Attachments',
+ name: 'config.include_attachments',
+ type: FormFieldType.Checkbox,
+ required: false,
+ defaultValue: false,
+ tooltip: t('setting.jiraAttachmentsTip'),
+ },
+ {
+ label: 'Attachment Size Limit (bytes)',
+ name: 'config.attachment_size_limit',
+ type: FormFieldType.Number,
+ required: false,
+ defaultValue: 10 * 1024 * 1024,
+ tooltip: t('setting.jiraAttachmentSizeTip'),
+ },
+ {
+ label: 'Labels to Skip',
+ name: 'config.labels_to_skip',
+ type: FormFieldType.Tag,
+ required: false,
+ tooltip: t('setting.jiraLabelsTip'),
+ },
+ {
+ label: 'Comment Email Blacklist',
+ name: 'config.comment_email_blacklist',
+ type: FormFieldType.Tag,
+ required: false,
+ tooltip: t('setting.jiraBlacklistTip'),
+ },
+ {
+ label: 'Use Scoped Token (Cloud only)',
+ name: 'config.scoped_token',
+ type: FormFieldType.Checkbox,
+ required: false,
+ tooltip: t('setting.jiraScopedTokenTip'),
+ },
+ {
+ label: 'Jira User Email (Cloud) or User Name (Server)',
+ name: 'config.credentials.jira_user_email',
+ type: FormFieldType.Text,
+ required: true,
+ placeholder: 'you@example.com',
+ tooltip: t('setting.jiraEmailTip'),
+ },
+ {
+ label: 'Jira API Token (Cloud only)',
+ name: 'config.credentials.jira_api_token',
+ type: FormFieldType.Password,
+ required: false,
+ tooltip: t('setting.jiraTokenTip'),
+ },
+ {
+ label: 'Jira Password (Server only)',
+ name: 'config.credentials.jira_password',
+ type: FormFieldType.Password,
+ required: false,
+ tooltip: t('setting.jiraPasswordTip'),
+ },
+ ],
// [DataSourceKey.GOOGLE_DRIVE]: [
// {
// label: 'Primary Admin Email',
@@ -433,4 +538,25 @@ export const DataSourceFormDefaultValues = {
},
},
},
+ [DataSourceKey.JIRA]: {
+ name: '',
+ source: DataSourceKey.JIRA,
+ config: {
+ base_url: '',
+ project_key: '',
+ jql_query: '',
+ batch_size: 2,
+ include_comments: true,
+ include_attachments: false,
+ attachment_size_limit: 10 * 1024 * 1024,
+ labels_to_skip: [],
+ comment_email_blacklist: [],
+ scoped_token: false,
+ credentials: {
+ jira_user_email: '',
+ jira_api_token: '',
+ jira_password: '',
+ },
+ },
+ },
};
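For orientation, the Jira form fields and defaults above are assumed to persist into a config object whose keys mirror what `Jira._generate` reads on the backend. A hedged sketch of that payload (field names are taken from this diff; the exact persistence format is an assumption):

    # Assumed shape of self.conf for a Jira sync task; values are placeholders.
    jira_conf = {
        "base_url": "https://your-domain.atlassian.net",
        "project_key": "RAG",
        "jql_query": "",
        "batch_size": 2,
        "include_comments": True,
        "include_attachments": False,
        "attachment_size_limit": 10 * 1024 * 1024,
        "labels_to_skip": [],
        "comment_email_blacklist": [],
        "scoped_token": False,
        "credentials": {
            "jira_user_email": "you@example.com",
            "jira_api_token": "<api-token>",  # Cloud
            "jira_password": "",              # Server/Data Center only
        },
    }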
diff --git a/web/src/pages/user-setting/data-source/index.tsx b/web/src/pages/user-setting/data-source/index.tsx
index 80ceea1d7..9cb58672a 100644
--- a/web/src/pages/user-setting/data-source/index.tsx
+++ b/web/src/pages/user-setting/data-source/index.tsx
@@ -44,6 +44,12 @@ const dataSourceTemplates = [
description: DataSourceInfo[DataSourceKey.NOTION].description,
icon: DataSourceInfo[DataSourceKey.NOTION].icon,
},
+ {
+ id: DataSourceKey.JIRA,
+ name: DataSourceInfo[DataSourceKey.JIRA].name,
+ description: DataSourceInfo[DataSourceKey.JIRA].description,
+ icon: DataSourceInfo[DataSourceKey.JIRA].icon,
+ },
];
const DataSource = () => {
const { t } = useTranslation();