Mirror of https://github.com/infiniflow/ragflow.git, synced 2025-12-24 15:36:50 +08:00
Feat: enhance webhook response to include status and success fields and simplify ReAct agent (#12091)
### What problem does this PR solve?

Enhance the webhook response to include `status` and `success` fields, and simplify the ReAct agent.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
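With this change, the webhook no longer streams SSE chunks back to the caller; it returns a single JSON document whose HTTP status mirrors the `code` field. A minimal sketch of the two response shapes (field values are illustrative):

```python
# Illustrative payloads only; values are made up.
success_body = {
    "message": "final agent answer...",  # all streamed message contents, joined
    "success": True,
    "code": 200,  # taken from the message_end event's "status" (default 200)
}
error_body = {
    "code": 400,
    "message": "<exception text>",
    "success": False,
}
```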
```diff
@@ -540,6 +540,8 @@ class Canvas(Graph):
             cite = re.search(r"\[ID:[ 0-9]+\]", cpn_obj.output("content"))

             message_end = {}
+            if cpn_obj.get_param("status"):
+                message_end["status"] = cpn_obj.get_param("status")
             if isinstance(cpn_obj.output("attachment"), dict):
                 message_end["attachment"] = cpn_obj.output("attachment")
             if cite:
```
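The hunk above lets a component's `status` parameter flow into the `message_end` event. A hedged sketch of the enriched event as a consumer might see it (the surrounding field layout is an assumption; only `status` and `attachment` appear in this diff):

```python
# Hypothetical message_end event as the webhook handler would receive it.
ans = {
    "event": "message_end",
    "data": {
        "status": 200,  # from cpn_obj.get_param("status")
        "attachment": {"name": "report.pdf"},  # from cpn_obj.output("attachment"), if a dict
    },
}
```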
```diff
@@ -29,8 +29,8 @@ from api.db.services.llm_service import LLMBundle
 from api.db.services.tenant_llm_service import TenantLLMService
 from api.db.services.mcp_server_service import MCPServerService
 from common.connection_utils import timeout
-from rag.prompts.generator import next_step_async, COMPLETE_TASK, analyze_task_async, \
-    citation_prompt, reflect_async, kb_prompt, citation_plus, full_question, message_fit_in, structured_output_prompt
+from rag.prompts.generator import next_step_async, COMPLETE_TASK, \
+    citation_prompt, kb_prompt, citation_plus, full_question, message_fit_in, structured_output_prompt
 from common.mcp_tool_call_conn import MCPToolCallSession, mcp_tool_metadata_to_openai_tool
 from agent.component.llm import LLMParam, LLM
```
```diff
@@ -246,7 +246,7 @@ class Agent(LLM, ToolBase):
         _, msg = message_fit_in([{"role": "system", "content": prompt}, *msg], int(self.chat_mdl.max_length * 0.97))
         answer_without_toolcall = ""
         use_tools = []
-        async for delta_ans, _ in self._react_with_tools_streamly_async(prompt, msg, use_tools, user_defined_prompt):
+        async for delta_ans, _ in self._react_with_tools_streamly_async_simple(prompt, msg, use_tools, user_defined_prompt):
             if self.check_if_canceled("Agent streaming"):
                 return
```
```diff
@@ -264,7 +264,7 @@ class Agent(LLM, ToolBase):
         if use_tools:
             self.set_output("use_tools", use_tools)

-    async def _react_with_tools_streamly_async(self, prompt, history: list[dict], use_tools, user_defined_prompt={}, schema_prompt: str = ""):
+    async def _react_with_tools_streamly_async_simple(self, prompt, history: list[dict], use_tools, user_defined_prompt={}, schema_prompt: str = ""):
         token_count = 0
         tool_metas = self.tool_meta
         hist = deepcopy(history)
```
```diff
@@ -276,6 +276,28 @@ class Agent(LLM, ToolBase):
         else:
             user_request = history[-1]["content"]

+        def build_task_desc(prompt: str, user_request: str, tool_metas: list[dict], user_defined_prompt: dict | None = None) -> str:
+            """Build a minimal task_desc by concatenating prompt, query, and tool schemas."""
+            user_defined_prompt = user_defined_prompt or {}
+
+            tools_json = json.dumps(tool_metas, ensure_ascii=False, indent=2)
+
+            task_desc = (
+                "### Agent Prompt\n"
+                f"{prompt}\n\n"
+                "### User Request\n"
+                f"{user_request}\n\n"
+                "### Tools (schemas)\n"
+                f"{tools_json}\n"
+            )
+
+            if user_defined_prompt:
+                udp_json = json.dumps(user_defined_prompt, ensure_ascii=False, indent=2)
+                task_desc += "\n### User Defined Prompts\n" + udp_json + "\n"
+
+            return task_desc
+
         async def use_tool_async(name, args):
             nonlocal hist, use_tools, last_calling
             logging.info(f"{last_calling=} == {name=}")
```
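To see what the new helper emits, here is an illustrative call with made-up tool metadata; unlike the `analyze_task_async` it replaces, no LLM round-trip is involved:

```python
tools = [{"name": "web_search", "parameters": {"query": {"type": "string"}}}]
print(build_task_desc("You are a helpful agent.", "Find RAGFlow's latest release.", tools))
# ### Agent Prompt
# You are a helpful agent.
#
# ### User Request
# Find RAGFlow's latest release.
#
# ### Tools (schemas)
# [
#   {
#     "name": "web_search",
#     ...
#   }
# ]
```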
```diff
@@ -286,9 +308,6 @@ class Agent(LLM, ToolBase):
                 "arguments": args,
                 "results": tool_response
             })
-            # self.callback("add_memory", {}, "...")
-            #self.add_memory(hist[-2]["content"], hist[-1]["content"], name, args, str(tool_response), user_defined_prompt)
-
             return name, tool_response

         async def complete():
```
```diff
@@ -326,6 +345,21 @@ class Agent(LLM, ToolBase):

             self.callback("gen_citations", {}, txt, elapsed_time=timer()-st)

+        def build_observation(tool_call_res: list[tuple]) -> str:
+            """
+            Build an Observation from tool call results.
+            No LLM involved.
+            """
+            if not tool_call_res:
+                return ""
+
+            lines = ["Observation:"]
+            for name, result in tool_call_res:
+                lines.append(f"[{name} result]")
+                lines.append(str(result))
+
+            return "\n".join(lines)
+
         def append_user_content(hist, content):
             if hist[-1]["role"] == "user":
                 hist[-1]["content"] += content
```
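`build_observation` is equally mechanical: given the `(name, result)` tuples gathered from the tool tasks, it renders a plain-text Observation block, standing in for the LLM-backed `reflect_async` further down. An illustrative call:

```python
results = [("web_search", {"hits": 3}), ("calculator", "42")]
print(build_observation(results))
# Observation:
# [web_search result]
# {'hits': 3}
# [calculator result]
# 42
```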
```diff
@@ -333,7 +367,7 @@ class Agent(LLM, ToolBase):
                 hist.append({"role": "user", "content": content})

         st = timer()
-        task_desc = await analyze_task_async(self.chat_mdl, prompt, user_request, tool_metas, user_defined_prompt)
+        task_desc = build_task_desc(prompt, user_request, tool_metas, user_defined_prompt)
         self.callback("analyze_task", {}, task_desc, elapsed_time=timer()-st)
         for _ in range(self._param.max_rounds + 1):
             if self.check_if_canceled("Agent streaming"):
```
```diff
@@ -364,7 +398,7 @@ class Agent(LLM, ToolBase):

                 results = await asyncio.gather(*tool_tasks) if tool_tasks else []
                 st = timer()
-                reflection = await reflect_async(self.chat_mdl, hist, results, user_defined_prompt)
+                reflection = build_observation(results)
                 append_user_content(hist, reflection)
                 self.callback("reflection", {}, str(reflection), elapsed_time=timer()-st)
```
```diff
@@ -393,6 +427,135 @@ Respond immediately with your final comprehensive answer.
         async for txt, tkcnt in complete():
             yield txt, tkcnt

+    # async def _react_with_tools_streamly_async(self, prompt, history: list[dict], use_tools, user_defined_prompt={}, schema_prompt: str = ""):
+    #     token_count = 0
+    #     tool_metas = self.tool_meta
+    #     hist = deepcopy(history)
+    #     last_calling = ""
+    #     if len(hist) > 3:
+    #         st = timer()
+    #         user_request = await full_question(messages=history, chat_mdl=self.chat_mdl)
+    #         self.callback("Multi-turn conversation optimization", {}, user_request, elapsed_time=timer()-st)
+    #     else:
+    #         user_request = history[-1]["content"]
+
+    #     async def use_tool_async(name, args):
+    #         nonlocal hist, use_tools, last_calling
+    #         logging.info(f"{last_calling=} == {name=}")
+    #         last_calling = name
+    #         tool_response = await self.toolcall_session.tool_call_async(name, args)
+    #         use_tools.append({
+    #             "name": name,
+    #             "arguments": args,
+    #             "results": tool_response
+    #         })
+    #         # self.callback("add_memory", {}, "...")
+    #         #self.add_memory(hist[-2]["content"], hist[-1]["content"], name, args, str(tool_response), user_defined_prompt)
+
+    #         return name, tool_response
+
+    #     async def complete():
+    #         nonlocal hist
+    #         need2cite = self._param.cite and self._canvas.get_reference()["chunks"] and self._id.find("-->") < 0
+    #         if schema_prompt:
+    #             need2cite = False
+    #         cited = False
+    #         if hist and hist[0]["role"] == "system":
+    #             if schema_prompt:
+    #                 hist[0]["content"] += "\n" + schema_prompt
+    #             if need2cite and len(hist) < 7:
+    #                 hist[0]["content"] += citation_prompt()
+    #                 cited = True
+    #         yield "", token_count
+
+    #         _hist = hist
+    #         if len(hist) > 12:
+    #             _hist = [hist[0], hist[1], *hist[-10:]]
+    #         entire_txt = ""
+    #         async for delta_ans in self._generate_streamly(_hist):
+    #             if not need2cite or cited:
+    #                 yield delta_ans, 0
+    #             entire_txt += delta_ans
+    #         if not need2cite or cited:
+    #             return
+
+    #         st = timer()
+    #         txt = ""
+    #         async for delta_ans in self._gen_citations_async(entire_txt):
+    #             if self.check_if_canceled("Agent streaming"):
+    #                 return
+    #             yield delta_ans, 0
+    #             txt += delta_ans
+
+    #         self.callback("gen_citations", {}, txt, elapsed_time=timer()-st)
+
+    #     def append_user_content(hist, content):
+    #         if hist[-1]["role"] == "user":
+    #             hist[-1]["content"] += content
+    #         else:
+    #             hist.append({"role": "user", "content": content})
+
+    #     st = timer()
+    #     task_desc = await analyze_task_async(self.chat_mdl, prompt, user_request, tool_metas, user_defined_prompt)
+    #     self.callback("analyze_task", {}, task_desc, elapsed_time=timer()-st)
+    #     for _ in range(self._param.max_rounds + 1):
+    #         if self.check_if_canceled("Agent streaming"):
+    #             return
+    #         response, tk = await next_step_async(self.chat_mdl, hist, tool_metas, task_desc, user_defined_prompt)
+    #         # self.callback("next_step", {}, str(response)[:256]+"...")
+    #         token_count += tk or 0
+    #         hist.append({"role": "assistant", "content": response})
+    #         try:
+    #             functions = json_repair.loads(re.sub(r"```.*", "", response))
+    #             if not isinstance(functions, list):
+    #                 raise TypeError(f"List should be returned, but `{functions}`")
+    #             for f in functions:
+    #                 if not isinstance(f, dict):
+    #                     raise TypeError(f"An object type should be returned, but `{f}`")
+
+    #             tool_tasks = []
+    #             for func in functions:
+    #                 name = func["name"]
+    #                 args = func["arguments"]
+    #                 if name == COMPLETE_TASK:
+    #                     append_user_content(hist, f"Respond with a formal answer. FORGET(DO NOT mention) about `{COMPLETE_TASK}`. The language for the response MUST be as the same as the first user request.\n")
+    #                     async for txt, tkcnt in complete():
+    #                         yield txt, tkcnt
+    #                     return
+
+    #                 tool_tasks.append(asyncio.create_task(use_tool_async(name, args)))
+
+    #             results = await asyncio.gather(*tool_tasks) if tool_tasks else []
+    #             st = timer()
+    #             reflection = await reflect_async(self.chat_mdl, hist, results, user_defined_prompt)
+    #             append_user_content(hist, reflection)
+    #             self.callback("reflection", {}, str(reflection), elapsed_time=timer()-st)
+
+    #         except Exception as e:
+    #             logging.exception(msg=f"Wrong JSON argument format in LLM ReAct response: {e}")
+    #             e = f"\nTool call error, please correct the input parameter of response format and call it again.\n *** Exception ***\n{e}"
+    #             append_user_content(hist, str(e))
+
+    #     logging.warning( f"Exceed max rounds: {self._param.max_rounds}")
+    #     final_instruction = f"""
+    # {user_request}
+    # IMPORTANT: You have reached the conversation limit. Based on ALL the information and research you have gathered so far, please provide a DIRECT and COMPREHENSIVE final answer to the original request.
+    # Instructions:
+    # 1. SYNTHESIZE all information collected during this conversation
+    # 2. Provide a COMPLETE response using existing data - do not suggest additional research
+    # 3. Structure your response as a FINAL DELIVERABLE, not a plan
+    # 4. If information is incomplete, state what you found and provide the best analysis possible with available data
+    # 5. DO NOT mention conversation limits or suggest further steps
+    # 6. Focus on delivering VALUE with the information already gathered
+    # Respond immediately with your final comprehensive answer.
+    # """
+    #     if self.check_if_canceled("Agent final instruction"):
+    #         return
+    #     append_user_content(hist, final_instruction)
+
+    #     async for txt, tkcnt in complete():
+    #         yield txt, tkcnt
+
     async def _gen_citations_async(self, text):
         retrievals = self._canvas.get_reference()
         retrievals = {"chunks": list(retrievals["chunks"].values()), "doc_aggs": list(retrievals["doc_aggs"].values())}
```
```diff
@@ -326,7 +326,6 @@ async def webhook(agent_id: str):
         secret = jwt_cfg.get("secret")
         if not secret:
             raise Exception("JWT secret not configured")
-        required_claims = jwt_cfg.get("required_claims", [])

         auth_header = request.headers.get("Authorization", "")
         if not auth_header.startswith("Bearer "):
```
```diff
@@ -750,7 +749,7 @@ async def webhook(agent_id: str):
     async def sse():
         nonlocal canvas
         contents: list[str] = []
-
+        status = 200
         try:
             async for ans in canvas.run(
                 query="",
```
```diff
@@ -765,6 +764,8 @@ async def webhook(agent_id: str):
                         content = "</think>"
                     if content:
                         contents.append(content)
+                if ans["event"] == "message_end":
+                    status = int(ans["data"].get("status", status))
                 if is_test:
                     append_webhook_trace(
                         agent_id,
```
```diff
@@ -782,7 +783,11 @@ async def webhook(agent_id: str):
                         }
                     )
             final_content = "".join(contents)
-            yield json.dumps(final_content, ensure_ascii=False)
+            return {
+                "message": final_content,
+                "success": True,
+                "code": status,
+            }

         except Exception as e:
             if is_test:
```
```diff
@@ -804,10 +809,14 @@ async def webhook(agent_id: str):
                         "success": False,
                     }
                 )
-            yield json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False)
+            return {"code": 400, "message": str(e), "success": False}

-    resp = Response(sse(), mimetype="application/json")
-    return resp
+    result = await sse()
+    return Response(
+        json.dumps(result),
+        status=result["code"],
+        mimetype="application/json",
+    )


 @manager.route("/webhook_trace/<agent_id>", methods=["GET"])  # noqa: F821
```
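From the caller's side, the endpoint now behaves like a plain JSON API rather than an SSE stream. A hedged sketch of a client (the URL path and token are hypothetical; adapt them to your deployment):

```python
import requests

# Hypothetical endpoint and credentials, for illustration only.
resp = requests.post(
    "http://localhost:9380/api/v1/webhook/<agent_id>",
    headers={"Authorization": "Bearer <token>"},
    json={"inputs": {}},
)
print(resp.status_code)  # mirrors the body's "code" (e.g., 200 or 400)
body = resp.json()
print(body["success"], body["message"])
```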