feature:Add OceanBase Storage Support for Table Parser (#12923)

### What problem does this PR solve?

close #12770 

This PR adds OceanBase as a storage backend for the Table Parser. It
enables dynamic table schema storage via JSON and implements OceanBase
SQL execution for text-to-SQL retrieval.


### Type of change

- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):

### Changes
- Table Parser stores row data into `chunk_data` when doc engine is
OceanBase. (table.py)
- OceanBase table schema adds `chunk_data` JSON column and migrates if
needed.
- Implemented OceanBase `sql()` to execute text-to-SQL results.
(ob_conn.py)
- Add `DOC_ENGINE_OCEANBASE` flag for engine detection (setting.py)

### Test
1. Set `DOC_ENGINE=oceanbase` (e.g. in `docker/.env`)
<img width="1290" height="783" alt="doc_engine_ob"
src="https://github.com/user-attachments/assets/7d1c609f-7bf2-4b2e-b4cc-4243e72ad4f1"
/>

2. Upload an Excel file to Knowledge Base.(for test, we use as below)
<img width="786" height="930" alt="excel"
src="https://github.com/user-attachments/assets/bedf82f2-cd00-426b-8f4d-6978a151231a"
/>

3. Choose **Table** as parsing method.
<img width="2550" height="1134" alt="parse_excel"
src="https://github.com/user-attachments/assets/aba11769-02be-4905-97e1-e24485e24cd0"
/>

4.Ask a natural language query in chat.
<img width="2550" height="1134" alt="query"
src="https://github.com/user-attachments/assets/26a910a6-e503-4ac7-b66a-f5754bbb0e91"
/>
This commit is contained in:
Carve_
2026-01-31 15:11:54 +08:00
committed by GitHub
parent ee23b9eb63
commit 23bdf25a1f
4 changed files with 115 additions and 15 deletions

View File

@ -457,10 +457,10 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese
txts.extend([str(c) for c in cln if c])
clmns_map = [(py_clmns[i].lower() + fields_map[clmn_tys[i]], str(clmns[i]).replace("_", " ")) for i in
range(len(clmns))]
# For Infinity: Use original column names as keys since they're stored in chunk_data JSON
# For Infinity/OceanBase: Use original column names as keys since they're stored in chunk_data JSON
# For ES/OS: Use full field names with type suffixes (e.g., url_kwd, body_tks)
if settings.DOC_ENGINE_INFINITY:
# For Infinity: key = original column name, value = display name
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
# For Infinity/OceanBase: key = original column name, value = display name
field_map = {py_clmns[i].lower(): str(clmns[i]).replace("_", " ") for i in range(len(clmns))}
else:
# For ES/OS: key = typed field name, value = display name
@ -480,9 +480,9 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese
continue
if not isinstance(row[clmns[j]], pd.Series) and pd.isna(row[clmns[j]]):
continue
# For Infinity: Store in chunk_data JSON column
# For Infinity/OceanBase: Store in chunk_data JSON column
# For Elasticsearch/OpenSearch: Store as individual fields with type suffixes
if settings.DOC_ENGINE_INFINITY:
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
data_json[str(clmns[j])] = row[clmns[j]]
else:
fld = clmns_map[j][0]
@ -490,8 +490,8 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese
row_fields.append((clmns[j], row[clmns[j]]))
if not row_fields:
continue
# Add the data JSON field to the document (for Infinity only)
if settings.DOC_ENGINE_INFINITY:
# Add the data JSON field to the document (for Infinity/OceanBase)
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
d["chunk_data"] = data_json
# Format as a structured text for better LLM comprehension
# Format each field as "- Field Name: Value" on separate lines

View File

