diff --git a/DocService/sources/baseConnector.js b/DocService/sources/baseConnector.js index 964fba31..cc6f3db1 100644 --- a/DocService/sources/baseConnector.js +++ b/DocService/sources/baseConnector.js @@ -82,10 +82,7 @@ exports.loadTable = function (tableId, callbackFunction) { baseConnector.sqlQuery(sqlCommand, callbackFunction); }; exports.insertCallback = function(id, href, baseUrl, callbackFunction) { - var sqlCommand = "INSERT " + baseConnector.ignoreStr + " INTO " + tableCallbacks + " VALUES (" + baseConnector.sqlEscape(id) + "," + - baseConnector.sqlEscape(href) + "," + baseConnector.sqlEscape(baseUrl) + ") " + baseConnector.doNothingStr + ";"; - - baseConnector.sqlQuery(sqlCommand, callbackFunction); + baseConnector.insertCallback(id, href, baseUrl, callbackFunction); }; exports.insertCallbackPromise = function(id, href, baseUrl) { return new Promise(function(resolve, reject) { diff --git a/DocService/sources/mySqlBaseConnector.js b/DocService/sources/mySqlBaseConnector.js index a872b38e..ead210ac 100644 --- a/DocService/sources/mySqlBaseConnector.js +++ b/DocService/sources/mySqlBaseConnector.js @@ -1 +1 @@ -/* * (c) Copyright Ascensio System SIA 2010-2016 * * This program is a free software product. You can redistribute it and/or * modify it under the terms of the GNU Affero General Public License (AGPL) * version 3 as published by the Free Software Foundation. In accordance with * Section 7(a) of the GNU AGPL its Section 15 shall be amended to the effect * that Ascensio System SIA expressly excludes the warranty of non-infringement * of any third-party rights. * * This program is distributed WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For * details, see the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html * * You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, * EU, LV-1021. * * The interactive user interfaces in modified source and object code versions * of the Program must display Appropriate Legal Notices, as required under * Section 5 of the GNU AGPL version 3. * * Pursuant to Section 7(b) of the License you must retain the original Product * logo when distributing the program. Pursuant to Section 7(e) we decline to * grant you any rights under trademark law for use of our trademarks. * * All the Product's GUI elements, including illustrations and icon sets, as * well as technical writing content are licensed under the terms of the * Creative Commons Attribution-ShareAlike 4.0 International. See the License * terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode * */ var mysql = require('mysql'); var sqlBase = require('./baseConnector'); var configSql = require('config').get('services.CoAuthoring.sql'); var pool = mysql.createPool({ host : configSql.get('dbHost'), port : configSql.get('dbPort'), user : configSql.get('dbUser'), password : configSql.get('dbPass'), database : configSql.get('dbName'), charset : configSql.get('charset'), connectionLimit : configSql.get('connectionlimit'), timezone : '+0000', flags : '-FOUND_ROWS' }); var logger = require('./../../Common/sources/logger'); exports.sqlQuery = function (sqlCommand, callbackFunction) { pool.getConnection(function(err, connection) { if (err) { logger.error('pool.getConnection error: %s', err); if (callbackFunction) callbackFunction(err, null); return; } connection.query(sqlCommand, function (error, result) { connection.release(); if (error) { logger.error('________________________error_____________________'); logger.error('sqlQuery: %s sqlCommand: %s', error.code, sqlCommand); logger.error(error); logger.error('_____________________end_error_____________________'); } if (callbackFunction) callbackFunction(error, result); }); }); }; exports.sqlEscape = function (value) { return pool.escape(value); }; exports.ignoreStr = 'IGNORE'; exports.doNothingStr = ''; function getUpsertString(task, opt_updateUserIndex) { task.completeDefaults(); var dateNow = sqlBase.getDateTime(new Date()); var commandArg = [task.key, task.status, task.statusInfo, dateNow, task.title, task.userIndex, task.changeId]; var commandArgEsc = commandArg.map(function(curVal) { return exports.sqlEscape(curVal) }); var sql = 'INSERT INTO task_result ( id, status, status_info, last_open_date, title,' + ' user_index, change_id ) VALUES (' + commandArgEsc.join(', ') + ') ON DUPLICATE KEY UPDATE' + ' last_open_date = ' + exports.sqlEscape(dateNow); if (opt_updateUserIndex) { sql += ', user_index = LAST_INSERT_ID(user_index + 1);'; } else { sql += ';'; } return sql; } exports.upsert = function(task, opt_updateUserIndex) { return new Promise(function(resolve, reject) { var sqlCommand = getUpsertString(task, opt_updateUserIndex); exports.sqlQuery(sqlCommand, function(error, result) { if (error) { reject(error); } else { resolve(result); } }); }); }; \ No newline at end of file +/* * (c) Copyright Ascensio System SIA 2010-2016 * * This program is a free software product. You can redistribute it and/or * modify it under the terms of the GNU Affero General Public License (AGPL) * version 3 as published by the Free Software Foundation. In accordance with * Section 7(a) of the GNU AGPL its Section 15 shall be amended to the effect * that Ascensio System SIA expressly excludes the warranty of non-infringement * of any third-party rights. * * This program is distributed WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For * details, see the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html * * You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, * EU, LV-1021. * * The interactive user interfaces in modified source and object code versions * of the Program must display Appropriate Legal Notices, as required under * Section 5 of the GNU AGPL version 3. * * Pursuant to Section 7(b) of the License you must retain the original Product * logo when distributing the program. Pursuant to Section 7(e) we decline to * grant you any rights under trademark law for use of our trademarks. * * All the Product's GUI elements, including illustrations and icon sets, as * well as technical writing content are licensed under the terms of the * Creative Commons Attribution-ShareAlike 4.0 International. See the License * terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode * */ var mysql = require('mysql'); var sqlBase = require('./baseConnector'); var configSql = require('config').get('services.CoAuthoring.sql'); var pool = mysql.createPool({ host : configSql.get('dbHost'), port : configSql.get('dbPort'), user : configSql.get('dbUser'), password : configSql.get('dbPass'), database : configSql.get('dbName'), charset : configSql.get('charset'), connectionLimit : configSql.get('connectionlimit'), timezone : '+0000', flags : '-FOUND_ROWS' }); var cfgTableCallbacks = configSql.get('tableCallbacks'); var logger = require('./../../Common/sources/logger'); exports.sqlQuery = function (sqlCommand, callbackFunction) { pool.getConnection(function(err, connection) { if (err) { logger.error('pool.getConnection error: %s', err); if (callbackFunction) callbackFunction(err, null); return; } connection.query(sqlCommand, function (error, result) { connection.release(); if (error) { logger.error('________________________error_____________________'); logger.error('sqlQuery: %s sqlCommand: %s', error.code, sqlCommand); logger.error(error); logger.error('_____________________end_error_____________________'); } if (callbackFunction) callbackFunction(error, result); }); }); }; exports.sqlEscape = function (value) { return pool.escape(value); }; exports.insertCallback = function(id, href, baseUrl, callbackFunction) { var sqlCommand = "INSERT IGNORE INTO " + cfgTableCallbacks + " VALUES (" + exports.sqlEscape(id) + "," + exports.sqlEscape(href) + "," + exports.sqlEscape(baseUrl) + ");"; exports.sqlQuery(sqlCommand, callbackFunction); }; function getUpsertString(task, opt_updateUserIndex) { task.completeDefaults(); var dateNow = sqlBase.getDateTime(new Date()); var commandArg = [task.key, task.status, task.statusInfo, dateNow, task.title, task.userIndex, task.changeId]; var commandArgEsc = commandArg.map(function(curVal) { return exports.sqlEscape(curVal) }); var sql = 'INSERT INTO task_result ( id, status, status_info, last_open_date, title,' + ' user_index, change_id ) VALUES (' + commandArgEsc.join(', ') + ') ON DUPLICATE KEY UPDATE' + ' last_open_date = ' + exports.sqlEscape(dateNow); if (opt_updateUserIndex) { sql += ', user_index = LAST_INSERT_ID(user_index + 1);'; } else { sql += ';'; } return sql; } exports.upsert = function(task, opt_updateUserIndex) { return new Promise(function(resolve, reject) { var sqlCommand = getUpsertString(task, opt_updateUserIndex); exports.sqlQuery(sqlCommand, function(error, result) { if (error) { reject(error); } else { resolve(result); } }); }); }; \ No newline at end of file diff --git a/DocService/sources/postgreSqlBaseConnector.js b/DocService/sources/postgreSqlBaseConnector.js index 2757afbf..0a62ac3b 100644 --- a/DocService/sources/postgreSqlBaseConnector.js +++ b/DocService/sources/postgreSqlBaseConnector.js @@ -47,6 +47,7 @@ var pool = new pg.Pool({ ssl: false, idleTimeoutMillis: 30000 }); +var cfgTableCallbacks = configSql.get('tableCallbacks'); //todo datetime timezone types.setTypeParser(1114, function(stringValue) { return new Date(stringValue + '+0000'); @@ -57,7 +58,7 @@ types.setTypeParser(1184, function(stringValue) { var logger = require('./../../Common/sources/logger'); -exports.sqlQuery = function(sqlCommand, callbackFunction, opt_noModifyRes) { +exports.sqlQuery = function(sqlCommand, callbackFunction, opt_noModifyRes, opt_noLog) { co(function *() { var client = null; var result = null; @@ -67,10 +68,12 @@ exports.sqlQuery = function(sqlCommand, callbackFunction, opt_noModifyRes) { result = yield client.query(sqlCommand); } catch (err) { error = err; - if (client) { - logger.error('sqlQuery error sqlCommand: %s:\r\n%s', sqlCommand.slice(0, 50), err.stack); - } else { - logger.error('pool.getConnection error: %s', err); + if (!opt_noLog) { + if (client) { + logger.error('sqlQuery error sqlCommand: %s:\r\n%s', sqlCommand.slice(0, 50), err.stack); + } else { + logger.error('pool.getConnection error: %s', err); + } } } finally { if (client) { @@ -94,35 +97,67 @@ exports.sqlEscape = function(value) { //todo parameterized queries return undefined !== value ? pgEscape.literal(value.toString()) : 'NULL'; }; -exports.ignoreStr = ''; -exports.doNothingStr = 'ON CONFLICT DO NOTHING'; +var isSupportOnConflict = false; +(function checkIsSupportOnConflict() { + var sqlCommand = 'INSERT INTO checkIsSupportOnConflict (id) VALUES(1) ON CONFLICT DO NOTHING;'; + exports.sqlQuery(sqlCommand, function(error, result) { + if (error) { + if ('42601' == error.code) { + //SYNTAX ERROR + isSupportOnConflict = false; + logger.debug('checkIsSupportOnConflict false'); + } else if ('42P01' == error.code) { + // UNDEFINED TABLE + isSupportOnConflict = true; + logger.debug('checkIsSupportOnConflict true'); + } else { + logger.error('checkIsSupportOnConflict unexpected error code:\r\n%s', error.stack); + } + } + }, true, true); +})(); -function getUpsertString(task, opt_updateUserIndex) { +exports.insertCallback = function(id, href, baseUrl, callbackFunction) { + var sqlCommand = "INSERT INTO " + cfgTableCallbacks + " VALUES (" + exports.sqlEscape(id) + "," + + exports.sqlEscape(href) + "," + exports.sqlEscape(baseUrl) + ")"; + if (isSupportOnConflict) { + sqlCommand += ' ON CONFLICT DO NOTHING;'; + exports.sqlQuery(sqlCommand, callbackFunction); + } else { + sqlCommand += ';'; + exports.sqlQuery(sqlCommand, function(error, result) { + if (error && error.code == '23505') { + //UNIQUE VIOLATION + callbackFunction(null, result); + } else { + callbackFunction(error, result); + } + }); + } +}; + +function getUpsertString(task) { task.completeDefaults(); var dateNow = sqlBase.getDateTime(new Date()); var commandArg = [task.key, task.status, task.statusInfo, dateNow, task.title, task.userIndex, task.changeId]; var commandArgEsc = commandArg.map(function(curVal) { return exports.sqlEscape(curVal) }); - //http://stackoverflow.com/questions/34762732/how-to-find-out-if-an-upsert-was-an-update-with-postgresql-9-5-upsert - var sql = "INSERT INTO task_result (id, status, status_info, last_open_date, title, user_index, change_id) SELECT " + - commandArgEsc.join(', ') + - " WHERE 'false' = set_config('myapp.isupdate', 'false', true) ON CONFLICT (id) DO UPDATE SET last_open_date = " + - sqlBase.baseConnector.sqlEscape(dateNow); - if (opt_updateUserIndex) { - sql += ', user_index = task_result.user_index + 1'; + if (isSupportOnConflict) { + //http://stackoverflow.com/questions/34762732/how-to-find-out-if-an-upsert-was-an-update-with-postgresql-9-5-upsert + return "INSERT INTO task_result (id, status, status_info, last_open_date, title, user_index, change_id) SELECT " + + commandArgEsc.join(', ') + + " WHERE 'false' = set_config('myapp.isupdate', 'false', true) ON CONFLICT (id) DO UPDATE SET last_open_date = " + + sqlBase.baseConnector.sqlEscape(dateNow) + + ", user_index = task_result.user_index + 1 WHERE 'true' = set_config('myapp.isupdate', 'true', true) RETURNING" + + " current_setting('myapp.isupdate') as isupdate, user_index as userindex;"; + } else { + return "SELECT * FROM merge_db(" + commandArgEsc.join(', ') + ");"; } - sql += - " WHERE 'true' = set_config('myapp.isupdate', 'true', true) RETURNING current_setting('myapp.isupdate') as update"; - if (opt_updateUserIndex) { - sql += ', user_index'; - } - sql += ';'; - return sql; } -exports.upsert = function(task, opt_updateUserIndex) { +exports.upsert = function(task) { return new Promise(function(resolve, reject) { - var sqlCommand = getUpsertString(task, opt_updateUserIndex); + var sqlCommand = getUpsertString(task); exports.sqlQuery(sqlCommand, function(error, result) { if (error) { reject(error); @@ -130,8 +165,8 @@ exports.upsert = function(task, opt_updateUserIndex) { if (result && result.rows.length > 0) { var first = result.rows[0]; result = {affectedRows: 0, insertId: 0}; - result.affectedRows = 'true' == first.update ? 2 : 1; - result.insertId = opt_updateUserIndex ? first.user_index : 0; + result.affectedRows = 'true' == first.isupdate ? 2 : 1; + result.insertId = first.userindex; } resolve(result); } diff --git a/schema/postgresql/createdb.sql b/schema/postgresql/createdb.sql index 99831f0d..2d05cab8 100644 --- a/schema/postgresql/createdb.sql +++ b/schema/postgresql/createdb.sql @@ -49,4 +49,33 @@ CREATE TABLE IF NOT EXISTS "public"."task_result" ( "change_id" int8 NOT NULL DEFAULT 0, PRIMARY KEY ("id") ) -WITH (OIDS=FALSE); \ No newline at end of file +WITH (OIDS=FALSE); + +--https://www.postgresql.org/docs/current/static/plpgsql-control-structures.html#PLPGSQL-UPSERT-EXAMPLE +CREATE OR REPLACE FUNCTION merge_db(_id varchar(255), _status int2, _status_info int8, _last_open_date timestamp without time zone, _title varchar(255), _user_index int8, _change_id int8, OUT isupdate char(5), OUT userindex int8) AS +$$ +DECLARE + t_var "public"."task_result"."user_index"%TYPE; +BEGIN + LOOP + -- first try to update the key + -- note that "a" must be unique + UPDATE "public"."task_result" SET last_open_date=_last_open_date, user_index=user_index+1 WHERE id = _id RETURNING user_index into userindex; + IF found THEN + isupdate := 'true'; + RETURN; + END IF; + -- not there, so try to insert the key + -- if someone else inserts the same key concurrently, + -- we could get a unique-key failure + BEGIN + INSERT INTO "public"."task_result"(id, status, status_info, last_open_date, title, user_index, change_id) VALUES(_id, _status, _status_info, _last_open_date, _title, _user_index, _change_id) RETURNING user_index into userindex; + isupdate := 'false'; + RETURN; + EXCEPTION WHEN unique_violation THEN + -- do nothing, and loop to try the UPDATE again + END; + END LOOP; +END; +$$ +LANGUAGE plpgsql;