Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
eb8fe775
K
kafka-manager
项目概览
DiDi
/
kafka-manager
9 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
eb8fe775
编写于
5月 12, 2021
作者:
T
tangcongfa_v@didichuxing.com
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
配额调整
上级
b68ba0bf
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
67 addition
and
53 deletion
+67
-53
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java
...kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java
+10
-0
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java
...iaojukeji/kafka/manager/service/service/TopicService.java
+0
-6
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
.../kafka/manager/service/service/impl/TopicServiceImpl.java
+0
-35
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java
...ager/web/api/versionone/normal/NormalTopicController.java
+0
-11
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java
...eb/api/versionone/thirdpart/ThirdPartTopicController.java
+57
-1
未找到文件。
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java
浏览文件 @
eb8fe775
package
com.xiaojukeji.kafka.manager.common.entity.dto.normal
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
...
...
@@ -39,4 +40,13 @@ public class TopicQuotaDTO extends ClusterTopicDTO {
public
void
setConsumeQuota
(
Long
consumeQuota
)
{
this
.
consumeQuota
=
consumeQuota
;
}
public
boolean
paramLegal
()
{
if
(
ValidateUtils
.
isNull
(
clusterId
)
||
ValidateUtils
.
isNull
(
topicName
)
||
ValidateUtils
.
isNull
(
appId
))
{
return
false
;
}
return
true
;
}
}
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java
浏览文件 @
eb8fe775
...
...
@@ -6,7 +6,6 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
import
com.xiaojukeji.kafka.manager.common.entity.ao.*
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.*
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBrokerDTO
;
...
...
@@ -107,9 +106,4 @@ public interface TopicService {
Result
<
TopicOffsetChangedEnum
>
checkTopicOffsetChanged
(
Long
physicalClusterId
,
String
topicName
,
Long
latestTime
);
/**
* 配额调整
*/
Result
addTopicQuota
(
TopicQuotaDTO
dto
);
}
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
浏览文件 @
eb8fe775
...
...
@@ -3,8 +3,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import
com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum
;
import
com.xiaojukeji.kafka.manager.common.entity.Result
;
import
com.xiaojukeji.kafka.manager.common.entity.ResultStatus
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO
;
import
com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
...
...
@@ -25,14 +23,12 @@ import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao;
import
com.xiaojukeji.kafka.manager.dao.TopicMetricsDao
;
import
com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.*
;
import
com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao
;
import
com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool
;
import
com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache
;
import
com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager
;
import
com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager
;
import
com.xiaojukeji.kafka.manager.service.service.*
;
import
com.xiaojukeji.kafka.manager.service.service.gateway.AppService
;
import
com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService
;
import
com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy
;
import
com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils
;
import
com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils
;
...
...
@@ -91,15 +87,6 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private
AbstractHealthScoreStrategy
healthScoreStrategy
;
@Autowired
private
AdminService
adminService
;
@Autowired
private
QuotaService
quotaService
;
@Autowired
private
AuthorityDao
authorityDao
;
@Override
public
List
<
TopicMetricsDO
>
getTopicMetricsFromDB
(
Long
clusterId
,
String
topicName
,
Date
startTime
,
Date
endTime
)
{
try
{
...
...
@@ -837,28 +824,6 @@ public class TopicServiceImpl implements TopicService {
return
new
Result
<>(
TopicOffsetChangedEnum
.
UNKNOWN
);
}
@Override
public
Result
addTopicQuota
(
TopicQuotaDTO
dto
)
{
//获取物理集群id
Long
physicalClusterId
=
logicalClusterMetadataManager
.
getPhysicalClusterId
(
dto
.
getClusterId
());
if
(
ValidateUtils
.
isNull
(
physicalClusterId
))
{
return
Result
.
buildFrom
(
ResultStatus
.
CLUSTER_NOT_EXIST
);
}
//构建配额
TopicQuota
topicQuota
=
new
TopicQuota
();
topicQuota
.
setClusterId
(
physicalClusterId
);
topicQuota
.
setAppId
(
dto
.
getAppId
());
topicQuota
.
setTopicName
(
dto
.
getTopicName
());
topicQuota
.
setProduceQuota
(
dto
.
getProduceQuota
());
topicQuota
.
setConsumeQuota
(
dto
.
getConsumeQuota
());
//配额调整
int
result
=
quotaService
.
addTopicQuota
(
topicQuota
);
if
(
result
>
0
)
{
return
Result
.
buildFrom
(
ResultStatus
.
SUCCESS
);
}
return
Result
.
buildFrom
(
ResultStatus
.
MYSQL_ERROR
);
}
private
Result
<
TopicOffsetChangedEnum
>
checkTopicOffsetChanged
(
ClusterDO
clusterDO
,
String
topicName
,
Map
<
TopicPartition
,
Long
>
endOffsetMap
)
{
...
...
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java
浏览文件 @
eb8fe775
...
...
@@ -7,7 +7,6 @@ import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO
;
import
com.xiaojukeji.kafka.manager.common.entity.vo.normal.TopicBusinessInfoVO
;
...
...
@@ -361,14 +360,4 @@ public class NormalTopicController {
return
new
Result
<>(
new
TopicStatisticMetricsVO
(
maxAvgBytesIn
));
}
@ApiOperation
(
value
=
"配额调整"
,
notes
=
"配额调整"
)
@RequestMapping
(
value
=
"{topics/quota/add}"
,
method
=
RequestMethod
.
POST
)
@ResponseBody
public
Result
addTopicQuota
(
@RequestBody
TopicQuotaDTO
dto
)
{
if
(
ValidateUtils
.
isNull
(
dto
))
{
return
Result
.
buildFrom
(
ResultStatus
.
PARAM_ILLEGAL
);
}
return
topicService
.
addTopicQuota
(
dto
);
}
}
\ No newline at end of file
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java
浏览文件 @
eb8fe775
...
...
@@ -5,7 +5,10 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections
;
import
com.xiaojukeji.kafka.manager.common.entity.Result
;
import
com.xiaojukeji.kafka.manager.common.entity.ResultStatus
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO
;
import
com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO
;
import
com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO
;
import
com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupVO
;
...
...
@@ -15,9 +18,12 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata
import
com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
import
com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager
;
import
com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager
;
import
com.xiaojukeji.kafka.manager.service.service.*
;
import
com.xiaojukeji.kafka.manager.common.constant.ApiPrefix
;
import
com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService
;
import
com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService
;
import
com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter
;
import
com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter
;
import
com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter
;
...
...
@@ -52,6 +58,15 @@ public class ThirdPartTopicController {
@Autowired
private
TopicManagerService
topicManagerService
;
@Autowired
private
AuthorityService
authorityService
;
@Autowired
private
QuotaService
quotaService
;
@Autowired
private
LogicalClusterMetadataManager
logicalClusterMetadataManager
;
@ApiOperation
(
value
=
"Topic元信息"
,
notes
=
"LogX调用"
)
@RequestMapping
(
value
=
"clusters/{clusterId}/topics/{topicName}/metadata"
,
method
=
RequestMethod
.
GET
)
@ResponseBody
...
...
@@ -133,4 +148,45 @@ public class ThirdPartTopicController {
topicManagerService
.
getTopicAuthorizedApps
(
physicalClusterId
,
topicName
))
);
}
}
\ No newline at end of file
@ApiOperation
(
value
=
"配额调整"
,
notes
=
"配额调整"
)
@RequestMapping
(
value
=
"{topics/quota/add}"
,
method
=
RequestMethod
.
POST
)
@ResponseBody
public
Result
addTopicQuota
(
@RequestBody
TopicQuotaDTO
dto
)
{
//非空校验
if
(
ValidateUtils
.
isNull
(
dto
)
||
!
dto
.
paramLegal
())
{
return
Result
.
buildFrom
(
ResultStatus
.
PARAM_ILLEGAL
);
}
//获取物理集群id
Long
physicalClusterId
=
logicalClusterMetadataManager
.
getPhysicalClusterId
(
dto
.
getClusterId
());
if
(
ValidateUtils
.
isNull
(
physicalClusterId
))
{
return
Result
.
buildFrom
(
ResultStatus
.
CLUSTER_NOT_EXIST
);
}
//权限判断(access 0:无权限, 1:读, 2:写, 3:读写)
AuthorityDO
authority
=
authorityService
.
getAuthority
(
physicalClusterId
,
dto
.
getTopicName
(),
dto
.
getAppId
());
if
(
ValidateUtils
.
isNull
(
authority
)
||
authority
.
getAccess
()
==
0
)
{
return
Result
.
buildFrom
(
ResultStatus
.
USER_WITHOUT_AUTHORITY
);
}
if
(
authority
.
getAccess
()
==
1
)
{
//可以消费
dto
.
setProduceQuota
(
null
);
}
if
(
authority
.
getAccess
()
==
2
)
{
//可以生产
dto
.
setConsumeQuota
(
null
);
}
//构建topicquota
TopicQuota
topicQuotaDO
=
new
TopicQuota
();
topicQuotaDO
.
setAppId
(
dto
.
getAppId
());
topicQuotaDO
.
setClusterId
(
physicalClusterId
);
topicQuotaDO
.
setTopicName
(
dto
.
getTopicName
());
topicQuotaDO
.
setConsumeQuota
(
dto
.
getConsumeQuota
());
topicQuotaDO
.
setProduceQuota
(
dto
.
getProduceQuota
());
//添加配额
if
(
quotaService
.
addTopicQuota
(
topicQuotaDO
)
>
0
)
{
return
Result
.
buildFrom
(
ResultStatus
.
SUCCESS
);
}
return
Result
.
buildFrom
(
ResultStatus
.
MYSQL_ERROR
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录