Fix: update broken agent OpenAI-Compatible completion due to v0.20.0 changes (#9241)

### What problem does this PR solve?

Update the agent OpenAI-compatible completion endpoint, which was broken by the v0.20.0 changes. Fixes #9199

Usage example:

**Referencing the input is important; otherwise, the completion will return empty
output.**

<img width="1273" height="711" alt="Image"
src="https://github.com/user-attachments/assets/30740be8-f4d6-400d-9fda-d2616f89063f"
/>

<img width="622" height="247" alt="Image"
src="https://github.com/user-attachments/assets/0a2ca57a-9600-4cec-9362-0cafd0ab3aee"
/>
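
The screenshots above show the agent setup and a sample call. For completeness, here is a minimal client-side sketch, assuming RAGFlow's OpenAI-compatible agent route of the form `/api/v1/agents_openai/<agent_id>/chat/completions`; the host, agent ID, and API key are placeholders for your deployment:

```python
from openai import OpenAI

# Placeholders: point these at your RAGFlow deployment and agent.
client = OpenAI(
    api_key="<RAGFLOW_API_KEY>",
    base_url="http://<ragflow-host>/api/v1/agents_openai/<agent_id>",
)

# The agent ID stands in for the model name; stream=True exercises the SSE path.
stream = client.chat.completions.create(
    model="<agent_id>",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
```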

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
Committed by Yongteng Lei on 2025-08-05 17:47:25 +08:00 (via GitHub).
Commit e6bad45c6d, parent 0a303d9ae1 — 3 changed files with 134 additions and 205 deletions.

View File

```diff
@@ -436,14 +436,38 @@ def agents_completion_openai_compatibility(tenant_id, agent_id):
             )
         )
     # Get the last user message as the question
     question = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")
-    if req.get("stream", True):
-        return Response(completionOpenAI(tenant_id, agent_id, question, session_id=req.get("id", req.get("metadata", {}).get("id", "")), stream=True), mimetype="text/event-stream")
+    stream = req.pop("stream", False)
+    if stream:
+        resp = Response(
+            completionOpenAI(
+                tenant_id,
+                agent_id,
+                question,
+                session_id=req.get("id", req.get("metadata", {}).get("id", "")),
+                stream=True,
+                **req,
+            ),
+            mimetype="text/event-stream",
+        )
+        resp.headers.add_header("Cache-control", "no-cache")
+        resp.headers.add_header("Connection", "keep-alive")
+        resp.headers.add_header("X-Accel-Buffering", "no")
+        resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
+        return resp
     else:
         # For non-streaming, just return the response directly
-        response = next(completionOpenAI(tenant_id, agent_id, question, session_id=req.get("id", req.get("metadata", {}).get("id", "")), stream=False))
+        response = next(
+            completionOpenAI(
+                tenant_id,
+                agent_id,
+                question,
+                session_id=req.get("id", req.get("metadata", {}).get("id", "")),
+                stream=False,
+                **req,
+            )
+        )
         return jsonify(response)
```
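
Note the `stream = req.pop("stream", False)` change above: the rest of the request body is now forwarded to `completionOpenAI` via `**req`, so `stream` must be removed from the dict first or Python would receive the keyword twice (it also flips the default from streaming to non-streaming). A minimal illustration with a hypothetical `handler`:

```python
def handler(question, stream=False, **kwargs):
    """Hypothetical stand-in for completionOpenAI's keyword handling."""
    return stream, kwargs

req = {"stream": True, "temperature": 0.2}

# handler("hi", stream=True, **req)  # TypeError: multiple values for 'stream'
stream = req.pop("stream", False)    # remove it before expanding the rest
print(handler("hi", stream=stream, **req))  # -> (True, {'temperature': 0.2})
```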
```diff
@@ -850,10 +874,10 @@ def begin_inputs(agent_id):
         return get_error_data_result(f"Can't find agent by ID: {agent_id}")
     canvas = Canvas(json.dumps(cvs.dsl), objs[0].tenant_id)
-    return get_result(data={
-        "title": cvs.title,
-        "avatar": cvs.avatar,
-        "inputs": canvas.get_component_input_form("begin")
-    })
+    return get_result(
+        data={
+            "title": cvs.title,
+            "avatar": cvs.avatar,
+            "inputs": canvas.get_component_input_form("begin"),
+        }
+    )
```

View File

```diff
@@ -16,7 +16,6 @@
 import json
 import logging
 import time
-import traceback
 from uuid import uuid4
 from agent.canvas import Canvas
 from api.db import TenantPermission
```
```diff
@@ -178,219 +177,99 @@ def completion(tenant_id, agent_id, session_id=None, **kwargs):
 def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True, **kwargs):
-    """Main function for OpenAI-compatible completions, structured similarly to the completion function."""
     tiktokenenc = tiktoken.get_encoding("cl100k_base")
-    e, cvs = UserCanvasService.get_by_id(agent_id)
-    if not e:
-        yield get_data_openai(
-            id=session_id,
-            model=agent_id,
-            content="**ERROR**: Agent not found."
-        )
-        return
-    if cvs.user_id != tenant_id:
-        yield get_data_openai(
-            id=session_id,
-            model=agent_id,
-            content="**ERROR**: You do not own the agent"
-        )
-        return
-    if not isinstance(cvs.dsl, str):
-        cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)
-    canvas = Canvas(cvs.dsl, tenant_id)
-    canvas.reset()
-    message_id = str(uuid4())
-    # Handle new session creation
-    if not session_id:
-        query = canvas.get_preset_param()
-        if query:
-            for ele in query:
-                if not ele["optional"]:
-                    if not kwargs.get(ele["key"]):
-                        yield get_data_openai(
-                            id=None,
-                            model=agent_id,
-                            content=f"`{ele['key']}` is required",
-                            completion_tokens=len(tiktokenenc.encode(f"`{ele['key']}` is required")),
-                            prompt_tokens=len(tiktokenenc.encode(question if question else ""))
-                        )
-                        return
-                    ele["value"] = kwargs[ele["key"]]
-                if ele["optional"]:
-                    if kwargs.get(ele["key"]):
-                        ele["value"] = kwargs[ele['key']]
-                    else:
-                        if "value" in ele:
-                            ele.pop("value")
-        cvs.dsl = json.loads(str(canvas))
-        session_id = get_uuid()
-        conv = {
-            "id": session_id,
-            "dialog_id": cvs.id,
-            "user_id": kwargs.get("user_id", "") if isinstance(kwargs, dict) else "",
-            "message": [{"role": "assistant", "content": canvas.get_prologue(), "created_at": time.time()}],
-            "source": "agent",
-            "dsl": cvs.dsl
-        }
-        canvas.messages.append({"role": "user", "content": question, "id": message_id})
-        canvas.add_user_input(question)
-        API4ConversationService.save(**conv)
-        conv = API4Conversation(**conv)
-        if not conv.message:
-            conv.message = []
-        conv.message.append({
-            "role": "user",
-            "content": question,
-            "id": message_id
-        })
-        if not conv.reference:
-            conv.reference = []
-        conv.reference.append({"chunks": [], "doc_aggs": []})
-    # Handle existing session
-    else:
-        e, conv = API4ConversationService.get_by_id(session_id)
-        if not e:
-            yield get_data_openai(
-                id=session_id,
-                model=agent_id,
-                content="**ERROR**: Session not found!"
-            )
-            return
-        canvas = Canvas(json.dumps(conv.dsl), tenant_id)
-        canvas.messages.append({"role": "user", "content": question, "id": message_id})
-        canvas.add_user_input(question)
-        if not conv.message:
-            conv.message = []
-        conv.message.append({
-            "role": "user",
-            "content": question,
-            "id": message_id
-        })
-        if not conv.reference:
-            conv.reference = []
-        conv.reference.append({"chunks": [], "doc_aggs": []})
-    # Process request based on stream mode
-    final_ans = {"reference": [], "content": ""}
     prompt_tokens = len(tiktokenenc.encode(str(question)))
+    user_id = kwargs.get("user_id", "")
     if stream:
-        try:
-            completion_tokens = 0
-            for ans in canvas.run(stream=True, bypass_begin=True):
-                if ans.get("running_status"):
-                    completion_tokens += len(tiktokenenc.encode(ans.get("content", "")))
-                    yield "data: " + json.dumps(
-                        get_data_openai(
-                            id=session_id,
-                            model=agent_id,
-                            content=ans["content"],
-                            object="chat.completion.chunk",
-                            completion_tokens=completion_tokens,
-                            prompt_tokens=prompt_tokens
-                        ),
-                        ensure_ascii=False
-                    ) + "\n\n"
-                    continue
-                for k in ans.keys():
-                    final_ans[k] = ans[k]
-            completion_tokens += len(tiktokenenc.encode(final_ans.get("content", "")))
-            yield "data: " + json.dumps(
-                get_data_openai(
-                    id=session_id,
-                    model=agent_id,
-                    content=final_ans["content"],
-                    object="chat.completion.chunk",
-                    finish_reason="stop",
-                    completion_tokens=completion_tokens,
-                    prompt_tokens=prompt_tokens
-                ),
-                ensure_ascii=False
-            ) + "\n\n"
-            # Update conversation
-            canvas.messages.append({"role": "assistant", "content": final_ans["content"], "created_at": time.time(), "id": message_id})
-            canvas.history.append(("assistant", final_ans["content"]))
-            if final_ans.get("reference"):
-                canvas.reference.append(final_ans["reference"])
-            conv.dsl = json.loads(str(canvas))
-            API4ConversationService.append_message(conv.id, conv.to_dict())
-            yield "data: [DONE]\n\n"
-        except Exception as e:
-            traceback.print_exc()
-            conv.dsl = json.loads(str(canvas))
-            API4ConversationService.append_message(conv.id, conv.to_dict())
-            yield "data: " + json.dumps(
-                get_data_openai(
-                    id=session_id,
-                    model=agent_id,
-                    content="**ERROR**: " + str(e),
-                    finish_reason="stop",
-                    completion_tokens=len(tiktokenenc.encode("**ERROR**: " + str(e))),
-                    prompt_tokens=prompt_tokens
-                ),
-                ensure_ascii=False
-            ) + "\n\n"
-            yield "data: [DONE]\n\n"
-    else:  # Non-streaming mode
-        try:
-            all_answer_content = ""
-            for answer in canvas.run(stream=False, bypass_begin=True):
-                if answer.get("running_status"):
-                    continue
-                final_ans["content"] = "\n".join(answer["content"]) if "content" in answer else ""
-                final_ans["reference"] = answer.get("reference", [])
-                all_answer_content += final_ans["content"]
-            final_ans["content"] = all_answer_content
-            # Update conversation
-            canvas.messages.append({"role": "assistant", "content": final_ans["content"], "created_at": time.time(), "id": message_id})
-            canvas.history.append(("assistant", final_ans["content"]))
-            if final_ans.get("reference"):
-                canvas.reference.append(final_ans["reference"])
-            conv.dsl = json.loads(str(canvas))
-            API4ConversationService.append_message(conv.id, conv.to_dict())
-            # Return the response in OpenAI format
-            yield get_data_openai(
-                id=session_id,
-                model=agent_id,
-                content=final_ans["content"],
-                finish_reason="stop",
-                completion_tokens=len(tiktokenenc.encode(final_ans["content"])),
-                prompt_tokens=prompt_tokens,
-                param=canvas.get_preset_param()  # Added param info like in completion
-            )
-        except Exception as e:
-            traceback.print_exc()
-            conv.dsl = json.loads(str(canvas))
-            API4ConversationService.append_message(conv.id, conv.to_dict())
-            yield get_data_openai(
-                id=session_id,
-                model=agent_id,
-                content="**ERROR**: " + str(e),
-                finish_reason="stop",
-                completion_tokens=len(tiktokenenc.encode("**ERROR**: " + str(e))),
-                prompt_tokens=prompt_tokens
-            )
+        completion_tokens = 0
+        try:
+            for ans in completion(
+                tenant_id=tenant_id,
+                agent_id=agent_id,
+                session_id=session_id,
+                query=question,
+                user_id=user_id,
+                **kwargs
+            ):
+                if isinstance(ans, str):
+                    try:
+                        ans = json.loads(ans[5:])  # remove "data:"
+                    except Exception as e:
+                        logging.exception(f"Agent OpenAI-Compatible completionOpenAI parse answer failed: {e}")
+                        continue
+                if ans.get("event") != "message":
+                    continue
+                content_piece = ans["data"]["content"]
+                completion_tokens += len(tiktokenenc.encode(content_piece))
+                yield "data: " + json.dumps(
+                    get_data_openai(
+                        id=session_id or str(uuid4()),
+                        model=agent_id,
+                        content=content_piece,
+                        prompt_tokens=prompt_tokens,
+                        completion_tokens=completion_tokens,
+                        stream=True
+                    ),
+                    ensure_ascii=False
+                ) + "\n\n"
+            yield "data: [DONE]\n\n"
+        except Exception as e:
+            yield "data: " + json.dumps(
+                get_data_openai(
+                    id=session_id or str(uuid4()),
+                    model=agent_id,
+                    content=f"**ERROR**: {str(e)}",
+                    finish_reason="stop",
+                    prompt_tokens=prompt_tokens,
+                    completion_tokens=len(tiktokenenc.encode(f"**ERROR**: {str(e)}")),
+                    stream=True
+                ),
+                ensure_ascii=False
+            ) + "\n\n"
+            yield "data: [DONE]\n\n"
+    else:
+        try:
+            all_content = ""
+            for ans in completion(
+                tenant_id=tenant_id,
+                agent_id=agent_id,
+                session_id=session_id,
+                query=question,
+                user_id=user_id,
+                **kwargs
+            ):
+                if isinstance(ans, str):
+                    ans = json.loads(ans[5:])
+                if ans.get("event") != "message":
+                    continue
+                all_content += ans["data"]["content"]
+            completion_tokens = len(tiktokenenc.encode(all_content))
+            yield get_data_openai(
+                id=session_id or str(uuid4()),
+                model=agent_id,
+                prompt_tokens=prompt_tokens,
+                completion_tokens=completion_tokens,
+                content=all_content,
+                finish_reason="stop",
+                param=None
+            )
+        except Exception as e:
+            yield get_data_openai(
+                id=session_id or str(uuid4()),
+                model=agent_id,
+                prompt_tokens=prompt_tokens,
+                completion_tokens=len(tiktokenenc.encode(f"**ERROR**: {str(e)}")),
+                content=f"**ERROR**: {str(e)}",
+                finish_reason="stop",
+                param=None
+            )
```
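
The core of the fix: instead of driving the canvas itself (and duplicating session bookkeeping), `completionOpenAI` now delegates to the v0.20.0 `completion()` generator and re-encodes its frames in OpenAI format. Those frames arrive as SSE strings of the form `data:{...}`, which is why the code slices off the first five characters before `json.loads`. A small sketch of that parsing step, with an illustrative frame shaped after the fields the code reads:

```python
import json

# Illustrative frame, shaped like what completionOpenAI expects from completion()
frame = 'data:{"event": "message", "data": {"content": "Hello"}}'

payload = json.loads(frame[5:])        # strip the leading "data:" prefix
if payload.get("event") == "message":  # non-message events (e.g. status) are skipped
    print(payload["data"]["content"])  # -> Hello
```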

View File

@ -402,8 +402,22 @@ def get_data_openai(
finish_reason=None, finish_reason=None,
object="chat.completion", object="chat.completion",
param=None, param=None,
stream=False
): ):
total_tokens = prompt_tokens + completion_tokens total_tokens = prompt_tokens + completion_tokens
if stream:
return {
"id": f"{id}",
"object": "chat.completion.chunk",
"model": model,
"choices": [{
"delta": {"content": content},
"finish_reason": finish_reason,
"index": 0,
}],
}
return { return {
"id": f"{id}", "id": f"{id}",
"object": object, "object": object,
@ -414,9 +428,21 @@ def get_data_openai(
"prompt_tokens": prompt_tokens, "prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens, "completion_tokens": completion_tokens,
"total_tokens": total_tokens, "total_tokens": total_tokens,
"completion_tokens_details": {"reasoning_tokens": 0, "accepted_prediction_tokens": 0, "rejected_prediction_tokens": 0}, "completion_tokens_details": {
"reasoning_tokens": 0,
"accepted_prediction_tokens": 0,
"rejected_prediction_tokens": 0,
}, },
"choices": [{"message": {"role": "assistant", "content": content}, "logprobs": None, "finish_reason": finish_reason, "index": 0}], },
"choices": [{
"message": {
"role": "assistant",
"content": content
},
"logprobs": None,
"finish_reason": finish_reason,
"index": 0,
}],
} }
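
With the new `stream` flag, `get_data_openai` produces the two standard OpenAI payload shapes. Roughly, with illustrative values (the hunk elides the middle of the non-streaming dict, so the `usage` block in the standard OpenAI shape is an assumption here):

```python
# stream=True: a chat.completion.chunk-style delta frame
chunk = {
    "id": "<session_id>",
    "object": "chat.completion.chunk",
    "model": "<agent_id>",
    "choices": [{"delta": {"content": "Hel"}, "finish_reason": None, "index": 0}],
}

# stream=False: a full chat.completion object with token accounting
full = {
    "id": "<session_id>",
    "object": "chat.completion",
    "model": "<agent_id>",
    "usage": {"prompt_tokens": 5, "completion_tokens": 12, "total_tokens": 17},
    "choices": [
        {
            "message": {"role": "assistant", "content": "Hello!"},
            "logprobs": None,
            "finish_reason": "stop",
            "index": 0,
        }
    ],
}
```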