Refa: revise the implementation of LightRAG and enable response caching (#9828)

### What problem does this PR solve?

This revision performs a comprehensive review of the LightRAG implementation to ensure its correctness. It does **not** cover Entity Resolution or Community Reports Generation. The example below, which uses the default entity types and the General chunking method, shows good results in both time and effectiveness. In addition, LLM response caching is enabled so that failed tasks can be resumed without repeating LLM calls that have already completed.
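
For reference, the caching follows the usual look-up-then-store pattern around the LLM call, so resuming a failed task replays cached responses instead of re-spending tokens on chunks that were already extracted. The sketch below is illustrative only: it uses a plain in-memory dict and a hypothetical `cached_chat` helper rather than the `get_llm_cache`/`set_llm_cache` helpers the code actually calls in `Extractor._chat`.

```python
import hashlib
import json

# Hypothetical in-memory stand-in for the real LLM response cache.
_llm_cache: dict[str, str] = {}


def _cache_key(system: str, history: list[dict], gen_conf: dict) -> str:
    # Key on everything that determines the LLM output.
    payload = json.dumps({"system": system, "history": history, "gen_conf": gen_conf}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached_chat(llm, system: str, history: list[dict], gen_conf: dict | None = None) -> str:
    gen_conf = gen_conf or {}
    key = _cache_key(system, history, gen_conf)
    cached = _llm_cache.get(key)
    if cached:
        # A resumed task sends the same prompts again and hits the cache instead of the LLM.
        return cached
    response = llm.chat(system, history, gen_conf)  # whatever chat interface the model object exposes
    _llm_cache[key] = response
    return response
```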


[The-Necklace.pdf](https://github.com/user-attachments/files/22042432/The-Necklace.pdf)

After:


![img_v3_02pk_177dbc6a-e7cc-4732-b202-ad4682d171fg](https://github.com/user-attachments/assets/5ef1d93a-9109-4fe9-8a7b-a65add16f82b)


```bash
Begin at: Fri, 29 Aug 2025 16:48:03 GMT
Duration: 222.31 s
Progress:
16:48:04 Task has been received.
16:48:06 Page(1~7): Start to parse.
16:48:06 Page(1~7): OCR started
16:48:08 Page(1~7): OCR finished (1.89s)
16:48:11 Page(1~7): Layout analysis (3.72s)
16:48:11 Page(1~7): Table analysis (0.00s)
16:48:11 Page(1~7): Text merged (0.00s)
16:48:11 Page(1~7): Finish parsing.
16:48:12 Page(1~7): Generate 7 chunks
16:48:12 Page(1~7): Embedding chunks (0.29s)
16:48:12 Page(1~7): Indexing done (0.04s). Task done (7.84s)
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: She had no dresses, no je...
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: Her husband, already half...
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: And this life lasted ten ...
16:48:17 Start processing for f421fb06849e11f0bdd32724b93a52b2: Then she asked, hesitatin...
16:49:30 Completed processing for f421fb06849e11f0bdd32724b93a52b2: She had no dresses, no je... after 1 gleanings, 21985 tokens.
16:49:30 Entities extraction of chunk 3 1/7 done, 12 nodes, 13 edges, 21985 tokens.
16:49:40 Completed processing for f421fb06849e11f0bdd32724b93a52b2: Finally, she replied, hes... after 1 gleanings, 22584 tokens.
16:49:40 Entities extraction of chunk 5 2/7 done, 19 nodes, 19 edges, 22584 tokens.
16:50:02 Completed processing for f421fb06849e11f0bdd32724b93a52b2: Then she asked, hesitatin... after 1 gleanings, 24610 tokens.
16:50:02 Entities extraction of chunk 0 3/7 done, 16 nodes, 28 edges, 24610 tokens.
16:50:03 Completed processing for f421fb06849e11f0bdd32724b93a52b2: And this life lasted ten ... after 1 gleanings, 24031 tokens.
16:50:04 Entities extraction of chunk 1 4/7 done, 24 nodes, 22 edges, 24031 tokens.
16:50:14 Completed processing for f421fb06849e11f0bdd32724b93a52b2: So they begged the jewell... after 1 gleanings, 24635 tokens.
16:50:14 Entities extraction of chunk 6 5/7 done, 27 nodes, 26 edges, 24635 tokens.
16:50:29 Completed processing for f421fb06849e11f0bdd32724b93a52b2: Her husband, already half... after 1 gleanings, 25758 tokens.
16:50:29 Entities extraction of chunk 2 6/7 done, 25 nodes, 35 edges, 25758 tokens.
16:51:35 Completed processing for f421fb06849e11f0bdd32724b93a52b2: The Necklace By Guy de Ma... after 1 gleanings, 27491 tokens.
16:51:35 Entities extraction of chunk 4 7/7 done, 39 nodes, 37 edges, 27491 tokens.
16:51:35 Entities and relationships extraction done, 147 nodes, 177 edges, 171094 tokens, 198.58s.
16:51:35 Entities merging done, 0.01s.
16:51:35 Relationships merging done, 0.01s.
16:51:35 ignored 7 relations due to missing entities.
16:51:35 generated subgraph for doc f421fb06849e11f0bdd32724b93a52b2 in 198.68 seconds.
16:51:35 run_graphrag f421fb06849e11f0bdd32724b93a52b2 graphrag_task_lock acquired
16:51:35 set_graph removed 0 nodes and 0 edges from index in 0.00s.
16:51:35 Get embedding of nodes: 9/147
16:51:35 Get embedding of nodes: 109/147
16:51:37 Get embedding of edges: 9/170
16:51:37 Get embedding of edges: 109/170
16:51:40 set_graph converted graph change to 319 chunks in 4.21s.
16:51:40 Insert chunks: 4/319
16:51:40 Insert chunks: 104/319
16:51:40 Insert chunks: 204/319
16:51:40 Insert chunks: 304/319
16:51:40 set_graph added/updated 147 nodes and 170 edges from index in 0.53s.
16:51:40 merging subgraph for doc f421fb06849e11f0bdd32724b93a52b2 into the global graph done in 4.79 seconds.
16:51:40 Knowledge Graph done (204.29s)
```

Before:


![img_v3_02pk_63370edf-ecee-4ee8-8ac8-69c8d2c712fg](https://github.com/user-attachments/assets/1162eb0f-68c2-4de5-abe0-cdfa168f71de)

```bash
Begin at: Fri, 29 Aug 2025 17:00:47 GMT
processDuration: 173.38 s
Progress:
17:00:49 Task has been received.
17:00:51 Page(1~7): Start to parse.
17:00:51 Page(1~7): OCR started
17:00:53 Page(1~7): OCR finished (1.82s)
17:00:57 Page(1~7): Layout analysis (3.64s)
17:00:57 Page(1~7): Table analysis (0.00s)
17:00:57 Page(1~7): Text merged (0.00s)
17:00:57 Page(1~7): Finish parsing.
17:00:57 Page(1~7): Generate 7 chunks
17:00:57 Page(1~7): Embedding chunks (0.31s)
17:00:57 Page(1~7): Indexing done (0.03s). Task done (7.88s)
17:00:57 created task graphrag
17:01:00 Task has been received.
17:02:17 Entities extraction of chunk 1 1/7 done, 9 nodes, 9 edges, 10654 tokens.
17:02:31 Entities extraction of chunk 2 2/7 done, 12 nodes, 13 edges, 11066 tokens.
17:02:33 Entities extraction of chunk 4 3/7 done, 9 nodes, 10 edges, 10433 tokens.
17:02:42 Entities extraction of chunk 5 4/7 done, 11 nodes, 14 edges, 11290 tokens.
17:02:52 Entities extraction of chunk 6 5/7 done, 13 nodes, 15 edges, 11039 tokens.
17:02:55 Entities extraction of chunk 3 6/7 done, 14 nodes, 13 edges, 11466 tokens.
17:03:32 Entities extraction of chunk 0 7/7 done, 19 nodes, 18 edges, 13107 tokens.
17:03:32 Entities and relationships extraction done, 71 nodes, 89 edges, 79055 tokens, 149.66s.
17:03:32 Entities merging done, 0.01s.
17:03:32 Relationships merging done, 0.01s.
17:03:32 ignored 1 relations due to missing entities.
17:03:32 generated subgraph for doc b1d9d3b6848711f0aacd7ddc0714c4d3 in 149.69 seconds.
17:03:32 run_graphrag b1d9d3b6848711f0aacd7ddc0714c4d3 graphrag_task_lock acquired
17:03:32 set_graph removed 0 nodes and 0 edges from index in 0.00s.
17:03:32 Get embedding of nodes: 9/71
17:03:33 Get embedding of edges: 9/88
17:03:34 set_graph converted graph change to 161 chunks in 2.27s.
17:03:34 Insert chunks: 4/161
17:03:34 Insert chunks: 104/161
17:03:34 set_graph added/updated 71 nodes and 88 edges from index in 0.28s.
17:03:34 merging subgraph for doc b1d9d3b6848711f0aacd7ddc0714c4d3 into the global graph done in 2.60 seconds.
17:03:34 Knowledge Graph done (153.18s)
```

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
- [x] Performance Improvement
Yongteng Lei, 2025-08-29 17:58:36 +08:00 (committed by GitHub)
parent 4fbad2828c
commit 56cd576876
9 changed files with 435 additions and 395 deletions

View File

@@ -1,5 +1,5 @@
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +14,28 @@
# limitations under the License.
#
import logging
import os
import re
from collections import defaultdict, Counter
from collections import Counter, defaultdict
from copy import deepcopy
from typing import Callable
import trio
import networkx as nx
import trio
from api.utils.api_utils import timeout
from graphrag.general.graph_prompt import SUMMARIZE_DESCRIPTIONS_PROMPT
from graphrag.utils import get_llm_cache, set_llm_cache, handle_single_entity_extraction, \
handle_single_relationship_extraction, split_string_by_multi_markers, flat_uniq_list, chat_limiter, get_from_to, GraphChange
from graphrag.utils import (
GraphChange,
chat_limiter,
flat_uniq_list,
get_from_to,
get_llm_cache,
handle_single_entity_extraction,
handle_single_relationship_extraction,
set_llm_cache,
split_string_by_multi_markers,
)
from rag.llm.chat_model import Base as CompletionLLM
from rag.prompts import message_fit_in
from rag.utils import truncate
@@ -32,6 +43,7 @@ from rag.utils import truncate
GRAPH_FIELD_SEP = "<SEP>"
DEFAULT_ENTITY_TYPES = ["organization", "person", "geo", "event", "category"]
ENTITY_EXTRACTION_MAX_GLEANINGS = 2
MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK = int(os.environ.get("MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK", 10))
class Extractor:
@@ -47,7 +59,7 @@ class Extractor:
self._language = language
self._entity_types = entity_types or DEFAULT_ENTITY_TYPES
@timeout(60*20)
@timeout(60 * 20)
def _chat(self, system, history, gen_conf={}):
hist = deepcopy(history)
conf = deepcopy(gen_conf)
@@ -55,6 +67,7 @@ class Extractor:
if response:
return response
_, system_msg = message_fit_in([{"role": "system", "content": system}], int(self._llm.max_length * 0.92))
response = ""
for attempt in range(3):
try:
response = self._llm.chat(system_msg[0]["content"], hist, conf)
@@ -74,38 +87,37 @@
maybe_edges = defaultdict(list)
ent_types = [t.lower() for t in self._entity_types]
for record in records:
record_attributes = split_string_by_multi_markers(
record, [tuple_delimiter]
)
record_attributes = split_string_by_multi_markers(record, [tuple_delimiter])
if_entities = handle_single_entity_extraction(
record_attributes, chunk_key
)
if_entities = handle_single_entity_extraction(record_attributes, chunk_key)
if if_entities is not None and if_entities.get("entity_type", "unknown").lower() in ent_types:
maybe_nodes[if_entities["entity_name"]].append(if_entities)
continue
if_relation = handle_single_relationship_extraction(
record_attributes, chunk_key
)
if_relation = handle_single_relationship_extraction(record_attributes, chunk_key)
if if_relation is not None:
maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
if_relation
)
maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(if_relation)
return dict(maybe_nodes), dict(maybe_edges)
async def __call__(
self, doc_id: str, chunks: list[str],
callback: Callable | None = None
):
async def __call__(self, doc_id: str, chunks: list[str], callback: Callable | None = None):
self.callback = callback
start_ts = trio.current_time()
out_results = []
async with trio.open_nursery() as nursery:
for i, ck in enumerate(chunks):
ck = truncate(ck, int(self._llm.max_length*0.8))
nursery.start_soon(self._process_single_content, (doc_id, ck), i, len(chunks), out_results)
async def extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK):
out_results = []
limiter = trio.Semaphore(max_concurrency)
async def worker(chunk_key_dp: tuple[str, str], idx: int, total: int):
async with limiter:
await self._process_single_content(chunk_key_dp, idx, total, out_results)
async with trio.open_nursery() as nursery:
for i, ck in enumerate(chunks):
nursery.start_soon(worker, (doc_id, ck), i, len(chunks))
return out_results
out_results = await extract_all(doc_id, chunks, max_concurrency=MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK)
maybe_nodes = defaultdict(list)
maybe_edges = defaultdict(list)
@@ -118,7 +130,7 @@ class Extractor:
sum_token_count += token_count
now = trio.current_time()
if callback:
callback(msg = f"Entities and relationships extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens, {now-start_ts:.2f}s.")
callback(msg=f"Entities and relationships extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens, {now - start_ts:.2f}s.")
start_ts = now
logging.info("Entities merging...")
all_entities_data = []
@@ -127,7 +139,7 @@ class Extractor:
nursery.start_soon(self._merge_nodes, en_nm, ents, all_entities_data)
now = trio.current_time()
if callback:
callback(msg = f"Entities merging done, {now-start_ts:.2f}s.")
callback(msg=f"Entities merging done, {now - start_ts:.2f}s.")
start_ts = now
logging.info("Relationships merging...")
@@ -137,12 +149,10 @@ class Extractor:
nursery.start_soon(self._merge_edges, src, tgt, rels, all_relationships_data)
now = trio.current_time()
if callback:
callback(msg = f"Relationships merging done, {now-start_ts:.2f}s.")
callback(msg=f"Relationships merging done, {now - start_ts:.2f}s.")
if not len(all_entities_data) and not len(all_relationships_data):
logging.warning(
"Didn't extract any entities and relationships, maybe your LLM is not working"
)
logging.warning("Didn't extract any entities and relationships, maybe your LLM is not working")
if not len(all_entities_data):
logging.warning("Didn't extract any entities")
@@ -155,15 +165,11 @@
if not entities:
return
entity_type = sorted(
Counter(
[dp["entity_type"] for dp in entities]
).items(),
Counter([dp["entity_type"] for dp in entities]).items(),
key=lambda x: x[1],
reverse=True,
)[0][0]
description = GRAPH_FIELD_SEP.join(
sorted(set([dp["description"] for dp in entities]))
)
description = GRAPH_FIELD_SEP.join(sorted(set([dp["description"] for dp in entities])))
already_source_ids = flat_uniq_list(entities, "source_id")
description = await self._handle_entity_relation_summary(entity_name, description)
node_data = dict(
@@ -174,13 +180,7 @@ class Extractor:
node_data["entity_name"] = entity_name
all_relationships_data.append(node_data)
async def _merge_edges(
self,
src_id: str,
tgt_id: str,
edges_data: list[dict],
all_relationships_data=None
):
async def _merge_edges(self, src_id: str, tgt_id: str, edges_data: list[dict], all_relationships_data=None):
if not edges_data:
return
weight = sum([edge["weight"] for edge in edges_data])
@@ -188,14 +188,7 @@ class Extractor:
description = await self._handle_entity_relation_summary(f"{src_id} -> {tgt_id}", description)
keywords = flat_uniq_list(edges_data, "keywords")
source_id = flat_uniq_list(edges_data, "source_id")
edge_data = dict(
src_id=src_id,
tgt_id=tgt_id,
description=description,
keywords=keywords,
weight=weight,
source_id=source_id
)
edge_data = dict(src_id=src_id, tgt_id=tgt_id, description=description, keywords=keywords, weight=weight, source_id=source_id)
all_relationships_data.append(edge_data)
async def _merge_graph_nodes(self, graph: nx.Graph, nodes: list[str], change: GraphChange):
@@ -231,14 +224,10 @@ class Extractor:
node0_attrs["description"] = await self._handle_entity_relation_summary(nodes[0], node0_attrs["description"])
graph.nodes[nodes[0]].update(node0_attrs)
async def _handle_entity_relation_summary(
self,
entity_or_relation_name: str,
description: str
) -> str:
async def _handle_entity_relation_summary(self, entity_or_relation_name: str, description: str) -> str:
summary_max_tokens = 512
use_description = truncate(description, summary_max_tokens)
description_list=use_description.split(GRAPH_FIELD_SEP),
description_list = (use_description.split(GRAPH_FIELD_SEP),)
if len(description_list) <= 12:
return use_description
prompt_template = SUMMARIZE_DESCRIPTIONS_PROMPT
@@ -250,5 +239,5 @@ class Extractor:
use_prompt = prompt_template.format(**context_base)
logging.info(f"Trigger summary: {entity_or_relation_name}")
async with chat_limiter:
summary = await trio.to_thread.run_sync(lambda: self._chat(use_prompt, [{"role": "user", "content": "Output: "}]))
summary = await trio.to_thread.run_sync(self._chat, "", [{"role": "user", "content": use_prompt}])
return summary
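
Besides the caching, the extractor above now caps how many chunks are processed concurrently with a `trio.Semaphore`, sized by the new `MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK` environment variable (default 10). A standalone sketch of that pattern, with a dummy `process_chunk` standing in for `_process_single_content`:

```python
import os

import trio

MAX_CONCURRENT = int(os.environ.get("MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK", 10))


async def process_chunk(idx: int, chunk: str, out: list) -> None:
    await trio.sleep(0.1)  # placeholder for the real per-chunk LLM extraction
    out.append((idx, len(chunk)))


async def extract_all(chunks: list[str], max_concurrency: int = MAX_CONCURRENT) -> list:
    out: list = []
    limiter = trio.Semaphore(max_concurrency)

    async def worker(idx: int, chunk: str) -> None:
        async with limiter:  # at most max_concurrency workers run the body at once
            await process_chunk(idx, chunk, out)

    async with trio.open_nursery() as nursery:
        for i, ck in enumerate(chunks):
            nursery.start_soon(worker, i, ck)
    return out


if __name__ == "__main__":
    print(trio.run(extract_all, [f"chunk {i}" for i in range(25)]))
```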

View File

@@ -1,5 +1,5 @@
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,25 +23,24 @@ import trio
from api import settings
from api.utils import get_uuid
from api.utils.api_utils import timeout
from graphrag.light.graph_extractor import GraphExtractor as LightKGExt
from graphrag.general.graph_extractor import GraphExtractor as GeneralKGExt
from graphrag.general.community_reports_extractor import CommunityReportsExtractor
from graphrag.entity_resolution import EntityResolution
from graphrag.general.community_reports_extractor import CommunityReportsExtractor
from graphrag.general.extractor import Extractor
from graphrag.general.graph_extractor import GraphExtractor as GeneralKGExt
from graphrag.light.graph_extractor import GraphExtractor as LightKGExt
from graphrag.utils import (
graph_merge,
get_graph,
set_graph,
GraphChange,
chunk_id,
does_graph_contains,
get_graph,
graph_merge,
set_graph,
tidy_graph,
GraphChange,
)
from rag.nlp import rag_tokenizer, search
from rag.utils.redis_conn import RedisDistributedLock
async def run_graphrag(
row: dict,
language,
@@ -51,20 +50,16 @@ async def run_graphrag(
embedding_model,
callback,
):
enable_timeout_assertion=os.environ.get("ENABLE_TIMEOUT_ASSERTION")
enable_timeout_assertion = os.environ.get("ENABLE_TIMEOUT_ASSERTION")
start = trio.current_time()
tenant_id, kb_id, doc_id = row["tenant_id"], str(row["kb_id"]), row["doc_id"]
chunks = []
for d in settings.retrievaler.chunk_list(
doc_id, tenant_id, [kb_id], fields=["content_with_weight", "doc_id"]
):
for d in settings.retrievaler.chunk_list(doc_id, tenant_id, [kb_id], fields=["content_with_weight", "doc_id"]):
chunks.append(d["content_with_weight"])
with trio.fail_after(max(120, len(chunks)*60*10) if enable_timeout_assertion else 10000000000):
with trio.fail_after(max(120, len(chunks) * 60 * 10) if enable_timeout_assertion else 10000000000):
subgraph = await generate_subgraph(
LightKGExt
if "method" not in row["kb_parser_config"].get("graphrag", {}) or row["kb_parser_config"]["graphrag"]["method"] != "general"
else GeneralKGExt,
LightKGExt if "method" not in row["kb_parser_config"].get("graphrag", {}) or row["kb_parser_config"]["graphrag"]["method"] != "general" else GeneralKGExt,
tenant_id,
kb_id,
doc_id,
@@ -177,9 +172,7 @@ async def generate_subgraph(
subgraph.graph["source_id"] = [doc_id]
chunk = {
"content_with_weight": json.dumps(
nx.node_link_data(subgraph, edges="edges"), ensure_ascii=False
),
"content_with_weight": json.dumps(nx.node_link_data(subgraph, edges="edges"), ensure_ascii=False),
"knowledge_graph_kwd": "subgraph",
"kb_id": kb_id,
"source_id": [doc_id],
@@ -187,22 +180,14 @@
"removed_kwd": "N",
}
cid = chunk_id(chunk)
await trio.to_thread.run_sync(
lambda: settings.docStoreConn.delete(
{"knowledge_graph_kwd": "subgraph", "source_id": doc_id}, search.index_name(tenant_id), kb_id
)
)
await trio.to_thread.run_sync(
lambda: settings.docStoreConn.insert(
[{"id": cid, **chunk}], search.index_name(tenant_id), kb_id
)
)
await trio.to_thread.run_sync(settings.docStoreConn.delete, {"knowledge_graph_kwd": "subgraph", "source_id": doc_id}, search.index_name(tenant_id), kb_id)
await trio.to_thread.run_sync(settings.docStoreConn.insert, [{"id": cid, **chunk}], search.index_name(tenant_id), kb_id)
now = trio.current_time()
callback(msg=f"generated subgraph for doc {doc_id} in {now - start:.2f} seconds.")
return subgraph
@timeout(60*3)
@timeout(60 * 3)
async def merge_subgraph(
tenant_id: str,
kb_id: str,
@@ -228,13 +213,11 @@ async def merge_subgraph(
await set_graph(tenant_id, kb_id, embedding_model, new_graph, change, callback)
now = trio.current_time()
callback(
msg=f"merging subgraph for doc {doc_id} into the global graph done in {now - start:.2f} seconds."
)
callback(msg=f"merging subgraph for doc {doc_id} into the global graph done in {now - start:.2f} seconds.")
return new_graph
@timeout(60*30, 1)
@timeout(60 * 30, 1)
async def resolve_entities(
graph,
subgraph_nodes: set[str],
@@ -260,7 +243,7 @@ async def resolve_entities(
callback(msg=f"Graph resolution done in {now - start:.2f}s.")
@timeout(60*30, 1)
@timeout(60 * 30, 1)
async def extract_community(
graph,
tenant_id: str,
@@ -280,9 +263,7 @@ async def extract_community(
doc_ids = graph.graph["source_id"]
now = trio.current_time()
callback(
msg=f"Graph extracted {len(cr.structured_output)} communities in {now - start:.2f}s."
)
callback(msg=f"Graph extracted {len(cr.structured_output)} communities in {now - start:.2f}s.")
start = now
chunks = []
for stru, rep in zip(community_structure, community_reports):
@@ -295,9 +276,7 @@
"docnm_kwd": stru["title"],
"title_tks": rag_tokenizer.tokenize(stru["title"]),
"content_with_weight": json.dumps(obj, ensure_ascii=False),
"content_ltks": rag_tokenizer.tokenize(
obj["report"] + " " + obj["evidences"]
),
"content_ltks": rag_tokenizer.tokenize(obj["report"] + " " + obj["evidences"]),
"knowledge_graph_kwd": "community_report",
"weight_flt": stru["weight"],
"entities_kwd": stru["entities"],
@@ -306,9 +285,7 @@
"source_id": list(doc_ids),
"available_int": 0,
}
chunk["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(
chunk["content_ltks"]
)
chunk["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(chunk["content_ltks"])
chunks.append(chunk)
await trio.to_thread.run_sync(
@@ -320,13 +297,11 @@ async def extract_community(
)
es_bulk_size = 4
for b in range(0, len(chunks), es_bulk_size):
doc_store_result = await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(tenant_id), kb_id))
doc_store_result = await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(chunks[b : b + es_bulk_size], search.index_name(tenant_id), kb_id))
if doc_store_result:
error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
raise Exception(error_message)
now = trio.current_time()
callback(
msg=f"Graph indexed {len(cr.structured_output)} communities in {now - start:.2f}s."
)
callback(msg=f"Graph indexed {len(cr.structured_output)} communities in {now - start:.2f}s.")
return community_structure, community_reports