diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml index 769db83def6150ee09777aa1d3e103e5500c4852..9378a9cda5cee3e84424c658eb1cfe48291b4437 100644 --- a/.github/workflows/codeql.yaml +++ b/.github/workflows/codeql.yaml @@ -45,7 +45,7 @@ jobs: with: languages: ${{ matrix.language }} - - run: ./mvnw -Dmaven.test.skip=true clean install + - run: ./mvnw -Dmaven.test.skip=true clean install || ./mvnw -Dmaven.test.skip=true clean install - name: Perform CodeQL Analysis uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml index 252eda82d08faf302b94221f56c91ae1cabaac0f..c0ee36fbbf24e41f665d403322f0a89d47c7d9ef 100644 --- a/.github/workflows/e2e.istio.yaml +++ b/.github/workflows/e2e.istio.yaml @@ -100,6 +100,7 @@ jobs: --set elasticsearch.minimumMasterNodes=1 \ --set elasticsearch.imageTag=7.5.1 \ --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=${{ matrix.analyzer }} \ + --set oap.env.SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=${{ matrix.analyzer }} \ --set oap.env.K8S_SERVICE_NAME_RULE='e2e::${service.metadata.name}' \ --set oap.envoy.als.enabled=true \ --set oap.replicas=1 \ @@ -117,7 +118,13 @@ jobs: - name: Deploy demo services if: env.SKIP_CI != 'true' - run: bash ${SCRIPTS_DIR}/demo.sh + run: | + bash ${SCRIPTS_DIR}/demo.sh + # Enable TCP services + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/platform/kube/bookinfo-ratings-v2.yaml + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/platform/kube/bookinfo-db.yaml + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/networking/destination-rule-all.yaml + kubectl apply -f https://raw.githubusercontent.com/istio/istio/$ISTIO_VERSION/samples/bookinfo/networking/virtual-service-ratings-db.yaml - name: Cluster Info if: ${{ failure() }} diff --git a/CHANGES.md b/CHANGES.md index c6bc4d69bdf0cfa51098cd11fa6acf76a2caeae8..037e945efebfc1f0199e6a269948ffca57d9026d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -34,6 +34,7 @@ Release Notes. * CVE: fix Jetty vulnerability. https://nvd.nist.gov/vuln/detail/CVE-2019-17638 * Fix: MAL function would miss samples name after creating new samples. * perf: use iterator.remove() to remove modulesWithoutProvider +* Support analyzing Envoy TCP access logs. #### UI * Add logo for kong plugin. diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index 9a689b0188cbdc7bd8d6ddd99b4ad5283e82fe88..7da226cfced7fa4eb91c6528e8c30827288531a0 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit 9a689b0188cbdc7bd8d6ddd99b4ad5283e82fe88 +Subproject commit 7da226cfced7fa4eb91c6528e8c30827288531a0 diff --git a/docs/en/concepts-and-designs/scope-definitions.md b/docs/en/concepts-and-designs/scope-definitions.md index ede1252af3729eee5ca3aa67b429e25b82860a9c..cc7518ff8e7d11e4b098c5bb5bdc0d29b3008d9b 100644 --- a/docs/en/concepts-and-designs/scope-definitions.md +++ b/docs/en/concepts-and-designs/scope-definitions.md @@ -30,6 +30,8 @@ This calculates the metrics data from each request of the service. | type | The type of each request. Such as: Database, HTTP, RPC, gRPC. | | enum | | tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List` | | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `ServiceInstance` @@ -47,6 +49,8 @@ This calculates the metrics data from each request of the service instance. | type | The type of each request, such as Database, HTTP, RPC, or gRPC. | | enum | | tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List` | | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | #### Secondary scopes of `ServiceInstance` @@ -120,6 +124,8 @@ This calculates the metrics data from each request of the endpoint in the servic | type | The type of each request, such as Database, HTTP, RPC, or gRPC. | | enum | | tags | The labels of each request. Each value is made up by `TagKey:TagValue` in the segment. | | `List` | | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `ServiceRelation` @@ -142,7 +148,8 @@ This calculates the metrics data from each request between services. | detectPoint | Where the relation is detected. The value may be client, server, or proxy. | yes | enum| | tlsMode | The TLS mode between source and destination services, such as `service_relation_mtls_cpm = from(ServiceRelation.*).filter(tlsMode == "mTLS").cpm()` || string| | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| - +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `ServiceInstanceRelation` @@ -165,6 +172,8 @@ This calculates the metrics data from each request between service instances. | detectPoint | Where the relation is detected. The value may be client, server, or proxy. | yes | enum| | tlsMode | The TLS mode between source and destination service instances, such as `service_instance_relation_mtls_cpm = from(ServiceInstanceRelation.*).filter(tlsMode == "mTLS").cpm()` || string| | sideCar.internalErrorCode | The sidecar/gateway proxy internal error code. The value is based on the implementation. | | string| +| tcpInfo.receivedBytes | The received bytes of the TCP traffic, if this request is a TCP call. | | long | +| tcpInfo.sentBytes | The sent bytes of the TCP traffic, if this request is a TCP call. | | long | ### SCOPE `EndpointRelation` diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index 5d8746ec34a53f6a601ab3bfa9fe33c10213b05e..67100d39f8ab37d936379922b2f925fd59bcea36 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -191,7 +191,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | service-mesh| default| Read [receiver doc](backend-receivers.md) for more details | - | - | | envoy-metric| default| Read [receiver doc](backend-receivers.md) for more details | - | - | | - | - | acceptMetricsService | Open Envoy Metrics Service analysis | SW_ENVOY_METRIC_SERVICE | true| -| - | - | alsHTTPAnalysis | Open Envoy Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS | - | +| - | - | alsHTTPAnalysis | Open Envoy HTTP Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS | - | +| - | - | alsTCPAnalysis | Open Envoy TCP Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_TCP_ANALYSIS | - | | - | - | k8sServiceNameRule | `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata, the available variables are `pod`, `service`, e.g., you can use `${service.metadata.name}-${pod.metadata.labels.version}` to append the version number to the service name. Be careful, when using environment variables to pass this configuration, use single quotes(`''`) to avoid it being evaluated by the shell. | - | | receiver-otel | default | Read [receiver doc](backend-receivers.md) for more details | - | - | | - | - | enabledHandlers|Enabled handlers for otel| SW_OTEL_RECEIVER_ENABLED_HANDLERS | - | diff --git a/docs/en/setup/envoy/als_setting.md b/docs/en/setup/envoy/als_setting.md index bc22a7abd4443fffb1ce44a3c92bca83b636ceed..c23298bb28e528a6900cf991d4d1f759bb63940b 100644 --- a/docs/en/setup/envoy/als_setting.md +++ b/docs/en/setup/envoy/als_setting.md @@ -1,6 +1,6 @@ # Observe Service Mesh through ALS -[Envoy Access Log Service (ALS)](https://www.envoyproxy.io/docs/envoy/latest/api-v2/service/accesslog/v2/als.proto) provides +[Envoy Access Log Service (ALS)](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/service/accesslog/v2/als.proto) provides full logs about RPC routed, including HTTP and TCP. ## Background @@ -29,8 +29,9 @@ On Istio version 1.6.0+, if Istio is installed with [`demo` profile](https://ist - Activate SkyWalking [Envoy Receiver](../backend/backend-receivers.md). This is activated by default. -- Choose an ALS analyzer. There are two available analyzers, `k8s-mesh` and `mx-mesh`. - Set the system environment variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh` +- Choose an ALS analyzer. There are two available analyzers, `k8s-mesh` and `mx-mesh` for both HTTP access logs and TCP access logs. + Set the system environment variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** and **SW_ENVOY_METRIC_ALS_TCP_ANALYSIS** + such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=mx-mesh`, `SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=mx-mesh` or in the `application.yaml` to activate the analyzer. For more about the analyzers, see [SkyWalking ALS Analyzers](#skywalking-als-analyzers) ```yaml @@ -39,6 +40,7 @@ On Istio version 1.6.0+, if Istio is installed with [`demo` profile](https://ist default: acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true} alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""} # Setting the system env variable would override this. + alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""} ``` To use multiple analyzers as a fallbackļ¼Œplease use `,` to concatenate. @@ -61,12 +63,13 @@ helm repo add elastic https://helm.elastic.co helm dep up skywalking helm install 8.1.0 skywalking -n istio-system \ - --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \ + --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=mx-mesh \ + --set oap.env.SW_ENVOY_METRIC_ALS_TCP_ANALYSIS=mx-mesh \ --set fullnameOverride=skywalking \ --set oap.envoy.als.enabled=true ``` -You can use `kubectl -n istio-system logs -l app=skywalking | grep "K8sALSServiceMeshHTTPAnalysis"` to ensure OAP ALS `k8s-mesh` analyzer has been activated. +You can use `kubectl -n istio-system logs -l app=skywalking | grep "K8sALSServiceMeshHTTPAnalysis"` to ensure OAP ALS `mx-mesh` analyzer has been activated. ## SkyWalking ALS Analyzers diff --git a/docs/en/setup/envoy/examples/metrics/README.md b/docs/en/setup/envoy/examples/metrics/README.md index 0f2465725479071d6c90f23e82d65fb32b88fd20..dddd2c57a1018d44db0bd1d290662799efcdcd21 100644 --- a/docs/en/setup/envoy/examples/metrics/README.md +++ b/docs/en/setup/envoy/examples/metrics/README.md @@ -1,7 +1,7 @@ # Sending Envoy Metrics to SkyWalking OAP Server Example This is an example of sending [Envoy Stats](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/observability/statistics#arch-overview-statistics) to SkyWalking OAP server -through [Metric Service](https://www.envoyproxy.io/docs/envoy/latest/api-v2/config/metrics/v2/metrics_service.proto). +through [Metric Service](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/config/metrics/v2/metrics_service.proto). ## Running the example diff --git a/docs/en/setup/envoy/metrics_service_setting.md b/docs/en/setup/envoy/metrics_service_setting.md index bc3babcf57a3fd7cf4d8f7b57f2f2347d43d51d3..8750de9844e59f42698f08b9bd622163859efd5e 100644 --- a/docs/en/setup/envoy/metrics_service_setting.md +++ b/docs/en/setup/envoy/metrics_service_setting.md @@ -6,15 +6,15 @@ SkyWalking has a built-in receiver that implements this protocol so that you can As an APM system, SkyWalking does not only receive and store the metrics emitted by Envoy, it also analyzes the topology of services and service instances. **Attention:** There are two versions of Envoy metrics service protocol up to date, -[v2](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) and -[v3](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/metrics/v3/metrics_service.proto), SkyWalking (8.3.0+) supports both of them. +[v2](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) and +[v3](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v3/config/metrics/v3/metrics_service.proto), SkyWalking (8.3.0+) supports both of them. ## Configure Envoy to send metrics to SkyWalking without Istio Envoy can be used with / without Istio's control. This section introduces how to configure the standalone Envoy to send the metrics to SkyWalking. In order to let Envoy send metrics to SkyWalking, we need to feed Envoy with a configuration which contains `stats_sinks` that includes `envoy.metrics_service`. -This `envoy.metrics_service` should be configured as a [`config.grpc_service`](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) entry. +This `envoy.metrics_service` should be configured as a [`config.grpc_service`](https://www.envoyproxy.io/docs/envoy/v1.18.2/api-v2/api/v2/core/grpc_service.proto#envoy-api-msg-core-grpcservice) entry. The interesting parts of the config is shown in the config below: diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 239858b00b4f08c6e824778cd672db2c41331329..7e5ed87ca159f6c8ba1060e650c47f00b42b1ad9 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -327,6 +327,7 @@ envoy-metric: default: acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true} alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""} + alsTCPAnalysis: ${SW_ENVOY_METRIC_ALS_TCP_ANALYSIS:""} # `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata, # the available variables are `pod`, `service`, f.e., you can use `${service.metadata.name}-${pod.metadata.labels.version}` # to append the version number to the service name. diff --git a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml index ddcd3e9aed89c8af575003eb1cab9e7ca71a9bc3..73d8012de869f5301bed157f24ab3e8cfcce90a9 100755 --- a/oap-server/server-bootstrap/src/main/resources/component-libraries.yml +++ b/oap-server/server-bootstrap/src/main/resources/component-libraries.yml @@ -359,6 +359,9 @@ seata: MyBatis: id: 109 languages: Java +tcp: + id: 110 + languages: Java # .NET/.NET Core components # [3000, 4000) for C#/.NET only diff --git a/oap-server/server-bootstrap/src/main/resources/oal/core.oal b/oap-server/server-bootstrap/src/main/resources/oal/core.oal index e02934de67d1e08ef04eaebcd7d373d78eaa6fa4..16dc46d17d09a07be87f6726b9fe797b8984a186 100755 --- a/oap-server/server-bootstrap/src/main/resources/oal/core.oal +++ b/oap-server/server-bootstrap/src/main/resources/oal/core.oal @@ -16,6 +16,9 @@ * */ +// For services using protocols HTTP 1/2, gRPC, RPC, etc., the cpm metrics means "calls per minute", +// for services that are built on top of TCP, the cpm means "packages per minute". + // All scope metrics all_percentile = from(All.latency).percentile(10); // Multiple values including p50, p75, p90, p95, p99 all_heatmap = from(All.latency).histogram(100, 20); diff --git a/oap-server/server-bootstrap/src/main/resources/oal/tcp.oal b/oap-server/server-bootstrap/src/main/resources/oal/tcp.oal new file mode 100644 index 0000000000000000000000000000000000000000..006abd1e5801175e305302dd082421e9b3cb0d42 --- /dev/null +++ b/oap-server/server-bootstrap/src/main/resources/oal/tcp.oal @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// TCP services' metrics +service_throughput_received = from(Service.tcpInfo.receivedBytes).filter(type == RequestType.TCP).longAvg(); +service_throughput_sent = from(Service.tcpInfo.sentBytes).filter(type == RequestType.TCP).longAvg(); +service_relation_client_received = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_relation_client_sent = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_relation_server_received = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); +service_relation_server_sent = from(ServiceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); + +service_instance_throughput_received = from(ServiceInstance.tcpInfo.receivedBytes).filter(type == RequestType.TCP).longAvg(); +service_instance_throughput_sent = from(ServiceInstance.tcpInfo.sentBytes).filter(type == RequestType.TCP).longAvg(); +service_instance_relation_client_received = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_instance_relation_client_sent = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.CLIENT).longAvg(); +service_instance_relation_server_received = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); +service_instance_relation_server_sent = from(ServiceInstanceRelation.tcpInfo.sentBytes).filter(type == RequestType.TCP).filter(detectPoint == DetectPoint.SERVER).longAvg(); diff --git a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml index fa418982335cd43ca1b6a06f92dba6b93650a75b..901bd88538ba5dc7e47dc3fd3a77dc74f4cbdc11 100644 --- a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml +++ b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/apm.yml @@ -45,7 +45,8 @@ templates: "queryMetricType": "sortMetrics", "chartType": "ChartSlow", "parentService": false, - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { "width": 3, @@ -168,7 +169,8 @@ templates: "metricName": "service_cpm", "queryMetricType": "readMetricsValue", "chartType": "ChartNum", - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { "width": 3, @@ -221,10 +223,24 @@ templates: "metricName": "service_cpm", "queryMetricType": "readMetricsValues", "chartType": "ChartLine", - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { - "width": "4", + "width": 3, + "title": "Service Throughput", + "height": "200", + "entityType": "Service", + "independentSelector": false, + "metricType": "REGULAR_VALUE", + "metricName": "service_throughput_received,service_throughput_sent", + "queryMetricType": "readMetricsValues", + "chartType": "ChartLine", + "unit": "Bytes", + "tips": "This metrics is only avaible for TCP services" + }, + { + "width": "3", "title": "Service Instances Load", "height": "280", "entityType": "ServiceInstance", @@ -234,10 +250,11 @@ templates: "queryMetricType": "sortMetrics", "chartType": "ChartSlow", "parentService": true, - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { - "width": "4", + "width": "3", "title": "Slow Service Instance", "height": "280", "entityType": "ServiceInstance", @@ -250,7 +267,7 @@ templates: "unit": "ms" }, { - "width": "4", + "width": "3", "title": "Service Instance Successful Rate", "height": "280", "entityType": "ServiceInstance", @@ -271,21 +288,34 @@ templates: "name": "Instance", "children": [ { - "width": "4", + "width": "3", "title": "Service Instance Load", - "height": "150", + "height": "200", "entityType": "ServiceInstance", "independentSelector": false, "metricType": "REGULAR_VALUE", "metricName": "service_instance_cpm", "queryMetricType": "readMetricsValues", "chartType": "ChartLine", - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { - "width": "4", + "width": 3, + "title": "Service Instance Throughput", + "height": "200", + "entityType": "ServiceInstance", + "independentSelector": false, + "metricType": "REGULAR_VALUE", + "metricName": "service_instance_throughput_received,service_instance_throughput_sent", + "queryMetricType": "readMetricsValues", + "chartType": "ChartLine", + "unit": "Bytes" + }, + { + "width": "3", "title": "Service Instance Successful Rate", - "height": "150", + "height": "200", "entityType": "ServiceInstance", "independentSelector": false, "metricType": "REGULAR_VALUE", @@ -297,9 +327,9 @@ templates: "aggregationNum": "100" }, { - "width": "4", + "width": "3", "title": "Service Instance Latency", - "height": "150", + "height": "200", "entityType": "ServiceInstance", "independentSelector": false, "metricType": "REGULAR_VALUE", @@ -434,7 +464,8 @@ templates: "queryMetricType": "sortMetrics", "chartType": "ChartSlow", "parentService": true, - "unit": "CPM - calls per minute" + "unit": "CPM / PPM", + "tips": "For HTTP 1/2, gRPC, RPC services, this means Calls Per Minute (CPM), for TCP services, this means Packets Per Minute (PPM)" }, { "width": "4", @@ -525,4 +556,4 @@ templates: # False means providing a basic template, user needs to add it manually. activated: true # True means wouldn't show up on the dashboard. Only keeps the definition in the storage. - disabled: false \ No newline at end of file + disabled: false diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java index 020dd8f4b7fe70cfc460f75416f0ed7745c21de1..b33cd0717fab5673b657d243389237808b8b4032 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALDefine.java @@ -33,11 +33,17 @@ import static java.util.Objects.requireNonNull; public abstract class OALDefine { protected OALDefine(final String configFile, final String sourcePackage) { + this(configFile, sourcePackage, sourcePackage); + } + + protected OALDefine(final String configFile, + final String sourcePackage, + final String dispatcherPackage) { this.configFile = requireNonNull(configFile); this.sourcePackage = appendPoint(requireNonNull(sourcePackage)); this.dynamicMetricsClassPackage = appendPoint(sourcePackage + ".oal.rt.metrics"); this.dynamicMetricsBuilderClassPackage = appendPoint(sourcePackage + ".oal.rt.metrics.builder"); - this.dynamicDispatcherClassPackage = appendPoint(sourcePackage + ".oal.rt.dispatcher"); + this.dynamicDispatcherClassPackage = appendPoint(dispatcherPackage + ".oal.rt.dispatcher"); } private final String configFile; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java index 3298da09870b215b6f753a68b54f798c5d0be39d..6eea8656cabe1d4210868472ff2e7d79ebbaa3c6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/RequestType.java @@ -29,5 +29,6 @@ public enum RequestType { /** * Logic request only. */ - LOGIC + LOGIC, + TCP } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java index 3700271b3a6fda4e6cc8ecca874cdd0f7f94a2ff..f140e70f8f3b0712b2b245e1bda42d72cf218039 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Service.java @@ -72,4 +72,7 @@ public class Service extends Source { @Getter @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java index df882c18f4ecd377e6b1231f1b7343aa6eb18bd9..a4070bde7b0e3afaa156d3101dcc82738d7e864a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstance.java @@ -76,6 +76,10 @@ public class ServiceInstance extends Source { @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); + @Override public void prepare() { serviceId = IDManager.ServiceID.buildId(serviceName, nodeType); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java index d5f93fe492ce462b31ec07258293062b28e06f17..775561a23ff5f8c0128c0c2fe2f6cee3da1aef7c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceInstanceRelation.java @@ -105,6 +105,9 @@ public class ServiceInstanceRelation extends Source { @Getter @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); @Override public void prepare() { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java index ca19163c526384d03d1b922e4d86a486a6d1c9f1..9b13d71cc422e9658099227794faffb232363dd3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java @@ -99,6 +99,9 @@ public class ServiceRelation extends Source { @Getter @Setter private SideCar sideCar = new SideCar(); + @Getter + @Setter + private TCPInfo tcpInfo = new TCPInfo(); @Override public void prepare() { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TCPInfo.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TCPInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..26539e591131a776a766281a128421fb4ee40f01 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TCPInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.source; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@NoArgsConstructor +@AllArgsConstructor +public class TCPInfo { + @Getter + @Setter + private long receivedBytes; + + @Getter + @Setter + private long sentBytes; +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java index bce63fb0effe7a44143bc2b86b664ac32de5f3eb..f3bc5ef23095ed751943ffd8caf1ac68b4160915 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.receiver.envoy; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse; @@ -32,6 +33,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; @@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory; public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class); private final List envoyHTTPAnalysisList; + private final List envoyTCPAnalysisList; private final CounterMetrics counter; private final HistogramMetrics histogram; @@ -51,6 +54,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { ServiceLoader alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class); + ServiceLoader alsTcpAnalyzers = ServiceLoader.load(TCPAccessLogAnalyzer.class); envoyHTTPAnalysisList = new ArrayList<>(); for (String httpAnalysisName : config.getAlsHTTPAnalysis()) { for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) { @@ -60,8 +64,17 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS } } } + envoyTCPAnalysisList = new ArrayList<>(); + for (String analyzerName : config.getAlsTCPAnalysis()) { + for (TCPAccessLogAnalyzer tcpAnalyzer : alsTcpAnalyzers) { + if (analyzerName.equals(tcpAnalyzer.name())) { + tcpAnalyzer.init(manager, config); + envoyTCPAnalysisList.add(tcpAnalyzer); + } + } + } - LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList); + LOGGER.debug("envoy HTTP analysis: {}, envoy TCP analysis: {}", envoyHTTPAnalysisList, envoyTCPAnalysisList); MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); counter = metricCreator.createCounter( @@ -110,11 +123,11 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS .getId(), role, logCase, message); } + List sourceResult = new ArrayList<>(); switch (logCase) { case HTTP_LOGS: StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs(); - List sourceResult = new ArrayList<>(); for (final HTTPAccessLogEntry log : logs.getLogEntryList()) { List result = new ArrayList<>(); for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { @@ -123,10 +136,22 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS sourceResult.addAll(result); } - sourceDispatcherCounter.inc(sourceResult.size()); - sourceResult.forEach(TelemetryDataDispatcher::process); + break; + case TCP_LOGS: + StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs(); + + for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) { + List result = new ArrayList<>(); + for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) { + result = analyzer.analysis(result, identifier, tcpLog, role); + } + sourceResult.addAll(result); + } + break; } + sourceDispatcherCounter.inc(sourceResult.size()); + sourceResult.forEach(TelemetryDataDispatcher::process); } finally { timer.finish(); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java index 94f8c458381eec3355279a0ba2a3a52cdf78e1b6..925a820f4a997a3136072235bdbf8fb0145f744e 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java @@ -33,6 +33,7 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig { @Getter private boolean acceptMetricsService = false; private String alsHTTPAnalysis; + private String alsTCPAnalysis; // TODO: add to doc @Getter private String k8sServiceNameRule; @@ -45,6 +46,13 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig { return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList()); } + public List getAlsTCPAnalysis() { + if (Strings.isNullOrEmpty(alsTCPAnalysis)) { + return Collections.emptyList(); + } + return Arrays.stream(alsTCPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList()); + } + public List rules() throws ModuleStartException { return Rules.loadRules("envoy-metrics-rules", Collections.singletonList("envoy")); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java index 4098cbf626dbb5501e72a960c7b99b6d91fb3411..2678f87dfd3be1301fc64f592410af3e330ead99 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.receiver.envoy; import org.apache.skywalking.aop.server.receiver.mesh.MeshReceiverModule; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; @@ -65,6 +66,13 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { + if (!config.getAlsTCPAnalysis().isEmpty()) { + getManager().find(CoreModule.NAME) + .provider() + .getService(OALEngineLoaderService.class) + .load(TCPOALDefine.INSTANCE); + } + GRPCHandlerRegister service = getManager().find(SharingServerModule.NAME) .provider() .getService(GRPCHandlerRegister.class); diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/TCPOALDefine.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/TCPOALDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..93ca89967a43e68c66951bf9e70aac4e12c8d93f --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/TCPOALDefine.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.receiver.envoy; + +import org.apache.skywalking.oap.server.core.oal.rt.OALDefine; + +/** + * OAL rules to calculate TCP-specific metrics. + */ +public class TCPOALDefine extends OALDefine { + public static final TCPOALDefine INSTANCE = new TCPOALDefine(); + + private TCPOALDefine() { + super( + "oal/tcp.oal", + "org.apache.skywalking.oap.server.core.source", + "org.apache.skywalking.oap.server.core.source.tcp" + ); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java index 7058e30a7b682170c97ca48377c9f7a2dce43955..ce69eaa5d8ec919aef806301f5395d73b814404d 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java @@ -19,38 +19,9 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; -import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; -import java.util.List; -import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; -import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.apache.skywalking.oap.server.library.module.ModuleStartException; -import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; /** * Analysis source metrics from ALS */ -public interface ALSHTTPAnalysis { - String name(); - - void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException; - - /** - * The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one. - * - * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty. - * - * @param result of the previous analyzer. - * @param identifier of the Envoy node where the logs are emitted. - * @param entry the log entry. - * @param role the role of the Envoy node where the logs are emitted. - * @return the analysis results. - */ - List analysis( - final List result, - final StreamAccessLogsMessage.Identifier identifier, - final HTTPAccessLogEntry entry, - final Role role - ); - - Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev); +public interface ALSHTTPAnalysis extends AccessLogAnalyzer { } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java index 0ccc14769aff2e982ea7dc71d1adfdc6296c93bb..2bb71afd39e726721d6b8a3d05f7dbfd23437ba4 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java @@ -18,33 +18,13 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; -import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; -import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; @Slf4j public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis { - @Override - public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) { - if (alsIdentifier == null) { - return defaultRole; - } - if (!alsIdentifier.hasNode()) { - return defaultRole; - } - final Node node = alsIdentifier.getNode(); - final String id = node.getId(); - if (id.startsWith("router~")) { - return Role.PROXY; - } else if (id.startsWith("sidecar~")) { - return Role.SIDECAR; - } - return defaultRole; - } - /** * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. * diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java new file mode 100644 index 0000000000000000000000000000000000000000..4dcf883f4898aac5c84cf9864ba3c07da21064ef --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; +import java.util.List; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; + +public interface AccessLogAnalyzer { + String name(); + + void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException; + + /** + * The method works as a chain of analyzers. Logs are processed sequentially by analyzers one by one, the results of the previous analyzer are passed into the current one. + * + * To do fast-success, the analyzer could simply check the results of the previous analyzer and return if not empty. + * + * @param result of the previous analyzer. + * @param identifier of the Envoy node where the logs are emitted. + * @param entry the log entry. + * @param role the role of the Envoy node where the logs are emitted. + * @return the analysis results. + */ + List analysis( + final List result, + final StreamAccessLogsMessage.Identifier identifier, + final E entry, + final Role role + ); + + default Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role defaultRole) { + if (alsIdentifier == null) { + return defaultRole; + } + if (!alsIdentifier.hasNode()) { + return defaultRole; + } + final Node node = alsIdentifier.getNode(); + final String id = node.getId(); + if (id.startsWith("router~")) { + return Role.PROXY; + } else if (id.startsWith("sidecar~")) { + return Role.SIDECAR; + } + return defaultRole; + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java index fdeb0422d097b89503fec5412603e98c915fe431..7b6f8fbf92446dce939ccb15bfd7817962262b48 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java @@ -135,15 +135,15 @@ public class LogEntry2MetricsAdapter { return method + ":" + request.getPath(); } - protected static long formatAsLong(final Timestamp timestamp) { + public static long formatAsLong(final Timestamp timestamp) { return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); } - protected static long formatAsLong(final Duration duration) { + public static long formatAsLong(final Duration duration) { return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); } - protected static Protocol requestProtocol(final HTTPRequestProperties request) { + public static Protocol requestProtocol(final HTTPRequestProperties request) { if (request == null) { return Protocol.HTTP; } @@ -154,7 +154,7 @@ public class LogEntry2MetricsAdapter { return Protocol.gRPC; } - protected static String parseTLS(final TLSProperties properties) { + public static String parseTLS(final TLSProperties properties) { if (properties == null) { return NON_TLS; } @@ -175,7 +175,7 @@ public class LogEntry2MetricsAdapter { * @param responseFlags in the ALS v2 * @return empty string if no internal error code, or literal string representing the code. */ - protected static String parseInternalErrorCode(final ResponseFlags responseFlags) { + public static String parseInternalErrorCode(final ResponseFlags responseFlags) { if (responseFlags != null) { if (responseFlags.getFailedLocalHealthcheck()) { return "failed_local_healthcheck"; diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java index 158cd2043e434199ff904bf9fb31637afd965b6a..f57fa66056cf43e56e725023f1d681ccd80d118a 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java @@ -214,17 +214,13 @@ public class K8SServiceRegistry { } protected void addPod(final V1Pod pod) { - ofNullable(pod.getStatus()).ifPresent( - status -> ipPodMap.put(status.getPodIP(), pod) - ); + ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(podIP -> ipPodMap.put(podIP, pod)); recompose(); } protected void removePod(final V1Pod pod) { - ofNullable(pod.getStatus()).ifPresent( - status -> ipPodMap.remove(status.getPodIP()) - ); + ofNullable(pod.getStatus()).flatMap(status -> ofNullable(status.getPodIP())).ifPresent(ipPodMap::remove); } protected void addEndpoints(final V1Endpoints endpoints) { @@ -239,7 +235,7 @@ public class K8SServiceRegistry { ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach( subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach( - address -> ipServiceMap.put(address.getIp(), namespace + ":" + name) + address -> ofNullable(address.getIp()).ifPresent(ip -> ipServiceMap.put(ip, namespace + ":" + name)) )) )); @@ -249,7 +245,7 @@ public class K8SServiceRegistry { protected void removeEndpoints(final V1Endpoints endpoints) { ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach( subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach( - address -> ipServiceMap.remove(address.getIp()) + address -> ofNullable(address.getIp()).ifPresent(ipServiceMap::remove) )) )); } @@ -264,7 +260,7 @@ public class K8SServiceRegistry { .collect(Collectors.toList()); } - protected ServiceMetaInfo findService(final String ip) { + public ServiceMetaInfo findService(final String ip) { final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip); if (isNull(service)) { log.debug("Unknown ip {}, ip -> service is null", ip); @@ -311,7 +307,7 @@ public class K8SServiceRegistry { }); } - protected boolean isEmpty() { + public boolean isEmpty() { return ipServiceMetaInfoMap.isEmpty(); } } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java new file mode 100644 index 0000000000000000000000000000000000000000..05165094ba0a107cabf6c007df9f054bfaed79ba --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/AbstractTCPAccessLogAnalyzer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp; + +import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +@Slf4j +public abstract class AbstractTCPAccessLogAnalyzer implements TCPAccessLogAnalyzer { + + /** + * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. + * + * @param entry the access log entry that is to be adapted from. + * @param sourceService the source service. + * @param targetService the target/destination service. + * @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. + */ + protected TCPLogEntry2MetricsAdapter newAdapter( + final TCPAccessLogEntry entry, + final ServiceMetaInfo sourceService, + final ServiceMetaInfo targetService) { + return new TCPLogEntry2MetricsAdapter(entry, sourceService, targetService); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java new file mode 100644 index 0000000000000000000000000000000000000000..c073b82f9a9d58b7630f6e580e9c33746509e21f --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPAccessLogAnalyzer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp; + +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer; + +public interface TCPAccessLogAnalyzer extends AccessLogAnalyzer { +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..e7d60d1dd5b2a07860db5de1f6f10df213449da1 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp; + +import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v3.ConnectionProperties; +import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.apm.network.common.v3.DetectPoint; +import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.apm.network.servicemesh.v3.TCPInfo; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +import static org.apache.skywalking.apm.util.StringUtil.isBlank; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.formatAsLong; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseInternalErrorCode; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.parseTLS; + +/** + * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders. + */ +@RequiredArgsConstructor +public class TCPLogEntry2MetricsAdapter { + + /** + * The access log entry that is to be adapted into metrics builders. + */ + protected final TCPAccessLogEntry entry; + + protected final ServiceMetaInfo sourceService; + + protected final ServiceMetaInfo targetService; + + /** + * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}. + * + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public ServiceMeshMetric.Builder adaptToDownstreamMetrics() { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getStartTime()); + final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + + return adaptCommonPart() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setLatency((int) Math.max(1L, duration)) + .setDetectPoint(DetectPoint.server); + } + + /** + * Adapt the {@code entry} into a upstream metrics {@link ServiceMeshMetric.Builder}. + * + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public ServiceMeshMetric.Builder adaptToUpstreamMetrics() { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getStartTime()); + final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); + final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); + + return adaptCommonPart() + .setStartTime(outboundStartTime) + .setEndTime(outboundEndTime) + .setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime)) + .setDetectPoint(DetectPoint.client); + } + + protected ServiceMeshMetric.Builder adaptCommonPart() { + final AccessLogCommon properties = entry.getCommonProperties(); + final ConnectionProperties connectionProperties = entry.getConnectionProperties(); + final String tlsMode = parseTLS(properties.getTlsProperties()); + final String internalErrorCode = parseInternalErrorCode(properties.getResponseFlags()); + + final ServiceMeshMetric.Builder builder = + ServiceMeshMetric.newBuilder() + .setTlsMode(tlsMode) + .setProtocol(Protocol.TCP) + .setStatus(isBlank(internalErrorCode)) + .setTcp( + TCPInfo.newBuilder() + .setReceivedBytes(connectionProperties.getReceivedBytes()) + .setSentBytes(connectionProperties.getSentBytes()) + ) + .setInternalErrorCode(internalErrorCode); + + Optional.ofNullable(sourceService) + .map(ServiceMetaInfo::getServiceName) + .ifPresent(builder::setSourceServiceName); + Optional.ofNullable(sourceService) + .map(ServiceMetaInfo::getServiceInstanceName) + .ifPresent(builder::setSourceServiceInstance); + Optional.ofNullable(targetService) + .map(ServiceMetaInfo::getServiceName) + .ifPresent(builder::setDestServiceName); + Optional.ofNullable(targetService) + .map(ServiceMetaInfo::getServiceInstanceName) + .ifPresent(builder::setDestServiceInstance); + + return builder; + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java new file mode 100644 index 0000000000000000000000000000000000000000..c3a8f363d6f884e90bd71153801f39f64f2d5e8a --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s; + +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SServiceRegistry; +import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer; + +import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS; + +/** + * Analysis log based on ingress and mesh scenarios. + */ +@Slf4j +public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer { + protected K8SServiceRegistry serviceRegistry; + + @Override + public String name() { + return "k8s-mesh"; + } + + @Override + @SneakyThrows + public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) { + serviceRegistry = new K8SServiceRegistry(config); + serviceRegistry.start(); + } + + @Override + public List analysis( + final List result, + final StreamAccessLogsMessage.Identifier identifier, + final TCPAccessLogEntry entry, + final Role role + ) { + if (isNotEmpty(result)) { + return result; + } + if (serviceRegistry.isEmpty()) { + return Collections.emptyList(); + } + switch (role) { + case PROXY: + return analyzeProxy(entry); + case SIDECAR: + return analyzeSideCar(entry); + } + + return Collections.emptyList(); + } + + protected List analyzeSideCar(final TCPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final String cluster = properties.getUpstreamCluster(); + if (cluster == null) { + return Collections.emptyList(); + } + + final List sources = new ArrayList<>(); + + final Address downstreamRemoteAddress = + properties.hasDownstreamDirectRemoteAddress() + ? properties.getDownstreamDirectRemoteAddress() + : properties.getDownstreamRemoteAddress(); + final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); + + if (cluster.startsWith("inbound|")) { + // Server side + final ServiceMeshMetric.Builder metrics; + if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { + // Ingress -> sidecar(server side) + // Mesh telemetry without source, the relation would be generated. + metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics(); + + log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics); + } else { + // sidecar -> sidecar(server side) + metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics(); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics); + } + sources.add(metrics); + } else if (cluster.startsWith("outbound|")) { + // sidecar(client side) -> sidecar + final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); + + final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics(); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + sources.add(metric); + } + + return sources; + } + + protected List analyzeProxy(final TCPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ? + properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress(); + final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) { + return Collections.emptyList(); + } + + final List result = new ArrayList<>(2); + final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); + final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); + + final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); + final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress()); + + final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics(); + + log.debug("Transformed ingress inbound mesh metric {}", metric); + result.add(metric); + + final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); + final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); + + final ServiceMeshMetric.Builder outboundMetric = + newAdapter(entry, ingress, targetService) + .adaptToUpstreamMetrics() + // Can't parse it from tls properties, leave it to Server side. + .setTlsMode(NON_TLS); + + log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); + result.add(outboundMetric); + + return result; + } + + /** + * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. + */ + protected ServiceMetaInfo find(String ip) { + return serviceRegistry.findService(ip); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java new file mode 100644 index 0000000000000000000000000000000000000000..1f4cdcc317f71591c04836f5c4211ce7b55909ae --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx; + +import com.google.protobuf.Any; +import com.google.protobuf.TextFormat; +import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import org.apache.skywalking.oap.server.receiver.envoy.als.mx.FieldsHelper; +import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter; +import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.AbstractTCPAccessLogAnalyzer; + +import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS; +import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN; +import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.DOWNSTREAM_KEY; +import static org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer.UPSTREAM_KEY; + +@Slf4j +public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyzer { + protected String fieldMappingFile = "metadata-service-mapping.yaml"; + + protected EnvoyMetricReceiverConfig config; + + @Override + public String name() { + return "mx-mesh"; + } + + @Override + public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { + this.config = config; + try { + FieldsHelper.SINGLETON.init(fieldMappingFile, config.serviceMetaInfoFactory().clazz()); + } catch (final Exception e) { + throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e); + } + } + + @Override + public List analysis( + final List result, + final StreamAccessLogsMessage.Identifier identifier, + final TCPAccessLogEntry entry, + final Role role + ) { + if (isNotEmpty(result)) { + return result; + } + if (!entry.hasCommonProperties()) { + return Collections.emptyList(); + } + final AccessLogCommon properties = entry.getCommonProperties(); + final Map stateMap = properties.getFilterStateObjectsMap(); + if (stateMap.isEmpty()) { + return Collections.emptyList(); + } + final ServiceMetaInfo currSvc; + try { + currSvc = adaptToServiceMetaInfo(identifier); + } catch (Exception e) { + log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e); + return Collections.emptyList(); + } + + final AtomicBoolean downstreamExists = new AtomicBoolean(); + stateMap.forEach((key, value) -> { + if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) { + return; + } + final ServiceMetaInfo svc; + try { + svc = adaptToServiceMetaInfo(value); + } catch (Exception e) { + log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray())); + return; + } + final ServiceMeshMetric.Builder metrics; + switch (key) { + case UPSTREAM_KEY: + metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics)); + } + result.add(metrics); + break; + case DOWNSTREAM_KEY: + metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics(); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics)); + } + result.add(metrics); + downstreamExists.set(true); + break; + } + }); + if (role.equals(Role.PROXY) && !downstreamExists.get()) { + final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics(); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric)); + } + result.add(metric); + } + return result; + } + + protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception { + return new ServiceMetaInfoAdapter(value); + } + + protected ServiceMetaInfo adaptToServiceMetaInfo(final StreamAccessLogsMessage.Identifier identifier) throws Exception { + return config.serviceMetaInfoFactory().fromStruct(identifier.getNode().getMetadata()); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java index ffcef3927906c4cda08ba02325caf64cf7be3d33..030fa373e0d49451088ed94ba5908b81c071b826 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java @@ -18,11 +18,16 @@ package org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters; +import com.google.common.base.Splitter; import io.prometheus.client.Metrics; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge; +import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram; import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric; import static java.util.stream.Collectors.toMap; @@ -42,6 +47,17 @@ public class ProtoMetricFamily2MetricsAdapter { .timestamp(adaptTimestamp(it)) .labels(adaptLabels(it)) .build()); + case HISTOGRAM: + return metricFamily.getMetricList() + .stream() + .map(it -> Histogram.builder() + .name(adaptMetricsName(it)) + .labels(adaptLabels(it)) + .sampleCount(it.getHistogram().getSampleCount()) + .sampleSum(it.getHistogram().getSampleSum()) + .buckets(buildBuckets(it.getHistogram().getBucketList())) + .build() + ); default: return Stream.of(); } @@ -49,6 +65,13 @@ public class ProtoMetricFamily2MetricsAdapter { @SuppressWarnings("unused") public String adaptMetricsName(final Metrics.Metric metric) { + if (metricFamily.getName().startsWith("cluster.inbound|")) { + return Splitter.on(".") + .splitToList(metricFamily.getName()) + .stream() + .filter(it -> !it.startsWith("inbound|")) + .collect(Collectors.joining("_")); + } return metricFamily.getName(); } @@ -81,4 +104,12 @@ public class ProtoMetricFamily2MetricsAdapter { return timestamp; } + + private static Map buildBuckets(List buckets) { + Map result = new HashMap<>(); + for (final Metrics.Bucket bucket : buckets) { + result.put(bucket.getUpperBound(), bucket.getCumulativeCount()); + } + return result; + } } diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer similarity index 80% rename from test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml rename to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer index 1abf02cff85d88424b9f7ca1ae4e37f8d6ef1d08..5290deb529c98350be4faaa7f326fc3460417e41 100644 --- a/test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,11 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# +# -instances: - - key: not null - label: not null - - key: not null - label: not null - - key: not null - label: not null +org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis +org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java index c1da14b05f2029b7beec6cea7fc425cdc0d9e1b5..9af5fac86ef51e6bbedcd58ab5069d68051114b0 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java @@ -148,6 +148,8 @@ public class TelemetryDataDispatcher { service.setResponseCode(metrics.getResponseCode()); service.setType(protocol2Type(metrics.getProtocol())); service.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + service.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + service.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(service); } @@ -170,6 +172,8 @@ public class TelemetryDataDispatcher { serviceRelation.setComponentId(protocol2Component(metrics.getProtocol())); serviceRelation.setTlsMode(metrics.getTlsMode()); serviceRelation.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + serviceRelation.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + serviceRelation.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(serviceRelation); } @@ -186,6 +190,8 @@ public class TelemetryDataDispatcher { serviceInstance.setResponseCode(metrics.getResponseCode()); serviceInstance.setType(protocol2Type(metrics.getProtocol())); serviceInstance.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + serviceInstance.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + serviceInstance.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(serviceInstance); } @@ -208,6 +214,8 @@ public class TelemetryDataDispatcher { serviceRelation.setComponentId(protocol2Component(metrics.getProtocol())); serviceRelation.setTlsMode(metrics.getTlsMode()); serviceRelation.getSideCar().setInternalErrorCode(metrics.getInternalErrorCode()); + serviceRelation.getTcpInfo().setReceivedBytes(metrics.getTcp().getReceivedBytes()); + serviceRelation.getTcpInfo().setSentBytes(metrics.getTcp().getSentBytes()); SOURCE_RECEIVER.receive(serviceRelation); } @@ -234,6 +242,8 @@ public class TelemetryDataDispatcher { return RequestType.gRPC; case HTTP: return RequestType.HTTP; + case TCP: + return RequestType.TCP; case UNRECOGNIZED: default: return RequestType.RPC; @@ -248,6 +258,9 @@ public class TelemetryDataDispatcher { case HTTP: // HTTP in component-libraries.yml return 49; + case TCP: + // TCP in component-libraries.yml + return 110; case UNRECOGNIZED: default: // RPC in component-libraries.yml diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java index ec69955f4a8f6d6ccab5b8fc5a405d5d122633a0..65c5d02cd8b06b490885e132fbb1893d996001f7 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java @@ -49,7 +49,9 @@ public class SQLExecutor implements InsertRequest, UpdateRequest { preparedStatement.setObject(i + 1, param.get(i)); } - LOGGER.debug("execute sql in batch: {}", sql); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param); + } preparedStatement.execute(); } } diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java index a3b89bcd711b01f60b0a55022f05d53e4060c460..d9b6dc462ea36b269c75097ab02a9747c098ac11 100644 --- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java +++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java @@ -154,9 +154,6 @@ public class ALSE2E extends SkyWalkingTestAdapter { LOGGER.info("instances: {}", instances); String file = "expected/als/instances.yml"; - if (service.getLabel().equals("e2e::reviews")) { - file = "expected/als/instances-reviews.yml"; - } load(file).as(InstancesMatcher.class).verify(instances); return instances; diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/services.yml b/test/e2e/e2e-test/src/test/resources/expected/als/services.yml index 37f76cfd7654be5985cee50a4a4f64af0554509e..5d2d4b410d445f83c3a7f6466a440054672d14b0 100644 --- a/test/e2e/e2e-test/src/test/resources/expected/als/services.yml +++ b/test/e2e/e2e-test/src/test/resources/expected/als/services.yml @@ -24,3 +24,5 @@ services: label: e2e::details - key: not null label: e2e::istio-ingressgateway + - key: not null + label: e2e::mongodb