[rabbitMQ] Move redelivery logic into TaskQueueRabbitMQ abstraction

This commit is contained in:
Sergey Konovalov
2018-10-18 12:44:28 +03:00
parent 4e289a99b4
commit 1797f3c839
4 changed files with 81 additions and 94 deletions

View File

@ -38,7 +38,9 @@ var co = require('co');
var utils = require('./utils');
var constants = require('./constants');
var rabbitMQCore = require('./rabbitMQCore');
const logger = require('./logger');
const cfgMaxRedeliveredCount = config.get('FileConverter.converter.maxRedeliveredCount');
var cfgVisibilityTimeout = config.get('queue.visibilityTimeout');
var cfgQueueRetentionPeriod = config.get('queue.retentionPeriod');
var cfgRabbitQueueConvertTask = config.get('rabbitmq.queueconverttask');
@ -104,9 +106,15 @@ function init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddRespon
}
yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask,
function (message) {
if (message) {
taskqueue.emit('task', message.content.toString(), message);
}
co(function* () {
let redelivered = yield* pushBackRedelivered(taskqueue, message);
if (!redelivered) {
if (message) {
taskqueue.emit('task', message.content.toString());
}
taskqueue.channelConvertTaskReceive.ack(message);
}
});
}, optionsReceive);
}
if (isAddResponseReceive) {
@ -118,8 +126,9 @@ function init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddRespon
yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse,
function (message) {
if (message) {
taskqueue.emit('response', message.content.toString(), message);
taskqueue.emit('response', message.content.toString());
}
taskqueue.channelConvertResponseReceive.ack(message);
}, optionsReceive);
}
//process messages received while reconnection time
@ -139,6 +148,28 @@ function clear(taskqueue) {
taskqueue.channelConvertResponse = null;
taskqueue.channelConvertResponseReceive = null;
}
function* pushBackRedelivered(taskqueue, message) {
if (true || message.fields.redelivered) {
try {
logger.warn('checkRedelivered redelivered data=%j', message);
//remove current task and add new into tail of queue to remove redelivered flag
taskqueue.channelConvertTaskReceive.ack(message);
let data = message.content.toString();
let redeliveredCount = message.properties.headers['x-redelivered-count'];
if (!redeliveredCount || redeliveredCount < cfgMaxRedeliveredCount) {
message.properties.headers['x-redelivered-count'] = redeliveredCount ? redeliveredCount + 1 : 1;
yield addTaskString(taskqueue, data, message.properties.priority, undefined, message.properties.headers);
} else if (taskqueue.simulateErrorResponse) {
yield taskqueue.addResponse(taskqueue.simulateErrorResponse(data));
}
} catch (err) {
logger.error('checkRedelivered error: %s', err.stack);
}
return true;
}
return false;
}
function repeat(taskqueue) {
//repeat addTask because they are lost after the reconnection
//unlike unconfirmed task will come again
@ -159,18 +190,30 @@ function addTask(taskqueue, content, priority, callback, opt_expiration, opt_hea
}
taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask, content, options, callback);
}
function addTaskString(taskqueue, task, priority, opt_expiration, opt_headers) {
//todo confirmation mode
return new Promise(function (resolve, reject) {
var content = new Buffer(task);
if (null != taskqueue.channelConvertTask) {
addTask(taskqueue, content, priority, function (err, ok) {
if (null != err) {
reject(err);
} else {
resolve();
}
}, opt_expiration, opt_headers);
} else {
taskqueue.addTaskStore.push({task: content, priority: priority, expiration: opt_expiration, headers: opt_headers});
resolve();
}
});
}
function addResponse(taskqueue, content, callback) {
var options = {persistent: true};
taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse, content, options, callback);
}
function removeTask(taskqueue, data) {
taskqueue.channelConvertTaskReceive.ack(data);
}
function removeResponse(taskqueue, data) {
taskqueue.channelConvertResponseReceive.ack(data);
}
function TaskQueueRabbitMQ() {
function TaskQueueRabbitMQ(simulateErrorResponse) {
this.isClose = false;
this.connection = null;
this.channelConvertTask = null;
@ -179,6 +222,7 @@ function TaskQueueRabbitMQ() {
this.channelConvertResponse = null;
this.channelConvertResponseReceive = null;
this.addTaskStore = [];
this.simulateErrorResponse = simulateErrorResponse;
}
util.inherits(TaskQueueRabbitMQ, events.EventEmitter);
TaskQueueRabbitMQ.prototype.init = function (isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, callback) {
@ -197,24 +241,8 @@ TaskQueueRabbitMQ.prototype.initPromise = function(isAddTask, isAddResponse, isA
});
};
TaskQueueRabbitMQ.prototype.addTask = function (task, priority, opt_expiration, opt_headers) {
//todo confirmation mode
var t = this;
return new Promise(function (resolve, reject) {
task.setVisibilityTimeout(cfgVisibilityTimeout);
var content = new Buffer(JSON.stringify(task));
if (null != t.channelConvertTask) {
addTask(t, content, priority, function (err, ok) {
if (null != err) {
reject(err);
} else {
resolve();
}
}, opt_expiration, opt_headers);
} else {
t.addTaskStore.push({task: content, priority: priority, expiration: opt_expiration, headers: opt_headers});
resolve();
}
});
task.setVisibilityTimeout(cfgVisibilityTimeout);
return addTaskString(this, JSON.stringify(task), priority, opt_expiration);
};
TaskQueueRabbitMQ.prototype.addResponse = function (task) {
var t = this;
@ -233,24 +261,6 @@ TaskQueueRabbitMQ.prototype.addResponse = function (task) {
}
});
};
TaskQueueRabbitMQ.prototype.removeTask = function (data) {
var t = this;
return new Promise(function (resolve, reject) {
if (null != t.channelConvertTaskReceive) {
removeTask(t, data);
}
resolve();
});
};
TaskQueueRabbitMQ.prototype.removeResponse = function (data) {
var t = this;
return new Promise(function (resolve, reject) {
if (null != t.channelConvertResponseReceive) {
removeResponse(t, data);
}
resolve();
});
};
TaskQueueRabbitMQ.prototype.close = function () {
var t = this;
this.isClose = true;

View File

@ -600,9 +600,6 @@ function* addTask(data, priority, opt_queue, opt_expiration) {
var realQueue = opt_queue ? opt_queue : queue;
yield realQueue.addTask(data, priority, opt_expiration);
}
function* removeResponse(data) {
yield queue.removeResponse(data);
}
function* getOriginalParticipantsId(docId) {
var result = [], tmpObject = {};
@ -1142,7 +1139,6 @@ exports.createSaveTimerPromise = co.wrap(_createSaveTimer);
exports.getAllPresencePromise = co.wrap(getAllPresence);
exports.publish = publish;
exports.addTask = addTask;
exports.removeResponse = removeResponse;
exports.hasEditors = hasEditors;
exports.getEditorsCountPromise = co.wrap(getEditorsCount);
exports.getCallback = getCallback;

View File

@ -1126,7 +1126,7 @@ exports.saveFromChanges = function(docId, statusInfo, optFormat, opt_userId, opt
}
});
};
exports.receiveTask = function(data, opt_dataRaw) {
exports.receiveTask = function(data) {
return co(function* () {
var docId = 'null';
try {
@ -1168,9 +1168,6 @@ exports.receiveTask = function(data, opt_dataRaw) {
});
}
}
if (opt_dataRaw) {
yield* docsCoServer.removeResponse(opt_dataRaw);
}
logger.debug('End receiveTask: docId = %s', docId);
}
} catch (err) {

View File

@ -70,7 +70,6 @@ var cfgInputLimits = configConverter.get('inputLimits');
const cfgStreamWriterBufferSize = configConverter.get('streamWriterBufferSize');
//cfgMaxRequestChanges was obtained as a result of the test: 84408 changes - 5,16 MB
const cfgMaxRequestChanges = config.get('services.CoAuthoring.server.maxRequestChanges');
const cfgMaxRedeliveredCount = configConverter.get('maxRedeliveredCount');
var cfgTokenEnableRequestOutbox = config.get('services.CoAuthoring.token.enable.request.outbox');
const cfgForgottenFilesName = config.get('services.CoAuthoring.server.forgottenfilesname');
@ -698,51 +697,27 @@ function* ExecuteTask(task) {
return resData;
}
function receiveTask(data, dataRaw) {
function receiveTask(data) {
return co(function* () {
var res = null;
var task = null;
if (!dataRaw.fields.redelivered) {
try {
task = new commonDefines.TaskQueueData(JSON.parse(data));
if (task) {
res = yield* ExecuteTask(task);
}
} catch (err) {
logger.error(err);
} finally {
try {
if (!res && task) {
//если все упало так что даже нет res, все равно пытаемся отдать ошибку.
var cmd = task.getCmd();
cmd.setStatusInfo(constants.CONVERT);
res = new commonDefines.TaskQueueData();
res.setCmd(cmd);
}
if(res) {
yield queue.addResponse(res);
}
yield queue.removeTask(dataRaw);
} catch (err) {
logger.error(err);
}
try {
task = new commonDefines.TaskQueueData(JSON.parse(data));
if (task) {
res = yield* ExecuteTask(task);
}
} else {
} catch (err) {
logger.error(err);
} finally {
try {
logger.warn('receiveTask redelivered data=%j', dataRaw);
//remove current task and add new into tail of queue to remove redelivered flag
yield queue.removeTask(dataRaw);
task = new commonDefines.TaskQueueData(JSON.parse(data));
let redeliveredCount = dataRaw.properties.headers['x-redelivered-count'];
if (!redeliveredCount || redeliveredCount < cfgMaxRedeliveredCount) {
dataRaw.properties.headers['x-redelivered-count'] = redeliveredCount ? redeliveredCount + 1 : 1;
yield queue.addTask(task, dataRaw.properties.priority, undefined, dataRaw.properties.headers);
} else {
//simulate error response
let cmd = task.getCmd();
if (!res && task) {
//если все упало так что даже нет res, все равно пытаемся отдать ошибку.
var cmd = task.getCmd();
cmd.setStatusInfo(constants.CONVERT);
res = new commonDefines.TaskQueueData();
res.setCmd(cmd);
}
if (res) {
yield queue.addResponse(res);
}
} catch (err) {
@ -751,8 +726,17 @@ function receiveTask(data, dataRaw) {
}
});
}
function simulateErrorResponse(data){
let task = new commonDefines.TaskQueueData(JSON.parse(data));
//simulate error response
let cmd = task.getCmd();
cmd.setStatusInfo(constants.CONVERT);
let res = new commonDefines.TaskQueueData();
res.setCmd(cmd);
return res;
}
function run() {
queue = new queueService();
queue = new queueService(simulateErrorResponse);
queue.on('task', receiveTask);
queue.init(true, true, true, false, function(err) {
if (null != err) {