Fix: add download stats to kb logs. (#11112)

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
Kevin Hu
2025-11-10 13:28:07 +08:00
committed by GitHub
parent bf382e5c4d
commit d207291217
12 changed files with 50 additions and 15 deletions

View File

@ -457,7 +457,7 @@ class Canvas(Graph):
if o.component_name.lower() == "userfillup": if o.component_name.lower() == "userfillup":
another_inputs.update(o.get_input_elements()) another_inputs.update(o.get_input_elements())
if o.get_param("enable_tips"): if o.get_param("enable_tips"):
tips = o.get_param("tips") tips = o.output("tips")
self.path = path self.path = path
yield decorate("user_inputs", {"inputs": another_inputs, "tips": tips}) yield decorate("user_inputs", {"inputs": another_inputs, "tips": tips})
return return

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
import logging
import os import os
import importlib import importlib
import inspect import inspect
@ -53,7 +52,7 @@ def component_class(class_name):
for module_name in ["agent.component", "agent.tools", "rag.flow"]: for module_name in ["agent.component", "agent.tools", "rag.flow"]:
try: try:
return getattr(importlib.import_module(module_name), class_name) return getattr(importlib.import_module(module_name), class_name)
except Exception as e: except Exception:
logging.warning(f"Can't import module: {module_name}, error: {e}") # logging.warning(f"Can't import module: {module_name}, error: {e}")
pass pass
assert False, f"Can't import {class_name}" assert False, f"Can't import {class_name}"

View File

@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
from agent.component.base import ComponentBase, ComponentParamBase from agent.component.message import MessageParam, Message
class UserFillUpParam(ComponentParamBase): class UserFillUpParam(MessageParam):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@ -27,10 +27,13 @@ class UserFillUpParam(ComponentParamBase):
return True return True
class UserFillUp(ComponentBase): class UserFillUp(Message):
component_name = "UserFillUp" component_name = "UserFillUp"
def _invoke(self, **kwargs): def _invoke(self, **kwargs):
if self._param.enable_tips:
tips, kwargs = self.get_kwargs(self._param.tips)
self.set_output("tips", tips)
for k, v in kwargs.get("inputs", {}).items(): for k, v in kwargs.get("inputs", {}).items():
self.set_output(k, v) self.set_output(k, v)

View File

@ -93,7 +93,7 @@ def resume(connector_id):
@validate_request("kb_id") @validate_request("kb_id")
def rebuild(connector_id): def rebuild(connector_id):
req = request.json req = request.json
err = ConnectorService.rebuild(connector_id, req["kb_id"], current_user.id) err = ConnectorService.rebuild(req["kb_id"], connector_id, current_user.id)
if err: if err:
return get_json_result(data=False, message=err, code=RetCode.SERVER_ERROR) return get_json_result(data=False, message=err, code=RetCode.SERVER_ERROR)
return get_json_result(data=True) return get_json_result(data=True)

View File

@ -669,6 +669,7 @@ class LLMFactories(DataBaseModel):
name = CharField(max_length=128, null=False, help_text="LLM factory name", primary_key=True) name = CharField(max_length=128, null=False, help_text="LLM factory name", primary_key=True)
logo = TextField(null=True, help_text="llm logo base64") logo = TextField(null=True, help_text="llm logo base64")
tags = CharField(max_length=255, null=False, help_text="LLM, Text Embedding, Image2Text, ASR", index=True) tags = CharField(max_length=255, null=False, help_text="LLM, Text Embedding, Image2Text, ASR", index=True)
rank = IntegerField(default=0, index=False)
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True) status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
def __str__(self): def __str__(self):
@ -1287,4 +1288,8 @@ def migrate_db():
migrate(migrator.add_column("connector2kb", "auto_parse", CharField(max_length=1, null=False, default="1", index=False))) migrate(migrator.add_column("connector2kb", "auto_parse", CharField(max_length=1, null=False, default="1", index=False)))
except Exception: except Exception:
pass pass
try:
migrate(migrator.add_column("llm_factories", "rank", IntegerField(default=0, index=False)))
except Exception:
pass
logging.disable(logging.NOTSET) logging.disable(logging.NOTSET)

View File

@ -72,7 +72,7 @@ class ConnectorService(CommonService):
if not e: if not e:
return return
SyncLogsService.filter_delete([SyncLogs.connector_id==connector_id, SyncLogs.kb_id==kb_id]) SyncLogsService.filter_delete([SyncLogs.connector_id==connector_id, SyncLogs.kb_id==kb_id])
docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}") docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}", kb_id=kb_id)
err = FileService.delete_docs([d.id for d in docs], tenant_id) err = FileService.delete_docs([d.id for d in docs], tenant_id)
SyncLogsService.schedule(connector_id, kb_id, reindex=True) SyncLogsService.schedule(connector_id, kb_id, reindex=True)
return err return err

