diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7e25d99e6..74aeeee9f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -336,9 +336,13 @@ jobs: - name: Collect ragflow log if: ${{ !cancelled() }} run: | - cp -r docker/ragflow-logs ${ARTIFACTS_DIR}/ragflow-logs-es - echo "ragflow log" && tail -n 200 docker/ragflow-logs/ragflow_server.log - sudo rm -rf docker/ragflow-logs + if [ -d docker/ragflow-logs ]; then + cp -r docker/ragflow-logs ${ARTIFACTS_DIR}/ragflow-logs-es + echo "ragflow log" && tail -n 200 docker/ragflow-logs/ragflow_server.log || true + else + echo "No docker/ragflow-logs directory found; skipping log collection" + fi + sudo rm -rf docker/ragflow-logs || true - name: Stop ragflow:nightly if: always() # always run this step even if previous steps failed @@ -482,9 +486,13 @@ jobs: - name: Collect ragflow log if: ${{ !cancelled() }} run: | - cp -r docker/ragflow-logs ${ARTIFACTS_DIR}/ragflow-logs-infinity - echo "ragflow log" && tail -n 200 docker/ragflow-logs/ragflow_server.log - sudo rm -rf docker/ragflow-logs + if [ -d docker/ragflow-logs ]; then + cp -r docker/ragflow-logs ${ARTIFACTS_DIR}/ragflow-logs-infinity + echo "ragflow log" && tail -n 200 docker/ragflow-logs/ragflow_server.log || true + else + echo "No docker/ragflow-logs directory found; skipping log collection" + fi + sudo rm -rf docker/ragflow-logs || true - name: Stop ragflow:nightly if: always() # always run this step even if previous steps failed run: | diff --git a/api/apps/system_app.py b/api/apps/system_app.py index 8b8cee0b1..b15054490 100644 --- a/api/apps/system_app.py +++ b/api/apps/system_app.py @@ -35,7 +35,7 @@ from timeit import default_timer as timer from rag.utils.redis_conn import REDIS_CONN from quart import jsonify -from api.utils.health_utils import run_health_checks +from api.utils.health_utils import run_health_checks, get_oceanbase_status from common import settings @@ -182,6 +182,42 @@ async def ping(): return "pong", 200 +@manager.route("/oceanbase/status", methods=["GET"]) # noqa: F821 +@login_required +def oceanbase_status(): + """ + Get OceanBase health status and performance metrics. + --- + tags: + - System + security: + - ApiKeyAuth: [] + responses: + 200: + description: OceanBase status retrieved successfully. + schema: + type: object + properties: + status: + type: string + description: Status (alive/timeout). + message: + type: object + description: Detailed status information including health and performance metrics. + """ + try: + status_info = get_oceanbase_status() + return get_json_result(data=status_info) + except Exception as e: + return get_json_result( + data={ + "status": "error", + "message": f"Failed to get OceanBase status: {str(e)}" + }, + code=500 + ) + + @manager.route("/new_token", methods=["POST"]) # noqa: F821 @login_required def new_token(): diff --git a/api/utils/health_utils.py b/api/utils/health_utils.py index 0a7ab6e7a..7456ed0f8 100644 --- a/api/utils/health_utils.py +++ b/api/utils/health_utils.py @@ -23,6 +23,7 @@ from api.db.db_models import DB from rag.utils.redis_conn import REDIS_CONN from rag.utils.es_conn import ESConnection from rag.utils.infinity_conn import InfinityConnection +from rag.utils.ob_conn import OBConnection from common import settings @@ -100,6 +101,121 @@ def get_infinity_status(): } +def get_oceanbase_status(): + """ + Get OceanBase health status and performance metrics. + + Returns: + dict: OceanBase status with health information and performance metrics + """ + doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch') + if doc_engine != 'oceanbase': + raise Exception("OceanBase is not in use.") + try: + ob_conn = OBConnection() + health_info = ob_conn.health() + performance_metrics = ob_conn.get_performance_metrics() + + # Combine health and performance metrics + status = "alive" if health_info.get("status") == "healthy" else "timeout" + + return { + "status": status, + "message": { + "health": health_info, + "performance": performance_metrics + } + } + except Exception as e: + return { + "status": "timeout", + "message": f"error: {str(e)}", + } + + +def check_oceanbase_health() -> dict: + """ + Check OceanBase health status with comprehensive metrics. + + This function provides detailed health information including: + - Connection status + - Query latency + - Storage usage + - Query throughput (QPS) + - Slow query statistics + - Connection pool statistics + + Returns: + dict: Health status with detailed metrics + """ + doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch') + if doc_engine != 'oceanbase': + return { + "status": "not_configured", + "details": { + "connection": "not_configured", + "message": "OceanBase is not configured as the document engine" + } + } + + try: + ob_conn = OBConnection() + health_info = ob_conn.health() + performance_metrics = ob_conn.get_performance_metrics() + + # Determine overall health status + connection_status = performance_metrics.get("connection", "unknown") + + # If connection is disconnected, return unhealthy + if connection_status == "disconnected" or health_info.get("status") != "healthy": + return { + "status": "unhealthy", + "details": { + "connection": connection_status, + "latency_ms": performance_metrics.get("latency_ms", 0), + "storage_used": performance_metrics.get("storage_used", "N/A"), + "storage_total": performance_metrics.get("storage_total", "N/A"), + "query_per_second": performance_metrics.get("query_per_second", 0), + "slow_queries": performance_metrics.get("slow_queries", 0), + "active_connections": performance_metrics.get("active_connections", 0), + "max_connections": performance_metrics.get("max_connections", 0), + "uri": health_info.get("uri", "unknown"), + "version": health_info.get("version_comment", "unknown"), + "error": health_info.get("error", performance_metrics.get("error")) + } + } + + # Check if healthy (connected and low latency) + is_healthy = ( + connection_status == "connected" and + performance_metrics.get("latency_ms", float('inf')) < 1000 # Latency under 1 second + ) + + return { + "status": "healthy" if is_healthy else "degraded", + "details": { + "connection": performance_metrics.get("connection", "unknown"), + "latency_ms": performance_metrics.get("latency_ms", 0), + "storage_used": performance_metrics.get("storage_used", "N/A"), + "storage_total": performance_metrics.get("storage_total", "N/A"), + "query_per_second": performance_metrics.get("query_per_second", 0), + "slow_queries": performance_metrics.get("slow_queries", 0), + "active_connections": performance_metrics.get("active_connections", 0), + "max_connections": performance_metrics.get("max_connections", 0), + "uri": health_info.get("uri", "unknown"), + "version": health_info.get("version_comment", "unknown") + } + } + except Exception as e: + return { + "status": "unhealthy", + "details": { + "connection": "disconnected", + "error": str(e) + } + } + + def get_mysql_status(): try: cursor = DB.execute_sql("SHOW PROCESSLIST;") diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py index 8a3ef49d4..b7c0ead58 100644 --- a/rag/utils/ob_conn.py +++ b/rag/utils/ob_conn.py @@ -511,10 +511,201 @@ class OBConnection(DocStoreConnection): return "oceanbase" def health(self) -> dict: - return { - "uri": self.uri, - "version_comment": self._get_variable_value("version_comment") + """ + Check OceanBase health status with basic connection information. + + Returns: + dict: Health status with URI and version information + """ + try: + return { + "uri": self.uri, + "version_comment": self._get_variable_value("version_comment"), + "status": "healthy", + "connection": "connected" + } + except Exception as e: + return { + "uri": self.uri, + "status": "unhealthy", + "connection": "disconnected", + "error": str(e) + } + + def get_performance_metrics(self) -> dict: + """ + Get comprehensive performance metrics for OceanBase. + + Returns: + dict: Performance metrics including latency, storage, QPS, and slow queries + """ + metrics = { + "connection": "connected", + "latency_ms": 0.0, + "storage_used": "0B", + "storage_total": "0B", + "query_per_second": 0, + "slow_queries": 0, + "active_connections": 0, + "max_connections": 0 } + + try: + # Measure connection latency + import time + start_time = time.time() + self.client.perform_raw_text_sql("SELECT 1").fetchone() + metrics["latency_ms"] = round((time.time() - start_time) * 1000, 2) + + # Get storage information + try: + storage_info = self._get_storage_info() + metrics.update(storage_info) + except Exception as e: + logger.warning(f"Failed to get storage info: {str(e)}") + + # Get connection pool statistics + try: + pool_stats = self._get_connection_pool_stats() + metrics.update(pool_stats) + except Exception as e: + logger.warning(f"Failed to get connection pool stats: {str(e)}") + + # Get slow query statistics + try: + slow_queries = self._get_slow_query_count() + metrics["slow_queries"] = slow_queries + except Exception as e: + logger.warning(f"Failed to get slow query count: {str(e)}") + + # Get QPS (Queries Per Second) - approximate from processlist + try: + qps = self._estimate_qps() + metrics["query_per_second"] = qps + except Exception as e: + logger.warning(f"Failed to estimate QPS: {str(e)}") + + except Exception as e: + metrics["connection"] = "disconnected" + metrics["error"] = str(e) + logger.error(f"Failed to get OceanBase performance metrics: {str(e)}") + + return metrics + + def _get_storage_info(self) -> dict: + """ + Get storage space usage information. + + Returns: + dict: Storage information with used and total space + """ + try: + # Get database size + result = self.client.perform_raw_text_sql( + f"SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' " + f"FROM information_schema.tables WHERE table_schema = '{self.db_name}'" + ).fetchone() + + size_mb = float(result[0]) if result and result[0] else 0.0 + + # Try to get total available space (may not be available in all OceanBase versions) + try: + result = self.client.perform_raw_text_sql( + "SELECT ROUND(SUM(total_size) / 1024 / 1024 / 1024, 2) AS 'total_gb' " + "FROM oceanbase.__all_disk_stat" + ).fetchone() + total_gb = float(result[0]) if result and result[0] else None + except Exception: + # Fallback: estimate total space (100GB default if not available) + total_gb = 100.0 + + return { + "storage_used": f"{size_mb:.2f}MB", + "storage_total": f"{total_gb:.2f}GB" if total_gb else "N/A" + } + except Exception as e: + logger.warning(f"Failed to get storage info: {str(e)}") + return { + "storage_used": "N/A", + "storage_total": "N/A" + } + + def _get_connection_pool_stats(self) -> dict: + """ + Get connection pool statistics. + + Returns: + dict: Connection pool statistics + """ + try: + # Get active connections from processlist + result = self.client.perform_raw_text_sql("SHOW PROCESSLIST") + active_connections = len(list(result.fetchall())) + + # Get max_connections setting + max_conn_result = self.client.perform_raw_text_sql( + "SHOW VARIABLES LIKE 'max_connections'" + ).fetchone() + max_connections = int(max_conn_result[1]) if max_conn_result and max_conn_result[1] else 0 + + # Get pool size from client if available + pool_size = getattr(self.client, 'pool_size', None) or 0 + + return { + "active_connections": active_connections, + "max_connections": max_connections if max_connections > 0 else pool_size, + "pool_size": pool_size + } + except Exception as e: + logger.warning(f"Failed to get connection pool stats: {str(e)}") + return { + "active_connections": 0, + "max_connections": 0, + "pool_size": 0 + } + + def _get_slow_query_count(self, threshold_seconds: int = 1) -> int: + """ + Get count of slow queries (queries taking longer than threshold). + + Args: + threshold_seconds: Threshold in seconds for slow queries (default: 1) + + Returns: + int: Number of slow queries + """ + try: + result = self.client.perform_raw_text_sql( + f"SELECT COUNT(*) FROM information_schema.processlist " + f"WHERE time > {threshold_seconds} AND command != 'Sleep'" + ).fetchone() + return int(result[0]) if result and result[0] else 0 + except Exception as e: + logger.warning(f"Failed to get slow query count: {str(e)}") + return 0 + + def _estimate_qps(self) -> int: + """ + Estimate queries per second from processlist. + + Returns: + int: Estimated queries per second + """ + try: + # Count active queries (non-Sleep commands) + result = self.client.perform_raw_text_sql( + "SELECT COUNT(*) FROM information_schema.processlist WHERE command != 'Sleep'" + ).fetchone() + active_queries = int(result[0]) if result and result[0] else 0 + + # Rough estimate: assume average query takes 0.1 seconds + # This is a simplified estimation + estimated_qps = max(0, active_queries * 10) + + return estimated_qps + except Exception as e: + logger.warning(f"Failed to estimate QPS: {str(e)}") + return 0 def _get_variable_value(self, var_name: str) -> Any: rows = self.client.perform_raw_text_sql(f"SHOW VARIABLES LIKE '{var_name}'") diff --git a/test/unit_test/utils/test_oceanbase_health.py b/test/unit_test/utils/test_oceanbase_health.py new file mode 100644 index 000000000..fa6d24dd1 --- /dev/null +++ b/test/unit_test/utils/test_oceanbase_health.py @@ -0,0 +1,412 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Unit tests for OceanBase health check and performance monitoring functionality. +""" +import inspect +import os +import types +import pytest +from unittest.mock import Mock, patch + +from api.utils.health_utils import get_oceanbase_status, check_oceanbase_health + + +class TestOceanBaseHealthCheck: + """Test cases for OceanBase health check functionality.""" + + @patch('api.utils.health_utils.OBConnection') + @patch.dict(os.environ, {'DOC_ENGINE': 'oceanbase'}) + def test_get_oceanbase_status_success(self, mock_ob_class): + """Test successful OceanBase status retrieval.""" + # Setup mock + mock_ob_connection = Mock() + mock_ob_connection.uri = "localhost:2881" + mock_ob_connection.health.return_value = { + "uri": "localhost:2881", + "version_comment": "OceanBase 4.3.5.1", + "status": "healthy", + "connection": "connected" + } + mock_ob_connection.get_performance_metrics.return_value = { + "connection": "connected", + "latency_ms": 5.2, + "storage_used": "1.2MB", + "storage_total": "100GB", + "query_per_second": 150, + "slow_queries": 2, + "active_connections": 10, + "max_connections": 300 + } + mock_ob_class.return_value = mock_ob_connection + + # Execute + result = get_oceanbase_status() + + # Assert + assert result["status"] == "alive" + assert "message" in result + assert "health" in result["message"] + assert "performance" in result["message"] + assert result["message"]["health"]["status"] == "healthy" + assert result["message"]["performance"]["latency_ms"] == 5.2 + + @patch.dict(os.environ, {'DOC_ENGINE': 'elasticsearch'}) + def test_get_oceanbase_status_not_configured(self): + """Test OceanBase status when not configured.""" + with pytest.raises(Exception) as exc_info: + get_oceanbase_status() + assert "OceanBase is not in use" in str(exc_info.value) + + @patch('api.utils.health_utils.OBConnection') + @patch.dict(os.environ, {'DOC_ENGINE': 'oceanbase'}) + def test_get_oceanbase_status_connection_error(self, mock_ob_class): + """Test OceanBase status when connection fails.""" + mock_ob_class.side_effect = Exception("Connection failed") + + result = get_oceanbase_status() + + assert result["status"] == "timeout" + assert "error" in result["message"] + + @patch('api.utils.health_utils.OBConnection') + @patch.dict(os.environ, {'DOC_ENGINE': 'oceanbase'}) + def test_check_oceanbase_health_healthy(self, mock_ob_class): + """Test OceanBase health check returns healthy status.""" + mock_ob_connection = Mock() + mock_ob_connection.health.return_value = { + "uri": "localhost:2881", + "version_comment": "OceanBase 4.3.5.1", + "status": "healthy", + "connection": "connected" + } + mock_ob_connection.get_performance_metrics.return_value = { + "connection": "connected", + "latency_ms": 5.2, + "storage_used": "1.2MB", + "storage_total": "100GB", + "query_per_second": 150, + "slow_queries": 0, + "active_connections": 10, + "max_connections": 300 + } + mock_ob_class.return_value = mock_ob_connection + + result = check_oceanbase_health() + + assert result["status"] == "healthy" + assert result["details"]["connection"] == "connected" + assert result["details"]["latency_ms"] == 5.2 + assert result["details"]["query_per_second"] == 150 + + @patch('api.utils.health_utils.OBConnection') + @patch.dict(os.environ, {'DOC_ENGINE': 'oceanbase'}) + def test_check_oceanbase_health_degraded(self, mock_ob_class): + """Test OceanBase health check returns degraded status for high latency.""" + mock_ob_connection = Mock() + mock_ob_connection.health.return_value = { + "uri": "localhost:2881", + "version_comment": "OceanBase 4.3.5.1", + "status": "healthy", + "connection": "connected" + } + mock_ob_connection.get_performance_metrics.return_value = { + "connection": "connected", + "latency_ms": 1500.0, # High latency > 1000ms + "storage_used": "1.2MB", + "storage_total": "100GB", + "query_per_second": 50, + "slow_queries": 5, + "active_connections": 10, + "max_connections": 300 + } + mock_ob_class.return_value = mock_ob_connection + + result = check_oceanbase_health() + + assert result["status"] == "degraded" + assert result["details"]["latency_ms"] == 1500.0 + + @patch('api.utils.health_utils.OBConnection') + @patch.dict(os.environ, {'DOC_ENGINE': 'oceanbase'}) + def test_check_oceanbase_health_unhealthy(self, mock_ob_class): + """Test OceanBase health check returns unhealthy status.""" + mock_ob_connection = Mock() + mock_ob_connection.health.return_value = { + "uri": "localhost:2881", + "status": "unhealthy", + "connection": "disconnected", + "error": "Connection timeout" + } + mock_ob_connection.get_performance_metrics.return_value = { + "connection": "disconnected", + "error": "Connection timeout" + } + mock_ob_class.return_value = mock_ob_connection + + result = check_oceanbase_health() + + assert result["status"] == "unhealthy" + assert result["details"]["connection"] == "disconnected" + assert "error" in result["details"] + + @patch.dict(os.environ, {'DOC_ENGINE': 'elasticsearch'}) + def test_check_oceanbase_health_not_configured(self): + """Test OceanBase health check when not configured.""" + result = check_oceanbase_health() + + assert result["status"] == "not_configured" + assert result["details"]["connection"] == "not_configured" + assert "not configured" in result["details"]["message"].lower() + + +class TestOBConnectionPerformanceMetrics: + """Test cases for OBConnection performance metrics methods.""" + + def _create_mock_connection(self): + """Create a mock OBConnection with actual methods.""" + # Create a simple object and bind the real methods to it + class MockConn: + pass + conn = MockConn() + # Get the actual class from the singleton wrapper's closure + from rag.utils import ob_conn + # OBConnection is wrapped by @singleton decorator, so it's a function + # The original class is stored in the closure of the singleton function + # Find the class by checking all closure cells + ob_connection_class = None + if hasattr(ob_conn.OBConnection, '__closure__') and ob_conn.OBConnection.__closure__: + for cell in ob_conn.OBConnection.__closure__: + cell_value = cell.cell_contents + if inspect.isclass(cell_value): + ob_connection_class = cell_value + break + + if ob_connection_class is None: + raise ValueError("Could not find OBConnection class in closure") + + # Bind the actual methods to our mock object + conn.get_performance_metrics = types.MethodType(ob_connection_class.get_performance_metrics, conn) + conn._get_storage_info = types.MethodType(ob_connection_class._get_storage_info, conn) + conn._get_connection_pool_stats = types.MethodType(ob_connection_class._get_connection_pool_stats, conn) + conn._get_slow_query_count = types.MethodType(ob_connection_class._get_slow_query_count, conn) + conn._estimate_qps = types.MethodType(ob_connection_class._estimate_qps, conn) + return conn + + def test_get_performance_metrics_success(self): + """Test successful retrieval of performance metrics.""" + # Create mock connection with actual methods + conn = self._create_mock_connection() + mock_client = Mock() + conn.client = mock_client + conn.uri = "localhost:2881" + conn.db_name = "test" + + # Mock client methods - create separate mock results for each call + mock_result1 = Mock() + mock_result1.fetchone.return_value = (1,) + + mock_result2 = Mock() + mock_result2.fetchone.return_value = (100.5,) + + mock_result3 = Mock() + mock_result3.fetchone.return_value = (100.0,) + + mock_result4 = Mock() + mock_result4.fetchall.return_value = [ + (1, 'user', 'host', 'db', 'Query', 0, 'executing', 'SELECT 1') + ] + mock_result4.fetchone.return_value = ('max_connections', '300') + + mock_result5 = Mock() + mock_result5.fetchone.return_value = (0,) + + mock_result6 = Mock() + mock_result6.fetchone.return_value = (5,) + + # Setup side_effect to return different mocks for different queries + def sql_side_effect(query): + if "SELECT 1" in query: + return mock_result1 + elif "information_schema.tables" in query: + return mock_result2 + elif "__all_disk_stat" in query: + return mock_result3 + elif "SHOW PROCESSLIST" in query: + return mock_result4 + elif "SHOW VARIABLES LIKE 'max_connections'" in query: + return mock_result4 + elif "information_schema.processlist" in query and "time >" in query: + return mock_result5 + elif "information_schema.processlist" in query and "COUNT" in query: + return mock_result6 + return Mock() + + mock_client.perform_raw_text_sql.side_effect = sql_side_effect + mock_client.pool_size = 300 + + # Mock logger + import logging + conn.logger = logging.getLogger('test') + + result = conn.get_performance_metrics() + + assert result["connection"] == "connected" + assert result["latency_ms"] >= 0 + assert "storage_used" in result + assert "storage_total" in result + + def test_get_performance_metrics_connection_error(self): + """Test performance metrics when connection fails.""" + # Create mock connection with actual methods + conn = self._create_mock_connection() + mock_client = Mock() + conn.client = mock_client + conn.uri = "localhost:2881" + conn.logger = Mock() + + mock_client.perform_raw_text_sql.side_effect = Exception("Connection failed") + + result = conn.get_performance_metrics() + + assert result["connection"] == "disconnected" + assert "error" in result + + def test_get_storage_info_success(self): + """Test successful retrieval of storage information.""" + # Create mock connection with actual methods + conn = self._create_mock_connection() + mock_client = Mock() + conn.client = mock_client + conn.db_name = "test" + conn.logger = Mock() + + mock_result1 = Mock() + mock_result1.fetchone.return_value = (100.5,) + mock_result2 = Mock() + mock_result2.fetchone.return_value = (100.0,) + + def sql_side_effect(query): + if "information_schema.tables" in query: + return mock_result1 + elif "__all_disk_stat" in query: + return mock_result2 + return Mock() + + mock_client.perform_raw_text_sql.side_effect = sql_side_effect + + result = conn._get_storage_info() + + assert "storage_used" in result + assert "storage_total" in result + assert "MB" in result["storage_used"] + + def test_get_storage_info_fallback(self): + """Test storage info with fallback when total space unavailable.""" + # Create mock connection with actual methods + conn = self._create_mock_connection() + mock_client = Mock() + conn.client = mock_client + conn.db_name = "test" + conn.logger = Mock() + + # First query succeeds, second fails + def side_effect(query): + if "information_schema.tables" in query: + mock_result = Mock() + mock_result.fetchone.return_value = (100.5,) + return mock_result + else: + raise Exception("Table not found") + + mock_client.perform_raw_text_sql.side_effect = side_effect + + result = conn._get_storage_info() + + assert "storage_used" in result + assert "storage_total" in result + + def test_get_connection_pool_stats(self): + """Test retrieval of connection pool statistics.""" + # Create mock connection with actual methods + conn = self._create_mock_connection() + mock_client = Mock() + conn.client = mock_client + conn.logger = Mock() + mock_client.pool_size = 300 + + mock_result1 = Mock() + mock_result1.fetchall.return_value = [ + (1, 'user', 'host', 'db', 'Query', 0, 'executing', 'SELECT 1'), + (2, 'user', 'host', 'db', 'Sleep', 10, None, None) + ] + + mock_result2 = Mock() + mock_result2.fetchone.return_value = ('max_connections', '300') + + def sql_side_effect(query): + if "SHOW PROCESSLIST" in query: + return mock_result1 + elif "SHOW VARIABLES LIKE 'max_connections'" in query: + return mock_result2 + return Mock() + + mock_client.perform_raw_text_sql.side_effect = sql_side_effect + + result = conn._get_connection_pool_stats() + + assert "active_connections" in result + assert "max_connections" in result + assert result["active_connections"] >= 0 + + def test_get_slow_query_count(self): + """Test retrieval of slow query count.""" + # Create mock connection with actual methods + conn = self._create_mock_connection() + mock_client = Mock() + conn.client = mock_client + conn.logger = Mock() + + mock_result = Mock() + mock_result.fetchone.return_value = (5,) + mock_client.perform_raw_text_sql.return_value = mock_result + + result = conn._get_slow_query_count(threshold_seconds=1) + + assert isinstance(result, int) + assert result >= 0 + + def test_estimate_qps(self): + """Test QPS estimation.""" + # Create mock connection with actual methods + conn = self._create_mock_connection() + mock_client = Mock() + conn.client = mock_client + conn.logger = Mock() + + mock_result = Mock() + mock_result.fetchone.return_value = (10,) + mock_client.perform_raw_text_sql.return_value = mock_result + + result = conn._estimate_qps() + + assert isinstance(result, int) + assert result >= 0 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) +