diff --git a/agent/canvas.py b/agent/canvas.py
index 953beb7e3..38f110282 100644
--- a/agent/canvas.py
+++ b/agent/canvas.py
@@ -540,6 +540,8 @@ class Canvas(Graph):
         cite = re.search(r"\[ID:[ 0-9]+\]", cpn_obj.output("content"))
 
         message_end = {}
+        if cpn_obj.get_param("status"):
+            message_end["status"] = cpn_obj.get_param("status")
         if isinstance(cpn_obj.output("attachment"), dict):
             message_end["attachment"] = cpn_obj.output("attachment")
         if cite:
diff --git a/agent/component/agent_with_tools.py b/agent/component/agent_with_tools.py
index 988351cf6..443cbdacf 100644
--- a/agent/component/agent_with_tools.py
+++ b/agent/component/agent_with_tools.py
@@ -29,8 +29,8 @@ from api.db.services.llm_service import LLMBundle
 from api.db.services.tenant_llm_service import TenantLLMService
 from api.db.services.mcp_server_service import MCPServerService
 from common.connection_utils import timeout
-from rag.prompts.generator import next_step_async, COMPLETE_TASK, analyze_task_async, \
-    citation_prompt, reflect_async, kb_prompt, citation_plus, full_question, message_fit_in, structured_output_prompt
+from rag.prompts.generator import next_step_async, COMPLETE_TASK, \
+    citation_prompt, kb_prompt, citation_plus, full_question, message_fit_in, structured_output_prompt
 from common.mcp_tool_call_conn import MCPToolCallSession, mcp_tool_metadata_to_openai_tool
 from agent.component.llm import LLMParam, LLM
@@ -246,7 +246,7 @@ class Agent(LLM, ToolBase):
         _, msg = message_fit_in([{"role": "system", "content": prompt}, *msg], int(self.chat_mdl.max_length * 0.97))
         answer_without_toolcall = ""
         use_tools = []
-        async for delta_ans, _ in self._react_with_tools_streamly_async(prompt, msg, use_tools, user_defined_prompt):
+        async for delta_ans, _ in self._react_with_tools_streamly_async_simple(prompt, msg, use_tools, user_defined_prompt):
             if self.check_if_canceled("Agent streaming"):
                 return
@@ -264,7 +264,7 @@ class Agent(LLM, ToolBase):
         if use_tools:
             self.set_output("use_tools", use_tools)
 
-    async def _react_with_tools_streamly_async(self, prompt, history: list[dict], use_tools, user_defined_prompt={}, schema_prompt: str = ""):
+    async def _react_with_tools_streamly_async_simple(self, prompt, history: list[dict], use_tools, user_defined_prompt={}, schema_prompt: str = ""):
         token_count = 0
         tool_metas = self.tool_meta
         hist = deepcopy(history)
@@ -276,6 +276,28 @@ class Agent(LLM, ToolBase):
         else:
             user_request = history[-1]["content"]
 
+        def build_task_desc(prompt: str, user_request: str, tool_metas: list[dict], user_defined_prompt: dict | None = None) -> str:
+            """Build a minimal task_desc by concatenating prompt, query, and tool schemas."""
+            user_defined_prompt = user_defined_prompt or {}
+
+            tools_json = json.dumps(tool_metas, ensure_ascii=False, indent=2)
+
+            task_desc = (
+                "### Agent Prompt\n"
+                f"{prompt}\n\n"
+                "### User Request\n"
+                f"{user_request}\n\n"
+                "### Tools (schemas)\n"
+                f"{tools_json}\n"
+            )
+
+            if user_defined_prompt:
+                udp_json = json.dumps(user_defined_prompt, ensure_ascii=False, indent=2)
+                task_desc += "\n### User Defined Prompts\n" + udp_json + "\n"
+
+            return task_desc
+
+
         async def use_tool_async(name, args):
             nonlocal hist, use_tools, last_calling
             logging.info(f"{last_calling=} == {name=}")
@@ -286,9 +308,6 @@ class Agent(LLM, ToolBase):
                 "arguments": args,
                 "results": tool_response
             })
-            # self.callback("add_memory", {}, "...")
-            #self.add_memory(hist[-2]["content"], hist[-1]["content"], name, args, str(tool_response), user_defined_prompt)
-
             return name, tool_response
 
         async def complete():
@@ -326,6 +345,21 @@ class Agent(LLM, ToolBase):
 
             self.callback("gen_citations", {}, txt, elapsed_time=timer()-st)
 
+        def build_observation(tool_call_res: list[tuple]) -> str:
+            """
+            Build an Observation from tool call results.
+            No LLM involved.
+            """
+            if not tool_call_res:
+                return ""
+
+            lines = ["Observation:"]
+            for name, result in tool_call_res:
+                lines.append(f"[{name} result]")
+                lines.append(str(result))
+
+            return "\n".join(lines)
+
         def append_user_content(hist, content):
             if hist[-1]["role"] == "user":
                 hist[-1]["content"] += content
@@ -333,7 +367,7 @@ class Agent(LLM, ToolBase):
                 hist.append({"role": "user", "content": content})
 
         st = timer()
-        task_desc = await analyze_task_async(self.chat_mdl, prompt, user_request, tool_metas, user_defined_prompt)
+        task_desc = build_task_desc(prompt, user_request, tool_metas, user_defined_prompt)
         self.callback("analyze_task", {}, task_desc, elapsed_time=timer()-st)
         for _ in range(self._param.max_rounds + 1):
             if self.check_if_canceled("Agent streaming"):
@@ -364,7 +398,7 @@ class Agent(LLM, ToolBase):
 
                 results = await asyncio.gather(*tool_tasks) if tool_tasks else []
                 st = timer()
-                reflection = await reflect_async(self.chat_mdl, hist, results, user_defined_prompt)
+                reflection = build_observation(results)
                 append_user_content(hist, reflection)
                 self.callback("reflection", {}, str(reflection), elapsed_time=timer()-st)
@@ -393,6 +427,135 @@ Respond immediately with your final comprehensive answer.
         async for txt, tkcnt in complete():
             yield txt, tkcnt
 
+#     async def _react_with_tools_streamly_async(self, prompt, history: list[dict], use_tools, user_defined_prompt={}, schema_prompt: str = ""):
+#         token_count = 0
+#         tool_metas = self.tool_meta
+#         hist = deepcopy(history)
+#         last_calling = ""
+#         if len(hist) > 3:
+#             st = timer()
+#             user_request = await full_question(messages=history, chat_mdl=self.chat_mdl)
+#             self.callback("Multi-turn conversation optimization", {}, user_request, elapsed_time=timer()-st)
+#         else:
+#             user_request = history[-1]["content"]
+
+#         async def use_tool_async(name, args):
+#             nonlocal hist, use_tools, last_calling
+#             logging.info(f"{last_calling=} == {name=}")
+#             last_calling = name
+#             tool_response = await self.toolcall_session.tool_call_async(name, args)
+#             use_tools.append({
+#                 "name": name,
+#                 "arguments": args,
+#                 "results": tool_response
+#             })
+#             # self.callback("add_memory", {}, "...")
+#             #self.add_memory(hist[-2]["content"], hist[-1]["content"], name, args, str(tool_response), user_defined_prompt)
+
+#             return name, tool_response
+
+#         async def complete():
+#             nonlocal hist
+#             need2cite = self._param.cite and self._canvas.get_reference()["chunks"] and self._id.find("-->") < 0
+#             if schema_prompt:
+#                 need2cite = False
+#             cited = False
+#             if hist and hist[0]["role"] == "system":
+#                 if schema_prompt:
+#                     hist[0]["content"] += "\n" + schema_prompt
+#                 if need2cite and len(hist) < 7:
+#                     hist[0]["content"] += citation_prompt()
+#                     cited = True
+#             yield "", token_count
+
+#             _hist = hist
+#             if len(hist) > 12:
+#                 _hist = [hist[0], hist[1], *hist[-10:]]
+#             entire_txt = ""
+#             async for delta_ans in self._generate_streamly(_hist):
+#                 if not need2cite or cited:
+#                     yield delta_ans, 0
+#                 entire_txt += delta_ans
+#             if not need2cite or cited:
+#                 return
+
+#             st = timer()
+#             txt = ""
+#             async for delta_ans in self._gen_citations_async(entire_txt):
+#                 if self.check_if_canceled("Agent streaming"):
+#                     return
+#                 yield delta_ans, 0
+#                 txt += delta_ans
+
+#             self.callback("gen_citations", {}, txt, elapsed_time=timer()-st)
+
+#         def append_user_content(hist, content):
+#             if hist[-1]["role"] == "user":
+#                 hist[-1]["content"] += content
+#             else:
+#                 hist.append({"role": "user", "content": content})
+
+#         st = timer()
+#         task_desc = await analyze_task_async(self.chat_mdl, prompt, user_request, tool_metas, user_defined_prompt)
+#         self.callback("analyze_task", {}, task_desc, elapsed_time=timer()-st)
+#         for _ in range(self._param.max_rounds + 1):
+#             if self.check_if_canceled("Agent streaming"):
+#                 return
+#             response, tk = await next_step_async(self.chat_mdl, hist, tool_metas, task_desc, user_defined_prompt)
+#             # self.callback("next_step", {}, str(response)[:256]+"...")
+#             token_count += tk or 0
+#             hist.append({"role": "assistant", "content": response})
+#             try:
+#                 functions = json_repair.loads(re.sub(r"```.*", "", response))
+#                 if not isinstance(functions, list):
+#                     raise TypeError(f"List should be returned, but `{functions}`")
+#                 for f in functions:
+#                     if not isinstance(f, dict):
+#                         raise TypeError(f"An object type should be returned, but `{f}`")
+
+#                 tool_tasks = []
+#                 for func in functions:
+#                     name = func["name"]
+#                     args = func["arguments"]
+#                     if name == COMPLETE_TASK:
+#                         append_user_content(hist, f"Respond with a formal answer. FORGET(DO NOT mention) about `{COMPLETE_TASK}`. The language for the response MUST be as the same as the first user request.\n")
+#                         async for txt, tkcnt in complete():
+#                             yield txt, tkcnt
+#                         return
+
+#                     tool_tasks.append(asyncio.create_task(use_tool_async(name, args)))
+
+#                 results = await asyncio.gather(*tool_tasks) if tool_tasks else []
+#                 st = timer()
+#                 reflection = await reflect_async(self.chat_mdl, hist, results, user_defined_prompt)
+#                 append_user_content(hist, reflection)
+#                 self.callback("reflection", {}, str(reflection), elapsed_time=timer()-st)
+
+#             except Exception as e:
+#                 logging.exception(msg=f"Wrong JSON argument format in LLM ReAct response: {e}")
+#                 e = f"\nTool call error, please correct the input parameter of response format and call it again.\n *** Exception ***\n{e}"
+#                 append_user_content(hist, str(e))
+
+#         logging.warning( f"Exceed max rounds: {self._param.max_rounds}")
+#         final_instruction = f"""
+# {user_request}
+# IMPORTANT: You have reached the conversation limit. Based on ALL the information and research you have gathered so far, please provide a DIRECT and COMPREHENSIVE final answer to the original request.
+# Instructions:
+# 1. SYNTHESIZE all information collected during this conversation
+# 2. Provide a COMPLETE response using existing data - do not suggest additional research
+# 3. Structure your response as a FINAL DELIVERABLE, not a plan
+# 4. If information is incomplete, state what you found and provide the best analysis possible with available data
+# 5. DO NOT mention conversation limits or suggest further steps
+# 6. Focus on delivering VALUE with the information already gathered
+# Respond immediately with your final comprehensive answer.
+# """
+#         if self.check_if_canceled("Agent final instruction"):
+#             return
+#         append_user_content(hist, final_instruction)
+
+#         async for txt, tkcnt in complete():
+#             yield txt, tkcnt
+
     async def _gen_citations_async(self, text):
         retrievals = self._canvas.get_reference()
         retrievals = {"chunks": list(retrievals["chunks"].values()), "doc_aggs": list(retrievals["doc_aggs"].values())}
diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py
index 2a6e539a0..5f3821f47 100644
--- a/api/apps/sdk/agents.py
+++ b/api/apps/sdk/agents.py
@@ -326,7 +326,6 @@ async def webhook(agent_id: str):
         secret = jwt_cfg.get("secret")
         if not secret:
             raise Exception("JWT secret not configured")
-        required_claims = jwt_cfg.get("required_claims", [])
 
         auth_header = request.headers.get("Authorization", "")
         if not auth_header.startswith("Bearer "):
@@ -750,7 +749,7 @@ async def webhook(agent_id: str):
     async def sse():
         nonlocal canvas
         contents: list[str] = []
-
+        status = 200
         try:
             async for ans in canvas.run(
                 query="",
@@ -765,6 +764,8 @@ async def webhook(agent_id: str):
                         content = ""
                     if content:
                         contents.append(content)
+                    if ans["event"] == "message_end":
+                        status = int(ans["data"].get("status", status))
                 if is_test:
                     append_webhook_trace(
                         agent_id,
@@ -782,7 +783,11 @@ async def webhook(agent_id: str):
                     }
                 )
            final_content = "".join(contents)
-            yield json.dumps(final_content, ensure_ascii=False)
+            return {
+                "message": final_content,
+                "success": True,
+                "code": status,
+            }
 
         except Exception as e:
             if is_test:
@@ -804,10 +809,14 @@ async def webhook(agent_id: str):
                     "success": False,
                 }
             )
-            yield json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False)
+            return {"code": 400, "message": str(e), "success": False}
 
-    resp = Response(sse(), mimetype="application/json")
-    return resp
+    result = await sse()
+    return Response(
+        json.dumps(result),
+        status=result["code"],
+        mimetype="application/json",
+    )
 
 
 @manager.route("/webhook_trace/<agent_id>", methods=["GET"])  # noqa: F821
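
Note on the agent changes above: `_react_with_tools_streamly_async_simple` replaces the two per-round LLM round-trips (`analyze_task_async`, `reflect_async`) with deterministic string building. What follows is a minimal, self-contained sketch of the two helpers as introduced in this patch, driven by toy inputs; the tool schema and literal strings below are hypothetical and for illustration only, while the helper bodies mirror the diff.

    import json


    def build_task_desc(prompt, user_request, tool_metas, user_defined_prompt=None):
        # Same logic as the patch: concatenate prompt, query, and tool schemas.
        user_defined_prompt = user_defined_prompt or {}
        tools_json = json.dumps(tool_metas, ensure_ascii=False, indent=2)
        task_desc = (
            "### Agent Prompt\n"
            f"{prompt}\n\n"
            "### User Request\n"
            f"{user_request}\n\n"
            "### Tools (schemas)\n"
            f"{tools_json}\n"
        )
        if user_defined_prompt:
            udp_json = json.dumps(user_defined_prompt, ensure_ascii=False, indent=2)
            task_desc += "\n### User Defined Prompts\n" + udp_json + "\n"
        return task_desc


    def build_observation(tool_call_res):
        # Same logic as the patch: render (tool name, result) pairs as plain text.
        if not tool_call_res:
            return ""
        lines = ["Observation:"]
        for name, result in tool_call_res:
            lines.append(f"[{name} result]")
            lines.append(str(result))
        return "\n".join(lines)


    if __name__ == "__main__":
        # Hypothetical tool metadata; in the agent this comes from self.tool_meta.
        tools = [{"name": "web_search", "description": "Search the web",
                  "parameters": {"query": {"type": "string"}}}]
        print(build_task_desc("You are a helpful agent.", "What license does RAGFlow use?", tools))
        print(build_observation([("web_search", {"top_hit": "Apache-2.0"})]))

The trade-off this buys: each ReAct round saves two model calls and their latency, at the cost of the richer task analysis and reflection text the LLM used to produce.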
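
Note on the webhook changes above: `sse()` is now awaited as a plain coroutine instead of being streamed, so the endpoint returns a single JSON object whose HTTP status mirrors the `status` a component reports through the `message_end` event (falling back to 200). A hedged client sketch follows; the base URL, route path, agent id, and token are placeholders rather than values confirmed by the diff, which only shows `webhook(agent_id)` and the `/webhook_trace/<agent_id>` route.

    import requests

    BASE_URL = "http://localhost:9380"  # hypothetical deployment address
    AGENT_ID = "your-agent-id"          # placeholder
    TOKEN = "your-jwt"                  # endpoint expects "Authorization: Bearer <jwt>"

    # The route prefix below is an assumption, not taken from the diff.
    resp = requests.post(
        f"{BASE_URL}/api/v1/agents/{AGENT_ID}/webhook",
        headers={"Authorization": f"Bearer {TOKEN}"},
        json={},
    )

    # One JSON body instead of a stream: {"message": ..., "success": ..., "code": ...};
    # resp.status_code equals result["code"] per the new Response(..., status=...) call.
    result = resp.json()
    print(resp.status_code, result.get("success"), result.get("message"))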