From b2ec9356be97e10e72984fc4c088f97c85d22f9a Mon Sep 17 00:00:00 2001 From: tangyi Date: Wed, 26 Mar 2025 10:44:16 +0800 Subject: [PATCH] mysql mcp --- .env.example | 10 +++++ .gitignore | 11 +++++ requirements.txt | 3 ++ src/db/mysql_operations.py | 91 ++++++++++++++++++++++++++++++++++++++ src/server.py | 65 +++++++++++++++++++++++++++ src/tools/mysql_tool.py | 47 ++++++++++++++++++++ 6 files changed, 227 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 requirements.txt create mode 100644 src/db/mysql_operations.py create mode 100644 src/server.py create mode 100644 src/tools/mysql_tool.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..941483d --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +# MySQL数据库连接配置 +MYSQL_HOST=127.0.0.1 +MYSQL_PORT=3306 +MYSQL_USER=root +MYSQL_PASSWORD=root +MYSQL_DATABASE=test + +# 服务器配置 +PORT=3000 +HOST=127.0.0.1 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5cf2ff6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +__MACOSX +.DS_Store +*.code-workspace +.vscode +node_modules/ +.history +vendor/ +.idea +.cursorrules +.cursor +**/__pycache__/ \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9b3746c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +mcp>=0.1.0 +mysql-connector-python==8.0.33 +python-dotenv>=0.19.0 \ No newline at end of file diff --git a/src/db/mysql_operations.py b/src/db/mysql_operations.py new file mode 100644 index 0000000..a3ea89f --- /dev/null +++ b/src/db/mysql_operations.py @@ -0,0 +1,91 @@ +import os +import logging +import mysql.connector +from mysql.connector import Error +from contextlib import contextmanager +from typing import Any, Dict, List, Optional + +logger = logging.getLogger("mysql_server") + +def get_db_config(): + """动态获取数据库配置""" + return { + 'host': os.getenv('MYSQL_HOST', 'localhost'), + 'user': os.getenv('MYSQL_USER', 'root'), + 'password': os.getenv('MYSQL_PASSWORD', ''), + 'database': os.getenv('MYSQL_DATABASE', ''), + 'port': int(os.getenv('MYSQL_PORT', '3306')), + 'connection_timeout': 5 + } + +@contextmanager +def get_db_connection(): + """ + 创建数据库连接的上下文管理器 + + Yields: + mysql.connector.connection.MySQLConnection: 数据库连接对象 + """ + connection = None + try: + db_config = get_db_config() + if not db_config['database']: + raise ValueError("数据库名称未设置,请检查环境变量MYSQL_DATABASE") + + connection = mysql.connector.connect(**db_config) + yield connection + except mysql.connector.Error as err: + error_msg = str(err) + logger.error(f"数据库连接失败: {error_msg}") + + if "Access denied" in error_msg: + raise ValueError("访问被拒绝,请检查用户名和密码") + elif "Unknown database" in error_msg: + db_config = get_db_config() + raise ValueError(f"数据库'{db_config['database']}'不存在") + elif "Can't connect" in error_msg or "Connection refused" in error_msg: + raise ConnectionError("无法连接到MySQL服务器,请检查服务是否启动") + elif "Authentication plugin" in error_msg: + raise ValueError(f"认证插件问题: {error_msg},请尝试修改用户认证方式为mysql_native_password") + else: + raise ConnectionError(f"数据库连接失败: {error_msg}") + finally: + if connection and connection.is_connected(): + connection.close() + logger.debug("数据库连接已关闭") + +def execute_query(connection, query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: + """ + 在给定的数据库连接上执行查询 + + Args: + connection: 数据库连接 + query: SQL查询语句 + params: 查询参数 (可选) + + Returns: + 查询结果列表 + """ + cursor = None + try: + cursor = connection.cursor(dictionary=True) + + # 执行查询 + if params: + cursor.execute(query, params) + else: + cursor.execute(query) + + # 获取结果 + results = cursor.fetchall() + logger.debug(f"查询返回 {len(results)} 条结果") + return results + + except mysql.connector.Error as query_err: + logger.error(f"查询执行失败: {str(query_err)}") + raise ValueError(f"查询执行失败: {str(query_err)}") + finally: + # 确保游标正确关闭 + if cursor: + cursor.close() + logger.debug("数据库游标已关闭") \ No newline at end of file diff --git a/src/server.py b/src/server.py new file mode 100644 index 0000000..13df382 --- /dev/null +++ b/src/server.py @@ -0,0 +1,65 @@ +from mcp.server.fastmcp import FastMCP +import os +import logging +from dotenv import load_dotenv +from src.tools.mysql_tool import register_mysql_tool + +# 配置日志 +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("mysql_server") + +# 尝试导入MySQL连接器 +try: + import mysql.connector + logger.debug("MySQL连接器导入成功") + mysql_available = True +except ImportError as e: + logger.critical(f"无法导入MySQL连接器: {str(e)}") + logger.critical("请确保已安装mysql-connector-python包: pip install mysql-connector-python") + mysql_available = False + +# 加载环境变量 +load_dotenv() +logger.debug("已加载环境变量") + +# 从环境变量获取服务器配置 +host = os.getenv('HOST', '127.0.0.1') +port = int(os.getenv('PORT', '3000')) +logger.debug(f"服务器配置: host={host}, port={port}") + +# 创建MCP服务器实例 +logger.debug("正在创建MCP服务器实例...") +mcp = FastMCP("MySQL Query Server", "cccccccccc", host=host, port=port, debug=True, endpoint='/sse') +logger.debug("MCP服务器实例创建完成") + +# 注册MySQL工具 +register_mysql_tool(mcp) + +def start_server(): + """启动SSE服务器的同步包装器""" + logger.debug("开始启动MySQL查询服务器...") + + print(f"开始启动MySQL查询SSE服务器...") + print(f"服务器监听在 {host}:{port}/sse") + + try: + # 检查MySQL配置是否有效 + from src.db.mysql_operations import get_db_config + db_config = get_db_config() + if mysql_available and not db_config['database']: + logger.warning("未设置数据库名称,请检查环境变量MYSQL_DATABASE") + print("警告: 未设置数据库名称,请检查环境变量MYSQL_DATABASE") + + # 使用run_app函数启动服务器 + logger.debug("调用mcp.run('sse')启动服务器...") + mcp.run('sse') + except Exception as e: + logger.exception(f"服务器运行时发生错误: {str(e)}") + print(f"服务器运行时发生错误: {str(e)}") + +if __name__ == "__main__": + # 确保初始化后工具才被注册 + start_server() diff --git a/src/tools/mysql_tool.py b/src/tools/mysql_tool.py new file mode 100644 index 0000000..2d9310e --- /dev/null +++ b/src/tools/mysql_tool.py @@ -0,0 +1,47 @@ +import json +import logging +from typing import Any, Dict, Optional +from mcp.server.fastmcp import FastMCP +from src.db.mysql_operations import get_db_connection, execute_query +import mysql.connector + +logger = logging.getLogger("mysql_server") + +# 尝试导入MySQL连接器 +try: + mysql.connector + mysql_available = True +except ImportError: + mysql_available = False + +def register_mysql_tool(mcp: FastMCP): + """ + 注册MySQL查询工具到MCP服务器 + + Args: + mcp: FastMCP服务器实例 + """ + logger.debug("注册MySQL查询工具...") + + @mcp.tool() + def mysql_query(query: str, params: Optional[Dict[str, Any]] = None) -> str: + """ + 执行MySQL查询并返回结果 + + Args: + query: SQL查询语句 + params: 查询参数 (可选) + + Returns: + 查询结果的JSON字符串 + """ + logger.debug(f"执行MySQL查询: {query}, 参数: {params}") + + try: + with get_db_connection() as connection: + results = execute_query(connection, query, params) + return json.dumps(results, default=str) + + except Exception as e: + logger.error(f"执行查询时发生异常: {str(e)}") + return json.dumps({"error": str(e)}) \ No newline at end of file