提交 57f5fdb3 编写于 作者: R ruozhuliufeng

新增延迟消息发送示例

上级 74f7a0d6
package tech.msop.test.rocketmq.delay.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 延时消息消费者
*/
public class DelayConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 指定NameServer
consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费Topic与Tag
consumer.subscribe("TopicB", "*");
// 注册消息监听器
// 一旦Broker中有其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 逐条消费消息
for (MessageExt msg : list) {
// 输出消息被消费的时间
System.out.println("消费时间:"+new SimpleDateFormat("mm:ss").format(new Date()));
System.out.println("消息信息:"+msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 开启消费者消费
consumer.start();
System.out.println("consumer started");
}
}
package tech.msop.test.rocketmq.delay.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 延时消息生产者
*/
public class DelayProducer {
public static void main(String[] args) throws Exception {
// 创建一个Producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 开启Producer
producer.start();
for (int i = 0; i < 100; i++) {
Integer orderId = i;
byte[] body = ("Hi, Delay Msg "+i).getBytes();
Message msg = new Message("TopicB","TagB",body);
// 指定消息延迟时间等级为3级,即延迟10秒
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
System.out.println("发送时间:"+new SimpleDateFormat("mm:ss").format(new Date()));
System.out.println("发送结果:"+sendResult);
}
producer.shutdown();
}
}
package tech.msop.test.rocketmq.ordered.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import tech.msop.test.rocketmq.constants.RocketMQConstants;
import java.util.List;
/**
* 顺序消息
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册