提交 933a6d8f 编写于 作者: 武汉红喜's avatar 武汉红喜

数据类型转换优化

上级 6b42e962
......@@ -4,6 +4,8 @@
- 发送延时消息方法参数优化(魔法参数改为枚举)
- 优化getMessageType方法,支持 MyConsumer extends AbstractConsumer implements RocketMQListener <br>(官方只支持MyConsumer implements RocketMQListener)
- RocketMQTemplate方法重载(加入keys)
- 让@RocketMQMessageListener自带@Service注解
- 消息数据类型转换优化,增加Integer,Boolean等基础类型的转换
- 暂未加入事务消息功能 (官方最新版支持)
### 关于RocketMQTemplate往多集群发送消息的说明
......
......@@ -26,10 +26,12 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Service;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface RocketMQMessageListener {
/**
......
......@@ -225,8 +225,22 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(messageType, String.class)) {
return str;
} else if (Objects.equals(messageType, Byte.class)) {
return Byte.parseByte(str);
} else if (Objects.equals(messageType, Short.class)) {
return Short.parseShort(str);
} else if (Objects.equals(messageType, Integer.class)) {
return Integer.parseInt(str);
} else if (Objects.equals(messageType, Long.class)) {
return Long.parseLong(str);
} else if (Objects.equals(messageType, Float.class)) {
return Float.parseFloat(str);
} else if (Objects.equals(messageType, Double.class)) {
return Double.parseDouble(str);
} else if (Objects.equals(messageType, Boolean.class)) {
return Boolean.parseBoolean(str);
} else {
// if msgType not string, use objectMapper change it.
// if msgType not primitive, use objectMapper change it.
try {
return objectMapper.readValue(str, messageType);
} catch (Exception e) {
......
......@@ -498,8 +498,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
Object payloadObj = message.getPayload();
byte[] payloads;
if (payloadObj instanceof String) {
payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
if (isPrimitiveType(payloadObj)) {
payloads = payloadObj.toString().getBytes(Charset.forName(charset));
} else {
try {
String jsonObj = this.objectMapper.writeValueAsString(payloadObj);
......@@ -554,6 +554,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return rocketMsg;
}
private boolean isPrimitiveType(Object obj) {
return obj instanceof String || obj instanceof Byte || obj instanceof Short
|| obj instanceof Integer || obj instanceof Long || obj instanceof Float
|| obj instanceof Double || obj instanceof Boolean;
}
private Message<?> doConvert(Object payload, String keys) {
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, keys);
......
......@@ -3,10 +3,8 @@ package org.hongxi.whatsmars.rocketmq.boot.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MyConsumer implements RocketMQListener<String> {
@Override
......
......@@ -4,10 +4,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.hongxi.whatsmars.rocketmq.boot.OrderPaidEvent;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
public class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
@Override
......
......@@ -3,16 +3,13 @@ package org.hongxi.whatsmars.rocketmq.boot.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.hongxi.whatsmars.rocketmq.boot.OrderPaidEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* 指定连接某个MQ集群
*/
@ConditionalOnProperty(prefix = "trade.mq", value = {"nameServer"})
@Slf4j
@Service
@RocketMQMessageListener(nameServer = "${trade.mq.nameServer}", instanceName = "${trade.mq.clusterName}", topic = "test-topic-3", consumerGroup = "my-consumer_test-topic-3")
public class MyConsumer3 implements RocketMQListener<String> {
@Override
......
......@@ -4,13 +4,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
import org.springframework.stereotype.Service;
/**
* 顺序消息消费失败,默认不重试(本人修改点)
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-4", consumerGroup = "my-consumer_test-topic-4",
consumeMode = ConsumeMode.ORDERLY)
public class MyConsumer4 implements RocketMQListener<String> {
......
......@@ -5,13 +5,11 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
import org.springframework.stereotype.Service;
/**
* 配置重试次数(本人修改点) reconsumeTimes
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-4", consumerGroup = "my-consumer_test-topic-5",
consumeMode = ConsumeMode.ORDERLY, reconsumeTimes = 3)
public class MyConsumer5 implements RocketMQListener<MessageExt> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册