Fix: async issue and sensitive logging (#11895)

### What problem does this PR solve?

This PR fixes an async issue and sensitive logging:

- `mindmap()` called the `gen_mindmap` coroutine without `await`.
- `KGSearch.query_rewrite` wrapped `get_entity_type2samples` in `asyncio.run`; the helper is now synchronous and called directly.
- `async_request` logged full OAuth endpoint URLs; configured GitHub/Feishu OAuth endpoints are now logged as `<SENSITIVE ENDPOINT>`.
- OpenDAL configuration logging now emits only an allowlist of non-sensitive keys instead of redacting secrets field by field.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
Author: buua436
Date: 2025-12-11 13:54:47 +08:00
Committed by: GitHub
Parent: c610bb605a
Commit: e3cfe8e848

5 changed files with 42 additions and 15 deletions

View File

```diff
@@ -1170,7 +1170,7 @@ async def mindmap():
     search_id = req.get("search_id", "")
     search_app = SearchService.get_detail(search_id) if search_id else {}
-    mind_map = gen_mindmap(req["question"], req["kb_ids"], tenant_id, search_app.get("search_config", {}))
+    mind_map = await gen_mindmap(req["question"], req["kb_ids"], tenant_id, search_app.get("search_config", {}))
     if "error" in mind_map:
         return server_error_response(Exception(mind_map["error"]))
     return get_json_result(data=mind_map)
```
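For context, a minimal sketch (with a hypothetical stand-in coroutine, not the repo's `gen_mindmap`) of why the missing `await` broke this handler: calling an `async def` function returns a coroutine object, so the `"error" in mind_map` check on the next line raises instead of inspecting the result.

```python
import asyncio

async def gen_result():
    # Stand-in for gen_mindmap: any coroutine returning a dict.
    return {"nodes": []}

async def handler():
    broken = gen_result()       # coroutine object, not a dict
    try:
        "error" in broken
    except TypeError as e:
        print(e)                # argument of type 'coroutine' is not iterable
    fixed = await gen_result()  # dict, as intended
    print("error" in fixed)     # False

asyncio.run(handler())
```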

View File

```diff
@@ -18,6 +18,7 @@ import time
 from typing import Any, Dict, Optional
 from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+from common import settings
 import httpx

 logger = logging.getLogger(__name__)
```
```diff
@@ -73,6 +74,34 @@ def _redact_sensitive_url_params(url: str) -> str:
     except Exception:
         return url

+def _is_sensitive_url(url: str) -> bool:
+    """Return True if URL is one of the configured OAuth endpoints."""
+    # Collect known sensitive endpoint URLs from settings
+    oauth_urls = set()
+    # GitHub OAuth endpoints
+    try:
+        if settings.GITHUB_OAUTH is not None:
+            url_val = settings.GITHUB_OAUTH.get("url")
+            if url_val:
+                oauth_urls.add(url_val)
+    except Exception:
+        pass
+    # Feishu OAuth endpoints
+    try:
+        if settings.FEISHU_OAUTH is not None:
+            for k in ("app_access_token_url", "user_access_token_url"):
+                url_val = settings.FEISHU_OAUTH.get(k)
+                if url_val:
+                    oauth_urls.add(url_val)
+    except Exception:
+        pass
+    # Defensive normalization: compare only scheme+netloc+path
+    url_obj = urlparse(url)
+    for sensitive_url in oauth_urls:
+        sensitive_obj = urlparse(sensitive_url)
+        if (url_obj.scheme, url_obj.netloc, url_obj.path) == (sensitive_obj.scheme, sensitive_obj.netloc, sensitive_obj.path):
+            return True
+    return False

 async def async_request(
     method: str,
```
```diff
@@ -115,20 +144,23 @@ async def async_request(
                 method=method, url=url, headers=headers, **kwargs
             )
             duration = time.monotonic() - start
+            log_url = "<SENSITIVE ENDPOINT>" if _is_sensitive_url(url) else _redact_sensitive_url_params(url)
             logger.debug(
-                f"async_request {method} {_redact_sensitive_url_params(url)} -> {response.status_code} in {duration:.3f}s"
+                f"async_request {method} {log_url} -> {response.status_code} in {duration:.3f}s"
             )
             return response
         except httpx.RequestError as exc:
             last_exc = exc
             if attempt >= retries:
+                log_url = "<SENSITIVE ENDPOINT>" if _is_sensitive_url(url) else _redact_sensitive_url_params(url)
                 logger.warning(
-                    f"async_request exhausted retries for {method} {_redact_sensitive_url_params(url)}: {exc}"
+                    f"async_request exhausted retries for {method} {log_url}"
                 )
                 raise
             delay = _get_delay(backoff_factor, attempt)
+            log_url = "<SENSITIVE ENDPOINT>" if _is_sensitive_url(url) else _redact_sensitive_url_params(url)
             logger.warning(
-                f"async_request attempt {attempt + 1}/{retries + 1} failed for {method} {_redact_sensitive_url_params(url)}: {exc}; retrying in {delay:.2f}s"
+                f"async_request attempt {attempt + 1}/{retries + 1} failed for {method} {log_url}; retrying in {delay:.2f}s"
             )
             await asyncio.sleep(delay)
     raise last_exc  # pragma: no cover
```
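A quick sketch of how the normalization in `_is_sensitive_url` behaves, using a hypothetical configured endpoint rather than the repo's settings: only scheme, host, and path are compared, so the same endpoint called with different query strings (for example a one-time `code` parameter) still matches, while a different path on the same host does not.

```python
from urllib.parse import urlparse

def same_endpoint(url: str, sensitive_url: str) -> bool:
    # Same comparison as _is_sensitive_url: scheme + netloc + path only.
    a, b = urlparse(url), urlparse(sensitive_url)
    return (a.scheme, a.netloc, a.path) == (b.scheme, b.netloc, b.path)

# Hypothetical configured OAuth endpoint (illustrative only):
configured = "https://github.com/login/oauth/access_token"

print(same_endpoint("https://github.com/login/oauth/access_token?code=SECRET", configured))  # True
print(same_endpoint("https://github.com/login/oauth/authorize", configured))                 # False
```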

View File

```diff
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import asyncio
 import json
 import logging
 from collections import defaultdict
@@ -44,7 +43,7 @@ class KGSearch(Dealer):
         return response

     def query_rewrite(self, llm, question, idxnms, kb_ids):
-        ty2ents = asyncio.run(get_entity_type2samples(idxnms, kb_ids))
+        ty2ents = get_entity_type2samples(idxnms, kb_ids)
         hint_prompt = PROMPTS["minirag_query2kwd"].format(query=question,
                                                           TYPE_POOL=json.dumps(ty2ents, ensure_ascii=False, indent=2))
         result = self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {})
```
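The likely motivation for removing `asyncio.run` here, shown as a minimal sketch with hypothetical names: `asyncio.run` refuses to start when an event loop is already running in the thread, so a synchronous method that uses it cannot be reached, even indirectly, from async code.

```python
import asyncio

async def fetch():
    return {"PERSON": ["alice"]}

def sync_helper_bad():
    # Fails whenever an event loop is already running in this thread.
    return asyncio.run(fetch())

async def caller():
    try:
        sync_helper_bad()
    except RuntimeError as e:
        print(e)  # asyncio.run() cannot be called from a running event loop

asyncio.run(caller())
```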

View File

```diff
@@ -626,8 +626,8 @@ def merge_tuples(list1, list2):
     return result

-async def get_entity_type2samples(idxnms, kb_ids: list):
-    es_res = await asyncio.to_thread(settings.retriever.search, {"knowledge_graph_kwd": "ty2ents", "kb_id": kb_ids, "size": 10000, "fields": ["content_with_weight"]}, idxnms, kb_ids)
+def get_entity_type2samples(idxnms, kb_ids: list):
+    es_res = settings.retriever.search({"knowledge_graph_kwd": "ty2ents", "kb_id": kb_ids, "size": 10000, "fields": ["content_with_weight"]}, idxnms, kb_ids)
     res = defaultdict(list)
     for id in es_res.ids:
```
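One trade-off worth noting: the search call now blocks the calling thread. If an async caller ever needs this helper, it can offload the blocking work itself; a minimal sketch with a hypothetical stand-in for the helper:

```python
import asyncio
import time

def get_samples_sync():
    # Stand-in for the now-synchronous get_entity_type2samples:
    # a blocking search call.
    time.sleep(0.1)
    return {"ty2ents": []}

async def async_caller():
    # Offload the blocking helper so the event loop stays responsive.
    result = await asyncio.to_thread(get_samples_sync)
    print(result)

asyncio.run(async_caller())
```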

View File

```diff
@@ -41,13 +41,9 @@ def get_opendal_config():
         scheme = opendal_config.get("scheme")
         config_data = opendal_config.get("config", {})
         kwargs = {"scheme": scheme, **config_data}
-        redacted_kwargs = kwargs.copy()
-        if 'password' in redacted_kwargs:
-            redacted_kwargs['password'] = '***REDACTED***'
-        if 'connection_string' in redacted_kwargs and 'password' in redacted_kwargs:
-            import re
-            redacted_kwargs['connection_string'] = re.sub(r':[^@]+@', ':***REDACTED***@', redacted_kwargs['connection_string'])
-        logging.info("Loaded OpenDAL configuration from yaml: %s", redacted_kwargs)
+        safe_log_keys = ['scheme', 'host', 'port', 'database', 'table']
+        loggable_kwargs = {k: v for k, v in kwargs.items() if k in safe_log_keys}
+        logging.info("Loaded OpenDAL configuration (non-sensitive): %s", loggable_kwargs)
         return kwargs
     except Exception as e:
         logging.error("Failed to load OpenDAL configuration from yaml: %s", str(e))
```
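The design change here is from a deny-list (redact known secret fields) to an allowlist (log only known-safe fields), which fails closed when a new secret key appears in the config. A minimal sketch with hypothetical config values:

```python
import logging

logging.basicConfig(level=logging.INFO)

# Hypothetical OpenDAL-style config; secrets never reach the log because
# only allowlisted keys are copied into the logged dict.
kwargs = {
    "scheme": "postgresql",
    "host": "db.internal",
    "port": 5432,
    "password": "s3cret",
    "connection_string": "postgresql://app:s3cret@db.internal/ragflow",
}

safe_log_keys = ["scheme", "host", "port", "database", "table"]
loggable = {k: v for k, v in kwargs.items() if k in safe_log_keys}
logging.info("Loaded OpenDAL configuration (non-sensitive): %s", loggable)
# INFO:root:Loaded OpenDAL configuration (non-sensitive):
#   {'scheme': 'postgresql', 'host': 'db.internal', 'port': 5432}
```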