From 23bdf25a1fd4aaed05e385ca086e7b9a5d003ce3 Mon Sep 17 00:00:00 2001
From: Carve_ <75568342+Rynzie02@users.noreply.github.com>
Date: Sat, 31 Jan 2026 15:11:54 +0800
Subject: [PATCH] feature: Add OceanBase Storage Support for Table Parser
(#12923)
### What problem does this PR solve?
close #12770
This PR adds OceanBase as a storage backend for the Table Parser. Row data
is stored in a JSON `chunk_data` column, which supports dynamic table
schemas, and a new OceanBase `sql()` implementation executes the queries
produced by text-to-SQL retrieval.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Changes
- Table Parser stores row data in `chunk_data` when the doc engine is
OceanBase (`rag/app/table.py`); see the sketch after this list.
- OceanBase table schema adds a `chunk_data` JSON column and migrates
existing tables if needed (`rag/utils/ob_conn.py`).
- Implemented OceanBase `sql()` to execute the SQL produced by text-to-SQL
(`rag/utils/ob_conn.py`).
- Added a `DOC_ENGINE_OCEANBASE` flag for engine detection (`common/settings.py`).
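For illustration, a minimal sketch of what the Table Parser produces for one
spreadsheet row under this scheme (the sheet, column names, and values are
hypothetical; the real keys come from the uploaded file's header row):

```python
# Hypothetical two-column Excel sheet: "Name" | "Age"
field_map = {"name": "Name", "age": "Age"}   # normalized key -> display name
chunk_data = {"Name": "Alice", "Age": 30}    # stored in the JSON `chunk_data` column
# ES/OS would instead store typed per-column fields (e.g. url_kwd, body_tks);
# with OceanBase the whole row lives in chunk_data and is queried via JSON functions.
```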
### Test
1. Set `DOC_ENGINE=oceanbase` (e.g. in `docker/.env`).
2. Upload an Excel file to a Knowledge Base (we used a small sample spreadsheet for testing).
3. Choose **Table** as the parsing method.
4. Ask a natural-language question in chat.
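Step 4 exercises the text-to-SQL path. As a rough sketch (table and field
names hypothetical), a question like "How old is Alice?" should produce SQL of
this shape, which `OBConnection.sql()` rewrites into OceanBase's MySQL-mode
JSON functions before executing:

```python
# What the LLM is prompted to write:
llm_sql = (
    "SELECT doc_id, docnm_kwd, json_extract_string(chunk_data, '$.Age') AS Age "
    "FROM ragflow_tenant123 WHERE json_extract_string(chunk_data, '$.Name') = 'Alice'"
)
# What normalize_sql() actually sends (a LIMIT is appended when the query has none):
# SELECT doc_id, docnm_kwd, JSON_UNQUOTE(JSON_EXTRACT(chunk_data, '$.Age')) AS Age
# FROM ragflow_tenant123
# WHERE JSON_UNQUOTE(JSON_EXTRACT(chunk_data, '$.Name')) = 'Alice' LIMIT 1024
```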
---
 api/db/services/dialog_service.py | 40 ++++++++++++++++-
 common/settings.py                |  4 +-
 rag/app/table.py                  | 14 +++---
 rag/utils/ob_conn.py              | 74 +++++++++++++++++++++++++++++---
 4 files changed, 117 insertions(+), 15 deletions(-)
diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py
index 8a11397e4..3940a8a2f 100644
--- a/api/db/services/dialog_service.py
+++ b/api/db/services/dialog_service.py
@@ -586,7 +586,12 @@ async def use_sql(question, field_map, tenant_id, chat_mdl, quota=True, kb_ids=N
     logging.debug(f"use_sql: Question: {question}")

     # Determine which document engine we're using
-    doc_engine = "infinity" if settings.DOC_ENGINE_INFINITY else "es"
+    if settings.DOC_ENGINE_INFINITY:
+        doc_engine = "infinity"
+    elif settings.DOC_ENGINE_OCEANBASE:
+        doc_engine = "oceanbase"
+    else:
+        doc_engine = "es"

     # Construct the full table name
     # For Elasticsearch: ragflow_{tenant_id} (kb_id is in WHERE clause)
@@ -633,6 +638,37 @@ Write SQL using json_extract_string() with exact field names. Include doc_id, do
"\n".join([f" - {field}" for field in json_field_names]),
question
)
+ elif doc_engine == "oceanbase":
+ # Build OceanBase prompts with JSON extraction context
+ json_field_names = list(field_map.keys())
+ sys_prompt = """You are a Database Administrator. Write SQL for a table with JSON 'chunk_data' column.
+
+JSON Extraction: json_extract_string(chunk_data, '$.FieldName')
+Numeric Cast: CAST(json_extract_string(chunk_data, '$.FieldName') AS INTEGER/FLOAT)
+NULL Check: json_extract_isnull(chunk_data, '$.FieldName') == false
+
+RULES:
+1. Use EXACT field names (case-sensitive) from the list below
+2. For SELECT: include doc_id, docnm_kwd, and json_extract_string() for requested fields
+3. For COUNT: use COUNT(*) or COUNT(DISTINCT json_extract_string(...))
+4. Add AS alias for extracted field names
+5. DO NOT select 'content' field
+6. Only add NULL check (json_extract_isnull() == false) in WHERE clause when:
+ - Question asks to "show me" or "display" specific columns
+ - Question mentions "not null" or "excluding null"
+ - Add NULL check for count specific column
+ - DO NOT add NULL check for COUNT(*) queries (COUNT(*) counts all rows including nulls)
+7. Output ONLY the SQL, no explanations"""
+ user_prompt = """Table: {}
+Fields (EXACT case): {}
+{}
+Question: {}
+Write SQL using json_extract_string() with exact field names. Include doc_id, docnm_kwd for data queries. Only SQL.""".format(
+ table_name,
+ ", ".join(json_field_names),
+ "\n".join([f" - {field}" for field in json_field_names]),
+ question
+ )
else:
# Build ES/OS prompts with direct field access
sys_prompt = """You are a Database Administrator. Write SQL queries.
@@ -703,7 +739,7 @@ Write SQL using exact field names above. Include doc_id, docnm_kwd for data quer
     except Exception as e:
         logging.warning(f"use_sql: Initial SQL execution FAILED with error: {e}")
         # Build retry prompt with error information
-        if doc_engine == "infinity":
+        if doc_engine in ("infinity", "oceanbase"):
             # Build Infinity error retry prompt
             json_field_names = list(field_map.keys())
             user_prompt = """
diff --git a/common/settings.py b/common/settings.py
index 41ac2f5eb..221e4a909 100644
--- a/common/settings.py
+++ b/common/settings.py
@@ -79,6 +79,7 @@ FEISHU_OAUTH = None
 OAUTH_CONFIG = None
 DOC_ENGINE = os.getenv('DOC_ENGINE', 'elasticsearch')
 DOC_ENGINE_INFINITY = (DOC_ENGINE.lower() == "infinity")
+DOC_ENGINE_OCEANBASE = (DOC_ENGINE.lower() == "oceanbase")

 docStoreConn = None
@@ -241,9 +242,10 @@ def init_settings():
     FEISHU_OAUTH = get_base_config("oauth", {}).get("feishu")
     OAUTH_CONFIG = get_base_config("oauth", {})

-    global DOC_ENGINE, DOC_ENGINE_INFINITY, docStoreConn, ES, OB, OS, INFINITY
+    global DOC_ENGINE, DOC_ENGINE_INFINITY, DOC_ENGINE_OCEANBASE, docStoreConn, ES, OB, OS, INFINITY
     DOC_ENGINE = os.environ.get("DOC_ENGINE", "elasticsearch")
     DOC_ENGINE_INFINITY = (DOC_ENGINE.lower() == "infinity")
+    DOC_ENGINE_OCEANBASE = (DOC_ENGINE.lower() == "oceanbase")
     lower_case_doc_engine = DOC_ENGINE.lower()
     if lower_case_doc_engine == "elasticsearch":
         ES = get_base_config("es", {})
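A quick sketch of the flag's behavior (hypothetical values; assuming the usual
env-driven startup):

```python
import os
os.environ["DOC_ENGINE"] = "oceanbase"
# After module load / init_settings():
#   settings.DOC_ENGINE_INFINITY  -> False
#   settings.DOC_ENGINE_OCEANBASE -> True
# use_sql() then picks the OceanBase prompt flavor added above.
```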
diff --git a/rag/app/table.py b/rag/app/table.py
index 1b49994e5..fad6b7038 100644
--- a/rag/app/table.py
+++ b/rag/app/table.py
@@ -457,10 +457,10 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese
                 txts.extend([str(c) for c in cln if c])
         clmns_map = [(py_clmns[i].lower() + fields_map[clmn_tys[i]], str(clmns[i]).replace("_", " ")) for i in
                      range(len(clmns))]
-        # For Infinity: Use original column names as keys since they're stored in chunk_data JSON
+        # For Infinity/OceanBase: Use original column names as keys since they're stored in chunk_data JSON
         # For ES/OS: Use full field names with type suffixes (e.g., url_kwd, body_tks)
-        if settings.DOC_ENGINE_INFINITY:
-            # For Infinity: key = original column name, value = display name
+        if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
+            # For Infinity/OceanBase: key = original column name, value = display name
             field_map = {py_clmns[i].lower(): str(clmns[i]).replace("_", " ") for i in range(len(clmns))}
         else:
             # For ES/OS: key = typed field name, value = display name
@@ -480,9 +480,9 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese
                     continue
                 if not isinstance(row[clmns[j]], pd.Series) and pd.isna(row[clmns[j]]):
                     continue
-                # For Infinity: Store in chunk_data JSON column
+                # For Infinity/OceanBase: Store in chunk_data JSON column
                 # For Elasticsearch/OpenSearch: Store as individual fields with type suffixes
-                if settings.DOC_ENGINE_INFINITY:
+                if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
                     data_json[str(clmns[j])] = row[clmns[j]]
                 else:
                     fld = clmns_map[j][0]
@@ -490,8 +490,8 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese
                     row_fields.append((clmns[j], row[clmns[j]]))
             if not row_fields:
                 continue
-            # Add the data JSON field to the document (for Infinity only)
-            if settings.DOC_ENGINE_INFINITY:
+            # Add the data JSON field to the document (for Infinity/OceanBase)
+            if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
                 d["chunk_data"] = data_json
             # Format as a structured text for better LLM comprehension
             # Format each field as "- Field Name: Value" on separate lines
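Putting the table.py changes together, a single-row chunk for OceanBase ends
up shaped roughly like this (field names hypothetical, other chunk fields
omitted, and assuming RAGFlow's usual `content_with_weight` text field):

```python
d = {
    # raw row, kept queryable via JSON functions:
    "chunk_data": {"Name": "Alice", "Age": 30},
    # human/LLM-readable rendering ("- Field Name: Value" per line):
    "content_with_weight": "- Name: Alice\n- Age: 30",
}
```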
diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py
index b7c0ead58..45ccb256d 100644
--- a/rag/utils/ob_conn.py
+++ b/rag/utils/ob_conn.py
@@ -49,6 +49,7 @@ logger = logging.getLogger('ragflow.ob_conn')
 column_order_id = Column("_order_id", Integer, nullable=True, comment="chunk order id for maintaining sequence")
 column_group_id = Column("group_id", String(256), nullable=True, comment="group id for external retrieval")
 column_mom_id = Column("mom_id", String(256), nullable=True, comment="parent chunk id")
+column_chunk_data = Column("chunk_data", JSON, nullable=True, comment="table parser row data")

 column_definitions: list[Column] = [
     Column("id", String(256), primary_key=True, comment="chunk id"),
@@ -89,6 +90,7 @@ column_definitions: list[Column] = [
Column("rank_flt", Double, nullable=True, comment="rank of this entity"),
Column("removed_kwd", String(256), nullable=True, index=True, server_default="'N'",
comment="whether it has been deleted"),
+ column_chunk_data,
Column("metadata", JSON, nullable=True, comment="metadata for this chunk"),
Column("extra", JSON, nullable=True, comment="extra information of non-general chunk"),
column_order_id,
@@ -754,7 +756,7 @@ class OBConnection(DocStoreConnection):
     Table operations
     """

-    def create_idx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
+    def create_idx(self, indexName: str, knowledgebaseId: str, vectorSize: int, parser_id: str = None):
         vector_field_name = f"q_{vectorSize}_vec"
         vector_index_name = f"{vector_field_name}_idx"
@@ -793,7 +795,7 @@ class OBConnection(DocStoreConnection):
         )

         # new columns migration
-        for column in [column_order_id, column_group_id, column_mom_id]:
+        for column in [column_chunk_data, column_order_id, column_group_id, column_mom_id]:
             _try_with_lock(
                 lock_name=f"ob_add_{column.name}_{indexName}",
                 check_func=lambda: self._column_exist(indexName, column.name),
@@ -1826,6 +1828,68 @@ class OBConnection(DocStoreConnection):
     SQL
     """

-    def sql(sql: str, fetch_size: int, format: str):
-        # TODO: execute the sql generated by text-to-sql
-        return None
+    def sql(self, sql: str, fetch_size: int = 1024, format: str = "json"):
+        logger.debug("OBConnection.sql get sql: %s", sql)
+
+        def normalize_sql(sql_text: str) -> str:
+            cleaned = sql_text.strip().rstrip(";")
+            cleaned = re.sub(r"[`]+", "", cleaned)
+            cleaned = re.sub(
+                r"json_extract_string\s*\(\s*([^,]+?)\s*,\s*([^)]+?)\s*\)",
+                r"JSON_UNQUOTE(JSON_EXTRACT(\1, \2))",
+                cleaned,
+                flags=re.IGNORECASE,
+            )
+            cleaned = re.sub(
+                r"json_extract_isnull\s*\(\s*([^,]+?)\s*,\s*([^)]+?)\s*\)",
+                r"(JSON_EXTRACT(\1, \2) IS NULL)",
+                cleaned,
+                flags=re.IGNORECASE,
+            )
+            # MySQL-mode SQL has no '==' operator, but the prompt examples use it
+            cleaned = re.sub(r"\s*==\s*", " = ", cleaned)
+            return cleaned
+
+        def coerce_value(value: Any) -> Any:
+            if isinstance(value, np.generic):
+                return value.item()
+            if isinstance(value, bytes):
+                return value.decode("utf-8", errors="ignore")
+            return value
+
+        sql_text = normalize_sql(sql)
+        if fetch_size and fetch_size > 0:
+            sql_lower = sql_text.lstrip().lower()
+            if re.match(r"^(select|with)\b", sql_lower) and not re.search(r"\blimit\b", sql_lower):
+                sql_text = f"{sql_text} LIMIT {int(fetch_size)}"
+
+        logger.debug("OBConnection.sql to ob: %s", sql_text)
+
+        try:
+            res = self.client.perform_raw_text_sql(sql_text)
+        except Exception:
+            logger.exception("OBConnection.sql got exception")
+            raise
+
+        if res is None:
+            return None
+
+        columns = list(res.keys()) if hasattr(res, "keys") else []
+        try:
+            rows = res.fetchmany(fetch_size) if fetch_size and fetch_size > 0 else res.fetchall()
+        except Exception:
+            rows = res.fetchall()
+
+        rows_list = [[coerce_value(v) for v in list(row)] for row in rows]
+        result = {
+            "columns": [{"name": col, "type": "text"} for col in columns],
+            "rows": rows_list,
+        }
+
+        if format == "markdown":
+            header = "|" + "|".join(columns) + "|" if columns else ""
+            separator = "|" + "|".join(["---" for _ in columns]) + "|" if columns else ""
+            body = "\n".join(["|" + "|".join([str(v) for v in row]) + "|" for row in rows_list])
+            result["markdown"] = "\n".join([line for line in [header, separator, body] if line])
+
+        return result
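Finally, a hedged usage sketch of the new `sql()` (connection setup elided;
`my_table` and the `Age` field are hypothetical):

```python
res = ob_conn.sql(
    "SELECT doc_id, docnm_kwd, json_extract_string(chunk_data, '$.Age') AS Age FROM my_table",
    fetch_size=10,
    format="markdown",
)
# res["columns"]  -> [{"name": "doc_id", "type": "text"}, ..., {"name": "Age", "type": "text"}]
# res["rows"]     -> up to 10 rows of plain Python values (numpy scalars and bytes coerced)
# res["markdown"] -> the same rows rendered as a pipe-delimited table
```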