# Extracted from git patch 6722b3d558b3d305af3f1333655563f09e9f128f
# From: JobSmithManipulation <143315462+JobSmithManipulation@users.noreply.github.com>
# Date: Thu, 12 Sep 2024 14:19:45 +0800
# Subject: [PATCH] update sdk document (#2374)
# Type of change: New Feature (non-breaking change which adds functionality)
# Co-authored-by: Kevin Hu
#
# Files touched by the patch (472 insertions, 2 deletions):
#   api/apps/sdk/doc.py                    | 180 (new file)
#   sdk/python/ragflow/__init__.py         |   3
#   sdk/python/ragflow/modules/dataset.py  |  40
#   sdk/python/ragflow/modules/document.py |  75 (new file)
#   sdk/python/ragflow/ragflow.py          |  31
#   sdk/python/test/ragflow.txt            |   1 (new file)
#   sdk/python/test/t_document.py          | 144 (new file)
#
# ---- api/apps/sdk/doc.py (new file) ----
"""HTTP endpoints for uploading, inspecting, updating, downloading, listing and deleting documents."""
from io import BytesIO

from flask import request, send_file

from api.db import FileType, ParserType, FileSource, TaskStatus
from api.db.db_models import File
from api.db.services.document_service import DocumentService
from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.user_service import TenantService, UserTenantService
from api.settings import RetCode
from api.utils.api_utils import (
    construct_error_response,
    construct_json_result,
    get_data_error_result,
    get_json_result,
    server_error_response,
    token_required,
)
from rag.utils.storage_factory import STORAGE_IMPL

# NOTE(review): `manager` is neither defined nor imported in this module;
# presumably the application injects it before these routes are registered -- confirm.


@manager.route('/dataset/<dataset_id>/documents/upload', methods=['POST'])
@token_required
def upload(dataset_id, tenant_id):
    """Upload every file sent in the multipart field ``file`` into a dataset.

    Args:
        dataset_id: ID of the target knowledgebase, taken from the URL.
        tenant_id: Supplied by ``token_required``.

    Returns:
        A JSON result with ``data=True`` on success, or an error result when
        no file part is present, a filename is empty, or the upload reports errors.

    Raises:
        LookupError: If no knowledgebase exists for ``dataset_id``.
    """
    if 'file' not in request.files:
        return get_json_result(
            data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)

    file_objs = request.files.getlist('file')
    if any(file_obj.filename == '' for file_obj in file_objs):
        return get_json_result(
            data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)

    e, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not e:
        raise LookupError(f"Can't find the knowledgebase with ID {dataset_id}!")

    err, _ = FileService.upload_document(kb, file_objs, tenant_id)
    if err:
        return get_json_result(
            data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR)
    return get_json_result(data=True)


@manager.route('/infos', methods=['GET'])
@token_required
def docinfos(tenant_id):
    """Return one document as JSON, looked up by ``id`` or (otherwise) by ``name``.

    Args:
        tenant_id: Supplied by ``token_required``; not used by the lookup itself.

    Returns:
        A JSON result carrying ``doc.to_json()``, or a data-error result when
        neither query argument is given or the document cannot be found.
    """
    req = request.args
    if "id" in req:
        doc_id = req["id"]
    elif "name" in req:
        doc_id = DocumentService.get_doc_id_by_doc_name(req["name"])
    else:
        # Without this guard the view would fall through and return None.
        return get_data_error_result(retmsg="Either 'id' or 'name' is required")

    e, doc = DocumentService.get_by_id(doc_id)
    if not e:
        return get_data_error_result(retmsg="Document not found!")
    return get_json_result(data=doc.to_json())


@manager.route('/save', methods=['POST'])
@token_required
def save_doc(tenant_id):
    """Update a document with the fields of the JSON request body.

    The document is addressed by ``id`` when present, otherwise by ``name``.
    Preferring ``id`` matches ``docinfos`` and lets a client send a changed
    ``name`` together with the ``id`` of the document being updated.

    Args:
        tenant_id: Supplied by ``token_required``; not used by the update itself.

    Returns:
        A JSON result with ``updated_count`` on success, a 404 result when no
        row was updated, or an error result when the update raises.
    """
    req = request.json  # Expecting JSON input
    if "id" in req:
        doc_id = req["id"]
    elif "name" in req:
        doc_id = DocumentService.get_doc_id_by_doc_name(req["name"])
    else:
        # Previously `doc_id` stayed unbound here and the update raised NameError.
        return get_data_error_result(retmsg="Either 'id' or 'name' is required")

    try:
        num = DocumentService.update_by_id(doc_id, req)
        if num > 0:
            return get_json_result(retmsg="success", data={"updated_count": num})
        return get_json_result(retcode=404, retmsg="Document not found")
    except Exception as e:
        # Report the failure with an explicit error code rather than the default one.
        return get_json_result(
            data=False, retmsg=f"Error occurred: {str(e)}", retcode=RetCode.SERVER_ERROR)


