diff --git a/graphrag/general/extractor.py b/graphrag/general/extractor.py
index c529380c5..23caccb8a 100644
--- a/graphrag/general/extractor.py
+++ b/graphrag/general/extractor.py
@@ -56,7 +56,8 @@ class Extractor:
         response = self._llm.chat(system_msg[0]["content"], hist, conf)
         response = re.sub(r"<think>.*</think>", "", response, flags=re.DOTALL)
         if response.find("**ERROR**") >= 0:
-            raise Exception(response)
+            logging.warning(f"Extractor._chat got error. response: {response}")
+            return ""
         set_llm_cache(self._llm.llm_name, system, response, history, gen_conf)
         return response

diff --git a/graphrag/general/graph_extractor.py b/graphrag/general/graph_extractor.py
index 75e023698..375327bfa 100644
--- a/graphrag/general/graph_extractor.py
+++ b/graphrag/general/graph_extractor.py
@@ -94,7 +94,7 @@ class GraphExtractor(Extractor):
             self._tuple_delimiter_key: DEFAULT_TUPLE_DELIMITER,
             self._record_delimiter_key: DEFAULT_RECORD_DELIMITER,
             self._completion_delimiter_key: DEFAULT_COMPLETION_DELIMITER,
-            self._entity_types_key: entity_types,
+            self._entity_types_key: ",".join(entity_types),
         }

     async def _process_single_content(self, chunk_key_dp: tuple[str, str], chunk_seq: int, num_chunks: int, out_results):
diff --git a/graphrag/general/index.py b/graphrag/general/index.py
index 79b30a058..6335bdf3a 100644
--- a/graphrag/general/index.py
+++ b/graphrag/general/index.py
@@ -72,41 +72,51 @@ async def run_graphrag(
     if not subgraph:
         return

-    subgraph_nodes = set(subgraph.nodes())
-    new_graph = await merge_subgraph(
-        tenant_id,
-        kb_id,
-        doc_id,
-        subgraph,
-        embedding_model,
-        callback,
-    )
-    assert new_graph is not None
+    graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=3600)
+    while True:
+        if graphrag_task_lock.acquire():
+            break
+        callback(msg=f"merge_subgraph {doc_id} is waiting graphrag_task_lock")
+        await trio.sleep(20)

-    if not with_resolution or not with_community:
-        return
-
-    if with_resolution:
-        await resolve_entities(
-            new_graph,
-            subgraph_nodes,
+    try:
+        subgraph_nodes = set(subgraph.nodes())
+        new_graph = await merge_subgraph(
             tenant_id,
             kb_id,
             doc_id,
-            chat_model,
-            embedding_model,
-            callback,
-        )
-    if with_community:
-        await extract_community(
-            new_graph,
-            tenant_id,
-            kb_id,
-            doc_id,
-            chat_model,
+            subgraph,
             embedding_model,
             callback,
         )
+        assert new_graph is not None
+
+        if not with_resolution or not with_community:
+            return
+
+        if with_resolution:
+            await resolve_entities(
+                new_graph,
+                subgraph_nodes,
+                tenant_id,
+                kb_id,
+                doc_id,
+                chat_model,
+                embedding_model,
+                callback,
+            )
+        if with_community:
+            await extract_community(
+                new_graph,
+                tenant_id,
+                kb_id,
+                doc_id,
+                chat_model,
+                embedding_model,
+                callback,
+            )
+    finally:
+        graphrag_task_lock.release()

     now = trio.current_time()
     callback(msg=f"GraphRAG for doc {doc_id} done in {now - start:.2f} seconds.")
     return
@@ -191,13 +201,6 @@ async def merge_subgraph(
     embedding_model,
     callback,
 ):
-    graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=600)
-    while True:
-        if graphrag_task_lock.acquire():
-            break
-        callback(msg=f"merge_subgraph {doc_id} is waiting graphrag_task_lock")
-        await trio.sleep(10)
-
     start = trio.current_time()
     change = GraphChange()
     old_graph = await get_graph(tenant_id, kb_id)
