Fix component exesql (#2754)

### What problem does this PR solve?

#2700 

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
lidp
2024-10-09 17:53:36 +08:00
committed by GitHub
parent 8f4bd10b19
commit 8f815a6c1e

View File

@ -16,7 +16,8 @@
from abc import ABC from abc import ABC
import re import re
import pandas as pd import pandas as pd
from peewee import MySQLDatabase, PostgresqlDatabase import pymysql
import psycopg2
from agent.component.base import ComponentBase, ComponentParamBase from agent.component.base import ComponentBase, ComponentParamBase
@ -66,14 +67,14 @@ class ExeSQL(ComponentBase, ABC):
raise Exception("SQL statement not found!") raise Exception("SQL statement not found!")
if self._param.db_type in ["mysql", "mariadb"]: if self._param.db_type in ["mysql", "mariadb"]:
db = MySQLDatabase(self._param.database, user=self._param.username, host=self._param.host, db = pymysql.connect(db=self._param.database, user=self._param.username, host=self._param.host,
port=self._param.port, password=self._param.password) port=self._param.port, password=self._param.password)
elif self._param.db_type == 'postgresql': elif self._param.db_type == 'postgresql':
db = PostgresqlDatabase(self._param.database, user=self._param.username, host=self._param.host, db = psycopg2.connect(dbname=self._param.database, user=self._param.username, host=self._param.host,
port=self._param.port, password=self._param.password) port=self._param.port, password=self._param.password)
try: try:
db.connect() cursor = db.cursor()
except Exception as e: except Exception as e:
raise Exception("Database Connection Failed! \n" + str(e)) raise Exception("Database Connection Failed! \n" + str(e))
sql_res = [] sql_res = []
@ -81,13 +82,13 @@ class ExeSQL(ComponentBase, ABC):
if not single_sql: if not single_sql:
continue continue
try: try:
query = db.execute_sql(single_sql) cursor.execute(single_sql)
if query.rowcount == 0: if cursor.rowcount == 0:
sql_res.append({"content": "\nTotal: " + str(query.rowcount) + "\n No record in the database!"}) sql_res.append({"content": "\nTotal: 0\n No record in the database!"})
continue continue
single_res = pd.DataFrame([i for i in query.fetchmany(size=self._param.top_n)]) single_res = pd.DataFrame([i for i in cursor.fetchmany(size=self._param.top_n)])
single_res.columns = [i[0] for i in query.description] single_res.columns = [i[0] for i in cursor.description]
sql_res.append({"content": "\nTotal: " + str(query.rowcount) + "\n" + single_res.to_markdown()}) sql_res.append({"content": "\nTotal: " + str(cursor.rowcount) + "\n" + single_res.to_markdown()})
except Exception as e: except Exception as e:
sql_res.append({"content": "**Error**:" + str(e) + "\nError SQL Statement:" + single_sql}) sql_res.append({"content": "**Error**:" + str(e) + "\nError SQL Statement:" + single_sql})
pass pass