@manager.route("/<dataset_id>/documents/<document_id>", methods=["GET"])
@token_required
def download_document(dataset_id, document_id, tenant_id=None):
    """Stream a stored document back to the client as an attachment.

    Args:
        dataset_id: ID of the dataset, taken from the URL.
        document_id: ID of the document, taken from the URL.
        tenant_id: Passed as a keyword by ``token_required``. The handler did
            not accept it before, which produced the ``TypeError`` recorded in
            ``sdk/python/test/ragflow.txt``.

    Returns:
        The file via ``send_file`` (MIME type ``application/octet-stream``), or
        a JSON error result when the dataset, document or file content is
        missing or any step raises.
    """
    try:
        # Check whether there is this dataset
        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
        if not exist:
            return construct_json_result(code=RetCode.DATA_ERROR,
                                         message=f"This dataset '{dataset_id}' cannot be found!")

        # Check whether there is this document
        exist, document = DocumentService.get_by_id(document_id)
        if not exist:
            return construct_json_result(message=f"This document '{document_id}' cannot be found!",
                                         code=RetCode.ARGUMENT_ERROR)

        # Resolve the storage address of the document and fetch its bytes
        doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id)
        file_stream = STORAGE_IMPL.get(doc_id, doc_location)
        if not file_stream:
            return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)

        return send_file(
            BytesIO(file_stream),
            as_attachment=True,
            download_name=document.name,
            mimetype='application/octet-stream'  # Set a default MIME type
        )
    except Exception as e:
        return construct_error_response(e)


@manager.route('/dataset/<dataset_id>/documents', methods=['GET'])
@token_required
def list_docs(dataset_id, tenant_id):
    """List the documents of a knowledgebase, one page at a time.

    Query arguments: ``kb_id`` (falls back to the ``dataset_id`` of the URL),
    ``keywords``, ``page`` (default 1), ``page_size`` (default 15),
    ``orderby`` (default ``create_time``) and ``desc`` (default true).

    Args:
        dataset_id: ID of the dataset, taken from the URL.
        tenant_id: Supplied by ``token_required``; used to check ownership.

    Returns:
        A JSON result ``{"total": ..., "docs": ...}``, or an error result when
        the ID is missing, the caller does not own the knowledgebase, the
        paging arguments are not integers, or the query raises.
    """
    kb_id = request.args.get("kb_id") or dataset_id
    if not kb_id:
        return get_json_result(
            data=False, retmsg='Lack of "KB ID"', retcode=RetCode.ARGUMENT_ERROR)

    tenants = UserTenantService.query(user_id=tenant_id)
    owns_kb = any(
        KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id)
        for tenant in tenants
    )
    if not owns_kb:
        return get_json_result(
            data=False, retmsg='Only owner of knowledgebase authorized for this operation.',
            retcode=RetCode.OPERATING_ERROR)

    keywords = request.args.get("keywords", "")
    try:
        page_number = int(request.args.get("page", 1))
        items_per_page = int(request.args.get("page_size", 15))
    except ValueError:
        return get_json_result(
            data=False, retmsg='"page" and "page_size" must be integers',
            retcode=RetCode.ARGUMENT_ERROR)
    orderby = request.args.get("orderby", "create_time")

    # Query values arrive as strings, so "false" must be parsed explicitly;
    # a bare truthiness check would treat every supplied value as True.
    desc_arg = request.args.get("desc")
    desc = True if desc_arg is None else desc_arg.strip().lower() not in ("false", "0", "no")

    try:
        docs, tol = DocumentService.get_by_kb_id(
            kb_id, page_number, items_per_page, orderby, desc, keywords)
        return get_json_result(data={"total": tol, "docs": docs})
    except Exception as e:
        return server_error_response(e)


