Improve file management (#10577)

### What problem does this PR solve?

Improve file management. #10287.

Passed tests:

1. Create folder `A` and `B`.
2. Upload a file inside `A`, called `file`.
3. Create a KB, called `K`.
3. Link `file` to `K`.
4. Parse `file` inside of `K`. (OK)
5. Move `file` from `A` to `B`.
6. Parse `file` inside of `K`. (OK)
7. Move `file` from `B` to `A`.
8. Parse `file` inside of `K`. (OK)
9. Move entire folder `A` into `B`. (B -> A -> file)
10. Parse `file` inside of `K`. (OK)
11. Delete folder `B`.
12. All clear. (There is no document inside of `K`)

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
Yongteng Lei
2025-10-16 09:38:25 +08:00
committed by GitHub
parent 1c38f4cefb
commit 86b254d214
3 changed files with 152 additions and 40 deletions

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License # limitations under the License
# #
import logging
import os import os
import pathlib import pathlib
import re import re
@ -234,54 +235,63 @@ def get_all_parent_folders():
return server_error_response(e) return server_error_response(e)
@manager.route('/rm', methods=['POST']) # noqa: F821 @manager.route("/rm", methods=["POST"]) # noqa: F821
@login_required @login_required
@validate_request("file_ids") @validate_request("file_ids")
def rm(): def rm():
req = request.json req = request.json
file_ids = req["file_ids"] file_ids = req["file_ids"]
def _delete_single_file(file):
try:
if file.location:
STORAGE_IMPL.rm(file.parent_id, file.location)
except Exception:
logging.exception(f"Fail to remove object: {file.parent_id}/{file.location}")
informs = File2DocumentService.get_by_file_id(file.id)
for inform in informs:
doc_id = inform.document_id
e, doc = DocumentService.get_by_id(doc_id)
if e and doc:
tenant_id = DocumentService.get_tenant_id(doc_id)
if tenant_id:
DocumentService.remove_document(doc, tenant_id)
File2DocumentService.delete_by_file_id(file.id)
FileService.delete(file)
def _delete_folder_recursive(folder, tenant_id):
sub_files = FileService.list_all_files_by_parent_id(folder.id)
for sub_file in sub_files:
if sub_file.type == FileType.FOLDER.value:
_delete_folder_recursive(sub_file, tenant_id)
else:
_delete_single_file(sub_file)
FileService.delete(folder)
try: try:
for file_id in file_ids: for file_id in file_ids:
e, file = FileService.get_by_id(file_id) e, file = FileService.get_by_id(file_id)
if not e: if not e or not file:
return get_data_error_result(message="File or Folder not found!") return get_data_error_result(message="File or Folder not found!")
if not file.tenant_id: if not file.tenant_id:
return get_data_error_result(message="Tenant not found!") return get_data_error_result(message="Tenant not found!")
if not check_file_team_permission(file, current_user.id): if not check_file_team_permission(file, current_user.id):
return get_json_result(data=False, message='No authorization.', code=settings.RetCode.AUTHENTICATION_ERROR) return get_json_result(data=False, message="No authorization.", code=settings.RetCode.AUTHENTICATION_ERROR)
if file.source_type == FileSource.KNOWLEDGEBASE: if file.source_type == FileSource.KNOWLEDGEBASE:
continue continue
if file.type == FileType.FOLDER.value: if file.type == FileType.FOLDER.value:
file_id_list = FileService.get_all_innermost_file_ids(file_id, []) _delete_folder_recursive(file, current_user.id)
for inner_file_id in file_id_list: continue
e, file = FileService.get_by_id(inner_file_id)
if not e:
return get_data_error_result(message="File not found!")
STORAGE_IMPL.rm(file.parent_id, file.location)
FileService.delete_folder_by_pf_id(current_user.id, file_id)
else:
STORAGE_IMPL.rm(file.parent_id, file.location)
if not FileService.delete(file):
return get_data_error_result(
message="Database error (File removal)!")
# delete file2document _delete_single_file(file)
informs = File2DocumentService.get_by_file_id(file_id)
for inform in informs:
doc_id = inform.document_id
e, doc = DocumentService.get_by_id(doc_id)
if not e:
return get_data_error_result(message="Document not found!")
tenant_id = DocumentService.get_tenant_id(doc_id)
if not tenant_id:
return get_data_error_result(message="Tenant not found!")
if not DocumentService.remove_document(doc, tenant_id):
return get_data_error_result(
message="Database error (Document removal)!")
File2DocumentService.delete_by_file_id(file_id)
return get_json_result(data=True) return get_json_result(data=True)
except Exception as e: except Exception as e:
return server_error_response(e) return server_error_response(e)
@ -355,31 +365,89 @@ def get(file_id):
return server_error_response(e) return server_error_response(e)
@manager.route('/mv', methods=['POST']) # noqa: F821 @manager.route("/mv", methods=["POST"]) # noqa: F821
@login_required @login_required
@validate_request("src_file_ids", "dest_file_id") @validate_request("src_file_ids", "dest_file_id")
def move(): def move():
req = request.json req = request.json
try: try:
file_ids = req["src_file_ids"] file_ids = req["src_file_ids"]
parent_id = req["dest_file_id"] dest_parent_id = req["dest_file_id"]
ok, dest_folder = FileService.get_by_id(dest_parent_id)
if not ok or not dest_folder:
return get_data_error_result(message="Parent Folder not found!")
files = FileService.get_by_ids(file_ids) files = FileService.get_by_ids(file_ids)
files_dict = {} if not files:
for file in files: return get_data_error_result(message="Source files not found!")
files_dict[file.id] = file
files_dict = {f.id: f for f in files}
for file_id in file_ids: for file_id in file_ids:
file = files_dict[file_id] file = files_dict.get(file_id)
if not file: if not file:
return get_data_error_result(message="File or Folder not found!") return get_data_error_result(message="File or Folder not found!")
if not file.tenant_id: if not file.tenant_id:
return get_data_error_result(message="Tenant not found!") return get_data_error_result(message="Tenant not found!")
if not check_file_team_permission(file, current_user.id): if not check_file_team_permission(file, current_user.id):
return get_json_result(data=False, message='No authorization.', code=settings.RetCode.AUTHENTICATION_ERROR) return get_json_result(
fe, _ = FileService.get_by_id(parent_id) data=False,
if not fe: message="No authorization.",
return get_data_error_result(message="Parent Folder not found!") code=settings.RetCode.AUTHENTICATION_ERROR,
FileService.move_file(file_ids, parent_id) )
def _move_entry_recursive(source_file_entry, dest_folder):
if source_file_entry.type == FileType.FOLDER.value:
existing_folder = FileService.query(name=source_file_entry.name, parent_id=dest_folder.id)
if existing_folder:
new_folder = existing_folder[0]
else:
new_folder = FileService.insert(
{
"id": get_uuid(),
"parent_id": dest_folder.id,
"tenant_id": source_file_entry.tenant_id,
"created_by": current_user.id,
"name": source_file_entry.name,
"location": "",
"size": 0,
"type": FileType.FOLDER.value,
}
)
sub_files = FileService.list_all_files_by_parent_id(source_file_entry.id)
for sub_file in sub_files:
_move_entry_recursive(sub_file, new_folder)
FileService.delete_by_id(source_file_entry.id)
return
old_parent_id = source_file_entry.parent_id
old_location = source_file_entry.location
filename = source_file_entry.name
new_location = filename
while STORAGE_IMPL.obj_exist(dest_folder.id, new_location):
new_location += "_"
try:
STORAGE_IMPL.move(old_parent_id, old_location, dest_folder.id, new_location)
except Exception as storage_err:
raise RuntimeError(f"Move file failed at storage layer: {str(storage_err)}")
FileService.update_by_id(
source_file_entry.id,
{
"parent_id": dest_folder.id,
"location": new_location,
},
)
for file in files:
_move_entry_recursive(file, dest_folder)
return get_json_result(data=True) return get_json_result(data=True)
except Exception as e: except Exception as e:
return server_error_response(e) return server_error_response(e)

View File

@ -476,6 +476,16 @@ class FileService(CommonService):
return err, files return err, files
@classmethod
@DB.connection_context()
def list_all_files_by_parent_id(cls, parent_id):
try:
files = cls.model.select().where((cls.model.parent_id == parent_id) & (cls.model.id != parent_id))
return list(files)
except Exception:
logging.exception("list_by_parent_id failed")
raise RuntimeError("Database error (list_by_parent_id)!")
@staticmethod @staticmethod
def parse_docs(file_objs, user_id): def parse_docs(file_objs, user_id):
exe = ThreadPoolExecutor(max_workers=12) exe = ThreadPoolExecutor(max_workers=12)

View File

@ -17,6 +17,7 @@
import logging import logging
import time import time
from minio import Minio from minio import Minio
from minio.commonconfig import CopySource
from minio.error import S3Error from minio.error import S3Error
from io import BytesIO from io import BytesIO
from rag import settings from rag import settings
@ -141,3 +142,36 @@ class RAGFlowMinio:
except Exception: except Exception:
logging.exception(f"Fail to remove bucket {bucket}") logging.exception(f"Fail to remove bucket {bucket}")
def copy(self, src_bucket, src_path, dest_bucket, dest_path):
try:
if not self.conn.bucket_exists(dest_bucket):
self.conn.make_bucket(dest_bucket)
try:
self.conn.stat_object(src_bucket, src_path)
except Exception as e:
logging.exception(f"Source object not found: {src_bucket}/{src_path}, {e}")
return False
self.conn.copy_object(
dest_bucket,
dest_path,
CopySource(src_bucket, src_path),
)
return True
except Exception:
logging.exception(f"Fail to copy {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}")
return False
def move(self, src_bucket, src_path, dest_bucket, dest_path):
try:
if self.copy(src_bucket, src_path, dest_bucket, dest_path):
self.rm(src_bucket, src_path)
return True
else:
logging.error(f"Copy failed, move aborted: {src_bucket}/{src_path}")
return False
except Exception:
logging.exception(f"Fail to move {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}")
return False