@ -49,6 +49,7 @@ logger = logging.getLogger('ragflow.ob_conn')
column_order_id = Column("_order_id", Integer, nullable=True, comment="chunk order id for maintaining sequence")
column_group_id = Column("group_id", String(256), nullable=True, comment="group id for external retrieval")
column_mom_id = Column("mom_id", String(256), nullable=True, comment="parent chunk id")
column_chunk_data = Column("chunk_data", JSON, nullable=True, comment="table parser row data")
column_definitions: list[Column] = [
Column("id", String(256), primary_key=True, comment="chunk id"),
@ -89,6 +90,7 @@ column_definitions: list[Column] = [
Column("rank_flt", Double, nullable=True, comment="rank of this entity"),
Column("removed_kwd", String(256), nullable=True, index=True, server_default="'N'",
comment="whether it has been deleted"),
column_chunk_data,
Column("metadata", JSON, nullable=True, comment="metadata for this chunk"),
Column("extra", JSON, nullable=True, comment="extra information of non-general chunk"),
column_order_id,
@ -754,7 +756,7 @@ class OBConnection(DocStoreConnection):
Table operations
"""
def create_idx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
def create_idx(self, indexName: str, knowledgebaseId: str, vectorSize: int, parser_id: str = None):
vector_field_name = f"q_{vectorSize}_vec"
vector_index_name = f"{vector_field_name}_idx"
@ -793,7 +795,7 @@ class OBConnection(DocStoreConnection):
)
# new columns migration
for column in [column_order_id, column_group_id, column_mom_id]:
for column in [column_chunk_data, column_order_id, column_group_id, column_mom_id]:
_try_with_lock(
lock_name=f"ob_add_{column.name}_{indexName}",
check_func=lambda: self._column_exist(indexName, column.name),
@ -1826,6 +1828,66 @@ class OBConnection(DocStoreConnection):
SQL
"""
def sql(sql: str, fetch_size: int, format: str):
# TODO: execute the sql generated by text-to-sql
return None
def sql(self, sql: str, fetch_size: int = 1024, format: str = "json"):
logger.debug("OBConnection.sql get sql: %s", sql)
def normalize_sql(sql_text: str) -> str:
cleaned = sql_text.strip().rstrip(";")
cleaned = re.sub(r"[`]+", "", cleaned)
cleaned = re.sub(
r"json_extract_string\s*\(\s*([^,]+?)\s*,\s*([^)]+?)\s*\)",
r"JSON_UNQUOTE(JSON_EXTRACT(\1, \2))",
cleaned,
flags=re.IGNORECASE,
)
cleaned = re.sub(
r"json_extract_isnull\s*\(\s*([^,]+?)\s*,\s*([^)]+?)\s*\)",
r"(JSON_EXTRACT(\1, \2) IS NULL)",
cleaned,
flags=re.IGNORECASE,
)
return cleaned
def coerce_value(value: Any) -> Any:
if isinstance(value, np.generic):
return value.item()
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")
return value
sql_text = normalize_sql(sql)
if fetch_size and fetch_size > 0:
sql_lower = sql_text.lstrip().lower()
if re.match(r"^(select|with)\b", sql_lower) and not re.search(r"\blimit\b", sql_lower):
sql_text = f"{sql_text} LIMIT {int(fetch_size)}"
logger.debug("OBConnection.sql to ob: %s", sql_text)
try:
res = self.client.perform_raw_text_sql(sql_text)
except Exception:
logger.exception("OBConnection.sql got exception")
raise
if res is None:
return None
columns = list(res.keys()) if hasattr(res, "keys") else []
try:
rows = res.fetchmany(fetch_size) if fetch_size and fetch_size > 0 else res.fetchall()
except Exception:
rows = res.fetchall()
rows_list = [[coerce_value(v) for v in list(row)] for row in rows]
result = {
"columns": [{"name": col, "type": "text"} for col in columns],
"rows": rows_list,
}
if format == "markdown":
header = "|" + "|".join(columns) + "|" if columns else ""
separator = "|" + "|".join(["---" for _ in columns]) + "|" if columns else ""
body = "\n".join(["|" + "|".join([str(v) for v in row]) + "|" for row in rows_list])
result["markdown"] = "\n".join([line for line in [header, separator, body] if line])
return result