From cbdacf21f62bb00be45dd92a8af482e1dbfbb631 Mon Sep 17 00:00:00 2001
From: Wiratama
Date: Thu, 4 Dec 2025 09:44:05 +0700
Subject: [PATCH] feat(gcs): Add support for Google Cloud Storage (GCS)
 integration (#11718)

### What problem does this PR solve?

This Pull Request introduces native support for Google Cloud Storage (GCS) as an optional object storage backend.

Currently, RAGFlow supports only a limited set of storage options. This feature addresses the need for seamless integration with GCP environments, allowing users to rely on a fully managed, highly durable, and scalable storage service (GCS) instead of having to deploy and maintain a third-party object store themselves. This simplifies deployment, especially for users running on GCP infrastructure such as GKE or Cloud Run.

The implementation uses a single GCS bucket defined via configuration and maps RAGFlow's internal logical storage units (or "buckets") to folder prefixes within that bucket to keep data separated. This choice avoids the operational complexity of dynamically creating and managing a separate GCS bucket for every logical unit.
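
As a rough illustration of that mapping (example only, not part of the diff — the `kb-42` and `manual.pdf` names are made up, and `STORAGE_IMPL_TYPE` is assumed to resolve to `GCS`):

```python
from common import settings

settings.init_settings()  # builds settings.STORAGE_IMPL via StorageFactory

# The logical bucket "kb-42" becomes a folder prefix inside the single configured
# GCS bucket, so this object lands at gs://<gcs.bucket>/kb-42/manual.pdf
settings.STORAGE_IMPL.put("kb-42", "manual.pdf", b"%PDF- ...")
data = settings.STORAGE_IMPL.get("kb-42", "manual.pdf")
url = settings.STORAGE_IMPL.get_presigned_url("kb-42", "manual.pdf", 3600)  # V4 signed URL, valid 1 hour
```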

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
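
For anyone trying the backend out locally, the assumed setup looks roughly like the sketch below; the bucket name is a placeholder, and how `STORAGE_IMPL_TYPE` gets selected comes from the existing deployment configuration, not from this patch:

```python
# Assumptions (not enforced by this diff):
#   - conf/service_conf.yaml has an uncommented section:  gcs: { bucket: 'your-bucket-name' }
#   - STORAGE_IMPL_TYPE resolves to 'GCS', however the existing deployment selects backends
#   - google.cloud.storage.Client() finds Application Default Credentials
#     (GOOGLE_APPLICATION_CREDENTIALS, GKE Workload Identity, ...)
from common import settings

settings.init_settings()
print(type(settings.STORAGE_IMPL).__name__)   # expected: RAGFlowGCS
print(settings.GCS)                           # e.g. {'bucket': 'your-bucket-name'}
print(settings.STORAGE_IMPL.health())         # uploads ragflow-health/health_check to the configured bucket
```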
{self.bucket_name}") + return None + except Exception: + logging.exception(f"Fail to get {bucket}/{filename}") + self.__open__() + time.sleep(1) + return None + + def obj_exist(self, bucket, filename, tenant_id=None): + # RENAMED PARAMETER: bucket_name -> bucket + try: + bucket_obj = self.client.bucket(self.bucket_name) + blob_path = self._get_blob_path(bucket, filename) + blob = bucket_obj.blob(blob_path) + return blob.exists() + except Exception: + logging.exception(f"obj_exist {bucket}/{filename} got exception") + return False + + def bucket_exists(self, bucket): + # RENAMED PARAMETER: bucket_name -> bucket + try: + bucket_obj = self.client.bucket(self.bucket_name) + return bucket_obj.exists() + except Exception: + logging.exception(f"bucket_exist check for {self.bucket_name} got exception") + return False + + def get_presigned_url(self, bucket, fnm, expires, tenant_id=None): + # RENAMED PARAMETER: bucket_name -> bucket + for _ in range(10): + try: + bucket_obj = self.client.bucket(self.bucket_name) + blob_path = self._get_blob_path(bucket, fnm) + blob = bucket_obj.blob(blob_path) + + expiration = expires + if isinstance(expires, int): + expiration = datetime.timedelta(seconds=expires) + + url = blob.generate_signed_url( + version="v4", + expiration=expiration, + method="GET" + ) + return url + except Exception: + logging.exception(f"Fail to get_presigned {bucket}/{fnm}:") + self.__open__() + time.sleep(1) + return None + + def remove_bucket(self, bucket): + # RENAMED PARAMETER: bucket_name -> bucket + try: + bucket_obj = self.client.bucket(self.bucket_name) + prefix = f"{bucket}/" + + blobs = list(self.client.list_blobs(self.bucket_name, prefix=prefix)) + + if blobs: + bucket_obj.delete_blobs(blobs) + except Exception: + logging.exception(f"Fail to remove virtual bucket (folder) {bucket}") + + def copy(self, src_bucket, src_path, dest_bucket, dest_path): + # RENAMED PARAMETERS to match original interface + try: + bucket_obj = self.client.bucket(self.bucket_name) + + src_blob_path = self._get_blob_path(src_bucket, src_path) + dest_blob_path = self._get_blob_path(dest_bucket, dest_path) + + src_blob = bucket_obj.blob(src_blob_path) + + if not src_blob.exists(): + logging.error(f"Source object not found: {src_blob_path}") + return False + + bucket_obj.copy_blob(src_blob, bucket_obj, dest_blob_path) + return True + + except NotFound: + logging.error(f"Copy failed: Main bucket {self.bucket_name} does not exist.") + return False + except Exception: + logging.exception(f"Fail to copy {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}") + return False + + def move(self, src_bucket, src_path, dest_bucket, dest_path): + try: + if self.copy(src_bucket, src_path, dest_bucket, dest_path): + self.rm(src_bucket, src_path) + return True + else: + logging.error(f"Copy failed, move aborted: {src_bucket}/{src_path}") + return False + except Exception: + logging.exception(f"Fail to move {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}") + return False \ No newline at end of file