From 27a9f628780a15e2e06b7dac28cb3f72c94b1e51 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Thu, 22 Aug 2019 11:34:28 -0500 Subject: [PATCH] Need to reinitialize certain components for externally managed runtimes when moving functions (#5007) --- .../functions/runtime/KubernetesRuntime.java | 24 +++++++++++++------ .../pulsar/functions/runtime/Runtime.java | 4 ++++ .../worker/FunctionRuntimeManager.java | 7 ++---- .../worker/FunctionRuntimeManagerTest.java | 5 +++- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 65f2f7e0867..4f9b77ae37e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -254,9 +254,19 @@ public class KubernetesRuntime implements Runtime { throw e; } - if (channel == null && stub == null) { + setupGrpcChannelIfNeeded(); + } + + @Override + public void reinitialize() { + setupGrpcChannelIfNeeded(); + } + + private synchronized void setupGrpcChannelIfNeeded() { + if (channel == null || stub == null) { channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()]; stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()]; + String jobName = createJobName(instanceConfig.getFunctionDetails()); for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) { String address = getServiceUrl(jobName, jobNamespace, i); @@ -339,16 +349,16 @@ public class KubernetesRuntime implements Runtime { @Override public CompletableFuture getMetrics(int instanceId) { CompletableFuture retval = new CompletableFuture<>(); - if (instanceId < 0 || instanceId >= stub.length) { - if (stub == null) { - retval.completeExceptionally(new RuntimeException("Invalid InstanceId")); - return retval; - } - } if (stub == null) { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } + + if (instanceId < 0 || instanceId >= stub.length) { + retval.completeExceptionally(new RuntimeException("Invalid InstanceId")); + return retval; + } + ListenableFuture response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback() { @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java index 77f660f747f..7d49cf84341 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java @@ -31,6 +31,10 @@ public interface Runtime { void start() throws Exception; + default void reinitialize() { + + } + void join() throws Exception; void stop() throws Exception; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index bbf8e604940..f4d5d83e4f5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -217,9 +217,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } } - // start assignment tailer - this.functionAssignmentTailer.start(); - } catch (Exception e) { log.error("Failed to initialize function runtime manager: ", e.getMessage(), e); throw new RuntimeException(e); @@ -231,7 +228,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ */ public void start() { log.info("/** Starting Function Runtime Manager **/"); - log.info("Initialize metrics sink..."); log.info("Starting function assignment tailer..."); this.functionAssignmentTailer.start(); } @@ -631,7 +627,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ // changes to the function meta data of the instance if (runtimeFactory.externallyManaged()) { - // change in metadata thus need to potentially restart if (!assignment.getInstance().equals(existingAssignment.getInstance())) { //stop function @@ -659,6 +654,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner( assignment.getInstance(), assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath()); + // re-initialize if necessary + runtimeSpawner.getRuntime().reinitialize(); newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index ac3a3956dd2..c1e13a9f958 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -608,7 +608,8 @@ public class FunctionRuntimeManagerTest { doNothing().when(kubernetesRuntimeFactory).setupClient(); doReturn(true).when(kubernetesRuntimeFactory).externallyManaged(); - doReturn(mock(KubernetesRuntime.class)).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any()); + KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class); + doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any()); FunctionActioner functionActioner = spy(new FunctionActioner( workerConfig, @@ -705,5 +706,7 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory); assertNotNull( functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime()); + + verify(kubernetesRuntime, times(1)).reinitialize(); } } -- GitLab