From e98d251acaa5cb73afca50571d7f9ff7ff4f9e36 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 8 Jan 2021 15:49:29 +0100 Subject: [PATCH] [FLINK-20901] Add DeclarativeSlotPool.setResourceRequirements This commits adds DeclarativeSlotPool.setResourceRequirements which sets the absolutely required resources. Hence, this method can be used to overwrite the currently set resource requirements. This closes #14589. --- .../slotpool/DeclarativeSlotPool.java | 7 +++++ .../slotpool/DefaultDeclarativeSlotPool.java | 7 +++++ .../DefaultDeclarativeSlotPoolTest.java | 29 +++++++++++++++++++ .../slotpool/TestingDeclarativeSlotPool.java | 11 ++++++- .../TestingDeclarativeSlotPoolBuilder.java | 10 ++++++- 5 files changed, 62 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index 81dc05ee97b..656a2efa5df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -55,6 +55,13 @@ public interface DeclarativeSlotPool { */ void decreaseResourceRequirementsBy(ResourceCounter decrement); + /** + * Sets the resource requirements to the given resourceRequirements. + * + * @param resourceRequirements new resource requirements + */ + void setResourceRequirements(ResourceCounter resourceRequirements); + /** * Returns the current resource requirements. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index bd8a27a17e4..279992b531e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -124,6 +124,13 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { declareResourceRequirements(); } + @Override + public void setResourceRequirements(ResourceCounter resourceRequirements) { + totalResourceRequirements = resourceRequirements; + + declareResourceRequirements(); + } + private void declareResourceRequirements() { final Collection resourceRequirements = getResourceRequirements(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index 602aca895c7..60046cff72b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -515,6 +515,35 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { hasItems(ResourceRequirement.create(largeResourceProfile, 2))); } + @Test + public void testSetResourceRequirementsForInitialResourceRequirements() { + final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + + final ResourceCounter resourceRequirements = + ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); + + slotPool.setResourceRequirements(resourceRequirements); + + assertThat( + slotPool.getResourceRequirements(), + is(toResourceRequirements(resourceRequirements))); + } + + @Test + public void testSetResourceRequirementsOverwritesPreviousValue() { + final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + + slotPool.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1, 1)); + + final ResourceCounter resourceRequirements = + ResourceCounter.withResource(RESOURCE_PROFILE_2, 1); + slotPool.setResourceRequirements(resourceRequirements); + + assertThat( + slotPool.getResourceRequirements(), + is(toResourceRequirements(resourceRequirements))); + } + @Nonnull private static ResourceCounter createResourceRequirements() { final Map requirements = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index 4828bf282ff..a2bac2b1592 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -72,6 +72,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { private final LongConsumer releaseIdleSlotsConsumer; + private final Consumer setResourceRequirementsConsumer; + TestingDeclarativeSlotPool( Consumer increaseResourceRequirementsByConsumer, Consumer decreaseResourceRequirementsByConsumer, @@ -90,7 +92,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { BiFunction reserveFreeSlotFunction, TriFunction freeReservedSlotFunction, Function containsSlotsFunction, - LongConsumer releaseIdleSlotsConsumer) { + LongConsumer releaseIdleSlotsConsumer, + Consumer setResourceRequirementsConsumer) { this.increaseResourceRequirementsByConsumer = increaseResourceRequirementsByConsumer; this.decreaseResourceRequirementsByConsumer = decreaseResourceRequirementsByConsumer; this.getResourceRequirementsSupplier = getResourceRequirementsSupplier; @@ -103,6 +106,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { this.freeReservedSlotFunction = freeReservedSlotFunction; this.containsSlotsFunction = containsSlotsFunction; this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer; + this.setResourceRequirementsConsumer = setResourceRequirementsConsumer; } @Override @@ -115,6 +119,11 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { decreaseResourceRequirementsByConsumer.accept(decrement); } + @Override + public void setResourceRequirements(ResourceCounter resourceRequirements) { + setResourceRequirementsConsumer.accept(resourceRequirements); + } + @Override public Collection getResourceRequirements() { return getResourceRequirementsSupplier.get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java index b1bdfc41e23..84cf9519bae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java @@ -66,6 +66,7 @@ public class TestingDeclarativeSlotPoolBuilder { (ignoredA, ignoredB, ignoredC) -> ResourceCounter.empty(); private Function containsSlotsFunction = ignored -> false; private LongConsumer returnIdleSlotsConsumer = ignored -> {}; + private Consumer setResourceRequirementsConsumer = ignored -> {}; public TestingDeclarativeSlotPoolBuilder setIncreaseResourceRequirementsByConsumer( Consumer increaseResourceRequirementsByConsumer) { @@ -79,6 +80,12 @@ public class TestingDeclarativeSlotPoolBuilder { return this; } + public TestingDeclarativeSlotPoolBuilder setSetResourceRequirementsConsumer( + Consumer setResourceRequirementsConsumer) { + this.setResourceRequirementsConsumer = setResourceRequirementsConsumer; + return this; + } + public TestingDeclarativeSlotPoolBuilder setGetResourceRequirementsSupplier( Supplier> getResourceRequirementsSupplier) { this.getResourceRequirementsSupplier = getResourceRequirementsSupplier; @@ -159,6 +166,7 @@ public class TestingDeclarativeSlotPoolBuilder { reserveFreeSlotFunction, freeReservedSlotFunction, containsSlotsFunction, - returnIdleSlotsConsumer); + returnIdleSlotsConsumer, + setResourceRequirementsConsumer); } } -- GitLab