mirror of
https://github.com/ONLYOFFICE/server.git
synced 2026-04-07 14:04:35 +08:00
[bug] Refactoring rabbitmq config; For bug 67127
This commit is contained in:
@ -114,42 +114,63 @@
|
||||
"rabbitmq": {
|
||||
"url": "amqp://guest:guest@localhost:5672",
|
||||
"socketOptions": {},
|
||||
"exchangepubsub": "ds.pubsub",
|
||||
"queuepubsubOptions": {
|
||||
"autoDelete": true,
|
||||
"exclusive": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
"exchangepubsub": {
|
||||
"name": "ds.pubsub",
|
||||
"options": {
|
||||
"durable": true
|
||||
}
|
||||
},
|
||||
"queueconverttask": "ds.converttask6",
|
||||
"queueconverttaskOptions": {
|
||||
"durable": true,
|
||||
"maxPriority": 6,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
"queuepubsub": {
|
||||
"name": "",
|
||||
"options": {
|
||||
"autoDelete": true,
|
||||
"exclusive": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
}
|
||||
}
|
||||
},
|
||||
"queueconvertresponse": "ds.convertresponse",
|
||||
"queueconvertresponseOptions": {
|
||||
"durable": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
"queueconverttask": {
|
||||
"name": "ds.converttask6",
|
||||
"options": {
|
||||
"durable": true,
|
||||
"maxPriority": 6,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
}
|
||||
}
|
||||
},
|
||||
"exchangeconvertdead": "ds.exchangeconvertdead",
|
||||
"queueconvertdead": "ds.convertdead",
|
||||
"queueconvertdeadOptions": {
|
||||
"durable": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
"queueconvertresponse": {
|
||||
"name": "ds.convertresponse",
|
||||
"options": {
|
||||
"durable": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
}
|
||||
}
|
||||
},
|
||||
"queuedelayed": "ds.delayed",
|
||||
"queuedelayedOptions": {
|
||||
"durable": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
"exchangeconvertdead": {
|
||||
"name": "ds.exchangeconvertdead",
|
||||
"options": {
|
||||
"durable": true
|
||||
}
|
||||
},
|
||||
"queueconvertdead": {
|
||||
"name": "ds.convertdead",
|
||||
"options": {
|
||||
"durable": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
}
|
||||
}
|
||||
},
|
||||
"queuedelayed": {
|
||||
"name": "ds.delayed",
|
||||
"options": {
|
||||
"durable": true,
|
||||
"arguments": {
|
||||
"x-queue-type": "classic"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@ -39,7 +39,6 @@ var utils = require('./utils');
|
||||
var constants = require('./constants');
|
||||
var rabbitMQCore = require('./rabbitMQCore');
|
||||
var activeMQCore = require('./activeMQCore');
|
||||
const logger = require('./logger');
|
||||
const commonDefines = require('./commondefines');
|
||||
const operationContext = require('./operationContext');
|
||||
|
||||
@ -48,20 +47,15 @@ const cfgQueueType = config.get('queue.type');
|
||||
var cfgVisibilityTimeout = config.get('queue.visibilityTimeout');
|
||||
var cfgQueueRetentionPeriod = config.get('queue.retentionPeriod');
|
||||
var cfgRabbitQueueConvertTask = config.get('rabbitmq.queueconverttask');
|
||||
var cfgRabbitQueueConvertTaskOptions = config.get('rabbitmq.queueconverttaskOptions');
|
||||
var cfgRabbitQueueConvertResponse = config.get('rabbitmq.queueconvertresponse');
|
||||
var cfgRabbitQueueConvertResponseOptions = config.get('rabbitmq.queueconvertresponseOptions');
|
||||
var cfgRabbitExchangeConvertDead = config.get('rabbitmq.exchangeconvertdead');
|
||||
var cfgRabbitQueueConvertDead = config.get('rabbitmq.queueconvertdead');
|
||||
var cfgRabbitQueueConvertDeadOptions = config.get('rabbitmq.queueconvertdeadOptions');
|
||||
var cfgRabbitQueueDelayed = config.get('rabbitmq.queuedelayed');
|
||||
var cfgRabbitQueueDelayedOptions = config.get('rabbitmq.queuedelayedOptions');
|
||||
var cfgActiveQueueConvertTask = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconverttask');
|
||||
var cfgActiveQueueConvertResponse = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertresponse');
|
||||
var cfgActiveQueueConvertDead = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertdead');
|
||||
var cfgActiveQueueDelayed = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queuedelayed');
|
||||
|
||||
const optionsExchnangeDead = {durable: true};
|
||||
function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, isEmitDead, isAddDelayed, callback) {
|
||||
return co(function* () {
|
||||
var e = null;
|
||||
@ -78,20 +72,20 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
|
||||
var bAssertTaskQueue = false;
|
||||
let optionsTaskQueueDefault = {
|
||||
messageTtl: cfgQueueRetentionPeriod * 1000,
|
||||
deadLetterExchange: cfgRabbitExchangeConvertDead
|
||||
deadLetterExchange: cfgRabbitExchangeConvertDead.name
|
||||
};
|
||||
let optionsTaskQueue = {...optionsTaskQueueDefault, ...cfgRabbitQueueConvertTaskOptions};
|
||||
let optionsTaskQueue = {...optionsTaskQueueDefault, ...cfgRabbitQueueConvertTask.options};
|
||||
if (isAddTask) {
|
||||
taskqueue.channelConvertTask = yield rabbitMQCore.createConfirmChannelPromise(conn);
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTask, cfgRabbitQueueConvertTask,
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTask, cfgRabbitQueueConvertTask.name,
|
||||
optionsTaskQueue);
|
||||
bAssertTaskQueue = true;
|
||||
}
|
||||
var bAssertResponseQueue = false;
|
||||
if (isAddResponse) {
|
||||
taskqueue.channelConvertResponse = yield rabbitMQCore.createConfirmChannelPromise(conn);
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponse, cfgRabbitQueueConvertResponse,
|
||||
cfgRabbitQueueConvertResponseOptions);
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponse, cfgRabbitQueueConvertResponse.name,
|
||||
cfgRabbitQueueConvertResponse.options);
|
||||
bAssertResponseQueue = true;
|
||||
}
|
||||
var optionsReceive = {noAck: false};
|
||||
@ -99,10 +93,10 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
|
||||
taskqueue.channelConvertTaskReceive = yield rabbitMQCore.createChannelPromise(conn);
|
||||
taskqueue.channelConvertTaskReceive.prefetch(1);
|
||||
if (!bAssertTaskQueue) {
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask,
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask.name,
|
||||
optionsTaskQueue);
|
||||
}
|
||||
yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask,
|
||||
yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask.name,
|
||||
function (message) {
|
||||
co(function* () {
|
||||
let ack = function() {
|
||||
@ -120,10 +114,10 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
|
||||
if (isAddResponseReceive) {
|
||||
taskqueue.channelConvertResponseReceive = yield rabbitMQCore.createChannelPromise(conn);
|
||||
if (!bAssertResponseQueue) {
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse,
|
||||
optionsResponseQueue);
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse.name,
|
||||
cfgRabbitQueueConvertResponse.options);
|
||||
}
|
||||
yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse,
|
||||
yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse.name,
|
||||
function (message) {
|
||||
if (message) {
|
||||
taskqueue.emit('response', message.content.toString(), function() {
|
||||
@ -134,19 +128,19 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
|
||||
}
|
||||
if (isAddDelayed) {
|
||||
let optionsDelayedQueueDefault = {
|
||||
deadLetterExchange: cfgRabbitExchangeConvertDead
|
||||
deadLetterExchange: cfgRabbitExchangeConvertDead.name
|
||||
};
|
||||
let optionsDelayedQueue = {...optionsDelayedQueueDefault, ...cfgRabbitQueueDelayedOptions};
|
||||
let optionsDelayedQueue = {...optionsDelayedQueueDefault, ...cfgRabbitQueueDelayed.options};
|
||||
taskqueue.channelDelayed = yield rabbitMQCore.createConfirmChannelPromise(conn);
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelDelayed, cfgRabbitQueueDelayed, optionsDelayedQueue);
|
||||
yield rabbitMQCore.assertQueuePromise(taskqueue.channelDelayed, cfgRabbitQueueDelayed.name, optionsDelayedQueue);
|
||||
}
|
||||
if (isEmitDead) {
|
||||
taskqueue.channelConvertDead = yield rabbitMQCore.createChannelPromise(conn);
|
||||
yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead, 'fanout',
|
||||
optionsExchnangeDead);
|
||||
var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead, cfgRabbitQueueConvertDeadOptions);
|
||||
yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead.name, 'fanout',
|
||||
cfgRabbitExchangeConvertDead.options);
|
||||
var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead.name, cfgRabbitQueueConvertDead.options);
|
||||
|
||||
taskqueue.channelConvertDead.bindQueue(queue, cfgRabbitExchangeConvertDead, '');
|
||||
taskqueue.channelConvertDead.bindQueue(queue, cfgRabbitExchangeConvertDead.name, '');
|
||||
yield rabbitMQCore.consumePromise(taskqueue.channelConvertDead, queue, function(message) {
|
||||
if (null != taskqueue.channelConvertDead) {
|
||||
if (message) {
|
||||
@ -370,7 +364,7 @@ function addTaskRabbit(taskqueue, content, priority, callback, opt_expiration, o
|
||||
if (undefined !== opt_headers) {
|
||||
options.headers = opt_headers;
|
||||
}
|
||||
taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask, content, options, callback);
|
||||
taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask.name, content, options, callback);
|
||||
}
|
||||
function addTaskActive(taskqueue, content, priority, callback, opt_expiration, opt_headers) {
|
||||
var msg = {durable: true, priority: priority, body: content, ttl: cfgQueueRetentionPeriod * 1000};
|
||||
@ -402,7 +396,7 @@ function addTaskString(taskqueue, task, priority, opt_expiration, opt_headers) {
|
||||
}
|
||||
function addResponseRabbit(taskqueue, content, callback) {
|
||||
var options = {persistent: true};
|
||||
taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse, content, options, callback);
|
||||
taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse.name, content, options, callback);
|
||||
}
|
||||
function addResponseActive(taskqueue, content, callback) {
|
||||
var msg = {durable: true, body: content};
|
||||
@ -419,7 +413,7 @@ function closeActive(conn) {
|
||||
}
|
||||
function addDelayedRabbit(taskqueue, content, ttl, callback) {
|
||||
var options = {persistent: true, expiration: ttl.toString()};
|
||||
taskqueue.channelDelayed.sendToQueue(cfgRabbitQueueDelayed, content, options, callback);
|
||||
taskqueue.channelDelayed.sendToQueue(cfgRabbitQueueDelayed.name, content, options, callback);
|
||||
}
|
||||
function addDelayedActive(taskqueue, content, ttl, callback) {
|
||||
var msg = {durable: true, body: content, ttl: ttl};
|
||||
@ -434,8 +428,8 @@ function healthCheckRabbit(taskqueue) {
|
||||
if (!taskqueue.channelConvertDead) {
|
||||
return false;
|
||||
}
|
||||
const exchange = yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead,
|
||||
'fanout', optionsExchnangeDead);
|
||||
const exchange = yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead.name,
|
||||
'fanout', cfgRabbitExchangeConvertDead.options);
|
||||
return !!exchange;
|
||||
});
|
||||
}
|
||||
|
||||
@ -42,11 +42,10 @@ var rabbitMQCore = require('./../../Common/sources/rabbitMQCore');
|
||||
var activeMQCore = require('./../../Common/sources/activeMQCore');
|
||||
|
||||
const cfgQueueType = config.get('queue.type');
|
||||
var cfgRabbitExchangePubSub = config.get('rabbitmq.exchangepubsub');
|
||||
const cfgRabbitQueuePubsubOptions = config.get('rabbitmq.queuepubsubOptions');
|
||||
const cfgRabbitExchangePubSub = config.get('rabbitmq.exchangepubsub');
|
||||
const cfgRabbitQueuePubsub = config.get('rabbitmq.queuepubsub');
|
||||
var cfgActiveTopicPubSub = constants.ACTIVEMQ_TOPIC_PREFIX + config.get('activemq.topicpubsub');
|
||||
|
||||
const optionsExchange = {durable: true};
|
||||
function initRabbit(pubsub, callback) {
|
||||
return co(function* () {
|
||||
var e = null;
|
||||
@ -61,12 +60,12 @@ function initRabbit(pubsub, callback) {
|
||||
});
|
||||
pubsub.connection = conn;
|
||||
pubsub.channelPublish = yield rabbitMQCore.createChannelPromise(conn);
|
||||
pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub,
|
||||
'fanout', {durable: true});
|
||||
pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub.name,
|
||||
'fanout', cfgRabbitExchangePubSub.options);
|
||||
|
||||
pubsub.channelReceive = yield rabbitMQCore.createChannelPromise(conn);
|
||||
var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, '', cfgRabbitQueuePubsubOptions);
|
||||
pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub, '');
|
||||
var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, cfgRabbitQueuePubsub.name, cfgRabbitQueuePubsub.options);
|
||||
pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub.name, '');
|
||||
yield rabbitMQCore.consumePromise(pubsub.channelReceive, queue, function (message) {
|
||||
if(null != pubsub.channelReceive){
|
||||
if (message) {
|
||||
@ -190,8 +189,8 @@ function healthCheckRabbit(pubsub) {
|
||||
if (!pubsub.channelPublish) {
|
||||
return false;
|
||||
}
|
||||
const exchange = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub,
|
||||
'fanout', optionsExchange);
|
||||
const exchange = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub.name,
|
||||
'fanout', cfgRabbitExchangePubSub.options);
|
||||
return !!exchange;
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user