From e3cfe8e8480ea60ac59176d8217183d2c5fcc499 Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 11 Dec 2025 13:54:47 +0800 Subject: [PATCH] Fix:async issue and sensitive logging (#11895) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? change: async issue and sensitive logging ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- api/apps/sdk/session.py | 2 +- common/http_client.py | 38 +++++++++++++++++++++++++++++++++++--- graphrag/search.py | 3 +-- graphrag/utils.py | 4 ++-- rag/utils/opendal_conn.py | 10 +++------- 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index 27760f1a8..55c290710 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -1170,7 +1170,7 @@ async def mindmap(): search_id = req.get("search_id", "") search_app = SearchService.get_detail(search_id) if search_id else {} - mind_map = gen_mindmap(req["question"], req["kb_ids"], tenant_id, search_app.get("search_config", {})) + mind_map =await gen_mindmap(req["question"], req["kb_ids"], tenant_id, search_app.get("search_config", {})) if "error" in mind_map: return server_error_response(Exception(mind_map["error"])) return get_json_result(data=mind_map) diff --git a/common/http_client.py b/common/http_client.py index 5c57f8638..5c633d78d 100644 --- a/common/http_client.py +++ b/common/http_client.py @@ -18,6 +18,7 @@ import time from typing import Any, Dict, Optional from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse +from common import settings import httpx logger = logging.getLogger(__name__) @@ -73,6 +74,34 @@ def _redact_sensitive_url_params(url: str) -> str: except Exception: return url +def _is_sensitive_url(url: str) -> bool: + """Return True if URL is one of the configured OAuth endpoints.""" + # Collect known sensitive endpoint URLs from settings + oauth_urls = set() + # GitHub OAuth 
endpoints + try: + if settings.GITHUB_OAUTH is not None: + url_val = settings.GITHUB_OAUTH.get("url") + if url_val: + oauth_urls.add(url_val) + except Exception: + pass + # Feishu OAuth endpoints + try: + if settings.FEISHU_OAUTH is not None: + for k in ("app_access_token_url", "user_access_token_url"): + url_val = settings.FEISHU_OAUTH.get(k) + if url_val: + oauth_urls.add(url_val) + except Exception: + pass + # Defensive normalization: compare only scheme+netloc+path + url_obj = urlparse(url) + for sensitive_url in oauth_urls: + sensitive_obj = urlparse(sensitive_url) + if (url_obj.scheme, url_obj.netloc, url_obj.path) == (sensitive_obj.scheme, sensitive_obj.netloc, sensitive_obj.path): + return True + return False async def async_request( method: str, @@ -115,20 +144,23 @@ async def async_request( method=method, url=url, headers=headers, **kwargs ) duration = time.monotonic() - start + log_url = "" if _is_sensitive_url(url) else _redact_sensitive_url_params(url) logger.debug( - f"async_request {method} {_redact_sensitive_url_params(url)} -> {response.status_code} in {duration:.3f}s" + f"async_request {method} {log_url} -> {response.status_code} in {duration:.3f}s" ) return response except httpx.RequestError as exc: last_exc = exc if attempt >= retries: + log_url = "" if _is_sensitive_url(url) else _redact_sensitive_url_params(url) logger.warning( - f"async_request exhausted retries for {method} {_redact_sensitive_url_params(url)}: {exc}" + f"async_request exhausted retries for {method} {log_url}" ) raise delay = _get_delay(backoff_factor, attempt) + log_url = "" if _is_sensitive_url(url) else _redact_sensitive_url_params(url) logger.warning( - f"async_request attempt {attempt + 1}/{retries + 1} failed for {method} {_redact_sensitive_url_params(url)}: {exc}; retrying in {delay:.2f}s" + f"async_request attempt {attempt + 1}/{retries + 1} failed for {method} {log_url}; retrying in {delay:.2f}s" ) await asyncio.sleep(delay) raise last_exc # pragma: no cover diff --git 
a/graphrag/search.py b/graphrag/search.py index 7399ea393..860c58906 100644 --- a/graphrag/search.py +++ b/graphrag/search.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import asyncio import json import logging from collections import defaultdict @@ -44,7 +43,7 @@ class KGSearch(Dealer): return response def query_rewrite(self, llm, question, idxnms, kb_ids): - ty2ents = asyncio.run(get_entity_type2samples(idxnms, kb_ids)) + ty2ents = get_entity_type2samples(idxnms, kb_ids) hint_prompt = PROMPTS["minirag_query2kwd"].format(query=question, TYPE_POOL=json.dumps(ty2ents, ensure_ascii=False, indent=2)) result = self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {}) diff --git a/graphrag/utils.py b/graphrag/utils.py index 9b3dc2c2b..7e3fec1a9 100644 --- a/graphrag/utils.py +++ b/graphrag/utils.py @@ -626,8 +626,8 @@ def merge_tuples(list1, list2): return result -async def get_entity_type2samples(idxnms, kb_ids: list): - es_res = await asyncio.to_thread(settings.retriever.search,{"knowledge_graph_kwd": "ty2ents", "kb_id": kb_ids, "size": 10000, "fields": ["content_with_weight"]},idxnms,kb_ids) +def get_entity_type2samples(idxnms, kb_ids: list): + es_res = settings.retriever.search({"knowledge_graph_kwd": "ty2ents", "kb_id": kb_ids, "size": 10000, "fields": ["content_with_weight"]},idxnms,kb_ids) res = defaultdict(list) for id in es_res.ids: diff --git a/rag/utils/opendal_conn.py b/rag/utils/opendal_conn.py index a260daebc..1f52f6f63 100644 --- a/rag/utils/opendal_conn.py +++ b/rag/utils/opendal_conn.py @@ -41,13 +41,9 @@ def get_opendal_config(): scheme = opendal_config.get("scheme") config_data = opendal_config.get("config", {}) kwargs = {"scheme": scheme, **config_data} - redacted_kwargs = kwargs.copy() - if 'password' in redacted_kwargs: - redacted_kwargs['password'] = '***REDACTED***' - if 'connection_string' in redacted_kwargs and 'password' in redacted_kwargs: - 
import re - redacted_kwargs['connection_string'] = re.sub(r':[^@]+@', ':***REDACTED***@', redacted_kwargs['connection_string']) - logging.info("Loaded OpenDAL configuration from yaml: %s", redacted_kwargs) + safe_log_keys=['scheme', 'host', 'port', 'database', 'table'] + loggable_kwargs = {k: v for k, v in kwargs.items() if k in safe_log_keys} + logging.info("Loaded OpenDAL configuration(non sensitive): %s", loggable_kwargs) return kwargs except Exception as e: logging.error("Failed to load OpenDAL configuration from yaml: %s", str(e))