From 279c81e9fc8da798b0cc1d88ef418fa160c54e2b Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Tue, 30 Oct 2018 16:05:18 +0800 Subject: [PATCH] Fix k8s api connection leak (#1848) * Update submodule skywalking-ui * Fix k8s api bugs * Fix api connection leak --- .../kubernetes/KubernetesCoordinator.java | 3 ++ .../dependencies/NamespacedPodListWatch.java | 41 +++++++++++++------ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java index ae8ef01073..5c3e72f7e3 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java @@ -92,6 +92,9 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery private void generateRemoteNodes() { for (Event event : watch) { + if (event == null) { + break; + } logger.debug("Received event {} {}-{}", event.getType(), event.getUid(), event.getHost()); switch (event.getType()) { case "ADDED": diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java index 3fbcae827b..985deae514 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies; -import com.google.common.collect.Iterators; import com.google.common.reflect.TypeToken; import io.kubernetes.client.ApiClient; import io.kubernetes.client.ApiException; @@ -28,9 +27,10 @@ import io.kubernetes.client.models.V1Pod; import io.kubernetes.client.util.Config; import io.kubernetes.client.util.Watch; import java.io.IOException; -import java.util.Collections; import java.util.Iterator; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event; import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch; import org.slf4j.Logger; @@ -83,16 +83,33 @@ public class NamespacedPodListWatch implements ReusableWatch { } @Override public Iterator iterator() { - try { - return Iterators.transform(watch.iterator(), response -> { - if (response == null) { - throw new NullPointerException("Original event is null"); + final Iterator> watchItr = watch.iterator(); + return new Iterator() { + @Override public boolean hasNext() { + return wrap(watchItr::hasNext, false); + } + + @Override public Event next() { + return wrap(() -> { + final Watch.Response response = watchItr.next(); + return new Event(response.type, response.object.getMetadata().getUid(), response.object.getStatus().getPodIP()); + }, null); + } + + private R wrap(final Supplier action, final R defaultValue) { + Objects.requireNonNull(action); + try { + return action.get(); + } catch (final Throwable t) { + logger.trace("Throwable", t); + try { + watch.close(); + } catch (IOException e) { + logger.error("Close watch error", e); + } } - return new Event(response.type, response.object.getMetadata().getUid(), response.object.getStatus().getPodIP()); - }); - } catch (final RuntimeException exp) { - logger.trace("Runtime exception", exp); - } - return Collections.emptyIterator(); + return defaultValue; + } + }; } } -- GitLab