提交 6aac903e 编写于 作者: 老丢丢's avatar 老丢丢

更新

上级 aae56926
......@@ -5,18 +5,21 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import static com.pannk.cons.Constant.DIRECT_QUEUE;
/**
* 直连消费
* Created by wolf on 20-11-17.
*
* @author wolf
* @date 20-11-17
*/
@Component
@RabbitListener(queues = {"TEST_DIRECT_QUEUE"})
@RabbitListener(queues = {DIRECT_QUEUE})
public class DirectReceiveListener {
@RabbitHandler
public void handler(MessageEntity messageEntity) {
System.out.println("====================");
System.out.println("Receive message " + messageEntity.getId() + "," + messageEntity.getCode() + "," + messageEntity.getContent());
}
......
......@@ -5,18 +5,21 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import static com.pannk.cons.Constant.DIRECT_QUEUE;
/**
* 直连消费
* Created by wolf on 20-11-17.
*
* @author wolf
* @date 20-11-17
*/
@Component
//@Component
public class DirectRecevicer2Listener {
@RabbitListener(queues = {"TEST_DIRECT_QUEUE"})
@RabbitListener(queues = {DIRECT_QUEUE})
@RabbitHandler
public void process(MessageEntity messageEntity) {
System.out.println("**************");
System.out.println("Other process id " + messageEntity.getId() + ", code " + messageEntity.getCode() + ", content " + messageEntity.getContent());
}
}
......@@ -7,7 +7,9 @@ import org.springframework.stereotype.Component;
/**
* 广播消费
* Created by wolf on 20-11-17.
*
* @author wolf
* @date 20-11-17
*/
@Component
public class FanoutReceiveListener {
......
......@@ -8,9 +8,11 @@ import org.springframework.stereotype.Component;
/**
* 主题消费
* Created by wolf on 20-11-17.
*
* @author wolf
* @date 20-11-17
*/
@Component
//@Component
public class TopicReceiveListener {
/**
......
......@@ -23,6 +23,10 @@ public class SendMessageServiceImpl implements SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Autowired
// private AmqpTemplate amqpTemplate;
@Override
public void sendTopicMsg(String content) {
if (!StringUtils.isEmpty(content)) {
......@@ -63,6 +67,6 @@ public class SendMessageServiceImpl implements SendMessageService {
messageEntity.setContent(content);
messageEntity.setSource("测试Direct队列数据");
messageEntity.setTime(new Date());
rabbitTemplate.convertAndSend("", messageEntity);
rabbitTemplate.convertAndSend(Constant.DIRECT_QUEUE, messageEntity);
}
}
......@@ -7,11 +7,11 @@ spring:
password: guest
port: 5672
virtual-host: /
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
\ No newline at end of file
# publisher-confirm-type: correlated
# #确认消息已发送到队列(Queue)
# publisher-returns: true
# listener:
# direct:
# acknowledge-mode: manual
# simple:
# acknowledge-mode: manual
\ No newline at end of file
一、MQ是什么?
Message Queue
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,
流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。
组成:
Broker
消息服务器,作为server提供消息核心服务
Producer
消息生产者,业务的发起方,负责生产消息传输给broker,
Consumer
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
Topic
主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播
Queue
队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
Message
消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
模式:
点对点、发布/订阅
二、使用场景
异步通信、系统解耦、流量削峰
三、产品
RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ、Kafka、IBM WebSphere
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册