Mirror of https://github.com/infiniflow/ragflow.git
Feat: add fault-tolerant mechanism to GraphRAG (#10708)
### What problem does this PR solve?

Add fault-tolerant mechanism to GraphRAG. #10406.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
@@ -105,16 +105,36 @@ class Extractor:
         async def extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK):
             out_results = []
+            error_count = 0
+            max_errors = 3
 
             limiter = trio.Semaphore(max_concurrency)
 
             async def worker(chunk_key_dp: tuple[str, str], idx: int, total: int):
+                nonlocal error_count
                 async with limiter:
-                    await self._process_single_content(chunk_key_dp, idx, total, out_results)
+                    try:
+                        await self._process_single_content(chunk_key_dp, idx, total, out_results)
+                    except Exception as e:
+                        error_count += 1
+                        error_msg = f"Error processing chunk {idx+1}/{total}: {str(e)}"
+                        logging.warning(error_msg)
+                        if self.callback:
+                            self.callback(msg=error_msg)
+
+                        if error_count > max_errors:
+                            raise Exception(f"Maximum error count ({max_errors}) reached. Last errors: {str(e)}")
 
             async with trio.open_nursery() as nursery:
                 for i, ck in enumerate(chunks):
                     nursery.start_soon(worker, (doc_id, ck), i, len(chunks))
+
+            if error_count > 0:
+                warning_msg = f"Completed with {error_count} errors (out of {len(chunks)} chunks processed)"
+                logging.warning(warning_msg)
+                if self.callback:
+                    self.callback(msg=warning_msg)
+
             return out_results
 
         out_results = await extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK)
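The heart of the new `extract_all` is an error budget: every failed chunk increments a shared counter, the failure is logged and reported through `self.callback`, and only when the counter exceeds `max_errors` does the worker re-raise, at which point the trio nursery cancels the remaining workers and the whole extraction fails. Below is a minimal, self-contained sketch of that pattern; `flaky_process`, the chunk strings, and `MAX_ERRORS` are hypothetical stand-ins for `self._process_single_content` and the real inputs, not RAGFlow code.

```python
import logging

import trio

MAX_ERRORS = 3  # hypothetical budget, mirroring max_errors = 3 in the diff


async def flaky_process(chunk: str, out: list):
    # Stand-in for self._process_single_content: pretend chunks containing "bad" fail.
    if "bad" in chunk:
        raise RuntimeError(f"extraction failed for {chunk!r}")
    out.append(chunk.upper())


async def extract_all(chunks, max_concurrency=4):
    out, error_count = [], 0
    limiter = trio.Semaphore(max_concurrency)

    async def worker(chunk, idx, total):
        nonlocal error_count
        async with limiter:
            try:
                await flaky_process(chunk, out)
            except Exception as e:
                error_count += 1
                logging.warning("chunk %d/%d failed: %s", idx + 1, total, e)
                if error_count > MAX_ERRORS:
                    # Re-raising inside the nursery cancels the remaining workers
                    # and propagates the failure to the caller.
                    raise

    async with trio.open_nursery() as nursery:
        for i, ck in enumerate(chunks):
            nursery.start_soon(worker, ck, i, len(chunks))
    return out


# Two failures stay within the budget, so the surviving chunks are still returned.
print(trio.run(extract_all, ["a", "bad1", "b", "bad2", "c"]))
```

Because trio tasks in one run share a single thread and only switch at `await` points, incrementing `error_count` from several workers needs no extra locking.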
@@ -129,8 +149,8 @@ class Extractor:
                 maybe_edges[tuple(sorted(k))].extend(v)
             sum_token_count += token_count
         now = trio.current_time()
-        if callback:
-            callback(msg=f"Entities and relationships extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens, {now - start_ts:.2f}s.")
+        if self.callback:
+            self.callback(msg=f"Entities and relationships extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens, {now - start_ts:.2f}s.")
         start_ts = now
         logging.info("Entities merging...")
         all_entities_data = []
@@ -138,8 +158,8 @@ class Extractor:
             for en_nm, ents in maybe_nodes.items():
                 nursery.start_soon(self._merge_nodes, en_nm, ents, all_entities_data)
         now = trio.current_time()
-        if callback:
-            callback(msg=f"Entities merging done, {now - start_ts:.2f}s.")
+        if self.callback:
+            self.callback(msg=f"Entities merging done, {now - start_ts:.2f}s.")
 
         start_ts = now
         logging.info("Relationships merging...")
@@ -148,8 +168,8 @@ class Extractor:
             for (src, tgt), rels in maybe_edges.items():
                 nursery.start_soon(self._merge_edges, src, tgt, rels, all_relationships_data)
         now = trio.current_time()
-        if callback:
-            callback(msg=f"Relationships merging done, {now - start_ts:.2f}s.")
+        if self.callback:
+            self.callback(msg=f"Relationships merging done, {now - start_ts:.2f}s.")
 
         if not len(all_entities_data) and not len(all_relationships_data):
             logging.warning("Didn't extract any entities and relationships, maybe your LLM is not working")
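The remaining hunks swap the local `callback` name for `self.callback`, so the nested `worker` inside `extract_all` and the later merging stages all report progress through the same instance attribute. A small sketch of that wiring, with a made-up `MiniExtractor` class, stage names, and a print-based callback (none of this is RAGFlow code):

```python
class MiniExtractor:
    def __init__(self, callback=None):
        # Keeping the callback on the instance lets nested helpers reach it
        # without threading a `callback` argument through every function.
        self.callback = callback

    def _report(self, msg: str):
        # Mirrors the "if self.callback: self.callback(msg=...)" guard in the diff.
        if self.callback:
            self.callback(msg=msg)

    def run(self, stages):
        for stage in stages:
            # ... stage-specific work would happen here ...
            self._report(f"{stage} done.")


ex = MiniExtractor(callback=lambda msg: print("[progress]", msg))
ex.run(["Entities and relationships extraction", "Entities merging", "Relationships merging"])
```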