Acknowledge broker message after processing(was broken after 1797f3c839)
This commit is contained in:
Sergey Konovalov
2019-08-03 13:17:59 +03:00
parent 3163f65acd
commit 08a12e293d
4 changed files with 48 additions and 32 deletions

View File

@ -99,12 +99,14 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask,
function (message) {
co(function* () {
let redelivered = yield* pushBackRedeliveredRabbit(taskqueue, message);
let ack = function() {
taskqueue.channelConvertTaskReceive.ack(message);
};
let redelivered = yield* pushBackRedeliveredRabbit(taskqueue, message, ack);
if (!redelivered) {
if (message) {
taskqueue.emit('task', message.content.toString());
taskqueue.emit('task', message.content.toString(), ack);
}
taskqueue.channelConvertTaskReceive.ack(message);
}
});
}, optionsReceive);
@ -118,9 +120,10 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse,
function (message) {
if (message) {
taskqueue.emit('response', message.content.toString());
taskqueue.emit('response', message.content.toString(), function() {
taskqueue.channelConvertResponseReceive.ack(message);
});
}
taskqueue.channelConvertResponseReceive.ack(message);
}, optionsReceive);
}
if (isAddDelayed) {
@ -142,9 +145,10 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
yield rabbitMQCore.consumePromise(taskqueue.channelConvertDead, queue, function(message) {
if (null != taskqueue.channelConvertDead) {
if (message) {
taskqueue.emit('dead', message.content.toString());
taskqueue.emit('dead', message.content.toString(), function() {
taskqueue.channelConvertDead.ack(message);
});
}
taskqueue.channelConvertDead.ack(message);
}
}, {noAck: false});
}
@ -182,13 +186,15 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
receiver.add_credit(1);
receiver.on("message", function(context) {
co(function*() {
let redelivered = yield* pushBackRedeliveredActive(taskqueue, context);
if (!redelivered) {
if (context) {
taskqueue.emit('task', context.message.body);
}
let ack = function() {
context.delivery.accept();
receiver.add_credit(1);
};
let redelivered = yield* pushBackRedeliveredActive(taskqueue, context, ack);
if (!redelivered) {
if (context) {
taskqueue.emit('task', context.message.body, ack);
}
}
});
});
@ -200,10 +206,11 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
receiver.add_credit(1);
receiver.on("message", function(context) {
if (context) {
taskqueue.emit('response', context.message.body);
taskqueue.emit('response', context.message.body, function() {
context.delivery.accept();
receiver.add_credit(1);
});
}
context.delivery.accept();
receiver.add_credit(1);
});
taskqueue.channelConvertResponseReceive = receiver;
}
@ -216,10 +223,11 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
receiver.add_credit(1);
receiver.on("message", function(context) {
if (context) {
taskqueue.emit('dead', context.message.body);
taskqueue.emit('dead', context.message.body, function(){
context.delivery.accept();
receiver.add_credit(1);
});
}
context.delivery.accept();
receiver.add_credit(1);
});
taskqueue.channelConvertDead = receiver;
}
@ -242,13 +250,11 @@ function clear(taskqueue) {
taskqueue.channelConvertResponseReceive = null;
taskqueue.channelDelayed = null;
}
function* pushBackRedeliveredRabbit(taskqueue, message) {
function* pushBackRedeliveredRabbit(taskqueue, message, ack) {
if (message.fields.redelivered) {
try {
logger.warn('checkRedelivered redelivered data=%j', message);
//remove current task and add new into tail of queue to remove redelivered flag
taskqueue.channelConvertTaskReceive.ack(message);
let data = message.content.toString();
let redeliveredCount = message.properties.headers['x-redelivered-count'];
if (!redeliveredCount || redeliveredCount < cfgMaxRedeliveredCount) {
@ -259,21 +265,25 @@ function* pushBackRedeliveredRabbit(taskqueue, message) {
}
} catch (err) {
logger.error('checkRedelivered error: %s', err.stack);
} finally{
ack();
}
return true;
}
return false;
}
function* pushBackRedeliveredActive(taskqueue, context) {
function* pushBackRedeliveredActive(taskqueue, context, ack) {
if (undefined !== context.message.delivery_count) {
logger.warn('checkRedelivered redelivered data=%j', context.message);
if (context.message.delivery_count > cfgMaxRedeliveredCount) {
//remove current task and add new into tail of queue to remove redelivered flag
context.delivery.accept();
taskqueue.channelConvertTaskReceive.add_credit(1);
if (taskqueue.simulateErrorResponse) {
yield taskqueue.addResponse(taskqueue.simulateErrorResponse(context.message.body));
try {
if (taskqueue.simulateErrorResponse) {
yield taskqueue.addResponse(taskqueue.simulateErrorResponse(context.message.body));
}
} catch (err) {
logger.error('checkRedelivered error: %s', err.stack);
} finally {
ack();
}
return true;
}

View File

@ -803,7 +803,7 @@ function* startForceSave(docId, type, opt_userdata, opt_userConnectionId, opt_ba
logger.debug('startForceSave end:docId = %s', docId);
return res;
}
function handleDeadLetter(data) {
function handleDeadLetter(data, ack) {
return co(function*() {
let docId = 'null';
try {
@ -828,12 +828,14 @@ function handleDeadLetter(data) {
} else {
//simulate error response
cmd.setStatusInfo(constants.CONVERT_DEAD_LETTER);
canvasService.receiveTask(JSON.stringify(task))
canvasService.receiveTask(JSON.stringify(task), function(){});
}
}
logger.warn('handleDeadLetter end: docId = %s; requeue = %s', docId, isRequeued);
} catch (err) {
logger.error('handleDeadLetter error: docId = %s\r\n%s', docId, err.stack);
} finally {
ack();
}
});
}

View File

@ -1192,7 +1192,7 @@ exports.saveFromChanges = function(docId, statusInfo, optFormat, opt_userId, opt
}
});
};
exports.receiveTask = function(data) {
exports.receiveTask = function(data, ack) {
return co(function* () {
var docId = 'null';
try {
@ -1238,6 +1238,8 @@ exports.receiveTask = function(data) {
}
} catch (err) {
logger.debug('Error receiveTask: docId = %s\r\n%s', docId, err.stack);
} finally {
ack();
}
});
};

View File

@ -709,7 +709,7 @@ function* ExecuteTask(task) {
return resData;
}
function receiveTask(data) {
function receiveTask(data, ack) {
return co(function* () {
var res = null;
var task = null;
@ -734,6 +734,8 @@ function receiveTask(data) {
}
} catch (err) {
logger.error(err);
} finally {
ack();
}
}
});