未验证 提交 523944a5 编写于 作者: W wankai123 提交者: GitHub

Add arg `namespace` to func `retagByK8sMeta`, rebuild the relationship...

Add  arg `namespace` to func `retagByK8sMeta`, rebuild the relationship between pod and service by labels (#6650)

* Add `namespace` to func `retagByK8sMeta`, rebuild the relationship between pod and service by labels

* add param check
上级 0c0a8d43
......@@ -65,22 +65,22 @@ MAL supports using the metadata of k8s to manipulate the tags and their values.
This feature requires OAP Server to have the authority to access the K8s's `API Server`.
##### retagByK8sMeta
`retagByK8sMeta(newLabelName, K8sRetagType, existingLabelName)`. Add a new tag to the sample family based on an existing label's value. Provide several internal converting types, including
`retagByK8sMeta(newLabelName, K8sRetagType, existingLabelName, namespaceLabelName)`. Add a new tag to the sample family based on an existing label's value. Provide several internal converting types, including
- K8sRetagType.Pod2Service
Add a tag to the sample by using `service` as the key, `$serviceName.$namespace` as the value, by the given value of the tag key, which represents the name of a pod.
For example:
```
container_cpu_usage_seconds_total{container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh} 2
container_cpu_usage_seconds_total{namespace=default, container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh} 2
```
Expression:
```
container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')
container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')
```
Output:
```
container_cpu_usage_seconds_total{container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh, service='nginx-service.default'} 2
container_cpu_usage_seconds_total{namespace=default, container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh, service='nginx-service.default'} 2
```
### Binary operators
......
......@@ -312,15 +312,16 @@ public class SampleFamily {
}
/* k8s retags*/
public SampleFamily retagByK8sMeta(String newLabelName, K8sRetagType type, String existingLabelName) {
public SampleFamily retagByK8sMeta(String newLabelName, K8sRetagType type, String existingLabelName, String namespaceLabelName) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(newLabelName));
Preconditions.checkArgument(!Strings.isNullOrEmpty(existingLabelName));
Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceLabelName));
ExpressionParsingContext.get().ifPresent(ctx -> ctx.isRetagByK8sMeta = true);
if (this == EMPTY) {
return EMPTY;
}
return SampleFamily.build(this.context, type.execute(samples, newLabelName, existingLabelName));
return SampleFamily.build(this.context, type.execute(samples, newLabelName, existingLabelName, namespaceLabelName));
}
public SampleFamily histogram() {
......
......@@ -30,12 +30,15 @@ public enum K8sRetagType implements Retag {
Pod2Service {
@Override
public Sample[] execute(final Sample[] ss, final String newLabelName, final String existingLabelName) {
public Sample[] execute(final Sample[] ss,
final String newLabelName,
final String existingLabelName,
final String namespaceLabelName) {
Sample[] samples = Arrays.stream(ss).map(sample -> {
String podName = sample.getLabels().get(existingLabelName);
if (!Strings.isNullOrEmpty(podName)) {
String serviceName = K8sInfoRegistry.getInstance().findServiceName(podName);
String namespace = sample.getLabels().get(namespaceLabelName);
if (!Strings.isNullOrEmpty(podName) && !Strings.isNullOrEmpty(namespace)) {
String serviceName = K8sInfoRegistry.getInstance().findServiceName(namespace, podName);
if (!Strings.isNullOrEmpty(serviceName)) {
Map<String, String> labels = Maps.newHashMap(sample.getLabels());
labels.put(newLabelName, serviceName);
......
......@@ -21,5 +21,5 @@ package org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
public interface Retag {
Sample[] execute(Sample[] ss, String newLabelName, String existingLabelName);
Sample[] execute(Sample[] ss, String newLabelName, String existingLabelName, String namespaceLabelName);
}
......@@ -18,19 +18,21 @@
package org.apache.skywalking.oap.meter.analyzer.k8s;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1EndpointsList;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.models.V1Pod;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -39,7 +41,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
......@@ -48,10 +49,11 @@ public class K8sInfoRegistry {
private final static K8sInfoRegistry INSTANCE = new K8sInfoRegistry();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final Map<String/* ip */, V1Pod> ipPodMap = new ConcurrentHashMap<>();
private final Map<String/* ip */, String/* serviceName.namespace */> ipServiceMap = new ConcurrentHashMap<>();
private final Map<String/* podName */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
private final Map<String/* podName.namespace */, V1Pod> namePodMap = new ConcurrentHashMap<>();
protected final Map<String/* serviceName.namespace */, V1Service> nameServiceMap = new ConcurrentHashMap<>();
private final Map<String/* podName.namespace */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
private ExecutorService executor;
private static final String SEPARATOR = ".";
public static K8sInfoRegistry getInstance() {
return INSTANCE;
......@@ -79,16 +81,15 @@ public class K8sInfoRegistry {
final CoreV1Api coreV1Api = new CoreV1Api();
final SharedInformerFactory factory = new SharedInformerFactory(executor);
listenEndpointsEvents(coreV1Api, factory);
listenServiceEvents(coreV1Api, factory);
listenPodEvents(coreV1Api, factory);
factory.startAllRegisteredInformers();
}
}
private void listenEndpointsEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
private void listenServiceEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
factory.sharedIndexInformerFor(
params -> coreV1Api.listEndpointsForAllNamespacesCall(
params -> coreV1Api.listServiceForAllNamespacesCall(
null,
null,
null,
......@@ -100,22 +101,22 @@ public class K8sInfoRegistry {
params.watch,
null
),
V1Endpoints.class,
V1EndpointsList.class
).addEventHandler(new ResourceEventHandler<V1Endpoints>() {
V1Service.class,
V1ServiceList.class
).addEventHandler(new ResourceEventHandler<V1Service>() {
@Override
public void onAdd(final V1Endpoints endpoints) {
addEndpoints(endpoints);
public void onAdd(final V1Service service) {
addService(service);
}
@Override
public void onUpdate(final V1Endpoints oldEndpoints, final V1Endpoints newEndpoints) {
addEndpoints(newEndpoints);
public void onUpdate(final V1Service oldService, final V1Service newService) {
addService(newService);
}
@Override
public void onDelete(final V1Endpoints endpoints, final boolean deletedFinalStateUnknown) {
removeEndpoints(endpoints);
public void onDelete(final V1Service service, final boolean deletedFinalStateUnknown) {
removeService(service);
}
});
}
......@@ -154,71 +155,76 @@ public class K8sInfoRegistry {
});
}
private void addPod(final V1Pod pod) {
ofNullable(pod.getStatus()).ifPresent(
status -> ofNullable(status.getPodIP()).ifPresent(
ip -> ipPodMap.put(ip, pod))
protected void addService(final V1Service service) {
ofNullable(service.getMetadata()).ifPresent(
metadata -> nameServiceMap.put(metadata.getName() + SEPARATOR + metadata.getNamespace(), service)
);
recompose();
}
private void removePod(final V1Pod pod) {
ofNullable(pod.getStatus()).ifPresent(
status -> ipPodMap.remove(status.getPodIP())
);
ofNullable(pod.getMetadata()).ifPresent(
metadata -> podServiceMap.remove(pod.getMetadata().getName())
protected void removeService(final V1Service service) {
ofNullable(service.getMetadata()).ifPresent(
metadata -> nameServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace())
);
recompose();
}
private void addEndpoints(final V1Endpoints endpoints) {
V1ObjectMeta endpointsMetadata = endpoints.getMetadata();
if (isNull(endpointsMetadata)) {
log.error("Endpoints metadata is null: {}", endpoints);
return;
}
final String namespace = endpointsMetadata.getNamespace();
final String name = endpointsMetadata.getName();
ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
address -> ipServiceMap.put(address.getIp(), name + "." + namespace)
))
));
protected void addPod(final V1Pod pod) {
ofNullable(pod.getMetadata()).ifPresent(
metadata -> namePodMap.put(metadata.getName() + SEPARATOR + metadata.getNamespace(), pod));
recompose();
}
private void removeEndpoints(final V1Endpoints endpoints) {
ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
address -> ipServiceMap.remove(address.getIp())
))
));
recompose();
protected void removePod(final V1Pod pod) {
ofNullable(pod.getMetadata()).ifPresent(
metadata -> namePodMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace()));
ofNullable(pod.getMetadata()).ifPresent(
metadata -> podServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace()));
}
private void recompose() {
ipPodMap.forEach((ip, pod) -> {
final String namespaceService = ipServiceMap.get(ip);
if (isNullOrEmpty(namespaceService)) {
podServiceMap.remove(ip);
return;
}
final V1ObjectMeta podMetadata = pod.getMetadata();
if (isNull(podMetadata)) {
log.warn("Pod metadata is null, {}", pod);
return;
}
podServiceMap.put(pod.getMetadata().getName(), namespaceService);
namePodMap.forEach((podName, pod) -> {
nameServiceMap.forEach((serviceName, service) -> {
if (isNull(pod.getMetadata()) || isNull(service.getMetadata()) || isNull(service.getSpec())) {
return;
}
Map<String, String> selector = service.getSpec().getSelector();
Map<String, String> labels = pod.getMetadata().getLabels();
if (isNull(labels) || isNull(selector)) {
return;
}
String podNamespace = pod.getMetadata().getNamespace();
String serviceNamespace = service.getMetadata().getNamespace();
if (Strings.isNullOrEmpty(podNamespace) || Strings.isNullOrEmpty(
serviceNamespace) || !podNamespace.equals(serviceNamespace)) {
return;
}
if (hasIntersection(selector.entrySet(), labels.entrySet())) {
podServiceMap.put(podName, serviceName);
}
});
});
}
public String findServiceName(String podName) {
return this.podServiceMap.get(podName);
public String findServiceName(String namespace, String podName) {
return this.podServiceMap.get(podName + SEPARATOR + namespace);
}
private boolean hasIntersection(Collection<?> o, Collection<?> c) {
Objects.requireNonNull(o);
Objects.requireNonNull(c);
for (final Object value : o) {
if (c.contains(value)) {
return true;
}
}
return false;
}
}
......@@ -19,8 +19,14 @@
package org.apache.skywalking.oap.meter.analyzer.dsl;
import com.google.common.collect.ImmutableMap;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
import org.junit.Before;
......@@ -28,13 +34,13 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.reflect.Whitebox;
import static com.google.common.collect.ImmutableMap.of;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
@Slf4j
@RunWith(Parameterized.class)
......@@ -63,24 +69,28 @@ public class K8sTagTest {
of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-mbczh"))
of(
"namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
"my-nginx-5dc4865748-mbczh"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx"
))
.value(1)
.build()
).build()),
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')",
Result.success(SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of(
"container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-mbczh",
"namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
"my-nginx-5dc4865748-mbczh",
"service", "nginx-service.default"
))
.value(2)
......@@ -88,7 +98,7 @@ public class K8sTagTest {
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx",
"service", "kube-state-metrics.kube-system"
))
......@@ -102,31 +112,35 @@ public class K8sTagTest {
of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-pod"))
of(
"namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
"my-nginx-5dc4865748-no-pod"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx"
))
.value(1)
.build()
).build()),
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')",
Result.success(SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of(
"container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-pod"
"namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
"my-nginx-5dc4865748-no-pod"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx",
"service", "kube-state-metrics.kube-system"
))
......@@ -140,31 +154,35 @@ public class K8sTagTest {
of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-service"))
of(
"namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
"my-nginx-5dc4865748-no-service"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx"
))
.value(1)
.build()
).build()),
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')",
Result.success(SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of(
"container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-service"
"namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
"my-nginx-5dc4865748-no-service"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx",
"service", "kube-state-metrics.kube-system"
))
......@@ -176,19 +194,44 @@ public class K8sTagTest {
});
}
@SneakyThrows
@Before
public void setup() {
Whitebox.setInternalState(K8sInfoRegistry.class, "INSTANCE",
Mockito.spy(K8sInfoRegistry.getInstance())
);
when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-mbczh")).thenReturn(
"nginx-service.default");
when(K8sInfoRegistry.getInstance().findServiceName("kube-state-metrics-6f979fd498-z7xwx")).thenReturn(
"kube-state-metrics.kube-system");
when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-no-pod")).thenReturn(
null);
when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-no-service")).thenReturn(
null);
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
.thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addService",
mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"))
).thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addPod",
mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
).thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addPod",
mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"))
).thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "removeService", mockService("nginx-service", "default", of("run", "nginx")))
.thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "removePod",
mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
).thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
.thenCallRealMethod();
PowerMockito.when(
K8sInfoRegistry.getInstance(), "addPod",
mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
).thenCallRealMethod();
}
@Test
......@@ -209,4 +252,29 @@ public class K8sTagTest {
}
assertThat(r, is(want));
}
private V1Service mockService(String name, String namespace, Map<String, String> selector) {
V1Service service = new V1Service();
V1ObjectMeta serviceMeta = new V1ObjectMeta();
V1ServiceSpec v1ServiceSpec = new V1ServiceSpec();
serviceMeta.setName(name);
serviceMeta.setNamespace(namespace);
service.setMetadata(serviceMeta);
v1ServiceSpec.setSelector(selector);
service.setSpec(v1ServiceSpec);
return service;
}
private V1Pod mockPod(String name, String namespace, Map<String, String> labels) {
V1Pod v1Pod = new V1Pod();
V1ObjectMeta podMeta = new V1ObjectMeta();
podMeta.setName(name);
podMeta.setNamespace(namespace);
podMeta.setLabels(labels);
v1Pod.setMetadata(podMeta);
return v1Pod;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册