From a83889c4d8a239186df5d75b05757f595f791173 Mon Sep 17 00:00:00 2001 From: "Sergey.Konovalov" Date: Fri, 18 Mar 2016 14:12:49 +0000 Subject: [PATCH] merge from TeamlabOffice_v3.7_ServerExpire git-svn-id: svn://192.168.3.15/activex/AVS/Sources/TeamlabOffice/trunk/nodeJSProjects@68971 954022d7-b5bf-4e40-9824-e11837661b57 --- Common/config/default.json | 10 +- Common/sources/commondefines.js | 14 + Common/sources/constants.js | 16 +- Common/sources/taskqueueRabbitMQ.js | 12 + DocService/sources/DocsCoServer.js | 373 +++++++++++++-------------- DocService/sources/canvasservice.js | 2 +- DocService/sources/checkexpire.js | 82 +++++- DocService/sources/pubsubRabbitMQ.js | 12 + DocService/sources/pubsubRedis.js | 24 +- 9 files changed, 339 insertions(+), 206 deletions(-) diff --git a/Common/config/default.json b/Common/config/default.json index dd74a81c..71a4aa39 100644 --- a/Common/config/default.json +++ b/Common/config/default.json @@ -84,10 +84,9 @@ "maxChanges": 1000 }, "expire": { - "callback": 3600, "userindex": 604800, "saveLock": 60, - "editors": 604800, + "presence": 300, "locks": 604800, "changeindex": 86400, "lockDoc": 60, @@ -95,10 +94,9 @@ "lastsave": 604800, "forcesave": 604800, "saved": 3600, - "documents": 300, - "documentsCron": "0 */5 * * * *", - "files": 604800, - "filesCron": "00 00 */1 * * *", + "documentsCron": "0 */2 * * * *", + "files": 300, + "filesCron": "00 */5 * * * *", "filesremovedatonce": 10 } } diff --git a/Common/sources/commondefines.js b/Common/sources/commondefines.js index 46dd5b74..2a6bd68a 100644 --- a/Common/sources/commondefines.js +++ b/Common/sources/commondefines.js @@ -531,6 +531,19 @@ function OutputAction(type, userid) { this['type'] = type; this['userid'] = userid; } +var c_oPublishType = { + drop : 0, + releaseLock : 1, + participantsState : 2, + message : 3, + getLock : 4, + changes : 5, + auth : 6, + receiveTask : 7, + warning: 8, + cursor: 9, + expireDoc: 10 +}; var c_oAscCsvDelimiter = { None: 0, Tab: 1, @@ -632,6 +645,7 @@ exports.InputCommand = InputCommand; exports.OutputSfcData = OutputSfcData; exports.OutputMailMerge = OutputMailMerge; exports.OutputAction = OutputAction; +exports.c_oPublishType = c_oPublishType; exports.c_oAscCsvDelimiter = c_oAscCsvDelimiter; exports.c_oAscEncodings = c_oAscEncodings; exports.c_oAscEncodingsMap = c_oAscEncodingsMap; diff --git a/Common/sources/constants.js b/Common/sources/constants.js index 78362c3a..3b60f2fd 100644 --- a/Common/sources/constants.js +++ b/Common/sources/constants.js @@ -118,4 +118,18 @@ exports.QUEUE_PRIORITY_RESPONSE = 3; exports.EDITOR_TYPE_WORD = 0; exports.EDITOR_TYPE_SPREADSHEET = 1; exports.EDITOR_TYPE_PRESENTATION = 2; -exports.EDITOR_TYPE_CONVERTATION = 3; \ No newline at end of file +exports.EDITOR_TYPE_CONVERTATION = 3; + +exports.REDIS_KEY_PUBSUB = 'pubsub'; +exports.REDIS_KEY_USER_INDEX = 'userindex:'; +exports.REDIS_KEY_SAVE_LOCK = 'savelock:'; +exports.REDIS_KEY_PRESENCE_HASH = 'presence:hash:'; +exports.REDIS_KEY_PRESENCE_SET = 'presence:set:'; +exports.REDIS_KEY_LOCKS = 'locks:'; +exports.REDIS_KEY_CHANGES_INDEX = 'changesindex:'; +exports.REDIS_KEY_LOCK_DOCUMENT = 'lockdocument:'; +exports.REDIS_KEY_MESSAGE = 'message:'; +exports.REDIS_KEY_DOCUMENTS = 'documents'; +exports.REDIS_KEY_LAST_SAVE = 'lastsave:'; +exports.REDIS_KEY_FORCE_SAVE = 'forcesave:'; +exports.REDIS_KEY_SAVED = 'saved:'; diff --git a/Common/sources/taskqueueRabbitMQ.js b/Common/sources/taskqueueRabbitMQ.js index 7b83e2bb..f4f5482c 100644 --- a/Common/sources/taskqueueRabbitMQ.js +++ b/Common/sources/taskqueueRabbitMQ.js @@ -133,6 +133,18 @@ util.inherits(TaskQueueRabbitMQ, events.EventEmitter); TaskQueueRabbitMQ.prototype.init = function (isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, callback) { init(this, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, callback); }; +TaskQueueRabbitMQ.prototype.initPromise = function(isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive) { + var t = this; + return new Promise(function(resolve, reject) { + init(t, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, function(err) { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +}; TaskQueueRabbitMQ.prototype.addTask = function (task, priority) { //todo confirmation mode var t = this; diff --git a/DocService/sources/DocsCoServer.js b/DocService/sources/DocsCoServer.js index eaf89941..9fd97536 100644 --- a/DocService/sources/DocsCoServer.js +++ b/DocService/sources/DocsCoServer.js @@ -44,7 +44,6 @@ var _ = require('underscore'); var https = require('https'); var http = require('http'); var url = require('url'); -var cron = require('cron'); var co = require('co'); var storage = require('./../../Common/sources/storage-base'); var logger = require('./../../Common/sources/logger'); @@ -56,9 +55,9 @@ var config = require('config').get('services.CoAuthoring'); var sqlBase = require('./baseConnector'); var canvasService = require('./canvasservice'); var converterService = require('./converterservice'); -var checkExpire = require('./checkexpire'); var taskResult = require('./taskresult'); var redis = require(config.get('redis.name')); +var pubsubRedis = require('./pubsubRedis'); var pubsubService = require('./' + config.get('pubsub.name')); var queueService = require('./../../Common/sources/taskqueueRabbitMQ'); var cfgSpellcheckerUrl = config.get('server.editor_settings_spellchecker_url'); @@ -67,12 +66,9 @@ var cfgCallbackRequestTimeout = config.get('server.callbackRequestTimeout'); var cfgPubSubMaxChanges = config.get('pubsub.maxChanges'); var cfgRedisPrefix = config.get('redis.prefix'); -var cfgRedisHost = config.get('redis.host'); -var cfgRedisPort = config.get('redis.port'); -var cfgExpCallback = config.get('expire.callback'); var cfgExpUserIndex = config.get('expire.userindex'); var cfgExpSaveLock = config.get('expire.saveLock'); -var cfgExpEditors = config.get('expire.editors'); +var cfgExpPresence = config.get('expire.presence'); var cfgExpLocks = config.get('expire.locks'); var cfgExpChangeIndex = config.get('expire.changeindex'); var cfgExpLockDoc = config.get('expire.lockDoc'); @@ -80,37 +76,20 @@ var cfgExpMessage = config.get('expire.message'); var cfgExpLastSave = config.get('expire.lastsave'); var cfgExpForceSave = config.get('expire.forcesave'); var cfgExpSaved = config.get('expire.saved'); -var cfgExpDocuments = config.get('expire.documents'); -var cfgExpDocumentsCron = config.get('expire.documentsCron'); -var cfgExpFiles = config.get('expire.files'); -var cfgExpFilesCron = config.get('expire.filesCron'); var cfgSockjsUrl = config.get('server.sockjsUrl'); -var redisKeyCallback = cfgRedisPrefix + 'callback:'; -var redisKeyUserIndex = cfgRedisPrefix + 'userindex:'; -var redisKeySaveLock = cfgRedisPrefix + 'savelock:'; -var redisKeyEditors = cfgRedisPrefix + 'editors:'; -var redisKeyLocks = cfgRedisPrefix + 'locks:'; -var redisKeyChangeIndex = cfgRedisPrefix + 'changesindex:'; -var redisKeyLockDoc = cfgRedisPrefix + 'lockdocument:'; -var redisKeyMessage = cfgRedisPrefix + 'message:'; -var redisKeyDocuments = cfgRedisPrefix + 'documents:'; -var redisKeyLastSave = cfgRedisPrefix + 'lastsave:'; -var redisKeyForceSave = cfgRedisPrefix + 'forcesave:'; -var redisKeySaved = cfgRedisPrefix + 'saved:'; - -var PublishType = { - drop : 0, - releaseLock : 1, - participantsState : 2, - message : 3, - getLock : 4, - changes : 5, - auth : 6, - receiveTask : 7, - warning: 8, - cursor: 9 -}; +var redisKeyUserIndex = cfgRedisPrefix + constants.REDIS_KEY_USER_INDEX; +var redisKeySaveLock = cfgRedisPrefix + constants.REDIS_KEY_SAVE_LOCK; +var redisKeyPresenceHash = cfgRedisPrefix + constants.REDIS_KEY_PRESENCE_HASH; +var redisKeyPresenceSet = cfgRedisPrefix + constants.REDIS_KEY_PRESENCE_SET; +var redisKeyLocks = cfgRedisPrefix + constants.REDIS_KEY_LOCKS; +var redisKeyChangeIndex = cfgRedisPrefix + constants.REDIS_KEY_CHANGES_INDEX; +var redisKeyLockDoc = cfgRedisPrefix + constants.REDIS_KEY_LOCK_DOCUMENT; +var redisKeyMessage = cfgRedisPrefix + constants.REDIS_KEY_MESSAGE; +var redisKeyDocuments = cfgRedisPrefix + constants.REDIS_KEY_DOCUMENTS; +var redisKeyLastSave = cfgRedisPrefix + constants.REDIS_KEY_LAST_SAVE; +var redisKeyForceSave = cfgRedisPrefix + constants.REDIS_KEY_FORCE_SAVE; +var redisKeySaved = cfgRedisPrefix + constants.REDIS_KEY_SAVED; var EditorTypes = { document : 0, @@ -120,7 +99,7 @@ var EditorTypes = { var defaultHttpPort = 80, defaultHttpsPort = 443; // Порты по умолчанию (для http и https) var connections = []; // Активные соединения -var redisClient; +var redisClient = pubsubRedis.getClientRedis(); var pubsub; var queue; var clientStatsD = statsDClient.getClient(); @@ -411,28 +390,76 @@ function getParticipantUser(docId, includeUserId) { return el.docId === docId && el.user.id === includeUserId; }); } -function* hasEditors(docId, optEditors) { - var elem, hasEditors = false; - if (!optEditors) { - optEditors = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + docId); +function updatePresenceCommandsToArray(outCommands, docId, userId, userInfo) { + var expireAt = new Date().getTime() + cfgExpPresence * 1000; + outCommands.push( + ['zadd', redisKeyPresenceSet + docId, expireAt, userId], + ['hset', redisKeyPresenceHash + docId, userId, userInfo], + ['expire', redisKeyPresenceSet + docId, cfgExpPresence], + ['expire', redisKeyPresenceHash + docId, cfgExpPresence] + ); +} +function* updatePresence(docId, userId, userInfo) { + var commands = []; + updatePresenceCommandsToArray(commands, docId, userId, userInfo); + var expireAt = new Date().getTime() + cfgExpPresence * 1000; + commands.push(['zadd', redisKeyDocuments, expireAt, docId]); + var multi = redisClient.multi(commands); + yield utils.promiseRedis(multi, multi.exec); +} +function* getAllPresence(docId, optZRange, optHVals) { + var now = (new Date()).getTime(); + var expiredKeys; + var hvals; + if (optHVals && optZRange) { + expiredKeys = optZRange; + hvals = optHVals; + } else { + var multi = redisClient.multi([ + ['zrangebyscore', redisKeyPresenceSet + docId, 0, now], + ['hvals', redisKeyPresenceHash + docId] + ]); + var multiRes = yield utils.promiseRedis(multi, multi.exec); + expiredKeys = multiRes[0]; + hvals = multiRes[1]; } - for (var i = 0; i < optEditors.length; ++i) { - elem = JSON.parse(optEditors[i]); + if (expiredKeys.length > 0) { + var multi = [ + ['zremrangebyscore', redisKeyPresenceSet + docId, 0, now] + ]; + var expiredKeysMap = {}; + for (var i = 0; i < expiredKeys.length; ++i) { + var expiredKey = expiredKeys[i]; + expiredKeysMap[expiredKey] = 1; + commands.push(['hdel', redisKeyPresenceHash + docId, expiredKey]); + } + multi = redisClient.multi(commands); + yield utils.promiseRedis(multi, multi.exec); + hvals = hvals.filter(function(curValue) { + return null == expiredKeysMap[curValue]; + }) + } + return hvals; +} +function* hasEditors(docId, optZRange, optHVals) { + var elem, hasEditors = false; + var hvals = yield* getAllPresence(docId, optZRange, optHVals); + for (var i = 0; i < hvals.length; ++i) { + elem = JSON.parse(hvals[i]); if(!elem.view) { hasEditors = true; break; } } - return hasEditors; } function* publish(data, optDocId, optUserId) { var needPublish = true; if(optDocId && optUserId) { needPublish = false; - var hvalsRes = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + optDocId); - for (var i = 0; i < hvalsRes.length; ++i) { - var elem = JSON.parse(hvalsRes[i]); + var hvals = yield* getAllPresence(optDocId); + for (var i = 0; i < hvals.length; ++i) { + var elem = JSON.parse(hvals[i]); if(optUserId != elem.id) { needPublish = true; break; @@ -444,8 +471,9 @@ function* publish(data, optDocId, optUserId) { pubsub.publish(msg); } } -function* addTask(data, priority) { - yield queue.addTask(data, priority); +function* addTask(data, priority, opt_queue) { + var realQueue = opt_queue ? opt_queue : queue; + yield realQueue.addTask(data, priority); } function* removeResponse(data) { yield queue.removeResponse(data); @@ -453,9 +481,9 @@ function* removeResponse(data) { function* getOriginalParticipantsId(docId) { var result = [], tmpObject = {}; - var hvalsRes = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + docId); - for (var i = 0; i < hvalsRes.length; ++i) { - var elem = JSON.parse(hvalsRes[i]); + var hvals = yield* getAllPresence(docId); + for (var i = 0; i < hvals.length; ++i) { + var elem = JSON.parse(hvals[i]); if (!elem.view) { tmpObject[elem.idOriginal] = 1; } @@ -500,28 +528,19 @@ function parseUrl(callbackUrl) { function* deleteCallback(id) { // Нужно удалить из базы callback-ов - yield utils.promiseRedis(redisClient, redisClient.del, redisKeyCallback + id); yield sqlBase.deleteCallbackPromise(id); } function* getCallback(id) { var callbackUrl = null; var baseUrl = null; - var data = yield utils.promiseRedis(redisClient, redisClient.get, redisKeyCallback + id); - if (data) { - var dataParsed = JSON.parse(data); - callbackUrl = dataParsed.callbackUrl; - baseUrl = dataParsed.baseUrl; - } - else { - var selectRes = yield sqlBase.getCallbackPromise(id); - if (selectRes.length > 0) { - var row = selectRes[0]; - if (row.dc_callback) { - callbackUrl = row.dc_callback; - } - if (row.dc_baseurl) { - baseUrl = row.dc_baseurl; - } + var selectRes = yield sqlBase.getCallbackPromise(id); + if (selectRes.length > 0) { + var row = selectRes[0]; + if (row.dc_callback) { + callbackUrl = row.dc_callback; + } + if (row.dc_baseurl) { + baseUrl = row.dc_baseurl; } } if (null != callbackUrl && null != baseUrl) { @@ -532,8 +551,6 @@ function* getCallback(id) { } function* addCallback(id, href, baseUrl) { yield sqlBase.insertInTablePromise(sqlBase.tableId.callbacks, null, id, href, baseUrl); - yield utils.promiseRedis(redisClient, redisClient.setex, redisKeyCallback + id, cfgExpCallback, - JSON.stringify({callbackUrl: href, baseUrl: baseUrl})); } function* getChangesIndex(docId) { var res = 0; @@ -634,7 +651,7 @@ function* onReplySendStatusDocument(docId, replyData) { var oData = parseReplyData(docId, replyData); if (!(oData && commonDefines.c_oAscServerCommandErrors.NoError == oData.error)) { // Ошибка подписки на callback, посылаем warning - yield* publish({type: PublishType.warning, docId: docId, description: 'Error on save server subscription!'}); + yield* publish({type: commonDefines.c_oPublishType.warning, docId: docId, description: 'Error on save server subscription!'}); } } function* dropUsersFromDocument(docId, replyData) { @@ -642,7 +659,7 @@ function* dropUsersFromDocument(docId, replyData) { if (oData) { users = Array.isArray(oData) ? oData : oData.users; if (Array.isArray(users)) { - yield* publish({type: PublishType.drop, docId: docId, users: users, description: ''}); + yield* publish({type: commonDefines.c_oPublishType.drop, docId: docId, users: users, description: ''}); } } } @@ -685,7 +702,7 @@ function* bindEvents(docId, callback, baseUrl, opt_userAction) { function* cleanDocumentOnExit(docId, deleteChanges, deleteUserIndex) { //clean redis - var redisArgs = [redisClient, redisClient.del, redisKeyLocks + docId, redisKeyEditors + docId, + var redisArgs = [redisClient, redisClient.del, redisKeyLocks + docId, redisKeyPresenceSet + docId, redisKeyPresenceHash + docId, redisKeyMessage + docId, redisKeyChangeIndex + docId, redisKeyForceSave + docId, redisKeyLastSave + docId]; if (deleteUserIndex) { redisArgs.push(redisKeyUserIndex + docId); @@ -707,19 +724,48 @@ function* cleanDocumentOnExitNoChanges(docId, opt_userId) { yield* cleanDocumentOnExit(docId, false, false); } +function* _createSaveTimer(docId, opt_userId, opt_queue, opt_noDelay) { + var updateMask = new taskResult.TaskResultData(); + updateMask.key = docId; + updateMask.status = taskResult.FileStatus.Ok; + var updateTask = new taskResult.TaskResultData(); + updateTask.status = taskResult.FileStatus.SaveVersion; + updateTask.statusInfo = utils.getMillisecondsOfHour(new Date()); + var updateIfRes = yield taskResult.updateIf(updateTask, updateMask); + if (updateIfRes.affectedRows > 0) { + if(!opt_noDelay){ + yield utils.sleep(c_oAscSaveTimeOutDelay); + } + while (true) { + if (!sqlBase.isLockCriticalSection(docId)) { + canvasService.saveFromChanges(docId, updateTask.statusInfo, null, opt_userId, opt_queue); + break; + } + yield utils.sleep(c_oAscLockTimeOutDelay); + } + } else { + //если не получилось - значит FileStatus=SaveVersion(кто-то другой начал сборку) или UpdateVersion(сборка закончена) + //в этом случае ничего делать не надо + logger.debug('_createSaveTimer updateIf no effect'); + } +} + exports.version = asc_coAuthV; exports.c_oAscServerStatus = c_oAscServerStatus; exports.sendData = sendData; exports.parseUrl = parseUrl; exports.parseReplyData = parseReplyData; exports.sendServerRequest = sendServerRequest; -exports.PublishType = PublishType; +exports.createSaveTimerPromise = co.wrap(_createSaveTimer); +exports.getAllPresencePromise = co.wrap(getAllPresence); exports.publish = publish; exports.addTask = addTask; exports.removeResponse = removeResponse; exports.hasEditors = hasEditors; exports.getCallback = getCallback; -exports.cleanDocumentOnExit = cleanDocumentOnExit; +exports.getChangesIndexPromise = co.wrap(getChangesIndex); +exports.cleanDocumentOnExitPromise = co.wrap(cleanDocumentOnExit); +exports.cleanDocumentOnExitNoChangesPromise = co.wrap(cleanDocumentOnExitNoChanges); exports.setForceSave= setForceSave; exports.install = function(server, callbackFunction) { 'use strict'; @@ -773,15 +819,15 @@ exports.install = function(server, callbackFunction) { case 'unLockDocument' : yield* checkEndAuthLock(data.isSave, docId, conn.user.id, conn); break; - case 'ping': - yield utils.promiseRedis(redisClient, redisClient.zadd, redisKeyDocuments, new Date().getTime(), docId); - break; case 'close': yield* closeDocument(conn, false); break; case 'openDocument' : canvasService.openDocument(conn, data); break; + default: + logger.debug("unknown command %s", message); + break; } if(clientStatsD) { if('openDocument' != data.type) { @@ -844,7 +890,7 @@ exports.install = function(server, callbackFunction) { var state = (false == reconnected) ? false : undefined; var tmpUser = conn.user; - yield* publish({type: PublishType.participantsState, docId: docId, user: tmpUser, state: state}, docId, tmpUser.id); + yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, user: tmpUser, state: state}, docId, tmpUser.id); if (!reconnected) { // Для данного пользователя снимаем лок с сохранения @@ -852,15 +898,17 @@ exports.install = function(server, callbackFunction) { if (conn.user.id == saveLock) { yield utils.promiseRedis(redisClient, redisClient.del, redisKeySaveLock + docId); } - + var commands = [ + ['hdel', redisKeyPresenceHash + docId, tmpUser.id], + ['zrem', redisKeyPresenceSet + docId, tmpUser.id] + ]; // Только если редактируем if (false === tmpUser.view) { - var multi = redisClient.multi([ - ['hdel', redisKeyEditors + docId, tmpUser.id], - ['hvals', redisKeyEditors + docId] - ]); + commands.push(['zrangebyscore', redisKeyPresenceSet + docId, 0, (new Date()).getTime()], + ['hvals', redisKeyPresenceHash + docId]); + var multi = redisClient.multi(commands); var execRes = yield utils.promiseRedis(multi, multi.exec); - bHasEditors = yield* hasEditors(docId, execRes[1]); + bHasEditors = yield* hasEditors(docId, execRes[2], execRes[3]); var puckerIndex = yield* getChangesIndex(docId); bHasChanges = puckerIndex > 0; @@ -887,13 +935,14 @@ exports.install = function(server, callbackFunction) { if (0 < userLocks.length) { //todo на close себе ничего не шлем //sendReleaseLock(conn, userLocks); - yield* publish({type: PublishType.releaseLock, docId: docId, userId: conn.user.id, locks: userLocks}, docId, conn.user.id); + yield* publish({type: commonDefines.c_oPublishType.releaseLock, docId: docId, userId: conn.user.id, locks: userLocks}, docId, conn.user.id); } // Для данного пользователя снимаем Lock с документа yield* checkEndAuthLock(false, docId, conn.user.id); } else { - yield utils.promiseRedis(redisClient, redisClient.hdel, redisKeyEditors + docId, tmpUser.id); + var multi = redisClient.multi(commands); + yield utils.promiseRedis(multi, multi.exec); } } } @@ -954,9 +1003,9 @@ exports.install = function(server, callbackFunction) { function* getParticipantMap(docId) { var participantsMap = []; - var hvalsRes = yield utils.promiseRedis(redisClient, redisClient.hvals, redisKeyEditors + docId); - for (var i = 0; i < hvalsRes.length; ++i) { - participantsMap.push(JSON.parse(hvalsRes[i])); + var hvals = yield* getAllPresence(docId); + for (var i = 0; i < hvals.length; ++i) { + participantsMap.push(JSON.parse(hvals[i])); } return participantsMap; } @@ -968,7 +1017,7 @@ exports.install = function(server, callbackFunction) { yield utils.promiseRedis(redisClient, redisClient.del, redisKeyLockDoc + docId); var participantsMap = yield* getParticipantMap(docId); - yield* publish({type: PublishType.auth, docId: docId, userId: userId, participantsMap: participantsMap}, docId, userId); + yield* publish({type: commonDefines.c_oPublishType.auth, docId: docId, userId: userId, participantsMap: participantsMap}, docId, userId); result = true; } else if (isSave) { @@ -976,7 +1025,7 @@ exports.install = function(server, callbackFunction) { var userLocks = yield* getUserLocks(docId, currentConnection.sessionId); if (0 < userLocks.length) { sendReleaseLock(currentConnection, userLocks); - yield* publish({type: PublishType.releaseLock, docId: docId, userId: userId, locks: userLocks}, docId, userId); + yield* publish({type: commonDefines.c_oPublishType.releaseLock, docId: docId, userId: userId, locks: userLocks}, docId, userId); } // Автоматически снимаем lock сами @@ -1288,14 +1337,9 @@ exports.install = function(server, callbackFunction) { function* endAuth(conn, bIsRestore, documentCallbackUrl) { var docId = conn.docId; - connections.push(conn); var tmpUser = conn.user; - var multi = redisClient.multi([ - ['hset', redisKeyEditors + docId, tmpUser.id, JSON.stringify(tmpUser)], - ['expire', redisKeyEditors + docId, cfgExpEditors] - ]); - yield utils.promiseRedis(multi, multi.exec); - yield utils.promiseRedis(redisClient, redisClient.zadd, redisKeyDocuments, new Date().getTime(), docId); + connections.push(conn); + yield* updatePresence(docId, tmpUser.id, JSON.stringify(tmpUser)); var firstParticipantNoView, countNoView = 0; var participantsMap = yield* getParticipantMap(docId); for (var i = 0; i < participantsMap.length; ++i) { @@ -1350,7 +1394,7 @@ exports.install = function(server, callbackFunction) { yield* sendAuthInfo(objChangesDocument.arrChanges, objChangesDocument.getLength(), conn, participantsMap); } } - yield* publish({type: PublishType.participantsState, docId: docId, user: tmpUser, state: true}, docId, tmpUser.id); + yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, user: tmpUser, state: true}, docId, tmpUser.id); } function* sendAuthInfo(objChangesDocument, changesIndex, conn, participantsMap) { @@ -1403,7 +1447,7 @@ exports.install = function(server, callbackFunction) { var messages = [msg]; sendDataMessage(conn, messages); - yield* publish({type: PublishType.message, docId: docId, userId: userId, messages: messages}, docId, userId); + yield* publish({type: commonDefines.c_oPublishType.message, docId: docId, userId: userId, messages: messages}, docId, userId); } function* onCursor(conn, data) { @@ -1414,7 +1458,7 @@ exports.install = function(server, callbackFunction) { logger.info("send cursor: docId = %s %s", docId, msg); var messages = [msg]; - yield* publish({type: PublishType.cursor, docId: docId, userId: userId, messages: messages}, docId, userId); + yield* publish({type: commonDefines.c_oPublishType.cursor, docId: docId, userId: userId, messages: messages}, docId, userId); } function* getLock(conn, data, bIsRestore) { @@ -1457,7 +1501,7 @@ exports.install = function(server, callbackFunction) { } //тому кто зделал запрос возвращаем максимально быстро sendData(conn, {type: "getLock", locks: documentLocks}); - yield* publish({type: PublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId); + yield* publish({type: commonDefines.c_oPublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId); return true; } @@ -1482,7 +1526,7 @@ exports.install = function(server, callbackFunction) { } //тому кто зделал запрос возвращаем максимально быстро sendData(conn, {type: "getLock", locks: documentLocks}); - yield* publish({type: PublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId); + yield* publish({type: commonDefines.c_oPublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId); return true; } @@ -1507,7 +1551,7 @@ exports.install = function(server, callbackFunction) { } //тому кто зделал запрос возвращаем максимально быстро sendData(conn, {type: "getLock", locks: documentLocks}); - yield* publish({type: PublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId); + yield* publish({type: commonDefines.c_oPublishType.getLock, docId: docId, userId: userId, documentLocks: documentLocks}, docId, userId); return true; } @@ -1599,7 +1643,7 @@ exports.install = function(server, callbackFunction) { if(changesToSend.length > cfgPubSubMaxChanges) { changesToSend = null; } - yield* publish({type: PublishType.changes, docId: docId, userId: userId, + yield* publish({type: commonDefines.c_oPublishType.changes, docId: docId, userId: userId, changes: changesToSend, startIndex: startIndex, changesIndex: puckerIndex, locks: arrLocks, excelAdditionalInfo: data.excelAdditionalInfo}, docId, userId); } @@ -1612,7 +1656,7 @@ exports.install = function(server, callbackFunction) { if(changesToSend.length > cfgPubSubMaxChanges) { changesToSend = null; } - yield* publish({type: PublishType.changes, docId: docId, userId: userId, + yield* publish({type: commonDefines.c_oPublishType.changes, docId: docId, userId: userId, changes: changesToSend, startIndex: startIndex, changesIndex: puckerIndex, locks: [], excelAdditionalInfo: undefined}, docId, userId); sendData(conn, {type: 'savePartChanges', changesIndex: changesIndex}); @@ -1767,30 +1811,6 @@ exports.install = function(server, callbackFunction) { return {res: !isLock, documentLocks: documentLocks}; } - function* _createSaveTimer(docId, opt_userId) { - var updateMask = new taskResult.TaskResultData(); - updateMask.key = docId; - updateMask.status = taskResult.FileStatus.Ok; - var updateTask = new taskResult.TaskResultData(); - updateTask.status = taskResult.FileStatus.SaveVersion; - updateTask.statusInfo = utils.getMillisecondsOfHour(new Date()); - var updateIfRes = yield taskResult.updateIf(updateTask, updateMask); - if (updateIfRes.affectedRows > 0) { - yield utils.sleep(c_oAscSaveTimeOutDelay); - while (true) { - if (!sqlBase.isLockCriticalSection(docId)) { - canvasService.saveFromChanges(docId, updateTask.statusInfo, null, opt_userId); - break; - } - yield utils.sleep(c_oAscLockTimeOutDelay); - } - } else { - //если не получилось - значит FileStatus=SaveVersion(кто-то другой начал сборку) или UpdateVersion(сборка закончена) - //в этом случае ничего делать не надо - logger.debug('_createSaveTimer updateIf no effect'); - } - } - function _checkLicense(conn) { sendData(conn, {type: 'license', license: licenseInfo}); } @@ -1800,51 +1820,6 @@ exports.install = function(server, callbackFunction) { logger.info(message); }}); - var checkDocumentExpire = function() { - return co(function*() { - try { - logger.debug('checkDocumentExpire start'); - var removedCount = 0; - var dateExpire = new Date(); - utils.addSeconds(dateExpire, -cfgExpDocuments); - var expireDocs = yield utils.promiseRedis(redisClient, redisClient.zrangebyscore, redisKeyDocuments, '-inf', dateExpire.getTime()); - for (var i = 0; i < expireDocs.length; ++i) { - var docId = expireDocs[i]; - var numDelete = yield utils.promiseRedis(redisClient, redisClient.zrem, redisKeyDocuments, docId); - //если numDelete == 0, значит этот ключ удалил другой процесс - if (numDelete > 0) { - removedCount++; - var puckerIndex = yield* getChangesIndex(docId); - if (puckerIndex > 0) { - logger.debug('checkDocumentExpire commit %d changes: docId = %s', puckerIndex, docId); - yield* _createSaveTimer(docId); - } else { - logger.debug('checkDocumentExpire no changes: docId = %s', docId); - yield* cleanDocumentOnExitNoChanges(docId); - } - } - } - logger.debug('checkDocumentExpire end: removedCount = %d', removedCount); - } catch (e) { - logger.error('checkDocumentExpire error:\r\n%s', e.stack); - } - }); - }; - //удаление файлов от которых не приходит heartbeat - var documentExpireJob = new cron.CronJob(cfgExpDocumentsCron, checkDocumentExpire); - documentExpireJob.start(); - var fileExpireJob = new cron.CronJob(cfgExpFilesCron, checkExpire.checkFileExpire); - fileExpireJob.start(); - - //cache - redisClient = redis.createClient(cfgRedisPort, cfgRedisHost, {}); - redisClient.on('error', function(err) { - logger.error('redisClient error:\r\n%s', err.stack); - }); -// redisClient.on("connect", function () { -// logger.debug('redisClient connect'); -// }); - //publish subscribe message brocker function pubsubOnMessage(msg) { return co(function* () { @@ -1856,32 +1831,32 @@ exports.install = function(server, callbackFunction) { var objChangesDocument; var i; switch (data.type) { - case PublishType.drop: + case commonDefines.c_oPublishType.drop: for (i = 0; i < data.users.length; ++i) { dropUserFromDocument(data.docId, data.users[i], data.description); } break; - case PublishType.releaseLock: + case commonDefines.c_oPublishType.releaseLock: participants = getParticipants(true, data.docId, data.userId, true); _.each(participants, function(participant) { sendReleaseLock(participant, data.locks); }); break; - case PublishType.participantsState: + case commonDefines.c_oPublishType.participantsState: participants = getParticipants(true, data.docId, data.user.id); sendParticipantsState(participants, data); break; - case PublishType.message: + case commonDefines.c_oPublishType.message: participants = getParticipants(true, data.docId, data.userId); _.each(participants, function(participant) { sendDataMessage(participant, data.messages); }); break; - case PublishType.getLock: + case commonDefines.c_oPublishType.getLock: participants = getParticipants(true, data.docId, data.userId, true); sendGetLock(participants, data.documentLocks); break; - case PublishType.changes: + case commonDefines.c_oPublishType.changes: participants = getParticipants(true, data.docId, data.userId, true); if(participants.length > 0) { var changes = data.changes; @@ -1895,7 +1870,7 @@ exports.install = function(server, callbackFunction) { }); } break; - case PublishType.auth: + case commonDefines.c_oPublishType.auth: participants = getParticipants(true, data.docId, data.userId, true); if(participants.length > 0) { objChangesDocument = yield* getDocumentChanges(data.docId); @@ -1905,7 +1880,7 @@ exports.install = function(server, callbackFunction) { } } break; - case PublishType.receiveTask: + case commonDefines.c_oPublishType.receiveTask: var cmd = new commonDefines.InputCommand(data.cmd); var output = new canvasService.OutputDataWrap(); output.fromObject(data.output); @@ -1937,21 +1912,43 @@ exports.install = function(server, callbackFunction) { sendData(participant, output); } break; - case PublishType.warning: + case commonDefines.c_oPublishType.warning: participants = getParticipants(false, data.docId); _.each(participants, function(participant) { sendDataWarning(participant, data.description); }); break; - case PublishType.cursor: + case commonDefines.c_oPublishType.cursor: participants = getParticipants(true, data.docId, data.userId); _.each(participants, function(participant) { sendDataCursor(participant, data.messages); }); break; + case commonDefines.c_oPublishType.expireDoc: + logger.debug('pubsub expireDoc connections.length = %d', connections.length); + var commands = []; + var idSet = new Set(); + for (i = 0; i < connections.length; ++i) { + var conn = connections[i]; + if (!conn.isCloseCoAuthoring) { + idSet.add(conn.docId); + updatePresenceCommandsToArray(commands, conn.docId, conn.user.id, JSON.stringify(conn.user)); + } + } + var expireAt = new Date().getTime() + cfgExpPresence * 1000; + idSet.forEach(function(value1, value2, set) { + commands.push(['zadd', redisKeyDocuments, expireAt, value1]); + }); + if (commands.length > 0) { + var multi = redisClient.multi(commands); + yield utils.promiseRedis(multi, multi.exec); + } + break; + default: + logger.debug('pubsub unknown message type:%s', msg); } } catch (err) { - logger.debug('pubsub message error:\r\n%s', err.stack); + logger.error('pubsub message error:\r\n%s', err.stack); } }); } @@ -1996,7 +1993,7 @@ exports.commandFromServer = function (req, res) { break; case 'drop': if (query.userid) { - yield* publish({type: PublishType.drop, docId: docId, users: [query.userid], description: query.description}); + yield* publish({type: commonDefines.c_oPublishType.drop, docId: docId, users: [query.userid], description: query.description}); } else if (query.users) { yield* dropUsersFromDocument(docId, query.users); diff --git a/DocService/sources/canvasservice.js b/DocService/sources/canvasservice.js index a82cc5a3..045d3133 100644 --- a/DocService/sources/canvasservice.js +++ b/DocService/sources/canvasservice.js @@ -1 +1 @@ -var pathModule = require('path'); var urlModule = require('url'); var co = require('co'); var sqlBase = require('./baseConnector'); var docsCoServer = require('./DocsCoServer'); var taskResult = require('./taskresult'); var logger = require('./../../Common/sources/logger'); var utils = require('./../../Common/sources/utils'); var constants = require('./../../Common/sources/constants'); var commonDefines = require('./../../Common/sources/commondefines'); var storage = require('./../../Common/sources/storage-base'); var formatChecker = require('./../../Common/sources/formatchecker'); var statsDClient = require('./../../Common/sources/statsdclient'); var config = require('config'); var config_server = config.get('services.CoAuthoring.server'); var config_utils = config.get('services.CoAuthoring.utils'); var pubsubRedis = require('./pubsubRedis'); var cfgTypesUpload = config_utils.get('limits_image_types_upload'); var cfgTypesCopy = config_utils.get('limits_image_types_copy'); var cfgImageSize = config_server.get('limits_image_size'); var cfgImageDownloadTimeout = config_server.get('limits_image_download_timeout'); var cfgRedisPrefix = config.get('services.CoAuthoring.redis.prefix'); var SAVE_TYPE_PART_START = 0; var SAVE_TYPE_PART = 1; var SAVE_TYPE_COMPLETE = 2; var SAVE_TYPE_COMPLETE_ALL = 3; var clientStatsD = statsDClient.getClient(); var redisClient = pubsubRedis.createClientRedis(); var redisKeySaved = cfgRedisPrefix + 'saved:'; function OutputDataWrap(type, data) { this['type'] = type; this['data'] = data; } OutputDataWrap.prototype = { fromObject: function(data) { this['type'] = data['type']; this['data'] = new OutputData(); this['data'].fromObject(data['data']); }, getType: function() { return this['type']; }, setType: function(data) { this['type'] = data; }, getData: function() { return this['data']; }, setData: function(data) { this['data'] = data; } }; function OutputData(type) { this['type'] = type; this['status'] = undefined; this['data'] = undefined; } OutputData.prototype = { fromObject: function(data) { this['type'] = data['type']; this['status'] = data['status']; this['data'] = data['data']; }, getType: function() { return this['type']; }, setType: function(data) { this['type'] = data; }, getStatus: function() { return this['status']; }, setStatus: function(data) { this['status'] = data; }, getData: function() { return this['data']; }, setData: function(data) { this['data'] = data; } }; function* getOutputData(cmd, outputData, key, status, statusInfo, optConn, optAdditionalOutput) { var docId = cmd.getDocId(); switch (status) { case taskResult.FileStatus.SaveVersion: case taskResult.FileStatus.UpdateVersion: case taskResult.FileStatus.Ok: if(taskResult.FileStatus.Ok == status) { outputData.setStatus('ok'); } else if(taskResult.FileStatus.SaveVersion == status) { if (optConn && optConn.user.view) { outputData.setStatus('updateversion'); } else { var updateMask = new taskResult.TaskResultData(); updateMask.key = docId; updateMask.status = status; updateMask.statusInfo = statusInfo; var updateTask = new taskResult.TaskResultData(); updateTask.status = taskResult.FileStatus.Ok; updateTask.statusInfo = constants.NO_ERROR; var updateIfRes = yield taskResult.updateIf(updateTask, updateMask); if (updateIfRes.affectedRows > 0) { outputData.setStatus('ok'); } else { outputData.setStatus('updateversion'); } } } else { outputData.setStatus('updateversion'); } var command = cmd.getCommand(); if ('open' != command && 'reopen' != command) { var strPath = key + '/' + cmd.getOutputPath(); if (optConn) { outputData.setData(yield storage.getSignedUrl(optConn.baseUrl, strPath, null, cmd.getTitle())); } else if (optAdditionalOutput) { optAdditionalOutput.needUrlKey = strPath; optAdditionalOutput.needUrlMethod = 2; } } else { if (optConn) { outputData.setData(yield storage.getSignedUrls(optConn.baseUrl, key)); } else if (optAdditionalOutput) { optAdditionalOutput.needUrlKey = key; optAdditionalOutput.needUrlMethod = 0; } } break; case taskResult.FileStatus.NeedParams: outputData.setStatus('needparams'); var settingsPath = key + '/' + 'settings.json'; if (optConn) { outputData.setData(yield storage.getSignedUrl(optConn.baseUrl, settingsPath)); } else if (optAdditionalOutput) { optAdditionalOutput.needUrlKey = settingsPath; optAdditionalOutput.needUrlMethod = 1; } break; case taskResult.FileStatus.Err: case taskResult.FileStatus.ErrToReload: outputData.setStatus('err'); outputData.setData(statusInfo); if (taskResult.FileStatus.ErrToReload == status) { yield taskResult.remove(key); } break; } } function* addRandomKeyTaskCmd(cmd) { var task = yield* taskResult.addRandomKeyTask(cmd.getDocId()); cmd.setSaveKey(task.key); } function* saveParts(cmd) { var result = false; var saveType = cmd.getSaveType(); var filename; if (SAVE_TYPE_COMPLETE_ALL === saveType) { filename = 'Editor.bin'; } else { filename = 'Editor' + (cmd.getSaveIndex() || '') + '.bin'; } if (SAVE_TYPE_PART_START === saveType || SAVE_TYPE_COMPLETE_ALL === saveType) { yield* addRandomKeyTaskCmd(cmd); } if (cmd.getUrl()) { result = true; } else { var buffer = cmd.getData(); yield storage.putObject(cmd.getSaveKey() + '/' + filename, buffer, buffer.length); //delete data to prevent serialize into json cmd.data = null; result = (SAVE_TYPE_COMPLETE_ALL === saveType || SAVE_TYPE_COMPLETE === saveType); } return result; } function getSaveTask(cmd) { cmd.setData(null); var queueData = new commonDefines.TaskQueueData(); queueData.setCmd(cmd); queueData.setToFile(constants.OUTPUT_NAME + '.' + formatChecker.getStringFromFormat(cmd.getOutputFormat())); //todo paid //if (cmd.vkey) { // bool // bPaid; // Signature.getVKeyParams(cmd.vkey, out bPaid); // oTaskQueueData.m_bPaid = bPaid; //} return queueData; } function getUpdateResponse(cmd) { var updateTask = new taskResult.TaskResultData(); updateTask.key = cmd.getSaveKey() ? cmd.getSaveKey() : cmd.getDocId(); var statusInfo = cmd.getStatusInfo(); if (constants.NO_ERROR == statusInfo) { updateTask.status = taskResult.FileStatus.Ok; } else if (constants.CONVERT_DOWNLOAD == statusInfo) { updateTask.status = taskResult.FileStatus.ErrToReload; } else if (constants.CONVERT_NEED_PARAMS == statusInfo) { updateTask.status = taskResult.FileStatus.NeedParams; } else { updateTask.status = taskResult.FileStatus.Err; } updateTask.statusInfo = statusInfo; if (cmd.getTitle()) { updateTask.title = cmd.getTitle(); } return updateTask; } function* commandOpen(conn, cmd, outputData) { var task = new taskResult.TaskResultData(); task.key = cmd.getDocId(); task.format = cmd.getFormat(); task.status = taskResult.FileStatus.WaitQueue; task.statusInfo = constants.NO_ERROR; task.title = cmd.getTitle(); var upsertRes = yield taskResult.upsert(task); //if CLIENT_FOUND_ROWS don't specify 1 row is inserted , 2 row is updated, and 0 row is set to its current values //http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html var bCreate = upsertRes.affectedRows == 1; if (!bCreate) { var selectRes = yield taskResult.select(task); if (selectRes.length > 0) { var row = selectRes[0]; yield* getOutputData(cmd, outputData, cmd.getDocId(), row.tr_status, row.tr_status_info, conn); } } else { //add task cmd.setOutputFormat(constants.AVS_OFFICESTUDIO_FILE_CANVAS); cmd.setEmbeddedFonts(false); var dataQueue = new commonDefines.TaskQueueData(); dataQueue.setCmd(cmd); dataQueue.setToFile('Editor.bin'); yield* docsCoServer.addTask(dataQueue, constants.QUEUE_PRIORITY_HIGH); } } function* commandReopen(cmd) { var task = new taskResult.TaskResultData(); task.key = cmd.getDocId(); task.status = taskResult.FileStatus.WaitQueue; task.statusInfo = constants.NO_ERROR; var upsertRes = yield taskResult.update(task); if (upsertRes.affectedRows > 0) { //add task cmd.setUrl(null);//url may expire cmd.setSaveKey(cmd.getDocId()); cmd.setOutputFormat(constants.AVS_OFFICESTUDIO_FILE_CANVAS); cmd.setEmbeddedFonts(false); var dataQueue = new commonDefines.TaskQueueData(); dataQueue.setCmd(cmd); dataQueue.setToFile('Editor.bin'); dataQueue.setFromSettings(true); yield* docsCoServer.addTask(dataQueue, constants.QUEUE_PRIORITY_HIGH); } } function* commandSave(cmd, outputData) { var completeParts = yield* saveParts(cmd); if (completeParts) { var queueData = getSaveTask(cmd); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); } outputData.setStatus('ok'); outputData.setData(cmd.getSaveKey()); } function* commandSendMailMerge(cmd, outputData) { var completeParts = yield* saveParts(cmd); var isErr = false; if (completeParts) { isErr = true; var getRes = yield* docsCoServer.getCallback(cmd.getDocId()); if (getRes) { var mailMergeSend = cmd.getMailMergeSend(); mailMergeSend.setUrl(getRes.server.href); mailMergeSend.setBaseUrl(getRes.baseUrl); //меняем JsonKey и SaveKey, новый key нужет потому что за одну конвертацию делается часть, а json нужен всегда mailMergeSend.setJsonKey(cmd.getSaveKey()); yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); isErr = false; } } if (isErr) { outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); } else { outputData.setStatus('ok'); outputData.setData(cmd.getSaveKey()); } } function* commandSfctByCmd(cmd) { yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); queueData.setFromChanges(true); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); } function* commandSfct(cmd, outputData) { yield* commandSfctByCmd(cmd); outputData.setStatus('ok'); } function isDisplayedImage(strName) { var res = 0; if (strName) { //шаблон display[N]image.ext var findStr = constants.DISPLAY_PREFIX; var index = strName.indexOf(findStr); if (-1 != index) { if (index + findStr.length < strName.length) { var displayN = parseInt(strName[index + findStr.length]); if (1 <= displayN && displayN <= 6) { var imageIndex = index + findStr.length + 1; if (imageIndex == strName.indexOf("image", imageIndex)) res = displayN; } } } } return res; } function* commandImgurls(conn, cmd, outputData) { var supportedFormats; var urls; var errorCode = constants.NO_ERROR; var isImgUrl = 'imgurl' == cmd.getCommand(); if (isImgUrl) { urls = [cmd.getData()]; supportedFormats = cfgTypesUpload || 'jpg'; } else { urls = cmd.getData(); supportedFormats = cfgTypesCopy || 'jpg'; } //todo Promise.all() var displayedImageMap = {};//to make one imageIndex for ole object urls var imageCount = 0; var outputUrls = []; for (var i = 0; i < urls.length; ++i) { var urlSource = urls[i]; var urlParsed; var data = undefined; if (urlSource.startsWith('data:')) { var delimiterIndex = urlSource.indexOf(','); if (-1 != delimiterIndex && (urlSource.length - (delimiterIndex + 1)) * 0.75 <= cfgImageSize) { data = new Buffer(urlSource.substring(delimiterIndex + 1), 'base64'); } } else if(urlSource) { //todo stream data = yield utils.downloadUrlPromise(urlSource, cfgImageDownloadTimeout * 1000, cfgImageSize); urlParsed = urlModule.parse(urlSource); } var outputUrl = {url: 'error', path: 'error'}; if (data) { var format = formatChecker.getImageFormat(data); var formatStr; if (constants.AVS_OFFICESTUDIO_FILE_UNKNOWN == format && urlParsed) { //bin, txt occur in ole object case var ext = pathModule.extname(urlParsed.pathname); if ('.bin' == ext || '.txt' == ext) { formatStr = ext.substring(1); } } else { formatStr = formatChecker.getStringFromFormat(format); } if (formatStr && -1 !== supportedFormats.indexOf(formatStr)) { var userid = cmd.getUserId(); var imageIndex = cmd.getSaveIndex() + imageCount; imageCount++; var strLocalPath = 'media/' + utils.crc32(userid).toString(16) + '_'; if (urlParsed) { var urlBasename = pathModule.basename(urlParsed.pathname); var displayN = isDisplayedImage(urlBasename); if (displayN > 0) { var displayedImageName = urlBasename.substring(0, urlBasename.length - formatStr.length - 1); var tempIndex = displayedImageMap[displayedImageName]; if (null != tempIndex) { imageIndex = tempIndex; imageCount--; } else { displayedImageMap[displayedImageName] = imageIndex; } strLocalPath += constants.DISPLAY_PREFIX + displayN; } } strLocalPath += 'image' + imageIndex + '.' + formatStr; var strPath = cmd.getDocId() + '/' + strLocalPath; yield storage.putObject(strPath, data, data.length); var imgUrl = yield storage.getSignedUrl(conn.baseUrl, strPath); outputUrl = {url: imgUrl, path: strLocalPath}; } } if (isImgUrl && ('error' === outputUrl.url || 'error' === outputUrl.path)) { errorCode = constants.UPLOAD_EXTENSION; break; } outputUrls.push(outputUrl); } if (constants.NO_ERROR !== errorCode) { outputData.setStatus('err'); outputData.setData(errorCode); } else { outputData.setStatus('ok'); outputData.setData(outputUrls); } } function* commandSaveFromOrigin(cmd, outputData) { yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); queueData.setFromOrigin(true); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); outputData.setStatus('ok'); outputData.setData(cmd.getSaveKey()); } function* commandSfcCallback(cmd, isSfcm) { var docId = cmd.getDocId(); logger.debug('Start commandSfcCallback: docId = %s', docId); var saveKey = cmd.getSaveKey(); var statusInfo = cmd.getStatusInfo(); var isError = constants.NO_ERROR != statusInfo && constants.CONVERT_CORRUPTED != statusInfo; var savePathDoc = saveKey + '/' + cmd.getOutputPath(); var savePathChanges = saveKey + '/changes.zip'; var savePathHistory = saveKey + '/changesHistory.json'; var getRes = yield* docsCoServer.getCallback(docId); if (getRes) { logger.debug('Callback commandSfcCallback: docId = %s callback = %s', docId, getRes.server.href); var outputSfc = new commonDefines.OutputSfcData(); outputSfc.setKey(docId); if (cmd.getUserId()) { outputSfc.setUsers([cmd.getUserId()]); } if (isSfcm) { outputSfc.setActions(undefined); outputSfc.setUserData(cmd.getUserData()); } else { //use UserId case UserActionId miss in gc convertion var userActionId = cmd.getUserActionId() || cmd.getUserId(); if (userActionId) { outputSfc.setActions([new commonDefines.OutputAction(commonDefines.c_oAscUserAction.Out, userActionId)]); } } if (!isError) { try { var data = yield storage.getObject(savePathHistory); outputSfc.setChangeHistory(data.toString('utf-8')); outputSfc.setUrl(yield storage.getSignedUrl(getRes.baseUrl, savePathDoc)); outputSfc.setChangeUrl(yield storage.getSignedUrl(getRes.baseUrl, savePathChanges)); } catch (e) { logger.error('Error commandSfcCallback: docId = %s\r\n%s', docId, e.stack); } if (outputSfc.getUrl() && outputSfc.getUsers().length > 0) { if (isSfcm) { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.MustSaveForce); } else { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.MustSave); } } else { isError = true; } } if (isError) { if (isSfcm) { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.CorruptedForce); } else { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.Corrupted); } } var uri = getRes.server.href; var postData = JSON.stringify(outputSfc); if (isSfcm) { var lastSave = cmd.getLastSave(); if (lastSave && !isError) { yield* docsCoServer.setForceSave(docId, lastSave, savePathDoc); } try { yield* docsCoServer.sendServerRequest(docId, uri, postData); } catch (err) { logger.error('sendServerRequest error: docId = %s;url = %s;data = %s\r\n%s', docId, uri, postData, err.stack); } } else { //if anybody in document stop save var hasEditors = yield* docsCoServer.hasEditors(docId); logger.debug('hasEditors commandSfcCallback: docId = %s hasEditors = %d', docId, hasEditors); if (!hasEditors) { var updateMask = new taskResult.TaskResultData(); updateMask.key = docId; updateMask.status = taskResult.FileStatus.SaveVersion; updateMask.statusInfo = cmd.getData(); var updateIfTask = new taskResult.TaskResultData(); updateIfTask.status = taskResult.FileStatus.UpdateVersion; updateIfTask.statusInfo = constants.NO_ERROR; var updateIfRes = yield taskResult.updateIf(updateIfTask, updateMask); if (updateIfRes.affectedRows > 0) { var replyStr = null; try { replyStr = yield* docsCoServer.sendServerRequest(docId, uri, postData); } catch (err) { replyStr = null; logger.error('sendServerRequest error: docId = %s;url = %s;data = %s\r\n%s', docId, uri, postData, err.stack); } var requestRes = false; var replyData = docsCoServer.parseReplyData(docId, replyStr); if (replyData && commonDefines.c_oAscServerCommandErrors.NoError == replyData.error) { //в случае comunity server придет запрос в CommandService проверяем результат var multi = redisClient.multi([ ['get', redisKeySaved + docId], ['del', redisKeySaved + docId] ]); var execRes = yield utils.promiseRedis(multi, multi.exec); var savedVal = execRes[0]; requestRes = (null == savedVal || '1' === savedVal); } if (requestRes) { yield* docsCoServer.cleanDocumentOnExit(docId, true, true); } else { var updateTask = new taskResult.TaskResultData(); updateTask.key = docId; updateTask.status = taskResult.FileStatus.Ok; updateTask.statusInfo = constants.NO_ERROR; yield taskResult.update(updateTask); } } } } } logger.debug('End commandSfcCallback: docId = %s', docId); } function* commandSendMMCallback(cmd) { var docId = cmd.getDocId(); logger.debug('Start commandSendMMCallback: docId = %s', docId); var saveKey = cmd.getSaveKey(); var statusInfo = cmd.getStatusInfo(); var outputSfc = new commonDefines.OutputSfcData(); outputSfc.setKey(docId); if (constants.NO_ERROR == statusInfo) { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.MailMerge); } else { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.Corrupted); } var mailMergeSendData = cmd.getMailMergeSend(); var outputMailMerge = new commonDefines.OutputMailMerge(mailMergeSendData); outputSfc.setMailMerge(outputMailMerge); outputSfc.setUsers([mailMergeSendData.getUserId()]); var data = yield storage.getObject(saveKey + '/' + cmd.getOutputPath()); var xml = data.toString('utf8'); var files = xml.match(/[< ]file.*?\/>/g); var recordRemain = (mailMergeSendData.getRecordTo() - mailMergeSendData.getRecordFrom() + 1); var recordIndexStart = mailMergeSendData.getRecordCount() - recordRemain; for (var i = 0; i < files.length; ++i) { var file = files[i]; var fieldRes = /field=["'](.*?)["']/.exec(file); outputMailMerge.setTo(fieldRes[1]); outputMailMerge.setRecordIndex(recordIndexStart + i); var pathRes = /path=["'](.*?)["']/.exec(file); var signedUrl = yield storage.getSignedUrl(mailMergeSendData.getBaseUrl(), saveKey + '/' + pathRes[1]); outputSfc.setUrl(signedUrl); var uri = mailMergeSendData.getUrl(); var postData = JSON.stringify(outputSfc); try { yield* docsCoServer.sendServerRequest(docId, uri, postData); } catch (err) { logger.error('sendServerRequest error: docId = %s;url = %s;data = %s\r\n%s', docId, uri, postData, err.stack); } } var newRecordFrom = mailMergeSendData.getRecordFrom() + Math.max(files.length, 1); if (newRecordFrom <= mailMergeSendData.getRecordTo()) { mailMergeSendData.setRecordFrom(newRecordFrom); yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); } else { logger.debug('End MailMerge: docId = %s', docId); } logger.debug('End commandSendMMCallback: docId = %s', docId); } exports.openDocument = function(conn, input) { return co(function* () { var outputData; var docId = conn ? conn.docId : 'null'; try { var startDate = null; if(clientStatsD) { startDate = new Date(); } var cmd = new commonDefines.InputCommand(input.message); logger.debug('Start command: docId = %s %s', docId, JSON.stringify(cmd)); outputData = new OutputData(cmd.getCommand()); switch (cmd.getCommand()) { case 'open': //yield utils.sleep(5000); yield* commandOpen(conn, cmd, outputData); break; case 'reopen': yield* commandReopen(cmd); break; case 'imgurl': case 'imgurls': yield* commandImgurls(conn, cmd, outputData); break; default: outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); break; } if(clientStatsD) { clientStatsD.timing('coauth.openDocument.' + cmd.getCommand(), new Date() - startDate); } } catch (e) { logger.error('Error openDocument: docId = %s\r\n%s', docId, e.stack); if (!outputData) { outputData = new OutputData(); } outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); } finally { if (outputData && outputData.getStatus()) { logger.debug('Response command: docId = %s %s', docId, JSON.stringify(outputData)); docsCoServer.sendData(conn, new OutputDataWrap('documentOpen', outputData)); } logger.debug('End command: docId = %s', docId); } }); }; exports.downloadAs = function(req, res) { return co(function* () { var docId = 'null'; try { var startDate = null; if(clientStatsD) { startDate = new Date(); } var strCmd = req.query['cmd']; var cmd = new commonDefines.InputCommand(JSON.parse(strCmd)); docId = cmd.getDocId(); logger.debug('Start downloadAs: docId = %s %s', docId, strCmd); cmd.setData(req.body); var outputData = new OutputData(cmd.getCommand()); switch (cmd.getCommand()) { case 'save': yield* commandSave(cmd, outputData); break; case 'savefromorigin': yield* commandSaveFromOrigin(cmd, outputData); break; case 'sendmm': yield* commandSendMailMerge(cmd, outputData); break; case 'sfct': yield* commandSfct(cmd, outputData); break; default: outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); break; } var strRes = JSON.stringify(outputData); res.send(strRes); logger.debug('End downloadAs: docId = %s %s', docId, strRes); if(clientStatsD) { clientStatsD.timing('coauth.downloadAs.' + cmd.getCommand(), new Date() - startDate); } } catch (e) { logger.error('Error downloadAs: docId = %s\r\n%s', docId, e.stack); res.sendStatus(400); } }); }; exports.saveFromChanges = function(docId, statusInfo, optFormat, opt_userId) { return co(function* () { try { var startDate = null; if(clientStatsD) { startDate = new Date(); } logger.debug('Start saveFromChanges: docId = %s', docId); var task = new taskResult.TaskResultData(); task.key = docId; //делаем select, потому что за время timeout информация могла измениться var selectRes = yield taskResult.select(task); var row = selectRes.length > 0 ? selectRes[0] : null; if (row && row.tr_status == taskResult.FileStatus.SaveVersion && row.tr_status_info == statusInfo) { if (null == optFormat) { optFormat = constants.AVS_OFFICESTUDIO_FILE_OTHER_TEAMLAB_INNER; } var cmd = new commonDefines.InputCommand(); cmd.setCommand('sfc'); cmd.setDocId(docId); cmd.setOutputFormat(optFormat); cmd.setData(statusInfo); cmd.setUserActionId(opt_userId); yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); queueData.setFromChanges(true); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_NORMAL); logger.debug('AddTask saveFromChanges: docId = %s', docId); } else { if (row) { logger.debug('saveFromChanges status mismatch: docId = %s; row: %d; %d; expected: %d', docId, row.tr_status, row.tr_status_info, statusInfo); } } if (clientStatsD) { clientStatsD.timing('coauth.saveFromChanges', new Date() - startDate); } } catch (e) { logger.error('Error saveFromChanges: docId = %s\r\n%s', docId, e.stack); } }); }; exports.receiveTask = function(data, dataRaw) { return co(function* () { var docId = 'null'; try { var task = new commonDefines.TaskQueueData(JSON.parse(data)); if (task) { var cmd = task.getCmd(); docId = cmd.getDocId(); logger.debug('Start receiveTask: docId = %s %s', docId, data); var updateTask = getUpdateResponse(cmd); var updateRes = yield taskResult.update(updateTask); if (updateRes.affectedRows > 0) { var outputData = new OutputData(cmd.getCommand()); var command = cmd.getCommand(); var additionalOutput = {needUrlKey: null, needUrlMethod: null}; if ('open' == command || 'reopen' == command) { //yield utils.sleep(5000); yield* getOutputData(cmd, outputData, cmd.getDocId(), updateTask.status, updateTask.statusInfo, null, additionalOutput); } else if ('save' == command || 'savefromorigin' == command || 'sfct' == command) { yield* getOutputData(cmd, outputData, cmd.getSaveKey(), updateTask.status, updateTask.statusInfo, null, additionalOutput); } else if ('sfcm' == command) { yield* commandSfcCallback(cmd, true); } else if ('sfc' == command) { yield* commandSfcCallback(cmd, false); } else if ('sendmm' == command) { yield* commandSendMMCallback(cmd); } else if ('conv' == command) { //nothing } if (outputData.getStatus()) { logger.debug('Send receiveTask: docId = %s %s', docId, JSON.stringify(outputData)); var output = new OutputDataWrap('documentOpen', outputData); yield* docsCoServer.publish({ type: docsCoServer.PublishType.receiveTask, cmd: cmd, output: output, needUrlKey: additionalOutput.needUrlKey, needUrlMethod: additionalOutput.needUrlMethod }); } } yield* docsCoServer.removeResponse(dataRaw); logger.debug('End receiveTask: docId = %s', docId); } } catch (err) { logger.debug('Error receiveTask: docId = %s\r\n%s', docId, err.stack); } }); }; exports.commandSfctByCmd = commandSfctByCmd; exports.OutputDataWrap = OutputDataWrap; exports.OutputData = OutputData; \ No newline at end of file +var pathModule = require('path'); var urlModule = require('url'); var co = require('co'); var sqlBase = require('./baseConnector'); var docsCoServer = require('./DocsCoServer'); var taskResult = require('./taskresult'); var logger = require('./../../Common/sources/logger'); var utils = require('./../../Common/sources/utils'); var constants = require('./../../Common/sources/constants'); var commonDefines = require('./../../Common/sources/commondefines'); var storage = require('./../../Common/sources/storage-base'); var formatChecker = require('./../../Common/sources/formatchecker'); var statsDClient = require('./../../Common/sources/statsdclient'); var config = require('config'); var config_server = config.get('services.CoAuthoring.server'); var config_utils = config.get('services.CoAuthoring.utils'); var pubsubRedis = require('./pubsubRedis'); var cfgTypesUpload = config_utils.get('limits_image_types_upload'); var cfgTypesCopy = config_utils.get('limits_image_types_copy'); var cfgImageSize = config_server.get('limits_image_size'); var cfgImageDownloadTimeout = config_server.get('limits_image_download_timeout'); var cfgRedisPrefix = config.get('services.CoAuthoring.redis.prefix'); var SAVE_TYPE_PART_START = 0; var SAVE_TYPE_PART = 1; var SAVE_TYPE_COMPLETE = 2; var SAVE_TYPE_COMPLETE_ALL = 3; var clientStatsD = statsDClient.getClient(); var redisClient = pubsubRedis.getClientRedis(); var redisKeySaved = cfgRedisPrefix + 'saved:'; function OutputDataWrap(type, data) { this['type'] = type; this['data'] = data; } OutputDataWrap.prototype = { fromObject: function(data) { this['type'] = data['type']; this['data'] = new OutputData(); this['data'].fromObject(data['data']); }, getType: function() { return this['type']; }, setType: function(data) { this['type'] = data; }, getData: function() { return this['data']; }, setData: function(data) { this['data'] = data; } }; function OutputData(type) { this['type'] = type; this['status'] = undefined; this['data'] = undefined; } OutputData.prototype = { fromObject: function(data) { this['type'] = data['type']; this['status'] = data['status']; this['data'] = data['data']; }, getType: function() { return this['type']; }, setType: function(data) { this['type'] = data; }, getStatus: function() { return this['status']; }, setStatus: function(data) { this['status'] = data; }, getData: function() { return this['data']; }, setData: function(data) { this['data'] = data; } }; function* getOutputData(cmd, outputData, key, status, statusInfo, optConn, optAdditionalOutput) { var docId = cmd.getDocId(); switch (status) { case taskResult.FileStatus.SaveVersion: case taskResult.FileStatus.UpdateVersion: case taskResult.FileStatus.Ok: if(taskResult.FileStatus.Ok == status) { outputData.setStatus('ok'); } else if(taskResult.FileStatus.SaveVersion == status) { if (optConn && optConn.user.view) { outputData.setStatus('updateversion'); } else { var updateMask = new taskResult.TaskResultData(); updateMask.key = docId; updateMask.status = status; updateMask.statusInfo = statusInfo; var updateTask = new taskResult.TaskResultData(); updateTask.status = taskResult.FileStatus.Ok; updateTask.statusInfo = constants.NO_ERROR; var updateIfRes = yield taskResult.updateIf(updateTask, updateMask); if (updateIfRes.affectedRows > 0) { outputData.setStatus('ok'); } else { outputData.setStatus('updateversion'); } } } else { outputData.setStatus('updateversion'); } var command = cmd.getCommand(); if ('open' != command && 'reopen' != command) { var strPath = key + '/' + cmd.getOutputPath(); if (optConn) { outputData.setData(yield storage.getSignedUrl(optConn.baseUrl, strPath, null, cmd.getTitle())); } else if (optAdditionalOutput) { optAdditionalOutput.needUrlKey = strPath; optAdditionalOutput.needUrlMethod = 2; } } else { if (optConn) { outputData.setData(yield storage.getSignedUrls(optConn.baseUrl, key)); } else if (optAdditionalOutput) { optAdditionalOutput.needUrlKey = key; optAdditionalOutput.needUrlMethod = 0; } } break; case taskResult.FileStatus.NeedParams: outputData.setStatus('needparams'); var settingsPath = key + '/' + 'settings.json'; if (optConn) { outputData.setData(yield storage.getSignedUrl(optConn.baseUrl, settingsPath)); } else if (optAdditionalOutput) { optAdditionalOutput.needUrlKey = settingsPath; optAdditionalOutput.needUrlMethod = 1; } break; case taskResult.FileStatus.Err: case taskResult.FileStatus.ErrToReload: outputData.setStatus('err'); outputData.setData(statusInfo); if (taskResult.FileStatus.ErrToReload == status) { yield taskResult.remove(key); } break; } } function* addRandomKeyTaskCmd(cmd) { var task = yield* taskResult.addRandomKeyTask(cmd.getDocId()); cmd.setSaveKey(task.key); } function* saveParts(cmd) { var result = false; var saveType = cmd.getSaveType(); var filename; if (SAVE_TYPE_COMPLETE_ALL === saveType) { filename = 'Editor.bin'; } else { filename = 'Editor' + (cmd.getSaveIndex() || '') + '.bin'; } if (SAVE_TYPE_PART_START === saveType || SAVE_TYPE_COMPLETE_ALL === saveType) { yield* addRandomKeyTaskCmd(cmd); } if (cmd.getUrl()) { result = true; } else { var buffer = cmd.getData(); yield storage.putObject(cmd.getSaveKey() + '/' + filename, buffer, buffer.length); //delete data to prevent serialize into json cmd.data = null; result = (SAVE_TYPE_COMPLETE_ALL === saveType || SAVE_TYPE_COMPLETE === saveType); } return result; } function getSaveTask(cmd) { cmd.setData(null); var queueData = new commonDefines.TaskQueueData(); queueData.setCmd(cmd); queueData.setToFile(constants.OUTPUT_NAME + '.' + formatChecker.getStringFromFormat(cmd.getOutputFormat())); //todo paid //if (cmd.vkey) { // bool // bPaid; // Signature.getVKeyParams(cmd.vkey, out bPaid); // oTaskQueueData.m_bPaid = bPaid; //} return queueData; } function getUpdateResponse(cmd) { var updateTask = new taskResult.TaskResultData(); updateTask.key = cmd.getSaveKey() ? cmd.getSaveKey() : cmd.getDocId(); var statusInfo = cmd.getStatusInfo(); if (constants.NO_ERROR == statusInfo) { updateTask.status = taskResult.FileStatus.Ok; } else if (constants.CONVERT_DOWNLOAD == statusInfo) { updateTask.status = taskResult.FileStatus.ErrToReload; } else if (constants.CONVERT_NEED_PARAMS == statusInfo) { updateTask.status = taskResult.FileStatus.NeedParams; } else { updateTask.status = taskResult.FileStatus.Err; } updateTask.statusInfo = statusInfo; if (cmd.getTitle()) { updateTask.title = cmd.getTitle(); } return updateTask; } function* commandOpen(conn, cmd, outputData) { var task = new taskResult.TaskResultData(); task.key = cmd.getDocId(); task.format = cmd.getFormat(); task.status = taskResult.FileStatus.WaitQueue; task.statusInfo = constants.NO_ERROR; task.title = cmd.getTitle(); var upsertRes = yield taskResult.upsert(task); //if CLIENT_FOUND_ROWS don't specify 1 row is inserted , 2 row is updated, and 0 row is set to its current values //http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html var bCreate = upsertRes.affectedRows == 1; if (!bCreate) { var selectRes = yield taskResult.select(task); if (selectRes.length > 0) { var row = selectRes[0]; yield* getOutputData(cmd, outputData, cmd.getDocId(), row.tr_status, row.tr_status_info, conn); } } else { //add task cmd.setOutputFormat(constants.AVS_OFFICESTUDIO_FILE_CANVAS); cmd.setEmbeddedFonts(false); var dataQueue = new commonDefines.TaskQueueData(); dataQueue.setCmd(cmd); dataQueue.setToFile('Editor.bin'); yield* docsCoServer.addTask(dataQueue, constants.QUEUE_PRIORITY_HIGH); } } function* commandReopen(cmd) { var task = new taskResult.TaskResultData(); task.key = cmd.getDocId(); task.status = taskResult.FileStatus.WaitQueue; task.statusInfo = constants.NO_ERROR; var upsertRes = yield taskResult.update(task); if (upsertRes.affectedRows > 0) { //add task cmd.setUrl(null);//url may expire cmd.setSaveKey(cmd.getDocId()); cmd.setOutputFormat(constants.AVS_OFFICESTUDIO_FILE_CANVAS); cmd.setEmbeddedFonts(false); var dataQueue = new commonDefines.TaskQueueData(); dataQueue.setCmd(cmd); dataQueue.setToFile('Editor.bin'); dataQueue.setFromSettings(true); yield* docsCoServer.addTask(dataQueue, constants.QUEUE_PRIORITY_HIGH); } } function* commandSave(cmd, outputData) { var completeParts = yield* saveParts(cmd); if (completeParts) { var queueData = getSaveTask(cmd); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); } outputData.setStatus('ok'); outputData.setData(cmd.getSaveKey()); } function* commandSendMailMerge(cmd, outputData) { var completeParts = yield* saveParts(cmd); var isErr = false; if (completeParts) { isErr = true; var getRes = yield* docsCoServer.getCallback(cmd.getDocId()); if (getRes) { var mailMergeSend = cmd.getMailMergeSend(); mailMergeSend.setUrl(getRes.server.href); mailMergeSend.setBaseUrl(getRes.baseUrl); //меняем JsonKey и SaveKey, новый key нужет потому что за одну конвертацию делается часть, а json нужен всегда mailMergeSend.setJsonKey(cmd.getSaveKey()); yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); isErr = false; } } if (isErr) { outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); } else { outputData.setStatus('ok'); outputData.setData(cmd.getSaveKey()); } } function* commandSfctByCmd(cmd) { yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); queueData.setFromChanges(true); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); } function* commandSfct(cmd, outputData) { yield* commandSfctByCmd(cmd); outputData.setStatus('ok'); } function isDisplayedImage(strName) { var res = 0; if (strName) { //шаблон display[N]image.ext var findStr = constants.DISPLAY_PREFIX; var index = strName.indexOf(findStr); if (-1 != index) { if (index + findStr.length < strName.length) { var displayN = parseInt(strName[index + findStr.length]); if (1 <= displayN && displayN <= 6) { var imageIndex = index + findStr.length + 1; if (imageIndex == strName.indexOf("image", imageIndex)) res = displayN; } } } } return res; } function* commandImgurls(conn, cmd, outputData) { var supportedFormats; var urls; var errorCode = constants.NO_ERROR; var isImgUrl = 'imgurl' == cmd.getCommand(); if (isImgUrl) { urls = [cmd.getData()]; supportedFormats = cfgTypesUpload || 'jpg'; } else { urls = cmd.getData(); supportedFormats = cfgTypesCopy || 'jpg'; } //todo Promise.all() var displayedImageMap = {};//to make one imageIndex for ole object urls var imageCount = 0; var outputUrls = []; for (var i = 0; i < urls.length; ++i) { var urlSource = urls[i]; var urlParsed; var data = undefined; if (urlSource.startsWith('data:')) { var delimiterIndex = urlSource.indexOf(','); if (-1 != delimiterIndex && (urlSource.length - (delimiterIndex + 1)) * 0.75 <= cfgImageSize) { data = new Buffer(urlSource.substring(delimiterIndex + 1), 'base64'); } } else if(urlSource) { //todo stream data = yield utils.downloadUrlPromise(urlSource, cfgImageDownloadTimeout * 1000, cfgImageSize); urlParsed = urlModule.parse(urlSource); } var outputUrl = {url: 'error', path: 'error'}; if (data) { var format = formatChecker.getImageFormat(data); var formatStr; if (constants.AVS_OFFICESTUDIO_FILE_UNKNOWN == format && urlParsed) { //bin, txt occur in ole object case var ext = pathModule.extname(urlParsed.pathname); if ('.bin' == ext || '.txt' == ext) { formatStr = ext.substring(1); } } else { formatStr = formatChecker.getStringFromFormat(format); } if (formatStr && -1 !== supportedFormats.indexOf(formatStr)) { var userid = cmd.getUserId(); var imageIndex = cmd.getSaveIndex() + imageCount; imageCount++; var strLocalPath = 'media/' + utils.crc32(userid).toString(16) + '_'; if (urlParsed) { var urlBasename = pathModule.basename(urlParsed.pathname); var displayN = isDisplayedImage(urlBasename); if (displayN > 0) { var displayedImageName = urlBasename.substring(0, urlBasename.length - formatStr.length - 1); var tempIndex = displayedImageMap[displayedImageName]; if (null != tempIndex) { imageIndex = tempIndex; imageCount--; } else { displayedImageMap[displayedImageName] = imageIndex; } strLocalPath += constants.DISPLAY_PREFIX + displayN; } } strLocalPath += 'image' + imageIndex + '.' + formatStr; var strPath = cmd.getDocId() + '/' + strLocalPath; yield storage.putObject(strPath, data, data.length); var imgUrl = yield storage.getSignedUrl(conn.baseUrl, strPath); outputUrl = {url: imgUrl, path: strLocalPath}; } } if (isImgUrl && ('error' === outputUrl.url || 'error' === outputUrl.path)) { errorCode = constants.UPLOAD_EXTENSION; break; } outputUrls.push(outputUrl); } if (constants.NO_ERROR !== errorCode) { outputData.setStatus('err'); outputData.setData(errorCode); } else { outputData.setStatus('ok'); outputData.setData(outputUrls); } } function* commandSaveFromOrigin(cmd, outputData) { yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); queueData.setFromOrigin(true); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); outputData.setStatus('ok'); outputData.setData(cmd.getSaveKey()); } function* commandSfcCallback(cmd, isSfcm) { var docId = cmd.getDocId(); logger.debug('Start commandSfcCallback: docId = %s', docId); var saveKey = cmd.getSaveKey(); var statusInfo = cmd.getStatusInfo(); var isError = constants.NO_ERROR != statusInfo && constants.CONVERT_CORRUPTED != statusInfo; var savePathDoc = saveKey + '/' + cmd.getOutputPath(); var savePathChanges = saveKey + '/changes.zip'; var savePathHistory = saveKey + '/changesHistory.json'; var getRes = yield* docsCoServer.getCallback(docId); if (getRes) { logger.debug('Callback commandSfcCallback: docId = %s callback = %s', docId, getRes.server.href); var outputSfc = new commonDefines.OutputSfcData(); outputSfc.setKey(docId); if (cmd.getUserId()) { outputSfc.setUsers([cmd.getUserId()]); } if (isSfcm) { outputSfc.setActions(undefined); outputSfc.setUserData(cmd.getUserData()); } else { //use UserId case UserActionId miss in gc convertion var userActionId = cmd.getUserActionId() || cmd.getUserId(); if (userActionId) { outputSfc.setActions([new commonDefines.OutputAction(commonDefines.c_oAscUserAction.Out, userActionId)]); } } if (!isError) { try { var data = yield storage.getObject(savePathHistory); outputSfc.setChangeHistory(data.toString('utf-8')); outputSfc.setUrl(yield storage.getSignedUrl(getRes.baseUrl, savePathDoc)); outputSfc.setChangeUrl(yield storage.getSignedUrl(getRes.baseUrl, savePathChanges)); } catch (e) { logger.error('Error commandSfcCallback: docId = %s\r\n%s', docId, e.stack); } if (outputSfc.getUrl() && outputSfc.getUsers().length > 0) { if (isSfcm) { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.MustSaveForce); } else { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.MustSave); } } else { isError = true; } } if (isError) { if (isSfcm) { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.CorruptedForce); } else { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.Corrupted); } } var uri = getRes.server.href; var postData = JSON.stringify(outputSfc); if (isSfcm) { var lastSave = cmd.getLastSave(); if (lastSave && !isError) { yield* docsCoServer.setForceSave(docId, lastSave, savePathDoc); } try { yield* docsCoServer.sendServerRequest(docId, uri, postData); } catch (err) { logger.error('sendServerRequest error: docId = %s;url = %s;data = %s\r\n%s', docId, uri, postData, err.stack); } } else { //if anybody in document stop save var hasEditors = yield* docsCoServer.hasEditors(docId); logger.debug('hasEditors commandSfcCallback: docId = %s hasEditors = %d', docId, hasEditors); if (!hasEditors) { var updateMask = new taskResult.TaskResultData(); updateMask.key = docId; updateMask.status = taskResult.FileStatus.SaveVersion; updateMask.statusInfo = cmd.getData(); var updateIfTask = new taskResult.TaskResultData(); updateIfTask.status = taskResult.FileStatus.UpdateVersion; updateIfTask.statusInfo = constants.NO_ERROR; var updateIfRes = yield taskResult.updateIf(updateIfTask, updateMask); if (updateIfRes.affectedRows > 0) { var replyStr = null; try { replyStr = yield* docsCoServer.sendServerRequest(docId, uri, postData); } catch (err) { replyStr = null; logger.error('sendServerRequest error: docId = %s;url = %s;data = %s\r\n%s', docId, uri, postData, err.stack); } var requestRes = false; var replyData = docsCoServer.parseReplyData(docId, replyStr); if (replyData && commonDefines.c_oAscServerCommandErrors.NoError == replyData.error) { //в случае comunity server придет запрос в CommandService проверяем результат var multi = redisClient.multi([ ['get', redisKeySaved + docId], ['del', redisKeySaved + docId] ]); var execRes = yield utils.promiseRedis(multi, multi.exec); var savedVal = execRes[0]; requestRes = (null == savedVal || '1' === savedVal); } if (requestRes) { yield docsCoServer.cleanDocumentOnExitPromise(docId, true, true); } else { var updateTask = new taskResult.TaskResultData(); updateTask.key = docId; updateTask.status = taskResult.FileStatus.Ok; updateTask.statusInfo = constants.NO_ERROR; yield taskResult.update(updateTask); } } } } } logger.debug('End commandSfcCallback: docId = %s', docId); } function* commandSendMMCallback(cmd) { var docId = cmd.getDocId(); logger.debug('Start commandSendMMCallback: docId = %s', docId); var saveKey = cmd.getSaveKey(); var statusInfo = cmd.getStatusInfo(); var outputSfc = new commonDefines.OutputSfcData(); outputSfc.setKey(docId); if (constants.NO_ERROR == statusInfo) { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.MailMerge); } else { outputSfc.setStatus(docsCoServer.c_oAscServerStatus.Corrupted); } var mailMergeSendData = cmd.getMailMergeSend(); var outputMailMerge = new commonDefines.OutputMailMerge(mailMergeSendData); outputSfc.setMailMerge(outputMailMerge); outputSfc.setUsers([mailMergeSendData.getUserId()]); var data = yield storage.getObject(saveKey + '/' + cmd.getOutputPath()); var xml = data.toString('utf8'); var files = xml.match(/[< ]file.*?\/>/g); var recordRemain = (mailMergeSendData.getRecordTo() - mailMergeSendData.getRecordFrom() + 1); var recordIndexStart = mailMergeSendData.getRecordCount() - recordRemain; for (var i = 0; i < files.length; ++i) { var file = files[i]; var fieldRes = /field=["'](.*?)["']/.exec(file); outputMailMerge.setTo(fieldRes[1]); outputMailMerge.setRecordIndex(recordIndexStart + i); var pathRes = /path=["'](.*?)["']/.exec(file); var signedUrl = yield storage.getSignedUrl(mailMergeSendData.getBaseUrl(), saveKey + '/' + pathRes[1]); outputSfc.setUrl(signedUrl); var uri = mailMergeSendData.getUrl(); var postData = JSON.stringify(outputSfc); try { yield* docsCoServer.sendServerRequest(docId, uri, postData); } catch (err) { logger.error('sendServerRequest error: docId = %s;url = %s;data = %s\r\n%s', docId, uri, postData, err.stack); } } var newRecordFrom = mailMergeSendData.getRecordFrom() + Math.max(files.length, 1); if (newRecordFrom <= mailMergeSendData.getRecordTo()) { mailMergeSendData.setRecordFrom(newRecordFrom); yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_LOW); } else { logger.debug('End MailMerge: docId = %s', docId); } logger.debug('End commandSendMMCallback: docId = %s', docId); } exports.openDocument = function(conn, input) { return co(function* () { var outputData; var docId = conn ? conn.docId : 'null'; try { var startDate = null; if(clientStatsD) { startDate = new Date(); } var cmd = new commonDefines.InputCommand(input.message); logger.debug('Start command: docId = %s %s', docId, JSON.stringify(cmd)); outputData = new OutputData(cmd.getCommand()); switch (cmd.getCommand()) { case 'open': //yield utils.sleep(5000); yield* commandOpen(conn, cmd, outputData); break; case 'reopen': yield* commandReopen(cmd); break; case 'imgurl': case 'imgurls': yield* commandImgurls(conn, cmd, outputData); break; default: outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); break; } if(clientStatsD) { clientStatsD.timing('coauth.openDocument.' + cmd.getCommand(), new Date() - startDate); } } catch (e) { logger.error('Error openDocument: docId = %s\r\n%s', docId, e.stack); if (!outputData) { outputData = new OutputData(); } outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); } finally { if (outputData && outputData.getStatus()) { logger.debug('Response command: docId = %s %s', docId, JSON.stringify(outputData)); docsCoServer.sendData(conn, new OutputDataWrap('documentOpen', outputData)); } logger.debug('End command: docId = %s', docId); } }); }; exports.downloadAs = function(req, res) { return co(function* () { var docId = 'null'; try { var startDate = null; if(clientStatsD) { startDate = new Date(); } var strCmd = req.query['cmd']; var cmd = new commonDefines.InputCommand(JSON.parse(strCmd)); docId = cmd.getDocId(); logger.debug('Start downloadAs: docId = %s %s', docId, strCmd); cmd.setData(req.body); var outputData = new OutputData(cmd.getCommand()); switch (cmd.getCommand()) { case 'save': yield* commandSave(cmd, outputData); break; case 'savefromorigin': yield* commandSaveFromOrigin(cmd, outputData); break; case 'sendmm': yield* commandSendMailMerge(cmd, outputData); break; case 'sfct': yield* commandSfct(cmd, outputData); break; default: outputData.setStatus('err'); outputData.setData(constants.UNKNOWN); break; } var strRes = JSON.stringify(outputData); res.send(strRes); logger.debug('End downloadAs: docId = %s %s', docId, strRes); if(clientStatsD) { clientStatsD.timing('coauth.downloadAs.' + cmd.getCommand(), new Date() - startDate); } } catch (e) { logger.error('Error downloadAs: docId = %s\r\n%s', docId, e.stack); res.sendStatus(400); } }); }; exports.saveFromChanges = function(docId, statusInfo, optFormat, opt_userId, opt_queue) { return co(function* () { try { var startDate = null; if(clientStatsD) { startDate = new Date(); } logger.debug('Start saveFromChanges: docId = %s', docId); var task = new taskResult.TaskResultData(); task.key = docId; //делаем select, потому что за время timeout информация могла измениться var selectRes = yield taskResult.select(task); var row = selectRes.length > 0 ? selectRes[0] : null; if (row && row.tr_status == taskResult.FileStatus.SaveVersion && row.tr_status_info == statusInfo) { if (null == optFormat) { optFormat = constants.AVS_OFFICESTUDIO_FILE_OTHER_TEAMLAB_INNER; } var cmd = new commonDefines.InputCommand(); cmd.setCommand('sfc'); cmd.setDocId(docId); cmd.setOutputFormat(optFormat); cmd.setData(statusInfo); cmd.setUserActionId(opt_userId); yield* addRandomKeyTaskCmd(cmd); var queueData = getSaveTask(cmd); queueData.setFromChanges(true); yield* docsCoServer.addTask(queueData, constants.QUEUE_PRIORITY_NORMAL, opt_queue); logger.debug('AddTask saveFromChanges: docId = %s', docId); } else { if (row) { logger.debug('saveFromChanges status mismatch: docId = %s; row: %d; %d; expected: %d', docId, row.tr_status, row.tr_status_info, statusInfo); } } if (clientStatsD) { clientStatsD.timing('coauth.saveFromChanges', new Date() - startDate); } } catch (e) { logger.error('Error saveFromChanges: docId = %s\r\n%s', docId, e.stack); } }); }; exports.receiveTask = function(data, dataRaw) { return co(function* () { var docId = 'null'; try { var task = new commonDefines.TaskQueueData(JSON.parse(data)); if (task) { var cmd = task.getCmd(); docId = cmd.getDocId(); logger.debug('Start receiveTask: docId = %s %s', docId, data); var updateTask = getUpdateResponse(cmd); var updateRes = yield taskResult.update(updateTask); if (updateRes.affectedRows > 0) { var outputData = new OutputData(cmd.getCommand()); var command = cmd.getCommand(); var additionalOutput = {needUrlKey: null, needUrlMethod: null}; if ('open' == command || 'reopen' == command) { //yield utils.sleep(5000); yield* getOutputData(cmd, outputData, cmd.getDocId(), updateTask.status, updateTask.statusInfo, null, additionalOutput); } else if ('save' == command || 'savefromorigin' == command || 'sfct' == command) { yield* getOutputData(cmd, outputData, cmd.getSaveKey(), updateTask.status, updateTask.statusInfo, null, additionalOutput); } else if ('sfcm' == command) { yield* commandSfcCallback(cmd, true); } else if ('sfc' == command) { yield* commandSfcCallback(cmd, false); } else if ('sendmm' == command) { yield* commandSendMMCallback(cmd); } else if ('conv' == command) { //nothing } if (outputData.getStatus()) { logger.debug('Send receiveTask: docId = %s %s', docId, JSON.stringify(outputData)); var output = new OutputDataWrap('documentOpen', outputData); yield* docsCoServer.publish({ type: commonDefines.c_oPublishType.receiveTask, cmd: cmd, output: output, needUrlKey: additionalOutput.needUrlKey, needUrlMethod: additionalOutput.needUrlMethod }); } } yield* docsCoServer.removeResponse(dataRaw); logger.debug('End receiveTask: docId = %s', docId); } } catch (err) { logger.debug('Error receiveTask: docId = %s\r\n%s', docId, err.stack); } }); }; exports.commandSfctByCmd = commandSfctByCmd; exports.OutputDataWrap = OutputDataWrap; exports.OutputData = OutputData; \ No newline at end of file diff --git a/DocService/sources/checkexpire.js b/DocService/sources/checkexpire.js index b8f7c97d..5c82763f 100644 --- a/DocService/sources/checkexpire.js +++ b/DocService/sources/checkexpire.js @@ -1,14 +1,25 @@ var config = require('config').get('services.CoAuthoring'); var co = require('co'); +var cron = require('cron'); var taskResult = require('./taskresult'); +var docsCoServer = require('./DocsCoServer'); var storage = require('./../../Common/sources/storage-base'); var utils = require('./../../Common/sources/utils'); var logger = require('./../../Common/sources/logger'); +var commonDefines = require('./../../Common/sources/commondefines'); +var constants = require('./../../Common/sources/constants'); +var pubsubRedis = require('./pubsubRedis.js'); +var pubsubService = require('./' + config.get('pubsub.name')); +var queueService = require('./../../Common/sources/taskqueueRabbitMQ'); +var cfgRedisPrefix = config.get('redis.prefix'); +var cfgExpFilesCron = config.get('expire.filesCron'); +var cfgExpDocumentsCron = config.get('expire.documentsCron'); var cfgExpFiles = config.get('expire.files'); var cfgExpFilesRemovedAtOnce = config.get('expire.filesremovedatonce'); -//todo checkDocumentExpire +var redisKeyDocuments = cfgRedisPrefix + constants.REDIS_KEY_DOCUMENTS; + var checkFileExpire = function() { return co(function* () { try { @@ -21,12 +32,17 @@ var checkFileExpire = function() { expired = yield taskResult.getExpired(cfgExpFilesRemovedAtOnce, cfgExpFiles); for (var i = 0; i < expired.length; ++i) { var docId = expired[i].tr_key; - //todo drop user - var removeRes = yield taskResult.remove(docId); - //если ничего не удалилось, значит это сделал другой процесс - if (removeRes.affectedRows > 0) { - currentRemovedCount++; - yield storage.deletePath(docId); + //проверяем что никто не сидит в документе + var hvals = yield docsCoServer.getAllPresencePromise(docId); + if(0 == hvals.length){ + var removeRes = yield taskResult.remove(docId); + //если ничего не удалилось, значит это сделал другой процесс + if (removeRes.affectedRows > 0) { + currentRemovedCount++; + yield storage.deletePath(docId); + } + } else { + logger.debug('checkFileExpire expire but presence: hvals = %s; docId = %s', hvals, docId); } } removedCount += currentRemovedCount; @@ -37,4 +53,54 @@ var checkFileExpire = function() { } }); }; -exports.checkFileExpire = checkFileExpire; +var checkDocumentExpire = function() { + return co(function* () { + try { + logger.debug('checkDocumentExpire start'); + var removedCount = 0; + var startSaveCount = 0; + var redisClient = pubsubRedis.getClientRedis(); + + var pubsub = new pubsubService(); + yield pubsub.initPromise(); + //inner ping to update presence + pubsub.publish(JSON.stringify({type: commonDefines.c_oPublishType.expireDoc})); + + var now = (new Date()).getTime(); + var multi = redisClient.multi([ + ['zrangebyscore', redisKeyDocuments, 0, now], + ['zremrangebyscore', redisKeyDocuments, 0, now] + ]); + var execRes = yield utils.promiseRedis(multi, multi.exec); + var expiredKeys = execRes[0]; + if (expiredKeys.length > 0) { + var queue = new queueService(); + yield queue.initPromise(true, false, false, false); + + for (var i = 0; i < expiredKeys.length; ++i) { + var docId = expiredKeys[i]; + if (docId) { + var puckerIndex = yield docsCoServer.getChangesIndexPromise(docId); + if (puckerIndex > 0) { + yield docsCoServer.createSaveTimerPromise(docId, null, queue, true); + startSaveCount++; + } else { + yield docsCoServer.cleanDocumentOnExitNoChangesPromise(docId); + removedCount++; + } + } + } + } + + logger.debug('checkDocumentExpire end: startSaveCount = %d, removedCount = %d', startSaveCount, removedCount); + } catch (e) { + logger.error('checkDocumentExpire error:\r\n%s', e.stack); + } + }); +}; + +var documentExpireJob = new cron.CronJob(cfgExpDocumentsCron, checkDocumentExpire); +documentExpireJob.start(); + +var fileExpireJob = new cron.CronJob(cfgExpFilesCron, checkFileExpire); +fileExpireJob.start(); diff --git a/DocService/sources/pubsubRabbitMQ.js b/DocService/sources/pubsubRabbitMQ.js index bb589076..fe15af37 100644 --- a/DocService/sources/pubsubRabbitMQ.js +++ b/DocService/sources/pubsubRabbitMQ.js @@ -61,6 +61,18 @@ util.inherits(PubsubRabbitMQ, events.EventEmitter); PubsubRabbitMQ.prototype.init = function (callback) { init(this, callback); }; +PubsubRabbitMQ.prototype.initPromise = function() { + var t = this; + return new Promise(function(resolve, reject) { + init(t, function(err) { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +}; PubsubRabbitMQ.prototype.publish = function (message) { var data = new Buffer(message); if (null != this.channelPublish) { diff --git a/DocService/sources/pubsubRedis.js b/DocService/sources/pubsubRedis.js index 17d7454d..3fd04c43 100644 --- a/DocService/sources/pubsubRedis.js +++ b/DocService/sources/pubsubRedis.js @@ -3,13 +3,14 @@ var config = require('config').get('services.CoAuthoring.redis'); var events = require('events'); var util = require('util'); var logger = require('./../../Common/sources/logger'); +var constants = require('./../../Common/sources/constants'); var redis = require(config.get('name')); var cfgRedisPrefix = config.get('prefix'); var cfgRedisHost = config.get('host'); var cfgRedisPort = config.get('port'); -var channelName = cfgRedisPrefix + 'pubsub'; +var channelName = cfgRedisPrefix + constants.REDIS_KEY_PUBSUB; function createClientRedis() { var redisClient = redis.createClient(cfgRedisPort, cfgRedisHost, {}); @@ -18,6 +19,13 @@ function createClientRedis() { }); return redisClient; } +var g_redisClient = null; +function getClientRedis() { + if (!g_redisClient) { + g_redisClient = createClientRedis(); + } + return g_redisClient; +} function PubsubRedis() { this.clientPublish = null; @@ -34,9 +42,21 @@ PubsubRedis.prototype.init = function(callback) { }); callback(null); }; +PubsubRedis.prototype.initPromise = function() { + var t = this; + return new Promise(function(resolve, reject) { + t.init(function(err) { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +}; PubsubRedis.prototype.publish = function(data) { this.clientPublish.publish(channelName, data); }; module.exports = PubsubRedis; -module.exports.createClientRedis = createClientRedis; +module.exports.getClientRedis = getClientRedis;