diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 913df08f1155092c37f44f1f4276234c68e33105..08d412b93f485350b6c5bda09124d693cce08972 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -78,7 +78,37 @@ public class FunctionRuntimeManager implements AutoCloseable{ // All the runtime info related to functions executed by this worker // Fully Qualified InstanceId - > FunctionRuntimeInfo @VisibleForTesting - Map functionRuntimeInfoMap = new ConcurrentHashMap<>(); + class FunctionRuntimeInfos { + + private Map functionRuntimeInfoMap = new ConcurrentHashMap<>(); + + public FunctionRuntimeInfo get(String fullyQualifiedInstanceId) { + return functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + } + + public void put(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) { + if (!isInitializePhase) { + functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo); + } + } + + public void remove (String fullyQualifiedInstanceId) { + if (!isInitializePhase) { + functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); + } + } + + public Map getAll() { + return functionRuntimeInfoMap; + } + + public int size() { + return functionRuntimeInfoMap.size(); + } + } + + @VisibleForTesting + final FunctionRuntimeInfos functionRuntimeInfos = new FunctionRuntimeInfos(); @VisibleForTesting @Getter @@ -100,8 +130,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Getter private WorkerService workerService; - @Setter - @Getter boolean isInitializePhase = false; private final FunctionMetaDataManager functionMetaDataManager; @@ -212,12 +240,14 @@ public class FunctionRuntimeManager implements AutoCloseable{ .startMessageId(MessageId.earliest).create(); this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader); + // start init phase + this.isInitializePhase = true; // read all existing messages - this.setInitializePhase(true); while (reader.hasMessageAvailable()) { this.functionAssignmentTailer.processAssignment(reader.readNext()); } - this.setInitializePhase(false); + // init phase is done + this.isInitializePhase = false; // realize existing assignments Map assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId()); if (assignmentMap != null) { @@ -651,10 +681,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); this.conditionallyStartFunction(newFunctionRuntimeInfo); - this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); + this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } } else { - this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); + this.functionRuntimeInfos.remove(fullyQualifiedInstanceId); } } else { // if assignment got transferred to me just set function runtime @@ -668,9 +698,9 @@ public class FunctionRuntimeManager implements AutoCloseable{ runtimeSpawner.getRuntime().reinitialize(); newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); - this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); + this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } else { - this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); + this.functionRuntimeInfos.remove(fullyQualifiedInstanceId); } } } else { @@ -685,10 +715,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); this.conditionallyStartFunction(newFunctionRuntimeInfo); - this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); + this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } } else { - this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); + this.functionRuntimeInfos.remove(fullyQualifiedInstanceId); } } @@ -732,7 +762,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.conditionallyTerminateFunction(functionRuntimeInfo); } } - this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); + this.functionRuntimeInfos.remove(fullyQualifiedInstanceId); } String workerId = null; @@ -773,13 +803,14 @@ public class FunctionRuntimeManager implements AutoCloseable{ } private void startFunctionInstance(Assignment assignment) { + log.info("infos: {}", functionRuntimeInfos.getAll()); String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()); FunctionRuntimeInfo functionRuntimeInfo = _getFunctionRuntimeInfo(fullyQualifiedInstanceId); if (functionRuntimeInfo == null) { functionRuntimeInfo = new FunctionRuntimeInfo() .setFunctionInstance(assignment.getInstance()); - this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo); + this.functionRuntimeInfos.put(fullyQualifiedInstanceId, functionRuntimeInfo); } else { //Somehow this function is already started log.warn("Function {} already running. Going to restart function.", @@ -790,7 +821,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } public Map getFunctionRuntimeInfos() { - return this.functionRuntimeInfoMap; + return this.functionRuntimeInfos.getAll(); } /** @@ -844,7 +875,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } private FunctionRuntimeInfo _getFunctionRuntimeInfo(String fullyQualifiedInstanceId) { - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfos.get(fullyQualifiedInstanceId); // sanity check to make sure assignments and runtimeinfo is in sync if (functionRuntimeInfo == null) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index c1e13a9f95875899ea34b2d29a752554f22c9988..9845555d84501850e8b425c96481c0644dc251d0 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -135,8 +135,8 @@ public class FunctionRuntimeManagerTest { functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1))); verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class)); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0"), + assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1); + assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())); @@ -201,7 +201,7 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.setAssignment(assignment2); reset(functionRuntimeManager); - functionRuntimeManager.functionRuntimeInfoMap.put( + functionRuntimeManager.functionRuntimeInfos.put( "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())); @@ -222,7 +222,7 @@ public class FunctionRuntimeManagerTest { verify(functionActioner).terminateFunction(argThat( functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1))); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 0); + assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0); } @Test @@ -245,13 +245,13 @@ public class FunctionRuntimeManagerTest { doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); // test new assignment update functions - FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( + FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), - mock(FunctionMetaDataManager.class))); + mock(FunctionMetaDataManager.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -280,7 +280,6 @@ public class FunctionRuntimeManagerTest { // add existing assignments functionRuntimeManager.setAssignment(assignment1); functionRuntimeManager.setAssignment(assignment2); - reset(functionRuntimeManager); reset(functionActioner); Function.Assignment assignment3 = Function.Assignment.newBuilder() @@ -289,11 +288,11 @@ public class FunctionRuntimeManagerTest { .setFunctionMetaData(function2).setInstanceId(0).build()) .build(); - functionRuntimeManager.functionRuntimeInfoMap.put( + functionRuntimeManager.functionRuntimeInfos.put( "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())); - functionRuntimeManager.functionRuntimeInfoMap.put( + functionRuntimeManager.functionRuntimeInfos.put( "test-tenant/test-namespace/func-2:0", new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0) .build())); @@ -312,14 +311,13 @@ public class FunctionRuntimeManagerTest { verify(functionActioner).startFunction(argThat( functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2))); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2); + assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2); assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1); assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3); - reset(functionRuntimeManager); reset(functionActioner); // add a stop @@ -344,7 +342,7 @@ public class FunctionRuntimeManagerTest { verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2); + assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2); assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1); @@ -373,13 +371,13 @@ public class FunctionRuntimeManagerTest { doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); // test new assignment update functions - FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( + FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), - mock(FunctionMetaDataManager.class))); + mock(FunctionMetaDataManager.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -401,7 +399,6 @@ public class FunctionRuntimeManagerTest { // add existing assignments functionRuntimeManager.setAssignment(assignment1); - reset(functionRuntimeManager); // new assignment with different worker Function.Assignment assignment2 = Function.Assignment.newBuilder() @@ -413,7 +410,7 @@ public class FunctionRuntimeManagerTest { FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build()); - functionRuntimeManager.functionRuntimeInfoMap.put( + functionRuntimeManager.functionRuntimeInfos.put( "test-tenant/test-namespace/func-1:0", functionRuntimeInfo); functionRuntimeManager.processAssignment(assignment2); @@ -424,11 +421,10 @@ public class FunctionRuntimeManagerTest { assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 0); - assertNull(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0")); + assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0); + assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")); /** Test transfer from other worker to me **/ - reset(functionRuntimeManager); reset(functionActioner); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -452,8 +448,8 @@ public class FunctionRuntimeManagerTest { assertNull(functionRuntimeManager.workerIdToAssignments .get("worker-2")); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo); + assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1); + assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo); } @Test @@ -550,13 +546,13 @@ public class FunctionRuntimeManagerTest { doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); // test new assignment add functions - FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( + FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), - mock(FunctionMetaDataManager.class))); + mock(FunctionMetaDataManager.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -567,16 +563,15 @@ public class FunctionRuntimeManagerTest { assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class)); - // Ideally this should be zero, but it will nevertheless be called with null runtimespawner which essentially - // results in it being noop. We ensure that in the check below. - verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class)); + + // verify stop function is called zero times because we don't want to unnecessarily restart any functions during initialization + verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner).startFunction(argThat(functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance()))); - verify(functionActioner).stopFunction(argThat(functionRuntimeInfo -> functionRuntimeInfo.getRuntimeSpawner() == null)); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1); - assertEquals(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0"), + assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1); + assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())); @@ -616,13 +611,13 @@ public class FunctionRuntimeManagerTest { kubernetesRuntimeFactory, null, null, null)); // test new assignment update functions - FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( + FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), - mock(FunctionMetaDataManager.class))); + mock(FunctionMetaDataManager.class)); functionRuntimeManager.setFunctionActioner(functionActioner); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() @@ -642,7 +637,6 @@ public class FunctionRuntimeManagerTest { // add existing assignments functionRuntimeManager.setAssignment(assignment1); - reset(functionRuntimeManager); // new assignment with different worker Function.Assignment assignment2 = Function.Assignment.newBuilder() @@ -656,7 +650,7 @@ public class FunctionRuntimeManagerTest { FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo() .setFunctionInstance(instance) .setRuntimeSpawner(functionActioner.getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath())); - functionRuntimeManager.functionRuntimeInfoMap.put( + functionRuntimeManager.functionRuntimeInfos.put( "test-tenant/test-namespace/func-1:0", functionRuntimeInfo); functionRuntimeManager.processAssignment(assignment2); @@ -668,7 +662,7 @@ public class FunctionRuntimeManagerTest { assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2); - assertNull(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0")); + assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")); /** Test transfer from other worker to me **/ @@ -691,21 +685,21 @@ public class FunctionRuntimeManagerTest { .get("worker-2")); assertEquals( - functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getFunctionInstance(), + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getFunctionInstance(), functionRuntimeInfo.getFunctionInstance()); assertNotNull( - functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner()); + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner()); assertEquals( - functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getInstanceConfig().getFunctionDetails(), + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getInstanceConfig().getFunctionDetails(), function1.getFunctionDetails()); assertEquals( - functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getInstanceConfig().getInstanceId(), + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getInstanceConfig().getInstanceId(), instance.getInstanceId()); assertTrue( - functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory); + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory); assertNotNull( - functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime()); + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime()); verify(kubernetesRuntime, times(1)).reinitialize(); }