From 68e47c81d4362828afa9235ad82c22a3f1e1b47c Mon Sep 17 00:00:00 2001
From: Billy Bao
Date: Tue, 14 Oct 2025 09:31:19 +0800
Subject: [PATCH] Feat: Add parse_documents with feedback (#10523)

### What problem does this PR solve?

Solved: Sync Parse Document API #5635

Feat: Add `parse_documents` with feedback, so users can view the status of each document once parsing has finished.
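
Example usage of the new synchronous API (a short sketch; `<YOUR_API_KEY>` and `<YOUR_BASE_URL>` are placeholders):

```python
from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
ids = [doc.id for doc in dataset.list_documents(keywords="test")]

# Blocks until every document reaches a terminal state (DONE, FAIL, or CANCEL).
for doc_id, status, chunk_count, token_count in dataset.parse_documents(ids):
    print(f"{doc_id}: {status}, chunks: {chunk_count}, tokens: {token_count}")
```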
All pending tasks have been cancelled.") +except Exception as e: + print(f"Parsing failed: {e}") +``` + +--- + ### Stop parsing documents ```python diff --git a/sdk/python/ragflow_sdk/modules/dataset.py b/sdk/python/ragflow_sdk/modules/dataset.py index b4367ac3b..d2d689da3 100644 --- a/sdk/python/ragflow_sdk/modules/dataset.py +++ b/sdk/python/ragflow_sdk/modules/dataset.py @@ -100,12 +100,51 @@ class DataSet(Base): res = res.json() if res.get("code") != 0: raise Exception(res["message"]) - + + + def _get_documents_status(self, document_ids): + import time + terminal_states = {"DONE", "FAIL", "CANCEL"} + interval_sec = 1 + pending = set(document_ids) + finished = [] + while pending: + for doc_id in list(pending): + def fetch_doc(doc_id: str) -> Document | None: + try: + docs = self.list_documents(id=doc_id) + return docs[0] if docs else None + except Exception: + return None + doc = fetch_doc(doc_id) + if doc is None: + continue + if isinstance(doc.run, str) and doc.run.upper() in terminal_states: + finished.append((doc_id, doc.run, doc.chunk_count, doc.token_count)) + pending.discard(doc_id) + elif float(doc.progress or 0.0) >= 1.0: + finished.append((doc_id, "DONE", doc.chunk_count, doc.token_count)) + pending.discard(doc_id) + if pending: + time.sleep(interval_sec) + return finished + def async_parse_documents(self, document_ids): res = self.post(f"/datasets/{self.id}/chunks", {"document_ids": document_ids}) res = res.json() if res.get("code") != 0: raise Exception(res.get("message")) + + + def parse_documents(self, document_ids): + try: + self.async_parse_documents(document_ids) + self._get_documents_status(document_ids) + except KeyboardInterrupt: + self.async_cancel_parse_documents(document_ids) + + return self._get_documents_status(document_ids) + def async_cancel_parse_documents(self, document_ids): res = self.rm(f"/datasets/{self.id}/chunks", {"document_ids": document_ids})