diff --git a/api/apps/file_app.py b/api/apps/file_app.py index a6e0b4a7a..7828a82e6 100644 --- a/api/apps/file_app.py +++ b/api/apps/file_app.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License # +import logging import os import pathlib import re @@ -234,54 +235,63 @@ def get_all_parent_folders(): return server_error_response(e) -@manager.route('/rm', methods=['POST']) # noqa: F821 +@manager.route("/rm", methods=["POST"]) # noqa: F821 @login_required @validate_request("file_ids") def rm(): req = request.json file_ids = req["file_ids"] + + def _delete_single_file(file): + try: + if file.location: + STORAGE_IMPL.rm(file.parent_id, file.location) + except Exception: + logging.exception(f"Fail to remove object: {file.parent_id}/{file.location}") + + informs = File2DocumentService.get_by_file_id(file.id) + for inform in informs: + doc_id = inform.document_id + e, doc = DocumentService.get_by_id(doc_id) + if e and doc: + tenant_id = DocumentService.get_tenant_id(doc_id) + if tenant_id: + DocumentService.remove_document(doc, tenant_id) + File2DocumentService.delete_by_file_id(file.id) + + FileService.delete(file) + + def _delete_folder_recursive(folder, tenant_id): + sub_files = FileService.list_all_files_by_parent_id(folder.id) + for sub_file in sub_files: + if sub_file.type == FileType.FOLDER.value: + _delete_folder_recursive(sub_file, tenant_id) + else: + _delete_single_file(sub_file) + + FileService.delete(folder) + try: for file_id in file_ids: e, file = FileService.get_by_id(file_id) - if not e: + if not e or not file: return get_data_error_result(message="File or Folder not found!") if not file.tenant_id: return get_data_error_result(message="Tenant not found!") if not check_file_team_permission(file, current_user.id): - return get_json_result(data=False, message='No authorization.', code=settings.RetCode.AUTHENTICATION_ERROR) + return get_json_result(data=False, message="No authorization.", code=settings.RetCode.AUTHENTICATION_ERROR) + if file.source_type == FileSource.KNOWLEDGEBASE: continue if file.type == FileType.FOLDER.value: - file_id_list = FileService.get_all_innermost_file_ids(file_id, []) - for inner_file_id in file_id_list: - e, file = FileService.get_by_id(inner_file_id) - if not e: - return get_data_error_result(message="File not found!") - STORAGE_IMPL.rm(file.parent_id, file.location) - FileService.delete_folder_by_pf_id(current_user.id, file_id) - else: - STORAGE_IMPL.rm(file.parent_id, file.location) - if not FileService.delete(file): - return get_data_error_result( - message="Database error (File removal)!") + _delete_folder_recursive(file, current_user.id) + continue - # delete file2document - informs = File2DocumentService.get_by_file_id(file_id) - for inform in informs: - doc_id = inform.document_id - e, doc = DocumentService.get_by_id(doc_id) - if not e: - return get_data_error_result(message="Document not found!") - tenant_id = DocumentService.get_tenant_id(doc_id) - if not tenant_id: - return get_data_error_result(message="Tenant not found!") - if not DocumentService.remove_document(doc, tenant_id): - return get_data_error_result( - message="Database error (Document removal)!") - File2DocumentService.delete_by_file_id(file_id) + _delete_single_file(file) return get_json_result(data=True) + except Exception as e: return server_error_response(e) @@ -355,31 +365,89 @@ def get(file_id): return server_error_response(e) -@manager.route('/mv', methods=['POST']) # noqa: F821 +@manager.route("/mv", methods=["POST"]) # noqa: F821 @login_required @validate_request("src_file_ids", "dest_file_id") def move(): req = request.json try: file_ids = req["src_file_ids"] - parent_id = req["dest_file_id"] + dest_parent_id = req["dest_file_id"] + + ok, dest_folder = FileService.get_by_id(dest_parent_id) + if not ok or not dest_folder: + return get_data_error_result(message="Parent Folder not found!") + files = FileService.get_by_ids(file_ids) - files_dict = {} - for file in files: - files_dict[file.id] = file + if not files: + return get_data_error_result(message="Source files not found!") + + files_dict = {f.id: f for f in files} for file_id in file_ids: - file = files_dict[file_id] + file = files_dict.get(file_id) if not file: return get_data_error_result(message="File or Folder not found!") if not file.tenant_id: return get_data_error_result(message="Tenant not found!") if not check_file_team_permission(file, current_user.id): - return get_json_result(data=False, message='No authorization.', code=settings.RetCode.AUTHENTICATION_ERROR) - fe, _ = FileService.get_by_id(parent_id) - if not fe: - return get_data_error_result(message="Parent Folder not found!") - FileService.move_file(file_ids, parent_id) + return get_json_result( + data=False, + message="No authorization.", + code=settings.RetCode.AUTHENTICATION_ERROR, + ) + + def _move_entry_recursive(source_file_entry, dest_folder): + if source_file_entry.type == FileType.FOLDER.value: + existing_folder = FileService.query(name=source_file_entry.name, parent_id=dest_folder.id) + if existing_folder: + new_folder = existing_folder[0] + else: + new_folder = FileService.insert( + { + "id": get_uuid(), + "parent_id": dest_folder.id, + "tenant_id": source_file_entry.tenant_id, + "created_by": current_user.id, + "name": source_file_entry.name, + "location": "", + "size": 0, + "type": FileType.FOLDER.value, + } + ) + + sub_files = FileService.list_all_files_by_parent_id(source_file_entry.id) + for sub_file in sub_files: + _move_entry_recursive(sub_file, new_folder) + + FileService.delete_by_id(source_file_entry.id) + return + + old_parent_id = source_file_entry.parent_id + old_location = source_file_entry.location + filename = source_file_entry.name + + new_location = filename + while STORAGE_IMPL.obj_exist(dest_folder.id, new_location): + new_location += "_" + + try: + STORAGE_IMPL.move(old_parent_id, old_location, dest_folder.id, new_location) + except Exception as storage_err: + raise RuntimeError(f"Move file failed at storage layer: {str(storage_err)}") + + FileService.update_by_id( + source_file_entry.id, + { + "parent_id": dest_folder.id, + "location": new_location, + }, + ) + + for file in files: + _move_entry_recursive(file, dest_folder) + return get_json_result(data=True) + except Exception as e: return server_error_response(e) diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py index 7c8d91de2..c6b63564f 100644 --- a/api/db/services/file_service.py +++ b/api/db/services/file_service.py @@ -476,6 +476,16 @@ class FileService(CommonService): return err, files + @classmethod + @DB.connection_context() + def list_all_files_by_parent_id(cls, parent_id): + try: + files = cls.model.select().where((cls.model.parent_id == parent_id) & (cls.model.id != parent_id)) + return list(files) + except Exception: + logging.exception("list_by_parent_id failed") + raise RuntimeError("Database error (list_by_parent_id)!") + @staticmethod def parse_docs(file_objs, user_id): exe = ThreadPoolExecutor(max_workers=12) diff --git a/rag/utils/minio_conn.py b/rag/utils/minio_conn.py index 9d1aaccf2..ff15c2bbf 100644 --- a/rag/utils/minio_conn.py +++ b/rag/utils/minio_conn.py @@ -17,6 +17,7 @@ import logging import time from minio import Minio +from minio.commonconfig import CopySource from minio.error import S3Error from io import BytesIO from rag import settings @@ -141,3 +142,36 @@ class RAGFlowMinio: except Exception: logging.exception(f"Fail to remove bucket {bucket}") + def copy(self, src_bucket, src_path, dest_bucket, dest_path): + try: + if not self.conn.bucket_exists(dest_bucket): + self.conn.make_bucket(dest_bucket) + + try: + self.conn.stat_object(src_bucket, src_path) + except Exception as e: + logging.exception(f"Source object not found: {src_bucket}/{src_path}, {e}") + return False + + self.conn.copy_object( + dest_bucket, + dest_path, + CopySource(src_bucket, src_path), + ) + return True + + except Exception: + logging.exception(f"Fail to copy {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}") + return False + + def move(self, src_bucket, src_path, dest_bucket, dest_path): + try: + if self.copy(src_bucket, src_path, dest_bucket, dest_path): + self.rm(src_bucket, src_path) + return True + else: + logging.error(f"Copy failed, move aborted: {src_bucket}/{src_path}") + return False + except Exception: + logging.exception(f"Fail to move {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}") + return False