From 3b15f8d132284c6eec6bb670341910b640cbc65d Mon Sep 17 00:00:00 2001 From: Zhenxu Ke Date: Fri, 7 May 2021 07:50:03 +0800 Subject: [PATCH] feature: Envoy access log receiver supports TCP logs (#6727) --- .github/workflows/codeql.yaml | 2 +- .github/workflows/e2e.istio.yaml | 9 +- CHANGES.md | 1 + apm-protocol/apm-network/src/main/proto | 2 +- .../concepts-and-designs/scope-definitions.md | 11 +- .../setup/backend/configuration-vocabulary.md | 3 +- docs/en/setup/envoy/als_setting.md | 13 +- .../en/setup/envoy/examples/metrics/README.md | 2 +- .../en/setup/envoy/metrics_service_setting.md | 6 +- .../src/main/resources/application.yml | 1 + .../main/resources/component-libraries.yml | 3 + .../src/main/resources/oal/core.oal | 3 + .../src/main/resources/oal/tcp.oal | 32 ++++ .../ui-initialized-templates/apm.yml | 63 ++++-- .../oap/server/core/oal/rt/OALDefine.java | 8 +- .../oap/server/core/source/RequestType.java | 3 +- .../oap/server/core/source/Service.java | 3 + .../server/core/source/ServiceInstance.java | 4 + .../core/source/ServiceInstanceRelation.java | 3 + .../server/core/source/ServiceRelation.java | 3 + .../oap/server/core/source/TCPInfo.java | 36 ++++ .../envoy/AccessLogServiceGRPCHandler.java | 33 +++- .../envoy/EnvoyMetricReceiverConfig.java | 8 + .../envoy/EnvoyMetricReceiverProvider.java | 8 + .../server/receiver/envoy/TCPOALDefine.java | 35 ++++ .../receiver/envoy/als/ALSHTTPAnalysis.java | 31 +-- .../envoy/als/AbstractALSAnalyzer.java | 20 -- .../receiver/envoy/als/AccessLogAnalyzer.java | 68 +++++++ .../envoy/als/LogEntry2MetricsAdapter.java | 10 +- .../envoy/als/k8s/K8SServiceRegistry.java | 16 +- .../als/tcp/AbstractTCPAccessLogAnalyzer.java | 45 +++++ .../envoy/als/tcp/TCPAccessLogAnalyzer.java | 25 +++ .../als/tcp/TCPLogEntry2MetricsAdapter.java | 122 ++++++++++++ .../tcp/k8s/K8sALSServiceMeshTCPAnalysis.java | 181 ++++++++++++++++++ .../mx/MetaExchangeTCPAccessLogAnalyzer.java | 144 ++++++++++++++ .../ProtoMetricFamily2MetricsAdapter.java | 31 +++ ...eceiver.envoy.als.tcp.TCPAccessLogAnalyzer | 12 +- .../mesh/TelemetryDataDispatcher.java | 13 ++ .../storage/plugin/jdbc/SQLExecutor.java | 4 +- .../apache/skywalking/e2e/mesh/ALSE2E.java | 3 - .../test/resources/expected/als/services.yml | 2 + 41 files changed, 910 insertions(+), 112 deletions(-) create mode 100644 oap-server/server-bootstrap/src/main/resources/oal/tcp.oal create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TCPInfo.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/TCPOALDefine.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java rename test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml => oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer (80%) diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml index 769db83def..9378a9cda5 100644 --- a/.github/workflows/codeql.yaml +++ b/.github/workflows/codeql.yaml @@ -45,7 +45,7 @@ jobs: with: languages: ${{ matrix.language }} - - run: ./mvnw -Dmaven.test.skip=true clean install + - run: ./mvnw -Dmaven.test.skip=true clean install || ./mvnw -Dmaven.test.skip=true clean install - name: Perform CodeQL Analysis uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml index 252eda82d0..c0ee36fbbf 100644 --- a/.github/workflows/e2e.istio.yaml +++ b/.github/workflows/e2e.istio.yaml @@ -100,6 +100,7 @@ jobs: --set elasticsearch.minimumMasterNodes=1 \ --set elasticsearch.imageTag=7.5.1 \ --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=${{ matrix.analyzer }} \ + --set oap.env.SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=${{ matrix.analyzer }} \ --set oap.env.K8S_SERVICE_NAME_RULE='e2e::${service.metadata.name}' \ --set oap.envoy.als.enabled=true \ --set oap.replicas=1 \ @@ -117,7 +118,13 @@ jobs: - name: Deploy demo services if: env.SKIP_CI != 'true' - run: bash ${SCRIPTS_DIR}/demo.sh + run: | + bash ${SCRIPTS_DIR}/demo.sh + # Enable TCP services + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/platform/kube/bookinfo-ratings-v2.yaml + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/platform/kube/bookinfo-db.yaml + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/networking/destination-rule-all.yaml + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/networking/virtual-service-ratings-db.yaml - name: Cluster Info if: ${{ failure() }} diff --git a/CHANGES.md b/CHANGES.md index c6bc4d69bd..037e945efe 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -34,6 +34,7 @@ Release Notes. * CVE: fix Jetty vulnerability. https://nvd.nist.gov/vuln/detail/CVE-2019-17638 * Fix: MAL function would miss samples name after creating new samples. * perf: use iterator.remove() to remove modulesWithoutProvider +* Support analyzing Envoy TCP access logs. #### UI * Add logo for kong plugin. diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index 9a689b0188..7da226cfce 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit 9a689b0188cbdc7bd8d6ddd99b4ad5283e82fe88 +Subproject commit 7da226cfced7fa4eb91c6528e8c30827288531a0 diff --git a/docs/en/concepts-and-designs/scope-definitions.md b/docs/en/concepts-and-designs/scope-definitions.md index ede1252af3..cc7518ff8e 100644 --- a/docs/en/concepts-and-designs/scope-definitions.md +++ b/docs/en/concepts-and-designs/scope-definitions.md @@ -30,6 +30,8 @@ This calculates the metrics data from each request of the service. | type | The type of each request. Such as: Database, HTTP, RPC, gRPC. | | enum | | tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List` | | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `ServiceInstance` @@ -47,6 +49,8 @@ This calculates the metrics data from each request of the service instance. | type | The type of each request, such as Database, HTTP, RPC, or gRPC. | | enum | | tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List` | | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | #### Secondary scopes of `ServiceInstance` @@ -120,6 +124,8 @@ This calculates the metrics data from each request of the endpoint in the servic | type | The type of each request, such as Database, HTTP, RPC, or gRPC. | | enum | | tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List` | | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `ServiceRelation` @@ -142,7 +148,8 @@ This calculates the metrics data from each request between services. | detectPoint | Where the relation is detected. The value may be client, server, or proxy. | yes | enum| | tlsMode | The TLS mode between source and destination services, such as `service_relation_mtls_cpm = from(ServiceRelation.*).filter(tlsMode == "mTLS").cpm()` || string| | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| - +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `ServiceInstanceRelation` @@ -165,6 +172,8 @@ This calculates the metrics data from each request between service instances. | detectPoint | Where the relation is detected. The value may be client, server, or proxy. | yes | enum| | tlsMode | The TLS mode between source and destination service instances, such as `service_instance_relation_mtls_cpm = from(ServiceInstanceRelation.*).filter(tlsMode == "mTLS").cpm()` || string| | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `EndpointRelation` diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index 5d8746ec34..67100d39f8 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -191,7 +191,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | service-mesh| default| Read [receiver doc](backend-receivers.md) for more details | - | - | | envoy-metric| default| Read [receiver doc](backend-receivers.md) for more details | - | - | | - | - | acceptMetricsService | Open Envoy Metrics Service analysis | SW_ENVOY_METRIC_SERVICE | true| -| - | - | alsHTTPAnalysis | Open Envoy Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS | - | +| - | - | alsHTTPAnalysis | Open Envoy HTTP Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS | - | +| - | - | alsTCPAnalysis | Open Envoy TCP Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_TCP_ANALYSIS | - | | - | - | k8sServiceNameRule | `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata, the available variables are `pod`, `service`, e.g., you can use `${service.metadata.name}-${pod.metadata.labels.version}` to append the version number to the service name. Be careful, when using environment variables to pass this configuration, use single quotes(`''`) to avoid it being evaluated by the shell. | - | | receiver-otel | default | Read [receiver doc](backend-receivers.md) for more details | - | - | | - | - | enabledHandlers|Enabled handlers for otel| SW_OTEL_RECEIVER_ENABLED_HANDLERS | - | diff --git a/docs/en/setup/envoy/als_setting.md b/docs/en/setup/envoy/als_setting.md index bc22a7abd4..c23298bb28 100644 --- a/docs/en/setup/envoy/als_setting.md +++ b/docs/en/setup/envoy/als_setting.md @@ -1,6 +1,6 @@ # Observe Service Mesh through ALS -[Envoy Access Log Service (ALS)](https://www.envoyproxy.io/docs/envoy/latest/api-v2/service/accesslog/v2/als.proto) provides +[Envoy Access Log Service (ALS)](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/service/accesslog/v2/als.proto) provides full logs about RPC routed, including HTTP and TCP. ## Background @@ -29,8 +29,9 @@ On Istio version 1.6.0+, if Istio is installed with [`demo` profile](https://ist - Activate SkyWalking [Envoy Receiver](../backend/backend-receivers.md). This is activated by default. -- Choose an ALS analyzer. There are two available analyzers, `k8s-mesh` and `mx-mesh`. - Set the system environment variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh` +- Choose an ALS analyzer. There are two available analyzers, `k8s-mesh` and `mx-mesh` for both HTTP access logs and TCP access logs. + Set the system environment variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** and **SW_ENVOY_METRIC_ALS_TCP_ANALYSIS** + such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=mx-mesh`, `SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=mx-mesh` or in the `application.yaml` to activate the analyzer. For more about the analyzers, see [SkyWalking ALS Analyzers](#skywalking-als-analyzers) ```yaml @@ -39,6 +40,7 @@ On Istio version 1.6.0+, if Istio is installed with [`demo` profile](https://ist default: acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true} alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""} # Setting the system env variable would override this. + alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""} ``` To use multiple analyzers as a fallbackļ¼Œplease use `,` to concatenate. @@ -61,12 +63,13 @@ helm repo add elastic https://helm.elastic.co helm dep up skywalking helm install 8.1.0 skywalking -n istio-system \ - --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \ + --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=mx-mesh \ + --set oap.env.SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=mx-mesh \ --set fullnameOverride=skywalking \ --set oap.envoy.als.enabled=true ``` -You can use `kubectl -n istio-system logs -l app=skywalking | grep "K8sALSServiceMeshHTTPAnalysis"` to ensure OAP ALS `k8s-mesh` analyzer has been activated. +You can use `kubectl -n istio-system logs -l app=skywalking | grep "K8sALSServiceMeshHTTPAnalysis"` to ensure OAP ALS `mx-mesh` analyzer has been activated. ## SkyWalking ALS Analyzers diff --git a/docs/en/setup/envoy/examples/metrics/README.md b/docs/en/setup/envoy/examples/metrics/README.md index 0f24657254..dddd2c57a1 100644 --- a/docs/en/setup/envoy/examples/metrics/README.md +++ b/docs/en/setup/envoy/examples/metrics/README.md @@ -1,7 +1,7 @@ # Sending Envoy Metrics to SkyWalking OAP Server Example This is an example of sending [Envoy Stats](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/observability/statistics#arch-overview-statistics) to SkyWalking OAP server -through [Metric Service](https://www.envoyproxy.io/docs/envoy/latest/api-v2/config/metrics/v2/metrics_service.proto). +through [Metric Service](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/config/metrics/v2/metrics_service.proto). ## Running the example diff --git a/docs/en/setup/envoy/metrics_service_setting.md b/docs/en/setup/envoy/metrics_service_setting.md index bc3babcf57..8750de9844 100644 --- a/docs/en/setup/envoy/metrics_service_setting.md +++ b/docs/en/setup/envoy/metrics_service_setting.md @@ -6,15 +6,15 @@ SkyWalking has a built-in receiver that implements this protocol so that you can As an APM system, SkyWalking does not only receive and store the metrics emitted by Envoy, it also analyzes the topology of services and service instances. **Attention:** There are two versions of Envoy metrics service protocol up to date, -[v2](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) and -[v3](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/metrics/v3/metrics_service.proto), SkyWalking (8.3.0+) supports both of them. +[v2](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) and +[v3](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v3/config/metrics/v3/metrics_service.proto), SkyWalking (8.3.0+) supports both of them. ## Configure Envoy to send metrics to SkyWalking without Istio Envoy can be used with / without Istio's control. This section introduces how to configure the standalone Envoy to send the metrics to SkyWalking. In order to let Envoy send metrics to SkyWalking, we need to feed Envoy with a configuration which contains `stats_sinks` that includes `envoy.metrics_service`. -This `envoy.metrics_service` should be configured as a [`config.grpc_service`](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) entry. +This `envoy.metrics_service` should be configured as a [`config.grpc_service`](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) entry. The interesting parts of the config is shown in the config below: diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 239858b00b..7e5ed87ca1 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -327,6 +327,7 @@ envoy-metric: default: acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true} alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""} + alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""} # `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata, # the available variables are `pod`, `service`, f.e., you can use `${service.metadata.name}-${pod.metadata.labels.version}` # to append the version number to the service name. diff --git a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml index ddcd3e9aed..73d8012de8 100755 --- a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml +++ b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml @@ -359,6 +359,9 @@ seata: MyBatis: id: 109 languages: Java +tcp: + id: 110 + languages: Java # .NET/.NET Core components # [3000, 4000) for C#/.NET only diff --git a/oap-server/server-bootstrap/src/main/resources/oal/core.oal b/oap-server/server-bootstrap/src/main/resources/oal/core.oal index e02934de67..16dc46d17d 100755 --- a/oap-server/server-bootstrap/src/main/resources/oal/core.oal +++ b/oap-server/server-bootstrap/src/main/resources/oal/core.oal @@ -16,6 +16,9 @@ * */ +// For services using protocols HTTP 1/2, gRPC, RPC, etc., the cpm metrics means "calls per minute", +// for services that are built on top of TCP, the cpm means "packages per minute". + // All scope metrics all_percentile = from(All.latency).percentile(10); // Multiple values including p50, p75, p90, p95, p99 all_heatmap = from(All.latency).histogram(100, 20); diff --git a/oap-server/server-bootstrap/src/main/resources/oal/tcp.oal b/oap-server/server-bootstrap/src/main/resources/oal/tcp.oal new file mode 100644 index 0000000000..006abd1e58 --- /dev/null +++ b/oap-server/server-bootstrap/src/main/resources/oal/tcp.oal @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// TCP services' metrics +service_throughput_received = from(Service.tcpInfo.receivedBytes).filter(type == RequestType.TCP).longAvg(); +service_throughput_sent = from(Service.tcpInfo.sentBytes).filter(type == RequestType.TCP).longAvg(); +service_relation_client_received = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_relation_client_sent = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_relation_server_received = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); +service_relation_server_sent = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); + +service_instance_throughput_received = from(ServiceInstance.tcpInfo.receivedBytes).filter(type == RequestType.TCP).longAvg(); +service_instance_throughput_sent = from(ServiceInstance.tcpInfo.sentBytes).filter(type == RequestType.TCP).longAvg(); +service_instance_relation_client_received = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_instance_relation_client_sent = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_instance_relation_server_received = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); +service_instance_relation_server_sent = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); diff --git a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml index fa41898233..901bd88538 100644 --- a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml +++ b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml @@ -45,7 +45,8 @@ templates: "queryMetricType": "sortMetrics", "chartType": "ChartSlow", "parentService": false, - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { "width": 3, @@ -168,7 +169,8 @@ templates: "metricName": "service_cpm", "queryMetricType": "readMetricsValue", "chartType": "ChartNum", - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { "width": 3, @@ -221,10 +223,24 @@ templates: "metricName": "service_cpm", "queryMetricType": "readMetricsValues", "chartType": "ChartLine", - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { - "width": "4", + "width": 3, + "title": "Service Throughput", + "height": "200", + "entityType": "Service", + "independentSelector": false, + "metricType": "REGULAR_VALUE", + "metricName": "service_throughput_received,service_throughput_sent", + "queryMetricType": "readMetricsValues", + "chartType": "ChartLine", + "unit": "Bytes", + "tips": "This metrics is only avaible for TCP services" + }, + { + "width": "3", "title": "Service Instances Load", "height": "280", "entityType": "ServiceInstance", @@ -234,10 +250,11 @@ templates: "queryMetricType": "sortMetrics", "chartType": "ChartSlow", "parentService": true, - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { - "width": "4", + "width": "3", "title": "Slow Service Instance", "height": "280", "entityType": "ServiceInstance", @@ -250,7 +267,7 @@ templates: "unit": "ms" }, { - "width": "4", + "width": "3", "title": "Service Instance Successful Rate", "height": "280", "entityType": "ServiceInstance", @@ -271,21 +288,34 @@ templates: "name": "Instance", "children": [ { - "width": "4", + "width": "3", "title": "Service Instance Load", - "height": "150", + "height": "200", "entityType": "ServiceInstance", "independentSelector": false, "metricType": "REGULAR_VALUE", "metricName": "service_instance_cpm", "queryMetricType": "readMetricsValues", "chartType": "ChartLine", - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { - "width": "4", + "width": 3, + "title": "Service Instance Throughput", + "height": "200", + "entityType": "ServiceInstance", + "independentSelector": false, + "metricType": "REGULAR_VALUE", + "metricName": "service_instance_throughput_received,service_instance_throughput_sent", + "queryMetricType": "readMetricsValues", + "chartType": "ChartLine", + "unit": "Bytes" + }, + { + "width": "3", "title": "Service Instance Successful Rate", - "height": "150", + "height": "200", "entityType": "ServiceInstance", "independentSelector": false, "metricType": "REGULAR_VALUE", @@ -297,9 +327,9 @@ templates: "aggregationNum": "100" }, { - "width": "4", + "width": "3", "title": "Service Instance Latency", - "height": "150", + "height": "200", "entityType": "ServiceInstance", "independentSelector": false, "metricType": "REGULAR_VALUE", @@ -434,7 +464,8 @@ templates: "queryMetricType": "sortMetrics", "chartType": "ChartSlow", "parentService": true, - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { "width": "4", @@ -525,4 +556,4 @@ templates: # False means providing a basic template, user needs to add it manually. activated: true # True means wouldn't show up on the dashboard. Only keeps the definition in the storage. - disabled: false \ No newline at end of file + disabled: false diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java index 020dd8f4b7..b33cd0717f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java @@ -33,11 +33,17 @@ import static java.util.Objects.requireNonNull; public abstract class OALDefine { protected OALDefine(final String configFile, final String sourcePackage) { + this(configFile, sourcePackage, sourcePackage); + } + + protected OALDefine(final String configFile, + final String sourcePackage, + final String dispatcherPackage) { this.configFile = requireNonNull(configFile); this.sourcePackage = appendPoint(requireNonNull(sourcePackage)); this.dynamicMetricsClassPackage = appendPoint(sourcePackage + ".oal.rt.metrics"); this.dynamicMetricsBuilderClassPackage = appendPoint(sourcePackage + ".oal.rt.metrics.builder"); - this.dynamicDispatcherClassPackage = appendPoint(sourcePackage + ".oal.rt.dispatcher"); + this.dynamicDispatcherClassPackage = appendPoint(dispatcherPackage + ".oal.rt.dispatcher"); } private final String configFile; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java index 3298da0987..6eea8656ca 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java @@ -29,5 +29,6 @@ public enum RequestType { /** * Logic request only. */ - LOGIC + LOGIC, + TCP } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java index 3700271b3a..f140e70f8f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java @@ -72,4 +72,7 @@ public class Service extends Source { @Getter @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java index df882c18f4..a4070bde7b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java @@ -76,6 +76,10 @@ public class ServiceInstance extends Source { @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); + @Override public void prepare() { serviceId = IDManager.ServiceID.buildId(serviceName, nodeType); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java index d5f93fe492..775561a23f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java @@ -105,6 +105,9 @@ public class ServiceInstanceRelation extends Source { @Getter @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); @Override public void prepare() { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java index ca19163c52..9b13d71cc4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java @@ -99,6 +99,9 @@ public class ServiceRelation extends Source { @Getter @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); @Override public void prepare() { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TCPInfo.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TCPInfo.java new file mode 100644 index 0000000000..26539e5911 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TCPInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.source; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@NoArgsConstructor +@AllArgsConstructor +public class TCPInfo { + @Getter + @Setter + private long receivedBytes; + + @Getter + @Setter + private long sentBytes; +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java index bce63fb0ef..f3bc5ef230 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.receiver.envoy; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse; @@ -32,6 +33,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; @@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory; public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class); private final List envoyHTTPAnalysisList; + private final List envoyTCPAnalysisList; private final CounterMetrics counter; private final HistogramMetrics histogram; @@ -51,6 +54,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { ServiceLoader alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class); + ServiceLoader alsTcpAnalyzers = ServiceLoader.load(TCPAccessLogAnalyzer.class); envoyHTTPAnalysisList = new ArrayList<>(); for (String httpAnalysisName : config.getAlsHTTPAnalysis()) { for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) { @@ -60,8 +64,17 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS } } } + envoyTCPAnalysisList = new ArrayList<>(); + for (String analyzerName : config.getAlsTCPAnalysis()) { + for (TCPAccessLogAnalyzer tcpAnalyzer : alsTcpAnalyzers) { + if (analyzerName.equals(tcpAnalyzer.name())) { + tcpAnalyzer.init(manager, config); + envoyTCPAnalysisList.add(tcpAnalyzer); + } + } + } - LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList); + LOGGER.debug("envoy HTTP analysis: {}, envoy TCP analysis: {}", envoyHTTPAnalysisList, envoyTCPAnalysisList); MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); counter = metricCreator.createCounter( @@ -110,11 +123,11 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS .getId(), role, logCase, message); } + List sourceResult = new ArrayList<>(); switch (logCase) { case HTTP_LOGS: StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs(); - List sourceResult = new ArrayList<>(); for (final HTTPAccessLogEntry log : logs.getLogEntryList()) { List result = new ArrayList<>(); for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { @@ -123,10 +136,22 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS sourceResult.addAll(result); } - sourceDispatcherCounter.inc(sourceResult.size()); - sourceResult.forEach(TelemetryDataDispatcher::process); + break; + case TCP_LOGS: + StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs(); + + for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) { + List result = new ArrayList<>(); + for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) { + result = analyzer.analysis(result, identifier, tcpLog, role); + } + sourceResult.addAll(result); + } + break; } + sourceDispatcherCounter.inc(sourceResult.size()); + sourceResult.forEach(TelemetryDataDispatcher::process); } finally { timer.finish(); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java index 94f8c45838..925a820f4a 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java @@ -33,6 +33,7 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig { @Getter private boolean acceptMetricsService = false; private String alsHTTPAnalysis; + private String alsTCPAnalysis; // TODO: add to doc @Getter private String k8sServiceNameRule; @@ -45,6 +46,13 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig { return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList()); } + public List getAlsTCPAnalysis() { + if (Strings.isNullOrEmpty(alsTCPAnalysis)) { + return Collections.emptyList(); + } + return Arrays.stream(alsTCPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList()); + } + public List rules() throws ModuleStartException { return Rules.loadRules("envoy-metrics-rules", Collections.singletonList("envoy")); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java index 4098cbf626..2678f87dfd 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.receiver.envoy; import org.apache.skywalking.aop.server.receiver.mesh.MeshReceiverModule; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; @@ -65,6 +66,13 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { + if (!config.getAlsTCPAnalysis().isEmpty()) { + getManager().find(CoreModule.NAME) + .provider() + .getService(OALEngineLoaderService.class) + .load(TCPOALDefine.INSTANCE); + } + GRPCHandlerRegister service = getManager().find(SharingServerModule.NAME) .provider() .getService(GRPCHandlerRegister.class); diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/TCPOALDefine.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/TCPOALDefine.java new file mode 100644 index 0000000000..93ca89967a --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/TCPOALDefine.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.receiver.envoy; + +import org.apache.skywalking.oap.server.core.oal.rt.OALDefine; + +/** + * OAL rules to calculate TCP-specific metrics. + */ +public class TCPOALDefine extends OALDefine { + public static final TCPOALDefine INSTANCE = new TCPOALDefine(); + + private TCPOALDefine() { + super( + "oal/tcp.oal", + "org.apache.skywalking.oap.server.core.source", + "org.apache.skywalking.oap.server.core.source.tcp" + ); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java index 7058e30a7b..ce69eaa5d8 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java @@ -19,38 +19,9 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; -import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; -import java.util.List; -import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; -import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.apache.skywalking.oap.server.library.module.ModuleStartException; -import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; /** * Analysis source metrics from ALS */ -public interface ALSHTTPAnalysis { - String name(); - - void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException; - - /** - * The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one. - * - * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty. - * - * @param result of the previous analyzer. - * @param identifier of the Envoy node where the logs are emitted. - * @param entry the log entry. - * @param role the role of the Envoy node where the logs are emitted. - * @return the analysis results. - */ - List analysis( - final List result, - final StreamAccessLogsMessage.Identifier identifier, - final HTTPAccessLogEntry entry, - final Role role - ); - - Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev); +public interface ALSHTTPAnalysis extends AccessLogAnalyzer { } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java index 0ccc14769a..2bb71afd39 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java @@ -18,33 +18,13 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; -import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; -import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; @Slf4j public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis { - @Override - public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) { - if (alsIdentifier == null) { - return defaultRole; - } - if (!alsIdentifier.hasNode()) { - return defaultRole; - } - final Node node = alsIdentifier.getNode(); - final String id = node.getId(); - if (id.startsWith("router~")) { - return Role.PROXY; - } else if (id.startsWith("sidecar~")) { - return Role.SIDECAR; - } - return defaultRole; - } - /** * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. * diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java new file mode 100644 index 0000000000..4dcf883f48 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; +import java.util.List; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; + +public interface AccessLogAnalyzer { + String name(); + + void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException; + + /** + * The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one. + * + * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty. + * + * @param result of the previous analyzer. + * @param identifier of the Envoy node where the logs are emitted. + * @param entry the log entry. + * @param role the role of the Envoy node where the logs are emitted. + * @return the analysis results. + */ + List analysis( + final List result, + final StreamAccessLogsMessage.Identifier identifier, + final E entry, + final Role role + ); + + default Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role defaultRole) { + if (alsIdentifier == null) { + return defaultRole; + } + if (!alsIdentifier.hasNode()) { + return defaultRole; + } + final Node node = alsIdentifier.getNode(); + final String id = node.getId(); + if (id.startsWith("router~")) { + return Role.PROXY; + } else if (id.startsWith("sidecar~")) { + return Role.SIDECAR; + } + return defaultRole; + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java index fdeb0422d0..7b6f8fbf92 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java @@ -135,15 +135,15 @@ public class LogEntry2MetricsAdapter { return method + ":" + request.getPath(); } - protected static long formatAsLong(final Timestamp timestamp) { + public static long formatAsLong(final Timestamp timestamp) { return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); } - protected static long formatAsLong(final Duration duration) { + public static long formatAsLong(final Duration duration) { return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); } - protected static Protocol requestProtocol(final HTTPRequestProperties request) { + public static Protocol requestProtocol(final HTTPRequestProperties request) { if (request == null) { return Protocol.HTTP; } @@ -154,7 +154,7 @@ public class LogEntry2MetricsAdapter { return Protocol.gRPC; } - protected static String parseTLS(final TLSProperties properties) { + public static String parseTLS(final TLSProperties properties) { if (properties == null) { return NON_TLS; } @@ -175,7 +175,7 @@ public class LogEntry2MetricsAdapter { * @param responseFlags in the ALS v2 * @return empty string if no internal error code, or literal string representing the code. */ - protected static String parseInternalErrorCode(final ResponseFlags responseFlags) { + public static String parseInternalErrorCode(final ResponseFlags responseFlags) { if (responseFlags != null) { if (responseFlags.getFailedLocalHealthcheck()) { return "failed_local_healthcheck"; diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java index 158cd2043e..f57fa66056 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java @@ -214,17 +214,13 @@ public class K8SServiceRegistry { } protected void addPod(final V1Pod pod) { - ofNullable(pod.getStatus()).ifPresent( - status -> ipPodMap.put(status.getPodIP(), pod) - ); + ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(podIP -> ipPodMap.put(podIP, pod)); recompose(); } protected void removePod(final V1Pod pod) { - ofNullable(pod.getStatus()).ifPresent( - status -> ipPodMap.remove(status.getPodIP()) - ); + ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(ipPodMap::remove); } protected void addEndpoints(final V1Endpoints endpoints) { @@ -239,7 +235,7 @@ public class K8SServiceRegistry { ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach( subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach( - address -> ipServiceMap.put(address.getIp(), namespace + ":" + name) + address -> ofNullable(address.getIp()).ifPresent(ip -> ipServiceMap.put(ip, namespace + ":" + name)) )) )); @@ -249,7 +245,7 @@ public class K8SServiceRegistry { protected void removeEndpoints(final V1Endpoints endpoints) { ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach( subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach( - address -> ipServiceMap.remove(address.getIp()) + address -> ofNullable(address.getIp()).ifPresent(ipServiceMap::remove) )) )); } @@ -264,7 +260,7 @@ public class K8SServiceRegistry { .collect(Collectors.toList()); } - protected ServiceMetaInfo findService(final String ip) { + public ServiceMetaInfo findService(final String ip) { final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip); if (isNull(service)) { log.debug("Unknown ip {}, ip -> service is null", ip); @@ -311,7 +307,7 @@ public class K8SServiceRegistry { }); } - protected boolean isEmpty() { + public boolean isEmpty() { return ipServiceMetaInfoMap.isEmpty(); } } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java new file mode 100644 index 0000000000..05165094ba --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp; + +import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +@Slf4j +public abstract class AbstractTCPAccessLogAnalyzer implements TCPAccessLogAnalyzer { + + /** + * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. + * + * @param entry the access log entry that is to be adapted from. + * @param sourceService the source service. + * @param targetService the target/destination service. + * @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. + */ + protected TCPLogEntry2MetricsAdapter newAdapter( + final TCPAccessLogEntry entry, + final ServiceMetaInfo sourceService, + final ServiceMetaInfo targetService) { + return new TCPLogEntry2MetricsAdapter(entry, sourceService, targetService); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java new file mode 100644 index 0000000000..c073b82f9a --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp; + +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer; + +public interface TCPAccessLogAnalyzer extends AccessLogAnalyzer { +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java new file mode 100644 index 0000000000..e7d60d1dd5 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp; + +import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v3.ConnectionProperties; +import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.apm.network.common.v3.DetectPoint; +import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.apm.network.servicemesh.v3.TCPInfo; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +import static org.apache.skywalking.apm.util.StringUtil.isBlank; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.formatAsLong; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseInternalErrorCode; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseTLS; + +/** + * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders. + */ +@RequiredArgsConstructor +public class TCPLogEntry2MetricsAdapter { + + /** + * The access log entry that is to be adapted into metrics builders. + */ + protected final TCPAccessLogEntry entry; + + protected final ServiceMetaInfo sourceService; + + protected final ServiceMetaInfo targetService; + + /** + * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}. + * + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public ServiceMeshMetric.Builder adaptToDownstreamMetrics() { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getStartTime()); + final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + + return adaptCommonPart() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setLatency((int) Math.max(1L, duration)) + .setDetectPoint(DetectPoint.server); + } + + /** + * Adapt the {@code entry} into a upstream metrics {@link ServiceMeshMetric.Builder}. + * + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public ServiceMeshMetric.Builder adaptToUpstreamMetrics() { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getStartTime()); + final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); + final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); + + return adaptCommonPart() + .setStartTime(outboundStartTime) + .setEndTime(outboundEndTime) + .setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime)) + .setDetectPoint(DetectPoint.client); + } + + protected ServiceMeshMetric.Builder adaptCommonPart() { + final AccessLogCommon properties = entry.getCommonProperties(); + final ConnectionProperties connectionProperties = entry.getConnectionProperties(); + final String tlsMode = parseTLS(properties.getTlsProperties()); + final String internalErrorCode = parseInternalErrorCode(properties.getResponseFlags()); + + final ServiceMeshMetric.Builder builder = + ServiceMeshMetric.newBuilder() + .setTlsMode(tlsMode) + .setProtocol(Protocol.TCP) + .setStatus(isBlank(internalErrorCode)) + .setTcp( + TCPInfo.newBuilder() + .setReceivedBytes(connectionProperties.getReceivedBytes()) + .setSentBytes(connectionProperties.getSentBytes()) + ) + .setInternalErrorCode(internalErrorCode); + + Optional.ofNullable(sourceService) + .map(ServiceMetaInfo::getServiceName) + .ifPresent(builder::setSourceServiceName); + Optional.ofNullable(sourceService) + .map(ServiceMetaInfo::getServiceInstanceName) + .ifPresent(builder::setSourceServiceInstance); + Optional.ofNullable(targetService) + .map(ServiceMetaInfo::getServiceName) + .ifPresent(builder::setDestServiceName); + Optional.ofNullable(targetService) + .map(ServiceMetaInfo::getServiceInstanceName) + .ifPresent(builder::setDestServiceInstance); + + return builder; + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java new file mode 100644 index 0000000000..c3a8f363d6 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s; + +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SServiceRegistry; +import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer; + +import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS; + +/** + * Analysis log based on ingress and mesh scenarios. + */ +@Slf4j +public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer { + protected K8SServiceRegistry serviceRegistry; + + @Override + public String name() { + return "k8s-mesh"; + } + + @Override + @SneakyThrows + public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) { + serviceRegistry = new K8SServiceRegistry(config); + serviceRegistry.start(); + } + + @Override + public List analysis( + final List result, + final StreamAccessLogsMessage.Identifier identifier, + final TCPAccessLogEntry entry, + final Role role + ) { + if (isNotEmpty(result)) { + return result; + } + if (serviceRegistry.isEmpty()) { + return Collections.emptyList(); + } + switch (role) { + case PROXY: + return analyzeProxy(entry); + case SIDECAR: + return analyzeSideCar(entry); + } + + return Collections.emptyList(); + } + + protected List analyzeSideCar(final TCPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final String cluster = properties.getUpstreamCluster(); + if (cluster == null) { + return Collections.emptyList(); + } + + final List sources = new ArrayList<>(); + + final Address downstreamRemoteAddress = + properties.hasDownstreamDirectRemoteAddress() + ? properties.getDownstreamDirectRemoteAddress() + : properties.getDownstreamRemoteAddress(); + final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); + + if (cluster.startsWith("inbound|")) { + // Server side + final ServiceMeshMetric.Builder metrics; + if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { + // Ingress -> sidecar(server side) + // Mesh telemetry without source, the relation would be generated. + metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics(); + + log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics); + } else { + // sidecar -> sidecar(server side) + metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics(); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics); + } + sources.add(metrics); + } else if (cluster.startsWith("outbound|")) { + // sidecar(client side) -> sidecar + final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); + + final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics(); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + sources.add(metric); + } + + return sources; + } + + protected List analyzeProxy(final TCPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ? + properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress(); + final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) { + return Collections.emptyList(); + } + + final List result = new ArrayList<>(2); + final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); + final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); + + final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); + final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress()); + + final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics(); + + log.debug("Transformed ingress inbound mesh metric {}", metric); + result.add(metric); + + final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); + final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); + + final ServiceMeshMetric.Builder outboundMetric = + newAdapter(entry, ingress, targetService) + .adaptToUpstreamMetrics() + // Can't parse it from tls properties, leave it to Server side. + .setTlsMode(NON_TLS); + + log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); + result.add(outboundMetric); + + return result; + } + + /** + * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. + */ + protected ServiceMetaInfo find(String ip) { + return serviceRegistry.findService(ip); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java new file mode 100644 index 0000000000..1f4cdcc317 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx; + +import com.google.protobuf.Any; +import com.google.protobuf.TextFormat; +import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import org.apache.skywalking.oap.server.receiver.envoy.als.mx.FieldsHelper; +import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter; +import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer; + +import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS; +import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN; +import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.DOWNSTREAM_KEY; +import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.UPSTREAM_KEY; + +@Slf4j +public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyzer { + protected String fieldMappingFile = "metadata-service-mapping.yaml"; + + protected EnvoyMetricReceiverConfig config; + + @Override + public String name() { + return "mx-mesh"; + } + + @Override + public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { + this.config = config; + try { + FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz()); + } catch (final Exception e) { + throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e); + } + } + + @Override + public List analysis( + final List result, + final StreamAccessLogsMessage.Identifier identifier, + final TCPAccessLogEntry entry, + final Role role + ) { + if (isNotEmpty(result)) { + return result; + } + if (!entry.hasCommonProperties()) { + return Collections.emptyList(); + } + final AccessLogCommon properties = entry.getCommonProperties(); + final Map stateMap = properties.getFilterStateObjectsMap(); + if (stateMap.isEmpty()) { + return Collections.emptyList(); + } + final ServiceMetaInfo currSvc; + try { + currSvc = adaptToServiceMetaInfo(identifier); + } catch (Exception e) { + log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e); + return Collections.emptyList(); + } + + final AtomicBoolean downstreamExists = new AtomicBoolean(); + stateMap.forEach((key, value) -> { + if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) { + return; + } + final ServiceMetaInfo svc; + try { + svc = adaptToServiceMetaInfo(value); + } catch (Exception e) { + log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray())); + return; + } + final ServiceMeshMetric.Builder metrics; + switch (key) { + case UPSTREAM_KEY: + metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics)); + } + result.add(metrics); + break; + case DOWNSTREAM_KEY: + metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics(); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics)); + } + result.add(metrics); + downstreamExists.set(true); + break; + } + }); + if (role.equals(Role.PROXY) && !downstreamExists.get()) { + final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics(); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric)); + } + result.add(metric); + } + return result; + } + + protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception { + return new ServiceMetaInfoAdapter(value); + } + + protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception { + return config.serviceMetaInfoFactory().fromStruct(identifier.getNode().getMetadata()); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java index ffcef39279..030fa373e0 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java @@ -18,11 +18,16 @@ package org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters; +import com.google.common.base.Splitter; import io.prometheus.client.Metrics; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge; +import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram; import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric; import static java.util.stream.Collectors.toMap; @@ -42,6 +47,17 @@ public class ProtoMetricFamily2MetricsAdapter { .timestamp(adaptTimestamp(it)) .labels(adaptLabels(it)) .build()); + case HISTOGRAM: + return metricFamily.getMetricList() + .stream() + .map(it -> Histogram.builder() + .name(adaptMetricsName(it)) + .labels(adaptLabels(it)) + .sampleCount(it.getHistogram().getSampleCount()) + .sampleSum(it.getHistogram().getSampleSum()) + .buckets(buildBuckets(it.getHistogram().getBucketList())) + .build() + ); default: return Stream.of(); } @@ -49,6 +65,13 @@ public class ProtoMetricFamily2MetricsAdapter { @SuppressWarnings("unused") public String adaptMetricsName(final Metrics.Metric metric) { + if (metricFamily.getName().startsWith("cluster.inbound|")) { + return Splitter.on(".") + .splitToList(metricFamily.getName()) + .stream() + .filter(it -> !it.startsWith("inbound|")) + .collect(Collectors.joining("_")); + } return metricFamily.getName(); } @@ -81,4 +104,12 @@ public class ProtoMetricFamily2MetricsAdapter { return timestamp; } + + private static Map buildBuckets(List buckets) { + Map result = new HashMap<>(); + for (final Metrics.Bucket bucket : buckets) { + result.put(bucket.getUpperBound(), bucket.getCumulativeCount()); + } + return result; + } } diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer similarity index 80% rename from test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml rename to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer index 1abf02cff8..5290deb529 100644 --- a/test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,11 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# +# -instances: - - key: not null - label: not null - - key: not null - label: not null - - key: not null - label: not null +org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis +org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java index c1da14b05f..9af5fac86e 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java @@ -148,6 +148,8 @@ public class TelemetryDataDispatcher { service.setResponseCode(metrics.getResponseCode()); service.setType(protocol2Type(metrics.getProtocol())); service.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + service.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + service.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(service); } @@ -170,6 +172,8 @@ public class TelemetryDataDispatcher { serviceRelation.setComponentId(protocol2Component(metrics.getProtocol())); serviceRelation.setTlsMode(metrics.getTlsMode()); serviceRelation.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + serviceRelation.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + serviceRelation.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(serviceRelation); } @@ -186,6 +190,8 @@ public class TelemetryDataDispatcher { serviceInstance.setResponseCode(metrics.getResponseCode()); serviceInstance.setType(protocol2Type(metrics.getProtocol())); serviceInstance.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + serviceInstance.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + serviceInstance.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(serviceInstance); } @@ -208,6 +214,8 @@ public class TelemetryDataDispatcher { serviceRelation.setComponentId(protocol2Component(metrics.getProtocol())); serviceRelation.setTlsMode(metrics.getTlsMode()); serviceRelation.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + serviceRelation.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + serviceRelation.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(serviceRelation); } @@ -234,6 +242,8 @@ public class TelemetryDataDispatcher { return RequestType.gRPC; case HTTP: return RequestType.HTTP; + case TCP: + return RequestType.TCP; case UNRECOGNIZED: default: return RequestType.RPC; @@ -248,6 +258,9 @@ public class TelemetryDataDispatcher { case HTTP: // HTTP in component-libraries.yml return 49; + case TCP: + // TCP in component-libraries.yml + return 110; case UNRECOGNIZED: default: // RPC in component-libraries.yml diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java index ec69955f4a..65c5d02cd8 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java @@ -49,7 +49,9 @@ public class SQLExecutor implements InsertRequest, UpdateRequest { preparedStatement.setObject(i + 1, param.get(i)); } - LOGGER.debug("execute sql in batch: {}", sql); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param); + } preparedStatement.execute(); } } diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java index a3b89bcd71..d9b6dc462e 100644 --- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java +++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java @@ -154,9 +154,6 @@ public class ALSE2E extends SkyWalkingTestAdapter { LOGGER.info("instances: {}", instances); String file = "expected/als/instances.yml"; - if (service.getLabel().equals("e2e::reviews")) { - file = "expected/als/instances-reviews.yml"; - } load(file).as(InstancesMatcher.class).verify(instances); return instances; diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/services.yml b/test/e2e/e2e-test/src/test/resources/expected/als/services.yml index 37f76cfd76..5d2d4b410d 100644 --- a/test/e2e/e2e-test/src/test/resources/expected/als/services.yml +++ b/test/e2e/e2e-test/src/test/resources/expected/als/services.yml @@ -24,3 +24,5 @@ services: label: e2e::details - key: not null label: e2e::istio-ingressgateway + - key: not null + label: e2e::mongodb -- GitLab