init README of deepdoc, add picture processor. (#71)

* init README of deepdoc, add picture processor.

* add resume parsing
KevinHuSh
2024-02-23 18:28:12 +08:00
committed by GitHub
parent d32322c081
commit 7fd1eca582
42 changed files with 58319 additions and 350 deletions


@@ -21,6 +21,7 @@ import hashlib
 import copy
 import re
 import sys
+import traceback
 from functools import partial
 from timeit import default_timer as timer
@@ -36,7 +37,7 @@ from rag.nlp import search
 from io import BytesIO
 import pandas as pd
-from rag.app import laws, paper, presentation, manual, qa, table, book, resume
+from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture
 from api.db import LLMType, ParserType
 from api.db.services.document_service import DocumentService
@@ -56,47 +57,31 @@ FACTORY = {
     ParserType.QA.value: qa,
     ParserType.TABLE.value: table,
     ParserType.RESUME.value: resume,
+    ParserType.PICTURE.value: picture,
 }
 
 
-def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."):
+def set_progress(task_id, from_page=0, to_page=-1,
+                 prog=None, msg="Processing..."):
     if prog is not None and prog < 0:
         msg = "[ERROR]"+msg
     cancel = TaskService.do_cancel(task_id)
     if cancel:
         msg += " [Canceled]"
         prog = -1
-    if to_page > 0: msg = f"Page({from_page}~{to_page}): " + msg
+    if to_page > 0:
+        msg = f"Page({from_page}~{to_page}): " + msg
     d = {"progress_msg": msg}
-    if prog is not None: d["progress"] = prog
+    if prog is not None:
+        d["progress"] = prog
     try:
         TaskService.update_progress(task_id, d)
     except Exception as e:
         cron_logger.error("set_progress:({}), {}".format(task_id, str(e)))
-    if cancel:sys.exit()
-
-
-"""
-def chuck_doc(name, binary, tenant_id, cvmdl=None):
-    suff = os.path.split(name)[-1].lower().split(".")[-1]
-    if suff.find("pdf") >= 0:
-        return PDF(binary)
-    if suff.find("doc") >= 0:
-        return DOC(binary)
-    if re.match(r"(xlsx|xlsm|xltx|xltm)", suff):
-        return EXC(binary)
-    if suff.find("ppt") >= 0:
-        return PPT(binary)
-    if cvmdl and re.search(r"\.(jpg|jpeg|png|tif|gif|pcx|tga|exif|fpx|svg|psd|cdr|pcd|dxf|ufo|eps|ai|raw|WMF|webp|avif|apng|icon|ico)$",
-                           name.lower()):
-        txt = cvmdl.describe(binary)
-        field = TextChunker.Fields()
-        field.text_chunks = [(txt, binary)]
-        field.table_chunks = []
-        return field
-    return TextChunker()(binary)
-"""
+    if cancel:
+        sys.exit()
 
 
 def collect(comm, mod, tm):
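
Note on the new `ParserType.PICTURE` entry above: the commit registers the `picture` module in `FACTORY`, but its source is not part of this hunk. Below is a minimal sketch of what such a chunker could look like, assuming it reuses the per-tenant `LLMType.IMAGE2TEXT` binding that this commit removes from `main()` and the `describe()` call from the deleted `chuck_doc()`; the actual `rag/app/picture.py` may differ.

```python
# Hypothetical sketch; the real rag/app/picture.py is not shown in this diff.
from io import BytesIO

from PIL import Image  # assumed dependency for validating the image bytes

from api.db import LLMType
from api.db.services.llm_service import LLMBundle  # import path assumed


def chunk(filename, binary, tenant_id=None, callback=None, **kwargs):
    Image.open(BytesIO(binary))  # fail early if the payload is not an image
    cv_mdl = LLMBundle(tenant_id, LLMType.IMAGE2TEXT)
    txt = cv_mdl.describe(binary)  # same call the deleted chuck_doc() made
    if callback:
        callback(0.8, "Image described.")
    return [{"docnm_kwd": filename, "content_with_weight": txt}]
```
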
@@ -109,29 +94,38 @@ def collect(comm, mod, tm):
     return tasks
 
 
-def build(row, cvmdl):
+def build(row):
     if row["size"] > DOC_MAXIMUM_SIZE:
         set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
                      (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
         return []
 
-    callback = partial(set_progress, row["id"], row["from_page"], row["to_page"])
+    callback = partial(
+        set_progress,
+        row["id"],
+        row["from_page"],
+        row["to_page"])
     chunker = FACTORY[row["parser_id"].lower()]
     try:
-        cron_logger.info("Chunkking {}/{}".format(row["location"], row["name"]))
-        cks = chunker.chunk(row["name"], binary = MINIO.get(row["kb_id"], row["location"]), from_page=row["from_page"], to_page=row["to_page"],
-                            callback = callback, kb_id=row["kb_id"], parser_config=row["parser_config"])
+        cron_logger.info(
+            "Chunkking {}/{}".format(row["location"], row["name"]))
+        cks = chunker.chunk(row["name"], binary=MINIO.get(row["kb_id"], row["location"]), from_page=row["from_page"],
+                            to_page=row["to_page"], lang=row["language"], callback=callback,
+                            kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
     except Exception as e:
         if re.search("(No such file|not found)", str(e)):
             callback(-1, "Can not find file <%s>" % row["doc_name"])
         else:
-            callback(-1, f"Internal server error: %s" % str(e).replace("'", ""))
+            callback(-1, f"Internal server error: %s" %
+                     str(e).replace("'", ""))
+            traceback.print_exc()
 
-        cron_logger.warn("Chunkking {}/{}: {}".format(row["location"], row["name"], str(e)))
+        cron_logger.warn(
+            "Chunkking {}/{}: {}".format(row["location"], row["name"], str(e)))
         return
 
-    callback(msg="Finished slicing files. Start to embedding the content.")
+    callback(msg="Finished slicing files(%d). Start to embedding the content."%len(cks))
 
     docs = []
     doc = {
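
The `build()` changes above tighten the shared chunker contract: every module in `FACTORY` exposes a `chunk()` accepting the same keyword set, now including `lang` and `tenant_id`, so a new format plugs in without touching `build()`. A toy illustration of the dispatch-plus-bound-callback pattern (`toy` and `toy_chunk` are made-up names, not from the repo):

```python
# Toy illustration of the FACTORY dispatch pattern used by build().
from functools import partial
from types import SimpleNamespace


def toy_chunk(name, binary=None, from_page=0, to_page=-1,
              lang="English", callback=None, **kwargs):
    if callback:
        callback(0.5, "halfway through " + name)
    return [{"content_with_weight": binary.decode("utf-8", "ignore")}]


FACTORY = {"toy": SimpleNamespace(chunk=toy_chunk)}

# partial() pre-binds the task identity, just like build() binds
# set_progress to row["id"] / from_page / to_page.
callback = partial(print, "task-42 progress:")
cks = FACTORY["toy"].chunk("a.txt", binary=b"hello", callback=callback)
print(cks)  # [{'content_with_weight': 'hello'}]
```
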
@@ -142,7 +136,8 @@ def build(row, cvmdl):
         d = copy.deepcopy(doc)
         d.update(ck)
         md5 = hashlib.md5()
-        md5.update((ck["content_with_weight"] + str(d["doc_id"])).encode("utf-8"))
+        md5.update((ck["content_with_weight"] +
+                    str(d["doc_id"])).encode("utf-8"))
         d["_id"] = md5.hexdigest()
         d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
         d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
@@ -173,7 +168,8 @@ def init_kb(row):
 
 
 def embedding(docs, mdl, parser_config={}):
-    tts, cnts = [rmSpace(d["title_tks"]) for d in docs if d.get("title_tks")], [d["content_with_weight"] for d in docs]
+    tts, cnts = [rmSpace(d["title_tks"]) for d in docs if d.get("title_tks")], [
+        d["content_with_weight"] for d in docs]
     tk_count = 0
     if len(tts) == len(cnts):
         tts, c = mdl.encode(tts)
@@ -182,7 +178,8 @@ def embedding(docs, mdl, parser_config={}):
     cnts, c = mdl.encode(cnts)
     tk_count += c
     title_w = float(parser_config.get("filename_embd_weight", 0.1))
-    vects = (title_w * tts + (1-title_w) * cnts) if len(tts) == len(cnts) else cnts
+    vects = (title_w * tts + (1 - title_w) *
+             cnts) if len(tts) == len(cnts) else cnts
 
     assert len(vects) == len(docs)
     for i, d in enumerate(docs):
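
The reformatted expression above is the whole weighting scheme: when every document has a title embedding, the stored vector is a convex combination `title_w * tts + (1 - title_w) * cnts`, defaulting to 10% filename/title and 90% content (`filename_embd_weight = 0.1`); otherwise it falls back to content only. Numerically:

```python
# Numeric illustration of the blend; mdl.encode() is assumed to return
# row-aligned numpy arrays, as the element-wise arithmetic above implies.
import numpy as np

title_w = 0.1
tts = np.array([[1.0, 0.0], [0.0, 1.0]])   # title embeddings
cnts = np.array([[0.0, 2.0], [2.0, 0.0]])  # content embeddings

vects = title_w * tts + (1 - title_w) * cnts
print(vects)  # [[0.1 1.8]
              #  [1.8 0.1]]
```
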
@@ -192,7 +189,10 @@ def embedding(docs, mdl, parser_config={}):
 
 
 def main(comm, mod):
-    tm_fnm = os.path.join(get_project_base_directory(), "rag/res", f"{comm}-{mod}.tm")
+    tm_fnm = os.path.join(
+        get_project_base_directory(),
+        "rag/res",
+        f"{comm}-{mod}.tm")
     tm = findMaxTm(tm_fnm)
     rows = collect(comm, mod, tm)
     if len(rows) == 0:
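
`main()` keeps a per-process checkpoint file (`rag/res/<comm>-<mod>.tm`): `findMaxTm` reads the largest timestamp previously written and `collect()` only fetches tasks updated after it, while `tmf.write(...)` below appends each finished task's `update_time`. A toy version of that contract, assuming `findMaxTm` simply scans the file for the maximum value (the real helper may differ in detail):

```python
# Sketch of the checkpoint contract; assumes update_time serializes to a
# numeric value, as the diff's tmf.write(str(r["update_time"])) suggests.
def find_max_tm(fnm: str) -> float:
    try:
        with open(fnm) as f:
            return max((float(ln) for ln in f if ln.strip()), default=0.0)
    except FileNotFoundError:
        return 0.0  # first run: collect everything
```
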
@@ -203,15 +203,13 @@ def main(comm, mod):
         callback = partial(set_progress, r["id"], r["from_page"], r["to_page"])
         try:
             embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING)
-            cv_mdl = LLMBundle(r["tenant_id"], LLMType.IMAGE2TEXT)
-            # TODO: sequence2text model
         except Exception as e:
             callback(prog=-1, msg=str(e))
             continue
 
         st_tm = timer()
-        cks = build(r, cv_mdl)
-        if cks is None:continue
+        cks = build(r)
+        if cks is None:
+            continue
         if not cks:
             tmf.write(str(r["update_time"]) + "\n")
             callback(1., "No chunk! Done!")
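
Splitting the old one-liner also makes the two falsy results of `build()` easier to see: `None` means chunking raised, so the loop moves on without writing a checkpoint, while an empty list means the file parsed cleanly but yielded nothing, so the task is checkpointed and reported as done. Restated as a tiny decision helper (illustrative naming, not a function from the repo):

```python
# How main() interprets build()'s return value.
def classify(cks):
    if cks is None:   # chunker raised: no checkpoint written for this task
        return "skip"
    if not cks:       # parsed but empty: checkpoint and "No chunk! Done!"
        return "done-empty"
    return "embed-and-index"
```
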
@@ -233,11 +231,15 @@ def main(comm, mod):
             cron_logger.error(str(es_r))
         else:
             if TaskService.do_cancel(r["id"]):
-                ELASTICSEARCH.deleteByQuery(Q("match", doc_id=r["doc_id"]), idxnm=search.index_name(r["tenant_id"]))
+                ELASTICSEARCH.deleteByQuery(
+                    Q("match", doc_id=r["doc_id"]), idxnm=search.index_name(r["tenant_id"]))
                 continue
             callback(1., "Done!")
-            DocumentService.increment_chunk_num(r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
-            cron_logger.info("Chunk doc({}), token({}), chunks({})".format(r["id"], tk_count, len(cks)))
+            DocumentService.increment_chunk_num(
+                r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
+            cron_logger.info(
+                "Chunk doc({}), token({}), chunks({})".format(
+                    r["id"], tk_count, len(cks)))
 
     tmf.write(str(r["update_time"]) + "\n")
     tmf.close()