From 324b37b87524a238645295d1f1164be4f11840c9 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sun, 25 Apr 2021 18:11:52 +0800 Subject: [PATCH] v2.4.0 be code --- .../manager/common/bizenum/OperateEnum.java | 2 +- .../entity/dto/rd/OperateRecordDTO.java | 7 +----- .../topic}/TopicStatisticMetricsVO.java | 3 +-- .../service/service/OperateRecordService.java | 2 +- .../service/service/TopicManagerService.java | 9 +++++++ .../service/impl/AdminServiceImpl.java | 5 +++- .../service/impl/ClusterServiceImpl.java | 1 + .../impl/OperateRecordServiceImpl.java | 4 ++-- .../service/impl/TopicManagerServiceImpl.java | 14 ++++++----- .../kafka/manager/dao/OperateRecordDao.java | 2 +- .../dao/impl/OperateRecordDaoImpl.java | 4 ++-- .../resources/mapper/OperateRecordDao.xml | 2 +- .../normal/NormalTopicController.java | 21 ++++++++++++++++ .../rd/RdOperateRecordController.java | 2 +- .../thirdpart/ThirdPartTopicController.java | 24 ------------------- 15 files changed, 54 insertions(+), 48 deletions(-) rename {kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo => kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic}/TopicStatisticMetricsVO.java (91%) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java index 2bc874ec..af69ea50 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java @@ -46,7 +46,7 @@ public enum OperateEnum { public static boolean validate(Integer code) { if (code == null) { - return false; + return true; } for (OperateEnum state : OperateEnum.values()) { if (state.getCode() == code) { diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java index 1837ecfc..7f191017 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java @@ -81,11 +81,6 @@ public class OperateRecordDTO { } public boolean legal() { - if (!ModuleEnum.validate(moduleId) || - (!ValidateUtils.isNull(operateId) && OperateEnum.validate(operateId)) - ) { - return false; - } - return true; + return !ValidateUtils.isNull(moduleId) && ModuleEnum.validate(moduleId) && OperateEnum.validate(operateId); } } diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicStatisticMetricsVO.java similarity index 91% rename from kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java rename to kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicStatisticMetricsVO.java index 3665b7ac..c83c24d2 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicStatisticMetricsVO.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.openapi.common.vo; +package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -14,7 +14,6 @@ public class TopicStatisticMetricsVO { public TopicStatisticMetricsVO(Double peakBytesIn) { this.peakBytesIn = peakBytesIn; - } public Double getPeakBytesIn() { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java index 5b2909ca..f70548de 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java @@ -17,5 +17,5 @@ public interface OperateRecordService { int insert(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map content); - List queryByCondt(OperateRecordDTO dto); + List queryByCondition(OperateRecordDTO dto); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java index cfa2920f..8dc0e0c1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java @@ -32,6 +32,15 @@ public interface TopicManagerService { Map> getTopicMaxAvgBytesIn(Long clusterId, Integer latestDay, Double minMaxAvgBytesIn); + /** + * 获取指定时间范围内Topic的峰值均值流量 + * @param clusterId 集群ID + * @param topicName Topic名称 + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param maxAvgDay 最大几天的均值 + * @return + */ Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay); TopicStatisticsDO getByTopicAndDay(Long clusterId, String topicName, String gmtDay); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java index 26d7ef4d..8a0028c7 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java @@ -66,7 +66,10 @@ public class AdminServiceImpl implements AdminService { String applicant, String operator) { List fullBrokerIdList = regionService.getFullBrokerIdList(clusterDO.getId(), regionId, brokerIdList); - if (PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList) > DEFAULT_DEAD_BROKER_LIMIT_NUM) { + + Long notAliveBrokerNum = PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList); + if (notAliveBrokerNum >= fullBrokerIdList.size() || notAliveBrokerNum > DEFAULT_DEAD_BROKER_LIMIT_NUM) { + // broker全挂了,或者是挂的数量大于了DEFAULT_DEAD_BROKER_LIMIT_NUM时, 则认为broker参数不合法 return ResultStatus.BROKER_NOT_EXIST; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index b505bad0..ea9d22da 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -82,6 +82,7 @@ public class ClusterServiceImpl implements ClusterService { content.put("security properties", clusterDO.getSecurityProperties()); content.put("jmx properties", clusterDO.getJmxProperties()); operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content); + if (clusterDao.insert(clusterDO) <= 0) { LOGGER.error("add new cluster failed, clusterDO:{}.", clusterDO); return ResultStatus.MYSQL_ERROR; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java index 290bbae5..e232d970 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java @@ -41,8 +41,8 @@ public class OperateRecordServiceImpl implements OperateRecordService { } @Override - public List queryByCondt(OperateRecordDTO dto) { - return operateRecordDao.queryByCondt( + public List queryByCondition(OperateRecordDTO dto) { + return operateRecordDao.queryByCondition( dto.getModuleId(), dto.getOperateId(), dto.getOperator(), diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index 1d761eb8..bce5fbe7 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -147,12 +147,14 @@ public class TopicManagerServiceImpl implements TopicManagerService { } @Override - public Double getTopicMaxAvgBytesIn(Long clusterId, - String topicName, - Date startTime, - Date endTime, - Integer maxAvgDay) { - return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay); + public Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay) { + try { + return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay); + } catch (Exception e) { + LOGGER.error("class=TopicManagerServiceImpl||method=getTopicMaxAvgBytesIn||clusterId={}||topicName={}||startTime={}||endTime={}||maxAvgDay={}||errMsg={}", + clusterId, topicName, startTime, endTime, maxAvgDay, e.getMessage()); + } + return null; } @Override diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java index 05983754..4bfa6999 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java @@ -14,5 +14,5 @@ public interface OperateRecordDao { int insert(OperateRecordDO operateRecordDO); - List queryByCondt(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime); + List queryByCondition(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime); } diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java index b08e6b83..20f37fd1 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java @@ -30,13 +30,13 @@ public class OperateRecordDaoImpl implements OperateRecordDao { } @Override - public List queryByCondt(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime) { + public List queryByCondition(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime) { Map params = new HashMap<>(5); params.put("moduleId", moduleId); params.put("operateId", operateId); params.put("operator", operator); params.put("startTime", startTime); params.put("endTime", endTime); - return sqlSession.selectList("OperateRecordDao.queryByCondt", params); + return sqlSession.selectList("OperateRecordDao.queryByCondition", params); } } diff --git a/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml b/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml index b65c3e6f..db505b6b 100644 --- a/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml @@ -21,7 +21,7 @@ ) - select * from operate_record where diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 6e59816b..7b5d97c3 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -11,11 +11,13 @@ import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.TopicBusinessInfoVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.*; +import com.xiaojukeji.kafka.manager.common.utils.DateUtils; import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaBillDO; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxAttributeEnum; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicStatisticMetricsVO; import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; @@ -339,4 +341,23 @@ public class NormalTopicController { ); } + @ApiOperation(value = "Topic流量统计信息", notes = "") + @RequestMapping(value = "{clusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET) + @ResponseBody + public Result getTopicStatisticMetrics(@PathVariable Long clusterId, + @PathVariable String topicName, + @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId, + @RequestParam("latest-day") Integer latestDay) { + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId); + if (ValidateUtils.isNull(physicalClusterId)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + + Double maxAvgBytesIn = topicManagerService.getTopicMaxAvgBytesIn(physicalClusterId, topicName, new Date(DateUtils.getDayStarTime(-1 * latestDay)), new Date(), 1); + if (ValidateUtils.isNull(maxAvgBytesIn)) { + return Result.buildFrom(ResultStatus.MYSQL_ERROR); + } + return new Result<>(new TopicStatisticMetricsVO(maxAvgBytesIn)); + } + } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java index 68068f97..f600aab5 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java @@ -36,7 +36,7 @@ public class RdOperateRecordController { if (ValidateUtils.isNull(dto) || !dto.legal()) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } - List voList = OperateRecordModelConverter.convert2OperateRecordVOList(operateRecordService.queryByCondt(dto)); + List voList = OperateRecordModelConverter.convert2OperateRecordVOList(operateRecordService.queryByCondition(dto)); if (voList.size() > MAX_RECORD_COUNT) { voList = voList.subList(0, MAX_RECORD_COUNT); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java index 4d029fb6..b247cdb8 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -13,8 +13,6 @@ import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorize import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO; -import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicStatisticMetricsVO; -import com.xiaojukeji.kafka.manager.common.utils.DateUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; @@ -30,7 +28,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; -import java.util.Date; import java.util.List; /** @@ -69,27 +66,6 @@ public class ThirdPartTopicController { return new Result<>(vo); } - @ApiOperation(value = "Topic流量统计信息", notes = "") - @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET) - @ResponseBody - public Result getTopicStatisticMetrics(@PathVariable Long physicalClusterId, - @PathVariable String topicName, - @RequestParam("latest-day") Integer latestDay) { - try { - return new Result<>(new TopicStatisticMetricsVO(topicManagerService.getTopicMaxAvgBytesIn( - physicalClusterId, - topicName, - new Date(DateUtils.getDayStarTime(-1 * latestDay)), - new Date(), - 1 - ))); - } catch (Exception e) { - LOGGER.error("get topic statistic metrics failed, clusterId:{} topicName:{} latestDay:{}." - , physicalClusterId, topicName, latestDay, e); - } - return Result.buildFrom(ResultStatus.MYSQL_ERROR); - } - @ApiOperation(value = "Topic是否有流量", notes = "") @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/offset-changed", method = RequestMethod.GET) @ResponseBody -- GitLab