未验证 提交 8dc257dc 编写于 作者: Y Yangze Guo 提交者: Xintong Song

[FLINK-20837][runtime] Refactor dynamic SlotID

This closes #14560
上级 c6443a50
......@@ -90,8 +90,8 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
return resourceId + "_" + (slotNumber >= 0 ? slotNumber : "dynamic");
}
/** Generate a SlotID without actual slot index for dynamic slot allocation. */
public static SlotID generateDynamicSlotID(ResourceID resourceID) {
/** Get a SlotID without actual slot index for dynamic slot allocation. */
public static SlotID getDynamicSlotID(ResourceID resourceID) {
return new SlotID(resourceID);
}
}
......@@ -95,6 +95,9 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
/** The table state. */
private volatile State state;
/** Current index for dynamic slot, should always not less than numberSlots */
private int dynamicSlotIndex;
private final ResourceBudgetManager budgetManager;
/** The closing future is completed when all slot are freed and state is closed. */
......@@ -120,6 +123,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
0 < numberSlots, "The number of task slots must be greater than 0.");
this.numberSlots = numberSlots;
this.dynamicSlotIndex = numberSlots;
this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
this.memoryPageSize = memoryPageSize;
......@@ -247,11 +251,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
}
for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
if (taskSlot.getIndex() < 0) {
SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
if (isDynamicIndex(taskSlot.getIndex())) {
SlotStatus slotStatus =
new SlotStatus(
slotID,
new SlotID(resourceId, taskSlot.getIndex()),
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
taskSlot.getAllocationId());
......@@ -277,14 +280,18 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
@Override
public boolean allocateSlot(
int index,
int requestedIndex,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile,
Time slotTimeout) {
checkRunning();
Preconditions.checkArgument(index < numberSlots);
Preconditions.checkArgument(requestedIndex < numberSlots);
// The negative requestIndex indicate that the SlotManger allocate a dynamic slot, we
// transfer the index to an increasing number not less than the numberSlots.
int index = requestedIndex < 0 ? nextDynamicSlotIndex() : requestedIndex;
TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
......@@ -297,7 +304,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
return false;
}
resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
resourceProfile = isDynamicIndex(index) ? resourceProfile : defaultSlotResourceProfile;
if (!budgetManager.reserve(resourceProfile)) {
LOG.info(
......@@ -317,9 +324,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
jobId,
allocationId,
memoryVerificationExecutor);
if (index >= 0) {
taskSlots.put(index, taskSlot);
}
taskSlots.put(index, taskSlot);
// update the allocation id to task slot map
allocatedSlots.put(allocationId, taskSlot);
......@@ -351,13 +356,17 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
index);
return taskSlot.getJobId().equals(jobId)
&& taskSlot.getResourceProfile().equals(resourceProfile)
&& (index < 0 || taskSlot.getIndex() == index);
&& (isDynamicIndex(index) || taskSlot.getIndex() == index);
}
private boolean isIndexAlreadyTaken(int index) {
return taskSlots.get(index) != null;
}
private boolean isDynamicIndex(int index) {
return index >= numberSlots;
}
@Override
public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
checkRunning();
......@@ -469,8 +478,6 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
TaskSlot<T> taskSlot = taskSlots.get(index);
if (taskSlot != null) {
return taskSlot.isAllocated(jobId, allocationId);
} else if (index < 0) {
return allocatedSlots.containsKey(allocationId);
} else {
return false;
}
......@@ -625,6 +632,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
return allocatedSlots.get(allocationId);
}
private int nextDynamicSlotIndex() {
return dynamicSlotIndex++;
}
private void checkRunning() {
Preconditions.checkState(
state == State.RUNNING,
......
......@@ -2075,7 +2075,7 @@ public class TaskExecutorTest extends TestLogger {
.taskExecutor
.getSelfGateway(TaskExecutorGateway.class)
.requestSlot(
SlotID.generateDynamicSlotID(ResourceID.generate()),
SlotID.getDynamicSlotID(ResourceID.generate()),
jobId,
allocationId,
resourceProfile,
......@@ -2092,7 +2092,7 @@ public class TaskExecutorTest extends TestLogger {
new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE),
new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE),
new SlotStatus(
SlotID.generateDynamicSlotID(resourceId),
new SlotID(resourceId, 2),
resourceProfile,
jobId,
allocationId)));
......
......@@ -158,7 +158,7 @@ public class TaskSlotTableImplTest extends TestLogger {
assertThat(
taskSlotTable.allocateSlot(-1, jobId2, allocationId, SLOT_TIMEOUT), is(false));
assertThat(taskSlotTable.isAllocated(-1, jobId1, allocationId), is(true));
assertThat(taskSlotTable.isAllocated(1, jobId1, allocationId), is(true));
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
taskSlotTable.getAllocatedSlots(jobId1);
......@@ -201,7 +201,7 @@ public class TaskSlotTableImplTest extends TestLogger {
allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
TaskSlot<TaskSlotPayload> taskSlot2 = allocatedSlots.next();
assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true));
assertThat(taskSlotTable.isAllocated(1, jobId, allocationId), is(true));
assertEquals(taskSlot1, taskSlot2);
assertThat(allocatedSlots.hasNext(), is(false));
}
......@@ -239,9 +239,9 @@ public class TaskSlotTableImplTest extends TestLogger {
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.next().getIndex(), is(-1));
assertThat(allocatedSlots.next().getIndex(), is(2));
assertThat(allocatedSlots.hasNext(), is(false));
assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true));
assertThat(taskSlotTable.isAllocated(2, jobId, allocationId), is(true));
}
}
......@@ -262,7 +262,7 @@ public class TaskSlotTableImplTest extends TestLogger {
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
taskSlotTable.getAllocatedSlots(jobId);
TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
assertThat(allocatedSlot.getIndex(), is(-1));
assertThat(allocatedSlot.getIndex(), is(2));
assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile));
assertThat(allocatedSlots.hasNext(), is(false));
}
......@@ -305,7 +305,7 @@ public class TaskSlotTableImplTest extends TestLogger {
taskSlotTable.allocateSlot(-1, jobId, allocationId3, SLOT_TIMEOUT),
is(true)); // index 4
assertThat(taskSlotTable.freeSlot(allocationId2), is(-1));
assertThat(taskSlotTable.freeSlot(allocationId2), is(3));
ResourceID resourceId = ResourceID.generate();
SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
......@@ -336,7 +336,7 @@ public class TaskSlotTableImplTest extends TestLogger {
null)),
is(
new SlotStatus(
SlotID.generateDynamicSlotID(resourceId),
new SlotID(resourceId, 4),
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
jobId,
allocationId3))));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册