From acc0f7396ed6b725d3da64f82da520229a33cf2e Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Wed, 22 Oct 2025 09:29:04 +0800 Subject: [PATCH] Feat: add fault-tolerant mechanism to GraphRAG (#10708) ### What problem does this PR solve? Add fault-tolerant mechanism to GraphRAG. #10406. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- graphrag/general/extractor.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/graphrag/general/extractor.py b/graphrag/general/extractor.py index a5c5e5b74..28b591d55 100644 --- a/graphrag/general/extractor.py +++ b/graphrag/general/extractor.py @@ -105,16 +105,36 @@ class Extractor: async def extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK): out_results = [] + error_count = 0 + max_errors = 3 + limiter = trio.Semaphore(max_concurrency) async def worker(chunk_key_dp: tuple[str, str], idx: int, total: int): + nonlocal error_count async with limiter: - await self._process_single_content(chunk_key_dp, idx, total, out_results) + try: + await self._process_single_content(chunk_key_dp, idx, total, out_results) + except Exception as e: + error_count += 1 + error_msg = f"Error processing chunk {idx+1}/{total}: {str(e)}" + logging.warning(error_msg) + if self.callback: + self.callback(msg=error_msg) + + if error_count > max_errors: + raise Exception(f"Maximum error count ({max_errors}) reached. Last errors: {str(e)}") async with trio.open_nursery() as nursery: for i, ck in enumerate(chunks): nursery.start_soon(worker, (doc_id, ck), i, len(chunks)) + if error_count > 0: + warning_msg = f"Completed with {error_count} errors (out of {len(chunks)} chunks processed)" + logging.warning(warning_msg) + if self.callback: + self.callback(msg=warning_msg) + return out_results out_results = await extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK) @@ -129,8 +149,8 @@ class Extractor: maybe_edges[tuple(sorted(k))].extend(v) sum_token_count += token_count now = trio.current_time() - if callback: - callback(msg=f"Entities and relationships extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens, {now - start_ts:.2f}s.") + if self.callback: + self.callback(msg=f"Entities and relationships extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens, {now - start_ts:.2f}s.") start_ts = now logging.info("Entities merging...") all_entities_data = [] @@ -138,8 +158,8 @@ class Extractor: for en_nm, ents in maybe_nodes.items(): nursery.start_soon(self._merge_nodes, en_nm, ents, all_entities_data) now = trio.current_time() - if callback: - callback(msg=f"Entities merging done, {now - start_ts:.2f}s.") + if self.callback: + self.callback(msg=f"Entities merging done, {now - start_ts:.2f}s.") start_ts = now logging.info("Relationships merging...") @@ -148,8 +168,8 @@ class Extractor: for (src, tgt), rels in maybe_edges.items(): nursery.start_soon(self._merge_edges, src, tgt, rels, all_relationships_data) now = trio.current_time() - if callback: - callback(msg=f"Relationships merging done, {now - start_ts:.2f}s.") + if self.callback: + self.callback(msg=f"Relationships merging done, {now - start_ts:.2f}s.") if not len(all_entities_data) and not len(all_relationships_data): logging.warning("Didn't extract any entities and relationships, maybe your LLM is not working")