[feature] Add capability option for open_sender, open_receiver calls

This commit is contained in:
Sergey Konovalov
2022-12-11 16:43:37 +03:00
parent 077fc48501
commit 73972657be
3 changed files with 68 additions and 16 deletions

View File

@ -75,19 +75,13 @@ function connetPromise(reconnectOnConnectionError, closeCallback) {
startConnect();
});
}
function openSenderPromise(conn, name) {
function openSenderPromise(conn, options) {
return new Promise(function(resolve, reject) {
let options = {target: name};
resolve(conn.open_sender(options));
});
}
function openReceiverPromise(conn, name, autoaccept) {
function openReceiverPromise(conn, options) {
return new Promise(function(resolve, reject) {
let options = {source: name};
if (!autoaccept) {
options.credit_window = 0;
options.autoaccept = false;
}
resolve(conn.open_receiver(options));
});
}

View File

@ -176,13 +176,34 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
});
taskqueue.connection = conn;
if (isAddTask) {
taskqueue.channelConvertTask = yield activeMQCore.openSenderPromise(conn, cfgActiveQueueConvertTask);
//https://github.com/amqp/rhea/issues/251#issuecomment-535076570
let optionsConvertTask = {
target: {
address: cfgActiveQueueConvertTask,
capabilities: ['queue']
}
};
taskqueue.channelConvertTask = yield activeMQCore.openSenderPromise(conn, optionsConvertTask);
}
if (isAddResponse) {
taskqueue.channelConvertResponse = yield activeMQCore.openSenderPromise(conn, cfgActiveQueueConvertResponse);
let optionsConvertResponse = {
target: {
address: cfgActiveQueueConvertResponse,
capabilities: ['queue']
}
};
taskqueue.channelConvertResponse = yield activeMQCore.openSenderPromise(conn, optionsConvertResponse);
}
if (isAddTaskReceive) {
let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveQueueConvertTask, false);
let optionsConvertTask = {
source: {
address: cfgActiveQueueConvertTask,
capabilities: ['queue']
},
credit_window: 0,
autoaccept: false
};
let receiver = yield activeMQCore.openReceiverPromise(conn, optionsConvertTask);
//todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1
receiver.add_credit(1);
receiver.on("message", function(context) {
@ -202,7 +223,15 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
taskqueue.channelConvertTaskReceive = receiver;
}
if (isAddResponseReceive) {
let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveQueueConvertResponse, false);
let optionsConvertResponse = {
source: {
address: cfgActiveQueueConvertResponse,
capabilities: ['queue']
},
credit_window: 0,
autoaccept: false
};
let receiver = yield activeMQCore.openReceiverPromise(conn, optionsConvertResponse);
//todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1
receiver.add_credit(1);
receiver.on("message", function(context) {
@ -216,10 +245,24 @@ function initActive(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
taskqueue.channelConvertResponseReceive = receiver;
}
if (isAddDelayed) {
taskqueue.channelDelayed = yield activeMQCore.openSenderPromise(conn, cfgActiveQueueDelayed);
let optionsDelayed = {
target: {
address: cfgActiveQueueDelayed,
capabilities: ['queue']
}
};
taskqueue.channelDelayed = yield activeMQCore.openSenderPromise(conn, optionsDelayed);
}
if (isEmitDead) {
let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveQueueConvertDead, false);
let optionsConvertDead = {
source: {
address: cfgActiveQueueConvertDead,
capabilities: ['queue']
},
credit_window: 0,
autoaccept: false
};
let receiver = yield activeMQCore.openReceiverPromise(conn, optionsConvertDead);
//todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1
receiver.add_credit(1);
receiver.on("message", function(context) {

View File

@ -92,9 +92,24 @@ function initActive(pubsub, callback) {
}
});
pubsub.connection = conn;
pubsub.channelPublish = yield activeMQCore.openSenderPromise(conn, cfgActiveTopicPubSub);
//https://github.com/amqp/rhea/issues/251#issuecomment-535076570
let optionsPubSubSender = {
target: {
address: cfgActiveTopicPubSub,
capabilities: ['topic']
}
};
pubsub.channelPublish = yield activeMQCore.openSenderPromise(conn, optionsPubSubSender);
let receiver = yield activeMQCore.openReceiverPromise(conn, cfgActiveTopicPubSub, false);
let optionsPubSubReceiver = {
source: {
address: cfgActiveTopicPubSub,
capabilities: ['topic']
},
credit_window: 0,
autoaccept: false
};
let receiver = yield activeMQCore.openReceiverPromise(conn, optionsPubSubReceiver);
//todo ?consumer.dispatchAsync=false&consumer.prefetchSize=1
receiver.add_credit(1);
receiver.on("message", function(context) {