未验证 提交 3b15f8d1 编写于 作者: Z Zhenxu Ke 提交者: GitHub

feature: Envoy access log receiver supports TCP logs (#6727)

上级 c2399c09
......@@ -45,7 +45,7 @@ jobs:
with:
languages: ${{ matrix.language }}
- run: ./mvnw -Dmaven.test.skip=true clean install
- run: ./mvnw -Dmaven.test.skip=true clean install || ./mvnw -Dmaven.test.skip=true clean install
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
......@@ -100,6 +100,7 @@ jobs:
--set elasticsearch.minimumMasterNodes=1 \
--set elasticsearch.imageTag=7.5.1 \
--set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=${{ matrix.analyzer }} \
--set oap.env.SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=${{ matrix.analyzer }} \
--set oap.env.K8S_SERVICE_NAME_RULE='e2e::${service.metadata.name}' \
--set oap.envoy.als.enabled=true \
--set oap.replicas=1 \
......@@ -117,7 +118,13 @@ jobs:
- name: Deploy demo services
if: env.SKIP_CI != 'true'
run: bash ${SCRIPTS_DIR}/demo.sh
run: |
bash ${SCRIPTS_DIR}/demo.sh
# Enable TCP services
kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/platform/kube/bookinfo-ratings-v2.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/platform/kube/bookinfo-db.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/networking/destination-rule-all.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/networking/virtual-service-ratings-db.yaml
- name: Cluster Info
if: ${{ failure() }}
......
......@@ -34,6 +34,7 @@ Release Notes.
* CVE: fix Jetty vulnerability. https://nvd.nist.gov/vuln/detail/CVE-2019-17638
* Fix: MAL function would miss samples name after creating new samples.
* perf: use iterator.remove() to remove modulesWithoutProvider
* Support analyzing Envoy TCP access logs.
#### UI
* Add logo for kong plugin.
......
Subproject commit 9a689b0188cbdc7bd8d6ddd99b4ad5283e82fe88
Subproject commit 7da226cfced7fa4eb91c6528e8c30827288531a0
......@@ -30,6 +30,8 @@ This calculates the metrics data from each request of the service.
| type | The type of each request. Such as: Database, HTTP, RPC, gRPC. | | enum |
| tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List<String>` |
| sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string|
| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long |
| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long |
### SCOPE `ServiceInstance`
......@@ -47,6 +49,8 @@ This calculates the metrics data from each request of the service instance.
| type | The type of each request, such as Database, HTTP, RPC, or gRPC. | | enum |
| tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List<String>` |
| sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string|
| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long |
| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long |
#### Secondary scopes of `ServiceInstance`
......@@ -120,6 +124,8 @@ This calculates the metrics data from each request of the endpoint in the servic
| type | The type of each request, such as Database, HTTP, RPC, or gRPC. | | enum |
| tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List<String>` |
| sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string|
| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long |
| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long |
### SCOPE `ServiceRelation`
......@@ -142,7 +148,8 @@ This calculates the metrics data from each request between services.
| detectPoint | Where the relation is detected. The value may be client, server, or proxy. | yes | enum|
| tlsMode | The TLS mode between source and destination services, such as `service_relation_mtls_cpm = from(ServiceRelation.*).filter(tlsMode == "mTLS").cpm()` || string|
| sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string|
| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long |
| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long |
### SCOPE `ServiceInstanceRelation`
......@@ -165,6 +172,8 @@ This calculates the metrics data from each request between service instances.
| detectPoint | Where the relation is detected. The value may be client, server, or proxy. | yes | enum|
| tlsMode | The TLS mode between source and destination service instances, such as `service_instance_relation_mtls_cpm = from(ServiceInstanceRelation.*).filter(tlsMode == "mTLS").cpm()` || string|
| sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string|
| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long |
| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long |
### SCOPE `EndpointRelation`
......
......@@ -191,7 +191,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| service-mesh| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| envoy-metric| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| - | - | acceptMetricsService | Open Envoy Metrics Service analysis | SW_ENVOY_METRIC_SERVICE | true|
| - | - | alsHTTPAnalysis | Open Envoy Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS | - |
| - | - | alsHTTPAnalysis | Open Envoy HTTP Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS | - |
| - | - | alsTCPAnalysis | Open Envoy TCP Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_TCP_ANALYSIS | - |
| - | - | k8sServiceNameRule | `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata, the available variables are `pod`, `service`, e.g., you can use `${service.metadata.name}-${pod.metadata.labels.version}` to append the version number to the service name. Be careful, when using environment variables to pass this configuration, use single quotes(`''`) to avoid it being evaluated by the shell. | - |
| receiver-otel | default | Read [receiver doc](backend-receivers.md) for more details | - | - |
| - | - | enabledHandlers|Enabled handlers for otel| SW_OTEL_RECEIVER_ENABLED_HANDLERS | - |
......
# Observe Service Mesh through ALS
[Envoy Access Log Service (ALS)](https://www.envoyproxy.io/docs/envoy/latest/api-v2/service/accesslog/v2/als.proto) provides
[Envoy Access Log Service (ALS)](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/service/accesslog/v2/als.proto) provides
full logs about RPC routed, including HTTP and TCP.
## Background
......@@ -29,8 +29,9 @@ On Istio version 1.6.0+, if Istio is installed with [`demo` profile](https://ist
- Activate SkyWalking [Envoy Receiver](../backend/backend-receivers.md). This is activated by default.
- Choose an ALS analyzer. There are two available analyzers, `k8s-mesh` and `mx-mesh`.
Set the system environment variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh`
- Choose an ALS analyzer. There are two available analyzers, `k8s-mesh` and `mx-mesh` for both HTTP access logs and TCP access logs.
Set the system environment variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** and **SW_ENVOY_METRIC_ALS_TCP_ANALYSIS**
such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=mx-mesh`, `SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=mx-mesh`
or in the `application.yaml` to activate the analyzer. For more about the analyzers, see [SkyWalking ALS Analyzers](#skywalking-als-analyzers)
```yaml
......@@ -39,6 +40,7 @@ On Istio version 1.6.0+, if Istio is installed with [`demo` profile](https://ist
default:
acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true}
alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""} # Setting the system env variable would override this.
alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""}
```
To use multiple analyzers as a fallback,please use `,` to concatenate.
......@@ -61,12 +63,13 @@ helm repo add elastic https://helm.elastic.co
helm dep up skywalking
helm install 8.1.0 skywalking -n istio-system \
--set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \
--set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=mx-mesh \
--set oap.env.SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=mx-mesh \
--set fullnameOverride=skywalking \
--set oap.envoy.als.enabled=true
```
You can use `kubectl -n istio-system logs -l app=skywalking | grep "K8sALSServiceMeshHTTPAnalysis"` to ensure OAP ALS `k8s-mesh` analyzer has been activated.
You can use `kubectl -n istio-system logs -l app=skywalking | grep "K8sALSServiceMeshHTTPAnalysis"` to ensure OAP ALS `mx-mesh` analyzer has been activated.
## SkyWalking ALS Analyzers
......
# Sending Envoy Metrics to SkyWalking OAP Server Example
This is an example of sending [Envoy Stats](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/observability/statistics#arch-overview-statistics) to SkyWalking OAP server
through [Metric Service](https://www.envoyproxy.io/docs/envoy/latest/api-v2/config/metrics/v2/metrics_service.proto).
through [Metric Service](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/config/metrics/v2/metrics_service.proto).
## Running the example
......
......@@ -6,15 +6,15 @@ SkyWalking has a built-in receiver that implements this protocol so that you can
As an APM system, SkyWalking does not only receive and store the metrics emitted by Envoy, it also analyzes the topology of services and service instances.
**Attention:** There are two versions of Envoy metrics service protocol up to date,
[v2](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) and
[v3](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/metrics/v3/metrics_service.proto), SkyWalking (8.3.0+) supports both of them.
[v2](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) and
[v3](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v3/config/metrics/v3/metrics_service.proto), SkyWalking (8.3.0+) supports both of them.
## Configure Envoy to send metrics to SkyWalking without Istio
Envoy can be used with / without Istio's control. This section introduces how to configure the standalone Envoy to send the metrics to SkyWalking.
In order to let Envoy send metrics to SkyWalking, we need to feed Envoy with a configuration which contains `stats_sinks` that includes `envoy.metrics_service`.
This `envoy.metrics_service` should be configured as a [`config.grpc_service`](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) entry.
This `envoy.metrics_service` should be configured as a [`config.grpc_service`](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) entry.
The interesting parts of the config is shown in the config below:
......
......@@ -327,6 +327,7 @@ envoy-metric:
default:
acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true}
alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""}
alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""}
# `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata,
# the available variables are `pod`, `service`, f.e., you can use `${service.metadata.name}-${pod.metadata.labels.version}`
# to append the version number to the service name.
......
......@@ -359,6 +359,9 @@ seata:
MyBatis:
id: 109
languages: Java
tcp:
id: 110
languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
......
......@@ -16,6 +16,9 @@
*
*/
// For services using protocols HTTP 1/2, gRPC, RPC, etc., the cpm metrics means "calls per minute",
// for services that are built on top of TCP, the cpm means "packages per minute".
// All scope metrics
all_percentile = from(All.latency).percentile(10); // Multiple values including p50, p75, p90, p95, p99
all_heatmap = from(All.latency).histogram(100, 20);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// TCP services' metrics
service_throughput_received = from(Service.tcpInfo.receivedBytes).filter(type == RequestType.TCP).longAvg();
service_throughput_sent = from(Service.tcpInfo.sentBytes).filter(type == RequestType.TCP).longAvg();
service_relation_client_received = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg();
service_relation_client_sent = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg();
service_relation_server_received = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg();
service_relation_server_sent = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg();
service_instance_throughput_received = from(ServiceInstance.tcpInfo.receivedBytes).filter(type == RequestType.TCP).longAvg();
service_instance_throughput_sent = from(ServiceInstance.tcpInfo.sentBytes).filter(type == RequestType.TCP).longAvg();
service_instance_relation_client_received = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg();
service_instance_relation_client_sent = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg();
service_instance_relation_server_received = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg();
service_instance_relation_server_sent = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg();
......@@ -45,7 +45,8 @@ templates:
"queryMetricType": "sortMetrics",
"chartType": "ChartSlow",
"parentService": false,
"unit": "CPM - calls per minute"
"unit": "CPM / PPM",
"tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)"
},
{
"width": 3,
......@@ -168,7 +169,8 @@ templates:
"metricName": "service_cpm",
"queryMetricType": "readMetricsValue",
"chartType": "ChartNum",
"unit": "CPM - calls per minute"
"unit": "CPM / PPM",
"tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)"
},
{
"width": 3,
......@@ -221,10 +223,24 @@ templates:
"metricName": "service_cpm",
"queryMetricType": "readMetricsValues",
"chartType": "ChartLine",
"unit": "CPM - calls per minute"
"unit": "CPM / PPM",
"tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)"
},
{
"width": "4",
"width": 3,
"title": "Service Throughput",
"height": "200",
"entityType": "Service",
"independentSelector": false,
"metricType": "REGULAR_VALUE",
"metricName": "service_throughput_received,service_throughput_sent",
"queryMetricType": "readMetricsValues",
"chartType": "ChartLine",
"unit": "Bytes",
"tips": "This metrics is only avaible for TCP services"
},
{
"width": "3",
"title": "Service Instances Load",
"height": "280",
"entityType": "ServiceInstance",
......@@ -234,10 +250,11 @@ templates:
"queryMetricType": "sortMetrics",
"chartType": "ChartSlow",
"parentService": true,
"unit": "CPM - calls per minute"
"unit": "CPM / PPM",
"tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)"
},
{
"width": "4",
"width": "3",
"title": "Slow Service Instance",
"height": "280",
"entityType": "ServiceInstance",
......@@ -250,7 +267,7 @@ templates:
"unit": "ms"
},
{
"width": "4",
"width": "3",
"title": "Service Instance Successful Rate",
"height": "280",
"entityType": "ServiceInstance",
......@@ -271,21 +288,34 @@ templates:
"name": "Instance",
"children": [
{
"width": "4",
"width": "3",
"title": "Service Instance Load",
"height": "150",
"height": "200",
"entityType": "ServiceInstance",
"independentSelector": false,
"metricType": "REGULAR_VALUE",
"metricName": "service_instance_cpm",
"queryMetricType": "readMetricsValues",
"chartType": "ChartLine",
"unit": "CPM - calls per minute"
"unit": "CPM / PPM",
"tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)"
},
{
"width": "4",
"width": 3,
"title": "Service Instance Throughput",
"height": "200",
"entityType": "ServiceInstance",
"independentSelector": false,
"metricType": "REGULAR_VALUE",
"metricName": "service_instance_throughput_received,service_instance_throughput_sent",
"queryMetricType": "readMetricsValues",
"chartType": "ChartLine",
"unit": "Bytes"
},
{
"width": "3",
"title": "Service Instance Successful Rate",
"height": "150",
"height": "200",
"entityType": "ServiceInstance",
"independentSelector": false,
"metricType": "REGULAR_VALUE",
......@@ -297,9 +327,9 @@ templates:
"aggregationNum": "100"
},
{
"width": "4",
"width": "3",
"title": "Service Instance Latency",
"height": "150",
"height": "200",
"entityType": "ServiceInstance",
"independentSelector": false,
"metricType": "REGULAR_VALUE",
......@@ -434,7 +464,8 @@ templates:
"queryMetricType": "sortMetrics",
"chartType": "ChartSlow",
"parentService": true,
"unit": "CPM - calls per minute"
"unit": "CPM / PPM",
"tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)"
},
{
"width": "4",
......@@ -525,4 +556,4 @@ templates:
# False means providing a basic template, user needs to add it manually.
activated: true
# True means wouldn't show up on the dashboard. Only keeps the definition in the storage.
disabled: false
\ No newline at end of file
disabled: false
......@@ -33,11 +33,17 @@ import static java.util.Objects.requireNonNull;
public abstract class OALDefine {
protected OALDefine(final String configFile,
final String sourcePackage) {
this(configFile, sourcePackage, sourcePackage);
}
protected OALDefine(final String configFile,
final String sourcePackage,
final String dispatcherPackage) {
this.configFile = requireNonNull(configFile);
this.sourcePackage = appendPoint(requireNonNull(sourcePackage));
this.dynamicMetricsClassPackage = appendPoint(sourcePackage + ".oal.rt.metrics");
this.dynamicMetricsBuilderClassPackage = appendPoint(sourcePackage + ".oal.rt.metrics.builder");
this.dynamicDispatcherClassPackage = appendPoint(sourcePackage + ".oal.rt.dispatcher");
this.dynamicDispatcherClassPackage = appendPoint(dispatcherPackage + ".oal.rt.dispatcher");
}
private final String configFile;
......
......@@ -29,5 +29,6 @@ public enum RequestType {
/**
* Logic request only.
*/
LOGIC
LOGIC,
TCP
}
......@@ -72,4 +72,7 @@ public class Service extends Source {
@Getter
@Setter
private SideCar sideCar = new SideCar();
@Getter
@Setter
private TCPInfo tcpInfo = new TCPInfo();
}
......@@ -76,6 +76,10 @@ public class ServiceInstance extends Source {
@Setter
private SideCar sideCar = new SideCar();
@Getter
@Setter
private TCPInfo tcpInfo = new TCPInfo();
@Override
public void prepare() {
serviceId = IDManager.ServiceID.buildId(serviceName, nodeType);
......
......@@ -105,6 +105,9 @@ public class ServiceInstanceRelation extends Source {
@Getter
@Setter
private SideCar sideCar = new SideCar();
@Getter
@Setter
private TCPInfo tcpInfo = new TCPInfo();
@Override
public void prepare() {
......
......@@ -99,6 +99,9 @@ public class ServiceRelation extends Source {
@Getter
@Setter
private SideCar sideCar = new SideCar();
@Getter
@Setter
private TCPInfo tcpInfo = new TCPInfo();
@Override
public void prepare() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@NoArgsConstructor
@AllArgsConstructor
public class TCPInfo {
@Getter
@Setter
private long receivedBytes;
@Getter
@Setter
private long sentBytes;
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.envoy;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
......@@ -32,6 +33,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
......@@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory;
public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
private final List<TCPAccessLogAnalyzer> envoyTCPAnalysisList;
private final CounterMetrics counter;
private final HistogramMetrics histogram;
......@@ -51,6 +54,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
public AccessLogServiceGRPCHandler(ModuleManager manager,
EnvoyMetricReceiverConfig config) throws ModuleStartException {
ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
ServiceLoader<TCPAccessLogAnalyzer> alsTcpAnalyzers = ServiceLoader.load(TCPAccessLogAnalyzer.class);
envoyHTTPAnalysisList = new ArrayList<>();
for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
......@@ -60,8 +64,17 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
}
}
}
envoyTCPAnalysisList = new ArrayList<>();
for (String analyzerName : config.getAlsTCPAnalysis()) {
for (TCPAccessLogAnalyzer tcpAnalyzer : alsTcpAnalyzers) {
if (analyzerName.equals(tcpAnalyzer.name())) {
tcpAnalyzer.init(manager, config);
envoyTCPAnalysisList.add(tcpAnalyzer);
}
}
}
LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
LOGGER.debug("envoy HTTP analysis: {}, envoy TCP analysis: {}", envoyHTTPAnalysisList, envoyTCPAnalysisList);
MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
counter = metricCreator.createCounter(
......@@ -110,11 +123,11 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
.getId(), role, logCase, message);
}
List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
switch (logCase) {
case HTTP_LOGS:
StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
List<ServiceMeshMetric.Builder> result = new ArrayList<>();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
......@@ -123,10 +136,22 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
sourceResult.addAll(result);
}
sourceDispatcherCounter.inc(sourceResult.size());
sourceResult.forEach(TelemetryDataDispatcher::process);
break;
case TCP_LOGS:
StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs();
for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
List<ServiceMeshMetric.Builder> result = new ArrayList<>();
for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) {
result = analyzer.analysis(result, identifier, tcpLog, role);
}
sourceResult.addAll(result);
}
break;
}
sourceDispatcherCounter.inc(sourceResult.size());
sourceResult.forEach(TelemetryDataDispatcher::process);
} finally {
timer.finish();
}
......
......@@ -33,6 +33,7 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
@Getter
private boolean acceptMetricsService = false;
private String alsHTTPAnalysis;
private String alsTCPAnalysis; // TODO: add to doc
@Getter
private String k8sServiceNameRule;
......@@ -45,6 +46,13 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig {
return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
}
public List<String> getAlsTCPAnalysis() {
if (Strings.isNullOrEmpty(alsTCPAnalysis)) {
return Collections.emptyList();
}
return Arrays.stream(alsTCPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList());
}
public List<Rule> rules() throws ModuleStartException {
return Rules.loadRules("envoy-metrics-rules", Collections.singletonList("envoy"));
}
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.receiver.envoy;
import org.apache.skywalking.aop.server.receiver.mesh.MeshReceiverModule;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
......@@ -65,6 +66,13 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
if (!config.getAlsTCPAnalysis().isEmpty()) {
getManager().find(CoreModule.NAME)
.provider()
.getService(OALEngineLoaderService.class)
.load(TCPOALDefine.INSTANCE);
}
GRPCHandlerRegister service = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.envoy;
import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
/**
* OAL rules to calculate TCP-specific metrics.
*/
public class TCPOALDefine extends OALDefine {
public static final TCPOALDefine INSTANCE = new TCPOALDefine();
private TCPOALDefine() {
super(
"oal/tcp.oal",
"org.apache.skywalking.oap.server.core.source",
"org.apache.skywalking.oap.server.core.source.tcp"
);
}
}
......@@ -19,38 +19,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.List;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
/**
* Analysis source metrics from ALS
*/
public interface ALSHTTPAnalysis {
String name();
void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
/**
* The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one.
*
* To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
*
* @param result of the previous analyzer.
* @param identifier of the Envoy node where the logs are emitted.
* @param entry the log entry.
* @param role the role of the Envoy node where the logs are emitted.
* @return the analysis results.
*/
List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
);
Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
public interface ALSHTTPAnalysis extends AccessLogAnalyzer<HTTPAccessLogEntry> {
}
......@@ -18,33 +18,13 @@
package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
@Slf4j
public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
@Override
public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
if (alsIdentifier == null) {
return defaultRole;
}
if (!alsIdentifier.hasNode()) {
return defaultRole;
}
final Node node = alsIdentifier.getNode();
final String id = node.getId();
if (id.startsWith("router~")) {
return Role.PROXY;
} else if (id.startsWith("sidecar~")) {
return Role.SIDECAR;
}
return defaultRole;
}
/**
* Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.List;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
public interface AccessLogAnalyzer<E> {
String name();
void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
/**
* The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one.
*
* To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty.
*
* @param result of the previous analyzer.
* @param identifier of the Envoy node where the logs are emitted.
* @param entry the log entry.
* @param role the role of the Envoy node where the logs are emitted.
* @return the analysis results.
*/
List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final E entry,
final Role role
);
default Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role defaultRole) {
if (alsIdentifier == null) {
return defaultRole;
}
if (!alsIdentifier.hasNode()) {
return defaultRole;
}
final Node node = alsIdentifier.getNode();
final String id = node.getId();
if (id.startsWith("router~")) {
return Role.PROXY;
} else if (id.startsWith("sidecar~")) {
return Role.SIDECAR;
}
return defaultRole;
}
}
......@@ -135,15 +135,15 @@ public class LogEntry2MetricsAdapter {
return method + ":" + request.getPath();
}
protected static long formatAsLong(final Timestamp timestamp) {
public static long formatAsLong(final Timestamp timestamp) {
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
}
protected static long formatAsLong(final Duration duration) {
public static long formatAsLong(final Duration duration) {
return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
}
protected static Protocol requestProtocol(final HTTPRequestProperties request) {
public static Protocol requestProtocol(final HTTPRequestProperties request) {
if (request == null) {
return Protocol.HTTP;
}
......@@ -154,7 +154,7 @@ public class LogEntry2MetricsAdapter {
return Protocol.gRPC;
}
protected static String parseTLS(final TLSProperties properties) {
public static String parseTLS(final TLSProperties properties) {
if (properties == null) {
return NON_TLS;
}
......@@ -175,7 +175,7 @@ public class LogEntry2MetricsAdapter {
* @param responseFlags in the ALS v2
* @return empty string if no internal error code, or literal string representing the code.
*/
protected static String parseInternalErrorCode(final ResponseFlags responseFlags) {
public static String parseInternalErrorCode(final ResponseFlags responseFlags) {
if (responseFlags != null) {
if (responseFlags.getFailedLocalHealthcheck()) {
return "failed_local_healthcheck";
......
......@@ -214,17 +214,13 @@ public class K8SServiceRegistry {
}
protected void addPod(final V1Pod pod) {
ofNullable(pod.getStatus()).ifPresent(
status -> ipPodMap.put(status.getPodIP(), pod)
);
ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(podIP -> ipPodMap.put(podIP, pod));
recompose();
}
protected void removePod(final V1Pod pod) {
ofNullable(pod.getStatus()).ifPresent(
status -> ipPodMap.remove(status.getPodIP())
);
ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(ipPodMap::remove);
}
protected void addEndpoints(final V1Endpoints endpoints) {
......@@ -239,7 +235,7 @@ public class K8SServiceRegistry {
ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
address -> ipServiceMap.put(address.getIp(), namespace + ":" + name)
address -> ofNullable(address.getIp()).ifPresent(ip -> ipServiceMap.put(ip, namespace + ":" + name))
))
));
......@@ -249,7 +245,7 @@ public class K8SServiceRegistry {
protected void removeEndpoints(final V1Endpoints endpoints) {
ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
address -> ipServiceMap.remove(address.getIp())
address -> ofNullable(address.getIp()).ifPresent(ipServiceMap::remove)
))
));
}
......@@ -264,7 +260,7 @@ public class K8SServiceRegistry {
.collect(Collectors.toList());
}
protected ServiceMetaInfo findService(final String ip) {
public ServiceMetaInfo findService(final String ip) {
final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip);
if (isNull(service)) {
log.debug("Unknown ip {}, ip -> service is null", ip);
......@@ -311,7 +307,7 @@ public class K8SServiceRegistry {
});
}
protected boolean isEmpty() {
public boolean isEmpty() {
return ipServiceMetaInfoMap.isEmpty();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
@Slf4j
public abstract class AbstractTCPAccessLogAnalyzer implements TCPAccessLogAnalyzer {
/**
* Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
*
* @param entry the access log entry that is to be adapted from.
* @param sourceService the source service.
* @param targetService the target/destination service.
* @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
*/
protected TCPLogEntry2MetricsAdapter newAdapter(
final TCPAccessLogEntry entry,
final ServiceMetaInfo sourceService,
final ServiceMetaInfo targetService) {
return new TCPLogEntry2MetricsAdapter(entry, sourceService, targetService);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
public interface TCPAccessLogAnalyzer extends AccessLogAnalyzer<TCPAccessLogEntry> {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als.tcp;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.ConnectionProperties;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.apm.network.servicemesh.v3.TCPInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import static org.apache.skywalking.apm.util.StringUtil.isBlank;
import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.formatAsLong;
import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseInternalErrorCode;
import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseTLS;
/**
* Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders.
*/
@RequiredArgsConstructor
public class TCPLogEntry2MetricsAdapter {
/**
* The access log entry that is to be adapted into metrics builders.
*/
protected final TCPAccessLogEntry entry;
protected final ServiceMetaInfo sourceService;
protected final ServiceMetaInfo targetService;
/**
* Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}.
*
* @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
*/
public ServiceMeshMetric.Builder adaptToDownstreamMetrics() {
final AccessLogCommon properties = entry.getCommonProperties();
final long startTime = formatAsLong(properties.getStartTime());
final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
return adaptCommonPart()
.setStartTime(startTime)
.setEndTime(startTime + duration)
.setLatency((int) Math.max(1L, duration))
.setDetectPoint(DetectPoint.server);
}
/**
* Adapt the {@code entry} into a upstream metrics {@link ServiceMeshMetric.Builder}.
*
* @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
*/
public ServiceMeshMetric.Builder adaptToUpstreamMetrics() {
final AccessLogCommon properties = entry.getCommonProperties();
final long startTime = formatAsLong(properties.getStartTime());
final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte());
final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte());
return adaptCommonPart()
.setStartTime(outboundStartTime)
.setEndTime(outboundEndTime)
.setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime))
.setDetectPoint(DetectPoint.client);
}
protected ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties();
final ConnectionProperties connectionProperties = entry.getConnectionProperties();
final String tlsMode = parseTLS(properties.getTlsProperties());
final String internalErrorCode = parseInternalErrorCode(properties.getResponseFlags());
final ServiceMeshMetric.Builder builder =
ServiceMeshMetric.newBuilder()
.setTlsMode(tlsMode)
.setProtocol(Protocol.TCP)
.setStatus(isBlank(internalErrorCode))
.setTcp(
TCPInfo.newBuilder()
.setReceivedBytes(connectionProperties.getReceivedBytes())
.setSentBytes(connectionProperties.getSentBytes())
)
.setInternalErrorCode(internalErrorCode);
Optional.ofNullable(sourceService)
.map(ServiceMetaInfo::getServiceName)
.ifPresent(builder::setSourceServiceName);
Optional.ofNullable(sourceService)
.map(ServiceMetaInfo::getServiceInstanceName)
.ifPresent(builder::setSourceServiceInstance);
Optional.ofNullable(targetService)
.map(ServiceMetaInfo::getServiceName)
.ifPresent(builder::setDestServiceName);
Optional.ofNullable(targetService)
.map(ServiceMetaInfo::getServiceInstanceName)
.ifPresent(builder::setDestServiceInstance);
return builder;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SServiceRegistry;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
/**
* Analysis log based on ingress and mesh scenarios.
*/
@Slf4j
public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
protected K8SServiceRegistry serviceRegistry;
@Override
public String name() {
return "k8s-mesh";
}
@Override
@SneakyThrows
public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) {
serviceRegistry = new K8SServiceRegistry(config);
serviceRegistry.start();
}
@Override
public List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry,
final Role role
) {
if (isNotEmpty(result)) {
return result;
}
if (serviceRegistry.isEmpty()) {
return Collections.emptyList();
}
switch (role) {
case PROXY:
return analyzeProxy(entry);
case SIDECAR:
return analyzeSideCar(entry);
}
return Collections.emptyList();
}
protected List<ServiceMeshMetric.Builder> analyzeSideCar(final TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
return Collections.emptyList();
}
final String cluster = properties.getUpstreamCluster();
if (cluster == null) {
return Collections.emptyList();
}
final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
final Address downstreamRemoteAddress =
properties.hasDownstreamDirectRemoteAddress()
? properties.getDownstreamDirectRemoteAddress()
: properties.getDownstreamRemoteAddress();
final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
if (cluster.startsWith("inbound|")) {
// Server side
final ServiceMeshMetric.Builder metrics;
if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
// Ingress -> sidecar(server side)
// Mesh telemetry without source, the relation would be generated.
metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics();
log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics);
} else {
// sidecar -> sidecar(server side)
metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics();
log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics);
}
sources.add(metrics);
} else if (cluster.startsWith("outbound|")) {
// sidecar(client side) -> sidecar
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics();
log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
sources.add(metric);
}
return sources;
}
protected List<ServiceMeshMetric.Builder> analyzeProxy(final TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
return Collections.emptyList();
}
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ?
properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) {
return Collections.emptyList();
}
final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress());
final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress());
final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics();
log.debug("Transformed ingress inbound mesh metric {}", metric);
result.add(metric);
final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress());
final ServiceMeshMetric.Builder outboundMetric =
newAdapter(entry, ingress, targetService)
.adaptToUpstreamMetrics()
// Can't parse it from tls properties, leave it to Server side.
.setTlsMode(NON_TLS);
log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
result.add(outboundMetric);
return result;
}
/**
* @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
*/
protected ServiceMetaInfo find(String ip) {
return serviceRegistry.findService(ip);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx;
import com.google.protobuf.Any;
import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.mx.FieldsHelper;
import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer;
import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN;
import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.DOWNSTREAM_KEY;
import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.UPSTREAM_KEY;
@Slf4j
public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyzer {
protected String fieldMappingFile = "metadata-service-mapping.yaml";
protected EnvoyMetricReceiverConfig config;
@Override
public String name() {
return "mx-mesh";
}
@Override
public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
this.config = config;
try {
FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz());
} catch (final Exception e) {
throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e);
}
}
@Override
public List<ServiceMeshMetric.Builder> analysis(
final List<ServiceMeshMetric.Builder> result,
final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry,
final Role role
) {
if (isNotEmpty(result)) {
return result;
}
if (!entry.hasCommonProperties()) {
return Collections.emptyList();
}
final AccessLogCommon properties = entry.getCommonProperties();
final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
if (stateMap.isEmpty()) {
return Collections.emptyList();
}
final ServiceMetaInfo currSvc;
try {
currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
return Collections.emptyList();
}
final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
return;
}
final ServiceMetaInfo svc;
try {
svc = adaptToServiceMetaInfo(value);
} catch (Exception e) {
log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray()));
return;
}
final ServiceMeshMetric.Builder metrics;
switch (key) {
case UPSTREAM_KEY:
metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS);
if (log.isDebugEnabled()) {
log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
}
result.add(metrics);
break;
case DOWNSTREAM_KEY:
metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics();
if (log.isDebugEnabled()) {
log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
}
result.add(metrics);
downstreamExists.set(true);
break;
}
});
if (role.equals(Role.PROXY) && !downstreamExists.get()) {
final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics();
if (log.isDebugEnabled()) {
log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric));
}
result.add(metric);
}
return result;
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
return new ServiceMetaInfoAdapter(value);
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception {
return config.serviceMetaInfoFactory().fromStruct(identifier.getNode().getMetadata());
}
}
......@@ -18,11 +18,16 @@
package org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters;
import com.google.common.base.Splitter;
import io.prometheus.client.Metrics;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import static java.util.stream.Collectors.toMap;
......@@ -42,6 +47,17 @@ public class ProtoMetricFamily2MetricsAdapter {
.timestamp(adaptTimestamp(it))
.labels(adaptLabels(it))
.build());
case HISTOGRAM:
return metricFamily.getMetricList()
.stream()
.map(it -> Histogram.builder()
.name(adaptMetricsName(it))
.labels(adaptLabels(it))
.sampleCount(it.getHistogram().getSampleCount())
.sampleSum(it.getHistogram().getSampleSum())
.buckets(buildBuckets(it.getHistogram().getBucketList()))
.build()
);
default:
return Stream.of();
}
......@@ -49,6 +65,13 @@ public class ProtoMetricFamily2MetricsAdapter {
@SuppressWarnings("unused")
public String adaptMetricsName(final Metrics.Metric metric) {
if (metricFamily.getName().startsWith("cluster.inbound|")) {
return Splitter.on(".")
.splitToList(metricFamily.getName())
.stream()
.filter(it -> !it.startsWith("inbound|"))
.collect(Collectors.joining("_"));
}
return metricFamily.getName();
}
......@@ -81,4 +104,12 @@ public class ProtoMetricFamily2MetricsAdapter {
return timestamp;
}
private static Map<Double, Long> buildBuckets(List<Metrics.Bucket> buckets) {
Map<Double, Long> result = new HashMap<>();
for (final Metrics.Bucket bucket : buckets) {
result.put(bucket.getUpperBound(), bucket.getCumulativeCount());
}
return result;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
......@@ -12,11 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
instances:
- key: not null
label: not null
- key: not null
label: not null
- key: not null
label: not null
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer
......@@ -148,6 +148,8 @@ public class TelemetryDataDispatcher {
service.setResponseCode(metrics.getResponseCode());
service.setType(protocol2Type(metrics.getProtocol()));
service.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode());
service.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes());
service.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes());
SOURCE_RECEIVER.receive(service);
}
......@@ -170,6 +172,8 @@ public class TelemetryDataDispatcher {
serviceRelation.setComponentId(protocol2Component(metrics.getProtocol()));
serviceRelation.setTlsMode(metrics.getTlsMode());
serviceRelation.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode());
serviceRelation.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes());
serviceRelation.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes());
SOURCE_RECEIVER.receive(serviceRelation);
}
......@@ -186,6 +190,8 @@ public class TelemetryDataDispatcher {
serviceInstance.setResponseCode(metrics.getResponseCode());
serviceInstance.setType(protocol2Type(metrics.getProtocol()));
serviceInstance.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode());
serviceInstance.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes());
serviceInstance.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes());
SOURCE_RECEIVER.receive(serviceInstance);
}
......@@ -208,6 +214,8 @@ public class TelemetryDataDispatcher {
serviceRelation.setComponentId(protocol2Component(metrics.getProtocol()));
serviceRelation.setTlsMode(metrics.getTlsMode());
serviceRelation.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode());
serviceRelation.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes());
serviceRelation.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes());
SOURCE_RECEIVER.receive(serviceRelation);
}
......@@ -234,6 +242,8 @@ public class TelemetryDataDispatcher {
return RequestType.gRPC;
case HTTP:
return RequestType.HTTP;
case TCP:
return RequestType.TCP;
case UNRECOGNIZED:
default:
return RequestType.RPC;
......@@ -248,6 +258,9 @@ public class TelemetryDataDispatcher {
case HTTP:
// HTTP in component-libraries.yml
return 49;
case TCP:
// TCP in component-libraries.yml
return 110;
case UNRECOGNIZED:
default:
// RPC in component-libraries.yml
......
......@@ -49,7 +49,9 @@ public class SQLExecutor implements InsertRequest, UpdateRequest {
preparedStatement.setObject(i + 1, param.get(i));
}
LOGGER.debug("execute sql in batch: {}", sql);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
}
preparedStatement.execute();
}
}
......@@ -154,9 +154,6 @@ public class ALSE2E extends SkyWalkingTestAdapter {
LOGGER.info("instances: {}", instances);
String file = "expected/als/instances.yml";
if (service.getLabel().equals("e2e::reviews")) {
file = "expected/als/instances-reviews.yml";
}
load(file).as(InstancesMatcher.class).verify(instances);
return instances;
......
......@@ -24,3 +24,5 @@ services:
label: e2e::details
- key: not null
label: e2e::istio-ingressgateway
- key: not null
label: e2e::mongodb
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册