@manager.route('/delete', methods=['DELETE'])
@token_required
def rm(tenant_id):
    """Delete the documents named by the ``doc_id`` query argument(s).

    For each document the storage address is read first, then the document row,
    the linked ``File`` row and the file-to-document mapping are removed, and
    finally the stored object itself.

    Args:
        tenant_id: Supplied by ``token_required``.

    Returns:
        A JSON result with ``data=True`` on success; a data-error result as
        soon as a document, its tenant or its removal fails; or a server-error
        result collecting the messages of exceptions raised along the way.
    """
    # getlist() returns every repeated `doc_id` value; indexing returns only the first.
    doc_ids = request.args.getlist("doc_id")
    if not doc_ids:
        return get_data_error_result(
            retmsg="doc_id is required")

    root_folder = FileService.get_root_folder(tenant_id)
    pf_id = root_folder["id"]
    FileService.init_knowledgebase_docs(pf_id, tenant_id)

    errors = []
    for doc_id in doc_ids:
        try:
            e, doc = DocumentService.get_by_id(doc_id)
            if not e:
                return get_data_error_result(retmsg="Document not found!")
            doc_tenant_id = DocumentService.get_tenant_id(doc_id)
            if not doc_tenant_id:
                return get_data_error_result(retmsg="Tenant not found!")

            # Read the storage address before the mapping rows are deleted.
            b, n = File2DocumentService.get_minio_address(doc_id=doc_id)

            if not DocumentService.remove_document(doc, doc_tenant_id):
                return get_data_error_result(
                    retmsg="Database error (Document removal)!")

            f2d = File2DocumentService.get_by_document_id(doc_id)
            FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
            File2DocumentService.delete_by_document_id(doc_id)

            STORAGE_IMPL.rm(b, n)
        except Exception as e:
            errors.append(str(e))

    if errors:
        return get_json_result(data=False, retmsg="\n".join(errors), retcode=RetCode.SERVER_ERROR)

    return get_json_result(data=True, retmsg="success")
# ---- sdk/python/ragflow/__init__.py (hunk @@ -5,4 +5,5 @@) ----
# The patch adds the Document export after the existing ones.
from .ragflow import RAGFlow
from .modules.dataset import DataSet
from .modules.assistant import Assistant
from .modules.session import Session
from .modules.document import Document


# ---- sdk/python/ragflow/modules/dataset.py ----
from typing import Optional, List

from .document import Document

from .base import Base


class DataSet(Base):
    # Only the method added by this patch is shown here; the other members of
    # DataSet lie outside the patch hunks.

    def list_docs(self, keywords: Optional[str] = None, offset: int = 0, limit: int = -1) -> List[Document]:
        """
        List the documents in the dataset, optionally filtered by keywords.

        Args:
            keywords (Optional[str]): Keywords to filter the documents. Defaults to None.
            offset (int): The starting point for pagination. Defaults to 0.
            limit (int): The maximum number of documents to return. Defaults to -1 (no limit).

        Returns:
            List[Document]: One Document per entry of the response's "docs" list.

        Raises:
            Exception: With the server's "retmsg" when it is not "success".
        """
        # NOTE(review): the server handler added by this same patch reads the
        # query arguments `page` / `page_size`, not `offset` / `limit` --
        # confirm whether these two values are honored.
        payload = {
            "kb_id": self.id,
            "keywords": keywords,
            "offset": offset,
            "limit": limit
        }

        res = self.get(f'/doc/dataset/{self.id}/documents', payload)
        res_json = res.json()

        if res_json.get("retmsg") != "success":
            raise Exception(res_json.get("retmsg"))

        return [Document(self.rag, doc_data) for doc_data in res_json["data"].get("docs", [])]


# ---- sdk/python/ragflow/modules/document.py (new file) ----
from .base import Base


