merge from TeamlabOffice_v3.7_ServerExpire

git-svn-id: svn://192.168.3.15/activex/AVS/Sources/TeamlabOffice/trunk/nodeJSProjects@68971 954022d7-b5bf-4e40-9824-e11837661b57
This commit is contained in:
Sergey.Konovalov
2016-03-18 14:12:49 +00:00
committed by Alexander.Trofimov
parent 9f8d64f434
commit a83889c4d8
9 changed files with 339 additions and 206 deletions

View File

@ -84,10 +84,9 @@
"maxChanges": 1000
},
"expire": {
"callback": 3600,
"userindex": 604800,
"saveLock": 60,
"editors": 604800,
"presence": 300,
"locks": 604800,
"changeindex": 86400,
"lockDoc": 60,
@ -95,10 +94,9 @@
"lastsave": 604800,
"forcesave": 604800,
"saved": 3600,
"documents": 300,
"documentsCron": "0 */5 * * * *",
"files": 604800,
"filesCron": "00 00 */1 * * *",
"documentsCron": "0 */2 * * * *",
"files": 300,
"filesCron": "00 */5 * * * *",
"filesremovedatonce": 10
}
}

View File

@ -531,6 +531,19 @@ function OutputAction(type, userid) {
this['type'] = type;
this['userid'] = userid;
}
var c_oPublishType = {
drop : 0,
releaseLock : 1,
participantsState : 2,
message : 3,
getLock : 4,
changes : 5,
auth : 6,
receiveTask : 7,
warning: 8,
cursor: 9,
expireDoc: 10
};
var c_oAscCsvDelimiter = {
None: 0,
Tab: 1,
@ -632,6 +645,7 @@ exports.InputCommand = InputCommand;
exports.OutputSfcData = OutputSfcData;
exports.OutputMailMerge = OutputMailMerge;
exports.OutputAction = OutputAction;
exports.c_oPublishType = c_oPublishType;
exports.c_oAscCsvDelimiter = c_oAscCsvDelimiter;
exports.c_oAscEncodings = c_oAscEncodings;
exports.c_oAscEncodingsMap = c_oAscEncodingsMap;

View File

@ -118,4 +118,18 @@ exports.QUEUE_PRIORITY_RESPONSE = 3;
exports.EDITOR_TYPE_WORD = 0;
exports.EDITOR_TYPE_SPREADSHEET = 1;
exports.EDITOR_TYPE_PRESENTATION = 2;
exports.EDITOR_TYPE_CONVERTATION = 3;
exports.EDITOR_TYPE_CONVERTATION = 3;
exports.REDIS_KEY_PUBSUB = 'pubsub';
exports.REDIS_KEY_USER_INDEX = 'userindex:';
exports.REDIS_KEY_SAVE_LOCK = 'savelock:';
exports.REDIS_KEY_PRESENCE_HASH = 'presence:hash:';
exports.REDIS_KEY_PRESENCE_SET = 'presence:set:';
exports.REDIS_KEY_LOCKS = 'locks:';
exports.REDIS_KEY_CHANGES_INDEX = 'changesindex:';
exports.REDIS_KEY_LOCK_DOCUMENT = 'lockdocument:';
exports.REDIS_KEY_MESSAGE = 'message:';
exports.REDIS_KEY_DOCUMENTS = 'documents';
exports.REDIS_KEY_LAST_SAVE = 'lastsave:';
exports.REDIS_KEY_FORCE_SAVE = 'forcesave:';
exports.REDIS_KEY_SAVED = 'saved:';

View File

@ -133,6 +133,18 @@ util.inherits(TaskQueueRabbitMQ, events.EventEmitter);
TaskQueueRabbitMQ.prototype.init = function (isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, callback) {
init(this, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, callback);
};
TaskQueueRabbitMQ.prototype.initPromise = function(isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive) {
var t = this;
return new Promise(function(resolve, reject) {
init(t, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, function(err) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
};
TaskQueueRabbitMQ.prototype.addTask = function (task, priority) {
//todo confirmation mode
var t = this;

View File

@ -44,7 +44,6 @@ var _ = require('underscore');
var https = require('https');
var http = require('http');
var url = require('url');
var cron = require('cron');
var co = require('co');
var storage = require('./../../Common/sources/storage-base');
var logger = require('./../../Common/sources/logger');
@ -56,9 +55,9 @@ var config = require('config').get('services.CoAuthoring');
var sqlBase = require('./baseConnector');
var canvasService = require('./canvasservice');
var converterService = require('./converterservice');
var checkExpire = require('./checkexpire');
var taskResult = require('./taskresult');
var redis = require(config.get('redis.name'));
var pubsubRedis = require('./pubsubRedis');
var pubsubService = require('./' + config.get('pubsub.name'));
var queueService = require('./../../Common/sources/taskqueueRabbitMQ');
var cfgSpellcheckerUrl = config.get('server.editor_settings_spellchecker_url');
@ -67,12 +66,9 @@ var cfgCallbackRequestTimeout = config.get('server.callbackRequestTimeout');
var cfgPubSubMaxChanges = config.get('pubsub.maxChanges');
var cfgRedisPrefix = config.get('redis.prefix');
var cfgRedisHost = config.get('redis.host');
var cfgRedisPort = config.get('redis.port');
var cfgExpCallback = config.get('expire.callback');
var cfgExpUserIndex = config.get('expire.userindex');
var cfgExpSaveLock = config.get('expire.saveLock');
var cfgExpEditors = config.get('expire.editors');
var cfgExpPresence = config.get('expire.presence');
var cfgExpLocks = config.get('expire.locks');
var cfgExpChangeIndex = config.get('expire.changeindex');
var cfgExpLockDoc = config.get('expire.lockDoc');
@ -80,37 +76,20 @@ var cfgExpMessage = config.get('expire.message');
var cfgExpLastSave = config.get('expire.lastsave');
var cfgExpForceSave = config.get('expire.forcesave');
var cfgExpSaved = config.get('expire.saved');
var cfgExpDocuments = config.get('expire.documents');
var cfgExpDocumentsCron = config.get('expire.documentsCron');
var cfgExpFiles = config.get('expire.files');
var cfgExpFilesCron = config.get('expire.filesCron');
var cfgSockjsUrl = config.get('server.sockjsUrl');
var redisKeyCallback = cfgRedisPrefix + 'callback:';
var redisKeyUserIndex = cfgRedisPrefix + 'userindex:';
var redisKeySaveLock = cfgRedisPrefix + 'savelock:';
var redisKeyEditors = cfgRedisPrefix + 'editors:';
var redisKeyLocks = cfgRedisPrefix + 'locks:';
var redisKeyChangeIndex = cfgRedisPrefix + 'changesindex:';
var redisKeyLockDoc = cfgRedisPrefix + 'lockdocument:';
var redisKeyMessage = cfgRedisPrefix + 'message:';
var redisKeyDocuments = cfgRedisPrefix + 'documents:';
var redisKeyLastSave = cfgRedisPrefix + 'lastsave:';
var redisKeyForceSave = cfgRedisPrefix + 'forcesave:';
var redisKeySaved = cfgRedisPrefix + 'saved:';
var PublishType = {
drop : 0,
releaseLock : 1,
participantsState : 2,
message : 3,
getLock : 4,
changes : 5,
auth : 6,
receiveTask : 7,
warning: 8,
cursor: 9
};
var redisKeyUserIndex = cfgRedisPrefix + constants.REDIS_KEY_USER_INDEX;
var redisKeySaveLock = cfgRedisPrefix + constants.REDIS_KEY_SAVE_LOCK;
var redisKeyPresenceHash = cfgRedisPrefix + constants.REDIS_KEY_PRESENCE_HASH;
var redisKeyPresenceSet = cfgRedisPrefix + constants.REDIS_KEY_PRESENCE_SET;
var redisKeyLocks = cfgRedisPrefix + constants.REDIS_KEY_LOCKS;
var redisKeyChangeIndex = cfgRedisPrefix + constants.REDIS_KEY_CHANGES_INDEX;
var redisKeyLockDoc = cfgRedisPrefix + constants.REDIS_KEY_LOCK_DOCUMENT;
var redisKeyMessage = cfgRedisPrefix + constants.REDIS_KEY_MESSAGE;
var redisKeyDocuments = cfgRedisPrefix + constants.REDIS_KEY_DOCUMENTS;
var redisKeyLastSave = cfgRedisPrefix + constants.REDIS_KEY_LAST_SAVE;
var redisKeyForceSave = cfgRedisPrefix + constants.REDIS_KEY_FORCE_SAVE;
var redisKeySaved = cfgRedisPrefix + constants.REDIS_KEY_SAVED;
var EditorTypes = {
document : 0,
@ -120,7 +99,7 @@ var EditorTypes = {
var defaultHttpPort = 80, defaultHttpsPort = 443; // Порты по умолчанию (для http и https)
var connections = []; // Активные соединения
var redisClient;
var redisClient = pubsubRedis.getClientRedis();
var pubsub;
var queue;
var clientStatsD = statsDClient.getClient();
@ -411,28 +390,76 @@ function getParticipantUser(docId, includeUserId) {
return el.docId === docId && el.user.id === includeUserId;
});
}
function* hasEditors(docId, optEditors) {
var elem, hasEditors = false;
if (!optEditors) {
optEditors = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + docId);
function updatePresenceCommandsToArray(outCommands, docId, userId, userInfo) {
var expireAt = new Date().getTime() + cfgExpPresence * 1000;
outCommands.push(
['zadd', redisKeyPresenceSet + docId, expireAt, userId],
['hset', redisKeyPresenceHash + docId, userId, userInfo],
['expire', redisKeyPresenceSet + docId, cfgExpPresence],
['expire', redisKeyPresenceHash + docId, cfgExpPresence]
);
}
function* updatePresence(docId, userId, userInfo) {
var commands = [];
updatePresenceCommandsToArray(commands, docId, userId, userInfo);
var expireAt = new Date().getTime() + cfgExpPresence * 1000;
commands.push(['zadd', redisKeyDocuments, expireAt, docId]);
var multi = redisClient.multi(commands);
yield utils.promiseRedis(multi, multi.exec);
}
function* getAllPresence(docId, optZRange, optHVals) {
var now = (new Date()).getTime();
var expiredKeys;
var hvals;
if (optHVals && optZRange) {
expiredKeys = optZRange;
hvals = optHVals;
} else {
var multi = redisClient.multi([
['zrangebyscore', redisKeyPresenceSet + docId, 0, now],
['hvals', redisKeyPresenceHash + docId]
]);
var multiRes = yield utils.promiseRedis(multi, multi.exec);
expiredKeys = multiRes[0];
hvals = multiRes[1];
}
for (var i = 0; i < optEditors.length; ++i) {
elem = JSON.parse(optEditors[i]);
if (expiredKeys.length > 0) {
var multi = [
['zremrangebyscore', redisKeyPresenceSet + docId, 0, now]
];
var expiredKeysMap = {};
for (var i = 0; i < expiredKeys.length; ++i) {
var expiredKey = expiredKeys[i];
expiredKeysMap[expiredKey] = 1;
commands.push(['hdel', redisKeyPresenceHash + docId, expiredKey]);
}
multi = redisClient.multi(commands);
yield utils.promiseRedis(multi, multi.exec);
hvals = hvals.filter(function(curValue) {
return null == expiredKeysMap[curValue];
})
}
return hvals;
}
function* hasEditors(docId, optZRange, optHVals) {
var elem, hasEditors = false;
var hvals = yield* getAllPresence(docId, optZRange, optHVals);
for (var i = 0; i < hvals.length; ++i) {
elem = JSON.parse(hvals[i]);
if(!elem.view) {
hasEditors = true;
break;
}
}
return hasEditors;
}
function* publish(data, optDocId, optUserId) {
var needPublish = true;
if(optDocId && optUserId) {
needPublish = false;
var hvalsRes = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + optDocId);
for (var i = 0; i < hvalsRes.length; ++i) {
var elem = JSON.parse(hvalsRes[i]);
var hvals = yield* getAllPresence(optDocId);
for (var i = 0; i < hvals.length; ++i) {
var elem = JSON.parse(hvals[i]);
if(optUserId != elem.id) {
needPublish = true;
break;
@ -444,8 +471,9 @@ function* publish(data, optDocId, optUserId) {
pubsub.publish(msg);
}
}
function* addTask(data, priority) {
yield queue.addTask(data, priority);
function* addTask(data, priority, opt_queue) {
var realQueue = opt_queue ? opt_queue : queue;
yield realQueue.addTask(data, priority);
}
function* removeResponse(data) {
yield queue.removeResponse(data);
@ -453,9 +481,9 @@ function* removeResponse(data) {
function* getOriginalParticipantsId(docId) {
var result = [], tmpObject = {};
var hvalsRes = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + docId);
for (var i = 0; i < hvalsRes.length; ++i) {
var elem = JSON.parse(hvalsRes[i]);
var hvals = yield* getAllPresence(docId);
for (var i = 0; i < hvals.length; ++i) {
var elem = JSON.parse(hvals[i]);
if (!elem.view) {
tmpObject[elem.idOriginal] = 1;
}
@ -500,28 +528,19 @@ function parseUrl(callbackUrl) {
function* deleteCallback(id) {
// Нужно удалить из базы callback-ов
yield utils.promiseRedis(redisClient, redisClient.del, redisKeyCallback + id);
yield sqlBase.deleteCallbackPromise(id);
}
function* getCallback(id) {
var callbackUrl = null;
var baseUrl = null;
var data = yield utils.promiseRedis(redisClient, redisClient.get, redisKeyCallback + id);
if (data) {
var dataParsed = JSON.parse(data);
callbackUrl = dataParsed.callbackUrl;
baseUrl = dataParsed.baseUrl;
}
else {
var selectRes = yield sqlBase.getCallbackPromise(id);
if (selectRes.length > 0) {
var row = selectRes[0];
if (row.dc_callback) {
callbackUrl = row.dc_callback;
}
if (row.dc_baseurl) {
baseUrl = row.dc_baseurl;
}
var selectRes = yield sqlBase.getCallbackPromise(id);
if (selectRes.length > 0) {
var row = selectRes[0];
if (row.dc_callback) {
callbackUrl = row.dc_callback;
}
if (row.dc_baseurl) {
baseUrl = row.dc_baseurl;
}
}
if (null != callbackUrl && null != baseUrl) {
@ -532,8 +551,6 @@ function* getCallback(id) {
}
function* addCallback(id, href, baseUrl) {
yield sqlBase.insertInTablePromise(sqlBase.tableId.callbacks, null, id, href, baseUrl);
yield utils.promiseRedis(redisClient, redisClient.setex, redisKeyCallback + id, cfgExpCallback,
JSON.stringify({callbackUrl: href, baseUrl: baseUrl}));
}
function* getChangesIndex(docId) {
var res = 0;
@ -634,7 +651,7 @@ function* onReplySendStatusDocument(docId, replyData) {
var oData = parseReplyData(docId, replyData);
if (!(oData && commonDefines.c_oAscServerCommandErrors.NoError == oData.error)) {
// Ошибка подписки на callback, посылаем warning
yield* publish({type: PublishType.warning, docId: docId, description: 'Error on save server subscription!'});
yield* publish({type: commonDefines.c_oPublishType.warning, docId: docId, description: 'Error on save server subscription!'});
}
}
function* dropUsersFromDocument(docId, replyData) {
@ -642,7 +659,7 @@ function* dropUsersFromDocument(docId, replyData) {
if (oData) {
users = Array.isArray(oData) ? oData : oData.users;
if (Array.isArray(users)) {
yield* publish({type: PublishType.drop, docId: docId, users: users, description: ''});
yield* publish({type: commonDefines.c_oPublishType.drop, docId: docId, users: users, description: ''});
}
}
}
@ -685,7 +702,7 @@ function* bindEvents(docId, callback, baseUrl, opt_userAction) {
function* cleanDocumentOnExit(docId, deleteChanges, deleteUserIndex) {
//clean redis
var redisArgs = [redisClient, redisClient.del, redisKeyLocks + docId, redisKeyEditors + docId,
var redisArgs = [redisClient, redisClient.del, redisKeyLocks + docId, redisKeyPresenceSet + docId, redisKeyPresenceHash + docId,
redisKeyMessage + docId, redisKeyChangeIndex + docId, redisKeyForceSave + docId, redisKeyLastSave + docId];
if (deleteUserIndex) {
redisArgs.push(redisKeyUserIndex + docId);
@ -707,19 +724,48 @@ function* cleanDocumentOnExitNoChanges(docId, opt_userId) {
yield* cleanDocumentOnExit(docId, false, false);
}
function* _createSaveTimer(docId, opt_userId, opt_queue, opt_noDelay) {
var updateMask = new taskResult.TaskResultData();
updateMask.key = docId;
updateMask.status = taskResult.FileStatus.Ok;
var updateTask = new taskResult.TaskResultData();
updateTask.status = taskResult.FileStatus.SaveVersion;
updateTask.statusInfo = utils.getMillisecondsOfHour(new Date());
var updateIfRes = yield taskResult.updateIf(updateTask, updateMask);
if (updateIfRes.affectedRows > 0) {
if(!opt_noDelay){
yield utils.sleep(c_oAscSaveTimeOutDelay);
}
while (true) {
if (!sqlBase.isLockCriticalSection(docId)) {
canvasService.saveFromChanges(docId, updateTask.statusInfo, null, opt_userId, opt_queue);
break;
}
yield utils.sleep(c_oAscLockTimeOutDelay);
}
} else {
//если не получилось - значит FileStatus=SaveVersion(кто-то другой начал сборку) или UpdateVersion(сборка закончена)
//в этом случае ничего делать не надо
logger.debug('_createSaveTimer updateIf no effect');
}
}
exports.version = asc_coAuthV;
exports.c_oAscServerStatus = c_oAscServerStatus;
exports.sendData = sendData;
exports.parseUrl = parseUrl;
exports.parseReplyData = parseReplyData;
exports.sendServerRequest = sendServerRequest;
exports.PublishType = PublishType;
exports.createSaveTimerPromise = co.wrap(_createSaveTimer);
exports.getAllPresencePromise = co.wrap(getAllPresence);
exports.publish = publish;
exports.addTask = addTask;
exports.removeResponse = removeResponse;
exports.hasEditors = hasEditors;
exports.getCallback = getCallback;
exports.cleanDocumentOnExit = cleanDocumentOnExit;
exports.getChangesIndexPromise = co.wrap(getChangesIndex);
exports.cleanDocumentOnExitPromise = co.wrap(cleanDocumentOnExit);
exports.cleanDocumentOnExitNoChangesPromise = co.wrap(cleanDocumentOnExitNoChanges);
exports.setForceSave= setForceSave;
exports.install = function(server, callbackFunction) {
'use strict';
@ -773,15 +819,15 @@ exports.install = function(server, callbackFunction) {
case 'unLockDocument' :
yield* checkEndAuthLock(data.isSave, docId, conn.user.id, conn);
break;
case 'ping':
yield utils.promiseRedis(redisClient, redisClient.zadd, redisKeyDocuments, new Date().getTime(), docId);
break;
case 'close':
yield* closeDocument(conn, false);
break;
case 'openDocument' :
canvasService.openDocument(conn, data);
break;
default:
logger.debug("unknown command %s", message);
break;
}
if(clientStatsD) {
if('openDocument' != data.type) {
@ -844,7 +890,7 @@ exports.install = function(server, callbackFunction) {
var state = (false == reconnected) ? false : undefined;
var tmpUser = conn.user;
yield* publish({type: PublishType.participantsState, docId: docId, user: tmpUser, state: state}, docId, tmpUser.id);
yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, user: tmpUser, state: state}, docId, tmpUser.id);
if (!reconnected) {
// Для данного пользователя снимаем лок с сохранения
@ -852,15 +898,17 @@ exports.install = function(server, callbackFunction) {
if (conn.user.id == saveLock) {
yield utils.promiseRedis(redisClient, redisClient.del, redisKeySaveLock + docId);
}
var commands = [
['hdel', redisKeyPresenceHash + docId, tmpUser.id],
['zrem', redisKeyPresenceSet + docId, tmpUser.id]
];
// Только если редактируем
if (false === tmpUser.view) {
var multi = redisClient.multi([
['hdel', redisKeyEditors + docId, tmpUser.id],
['hvals', redisKeyEditors + docId]
]);
commands.push(['zrangebyscore', redisKeyPresenceSet + docId, 0, (new Date()).getTime()],
['hvals', redisKeyPresenceHash + docId]);
var multi = redisClient.multi(commands);
var execRes = yield utils.promiseRedis(multi, multi.exec);
bHasEditors = yield* hasEditors(docId, execRes[1]);
bHasEditors = yield* hasEditors(docId, execRes[2], execRes[3]);
var puckerIndex = yield* getChangesIndex(docId);
bHasChanges = puckerIndex > 0;
@ -887,13 +935,14 @@ exports.install = function(server, callbackFunction) {
if (0 < userLocks.length) {
//todo на close себе ничего не шлем
//sendReleaseLock(conn, userLocks);
yield* publish({type: PublishType.releaseLock, docId: docId, userId: conn.user.id, locks: userLocks}, docId, conn.user.id);
yield* publish({type: commonDefines.c_oPublishType.releaseLock, docId: docId, userId: conn.user.id, locks: userLocks}, docId, conn.user.id);
}
// Для данного пользователя снимаем Lock с документа
yield* checkEndAuthLock(false, docId, conn.user.id);
} else {
yield utils.promiseRedis(redisClient, redisClient.hdel, redisKeyEditors + docId, tmpUser.id);
var multi = redisClient.multi(commands);
yield utils.promiseRedis(multi, multi.exec);
}
}
}
@ -954,9 +1003,9 @@ exports.install = function(server, callbackFunction) {
function* getParticipantMap(docId) {
var participantsMap = [];
var hvalsRes = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + docId);
for (var i = 0; i < hvalsRes.length; ++i) {
participantsMap.push(JSON.parse(hvalsRes[i]));
var hvals = yield* getAllPresence(docId);
for (var i = 0; i < hvals.length; ++i) {
participantsMap.push(JSON.parse(hvals[i]));
}
return participantsMap;
}
@ -968,7 +1017,7 @@ exports.install = function(server, callbackFunction) {
yield utils.promiseRedis(redisClient, redisClient.del, redisKeyLockDoc + docId);
var participantsMap = yield* getParticipantMap(docId);
yield* publish({type: PublishType.auth, docId: docId, userId: userId, participantsMap: participantsMap}, docId, userId);
yield* publish({type: commonDefines.c_oPublishType.auth, docId: docId, userId: userId, participantsMap: participantsMap}, docId, userId);
result = true;
} else if (isSave) {
@ -976,7 +1025,7 @@ exports.install = function(server, callbackFunction) {
var userLocks = yield* getUserLocks(docId, currentConnection.sessionId);
if (0 < userLocks.length) {
sendReleaseLock(currentConnection, userLocks);
yield* publish({type: PublishType.releaseLock, docId: docId, userId: userId, locks: userLocks}, docId, userId);
yield* publish({type: commonDefines.c_oPublishType.releaseLock, docId: docId, userId: userId, locks: userLocks}, docId, userId);
}
// Автоматически снимаем lock сами
@ -1288,14 +1337,9 @@ exports.install = function(server, callbackFunction) {
function* endAuth(conn, bIsRestore, documentCallbackUrl) {
var docId = conn.docId;
connections.push(conn);
var tmpUser = conn.user;
var multi = redisClient.multi([
['hset', redisKeyEditors + docId, tmpUser.id, JSON.stringify(tmpUser)],
['expire', redisKeyEditors + docId, cfgExpEditors]
]);
yield utils.promiseRedis(multi, multi.exec);
yield utils.promiseRedis(redisClient, redisClient.zadd, redisKeyDocuments, new Date().getTime(), docId);
connections.push(conn);
yield* updatePresence(docId, tmpUser.id, JSON.stringify(tmpUser));
var firstParticipantNoView, countNoView = 0;
var participantsMap = yield* getParticipantMap(docId);
for (var i = 0; i < participantsMap.length; ++i) {
@ -1350,7 +1394,7 @@ exports.install = function(server, callbackFunction) {
yield* sendAuthInfo(objChangesDocument.arrChanges, objChangesDocument.getLength(), conn, participantsMap);
}
}
yield* publish({type: PublishType.participantsState, docId: docId, user: tmpUser, state: true}, docId, tmpUser.id);
yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, user: tmpUser, state: true}, docId, tmpUser.id);
}
function* sendAuthInfo(objChangesDocument, changesIndex, conn, participantsMap) {
@ -1403,7 +1447,7 @@ exports.install = function(server, callbackFunction) {
var messages = [msg];
sendDataMessage(conn, messages);
yield* publish({type: PublishType.message, docId: docId, userId: userId, messages: messages}, docId, userId);
yield* publish({type: commonDefines.c_oPublishType.message, docId: docId, userId: userId, messages: messages}, docId, userId);
}
function* onCursor(conn, data) {
@ -1414,7 +1458,7 @@ exports.install = function(server, callbackFunction) {
logger.info("send cursor: docId = %s %s", docId, msg);
var messages = [msg];
yield* publish({type: PublishType.cursor, docId: docId, userId: userId, messages: messages}, docId, userId);
yield* publish({type: commonDefines.c_oPublishType.cursor, docId: docId, userId: userId, messages: messages}, docId, userId);
}
function* getLock(conn, data, bIsRestore) {
@ -1457,7 +1501,7 @@ exports.install = function(server, callbackFunction) {
}
//тому кто зделал запрос возвращаем максимально быстро
sendData(conn, {type: "getLock", locks: documentLocks});
yield* publish({type: PublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId);
yield* publish({type: commonDefines.c_oPublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId);
return true;
}
@ -1482,7 +1526,7 @@ exports.install = function(server, callbackFunction) {
}
//тому кто зделал запрос возвращаем максимально быстро
sendData(conn, {type: "getLock", locks: documentLocks});
yield* publish({type: PublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId);
yield* publish({type: commonDefines.c_oPublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId);
return true;
}
@ -1507,7 +1551,7 @@ exports.install = function(server, callbackFunction) {
}
//тому кто зделал запрос возвращаем максимально быстро
sendData(conn, {type: "getLock", locks: documentLocks});
yield* publish({type: PublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId);
yield* publish({type: commonDefines.c_oPublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId);
return true;
}
@ -1599,7 +1643,7 @@ exports.install = function(server, callbackFunction) {
if(changesToSend.length > cfgPubSubMaxChanges) {
changesToSend = null;
}
yield* publish({type: PublishType.changes, docId: docId, userId: userId,
yield* publish({type: commonDefines.c_oPublishType.changes, docId: docId, userId: userId,
changes: changesToSend, startIndex: startIndex, changesIndex: puckerIndex,
locks: arrLocks, excelAdditionalInfo: data.excelAdditionalInfo}, docId, userId);
}
@ -1612,7 +1656,7 @@ exports.install = function(server, callbackFunction) {
if(changesToSend.length > cfgPubSubMaxChanges) {
changesToSend = null;
}
yield* publish({type: PublishType.changes, docId: docId, userId: userId,
yield* publish({type: commonDefines.c_oPublishType.changes, docId: docId, userId: userId,
changes: changesToSend, startIndex: startIndex, changesIndex: puckerIndex,
locks: [], excelAdditionalInfo: undefined}, docId, userId);
sendData(conn, {type: 'savePartChanges', changesIndex: changesIndex});
@ -1767,30 +1811,6 @@ exports.install = function(server, callbackFunction) {
return {res: !isLock, documentLocks: documentLocks};
}
function* _createSaveTimer(docId, opt_userId) {
var updateMask = new taskResult.TaskResultData();
updateMask.key = docId;
updateMask.status = taskResult.FileStatus.Ok;
var updateTask = new taskResult.TaskResultData();
updateTask.status = taskResult.FileStatus.SaveVersion;
updateTask.statusInfo = utils.getMillisecondsOfHour(new Date());
var updateIfRes = yield taskResult.updateIf(updateTask, updateMask);
if (updateIfRes.affectedRows > 0) {
yield utils.sleep(c_oAscSaveTimeOutDelay);
while (true) {
if (!sqlBase.isLockCriticalSection(docId)) {
canvasService.saveFromChanges(docId, updateTask.statusInfo, null, opt_userId);
break;
}
yield utils.sleep(c_oAscLockTimeOutDelay);
}
} else {
//если не получилось - значит FileStatus=SaveVersion(кто-то другой начал сборку) или UpdateVersion(сборка закончена)
//в этом случае ничего делать не надо
logger.debug('_createSaveTimer updateIf no effect');
}
}
function _checkLicense(conn) {
sendData(conn, {type: 'license', license: licenseInfo});
}
@ -1800,51 +1820,6 @@ exports.install = function(server, callbackFunction) {
logger.info(message);
}});
var checkDocumentExpire = function() {
return co(function*() {
try {
logger.debug('checkDocumentExpire start');
var removedCount = 0;
var dateExpire = new Date();
utils.addSeconds(dateExpire, -cfgExpDocuments);
var expireDocs = yield utils.promiseRedis(redisClient, redisClient.zrangebyscore, redisKeyDocuments, '-inf', dateExpire.getTime());
for (var i = 0; i < expireDocs.length; ++i) {
var docId = expireDocs[i];
var numDelete = yield utils.promiseRedis(redisClient, redisClient.zrem, redisKeyDocuments, docId);
//если numDelete == 0, значит этот ключ удалил другой процесс
if (numDelete > 0) {
removedCount++;
var puckerIndex = yield* getChangesIndex(docId);
if (puckerIndex > 0) {
logger.debug('checkDocumentExpire commit %d changes: docId = %s', puckerIndex, docId);
yield* _createSaveTimer(docId);
} else {
logger.debug('checkDocumentExpire no changes: docId = %s', docId);
yield* cleanDocumentOnExitNoChanges(docId);
}
}
}
logger.debug('checkDocumentExpire end: removedCount = %d', removedCount);
} catch (e) {
logger.error('checkDocumentExpire error:\r\n%s', e.stack);
}
});
};
//удаление файлов от которых не приходит heartbeat
var documentExpireJob = new cron.CronJob(cfgExpDocumentsCron, checkDocumentExpire);
documentExpireJob.start();
var fileExpireJob = new cron.CronJob(cfgExpFilesCron, checkExpire.checkFileExpire);
fileExpireJob.start();
//cache
redisClient = redis.createClient(cfgRedisPort, cfgRedisHost, {});
redisClient.on('error', function(err) {
logger.error('redisClient error:\r\n%s', err.stack);
});
// redisClient.on("connect", function () {
// logger.debug('redisClient connect');
// });
//publish subscribe message brocker
function pubsubOnMessage(msg) {
return co(function* () {
@ -1856,32 +1831,32 @@ exports.install = function(server, callbackFunction) {
var objChangesDocument;
var i;
switch (data.type) {
case PublishType.drop:
case commonDefines.c_oPublishType.drop:
for (i = 0; i < data.users.length; ++i) {
dropUserFromDocument(data.docId, data.users[i], data.description);
}
break;
case PublishType.releaseLock:
case commonDefines.c_oPublishType.releaseLock:
participants = getParticipants(true, data.docId, data.userId, true);
_.each(participants, function(participant) {
sendReleaseLock(participant, data.locks);
});
break;
case PublishType.participantsState:
case commonDefines.c_oPublishType.participantsState:
participants = getParticipants(true, data.docId, data.user.id);
sendParticipantsState(participants, data);
break;
case PublishType.message:
case commonDefines.c_oPublishType.message:
participants = getParticipants(true, data.docId, data.userId);
_.each(participants, function(participant) {
sendDataMessage(participant, data.messages);
});
break;
case PublishType.getLock:
case commonDefines.c_oPublishType.getLock:
participants = getParticipants(true, data.docId, data.userId, true);
sendGetLock(participants, data.documentLocks);
break;
case PublishType.changes:
case commonDefines.c_oPublishType.changes:
participants = getParticipants(true, data.docId, data.userId, true);
if(participants.length > 0) {
var changes = data.changes;
@ -1895,7 +1870,7 @@ exports.install = function(server, callbackFunction) {
});
}
break;
case PublishType.auth:
case commonDefines.c_oPublishType.auth:
participants = getParticipants(true, data.docId, data.userId, true);
if(participants.length > 0) {
objChangesDocument = yield* getDocumentChanges(data.docId);
@ -1905,7 +1880,7 @@ exports.install = function(server, callbackFunction) {
}
}
break;
case PublishType.receiveTask:
case commonDefines.c_oPublishType.receiveTask:
var cmd = new commonDefines.InputCommand(data.cmd);
var output = new canvasService.OutputDataWrap();
output.fromObject(data.output);
@ -1937,21 +1912,43 @@ exports.install = function(server, callbackFunction) {
sendData(participant, output);
}
break;
case PublishType.warning:
case commonDefines.c_oPublishType.warning:
participants = getParticipants(false, data.docId);
_.each(participants, function(participant) {
sendDataWarning(participant, data.description);
});
break;
case PublishType.cursor:
case commonDefines.c_oPublishType.cursor:
participants = getParticipants(true, data.docId, data.userId);
_.each(participants, function(participant) {
sendDataCursor(participant, data.messages);
});
break;
case commonDefines.c_oPublishType.expireDoc:
logger.debug('pubsub expireDoc connections.length = %d', connections.length);
var commands = [];
var idSet = new Set();
for (i = 0; i < connections.length; ++i) {
var conn = connections[i];
if (!conn.isCloseCoAuthoring) {
idSet.add(conn.docId);
updatePresenceCommandsToArray(commands, conn.docId, conn.user.id, JSON.stringify(conn.user));
}
}
var expireAt = new Date().getTime() + cfgExpPresence * 1000;
idSet.forEach(function(value1, value2, set) {
commands.push(['zadd', redisKeyDocuments, expireAt, value1]);
});
if (commands.length > 0) {
var multi = redisClient.multi(commands);
yield utils.promiseRedis(multi, multi.exec);
}
break;
default:
logger.debug('pubsub unknown message type:%s', msg);
}
} catch (err) {
logger.debug('pubsub message error:\r\n%s', err.stack);
logger.error('pubsub message error:\r\n%s', err.stack);
}
});
}
@ -1996,7 +1993,7 @@ exports.commandFromServer = function (req, res) {
break;
case 'drop':
if (query.userid) {
yield* publish({type: PublishType.drop, docId: docId, users: [query.userid], description: query.description});
yield* publish({type: commonDefines.c_oPublishType.drop, docId: docId, users: [query.userid], description: query.description});
}
else if (query.users) {
yield* dropUsersFromDocument(docId, query.users);

File diff suppressed because one or more lines are too long

View File

@ -1,14 +1,25 @@
var config = require('config').get('services.CoAuthoring');
var co = require('co');
var cron = require('cron');
var taskResult = require('./taskresult');
var docsCoServer = require('./DocsCoServer');
var storage = require('./../../Common/sources/storage-base');
var utils = require('./../../Common/sources/utils');
var logger = require('./../../Common/sources/logger');
var commonDefines = require('./../../Common/sources/commondefines');
var constants = require('./../../Common/sources/constants');
var pubsubRedis = require('./pubsubRedis.js');
var pubsubService = require('./' + config.get('pubsub.name'));
var queueService = require('./../../Common/sources/taskqueueRabbitMQ');
var cfgRedisPrefix = config.get('redis.prefix');
var cfgExpFilesCron = config.get('expire.filesCron');
var cfgExpDocumentsCron = config.get('expire.documentsCron');
var cfgExpFiles = config.get('expire.files');
var cfgExpFilesRemovedAtOnce = config.get('expire.filesremovedatonce');
//todo checkDocumentExpire
var redisKeyDocuments = cfgRedisPrefix + constants.REDIS_KEY_DOCUMENTS;
var checkFileExpire = function() {
return co(function* () {
try {
@ -21,12 +32,17 @@ var checkFileExpire = function() {
expired = yield taskResult.getExpired(cfgExpFilesRemovedAtOnce, cfgExpFiles);
for (var i = 0; i < expired.length; ++i) {
var docId = expired[i].tr_key;
//todo drop user
var removeRes = yield taskResult.remove(docId);
//если ничего не удалилось, значит это сделал другой процесс
if (removeRes.affectedRows > 0) {
currentRemovedCount++;
yield storage.deletePath(docId);
//проверяем что никто не сидит в документе
var hvals = yield docsCoServer.getAllPresencePromise(docId);
if(0 == hvals.length){
var removeRes = yield taskResult.remove(docId);
//если ничего не удалилось, значит это сделал другой процесс
if (removeRes.affectedRows > 0) {
currentRemovedCount++;
yield storage.deletePath(docId);
}
} else {
logger.debug('checkFileExpire expire but presence: hvals = %s; docId = %s', hvals, docId);
}
}
removedCount += currentRemovedCount;
@ -37,4 +53,54 @@ var checkFileExpire = function() {
}
});
};
exports.checkFileExpire = checkFileExpire;
var checkDocumentExpire = function() {
return co(function* () {
try {
logger.debug('checkDocumentExpire start');
var removedCount = 0;
var startSaveCount = 0;
var redisClient = pubsubRedis.getClientRedis();
var pubsub = new pubsubService();
yield pubsub.initPromise();
//inner ping to update presence
pubsub.publish(JSON.stringify({type: commonDefines.c_oPublishType.expireDoc}));
var now = (new Date()).getTime();
var multi = redisClient.multi([
['zrangebyscore', redisKeyDocuments, 0, now],
['zremrangebyscore', redisKeyDocuments, 0, now]
]);
var execRes = yield utils.promiseRedis(multi, multi.exec);
var expiredKeys = execRes[0];
if (expiredKeys.length > 0) {
var queue = new queueService();
yield queue.initPromise(true, false, false, false);
for (var i = 0; i < expiredKeys.length; ++i) {
var docId = expiredKeys[i];
if (docId) {
var puckerIndex = yield docsCoServer.getChangesIndexPromise(docId);
if (puckerIndex > 0) {
yield docsCoServer.createSaveTimerPromise(docId, null, queue, true);
startSaveCount++;
} else {
yield docsCoServer.cleanDocumentOnExitNoChangesPromise(docId);
removedCount++;
}
}
}
}
logger.debug('checkDocumentExpire end: startSaveCount = %d, removedCount = %d', startSaveCount, removedCount);
} catch (e) {
logger.error('checkDocumentExpire error:\r\n%s', e.stack);
}
});
};
var documentExpireJob = new cron.CronJob(cfgExpDocumentsCron, checkDocumentExpire);
documentExpireJob.start();
var fileExpireJob = new cron.CronJob(cfgExpFilesCron, checkFileExpire);
fileExpireJob.start();

View File

@ -61,6 +61,18 @@ util.inherits(PubsubRabbitMQ, events.EventEmitter);
PubsubRabbitMQ.prototype.init = function (callback) {
init(this, callback);
};
PubsubRabbitMQ.prototype.initPromise = function() {
var t = this;
return new Promise(function(resolve, reject) {
init(t, function(err) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
};
PubsubRabbitMQ.prototype.publish = function (message) {
var data = new Buffer(message);
if (null != this.channelPublish) {

View File

@ -3,13 +3,14 @@ var config = require('config').get('services.CoAuthoring.redis');
var events = require('events');
var util = require('util');
var logger = require('./../../Common/sources/logger');
var constants = require('./../../Common/sources/constants');
var redis = require(config.get('name'));
var cfgRedisPrefix = config.get('prefix');
var cfgRedisHost = config.get('host');
var cfgRedisPort = config.get('port');
var channelName = cfgRedisPrefix + 'pubsub';
var channelName = cfgRedisPrefix + constants.REDIS_KEY_PUBSUB;
function createClientRedis() {
var redisClient = redis.createClient(cfgRedisPort, cfgRedisHost, {});
@ -18,6 +19,13 @@ function createClientRedis() {
});
return redisClient;
}
var g_redisClient = null;
function getClientRedis() {
if (!g_redisClient) {
g_redisClient = createClientRedis();
}
return g_redisClient;
}
function PubsubRedis() {
this.clientPublish = null;
@ -34,9 +42,21 @@ PubsubRedis.prototype.init = function(callback) {
});
callback(null);
};
PubsubRedis.prototype.initPromise = function() {
var t = this;
return new Promise(function(resolve, reject) {
t.init(function(err) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
};
PubsubRedis.prototype.publish = function(data) {
this.clientPublish.publish(channelName, data);
};
module.exports = PubsubRedis;
module.exports.createClientRedis = createClientRedis;
module.exports.getClientRedis = getClientRedis;