未验证 提交 de6a731d 编写于 作者: Z Zhenxu Ke 提交者: GitHub

Save Envoy http access logs when error occurs (#6322)

上级 5289996e
......@@ -39,6 +39,7 @@ Release Notes.
* Require Zipkin receiver must work with `zipkin-elasticsearch7` storage option.
* Fix `DatabaseSlowStatementBuilder` statement maybe null.
* Remove fields of parent entity in the relation sources.
* Save Envoy http access logs when error occurs.
#### UI
* Update selector scroller to show in all pages.
......
......@@ -173,6 +173,10 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | gRPCSslCertChainPath| The file path of gRPC SSL cert chain| SW_RECEIVER_GRPC_SSL_CERT_CHAIN_PATH | - |
| - | - | maxConcurrentCallsPerConnection | The maximum number of concurrent calls permitted for each incoming connection. Defaults to no limit. | SW_RECEIVER_GRPC_MAX_CONCURRENT_CALL | - |
| - | - | authentication | The token text for the authentication. Work for gRPC connection only. Once this is set, the client is required to use the same token. | SW_AUTHENTICATION | - |
| log-analyzer | default | Log Analyzer. | SW_LOG_ANALYZER | default |
| - | - | lalFiles | The LAL configuration file names (without file extension) to be activated. Read [LAL](../../concepts-and-designs/lal.md) for more details. | SW_LOG_LAL_FILES | default |
| - | - | malFiles | The MAL configuration file names (without file extension) to be activated. Read [LAL](../../concepts-and-designs/lal.md) for more details. | SW_LOG_MAL_FILES | "" |
| event-analyzer | default | Event Analyzer. | SW_EVENT_ANALYZER | default |
| receiver-register|default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-trace|default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-jvm| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
......
......@@ -70,9 +70,10 @@ You can use `kubectl -n istio-system logs -l app=skywalking | grep "K8sALSServic
## SkyWalking ALS Analyzers
There are two available analyzers, `k8s-mesh` and `mx-mesh`, you can specify one or more analyzers to analyze the access logs.
When multiple analyzers are specified, it acts as a fast-success mechanism: SkyWalking loops over the analyzers and use it to analyze the logs, once
there is an analyzer that is able to produce a result, it stops the loop.
There are several available analyzers, `k8s-mesh`, `mx-mesh` and `persistence`, you can specify one or more
analyzers to analyze the access logs. When multiple analyzers are specified, it acts as a fast-success mechanism:
SkyWalking loops over the analyzers and use it to analyze the logs, once there is an analyzer that is able to produce a
result, it stops the loop.
### `k8s-mesh`
......@@ -87,3 +88,20 @@ this analyzer requires Istio to enable the metadata exchange plugin (you can ena
or if you're using Istio 1.7+ and installing it with profile `demo`/`preview`, it should be enabled then).
The [blog](https://skywalking.apache.org/blog/obs-service-mesh-vm-with-sw-and-als/) illustrates the detail of how it works, and a step-by-step tutorial to apply it into the [Online Boutique](https://github.com/GoogleCloudPlatform/microservices-demo) system.
### `persistence`
`persistence` analyzer adapts the Envoy access log format to
SkyWalking's [native log format](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto)
, and forwards the formatted logs to [LAL](../../concepts-and-designs/lal.md), where you can configure persistent
conditions, such as `sampler`, only persist error logs, etc. SkyWalking provides a default configuration
file [`envoy-als.yaml`](../../../../oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml) that you can
adjust as per your needs. Please make sure to activate this rule via adding the rule name `envoy-als`
into config item `log-analyzer/default/lalFiles` (or environment variable `SW_LOG_LAL_FILES`,
e.g. `SW_LOG_LAL_FILES=envoy-als`).
**Attention**: because `persistence` analyzer also needs a mechanism to map the logs into responding services, hence,
you need to configure at least one of `k8s-mesh` or `mx-mesh` as its antecedent so that `persistence` analyzer knows
which service the logs belong to. For example, you should set `envoy-metric/default/alsHTTPAnalysis` (or environment
variable `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS`) to something like `k8s-mesh,persistence`, `mx-mesh,persistence`
or `mx-mesh,k8s-mesh,persistence`.
......@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import groovy.lang.Closure;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
......@@ -107,7 +109,17 @@ public class ExtractorSpec extends AbstractSpec {
.stream()
.filter(it -> isNotBlank(it.getKey()))
.filter(it -> nonNull(it.getValue()) && isNotBlank(Objects.toString(it.getValue())))
.map(it -> KeyStringValuePair.newBuilder().setKey(it.getKey()).setValue(Objects.toString(it.getValue())).build())
.map(it -> {
final Object val = it.getValue();
String valStr = Objects.toString(val);
if (Collection.class.isAssignableFrom(val.getClass())) {
valStr = Joiner.on(",").skipNulls().join((Collection<?>) val);
}
return KeyStringValuePair.newBuilder()
.setKey(it.getKey())
.setValue(valStr)
.build();
})
.collect(Collectors.toList())
)
);
......
......@@ -20,19 +20,14 @@ package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink;
import groovy.lang.Closure;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.RateLimitingSampler;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.Sampler;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SamplerSpec extends AbstractSpec {
private static final Logger LOGGER = LoggerFactory.getLogger(SamplerSpec.class);
private final Map<String, Sampler> samplers;
private final RateLimitingSampler.ResetHandler rlsResetHandler;
......@@ -49,25 +44,13 @@ public class SamplerSpec extends AbstractSpec {
if (BINDING.get().shouldAbort()) {
return;
}
final RateLimitingSampler newSampler = new RateLimitingSampler(rlsResetHandler);
cl.setDelegate(newSampler);
cl.call();
final Sampler sampler = samplers.computeIfAbsent(id, $ -> Sampler.NOOP);
if (Objects.equals(sampler, newSampler)) { // Unchanged
sampleWith(sampler);
return;
}
final Sampler sampler = samplers.computeIfAbsent(id, $ -> new RateLimitingSampler(rlsResetHandler).start());
try {
sampler.close();
} catch (final Exception e) {
LOGGER.error("Failed to cancel old sampler: {}", sampler, e);
}
samplers.put(id, newSampler.start());
cl.setDelegate(sampler);
cl.call();
sampleWith(newSampler);
sampleWith(sampler);
}
private void sampleWith(final Sampler sampler) {
......
......@@ -18,8 +18,8 @@
package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler;
import java.util.HashSet;
import java.util.Set;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
......@@ -69,7 +69,7 @@ public class RateLimitingSampler implements Sampler {
@Slf4j
public static class ResetHandler {
private final Set<Sampler> samplers = new HashSet<>();
private final List<Sampler> samplers = new ArrayList<>();
private volatile ScheduledFuture<?> future;
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rules:
- name: envoy-als
dsl: |
filter {
json {
}
// only collect abnormal logs (http status code >= 300, or commonProperties?.responseFlags is not empty)
if (parsed?.response?.responseCode < 400 && !parsed?.commonProperties?.responseFlags) {
abort {}
}
extractor {
tag 'status.code': parsed?.response?.responseCode as int
tag 'response.flag': parsed?.commonProperties?.responseFlags?.keySet()
}
sink {
sampler {
if (parsed?.commonProperties?.responseFlags) {
// use service:errorCode as sampler id so that each service:errorCode has its own sampler,
// e.g. checkoutservice:[upstreamConnectionFailure], checkoutservice:[upstreamRetryLimitExceeded]
rateLimit("${log.service}:${parsed?.commonProperties?.responseFlags?.keySet()}") {
qps 100
}
} else {
// use service:responseCode as sampler id so that each service:responseCode has its own sampler,
// e.g. checkoutservice:500, checkoutservice:404.
rateLimit("${log.service}:${parsed?.response?.responseCode}") {
qps 100
}
}
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.library.util;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
......@@ -25,7 +26,15 @@ import java.io.IOException;
public class ProtoBufJsonUtils {
public static String toJSON(Message sourceMessage) throws IOException {
return JsonFormat.printer().print(sourceMessage);
return JsonFormat.printer()
.usingTypeRegistry(
JsonFormat
.TypeRegistry
.newBuilder()
.add(BytesValue.getDescriptor())
.build()
)
.print(sourceMessage);
}
/**
......
......@@ -39,6 +39,11 @@
<artifactId>meter-analyzer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>log-analyzer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-mesh-receiver-plugin</artifactId>
......
......@@ -30,7 +30,6 @@ import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
......@@ -117,15 +116,11 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
List<ServiceMeshMetric.Builder> result = new ArrayList<>();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
final List<ServiceMeshMetric.Builder> result =
analysis.analysis(identifier, log, role);
if (CollectionUtils.isNotEmpty(result)) {
// Once the analysis has results, don't need to continue analysis in lower priority analyzers.
sourceResult.addAll(result);
break;
}
result = analysis.analysis(result, identifier, log, role);
}
sourceResult.addAll(result);
}
sourceDispatcherCounter.inc(sourceResult.size());
......
......@@ -36,6 +36,8 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
@Getter
private String k8sServiceNameRule;
private final ServiceMetaInfoFactory serviceMetaInfoFactory = new ServiceMetaInfoFactoryImpl();
public List<String> getAlsHTTPAnalysis() {
if (Strings.isNullOrEmpty(alsHTTPAnalysis)) {
return Collections.emptyList();
......@@ -46,4 +48,8 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
public List<Rule> rules() throws ModuleStartException {
return Rules.loadRules("envoy-metrics-rules", Collections.singletonList("envoy"));
}
public ServiceMetaInfoFactory serviceMetaInfoFactory() {
return serviceMetaInfoFactory;
}
}
......@@ -57,7 +57,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
try {
FieldsHelper.SINGLETON.init(fieldMappingFile);
FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz());
} catch (final Exception e) {
throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e);
}
......
......@@ -36,7 +36,6 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter;
import org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters.ProtoMetricFamily2MetricsAdapter;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
......@@ -50,7 +49,11 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
private final HistogramMetrics histogram;
private final List<PrometheusMetricConverter> converters;
private final EnvoyMetricReceiverConfig config;
public MetricServiceGRPCHandler(final ModuleManager moduleManager, final EnvoyMetricReceiverConfig config) throws ModuleStartException {
this.config = config;
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
......@@ -86,7 +89,7 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
if (isFirst) {
isFirst = false;
service = new ServiceMetaInfoAdapter(message.getIdentifier().getNode().getMetadata());
service = config.serviceMetaInfoFactory().fromStruct(message.getIdentifier().getNode().getMetadata());
}
if (log.isDebugEnabled()) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy;
import com.google.protobuf.Struct;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
/**
* Factory to create {@link ServiceMetaInfo} instances from Kubernetes Pods, Envoy access log metadata, etc.
*/
public interface ServiceMetaInfoFactory {
/**
* @return the {@link Class} of the {@link ServiceMetaInfo} implementation.
*/
Class<? extends ServiceMetaInfo> clazz();
/**
* @return an UNKNOWN instance of {@link ServiceMetaInfo}.
*/
ServiceMetaInfo unknown();
/**
* Create an instance of {@link ServiceMetaInfo} from the given {@link Struct protobuf struct}.
*/
ServiceMetaInfo fromStruct(final Struct struct) throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy;
import com.google.protobuf.Struct;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter;
public class ServiceMetaInfoFactoryImpl implements ServiceMetaInfoFactory {
@Override
public Class<? extends ServiceMetaInfo> clazz() {
return ServiceMetaInfo.class;
}
@Override
public ServiceMetaInfo unknown() {
return new ServiceMetaInfo("UNKNOWN", "UNKNOWN");
}
@Override
public ServiceMetaInfo fromStruct(final Struct struct) throws Exception {
return new ServiceMetaInfoAdapter(struct);
}
}
......@@ -34,7 +34,23 @@ public interface ALSHTTPAnalysis {
void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
List<ServiceMeshMetric.Builder> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role);
/**
* The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one.
*
* To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
*
* @param result of the previous analyzer.
* @param identifier of the Envoy node where the logs are emitted.
* @param entry the log entry.
* @param role the role of the Envoy node where the logs are emitted.
* @return the analysis results.
*/
List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
);
Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
}
......@@ -32,10 +32,10 @@ public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
if (alsIdentifier == null) {
return defaultRole;
}
final Node node = alsIdentifier.getNode();
if (node == null) {
if (!alsIdentifier.hasNode()) {
return defaultRole;
}
final Node node = alsIdentifier.getNode();
final String id = node.getId();
if (id.startsWith("router~")) {
return Role.PROXY;
......
......@@ -131,7 +131,12 @@ public class LogEntry2MetricsAdapter {
}
protected String endpoint() {
return ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/");
if (!entry.hasRequest()) {
return "/";
}
final HTTPRequestProperties request = entry.getRequest();
final String method = request.getRequestMethod().name();
return method + ":" + request.getPath();
}
protected static long formatAsLong(final Timestamp timestamp) {
......
......@@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
/**
......@@ -57,7 +58,15 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
}
@Override
public List<ServiceMeshMetric.Builder> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) {
public List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
if (isNotEmpty(result)) {
return result;
}
if (serviceRegistry.isEmpty()) {
return Collections.emptyList();
}
......
......@@ -54,10 +54,6 @@ public enum FieldsHelper {
*/
private Map<String, Method> fieldSetterMapping;
public void init(final String file) throws Exception {
init(ResourceUtils.readToStream(file), ServiceMetaInfo.class);
}
public void init(final String file,
final Class<? extends ServiceMetaInfo> serviceInfoClass) throws Exception {
init(ResourceUtils.readToStream(file), serviceInfoClass);
......@@ -127,7 +123,10 @@ public enum FieldsHelper {
}
values[i] = value.getStringValue();
}
fieldSetterMapping.get(entry.getKey()).invoke(serviceMetaInfo, Strings.lenientFormat(serviceNameFormat.format, values));
final String value = Strings.lenientFormat(serviceNameFormat.format, values);
if (!Strings.isNullOrEmpty(value)) {
fieldSetterMapping.get(entry.getKey()).invoke(serviceMetaInfo, value);
}
}
}
......
......@@ -23,7 +23,6 @@ import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
......@@ -38,6 +37,7 @@ import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN;
......@@ -50,6 +50,8 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
protected String fieldMappingFile = "metadata-service-mapping.yaml";
protected EnvoyMetricReceiverConfig config;
@Override
public String name() {
return "mx-mesh";
......@@ -57,21 +59,30 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
@Override
public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
this.config = config;
try {
FieldsHelper.SINGLETON.init(fieldMappingFile);
FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz());
} catch (final Exception e) {
throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e);
}
}
@Override
public List<ServiceMeshMetric.Builder> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) {
final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
public List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
if (isNotEmpty(result)) {
return result;
}
if (!entry.hasCommonProperties()) {
return Collections.emptyList();
}
final AccessLogCommon properties = entry.getCommonProperties();
final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
if (stateMap == null) {
if (stateMap.isEmpty()) {
return Collections.emptyList();
}
final ServiceMetaInfo currSvc;
......@@ -82,7 +93,6 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
return Collections.emptyList();
}
final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
......@@ -129,7 +139,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception {
return new ServiceMetaInfoAdapter(identifier.getNode().getMetadata());
return config.serviceMetaInfoFactory().fromStruct(identifier.getNode().getMetadata());
}
}
......@@ -83,7 +83,7 @@ public class ServiceMetaInfoAdapter extends ServiceMetaInfo {
* @param node the flat buffer node where to extract the metadata
* @return the metadata {@link Struct}
*/
protected Struct extractStructFromNodeFlatBuffer(final FlatNode node) {
public static Struct extractStructFromNodeFlatBuffer(final FlatNode node) {
final Struct.Builder builder = Struct.newBuilder();
builder.putFields("NAME", Value.newBuilder().setStringValue(nullToEmpty(node.name())).build());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.persistence;
import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
* {@code LogsPersistence} analyzes the error logs and persists them to the log system.
*/
@Slf4j
public class LogsPersistence implements ALSHTTPAnalysis {
private ILogAnalyzerService logAnalyzerService;
@Override
public String name() {
return "persistence";
}
@Override
public void init(final ModuleManager manager, final EnvoyMetricReceiverConfig config) throws ModuleStartException {
logAnalyzerService = manager.find(LogAnalyzerModule.NAME)
.provider()
.getService(ILogAnalyzerService.class);
}
@Override
public List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
try {
result.stream()
.findFirst()
.ifPresent(metrics -> {
try {
final LogData logData = convertToLogData(entry, metrics);
logAnalyzerService.doAnalysis(logData);
} catch (IOException e) {
log.error(
"Failed to parse error log entry to log data: {}",
TextFormat.shortDebugString(entry),
e
);
}
});
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
return result;
}
@Override
public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role prev) {
return prev;
}
public LogData convertToLogData(final HTTPAccessLogEntry logEntry,
final ServiceMeshMetric.Builder metrics) throws IOException {
final boolean isServerSide = metrics.getDetectPoint() == DetectPoint.server;
final String svc = isServerSide ? metrics.getDestServiceName() : metrics.getSourceServiceName();
final String svcInst = isServerSide ? metrics.getDestServiceInstance() : metrics.getSourceServiceInstance();
return LogData
.newBuilder()
.setService(svc)
.setServiceInstance(svcInst)
.setEndpoint(metrics.getEndpoint())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
.newBuilder()
.setJson(
JSONLog
.newBuilder()
.setJson(toJSON(logEntry))
)
)
.build();
}
}
......@@ -19,3 +19,4 @@
org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis
org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer
org.apache.skywalking.oap.server.receiver.envoy.persistence.LogsPersistence
......@@ -23,6 +23,7 @@ import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
......@@ -77,7 +78,7 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
Assert.assertEquals(2, result.size());
......@@ -99,7 +100,7 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
Assert.assertEquals(1, result.size());
......@@ -116,7 +117,7 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
Assert.assertEquals(1, result.size());
......@@ -133,7 +134,7 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
Assert.assertEquals(1, result.size());
......
......@@ -15,4 +15,4 @@
endpoints:
- key: not null
label: /details/0
label: GET:/details/0
......@@ -15,4 +15,4 @@
endpoints:
- key: not null
label: /productpage
label: GET:/productpage
......@@ -15,4 +15,4 @@
endpoints:
- key: not null
label: /ratings/0
label: GET:/ratings/0
......@@ -15,4 +15,4 @@
endpoints:
- key: not null
label: /reviews/0
label: GET:/reviews/0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册