Refactor code (#12305)

### What problem does this PR solve?

As title.

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
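The recurring pattern in this refactor is to bind the caught exception (`except Exception as e:`) and interpolate it into the `logging.exception` message, so the exception text appears on the message line as well as in the traceback. A minimal sketch of the pattern (the `risky_step` helper is a hypothetical stand-in):

```python
import logging

def risky_step():
    # Hypothetical stand-in for any operation that may raise.
    raise ValueError("boom")

try:
    risky_step()
except Exception as e:
    # logging.exception records the full traceback; adding {e} also puts
    # the exception text on the message line itself, which is easier to
    # grep in aggregated logs.
    logging.exception(f"risky_step failed, exception: {e}")
```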
```diff
@@ -1061,8 +1061,8 @@ class RAGFlowPdfParser:
             self.total_page = len(self.pdf.pages)
-        except Exception:
-            logging.exception("RAGFlowPdfParser __images__")
+        except Exception as e:
+            logging.exception(f"RAGFlowPdfParser __images__, exception: {e}")
         logging.info(f"__images__ dedupe_chars cost {timer() - start}s")

         self.outlines = []
```
```diff
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import logging
 import os
 import re
 import tempfile
@@ -28,7 +28,7 @@ def chunk(filename, binary, tenant_id, lang, callback=None, **kwargs):
     doc["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(doc["title_tks"])

     # is it English
-    eng = lang.lower() == "english"  # is_english(sections)
+    is_english = lang.lower() == "english"  # is_english(sections)
     try:
         _, ext = os.path.splitext(filename)
         if not ext:
@@ -49,7 +49,7 @@ def chunk(filename, binary, tenant_id, lang, callback=None, **kwargs):
         ans = seq2txt_mdl.transcription(tmp_path)
         callback(0.8, "Sequence2Txt LLM respond: %s ..." % ans[:32])

-        tokenize(doc, ans, eng)
+        tokenize(doc, ans, is_english)
         return [doc]
     except Exception as e:
         callback(prog=-1, msg=str(e))
@@ -57,6 +57,7 @@ def chunk(filename, binary, tenant_id, lang, callback=None, **kwargs):
         if tmp_path and os.path.exists(tmp_path):
             try:
                 os.unlink(tmp_path)
-            except Exception:
+            except Exception as e:
+                logging.exception(f"Failed to remove temporary file: {tmp_path}, exception: {e}")
                 pass
     return []
```
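The cleanup above stays best-effort, but a failed `os.unlink` is now recorded instead of silently swallowed. A minimal self-contained sketch of that pattern, assuming nothing beyond the standard library:

```python
import logging
import os
import tempfile

def remove_quietly(tmp_path):
    # Best-effort cleanup: a failed unlink must not mask the real result,
    # but it should leave a trace in the logs.
    if tmp_path and os.path.exists(tmp_path):
        try:
            os.unlink(tmp_path)
        except Exception as e:
            logging.exception(f"Failed to remove temporary file: {tmp_path}, exception: {e}")

fd, path = tempfile.mkstemp()
os.close(fd)
remove_quietly(path)  # removes the file, or logs why it could not
```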
```diff
@@ -157,8 +157,8 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing...
         logging.info(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}")
     except DoesNotExist:
         logging.warning(f"set_progress({task_id}) got exception DoesNotExist")
-    except Exception:
-        logging.exception(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}, got exception")
+    except Exception as e:
+        logging.exception(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}, got exception: {e}")


 async def collect():
@@ -166,6 +166,7 @@ async def collect():
     global UNACKED_ITERATOR

     svr_queue_names = settings.get_svr_queue_names()
+    redis_msg = None
     try:
         if not UNACKED_ITERATOR:
             UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
```
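Pre-initializing `redis_msg = None` matters because the assignment inside the consumer loop may never run (for example, when the queue list is empty); without it, the `if not redis_msg:` check after the `try` block would raise `UnboundLocalError`. A minimal sketch of the failure mode, with a hypothetical `poll` standing in for `REDIS_CONN.queue_consumer`:

```python
def poll(queue_name):
    # Hypothetical stand-in for REDIS_CONN.queue_consumer(...).
    return None

def collect_sketch(queue_names):
    msg = None  # without this line, an empty queue_names leaves msg unbound
    for name in queue_names:
        msg = poll(name)
        if msg:
            break
    if not msg:
        return None
    return msg

print(collect_sketch([]))  # None, instead of UnboundLocalError
```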
```diff
@@ -176,8 +177,8 @@ async def collect():
             redis_msg = REDIS_CONN.queue_consumer(svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
             if redis_msg:
                 break
-    except Exception:
-        logging.exception("collect got exception")
+    except Exception as e:
+        logging.exception(f"collect got exception: {e}")
         return None, None

     if not redis_msg:
@@ -1050,8 +1051,8 @@ async def do_handle_task(task):
     async def _maybe_insert_es(_chunks):
         if has_canceled(task_id):
             return True
-        e = await insert_es(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
-        return bool(e)
+        insert_result = await insert_es(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
+        return bool(insert_result)

     try:
         if not await _maybe_insert_es(chunks):
@@ -1101,10 +1102,9 @@ async def do_handle_task(task):
                 search.index_name(task_tenant_id),
                 task_dataset_id,
             )
-        except Exception:
+        except Exception as e:
             logging.exception(
-                f"Remove doc({task_doc_id}) from docStore failed when task({task_id}) canceled."
-            )
+                f"Remove doc({task_doc_id}) from docStore failed when task({task_id}) canceled, exception: {e}")


 async def handle_task():
```
```diff
@@ -1117,24 +1117,25 @@ async def handle_task():
     task_type = task["task_type"]
     pipeline_task_type = TASK_TYPE_TO_PIPELINE_TASK_TYPE.get(task_type,
                                                              PipelineTaskType.PARSE) or PipelineTaskType.PARSE
+    task_id = task["id"]
     try:
         logging.info(f"handle_task begin for task {json.dumps(task)}")
         CURRENT_TASKS[task["id"]] = copy.deepcopy(task)
         await do_handle_task(task)
         DONE_TASKS += 1
-        CURRENT_TASKS.pop(task["id"], None)
+        CURRENT_TASKS.pop(task_id, None)
         logging.info(f"handle_task done for task {json.dumps(task)}")
     except Exception as e:
         FAILED_TASKS += 1
-        CURRENT_TASKS.pop(task["id"], None)
+        CURRENT_TASKS.pop(task_id, None)
         try:
             err_msg = str(e)
             while isinstance(e, exceptiongroup.ExceptionGroup):
                 e = e.exceptions[0]
                 err_msg += ' -- ' + str(e)
-            set_progress(task["id"], prog=-1, msg=f"[Exception]: {err_msg}")
-        except Exception:
+            set_progress(task_id, prog=-1, msg=f"[Exception]: {err_msg}")
+        except Exception as e:
+            logging.exception(f"[Exception]: {str(e)}")
             pass
         logging.exception(f"handle_task got exception for task {json.dumps(task)}")
     finally:
```
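The `while isinstance(e, exceptiongroup.ExceptionGroup)` loop follows the first leaf of a (possibly nested) exception group, as raised by async task groups, and appends each layer's text to the progress message. A minimal sketch of that flattening with the built-in `ExceptionGroup` (Python 3.11+; the `exceptiongroup` backport behaves the same):

```python
def flatten_first(e):
    # Walk the first branch of nested exception groups, collecting
    # each layer's text into a single ' -- '-separated message.
    err_msg = str(e)
    while isinstance(e, ExceptionGroup):
        e = e.exceptions[0]
        err_msg += ' -- ' + str(e)
    return err_msg

group = ExceptionGroup("task group", [ValueError("parse failed")])
print(flatten_first(group))  # task group (1 sub-exception) -- parse failed
```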
```diff
@@ -1207,8 +1208,8 @@ async def report_status():
                 logging.info(f"{consumer_name} expired, removed")
                 REDIS_CONN.srem("TASKEXE", consumer_name)
                 REDIS_CONN.delete(consumer_name)
-    except Exception:
-        logging.exception("report_status got exception")
+    except Exception as e:
+        logging.exception(f"report_status got exception: {e}")
     finally:
         redis_lock.release()
     await asyncio.sleep(30)
```