From 9edf6dff870d7b932b11b8d30fd1bda08107ffb7 Mon Sep 17 00:00:00 2001 From: ruozhuliufeng <18852971173@63.com> Date: Thu, 18 May 2023 16:20:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=BF=87=E6=BB=A4=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=8F=91=E9=80=81=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../filter/consumer/FilterBySQLConsumer.java | 45 +++++++++++++++++++ .../filter/consumer/FilterByTagConsumer.java | 44 ++++++++++++++++++ .../filter/producer/FilterBySQLProducer.java | 28 ++++++++++++ .../filter/producer/FilterByTagProducer.java | 29 ++++++++++++ 4 files changed, 146 insertions(+) create mode 100644 rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterBySQLConsumer.java create mode 100644 rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterByTagConsumer.java create mode 100644 rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java create mode 100644 rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterBySQLConsumer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterBySQLConsumer.java new file mode 100644 index 0000000..607a610 --- /dev/null +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterBySQLConsumer.java @@ -0,0 +1,45 @@ +package tech.msop.test.rocketmq.filter.consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import tech.msop.test.rocketmq.constants.RocketMQConstants; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * SQL过滤消费者 + */ +public class FilterBySQLConsumer { + public static void main(String[] args) throws MQClientException { + // 定义一个push消费者 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); + // 指定NameServer + consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 指定从第一条消息开始消费 + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + // 指定消费Topic与Tag + consumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6")); + // 注册消息监听器 + // 一旦Broker中有其订阅的消息就会触发该方法的执行, + // 其返回值为当前consumer消费的状态 + consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { + // 逐条消费消息 + for (MessageExt msg : list) { + // 输出消息被消费的时间 + System.out.println("消费时间:"+new SimpleDateFormat("mm:ss").format(new Date())); + System.out.println("消息信息:"+msg); + } + // 返回消费状态:消费成功 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + // 开启消费者消费 + consumer.start(); + System.out.println("consumer started"); + } +} diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterByTagConsumer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterByTagConsumer.java new file mode 100644 index 0000000..137cb82 --- /dev/null +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/consumer/FilterByTagConsumer.java @@ -0,0 +1,44 @@ +package tech.msop.test.rocketmq.filter.consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import tech.msop.test.rocketmq.constants.RocketMQConstants; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Tag过滤消费者 + */ +public class FilterByTagConsumer { + public static void main(String[] args) throws MQClientException { + // 定义一个push消费者 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); + // 指定NameServer + consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 指定从第一条消息开始消费 + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + // 指定消费Topic与Tag + consumer.subscribe("TopicTest", "myTagA || myTagB"); + // 注册消息监听器 + // 一旦Broker中有其订阅的消息就会触发该方法的执行, + // 其返回值为当前consumer消费的状态 + consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { + // 逐条消费消息 + for (MessageExt msg : list) { + // 输出消息被消费的时间 + System.out.println("消费时间:"+new SimpleDateFormat("mm:ss").format(new Date())); + System.out.println("消息信息:"+msg); + } + // 返回消费状态:消费成功 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + // 开启消费者消费 + consumer.start(); + System.out.println("consumer started"); + } +} diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java new file mode 100644 index 0000000..3d5226c --- /dev/null +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java @@ -0,0 +1,28 @@ +package tech.msop.test.rocketmq.filter.producer; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import tech.msop.test.rocketmq.constants.RocketMQConstants; + +/** + * SQL过滤Producer + */ +public class FilterBySQLProducer { + public static void main(String[] args) throws Exception { + // 创建一个Producer,参数为Producer Group名称 + DefaultMQProducer producer = new DefaultMQProducer("pg"); + // 指定NameServer地址 + producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 开启Producer + producer.start(); + for (int i = 0; i < 10; i++) { + byte[] body = ("Hi Filter SQL Msg " + i).getBytes(); + Message msg = new Message("TopicTest", "myTag", body); + msg.putUserProperty("age", String.valueOf(i)); + SendResult sendResult = producer.send(msg); + System.out.println("发送结果:" + sendResult); + } + producer.shutdown(); + } +} diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java new file mode 100644 index 0000000..89d6dfc --- /dev/null +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java @@ -0,0 +1,29 @@ +package tech.msop.test.rocketmq.filter.producer; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import tech.msop.test.rocketmq.constants.RocketMQConstants; + +/** + * Tag过滤Producer + */ +public class FilterByTagProducer { + public static void main(String[] args) throws Exception { + // 创建一个Producer,参数为Producer Group名称 + DefaultMQProducer producer = new DefaultMQProducer("pg"); + // 指定NameServer地址 + producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 开启Producer + producer.start(); + String[] tags = {"myTagA", "myTagB", "myTagC"}; + for (int i = 0; i < 10; i++) { + byte[] body = ("Hi Filter Tag Msg " + i).getBytes(); + String tag = tags[i % tags.length]; + Message msg = new Message("TopicTest", tag, body); + SendResult sendResult = producer.send(msg); + System.out.println("发送结果:" + sendResult); + } + producer.shutdown(); + } +} -- GitLab