[integration tests] Refactoring: baseConnector, mySqlBaseConnector. Tests fixes and improvement, bugs fixes.

This commit is contained in:
Georgii Petrov
2023-09-05 12:41:09 +03:00
parent c4e11222a0
commit b15aa274e5
8 changed files with 345 additions and 270 deletions

View File

@ -32,7 +32,7 @@
'use strict';
var sqlDataBaseType = {
const sqlDataBaseType = {
mySql : 'mysql',
mariaDB : 'mariadb',
msSql : 'mssql',
@ -43,49 +43,48 @@ var sqlDataBaseType = {
const connectorUtilities = require('./connectorUtilities');
const utils = require('./../../Common/sources/utils');
var bottleneck = require("bottleneck");
var config = require('config');
var configSql = config.get('services.CoAuthoring.sql');
const bottleneck = require("bottleneck");
const config = require('config');
const configSql = config.get('services.CoAuthoring.sql');
const cfgTableResult = configSql.get('tableResult');
const cfgTableChanges = configSql.get('tableChanges');
const maxPacketSize = configSql.get('max_allowed_packet'); // The default size for a query to the database is 1Mb - 1 (because it does not write 1048575, but writes 1048574)
const cfgBottleneckGetChanges = config.get('bottleneck.getChanges');
const dbType = configSql.get('type');
let baseConnector;
const reservoirMaximum = cfgBottleneckGetChanges.reservoirIncreaseMaximum || cfgBottleneckGetChanges.reservoirRefreshAmount;
const group = new bottleneck.Group(cfgBottleneckGetChanges);
const g_oCriticalSection = {};
let dbInstance;
switch (dbType) {
case sqlDataBaseType.mySql:
case sqlDataBaseType.mariaDB:
baseConnector = require('./mySqlBaseConnector');
dbInstance = require('./mySqlBaseConnector');
break;
case sqlDataBaseType.msSql:
baseConnector = require('./msSqlServerConnector');
dbInstance = require('./msSqlServerConnector');
break;
case sqlDataBaseType.dameng:
baseConnector = require('./damengBaseConnector');
dbInstance = require('./damengBaseConnector');
break;
case sqlDataBaseType.oracle:
baseConnector = require('./oracleBaseConnector');
dbInstance = require('./oracleBaseConnector');
break;
default:
baseConnector = require('./postgreSqlBaseConnector');
dbInstance = require('./postgreSqlBaseConnector');
break;
}
const cfgTableResult = configSql.get('tableResult');
const cfgTableChanges = configSql.get('tableChanges');
var g_oCriticalSection = {};
let isSupportFastInsert = !!baseConnector.insertChanges;
let addSqlParam = baseConnector.addSqlParameter;
var maxPacketSize = configSql.get('max_allowed_packet'); // The default size for a query to the database is 1Mb - 1 (because it does not write 1048575, but writes 1048574)
const cfgBottleneckGetChanges = config.get('bottleneck.getChanges');
let reservoirMaximum = cfgBottleneckGetChanges.reservoirIncreaseMaximum || cfgBottleneckGetChanges.reservoirRefreshAmount;
let group = new bottleneck.Group(cfgBottleneckGetChanges);
let isSupportFastInsert = !!dbInstance.insertChanges;
const addSqlParameter = dbInstance.addSqlParameter;
function getChangesSize(changes) {
return changes.reduce((accumulator, currentValue) => accumulator + currentValue.change_data.length, 0);
}
exports.baseConnector = baseConnector;
exports.insertChangesPromiseCompatibility = function (ctx, objChanges, docId, index, user) {
function insertChangesPromiseCompatibility(ctx, objChanges, docId, index, user) {
return new Promise(function(resolve, reject) {
_insertChangesCallback(ctx, 0, objChanges, docId, index, user, function(error, result) {
if (error) {
@ -95,14 +94,15 @@ exports.insertChangesPromiseCompatibility = function (ctx, objChanges, docId, in
}
});
});
};
exports.insertChangesPromiseFast = function (ctx, objChanges, docId, index, user) {
}
function insertChangesPromiseFast(ctx, objChanges, docId, index, user) {
return new Promise(function(resolve, reject) {
baseConnector.insertChanges(ctx, cfgTableChanges, 0, objChanges, docId, index, user, function(error, result, isSupported) {
dbInstance.insertChanges(ctx, cfgTableChanges, 0, objChanges, docId, index, user, function(error, result, isSupported) {
isSupportFastInsert = isSupported;
if (error) {
if (!isSupportFastInsert) {
resolve(exports.insertChangesPromiseCompatibility(ctx, objChanges, docId, index, user));
resolve(insertChangesPromiseCompatibility(ctx, objChanges, docId, index, user));
} else {
reject(error);
}
@ -111,22 +111,21 @@ exports.insertChangesPromiseFast = function (ctx, objChanges, docId, index, user
}
});
});
};
exports.insertChangesPromise = function (ctx, objChanges, docId, index, user) {
if (isSupportFastInsert) {
return exports.insertChangesPromiseFast(ctx, objChanges, docId, index, user);
} else {
return exports.insertChangesPromiseCompatibility(ctx, objChanges, docId, index, user);
}
}
function insertChangesPromise(ctx, objChanges, docId, index, user) {
if (isSupportFastInsert) {
return insertChangesPromiseFast(ctx, objChanges, docId, index, user);
} else {
return insertChangesPromiseCompatibility(ctx, objChanges, docId, index, user);
}
}
};
function _getDateTime2(oDate) {
return oDate.toISOString().slice(0, 19).replace('T', ' ');
}
exports.getDateTime = _getDateTime2;
function _insertChangesCallback (ctx, startIndex, objChanges, docId, index, user, callback) {
function _insertChangesCallback(ctx, startIndex, objChanges, docId, index, user, callback) {
var sqlCommand = `INSERT INTO ${cfgTableChanges} VALUES`;
var i = startIndex, l = objChanges.length, lengthUtf8Current = sqlCommand.length, lengthUtf8Row = 0, values = [];
if (i === l)
@ -141,21 +140,21 @@ function _insertChangesCallback (ctx, startIndex, objChanges, docId, index, user
if (lengthUtf8Row + lengthUtf8Current >= maxPacketSize && i > startIndex) {
sqlCommand += ';';
(function(tmpStart, tmpIndex) {
baseConnector.sqlQuery(ctx, sqlCommand, function() {
dbInstance.sqlQuery(ctx, sqlCommand, function() {
// do not remove lock, but we continue to add
_insertChangesCallback(ctx, tmpStart, objChanges, docId, tmpIndex, user, callback);
}, undefined, undefined, values);
})(i, index);
return;
}
let p0 = addSqlParam(ctx.tenant, values);
let p1 = addSqlParam(docId, values);
let p2 = addSqlParam(index, values);
let p3 = addSqlParam(user.id, values);
let p4 = addSqlParam(user.idOriginal, values);
let p5 = addSqlParam(user.username, values);
let p6 = addSqlParam(objChanges[i].change, values);
let p7 = addSqlParam(objChanges[i].time, values);
let p0 = addSqlParameter(ctx.tenant, values);
let p1 = addSqlParameter(docId, values);
let p2 = addSqlParameter(index, values);
let p3 = addSqlParameter(user.id, values);
let p4 = addSqlParameter(user.idOriginal, values);
let p5 = addSqlParameter(user.username, values);
let p6 = addSqlParameter(objChanges[i].change, values);
let p7 = addSqlParameter(objChanges[i].time, values);
if (i > startIndex) {
sqlCommand += ',';
}
@ -164,47 +163,25 @@ function _insertChangesCallback (ctx, startIndex, objChanges, docId, index, user
}
sqlCommand += ';';
baseConnector.sqlQuery(ctx, sqlCommand, callback, undefined, undefined, values);
dbInstance.sqlQuery(ctx, sqlCommand, callback, undefined, undefined, values);
}
exports.deleteChangesCallback = function(ctx, docId, deleteIndex, callback) {
function deleteChangesCallback(ctx, docId, deleteIndex, callback) {
let sqlCommand, values = [];
let p1 = addSqlParam(ctx.tenant, values);
let p2 = addSqlParam(docId, values);
let p1 = addSqlParameter(ctx.tenant, values);
let p2 = addSqlParameter(docId, values);
if (null !== deleteIndex) {
let sqlParam2 = addSqlParam(deleteIndex, values);
let sqlParam2 = addSqlParameter(deleteIndex, values);
sqlCommand = `DELETE FROM ${cfgTableChanges} WHERE tenant=${p1} AND id=${p2} AND change_id >= ${sqlParam2};`;
} else {
sqlCommand = `DELETE FROM ${cfgTableChanges} WHERE tenant=${p1} AND id=${p2};`;
}
baseConnector.sqlQuery(ctx, sqlCommand, callback, undefined, undefined, values);
};
exports.deleteChangesPromise = function (ctx, docId, deleteIndex) {
return new Promise(function(resolve, reject) {
exports.deleteChangesCallback(ctx, docId, deleteIndex, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
};
exports.deleteChanges = function (ctx, docId, deleteIndex) {
lockCriticalSection(docId, function () {_deleteChanges(ctx, docId, deleteIndex);});
};
function _deleteChanges (ctx, docId, deleteIndex) {
exports.deleteChangesCallback(ctx, docId, deleteIndex, function () {unLockCriticalSection(docId);});
dbInstance.sqlQuery(ctx, sqlCommand, callback, undefined, undefined, values);
}
exports.getChangesIndex = function(ctx, docId, callback) {
let values = [];
let p1 = addSqlParam(ctx.tenant, values);
let p2 = addSqlParam(docId, values);
var sqlCommand = `SELECT MAX(change_id) as change_id FROM ${cfgTableChanges} WHERE tenant=${p1} AND id=${p2};`;
baseConnector.sqlQuery(ctx, sqlCommand, callback, undefined, undefined, values);
};
exports.getChangesIndexPromise = function(ctx, docId) {
function deleteChangesPromise(ctx, docId, deleteIndex) {
return new Promise(function(resolve, reject) {
exports.getChangesIndex(ctx, docId, function(error, result) {
deleteChangesCallback(ctx, docId, deleteIndex, function(error, result) {
if (error) {
reject(error);
} else {
@ -212,35 +189,64 @@ exports.getChangesIndexPromise = function(ctx, docId) {
}
});
});
};
exports.getChangesPromise = function (ctx, docId, optStartIndex, optEndIndex, opt_time) {
}
function deleteChanges(ctx, docId, deleteIndex) {
lockCriticalSection(docId, function () {_deleteChanges(ctx, docId, deleteIndex);});
}
function _deleteChanges (ctx, docId, deleteIndex) {
deleteChangesCallback(ctx, docId, deleteIndex, function () {unLockCriticalSection(docId);});
}
function getChangesIndex(ctx, docId, callback) {
let values = [];
let p1 = addSqlParameter(ctx.tenant, values);
let p2 = addSqlParameter(docId, values);
var sqlCommand = `SELECT MAX(change_id) as change_id FROM ${cfgTableChanges} WHERE tenant=${p1} AND id=${p2};`;
dbInstance.sqlQuery(ctx, sqlCommand, callback, undefined, undefined, values);
}
function getChangesIndexPromise(ctx, docId) {
return new Promise(function(resolve, reject) {
getChangesIndex(ctx, docId, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
}
function getChangesPromise(ctx, docId, optStartIndex, optEndIndex, opt_time) {
let limiter = group.key(`${ctx.tenant}\t${docId}\tchanges`);
return limiter.schedule(() => {
return new Promise(function(resolve, reject) {
let values = [];
let sqlParam = addSqlParam(ctx.tenant, values);
let sqlParam = addSqlParameter(ctx.tenant, values);
let sqlWhere = `tenant=${sqlParam}`;
sqlParam = addSqlParam(docId, values);
sqlParam = addSqlParameter(docId, values);
sqlWhere += ` AND id=${sqlParam}`;
if (null != optStartIndex) {
sqlParam = addSqlParam(optStartIndex, values);
sqlParam = addSqlParameter(optStartIndex, values);
sqlWhere += ` AND change_id>=${sqlParam}`;
}
if (null != optEndIndex) {
sqlParam = addSqlParam(optEndIndex, values);
sqlParam = addSqlParameter(optEndIndex, values);
sqlWhere += ` AND change_id<${sqlParam}`;
}
if (null != opt_time) {
if (!(opt_time instanceof Date)) {
opt_time = new Date(opt_time);
}
sqlParam = addSqlParam(opt_time, values);
sqlParam = addSqlParameter(opt_time, values);
sqlWhere += ` AND change_date<=${sqlParam}`;
}
sqlWhere += ' ORDER BY change_id ASC';
var sqlCommand = `SELECT * FROM ${cfgTableChanges} WHERE ${sqlWhere};`;
baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
dbInstance.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -257,11 +263,12 @@ exports.getChangesPromise = function (ctx, docId, optStartIndex, optEndIndex, op
}, undefined, undefined, values);
});
});
};
exports.getDocumentsWithChanges = baseConnector.getDocumentsWithChanges ?? function (ctx) {
}
function getDocumentsWithChanges(ctx) {
return new Promise(function(resolve, reject) {
const sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE EXISTS(SELECT id FROM ${cfgTableChanges} WHERE tenant=${cfgTableResult}.tenant AND id = ${cfgTableResult}.id LIMIT 1);`;
baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
dbInstance.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -270,16 +277,18 @@ exports.getDocumentsWithChanges = baseConnector.getDocumentsWithChanges ?? funct
}, false, false);
});
}
exports.getExpired = baseConnector.getExpired ?? function(ctx, maxCount, expireSeconds) {
function getExpired(ctx, maxCount, expireSeconds) {
return new Promise(function(resolve, reject) {
const values = [];
const expireDate = new Date();
utils.addSeconds(expireDate, -expireSeconds);
const date = addSqlParam(expireDate, values);
const count = addSqlParam(maxCount, values);
const date = addSqlParameter(expireDate, values);
const count = addSqlParameter(maxCount, values);
const sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE last_open_date <= ${date}` +
` AND NOT EXISTS(SELECT tenant, id FROM ${cfgTableChanges} WHERE ${cfgTableChanges}.tenant = ${cfgTableResult}.tenant AND ${cfgTableChanges}.id = ${cfgTableResult}.id LIMIT 1) LIMIT ${count};`;
baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
dbInstance.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -289,12 +298,12 @@ exports.getExpired = baseConnector.getExpired ?? function(ctx, maxCount, expireS
});
}
exports.isLockCriticalSection = function (id) {
function isLockCriticalSection(id) {
return !!(g_oCriticalSection[id]);
};
}
// critical section
function lockCriticalSection (id, callback) {
function lockCriticalSection(id, callback) {
if (g_oCriticalSection[id]) {
// wait
g_oCriticalSection[id].push(callback);
@ -305,7 +314,8 @@ function lockCriticalSection (id, callback) {
g_oCriticalSection[id].push(callback);
callback();
}
function unLockCriticalSection (id) {
function unLockCriticalSection(id) {
var arrCallbacks = g_oCriticalSection[id];
arrCallbacks.shift();
if (0 < arrCallbacks.length)
@ -313,11 +323,12 @@ function unLockCriticalSection (id) {
else
delete g_oCriticalSection[id];
}
exports.healthCheck = baseConnector.healthCheck ?? function (ctx) {
function healthCheck(ctx) {
return new Promise(function(resolve, reject) {
//SELECT 1; usefull for H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite
//http://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most
baseConnector.sqlQuery(ctx, 'SELECT 1;', function(error, result) {
//SELECT 1; usefull for H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite
//http://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most
dbInstance.sqlQuery(ctx, 'SELECT 1;', function(error, result) {
if (error) {
reject(error);
} else {
@ -325,12 +336,12 @@ exports.healthCheck = baseConnector.healthCheck ?? function (ctx) {
}
});
});
};
}
exports.getEmptyCallbacks = baseConnector.getEmptyCallbacks ?? function(ctx) {
function getEmptyCallbacks(ctx) {
return new Promise(function(resolve, reject) {
const sqlCommand = `SELECT DISTINCT t1.tenant, t1.id FROM ${cfgTableChanges} t1 LEFT JOIN ${cfgTableResult} t2 ON t2.tenant = t1.tenant AND t2.id = t1.id WHERE t2.callback = '';`;
baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
dbInstance.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -338,11 +349,12 @@ exports.getEmptyCallbacks = baseConnector.getEmptyCallbacks ?? function(ctx) {
}
});
});
};
exports.getTableColumns = baseConnector.getTableColumns ?? function(ctx, tableName) {
}
function getTableColumns(ctx, tableName) {
return new Promise(function(resolve, reject) {
const sqlCommand = `SELECT column_name FROM information_schema.COLUMNS WHERE TABLE_NAME = '${tableName}';`;
baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
dbInstance.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -350,7 +362,21 @@ exports.getTableColumns = baseConnector.getTableColumns ?? function(ctx, tableNa
}
});
});
};
exports.UserCallback = connectorUtilities.UserCallback;
exports.DocumentPassword = connectorUtilities.DocumentPassword;
exports.DocumentAdditional = connectorUtilities.DocumentAdditional;
}
module.exports = {
insertChangesPromise,
deleteChangesPromise,
deleteChanges,
getChangesIndexPromise,
getChangesPromise,
isLockCriticalSection,
getDocumentsWithChanges,
getExpired,
healthCheck,
getEmptyCallbacks,
getTableColumns,
getDateTime: _getDateTime2,
...connectorUtilities,
...dbInstance
};

View File

@ -56,7 +56,7 @@ var WAIT_TIMEOUT = 30000;
var LOOP_TIMEOUT = 1000;
var EXEC_TIMEOUT = WAIT_TIMEOUT + utils.getConvertionTimeout(undefined);
let addSqlParam = sqlBase.baseConnector.addSqlParameter;
let addSqlParam = sqlBase.addSqlParameter;
function updateDoc(ctx, docId, status, callback) {
return new Promise(function(resolve, reject) {
@ -66,7 +66,7 @@ function updateDoc(ctx, docId, status, callback) {
let p3 = addSqlParam(ctx.tenant, values);
let p4 = addSqlParam(docId, values);
let sqlCommand = `UPDATE ${cfgTableResult} SET status=${p1},callback=${p2} WHERE tenant=${p3} AND id=${p4};`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
sqlBase.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {

View File

@ -59,7 +59,6 @@ UserCallback.prototype.getCallbacks = function(ctx, callbacksStr) {
}
return res;
};
exports.UserCallback = UserCallback;
function DocumentPassword() {
this.password = undefined;
@ -111,7 +110,6 @@ DocumentPassword.prototype.hasPasswordChanges = function(ctx, docPasswordStr) {
let docPassword = this.getDocPassword(ctx, docPasswordStr);
return docPassword.initial !== docPassword.current;
};
exports.DocumentPassword = DocumentPassword;
function DocumentAdditional() {
this.data = [];
@ -152,4 +150,9 @@ DocumentAdditional.prototype.getOpenedAt = function(str) {
});
return res;
};
exports.DocumentAdditional = DocumentAdditional;
module.exports = {
UserCallback,
DocumentPassword,
DocumentAdditional
}

View File

@ -120,13 +120,13 @@ function registerPlaceholderValues(values, statement) {
}
function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes = false, opt_noLog = false, opt_values = {}) {
return executeSql(ctx, sqlCommand, opt_values, opt_noModifyRes, opt_noLog).then(
return executeQuery(ctx, sqlCommand, opt_values, opt_noModifyRes, opt_noLog).then(
result => callbackFunction?.(null, result),
error => callbackFunction?.(error)
);
}
async function executeSql(ctx, sqlCommand, values = {}, noModifyRes = false, noLog = false) {
async function executeQuery(ctx, sqlCommand, values = {}, noModifyRes = false, noLog = false) {
try {
await sql.connect(configuration);
@ -201,14 +201,14 @@ function concatParams(...parameters) {
function getTableColumns(ctx, tableName) {
const sqlCommand = `SELECT column_name FROM information_schema.COLUMNS WHERE TABLE_NAME = '${tableName}' AND TABLE_SCHEMA = 'dbo';`;
return executeSql(ctx, sqlCommand);
return executeQuery(ctx, sqlCommand);
}
function getDocumentsWithChanges(ctx) {
const existingId = `SELECT TOP(1) id FROM ${cfgTableChanges} WHERE tenant=${cfgTableResult}.tenant AND id = ${cfgTableResult}.id`;
const sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE EXISTS(${existingId});`;
return executeSql(ctx, sqlCommand);
return executeQuery(ctx, sqlCommand);
}
function getExpired(ctx, maxCount, expireSeconds) {
@ -221,7 +221,7 @@ function getExpired(ctx, maxCount, expireSeconds) {
const notExistingTenantAndId = `SELECT TOP(1) tenant, id FROM ${cfgTableChanges} WHERE ${cfgTableChanges}.tenant = ${cfgTableResult}.tenant AND ${cfgTableChanges}.id = ${cfgTableResult}.id`
const sqlCommand = `SELECT TOP(${count}) * FROM ${cfgTableResult} WHERE last_open_date <= ${date} AND NOT EXISTS(${notExistingTenantAndId});`;
return executeSql(ctx, sqlCommand, values);
return executeQuery(ctx, sqlCommand, values);
}
async function upsert(ctx, task, opt_updateUserIndex) {
@ -285,7 +285,7 @@ async function upsert(ctx, task, opt_updateUserIndex) {
+ `WHEN NOT MATCHED THEN INSERT(${sourceColumns}) VALUES(${sourceValues}) `
+ `OUTPUT $ACTION as action, INSERTED.user_index as insertId;`;
const result = await executeSql(ctx, sqlMerge, values, true);
const result = await executeQuery(ctx, sqlMerge, values, true);
const insertId = result.recordset[0].insertId;
const affectedRows = result.recordset[0].action === 'UPDATE' ? 2 : 1;

View File

@ -32,102 +32,140 @@
'use strict';
var mysql = require('mysql2');
var connectorUtilities = require('./connectorUtilities');
const mysql = require('mysql2');
const connectorUtilities = require('./connectorUtilities');
const config = require('config');
const configSql = config.get('services.CoAuthoring.sql');
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult');
var pool = mysql.createPool({
host : configSql.get('dbHost'),
port : configSql.get('dbPort'),
user : configSql.get('dbUser'),
password : configSql.get('dbPass'),
database : configSql.get('dbName'),
charset : configSql.get('charset'),
connectionLimit : configSql.get('connectionlimit'),
timezone : 'Z',
flags : '-FOUND_ROWS'
const pool = mysql.createPool({
host : configSql.get('dbHost'),
port : configSql.get('dbPort'),
user : configSql.get('dbUser'),
password : configSql.get('dbPass'),
database : configSql.get('dbName'),
charset : configSql.get('charset'),
connectionLimit : configSql.get('connectionlimit'),
timezone : 'Z',
flags : '-FOUND_ROWS'
});
exports.sqlQuery = function (ctx, sqlCommand, callbackFunction, opt_noModifyRes, opt_noLog, opt_values) {
pool.getConnection(function(err, connection) {
if (err) {
ctx.logger.error('pool.getConnection error: %s', err);
if (callbackFunction) callbackFunction(err, null);
return;
}
let queryCallback = function (error, result) {
connection.release();
if (error) {
ctx.logger.error('________________________error_____________________');
ctx.logger.error('sqlQuery: %s sqlCommand: %s', error.code, sqlCommand);
ctx.logger.error(error);
ctx.logger.error('_____________________end_error_____________________');
}
if (callbackFunction) callbackFunction(error, result);
};
if(opt_values){
connection.query(sqlCommand, opt_values, queryCallback);
} else {
connection.query(sqlCommand, queryCallback);
}
});
};
let addSqlParam = function (val, values) {
values.push(val);
return '?';
};
exports.addSqlParameter = addSqlParam;
let concatParams = function (val1, val2) {
function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes = false, opt_noLog = false, opt_values = []) {
pool.getConnection(function(connectionError, connection) {
if (connectionError) {
if (!opt_noLog) {
ctx.logger.error('pool.getConnection error: %s', connectionError);
}
callbackFunction?.(connectionError, null);
return;
}
let queryCallback = function (error, result) {
connection.release();
if (error && !opt_noLog) {
ctx.logger.error('_______________________error______________________');
ctx.logger.error('sqlQuery: %s sqlCommand: %s', error.code, sqlCommand);
ctx.logger.error(error);
ctx.logger.error('_____________________end_error____________________');
}
let output;
if (!opt_noModifyRes) {
output = result?.affectedRows ? { affectedRows: result.affectedRows } : result;
} else {
output = result;
}
output = output ?? { rows: [], affectedRows: 0 };
callbackFunction?.(error, output);
};
connection.query(sqlCommand, opt_values, queryCallback);
});
}
function closePool() {
pool.end();
}
function addSqlParameter(val, values) {
values.push(val);
return '?';
}
function concatParams(val1, val2) {
return `CONCAT(COALESCE(${val1}, ''), COALESCE(${val2}, ''))`;
};
exports.concatParams = concatParams;
}
exports.upsert = function(ctx, task, opt_updateUserIndex) {
return new Promise(function(resolve, reject) {
task.completeDefaults();
let dateNow = new Date();
let values = [];
let cbInsert = task.callback;
if (task.callback) {
let userCallback = new connectorUtilities.UserCallback();
userCallback.fromValues(task.userIndex, task.callback);
cbInsert = userCallback.toSQLInsert();
}
let p0 = addSqlParam(task.tenant, values);
let p1 = addSqlParam(task.key, values);
let p2 = addSqlParam(task.status, values);
let p3 = addSqlParam(task.statusInfo, values);
let p4 = addSqlParam(dateNow, values);
let p5 = addSqlParam(task.userIndex, values);
let p6 = addSqlParam(task.changeId, values);
let p7 = addSqlParam(cbInsert, values);
let p8 = addSqlParam(task.baseurl, values);
let p9 = addSqlParam(dateNow, values);
var sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl)`+
` VALUES (${p0}, ${p1}, ${p2}, ${p3}, ${p4}, ${p5}, ${p6}, ${p7}, ${p8}) ON DUPLICATE KEY UPDATE` +
` last_open_date = ${p9}`;
if (task.callback) {
let p10 = addSqlParam(JSON.stringify(task.callback), values);
sqlCommand += `, callback = CONCAT(callback , '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' , (user_index + 1) , ',"callback":', ${p10}, '}')`;
}
if (task.baseurl) {
let p11 = addSqlParam(task.baseurl, values);
sqlCommand += `, baseurl = ${p11}`;
}
if (opt_updateUserIndex) {
sqlCommand += ', user_index = LAST_INSERT_ID(user_index + 1)';
}
sqlCommand += ';';
function getTableColumns(ctx, tableName) {
return new Promise(function(resolve, reject) {
const sqlCommand = `SELECT column_name AS 'column_name' FROM information_schema.COLUMNS WHERE TABLE_NAME = '${tableName}';`;
sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
}
exports.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
}, undefined, undefined, values);
});
};
function upsert(ctx, task, opt_updateUserIndex) {
return new Promise(function(resolve, reject) {
task.completeDefaults();
let dateNow = new Date();
let values = [];
let cbInsert = task.callback;
if (task.callback) {
let userCallback = new connectorUtilities.UserCallback();
userCallback.fromValues(task.userIndex, task.callback);
cbInsert = userCallback.toSQLInsert();
}
let p0 = addSqlParameter(task.tenant, values);
let p1 = addSqlParameter(task.key, values);
let p2 = addSqlParameter(task.status, values);
let p3 = addSqlParameter(task.statusInfo, values);
let p4 = addSqlParameter(dateNow, values);
let p5 = addSqlParameter(task.userIndex, values);
let p6 = addSqlParameter(task.changeId, values);
let p7 = addSqlParameter(cbInsert, values);
let p8 = addSqlParameter(task.baseurl, values);
let p9 = addSqlParameter(dateNow, values);
var sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl)`+
` VALUES (${p0}, ${p1}, ${p2}, ${p3}, ${p4}, ${p5}, ${p6}, ${p7}, ${p8}) ON DUPLICATE KEY UPDATE` +
` last_open_date = ${p9}`;
if (task.callback) {
let p10 = addSqlParameter(JSON.stringify(task.callback), values);
sqlCommand += `, callback = CONCAT(callback , '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' , (user_index + 1) , ',"callback":', ${p10}, '}')`;
}
if (task.baseurl) {
let p11 = addSqlParameter(task.baseurl, values);
sqlCommand += `, baseurl = ${p11}`;
}
if (opt_updateUserIndex) {
sqlCommand += ', user_index = LAST_INSERT_ID(user_index + 1)';
}
sqlCommand += ';';
sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
resolve({ affectedRows: result.affectedRows, insertId: result.insertId });
}
}, true, false, values);
});
}
module.exports = {
sqlQuery,
closePool,
addSqlParameter,
concatParams,
getTableColumns,
upsert
}

View File

@ -41,8 +41,8 @@ var config = require('config');
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult');
let addSqlParam = sqlBase.baseConnector.addSqlParameter;
let concatParams = sqlBase.baseConnector.concatParams;
let addSqlParam = sqlBase.addSqlParameter;
let concatParams = sqlBase.concatParams;
var RANDOM_KEY_MAX = 10000;
@ -96,7 +96,7 @@ TaskResultData.prototype.completeDefaults = function() {
};
function upsert(ctx, task, opt_updateUserIndex) {
return sqlBase.baseConnector.upsert(ctx, task, opt_updateUserIndex);
return sqlBase.upsert(ctx, task, opt_updateUserIndex);
}
function select(ctx, docId) {
@ -105,7 +105,7 @@ function select(ctx, docId) {
let p1 = addSqlParam(ctx.tenant, values);
let p2 = addSqlParam(docId, values);
let sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE tenant=${p1} AND id=${p2};`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
sqlBase.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -174,7 +174,7 @@ function update(ctx, task, setPassword) {
let p1 = addSqlParam(task.tenant, values);
let p2 = addSqlParam(task.key, values);
let sqlCommand = `UPDATE ${cfgTableResult} SET ${sqlSet} WHERE tenant=${p1} AND id=${p2};`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
sqlBase.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -194,7 +194,7 @@ function updateIf(ctx, task, mask) {
let sqlSet = commandArg.join(', ');
let sqlWhere = commandArgMask.join(' AND ');
let sqlCommand = `UPDATE ${cfgTableResult} SET ${sqlSet} WHERE ${sqlWhere};`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
sqlBase.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -245,7 +245,7 @@ function addRandomKey(ctx, task, opt_prefix, opt_size) {
let p8 = addSqlParam(task.baseurl, values);
let sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl)` +
` VALUES (${p0}, ${p1}, ${p2}, ${p3}, ${p4}, ${p5}, ${p6}, ${p7}, ${p8});`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
sqlBase.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -286,7 +286,7 @@ function remove(ctx, docId) {
let p1 = addSqlParam(ctx.tenant, values);
let p2 = addSqlParam(docId, values);
const sqlCommand = `DELETE FROM ${cfgTableResult} WHERE tenant=${p1} AND id=${p2};`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
sqlBase.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
@ -303,7 +303,7 @@ function removeIf(ctx, mask) {
commandArgMask.push('id=' + addSqlParam(mask.key, values));
let sqlWhere = commandArgMask.join(' AND ');
const sqlCommand = `DELETE FROM ${cfgTableResult} WHERE ${sqlWhere};`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
sqlBase.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {

View File

@ -33,10 +33,10 @@ CREATE TABLE onlyoffice.task_result (
last_open_date TIMESTAMP NOT NULL,
user_index NUMBER DEFAULT 1 NOT NULL,
change_id NUMBER DEFAULT 0 NOT NULL,
callback NVARCHAR2(2000), -- codebase uses '' as default values here, but Oracle treat '' as NULL, so NULL permitted for this value.
baseurl NVARCHAR2(2000), -- codebase uses '' as default values here, but Oracle treat '' as NULL, so NULL permitted for this value.
password NVARCHAR2(2000) NULL,
additional NVARCHAR2(2000) NULL,
callback NCLOB, -- codebase uses '' as default values here, but Oracle treat '' as NULL, so NULL permitted for this value.
baseurl NCLOB, -- codebase uses '' as default values here, but Oracle treat '' as NULL, so NULL permitted for this value.
password NCLOB NULL,
additional NCLOB NULL,
CONSTRAINT task_result_unique UNIQUE (tenant, id),
CONSTRAINT task_result_unsigned_int CHECK (user_index BETWEEN 0 AND 4294967295 AND change_id BETWEEN 0 AND 4294967295)
);

View File

@ -3,10 +3,9 @@ const config = require('../../Common/node_modules/config');
const baseConnector = require('../../DocService/sources/baseConnector');
const operationContext = require('../../Common/sources/operationContext');
const taskResult = require("../../DocService/sources/taskresult");
const commonDefines = require("../../Common/sources/commondefines");
const constants = require("../../Common/sources/constants");
const connectorUtilities = require("../../DocService/sources/connectorUtilities");
const taskResult = require('../../DocService/sources/taskresult');
const commonDefines = require('../../Common/sources/commondefines');
const constants = require('../../Common/sources/constants');
const configSql = config.get('services.CoAuthoring.sql');
const ctx = new operationContext.Context();
@ -107,7 +106,7 @@ async function getRowsCountById(table, id) {
return result[0].count;
}
async function nowRowsExistenceCheck(table, id) {
async function noRowsExistenceCheck(table, id) {
const noRows = await getRowsCountById(table, id);
expect(noRows).toEqual(0);
}
@ -124,7 +123,7 @@ function deleteRowsByIds(table, ids) {
function executeSql(sql, values = []) {
return new Promise((resolve, reject) => {
baseConnector.baseConnector.sqlQuery(ctx, sql, function (error, result) {
baseConnector.sqlQuery(ctx, sql, function (error, result) {
if (error) {
reject(error)
} else {
@ -138,14 +137,14 @@ async function insertChangesLocal(objChanges, docId, index, user) {
for (let currentIndex = 0; currentIndex < objChanges.length; ++currentIndex, ++index) {
const values = [];
const placeholder = [
baseConnector.baseConnector.addSqlParameter(ctx.tenant, values),
baseConnector.baseConnector.addSqlParameter(docId, values),
baseConnector.baseConnector.addSqlParameter(index, values),
baseConnector.baseConnector.addSqlParameter(user.id, values),
baseConnector.baseConnector.addSqlParameter(user.idOriginal, values),
baseConnector.baseConnector.addSqlParameter(user.username, values),
baseConnector.baseConnector.addSqlParameter(objChanges[currentIndex].change, values),
baseConnector.baseConnector.addSqlParameter(objChanges[currentIndex].time, values),
baseConnector.addSqlParameter(ctx.tenant, values),
baseConnector.addSqlParameter(docId, values),
baseConnector.addSqlParameter(index, values),
baseConnector.addSqlParameter(user.id, values),
baseConnector.addSqlParameter(user.idOriginal, values),
baseConnector.addSqlParameter(user.username, values),
baseConnector.addSqlParameter(objChanges[currentIndex].change, values),
baseConnector.addSqlParameter(objChanges[currentIndex].time, values),
];
const sqlInsert = `INSERT INTO ${cfgTableChanges} VALUES(${placeholder.join(', ')});`;
@ -169,7 +168,7 @@ function createTask(id, callback = '', baseurl = '') {
function insertResult(dateNow, task) {
let cbInsert = task.callback;
if (task.callback) {
const userCallback = new connectorUtilities.UserCallback();
const userCallback = new baseConnector.UserCallback();
userCallback.fromValues(task.userIndex, task.callback);
cbInsert = userCallback.toSQLInsert();
}
@ -177,15 +176,15 @@ function insertResult(dateNow, task) {
const columns = ['tenant', 'id', 'status', 'status_info', 'last_open_date', 'user_index', 'change_id', 'callback', 'baseurl'];
const values = [];
const placeholder = [
baseConnector.baseConnector.addSqlParameter(task.tenant, values),
baseConnector.baseConnector.addSqlParameter(task.key, values),
baseConnector.baseConnector.addSqlParameter(task.status, values),
baseConnector.baseConnector.addSqlParameter(task.statusInfo, values),
baseConnector.baseConnector.addSqlParameter(dateNow, values),
baseConnector.baseConnector.addSqlParameter(task.userIndex, values),
baseConnector.baseConnector.addSqlParameter(task.changeId, values),
baseConnector.baseConnector.addSqlParameter(cbInsert, values),
baseConnector.baseConnector.addSqlParameter(task.baseurl, values)
baseConnector.addSqlParameter(task.tenant, values),
baseConnector.addSqlParameter(task.key, values),
baseConnector.addSqlParameter(task.status, values),
baseConnector.addSqlParameter(task.statusInfo, values),
baseConnector.addSqlParameter(dateNow, values),
baseConnector.addSqlParameter(task.userIndex, values),
baseConnector.addSqlParameter(task.changeId, values),
baseConnector.addSqlParameter(cbInsert, values),
baseConnector.addSqlParameter(task.baseurl, values)
];
return executeSql(`INSERT INTO ${cfgTableResult}(${columns.join(', ')}) VALUES(${placeholder.join(', ')});`, values);
@ -205,7 +204,7 @@ afterAll(async function () {
];
await Promise.allSettled(deletionPool);
baseConnector.baseConnector.closePool?.();
baseConnector.closePool?.();
});
// Assumed that at least default DB was installed and configured.
@ -272,7 +271,9 @@ describe('Base database connector', function () {
for (const table in tables) {
test(`${table} table existence`, async function () {
const result = await baseConnector.getTableColumns(ctx, table);
expect(result).toEqual(tables[table]);
for (const row of tables[table]) {
expect(result).toContainEqual(row);
}
});
}
});
@ -294,7 +295,7 @@ describe('Base database connector', function () {
const docId = insertCases[testCase];
const objChanges = createChanges(+testCase, date);
await nowRowsExistenceCheck(cfgTableChanges, docId);
await noRowsExistenceCheck(cfgTableChanges, docId);
await baseConnector.insertChangesPromise(ctx, objChanges, docId, index, user);
const result = await getRowsCountById(cfgTableChanges, docId);
@ -311,7 +312,7 @@ describe('Base database connector', function () {
test('Get changes in range', async function () {
const docId = changesCases.range;
await nowRowsExistenceCheck(cfgTableChanges, docId);
await noRowsExistenceCheck(cfgTableChanges, docId);
await insertChangesLocal(objChanges, docId, index, user);
@ -323,7 +324,7 @@ describe('Base database connector', function () {
test('Get changes index', async function () {
const docId = changesCases.index;
await nowRowsExistenceCheck(cfgTableChanges, docId);
await noRowsExistenceCheck(cfgTableChanges, docId);
await insertChangesLocal(objChanges, docId, index, user);
@ -353,6 +354,8 @@ describe('Base database connector', function () {
const idCount = 5;
const notNullCallbacks = idCount - 2;
const resultBefore = await baseConnector.getEmptyCallbacks(ctx);
// Adding non-empty callbacks.
for (let i = 0; i < notNullCallbacks; i++) {
const task = createTask(emptyCallbacksCase[i], 'some_callback');
@ -371,13 +374,9 @@ describe('Base database connector', function () {
await insertChangesLocal(objChanges, emptyCallbacksCase[i], index, user);
}
const result = await baseConnector.getEmptyCallbacks(ctx);
const resultAfter = await baseConnector.getEmptyCallbacks(ctx);
// Needs to add ids that already exist at this point. It is cfgTableChanges rest rows of LEFT JOIN result.
const restRows = await getIdsCount(cfgTableChanges, [...Object.values(insertCases), ...Object.values(changesCases)]);
// Rest rows + rows with empty callbacks in right table.
expect(result.length).toEqual(restRows[0].count + idCount - notNullCallbacks);
expect(resultAfter.length).toEqual(resultBefore.length + idCount - notNullCallbacks);
});
test('Get documents with changes', async function () {
@ -419,11 +418,11 @@ describe('Base database connector', function () {
test('New row inserted', async function () {
const task = createTask(upsertCases.insert);
await nowRowsExistenceCheck(cfgTableResult, task.key);
await noRowsExistenceCheck(cfgTableResult, task.key);
const result = await baseConnector.baseConnector.upsert(ctx, task);
const result = await baseConnector.upsert(ctx, task);
// affectedRows should be 1 because of insert operation, insertId should be 2 due to it defaults.
// affectedRows should be 1 because of insert operation, insertId should be 1 by default.
const expected = { affectedRows: 1, insertId: 1 };
expect(result).toEqual(expected);
@ -435,21 +434,30 @@ describe('Base database connector', function () {
test('Row updated', async function () {
const task = createTask(upsertCases.update, '', 'some-url');
const dateNow = new Date();
await insertResult(dateNow, task);
await noRowsExistenceCheck(cfgTableResult, task.key);
await baseConnector.upsert(ctx, task);
// Changing baseurl to verify upsert() changing the row.
task.baseurl = 'some-updated-url';
const result = await baseConnector.baseConnector.upsert(ctx, task);
const resultUrl = await baseConnector.upsert(ctx, task);
// affectedRows should be 2 because of update operation, insertId should be 1 by default.
const expected = { affectedRows: 2, insertId: 1 };
expect(result).toEqual(expected);
const expectedUrl = { affectedRows: 2, insertId: 1 };
expect(resultUrl).toEqual(expectedUrl);
const updatedRow = await executeSql(`SELECT id, baseurl FROM ${cfgTableResult} WHERE id = '${task.key}';`);
const expectedUpdate = [{ id: task.key, baseurl: 'some-updated-url' }];
expect(updatedRow).toEqual(expectedUpdate);
const expectedUrlChanges = [{ id: task.key, baseurl: 'some-updated-url' }];
expect(updatedRow).toEqual(expectedUrlChanges);
// Changing baseurl to verify upsert() changing the user_index.
task.baseurl = 'some-updated-url-with-last-index-updated';
const resultLastIndex = await baseConnector.upsert(ctx, task, true);
// affectedRows should be 2 because of update operation, insertId should be 2 due to 'opt_updateUserIndex' parameter in upsert() function.
const expectedLastIndex = { affectedRows: 2, insertId: 2 }
expect(resultLastIndex).toEqual(expectedLastIndex);
});
});
});