未验证 提交 bd62212e 编写于 作者: E EricZeng 提交者: GitHub

Merge pull request #472 from didi/dev_v2.5.0_addtest

LogiKM增加单元测试和集成测试
......@@ -111,3 +111,4 @@ dist/
dist/*
kafka-manager-web/src/main/resources/templates/
.DS_Store
kafka-manager-console/package-lock.json
---
![kafka-manager-logo](../assets/images/common/logo_name.png)
**一站式`Apache Kafka`集群指标监控与运维管控平台**
---
# LogiKM单元测试和集成测试
## 1、单元测试
### 1.1 单元测试介绍
单元测试又称模块测试,是针对软件设计的最小单位——程序模块进行正确性检验的测试工作。
其目的在于检查每个程序单元能否正确实现详细设计说明中的模块功能、性能、接口和设计约束等要求,
发现各模块内部可能存在的各种错误。单元测试需要从程序的内部结构出发设计测试用例。
多个模块可以平行地独立进行单元测试。
### 1.2 LogiKM单元测试思路
LogiKM单元测试思路主要是测试Service层的方法,通过罗列方法的各种参数,
判断方法返回的结果是否符合预期。单元测试的基类加了@SpringBootTest注解,即每次运行单测用例都启动容器
### 1.3 LogiKM单元测试注意事项
1. 单元测试用例在kafka-manager-core以及kafka-manager-extends下的test包中
2. 配置在resources/application.yml,包括运行单元测试用例启用的数据库配置等等
3. 编译打包项目时,加上参数-DskipTests可不执行测试用例,例如使用命令行mvn -DskipTests进行打包
## 2、集成测试
### 2.1 集成测试介绍
集成测试又称组装测试,是一种黑盒测试。通常在单元测试的基础上,将所有的程序模块进行有序的、递增的测试。
集成测试是检验程序单元或部件的接口关系,逐步集成为符合概要设计要求的程序部件或整个系统。
### 2.2 LogiKM集成测试思路
LogiKM集成测试主要思路是对Controller层的接口发送Http请求。
通过罗列测试用例,模拟用户的操作,对接口发送Http请求,判断结果是否达到预期。
本地运行集成测试用例时,无需加@SpringBootTest注解(即无需每次运行测试用例都启动容器)
### 2.3 LogiKM集成测试注意事项
1. 集成测试用例在kafka-manager-web的test包下
2. 因为对某些接口发送Http请求需要先登录,比较麻烦,可以绕过登录,方法见教程:docs -> user_guide -> call_api_bypass_login
3. 集成测试的配置在resources/integrationTest-settings.properties文件下,包括集群地址,zk地址的配置等等
4. 如果需要运行集成测试用例,需要本地先启动LogiKM项目
5. 编译打包项目时,加上参数-DskipTests可不执行测试用例,例如使用命令行mvn -DskipTests进行打包
\ No newline at end of file
......@@ -82,15 +82,15 @@ public class Result<T> implements Serializable {
return JSON.toJSONString(this);
}
public static Result buildSuc() {
Result result = new Result();
public static <T> Result<T> buildSuc() {
Result<T> result = new Result<>();
result.setCode(ResultStatus.SUCCESS.getCode());
result.setMessage(ResultStatus.SUCCESS.getMessage());
return result;
}
public static <T> Result<T> buildSuc(T data) {
Result<T> result = new Result<T>();
Result<T> result = new Result<>();
result.setCode(ResultStatus.SUCCESS.getCode());
result.setMessage(ResultStatus.SUCCESS.getMessage());
result.setData(data);
......@@ -98,7 +98,7 @@ public class Result<T> implements Serializable {
}
public static <T> Result<T> buildGatewayFailure(String message) {
Result<T> result = new Result<T>();
Result<T> result = new Result<>();
result.setCode(ResultStatus.GATEWAY_INVALID_REQUEST.getCode());
result.setMessage(message);
result.setData(null);
......@@ -106,22 +106,22 @@ public class Result<T> implements Serializable {
}
public static <T> Result<T> buildFailure(String message) {
Result<T> result = new Result<T>();
Result<T> result = new Result<>();
result.setCode(ResultStatus.FAIL.getCode());
result.setMessage(message);
result.setData(null);
return result;
}
public static Result buildFrom(ResultStatus resultStatus) {
Result result = new Result();
public static <T> Result<T> buildFrom(ResultStatus resultStatus) {
Result<T> result = new Result<>();
result.setCode(resultStatus.getCode());
result.setMessage(resultStatus.getMessage());
return result;
}
public static Result buildFrom(ResultStatus resultStatus, Object data) {
Result result = new Result();
public static <T> Result<T> buildFrom(ResultStatus resultStatus, T data) {
Result<T> result = new Result<>();
result.setCode(resultStatus.getCode());
result.setMessage(resultStatus.getMessage());
result.setData(data);
......
......@@ -118,10 +118,7 @@ public class LogicalClusterDTO {
}
public boolean legal() {
if (ValidateUtils.isNull(clusterId)
|| ValidateUtils.isNull(clusterId)
|| ValidateUtils.isEmptyList(regionIdList)
|| ValidateUtils.isNull(mode)) {
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(regionIdList) || ValidateUtils.isNull(mode)) {
return false;
}
if (!ClusterModeEnum.SHARED_MODE.getCode().equals(mode) && ValidateUtils.isNull(appId)) {
......
......@@ -94,10 +94,7 @@ public class RegionDTO {
}
public boolean legal() {
if (ValidateUtils.isNull(clusterId)
|| ValidateUtils.isNull(clusterId)
|| ValidateUtils.isEmptyList(brokerIdList)
|| ValidateUtils.isNull(status)) {
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(brokerIdList) || ValidateUtils.isNull(status)) {
return false;
}
description = ValidateUtils.isNull(description)? "": description;
......
......@@ -13,6 +13,7 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
......@@ -81,16 +82,19 @@ public class SpringTool implements ApplicationContextAware, DisposableBean {
}
public static String getUserName(){
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String username = null;
if (TrickLoginConstant.TRICK_LOGIN_SWITCH_ON.equals(request.getHeader(TrickLoginConstant.TRICK_LOGIN_SWITCH))) {
// trick登录方式的获取用户
username = request.getHeader(TrickLoginConstant.TRICK_LOGIN_USER);
} else {
// 走页面登录方式登录的获取用户
HttpSession session = request.getSession();
username = (String) session.getAttribute(LoginConstant.SESSION_USERNAME_KEY);
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
if (!ValidateUtils.isNull(requestAttributes)) {
HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest();
if (TrickLoginConstant.TRICK_LOGIN_SWITCH_ON.equals(request.getHeader(TrickLoginConstant.TRICK_LOGIN_SWITCH))) {
// trick登录方式的获取用户
username = request.getHeader(TrickLoginConstant.TRICK_LOGIN_USER);
} else {
// 走页面登录方式登录的获取用户
HttpSession session = request.getSession();
username = (String) session.getAttribute(LoginConstant.SESSION_USERNAME_KEY);
}
}
if (ValidateUtils.isNull(username)) {
......
......@@ -29,10 +29,10 @@ public class TopicQuotaData {
public static TopicQuotaData getClientData(Long producerByteRate, Long consumerByteRate) {
TopicQuotaData clientData = new TopicQuotaData();
if (!ValidateUtils.isNull(producerByteRate) && consumerByteRate != -1) {
if (!ValidateUtils.isNull(consumerByteRate) && consumerByteRate != -1) {
clientData.setConsumer_byte_rate(consumerByteRate.toString());
}
if (!ValidateUtils.isNull(consumerByteRate) && producerByteRate != -1) {
if (!ValidateUtils.isNull(producerByteRate) && producerByteRate != -1) {
clientData.setProducer_byte_rate(producerByteRate.toString());
}
return clientData;
......
......@@ -95,5 +95,23 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- testng -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.testng/testng -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -42,6 +42,13 @@ public interface ConsumerService {
*/
List<String> getConsumerGroupConsumedTopicList(Long clusterId, String consumerGroup, String location);
/**
* 获取消费者offset
* @param clusterDO 集群
* @param topicName topic
* @param consumerGroup 消费组
* @return Map<partitionId, offset>
*/
Map<Integer, Long> getConsumerOffset(ClusterDO clusterDO, String topicName, ConsumerGroup consumerGroup);
/**
......@@ -52,7 +59,20 @@ public interface ConsumerService {
ConsumerGroup consumerGroup,
List<PartitionOffsetDTO> partitionOffsetDTOList);
/**
* 获取每个集群消费组的个数
* @param clusterDOList 物理集群列表
* @return Map<clusterId, consumerGroupNums>
*/
Map<Long, Integer> getConsumerGroupNumMap(List<ClusterDO> clusterDOList);
/**
* 验证消费组是否存在
* @param offsetLocation offset存放位置
* @param id 集群id
* @param topicName topic
* @param consumerGroup 消费组
* @return true:存在,false:不存在
*/
boolean checkConsumerGroupExist(OffsetLocationEnum offsetLocation, Long id, String topicName, String consumerGroup);
}
......@@ -54,12 +54,12 @@ public interface RegionService {
Map<Integer, RegionDO> convert2BrokerIdRegionMap(List<RegionDO> regionDOList);
/**
* 更新逻辑集群容量
* @param clusterId 集群id
* 根据RegionId更新Region
* @param regionId region的id
* @param newBrokerList 新的broker列表
* @return ResultStatus
*/
ResultStatus updateRegion(Long clusterId, String newBrokerList);
ResultStatus updateRegion(Long regionId, String newBrokerList);
/**
* 获取空闲的region的broker列表
......
......@@ -104,6 +104,13 @@ public interface TopicService {
*/
List<TopicBrokerDTO> getTopicBrokerList(Long clusterId, String topicName);
/**
* 判断topic是否有数据写入,即分区topic的offset变化
* @param physicalClusterId 物理集群Id
* @param topicName topic名称
* @param latestTime 离当前多久开始计算
* @return
*/
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
}
......@@ -159,7 +159,7 @@ public class ConsumerServiceImpl implements ConsumerService {
if (topicMetadata == null) {
logger.warn("class=ConsumerServiceImpl||method=getConsumeDetail||clusterId={}||topicName={}||msg=topicMetadata is null!",
clusterDO.getId(), topicName);
return null;
return Collections.emptyList();
}
List<ConsumeDetailDTO> consumerGroupDetailDTOList = null;
......@@ -170,7 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
}
if (consumerGroupDetailDTOList == null) {
logger.info("class=ConsumerServiceImpl||method=getConsumeDetail||msg=consumerGroupDetailDTOList is null!");
return null;
return Collections.emptyList();
}
Map<TopicPartition, Long> topicPartitionLongMap = topicService.getPartitionOffset(clusterDO, topicName, OffsetPosEnum.END);
......@@ -317,9 +317,6 @@ public class ConsumerServiceImpl implements ConsumerService {
String consumerGroup) {
Map<Integer, String> stringOffsetMap =
getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroup, topicName);
if (ValidateUtils.isNull(stringOffsetMap)) {
return new HashMap<>(0);
}
Map<Integer, Long> offsetMap = new HashMap<>(stringOffsetMap.size());
for (Map.Entry<Integer, String> entry: stringOffsetMap.entrySet()) {
......
......@@ -167,9 +167,11 @@ public class JmxServiceImpl implements JmxService {
if (ValidateUtils.isNull(jmxConnectorWrap)|| !jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) {
return null;
}
KafkaVersion kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId);
TopicMetrics metrics = new TopicMetrics(clusterId, topicName);
for (MbeanV2 mbeanV2: mbeanV2List) {
KafkaVersion kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId);
try {
getAndSupplyAttributes2BaseMetrics(
metrics,
......
......@@ -138,11 +138,11 @@ public class RegionServiceImpl implements RegionService {
@Override
public ResultStatus updateRegion(Long clusterId, String newBrokerList) {
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isExistBlank(newBrokerList)) {
public ResultStatus updateRegion(Long regionId, String newBrokerList) {
if (ValidateUtils.isNull(regionId) || ValidateUtils.isExistBlank(newBrokerList)) {
return ResultStatus.PARAM_ILLEGAL;
}
RegionDO regionDO = getById(clusterId);
RegionDO regionDO = getById(regionId);
if (ValidateUtils.isNull(regionDO)) {
return ResultStatus.CLUSTER_NOT_EXIST;
}
......
......@@ -419,6 +419,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
authorityDO.setTopicName(topicName);
authorityDO.setAccess(TopicAuthorityEnum.READ_WRITE.getCode());
authorityService.addAuthority(authorityDO);
return ResultStatus.SUCCESS;
} catch (Exception e) {
LOGGER.error("modify topic failed, clusterId:{} topicName:{} description:{} operator:{} ",
clusterId, topicName, description, operator, e);
......@@ -631,7 +632,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
// 该用户无应用,需要先申请应用
return ResultStatus.APP_NOT_EXIST;
}
List<Long> appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList());
List<String> appIds = appDOs.stream().map(AppDO::getAppId).collect(Collectors.toList());
if (!appIds.contains(authorityDO.getAppId())) {
// 入参中的appId,该用户未拥有
return ResultStatus.APP_NOT_EXIST;
......
......@@ -250,11 +250,11 @@ public class TopicServiceImpl implements TopicService {
@Override
public List<TopicPartitionDTO> getTopicPartitionDTO(ClusterDO clusterDO, String topicName, Boolean needDetail) {
if (ValidateUtils.isNull(clusterDO) || ValidateUtils.isNull(topicName)) {
return null;
return new ArrayList<>();
}
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName);
if (ValidateUtils.isNull(topicMetadata)) {
return null;
return new ArrayList<>();
}
List<PartitionState> partitionStateList = KafkaZookeeperUtils.getTopicPartitionState(
......@@ -419,9 +419,6 @@ public class TopicServiceImpl implements TopicService {
topicDO,
appDO
);
if (ValidateUtils.isNull(overview)) {
continue;
}
dtoList.add(overview);
}
......@@ -531,7 +528,7 @@ public class TopicServiceImpl implements TopicService {
public List<PartitionOffsetDTO> getPartitionOffsetList(ClusterDO clusterDO, String topicName, Long timestamp) {
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName);
if (topicMetadata == null) {
return null;
return new ArrayList<>();
}
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (Integer partitionId : topicMetadata.getPartitionMap().getPartitions().keySet()) {
......@@ -575,7 +572,7 @@ public class TopicServiceImpl implements TopicService {
kafkaConsumer.close();
}
}
return null;
return new ArrayList<>();
}
private List<String> fetchTopicData(KafkaConsumer kafkaConsumer, ClusterDO clusterDO, String topicName, TopicDataSampleDTO reqObj) {
......@@ -588,7 +585,7 @@ public class TopicServiceImpl implements TopicService {
tpList.add(new TopicPartition(topicName, partitionId));
}
if (ValidateUtils.isEmptyList(tpList)) {
return null;
return new ArrayList<>();
}
kafkaConsumer.assign(tpList);
......
......@@ -92,15 +92,15 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy {
return HEALTH_SCORE_BAD;
}
Object RequestHandlerAvgIdlePercentOneMinuteRate = metrics.getMetricsMap().get("RequestHandlerAvgIdlePercentOneMinuteRate");
Object NetworkProcessorAvgIdlePercentValue = metrics.getMetricsMap().get("NetworkProcessorAvgIdlePercentValue");
if (ValidateUtils.isNull(RequestHandlerAvgIdlePercentOneMinuteRate)
|| ValidateUtils.isNull(NetworkProcessorAvgIdlePercentValue)) {
Object requestHandlerAvgIdlePercentOneMinuteRate = metrics.getMetricsMap().get("RequestHandlerAvgIdlePercentOneMinuteRate");
Object networkProcessorAvgIdlePercentValue = metrics.getMetricsMap().get("NetworkProcessorAvgIdlePercentValue");
if (ValidateUtils.isNull(requestHandlerAvgIdlePercentOneMinuteRate)
|| ValidateUtils.isNull(networkProcessorAvgIdlePercentValue)) {
// 数据获取失败
return Constant.INVALID_CODE;
}
if (((Double) RequestHandlerAvgIdlePercentOneMinuteRate) < MIN_IDLE * KAFKA_REQUEST_HANDLER_POOL_SIZE
|| ((Double) NetworkProcessorAvgIdlePercentValue) < MIN_IDLE) {
if (((Double) requestHandlerAvgIdlePercentOneMinuteRate) < MIN_IDLE * KAFKA_REQUEST_HANDLER_POOL_SIZE
|| ((Double) networkProcessorAvgIdlePercentValue) < MIN_IDLE) {
return HEALTH_SCORE_NORMAL;
}
return HEALTH_SCORE_HEALTHY;
......@@ -117,7 +117,7 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy {
return Constant.INVALID_CODE;
}
List<Integer> brokerIdList = new ArrayList<>(metadata.getBrokerIdSet().size());
List<Integer> brokerIdList = new ArrayList<>(metadata.getBrokerIdSet());
FutureTask<Integer>[] taskList = new FutureTask[brokerIdList.size()];
for (int i = 0; i < brokerIdList.size(); ++i) {
......
package com.xiaojukeji.kafka.manager.service;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.when;
public class DemoTest extends BaseTest {
    /** A cluster id that is not expected to exist in the test database. */
    private static final Long NOT_EXIST_CLUSTER_ID = 100L;

    /** App id used only for the Mockito stubbing demo below. */
    private static final String MOCK_APP_ID = "100";

    @Autowired
    private ClusterService clusterService;

    @Mock
    private AppService appServiceMock;

    @BeforeMethod
    public void setup() {
        // Initialize the @Mock fields declared on this class before every test method.
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void test() {
        // Looking up a cluster id that does not exist should yield null.
        Assert.assertNull(clusterService.getById(NOT_EXIST_CLUSTER_ID));
    }

    @Test
    public void testMock() {
        // Stub the mocked service so the lookup returns a fresh AppDO instance.
        when(appServiceMock.getByAppId(MOCK_APP_ID)).thenReturn(new AppDO());
        Assert.assertNotNull(appServiceMock.getByAppId(MOCK_APP_ID));
    }
}
package com.xiaojukeji.kafka.manager.service;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.junit.Test;
public class FutureTest {
    /**
     * Builds a FutureTask that sleeps one second and then yields the given value.
     */
    private static FutureTask<Integer> sleepyTask(final int value) {
        return new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws InterruptedException {
                Thread.sleep(1000L);
                return value;
            }
        });
    }

    @Test
    public void test() throws InterruptedException, ExecutionException {
        FutureTask<Integer> first = sleepyTask(1);
        FutureTask<Integer> second = sleepyTask(2);

        ExecutorService threadPool = Executors.newCachedThreadPool();
        long startMillis = System.currentTimeMillis();
        threadPool.submit(first);
        threadPool.submit(second);
        threadPool.shutdown();

        // Both tasks run concurrently on the pool, so the elapsed time printed
        // is roughly one second rather than two.
        System.out.println(first.get() + " : " + second.get() + " use:"
                + (System.currentTimeMillis() - startMillis));
    }
}
package com.xiaojukeji.kafka.manager.service.config;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTransactionalTestNGSpringContextTests;
// Base class for all core-module unit tests: boots the full Spring context on a
// random web port, and (via AbstractTransactionalTestNGSpringContextTests) runs
// each TestNG test method inside a transaction that is rolled back afterwards,
// so database writes made by tests do not leak between test runs.
@SpringBootTest(classes = CoreSpringBootStartUp.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ContextConfiguration(classes = CoreSpringBootStartUp.class)
public class BaseTest extends AbstractTransactionalTestNGSpringContextTests {
}
package com.xiaojukeji.kafka.manager.service.config;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableAsync
@EnableScheduling
@ServletComponentScan
// NOTE: @EnableAutoConfiguration was removed — it is already implied by
// @SpringBootApplication, so declaring both is redundant.
@SpringBootApplication(scanBasePackages = {"com.xiaojukeji.kafka.manager"})
public class CoreSpringBootStartUp {
    /**
     * Boot entry point for the core-module test application context.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Equivalent to constructing a SpringApplication and calling run(args),
        // just using the idiomatic one-liner.
        SpringApplication.run(CoreSpringBootStartUp.class, args);
    }
}
package com.xiaojukeji.kafka.manager.service.config;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
 * MyBatis / JDBC wiring for the test context: one primary DataSource
 * (configured from the spring.datasource.kafka-manager.* properties) plus the
 * SqlSessionFactory, transaction manager and SqlSessionTemplate built on it.
 * @author zengqiao
 * @date 20/3/17
 */
@Configuration
public class DataSourceConfig {
    /** Primary DataSource, populated from spring.datasource.kafka-manager.* properties. */
    @Bean(name = "dataSource")
    @ConfigurationProperties(prefix = "spring.datasource.kafka-manager")
    @Primary
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }
    /** SqlSessionFactory wired with mapper XMLs (classpath:mapper/*.xml) and mybatis-config.xml. */
    @Bean(name = "sqlSessionFactory")
    @Primary
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        bean.setConfigLocation(new PathMatchingResourcePatternResolver().getResource("classpath:mybatis-config.xml"));
        return bean.getObject();
    }
    /** Transaction manager bound to the primary DataSource. */
    @Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager transactionManager(@Qualifier("dataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    /** SqlSession template (thread-safe) built on the SqlSessionFactory above. */
    @Bean(name = "sqlSession")
    @Primary
    public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.exception.ConfigException;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
/**
 * Integration tests for AdminService: topic create/delete, preferred replica
 * election (cluster/broker/topic/partition scope), topic config read/update
 * and partition expansion. Requires a locally running cluster; connection
 * details are injected from the integration-test properties.
 * @author xuguang
 * @Date 2021/12/24
 */
public class AdminServiceTest extends BaseTest {
    /**
     * Cluster has three brokers: 1,2,3. This topic has 1 partition and
     * replication factor 1, located on broker 1.
     */
    @Value("${test.topic.name1}")
    private String REAL_TOPIC1_IN_ZK;
    @Value("${test.topic.name3}")
    private String REAL_TOPIC3_IN_ZK;
    /**
     * Cluster has three brokers: 1,2,3. This topic has 2 partitions and
     * replication factor 3, located on brokers 1,2,3.
     */
    @Value("${test.topic.name2}")
    private String REAL_TOPIC2_IN_ZK;
    // A topic name that is expected not to exist in the cluster.
    private final static String INVALID_TOPIC = "xxxxx";
    // NOTE(review): Kafka's internal offsets topic is "__consumer_offsets"
    // (double underscore) — confirm this single-underscore value is intentional.
    private final static String ZK_DEFAULT_TOPIC = "_consumer_offsets";
    // Topic name created (and cleaned up) by the create/delete test cases.
    private final static String CREATE_TOPIC_TEST = "createTopicTest";
    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;
    @Value("${test.broker.id1}")
    private Integer REAL_BROKER_ID_IN_ZK;
    private final static Long INVALID_CLUSTER_ID = -1L;
    private final static Integer INVALID_PARTITION_ID = -1;
    private final static Integer REAL_PARTITION_ID = 0;
    private final static Integer INVALID_BROKER_ID = -1;
    @Value("${test.app.id}")
    private String APP_ID;
    private final static Long INVALID_REGION_ID = -1L;
    private final static Long REAL_REGION_ID_IN_MYSQL = 1L;
    @Value("${test.admin}")
    private String ADMIN;
    @Value("${test.phyCluster.name}")
    private String REAL_PHYSICAL_CLUSTER_NAME;
    @Value("${test.ZK.address}")
    private String ZOOKEEPER_ADDRESS;
    @Value("${test.ZK.bootstrap-servers}")
    private String BOOTSTRAP_SERVERS;
    // SASL_PLAINTEXT client properties (JSON) used to talk to the secured test cluster.
    private final static String SECURITY_PROTOCOL = "{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }";
    // ZK path of the preferred replica election marker node.
    private final static String ZK_NODE_PATH_PREFERRED = "/admin/preferred_replica_election";
    // ZK path of the created topic under the brokers node.
    private final static String ZK_NODE_PATH_BROKERS_TOPIC = "/brokers/topics/createTopicTest";
    // ...and under the config node.
    private final static String ZK_NODE_PATH_CONFIG_TOPIC = "/config/topics/createTopicTest";
    @Autowired
    private AdminService adminService;
    @Autowired
    private TopicManagerService topicManagerService;
/** Builds the TopicDO row used by the create-topic cases (topic "createTopicTest"). */
private TopicDO getTopicDO() {
    TopicDO topicDO = new TopicDO();
    topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
    topicDO.setTopicName(CREATE_TOPIC_TEST);
    topicDO.setAppId(APP_ID);
    topicDO.setDescription(CREATE_TOPIC_TEST);
    topicDO.setPeakBytesIn(100000L);
    return topicDO;
}
/** Builds a ClusterDO pointing at the real test cluster (ZK address, bootstrap servers, SASL props). */
public ClusterDO getClusterDO() {
    ClusterDO clusterDO = new ClusterDO();
    clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
    clusterDO.setClusterName(REAL_PHYSICAL_CLUSTER_NAME);
    clusterDO.setZookeeper(ZOOKEEPER_ADDRESS);
    clusterDO.setBootstrapServers(BOOTSTRAP_SERVERS);
    clusterDO.setSecurityProperties(SECURITY_PROTOCOL);
    clusterDO.setStatus(1);
    clusterDO.setGmtCreate(new Date());
    clusterDO.setGmtModify(new Date());
    return clusterDO;
}
/**
 * Create-topic flow: invalid broker yields BROKER_NOT_EXIST, first create of
 * the topic succeeds, a second create of the same topic fails.
 */
@Test(description = "测试创建topic")
public void createTopicTest() throws ConfigException {
    // broker not exist
    createTopic2BrokerNotExistTest();
    // success to create topic
    createTopic2SuccessTest();
    // failure to create topic, topic already exists
    createTopic2FailureTest();
    // After a successful create the topic exists in both MySQL and ZK; it must
    // be removed so later tests are not affected.
    // DB writes are rolled back when the whole test finishes (transactional base
    // class), so only the topic nodes in ZK need deleting here.
    // NOTE(review): close() is skipped if delete() throws — consider try/finally.
    ZkConfigImpl zkConfig = new ZkConfigImpl(ZOOKEEPER_ADDRESS);
    zkConfig.delete(ZK_NODE_PATH_BROKERS_TOPIC);
    zkConfig.delete(ZK_NODE_PATH_CONFIG_TOPIC);
    zkConfig.close();
}
/** Creating a topic on a broker id that does not exist must return BROKER_NOT_EXIST. */
private void createTopic2BrokerNotExistTest() {
    TopicDO topicDO = getTopicDO();
    ClusterDO clusterDO = getClusterDO();
    ResultStatus result = adminService.createTopic(
            clusterDO,
            topicDO,
            1,
            1,
            INVALID_REGION_ID,
            Arrays.asList(INVALID_BROKER_ID),
            new Properties(),
            ADMIN,
            ADMIN);
    Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode());
}
/** Creating the topic again (it already exists after the success case) must not return SUCCESS. */
private void createTopic2FailureTest() {
    TopicDO topicDO = getTopicDO();
    ClusterDO clusterDO = getClusterDO();
    ResultStatus result = adminService.createTopic(
            clusterDO,
            topicDO,
            1,
            1,
            INVALID_REGION_ID,
            Arrays.asList(REAL_BROKER_ID_IN_ZK),
            new Properties(),
            ADMIN,
            ADMIN);
    Assert.assertNotEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
/** Creating the topic on a real broker with 1 partition / RF 1 must return SUCCESS. */
private void createTopic2SuccessTest() {
    TopicDO topicDO = getTopicDO();
    ClusterDO clusterDO = getClusterDO();
    ResultStatus result = adminService.createTopic(
            clusterDO,
            topicDO,
            1,
            1,
            INVALID_REGION_ID,
            Arrays.asList(REAL_BROKER_ID_IN_ZK),
            new Properties(),
            ADMIN,
            ADMIN);
    Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
/** Delete-topic flow: deleting a non-existent topic fails; create-then-delete succeeds. */
@Test(description = "测试删除topic")
public void deleteTopicTest() {
    // topic does not exist
    deleteTopic2FailureTest();
    // success to delete
    deleteTopic2SuccessTest();
}
/** Deleting a topic that does not exist must not return SUCCESS. */
private void deleteTopic2FailureTest() {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.deleteTopic(
            clusterDO,
            INVALID_TOPIC,
            ADMIN
    );
    Assert.assertNotEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
/** Creates the test topic first, then deletes it; both operations must return SUCCESS. */
private void deleteTopic2SuccessTest() {
    TopicDO topicDO = getTopicDO();
    ClusterDO clusterDO = getClusterDO();
    ResultStatus result = adminService.createTopic(
            clusterDO,
            topicDO,
            1,
            1,
            INVALID_REGION_ID,
            Arrays.asList(REAL_BROKER_ID_IN_ZK),
            new Properties(),
            ADMIN,
            ADMIN);
    Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
    ResultStatus resultStatus = adminService.deleteTopic(
            clusterDO,
            CREATE_TOPIC_TEST,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
/** Preferred replica election status: only the "not running" branch is currently exercised. */
@Test(description = "测试优先副本选举状态")
public void preferredReplicaElectionStatusTest() throws ConfigException {
    // running
    // preferredReplicaElectionStatus2RunningTest();
    // not running
    preferredReplicaElectionStatus2NotRunningTest();
}
/**
 * Status is RUNNING while the election marker node exists in ZK.
 * NOTE(review): currently disabled — its call above is commented out.
 */
private void preferredReplicaElectionStatus2RunningTest() throws ConfigException{
    // The /admin/preferred_replica_election node must be created in ZK first.
    ZkConfigImpl zkConfig = new ZkConfigImpl(ZOOKEEPER_ADDRESS);
    zkConfig.setOrCreatePersistentNodeStat(ZK_NODE_PATH_PREFERRED, "");
    ClusterDO clusterDO = getClusterDO();
    TaskStatusEnum taskStatusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
    Assert.assertEquals(taskStatusEnum.getCode(), TaskStatusEnum.RUNNING.getCode());
    // Delete the node created above so later tests are not affected.
    zkConfig.delete(ZK_NODE_PATH_PREFERRED);
    zkConfig.close();
}
/** Status is SUCCEED when the election marker node is absent from ZK. */
private void preferredReplicaElectionStatus2NotRunningTest() throws ConfigException {
    ClusterDO clusterDO = getClusterDO();
    // No /admin/preferred_replica_election node exists in ZK.
    TaskStatusEnum taskStatusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
    Assert.assertEquals(taskStatusEnum.getCode(), TaskStatusEnum.SUCCEED.getCode());
    // Delete the node (if any) so later tests are not affected.
    ZkConfigImpl zkConfig = new ZkConfigImpl(ZOOKEEPER_ADDRESS);
    zkConfig.delete(ZK_NODE_PATH_PREFERRED);
    zkConfig.close();
}
/** Cluster-scope preferred replica election must return SUCCESS; cleans up the ZK marker node. */
@Test(description = "测试集群纬度优先副本选举")
public void preferredReplicaElectionOfCluster2Test() throws ConfigException {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(clusterDO, ADMIN);
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
    // Delete the created node so later tests are not affected.
    ZkConfigImpl zkConfig = new ZkConfigImpl(ZOOKEEPER_ADDRESS);
    zkConfig.delete(ZK_NODE_PATH_PREFERRED);
    zkConfig.close();
}
/** Broker-scope preferred replica election: illegal broker id fails, real broker succeeds. */
@Test(description = "Broker纬度优先副本选举")
public void preferredReplicaElectionOfBrokerTest() throws ConfigException {
    // illegal argument
    preferredReplicaElectionOfBroker2ParamIllegalTest();
    // success
    preferredReplicaElectionOfBroker2SuccessTest();
}
/** An invalid broker id must return PARAM_ILLEGAL. */
private void preferredReplicaElectionOfBroker2ParamIllegalTest() {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(
            clusterDO,
            INVALID_BROKER_ID,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARAM_ILLEGAL.getCode());
}
/** A real broker id must return SUCCESS; cleans up the ZK marker node afterwards. */
private void preferredReplicaElectionOfBroker2SuccessTest() throws ConfigException {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(
            clusterDO,
            REAL_BROKER_ID_IN_ZK,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
    // Delete the created node so later tests are not affected.
    ZkConfigImpl zkConfig = new ZkConfigImpl(ZOOKEEPER_ADDRESS);
    zkConfig.delete(ZK_NODE_PATH_PREFERRED);
    zkConfig.close();
}
/** Topic-scope preferred replica election: unknown topic fails, real topic succeeds. */
@Test(description = "Topic纬度优先副本选举")
public void preferredReplicaElectionOfTopicTest() throws ConfigException {
    // topic not exist
    preferredReplicaElectionOfTopic2TopicNotExistTest();
    // success
    preferredReplicaElectionOfTopic2SuccessTest();
}
/** An unknown topic name must return TOPIC_NOT_EXIST. */
private void preferredReplicaElectionOfTopic2TopicNotExistTest() {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(
            clusterDO,
            INVALID_TOPIC,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode());
}
/** A real topic must return SUCCESS; cleans up the ZK marker node afterwards. */
private void preferredReplicaElectionOfTopic2SuccessTest() throws ConfigException {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(
            clusterDO,
            REAL_TOPIC1_IN_ZK,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
    // Delete the created node so later tests are not affected.
    ZkConfigImpl zkConfig = new ZkConfigImpl(ZOOKEEPER_ADDRESS);
    zkConfig.delete(ZK_NODE_PATH_PREFERRED);
    zkConfig.close();
}
/** Partition-scope preferred replica election: unknown topic / partition fail, real pair succeeds. */
@Test(description = "分区纬度优先副本选举")
public void preferredReplicaElectionOfPartitionTest() throws ConfigException {
    // topic not exist
    preferredReplicaElectionOfPartition2TopicNotExistTest();
    // partition Not Exist
    preferredReplicaElectionOfPartition2PartitionNotExistTest();
    // success
    preferredReplicaElectionOfPartition2SuccessTest();
}
/** An unknown topic name must return TOPIC_NOT_EXIST regardless of the partition id. */
private void preferredReplicaElectionOfPartition2TopicNotExistTest() {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(
            clusterDO,
            INVALID_TOPIC,
            INVALID_PARTITION_ID,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode());
}
/** A real topic but unknown partition id must return PARTITION_NOT_EXIST. */
private void preferredReplicaElectionOfPartition2PartitionNotExistTest() {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(
            clusterDO,
            REAL_TOPIC2_IN_ZK,
            INVALID_PARTITION_ID,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARTITION_NOT_EXIST.getCode());
}
/** A real topic/partition pair must return SUCCESS; cleans up the ZK marker node afterwards. */
private void preferredReplicaElectionOfPartition2SuccessTest() throws ConfigException {
    ClusterDO clusterDO = getClusterDO();
    ResultStatus resultStatus = adminService.preferredReplicaElection(
            clusterDO,
            REAL_TOPIC2_IN_ZK,
            REAL_PARTITION_ID,
            ADMIN
    );
    Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
    // Delete the created node so later tests are not affected.
    ZkConfigImpl zkConfig = new ZkConfigImpl(ZOOKEEPER_ADDRESS);
    zkConfig.delete(ZK_NODE_PATH_PREFERRED);
    zkConfig.close();
}
    @Test(description = "测试获取Topic配置")
    public void getTopicConfigTest() {
        // result is null: unknown cluster id, no config can be fetched
        getTopicConfig2NullTest();
        // result not null: real cluster and topic
        getTopicConfig2NotNullTest();
    }
private void getTopicConfig2NullTest() {
ClusterDO clusterDO = getClusterDO();
clusterDO.setId(INVALID_CLUSTER_ID);
Properties topicConfig = adminService.getTopicConfig(clusterDO, REAL_TOPIC1_IN_ZK);
Assert.assertNull(topicConfig);
}
private void getTopicConfig2NotNullTest() {
ClusterDO clusterDO = getClusterDO();
Properties topicConfig = adminService.getTopicConfig(clusterDO, REAL_TOPIC1_IN_ZK);
Assert.assertNotNull(topicConfig);
}
@Test(description = "测试修改Topic配置")
public void modifyTopicConfigTest() {
ClusterDO clusterDO = getClusterDO();
Properties properties = new Properties();
properties.put("retention.ms", "21600000");
ResultStatus resultStatus = adminService.modifyTopicConfig(
clusterDO,
REAL_TOPIC1_IN_ZK,
properties,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
    @Test(description = "测试扩分区")
    // NOTE: disabled because this test mutates the partition count of a real topic
    // and the change cannot be rolled back; re-enable the calls below with care.
    public void expandPartitionsTest() {
        // broker not exist
        // expandPartitions2BrokerNotExistTest();
        // success
        // expandPartitions2SuccessTest();
    }
private void expandPartitions2BrokerNotExistTest() {
// 存在两个下线broker, region中包含一个
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.expandPartitions(
clusterDO,
REAL_TOPIC1_IN_ZK,
2,
REAL_REGION_ID_IN_MYSQL,
Arrays.asList(INVALID_BROKER_ID),
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode());
}
private void expandPartitions2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.expandPartitions(
clusterDO,
REAL_TOPIC3_IN_ZK,
2,
INVALID_REGION_ID,
Arrays.asList(REAL_BROKER_ID_IN_ZK),
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ao.analysis.AnalysisBrokerDTO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
* @author xuguang
* @Date 2021/12/23
*/
public class AnalysisServiceTest extends BaseTest {

    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;

    /*
     * NOTE(review): this field was previously declared `private final static Integer ... = 1`
     * while also carrying @Value. Spring cannot inject values into static or final fields,
     * so the annotation was silently ignored and the hard-coded 1 was always used.
     * Declared as a plain instance field (same pattern as BrokerServiceTest) so the broker id
     * really comes from the test configuration.
     */
    @Value("${test.broker.id1}")
    private Integer REAL_BROKER_ID_IN_ZK;

    // cluster id that is guaranteed not to exist in MySQL
    private final static Long INVALID_CLUSTER_ID = -1L;

    @Autowired
    private AnalysisService analysisService;

    @Test
    public void doAnalysisBrokerTest() {
        // brokerMetrics is null: unknown cluster, metrics cannot be fetched
        doAnalysisBroker2brokerMetricsIsNullTest();
        // brokerMetrics is not null: real cluster, metrics are populated
        doAnalysisBroker2brokerMetricsIsNotNullTest();
    }

    /** An invalid cluster still yields a DTO echoing the ids, but without metrics (bytesIn is null). */
    private void doAnalysisBroker2brokerMetricsIsNullTest() {
        AnalysisBrokerDTO analysisBrokerDTO = analysisService.doAnalysisBroker(
                INVALID_CLUSTER_ID,
                REAL_BROKER_ID_IN_ZK
        );
        Assert.assertNotNull(analysisBrokerDTO);
        Assert.assertEquals(analysisBrokerDTO.getBrokerId(), REAL_BROKER_ID_IN_ZK);
        Assert.assertEquals(analysisBrokerDTO.getClusterId(), INVALID_CLUSTER_ID);
        Assert.assertNull(analysisBrokerDTO.getBytesIn());
    }

    /** A real cluster/broker yields a DTO with metrics filled in (bytesIn not null). */
    private void doAnalysisBroker2brokerMetricsIsNotNullTest() {
        AnalysisBrokerDTO analysisBrokerDTO = analysisService.doAnalysisBroker(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK
        );
        Assert.assertNotNull(analysisBrokerDTO);
        Assert.assertEquals(analysisBrokerDTO.getBrokerId(), REAL_BROKER_ID_IN_ZK);
        Assert.assertEquals(analysisBrokerDTO.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
        Assert.assertNotNull(analysisBrokerDTO.getBytesIn());
    }
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.BrokerBasicDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.BrokerOverviewDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.TopicDiskLocation;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.apache.kafka.common.TopicPartition;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.*;
/**
* @author xuguang
* @Date 2021/12/10
*/
public class BrokerServiceTest extends BaseTest {

    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;

    @Value("${test.broker.id1}")
    private Integer REAL_BROKER_ID_IN_ZK;

    @Value("${test.sasl-plaintext}")
    private String END_POINTS_IN_BROKER;

    /*
     * NOTE(review): combining @Autowired with @InjectMocks relies on Mockito replacing the
     * Spring-wired collaborators of the autowired bean; verify the @Mock stubs really take
     * effect with the Mockito version in use.
     */
    @Autowired
    @InjectMocks
    private BrokerService brokerService;

    @Mock
    private JmxService jmxService;

    @Mock
    private TopicService topicService;

    @BeforeMethod
    public void setup() {
        // re-initialize @Mock/@InjectMocks fields before every test method
        MockitoAnnotations.initMocks(this);
    }

    /** Fixture: a BrokerDO describing an (offline) broker of the test cluster. */
    private BrokerDO getBrokerDO() {
        BrokerDO brokerDO = new BrokerDO();
        brokerDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        brokerDO.setBrokerId(100);
        brokerDO.setHost("127.0.0.1");
        brokerDO.setPort(9093);
        brokerDO.setTimestamp(1638605696062L);
        brokerDO.setMaxAvgBytesIn(0d);
        brokerDO.setStatus(0);
        brokerDO.setGmtCreate(new Date(1638605696062L));
        brokerDO.setGmtModify(new Date(1638605696062L));
        return brokerDO;
    }

    /** Fixture: ZK-style metadata for the real test broker. */
    private BrokerMetadata getBrokerMetadata() {
        BrokerMetadata brokerMetadata = new BrokerMetadata();
        brokerMetadata.setBrokerId(REAL_BROKER_ID_IN_ZK);
        brokerMetadata.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        brokerMetadata.setHost("127.0.0.1");
        brokerMetadata.setPort(9092);
        brokerMetadata.setEndpoints(Arrays.asList(END_POINTS_IN_BROKER));
        brokerMetadata.setTimestamp(1638605696062L);
        brokerMetadata.setJmxPort(9999);
        brokerMetadata.setRack("CY");
        brokerMetadata.setVersion("2");
        return brokerMetadata;
    }

    /** Fixture: the disk location expected for "testTopic" partition 0 as a follower. */
    private TopicDiskLocation getTopicDiskLocation() {
        TopicDiskLocation topicDiskLocation = new TopicDiskLocation();
        topicDiskLocation.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        topicDiskLocation.setBrokerId(1);
        topicDiskLocation.setTopicName("testTopic");
        topicDiskLocation.setDiskName("disk");
        topicDiskLocation.setLeaderPartitions(new ArrayList<>());
        topicDiskLocation.setFollowerPartitions(Arrays.asList(0));
        topicDiskLocation.setUnderReplicatedPartitions(new ArrayList<>());
        topicDiskLocation.setUnderReplicated(false);
        return topicDiskLocation;
    }

    private TopicPartition getTopicPartition() {
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        return topicPartition;
    }

    /** Fixture: partition -> disk-name mapping returned by the mocked JmxService. */
    private Map<TopicPartition, String> getDiskNameMap() {
        Map<TopicPartition, String> diskNameMap = new HashMap<>();
        TopicPartition topicPartition = getTopicPartition();
        diskNameMap.put(topicPartition, "disk");
        return diskNameMap;
    }

    private PartitionState getPartitionState() {
        PartitionState partitionState = new PartitionState();
        return partitionState;
    }

    /** Fixture: topic -> partition-state list returned by the mocked TopicService. */
    private Map<String, List<PartitionState>> getStateMap() {
        PartitionState partitionState = getPartitionState();
        Map<String, List<PartitionState>> stateMap = new HashMap<>();
        stateMap.put("string", Arrays.asList(partitionState));
        return stateMap;
    }

    /** Fixture: metrics with partition/leader counts set, as the JMX mock would return them. */
    public BrokerMetrics getBrokerMetrics() {
        BrokerMetrics brokerMetrics = new BrokerMetrics(1L, 1);
        Map<String, Object> metricsMap = new HashMap<>();
        metricsMap.put("PartitionCountValue", 100);
        metricsMap.put("LeaderCountValue", 100);
        brokerMetrics.setMetricsMap(metricsMap);
        return brokerMetrics;
    }

    @Test
    public void getBrokerVersionTest() {
        String version = "1.4";
        Mockito.when(jmxService.getBrokerVersion(Mockito.anyLong(), Mockito.anyInt())).thenReturn(version);
        String brokerVersion = brokerService.getBrokerVersion(1L, 1);
        Assert.assertNotNull(brokerVersion);
        Assert.assertEquals(brokerVersion, version);
    }

    @Test(description = "根据Cluster和brokerId获取broker的具体信息测试")
    public void getBrokerBasicDTO() {
        // null result for missing parameters or unknown cluster/broker
        getBrokerBasicDTO2nullTest();
        // brokerMetrics unavailable -> counts stay null
        getBrokerBasicDTO2brokerMetricsNullTest();
        // brokerMetrics available -> counts are filled in
        getBrokerBasicDTO2brokerMetricsNotNullTest();
    }

    private void getBrokerBasicDTO2nullTest() {
        BrokerBasicDTO result1 = brokerService.getBrokerBasicDTO(null, 1);
        Assert.assertNull(result1);

        BrokerBasicDTO result2 = brokerService.getBrokerBasicDTO(1L, null);
        Assert.assertNull(result2);

        BrokerBasicDTO result3 = brokerService.getBrokerBasicDTO(100L, 100);
        Assert.assertNull(result3);
    }

    private void getBrokerBasicDTO2brokerMetricsNullTest() {
        BrokerBasicDTO result1 = brokerService.getBrokerBasicDTO(1L, 1);
        Assert.assertNotNull(result1);
        Assert.assertNull(result1.getPartitionCount());
        Assert.assertNull(result1.getLeaderCount());
    }

    private void getBrokerBasicDTO2brokerMetricsNotNullTest() {
        Mockito.when(jmxService.getBrokerMetrics(
                Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt())).thenReturn(getBrokerMetrics());
        BrokerBasicDTO result1 = brokerService.getBrokerBasicDTO(1L, 1);
        Assert.assertNotNull(result1);
        Assert.assertNotNull(result1.getPartitionCount());
        Assert.assertNotNull(result1.getLeaderCount());
    }

    @Test
    public void getBrokerTopicLocationTest() {
        Map<TopicPartition, String> diskNameMap = getDiskNameMap();
        Mockito.when(jmxService.getBrokerTopicLocation(Mockito.any(), Mockito.any())).thenReturn(diskNameMap);

        Map<String, List<PartitionState>> stateMap = getStateMap();
        Mockito.when(topicService.getTopicPartitionState(Mockito.any(), Mockito.any())).thenReturn(stateMap);

        TopicDiskLocation topicDiskLocation = getTopicDiskLocation();
        List<TopicDiskLocation> expectedResult = Arrays.asList(topicDiskLocation);
        List<TopicDiskLocation> actualResult = brokerService.getBrokerTopicLocation(1L, 1);
        // FIX(review): TestNG's signature is assertEquals(actual, expected) — the
        // arguments were swapped, which inverts the roles in the failure message.
        Assert.assertEquals(actualResult.toString(), expectedResult.toString());
    }

    @Test(description = "计算Broker的峰值均值流量测试")
    public void calBrokerMaxAvgBytesInTest() {
        // illegal parameters -> -1.0
        calBrokerMaxAvgBytesIn2ParamIllegalTest();
        // no metrics available -> 0.0
        calBrokerMaxAvgBytesIn2ZeroTest();
        // full flow against real traffic
        calBrokerMaxAvgBytesIn2Success();
    }

    private void calBrokerMaxAvgBytesIn2ParamIllegalTest() {
        Double result1 = brokerService.calBrokerMaxAvgBytesIn(null, 1, 1, new Date(), new Date());
        Assert.assertEquals(result1, -1.0);

        Double result2 = brokerService.calBrokerMaxAvgBytesIn(1L, null, 1, new Date(), new Date());
        Assert.assertEquals(result2, -1.0);

        Double result3 = brokerService.calBrokerMaxAvgBytesIn(1L, 1, null, new Date(), new Date());
        Assert.assertEquals(result3, -1.0);

        Double result4 = brokerService.calBrokerMaxAvgBytesIn(1L, 1, 1, null, new Date());
        Assert.assertEquals(result4, -1.0);

        Double result5 = brokerService.calBrokerMaxAvgBytesIn(1L, 1, 1, new Date(), null);
        Assert.assertEquals(result5, -1.0);
    }

    private void calBrokerMaxAvgBytesIn2ZeroTest() {
        Double result = brokerService.calBrokerMaxAvgBytesIn(1L, 100, 100, new Date(), new Date());
        Assert.assertEquals(result, 0.0);
    }

    private void calBrokerMaxAvgBytesIn2Success() {
        // 此测试需要brokerId=1的broker上有真实的流量
        // (requires real inbound traffic on broker 1 to produce a positive average)
        long startTime = 0L;
        long endTime = new Date().getTime();
        Double result = brokerService.calBrokerMaxAvgBytesIn(
                1L, 1, 2, new Date(startTime), new Date(endTime));
        Assert.assertTrue(result > 0.0);
    }

    @Test(description = "获取BrokerMetrics信息测试,单个broker")
    public void getBrokerMetricsFromJmxTest() {
        // illegal parameters -> null
        getBrokerMetricsFromJmx2ParamIllegalTest();
        // JMX returns nothing -> null
        getBrokerMetricsFromJmx2nullTest();
        // JMX returns metrics -> ids echoed back
        getBrokerMetricsFromJmx2SuccessTest();
    }

    private void getBrokerMetricsFromJmx2ParamIllegalTest() {
        BrokerMetrics result1 = brokerService.getBrokerMetricsFromJmx(null, 1, 200);
        Assert.assertNull(result1);

        BrokerMetrics result3 = brokerService.getBrokerMetricsFromJmx(1L, 1, null);
        Assert.assertNull(result3);
    }

    private void getBrokerMetricsFromJmx2nullTest() {
        BrokerMetrics result1 = brokerService.getBrokerMetricsFromJmx(1L, 1, 200);
        Assert.assertNull(result1);
    }

    private void getBrokerMetricsFromJmx2SuccessTest() {
        Mockito.when(jmxService.getBrokerMetrics(
                Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt())).thenReturn(new BrokerMetrics(1L, 1));
        BrokerMetrics result1 = brokerService.getBrokerMetricsFromJmx(1L, 1, 200);
        Assert.assertNotNull(result1);
        Assert.assertEquals(Optional.ofNullable(result1.getClusterId()), Optional.ofNullable(1L));
        Assert.assertEquals(Optional.ofNullable(result1.getBrokerId()), Optional.ofNullable(1));
    }

    @Test(description = "获取BrokerMetrics信息测试,多个broker")
    public void getBrokerMetricsFromJmxWithMoreBrokersTest() {
        Mockito.when(jmxService.getBrokerMetrics(
                Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt())).thenReturn(new BrokerMetrics(1L, 1));
        Set<Integer> set = new HashSet<>();
        set.add(1);
        set.add(2);
        set.add(3);
        List<BrokerMetrics> result = brokerService.getBrokerMetricsFromJmx(1L, set, 200);
        Assert.assertNotNull(result);
        // every returned metric must belong to the requested cluster
        Assert.assertTrue(result.stream().allMatch(brokerMetric ->
                brokerMetric.getClusterId().equals(1L)));
    }

    @Test(description = "获取Broker列表信息")
    public void getBrokerOverviewListTest() {
        // brokerIdSet == null means "all brokers"
        getBrokerOverviewList2BrokerIdSetIsNullTest();
        // explicit brokerIdSet filters the overview list
        getBrokerOverviewList2BrokerIdSetNotNullTest();
    }

    private void getBrokerOverviewList2BrokerIdSetIsNullTest() {
        List<BrokerOverviewDTO> brokerOverviewList = brokerService.getBrokerOverviewList(1L, null);
        Assert.assertFalse(brokerOverviewList.isEmpty());
        Assert.assertTrue(brokerOverviewList.stream().allMatch(brokerOverviewDTO ->
                brokerOverviewDTO.getPort().equals(9093)));
    }

    private void getBrokerOverviewList2BrokerIdSetNotNullTest() {
        Set<Integer> set = new HashSet<>();
        set.add(1);
        set.add(2);
        List<BrokerOverviewDTO> brokerOverviewList = brokerService.getBrokerOverviewList(1L, set);
        Assert.assertFalse(brokerOverviewList.isEmpty());
        Assert.assertTrue(brokerOverviewList.stream().allMatch(brokerOverviewDTO ->
                brokerOverviewDTO.getPort().equals(9093)));
    }
}
package com.xiaojukeji.kafka.manager.service.service;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.expert.TopicExpiredConfig;
import com.xiaojukeji.kafka.manager.common.entity.dto.config.ConfigDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ConfigDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author wyc
* @date 2021/12/9
*/
public class ConfigServiceTest extends BaseTest {

    @Autowired
    private ConfigService configService;

    /** DataProvider: a fresh ConfigDTO (key1/value1) for parameterized tests. */
    @DataProvider(name = "configDTO")
    public Object[][] provideConfigDO() {
        ConfigDTO dto = new ConfigDTO();
        dto.setConfigKey("key1");
        dto.setConfigValue("value1");
        dto.setConfigDescription("test");
        return new Object[][] {{dto}};
    }

    /** Plain factory variant of the same fixture for tests that do not use the provider. */
    public ConfigDTO getConfigDTO() {
        ConfigDTO dto = new ConfigDTO();
        dto.setConfigKey("key1");
        dto.setConfigValue("value1");
        dto.setConfigDescription("test");
        return dto;
    }

    @Test(dataProvider = "configDTO")
    public void insertTest(ConfigDTO dto) {
        // 插入时,MySQL错误 (null key is rejected at the DB layer)
        insert2MySQLErrorTest(dto);
        // 插入成功测试
        insert2SuccessTest(dto);
        // 插入时,资源已存在测试 (same key again -> RESOURCE_ALREADY_EXISTED)
        insert2ResourceExistedTest(dto);
    }

    private void insert2SuccessTest(ConfigDTO dto) {
        // restore the key that insert2MySQLErrorTest nulled out
        dto.setConfigKey("key1");
        ResultStatus result = configService.insert(dto);
        Assert.assertEquals(result, ResultStatus.SUCCESS);
    }

    private void insert2ResourceExistedTest(ConfigDTO dto2) {
        ResultStatus result2 = configService.insert(dto2);
        Assert.assertEquals(result2, ResultStatus.RESOURCE_ALREADY_EXISTED);
    }

    private void insert2MySQLErrorTest(ConfigDTO dto) {
        dto.setConfigKey(null);
        ResultStatus result = configService.insert(dto);
        Assert.assertEquals(result, ResultStatus.MYSQL_ERROR);
    }

    @Test
    public void deleteByKeyTest() {
        // deleteByKey, key是null -> PARAM_ILLEGAL
        deleteByKey2NullTest();
        // deleteByKey, 配置不存在测试
        deleteByKey2ConfigNotExistTest();
        // deleteByKey, 成功测试
        deleteByKey2SuccessTest();
    }

    private void deleteByKey2NullTest() {
        ResultStatus result = configService.deleteByKey(null);
        Assert.assertEquals(result, ResultStatus.PARAM_ILLEGAL);
    }

    private void deleteByKey2SuccessTest() {
        ConfigDTO dto = getConfigDTO();
        ResultStatus insertResult = configService.insert(dto);
        Assert.assertEquals(insertResult, ResultStatus.SUCCESS);

        ResultStatus deleteResult = configService.deleteByKey(dto.getConfigKey());
        Assert.assertEquals(deleteResult, ResultStatus.SUCCESS);
    }

    private void deleteByKey2ConfigNotExistTest() {
        ResultStatus result = configService.deleteByKey("key");
        Assert.assertEquals(result, ResultStatus.CONFIG_NOT_EXIST);
    }

    @Test(dataProvider = "configDTO")
    public void updateByKeyTest(ConfigDTO dto) {
        configService.insert(dto);
        // updateByKey, 成功测试
        updateByKey2SuccessTest(dto);
        // updateByKey, 配置不存在测试
        updateByKey2ConfigNotExistTest(dto);
    }

    private void updateByKey2SuccessTest(ConfigDTO dto) {
        dto.setConfigValue("newValue");
        ResultStatus updateResult = configService.updateByKey(dto);
        Assert.assertEquals(updateResult, ResultStatus.SUCCESS);
    }

    // updateByKey, 配置不存在测试
    // FIX(review): this private helper previously carried its own
    // @Test(dataProvider = "configDTO") annotation although it is invoked directly from
    // updateByKeyTest; private @Test methods are not valid TestNG tests and the annotation
    // was inconsistent with every other helper here, so it was removed.
    private void updateByKey2ConfigNotExistTest(ConfigDTO dto) {
        dto.setConfigKey("newKey");
        ResultStatus updateResult = configService.updateByKey(dto);
        Assert.assertEquals(updateResult, ResultStatus.CONFIG_NOT_EXIST);
    }

    @Test(dataProvider = "configDTO")
    public void updateByKeyTest2(ConfigDTO dto) {
        configService.insert(dto);
        // updateByKey重载方法,成功测试
        updateByKey2SuccessTest1(dto);
        // updateByKey重载方法,资源不存在测试
        updateByKey2ConfigNotExistTest1(dto);
    }

    private void updateByKey2SuccessTest1(ConfigDTO dto) {
        String key = dto.getConfigKey();
        String value = "newValue";
        Assert.assertEquals(configService.updateByKey(key, value), ResultStatus.SUCCESS);
    }

    private void updateByKey2ConfigNotExistTest1(ConfigDTO dto) {
        Assert.assertEquals(configService.updateByKey("key2", "newValue"), ResultStatus.CONFIG_NOT_EXIST);
    }

    @Test(dataProvider = "configDTO")
    public void getByKeyTest(ConfigDTO dto) {
        configService.insert(dto);
        // getByKey, 成功测试
        getByKey2SuccessTest(dto);
        // getByKey, 获取失败测试
        getByKey2NullTest();
    }

    private void getByKey2SuccessTest(ConfigDTO dto) {
        ConfigDO result = configService.getByKey(dto.getConfigKey());
        Assert.assertNotNull(result);
        Assert.assertTrue(result.getConfigKey().equals(dto.getConfigKey()) &&
                result.getConfigValue().equals(dto.getConfigValue()) &&
                result.getConfigDescription().equals(dto.getConfigDescription()));
    }

    private void getByKey2NullTest() {
        Assert.assertNull(configService.getByKey("key2"));
    }

    @Test(dataProvider = "configDTO")
    public void getByKeyTest2(ConfigDTO dto) {
        // 需要用到TopicExpiredConfig类 (value is a serialized TopicExpiredConfig)
        TopicExpiredConfig config = getTopicExpiredConfig();
        dto.setConfigValue(JSON.toJSONString(config));
        Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
        // getByKey, 成功测试
        getByKey2SuccessTest1(dto);
        // getByKey, 返回null测试
        getByKey2NullTest1(dto);
    }

    private TopicExpiredConfig getTopicExpiredConfig() {
        TopicExpiredConfig config = new TopicExpiredConfig();
        List<Long> list = new ArrayList<>();
        list.add(1L);
        list.add(2L);
        config.setIgnoreClusterIdList(list);
        return config;
    }

    private void getByKey2SuccessTest1(ConfigDTO dto) {
        TopicExpiredConfig result = configService.getByKey(dto.getConfigKey(), TopicExpiredConfig.class);
        Assert.assertEquals(result.toString(), getTopicExpiredConfig().toString());
    }

    private void getByKey2NullTest1(ConfigDTO dto) {
        Assert.assertNull(configService.getByKey("key", TopicExpiredConfig.class));
    }

    @Test(dataProvider = "configDTO")
    public void getArrayByKeyTest(ConfigDTO dto) {
        dto.setConfigValue(JSON.toJSONString(getStringArray()));
        Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
        // getArrayByKey 成功测试
        getArrayByKey2SuccessTest(dto);
        // getArrayByKey 返回null测试
        getArrayByKey2NullTest();
    }

    private List<String> getStringArray() {
        List<String> list = new ArrayList<>();
        list.add("value1");
        list.add("value2");
        list.add("value3");
        return list;
    }

    private void getArrayByKey2SuccessTest(ConfigDTO dto) {
        List<String> result = configService.getArrayByKey(dto.getConfigKey(), String.class);
        Assert.assertEquals(result, getStringArray());
    }

    private void getArrayByKey2NullTest() {
        Assert.assertNull(configService.getArrayByKey(null, String.class));
    }

    @Test(dataProvider = "configDTO", description = "getLongValue, 成功测试")
    public void getLongValue2SuccessTest(ConfigDTO dto) {
        dto.setConfigValue("100");
        Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
        Assert.assertEquals(configService.getLongValue(dto.getConfigKey(), 0L), Long.valueOf(dto.getConfigValue()));
    }

    @Test(description = "getLongValue, 不存在key,返回默认值测试")
    public void getLongValue2NotExistTest() {
        Assert.assertEquals(configService.getLongValue("key", 100L), Long.valueOf(100L));
    }

    @Test(dataProvider = "configDTO", description = "getLongValue, 存在key但是value是null")
    public void getLongValue2ValueIsNull(ConfigDTO dto) {
        dto.setConfigValue(null);
        Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
        Assert.assertEquals(configService.getLongValue(dto.getConfigKey(), 100L), Long.valueOf(100L));
    }

    @Test(dataProvider = "configDTO", description = "listAll, 成功测试")
    public void listAll2SuccessTest(ConfigDTO dto) {
        Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
        List<ConfigDO> result = configService.listAll();
        Assert.assertNotNull(result);
        List<ConfigDTO> list = new ArrayList<>();
        list.add(dto);
        // 判断key字段是否相同 (compare by key, then by value)
        Assert.assertEquals(result.stream().map(ConfigDO::getConfigKey).collect(Collectors.toList()),
                list.stream().map(ConfigDTO::getConfigKey).collect(Collectors.toList()));
        Assert.assertEquals(result.stream().map(ConfigDO::getConfigValue).collect(Collectors.toList()),
                list.stream().map(ConfigDTO::getConfigValue).collect(Collectors.toList()));
    }

    public CreateTopicConfig getCreateTopicConfig() {
        return new CreateTopicConfig();
    }

    /** Fixture: DTO whose key is the inner create-topic config key, value a serialized CreateTopicConfig. */
    public ConfigDTO getConfigDTO1() {
        ConfigDTO dto = new ConfigDTO();
        dto.setConfigKey(TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY);
        dto.setConfigValue(JSON.toJSONString(getCreateTopicConfig()));
        dto.setConfigDescription("test");
        return dto;
    }

    @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, config表中不存在INNER_CREATE_TOPIC_CONFIG_KEY" +
            "对应的记录,返回默认值测试")
    public void getAutoPassedTopicApplyOrderNumPerTask2NotExistTest() {
        Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK);
    }

    @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中,记录的maxPassedOrderNumPerTask属性为null测试")
    public void getAutoPassedTopicApplyOrderNumPerTask2NullTest() {
        configService.insert(getConfigDTO1());
        Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK);
    }

    @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中,记录的maxPassedOrderNumPerTask" +
            "比TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK大时测试")
    public void getAutoPassedTopicApplyOrderNumPerTask2BiggerMaxTest() {
        ConfigDTO configDTO = getConfigDTO1();
        CreateTopicConfig createTopicConfig = getCreateTopicConfig();
        // a configured value above the ceiling must be clamped to MAX_PASSED_ORDER_NUM_PER_TASK
        createTopicConfig.setMaxPassedOrderNumPerTask(TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK + 10);
        configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
        configService.insert(configDTO);
        Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK);
    }

    @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中,记录的maxPassedOrderNumPerTask" +
            "比TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK小时测试")
    public void getAutoPassedTopicApplyOrderNumPerTask2SmallerMaxTest() {
        ConfigDTO configDTO = getConfigDTO1();
        CreateTopicConfig createTopicConfig = getCreateTopicConfig();
        int val = TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK - 10;
        createTopicConfig.setMaxPassedOrderNumPerTask(val);
        configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
        configService.insert(configDTO);
        Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), Integer.valueOf(val));
    }

    /** Fixture: a per-cluster create-topic config with the library's defaults. */
    public CreateTopicElemConfig getCreateTopicElemConfig(Long clusterId) {
        CreateTopicElemConfig config = new CreateTopicElemConfig();
        config.setClusterId(clusterId);
        config.setBrokerIdList(new ArrayList<>());
        config.setRegionIdList(new ArrayList<>());
        config.setPartitionNum(TopicCreationConstant.DEFAULT_PARTITION_NUM);
        config.setReplicaNum(TopicCreationConstant.DEFAULT_REPLICA);
        config.setRetentionTimeUnitHour(TopicCreationConstant.DEFAULT_RETENTION_TIME_UNIT_HOUR);
        config.setAutoExecMaxPeakBytesInUnitB(TopicCreationConstant.AUTO_EXEC_MAX_BYTES_IN_UNIT_B);
        return config;
    }

    @Test(description = "getCreateTopicConfig, config表中不存在key时测试")
    public void getCreateTopicConfig2NotExistKeyTest() {
        CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(10L);
        Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), createTopicElemConfig.toString());
    }

    @Test(description = "getCreateTopicConfig, value中存在和clusterId一致的记录")
    public void getCreateTopicConfig2ExistTest() {
        CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(10L);
        createTopicElemConfig.setReplicaNum(4);
        List<CreateTopicElemConfig> list = new ArrayList<>();
        list.add(createTopicElemConfig);
        CreateTopicConfig createTopicConfig = getCreateTopicConfig();
        createTopicConfig.setConfigList(list);

        ConfigDTO configDTO = getConfigDTO1();
        configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
        configService.insert(configDTO);
        Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), createTopicElemConfig.toString());
    }

    @Test(description = "getCreateTopicConfig, value中不存在和clusterId一致的记录")
    public void getCreateTopicConfig2NotExitConfigEleTest() {
        CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(11L);
        createTopicElemConfig.setReplicaNum(4);
        List<CreateTopicElemConfig> list = new ArrayList<>();
        list.add(createTopicElemConfig);
        CreateTopicConfig createTopicConfig = getCreateTopicConfig();
        createTopicConfig.setConfigList(list);

        ConfigDTO configDTO = getConfigDTO1();
        configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
        configService.insert(configDTO);
        // no per-cluster entry for 10L -> defaults are returned
        Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), getCreateTopicElemConfig(10L).toString());
    }

    /** Fixture: DTO keyed by KAFKA_CLUSTER_DO_CONFIG_KEY; value set by each test. */
    public ConfigDTO getConfigDTO2() {
        ConfigDTO dto = new ConfigDTO();
        dto.setConfigKey(ConfigConstant.KAFKA_CLUSTER_DO_CONFIG_KEY);
        dto.setConfigDescription("test");
        return dto;
    }

    public ConfigDO getConfigDO() {
        return new ConfigDO();
    }

    @Test(description = "getClusterDO, config表中不存在ConfigConstant.KAFKA_CLUSTER_DO_CONFIG_KEY这个key")
    public void getClusterDO2NotExistKeyTest() {
        Assert.assertNull(configService.getClusterDO(10L));
    }

    @Test(description = "getClusterDO, config表中key对应的value没法解析成ConfigDO测试")
    public void getClusterDO2ParseFailTest() {
        ConfigDTO configDTO2 = getConfigDTO2();
        configDTO2.setConfigValue("value");
        configService.insert(configDTO2);
        Assert.assertNull(configService.getClusterDO(10L));
    }

    /** Fixture: two clusters (ids 10 and 20) to serialize into the config value. */
    public List<ClusterDO> getClusterDOList() {
        ClusterDO clusterDO1 = new ClusterDO();
        clusterDO1.setId(10L);
        clusterDO1.setClusterName("test1");
        ClusterDO clusterDO2 = new ClusterDO();
        clusterDO2.setId(20L);
        clusterDO2.setClusterName("test2");
        List<ClusterDO> list = new ArrayList<>();
        list.add(clusterDO1);
        list.add(clusterDO2);
        return list;
    }

    @Test(description = "getClusterDO, 成功查到测试")
    public void getClusterDO2SuccessTest() {
        ConfigDTO configDTO2 = getConfigDTO2();
        List<ClusterDO> clusterDOList = getClusterDOList();
        configDTO2.setConfigValue(JSON.toJSONString(clusterDOList));
        configService.insert(configDTO2);

        ClusterDO clusterDO = new ClusterDO();
        clusterDO.setId(20L);
        clusterDO.setClusterName("test2");
        Assert.assertEquals(configService.getClusterDO(20L), clusterDO);
    }
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetLocationEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumeDetailDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroup;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupSummary;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* 测试消费组消费情况需要保证集群中存在消费组
* @author xuguang
* @Date 2021/12/23
*/
public class ConsumerServiceTest extends BaseTest {
    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;

    /**
     * Topic with 1 partition and replication factor 1, hosted on broker 1
     * (the cluster has three brokers: 1, 2 and 3).
     */
    @Value("${test.topic.name1}")
    private String REAL_TOPIC1_IN_ZK;

    /**
     * Topic with 2 partitions and replication factor 3, spread over brokers 1, 2 and 3.
     */
    private final static String REAL_TOPIC2_IN_ZK = "xgTest";

    // topic name that does not exist in the cluster
    private final static String INVALID_TOPIC = "xxxxxx";

    @Value("${test.consumer-group}")
    private String REAL_CONSUMER_GROUP_NAME;

    // consumer group name that does not exist in the cluster
    private final static String INVALID_CONSUMER_GROUP_NAME = "xxxxxxxx";

    @Value("${test.phyCluster.name}")
    private String REAL_PHYSICAL_CLUSTER_NAME;

    @Value("${test.ZK.address}")
    private String ZOOKEEPER_ADDRESS;

    @Value("${test.ZK.bootstrap-servers}")
    private String BOOTSTRAP_SERVERS;

    // SASL_PLAINTEXT/PLAIN client properties (JSON) used to reach the secured test cluster;
    // NOTE(review): contains hard-coded test credentials — keep out of production configs
    private String SECURITY_PROTOCOL = "{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }";

    @Autowired
    private ConsumerService consumerService;
private ClusterDO getClusterDO() {
ClusterDO clusterDO = new ClusterDO();
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
clusterDO.setClusterName(REAL_PHYSICAL_CLUSTER_NAME);
clusterDO.setZookeeper(ZOOKEEPER_ADDRESS);
clusterDO.setBootstrapServers(BOOTSTRAP_SERVERS);
clusterDO.setSecurityProperties(SECURITY_PROTOCOL);
clusterDO.setStatus(1);
clusterDO.setGmtCreate(new Date());
clusterDO.setGmtModify(new Date());
return clusterDO;
}
private ConsumerGroup getConsumerGroup() {
return new ConsumerGroup(
REAL_CLUSTER_ID_IN_MYSQL,
REAL_CONSUMER_GROUP_NAME,
OffsetLocationEnum.BROKER);
}
private PartitionOffsetDTO getPartitionOffsetDTO() {
PartitionOffsetDTO partitionOffsetDTO = new PartitionOffsetDTO();
partitionOffsetDTO.setOffset(0L);
partitionOffsetDTO.setPartitionId(0);
return partitionOffsetDTO;
}
    // @Test(description = "测试获取消费组列表")
    // Disabled: depends on a scheduled task that cannot currently run in the test environment.
    public void getConsumerGroupListTest() {
        List<ConsumerGroup> consumerGroupList = consumerService.getConsumerGroupList(REAL_CLUSTER_ID_IN_MYSQL);
        Assert.assertFalse(consumerGroupList.isEmpty());
        // every group returned must belong to the queried cluster
        Assert.assertTrue(consumerGroupList.stream().allMatch(consumerGroup ->
                consumerGroup.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL)));
    }
// @Test(description = "测试查询消费Topic的消费组")
// 因定时任务暂时无法跑通
public void getConsumerGroupListWithTopicTest() {
List<ConsumerGroup> consumerGroupList = consumerService.getConsumerGroupList(
REAL_CLUSTER_ID_IN_MYSQL,
REAL_TOPIC1_IN_ZK
);
Assert.assertFalse(consumerGroupList.isEmpty());
Assert.assertTrue(consumerGroupList.stream().allMatch(consumerGroup ->
consumerGroup.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL)));
}
// @Test(description = "测试获取消费Topic的消费组概要信息")
// 因定时任务暂时无法跑通
public void getConsumerGroupSummariesTest() {
// result is empty
getConsumerGroupSummaries2EmptyTest();
// result is not empty
getConsumerGroupSummaries2NotEmptyTest();
}
private void getConsumerGroupSummaries2EmptyTest() {
List<ConsumerGroupSummary> consumerGroupSummaries = consumerService.getConsumerGroupSummaries(
REAL_CLUSTER_ID_IN_MYSQL,
INVALID_TOPIC
);
Assert.assertTrue(consumerGroupSummaries.isEmpty());
}
private void getConsumerGroupSummaries2NotEmptyTest() {
List<ConsumerGroupSummary> consumerGroupSummaries = consumerService.getConsumerGroupSummaries(
REAL_CLUSTER_ID_IN_MYSQL,
REAL_TOPIC1_IN_ZK
);
Assert.assertFalse(consumerGroupSummaries.isEmpty());
}
@Test(description = "测试查询消费详情")
public void getConsumeDetail() {
// result is empty
getConsumeDetail2Empty();
// result is not empty
getConsumeDetail2NotEmpty();
}
private void getConsumeDetail2Empty() {
ClusterDO clusterDO = getClusterDO();
List<ConsumeDetailDTO> consumeDetail1 =
consumerService.getConsumeDetail(clusterDO, INVALID_TOPIC, null);
Assert.assertTrue(consumeDetail1.isEmpty());
ConsumerGroup consumerGroup = getConsumerGroup();
consumerGroup.setOffsetStoreLocation(null);
List<ConsumeDetailDTO> consumeDetail2 =
consumerService.getConsumeDetail(clusterDO, REAL_TOPIC1_IN_ZK, consumerGroup);
Assert.assertTrue(consumeDetail2.isEmpty());
}
private void getConsumeDetail2NotEmpty() {
ClusterDO clusterDO = getClusterDO();
ConsumerGroup consumerGroup = getConsumerGroup();
List<ConsumeDetailDTO> consumeDetail1 =
consumerService.getConsumeDetail(clusterDO, REAL_TOPIC1_IN_ZK, consumerGroup);
Assert.assertFalse(consumeDetail1.isEmpty());
}
@Test(description = "测试获取消费组消费的Topic列表")
public void getConsumerGroupConsumedTopicListTest() {
// result is empty
getConsumerGroupConsumedTopicList2Empty();
// result is not empty
// 因定时任务暂时无法跑通
// getConsumerGroupConsumedTopicList2NotEmpty();
}
private void getConsumerGroupConsumedTopicList2Empty() {
List<String> list = consumerService.getConsumerGroupConsumedTopicList(
null,
null,
null);
Assert.assertTrue(list.isEmpty());
}
private void getConsumerGroupConsumedTopicList2NotEmpty() {
List<String> list = consumerService.getConsumerGroupConsumedTopicList(
REAL_CLUSTER_ID_IN_MYSQL,
REAL_CONSUMER_GROUP_NAME,
"broker");
Assert.assertFalse(list.isEmpty());
}
@Test(description = "测试获取消费者offset")
public void getConsumerOffsetTest() {
// result is null
getConsumerOffset2NullTest();
// result is not null
getConsumerOffset2NotNullTest();
}
private void getConsumerOffset2NullTest() {
Map<Integer, Long> consumerOffset1 = consumerService.getConsumerOffset(null, null, null);
Assert.assertNull(consumerOffset1);
ClusterDO clusterDO = getClusterDO();
ConsumerGroup consumerGroup = getConsumerGroup();
consumerGroup.setOffsetStoreLocation(null);
Map<Integer, Long> consumerOffset2 = consumerService.getConsumerOffset(
clusterDO,
REAL_TOPIC1_IN_ZK,
consumerGroup
);
Assert.assertNull(consumerOffset2);
}
private void getConsumerOffset2NotNullTest() {
ClusterDO clusterDO = getClusterDO();
ConsumerGroup consumerGroup = getConsumerGroup();
Map<Integer, Long> consumerOffset = consumerService.getConsumerOffset(
clusterDO,
REAL_TOPIC1_IN_ZK,
consumerGroup
);
Assert.assertNotNull(consumerOffset);
Assert.assertFalse(consumerOffset.isEmpty());
}
@Test(description = "测试获取每个集群消费组的个数")
public void getConsumerGroupNumMapTest() {
ClusterDO clusterDO = getClusterDO();
Map<Long, Integer> map = consumerService.getConsumerGroupNumMap(Arrays.asList(clusterDO));
Assert.assertFalse(map.isEmpty());
Assert.assertTrue(clusterDO.getId() >= 0);
}
@Test(description = "验证消费组是否存在")
public void checkConsumerGroupExistTest() {
// 不存在
checkConsumerGroupExist2FalseTest();
// 存在
// 因定时任务暂时无法跑通
// checkConsumerGroupExist2TrueTest();
}
private void checkConsumerGroupExist2FalseTest() {
boolean result = consumerService.checkConsumerGroupExist(
OffsetLocationEnum.BROKER,
REAL_CLUSTER_ID_IN_MYSQL,
REAL_TOPIC1_IN_ZK,
INVALID_CONSUMER_GROUP_NAME
);
Assert.assertFalse(result);
}
private void checkConsumerGroupExist2TrueTest() {
boolean result = consumerService.checkConsumerGroupExist(
OffsetLocationEnum.BROKER,
REAL_CLUSTER_ID_IN_MYSQL,
REAL_TOPIC1_IN_ZK,
REAL_CONSUMER_GROUP_NAME
);
Assert.assertTrue(result);
}
@Test(description = "测试重置offset")
public void resetConsumerOffsetTest() {
ClusterDO clusterDO = getClusterDO();
ConsumerGroup consumerGroup = getConsumerGroup();
PartitionOffsetDTO partitionOffsetDTO1 = getPartitionOffsetDTO();
List<Result> results = consumerService.resetConsumerOffset(
clusterDO,
REAL_TOPIC1_IN_ZK,
consumerGroup,
Arrays.asList(partitionOffsetDTO1)
);
Assert.assertFalse(results.isEmpty());
Assert.assertTrue(results.stream().allMatch(result ->
result.getCode() == ResultStatus.SUCCESS.getCode()));
}
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.expert.RegionTopicHotConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.expert.TopicExpiredConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.expert.TopicInsufficientPartitionConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.expert.TopicInsufficientPartition;
import com.xiaojukeji.kafka.manager.common.entity.ao.expert.TopicRegionHot;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.validation.constraints.AssertTrue;
import java.util.*;
/**
* @author wyc
* @date 2021/12/27
*/
public class ExpertServiceTest extends BaseTest {
@Value("${test.phyCluster.id}")
private Long REAL_CLUSTER_ID_IN_MYSQL;
@Value("${test.topic.name4}")
private String REAL_TOPIC_IN_ZK;
private final static Set<Integer> REAL_BROKER_ID_SET = new HashSet<>();
private String metrics = "{\"TotalFetchRequestsPerSecFiveMinuteRate\":4.132236103122026,\"BytesRejectedPerSecFiveMinuteRate\":0.0,\"TotalFetchRequestsPerSecFifteenMinuteRate\":1.5799208507558833,\"ProduceTotalTimeMs98thPercentile\":0.0,\"MessagesInPerSecMeanRate\":0.0,\"ProduceTotalTimeMs75thPercentile\":0.0,\"ProduceTotalTimeMs99thPercentile\":0.0,\"TotalProduceRequestsPerSecOneMinuteRate\":0.0,\"FailedProduceRequestsPerSecFifteenMinuteRate\":0.0,\"BytesInPerSecMeanRate\":0.0,\"TotalProduceRequestsPerSecFiveMinuteRate\":0.0,\"FetchConsumerTotalTimeMs999thPercentile\":0.0,\"FetchConsumerTotalTimeMs98thPercentile\":0.0,\"FetchConsumerTotalTimeMsMean\":0.0,\"FetchConsumerTotalTimeMs99thPercentile\":0.0,\"FailedFetchRequestsPerSecFifteenMinuteRate\":0.0,\"MessagesInPerSecFiveMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentOneMinuteRate\":0.999221766772746,\"ProduceTotalTimeMsMean\":0.0,\"BytesInPerSecFiveMinuteRate\":0.0,\"FailedProduceRequestsPerSecMeanRate\":0.0,\"FailedFetchRequestsPerSecMeanRate\":0.0,\"FailedProduceRequestsPerSecFiveMinuteRate\":0.0,\"BytesOutPerSecFifteenMinuteRate\":0.0,\"BytesInPerSecOneMinuteRate\":100.0,\"BytesOutPerSecFiveMinuteRate\":0.0,\"HealthScore\":90,\"FailedFetchRequestsPerSecOneMinuteRate\":0.0,\"MessagesInPerSecOneMinuteRate\":0.0,\"BytesRejectedPerSecFifteenMinuteRate\":0.0,\"FailedFetchRequestsPerSecFiveMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentFiveMinuteRate\":0.999803118809842,\"BytesOutPerSecOneMinuteRate\":0.0,\"ResponseQueueSizeValue\":0,\"MessagesInPerSecFifteenMinuteRate\":0.0,\"TotalProduceRequestsPerSecMeanRate\":0.0,\"BytesRejectedPerSecMeanRate\":0.0,\"TotalFetchRequestsPerSecMeanRate\":1.2674449706628523,\"NetworkProcessorAvgIdlePercentValue\":1.0,\"TotalFetchRequestsPerSecOneMinuteRate\":10.457259856316893,\"BytesInPerSecFifteenMinuteRate\":0.0,\"BytesOutPerSecMeanRate\":0.0,\"TotalProduceRequestsPerSecFifteenMinuteRate\":0.0,\"FetchConsumerTotalTimeMs50thPercentile\":0.0,\"RequestHandlerAvgIdlePercentFifteenMinu
teRate\":0.9999287809186348,\"FetchConsumerTotalTimeMs95thPercentile\":0.0,\"FailedProduceRequestsPerSecOneMinuteRate\":0.0,\"CreateTime\":1638792321071,\"FetchConsumerTotalTimeMs75thPercentile\":0.0,\"ProduceTotalTimeMs999thPercentile\":0.0,\"RequestQueueSizeValue\":0,\"ProduceTotalTimeMs50thPercentile\":0.0,\"BytesRejectedPerSecOneMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentMeanRate\":0.9999649184090593,\"ProduceTotalTimeMs95thPercentile\":0.0}";
static {
REAL_BROKER_ID_SET.add(1);
REAL_BROKER_ID_SET.add(2);
}
@Autowired
@InjectMocks
private ExpertService expertService;
@Mock
private ConfigService configService;
@Mock
private RegionService regionService;
@Mock
private ClusterService clusterService;
@Mock
private TopicManagerService topicManagerService;
@Autowired
private TopicMetricsDao topicMetricsDao;
@BeforeMethod
public void init() {
MockitoAnnotations.initMocks(this);
}
private ClusterDO getClusterDO() {
ClusterDO clusterDO = new ClusterDO();
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
return clusterDO;
}
private RegionTopicHotConfig getRegionTopicHotConfig() {
RegionTopicHotConfig config = new RegionTopicHotConfig();
config.setMaxDisPartitionNum(-1);// 为了通过检测
// ignoreClusterIdList字段不用设置
config.setMinTopicBytesInUnitB(-1L);// 为了通过检测
return config;
}
private TopicRegionHot getTopicRegionHot() {
ClusterDO clusterDO = getClusterDO();
TopicRegionHot hotTopic = new TopicRegionHot(clusterDO, REAL_TOPIC_IN_ZK, null, new HashMap<>());
return hotTopic;
}
private Map<String, Set<Integer>> getTopicNameRegionBrokerIdMap() {
Map<String, Set<Integer>> map = new HashMap<>();
map.put(REAL_TOPIC_IN_ZK, REAL_BROKER_ID_SET);
return map;
}
private Map<String, List<Double>> getMaxAvgBytesInMap() {
Map<String, List<Double>> map = new HashMap<>();
map.put(REAL_TOPIC_IN_ZK, new ArrayList<>());
return map;
}
private TopicInsufficientPartitionConfig getTopicInsufficientPartitionConfig() {
TopicInsufficientPartitionConfig config = new TopicInsufficientPartitionConfig();
config.setMinTopicBytesInUnitB(-1L);// 为了通过测试
config.setMaxBytesInPerPartitionUnitB(10L);// 为了通过测试
return config;
}
private TopicExpiredDO getTopicExpiredDO() {
TopicExpiredDO topicExpiredDO = new TopicExpiredDO();
topicExpiredDO.setTopicName(REAL_TOPIC_IN_ZK);
topicExpiredDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
return topicExpiredDO;
}
private TopicExpiredConfig getTopicExpiredConfig() {
TopicExpiredConfig config = new TopicExpiredConfig();
config.setIgnoreClusterIdList(new ArrayList<>());
return config;
}
private TopicMetricsDO getTopicMetricsDO() {
TopicMetricsDO topicMetricsDO = new TopicMetricsDO();
topicMetricsDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
topicMetricsDO.setTopicName(REAL_TOPIC_IN_ZK);
topicMetricsDO.setMetrics(metrics);
return topicMetricsDO;
}
private TopicInsufficientPartition getTopicInsufficientPartition() {
ClusterDO clusterDO = getClusterDO();
return new TopicInsufficientPartition(
clusterDO,
REAL_TOPIC_IN_ZK,
null,
null,
null,
null,
new ArrayList<>(REAL_BROKER_ID_SET)
);
}
@Test
public void getRegionHotTopicsTest() {
// 返回空集合测试
getRegionHotTopics2EmptyTest();
getRegionHotTopics2SuccessTest();
}
private void getRegionHotTopics2EmptyTest() {
Mockito.when(configService.getByKey(Mockito.anyString(), Mockito.any())).thenReturn(null);
Assert.assertTrue(expertService.getRegionHotTopics().isEmpty());
}
private void getRegionHotTopics2SuccessTest() {
RegionTopicHotConfig config = getRegionTopicHotConfig();
Mockito.when(configService.getByKey(Mockito.anyString(), Mockito.any())).thenReturn(config);
ClusterDO clusterDO = getClusterDO();
List<ClusterDO> clusterDOList = new ArrayList<>();
clusterDOList.add(clusterDO);
Mockito.when(clusterService.list()).thenReturn(clusterDOList);
Map<String, Set<Integer>> map = getTopicNameRegionBrokerIdMap();
Mockito.when(regionService.getTopicNameRegionBrokerIdMap(Mockito.anyLong())).thenReturn(map);
Assert.assertTrue(expertService.getRegionHotTopics().stream().allMatch(hotTopic -> hotTopic.getClusterDO().getId().equals(clusterDO.getId())));
}
@Test
public void getPartitionInsufficientTopicsTest() {
// 返回空集合测试
getPartitionInsufficientTopic2EmptyTest();
// 成功测试
getPartitionInsufficientTopicsSuccessTest();
}
private void getPartitionInsufficientTopic2EmptyTest() {
TopicInsufficientPartitionConfig config = getTopicInsufficientPartitionConfig();
Mockito.when(configService.getByKey(Mockito.anyString(), Mockito.any())).thenReturn(config);
Mockito.when(clusterService.list()).thenReturn(new ArrayList<>());
Assert.assertTrue(expertService.getPartitionInsufficientTopics().isEmpty());
}
private void getPartitionInsufficientTopicsSuccessTest() {
// 先向数据库中插入
List<TopicMetricsDO> topicMetricsDOList = new ArrayList<>();
topicMetricsDOList.add(getTopicMetricsDO());
topicMetricsDao.batchAdd(topicMetricsDOList);
TopicInsufficientPartitionConfig config = getTopicInsufficientPartitionConfig();
Mockito.when(configService.getByKey(Mockito.anyString(), Mockito.any())).thenReturn(config);
ClusterDO clusterDO = getClusterDO();
List<ClusterDO> clusterDOList = new ArrayList<>();
clusterDOList.add(clusterDO);
Mockito.when(clusterService.list()).thenReturn(clusterDOList);
Map<String, Set<Integer>> map = getTopicNameRegionBrokerIdMap();
Mockito.when(regionService.getTopicNameRegionBrokerIdMap(Mockito.anyLong())).thenReturn(map);
Map<String, List<Double>> maxAvgBytesInMap = getMaxAvgBytesInMap();
Mockito.when(topicManagerService.getTopicMaxAvgBytesIn(Mockito.anyLong(), Mockito.anyInt(), Mockito.anyDouble())).thenReturn(maxAvgBytesInMap);
TopicInsufficientPartition expectResult = getTopicInsufficientPartition();
Assert.assertTrue(expertService.getPartitionInsufficientTopics().stream().allMatch(topic -> topic.getClusterDO().getId().equals(expectResult.getClusterDO().getId()) &&
topic.getTopicName().equals(expectResult.getTopicName())));
}
@Test
public void getExpiredTopicsTest() {
// 返回空集合测试
getExpiredTopics2EmptyTest();
// 成功测试
getExpiredTopics2SuccessTest();
}
private void getExpiredTopics2EmptyTest() {
TopicExpiredConfig topicExpiredConfig = getTopicExpiredConfig();
Mockito.when(configService.getByKey(Mockito.anyString(), Mockito.any())).thenReturn(topicExpiredConfig);
Mockito.when(topicManagerService.getExpiredTopics(Mockito.anyInt())).thenReturn(new ArrayList<>());
Assert.assertTrue(expertService.getExpiredTopics().isEmpty());
}
public void getExpiredTopics2SuccessTest() {
TopicExpiredConfig topicExpiredConfig = getTopicExpiredConfig();
Mockito.when(configService.getByKey(Mockito.anyString(), Mockito.any())).thenReturn(topicExpiredConfig);
TopicExpiredDO topicExpiredDO = getTopicExpiredDO();
List<TopicExpiredDO> topicExpiredDOList = new ArrayList<>();
topicExpiredDOList.add(topicExpiredDO);
Mockito.when(topicManagerService.getExpiredTopics(Mockito.anyInt())).thenReturn(topicExpiredDOList);
Assert.assertTrue(expertService.getExpiredTopics().stream().allMatch(expiredDO -> expiredDO.getClusterId().equals(topicExpiredDO.getClusterId()) &&
expiredDO.getTopicName().equals(topicExpiredDO.getTopicName())));
}
}
package com.xiaojukeji.kafka.manager.service.service;

import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionAttributeDTO;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.*;

/**
 * Unit tests for JmxService: broker/topic metrics, broker versions, throttle
 * information and partition attributes, all read over JMX from a real test cluster.
 *
 * @author xuguang
 * @Date 2021/12/14
 */
public class JmxServiceTest extends BaseTest {

    /**
     * The cluster contains three brokers: 1, 2 and 3. This topic has 1 partition
     * with a replication factor of 1 and is located on broker 1.
     */
    @Value("${test.topic.name1}")
    private String REAL_TOPIC1_IN_ZK;

    /**
     * The cluster contains three brokers: 1, 2 and 3. This topic has 2 partitions
     * with a replication factor of 3 and is located on brokers 1, 2 and 3.
     */
    @Value("${test.topic.name2}")
    private String REAL_TOPIC2_IN_ZK;

    /** Topic name that does not exist in the cluster; used for negative test cases. */
    private final static String INVALID_TOPIC = "xxxxx";

    /** Id of the physical cluster registered in MySQL; injected from the test configuration. */
    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;

    /** A broker id that exists in ZooKeeper; injected from the test configuration. */
    @Value("${test.broker.id1}")
    private Integer REAL_BROKER_ID_IN_ZK;

    /** Broker id that does not exist; used for negative test cases. */
    private final static Integer INVALID_BROKER_ID = -1;

    /** Cluster id that does not exist; used for negative test cases. */
    private final static Long INVALID_CLUSTER_ID = -1L;

    /** A real client id; injected from the test configuration. */
    @Value("${test.client-id}")
    private String CLIENT_ID;

    /** Metrics-collection code that matches no metric set; used for negative test cases. */
    private final static Integer INVALID_METRICS_CODE = -1;

    @Autowired
    private JmxService jmxService;

    /** Builds a PartitionState for partition 0 whose leader is broker 2. */
    private PartitionState getPartitionState() {
        PartitionState partitionState = new PartitionState();
        partitionState.setPartitionId(0);
        partitionState.setLeader(2);
        return partitionState;
    }

    @Test
    public void getBrokerMetricsTest() {
        // null result
        getBrokerMetrics2NullTest();
        // mbeanV2List empty
        getBrokerMetrics2mbeanV2ListEmptyTest();
        // successful fetch
        getBrokerMetrics2SuccessTest();
    }

    /** Null arguments or an unknown broker id yield a null metrics object. */
    private void getBrokerMetrics2NullTest() {
        BrokerMetrics brokerMetrics1 = jmxService.getBrokerMetrics(null, null, null);
        Assert.assertNull(brokerMetrics1);

        BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_BROKER_ID,
                KafkaMetricsCollections.BROKER_ANALYSIS_METRICS);
        Assert.assertNull(brokerMetrics2);
    }

    /** An unknown metrics code yields an object with an empty metrics map. */
    private void getBrokerMetrics2mbeanV2ListEmptyTest() {
        BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK,
                -1);
        Assert.assertNotNull(brokerMetrics2);
        Assert.assertEquals(brokerMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
        Assert.assertEquals(brokerMetrics2.getBrokerId(), REAL_BROKER_ID_IN_ZK);
        Assert.assertTrue(brokerMetrics2.getMetricsMap().isEmpty());
    }

    /** A real broker and metrics code yield a populated metrics map. */
    private void getBrokerMetrics2SuccessTest() {
        BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK,
                KafkaMetricsCollections.BROKER_ANALYSIS_METRICS);
        Assert.assertNotNull(brokerMetrics2);
        Assert.assertEquals(brokerMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
        Assert.assertEquals(brokerMetrics2.getBrokerId(), REAL_BROKER_ID_IN_ZK);
        Assert.assertFalse(brokerMetrics2.getMetricsMap().isEmpty());
    }

    @Test
    public void getTopicMetricsWithBrokerIdTest() {
        // null result
        getTopicMetricsWithBrokerId2nullTest();
        // fetched metrics map is empty
        getTopicMetricsWithBrokerId2MetricsIsNullTest();
        // successful fetch
        getTopicMetricsWithBrokerId2SuccessTest();
    }

    /** An unknown metrics code or broker id yields null. */
    private void getTopicMetricsWithBrokerId2nullTest() {
        TopicMetrics topicMetrics1 = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK,
                REAL_TOPIC1_IN_ZK,
                -1, true);
        Assert.assertNull(topicMetrics1);

        TopicMetrics topicMetrics2 = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_BROKER_ID,
                REAL_TOPIC1_IN_ZK,
                KafkaMetricsCollections.BROKER_ANALYSIS_METRICS
                , true);
        Assert.assertNull(topicMetrics2);
    }

    /** Broker 3 hosts no replica of this topic, so the metrics map comes back empty. */
    private void getTopicMetricsWithBrokerId2MetricsIsNullTest() {
        TopicMetrics topicMetrics2 = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                3,
                REAL_TOPIC1_IN_ZK,
                KafkaMetricsCollections.BROKER_ANALYSIS_METRICS
                , true);
        Assert.assertNotNull(topicMetrics2);
        Assert.assertEquals(topicMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
        Assert.assertEquals(topicMetrics2.getTopicName(), REAL_TOPIC1_IN_ZK);
        Assert.assertTrue(topicMetrics2.getMetricsMap().isEmpty());
    }

    /** The broker that hosts the topic yields a populated metrics map. */
    private void getTopicMetricsWithBrokerId2SuccessTest() {
        TopicMetrics topicMetrics2 = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK,
                REAL_TOPIC1_IN_ZK,
                KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
                , true);
        Assert.assertNotNull(topicMetrics2);
        Assert.assertEquals(topicMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
        Assert.assertEquals(topicMetrics2.getTopicName(), REAL_TOPIC1_IN_ZK);
        Assert.assertFalse(topicMetrics2.getMetricsMap().isEmpty());
    }

    @Test
    public void getTopicMetricsWithoutBrokerId() {
        // null result
        getTopicMetricsWithoutBrokerId2Null();
        // aggregate across brokers by adding (percentDis = true)
        getTopicMetricsWithoutBrokerId2Add();
        // aggregate across brokers by taking the max (percentDis = false)
        getTopicMetricsWithoutBrokerId2Max();
    }

    /** An unknown topic yields null. */
    private void getTopicMetricsWithoutBrokerId2Null() {
        TopicMetrics topicMetrics = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_TOPIC,
                KafkaMetricsCollections.TOPIC_METRICS_TO_DB
                , true);
        Assert.assertNull(topicMetrics);
    }

    /** Per-broker metrics are collected and merged for a one-broker topic. */
    private void getTopicMetricsWithoutBrokerId2Add() {
        TopicMetrics topicMetrics = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_TOPIC1_IN_ZK,
                KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
                , true);
        Assert.assertNotNull(topicMetrics);
        Assert.assertNotNull(topicMetrics.getBrokerMetricsList());
        Assert.assertNotNull(topicMetrics.getMetricsMap());
    }

    /** Per-broker metrics are collected and merged for a multi-broker topic. */
    private void getTopicMetricsWithoutBrokerId2Max() {
        TopicMetrics topicMetrics = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_TOPIC2_IN_ZK,
                KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
                , false);
        Assert.assertNotNull(topicMetrics);
        Assert.assertNotNull(topicMetrics.getBrokerMetricsList());
        Assert.assertNotNull(topicMetrics.getMetricsMap());
    }

    @Test(description = "测试获取集群下所有topic指标")
    public void getTopicMetricsList() {
        List<TopicMetrics> topicMetrics = jmxService.getTopicMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
                , false);
        Assert.assertFalse(topicMetrics.isEmpty());
        Assert.assertTrue(topicMetrics.stream().allMatch(topicMetric ->
                topicMetric.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL)));
    }

    @Test(description = "测试获取broker版本")
    public void getBrokerVersion() {
        // empty result
        getBrokerVersion2Empty();
        // non-empty result
        getBrokerVersion2NotEmpty();
    }

    /** An unknown broker yields an empty version string. */
    private void getBrokerVersion2Empty() {
        String brokerVersion = jmxService.getBrokerVersion(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_BROKER_ID);
        Assert.assertEquals(brokerVersion, "");
    }

    /** A real broker yields a non-empty version string. */
    private void getBrokerVersion2NotEmpty() {
        String brokerVersion = jmxService.getBrokerVersion(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK);
        Assert.assertNotEquals(brokerVersion, "");
    }

    @Test(description = "获取客户端限流信息")
    public void getTopicAppThrottleTest() {
        // zero result
        getTopicAppThrottle2ZeroTest();
        // non-zero result
        // getTopicAppThrottle2NotZeroTest();
    }

    /** An unknown broker yields a throttle value of zero. */
    private void getTopicAppThrottle2ZeroTest() {
        double topicAppThrottle = jmxService.getTopicAppThrottle(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_BROKER_ID,
                "1",
                KafkaClientEnum.FETCH_CLIENT);
        Assert.assertEquals(topicAppThrottle, 0.0d);
    }

    /** With no quota configured the throttle value is still zero for a real client. */
    private void getTopicAppThrottle2NotZeroTest() {
        double topicAppThrottle = jmxService.getTopicAppThrottle(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK,
                CLIENT_ID,
                KafkaClientEnum.FETCH_CLIENT);
        // no quota is configured, so the throttle value stays zero
        Assert.assertEquals(topicAppThrottle, 0.0d);
    }

    @Test(description = "获取被限流信息")
    public void getBrokerThrottleClientsTest() {
        // empty result
        getBrokerThrottleClients2EmptyTest();
        // Non-empty case needs a client that actually hits its quota,
        // which is hard to construct here, so it stays disabled.
        // getBrokerThrottleClients2NotEmptyTest();
    }

    /** An unknown broker yields no throttled clients. */
    private void getBrokerThrottleClients2EmptyTest() {
        Set<String> brokerThrottleClients = jmxService.getBrokerThrottleClients(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_BROKER_ID,
                KafkaClientEnum.FETCH_CLIENT);
        Assert.assertTrue(brokerThrottleClients.isEmpty());
    }

    /** A real broker with throttled traffic yields at least one client (disabled above). */
    private void getBrokerThrottleClients2NotEmptyTest() {
        Set<String> brokerThrottleClients = jmxService.getBrokerThrottleClients(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_BROKER_ID_IN_ZK,
                KafkaClientEnum.FETCH_CLIENT);
        Assert.assertFalse(brokerThrottleClients.isEmpty());
    }

    @Test(description = "测试获取topic消息压缩指标")
    public void getTopicCodeCValueTest() {
        // null result
        getTopicCodeCValue2NullTest();
        // non-null result
        getTopicCodeCValue2SuccessTest();
    }

    /** An unknown topic yields a null codec value. */
    private void getTopicCodeCValue2NullTest() {
        String result = jmxService.getTopicCodeCValue(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC);
        Assert.assertNull(result);
    }

    /** A real topic yields a non-null codec value. */
    private void getTopicCodeCValue2SuccessTest() {
        String result = jmxService.getTopicCodeCValue(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_TOPIC2_IN_ZK);
        Assert.assertNotNull(result);
    }

    @Test(description = "测试从JMX中获取appId维度的的流量信息")
    public void getTopicAppMetricsTest() {
        // result is empty
        getTopicAppMetrics2Empty();
        // result is not empty
        getTopicAppMetrics2NotEmpty();
    }

    /** An unknown metrics code or cluster id yields an empty list. */
    private void getTopicAppMetrics2Empty() {
        List<TopicMetrics> topicAppMetrics = jmxService.getTopicAppMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_METRICS_CODE);
        Assert.assertTrue(topicAppMetrics.isEmpty());

        List<TopicMetrics> topicAppMetrics2 = jmxService.getTopicAppMetrics(
                INVALID_CLUSTER_ID,
                KafkaMetricsCollections.APP_TOPIC_METRICS_TO_DB);
        Assert.assertTrue(topicAppMetrics2.isEmpty());
    }

    /** The real cluster yields per-appId traffic metrics. */
    private void getTopicAppMetrics2NotEmpty() {
        List<TopicMetrics> topicAppMetrics = jmxService.getTopicAppMetrics(
                REAL_CLUSTER_ID_IN_MYSQL,
                KafkaMetricsCollections.APP_TOPIC_METRICS_TO_DB
        );
        Assert.assertFalse(topicAppMetrics.isEmpty());
    }

    // @Test
    // NOTE(review): left disabled in the original; reason not recorded — confirm before enabling.
    public void getBrokerTopicLocationTest() {
        // result is empty
        getBrokerTopicLocation2EmptyTest();
        // result is not empty
        getBrokerTopicLocation2NotEmptyTest();
    }

    /** An unknown broker yields an empty partition-location map. */
    private void getBrokerTopicLocation2EmptyTest() {
        Map<TopicPartition, String> brokerTopicLocation = jmxService.getBrokerTopicLocation(
                REAL_CLUSTER_ID_IN_MYSQL,
                INVALID_BROKER_ID
        );
        Assert.assertTrue(brokerTopicLocation.isEmpty());
    }

    /** Broker 2 hosts partitions, so the location map is non-empty. */
    private void getBrokerTopicLocation2NotEmptyTest() {
        Map<TopicPartition, String> brokerTopicLocation = jmxService.getBrokerTopicLocation(
                REAL_CLUSTER_ID_IN_MYSQL,
                2
        );
        Assert.assertFalse(brokerTopicLocation.isEmpty());
    }

    @Test
    public void getPartitionAttributeTest() {
        // result is empty
        getPartitionAttribute2EmptyTest();
        // result is not empty
        getPartitionAttribute2NotEmptyTest();
    }

    /** With no partition states supplied, the attribute map is empty. */
    private void getPartitionAttribute2EmptyTest() {
        Map<Integer, PartitionAttributeDTO> list = jmxService.getPartitionAttribute(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_TOPIC2_IN_ZK,
                Collections.emptyList());
        Assert.assertTrue(list.isEmpty());
    }

    /** Partition states with known leader brokers yield a populated attribute map. */
    private void getPartitionAttribute2NotEmptyTest() {
        // the leader broker of each partition must be supplied explicitly
        PartitionState partitionState1 = getPartitionState();
        PartitionState partitionState2 = getPartitionState();
        partitionState2.setLeader(3);
        partitionState2.setPartitionId(1);

        Map<Integer, PartitionAttributeDTO> list = jmxService.getPartitionAttribute(
                REAL_CLUSTER_ID_IN_MYSQL,
                REAL_TOPIC2_IN_ZK,
                Arrays.asList(partitionState1, partitionState1, partitionState2)
        );
        Assert.assertFalse(list.isEmpty());
    }
}
package com.xiaojukeji.kafka.manager.service.service;

import com.xiaojukeji.kafka.manager.common.entity.ao.cluster.LogicalClusterMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaBillDO;
import com.xiaojukeji.kafka.manager.dao.KafkaBillDao;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * Unit tests for KafkaBillService. The DAO is mocked, so each test checks both the
 * happy path and the DAO-throws path (the service is expected to swallow DAO
 * exceptions and return an empty result / zero).
 *
 * @author xuguang
 * @Date 2021/12/14
 */
public class KafkaBillServiceTest extends BaseTest {

    @Autowired
    @InjectMocks
    private KafkaBillService kafkaBillService;

    @Mock
    private KafkaBillDao kafkaBillDao;

    /** Injects the @Mock DAO into the Spring-managed service before every method. */
    @BeforeMethod
    public void setup() {
        MockitoAnnotations.initMocks(this);
    }

    /** Id of the physical cluster registered in MySQL; injected from the test configuration. */
    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;

    /** Admin principal name; injected from the test configuration. */
    @Value("${test.admin}")
    private String ADMIN;

    /** Builds the bill fixture returned by the mocked DAO. */
    private KafkaBillDO getKafkaBillDO() {
        KafkaBillDO kafkaBillDO = new KafkaBillDO();
        kafkaBillDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        kafkaBillDO.setCost(100.0d);
        kafkaBillDO.setGmtCreate(new Date(1638605696062L));
        kafkaBillDO.setGmtDay("10");
        kafkaBillDO.setPrincipal(ADMIN);
        kafkaBillDO.setQuota(1000.0d);
        kafkaBillDO.setTopicName("moduleTest");
        return kafkaBillDO;
    }

    @Test()
    public void replaceTest() {
        KafkaBillDO kafkaBillDO = getKafkaBillDO();
        // successful insert
        replace2SuccessTest(kafkaBillDO);
        // insert fails with an exception
        replace2ExceptionTest(kafkaBillDO);
    }

    /** A successful DAO replace is reported as one affected row. */
    private void replace2SuccessTest(KafkaBillDO kafkaBillDO) {
        Mockito.when(kafkaBillDao.replace(Mockito.any())).thenReturn(1);
        int result = kafkaBillService.replace(kafkaBillDO);
        Assert.assertEquals(result, 1);
    }

    /** A DAO exception is swallowed and reported as zero affected rows. */
    private void replace2ExceptionTest(KafkaBillDO kafkaBillDO) {
        Mockito.when(kafkaBillDao.replace(Mockito.any())).thenThrow(RuntimeException.class);
        int result = kafkaBillService.replace(kafkaBillDO);
        Assert.assertEquals(result, 0);
    }

    @Test()
    public void getByTopicNameTest() {
        KafkaBillDO kafkaBillDO = getKafkaBillDO();
        // successful query
        getByTopicName2SuccessTest(kafkaBillDO);
        // query throws
        getByTopicName2ExceptionTest();
    }

    /** A successful query passes the DAO rows through unchanged. */
    private void getByTopicName2SuccessTest(KafkaBillDO kafkaBillDO) {
        Mockito.when(kafkaBillDao.getByTopicName(
                Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(Arrays.asList(kafkaBillDO));
        List<KafkaBillDO> result = kafkaBillService.getByTopicName(1L, "moudleTest", new Date(0L), new Date());
        Assert.assertFalse(result.isEmpty());
        Assert.assertTrue(result.stream().allMatch(kafkaBillDO1 ->
                kafkaBillDO1.getTopicName().equals(kafkaBillDO.getTopicName()) &&
                kafkaBillDO1.getClusterId().equals(kafkaBillDO.getClusterId())));
    }

    /** A DAO exception is swallowed and reported as an empty list. */
    private void getByTopicName2ExceptionTest() {
        Mockito.when(kafkaBillDao.getByTopicName(
                Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenThrow(RuntimeException.class);
        List<KafkaBillDO> result = kafkaBillService.getByTopicName(1L, "moudleTest", new Date(0L), new Date());
        Assert.assertTrue(result.isEmpty());
    }

    @Test()
    public void getByPrincipalTest() {
        KafkaBillDO kafkaBillDO = getKafkaBillDO();
        // successful query
        getByPrincipal2SuccessTest(kafkaBillDO);
        // query throws
        getByPrincipal2ExceptionTest();
    }

    /** A successful query passes the DAO rows through unchanged. */
    private void getByPrincipal2SuccessTest(KafkaBillDO kafkaBillDO) {
        Mockito.when(kafkaBillDao.getByPrincipal(
                Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(Arrays.asList(kafkaBillDO));
        List<KafkaBillDO> result = kafkaBillService.getByPrincipal("admin", new Date(0L), new Date());
        Assert.assertFalse(result.isEmpty());
        Assert.assertTrue(result.stream().allMatch(kafkaBillDO1 ->
                kafkaBillDO1.getTopicName().equals(kafkaBillDO.getTopicName()) &&
                kafkaBillDO1.getClusterId().equals(kafkaBillDO.getClusterId())));
    }

    /** A DAO exception is swallowed and reported as an empty list. */
    private void getByPrincipal2ExceptionTest() {
        Mockito.when(kafkaBillDao.getByPrincipal(
                Mockito.anyString(), Mockito.any(), Mockito.any())).thenThrow(RuntimeException.class);
        List<KafkaBillDO> result = kafkaBillService.getByPrincipal("admin", new Date(0L), new Date());
        Assert.assertTrue(result.isEmpty());
    }

    @Test()
    public void getByTimeBetweenTest() {
        KafkaBillDO kafkaBillDO = getKafkaBillDO();
        // successful query
        getByTimeBetween2SuccessTest(kafkaBillDO);
        // query throws
        getByTimeBetween2ExceptionTest();
    }

    /** A successful query passes the DAO rows through unchanged. */
    private void getByTimeBetween2SuccessTest(KafkaBillDO kafkaBillDO) {
        Mockito.when(kafkaBillDao.getByTimeBetween(
                Mockito.any(), Mockito.any())).thenReturn(Arrays.asList(kafkaBillDO));
        List<KafkaBillDO> result = kafkaBillService.getByTimeBetween(new Date(0L), new Date());
        Assert.assertFalse(result.isEmpty());
        Assert.assertTrue(result.stream().allMatch(kafkaBillDO1 ->
                kafkaBillDO1.getTopicName().equals(kafkaBillDO.getTopicName()) &&
                kafkaBillDO1.getClusterId().equals(kafkaBillDO.getClusterId())));
    }

    /** A DAO exception is swallowed and reported as an empty list. */
    private void getByTimeBetween2ExceptionTest() {
        Mockito.when(kafkaBillDao.getByTimeBetween(
                Mockito.any(), Mockito.any())).thenThrow(RuntimeException.class);
        List<KafkaBillDO> result = kafkaBillService.getByTimeBetween(new Date(0L), new Date());
        Assert.assertTrue(result.isEmpty());
    }

    @Test()
    public void getByGmtDayTest() {
        KafkaBillDO kafkaBillDO = getKafkaBillDO();
        // successful query
        getByGmtDay2SuccessTest(kafkaBillDO);
        // query throws
        getByGmtDay2ExceptionTest();
    }

    /** A successful query passes the DAO rows through unchanged. */
    private void getByGmtDay2SuccessTest(KafkaBillDO kafkaBillDO) {
        Mockito.when(kafkaBillDao.getByGmtDay(
                Mockito.anyString())).thenReturn(Arrays.asList(kafkaBillDO));
        List<KafkaBillDO> result = kafkaBillService.getByGmtDay("10");
        Assert.assertFalse(result.isEmpty());
        Assert.assertTrue(result.stream().allMatch(kafkaBillDO1 ->
                kafkaBillDO1.getTopicName().equals(kafkaBillDO.getTopicName()) &&
                kafkaBillDO1.getClusterId().equals(kafkaBillDO.getClusterId())));
    }

    /** A DAO exception is swallowed and reported as an empty list. */
    private void getByGmtDay2ExceptionTest() {
        Mockito.when(kafkaBillDao.getByGmtDay(
                Mockito.anyString())).thenThrow(RuntimeException.class);
        List<KafkaBillDO> result = kafkaBillService.getByGmtDay("10");
        Assert.assertTrue(result.isEmpty());
    }
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum;
import com.xiaojukeji.kafka.manager.common.entity.dto.rd.OperateRecordDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wyc
* @date 2021/12/8
*/
public class OperateRecordServiceTest extends BaseTest {

    @Autowired
    private OperateRecordService operateRecordService;

    /**
     * Supplies one fully-populated operate record targeting module CLUSTER / operation ADD.
     */
    @DataProvider(name = "operateRecordDO")
    public Object[][] provideOperateRecordDO() {
        OperateRecordDO operateRecordDO = new OperateRecordDO();
        operateRecordDO.setId(3L);
        // module ids: 0:topic, 1:app, 2:quota, 3:authority, 4:cluster, 5:partition, 6:gateway config, -1:unknown
        operateRecordDO.setModuleId(ModuleEnum.CLUSTER.getCode());
        // operation ids: 0:add, 1:delete, 2:modify
        operateRecordDO.setOperateId(OperateEnum.ADD.getCode());
        // resource name, e.g. a topic name or an app name
        operateRecordDO.setResource("testOpRecord");
        operateRecordDO.setContent("testContent");
        operateRecordDO.setOperator("admin");
        return new Object[][] {{operateRecordDO}};
    }

    /** Builds the query condition matching the record produced by the data provider above. */
    private OperateRecordDTO getOperateRecordDTO() {
        OperateRecordDTO dto = new OperateRecordDTO();
        dto.setModuleId(ModuleEnum.CLUSTER.getCode());
        dto.setOperateId(OperateEnum.ADD.getCode());
        dto.setOperator("admin");
        return dto;
    }

    @Test(dataProvider = "operateRecordDO", description = "插入操作记录成功测试")
    public void insert2SuccessTest(OperateRecordDO operateRecordDO) {
        // a successful insert reports exactly one affected row
        int result = operateRecordService.insert(operateRecordDO);
        Assert.assertEquals(result, 1);
    }

    @Test(description = "插入的重载方法操作成功测试")
    public void insert2SuccessTest1() {
        Map<String, String> content = new HashMap<>();
        content.put("key", "value");
        int result = operateRecordService.insert("admin", ModuleEnum.CLUSTER, "testOpRecord", OperateEnum.ADD, content);
        Assert.assertEquals(result, 1);
    }

    @Test(dataProvider = "operateRecordDO")
    public void queryByConditionTest(OperateRecordDO operateRecordDO) {
        operateRecordService.insert(operateRecordDO);
        // both startTime and endTime absent
        queryByConditionTest3(operateRecordDO);
        // startTime absent
        queryByConditionTest1(operateRecordDO);
        // endTime absent
        queryByConditionTest2(operateRecordDO);
        // both startTime and endTime present
        queryByConditionTest4(operateRecordDO);
    }

    /** startTime is null: the record inserted above must fall inside the open-ended window. */
    private void queryByConditionTest1(OperateRecordDO operateRecordDO) {
        OperateRecordDTO dto = getOperateRecordDTO();
        dto.setEndTime(new Date().getTime());
        List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
        Assert.assertFalse(queryResult.isEmpty());
        // every returned record must match the condition and carry a sane create time
        Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 ->
                operateRecordDO1.getCreateTime().after(new Date(0L)) &&
                operateRecordDO1.getCreateTime().before(new Date()) &&
                operateRecordDO1.getModuleId().equals(dto.getModuleId()) &&
                operateRecordDO1.getOperateId().equals(dto.getOperateId()) &&
                operateRecordDO1.getOperator().equals(dto.getOperator())));
    }

    /** endTime is null: a future startTime must exclude everything. */
    private void queryByConditionTest2(OperateRecordDO operateRecordDO) {
        OperateRecordDTO dto = getOperateRecordDTO();
        dto.setStartTime(new Date().getTime());
        // the query filters create_time >= startTime; all records were created earlier
        List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
        Assert.assertTrue(queryResult.isEmpty());
    }

    /** Neither bound set: the condition fields alone must find the inserted record. */
    private void queryByConditionTest3(OperateRecordDO operateRecordDO) {
        OperateRecordDTO dto = getOperateRecordDTO();
        List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
        Assert.assertFalse(queryResult.isEmpty());
        Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 ->
                operateRecordDO1.getCreateTime().after(new Date(0L)) &&
                operateRecordDO1.getCreateTime().before(new Date()) &&
                operateRecordDO1.getModuleId().equals(dto.getModuleId()) &&
                operateRecordDO1.getOperateId().equals(dto.getOperateId()) &&
                operateRecordDO1.getOperator().equals(dto.getOperator())));
    }

    /** Both bounds set: [epoch, now] must cover the record inserted by queryByConditionTest. */
    private void queryByConditionTest4(OperateRecordDO operateRecordDO) {
        OperateRecordDTO dto = getOperateRecordDTO();
        dto.setStartTime(0L);
        // BUGFIX: endTime used to be the hard-coded instant 1649036393371L (2022-04-04).
        // Once the wall clock passed that date, newly inserted records fell outside the
        // window and the assertion below failed. Use "now" so the window always covers
        // the record inserted at the start of queryByConditionTest.
        dto.setEndTime(System.currentTimeMillis());
        List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
        Assert.assertFalse(queryResult.isEmpty());
        Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 ->
                operateRecordDO1.getCreateTime().after(new Date(0L)) &&
                operateRecordDO1.getCreateTime().before(new Date()) &&
                operateRecordDO1.getModuleId().equals(dto.getModuleId()) &&
                operateRecordDO1.getOperateId().equals(dto.getOperateId()) &&
                operateRecordDO1.getOperator().equals(dto.getOperator())));
    }
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.LogicalClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author wyc
* @date 2021/12/8
*/
public class RegionServiceTest extends BaseTest {

    /** Physical cluster id that really exists in the test database (test.phyCluster.id). */
    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;

    /** Name of a region that already exists in the cluster (test.region-name). */
    @Value("${test.region-name}")
    private String REAL_REGION_NAME_IN_CLUSTER;

    /** Topic that really exists in ZK, built on the pre-existing region (test.topic.name1). */
    @Value("${test.topic.name1}")
    private String REAL_TOPIC1_IN_ZK;

    @Autowired
    private RegionService regionService;

    @DataProvider(name = "regionDO")
    public Object[][] provideRegionDO() {
        // CONSISTENCY: reuse getRegionDO() instead of duplicating the same construction code
        return new Object[][] {{getRegionDO()}};
    }

    /** Builds a fresh region named "region1" on broker 3 of the configured physical cluster. */
    private RegionDO getRegionDO() {
        RegionDO regionDO = new RegionDO();
        regionDO.setStatus(0);
        regionDO.setName("region1");
        // physical cluster id taken from the test configuration, not a hard-coded literal
        regionDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        regionDO.setDescription("test");
        List<Integer> brokerIdList = new ArrayList<>();
        brokerIdList.add(3);
        regionDO.setBrokerList(ListUtils.intList2String(brokerIdList));
        return regionDO;
    }

    @Test(description = "creatRegion, 参数为null测试")
    public void createRegion2ParamIllegalTest() {
        Assert.assertEquals(regionService.createRegion(null), ResultStatus.PARAM_ILLEGAL);
    }

    @Test(description = "createRegion, 成功测试")
    public void createRegion2SuccessTest() {
        Assert.assertEquals(regionService.createRegion(getRegionDO()), ResultStatus.SUCCESS);
    }

    @Test(description = "createRegion, clusterId为空测试")
    public void createRegion2ExistBrokerIdAlreadyInRegionTest1() {
        RegionDO regionDO = getRegionDO();
        regionDO.setClusterId(null);
        Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_USED);
    }

    @Test(description = "createRegion, 创建时传入的brokerList中有被使用过的")
    public void createRegion2ExistBrokerIdAlreadyInRegionTest2() {
        RegionDO regionDO = getRegionDO();
        // the pre-existing region in the cluster already occupies brokers 1 and 2;
        // requesting brokers 1 and 3 must therefore be rejected
        List<Integer> newBrokerIdList = new ArrayList<>();
        newBrokerIdList.add(1);
        newBrokerIdList.add(3);
        regionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
        Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_USED);
    }

    @Test(description = "createRegion, 创建时,region使用到的broker挂掉了")
    public void createRegion2BrokerNotExistTest() {
        RegionDO regionDO = getRegionDO();
        // a non-existent physical cluster is treated as having zero live brokers
        regionDO.setClusterId(-1L);
        Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.BROKER_NOT_EXIST);
    }

    @Test(description = "createRegion, 创建时,regionName重复")
    public void createRegion2ResourceAlreadyExistTest() {
        RegionDO regionDO = getRegionDO();
        // reuse an existing region name; the broker list must stay different,
        // otherwise RESOURCE_ALREADY_USED would be returned before the name check
        regionDO.setName(REAL_REGION_NAME_IN_CLUSTER);
        Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_EXISTED);
    }

    @Test
    public void deleteByIdTest() {
        RegionDO regionDO = getRegionDO();
        // illegal parameter
        deleteById2ParamIllegalTest(regionDO);
        // missing resource
        deleteById2ResourceNotExistTest(regionDO);
        // successful delete
        deleteById2SuccessTest(regionDO);
    }

    private void deleteById2ParamIllegalTest(RegionDO regionDO) {
        Assert.assertEquals(regionService.deleteById(null), ResultStatus.PARAM_ILLEGAL);
    }

    private void deleteById2ResourceNotExistTest(RegionDO regionDO) {
        // assumes no region with id 10 exists in the test database
        Assert.assertEquals(regionService.deleteById(10L), ResultStatus.RESOURCE_NOT_EXIST);
    }

    private void deleteById2SuccessTest(RegionDO regionDO) {
        regionDO.setId(1L);
        Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
        // the insert statement ignores the id (auto-increment), so the region has to be
        // looked up again before it can be deleted by its generated id
        // CONSISTENCY: use the configured cluster id instead of a hard-coded 1L
        List<RegionDO> regionDOList = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        RegionDO region = regionDOList.get(0);
        Assert.assertEquals(regionService.deleteById(region.getId()), ResultStatus.SUCCESS);
    }

    @Test(description = "updateRegion, 参数非法测试")
    public void updateRegion2ParamIllegalTest1() {
        // both null and a region without an id are rejected
        Assert.assertEquals(regionService.updateRegion(null), ResultStatus.PARAM_ILLEGAL);
        Assert.assertEquals(regionService.updateRegion(getRegionDO()), ResultStatus.PARAM_ILLEGAL);
    }

    @Test(description = "updateRegion, 资源不存在测试")
    public void updateRegion2ResourceNotExistTest1() {
        RegionDO regionDO = getRegionDO();
        // update without inserting first
        regionDO.setId(-1L);
        Assert.assertEquals(regionService.updateRegion(regionDO), ResultStatus.RESOURCE_NOT_EXIST);
    }

    @Test(description = "updateRegion, brokerList未改变,成功测试")
    public void updateRegion2SuccessWithBrokerListNotChangeTest1() {
        // create a region, read it back, tweak a non-broker field and update it
        Assert.assertEquals(regionService.createRegion(getRegionDO()), ResultStatus.SUCCESS);
        RegionDO newRegionDO = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        newRegionDO.setStatus(1);
        Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.SUCCESS);
    }

    @Test(description = "updateRegion, 传入的broker已经被使用测试")
    public void updateRegion2ResourceAlreadyUsedTest1() {
        // create a region, read it back and point it at brokers 1 and 3
        Assert.assertEquals(regionService.createRegion(getRegionDO()), ResultStatus.SUCCESS);
        RegionDO newRegionDO = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        List<Integer> newBrokerIdList = new ArrayList<>();
        newBrokerIdList.add(1);
        newBrokerIdList.add(3);
        newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
        // a null cluster id forces the "resource already used" branch
        newRegionDO.setClusterId(null);
        Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.RESOURCE_ALREADY_USED);
    }

    @Test(description = "updateRegion, 更新的broker不存在")
    public void updateRegion2BrokerNotExistTest1() {
        // create a region, read it back and point it at brokers that are not in the cluster
        Assert.assertEquals(regionService.createRegion(getRegionDO()), ResultStatus.SUCCESS);
        RegionDO newRegionDO = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        List<Integer> newBrokerIdList = new ArrayList<>();
        newBrokerIdList.add(4);
        newBrokerIdList.add(5);
        newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
        Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.BROKER_NOT_EXIST);
    }

    @Test(description = "updateRegion, brokeList发生了改变,成功测试")
    public void updateRegion2SuccessWithBrokerListChangeTest1() {
        // NOTE(review): relies on a region created by an earlier test still being present
        RegionDO newRegionDO = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        List<Integer> newBrokerIdList = new ArrayList<>();
        newBrokerIdList.add(1);
        newBrokerIdList.add(3);
        newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
        Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.SUCCESS);
    }

    @Test(description = "updateRegion重载方法,参数非法测试")
    public void updateRegion2ParamIllegalTest2() {
        Assert.assertEquals(regionService.updateRegion(null, "1,3"), ResultStatus.PARAM_ILLEGAL);
        Assert.assertEquals(regionService.updateRegion(1L, "1, 3"), ResultStatus.PARAM_ILLEGAL);
    }

    @Test(description = "updateRegion重载方法,成功测试")
    public void updateRegion2SuccessTest2() {
        Assert.assertEquals(regionService.createRegion(getRegionDO()), ResultStatus.SUCCESS);
        RegionDO region = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        Assert.assertEquals(regionService.updateRegion(region.getId(), "1,3"), ResultStatus.SUCCESS);
    }

    @Test
    public void updateCapacityByIdTest() {
        Assert.assertEquals(regionService.createRegion(getRegionDO()), ResultStatus.SUCCESS);
        RegionDO region = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        region.setCapacity(1000L);
        // success: exactly one row updated
        updateCapacityById2SuccessTest(region);
        // failure: no region with id 100 exists
        region.setId(100L);
        updateCapacityByIdFailureTest(region);
    }

    private void updateCapacityById2SuccessTest(RegionDO regionDO) {
        Assert.assertEquals(regionService.updateCapacityById(regionDO), 1);
    }

    private void updateCapacityByIdFailureTest(RegionDO regionDO) {
        Assert.assertEquals(regionService.updateCapacityById(regionDO), 0);
    }

    @Test
    public void getByIdTest() {
        Assert.assertEquals(regionService.createRegion(getRegionDO()), ResultStatus.SUCCESS);
        // found
        RegionDO region = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        getById2SuccessTest(region);
        // not found
        region.setId(-1L);
        getById2FailureTest(region);
    }

    private void getById2SuccessTest(RegionDO regionDO) {
        Assert.assertEquals(regionService.getById(regionDO.getId()).toString(), regionDO.toString());
    }

    private void getById2FailureTest(RegionDO regionDO) {
        Assert.assertNull(regionService.getById(regionDO.getId()));
    }

    // NOTE(review): the two getByClusterId checks below are never invoked — no @Test
    // method references them. Wire them into a test method or remove them.
    private void getByClusterId2SuccessTest(RegionDO regionDO) {
        Assert.assertNotNull(regionService.getByClusterId(regionDO.getClusterId()));
        Assert.assertTrue(regionService.getByClusterId(regionDO.getClusterId()).stream().allMatch(regionDO1 ->
                regionDO1.getName().equals(regionDO.getName())));
    }

    private void getByClusterId2FailureTest(RegionDO regionDO) {
        Assert.assertTrue(regionService.getByClusterId(-1L).isEmpty());
    }

    @Test(dataProvider = "regionDO")
    public void getRegionNumTest(RegionDO regionDO) {
        // expects exactly one region per cluster at this point
        Map<Long, Integer> regionNum = regionService.getRegionNum();
        for (Map.Entry<Long, Integer> entry : regionNum.entrySet()) {
            // CONSISTENCY: compare against the configured cluster id, not a literal 1
            Assert.assertEquals(entry.getKey(), REAL_CLUSTER_ID_IN_MYSQL);
            Assert.assertEquals(entry.getValue(), Integer.valueOf(1));
        }
    }

    @Test(dataProvider = "regionDO")
    public void getFullBrokerIdListTest(RegionDO regionDO) {
        List<Integer> brokerIdList = new ArrayList<>();
        brokerIdList.add(3);
        // regionId is null
        getFullBrokerIdList2RegionIdIsNullTest(regionDO, brokerIdList);
        // regionId absent from the database
        getFullBrokerIdList2RegionNotExistTest(regionDO, brokerIdList);
        Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
        RegionDO region = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL).get(0);
        // caller-supplied broker list is empty
        getFullBrokerIdList2BrokerIdListIsEmpty(regionDO, region, new ArrayList<>());
        // caller-supplied broker list is non-empty
        getFullBrokerIdList2Success(regionDO, region, brokerIdList);
    }

    private void getFullBrokerIdList2RegionIdIsNullTest(RegionDO regionDO, List<Integer> brokerIdList) {
        List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(REAL_CLUSTER_ID_IN_MYSQL, null, brokerIdList);
        Assert.assertEquals(fullBrokerIdList, brokerIdList);
    }

    private void getFullBrokerIdList2RegionNotExistTest(RegionDO regionDO, List<Integer> brokerIdList) {
        Assert.assertEquals(regionService.getFullBrokerIdList(REAL_CLUSTER_ID_IN_MYSQL, -1L, brokerIdList), brokerIdList);
    }

    private void getFullBrokerIdList2BrokerIdListIsEmpty(RegionDO regionDO, RegionDO regionInDataBase, List<Integer> brokerIdList) {
        List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(REAL_CLUSTER_ID_IN_MYSQL, regionInDataBase.getId(), brokerIdList);
        Assert.assertEquals(fullBrokerIdList, ListUtils.string2IntList(regionInDataBase.getBrokerList()));
    }

    private void getFullBrokerIdList2Success(RegionDO regionDO, RegionDO regionInDataBase, List<Integer> brokerIdList) {
        List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(REAL_CLUSTER_ID_IN_MYSQL, regionInDataBase.getId(), brokerIdList);
        List<Integer> allBrokerIdList = ListUtils.string2IntList(regionInDataBase.getBrokerList());
        allBrokerIdList.addAll(brokerIdList);
        Assert.assertEquals(allBrokerIdList, fullBrokerIdList);
    }

    // NOTE(review): the two convert2BrokerIdRegionMap checks below are never invoked —
    // no @Test method references them. Wire them into a test method or remove them.
    private void convert2BrokerIdRegionMap2RegionListDOIsNull() {
        Assert.assertTrue(regionService.convert2BrokerIdRegionMap(null).isEmpty());
    }

    private void convert2BrokerIdRegionMap2Success(RegionDO regionDO) {
        // expected mapping: brokerId -> the region that owns it
        List<RegionDO> regionDOList = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        RegionDO region = regionDOList.get(0);
        Map<Integer, RegionDO> brokerIdRegionDOMap = ListUtils.string2IntList(regionDO.getBrokerList()).stream()
                .collect(Collectors.toMap(brokerId -> brokerId, regionDO1 -> region));
        Map<Integer, RegionDO> result = regionService.convert2BrokerIdRegionMap(regionDOList);
        Assert.assertEquals(brokerIdRegionDOMap, result);
    }

    @Test(dataProvider = "regionDO")
    public void getIdleRegionBrokerListTest(RegionDO regionDO) {
        // physical cluster id is null
        getIdleRegionBrokerList2PhysicalClusterIdIsNullTest();
        // the region id list resolves to no regions
        getIdleRegionBrokerList2RegionDOListIsEmptyTest();
        // success
        getIdleRegionBrokerList2SuccessTest(regionDO);
    }

    private void getIdleRegionBrokerList2PhysicalClusterIdIsNullTest() {
        Assert.assertNull(regionService.getIdleRegionBrokerList(null, new ArrayList<>()));
    }

    private void getIdleRegionBrokerList2RegionDOListIsEmptyTest() {
        List<Long> regionIdList = new ArrayList<>();
        regionIdList.add(-1L);
        Assert.assertNull(regionService.getIdleRegionBrokerList(REAL_CLUSTER_ID_IN_MYSQL, regionIdList));
    }

    private void getIdleRegionBrokerList2SuccessTest(RegionDO regionDO) {
        // derive the expected broker ids from the regions currently stored for the cluster
        List<Long> regionIdList = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL)
                .stream().map(RegionDO::getId).collect(Collectors.toList());
        List<Integer> brokerIdList = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL)
                .stream().flatMap(regionDO1 -> ListUtils.string2IntList(regionDO1.getBrokerList()).stream())
                .collect(Collectors.toList());
        Assert.assertEquals(regionService.getIdleRegionBrokerList(REAL_CLUSTER_ID_IN_MYSQL, regionIdList), brokerIdList);
    }

    @Test
    public void getTopicNameRegionBrokerIdMap2SuccessTest() {
        // fixtures: a logical cluster sits on the physical cluster, its region spans
        // brokers 1 and 2, and the topic is built on that region. The map pairs each
        // topic with every broker of the regions its partitions touch.
        Map<String, Set<Integer>> topicNameRegionBrokerIdMap =
                regionService.getTopicNameRegionBrokerIdMap(REAL_CLUSTER_ID_IN_MYSQL);
        Set<Integer> set = new HashSet<>();
        set.add(1);
        set.add(2);
        Assert.assertEquals(topicNameRegionBrokerIdMap.get(REAL_TOPIC1_IN_ZK), set);
    }

    @Test
    public void getRegionListByTopicNameTest() {
        // empty result for an unknown topic
        getRegionListByTopicName2EmptyTest();
        // non-empty result for the real topic
        getRegionListByTopicName2Success();
    }

    private void getRegionListByTopicName2EmptyTest() {
        Assert.assertEquals(regionService.getRegionListByTopicName(REAL_CLUSTER_ID_IN_MYSQL, "notExistTopic"), new ArrayList<>());
    }

    private void getRegionListByTopicName2Success() {
        List<RegionDO> expectedResult = regionService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        Assert.assertEquals(regionService.getRegionListByTopicName(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK), expectedResult);
    }
}
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
import com.xiaojukeji.kafka.manager.dao.TopicExpiredDao;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
/**
* @author wyc
* @date 2021/12/20
*/
public class TopicExpiredServiceTest extends BaseTest {
    /*
     * This topic lives on region_1 (brokers 1 and 2) and has 3 partitions with 2 replicas.
     */
    @Value("${test.topic.name4}")
    private String REAL_TOPIC1_IN_ZK;

