Update progress info and start welcome info (#3768)

### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change

- [x] Refactoring

---------

Signed-off-by: jinhai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2024-11-30 18:48:06 +08:00
committed by GitHub
parent d00297a763
commit e079656473
11 changed files with 107 additions and 59 deletions

View File

@ -56,12 +56,13 @@ from api.db.services.llm_service import LLMBundle
from api.db.services.task_service import TaskService
from api.db.services.file2document_service import File2DocumentService
from api import settings
from api.versions import get_ragflow_version
from api.db.db_models import close_connection
from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture, naive, one, audio, \
knowledge_graph, email
from rag.nlp import search, rag_tokenizer
from rag.raptor import RecursiveAbstractiveProcessing4TreeOrganizedRetrieval as Raptor
from rag.settings import DOC_MAXIMUM_SIZE, SVR_QUEUE_NAME
from rag.settings import DOC_MAXIMUM_SIZE, SVR_QUEUE_NAME, print_rag_settings
from rag.utils import rmSpace, num_tokens_from_string
from rag.utils.redis_conn import REDIS_CONN, Payload
from rag.utils.storage_factory import STORAGE_IMPL
@ -395,8 +396,7 @@ def do_handle_task(r):
# TODO: exception handler
## set_progress(r["did"], -1, "ERROR: ")
callback(
msg="Finished slicing files ({} chunks in {:.2f}s). Start to embedding the content.".format(len(cks),
timer() - st)
msg="Generate {} chunks ({:.2f}s). Embedding chunks.".format(len(cks), timer() - st)
)
st = timer()
try:
@ -407,7 +407,7 @@ def do_handle_task(r):
tk_count = 0
raise
logging.info("Embedding elapsed({}): {:.2f}".format(r["name"], timer() - st))
callback(msg="Finished embedding (in {:.2f}s)! Start to build index!".format(timer() - st))
callback(msg="Finished embedding ({:.2f}s)!".format(timer() - st))
# logging.info(f"task_executor init_kb index {search.index_name(r["tenant_id"])} embd_mdl {embd_mdl.llm_name} vector length {vector_size}")
init_kb(r, vector_size)
chunk_count = len(set([c["id"] for c in cks]))
@ -420,7 +420,8 @@ def do_handle_task(r):
callback(prog=0.8 + 0.1 * (b + 1) / len(cks), msg="")
logging.info("Indexing elapsed({}): {:.2f}".format(r["name"], timer() - st))
if es_r:
callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
callback(-1,
"Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
settings.docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"])
logging.error('Insert chunk error: ' + str(es_r))
raise Exception('Insert chunk error: ' + str(es_r))
@ -429,13 +430,12 @@ def do_handle_task(r):
settings.docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"])
return
callback(msg="Indexing elapsed in {:.2f}s.".format(timer() - st))
callback(1., "Done!")
callback(1., msg="Index cost {:.2f}s.".format(timer() - st))
DocumentService.increment_chunk_num(
r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
logging.info(
"Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(
r["id"], tk_count, len(cks), timer() - st))
r["id"], tk_count, len(cks), timer() - st))
def handle_task():
@ -502,7 +502,7 @@ def analyze_heap(snapshot1: tracemalloc.Snapshot, snapshot2: tracemalloc.Snapsho
for stat in stats2[:10]:
msg += f"{stat}\n"
stats1_vs_2 = snapshot2.compare_to(snapshot1, 'lineno')
msg += f"{CONSUMER_NAME} memory usage increase from snapshot {snapshot_id-1} to snapshot {snapshot_id}:\n"
msg += f"{CONSUMER_NAME} memory usage increase from snapshot {snapshot_id - 1} to snapshot {snapshot_id}:\n"
for stat in stats1_vs_2[:10]:
msg += f"{stat}\n"
msg += f"{CONSUMER_NAME} detailed traceback for the top memory consumers:\n"
@ -512,7 +512,16 @@ def analyze_heap(snapshot1: tracemalloc.Snapshot, snapshot2: tracemalloc.Snapsho
def main():
logging.info(r"""
______ __ ______ __
/_ __/___ ______/ /__ / ____/ _____ _______ __/ /_____ _____
/ / / __ `/ ___/ //_/ / __/ | |/_/ _ \/ ___/ / / / __/ __ \/ ___/
/ / / /_/ (__ ) ,< / /____> </ __/ /__/ /_/ / /_/ /_/ / /
/_/ \__,_/____/_/|_| /_____/_/|_|\___/\___/\__,_/\__/\____/_/
""")
logging.info(f'TaskExecutor: RAGFlow version: {get_ragflow_version()}')
settings.init_settings()
print_rag_settings()
background_thread = threading.Thread(target=report_status)
background_thread.daemon = True
background_thread.start()
@ -527,11 +536,12 @@ def main():
while True:
handle_task()
num_tasks = DONE_TASKS + FAILED_TASKS
if TRACE_MALLOC_DELTA> 0 and num_tasks > 0 and num_tasks % TRACE_MALLOC_DELTA == 0:
if TRACE_MALLOC_DELTA > 0 and num_tasks > 0 and num_tasks % TRACE_MALLOC_DELTA == 0:
snapshot2 = tracemalloc.take_snapshot()
analyze_heap(snapshot1, snapshot2, int(num_tasks/TRACE_MALLOC_DELTA), num_tasks % TRACE_MALLOC_FULL == 0)
analyze_heap(snapshot1, snapshot2, int(num_tasks / TRACE_MALLOC_DELTA), num_tasks % TRACE_MALLOC_FULL == 0)
snapshot1 = snapshot2
snapshot2 = None
if __name__ == "__main__":
main()