View File

@ -755,6 +755,14 @@ class DocumentService(CommonService):
.where((cls.model.kb_id == kb_id) & (cls.model.run == TaskStatus.CANCEL)) .where((cls.model.kb_id == kb_id) & (cls.model.run == TaskStatus.CANCEL))
.scalar() .scalar()
) )
downloaded = (
cls.model.select(fn.COUNT(1))
.where(
cls.model.kb_id == kb_id,
cls.model.source_type != "local"
)
.scalar()
)
row = ( row = (
cls.model.select( cls.model.select(
@ -791,6 +799,7 @@ class DocumentService(CommonService):
"finished": int(row["finished"]), "finished": int(row["finished"]),
"failed": int(row["failed"]), "failed": int(row["failed"]),
"cancelled": int(cancelled), "cancelled": int(cancelled),
"downloaded": int(downloaded)
} }
@classmethod @classmethod

View File

@ -625,7 +625,7 @@ async def is_strong_enough(chat_model, embedding_model):
def get_allowed_llm_factories() -> list: def get_allowed_llm_factories() -> list:
factories = list(LLMFactoriesService.get_all()) factories = list(LLMFactoriesService.get_all(reverse=True, order_by="rank"))
if settings.ALLOWED_LLM_FACTORIES is None: if settings.ALLOWED_LLM_FACTORIES is None:
return factories return factories

View File

@ -5,6 +5,7 @@
"logo": "", "logo": "",
"tags": "LLM,TEXT EMBEDDING,TTS,TEXT RE-RANK,SPEECH2TEXT,MODERATION", "tags": "LLM,TEXT EMBEDDING,TTS,TEXT RE-RANK,SPEECH2TEXT,MODERATION",
"status": "1", "status": "1",
"rank": 999999990,
"llm": [ "llm": [
{ {
"llm_name": "gpt-5", "llm_name": "gpt-5",

View File

@ -167,7 +167,9 @@ function task_exe() {
JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so" JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so"
while true; do while true; do
LD_PRELOAD="$JEMALLOC_PATH" \ LD_PRELOAD="$JEMALLOC_PATH" \
"$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}" "$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}" &
wait;
sleep 1;
done done
} }
@ -238,21 +240,27 @@ if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then
echo "Starting ragflow_server..." echo "Starting ragflow_server..."
while true; do while true; do
"$PY" api/ragflow_server.py "$PY" api/ragflow_server.py &
wait;
sleep 1;
done & done &
fi fi
if [[ "${ENABLE_DATASYNC}" -eq 1 ]]; then if [[ "${ENABLE_DATASYNC}" -eq 1 ]]; then
echo "Starting data sync..." echo "Starting data sync..."
while true; do while true; do
"$PY" rag/svr/sync_data_source.py "$PY" rag/svr/sync_data_source.py &
wait;
sleep 1;
done & done &
fi fi
if [[ "${ENABLE_ADMIN_SERVER}" -eq 1 ]]; then if [[ "${ENABLE_ADMIN_SERVER}" -eq 1 ]]; then
echo "Starting admin_server..." echo "Starting admin_server..."
while true; do while true; do
"$PY" admin/server/admin_server.py "$PY" admin/server/admin_server.py &
wait;
sleep 1;
done & done &
fi fi

View File

@ -478,12 +478,14 @@ class Markdown(MarkdownParser):
images = [] images = []
# Find all image URLs in text # Find all image URLs in text
for url in image_urls: for url in image_urls:
if not url:
continue
try: try:
# check if the url is a local file or a remote URL # check if the url is a local file or a remote URL
if url.startswith(('http://', 'https://')): if url.startswith(('http://', 'https://')):
# For remote URLs, download the image # For remote URLs, download the image
response = requests.get(url, stream=True, timeout=30) response = requests.get(url, stream=True, timeout=30)
if response.status_code == 200 and response.headers['Content-Type'].startswith('image/'): if response.status_code == 200 and response.headers['Content-Type'] and response.headers['Content-Type'].startswith('image/'):
img = Image.open(BytesIO(response.content)).convert('RGB') img = Image.open(BytesIO(response.content)).convert('RGB')
images.append(img) images.append(img)
else: else:

View File

@ -258,6 +258,14 @@ func_factory = {
async def dispatch_tasks(): async def dispatch_tasks():
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
while True:
try:
list(SyncLogsService.list_sync_tasks()[0])
break
except Exception as e:
logging.warning(f"DB is not ready yet: {e}")
await trio.sleep(3)
for task in SyncLogsService.list_sync_tasks()[0]: for task in SyncLogsService.list_sync_tasks()[0]:
if task["poll_range_start"]: if task["poll_range_start"]:
task["poll_range_start"] = task["poll_range_start"].astimezone(timezone.utc) task["poll_range_start"] = task["poll_range_start"].astimezone(timezone.utc)