当前组件提供统一的消息处理框架,该组件具有以下特点:
@DynamicEventStrategy
和@EnableMQ
注解简化消息处理配置IEventType
和IMessageType
接口实现灵活的事件和消息类型管理EventLogService
实现,可记录事件处理日志- 配置MQ依赖,默认集成RocketMQ
```xml
<dependencies>
<!--默认集成RocketMQ依赖-->
<dependency>
<groupId>org.gy.framework</groupId>
<artifactId>spring-base-mq</artifactId>
</dependency>
</dependencies>
### 使用说明
#### 1. 启用MQ功能
在Spring Boot启动类上添加`@EnableMQ`注解:
- 【注意】如果`IEventType`、`IMessageType`的实现类不在启动类包路径下面,则需要添加`@EnableMQ`注解的`basePackages`或`basePackageClasses`属性
```java
@EnableMQ
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
实现IEventType
接口定义业务事件类型:
// 枚举方式实现事件类型
@Getter
@AllArgsConstructor
@CommonService //注册到spring容器,方便统一服务管理
public enum DefaultEventType implements IEventType {
DEMO_EVENT(DEFAULT_DEMO, "示例事件");
/**
* 标识
*/
private final String code;
/**
* 描述
*/
private final String desc;
}
interface EventTypeCode {
String DEFAULT_DEMO = "DEMO_EVENT";
}
实现IMessageType
接口定义业务消息类型:
// 枚举方式实现消息类型
@Getter
@AllArgsConstructor
@CommonService //注册到spring容器,方便统一服务管理
public enum DefaultMessageType implements IMessageType {
NORMAL(DEFAULT_NORMAL, "普通消息", ROCKETMQ),
ORDERLY(DEFAULT_ORDERLY, "顺序消息", ROCKETMQ),
KAFKA_DEFAULT(DEFAULT_KAFKA, "kafka消息", KAFKA);
/**
* 标识,必须唯一
*/
private final String code;
/**
* 描述
*/
private final String desc;
/**
* MQ类型
*/
private final MqType mqType;
}
interface MessageTypeCode {
String DEFAULT_NORMAL = "normal";
String DEFAULT_ORDERLY = "orderly";
String DEFAULT_KAFKA = "defaultConfig";
}
{
"requestId": "requestId_af91566ba30e",//请求唯一标识
"timestamp": 1747197048412,//事件毫秒时间戳
"eventTypeCode": "DEMO_EVENT",//事件类型编码(必须)
"bizKey": "bizKey_a7fa55a231be",//业务唯一标识,如果有则幂等处理
"data": { //业务数据示例,后续基于不同事件,定义不同的data结构
"age": 20,
"name": "test"
},
"tag": "tag_a7fa55a231be",//消息标签,默认不设置
"delayTimeLevel": 0, //延迟消息级别,18个等级(1~18),默认不延迟
"orderlyKey": "orderlyKey_1ab3c128b5db" //顺序消息key,默认不设置
}
在业务方法上使用@DynamicEventStrategy
注解:
@Service
public class UserService {
@DynamicEventStrategy(eventTypeCode = EventTypeCode.DEFAULT_DEMO)
public void handleUserRegister(User user) {
// 业务逻辑处理
System.out.println("处理用户注册: " + user);
}
}
@Test
void sendMessageTest(){
//messageTypeCode指定消息类型,当前使用DEFAULT_NORMAL类型作为示例
EventMessageProducerService sendService = EventMessageServiceManager.getSendService(MessageTypeCode.DEFAULT_NORMAL);
//异步发送消息(普通消息)
sendService.asyncSend("测试消息对象", IEventType.DEMO_EVENT);
//异步发送消息(延时消息),需要指定延时等级delayTimeLevel,kafka暂时不支持
//sendService.asyncSend("测试消息对象", IEventType.DEMO_EVENT,2);
//异步发送消息(顺序消息),需要指定顺序消费orderlyKey,例如订单号
//sendService.asyncSend("测试消息对象", IEventType.DEMO_EVENT,"orderlyKey");
//异步发送消息(消息自定义扩展),例如指定消息tag,kafka暂时不支持
//sendService.asyncSend("test", DefaultEventType.DEMO_EVENT, req -> req.setTag("tag1"));
}
spring:
base-mq:
global-config:
idempotent-expire-millis: 7200000 #消息幂等性过期时间,单位毫秒,默认2小时
idempotent-key-prefix: default:mq:idempotentKey #幂等性key前缀,默认为default:mq:idempotentKey
retry-times: 5 #重试次数,默认5次
initial-interval: 2000 #重试间隔时间,默认2000毫秒,仅kafka有效
multiplier: 1.5 #重试间隔时间倍数,默认1.5倍,仅kafka有效
max-interval: 10000 #重试最大间隔时间,默认10000毫秒,仅kafka有效
defaultNormalListener
:默认实现的普通消费监听器,可直接使用,参考DefaultNormalListener
defaultOrderlyListener
:默认实现的顺序消费监听器,可直接使用,参考DefaultOrderlyListener
spring:
base-mq:
rocketmq:
normal: #配置标识,需要与IMessageType的code一致
nameServer: dev.rocketmq:9876 #【必须】nameServer地址,格式: `host:port;host:port`
topic: normal_topic_dev #【必须】消息topic
producer: #【可选】producer配置,需要发送消息则配置
groupName: NormalProducerGroupDev #【producer存在时必须】组名称,保证唯一
instanceName: DEFAULT #同一个组定义多个实例,需要定义不同的实例名称,避免冲突,默认:DEFAULT
sendMessageTimeout: 3000 #消息发送超时时间,单位:毫秒,默认:3000
retryTimesWhenSendFailed: 5 #消息同步发送失败重试次数,默认:2
retryTimesWhenSendAsyncFailed: 5 #消息异步发送失败重试次数,默认:2
consumer: #【可选】consumer配置,需要监听消息则配置
groupName: NormalConsumerGroupDev #【consumer存在时必须】组名称,保证唯一
instanceName: DEFAULT #同一个组定义多个实例,需要定义不同的实例名称,避免冲突,默认:DEFAULT
messageModel: CLUSTERING #消费模式,CLUSTERING集群,BROADCASTING广播,默认:CLUSTERING
selectorExpression: '*' #消费tag表达式定义,*匹配所有,格式:tag1||tag2,默认:*
consumeMessageBatchMaxSize: 1 #批量消费消息数量,默认:1
consumeThreadMin: 16 #消费线程最小值定义,默认:20
consumeThreadMax: 16 #消费线程最大值定义,默认:20
consumeFromWhere: CONSUME_FROM_LAST_OFFSET #消费位置定义,参考ConsumeFromWhere枚举,默认:CONSUME_FROM_LAST_OFFSET
listenerBeanName: defaultNormalListener #消费监听器bean名称
defaultKafkaListener
:默认实现的消费监听器,可直接使用,参考DefaultKafkaListener
spring:
base-mq:
kafka:
defaultConfig: #配置标识,需要与IMessageType的code一致,必须唯一
bootstrapServers:
- dev.kafka:9092
topic: default_topic_dev
producer:
retries: 5 #消息发送失败重试次数
key-serializer: org.apache.kafka.common.serialization.StringSerializer #key序列化器,默认:StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer #value序列化器,默认:StringSerializer
acks: 1 #消息发送成功确认,0:不确认,1:Leader确认,-1/all:所有ISR确认
consumer:
group-id: default_consumer_group_dev #【consumer存在时必须】消费组ID,保证唯一
enable-auto-commit: false #是否自动提交消费位点
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key反序列化器,默认:StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value反序列化器,默认:StringDeserializer
auto-offset-reset: latest #自动偏移量,latest:最新,earliest:最旧,none:无,默认:latest
max-poll-records: 50 #每次拉取最大消息数,默认:500
listener:
concurrency: 4 #监听器容器中线程数,不能超过topic分区数
ack-mode: MANUAL_IMMEDIATE #手动提交确认模式,参考AckMode枚举,建议:MANUAL_IMMEDIATE
listener-bean-name: defaultKafkaListener #消费监听器bean名称
可以通过实现以下接口进行自定义扩展:
EventLogService
:自定义事件日志服务TraceService
:自定义追踪服务EventMessageDispatchService
:自定义消息分发服务EventMessageConsumerService
:自定义消息消费者服务EventMessageProducerService
:自定义消息生产者服务EventAnnotationMethodProcessor
:自定义事件注解方法处理器EventMessageHandler
:自定义消息处理器DynamicEventStrategyAspect
:动态事件策略切面处理DynamicEventStrategyRegister
:动态事件策略注册器EventMessageProducerRegister
:消息生产者服务注册器EventMessageServiceManager
:事件消息服务管理器CommonServiceManager
:通用服务管理器MqManager
:MQ管理器