未验证 提交 47676139 编写于 作者: M mrazkong 提交者: GitHub

Merge pull request #183 from didi/dev

support dynamic change cluster auth
package com.xiaojukeji.kafka.manager.common.entity.pojo; package com.xiaojukeji.kafka.manager.common.entity.pojo;
import java.util.Date; import java.util.Date;
import java.util.Objects;
/** /**
* @author zengqiao * @author zengqiao
...@@ -116,4 +117,22 @@ public class ClusterDO implements Comparable<ClusterDO> { ...@@ -116,4 +117,22 @@ public class ClusterDO implements Comparable<ClusterDO> {
public int compareTo(ClusterDO clusterDO) { public int compareTo(ClusterDO clusterDO) {
return this.id.compareTo(clusterDO.id); return this.id.compareTo(clusterDO.id);
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterDO clusterDO = (ClusterDO) o;
return Objects.equals(id, clusterDO.id)
&& Objects.equals(clusterName, clusterDO.clusterName)
&& Objects.equals(zookeeper, clusterDO.zookeeper)
&& Objects.equals(bootstrapServers, clusterDO.bootstrapServers)
&& Objects.equals(securityProperties, clusterDO.securityProperties)
&& Objects.equals(jmxProperties, clusterDO.jmxProperties);
}
@Override
public int hashCode() {
return Objects.hash(id, clusterName, zookeeper, bootstrapServers, securityProperties, jmxProperties);
}
} }
\ No newline at end of file
package com.xiaojukeji.kafka.manager.service.cache; package com.xiaojukeji.kafka.manager.service.cache;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory; import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
import kafka.admin.AdminClient; import kafka.admin.AdminClient;
import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool;
...@@ -103,6 +103,21 @@ public class KafkaClientPool { ...@@ -103,6 +103,21 @@ public class KafkaClientPool {
} }
} }
public static void closeKafkaConsumerPool(Long clusterId) {
lock.lock();
try {
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
if (objectPool == null) {
return;
}
objectPool.close();
} catch (Exception e) {
LOGGER.error("close kafka consumer pool failed, clusterId:{}.", clusterId, e);
} finally {
lock.unlock();
}
}
public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
if (ValidateUtils.isNull(clusterDO)) { if (ValidateUtils.isNull(clusterDO)) {
return null; return null;
...@@ -132,7 +147,11 @@ public class KafkaClientPool { ...@@ -132,7 +147,11 @@ public class KafkaClientPool {
if (ValidateUtils.isNull(objectPool)) { if (ValidateUtils.isNull(objectPool)) {
return; return;
} }
objectPool.returnObject(kafkaConsumer); try {
objectPool.returnObject(kafkaConsumer);
} catch (Exception e) {
LOGGER.error("return kafka consumer client failed, clusterId:{}", physicalClusterId, e);
}
} }
public static AdminClient getAdminClient(Long clusterId) { public static AdminClient getAdminClient(Long clusterId) {
......
...@@ -4,21 +4,23 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum; ...@@ -4,21 +4,23 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion; import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils; import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
import com.xiaojukeji.kafka.manager.dao.ControllerDao; import com.xiaojukeji.kafka.manager.dao.ControllerDao;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.zookeeper.*;
import com.xiaojukeji.kafka.manager.service.service.ClusterService; import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil; import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.zookeeper.BrokerStateListener;
import com.xiaojukeji.kafka.manager.service.zookeeper.ControllerStateListener;
import com.xiaojukeji.kafka.manager.service.zookeeper.TopicStateListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -160,8 +162,12 @@ public class PhysicalClusterMetadataManager { ...@@ -160,8 +162,12 @@ public class PhysicalClusterMetadataManager {
CLUSTER_MAP.remove(clusterId); CLUSTER_MAP.remove(clusterId);
} }
public Set<Long> getClusterIdSet() { public static Map<Long, ClusterDO> getClusterMap() {
return CLUSTER_MAP.keySet(); return CLUSTER_MAP;
}
public static void updateClusterMap(ClusterDO clusterDO) {
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
} }
public static ClusterDO getClusterFromCache(Long clusterId) { public static ClusterDO getClusterFromCache(Long clusterId) {
......
package com.xiaojukeji.kafka.manager.task.schedule.metadata; package com.xiaojukeji.kafka.manager.task.schedule.metadata;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService; import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashSet; import java.util.Map;
import java.util.List; import java.util.function.Function;
import java.util.Set; import java.util.stream.Collectors;
/** /**
* @author zengqiao * @author zengqiao
...@@ -25,24 +27,63 @@ public class FlushClusterMetadata { ...@@ -25,24 +27,63 @@ public class FlushClusterMetadata {
@Scheduled(cron="0/30 * * * * ?") @Scheduled(cron="0/30 * * * * ?")
public void flush() { public void flush() {
List<ClusterDO> doList = clusterService.list(); Map<Long, ClusterDO> dbClusterMap = clusterService.list().stream().collect(Collectors.toMap(ClusterDO::getId, Function.identity(), (key1, key2) -> key2));
Set<Long> newClusterIdSet = new HashSet<>(); Map<Long, ClusterDO> cacheClusterMap = PhysicalClusterMetadataManager.getClusterMap();
Set<Long> oldClusterIdSet = physicalClusterMetadataManager.getClusterIdSet();
for (ClusterDO clusterDO: doList) {
newClusterIdSet.add(clusterDO.getId());
// 添加集群 // 新增的集群
physicalClusterMetadataManager.addNew(clusterDO); for (ClusterDO clusterDO: dbClusterMap.values()) {
} if (cacheClusterMap.containsKey(clusterDO.getId())) {
// 已经存在
continue;
}
add(clusterDO);
}
for (Long clusterId: oldClusterIdSet) { // 移除的集群
if (newClusterIdSet.contains(clusterId)) { for (ClusterDO clusterDO: cacheClusterMap.values()) {
continue; if (dbClusterMap.containsKey(clusterDO.getId())) {
} // 已经存在
continue;
}
remove(clusterDO.getId());
}
// 移除集群 // 被修改配置的集群
physicalClusterMetadataManager.remove(clusterId); for (ClusterDO dbClusterDO: dbClusterMap.values()) {
} ClusterDO cacheClusterDO = cacheClusterMap.get(dbClusterDO.getId());
if (ValidateUtils.anyNull(cacheClusterDO) || dbClusterDO.equals(cacheClusterDO)) {
// 不存在 || 相等
continue;
}
modifyConfig(dbClusterDO);
}
} }
private void add(ClusterDO clusterDO) {
if (ValidateUtils.anyNull(clusterDO)) {
return;
}
physicalClusterMetadataManager.addNew(clusterDO);
}
private void modifyConfig(ClusterDO clusterDO) {
if (ValidateUtils.anyNull(clusterDO)) {
return;
}
PhysicalClusterMetadataManager.updateClusterMap(clusterDO);
KafkaClientPool.closeKafkaConsumerPool(clusterDO.getId());
}
private void remove(Long clusterId) {
if (ValidateUtils.anyNull(clusterId)) {
return;
}
// 移除缓存信息
physicalClusterMetadataManager.remove(clusterId);
// 清除客户端池子
KafkaClientPool.closeKafkaConsumerPool(clusterId);
}
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册