Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
db044caf
K
kafka-manager
项目概览
DiDi
/
kafka-manager
8 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
db044caf
编写于
4月 26, 2023
作者:
Z
ZQKC
提交者:
EricZeng
4月 26, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[Optimize]Group元信息更新优化(#1005)
1、Group元信息未变化时,则不进行updateById操作; 2、失效的Group信息直接删除;
上级
82fbea4e
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
114 addition
and
67 deletion
+114
-67
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java
...know/streaming/km/common/bean/po/group/GroupMemberPO.java
+13
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java
...ukeji/know/streaming/km/common/bean/po/group/GroupPO.java
+16
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java
...ji/know/streaming/km/common/converter/GroupConverter.java
+2
-0
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java
...ji/know/streaming/km/core/service/group/GroupService.java
+5
-4
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java
...treaming/km/core/service/group/impl/GroupServiceImpl.java
+71
-51
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaGroupTask.java
.../streaming/km/task/kafka/metadata/SyncKafkaGroupTask.java
+7
-12
未找到文件。
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java
浏览文件 @
db044caf
...
...
@@ -7,6 +7,7 @@ import lombok.Data;
import
lombok.NoArgsConstructor
;
import
java.util.Date
;
import
java.util.Objects
;
@Data
@NoArgsConstructor
...
...
@@ -37,4 +38,16 @@ public class GroupMemberPO extends BasePO {
this
.
memberCount
=
memberCount
;
this
.
updateTime
=
updateTime
;
}
public
boolean
equal2GroupMemberPO
(
GroupMemberPO
that
)
{
if
(
that
==
null
)
{
return
false
;
}
return
Objects
.
equals
(
clusterPhyId
,
that
.
clusterPhyId
)
&&
Objects
.
equals
(
topicName
,
that
.
topicName
)
&&
Objects
.
equals
(
groupName
,
that
.
groupName
)
&&
Objects
.
equals
(
state
,
that
.
state
)
&&
Objects
.
equals
(
memberCount
,
that
.
memberCount
);
}
}
\ No newline at end of file
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java
浏览文件 @
db044caf
...
...
@@ -9,6 +9,8 @@ import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
java.util.Objects
;
@Data
@NoArgsConstructor
...
...
@@ -58,4 +60,18 @@ public class GroupPO extends BasePO {
*/
private
int
coordinatorId
;
public
boolean
equal2GroupPO
(
GroupPO
groupPO
)
{
if
(
groupPO
==
null
)
{
return
false
;
}
return
coordinatorId
==
groupPO
.
coordinatorId
&&
Objects
.
equals
(
clusterPhyId
,
groupPO
.
clusterPhyId
)
&&
Objects
.
equals
(
type
,
groupPO
.
type
)
&&
Objects
.
equals
(
name
,
groupPO
.
name
)
&&
Objects
.
equals
(
state
,
groupPO
.
state
)
&&
Objects
.
equals
(
memberCount
,
groupPO
.
memberCount
)
&&
Objects
.
equals
(
topicMembers
,
groupPO
.
topicMembers
)
&&
Objects
.
equals
(
partitionAssignor
,
groupPO
.
partitionAssignor
);
}
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java
浏览文件 @
db044caf
...
...
@@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.stream.Collectors
;
/**
...
...
@@ -57,6 +58,7 @@ public class GroupConverter {
po
.
setTopicMembers
(
ConvertUtil
.
obj2Json
(
group
.
getTopicMembers
()));
po
.
setType
(
group
.
getType
().
getCode
());
po
.
setState
(
group
.
getState
().
getState
());
po
.
setUpdateTime
(
new
Date
());
return
po
;
}
}
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java
浏览文件 @
db044caf
...
...
@@ -12,9 +12,9 @@ import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import
com.xiaojukeji.know.streaming.km.common.exception.NotExistException
;
import
org.apache.kafka.common.TopicPartition
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
public
interface
GroupService
{
/**
...
...
@@ -35,10 +35,11 @@ public interface GroupService {
/**
* 批量更新DB
* @param clusterPhyId 集群ID
* @param newGroupList 新的group列表
* @param getFailedGroupSet 元信息获取失败的group列表
*/
void
batchReplaceGroupsAndMembers
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
long
updateTime
);
int
deleteByUpdateTimeBeforeInDB
(
Long
clusterPhyId
,
Date
beforeTime
);
void
batchReplaceGroupsAndMembers
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
Set
<
String
>
getFailedGroupSet
);
/**
* DB-Group相关接口
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java
浏览文件 @
db044caf
...
...
@@ -49,7 +49,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
@Service
public
class
GroupServiceImpl
extends
BaseKafkaVersionControlService
implements
GroupService
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
GroupServiceImpl
.
class
);
private
static
final
ILog
LOGGER
=
LogFactory
.
getLog
(
GroupServiceImpl
.
class
);
@Autowired
private
GroupDAO
groupDAO
;
...
...
@@ -92,7 +92,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
return
groupNameList
;
}
catch
(
Exception
e
)
{
log
.
error
(
"method=listGroupsFromKafka||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
LOGGER
.
error
(
"method=listGroupsFromKafka||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
throw
new
AdminOperateException
(
e
.
getMessage
(),
e
,
ResultStatus
.
KAFKA_OPERATE_FAILED
);
}
finally
{
...
...
@@ -142,7 +142,8 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
member
.
setMemberCount
(
member
.
getMemberCount
()
+
1
);
}
}
group
.
setTopicMembers
(
memberMap
.
values
().
stream
().
collect
(
Collectors
.
toList
()));
group
.
setTopicMembers
(
new
ArrayList
<>(
memberMap
.
values
()));
return
group
;
}
...
...
@@ -161,7 +162,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
return
offsetMap
;
}
catch
(
Exception
e
)
{
log
.
error
(
"method=getGroupOffset||clusterPhyId={}|groupName={}||errMsg=exception!"
,
clusterPhyId
,
groupName
,
e
);
LOGGER
.
error
(
"method=getGroupOffset||clusterPhyId={}|groupName={}||errMsg=exception!"
,
clusterPhyId
,
groupName
,
e
);
throw
new
AdminOperateException
(
e
.
getMessage
(),
e
,
ResultStatus
.
KAFKA_OPERATE_FAILED
);
}
...
...
@@ -187,7 +188,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
return
describeGroupsResult
.
all
().
get
().
get
(
groupName
);
}
catch
(
Exception
e
){
log
.
error
(
"method=getGroupDescription||clusterPhyId={}|groupName={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
groupName
,
e
);
LOGGER
.
error
(
"method=getGroupDescription||clusterPhyId={}|groupName={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
groupName
,
e
);
throw
new
AdminOperateException
(
e
.
getMessage
(),
e
,
ResultStatus
.
KAFKA_OPERATE_FAILED
);
}
finally
{
...
...
@@ -202,12 +203,12 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
}
@Override
public
void
batchReplaceGroupsAndMembers
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
long
updateTime
)
{
public
void
batchReplaceGroupsAndMembers
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
Set
<
String
>
getFailedGroupSet
)
{
// 更新Group信息
this
.
batchReplaceGroups
(
clusterPhyId
,
newGroupList
,
updateTime
);
this
.
batchReplaceGroups
(
clusterPhyId
,
newGroupList
,
getFailedGroupSet
);
// 更新Group-Topic信息
this
.
batchReplaceGroupMembers
(
clusterPhyId
,
newGroupList
,
updateTime
);
this
.
batchReplaceGroupMembers
(
clusterPhyId
,
newGroupList
,
getFailedGroupSet
);
}
@Override
...
...
@@ -283,21 +284,6 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
return
groupDAO
.
selectList
(
lambdaQueryWrapper
).
stream
().
map
(
elem
->
GroupConverter
.
convert2Group
(
elem
)).
collect
(
Collectors
.
toList
());
}
@Override
public
int
deleteByUpdateTimeBeforeInDB
(
Long
clusterPhyId
,
Date
beforeTime
)
{
// 删除过期Group信息
LambdaQueryWrapper
<
GroupPO
>
groupPOLambdaQueryWrapper
=
new
LambdaQueryWrapper
<>();
groupPOLambdaQueryWrapper
.
eq
(
GroupPO:
:
getClusterPhyId
,
clusterPhyId
);
groupPOLambdaQueryWrapper
.
le
(
GroupPO:
:
getUpdateTime
,
beforeTime
);
groupDAO
.
delete
(
groupPOLambdaQueryWrapper
);
// 删除过期GroupMember信息
LambdaQueryWrapper
<
GroupMemberPO
>
queryWrapper
=
new
LambdaQueryWrapper
<>();
queryWrapper
.
eq
(
GroupMemberPO:
:
getClusterPhyId
,
clusterPhyId
);
queryWrapper
.
le
(
GroupMemberPO:
:
getUpdateTime
,
beforeTime
);
return
groupMemberDAO
.
delete
(
queryWrapper
);
}
@Override
public
List
<
String
>
getGroupsFromDB
(
Long
clusterPhyId
)
{
LambdaQueryWrapper
<
GroupPO
>
lambdaQueryWrapper
=
new
LambdaQueryWrapper
<>();
...
...
@@ -368,7 +354,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
return
Result
.
buildSuc
();
}
catch
(
Exception
e
){
log
.
error
(
"method=resetGroupOffsets||clusterPhyId={}|groupName={}||errMsg=exception!"
,
clusterPhyId
,
groupName
,
e
);
LOGGER
.
error
(
"method=resetGroupOffsets||clusterPhyId={}|groupName={}||errMsg=exception!"
,
clusterPhyId
,
groupName
,
e
);
throw
new
AdminOperateException
(
e
.
getMessage
(),
e
,
ResultStatus
.
KAFKA_OPERATE_FAILED
);
}
...
...
@@ -378,62 +364,96 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
/**************************************************** private method ****************************************************/
private
void
batchReplaceGroupMembers
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
long
updateTime
)
{
if
(
ValidateUtils
.
isEmptyList
(
newGroupList
))
{
return
;
}
List
<
GroupMemberPO
>
dbPOList
=
this
.
listClusterGroupsMemberPO
(
clusterPhyId
);
Map
<
String
,
GroupMemberPO
>
dbPOMap
=
dbPOList
.
stream
().
collect
(
Collectors
.
toMap
(
elem
->
elem
.
getGroupName
()
+
elem
.
getTopicName
(),
Function
.
identity
()));
private
void
batchReplaceGroupMembers
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
Set
<
String
>
getFailedGroupSet
)
{
// DB 中的数据
Map
<
String
,
GroupMemberPO
>
dbPOMap
=
this
.
listClusterGroupsMemberPO
(
clusterPhyId
)
.
stream
()
.
collect
(
Collectors
.
toMap
(
elem
->
elem
.
getGroupName
()
+
elem
.
getTopicName
(),
Function
.
identity
()));
// 进行数据的更新
for
(
Group
group:
newGroupList
)
{
for
(
GroupTopicMember
member
:
group
.
getTopicMembers
())
{
try
{
GroupMemberPO
newPO
=
new
GroupMemberPO
(
clusterPhyId
,
member
.
getTopicName
(),
group
.
getName
(),
group
.
getState
().
getState
(),
member
.
getMemberCount
(),
new
Date
(
updateTime
));
GroupMemberPO
newPO
=
new
GroupMemberPO
(
clusterPhyId
,
member
.
getTopicName
(),
group
.
getName
(),
group
.
getState
().
getState
(),
member
.
getMemberCount
(),
new
Date
());
GroupMemberPO
dbPO
=
dbPOMap
.
remove
(
newPO
.
getGroupName
()
+
newPO
.
getTopicName
());
if
(
dbPO
!=
null
)
{
if
(
dbPO
==
null
)
{
// 数据不存在则直接写入
groupMemberDAO
.
insert
(
newPO
);
}
else
if
(!
dbPO
.
equal2GroupMemberPO
(
newPO
))
{
// 数据发生了变化则进行更新
newPO
.
setId
(
dbPO
.
getId
());
groupMemberDAO
.
updateById
(
newPO
);
continue
;
}
groupMemberDAO
.
insert
(
newPO
);
}
catch
(
Exception
e
)
{
log
.
error
(
LOGGER
.
error
(
"method=batchReplaceGroupMembers||clusterPhyId={}||groupName={}||topicName={}||errMsg=exception"
,
clusterPhyId
,
group
.
getName
(),
member
.
getTopicName
(),
e
);
}
}
}
}
private
void
batchReplaceGroups
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
long
updateTime
)
{
if
(
ValidateUtils
.
isEmptyList
(
newGroupList
))
{
return
;
}
// 删除剩余不存在的
dbPOMap
.
values
().
forEach
(
elem
->
{
try
{
if
(
getFailedGroupSet
.
contains
(
elem
.
getGroupName
()))
{
// 该group信息获取失败,所以忽略对该数据的删除
return
;
}
groupDAO
.
deleteById
(
elem
.
getId
());
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"method=batchReplaceGroupMembers||clusterPhyId={}||groupName={}||topicName={}||msg=delete expired group data in db failed||errMsg=exception"
,
clusterPhyId
,
elem
.
getGroupName
(),
elem
.
getTopicName
(),
e
);
}
});
}
List
<
GroupPO
>
dbGroupList
=
this
.
listClusterGroupsPO
(
clusterPhyId
);
Map
<
String
,
GroupPO
>
dbGroupMap
=
dbGroupList
.
stream
().
collect
(
Collectors
.
toMap
(
elem
->
elem
.
getName
(),
Function
.
identity
()));
private
void
batchReplaceGroups
(
Long
clusterPhyId
,
List
<
Group
>
newGroupList
,
Set
<
String
>
getFailedGroupSet
)
{
// 获取 DB 中的数据
Map
<
String
,
GroupPO
>
dbGroupMap
=
this
.
listClusterGroupsPO
(
clusterPhyId
)
.
stream
()
.
collect
(
Collectors
.
toMap
(
elem
->
elem
.
getName
(),
Function
.
identity
()));
// 进行数据的更新
for
(
Group
newGroup:
newGroupList
)
{
try
{
GroupPO
newPO
=
GroupConverter
.
convert2GroupPO
(
newGroup
);
newPO
.
setUpdateTime
(
new
Date
(
updateTime
));
GroupPO
dbPO
=
dbGroupMap
.
remove
(
newGroup
.
getName
());
if
(
dbPO
!=
null
)
{
if
(
dbPO
==
null
)
{
// 一条新的数据,则直接insert
groupDAO
.
insert
(
GroupConverter
.
convert2GroupPO
(
newGroup
));
continue
;
}
GroupPO
newPO
=
GroupConverter
.
convert2GroupPO
(
newGroup
);
if
(!
newPO
.
equal2GroupPO
(
dbPO
))
{
// 如果不相等,则直接更新
newPO
.
setId
(
dbPO
.
getId
());
groupDAO
.
updateById
(
newPO
);
continue
;
}
groupDAO
.
insert
(
newPO
);
// 其他情况,则不需要进行任何操作
}
catch
(
Exception
e
)
{
log
.
error
(
"method=batchGroupReplace
||clusterPhyId={}||groupName={}||errMsg=exception"
,
clusterPhyId
,
newGroup
.
getName
(),
e
);
LOGGER
.
error
(
"method=batchReplaceGroups
||clusterPhyId={}||groupName={}||errMsg=exception"
,
clusterPhyId
,
newGroup
.
getName
(),
e
);
}
}
// 删除剩余不存在的
dbGroupMap
.
values
().
forEach
(
elem
->
{
try
{
if
(
getFailedGroupSet
.
contains
(
elem
.
getName
()))
{
// 该group信息获取失败,所以忽略对该数据的删除
return
;
}
groupDAO
.
deleteById
(
elem
.
getId
());
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"method=batchReplaceGroups||clusterPhyId={}||groupName={}||msg=delete expired group data in db failed||errMsg=exception"
,
clusterPhyId
,
elem
.
getName
(),
e
);
}
});
}
private
List
<
GroupPO
>
listClusterGroupsPO
(
Long
clusterPhyId
)
{
...
...
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaGroupTask.java
浏览文件 @
db044caf
...
...
@@ -36,7 +36,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
// 获取集群的Group列表
List
<
String
>
groupNameList
=
groupService
.
listGroupsFromKafka
(
clusterPhy
);
TaskResult
allSuccess
=
TaskResult
.
SUCCESS
;
Set
<
String
>
getFailedGroupSet
=
new
HashSet
<>()
;
// 获取Group详细信息
List
<
Group
>
groupList
=
new
ArrayList
<>();
...
...
@@ -44,13 +44,16 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
try
{
Group
group
=
groupService
.
getGroupFromKafka
(
clusterPhy
,
groupName
);
if
(
group
==
null
)
{
// 获取到为空的 group 信息,直接忽略不要
continue
;
}
groupList
.
add
(
group
);
}
catch
(
Exception
e
)
{
log
.
error
(
"method=processClusterTask||clusterPhyId={}||groupName={}||errMsg=exception"
,
clusterPhy
.
getId
(),
groupName
,
e
);
allSuccess
=
TaskResult
.
FAIL
;
// 记录获取失败的 group 信息
getFailedGroupSet
.
add
(
groupName
);
}
}
...
...
@@ -58,17 +61,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
this
.
filterTopicIfTopicNotExist
(
clusterPhy
.
getId
(),
groupList
);
// 更新DB中的Group信息
groupService
.
batchReplaceGroupsAndMembers
(
clusterPhy
.
getId
(),
groupList
,
triggerTimeUnitMs
);
// 如果存在错误,则直接返回
if
(!
TaskResult
.
SUCCESS
.
equals
(
allSuccess
))
{
return
allSuccess
;
}
// 删除历史的Group
groupService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
5
*
60
*
1000
));
groupService
.
batchReplaceGroupsAndMembers
(
clusterPhy
.
getId
(),
groupList
,
getFailedGroupSet
);
return
allSuccess
;
return
getFailedGroupSet
.
isEmpty
()?
TaskResult
.
SUCCESS
:
TaskResult
.
FAIL
;
}
private
void
filterTopicIfTopicNotExist
(
Long
clusterPhyId
,
List
<
Group
>
groupList
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录