Feat: Separate connectors from s3 (#12045)

### What problem does this PR solve?

Feat: Separate connectors from s3 #12008 

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

Overview:
<img width="1500" alt="image"
src="https://github.com/user-attachments/assets/d54fea7a-7294-4ec0-ab6c-9753b3f03a72"
/>

Oracle: 
<img width="350" alt="image"
src="https://github.com/user-attachments/assets/bca140c1-33d8-4950-afdc-153407eedc46"
/>
This commit is contained in:
Magicbook1108
2025-12-22 09:36:16 +08:00
committed by GitHub
parent 47005ebe10
commit 5aea82d9c4
10 changed files with 213 additions and 14 deletions

View File

@ -161,23 +161,59 @@ class SyncBase:
def _get_source_prefix(self):
return ""
class S3(SyncBase):
SOURCE_NAME: str = FileSource.S3
class _BlobLikeBase(SyncBase):
DEFAULT_BUCKET_TYPE: str = "s3"
async def _generate(self, task: dict):
self.connector = BlobStorageConnector(bucket_type=self.conf.get("bucket_type", "s3"), bucket_name=self.conf["bucket_name"], prefix=self.conf.get("prefix", ""))
bucket_type = self.conf.get("bucket_type", self.DEFAULT_BUCKET_TYPE)
self.connector = BlobStorageConnector(
bucket_type=bucket_type,
bucket_name=self.conf["bucket_name"],
prefix=self.conf.get("prefix", ""),
)
self.connector.load_credentials(self.conf["credentials"])
document_batch_generator = (
self.connector.load_from_state()
if task["reindex"] == "1" or not task["poll_range_start"]
else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp())
else self.connector.poll_source(
task["poll_range_start"].timestamp(),
datetime.now(timezone.utc).timestamp(),
)
)
begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"])
logging.info("Connect to {}: {}(prefix/{}) {}".format(self.conf.get("bucket_type", "s3"), self.conf["bucket_name"], self.conf.get("prefix", ""), begin_info))
begin_info = (
"totally"
if task["reindex"] == "1" or not task["poll_range_start"]
else "from {}".format(task["poll_range_start"])
)
logging.info(
"Connect to {}: {}(prefix/{}) {}".format(
bucket_type,
self.conf["bucket_name"],
self.conf.get("prefix", ""),
begin_info,
)
)
return document_batch_generator
class S3(_BlobLikeBase):
SOURCE_NAME: str = FileSource.S3
DEFAULT_BUCKET_TYPE: str = "s3"
class R2(_BlobLikeBase):
SOURCE_NAME: str = FileSource.R2
DEFAULT_BUCKET_TYPE: str = "r2"
class OCI_STORAGE(_BlobLikeBase):
SOURCE_NAME: str = FileSource.OCI_STORAGE
DEFAULT_BUCKET_TYPE: str = "oci_storage"
class GOOGLE_CLOUD_STORAGE(_BlobLikeBase):
SOURCE_NAME: str = FileSource.GOOGLE_CLOUD_STORAGE
DEFAULT_BUCKET_TYPE: str = "google_cloud_storage"
class Confluence(SyncBase):
SOURCE_NAME: str = FileSource.CONFLUENCE
@ -705,6 +741,9 @@ class BOX(SyncBase):
func_factory = {
FileSource.S3: S3,
FileSource.R2: R2,
FileSource.OCI_STORAGE: OCI_STORAGE,
FileSource.GOOGLE_CLOUD_STORAGE: GOOGLE_CLOUD_STORAGE,
FileSource.NOTION: Notion,
FileSource.DISCORD: Discord,
FileSource.CONFLUENCE: Confluence,