merge db table doc_callbacks into task_result

This commit is contained in:
konovalovsergey
2017-02-01 16:47:29 +03:00
parent 926f8ce8f5
commit 30a82d093f
9 changed files with 45 additions and 122 deletions

View File

@ -65,7 +65,6 @@
"sql": {
"type": "postgres",
"tableChanges": "doc_changes",
"tableCallbacks": "doc_callbacks",
"tableResult": "task_result",
"dbHost": "localhost",
"dbPort": 5432,

View File

@ -608,14 +608,10 @@ function parseUrl(callbackUrl) {
return result;
}
function* deleteCallback(id) {
// Нужно удалить из базы callback-ов
yield sqlBase.deleteCallbackPromise(id);
}
function* getCallback(id) {
var callbackUrl = null;
var baseUrl = null;
var selectRes = yield sqlBase.getCallbackPromise(id);
var selectRes = yield taskResult.select(id);
if (selectRes.length > 0) {
var row = selectRes[0];
if (row.callback) {
@ -631,9 +627,6 @@ function* getCallback(id) {
return null;
}
}
function* addCallback(id, href, baseUrl) {
yield sqlBase.insertCallbackPromise(id, href, baseUrl);
}
function* getChangesIndex(docId) {
var res = 0;
var redisRes = yield utils.promiseRedis(redisClient, redisClient.get, redisKeyChangeIndex + docId);
@ -734,10 +727,20 @@ function* sendStatusDocument(docId, bChangeBase, userAction, callback, baseUrl,
if (c_oAscChangeBase.No !== bChangeBase) {
if (c_oAscServerStatus.Editing === status && c_oAscChangeBase.All === bChangeBase) {
// Добавить в базу
yield* addCallback(docId, callback.href, baseUrl);
} else if (c_oAscServerStatus.Closed === status) {
// Удалить из базы
yield* deleteCallback(docId);
var updateMask = new taskResult.TaskResultData();
updateMask.key = docId;
updateMask.callback = '';
updateMask.baseurl = '';
var updateTask = new taskResult.TaskResultData();
updateTask.key = docId;
updateTask.callback = callback.href;
updateTask.baseurl = baseUrl;
var updateIfRes = yield taskResult.updateIf(updateTask, updateMask);
if (updateIfRes.affectedRows > 0) {
logger.debug('sendStatusDocument updateIf: docId = %s', docId);
} else {
logger.debug('sendStatusDocument updateIf no effect: docId = %s', docId);
}
}
}
@ -839,8 +842,6 @@ function* cleanDocumentOnExit(docId, deleteChanges) {
var redisArgs = [redisClient, redisClient.del, redisKeyLocks + docId,
redisKeyMessage + docId, redisKeyChangeIndex + docId, redisKeyForceSave + docId, redisKeyLastSave + docId];
utils.promiseRedis.apply(this, redisArgs);
//remove callback
yield* deleteCallback(docId);
//remove changes
if (deleteChanges) {
sqlBase.deleteChanges(docId, null);
@ -1628,7 +1629,7 @@ exports.install = function(server, callbackFunction) {
// Если восстанавливаем, индекс тоже восстанавливаем
curIndexUser = user.indexUser;
} else {
upsertRes = yield canvasService.commandOpenStartPromise(docId, cmd, true);
upsertRes = yield canvasService.commandOpenStartPromise(docId, cmd, true, data.documentCallbackUrl, utils.getBaseUrlByConnection(conn));
upsertRes.affectedRows == 1 ? curIndexUser = 1 : curIndexUser = upsertRes.insertId;
}
if (constants.CONN_CLOSED === conn.readyState) {

View File

@ -39,7 +39,6 @@ var config = require('config').get('services.CoAuthoring.sql');
var baseConnector = (sqlDataBaseType.mySql === config.get('type')) ? require('./mySqlBaseConnector') : require('./postgreSqlBaseConnector');
var tableChanges = config.get('tableChanges'),
tableCallbacks = config.get('tableCallbacks'),
tableResult = config.get('tableResult');
var g_oCriticalSection = {};
@ -64,9 +63,6 @@ var c_oTableId = {
function getTableById (id) {
var res;
switch (id) {
case c_oTableId.callbacks:
res = tableCallbacks;
break;
case c_oTableId.changes:
res = tableChanges;
break;
@ -81,20 +77,6 @@ exports.loadTable = function (tableId, callbackFunction) {
var sqlCommand = "SELECT * FROM " + table + ";";
baseConnector.sqlQuery(sqlCommand, callbackFunction);
};
exports.insertCallback = function(id, href, baseUrl, callbackFunction) {
baseConnector.insertCallback(id, href, baseUrl, callbackFunction);
};
exports.insertCallbackPromise = function(id, href, baseUrl) {
return new Promise(function(resolve, reject) {
exports.insertCallback(id, href, baseUrl, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
};
exports.insertChanges = function (objChanges, docId, index, user) {
lockCriticalSection(docId, function () {_insertChanges(0, objChanges, docId, index, user);});
};
@ -185,34 +167,6 @@ exports.deleteChanges = function (docId, deleteIndex) {
function _deleteChanges (docId, deleteIndex) {
exports.deleteChangesCallback(docId, deleteIndex, function () {unLockCriticalSection(docId);});
}
exports.getCallback = function(docId, callback) {
getDataFromTable(c_oTableId.callbacks, "*", "id='" + docId + "'", callback);
};
exports.getCallbackPromise = function(docId) {
return new Promise(function(resolve, reject) {
exports.getCallback(docId, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
};
exports.deleteCallback = function (docId, callback) {
deleteFromTable(c_oTableId.callbacks, "id='" + docId + "'", callback);
};
exports.deleteCallbackPromise = function (docId) {
return new Promise(function(resolve, reject) {
exports.deleteCallback(docId, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
};
exports.getChangesIndex = function(docId, callback) {
var table = getTableById(c_oTableId.changes);
var sqlCommand = 'SELECT MAX(change_id) as change_id FROM ' + table + ' WHERE id=' + baseConnector.sqlEscape(docId) + ';';

File diff suppressed because one or more lines are too long

View File

@ -1 +1 @@
/*
/*

View File

@ -47,7 +47,6 @@ var pool = new pg.Pool({
ssl: false,
idleTimeoutMillis: 30000
});
var cfgTableCallbacks = configSql.get('tableCallbacks');
//todo datetime timezone
types.setTypeParser(1114, function(stringValue) {
return new Date(stringValue + '+0000');
@ -117,37 +116,18 @@ var isSupportOnConflict = false;
}, true, true);
})();
exports.insertCallback = function(id, href, baseUrl, callbackFunction) {
var sqlCommand = "INSERT INTO " + cfgTableCallbacks + " VALUES (" + exports.sqlEscape(id) + "," +
exports.sqlEscape(href) + "," + exports.sqlEscape(baseUrl) + ")";
if (isSupportOnConflict) {
sqlCommand += ' ON CONFLICT DO NOTHING;';
exports.sqlQuery(sqlCommand, callbackFunction);
} else {
sqlCommand += ';';
exports.sqlQuery(sqlCommand, function(error, result) {
if (error && error.code == '23505') {
//UNIQUE VIOLATION
callbackFunction(null, result);
} else {
callbackFunction(error, result);
}
});
}
};
function getUpsertString(task) {
task.completeDefaults();
var dateNow = sqlBase.getDateTime(new Date());
var commandArg = [task.key, task.status, task.statusInfo, dateNow, task.userIndex, task.changeId];
var commandArg = [task.key, task.status, task.statusInfo, dateNow, task.userIndex, task.changeId, task.callback, task.baseurl];
var commandArgEsc = commandArg.map(function(curVal) {
return exports.sqlEscape(curVal)
});
if (isSupportOnConflict) {
//http://stackoverflow.com/questions/34762732/how-to-find-out-if-an-upsert-was-an-update-with-postgresql-9-5-upsert
return "INSERT INTO task_result (id, status, status_info, last_open_date, user_index, change_id) SELECT " +
commandArgEsc.join(', ') +
" WHERE 'false' = set_config('myapp.isupdate', 'false', true) ON CONFLICT (id) DO UPDATE SET last_open_date = " +
return "INSERT INTO task_result (id, status, status_info, last_open_date, user_index, change_id, callback," +
" baseurl) SELECT " + commandArgEsc.join(', ') + " WHERE 'false' = set_config('myapp.isupdate', 'false', true) " +
"ON CONFLICT (id) DO UPDATE SET last_open_date = " +
sqlBase.baseConnector.sqlEscape(dateNow) +
", user_index = task_result.user_index + 1 WHERE 'true' = set_config('myapp.isupdate', 'true', true) RETURNING" +
" current_setting('myapp.isupdate') as isupdate, user_index as userindex;";

View File

@ -57,6 +57,8 @@ function TaskResultData() {
this.lastOpenDate = null;
this.userIndex = null;
this.changeId = null;
this.callback = null;
this.baseurl = null;
}
TaskResultData.prototype.completeDefaults = function() {
if (!this.key) {
@ -77,6 +79,12 @@ TaskResultData.prototype.completeDefaults = function() {
if (!this.changeId) {
this.changeId = 0;
}
if (!this.callback) {
this.callback = '';
}
if (!this.baseurl) {
this.baseurl = '';
}
};
function upsert(task, opt_updateUserIndex) {
@ -116,6 +124,12 @@ function toUpdateArray(task, updateTime) {
if (null != task.changeId) {
res.push('change_id=' + sqlBase.baseConnector.sqlEscape(task.changeId));
}
if (null != task.callback) {
res.push('callback=' + sqlBase.baseConnector.sqlEscape(task.callback));
}
if (null != task.baseurl) {
res.push('baseurl=' + sqlBase.baseConnector.sqlEscape(task.baseurl));
}
return res;
}
function getUpdateString(task) {
@ -160,12 +174,12 @@ function updateIf(task, mask) {
function getInsertString(task) {
var dateNow = sqlBase.getDateTime(new Date());
task.completeDefaults();
var commandArg = [task.key, task.status, task.statusInfo, dateNow, task.userIndex, task.changeId];
var commandArg = [task.key, task.status, task.statusInfo, dateNow, task.userIndex, task.changeId, task.callback, task.baseurl];
var commandArgEsc = commandArg.map(function(curVal) {
return sqlBase.baseConnector.sqlEscape(curVal)
});
return 'INSERT INTO task_result ( id, status, status_info, last_open_date, user_index, change_id) VALUES (' +
commandArgEsc.join(', ') + ');';
return 'INSERT INTO task_result ( id, status, status_info, last_open_date, user_index, change_id, callback,' +
' baseurl) VALUES (' + commandArgEsc.join(', ') + ');';
}
function addRandomKey(task) {
return new Promise(function(resolve, reject) {

View File

@ -28,24 +28,6 @@ DROP TABLE IF EXISTS `doc_callbacks`;
DROP TABLE IF EXISTS `doc_changes`;
DROP TABLE IF EXISTS `task_result`;
--
-- Definition of table `doc_callbacks`
--
CREATE TABLE IF NOT EXISTS `doc_callbacks` (
`id` varchar(255) NOT NULL,
`callback` text NOT NULL,
`baseurl` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--
-- Dumping data for table `doc_callbacks`
--
/*!40000 ALTER TABLE `doc_callbacks` DISABLE KEYS */;
/*!40000 ALTER TABLE `doc_callbacks` ENABLE KEYS */;
--
-- Definition of table `doc_changes`
--
@ -79,6 +61,8 @@ CREATE TABLE IF NOT EXISTS `task_result` (
`last_open_date` datetime NOT NULL,
`user_index` int(10) unsigned NOT NULL DEFAULT 1,
`change_id` int(10) unsigned NOT NULL DEFAULT 0,
`callback` text NOT NULL,
`baseurl` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

View File

@ -4,17 +4,6 @@
-- CREATE DATABASE onlyoffice ENCODING = 'UTF8' CONNECTION LIMIT = -1;
-- ----------------------------
-- Table structure for doc_callbacks
-- ----------------------------
CREATE TABLE IF NOT EXISTS "public"."doc_callbacks" (
"id" varchar(255) COLLATE "default" NOT NULL,
"callback" text COLLATE "default" NOT NULL,
"baseurl" text COLLATE "default" NOT NULL,
PRIMARY KEY ("id")
)
WITH (OIDS=FALSE);
-- ----------------------------
-- Table structure for doc_changes
-- ----------------------------
@ -40,11 +29,13 @@ CREATE TABLE IF NOT EXISTS "public"."task_result" (
"last_open_date" timestamp without time zone NOT NULL,
"user_index" int4 NOT NULL DEFAULT 1,
"change_id" int4 NOT NULL DEFAULT 0,
"callback" text COLLATE "default" NOT NULL,
"baseurl" text COLLATE "default" NOT NULL,
PRIMARY KEY ("id")
)
WITH (OIDS=FALSE);
CREATE OR REPLACE FUNCTION merge_db(_id varchar(255), _status int2, _status_info int4, _last_open_date timestamp without time zone, _user_index int4, _change_id int4, OUT isupdate char(5), OUT userindex int4) AS
CREATE OR REPLACE FUNCTION merge_db(_id varchar(255), _status int2, _status_info int4, _last_open_date timestamp without time zone, _user_index int4, _change_id int4, _callback text, _baseurl text, OUT isupdate char(5), OUT userindex int4) AS
$$
DECLARE
t_var "public"."task_result"."user_index"%TYPE;
@ -61,7 +52,7 @@ BEGIN
-- if someone else inserts the same key concurrently,
-- we could get a unique-key failure
BEGIN
INSERT INTO "public"."task_result"(id, status, status_info, last_open_date, user_index, change_id) VALUES(_id, _status, _status_info, _last_open_date, _user_index, _change_id) RETURNING user_index into userindex;
INSERT INTO "public"."task_result"(id, status, status_info, last_open_date, user_index, change_id, callback, baseurl) VALUES(_id, _status, _status_info, _last_open_date, _user_index, _change_id, _callback, _baseurl) RETURNING user_index into userindex;
isupdate := 'false';
RETURN;
EXCEPTION WHEN unique_violation THEN