Mirror of https://github.com/infiniflow/ragflow.git
update sdk document and chunk (#2421)
### What problem does this PR solve?

Adds chunk management to the Python SDK: a new `Chunk` class, `Document` helpers for downloading, parsing (start, monitor, cancel), listing and adding chunks, and batch parse/cancel helpers on the `RAGFlow` client, plus the corresponding SDK documentation updates.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
Commit 99a7c0fb97 (parent 7e75b9d778), committed via GitHub.
sdk/python/ragflow/__init__.py

@@ -6,4 +6,5 @@ from .ragflow import RAGFlow
 from .modules.dataset import DataSet
 from .modules.assistant import Assistant
 from .modules.session import Session
 from .modules.document import Document
+from .modules.chunk import Chunk
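With the re-export in place, `Chunk` is importable from the package root alongside the existing classes:

```python
from ragflow import RAGFlow, DataSet, Document, Chunk
```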
sdk/python/ragflow/modules/chunk.py (new file, 34 lines)

@@ -0,0 +1,34 @@
+from .base import Base
+
+
+class Chunk(Base):
+    def __init__(self, rag, res_dict):
+        # Initialize the chunk attributes with defaults
+        self.id = ""
+        self.content_with_weight = ""
+        self.content_ltks = []
+        self.content_sm_ltks = []
+        self.important_kwd = []
+        self.important_tks = []
+        self.create_time = ""
+        self.create_timestamp_flt = 0.0
+        self.kb_id = None
+        self.docnm_kwd = ""
+        self.doc_id = ""
+        self.q_vec = []
+        self.status = "1"
+        for k, v in res_dict.items():
+            if hasattr(self, k):
+                setattr(self, k, v)
+
+        super().__init__(rag, res_dict)
+    def delete(self) -> bool:
+        """
+        Delete the chunk from its document.
+        """
+        res = self.rm('/doc/chunk/rm',
+                      {"doc_id": [self.id]})
+        res = res.json()
+        if res.get("retmsg") == "success":
+            return True
+        raise Exception(res["retmsg"])
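A minimal usage sketch for the new class (hypothetical IDs and setup; `doc` stands for a `Document` instance obtained from an authenticated client, and `add_chunk` is the `Document` helper added further down in this diff):

```python
# Hypothetical: create a chunk on an existing, already-parsed document.
chunk = doc.add_chunk(content="Quarterly revenue grew 12% year over year.")
print(chunk.id, chunk.doc_id)

# delete() returns True on success and raises on any server-side error.
chunk.delete()
```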
sdk/python/ragflow/modules/document.py

@@ -1,6 +1,7 @@
 import time
 
 from .base import Base
+from .chunk import Chunk
 
 
 class Document(Base):
@@ -21,6 +22,8 @@ class Document(Base):
         self.progress_msg = ""
         self.process_begin_at = None
         self.process_duration = 0.0
+        self.run = "0"
+        self.status = "1"
         for k in list(res_dict.keys()):
             if k not in self.__dict__:
                 res_dict.pop(k)
@@ -61,7 +64,7 @@ class Document(Base):
         :return: The downloaded document content in bytes.
         """
         # Construct the URL for the API request using the document ID and knowledge base ID
-        res = self.get(f"/doc/{self.kb_id}/documents/{self.id}",
+        res = self.get(f"/doc/{self.id}",
                        {"headers": self.rag.authorization_header, "id": self.id, "name": self.name, "stream": True})
 
         # Check the response status code to ensure the request was successful
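A short sketch of the corrected download flow (the `get_document` accessor name is an assumption; only `download()` itself is defined in this diff):

```python
# Hypothetical client lookup, then save the raw bytes to disk.
doc = rag.get_document(id="example_doc_id")
with open(doc.name, "wb") as f:
    f.write(doc.download())
```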
@@ -73,3 +76,121 @@ class Document(Base):
         raise Exception(
             f"Failed to download document. Server responded with: {res.status_code}, {res.text}"
         )
+
+    def async_parse(self):
+        """
+        Initiate document parsing asynchronously without waiting for completion.
+        """
+        try:
+            # Construct request data including the document ID and run status (assuming 1 means start parsing)
+            data = {"doc_ids": [self.id], "run": 1}
+
+            # Send a POST request to the parsing endpoint to start parsing
+            res = self.post('/doc/run', data)
+
+            # Check the server response status code
+            if res.status_code != 200:
+                raise Exception(f"Failed to start async parsing: {res.text}")
+
+            print("Async parsing started successfully.")
+
+        except Exception as e:
+            # Catch and handle exceptions
+            print(f"Error occurred during async parsing: {str(e)}")
+            raise
+
+    def join(self, interval=5, timeout=3600):
+        """
+        Wait for the asynchronous parsing to complete and yield parsing progress periodically.
+
+        :param interval: The time interval (in seconds) between progress reports.
+        :param timeout: The timeout (in seconds) for the parsing operation.
+        :return: An iterator yielding parsing progress and messages.
+        """
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            # Check the parsing status
+            res = self.get(f'/doc/{self.id}/status', {"doc_ids": [self.id]})
+            res_data = res.json()
+            data = res_data.get("data", {})
+
+            # Retrieve progress and status message
+            progress = data.get("progress", 0)
+            progress_msg = data.get("status", "")
+
+            yield progress, progress_msg  # Yield progress and message
+
+            if progress == 100:  # Parsing completed
+                break
+
+            time.sleep(interval)
+
+    def cancel(self):
+        """
+        Cancel the parsing task for the document.
+        """
+        try:
+            # Construct request data, including the document ID and run status (assuming 2 means cancel)
+            data = {"doc_ids": [self.id], "run": 2}
+
+            # Send a POST request to the parsing endpoint to cancel parsing
+            res = self.post('/doc/run', data)
+
+            # Check the server response status code
+            if res.status_code != 200:
+                print("Failed to cancel parsing. Server response:", res.text)
+            else:
+                print("Parsing cancelled successfully.")
+
+        except Exception as e:
+            print(f"Error occurred during parsing cancellation: {str(e)}")
+            raise
+
+    def list_chunks(self, page=1, offset=0, limit=12, size=30, keywords="", available_int=None):
+        """
+        List all chunks associated with this document by calling the external API.
+
+        Args:
+            page (int): The page number to retrieve (default 1).
+            size (int): The number of chunks per page (default 30).
+            keywords (str): Keywords for searching specific chunks (default "").
+            offset (int): The starting offset within the result set (default 0).
+            limit (int): The maximum number of chunks to return (default 12).
+            available_int (int): Filter for available chunks (optional).
+
+        Returns:
+            list: A list of chunks returned from the API.
+        """
+        data = {
+            "doc_id": self.id,
+            "page": page,
+            "size": size,
+            "keywords": keywords,
+            "offset": offset,
+            "limit": limit
+        }
+
+        if available_int is not None:
+            data["available_int"] = available_int
+
+        res = self.post('/doc/chunk/list', data)
+        if res.status_code == 200:
+            res_data = res.json()
+            if res_data.get("retmsg") == "success":
+                chunks = res_data["data"]["chunks"]
+                self.chunks = chunks  # Store the chunks on the document instance
+                return chunks
+            else:
+                raise Exception(f"Error fetching chunks: {res_data.get('retmsg')}")
+        else:
+            raise Exception(f"API request failed with status code {res.status_code}")
+
+    def add_chunk(self, content: str):
+        res = self.post('/doc/chunk/create', {"doc_id": self.id, "content_with_weight": content})
+
+        # The response is expected to carry the new chunk's fields
+        if res.status_code == 200:
+            chunk_data = res.json()
+            return Chunk(self.rag, chunk_data)  # Wrap the response in a Chunk object
+        else:
+            raise Exception(f"Failed to add chunk: {res.status_code} {res.text}")
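Taken together, these helpers support a start-monitor-inspect workflow. A sketch under the same hypothetical setup (`doc` is a `Document` instance; `list_chunks` returns the raw chunk dicts from the API):

```python
doc.async_parse()                         # kick off parsing; returns immediately
for progress, msg in doc.join(interval=5, timeout=3600):
    print(f"parsing: {progress}% {msg}")  # periodic progress reports

# Once parsing reaches 100%, inspect the extracted chunks.
for chunk in doc.list_chunks(keywords="", page=1, size=30):
    print(chunk)
```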
sdk/python/ragflow/ragflow.py

@@ -171,3 +171,50 @@ class RAGFlow:
         return Document(self, res['data'])
         raise Exception(res["retmsg"])
 
+    def async_parse_documents(self, doc_ids):
+        """
+        Asynchronously start parsing multiple documents without waiting for completion.
+
+        :param doc_ids: A list containing multiple document IDs.
+        """
+        try:
+            if not doc_ids or not isinstance(doc_ids, list):
+                raise ValueError("doc_ids must be a non-empty list of document IDs")
+
+            data = {"doc_ids": doc_ids, "run": 1}
+
+            res = self.post('/doc/run', data)
+
+            if res.status_code != 200:
+                raise Exception(f"Failed to start async parsing for documents: {res.text}")
+
+            print(f"Async parsing started successfully for documents: {doc_ids}")
+
+        except Exception as e:
+            print(f"Error occurred during async parsing for documents: {str(e)}")
+            raise
+
+    def async_cancel_parse_documents(self, doc_ids):
+        """
+        Cancel the asynchronous parsing of multiple documents.
+
+        :param doc_ids: A list containing multiple document IDs.
+        """
+        try:
+            if not doc_ids or not isinstance(doc_ids, list):
+                raise ValueError("doc_ids must be a non-empty list of document IDs")
+
+            data = {"doc_ids": doc_ids, "run": 2}
+
+            res = self.post('/doc/run', data)
+
+            if res.status_code != 200:
+                raise Exception(f"Failed to cancel async parsing for documents: {res.text}")
+
+            print(f"Async parsing canceled successfully for documents: {doc_ids}")
+
+        except Exception as e:
+            print(f"Error occurred while canceling parsing for documents: {str(e)}")
+            raise
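And the client-level batch helpers, sketched with hypothetical document IDs:

```python
doc_ids = ["example_doc_id_1", "example_doc_id_2"]
rag.async_parse_documents(doc_ids)         # start parsing both documents
rag.async_cancel_parse_documents(doc_ids)  # cancel them again if needed
```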