diff --git a/deepdoc/parser/pdf_parser.py b/deepdoc/parser/pdf_parser.py
index 5328eae47..cb34ba68d 100644
--- a/deepdoc/parser/pdf_parser.py
+++ b/deepdoc/parser/pdf_parser.py
@@ -1061,8 +1061,8 @@ class RAGFlowPdfParser:

             self.total_page = len(self.pdf.pages)
-        except Exception:
-            logging.exception("RAGFlowPdfParser __images__")
+        except Exception as e:
+            logging.exception(f"RAGFlowPdfParser __images__, exception: {e}")
         logging.info(f"__images__ dedupe_chars cost {timer() - start}s")

         self.outlines = []
diff --git a/rag/app/audio.py b/rag/app/audio.py
index 979659382..5bcb3d257 100644
--- a/rag/app/audio.py
+++ b/rag/app/audio.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import logging
 import os
 import re
 import tempfile
@@ -28,7 +28,7 @@ def chunk(filename, binary, tenant_id, lang, callback=None, **kwargs):
     doc["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(doc["title_tks"])

     # is it English
-    eng = lang.lower() == "english"  # is_english(sections)
+    is_english = lang.lower() == "english"  # is_english(sections)
     try:
         _, ext = os.path.splitext(filename)
         if not ext:
@@ -49,7 +49,7 @@ def chunk(filename, binary, tenant_id, lang, callback=None, **kwargs):
         ans = seq2txt_mdl.transcription(tmp_path)
         callback(0.8, "Sequence2Txt LLM respond: %s ..." % ans[:32])
-        tokenize(doc, ans, eng)
+        tokenize(doc, ans, is_english)
         return [doc]
     except Exception as e:
         callback(prog=-1, msg=str(e))
@@ -57,6 +57,7 @@ def chunk(filename, binary, tenant_id, lang, callback=None, **kwargs):
         if tmp_path and os.path.exists(tmp_path):
             try:
                 os.unlink(tmp_path)
-            except Exception:
+            except Exception as e:
+                logging.exception(f"Failed to remove temporary file: {tmp_path}, exception: {e}")
                 pass
     return []
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index ab75cd0c1..b793d9c35 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -157,8 +157,8 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."):
         logging.info(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}")
     except DoesNotExist:
         logging.warning(f"set_progress({task_id}) got exception DoesNotExist")
-    except Exception:
-        logging.exception(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}, got exception")
+    except Exception as e:
+        logging.exception(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}, got exception: {e}")


 async def collect():
@@ -166,6 +166,7 @@ async def collect():
     global UNACKED_ITERATOR
     svr_queue_names = settings.get_svr_queue_names()
+    redis_msg = None
     try:
         if not UNACKED_ITERATOR:
             UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(svr_queue_names, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
@@ -176,8 +177,8 @@ async def collect():
                 redis_msg = REDIS_CONN.queue_consumer(svr_queue_name, SVR_CONSUMER_GROUP_NAME, CONSUMER_NAME)
                 if redis_msg:
                     break
-    except Exception:
-        logging.exception("collect got exception")
+    except Exception as e:
+        logging.exception(f"collect got exception: {e}")
         return None, None

     if not redis_msg:
@@ -1050,8 +1051,8 @@ async def do_handle_task(task):
     async def _maybe_insert_es(_chunks):
         if has_canceled(task_id):
             return True
-        e = await insert_es(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
-        return bool(e)
+        insert_result = await insert_es(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
+        return bool(insert_result)

     try:
         if not await _maybe_insert_es(chunks):
@@ -1101,10 +1102,9 @@ async def do_handle_task(task):
                     search.index_name(task_tenant_id),
                     task_dataset_id,
                 )
-            except Exception:
+            except Exception as e:
                 logging.exception(
-                    f"Remove doc({task_doc_id}) from docStore failed when task({task_id}) canceled."
-                )
+                    f"Remove doc({task_doc_id}) from docStore failed when task({task_id}) canceled, exception: {e}")


 async def handle_task():
@@ -1117,24 +1117,25 @@ async def handle_task():

     task_type = task["task_type"]
     pipeline_task_type = TASK_TYPE_TO_PIPELINE_TASK_TYPE.get(task_type, PipelineTaskType.PARSE) or PipelineTaskType.PARSE
-
+    task_id = task["id"]
     try:
         logging.info(f"handle_task begin for task {json.dumps(task)}")
         CURRENT_TASKS[task["id"]] = copy.deepcopy(task)
         await do_handle_task(task)
         DONE_TASKS += 1
-        CURRENT_TASKS.pop(task["id"], None)
+        CURRENT_TASKS.pop(task_id, None)
         logging.info(f"handle_task done for task {json.dumps(task)}")
     except Exception as e:
         FAILED_TASKS += 1
-        CURRENT_TASKS.pop(task["id"], None)
+        CURRENT_TASKS.pop(task_id, None)
         try:
             err_msg = str(e)
             while isinstance(e, exceptiongroup.ExceptionGroup):
                 e = e.exceptions[0]
                 err_msg += ' -- ' + str(e)
-            set_progress(task["id"], prog=-1, msg=f"[Exception]: {err_msg}")
-        except Exception:
+            set_progress(task_id, prog=-1, msg=f"[Exception]: {err_msg}")
+        except Exception as e:
+            logging.exception(f"[Exception]: {str(e)}")
             pass
         logging.exception(f"handle_task got exception for task {json.dumps(task)}")
     finally:
@@ -1207,8 +1208,8 @@ async def report_status():
                     logging.info(f"{consumer_name} expired, removed")
                     REDIS_CONN.srem("TASKEXE", consumer_name)
                     REDIS_CONN.delete(consumer_name)
-        except Exception:
-            logging.exception("report_status got exception")
+        except Exception as e:
+            logging.exception(f"report_status got exception: {e}")
         finally:
             redis_lock.release()
         await asyncio.sleep(30)