class Document(Base):
    """Client-side view of one document stored on the server."""

    def __init__(self, rag, res_dict):
        """
        Set the default attributes, then hand the known keys of `res_dict` to Base.

        Keys of `res_dict` that are not one of the attributes declared below
        are ignored. The caller's dictionary is left untouched.
        """
        self.id = ""
        self.name = ""
        self.thumbnail = None
        self.kb_id = None
        self.parser_method = ""
        self.parser_config = {"pages": [[1, 1000000]]}
        self.source_type = "local"
        self.type = ""
        self.created_by = ""
        self.size = 0
        self.token_num = 0
        self.chunk_num = 0
        self.progress = 0.0
        self.progress_msg = ""
        self.process_begin_at = None
        self.process_duration = 0.0
        # Filter into a new dict instead of popping keys from the argument.
        known = {k: v for k, v in res_dict.items() if k in self.__dict__}
        super().__init__(rag, known)

    def save(self) -> bool:
        """
        Save the document details to the server.

        Returns:
            True when the server answers with retmsg "success".

        Raises:
            Exception: With the server's "retmsg" otherwise.
        """
        # parser_config starts out as a plain dict (see __init__), which has no
        # to_json(); only call it when the attribute actually provides it.
        parser_config = self.parser_config
        if hasattr(parser_config, "to_json"):
            parser_config = parser_config.to_json()

        res = self.post('/doc/save',
                        {"id": self.id, "name": self.name, "thumbnail": self.thumbnail, "kb_id": self.kb_id,
                         "parser_id": self.parser_method, "parser_config": parser_config,
                         "source_type": self.source_type, "type": self.type, "created_by": self.created_by,
                         "size": self.size, "token_num": self.token_num, "chunk_num": self.chunk_num,
                         "progress": self.progress, "progress_msg": self.progress_msg,
                         # NOTE(review): key spelling "process_duation" kept as sent by the
                         # original; the server-side field name is not visible here --
                         # confirm before renaming.
                         "process_begin_at": self.process_begin_at, "process_duation": self.process_duration
                         })
        res = res.json()
        if res.get("retmsg") == "success":
            return True
        raise Exception(res["retmsg"])

    def delete(self) -> bool:
        """
        Delete the document from the server.

        Returns:
            True when the server answers with retmsg "success".

        Raises:
            Exception: With the server's "retmsg" otherwise.
        """
        res = self.rm('/doc/delete',
                      {"doc_id": self.id})
        res = res.json()
        if res.get("retmsg") == "success":
            return True
        raise Exception(res["retmsg"])

    def download(self) -> bytes:
        """
        Download the document content from the server.

        :return: The downloaded document content in bytes.
        :raises Exception: When the status code is not 200 or the server
            answers with a JSON body instead of the file.
        """
        # "headers" and "stream" are request options, not query data, so they
        # are no longer placed in the dictionary passed to get().
        res = self.get(f"/doc/{self.kb_id}/documents/{self.id}",
                       {"id": self.id, "name": self.name})

        # The download endpoint sends files as application/octet-stream and
        # reports failures as JSON; sdk/python/test/ragflow.txt shows such a
        # JSON error having been saved as if it were the document.
        content_type = res.headers.get("Content-Type", "")
        if res.status_code == 200 and not content_type.startswith("application/json"):
            return res.content
        raise Exception(
            f"Failed to download document. Server responded with: {res.status_code}, {res.text}"
        )


# ---- sdk/python/ragflow/ragflow.py ----
import requests

from .modules.assistant import Assistant
from .modules.dataset import DataSet
from .modules.document import Document


class RAGFlow:
    # Only the methods added by this patch are shown here; __init__(self,
    # user_key, base_url, version='v1') and the other members lie outside the
    # patch hunks.

    def create_document(self, ds: DataSet, name: str, blob: bytes) -> bool:
        """
        Upload `blob` as a file called `name` into the dataset `ds`.

        Returns:
            True when the server answers 200 with retmsg "success".

        Raises:
            Exception: "Upload failed: ..." with the server's retmsg, or with
                the raw response text when the body is not JSON.
        """
        url = f"/doc/dataset/{ds.id}/documents/upload"
        files = {
            'file': (name, blob)
        }
        data = {
            'kb_id': ds.id
        }
        headers = {
            'Authorization': f"Bearer {ds.rag.user_key}"
        }

        response = requests.post(self.api_url + url, data=data, files=files,
                                 headers=headers)

        try:
            retmsg = response.json().get('retmsg')
        except ValueError:
            # A non-JSON body would otherwise hide the real failure behind a
            # decoding error.
            retmsg = response.text

        if response.status_code == 200 and retmsg == 'success':
            return True
        raise Exception(f"Upload failed: {retmsg}")

    def get_document(self, id: str = None, name: str = None) -> Document:
        """
        Fetch one document by `id` or by `name`.

        Raises:
            Exception: With the server's "retmsg" when it is not "success".
        """
        res = self.get("/doc/infos", {"id": id, "name": name})
        res = res.json()
        if res.get("retmsg") == "success":
            return Document(self, res['data'])
        raise Exception(res["retmsg"])


