[FLINK-20901] Add DeclarativeSlotPool.setResourceRequirements

This commits adds DeclarativeSlotPool.setResourceRequirements which sets the absolutely
required resources. Hence, this method can be used to overwrite the currently set
resource requirements.

This closes #14589.
上级 2e220d84
......@@ -55,6 +55,13 @@ public interface DeclarativeSlotPool {
*/
void decreaseResourceRequirementsBy(ResourceCounter decrement);
/**
* Sets the resource requirements to the given resourceRequirements.
*
* @param resourceRequirements new resource requirements
*/
void setResourceRequirements(ResourceCounter resourceRequirements);
/**
* Returns the current resource requirements.
*
......
......@@ -124,6 +124,13 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
declareResourceRequirements();
}
@Override
public void setResourceRequirements(ResourceCounter resourceRequirements) {
totalResourceRequirements = resourceRequirements;
declareResourceRequirements();
}
private void declareResourceRequirements() {
final Collection<ResourceRequirement> resourceRequirements = getResourceRequirements();
......
......@@ -515,6 +515,35 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger {
hasItems(ResourceRequirement.create(largeResourceProfile, 2)));
}
@Test
public void testSetResourceRequirementsForInitialResourceRequirements() {
final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build();
final ResourceCounter resourceRequirements =
ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
slotPool.setResourceRequirements(resourceRequirements);
assertThat(
slotPool.getResourceRequirements(),
is(toResourceRequirements(resourceRequirements)));
}
@Test
public void testSetResourceRequirementsOverwritesPreviousValue() {
final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build();
slotPool.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1, 1));
final ResourceCounter resourceRequirements =
ResourceCounter.withResource(RESOURCE_PROFILE_2, 1);
slotPool.setResourceRequirements(resourceRequirements);
assertThat(
slotPool.getResourceRequirements(),
is(toResourceRequirements(resourceRequirements)));
}
@Nonnull
private static ResourceCounter createResourceRequirements() {
final Map<ResourceProfile, Integer> requirements = new HashMap<>();
......
......@@ -72,6 +72,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
private final LongConsumer releaseIdleSlotsConsumer;
private final Consumer<ResourceCounter> setResourceRequirementsConsumer;
TestingDeclarativeSlotPool(
Consumer<ResourceCounter> increaseResourceRequirementsByConsumer,
Consumer<ResourceCounter> decreaseResourceRequirementsByConsumer,
......@@ -90,7 +92,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
BiFunction<AllocationID, ResourceProfile, PhysicalSlot> reserveFreeSlotFunction,
TriFunction<AllocationID, Throwable, Long, ResourceCounter> freeReservedSlotFunction,
Function<ResourceID, Boolean> containsSlotsFunction,
LongConsumer releaseIdleSlotsConsumer) {
LongConsumer releaseIdleSlotsConsumer,
Consumer<ResourceCounter> setResourceRequirementsConsumer) {
this.increaseResourceRequirementsByConsumer = increaseResourceRequirementsByConsumer;
this.decreaseResourceRequirementsByConsumer = decreaseResourceRequirementsByConsumer;
this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
......@@ -103,6 +106,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
this.freeReservedSlotFunction = freeReservedSlotFunction;
this.containsSlotsFunction = containsSlotsFunction;
this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer;
this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
}
@Override
......@@ -115,6 +119,11 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
decreaseResourceRequirementsByConsumer.accept(decrement);
}
@Override
public void setResourceRequirements(ResourceCounter resourceRequirements) {
setResourceRequirementsConsumer.accept(resourceRequirements);
}
@Override
public Collection<ResourceRequirement> getResourceRequirements() {
return getResourceRequirementsSupplier.get();
......
......@@ -66,6 +66,7 @@ public class TestingDeclarativeSlotPoolBuilder {
(ignoredA, ignoredB, ignoredC) -> ResourceCounter.empty();
private Function<ResourceID, Boolean> containsSlotsFunction = ignored -> false;
private LongConsumer returnIdleSlotsConsumer = ignored -> {};
private Consumer<ResourceCounter> setResourceRequirementsConsumer = ignored -> {};
public TestingDeclarativeSlotPoolBuilder setIncreaseResourceRequirementsByConsumer(
Consumer<ResourceCounter> increaseResourceRequirementsByConsumer) {
......@@ -79,6 +80,12 @@ public class TestingDeclarativeSlotPoolBuilder {
return this;
}
public TestingDeclarativeSlotPoolBuilder setSetResourceRequirementsConsumer(
Consumer<ResourceCounter> setResourceRequirementsConsumer) {
this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
return this;
}
public TestingDeclarativeSlotPoolBuilder setGetResourceRequirementsSupplier(
Supplier<Collection<ResourceRequirement>> getResourceRequirementsSupplier) {
this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
......@@ -159,6 +166,7 @@ public class TestingDeclarativeSlotPoolBuilder {
reserveFreeSlotFunction,
freeReservedSlotFunction,
containsSlotsFunction,
returnIdleSlotsConsumer);
returnIdleSlotsConsumer,
setResourceRequirementsConsumer);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册