From 5f03a4de11e92eaff4ba6f963a11d4378ddc78ef Mon Sep 17 00:00:00 2001 From: KevinHuSh Date: Tue, 30 Apr 2024 19:00:41 +0800 Subject: [PATCH] remove redis (#629) ### What problem does this PR solve? ### Type of change - [x] Refactoring --- api/db/services/document_service.py | 12 ++++++------ rag/svr/task_broker.py | 6 ------ rag/svr/task_executor.py | 12 ------------ 3 files changed, 6 insertions(+), 24 deletions(-) diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 455a64e6e..d3db21208 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -16,6 +16,8 @@ from peewee import Expression from elasticsearch_dsl import Q + +from api.utils import current_timestamp from rag.utils.es_conn import ELASTICSEARCH from rag.utils.minio_conn import MINIO from rag.nlp import search @@ -90,7 +92,7 @@ class DocumentService(CommonService): @classmethod @DB.connection_context() - def get_newly_uploaded(cls, tm, mod=0, comm=1, items_per_page=64): + def get_newly_uploaded(cls, tm): fields = [ cls.model.id, cls.model.kb_id, @@ -112,11 +114,9 @@ class DocumentService(CommonService): cls.model.status == StatusEnum.VALID.value, ~(cls.model.type == FileType.VIRTUAL.value), cls.model.progress == 0, - cls.model.update_time >= tm, - cls.model.run == TaskStatus.RUNNING.value, - (Expression(cls.model.create_time, "%%", comm) == mod))\ - .order_by(cls.model.update_time.asc())\ - .paginate(1, items_per_page) + cls.model.update_time >= current_timestamp() - 1000 * 600, + cls.model.run == TaskStatus.RUNNING.value)\ + .order_by(cls.model.update_time.asc()) return list(docs.dicts()) @classmethod diff --git a/rag/svr/task_broker.py b/rag/svr/task_broker.py index 97fb9b75c..82de2defe 100644 --- a/rag/svr/task_broker.py +++ b/rag/svr/task_broker.py @@ -90,12 +90,6 @@ def dispatch(): try: bucket, name = File2DocumentService.get_minio_address(doc_id=r["id"]) file_bin = MINIO.get(bucket, name) - if REDIS_CONN.is_alive(): - try: - REDIS_CONN.set("{}/{}".format(bucket, name), file_bin, 12*60) - except Exception as e: - cron_logger.warning("Put into redis[EXCEPTION]:" + str(e)) - if r["type"] == FileType.PDF.value: do_layout = r["parser_config"].get("layout_recognize", True) pages = PdfParser.total_page_number(r["name"], file_bin) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 4e9086984..981752973 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -107,18 +107,6 @@ def collect(comm, mod, tm): def get_minio_binary(bucket, name): global MINIO - if REDIS_CONN.is_alive(): - try: - for _ in range(30): - if REDIS_CONN.exist("{}/{}".format(bucket, name)): - time.sleep(1) - break - time.sleep(1) - r = REDIS_CONN.get("{}/{}".format(bucket, name)) - if r: return r - cron_logger.warning("Cache missing: {}".format(name)) - except Exception as e: - cron_logger.warning("Get redis[EXCEPTION]:" + str(e)) return MINIO.get(bucket, name)