提交 94a08704 编写于 作者: N Ning Yu 提交者: GitHub

Fix resource group memory overuse issue when increasing concurrency.

Resource group may have memory overuse in below case:

	CREATE RESOURCE GROUP rg_concurrency_test WITH
	(concurrency=1, cpu_rate_limit=20, memory_limit=60,
	 memory_shared_quota=0, memory_spill_ratio=10);
	CREATE ROLE role_concurrency_test RESOURCE GROUP rg_concurrency_test;

	11:SET ROLE role_concurrency_test;
	11:BEGIN;

	21:SET ROLE role_concurrency_test;
	22:SET ROLE role_concurrency_test;
	21&:BEGIN;
	22&:BEGIN;

	ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2;

	11:END;

The cause is that we didn't check the overall memory quota usage in the
past, so pending queries could be woken up as long as the concurrency
limit was not reached; in such a case, if the currently running transactions
had already used all the memory quota in the resource group, the overall
memory usage would be exceeded.

To fix this issue we now check both the concurrency limit and the memory
quota usage to decide whether to wake up pending queries.
Signed-off-by: Zhenghua Lyu <zlv@pivotal.io>
上级 759c19d0
...@@ -374,6 +374,7 @@ InitProcess(void) ...@@ -374,6 +374,7 @@ InitProcess(void)
MyProc->waitProcLock = NULL; MyProc->waitProcLock = NULL;
MyProc->resWaiting = false; MyProc->resWaiting = false;
MyProc->resGranted = false; MyProc->resGranted = false;
MyProc->resSlotId = -1;
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
SHMQueueInit(&(MyProc->myProcLocks[i])); SHMQueueInit(&(MyProc->myProcLocks[i]));
......
...@@ -43,6 +43,18 @@ ...@@ -43,6 +43,18 @@
#define InvalidSlotId (-1) #define InvalidSlotId (-1)
#define RESGROUP_MAX_SLOTS 90 #define RESGROUP_MAX_SLOTS 90
/*
* Stock is a virtual unit used to manage the memory quota in resource groups.
* The GPDB system is supposed to have RESGROUP_MAX_MEM_STOCKS stocks,
* each resource group gets some stocks from the GPDB system according to
* its memory_limit setting, each resource group slot gets some stocks
* from the resource group according to the concurrency setting.
*
* We choose the name "stocks" instead of "shares" or "quota" because they
* have other meanings in resource groups.
*/
#define RESGROUP_MAX_MEM_STOCKS 1000000
/* /*
* GUC variables. * GUC variables.
*/ */
...@@ -62,6 +74,11 @@ typedef struct ResGroupHashEntry ...@@ -62,6 +74,11 @@ typedef struct ResGroupHashEntry
int index; int index;
} ResGroupHashEntry; } ResGroupHashEntry;
/*
* Resource group config snapshot.
*
* All memory & cpu configs are in percentage.
*/
typedef struct ResGroupConfigSnapshot typedef struct ResGroupConfigSnapshot
{ {
int concurrency; int concurrency;
...@@ -98,14 +115,17 @@ typedef struct ResGroupSlotData ...@@ -98,14 +115,17 @@ typedef struct ResGroupSlotData
{ {
int sessionId; int sessionId;
uint32 segmentChunks; /* total memory in chunks for segment */ ResGroupConfigSnapshot config;
int memLimit; /* memory limit of current resource group */ int32 segmentChunks; /* total memory in chunks for segment */
int memSharedQuota; /* shared memory quota of current resource group */
int memQuota; /* memory quota of current slot */ int32 memLimit; /* memory limit of current resource group */
int memSpill; /* memory spill of current slot */ int32 memSharedQuota; /* shared memory quota of current resource group */
uint32 memUsage; /* total memory usage of procs belongs to this slot */
int32 memStocks; /* memory stocks of current slot */
int32 memQuota; /* memory quota of current slot */
int32 memSpill; /* memory spill of current slot */
int32 memUsage; /* total memory usage of procs belongs to this slot */
int nProcs; /* number of procs in this slot */ int nProcs; /* number of procs in this slot */
bool inUse; bool inUse;
} ResGroupSlotData; } ResGroupSlotData;
...@@ -124,13 +144,15 @@ typedef struct ResGroupData ...@@ -124,13 +144,15 @@ typedef struct ResGroupData
bool lockedForDrop; /* true if resource group is dropped but not committed yet */ bool lockedForDrop; /* true if resource group is dropped but not committed yet */
int32 memStocksGranted; /* memory stocks granted to all the running slots */
/* /*
* memory usage of this group, should always equal to the * memory usage of this group, should always equal to the
* sum of session memory(session_state->sessionVmem) that * sum of session memory(session_state->sessionVmem) that
* belongs to this group * belongs to this group
*/ */
uint32 memUsage; int32 memUsage;
uint32 memSharedUsage; int32 memSharedUsage;
ResGroupSlotData slots[RESGROUP_MAX_SLOTS]; ResGroupSlotData slots[RESGROUP_MAX_SLOTS];
} ResGroupData; } ResGroupData;
...@@ -182,7 +204,9 @@ static void attachToSlot(ResGroupData *group, ...@@ -182,7 +204,9 @@ static void attachToSlot(ResGroupData *group,
static void detachFromSlot(ResGroupData *group, static void detachFromSlot(ResGroupData *group,
ResGroupSlotData *slot, ResGroupSlotData *slot,
ResGroupProcData *self); ResGroupProcData *self);
static int getFreeSlot(void); static int getFreeSlot(ResGroupData *group);
static int getSlot(ResGroupData *group);
static void putSlot(ResGroupData *group, int slotId);
static int ResGroupSlotAcquire(void); static int ResGroupSlotAcquire(void);
static void addTotalQueueDuration(ResGroupData *group); static void addTotalQueueDuration(ResGroupData *group);
static void ResGroupSlotRelease(void); static void ResGroupSlotRelease(void);
...@@ -344,6 +368,14 @@ InitResGroups(void) ...@@ -344,6 +368,14 @@ InitResGroups(void)
if (pResGroupControl->loaded) if (pResGroupControl->loaded)
goto exit; goto exit;
if (Gp_role == GP_ROLE_DISPATCH)
{
cdbComponentDBs = getCdbComponentDatabases();
qdinfo = &cdbComponentDBs->entry_db_info[0];
pResGroupControl->segmentsOnMaster = qdinfo->hostSegs;
Assert(pResGroupControl->segmentsOnMaster > 0);
}
ResGroupOps_Init(); ResGroupOps_Init();
numGroups = 0; numGroups = 0;
...@@ -367,13 +399,6 @@ InitResGroups(void) ...@@ -367,13 +399,6 @@ InitResGroups(void)
} }
systable_endscan(sscan); systable_endscan(sscan);
if (Gp_role == GP_ROLE_DISPATCH)
{
cdbComponentDBs = getCdbComponentDatabases();
qdinfo = &cdbComponentDBs->entry_db_info[0];
pResGroupControl->segmentsOnMaster = qdinfo->hostSegs;
}
pResGroupControl->loaded = true; pResGroupControl->loaded = true;
LOG_RESGROUP_DEBUG(LOG, "initialized %d resource groups", numGroups); LOG_RESGROUP_DEBUG(LOG, "initialized %d resource groups", numGroups);
...@@ -432,7 +457,8 @@ ResGroupCheckForDrop(Oid groupId, char *name) ...@@ -432,7 +457,8 @@ ResGroupCheckForDrop(Oid groupId, char *name)
* *
* This function is called in the callback function of DROP RESOURCE GROUP. * This function is called in the callback function of DROP RESOURCE GROUP.
*/ */
void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) void
ResGroupDropCheckForWakeup(Oid groupId, bool isCommit)
{ {
int wakeNum; int wakeNum;
PROC_QUEUE *waitQueue; PROC_QUEUE *waitQueue;
...@@ -454,7 +480,7 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) ...@@ -454,7 +480,7 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit)
waitQueue = &(group->waitProcs); waitQueue = &(group->waitProcs);
wakeNum = waitQueue->size; wakeNum = waitQueue->size;
while(wakeNum > 0) while (wakeNum > 0)
{ {
PGPROC *waitProc; PGPROC *waitProc;
...@@ -465,6 +491,7 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) ...@@ -465,6 +491,7 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit)
waitProc->resWaiting = false; waitProc->resWaiting = false;
waitProc->resGranted = false; waitProc->resGranted = false;
waitProc->resSlotId = InvalidSlotId;
SetLatch(&waitProc->procLatch); SetLatch(&waitProc->procLatch);
wakeNum--; wakeNum--;
} }
...@@ -491,7 +518,6 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit) ...@@ -491,7 +518,6 @@ void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit)
*/ */
void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed) void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed)
{ {
int wakeNum;
PROC_QUEUE *waitQueue; PROC_QUEUE *waitQueue;
ResGroupData *group; ResGroupData *group;
...@@ -506,16 +532,16 @@ void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed) ...@@ -506,16 +532,16 @@ void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed)
errmsg("Cannot find resource group %d in shared memory", groupId))); errmsg("Cannot find resource group %d in shared memory", groupId)));
} }
waitQueue = &(group->waitProcs); waitQueue = &group->waitProcs;
if (proposed <= group->nRunning)
wakeNum = 0;
else
wakeNum = Min(proposed - group->nRunning, waitQueue->size);
while(wakeNum > 0) while (waitQueue->size > 0)
{ {
PGPROC *waitProc; PGPROC *waitProc;
int slotId;
slotId = getSlot(group);
if (slotId == InvalidSlotId)
break;
/* wake up one process in the wait queue */ /* wake up one process in the wait queue */
waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next); waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
...@@ -524,10 +550,8 @@ void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed) ...@@ -524,10 +550,8 @@ void ResGroupAlterCheckForWakeup(Oid groupId, int value, int proposed)
waitProc->resWaiting = false; waitProc->resWaiting = false;
waitProc->resGranted = true; waitProc->resGranted = true;
waitProc->resSlotId = slotId;
SetLatch(&waitProc->procLatch); SetLatch(&waitProc->procLatch);
group->nRunning++;
wakeNum--;
} }
LWLockRelease(ResGroupLock); LWLockRelease(ResGroupLock);
...@@ -671,6 +695,8 @@ ResGroupReserveMemory(int32 memoryChunks, int32 overuseChunks, bool *waiverUsed) ...@@ -671,6 +695,8 @@ ResGroupReserveMemory(int32 memoryChunks, int32 overuseChunks, bool *waiverUsed)
Assert(sharedInfo != NULL); Assert(sharedInfo != NULL);
Assert(sharedInfo->groupId != InvalidOid); Assert(sharedInfo->groupId != InvalidOid);
Assert(procInfo->slotId != InvalidSlotId); Assert(procInfo->slotId != InvalidSlotId);
Assert(sharedInfo->memUsage >= 0);
Assert(procInfo->memUsage >= 0);
slot = &sharedInfo->slots[procInfo->slotId]; slot = &sharedInfo->slots[procInfo->slotId];
...@@ -691,7 +717,7 @@ ResGroupReserveMemory(int32 memoryChunks, int32 overuseChunks, bool *waiverUsed) ...@@ -691,7 +717,7 @@ ResGroupReserveMemory(int32 memoryChunks, int32 overuseChunks, bool *waiverUsed)
if (CritSectionCount == 0 && if (CritSectionCount == 0 &&
total > slot->memSharedQuota + overuseChunks) total > slot->memSharedQuota + overuseChunks)
{ {
uint32 oldUsage; int32 oldUsage;
oldUsage = pg_atomic_fetch_sub_u32((pg_atomic_uint32 *)&sharedInfo->memSharedUsage, oldUsage = pg_atomic_fetch_sub_u32((pg_atomic_uint32 *)&sharedInfo->memSharedUsage,
slotMemSharedNeeded); slotMemSharedNeeded);
...@@ -732,7 +758,7 @@ ResGroupReleaseMemory(int32 memoryChunks) ...@@ -732,7 +758,7 @@ ResGroupReleaseMemory(int32 memoryChunks)
ResGroupSlotData *slot; ResGroupSlotData *slot;
ResGroupProcData *procInfo = MyResGroupProcInfo; ResGroupProcData *procInfo = MyResGroupProcInfo;
ResGroupData *sharedInfo = MyResGroupSharedInfo; ResGroupData *sharedInfo = MyResGroupSharedInfo;
uint32 oldUsage; int32 oldUsage;
if (!IsResGroupEnabled()) if (!IsResGroupEnabled())
return; return;
...@@ -819,7 +845,7 @@ ResourceGroupGetQueryMemoryLimit(void) ...@@ -819,7 +845,7 @@ ResourceGroupGetQueryMemoryLimit(void)
ResGroupSlotData *slot; ResGroupSlotData *slot;
Assert(MyResGroupSharedInfo != NULL); Assert(MyResGroupSharedInfo != NULL);
Assert(MyResGroupProcInfo != NULL); Assert(MyResGroupProcInfo != NULL);
Assert(MyResGroupProcInfo->slotId != InvalidOid); Assert(MyResGroupProcInfo->slotId != InvalidSlotId);
if (IsResManagerMemoryPolicyNone()) if (IsResManagerMemoryPolicyNone())
return 0; return 0;
...@@ -854,6 +880,7 @@ ResGroupCreate(Oid groupId) ...@@ -854,6 +880,7 @@ ResGroupCreate(Oid groupId)
group->totalQueued = 0; group->totalQueued = 0;
group->memUsage = 0; group->memUsage = 0;
group->memSharedUsage = 0; group->memSharedUsage = 0;
group->memStocksGranted = 0;
memset(&group->totalQueuedTime, 0, sizeof(group->totalQueuedTime)); memset(&group->totalQueuedTime, 0, sizeof(group->totalQueuedTime));
group->lockedForDrop = false; group->lockedForDrop = false;
memset(group->slots, 0, sizeof(group->slots)); memset(group->slots, 0, sizeof(group->slots));
...@@ -935,8 +962,13 @@ detachFromSlot(ResGroupData *group, ...@@ -935,8 +962,13 @@ detachFromSlot(ResGroupData *group,
Assert(value >= 0); Assert(value >= 0);
} }
/*
* Get a free resource group slot.
*
* A free resource group slot has inUse == false, no other information is checked.
*/
static int static int
getFreeSlot(void) getFreeSlot(ResGroupData *group)
{ {
int i; int i;
...@@ -944,10 +976,10 @@ getFreeSlot(void) ...@@ -944,10 +976,10 @@ getFreeSlot(void)
for (i = 0; i < RESGROUP_MAX_SLOTS; i++) for (i = 0; i < RESGROUP_MAX_SLOTS; i++)
{ {
if (MyResGroupSharedInfo->slots[i].inUse) if (group->slots[i].inUse)
continue; continue;
MyResGroupSharedInfo->slots[i].inUse = true; group->slots[i].inUse = true;
return i; return i;
} }
...@@ -955,6 +987,138 @@ getFreeSlot(void) ...@@ -955,6 +987,138 @@ getFreeSlot(void)
return InvalidSlotId; return InvalidSlotId;
} }
/*
 * Get a slot with memory quota granted.
 *
 * A slot can be got with this function if there is enough memory quota
 * available and the concurrency limit is not reached.
 *
 * On success the memory quota is marked as granted, nRunning is increased
 * and the slot's inUse flag is also set, the slot id is returned.
 *
 * On failure nothing is changed and InvalidSlotId is returned.
 *
 * Must be called with ResGroupLock held exclusively, on the dispatcher
 * (QD) only; the atomic ops below are therefore for consistency with
 * readers elsewhere, not for mutual exclusion between callers.
 */
static int
getSlot(ResGroupData *group)
{
ResGroupConfigSnapshot config;      /* live config read from catalog below */
ResGroupSlotData *slot;
int32 groupMemStocks;               /* stocks this group may grant in total */
int32 slotMemStocks;                /* stocks one slot consumes */
int32 memStocksGranted;             /* group-wide granted stocks after update */
int32 segmentChunks;                /* total memory chunks on this segment */
int32 memLimit;                     /* group's memory limit in chunks */
int slotId;
Assert(LWLockHeldExclusiveByMe(ResGroupLock));
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(group != NULL);
Assert(group->groupId != InvalidOid);
/* First check if the concurrency limit is reached */
GetConcurrencyForResGroup(group->groupId, NULL, &config.concurrency);
Assert(config.concurrency > 0);
if (group->nRunning >= config.concurrency)
return InvalidSlotId;
/* Then check for memory stocks */
GetMemoryCapabilitiesForResGroup(group->groupId,
&config.memoryLimit,
&config.sharedQuota,
&config.spillRatio);
Assert(pResGroupControl->segmentsOnMaster > 0);
/* Percent-of-system configs are converted to the virtual stock unit */
groupMemStocks = RESGROUP_MAX_MEM_STOCKS * config.memoryLimit / 100;
slotMemStocks = groupMemStocks / config.concurrency;
Assert(slotMemStocks > 0);
Assert(group->memStocksGranted >= 0);
Assert(group->memStocksGranted <= groupMemStocks);
/*
 * Optimistically reserve this slot's stocks, then roll back below if the
 * reservation pushed the group over its total.  NOTE(review): the value
 * is declared int32 but updated via the u32 atomic API — presumably safe
 * because quantities stay non-negative; confirm against the pg_atomic
 * conventions used elsewhere in this file.
 */
memStocksGranted = pg_atomic_add_fetch_u32((pg_atomic_uint32*) &group->memStocksGranted,
slotMemStocks);
if (memStocksGranted > groupMemStocks)
{
/* No enough memory quota available, give up */
memStocksGranted = pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->memStocksGranted,
slotMemStocks);
Assert(memStocksGranted >= 0);
return InvalidSlotId;
}
/* Now actually get a free slot */
slotId = getFreeSlot(group);
/* Cannot fail: nRunning < concurrency guarantees a free slot exists */
Assert(slotId != InvalidSlotId);
slot = &group->slots[slotId];
Assert(slot->inUse);
/* Grant the memory stocks to it */
Assert(slot->memStocks == 0);
slot->memStocks = slotMemStocks;
/* Grant the memory quota to it */
segmentChunks = ResGroupOps_GetTotalMemory()
* gp_resource_group_memory_limit
/ pResGroupControl->segmentsOnMaster;
/* non-shared portion of the group's limit, split evenly across slots */
memLimit = segmentChunks * config.memoryLimit / 100;
slot->memQuota = memLimit * (100 - config.sharedQuota) / config.concurrency / 100;
/* Store the config snapshot to it */
slot->config = config;
/* And finally increase nRunning */
pg_atomic_add_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1);
return slotId;
}
/*
 * Put back a slot.
 *
 * This will release a slot, its memory quota will be freed and
 * nRunning will be decreased.
 *
 * Counterpart of getSlot(); must be called with ResGroupLock held
 * exclusively, on the dispatcher (QD) only, with a slotId previously
 * returned by getSlot().
 */
