[fix] Added support of MySql ssl connection

This commit is contained in:
Georgii Petrov
2024-06-03 11:30:04 +03:00
parent f5ffb201af
commit 41a29931cc
2 changed files with 105 additions and 100 deletions

View File

@ -230,7 +230,8 @@
"pool": {
"idleTimeoutMillis": 30000
}
}
},
"mysqlExtraOptions": {}
},
"redis": {
"name": "redis",

View File

@ -32,14 +32,14 @@
'use strict';
const mysql = require('mysql2');
const mysql = require('mysql2/promise');
const connectorUtilities = require('./connectorUtilities');
const config = require('config');
const configSql = config.get('services.CoAuthoring.sql');
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult');
const cfgTableResult = configSql.get('tableResult');
const pool = mysql.createPool({
const connectionConfiguration = {
host : configSql.get('dbHost'),
port : parseInt(configSql.get('dbPort')),
user : configSql.get('dbUser'),
@ -49,120 +49,124 @@ const pool = mysql.createPool({
connectionLimit : configSql.get('connectionlimit'),
timezone : 'Z',
flags : '-FOUND_ROWS'
});
};
const additionalOptions = configSql.get('mysqlExtraOptions');
const configuration = Object.assign({}, connectionConfiguration, additionalOptions);
let pool = null;
function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes = false, opt_noLog = false, opt_values = []) {
pool.getConnection(function(connectionError, connection) {
if (connectionError) {
if (!opt_noLog) {
ctx.logger.error('pool.getConnection error: %s', connectionError);
}
return executeQuery(ctx, sqlCommand, opt_values, opt_noModifyRes, opt_noLog).then(
result => callbackFunction?.(null, result),
error => callbackFunction?.(error)
);
}
callbackFunction?.(connectionError, null);
return;
async function executeQuery(ctx, sqlCommand, values = [], noModifyRes = false, noLog = false) {
let connection = null;
try {
if (!pool) {
pool = mysql.createPool(configuration);
}
let queryCallback = function (error, result) {
connection.release();
if (error && !opt_noLog) {
ctx.logger.error('_______________________error______________________');
ctx.logger.error('sqlQuery: %s sqlCommand: %s', error.code, sqlCommand);
ctx.logger.error(error);
ctx.logger.error('_____________________end_error____________________');
connection = await pool.getConnection();
const result = await connection.query(sqlCommand, values);
let output;
if (!noModifyRes) {
output = result[0]?.affectedRows ? { affectedRows: result[0].affectedRows } : result[0];
} else {
output = result[0];
}
return output ?? { rows: [], affectedRows: 0 };
} catch (error) {
if (!noLog) {
ctx.logger.error(`sqlQuery() error while executing query: ${sqlCommand}\n${error.stack}`);
}
throw error;
} finally {
if (connection) {
try {
// Put the connection back in the pool
connection.release();
} catch (error) {
if (!noLog) {
ctx.logger.error(`connection.release() error while executing query: ${sqlCommand}\n${error.stack}`);
}
}
let output;
if (!opt_noModifyRes) {
output = result?.affectedRows ? { affectedRows: result.affectedRows } : result;
} else {
output = result;
}
output = output ?? { rows: [], affectedRows: 0 };
callbackFunction?.(error, output);
};
connection.query(sqlCommand, opt_values, queryCallback);
});
}
}
}
function closePool() {
return new Promise((resolve, reject) => {
pool.end((error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
return pool?.end();
}
function addSqlParameter(val, values) {
values.push(val);
function addSqlParameter(parameter, accumulatedArray) {
accumulatedArray.push(parameter);
return '?';
}
function concatParams(val1, val2) {
return `CONCAT(COALESCE(${val1}, ''), COALESCE(${val2}, ''))`;
function concatParams(firstParameter, secondParameter) {
return `CONCAT(COALESCE(${firstParameter}, ''), COALESCE(${secondParameter}, ''))`;
}
function upsert(ctx, task) {
return new Promise(function(resolve, reject) {
task.completeDefaults();
let dateNow = new Date();
let values = [];
let cbInsert = task.callback;
if (task.callback) {
let userCallback = new connectorUtilities.UserCallback();
userCallback.fromValues(task.userIndex, task.callback);
cbInsert = userCallback.toSQLInsert();
}
let p0 = addSqlParameter(task.tenant, values);
let p1 = addSqlParameter(task.key, values);
let p2 = addSqlParameter(task.status, values);
let p3 = addSqlParameter(task.statusInfo, values);
let p4 = addSqlParameter(dateNow, values);
let p5 = addSqlParameter(task.userIndex, values);
let p6 = addSqlParameter(task.changeId, values);
let p7 = addSqlParameter(cbInsert, values);
let p8 = addSqlParameter(task.baseurl, values);
let p9 = addSqlParameter(dateNow, values);
var sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl)`+
` VALUES (${p0}, ${p1}, ${p2}, ${p3}, ${p4}, ${p5}, ${p6}, ${p7}, ${p8}) ON DUPLICATE KEY UPDATE` +
` last_open_date = ${p9}`;
if (task.callback) {
let p10 = addSqlParameter(JSON.stringify(task.callback), values);
sqlCommand += `, callback = CONCAT(callback , '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' , (user_index + 1) , ',"callback":', ${p10}, '}')`;
}
if (task.baseurl) {
let p11 = addSqlParameter(task.baseurl, values);
sqlCommand += `, baseurl = ${p11}`;
}
async function upsert(ctx, task) {
task.completeDefaults();
const dateNow = new Date();
sqlCommand += ', user_index = LAST_INSERT_ID(user_index + 1);';
let cbInsert = task.callback;
if (task.callback) {
const userCallback = new connectorUtilities.UserCallback();
userCallback.fromValues(task.userIndex, task.callback);
cbInsert = userCallback.toSQLInsert();
}
sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
const insertId = result.affectedRows === 1 ? task.userIndex : result.insertId;
//if CLIENT_FOUND_ROWS don't specify 1 row is inserted , 2 row is updated, and 0 row is set to its current values
//http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html
const isInsert = result.affectedRows === 1;
const values = [];
const valuesPlaceholder = [
addSqlParameter(task.tenant, values),
addSqlParameter(task.key, values),
addSqlParameter(task.status, values),
addSqlParameter(task.statusInfo, values),
addSqlParameter(dateNow, values),
addSqlParameter(task.userIndex, values),
addSqlParameter(task.changeId, values),
addSqlParameter(cbInsert, values),
addSqlParameter(task.baseurl, values)
];
resolve({ isInsert, insertId });
}
}, true, false, values);
});
let updateStatement = `last_open_date = ${addSqlParameter(dateNow, values)}`;
if (task.callback) {
let callbackPlaceholder = addSqlParameter(JSON.stringify(task.callback), values);
updateStatement += `, callback = CONCAT(callback , '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' , (user_index + 1) , ',"callback":', ${callbackPlaceholder}, '}')`;
}
if (task.baseurl) {
let baseUrlPlaceholder = addSqlParameter(task.baseurl, values);
updateStatement += `, baseurl = ${baseUrlPlaceholder}`;
}
updateStatement += ', user_index = LAST_INSERT_ID(user_index + 1);';
const sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl) `+
`VALUES (${valuesPlaceholder.join(', ')}) ` +
`ON DUPLICATE KEY UPDATE ${updateStatement}`;
const result = await executeQuery(ctx, sqlCommand, values, true);
const insertId = result.affectedRows === 1 ? task.userIndex : result.insertId;
//if CLIENT_FOUND_ROWS don't specify 1 row is inserted , 2 row is updated, and 0 row is set to its current values
//http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html
const isInsert = result.affectedRows === 1;
return { isInsert, insertId };
}
module.exports = {
sqlQuery,
closePool,
addSqlParameter,
concatParams,
upsert
}
module.exports.sqlQuery = sqlQuery;
module.exports.closePool = closePool;
module.exports.addSqlParameter = addSqlParameter;
module.exports.concatParams = concatParams;
module.exports.upsert = upsert;