mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-08 20:42:30 +08:00
Perf: add limit to embedding druing KG. (#8881)
### What problem does this PR solve? ### Type of change - [x] Performance Improvement
This commit is contained in:
@ -304,7 +304,7 @@ def chunk_id(chunk):
|
|||||||
return xxhash.xxh64((chunk["content_with_weight"] + chunk["kb_id"]).encode("utf-8")).hexdigest()
|
return xxhash.xxh64((chunk["content_with_weight"] + chunk["kb_id"]).encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
@timeout(1, 3)
|
@timeout(3, 3)
|
||||||
async def graph_node_to_chunk(kb_id, embd_mdl, ent_name, meta, chunks):
|
async def graph_node_to_chunk(kb_id, embd_mdl, ent_name, meta, chunks):
|
||||||
chunk = {
|
chunk = {
|
||||||
"id": get_uuid(),
|
"id": get_uuid(),
|
||||||
@ -330,6 +330,7 @@ async def graph_node_to_chunk(kb_id, embd_mdl, ent_name, meta, chunks):
|
|||||||
chunks.append(chunk)
|
chunks.append(chunk)
|
||||||
|
|
||||||
|
|
||||||
|
@timeout(3, 3)
|
||||||
def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1):
|
def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1):
|
||||||
ents = from_ent_name
|
ents = from_ent_name
|
||||||
if isinstance(ents, str):
|
if isinstance(ents, str):
|
||||||
@ -357,7 +358,7 @@ def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1):
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
||||||
@timeout(1, 3)
|
@timeout(3, 3)
|
||||||
async def graph_edge_to_chunk(kb_id, embd_mdl, from_ent_name, to_ent_name, meta, chunks):
|
async def graph_edge_to_chunk(kb_id, embd_mdl, from_ent_name, to_ent_name, meta, chunks):
|
||||||
chunk = {
|
chunk = {
|
||||||
"id": get_uuid(),
|
"id": get_uuid(),
|
||||||
@ -480,17 +481,20 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang
|
|||||||
"available_int": 0,
|
"available_int": 0,
|
||||||
"removed_kwd": "N"
|
"removed_kwd": "N"
|
||||||
})
|
})
|
||||||
|
|
||||||
|
semaphore = trio.Semaphore(5)
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
for node in change.added_updated_nodes:
|
for node in change.added_updated_nodes:
|
||||||
node_attrs = graph.nodes[node]
|
node_attrs = graph.nodes[node]
|
||||||
nursery.start_soon(graph_node_to_chunk, kb_id, embd_mdl, node, node_attrs, chunks)
|
async with semaphore:
|
||||||
|
nursery.start_soon(graph_node_to_chunk, kb_id, embd_mdl, node, node_attrs, chunks)
|
||||||
for from_node, to_node in change.added_updated_edges:
|
for from_node, to_node in change.added_updated_edges:
|
||||||
edge_attrs = graph.get_edge_data(from_node, to_node)
|
edge_attrs = graph.get_edge_data(from_node, to_node)
|
||||||
if not edge_attrs:
|
if not edge_attrs:
|
||||||
# added_updated_edges could record a non-existing edge if both from_node and to_node participate in nodes merging.
|
# added_updated_edges could record a non-existing edge if both from_node and to_node participate in nodes merging.
|
||||||
continue
|
continue
|
||||||
nursery.start_soon(graph_edge_to_chunk, kb_id, embd_mdl, from_node, to_node, edge_attrs, chunks)
|
async with semaphore:
|
||||||
|
nursery.start_soon(graph_edge_to_chunk, kb_id, embd_mdl, from_node, to_node, edge_attrs, chunks)
|
||||||
now = trio.current_time()
|
now = trio.current_time()
|
||||||
if callback:
|
if callback:
|
||||||
callback(msg=f"set_graph converted graph change to {len(chunks)} chunks in {now - start:.2f}s.")
|
callback(msg=f"set_graph converted graph change to {len(chunks)} chunks in {now - start:.2f}s.")
|
||||||
|
|||||||
Reference in New Issue
Block a user