Mirror of https://github.com/infiniflow/ragflow.git, synced 2025-12-08 20:42:30 +08:00
Feat: Add parse_document with feed back (#10523)
### What problem does this PR solve?

Solved: Sync Parse Document API #5635

Feat: Add `parse_documents` with feedback, so users can view the status of each document after parsing finishes.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Documentation Update
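At a glance, the new call looks like this (a minimal sketch based on the documentation added below; `dataset` and `document_ids` are placeholders for an existing `DataSet` object and its document IDs):

```python
# Sketch: parse a batch of documents and inspect the per-document outcome.
results = dataset.parse_documents(document_ids)
for doc_id, status, chunk_count, token_count in results:
    print(doc_id, status, chunk_count, token_count)
```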
@@ -698,6 +698,58 @@ print("Async bulk parsing initiated.")
---

### Parse documents (with document status)

```python
DataSet.parse_documents(document_ids: list[str]) -> list[tuple[str, str, int, int]]
```
Parses documents **synchronously** in the current dataset.

This method wraps `async_parse_documents()` and automatically waits for all parsing tasks to complete.

It returns detailed parsing results, including the status and statistics for each document.

If interrupted by the user (e.g. `Ctrl+C`), all pending parsing jobs will be cancelled gracefully.
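Conceptually, this is equivalent to starting the jobs asynchronously and polling each document until it reaches a terminal state, roughly as sketched below (a simplified illustration of what `parse_documents()` does internally, based on the SDK changes in this PR; `dataset` and `ids` are placeholders):

```python
import time

dataset.async_parse_documents(ids)  # start parsing without blocking

pending = set(ids)
while pending:
    for doc_id in list(pending):
        docs = dataset.list_documents(id=doc_id)
        # DONE / FAIL / CANCEL are the terminal run states checked by the SDK.
        if docs and str(docs[0].run).upper() in {"DONE", "FAIL", "CANCEL"}:
            pending.discard(doc_id)
    if pending:
        time.sleep(1)  # same 1-second polling interval as parse_documents()
```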

#### Parameters

##### document_ids: `list[str]`, *Required*

The IDs of the documents to parse.

#### Returns

A list of tuples with detailed parsing results:

```python
[
    (document_id: str, status: str, chunk_count: int, token_count: int),
    ...
]
```

- **status**: Final parsing state (`DONE`, `FAIL`, or `CANCEL`).
- **chunk_count**: Number of content chunks created for the document.
- **token_count**: Total number of tokens processed.
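For instance, the returned tuples can be reduced to a quick per-status summary (a minimal sketch; `finished` is assumed to hold the return value of `parse_documents()`):

```python
from collections import Counter

# finished: list of (document_id, status, chunk_count, token_count) tuples
status_counts = Counter(status for _, status, _, _ in finished)
total_tokens = sum(tokens for *_, tokens in finished)
print(f"Statuses: {dict(status_counts)}, total tokens: {total_tokens}")
```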

---

#### Example

```python
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
documents = dataset.list_documents(keywords="test")
ids = [doc.id for doc in documents]

try:
    finished = dataset.parse_documents(ids)
    for doc_id, status, chunk_count, token_count in finished:
        print(f"Document {doc_id} parsing finished with status: {status}, chunks: {chunk_count}, tokens: {token_count}")
except KeyboardInterrupt:
    print("\nParsing interrupted by user. All pending tasks have been cancelled.")
except Exception as e:
    print(f"Parsing failed: {e}")
```

---

### Stop parsing documents
@@ -101,12 +101,51 @@ class DataSet(Base):
        if res.get("code") != 0:
            raise Exception(res["message"])

    def _get_documents_status(self, document_ids):
        # Poll each document until it reaches a terminal state, then return
        # (document_id, status, chunk_count, token_count) tuples.
        import time

        terminal_states = {"DONE", "FAIL", "CANCEL"}
        interval_sec = 1
        pending = set(document_ids)
        finished = []
        while pending:
            for doc_id in list(pending):
                def fetch_doc(doc_id: str) -> Document | None:
                    try:
                        docs = self.list_documents(id=doc_id)
                        return docs[0] if docs else None
                    except Exception:
                        return None

                doc = fetch_doc(doc_id)
                if doc is None:
                    continue
                if isinstance(doc.run, str) and doc.run.upper() in terminal_states:
                    finished.append((doc_id, doc.run, doc.chunk_count, doc.token_count))
                    pending.discard(doc_id)
                elif float(doc.progress or 0.0) >= 1.0:
                    # Treat fully progressed documents as done even if the run
                    # status has not been updated yet.
                    finished.append((doc_id, "DONE", doc.chunk_count, doc.token_count))
                    pending.discard(doc_id)
            if pending:
                time.sleep(interval_sec)
        return finished

    def async_parse_documents(self, document_ids):
        res = self.post(f"/datasets/{self.id}/chunks", {"document_ids": document_ids})
        res = res.json()
        if res.get("code") != 0:
            raise Exception(res.get("message"))

    def parse_documents(self, document_ids):
        # Start parsing, then block until every document reaches a terminal
        # state; on Ctrl+C, cancel the outstanding parsing jobs.
        try:
            self.async_parse_documents(document_ids)
            self._get_documents_status(document_ids)
        except KeyboardInterrupt:
            self.async_cancel_parse_documents(document_ids)

        return self._get_documents_status(document_ids)

    def async_cancel_parse_documents(self, document_ids):
        res = self.rm(f"/datasets/{self.id}/chunks", {"document_ids": document_ids})
        res = res.json()