From 94a08704a61c37807886a41030467e27af0af411 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Thu, 3 Aug 2017 13:50:54 +0800 Subject: [PATCH] Fix resource group memory overuse issue when increasing concurrency. Resource group may have memory overuse in below case: CREATE RESOURCE GROUP rg_concurrency_test WITH (concurrency=1, cpu_rate_limit=20, memory_limit=60, memory_shared_quota=0, memory_spill_ratio=10); CREATE ROLE role_concurrency_test RESOURCE GROUP rg_concurrency_test; 11:SET ROLE role_concurrency_test; 11:BEGIN; 21:SET ROLE role_concurrency_test; 22:SET ROLE role_concurrency_test; 21&:BEGIN; 22&:BEGIN; ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2; 11:END; The cause is that we didn't check overall memory quota usage in the past, so pending queries can be woken up as long as the concurrency limit is not reached; in such a case, if the currently running transactions have used all the memory quota in the resource group then the overall memory usage will be exceeded. To fix this issue we now check both the concurrency limit and memory quota usage to decide whether to wake up pending queries. 
Signed-off-by: Zhenghua Lyu --- src/backend/storage/lmgr/proc.c | 1 + src/backend/utils/resgroup/resgroup.c | 415 +++++++++++++----- src/include/storage/proc.h | 1 + src/include/utils/resgroup.h | 5 +- .../expected/resgroup_alter_concurrency.out | 137 ++++++ .../expected/resgroup_concurrency.out | 2 +- .../isolation2/input/enable_resgroup.source | 1 + .../isolation2/isolation2_resgroup_schedule | 5 +- .../isolation2/output/enable_resgroup.source | 3 + .../sql/resgroup_alter_concurrency.sql | 76 ++++ 10 files changed, 535 insertions(+), 111 deletions(-) create mode 100644 src/test/isolation2/expected/resgroup_alter_concurrency.out create mode 100644 src/test/isolation2/sql/resgroup_alter_concurrency.sql diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 386f2350fa..b26af69a2f 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -374,6 +374,7 @@ InitProcess(void) MyProc->waitProcLock = NULL; MyProc->resWaiting = false; MyProc->resGranted = false; + MyProc->resSlotId = -1; for (i = 0; i < NUM_LOCK_PARTITIONS; i++) SHMQueueInit(&(MyProc->myProcLocks[i])); diff --git a/src/backend/utils/resgroup/resgroup.c b/src/backend/utils/resgroup/resgroup.c index 931cd4cdd1..73cf33fb90 100644 --- a/src/backend/utils/resgroup/resgroup.c +++ b/src/backend/utils/resgroup/resgroup.c @@ -43,6 +43,18 @@ #define InvalidSlotId (-1) #define RESGROUP_MAX_SLOTS 90 +/* + * Stock is a virtual unit used to manage the memory quota in resource groups. + * The GPDB system is supposed to have RESGROUP_MAX_MEM_STOCKS stocks, + * each resource group gets some stocks from the GPDB system according to + * its memory_limit setting, each resource group slot gets some stocks + * from the resource group according to the concurrency setting. + * + * We choose the name "stocks" instead of "shares" or "quota" because they + * have other meanings in resource groups. + */ +#define RESGROUP_MAX_MEM_STOCKS 1000000 + /* * GUC variables. 
*/ @@ -62,6 +74,11 @@ typedef struct ResGroupHashEntry int index; } ResGroupHashEntry; +/* + * Resource group config snapshot. + * + * All memory & cpu configs are in percentage. + */ typedef struct ResGroupConfigSnapshot { int concurrency; @@ -98,14 +115,17 @@ typedef struct ResGroupSlotData { int sessionId; - uint32 segmentChunks; /* total memory in chunks for segment */ + ResGroupConfigSnapshot config; - int memLimit; /* memory limit of current resource group */ - int memSharedQuota; /* shared memory quota of current resource group */ + int32 segmentChunks; /* total memory in chunks for segment */ - int memQuota; /* memory quota of current slot */ - int memSpill; /* memory spill of current slot */ - uint32 memUsage; /* total memory usage of procs belongs to this slot */ + int32 memLimit; /* memory limit of current resource group */ + int32 memSharedQuota; /* shared memory quota of current resource group */ + + int32 memStocks; /* memory stocks of current slot */ + int32 memQuota; /* memory quota of current slot */ + int32 memSpill; /* memory spill of current slot */ + int32 memUsage; /* total memory usage of procs belongs to this slot */ int nProcs; /* number of procs in this slot */ bool inUse; } ResGroupSlotData; @@ -124,13 +144,15 @@ typedef struct ResGroupData bool lockedForDrop; /* true if resource group is dropped but not committed yet */ + int32 memStocksGranted; /* memory stocks granted to all the running slots */ + /* * memory usage of this group, should always equal to the * sum of session memory(session_state->sessionVmem) that * belongs to this group */ - uint32 memUsage; - uint32 memSharedUsage; + int32 memUsage; + int32 memSharedUsage; ResGroupSlotData slots[RESGROUP_MAX_SLOTS]; } ResGroupData; @@ -182,7 +204,9 @@ static void attachToSlot(ResGroupData *group, static void detachFromSlot(ResGroupData *group, ResGroupSlotData *slot, ResGroupProcData *self); -static int getFreeSlot(void); +static int getFreeSlot(ResGroupData *group); +static int 
getSlot(ResGroupData *group); +static void putSlot(ResGroupData *group, int slotId); static int ResGroupSlotAcquire(void); static void addTotalQueueDuration(ResGroupData *group); static void ResGroupSlotRelease(void); @@ -344,6 +368,14 @@ InitResGroups(void) if (pResGroupControl->loaded) goto exit; + if (Gp_role == GP_ROLE_DISPATCH) + { + cdbComponentDBs = getCdbComponentDatabases(); + qdinfo = &cdbComponentDBs->entry_db_info[0]; + pResGroupControl->segmentsOnMaster = qdinfo->hostSegs; + Assert(pResGroupControl->segmentsOnMaster > 0); + } + ResGroupOps_Init(); numGroups = 0; @@ -367,13 +399,6 @@ InitResGroups(void) } systable_endscan(sscan); - if (Gp_role == GP_ROLE_DISPATCH) - { - cdbComponentDBs = getCdbComponentDatabases(); - qdinfo = &cdbComponentDBs->entry_db_info[0]; - pResGroupControl->segmentsOnMaster = qdinfo->hostSegs; - } - pResGroupControl->loaded = true; LOG_RESGROUP_DEBUG(LOG, "initialized %d resource groups", numGroups); @@ -432,7 +457,8 @@ ResGroupCheckForDrop(Oid groupId, char *name) * * This function is called in the callback function of DROP RESOURCE GROUP. 
*/ -void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) +void +ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) { int wakeNum; PROC_QUEUE *waitQueue; @@ -454,7 +480,7 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) waitQueue = &(group->waitProcs); wakeNum = waitQueue->size; - while(wakeNum > 0) + while (wakeNum > 0) { PGPROC *waitProc; @@ -465,6 +491,7 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) waitProc->resWaiting = false; waitProc->resGranted = false; + waitProc->resSlotId = InvalidSlotId; SetLatch(&waitProc->procLatch); wakeNum--; } @@ -491,7 +518,6 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) */ void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed) { - int wakeNum; PROC_QUEUE *waitQueue; ResGroupData *group; @@ -506,16 +532,16 @@ void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed) errmsg("Cannot find resource group %d in shared memory", groupId))); } - waitQueue = &(group->waitProcs); - - if (proposed <= group->nRunning) - wakeNum = 0; - else - wakeNum = Min(proposed - group->nRunning, waitQueue->size); + waitQueue = &group->waitProcs; - while(wakeNum > 0) + while (waitQueue->size > 0) { - PGPROC *waitProc; + PGPROC *waitProc; + int slotId; + + slotId = getSlot(group); + if (slotId == InvalidSlotId) + break; /* wake up one process in the wait queue */ waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next); @@ -524,10 +550,8 @@ void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed) waitProc->resWaiting = false; waitProc->resGranted = true; + waitProc->resSlotId = slotId; SetLatch(&waitProc->procLatch); - - group->nRunning++; - wakeNum--; } LWLockRelease(ResGroupLock); @@ -671,6 +695,8 @@ ResGroupReserveMemory(int32 memoryChunks, int32 overuseChunks, bool *waiverUsed) Assert(sharedInfo != NULL); Assert(sharedInfo->groupId != InvalidOid); Assert(procInfo->slotId != InvalidSlotId); + Assert(sharedInfo->memUsage >= 0); + Assert(procInfo->memUsage >= 
0); slot = &sharedInfo->slots[procInfo->slotId]; @@ -691,7 +717,7 @@ ResGroupReserveMemory(int32 memoryChunks, int32 overuseChunks, bool *waiverUsed) if (CritSectionCount == 0 && total > slot->memSharedQuota + overuseChunks) { - uint32 oldUsage; + int32 oldUsage; oldUsage = pg_atomic_fetch_sub_u32((pg_atomic_uint32 *)&sharedInfo->memSharedUsage, slotMemSharedNeeded); @@ -732,7 +758,7 @@ ResGroupReleaseMemory(int32 memoryChunks) ResGroupSlotData *slot; ResGroupProcData *procInfo = MyResGroupProcInfo; ResGroupData *sharedInfo = MyResGroupSharedInfo; - uint32 oldUsage; + int32 oldUsage; if (!IsResGroupEnabled()) return; @@ -819,7 +845,7 @@ ResourceGroupGetQueryMemoryLimit(void) ResGroupSlotData *slot; Assert(MyResGroupSharedInfo != NULL); Assert(MyResGroupProcInfo != NULL); - Assert(MyResGroupProcInfo->slotId != InvalidOid); + Assert(MyResGroupProcInfo->slotId != InvalidSlotId); if (IsResManagerMemoryPolicyNone()) return 0; @@ -854,6 +880,7 @@ ResGroupCreate(Oid groupId) group->totalQueued = 0; group->memUsage = 0; group->memSharedUsage = 0; + group->memStocksGranted = 0; memset(&group->totalQueuedTime, 0, sizeof(group->totalQueuedTime)); group->lockedForDrop = false; memset(group->slots, 0, sizeof(group->slots)); @@ -935,8 +962,13 @@ detachFromSlot(ResGroupData *group, Assert(value >= 0); } +/* + * Get a free resource group slot. + * + * A free resource group slot has inUse == false, no other information is checked. + */ static int -getFreeSlot(void) +getFreeSlot(ResGroupData *group) { int i; @@ -944,10 +976,10 @@ getFreeSlot(void) for (i = 0; i < RESGROUP_MAX_SLOTS; i++) { - if (MyResGroupSharedInfo->slots[i].inUse) + if (group->slots[i].inUse) continue; - MyResGroupSharedInfo->slots[i].inUse = true; + group->slots[i].inUse = true; return i; } @@ -955,6 +987,138 @@ getFreeSlot(void) return InvalidSlotId; } +/* + * Get a slot with memory quota granted. 
+ * + * A slot can be got with this function if there is enough memory quota + * available and the concurrency limit is not reached. + * + * On success the memory quota is marked as granted, nRunning is increased + * and the slot's inUse flag is also set, the slot id is returned. + * + * On failure nothing is changed and InvalidSlotId is returned. + */ +static int +getSlot(ResGroupData *group) +{ + ResGroupConfigSnapshot config; + ResGroupSlotData *slot; + int32 groupMemStocks; + int32 slotMemStocks; + int32 memStocksGranted; + int32 segmentChunks; + int32 memLimit; + int slotId; + + Assert(LWLockHeldExclusiveByMe(ResGroupLock)); + Assert(Gp_role == GP_ROLE_DISPATCH); + + Assert(group != NULL); + Assert(group->groupId != InvalidOid); + + /* First check if the concurrency limit is reached */ + GetConcurrencyForResGroup(group->groupId, NULL, &config.concurrency); + Assert(config.concurrency > 0); + + if (group->nRunning >= config.concurrency) + return InvalidSlotId; + + /* Then check for memory stocks */ + GetMemoryCapabilitiesForResGroup(group->groupId, + &config.memoryLimit, + &config.sharedQuota, + &config.spillRatio); + Assert(pResGroupControl->segmentsOnMaster > 0); + + groupMemStocks = RESGROUP_MAX_MEM_STOCKS * config.memoryLimit / 100; + slotMemStocks = groupMemStocks / config.concurrency; + + Assert(slotMemStocks > 0); + Assert(group->memStocksGranted >= 0); + Assert(group->memStocksGranted <= groupMemStocks); + + memStocksGranted = pg_atomic_add_fetch_u32((pg_atomic_uint32*) &group->memStocksGranted, + slotMemStocks); + + if (memStocksGranted > groupMemStocks) + { + /* No enough memory quota available, give up */ + memStocksGranted = pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->memStocksGranted, + slotMemStocks); + Assert(memStocksGranted >= 0); + return InvalidSlotId; + } + + /* Now actually get a free slot */ + slotId = getFreeSlot(group); + Assert(slotId != InvalidSlotId); + + slot = &group->slots[slotId]; + Assert(slot->inUse); + + /* Grant the 
memory stocks to it */ + Assert(slot->memStocks == 0); + slot->memStocks = slotMemStocks; + + /* Grant the memory quota to it */ + segmentChunks = ResGroupOps_GetTotalMemory() + * gp_resource_group_memory_limit + / pResGroupControl->segmentsOnMaster; + memLimit = segmentChunks * config.memoryLimit / 100; + slot->memQuota = memLimit * (100 - config.sharedQuota) / config.concurrency / 100; + + /* Store the config snapshot to it */ + slot->config = config; + + /* And finally increase nRunning */ + pg_atomic_add_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1); + + return slotId; +} + +/* + * Put back a slot. + * + * This will release a slot, its memory quota will be freed and + * nRunning will be decreased. + */ +static void +putSlot(ResGroupData *group, int slotId) +{ + ResGroupSlotData *slot; +#ifdef USE_ASSERT_CHECKING + int32 memStocksGranted; +#endif + + Assert(LWLockHeldExclusiveByMe(ResGroupLock)); + Assert(Gp_role == GP_ROLE_DISPATCH); + Assert(group != NULL); + Assert(group->memStocksGranted >= 0); + Assert(group->nRunning > 0); + Assert(slotId != InvalidSlotId); + + slot = &group->slots[slotId]; + + Assert(slot->inUse); + Assert(slot->memQuota > 0); + Assert(slot->memStocks > 0); + + /* Return the memory quota granted to this slot */ +#ifdef USE_ASSERT_CHECKING + memStocksGranted = +#endif + pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->memStocksGranted, + slot->memStocks); + Assert(memStocksGranted >= 0); + + /* Mark the slot as free */ + slot->memStocks = 0; + slot->inUse = false; + + /* And finally decrease nRunning */ + pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1); +} + /* * Acquire a resource group slot * @@ -966,7 +1130,6 @@ ResGroupSlotAcquire(void) ResGroupData *group; Oid groupId; int concurrencyProposed; - int slotId; bool retried = false; Assert(MyResGroupProcInfo->groupId == InvalidOid); @@ -1014,12 +1177,27 @@ retry: /* acquire a slot */ if (group->nRunning < concurrencyProposed) { - group->nRunning++; + /* should not 
been granted a slot yet */ + Assert(MyProc->resSlotId == InvalidSlotId); + + /* so try to get one directly */ + MyProc->resSlotId = getSlot(group); + + /* if can't get one */ + if (MyProc->resSlotId == InvalidSlotId) + { + /* then wait one from some others */ + ResGroupWait(group, true); + LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); + + Assert(MyProc->resSlotId != InvalidSlotId); + } + group->totalExecuted++; - slotId = getFreeSlot(); LWLockRelease(ResGroupLock); pgstat_report_resgroup(0, group->groupId); - return slotId; + Assert(MyProc->resSlotId != InvalidSlotId); + return MyProc->resSlotId; } /* We have to wait for the slot */ @@ -1032,9 +1210,9 @@ retry: LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); group->totalExecuted++; addTotalQueueDuration(group); - slotId = getFreeSlot(); LWLockRelease(ResGroupLock); - return slotId; + Assert(MyProc->resSlotId != InvalidSlotId); + return MyProc->resSlotId; } /* Update the total queued time of this group */ @@ -1063,39 +1241,50 @@ ResGroupSlotRelease(void) ResGroupData *group; PROC_QUEUE *waitQueue; PGPROC *waitProc; - int concurrencyProposed; group = MyResGroupSharedInfo; Assert(group != NULL); - GetConcurrencyForResGroup(group->groupId, NULL, &concurrencyProposed); - LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); - waitQueue = &(group->waitProcs); + waitQueue = &group->waitProcs; - if ((group->nRunning > concurrencyProposed) || - waitQueue->size == 0) + Assert(MyProc->resSlotId != InvalidSlotId); + putSlot(group, MyProc->resSlotId); + MyProc->resSlotId = InvalidSlotId; + + /* + * My slot is put back, then how many queueing queries should I wake up? + * Maybe zero, maybe one, maybe more, depends on how the resgroup's + * configuration were changed during our execution. 
+ */ + while (waitQueue->size > 0) { - AssertImply(waitQueue->size == 0, - waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) && - waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links)); - Assert(group->nRunning > 0); + int slotId; - group->nRunning--; + slotId = getSlot(group); + if (slotId == InvalidSlotId) + break; + + /* wake up one process in the wait queue */ + waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next); + SHMQueueDelete(&waitProc->links); + waitQueue->size--; + waitProc->resGranted = true; + waitProc->resSlotId = slotId; /* pass the slot to new query */ LWLockRelease(ResGroupLock); - return; + + waitProc->resWaiting = false; + SetLatch(&waitProc->procLatch); + + LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); } - /* wake up one process in the wait queue */ - waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next); - SHMQueueDelete(&(waitProc->links)); - waitQueue->size--; - waitProc->resGranted = true; - LWLockRelease(ResGroupLock); + AssertImply(waitQueue->size == 0, + waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) && + waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links)); - waitProc->resWaiting = false; - SetLatch(&waitProc->procLatch); + LWLockRelease(ResGroupLock); } /* @@ -1136,7 +1325,8 @@ SerializeResGroupInfo(StringInfo str) * Deserialize the resource group information dispatched by QD. 
*/ void -DeserializeResGroupInfo(const char *buf, int len) +DeserializeResGroupInfo(struct ResGroupConfigSnapshot *config, + const char *buf, int len) { int tmp; const char *ptr = buf; @@ -1152,21 +1342,21 @@ DeserializeResGroupInfo(const char *buf, int len) procInfo->slotId = ntohl(tmp); ptr += sizeof(procInfo->slotId); - memcpy(&tmp, ptr, sizeof(procInfo->config.concurrency)); - procInfo->config.concurrency = ntohl(tmp); - ptr += sizeof(procInfo->config.concurrency); + memcpy(&tmp, ptr, sizeof(config->concurrency)); + config->concurrency = ntohl(tmp); + ptr += sizeof(config->concurrency); - memcpy(&tmp, ptr, sizeof(procInfo->config.memoryLimit)); - procInfo->config.memoryLimit = ntohl(tmp); - ptr += sizeof(procInfo->config.memoryLimit); + memcpy(&tmp, ptr, sizeof(config->memoryLimit)); + config->memoryLimit = ntohl(tmp); + ptr += sizeof(config->memoryLimit); - memcpy(&tmp, ptr, sizeof(procInfo->config.sharedQuota)); - procInfo->config.sharedQuota = ntohl(tmp); - ptr += sizeof(procInfo->config.sharedQuota); + memcpy(&tmp, ptr, sizeof(config->sharedQuota)); + config->sharedQuota = ntohl(tmp); + ptr += sizeof(config->sharedQuota); - memcpy(&tmp, ptr, sizeof(procInfo->config.spillRatio)); - procInfo->config.spillRatio = ntohl(tmp); - ptr += sizeof(procInfo->config.spillRatio); + memcpy(&tmp, ptr, sizeof(config->spillRatio)); + config->spillRatio = ntohl(tmp); + ptr += sizeof(config->spillRatio); Assert(len == ptr - buf); } @@ -1195,8 +1385,6 @@ AssignResGroupOnMaster(void) ResGroupData *sharedInfo; ResGroupSlotData *slot; ResGroupProcData *procInfo; - int concurrency; - int memoryLimit, sharedQuota, spillRatio; int slotId; Oid groupId; @@ -1211,19 +1399,17 @@ AssignResGroupOnMaster(void) Assert(groupId != InvalidOid); Assert(!MyResGroupProcInfo->doMemCheck); - /* Get config information */ - GetMemoryCapabilitiesForResGroup(groupId, &memoryLimit, &sharedQuota, &spillRatio); - GetConcurrencyForResGroup(groupId, NULL, &concurrency); - /* Init slot */ slot = 
&sharedInfo->slots[slotId]; + Assert(slot->memQuota > 0); slot->sessionId = gp_session_id; slot->segmentChunks = ResGroupOps_GetTotalMemory() * gp_resource_group_memory_limit / pResGroupControl->segmentsOnMaster; - slot->memLimit = slot->segmentChunks * memoryLimit / 100; - slot->memSharedQuota = slot->memLimit * sharedQuota / 100; - slot->memQuota = slot->memLimit * (100 - sharedQuota) / concurrency / 100; - slot->memSpill = slot->memLimit * spillRatio / concurrency / 100; + slot->memLimit = slot->segmentChunks * slot->config.memoryLimit / 100; + slot->memSharedQuota = slot->memLimit * slot->config.sharedQuota / 100; + slot->memSpill = slot->memLimit * slot->config.spillRatio + / slot->config.concurrency / 100; + pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); Assert(slot->memLimit > 0); Assert(slot->memQuota > 0); @@ -1231,10 +1417,7 @@ AssignResGroupOnMaster(void) procInfo = MyResGroupProcInfo; procInfo->groupId = groupId; procInfo->slotId = slotId; - procInfo->config.memoryLimit = memoryLimit; - procInfo->config.sharedQuota = sharedQuota; - procInfo->config.spillRatio = spillRatio; - procInfo->config.concurrency = concurrency; + procInfo->config = slot->config; Assert(pResGroupControl != NULL); Assert(pResGroupControl->segmentsOnMaster > 0); @@ -1286,7 +1469,7 @@ UnassignResGroupOnMaster(void) procInfo->slotId = InvalidSlotId; /* Cleanup slotInfo */ - slot->inUse = false; + pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); /* Relesase the slot */ ResGroupSlotRelease(); @@ -1305,6 +1488,7 @@ SwitchResGroupOnSegment(const char *buf, int len) { Oid prevGroupId; int prevSlotId; + ResGroupConfigSnapshot config; ResGroupData *sharedInfo; ResGroupSlotData *slot; ResGroupProcData *procInfo; @@ -1318,7 +1502,7 @@ SwitchResGroupOnSegment(const char *buf, int len) /* Stop memory limit checking */ procInfo->doMemCheck = false; - DeserializeResGroupInfo(buf, len); + DeserializeResGroupInfo(&config, buf, len); AssertImply(procInfo->groupId != 
InvalidOid, procInfo->slotId != InvalidSlotId); @@ -1361,18 +1545,21 @@ SwitchResGroupOnSegment(const char *buf, int len) /* Init MyResGroupProcInfo */ Assert(host_segments > 0); - Assert(procInfo->config.concurrency > 0); + Assert(config.concurrency > 0); + Assert(procInfo->slotId != InvalidSlotId); + procInfo->config = config; /* Init slot */ slot = &sharedInfo->slots[procInfo->slotId]; slot->sessionId = gp_session_id; + slot->config = config; slot->segmentChunks = ResGroupOps_GetTotalMemory() * gp_resource_group_memory_limit / host_segments; - slot->memLimit = slot->segmentChunks * procInfo->config.memoryLimit / 100; - slot->memSharedQuota = slot->memLimit * procInfo->config.sharedQuota / 100; + slot->memLimit = slot->segmentChunks * slot->config.memoryLimit / 100; + slot->memSharedQuota = slot->memLimit * slot->config.sharedQuota / 100; slot->memQuota = slot->memLimit - * (100 - procInfo->config.sharedQuota) - / procInfo->config.concurrency + * (100 - slot->config.sharedQuota) + / slot->config.concurrency / 100; slot->memSpill = slot->memLimit * procInfo->config.spillRatio @@ -1565,7 +1752,7 @@ AtProcExit_ResGroup(int code, Datum arg) * ResGroupData entry may have been removed if the DROP is committed. 
*/ static void -ResGroupWaitCancel() +ResGroupWaitCancel(void) { ResGroupData *group; PROC_QUEUE *waitQueue; @@ -1579,13 +1766,15 @@ ResGroupWaitCancel() /* We are sure to be interrupted in the for loop of ResGroupWait now */ LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); + waitQueue = &group->waitProcs; + if (MyProc->links.next != INVALID_OFFSET) { /* Still waiting on the queue when get interrupted, remove myself from the queue */ - waitQueue = &(group->waitProcs); Assert(waitQueue->size > 0); Assert(MyProc->resWaiting); + Assert(MyProc->resSlotId == InvalidSlotId); addTotalQueueDuration(group); @@ -1595,29 +1784,39 @@ ResGroupWaitCancel() else if (MyProc->links.next == INVALID_OFFSET && MyProc->resGranted) { /* Woken up by a slot holder */ + + Assert(MyProc->resSlotId != InvalidSlotId); + putSlot(group, MyProc->resSlotId); + MyProc->resSlotId = InvalidSlotId; + group->totalExecuted++; addTotalQueueDuration(group); - waitQueue = &(group->waitProcs); - if (waitQueue->size == 0) + /* + * Similar as ResGroupSlotRelease(), how many pending queries to + * wake up depends on how many slots we can get. 
+ */ + while (waitQueue->size > 0) { - /* This is the last transaction on the wait queue, don't have to wake up others */ - Assert(waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) && - waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links)); - Assert(group->nRunning > 0); + int slotId; + + slotId = getSlot(group); + if (slotId == InvalidSlotId) + break; - group->nRunning--; - } - else - { /* wake up one process on the wait queue */ waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next); SHMQueueDelete(&(waitProc->links)); waitQueue->size--; waitProc->resGranted = true; waitProc->resWaiting = false; + waitProc->resSlotId = slotId; /* pass the slot to new query */ SetLatch(&waitProc->procLatch); } + + AssertImply(waitQueue->size == 0, + waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) && + waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links)); } else { diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index cd9e2dc3d3..ddf8aa0f0f 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -172,6 +172,7 @@ struct PGPROC bool resGranted; /* true means a resource group slot is granted. false when wake up from a resource group which is locked for drop */ + int resSlotId; /* the resource group slot id granted */ }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. 
*/ diff --git a/src/include/utils/resgroup.h b/src/include/utils/resgroup.h index 81c089bdc3..edb2c96185 100644 --- a/src/include/utils/resgroup.h +++ b/src/include/utils/resgroup.h @@ -25,6 +25,8 @@ extern int MaxResourceGroups; extern double gp_resource_group_cpu_limit; extern double gp_resource_group_memory_limit; +struct ResGroupConfigSnapshot; + /* Type of statistic infomation */ typedef enum { @@ -54,7 +56,8 @@ extern void AllocResGroupEntry(Oid groupId); extern void FreeResGroupEntry(Oid groupId); extern void SerializeResGroupInfo(StringInfo str); -extern void DeserializeResGroupInfo(const char *buf, int len); +extern void DeserializeResGroupInfo(struct ResGroupConfigSnapshot *config, + const char *buf, int len); extern bool ShouldAssignResGroupOnMaster(void); extern void AssignResGroupOnMaster(void); diff --git a/src/test/isolation2/expected/resgroup_alter_concurrency.out b/src/test/isolation2/expected/resgroup_alter_concurrency.out new file mode 100644 index 0000000000..1f8eb87a00 --- /dev/null +++ b/src/test/isolation2/expected/resgroup_alter_concurrency.out @@ -0,0 +1,137 @@ +-- create a resource group when gp_resource_manager is queue +DROP ROLE IF EXISTS role_concurrency_test; +DROP +-- start_ignore +DROP RESOURCE GROUP rg_concurrency_test; +ERROR: resource group "rg_concurrency_test" does not exist +-- end_ignore +CREATE RESOURCE GROUP rg_concurrency_test WITH (concurrency=1, cpu_rate_limit=20, memory_limit=60, memory_shared_quota=0, memory_spill_ratio=10); +CREATE +CREATE ROLE role_concurrency_test RESOURCE GROUP rg_concurrency_test; +CREATE + +-- +-- increase concurrency after pending queries +-- + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1; +ALTER + +11:SET ROLE role_concurrency_test; +SET +11:BEGIN; +BEGIN + +21:SET ROLE role_concurrency_test; +SET +22:SET ROLE role_concurrency_test; +SET +21&:BEGIN; +22&:BEGIN; + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2; +ALTER + +SELECT rsgname,waiting_reason,current_query 
FROM pg_stat_activity; +rsgname |waiting_reason|current_query +-------------------+--------------+------------------------------------------------------------------ +admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rg_concurrency_test| | in transaction +rg_concurrency_test|resgroup |BEGIN; +rg_concurrency_test|resgroup |BEGIN; +(4 rows) + +11:END; +END +11q: ... + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rsgname |waiting_reason|current_query +-------------------+--------------+------------------------------------------------------------------ +admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rg_concurrency_test| | in transaction +rg_concurrency_test| | in transaction +(3 rows) + +21<: <... completed> +BEGIN +22<: <... completed> +BEGIN +21:END; +END +22:END; +END +21q: ... +22q: ... + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rsgname |waiting_reason|current_query +-----------+--------------+------------------------------------------------------------------ +admin_group| |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +(1 row) + +-- +-- increase concurrency before pending queries +-- + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1; +ALTER + +11:SET ROLE role_concurrency_test; +SET +11:BEGIN; +BEGIN + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2; +ALTER + +21:SET ROLE role_concurrency_test; +SET +22:SET ROLE role_concurrency_test; +SET +21&:BEGIN; +22&:BEGIN; + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rsgname |waiting_reason|current_query +-------------------+--------------+------------------------------------------------------------------ +admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rg_concurrency_test| | in transaction +rg_concurrency_test|resgroup |BEGIN; +rg_concurrency_test|resgroup |BEGIN; +(4 rows) + +11:END; +END +11q: ... 
+ +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rsgname |waiting_reason|current_query +-------------------+--------------+------------------------------------------------------------------ +admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rg_concurrency_test| | in transaction +rg_concurrency_test| | in transaction +(3 rows) + +21<: <... completed> +BEGIN +22<: <... completed> +BEGIN +21:END; +END +22:END; +END +21q: ... +22q: ... + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +rsgname |waiting_reason|current_query +-----------+--------------+------------------------------------------------------------------ +admin_group| |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; +(1 row) + +-- cleanup +DROP ROLE role_concurrency_test; +DROP +DROP RESOURCE GROUP rg_concurrency_test; +DROP diff --git a/src/test/isolation2/expected/resgroup_concurrency.out b/src/test/isolation2/expected/resgroup_concurrency.out index 9189dcf0ae..0d9a95cf48 100644 --- a/src/test/isolation2/expected/resgroup_concurrency.out +++ b/src/test/isolation2/expected/resgroup_concurrency.out @@ -101,7 +101,7 @@ ALTER SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test'; rsgname |num_running|num_queueing|num_queued|num_executed -------------------+-----------+------------+----------+------------ -rg_concurrency_test|3 |0 |1 |3 +rg_concurrency_test|2 |1 |1 |2 (1 row) SELECT concurrency,proposed_concurrency FROM gp_toolkit.gp_resgroup_config WHERE groupname='rg_concurrency_test'; concurrency|proposed_concurrency diff --git a/src/test/isolation2/input/enable_resgroup.source b/src/test/isolation2/input/enable_resgroup.source index 525ae422f6..f05c7b4684 100644 --- a/src/test/isolation2/input/enable_resgroup.source +++ b/src/test/isolation2/input/enable_resgroup.source @@ -19,6 +19,7 @@ ! 
bash /tmp/.resgroup_mem_helper.sh; ! rm -f /tmp/.resgroup_mem_helper.sh; ! gpconfig -c gp_resource_manager -v group; +! gpconfig -c gp_resource_group_cpu_limit -v 0.9; ! gpstop -rai; -- end_ignore diff --git a/src/test/isolation2/isolation2_resgroup_schedule b/src/test/isolation2/isolation2_resgroup_schedule index 1406e99a0d..295c6c3224 100644 --- a/src/test/isolation2/isolation2_resgroup_schedule +++ b/src/test/isolation2/isolation2_resgroup_schedule @@ -5,8 +5,12 @@ test: resgroup_syntax test: resgroup_transaction test: resgroup_concurrency +test: resgroup_alter_concurrency test: resgroup_memory_statistic test: resgroup_memory_limit +test: resgroup_cpu_rate_limit + +# memory spill tests test: resgroup_memory_hashagg_spill test: resgroup_memory_hashjoin_spill test: resgroup_memory_materialize_spill @@ -14,6 +18,5 @@ test: resgroup_memory_sisc_mat_sort test: resgroup_memory_sisc_sort_spill test: resgroup_memory_sort_spill test: resgroup_memory_spilltodisk -test: resgroup_cpu_rate_limit test: disable_resgroup diff --git a/src/test/isolation2/output/enable_resgroup.source b/src/test/isolation2/output/enable_resgroup.source index 9169bb7e24..a60f0639b9 100644 --- a/src/test/isolation2/output/enable_resgroup.source +++ b/src/test/isolation2/output/enable_resgroup.source @@ -42,6 +42,9 @@ 20170502:01:28:12:000367 gpconfig:sdw6:gpadmin-[WARNING]:-Managing queries with resource groups is an experimental feature. A work-in-progress version is enabled. 20170502:01:28:13:000367 gpconfig:sdw6:gpadmin-[INFO]:-completed successfully +! gpconfig -c gp_resource_group_cpu_limit -v 0.9; +20170803:10:42:57:015929 gpconfig:nyu-vm-centos:gpadmin-[INFO]:-completed successfully + ! 
gpstop -rai; -- end_ignore diff --git a/src/test/isolation2/sql/resgroup_alter_concurrency.sql b/src/test/isolation2/sql/resgroup_alter_concurrency.sql new file mode 100644 index 0000000000..e1866e00b0 --- /dev/null +++ b/src/test/isolation2/sql/resgroup_alter_concurrency.sql @@ -0,0 +1,76 @@ +-- create a resource group when gp_resource_manager is queue +DROP ROLE IF EXISTS role_concurrency_test; +-- start_ignore +DROP RESOURCE GROUP rg_concurrency_test; +-- end_ignore +CREATE RESOURCE GROUP rg_concurrency_test WITH +(concurrency=1, cpu_rate_limit=20, memory_limit=60, memory_shared_quota=0, memory_spill_ratio=10); +CREATE ROLE role_concurrency_test RESOURCE GROUP rg_concurrency_test; + +-- +-- increase concurrency after pending queries +-- + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1; + +11:SET ROLE role_concurrency_test; +11:BEGIN; + +21:SET ROLE role_concurrency_test; +22:SET ROLE role_concurrency_test; +21&:BEGIN; +22&:BEGIN; + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2; + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; + +11:END; +11q: + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; + +21<: +22<: +21:END; +22:END; +21q: +22q: + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; + +-- +-- increase concurrency before pending queries +-- + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1; + +11:SET ROLE role_concurrency_test; +11:BEGIN; + +ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2; + +21:SET ROLE role_concurrency_test; +22:SET ROLE role_concurrency_test; +21&:BEGIN; +22&:BEGIN; + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; + +11:END; +11q: + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; + +21<: +22<: +21:END; +22:END; +21q: +22q: + +SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity; + +-- cleanup +DROP ROLE role_concurrency_test; +DROP RESOURCE GROUP rg_concurrency_test; -- GitLab