Feat: GitHub connector (#12314)

### What problem does this PR solve?

Feat: GitHub connector. Adds a GitHub data source (`GithubConnector`) and a corresponding `Github` sync class, so repositories, and optionally pull requests and issues, can be indexed like the other sources.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
Author: Magicbook1108
Date: 2025-12-30 15:09:52 +08:00 (committed by GitHub)
Parent: f0392e7501
Commit: 5903d1c8f1
21 changed files with 1522 additions and 49 deletions


@@ -55,6 +55,7 @@ from common.data_source.config import INDEX_BATCH_SIZE
 from common.data_source.confluence_connector import ConfluenceConnector
 from common.data_source.gmail_connector import GmailConnector
 from common.data_source.box_connector import BoxConnector
+from common.data_source.github.connector import GithubConnector
 from common.data_source.gitlab_connector import GitlabConnector
 from common.data_source.interfaces import CheckpointOutputWrapper
 from common.log_utils import init_root_logger
@@ -108,7 +109,7 @@ class SyncBase:
         if task["poll_range_start"]:
             next_update = task["poll_range_start"]
-        for document_batch in document_batch_generator:
+        async for document_batch in document_batch_generator:
             if not document_batch:
                 continue
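
With this change the dispatch loop in `SyncBase` expects every `_generate` implementation to return an async iterable rather than a plain generator. A minimal sketch of how a blocking batch generator can be adapted, mirroring the `async_wrapper` closure the new `Github` class introduces further down; the helper name is illustrative and not part of this PR:

```python
# Illustrative adapter (not in this PR): re-yield batches from a plain
# generator so SyncBase's `async for` loop can consume them. This mirrors
# async_wrapper() in the Github class below; note each batch is still
# produced synchronously, so the event loop blocks while a batch is built.
async def as_async_batches(batches):
    for batch in batches:
        yield batch
```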
@@ -706,20 +707,17 @@ class Moodle(SyncBase):
         self.connector.load_credentials(self.conf["credentials"])
         # Determine the time range for synchronization based on reindex or poll_range_start
-        if task["reindex"] == "1" or not task.get("poll_range_start"):
+        poll_start = task.get("poll_range_start")
+        if task["reindex"] == "1" or poll_start is None:
             document_generator = self.connector.load_from_state()
             begin_info = "totally"
         else:
-            poll_start = task["poll_range_start"]
-            if poll_start is None:
-                document_generator = self.connector.load_from_state()
-                begin_info = "totally"
-            else:
-                document_generator = self.connector.poll_source(
-                    poll_start.timestamp(),
-                    datetime.now(timezone.utc).timestamp()
-                )
-                begin_info = "from {}".format(poll_start)
+            document_generator = self.connector.poll_source(
+                poll_start.timestamp(),
+                datetime.now(timezone.utc).timestamp(),
+            )
+            begin_info = f"from {poll_start}"
         logging.info("Connect to Moodle: {} {}".format(self.conf["moodle_url"], begin_info))
         return document_generator
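
The refactor above resolves `poll_range_start` once and flattens the previously nested `None` check; the BOX and Airtable hunks below apply the same change. A hypothetical helper (not part of this PR) that captures the shared pattern:

```python
from datetime import datetime, timezone

# Hypothetical consolidation of the repeated pattern: do a full load when
# reindexing or when no poll_range_start is recorded, otherwise poll from
# that timestamp up to "now" (UTC).
def pick_generator(connector, task):
    poll_start = task.get("poll_range_start")
    if task.get("reindex") == "1" or poll_start is None:
        return connector.load_from_state(), "totally"
    return (
        connector.poll_source(
            poll_start.timestamp(),
            datetime.now(timezone.utc).timestamp(),
        ),
        f"from {poll_start}",
    )
```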
@@ -749,20 +747,17 @@ class BOX(SyncBase):
         auth.token_storage.store(token)
         self.connector.load_credentials(auth)
-        if task["reindex"] == "1" or not task["poll_range_start"]:
+        poll_start = task["poll_range_start"]
+        if task["reindex"] == "1" or poll_start is None:
             document_generator = self.connector.load_from_state()
             begin_info = "totally"
         else:
-            poll_start = task["poll_range_start"]
-            if poll_start is None:
-                document_generator = self.connector.load_from_state()
-                begin_info = "totally"
-            else:
-                document_generator = self.connector.poll_source(
-                    poll_start.timestamp(),
-                    datetime.now(timezone.utc).timestamp()
-                )
-                begin_info = "from {}".format(poll_start)
+            document_generator = self.connector.poll_source(
+                poll_start.timestamp(),
+                datetime.now(timezone.utc).timestamp(),
+            )
+            begin_info = f"from {poll_start}"
         logging.info("Connect to Box: folder_id({}) {}".format(self.conf["folder_id"], begin_info))
         return document_generator
@@ -788,20 +783,17 @@ class Airtable(SyncBase):
             {"airtable_access_token": credentials["airtable_access_token"]}
         )
-        if task.get("reindex") == "1" or not task.get("poll_range_start"):
+        poll_start = task.get("poll_range_start")
+        if task.get("reindex") == "1" or poll_start is None:
             document_generator = self.connector.load_from_state()
             begin_info = "totally"
         else:
-            poll_start = task.get("poll_range_start")
-            if poll_start is None:
-                document_generator = self.connector.load_from_state()
-                begin_info = "totally"
-            else:
-                document_generator = self.connector.poll_source(
-                    poll_start.timestamp(),
-                    datetime.now(timezone.utc).timestamp(),
-                )
-                begin_info = f"from {poll_start}"
+            document_generator = self.connector.poll_source(
+                poll_start.timestamp(),
+                datetime.now(timezone.utc).timestamp(),
+            )
+            begin_info = f"from {poll_start}"
         logging.info(
             "Connect to Airtable: base_id(%s), table(%s) %s",
@@ -854,6 +846,75 @@ class Asana(SyncBase):
         return document_generator


+class Github(SyncBase):
+    SOURCE_NAME: str = FileSource.GITHUB
+
+    async def _generate(self, task: dict):
+        """
+        Sync files from Github repositories.
+        """
+        from common.data_source.connector_runner import ConnectorRunner
+
+        self.connector = GithubConnector(
+            repo_owner=self.conf.get("repository_owner"),
+            repositories=self.conf.get("repository_name"),
+            include_prs=self.conf.get("include_pull_requests", False),
+            include_issues=self.conf.get("include_issues", False),
+        )
+
+        credentials = self.conf.get("credentials", {})
+        if "github_access_token" not in credentials:
+            raise ValueError("Missing github_access_token in credentials")
+        self.connector.load_credentials(
+            {"github_access_token": credentials["github_access_token"]}
+        )
+
+        if task.get("reindex") == "1" or not task.get("poll_range_start"):
+            start_time = datetime.fromtimestamp(0, tz=timezone.utc)
+            begin_info = "totally"
+        else:
+            start_time = task.get("poll_range_start")
+            begin_info = f"from {start_time}"
+        end_time = datetime.now(timezone.utc)
+
+        runner = ConnectorRunner(
+            connector=self.connector,
+            batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE),
+            include_permissions=False,
+            time_range=(start_time, end_time)
+        )
+
+        def document_batches():
+            checkpoint = self.connector.build_dummy_checkpoint()
+            while checkpoint.has_more:
+                for doc_batch, failure, next_checkpoint in runner.run(checkpoint):
+                    if failure is not None:
+                        logging.warning(
+                            "Github connector failure: %s",
+                            getattr(failure, "failure_message", failure),
+                        )
+                        continue
+                    if doc_batch is not None:
+                        yield doc_batch
+                    if next_checkpoint is not None:
+                        checkpoint = next_checkpoint
+
+        async def async_wrapper():
+            for batch in document_batches():
+                yield batch
+
+        logging.info(
+            "Connect to Github: org_name(%s), repo_names(%s) for %s",
+            self.conf.get("repository_owner"),
+            self.conf.get("repository_name"),
+            begin_info,
+        )
+        return async_wrapper()
+
+
 class Gitlab(SyncBase):
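
For reference, an illustrative source configuration assembled from the keys the new `Github._generate` reads above; the authoritative schema lives elsewhere in the repository, so treat names and defaults here as assumptions:

```python
# Illustrative only: keys inferred from the self.conf.get(...) calls above.
conf = {
    "repository_owner": "example-org",     # placeholder owner/org login
    "repository_name": "example-repo",     # placeholder repository name
    "include_pull_requests": True,         # connector default is False
    "include_issues": True,                # connector default is False
    "batch_size": 50,                      # falls back to INDEX_BATCH_SIZE
    "credentials": {"github_access_token": "<personal-access-token>"},
}
```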
@@ -915,8 +976,9 @@ func_factory = {
     FileSource.WEBDAV: WebDAV,
     FileSource.BOX: BOX,
     FileSource.AIRTABLE: Airtable,
-    FileSource.GITLAB: Gitlab,
     FileSource.ASANA: Asana,
+    FileSource.GITHUB: Github,
+    FileSource.GITLAB: Gitlab,
 }
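
Registering the class in `func_factory` is what makes the new source schedulable; a minimal sketch of the lookup, assuming the dispatcher (outside this diff) resolves the sync class by the task's source:

```python
# Sketch only: the dispatch code itself is not part of this diff.
github_sync_cls = func_factory[FileSource.GITHUB]  # resolves to the Github class
```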