提交 5bf113a1 编写于 作者: D dongeforever

Polish tests, separate ports, disable log, etc.

上级 5ce20e8a
## Apache RocketMQ [![Build Status](https://travis-ci.org/apache/rocketmq.svg?branch=master)](https://travis-ci.org/apache/rocketmq) [![Coverage Status](https://coveralls.io/repos/github/apache/rocketmq/badge.svg?branch=master)](https://coveralls.io/github/apache/rocketmq?branch=master) ## Apache RocketMQ [![Build Status](https://travis-ci.org/apache/rocketmq.svg?branch=master)](https://travis-ci.org/apache/rocketmq) [![Coverage Status](https://coveralls.io/repos/github/apache/rocketmq/badge.svg?branch=master)](https://coveralls.io/github/apache/rocketmq?branch=master)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.rocketmq/rocketmq-all/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Corg.apache.rocketmq) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.rocketmq/rocketmq-all/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Corg.apache.rocketmq)
[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://rocketmq.apache.org/dowloading/releases) [![GitHub release](https://img.shields.io/badge/release-download-orange.svg)]
(https://rocketmq.apache.org/dowloading/releases)
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
**[Apache RocketMQ](https://rocketmq.apache.org) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.** **[Apache RocketMQ](https://rocketmq.apache.org) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.**
...@@ -47,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu ...@@ -47,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu
---------- ----------
## License ## License
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
...@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.MixAll; ...@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
...@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; ...@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.filter.FilterFactory; import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
...@@ -548,10 +548,10 @@ public class MQClientInstance { ...@@ -548,10 +548,10 @@ public class MQClientInstance {
} }
} catch (Exception e) { } catch (Exception e) {
if (this.isBrokerInNameServer(addr)) { if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr); log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else { } else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr); id, addr, e);
} }
} }
} }
......
...@@ -42,9 +42,9 @@ public class DefaultMessageStoreShutDownTest { ...@@ -42,9 +42,9 @@ public class DefaultMessageStoreShutDownTest {
public void init() throws Exception { public void init() throws Exception {
messageStore = spy(buildMessageStore()); messageStore = spy(buildMessageStore());
boolean load = messageStore.load(); boolean load = messageStore.load();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
assertTrue(load); assertTrue(load);
messageStore.start(); messageStore.start();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
} }
@Test @Test
......
...@@ -115,7 +115,7 @@ public class MessageStoreTestBase extends StoreTestBase { ...@@ -115,7 +115,7 @@ public class MessageStoreTestBase extends StoreTestBase {
protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) { protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, beginLogicsOffset + i, 3, null); GetMessageResult getMessageResult = messageStore.getMessage("group", topic, queueId, beginLogicsOffset + i, 3, null);
Assert.assertNotNull(getMessageResult); Assert.assertNotNull(getMessageResult);
Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty()); Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty());
MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0)); MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0));
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
</logger> </logger>
<root level="ERROR"> <root level="OFF">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
</root> </root>
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.factory; package org.apache.rocketmq.test.factory;
import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
...@@ -64,6 +65,7 @@ public class ConsumerFactory { ...@@ -64,6 +65,7 @@ public class ConsumerFactory {
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception { public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString());
defaultMQPullConsumer.setNamesrvAddr(nsAddr); defaultMQPullConsumer.setNamesrvAddr(nsAddr);
defaultMQPullConsumer.start(); defaultMQPullConsumer.start();
return defaultMQPullConsumer; return defaultMQPullConsumer;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.factory; package org.apache.rocketmq.test.factory;
import java.util.UUID;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.test.util.RandomUtil; import org.apache.rocketmq.test.util.RandomUtil;
...@@ -25,6 +26,7 @@ public class ProducerFactory { ...@@ -25,6 +26,7 @@ public class ProducerFactory {
public static DefaultMQProducer getRMQProducer(String ns) { public static DefaultMQProducer getRMQProducer(String ns) {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID()); DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
producer.setInstanceName(UUID.randomUUID().toString());
producer.setNamesrvAddr(ns); producer.setNamesrvAddr(ns);
try { try {
producer.start(); producer.start();
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.test.util; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.test.util;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
...@@ -40,6 +41,7 @@ public class MQAdmin { ...@@ -40,6 +41,7 @@ public class MQAdmin {
int queueNum, int waitTimeSec) { int queueNum, int waitTimeSec) {
boolean createResult = false; boolean createResult = false;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setInstanceName(UUID.randomUUID().toString());
mqAdminExt.setNamesrvAddr(nameSrvAddr); mqAdminExt.setNamesrvAddr(nameSrvAddr);
try { try {
mqAdminExt.start(); mqAdminExt.start();
......
...@@ -39,7 +39,7 @@ public class BaseConf { ...@@ -39,7 +39,7 @@ public class BaseConf {
protected static String clusterName; protected static String clusterName;
protected static int brokerNum; protected static int brokerNum;
protected static int waitTime = 5; protected static int waitTime = 5;
protected static int consumeTime = 5 * 60 * 1000; protected static int consumeTime = 2 * 60 * 1000;
protected static NamesrvController namesrvController; protected static NamesrvController namesrvController;
protected static BrokerController brokerController1; protected static BrokerController brokerController1;
protected static BrokerController brokerController2; protected static BrokerController brokerController2;
......
...@@ -50,10 +50,10 @@ public class IntegrationTestBase { ...@@ -50,10 +50,10 @@ public class IntegrationTestBase {
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100; protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100;
protected static final int INDEX_NUM = 1000; protected static final int INDEX_NUM = 1000;
private static final AtomicInteger port = new AtomicInteger(50000); private static final AtomicInteger port = new AtomicInteger(40000);
public static synchronized int nextPort() { public static synchronized int nextPort() {
return port.addAndGet(5); return port.addAndGet(random.nextInt(10) + 10);
} }
protected static Random random = new Random(); protected static Random random = new Random();
...@@ -110,7 +110,7 @@ public class IntegrationTestBase { ...@@ -110,7 +110,7 @@ public class IntegrationTestBase {
namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json"); namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties"); namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties");
nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000)); nameServerNettyServerConfig.setListenPort(nextPort());
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
try { try {
Assert.assertTrue(namesrvController.initialize()); Assert.assertTrue(namesrvController.initialize());
......
...@@ -19,12 +19,13 @@ import org.apache.rocketmq.test.base.IntegrationTestBase; ...@@ -19,12 +19,13 @@ import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory; import org.apache.rocketmq.test.factory.ProducerFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort; import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort;
import static sun.util.locale.BaseLocale.SEP; import static sun.util.locale.BaseLocale.SEP;
public class ProduceAndConsumeTest { public class DLedgerProduceAndConsumeIT {
public BrokerConfig buildBrokerConfig(String cluster, String brokerName) { public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
BrokerConfig brokerConfig = new BrokerConfig(); BrokerConfig brokerConfig = new BrokerConfig();
...@@ -58,7 +59,7 @@ public class ProduceAndConsumeTest { ...@@ -58,7 +59,7 @@ public class ProduceAndConsumeTest {
BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName); BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName);
MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId); MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId);
BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
Thread.sleep(1000); Thread.sleep(3000);
Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole()); Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole());
...@@ -97,6 +98,8 @@ public class ProduceAndConsumeTest { ...@@ -97,6 +98,8 @@ public class ProduceAndConsumeTest {
Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody()); Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody());
} }
producer.shutdown();
consumer.shutdown();
brokerController.shutdown(); brokerController.shutdown();
} }
} }
...@@ -29,10 +29,15 @@ import org.apache.rocketmq.test.util.TestUtils; ...@@ -29,10 +29,15 @@ import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils; import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
/**
* Currently, dose not support the ordered broadcast message
*/
@Ignore
public class OrderMsgBroadCastIT extends BaseBroadCastIT { public class OrderMsgBroadCastIT extends BaseBroadCastIT {
private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class); private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class);
private RMQNormalProducer producer = null; private RMQNormalProducer producer = null;
......
...@@ -51,7 +51,8 @@ public class NormalMsgDelayIT extends DelayConf { ...@@ -51,7 +51,8 @@ public class NormalMsgDelayIT extends DelayConf {
} }
@Test @Test
public void testDelayLevell() { public void testDelayLevel1() throws Exception {
Thread.sleep(3000);
int delayLevel = 1; int delayLevel = 1;
List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize); List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
producer.send(delayMsgs); producer.send(delayMsgs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册