未验证 提交 64f77fca 编写于 作者: Z ZQKC 提交者: GitHub

Merge pull request #71 from didi/dev_2.x

开放接口
package com.xiaojukeji.kafka.manager.common.bizenum;
/**
* 消费健康
* @author zengqiao
* @date 20/5/22
*/
public enum ConsumeHealthEnum {
UNKNOWN(-1, "unknown"),
HEALTH(0, "health"),
UNHEALTH(1, "unhealth"),
;
private Integer code;
private String message;
ConsumeHealthEnum(Integer code, String message) {
this.code = code;
this.message = message;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "ConsumeHealthEnum{" +
"code=" + code +
", message='" + message + '\'' +
'}';
}
}
package com.xiaojukeji.kafka.manager.common.bizenum;
/**
* @author zengqiao
* @date 20/10/26
*/
public enum OffsetResetTypeEnum {
RESET_BY_TIME(0),
RESET_BY_OFFSET(1);
private final Integer code;
OffsetResetTypeEnum(Integer code) {
this.code = code;
}
public Integer getCode() {
return code;
}
}
......@@ -6,21 +6,20 @@ package com.xiaojukeji.kafka.manager.common.constant;
* @date 20/4/16
*/
public class ApiPrefix {
public static final String API_V1_SSO_PREFIX = "/api/v1/sso/";
public static final String API_V1_NORMAL_PREFIX = "/api/v1/normal/";
public static final String API_V1_RD_PREFIX = "/api/v1/rd/";
public static final String API_V1_OP_PREFIX = "/api/v1/op/";
public static final String API_V1_THIRD_PART_PREFIX = "/api/v1/third-part/";
public static final String API_V2_THIRD_PART_PREFIX = "/api/v2/third-part/";
public static final String API_V1_OBSOLETE_PREFIX = "/api/v1/";
public static final String API_V2_OBSOLETE_PREFIX = "/api/v2/";
public static final String GATEWAY_API_V1_PREFIX = "/gateway/api/v1/";
public static final String API_PREFIX = "/api/";
public static final String API_V1_PREFIX = API_PREFIX + "v1/";
public static final String API_V2_PREFIX = API_PREFIX + "v2/";
// console
public static final String API_V1_SSO_PREFIX = API_V1_PREFIX + "sso/";
public static final String API_V1_NORMAL_PREFIX = API_V1_PREFIX + "normal/";
public static final String API_V1_RD_PREFIX = API_V1_PREFIX + "rd/";
public static final String API_V1_OP_PREFIX = API_V1_PREFIX + "op/";
// open
public static final String API_V1_THIRD_PART_PREFIX = API_V1_PREFIX + "third-part/";
public static final String API_V2_THIRD_PART_PREFIX = API_V2_PREFIX + "third-part/";
// gateway
public static final String GATEWAY_API_V1_PREFIX = "/gateway" + API_V1_PREFIX;
}
\ No newline at end of file
......@@ -5,13 +5,5 @@ package com.xiaojukeji.kafka.manager.common.constant;
* @date 20/7/28
*/
public class SystemCodeConstant {
public static final String LOG_X = "LogX";
public static final String LEO = "leo";
public static final String DATA_DREAM = "datadream";
public static final String KAFKA_MANAGER = "kafka-manager";
public static final String CHORUS = "chorus"; // 治理平台-服务治理
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.common.entity;
/**
* @author zengqiao
* @date 20/7/27
*/
public class DeprecatedResponseResult<T> {
public static final String SUCCESS_STATUS = "success";
public static final String FAILED_STATUS = "failure";
public static final String SUCCESS_MESSAGE = "process succeeded!";
public static final String FAILED_MESSAGE = "process failed!";
private String status;
private String message;
private T data;
public static <T> DeprecatedResponseResult<T> success(T data) {
DeprecatedResponseResult<T> responseCommonResult = new DeprecatedResponseResult<T>();
responseCommonResult.setMessage(SUCCESS_MESSAGE);
responseCommonResult.setStatus(SUCCESS_STATUS);
responseCommonResult.setData(data);
return responseCommonResult;
}
public static <T> DeprecatedResponseResult<T> success() {
DeprecatedResponseResult<T> responseCommonResult = new DeprecatedResponseResult<T>();
responseCommonResult.setStatus(SUCCESS_STATUS);
responseCommonResult.setMessage(SUCCESS_MESSAGE);
return responseCommonResult;
}
public static <T> DeprecatedResponseResult<T> failure() {
DeprecatedResponseResult<T> responseCommonResult = new DeprecatedResponseResult<T>();
responseCommonResult.setMessage(FAILED_MESSAGE);
responseCommonResult.setStatus(FAILED_STATUS);
return responseCommonResult;
}
public static <T> DeprecatedResponseResult<T> failure(String message) {
DeprecatedResponseResult<T> responseCommonResult = new DeprecatedResponseResult<T>();
responseCommonResult.setMessage(message);
responseCommonResult.setStatus(FAILED_STATUS);
return responseCommonResult;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
@Override
public String toString() {
return "DeprecatedResponseResult{" +
"status='" + status + '\'' +
", message='" + message + '\'' +
", data=" + data +
'}';
}
}
\ No newline at end of file
......@@ -88,6 +88,22 @@ public class Result<T> implements Serializable {
return result;
}
public static <T> Result<T> buildSuc(T data) {
Result<T> result = new Result<T>();
result.setCode(ResultStatus.SUCCESS.getCode());
result.setMessage(ResultStatus.SUCCESS.getMessage());
result.setData(data);
return result;
}
public static <T> Result<T> buildFailure(String message) {
Result<T> result = new Result<T>();
result.setCode(ResultStatus.GATEWAY_INVALID_REQUEST.getCode());
result.setMessage(message);
result.setData(null);
return result;
}
public static Result buildFrom(ResultStatus resultStatus) {
Result result = new Result();
result.setCode(resultStatus.getCode());
......
......@@ -8,7 +8,10 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant;
* @date 20/4/16
*/
public enum ResultStatus {
GATEWAY_INVALID_REQUEST(-1, "invalid request"),
SUCCESS(Constant.SUCCESS, "success"),
LOGIN_FAILED(1, "login failed, please check username and password"),
......
......@@ -8,18 +8,18 @@ import java.util.Map;
* @date 20/7/29
*/
public class KafkaBootstrapServerConfig extends BaseGatewayConfig {
private Map<Long, List<String>> clusterIdBootstrapServersMap;
private Map<String, List<String>> clusterIdBootstrapServersMap;
public KafkaBootstrapServerConfig(Long version, Map<Long, List<String>> clusterIdBootstrapServersMap) {
public KafkaBootstrapServerConfig(Long version, Map<String, List<String>> clusterIdBootstrapServersMap) {
this.version = version;
this.clusterIdBootstrapServersMap = clusterIdBootstrapServersMap;
}
public Map<Long, List<String>> getClusterIdBootstrapServersMap() {
public Map<String, List<String>> getClusterIdBootstrapServersMap() {
return clusterIdBootstrapServersMap;
}
public void setClusterIdBootstrapServersMap(Map<Long, List<String>> clusterIdBootstrapServersMap) {
public void setClusterIdBootstrapServersMap(Map<String, List<String>> clusterIdBootstrapServersMap) {
this.clusterIdBootstrapServersMap = clusterIdBootstrapServersMap;
}
......
package com.xiaojukeji.kafka.manager.common.entity.pojo;
import java.util.Date;
/**
* @author zengqiao
* @date 20/4/29
*/
public class OperationHistoryDO {
private Long id;
private Date gmtCreate;
private Long clusterId;
private String topicName;
private String operator;
private String operation;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Date getGmtCreate() {
return gmtCreate;
}
public void setGmtCreate(Date gmtCreate) {
this.gmtCreate = gmtCreate;
}
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getOperator() {
return operator;
}
public void setOperator(String operator) {
this.operator = operator;
}
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "OperationHistoryDO{" +
"id=" + id +
", gmtCreate=" + gmtCreate +
", clusterId=" + clusterId +
", topicName='" + topicName + '\'' +
", operator='" + operator + '\'' +
", operation='" + operation + '\'' +
'}';
}
public static OperationHistoryDO newInstance(Long clusterId, String topicName, String operator, String operation) {
OperationHistoryDO operationHistoryDO = new OperationHistoryDO();
operationHistoryDO.setClusterId(clusterId);
operationHistoryDO.setTopicName(topicName);
operationHistoryDO.setOperator(operator);
operationHistoryDO.setOperation(operation);
return operationHistoryDO;
}
}
\ No newline at end of file
......@@ -18,6 +18,9 @@ public class AppVO {
@ApiModelProperty(value="App密码")
private String password;
@ApiModelProperty(value="申请人")
private String applicant;
@ApiModelProperty(value="App描述")
private String description;
......@@ -48,6 +51,14 @@ public class AppVO {
this.password = password;
}
public String getApplicant() {
return applicant;
}
public void setApplicant(String applicant) {
this.applicant = applicant;
}
public String getDescription() {
return description;
}
......@@ -70,6 +81,7 @@ public class AppVO {
"appId='" + appId + '\'' +
", name='" + name + '\'' +
", password='" + password + '\'' +
", applicant='" + applicant + '\'' +
", description='" + description + '\'' +
", principals='" + principals + '\'' +
'}';
......
package com.xiaojukeji.kafka.manager.common.entity.vo.thirdpart;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* @author zhongyuankai
* @date 2020/6/18
*/
@Deprecated
@ApiModel(description="AppID基本信息")
public class AppBasicInfoVO {
@ApiModelProperty(value="appId")
private String appId;
@ApiModelProperty(value="app密码")
private String password;
@ApiModelProperty(value="app名称")
private String name;
@ApiModelProperty(value="申请人")
private String applicant;
@ApiModelProperty(value="appId负责人")
private String principal;
@ApiModelProperty(value="描述信息")
private String description;
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getApplicant() {
return applicant;
}
public void setApplicant(String applicant) {
this.applicant = applicant;
}
public String getPrincipal() {
return principal;
}
public void setPrincipal(String principal) {
this.principal = principal;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
@Override
public String toString() {
return "AppBasicInfoVO{" +
"appId='" + appId + '\'' +
", password='" + password + '\'' +
", name='" + name + '\'' +
", applicant='" + applicant + '\'' +
", principal='" + principal + '\'' +
", description='" + description + '\'' +
'}';
}
}
......@@ -35,6 +35,7 @@ public class HttpUtils {
private static final String METHOD_GET = "GET";
private static final String METHOD_POST = "POST";
private static final String METHOD_PUT = "PUT";
private static final String METHOD_DELETE = "DELETE";
private static final String CHARSET_UTF8 = "UTF-8";
......@@ -119,6 +120,18 @@ public class HttpUtils {
return sendRequest(url, METHOD_PUT, null, headers, in);
}
public static String deleteForString(String url, String content, Map<String, String> headers) {
InputStream in = null;
try {
if (content != null && !content.isEmpty()) {
in = new ByteArrayInputStream(content.getBytes(CHARSET_UTF8));
}
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
return sendRequest(url, METHOD_DELETE, null, headers, in);
}
/**
* @param url 请求的链接, 只支持 http 和 https 链接
* @param method GET or POST
......
......@@ -28,10 +28,13 @@ public class JmxConnectorWrap {
private AtomicInteger atomicInteger;
public JmxConnectorWrap(String host, int port) {
public JmxConnectorWrap(String host, int port, int maxConn) {
this.host = host;
this.port = port;
this.atomicInteger = new AtomicInteger(25);
if (maxConn <= 0) {
maxConn = 1;
}
this.atomicInteger = new AtomicInteger(maxConn);
}
public boolean checkJmxConnectionAndInitIfNeed() {
......
......@@ -14,6 +14,7 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
import com.xiaojukeji.kafka.manager.dao.ControllerDao;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
import com.xiaojukeji.kafka.manager.service.zookeeper.*;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
......@@ -44,6 +45,9 @@ public class PhysicalClusterMetadataManager {
@Autowired
private ClusterService clusterService;
@Autowired
private ConfigUtils configUtils;
private final static Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
private final static Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
......@@ -89,7 +93,7 @@ public class PhysicalClusterMetadataManager {
BROKER_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
JMX_CONNECTOR_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
KAFKA_VERSION_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig);
BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, configUtils.getJmxMaxConn());
brokerListener.init();
zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener);
......@@ -255,7 +259,7 @@ public class PhysicalClusterMetadataManager {
//---------------------------Broker元信息相关--------------
public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata) {
public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata, Integer jmxMaxConn) {
Map<Integer, BrokerMetadata> metadataMap = BROKER_METADATA_MAP.get(clusterId);
if (metadataMap == null) {
return;
......@@ -263,7 +267,7 @@ public class PhysicalClusterMetadataManager {
metadataMap.put(brokerId, brokerMetadata);
Map<Integer, JmxConnectorWrap> jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort()));
jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxMaxConn));
JMX_CONNECTOR_MAP.put(clusterId, jmxMap);
Map<Integer, KafkaVersion> versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
......
......@@ -38,10 +38,10 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
}
Long maxVersion = Long.MIN_VALUE;
Map<Long, List<String>> clusterIdBootstrapServersMap = new HashMap<>(doList.size());
Map<String, List<String>> clusterIdBootstrapServersMap = new HashMap<>(doList.size());
for (GatewayConfigDO configDO: doList) {
clusterIdBootstrapServersMap.put(
Long.valueOf(configDO.getName()),
configDO.getName().trim(),
ListUtils.string2StrList(configDO.getValue())
);
if (configDO.getVersion().compareTo(maxVersion) > 0) {
......
......@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.*;
......@@ -11,7 +10,6 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ConfigDO;
import com.xiaojukeji.kafka.manager.dao.ConfigDao;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -166,11 +164,6 @@ public class ConfigServiceImpl implements ConfigService {
@Override
public CreateTopicElemConfig getCreateTopicConfig(Long clusterId, String systemCode) {
String configKey = TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY;
if (SystemCodeConstant.LOG_X.equals(systemCode)) {
configKey = TopicCreationConstant.LOG_X_CREATE_TOPIC_CONFIG_KEY_NAME;
} else if (SystemCodeConstant.CHORUS.equals(systemCode)) {
configKey = TopicCreationConstant.CHORUS_CREATE_TOPIC_CONFIG_KEY_NAME;
}
CreateTopicConfig configValue = this.getByKey(
configKey,
CreateTopicConfig.class
......
......@@ -390,7 +390,7 @@ public class ConsumerServiceImpl implements ConsumerService {
@Override
public boolean checkConsumerGroupExist(OffsetLocationEnum offsetLocation, Long clusterId, String topicName, String consumerGroup) {
List<ConsumerGroupDTO> consumerGroupList = getConsumerGroupList(clusterId, topicName).stream()
.filter(group -> offsetLocation.location.equals(group.getOffsetStoreLocation()) && consumerGroup.equals(group.getConsumerGroup()))
.filter(group -> offsetLocation.location.equals(group.getOffsetStoreLocation().location) && consumerGroup.equals(group.getConsumerGroup()))
.collect(Collectors.toList());
return !ValidateUtils.isEmptyList(consumerGroupList);
}
......
......@@ -186,7 +186,7 @@ public class ExpertServiceImpl implements ExpertService {
continue;
}
Integer suggestedPartitionNum = (int) Math.round(
bytesIn / topicMetadata.getPartitionNum() / config.getMaxBytesInPerPartitionUnitB()
bytesIn / config.getMaxBytesInPerPartitionUnitB()
);
if (suggestedPartitionNum - topicMetadata.getPartitionNum() < 1) {
continue;
......
......@@ -13,6 +13,9 @@ public class ConfigUtils {
@Value(value = "${custom.idc}")
private String idc;
@Value("${custom.jmx.max-conn}")
private Integer jmxMaxConn;
@Value(value = "${spring.profiles.active}")
private String kafkaManagerEnv;
......@@ -24,6 +27,14 @@ public class ConfigUtils {
this.idc = idc;
}
public Integer getJmxMaxConn() {
return jmxMaxConn;
}
public void setJmxMaxConn(Integer jmxMaxConn) {
this.jmxMaxConn = jmxMaxConn;
}
public String getKafkaManagerEnv() {
return kafkaManagerEnv;
}
......
......@@ -22,9 +22,12 @@ public class BrokerStateListener implements StateChangeListener {
private ZkConfigImpl zkConfig;
public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig) {
private Integer jmxMaxConn;
public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig, Integer jmxMaxConn) {
this.clusterId = clusterId;
this.zkConfig = zkConfig;
this.jmxMaxConn = jmxMaxConn;
}
@Override
......@@ -81,7 +84,7 @@ public class BrokerStateListener implements StateChangeListener {
}
brokerMetadata.setClusterId(clusterId);
brokerMetadata.setBrokerId(brokerId);
PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata);
PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxMaxConn);
} catch (Exception e) {
LOGGER.error("add broker failed, clusterId:{} brokerMetadata:{}.", clusterId, brokerMetadata, e);
}
......
......@@ -38,6 +38,10 @@
SELECT * FROM cluster where id=#{id}
</select>
<delete id="deleteById" parameterType="java.lang.Long">
DELETE FROM cluster where id=#{id}
</delete>
<select id="list" resultMap="ClusterMap">
SELECT * FROM cluster WHERE status = 1
</select>
......
......@@ -55,4 +55,4 @@ public class BaseEnterpriseStaffService extends AbstractEnterpriseStaffService {
}
return new ArrayList<>();
}
}
\ No newline at end of file
}
......@@ -30,19 +30,19 @@ import java.util.Map;
public class N9e extends AbstractAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(N9e.class);
@Value("${agent.n9e.base-url}")
@Value("${kcm.n9e.base-url}")
private String baseUrl;
@Value("${agent.n9e.username}")
@Value("${kcm.n9e.username}")
private String username;
@Value("${agent.n9e.user-token}")
@Value("${kcm.n9e.user-token}")
private String userToken;
@Value("${agent.n9e.tpl-id}")
@Value("${kcm.n9e.tpl-id}")
private Integer tplId;
@Value("${agent.n9e.timeout}")
@Value("${kcm.n9e.timeout}")
private Integer timeout;
/**
......
......@@ -6,7 +6,6 @@ package com.xiaojukeji.kafka.manager.kcm.component.storage.common;
* @date 20/4/29
*/
public enum StorageEnum {
GIFT(0, "gift"),
GIT(1, "git"),
S3(2, "S3"),
;
......
agent:
n9e:
base-url: http://127.0.0.1/api
username: admin
user-token: admin
tpl-id: 123456
timeout: 30
\ No newline at end of file
package com.xiaojukeji.kafka.manager.monitor.component.n9e;
import com.xiaojukeji.kafka.manager.monitor.common.entry.MetricSinkPoint;
import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eMetricSinkPoint;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.monitor.common.entry.*;
import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
......@@ -27,4 +29,127 @@ public class N9eConverter {
}
return n9ePointList;
}
public static N9eStrategy convert2N9eStrategy(Strategy strategy, Integer monitorN9eNid) {
if (strategy == null) {
return null;
}
N9eStrategy n9eStrategy = new N9eStrategy();
n9eStrategy.setId(strategy.getId().intValue());
n9eStrategy.setCategory(1);
n9eStrategy.setName(strategy.getName());
n9eStrategy.setNid(monitorN9eNid);
n9eStrategy.setExcl_nid(new ArrayList<>());
n9eStrategy.setPriority(strategy.getPriority());
n9eStrategy.setAlert_dur(60);
List<N9eStrategyExpression> exprs = new ArrayList<>();
for (StrategyExpression strategyExpression: strategy.getStrategyExpressionList()) {
N9eStrategyExpression n9eStrategyExpression = new N9eStrategyExpression();
n9eStrategyExpression.setMetric(strategyExpression.getMetric());
n9eStrategyExpression.setFunc(strategyExpression.getFunc());
n9eStrategyExpression.setEopt(strategyExpression.getEopt());
n9eStrategyExpression.setThreshold(strategyExpression.getThreshold().intValue());
n9eStrategyExpression.setParams(ListUtils.string2IntList(strategyExpression.getParams()));
exprs.add(n9eStrategyExpression);
}
n9eStrategy.setExprs(exprs);
List<N9eStrategyFilter> tags = new ArrayList<>();
for (StrategyFilter strategyFilter: strategy.getStrategyFilterList()) {
N9eStrategyFilter n9eStrategyFilter = new N9eStrategyFilter();
n9eStrategyFilter.setTkey(strategyFilter.getTkey());
n9eStrategyFilter.setTopt(strategyFilter.getTopt());
n9eStrategyFilter.setTval(Arrays.asList(strategyFilter.getTval()));
tags.add(n9eStrategyFilter);
}
n9eStrategy.setTags(tags);
n9eStrategy.setRecovery_dur(0);
n9eStrategy.setRecovery_notify(0);
StrategyAction strategyAction = strategy.getStrategyActionList().get(0);
n9eStrategy.setConverge(ListUtils.string2IntList(strategyAction.getConverge()));
n9eStrategy.setNotify_group(ListUtils.string2StrList(strategyAction.getNotifyGroup()));
n9eStrategy.setNotify_user(new ArrayList<>());
n9eStrategy.setCallback(strategyAction.getCallback());
n9eStrategy.setEnable_stime("00:00");
n9eStrategy.setEnable_etime("23:59");
n9eStrategy.setEnable_days_of_week(ListUtils.string2IntList(strategy.getPeriodDaysOfWeek()));
n9eStrategy.setNeed_upgrade(0);
n9eStrategy.setAlert_upgrade(new ArrayList<>());
return n9eStrategy;
}
public static List<Strategy> convert2StrategyList(List<N9eStrategy> n9eStrategyList) {
if (n9eStrategyList == null || n9eStrategyList.isEmpty()) {
return new ArrayList<>();
}
List<Strategy> strategyList = new ArrayList<>();
for (N9eStrategy n9eStrategy: n9eStrategyList) {
strategyList.add(convert2Strategy(n9eStrategy));
}
return strategyList;
}
public static Strategy convert2Strategy(N9eStrategy n9eStrategy) {
if (n9eStrategy == null) {
return null;
}
Strategy strategy = new Strategy();
strategy.setId(n9eStrategy.getId().longValue());
strategy.setName(n9eStrategy.getName());
strategy.setPriority(n9eStrategy.getPriority());
strategy.setPeriodHoursOfDay("0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23");
strategy.setPeriodDaysOfWeek(ListUtils.intList2String(n9eStrategy.getEnable_days_of_week()));
List<StrategyExpression> strategyExpressionList = new ArrayList<>();
for (N9eStrategyExpression n9eStrategyExpression: n9eStrategy.getExprs()) {
StrategyExpression strategyExpression = new StrategyExpression();
strategyExpression.setMetric(n9eStrategyExpression.getMetric());
strategyExpression.setFunc(n9eStrategyExpression.getFunc());
strategyExpression.setEopt(n9eStrategyExpression.getEopt());
strategyExpression.setThreshold(n9eStrategyExpression.getThreshold().longValue());
strategyExpression.setParams(ListUtils.intList2String(n9eStrategyExpression.getParams()));
strategyExpressionList.add(strategyExpression);
}
strategy.setStrategyExpressionList(strategyExpressionList);
List<StrategyFilter> strategyFilterList = new ArrayList<>();
for (N9eStrategyFilter n9eStrategyFilter: n9eStrategy.getTags()) {
StrategyFilter strategyFilter = new StrategyFilter();
strategyFilter.setTkey(n9eStrategyFilter.getTkey());
strategyFilter.setTopt(n9eStrategyFilter.getTopt());
strategyFilter.setTval(ListUtils.strList2String(n9eStrategyFilter.getTval()));
strategyFilterList.add(strategyFilter);
}
strategy.setStrategyFilterList(strategyFilterList);
StrategyAction strategyAction = new StrategyAction();
strategyAction.setNotifyGroup(ListUtils.strList2String(n9eStrategy.getNotify_group()));
strategyAction.setConverge(ListUtils.intList2String(n9eStrategy.getConverge()));
strategyAction.setCallback(n9eStrategy.getCallback());
strategy.setStrategyActionList(Arrays.asList(strategyAction));
return strategy;
}
public static List<NotifyGroup> convert2NotifyGroupList(N9eNotifyGroup n9eNotifyGroup) {
if (n9eNotifyGroup == null || n9eNotifyGroup.getList() == null) {
return new ArrayList<>();
}
List<NotifyGroup> notifyGroupList = new ArrayList<>();
for (N9eNotifyGroupElem n9eNotifyGroupElem: n9eNotifyGroup.getList()) {
NotifyGroup notifyGroup = new NotifyGroup();
notifyGroup.setId(n9eNotifyGroupElem.getId().longValue());
notifyGroup.setName(n9eNotifyGroupElem.getName());
notifyGroup.setComment(n9eNotifyGroupElem.getNote());
notifyGroupList.add(notifyGroup);
}
return notifyGroupList;
}
}
\ No newline at end of file
......@@ -2,17 +2,18 @@ package com.xiaojukeji.kafka.manager.monitor.component.n9e;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.utils.HttpUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.monitor.component.AbstractMonitorService;
import com.xiaojukeji.kafka.manager.monitor.common.entry.*;
import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eNotifyGroup;
import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eResult;
import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.*;
/**
* 夜莺
......@@ -23,21 +24,28 @@ import java.util.Properties;
public class N9eService extends AbstractMonitorService {
private static final Logger LOGGER = LoggerFactory.getLogger(N9eService.class);
@Value("${monitor.n9e.nid}")
private Integer monitorN9eNid;
@Value("${monitor.n9e.user-token}")
private String monitorN9eToken;
@Value("${monitor.n9e.base-url}")
private String monitorN9eBaseUrl;
/**
* 告警策略
*/
private static final String STRATEGY_ADD_URL = "/auth/v1/strategy/add";
private static final String STRATEGY_ADD_URL = "/api/mon/stra";
private static final String STRATEGY_DEL_URL = "/auth/v1/strategy/del";
private static final String STRATEGY_DEL_URL = "/api/mon/stra";
private static final String STRATEGY_MODIFY_URL = "/auth/v1/strategy/modify";
private static final String STRATEGY_MODIFY_URL = "/api/mon/stra";
private static final String STRATEGY_QUERY_BY_NS_URL = "/auth/v1/strategy/query/ns";
private static final String STRATEGY_QUERY_BY_NS_URL = "/api/mon/stra";
private static final String STRATEGY_QUERY_BY_ID_URL = "/api/mon/stra";
private static final String STRATEGY_QUERY_BY_ID_URL = "/auth/v1/strategy/query/id";
private static final String ALERT_QUERY_BY_NS_AND_PERIOD_URL = "/auth/v1/event/query/ns/period";
......@@ -57,41 +65,121 @@ public class N9eService extends AbstractMonitorService {
/**
* 指标数据
*/
private static final String COLLECTOR_SINK_DATA_URL = "/api/collector/push";
private static final String COLLECTOR_SINK_DATA_URL = "/api/transfer/push";
private static final String COLLECTOR_DOWNLOAD_DATA_URL = "/data/query/graph/dashboard/history";
/**
* 告警组
*/
private static final String ALL_NOTIFY_GROUP_URL = "/auth/v1/usergroup/group/all";
private static final String ALL_NOTIFY_GROUP_URL = "/api/mon/teams/all";
/**
* 监控策略的增删改查
*/
@Override
public Integer createStrategy(Strategy strategy) {
return 0;
String response = null;
try {
response = HttpUtils.postForString(
monitorN9eBaseUrl + STRATEGY_ADD_URL,
JSON.toJSONString(N9eConverter.convert2N9eStrategy(strategy, monitorN9eNid)),
buildHeader()
);
N9eResult n9eResult = JSON.parseObject(response, N9eResult.class);
if (!ValidateUtils.isBlank(n9eResult.getErr())) {
LOGGER.error("create strategy failed, strategy:{} response:{}.", strategy, response);
return null;
}
return (Integer) n9eResult.getDat();
} catch (Exception e) {
LOGGER.error("create strategy failed, strategy:{} response:{}.", strategy, response, e);
}
return null;
}
@Override
public Boolean deleteStrategyById(Long strategyId) {
return true;
Map<String, List<Long>> params = new HashMap<>(1);
params.put("ids", Arrays.asList(strategyId));
String response = null;
try {
response = HttpUtils.deleteForString(
monitorN9eBaseUrl + STRATEGY_DEL_URL,
JSON.toJSONString(params),
buildHeader()
);
N9eResult n9eResult = JSON.parseObject(response, N9eResult.class);
if (!ValidateUtils.isBlank(n9eResult.getErr())) {
LOGGER.error("delete strategy failed, strategyId:{} response:{}.", strategyId, response);
return Boolean.FALSE;
}
return Boolean.TRUE;
} catch (Exception e) {
LOGGER.error("delete strategy failed, strategyId:{} response:{}.", strategyId, response, e);
}
return Boolean.FALSE;
}
@Override
public Boolean modifyStrategy(Strategy strategy) {
return true;
String response = null;
try {
response = HttpUtils.putForString(
monitorN9eBaseUrl + STRATEGY_MODIFY_URL,
JSON.toJSONString(N9eConverter.convert2N9eStrategy(strategy, monitorN9eNid)),
buildHeader()
);
N9eResult n9eResult = JSON.parseObject(response, N9eResult.class);
if (!ValidateUtils.isBlank(n9eResult.getErr())) {
LOGGER.error("modify strategy failed, strategy:{} response:{}.", strategy, response);
return Boolean.FALSE;
}
return Boolean.TRUE;
} catch (Exception e) {
LOGGER.error("modify strategy failed, strategy:{} response:{}.", strategy, response, e);
}
return Boolean.FALSE;
}
@Override
public List<Strategy> getStrategies() {
Map<String, String> params = new HashMap<>();
params.put("nid", String.valueOf(monitorN9eNid));
String response = null;
try {
response = HttpUtils.get(monitorN9eBaseUrl + STRATEGY_QUERY_BY_NS_URL, params, buildHeader());
N9eResult n9eResult = JSON.parseObject(response, N9eResult.class);
if (!ValidateUtils.isBlank(n9eResult.getErr())) {
LOGGER.error("get monitor strategies failed, response:{}.", response);
return new ArrayList<>();
}
return N9eConverter.convert2StrategyList(JSON.parseArray(JSON.toJSONString(n9eResult.getDat()), N9eStrategy.class));
} catch (Exception e) {
LOGGER.error("get monitor strategies failed, response:{}.", response, e);
}
return new ArrayList<>();
}
@Override
public Strategy getStrategyById(Long strategyId) {
return new Strategy();
String uri = STRATEGY_QUERY_BY_ID_URL + "/" + String.valueOf(strategyId);
String response = null;
try {
response = HttpUtils.get(monitorN9eBaseUrl + uri, new HashMap<>(0), buildHeader());
N9eResult n9eResult = JSON.parseObject(response, N9eResult.class);
if (!ValidateUtils.isBlank(n9eResult.getErr())) {
LOGGER.error("get monitor strategy failed, response:{}.", response);
return null;
}
return N9eConverter.convert2Strategy(JSON.parseObject(JSON.toJSONString(n9eResult.getDat()), N9eStrategy.class));
} catch (Exception e) {
LOGGER.error("get monitor strategy failed, response:{}.", response, e);
}
return null;
}
@Override
......@@ -161,6 +249,26 @@ public class N9eService extends AbstractMonitorService {
@Override
public List<NotifyGroup> getNotifyGroups() {
String response = null;
try {
response = HttpUtils.get(monitorN9eBaseUrl + ALL_NOTIFY_GROUP_URL, new HashMap<>(0), buildHeader());
N9eResult n9eResult = JSON.parseObject(response, N9eResult.class);
if (!ValidateUtils.isBlank(n9eResult.getErr())) {
LOGGER.error("get notify group failed, response:{}.", response);
return new ArrayList<>();
}
return N9eConverter.convert2NotifyGroupList(JSON.parseObject(JSON.toJSONString(n9eResult.getDat()), N9eNotifyGroup.class));
} catch (Exception e) {
LOGGER.error("get notify group failed, response:{}.", response, e);
}
return new ArrayList<>();
}
}
\ No newline at end of file
private Map<String, String> buildHeader() {
Map<String, String> header = new HashMap<>(2);
header.put("Content-Type", "application/json");
header.put("X-User-Token", monitorN9eToken);
return header;
}
}
package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry;
import java.util.List;
/**
* @author zengqiao
* @date 20/10/19
*/
public class N9eNotifyGroup {
private List<N9eNotifyGroupElem> list;
public List<N9eNotifyGroupElem> getList() {
return list;
}
public void setList(List<N9eNotifyGroupElem> list) {
this.list = list;
}
@Override
public String toString() {
return "N9eNotifyGroup{" +
"list=" + list +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry;
/**
* @author zengqiao
* @date 20/10/19
*/
public class N9eNotifyGroupElem {
private Integer creator;
private Integer id;
private String ident;
private String last_updated;
private Integer mgmt;
private String name;
private String note;
public Integer getCreator() {
return creator;
}
public void setCreator(Integer creator) {
this.creator = creator;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getIdent() {
return ident;
}
public void setIdent(String ident) {
this.ident = ident;
}
public String getLast_updated() {
return last_updated;
}
public void setLast_updated(String last_updated) {
this.last_updated = last_updated;
}
public Integer getMgmt() {
return mgmt;
}
public void setMgmt(Integer mgmt) {
this.mgmt = mgmt;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getNote() {
return note;
}
public void setNote(String note) {
this.note = note;
}
@Override
public String toString() {
return "N9eNotifyGroupElem{" +
"creator=" + creator +
", id=" + id +
", ident='" + ident + '\'' +
", last_updated='" + last_updated + '\'' +
", mgmt=" + mgmt +
", name='" + name + '\'' +
", note='" + note + '\'' +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry;
import java.util.ArrayList;
import java.util.List;
/**
* @author zengqiao
* @date 20/10/18
*/
public class N9eStrategy {
private Integer id;
private Integer category = 1;
/**
* 策略名称
*/
private String name;
/**
* 策略关联的对象树节点id
*/
private Integer nid;
private List<Integer> excl_nid = new ArrayList<>();
private Integer priority;
private Integer alert_dur = 60;
private List<N9eStrategyExpression> exprs;
private List<N9eStrategyFilter> tags;
private Integer recovery_dur;
private Integer recovery_notify;
private List<N9eStrategyAlertUpgrade> alert_upgrade = new ArrayList<>();
private List<Integer> converge;
private List<String> notify_group;
private List<Integer> notify_user;
private String callback;
private String enable_stime;
private String enable_etime;
private List<Integer> enable_days_of_week;
private Integer need_upgrade;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getCategory() {
return category;
}
public void setCategory(Integer category) {
this.category = category;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getNid() {
return nid;
}
public void setNid(Integer nid) {
this.nid = nid;
}
public List<Integer> getExcl_nid() {
return excl_nid;
}
public void setExcl_nid(List<Integer> excl_nid) {
this.excl_nid = excl_nid;
}
public Integer getPriority() {
return priority;
}
public void setPriority(Integer priority) {
this.priority = priority;
}
public Integer getAlert_dur() {
return alert_dur;
}
public void setAlert_dur(Integer alert_dur) {
this.alert_dur = alert_dur;
}
public List<N9eStrategyExpression> getExprs() {
return exprs;
}
public void setExprs(List<N9eStrategyExpression> exprs) {
this.exprs = exprs;
}
public List<N9eStrategyFilter> getTags() {
return tags;
}
public void setTags(List<N9eStrategyFilter> tags) {
this.tags = tags;
}
public Integer getRecovery_dur() {
return recovery_dur;
}
public void setRecovery_dur(Integer recovery_dur) {
this.recovery_dur = recovery_dur;
}
public Integer getRecovery_notify() {
return recovery_notify;
}
public void setRecovery_notify(Integer recovery_notify) {
this.recovery_notify = recovery_notify;
}
public List<N9eStrategyAlertUpgrade> getAlert_upgrade() {
return alert_upgrade;
}
public void setAlert_upgrade(List<N9eStrategyAlertUpgrade> alert_upgrade) {
this.alert_upgrade = alert_upgrade;
}
public List<Integer> getConverge() {
return converge;
}
public void setConverge(List<Integer> converge) {
this.converge = converge;
}
public List<String> getNotify_group() {
return notify_group;
}
public void setNotify_group(List<String> notify_group) {
this.notify_group = notify_group;
}
public List<Integer> getNotify_user() {
return notify_user;
}
public void setNotify_user(List<Integer> notify_user) {
this.notify_user = notify_user;
}
public String getCallback() {
return callback;
}
public void setCallback(String callback) {
this.callback = callback;
}
public String getEnable_stime() {
return enable_stime;
}
public void setEnable_stime(String enable_stime) {
this.enable_stime = enable_stime;
}
public String getEnable_etime() {
return enable_etime;
}
public void setEnable_etime(String enable_etime) {
this.enable_etime = enable_etime;
}
public List<Integer> getEnable_days_of_week() {
return enable_days_of_week;
}
public void setEnable_days_of_week(List<Integer> enable_days_of_week) {
this.enable_days_of_week = enable_days_of_week;
}
public Integer getNeed_upgrade() {
return need_upgrade;
}
public void setNeed_upgrade(Integer need_upgrade) {
this.need_upgrade = need_upgrade;
}
@Override
public String toString() {
return "N9eStrategy{" +
"id=" + id +
", category=" + category +
", name='" + name + '\'' +
", nid=" + nid +
", excl_nid=" + excl_nid +
", priority=" + priority +
", alert_dur=" + alert_dur +
", exprs=" + exprs +
", tags=" + tags +
", recovery_dur=" + recovery_dur +
", recovery_notify=" + recovery_notify +
", alert_upgrade=" + alert_upgrade +
", converge=" + converge +
", notify_group=" + notify_group +
", notify_user=" + notify_user +
", callback='" + callback + '\'' +
", enable_stime='" + enable_stime + '\'' +
", enable_etime='" + enable_etime + '\'' +
", enable_days_of_week=" + enable_days_of_week +
", need_upgrade=" + need_upgrade +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry;
import java.util.List;
/**
* @author zengqiao
* @date 20/10/19
*/
public class N9eStrategyAlertUpgrade {
private Integer duration;
private Integer level;
private List<Integer> users;
private List<String> groups;
public Integer getDuration() {
return duration;
}
public void setDuration(Integer duration) {
this.duration = duration;
}
public Integer getLevel() {
return level;
}
public void setLevel(Integer level) {
this.level = level;
}
public List<Integer> getUsers() {
return users;
}
public void setUsers(List<Integer> users) {
this.users = users;
}
public List<String> getGroups() {
return groups;
}
public void setGroups(List<String> groups) {
this.groups = groups;
}
@Override
public String toString() {
return "N9eStrategyAlertUpgrade{" +
"duration=" + duration +
", level=" + level +
", users=" + users +
", groups=" + groups +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry;
import java.util.List;
/**
* @author zengqiao
* @date 20/10/18
*/
public class N9eStrategyExpression {
private String metric;
private String func;
private String eopt;
private Integer threshold;
private List<Integer> params;
public String getMetric() {
return metric;
}
public void setMetric(String metric) {
this.metric = metric;
}
public String getFunc() {
return func;
}
public void setFunc(String func) {
this.func = func;
}
public String getEopt() {
return eopt;
}
public void setEopt(String eopt) {
this.eopt = eopt;
}
public Integer getThreshold() {
return threshold;
}
public void setThreshold(Integer threshold) {
this.threshold = threshold;
}
public List<Integer> getParams() {
return params;
}
public void setParams(List<Integer> params) {
this.params = params;
}
@Override
public String toString() {
return "N9eStrategyExpression{" +
"metric='" + metric + '\'' +
", func='" + func + '\'' +
", eopt='" + eopt + '\'' +
", threshold=" + threshold +
", params=" + params +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry;
import java.util.List;
/**
* @author zengqiao
* @date 20/10/18
*/
public class N9eStrategyFilter {
private String topt;
private String tkey;
private List<String> tval;
public String getTopt() {
return topt;
}
public void setTopt(String topt) {
this.topt = topt;
}
public String getTkey() {
return tkey;
}
public void setTkey(String tkey) {
this.tkey = tkey;
}
public List<String> getTval() {
return tval;
}
public void setTval(List<String> tval) {
this.tval = tval;
}
@Override
public String toString() {
return "N9eStrategyFilter{" +
"topt='" + topt + '\'' +
", tkey='" + tkey + '\'' +
", tval=" + tval +
'}';
}
}
\ No newline at end of file
......@@ -10,10 +10,10 @@ import org.springframework.stereotype.Service;
*/
@Service("notifyService")
public class KafkaNotifierService extends AbstractNotifyService {
@Value("${kafka.cluster-id:}")
@Value("${notify.kafka.cluster-id:}")
private Long clusterId;
@Value("${notify.topic-name:}")
@Value("${notify.kafka.topic-name:}")
private String topicName;
@Override
......
notify:
order:
detail-url: http://127.0.0.1
kafka:
cluster-id: 12
topic-name: 123
package com.xiaojukeji.kafka.manager.openapi;
import com.xiaojukeji.kafka.manager.common.bizenum.ConsumeHealthEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.openapi.common.dto.*;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import java.util.List;
/**
* @author zengqiao
* @date 20/5/22
*/
public interface ThirdPartService {
Result<ConsumeHealthEnum> checkConsumeHealth(Long clusterId,
String topicName,
String consumerGroup,
Long maxDelayTime);
List<Result> resetOffsets(ClusterDO clusterDO, OffsetResetDTO dto);}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.openapi;
import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum;
/**
* @author zhongyuankai
* @date 2020/08/31
*/
public class ThirdPartUtils {
public static String getOrderLimitKey(OrderTypeEnum orderTypeEnum, String systemCode) {
return orderTypeEnum.getOrderName() + "_" + systemCode;
}
}
package com.xiaojukeji.kafka.manager.openapi.common.constant;
import java.util.Arrays;
import java.util.List;
/**
* @author zengqiao
* @date 20/10/26
*/
public class ThirdPartConstant {
public final static List<Long> QUOTA_MODIFY_WHITE_CLUSTER_LIST = Arrays.asList(70L, 46L);
public final static Integer DATA_DREAM_MAX_APP_NUM = 20;
public final static Integer DATA_DREAM_MAX_AUTHORITY_NUM = 500;
public final static String SELF_SYSTEM_CODE = "kafkamanager";
}
package com.xiaojukeji.kafka.manager.openapi.common.dto;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
/**
* @author zengqiao
* @date 20/6/2
*/
@ApiModel(description = "消费健康")
public class ConsumeHealthDTO {
@ApiModelProperty(value = "集群ID")
private Long clusterId;
@ApiModelProperty(value = "Topic名称")
private List<String> topicNameList;
@ApiModelProperty(value = "消费组")
private String consumerGroup;
@ApiModelProperty(value = "允许最大延迟(ms)")
private Long maxDelayTime;
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public List<String> getTopicNameList() {
return topicNameList;
}
public void setTopicNameList(List<String> topicNameList) {
this.topicNameList = topicNameList;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public Long getMaxDelayTime() {
return maxDelayTime;
}
public void setMaxDelayTime(Long maxDelayTime) {
this.maxDelayTime = maxDelayTime;
}
@Override
public String toString() {
return "ConsumeHealthDTO{" +
"clusterId=" + clusterId +
", topicNameList=" + topicNameList +
", consumerGroup='" + consumerGroup + '\'' +
", maxDelayTime=" + maxDelayTime +
'}';
}
public boolean paramLegal() {
if (ValidateUtils.isNull(clusterId)
|| ValidateUtils.isEmptyList(topicNameList)
|| ValidateUtils.isBlank(consumerGroup)
|| ValidateUtils.isNullOrLessThanZero(maxDelayTime)) {
return false;
}
for (String topicName: topicNameList) {
if (ValidateUtils.isExistBlank(topicName)) {
return false;
}
}
return true;
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.openapi.common.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetResetTypeEnum;
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import org.springframework.util.StringUtils;
import java.util.List;
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "重置消费偏移")
public class OffsetResetDTO {
@ApiModelProperty(value = "集群ID")
private Long clusterId;
@ApiModelProperty(value = "Topic名称")
private String topicName;
@ApiModelProperty(value = "消费组")
private String consumerGroup;
@ApiModelProperty(value = "消费组位置")
private String location;
@ApiModelProperty(value = "重置的方式[0:依据时间进行重置, 1:指定分区offset进行重置]")
private Integer offsetResetType;
@ApiModelProperty(value = "依据时间进行重置时, 传的参数, 13位时间戳")
private Long timestamp;
@ApiModelProperty(value = "指定分区进行重置时, 传的参数")
private List<PartitionOffsetDTO> partitionOffsetDTOList;
@ApiModelProperty(value = "如果消费组不存在则创建")
private Boolean createIfAbsent = Boolean.FALSE;
@ApiModelProperty(value = "使用的AppID")
private String appId;
@ApiModelProperty(value = "App密码")
private String password;
@ApiModelProperty(value = "操作人")
private String operator;
@ApiModelProperty(value = "系统code")
private String systemCode;
/**
* 默认使用assign的方式进行重置,
* 但是使用assign方式对于多个Topic的消费使用同一个消费组的场景, 需要停掉所有的client才可以重置成功, 否则重置失败
*
* 使用subscribe重置offset, 针对上面的场景可以重置成功, 但是涉及到poll函数调用, 所以默认是关闭的
*/
private Boolean subscribeReset = Boolean.FALSE; // 订阅重置, 默认是assign方式重置
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public Integer getOffsetResetType() {
return offsetResetType;
}
public void setOffsetResetType(Integer offsetResetType) {
this.offsetResetType = offsetResetType;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public List<PartitionOffsetDTO> getPartitionOffsetDTOList() {
return partitionOffsetDTOList;
}
public void setPartitionOffsetDTOList(List<PartitionOffsetDTO> partitionOffsetDTOList) {
this.partitionOffsetDTOList = partitionOffsetDTOList;
}
public Boolean getCreateIfAbsent() {
return createIfAbsent;
}
public void setCreateIfAbsent(Boolean createIfAbsent) {
this.createIfAbsent = createIfAbsent;
}
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getOperator() {
return operator;
}
public void setOperator(String operator) {
this.operator = operator;
}
public String getSystemCode() {
return systemCode;
}
public void setSystemCode(String systemCode) {
this.systemCode = systemCode;
}
public Boolean getSubscribeReset() {
return subscribeReset;
}
public void setSubscribeReset(Boolean subscribeReset) {
this.subscribeReset = subscribeReset;
}
@Override
public String toString() {
return "OffsetResetModel{" +
"clusterId=" + clusterId +
", topicName='" + topicName + '\'' +
", consumerGroup='" + consumerGroup + '\'' +
", location='" + location + '\'' +
", offsetResetType=" + offsetResetType +
", timestamp=" + timestamp +
", partitionOffsetDTOList=" + partitionOffsetDTOList +
", createIfAbsent=" + createIfAbsent +
", appId='" + appId + '\'' +
", password='" + password + '\'' +
", operator='" + operator + '\'' +
", systemCode='" + systemCode + '\'' +
", subscribeReset=" + subscribeReset +
'}';
}
public boolean legal() {
if (clusterId == null
|| StringUtils.isEmpty(topicName)
|| StringUtils.isEmpty(consumerGroup)
|| StringUtils.isEmpty(location)
|| offsetResetType == null
|| StringUtils.isEmpty(operator)) {
return false;
}
appId = (appId == null? "": appId);
password = (password == null? "": password);
if (createIfAbsent == null) {
createIfAbsent = false;
}
if (subscribeReset == null) {
subscribeReset = false;
}
// 只能依据时间或者offset中的一个进行重置
if (OffsetResetTypeEnum.RESET_BY_TIME.getCode().equals(offsetResetType)) {
return timestamp != null;
} else if (OffsetResetTypeEnum.RESET_BY_OFFSET.getCode().equals(offsetResetType)) {
return partitionOffsetDTOList != null;
}
return false;
}
}
package com.xiaojukeji.kafka.manager.openapi.common.vo;
/**
* @author zengqiao
* @date 20/9/14
*/
public class BrokerRegionVO {
private Long clusterId;
private Integer brokerId;
private String hostname;
private String regionName;
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public Integer getBrokerId() {
return brokerId;
}
public void setBrokerId(Integer brokerId) {
this.brokerId = brokerId;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public String getRegionName() {
return regionName;
}
public void setRegionName(String regionName) {
this.regionName = regionName;
}
@Override
public String toString() {
return "BrokerRegionVO{" +
"clusterId=" + clusterId +
", brokerId=" + brokerId +
", hostname='" + hostname + '\'' +
", regionName='" + regionName + '\'' +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.openapi.common.vo;
/**
* @author zengqiao
* @date 20/10/26
*/
public class ConsumeHealthVO {
private Integer healthCode;
public ConsumeHealthVO(Integer healthCode) {
this.healthCode = healthCode;
}
public Integer getHealthCode() {
return healthCode;
}
public void setHealthCode(Integer healthCode) {
this.healthCode = healthCode;
}
@Override
public String toString() {
return "ConsumeHealthVO{" +
"healthCode=" + healthCode +
'}';
}
}
package com.xiaojukeji.kafka.manager.openapi.common.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* @author zengqiao
* @date 20/9/9
*/
@ApiModel(description="第三方-Broker概览")
public class ThirdPartBrokerOverviewVO {
@ApiModelProperty(value = "集群ID")
private Long clusterId;
@ApiModelProperty(value = "BrokerId")
private Integer brokerId;
@ApiModelProperty(value = "处于同步状态 false:已同步, true:未同步")
private Boolean underReplicated;
public ThirdPartBrokerOverviewVO(Long clusterId, Integer brokerId, Boolean underReplicated) {
this.clusterId = clusterId;
this.brokerId = brokerId;
this.underReplicated = underReplicated;
}
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public Integer getBrokerId() {
return brokerId;
}
public void setBrokerId(Integer brokerId) {
this.brokerId = brokerId;
}
public Boolean getUnderReplicated() {
return underReplicated;
}
public void setUnderReplicated(Boolean underReplicated) {
this.underReplicated = underReplicated;
}
@Override
public String toString() {
return "ThirdPartBrokerOverviewVO{" +
"clusterId=" + clusterId +
", brokerId=" + brokerId +
", underReplicated=" + underReplicated +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.openapi.common.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* @author zengqiao
* @date 20/8/24
*/
@ApiModel(description="TopicOffset变化")
public class TopicOffsetChangedVO {
@ApiModelProperty(value="Offset是否变化, 0:否, 1:是, -1:未知")
private Integer offsetChanged;
public TopicOffsetChangedVO(Integer offsetChanged) {
this.offsetChanged = offsetChanged;
}
public Integer getOffsetChanged() {
return offsetChanged;
}
public void setOffsetChanged(Integer offsetChanged) {
this.offsetChanged = offsetChanged;
}
@Override
public String toString() {
return "TopicOffsetChangedVO{" +
"offsetChanged=" + offsetChanged +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.openapi.common.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* @author zengqiao
* @date 20/8/14
*/
@ApiModel(description="Topic流量统计信息")
public class TopicStatisticMetricsVO {
@ApiModelProperty(value="峰值流入流量(B/s)")
private Double peakBytesIn;
public TopicStatisticMetricsVO(Double peakBytesIn) {
this.peakBytesIn = peakBytesIn;
}
public Double getPeakBytesIn() {
return peakBytesIn;
}
public void setPeakBytesIn(Double peakBytesIn) {
this.peakBytesIn = peakBytesIn;
}
@Override
public String toString() {
return "TopicStatisticMetricsVO{" +
"peakBytesIn=" + peakBytesIn +
'}';
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.openapi.impl;
import com.xiaojukeji.kafka.manager.common.bizenum.*;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupDTO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.openapi.ThirdPartService;
import com.xiaojukeji.kafka.manager.openapi.common.dto.*;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import kafka.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.collection.JavaConversions;
import java.util.*;
/**
* @author zengqiao
* @date 20/5/22
*/
@Service("thirdPartService")
public class ThirdPartServiceImpl implements ThirdPartService {
private static Logger LOGGER = LoggerFactory.getLogger(ThirdPartServiceImpl.class);
@Autowired
private ClusterService clusterService;
@Autowired
private TopicService topicService;
@Autowired
private ConsumerService consumerService;
@Override
public Result<ConsumeHealthEnum> checkConsumeHealth(Long clusterId,
String topicName,
String consumerGroup,
Long maxDelayTime) {
ClusterDO clusterDO = clusterService.getById(clusterId);
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (ValidateUtils.isNull(topicMetadata)) {
return Result.buildFrom(ResultStatus.TOPIC_NOT_EXIST);
}
// 获取消费组当前的offset
Map<TopicPartition, Object> consumeOffsetMap = listGroupOffsets(clusterId, consumerGroup);
if (ValidateUtils.isNull(consumeOffsetMap)) {
return new Result<>(ConsumeHealthEnum.UNKNOWN);
}
if (consumeOffsetMap.isEmpty()) {
return Result.buildFrom(ResultStatus.CONSUMER_GROUP_NOT_EXIST);
}
Long delayTimestamp = System.currentTimeMillis() - maxDelayTime;
// 获取指定时间的offset
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimeMap =
offsetsForTimes(clusterDO, topicMetadata, delayTimestamp);
if (ValidateUtils.isNull(offsetAndTimeMap)) {
return new Result<>(ConsumeHealthEnum.UNKNOWN);
}
for (TopicPartition tp : offsetAndTimeMap.keySet()) {
OffsetAndTimestamp offsetAndTimestamp = offsetAndTimeMap.get(tp);
Long consumeOffset = (Long) consumeOffsetMap.get(tp);
if (ValidateUtils.isNull(consumeOffset)) {
return new Result<>(ConsumeHealthEnum.UNKNOWN);
}
if (offsetAndTimestamp.offset() <= consumeOffset) {
// 健康的
continue;
}
return new Result<>(ConsumeHealthEnum.UNHEALTH);
}
return new Result<>(ConsumeHealthEnum.HEALTH);
}
private Map<TopicPartition, Object> listGroupOffsets(Long clusterId, String consumerGroup) {
AdminClient client = KafkaClientPool.getAdminClient(clusterId);
if (ValidateUtils.isNull(client)) {
return null;
}
try {
return JavaConversions.asJavaMap(client.listGroupOffsets(consumerGroup));
} catch (Exception e) {
LOGGER.error("list group offsets failed, clusterId:{}, consumerGroup:{}.", clusterId, consumerGroup, e);
}
return null;
}
private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(ClusterDO clusterDO,
TopicMetadata topicMetadata,
Long timestamp) {
KafkaConsumer kafkaConsumer = null;
try {
kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO);
if (ValidateUtils.isNull(kafkaConsumer)) {
return null;
}
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (Integer partitionId : topicMetadata.getPartitionMap().getPartitions().keySet()) {
timestampsToSearch.put(new TopicPartition(topicMetadata.getTopic(), partitionId), timestamp);
}
return kafkaConsumer.offsetsForTimes(timestampsToSearch);
} catch (Exception e) {
LOGGER.error("get offset for time failed, clusterDO:{} topicMetadata:{} timestamp:{}.",
clusterDO, topicMetadata, timestamp, e);
} finally {
KafkaClientPool.returnKafkaConsumerClient(clusterDO.getId(), kafkaConsumer);
}
return null;
}
@Override
public List<Result> resetOffsets(ClusterDO clusterDO, OffsetResetDTO dto) {
if (ValidateUtils.isNull(dto)) {
return null;
}
List<PartitionOffsetDTO> offsetDTOList = dto.getPartitionOffsetDTOList();
if (ValidateUtils.isEmptyList(offsetDTOList)) {
offsetDTOList = topicService.getPartitionOffsetList(
clusterDO, dto.getTopicName(), dto.getTimestamp());
}
if (ValidateUtils.isEmptyList(offsetDTOList)) {
return null;
}
OffsetLocationEnum offsetLocation = dto.getLocation().equals(
OffsetLocationEnum.ZOOKEEPER.location) ? OffsetLocationEnum.ZOOKEEPER : OffsetLocationEnum.BROKER;
ResultStatus result = checkConsumerGroupExist(clusterDO, dto.getTopicName(), dto.getConsumerGroup(), offsetLocation, dto.getCreateIfAbsent());
if (ResultStatus.SUCCESS.getCode() != result.getCode()) {
return null;
}
ConsumerGroupDTO consumerGroupDTO = new ConsumerGroupDTO(
clusterDO.getId(),
dto.getConsumerGroup(),
new ArrayList<>(),
OffsetLocationEnum.getOffsetStoreLocation(dto.getLocation())
);
return consumerService.resetConsumerOffset(
clusterDO,
dto.getTopicName(),
consumerGroupDTO,
offsetDTOList
);
}
private ResultStatus checkConsumerGroupExist(ClusterDO clusterDO,
String topicName,
String consumerGroup,
OffsetLocationEnum offsetLocation,
Boolean createIfAbsent) {
if (createIfAbsent) {
// 如果不存在, 则直接创建
return isCreateIfAbsentOverflow(clusterDO, topicName);
}
if (!consumerService.checkConsumerGroupExist(offsetLocation, clusterDO.getId(), topicName, consumerGroup)) {
return ResultStatus.PARAM_ILLEGAL;
}
return ResultStatus.SUCCESS;
}
/**
* 限制单天单集群的重置次数不能超过20个
* <clusterId-topicName, timestamp * 100 + count>
*/
private static final Map<String, Long> createIfAbsentCountMap = new HashMap<>();
private synchronized ResultStatus isCreateIfAbsentOverflow(ClusterDO clusterDO, String topicName) {
String key = clusterDO.getId() + "_" + topicName;
Long timestampAndCount = createIfAbsentCountMap.get(key);
if (ValidateUtils.isNull(timestampAndCount) ||
(System.currentTimeMillis() - (timestampAndCount / 100) >= (24 *60 * 60 * 1000))) {
// 24小时卫触发, 统计归0
timestampAndCount = System.currentTimeMillis() * 100L + 1;
} else if (timestampAndCount % 100 > 20) {
return ResultStatus.OPERATION_FORBIDDEN;
} else {
timestampAndCount += 1;
}
createIfAbsentCountMap.put(key, timestampAndCount);
return ResultStatus.SUCCESS;
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.dao.*;
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
......
package com.xiaojukeji.kafka.manager.web;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
......@@ -18,10 +20,13 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableAutoConfiguration
@SpringBootApplication(scanBasePackages = {"com.xiaojukeji.kafka.manager"})
public class MainApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(MainApplication.class);
public static void main(String[] args) {
try {
SpringApplication sa = new SpringApplication(MainApplication.class);
sa.run(args);
LOGGER.info("MainApplication started");
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -3,7 +3,7 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.gateway;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel;
import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent;
import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicConnectionDTO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService;
......@@ -34,19 +34,19 @@ public class GatewayHeartbeatController {
@ApiOperation(value = "连接信息上报入口", notes = "Broker主动上报信息")
@RequestMapping(value = "heartbeat/survive-user", method = RequestMethod.POST)
@ResponseBody
public DeprecatedResponseResult receiveTopicConnections(@RequestParam("clusterId") String clusterId,
@RequestParam("brokerId") String brokerId,
@RequestBody List<TopicConnectionDTO> dtoList) {
public Result receiveTopicConnections(@RequestParam("clusterId") String clusterId,
@RequestParam("brokerId") String brokerId,
@RequestBody List<TopicConnectionDTO> dtoList) {
try {
if (ValidateUtils.isEmptyList(dtoList)) {
return DeprecatedResponseResult.success("success");
return Result.buildSuc();
}
topicConnectionService.batchAdd(dtoList);
return DeprecatedResponseResult.success("success");
return Result.buildSuc();
} catch (Exception e) {
LOGGER.error("receive topic connections failed, clusterId:{} brokerId:{} req:{}",
clusterId, brokerId, JSON.toJSONString(dtoList), e);
}
return DeprecatedResponseResult.failure("fail");
return Result.buildFailure("fail");
}
}
\ No newline at end of file
......@@ -2,7 +2,7 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.gateway;
import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel;
import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent;
import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicReportDO;
......@@ -35,15 +35,15 @@ public class GatewayReportController {
@ApiOperation(value = "查询开启JMX采集的Topic", notes = "")
@RequestMapping(value = "report/jmx/topics", method = RequestMethod.GET)
@ResponseBody
public DeprecatedResponseResult getJmxReportTopics(@RequestParam("clusterId") Long clusterId) {
public Result getJmxReportTopics(@RequestParam("clusterId") Long clusterId) {
List<TopicReportDO> doList = topicReportService.getNeedReportTopic(clusterId);
if (ValidateUtils.isEmptyList(doList)) {
return DeprecatedResponseResult.success();
if (ValidateUtils.isNull(doList)) {
doList = new ArrayList<>();
}
List<String> topicNameList = new ArrayList<>();
for (TopicReportDO elem: doList) {
topicNameList.add(elem.getTopicName());
}
return DeprecatedResponseResult.success(ListUtils.strList2String(topicNameList));
return Result.buildSuc(ListUtils.strList2String(topicNameList));
}
}
\ No newline at end of file
......@@ -3,7 +3,8 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.gateway;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel;
import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent;
import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.KafkaAclSearchDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.KafkaUserSearchDTO;
import com.xiaojukeji.kafka.manager.common.entity.vo.gateway.KafkaSecurityVO;
......@@ -40,9 +41,9 @@ public class GatewaySecurityController {
@ApiOperation(value = "Kafka用户查询", notes = "")
@RequestMapping(value = "security/users", method = RequestMethod.POST)
@ResponseBody
public DeprecatedResponseResult<String> getKafkaUsers(@RequestBody KafkaUserSearchDTO dto) {
public Result<String> getKafkaUsers(@RequestBody KafkaUserSearchDTO dto) {
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
return DeprecatedResponseResult.failure("invalid request");
return Result.buildFrom(ResultStatus.GATEWAY_INVALID_REQUEST);
}
try {
......@@ -50,16 +51,16 @@ public class GatewaySecurityController {
dto.getStart(),
dto.getEnd().equals(0L)? System.currentTimeMillis(): dto.getEnd()
);
if (ValidateUtils.isEmptyList(doList)) {
return DeprecatedResponseResult.success();
if (ValidateUtils.isNull(doList)) {
doList = new ArrayList<>();
}
KafkaSecurityVO vo = new KafkaSecurityVO();
vo.setRows(new ArrayList<>(GatewayModelConverter.convert2KafkaUserVOList(doList)));
return DeprecatedResponseResult.success(JSON.toJSONString(vo));
return Result.buildSuc(JSON.toJSONString(vo));
} catch (Exception e) {
LOGGER.error("get kafka users failed, req:{}.", dto, e);
return DeprecatedResponseResult.failure("get kafka users exception");
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
}
}
......@@ -67,9 +68,9 @@ public class GatewaySecurityController {
@ApiOperation(value = "Kafka用户权限查询", notes = "")
@RequestMapping(value = "security/acls", method = RequestMethod.POST)
@ResponseBody
public DeprecatedResponseResult<String> getKafkaAcls(@RequestBody KafkaAclSearchDTO dto) {
public Result<String> getKafkaAcls(@RequestBody KafkaAclSearchDTO dto) {
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
return DeprecatedResponseResult.failure("invalid request");
return Result.buildFrom(ResultStatus.GATEWAY_INVALID_REQUEST);
}
try {
......@@ -78,16 +79,16 @@ public class GatewaySecurityController {
dto.getStart(),
dto.getEnd().equals(0L)? System.currentTimeMillis(): dto.getEnd()
);
if (ValidateUtils.isEmptyList(doList)) {
return DeprecatedResponseResult.success();
if (ValidateUtils.isNull(doList)) {
doList = new ArrayList<>();
}
KafkaSecurityVO vo = new KafkaSecurityVO();
vo.setRows(new ArrayList<>(GatewayModelConverter.convert2KafkaAclVOList(doList)));
return DeprecatedResponseResult.success(JSON.toJSONString(vo));
return Result.buildSuc(JSON.toJSONString(vo));
} catch (Exception e) {
LOGGER.error("get kafka acls failed, req:{}.", dto, e);
return DeprecatedResponseResult.failure("get kafka acls exception");
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
}
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel;
import com.xiaojukeji.kafka.manager.common.bizenum.gateway.GatewayConfigKeyEnum;
import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent;
import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.*;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.GatewayConfigDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.gateway.GatewayConfigVO;
......@@ -54,33 +54,27 @@ public class GatewayServiceDiscoveryController {
@ApiOperation(value = "获取集群服务地址", notes = "")
@RequestMapping(value = "discovery/init", method = RequestMethod.GET)
@ResponseBody
public DeprecatedResponseResult<String> getAllKafkaBootstrapServers() {
public Result<String> getAllKafkaBootstrapServers() {
KafkaBootstrapServerConfig config =
gatewayConfigService.getKafkaBootstrapServersConfig(Long.MIN_VALUE);
if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getClusterIdBootstrapServersMap())) {
return DeprecatedResponseResult.failure("call init kafka bootstrap servers failed");
return Result.buildFailure("call init kafka bootstrap servers failed");
}
if (config.getClusterIdBootstrapServersMap().isEmpty()) {
return DeprecatedResponseResult.success();
}
return DeprecatedResponseResult.success(JSON.toJSONString(config.getClusterIdBootstrapServersMap()));
return Result.buildSuc(JSON.toJSONString(config.getClusterIdBootstrapServersMap()));
}
@ApiLevel(level = ApiLevelContent.LEVEL_IMPORTANT_2)
@ApiOperation(value = "获取集群服务地址", notes = "")
@RequestMapping(value = "discovery/update", method = RequestMethod.GET)
@ResponseBody
public DeprecatedResponseResult getBootstrapServersIfNeeded(@RequestParam("versionNumber") long versionNumber) {
public Result<String> getBootstrapServersIfNeeded(@RequestParam("versionNumber") long versionNumber) {
KafkaBootstrapServerConfig config =
gatewayConfigService.getKafkaBootstrapServersConfig(versionNumber);
if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getClusterIdBootstrapServersMap())) {
return DeprecatedResponseResult.failure("call update kafka bootstrap servers failed");
return Result.buildFailure("call update kafka bootstrap servers failed");
}
if (config.getClusterIdBootstrapServersMap().isEmpty()) {
return DeprecatedResponseResult.success();
}
return DeprecatedResponseResult.success(JSON.toJSONString(new GatewayConfigVO(
return Result.buildSuc(JSON.toJSONString(new GatewayConfigVO(
String.valueOf(config.getVersion()),
JSON.toJSONString(config.getClusterIdBootstrapServersMap())
)));
......@@ -90,15 +84,13 @@ public class GatewayServiceDiscoveryController {
@ApiOperation(value = "最大并发请求数", notes = "")
@RequestMapping(value = "discovery/max-request-num", method = RequestMethod.GET)
@ResponseBody
public DeprecatedResponseResult getMaxRequestNum(@RequestParam("versionNumber") long versionNumber) {
public Result<String> getMaxRequestNum(@RequestParam("versionNumber") long versionNumber) {
RequestQueueConfig config = gatewayConfigService.getRequestQueueConfig(versionNumber);
if (ValidateUtils.isNull(config)) {
return DeprecatedResponseResult.failure("call get request queue size config failed");
}
if (ValidateUtils.isNull(config.getMaxRequestQueueSize())) {
return DeprecatedResponseResult.success();
return Result.buildFailure("call get request queue size config failed");
}
return DeprecatedResponseResult.success(JSON.toJSONString(
return Result.buildSuc(JSON.toJSONString(
new GatewayConfigVO(
String.valueOf(config.getVersion()),
String.valueOf(config.getMaxRequestQueueSize())
......@@ -110,15 +102,13 @@ public class GatewayServiceDiscoveryController {
@ApiOperation(value = "最大APP请求速率", notes = "")
@RequestMapping(value = "discovery/appId-rate", method = RequestMethod.GET)
@ResponseBody
public DeprecatedResponseResult getAppIdRate(@RequestParam("versionNumber") long versionNumber) {
public Result<String> getAppIdRate(@RequestParam("versionNumber") long versionNumber) {
AppRateConfig config = gatewayConfigService.getAppRateConfig(versionNumber);
if (ValidateUtils.isNull(config)) {
return DeprecatedResponseResult.failure("call get app rate config failed");
}
if (ValidateUtils.isNull(config.getAppRateLimit())) {
return DeprecatedResponseResult.success();
return Result.buildFailure("call get app rate config failed");
}
return DeprecatedResponseResult.success(JSON.toJSONString(
return Result.buildSuc(JSON.toJSONString(
new GatewayConfigVO(
String.valueOf(config.getVersion()),
String.valueOf(config.getAppRateLimit())
......@@ -130,15 +120,12 @@ public class GatewayServiceDiscoveryController {
@ApiOperation(value = "最大IP请求速率", notes = "")
@RequestMapping(value = "discovery/ip-rate", method = RequestMethod.GET)
@ResponseBody
public DeprecatedResponseResult getIpRate(@RequestParam("versionNumber") long versionNumber) {
public Result getIpRate(@RequestParam("versionNumber") long versionNumber) {
IpRateConfig config = gatewayConfigService.getIpRateConfig(versionNumber);
if (ValidateUtils.isNull(config)) {
return DeprecatedResponseResult.failure("call get ip rate config failed");
}
if (ValidateUtils.isNull(config.getIpRateLimit())) {
return DeprecatedResponseResult.success();
return Result.buildFailure("call get ip rate config failed");
}
return DeprecatedResponseResult.success(JSON.toJSONString(
return Result.buildSuc(JSON.toJSONString(
new GatewayConfigVO(
String.valueOf(config.getVersion()),
String.valueOf(config.getIpRateLimit())
......@@ -150,15 +137,11 @@ public class GatewayServiceDiscoveryController {
@ApiOperation(value = "最大SP请求速率", notes = "")
@RequestMapping(value = "discovery/sp-limit", method = RequestMethod.GET)
@ResponseBody
public DeprecatedResponseResult getSpLimit(@RequestParam("versionNumber") long versionNumber) {
public Result<String> getSpLimit(@RequestParam("versionNumber") long versionNumber) {
SpRateConfig config =
gatewayConfigService.getSpRateConfig(versionNumber);
if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getSpRateMap())) {
return DeprecatedResponseResult.failure("call update kafka bootstrap servers failed");
}
if (config.getSpRateMap().isEmpty()) {
return DeprecatedResponseResult.success();
return Result.buildFailure("call update kafka bootstrap servers failed");
}
List<String> strList = new ArrayList<>();
......@@ -166,7 +149,7 @@ public class GatewayServiceDiscoveryController {
strList.add(entry.getKey() + "#" + String.valueOf(entry.getValue()));
}
return DeprecatedResponseResult.success(JSON.toJSONString(new GatewayConfigVO(
return Result.buildSuc(JSON.toJSONString(new GatewayConfigVO(
String.valueOf(config.getVersion()),
ListUtils.strList2String(strList)
)));
......
package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.app.AppVO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.web.converters.AppConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 20/9/23
*/
@Api(tags = "开放接口-App相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX)
public class ThirdPartAppController {
private final static Logger LOGGER = LoggerFactory.getLogger(ThirdPartAppController.class);
@Autowired
private AppService appService;
@ApiOperation(value = "查询负责的应用", notes = "")
@RequestMapping(value = "principal-apps/{principal}/basic-info", method = RequestMethod.GET)
@ResponseBody
public Result<List<AppVO>> searchPrincipalApps(@PathVariable("principal") String principal,
@RequestParam("system-code") String systemCode) {
LOGGER.info("search principal-apps, principal:{} systemCode:{}.", principal, systemCode);
if (ValidateUtils.isBlank(principal) || ValidateUtils.isBlank(systemCode)) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
if (!SystemCodeConstant.KAFKA_MANAGER.equals(systemCode)) {
return Result.buildFrom(ResultStatus.OPERATION_FORBIDDEN);
}
return new Result<>(AppConverter.convert2AppVOList(
appService.getByPrincipal(principal)
));
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.openapi.common.vo.ThirdPartBrokerOverviewVO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.kafka.manager.openapi.common.vo.BrokerRegionVO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.BrokerService;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.RegionService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author zengqiao
* @date 20/9/9
*/
@Api(tags = "开放接口-Broker相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX)
public class ThirdPartBrokerController {
@Autowired
private BrokerService brokerService;
@Autowired
private RegionService regionService;
@Autowired
private ClusterService clusterService;
@ApiOperation(value = "Broker信息概览", notes = "")
@RequestMapping(value = "{clusterId}/brokers/{brokerId}/overview", method = RequestMethod.GET)
@ResponseBody
public Result<ThirdPartBrokerOverviewVO> getBrokerOverview(@PathVariable Long clusterId,
@PathVariable Integer brokerId) {
BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
if (ValidateUtils.isNull(brokerMetadata)) {
return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST);
}
BrokerMetrics brokerMetrics = brokerService.getBrokerMetricsFromJmx(
clusterId,
brokerId,
KafkaMetricsCollections.BROKER_STATUS_PAGE_METRICS
);
if (ValidateUtils.isNull(brokerMetrics)) {
return Result.buildFrom(ResultStatus.OPERATION_FAILED);
}
Integer underReplicated = brokerMetrics.getSpecifiedMetrics("UnderReplicatedPartitionsValue", Integer.class);
if (ValidateUtils.isNull(underReplicated)) {
return Result.buildFrom(ResultStatus.OPERATION_FAILED);
}
return new Result<>(new ThirdPartBrokerOverviewVO(clusterId, brokerId, underReplicated.equals(0)));
}
@ApiOperation(value = "BrokerRegion信息", notes = "所有集群的")
@RequestMapping(value = "broker-regions", method = RequestMethod.GET)
@ResponseBody
public Result<List<BrokerRegionVO>> getBrokerRegions() {
List<ClusterDO> clusterDOList = clusterService.list();
if (ValidateUtils.isNull(clusterDOList)) {
clusterDOList = new ArrayList<>();
}
List<RegionDO> regionDOList = regionService.listAll();
if (ValidateUtils.isNull(regionDOList)) {
regionDOList = new ArrayList<>();
}
List<BrokerRegionVO> voList = new ArrayList<>();
for (ClusterDO clusterDO: clusterDOList) {
Map<Integer, RegionDO> brokerIdRegionMap = regionService.convert2BrokerIdRegionMap(
regionDOList.stream().filter(elem -> clusterDO.getId().equals(elem.getClusterId())).collect(Collectors.toList())
);
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterDO.getId())) {
BrokerRegionVO vo = new BrokerRegionVO();
vo.setClusterId(clusterDO.getId());
vo.setBrokerId(brokerId);
BrokerMetadata metadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterDO.getId(), brokerId);
if (!ValidateUtils.isNull(metadata)) {
vo.setHostname(metadata.getHost());
}
RegionDO regionDO = brokerIdRegionMap.get(brokerId);
if (!ValidateUtils.isNull(regionDO)) {
vo.setRegionName(regionDO.getName());
}
voList.add(vo);
}
}
return new Result<>(voList);
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart;
import com.xiaojukeji.kafka.manager.common.bizenum.ConsumeHealthEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetLocationEnum;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumeDetailDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupDTO;
import com.xiaojukeji.kafka.manager.openapi.common.dto.ConsumeHealthDTO;
import com.xiaojukeji.kafka.manager.openapi.common.dto.OffsetResetDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupDetailVO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.openapi.ThirdPartService;
import com.xiaojukeji.kafka.manager.openapi.common.vo.ConsumeHealthVO;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author zengqiao
* @date 20/10/12
*/
@Api(tags = "开放接口-Consumer相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX)
public class ThirdPartConsumeController {
private final static Logger LOGGER = LoggerFactory.getLogger(ThirdPartConsumeController.class);
@Autowired
private AppService appService;
@Autowired
private ClusterService clusterService;
@Autowired
private ConsumerService consumerService;
@Autowired
private AuthorityService authorityService;
@Autowired
private ThirdPartService thirdPartService;
@ApiOperation(value = "消费组健康", notes = "消费组是否健康")
@RequestMapping(value = "clusters/consumer-health", method = RequestMethod.POST)
@ResponseBody
public Result<ConsumeHealthVO> checkConsumeHealth(@RequestBody ConsumeHealthDTO dto) {
LOGGER.info("");
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
Result<ConsumeHealthEnum> subEnumResult = null;
for (String topicName: dto.getTopicNameList()) {
subEnumResult = thirdPartService.checkConsumeHealth(
dto.getClusterId(),
topicName,
dto.getConsumerGroup(),
dto.getMaxDelayTime()
);
if (!Constant.SUCCESS.equals(subEnumResult.getCode())) {
return new Result<>(subEnumResult.getCode(), subEnumResult.getMessage());
}
}
if (ValidateUtils.isNull(subEnumResult)) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
return new Result<>(new ConsumeHealthVO(subEnumResult.getData().getCode()));
}
@ApiOperation(value = "重置消费组", notes = "")
@RequestMapping(value = "consumers/offsets", method = RequestMethod.PUT)
@ResponseBody
public Result<List<Result>> resetOffsets(@RequestBody OffsetResetDTO dto) {
LOGGER.info("rest offset, req:{}.", dto);
if (ValidateUtils.isNull(dto) || !dto.legal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
ClusterDO clusterDO = clusterService.getById(dto.getClusterId());
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
// 检查AppID权限
if (!appService.verifyAppIdByPassword(dto.getAppId(), dto.getPassword())) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
// 检查权限
AuthorityDO authority =
authorityService.getAuthority(dto.getClusterId(), dto.getTopicName(), dto.getAppId());
if (ValidateUtils.isNull(authority) || (authority.getAccess() & 1) <= 0) {
authority = authorityService.getAuthority(dto.getClusterId(), "*", dto.getAppId());
}
if (authority == null || (authority.getAccess() & 1) <= 0) {
return Result.buildFrom(ResultStatus.USER_WITHOUT_AUTHORITY);
}
List<Result> resultList = thirdPartService.resetOffsets(clusterDO, dto);
if (ValidateUtils.isNull(resultList)) {
return Result.buildFrom(ResultStatus.OPERATION_FAILED);
}
for (Result result: resultList) {
if (!Constant.SUCCESS.equals(result.getCode())) {
return Result.buildFrom(ResultStatus.OPERATION_FAILED);
}
}
return new Result<>(resultList);
}
@ApiOperation(value = "查询消费组的消费详情", notes = "")
@RequestMapping(value = "{physicalClusterId}/consumers/{consumerGroup}/topics/{topicName}/consume-details",
method = RequestMethod.GET)
@ResponseBody
public Result<List<ConsumerGroupDetailVO>> getConsumeDetail(@PathVariable Long physicalClusterId,
@PathVariable String consumerGroup,
@PathVariable String topicName,
@RequestParam("location") String location) {
if (ValidateUtils.isNull(location)) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
location = location.toLowerCase();
OffsetLocationEnum offsetStoreLocation = OffsetLocationEnum.getOffsetStoreLocation(location);
if (ValidateUtils.isNull(offsetStoreLocation)) {
return Result.buildFrom(ResultStatus.CG_LOCATION_ILLEGAL);
}
ConsumerGroupDTO consumeGroupDTO = new ConsumerGroupDTO(
clusterDO.getId(),
consumerGroup,
new ArrayList<>(),
offsetStoreLocation
);
try {
List<ConsumeDetailDTO> consumeDetailDTOList =
consumerService.getConsumeDetail(clusterDO, topicName, consumeGroupDTO);
return new Result<>(
ConsumerModelConverter.convert2ConsumerGroupDetailVO(
topicName,
consumerGroup,
location,
consumeDetailDTOList
)
);
} catch (Exception e) {
LOGGER.error("get consume detail failed, consumerGroup:{}.", consumeGroupDTO, e);
}
return Result.buildFrom(ResultStatus.OPERATION_FAILED);
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicStatisticMetricsVO;
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
import java.util.List;
/**
* @author zengqiao
* @date 20/7/24
*/
@Api(tags = "开放接口-Topic相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX)
public class ThirdPartTopicController {
private final static Logger LOGGER = LoggerFactory.getLogger(ThirdPartTopicController.class);
@Autowired
private TopicService topicService;
@Autowired
private ClusterService clusterService;
@Autowired
private ConsumerService consumerService;
@Autowired
private TopicManagerService topicManagerService;
@ApiOperation(value = "Topic元信息", notes = "LogX调用")
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
@ResponseBody
public Result<TopicMetadataVO> getTopicMetadata(@PathVariable Long clusterId, @PathVariable String topicName) {
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (ValidateUtils.isNull(topicMetadata)) {
return Result.buildFrom(ResultStatus.TOPIC_NOT_EXIST);
}
TopicMetadataVO vo = new TopicMetadataVO();
vo.setTopicName(topicMetadata.getTopic());
vo.setPartitionNum(topicMetadata.getPartitionNum());
return new Result<>(vo);
}
@ApiOperation(value = "Topic流量统计信息", notes = "")
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET)
@ResponseBody
public Result<TopicStatisticMetricsVO> getTopicStatisticMetrics(@PathVariable Long physicalClusterId,
@PathVariable String topicName,
@RequestParam("latest-day") Integer latestDay) {
try {
return new Result<>(new TopicStatisticMetricsVO(topicManagerService.getTopicMaxAvgBytesIn(
physicalClusterId,
topicName,
new Date(DateUtils.getDayStarTime(-1 * latestDay)),
new Date(),
1
)));
} catch (Exception e) {
LOGGER.error("get topic statistic metrics failed, clusterId:{} topicName:{} latestDay:{}."
, physicalClusterId, topicName, latestDay, e);
}
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
}
@ApiOperation(value = "Topic是否有流量", notes = "")
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/offset-changed", method = RequestMethod.GET)
@ResponseBody
public Result<TopicOffsetChangedVO> checkTopicExpired(@PathVariable Long physicalClusterId,
@PathVariable String topicName,
@RequestParam("latest-time") Long latestTime) {
Result<TopicOffsetChangedEnum> enumResult =
topicService.checkTopicOffsetChanged(physicalClusterId, topicName, latestTime);
if (!Constant.SUCCESS.equals(enumResult.getCode())) {
return new Result<>(enumResult.getCode(), enumResult.getMessage());
}
return new Result<>(new TopicOffsetChangedVO(enumResult.getData().getCode()));
}
@ApiOperation(value = "Topic实时流量信息", notes = "")
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/metrics", method = RequestMethod.GET)
@ResponseBody
public Result<RealTimeMetricsVO> getTopicMetrics(@PathVariable Long physicalClusterId,
@PathVariable String topicName) {
return new Result<>(CommonModelConverter.convert2RealTimeMetricsVO(
topicService.getTopicMetricsFromJMX(
physicalClusterId,
topicName,
KafkaMetricsCollections.COMMON_DETAIL_METRICS,
true
)
));
}
@ApiOperation(value = "Topic实时请求耗时信息", notes = "")
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/request-time", method = RequestMethod.GET)
@ResponseBody
public Result<List<TopicRequestTimeDetailVO>> getTopicRequestMetrics(@PathVariable Long physicalClusterId,
@PathVariable String topicName) {
BaseMetrics metrics = topicService.getTopicMetricsFromJMX(
physicalClusterId,
topicName,
KafkaMetricsCollections.TOPIC_REQUEST_TIME_DETAIL_PAGE_METRICS,
false
);
return new Result<>(TopicModelConverter.convert2TopicRequestTimeDetailVOList(metrics));
}
@ApiOperation(value = "查询Topic的消费组列表", notes = "")
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/consumer-groups", method = RequestMethod.GET)
@ResponseBody
public Result<List<ConsumerGroupVO>> getConsumeDetail(@PathVariable Long physicalClusterId,
@PathVariable String topicName) {
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
return new Result<>(ConsumerModelConverter.convert2ConsumerGroupVOList(
consumerService.getConsumerGroupList(physicalClusterId, topicName)
));
}
@ApiOperation(value = "Topic应用信息", notes = "")
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/apps", method = RequestMethod.GET)
@ResponseBody
public Result<List<TopicAuthorizedAppVO>> getTopicAppIds(@PathVariable Long physicalClusterId,
@PathVariable String topicName) {
return new Result<>(TopicModelConverter.convert2TopicAuthorizedAppVOList(
topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName))
);
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.web.config;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.web.inteceptor.PermissionInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
......@@ -31,7 +32,7 @@ public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(permissionInterceptor).addPathPatterns("/api/v1/**");
registry.addInterceptor(permissionInterceptor).addPathPatterns(ApiPrefix.API_PREFIX + "**");
}
@Override
......
......@@ -3,7 +3,7 @@ package com.xiaojukeji.kafka.manager.web.inteceptor;
import com.codahale.metrics.Timer;
import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel;
import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent;
import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.api.ApiCount;
......@@ -118,8 +118,8 @@ public class WebMetricsInterceptor {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String uri = attributes.getRequest().getRequestURI();
if (uri.contains("gateway/api/v1")) {
return DeprecatedResponseResult.failure("api limited");
if (uri.contains(ApiPrefix.GATEWAY_API_V1_PREFIX)) {
return Result.buildFailure("api limited");
}
return new Result<>(ResultStatus.OPERATION_FORBIDDEN);
}
......
......@@ -30,8 +30,13 @@ logging:
custom:
idc: cn
jmx:
max-conn: 10
agent:
account:
ldap:
kcm:
n9e:
base-url: http://127.0.0.1/api
username: admin
......@@ -42,11 +47,13 @@ agent:
monitor:
n9e:
base-url: http://127.0.0.1/api
username: admin
user-token: admin
nid: 10
notify:
kafka:
cluster-id: 95
topic-name: didi-kafka-notify
order:
detail-url: http://127.0.0.1
kafka:
cluster-id: 12
topic-name: 123
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册