diff --git a/common/constants.py b/common/constants.py
index 574786d00..4cced1266 100644
--- a/common/constants.py
+++ b/common/constants.py
@@ -148,6 +148,7 @@ class Storage(Enum):
     AWS_S3 = 4
     OSS = 5
     OPENDAL = 6
+    GCS = 7
 
 # environment
 # ENV_STRONG_TEST_COUNT = "STRONG_TEST_COUNT"
diff --git a/common/settings.py b/common/settings.py
index 81a2c19a4..c8f7a6de4 100644
--- a/common/settings.py
+++ b/common/settings.py
@@ -31,6 +31,7 @@ import rag.utils.ob_conn
 import rag.utils.opensearch_conn
 from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob
 from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob
+from rag.utils.gcs_conn import RAGFlowGCS
 from rag.utils.minio_conn import RAGFlowMinio
 from rag.utils.opendal_conn import OpenDALStorage
 from rag.utils.s3_conn import RAGFlowS3
@@ -109,6 +110,7 @@ MINIO = {}
 OB = {}
 OSS = {}
 OS = {}
+GCS = {}
 
 DOC_MAXIMUM_SIZE: int = 128 * 1024 * 1024
 DOC_BULK_SIZE: int = 4
@@ -151,7 +153,8 @@ class StorageFactory:
         Storage.AZURE_SAS: RAGFlowAzureSasBlob,
         Storage.AWS_S3: RAGFlowS3,
         Storage.OSS: RAGFlowOSS,
-        Storage.OPENDAL: OpenDALStorage
+        Storage.OPENDAL: OpenDALStorage,
+        Storage.GCS: RAGFlowGCS,
     }
 
     @classmethod
@@ -250,7 +253,7 @@ def init_settings():
     else:
         raise Exception(f"Not supported doc engine: {DOC_ENGINE}")
 
-    global AZURE, S3, MINIO, OSS
+    global AZURE, S3, MINIO, OSS, GCS
     if STORAGE_IMPL_TYPE in ['AZURE_SPN', 'AZURE_SAS']:
         AZURE = get_base_config("azure", {})
     elif STORAGE_IMPL_TYPE == 'AWS_S3':
@@ -259,6 +262,8 @@
         MINIO = decrypt_database_config(name="minio")
     elif STORAGE_IMPL_TYPE == 'OSS':
         OSS = get_base_config("oss", {})
+    elif STORAGE_IMPL_TYPE == 'GCS':
+        GCS = get_base_config("gcs", {})
 
     global STORAGE_IMPL
     STORAGE_IMPL = StorageFactory.create(Storage[STORAGE_IMPL_TYPE])
diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml
index 07a7b32a9..82f2e9248 100644
--- a/conf/service_conf.yaml
+++ b/conf/service_conf.yaml
@@ -60,6 +60,8 @@ user_default_llm:
 #   access_key: 'access_key'
 #   secret_key: 'secret_key'
 #   region: 'region'
+# gcs:
+#   bucket: 'bucket_name'
 # oss:
 #   access_key: 'access_key'
 #   secret_key: 'secret_key'
diff --git a/rag/utils/gcs_conn.py b/rag/utils/gcs_conn.py
new file mode 100644
index 000000000..5268cea42
--- /dev/null
+++ b/rag/utils/gcs_conn.py
@@ -0,0 +1,207 @@
+# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import logging
+import time
+from io import BytesIO
+
+from google.api_core.exceptions import NotFound
+from google.cloud import storage
+
+from common import settings
+from common.decorator import singleton
+
+
+@singleton
+class RAGFlowGCS:
+    """GCS storage backend.
+
+    The storage interface passes a logical `bucket` per call; each logical
+    bucket is mapped to a folder prefix inside the single physical bucket
+    named by the `gcs.bucket` setting, i.e. gs://<bucket_name>/<bucket>/<fnm>.
+    Parameter names (`bucket`, `fnm`) match the other storage backends.
+    """
+
+    def __init__(self):
+        self.client = None
+        self.bucket_name = None
+        self.__open__()
+
+    def __open__(self):
+        # Drop any stale client, then reconnect. storage.Client() picks up
+        # Application Default Credentials.
+        self.client = None
+        try:
+            self.client = storage.Client()
+            self.bucket_name = settings.GCS["bucket"]
+        except Exception:
+            logging.exception("Fail to connect to GCS")
+
+    def _get_blob_path(self, folder, filename):
+        """Construct the object path: <folder>/<filename>."""
+        if not folder:
+            return filename
+        return f"{folder}/{filename}"
+
+    def health(self):
+        folder, fnm, binary = "ragflow-health", "health_check", b"_t@@@1"
+        try:
+            bucket_obj = self.client.bucket(self.bucket_name)
+            if not bucket_obj.exists():
+                logging.error(f"Health check failed: bucket '{self.bucket_name}' does not exist.")
+                return False
+
+            blob = bucket_obj.blob(self._get_blob_path(folder, fnm))
+            blob.upload_from_file(BytesIO(binary), content_type='application/octet-stream')
+            return True
+        except Exception:
+            logging.exception("Health check failed")
+            return False
+
+    def put(self, bucket, fnm, binary, tenant_id=None):
+        for _ in range(3):
+            try:
+                bucket_obj = self.client.bucket(self.bucket_name)
+                blob = bucket_obj.blob(self._get_blob_path(bucket, fnm))
+                blob.upload_from_file(BytesIO(binary), content_type='application/octet-stream')
+                return True
+            except NotFound:
+                logging.error(f"Fail to put: bucket {self.bucket_name} does not exist.")
+                return False
+            except Exception:
+                logging.exception(f"Fail to put {bucket}/{fnm}:")
+                self.__open__()
+                time.sleep(1)
+        return False
+
+    def rm(self, bucket, fnm, tenant_id=None):
+        try:
+            bucket_obj = self.client.bucket(self.bucket_name)
+            bucket_obj.blob(self._get_blob_path(bucket, fnm)).delete()
+        except NotFound:
+            pass  # already gone; deletion is idempotent
+        except Exception:
+            logging.exception(f"Fail to remove {bucket}/{fnm}:")
+
+    def get(self, bucket, filename, tenant_id=None):
+        for _ in range(1):
+            try:
+                bucket_obj = self.client.bucket(self.bucket_name)
+                blob = bucket_obj.blob(self._get_blob_path(bucket, filename))
+                return blob.download_as_bytes()
+            except NotFound:
+                logging.warning(f"File not found {bucket}/{filename} in {self.bucket_name}")
+                return None
+            except Exception:
+                logging.exception(f"Fail to get {bucket}/{filename}")
+                self.__open__()
+                time.sleep(1)
+        return None
+
+    def obj_exist(self, bucket, filename, tenant_id=None):
+        try:
+            bucket_obj = self.client.bucket(self.bucket_name)
+            return bucket_obj.blob(self._get_blob_path(bucket, filename)).exists()
+        except Exception:
+            logging.exception(f"obj_exist {bucket}/{filename} got exception")
+            return False
+
+    def bucket_exists(self, bucket):
+        # Logical buckets are folder prefixes, so only the physical bucket
+        # can meaningfully be checked.
+        try:
+            return self.client.bucket(self.bucket_name).exists()
+        except Exception:
+            logging.exception(f"bucket_exists check for {self.bucket_name} got exception")
+            return False
+
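+    # NOTE (environment assumption, not enforced here): generate_signed_url
+    # with version="v4" needs signing-capable credentials, e.g. a
+    # service-account key file. Metadata-server credentials on GCE/GKE carry
+    # no private key and would need IAM-based signing (signBlob) instead.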
+    def get_presigned_url(self, bucket, fnm, expires, tenant_id=None):
+        for _ in range(10):
+            try:
+                bucket_obj = self.client.bucket(self.bucket_name)
+                blob = bucket_obj.blob(self._get_blob_path(bucket, fnm))
+
+                # Accept either a datetime.timedelta or a number of seconds.
+                expiration = expires
+                if isinstance(expires, int):
+                    expiration = datetime.timedelta(seconds=expires)
+
+                return blob.generate_signed_url(
+                    version="v4",
+                    expiration=expiration,
+                    method="GET",
+                )
+            except Exception:
+                logging.exception(f"Fail to get_presigned {bucket}/{fnm}:")
+                self.__open__()
+                time.sleep(1)
+        return None
+
+    def remove_bucket(self, bucket):
+        """Delete every object under the logical bucket's folder prefix."""
+        try:
+            bucket_obj = self.client.bucket(self.bucket_name)
+            blobs = list(self.client.list_blobs(self.bucket_name, prefix=f"{bucket}/"))
+            if blobs:
+                bucket_obj.delete_blobs(blobs)
+        except Exception:
+            logging.exception(f"Fail to remove virtual bucket (folder) {bucket}")
+
+    def copy(self, src_bucket, src_path, dest_bucket, dest_path):
+        try:
+            bucket_obj = self.client.bucket(self.bucket_name)
+            src_blob_path = self._get_blob_path(src_bucket, src_path)
+            dest_blob_path = self._get_blob_path(dest_bucket, dest_path)
+
+            src_blob = bucket_obj.blob(src_blob_path)
+            if not src_blob.exists():
+                logging.error(f"Source object not found: {src_blob_path}")
+                return False
+
+            bucket_obj.copy_blob(src_blob, bucket_obj, dest_blob_path)
+            return True
+        except NotFound:
+            logging.error(f"Copy failed: bucket {self.bucket_name} does not exist.")
+            return False
+        except Exception:
+            logging.exception(f"Fail to copy {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}")
+            return False
+
+    def move(self, src_bucket, src_path, dest_bucket, dest_path):
+        # GCS has no server-side move; copy first, then delete the source.
+        try:
+            if self.copy(src_bucket, src_path, dest_bucket, dest_path):
+                self.rm(src_bucket, src_path)
+                return True
+            logging.error(f"Copy failed, move aborted: {src_bucket}/{src_path}")
+            return False
+        except Exception:
+            logging.exception(f"Fail to move {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}")
+            return False
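
A quick way to exercise the connector outside the full service (a sketch, with assumptions called out inline: `GOOGLE_APPLICATION_CREDENTIALS` points at a service-account key that can read and write the bucket, and `my-ragflow-bucket` stands in for the real `gcs.bucket` value that `init_settings()` would normally load from conf/service_conf.yaml):

```python
from common import settings

# Normally filled in by init_settings() from the `gcs:` section of
# conf/service_conf.yaml; set by hand here for a standalone check.
# "my-ragflow-bucket" is an illustrative name, not a real bucket.
settings.GCS = {"bucket": "my-ragflow-bucket"}

from rag.utils.gcs_conn import RAGFlowGCS

store = RAGFlowGCS()
assert store.health()

store.put("tenant-a", "hello.txt", b"hello gcs")
assert store.obj_exist("tenant-a", "hello.txt")
assert store.get("tenant-a", "hello.txt") == b"hello gcs"

# Needs signing-capable credentials; see the note above get_presigned_url.
print(store.get_presigned_url("tenant-a", "hello.txt", expires=3600))

store.rm("tenant-a", "hello.txt")
```

In normal operation the backend is selected the same way as the existing ones, via `Storage[STORAGE_IMPL_TYPE]` in init_settings(); setting the storage implementation type to `GCS` (through the `STORAGE_IMPL` environment variable, if that is what drives `STORAGE_IMPL_TYPE` in this deployment) routes all object storage through RAGFlowGCS.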