From 3baebd709b6e53d137fc535569516d0baa4c3770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A9=E6=B5=B7=E8=92=BC=E7=81=86?= Date: Fri, 22 Aug 2025 12:04:15 +0800 Subject: [PATCH] Refactoring: Agent completions API change response structure (#9631) ### What problem does this PR solve? Resolve #9549 and #9436 , In v0.20.x,Agent completions API changed a lot,such as without reference and so on ### Type of change - [x] Refactoring --- api/apps/sdk/session.py | 31 ++++++++---------------- api/db/services/canvas_service.py | 40 ++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 32 deletions(-) diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index 0bd513d0e..269198560 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -450,37 +450,26 @@ def agents_completion_openai_compatibility(tenant_id, agent_id): def agent_completions(tenant_id, agent_id): req = request.json - ans = {} + if req.get("stream", True): - - def generate(): - for answer in agent_completion(tenant_id=tenant_id, agent_id=agent_id, **req): - if isinstance(answer, str): - try: - ans = json.loads(answer[5:]) # remove "data:" - except Exception: - continue - - if ans.get("event") != "message": - continue - - yield answer - - yield "data:[DONE]\n\n" - - resp = Response(generate(), mimetype="text/event-stream") + resp = Response(agent_completion(tenant_id=tenant_id, agent_id=agent_id, **req), mimetype="text/event-stream") resp.headers.add_header("Cache-control", "no-cache") resp.headers.add_header("Connection", "keep-alive") resp.headers.add_header("X-Accel-Buffering", "no") resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8") return resp - + result = {} for answer in agent_completion(tenant_id=tenant_id, agent_id=agent_id, **req): try: ans = json.loads(answer[5:]) # remove "data:" + if not result: + result = ans.copy() + else: + result["data"]["answer"] += ans["data"]["answer"] + result["data"]["reference"] = ans["data"].get("reference", []) except Exception as e: - return get_result(data=f"**ERROR**: {str(e)}") - return get_result(data=ans) + return get_error_data_result(str(e)) + return result @manager.route("/chats//sessions", methods=["GET"]) # noqa: F821 diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py index 98d17e1d8..b84693615 100644 --- a/api/db/services/canvas_service.py +++ b/api/db/services/canvas_service.py @@ -134,6 +134,25 @@ class UserCanvasService(CommonService): return False return True + +def structure_answer(conv, ans, message_id, session_id): + if not conv: + return ans + content = "" + if ans["event"] == "message": + if ans["data"].get("start_to_think") is True: + content = "" + elif ans["data"].get("end_to_think") is True: + content = "" + else: + content = ans["data"]["content"] + + reference = ans["data"].get("reference") + result = {"id": message_id, "session_id": session_id, "answer": content} + if reference: + result["reference"] = [reference] + return result + def completion(tenant_id, agent_id, session_id=None, **kwargs): query = kwargs.get("query", "") or kwargs.get("question", "") files = kwargs.get("files", []) @@ -176,13 +195,14 @@ def completion(tenant_id, agent_id, session_id=None, **kwargs): }) txt = "" for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): - ans["session_id"] = session_id - if ans["event"] == "message": - txt += ans["data"]["content"] - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + ans = structure_answer(conv, ans, message_id, session_id) + txt += ans["answer"] + if ans.get("answer") or ans.get("reference"): + yield "data:" + json.dumps({"code": 0, "data": ans}, + ensure_ascii=False) + "\n\n" conv.message.append({"role": "assistant", "content": txt, "created_at": time.time(), "id": message_id}) - conv.reference = canvas.get_reference() + conv.reference.append(canvas.get_reference()) conv.errors = canvas.error conv.dsl = str(canvas) conv = conv.to_dict() @@ -211,11 +231,9 @@ def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True except Exception as e: logging.exception(f"Agent OpenAI-Compatible completionOpenAI parse answer failed: {e}") continue - - if ans.get("event") != "message": + if not ans["data"]["answer"]: continue - - content_piece = ans["data"]["content"] + content_piece = ans["data"]["answer"] completion_tokens += len(tiktokenenc.encode(content_piece)) yield "data: " + json.dumps( @@ -260,9 +278,9 @@ def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True ): if isinstance(ans, str): ans = json.loads(ans[5:]) - if ans.get("event") != "message": + if not ans["data"]["answer"]: continue - all_content += ans["data"]["content"] + all_content += ans["data"]["answer"] completion_tokens = len(tiktokenenc.encode(all_content))