提交 6c31a3b4 编写于 作者: G George Caragea

Metadata Versioning feature for the ORCA Query Optimizer.

Added a generation-based Metadata Versioning mechanism
which will be used by ORCA to cache and invalidate catalog
data in its Metadata Cache.
Versioning is disabled by default at this point, until the
Metadata Cache eviction policy is completed.
上级 b7365f58
......@@ -2984,4 +2984,18 @@ gpdb::UlLeafPartitions
return 0;
}
// Notify MD Versioning of a new command beginning
bool
gpdb::FMDVersioningNewCommand
(
void
)
{
GP_WRAP_START;
return mdver_command_begin();
GP_WRAP_END;
return true;
}
// EOF
......@@ -927,11 +927,22 @@ COptTasks::PvOptimizeTask
AUTO_MEM_POOL(amp);
IMemoryPool *pmp = amp.Pmp();
// initialize metadata cache
// Notify MD Versioning of new command
bool reset_mdcache = gpdb::FMDVersioningNewCommand();
// initialize metadata cache, or purge if needed
if (!CMDCache::FInitialized())
{
CMDCache::Init();
}
else
{
// If MD Versioning detected a new generation, purge MDCache contents
if (reset_mdcache)
{
CMDCache::Reset();
}
}
// load search strategy
DrgPss *pdrgpss = PdrgPssLoad(pmp, optimizer_search_strategy_path);
......
......@@ -69,6 +69,7 @@
#include "executor/spi.h"
#include "utils/workfile_mgr.h"
#include "utils/session_state.h"
#include "utils/mdver.h"
shmem_startup_hook_type shmem_startup_hook = NULL;
......@@ -148,6 +149,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, ResPortalIncrementShmemSize());
}
}
size = add_size(size, mdver_shmem_size());
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, LocalDistribXact_ShmemSize());
size = add_size(size, XLOGShmemSize());
......@@ -412,7 +415,16 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
*/
BTreeShmemInit();
workfile_mgr_cache_init();
/*
* On the master and standby master, we allocate
* Metadata Versioning's Global generation component in shared memory
*/
if (GpIdentity.segindex == MASTER_CONTENT_ID) {
mdver_shmem_init();
}
#ifdef EXEC_BACKEND
/*
......
......@@ -10,7 +10,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = fmgrtab.o session_state.o
SUBDIRS = adt cache datumstream error fmgr hash init mb misc mmgr resowner \
resscheduler sort stat time gpmon gp workfile_manager
resscheduler sort stat time gpmon gp workfile_manager mdver
include $(top_srcdir)/src/backend/common.mk
......
......@@ -34,7 +34,7 @@
#include "utils/rel.h"
#include "utils/resowner.h"
#include "utils/syscache.h"
#include "utils/mdver.h"
/* #define CACHEDEBUG */ /* turns DEBUG elogs on */
......@@ -1702,6 +1702,9 @@ PrepareToInvalidateCacheTuple(Relation relation,
Assert(CacheHdr != NULL);
reloid = RelationGetRelid(relation);
/* Notifying the MD Versioning component of catalog changes */
mdver_inv_translator(relation);
/* ----------------
* for each cache
......
......@@ -98,7 +98,7 @@
#include "utils/relcache.h"
#include "utils/simex.h"
#include "utils/syscache.h"
#include "utils/mdver.h"
/*
* To minimize palloc traffic, we keep pending requests in successively-
......@@ -156,6 +156,7 @@ typedef struct TransInvalidationInfo
} TransInvalidationInfo;
static TransInvalidationInfo *transInvalInfo = NULL;
static mdver_local *local_mdver = NULL;
/*
* Dynamically-registered callback functions. Current implementation
......@@ -797,6 +798,17 @@ AtStart_Inval(void)
MemoryContextAllocZero(TopTransactionContext,
sizeof(TransInvalidationInfo));
transInvalInfo->my_level = GetCurrentTransactionNestLevel();
if (mdver_enabled())
{
/*
* Since we create the TransInvalidationInfo in the TopTransactionContext,
* we should create the local mdver in the same context as well.
*/
MemoryContext oldcxt = MemoryContextSwitchTo(TopTransactionContext);
local_mdver = mdver_create_local();
MemoryContextSwitchTo(oldcxt);
}
}
/*
......@@ -956,6 +968,9 @@ AtEOXact_Inval(bool isCommit)
if (transInvalInfo->RelcacheInitFileInval)
RelationCacheInitFileInvalidate(false);
/* Notifying the MD Versioning component of transaction commit */
mdver_bump_global_generation(local_mdver);
}
else if (transInvalInfo != NULL)
{
......@@ -968,6 +983,7 @@ AtEOXact_Inval(bool isCommit)
/* Need not free anything explicitly */
transInvalInfo = NULL;
local_mdver = NULL;
}
/*
......@@ -1200,3 +1216,13 @@ CacheRegisterRelcacheCallback(CacheCallbackFunction func,
++cache_callback_count;
}
/*
* GetCurrentLocalMDVer
* Return the local mdver
*/
mdver_local*
GetCurrentLocalMDVer(void)
{
return local_mdver;
}
......@@ -1076,7 +1076,7 @@ void
Cache_UpdatePerfCounter64(int64 *counter, int64 delta)
{
Assert(counter + delta >= 0);
gp_atomic_add_64(counter, delta);
gp_atomic_add_int64(counter, delta);
}
/*
......
#-------------------------------------------------------------------------
#
# Makefile--
# Makefile for utils/mdver
#
# IDENTIFICATION
# $PostgreSQL: pgsql/src/backend/utils/mdver/Makefile,v 1.15 2015/11/11 17:45:09
#
#-------------------------------------------------------------------------
subdir = src/backend/utils/mdver
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = mdver_global_generation.o mdver_local_generation.o mdver_utils.o \
mdver_inv_translator.o
include $(top_srcdir)/src/backend/common.mk
Metadata Versioning
===================
Objectives
----------
Metadata versioning is a component used by the Optimizer to maintain and
invalidate the content of the Metadata Cache (MD Cache).
Different backends can check the global generation counter
Overview
--------
A global "Generation" counter is stored in shared memory, which reflects the
current "generation" of the catalog contents. Whenever a catalog change is
committed by any transaction, the Generation counter is incremented.
The backend processes running the Optimizer (QDs) check this Generation
counter at the beginning of each command. If the local Metadata Cache contains
any cached entries from a different generation, all those entries are discarded.
Architecture
------------
Global Cache Generation (GG)
An integer counter stored in Shared Memory (SHM). This records the current
generation number, and can be read and written atomically by all backends.
Local Cache Generation (LG)
An integer stored inside the Metadata Cache component of a Backend
process. This holds the last global generation observed by the backend. This
is updated only when a command starts.
Transaction Dirty (TD)
A flag stored in the local memory of the Backend process. This flag
is set if the current transaction executed any commands that changed metadata.
The flag is reset at the end of the transaction.
MDCache Dirty (MD)
A flag stored in the local memory of the backend process. This flag is set
when a command changes metadata. At the beginning of the next command,
the entire contents of MDCache is flushed if this flag is set; the flag
is then reset.
Invalidation Translator (IT)
A new logical component. This component intercepts all changes to metadata
done by a query. When a relevant catalog update is detected, the IT component
takes action to record that a new generation id should be generated.
Functionality
-------------
Consider a transaction that reads and changes the schema of a table foo and a
table bar. MDC is the Optimizer MDCache, and the actions described are reading
schema information from the catalog, and storing them with the current
version.
Cmd Id | Cmd | Flags | MD Cache Actions | LG, GG
--------+-------------+-------------+-----------------------------+-----------
cmdId=0 | BEGIN | TD=f MD=f | | LG=GG=5
cmdId=1 | READ foo | TD=f MD=f | MDC <- (foo, MD(foo_t1)) | LG=GG=5
cmdId=2 | CHANGE foo | TD:=t MD:=t | Purge MDC | LG=GG=5
cmdId=3 | CHANGE bar | TD:=t MD:=t | Purge MDC | LG=GG=5
cmdId=4 | READ foo | TD=t MD:=f | MDC <- (foo, MD(foo_t4)) | LG=GG=5
cmdId=5 | COMMIT | TD=f MD=f | | GG++; GG=6
1. At the beginning of a transaction,
- set the TD flag to false
- set LG = GG.
2. At the beginning of each command, backend checks if LG != GG or MD = t. If true:
- Purge the contents of MD Cache
- Assign LG = GG
- Reset MD = f
3. When the Optimizer requests an object metadata, the MD Cache will go to the
catalog and request it. It will then store the metadata in the MD Cache.
4. When the Invalidation Translator detects a change to metadata from a command:
- set TD = true
- set MD = true
6. When a new read request comes from the Optimizer for an object not in MDC, MD
Cache will again request the metadata from the catalog. The catalog will apply
the current visibility rules (MVCC, SnapshotNow, etc), and return the most
up-to-date visible version..
7. At commit time, if TD = true, this transaction changed the metadata. Atomically
increment GG part of the commit.
/*-------------------------------------------------------------------------
*
* mdver_global_generation.c
* Implementation of Global Cache Generation of Metadata Versioning
*
* Copyright (c) 2015, Pivotal, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cdb/cdbvars.h"
#include "utils/mdver.h"
#include "utils/guc.h"
/* Name to identify the MD Version Global Cache Generation (GG) shared memory area*/
#define MDVER_GLOBAL_GEN_SHMEM_NAME "MDVer Global Cache Generation"
/*
* Pointer to the shared memory global cache generation (GG)
* This records the current
* generation number, and can be read and written atomically by all backends.
*/
uint64 *mdver_global_generation = NULL;
/*
* mdver_shmem_init
* Initalize the shared memory data structure needed for MD Version
*/
void
mdver_shmem_init(void) {
bool attach = false;
/*Allocate or attach to shared memory area */
void *shmem_base = ShmemInitStruct(MDVER_GLOBAL_GEN_SHMEM_NAME,
sizeof (*mdver_global_generation),
&attach);
mdver_global_generation = (uint64 *) shmem_base;
#ifdef MD_VERSIONING_INSTRUMENTATION
elog(gp_mdver_loglevel,
"MDVer: Creating global cache generation");
#endif
AssertImply(!attach, 0 == *mdver_global_generation);
}
/*
* mdver_shmem_size
* Compute the size of shared memory required for the MD Version component
*/
Size
mdver_shmem_size(void)
{
return (GpIdentity.segindex == MASTER_CONTENT_ID) ? sizeof(*mdver_global_generation) : 0;
}
/*
* mdver_get_global_generation
* Get Current global generation number
*/
uint64
mdver_get_global_generation(void)
{
Assert(NULL != mdver_global_generation);
return *mdver_global_generation;
}
/*
* mdver_bump_global_generation
* If transaction_is_dirty flag was set, it means this tx
* has caused some metadata changes and we record that by atomically bump
* global generation by 1
* local_mdver : current local mdver pointer
*/
void
mdver_bump_global_generation(mdver_local* local_mdver)
{
if (!mdver_enabled() ||
NULL == local_mdver ||
!local_mdver->transaction_dirty) {
return;
}
Assert(NULL != mdver_global_generation);
#ifdef MD_VERSIONING_INSTRUMENTATION
uint64 old_global_id = *mdver_global_generation;
#endif
const int64 INC = 1;
gp_atomic_add_uint64(mdver_global_generation, INC);
#ifdef MD_VERSIONING_INSTRUMENTATION
elog(gp_mdver_loglevel,
"MDVer: Bumping global cache generation from %llu to %llu",
old_global_id, *mdver_global_generation);
#endif
}
/*-------------------------------------------------------------------------
*
* mdver_inv_translator.c
* Implementation of Invalidation Translator (IT) for metadata version
*
* Copyright (c) 2015, Pivotal, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/catalog.h"
#include "utils/guc.h"
#include "utils/mdver.h"
/*
* Singleton static flag to mark the mdcache as dirty when a local
* command updates the catalog
*/
bool mdver_dirty_mdcache = false;
static void mdver_mark_dirty_mdcache(void);
/*
* mdver_inv_translator
* This component intercepts all changes to catalog(metadata) done by a query.
* When a relevant catalog update is detected, this IT component
* updates the bump command id so at command end mdcache is purged and at tx commit
* new global cache generation can be generated.
* relation : The catalog table being touched
*/
void
mdver_inv_translator(Relation relation)
{
if (!mdver_enabled())
{
return;
}
mdver_local* local_mdver = GetCurrentLocalMDVer();
/*
* We set local_mdver to null when transaction commit at AtEOXact_Inval.
* There are some catalog updates after this for e.g. by storage manager.
* We don't want to track those changes similar to how it is done for other
* cache invalidation
*/
if (NULL == local_mdver)
{
return;
}
/*
* We don't track(bump command id) catalog tables changes in the AOSEG namespace.
* These are modified for DML only (not DDL)
*/
if (IsAoSegmentRelation(relation))
{
return;
}
#ifdef MD_VERSIONING_INSTRUMENTATION
elog(gp_mdver_loglevel, "MDVer : INV Translator marking command and transaction as dirty");
#endif
mdver_mark_dirty_xact(local_mdver);
mdver_mark_dirty_mdcache();
}
static void
mdver_mark_dirty_mdcache(void)
{
mdver_dirty_mdcache = true;
}
/*-------------------------------------------------------------------------
*
* mdver_local_generation.c
* Implementation of Local Cache Generation for Metadata Versioning
*
* Copyright (c) 2015, Pivotal, Inc.
*
*-------------------------------------------------------------------------
*/
#include "utils/mdver.h"
#include "utils/guc.h"
#include "cdb/cdbvars.h"
/*
* mdver_create_local
* Creates the new Local MDVer and initialize for the current transaction
* Same mdver_local will be used for subtransaction as well.
*/
mdver_local*
mdver_create_local(void)
{
mdver_local* local_mdver = palloc0(sizeof(mdver_local));
/* sync to global cache generation */
local_mdver->local_generation = mdver_get_global_generation();
/* transaction is marked as clean initially */
local_mdver->transaction_dirty = false;
#ifdef MD_VERSIONING_INSTRUMENTATION
elog(gp_mdver_loglevel, "MDVer : Creating local cache generation. Gp_role=%d, Gp_identity=%d",
Gp_role,
GpIdentity.segindex);
#endif
return local_mdver;
}
/*
* mdver_mark_dirty_xact
* Mark the current transaction as "dirty". At commit time, the
* global generation counter will be updated.
*
* local_mdver : current local mdver pointer
*/
void
mdver_mark_dirty_xact(mdver_local* local_mdver)
{
Assert(NULL != local_mdver);
local_mdver->transaction_dirty = true;
#ifdef MD_VERSIONING_INSTRUMENTATION
elog(gp_mdver_loglevel, "MDVer: transaction_dirty set to true");
#endif
}
/*
* mdver_command_begin
* Called at the beginning of a new command.
* Checks local generation vs global generation. If different, returns true and
* updates the local generation.
* The caller should purge the MD Cache when a new generation is detected.
*/
bool
mdver_command_begin(void) {
/* Always purge cache if MD Versioning is disabled */
if (!mdver_enabled()) {
return true;
}
uint64 global_generation = mdver_get_global_generation();
mdver_local *local_mdver = GetCurrentLocalMDVer();
Assert(NULL != local_mdver);
#ifdef MD_VERSIONING_INSTRUMENTATION
elog(gp_mdver_loglevel, "MDVer: New command to optimizer, LG = " UINT64_FORMAT" , GG = " UINT64_FORMAT " mdver_dirty_mdcache = %d",
local_mdver->local_generation, global_generation, mdver_dirty_mdcache);
#endif
bool new_generation_detected = false;
/*
* We updated the local generation and ask a MD Cache purge in two
* scenarios:
* 1. A previous command in this session has updated the
* catalog (mdver dirty flag true)
* 2. A transaction in another session committed a catalog
* change and bumped the global generation (LG != GG)
*/
if (mdver_dirty_mdcache ||
local_mdver->local_generation != global_generation)
{
new_generation_detected = true;
local_mdver->local_generation = global_generation;
/* We are requesting MD Cache purge, we can reset the "dirty" flag */
mdver_dirty_mdcache = false;
#ifdef MD_VERSIONING_INSTRUMENTATION
elog(gp_mdver_loglevel, "MDVer: MDCache purge requested at command start");
#endif
}
return new_generation_detected;
}
/*-------------------------------------------------------------------------
*
* mdver_utils.c
* Utility functions for Metadata Versioning
*
* Copyright (c) 2015, Pivotal, Inc.
*
*-------------------------------------------------------------------------
*/
#include "utils/mdver.h"
#include "cdb/cdbvars.h"
#include "catalog/gp_verification_history.h"
/*
* mdver_enabled
* Returns true if Metadata Versiong for MD Cache is enabled in the current context
*/
bool
mdver_enabled()
{
/*
* We only initialized Metadata Version on the master,
* and only for QD or utility mode process.
* MD Version can also be disabled by the guc
* optimizer_release_mdcache to true
*/
return !optimizer_release_mdcache &&
GpIdentity.segindex == MASTER_CONTENT_ID &&
((GP_ROLE_DISPATCH == Gp_role) || (GP_ROLE_UTILITY == Gp_role));
}
subdir=src/backend/utils/mdver
top_builddir=../../../../..
include $(top_builddir)/src/Makefile.global
TARGETS=mdver_global_generation mdver_local_generation mdver_inv_translator
include $(top_builddir)/src/backend/mock.mk
mdver_global_generation.t: \
$(MOCK_DIR)/backend/storage/ipc/shmem_mock.o \
$(MOCK_DIR)/backend/utils/mdver/mdver_utils_mock.o
mdver_local_generation.t: \
$(MOCK_DIR)/backend/utils/cache/inval_mock.o \
$(MOCK_DIR)/backend/utils/mdver/mdver_utils_mock.o \
$(MOCK_DIR)/backend/utils/mdver/mdver_global_generation_mock.o
mdver_inv_translator.t: \
$(MOCK_DIR)/backend/utils/cache/inval_mock.o \
$(MOCK_DIR)/backend/utils/mdver/mdver_utils_mock.o \
$(MOCK_DIR)/backend/catalog/catalog_mock.o
#include <stdarg.h>
#include <stddef.h>
#include <setjmp.h>
#include "cmockery.h"
#include "../mdver_global_generation.c"
/*
*test__mdver_shmem_init__NULL_memory
* Testing global cache generation memory for default value
*/
void
test__mdver_shmem_init__NULL_memory(void** state)
{
assert_true(NULL == mdver_global_generation);
}
/*
* test__mdver_shmem_init__No_NULL
* Testing global cache generation memory by mocking ShmemInitStruct for memory
*/
void
test__mdver_shmem_init__No_NULL(void** state)
{
uint64 global_cache_generation = 0;
expect_any(ShmemInitStruct, name);
expect_any(ShmemInitStruct, size);
expect_any(ShmemInitStruct, foundPtr);
will_return(ShmemInitStruct, &global_cache_generation);
mdver_shmem_init();
assert_true(global_cache_generation = mdver_global_generation && 0 == *mdver_global_generation);
}
/*
* test__mdver_shmem_size__uint64
* Testing size of shared memory
*/
void
test__mdver_shmem_size__uint64(void** state)
{
GpIdentity.segindex = MASTER_CONTENT_ID;
assert_true(sizeof(uint64) == mdver_shmem_size());
GpIdentity.segindex = 42;
assert_true(0 == mdver_shmem_size());
}
/* test__mdver_bump_global_generation__inc
* Testing bumping global generation
*/
void
test__mdver_bump_global_generation__inc(void** state)
{
mdver_local local;
uint64 global_cache_generation = 0;
expect_any(ShmemInitStruct, name);
expect_any(ShmemInitStruct, size);
expect_any(ShmemInitStruct, foundPtr);
will_return(ShmemInitStruct, &global_cache_generation);
mdver_shmem_init();
/* If transaction_dirty is false, leave global generation unchanged */
local.transaction_dirty = false;
will_return(mdver_enabled, true);
mdver_bump_global_generation(&local);
assert_true(NULL != mdver_global_generation && 0 == *mdver_global_generation);
local.transaction_dirty = true;
will_return(mdver_enabled, true);
mdver_bump_global_generation(&local);
assert_true(NULL != mdver_global_generation && 1 == *mdver_global_generation);
}
int
main(int argc, char* argv[])
{
cmockery_parse_arguments(argc, argv);
const UnitTest tests[] = {
unit_test(test__mdver_shmem_init__NULL_memory),
unit_test(test__mdver_shmem_init__No_NULL),
unit_test(test__mdver_shmem_size__uint64),
unit_test(test__mdver_bump_global_generation__inc)
};
return run_tests(tests);
}
#include <stdarg.h>
#include <stddef.h>
#include <setjmp.h>
#include "cmockery.h"
#include "../mdver_inv_translator.c"
/*
* test__mdver_inv_translator__set_bmp
* Testing invalidate translator functionality
*/
void
test__mdver_inv_translator__set_bmp(void** state)
{
mdver_local local = {0, false};
Relation relation;
/* When relation is AoSegmentRelation, ignore the event */
local.transaction_dirty = false;
mdver_dirty_mdcache = false;
will_return(mdver_enabled, true);
will_return(GetCurrentLocalMDVer, &local);
expect_any(IsAoSegmentRelation, relation);
will_return(IsAoSegmentRelation, true);
mdver_inv_translator(relation);
assert_false(local.transaction_dirty);
assert_false(mdver_dirty_mdcache);
/* A real invalidation, dirty flag for command and transaction set */
local.transaction_dirty = false;
mdver_dirty_mdcache = false;
will_return(mdver_enabled, true);
will_return(GetCurrentLocalMDVer, &local);
expect_any(IsAoSegmentRelation, relation);
will_return(IsAoSegmentRelation, false);
mdver_inv_translator(relation);
assert_true(local.transaction_dirty);
assert_true(mdver_dirty_mdcache);
}
/* Testing that the global dirty flag is set to true */
void
test__mdver_mark_dirty_mdcache(void **state)
{
mdver_dirty_mdcache = false;
mdver_mark_dirty_mdcache();
assert_true(mdver_dirty_mdcache);
}
int
main(int argc, char* argv[])
{
cmockery_parse_arguments(argc, argv);
const UnitTest tests[] = {
unit_test(test__mdver_inv_translator__set_bmp),
unit_test(test__mdver_mark_dirty_mdcache)
};
return run_tests(tests);
}
#include <stdarg.h>
#include <stddef.h>
#include <setjmp.h>
#include "cmockery.h"
#include "../mdver_local_generation.c"
extern int gp_command_count;
/*
* test__mdver_create_local__default_value
* Testing default value for newly created local cache generations objects
*/
void
test__mdver_create_local__default_value(void** state)
{
will_return(mdver_get_global_generation, 42);
mdver_local* local_mdver = mdver_create_local();
assert_true(local_mdver->transaction_dirty == false &&
local_mdver->local_generation == 42);
pfree(local_mdver);
}
/*
* test__mdver_mark_dirty_xact__set_flag
* Testing that mdver_mark_dirty_xact sets the flag in the local context
*/
void
test__mdver_mark_dirty_xact__set_flag(void** state)
{
mdver_local local;
local.transaction_dirty = false;
mdver_mark_dirty_xact(&local);
assert_true(local.transaction_dirty);
}
/*
* test__mdver_command_begin__set_local_generation
* Testing that at command begin, we correctly check the local and global
* generation and take action:
* - if LG == GG, mdver_dirty_mdcache = false nothing to do, return false
* - if LG == GG, mdver_dirty_mdcache = true set LG = GG and return true
* - if LG != GG, then set LG = GG and return true
*/
void
test__mdver_command_begin__set_local_generation(void **state)
{
mdver_local local_mdver = {0, false};
bool result = false;
/* local_generation == global_generation == 10, mdver_dirty_mdcache = false */
mdver_dirty_mdcache = false;
local_mdver.local_generation = 10;
will_return(mdver_enabled, true);
will_return(mdver_get_global_generation, 10);
will_return(GetCurrentLocalMDVer, &local_mdver);
result = mdver_command_begin();
assert_false(result);
assert_true(local_mdver.local_generation == 10);
assert_false(local_mdver.transaction_dirty);
assert_false(mdver_dirty_mdcache);
/* local_generation == global_generation == 10, mdver_dirty_mdcache = false */
mdver_dirty_mdcache = true;
local_mdver.local_generation = 10;
will_return(mdver_enabled, true);
will_return(mdver_get_global_generation, 10);
will_return(GetCurrentLocalMDVer, &local_mdver);
result = mdver_command_begin();
assert_true(result);
assert_true(local_mdver.local_generation == 10);
assert_false(local_mdver.transaction_dirty);
assert_false(mdver_dirty_mdcache);
/* local_generation = 10, global_generation = 15. mdver_dirty_mdcache = true */
mdver_dirty_mdcache = true;
local_mdver.local_generation = 10;
will_return(mdver_enabled, true);
will_return(mdver_get_global_generation, 15);
will_return(GetCurrentLocalMDVer, &local_mdver);
result = mdver_command_begin();
assert_true(result);
assert_true(local_mdver.local_generation == 15);
assert_false(local_mdver.transaction_dirty);
assert_false(mdver_dirty_mdcache);
/* local_generation = 10, global_generation = 15. mdver_dirty_mdcache = false */
mdver_dirty_mdcache = false;
local_mdver.local_generation = 10;
will_return(mdver_enabled, true);
will_return(mdver_get_global_generation, 15);
will_return(GetCurrentLocalMDVer, &local_mdver);
result = mdver_command_begin();
assert_true(result);
assert_true(local_mdver.local_generation == 15);
assert_false(local_mdver.transaction_dirty);
assert_false(mdver_dirty_mdcache);
}
int
main(int argc, char* argv[])
{
cmockery_parse_arguments(argc, argv);
const UnitTest tests[] = {
unit_test(test__mdver_create_local__default_value),
unit_test(test__mdver_mark_dirty_xact__set_flag),
unit_test(test__mdver_command_begin__set_local_generation)
};
MemoryContextInit();
return run_tests(tests);
}
......@@ -160,11 +160,12 @@ int32 gp_atomic_add_32(volatile int32 *ptr, int32 inc)
}
/*
* gp_atomic_add_64
* gp_atomic_add_int64
* Atomic increment a 64-bit address, and return the incremented value
* inc can be a positive or negative quantity
*/
int64 gp_atomic_add_64(int64 *ptr, int64 inc)
int64
gp_atomic_add_int64(int64 *ptr, int64 inc)
{
Assert(NULL != ptr);
......@@ -185,13 +186,13 @@ int64 gp_atomic_add_64(int64 *ptr, int64 inc)
"MOVL %0, %%edi; \n\t" /* Load ptr */
"MOVL (%%edi), %%eax; \n\t" /* Load first word from *ptr */
"MOVL 0x4(%%edi), %%edx; \n\t" /* Load second word from *ptr + 4 */
"tryAgain_add_64: \n\t"
"1: \n\t"
"MOVL %1, %%ebx; \n\t" /* Load addValueLow */
"MOVL %2, %%ecx; \n\t" /* Load addValueHigh */
"ADDL %%eax, %%ebx; \n\t" /* Add first word */
"ADCL %%edx, %%ecx; \n\t" /* Add second word */
"lock cmpxchg8b (%%edi); \n\t" /* Compare and exchange 8 bytes atomically */
"jnz tryAgain_add_64; \n\t" /* If ptr has changed, try again with new value */
"jnz 1b\n\t" /* If ptr has changed, try again with new value */
"MOVL %3, %%edi; \n\t" /* Put result in *newValuePtr */
"MOVL %%ebx, (%%edi); \n\t" /* first word */
......@@ -213,6 +214,64 @@ int64 gp_atomic_add_64(int64 *ptr, int64 inc)
return newValue;
}
/*
* gp_atomic_add_uint64
* Atomic increment an unsigned 64-bit address, and return the incremented value
* inc can be a positive or negative quantity
* ptr : uint64 pointer that we need to inc atomically
* inc : values that needs to be added
*/
uint64
gp_atomic_add_uint64(uint64 *ptr, int64 inc)
{
Assert(inc >= 0);
Assert(NULL != ptr);
volatile uint64 newValue = 0;
uint64 uInc = (uint64) inc;
#if defined(__x86_64__)
uint64 oldValue = __sync_fetch_and_add(ptr, uInc);
newValue = oldValue + uInc;
#elif defined(__i386)
volatile uint64* newValuePtr = &newValue;
int addValueLow = (int) uInc;
int addValueHigh = (int) (uInc>>32);
__asm__ __volatile__ (
"PUSHL %%ebx; \n\t" /* Save ebx, it's a special register we can't clobber */
"MOVL %0, %%edi; \n\t" /* Load ptr */
"MOVL (%%edi), %%eax; \n\t" /* Load first word from *ptr */
"MOVL 0x4(%%edi), %%edx; \n\t" /* Load second word from *ptr + 4 */
"0: \n\t"
"MOVL %1, %%ebx; \n\t" /* Load addValueLow */
"MOVL %2, %%ecx; \n\t" /* Load addValueHigh */
"ADDL %%eax, %%ebx; \n\t" /* Add first word */
"ADCL %%edx, %%ecx; \n\t" /* Add second word */
"lock cmpxchg8b (%%edi); \n\t" /* Compare and exchange 8 bytes atomically */
"jnz 0b\n\t" /* If ptr has changed, try again with new value */
"MOVL %3, %%edi; \n\t" /* Put result in *newValuePtr */
"MOVL %%ebx, (%%edi); \n\t" /* first word */
"MOVL %%ecx, 0x4(%%edi); \n\t" /* second word */
"POPL %%ebx; \n\t" /* Restore ebx */
: /* no output registers */
: "m" (ptr), "m" (addValueLow), "m" (addValueHigh), "m" (newValuePtr) /* input registers */
: "memory", "%edi", "%edx", "%ecx", "%eax" /* clobber list */
);
#elif defined(__sparc__)
#error unsupported platform sparc: requires atomic_add_64 operation
/* For sparc we can probably use __sync_fetch_and_add as well */
#else
#error unsupported/unknown platform: requires atomic_add_64 operation
#endif
return newValue;
}
/*
* atomic_incmod_value
......
......@@ -87,6 +87,8 @@ static const char *assign_optimizer_cost_model(const char *newval,
bool doit, GucSource source);
static const char *assign_gp_workfile_caching_loglevel(const char *newval,
bool doit, GucSource source);
static const char *assign_gp_mdver_loglevel(const char *newval,
bool doit, GucSource source);
static const char *assign_gp_sessionstate_loglevel(const char *newval,
bool doit, GucSource source);
static const char *assign_time_slice_report_level(const char *newval, bool doit,
......@@ -367,6 +369,7 @@ static char *Debug_persistent_store_print_level_str;
static char *Debug_database_command_print_level_str;
static char *gp_log_format_string;
static char *gp_workfile_caching_loglevel_str;
static char *gp_mdver_loglevel_str;
static char *gp_sessionstate_loglevel_str;
static char *explain_memory_verbosity_str;
......@@ -572,6 +575,9 @@ bool optimizer_enable_derive_stats_all_groups;
bool optimizer_explain_show_status;
bool optimizer_prefer_scalar_dqa_multistage_agg;
/* MD Versiong Guc Variables */
int gp_mdver_loglevel = DEBUG1;
/* Security */
bool gp_reject_internal_tcp_conn = true;
......@@ -5055,7 +5061,18 @@ struct config_string ConfigureNamesString_gp[] =
&gp_workfile_caching_loglevel_str,
"debug1", assign_gp_workfile_caching_loglevel, NULL
},
{
{"gp_mdver_loglevel", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Sets the logging level for metadata version debugging messages"),
gettext_noop("Valid values are DEBUG5, DEBUG4, DEBUG3, DEBUG2, "
"DEBUG1, LOG, NOTICE, WARNING, and ERROR. Each level includes all the "
"levels that follow it. The later the level, the fewer messages are "
"sent."),
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
},
&gp_mdver_loglevel_str,
"debug1", assign_gp_mdver_loglevel, NULL
},
{
{"gp_sessionstate_loglevel", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Sets the logging level for session state debugging messages"),
......@@ -5662,6 +5679,12 @@ assign_gp_workfile_caching_loglevel(const char *newval,
return (assign_msglvl(&gp_workfile_caching_loglevel, newval, doit, source));
}
static const char *
assign_gp_mdver_loglevel(const char *newval,
bool doit, GucSource source) {
return (assign_msglvl(&gp_mdver_loglevel, newval, doit, source));
}
static const char *
assign_gp_sessionstate_loglevel(const char *newval,
bool doit, GucSource source)
......
......@@ -2,10 +2,13 @@ subdir=src/backend/utils/misc
top_builddir=../../../../..
include $(top_builddir)/src/Makefile.global
TARGETS=guc ps_status bitstream bitmap_compression
TARGETS=guc ps_status bitstream bitmap_compression atomic
include $(top_builddir)/src/backend/mock.mk
guc.t: \
$(MOCK_DIR)/backend/storage/ipc/shmem_mock.o \
$(MOCK_DIR)/backend/utils/misc/superuser_mock.o
atomic.t: \
$(MOCK_DIR)/backend/utils/error/assert_mock.o
\ No newline at end of file
#include <stdarg.h>
#include <stddef.h>
#include <setjmp.h>
#include "cmockery.h"
#include "c.h"
#include "postgres.h"
#include "../atomic.c"
#define INT64_MAX 9223372036854775807
#define INT64_MIN -9223372036854775807
#define UINT64_MAX 18446744073709551615
#define UINT64_MIN 0
/*
* Test gp_atomic_add_int64
*/
void
test__gp_atomic_add_int64(void **state)
{
/* Running sub-test: gp_atomic_add_int64 small addition */
int64 base = 25;
int64 inc = 3;
int64 result = 0;
int64 expected_result = base + inc;
result = gp_atomic_add_int64(&base, inc);
/* Examine if the value of base has been increased by the value of inc */
assert_true(result == expected_result && base == expected_result);
assert_true(result <= INT64_MAX && result >= INT64_MIN && base <= INT64_MAX && base >= INT64_MIN);
/* Running sub-test: gp_atomic_add_int64 small subtraction */
inc = -4;
result = 0;
expected_result = base + inc;
result = gp_atomic_add_int64(&base, inc);
assert_true(result == expected_result && base == expected_result);
assert_true(result <= INT64_MAX && result >= INT64_MIN && base <= INT64_MAX && base >= INT64_MIN);
/* Running sub-test: gp_atomic_add_int64 huge addition */
base = 37421634719307;
inc = 738246483234;
result = 0;
expected_result = base + inc;
result = gp_atomic_add_int64(&base, inc);
assert_true(result == expected_result && base == expected_result);
assert_true(result <= INT64_MAX && result >= INT64_MIN && base <= INT64_MAX && base >= INT64_MIN);
/* Ensure that an integer overflow occurs.*/
inc = INT64_MAX;
result = gp_atomic_add_int64(&base, inc);
assert_true(base < 0);
assert_true(result <= INT64_MAX && result >= INT64_MIN && base <= INT64_MAX && base >= INT64_MIN);
/* Running sub-test: gp_atomic_add_int64 huge subtraction */
base = 0;
inc = -32738246483234;
result = 0;
expected_result = base + inc;
result = gp_atomic_add_int64(&base, inc);
assert_true(result == expected_result && base == expected_result);
assert_true(result <= INT64_MAX && result >= INT64_MIN && base <= INT64_MAX && base >= INT64_MIN);
/* Ensure that an integer overflow occurs.*/
inc = INT64_MIN;
result = gp_atomic_add_int64(&base, inc);
assert_true(base > 0);
assert_true(result <= INT64_MAX && result >= INT64_MIN && base <= INT64_MAX && base >= INT64_MIN);
}
/*
* Test gp_atomic_add_uint64
*/
void
test__gp_atomic_add_uint64(void **state)
{
/* Running sub-test: gp_atomic_add_uint64 small addition */
uint64 base = 25;
int64 inc = 3;
uint64 result = 0;
uint64 expected_result = base + inc;
result = gp_atomic_add_uint64(&base, inc);
/* Examine if the value of base has been increased by the value of inc */
assert_true(result == expected_result && base == expected_result);
assert_true(result <= UINT64_MAX && result >= UINT64_MIN && base <= UINT64_MAX && base >= UINT64_MIN);
/* Running sub-test: gp_atomic_add_uint64 huge addition */
base = INT64_MAX;
inc = 738246483234;
result = 0;
expected_result = base + inc;
result = gp_atomic_add_uint64(&base, inc);
assert_true(result == expected_result && base == expected_result);
assert_true(result <= UINT64_MAX && result >= UINT64_MIN && base <= UINT64_MAX && base >= UINT64_MIN);
/* Ensure that an integer overflow occurs.*/
base = UINT64_MAX;
inc = 1;
result = gp_atomic_add_uint64(&base, inc);
assert_true(base == 0);
/* Running sub-test: gp_atomic_add_uint64 negative inc */
#ifdef USE_ASSERT_CHECKING
expect_any(ExceptionalCondition,conditionName);
expect_any(ExceptionalCondition,errorType);
expect_any(ExceptionalCondition,fileName);
expect_any(ExceptionalCondition,lineNumber);
will_be_called(ExceptionalCondition);
/* inc should be either zero or a positive integer. So, negative inc should fail. */
inc = -4;
result = gp_atomic_add_uint64(&base, inc);
#endif
}
int
main(int argc, char* argv[]) {
cmockery_parse_arguments(argc, argv);
const UnitTest tests[] = {
unit_test(test__gp_atomic_add_int64),
unit_test(test__gp_atomic_add_uint64)
};
return run_tests(tests);
}
......@@ -1799,7 +1799,7 @@ atomic_test(void)
int64 result = 0;
int64 expected_result = base + inc;
elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
result = gp_atomic_add_64(&base, inc);
result = gp_atomic_add_int64(&base, inc);
elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
unit_test_result(result == expected_result && base == expected_result);
......@@ -1809,7 +1809,7 @@ atomic_test(void)
result = 0;
expected_result = base + inc;
elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
result = gp_atomic_add_64(&base, inc);
result = gp_atomic_add_int64(&base, inc);
elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
unit_test_result(result == expected_result && base == expected_result);
......@@ -1819,7 +1819,7 @@ atomic_test(void)
result = 0;
expected_result = base + inc;
elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
result = gp_atomic_add_64(&base, inc);
result = gp_atomic_add_int64(&base, inc);
elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
unit_test_result(result == expected_result && base == expected_result);
......@@ -1829,7 +1829,7 @@ atomic_test(void)
result = 0;
expected_result = base + inc;
elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
result = gp_atomic_add_64(&base, inc);
result = gp_atomic_add_int64(&base, inc);
elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result);
unit_test_result(result == expected_result && base == expected_result);
}
......
......@@ -133,7 +133,7 @@ WorkfileQueryspace_Reserve(int64 bytes_to_reserve)
return true;
}
int64 total = gp_atomic_add_64(&queryEntry->queryDiskspace, bytes_to_reserve);
int64 total = gp_atomic_add_int64(&queryEntry->queryDiskspace, bytes_to_reserve);
Assert(total >= (int64) 0);
if (gp_workfile_limit_per_query == 0)
......@@ -151,7 +151,7 @@ WorkfileQueryspace_Reserve(int64 bytes_to_reserve)
workfileError = WORKFILE_ERROR_LIMIT_PER_QUERY;
/* Revert the reserved space */
gp_atomic_add_64(&queryEntry->queryDiskspace, - bytes_to_reserve);
gp_atomic_add_int64(&queryEntry->queryDiskspace, - bytes_to_reserve);
/* Set diskfull to true to stop any further attempts to write more data */
WorkfileDiskspace_SetFull(true /* isFull */);
}
......@@ -187,7 +187,7 @@ WorkfileQueryspace_Commit(int64 commit_bytes, int64 reserved_bytes)
#if USE_ASSERT_CHECKING
int64 total =
#endif
gp_atomic_add_64(&queryEntry->queryDiskspace, (commit_bytes - reserved_bytes));
gp_atomic_add_int64(&queryEntry->queryDiskspace, (commit_bytes - reserved_bytes));
Assert(total >= (int64) 0);
}
}
......
......@@ -58,7 +58,7 @@ WorkfileSegspace_Reserve(int64 bytes_to_reserve)
{
Assert(NULL != used_segspace);
int64 total = gp_atomic_add_64(used_segspace, bytes_to_reserve);
int64 total = gp_atomic_add_int64(used_segspace, bytes_to_reserve);
Assert(total >= (int64) 0);
if (gp_workfile_limit_per_segment == 0)
......@@ -81,7 +81,7 @@ WorkfileSegspace_Reserve(int64 bytes_to_reserve)
{
/* Revert the reserved space */
(void) gp_atomic_add_64(used_segspace, - bytes_to_reserve);
(void) gp_atomic_add_int64(used_segspace, - bytes_to_reserve);
CHECK_FOR_INTERRUPTS();
......@@ -106,7 +106,7 @@ WorkfileSegspace_Reserve(int64 bytes_to_reserve)
}
/* Try to reserve again */
total = gp_atomic_add_64(used_segspace, bytes_to_reserve);
total = gp_atomic_add_int64(used_segspace, bytes_to_reserve);
Assert(total >= (int64) 0);
if (total <= max_allowed_diskspace)
......@@ -150,7 +150,7 @@ WorkfileSegspace_Commit(int64 commit_bytes, int64 reserved_bytes)
#if USE_ASSERT_CHECKING
int64 total =
#endif
gp_atomic_add_64(used_segspace, (commit_bytes - reserved_bytes));
gp_atomic_add_int64(used_segspace, (commit_bytes - reserved_bytes));
Assert(total >= (int64) 0);
}
......
......@@ -613,6 +613,9 @@ namespace gpdb {
// return the number of leaf partition for a given table oid
gpos::ULONG UlLeafPartitions(Oid oidRelation);
// Notify Metadata Versioning of a new command starting. Returns true if MDCache should be purged
bool FMDVersioningNewCommand(void);
} //namespace gpdb
#define ForEach(cell, l) \
......
......@@ -63,6 +63,7 @@ extern "C" {
#include "parser/parse_coerce.h"
#include "utils/selfuncs.h"
#include "utils/faultinjector.h"
#include "utils/mdver.h"
extern
Query *preprocess_query_optimizer(Query *pquery, ParamListInfo boundParams);
......
......@@ -10,8 +10,8 @@ extern int32 compare_and_swap_ulong(unsigned long *dest,
unsigned long oldval,
unsigned long newval);
extern int32 gp_atomic_add_32(volatile int32 *ptr, int32 inc);
extern int64 gp_atomic_add_64(int64 *ptr, int64 inc);
extern int64 gp_atomic_add_int64(int64 *ptr, int64 inc);
extern uint64 gp_atomic_add_uint64(uint64 *ptr, int64 inc);
extern int32 gp_atomic_incmod_32(volatile int32 *loc, int32 mod);
extern uint32 gp_atomic_dec_positive_32(volatile uint32 *loc, uint32 dec);
......
......@@ -450,6 +450,9 @@ extern bool optimizer_enable_derive_stats_all_groups;
extern bool optimizer_explain_show_status;
extern bool optimizer_prefer_scalar_dqa_multistage_agg;
/* Metadata Versioning Guc Variables */
extern int gp_mdver_loglevel;
/**
* Enable logging of DPE match in optimizer.
*/
......
/* -------------------------------------------------------------------------
*
* mdver.h
* Interface for metadata versioning
*
* Copyright (c) 2014, Pivotal, Inc.
*
*
* -------------------------------------------------------------------------
*/
#ifndef MDVER_H
#define MDVER_H
#include "postgres.h"
#include "utils/relcache.h"
typedef struct mdver_local
{
/*
* An integer stored inside the Metadata Cache component of a Backend
* process. This holds the last global generation observed by the backend. This
* is updated only when a command starts.
*/
uint64 local_generation;
/*
* Flag is set to true for a transaction that changed metadata. At the
* end of the transaction, this will trigger an update to the global
* generation counter (GG).
*/
bool transaction_dirty;
} mdver_local;
/* Pointer to the shared memory global cache generation (GG) */
extern uint64 *mdver_global_generation;
/* Set to true if mdcache is marked as "dirty" and needs purging */
extern bool mdver_dirty_mdcache;
/* Metadata Versioning global generation functionality */
void mdver_shmem_init(void);
Size mdver_shmem_size(void);
uint64 mdver_get_global_generation(void);
void mdver_bump_global_generation(mdver_local* local_mdver);
/* Metadata Versioning local generation functionality */
mdver_local* mdver_create_local(void);
void mdver_mark_dirty_xact(mdver_local* local_mdver);
bool mdver_command_begin(void);
/* Metadata Versioning Invalidation Translator operations */
void mdver_inv_translator(Relation relation);
/* Metadata Versioning utility functions */
bool mdver_enabled();
/* inval.c */
extern mdver_local *GetCurrentLocalMDVer(void);
#endif /* MDVER_H */
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册