From 38234aca5317a25fb7be44b6b7fd40f401f408a3 Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 20 Nov 2025 10:00:14 +0800 Subject: [PATCH] feat: add OceanBase doc engine (#11228) ### What problem does this PR solve? Add OceanBase doc engine. Close #5350 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- common/settings.py | 7 +- conf/service_conf.yaml | 8 + docker/.env | 22 + docker/README.md | 9 + docker/docker-compose-base.yml | 27 + docker/oceanbase/init.d/vec_memory.sql | 1 + docker/service_conf.yaml.template | 8 + pyproject.toml | 1 + rag/nlp/query.py | 8 +- rag/utils/ob_conn.py | 1562 ++++++++++++++++++++++++ uv.lock | 68 ++ 11 files changed, 1717 insertions(+), 4 deletions(-) create mode 100644 docker/oceanbase/init.d/vec_memory.sql create mode 100644 rag/utils/ob_conn.py diff --git a/common/settings.py b/common/settings.py index 27894b774..ac2b13e00 100644 --- a/common/settings.py +++ b/common/settings.py @@ -27,6 +27,7 @@ from common.constants import SVR_QUEUE_NAME, Storage import rag.utils import rag.utils.es_conn import rag.utils.infinity_conn +import rag.utils.ob_conn import rag.utils.opensearch_conn from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob @@ -103,6 +104,7 @@ INFINITY = {} AZURE = {} S3 = {} MINIO = {} +OB = {} OSS = {} OS = {} @@ -227,7 +229,7 @@ def init_settings(): FEISHU_OAUTH = get_base_config("oauth", {}).get("feishu") OAUTH_CONFIG = get_base_config("oauth", {}) - global DOC_ENGINE, docStoreConn, ES, OS, INFINITY + global DOC_ENGINE, docStoreConn, ES, OB, OS, INFINITY DOC_ENGINE = os.environ.get("DOC_ENGINE", "elasticsearch") # DOC_ENGINE = os.environ.get('DOC_ENGINE', "opensearch") lower_case_doc_engine = DOC_ENGINE.lower() @@ -240,6 +242,9 @@ def init_settings(): elif lower_case_doc_engine == "opensearch": OS = get_base_config("os", {}) docStoreConn = rag.utils.opensearch_conn.OSConnection() + elif lower_case_doc_engine == "oceanbase": + OB = get_base_config("oceanbase", {}) + docStoreConn = rag.utils.ob_conn.OBConnection() else: raise Exception(f"Not supported doc engine: {DOC_ENGINE}") diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml index df6290042..ec756a149 100644 --- a/conf/service_conf.yaml +++ b/conf/service_conf.yaml @@ -28,6 +28,14 @@ os: infinity: uri: 'localhost:23817' db_name: 'default_db' +oceanbase: + scheme: 'oceanbase' # set 'mysql' to create connection using mysql config + config: + db_name: 'test' + user: 'root@ragflow' + password: 'infini_rag_flow' + host: 'localhost' + port: 2881 redis: db: 1 password: 'infini_rag_flow' diff --git a/docker/.env b/docker/.env index d024def7e..d7e4b025f 100644 --- a/docker/.env +++ b/docker/.env @@ -7,6 +7,7 @@ # Available options: # - `elasticsearch` (default) # - `infinity` (https://github.com/infiniflow/infinity) +# - `oceanbase` (https://github.com/oceanbase/oceanbase) # - `opensearch` (https://github.com/opensearch-project/OpenSearch) DOC_ENGINE=${DOC_ENGINE:-elasticsearch} @@ -62,6 +63,27 @@ INFINITY_THRIFT_PORT=23817 INFINITY_HTTP_PORT=23820 INFINITY_PSQL_PORT=5432 +# The hostname where the OceanBase service is exposed +OCEANBASE_HOST=oceanbase +# The port used to expose the OceanBase service +OCEANBASE_PORT=2881 +# The username for OceanBase +OCEANBASE_USER=root@ragflow +# The password for OceanBase +OCEANBASE_PASSWORD=infini_rag_flow +# The doc database of the OceanBase service to use +OCEANBASE_DOC_DBNAME=ragflow_doc + +# OceanBase container configuration 
+OB_CLUSTER_NAME=${OB_CLUSTER_NAME:-ragflow} +OB_TENANT_NAME=${OB_TENANT_NAME:-ragflow} +OB_SYS_PASSWORD=${OCEANBASE_PASSWORD:-infini_rag_flow} +OB_TENANT_PASSWORD=${OCEANBASE_PASSWORD:-infini_rag_flow} +OB_MEMORY_LIMIT=${OB_MEMORY_LIMIT:-10G} +OB_SYSTEM_MEMORY=${OB_SYSTEM_MEMORY:-2G} +OB_DATAFILE_SIZE=${OB_DATAFILE_SIZE:-20G} +OB_LOG_DISK_SIZE=${OB_LOG_DISK_SIZE:-20G} + # The password for MySQL. MYSQL_PASSWORD=infini_rag_flow # The hostname where the MySQL service is exposed diff --git a/docker/README.md b/docker/README.md index bb7586a6b..5de0946b0 100644 --- a/docker/README.md +++ b/docker/README.md @@ -138,6 +138,15 @@ The [.env](./.env) file contains important environment variables for Docker. - `password`: The password for MinIO. - `host`: The MinIO serving IP *and* port inside the Docker container. Defaults to `minio:9000`. +- `oceanbase` + - `scheme`: The connection scheme. Set to `mysql` to use mysql config, or other values to use config below. + - `config`: + - `db_name`: The OceanBase database name. + - `user`: The username for OceanBase. + - `password`: The password for OceanBase. + - `host`: The hostname of the OceanBase service. + - `port`: The port of OceanBase. + - `oss` - `access_key`: The access key ID used to authenticate requests to the OSS service. - `secret_key`: The secret access key used to authenticate requests to the OSS service. diff --git a/docker/docker-compose-base.yml b/docker/docker-compose-base.yml index 049f5f827..05e99e4b2 100644 --- a/docker/docker-compose-base.yml +++ b/docker/docker-compose-base.yml @@ -96,6 +96,31 @@ services: retries: 120 restart: on-failure + oceanbase: + profiles: + - oceanbase + image: oceanbase/oceanbase-ce:4.4.1.0-100000032025101610 + volumes: + - ./oceanbase/data:/root/ob + - ./oceanbase/conf:/root/.obd/cluster + - ./oceanbase/init.d:/root/boot/init.d + ports: + - ${OCEANBASE_PORT:-2881}:2881 + env_file: .env + environment: + - MODE=normal + - OB_SERVER_IP=127.0.0.1 + mem_limit: ${MEM_LIMIT} + healthcheck: + test: [ 'CMD-SHELL', 'obclient -h127.0.0.1 -P2881 -uroot@${OB_TENANT_NAME:-ragflow} -p${OB_TENANT_PASSWORD:-infini_rag_flow} -e "CREATE DATABASE IF NOT EXISTS ${OCEANBASE_DOC_DBNAME:-ragflow_doc};"' ] + interval: 10s + retries: 30 + start_period: 30s + timeout: 10s + networks: + - ragflow + restart: on-failure + sandbox-executor-manager: profiles: - sandbox @@ -256,6 +281,8 @@ volumes: driver: local infinity_data: driver: local + ob_data: + driver: local mysql_data: driver: local minio_data: diff --git a/docker/oceanbase/init.d/vec_memory.sql b/docker/oceanbase/init.d/vec_memory.sql new file mode 100644 index 000000000..f4c283fdf --- /dev/null +++ b/docker/oceanbase/init.d/vec_memory.sql @@ -0,0 +1 @@ +ALTER SYSTEM SET ob_vector_memory_limit_percentage = 30; \ No newline at end of file diff --git a/docker/service_conf.yaml.template b/docker/service_conf.yaml.template index dbefe053a..14b1b71f9 100644 --- a/docker/service_conf.yaml.template +++ b/docker/service_conf.yaml.template @@ -28,6 +28,14 @@ os: infinity: uri: '${INFINITY_HOST:-infinity}:23817' db_name: 'default_db' +oceanbase: + scheme: 'oceanbase' # set 'mysql' to create connection using mysql config + config: + db_name: '${OCEANBASE_DOC_DBNAME:-test}' + user: '${OCEANBASE_USER:-root@ragflow}' + password: '${OCEANBASE_PASSWORD:-infini_rag_flow}' + host: '${OCEANBASE_HOST:-oceanbase}' + port: ${OCEANBASE_PORT:-2881} redis: db: 1 password: '${REDIS_PASSWORD:-infini_rag_flow}' diff --git a/pyproject.toml b/pyproject.toml index aec19be33..64e89528a 100644 --- 
a/pyproject.toml +++ b/pyproject.toml @@ -149,6 +149,7 @@ dependencies = [ "captcha>=0.7.1", "pip>=25.2", "pypandoc>=1.16", + "pyobvector==0.2.18", ] [dependency-groups] diff --git a/rag/nlp/query.py b/rag/nlp/query.py index ec3628525..2c203f521 100644 --- a/rag/nlp/query.py +++ b/rag/nlp/query.py @@ -83,6 +83,7 @@ class FulltextQueryer: return txt def question(self, txt, tbl="qa", min_match: float = 0.6): + original_query = txt txt = FulltextQueryer.add_space_between_eng_zh(txt) txt = re.sub( r"[ :|\r\n\t,,。??/`!!&^%%()\[\]{}<>]+", @@ -127,7 +128,7 @@ class FulltextQueryer: q.append(txt) query = " ".join(q) return MatchTextExpr( - self.query_fields, query, 100 + self.query_fields, query, 100, {"original_query": original_query} ), keywords def need_fine_grained_tokenize(tk): @@ -212,7 +213,7 @@ class FulltextQueryer: if not query: query = otxt return MatchTextExpr( - self.query_fields, query, 100, {"minimum_should_match": min_match} + self.query_fields, query, 100, {"minimum_should_match": min_match, "original_query": original_query} ), keywords return None, keywords @@ -259,6 +260,7 @@ class FulltextQueryer: content_tks = [c.strip() for c in content_tks.strip() if c.strip()] tks_w = self.tw.weights(content_tks, preprocess=False) + origin_keywords = keywords.copy() keywords = [f'"{k.strip()}"' for k in keywords] for tk, w in sorted(tks_w, key=lambda x: x[1] * -1)[:keywords_topn]: tk_syns = self.syn.lookup(tk) @@ -274,4 +276,4 @@ class FulltextQueryer: keywords.append(f"{tk}^{w}") return MatchTextExpr(self.query_fields, " ".join(keywords), 100, - {"minimum_should_match": min(3, len(keywords) // 10)}) + {"minimum_should_match": min(3, len(keywords) / 10), "original_query": " ".join(origin_keywords)}) diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py new file mode 100644 index 000000000..6218a8c4e --- /dev/null +++ b/rag/utils/ob_conn.py @@ -0,0 +1,1562 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import json +import logging +import os +import re +import time +from typing import Any, Optional + +from elasticsearch_dsl import Q, Search +from pydantic import BaseModel +from pymysql.converters import escape_string +from pyobvector import ObVecClient, FtsIndexParam, FtsParser, ARRAY, VECTOR +from pyobvector.client.hybrid_search import HybridSearch +from pyobvector.util import ObVersion +from sqlalchemy import text, Column, String, Integer, JSON, Double, Row, Table +from sqlalchemy.dialects.mysql import LONGTEXT, TEXT +from sqlalchemy.sql.type_api import TypeEngine + +from common import settings +from common.constants import PAGERANK_FLD, TAG_FLD +from common.decorator import singleton +from common.float_utils import get_float +from rag.nlp import rag_tokenizer +from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr, FusionExpr, MatchTextExpr, \ + MatchDenseExpr + +ATTEMPT_TIME = 2 +OB_QUERY_TIMEOUT = int(os.environ.get("OB_QUERY_TIMEOUT", "100_000_000")) + +logger = logging.getLogger('ragflow.ob_conn') + +column_order_id = Column("_order_id", Integer, nullable=True, comment="chunk order id for maintaining sequence") +column_group_id = Column("group_id", String(256), nullable=True, comment="group id for external retrieval") + +column_definitions: list[Column] = [ + Column("id", String(256), primary_key=True, comment="chunk id"), + Column("kb_id", String(256), nullable=False, index=True, comment="knowledge base id"), + Column("doc_id", String(256), nullable=True, index=True, comment="document id"), + Column("docnm_kwd", String(256), nullable=True, comment="document name"), + Column("doc_type_kwd", String(256), nullable=True, comment="document type"), + Column("title_tks", String(256), nullable=True, comment="title tokens"), + Column("title_sm_tks", String(256), nullable=True, comment="fine-grained (small) title tokens"), + Column("content_with_weight", LONGTEXT, nullable=True, comment="the original content"), + Column("content_ltks", LONGTEXT, nullable=True, comment="long text tokens derived from content_with_weight"), + Column("content_sm_ltks", LONGTEXT, nullable=True, comment="fine-grained (small) tokens derived from content_ltks"), + Column("pagerank_fea", Integer, nullable=True, comment="page rank priority, usually set in kb level"), + Column("important_kwd", ARRAY(String(256)), nullable=True, comment="keywords"), + Column("important_tks", TEXT, nullable=True, comment="keyword tokens"), + Column("question_kwd", ARRAY(String(1024)), nullable=True, comment="questions"), + Column("question_tks", TEXT, nullable=True, comment="question tokens"), + Column("tag_kwd", ARRAY(String(256)), nullable=True, comment="tags"), + Column("tag_feas", JSON, nullable=True, + comment="tag features used for 'rank_feature', format: [tag -> relevance score]"), + Column("available_int", Integer, nullable=False, index=True, server_default="1", + comment="status of availability, 0 for unavailable, 1 for available"), + Column("create_time", String(19), nullable=True, comment="creation time in YYYY-MM-DD HH:MM:SS format"), + Column("create_timestamp_flt", Double, nullable=True, comment="creation timestamp in float format"), + Column("img_id", String(128), nullable=True, comment="image id"), + Column("position_int", ARRAY(ARRAY(Integer)), nullable=True, comment="position"), + Column("page_num_int", ARRAY(Integer), nullable=True, comment="page number"), + Column("top_int", ARRAY(Integer), nullable=True, comment="rank from the top"), + Column("knowledge_graph_kwd", String(256), 
nullable=True, index=True, comment="knowledge graph chunk type"), + Column("source_id", ARRAY(String(256)), nullable=True, comment="source document id"), + Column("entity_kwd", String(256), nullable=True, comment="entity name"), + Column("entity_type_kwd", String(256), nullable=True, index=True, comment="entity type"), + Column("from_entity_kwd", String(256), nullable=True, comment="the source entity of this edge"), + Column("to_entity_kwd", String(256), nullable=True, comment="the target entity of this edge"), + Column("weight_int", Integer, nullable=True, comment="the weight of this edge"), + Column("weight_flt", Double, nullable=True, comment="the weight of community report"), + Column("entities_kwd", ARRAY(String(256)), nullable=True, comment="node ids of entities"), + Column("rank_flt", Double, nullable=True, comment="rank of this entity"), + Column("removed_kwd", String(256), nullable=True, index=True, server_default="'N'", + comment="whether it has been deleted"), + Column("metadata", JSON, nullable=True, comment="metadata for this chunk"), + Column("extra", JSON, nullable=True, comment="extra information of non-general chunk"), + column_order_id, + column_group_id, +] + +column_names: list[str] = [col.name for col in column_definitions] +column_types: dict[str, TypeEngine] = {col.name: col.type for col in column_definitions} +array_columns: list[str] = [col.name for col in column_definitions if isinstance(col.type, ARRAY)] + +vector_column_pattern = re.compile(r"q_(?P<vector_size>\d+)_vec") + +index_columns: list[str] = [ + "kb_id", + "doc_id", + "available_int", + "knowledge_graph_kwd", + "entity_type_kwd", + "removed_kwd", +] + +fulltext_search_columns: list[str] = [ + "docnm_kwd", + "content_with_weight", + "title_tks", + "title_sm_tks", + "important_tks", + "question_tks", + "content_ltks", + "content_sm_ltks" +] + +fts_columns_origin: list[str] = [ + "docnm_kwd^10", + "content_with_weight", + "important_tks^20", + "question_tks^20", +] + +fts_columns_tks: list[str] = [ + "title_tks^10", + "title_sm_tks^5", + "important_tks^20", + "question_tks^20", + "content_ltks^2", + "content_sm_ltks", +] + +index_name_template = "ix_%s_%s" +fulltext_index_name_template = "fts_idx_%s" +# MATCH AGAINST: https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002017607 +fulltext_search_template = "MATCH (%s) AGAINST ('%s' IN NATURAL LANGUAGE MODE)" +# cosine_distance: https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002012938 +vector_search_template = "cosine_distance(%s, %s)" + + +class SearchResult(BaseModel): + total: int + chunks: list[dict] + + +def get_column_value(column_name: str, value: Any) -> Any: + if column_name in column_types: + column_type = column_types[column_name] + if isinstance(column_type, String): + return str(value) + elif isinstance(column_type, Integer): + return int(value) + elif isinstance(column_type, Double): + return float(value) + elif isinstance(column_type, ARRAY) or isinstance(column_type, JSON): + if isinstance(value, str): + try: + return json.loads(value) + except json.JSONDecodeError: + return value + else: + return value + else: + raise ValueError(f"Unsupported column type for column '{column_name}': {column_type}") + elif vector_column_pattern.match(column_name): + if isinstance(value, str): + try: + return json.loads(value) + except json.JSONDecodeError: + return value + else: + return value + elif column_name == "_score": + return float(value) + else: + raise ValueError(f"Unknown column '{column_name}' with value '{value}'.") + + +def
get_default_value(column_name: str) -> Any: + if column_name == "available_int": + return 1 + elif column_name == "removed_kwd": + return "N" + elif column_name == "_order_id": + return 0 + else: + return None + + +def get_value_str(value: Any) -> str: + if isinstance(value, str): + cleaned_str = value.replace('\\', '\\\\') + cleaned_str = cleaned_str.replace('\n', '\\n') + cleaned_str = cleaned_str.replace('\r', '\\r') + cleaned_str = cleaned_str.replace('\t', '\\t') + return f"'{escape_string(cleaned_str)}'" + elif isinstance(value, bool): + return "true" if value else "false" + elif value is None: + return "NULL" + elif isinstance(value, (list, dict)): + json_str = json.dumps(value, ensure_ascii=False) + return f"'{escape_string(json_str)}'" + else: + return str(value) + + +def get_metadata_filter_expression(metadata_filtering_conditions: dict) -> str: + """ + Convert metadata filtering conditions to MySQL JSON path expression. + + Args: + metadata_filtering_conditions: dict with 'conditions' and 'logical_operator' keys + + Returns: + MySQL JSON path expression string + """ + if not metadata_filtering_conditions: + return "" + + conditions = metadata_filtering_conditions.get("conditions", []) + logical_operator = metadata_filtering_conditions.get("logical_operator", "and").upper() + + if not conditions: + return "" + + if logical_operator not in ["AND", "OR"]: + raise ValueError(f"Unsupported logical operator: {logical_operator}. Only 'and' and 'or' are supported.") + + metadata_filters = [] + for condition in conditions: + name = condition.get("name") + comparison_operator = condition.get("comparison_operator") + value = condition.get("value") + + if not all([name, comparison_operator]): + continue + + expr = f"JSON_EXTRACT(metadata, '$.{name}')" + value_str = get_value_str(value) if value else "" + + # Convert comparison operator to MySQL JSON path syntax + if comparison_operator == "is": + # JSON_EXTRACT(metadata, '$.field_name') = 'value' + metadata_filters.append(f"{expr} = {value_str}") + elif comparison_operator == "is not": + metadata_filters.append(f"{expr} != {value_str}") + elif comparison_operator == "contains": + metadata_filters.append(f"JSON_CONTAINS({expr}, {value_str})") + elif comparison_operator == "not contains": + metadata_filters.append(f"NOT JSON_CONTAINS({expr}, {value_str})") + elif comparison_operator == "start with": + metadata_filters.append(f"{expr} LIKE CONCAT({value_str}, '%')") + elif comparison_operator == "end with": + metadata_filters.append(f"{expr} LIKE CONCAT('%', {value_str})") + elif comparison_operator == "empty": + metadata_filters.append(f"({expr} IS NULL OR {expr} = '' OR {expr} = '[]' OR {expr} = '{{}}')") + elif comparison_operator == "not empty": + metadata_filters.append(f"({expr} IS NOT NULL AND {expr} != '' AND {expr} != '[]' AND {expr} != '{{}}')") + # Number operators + elif comparison_operator == "=": + metadata_filters.append(f"CAST({expr} AS DECIMAL(20,10)) = {value_str}") + elif comparison_operator == "≠": + metadata_filters.append(f"CAST({expr} AS DECIMAL(20,10)) != {value_str}") + elif comparison_operator == ">": + metadata_filters.append(f"CAST({expr} AS DECIMAL(20,10)) > {value_str}") + elif comparison_operator == "<": + metadata_filters.append(f"CAST({expr} AS DECIMAL(20,10)) < {value_str}") + elif comparison_operator == "≥": + metadata_filters.append(f"CAST({expr} AS DECIMAL(20,10)) >= {value_str}") + elif comparison_operator == "≤": + metadata_filters.append(f"CAST({expr} AS DECIMAL(20,10)) <= {value_str}") + # Time 
operators + elif comparison_operator == "before": + metadata_filters.append(f"CAST({expr} AS DATETIME) < {value_str}") + elif comparison_operator == "after": + metadata_filters.append(f"CAST({expr} AS DATETIME) > {value_str}") + else: + logger.warning(f"Unsupported comparison operator: {comparison_operator}") + continue + + if not metadata_filters: + return "" + + return f"({f' {logical_operator} '.join(metadata_filters)})" + + +def get_filters(condition: dict) -> list[str]: + filters: list[str] = [] + for k, v in condition.items(): + if not v: + continue + + if k == "exists": + filters.append(f"{v} IS NOT NULL") + elif k == "must_not" and isinstance(v, dict) and "exists" in v: + filters.append(f"{v.get('exists')} IS NULL") + elif k == "metadata_filtering_conditions": + # Handle metadata filtering conditions + metadata_filter = get_metadata_filter_expression(v) + if metadata_filter: + filters.append(metadata_filter) + elif k in array_columns: + if isinstance(v, list): + array_filters = [] + for vv in v: + array_filters.append(f"array_contains({k}, {get_value_str(vv)})") + array_filter = " OR ".join(array_filters) + filters.append(f"({array_filter})") + else: + filters.append(f"array_contains({k}, {get_value_str(v)})") + elif isinstance(v, list): + values: list[str] = [] + for item in v: + values.append(get_value_str(item)) + value = ", ".join(values) + filters.append(f"{k} IN ({value})") + else: + filters.append(f"{k} = {get_value_str(v)}") + return filters + + +def _try_with_lock(lock_name: str, process_func, check_func, timeout: int = None): + if not timeout: + timeout = int(os.environ.get("OB_DDL_TIMEOUT", "60")) + + if not check_func(): + from rag.utils.redis_conn import RedisDistributedLock + lock = RedisDistributedLock(lock_name) + if lock.acquire(): + logger.info(f"acquired lock success: {lock_name}, start processing.") + try: + process_func() + return + finally: + lock.release() + + if not check_func(): + logger.info(f"Waiting for process complete for {lock_name} on other task executors.") + time.sleep(1) + count = 1 + while count < timeout and not check_func(): + count += 1 + time.sleep(1) + if count >= timeout and not check_func(): + raise Exception(f"Timeout to wait for process complete for {lock_name}.") + + +@singleton +class OBConnection(DocStoreConnection): + def __init__(self): + scheme: str = settings.OB.get("scheme") + ob_config = settings.OB.get("config", {}) + + if scheme and scheme.lower() == "mysql": + mysql_config = settings.get_base_config("mysql", {}) + logger.info("Use MySQL scheme to create OceanBase connection.") + host = mysql_config.get("host", "localhost") + port = mysql_config.get("port", 2881) + self.username = mysql_config.get("user", "root@test") + self.password = mysql_config.get("password", "infini_rag_flow") + else: + logger.info("Use customized config to create OceanBase connection.") + host = ob_config.get("host", "localhost") + port = ob_config.get("port", 2881) + self.username = ob_config.get("user", "root@test") + self.password = ob_config.get("password", "infini_rag_flow") + + self.db_name = ob_config.get("db_name", "test") + self.uri = f"{host}:{port}" + + logger.info(f"Use OceanBase '{self.uri}' as the doc engine.") + + for _ in range(ATTEMPT_TIME): + try: + self.client = ObVecClient( + uri=self.uri, + user=self.username, + password=self.password, + db_name=self.db_name, + pool_pre_ping=True, + pool_recycle=3600, + ) + break + except Exception as e: + logger.warning(f"{str(e)}. 
Waiting OceanBase {self.uri} to be healthy.") + time.sleep(5) + + if self.client is None: + msg = f"OceanBase {self.uri} connection failed after {ATTEMPT_TIME} attempts." + logger.error(msg) + raise Exception(msg) + + self._load_env_vars() + self._check_ob_version() + self._try_to_update_ob_query_timeout() + + logger.info(f"OceanBase {self.uri} is healthy.") + + def _check_ob_version(self): + try: + res = self.client.perform_raw_text_sql("SELECT OB_VERSION() FROM DUAL").fetchone() + version_str = res[0] if res else None + logger.info(f"OceanBase {self.uri} version is {version_str}") + except Exception as e: + raise Exception(f"Failed to get OceanBase version from {self.uri}, error: {str(e)}") + + if not version_str: + raise Exception(f"Failed to get OceanBase version from {self.uri}.") + + ob_version = ObVersion.from_db_version_string(version_str) + if ob_version < ObVersion.from_db_version_nums(4, 3, 5, 1): + raise Exception( + f"The version of OceanBase needs to be higher than or equal to 4.3.5.1, current version is {version_str}" + ) + + self.es = None + if not ob_version < ObVersion.from_db_version_nums(4, 4, 1, 0) and self.enable_hybrid_search: + self.es = HybridSearch( + uri=self.uri, + user=self.username, + password=self.password, + db_name=self.db_name, + pool_pre_ping=True, + pool_recycle=3600, + ) + logger.info("OceanBase Hybrid Search feature is enabled") + + def _try_to_update_ob_query_timeout(self): + try: + val = self._get_variable_value("ob_query_timeout") + if val and int(val) >= OB_QUERY_TIMEOUT: + return + except Exception as e: + logger.warning("Failed to get 'ob_query_timeout' variable: %s", str(e)) + + try: + self.client.perform_raw_text_sql(f"SET GLOBAL ob_query_timeout={OB_QUERY_TIMEOUT}") + logger.info("Set GLOBAL variable 'ob_query_timeout' to %d.", OB_QUERY_TIMEOUT) + + # refresh connection pool to ensure 'ob_query_timeout' has taken effect + self.client.engine.dispose() + if self.es is not None: + self.es.engine.dispose() + logger.info("Disposed all connections in engine pool to refresh connection pool") + except Exception as e: + logger.warning(f"Failed to set 'ob_query_timeout' variable: {str(e)}") + + def _load_env_vars(self): + + def is_true(var: str, default: str) -> bool: + return os.getenv(var, default).lower() in ['true', '1', 'yes', 'y'] + + self.enable_fulltext_search = is_true('ENABLE_FULLTEXT_SEARCH', 'true') + self.use_fulltext_hint = is_true('USE_FULLTEXT_HINT', 'true') + self.search_original_content = is_true("SEARCH_ORIGINAL_CONTENT", 'true') + self.enable_hybrid_search = is_true('ENABLE_HYBRID_SEARCH', 'false') + + """ + Database operations + """ + + def dbType(self) -> str: + return "oceanbase" + + def health(self) -> dict: + return { + "uri": self.uri, + "version_comment": self._get_variable_value("version_comment") + } + + def _get_variable_value(self, var_name: str) -> Any: + rows = self.client.perform_raw_text_sql(f"SHOW VARIABLES LIKE '{var_name}'") + for row in rows: + return row[1] + raise Exception(f"Variable '{var_name}' not found.") + + """ + Table operations + """ + + def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int): + vector_field_name = f"q_{vectorSize}_vec" + vector_index_name = f"{vector_field_name}_idx" + + try: + _try_with_lock( + lock_name=f"ob_create_table_{indexName}", + check_func=lambda: self.client.check_table_exists(indexName), + process_func=lambda: self._create_table(indexName), + ) + + for column_name in index_columns: + _try_with_lock( + lock_name=f"ob_add_idx_{indexName}_{column_name}", + 
check_func=lambda: self._index_exists(indexName, index_name_template % (indexName, column_name)), + process_func=lambda: self._add_index(indexName, column_name), + ) + + fts_columns = fts_columns_origin if self.search_original_content else fts_columns_tks + for fts_column in fts_columns: + column_name = fts_column.split("^")[0] + _try_with_lock( + lock_name=f"ob_add_fulltext_idx_{indexName}_{column_name}", + check_func=lambda: self._index_exists(indexName, fulltext_index_name_template % column_name), + process_func=lambda: self._add_fulltext_index(indexName, column_name), + ) + + _try_with_lock( + lock_name=f"ob_add_vector_column_{indexName}_{vector_field_name}", + check_func=lambda: self._column_exist(indexName, vector_field_name), + process_func=lambda: self._add_vector_column(indexName, vectorSize), + ) + + _try_with_lock( + lock_name=f"ob_add_vector_idx_{indexName}_{vector_field_name}", + check_func=lambda: self._index_exists(indexName, vector_index_name), + process_func=lambda: self._add_vector_index(indexName, vector_field_name), + ) + + # new columns migration + for column in [column_order_id, column_group_id]: + _try_with_lock( + lock_name=f"ob_add_{column.name}_{indexName}", + check_func=lambda: self._column_exist(indexName, column.name), + process_func=lambda: self._add_column(indexName, column), + ) + except Exception as e: + raise Exception(f"OBConnection.createIndex error: {str(e)}") + finally: + # always refresh metadata to make sure it contains the latest table structure + self.client.refresh_metadata([indexName]) + + def deleteIdx(self, indexName: str, knowledgebaseId: str): + if len(knowledgebaseId) > 0: + # The index need to be alive after any kb deletion since all kb under this tenant are in one index. + return + try: + if self.client.check_table_exists(table_name=indexName): + self.client.drop_table_if_exist(indexName) + logger.info(f"Dropped table '{indexName}'.") + except Exception as e: + raise Exception(f"OBConnection.deleteIndex error: {str(e)}") + + def indexExist(self, indexName: str, knowledgebaseId: str = None) -> bool: + try: + if not self.client.check_table_exists(indexName): + return False + for column_name in index_columns: + if not self._index_exists(indexName, index_name_template % (indexName, column_name)): + return False + fts_columns = fts_columns_origin if self.search_original_content else fts_columns_tks + for fts_column in fts_columns: + column_name = fts_column.split("^")[0] + if not self._index_exists(indexName, fulltext_index_name_template % column_name): + return False + for column in [column_order_id, column_group_id]: + if not self._column_exist(indexName, column.name): + return False + except Exception as e: + raise Exception(f"OBConnection.indexExist error: {str(e)}") + + return True + + def _get_count(self, table_name: str, filter_list: list[str] = None) -> int: + where_clause = "WHERE " + " AND ".join(filter_list) if len(filter_list) > 0 else "" + (count,) = self.client.perform_raw_text_sql( + f"SELECT COUNT(*) FROM {table_name} {where_clause}" + ).fetchone() + return count + + def _column_exist(self, table_name: str, column_name: str) -> bool: + return self._get_count( + table_name="INFORMATION_SCHEMA.COLUMNS", + filter_list=[ + f"TABLE_SCHEMA = '{self.db_name}'", + f"TABLE_NAME = '{table_name}'", + f"COLUMN_NAME = '{column_name}'", + ]) > 0 + + def _index_exists(self, table_name: str, index_name: str) -> bool: + return self._get_count( + table_name="INFORMATION_SCHEMA.STATISTICS", + filter_list=[ + f"TABLE_SCHEMA = '{self.db_name}'", + 
f"TABLE_NAME = '{table_name}'", + f"INDEX_NAME = '{index_name}'", + ]) > 0 + + def _create_table(self, table_name: str): + # remove outdated metadata for external changes + if table_name in self.client.metadata_obj.tables: + self.client.metadata_obj.remove(Table(table_name, self.client.metadata_obj)) + + table_options = { + "mysql_charset": "utf8mb4", + "mysql_collate": "utf8mb4_unicode_ci", + "mysql_organization": "heap", + } + + self.client.create_table( + table_name=table_name, + columns=column_definitions, + **table_options, + ) + logger.info(f"Created table '{table_name}'.") + + def _add_index(self, table_name: str, column_name: str): + index_name = index_name_template % (table_name, column_name) + self.client.create_index( + table_name=table_name, + is_vec_index=False, + index_name=index_name, + column_names=[column_name], + ) + logger.info(f"Created index '{index_name}' on table '{table_name}'.") + + def _add_fulltext_index(self, table_name: str, column_name: str): + fulltext_index_name = fulltext_index_name_template % column_name + self.client.create_fts_idx_with_fts_index_param( + table_name=table_name, + fts_idx_param=FtsIndexParam( + index_name=fulltext_index_name, + field_names=[column_name], + parser_type=FtsParser.IK, + ), + ) + logger.info(f"Created full text index '{fulltext_index_name}' on table '{table_name}'.") + + def _add_vector_column(self, table_name: str, vector_size: int): + vector_field_name = f"q_{vector_size}_vec" + + self.client.add_columns( + table_name=table_name, + columns=[Column(vector_field_name, VECTOR(vector_size), nullable=True)], + ) + logger.info(f"Added vector column '{vector_field_name}' to table '{table_name}'.") + + def _add_vector_index(self, table_name: str, vector_field_name: str): + vector_index_name = f"{vector_field_name}_idx" + self.client.create_index( + table_name=table_name, + is_vec_index=True, + index_name=vector_index_name, + column_names=[vector_field_name], + vidx_params="distance=cosine, type=hnsw, lib=vsag", + ) + logger.info( + f"Created vector index '{vector_index_name}' on table '{table_name}' with column '{vector_field_name}'." 
+ ) + + def _add_column(self, table_name: str, column: Column): + try: + self.client.add_columns( + table_name=table_name, + columns=[column], + ) + logger.info(f"Added column '{column.name}' to table '{table_name}'.") + except Exception as e: + logger.warning(f"Failed to add column '{column.name}' to table '{table_name}': {str(e)}") + + """ + CRUD operations + """ + + def search( + self, + selectFields: list[str], + highlightFields: list[str], + condition: dict, + matchExprs: list[MatchExpr], + orderBy: OrderByExpr, + offset: int, + limit: int, + indexNames: str | list[str], + knowledgebaseIds: list[str], + aggFields: list[str] = [], + rank_feature: dict | None = None, + **kwargs, + ): + if isinstance(indexNames, str): + indexNames = indexNames.split(",") + assert isinstance(indexNames, list) and len(indexNames) > 0 + indexNames = list(set(indexNames)) + + if len(matchExprs) == 3: + if not self.enable_fulltext_search: + # disable fulltext search in fusion search, which means fallback to vector search + matchExprs = [m for m in matchExprs if isinstance(m, MatchDenseExpr)] + else: + for m in matchExprs: + if isinstance(m, FusionExpr): + weights = m.fusion_params["weights"] + vector_similarity_weight = get_float(weights.split(",")[1]) + # skip the search if its weight is zero + if vector_similarity_weight <= 0.0: + matchExprs = [m for m in matchExprs if isinstance(m, MatchTextExpr)] + elif vector_similarity_weight >= 1.0: + matchExprs = [m for m in matchExprs if isinstance(m, MatchDenseExpr)] + + result: SearchResult = SearchResult( + total=0, + chunks=[], + ) + + # copied from es_conn.py + if len(matchExprs) == 3 and self.es: + bqry = Q("bool", must=[]) + condition["kb_id"] = knowledgebaseIds + for k, v in condition.items(): + if k == "available_int": + if v == 0: + bqry.filter.append(Q("range", available_int={"lt": 1})) + else: + bqry.filter.append( + Q("bool", must_not=Q("range", available_int={"lt": 1}))) + continue + if not v: + continue + if isinstance(v, list): + bqry.filter.append(Q("terms", **{k: v})) + elif isinstance(v, str) or isinstance(v, int): + bqry.filter.append(Q("term", **{k: v})) + else: + raise Exception( + f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.") + + s = Search() + vector_similarity_weight = 0.5 + for m in matchExprs: + if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params: + assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance( + matchExprs[1], + MatchDenseExpr) and isinstance( + matchExprs[2], FusionExpr) + weights = m.fusion_params["weights"] + vector_similarity_weight = get_float(weights.split(",")[1]) + for m in matchExprs: + if isinstance(m, MatchTextExpr): + minimum_should_match = m.extra_options.get("minimum_should_match", 0.0) + if isinstance(minimum_should_match, float): + minimum_should_match = str(int(minimum_should_match * 100)) + "%" + bqry.must.append(Q("query_string", fields=fts_columns_tks, + type="best_fields", query=m.matching_text, + minimum_should_match=minimum_should_match, + boost=1)) + bqry.boost = 1.0 - vector_similarity_weight + + elif isinstance(m, MatchDenseExpr): + assert (bqry is not None) + similarity = 0.0 + if "similarity" in m.extra_options: + similarity = m.extra_options["similarity"] + s = s.knn(m.vector_column_name, + m.topn, + m.topn * 2, + query_vector=list(m.embedding_data), + filter=bqry.to_dict(), + similarity=similarity, + ) + + if bqry and rank_feature: + for fld, sc in rank_feature.items(): + if 
fld != PAGERANK_FLD: + fld = f"{TAG_FLD}.{fld}" + bqry.should.append(Q("rank_feature", field=fld, linear={}, boost=sc)) + + if bqry: + s = s.query(bqry) + # for field in highlightFields: + # s = s.highlight(field) + + if orderBy: + orders = list() + for field, order in orderBy.fields: + order = "asc" if order == 0 else "desc" + if field in ["page_num_int", "top_int"]: + order_info = {"order": order, "unmapped_type": "float", + "mode": "avg", "numeric_type": "double"} + elif field.endswith("_int") or field.endswith("_flt"): + order_info = {"order": order, "unmapped_type": "float"} + else: + order_info = {"order": order, "unmapped_type": "text"} + orders.append({field: order_info}) + s = s.sort(*orders) + + for fld in aggFields: + s.aggs.bucket(f'aggs_{fld}', 'terms', field=fld, size=1000000) + + if limit > 0: + s = s[offset:offset + limit] + q = s.to_dict() + logger.debug(f"OBConnection.hybrid_search {str(indexNames)} query: " + json.dumps(q)) + + for index_name in indexNames: + start_time = time.time() + res = self.es.search(index=index_name, + body=q, + timeout="600s", + track_total_hits=True, + _source=True) + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: hybrid, elapsed time: {elapsed_time:.3f} seconds," + f" got count: {len(res)}" + ) + for chunk in res: + result.chunks.append(self._es_row_to_entity(chunk)) + result.total = result.total + 1 + return result + + output_fields = selectFields.copy() + if "id" not in output_fields: + output_fields = ["id"] + output_fields + if "_score" in output_fields: + output_fields.remove("_score") + + if highlightFields: + for field in highlightFields: + if field not in output_fields: + output_fields.append(field) + + fields_expr = ", ".join(output_fields) + + condition["kb_id"] = knowledgebaseIds + filters: list[str] = get_filters(condition) + filters_expr = " AND ".join(filters) + + fulltext_query: Optional[str] = None + fulltext_topn: Optional[int] = None + fulltext_search_weight: dict[str, float] = {} + fulltext_search_expr: dict[str, str] = {} + fulltext_search_idx_list: list[str] = [] + fulltext_search_score_expr: Optional[str] = None + fulltext_search_filter: Optional[str] = None + + vector_column_name: Optional[str] = None + vector_data: Optional[list[float]] = None + vector_topn: Optional[int] = None + vector_similarity_threshold: Optional[float] = None + vector_similarity_weight: Optional[float] = None + vector_search_expr: Optional[str] = None + vector_search_score_expr: Optional[str] = None + vector_search_filter: Optional[str] = None + + for m in matchExprs: + if isinstance(m, MatchTextExpr): + assert "original_query" in m.extra_options, "'original_query' is missing in extra_options." 
+ fulltext_query = m.extra_options["original_query"] + fulltext_query = escape_string(fulltext_query.strip()) + fulltext_topn = m.topn + + fts_columns = fts_columns_origin if self.search_original_content else fts_columns_tks + + # get fulltext match expression and weight values + for field in fts_columns: + parts = field.split("^") + column_name: str = parts[0] + column_weight: float = float(parts[1]) if (len(parts) > 1 and parts[1]) else 1.0 + + fulltext_search_weight[column_name] = column_weight + fulltext_search_expr[column_name] = fulltext_search_template % (column_name, fulltext_query) + fulltext_search_idx_list.append(fulltext_index_name_template % column_name) + + # adjust the weight to 0~1 + weight_sum = sum(fulltext_search_weight.values()) + for column_name in fulltext_search_weight.keys(): + fulltext_search_weight[column_name] = fulltext_search_weight[column_name] / weight_sum + + elif isinstance(m, MatchDenseExpr): + assert m.embedding_data_type == "float", f"embedding data type '{m.embedding_data_type}' is not float." + vector_column_name = m.vector_column_name + vector_data = m.embedding_data + vector_topn = m.topn + vector_similarity_threshold = m.extra_options.get("similarity", 0.0) + elif isinstance(m, FusionExpr): + weights = m.fusion_params["weights"] + vector_similarity_weight = get_float(weights.split(",")[1]) + + if fulltext_query: + fulltext_search_filter = f"({' OR '.join([expr for expr in fulltext_search_expr.values()])})" + fulltext_search_score_expr = f"({' + '.join(f'{expr} * {fulltext_search_weight.get(col, 0)}' for col, expr in fulltext_search_expr.items())})" + + if vector_data: + vector_search_expr = vector_search_template % (vector_column_name, vector_data) + # use (1 - cosine_distance) as score, which should be [-1, 1] + # https://www.oceanbase.com/docs/common-oceanbase-database-standalone-1000000003577323 + vector_search_score_expr = f"(1 - {vector_search_expr})" + vector_search_filter = f"{vector_search_score_expr} >= {vector_similarity_threshold}" + + pagerank_score_expr = f"(CAST(IFNULL({PAGERANK_FLD}, 0) AS DECIMAL(10, 2)) / 100)" + + # TODO use tag rank_feature in sorting + # tag_rank_fea = {k: float(v) for k, v in (rank_feature or {}).items() if k != PAGERANK_FLD} + + if fulltext_query and vector_data: + search_type = "fusion" + elif fulltext_query: + search_type = "fulltext" + elif vector_data: + search_type = "vector" + elif len(aggFields) > 0: + search_type = "aggregation" + else: + search_type = "filter" + + if search_type in ["fusion", "fulltext", "vector"] and "_score" not in output_fields: + output_fields.append("_score") + + group_results = kwargs.get("group_results", False) + + for index_name in indexNames: + + if not self.client.check_table_exists(index_name): + continue + + fulltext_search_hint = f"/*+ UNION_MERGE({index_name} {' '.join(fulltext_search_idx_list)}) */" if self.use_fulltext_hint else "" + + if search_type == "fusion": + # fusion search, usually for chat + num_candidates = vector_topn + fulltext_topn + if group_results: + count_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")," + f" scored_results AS (" + f" SELECT *" + f" FROM fulltext_results" + f" WHERE {vector_search_filter}" + f")," + f" group_results AS (" + f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id) as rn" + f" FROM scored_results" + f")" + f" 
SELECT COUNT(*)" + f" FROM group_results" + f" WHERE rn = 1" + ) + else: + count_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")" + f" SELECT COUNT(*) FROM fulltext_results WHERE {vector_search_filter}" + ) + logger.debug("OBConnection.search with count sql: %s", count_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(count_sql) + total_count = res.fetchone()[0] if res else 0 + result.total += total_count + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: fusion, step: 1-count, elapsed time: {elapsed_time:.3f} seconds," + f" vector column: '{vector_column_name}'," + f" query text: '{fulltext_query}'," + f" condition: '{condition}'," + f" vector_similarity_threshold: {vector_similarity_threshold}," + f" got count: {total_count}" + ) + + if total_count == 0: + continue + + score_expr = f"(relevance * {1 - vector_similarity_weight} + {vector_search_score_expr} * {vector_similarity_weight} + {pagerank_score_expr})" + if group_results: + fusion_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")," + f" scored_results AS (" + f" SELECT *, {score_expr} AS _score" + f" FROM fulltext_results" + f" WHERE {vector_search_filter}" + f")," + f" group_results AS (" + f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" + f" FROM scored_results" + f")" + f" SELECT {fields_expr}, _score" + f" FROM group_results" + f" WHERE rn = 1" + f" ORDER BY _score DESC" + f" LIMIT {offset}, {limit}" + ) + else: + fusion_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")" + f" SELECT {fields_expr}, {score_expr} AS _score" + f" FROM fulltext_results" + f" WHERE {vector_search_filter}" + f" ORDER BY _score DESC" + f" LIMIT {offset}, {limit}" + ) + logger.debug("OBConnection.search with fusion sql: %s", fusion_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(fusion_sql) + rows = res.fetchall() + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: fusion, step: 2-query, elapsed time: {elapsed_time:.3f} seconds," + f" select fields: '{output_fields}'," + f" vector column: '{vector_column_name}'," + f" query text: '{fulltext_query}'," + f" condition: '{condition}'," + f" vector_similarity_threshold: {vector_similarity_threshold}," + f" vector_similarity_weight: {vector_similarity_weight}," + f" return rows count: {len(rows)}" + ) + + for row in rows: + result.chunks.append(self._row_to_entity(row, output_fields)) + elif search_type == "vector": + # vector search, usually used for graph search + count_sql = f"SELECT COUNT(id) FROM {index_name} WHERE {filters_expr} AND {vector_search_filter}" + logger.debug("OBConnection.search with vector count sql: %s", count_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(count_sql) + total_count = res.fetchone()[0] if 
res else 0 + result.total += total_count + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: vector, step: 1-count, elapsed time: {elapsed_time:.3f} seconds," + f" vector column: '{vector_column_name}'," + f" condition: '{condition}'," + f" vector_similarity_threshold: {vector_similarity_threshold}," + f" got count: {total_count}" + ) + + if total_count == 0: + continue + + vector_sql = ( + f"SELECT {fields_expr}, {vector_search_score_expr} AS _score" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {vector_search_filter}" + f" ORDER BY {vector_search_expr}" + f" APPROXIMATE LIMIT {limit if limit != 0 else vector_topn}" + ) + if offset != 0: + vector_sql += f" OFFSET {offset}" + logger.debug("OBConnection.search with vector sql: %s", vector_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(vector_sql) + rows = res.fetchall() + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: vector, step: 2-query, elapsed time: {elapsed_time:.3f} seconds," + f" select fields: '{output_fields}'," + f" vector column: '{vector_column_name}'," + f" condition: '{condition}'," + f" vector_similarity_threshold: {vector_similarity_threshold}," + f" return rows count: {len(rows)}" + ) + + for row in rows: + result.chunks.append(self._row_to_entity(row, output_fields)) + elif search_type == "fulltext": + # fulltext search, usually used to search chunks in one dataset + count_sql = f"SELECT {fulltext_search_hint} COUNT(id) FROM {index_name} WHERE {filters_expr} AND {fulltext_search_filter}" + logger.debug("OBConnection.search with fulltext count sql: %s", count_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(count_sql) + total_count = res.fetchone()[0] if res else 0 + result.total += total_count + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: fulltext, step: 1-count, elapsed time: {elapsed_time:.3f} seconds," + f" query text: '{fulltext_query}'," + f" condition: '{condition}'," + f" got count: {total_count}" + ) + + if total_count == 0: + continue + + fulltext_sql = ( + f"SELECT {fulltext_search_hint} {fields_expr}, {fulltext_search_score_expr} AS _score" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY _score DESC" + f" LIMIT {offset}, {limit if limit != 0 else fulltext_topn}" + ) + logger.debug("OBConnection.search with fulltext sql: %s", fulltext_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(fulltext_sql) + rows = res.fetchall() + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: fulltext, step: 2-query, elapsed time: {elapsed_time:.3f} seconds," + f" select fields: '{output_fields}'," + f" query text: '{fulltext_query}'," + f" condition: '{condition}'," + f" return rows count: {len(rows)}" + ) + + for row in rows: + result.chunks.append(self._row_to_entity(row, output_fields)) + elif search_type == "aggregation": + # aggregation search + assert len(aggFields) == 1, "Only one aggregation field is supported in OceanBase." 
+ agg_field = aggFields[0] + if agg_field in array_columns: + res = self.client.perform_raw_text_sql( + f"SELECT {agg_field} FROM {index_name}" + f" WHERE {agg_field} IS NOT NULL AND {filters_expr}" + ) + counts = {} + for row in res: + if row[0]: + if isinstance(row[0], str): + try: + arr = json.loads(row[0]) + except json.JSONDecodeError: + logger.warning(f"Failed to parse JSON array: {row[0]}") + continue + else: + arr = row[0] + + if isinstance(arr, list): + for v in arr: + if isinstance(v, str) and v.strip(): + counts[v] = counts.get(v, 0) + 1 + + for v, count in counts.items(): + result.chunks.append({ + "value": v, + "count": count, + }) + result.total += len(counts) + else: + res = self.client.perform_raw_text_sql( + f"SELECT {agg_field}, COUNT(*) as count FROM {index_name}" + f" WHERE {agg_field} IS NOT NULL AND {filters_expr}" + f" GROUP BY {agg_field}" + ) + for row in res: + result.chunks.append({ + "value": row[0], + "count": int(row[1]), + }) + result.total += 1 + else: + # only filter + orders: list[str] = [] + if orderBy: + for field, order in orderBy.fields: + if isinstance(column_types[field], ARRAY): + f = field + "_sort" + fields_expr += f", array_to_string({field}, ',') AS {f}" + field = f + order = "ASC" if order == 0 else "DESC" + orders.append(f"{field} {order}") + count_sql = f"SELECT COUNT(id) FROM {index_name} WHERE {filters_expr}" + logger.debug("OBConnection.search with normal count sql: %s", count_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(count_sql) + total_count = res.fetchone()[0] if res else 0 + result.total += total_count + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: normal, step: 1-count, elapsed time: {elapsed_time:.3f} seconds," + f" condition: '{condition}'," + f" got count: {total_count}" + ) + + if total_count == 0: + continue + + order_by_expr = ("ORDER BY " + ", ".join(orders)) if len(orders) > 0 else "" + limit_expr = f"LIMIT {offset}, {limit}" if limit != 0 else "" + filter_sql = ( + f"SELECT {fields_expr}" + f" FROM {index_name}" + f" WHERE {filters_expr}" + f" {order_by_expr} {limit_expr}" + ) + logger.debug("OBConnection.search with normal sql: %s", filter_sql) + + start_time = time.time() + + res = self.client.perform_raw_text_sql(filter_sql) + rows = res.fetchall() + + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: normal, step: 2-query, elapsed time: {elapsed_time:.3f} seconds," + f" select fields: '{output_fields}'," + f" condition: '{condition}'," + f" return rows count: {len(rows)}" + ) + + for row in rows: + result.chunks.append(self._row_to_entity(row, output_fields)) + return result + + def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None: + if not self.client.check_table_exists(indexName): + return None + + try: + res = self.client.get( + table_name=indexName, + ids=[chunkId], + ) + row = res.fetchone() + if row is None: + raise Exception(f"ChunkId {chunkId} not found in index {indexName}.") + + return self._row_to_entity(row, fields=list(res.keys())) + except json.JSONDecodeError as e: + logger.error(f"JSON decode error when getting chunk {chunkId}: {str(e)}") + return { + "id": chunkId, + "error": f"Failed to parse chunk data due to invalid JSON: {str(e)}" + } + except Exception as e: + logger.error(f"Error getting chunk {chunkId}: {str(e)}") + raise + + def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str 
= None) -> list[str]: + if not documents: + return [] + + docs: list[dict] = [] + ids: list[str] = [] + for document in documents: + d: dict = {} + for k, v in document.items(): + if vector_column_pattern.match(k): + d[k] = v + continue + if k not in column_names: + if "extra" not in d: + d["extra"] = {} + d["extra"][k] = v + continue + if v is None: + d[k] = get_default_value(k) + continue + + if k == "kb_id" and isinstance(v, list): + d[k] = v[0] + elif k == "content_with_weight" and isinstance(v, dict): + d[k] = json.dumps(v, ensure_ascii=False) + elif k == "position_int": + d[k] = json.dumps([list(vv) for vv in v], ensure_ascii=False) + elif isinstance(v, list): + # remove characters like '\t' for JSON dump and clean special characters + cleaned_v = [] + for vv in v: + if isinstance(vv, str): + cleaned_str = vv.strip() + cleaned_str = cleaned_str.replace('\\', '\\\\') + cleaned_str = cleaned_str.replace('\n', '\\n') + cleaned_str = cleaned_str.replace('\r', '\\r') + cleaned_str = cleaned_str.replace('\t', '\\t') + cleaned_v.append(cleaned_str) + else: + cleaned_v.append(vv) + d[k] = json.dumps(cleaned_v, ensure_ascii=False) + else: + d[k] = v + + ids.append(d["id"]) + # this is to fix https://github.com/sqlalchemy/sqlalchemy/issues/9703 + for column_name in column_names: + if column_name not in d: + d[column_name] = get_default_value(column_name) + + metadata = d.get("metadata", {}) + if metadata is None: + metadata = {} + group_id = metadata.get("_group_id") + title = metadata.get("_title") + if d.get("doc_id"): + if group_id: + d["group_id"] = group_id + else: + d["group_id"] = d["doc_id"] + if title: + d["docnm_kwd"] = title + + docs.append(d) + + logger.debug("OBConnection.insert chunks: %s", docs) + + res = [] + try: + self.client.upsert(indexName, docs) + except Exception as e: + logger.error(f"OBConnection.insert error: {str(e)}") + res.append(str(e)) + return res + + def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool: + if not self.client.check_table_exists(indexName): + return True + + condition["kb_id"] = knowledgebaseId + filters = get_filters(condition) + set_values: list[str] = [] + for k, v in newValue.items(): + if k == "remove": + if isinstance(v, str): + set_values.append(f"{v} = NULL") + else: + assert isinstance(v, dict), f"Expected str or dict for 'remove', got {type(newValue[k])}." + for kk, vv in v.items(): + assert kk in array_columns, f"Column '{kk}' is not an array column." + set_values.append(f"{kk} = array_remove({kk}, {get_value_str(vv)})") + elif k == "add": + assert isinstance(v, dict), f"Expected str or dict for 'add', got {type(newValue[k])}." + for kk, vv in v.items(): + assert kk in array_columns, f"Column '{kk}' is not an array column." 
+ set_values.append(f"{kk} = array_append({kk}, {get_value_str(vv)})") + elif k == "metadata": + assert isinstance(v, dict), f"Expected dict for 'metadata', got {type(newValue[k])}" + set_values.append(f"{k} = {get_value_str(v)}") + if v and "doc_id" in condition: + group_id = v.get("_group_id") + title = v.get("_title") + if group_id: + set_values.append(f"group_id = {get_value_str(group_id)}") + if title: + set_values.append(f"docnm_kwd = {get_value_str(title)}") + else: + set_values.append(f"{k} = {get_value_str(v)}") + + if not set_values: + return True + + update_sql = ( + f"UPDATE {indexName}" + f" SET {', '.join(set_values)}" + f" WHERE {' AND '.join(filters)}" + ) + logger.debug("OBConnection.update sql: %s", update_sql) + + try: + self.client.perform_raw_text_sql(update_sql) + return True + except Exception as e: + logger.error(f"OBConnection.update error: {str(e)}") + return False + + def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: + if not self.client.check_table_exists(indexName): + return 0 + + condition["kb_id"] = knowledgebaseId + try: + res = self.client.get( + table_name=indexName, + ids=None, + where_clause=[text(f) for f in get_filters(condition)], + output_column_name=["id"], + ) + rows = res.fetchall() + if len(rows) == 0: + return 0 + ids = [row[0] for row in rows] + logger.debug(f"OBConnection.delete chunks, filters: {condition}, ids: {ids}") + self.client.delete( + table_name=indexName, + ids=ids, + ) + return len(ids) + except Exception as e: + logger.error(f"OBConnection.delete error: {str(e)}") + return 0 + + @staticmethod + def _row_to_entity(data: Row, fields: list[str]) -> dict: + entity = {} + for i, field in enumerate(fields): + value = data[i] + if value is None: + continue + entity[field] = get_column_value(field, value) + return entity + + @staticmethod + def _es_row_to_entity(data: dict) -> dict: + entity = {} + for k, v in data.items(): + if v is None: + continue + entity[k] = get_column_value(k, v) + return entity + + """ + Helper functions for search result + """ + + def get_total(self, res) -> int: + return res.total + + def get_chunk_ids(self, res) -> list[str]: + return [row["id"] for row in res.chunks] + + def get_fields(self, res, fields: list[str]) -> dict[str, dict]: + result = {} + for row in res.chunks: + data = {} + for field in fields: + v = row.get(field) + if v is not None: + data[field] = v + result[row["id"]] = data + return result + + # copied from query.FulltextQueryer + def is_chinese(self, line): + arr = re.split(r"[ \t]+", line) + if len(arr) <= 3: + return True + e = 0 + for t in arr: + if not re.match(r"[a-zA-Z]+$", t): + e += 1 + return e * 1.0 / len(arr) >= 0.7 + + def highlight(self, txt: str, tks: str, question: str, keywords: list[str]) -> Optional[str]: + if not txt or not keywords: + return None + + highlighted_txt = txt + + if question and not self.is_chinese(question): + highlighted_txt = re.sub( + r"(^|\W)(%s)(\W|$)" % re.escape(question), + r"\1<em>\2</em>\3", highlighted_txt, + flags=re.IGNORECASE | re.MULTILINE, + ) + if re.search(r"<em>[^<>]+</em>", highlighted_txt, flags=re.IGNORECASE | re.MULTILINE): + return highlighted_txt + + for keyword in keywords: + highlighted_txt = re.sub( + r"(^|\W)(%s)(\W|$)" % re.escape(keyword), + r"\1<em>\2</em>\3", highlighted_txt, + flags=re.IGNORECASE | re.MULTILINE, + ) + if len(re.findall(r'<em>', highlighted_txt)) > 0 or len( + re.findall(r'</em>\s*<em>', highlighted_txt)) > 0: + return highlighted_txt + else: + return None + + if not tks: + tks = rag_tokenizer.tokenize(txt) + tokens =
tks.split() + if not tokens: + return None + + last_pos = len(txt) + + for i in range(len(tokens) - 1, -1, -1): + token = tokens[i] + token_pos = highlighted_txt.rfind(token, 0, last_pos) + if token_pos != -1: + if token in keywords: + highlighted_txt = ( + highlighted_txt[:token_pos] + + f'{token}' + + highlighted_txt[token_pos + len(token):] + ) + last_pos = token_pos + return re.sub(r'', '', highlighted_txt) + + def get_highlight(self, res, keywords: list[str], fieldnm: str): + ans = {} + if len(res.chunks) == 0 or len(keywords) == 0: + return ans + + for d in res.chunks: + txt = d.get(fieldnm) + if not txt: + continue + + tks = d.get("content_ltks") if fieldnm == "content_with_weight" else "" + highlighted_txt = self.highlight(txt, tks, " ".join(keywords), keywords) + if highlighted_txt: + ans[d["id"]] = highlighted_txt + return ans + + def get_aggregation(self, res, fieldnm: str): + if len(res.chunks) == 0: + return [] + + counts = {} + result = [] + for d in res.chunks: + if "value" in d and "count" in d: + # directly use the aggregation result + result.append((d["value"], d["count"])) + elif fieldnm in d: + # aggregate the values of specific field + v = d[fieldnm] + if isinstance(v, list): + for vv in v: + if isinstance(vv, str) and vv.strip(): + counts[vv] = counts.get(vv, 0) + 1 + elif isinstance(v, str) and v.strip(): + counts[v] = counts.get(v, 0) + 1 + + if len(counts) > 0: + for k, v in counts.items(): + result.append((k, v)) + + return result + + """ + SQL + """ + + def sql(sql: str, fetch_size: int, format: str): + # TODO: execute the sql generated by text-to-sql + return None diff --git a/uv.lock b/uv.lock index 2eda10fd3..6e05d92b2 100644 --- a/uv.lock +++ b/uv.lock @@ -108,6 +108,18 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/f3/ba/df6e8e1045aebc4778d19b8a3a9bc1808adb1619ba94ca354d9ba17d86c3/aiolimiter-1.2.1-py3-none-any.whl", hash = "sha256:d3f249e9059a20badcb56b61601a83556133655c11d1eb3dd3e04ff069e5f3c7" }, ] +[[package]] +name = "aiomysql" +version = "0.3.2" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "pymysql" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/29/e0/302aeffe8d90853556f47f3106b89c16cc2ec2a4d269bdfd82e3f4ae12cc/aiomysql-0.3.2.tar.gz", hash = "sha256:72d15ef5cfc34c03468eb41e1b90adb9fd9347b0b589114bd23ead569a02ac1a", size = 108311, upload-time = "2025-10-22T00:15:21.278Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/4c/af/aae0153c3e28712adaf462328f6c7a3c196a1c1c27b491de4377dd3e6b52/aiomysql-0.3.2-py3-none-any.whl", hash = "sha256:c82c5ba04137d7afd5c693a258bea8ead2aad77101668044143a991e04632eb2", size = 71834, upload-time = "2025-10-22T00:15:15.905Z" }, +] + [[package]] name = "aiosignal" version = "1.4.0" @@ -4884,6 +4896,23 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/d2/53/d23a97e0a2c690d40b165d1062e2c4ccc796be458a1ce59f6ba030434663/pynndescent-0.5.13-py3-none-any.whl", hash = "sha256:69aabb8f394bc631b6ac475a1c7f3994c54adf3f51cd63b2730fefba5771b949" }, ] +[[package]] +name = "pyobvector" +version = "0.2.18" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "aiomysql" }, + { name = "numpy" }, + { name = "pydantic" }, + { name = "pymysql" }, + { name = "sqlalchemy" }, + { name = "sqlglot" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/82/33/adf722744a88eb75b4422129cbc4fe9b05738064ee79762348e285d93520/pyobvector-0.2.18.tar.gz", hash = 
"sha256:58ca2765ab99de188e99c815aab914ab9efd003cfa1ce9c5f2e41d0e2b4878be", size = 43035, upload-time = "2025-11-05T06:18:36.747Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/5c/1f/73fa42b215722ec36172ac155626db5d2b95ea9f884cf9fb0624492e303b/pyobvector-0.2.18-py3-none-any.whl", hash = "sha256:93e34b7796e4cbc6ad139118d655eb127d1e7a0f5df76df66e25520533a15488", size = 58129, upload-time = "2025-11-05T06:18:35.326Z" }, +] + [[package]] name = "pyodbc" version = "5.2.0" @@ -5375,6 +5404,7 @@ dependencies = [ { name = "pycryptodomex" }, { name = "pyicu" }, { name = "pymysql" }, + { name = "pyobvector" }, { name = "pyodbc" }, { name = "pypandoc" }, { name = "pypdf" }, @@ -5534,6 +5564,7 @@ requires-dist = [ { name = "pycryptodomex", specifier = "==3.20.0" }, { name = "pyicu", specifier = ">=2.15.3,<3.0.0" }, { name = "pymysql", specifier = ">=1.1.1,<2.0.0" }, + { name = "pyobvector", specifier = "==0.2.18" }, { name = "pyodbc", specifier = ">=5.2.0,<6.0.0" }, { name = "pypandoc", specifier = ">=1.16" }, { name = "pypdf", specifier = "==6.0.0" }, @@ -6430,6 +6461,43 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/52/a7/d2782e4e3f77c8450f727ba74a8f12756d5ba823d81b941f1b04da9d033a/sphinxcontrib_serializinghtml-2.0.0-py3-none-any.whl", hash = "sha256:6e2cb0eef194e10c27ec0023bfeb25badbbb5868244cf5bc5bdc04e4464bf331" }, ] +[[package]] +name = "sqlalchemy" +version = "2.0.44" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "greenlet", marker = "platform_machine == 'AMD64' or platform_machine == 'WIN32' or platform_machine == 'aarch64' or platform_machine == 'amd64' or platform_machine == 'ppc64le' or platform_machine == 'win32' or platform_machine == 'x86_64'" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/f0/f2/840d7b9496825333f532d2e3976b8eadbf52034178aac53630d09fe6e1ef/sqlalchemy-2.0.44.tar.gz", hash = "sha256:0ae7454e1ab1d780aee69fd2aae7d6b8670a581d8847f2d1e0f7ddfbf47e5a22", size = 9819830, upload-time = "2025-10-10T14:39:12.935Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/a2/a7/e9ccfa7eecaf34c6f57d8cb0bb7cbdeeff27017cc0f5d0ca90fdde7a7c0d/sqlalchemy-2.0.44-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7c77f3080674fc529b1bd99489378c7f63fcb4ba7f8322b79732e0258f0ea3ce", size = 2137282, upload-time = "2025-10-10T15:36:10.965Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/b1/e1/50bc121885bdf10833a4f65ecbe9fe229a3215f4d65a58da8a181734cae3/sqlalchemy-2.0.44-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4c26ef74ba842d61635b0152763d057c8d48215d5be9bb8b7604116a059e9985", size = 2127322, upload-time = "2025-10-10T15:36:12.428Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/46/f2/a8573b7230a3ce5ee4b961a2d510d71b43872513647398e595b744344664/sqlalchemy-2.0.44-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f4a172b31785e2f00780eccab00bc240ccdbfdb8345f1e6063175b3ff12ad1b0", size = 3214772, upload-time = "2025-10-10T15:34:15.09Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/4a/d8/c63d8adb6a7edaf8dcb6f75a2b1e9f8577960a1e489606859c4d73e7d32b/sqlalchemy-2.0.44-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f9480c0740aabd8cb29c329b422fb65358049840b34aba0adf63162371d2a96e", size = 3214434, upload-time = "2025-10-10T15:47:00.473Z" }, + { url = 
"https://pypi.tuna.tsinghua.edu.cn/packages/ee/a6/243d277a4b54fae74d4797957a7320a5c210c293487f931cbe036debb697/sqlalchemy-2.0.44-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:17835885016b9e4d0135720160db3095dc78c583e7b902b6be799fb21035e749", size = 3155365, upload-time = "2025-10-10T15:34:17.932Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/5f/f8/6a39516ddd75429fd4ee5a0d72e4c80639fab329b2467c75f363c2ed9751/sqlalchemy-2.0.44-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:cbe4f85f50c656d753890f39468fcd8190c5f08282caf19219f684225bfd5fd2", size = 3178910, upload-time = "2025-10-10T15:47:02.346Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/43/f0/118355d4ad3c39d9a2f5ee4c7304a9665b3571482777357fa9920cd7a6b4/sqlalchemy-2.0.44-cp310-cp310-win32.whl", hash = "sha256:2fcc4901a86ed81dc76703f3b93ff881e08761c63263c46991081fd7f034b165", size = 2105624, upload-time = "2025-10-10T15:38:15.552Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/61/83/6ae5f9466f8aa5d0dcebfff8c9c33b98b27ce23292df3b990454b3d434fd/sqlalchemy-2.0.44-cp310-cp310-win_amd64.whl", hash = "sha256:9919e77403a483ab81e3423151e8ffc9dd992c20d2603bf17e4a8161111e55f5", size = 2129240, upload-time = "2025-10-10T15:38:17.175Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/e3/81/15d7c161c9ddf0900b076b55345872ed04ff1ed6a0666e5e94ab44b0163c/sqlalchemy-2.0.44-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0fe3917059c7ab2ee3f35e77757062b1bea10a0b6ca633c58391e3f3c6c488dd", size = 2140517, upload-time = "2025-10-10T15:36:15.64Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/d4/d5/4abd13b245c7d91bdf131d4916fd9e96a584dac74215f8b5bc945206a974/sqlalchemy-2.0.44-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:de4387a354ff230bc979b46b2207af841dc8bf29847b6c7dbe60af186d97aefa", size = 2130738, upload-time = "2025-10-10T15:36:16.91Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/cb/3c/8418969879c26522019c1025171cefbb2a8586b6789ea13254ac602986c0/sqlalchemy-2.0.44-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c3678a0fb72c8a6a29422b2732fe423db3ce119c34421b5f9955873eb9b62c1e", size = 3304145, upload-time = "2025-10-10T15:34:19.569Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/94/2d/fdb9246d9d32518bda5d90f4b65030b9bf403a935cfe4c36a474846517cb/sqlalchemy-2.0.44-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3cf6872a23601672d61a68f390e44703442639a12ee9dd5a88bbce52a695e46e", size = 3304511, upload-time = "2025-10-10T15:47:05.088Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/7d/fb/40f2ad1da97d5c83f6c1269664678293d3fe28e90ad17a1093b735420549/sqlalchemy-2.0.44-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:329aa42d1be9929603f406186630135be1e7a42569540577ba2c69952b7cf399", size = 3235161, upload-time = "2025-10-10T15:34:21.193Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/95/cb/7cf4078b46752dca917d18cf31910d4eff6076e5b513c2d66100c4293d83/sqlalchemy-2.0.44-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:70e03833faca7166e6a9927fbee7c27e6ecde436774cd0b24bbcc96353bce06b", size = 3261426, upload-time = "2025-10-10T15:47:07.196Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/f8/3b/55c09b285cb2d55bdfa711e778bdffdd0dc3ffa052b0af41f1c5d6e582fa/sqlalchemy-2.0.44-cp311-cp311-win32.whl", hash = "sha256:253e2f29843fb303eca6b2fc645aca91fa7aa0aa70b38b6950da92d44ff267f3", size = 2105392, upload-time = "2025-10-10T15:38:20.051Z" }, + { url = 
"https://pypi.tuna.tsinghua.edu.cn/packages/c7/23/907193c2f4d680aedbfbdf7bf24c13925e3c7c292e813326c1b84a0b878e/sqlalchemy-2.0.44-cp311-cp311-win_amd64.whl", hash = "sha256:7a8694107eb4308a13b425ca8c0e67112f8134c846b6e1f722698708741215d5", size = 2130293, upload-time = "2025-10-10T15:38:21.601Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/62/c4/59c7c9b068e6813c898b771204aad36683c96318ed12d4233e1b18762164/sqlalchemy-2.0.44-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:72fea91746b5890f9e5e0997f16cbf3d53550580d76355ba2d998311b17b2250", size = 2139675, upload-time = "2025-10-10T16:03:31.064Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/d6/ae/eeb0920537a6f9c5a3708e4a5fc55af25900216bdb4847ec29cfddf3bf3a/sqlalchemy-2.0.44-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:585c0c852a891450edbb1eaca8648408a3cc125f18cf433941fa6babcc359e29", size = 2127726, upload-time = "2025-10-10T16:03:35.934Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/d8/d5/2ebbabe0379418eda8041c06b0b551f213576bfe4c2f09d77c06c07c8cc5/sqlalchemy-2.0.44-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b94843a102efa9ac68a7a30cd46df3ff1ed9c658100d30a725d10d9c60a2f44", size = 3327603, upload-time = "2025-10-10T15:35:28.322Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/45/e5/5aa65852dadc24b7d8ae75b7efb8d19303ed6ac93482e60c44a585930ea5/sqlalchemy-2.0.44-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:119dc41e7a7defcefc57189cfa0e61b1bf9c228211aba432b53fb71ef367fda1", size = 3337842, upload-time = "2025-10-10T15:43:45.431Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/41/92/648f1afd3f20b71e880ca797a960f638d39d243e233a7082c93093c22378/sqlalchemy-2.0.44-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0765e318ee9179b3718c4fd7ba35c434f4dd20332fbc6857a5e8df17719c24d7", size = 3264558, upload-time = "2025-10-10T15:35:29.93Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/40/cf/e27d7ee61a10f74b17740918e23cbc5bc62011b48282170dc4c66da8ec0f/sqlalchemy-2.0.44-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:2e7b5b079055e02d06a4308d0481658e4f06bc7ef211567edc8f7d5dce52018d", size = 3301570, upload-time = "2025-10-10T15:43:48.407Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/3b/3d/3116a9a7b63e780fb402799b6da227435be878b6846b192f076d2f838654/sqlalchemy-2.0.44-cp312-cp312-win32.whl", hash = "sha256:846541e58b9a81cce7dee8329f352c318de25aa2f2bbe1e31587eb1f057448b4", size = 2103447, upload-time = "2025-10-10T15:03:21.678Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/25/83/24690e9dfc241e6ab062df82cc0df7f4231c79ba98b273fa496fb3dd78ed/sqlalchemy-2.0.44-cp312-cp312-win_amd64.whl", hash = "sha256:7cbcb47fd66ab294703e1644f78971f6f2f1126424d2b300678f419aa73c7b6e", size = 2130912, upload-time = "2025-10-10T15:03:24.656Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/9c/5e/6a29fa884d9fb7ddadf6b69490a9d45fded3b38541713010dad16b77d015/sqlalchemy-2.0.44-py3-none-any.whl", hash = "sha256:19de7ca1246fbef9f9d1bff8f1ab25641569df226364a0e40457dc5457c54b05", size = 1928718, upload-time = "2025-10-10T15:29:45.32Z" }, +] + [[package]] name = "sqlglot" version = "27.29.0"