diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index e3a735488..7237a3ce0 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -676,12 +676,14 @@ async def is_strong_enough(chat_model, embedding_model): @timeout(30, 2) async def _is_strong_enough(): nonlocal chat_model, embedding_model - _ = await trio.to_thread.run_sync(lambda: embedding_model.encode(["Are you strong enough!?"])) - res = await trio.to_thread.run_sync(lambda: chat_model.chat("Nothing special.", [{"role":"user", "content": "Are you strong enough!?"}], {})) + with trio.fail_after(3): + _ = await trio.to_thread.run_sync(lambda: embedding_model.encode(["Are you strong enough!?"])) + with trio.fail_after(30): + res = await trio.to_thread.run_sync(lambda: chat_model.chat("Nothing special.", [{"role":"user", "content": "Are you strong enough!?"}], {})) if res.find("**ERROR**") >= 0: raise Exception(res) # Pressure test for GraphRAG task async with trio.open_nursery() as nursery: - for _ in range(12): + for _ in range(32): nursery.start_soon(_is_strong_enough) \ No newline at end of file diff --git a/graphrag/entity_resolution.py b/graphrag/entity_resolution.py index 8d26335ca..913956ff7 100644 --- a/graphrag/entity_resolution.py +++ b/graphrag/entity_resolution.py @@ -237,7 +237,10 @@ class EntityResolution(Extractor): return True return False - if len(set(a) & set(b)) > 1: - return True + a, b = set(a), set(b) + max_l = max(len(a), len(b)) + if max_l < 4: + return len(a & b) > 1 + + return len(a & b)*1./max_l >= 0.8 - return False diff --git a/graphrag/utils.py b/graphrag/utils.py index bad9af70f..4d2e79858 100644 --- a/graphrag/utils.py +++ b/graphrag/utils.py @@ -22,6 +22,7 @@ import numpy as np import xxhash from networkx.readwrite import json_graph import dataclasses + from api.utils.api_utils import timeout from api import settings from api.utils import get_uuid @@ -304,8 +305,8 @@ def chunk_id(chunk): return xxhash.xxh64((chunk["content_with_weight"] + chunk["kb_id"]).encode("utf-8")).hexdigest() -@timeout(3, 3) async def graph_node_to_chunk(kb_id, embd_mdl, ent_name, meta, chunks): + global chat_limiter chunk = { "id": get_uuid(), "important_kwd": [ent_name], @@ -322,7 +323,9 @@ async def graph_node_to_chunk(kb_id, embd_mdl, ent_name, meta, chunks): chunk["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(chunk["content_ltks"]) ebd = get_embed_cache(embd_mdl.llm_name, ent_name) if ebd is None: - ebd, _ = await trio.to_thread.run_sync(lambda: embd_mdl.encode([ent_name])) + async with chat_limiter: + with trio.fail_after(3): + ebd, _ = await trio.to_thread.run_sync(lambda: embd_mdl.encode([ent_name])) ebd = ebd[0] set_embed_cache(embd_mdl.llm_name, ent_name, ebd) assert ebd is not None @@ -358,7 +361,6 @@ def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1): return res -@timeout(3, 3) async def graph_edge_to_chunk(kb_id, embd_mdl, from_ent_name, to_ent_name, meta, chunks): chunk = { "id": get_uuid(), @@ -377,7 +379,9 @@ async def graph_edge_to_chunk(kb_id, embd_mdl, from_ent_name, to_ent_name, meta, txt = f"{from_ent_name}->{to_ent_name}" ebd = get_embed_cache(embd_mdl.llm_name, txt) if ebd is None: - ebd, _ = await trio.to_thread.run_sync(lambda: embd_mdl.encode([txt+f": {meta['description']}"])) + async with chat_limiter: + with trio.fail_after(3): + ebd, _ = await trio.to_thread.run_sync(lambda: embd_mdl.encode([txt+f": {meta['description']}"])) ebd = ebd[0] set_embed_cache(embd_mdl.llm_name, txt, ebd) assert ebd is not None @@ -440,6 +444,7 @@ async def get_graph(tenant_id, kb_id, exclude_rebuild=None): async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, change: GraphChange, callback): + global chat_limiter start = trio.current_time() await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph"]}, search.index_name(tenant_id), kb_id)) @@ -447,10 +452,15 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang if change.removed_nodes: await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["entity"], "entity_kwd": sorted(change.removed_nodes)}, search.index_name(tenant_id), kb_id)) + if change.removed_edges: + async def del_edges(from_node, to_node): + async with chat_limiter: + await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["relation"], "from_entity_kwd": from_node, "to_entity_kwd": to_node}, search.index_name(tenant_id), kb_id)) async with trio.open_nursery() as nursery: for from_node, to_node in change.removed_edges: - nursery.start_soon(lambda from_node=from_node, to_node=to_node: trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["relation"], "from_entity_kwd": from_node, "to_entity_kwd": to_node}, search.index_name(tenant_id), kb_id))) + nursery.start_soon(del_edges, from_node, to_node) + now = trio.current_time() if callback: callback(msg=f"set_graph removed {len(change.removed_nodes)} nodes and {len(change.removed_edges)} edges from index in {now - start:.2f}s.") @@ -482,23 +492,23 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang "removed_kwd": "N" }) - semaphore = trio.Semaphore(5) async with trio.open_nursery() as nursery: for ii, node in enumerate(change.added_updated_nodes): node_attrs = graph.nodes[node] - async with semaphore: - if ii%100 == 9 and callback: - callback(msg=f"Get embedding of nodes: {ii}/{len(change.added_updated_nodes)}") - nursery.start_soon(graph_node_to_chunk, kb_id, embd_mdl, node, node_attrs, chunks) + nursery.start_soon(graph_node_to_chunk, kb_id, embd_mdl, node, node_attrs, chunks) + if ii%100 == 9 and callback: + callback(msg=f"Get embedding of nodes: {ii}/{len(change.added_updated_nodes)}") + + async with trio.open_nursery() as nursery: for ii, (from_node, to_node) in enumerate(change.added_updated_edges): edge_attrs = graph.get_edge_data(from_node, to_node) if not edge_attrs: # added_updated_edges could record a non-existing edge if both from_node and to_node participate in nodes merging. continue - async with semaphore: - if ii%100 == 9 and callback: - callback(msg=f"Get embedding of edges: {ii}/{len(change.added_updated_edges)}") - nursery.start_soon(graph_edge_to_chunk, kb_id, embd_mdl, from_node, to_node, edge_attrs, chunks) + nursery.start_soon(graph_edge_to_chunk, kb_id, embd_mdl, from_node, to_node, edge_attrs, chunks) + if ii%100 == 9 and callback: + callback(msg=f"Get embedding of edges: {ii}/{len(change.added_updated_edges)}") + now = trio.current_time() if callback: callback(msg=f"set_graph converted graph change to {len(chunks)} chunks in {now - start:.2f}s.") @@ -506,10 +516,10 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang es_bulk_size = 4 for b in range(0, len(chunks), es_bulk_size): - async with semaphore: - if b % 100 == es_bulk_size and callback: - callback(msg=f"Insert chunks: {b}/{len(chunks)}") - doc_store_result = await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(tenant_id), kb_id)) + with trio.fail_after(3): + doc_store_result = await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(tenant_id), kb_id)) + if b % 100 == es_bulk_size and callback: + callback(msg=f"Insert chunks: {b}/{len(chunks)}") if doc_store_result: error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!" raise Exception(error_message)