From 1797f3c83929497e90c2fb978a2b9467b5b20310 Mon Sep 17 00:00:00 2001 From: Sergey Konovalov Date: Thu, 18 Oct 2018 12:44:28 +0300 Subject: [PATCH] [rabbitMQ] Move redelivery logic into TaskQueueRabbitMQ abstraction --- Common/sources/taskqueueRabbitMQ.js | 104 +++++++++++++++------------- DocService/sources/DocsCoServer.js | 4 -- DocService/sources/canvasservice.js | 5 +- FileConverter/sources/converter.js | 62 ++++++----------- 4 files changed, 81 insertions(+), 94 deletions(-) diff --git a/Common/sources/taskqueueRabbitMQ.js b/Common/sources/taskqueueRabbitMQ.js index 52b4fb96..475dd283 100644 --- a/Common/sources/taskqueueRabbitMQ.js +++ b/Common/sources/taskqueueRabbitMQ.js @@ -38,7 +38,9 @@ var co = require('co'); var utils = require('./utils'); var constants = require('./constants'); var rabbitMQCore = require('./rabbitMQCore'); +const logger = require('./logger'); +const cfgMaxRedeliveredCount = config.get('FileConverter.converter.maxRedeliveredCount'); var cfgVisibilityTimeout = config.get('queue.visibilityTimeout'); var cfgQueueRetentionPeriod = config.get('queue.retentionPeriod'); var cfgRabbitQueueConvertTask = config.get('rabbitmq.queueconverttask'); @@ -104,9 +106,15 @@ function init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddRespon } yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask, function (message) { - if (message) { - taskqueue.emit('task', message.content.toString(), message); - } + co(function* () { + let redelivered = yield* pushBackRedelivered(taskqueue, message); + if (!redelivered) { + if (message) { + taskqueue.emit('task', message.content.toString()); + } + taskqueue.channelConvertTaskReceive.ack(message); + } + }); }, optionsReceive); } if (isAddResponseReceive) { @@ -118,8 +126,9 @@ function init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddRespon yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse, function (message) { if (message) { - taskqueue.emit('response', message.content.toString(), message); + taskqueue.emit('response', message.content.toString()); } + taskqueue.channelConvertResponseReceive.ack(message); }, optionsReceive); } //process messages received while reconnection time @@ -139,6 +148,28 @@ function clear(taskqueue) { taskqueue.channelConvertResponse = null; taskqueue.channelConvertResponseReceive = null; } +function* pushBackRedelivered(taskqueue, message) { + if (true || message.fields.redelivered) { + try { + logger.warn('checkRedelivered redelivered data=%j', message); + //remove current task and add new into tail of queue to remove redelivered flag + taskqueue.channelConvertTaskReceive.ack(message); + + let data = message.content.toString(); + let redeliveredCount = message.properties.headers['x-redelivered-count']; + if (!redeliveredCount || redeliveredCount < cfgMaxRedeliveredCount) { + message.properties.headers['x-redelivered-count'] = redeliveredCount ? redeliveredCount + 1 : 1; + yield addTaskString(taskqueue, data, message.properties.priority, undefined, message.properties.headers); + } else if (taskqueue.simulateErrorResponse) { + yield taskqueue.addResponse(taskqueue.simulateErrorResponse(data)); + } + } catch (err) { + logger.error('checkRedelivered error: %s', err.stack); + } + return true; + } + return false; +} function repeat(taskqueue) { //repeat addTask because they are lost after the reconnection //unlike unconfirmed task will come again @@ -159,18 +190,30 @@ function addTask(taskqueue, content, priority, callback, opt_expiration, opt_hea } taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask, content, options, callback); } +function addTaskString(taskqueue, task, priority, opt_expiration, opt_headers) { + //todo confirmation mode + return new Promise(function (resolve, reject) { + var content = new Buffer(task); + if (null != taskqueue.channelConvertTask) { + addTask(taskqueue, content, priority, function (err, ok) { + if (null != err) { + reject(err); + } else { + resolve(); + } + }, opt_expiration, opt_headers); + } else { + taskqueue.addTaskStore.push({task: content, priority: priority, expiration: opt_expiration, headers: opt_headers}); + resolve(); + } + }); +} function addResponse(taskqueue, content, callback) { var options = {persistent: true}; taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse, content, options, callback); } -function removeTask(taskqueue, data) { - taskqueue.channelConvertTaskReceive.ack(data); -} -function removeResponse(taskqueue, data) { - taskqueue.channelConvertResponseReceive.ack(data); -} -function TaskQueueRabbitMQ() { +function TaskQueueRabbitMQ(simulateErrorResponse) { this.isClose = false; this.connection = null; this.channelConvertTask = null; @@ -179,6 +222,7 @@ function TaskQueueRabbitMQ() { this.channelConvertResponse = null; this.channelConvertResponseReceive = null; this.addTaskStore = []; + this.simulateErrorResponse = simulateErrorResponse; } util.inherits(TaskQueueRabbitMQ, events.EventEmitter); TaskQueueRabbitMQ.prototype.init = function (isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, callback) { @@ -197,24 +241,8 @@ TaskQueueRabbitMQ.prototype.initPromise = function(isAddTask, isAddResponse, isA }); }; TaskQueueRabbitMQ.prototype.addTask = function (task, priority, opt_expiration, opt_headers) { - //todo confirmation mode - var t = this; - return new Promise(function (resolve, reject) { - task.setVisibilityTimeout(cfgVisibilityTimeout); - var content = new Buffer(JSON.stringify(task)); - if (null != t.channelConvertTask) { - addTask(t, content, priority, function (err, ok) { - if (null != err) { - reject(err); - } else { - resolve(); - } - }, opt_expiration, opt_headers); - } else { - t.addTaskStore.push({task: content, priority: priority, expiration: opt_expiration, headers: opt_headers}); - resolve(); - } - }); + task.setVisibilityTimeout(cfgVisibilityTimeout); + return addTaskString(this, JSON.stringify(task), priority, opt_expiration); }; TaskQueueRabbitMQ.prototype.addResponse = function (task) { var t = this; @@ -233,24 +261,6 @@ TaskQueueRabbitMQ.prototype.addResponse = function (task) { } }); }; -TaskQueueRabbitMQ.prototype.removeTask = function (data) { - var t = this; - return new Promise(function (resolve, reject) { - if (null != t.channelConvertTaskReceive) { - removeTask(t, data); - } - resolve(); - }); -}; -TaskQueueRabbitMQ.prototype.removeResponse = function (data) { - var t = this; - return new Promise(function (resolve, reject) { - if (null != t.channelConvertResponseReceive) { - removeResponse(t, data); - } - resolve(); - }); -}; TaskQueueRabbitMQ.prototype.close = function () { var t = this; this.isClose = true; diff --git a/DocService/sources/DocsCoServer.js b/DocService/sources/DocsCoServer.js index 5a6c9723..ae8e23a5 100644 --- a/DocService/sources/DocsCoServer.js +++ b/DocService/sources/DocsCoServer.js @@ -600,9 +600,6 @@ function* addTask(data, priority, opt_queue, opt_expiration) { var realQueue = opt_queue ? opt_queue : queue; yield realQueue.addTask(data, priority, opt_expiration); } -function* removeResponse(data) { - yield queue.removeResponse(data); -} function* getOriginalParticipantsId(docId) { var result = [], tmpObject = {}; @@ -1142,7 +1139,6 @@ exports.createSaveTimerPromise = co.wrap(_createSaveTimer); exports.getAllPresencePromise = co.wrap(getAllPresence); exports.publish = publish; exports.addTask = addTask; -exports.removeResponse = removeResponse; exports.hasEditors = hasEditors; exports.getEditorsCountPromise = co.wrap(getEditorsCount); exports.getCallback = getCallback; diff --git a/DocService/sources/canvasservice.js b/DocService/sources/canvasservice.js index 7ec2d689..b8ef595e 100644 --- a/DocService/sources/canvasservice.js +++ b/DocService/sources/canvasservice.js @@ -1126,7 +1126,7 @@ exports.saveFromChanges = function(docId, statusInfo, optFormat, opt_userId, opt } }); }; -exports.receiveTask = function(data, opt_dataRaw) { +exports.receiveTask = function(data) { return co(function* () { var docId = 'null'; try { @@ -1168,9 +1168,6 @@ exports.receiveTask = function(data, opt_dataRaw) { }); } } - if (opt_dataRaw) { - yield* docsCoServer.removeResponse(opt_dataRaw); - } logger.debug('End receiveTask: docId = %s', docId); } } catch (err) { diff --git a/FileConverter/sources/converter.js b/FileConverter/sources/converter.js index bd667b9c..367471d8 100644 --- a/FileConverter/sources/converter.js +++ b/FileConverter/sources/converter.js @@ -70,7 +70,6 @@ var cfgInputLimits = configConverter.get('inputLimits'); const cfgStreamWriterBufferSize = configConverter.get('streamWriterBufferSize'); //cfgMaxRequestChanges was obtained as a result of the test: 84408 changes - 5,16 MB const cfgMaxRequestChanges = config.get('services.CoAuthoring.server.maxRequestChanges'); -const cfgMaxRedeliveredCount = configConverter.get('maxRedeliveredCount'); var cfgTokenEnableRequestOutbox = config.get('services.CoAuthoring.token.enable.request.outbox'); const cfgForgottenFilesName = config.get('services.CoAuthoring.server.forgottenfilesname'); @@ -698,51 +697,27 @@ function* ExecuteTask(task) { return resData; } -function receiveTask(data, dataRaw) { +function receiveTask(data) { return co(function* () { var res = null; var task = null; - if (!dataRaw.fields.redelivered) { - try { - task = new commonDefines.TaskQueueData(JSON.parse(data)); - if (task) { - res = yield* ExecuteTask(task); - } - } catch (err) { - logger.error(err); - } finally { - try { - if (!res && task) { - //если все упало так что даже нет res, все равно пытаемся отдать ошибку. - var cmd = task.getCmd(); - cmd.setStatusInfo(constants.CONVERT); - res = new commonDefines.TaskQueueData(); - res.setCmd(cmd); - } - if(res) { - yield queue.addResponse(res); - } - yield queue.removeTask(dataRaw); - } catch (err) { - logger.error(err); - } + try { + task = new commonDefines.TaskQueueData(JSON.parse(data)); + if (task) { + res = yield* ExecuteTask(task); } - } else { + } catch (err) { + logger.error(err); + } finally { try { - logger.warn('receiveTask redelivered data=%j', dataRaw); - //remove current task and add new into tail of queue to remove redelivered flag - yield queue.removeTask(dataRaw); - task = new commonDefines.TaskQueueData(JSON.parse(data)); - let redeliveredCount = dataRaw.properties.headers['x-redelivered-count']; - if (!redeliveredCount || redeliveredCount < cfgMaxRedeliveredCount) { - dataRaw.properties.headers['x-redelivered-count'] = redeliveredCount ? redeliveredCount + 1 : 1; - yield queue.addTask(task, dataRaw.properties.priority, undefined, dataRaw.properties.headers); - } else { - //simulate error response - let cmd = task.getCmd(); + if (!res && task) { + //если все упало так что даже нет res, все равно пытаемся отдать ошибку. + var cmd = task.getCmd(); cmd.setStatusInfo(constants.CONVERT); res = new commonDefines.TaskQueueData(); res.setCmd(cmd); + } + if (res) { yield queue.addResponse(res); } } catch (err) { @@ -751,8 +726,17 @@ function receiveTask(data, dataRaw) { } }); } +function simulateErrorResponse(data){ + let task = new commonDefines.TaskQueueData(JSON.parse(data)); + //simulate error response + let cmd = task.getCmd(); + cmd.setStatusInfo(constants.CONVERT); + let res = new commonDefines.TaskQueueData(); + res.setCmd(cmd); + return res; +} function run() { - queue = new queueService(); + queue = new queueService(simulateErrorResponse); queue.on('task', receiveTask); queue.init(true, true, true, false, function(err) { if (null != err) {