diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 3338cd3aeafbe479ae335171b088ef02fa33bc08..4cd62f3f5df30162e472cd15c5f3f04af8abfe04 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -247,9 +247,19 @@ public class KubernetesRuntime implements Runtime { throw e; } - if (channel == null && stub == null) { + setupGrpcChannelIfNeeded(); + } + + @Override + public void reinitialize() { + setupGrpcChannelIfNeeded(); + } + + private synchronized void setupGrpcChannelIfNeeded() { + if (channel == null || stub == null) { channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()]; stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()]; + String jobName = createJobName(instanceConfig.getFunctionDetails()); for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) { String address = getServiceUrl(jobName, jobNamespace, i); @@ -332,16 +342,16 @@ public class KubernetesRuntime implements Runtime { @Override public CompletableFuture getMetrics(int instanceId) { CompletableFuture retval = new CompletableFuture<>(); - if (instanceId < 0 || instanceId >= stub.length) { - if (stub == null) { - retval.completeExceptionally(new RuntimeException("Invalid InstanceId")); - return retval; - } - } if (stub == null) { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } + + if (instanceId < 0 || instanceId >= stub.length) { + retval.completeExceptionally(new RuntimeException("Invalid InstanceId")); + return retval; + } + ListenableFuture response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback() { @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java index 77f660f747fc23a8a0e3c778c17c07f939396080..7d49cf84341d719d1678c88f8293c6817ea7362c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java @@ -31,6 +31,10 @@ public interface Runtime { void start() throws Exception; + default void reinitialize() { + + } + void join() throws Exception; void stop() throws Exception; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 534710dd358cf5818db37de331968ef20a2467a8..7ae25727fd2435bb1ab001ad4908226db965ca95 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -215,9 +215,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } } - // start assignment tailer - this.functionAssignmentTailer.start(); - } catch (Exception e) { log.error("Failed to initialize function runtime manager: ", e.getMessage(), e); throw new RuntimeException(e); @@ -229,7 +226,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ */ public void start() { log.info("/** Starting Function Runtime Manager **/"); - log.info("Initialize metrics sink..."); log.info("Starting function assignment tailer..."); this.functionAssignmentTailer.start(); } @@ -629,7 +625,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ // changes to the function meta data of the instance if (runtimeFactory.externallyManaged()) { - // change in metadata thus need to potentially restart if (!assignment.getInstance().equals(existingAssignment.getInstance())) { //stop function @@ -657,6 +652,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner( assignment.getInstance(), assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath()); + // re-initialize if necessary + runtimeSpawner.getRuntime().reinitialize(); newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 399b3554ddc754431e3e79cfb162ae7fb56b7419..631507a44dccf12cd96708cfd396044d355b9125 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -648,7 +648,8 @@ public class FunctionRuntimeManagerTest { doNothing().when(kubernetesRuntimeFactory).setupClient(); doReturn(true).when(kubernetesRuntimeFactory).externallyManaged(); - doReturn(mock(KubernetesRuntime.class)).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any()); + KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class); + doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any()); FunctionActioner functionActioner = spy(new FunctionActioner( workerConfig, @@ -743,7 +744,10 @@ public class FunctionRuntimeManagerTest { instance.getInstanceId()); Assert.assertTrue( functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory); + Assert.assertTrue( functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime() != null); + + verify(kubernetesRuntime, times(1)).reinitialize(); } }