提交 0cd1dd4a 编写于 作者: H hjxilinx

refactor code in cache module, and fix some race condition problems.

上级 07f26663
......@@ -29,6 +29,7 @@
#include "ttimer.h"
#include "tutil.h"
#define HASH_MAX_CAPACITY (1024*1024*16)
#define HASH_VALUE_IN_TRASH (-1)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
......@@ -81,24 +82,68 @@ typedef struct {
*
* when the node in pTrash does not be referenced, it will be release at the expired time
*/
SDataNode * pTrash;
int numOfElemsInTrash; // number of element in trash
void * tmrCtrl;
void * pTimer;
SCacheStatis statistics;
_hashFunc hashFp;
#if defined LINUX
SDataNode * pTrash;
void * tmrCtrl;
void * pTimer;
SCacheStatis statistics;
_hashFunc hashFp;
int numOfElemsInTrash; // number of element in trash
int16_t deleting; // set the deleting flag to stop refreshing asap.
int16_t refreshing; // if refreshing is invoked, it will be set 1
#if defined LINUX
pthread_rwlock_t lock;
#else
pthread_mutex_t mutex;
pthread_mutex_t lock;
#endif
} SCacheObj;
static FORCE_INLINE int32_t taosNormalHashTableLength(int32_t length) {
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
#endif
}
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) {
#if defined LINUX
pthread_rwlock_rdlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
#endif
}
static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) {
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->lock);
#endif
}
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) {
#if defined LINUX
return pthread_rwlock_init(&pObj->lock, NULL);
#else
return pthread_mutex_init(&pObj->lock, NULL);
#endif
}
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pObj) {
#if defined LINUX
pthread_rwlock_destroy(&pObj->lock);
#else
pthread_mutex_destroy(&pObj->lock);
#endif
}
static FORCE_INLINE int32_t taosHashTableLength(int32_t length) {
int32_t trueLength = MIN(length, HASH_MAX_CAPACITY);
int32_t i = 4;
while (i < length) i = (i << 1);
while (i < trueLength) i = (i << 1);
return i;
}
......@@ -197,22 +242,15 @@ static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) {
* may cause corruption. So, forece model only applys before cache is closed
*/
static void taosClearCacheTrash(SCacheObj *pObj, bool force) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->mutex);
#endif
__cache_wr_lock(pObj);
if (pObj->numOfElemsInTrash == 0) {
if (pObj->pTrash != NULL) {
pError("key:inconsistency data in cache, numOfElem in trash:%d", pObj->numOfElemsInTrash);
}
pObj->pTrash = NULL;
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
return;
}
......@@ -239,11 +277,7 @@ static void taosClearCacheTrash(SCacheObj *pObj, bool force) {
}
assert(pObj->numOfElemsInTrash >= 0);
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
}
/**
......@@ -323,7 +357,7 @@ static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) {
* @param keyLen key length
* @return
*/
static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, char *key, uint32_t keyLen) {
static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, const char *key, uint32_t keyLen) {
uint32_t hash = (*pObj->hashFp)(key, keyLen);
int32_t slot = HASH_INDEX(hash, pObj->capacity);
......@@ -358,8 +392,13 @@ static void taosHashTableResize(SCacheObj *pObj) {
SDataNode *pNext = NULL;
int32_t newSize = pObj->capacity << 1;
if (newSize > HASH_MAX_CAPACITY) {
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
pObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
SDataNode **pList = realloc(pObj->hashList, sizeof(SDataNode *) * newSize);
if (pList == NULL) {
pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity);
......@@ -367,6 +406,10 @@ static void taosHashTableResize(SCacheObj *pObj) {
}
pObj->hashList = pList;
int32_t inc = newSize - pObj->capacity;
memset(&pObj->hashList[pObj->capacity], 0, inc * sizeof(SDataNode *));
pObj->capacity = newSize;
for (int32_t i = 0; i < pObj->capacity; ++i) {
......@@ -457,7 +500,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
// only a node is not referenced by any other object, in-place update it
if (pNode->refCount == 0) {
size_t newSize = sizeof(SDataNode) + dataSize + keyLen;
size_t newSize = sizeof(SDataNode) + dataSize + (keyLen + 1);
pNewNode = (SDataNode *)realloc(pNode, newSize);
if (pNewNode == NULL) {
......@@ -542,11 +585,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
uint32_t keyLen = (uint32_t)strlen(key) + 1;
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->mutex);
#endif
__cache_wr_lock(pObj);
SDataNode *pOldNode = taosGetNodeFromHashTable(pObj, key, keyLen - 1);
......@@ -565,11 +604,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
pTrace("key:%s %p exist in cache, updated", key, pNode);
}
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
return (pNode != NULL) ? pNode->data : NULL;
}
......@@ -612,17 +647,13 @@ void taosRemoveDataFromCache(void *handle, void **data, bool _remove) {
*data = NULL;
if (_remove) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->mutex);
#endif
__cache_wr_lock(pObj);
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
// So we need to lock it in the first place.
taosDecRef(pNode);
taosCacheMoveNodeToTrash(pObj, pNode);
pthread_rwlock_unlock(&pObj->lock);
__cache_unlock(pObj);
} else {
taosDecRef(pNode);
}
......@@ -640,22 +671,14 @@ void *taosGetDataFromCache(void *handle, char *key) {
uint32_t keyLen = (uint32_t)strlen(key);
#if defined LINUX
pthread_rwlock_rdlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->mutex);
#endif
__cache_rd_lock(pObj);
SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen);
if (ptNode != NULL) {
__sync_add_and_fetch_32(&ptNode->refCount, 1);
}
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
if (ptNode != NULL) {
__sync_add_and_fetch_32(&pObj->statistics.hitCount, 1);
......@@ -685,11 +708,7 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in
uint32_t keyLen = strlen(key) + 1;
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->mutex);
#endif
__cache_wr_lock(pObj);
SDataNode *pNode = taosGetNodeFromHashTable(handle, key, keyLen - 1);
......@@ -702,25 +721,33 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in
pTrace("key:%s updated.expireTime:%lld.refCnt:%d", key, pNode->time, pNode->refCount);
}
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
return (pNew != NULL) ? pNew->data : NULL;
}
/**
* refresh cache to remove data in both hashlist and trash, if any nodes' refcount == 0, every pObj->refreshTime
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pObj->refreshTime
* @param handle Cache object handle
*/
void taosRefreshDataCache(void *handle, void *tmrId) {
SDataNode *pNode, *pNext;
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || (pObj->size == 0 && pObj->numOfElemsInTrash == 0)) {
taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
if (pObj == NULL || pObj->capacity <= 0 || pObj->deleting == 1) {
pTrace("object is destroyed. no refresh retry");
return;
}
pObj->refreshing = 1;
#if defined LINUX
__sync_synchronize();
#else
MemoryBarrier();
#endif
if (pObj->deleting == 1) {
pObj->refreshing = 0;
return;
}
......@@ -730,14 +757,15 @@ void taosRefreshDataCache(void *handle, void *tmrId) {
int32_t num = pObj->size;
for (int hash = 0; hash < pObj->capacity; ++hash) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->mutex);
#endif
for (int i = 0; i < pObj->capacity; ++i) {
// in deleting process, quit refreshing immediately
if (pObj->deleting == 1) {
pObj->refreshing = 0;
return;
}
pNode = pObj->hashList[hash];
__cache_wr_lock(pObj);
pNode = pObj->hashList[i];
while (pNode) {
numOfCheck++;
......@@ -751,23 +779,23 @@ void taosRefreshDataCache(void *handle, void *tmrId) {
/* all data have been checked, not need to iterate further */
if (numOfCheck == num || pObj->size <= 0) {
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
break;
}
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
}
taosClearCacheTrash(pObj, false);
taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
int16_t isDeleting = pObj->deleting;
pObj->refreshing = 0;
// the SCacheObj may have been released now.
if (isDeleting == 1) {
return;
} else {
taosClearCacheTrash(pObj, false);
taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
}
}
/**
......@@ -779,25 +807,22 @@ void taosClearDataCache(void *handle) {
SDataNode *pNode, *pNext;
SCacheObj *pObj = (SCacheObj *)handle;
for (int hash = 0; hash < pObj->capacity; ++hash) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->mutex);
#endif
int32_t capacity = pObj->capacity;
for (int i = 0; i < capacity; ++i) {
__cache_wr_lock(pObj);
pNode = pObj->hashList[hash];
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
taosCacheMoveNodeToTrash(pObj, pNode);
pNode = pNext;
}
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
pObj->hashList[i] = NULL;
__cache_unlock(pObj);
}
taosClearCacheTrash(pObj, false);
......@@ -823,8 +848,8 @@ void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) {
}
// the max slots is not defined by user
pObj->capacity = taosNormalHashTableLength(capacity);
assert((pObj->capacity & (pObj->capacity-1)) == 0);
pObj->capacity = taosHashTableLength(capacity);
assert((pObj->capacity & (pObj->capacity - 1)) == 0);
pObj->hashFp = taosHashKey;
pObj->refreshTime = refreshTime * 1000;
......@@ -839,11 +864,7 @@ void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) {
pObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
#if defined LINUX
if (pthread_rwlock_init(&pObj->lock, NULL) != 0) {
#else
if (pthread_mutex_init(&pObj->mutex, NULL) != 0) {
#endif
if (__cache_lock_init(pObj) != 0) {
taosTmrStopA(&pObj->pTimer);
free(pObj->hashList);
free(pObj);
......@@ -863,26 +884,35 @@ void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) {
void taosCleanUpDataCache(void *handle) {
SCacheObj *pObj;
SDataNode *pNode, *pNext;
pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->capacity <= 0) {
#if defined LINUX
pthread_rwlock_destroy(&pObj->lock);
#else
pthread_mutex_destroy(&pObj->mutex);
#endif
if (pObj == NULL) {
return;
}
if (pObj->capacity <= 0) {
__cache_lock_destroy(pObj);
free(pObj);
return;
}
taosTmrStopA(&pObj->pTimer);
pObj->deleting = 1;
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
__sync_synchronize();
#else
pthread_mutex_lock(&pObj->mutex);
MemoryBarrier();
#endif
while (pObj->refreshing == 1) {
taosMsleep(0);
}
__cache_wr_lock(pObj);
if (pObj->hashList && pObj->size > 0) {
for (int i = 0; i < pObj->capacity; ++i) {
pNode = pObj->hashList[i];
......@@ -893,22 +923,14 @@ void taosCleanUpDataCache(void *handle) {
}
}
free(pObj->hashList);
tfree(pObj->hashList);
}
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
__cache_unlock(pObj);
taosClearCacheTrash(pObj, true);
__cache_lock_destroy(pObj);
#if defined LINUX
pthread_rwlock_destroy(&pObj->lock);
#else
pthread_mutex_destroy(&pObj->mutex);
#endif
memset(pObj, 0, sizeof(SCacheObj));
free(pObj);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册