配额调整

上级 8b95b3ff
......@@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicDTO;
......@@ -122,5 +123,12 @@ public interface TopicManagerService {
List<TopicStatisticsDO> getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime);
TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName);
/**
* topic配额调整
* @param topicQuota topic配额
* @return
*/
ResultStatus addTopicQuota(TopicQuota topicQuota);
}
......@@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo;
......@@ -34,6 +35,7 @@ import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -86,6 +88,9 @@ public class TopicManagerServiceImpl implements TopicManagerService {
@Autowired
private OperateRecordService operateRecordService;
@Autowired
private QuotaService quotaService;
@Override
public List<TopicDO> listAll() {
try {
......@@ -618,6 +623,36 @@ public class TopicManagerServiceImpl implements TopicManagerService {
return topicBusinessInfo;
}
@Override
public ResultStatus addTopicQuota(TopicQuota topicQuota) {
// 获取物理集群id
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId());
if (ValidateUtils.isNull(physicalClusterId)) {
return ResultStatus.CLUSTER_NOT_EXIST;
}
// 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理)
AuthorityDO authority = authorityService.getAuthority(physicalClusterId,
topicQuota.getTopicName(), topicQuota.getAppId());
if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) {
return ResultStatus.USER_WITHOUT_AUTHORITY;
}
if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) {
// 可以消费
topicQuota.setProduceQuota(null);
}
if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) {
// 可以生产
topicQuota.setConsumeQuota(null);
}
// 设置物理集群id
topicQuota.setClusterId(physicalClusterId);
// 添加配额
if (quotaService.addTopicQuota(topicQuota) > 0) {
return ResultStatus.SUCCESS;
}
return ResultStatus.MYSQL_ERROR;
}
private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO,
String topicName,
TopicDO topicDO,
......
......@@ -17,11 +17,9 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter;
......@@ -56,12 +54,6 @@ public class ThirdPartTopicController {
@Autowired
private TopicManagerService topicManagerService;
@Autowired
private QuotaService quotaService;
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@ApiOperation(value = "Topic元信息", notes = "LogX调用")
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
@ResponseBody
......@@ -148,21 +140,11 @@ public class ThirdPartTopicController {
@RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST)
@ResponseBody
public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) {
//非空校验
// 非空校验
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
//获取物理集群id
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
if (ValidateUtils.isNull(physicalClusterId)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
dto.setClusterId(physicalClusterId);
// 添加配额
if (quotaService.addTopicQuota(TopicQuota.buildFrom(dto)) > 0) {
return Result.buildFrom(ResultStatus.SUCCESS);
}
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
return Result.buildFrom(topicManagerService.addTopicQuota(TopicQuota.buildFrom(dto)));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册