Mirror of https://github.com/infiniflow/ragflow.git (synced 2025-12-20 04:39:00 +08:00)
Fix: parent-children chunking method. (#11997)
### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
@@ -147,7 +147,7 @@ async def run():
     if cvs.canvas_category == CanvasCategory.DataFlow:
         task_id = get_uuid()
         Pipeline(cvs.dsl, tenant_id=current_user.id, doc_id=CANVAS_DEBUG_DOC_ID, task_id=task_id, flow_id=req["id"])
-        ok, error_message = await asyncio.to_thread(queue_dataflow, user_id, req["id"], task_id, files[0], 0)
+        ok, error_message = await asyncio.to_thread(queue_dataflow, user_id, req["id"], task_id, CANVAS_DEBUG_DOC_ID, files[0], 0)
         if not ok:
             return get_data_error_result(message=error_message)
         return get_json_result(data={"message_id": task_id})
@@ -386,6 +386,7 @@ async def retrieval_test():
                          LLMBundle(kb.tenant_id, LLMType.CHAT))
         if ck["content_with_weight"]:
             ranks["chunks"].insert(0, ck)
+    ranks["chunks"] = settings.retriever.retrieval_by_children(ranks["chunks"], tenant_ids)

     for c in ranks["chunks"]:
         c.pop("vector", None)
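For readers unfamiliar with the parent-children scheme: the added `retrieval_by_children` call maps retrieved child chunks back to their parents before the result is returned. A minimal, hypothetical sketch of that mapping (the `mom_with_weight` field comes from this diff; the grouping/dedup behavior and the function body are assumptions, not the real retriever API):

```python
# Hypothetical sketch only: collapse retrieved child chunks onto their parent text.
# Assumes a child chunk may carry its parent's text under "mom_with_weight" (see the
# tokenize_chunks changes below); the real settings.retriever.retrieval_by_children
# may also look parents up in the document store.
def retrieval_by_children_sketch(chunks: list[dict]) -> list[dict]:
    seen_parents = set()
    merged = []
    for ck in chunks:
        parent = ck.get("mom_with_weight")
        if not parent:
            merged.append(ck)          # ordinary (non-child) chunk: keep as is
            continue
        if parent in seen_parents:     # several children hit the same parent: keep one
            continue
        seen_parents.add(parent)
        parent_ck = dict(ck)
        parent_ck["content_with_weight"] = parent   # answer with the parent's full text
        merged.append(parent_ck)
    return merged
```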
@@ -13,7 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from typing import Any, Callable
+import logging
+from typing import Any, Callable, Dict
+
+import json_repair

 from rag.prompts.generator import gen_meta_filter

@@ -140,3 +143,63 @@ async def apply_meta_data_filter(
             doc_ids = ["-999"]

     return doc_ids
+
+
+def update_metadata_to(metadata, meta):
+    if not meta:
+        return metadata
+    if isinstance(meta, str):
+        try:
+            meta = json_repair.loads(meta)
+        except Exception:
+            logging.error("Meta data format error.")
+            return metadata
+    if not isinstance(meta, dict):
+        return metadata
+    for k, v in meta.items():
+        if isinstance(v, list):
+            v = [vv for vv in v if isinstance(vv, str)]
+            if not v:
+                continue
+        if not isinstance(v, list) and not isinstance(v, str):
+            continue
+        if k not in metadata:
+            metadata[k] = v
+            continue
+        if isinstance(metadata[k], list):
+            if isinstance(v, list):
+                metadata[k].extend(v)
+            else:
+                metadata[k].append(v)
+        else:
+            metadata[k] = v
+
+    return metadata
+
+
+def metadata_schema(metadata: list | None) -> Dict[str, Any]:
+    if not metadata:
+        return {}
+    properties = {}
+
+    for item in metadata:
+        key = item.get("key")
+        if not key:
+            continue
+
+        prop_schema = {
+            "description": item.get("description", "")
+        }
+        if "enum" in item and item["enum"]:
+            prop_schema["enum"] = item["enum"]
+        prop_schema["type"] = "string"
+
+        properties[key] = prop_schema
+
+    json_schema = {
+        "type": "object",
+        "properties": properties,
+    }
+
+    json_schema["additionalProperties"] = False
+    return json_schema
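To make the two new helpers concrete, here is a small illustrative usage (the field definitions and values are invented; `update_metadata_to` merges per-chunk metadata dicts, repairing JSON strings on the way in, while `metadata_schema` turns the dataset's metadata field definitions into a JSON-schema-like dict for constrained LLM output):

```python
# Illustrative only; the keys and values below are made up.
merged = {}
merged = update_metadata_to(merged, {"author": "Alice", "tags": ["rag"]})
merged = update_metadata_to(merged, '{"tags": ["chunking"], "year": "2025"}')  # JSON string is repaired/parsed
# merged -> {"author": "Alice", "tags": ["rag", "chunking"], "year": "2025"}
#           (lists are extended, scalar values for existing keys are overwritten)

schema = metadata_schema([
    {"key": "author", "description": "Document author"},
    {"key": "category", "description": "Topic", "enum": ["rag", "ocr"]},
])
# schema -> {"type": "object", "properties": {...one entry per defined key...}, "additionalProperties": False}
```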
@@ -78,7 +78,7 @@ class Extractor:
                 raise TaskCanceledException(f"Task {task_id} was cancelled")

             try:
-                response = self._llm.chat(system_msg[0]["content"], hist, conf)
+                response = asyncio.run(self._llm.async_chat(system_msg[0]["content"], hist, conf))
                 response = re.sub(r"^.*</think>", "", response, flags=re.DOTALL)
                 if response.find("**ERROR**") >= 0:
                     raise Exception(response)
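The change above is the usual bridge from a synchronous call site to an async chat API. A generic sketch of the pattern (stand-in names, not the RAGFlow classes); note that `asyncio.run` only works when the calling thread has no event loop already running:

```python
import asyncio

async def async_chat(prompt: str) -> str:
    await asyncio.sleep(0)          # stand-in for an awaitable LLM call
    return f"echo: {prompt}"

def chat_sync(prompt: str) -> str:
    # asyncio.run() spins up and tears down a fresh event loop, so it must not be
    # called from code that is already executing inside an event loop.
    return asyncio.run(async_chat(prompt))

if __name__ == "__main__":
    print(chat_sync("hello"))
```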
@@ -635,9 +635,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
         "parser_config", {
             "chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True})

-    child_deli = re.findall(r"`([^`]+)`", parser_config.get("children_delimiter", ""))
-    child_deli = sorted(set(child_deli), key=lambda x: -len(x))
-    child_deli = "|".join(re.escape(t) for t in child_deli if t)
+    child_deli = parser_config.get("children_delimiter", "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
+    cust_child_deli = re.findall(r"`([^`]+)`", child_deli)
+    child_deli = "|".join(re.sub(r"`([^`]+)`", "", child_deli))
+    if cust_child_deli:
+        cust_child_deli = sorted(set(cust_child_deli), key=lambda x: -len(x))
+        cust_child_deli = "|".join(re.escape(t) for t in cust_child_deli if t)
+        child_deli += cust_child_deli

     is_markdown = False
     table_context_size = max(0, int(parser_config.get("table_context_size", 0) or 0))
     image_context_size = max(0, int(parser_config.get("image_context_size", 0) or 0))
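For intuition, a standalone sketch of the intended parsing: multi-character child delimiters are written between backticks in `children_delimiter`, so they are pulled out with a regex and escaped, while the leftover single characters each become one alternative in the final pattern. The helper name and the escaping of single characters here are my own; the commit's code above keeps the two parts in separate variables:

```python
import re

def build_child_delimiter_pattern(children_delimiter: str) -> str:
    """Hypothetical helper: turn a config string such as '\\n。`====`' into a regex alternation."""
    multi = re.findall(r"`([^`]+)`", children_delimiter)     # backtick-quoted, multi-char tokens
    singles = re.sub(r"`[^`]+`", "", children_delimiter)     # whatever is left: single-char delimiters
    parts = sorted(set(multi), key=len, reverse=True) + list(singles)
    return "|".join(re.escape(t) for t in parts if t)

pattern = build_child_delimiter_pattern("\n。`====`")        # made-up config value
# pattern can then be passed to re.split(r"(%s)" % pattern, text, flags=re.DOTALL)
```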
@@ -60,14 +60,7 @@ class Splitter(ProcessBase):
                 deli += f"`{d}`"
             else:
                 deli += d
-        child_deli = ""
-        for d in self._param.children_delimiters:
-            if len(d) > 1:
-                child_deli += f"`{d}`"
-            else:
-                child_deli += d
-        child_deli = [m.group(1) for m in re.finditer(r"`([^`]+)`", child_deli)]
-        custom_pattern = "|".join(re.escape(t) for t in sorted(set(child_deli), key=len, reverse=True))
+        custom_pattern = "|".join(re.escape(t) for t in sorted(set(self._param.children_delimiters), key=len, reverse=True))

         self.set_output("output_format", "chunks")
         self.callback(random.randint(1, 5) / 100.0, "Start to split into chunks.")
@@ -273,6 +273,21 @@ def tokenize(d, txt, eng):
     d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])


+def split_with_pattern(d, pattern:str, content:str, eng) -> list:
+    docs = []
+    txts = [txt for txt in re.split(r"(%s)" % pattern, content, flags=re.DOTALL)]
+    for j in range(0, len(txts), 2):
+        txt = txts[j]
+        if not txt:
+            continue
+        if j + 1 < len(txts):
+            txt += txts[j+1]
+        dd = copy.deepcopy(d)
+        tokenize(dd, txt, eng)
+        docs.append(dd)
+    return docs
+
+
 def tokenize_chunks(chunks, doc, eng, pdf_parser=None, child_delimiters_pattern=None):
     res = []
     # wrap up as es documents
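The key detail in `split_with_pattern` is the capturing group passed to `re.split`: it keeps each matched delimiter in the result list, so every child chunk can be re-joined with the delimiter that ended it before being tokenized. A small self-contained demonstration (sample text and pattern are invented):

```python
import re

text = "First sentence. Second sentence? Third"
pattern = r"[.?]"                                   # made-up child delimiter pattern

pieces = re.split(r"(%s)" % pattern, text, flags=re.DOTALL)
# pieces == ['First sentence', '.', ' Second sentence', '?', ' Third']

children = []
for j in range(0, len(pieces), 2):                  # even indexes: text, odd indexes: delimiters
    chunk = pieces[j]
    if not chunk:
        continue
    if j + 1 < len(pieces):
        chunk += pieces[j + 1]                      # glue the delimiter back onto its chunk
    children.append(chunk)

print(children)   # ['First sentence.', ' Second sentence?', ' Third']
```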
@@ -293,10 +308,7 @@ def tokenize_chunks(chunks, doc, eng, pdf_parser=None, child_delimiters_pattern=

         if child_delimiters_pattern:
             d["mom_with_weight"] = ck
-            for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL):
-                dd = copy.deepcopy(d)
-                tokenize(dd, txt, eng)
-                res.append(dd)
+            res.extend(split_with_pattern(d, child_delimiters_pattern, ck, eng))
             continue

         tokenize(d, ck, eng)
@@ -316,10 +328,7 @@ def tokenize_chunks_with_images(chunks, doc, eng, images, child_delimiters_patte
         add_positions(d, [[ii]*5])
         if child_delimiters_pattern:
             d["mom_with_weight"] = ck
-            for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL):
-                dd = copy.deepcopy(d)
-                tokenize(dd, txt, eng)
-                res.append(dd)
+            res.extend(split_with_pattern(d, child_delimiters_pattern, ck, eng))
             continue
         tokenize(d, ck, eng)
         res.append(d)
@@ -821,3 +821,13 @@ async def relevant_chunks_with_toc(query: str, toc:list[dict], chat_mdl, topn: i
     except Exception as e:
         logging.exception(e)
         return []
+
+
+META_DATA = load_prompt("meta_data")
+
+async def gen_metadata(chat_mdl, schema:dict, content:str):
+    template = PROMPT_JINJA_ENV.from_string(META_DATA)
+    system_prompt = template.render(content=content, schema=schema)
+    user_prompt = "Output: "
+    _, msg = message_fit_in(form_message(system_prompt, user_prompt), chat_mdl.max_length)
+    ans = await chat_mdl.async_chat(msg[0]["content"], msg[1:])
+    return re.sub(r"^.*</think>", "", ans, flags=re.DOTALL)
rag/prompts/meta_data.md (new file, 13 lines)

@@ -0,0 +1,13 @@
+Extract important structured information from the given content.
+Output ONLY a valid JSON string with no additional text.
+If no important structured information is found, output an empty JSON object: {}.
+
+Important structured information structure as following:
+
+{{ schema }}
+
+---------------------------
+The given content as following:
+
+{{ content }}
+
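To show how this template is meant to be used end to end: it is rendered with a JSON schema and the chunk text, sent as the system prompt, and the model's reply is parsed with json_repair. A rough sketch with plain jinja2 and a stubbed model reply (the real path goes through `gen_metadata` above and `PROMPT_JINJA_ENV`):

```python
import json

import json_repair
from jinja2 import Template

# Same text as rag/prompts/meta_data.md, inlined here for a self-contained example.
META_DATA_PROMPT = (
    "Extract important structured information from the given content.\n"
    "Output ONLY a valid JSON string with no additional text.\n"
    "If no important structured information is found, output an empty JSON object: {}.\n\n"
    "Important structured information structure as following:\n\n{{ schema }}\n\n"
    "---------------------------\n"
    "The given content as following:\n\n{{ content }}\n"
)

schema = {"type": "object", "properties": {"author": {"type": "string"}}, "additionalProperties": False}
system_prompt = Template(META_DATA_PROMPT).render(schema=json.dumps(schema),
                                                  content="Written by Alice in 2025.")

llm_reply = '{"author": "Alice"'             # pretend (slightly malformed) model output
metadata = json_repair.loads(llm_reply)      # json_repair tolerates imperfect JSON
print(metadata)                              # {'author': 'Alice'}
```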
@@ -23,19 +23,19 @@ import sys
 import threading
 import time

-import json_repair
-
 from api.db import PIPELINE_SPECIAL_PROGRESS_FREEZE_TASK_TYPES
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
 from common.connection_utils import timeout
+from common.metadata_utils import update_metadata_to, metadata_schema
 from rag.utils.base64_image import image2id
 from rag.utils.raptor_utils import should_skip_raptor, get_skip_reason
 from common.log_utils import init_root_logger
 from common.config_utils import show_configs
 from graphrag.general.index import run_graphrag_for_kb
 from graphrag.utils import get_llm_cache, set_llm_cache, get_tags_from_cache, set_tags_to_cache
-from rag.prompts.generator import keyword_extraction, question_proposal, content_tagging, run_toc_from_text
+from rag.prompts.generator import keyword_extraction, question_proposal, content_tagging, run_toc_from_text, \
+    gen_metadata
 import logging
 import os
 from datetime import datetime
@@ -368,6 +368,45 @@ async def build_chunks(task, progress_callback):
             raise
         progress_callback(msg="Question generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st))

+    if task["parser_config"].get("enable_metadata", False) and task["parser_config"].get("metadata"):
+        st = timer()
+        progress_callback(msg="Start to generate meta-data for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
+
+        async def gen_metadata_task(chat_mdl, d):
+            cached = get_llm_cache(chat_mdl.llm_name, d["content_with_weight"], "metadata")
+            if not cached:
+                async with chat_limiter:
+                    cached = await gen_metadata(chat_mdl,
+                                                metadata_schema(task["parser_config"]["metadata"]),
+                                                d["content_with_weight"])
+                set_llm_cache(chat_mdl.llm_name, d["content_with_weight"], cached, "metadata")
+            if cached:
+                d["metadata_obj"] = cached
+        tasks = []
+        for d in docs:
+            tasks.append(asyncio.create_task(gen_metadata_task(chat_mdl, d)))
+        try:
+            await asyncio.gather(*tasks, return_exceptions=False)
+        except Exception as e:
+            logging.error("Error in doc_question_proposal", exc_info=e)
+            for t in tasks:
+                t.cancel()
+            await asyncio.gather(*tasks, return_exceptions=True)
+            raise
+        metadata = {}
+        for ck in cks:
+            metadata = update_metadata_to(metadata, ck["metadata_obj"])
+            del ck["metadata_obj"]
+        if metadata:
+            e, doc = DocumentService.get_by_id(task["doc_id"])
+            if e:
+                if isinstance(doc.meta_fields, str):
+                    doc.meta_fields = json.loads(doc.meta_fields)
+                metadata = update_metadata_to(metadata, doc.meta_fields)
+            DocumentService.update_by_id(task["doc_id"], {"meta_fields": metadata})
+        progress_callback(msg="Question generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st))
+
     if task["kb_parser_config"].get("tag_kb_ids", []):
         progress_callback(msg="Start to tag for every chunk ...")
         kb_ids = task["kb_parser_config"]["tag_kb_ids"]
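The metadata block above follows a common asyncio fan-out pattern: one task per chunk, concurrency bounded by a limiter, and explicit cancellation of the remaining tasks when any of them fails. A generic, self-contained sketch of that pattern (the semaphore and worker here are placeholders, not RAGFlow's chat_limiter or gen_metadata):

```python
import asyncio

limiter = asyncio.Semaphore(4)        # stand-in for chat_limiter: at most 4 concurrent "LLM calls"

async def annotate(doc: dict) -> None:
    async with limiter:
        await asyncio.sleep(0.01)     # pretend LLM call
        doc["metadata_obj"] = {"length": str(len(doc["content"]))}

async def annotate_all(docs: list[dict]) -> None:
    tasks = [asyncio.create_task(annotate(d)) for d in docs]
    try:
        await asyncio.gather(*tasks, return_exceptions=False)
    except Exception:
        for t in tasks:               # one failure cancels the rest ...
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)   # ... and waits for them to settle
        raise

docs = [{"content": "alpha"}, {"content": "beta beta"}]
asyncio.run(annotate_all(docs))
print([d["metadata_obj"] for d in docs])
```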
@@ -602,36 +641,6 @@ async def run_dataflow(task: dict):


     metadata = {}
-    def dict_update(meta):
-        nonlocal metadata
-        if not meta:
-            return
-        if isinstance(meta, str):
-            try:
-                meta = json_repair.loads(meta)
-            except Exception:
-                logging.error("Meta data format error.")
-                return
-        if not isinstance(meta, dict):
-            return
-        for k, v in meta.items():
-            if isinstance(v, list):
-                v = [vv for vv in v if isinstance(vv, str)]
-                if not v:
-                    continue
-            if not isinstance(v, list) and not isinstance(v, str):
-                continue
-            if k not in metadata:
-                metadata[k] = v
-                continue
-            if isinstance(metadata[k], list):
-                if isinstance(v, list):
-                    metadata[k].extend(v)
-                else:
-                    metadata[k].append(v)
-            else:
-                metadata[k] = v
-
     for ck in chunks:
         ck["doc_id"] = doc_id
         ck["kb_id"] = [str(task["kb_id"])]
@@ -656,7 +665,7 @@ async def run_dataflow(task: dict):
         ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
         del ck["summary"]
         if "metadata" in ck:
-            dict_update(ck["metadata"])
+            metadata = update_metadata_to(metadata, ck["metadata"])
             del ck["metadata"]
         if "content_with_weight" not in ck:
             ck["content_with_weight"] = ck["text"]
@@ -670,7 +679,7 @@ async def run_dataflow(task: dict):
     if e:
         if isinstance(doc.meta_fields, str):
             doc.meta_fields = json.loads(doc.meta_fields)
-        dict_update(doc.meta_fields)
+        metadata = update_metadata_to(metadata, doc.meta_fields)
     DocumentService.update_by_id(doc_id, {"meta_fields": metadata})

     start_ts = timer()