diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index 81dc05ee97b56a5c054845a329332f9b75fe408a..656a2efa5df4d42d899f86edafa5330c0ddcd5d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -55,6 +55,13 @@ public interface DeclarativeSlotPool { */ void decreaseResourceRequirementsBy(ResourceCounter decrement); + /** + * Sets the resource requirements to the given resourceRequirements. + * + * @param resourceRequirements new resource requirements + */ + void setResourceRequirements(ResourceCounter resourceRequirements); + /** * Returns the current resource requirements. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index bd8a27a17e401c3555e24e51e50e992d465c4e8e..279992b531ed3f5643a6dd7b393d4c5849512fa9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -124,6 +124,13 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { declareResourceRequirements(); } + @Override + public void setResourceRequirements(ResourceCounter resourceRequirements) { + totalResourceRequirements = resourceRequirements; + + declareResourceRequirements(); + } + private void declareResourceRequirements() { final Collection resourceRequirements = getResourceRequirements(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index 602aca895c7ad2fac68a555f9c20584ab826f9de..60046cff72b7eb8702fcbaf022eb7ec991a357c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -515,6 +515,35 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { hasItems(ResourceRequirement.create(largeResourceProfile, 2))); } + @Test + public void testSetResourceRequirementsForInitialResourceRequirements() { + final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + + final ResourceCounter resourceRequirements = + ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); + + slotPool.setResourceRequirements(resourceRequirements); + + assertThat( + slotPool.getResourceRequirements(), + is(toResourceRequirements(resourceRequirements))); + } + + @Test + public void testSetResourceRequirementsOverwritesPreviousValue() { + final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + + slotPool.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1, 1)); + + final ResourceCounter resourceRequirements = + ResourceCounter.withResource(RESOURCE_PROFILE_2, 1); + slotPool.setResourceRequirements(resourceRequirements); + + assertThat( + slotPool.getResourceRequirements(), + is(toResourceRequirements(resourceRequirements))); + } + @Nonnull private static ResourceCounter createResourceRequirements() { final Map requirements = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index 4828bf282ffcb078b2f632c96ea0cf09142cc60e..a2bac2b1592ac0e8a8cb0115b49b4e66fe8ff93f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -72,6 +72,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { private final LongConsumer releaseIdleSlotsConsumer; + private final Consumer setResourceRequirementsConsumer; + TestingDeclarativeSlotPool( Consumer increaseResourceRequirementsByConsumer, Consumer decreaseResourceRequirementsByConsumer, @@ -90,7 +92,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { BiFunction reserveFreeSlotFunction, TriFunction freeReservedSlotFunction, Function containsSlotsFunction, - LongConsumer releaseIdleSlotsConsumer) { + LongConsumer releaseIdleSlotsConsumer, + Consumer setResourceRequirementsConsumer) { this.increaseResourceRequirementsByConsumer = increaseResourceRequirementsByConsumer; this.decreaseResourceRequirementsByConsumer = decreaseResourceRequirementsByConsumer; this.getResourceRequirementsSupplier = getResourceRequirementsSupplier; @@ -103,6 +106,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { this.freeReservedSlotFunction = freeReservedSlotFunction; this.containsSlotsFunction = containsSlotsFunction; this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer; + this.setResourceRequirementsConsumer = setResourceRequirementsConsumer; } @Override @@ -115,6 +119,11 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { decreaseResourceRequirementsByConsumer.accept(decrement); } + @Override + public void setResourceRequirements(ResourceCounter resourceRequirements) { + setResourceRequirementsConsumer.accept(resourceRequirements); + } + @Override public Collection getResourceRequirements() { return getResourceRequirementsSupplier.get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java index b1bdfc41e23784d561665086ce5599f2dcb54564..84cf9519bae54bde5d083ad448f8fcf7134d78f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java @@ -66,6 +66,7 @@ public class TestingDeclarativeSlotPoolBuilder { (ignoredA, ignoredB, ignoredC) -> ResourceCounter.empty(); private Function containsSlotsFunction = ignored -> false; private LongConsumer returnIdleSlotsConsumer = ignored -> {}; + private Consumer setResourceRequirementsConsumer = ignored -> {}; public TestingDeclarativeSlotPoolBuilder setIncreaseResourceRequirementsByConsumer( Consumer increaseResourceRequirementsByConsumer) { @@ -79,6 +80,12 @@ public class TestingDeclarativeSlotPoolBuilder { return this; } + public TestingDeclarativeSlotPoolBuilder setSetResourceRequirementsConsumer( + Consumer setResourceRequirementsConsumer) { + this.setResourceRequirementsConsumer = setResourceRequirementsConsumer; + return this; + } + public TestingDeclarativeSlotPoolBuilder setGetResourceRequirementsSupplier( Supplier> getResourceRequirementsSupplier) { this.getResourceRequirementsSupplier = getResourceRequirementsSupplier; @@ -159,6 +166,7 @@ public class TestingDeclarativeSlotPoolBuilder { reserveFreeSlotFunction, freeReservedSlotFunction, containsSlotsFunction, - returnIdleSlotsConsumer); + returnIdleSlotsConsumer, + setResourceRequirementsConsumer); } }