未验证 提交 718d0640 编写于 作者: K kezhenxu94 提交者: GitHub

Fix wrong service name when IP is node IP in `k8s-mesh` (#7737)

上级 224b6c6b
......@@ -67,6 +67,7 @@ Release Notes.
* Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
* Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
* Improve the speed of writing TiDB by batching the SQL execution.
* Fix wrong service name when IP is node IP in `k8s-mesh`.
#### UI
......
......@@ -66,6 +66,8 @@ public class K8SServiceRegistry {
private final EnvoyMetricReceiverConfig config;
private final KubernetesNodeRegistry nodeRegistry;
public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) {
this.config = config;
......@@ -80,6 +82,7 @@ public class K8SServiceRegistry {
.setDaemon(true)
.build()
);
nodeRegistry = new KubernetesNodeRegistry();
}
public void start() throws IOException {
......@@ -99,6 +102,8 @@ public class K8SServiceRegistry {
listenPodEvents(coreV1Api, factory);
factory.startAllRegisteredInformers();
nodeRegistry.start();
}
private void listenServiceEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
......@@ -268,6 +273,9 @@ public class K8SServiceRegistry {
}
public ServiceMetaInfo findService(final String ip) {
if (nodeRegistry.isNode(ip)) {
return config.serviceMetaInfoFactory().unknown();
}
final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
if (isNull(service)) {
log.debug("Unknown ip {}, ip -> service is null", ip);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Node;
import io.kubernetes.client.openapi.models.V1NodeAddress;
import io.kubernetes.client.openapi.models.V1NodeList;
import io.kubernetes.client.openapi.models.V1NodeStatus;
import io.kubernetes.client.util.Config;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
@Slf4j
final class KubernetesNodeRegistry implements ResourceEventHandler<V1Node> {
private final Set<String> nodeIPs;
private final ExecutorService executor;
public KubernetesNodeRegistry() {
nodeIPs = Collections.newSetFromMap(new ConcurrentHashMap<>());
executor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("KubernetesNodeRegistry-%d")
.setDaemon(true)
.build()
);
}
public void start() throws IOException {
final ApiClient apiClient = Config.defaultClient();
apiClient.setHttpClient(apiClient.getHttpClient()
.newBuilder()
.readTimeout(0, TimeUnit.SECONDS)
.build());
Configuration.setDefaultApiClient(apiClient);
final CoreV1Api coreV1Api = new CoreV1Api();
final SharedInformerFactory factory = new SharedInformerFactory(executor);
listenNodeEvents(coreV1Api, factory);
factory.startAllRegisteredInformers();
}
private void listenNodeEvents(final CoreV1Api coreV1Api,
final SharedInformerFactory factory) {
factory.sharedIndexInformerFor(
params -> coreV1Api.listNodeCall(
null,
null,
null,
null,
null,
null,
params.resourceVersion,
null,
params.timeoutSeconds,
params.watch,
null
),
V1Node.class,
V1NodeList.class
).addEventHandler(this);
}
@Override
public void onAdd(final V1Node node) {
forEachAddress(node, nodeIPs::add);
}
@Override
public void onUpdate(final V1Node oldNode, final V1Node newNode) {
onAdd(newNode);
}
@Override
public void onDelete(final V1Node node,
final boolean deletedFinalStateUnknown) {
forEachAddress(node, nodeIPs::remove);
}
void forEachAddress(final V1Node node,
final Consumer<String> consume) {
Optional.ofNullable(node)
.map(V1Node::getStatus)
.map(V1NodeStatus::getAddresses)
.ifPresent(addresses ->
addresses.stream()
.map(V1NodeAddress::getAddress)
.filter(StringUtil::isNotBlank)
.forEach(consume)
);
}
boolean isNode(final String ip) {
return nodeIPs.contains(ip);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册