Unverified commit 47fef5df authored by hailin0, committed by GitHub

Support native-json format log in kafka-fetcher-plugin (#6952)

Parent 015d3877
......@@ -29,6 +29,7 @@ Release Notes.
* Fix possible NullPointerException in agent's ES plugin.
* Fix the conversion problem of float type in ConfigInitializer.
* Fixed an issue where part of the dynamic configuration of ConfigurationDiscoveryService did not take effect under certain circumstances.
* Support `native-json` format log in kafka-fetcher-plugin.
#### OAP-Backend
* BugFix: filter invalid Envoy access logs whose socket address is empty.
......
......@@ -2,4 +2,44 @@
Report log data via the following protocols.
## Native Proto Protocol
Report logs in `native-proto` format via gRPC.
[gRPC service definition](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto)
## Native Json Protocol
Report logs in `native-json` format via Kafka.
JSON log record example:
```json
{
"timestamp":1618161813371,
"service":"Your_ApplicationName",
"serviceInstance":"3a5b8da5a5ba40c0b192e91b5c80f1a8@192.168.1.8",
"traceContext":{
"traceId":"ddd92f52207c468e9cd03ddd107cd530.69.16181331190470001",
"spanId":"0",
"traceSegmentId":"ddd92f52207c468e9cd03ddd107cd530.69.16181331190470000"
},
"tags":{
"data":[
{
"key":"level",
"value":"INFO"
},
{
"key":"logger",
"value":"com.example.MyLogger"
}
]
},
"body":{
"text":{
"text":"log message"
}
}
}
```
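A minimal Java sketch (not part of this commit) showing how such a record could be published with the plain Kafka client. The broker address `localhost:9092` is an assumption; the topic default `skywalking-logs-json` comes from this change.
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NativeJsonLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // The value is the JSON document shown above; the OAP's JsonLogHandler
        // parses it into a LogData message via ProtoBufJsonUtils.fromJSON.
        String json = "{\"timestamp\":1618161813371,"
                + "\"service\":\"Your_ApplicationName\","
                + "\"serviceInstance\":\"3a5b8da5a5ba40c0b192e91b5c80f1a8@192.168.1.8\","
                + "\"body\":{\"text\":{\"text\":\"log message\"}}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("skywalking-logs-json", json));
        }
    }
}
```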
......@@ -95,8 +95,8 @@ kafka-fetcher:
namespace: ${SW_NAMESPACE:""}
```
`skywalking-segments`, `skywalking-metrics`, `skywalking-profile`, `skywalking-managements`, `skywalking-meters`
and `skywalking-logs` topics are required by `kafka-fetcher`.
`skywalking-segments`, `skywalking-metrics`, `skywalking-profile`, `skywalking-managements`, `skywalking-meters`, `skywalking-logs`
and `skywalking-logs-json` topics are required by `kafka-fetcher`.
If they do not exist, Kafka Fetcher will create them by default. You can also create them yourself before the OAP server starts.
When relying on the OAP server's automatic creation mechanism, you can modify the number of partitions and replicas of the topics through the following configurations:
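For reference, a trimmed sketch of the relevant keys, mirroring the `backend.yml` hunk later in this commit (the `selector` line is an assumption about how the module is switched on):
```yaml
kafka-fetcher:
  selector: ${SW_KAFKA_FETCHER:default}  # assumption: enable the module by selecting "default"
  default:
    partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
    replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
```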
......
......@@ -219,7 +219,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | createTopicIfNotExist | If true, create the Kafka topic when it does not exist. | - | true |
| - | - | partitions | The number of partitions for the topic being created. | SW_KAFKA_FETCHER_PARTITIONS | 3 |
| - | - | enableMeterSystem | Enable fetching and handling of [Meter System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false |
| - | - | enableLog | Enable fetching and handling of log data. | SW_KAFKA_FETCHER_ENABLE_LOG | false |
| - | - | enableNativeProtoLog | Enable fetching and handling of native proto log data. | SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG | false |
| - | - | enableNativeJsonLog | Enable fetching and handling of native json log data. | SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG | false |
| - | - | replicationFactor | The replication factor for each partition in the topic being created. | SW_KAFKA_FETCHER_PARTITIONS_FACTOR | 2 |
| - | - | kafkaHandlerThreadPoolSize | Pool size of kafka message handler executor. | SW_KAFKA_HANDLER_THREAD_POOL_SIZE | CPU core * 2 |
| - | - | kafkaHandlerThreadPoolQueueSize | The queue size of kafka message handler executor. | SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE | 10000 |
......@@ -228,7 +229,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | topicNameOfProfiling | Specify the Kafka topic name for Profiling data. | - | skywalking-profilings |
| - | - | topicNameOfTracingSegments | Specify the Kafka topic name for Tracing data. | - | skywalking-segments |
| - | - | topicNameOfManagements | Specify the Kafka topic name for service instance reporting and registering. | - | skywalking-managements |
| - | - | topicNameOfLogs | Specify the Kafka topic name for log data. | - | skywalking-logs |
| - | - | topicNameOfLogs | Specify the Kafka topic name for native proto log data. | - | skywalking-logs |
| - | - | topicNameOfJsonLogs | Specify the Kafka topic name for native json log data. | - | skywalking-logs-json |
| receiver-browser | default | Read [receiver doc](backend-receivers.md) for more details | - | - | - |
| - | - | sampleRate | Sampling rate for receiving trace. The precision is 1/10000. 10000 means 100% sample in default. | SW_RECEIVER_BROWSER_SAMPLE_RATE | 10000 |
| query | graphql | - | GraphQL query implementation | - |
......
......@@ -11,8 +11,30 @@ Java agent provides toolkit for
to report logs through gRPC with automatically injected trace context.
[SkyWalking Satellite sidecar](https://github.com/apache/skywalking-satellite) is a recommended proxy/sidecar to
forward logs, including using Kafka MQ to transport them. When using this, you need to enable [kafka-fetcher](backend-fetcher.md#kafka-fetcher).
forward logs, including using Kafka MQ to transport them. When using this, you need to enable [kafka-fetcher](backend-fetcher.md#kafka-fetcher)
and turn on the `enableNativeProtoLog` config (see the sketch below).
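A minimal `application.yml` sketch for this mode; the key and environment variable come from this commit, while the `selector` line is an assumption:
```yaml
kafka-fetcher:
  selector: ${SW_KAFKA_FETCHER:default}  # assumption: enable the module by selecting "default"
  default:
    enableNativeProtoLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG:true}
```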
### Log files collector
The Java agent provides toolkits for
[log4j](../service-agent/java-agent/Application-toolkit-log4j-1.x.md#print-skywalking-context-in-your-logs),
[log4j2](../service-agent/java-agent/Application-toolkit-log4j-2.x.md#print-skywalking-context-in-your-logs),
[logback](../service-agent/java-agent/Application-toolkit-logback-1.x.md#print-skywalking-context-in-your-logs)
to report logs to files with automatically injected trace context.
Log framework config examples:
- [log4j1.x fileAppender](../../../../test/e2e/e2e-service-provider/src/main/resources/log4j.properties)
- [log4j2.x fileAppender](../../../../test/e2e/e2e-service-provider/src/main/resources/log4j2.xml)
- [logback fileAppender](../../../../test/e2e/e2e-service-provider/src/main/resources/logback.xml)
You can use [Filebeat](https://www.elastic.co/cn/beats/filebeat) or [Fluentd](https://fluentd.org) to
collect file logs, including using Kafka MQ to transport them in [native-json](../../protocols/Log-Data-Protocol.md#Native-Json-Protocol)
format. When using this, you need to enable [kafka-fetcher](backend-fetcher.md#kafka-fetcher)
and turn on the `enableNativeJsonLog` config (see the sketch below).
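A matching sketch for the JSON path (key and environment variable from this commit, `selector` assumed as above); the collector must then write to the `skywalking-logs-json` topic, as the `filebeat.yml` and `fluentd.conf` examples later in this commit do:
```yaml
kafka-fetcher:
  selector: ${SW_KAFKA_FETCHER:default}  # assumption: enable the module by selecting "default"
  default:
    enableNativeJsonLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG:true}
```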
Collector config examples:
- [filebeat.yml](../../../../test/e2e/e2e-test/docker/kafka/filebeat.yml)
- [fluentd.conf](../../../../test/e2e/e2e-test/docker/kafka/fluentd.conf)
## Log Analyzer
......
......@@ -347,7 +347,8 @@ kafka-fetcher:
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
enableLog: ${SW_KAFKA_FETCHER_ENABLE_LOG:false}
enableNativeProtoLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG:false}
enableNativeJsonLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG:false}
isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
kafkaHandlerThreadPoolSize: ${SW_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
......
......@@ -81,7 +81,9 @@ public class KafkaFetcherHandlerRegister implements Runnable {
config.getTopicNameOfMetrics(),
config.getTopicNameOfProfiling(),
config.getTopicNameOfTracingSegments(),
config.getTopicNameOfMeters()
config.getTopicNameOfMeters(),
config.getTopicNameOfLogs(),
config.getTopicNameOfJsonLogs()
))
.values()
.entrySet()
......
......@@ -69,7 +69,9 @@ public class KafkaFetcherConfig extends ModuleConfig {
private boolean enableMeterSystem = false;
private boolean enableLog = false;
private boolean enableNativeProtoLog = false;
private boolean enableNativeJsonLog = false;
private String configPath = "meter-analyzer-config";
......@@ -85,6 +87,8 @@ public class KafkaFetcherConfig extends ModuleConfig {
private String topicNameOfLogs = "skywalking-logs";
private String topicNameOfJsonLogs = "skywalking-logs-json";
private int kafkaHandlerThreadPoolSize;
private int kafkaHandlerThreadPoolQueueSize;
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.analyzer.agent.kafka.KafkaFetcherHandler
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherModule;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.JVMMetricsHandler;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.JsonLogHandler;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.LogHandler;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.MeterServiceHandler;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.ProfileTaskHandler;
......@@ -77,9 +78,13 @@ public class KafkaFetcherProvider extends ModuleProvider {
if (config.isEnableMeterSystem()) {
handlerRegister.register(new MeterServiceHandler(getManager(), config));
}
if (config.isEnableLog()) {
if (config.isEnableNativeProtoLog()) {
handlerRegister.register(new LogHandler(getManager(), config));
}
if (config.isEnableNativeJsonLog()) {
handlerRegister.register(new JsonLogHandler(getManager(), config));
}
handlerRegister.start();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils;
@Slf4j
public class JsonLogHandler extends LogHandler {
private final KafkaFetcherConfig config;
public JsonLogHandler(ModuleManager moduleManager, KafkaFetcherConfig config) {
super(moduleManager, config);
this.config = config;
}
@Override
public String getTopic() {
return config.getTopicNameOfJsonLogs();
}
@Override
protected String getProtocolName() {
return "kafka-fetcher-native-json";
}
@Override
protected LogData parseConsumerRecord(ConsumerRecord<String, Bytes> record) throws IOException {
LogData.Builder logDataBuilder = LogData.newBuilder();
ProtoBufJsonUtils.fromJSON(record.value().toString(), logDataBuilder);
return logDataBuilder.build();
}
}
......@@ -51,11 +51,11 @@ public class LogHandler implements KafkaHandler {
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"log_in_latency", "The process latency of log",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka-fetcher")
new MetricsTag.Keys("protocol"), new MetricsTag.Values(getProtocolName())
);
errorCounter = metricsCreator.createCounter("log_analysis_error_count", "The error number of log analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka-fetcher")
new MetricsTag.Values(getProtocolName())
);
}
......@@ -73,7 +73,7 @@ public class LogHandler implements KafkaHandler {
public void handle(final ConsumerRecord<String, Bytes> record) {
HistogramMetrics.Timer timer = histogram.createTimer();
try {
LogData logData = LogData.parseFrom(record.value().get());
LogData logData = parseConsumerRecord(record);
logAnalyzerService.doAnalysis(logData);
} catch (Exception e) {
errorCounter.inc();
......@@ -82,4 +82,12 @@ public class LogHandler implements KafkaHandler {
timer.finish();
}
}
protected String getProtocolName() {
return "kafka-fetcher-native-proto";
}
protected LogData parseConsumerRecord(ConsumerRecord<String, Bytes> record) throws Exception {
return LogData.parseFrom(record.value().get());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.e2e.controller;
import org.apache.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class FileLogController {
private static final Logger LOG4J_LOGGER = Logger.getLogger("fileLogger");
private static final org.apache.logging.log4j.Logger LOG4J2_LOGGER = LogManager.getLogger("fileLogger");
private static final org.slf4j.Logger LOGBACK_LOGGER = LoggerFactory.getLogger("fileLogger");
@RequestMapping(value = "/file/logs/trigger")
public String trigger() {
LOG4J_LOGGER.info("log4j fileLogger ==> mills: " + System.currentTimeMillis());
LOG4J2_LOGGER.info("log4j2 fileLogger ==> mills: " + System.currentTimeMillis());
LOGBACK_LOGGER.info("logback fileLogger ==> mills: {}", System.currentTimeMillis());
return TraceContext.traceId();
}
}
......@@ -16,3 +16,11 @@ log4j.rootLogger=info,CustomAppender
log4j.appender.CustomAppender=org.apache.skywalking.apm.toolkit.log.log4j.v1.x.log.GRPCLogClientAppender
log4j.appender.CustomAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.CustomAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
log4j.logger.fileLogger=info,FileAppender
log4j.appender.FileAppender=org.apache.log4j.FileAppender
log4j.appender.FileAppender.ImmediateFlush=true
log4j.appender.FileAppender.Append=true
log4j.appender.FileAppender.File=/tmp/skywalking-logs/log4j1/e2e-service-provider.log
log4j.appender.FileAppender.layout=org.apache.skywalking.apm.toolkit.log.log4j.v1.x.TraceIdPatternLayout
log4j.appender.FileAppender.layout.ConversionPattern=[%T{SW_CTX}] [%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c:%L - %m%n
\ No newline at end of file
......@@ -23,6 +23,11 @@
<GRPCLogClientAppender name="grpc-log">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</GRPCLogClientAppender>
<RandomAccessFile name="fileAppender" fileName="/tmp/skywalking-logs/log4j2/e2e-service-provider.log" immediateFlush="true" append="true">
<PatternLayout>
<Pattern>[%sw_ctx] [%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c:%L - %m%n</Pattern>
</PatternLayout>
</RandomAccessFile>
</Appenders>
......@@ -32,5 +37,8 @@
<AppenderRef ref="Console"/>
<AppenderRef ref="grpc-log"/>
</Root>
<Logger name="fileLogger" level="info" additivity="false">
<AppenderRef ref="fileAppender"/>
</Logger>
</Loggers>
</Configuration>
......@@ -33,8 +33,20 @@
</encoder>
</appender>
<appender name="fileAppender" class="ch.qos.logback.core.FileAppender">
<file>/tmp/skywalking-logs/logback/e2e-service-provider.log</file>
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<Pattern>[%sw_ctx] [%level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger:%line - %msg%n</Pattern>
</layout>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="grpc-log"/>
<appender-ref ref="stdout"/>
</root>
<logger name="fileLogger" level="INFO">
<appender-ref ref="fileAppender"/>
</logger>
</configuration>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM bitnami/fluentd:1.12.1
## Install the Kafka output plugin for Fluentd
RUN fluent-gem install 'fluent-plugin-kafka'
......@@ -25,7 +25,9 @@ services:
SW_KAFKA_FETCHER_SERVERS: broker-a:9092,broker-b:9092
SW_KAFKA_FETCHER_PARTITIONS: 2
SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1
SW_KAFKA_FETCHER_ENABLE_LOG: "true"
SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG: "true"
SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG: "true"
SW_SEARCHABLE_LOGS_TAG_KEYS: level,logger,agent
depends_on:
broker-a:
condition: service_healthy
......@@ -56,16 +58,62 @@ services:
broker-b:
condition: service_healthy
filebeat:
image: elastic/filebeat:7.12.0
command: -e --strict.perms=false
networks:
- e2e
volumes:
- ./filebeat.yml:/usr/share/filebeat/filebeat.yml
- /tmp/skywalking-logs/:/tmp/skywalking-logs/
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5066"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
broker-a:
condition: service_healthy
broker-b:
condition: service_healthy
fluentd:
build:
context: ../../../
dockerfile: e2e-test/docker/kafka/Dockerfile.fluentd
networks:
- e2e
volumes:
- ./fluentd.conf:/opt/bitnami/fluentd/conf/fluentd.conf
- /tmp/skywalking-logs/:/tmp/skywalking-logs/
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/24220"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
broker-a:
condition: service_healthy
broker-b:
condition: service_healthy
provider:
extends:
file: ../base-compose.yml
service: provider
environment:
SW_GRPC_LOG_SERVER_HOST: sw-satellite
volumes:
- /tmp/skywalking-logs/:/tmp/skywalking-logs/
depends_on:
oap:
condition: service_healthy
sw-satellite:
condition: service_healthy
filebeat:
condition: service_healthy
fluentd:
condition: service_healthy
networks:
e2e:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
http.enabled: true
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/skywalking-logs/*/e2e-service-provider.log
processors:
- dissect:
tokenizer: '[SW_CTX:%{SW_CTX}] [%{level}] %{logtime} [%{thread}] %{logger}:%{line} - %{body}'
field: "message"
target_prefix: ""
trim_values: all
overwrite_keys: true
- dissect:
tokenizer: '[%{service},%{serviceInstance},%{traceContext.traceId},%{traceContext.traceSegmentId},%{traceContext.spanId}]'
field: "SW_CTX"
target_prefix: ""
trim_values: all
- script:
lang: javascript
id: format_datetime
source: >
function process(event) {
var datetime = event.Get("logtime");
var datetimeArr = datetime.split(" ");
var dateArr = datetimeArr[0].split("-");
var timeArrs = datetimeArr[1].split(".");
var timeArr = timeArrs[0].split(":");
var mills = timeArrs[1];
var date = new Date();
date.setUTCFullYear(parseInt(dateArr[0]));
date.setUTCMonth(parseInt(dateArr[1]) - 1);
date.setUTCDate(parseInt(dateArr[2]));
date.setUTCHours(parseInt(timeArr[0]));
date.setUTCMinutes(parseInt(timeArr[1]));
date.setUTCSeconds(parseInt(timeArr[2]));
date.setUTCMilliseconds(parseInt(mills));
var timeMillis = date.getTime();
event.Put("timestamp", timeMillis);
}
- script:
lang: javascript
id: build_logTags_array
source: >
function process(event) {
var logLevel = {"key": "level", "value": event.Get("level")};
var logThread = {"key": "thread", "value": event.Get("thread")};
var logLogger = {"key": "logger", "value": event.Get("logger")};
var logFile = {"key": "logfile", "value": event.Get("log.file.path")};
var hostname = {"key": "hostname", "value": event.Get("agent.hostname")};
var agent = {"key": "agent", "value": event.Get("agent.type")};
var agentVersion = {"key": "agentVersion", "value": event.Get("agent.version")};
var agentId = {"key": "agentId", "value": event.Get("agent.id")};
var agentName = {"key": "agentName", "value": event.Get("agent.name")};
var logTags = [logLevel,logThread,logLogger,logFile,hostname,agent,agentVersion,agentId,agentName];
event.Put("tags.data", logTags);
}
- rename:
fields:
- from: "body"
to: "body.text.text"
ignore_missing: false
fail_on_error: true
- drop_fields:
fields: ["ecs", "host", "input", "agent", "log", "message", "SW_CTX", "level", "thread", "logger", "line", "logtime"]
output.kafka:
hosts: ["broker-a:9092", "broker-b:9092"]
topic: "skywalking-logs-json"
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>
<source>
@type tail
path /tmp/skywalking-logs/*/e2e-service-provider.log
read_from_head true
tag skywalking_log
refresh_interval 1
<parse>
@type regexp
expression /^\[SW_CTX:\s*(?<SW_CTX>[^ ]*]*)\] \[(?<level>[^ ]*)\] (?<logtime>[^\]]*) \[(?<thread>[^ ]*)\] (?<logger>[^\]]*):(?<line>[^\]]*) - (?<body>[^\]]*)$/
</parse>
</source>
<filter skywalking_log>
@type parser
key_name SW_CTX
reserve_data true
remove_key_name_field true
<parse>
@type regexp
expression /^\[(?<service>[^\]]*),(?<serviceInstance>[^\]]*),(?<traceId>[^\]]*),(?<traceSegmentId>[^\]]*),(?<spanId>[^\]]*)\]$/
</parse>
</filter>
<filter skywalking_log>
@type record_transformer
enable_ruby true
<record>
traceContext ${{"traceId" => record["traceId"], "traceSegmentId" => record["traceSegmentId"], "spanId" => record["spanId"]}}
tags ${{"data" => [{"key" => "level", "value" => record["level"]}, {"key" => "thread", "value" => record["thread"]}, {"key" => "logger", "value" => record["logger"]}, {"key" => "agent", "value" => "fluentd"}]}}
body ${{"text" => {"text" => record["body"]}}}
timestamp ${DateTime.strptime(record["logtime"], '%Y-%m-%d %H:%M:%S.%L').strftime('%Q')}
</record>
remove_keys level,thread,logger,line,traceId,traceSegmentId,spanId,logtime
</filter>
<match skywalking_log>
@type kafka2
brokers broker-a:9092,broker-b:9092
default_topic skywalking-logs-json
<format>
@type json
</format>
</match>
......@@ -68,6 +68,7 @@ public class KafkaLogE2E extends SkyWalkingTestAdapter {
public void setUp() throws Exception {
queryClient(swWebappHostPort);
trafficController(serviceHostPort, "/logs/trigger");
trafficController(serviceHostPort, "/file/logs/trigger");
}
@AfterAll
......@@ -107,6 +108,36 @@ public class KafkaLogE2E extends SkyWalkingTestAdapter {
load("expected/log/logs.yml").as(LogsMatcher.class).verifyLoosely(logs);
}
@RetryableTest
public void verifyLogFromFilebeat() throws Exception {
final String agent = "filebeat";
verifyLogFrom(agent, "log4j fileLogger");
verifyLogFrom(agent, "log4j2 fileLogger");
verifyLogFrom(agent, "logback fileLogger");
}
@RetryableTest
public void verifyLogFromFluentd() throws Exception {
final String agent = "fluentd";
verifyLogFrom(agent, "log4j fileLogger");
verifyLogFrom(agent, "log4j2 fileLogger");
verifyLogFrom(agent, "logback fileLogger");
}
private void verifyLogFrom(String agent, String keyword) throws Exception {
LogsQuery logsQuery = new LogsQuery().serviceId("WW91cl9BcHBsaWNhdGlvbk5hbWU=.1")
.addTag("agent", agent)
.start(startTime)
.end(Times.now());
if (graphql.supportQueryLogsByKeywords()) {
logsQuery.keywordsOfContent(keyword);
}
final List<Log> logs = graphql.logs(logsQuery);
LOGGER.info("logs: {}", logs);
load("expected/log/logs.yml").as(LogsMatcher.class).verifyLoosely(logs);
}
private void verifyServiceInstances(final Service service) throws Exception {
final Instances instances = graphql.instances(
new InstancesQuery().serviceId(service.getKey()).start(startTime).end(Times.now()));
......