未验证 提交 c6bcc0e3 编写于 作者: E EricZeng 提交者: GitHub

Merge pull request #297 from didi/dev

split op util controller to topic controller and leader controller, and add authority controller, quota controller
package com.xiaojukeji.kafka.manager.common.entity.ao.gateway;
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
/**
* @author zhongyuankai
* @date 2020/4/27
......@@ -65,4 +67,15 @@ public class TopicQuota {
", consumeQuota=" + consumeQuota +
'}';
}
public static TopicQuota buildFrom(TopicQuotaDTO dto) {
TopicQuota topicQuota = new TopicQuota();
topicQuota.setAppId(dto.getAppId());
topicQuota.setClusterId(dto.getClusterId());
topicQuota.setTopicName(dto.getTopicName());
topicQuota.setProduceQuota(dto.getProduceQuota());
topicQuota.setConsumeQuota(dto.getConsumeQuota());
return topicQuota;
}
}
package com.xiaojukeji.kafka.manager.common.entity.dto.gateway;
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@ApiModel(description = "配额调整")
public class TopicQuotaDTO extends ClusterTopicDTO {
@ApiModelProperty(value = "appId")
private String appId;
@ApiModelProperty(value = "发送数据速率B/s")
private Long produceQuota;
@ApiModelProperty(value = "消费数据速率B/s")
private Long consumeQuota;
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public Long getProduceQuota() {
return produceQuota;
}
public void setProduceQuota(Long produceQuota) {
this.produceQuota = produceQuota;
}
public Long getConsumeQuota() {
return consumeQuota;
}
public void setConsumeQuota(Long consumeQuota) {
this.consumeQuota = consumeQuota;
}
@Override
public boolean paramLegal() {
return !ValidateUtils.isNullOrLessThanZero(clusterId) && !ValidateUtils.isBlank(topicName) && !ValidateUtils.isBlank(appId);
}
}
......@@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import java.util.Date;
import java.util.List;
......@@ -122,5 +123,12 @@ public interface TopicManagerService {
List<TopicStatisticsDO> getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime);
TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName);
/**
* topic权限调整
* @param authorityDO topic权限
* @return
*/
ResultStatus addAuthority(AuthorityDO authorityDO);
}
......@@ -105,4 +105,5 @@ public interface TopicService {
List<TopicBrokerDTO> getTopicBrokerList(Long clusterId, String topicName);
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
}
package com.xiaojukeji.kafka.manager.service.service.gateway;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
/**
......@@ -34,4 +35,11 @@ public interface QuotaService {
TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId);
Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota);
/**
* topic配额调整
* @param topicQuota topic配额
* @return
*/
ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota);
}
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy;
......@@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService {
@Autowired
private AbstractAllocateQuotaStrategy allocateQuotaStrategy;
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@Autowired
private AuthorityService authorityService;
@Override
public int addTopicQuota(TopicQuota topicQuotaDO) {
return KafkaZookeeperUtils.setTopicQuota(
......@@ -78,4 +89,34 @@ public class QuotaServiceImpl implements QuotaService {
}
return Boolean.TRUE;
}
@Override
public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) {
// 获取物理集群id
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId());
if (ValidateUtils.isNull(physicalClusterId)) {
return ResultStatus.CLUSTER_NOT_EXIST;
}
// 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理)
AuthorityDO authority = authorityService.getAuthority(physicalClusterId,
topicQuota.getTopicName(), topicQuota.getAppId());
if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) {
return ResultStatus.USER_WITHOUT_AUTHORITY;
}
if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) {
// 可以消费
topicQuota.setProduceQuota(null);
}
if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) {
// 可以生产
topicQuota.setConsumeQuota(null);
}
// 设置物理集群id
topicQuota.setClusterId(physicalClusterId);
// 添加配额
if (addTopicQuota(topicQuota) > 0) {
return ResultStatus.SUCCESS;
}
return ResultStatus.ZOOKEEPER_WRITE_FAILED;
}
}
\ No newline at end of file
......@@ -20,6 +20,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData;
......@@ -618,6 +619,38 @@ public class TopicManagerServiceImpl implements TopicManagerService {
return topicBusinessInfo;
}
@Override
public ResultStatus addAuthority(AuthorityDO authorityDO) {
// 查询该用户拥有的应用
List<AppDO> appDOs = appService.getByPrincipal(SpringTool.getUserName());
if (ValidateUtils.isEmptyList(appDOs)) {
// 该用户无应用,需要先申请应用
return ResultStatus.APP_NOT_EXIST;
}
List<Long> appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList());
if (!appIds.contains(authorityDO.getAppId())) {
// 入参中的appId,该用户未拥有
return ResultStatus.APP_NOT_EXIST;
}
// 获取物理集群id
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(authorityDO.getClusterId());
if (ValidateUtils.isNull(physicalClusterId)) {
// 集群不存在
return ResultStatus.CLUSTER_NOT_EXIST;
}
TopicDO topic = getByTopicName(physicalClusterId, authorityDO.getTopicName());
if (ValidateUtils.isNull(topic)) {
// topic不存在
return ResultStatus.TOPIC_NOT_EXIST;
}
// 设置物理集群id
authorityDO.setClusterId(physicalClusterId);
if (authorityService.addAuthority(authorityDO) > 0) {
return ResultStatus.SUCCESS;
}
return ResultStatus.MYSQL_ERROR;
}
private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO,
String topicName,
TopicDO topicDO,
......
package com.xiaojukeji.kafka.manager.openapi.common.dto;
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@ApiModel(description = "权限调整")
public class TopicAuthorityDTO extends ClusterTopicDTO {
@ApiModelProperty(value = "appId")
private String appId;
@ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写, 4:可管理")
private Integer access;
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public Integer getAccess() {
return access;
}
public void setAccess(Integer access) {
this.access = access;
}
@Override
public boolean paramLegal() {
return !ValidateUtils.isNullOrLessThanZero(clusterId)
&& !ValidateUtils.isBlank(topicName)
&& !ValidateUtils.isBlank(appId)
&& !ValidateUtils.isNullOrLessThanZero(access);
}
}
......@@ -337,7 +337,7 @@ public class NormalTopicController {
}
return new Result<>(TopicModelConverter.convert2TopicMineAppVOList(
topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName()))
topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName()))
);
}
......
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* Authority操作相关接口
* @author zengqiao
* @date 21/5/18
*/
@Api(tags = "OP-Authority操作相关接口(REST)")
@RestController
public class OpAuthorityController {
@Autowired
private TopicManagerService topicManagerService;
@ApiOperation(value = "权限调整",notes = "权限调整")
@PostMapping(value = "topic-authorities")
@ResponseBody
public Result addAuthority(@RequestBody TopicAuthorityDTO dto) {
//非空校验
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
return Result.buildFrom(topicManagerService.addAuthority(AuthorityConverter.convert2AuthorityDO(dto)));
}
}
package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart;
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.bizenum.RebalanceDimensionEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.RebalanceDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.AdminService;
......@@ -16,22 +18,35 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* Leader操作[选举|切换]相关接口
* @author zengqiao
* @date 20/9/23
* @date 21/5/18
*/
@Api(tags = "开放接口-OP相关接口(REST)")
@Api(tags = "OP-Leader操作相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
public class ThirdPartOpUtilController {
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
public class OpLeaderController {
@Autowired
private AdminService adminService;
@Autowired
private ClusterService clusterService;
@ApiOperation(value = "优先副本选举状态")
@RequestMapping(value = {"leaders/preferred-replica-election-status", "utils/rebalance-status"}, method = RequestMethod.GET)
@ResponseBody
public Result preferredReplicaElectStatus(@RequestParam("clusterId") Long clusterId) {
ClusterDO clusterDO = clusterService.getById(clusterId);
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
TaskStatusEnum statusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
return new Result<>(JsonUtils.toJson(statusEnum));
}
@ApiOperation(value = "优先副本选举")
@RequestMapping(value = "op/rebalance", method = RequestMethod.POST)
@RequestMapping(value = {"leaders/preferred-replica-election", "utils/rebalance"}, method = RequestMethod.POST)
@ResponseBody
public Result preferredReplicaElect(@RequestBody RebalanceDTO reqObj) {
if (!reqObj.paramLegal()) {
......
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* Quota操作相关接口
* @author zengqiao
* @date 21/5/18
*/
@Api(tags = "OP-Quota操作相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
public class OpQuotaController {
@Autowired
private QuotaService quotaService;
@ApiOperation(value = "配额调整",notes = "配额调整")
@RequestMapping(value = "topic-quotas",method = RequestMethod.POST)
@ResponseBody
public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) {
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
// 非空校验
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto)));
}
}
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.bizenum.RebalanceDimensionEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.RebalanceDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.*;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicCreationDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicDeletionDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicExpansionDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicModificationDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.AdminService;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
import com.xiaojukeji.kafka.manager.service.utils.TopicCommands;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
......@@ -30,25 +29,25 @@ import java.util.List;
import java.util.Properties;
/**
* 运维工具类
* Topic操作相关接口
* @author zengqiao
* @date 20/4/2
* @date 21/5/18
*/
@Api(tags = "OP-Utils相关接口(REST)")
@Api(tags = "OP-Topic操作相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
public class OpUtilsController {
public class OpTopicController {
@Autowired
private ClusterService clusterService;
private AdminService adminService;
@Autowired
private AdminService adminService;
private ClusterService clusterService;
@Autowired
private TopicManagerService topicManagerService;
@ApiOperation(value = "创建Topic")
@RequestMapping(value = {"utils/topics"}, method = RequestMethod.POST)
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.POST)
@ResponseBody
public Result createCommonTopic(@RequestBody TopicCreationDTO dto) {
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
......@@ -76,44 +75,8 @@ public class OpUtilsController {
return Result.buildFrom(rs);
}
@ApiOperation(value = "Topic扩分区", notes = "")
@RequestMapping(value = {"utils/expand-partitions"}, method = RequestMethod.PUT)
@ResponseBody
public Result<List<TopicOperationResult>> expandTopics(@RequestBody List<TopicExpansionDTO> dtoList) {
if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
List<TopicOperationResult> resultList = new ArrayList<>();
for (TopicExpansionDTO dto: dtoList) {
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
if (!Constant.SUCCESS.equals(rc.getCode())) {
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), rc));
continue;
}
// 参数检查合法, 开始对Topic进行扩分区
ResultStatus statusEnum = adminService.expandPartitions(
rc.getData(),
dto.getTopicName(),
dto.getPartitionNum(),
dto.getRegionId(),
dto.getBrokerIdList(),
SpringTool.getUserName()
);
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), statusEnum));
}
for (TopicOperationResult operationResult: resultList) {
if (!Constant.SUCCESS.equals(operationResult.getCode())) {
return Result.buildFrom(ResultStatus.OPERATION_FAILED, resultList);
}
}
return new Result<>(resultList);
}
@ApiOperation(value = "Topic删除", notes = "单次不允许超过10个Topic")
@RequestMapping(value = {"utils/topics"}, method = RequestMethod.DELETE)
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.DELETE)
@ResponseBody
public Result<List<TopicOperationResult>> deleteTopics(@RequestBody List<TopicDeletionDTO> dtoList) {
if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) {
......@@ -143,7 +106,7 @@ public class OpUtilsController {
}
@ApiOperation(value = "修改Topic", notes = "")
@RequestMapping(value = {"utils/topics"}, method = RequestMethod.PUT)
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.PUT)
@ResponseBody
public Result modifyTopic(@RequestBody TopicModificationDTO dto) {
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
......@@ -170,48 +133,40 @@ public class OpUtilsController {
return new Result();
}
@ApiOperation(value = "优先副本选举状态")
@RequestMapping(value = "utils/rebalance-status", method = RequestMethod.GET)
@ApiOperation(value = "Topic扩分区", notes = "")
@RequestMapping(value = {"topics/expand-partitions", "utils/expand-partitions"}, method = RequestMethod.PUT)
@ResponseBody
public Result preferredReplicaElectStatus(@RequestParam("clusterId") Long clusterId) {
ClusterDO clusterDO = clusterService.getById(clusterId);
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
public Result<List<TopicOperationResult>> expandTopics(@RequestBody List<TopicExpansionDTO> dtoList) {
if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
TaskStatusEnum statusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
return new Result<>(JsonUtils.toJson(statusEnum));
}
List<TopicOperationResult> resultList = new ArrayList<>();
for (TopicExpansionDTO dto: dtoList) {
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
if (!Constant.SUCCESS.equals(rc.getCode())) {
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), rc));
continue;
}
@ApiOperation(value = "优先副本选举")
@RequestMapping(value = "utils/rebalance", method = RequestMethod.POST)
@ResponseBody
public Result preferredReplicaElect(@RequestBody RebalanceDTO reqObj) {
if (!reqObj.paramLegal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
ClusterDO clusterDO = clusterService.getById(reqObj.getClusterId());
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
// 参数检查合法, 开始对Topic进行扩分区
ResultStatus statusEnum = adminService.expandPartitions(
rc.getData(),
dto.getTopicName(),
dto.getPartitionNum(),
dto.getRegionId(),
dto.getBrokerIdList(),
SpringTool.getUserName()
);
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), statusEnum));
}
ResultStatus rs = null;
if (RebalanceDimensionEnum.CLUSTER.getCode().equals(reqObj.getDimension())) {
// 按照Cluster纬度均衡
rs = adminService.preferredReplicaElection(clusterDO, SpringTool.getUserName());
} else if (RebalanceDimensionEnum.BROKER.getCode().equals(reqObj.getDimension())) {
// 按照Broker纬度均衡
rs = adminService.preferredReplicaElection(clusterDO, reqObj.getBrokerId(), SpringTool.getUserName());
} else if (RebalanceDimensionEnum.TOPIC.getCode().equals(reqObj.getDimension())) {
// 按照Topic纬度均衡
rs = adminService.preferredReplicaElection(clusterDO, reqObj.getTopicName(), SpringTool.getUserName());
} else if (RebalanceDimensionEnum.PARTITION.getCode().equals(reqObj.getDimension())) {
// 按照Partition纬度均衡
rs = adminService.preferredReplicaElection(clusterDO, reqObj.getTopicName(), reqObj.getPartitionId(), SpringTool.getUserName());
} else {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
for (TopicOperationResult operationResult: resultList) {
if (!Constant.SUCCESS.equals(operationResult.getCode())) {
return Result.buildFrom(ResultStatus.OPERATION_FAILED, resultList);
}
}
return Result.buildFrom(rs);
return new Result<>(resultList);
}
private Result<ClusterDO> checkParamAndGetClusterDO(ClusterTopicDTO dto) {
......@@ -226,4 +181,3 @@ public class OpUtilsController {
return new Result<>(clusterDO);
}
}
......@@ -133,4 +133,4 @@ public class ThirdPartTopicController {
topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName))
);
}
}
\ No newline at end of file
}
package com.xiaojukeji.kafka.manager.web.converters;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
public class AuthorityConverter {
public static AuthorityDO convert2AuthorityDO(TopicAuthorityDTO dto) {
AuthorityDO authorityDO = new AuthorityDO();
authorityDO.setAppId(dto.getAppId());
authorityDO.setClusterId(dto.getClusterId());
authorityDO.setTopicName(dto.getTopicName());
authorityDO.setAccess(dto.getAccess());
return authorityDO;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册