# ---- sdk/python/test/ragflow.txt (new file, one line) ----
# {"data":null,"retcode":100,"retmsg":"TypeError(\"download_document() got an unexpected keyword argument 'tenant_id'\")"}
# ---- sdk/python/test/t_document.py (new file) ----
from ragflow import RAGFlow, DataSet, Document

from common import API_KEY, HOST_ADDRESS
from test_sdkbase import TestSdk


class TestDocument(TestSdk):
    """SDK tests for uploading, fetching, updating, downloading, listing and deleting documents."""

    def test_upload_document_with_success(self):
        """
        Test ingesting a document into a dataset with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)

        # Step 1: Create a new dataset
        ds = rag.create_dataset(name="God")
        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
        assert ds.name == "God", "Dataset name does not match."

        # Step 2: Create a new document
        # The blob is the actual file content or a placeholder in this case
        name = "TestDocument.txt"
        blob = b"Sample document content for ingestion test."

        res = rag.create_document(ds, name=name, blob=blob)

        assert res is True, f"Failed to create document, error: {res}"

    def test_get_detail_document_with_success(self):
        """
        Test getting a document's detail with success
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)
        doc = rag.get_document(name="TestDocument.txt")
        assert isinstance(doc, Document), f"Failed to get dataset, error: {doc}."
        assert doc.name == "TestDocument.txt", "Name does not match"

    def test_update_document_with_success(self):
        """
        Test updating a document with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)
        doc = rag.get_document(name="TestDocument.txt")
        assert isinstance(doc, Document), f"Failed to get document, error: {doc}"

        doc.parser_method = "manual"
        res = doc.save()
        assert res is True, f"Failed to update document, error: {res}"

    def test_download_document_with_success(self):
        """
        Test downloading a document with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)

        doc = rag.get_document(name="TestDocument.txt")
        assert isinstance(doc, Document), f"Failed to get document, error: {doc}"

        # Download before opening the file so that a failed download does not
        # leave an empty file behind; an exception fails the test with its
        # full traceback.
        content = doc.download()
        assert isinstance(content, bytes), "Downloaded content is not bytes."
        with open("ragflow.txt", "wb+") as file:
            file.write(content)
        # Print the document object for debugging
        print(doc)

    def test_list_all_documents_in_dataset_with_success(self):
        """
        Test list all documents into a dataset with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)

        # Step 1: Create a new dataset
        ds = rag.create_dataset(name="God2")
        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
        assert ds.name == "God2", "Dataset name does not match."

        # Step 2: Create the documents
        name1 = "Test Document111.txt"
        blob1 = b"Sample document content for ingestion test111."
        name2 = "Test Document222.txt"
        blob2 = b"Sample document content for ingestion test222."

        rag.create_document(ds, name=name1, blob=blob1)
        rag.create_document(ds, name=name2, blob=blob2)
        for d in ds.list_docs(keywords="test", offset=0, limit=12):
            assert isinstance(d, Document)
            print(d)

    def test_delete_documents_in_dataset_with_success(self):
        """
        Test deleting the documents of a dataset with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)

        # Step 1: Create a new dataset
        ds = rag.create_dataset(name="God3")
        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
        assert ds.name == "God3", "Dataset name does not match."

        # Step 2: Create the documents
        name1 = "Test Document333.txt"
        blob1 = b"Sample document content for ingestion test333."
        name2 = "Test Document444.txt"
        blob2 = b"Sample document content for ingestion test444."
        name3 = 'test.txt'
        path = 'test_data/test.txt'
        # Close the fixture file deterministically instead of leaking the handle.
        with open(path, "rb") as fixture:
            blob3 = fixture.read()
        rag.create_document(ds, name=name3, blob=blob3)
        rag.create_document(ds, name=name1, blob=blob1)
        rag.create_document(ds, name=name2, blob=blob2)

        # Step 3: Delete every document matched by the keyword
        for d in ds.list_docs(keywords="document", offset=0, limit=12):
            assert isinstance(d, Document)
            d.delete()
            print(d)

        # Re-query with the SAME keyword: the previous check filtered on "rag",
        # which is not the keyword the deleted documents were selected by, so
        # it did not verify the deletion.
        remaining_docs = ds.list_docs(keywords="document", offset=0, limit=12)
        assert len(remaining_docs) == 0, "Documents were not properly deleted."