From d3de2411bc0bc670c2949e675366bfbe7041e049 Mon Sep 17 00:00:00 2001 From: konovalovsergey Date: Wed, 3 Aug 2016 12:46:55 +0300 Subject: [PATCH] error PRECONDITION_FAILED - unknown delivery tag --- DocService/sources/pubsubRabbitMQ.js | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/DocService/sources/pubsubRabbitMQ.js b/DocService/sources/pubsubRabbitMQ.js index 7789ce02..4653260b 100644 --- a/DocService/sources/pubsubRabbitMQ.js +++ b/DocService/sources/pubsubRabbitMQ.js @@ -54,14 +54,16 @@ function init(pubsub, callback) { pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub, 'fanout', {durable: true}); - var channelReceive = yield rabbitMQCore.createChannelPromise(conn); - var queue = yield rabbitMQCore.assertQueuePromise(channelReceive, '', {autoDelete: true, exclusive: true}); - channelReceive.bindQueue(queue, cfgRabbitExchangePubSub, ''); - yield rabbitMQCore.consumePromise(channelReceive, queue, function (message) { - if (message) { - pubsub.emit('message', message.content.toString()); + pubsub.channelReceive = yield rabbitMQCore.createChannelPromise(conn); + var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, '', {autoDelete: true, exclusive: true}); + pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub, ''); + yield rabbitMQCore.consumePromise(pubsub.channelReceive, queue, function (message) { + if(null != pubsub.channelReceive){ + if (message) { + pubsub.emit('message', message.content.toString()); + } + pubsub.channelReceive.ack(message); } - channelReceive.ack(message); }, {noAck: false}); //process messages received while reconnection time repeat(pubsub); @@ -76,6 +78,7 @@ function init(pubsub, callback) { function clear(pubsub) { pubsub.channelPublish = null; pubsub.exchangePublish = null; + pubsub.channelReceive = null; } function repeat(pubsub) { for (var i = 0; i < pubsub.publishStore.length; ++i) { @@ -92,6 +95,7 @@ function PubsubRabbitMQ() { this.connection = null; this.channelPublish = null; this.exchangePublish = null; + this.channelReceive = null; this.publishStore = []; } util.inherits(PubsubRabbitMQ, events.EventEmitter);