未验证 提交 9b74e9dc 编写于 作者: S Sijie Guo 提交者: GitHub

[functions][stats] NPE in FunctionStatsGenerator when worker service is not ready (#2723)

* [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready

*Motivation*

NullPointerException was thrown when function worker is running as part of broker and metrics collection kicks in
before worker service completes initialization

*Changes*

Only generate functions when worker service is ready

* Fix FunctionSTatsGeneratorTest
上级 520f9aa9
......@@ -38,7 +38,8 @@ public class FunctionsStatsGenerator {
private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
if (workerService != null) {
// only when worker service is initialized, we generate the stats. otherwise we will get bunch of NPE.
if (workerService != null && workerService.isInitialized()) {
Map<String, FunctionRuntimeInfo> functionRuntimes
= workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
......
......@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import lombok.ToString;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.functions.proto.Function;
......@@ -41,10 +42,23 @@ import java.util.regex.Pattern;
import static com.google.common.base.Preconditions.checkArgument;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class FunctionStatsGeneratorTest {
@Test
public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() {
WorkerService workerService = mock(WorkerService.class);
when(workerService.isInitialized()).thenReturn(false);
FunctionsStatsGenerator.generate(
workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer()));
verify(workerService, times(1)).isInitialized();
verify(workerService, times(0)).getFunctionRuntimeManager();
}
@Test
public void testFunctionsStatsGenerate() {
FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
......@@ -53,6 +67,7 @@ public class FunctionStatsGeneratorTest {
WorkerService workerService = mock(WorkerService.class);
doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
doReturn(new WorkerConfig()).when(workerService).getWorkerConfig();
when(workerService.isInitialized()).thenReturn(true);
CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册