Add resume parser and fix bugs (#59)

* Update .gitignore

* Update .gitignore

* Add resume parser and fix bugs
This commit is contained in:
KevinHuSh
2024-02-07 19:27:23 +08:00
committed by GitHub
parent eb8254e688
commit c5ea37cd30
16 changed files with 451 additions and 57 deletions

View File

@ -3,7 +3,6 @@ import re
from collections import Counter
from api.db import ParserType
from rag.cv.ppdetection import PPDet
from rag.parser import tokenize
from rag.nlp import huqie
from rag.parser.pdf_parser import HuParser

102
rag/app/resume.py Normal file
View File

@ -0,0 +1,102 @@
import copy
import json
import os
import re
import requests
from api.db.services.knowledgebase_service import KnowledgebaseService
from rag.nlp import huqie
from rag.settings import cron_logger
from rag.utils import rmSpace
def chunk(filename, binary=None, callback=None, **kwargs):
if not re.search(r"\.(pdf|doc|docx|txt)$", filename, flags=re.IGNORECASE): raise NotImplementedError("file type not supported yet(pdf supported)")
url = os.environ.get("INFINIFLOW_SERVER")
if not url:raise EnvironmentError("Please set environment variable: 'INFINIFLOW_SERVER'")
token = os.environ.get("INFINIFLOW_TOKEN")
if not token:raise EnvironmentError("Please set environment variable: 'INFINIFLOW_TOKEN'")
if not binary:
with open(filename, "rb") as f: binary = f.read()
def remote_call():
nonlocal filename, binary
for _ in range(3):
try:
res = requests.post(url + "/v1/layout/resume/", files=[(filename, binary)],
headers={"Authorization": token}, timeout=180)
res = res.json()
if res["retcode"] != 0: raise RuntimeError(res["retmsg"])
return res["data"]
except RuntimeError as e:
raise e
except Exception as e:
cron_logger.error("resume parsing:" + str(e))
resume = remote_call()
print(json.dumps(resume, ensure_ascii=False, indent=2))
field_map = {
"name_kwd": "姓名/名字",
"gender_kwd": "性别(男,女)",
"age_int": "年龄/岁/年纪",
"phone_kwd": "电话/手机/微信",
"email_tks": "email/e-mail/邮箱",
"position_name_tks": "职位/职能/岗位/职责",
"expect_position_name_tks": "期望职位/期望职能/期望岗位",
"hightest_degree_kwd": "最高学历高中职高硕士本科博士初中中技中专专科专升本MPAMBAEMBA",
"first_degree_kwd": "第一学历高中职高硕士本科博士初中中技中专专科专升本MPAMBAEMBA",
"first_major_tks": "第一学历专业",
"first_school_name_tks": "第一学历毕业学校",
"edu_first_fea_kwd": "第一学历标签211留学双一流985海外知名重点大学中专专升本专科本科大专",
"degree_kwd": "过往学历高中职高硕士本科博士初中中技中专专科专升本MPAMBAEMBA",
"major_tks": "学过的专业/过往专业",
"school_name_tks": "学校/毕业院校",
"sch_rank_kwd": "学校标签(顶尖学校,精英学校,优质学校,一般学校)",
"edu_fea_kwd": "教育标签211留学双一流985海外知名重点大学中专专升本专科本科大专",
"work_exp_flt": "工作年限/工作年份/N年经验/毕业了多少年",
"birth_dt": "生日/出生年份",
"corp_nm_tks": "就职过的公司/之前的公司/上过班的公司",
"corporation_name_tks": "最近就职(上班)的公司/上一家公司",
"edu_end_int": "毕业年份",
"expect_city_names_tks": "期望城市",
"industry_name_tks": "所在行业"
}
titles = []
for n in ["name_kwd", "gender_kwd", "position_name_tks", "age_int"]:
v = resume.get(n, "")
if isinstance(v, list):v = v[0]
if n.find("tks") > 0: v = rmSpace(v)
titles.append(str(v))
doc = {
"docnm_kwd": filename,
"title_tks": huqie.qie("-".join(titles)+"-简历")
}
doc["title_sm_tks"] = huqie.qieqie(doc["title_tks"])
pairs = []
for n,m in field_map.items():
if not resume.get(n):continue
v = resume[n]
if isinstance(v, list):v = " ".join(v)
if n.find("tks") > 0: v = rmSpace(v)
pairs.append((m, str(v)))
doc["content_with_weight"] = "\n".join(["{}: {}".format(re.sub(r"[^]+", "", k), v) for k,v in pairs])
doc["content_ltks"] = huqie.qie(doc["content_with_weight"])
doc["content_sm_ltks"] = huqie.qieqie(doc["content_ltks"])
for n, _ in field_map.items(): doc[n] = resume[n]
print(doc)
KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": field_map})
return [doc]
if __name__ == "__main__":
import sys
def dummy(a, b):
pass
chunk(sys.argv[1], callback=dummy)

View File

@ -1,13 +1,13 @@
import copy
import random
import re
from io import BytesIO
from xpinyin import Pinyin
import numpy as np
import pandas as pd
from nltk import word_tokenize
from openpyxl import load_workbook
from dateutil.parser import parse as datetime_parse
from api.db.services.knowledgebase_service import KnowledgebaseService
from rag.parser import is_english, tokenize
from rag.nlp import huqie, stemmer
@ -27,18 +27,19 @@ class Excel(object):
ws = wb[sheetname]
rows = list(ws.rows)
headers = [cell.value for cell in rows[0]]
missed = set([i for i,h in enumerate(headers) if h is None])
headers = [cell.value for i,cell in enumerate(rows[0]) if i not in missed]
missed = set([i for i, h in enumerate(headers) if h is None])
headers = [cell.value for i, cell in enumerate(rows[0]) if i not in missed]
data = []
for i, r in enumerate(rows[1:]):
row = [cell.value for ii,cell in enumerate(r) if ii not in missed]
row = [cell.value for ii, cell in enumerate(r) if ii not in missed]
if len(row) != len(headers):
fails.append(str(i))
continue
data.append(row)
done += 1
if done % 999 == 0:
callback(done * 0.6/total, ("Extract records: {}".format(len(res)) + (f"{len(fails)} failure({sheetname}), line: %s..."%(",".join(fails[:3])) if fails else "")))
callback(done * 0.6 / total, ("Extract records: {}".format(len(res)) + (
f"{len(fails)} failure({sheetname}), line: %s..." % (",".join(fails[:3])) if fails else "")))
res.append(pd.DataFrame(np.array(data), columns=headers))
callback(0.6, ("Extract records: {}. ".format(done) + (
@ -61,9 +62,10 @@ def trans_bool(s):
def column_data_type(arr):
uni = len(set([a for a in arr if a is not None]))
counts = {"int": 0, "float": 0, "text": 0, "datetime": 0, "bool": 0}
trans = {t:f for f,t in [(int, "int"), (float, "float"), (trans_datatime, "datetime"), (trans_bool, "bool"), (str, "text")]}
trans = {t: f for f, t in
[(int, "int"), (float, "float"), (trans_datatime, "datetime"), (trans_bool, "bool"), (str, "text")]}
for a in arr:
if a is None:continue
if a is None: continue
if re.match(r"[+-]?[0-9]+(\.0+)?$", str(a).replace("%%", "")):
counts["int"] += 1
elif re.match(r"[+-]?[0-9.]+$", str(a).replace("%%", "")):
@ -72,17 +74,18 @@ def column_data_type(arr):
counts["bool"] += 1
elif trans_datatime(str(a)):
counts["datetime"] += 1
else: counts["text"] += 1
counts = sorted(counts.items(), key=lambda x: x[1]*-1)
else:
counts["text"] += 1
counts = sorted(counts.items(), key=lambda x: x[1] * -1)
ty = counts[0][0]
for i in range(len(arr)):
if arr[i] is None:continue
if arr[i] is None: continue
try:
arr[i] = trans[ty](str(arr[i]))
except Exception as e:
arr[i] = None
if ty == "text":
if len(arr) > 128 and uni/len(arr) < 0.1:
if len(arr) > 128 and uni / len(arr) < 0.1:
ty = "keyword"
return arr, ty
@ -123,48 +126,51 @@ def chunk(filename, binary=None, callback=None, **kwargs):
dfs = [pd.DataFrame(np.array(rows), columns=headers)]
else: raise NotImplementedError("file type not supported yet(excel, text, csv supported)")
else:
raise NotImplementedError("file type not supported yet(excel, text, csv supported)")
res = []
PY = Pinyin()
fieds_map = {"text": "_tks", "int": "_int", "keyword": "_kwd", "float": "_flt", "datetime": "_dt", "bool": "_kwd"}
for df in dfs:
for n in ["id", "_id", "index", "idx"]:
if n in df.columns:del df[n]
if n in df.columns: del df[n]
clmns = df.columns.values
txts = list(copy.deepcopy(clmns))
py_clmns = [PY.get_pinyins(n)[0].replace("-", "_") for n in clmns]
clmn_tys = []
for j in range(len(clmns)):
cln,ty = column_data_type(df[clmns[j]])
cln, ty = column_data_type(df[clmns[j]])
clmn_tys.append(ty)
df[clmns[j]] = cln
if ty == "text": txts.extend([str(c) for c in cln if c])
clmns_map = [(py_clmns[j] + fieds_map[clmn_tys[j]], clmns[j]) for i in range(len(clmns))]
# TODO: set this column map to KB parser configuration
eng = is_english(txts)
for ii,row in df.iterrows():
for ii, row in df.iterrows():
d = {}
row_txt = []
for j in range(len(clmns)):
if row[clmns[j]] is None:continue
if row[clmns[j]] is None: continue
fld = clmns_map[j][0]
d[fld] = row[clmns[j]] if clmn_tys[j] != "text" else huqie.qie(row[clmns[j]])
row_txt.append("{}:{}".format(clmns[j], row[clmns[j]]))
if not row_txt:continue
if not row_txt: continue
tokenize(d, "; ".join(row_txt), eng)
print(d)
res.append(d)
KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": {k: v for k, v in clmns_map}})
callback(0.6, "")
return res
if __name__== "__main__":
if __name__ == "__main__":
import sys
def dummy(a, b):
pass
chunk(sys.argv[1], callback=dummy)
chunk(sys.argv[1], callback=dummy)

View File

@ -74,7 +74,9 @@ class Dealer:
s = s.highlight("title_ltks")
if not qst:
s = s.sort(
{"create_time": {"order": "desc", "unmapped_type": "date"}})
{"create_time": {"order": "desc", "unmapped_type": "date"}},
{"create_timestamp_flt": {"order": "desc", "unmapped_type": "float"}}
)
if qst:
s = s.highlight_options(
@ -298,3 +300,22 @@ class Dealer:
ranks["doc_aggs"][dnm] += 1
return ranks
def sql_retrieval(self, sql, fetch_size=128):
sql = re.sub(r"[ ]+", " ", sql)
replaces = []
for r in re.finditer(r" ([a-z_]+_l?tks like |[a-z_]+_l?tks ?= ?)'([^']+)'", sql):
fld, v = r.group(1), r.group(2)
fld = re.sub(r" ?(like|=)$", "", fld).lower()
if v[0] == "%%": v = v[1:-1]
match = " MATCH({}, '{}', 'operator=OR;fuzziness=AUTO:1,3;minimum_should_match=30%') ".format(fld, huqie.qie(v))
replaces.append((r.group(1)+r.group(2), match))
for p, r in replaces: sql.replace(p, r)
try:
tbl = self.es.sql(sql, fetch_size)
return tbl
except Exception as e:
es_logger(f"SQL failure: {sql} =>" + str(e))

127
rag/nlp/surname.py Normal file
View File

@ -0,0 +1,127 @@
#-*- coding: utf-8 -*-
m = set(["","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","羿","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","宿","","怀",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","寿","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"广","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","",
"","","","西","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","鹿","",
"万俟","司马","上官","欧阳",
"夏侯","诸葛","闻人","东方",
"赫连","皇甫","尉迟","公羊",
"澹台","公冶","宗政","濮阳",
"淳于","单于","太叔","申屠",
"公孙","仲孙","轩辕","令狐",
"钟离","宇文","长孙","慕容",
"鲜于","闾丘","司徒","司空",
"亓官","司寇","仉督","子车",
"颛孙","端木","巫马","公西",
"漆雕","乐正","壤驷","公良",
"拓跋","夹谷","宰父","榖梁",
"","","","","","","","",
"段干","百里","东郭","南门",
"呼延","","","羊舌","","",
"","","","","","","","",
"梁丘","左丘","东门","西门",
"","","","","","","南宫",
"","","","","","","","",
"第五","",""])
def isit(n):return n.strip() in m

View File

@ -81,11 +81,13 @@ def dispatch():
tsks = []
if r["type"] == FileType.PDF.value:
pages = HuParser.total_page_number(r["name"], MINIO.get(r["kb_id"], r["location"]))
for p in range(0, pages, 10):
task = new_task()
task["from_page"] = p
task["to_page"] = min(p + 10, pages)
tsks.append(task)
for s,e in r["parser_config"].get("pages", [(0,100000)]):
e = min(e, pages)
for p in range(s, e, 10):
task = new_task()
task["from_page"] = p
task["to_page"] = min(p + 10, e)
tsks.append(task)
else:
tsks.append(new_task())
print(tsks)

View File

@ -58,7 +58,7 @@ FACTORY = {
}
def set_progress(task_id, from_page, to_page, prog=None, msg="Processing..."):
def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."):
cancel = TaskService.do_cancel(task_id)
if cancel:
msg += " [Canceled]"
@ -110,7 +110,7 @@ def collect(comm, mod, tm):
def build(row, cvmdl):
if row["size"] > DOC_MAXIMUM_SIZE:
set_progress(row["id"], -1, "File size exceeds( <= %dMb )" %
set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
(int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
return []
@ -119,7 +119,7 @@ def build(row, cvmdl):
try:
cron_logger.info("Chunkking {}/{}".format(row["location"], row["name"]))
cks = chunker.chunk(row["name"], MINIO.get(row["kb_id"], row["location"]), row["from_page"], row["to_page"],
callback)
callback, kb_id=row["kb_id"])
except Exception as e:
if re.search("(No such file|not found)", str(e)):
callback(-1, "Can not find file <%s>" % row["doc_name"])
@ -144,6 +144,7 @@ def build(row, cvmdl):
md5.update((ck["content_with_weight"] + str(d["doc_id"])).encode("utf-8"))
d["_id"] = md5.hexdigest()
d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
if not d.get("image"):
docs.append(d)
continue
@ -197,15 +198,15 @@ def main(comm, mod):
tmf = open(tm_fnm, "a+")
for _, r in rows.iterrows():
callback = partial(set_progress, r["id"], r["from_page"], r["to_page"])
try:
embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING)
cv_mdl = LLMBundle(r["tenant_id"], LLMType.IMAGE2TEXT)
# TODO: sequence2text model
except Exception as e:
set_progress(r["id"], -1, str(e))
callback(prog=-1, msg=str(e))
continue
callback = partial(set_progress, r["id"], r["from_page"], r["to_page"])
st_tm = timer()
cks = build(r, cv_mdl)
if not cks:

View File

@ -3,13 +3,14 @@ import json
import time
import copy
import elasticsearch
from elastic_transport import ConnectionTimeout
from elasticsearch import Elasticsearch
from elasticsearch_dsl import UpdateByQuery, Search, Index
from rag.settings import es_logger
from rag import settings
from rag.utils import singleton
es_logger.info("Elasticsearch version: "+ str(elasticsearch.__version__))
es_logger.info("Elasticsearch version: "+str(elasticsearch.__version__))
@singleton
@ -57,7 +58,7 @@ class HuEs:
body=d,
id=id,
doc_type="doc",
refresh=False,
refresh=True,
retry_on_conflict=100)
else:
r = self.es.update(
@ -65,7 +66,7 @@ class HuEs:
self.idxnm if not idxnm else idxnm),
body=d,
id=id,
refresh=False,
refresh=True,
retry_on_conflict=100)
es_logger.info("Successfully upsert: %s" % id)
T = True
@ -240,6 +241,18 @@ class HuEs:
es_logger.error("ES search timeout for 3 times!")
raise Exception("ES search timeout.")
def sql(self, sql, fetch_size=128, format="json", timeout=2):
for i in range(3):
try:
res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format, request_timeout=timeout)
return res
except ConnectionTimeout as e:
es_logger.error("Timeout【Q】" + sql)
continue
es_logger.error("ES search timeout for 3 times!")
raise ConnectionTimeout()
def get(self, doc_id, idxnm=None):
for i in range(3):
try:
@ -308,7 +321,8 @@ class HuEs:
try:
r = self.es.delete_by_query(
index=idxnm if idxnm else self.idxnm,
body=Search().query(query).to_dict())
refresh = True,
body=Search().query(query).to_dict())
return True
except Exception as e:
es_logger.error("ES updateByQuery deleteByQuery: " +