diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index 281f88855..0476e3572 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -436,14 +436,38 @@ def agents_completion_openai_compatibility(tenant_id, agent_id): ) ) - # Get the last user message as the question question = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "") - if req.get("stream", True): - return Response(completionOpenAI(tenant_id, agent_id, question, session_id=req.get("id", req.get("metadata", {}).get("id", "")), stream=True), mimetype="text/event-stream") + stream = req.pop("stream", False) + if stream: + resp = Response( + completionOpenAI( + tenant_id, + agent_id, + question, + session_id=req.get("id", req.get("metadata", {}).get("id", "")), + stream=True, + **req, + ), + mimetype="text/event-stream", + ) + resp.headers.add_header("Cache-control", "no-cache") + resp.headers.add_header("Connection", "keep-alive") + resp.headers.add_header("X-Accel-Buffering", "no") + resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8") + return resp else: # For non-streaming, just return the response directly - response = next(completionOpenAI(tenant_id, agent_id, question, session_id=req.get("id", req.get("metadata", {}).get("id", "")), stream=False)) + response = next( + completionOpenAI( + tenant_id, + agent_id, + question, + session_id=req.get("id", req.get("metadata", {}).get("id", "")), + stream=False, + **req, + ) + ) return jsonify(response) @@ -850,10 +874,10 @@ def begin_inputs(agent_id): return get_error_data_result(f"Can't find agent by ID: {agent_id}") canvas = Canvas(json.dumps(cvs.dsl), objs[0].tenant_id) - return get_result(data={ - "title": cvs.title, - "avatar": cvs.avatar, - "inputs": canvas.get_component_input_form("begin") - }) - - + return get_result( + data={ + "title": cvs.title, + "avatar": cvs.avatar, + "inputs": canvas.get_component_input_form("begin"), + } + ) diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py index e6bee4be4..f791edf7a 100644 --- a/api/db/services/canvas_service.py +++ b/api/db/services/canvas_service.py @@ -16,7 +16,6 @@ import json import logging import time -import traceback from uuid import uuid4 from agent.canvas import Canvas from api.db import TenantPermission @@ -54,12 +53,12 @@ class UserCanvasService(CommonService): agents = agents.paginate(page_number, items_per_page) return list(agents.dicts()) - + @classmethod @DB.connection_context() def get_by_tenant_id(cls, pid): try: - + fields = [ cls.model.id, cls.model.avatar, @@ -83,7 +82,7 @@ class UserCanvasService(CommonService): except Exception as e: logging.exception(e) return False, None - + @classmethod @DB.connection_context() def get_by_tenant_ids(cls, joined_tenant_ids, user_id, @@ -103,14 +102,14 @@ class UserCanvasService(CommonService): ] if keywords: agents = cls.model.select(*fields).join(User, on=(cls.model.user_id == User.id)).where( - ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission == + ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission == TenantPermission.TEAM.value)) | ( cls.model.user_id == user_id)), (fn.LOWER(cls.model.title).contains(keywords.lower())) ) else: agents = cls.model.select(*fields).join(User, on=(cls.model.user_id == User.id)).where( - ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission == + ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission == TenantPermission.TEAM.value)) | ( cls.model.user_id == user_id)) ) @@ -178,219 +177,99 @@ def completion(tenant_id, agent_id, session_id=None, **kwargs): def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True, **kwargs): - """Main function for OpenAI-compatible completions, structured similarly to the completion function.""" tiktokenenc = tiktoken.get_encoding("cl100k_base") - e, cvs = UserCanvasService.get_by_id(agent_id) - - if not e: - yield get_data_openai( - id=session_id, - model=agent_id, - content="**ERROR**: Agent not found." - ) - return - - if cvs.user_id != tenant_id: - yield get_data_openai( - id=session_id, - model=agent_id, - content="**ERROR**: You do not own the agent" - ) - return - - if not isinstance(cvs.dsl, str): - cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False) - - canvas = Canvas(cvs.dsl, tenant_id) - canvas.reset() - message_id = str(uuid4()) - - # Handle new session creation - if not session_id: - query = canvas.get_preset_param() - if query: - for ele in query: - if not ele["optional"]: - if not kwargs.get(ele["key"]): - yield get_data_openai( - id=None, - model=agent_id, - content=f"`{ele['key']}` is required", - completion_tokens=len(tiktokenenc.encode(f"`{ele['key']}` is required")), - prompt_tokens=len(tiktokenenc.encode(question if question else "")) - ) - return - ele["value"] = kwargs[ele["key"]] - if ele["optional"]: - if kwargs.get(ele["key"]): - ele["value"] = kwargs[ele['key']] - else: - if "value" in ele: - ele.pop("value") - - cvs.dsl = json.loads(str(canvas)) - session_id = get_uuid() - conv = { - "id": session_id, - "dialog_id": cvs.id, - "user_id": kwargs.get("user_id", "") if isinstance(kwargs, dict) else "", - "message": [{"role": "assistant", "content": canvas.get_prologue(), "created_at": time.time()}], - "source": "agent", - "dsl": cvs.dsl - } - canvas.messages.append({"role": "user", "content": question, "id": message_id}) - canvas.add_user_input(question) - - API4ConversationService.save(**conv) - conv = API4Conversation(**conv) - if not conv.message: - conv.message = [] - conv.message.append({ - "role": "user", - "content": question, - "id": message_id - }) - - if not conv.reference: - conv.reference = [] - conv.reference.append({"chunks": [], "doc_aggs": []}) - - # Handle existing session - else: - e, conv = API4ConversationService.get_by_id(session_id) - if not e: - yield get_data_openai( - id=session_id, - model=agent_id, - content="**ERROR**: Session not found!" - ) - return - - canvas = Canvas(json.dumps(conv.dsl), tenant_id) - canvas.messages.append({"role": "user", "content": question, "id": message_id}) - canvas.add_user_input(question) - - if not conv.message: - conv.message = [] - conv.message.append({ - "role": "user", - "content": question, - "id": message_id - }) - - if not conv.reference: - conv.reference = [] - conv.reference.append({"chunks": [], "doc_aggs": []}) - - # Process request based on stream mode - final_ans = {"reference": [], "content": ""} prompt_tokens = len(tiktokenenc.encode(str(question))) - + user_id = kwargs.get("user_id", "") + if stream: + completion_tokens = 0 try: - completion_tokens = 0 - for ans in canvas.run(stream=True, bypass_begin=True): - if ans.get("running_status"): - completion_tokens += len(tiktokenenc.encode(ans.get("content", ""))) - yield "data: " + json.dumps( - get_data_openai( - id=session_id, - model=agent_id, - content=ans["content"], - object="chat.completion.chunk", - completion_tokens=completion_tokens, - prompt_tokens=prompt_tokens - ), - ensure_ascii=False - ) + "\n\n" + for ans in completion( + tenant_id=tenant_id, + agent_id=agent_id, + session_id=session_id, + query=question, + user_id=user_id, + **kwargs + ): + if isinstance(ans, str): + try: + ans = json.loads(ans[5:]) # remove "data:" + except Exception as e: + logging.exception(f"Agent OpenAI-Compatible completionOpenAI parse answer failed: {e}") + continue + + if ans.get("event") != "message": continue - - for k in ans.keys(): - final_ans[k] = ans[k] - - completion_tokens += len(tiktokenenc.encode(final_ans.get("content", ""))) + + content_piece = ans["data"]["content"] + completion_tokens += len(tiktokenenc.encode(content_piece)) + yield "data: " + json.dumps( get_data_openai( - id=session_id, + id=session_id or str(uuid4()), model=agent_id, - content=final_ans["content"], - object="chat.completion.chunk", - finish_reason="stop", + content=content_piece, + prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, - prompt_tokens=prompt_tokens + stream=True ), ensure_ascii=False ) + "\n\n" - - # Update conversation - canvas.messages.append({"role": "assistant", "content": final_ans["content"], "created_at": time.time(), "id": message_id}) - canvas.history.append(("assistant", final_ans["content"])) - if final_ans.get("reference"): - canvas.reference.append(final_ans["reference"]) - conv.dsl = json.loads(str(canvas)) - API4ConversationService.append_message(conv.id, conv.to_dict()) - + yield "data: [DONE]\n\n" - + except Exception as e: - traceback.print_exc() - conv.dsl = json.loads(str(canvas)) - API4ConversationService.append_message(conv.id, conv.to_dict()) yield "data: " + json.dumps( get_data_openai( - id=session_id, + id=session_id or str(uuid4()), model=agent_id, - content="**ERROR**: " + str(e), + content=f"**ERROR**: {str(e)}", finish_reason="stop", - completion_tokens=len(tiktokenenc.encode("**ERROR**: " + str(e))), - prompt_tokens=prompt_tokens + prompt_tokens=prompt_tokens, + completion_tokens=len(tiktokenenc.encode(f"**ERROR**: {str(e)}")), + stream=True ), ensure_ascii=False ) + "\n\n" yield "data: [DONE]\n\n" - - else: # Non-streaming mode + + else: try: - all_answer_content = "" - for answer in canvas.run(stream=False, bypass_begin=True): - if answer.get("running_status"): + all_content = "" + for ans in completion( + tenant_id=tenant_id, + agent_id=agent_id, + session_id=session_id, + query=question, + user_id=user_id, + **kwargs + ): + if isinstance(ans, str): + ans = json.loads(ans[5:]) + if ans.get("event") != "message": continue - - final_ans["content"] = "\n".join(answer["content"]) if "content" in answer else "" - final_ans["reference"] = answer.get("reference", []) - all_answer_content += final_ans["content"] - - final_ans["content"] = all_answer_content - - # Update conversation - canvas.messages.append({"role": "assistant", "content": final_ans["content"], "created_at": time.time(), "id": message_id}) - canvas.history.append(("assistant", final_ans["content"])) - if final_ans.get("reference"): - canvas.reference.append(final_ans["reference"]) - conv.dsl = json.loads(str(canvas)) - API4ConversationService.append_message(conv.id, conv.to_dict()) - - # Return the response in OpenAI format + all_content += ans["data"]["content"] + + completion_tokens = len(tiktokenenc.encode(all_content)) + yield get_data_openai( - id=session_id, + id=session_id or str(uuid4()), model=agent_id, - content=final_ans["content"], - finish_reason="stop", - completion_tokens=len(tiktokenenc.encode(final_ans["content"])), prompt_tokens=prompt_tokens, - param=canvas.get_preset_param() # Added param info like in completion - ) - - except Exception as e: - traceback.print_exc() - conv.dsl = json.loads(str(canvas)) - API4ConversationService.append_message(conv.id, conv.to_dict()) - yield get_data_openai( - id=session_id, - model=agent_id, - content="**ERROR**: " + str(e), + completion_tokens=completion_tokens, + content=all_content, finish_reason="stop", - completion_tokens=len(tiktokenenc.encode("**ERROR**: " + str(e))), - prompt_tokens=prompt_tokens + param=None ) + except Exception as e: + yield get_data_openai( + id=session_id or str(uuid4()), + model=agent_id, + prompt_tokens=prompt_tokens, + completion_tokens=len(tiktokenenc.encode(f"**ERROR**: {str(e)}")), + content=f"**ERROR**: {str(e)}", + finish_reason="stop", + param=None + ) diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 27ab6201d..8e98c1d8d 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -402,8 +402,22 @@ def get_data_openai( finish_reason=None, object="chat.completion", param=None, + stream=False ): total_tokens = prompt_tokens + completion_tokens + + if stream: + return { + "id": f"{id}", + "object": "chat.completion.chunk", + "model": model, + "choices": [{ + "delta": {"content": content}, + "finish_reason": finish_reason, + "index": 0, + }], + } + return { "id": f"{id}", "object": object, @@ -414,9 +428,21 @@ def get_data_openai( "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens, - "completion_tokens_details": {"reasoning_tokens": 0, "accepted_prediction_tokens": 0, "rejected_prediction_tokens": 0}, + "completion_tokens_details": { + "reasoning_tokens": 0, + "accepted_prediction_tokens": 0, + "rejected_prediction_tokens": 0, + }, }, - "choices": [{"message": {"role": "assistant", "content": content}, "logprobs": None, "finish_reason": finish_reason, "index": 0}], + "choices": [{ + "message": { + "role": "assistant", + "content": content + }, + "logprobs": None, + "finish_reason": finish_reason, + "index": 0, + }], }