提交 f946cb4c 编写于 作者: 武汉红喜's avatar 武汉红喜

RocketMQ User Guide

上级 6cdad98e
### [RocketMQ](https://github.com/apache/rocketmq)
# [RocketMQ](https://github.com/apache/rocketmq)
### Test
- .properties指定rocketmqHome,namesrvAddr等,依次启动NamesrvStartup,BrokerStartup,Consumer,Producer
- 管理后台:https://github.com/apache/rocketmq-externals
### User Guide
- rocketmq原理与实践 http://wely.iteye.com/blog/2392089
- 每个主题可设置队列个数,默认4个,需要顺序消费的消息发往同一队列,比如同一订单号相关的几条需要顺序消费的消息发往同一队列,
顺序消费的特点的是,不会有两个消费者共同消费任一队列,且当消费者数量小于队列数时,消费者会消费多个队列。至于消息重复,在消
费端处理。事务消息可参考rocketmq的做法,也可直接用本地事务。
- Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到
所有队列中,最终效果就是所有消息都平均落在每个Broker上。
- ConsumerQueue相当于CommitLog的索引文件,消费者消费时会先从ConsumerQueue中查找消息的在commitLog中的offset,再去
CommitLog中找元数据。如果某个消息只在CommitLog中有数据,没在ConsumerQueue中, 则消费者无法消费,RocketMQ的事务消息就
是这个原理。
Other MQ: https://github.com/javahongxi/whatsmars/blob/master/whatsmars-mq/othermq.md
......@@ -66,12 +66,6 @@
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
......
package org.hongxi.whatsmars.mq.zeromq;
import org.zeromq.ZMQ;
public class Publisher {
public static void main(String args[]) {
ZMQ.Context context = ZMQ.context(1); // 创建包含一个I/O线程的context
ZMQ.Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5555");
while (!Thread.currentThread ().isInterrupted()) {
String message = "toutiao hello";
publisher.send(message.getBytes());
System.out.println("sent : " + message);
}
publisher.close();
context.term();
}
}
\ No newline at end of file
package org.hongxi.whatsmars.mq.zeromq;
import org.zeromq.ZMQ;
public class Subscriber {
public static void main(String args[]) {
for (int j = 0; j < 10; j++) {
new Thread(new Runnable(){
public void run() {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://127.0.0.1:5555");
subscriber.subscribe("toutiao".getBytes());
try {
while (true) {
byte[] message = subscriber.recv();
System.out.println(Thread.currentThread().getName() + " receive : " + new String(message));
}
} finally {
subscriber.close();
context.term();
}
}
}).start();
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册