mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-30 00:32:30 +08:00
### What problem does this PR solve? Modifying a document’s parser config previously left behind obsolete chunk images. If the dataset isn’t manually deleted, these images accumulate and waste storage. This PR fixes the issue by automatically removing associated images when the parser config changes. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
911 lines
38 KiB
Python
911 lines
38 KiB
Python
#
|
|
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License
|
|
#
|
|
import asyncio
|
|
import json
|
|
import os.path
|
|
import pathlib
|
|
import re
|
|
from pathlib import Path
|
|
from quart import request, make_response
|
|
from api.apps import current_user, login_required
|
|
from api.common.check_team_permission import check_kb_team_permission
|
|
from api.constants import FILE_NAME_LEN_LIMIT, IMG_BASE64_PREFIX
|
|
from api.db import VALID_FILE_TYPES, FileType
|
|
from api.db.db_models import Task
|
|
from api.db.services import duplicate_name
|
|
from api.db.services.document_service import DocumentService, doc_upload_and_parse
|
|
from common.metadata_utils import meta_filter, convert_conditions
|
|
from api.db.services.file2document_service import File2DocumentService
|
|
from api.db.services.file_service import FileService
|
|
from api.db.services.knowledgebase_service import KnowledgebaseService
|
|
from api.db.services.task_service import TaskService, cancel_all_task_of
|
|
from api.db.services.user_service import UserTenantService
|
|
from common.misc_utils import get_uuid
|
|
from api.utils.api_utils import (
|
|
get_data_error_result,
|
|
get_json_result,
|
|
server_error_response,
|
|
validate_request, get_request_json,
|
|
)
|
|
from api.utils.file_utils import filename_type, thumbnail
|
|
from common.file_utils import get_project_base_directory
|
|
from common.constants import RetCode, VALID_TASK_STATUS, ParserType, TaskStatus
|
|
from api.utils.web_utils import CONTENT_TYPE_MAP, html2pdf, is_valid_url
|
|
from deepdoc.parser.html_parser import RAGFlowHtmlParser
|
|
from rag.nlp import search, rag_tokenizer
|
|
from common import settings
|
|
|
|
|
|
@manager.route("/upload", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("kb_id")
|
|
async def upload():
|
|
form = await request.form
|
|
kb_id = form.get("kb_id")
|
|
if not kb_id:
|
|
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
|
|
files = await request.files
|
|
if "file" not in files:
|
|
return get_json_result(data=False, message="No file part!", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
file_objs = files.getlist("file")
|
|
for file_obj in file_objs:
|
|
if file_obj.filename == "":
|
|
return get_json_result(data=False, message="No file selected!", code=RetCode.ARGUMENT_ERROR)
|
|
if len(file_obj.filename.encode("utf-8")) > FILE_NAME_LEN_LIMIT:
|
|
return get_json_result(data=False, message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
e, kb = KnowledgebaseService.get_by_id(kb_id)
|
|
if not e:
|
|
raise LookupError("Can't find this dataset!")
|
|
if not check_kb_team_permission(kb, current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
|
|
err, files = await asyncio.to_thread(FileService.upload_document, kb, file_objs, current_user.id)
|
|
if err:
|
|
return get_json_result(data=files, message="\n".join(err), code=RetCode.SERVER_ERROR)
|
|
|
|
if not files:
|
|
return get_json_result(data=files, message="There seems to be an issue with your file format. Please verify it is correct and not corrupted.", code=RetCode.DATA_ERROR)
|
|
files = [f[0] for f in files] # remove the blob
|
|
|
|
return get_json_result(data=files)
|
|
|
|
|
|
@manager.route("/web_crawl", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("kb_id", "name", "url")
|
|
async def web_crawl():
|
|
form = await request.form
|
|
kb_id = form.get("kb_id")
|
|
if not kb_id:
|
|
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
|
|
name = form.get("name")
|
|
url = form.get("url")
|
|
if not is_valid_url(url):
|
|
return get_json_result(data=False, message="The URL format is invalid", code=RetCode.ARGUMENT_ERROR)
|
|
e, kb = KnowledgebaseService.get_by_id(kb_id)
|
|
if not e:
|
|
raise LookupError("Can't find this dataset!")
|
|
if check_kb_team_permission(kb, current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
|
|
blob = html2pdf(url)
|
|
if not blob:
|
|
return server_error_response(ValueError("Download failure."))
|
|
|
|
root_folder = FileService.get_root_folder(current_user.id)
|
|
pf_id = root_folder["id"]
|
|
FileService.init_knowledgebase_docs(pf_id, current_user.id)
|
|
kb_root_folder = FileService.get_kb_folder(current_user.id)
|
|
kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
|
|
|
|
try:
|
|
filename = duplicate_name(DocumentService.query, name=name + ".pdf", kb_id=kb.id)
|
|
filetype = filename_type(filename)
|
|
if filetype == FileType.OTHER.value:
|
|
raise RuntimeError("This type of file has not been supported yet!")
|
|
|
|
location = filename
|
|
while settings.STORAGE_IMPL.obj_exist(kb_id, location):
|
|
location += "_"
|
|
settings.STORAGE_IMPL.put(kb_id, location, blob)
|
|
doc = {
|
|
"id": get_uuid(),
|
|
"kb_id": kb.id,
|
|
"parser_id": kb.parser_id,
|
|
"parser_config": kb.parser_config,
|
|
"created_by": current_user.id,
|
|
"type": filetype,
|
|
"name": filename,
|
|
"location": location,
|
|
"size": len(blob),
|
|
"thumbnail": thumbnail(filename, blob),
|
|
"suffix": Path(filename).suffix.lstrip("."),
|
|
}
|
|
if doc["type"] == FileType.VISUAL:
|
|
doc["parser_id"] = ParserType.PICTURE.value
|
|
if doc["type"] == FileType.AURAL:
|
|
doc["parser_id"] = ParserType.AUDIO.value
|
|
if re.search(r"\.(ppt|pptx|pages)$", filename):
|
|
doc["parser_id"] = ParserType.PRESENTATION.value
|
|
if re.search(r"\.(eml)$", filename):
|
|
doc["parser_id"] = ParserType.EMAIL.value
|
|
DocumentService.insert(doc)
|
|
FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
return get_json_result(data=True)
|
|
|
|
|
|
@manager.route("/create", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("name", "kb_id")
|
|
async def create():
|
|
req = await get_request_json()
|
|
kb_id = req["kb_id"]
|
|
if not kb_id:
|
|
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
|
|
if len(req["name"].encode("utf-8")) > FILE_NAME_LEN_LIMIT:
|
|
return get_json_result(data=False, message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
if req["name"].strip() == "":
|
|
return get_json_result(data=False, message="File name can't be empty.", code=RetCode.ARGUMENT_ERROR)
|
|
req["name"] = req["name"].strip()
|
|
|
|
try:
|
|
e, kb = KnowledgebaseService.get_by_id(kb_id)
|
|
if not e:
|
|
return get_data_error_result(message="Can't find this dataset!")
|
|
|
|
if DocumentService.query(name=req["name"], kb_id=kb_id):
|
|
return get_data_error_result(message="Duplicated document name in the same dataset.")
|
|
|
|
kb_root_folder = FileService.get_kb_folder(kb.tenant_id)
|
|
if not kb_root_folder:
|
|
return get_data_error_result(message="Cannot find the root folder.")
|
|
kb_folder = FileService.new_a_file_from_kb(
|
|
kb.tenant_id,
|
|
kb.name,
|
|
kb_root_folder["id"],
|
|
)
|
|
if not kb_folder:
|
|
return get_data_error_result(message="Cannot find the kb folder for this file.")
|
|
|
|
doc = DocumentService.insert(
|
|
{
|
|
"id": get_uuid(),
|
|
"kb_id": kb.id,
|
|
"parser_id": kb.parser_id,
|
|
"pipeline_id": kb.pipeline_id,
|
|
"parser_config": kb.parser_config,
|
|
"created_by": current_user.id,
|
|
"type": FileType.VIRTUAL,
|
|
"name": req["name"],
|
|
"suffix": Path(req["name"]).suffix.lstrip("."),
|
|
"location": "",
|
|
"size": 0,
|
|
}
|
|
)
|
|
|
|
FileService.add_file_from_kb(doc.to_dict(), kb_folder["id"], kb.tenant_id)
|
|
|
|
return get_json_result(data=doc.to_json())
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/list", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
async def list_docs():
|
|
kb_id = request.args.get("kb_id")
|
|
if not kb_id:
|
|
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
|
|
tenants = UserTenantService.query(user_id=current_user.id)
|
|
for tenant in tenants:
|
|
if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id):
|
|
break
|
|
else:
|
|
return get_json_result(data=False, message="Only owner of dataset authorized for this operation.", code=RetCode.OPERATING_ERROR)
|
|
keywords = request.args.get("keywords", "")
|
|
|
|
page_number = int(request.args.get("page", 0))
|
|
items_per_page = int(request.args.get("page_size", 0))
|
|
orderby = request.args.get("orderby", "create_time")
|
|
if request.args.get("desc", "true").lower() == "false":
|
|
desc = False
|
|
else:
|
|
desc = True
|
|
create_time_from = int(request.args.get("create_time_from", 0))
|
|
create_time_to = int(request.args.get("create_time_to", 0))
|
|
|
|
req = await get_request_json()
|
|
|
|
return_empty_metadata = req.get("return_empty_metadata", False)
|
|
if isinstance(return_empty_metadata, str):
|
|
return_empty_metadata = return_empty_metadata.lower() == "true"
|
|
|
|
run_status = req.get("run_status", [])
|
|
if run_status:
|
|
invalid_status = {s for s in run_status if s not in VALID_TASK_STATUS}
|
|
if invalid_status:
|
|
return get_data_error_result(message=f"Invalid filter run status conditions: {', '.join(invalid_status)}")
|
|
|
|
types = req.get("types", [])
|
|
if types:
|
|
invalid_types = {t for t in types if t not in VALID_FILE_TYPES}
|
|
if invalid_types:
|
|
return get_data_error_result(message=f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}")
|
|
|
|
suffix = req.get("suffix", [])
|
|
metadata_condition = req.get("metadata_condition", {}) or {}
|
|
metadata = req.get("metadata", {}) or {}
|
|
if isinstance(metadata, dict) and metadata.get("empty_metadata"):
|
|
return_empty_metadata = True
|
|
metadata = {k: v for k, v in metadata.items() if k != "empty_metadata"}
|
|
if return_empty_metadata:
|
|
metadata_condition = {}
|
|
metadata = {}
|
|
else:
|
|
if metadata_condition and not isinstance(metadata_condition, dict):
|
|
return get_data_error_result(message="metadata_condition must be an object.")
|
|
if metadata and not isinstance(metadata, dict):
|
|
return get_data_error_result(message="metadata must be an object.")
|
|
|
|
doc_ids_filter = None
|
|
metas = None
|
|
if metadata_condition or metadata:
|
|
metas = DocumentService.get_flatted_meta_by_kbs([kb_id])
|
|
|
|
if metadata_condition:
|
|
doc_ids_filter = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")))
|
|
if metadata_condition.get("conditions") and not doc_ids_filter:
|
|
return get_json_result(data={"total": 0, "docs": []})
|
|
|
|
if metadata:
|
|
metadata_doc_ids = None
|
|
for key, values in metadata.items():
|
|
if not values:
|
|
continue
|
|
if not isinstance(values, list):
|
|
values = [values]
|
|
values = [str(v) for v in values if v is not None and str(v).strip()]
|
|
if not values:
|
|
continue
|
|
key_doc_ids = set()
|
|
for value in values:
|
|
key_doc_ids.update(metas.get(key, {}).get(value, []))
|
|
if metadata_doc_ids is None:
|
|
metadata_doc_ids = key_doc_ids
|
|
else:
|
|
metadata_doc_ids &= key_doc_ids
|
|
if not metadata_doc_ids:
|
|
return get_json_result(data={"total": 0, "docs": []})
|
|
if metadata_doc_ids is not None:
|
|
if doc_ids_filter is None:
|
|
doc_ids_filter = metadata_doc_ids
|
|
else:
|
|
doc_ids_filter &= metadata_doc_ids
|
|
if not doc_ids_filter:
|
|
return get_json_result(data={"total": 0, "docs": []})
|
|
|
|
if doc_ids_filter is not None:
|
|
doc_ids_filter = list(doc_ids_filter)
|
|
|
|
try:
|
|
docs, tol = DocumentService.get_by_kb_id(
|
|
kb_id,
|
|
page_number,
|
|
items_per_page,
|
|
orderby,
|
|
desc,
|
|
keywords,
|
|
run_status,
|
|
types,
|
|
suffix,
|
|
doc_ids_filter,
|
|
return_empty_metadata=return_empty_metadata,
|
|
)
|
|
|
|
if create_time_from or create_time_to:
|
|
filtered_docs = []
|
|
for doc in docs:
|
|
doc_create_time = doc.get("create_time", 0)
|
|
if (create_time_from == 0 or doc_create_time >= create_time_from) and (create_time_to == 0 or doc_create_time <= create_time_to):
|
|
filtered_docs.append(doc)
|
|
docs = filtered_docs
|
|
|
|
for doc_item in docs:
|
|
if doc_item["thumbnail"] and not doc_item["thumbnail"].startswith(IMG_BASE64_PREFIX):
|
|
doc_item["thumbnail"] = f"/v1/document/image/{kb_id}-{doc_item['thumbnail']}"
|
|
if doc_item.get("source_type"):
|
|
doc_item["source_type"] = doc_item["source_type"].split("/")[0]
|
|
|
|
return get_json_result(data={"total": tol, "docs": docs})
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/filter", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
async def get_filter():
|
|
req = await get_request_json()
|
|
|
|
kb_id = req.get("kb_id")
|
|
if not kb_id:
|
|
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
|
|
tenants = UserTenantService.query(user_id=current_user.id)
|
|
for tenant in tenants:
|
|
if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id):
|
|
break
|
|
else:
|
|
return get_json_result(data=False, message="Only owner of dataset authorized for this operation.", code=RetCode.OPERATING_ERROR)
|
|
|
|
keywords = req.get("keywords", "")
|
|
|
|
suffix = req.get("suffix", [])
|
|
|
|
run_status = req.get("run_status", [])
|
|
if run_status:
|
|
invalid_status = {s for s in run_status if s not in VALID_TASK_STATUS}
|
|
if invalid_status:
|
|
return get_data_error_result(message=f"Invalid filter run status conditions: {', '.join(invalid_status)}")
|
|
|
|
types = req.get("types", [])
|
|
if types:
|
|
invalid_types = {t for t in types if t not in VALID_FILE_TYPES}
|
|
if invalid_types:
|
|
return get_data_error_result(message=f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}")
|
|
|
|
try:
|
|
filter, total = DocumentService.get_filter_by_kb_id(kb_id, keywords, run_status, types, suffix)
|
|
return get_json_result(data={"total": total, "filter": filter})
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/infos", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
async def doc_infos():
|
|
req = await get_request_json()
|
|
doc_ids = req["doc_ids"]
|
|
for doc_id in doc_ids:
|
|
if not DocumentService.accessible(doc_id, current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
docs = DocumentService.get_by_ids(doc_ids)
|
|
return get_json_result(data=list(docs.dicts()))
|
|
|
|
|
|
@manager.route("/metadata/summary", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
async def metadata_summary():
|
|
req = await get_request_json()
|
|
kb_id = req.get("kb_id")
|
|
if not kb_id:
|
|
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
|
|
|
|
tenants = UserTenantService.query(user_id=current_user.id)
|
|
for tenant in tenants:
|
|
if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id):
|
|
break
|
|
else:
|
|
return get_json_result(data=False, message="Only owner of dataset authorized for this operation.", code=RetCode.OPERATING_ERROR)
|
|
|
|
try:
|
|
summary = DocumentService.get_metadata_summary(kb_id)
|
|
return get_json_result(data={"summary": summary})
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/metadata/update", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
async def metadata_update():
|
|
req = await get_request_json()
|
|
kb_id = req.get("kb_id")
|
|
if not kb_id:
|
|
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
|
|
|
|
tenants = UserTenantService.query(user_id=current_user.id)
|
|
for tenant in tenants:
|
|
if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id):
|
|
break
|
|
else:
|
|
return get_json_result(data=False, message="Only owner of dataset authorized for this operation.", code=RetCode.OPERATING_ERROR)
|
|
|
|
selector = req.get("selector", {}) or {}
|
|
updates = req.get("updates", []) or []
|
|
deletes = req.get("deletes", []) or []
|
|
|
|
if not isinstance(selector, dict):
|
|
return get_json_result(data=False, message="selector must be an object.", code=RetCode.ARGUMENT_ERROR)
|
|
if not isinstance(updates, list) or not isinstance(deletes, list):
|
|
return get_json_result(data=False, message="updates and deletes must be lists.", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
metadata_condition = selector.get("metadata_condition", {}) or {}
|
|
if metadata_condition and not isinstance(metadata_condition, dict):
|
|
return get_json_result(data=False, message="metadata_condition must be an object.", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
document_ids = selector.get("document_ids", []) or []
|
|
if document_ids and not isinstance(document_ids, list):
|
|
return get_json_result(data=False, message="document_ids must be a list.", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
for upd in updates:
|
|
if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd:
|
|
return get_json_result(data=False, message="Each update requires key and value.", code=RetCode.ARGUMENT_ERROR)
|
|
for d in deletes:
|
|
if not isinstance(d, dict) or not d.get("key"):
|
|
return get_json_result(data=False, message="Each delete requires key.", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
kb_doc_ids = KnowledgebaseService.list_documents_by_ids([kb_id])
|
|
target_doc_ids = set(kb_doc_ids)
|
|
if document_ids:
|
|
invalid_ids = set(document_ids) - set(kb_doc_ids)
|
|
if invalid_ids:
|
|
return get_json_result(data=False, message=f"These documents do not belong to dataset {kb_id}: {', '.join(invalid_ids)}", code=RetCode.ARGUMENT_ERROR)
|
|
target_doc_ids = set(document_ids)
|
|
|
|
if metadata_condition:
|
|
metas = DocumentService.get_flatted_meta_by_kbs([kb_id])
|
|
filtered_ids = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")))
|
|
target_doc_ids = target_doc_ids & filtered_ids
|
|
if metadata_condition.get("conditions") and not target_doc_ids:
|
|
return get_json_result(data={"updated": 0, "matched_docs": 0})
|
|
|
|
target_doc_ids = list(target_doc_ids)
|
|
updated = DocumentService.batch_update_metadata(kb_id, target_doc_ids, updates, deletes)
|
|
return get_json_result(data={"updated": updated, "matched_docs": len(target_doc_ids)})
|
|
|
|
|
|
@manager.route("/update_metadata_setting", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("doc_id", "metadata")
|
|
async def update_metadata_setting():
|
|
req = await get_request_json()
|
|
if not DocumentService.accessible(req["doc_id"], current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
|
|
e, doc = DocumentService.get_by_id(req["doc_id"])
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
|
|
DocumentService.update_parser_config(doc.id, {"metadata": req["metadata"]})
|
|
e, doc = DocumentService.get_by_id(doc.id)
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
|
|
return get_json_result(data=doc.to_dict())
|
|
|
|
|
|
@manager.route("/thumbnails", methods=["GET"]) # noqa: F821
|
|
# @login_required
|
|
def thumbnails():
|
|
doc_ids = request.args.getlist("doc_ids")
|
|
if not doc_ids:
|
|
return get_json_result(data=False, message='Lack of "Document ID"', code=RetCode.ARGUMENT_ERROR)
|
|
|
|
try:
|
|
docs = DocumentService.get_thumbnails(doc_ids)
|
|
|
|
for doc_item in docs:
|
|
if doc_item["thumbnail"] and not doc_item["thumbnail"].startswith(IMG_BASE64_PREFIX):
|
|
doc_item["thumbnail"] = f"/v1/document/image/{doc_item['kb_id']}-{doc_item['thumbnail']}"
|
|
|
|
return get_json_result(data={d["id"]: d["thumbnail"] for d in docs})
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/change_status", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("doc_ids", "status")
|
|
async def change_status():
|
|
req = await get_request_json()
|
|
doc_ids = req.get("doc_ids", [])
|
|
status = str(req.get("status", ""))
|
|
|
|
if status not in ["0", "1"]:
|
|
return get_json_result(data=False, message='"Status" must be either 0 or 1!', code=RetCode.ARGUMENT_ERROR)
|
|
|
|
result = {}
|
|
for doc_id in doc_ids:
|
|
if not DocumentService.accessible(doc_id, current_user.id):
|
|
result[doc_id] = {"error": "No authorization."}
|
|
continue
|
|
|
|
try:
|
|
e, doc = DocumentService.get_by_id(doc_id)
|
|
if not e:
|
|
result[doc_id] = {"error": "No authorization."}
|
|
continue
|
|
e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
|
|
if not e:
|
|
result[doc_id] = {"error": "Can't find this dataset!"}
|
|
continue
|
|
if not DocumentService.update_by_id(doc_id, {"status": str(status)}):
|
|
result[doc_id] = {"error": "Database error (Document update)!"}
|
|
continue
|
|
|
|
status_int = int(status)
|
|
if not settings.docStoreConn.update({"doc_id": doc_id}, {"available_int": status_int}, search.index_name(kb.tenant_id), doc.kb_id):
|
|
result[doc_id] = {"error": "Database error (docStore update)!"}
|
|
result[doc_id] = {"status": status}
|
|
except Exception as e:
|
|
result[doc_id] = {"error": f"Internal server error: {str(e)}"}
|
|
|
|
return get_json_result(data=result)
|
|
|
|
|
|
@manager.route("/rm", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("doc_id")
|
|
async def rm():
|
|
req = await get_request_json()
|
|
doc_ids = req["doc_id"]
|
|
if isinstance(doc_ids, str):
|
|
doc_ids = [doc_ids]
|
|
|
|
for doc_id in doc_ids:
|
|
if not DocumentService.accessible4deletion(doc_id, current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
|
|
errors = await asyncio.to_thread(FileService.delete_docs, doc_ids, current_user.id)
|
|
|
|
if errors:
|
|
return get_json_result(data=False, message=errors, code=RetCode.SERVER_ERROR)
|
|
|
|
return get_json_result(data=True)
|
|
|
|
|
|
@manager.route("/run", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("doc_ids", "run")
|
|
async def run():
|
|
req = await get_request_json()
|
|
try:
|
|
def _run_sync():
|
|
for doc_id in req["doc_ids"]:
|
|
if not DocumentService.accessible(doc_id, current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
|
|
kb_table_num_map = {}
|
|
for id in req["doc_ids"]:
|
|
info = {"run": str(req["run"]), "progress": 0}
|
|
if str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False):
|
|
info["progress_msg"] = ""
|
|
info["chunk_num"] = 0
|
|
info["token_num"] = 0
|
|
|
|
tenant_id = DocumentService.get_tenant_id(id)
|
|
if not tenant_id:
|
|
return get_data_error_result(message="Tenant not found!")
|
|
e, doc = DocumentService.get_by_id(id)
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
|
|
if str(req["run"]) == TaskStatus.CANCEL.value:
|
|
if str(doc.run) == TaskStatus.RUNNING.value:
|
|
cancel_all_task_of(id)
|
|
else:
|
|
return get_data_error_result(message="Cannot cancel a task that is not in RUNNING status")
|
|
if all([("delete" not in req or req["delete"]), str(req["run"]) == TaskStatus.RUNNING.value, str(doc.run) == TaskStatus.DONE.value]):
|
|
DocumentService.clear_chunk_num_when_rerun(doc.id)
|
|
|
|
DocumentService.update_by_id(id, info)
|
|
if req.get("delete", False):
|
|
TaskService.filter_delete([Task.doc_id == id])
|
|
if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id):
|
|
settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id)
|
|
|
|
if str(req["run"]) == TaskStatus.RUNNING.value:
|
|
if req.get("apply_kb"):
|
|
e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
|
|
if not e:
|
|
raise LookupError("Can't find this dataset!")
|
|
doc.parser_config["enable_metadata"] = kb.parser_config.get("enable_metadata", False)
|
|
doc.parser_config["metadata"] = kb.parser_config.get("metadata", {})
|
|
DocumentService.update_parser_config(doc.id, doc.parser_config)
|
|
doc_dict = doc.to_dict()
|
|
DocumentService.run(tenant_id, doc_dict, kb_table_num_map)
|
|
|
|
return get_json_result(data=True)
|
|
|
|
return await asyncio.to_thread(_run_sync)
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/rename", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("doc_id", "name")
|
|
async def rename():
|
|
req = await get_request_json()
|
|
try:
|
|
def _rename_sync():
|
|
if not DocumentService.accessible(req["doc_id"], current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
|
|
e, doc = DocumentService.get_by_id(req["doc_id"])
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
if pathlib.Path(req["name"].lower()).suffix != pathlib.Path(doc.name.lower()).suffix:
|
|
return get_json_result(data=False, message="The extension of file can't be changed", code=RetCode.ARGUMENT_ERROR)
|
|
if len(req["name"].encode("utf-8")) > FILE_NAME_LEN_LIMIT:
|
|
return get_json_result(data=False, message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
for d in DocumentService.query(name=req["name"], kb_id=doc.kb_id):
|
|
if d.name == req["name"]:
|
|
return get_data_error_result(message="Duplicated document name in the same dataset.")
|
|
|
|
if not DocumentService.update_by_id(req["doc_id"], {"name": req["name"]}):
|
|
return get_data_error_result(message="Database error (Document rename)!")
|
|
|
|
informs = File2DocumentService.get_by_document_id(req["doc_id"])
|
|
if informs:
|
|
e, file = FileService.get_by_id(informs[0].file_id)
|
|
FileService.update_by_id(file.id, {"name": req["name"]})
|
|
|
|
tenant_id = DocumentService.get_tenant_id(req["doc_id"])
|
|
title_tks = rag_tokenizer.tokenize(req["name"])
|
|
es_body = {
|
|
"docnm_kwd": req["name"],
|
|
"title_tks": title_tks,
|
|
"title_sm_tks": rag_tokenizer.fine_grained_tokenize(title_tks),
|
|
}
|
|
if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id):
|
|
settings.docStoreConn.update(
|
|
{"doc_id": req["doc_id"]},
|
|
es_body,
|
|
search.index_name(tenant_id),
|
|
doc.kb_id,
|
|
)
|
|
return get_json_result(data=True)
|
|
|
|
return await asyncio.to_thread(_rename_sync)
|
|
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/get/<doc_id>", methods=["GET"]) # noqa: F821
|
|
# @login_required
|
|
async def get(doc_id):
|
|
try:
|
|
e, doc = DocumentService.get_by_id(doc_id)
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
|
|
b, n = File2DocumentService.get_storage_address(doc_id=doc_id)
|
|
data = await asyncio.to_thread(settings.STORAGE_IMPL.get, b, n)
|
|
response = await make_response(data)
|
|
|
|
ext = re.search(r"\.([^.]+)$", doc.name.lower())
|
|
ext = ext.group(1) if ext else None
|
|
if ext:
|
|
if doc.type == FileType.VISUAL.value:
|
|
|
|
content_type = CONTENT_TYPE_MAP.get(ext, f"image/{ext}")
|
|
else:
|
|
content_type = CONTENT_TYPE_MAP.get(ext, f"application/{ext}")
|
|
response.headers.set("Content-Type", content_type)
|
|
return response
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/download/<attachment_id>", methods=["GET"]) # noqa: F821
|
|
@login_required
|
|
async def download_attachment(attachment_id):
|
|
try:
|
|
ext = request.args.get("ext", "markdown")
|
|
data = await asyncio.to_thread(settings.STORAGE_IMPL.get, current_user.id, attachment_id)
|
|
response = await make_response(data)
|
|
response.headers.set("Content-Type", CONTENT_TYPE_MAP.get(ext, f"application/{ext}"))
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/change_parser", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("doc_id")
|
|
async def change_parser():
|
|
|
|
req = await get_request_json()
|
|
if not DocumentService.accessible(req["doc_id"], current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
|
|
e, doc = DocumentService.get_by_id(req["doc_id"])
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
|
|
def reset_doc():
|
|
nonlocal doc
|
|
e = DocumentService.update_by_id(doc.id, {"pipeline_id": req["pipeline_id"], "parser_id": req["parser_id"], "progress": 0, "progress_msg": "", "run": TaskStatus.UNSTART.value})
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
if doc.token_num > 0:
|
|
e = DocumentService.increment_chunk_num(doc.id, doc.kb_id, doc.token_num * -1, doc.chunk_num * -1, doc.process_duration * -1)
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
tenant_id = DocumentService.get_tenant_id(req["doc_id"])
|
|
if not tenant_id:
|
|
return get_data_error_result(message="Tenant not found!")
|
|
DocumentService.delete_chunk_images(doc, tenant_id)
|
|
if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id):
|
|
settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
|
|
return None
|
|
|
|
try:
|
|
if "pipeline_id" in req and req["pipeline_id"] != "":
|
|
if doc.pipeline_id == req["pipeline_id"]:
|
|
return get_json_result(data=True)
|
|
DocumentService.update_by_id(doc.id, {"pipeline_id": req["pipeline_id"]})
|
|
reset_doc()
|
|
return get_json_result(data=True)
|
|
|
|
if doc.parser_id.lower() == req["parser_id"].lower():
|
|
if "parser_config" in req:
|
|
if req["parser_config"] == doc.parser_config:
|
|
return get_json_result(data=True)
|
|
else:
|
|
return get_json_result(data=True)
|
|
|
|
if (doc.type == FileType.VISUAL and req["parser_id"] != "picture") or (re.search(r"\.(ppt|pptx|pages)$", doc.name) and req["parser_id"] != "presentation"):
|
|
return get_data_error_result(message="Not supported yet!")
|
|
if "parser_config" in req:
|
|
DocumentService.update_parser_config(doc.id, req["parser_config"])
|
|
reset_doc()
|
|
return get_json_result(data=True)
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/image/<image_id>", methods=["GET"]) # noqa: F821
|
|
# @login_required
|
|
async def get_image(image_id):
|
|
try:
|
|
arr = image_id.split("-")
|
|
if len(arr) != 2:
|
|
return get_data_error_result(message="Image not found.")
|
|
bkt, nm = image_id.split("-")
|
|
data = await asyncio.to_thread(settings.STORAGE_IMPL.get, bkt, nm)
|
|
response = await make_response(data)
|
|
response.headers.set("Content-Type", "image/JPEG")
|
|
return response
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/upload_and_parse", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("conversation_id")
|
|
async def upload_and_parse():
|
|
files = await request.files
|
|
if "file" not in files:
|
|
return get_json_result(data=False, message="No file part!", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
file_objs = files.getlist("file")
|
|
for file_obj in file_objs:
|
|
if file_obj.filename == "":
|
|
return get_json_result(data=False, message="No file selected!", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
form = await request.form
|
|
doc_ids = doc_upload_and_parse(form.get("conversation_id"), file_objs, current_user.id)
|
|
return get_json_result(data=doc_ids)
|
|
|
|
|
|
@manager.route("/parse", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
async def parse():
|
|
req = await get_request_json()
|
|
url = req.get("url", "")
|
|
if url:
|
|
if not is_valid_url(url):
|
|
return get_json_result(data=False, message="The URL format is invalid", code=RetCode.ARGUMENT_ERROR)
|
|
download_path = os.path.join(get_project_base_directory(), "logs/downloads")
|
|
os.makedirs(download_path, exist_ok=True)
|
|
from seleniumwire.webdriver import Chrome, ChromeOptions
|
|
|
|
options = ChromeOptions()
|
|
options.add_argument("--headless")
|
|
options.add_argument("--disable-gpu")
|
|
options.add_argument("--no-sandbox")
|
|
options.add_argument("--disable-dev-shm-usage")
|
|
options.add_experimental_option("prefs", {"download.default_directory": download_path, "download.prompt_for_download": False, "download.directory_upgrade": True, "safebrowsing.enabled": True})
|
|
driver = Chrome(options=options)
|
|
driver.get(url)
|
|
res_headers = [r.response.headers for r in driver.requests if r and r.response]
|
|
if len(res_headers) > 1:
|
|
sections = RAGFlowHtmlParser().parser_txt(driver.page_source)
|
|
driver.quit()
|
|
return get_json_result(data="\n".join(sections))
|
|
|
|
class File:
|
|
filename: str
|
|
filepath: str
|
|
|
|
def __init__(self, filename, filepath):
|
|
self.filename = filename
|
|
self.filepath = filepath
|
|
|
|
def read(self):
|
|
with open(self.filepath, "rb") as f:
|
|
return f.read()
|
|
|
|
r = re.search(r"filename=\"([^\"]+)\"", str(res_headers))
|
|
if not r or not r.group(1):
|
|
return get_json_result(data=False, message="Can't not identify downloaded file", code=RetCode.ARGUMENT_ERROR)
|
|
f = File(r.group(1), os.path.join(download_path, r.group(1)))
|
|
txt = FileService.parse_docs([f], current_user.id)
|
|
return get_json_result(data=txt)
|
|
|
|
files = await request.files
|
|
if "file" not in files:
|
|
return get_json_result(data=False, message="No file part!", code=RetCode.ARGUMENT_ERROR)
|
|
|
|
file_objs = files.getlist("file")
|
|
txt = FileService.parse_docs(file_objs, current_user.id)
|
|
|
|
return get_json_result(data=txt)
|
|
|
|
|
|
@manager.route("/set_meta", methods=["POST"]) # noqa: F821
|
|
@login_required
|
|
@validate_request("doc_id", "meta")
|
|
async def set_meta():
|
|
req = await get_request_json()
|
|
if not DocumentService.accessible(req["doc_id"], current_user.id):
|
|
return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
|
|
try:
|
|
meta = json.loads(req["meta"])
|
|
if not isinstance(meta, dict):
|
|
return get_json_result(data=False, message="Only dictionary type supported.", code=RetCode.ARGUMENT_ERROR)
|
|
for k, v in meta.items():
|
|
if isinstance(v, list):
|
|
if not all(isinstance(i, (str, int, float)) for i in v):
|
|
return get_json_result(data=False, message=f"The type is not supported in list: {v}", code=RetCode.ARGUMENT_ERROR)
|
|
elif not isinstance(v, (str, int, float)):
|
|
return get_json_result(data=False, message=f"The type is not supported: {v}", code=RetCode.ARGUMENT_ERROR)
|
|
except Exception as e:
|
|
return get_json_result(data=False, message=f"Json syntax error: {e}", code=RetCode.ARGUMENT_ERROR)
|
|
if not isinstance(meta, dict):
|
|
return get_json_result(data=False, message='Meta data should be in Json map format, like {"key": "value"}', code=RetCode.ARGUMENT_ERROR)
|
|
|
|
try:
|
|
e, doc = DocumentService.get_by_id(req["doc_id"])
|
|
if not e:
|
|
return get_data_error_result(message="Document not found!")
|
|
|
|
if not DocumentService.update_by_id(req["doc_id"], {"meta_fields": meta}):
|
|
return get_data_error_result(message="Database error (meta updates)!")
|
|
|
|
return get_json_result(data=True)
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/upload_info", methods=["POST"]) # noqa: F821
|
|
async def upload_info():
|
|
files = await request.files
|
|
file = files['file'] if files and files.get("file") else None
|
|
try:
|
|
return get_json_result(data=FileService.upload_info(current_user.id, file, request.args.get("url")))
|
|
except Exception as e:
|
|
return server_error_response(e)
|