optimize srv broker and executor logic (#630)

### What problem does this PR solve?

Optimize task broker and executor for reduce memory usage and deployment
complexity.

### Type of change
- [x] Performance Improvement
- [x] Refactoring

### Change Log
- Enhance redis utils for message queue(use stream)
- Modify task broker logic via message queue (1.get parse event from
message queue 2.use ThreadPoolExecutor async executor )
- Modify the table column name of document and task (process_duation ->
process_duration maybe just a spelling mistake)
- Reformat some code style(just what i see)
- Add requirement_dev.txt for developer
- Add redis container on docker compose

---------

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
This commit is contained in:
Fakai Zhao
2024-05-07 11:43:33 +08:00
committed by GitHub
parent c6b6c748ae
commit de839fc3f0
20 changed files with 414 additions and 300 deletions

View File

@ -28,7 +28,7 @@ from functools import partial
from api.db.services.file2document_service import File2DocumentService
from rag.utils.minio_conn import MINIO
from api.db.db_models import close_connection
from rag.settings import database_logger
from rag.settings import database_logger, SVR_QUEUE_NAME
from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
from multiprocessing import Pool
import numpy as np
@ -93,20 +93,29 @@ def set_progress(task_id, from_page=0, to_page=-1,
sys.exit()
def collect(comm, mod, tm):
tasks = TaskService.get_tasks(tm, mod, comm)
#print(tasks)
if len(tasks) == 0:
time.sleep(1)
def collect():
try:
payload = REDIS_CONN.queue_consumer(SVR_QUEUE_NAME, "rag_flow_svr_task_broker", "rag_flow_svr_task_consumer")
if not payload:
time.sleep(1)
return pd.DataFrame()
except Exception as e:
cron_logger.error("Get task event from queue exception:" + str(e))
return pd.DataFrame()
msg = payload.get_message()
payload.ack()
if not msg: return pd.DataFrame()
if TaskService.do_cancel(msg["id"]):
return pd.DataFrame()
tasks = TaskService.get_tasks(msg["id"])
assert tasks, "{} empty task!".format(msg["id"])
tasks = pd.DataFrame(tasks)
mtm = tasks["update_time"].max()
cron_logger.info("TOTAL:{}, To:{}".format(len(tasks), mtm))
return tasks
def get_minio_binary(bucket, name):
global MINIO
return MINIO.get(bucket, name)
@ -122,13 +131,10 @@ def build(row):
row["from_page"],
row["to_page"])
chunker = FACTORY[row["parser_id"].lower()]
pool = Pool(processes=1)
try:
st = timer()
bucket, name = File2DocumentService.get_minio_address(doc_id=row["doc_id"])
thr = pool.apply_async(get_minio_binary, args=(bucket, name))
binary = thr.get(timeout=90)
pool.terminate()
binary = get_minio_binary(bucket, name)
cron_logger.info(
"From minio({}) {}/{}".format(timer()-st, row["location"], row["name"]))
cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
@ -147,7 +153,6 @@ def build(row):
else:
callback(-1, f"Internal server error: %s" %
str(e).replace("'", ""))
pool.terminate()
traceback.print_exc()
cron_logger.error(
@ -238,20 +243,13 @@ def embedding(docs, mdl, parser_config={}, callback=None):
return tk_count
def main(comm, mod):
tm_fnm = os.path.join(
get_project_base_directory(),
"rag/res",
f"{comm}-{mod}.tm")
tm = findMaxTm(tm_fnm)
rows = collect(comm, mod, tm)
def main():
rows = collect()
if len(rows) == 0:
return
tmf = open(tm_fnm, "a+")
for _, r in rows.iterrows():
callback = partial(set_progress, r["id"], r["from_page"], r["to_page"])
#callback(random.random()/10., "Task has been received.")
try:
embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"])
except Exception as e:
@ -265,7 +263,6 @@ def main(comm, mod):
if cks is None:
continue
if not cks:
tmf.write(str(r["update_time"]) + "\n")
callback(1., "No chunk! Done!")
continue
# TODO: exception handler
@ -305,8 +302,6 @@ def main(comm, mod):
"Chunk doc({}), token({}), chunks({}), elapsed:{}".format(
r["id"], tk_count, len(cks), timer()-st))
tmf.write(str(r["update_time"]) + "\n")
tmf.close()
if __name__ == "__main__":
@ -315,8 +310,6 @@ if __name__ == "__main__":
peewee_logger.addHandler(database_logger.handlers[0])
peewee_logger.setLevel(database_logger.level)
#from mpi4py import MPI
#comm = MPI.COMM_WORLD
while True:
main(int(sys.argv[2]), int(sys.argv[1]))
main()
close_connection()