Mirror of https://github.com/infiniflow/ragflow.git (synced 2025-12-08 20:42:30 +08:00)
### What problem does this PR solve?

This revision performs a comprehensive check of the LightRAG implementation to verify its correctness. It **does not cover** Entity Resolution or Community Report Generation. The example below uses the default entity types and the General chunking method and shows good results in both runtime and extraction quality. Moreover, response caching is enabled so that failed tasks can be resumed.

[The-Necklace.pdf](https://github.com/user-attachments/files/22042432/The-Necklace.pdf)

After:

```bash
Begin at: Fri, 29 Aug 2025 16:48:03 GMT
Duration: 222.31 s
Progress:
16:48:04 Task has been received.
16:48:06 Page(1~7): Start to parse.
16:48:06 Page(1~7): OCR started
16:48:08 Page(1~7): OCR finished (1.89s)
16:48:11 Page(1~7): Layout analysis (3.72s)
16:48:11 Page(1~7): Table analysis (0.00s)
16:48:11 Page(1~7): Text merged (0.00s)
16:48:11 Page(1~7): Finish parsing.
16:48:12 Page(1~7): Generate 7 chunks
16:48:12 Page(1~7): Embedding chunks (0.29s)
16:48:12 Page(1~7): Indexing done (0.04s). Task done (7.84s)
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: She had no dresses, no je...
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: Her husband, already half...
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: And this life lasted ten ...
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: Then she asked, hesitatin...
16:49:30 Completed processing for f421fb06849e11f0bdd32724b93a52b2: She had no dresses, no je... after 1 gleanings, 21985 tokens.
16:49:30 Entities extraction of chunk 3 1/7 done, 12 nodes, 13 edges, 21985 tokens.
16:49:40 Completed processing for f421fb06849e11f0bdd32724b93a52b2: Finally, she replied, hes... after 1 gleanings, 22584 tokens.
16:49:40 Entities extraction of chunk 5 2/7 done, 19 nodes, 19 edges, 22584 tokens.
16:50:02 Completed processing for f421fb06849e11f0bdd32724b93a52b2: Then she asked, hesitatin... after 1 gleanings, 24610 tokens.
16:50:02 Entities extraction of chunk 0 3/7 done, 16 nodes, 28 edges, 24610 tokens.
16:50:03 Completed processing for f421fb06849e11f0bdd32724b93a52b2: And this life lasted ten ... after 1 gleanings, 24031 tokens.
16:50:04 Entities extraction of chunk 1 4/7 done, 24 nodes, 22 edges, 24031 tokens.
16:50:14 Completed processing for f421fb06849e11f0bdd32724b93a52b2: So they begged the jewell... after 1 gleanings, 24635 tokens.
16:50:14 Entities extraction of chunk 6 5/7 done, 27 nodes, 26 edges, 24635 tokens.
16:50:29 Completed processing for f421fb06849e11f0bdd32724b93a52b2: Her husband, already half... after 1 gleanings, 25758 tokens.
16:50:29 Entities extraction of chunk 2 6/7 done, 25 nodes, 35 edges, 25758 tokens.
16:51:35 Completed processing for f421fb06849e11f0bdd32724b93a52b2: The Necklace By Guy de Ma... after 1 gleanings, 27491 tokens.
16:51:35 Entities extraction of chunk 4 7/7 done, 39 nodes, 37 edges, 27491 tokens.
16:51:35 Entities and relationships extraction done, 147 nodes, 177 edges, 171094 tokens, 198.58s.
16:51:35 Entities merging done, 0.01s.
16:51:35 Relationships merging done, 0.01s.
16:51:35 ignored 7 relations due to missing entities.
16:51:35 generated subgraph for doc f421fb06849e11f0bdd32724b93a52b2 in 198.68 seconds.
16:51:35 run_graphrag f421fb06849e11f0bdd32724b93a52b2 graphrag_task_lock acquired
16:51:35 set_graph removed 0 nodes and 0 edges from index in 0.00s.
16:51:35 Get embedding of nodes: 9/147
16:51:35 Get embedding of nodes: 109/147
16:51:37 Get embedding of edges: 9/170
16:51:37 Get embedding of edges: 109/170
16:51:40 set_graph converted graph change to 319 chunks in 4.21s.
16:51:40 Insert chunks: 4/319
16:51:40 Insert chunks: 104/319
16:51:40 Insert chunks: 204/319
16:51:40 Insert chunks: 304/319
16:51:40 set_graph added/updated 147 nodes and 170 edges from index in 0.53s.
16:51:40 merging subgraph for doc f421fb06849e11f0bdd32724b93a52b2 into the global graph done in 4.79 seconds.
16:51:40 Knowledge Graph done (204.29s)
```

Before:

```bash
Begin at: Fri, 29 Aug 2025 17:00:47 GMT
Duration: 173.38 s
Progress:
17:00:49 Task has been received.
17:00:51 Page(1~7): Start to parse.
17:00:51 Page(1~7): OCR started
17:00:53 Page(1~7): OCR finished (1.82s)
17:00:57 Page(1~7): Layout analysis (3.64s)
17:00:57 Page(1~7): Table analysis (0.00s)
17:00:57 Page(1~7): Text merged (0.00s)
17:00:57 Page(1~7): Finish parsing.
17:00:57 Page(1~7): Generate 7 chunks
17:00:57 Page(1~7): Embedding chunks (0.31s)
17:00:57 Page(1~7): Indexing done (0.03s). Task done (7.88s)
17:00:57 created task graphrag
17:01:00 Task has been received.
17:02:17 Entities extraction of chunk 1 1/7 done, 9 nodes, 9 edges, 10654 tokens.
17:02:31 Entities extraction of chunk 2 2/7 done, 12 nodes, 13 edges, 11066 tokens.
17:02:33 Entities extraction of chunk 4 3/7 done, 9 nodes, 10 edges, 10433 tokens.
17:02:42 Entities extraction of chunk 5 4/7 done, 11 nodes, 14 edges, 11290 tokens.
17:02:52 Entities extraction of chunk 6 5/7 done, 13 nodes, 15 edges, 11039 tokens.
17:02:55 Entities extraction of chunk 3 6/7 done, 14 nodes, 13 edges, 11466 tokens.
17:03:32 Entities extraction of chunk 0 7/7 done, 19 nodes, 18 edges, 13107 tokens.
17:03:32 Entities and relationships extraction done, 71 nodes, 89 edges, 79055 tokens, 149.66s.
17:03:32 Entities merging done, 0.01s.
17:03:32 Relationships merging done, 0.01s.
17:03:32 ignored 1 relations due to missing entities.
17:03:32 generated subgraph for doc b1d9d3b6848711f0aacd7ddc0714c4d3 in 149.69 seconds.
17:03:32 run_graphrag b1d9d3b6848711f0aacd7ddc0714c4d3 graphrag_task_lock acquired
17:03:32 set_graph removed 0 nodes and 0 edges from index in 0.00s.
17:03:32 Get embedding of nodes: 9/71
17:03:33 Get embedding of edges: 9/88
17:03:34 set_graph converted graph change to 161 chunks in 2.27s.
17:03:34 Insert chunks: 4/161
17:03:34 Insert chunks: 104/161
17:03:34 set_graph added/updated 71 nodes and 88 edges from index in 0.28s.
17:03:34 merging subgraph for doc b1d9d3b6848711f0aacd7ddc0714c4d3 into the global graph done in 2.60 seconds.
17:03:34 Knowledge Graph done (153.18s)
```

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
- [x] Performance Improvement
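The resume-on-failure behaviour mentioned above comes from the cache-first chat wrapper in the extractor (see `_chat` in the file below): each LLM response is keyed by its prompt via the `get_llm_cache`/`set_llm_cache` helpers from `graphrag.utils`, so re-running a failed GraphRAG task replays already-answered prompts from the cache instead of calling the model again. A minimal sketch of the pattern, simplified from the method below (the `llm` object here is a stand-in for the real chat model):

```python
from graphrag.utils import get_llm_cache, set_llm_cache


def cached_chat(llm, system: str, history: list[dict], gen_conf: dict) -> str:
    # Cache hit: a previous (possibly failed) run already answered this prompt.
    cached = get_llm_cache(llm.llm_name, system, history, gen_conf)
    if cached:
        return cached
    # Cache miss: call the model, then persist the answer so a retry can resume here.
    response = llm.chat(system, history, gen_conf)
    set_llm_cache(llm.llm_name, system, response, history, gen_conf)
    return response
```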
244 lines · 11 KiB · Python
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import re
from collections import Counter, defaultdict
from copy import deepcopy
from typing import Callable

import networkx as nx
import trio

from api.utils.api_utils import timeout
from graphrag.general.graph_prompt import SUMMARIZE_DESCRIPTIONS_PROMPT
from graphrag.utils import (
    GraphChange,
    chat_limiter,
    flat_uniq_list,
    get_from_to,
    get_llm_cache,
    handle_single_entity_extraction,
    handle_single_relationship_extraction,
    set_llm_cache,
    split_string_by_multi_markers,
)
from rag.llm.chat_model import Base as CompletionLLM
from rag.prompts import message_fit_in
from rag.utils import truncate

GRAPH_FIELD_SEP = "<SEP>"
DEFAULT_ENTITY_TYPES = ["organization", "person", "geo", "event", "category"]
ENTITY_EXTRACTION_MAX_GLEANINGS = 2
MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK = int(os.environ.get("MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK", 10))

class Extractor:
    _llm: CompletionLLM

    def __init__(
        self,
        llm_invoker: CompletionLLM,
        language: str | None = "English",
        entity_types: list[str] | None = None,
    ):
        self._llm = llm_invoker
        self._language = language
        self._entity_types = entity_types or DEFAULT_ENTITY_TYPES

    @timeout(60 * 20)
    def _chat(self, system, history, gen_conf={}):
        hist = deepcopy(history)
        conf = deepcopy(gen_conf)
        # Cache-first: reuse an earlier response for the same prompt (e.g. when resuming a failed task).
        response = get_llm_cache(self._llm.llm_name, system, hist, conf)
        if response:
            return response
        _, system_msg = message_fit_in([{"role": "system", "content": system}], int(self._llm.max_length * 0.92))
        response = ""
        for attempt in range(3):
            try:
                response = self._llm.chat(system_msg[0]["content"], hist, conf)
                response = re.sub(r"^.*</think>", "", response, flags=re.DOTALL)
                if response.find("**ERROR**") >= 0:
                    raise Exception(response)
                set_llm_cache(self._llm.llm_name, system, response, history, gen_conf)
                break  # success; stop retrying
            except Exception as e:
                logging.exception(e)
                if attempt == 2:
                    raise

        return response

    def _entities_and_relations(self, chunk_key: str, records: list, tuple_delimiter: str):
        maybe_nodes = defaultdict(list)
        maybe_edges = defaultdict(list)
        ent_types = [t.lower() for t in self._entity_types]
        for record in records:
            record_attributes = split_string_by_multi_markers(record, [tuple_delimiter])

            if_entities = handle_single_entity_extraction(record_attributes, chunk_key)
            if if_entities is not None and if_entities.get("entity_type", "unknown").lower() in ent_types:
                maybe_nodes[if_entities["entity_name"]].append(if_entities)
                continue

            if_relation = handle_single_relationship_extraction(record_attributes, chunk_key)
            if if_relation is not None:
                maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(if_relation)
        return dict(maybe_nodes), dict(maybe_edges)

    async def __call__(self, doc_id: str, chunks: list[str], callback: Callable | None = None):
        self.callback = callback
        start_ts = trio.current_time()

        # Extract entities and relations from every chunk concurrently, bounded by a semaphore.
        async def extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK):
            out_results = []
            limiter = trio.Semaphore(max_concurrency)

            async def worker(chunk_key_dp: tuple[str, str], idx: int, total: int):
                async with limiter:
                    await self._process_single_content(chunk_key_dp, idx, total, out_results)

            async with trio.open_nursery() as nursery:
                for i, ck in enumerate(chunks):
                    nursery.start_soon(worker, (doc_id, ck), i, len(chunks))

            return out_results

        out_results = await extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK)

        maybe_nodes = defaultdict(list)
        maybe_edges = defaultdict(list)
        sum_token_count = 0
        for m_nodes, m_edges, token_count in out_results:
            for k, v in m_nodes.items():
                maybe_nodes[k].extend(v)
            for k, v in m_edges.items():
                maybe_edges[tuple(sorted(k))].extend(v)
            sum_token_count += token_count
        now = trio.current_time()
        if callback:
            callback(msg=f"Entities and relationships extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens, {now - start_ts:.2f}s.")

        start_ts = now
        logging.info("Entities merging...")
        all_entities_data = []
        async with trio.open_nursery() as nursery:
            for en_nm, ents in maybe_nodes.items():
                nursery.start_soon(self._merge_nodes, en_nm, ents, all_entities_data)
        now = trio.current_time()
        if callback:
            callback(msg=f"Entities merging done, {now - start_ts:.2f}s.")

        start_ts = now
        logging.info("Relationships merging...")
        all_relationships_data = []
        async with trio.open_nursery() as nursery:
            for (src, tgt), rels in maybe_edges.items():
                nursery.start_soon(self._merge_edges, src, tgt, rels, all_relationships_data)
        now = trio.current_time()
        if callback:
            callback(msg=f"Relationships merging done, {now - start_ts:.2f}s.")

        if not len(all_entities_data) and not len(all_relationships_data):
            logging.warning("Didn't extract any entities and relationships, maybe your LLM is not working")

        if not len(all_entities_data):
            logging.warning("Didn't extract any entities")
        if not len(all_relationships_data):
            logging.warning("Didn't extract any relationships")

        return all_entities_data, all_relationships_data

    async def _merge_nodes(self, entity_name: str, entities: list[dict], all_entities_data):
        if not entities:
            return
        # The most frequent entity type among the duplicates wins.
        entity_type = sorted(
            Counter([dp["entity_type"] for dp in entities]).items(),
            key=lambda x: x[1],
            reverse=True,
        )[0][0]
        description = GRAPH_FIELD_SEP.join(sorted(set([dp["description"] for dp in entities])))
        already_source_ids = flat_uniq_list(entities, "source_id")
        description = await self._handle_entity_relation_summary(entity_name, description)
        node_data = dict(
            entity_type=entity_type,
            description=description,
            source_id=already_source_ids,
        )
        node_data["entity_name"] = entity_name
        all_entities_data.append(node_data)

    async def _merge_edges(self, src_id: str, tgt_id: str, edges_data: list[dict], all_relationships_data=None):
        if not edges_data:
            return
        weight = sum([edge["weight"] for edge in edges_data])
        description = GRAPH_FIELD_SEP.join(sorted(set([edge["description"] for edge in edges_data])))
        description = await self._handle_entity_relation_summary(f"{src_id} -> {tgt_id}", description)
        keywords = flat_uniq_list(edges_data, "keywords")
        source_id = flat_uniq_list(edges_data, "source_id")
        edge_data = dict(src_id=src_id, tgt_id=tgt_id, description=description, keywords=keywords, weight=weight, source_id=source_id)
        all_relationships_data.append(edge_data)

    async def _merge_graph_nodes(self, graph: nx.Graph, nodes: list[str], change: GraphChange):
        if len(nodes) <= 1:
            return
        change.added_updated_nodes.add(nodes[0])
        change.removed_nodes.update(nodes[1:])
        nodes_set = set(nodes)
        node0_attrs = graph.nodes[nodes[0]]
        node0_neighbors = set(graph.neighbors(nodes[0]))
        for node1 in nodes[1:]:
            # Merge two nodes, keep "entity_name", "entity_type", "page_rank" unchanged.
            node1_attrs = graph.nodes[node1]
            node0_attrs["description"] += f"{GRAPH_FIELD_SEP}{node1_attrs['description']}"
            node0_attrs["source_id"] = sorted(set(node0_attrs["source_id"] + node1_attrs["source_id"]))
            for neighbor in graph.neighbors(node1):
                change.removed_edges.add(get_from_to(node1, neighbor))
                if neighbor not in nodes_set:
                    edge1_attrs = graph.get_edge_data(node1, neighbor)
                    if neighbor in node0_neighbors:
                        # Merge two edges
                        change.added_updated_edges.add(get_from_to(nodes[0], neighbor))
                        edge0_attrs = graph.get_edge_data(nodes[0], neighbor)
                        edge0_attrs["weight"] += edge1_attrs["weight"]
                        edge0_attrs["description"] += f"{GRAPH_FIELD_SEP}{edge1_attrs['description']}"
                        for attr in ["keywords", "source_id"]:
                            edge0_attrs[attr] = sorted(set(edge0_attrs[attr] + edge1_attrs[attr]))
                        edge0_attrs["description"] = await self._handle_entity_relation_summary(f"({nodes[0]}, {neighbor})", edge0_attrs["description"])
                        graph.add_edge(nodes[0], neighbor, **edge0_attrs)
                    else:
                        graph.add_edge(nodes[0], neighbor, **edge1_attrs)
            graph.remove_node(node1)
        node0_attrs["description"] = await self._handle_entity_relation_summary(nodes[0], node0_attrs["description"])
        graph.nodes[nodes[0]].update(node0_attrs)

    async def _handle_entity_relation_summary(self, entity_or_relation_name: str, description: str) -> str:
        summary_max_tokens = 512
        use_description = truncate(description, summary_max_tokens)
        # Split the concatenated descriptions; only summarize once there are enough of them.
        description_list = use_description.split(GRAPH_FIELD_SEP)
        if len(description_list) <= 12:
            return use_description
        prompt_template = SUMMARIZE_DESCRIPTIONS_PROMPT
        context_base = dict(
            entity_name=entity_or_relation_name,
            description_list=description_list,
            language=self._language,
        )
        use_prompt = prompt_template.format(**context_base)
        logging.info(f"Trigger summary: {entity_or_relation_name}")
        async with chat_limiter:
            summary = await trio.to_thread.run_sync(self._chat, "", [{"role": "user", "content": use_prompt}])
        return summary
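For orientation, a rough sketch of how a concrete extractor built on this base class might be driven: `_process_single_content` is supplied by subclasses (the GraphRAG/LightRAG extractors elsewhere in the repo), so the `DemoExtractor` below, its no-op extraction step, and the `StubLLM` object are purely hypothetical stand-ins, not part of this file.

```python
import trio


class StubLLM:
    # Hypothetical stand-in for a rag.llm.chat_model instance; never actually called here.
    llm_name = "stub"
    max_length = 8192

    def chat(self, system, history, gen_conf):
        return ""


class DemoExtractor(Extractor):
    # Subclasses are expected to implement the per-chunk extraction step.
    async def _process_single_content(self, chunk_key_dp, idx, total, out_results):
        doc_id, chunk = chunk_key_dp
        # A real implementation prompts the LLM; this stub reports no nodes, no edges, zero tokens.
        out_results.append(({}, {}, 0))


async def main():
    extractor = DemoExtractor(llm_invoker=StubLLM(), entity_types=["person", "event"])
    entities, relations = await extractor("doc-id", ["chunk one", "chunk two"], callback=lambda msg: print(msg))


trio.run(main)
```

The `__call__` orchestration above then fans the chunks out under a `trio` nursery, aggregates the per-chunk node/edge dictionaries, and merges duplicates before returning them.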