提交 826f2192 编写于 作者: R ruozhuliufeng

新增事务消息发送示例

上级 57f5fdb3
package tech.msop.test.rocketmq.transaction.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
import java.util.List;
/**
* 事务消息消费者
*/
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 指定NameServer
consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费Topic与Tag
consumer.subscribe("somTopic","*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一旦Broker中有其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 逐条消费消息
for (MessageExt msg:list){
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者消费
consumer.start();
System.out.println("consumer started");
}
}
package tech.msop.test.rocketmq.transaction.listener;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* 工行事务监听器
*/
public class ICBCTransactionListener implements TransactionListener {
// 回调操作方法
// 消息预提交成功就会出发该方法的执行,用于完成本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("预提交消息成功:" + message);
// 假设接收到 TAGA的消息就表示扣款成功,TAGB的消息标识扣款失败
// TAGC的消息表示扣款结果不清楚,需要执行消息回查
if (StringUtils.equals("TAGA", message.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", message.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", message.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
// 消息回查操作
// 引发消息回查的原因最常见的有两个:
// 1) 回调操作返回UNKNOW
// 2) TC没有接收到TM的最终全局事务确认指令
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("执行消息回查" + messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
package tech.msop.test.rocketmq.transaction.producer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
import tech.msop.test.rocketmq.transaction.listener.ICBCTransactionListener;
import java.util.concurrent.*;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 创建一个Producer,参数为Producer Group名称
TransactionMQProducer producer = new TransactionMQProducer("tpg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
/**
* 定义一个线程池
*
* @param corePoolSize 线程池中核心线程数量
* @param maximumPoolSize 线程池中最多线程数
* @param keepAliveTime 这是一个时间。当线程池中线程数量大于核心线程数量时,多余空闲线程的存货时长
* @param unit 时间单位
* @param workQueue 临时存放任务的队列,其参数就是队列的长度
* @param threadFactory 线程工厂
*/
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 为生产者指定一个线程池
producer.setExecutorService(executorService);
// 为生产者添加事务监听器
producer.setTransactionListener(new ICBCTransactionListener());
// 开启Producer
producer.start();
String[] tags = {"TAGA","TAGB","TAGC"};
for (int i = 0; i < 3; i++) {
byte[] body = ("Hi Transaction Msg "+i).getBytes();
Message msg = new Message("TTopic",tags[i],body);
// 发送事务消息
// 第二个参数用于指定在执行本地事务时要使用的业务参数
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
System.out.println("发送结果:"+sendResult);
}
producer.shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册