JeecgBoot 2.4 微服务正式版本发布,基于SpringBoot的低代码平台

This commit is contained in:
zhangdaiscott
2020-11-28 17:20:10 +08:00
parent 35ef0eff90
commit a004acee4b
614 changed files with 206292 additions and 29220 deletions

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>jeecg-boot-starter</artifactId>
<groupId>org.jeecgframework.boot</groupId>
<version>2.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jeecg-boot-starter-rabbitmq</artifactId>
<description>jeecg-boot-starter-消息队列</description>
<dependencies>
<!-- 消息总线 rabbitmq -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,363 @@
package org.jeecg.boot.starter.rabbitmq.client;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.event.EventObj;
import org.jeecg.boot.starter.rabbitmq.event.JeecgRemoteApplicationEvent;
import org.jeecg.boot.starter.rabbitmq.exchange.DelayExchangeBuilder;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 消息队列客户端
*/
@Slf4j
@Configuration
public class RabbitMqClient {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqClient.class);
private final RabbitAdmin rabbitAdmin;
private final RabbitTemplate rabbitTemplate;
@Resource
private SimpleMessageListenerContainer messageListenerContainer;
@Resource
BusProperties busProperties;
@Resource
private ApplicationEventPublisher publisher;
@Resource
private ApplicationContext applicationContext;
@Bean
public void initQueue() {
Map<String, Object> beansWithRqbbitComponentMap = this.applicationContext.getBeansWithAnnotation(RabbitComponent.class);
Class<? extends Object> clazz = null;
for (Map.Entry<String, Object> entry : beansWithRqbbitComponentMap.entrySet()) {
log.info("初始化时队列............");
//获取到实例对象的class信息
clazz = entry.getValue().getClass();
Method[] methods = clazz.getMethods();
RabbitListener rabbitListener = clazz.getAnnotation(RabbitListener.class);
if (ObjectUtil.isNotEmpty(rabbitListener)) {
createQueue(rabbitListener);
}
for (Method method : methods) {
RabbitListener methodRabbitListener = method.getAnnotation(RabbitListener.class);
if (ObjectUtil.isNotEmpty(methodRabbitListener)) {
createQueue(methodRabbitListener);
}
}
}
}
/**
* 初始化队列
*
* @param rabbitListener
*/
private void createQueue(RabbitListener rabbitListener) {
String[] queues = rabbitListener.queues();
DirectExchange directExchange = createExchange(DelayExchangeBuilder.DELAY_EXCHANGE);
//创建交换机
rabbitAdmin.declareExchange(directExchange);
if (ObjectUtil.isNotEmpty(queues)) {
for (String queueName : queues) {
Queue queue = new Queue(queueName);
addQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);
rabbitAdmin.declareBinding(binding);
log.info("队列创建成功:" + queueName);
}
}
}
private Map sentObj = new HashMap<>();
@Autowired
public RabbitMqClient(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate) {
this.rabbitAdmin = rabbitAdmin;
this.rabbitTemplate = rabbitTemplate;
}
/**
* 发送远程事件
*
* @param handlerName
* @param baseMap
*/
public void publishEvent(String handlerName, BaseMap baseMap) {
EventObj eventObj = new EventObj();
eventObj.setHandlerName(handlerName);
eventObj.setBaseMap(baseMap);
publisher.publishEvent(new JeecgRemoteApplicationEvent(eventObj, busProperties.getId()));
}
/**
* 转换Message对象
*
* @param messageType 返回消息类型 MessageProperties类中常量
* @param msg
* @return
*/
public Message getMessage(String messageType, Object msg) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(messageType);
Message message = new Message(msg.toString().getBytes(), messageProperties);
return message;
}
/**
* 有绑定Key的Exchange发送
*
* @param routingKey
* @param msg
*/
public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg) {
Message message = getMessage(MessageProperties.CONTENT_TYPE_JSON, msg);
rabbitTemplate.send(topicExchange.getName(), routingKey, message);
}
/**
* 没有绑定KEY的Exchange发送
*
* @param exchange
* @param msg
*/
public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg) {
addExchange(exchange);
logger.info("RabbitMQ send " + exchange.getName() + "->" + msg);
rabbitTemplate.convertAndSend(topicExchange.getName(), msg);
}
/**
* 发送消息
*
* @param queueName 队列名称
* @param params 消息内容map
*/
public void sendMessage(String queueName, Object params) {
log.info("发送消息到mq");
try {
rabbitTemplate.convertAndSend(DelayExchangeBuilder.DELAY_EXCHANGE, queueName, params, message -> {
return message;
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送消息
*
* @param queueName 队列名称
*/
public void sendMessage(String queueName) {
this.send(queueName, this.sentObj, 0);
this.sentObj.clear();
}
public RabbitMqClient put(String key, Object value) {
this.sentObj.put(key, value);
return this;
}
/**
* 延迟发送消息
*
* @param queueName 队列名称
* @param params 消息内容params
* @param expiration 延迟时间 单位毫秒
*/
public void sendMessage(String queueName, Object params, Integer expiration) {
this.send(queueName, params, expiration);
}
private void send(String queueName, Object params, Integer expiration) {
Queue queue = new Queue(queueName);
addQueue(queue);
CustomExchange customExchange = DelayExchangeBuilder.buildExchange();
rabbitAdmin.declareExchange(customExchange);
Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
rabbitAdmin.declareBinding(binding);
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.debug("发送时间:" + sf.format(new Date()));
messageListenerContainer.setQueueNames(queueName);
/* messageListenerContainer.setMessageListener(new MqListener<Message>() {
@Override
public void onMessage(Message message, Channel channel) {
MqListener messageListener = SpringContextHolder.getHandler(queueName + "Listener", MqListener.class);
if (ObjectUtil.isNotEmpty(messageListener)) {
messageListener.onMessage(message, channel);
}
}
});*/
rabbitTemplate.convertAndSend(DelayExchangeBuilder.DEFAULT_DELAY_EXCHANGE, queueName, params, message -> {
if (expiration != null && expiration > 0) {
message.getMessageProperties().setHeader("x-delay", expiration);
}
return message;
});
}
/**
* 给queue发送消息
*
* @param queueName
*/
public String receiveFromQueue(String queueName) {
return receiveFromQueue(DirectExchange.DEFAULT, queueName);
}
/**
* 给direct交换机指定queue发送消息
*
* @param directExchange
* @param queueName
*/
public String receiveFromQueue(DirectExchange directExchange, String queueName) {
Queue queue = new Queue(queueName);
addQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
rabbitAdmin.declareBinding(binding);
String messages = (String) rabbitTemplate.receiveAndConvert(queueName);
System.out.println("Receive:" + messages);
return messages;
}
/**
* 创建Exchange
*
* @param exchange
*/
public void addExchange(AbstractExchange exchange) {
rabbitAdmin.declareExchange(exchange);
}
/**
* 删除一个Exchange
*
* @param exchangeName
*/
public boolean deleteExchange(String exchangeName) {
return rabbitAdmin.deleteExchange(exchangeName);
}
/**
* 声明其名称自动命名的队列。它是用exclusive=true、autoDelete=true和 durable = false
*
* @return Queue
*/
public Queue addQueue() {
return rabbitAdmin.declareQueue();
}
/**
* 创建一个指定的Queue
*
* @param queue
* @return queueName
*/
public String addQueue(Queue queue) {
return rabbitAdmin.declareQueue(queue);
}
/**
* 删除一个队列
*
* @param queueName the name of the queue.
* @param unused true if the queue should be deleted only if not in use.
* @param empty true if the queue should be deleted only if empty.
*/
public void deleteQueue(String queueName, boolean unused, boolean empty) {
rabbitAdmin.deleteQueue(queueName, unused, empty);
}
/**
* 删除一个队列
*
* @param queueName
* @return true if the queue existed and was deleted.
*/
public boolean deleteQueue(String queueName) {
return rabbitAdmin.deleteQueue(queueName);
}
/**
* 绑定一个队列到一个匹配型交换器使用一个routingKey
*
* @param queue
* @param exchange
* @param routingKey
*/
public void addBinding(Queue queue, TopicExchange exchange, String routingKey) {
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
rabbitAdmin.declareBinding(binding);
}
/**
* 绑定一个Exchange到一个匹配型Exchange 使用一个routingKey
*
* @param exchange
* @param topicExchange
* @param routingKey
*/
public void addBinding(Exchange exchange, TopicExchange topicExchange, String routingKey) {
Binding binding = BindingBuilder.bind(exchange).to(topicExchange).with(routingKey);
rabbitAdmin.declareBinding(binding);
}
/**
* 去掉一个binding
*
* @param binding
*/
public void removeBinding(Binding binding) {
rabbitAdmin.removeBinding(binding);
}
/**
* 创建交换器
*
* @param exchangeName
* @return
*/
public DirectExchange createExchange(String exchangeName) {
return new DirectExchange(exchangeName, true, false);
}
}

View File

@ -0,0 +1,61 @@
package org.jeecg.boot.starter.rabbitmq.config;
import org.jeecg.boot.starter.rabbitmq.event.JeecgRemoteApplicationEvent;
import org.jeecg.boot.starter.rabbitmq.exchange.DelayExchangeBuilder;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
/**
* 消息队列配置类
*
* @author zyf
*/
@Configuration
@RemoteApplicationEventScan(basePackageClasses = JeecgRemoteApplicationEvent.class)
public class RabbitMqConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//设置忽略声明异常
rabbitAdmin.setIgnoreDeclarationExceptions(true);
return rabbitAdmin;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//当前的消费者数量
container.setConcurrentConsumers(1);
//最大的消费者数量
container.setMaxConcurrentConsumers(1);
//是否重回队列
container.setDefaultRequeueRejected(true);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
return container;
}
}

View File

@ -0,0 +1,31 @@
package org.jeecg.boot.starter.rabbitmq.core;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import java.io.IOException;
@Slf4j
public class BaseRabbiMqHandler<T> {
public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
try {
mqListener.handler(t, channel);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.info("接收消息失败,重新放回队列");
try {
/**
* deliveryTag:该消息的index
* multiple是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
* requeue被拒绝的是否重新入队列
*/
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,39 @@
package org.jeecg.boot.starter.rabbitmq.core;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.Map;
public class MapMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
} else {
ObjectInputStream objInt = null;
try {
ByteArrayInputStream byteInt = new ByteArrayInputStream(message.getBody());
objInt = new ObjectInputStream(byteInt);
//byte[]转map
Map map = (HashMap) objInt.readObject();
return map;
} catch (Exception e) {
e.printStackTrace();
}
}
return null;
}
}

View File

@ -0,0 +1,28 @@
package org.jeecg.boot.starter.rabbitmq.event;
import cn.hutool.core.util.ObjectUtil;
import org.jeecg.common.util.SpringContextHolder;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* 监听远程事件,并分发消息到业务模块消息处理器
*/
@Component
public class BaseApplicationEvent implements ApplicationListener<JeecgRemoteApplicationEvent> {
@Override
public void onApplicationEvent(JeecgRemoteApplicationEvent jeecgRemoteApplicationEvent) {
EventObj eventObj = jeecgRemoteApplicationEvent.getEventObj();
if (ObjectUtil.isNotEmpty(eventObj)) {
//获取业务模块消息处理器
JeecgBusEventHandler busEventHandler = SpringContextHolder.getHandler(eventObj.getHandlerName(), JeecgBusEventHandler.class);
if (ObjectUtil.isNotEmpty(busEventHandler)) {
//通知业务模块
busEventHandler.onMessage(eventObj);
}
}
}
}

View File

@ -0,0 +1,21 @@
package org.jeecg.boot.starter.rabbitmq.event;
import lombok.Data;
import org.jeecg.common.base.BaseMap;
import java.io.Serializable;
/**
* 远程事件数据对象
*/
@Data
public class EventObj implements Serializable {
/**
* 数据对象
*/
private BaseMap baseMap;
/**
* 自定义业务模块消息处理器beanName
*/
private String handlerName;
}

View File

@ -0,0 +1,8 @@
package org.jeecg.boot.starter.rabbitmq.event;
/**
* 业务模块消息处理器接口
*/
public interface JeecgBusEventHandler {
void onMessage(EventObj map);
}

View File

@ -0,0 +1,29 @@
package org.jeecg.boot.starter.rabbitmq.event;
import lombok.Data;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
/**
* 自定义网关刷新远程事件
*
* @author : zyf
* @date :2020-11-10
*/
@Data
public class JeecgRemoteApplicationEvent extends RemoteApplicationEvent {
private JeecgRemoteApplicationEvent() {
}
private EventObj eventObj;
public JeecgRemoteApplicationEvent(EventObj source, String originService, String destinationService) {
super(source, originService, destinationService);
this.eventObj = source;
}
public JeecgRemoteApplicationEvent(EventObj source, String originService) {
super(source, originService, null);
this.eventObj = source;
}
}

View File

@ -0,0 +1,33 @@
package org.jeecg.boot.starter.rabbitmq.exchange;
import org.springframework.amqp.core.CustomExchange;
import java.util.HashMap;
import java.util.Map;
/**
* 延迟交换器构造器
* @author: zyf
* @date: 2019/3/8 13:31
* @description:
*/
public class DelayExchangeBuilder {
/**
* 默认延迟消息交换器
*/
public final static String DEFAULT_DELAY_EXCHANGE = "jeecg.delayed.exchange";
/**
* 普通交换器
*/
public final static String DELAY_EXCHANGE = "jeecg.direct.exchange";
/**
* 构建延迟消息交换器
* @return
*/
public static CustomExchange buildExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DEFAULT_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
}

View File

@ -0,0 +1,10 @@
package org.jeecg.boot.starter.rabbitmq.listenter;
import com.rabbitmq.client.Channel;
public interface MqListener<T> {
default void handler(T map, Channel channel) {
}
}