diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py
index 37dc05d92..faa966c00 100644
--- a/api/db/services/dialog_service.py
+++ b/api/db/services/dialog_service.py
@@ -208,12 +208,14 @@ def chat(dialog, messages, stream=True, **kwargs):
     check_llm_ts = timer()
 
     langfuse_tracer = None
+    trace_context = {}
     langfuse_keys = TenantLangfuseService.filter_by_tenant(tenant_id=dialog.tenant_id)
     if langfuse_keys:
         langfuse = Langfuse(public_key=langfuse_keys.public_key, secret_key=langfuse_keys.secret_key, host=langfuse_keys.host)
         if langfuse.auth_check():
             langfuse_tracer = langfuse
-            langfuse.trace = langfuse_tracer.trace(name=f"{dialog.name}-{llm_model_config['llm_name']}")
+            trace_id = langfuse_tracer.create_trace_id()
+            trace_context = {"trace_id": trace_id}
 
     check_langfuse_tracer_ts = timer()
     kbs, embd_mdl, rerank_mdl, chat_mdl, tts_mdl = get_models(dialog)
@@ -400,17 +402,19 @@ def chat(dialog, messages, stream=True, **kwargs):
             f" - Token speed: {int(tk_num / (generate_result_time_cost / 1000.0))}/s"
         )
 
-        langfuse_output = "\n" + re.sub(r"^.*?(### Query:.*)", r"\1", prompt, flags=re.DOTALL)
-        langfuse_output = {"time_elapsed:": re.sub(r"\n", "  \n", langfuse_output), "created_at": time.time()}
-
         # Add a condition check to call the end method only if langfuse_tracer exists
         if langfuse_tracer and "langfuse_generation" in locals():
-            langfuse_generation.end(output=langfuse_output)
+            langfuse_output = "\n" + re.sub(r"^.*?(### Query:.*)", r"\1", prompt, flags=re.DOTALL)
+            langfuse_output = {"time_elapsed:": re.sub(r"\n", "  \n", langfuse_output), "created_at": time.time()}
+            langfuse_generation.update(output=langfuse_output)
+            langfuse_generation.end()
 
         return {"answer": think + answer, "reference": refs, "prompt": re.sub(r"\n", "  \n", prompt), "created_at": time.time()}
 
     if langfuse_tracer:
-        langfuse_generation = langfuse_tracer.trace.generation(name="chat", model=llm_model_config["llm_name"], input={"prompt": prompt, "prompt4citation": prompt4citation, "messages": msg})
+        langfuse_generation = langfuse_tracer.start_generation(
+            trace_context=trace_context, name="chat", model=llm_model_config["llm_name"], input={"prompt": prompt, "prompt4citation": prompt4citation, "messages": msg}
+        )
 
     if stream:
         last_ans = ""
diff --git a/api/db/services/llm_service.py b/api/db/services/llm_service.py
index d669cdd90..97130a9dc 100644
--- a/api/db/services/llm_service.py
+++ b/api/db/services/llm_service.py
@@ -217,7 +217,7 @@ class TenantLLMService(CommonService):
         return list(objs)
 
     @staticmethod
-    def llm_id2llm_type(llm_id: str) ->str|None:
+    def llm_id2llm_type(llm_id: str) -> str | None:
         llm_id, *_ = TenantLLMService.split_model_name_and_factory(llm_id)
         llm_factories = settings.FACTORY_LLM_INFOS
         for llm_factory in llm_factories:
@@ -240,13 +240,13 @@ class LLMBundle:
         self.verbose_tool_use = kwargs.get("verbose_tool_use")
 
         langfuse_keys = TenantLangfuseService.filter_by_tenant(tenant_id=tenant_id)
+        self.langfuse = None
         if langfuse_keys:
             langfuse = Langfuse(public_key=langfuse_keys.public_key, secret_key=langfuse_keys.secret_key, host=langfuse_keys.host)
             if langfuse.auth_check():
                 self.langfuse = langfuse
-                self.trace = self.langfuse.trace(name=f"{self.llm_type}-{self.llm_name}")
-        else:
-            self.langfuse = None
+                trace_id = self.langfuse.create_trace_id()
+                self.trace_context = {"trace_id": trace_id}
 
     def bind_tools(self, toolcall_session, tools):
         if not self.is_tools:
@@ -256,7 +256,7 @@ class LLMBundle:
 
     def encode(self, texts: list):
         if self.langfuse:
-            generation = self.trace.generation(name="encode", model=self.llm_name, input={"texts": texts})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode", model=self.llm_name, input={"texts": texts})
 
         embeddings, used_tokens = self.mdl.encode(texts)
         llm_name = getattr(self, "llm_name", None)
@@ -264,13 +264,14 @@ class LLMBundle:
             logging.error("LLMBundle.encode can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens))
 
         if self.langfuse:
-            generation.end(usage_details={"total_tokens": used_tokens})
+            generation.update(usage_details={"total_tokens": used_tokens})
+            generation.end()
 
         return embeddings, used_tokens
 
     def encode_queries(self, query: str):
         if self.langfuse:
-            generation = self.trace.generation(name="encode_queries", model=self.llm_name, input={"query": query})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode_queries", model=self.llm_name, input={"query": query})
 
         emd, used_tokens = self.mdl.encode_queries(query)
         llm_name = getattr(self, "llm_name", None)
@@ -278,65 +279,70 @@ class LLMBundle:
             logging.error("LLMBundle.encode_queries can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens))
 
         if self.langfuse:
-            generation.end(usage_details={"total_tokens": used_tokens})
+            generation.update(usage_details={"total_tokens": used_tokens})
+            generation.end()
 
         return emd, used_tokens
 
     def similarity(self, query: str, texts: list):
         if self.langfuse:
-            generation = self.trace.generation(name="similarity", model=self.llm_name, input={"query": query, "texts": texts})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="similarity", model=self.llm_name, input={"query": query, "texts": texts})
 
         sim, used_tokens = self.mdl.similarity(query, texts)
         if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, used_tokens):
             logging.error("LLMBundle.similarity can't update token usage for {}/RERANK used_tokens: {}".format(self.tenant_id, used_tokens))
 
         if self.langfuse:
-            generation.end(usage_details={"total_tokens": used_tokens})
+            generation.update(usage_details={"total_tokens": used_tokens})
+            generation.end()
 
         return sim, used_tokens
 
     def describe(self, image, max_tokens=300):
         if self.langfuse:
-            generation = self.trace.generation(name="describe", metadata={"model": self.llm_name})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="describe", metadata={"model": self.llm_name})
 
         txt, used_tokens = self.mdl.describe(image)
         if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, used_tokens):
             logging.error("LLMBundle.describe can't update token usage for {}/IMAGE2TEXT used_tokens: {}".format(self.tenant_id, used_tokens))
 
         if self.langfuse:
-            generation.end(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.update(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.end()
 
         return txt
 
     def describe_with_prompt(self, image, prompt):
         if self.langfuse:
-            generation = self.trace.generation(name="describe_with_prompt", metadata={"model": self.llm_name, "prompt": prompt})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="describe_with_prompt", metadata={"model": self.llm_name, "prompt": prompt})
 
         txt, used_tokens = self.mdl.describe_with_prompt(image, prompt)
         if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, used_tokens):
             logging.error("LLMBundle.describe can't update token usage for {}/IMAGE2TEXT used_tokens: {}".format(self.tenant_id, used_tokens))
 
         if self.langfuse:
-            generation.end(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.update(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.end()
 
         return txt
 
     def transcription(self, audio):
         if self.langfuse:
-            generation = self.trace.generation(name="transcription", metadata={"model": self.llm_name})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="transcription", metadata={"model": self.llm_name})
 
         txt, used_tokens = self.mdl.transcription(audio)
         if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, used_tokens):
             logging.error("LLMBundle.transcription can't update token usage for {}/SEQUENCE2TXT used_tokens: {}".format(self.tenant_id, used_tokens))
 
         if self.langfuse:
-            generation.end(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.update(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.end()
 
         return txt
 
     def tts(self, text: str) -> Generator[bytes, None, None]:
         if self.langfuse:
-            span = self.trace.span(name="tts", input={"text": text})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="tts", input={"text": text})
 
         for chunk in self.mdl.tts(text):
             if isinstance(chunk, int):
@@ -346,7 +352,7 @@ class LLMBundle:
             yield chunk
 
         if self.langfuse:
-            span.end()
+            generation.end()
 
     def _remove_reasoning_content(self, txt: str) -> str:
         first_think_start = txt.find("<think>")
@@ -362,9 +368,9 @@ class LLMBundle:
 
         return txt[last_think_end + len("</think>") :]
 
-    def chat(self, system: str, history: list, gen_conf: dict={}, **kwargs) -> str:
+    def chat(self, system: str, history: list, gen_conf: dict = {}, **kwargs) -> str:
         if self.langfuse:
-            generation = self.trace.generation(name="chat", model=self.llm_name, input={"system": system, "history": history})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat", model=self.llm_name, input={"system": system, "history": history})
 
         chat_partial = partial(self.mdl.chat, system, history, gen_conf)
         if self.is_tools and self.mdl.is_tools:
@@ -380,13 +386,14 @@ class LLMBundle:
             logging.error("LLMBundle.chat can't update token usage for {}/CHAT llm_name: {}, used_tokens: {}".format(self.tenant_id, self.llm_name, used_tokens))
 
         if self.langfuse:
-            generation.end(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.update(output={"output": txt}, usage_details={"total_tokens": used_tokens})
+            generation.end()
 
         return txt
 
-    def chat_streamly(self, system: str, history: list, gen_conf: dict={}, **kwargs):
+    def chat_streamly(self, system: str, history: list, gen_conf: dict = {}, **kwargs):
         if self.langfuse:
-            generation = self.trace.generation(name="chat_streamly", model=self.llm_name, input={"system": system, "history": history})
+            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat_streamly", model=self.llm_name, input={"system": system, "history": history})
 
         ans = ""
         chat_partial = partial(self.mdl.chat_streamly, system, history, gen_conf)
@@ -398,7 +405,8 @@ class LLMBundle:
             if isinstance(txt, int):
                 total_tokens = txt
                 if self.langfuse:
-                    generation.end(output={"output": ans})
+                    generation.update(output={"output": ans})
+                    generation.end()
                 break
 
             if txt.endswith("</think>"):
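
Reviewer note: the diff above is a mechanical migration from the Langfuse v2 object API (`langfuse.trace(...)` → `trace.generation(...)` → `generation.end(output=...)`) to the v3 `trace_context` API. A minimal sketch of the v3 flow as this PR uses it; the observation name, model string, and token count below are illustrative placeholders, not values from this codebase:

```python
# Sketch of the Langfuse v3 pattern adopted by this PR.
# Assumes LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST are set;
# "demo-chat", "demo-model", and the token number are placeholders.
from langfuse import Langfuse

langfuse = Langfuse()

if langfuse.auth_check():
    # v2 returned a stateful trace object; v3 hands out a bare trace id
    # that is passed to each observation via trace_context.
    trace_context = {"trace_id": langfuse.create_trace_id()}

    generation = langfuse.start_generation(
        trace_context=trace_context,
        name="demo-chat",
        model="demo-model",
        input={"prompt": "Hello"},
    )

    answer, used_tokens = "Hi there!", 12  # stand-in for a real model call

    # v3 splits the old generation.end(output=..., usage_details=...)
    # into an update() followed by a plain end(), as done throughout the diff.
    generation.update(output={"output": answer}, usage_details={"total_tokens": used_tokens})
    generation.end()

    langfuse.flush()  # make sure the event is shipped before the process exits
```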