diff --git a/Common/sources/constants.js b/Common/sources/constants.js index c754e0e8..e7be6d0e 100644 --- a/Common/sources/constants.js +++ b/Common/sources/constants.js @@ -227,6 +227,10 @@ exports.REDIS_KEY_COLLECT_LOST = 'collectlost'; exports.REDIS_KEY_LICENSE = 'license'; exports.REDIS_KEY_LICENSE_T = 'licenseT'; exports.REDIS_KEY_EDITOR_CONNECTIONS = 'editorconnections'; +exports.REDIS_KEY_SHARD_CONNECTIONS_EDIT_ZSET = 'shardconnections:edit:zset'; +exports.REDIS_KEY_SHARD_CONNECTIONS_EDIT_HASH = 'shardconnections:edit:hash'; +exports.REDIS_KEY_SHARD_CONNECTIONS_VIEW_ZSET = 'shardconnections:view:zset'; +exports.REDIS_KEY_SHARD_CONNECTIONS_VIEW_HASH = 'shardconnections:view:hash'; exports.SHUTDOWN_CODE = 4001; exports.SHUTDOWN_REASON = 'server shutdown'; diff --git a/DocService/sources/DocsCoServer.js b/DocService/sources/DocsCoServer.js index 7db2b2b2..e80a36b6 100644 --- a/DocService/sources/DocsCoServer.js +++ b/DocService/sources/DocsCoServer.js @@ -77,6 +77,7 @@ const _ = require('underscore'); const url = require('url'); const os = require('os'); const cluster = require('cluster'); +const crypto = require('crypto'); const cron = require('cron'); const co = require('co'); const jwt = require('jsonwebtoken'); @@ -168,6 +169,7 @@ const MIN_SAVE_EXPIRATION = 60000; const FORCE_SAVE_EXPIRATION = Math.min(Math.max(cfgForceSaveInterval, MIN_SAVE_EXPIRATION), cfgQueueRetentionPeriod * 1000); const HEALTH_CHECK_KEY_MAX = 10000; +const SHARD_ID = crypto.randomBytes(16).toString('base64');//16 as guid const PRECISION = [{name: 'hour', val: ms('1h')}, {name: 'day', val: ms('1d')}, {name: 'week', val: ms('7d')}, {name: 'month', val: ms('31d')}, @@ -414,6 +416,38 @@ CRecalcIndex.prototype = { } }; +function updatePresenceCounters(conn, val) { + return co(function* () { + if (conn.isCloseCoAuthoring || (conn.user && conn.user.view)) { + yield editorData.incrViewerConnectionsCountByShard(SHARD_ID, val); + if (clientStatsD) { + let countView = yield editorData.getViewerConnectionsCount(connections); + clientStatsD.gauge('expireDoc.connections.view', countView); + } + } else { + yield editorData.incrEditorConnectionsCountByShard(SHARD_ID, val); + if (clientStatsD) { + let countEditors = yield editorData.getEditorConnectionsCount(connections); + clientStatsD.gauge('expireDoc.connections.edit', countEditors); + } + } + }); +} +function addPresence(conn, updateCunters) { + return co(function* () { + yield editorData.addPresence(conn.docId, conn.user.id, utils.getConnectionInfoStr(conn)); + if (updateCunters) { + yield updatePresenceCounters(conn, 1); + } + }); +} +function removePresence(conn) { + return co(function* () { + yield editorData.removePresence(conn.docId, conn.user.id); + yield updatePresenceCounters(conn, -1); + }); +} + function sendData(conn, data) { conn.write(JSON.stringify(data)); const type = data ? data.type : null; @@ -1219,7 +1253,7 @@ exports.install = function(server, callbackFunction) { if (reconnected) { logger.info("reconnected: userId = %s docId = %s", tmpUser.id, docId); } else { - yield editorData.removePresence(docId, tmpUser.id); + yield removePresence(conn); hvals = yield editorData.getPresence(docId, connections); participantsTimestamp = Date.now(); if (hvals.length <= 0) { @@ -1230,7 +1264,7 @@ exports.install = function(server, callbackFunction) { if (!conn.isCloseCoAuthoring) { tmpUser.view = true; conn.isCloseCoAuthoring = true; - yield editorData.addPresence(docId, tmpUser.id, utils.getConnectionInfoStr(conn)); + yield addPresence(conn, true); if (cfgTokenEnableBrowser) { sendDataRefreshToken(conn, fillJwtByConnection(conn)); } @@ -1329,9 +1363,8 @@ exports.install = function(server, callbackFunction) { } } if (docIdOld !== docIdNew) { - var tmpUser = conn.user; //remove presence(other data was removed before in closeDocument) - yield editorData.removePresence(docIdOld, tmpUser.id); + yield removePresence(conn); var hvals = yield editorData.getPresence(docIdOld, connections); if (hvals.length <= 0) { yield editorData.removePresenceDocument(docIdOld); @@ -1339,7 +1372,7 @@ exports.install = function(server, callbackFunction) { //apply new conn.docId = docIdNew; - yield editorData.addPresence(docIdNew, tmpUser.id, utils.getConnectionInfoStr(conn)); + yield addPresence(conn, true); if (cfgTokenEnableBrowser) { sendDataRefreshToken(conn, fillJwtByConnection(conn)); } @@ -1509,7 +1542,7 @@ exports.install = function(server, callbackFunction) { if (constants.CONN_CLOSED !== conn.readyState) { // Кладем в массив, т.к. нам нужно отправлять данные для открытия/сохранения документа connections.push(conn); - yield editorData.addPresence(conn.docId, conn.user.id, utils.getConnectionInfoStr(conn)); + yield addPresence(conn, true); sendFileError(conn, errorId, code); } @@ -1912,7 +1945,7 @@ exports.install = function(server, callbackFunction) { if (constants.CONN_CLOSED !== conn.readyState) { // Кладем в массив, т.к. нам нужно отправлять данные для открытия/сохранения документа connections.push(conn); - yield editorData.addPresence(docId, conn.user.id, utils.getConnectionInfoStr(conn)); + yield addPresence(conn, true); // Посылаем формальную авторизацию, чтобы подтвердить соединение yield* sendAuthInfo(conn, bIsRestore, undefined); if (cmd) { @@ -2027,7 +2060,7 @@ exports.install = function(server, callbackFunction) { } connections.push(conn); let firstParticipantNoView, countNoView = 0; - yield editorData.addPresence(docId, tmpUser.id, utils.getConnectionInfoStr(conn)); + yield addPresence(conn, true); let participantsMap = yield* getParticipantMap(docId); const participantsTimestamp = Date.now(); for (let i = 0; i < participantsMap.length; ++i) { @@ -2647,7 +2680,6 @@ exports.install = function(server, callbackFunction) { licenseWarningLimit = licenseInfo.usersCount * cfgWarningLimitPercents <= execRes.length; } } else { - // Warning. Cluster version or if workers > 1 will work with increasing numbers. let connectionsCount = 0; if (constants.PACKAGE_TYPE_OS === licenseInfo.packageType && c_LR.Error === licenseType) { connectionsCount = constants.LICENSE_CONNECTIONS; @@ -2656,9 +2688,7 @@ exports.install = function(server, callbackFunction) { connectionsCount = licenseInfo.connections; } if (connectionsCount) { - const editConnectionsCount = (_.filter(connections, function (el) { - return true !== el.isCloseCoAuthoring && el.user.view !== true; - })).length; + const editConnectionsCount = yield editorData.getEditorConnectionsCount(connections); licenseType = (connectionsCount > editConnectionsCount) ? licenseType : c_LR.Connections; licenseWarningLimit = connectionsCount * cfgWarningLimitPercents <= editConnectionsCount; } @@ -2891,8 +2921,8 @@ exports.install = function(server, callbackFunction) { var cronJob = this; return co(function* () { try { - var countEdit = 0; - var countView = 0; + var countEditByShard = 0; + var countViewByShard = 0; logger.debug('expireDoc connections.length = %d', connections.length); var nowMs = new Date().getTime(); var nextMs = cronJob.nextDate(); @@ -2927,16 +2957,20 @@ exports.install = function(server, callbackFunction) { if (constants.CONN_CLOSED === conn.readyState) { logger.error('expireDoc connection closed docId = %s', conn.docId); } - yield editorData.addPresence(conn.docId, conn.user.id, utils.getConnectionInfoStr(conn)); - if (conn.user && conn.user.view) { - countView++; + yield addPresence(conn, false); + if (conn.isCloseCoAuthoring || (conn.user && conn.user.view)) { + countViewByShard++; } else { - countEdit++; + countEditByShard++; } } - yield* collectStats(countEdit, countView); + yield* collectStats(countEditByShard, countViewByShard); + yield editorData.setEditorConnectionsCountByShard(SHARD_ID, countEditByShard); + yield editorData.setViewerConnectionsCountByShard(SHARD_ID, countViewByShard); if (clientStatsD) { + let countEdit = yield editorData.getEditorConnectionsCount(connections); clientStatsD.gauge('expireDoc.connections.edit', countEdit); + let countView = yield editorData.getViewerConnectionsCount(connections); clientStatsD.gauge('expireDoc.connections.view', countView); } } catch (err) { diff --git a/DocService/sources/editorDataMemory.js b/DocService/sources/editorDataMemory.js index c7eebcc5..22b56033 100644 --- a/DocService/sources/editorDataMemory.js +++ b/DocService/sources/editorDataMemory.js @@ -252,6 +252,38 @@ EditorData.prototype.setEditorConnections = function(countEdit, countView, now, EditorData.prototype.getEditorConnections = function() { return Promise.resolve(this.stat); }; +EditorData.prototype.setEditorConnectionsCountByShard = function(shardId, count) { + return Promise.resolve(); +}; +EditorData.prototype.incrEditorConnectionsCountByShard = function(shardId, count) { + return Promise.resolve(); +}; +EditorData.prototype.getEditorConnectionsCount = function(connections) { + let count = 0; + for (let i = 0; i < connections.length; ++i) { + let conn = connections[i]; + if (!(conn.isCloseCoAuthoring || (conn.user && conn.user.view))) { + count++; + } + } + return Promise.resolve(count); +}; +EditorData.prototype.setViewerConnectionsCountByShard = function(shardId, count) { + return Promise.resolve(); +}; +EditorData.prototype.incrViewerConnectionsCountByShard = function(shardId, count) { + return Promise.resolve(); +}; +EditorData.prototype.getViewerConnectionsCount = function(connections) { + let count = 0; + for (let i = 0; i < connections.length; ++i) { + let conn = connections[i]; + if (conn.isCloseCoAuthoring || (conn.user && conn.user.view)) { + count++; + } + } + return Promise.resolve(count); +}; EditorData.prototype.addShutdown = function(key, docId) { if (!this.shutdown[key]) {