Introduced beartype (#3460)

### What problem does this PR solve?

Introduced [beartype](https://github.com/beartype/beartype) for runtime
type-checking.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Zhichang Yu
2024-11-18 17:38:17 +08:00
committed by GitHub
parent 3824c1fec0
commit 4413683898
32 changed files with 125 additions and 134 deletions

View File

@ -17,7 +17,6 @@ import logging
import inspect
import os
import sys
import typing
import operator
from enum import Enum
from functools import wraps
@ -121,13 +120,13 @@ class SerializedField(LongTextField):
f"the serialized type {self._serialized_type} is not supported")
def is_continuous_field(cls: typing.Type) -> bool:
def is_continuous_field(cls: type) -> bool:
if cls in CONTINUOUS_FIELD_TYPE:
return True
for p in cls.__bases__:
if p in CONTINUOUS_FIELD_TYPE:
return True
elif p != Field and p != object:
elif p is not Field and p is not object:
if is_continuous_field(p):
return True
else:
@ -159,7 +158,7 @@ class BaseModel(Model):
def to_dict(self):
return self.__dict__['__data__']
def to_human_model_dict(self, only_primary_with: list = None):
def to_human_model_dict(self, only_primary_with: list | None = None):
model_dict = self.__dict__['__data__']
if not only_primary_with:

View File

@ -15,7 +15,6 @@
#
import operator
from functools import reduce
from typing import Dict, Type, Union
from playhouse.pool import PooledMySQLDatabase
@ -87,7 +86,7 @@ supported_operators = {
def query_dict2expression(
model: Type[DataBaseModel], query: Dict[str, Union[bool, int, str, list, tuple]]):
model: type[DataBaseModel], query: dict[str, bool | int | str | list | tuple]):
expression = []
for field, value in query.items():
@ -105,8 +104,8 @@ def query_dict2expression(
return reduce(operator.iand, expression)
def query_db(model: Type[DataBaseModel], limit: int = 0, offset: int = 0,
query: dict = None, order_by: Union[str, list, tuple] = None):
def query_db(model: type[DataBaseModel], limit: int = 0, offset: int = 0,
query: dict = None, order_by: str | list | tuple | None = None):
data = model.select()
if query:
data = data.where(query_dict2expression(model, query))

View File

@ -14,6 +14,9 @@
# limitations under the License.
#
from beartype.claw import beartype_packages
beartype_packages(["agent", "api", "deepdoc", "plugins", "rag", "ragflow_sdk"]) # <-- raise exceptions in your code
import logging
from api.utils.log_utils import initRootLogger
initRootLogger("ragflow_server")

View File

@ -28,13 +28,12 @@ def get_project_base_directory():
)
return PROJECT_BASE
def initRootLogger(script_path: str, log_level: int = logging.INFO, log_format: str = "%(asctime)-15s %(levelname)-8s %(process)d %(message)s"):
def initRootLogger(logfile_basename: str, log_level: int = logging.INFO, log_format: str = "%(asctime)-15s %(levelname)-8s %(process)d %(message)s"):
logger = logging.getLogger()
if logger.hasHandlers():
return
script_name = os.path.basename(script_path)
log_path = os.path.abspath(os.path.join(get_project_base_directory(), "logs", f"{os.path.splitext(script_name)[0]}.log"))
log_path = os.path.abspath(os.path.join(get_project_base_directory(), "logs", f"{logfile_basename}.log"))
os.makedirs(os.path.dirname(log_path), exist_ok=True)
logger.setLevel(log_level)
@ -50,5 +49,5 @@ def initRootLogger(script_path: str, log_level: int = logging.INFO, log_format:
handler2.setFormatter(formatter)
logger.addHandler(handler2)
msg = f"{script_name} log path: {log_path}"
msg = f"{logfile_basename} log path: {log_path}"
logger.info(msg)

View File

@ -13,11 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import dotenv
import typing
import subprocess
def get_ragflow_version() -> typing.Optional[str]:
def get_ragflow_version() -> str:
return RAGFLOW_VERSION_INFO
@ -42,7 +40,7 @@ def get_closest_tag_and_count():
return closest_tag
else:
return f"{commit_id}({closest_tag}~{commits_count})"
except Exception as e:
except Exception:
return 'unknown'