diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py
index f0dac16f4..c191b556c 100644
--- a/api/apps/canvas_app.py
+++ b/api/apps/canvas_app.py
@@ -440,9 +440,7 @@ def list_canvas():
         desc = False
     else:
         desc = True
-    owner_ids = request.args.get("owner_ids", [])
-    if owner_ids and isinstance(owner_ids, str):
-        owner_ids = [owner_ids]
+    owner_ids = [id for id in request.args.get("owner_ids", "").strip().split(",") if id]
     if not owner_ids:
         tenants = TenantService.get_joined_tenants_by_user_id(current_user.id)
         tenants = [m["tenant_id"] for m in tenants]
diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py
index e14dd5264..4c169aecb 100644
--- a/api/apps/kb_app.py
+++ b/api/apps/kb_app.py
@@ -28,7 +28,7 @@ from api.db.services.task_service import TaskService, GRAPH_RAPTOR_FAKE_DOC_ID
 from api.db.services.user_service import TenantService, UserTenantService
 from api.utils.api_utils import get_error_data_result, server_error_response, get_data_error_result, validate_request, not_allowed_parameters
 from api.utils import get_uuid
-from api.db import PipelineTaskType, StatusEnum, FileSource, VALID_FILE_TYPES
+from api.db import PipelineTaskType, StatusEnum, FileSource, VALID_FILE_TYPES, VALID_TASK_STATUS
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from api.db.db_models import File
 from api.utils.api_utils import get_json_result
@@ -453,7 +453,7 @@ def list_pipeline_logs():
 
     operation_status = req.get("operation_status", [])
     if operation_status:
-        invalid_status = {s for s in operation_status if s not in ["success", "failed", "running", "pending"]}
+        invalid_status = {s for s in operation_status if s not in VALID_TASK_STATUS}
         if invalid_status:
             return get_data_error_result(message=f"Invalid filter operation_status status conditions: {', '.join(invalid_status)}")
 
@@ -495,7 +495,7 @@ def list_pipeline_dataset_logs():
 
     operation_status = req.get("operation_status", [])
     if operation_status:
-        invalid_status = {s for s in operation_status if s not in ["success", "failed", "running", "pending"]}
+        invalid_status = {s for s in operation_status if s not in VALID_TASK_STATUS}
         if invalid_status:
             return get_data_error_result(message=f"Invalid filter operation_status status conditions: {', '.join(invalid_status)}")
 
diff --git a/api/db/services/pipeline_operation_log_service.py b/api/db/services/pipeline_operation_log_service.py
index 2860c50b1..7bfe56c80 100644
--- a/api/db/services/pipeline_operation_log_service.py
+++ b/api/db/services/pipeline_operation_log_service.py
@@ -176,7 +176,7 @@ class PipelineOperationLogService(CommonService):
         with DB.atomic():
             obj = cls.save(**log)
 
-            limit = int(os.getenv("PIPELINE_OPERATION_LOG_LIMIT", 1))
+            limit = int(os.getenv("PIPELINE_OPERATION_LOG_LIMIT", 1000))
 
             total = cls.model.select().where(cls.model.kb_id == document.kb_id).count()
             if total > limit:
diff --git a/deepdoc/parser/pdf_parser.py b/deepdoc/parser/pdf_parser.py
index 7cb4989cb..ecc0b30eb 100644
--- a/deepdoc/parser/pdf_parser.py
+++ b/deepdoc/parser/pdf_parser.py
@@ -1104,12 +1104,14 @@ class RAGFlowPdfParser:
             if self.boxes[min_i]["bottom"] < top+self.page_cum_height[pn]:
                 min_i += 1
             self.boxes.insert(min_i, {
-                "page_number": pn+1, "x0": left, "x1": right, "top": top+self.page_cum_height[pn], "bottom": bott+self.page_cum_height[pn], "layout_type": layout_type, "text": txt, "image": img
+                "page_number": pn+1, "x0": left, "x1": right, "top": top+self.page_cum_height[pn], "bottom": bott+self.page_cum_height[pn], "layout_type": layout_type, "text": txt, "image": img,
+                "positions": [[pn+1, int(left), int(right), int(top), int(bott)]]
             })
 
         for b in self.boxes:
             b["position_tag"] = self._line_tag(b, zoomin)
             b["image"] = self.crop(b["position_tag"], zoomin)
+            b["positions"] = [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(b["position_tag"])]
 
         insert_table_figures(tbls, "table")
         insert_table_figures(figs, "figure")
diff --git a/rag/flow/extractor/extractor.py b/rag/flow/extractor/extractor.py
index 36398ea9f..70464148a 100644
--- a/rag/flow/extractor/extractor.py
+++ b/rag/flow/extractor/extractor.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import random
+from copy import deepcopy
 
 from agent.component.llm import LLMParam, LLM
 from rag.flow.base import ProcessBase, ProcessParamBase
@@ -40,7 +41,7 @@ class Extractor(ProcessBase, LLM):
         for k, v in inputs.items():
             args[k] = v["value"]
             if isinstance(args[k], list):
-                chunks = args[k]
+                chunks = deepcopy(args[k])
                 chunks_key = k
 
         if chunks:
diff --git a/rag/flow/parser/parser.py b/rag/flow/parser/parser.py
index e0d96d5ef..b154e7410 100644
--- a/rag/flow/parser/parser.py
+++ b/rag/flow/parser/parser.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 import io
 import json
-import logging
 import os
 import random
 from functools import partial
@@ -31,6 +30,7 @@ from api.utils import get_uuid
 from api.utils.base64_image import image2id
 from deepdoc.parser import ExcelParser
 from deepdoc.parser.pdf_parser import PlainParser, RAGFlowPdfParser, VisionParser
+from rag.app.naive import Docx
 from rag.flow.base import ProcessBase, ProcessParamBase
 from rag.flow.parser.schema import ParserFromUpstream
 from rag.llm.cv_model import Base as VLM
@@ -243,19 +243,13 @@ class Parser(ProcessBase):
             self.set_output("markdown", spreadsheet_parser.markdown(blob))
 
     def _word(self, name, blob):
-        from tika import parser as word_parser
-
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word Processor Document")
         conf = self._param.setups["word"]
         self.set_output("output_format", conf["output_format"])
 
-        doc_parsed = word_parser.from_buffer(blob)
-        sections = []
-        if doc_parsed.get("content"):
-            sections = doc_parsed["content"].split("\n")
-            sections = [{"text": section} for section in sections if section]
-        else:
-            logging.warning(f"tika.parser got empty content from {name}.")
-
+        docx_parser = Docx()
+        sections, tbls = docx_parser(name, binary=blob)
+        sections = [{"text": section[0], "image": section[1]} for section in sections if section]
+        sections.extend([{"text": tb, "image": None} for ((_, tb), _) in tbls])  # json
         assert conf.get("output_format") == "json", "have to be json for doc"
         if conf.get("output_format") == "json":
diff --git a/rag/flow/splitter/splitter.py b/rag/flow/splitter/splitter.py
index c3aa9126c..9c6eb7bfd 100644
--- a/rag/flow/splitter/splitter.py
+++ b/rag/flow/splitter/splitter.py
@@ -100,7 +100,7 @@ class Splitter(ProcessBase):
                 {
                     "text": RAGFlowPdfParser.remove_tag(c),
                     "image": img,
-                    "positions": [[pos[0][-1], *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)],
+                    "positions": [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)],
                 }
                 for c, img in zip(chunks, images) if c.strip()
             ]
diff --git a/rag/llm/embedding_model.py b/rag/llm/embedding_model.py
index 132efd35d..5f7f0cf82 100644
--- a/rag/llm/embedding_model.py
+++ b/rag/llm/embedding_model.py
@@ -756,7 +756,7 @@ class SILICONFLOWEmbed(Base):
             texts_batch = texts[i : i + batch_size]
             if self.model_name in ["BAAI/bge-large-zh-v1.5", "BAAI/bge-large-en-v1.5"]:
                 # limit 512, 340 is almost safe
-                texts_batch = [" " if not text.strip() else truncate(text, 340) for text in texts_batch]
+                texts_batch = [" " if not text.strip() else truncate(text, 256) for text in texts_batch]
             else:
                 texts_batch = [" " if not text.strip() else text for text in texts_batch]
 
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index ac3a709c1..996b6b94b 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -503,9 +503,9 @@ async def run_dataflow(task: dict):
     embedding_token_consumption = chunks.get("embedding_token_consumption", 0)
 
     if chunks.get("chunks"):
-        chunks = chunks["chunks"]
+        chunks = copy.deepcopy(chunks["chunks"])
     elif chunks.get("json"):
-        chunks = chunks["json"]
+        chunks = copy.deepcopy(chunks["json"])
     elif chunks.get("markdown"):
         chunks = [{"text": [chunks["markdown"]]}]
     elif chunks.get("text"):
@@ -515,34 +515,40 @@ async def run_dataflow(task: dict):
 
     keys = [k for o in chunks for k in list(o.keys())]
     if not any([re.match(r"q_[0-9]+_vec", k) for k in keys]):
-        set_progress(task_id, prog=0.82, msg="\n-------------------------------------\nStart to embedding...")
-        e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
-        embedding_id = kb.embd_id
-        embedding_model = LLMBundle(task["tenant_id"], LLMType.EMBEDDING, llm_name=embedding_id)
-        @timeout(60)
-        def batch_encode(txts):
-            nonlocal embedding_model
-            return embedding_model.encode([truncate(c, embedding_model.max_length - 10) for c in txts])
-        vects = np.array([])
-        texts = [o.get("questions", o.get("summary", o["text"])) for o in chunks]
-        delta = 0.20/(len(texts)//EMBEDDING_BATCH_SIZE+1)
-        prog = 0.8
-        for i in range(0, len(texts), EMBEDDING_BATCH_SIZE):
-            async with embed_limiter:
-                vts, c = await trio.to_thread.run_sync(lambda: batch_encode(texts[i : i + EMBEDDING_BATCH_SIZE]))
-            if len(vects) == 0:
-                vects = vts
-            else:
-                vects = np.concatenate((vects, vts), axis=0)
-            embedding_token_consumption += c
-            prog += delta
-            if i % (len(texts)//EMBEDDING_BATCH_SIZE/100+1) == 1:
-                set_progress(task_id, prog=prog, msg=f"{i+1} / {len(texts)//EMBEDDING_BATCH_SIZE}")
+        try:
+            set_progress(task_id, prog=0.82, msg="\n-------------------------------------\nStart to embedding...")
+            e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
+            embedding_id = kb.embd_id
+            embedding_model = LLMBundle(task["tenant_id"], LLMType.EMBEDDING, llm_name=embedding_id)
+            @timeout(60)
+            def batch_encode(txts):
+                nonlocal embedding_model
+                return embedding_model.encode([truncate(c, embedding_model.max_length - 10) for c in txts])
+            vects = np.array([])
+            texts = [o.get("questions", o.get("summary", o["text"])) for o in chunks]
+            delta = 0.20/(len(texts)//EMBEDDING_BATCH_SIZE+1)
+            prog = 0.8
+            for i in range(0, len(texts), EMBEDDING_BATCH_SIZE):
+                async with embed_limiter:
+                    vts, c = await trio.to_thread.run_sync(lambda: batch_encode(texts[i : i + EMBEDDING_BATCH_SIZE]))
+                if len(vects) == 0:
+                    vects = vts
+                else:
+                    vects = np.concatenate((vects, vts), axis=0)
+                embedding_token_consumption += c
+                prog += delta
+                if i % (len(texts)//EMBEDDING_BATCH_SIZE/100+1) == 1:
+                    set_progress(task_id, prog=prog, msg=f"{i+1} / {len(texts)//EMBEDDING_BATCH_SIZE}")
+
+            assert len(vects) == len(chunks)
+            for i, ck in enumerate(chunks):
+                v = vects[i].tolist()
+                ck["q_%d_vec" % len(v)] = v
+        except Exception as e:
+            set_progress(task_id, prog=-1, msg=f"[ERROR]: {e}")
+            PipelineOperationLogService.create(document_id=doc_id, pipeline_id=dataflow_id, task_type=PipelineTaskType.PARSE, dsl=str(pipeline))
+            return
 
-        assert len(vects) == len(chunks)
-        for i, ck in enumerate(chunks):
-            v = vects[i].tolist()
-            ck["q_%d_vec" % len(v)] = v
     metadata = {}
 
     def dict_update(meta):
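
Note on the list_canvas() change: owner_ids now arrives as a comma-separated query string instead of a repeated parameter, and the comprehension drops empty fragments. A minimal standalone sketch of the parsing behaviour (the helper name is illustrative, not part of the patch):

    # Sketch only: mirrors the new owner_ids parsing; `raw` stands in for
    # request.args.get("owner_ids", "").
    def parse_owner_ids(raw: str) -> list[str]:
        # Split on commas and discard empty pieces so "" yields [].
        return [oid for oid in raw.strip().split(",") if oid]

    assert parse_owner_ids("") == []
    assert parse_owner_ids("a,b") == ["a", "b"]
    assert parse_owner_ids("a,,b,") == ["a", "b"]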
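
On the kb_app.py change: the two duplicated inline status lists are replaced by the shared VALID_TASK_STATUS constant from api.db. A hedged sketch of the filter validation, assuming the constant covers the same values the old literals did:

    # Assumption: VALID_TASK_STATUS is equivalent to the old inline list.
    VALID_TASK_STATUS = {"success", "failed", "running", "pending"}

    def invalid_statuses(operation_status):
        # Anything outside the shared constant is reported back to the caller.
        return {s for s in operation_status if s not in VALID_TASK_STATUS}

    assert invalid_statuses(["running", "done"]) == {"done"}
    assert not invalid_statuses(["success", "pending"])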
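
The +1 added in splitter.py (and mirrored in pdf_parser.py) shifts the page index stored in each chunk's positions from zero-based to one-based, in line with the one-based page_number already written on boxes. A rough illustration, assuming extract_positions yields ([page_indices...], x0, x1, top, bottom) tuples with zero-based pages; the sample values are made up:

    # Hedged sketch of the transformation applied to extract_positions() output.
    raw_positions = [([0], 36.0, 520.0, 100.0, 130.0),
                     ([1, 2], 36.0, 520.0, 700.0, 90.0)]

    # Keep the last page index of each box and shift it to one-based.
    positions = [[pos[0][-1] + 1, *pos[1:]] for pos in raw_positions]
    assert positions == [[1, 36.0, 520.0, 100.0, 130.0], [3, 36.0, 520.0, 700.0, 90.0]]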
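
The deepcopy calls in extractor.py and task_executor.py appear intended to stop later enrichment (such as adding q_*_vec fields) from mutating chunk dicts that upstream component outputs still reference. A self-contained illustration of the aliasing they avoid:

    from copy import deepcopy

    upstream_output = {"chunks": [{"text": "hello"}]}

    # Aliasing: writing through the shared list also rewrites the upstream output.
    aliased = upstream_output["chunks"]
    aliased[0]["q_4_vec"] = [0.1, 0.2, 0.3, 0.4]
    assert "q_4_vec" in upstream_output["chunks"][0]

    # deepcopy keeps the upstream payload untouched.
    upstream_output = {"chunks": [{"text": "hello"}]}
    isolated = deepcopy(upstream_output["chunks"])
    isolated[0]["q_4_vec"] = [0.1, 0.2, 0.3, 0.4]
    assert "q_4_vec" not in upstream_output["chunks"][0]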