提交 9edf6dff 编写于 作者: R ruozhuliufeng

新增过滤消息发送示例

上级 0b350f78
package tech.msop.test.rocketmq.filter.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* SQL过滤消费者
*/
public class FilterBySQLConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 指定NameServer
consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费Topic与Tag
consumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));
// 注册消息监听器
// 一旦Broker中有其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 逐条消费消息
for (MessageExt msg : list) {
// 输出消息被消费的时间
System.out.println("消费时间:"+new SimpleDateFormat("mm:ss").format(new Date()));
System.out.println("消息信息:"+msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 开启消费者消费
consumer.start();
System.out.println("consumer started");
}
}
package tech.msop.test.rocketmq.filter.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Tag过滤消费者
*/
public class FilterByTagConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 指定NameServer
consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费Topic与Tag
consumer.subscribe("TopicTest", "myTagA || myTagB");
// 注册消息监听器
// 一旦Broker中有其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 逐条消费消息
for (MessageExt msg : list) {
// 输出消息被消费的时间
System.out.println("消费时间:"+new SimpleDateFormat("mm:ss").format(new Date()));
System.out.println("消息信息:"+msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 开启消费者消费
consumer.start();
System.out.println("consumer started");
}
}
package tech.msop.test.rocketmq.filter.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
/**
* SQL过滤Producer
*/
public class FilterBySQLProducer {
public static void main(String[] args) throws Exception {
// 创建一个Producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 开启Producer
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi Filter SQL Msg " + i).getBytes();
Message msg = new Message("TopicTest", "myTag", body);
msg.putUserProperty("age", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:" + sendResult);
}
producer.shutdown();
}
}
package tech.msop.test.rocketmq.filter.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
/**
* Tag过滤Producer
*/
public class FilterByTagProducer {
public static void main(String[] args) throws Exception {
// 创建一个Producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 开启Producer
producer.start();
String[] tags = {"myTagA", "myTagB", "myTagC"};
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi Filter Tag Msg " + i).getBytes();
String tag = tags[i % tags.length];
Message msg = new Message("TopicTest", tag, body);
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:" + sendResult);
}
producer.shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册