From 73972657bec1792751abccd0b2dbe5e3db075d62 Mon Sep 17 00:00:00 2001 From: Sergey Konovalov Date: Sun, 11 Dec 2022 16:43:37 +0300 Subject: [PATCH] [feature] Add capability option for open_sender, open_receiver calls --- Common/sources/activeMQCore.js | 10 +---- Common/sources/taskqueueRabbitMQ.js | 55 +++++++++++++++++++++++++--- DocService/sources/pubsubRabbitMQ.js | 19 +++++++++- 3 files changed, 68 insertions(+), 16 deletions(-) diff --git a/Common/sources/activeMQCore.js b/Common/sources/activeMQCore.js index 26c37475..ccbed752 100644 --- a/Common/sources/activeMQCore.js +++ b/Common/sources/activeMQCore.js @@ -75,19 +75,13 @@ function connetPromise(reconnectOnConnectionError, closeCallback) { startConnect(); }); } -function openSenderPromise(conn, name) { +function openSenderPromise(conn, options) { return new Promise(function(resolve, reject) { - let options = {target: name}; resolve(conn.open_sender(options)); }); } -function openReceiverPromise(conn, name, autoaccept) { +function openReceiverPromise(conn, options) { return new Promise(function(resolve, reject) { - let options = {source: name}; - if (!autoaccept) { - options.credit_window = 0; - options.autoaccept = false; - } resolve(conn.open_receiver(options)); }); } diff --git a/Common/sources/taskqueueRabbitMQ.js b/Common/sources/taskqueueRabbitMQ.js index 9d7091a4..2cb3b623 100644 --- a/Common/sources/taskqueueRabbitMQ.js +++ b/Common/sources/taskqueueRabbitMQ.js @@ -176,13 +176,34 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd }); taskqueue.connection = conn; if (isAddTask) { - taskqueue.channelConvertTask = yield activeMQCore.openSenderPromise(conn, cfgActiveQueueConvertTask); + //https://github.com/amqp/rhea/issues/251#issuecomment-535076570 + let optionsConvertTask = { + target: { + address: cfgActiveQueueConvertTask, + capabilities: ['queue'] + } + }; + taskqueue.channelConvertTask = yield activeMQCore.openSenderPromise(conn, optionsConvertTask); } if (isAddResponse) { - taskqueue.channelConvertResponse = yield activeMQCore.openSenderPromise(conn, cfgActiveQueueConvertResponse); + let optionsConvertResponse = { + target: { + address: cfgActiveQueueConvertResponse, + capabilities: ['queue'] + } + }; + taskqueue.channelConvertResponse = yield activeMQCore.openSenderPromise(conn, optionsConvertResponse); } if (isAddTaskReceive) { - let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveQueueConvertTask, false); + let optionsConvertTask = { + source: { + address: cfgActiveQueueConvertTask, + capabilities: ['queue'] + }, + credit_window: 0, + autoaccept: false + }; + let receiver = yield activeMQCore.openReceiverPromise(conn, optionsConvertTask); //todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1 receiver.add_credit(1); receiver.on("message", function(context) { @@ -202,7 +223,15 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd taskqueue.channelConvertTaskReceive = receiver; } if (isAddResponseReceive) { - let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveQueueConvertResponse, false); + let optionsConvertResponse = { + source: { + address: cfgActiveQueueConvertResponse, + capabilities: ['queue'] + }, + credit_window: 0, + autoaccept: false + }; + let receiver = yield activeMQCore.openReceiverPromise(conn, optionsConvertResponse); //todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1 receiver.add_credit(1); receiver.on("message", function(context) { @@ -216,10 +245,24 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd taskqueue.channelConvertResponseReceive = receiver; } if (isAddDelayed) { - taskqueue.channelDelayed = yield activeMQCore.openSenderPromise(conn, cfgActiveQueueDelayed); + let optionsDelayed = { + target: { + address: cfgActiveQueueDelayed, + capabilities: ['queue'] + } + }; + taskqueue.channelDelayed = yield activeMQCore.openSenderPromise(conn, optionsDelayed); } if (isEmitDead) { - let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveQueueConvertDead, false); + let optionsConvertDead = { + source: { + address: cfgActiveQueueConvertDead, + capabilities: ['queue'] + }, + credit_window: 0, + autoaccept: false + }; + let receiver = yield activeMQCore.openReceiverPromise(conn, optionsConvertDead); //todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1 receiver.add_credit(1); receiver.on("message", function(context) { diff --git a/DocService/sources/pubsubRabbitMQ.js b/DocService/sources/pubsubRabbitMQ.js index f37a5ab2..746230a1 100644 --- a/DocService/sources/pubsubRabbitMQ.js +++ b/DocService/sources/pubsubRabbitMQ.js @@ -92,9 +92,24 @@ function initActive(pubsub, callback) { } }); pubsub.connection = conn; - pubsub.channelPublish = yield activeMQCore.openSenderPromise(conn, cfgActiveTopicPubSub); + //https://github.com/amqp/rhea/issues/251#issuecomment-535076570 + let optionsPubSubSender = { + target: { + address: cfgActiveTopicPubSub, + capabilities: ['topic'] + } + }; + pubsub.channelPublish = yield activeMQCore.openSenderPromise(conn, optionsPubSubSender); - let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveTopicPubSub, false); + let optionsPubSubReceiver = { + source: { + address: cfgActiveTopicPubSub, + capabilities: ['topic'] + }, + credit_window: 0, + autoaccept: false + }; + let receiver = yield activeMQCore.openReceiverPromise(conn, optionsPubSubReceiver); //todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1 receiver.add_credit(1); receiver.on("message", function(context) {