mirror of
https://github.com/ONLYOFFICE/server.git
synced 2026-04-07 14:04:35 +08:00
gc don't close rabbitMQ connections
This commit is contained in:
committed by
Alexander.Trofimov
parent
6a620b368c
commit
92bc069410
@ -26,7 +26,7 @@ var connetOptions = {
|
||||
}
|
||||
};
|
||||
|
||||
function connetPromise(closeCallack) {
|
||||
function connetPromise(closeCallback) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
function startConnect() {
|
||||
amqp.connect(cfgRabbitUrl, connetOptions, function(err, conn) {
|
||||
@ -37,13 +37,13 @@ function connetPromise(closeCallack) {
|
||||
conn.on('error', function(err) {
|
||||
logger.error('[AMQP] conn error', err.stack);
|
||||
});
|
||||
var closeCallback = function() {
|
||||
var closeEventCallback = function() {
|
||||
//in some case receive multiple close events
|
||||
conn.removeListener('close', closeCallback);
|
||||
console.error("[AMQP] conn close");
|
||||
closeCallack();
|
||||
conn.removeListener('close', closeEventCallback);
|
||||
console.debug("[AMQP] conn close");
|
||||
closeCallback();
|
||||
};
|
||||
conn.on('close', closeCallback);
|
||||
conn.on('close', closeEventCallback);
|
||||
logger.debug('[AMQP] connected');
|
||||
resolve(conn);
|
||||
}
|
||||
|
||||
@ -16,11 +16,13 @@ function init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddRespon
|
||||
return co(function* () {
|
||||
var e = null;
|
||||
try {
|
||||
var conn = yield rabbitMQCore.connetPromise(function () {
|
||||
var conn = yield rabbitMQCore.connetPromise(function() {
|
||||
clear(taskqueue);
|
||||
init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, null);
|
||||
if (!taskqueue.isClose) {
|
||||
init(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, null);
|
||||
}
|
||||
});
|
||||
|
||||
taskqueue.connection = conn;
|
||||
var bAssertTaskQueue = false;
|
||||
var optionsTaskQueue = {
|
||||
durable: true,
|
||||
@ -120,6 +122,8 @@ function removeResponse(taskqueue, data) {
|
||||
}
|
||||
|
||||
function TaskQueueRabbitMQ() {
|
||||
this.isClose = false;
|
||||
this.connection = null;
|
||||
this.channelConvertTask = null;
|
||||
this.channelConvertTaskReceive = null;
|
||||
this.channelConvertResponse = null;
|
||||
@ -205,5 +209,18 @@ TaskQueueRabbitMQ.prototype.removeResponse = function (data) {
|
||||
resolve();
|
||||
});
|
||||
};
|
||||
TaskQueueRabbitMQ.prototype.close = function () {
|
||||
var t = this;
|
||||
this.isClose = true;
|
||||
return new Promise(function(resolve, reject) {
|
||||
t.connection.close(function(err) {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = TaskQueueRabbitMQ;
|
||||
|
||||
@ -53,13 +53,15 @@ var checkFileExpire = function() {
|
||||
};
|
||||
var checkDocumentExpire = function() {
|
||||
return co(function* () {
|
||||
var pubsub = null;
|
||||
var queue = null;
|
||||
var removedCount = 0;
|
||||
var startSaveCount = 0;
|
||||
try {
|
||||
logger.debug('checkDocumentExpire start');
|
||||
var removedCount = 0;
|
||||
var startSaveCount = 0;
|
||||
var redisClient = pubsubRedis.getClientRedis();
|
||||
|
||||
var pubsub = new pubsubService();
|
||||
pubsub = new pubsubService();
|
||||
yield pubsub.initPromise();
|
||||
//inner ping to update presence
|
||||
pubsub.publish(JSON.stringify({type: commonDefines.c_oPublishType.expireDoc}));
|
||||
@ -72,7 +74,7 @@ var checkDocumentExpire = function() {
|
||||
var execRes = yield utils.promiseRedis(multi, multi.exec);
|
||||
var expiredKeys = execRes[0];
|
||||
if (expiredKeys.length > 0) {
|
||||
var queue = new queueService();
|
||||
queue = new queueService();
|
||||
yield queue.initPromise(true, false, false, false);
|
||||
|
||||
for (var i = 0; i < expiredKeys.length; ++i) {
|
||||
@ -89,10 +91,20 @@ var checkDocumentExpire = function() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug('checkDocumentExpire end: startSaveCount = %d, removedCount = %d', startSaveCount, removedCount);
|
||||
} catch (e) {
|
||||
logger.error('checkDocumentExpire error:\r\n%s', e.stack);
|
||||
} finally {
|
||||
try {
|
||||
if (pubsub) {
|
||||
yield pubsub.close();
|
||||
}
|
||||
if (queue) {
|
||||
yield queue.close();
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error('checkDocumentExpire error:\r\n%s', e.stack);
|
||||
}
|
||||
logger.debug('checkDocumentExpire end: startSaveCount = %d, removedCount = %d', startSaveCount, removedCount);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
@ -11,10 +11,13 @@ function init(pubsub, callback) {
|
||||
return co(function* () {
|
||||
var e = null;
|
||||
try {
|
||||
var conn = yield rabbitMQCore.connetPromise(function () {
|
||||
var conn = yield rabbitMQCore.connetPromise(function() {
|
||||
clear(pubsub);
|
||||
init(pubsub, null);
|
||||
if (!pubsub.isClose) {
|
||||
init(pubsub, null);
|
||||
}
|
||||
});
|
||||
pubsub.connection = conn;
|
||||
pubsub.channelPublish = yield rabbitMQCore.createChannelPromise(conn);
|
||||
pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub,
|
||||
'fanout', {durable: true});
|
||||
@ -53,6 +56,8 @@ function publish(pubsub, data) {
|
||||
}
|
||||
|
||||
function PubsubRabbitMQ() {
|
||||
this.isClose = false;
|
||||
this.connection = null;
|
||||
this.channelPublish = null;
|
||||
this.exchangePublish = null;
|
||||
this.publishStore = [];
|
||||
@ -81,5 +86,18 @@ PubsubRabbitMQ.prototype.publish = function (message) {
|
||||
this.publishStore.push(data);
|
||||
}
|
||||
};
|
||||
PubsubRabbitMQ.prototype.close = function() {
|
||||
var t = this;
|
||||
this.isClose = true;
|
||||
return new Promise(function(resolve, reject) {
|
||||
t.connection.close(function(err) {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = PubsubRabbitMQ;
|
||||
|
||||
@ -57,6 +57,14 @@ PubsubRedis.prototype.initPromise = function() {
|
||||
PubsubRedis.prototype.publish = function(data) {
|
||||
this.clientPublish.publish(channelName, data);
|
||||
};
|
||||
PubsubRedis.prototype.close = function() {
|
||||
var t = this;
|
||||
return new Promise(function(resolve, reject) {
|
||||
t.clientPublish.quit();
|
||||
t.clientSubscribe.quit();
|
||||
resolve();
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = PubsubRedis;
|
||||
module.exports.getClientRedis = getClientRedis;
|
||||
|
||||
Reference in New Issue
Block a user