From 14616cf8457e688ea30275920612c5ec72a68777 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Fri, 28 Nov 2025 19:25:32 +0800 Subject: [PATCH] Feat: add child parent chunking method in backend. (#11598) ### What problem does this PR solve? #7996 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- agent/canvas.py | 20 +------- agent/component/begin.py | 3 +- api/apps/canvas_app.py | 71 ++------------------------ api/apps/document_app.py | 11 +++- api/db/services/dialog_service.py | 8 ++- api/db/services/file_service.py | 84 ++++++++++++++++++++++++++++++- rag/app/naive.py | 42 ++++++---------- rag/flow/splitter/splitter.py | 50 ++++++++++++++++-- rag/nlp/__init__.py | 26 ++++++++-- rag/svr/task_executor.py | 31 ++++++++++-- 10 files changed, 216 insertions(+), 130 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index 9e95a5611..5344d70c3 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import base64 import json import logging import re @@ -25,6 +24,7 @@ from typing import Any, Union, Tuple from agent.component import component_class from agent.component.base import ComponentBase +from api.db.services.file_service import FileService from api.db.services.task_service import has_canceled from common.misc_utils import get_uuid, hash_str2int from common.exceptions import TaskCanceledException @@ -372,7 +372,7 @@ class Canvas(Graph): for k in kwargs.keys(): if k in ["query", "user_id", "files"] and kwargs[k]: if k == "files": - self.globals[f"sys.{k}"] = self.get_files(kwargs[k]) + self.globals[f"sys.{k}"] = FileService.get_files(kwargs[k]) else: self.globals[f"sys.{k}"] = kwargs[k] if not self.globals["sys.conversation_turns"] : @@ -621,22 +621,6 @@ class Canvas(Graph): def get_component_input_elements(self, cpnnm): return self.components[cpnnm]["obj"].get_input_elements() - def get_files(self, files: Union[None, list[dict]]) -> list[str]: - from api.db.services.file_service import FileService - if not files: - return [] - def image_to_base64(file): - return "data:{};base64,{}".format(file["mime_type"], - base64.b64encode(FileService.get_blob(file["created_by"], file["id"])).decode("utf-8")) - exe = ThreadPoolExecutor(max_workers=5) - threads = [] - for file in files: - if file["mime_type"].find("image") >=0: - threads.append(exe.submit(image_to_base64, file)) - continue - threads.append(exe.submit(FileService.parse, file["name"], FileService.get_blob(file["created_by"], file["id"]), True, file["created_by"])) - return [th.result() for th in threads] - def tool_use_callback(self, agent_id: str, func_name: str, params: dict, result: Any, elapsed_time=None): agent_ids = agent_id.split("-->") agent_name = self.get_component_name(agent_ids[0]) diff --git a/agent/component/begin.py b/agent/component/begin.py index b5985bb7a..1314aff74 100644 --- a/agent/component/begin.py +++ b/agent/component/begin.py @@ -14,6 +14,7 @@ # limitations under the License. 
# from agent.component.fillup import UserFillUpParam, UserFillUp +from api.db.services.file_service import FileService class BeginParam(UserFillUpParam): @@ -48,7 +49,7 @@ class Begin(UserFillUp): if v.get("optional") and v.get("value", None) is None: v = None else: - v = self._canvas.get_files([v["value"]]) + v = FileService.get_files([v["value"]]) else: v = v.get("value") self.set_output(k, v) diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index 86ffaedb1..afdb3269b 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -15,13 +15,10 @@ # import json import logging -import re -import sys from functools import partial -import trio from quart import request, Response, make_response from agent.component import LLM -from api.db import CanvasCategory, FileType +from api.db import CanvasCategory from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService, API4ConversationService from api.db.services.document_service import DocumentService from api.db.services.file_service import FileService @@ -38,7 +35,6 @@ from peewee import MySQLDatabase, PostgresqlDatabase from api.db.db_models import APIToken, Task import time -from api.utils.file_utils import filename_type, read_potential_broken_pdf from rag.flow.pipeline import Pipeline from rag.nlp import search from rag.utils.redis_conn import REDIS_CONN @@ -250,71 +246,10 @@ async def upload(canvas_id): return get_data_error_result(message="canvas not found.") user_id = cvs["user_id"] - def structured(filename, filetype, blob, content_type): - nonlocal user_id - if filetype == FileType.PDF.value: - blob = read_potential_broken_pdf(blob) - - location = get_uuid() - FileService.put_blob(user_id, location, blob) - - return { - "id": location, - "name": filename, - "size": sys.getsizeof(blob), - "extension": filename.split(".")[-1].lower(), - "mime_type": content_type, - "created_by": user_id, - "created_at": time.time(), - "preview_url": None - } - - if request.args.get("url"): - from crawl4ai import ( - AsyncWebCrawler, - BrowserConfig, - CrawlerRunConfig, - DefaultMarkdownGenerator, - PruningContentFilter, - CrawlResult - ) - try: - url = request.args.get("url") - filename = re.sub(r"\?.*", "", url.split("/")[-1]) - async def adownload(): - browser_config = BrowserConfig( - headless=True, - verbose=False, - ) - async with AsyncWebCrawler(config=browser_config) as crawler: - crawler_config = CrawlerRunConfig( - markdown_generator=DefaultMarkdownGenerator( - content_filter=PruningContentFilter() - ), - pdf=True, - screenshot=False - ) - result: CrawlResult = await crawler.arun( - url=url, - config=crawler_config - ) - return result - page = trio.run(adownload()) - if page.pdf: - if filename.split(".")[-1].lower() != "pdf": - filename += ".pdf" - return get_json_result(data=structured(filename, "pdf", page.pdf, page.response_headers["content-type"])) - - return get_json_result(data=structured(filename, "html", str(page.markdown).encode("utf-8"), page.response_headers["content-type"], user_id)) - - except Exception as e: - return server_error_response(e) - files = await request.files - file = files['file'] + file = files['file'] if files and files.get("file") else None try: - DocumentService.check_doc_health(user_id, file.filename) - return get_json_result(data=structured(file.filename, filename_type(file.filename), file.read(), file.content_type)) + return get_json_result(data=FileService.upload_info(user_id, file, request.args.get("url"))) except Exception as e: return server_error_response(e) diff 
--git a/api/apps/document_app.py b/api/apps/document_app.py index 7ec8c1587..bd2262919 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -607,7 +607,7 @@ async def get_image(image_id): @login_required @validate_request("conversation_id") async def upload_and_parse(): - files = await request.file + files = await request.files if "file" not in files: return get_json_result(data=False, message="No file part!", code=RetCode.ARGUMENT_ERROR) @@ -705,3 +705,12 @@ async def set_meta(): return get_json_result(data=True) except Exception as e: return server_error_response(e) + +@manager.route("/upload_info", methods=["POST"]) # noqa: F821 +async def upload_info(): + files = await request.files + file = files['file'] if files and files.get("file") else None + try: + return get_json_result(data=FileService.upload_info(current_user.id, file, request.args.get("url"))) + except Exception as e: + return server_error_response(e) diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index 558ba1b0f..ae79b45a6 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -25,6 +25,7 @@ import trio from langfuse import Langfuse from peewee import fn from agentic_reasoning import DeepResearcher +from api.db.services.file_service import FileService from common.constants import LLMType, ParserType, StatusEnum from api.db.db_models import DB, Dialog from api.db.services.common_service import CommonService @@ -380,8 +381,11 @@ def chat(dialog, messages, stream=True, **kwargs): retriever = settings.retriever questions = [m["content"] for m in messages if m["role"] == "user"][-3:] attachments = kwargs["doc_ids"].split(",") if "doc_ids" in kwargs else [] + attachments_= "" if "doc_ids" in messages[-1]: attachments = messages[-1]["doc_ids"] + if "files" in messages[-1]: + attachments_ = "\n\n".join(FileService.get_files(messages[-1]["files"])) prompt_config = dialog.prompt_config field_map = KnowledgebaseService.get_field_map(dialog.kb_ids) @@ -451,7 +455,7 @@ def chat(dialog, messages, stream=True, **kwargs): ), ) - for think in reasoner.thinking(kbinfos, " ".join(questions)): + for think in reasoner.thinking(kbinfos, attachments_ + " ".join(questions)): if isinstance(think, str): thought = think knowledges = [t for t in think.split("\n") if t] @@ -503,7 +507,7 @@ def chat(dialog, messages, stream=True, **kwargs): kwargs["knowledge"] = "\n------\n" + "\n\n------\n\n".join(knowledges) gen_conf = dialog.llm_setting - msg = [{"role": "system", "content": prompt_config["system"].format(**kwargs)}] + msg = [{"role": "system", "content": prompt_config["system"].format(**kwargs)+attachments_}] prompt4citation = "" if knowledges and (prompt_config.get("quote", True) and kwargs.get("quote", True)): prompt4citation = citation_prompt() diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py index 1fbecdafe..11ef5b454 100644 --- a/api/db/services/file_service.py +++ b/api/db/services/file_service.py @@ -13,10 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import asyncio +import base64 import logging import re +import sys +import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path +from typing import Union from peewee import fn
@@ -520,7 +525,7 @@ class FileService(CommonService): if img_base64 and file_type == FileType.VISUAL.value: return GptV4.image2base64(blob) cks = FACTORY.get(FileService.get_parser(filename_type(filename), filename, ""), naive).chunk(filename, blob, **kwargs)
- return "\n".join([ck["content_with_weight"] for ck in cks]) + return f"\n -----------------\nFile: {filename}\nContent as follows: \n" + "\n".join([ck["content_with_weight"] for ck in cks]) @staticmethod def get_parser(doc_type, filename, default):
@@ -588,3 +593,80 @@ class FileService(CommonService): errors += str(e) return errors + + @staticmethod + def upload_info(user_id, file, url: str|None=None): + def structured(filename, filetype, blob, content_type): + nonlocal user_id + if filetype == FileType.PDF.value: + blob = read_potential_broken_pdf(blob) + + location = get_uuid() + FileService.put_blob(user_id, location, blob) + + return { + "id": location, + "name": filename, + "size": sys.getsizeof(blob), + "extension": filename.split(".")[-1].lower(), + "mime_type": content_type, + "created_by": user_id, + "created_at": time.time(), + "preview_url": None + }
+ + if url: + from crawl4ai import ( + AsyncWebCrawler, + BrowserConfig, + CrawlerRunConfig, + DefaultMarkdownGenerator, + PruningContentFilter, + CrawlResult + ) + filename = re.sub(r"\?.*", "", url.split("/")[-1]) + async def adownload(): + browser_config = BrowserConfig( + headless=True, + verbose=False, + ) + async with AsyncWebCrawler(config=browser_config) as crawler: + crawler_config = CrawlerRunConfig( + markdown_generator=DefaultMarkdownGenerator( + content_filter=PruningContentFilter() + ), + pdf=True, + screenshot=False + ) + result: CrawlResult = await crawler.arun( + url=url, + config=crawler_config + ) + return result + page = asyncio.run(adownload()) + if page.pdf: + if filename.split(".")[-1].lower() != "pdf": + filename += ".pdf" + return structured(filename, "pdf", page.pdf, page.response_headers["content-type"]) + + return structured(filename, "html", str(page.markdown).encode("utf-8"), page.response_headers["content-type"])
+ + DocumentService.check_doc_health(user_id, file.filename) + return structured(file.filename, filename_type(file.filename), file.read(), file.content_type) + + @staticmethod + def get_files(files: Union[None, list[dict]]) -> list[str]: + if not files: + return [] + def image_to_base64(file): + return "data:{};base64,{}".format(file["mime_type"], + base64.b64encode(FileService.get_blob(file["created_by"], file["id"])).decode("utf-8")) + exe = ThreadPoolExecutor(max_workers=5) + threads = [] + for file in files: + if file["mime_type"].find("image") >=0: + threads.append(exe.submit(image_to_base64, file)) + continue + threads.append(exe.submit(FileService.parse, file["name"], FileService.get_blob(file["created_by"], file["id"]), True, file["created_by"])) + return [th.result() for th in threads] +
diff --git a/rag/app/naive.py b/rag/app/naive.py index 7872ebc22..18a956beb 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -39,6 +39,7 @@ from deepdoc.parser.docling_parser import DoclingParser from deepdoc.parser.tcadp_parser import TCADPParser from rag.nlp import concat_img, find_codec, naive_merge, naive_merge_with_images, naive_merge_docx, rag_tokenizer, tokenize_chunks, tokenize_chunks_with_images,
tokenize_table, attach_media_context + def by_deepdoc(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", callback=None, pdf_cls = None ,**kwargs): callback = callback binary = binary @@ -600,8 +601,7 @@ def load_from_xml_v2(baseURI, rels_item_xml): srels._srels.append(_SerializedRelationship(baseURI, rel_elm)) return srels -def chunk(filename, binary=None, from_page=0, to_page=100000, - lang="Chinese", callback=None, **kwargs): +def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", callback=None, **kwargs): """ Supported file formats are docx, pdf, excel, txt. This method apply the naive ways to chunk files. @@ -611,14 +611,18 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, urls = set() url_res = [] - is_english = lang.lower() == "english" # is_english(cks) parser_config = kwargs.get( "parser_config", { "chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True}) + + child_deli = re.findall(r"`([^`]+)`", parser_config.get("children_delimiter", "")) + child_deli = sorted(set(child_deli), key=lambda x: -len(x)) + child_deli = "|".join(re.escape(t) for t in child_deli if t) + is_markdown = False table_context_size = max(0, int(parser_config.get("table_context_size", 0) or 0)) image_context_size = max(0, int(parser_config.get("image_context_size", 0) or 0)) - final_sections = False + doc = { "docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename)) @@ -679,12 +683,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, "chunk_token_num", 128)), parser_config.get( "delimiter", "\n!?。;!?")) - if kwargs.get("section_only", False): - chunks.extend(embed_res) - chunks.extend(url_res) - return chunks - - res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images)) + res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images, child_delimiters_pattern=child_deli)) logging.info("naive_merge({}): {}".format(filename, timer() - st)) res.extend(embed_res) res.extend(url_res) @@ -780,7 +779,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, return_section_images=True, ) - final_sections = True + is_markdown = True try: vision_model = LLMBundle(kwargs["tenant_id"], LLMType.IMAGE2TEXT) @@ -857,7 +856,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, "file type not supported yet(pdf, xlsx, doc, docx, txt supported)") st = timer() - if final_sections: + if is_markdown: merged_chunks = [] merged_images = [] chunk_limit = max(0, int(parser_config.get("chunk_token_num", 128))) @@ -900,13 +899,11 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, chunks = merged_chunks has_images = merged_images and any(img is not None for img in merged_images) - if kwargs.get("section_only", False): - chunks.extend(embed_res) - return chunks + if has_images: - res.extend(tokenize_chunks_with_images(chunks, doc, is_english, merged_images)) + res.extend(tokenize_chunks_with_images(chunks, doc, is_english, merged_images, child_delimiters_pattern=child_deli)) else: - res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser)) + res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser, child_delimiters_pattern=child_deli)) else: if section_images: if all(image is None for image in section_images): @@ -917,21 +914,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, int(parser_config.get( "chunk_token_num", 128)), parser_config.get( "delimiter", "\n!?。;!?")) - if 
kwargs.get("section_only", False): - chunks.extend(embed_res) - return chunks - - res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images)) + res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images, child_delimiters_pattern=child_deli)) else: chunks = naive_merge( sections, int(parser_config.get( "chunk_token_num", 128)), parser_config.get( "delimiter", "\n!?。;!?")) - if kwargs.get("section_only", False): - chunks.extend(embed_res) - return chunks - res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser)) + res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser, child_delimiters_pattern=child_deli)) if urls and parser_config.get("analyze_hyperlink", False) and is_root: for index, url in enumerate(urls): diff --git a/rag/flow/splitter/splitter.py b/rag/flow/splitter/splitter.py index 7e687ad71..c790790cb 100644 --- a/rag/flow/splitter/splitter.py +++ b/rag/flow/splitter/splitter.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import random +import re +from copy import deepcopy from functools import partial - import trio - from common.misc_utils import get_uuid from rag.utils.base64_image import id2image, image2id from deepdoc.parser.pdf_parser import RAGFlowPdfParser @@ -32,6 +32,7 @@ class SplitterParam(ProcessParamBase): self.chunk_token_size = 512 self.delimiters = ["\n"] self.overlapped_percent = 0 + self.children_delimiters = [] def check(self): self.check_empty(self.delimiters, "Delimiters.") @@ -58,6 +59,14 @@ class Splitter(ProcessBase): deli += f"`{d}`" else: deli += d + child_deli = "" + for d in self._param.children_delimiters: + if len(d) > 1: + child_deli += f"`{d}`" + else: + child_deli += d + child_deli = [m.group(1) for m in re.finditer(r"`([^`]+)`", child_deli)] + custom_pattern = "|".join(re.escape(t) for t in sorted(set(child_deli), key=len, reverse=True)) self.set_output("output_format", "chunks") self.callback(random.randint(1, 5) / 100.0, "Start to split into chunks.") @@ -78,7 +87,23 @@ class Splitter(ProcessBase): deli, self._param.overlapped_percent, ) - self.set_output("chunks", [{"text": c.strip()} for c in cks if c.strip()]) + if custom_pattern: + docs = [] + for c in cks: + if not c.strip(): + continue + split_sec = re.split(r"(%s)" % custom_pattern, c, flags=re.DOTALL) + if split_sec: + for txt in split_sec: + docs.append({ + "text": txt, + "mom": c + }) + else: + docs.append({"text": c}) + self.set_output("chunks", docs) + else: + self.set_output("chunks", [{"text": c.strip()} for c in cks if c.strip()]) self.callback(1, "Done.") return @@ -100,12 +125,27 @@ class Splitter(ProcessBase): { "text": RAGFlowPdfParser.remove_tag(c), "image": img, - "positions": [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)], + "positions": [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)] } for c, img in zip(chunks, images) if c.strip() ] async with trio.open_nursery() as nursery: for d in cks: nursery.start_soon(image2id, d, partial(settings.STORAGE_IMPL.put, tenant_id=self._canvas._tenant_id), get_uuid()) - self.set_output("chunks", cks) + + if custom_pattern: + docs = [] + for c in cks: + split_sec = re.split(r"(%s)" % custom_pattern, c["text"], flags=re.DOTALL) + if split_sec: + c["mom"] = c["text"] + for txt in split_sec: + cc = deepcopy(c) + cc["text"] = txt + docs.append(cc) + else: + docs.append(c) + self.set_output("chunks", docs) + else: + self.set_output("chunks", cks) self.callback(1, "Done.") 
diff --git a/rag/nlp/__init__.py b/rag/nlp/__init__.py index 6f36a927a..ae932be85 100644 --- a/rag/nlp/__init__.py +++ b/rag/nlp/__init__.py @@ -264,14 +264,14 @@ def is_chinese(text): return False
-def tokenize(d, t, eng): - d["content_with_weight"] = t - t = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>", " ", t) +def tokenize(d, txt, eng): + d["content_with_weight"] = txt + t = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>", " ", txt) d["content_ltks"] = rag_tokenizer.tokenize(t) d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
-def tokenize_chunks(chunks, doc, eng, pdf_parser=None): +def tokenize_chunks(chunks, doc, eng, pdf_parser=None, child_delimiters_pattern=None): res = [] # wrap up as es documents for ii, ck in enumerate(chunks): @@ -288,12 +288,21 @@ def tokenize_chunks(chunks, doc, eng, pdf_parser=None): pass else: add_positions(d, [[ii]*5]) + + if child_delimiters_pattern: + d["mom_with_weight"] = ck + for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL): + dd = copy.deepcopy(d) + tokenize(dd, txt, eng) + res.append(dd) + continue + tokenize(d, ck, eng) res.append(d) return res
-def tokenize_chunks_with_images(chunks, doc, eng, images): +def tokenize_chunks_with_images(chunks, doc, eng, images, child_delimiters_pattern=None): res = [] # wrap up as es documents for ii, (ck, image) in enumerate(zip(chunks, images)): @@ -303,6 +312,13 @@ d = copy.deepcopy(doc) d["image"] = image add_positions(d, [[ii]*5]) + if child_delimiters_pattern: + d["mom_with_weight"] = ck + for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL): + dd = copy.deepcopy(d) + tokenize(dd, txt, eng) + res.append(dd) + continue tokenize(d, ck, eng) res.append(d) return res
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 370bd2a10..d7cbced0c 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -128,9 +128,6 @@ def signal_handler(sig, frame): sys.exit(0) - - - def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."): try: if prog is not None and prog < 0: @@ -720,6 +717,34 @@ async def delete_image(kb_id, chunk_id): async def insert_es(task_id, task_tenant_id, task_dataset_id, chunks, progress_callback):
+ mothers = [] + mother_ids = set([]) + for ck in chunks: + mom = ck.get("mom") or ck.get("mom_with_weight") or "" + if not mom: + continue + id = xxhash.xxh64(mom.encode("utf-8")).hexdigest() + ck["mom_id"] = id + if id in mother_ids: + continue + mother_ids.add(id) + mom_ck = copy.deepcopy(ck) + mom_ck["id"] = id + mom_ck["content_with_weight"] = mom + mom_ck["available_int"] = 0 + flds = list(mom_ck.keys()) + for fld in flds: + if fld not in ["id", "content_with_weight", "doc_id", "kb_id", "available_int"]: + del mom_ck[fld] + mothers.append(mom_ck)
+ + for b in range(0, len(mothers), settings.DOC_BULK_SIZE): + await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(mothers[b:b + settings.DOC_BULK_SIZE], search.index_name(task_tenant_id), task_dataset_id)) + task_canceled = has_canceled(task_id) + if task_canceled: + progress_callback(-1, msg="Task has been canceled.") + return False + for b in range(0, len(chunks), settings.DOC_BULK_SIZE): doc_store_result = await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(chunks[b:b + settings.DOC_BULK_SIZE], search.index_name(task_tenant_id), task_dataset_id)) task_canceled = has_canceled(task_id)
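Reviewer note on the insert_es() changes: parent ("mother") records are keyed by an xxhash64 digest of the parent text, so a parent shared by many children is stored only once, every child carries mom_id, and the parent row is written with available_int = 0 so it stays out of normal recall while remaining fetchable by id. Below is a minimal standalone sketch of that bookkeeping, assuming the xxhash dependency the executor already imports; build_parent_records is illustrative and not part of the patch.

```python
import xxhash

def build_parent_records(chunks):
    # Derive a stable parent id from the parent text, attach it to every child,
    # and emit one disabled (available_int=0) parent record per unique parent.
    mothers, seen = [], set()
    for ck in chunks:
        mom = ck.get("mom") or ck.get("mom_with_weight") or ""
        if not mom:
            continue
        mom_id = xxhash.xxh64(mom.encode("utf-8")).hexdigest()
        ck["mom_id"] = mom_id          # every child points at its parent
        if mom_id in seen:             # the parent itself is stored only once
            continue
        seen.add(mom_id)
        mothers.append({
            "id": mom_id,
            "content_with_weight": mom,
            "doc_id": ck.get("doc_id"),
            "kb_id": ck.get("kb_id"),
            "available_int": 0,        # hidden from normal retrieval
        })
    return mothers

chunks = [
    {"text": "child A", "mom": "parent text", "doc_id": "d1", "kb_id": "k1"},
    {"text": "child B", "mom": "parent text", "doc_id": "d1", "kb_id": "k1"},
]
print(build_parent_records(chunks))
```

Hashing the parent text rather than minting a random id keeps the parent id deterministic, which is what makes the de-duplication across children (and across re-runs) work.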