diff --git a/CHANGES.md b/CHANGES.md index 995cce823db56f08c9b667bd4cb8ffdecf68f9c1..f1efebd6854476a21a4e3427f8eeac0e744405ac 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,6 +29,7 @@ Release Notes. * Fix possible NullPointerException in agent's ES plugin. * Fix the conversion problem of float type in ConfigInitializer. * Fixed part of the dynamic configuration of ConfigurationDiscoveryService that does not take effect under certain circumstances. +* Support `native-json` format log in kafka-fetcher-plugin. #### OAP-Backend * BugFix: filter invalid Envoy access logs whose socket address is empty. diff --git a/docs/en/protocols/Log-Data-Protocol.md b/docs/en/protocols/Log-Data-Protocol.md index a7fde723040d1bd9da836bdacee8aaaf78015273..565bc97be23efff7342192ec7461a3d5b26f788a 100644 --- a/docs/en/protocols/Log-Data-Protocol.md +++ b/docs/en/protocols/Log-Data-Protocol.md @@ -2,4 +2,44 @@ Report log data via protocol. +## Native Proto Protocol + +Report `native-proto` format log via gRPC. + [gRPC service define](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto) + +## Native Json Protocol + +Report `native-json` format log via kafka. + +Json log record example: +```json +{ + "timestamp":1618161813371, + "service":"Your_ApplicationName", + "serviceInstance":"3a5b8da5a5ba40c0b192e91b5c80f1a8@192.168.1.8", + "traceContext":{ + "traceId":"ddd92f52207c468e9cd03ddd107cd530.69.16181331190470001", + "spanId":"0", + "traceSegmentId":"ddd92f52207c468e9cd03ddd107cd530.69.16181331190470000" + }, + "tags":{ + "data":[ + { + "key":"level", + "value":"INFO" + }, + { + "key":"logger", + "value":"com.example.MyLogger" + } + ] + }, + "body":{ + "text":{ + "text":"log message" + } + } +} +``` + diff --git a/docs/en/setup/backend/backend-fetcher.md b/docs/en/setup/backend/backend-fetcher.md index 4b1dfdf6208a882e0fb55f0d9fb15341b569a00f..605e314f5f2fb374a08835523e499fb2dd5dd980 100644 --- a/docs/en/setup/backend/backend-fetcher.md +++ b/docs/en/setup/backend/backend-fetcher.md @@ -95,8 +95,8 @@ kafka-fetcher: namespace: ${SW_NAMESPACE:""} ``` -`skywalking-segments`, `skywalking-metrics`, `skywalking-profile`, `skywalking-managements`, `skywalking-meters` -and `skywalking-logs` topics are required by `kafka-fetcher`. +`skywalking-segments`, `skywalking-metrics`, `skywalking-profile`, `skywalking-managements`, `skywalking-meters`, `skywalking-logs` +and `skywalking-logs-json` topics are required by `kafka-fetcher`. If they do not exist, Kafka Fetcher will create them in default. Also, you can create them by yourself before the OAP server started. When using the OAP server automatic creation mechanism, you could modify the number of partitions and replications of the topics through the following configurations: diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index 67100d39f8ab37d936379922b2f925fd59bcea36..e960405b2065dcab6f01055f78c7333e1657919d 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -219,7 +219,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | createTopicIfNotExist | If true, create the Kafka topic when it does not exist. | - | true | | - | - | partitions | The number of partitions for the topic being created. | SW_KAFKA_FETCHER_PARTITIONS | 3 | | - | - | enableMeterSystem | To enable to fetch and handle [Meter System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false | -| - | - | enableLog | To enable to fetch and handle log data. | SW_KAFKA_FETCHER_ENABLE_LOG | false | +| - | - | enableNativeProtoLog | To enable to fetch and handle native proto log data. | SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG | false | +| - | - | enableNativeJsonLog | To enable to fetch and handle native json log data. | SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG | false | | - | - | replicationFactor | The replication factor for each partition in the topic being created. | SW_KAFKA_FETCHER_PARTITIONS_FACTOR | 2 | | - | - | kafkaHandlerThreadPoolSize | Pool size of kafka message handler executor. | SW_KAFKA_HANDLER_THREAD_POOL_SIZE | CPU core * 2 | | - | - | kafkaHandlerThreadPoolQueueSize | The queue size of kafka message handler executor. | SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE | 10000 | @@ -228,7 +229,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | topicNameOfProfiling | Specifying Kafka topic name for Profiling data. | - | skywalking-profilings | | - | - | topicNameOfTracingSegments | Specifying Kafka topic name for Tracing data. | - | skywalking-segments | | - | - | topicNameOfManagements | Specifying Kafka topic name for service instance reporting and registering. | - | skywalking-managements | -| - | - | topicNameOfLogs | Specifying Kafka topic name for log data. | - | skywalking-logs | +| - | - | topicNameOfLogs | Specifying Kafka topic name for native proto log data. | - | skywalking-logs | +| - | - | topicNameOfJsonLogs | Specifying Kafka topic name for native json log data. | - | skywalking-logs-json | | receiver-browser | default | Read [receiver doc](backend-receivers.md) for more details | - | - | - | | - | - | sampleRate | Sampling rate for receiving trace. The precision is 1/10000. 10000 means 100% sample in default. | SW_RECEIVER_BROWSER_SAMPLE_RATE | 10000 | | query | graphql | - | GraphQL query implementation | - | diff --git a/docs/en/setup/backend/log-analyzer.md b/docs/en/setup/backend/log-analyzer.md index 4f1c88cbbd9a07b497ac8e08485fc6f2a63820fa..f82746935ea60c76affefc5580a7d8f3c8ecfb8d 100644 --- a/docs/en/setup/backend/log-analyzer.md +++ b/docs/en/setup/backend/log-analyzer.md @@ -11,8 +11,30 @@ Java agent provides toolkit for to report logs through gRPC with automatic injected trace context. [SkyWalking Satellite sidecar](https://github.com/apache/skywalking-satellite) is a recommended proxy/side to -forward logs including to use Kafka MQ to transport logs. When use this, need to open [kafka-fetcher](backend-fetcher.md#kafka-fetcher). - +forward logs including to use Kafka MQ to transport logs. When use this, need to open [kafka-fetcher](backend-fetcher.md#kafka-fetcher) +and enable configs `enableNativeProtoLog`. + +### Log files collector + +Java agent provides toolkit for +[log4j](../service-agent/java-agent/Application-toolkit-log4j-1.x.md#print-skywalking-context-in-your-logs), +[log4j2](../service-agent/java-agent/Application-toolkit-log4j-2.x.md#print-skywalking-context-in-your-logs), +[logback](../service-agent/java-agent/Application-toolkit-logback-1.x.md#print-skywalking-context-in-your-logs) +to report logs through files with automatic injected trace context. + +Log framework config examples: +- [log4j1.x fileAppender](../../../../test/e2e/e2e-service-provider/src/main/resources/log4j.properties) +- [log4j2.x fileAppender](../../../../test/e2e/e2e-service-provider/src/main/resources/log4j2.xml) +- [logback fileAppender](../../../../test/e2e/e2e-service-provider/src/main/resources/logback.xml) + +You can use [Filebeat](https://www.elastic.co/cn/beats/filebeat) 、[Fluentd](https://fluentd.org) to +collect file logs including to use Kafka MQ to transport [native-json](../../protocols/Log-Data-Protocol.md#Native-Json-Protocol) +format logs. When use this, need to open [kafka-fetcher](backend-fetcher.md#kafka-fetcher) +and enable configs `enableNativeJsonLog`. + +Collector config examples: +- [filebeat.yml](../../../../test/e2e/e2e-test/docker/kafka/filebeat.yml) +- [fluentd.conf](../../../../test/e2e/e2e-test/docker/kafka/fluentd.conf) ## Log Analyzer diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 7e5ed87ca159f6c8ba1060e650c47f00b42b1ad9..4692a2d1981242f585a4337a90e0acdb1d259796 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -347,7 +347,8 @@ kafka-fetcher: partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3} replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2} enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false} - enableLog: ${SW_KAFKA_FETCHER_ENABLE_LOG:false} + enableNativeProtoLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG:false} + enableNativeJsonLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG:false} isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false} consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""} kafkaHandlerThreadPoolSize: ${SW_KAFKA_HANDLER_THREAD_POOL_SIZE:-1} diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java index 3b324ed4ff984ce7bf8f89566577f3bf6180054a..63a3c0ea0aacc66ec19897f97ef0310a41e8db79 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java @@ -81,7 +81,9 @@ public class KafkaFetcherHandlerRegister implements Runnable { config.getTopicNameOfMetrics(), config.getTopicNameOfProfiling(), config.getTopicNameOfTracingSegments(), - config.getTopicNameOfMeters() + config.getTopicNameOfMeters(), + config.getTopicNameOfLogs(), + config.getTopicNameOfJsonLogs() )) .values() .entrySet() diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java index bb6f5470e7c0838583dfe913ffaee68c1b80ace2..e173513b39a333677bf07da7a89d87cd39cee91c 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java @@ -69,7 +69,9 @@ public class KafkaFetcherConfig extends ModuleConfig { private boolean enableMeterSystem = false; - private boolean enableLog = false; + private boolean enableNativeProtoLog = false; + + private boolean enableNativeJsonLog = false; private String configPath = "meter-analyzer-config"; @@ -85,6 +87,8 @@ public class KafkaFetcherConfig extends ModuleConfig { private String topicNameOfLogs = "skywalking-logs"; + private String topicNameOfJsonLogs = "skywalking-logs-json"; + private int kafkaHandlerThreadPoolSize; private int kafkaHandlerThreadPoolQueueSize; diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java index 367edf267aa1f84fd5d72274c61d8574fcfc6ce4..94be3913fd23196cad0c3c474509b11185c87514 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/KafkaFetcherProvider.java @@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.analyzer.agent.kafka.KafkaFetcherHandler import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig; import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherModule; import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.JVMMetricsHandler; +import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.JsonLogHandler; import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.LogHandler; import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.MeterServiceHandler; import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.ProfileTaskHandler; @@ -77,9 +78,13 @@ public class KafkaFetcherProvider extends ModuleProvider { if (config.isEnableMeterSystem()) { handlerRegister.register(new MeterServiceHandler(getManager(), config)); } - if (config.isEnableLog()) { + if (config.isEnableNativeProtoLog()) { handlerRegister.register(new LogHandler(getManager(), config)); } + if (config.isEnableNativeJsonLog()) { + handlerRegister.register(new JsonLogHandler(getManager(), config)); + } + handlerRegister.start(); } diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..83f2197726f98cd2e9f470f8912888b44fc0f37e --- /dev/null +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler; + +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.skywalking.apm.network.logging.v3.LogData; +import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils; + +@Slf4j +public class JsonLogHandler extends LogHandler { + + private final KafkaFetcherConfig config; + + public JsonLogHandler(ModuleManager moduleManager, KafkaFetcherConfig config) { + super(moduleManager, config); + this.config = config; + } + + @Override + public String getTopic() { + return config.getTopicNameOfJsonLogs(); + } + + @Override + protected String getProtocolName() { + return "kafka-fetcher-native-json"; + } + + @Override + protected LogData parseConsumerRecord(ConsumerRecord record) throws IOException { + LogData.Builder logDataBuilder = LogData.newBuilder(); + ProtoBufJsonUtils.fromJSON(record.value().toString(), logDataBuilder); + return logDataBuilder.build(); + } +} diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java index 1cb4f427d281860276fa8f4960a5974975c77a83..3a9c07d0401c078033f8c05c324a6aa891aee699 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java @@ -51,11 +51,11 @@ public class LogHandler implements KafkaHandler { .getService(MetricsCreator.class); histogram = metricsCreator.createHistogramMetric( "log_in_latency", "The process latency of log", - new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka-fetcher") + new MetricsTag.Keys("protocol"), new MetricsTag.Values(getProtocolName()) ); errorCounter = metricsCreator.createCounter("log_analysis_error_count", "The error number of log analysis", new MetricsTag.Keys("protocol"), - new MetricsTag.Values("kafka-fetcher") + new MetricsTag.Values(getProtocolName()) ); } @@ -73,7 +73,7 @@ public class LogHandler implements KafkaHandler { public void handle(final ConsumerRecord record) { HistogramMetrics.Timer timer = histogram.createTimer(); try { - LogData logData = LogData.parseFrom(record.value().get()); + LogData logData = parseConsumerRecord(record); logAnalyzerService.doAnalysis(logData); } catch (Exception e) { errorCounter.inc(); @@ -82,4 +82,12 @@ public class LogHandler implements KafkaHandler { timer.finish(); } } + + protected String getProtocolName() { + return "kafka-fetcher-native-proto"; + } + + protected LogData parseConsumerRecord(ConsumerRecord record) throws Exception { + return LogData.parseFrom(record.value().get()); + } } diff --git a/test/e2e/e2e-service-provider/src/main/java/org/apache/skywalking/e2e/controller/FileLogController.java b/test/e2e/e2e-service-provider/src/main/java/org/apache/skywalking/e2e/controller/FileLogController.java new file mode 100644 index 0000000000000000000000000000000000000000..34e293e57757b54b177ad55d235d425c6283aed0 --- /dev/null +++ b/test/e2e/e2e-service-provider/src/main/java/org/apache/skywalking/e2e/controller/FileLogController.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.e2e.controller; + +import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.skywalking.apm.toolkit.trace.TraceContext; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class FileLogController { + + private static final Logger LOG4J_LOGGER = Logger.getLogger("fileLogger"); + private static final org.apache.logging.log4j.Logger LOG4J2_LOGGER = LogManager.getLogger("fileLogger"); + private static final org.slf4j.Logger LOGBACK_LOGGER = LoggerFactory.getLogger("fileLogger"); + + @RequestMapping(value = "/file/logs/trigger") + public String trigger() { + LOG4J_LOGGER.info("log4j fileLogger ==> mills: " + System.currentTimeMillis()); + LOG4J2_LOGGER.info("log4j2 fileLogger ==> mills: " + System.currentTimeMillis()); + LOGBACK_LOGGER.info("logback fileLogger ==> mills: {}", System.currentTimeMillis()); + return TraceContext.traceId(); + } +} diff --git a/test/e2e/e2e-service-provider/src/main/resources/log4j.properties b/test/e2e/e2e-service-provider/src/main/resources/log4j.properties index d2fc80fd0e9cd9f719a86944417c67920bbbdc22..56a721e0adfd4efb28e4a01f8907ced6d1fdd416 100644 --- a/test/e2e/e2e-service-provider/src/main/resources/log4j.properties +++ b/test/e2e/e2e-service-provider/src/main/resources/log4j.properties @@ -16,3 +16,11 @@ log4j.rootLogger=info,CustomAppender log4j.appender.CustomAppender=org.apache.skywalking.apm.toolkit.log.log4j.v1.x.log.GRPCLogClientAppender log4j.appender.CustomAppender.layout=org.apache.log4j.PatternLayout log4j.appender.CustomAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n + +log4j.logger.fileLogger=info,FileAppender +log4j.appender.FileAppender=org.apache.log4j.FileAppender +log4j.appender.FileAppender.ImmediateFlush=true +log4j.appender.FileAppender.Append=true +log4j.appender.FileAppender.File=/tmp/skywalking-logs/log4j1/e2e-service-provider.log +log4j.appender.FileAppender.layout=org.apache.skywalking.apm.toolkit.log.log4j.v1.x.TraceIdPatternLayout +log4j.appender.FileAppender.layout.ConversionPattern=[%T{SW_CTX}] [%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c:%L - %m%n \ No newline at end of file diff --git a/test/e2e/e2e-service-provider/src/main/resources/log4j2.xml b/test/e2e/e2e-service-provider/src/main/resources/log4j2.xml index dc622bc07564632215c3d53ab838d2a32f55ecbf..65b0bbd431ee4b344b96ffe7f6bcae379196ea71 100644 --- a/test/e2e/e2e-service-provider/src/main/resources/log4j2.xml +++ b/test/e2e/e2e-service-provider/src/main/resources/log4j2.xml @@ -23,6 +23,11 @@ + + + [%sw_ctx] [%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c:%L - %m%n + + @@ -32,5 +37,8 @@ + + + diff --git a/test/e2e/e2e-service-provider/src/main/resources/logback.xml b/test/e2e/e2e-service-provider/src/main/resources/logback.xml index 16c9064a55066285fdf6564bec86611f1275b718..e091cdac16a5b287a8e68667f131234bcb237651 100644 --- a/test/e2e/e2e-service-provider/src/main/resources/logback.xml +++ b/test/e2e/e2e-service-provider/src/main/resources/logback.xml @@ -33,8 +33,20 @@ + + /tmp/skywalking-logs/logback/e2e-service-provider.log + + + [%sw_ctx] [%level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger:%line - %msg%n + + + + + + + diff --git a/test/e2e/e2e-test/docker/kafka/Dockerfile.fluentd b/test/e2e/e2e-test/docker/kafka/Dockerfile.fluentd new file mode 100644 index 0000000000000000000000000000000000000000..01d09480ffadcad590e9bac308eeeb24346f6e7f --- /dev/null +++ b/test/e2e/e2e-test/docker/kafka/Dockerfile.fluentd @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM bitnami/fluentd:1.12.1 + +## Install output kafka plugins +RUN fluent-gem install 'fluent-plugin-kafka' diff --git a/test/e2e/e2e-test/docker/kafka/docker-compose.log.yml b/test/e2e/e2e-test/docker/kafka/docker-compose.log.yml index 8df544e1fb51204589c992a028d337c5f874e3f8..54de06f112bdcc86f29b7fd3b236ea6c03119144 100644 --- a/test/e2e/e2e-test/docker/kafka/docker-compose.log.yml +++ b/test/e2e/e2e-test/docker/kafka/docker-compose.log.yml @@ -25,7 +25,9 @@ services: SW_KAFKA_FETCHER_SERVERS: broker-a:9092,broker-b:9092 SW_KAFKA_FETCHER_PARTITIONS: 2 SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1 - SW_KAFKA_FETCHER_ENABLE_LOG: "true" + SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG: "true" + SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG: "true" + SW_SEARCHABLE_LOGS_TAG_KEYS: level,logger,agent depends_on: broker-a: condition: service_healthy @@ -56,16 +58,62 @@ services: broker-b: condition: service_healthy + filebeat: + image: elastic/filebeat:7.12.0 + command: -e --strict.perms=false + networks: + - e2e + volumes: + - ./filebeat.yml:/usr/share/filebeat/filebeat.yml + - /tmp/skywalking-logs/:/tmp/skywalking-logs/ + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5066"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + broker-a: + condition: service_healthy + broker-b: + condition: service_healthy + + fluentd: + build: + context: ../../../ + dockerfile: e2e-test/docker/kafka/Dockerfile.fluentd + networks: + - e2e + volumes: + - ./fluentd.conf:/opt/bitnami/fluentd/conf/fluentd.conf + - /tmp/skywalking-logs/:/tmp/skywalking-logs/ + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/24220"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + broker-a: + condition: service_healthy + broker-b: + condition: service_healthy + provider: extends: file: ../base-compose.yml service: provider environment: SW_GRPC_LOG_SERVER_HOST: sw-satellite + volumes: + - /tmp/skywalking-logs/:/tmp/skywalking-logs/ depends_on: oap: condition: service_healthy sw-satellite: condition: service_healthy + filebeat: + condition: service_healthy + fluentd: + condition: service_healthy + networks: e2e: diff --git a/test/e2e/e2e-test/docker/kafka/filebeat.yml b/test/e2e/e2e-test/docker/kafka/filebeat.yml new file mode 100644 index 0000000000000000000000000000000000000000..3c4c9a22d13e7c0da1dcc055c42aa697aac692cc --- /dev/null +++ b/test/e2e/e2e-test/docker/kafka/filebeat.yml @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +http.enabled: true +filebeat.inputs: + - type: log + enabled: true + paths: + - /tmp/skywalking-logs/*/e2e-service-provider.log + +processors: + - dissect: + tokenizer: '[SW_CTX:%{SW_CTX}] [%{level}] %{logtime} [%{thread}] %{logger}:%{line} - %{body}' + field: "message" + target_prefix: "" + trim_values: all + overwrite_keys: true + - dissect: + tokenizer: '[%{service},%{serviceInstance},%{traceContext.traceId},%{traceContext.traceSegmentId},%{traceContext.spanId}]' + field: "SW_CTX" + target_prefix: "" + trim_values: all + - script: + lang: javascript + id: format_datetime + source: > + function process(event) { + var datetime = event.Get("logtime"); + var datetimeArr = datetime.split(" "); + var dateArr = datetimeArr[0].split("-"); + var timeArrs = datetimeArr[1].split("."); + var timeArr = timeArrs[0].split(":"); + var mills = timeArrs[1]; + + var date = new Date(); + date.setUTCFullYear(parseInt(dateArr[0])); + date.setUTCMonth(parseInt(dateArr[1]) - 1); + date.setUTCDate(parseInt(dateArr[2])); + date.setUTCHours(parseInt(timeArr[0])); + date.setUTCMinutes(parseInt(timeArr[1])); + date.setUTCSeconds(parseInt(timeArr[2])); + date.setUTCMilliseconds(parseInt(mills)); + + var timeMillis = date.getTime(); + event.Put("timestamp", timeMillis); + } + - script: + lang: javascript + id: build_logTags_array + source: > + function process(event) { + var logLevel = {"key": "level", "value": event.Get("level")}; + var logThread = {"key": "thread", "value": event.Get("thread")}; + var logLogger = {"key": "logger", "value": event.Get("logger")}; + + var logFile = {"key": "logfile", "value": event.Get("log.file.path")}; + var hostname = {"key": "hostname", "value": event.Get("agent.hostname")}; + var agent = {"key": "agent", "value": event.Get("agent.type")}; + var agentVersion = {"key": "agentVersion", "value": event.Get("agent.version")}; + var agentId = {"key": "agentId", "value": event.Get("agent.id")}; + var agentName = {"key": "agentName", "value": event.Get("agent.name")}; + + var logTags = [logLevel,logThread,logLogger,logFile,hostname,agent,agentVersion,agentId,agentName]; + event.Put("tags.data", logTags); + } + - rename: + fields: + - from: "body" + to: "body.text.text" + ignore_missing: false + fail_on_error: true + - drop_fields: + fields: ["ecs", "host", "input", "agent", "log", "message", "SW_CTX", "level", "thread", "logger", "line", "logtime"] + +output.kafka: + hosts: ["broker-a:9092", "broker-b:9092"] + topic: "skywalking-logs-json" diff --git a/test/e2e/e2e-test/docker/kafka/fluentd.conf b/test/e2e/e2e-test/docker/kafka/fluentd.conf new file mode 100644 index 0000000000000000000000000000000000000000..20f6a7018c404e53ec2a43f9b2f6f77efb196c7f --- /dev/null +++ b/test/e2e/e2e-test/docker/kafka/fluentd.conf @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + + @type monitor_agent + bind 0.0.0.0 + port 24220 + + + + @type tail + path /tmp/skywalking-logs/*/e2e-service-provider.log + read_from_head true + tag skywalking_log + refresh_interval 1 + + @type regexp + expression /^\[SW_CTX:\s*(?[^ ]*]*)\] \[(?[^ ]*)\] (?[^\]]*) \[(?[^ ]*)\] (?[^\]]*):(?[^\]]*) - (?[^\]]*)$/ + + + + + @type parser + key_name SW_CTX + reserve_data true + remove_key_name_field true + + @type regexp + expression /^\[(?[^\]]*),(?[^\]]*),(?[^\]]*),(?[^\]]*),(?[^\]]*)\]$/ + + + + + @type record_transformer + enable_ruby true + + traceContext ${{"traceId" => record["traceId"], "traceSegmentId" => record["traceSegmentId"], "spanId" => record["spanId"]}} + tags ${{"data" => [{"key" => "level", "value" => record["level"]}, {"key" => "thread", "value" => record["thread"]}, {"key" => "logger", "value" => record["logger"]}, {"key" => "agent", "value" => "fluentd"}]}} + body ${{"text" => {"text" => record["body"]}}} + timestamp ${DateTime.strptime(record["logtime"], '%Y-%m-%d %H:%M:%S.%L').strftime('%Q')} + + remove_keys level,thread,logger,line,traceId,traceSegmentId,spanId,logtime + + + + @type kafka2 + brokers broker-a:9092,broker-b:9092 + default_topic skywalking-logs-json + + @type json + + diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/kafka/KafkaLogE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/kafka/KafkaLogE2E.java index beb9f2135e4cf0a51300d3345dbfece93778189e..fc6d536b65d203a56b11ef2bf417cbcc75e5ff5b 100644 --- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/kafka/KafkaLogE2E.java +++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/kafka/KafkaLogE2E.java @@ -68,6 +68,7 @@ public class KafkaLogE2E extends SkyWalkingTestAdapter { public void setUp() throws Exception { queryClient(swWebappHostPort); trafficController(serviceHostPort, "/logs/trigger"); + trafficController(serviceHostPort, "/file/logs/trigger"); } @AfterAll @@ -107,6 +108,36 @@ public class KafkaLogE2E extends SkyWalkingTestAdapter { load("expected/log/logs.yml").as(LogsMatcher.class).verifyLoosely(logs); } + @RetryableTest + public void verifyLogFromFilebeat() throws Exception { + final String agent = "filebeat"; + verifyLogFrom(agent, "log4j fileLogger"); + verifyLogFrom(agent, "log4j2 fileLogger"); + verifyLogFrom(agent, "logback fileLogger"); + } + + @RetryableTest + public void verifyLogFromFluentd() throws Exception { + final String agent = "fluentd"; + verifyLogFrom(agent, "log4j fileLogger"); + verifyLogFrom(agent, "log4j2 fileLogger"); + verifyLogFrom(agent, "logback fileLogger"); + } + + private void verifyLogFrom(String agent, String keyword) throws Exception { + LogsQuery logsQuery = new LogsQuery().serviceId("WW91cl9BcHBsaWNhdGlvbk5hbWU=.1") + .addTag("agent", agent) + .start(startTime) + .end(Times.now()); + if (graphql.supportQueryLogsByKeywords()) { + logsQuery.keywordsOfContent(keyword); + } + final List logs = graphql.logs(logsQuery); + LOGGER.info("logs: {}", logs); + + load("expected/log/logs.yml").as(LogsMatcher.class).verifyLoosely(logs); + } + private void verifyServiceInstances(final Service service) throws Exception { final Instances instances = graphql.instances( new InstancesQuery().serviceId(service.getKey()).start(startTime).end(Times.now()));