mirror of
https://github.com/ONLYOFFICE/server.git
synced 2026-02-10 18:05:07 +08:00
[mssql] Fixes in sql queries for Expire() fuctions
This commit is contained in:
@ -42,12 +42,13 @@ var sqlDataBaseType = {
|
||||
};
|
||||
|
||||
const connectorUtilities = require('./connectorUtilities');
|
||||
const utils = require('./../../Common/sources/utils');
|
||||
var bottleneck = require("bottleneck");
|
||||
var config = require('config');
|
||||
var configSql = config.get('services.CoAuthoring.sql');
|
||||
const dbType = configSql.get('type');
|
||||
|
||||
var baseConnector;
|
||||
let baseConnector;
|
||||
switch (dbType) {
|
||||
case sqlDataBaseType.mySql:
|
||||
case sqlDataBaseType.mariaDB:
|
||||
@ -257,6 +258,36 @@ exports.getChangesPromise = function (ctx, docId, optStartIndex, optEndIndex, op
|
||||
});
|
||||
});
|
||||
};
|
||||
exports.getDocumentsWithChanges = baseConnector.getDocumentsWithChanges ?? function (ctx) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
const sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE EXISTS(SELECT id FROM ${cfgTableChanges} WHERE tenant=${cfgTableResult}.tenant AND id = ${cfgTableResult}.id LIMIT 1);`;
|
||||
baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
}, false, false);
|
||||
});
|
||||
}
|
||||
exports.getExpired = baseConnector.getExpired ?? function(ctx, maxCount, expireSeconds) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
const values = [];
|
||||
const expireDate = new Date();
|
||||
utils.addSeconds(expireDate, -expireSeconds);
|
||||
const sqlParam1 = addSqlParam(expireDate, values);
|
||||
const sqlParam2 = addSqlParam(maxCount, values);
|
||||
const sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE last_open_date <= ${sqlParam1}` +
|
||||
` AND NOT EXISTS(SELECT tenant, id FROM ${cfgTableChanges} WHERE ${cfgTableChanges}.tenant = ${cfgTableResult}.tenant AND ${cfgTableChanges}.id = ${cfgTableResult}.id LIMIT 1) LIMIT ${sqlParam2};`;
|
||||
baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
}, false, false, values);
|
||||
});
|
||||
}
|
||||
|
||||
exports.isLockCriticalSection = function (id) {
|
||||
return !!(g_oCriticalSection[id]);
|
||||
|
||||
@ -48,7 +48,6 @@ const editorDataStorage = require('./' + config.get('services.CoAuthoring.server
|
||||
|
||||
const cfgForgottenFiles = config.get('services.CoAuthoring.server.forgottenfiles');
|
||||
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult');
|
||||
const cfgTableChanges = config.get('services.CoAuthoring.sql.tableChanges');
|
||||
|
||||
var cfgRedisPrefix = configCoAuthoring.get('redis.prefix');
|
||||
var redisKeyShutdown = cfgRedisPrefix + constants.REDIS_KEY_SHUTDOWN;
|
||||
@ -58,18 +57,7 @@ var LOOP_TIMEOUT = 1000;
|
||||
var EXEC_TIMEOUT = WAIT_TIMEOUT + utils.getConvertionTimeout(undefined);
|
||||
|
||||
let addSqlParam = sqlBase.baseConnector.addSqlParameter;
|
||||
function getDocumentsWithChanges(ctx) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
let sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE EXISTS(SELECT id FROM ${cfgTableChanges} WHERE tenant=${cfgTableResult}.tenant AND id = ${cfgTableResult}.id LIMIT 1);`;
|
||||
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
}, undefined, undefined);
|
||||
});
|
||||
}
|
||||
|
||||
function updateDoc(ctx, docId, status, callback) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
let values = [];
|
||||
@ -109,7 +97,7 @@ function shutdown() {
|
||||
ctx.logger.debug('shutdown start wait pubsub deliver');
|
||||
yield utils.sleep(LOOP_TIMEOUT);
|
||||
|
||||
let documentsWithChanges = yield getDocumentsWithChanges(ctx);
|
||||
let documentsWithChanges = yield sqlBase.baseConnector.getDocumentsWithChanges(ctx);
|
||||
ctx.logger.debug('shutdown docs with changes count = %s', documentsWithChanges.length);
|
||||
let docsWithEmptyForgotten = [];
|
||||
let docsWithOutOfDateForgotten = [];
|
||||
|
||||
@ -35,14 +35,18 @@
|
||||
const sql = require("mssql");
|
||||
const config = require('config');
|
||||
const connectorUtilities = require('./connectorUtilities');
|
||||
const utils = require('./../../Common/sources/utils');
|
||||
|
||||
const configSql = config.get('services.CoAuthoring.sql');
|
||||
const cfgTableResult = configSql.get('tableResult');
|
||||
const cfgTableChanges = configSql.get('tableChanges');
|
||||
const cfgMaxPacketSize = configSql.get('max_allowed_packet');
|
||||
|
||||
const connectionConfiguration = {
|
||||
user: configSql.get('dbUser'),
|
||||
password: configSql.get('dbPass'),
|
||||
server: configSql.get('dbHost'),
|
||||
port: configSql.get('dbPort'),
|
||||
database: configSql.get('dbName'),
|
||||
pool: {
|
||||
max: configSql.get('connectionlimit'),
|
||||
@ -102,8 +106,16 @@ function convertPlaceholdersValues(values) {
|
||||
}
|
||||
|
||||
function registerPlaceholderValues(values, statement) {
|
||||
for (const key of Object.keys(values)) {
|
||||
statement.input(key, dataType(values[key]));
|
||||
if (values._typesMetadata !== undefined) {
|
||||
for (const placeholderName of Object.keys(values._typesMetadata)) {
|
||||
statement.input(placeholderName, values._typesMetadata[placeholderName]);
|
||||
}
|
||||
|
||||
delete values._typesMetadata;
|
||||
} else {
|
||||
for (const key of Object.keys(values)) {
|
||||
statement.input(key, dataType(values[key]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -150,19 +162,28 @@ async function executeSql(ctx, sqlCommand, values = {}, noModifyRes = false, noL
|
||||
}
|
||||
|
||||
async function executeBulk(ctx, table) {
|
||||
const pool = new sql.ConnectionPool(configuration);
|
||||
await pool.connect();
|
||||
const request = pool.request();
|
||||
await request.bulk(table);
|
||||
// await sql.connect(configuration);
|
||||
// await new sql.Request().bulk(table);
|
||||
try {
|
||||
await sql.connect(configuration);
|
||||
const result = await new sql.Request().bulk(table);
|
||||
|
||||
return { affectedRows: 0};
|
||||
return { affectedRows: result?.rowsAffected ?? 0 };
|
||||
} catch (error) {
|
||||
errorHandle(`sqlQuery() error while executing bulk for table ${table.name}`, error, ctx);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
function addSqlParameterObjectBased(parameter, name, accumulatedObject) {
|
||||
accumulatedObject[`${placeholderPrefix}${name}`] = parameter;
|
||||
return `@${placeholderPrefix}${name}`;
|
||||
function addSqlParameterObjectBased(parameter, name, type, accumulatedObject) {
|
||||
if (accumulatedObject._typesMetadata === undefined) {
|
||||
accumulatedObject._typesMetadata = {};
|
||||
}
|
||||
|
||||
const placeholder = `${placeholderPrefix}${name}`;
|
||||
accumulatedObject[placeholder] = parameter;
|
||||
accumulatedObject._typesMetadata[placeholder] = type;
|
||||
|
||||
return `@${placeholder}`;
|
||||
}
|
||||
|
||||
function addSqlParameter(parameter, accumulatedArray) {
|
||||
@ -179,6 +200,26 @@ function getTableColumns(ctx, tableName) {
|
||||
return executeSql(ctx, sqlCommand);
|
||||
}
|
||||
|
||||
function getDocumentsWithChanges(ctx) {
|
||||
const existingId = `SELECT TOP(1) id FROM ${cfgTableChanges} WHERE tenant=${cfgTableResult}.tenant AND id = ${cfgTableResult}.id`;
|
||||
const sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE EXISTS(${existingId});`;
|
||||
|
||||
return executeSql(ctx, sqlCommand);
|
||||
}
|
||||
|
||||
function getExpired(ctx, maxCount, expireSeconds) {
|
||||
const expireDate = new Date();
|
||||
utils.addSeconds(expireDate, -expireSeconds);
|
||||
|
||||
const values = {};
|
||||
const date = addSqlParameterObjectBased(expireDate, 'expireDate', sql.TYPES.DateTime(), values);
|
||||
const count = addSqlParameterObjectBased(maxCount, 'maxCount', sql.TYPES.Int(), values);
|
||||
const notExistingTenantAndId = `SELECT TOP(1) tenant, id FROM ${cfgTableChanges} WHERE ${cfgTableChanges}.tenant = ${cfgTableResult}.tenant AND ${cfgTableChanges}.id = ${cfgTableResult}.id`
|
||||
const sqlCommand = `SELECT TOP(${count}) * FROM ${cfgTableResult} WHERE last_open_date <= ${date} AND NOT EXISTS(${notExistingTenantAndId});`;
|
||||
|
||||
return executeSql(ctx, sqlCommand, values);
|
||||
}
|
||||
|
||||
async function upsert(ctx, task, opt_updateUserIndex) {
|
||||
task.completeDefaults();
|
||||
|
||||
@ -193,15 +234,15 @@ async function upsert(ctx, task, opt_updateUserIndex) {
|
||||
|
||||
const values = {};
|
||||
const insertValuesPlaceholder = [
|
||||
addSqlParameterObjectBased(task.tenant, 'tenant', values),
|
||||
addSqlParameterObjectBased(task.key, 'key', values),
|
||||
addSqlParameterObjectBased(task.status, 'status', values),
|
||||
addSqlParameterObjectBased(task.statusInfo, 'statusInfo', values),
|
||||
addSqlParameterObjectBased(dateNow, 'dateNow', values),
|
||||
addSqlParameterObjectBased(task.userIndex, 'userIndex', values),
|
||||
addSqlParameterObjectBased(task.changeId, 'changeId', values),
|
||||
addSqlParameterObjectBased(cbInsert, 'cbInsert', values),
|
||||
addSqlParameterObjectBased(task.baseurl, 'baseurl', values),
|
||||
addSqlParameterObjectBased(task.tenant, 'tenant', sql.TYPES.NVarChar(510), values),
|
||||
addSqlParameterObjectBased(task.key, 'key', sql.TYPES.NVarChar(510), values),
|
||||
addSqlParameterObjectBased(task.status, 'status', sql.TYPES.SmallInt(), values),
|
||||
addSqlParameterObjectBased(task.statusInfo, 'statusInfo', sql.TYPES.Int(), values),
|
||||
addSqlParameterObjectBased(dateNow, 'dateNow', sql.TYPES.DateTime(), values),
|
||||
addSqlParameterObjectBased(task.userIndex, 'userIndex', sql.TYPES.Decimal(18, 0), values),
|
||||
addSqlParameterObjectBased(task.changeId, 'changeId', sql.TYPES.Decimal(18, 0), values),
|
||||
addSqlParameterObjectBased(cbInsert, 'cbInsert', sql.TYPES.NVarChar(sql.MAX), values),
|
||||
addSqlParameterObjectBased(task.baseurl, 'baseurl', sql.TYPES.NVarChar(sql.MAX), values),
|
||||
];
|
||||
|
||||
const tenant = insertValuesPlaceholder[0];
|
||||
@ -217,7 +258,7 @@ async function upsert(ctx, task, opt_updateUserIndex) {
|
||||
let updateColumns = `target.last_open_date = ${lastOpenDate}`;
|
||||
|
||||
if (task.callback) {
|
||||
const parameter = addSqlParameterObjectBased(JSON.stringify(task.callback), 'callback', values);
|
||||
const parameter = addSqlParameterObjectBased(JSON.stringify(task.callback), 'callback', sql.TYPES.NVarChar(sql.MAX), values);
|
||||
const concatenatedColumns = concatParams(
|
||||
'target.callback', `'${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":'`, '(target.user_index + 1)', `',"callback":'`, parameter, `'}'`
|
||||
);
|
||||
@ -259,68 +300,34 @@ async function insertChangesAsync(ctx, tableChanges, startIndex, objChanges, doc
|
||||
return { affectedRows: 0 };
|
||||
}
|
||||
|
||||
// const table = new sql.Table(tableChanges);
|
||||
// table.columns.add('tenant', sql.NVarChar, { nullable: false, length: 'max' });
|
||||
// table.columns.add('id', sql.NVarChar, { nullable: false, length: 'max' });
|
||||
// table.columns.add('change_id', sql.Int, { nullable: false });
|
||||
// table.columns.add('user_id', sql.NVarChar, { nullable: false , length: 'max' });
|
||||
// table.columns.add('user_id_original', sql.NVarChar, { nullable: false, length: 'max' });
|
||||
// table.columns.add('user_name', sql.NVarChar, { nullable: false, length: 'max' });
|
||||
// table.columns.add('change_data', sql.NVarChar, { nullable: false, length: 'max' });
|
||||
// table.columns.add('change_date', sql.DateTime2, { nullable: false });
|
||||
//
|
||||
// const msSqlParametersCapacity = 1000000000;
|
||||
// const parametersInQuery = 8;
|
||||
// const rowsLimit = Math.trunc(msSqlParametersCapacity / parametersInQuery);
|
||||
//
|
||||
// let rowCounts = 1;
|
||||
// let currentIndex = startIndex;
|
||||
// for (; currentIndex < objChanges.length && rowCounts <= rowsLimit; ++currentIndex, ++index) {
|
||||
// table.rows.add(ctx.tenant, docId, index, user.id, user.idOriginal, user.username, objChanges[currentIndex].change, objChanges[currentIndex].time);
|
||||
// }
|
||||
// return await executeBulk(ctx, table);
|
||||
const table = new sql.Table(tableChanges);
|
||||
table.columns.add('tenant', sql.TYPES.NVarChar(sql.MAX), { nullable: false, length: 'max' });
|
||||
table.columns.add('id', sql.TYPES.NVarChar(sql.MAX), { nullable: false, length: 'max' });
|
||||
table.columns.add('change_id', sql.TYPES.Int, { nullable: false });
|
||||
table.columns.add('user_id', sql.TYPES.NVarChar(sql.MAX), { nullable: false , length: 'max' });
|
||||
table.columns.add('user_id_original', sql.TYPES.NVarChar(sql.MAX), { nullable: false, length: 'max' });
|
||||
table.columns.add('user_name', sql.TYPES.NVarChar(sql.MAX), { nullable: false, length: 'max' });
|
||||
table.columns.add('change_data', sql.TYPES.NVarChar(sql.MAX), { nullable: false, length: 'max' });
|
||||
table.columns.add('change_date', sql.TYPES.DateTime, { nullable: false });
|
||||
|
||||
|
||||
let sqlInsert = `INSERT INTO ${tableChanges} VALUES`;
|
||||
|
||||
const values = {};
|
||||
// MS SQL Server can handle only 1000 parameters in one query.
|
||||
const msSqlParametersCapacity = 1000;
|
||||
const parametersInQuery = 8;
|
||||
const rowsLimit = Math.trunc(msSqlParametersCapacity / parametersInQuery);
|
||||
|
||||
let rowCounts = 1;
|
||||
const indexBytes = 4;
|
||||
const timeBytes = 8;
|
||||
let bytes = 0;
|
||||
let currentIndex = startIndex;
|
||||
for (; currentIndex < objChanges.length && rowCounts <= rowsLimit; ++currentIndex, ++index) {
|
||||
if (rowCounts !== 1) {
|
||||
sqlInsert += ',';
|
||||
}
|
||||
for (; currentIndex < objChanges.length && bytes <= cfgMaxPacketSize; ++currentIndex, ++index) {
|
||||
bytes += indexBytes + timeBytes
|
||||
+ 4 * (ctx.tenant.length + docId.length + user.id.length + user.idOriginal.length + user.username.length + objChanges[currentIndex].change.length);
|
||||
|
||||
rowCounts++;
|
||||
|
||||
const valuesPlaceholder = [
|
||||
addSqlParameterObjectBased(ctx.tenant, `tenant${currentIndex}`, values),
|
||||
addSqlParameterObjectBased(docId, `docId${currentIndex}`, values),
|
||||
addSqlParameterObjectBased(index, `index${currentIndex}`, values),
|
||||
addSqlParameterObjectBased(user.id, `id${currentIndex}`, values),
|
||||
addSqlParameterObjectBased(user.idOriginal, `idOriginal${currentIndex}`, values),
|
||||
addSqlParameterObjectBased(user.username, `username${currentIndex}`, values),
|
||||
addSqlParameterObjectBased(objChanges[currentIndex].change, `change${currentIndex}`, values),
|
||||
addSqlParameterObjectBased(objChanges[currentIndex].time, `time${currentIndex}`, values)
|
||||
];
|
||||
|
||||
sqlInsert += `(${valuesPlaceholder.join(',')})`;
|
||||
table.rows.add(ctx.tenant, docId, index, user.id, user.idOriginal, user.username, objChanges[currentIndex].change, objChanges[currentIndex].time);
|
||||
}
|
||||
|
||||
sqlInsert += ';';
|
||||
|
||||
const result = await executeSql(ctx,sqlInsert, values);
|
||||
const result = await executeBulk(ctx, table);
|
||||
if (currentIndex < objChanges.length) {
|
||||
const recursiveValue = await insertChangesAsync(ctx, tableChanges, currentIndex, objChanges, docId, index, user);
|
||||
result.affectedRows += recursiveValue.affectedRows;
|
||||
}
|
||||
|
||||
return result;
|
||||
return result
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
@ -328,6 +335,8 @@ module.exports = {
|
||||
addSqlParameter,
|
||||
concatParams,
|
||||
getTableColumns,
|
||||
getDocumentsWithChanges,
|
||||
getExpired,
|
||||
upsert,
|
||||
insertChanges
|
||||
};
|
||||
|
||||
@ -34,14 +34,12 @@
|
||||
|
||||
const crypto = require('crypto');
|
||||
var sqlBase = require('./baseConnector');
|
||||
var utils = require('./../../Common/sources/utils');
|
||||
var constants = require('./../../Common/sources/constants');
|
||||
var commonDefines = require('./../../Common/sources/commondefines');
|
||||
var tenantManager = require('./../../Common/sources/tenantManager');
|
||||
var config = require('config');
|
||||
|
||||
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult');
|
||||
const cfgTableChanges = config.get('services.CoAuthoring.sql.tableChanges');
|
||||
|
||||
let addSqlParam = sqlBase.baseConnector.addSqlParameter;
|
||||
let concatParams = sqlBase.baseConnector.concatParams;
|
||||
@ -314,24 +312,6 @@ function removeIf(ctx, mask) {
|
||||
}, undefined, undefined, values);
|
||||
});
|
||||
}
|
||||
function getExpired(ctx, maxCount, expireSeconds) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
let values = [];
|
||||
let expireDate = new Date();
|
||||
utils.addSeconds(expireDate, -expireSeconds);
|
||||
let sqlParam1 = addSqlParam(expireDate, values);
|
||||
let sqlParam2 = addSqlParam(maxCount, values);
|
||||
let sqlCommand = `SELECT * FROM ${cfgTableResult} WHERE last_open_date <= ${sqlParam1}` +
|
||||
` AND NOT EXISTS(SELECT tenant, id FROM ${cfgTableChanges} WHERE ${cfgTableChanges}.tenant = ${cfgTableResult}.tenant AND ${cfgTableChanges}.id = ${cfgTableResult}.id LIMIT 1) LIMIT ${sqlParam2};`;
|
||||
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
}, undefined, undefined, values);
|
||||
});
|
||||
}
|
||||
|
||||
exports.TaskResultData = TaskResultData;
|
||||
exports.upsert = upsert;
|
||||
@ -342,4 +322,4 @@ exports.restoreInitialPassword = restoreInitialPassword;
|
||||
exports.addRandomKeyTask = addRandomKeyTask;
|
||||
exports.remove = remove;
|
||||
exports.removeIf = removeIf;
|
||||
exports.getExpired = getExpired;
|
||||
exports.getExpired = sqlBase.baseConnector.getExpired;
|
||||
|
||||
Reference in New Issue
Block a user