未验证 提交 92f0ebf1 编写于 作者: C CoderGang 提交者: GitHub

Fixed bug #7706. Make LogHandler of kafka-fetcher-plugin can recognize namespace. (#7717)

上级 08990c90
......@@ -65,6 +65,7 @@ Release Notes.
* Fix `ProfileThreadSnapshotQuery.queryProfiledSegments` adopts a wrong sort function
* Support gRPC sync grouped dynamic configurations.
* Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
* Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
#### UI
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -37,15 +38,15 @@ public class JsonLogHandler extends LogHandler {
}
@Override
public String getTopic() {
public String getPlainTopic() {
return config.getTopicNameOfJsonLogs();
}
@Override
protected String getDataFormat() {
return "json";
}
@Override
protected LogData parseConsumerRecord(ConsumerRecord<String, Bytes> record) throws IOException {
LogData.Builder logDataBuilder = LogData.newBuilder();
......
......@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@Slf4j
public class LogHandler implements KafkaHandler {
public class LogHandler extends AbstractKafkaHandler {
private final KafkaFetcherConfig config;
private final HistogramMetrics histogram;
......@@ -41,6 +41,7 @@ public class LogHandler implements KafkaHandler {
public LogHandler(final ModuleManager moduleManager,
final KafkaFetcherConfig config) {
super(moduleManager, config);
this.config = config;
this.logAnalyzerService = moduleManager.find(LogAnalyzerModule.NAME)
.provider()
......@@ -69,7 +70,7 @@ public class LogHandler implements KafkaHandler {
}
@Override
public String getTopic() {
protected String getPlainTopic() {
return config.getTopicNameOfLogs();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
public class LogHandlerTest {
private static final String TOPIC_NAME = "skywalking-logs";
private LogHandler handler = null;
private KafkaFetcherConfig config = new KafkaFetcherConfig();
private ModuleManager manager;
@Before
public void setup() {
final ModuleManager manager = mock(ModuleManager.class, RETURNS_DEEP_STUBS);
when(manager.find(LogAnalyzerModule.NAME).provider().getService(any()))
.thenReturn(mock(ILogAnalyzerService.class));
when(manager.find(TelemetryModule.NAME).provider().getService(any()))
.thenReturn(mock(MetricsCreator.class));
handler = new LogHandler(manager, config);
}
@Test
public void testGetTopic() {
assertEquals(handler.getTopic(), TOPIC_NAME);
String namespace = "product";
config.setNamespace(namespace);
assertEquals(namespace + "-" + TOPIC_NAME, handler.getTopic());
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册