提交 aae56926 编写于 作者: Q qinxiaodong@pannk.com

集成mq

上级 cdd777f6
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.pannk</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>integrate-mq</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.pannk;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Created by wolf on 20-11-17.
*/
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
package com.pannk.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.pannk.cons.Constant.*;
/**
* 默认模式-直连模式
* Created by wolf on 20-11-17.
*/
@Configuration
public class DirectRabbitMQConfig {
@Bean
public Queue testDirectQueue() {
return new Queue(DIRECT_QUEUE);
}
/**
* 声明一个Direct exchange
* 在发送消息时候就需要指定exchange
* 不指定exchange时,默认使用的是direct exchange,而且routingKey与queue名一致
*
* @return
*/
// @Bean
public DirectExchange testDirectExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
// @Bean
public Binding bindingDirect() {
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(DIRECT_ROUTING);
}
}
package com.pannk.config;
import com.pannk.cons.Constant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by wolf on 20-11-17.
*/
@Configuration
public class FanoutRabbitMQConfig {
@Bean
public Queue fanoutAQueue() {
return new Queue(Constant.FANOUT_A);
}
@Bean
public Queue fanoutBQueue() {
return new Queue(Constant.FANOUT_B);
}
@Bean
public Queue fanoutCQueue() {
return new Queue(Constant.FANOUT_C);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding bindingFanoutAExchange(){
return BindingBuilder.bind(fanoutAQueue()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutBExchange(){
return BindingBuilder.bind(fanoutAQueue()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutCExchange(){
return BindingBuilder.bind(fanoutAQueue()).to(fanoutExchange());
}
}
package com.pannk.config;
import com.pannk.cons.Constant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by wolf on 20-11-17.
*/
@Configuration
public class TopicRabbitMQConfig {
@Bean
public Queue orderQueue() {
return new Queue(Constant.TOPIC_ORDER);
}
@Bean
public Queue consultQueue() {
return new Queue(Constant.TOPIC_CONSULT);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Binding bindingExchangeOrder() {
return BindingBuilder.bind(orderQueue()).to(topicExchange()).with(Constant.TOPIC_ORDER);
}
@Bean
public Binding bindingExchangeConsult() {
return BindingBuilder.bind(consultQueue()).to(topicExchange()).with(Constant.TOPIC_ALL);
}
}
package com.pannk.cons;
/**
* Created by wolf on 20-11-17.
*/
public class Constant {
/**
* DIRECT_QUEUE
*/
public static final String DIRECT_QUEUE = "DIRECT_QUEUE";
/**
* DIRECT_EXCHANGE
*/
public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";
/**
* DIRECT_ROUTING
*/
public static final String DIRECT_ROUTING = "DirectRouting";
/**
* 订单
*/
public static final String TOPIC_ORDER = "TOPIC.ORDER";
/**
* 咨询
*/
public static final String TOPIC_CONSULT = "TOPIC.CONSULT";
/**
* 所有
*/
public static final String TOPIC_ALL = "TOPIC.#";
/**
* fanout a
*/
public static final String FANOUT_A = "FANOUT_A";
/**
* fanout b
*/
public static final String FANOUT_B = "FANOUT_B";
/**
* fanout c
*/
public static final String FANOUT_C = "FANOUT_C";
}
package com.pannk.controller;
import com.pannk.service.SendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Created by wolf on 20-11-17.
*/
@RestController
public class MessageSendController {
@Autowired
private SendMessageService sendMessageService;
/**
* 直连消息
*
* @param content
* @return
*/
@GetMapping("/sendDirect")
public String sendDirectMsg(String content) {
sendMessageService.sendDirectMsg(content);
return content;
}
/**
* 主题消息
*
* @param content
* @return
*/
@GetMapping("/sendTopic")
public String sendTopicMsg(String content) {
sendMessageService.sendTopicMsg(content);
return content;
}
/**
* 广播消息
*
* @param content
* @return
*/
@GetMapping("/sendFanout")
public String sendFanoutMsg(String content) {
sendMessageService.sendFanoutMsg(content);
return content;
}
}
package com.pannk.entity;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* Created by wolf on 20-11-17.
*/
@Data
public class MessageEntity implements Serializable {
private String id;
private String code;
private String title;
private String content;
private String source;
private Date time;
}
package com.pannk.listener;
import com.pannk.entity.MessageEntity;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 直连消费
* Created by wolf on 20-11-17.
*/
@Component
@RabbitListener(queues = {"TEST_DIRECT_QUEUE"})
public class DirectReceiveListener {
@RabbitHandler
public void handler(MessageEntity messageEntity) {
System.out.println("====================");
System.out.println("Receive message " + messageEntity.getId() + "," + messageEntity.getCode() + "," + messageEntity.getContent());
}
}
package com.pannk.listener;
import com.pannk.entity.MessageEntity;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 直连消费
* Created by wolf on 20-11-17.
*/
@Component
public class DirectRecevicer2Listener {
@RabbitListener(queues = {"TEST_DIRECT_QUEUE"})
@RabbitHandler
public void process(MessageEntity messageEntity) {
System.out.println("**************");
System.out.println("Other process id " + messageEntity.getId() + ", code " + messageEntity.getCode() + ", content " + messageEntity.getContent());
}
}
package com.pannk.listener;
import com.pannk.cons.Constant;
import com.pannk.entity.MessageEntity;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 广播消费
* Created by wolf on 20-11-17.
*/
@Component
public class FanoutReceiveListener {
@RabbitListener(queues = {Constant.FANOUT_A, Constant.FANOUT_B, Constant.FANOUT_C})
public void handleMsg(MessageEntity messageEntity) {
System.out.println("Receive msg " + messageEntity.getContent());
}
}
package com.pannk.listener;
import com.pannk.cons.Constant;
import com.pannk.entity.MessageEntity;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 主题消费
* Created by wolf on 20-11-17.
*/
@Component
public class TopicReceiveListener {
/**
* 处理订单
*
* @param messageEntity
*/
@RabbitListener(queues = {Constant.TOPIC_ORDER})
@RabbitHandler
public void handleOrder(MessageEntity messageEntity) {
System.out.println("Receive order msg id " + messageEntity.getId() + ", code " + messageEntity.getCode() + ", content " + messageEntity.getContent());
}
/**
* 处理咨询
*
* @param messageEntity
*/
@RabbitListener(queues = {Constant.TOPIC_CONSULT})
@RabbitHandler
public void handleConsult(MessageEntity messageEntity) {
System.out.println("Receive consult msg id " + messageEntity.getId() + ", code " + messageEntity.getCode() + ", content " + messageEntity.getContent());
}
}
package com.pannk.service;
/**
* Created by wolf on 20-11-17.
*/
public interface SendMessageService {
/**
* 发送topic消息
*
* @param content
*/
void sendTopicMsg(String content);
/**
* 发送广播消息
*
* @param content
*/
void sendFanoutMsg(String content);
/**
* 发送直连消息
*
* @param content
*/
void sendDirectMsg(String content);
}
package com.pannk.service.impl;
import com.pannk.cons.Constant;
import com.pannk.entity.MessageEntity;
import com.pannk.service.SendMessageService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.Date;
import java.util.UUID;
/**
* Created by wolf on 20-11-17.
*/
@Service
public class SendMessageServiceImpl implements SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendTopicMsg(String content) {
if (!StringUtils.isEmpty(content)) {
MessageEntity messageEntity = new MessageEntity();
messageEntity.setId(UUID.randomUUID().toString());
messageEntity.setCode(UUID.randomUUID().toString());
messageEntity.setTitle("测试Topic队列");
messageEntity.setContent(content);
messageEntity.setSource("测试Topic队列数据");
messageEntity.setTime(new Date());
if (content.contains("order")) {
rabbitTemplate.convertAndSend("topicExchange", Constant.TOPIC_ORDER, messageEntity);
} else {
rabbitTemplate.convertAndSend("topicExchange", Constant.TOPIC_CONSULT, messageEntity);
}
}
}
@Override
public void sendFanoutMsg(String content) {
MessageEntity messageEntity = new MessageEntity();
messageEntity.setId(UUID.randomUUID().toString());
messageEntity.setCode(UUID.randomUUID().toString());
messageEntity.setTitle("测试Fanout队列");
messageEntity.setContent(content);
messageEntity.setSource("测试Fanout队列数据");
messageEntity.setTime(new Date());
rabbitTemplate.convertAndSend("fanoutExchange", null, messageEntity);
}
@Override
public void sendDirectMsg(String content) {
MessageEntity messageEntity = new MessageEntity();
messageEntity.setId(UUID.randomUUID().toString());
messageEntity.setCode(UUID.randomUUID().toString());
messageEntity.setTitle("测试Direct队列");
messageEntity.setContent(content);
messageEntity.setSource("测试Direct队列数据");
messageEntity.setTime(new Date());
rabbitTemplate.convertAndSend("", messageEntity);
}
}
spring:
main:
banner-mode: off
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
virtual-host: /
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
\ No newline at end of file
......@@ -18,6 +18,7 @@
<module>integrate-druid</module>
<module>integrate-redis</module>
<module>integrate-mongodb</module>
<module>integrate-mq</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册