    /** Physical cluster id that really exists in the test database (test.phyCluster.id). */
    @Value("${test.phyCluster.id}")
    private Long REAL_CLUSTER_ID_IN_MYSQL;

    @Autowired
    private TopicExpiredDao topicExpiredDao;

    @Autowired
    private TopicExpiredService topicExpiredService;

    /** Builds an expired-topic row pointing at the configured cluster and topic. */
    private TopicExpiredDO getTopicExpiredDO() {
        TopicExpiredDO topicExpiredDO = new TopicExpiredDO();
        topicExpiredDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
        topicExpiredDO.setExpiredDay(30);
        topicExpiredDO.setTopicName(REAL_TOPIC1_IN_ZK);
        topicExpiredDO.setStatus(0);
        return topicExpiredDO;
    }

    @Test
    public void retainExpiredTopicTest() {
        // CONSISTENCY: all service calls use the configured cluster id instead of a hard-coded 1L
        // illegal retain-days value
        Assert.assertEquals(topicExpiredService.retainExpiredTopic(REAL_CLUSTER_ID_IN_MYSQL, "topic_a", -1), ResultStatus.PARAM_ILLEGAL);
        // topic absent from the cluster
        Assert.assertEquals(topicExpiredService.retainExpiredTopic(REAL_CLUSTER_ID_IN_MYSQL, "topicNotExist", 40), ResultStatus.TOPIC_NOT_EXIST);
        // success: retaining first checks that the topic exists in the physical cluster,
        // so seed topic_expired with a topic that really exists there
        TopicExpiredDO expiredDO = getTopicExpiredDO();
        topicExpiredDao.replace(expiredDO);
        Assert.assertEquals(topicExpiredService.retainExpiredTopic(REAL_CLUSTER_ID_IN_MYSQL, expiredDO.getTopicName(), 40), ResultStatus.SUCCESS);
    }

    @Test
    public void deleteByNameTest() {
        // deleting a topic that was never recorded removes nothing
        Assert.assertEquals(topicExpiredService.deleteByTopicName(REAL_CLUSTER_ID_IN_MYSQL, "notExistTopic"), 0);
        // the delete only touches the topic_expired table and never validates the topic
        // against the cluster, so an arbitrary name can be seeded and then removed;
        // the return value tells whether the row was deleted
        TopicExpiredDO expiredDO = getTopicExpiredDO();
        expiredDO.setTopicName("test-topic");
        topicExpiredDao.replace(expiredDO);
        Assert.assertEquals(topicExpiredService.deleteByTopicName(expiredDO.getClusterId(), "test-topic"), 1);
    }
}
package com.xiaojukeji.kafka.manager.service.utils;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* Created by arthur on 2017/5/31.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:biz-test.xml" })
// Marker base class for JUnit4 Spring tests: subclasses inherit the Spring runner
// and the biz-test.xml application context; it deliberately has no members of its own.
public class SpringTestBase {
}
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册