提交 f8d875b9 编写于 作者: zlt2000's avatar zlt2000

新增`rocketmq-demo`演示工程

上级 1a3a16d3
......@@ -38,8 +38,8 @@
<txlcn.version>5.0.2.RELEASE</txlcn.version>
<fastdfs-client.version>1.26.5</fastdfs-client.version>
<platform-bom>Cairo-SR3</platform-bom>
<spring-cloud-alibaba-dependencies.version>0.2.1.RELEASE</spring-cloud-alibaba-dependencies.version>
<spring-boot-dependencies.version>2.0.8.RELEASE</spring-boot-dependencies.version>
<spring-cloud-alibaba-dependencies.version>0.2.2.RELEASE</spring-cloud-alibaba-dependencies.version>
<spring-boot-dependencies.version>2.0.9.RELEASE</spring-boot-dependencies.version>
<spring-cloud-dependencies.version>Finchley.SR3</spring-cloud-dependencies.version>
<spring-boot-maven-plugin.version>2.1.1.RELEASE</spring-boot-maven-plugin.version>
<docker-maven-plugin.version>1.2.0</docker-maven-plugin.version>
......
......@@ -14,5 +14,7 @@
<module>txlcn-demo</module>
<!-- sharding-jdbc分库分表demo -->
<module>sharding-jdbc-demo</module>
<!-- rocketmq消息队列demo -->
<module>rocketmq-demo</module>
</modules>
</project>
\ No newline at end of file
[TOC]
## 一、说明
rocketMQ的demo主要模拟两个场景:
1. 集成Spring-Cloud-Stream消息框架的消息生产和消费
* `rocketmq-produce`:消息生产者
* `rocketmq-consume`:消息消费者
2. rocketMQ的事务消息,模拟场景:生成订单记录 -> MQ -> 增加积分
* `rocketmq-transactional`
## 二、环境准备
安装`RocketMQ`
## 三、`produce`和`consume` demo
### 1. 修改application.yml配置
修改`rocketmq-produce``rocketmq-consume``namesrv-addr`的值为`RocketMQ`的服务地址
### 2. 启动consume
运行`rocketmq-consume``RocketMqConsumeApplication`
> 消费者有3个组
>
> input:为字符串消息,消费所有消息
>
> input2:为对象消息,只消费tag为`tagObj`的消息
>
> input3:为spring.messaging对象消息,消费所有消息
### 3. 启动produce
运行`rocketmq-produce``RocketMqProduceApplication`
> 运行后会发送5条消息:2条字符串消息,3条对象消息(带tag)
&nbsp;
## 四、事务消息demo
### 1. 修改application.yml配置
修改`rocketmq-transactional``namesrv-addr`的值为`RocketMQ`的服务地址
### 2. 启动
运行`rocketmq-transactional``RocketMqTxApplication`
### 3. 测试
测试的场景主要有3个:
#### 3.1. 正常情况
流程如下:
1. 订单创建
2. 发送mq消息
3. 消费消息增加积分
http://localhost:11002/success
#### 3.2. 发送消息失败
流程如下:
1. 订单创建
2. 发送mq消息 -> 失败
3. 事务回查(等待1分钟左右)
4. 发送mq消息
5. 消费消息增加积分
http://localhost:11002/produceError
#### 3.3. 消费消息失败
流程如下:
1. 订单创建
2. 发送mq消息
3. 消费消息增加积分 -> 失败
4. 重试消费消息 -> 失败
5. 进入死信队列
6. 消费死信队列的消息
7. 记录日志并发出预警
http://localhost:11002/consumeError
> 注意:消费死信队列topic,必需把topic的perm改成6才能消费,默认是2
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zlt</groupId>
<artifactId>zlt-demo</artifactId>
<version>1.5.0</version>
</parent>
<artifactId>rocketmq-demo</artifactId>
<packaging>pom</packaging>
<modules>
<!-- rocketmq事务消息demo -->
<module>rocketmq-transactional</module>
<!-- rocketmq生产者demo -->
<module>rocketmq-produce</module>
<!-- rocketmq消费者demo -->
<module>rocketmq-consume</module>
</modules>
</project>
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zlt</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.5.0</version>
</parent>
<artifactId>rocketmq-consume</artifactId>
<description>rocketMQ消费者demo</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Rocketmq消费者 demo
*
* @author zlt
*/
@SpringBootApplication
public class RocketMqConsumeApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqConsumeApplication.class, args);
}
}
package com.rocketmq.demo.config;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import com.rocketmq.demo.config.RocketMqConfig.MySink;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.SubscribableChannel;
/**
* @author zlt
*/
@EnableBinding({MySink.class})
public class RocketMqConfig {
public interface MySink {
@Input(Sink.INPUT)
SubscribableChannel input();
@Input("input2")
SubscribableChannel input2();
@Input("input3")
SubscribableChannel input3();
}
}
package com.rocketmq.demo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 订单实体
*
* @author zlt
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class Order implements Serializable {
private static final long serialVersionUID = 2801814838883246461L;
private Long orderId;
private String orderNo;
}
package com.rocketmq.demo.service;
import com.rocketmq.demo.model.Order;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
/**
* @author zlt
*/
@Service
public class ReceiveService {
/**
* 字符串消息
*/
@StreamListener(Sink.INPUT)
public void receiveInput(String receiveMsg) {
System.out.println("input receive: " + receiveMsg);
}
/**
* 对象消息
*/
@StreamListener("input2")
public void receiveInput2(@Payload Order order) {
System.out.println("input2 receive: " + order);
}
/**
* 通过spring.messaging对象来接收消息
*/
@StreamListener("input3")
public void receiveInput3(Message msg) {
System.out.println("input3 receive: " + msg);
}
}
\ No newline at end of file
server:
port: 11003
spring:
application:
name: rocketmq-produce
cloud:
stream:
rocketmq:
binder:
# RocketMQ 服务器地址
name-server: 192.168.28.130:9876
bindings:
input2:
consumer:
#增加tag过滤
tags: tagObj
bindings:
input:
destination: test-topic
content-type: text/plain
group: consume-group1
consumer:
#并发消费线程数
concurrency: 20
input2:
destination: test-topic
content-type: application/json
group: consume-group2
consumer:
#并发消费线程数
concurrency: 20
input3:
destination: test-topic
content-type: text/plain
group: consume-group3
consumer:
#并发消费线程数
concurrency: 20
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zlt</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.5.0</version>
</parent>
<artifactId>rocketmq-produce</artifactId>
<description>rocketMQ生产者demo</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.rocketmq;
import com.rocketmq.demo.model.Order;
import com.rocketmq.demo.service.SenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
* Rocketmq生产者 demo
*
* @author zlt
*/
@SpringBootApplication
public class RocketMqProduceApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqProduceApplication.class, args);
}
@Bean
public CustomRunner customRunner() {
return new CustomRunner();
}
/**
* 工程启动后执行
* 共发送5条消息:2条为字符消息,3条为带tag的对象消息
*/
public static class CustomRunner implements CommandLineRunner {
@Autowired
private SenderService senderService;
@Override
public void run(String... args) {
int count = 5;
for (int index = 1; index <= count; index++) {
String msgContent = "msg-" + index;
if (index % 2 == 0) {
senderService.send(msgContent);
} else {
senderService.sendWithTags(new Order((long)index, "order-"+index), "tagObj");
}
}
}
}
}
package com.rocketmq.demo.config;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import com.rocketmq.demo.config.RocketMqConfig.MySource;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
/**
* 配置消息生产者
*
* @author zlt
*/
@EnableBinding({MySource.class})
public class RocketMqConfig {
public interface MySource {
@Output(Source.OUTPUT)
MessageChannel output();
}
}
package com.rocketmq.demo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 订单实体
*
* @author zlt
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class Order implements Serializable {
private static final long serialVersionUID = 2801814838883246461L;
private Long orderId;
private String orderNo;
}
package com.rocketmq.demo.service;
import org.apache.rocketmq.common.message.MessageConst;
import com.rocketmq.demo.config.RocketMqConfig.MySource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
/**
* @author zlt
*/
@Service
public class SenderService {
@Autowired
private MySource source;
/**
* 发送字符消息
*/
public void send(String msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
}
/**
* 发送带tag的对象消息
*/
public <T> void sendWithTags(T msg, String tag) {
Message message = MessageBuilder.withPayload(msg)
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();
source.output().send(message);
}
}
\ No newline at end of file
server:
port: 11004
spring:
application:
name: rocketmq-produce
cloud:
stream:
rocketmq:
binder:
# RocketMQ 服务地址
name-server: 192.168.28.130:9876
bindings:
output:
producer:
group: produce-group
output2:
producer:
group: test-group2
bindings:
output:
destination: test-topic
content-type: application/json
logging:
level:
org:
springframework:
cloud:
stream:
binder:
rocketmq: DEBUG
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zlt</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.5.0</version>
</parent>
<artifactId>rocketmq-transactional</artifactId>
<description>rocketMQ事务消息demo</description>
<dependencies>
<dependency>
<groupId>com.zlt</groupId>
<artifactId>zlt-common-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* Rocketmq事务 demo
*
* @author zlt
*/
@EnableTransactionManagement
@SpringBootApplication
public class RocketMqTxApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqTxApplication.class, args);
}
}
package com.rocketmq.demo.config;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.SubscribableChannel;
import com.rocketmq.demo.config.RocketMqConfig.MySink;
/**
* @author zlt
*/
@EnableBinding({ Source.class, MySink.class })
public class RocketMqConfig {
public interface MySink {
@Input(Sink.INPUT)
SubscribableChannel input();
@Input("inputDlq")
SubscribableChannel inputDlq();
}
}
package com.rocketmq.demo.controller;
import cn.hutool.core.util.RandomUtil;
import com.central.common.utils.IdGenerator;
import com.rocketmq.demo.model.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author zlt
*/
@RestController
public class OrderController {
private final Source source;
@Autowired
public OrderController(Source source) {
this.source = source;
}
/**
* 正常情况
*/
@GetMapping("/success")
public String success() {
Order order = new Order();
order.setOrderId(IdGenerator.getId());
order.setOrderNo(RandomUtil.randomString(4));
Message message = MessageBuilder
.withPayload(order)
.setHeader("orderId", order.getOrderId())
.build();
//发送半消息
source.output().send(message);
return "下单成功";
}
/**
* 发送消息失败
*/
@GetMapping("/produceError")
public String produceError() {
Order order = new Order();
order.setOrderId(IdGenerator.getId());
order.setOrderNo(RandomUtil.randomString(4));
Message message = MessageBuilder
.withPayload(order)
.setHeader("orderId", order.getOrderId())
.setHeader("produceError", "1")
.build();
//发送半消息
source.output().send(message);
return "发送消息失败";
}
/**
* 消费消息失败
*/
@GetMapping("/consumeError")
public String consumeError() {
Order order = new Order();
order.setOrderId(IdGenerator.getId());
order.setOrderNo(RandomUtil.randomString(4));
Message message = MessageBuilder
.withPayload(order)
.setHeader("orderId", order.getOrderId())
.setHeader("consumeError", "1")
.build();
//发送半消息
source.output().send(message);
return "消费消息失败";
}
}
package com.rocketmq.demo.listener;
import com.alibaba.fastjson.JSON;
import com.rocketmq.demo.model.Order;
import com.rocketmq.demo.service.IOrderService;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
/**
* @author zlt
*/
@RocketMQTransactionListener(txProducerGroup = "order-tx-produce-group", corePoolSize = 5, maximumPoolSize = 10)
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private IOrderService orderService;
/**
* 提交本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
//插入订单数据
String orderJson = new String(((byte[])message.getPayload()));
Order order = JSON.parseObject(orderJson, Order.class);
orderService.save(order);
String produceError = (String)message.getHeaders().get("produceError");
if ("1".equals(produceError)) {
System.err.println("============Exception:订单进程挂了,事务消息没提交");
//模拟插入订单后服务器挂了,没有commit事务消息
throw new RuntimeException("============订单服务器挂了");
}
//提交事务消息
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 事务回查接口
*
* 如果事务消息一直没提交,则定时判断订单数据是否已经插入
* 是:提交事务消息
* 否:回滚事务消息
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String orderId = (String)message.getHeaders().get("orderId");
System.out.println("============事务回查-orderId:" + orderId);
//判断之前的事务是否已经提交:订单记录是否已经保存
int count = 1;
//select count(1) from t_order where order_id = ${orderId}
System.out.println("============事务回查-订单已生成-提交事务消息");
return count > 0 ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}
\ No newline at end of file
package com.rocketmq.demo.model;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* 订单实体
*
* @author zlt
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class Order implements Serializable {
private static final long serialVersionUID = 2801814838883246461L;
private Long orderId;
private String orderNo;
}
package com.rocketmq.demo.service;
import com.rocketmq.demo.model.Order;
/**
* @author zlt
*/
public interface IOrderService {
void save(Order order);
}
package com.rocketmq.demo.service.impl;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
/**
* 积分服务的消费者,接收到下单成功后增加积分
*
* @author zlt
*/
@Service
public class IntegralReceiveService {
@StreamListener(Sink.INPUT)
public void receive(Message message) {
//模拟消费异常
String consumeError = (String)message.getHeaders().get("consumeError");
if ("1".equals(consumeError)) {
System.err.println("============Exception:积分进程挂了,消费消息失败");
//模拟插入订单后服务器挂了,没有commit事务消息
throw new RuntimeException("积分服务器挂了");
}
System.out.println("============收到订单信息,增加积分:" + message);
}
/**
* 消费死信队列
*/
@StreamListener("inputDlq")
public void receiveDlq(Message message) {
String orderId = (String)message.getHeaders().get("orderId");
System.err.println("============消费死信队列消息,记录日志并预警:" + orderId);
}
}
package com.rocketmq.demo.service.impl;
import com.rocketmq.demo.model.Order;
import com.rocketmq.demo.service.IOrderService;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
/**
* @author zlt
*/
@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {
@Override
public void save(Order order) {
System.out.println("============保存订单成功:" + order.getOrderId());
}
}
\ No newline at end of file
server:
port: 11002
spring:
application:
name: rocketmq-tx
cloud:
stream:
rocketmq:
binder:
# RocketMQ 服务器地址
name-server: 192.168.28.130:9876
bindings:
output:
producer:
group: order-tx-produce-group
#开启事务消息
transactional: true
input:
consumer:
#重试失败直接进入死信队列
delayLevelWhenNextConsume: -1
bindings:
# 生产者
output:
destination: TransactionTopic
content-type: application/json
# 消费者
input:
destination: TransactionTopic
content-type: application/json
group: order-tx-consume-group
consumer:
#多线程
concurrency: 20
#重试1次
maxAttempts: 2
inputDlq:
destination: '%DLQ%${spring.cloud.stream.bindings.input.group}'
content-type: application/json
group: order-dlq-group
consumer:
concurrency: 20
logging:
level:
org:
springframework:
cloud:
stream:
binder:
rocketmq: DEBUG
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册