Move 'get_project_base_directory' to common directory (#10940)

### What problem does this PR solve?

As title

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2025-11-02 21:05:28 +08:00
committed by GitHub
parent 57a83eca8a
commit 44f2d6f5da
24 changed files with 186 additions and 51 deletions

View File

@ -23,6 +23,7 @@ import pickle
import importlib
from api.utils import file_utils
from common.file_utils import get_project_base_directory
from filelock import FileLock
from api.utils.common import bytes_to_string, string_to_bytes
from api.constants import SERVICE_CONF
@ -30,7 +31,7 @@ from api.constants import SERVICE_CONF
def conf_realpath(conf_name):
conf_path = f"conf/{conf_name}"
return os.path.join(file_utils.get_project_base_directory(), conf_path)
return os.path.join(get_project_base_directory(), conf_path)
def read_config(conf_name=SERVICE_CONF):
@ -129,8 +130,7 @@ def decrypt_database_config(
def update_config(key, value, conf_name=SERVICE_CONF):
conf_path = conf_realpath(conf_name=conf_name)
if not os.path.isabs(conf_path):
conf_path = os.path.join(
file_utils.get_project_base_directory(), conf_path)
conf_path = os.path.join(get_project_base_directory(), conf_path)
with FileLock(os.path.join(os.path.dirname(conf_path), ".lock")):
config = file_utils.load_yaml_conf(conf_path=conf_path) or {}

View File

@ -19,14 +19,14 @@ import os
import sys
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from api.utils import file_utils
from common.file_utils import get_project_base_directory
def crypt(line):
"""
decrypt(crypt(input_string)) == base64(input_string), which frontend and admin_client use.
"""
file_path = os.path.join(file_utils.get_project_base_directory(), "conf", "public.pem")
file_path = os.path.join(get_project_base_directory(), "conf", "public.pem")
rsa_key = RSA.importKey(open(file_path).read(), "Welcome")
cipher = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64 = base64.b64encode(line.encode('utf-8')).decode("utf-8")
@ -35,7 +35,7 @@ def crypt(line):
def decrypt(line):
file_path = os.path.join(file_utils.get_project_base_directory(), "conf", "private.pem")
file_path = os.path.join(get_project_base_directory(), "conf", "private.pem")
rsa_key = RSA.importKey(open(file_path).read(), "Welcome")
cipher = Cipher_pkcs1_v1_5.new(rsa_key)
return cipher.decrypt(base64.b64decode(line), "Fail to decrypt password!").decode('utf-8')
@ -50,7 +50,7 @@ def decrypt2(crypt_text):
hex_fixed = '00' + decode_data.hex()
decode_data = b16decode(hex_fixed.upper())
file_path = os.path.join(file_utils.get_project_base_directory(), "conf", "private.pem")
file_path = os.path.join(get_project_base_directory(), "conf", "private.pem")
pem = open(file_path).read()
rsa_key = RSA.importKey(pem, "Welcome")
cipher = Cipher_PKCS1_v1_5.new(rsa_key)

View File

@ -43,6 +43,7 @@ from ruamel.yaml import YAML
# Local imports
from api.constants import IMG_BASE64_PREFIX
from api.db import FileType
from common.file_utils import get_project_base_directory
PROJECT_BASE = os.getenv("RAG_PROJECT_BASE") or os.getenv("RAG_DEPLOY_BASE")
@ -51,21 +52,6 @@ if LOCK_KEY_pdfplumber not in sys.modules:
sys.modules[LOCK_KEY_pdfplumber] = threading.Lock()
def get_project_base_directory(*args):
global PROJECT_BASE
if PROJECT_BASE is None:
PROJECT_BASE = os.path.abspath(
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
os.pardir,
os.pardir,
)
)
if args:
return os.path.join(PROJECT_BASE, *args)
return PROJECT_BASE
@cached(cache=LRUCache(maxsize=10))
def load_json_conf(conf_path):
if os.path.isabs(conf_path):

View File

@ -17,19 +17,10 @@ import os
import os.path
import logging
from logging.handlers import RotatingFileHandler
from common.file_utils import get_project_base_directory
initialized_root_logger = False
def get_project_base_directory():
PROJECT_BASE = os.path.abspath(
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
os.pardir,
os.pardir,
)
)
return PROJECT_BASE
def init_root_logger(logfile_basename: str, log_format: str = "%(asctime)-15s %(levelname)-8s %(process)d %(message)s"):
global initialized_root_logger
if initialized_root_logger: