From 1570547f65477e311ea8b6bd8a486e8bbb76127c Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Wed, 13 Sep 2017 17:11:10 +0300 Subject: [PATCH 1/9] store changes in file via stream --- Common/config/default.json | 4 +- Common/sources/utils.js | 15 ++++ DocService/sources/baseConnector.js | 26 ++---- FileConverter/sources/converter.js | 126 ++++++++++++++++------------ 4 files changed, 98 insertions(+), 73 deletions(-) diff --git a/Common/config/default.json b/Common/config/default.json index ee43d744..999d1f66 100644 --- a/Common/config/default.json +++ b/Common/config/default.json @@ -171,7 +171,9 @@ "presentationThemesDir": "null", "filePath": "null", "args": "", - "errorfiles": "" + "errorfiles": "", + "streamWriterBufferSize": 8388608, + "maxRequestChanges": 50000 } }, "FileStorage": { diff --git a/Common/sources/utils.js b/Common/sources/utils.js index a8b84fde..01f04dcf 100644 --- a/Common/sources/utils.js +++ b/Common/sources/utils.js @@ -426,6 +426,21 @@ function promiseCreateWriteStream(strPath, optOptions) { }); }; exports.promiseCreateWriteStream = promiseCreateWriteStream; + +function promiseWaitDrain(stream) { + return new Promise(function(resolve, reject) { + stream.once('drain', resolve); + }); +} +exports.promiseWaitDrain = promiseWaitDrain; + +function promiseWaitClose(stream) { + return new Promise(function(resolve, reject) { + stream.once('close', resolve); + }); +} +exports.promiseWaitClose = promiseWaitClose; + function promiseCreateReadStream(strPath) { return new Promise(function(resolve, reject) { var file = fs.createReadStream(strPath); diff --git a/DocService/sources/baseConnector.js b/DocService/sources/baseConnector.js index 9e6232eb..759ce847 100644 --- a/DocService/sources/baseConnector.js +++ b/DocService/sources/baseConnector.js @@ -185,11 +185,17 @@ exports.getChangesIndexPromise = function(docId) { }); }); }; -exports.getChangesPromise = function (docId, optStartIndex, optEndIndex) { +exports.getChangesPromise = function (docId, optStartIndex, optEndIndex, opt_time) { return new Promise(function(resolve, reject) { var getCondition = 'id='+baseConnector.sqlEscape(docId); - if (null != optStartIndex && null != optEndIndex) { - getCondition += ' AND change_id>=' + optStartIndex + ' AND change_id<' + optEndIndex; + if (null != optStartIndex) { + getCondition += ' AND change_id>=' + optStartIndex; + } + if (null != optEndIndex) { + getCondition += ' AND change_id<' + optEndIndex; + } + if (null != opt_time) { + getCondition += ' AND change_date<=' + baseConnector.sqlEscape(_getDateTime(opt_time)); } getCondition += ' ORDER BY change_id ASC'; getDataFromTable(c_oTableId.changes, "*", getCondition, function(error, result) { @@ -201,20 +207,6 @@ exports.getChangesPromise = function (docId, optStartIndex, optEndIndex) { }); }); }; -exports.getChanges = function (docId, opt_time, opt_index, callback) { - lockCriticalSection(docId, function () {_getChanges(docId, opt_time, opt_index, callback);}); -}; -function _getChanges (docId, opt_time, opt_index, callback) { - var getCondition = "id='" + docId + "'"; - if (null != opt_time && null != opt_index) { - getCondition += " AND change_date<=" + baseConnector.sqlEscape(_getDateTime(opt_time)); - getCondition += " AND change_id<" + baseConnector.sqlEscape(opt_index); - } - getCondition += " ORDER BY change_id ASC"; - getDataFromTable(c_oTableId.changes, "*", getCondition, - function (error, result) {unLockCriticalSection(docId); if (callback) callback(error, result);}); -} - exports.checkStatusFile = function (docId, callbackFunction) { var sqlCommand = "SELECT status FROM " + tableResult + " WHERE id='" + docId + "';"; baseConnector.sqlQuery(sqlCommand, callbackFunction); diff --git a/FileConverter/sources/converter.js b/FileConverter/sources/converter.js index 2ef5a7ab..521d6ef2 100644 --- a/FileConverter/sources/converter.js +++ b/FileConverter/sources/converter.js @@ -58,6 +58,9 @@ var cfgPresentationThemesDir = configConverter.get('presentationThemesDir'); var cfgFilePath = configConverter.get('filePath'); var cfgArgs = configConverter.get('args'); var cfgErrorFiles = configConverter.get('errorfiles'); +const cfgStreamWriterBufferSize = configConverter.get('streamWriterBufferSize'); +//cfgMaxRequestChanges was obtained as a result of the test: 84408 changes - 5,16 MB +const cfgMaxRequestChanges = configConverter.get('maxRequestChanges'); var cfgTokenEnableRequestOutbox = config.get('services.CoAuthoring.token.enable.request.outbox'); const cfgForgottenFilesName = config.get('services.CoAuthoring.server.forgottenfilesname'); @@ -224,23 +227,6 @@ function* downloadFile(docId, uri, fileFrom) { } return res; } -function promiseGetChanges(key, forceSave) { - return new Promise(function(resolve, reject) { - var time; - var index; - if (forceSave) { - time = forceSave.getTime(); - index = forceSave.getIndex(); - } - baseConnector.getChanges(key, time, index, function(err, result) { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); -} function* downloadFileFromStorage(id, strPath, dir) { var list = yield storage.listObjects(strPath); logger.debug('downloadFileFromStorage list %s (id=%s)', list.toString(), id); @@ -319,53 +305,83 @@ function* processDownloadFromStorage(dataConvert, cmd, task, tempDirs) { } } if (task.getFromChanges()) { - var changesDir = path.join(tempDirs.source, 'changes'); - fs.mkdirSync(changesDir); - var indexFile = 0; - var changesAuthor = null; - var changesHistory = { - serverVersion: commonDefines.buildVersion, - changes: [] - }; - //todo writeable stream - let changesBuffers = null; - let changes = yield promiseGetChanges(cmd.getDocId(), cmd.getForceSave()); - for (var i = 0; i < changes.length; ++i) { - var change = changes[i]; + yield* processChanges(tempDirs, cmd); + } +} + +function* processChanges(tempDirs, cmd) { + let changesDir = path.join(tempDirs.source, 'changes'); + fs.mkdirSync(changesDir); + let indexFile = 0; + let changesAuthor = null; + let changesHistory = { + serverVersion: commonDefines.buildVersion, + changes: [] + }; + let forceSave = cmd.getForceSave(); + let forceSaveTime; + let forceSaveIndex = Number.MAX_VALUE; + if (forceSave) { + forceSaveTime = forceSave.getTime(); + forceSaveIndex = forceSave.getIndex(); + } + let streamObj = yield* streamCreate(cmd.getDocId(), changesDir, indexFile++, {highWaterMark: cfgStreamWriterBufferSize}); + let curIndexStart = 0; + let curIndexEnd = Math.min(curIndexStart + cfgMaxRequestChanges, forceSaveIndex); + while (curIndexStart < curIndexEnd) { + let changes = yield baseConnector.getChangesPromise(cmd.getDocId(), curIndexStart, curIndexEnd, forceSaveTime); + for (let i = 0; i < changes.length; ++i) { + let change = changes[i]; if (null === changesAuthor || changesAuthor !== change.user_id_original) { if (null !== changesAuthor) { - changesBuffers.push(new Buffer(']', 'utf8')); - let dataZipFile = Buffer.concat(changesBuffers); - changesBuffers = null; - var fileName = 'changes' + (indexFile++) + '.json'; - var filePath = path.join(changesDir, fileName); - fs.writeFileSync(filePath, dataZipFile); + yield* streamEnd(streamObj, ']'); + streamObj = yield* streamCreate(cmd.getDocId(), changesDir, indexFile++); } changesAuthor = change.user_id_original; - var strDate = baseConnector.getDateTime(change.change_date); - changesHistory.changes.push({ - 'created': strDate, 'user': { - 'id': changesAuthor, 'name': change.user_name - } - }); - changesBuffers = []; - changesBuffers.push(new Buffer('[', 'utf8')); + let strDate = baseConnector.getDateTime(change.change_date); + changesHistory.changes.push({'created': strDate, 'user': {'id': changesAuthor, 'name': change.user_name}}); + yield* streamWrite(streamObj, '['); } else { - changesBuffers.push(new Buffer(',', 'utf8')); + yield* streamWrite(streamObj, ','); } - changesBuffers.push(new Buffer(change.change_data, 'utf8')); + yield* streamWrite(streamObj, change.change_data); + streamObj.isNoChangesInFile = false; } - if (null !== changesBuffers) { - changesBuffers.push(new Buffer(']', 'utf8')); - let dataZipFile = Buffer.concat(changesBuffers); - changesBuffers = null; - var fileName = 'changes' + (indexFile++) + '.json'; - var filePath = path.join(changesDir, fileName); - fs.writeFileSync(filePath, dataZipFile); + if (changes.length === curIndexEnd - curIndexStart) { + curIndexStart += cfgMaxRequestChanges; + curIndexEnd = Math.min(curIndexStart + cfgMaxRequestChanges, forceSaveIndex); + } else { + break; } - cmd.setUserId(changesAuthor); - fs.writeFileSync(path.join(tempDirs.result, 'changesHistory.json'), JSON.stringify(changesHistory), 'utf8'); } + yield* streamEnd(streamObj, ']'); + if (streamObj.isNoChangesInFile) { + fs.unlinkSync(streamObj.filePath); + } + cmd.setUserId(changesAuthor); + fs.writeFileSync(path.join(tempDirs.result, 'changesHistory.json'), JSON.stringify(changesHistory), 'utf8'); +} + +function* streamCreate(docId, changesDir, indexFile, opt_options) { + let fileName = 'changes' + indexFile + '.json'; + let filePath = path.join(changesDir, fileName); + let writeStream = yield utils.promiseCreateWriteStream(filePath, opt_options); + writeStream.on('error', function(err) { + //todo integrate error handle in main thread (probable: set flag here and check it in main thread) + logger.error('WriteStreamError (id=%s)\r\n%s', docId, err.stack); + }); + return {writeStream: writeStream, filePath: filePath, isNoChangesInFile: true}; +} + +function* streamWrite(streamObj, text) { + if (!streamObj.writeStream.write(text, 'utf8')) { + yield utils.promiseWaitDrain(streamObj.writeStream); + } +} + +function* streamEnd(streamObj, text) { + streamObj.writeStream.end(text, 'utf8'); + yield utils.promiseWaitClose(streamObj.writeStream); } function* processUploadToStorage(dir, storagePath) { var list = yield utils.listObjects(dir); From ec416e686737c57ede6f1e642f49970dda10736c Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Mon, 18 Sep 2017 12:29:37 +0300 Subject: [PATCH 2/9] handle dead letters --- Common/sources/constants.js | 1 + Common/sources/utils.js | 1 + DocService/sources/DocsCoServer.js | 11 +++++++---- DocService/sources/canvasservice.js | 8 ++++++-- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/Common/sources/constants.js b/Common/sources/constants.js index 6003ac53..89c1b7b7 100644 --- a/Common/sources/constants.js +++ b/Common/sources/constants.js @@ -145,6 +145,7 @@ exports.CONVERT_PARAMS = -88; exports.CONVERT_NEED_PARAMS = -89; exports.CONVERT_DRM = -90; exports.CONVERT_PASSWORD = -91; +exports.CONVERT_DEAD_LETTER = -92; exports.UPLOAD = -100; exports.UPLOAD_CONTENT_LENGTH = -101; exports.UPLOAD_EXTENSION = -102; diff --git a/Common/sources/utils.js b/Common/sources/utils.js index 01f04dcf..19a55f95 100644 --- a/Common/sources/utils.js +++ b/Common/sources/utils.js @@ -324,6 +324,7 @@ exports.mapAscServerErrorToOldError = function(error) { res = -4; break; case constants.CONVERT_TIMEOUT : + case constants.CONVERT_DEAD_LETTER : res = -2; break; case constants.CONVERT_PASSWORD : diff --git a/DocService/sources/DocsCoServer.js b/DocService/sources/DocsCoServer.js index 091667e2..b7583cad 100644 --- a/DocService/sources/DocsCoServer.js +++ b/DocService/sources/DocsCoServer.js @@ -763,14 +763,13 @@ function handleDeadLetter(data) { return co(function*() { let docId = 'null'; try { - logger.debug('handleDeadLetter start: docId = %s %s', docId, data); var isRequeued = false; let task = new commonDefines.TaskQueueData(JSON.parse(data)); if (task) { let cmd = task.getCmd(); docId = cmd.getDocId(); + logger.warn('handleDeadLetter start: docId = %s %s', docId, data); let forceSave = cmd.getForceSave(); - //todo requeue other tasks if (forceSave && commonDefines.c_oAscForceSaveTypes.Timeout == forceSave.getType()) { let lastSave = yield* getLastSave(docId); //check that there are no new changes @@ -779,11 +778,15 @@ function handleDeadLetter(data) { yield* addTask(task, constants.QUEUE_PRIORITY_VERY_LOW, undefined, FORCE_SAVE_EXPIRATION); isRequeued = true; } + } else { + //simulate error response + cmd.setStatusInfo(constants.CONVERT_DEAD_LETTER); + canvasService.receiveTask(JSON.stringify(task)) } } - logger.debug('handleDeadLetter end: docId = %s; requeue = %s', docId, isRequeued); + logger.warn('handleDeadLetter end: docId = %s; requeue = %s', docId, isRequeued); } catch (err) { - logger.debug('handleDeadLetter error: docId = %s\r\n%s', docId, err.stack); + logger.error('handleDeadLetter error: docId = %s\r\n%s', docId, err.stack); } }); } diff --git a/DocService/sources/canvasservice.js b/DocService/sources/canvasservice.js index 9b4e168f..c54d54db 100644 --- a/DocService/sources/canvasservice.js +++ b/DocService/sources/canvasservice.js @@ -249,6 +249,8 @@ function getUpdateResponse(cmd) { updateTask.status = taskResult.FileStatus.NeedParams; } else if (constants.CONVERT_DRM == statusInfo || constants.CONVERT_PASSWORD == statusInfo) { updateTask.status = taskResult.FileStatus.NeedPassword; + } else if (constants.CONVERT_DEAD_LETTER == statusInfo) { + updateTask.status = taskResult.FileStatus.ErrToReload; } else { updateTask.status = taskResult.FileStatus.Err; } @@ -933,7 +935,7 @@ exports.saveFromChanges = function(docId, statusInfo, optFormat, opt_userId, opt } }); }; -exports.receiveTask = function(data, dataRaw) { +exports.receiveTask = function(data, opt_dataRaw) { return co(function* () { var docId = 'null'; try { @@ -973,7 +975,9 @@ exports.receiveTask = function(data, dataRaw) { }); } } - yield* docsCoServer.removeResponse(dataRaw); + if (opt_dataRaw) { + yield* docsCoServer.removeResponse(opt_dataRaw); + } logger.debug('End receiveTask: docId = %s', docId); } } catch (err) { From 3f6edddebf6c9b45585ba00e6c67e306404bf861 Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Mon, 18 Sep 2017 12:52:22 +0300 Subject: [PATCH 3/9] handle redelivered messages(x-redelivered-count) --- Common/config/default.json | 3 +- Common/sources/taskqueueRabbitMQ.js | 13 ++++--- FileConverter/sources/converter.js | 54 +++++++++++++++++++++-------- 3 files changed, 49 insertions(+), 21 deletions(-) diff --git a/Common/config/default.json b/Common/config/default.json index 999d1f66..2da44031 100644 --- a/Common/config/default.json +++ b/Common/config/default.json @@ -173,7 +173,8 @@ "args": "", "errorfiles": "", "streamWriterBufferSize": 8388608, - "maxRequestChanges": 50000 + "maxRequestChanges": 50000, + "maxRedeliveredCount": 2 } }, "FileStorage": { diff --git a/Common/sources/taskqueueRabbitMQ.js b/Common/sources/taskqueueRabbitMQ.js index 9d72e88e..386dbf4e 100644 --- a/Common/sources/taskqueueRabbitMQ.js +++ b/Common/sources/taskqueueRabbitMQ.js @@ -145,15 +145,18 @@ function repeat(taskqueue) { //acknowledge data after reconnect raises an exception 'PRECONDITION_FAILED - unknown delivery tag' for (var i = 0; i < taskqueue.addTaskStore.length; ++i) { var elem = taskqueue.addTaskStore[i]; - addTask(taskqueue, elem.task, elem.priority, function () {}, elem.expiration); + addTask(taskqueue, elem.task, elem.priority, function () {}, elem.expiration, elem.headers); } taskqueue.addTaskStore.length = 0; } -function addTask(taskqueue, content, priority, callback, opt_expiration) { +function addTask(taskqueue, content, priority, callback, opt_expiration, opt_headers) { var options = {persistent: true, priority: priority}; if (undefined !== opt_expiration) { options.expiration = opt_expiration.toString(); } + if (undefined !== opt_headers) { + options.headers = opt_headers; + } taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask, content, options, callback); } function addResponse(taskqueue, content, callback) { @@ -193,7 +196,7 @@ TaskQueueRabbitMQ.prototype.initPromise = function(isAddTask, isAddResponse, isA }); }); }; -TaskQueueRabbitMQ.prototype.addTask = function (task, priority, opt_expiration) { +TaskQueueRabbitMQ.prototype.addTask = function (task, priority, opt_expiration, opt_headers) { //todo confirmation mode var t = this; return new Promise(function (resolve, reject) { @@ -206,9 +209,9 @@ TaskQueueRabbitMQ.prototype.addTask = function (task, priority, opt_expiration) } else { resolve(); } - }, opt_expiration); + }, opt_expiration, opt_headers); } else { - t.addTaskStore.push({task: content, priority: priority, expiration: opt_expiration}); + t.addTaskStore.push({task: content, priority: priority, expiration: opt_expiration, headers: opt_headers}); resolve(); } }); diff --git a/FileConverter/sources/converter.js b/FileConverter/sources/converter.js index 521d6ef2..4bd11c17 100644 --- a/FileConverter/sources/converter.js +++ b/FileConverter/sources/converter.js @@ -61,6 +61,7 @@ var cfgErrorFiles = configConverter.get('errorfiles'); const cfgStreamWriterBufferSize = configConverter.get('streamWriterBufferSize'); //cfgMaxRequestChanges was obtained as a result of the test: 84408 changes - 5,16 MB const cfgMaxRequestChanges = configConverter.get('maxRequestChanges'); +const cfgMaxRedeliveredCount = configConverter.get('maxRedeliveredCount') var cfgTokenEnableRequestOutbox = config.get('services.CoAuthoring.token.enable.request.outbox'); const cfgForgottenFilesName = config.get('services.CoAuthoring.server.forgottenfilesname'); @@ -596,26 +597,49 @@ function receiveTask(data, dataRaw) { return co(function* () { var res = null; var task = null; - try { - task = new commonDefines.TaskQueueData(JSON.parse(data)); - if (task) { - res = yield* ExecuteTask(task); - } - } catch (err) { - logger.error(err); - } finally { + if (!dataRaw.fields.redelivered) { try { - if (!res && task) { - //если все упало так что даже нет res, все равно пытаемся отдать ошибку. - var cmd = task.getCmd(); + task = new commonDefines.TaskQueueData(JSON.parse(data)); + if (task) { + res = yield* ExecuteTask(task); + } + } catch (err) { + logger.error(err); + } finally { + try { + if (!res && task) { + //если все упало так что даже нет res, все равно пытаемся отдать ошибку. + var cmd = task.getCmd(); + cmd.setStatusInfo(constants.CONVERT); + res = new commonDefines.TaskQueueData(); + res.setCmd(cmd); + } + if(res) { + yield queue.addResponse(res); + } + yield queue.removeTask(dataRaw); + } catch (err) { + logger.error(err); + } + } + } else { + try { + logger.warn('receiveTask redelivered data=%j', dataRaw); + //remove current task and add new into tail of queue to remove redelivered flag + yield queue.removeTask(dataRaw); + task = new commonDefines.TaskQueueData(JSON.parse(data)); + let redeliveredCount = dataRaw.properties.headers['x-redelivered-count']; + if (!redeliveredCount || redeliveredCount < cfgMaxRedeliveredCount) { + dataRaw.properties.headers['x-redelivered-count'] = redeliveredCount ? redeliveredCount + 1 : 1; + yield queue.addTask(task, dataRaw.properties.priority, undefined, dataRaw.properties.headers); + } else { + //simulate error response + let cmd = task.getCmd(); cmd.setStatusInfo(constants.CONVERT); res = new commonDefines.TaskQueueData(); res.setCmd(cmd); - } - if(res) { yield queue.addResponse(res); } - yield queue.removeTask(dataRaw); } catch (err) { logger.error(err); } @@ -625,7 +649,7 @@ function receiveTask(data, dataRaw) { function run() { queue = new queueService(); queue.on('task', receiveTask); - queue.init(false, true, true, false, function(err) { + queue.init(true, true, true, false, function(err) { if (null != err) { logger.error('createTaskQueue error :\r\n%s', err.stack); } From ece5196f05214f5ac965b67e7b57730a9c1be0c9 Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Tue, 19 Sep 2017 13:32:13 +0300 Subject: [PATCH 4/9] add storage.uploadObject,copyObject for memory optimization --- Common/package.json | 1 + Common/sources/storage-base.js | 6 ++++++ Common/sources/storage-fs.js | 10 ++++++++++ Common/sources/storage-s3.js | 18 ++++++++++++++++++ DocService/sources/canvasservice.js | 4 +--- FileConverter/sources/converter.js | 5 ++--- 6 files changed, 38 insertions(+), 6 deletions(-) diff --git a/Common/package.json b/Common/package.json index 1751f2f1..bbc17740 100644 --- a/Common/package.json +++ b/Common/package.json @@ -12,6 +12,7 @@ "dnscache": "^1.0.1", "escape-string-regexp": "^1.0.5", "forwarded": "^0.1.0", + "fs-extra": "^4.0.2", "ipaddr.js": "^1.2.0", "jsonwebtoken": "^7.1.9", "log4js": "^0.6.38", diff --git a/Common/sources/storage-base.js b/Common/sources/storage-base.js index 64c5ef9a..60e9cf31 100644 --- a/Common/sources/storage-base.js +++ b/Common/sources/storage-base.js @@ -45,6 +45,12 @@ exports.getObject = function(strPath) { exports.putObject = function(strPath, buffer, contentLength) { return storage.putObject(getStoragePath(strPath), buffer, contentLength); }; +exports.uploadObject = function(strPath, filePath) { + return storage.uploadObject(strPath, filePath); +}; +exports.copyObject = function(sourceKey, destinationKey) { + return storage.copyObject(sourceKey, destinationKey); +}; exports.listObjects = function(strPath) { return storage.listObjects(getStoragePath(strPath)).catch(function(e) { logger.error('storage.listObjects:\r\n%s', e.stack); diff --git a/Common/sources/storage-fs.js b/Common/sources/storage-fs.js index 8403c301..515f8d00 100644 --- a/Common/sources/storage-fs.js +++ b/Common/sources/storage-fs.js @@ -33,6 +33,7 @@ 'use strict'; var fs = require('fs'); +const fse = require('fs-extra') var path = require('path'); var mkdirp = require('mkdirp'); var utils = require("./utils"); @@ -112,6 +113,15 @@ exports.putObject = function(strPath, buffer, contentLength) { }); }); }; +exports.uploadObject = function(strPath, filePath) { + let fsPath = getFilePath(strPath); + return fse.copy(filePath, fsPath); +}; +exports.copyObject = function(sourceKey, destinationKey) { + let fsPathSource = getFilePath(sourceKey); + let fsPathSestination = getFilePath(destinationKey); + return fse.copy(fsPathSource, fsPathSestination); +}; exports.listObjects = function(strPath) { return utils.listObjects(getFilePath(strPath)).then(function(values) { return values.map(function(curvalue) { diff --git a/Common/sources/storage-s3.js b/Common/sources/storage-s3.js index dfc18d5a..cb08ef3c 100644 --- a/Common/sources/storage-s3.js +++ b/Common/sources/storage-s3.js @@ -146,6 +146,24 @@ exports.putObject = function(strPath, buffer, contentLength) { }); }); }; +exports.uploadObject = function(strPath, filePath) { + return new Promise(function(resolve, reject) { + fs.readFile(filePath, (err, data) => { + if (err) { + reject(err); + } else { + resolve(data); + } + }); + }).then(function(data) { + return exports.putObject(strPath, data, data.length); + }); +}; +exports.copyObject = function(sourceKey, destinationKey) { + return exports.getObject(sourceKey).then(function(data) { + return exports.putObject(destinationKey, data, data.length); + }); +}; exports.listObjects = function(strPath) { return new Promise(function(resolve, reject) { var params = {Bucket: cfgBucketName, Prefix: getFilePath(strPath)}; diff --git a/DocService/sources/canvasservice.js b/DocService/sources/canvasservice.js index c54d54db..cfe3595d 100644 --- a/DocService/sources/canvasservice.js +++ b/DocService/sources/canvasservice.js @@ -698,10 +698,8 @@ function* commandSfcCallback(cmd, isSfcm) { } if (storeForgotten && (!isError || isErrorCorrupted)) { try { - //todo implement storage.copy - let data = yield storage.getObject(savePathDoc); let forgottenName = cfgForgottenFilesName + pathModule.extname(cmd.getOutputPath()); - yield storage.putObject(cfgForgottenFiles + '/' + docId + '/' + forgottenName, data, data.length); + yield storage.copyObject(savePathDoc, cfgForgottenFiles + '/' + docId + '/' + forgottenName); } catch (err) { logger.error('Empty storeForgotten: docId = %s\r\n%s', docId, err.stack); } diff --git a/FileConverter/sources/converter.js b/FileConverter/sources/converter.js index 4bd11c17..53ca01b1 100644 --- a/FileConverter/sources/converter.js +++ b/FileConverter/sources/converter.js @@ -396,9 +396,8 @@ function* processUploadToStorage(dir, storagePath) { } function* processUploadToStorageChunk(list, dir, storagePath) { yield Promise.all(list.map(function (curValue) { - var data = fs.readFileSync(curValue); - var localValue = storagePath + '/' + curValue.substring(dir.length + 1); - return storage.putObject(localValue, data, data.length); + let localValue = storagePath + '/' + curValue.substring(dir.length + 1); + return storage.uploadObject(localValue, curValue); })); } function writeProcessOutputToLog(docId, childRes, isDebug) { From e922794ad64ac0805e95e37bdeea94392bec9aa4 Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Wed, 20 Sep 2017 13:28:04 +0300 Subject: [PATCH 5/9] spawnSync->spawn-async to prevent RabbitMQ connection timeout --- FileConverter/package.json | 1 + FileConverter/sources/converter.js | 36 +++++++++++++++++++++--------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/FileConverter/package.json b/FileConverter/package.json index e21d1b77..a77b2a39 100644 --- a/FileConverter/package.json +++ b/FileConverter/package.json @@ -4,6 +4,7 @@ "homepage": "http://www.onlyoffice.com", "private": true, "dependencies": { + "@expo/spawn-async": "^1.3.0", "co": "^4.6.0", "config": "^1.21.0" } diff --git a/FileConverter/sources/converter.js b/FileConverter/sources/converter.js index f0a9fa18..8d658ffd 100644 --- a/FileConverter/sources/converter.js +++ b/FileConverter/sources/converter.js @@ -38,6 +38,7 @@ var url = require('url'); var childProcess = require('child_process'); var co = require('co'); var config = require('config'); +var spawnAsync = require('@expo/spawn-async'); var configConverter = config.get('FileConverter.converter'); var commonDefines = require('./../../Common/sources/commondefines'); @@ -400,14 +401,14 @@ function* processUploadToStorageChunk(list, dir, storagePath) { } function writeProcessOutputToLog(docId, childRes, isDebug) { if (childRes) { - if (childRes.stdout) { + if (undefined !== childRes.stdout) { if (isDebug) { logger.debug('stdout (id=%s):%s', docId, childRes.stdout); } else { logger.error('stdout (id=%s):%s', docId, childRes.stdout); } } - if (childRes.stderr) { + if (undefined !== childRes.stderr) { if (isDebug) { logger.debug('stderr (id=%s):%s', docId, childRes.stderr); } else { @@ -416,21 +417,17 @@ function writeProcessOutputToLog(docId, childRes, isDebug) { } } } -function* postProcess(cmd, dataConvert, tempDirs, childRes, error) { +function* postProcess(cmd, dataConvert, tempDirs, childRes, error, isTimeout) { var exitCode = 0; var exitSignal = null; - var errorCode = null; if(childRes) { exitCode = childRes.status; exitSignal = childRes.signal; - if (childRes.error) { - errorCode = childRes.error.code; - } } if (0 !== exitCode || null !== exitSignal) { if (-1 !== exitCodesReturn.indexOf(-exitCode)) { error = -exitCode; - } else if('ETIMEDOUT' === errorCode) { + } else if(isTimeout) { error = constants.CONVERT_TIMEOUT; } else { error = constants.CONVERT; @@ -548,6 +545,7 @@ function* ExecuteTask(task) { error = constants.UNKNOWN; } var childRes = null; + let isTimeout = false; if (constants.NO_ERROR === error) { if(constants.AVS_OFFICESTUDIO_FILE_OTHER_HTMLZIP === dataConvert.formatTo && cmd.getSaveKey() && !dataConvert.mailMergeSend) { //todo заглушка.вся конвертация на клиенте, но нет простого механизма сохранения на клиенте @@ -562,15 +560,31 @@ function* ExecuteTask(task) { childArgs = []; } childArgs.push(paramsFile); - var waitMS = task.getVisibilityTimeout() * 1000 - (new Date().getTime() - getTaskTime.getTime()); - childRes = childProcess.spawnSync(cfgFilePath, childArgs, {timeout: waitMS}); + let timeoutId; + try { + let spawnAsyncPromise = spawnAsync(cfgFilePath, childArgs); + childRes = spawnAsyncPromise.child; + let waitMS = task.getVisibilityTimeout() * 1000 - (new Date().getTime() - getTaskTime.getTime()); + timeoutId = setTimeout(function() { + isTimeout = true; + timeoutId = undefined; + childRes.kill(); + }, waitMS); + childRes = yield spawnAsyncPromise; + } catch (err) { + logger.error('error spawnAsync(id=%s)\r\n%s', cmd.getDocId(), err.stack); + childRes = err; + } + if (undefined !== timeoutId) { + clearTimeout(timeoutId); + } } if(clientStatsD) { clientStatsD.timing('conv.spawnSync', new Date() - curDate); curDate = new Date(); } } - resData = yield* postProcess(cmd, dataConvert, tempDirs, childRes, error); + resData = yield* postProcess(cmd, dataConvert, tempDirs, childRes, error, isTimeout); logger.debug('postProcess (id=%s)', dataConvert.key); if(clientStatsD) { clientStatsD.timing('conv.postProcess', new Date() - curDate); From 4ac6ea97accb31feb36e49ad13a35845728401ad Mon Sep 17 00:00:00 2001 From: Alexey Golubev Date: Wed, 20 Sep 2017 19:07:48 +0300 Subject: [PATCH 6/9] Delete collectlost.js --- DocService/sources/collectlost.js | 113 ------------------------------ 1 file changed, 113 deletions(-) delete mode 100644 DocService/sources/collectlost.js diff --git a/DocService/sources/collectlost.js b/DocService/sources/collectlost.js deleted file mode 100644 index c2e95a6a..00000000 --- a/DocService/sources/collectlost.js +++ /dev/null @@ -1,113 +0,0 @@ -/* - * (c) Copyright Ascensio System SIA 2010-2017 - * - * This program is a free software product. You can redistribute it and/or - * modify it under the terms of the GNU Affero General Public License (AGPL) - * version 3 as published by the Free Software Foundation. In accordance with - * Section 7(a) of the GNU AGPL its Section 15 shall be amended to the effect - * that Ascensio System SIA expressly excludes the warranty of non-infringement - * of any third-party rights. - * - * This program is distributed WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For - * details, see the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html - * - * You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, - * EU, LV-1021. - * - * The interactive user interfaces in modified source and object code versions - * of the Program must display Appropriate Legal Notices, as required under - * Section 5 of the GNU AGPL version 3. - * - * Pursuant to Section 7(b) of the License you must retain the original Product - * logo when distributing the program. Pursuant to Section 7(e) we decline to - * grant you any rights under trademark law for use of our trademarks. - * - * All the Product's GUI elements, including illustrations and icon sets, as - * well as technical writing content are licensed under the terms of the - * Creative Commons Attribution-ShareAlike 4.0 International. See the License - * terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode - * - */ - -'use strict'; -const config = require('config'); -const co = require('co'); -const logger = require('./../../Common/sources/logger'); -const sqlBase = require('./baseConnector'); -const converterService = require('./converterservice'); -const pubsubRedis = require('./pubsubRedis.js'); -const queueService = require('./../../Common/sources/taskqueueRabbitMQ'); -const constants = require('./../../Common/sources/constants'); -const utils = require('./../../Common/sources/utils'); - -const cfgRedisPrefix = config.get('services.CoAuthoring.redis.prefix'); -const redisKeyCollectLost = cfgRedisPrefix + constants.REDIS_KEY_COLLECT_LOST; - -const LOOP_TIMEOUT = 1000; -const EXEC_TIMEOUT = utils.CONVERTION_TIMEOUT; - -(function collectlost() { - return co(function*() { - let exitCode = 0; - let queue = null; - try { - logger.debug('collectlost start'); - - var redisClient = pubsubRedis.getClientRedis(); - - queue = new queueService(); - yield queue.initPromise(true, false, false, false); - - //collect documents without callback url - const selectRes = yield sqlBase.getEmptyCallbacks(); - - let docIds = []; - for (let i = 0; i < selectRes.length; ++i) { - docIds.push(selectRes[i].id); - } - logger.debug('collectlost docIds:%j', docIds); - if (docIds.length > 0) { - let multi = redisClient.multi([['sadd', redisKeyCollectLost].concat(docIds)]); - yield utils.promiseRedis(multi, multi.exec); - } - for (let i = 0; i < docIds.length; ++i) { - let docId = docIds[i]; - yield* converterService.convertFromChanges(docId, undefined, false, undefined, undefined, undefined, undefined, - queue, redisKeyCollectLost); - } - - logger.debug('collectlost start wait'); - const startTime = new Date().getTime(); - while (true) { - let curTime = new Date().getTime() - startTime; - if (curTime >= EXEC_TIMEOUT) { - exitCode = 1; - logger.debug('collectlost timeout'); - break; - } - const remainingFiles = yield utils.promiseRedis(redisClient, redisClient.scard, redisKeyCollectLost); - logger.debug('collectlost remaining files:%d', remainingFiles); - if (remainingFiles <= 0) { - break; - } - yield utils.sleep(LOOP_TIMEOUT); - } - //clean up - yield utils.promiseRedis(redisClient, redisClient.del, redisKeyCollectLost); - - logger.debug('collectlost end'); - } catch (e) { - logger.error('collectlost error:\r\n%s', e.stack); - } finally { - try { - if (queue) { - yield queue.close(); - } - } catch (e) { - logger.error('collectlost error:\r\n%s', e.stack); - } - process.exit(exitCode); - } - }); -})(); From b96ef25748999ec4034cf626da366a20421e7c96 Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Wed, 20 Sep 2017 19:40:22 +0300 Subject: [PATCH 7/9] store connection in case of restore error --- DocService/sources/DocsCoServer.js | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/DocService/sources/DocsCoServer.js b/DocService/sources/DocsCoServer.js index ee5be763..0c559c67 100644 --- a/DocService/sources/DocsCoServer.js +++ b/DocService/sources/DocsCoServer.js @@ -1459,6 +1459,19 @@ exports.install = function(server, callbackFunction) { sendData(conn, {type: 'error', description: errorId}); } + function* sendFileErrorAuth(conn, sessionId, errorId) { + conn.sessionId = sessionId;//restore old + //Kill previous connections + connections = _.reject(connections, function(el) { + return el.sessionId === sessionId;//Delete this connection + }); + // Кладем в массив, т.к. нам нужно отправлять данные для открытия/сохранения документа + connections.push(conn); + yield* updatePresence(conn.docId, conn.user.id, getConnectionInfo(conn)); + + sendFileError(conn, errorId); + } + // Пересчет только для чужих Lock при сохранении на клиенте, который добавлял/удалял строки или столбцы function _recalcLockArray(userId, _locks, oRecalcIndexColumns, oRecalcIndexRows) { if (null == _locks) { @@ -1802,6 +1815,7 @@ exports.install = function(server, callbackFunction) { // Ситуация, когда пользователь уже отключен от совместного редактирования if (bIsRestore && data.isCloseCoAuthoring) { + conn.sessionId = data.sessionId;//restore old // Удаляем предыдущие соединения connections = _.reject(connections, function(el) { return el.sessionId === data.sessionId;//Delete this connection @@ -1842,16 +1856,16 @@ exports.install = function(server, callbackFunction) { var updateIfRes = yield taskResult.updateIf(updateTask, updateMask); if (!(updateIfRes.affectedRows > 0)) { // error version - sendFileError(conn, 'Update Version error'); + yield* sendFileErrorAuth(conn, data.sessionId, 'Update Version error'); return; } } else if (taskResult.FileStatus.UpdateVersion === status) { // error version - sendFileError(conn, 'Update Version error'); + yield* sendFileErrorAuth(conn, data.sessionId, 'Update Version error'); return; } else { // Other error - sendFileError(conn, 'Other error'); + yield* sendFileErrorAuth(conn, data.sessionId, 'Other error'); return; } @@ -1873,14 +1887,14 @@ exports.install = function(server, callbackFunction) { if (arrayBlocks && (0 === arrayBlocks.length || getLockRes)) { yield* authRestore(conn, data.sessionId); } else { - sendFileError(conn, 'Restore error. Locks not checked.'); + yield* sendFileErrorAuth(conn, data.sessionId, 'Restore error. Locks not checked.'); } } else { - sendFileError(conn, 'Restore error. Document modified.'); + yield* sendFileErrorAuth(conn, data.sessionId, 'Restore error. Document modified.'); } } catch (err) { logger.error("DataBase error: docId = %s %s", docId, err.stack); - sendFileError(conn, 'DataBase error'); + yield* sendFileErrorAuth(conn, data.sessionId, 'DataBase error'); } } else { yield* authRestore(conn, data.sessionId); From d7b1b3c450670c72a7e5f8705e3b9a89418bf8ac Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Fri, 22 Sep 2017 13:33:20 +0300 Subject: [PATCH 8/9] for bug 32236. send all participants instead of user in/out --- DocService/sources/DocsCoServer.js | 35 +++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/DocService/sources/DocsCoServer.js b/DocService/sources/DocsCoServer.js index 0c559c67..3405a8d5 100644 --- a/DocService/sources/DocsCoServer.js +++ b/DocService/sources/DocsCoServer.js @@ -1204,6 +1204,7 @@ exports.install = function(server, callbackFunction) { return; } var hvals; + let participantsTimestamp; var tmpUser = conn.user; var isView = tmpUser.view; logger.info("Connection closed or timed out: userId = %s isCloseConnection = %s docId = %s", tmpUser.id, isCloseConnection, docId); @@ -1222,6 +1223,7 @@ exports.install = function(server, callbackFunction) { ['zrem', redisKeyPresenceSet + docId, tmpUser.id]]); yield utils.promiseRedis(multi, multi.exec); hvals = yield* getAllPresence(docId); + participantsTimestamp = Date.now(); if (hvals.length <= 0) { yield utils.promiseRedis(redisClient, redisClient.zrem, redisKeyDocuments, docId); } @@ -1246,7 +1248,11 @@ exports.install = function(server, callbackFunction) { //revert old view to send event var tmpView = tmpUser.view; tmpUser.view = isView; - yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, user: tmpUser, state: false}, docId, tmpUser.id); + let participants = yield* getParticipantMap(docId, undefined, undefined, hvals); + if (!participantsTimestamp) { + participantsTimestamp = Date.now(); + } + yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, userId: tmpUser.id, participantsTimestamp: participantsTimestamp, participants: participants}, docId, tmpUser.id); tmpUser.view = tmpView; // Для данного пользователя снимаем лок с сохранения @@ -1395,9 +1401,14 @@ exports.install = function(server, callbackFunction) { return userLocks; } - function* getParticipantMap(docId, opt_userId, opt_connInfo) { + function* getParticipantMap(docId, opt_userId, opt_connInfo, opt_hvals) { var participantsMap = []; - var hvals = yield* getAllPresence(docId, opt_userId, opt_connInfo); + let hvals; + if (opt_hvals) { + hvals = opt_hvals; + } else { + hvals = yield* getAllPresence(docId, opt_userId, opt_connInfo); + } for (var i = 0; i < hvals.length; ++i) { var elem = JSON.parse(hvals[i]); if (!elem.isCloseCoAuthoring) { @@ -1447,8 +1458,9 @@ exports.install = function(server, callbackFunction) { _.each(participants, function(participant) { sendData(participant, { type: "connectState", - state: data.state, - user: data.user + participantsTimestamp: data.participantsTimestamp, + participants: data.participants, + waitAuth: !!data.waitAuthUserId }); }); } @@ -1921,6 +1933,7 @@ exports.install = function(server, callbackFunction) { connections.push(conn); var firstParticipantNoView, countNoView = 0; var participantsMap = yield* getParticipantMap(docId, tmpUser.id, getConnectionInfo(conn)); + let participantsTimestamp = Date.now(); for (var i = 0; i < participantsMap.length; ++i) { var elem = participantsMap[i]; if (!elem.view) { @@ -1970,8 +1983,9 @@ exports.install = function(server, callbackFunction) { } } } - + let waitAuthUserId; if (lockDocument && !tmpUser.view) { + waitAuthUserId = lockDocument.id; // Для view не ждем снятия lock-а var sendObject = { type: "waitAuth", @@ -1986,7 +2000,7 @@ exports.install = function(server, callbackFunction) { yield* sendAuthInfo(objChangesDocument.arrChanges, objChangesDocument.getLength(), conn, participantsMap, hasForgotten); } } - yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, user: tmpUser, state: true}, docId, tmpUser.id); + yield* publish({type: commonDefines.c_oPublishType.participantsState, docId: docId, userId: tmpUser.id, participantsTimestamp: participantsTimestamp, participants: participantsMap, waitAuthUserId: waitAuthUserId}, docId, tmpUser.id); } else { sendFileError(conn, 'ip filter'); res = false; @@ -2536,8 +2550,13 @@ exports.install = function(server, callbackFunction) { }); break; case commonDefines.c_oPublishType.participantsState: - participants = getParticipants(data.docId, true, data.user.id); + participants = getParticipants(data.docId, true, data.userId); sendParticipantsState(participants, data); + //release lock if participants is empty + if (0 == participants.length && data.waitAuthUserId) { + logger.warn('pubsub participantsState participants is empty docId = %s', data.docId); + yield* checkEndAuthLock(true, false, data.docId, data.waitAuthUserId); + } break; case commonDefines.c_oPublishType.message: participants = getParticipants(data.docId, true, data.userId); From 867f2153deb2ef0a11b15ba8094dc0cd5fbdcfb4 Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Fri, 22 Sep 2017 16:56:34 +0300 Subject: [PATCH 9/9] add serverVersion param for version history --- Common/sources/commondefines.js | 8 ++++++++ DocService/sources/DocsCoServer.js | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/Common/sources/commondefines.js b/Common/sources/commondefines.js index 86edabfd..f5bdd9bc 100644 --- a/Common/sources/commondefines.js +++ b/Common/sources/commondefines.js @@ -80,6 +80,7 @@ function InputCommand(data) { this['password'] = data['password']; this['outputurls'] = data['outputurls']; this['closeonerror'] = data['closeonerror']; + this['serverVersion'] = data['serverVersion']; this['rediskey'] = data['rediskey']; this['nobase64'] = data['nobase64']; this['forgotten'] = data['forgotten']; @@ -120,6 +121,7 @@ function InputCommand(data) { this['password'] = undefined; this['outputurls'] = undefined; this['closeonerror'] = undefined; + this['serverVersion'] = undefined; this['rediskey'] = undefined; this['nobase64'] = true; this['forgotten'] = undefined; @@ -315,6 +317,12 @@ InputCommand.prototype = { setCloseOnError: function(data) { this['closeonerror'] = data; }, + getServerVersion: function() { + return this['serverVersion']; + }, + setServerVersion: function(data) { + this['serverVersion'] = data; + }, getRedisKey: function() { return this['rediskey']; }, diff --git a/DocService/sources/DocsCoServer.js b/DocService/sources/DocsCoServer.js index 3405a8d5..d2d0c709 100644 --- a/DocService/sources/DocsCoServer.js +++ b/DocService/sources/DocsCoServer.js @@ -1731,7 +1731,7 @@ exports.install = function(server, callbackFunction) { } } function fillVersionHistoryFromJwt(decoded, cmd) { - if (decoded.changesUrl && decoded.previous) { + if (decoded.changesUrl && decoded.previous && (cmd.getServerVersion() === commonDefines.buildVersion)) { if (decoded.previous.url) { cmd.setUrl(decoded.previous.url); }