static void
putSlot(ResGroupData *group, int slotId)
{
ResGroupSlotData *slot;
#ifdef USE_ASSERT_CHECKING
int32 memStocksGranted;             /* only needed to assert non-negativity */
#endif
Assert(LWLockHeldExclusiveByMe(ResGroupLock));
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(group != NULL);
Assert(group->memStocksGranted >= 0);
Assert(group->nRunning > 0);
Assert(slotId != InvalidSlotId);
slot = &group->slots[slotId];
Assert(slot->inUse);
Assert(slot->memQuota > 0);
Assert(slot->memStocks > 0);
/* Return the memory quota granted to this slot */
#ifdef USE_ASSERT_CHECKING
/* capture the post-sub value only when asserts are compiled in */
memStocksGranted =
#endif
pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->memStocksGranted,
slot->memStocks);
Assert(memStocksGranted >= 0);
/* Mark the slot as free */
slot->memStocks = 0;
slot->inUse = false;
/* And finally decrease nRunning */
pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1);
}
/* /*
* Acquire a resource group slot * Acquire a resource group slot
* *
...@@ -966,7 +1130,6 @@ ResGroupSlotAcquire(void) ...@@ -966,7 +1130,6 @@ ResGroupSlotAcquire(void)
ResGroupData *group; ResGroupData *group;
Oid groupId; Oid groupId;
int concurrencyProposed; int concurrencyProposed;
int slotId;
bool retried = false; bool retried = false;
Assert(MyResGroupProcInfo->groupId == InvalidOid); Assert(MyResGroupProcInfo->groupId == InvalidOid);
...@@ -1014,12 +1177,27 @@ retry: ...@@ -1014,12 +1177,27 @@ retry:
/* acquire a slot */ /* acquire a slot */
if (group->nRunning < concurrencyProposed) if (group->nRunning < concurrencyProposed)
{ {
group->nRunning++; /* should not been granted a slot yet */
Assert(MyProc->resSlotId == InvalidSlotId);
/* so try to get one directly */
MyProc->resSlotId = getSlot(group);
/* if can't get one */
if (MyProc->resSlotId == InvalidSlotId)
{
/* then wait one from some others */
ResGroupWait(group, true);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
Assert(MyProc->resSlotId != InvalidSlotId);
}
group->totalExecuted++; group->totalExecuted++;
slotId = getFreeSlot();
LWLockRelease(ResGroupLock); LWLockRelease(ResGroupLock);
pgstat_report_resgroup(0, group->groupId); pgstat_report_resgroup(0, group->groupId);
return slotId; Assert(MyProc->resSlotId != InvalidSlotId);
return MyProc->resSlotId;
} }
/* We have to wait for the slot */ /* We have to wait for the slot */
...@@ -1032,9 +1210,9 @@ retry: ...@@ -1032,9 +1210,9 @@ retry:
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group->totalExecuted++; group->totalExecuted++;
addTotalQueueDuration(group); addTotalQueueDuration(group);
slotId = getFreeSlot();
LWLockRelease(ResGroupLock); LWLockRelease(ResGroupLock);
return slotId; Assert(MyProc->resSlotId != InvalidSlotId);
return MyProc->resSlotId;
} }
/* Update the total queued time of this group */ /* Update the total queued time of this group */
...@@ -1063,39 +1241,50 @@ ResGroupSlotRelease(void) ...@@ -1063,39 +1241,50 @@ ResGroupSlotRelease(void)
ResGroupData *group; ResGroupData *group;
PROC_QUEUE *waitQueue; PROC_QUEUE *waitQueue;
PGPROC *waitProc; PGPROC *waitProc;
int concurrencyProposed;
group = MyResGroupSharedInfo; group = MyResGroupSharedInfo;
Assert(group != NULL); Assert(group != NULL);
GetConcurrencyForResGroup(group->groupId, NULL, &concurrencyProposed);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
waitQueue = &(group->waitProcs); waitQueue = &group->waitProcs;
if ((group->nRunning > concurrencyProposed) || Assert(MyProc->resSlotId != InvalidSlotId);
waitQueue->size == 0) putSlot(group, MyProc->resSlotId);
MyProc->resSlotId = InvalidSlotId;
/*
* My slot is put back, then how many queueing queries should I wake up?
* Maybe zero, maybe one, maybe more, depends on how the resgroup's
* configuration were changed during our execution.
*/
while (waitQueue->size > 0)
{ {
AssertImply(waitQueue->size == 0, int slotId;
waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) &&
waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links));
Assert(group->nRunning > 0);
group->nRunning--; slotId = getSlot(group);
if (slotId == InvalidSlotId)
break;
/* wake up one process in the wait queue */
waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
SHMQueueDelete(&waitProc->links);
waitQueue->size--;
waitProc->resGranted = true;
waitProc->resSlotId = slotId; /* pass the slot to new query */
LWLockRelease(ResGroupLock); LWLockRelease(ResGroupLock);
return;
waitProc->resWaiting = false;
SetLatch(&waitProc->procLatch);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
} }
/* wake up one process in the wait queue */ AssertImply(waitQueue->size == 0,
waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next); waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) &&
SHMQueueDelete(&(waitProc->links)); waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links));
waitQueue->size--;
waitProc->resGranted = true;
LWLockRelease(ResGroupLock);
waitProc->resWaiting = false; LWLockRelease(ResGroupLock);
SetLatch(&waitProc->procLatch);
} }
/* /*
...@@ -1136,7 +1325,8 @@ SerializeResGroupInfo(StringInfo str) ...@@ -1136,7 +1325,8 @@ SerializeResGroupInfo(StringInfo str)
* Deserialize the resource group information dispatched by QD. * Deserialize the resource group information dispatched by QD.
*/ */
void void
DeserializeResGroupInfo(const char *buf, int len) DeserializeResGroupInfo(struct ResGroupConfigSnapshot *config,
const char *buf, int len)
{ {
int tmp; int tmp;
const char *ptr = buf; const char *ptr = buf;
...@@ -1152,21 +1342,21 @@ DeserializeResGroupInfo(const char *buf, int len) ...@@ -1152,21 +1342,21 @@ DeserializeResGroupInfo(const char *buf, int len)
procInfo->slotId = ntohl(tmp); procInfo->slotId = ntohl(tmp);
ptr += sizeof(procInfo->slotId); ptr += sizeof(procInfo->slotId);
memcpy(&tmp, ptr, sizeof(procInfo->config.concurrency)); memcpy(&tmp, ptr, sizeof(config->concurrency));
procInfo->config.concurrency = ntohl(tmp); config->concurrency = ntohl(tmp);
ptr += sizeof(procInfo->config.concurrency); ptr += sizeof(config->concurrency);
memcpy(&tmp, ptr, sizeof(procInfo->config.memoryLimit)); memcpy(&tmp, ptr, sizeof(config->memoryLimit));
procInfo->config.memoryLimit = ntohl(tmp); config->memoryLimit = ntohl(tmp);
ptr += sizeof(procInfo->config.memoryLimit); ptr += sizeof(config->memoryLimit);
memcpy(&tmp, ptr, sizeof(procInfo->config.sharedQuota)); memcpy(&tmp, ptr, sizeof(config->sharedQuota));
procInfo->config.sharedQuota = ntohl(tmp); config->sharedQuota = ntohl(tmp);
ptr += sizeof(procInfo->config.sharedQuota); ptr += sizeof(config->sharedQuota);
memcpy(&tmp, ptr, sizeof(procInfo->config.spillRatio)); memcpy(&tmp, ptr, sizeof(config->spillRatio));
procInfo->config.spillRatio = ntohl(tmp); config->spillRatio = ntohl(tmp);
ptr += sizeof(procInfo->config.spillRatio); ptr += sizeof(config->spillRatio);
Assert(len == ptr - buf); Assert(len == ptr - buf);
} }
...@@ -1195,8 +1385,6 @@ AssignResGroupOnMaster(void) ...@@ -1195,8 +1385,6 @@ AssignResGroupOnMaster(void)
ResGroupData *sharedInfo; ResGroupData *sharedInfo;
ResGroupSlotData *slot; ResGroupSlotData *slot;
ResGroupProcData *procInfo; ResGroupProcData *procInfo;
int concurrency;
int memoryLimit, sharedQuota, spillRatio;
int slotId; int slotId;
Oid groupId; Oid groupId;
...@@ -1211,19 +1399,17 @@ AssignResGroupOnMaster(void) ...@@ -1211,19 +1399,17 @@ AssignResGroupOnMaster(void)
Assert(groupId != InvalidOid); Assert(groupId != InvalidOid);
Assert(!MyResGroupProcInfo->doMemCheck); Assert(!MyResGroupProcInfo->doMemCheck);
/* Get config information */
GetMemoryCapabilitiesForResGroup(groupId, &memoryLimit, &sharedQuota, &spillRatio);
GetConcurrencyForResGroup(groupId, NULL, &concurrency);
/* Init slot */ /* Init slot */
slot = &sharedInfo->slots[slotId]; slot = &sharedInfo->slots[slotId];
Assert(slot->memQuota > 0);
slot->sessionId = gp_session_id; slot->sessionId = gp_session_id;
slot->segmentChunks = ResGroupOps_GetTotalMemory() slot->segmentChunks = ResGroupOps_GetTotalMemory()
* gp_resource_group_memory_limit / pResGroupControl->segmentsOnMaster; * gp_resource_group_memory_limit / pResGroupControl->segmentsOnMaster;
slot->memLimit = slot->segmentChunks * memoryLimit / 100; slot->memLimit = slot->segmentChunks * slot->config.memoryLimit / 100;
slot->memSharedQuota = slot->memLimit * sharedQuota / 100; slot->memSharedQuota = slot->memLimit * slot->config.sharedQuota / 100;
slot->memQuota = slot->memLimit * (100 - sharedQuota) / concurrency / 100; slot->memSpill = slot->memLimit * slot->config.spillRatio
slot->memSpill = slot->memLimit * spillRatio / concurrency / 100; / slot->config.concurrency / 100;
pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1);
Assert(slot->memLimit > 0); Assert(slot->memLimit > 0);
Assert(slot->memQuota > 0); Assert(slot->memQuota > 0);
...@@ -1231,10 +1417,7 @@ AssignResGroupOnMaster(void) ...@@ -1231,10 +1417,7 @@ AssignResGroupOnMaster(void)
procInfo = MyResGroupProcInfo; procInfo = MyResGroupProcInfo;
procInfo->groupId = groupId; procInfo->groupId = groupId;
procInfo->slotId = slotId; procInfo->slotId = slotId;
procInfo->config.memoryLimit = memoryLimit; procInfo->config = slot->config;
procInfo->config.sharedQuota = sharedQuota;
procInfo->config.spillRatio = spillRatio;
procInfo->config.concurrency = concurrency;
Assert(pResGroupControl != NULL); Assert(pResGroupControl != NULL);
Assert(pResGroupControl->segmentsOnMaster > 0); Assert(pResGroupControl->segmentsOnMaster > 0);
...@@ -1286,7 +1469,7 @@ UnassignResGroupOnMaster(void) ...@@ -1286,7 +1469,7 @@ UnassignResGroupOnMaster(void)
procInfo->slotId = InvalidSlotId; procInfo->slotId = InvalidSlotId;
/* Cleanup slotInfo */ /* Cleanup slotInfo */
slot->inUse = false; pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1);
/* Relesase the slot */ /* Relesase the slot */
ResGroupSlotRelease(); ResGroupSlotRelease();
...@@ -1305,6 +1488,7 @@ SwitchResGroupOnSegment(const char *buf, int len) ...@@ -1305,6 +1488,7 @@ SwitchResGroupOnSegment(const char *buf, int len)
{ {
Oid prevGroupId; Oid prevGroupId;
int prevSlotId; int prevSlotId;
ResGroupConfigSnapshot config;
ResGroupData *sharedInfo; ResGroupData *sharedInfo;
ResGroupSlotData *slot; ResGroupSlotData *slot;
ResGroupProcData *procInfo; ResGroupProcData *procInfo;
...@@ -1318,7 +1502,7 @@ SwitchResGroupOnSegment(const char *buf, int len) ...@@ -1318,7 +1502,7 @@ SwitchResGroupOnSegment(const char *buf, int len)
/* Stop memory limit checking */ /* Stop memory limit checking */
procInfo->doMemCheck = false; procInfo->doMemCheck = false;
DeserializeResGroupInfo(buf, len); DeserializeResGroupInfo(&config, buf, len);
AssertImply(procInfo->groupId != InvalidOid, AssertImply(procInfo->groupId != InvalidOid,
procInfo->slotId != InvalidSlotId); procInfo->slotId != InvalidSlotId);
...@@ -1361,18 +1545,21 @@ SwitchResGroupOnSegment(const char *buf, int len) ...@@ -1361,18 +1545,21 @@ SwitchResGroupOnSegment(const char *buf, int len)
/* Init MyResGroupProcInfo */ /* Init MyResGroupProcInfo */
Assert(host_segments > 0); Assert(host_segments > 0);
Assert(procInfo->config.concurrency > 0); Assert(config.concurrency > 0);
Assert(procInfo->slotId != InvalidSlotId);
procInfo->config = config;
/* Init slot */ /* Init slot */
slot = &sharedInfo->slots[procInfo->slotId]; slot = &sharedInfo->slots[procInfo->slotId];
slot->sessionId = gp_session_id; slot->sessionId = gp_session_id;
slot->config = config;
slot->segmentChunks = ResGroupOps_GetTotalMemory() slot->segmentChunks = ResGroupOps_GetTotalMemory()
* gp_resource_group_memory_limit / host_segments; * gp_resource_group_memory_limit / host_segments;
slot->memLimit = slot->segmentChunks * procInfo->config.memoryLimit / 100; slot->memLimit = slot->segmentChunks * slot->config.memoryLimit / 100;
slot->memSharedQuota = slot->memLimit * procInfo->config.sharedQuota / 100; slot->memSharedQuota = slot->memLimit * slot->config.sharedQuota / 100;
slot->memQuota = slot->memLimit slot->memQuota = slot->memLimit
* (100 - procInfo->config.sharedQuota) * (100 - slot->config.sharedQuota)
/ procInfo->config.concurrency / slot->config.concurrency
/ 100; / 100;
slot->memSpill = slot->memLimit slot->memSpill = slot->memLimit
* procInfo->config.spillRatio * procInfo->config.spillRatio
...@@ -1565,7 +1752,7 @@ AtProcExit_ResGroup(int code, Datum arg) ...@@ -1565,7 +1752,7 @@ AtProcExit_ResGroup(int code, Datum arg)
* ResGroupData entry may have been removed if the DROP is committed. * ResGroupData entry may have been removed if the DROP is committed.
*/ */
static void static void
ResGroupWaitCancel() ResGroupWaitCancel(void)
{ {
ResGroupData *group; ResGroupData *group;
PROC_QUEUE *waitQueue; PROC_QUEUE *waitQueue;
...@@ -1579,13 +1766,15 @@ ResGroupWaitCancel() ...@@ -1579,13 +1766,15 @@ ResGroupWaitCancel()
/* We are sure to be interrupted in the for loop of ResGroupWait now */ /* We are sure to be interrupted in the for loop of ResGroupWait now */
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
waitQueue = &group->waitProcs;
if (MyProc->links.next != INVALID_OFFSET) if (MyProc->links.next != INVALID_OFFSET)
{ {
/* Still waiting on the queue when get interrupted, remove myself from the queue */ /* Still waiting on the queue when get interrupted, remove myself from the queue */
waitQueue = &(group->waitProcs);
Assert(waitQueue->size > 0); Assert(waitQueue->size > 0);
Assert(MyProc->resWaiting); Assert(MyProc->resWaiting);
Assert(MyProc->resSlotId == InvalidSlotId);
addTotalQueueDuration(group); addTotalQueueDuration(group);
...@@ -1595,29 +1784,39 @@ ResGroupWaitCancel() ...@@ -1595,29 +1784,39 @@ ResGroupWaitCancel()
else if (MyProc->links.next == INVALID_OFFSET && MyProc->resGranted) else if (MyProc->links.next == INVALID_OFFSET && MyProc->resGranted)
{ {
/* Woken up by a slot holder */ /* Woken up by a slot holder */
Assert(MyProc->resSlotId != InvalidSlotId);
putSlot(group, MyProc->resSlotId);
MyProc->resSlotId = InvalidSlotId;
group->totalExecuted++; group->totalExecuted++;
addTotalQueueDuration(group); addTotalQueueDuration(group);
waitQueue = &(group->waitProcs); /*
if (waitQueue->size == 0) * Similar as ResGroupSlotRelease(), how many pending queries to
* wake up depends on how many slots we can get.
*/
while (waitQueue->size > 0)
{ {
/* This is the last transaction on the wait queue, don't have to wake up others */ int slotId;
Assert(waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) &&
waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links)); slotId = getSlot(group);
Assert(group->nRunning > 0); if (slotId == InvalidSlotId)
break;
group->nRunning--;
}
else
{
/* wake up one process on the wait queue */ /* wake up one process on the wait queue */
waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next); waitProc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
SHMQueueDelete(&(waitProc->links)); SHMQueueDelete(&(waitProc->links));
waitQueue->size--; waitQueue->size--;
waitProc->resGranted = true; waitProc->resGranted = true;
waitProc->resWaiting = false; waitProc->resWaiting = false;
waitProc->resSlotId = slotId; /* pass the slot to new query */
SetLatch(&waitProc->procLatch); SetLatch(&waitProc->procLatch);
} }
AssertImply(waitQueue->size == 0,
waitQueue->links.next == MAKE_OFFSET(&waitQueue->links) &&
waitQueue->links.prev == MAKE_OFFSET(&waitQueue->links));
} }
else else
{ {
......
...@@ -172,6 +172,7 @@ struct PGPROC ...@@ -172,6 +172,7 @@ struct PGPROC
bool resGranted; /* true means a resource group slot is granted. bool resGranted; /* true means a resource group slot is granted.
false when wake up from a resource group which false when wake up from a resource group which
is locked for drop */ is locked for drop */
int resSlotId; /* the resource group slot id granted */
}; };
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
......
...@@ -25,6 +25,8 @@ extern int MaxResourceGroups; ...@@ -25,6 +25,8 @@ extern int MaxResourceGroups;
extern double gp_resource_group_cpu_limit; extern double gp_resource_group_cpu_limit;
extern double gp_resource_group_memory_limit; extern double gp_resource_group_memory_limit;
struct ResGroupConfigSnapshot;
/* Type of statistic infomation */ /* Type of statistic infomation */
typedef enum typedef enum
{ {
...@@ -54,7 +56,8 @@ extern void AllocResGroupEntry(Oid groupId); ...@@ -54,7 +56,8 @@ extern void AllocResGroupEntry(Oid groupId);
extern void FreeResGroupEntry(Oid groupId); extern void FreeResGroupEntry(Oid groupId);
extern void SerializeResGroupInfo(StringInfo str); extern void SerializeResGroupInfo(StringInfo str);
extern void DeserializeResGroupInfo(const char *buf, int len); extern void DeserializeResGroupInfo(struct ResGroupConfigSnapshot *config,
const char *buf, int len);
extern bool ShouldAssignResGroupOnMaster(void); extern bool ShouldAssignResGroupOnMaster(void);
extern void AssignResGroupOnMaster(void); extern void AssignResGroupOnMaster(void);
......
-- create a resource group when gp_resource_manager is queue
DROP ROLE IF EXISTS role_concurrency_test;
DROP
-- start_ignore
DROP RESOURCE GROUP rg_concurrency_test;
ERROR: resource group "rg_concurrency_test" does not exist
-- end_ignore
CREATE RESOURCE GROUP rg_concurrency_test WITH (concurrency=1, cpu_rate_limit=20, memory_limit=60, memory_shared_quota=0, memory_spill_ratio=10);
CREATE
CREATE ROLE role_concurrency_test RESOURCE GROUP rg_concurrency_test;
CREATE
--
-- increase concurrency after pending queries
--
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1;
ALTER
11:SET ROLE role_concurrency_test;
SET
11:BEGIN;
BEGIN
21:SET ROLE role_concurrency_test;
SET
22:SET ROLE role_concurrency_test;
SET
21&:BEGIN; <waiting ...>
22&:BEGIN; <waiting ...>
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2;
ALTER
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rsgname |waiting_reason|current_query
-------------------+--------------+------------------------------------------------------------------
admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rg_concurrency_test| |<IDLE> in transaction
rg_concurrency_test|resgroup |BEGIN;
rg_concurrency_test|resgroup |BEGIN;
(4 rows)
11:END;
END
11q: ... <quitting>
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rsgname |waiting_reason|current_query
-------------------+--------------+------------------------------------------------------------------
admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rg_concurrency_test| |<IDLE> in transaction
rg_concurrency_test| |<IDLE> in transaction
(3 rows)
21<: <... completed>
BEGIN
22<: <... completed>
BEGIN
21:END;
END
22:END;
END
21q: ... <quitting>
22q: ... <quitting>
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rsgname |waiting_reason|current_query
-----------+--------------+------------------------------------------------------------------
admin_group| |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
(1 row)
--
-- increase concurrency before pending queries
--
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1;
ALTER
11:SET ROLE role_concurrency_test;
SET
11:BEGIN;
BEGIN
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2;
ALTER
21:SET ROLE role_concurrency_test;
SET
22:SET ROLE role_concurrency_test;
SET
21&:BEGIN; <waiting ...>
22&:BEGIN; <waiting ...>
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rsgname |waiting_reason|current_query
-------------------+--------------+------------------------------------------------------------------
admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rg_concurrency_test| |<IDLE> in transaction
rg_concurrency_test|resgroup |BEGIN;
rg_concurrency_test|resgroup |BEGIN;
(4 rows)
11:END;
END
11q: ... <quitting>
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rsgname |waiting_reason|current_query
-------------------+--------------+------------------------------------------------------------------
admin_group | |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rg_concurrency_test| |<IDLE> in transaction
rg_concurrency_test| |<IDLE> in transaction
(3 rows)
21<: <... completed>
BEGIN
22<: <... completed>
BEGIN
21:END;
END
22:END;
END
21q: ... <quitting>
22q: ... <quitting>
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
rsgname |waiting_reason|current_query
-----------+--------------+------------------------------------------------------------------
admin_group| |SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
(1 row)
-- cleanup
DROP ROLE role_concurrency_test;
DROP
DROP RESOURCE GROUP rg_concurrency_test;
DROP
...@@ -101,7 +101,7 @@ ALTER ...@@ -101,7 +101,7 @@ ALTER
SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test'; SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
rsgname |num_running|num_queueing|num_queued|num_executed rsgname |num_running|num_queueing|num_queued|num_executed
-------------------+-----------+------------+----------+------------ -------------------+-----------+------------+----------+------------
rg_concurrency_test|3 |0 |1 |3 rg_concurrency_test|2 |1 |1 |2
(1 row) (1 row)
SELECT concurrency,proposed_concurrency FROM gp_toolkit.gp_resgroup_config WHERE groupname='rg_concurrency_test'; SELECT concurrency,proposed_concurrency FROM gp_toolkit.gp_resgroup_config WHERE groupname='rg_concurrency_test';
concurrency|proposed_concurrency concurrency|proposed_concurrency
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
! bash /tmp/.resgroup_mem_helper.sh; ! bash /tmp/.resgroup_mem_helper.sh;
! rm -f /tmp/.resgroup_mem_helper.sh; ! rm -f /tmp/.resgroup_mem_helper.sh;
! gpconfig -c gp_resource_manager -v group; ! gpconfig -c gp_resource_manager -v group;
! gpconfig -c gp_resource_group_cpu_limit -v 0.9;
! gpstop -rai; ! gpstop -rai;
-- end_ignore -- end_ignore
......
...@@ -5,8 +5,12 @@ test: resgroup_syntax ...@@ -5,8 +5,12 @@ test: resgroup_syntax
test: resgroup_transaction test: resgroup_transaction
test: resgroup_concurrency test: resgroup_concurrency
test: resgroup_alter_concurrency
test: resgroup_memory_statistic test: resgroup_memory_statistic
test: resgroup_memory_limit test: resgroup_memory_limit
test: resgroup_cpu_rate_limit
# memory spill tests
test: resgroup_memory_hashagg_spill test: resgroup_memory_hashagg_spill
test: resgroup_memory_hashjoin_spill test: resgroup_memory_hashjoin_spill
test: resgroup_memory_materialize_spill test: resgroup_memory_materialize_spill
...@@ -14,6 +18,5 @@ test: resgroup_memory_sisc_mat_sort ...@@ -14,6 +18,5 @@ test: resgroup_memory_sisc_mat_sort
test: resgroup_memory_sisc_sort_spill test: resgroup_memory_sisc_sort_spill
test: resgroup_memory_sort_spill test: resgroup_memory_sort_spill
test: resgroup_memory_spilltodisk test: resgroup_memory_spilltodisk
test: resgroup_cpu_rate_limit
test: disable_resgroup test: disable_resgroup
...@@ -42,6 +42,9 @@ ...@@ -42,6 +42,9 @@
20170502:01:28:12:000367 gpconfig:sdw6:gpadmin-[WARNING]:-Managing queries with resource groups is an experimental feature. A work-in-progress version is enabled. 20170502:01:28:12:000367 gpconfig:sdw6:gpadmin-[WARNING]:-Managing queries with resource groups is an experimental feature. A work-in-progress version is enabled.
20170502:01:28:13:000367 gpconfig:sdw6:gpadmin-[INFO]:-completed successfully 20170502:01:28:13:000367 gpconfig:sdw6:gpadmin-[INFO]:-completed successfully
! gpconfig -c gp_resource_group_cpu_limit -v 0.9;
20170803:10:42:57:015929 gpconfig:nyu-vm-centos:gpadmin-[INFO]:-completed successfully
! gpstop -rai; ! gpstop -rai;
-- end_ignore -- end_ignore
......
-- create a resource group when gp_resource_manager is queue
--
-- Isolation test for increasing a resource group's CONCURRENCY while
-- sessions are pending.  Regression test for the memory-overuse bug:
-- pending queries must only be woken up when BOTH a free concurrency
-- slot AND sufficient group memory quota are available.
--
-- Session syntax (isolation2 framework — confirm against runner docs):
--   NN:  <stmt>   run <stmt> synchronously in session NN
--   NN&: <stmt>   run <stmt> asynchronously (session blocks/waits)
--   NN<:          join the pending async statement of session NN
--   NNq:          quit (disconnect) session NN
DROP ROLE IF EXISTS role_concurrency_test;
-- start_ignore
DROP RESOURCE GROUP rg_concurrency_test;
-- end_ignore
-- memory_shared_quota=0 so each slot gets a fixed private quota; with
-- memory_limit=60 the running transactions can consume the whole group
-- quota, which is what used to let woken-up queries overuse memory.
CREATE RESOURCE GROUP rg_concurrency_test WITH
(concurrency=1, cpu_rate_limit=20, memory_limit=60, memory_shared_quota=0, memory_spill_ratio=10);
CREATE ROLE role_concurrency_test RESOURCE GROUP rg_concurrency_test;
--
-- increase concurrency after pending queries
--
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1;
-- session 11 occupies the single slot and holds it in an open transaction
11:SET ROLE role_concurrency_test;
11:BEGIN;
-- sessions 21 and 22 queue up behind it (both BEGINs block)
21:SET ROLE role_concurrency_test;
22:SET ROLE role_concurrency_test;
21&:BEGIN;
22&:BEGIN;
-- raising concurrency alone must NOT wake the pending sessions:
-- session 11 still holds all of the group's memory quota
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2;
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
-- releasing session 11's slot frees its memory quota; now both pending
-- sessions can be granted slots under the new concurrency limit
11:END;
11q:
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
21<:
22<:
21:END;
22:END;
21q:
22q:
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
--
-- increase concurrency before pending queries
--
-- same scenario, but CONCURRENCY is raised before any session queues;
-- the memory-quota check must still keep 21/22 waiting until 11 ends
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 1;
11:SET ROLE role_concurrency_test;
11:BEGIN;
ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2;
21:SET ROLE role_concurrency_test;
22:SET ROLE role_concurrency_test;
21&:BEGIN;
22&:BEGIN;
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
11:END;
11q:
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
21<:
22<:
21:END;
22:END;
21q:
22q:
SELECT rsgname,waiting_reason,current_query FROM pg_stat_activity;
-- cleanup
DROP ROLE role_concurrency_test;
DROP RESOURCE GROUP rg_concurrency_test;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册