From d207291217ed8c2819611c986fd7b30fe7c562c1 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Mon, 10 Nov 2025 13:28:07 +0800 Subject: [PATCH] Fix: add download stats to kb logs. (#11112) ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- agent/canvas.py | 2 +- agent/component/__init__.py | 5 ++--- agent/component/fillup.py | 9 ++++++--- api/apps/connector_app.py | 2 +- api/db/db_models.py | 5 +++++ api/db/services/connector_service.py | 2 +- api/db/services/document_service.py | 9 +++++++++ api/utils/api_utils.py | 2 +- conf/llm_factories.json | 1 + docker/entrypoint.sh | 16 ++++++++++++---- rag/app/naive.py | 4 +++- rag/svr/sync_data_source.py | 8 ++++++++ 12 files changed, 50 insertions(+), 15 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index c971409e3..3ad8c06c1 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -457,7 +457,7 @@ class Canvas(Graph): if o.component_name.lower() == "userfillup": another_inputs.update(o.get_input_elements()) if o.get_param("enable_tips"): - tips = o.get_param("tips") + tips = o.output("tips") self.path = path yield decorate("user_inputs", {"inputs": another_inputs, "tips": tips}) return diff --git a/agent/component/__init__.py b/agent/component/__init__.py index 010458b2a..d4a481518 100644 --- a/agent/component/__init__.py +++ b/agent/component/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import logging import os import importlib import inspect @@ -53,7 +52,7 @@ def component_class(class_name): for module_name in ["agent.component", "agent.tools", "rag.flow"]: try: return getattr(importlib.import_module(module_name), class_name) - except Exception as e: - logging.warning(f"Can't import module: {module_name}, error: {e}") + except Exception: + # logging.warning(f"Can't import module: {module_name}, error: {e}") pass assert False, f"Can't import {class_name}" diff --git a/agent/component/fillup.py b/agent/component/fillup.py index 5b57bed80..60009c101 100644 --- a/agent/component/fillup.py +++ b/agent/component/fillup.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from agent.component.base import ComponentBase, ComponentParamBase +from agent.component.message import MessageParam, Message -class UserFillUpParam(ComponentParamBase): +class UserFillUpParam(MessageParam): def __init__(self): super().__init__() @@ -27,10 +27,13 @@ class UserFillUpParam(ComponentParamBase): return True -class UserFillUp(ComponentBase): +class UserFillUp(Message): component_name = "UserFillUp" def _invoke(self, **kwargs): + if self._param.enable_tips: + tips, kwargs = self.get_kwargs(self._param.tips) + self.set_output("tips", tips) for k, v in kwargs.get("inputs", {}).items(): self.set_output(k, v) diff --git a/api/apps/connector_app.py b/api/apps/connector_app.py index 180e14c3b..25587533a 100644 --- a/api/apps/connector_app.py +++ b/api/apps/connector_app.py @@ -93,7 +93,7 @@ def resume(connector_id): @validate_request("kb_id") def rebuild(connector_id): req = request.json - err = ConnectorService.rebuild(connector_id, req["kb_id"], current_user.id) + err = ConnectorService.rebuild(req["kb_id"], connector_id, current_user.id) if err: return get_json_result(data=False, message=err, code=RetCode.SERVER_ERROR) return get_json_result(data=True) diff --git a/api/db/db_models.py b/api/db/db_models.py index 2ad1a22cc..68bf37ce4 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -669,6 +669,7 @@ class LLMFactories(DataBaseModel): name = CharField(max_length=128, null=False, help_text="LLM factory name", primary_key=True) logo = TextField(null=True, help_text="llm logo base64") tags = CharField(max_length=255, null=False, help_text="LLM, Text Embedding, Image2Text, ASR", index=True) + rank = IntegerField(default=0, index=False) status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True) def __str__(self): @@ -1287,4 +1288,8 @@ def migrate_db(): migrate(migrator.add_column("connector2kb", "auto_parse", CharField(max_length=1, null=False, default="1", index=False))) except Exception: pass + try: + migrate(migrator.add_column("llm_factories", "rank", IntegerField(default=0, index=False))) + except Exception: + pass logging.disable(logging.NOTSET) diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index 5d02f058f..c05c87c8e 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -72,7 +72,7 @@ class ConnectorService(CommonService): if not e: return SyncLogsService.filter_delete([SyncLogs.connector_id==connector_id, SyncLogs.kb_id==kb_id]) - docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}") + docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}", kb_id=kb_id) err = FileService.delete_docs([d.id for d in docs], tenant_id) SyncLogsService.schedule(connector_id, kb_id, reindex=True) return err diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 9f2f35c1c..0f8dc4752 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -755,6 +755,14 @@ class DocumentService(CommonService): .where((cls.model.kb_id == kb_id) & (cls.model.run == TaskStatus.CANCEL)) .scalar() ) + downloaded = ( + cls.model.select(fn.COUNT(1)) + .where( + cls.model.kb_id == kb_id, + cls.model.source_type != "local" + ) + .scalar() + ) row = ( cls.model.select( @@ -791,6 +799,7 @@ class DocumentService(CommonService): "finished": int(row["finished"]), "failed": int(row["failed"]), "cancelled": int(cancelled), + "downloaded": int(downloaded) } @classmethod diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 96bda9a38..4cace9eca 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -625,7 +625,7 @@ async def is_strong_enough(chat_model, embedding_model): def get_allowed_llm_factories() -> list: - factories = list(LLMFactoriesService.get_all()) + factories = list(LLMFactoriesService.get_all(reverse=True, order_by="rank")) if settings.ALLOWED_LLM_FACTORIES is None: return factories diff --git a/conf/llm_factories.json b/conf/llm_factories.json index 324cb0b05..37680da9d 100644 --- a/conf/llm_factories.json +++ b/conf/llm_factories.json @@ -5,6 +5,7 @@ "logo": "", "tags": "LLM,TEXT EMBEDDING,TTS,TEXT RE-RANK,SPEECH2TEXT,MODERATION", "status": "1", + "rank": 999999990, "llm": [ { "llm_name": "gpt-5", diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index be3cabcac..f470fa6c1 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -167,7 +167,9 @@ function task_exe() { JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so" while true; do LD_PRELOAD="$JEMALLOC_PATH" \ - "$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}" + "$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}" & + wait; + sleep 1; done } @@ -238,21 +240,27 @@ if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then echo "Starting ragflow_server..." while true; do - "$PY" api/ragflow_server.py + "$PY" api/ragflow_server.py & + wait; + sleep 1; done & fi if [[ "${ENABLE_DATASYNC}" -eq 1 ]]; then echo "Starting data sync..." while true; do - "$PY" rag/svr/sync_data_source.py + "$PY" rag/svr/sync_data_source.py & + wait; + sleep 1; done & fi if [[ "${ENABLE_ADMIN_SERVER}" -eq 1 ]]; then echo "Starting admin_server..." while true; do - "$PY" admin/server/admin_server.py + "$PY" admin/server/admin_server.py & + wait; + sleep 1; done & fi diff --git a/rag/app/naive.py b/rag/app/naive.py index e3441d663..e3126783b 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -478,12 +478,14 @@ class Markdown(MarkdownParser): images = [] # Find all image URLs in text for url in image_urls: + if not url: + continue try: # check if the url is a local file or a remote URL if url.startswith(('http://', 'https://')): # For remote URLs, download the image response = requests.get(url, stream=True, timeout=30) - if response.status_code == 200 and response.headers['Content-Type'].startswith('image/'): + if response.status_code == 200 and response.headers['Content-Type'] and response.headers['Content-Type'].startswith('image/'): img = Image.open(BytesIO(response.content)).convert('RGB') images.append(img) else: diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index fd97a068c..5c6d84b90 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -258,6 +258,14 @@ func_factory = { async def dispatch_tasks(): async with trio.open_nursery() as nursery: + while True: + try: + list(SyncLogsService.list_sync_tasks()[0]) + break + except Exception as e: + logging.warning(f"DB is not ready yet: {e}") + await trio.sleep(3) + for task in SyncLogsService.list_sync_tasks()[0]: if task["poll_range_start"]: task["poll_range_start"] = task["poll_range_start"].astimezone(timezone.utc)