mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-08 20:42:30 +08:00
Oss support opendal(including mysql) (#8204)
### What problem does this PR solve? #8074 Oss support opendal(including mysql) ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
This commit is contained in:
115
rag/utils/opendal_conn.py
Normal file
115
rag/utils/opendal_conn.py
Normal file
@ -0,0 +1,115 @@
|
||||
import opendal
|
||||
import logging
|
||||
import pymysql
|
||||
import yaml
|
||||
|
||||
from rag.utils import singleton
|
||||
|
||||
SERVICE_CONF_PATH = "conf/service_conf.yaml"
|
||||
|
||||
CREATE_TABLE_SQL = """
|
||||
CREATE TABLE IF NOT EXISTS `{}` (
|
||||
`key` VARCHAR(255) PRIMARY KEY,
|
||||
`value` LONGBLOB,
|
||||
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
`updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||||
);
|
||||
"""
|
||||
SET_MAX_ALLOWED_PACKET_SQL = """
|
||||
SET GLOBAL max_allowed_packet={}
|
||||
"""
|
||||
|
||||
|
||||
def get_opendal_config_from_yaml(yaml_path=SERVICE_CONF_PATH):
|
||||
try:
|
||||
with open(yaml_path, 'r') as f:
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
opendal_config = config.get('opendal', {})
|
||||
kwargs = {}
|
||||
if opendal_config.get("schema") == 'mysql':
|
||||
mysql_config = config.get('mysql', {})
|
||||
kwargs = {
|
||||
"schema": "mysql",
|
||||
"host": mysql_config.get("host", "127.0.0.1"),
|
||||
"port": str(mysql_config.get("port", 3306)),
|
||||
"user": mysql_config.get("user", "root"),
|
||||
"password": mysql_config.get("password", ""),
|
||||
"database": mysql_config.get("name", "test_open_dal"),
|
||||
"table": opendal_config.get("config").get("table", "opendal_storage")
|
||||
}
|
||||
kwargs["connection_string"] = f"mysql://{kwargs['user']}:{kwargs['password']}@{kwargs['host']}:{kwargs['port']}/{kwargs['database']}"
|
||||
else:
|
||||
schema = opendal_config.get("schema")
|
||||
config_data = opendal_config.get("config", {})
|
||||
kwargs = {"schema": schema, **config_data}
|
||||
logging.info("Loaded OpenDAL configuration from yaml: %s", kwargs)
|
||||
return kwargs
|
||||
except Exception as e:
|
||||
logging.error("Failed to load OpenDAL configuration from yaml: %s", str(e))
|
||||
raise
|
||||
|
||||
|
||||
@singleton
|
||||
class OpenDALStorage:
|
||||
def __init__(self):
|
||||
self._kwargs = get_opendal_config_from_yaml()
|
||||
self._schema = self._kwargs.get('schema', 'mysql')
|
||||
if self._schema == 'mysql':
|
||||
self.init_db_config()
|
||||
self.init_opendal_mysql_table()
|
||||
self._operator = opendal.Operator(self._schema, **self._kwargs)
|
||||
|
||||
logging.info("OpenDALStorage initialized successfully")
|
||||
|
||||
def put(self, bucket, fnm, binary):
|
||||
self._operator.write(f"{bucket}/{fnm}", binary)
|
||||
|
||||
def get(self, bucket, fnm):
|
||||
return self._operator.read(f"{bucket}/{fnm}")
|
||||
|
||||
def rm(self, bucket, fnm):
|
||||
self._operator.delete(f"{bucket}/{fnm}")
|
||||
self._operator.__init__()
|
||||
|
||||
def scan(self, bucket, fnm):
|
||||
return self._operator.scan(f"{bucket}/{fnm}")
|
||||
|
||||
def obj_exist(self, bucket, fnm):
|
||||
return self._operator.exists(f"{bucket}/{fnm}")
|
||||
|
||||
|
||||
def init_db_config(self):
|
||||
try:
|
||||
conn = pymysql.connect(
|
||||
host=self._kwargs['host'],
|
||||
port=int(self._kwargs['port']),
|
||||
user=self._kwargs['user'],
|
||||
password=self._kwargs['password'],
|
||||
database=self._kwargs['database']
|
||||
)
|
||||
cursor = conn.cursor()
|
||||
max_packet = self._kwargs.get('max_allowed_packet', 4194304) # Default to 4MB if not specified
|
||||
cursor.execute(SET_MAX_ALLOWED_PACKET_SQL.format(max_packet))
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
logging.info(f"Database configuration initialized with max_allowed_packet={max_packet}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize database configuration: {str(e)}")
|
||||
raise
|
||||
|
||||
def init_opendal_mysql_table(self):
|
||||
conn = pymysql.connect(
|
||||
host=self._kwargs['host'],
|
||||
port=int(self._kwargs['port']),
|
||||
user=self._kwargs['user'],
|
||||
password=self._kwargs['password'],
|
||||
database=self._kwargs['database']
|
||||
)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(CREATE_TABLE_SQL.format(self._kwargs['table']))
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
logging.info(f"Table `{self._kwargs['table']}` initialized.")
|
||||
@ -20,6 +20,7 @@ from enum import Enum
|
||||
from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob
|
||||
from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob
|
||||
from rag.utils.minio_conn import RAGFlowMinio
|
||||
from rag.utils.opendal_conn import OpenDALStorage
|
||||
from rag.utils.s3_conn import RAGFlowS3
|
||||
from rag.utils.oss_conn import RAGFlowOSS
|
||||
|
||||
@ -30,6 +31,7 @@ class Storage(Enum):
|
||||
AZURE_SAS = 3
|
||||
AWS_S3 = 4
|
||||
OSS = 5
|
||||
OPENDAL = 6
|
||||
|
||||
|
||||
class StorageFactory:
|
||||
@ -39,6 +41,7 @@ class StorageFactory:
|
||||
Storage.AZURE_SAS: RAGFlowAzureSasBlob,
|
||||
Storage.AWS_S3: RAGFlowS3,
|
||||
Storage.OSS: RAGFlowOSS,
|
||||
Storage.OPENDAL: OpenDALStorage
|
||||
}
|
||||
|
||||
@classmethod
|
||||
|
||||
Reference in New Issue
Block a user