@@ -214,7 +217,6 @@ async def merge_subgraph(
         new_graph.nodes[node_name]["pagerank"] = pagerank

     await set_graph(tenant_id, kb_id, embedding_model, new_graph, change, callback)
-    graphrag_task_lock.release()
     now = trio.current_time()
     callback(
         msg=f"merging subgraph for doc {doc_id} into the global graph done in {now - start:.2f} seconds."
@@ -232,13 +234,6 @@ async def resolve_entities(
     embed_bdl,
     callback,
 ):
-    graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=600)
-    while True:
-        if graphrag_task_lock.acquire():
-            break
-        callback(msg=f"resolve_entities {doc_id} is waiting graphrag_task_lock")
-        await trio.sleep(10)
-
     start = trio.current_time()
     er = EntityResolution(
         llm_bdl,
@@ -250,7 +245,6 @@ async def resolve_entities(
     callback(msg="Graph resolution updated pagerank.")

     await set_graph(tenant_id, kb_id, embed_bdl, graph, change, callback)
-    graphrag_task_lock.release()
     now = trio.current_time()
     callback(msg=f"Graph resolution done in {now - start:.2f}s.")

@@ -264,13 +258,6 @@ async def extract_community(
     embed_bdl,
     callback,
 ):
-    graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=600)
-    while True:
-        if graphrag_task_lock.acquire():
-            break
-        callback(msg=f"extract_community {doc_id} is waiting graphrag_task_lock")
-        await trio.sleep(10)
-
     start = trio.current_time()
     ext = CommunityReportsExtractor(
         llm_bdl,
@@ -326,7 +313,6 @@ async def extract_community(
             error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
             raise Exception(error_message)

-    graphrag_task_lock.release()
     now = trio.current_time()
     callback(
         msg=f"Graph indexed {len(cr.structured_output)} communities in {now - start:.2f}s."
diff --git a/rag/llm/chat_model.py b/rag/llm/chat_model.py
index 480c5c6d4..8030c9455 100644
--- a/rag/llm/chat_model.py
+++ b/rag/llm/chat_model.py
@@ -82,7 +82,7 @@ class Base(ABC):
             return ERROR_CONNECTION
         elif "quota" in error_str or "capacity" in error_str or "credit" in error_str or "billing" in error_str or "limit" in error_str and "rate" not in error_str:
             return ERROR_QUOTA
-        elif "filter" in error_str or "content" in error_str or "policy" in error_str or "blocked" in error_str or "safety" in error_str:
+        elif "filter" in error_str or "content" in error_str or "policy" in error_str or "blocked" in error_str or "safety" in error_str or "inappropriate" in error_str:
             return ERROR_CONTENT_FILTER
         elif "model" in error_str or "not found" in error_str or "does not exist" in error_str or "not available" in error_str:
             return ERROR_MODEL
@@ -110,6 +110,7 @@ class Base(ABC):
                     ans += LENGTH_NOTIFICATION_EN
                 return ans, self.total_token_count(response)
             except Exception as e:
+                logging.exception("chat_model.Base.chat got exception")
                 # Classify the error
                 error_code = self._classify_error(e)

@@ -124,7 +125,7 @@ class Base(ABC):
                 # For non-rate limit errors or the last attempt, return an error message
                 if attempt == self.max_retries - 1:
                     error_code = ERROR_MAX_RETRIES
-                    return f"{ERROR_PREFIX}: {error_code} - {str(e)}", 0
+                    return f"{ERROR_PREFIX}: {error_code} - {str(e)}. response: {response}", 0

     def chat_streamly(self, system, history, gen_conf):
         if system:
diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py
index 0f3799cbc..bbc22eca0 100644
--- a/rag/utils/redis_conn.py
+++ b/rag/utils/redis_conn.py
@@ -318,4 +318,4 @@ class RedisDistributedLock:
         return self.lock.acquire(token=self.lock_value)

     def release(self):
-        return self.lock.release()
+        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)