未验证 提交 5e8b6e92 编写于 作者: Z Zhenxu Ke 提交者: GitHub

Fix NPE and open some methods for extensibility (#5832)

上级 c66ae0b0
...@@ -51,11 +51,11 @@ public class LogEntry2MetricsAdapter { ...@@ -51,11 +51,11 @@ public class LogEntry2MetricsAdapter {
/** /**
* The access log entry that is to be adapted into metrics builders. * The access log entry that is to be adapted into metrics builders.
*/ */
private final HTTPAccessLogEntry entry; protected final HTTPAccessLogEntry entry;
private final ServiceMetaInfo sourceService; protected final ServiceMetaInfo sourceService;
private final ServiceMetaInfo targetService; protected final ServiceMetaInfo targetService;
/** /**
* Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}. * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}.
...@@ -94,7 +94,7 @@ public class LogEntry2MetricsAdapter { ...@@ -94,7 +94,7 @@ public class LogEntry2MetricsAdapter {
protected ServiceMeshMetric.Builder adaptCommonPart() { protected ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties(); final AccessLogCommon properties = entry.getCommonProperties();
final String endpoint = ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/"); final String endpoint = endpoint();
final int responseCode = ofNullable(entry.getResponse()).map(HTTPResponseProperties::getResponseCode).map(UInt32Value::getValue).orElse(200); final int responseCode = ofNullable(entry.getResponse()).map(HTTPResponseProperties::getResponseCode).map(UInt32Value::getValue).orElse(200);
final boolean status = responseCode >= 200 && responseCode < 400; final boolean status = responseCode >= 200 && responseCode < 400;
final Protocol protocol = requestProtocol(entry.getRequest()); final Protocol protocol = requestProtocol(entry.getRequest());
...@@ -124,6 +124,10 @@ public class LogEntry2MetricsAdapter { ...@@ -124,6 +124,10 @@ public class LogEntry2MetricsAdapter {
return builder; return builder;
} }
protected String endpoint() {
return ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/");
}
protected static long formatAsLong(final Timestamp timestamp) { protected static long formatAsLong(final Timestamp timestamp) {
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
} }
......
...@@ -52,20 +52,20 @@ import static java.util.Objects.isNull; ...@@ -52,20 +52,20 @@ import static java.util.Objects.isNull;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
@Slf4j @Slf4j
class K8SServiceRegistry { public class K8SServiceRegistry {
final Map<String/* ip */, ServiceMetaInfo> ipServiceMetaInfoMap; protected final Map<String/* ip */, ServiceMetaInfo> ipServiceMetaInfoMap;
final Map<String/* namespace:serviceName */, V1Service> idServiceMap; protected final Map<String/* namespace:serviceName */, V1Service> idServiceMap;
final Map<String/* ip */, V1Pod> ipPodMap; protected final Map<String/* ip */, V1Pod> ipPodMap;
final Map<String/* ip */, String/* namespace:serviceName */> ipServiceMap; protected final Map<String/* ip */, String/* namespace:serviceName */> ipServiceMap;
final ExecutorService executor; protected final ExecutorService executor;
final ServiceNameFormatter serviceNameFormatter; protected final ServiceNameFormatter serviceNameFormatter;
K8SServiceRegistry(final EnvoyMetricReceiverConfig config) { public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) {
serviceNameFormatter = new ServiceNameFormatter(config.getK8sServiceNameRule()); serviceNameFormatter = new ServiceNameFormatter(config.getK8sServiceNameRule());
ipServiceMetaInfoMap = new ConcurrentHashMap<>(); ipServiceMetaInfoMap = new ConcurrentHashMap<>();
idServiceMap = new ConcurrentHashMap<>(); idServiceMap = new ConcurrentHashMap<>();
...@@ -79,7 +79,7 @@ class K8SServiceRegistry { ...@@ -79,7 +79,7 @@ class K8SServiceRegistry {
); );
} }
void start() throws IOException { public void start() throws IOException {
final ApiClient apiClient = Config.defaultClient(); final ApiClient apiClient = Config.defaultClient();
apiClient.setHttpClient(apiClient.getHttpClient() apiClient.setHttpClient(apiClient.getHttpClient()
.newBuilder() .newBuilder()
...@@ -200,7 +200,7 @@ class K8SServiceRegistry { ...@@ -200,7 +200,7 @@ class K8SServiceRegistry {
}); });
} }
private void addService(final V1Service service) { protected void addService(final V1Service service) {
Optional.ofNullable(service.getMetadata()).ifPresent( Optional.ofNullable(service.getMetadata()).ifPresent(
metadata -> idServiceMap.put(metadata.getNamespace() + ":" + metadata.getName(), service) metadata -> idServiceMap.put(metadata.getNamespace() + ":" + metadata.getName(), service)
); );
...@@ -208,13 +208,13 @@ class K8SServiceRegistry { ...@@ -208,13 +208,13 @@ class K8SServiceRegistry {
recompose(); recompose();
} }
private void removeService(final V1Service service) { protected void removeService(final V1Service service) {
Optional.ofNullable(service.getMetadata()).ifPresent( Optional.ofNullable(service.getMetadata()).ifPresent(
metadata -> idServiceMap.remove(metadata.getUid()) metadata -> idServiceMap.remove(metadata.getUid())
); );
} }
private void addPod(final V1Pod pod) { protected void addPod(final V1Pod pod) {
Optional.ofNullable(pod.getStatus()).ifPresent( Optional.ofNullable(pod.getStatus()).ifPresent(
status -> ipPodMap.put(status.getPodIP(), pod) status -> ipPodMap.put(status.getPodIP(), pod)
); );
...@@ -222,13 +222,13 @@ class K8SServiceRegistry { ...@@ -222,13 +222,13 @@ class K8SServiceRegistry {
recompose(); recompose();
} }
private void removePod(final V1Pod pod) { protected void removePod(final V1Pod pod) {
Optional.ofNullable(pod.getStatus()).ifPresent( Optional.ofNullable(pod.getStatus()).ifPresent(
status -> ipPodMap.remove(status.getPodIP()) status -> ipPodMap.remove(status.getPodIP())
); );
} }
private void addEndpoints(final V1Endpoints endpoints) { protected void addEndpoints(final V1Endpoints endpoints) {
final String namespace = requireNonNull(endpoints.getMetadata()).getNamespace(); final String namespace = requireNonNull(endpoints.getMetadata()).getNamespace();
final String name = requireNonNull(endpoints.getMetadata()).getName(); final String name = requireNonNull(endpoints.getMetadata()).getName();
...@@ -241,7 +241,7 @@ class K8SServiceRegistry { ...@@ -241,7 +241,7 @@ class K8SServiceRegistry {
recompose(); recompose();
} }
private void removeEndpoints(final V1Endpoints endpoints) { protected void removeEndpoints(final V1Endpoints endpoints) {
requireNonNull(endpoints.getSubsets()).forEach( requireNonNull(endpoints.getSubsets()).forEach(
subset -> requireNonNull(subset.getAddresses()).forEach( subset -> requireNonNull(subset.getAddresses()).forEach(
address -> ipServiceMap.remove(address.getIp()) address -> ipServiceMap.remove(address.getIp())
...@@ -249,7 +249,7 @@ class K8SServiceRegistry { ...@@ -249,7 +249,7 @@ class K8SServiceRegistry {
); );
} }
private List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final Map<String, String> labels) { protected List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final Map<String, String> labels) {
if (isNull(labels)) { if (isNull(labels)) {
return Collections.emptyList(); return Collections.emptyList();
} }
...@@ -259,7 +259,7 @@ class K8SServiceRegistry { ...@@ -259,7 +259,7 @@ class K8SServiceRegistry {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
ServiceMetaInfo findService(final String ip) { protected ServiceMetaInfo findService(final String ip) {
final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip); final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
if (isNull(service)) { if (isNull(service)) {
log.debug("Unknown ip {}, ip -> service is null", ip); log.debug("Unknown ip {}, ip -> service is null", ip);
...@@ -268,7 +268,7 @@ class K8SServiceRegistry { ...@@ -268,7 +268,7 @@ class K8SServiceRegistry {
return service; return service;
} }
private void recompose() { protected void recompose() {
ipPodMap.forEach((ip, pod) -> { ipPodMap.forEach((ip, pod) -> {
final String namespaceService = ipServiceMap.get(ip); final String namespaceService = ipServiceMap.get(ip);
final V1Service service; final V1Service service;
...@@ -297,7 +297,7 @@ class K8SServiceRegistry { ...@@ -297,7 +297,7 @@ class K8SServiceRegistry {
}); });
} }
boolean isEmpty() { protected boolean isEmpty() {
return ipServiceMetaInfoMap.isEmpty(); return ipServiceMetaInfoMap.isEmpty();
} }
} }
...@@ -127,7 +127,8 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -127,7 +127,8 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
return Collections.emptyList(); return Collections.emptyList();
} }
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
final Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ?
properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) { if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) {
return Collections.emptyList(); return Collections.emptyList();
......
...@@ -27,13 +27,13 @@ import java.util.regex.Pattern; ...@@ -27,13 +27,13 @@ import java.util.regex.Pattern;
import org.apache.commons.beanutils.PropertyUtils; import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
class ServiceNameFormatter { public class ServiceNameFormatter {
private final List<String> properties; private final List<String> properties;
private final StringBuffer serviceNamePattern; private final StringBuffer serviceNamePattern;
ServiceNameFormatter(String rule) { public ServiceNameFormatter(String rule) {
rule = StringUtils.defaultIfBlank(rule, "${service.metadata.name}"); rule = StringUtils.defaultIfBlank(rule, "${service.metadata.name}");
this.properties = new ArrayList<>(); this.properties = new ArrayList<>();
...@@ -48,7 +48,7 @@ class ServiceNameFormatter { ...@@ -48,7 +48,7 @@ class ServiceNameFormatter {
} }
} }
String format(final Map<String, Object> context) throws Exception { public String format(final Map<String, Object> context) throws Exception {
final Object[] values = new Object[properties.size()]; final Object[] values = new Object[properties.size()];
for (int i = 0; i < properties.size(); i++) { for (int i = 0; i < properties.size(); i++) {
......
...@@ -20,8 +20,6 @@ package org.apache.skywalking.oap.server.receiver.envoy.als.mx; ...@@ -20,8 +20,6 @@ package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.reflect.Invokable;
import com.google.common.reflect.TypeToken;
import com.google.protobuf.Struct; import com.google.protobuf.Struct;
import com.google.protobuf.Value; import com.google.protobuf.Value;
import java.io.InputStream; import java.io.InputStream;
...@@ -41,8 +39,7 @@ import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; ...@@ -41,8 +39,7 @@ import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.Yaml;
@Slf4j @Slf4j
@SuppressWarnings("UnstableApiUsage") public enum FieldsHelper {
enum FieldsHelper {
SINGLETON; SINGLETON;
private boolean initialized = false; private boolean initialized = false;
...@@ -55,17 +52,24 @@ enum FieldsHelper { ...@@ -55,17 +52,24 @@ enum FieldsHelper {
/** /**
* The mappings from the field name of {@link ServiceMetaInfo} to its {@code setter}. * The mappings from the field name of {@link ServiceMetaInfo} to its {@code setter}.
*/ */
private Map<String, Invokable<ServiceMetaInfo, ?>> fieldSetterMapping; private Map<String, Method> fieldSetterMapping;
public void init(final String file) throws Exception { public void init(final String file) throws Exception {
init(ResourceUtils.readToStream(file)); init(ResourceUtils.readToStream(file), ServiceMetaInfo.class);
}
public void init(final String file,
final Class<? extends ServiceMetaInfo> serviceInfoClass) throws Exception {
init(ResourceUtils.readToStream(file), serviceInfoClass);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
void init(final InputStream inputStream) throws ModuleStartException { public void init(final InputStream inputStream,
final Class<? extends ServiceMetaInfo> serviceInfoClass) throws ModuleStartException {
if (initialized) { if (initialized) {
return; return;
} }
final Yaml yaml = new Yaml(); final Yaml yaml = new Yaml();
final Map<String, String> config = (Map<String, String>) yaml.load(inputStream); final Map<String, String> config = (Map<String, String>) yaml.load(inputStream);
...@@ -92,11 +96,9 @@ enum FieldsHelper { ...@@ -92,11 +96,9 @@ enum FieldsHelper {
); );
try { try {
final Method setterMethod = ServiceMetaInfo.class.getMethod("set" + StringUtils.capitalize(serviceMetaInfoFieldName), String.class); final Method setterMethod = serviceInfoClass.getMethod("set" + StringUtils.capitalize(serviceMetaInfoFieldName), String.class);
final Invokable<ServiceMetaInfo, ?> setter = new TypeToken<ServiceMetaInfo>() { setterMethod.setAccessible(true);
}.method(setterMethod); fieldSetterMapping.put(serviceMetaInfoFieldName, setterMethod);
setter.setAccessible(true);
fieldSetterMapping.put(serviceMetaInfoFieldName, setter);
} catch (final NoSuchMethodException e) { } catch (final NoSuchMethodException e) {
throw new ModuleStartException("Initialize method error", e); throw new ModuleStartException("Initialize method error", e);
} }
...@@ -112,6 +114,7 @@ enum FieldsHelper { ...@@ -112,6 +114,7 @@ enum FieldsHelper {
* @throws Exception if failed to inflate the {@code serviceMetaInfo} * @throws Exception if failed to inflate the {@code serviceMetaInfo}
*/ */
public void inflate(final Struct metadata, final ServiceMetaInfo serviceMetaInfo) throws Exception { public void inflate(final Struct metadata, final ServiceMetaInfo serviceMetaInfo) throws Exception {
final Value empty = Value.newBuilder().setStringValue("-").build();
final Value root = Value.newBuilder().setStructValue(metadata).build(); final Value root = Value.newBuilder().setStructValue(metadata).build();
for (final Map.Entry<String, ServiceNameFormat> entry : fieldNameMapping.entrySet()) { for (final Map.Entry<String, ServiceNameFormat> entry : fieldNameMapping.entrySet()) {
final ServiceNameFormat serviceNameFormat = entry.getValue(); final ServiceNameFormat serviceNameFormat = entry.getValue();
...@@ -120,7 +123,7 @@ enum FieldsHelper { ...@@ -120,7 +123,7 @@ enum FieldsHelper {
final List<String> properties = serviceNameFormat.properties.get(i); final List<String> properties = serviceNameFormat.properties.get(i);
Value value = root; Value value = root;
for (final String property : properties) { for (final String property : properties) {
value = value.getStructValue().getFieldsOrThrow(property); value = value.getStructValue().getFieldsOrDefault(property, empty);
} }
values[i] = value.getStringValue(); values[i] = value.getStringValue();
} }
......
...@@ -44,9 +44,11 @@ import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInf ...@@ -44,9 +44,11 @@ import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInf
@Slf4j @Slf4j
public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
private static final String UPSTREAM_KEY = "wasm.upstream_peer"; public static final String UPSTREAM_KEY = "wasm.upstream_peer";
private static final String DOWNSTREAM_KEY = "wasm.downstream_peer"; public static final String DOWNSTREAM_KEY = "wasm.downstream_peer";
protected String fieldMappingFile = "metadata-service-mapping.yaml";
@Override @Override
public String name() { public String name() {
...@@ -56,7 +58,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { ...@@ -56,7 +58,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
@Override @Override
public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
try { try {
FieldsHelper.SINGLETON.init("metadata-service-mapping.yaml"); FieldsHelper.SINGLETON.init(fieldMappingFile);
} catch (final Exception e) { } catch (final Exception e) {
throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e); throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e);
} }
...@@ -74,7 +76,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { ...@@ -74,7 +76,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
} }
final ServiceMetaInfo currSvc; final ServiceMetaInfo currSvc;
try { try {
currSvc = new ServiceMetaInfoAdapter(identifier.getNode().getMetadata()); currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e); log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
return Collections.emptyList(); return Collections.emptyList();
...@@ -88,7 +90,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { ...@@ -88,7 +90,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
} }
final ServiceMetaInfo svc; final ServiceMetaInfo svc;
try { try {
svc = new ServiceMetaInfoAdapter(value); svc = adaptToServiceMetaInfo(value);
} catch (Exception e) { } catch (Exception e) {
log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray())); log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray()));
return; return;
...@@ -122,4 +124,12 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { ...@@ -122,4 +124,12 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
return result; return result;
} }
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
return new ServiceMetaInfoAdapter(value);
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception {
return new ServiceMetaInfoAdapter(identifier.getNode().getMetadata());
}
} }
...@@ -84,7 +84,7 @@ public class FieldsHelperTest { ...@@ -84,7 +84,7 @@ public class FieldsHelperTest {
final StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); final StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder); JsonFormat.parser().merge(isr, requestBuilder);
final ServiceMetaInfo info = new ServiceMetaInfo(); final ServiceMetaInfo info = new ServiceMetaInfo();
FieldsHelper.SINGLETON.init(new ByteArrayInputStream(mapping.getBytes())); FieldsHelper.SINGLETON.init(new ByteArrayInputStream(mapping.getBytes()), ServiceMetaInfo.class);
FieldsHelper.SINGLETON.inflate( FieldsHelper.SINGLETON.inflate(
requestBuilder.getIdentifier().getNode().getMetadata(), requestBuilder.getIdentifier().getNode().getMetadata(),
info info
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册