gc don't close rabbitMQ connections

This commit is contained in:
konovalovsergey
2016-04-12 15:02:03 +03:00
parent 4acadcb6d7
commit 1cb9df6f62
5 changed files with 72 additions and 17 deletions

View File

@ -50,7 +50,7 @@ var connetOptions = {
}
};
function connetPromise(closeCallack) {
function connetPromise(closeCallback) {
return new Promise(function(resolve, reject) {
function startConnect() {
amqp.connect(cfgRabbitUrl, connetOptions, function(err, conn) {
@ -61,13 +61,13 @@ function connetPromise(closeCallack) {
conn.on('error', function(err) {
logger.error('[AMQP] conn error', err.stack);
});
var closeCallback = function() {
var closeEventCallback = function() {
//in some case receive multiple close events
conn.removeListener('close', closeCallback);
console.error("[AMQP] conn close");
closeCallack();
conn.removeListener('close', closeEventCallback);
console.debug("[AMQP] conn close");
closeCallback();
};
conn.on('close', closeCallback);
conn.on('close', closeEventCallback);
logger.debug('[AMQP] connected');
resolve(conn);
}

View File

@ -40,11 +40,13 @@ function init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddRespon
return co(function* () {
var e = null;
try {
var conn = yield rabbitMQCore.connetPromise(function () {
var conn = yield rabbitMQCore.connetPromise(function() {
clear(taskqueue);
init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, null);
if (!taskqueue.isClose) {
init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, null);
}
});
taskqueue.connection = conn;
var bAssertTaskQueue = false;
var optionsTaskQueue = {
durable: true,
@ -144,6 +146,8 @@ function removeResponse(taskqueue, data) {
}
function TaskQueueRabbitMQ() {
this.isClose = false;
this.connection = null;
this.channelConvertTask = null;
this.channelConvertTaskReceive = null;
this.channelConvertResponse = null;
@ -229,5 +233,18 @@ TaskQueueRabbitMQ.prototype.removeResponse = function (data) {
resolve();
});
};
TaskQueueRabbitMQ.prototype.close = function () {
var t = this;
this.isClose = true;
return new Promise(function(resolve, reject) {
t.connection.close(function(err) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
};
module.exports = TaskQueueRabbitMQ;

View File

@ -77,13 +77,15 @@ var checkFileExpire = function() {
};
var checkDocumentExpire = function() {
return co(function* () {
var pubsub = null;
var queue = null;
var removedCount = 0;
var startSaveCount = 0;
try {
logger.debug('checkDocumentExpire start');
var removedCount = 0;
var startSaveCount = 0;
var redisClient = pubsubRedis.getClientRedis();
var pubsub = new pubsubService();
pubsub = new pubsubService();
yield pubsub.initPromise();
//inner ping to update presence
pubsub.publish(JSON.stringify({type: commonDefines.c_oPublishType.expireDoc}));
@ -96,7 +98,7 @@ var checkDocumentExpire = function() {
var execRes = yield utils.promiseRedis(multi, multi.exec);
var expiredKeys = execRes[0];
if (expiredKeys.length > 0) {
var queue = new queueService();
queue = new queueService();
yield queue.initPromise(true, false, false, false);
for (var i = 0; i < expiredKeys.length; ++i) {
@ -113,10 +115,20 @@ var checkDocumentExpire = function() {
}
}
}
logger.debug('checkDocumentExpire end: startSaveCount = %d, removedCount = %d', startSaveCount, removedCount);
} catch (e) {
logger.error('checkDocumentExpire error:\r\n%s', e.stack);
} finally {
try {
if (pubsub) {
yield pubsub.close();
}
if (queue) {
yield queue.close();
}
} catch (e) {
logger.error('checkDocumentExpire error:\r\n%s', e.stack);
}
logger.debug('checkDocumentExpire end: startSaveCount = %d, removedCount = %d', startSaveCount, removedCount);
}
});
};

View File

@ -35,10 +35,13 @@ function init(pubsub, callback) {
return co(function* () {
var e = null;
try {
var conn = yield rabbitMQCore.connetPromise(function () {
var conn = yield rabbitMQCore.connetPromise(function() {
clear(pubsub);
init(pubsub, null);
if (!pubsub.isClose) {
init(pubsub, null);
}
});
pubsub.connection = conn;
pubsub.channelPublish = yield rabbitMQCore.createChannelPromise(conn);
pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub,
'fanout', {durable: true});
@ -77,6 +80,8 @@ function publish(pubsub, data) {
}
function PubsubRabbitMQ() {
this.isClose = false;
this.connection = null;
this.channelPublish = null;
this.exchangePublish = null;
this.publishStore = [];
@ -105,5 +110,18 @@ PubsubRabbitMQ.prototype.publish = function (message) {
this.publishStore.push(data);
}
};
PubsubRabbitMQ.prototype.close = function() {
var t = this;
this.isClose = true;
return new Promise(function(resolve, reject) {
t.connection.close(function(err) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
};
module.exports = PubsubRabbitMQ;

View File

@ -81,6 +81,14 @@ PubsubRedis.prototype.initPromise = function() {
PubsubRedis.prototype.publish = function(data) {
this.clientPublish.publish(channelName, data);
};
PubsubRedis.prototype.close = function() {
var t = this;
return new Promise(function(resolve, reject) {
t.clientPublish.quit();
t.clientSubscribe.quit();
resolve();
});
};
module.exports = PubsubRedis;
module.exports.getClientRedis = getClientRedis;