提交 9a91f0e4 编写于 作者: H Haojun Liao

[td-225] add some logs.

上级 a79f608d
...@@ -202,8 +202,9 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -202,8 +202,9 @@ static void *dnodeProcessReadQueue(void *param) {
break; break;
} }
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle, dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type); taosMsg[pReadMsg->rpcMsg.msgType], type, pReadMsg);
int32_t code = vnodeProcessRead(pVnode, pReadMsg); int32_t code = vnodeProcessRead(pVnode, pReadMsg);
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
......
...@@ -219,6 +219,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "Duplicated ...@@ -219,6 +219,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "Duplicated
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT, 0, 0x0706, "Tag conditon too many") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT, 0, 0x0706, "Tag conditon too many")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not ready") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query")
// grant // grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired")
......
...@@ -184,8 +184,8 @@ enum { ...@@ -184,8 +184,8 @@ enum {
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
int32_t pointsInterpo; int32_t code; // error code to returned to client
int32_t code; // error code to returned to client pthread_t owner; // if it is in execution
void* tsdb; void* tsdb;
int32_t vgId; int32_t vgId;
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list> STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
......
...@@ -2784,6 +2784,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2784,6 +2784,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf); setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
// todo add windowRes iterator
int64_t lastTimestamp = -1; int64_t lastTimestamp = -1;
int64_t startt = taosGetTimestampMs(); int64_t startt = taosGetTimestampMs();
...@@ -2791,7 +2792,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2791,7 +2792,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
int32_t pos = pTree->pNode[0].index; int32_t pos = pTree->pNode[0].index;
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
SWindowResult * pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); SWindowResult *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page);
...@@ -2828,6 +2829,9 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2828,6 +2829,9 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
lastTimestamp = ts; lastTimestamp = ts;
// move to the next element of current entry
int32_t currentPageId = pWindowRes->pos.pageId;
cs.position[pos] += 1; cs.position[pos] += 1;
if (cs.position[pos] >= pWindowResInfo->size) { if (cs.position[pos] >= pWindowResInfo->size) {
cs.position[pos] = -1; cs.position[pos] = -1;
...@@ -2836,6 +2840,12 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2836,6 +2840,12 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
if (--numOfTables == 0) { if (--numOfTables == 0) {
break; break;
} }
} else {
// current page is not needed anymore
SWindowResult *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
if (pNextWindowRes->pos.pageId != currentPageId) {
releaseResBufPage(pRuntimeEnv->pResultBuf, page);
}
} }
} }
...@@ -5081,8 +5091,6 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -5081,8 +5091,6 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
} }
pQInfo->pointsInterpo += numOfFilled;
} }
static void tableQueryImpl(SQInfo *pQInfo) { static void tableQueryImpl(SQInfo *pQInfo) {
...@@ -6330,16 +6338,24 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -6330,16 +6338,24 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
// clear qhandle owner
// assert(pQInfo->owner == pthread_self());
// pQInfo->owner = 0;
return buildRes; return buildRes;
} }
bool qTableQuery(qinfo_t qinfo) { bool qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
if (pQInfo == NULL || pQInfo->signature != pQInfo) { // int64_t threadId = pthread_self();
qDebug("QInfo:%p has been freed, no need to execute", pQInfo);
return false; // int64_t curOwner = 0;
} // if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
// qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner);
// pQInfo->code = TSDB_CODE_QRY_IN_EXEC;
// return false;
// }
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
......
...@@ -186,7 +186,7 @@ static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { ...@@ -186,7 +186,7 @@ static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
return GET_DATA_PAYLOAD(pg); return GET_DATA_PAYLOAD(pg);
} }
#define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL);
...@@ -281,7 +281,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 ...@@ -281,7 +281,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
pResultBuf->statis.getPages += 1; pResultBuf->statis.getPages += 1;
char* availablePage = NULL; char* availablePage = NULL;
if (NO_AVAILABLE_PAGES(pResultBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf); availablePage = evicOneDataPage(pResultBuf);
} }
...@@ -340,7 +340,7 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { ...@@ -340,7 +340,7 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0); assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0);
char* availablePage = NULL; char* availablePage = NULL;
if (NO_AVAILABLE_PAGES(pResultBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf); availablePage = evicOneDataPage(pResultBuf);
} }
...@@ -396,12 +396,13 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { ...@@ -396,12 +396,13 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
} }
if (pResultBuf->file != NULL) { if (pResultBuf->file != NULL) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file size:%"PRId64" bytes", qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, inmem size:%dbytes, file size:%"PRId64" bytes",
pResultBuf->handle, pResultBuf->totalBufSize, pResultBuf->fileSize); pResultBuf->handle, pResultBuf->totalBufSize, listNEles(pResultBuf->lruList) * pResultBuf->pageSize,
pResultBuf->fileSize);
fclose(pResultBuf->file); fclose(pResultBuf->file);
} else { } else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle, qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle,
pResultBuf->totalBufSize); pResultBuf->totalBufSize);
} }
......
...@@ -32,7 +32,7 @@ typedef void (*_hash_free_fn_t)(void *param); ...@@ -32,7 +32,7 @@ typedef void (*_hash_free_fn_t)(void *param);
typedef struct SHashNode { typedef struct SHashNode {
char *key; char *key;
struct SHashNode *prev; // struct SHashNode *prev;
struct SHashNode *next; struct SHashNode *next;
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t keyLen; // length of the key uint32_t keyLen; // length of the key
...@@ -47,7 +47,7 @@ typedef enum SHashLockTypeE { ...@@ -47,7 +47,7 @@ typedef enum SHashLockTypeE {
typedef struct SHashEntry { typedef struct SHashEntry {
int32_t num; // number of elements in current entry int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch SRWLatch latch; // entry latch
SHashNode head; // dummy head SHashNode *next;
} SHashEntry; } SHashEntry;
typedef struct SHashObj { typedef struct SHashObj {
......
...@@ -20,12 +20,21 @@ ...@@ -20,12 +20,21 @@
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) #define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
#define FREE_HASH_NODE(_n) \ #define DO_FREE_HASH_NODE(_n) \
do { \ do { \
taosTFree((_n)->data); \ taosTFree((_n)->data); \
taosTFree(_n); \ taosTFree(_n); \
} while (0) } while (0)
#define FREE_HASH_NODE(_h, _n) \
do { \
if ((_h)->freeFp) { \
(_h)->freeFp((_n)->data); \
} \
\
DO_FREE_HASH_NODE(_n); \
} while (0);
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) { static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) { if (type == HASH_NO_LOCK) {
return; return;
...@@ -65,17 +74,8 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { ...@@ -65,17 +74,8 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
return i; return i;
} }
/**
* Get SHashNode from hashlist, nodes from trash are not included.
* @param pHashObj Cache objection
* @param key key for hash
* @param keyLen key length
* @param hashVal hash value by hash function
* @return
*/
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) { static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
SHashNode *pNode = pe->head.next; SHashNode *pNode = pe->next;
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
...@@ -88,28 +88,21 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *k ...@@ -88,28 +88,21 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *k
return pNode; return pNode;
} }
static FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, static FORCE_INLINE SHashNode *doSerchPrevInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
uint32_t hashVal) { SHashNode *prev= NULL;
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashNode *pNode = pe->next;
SHashEntry *pe = pHashObj->hashList[slot]; while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
// no data, return directly assert(pNode->hashVal == hashVal);
if (atomic_load_32(&pe->num) == 0) { break;
return NULL; }
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pe->latch);
}
SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal);
if (pHashObj->type == HASH_ENTRY_LOCK) { prev = pNode;
taosRUnLockLatch(&pe->latch); pNode = pNode->next;
} }
return pNode; return prev;
} }
/** /**
...@@ -153,7 +146,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNe ...@@ -153,7 +146,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNe
* @param pHashObj * @param pHashObj
* @param pNode * @param pNode
*/ */
static void pushfrontNode(SHashEntry *pEntry, SHashNode *pNode); static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
/** /**
* Get the next element in hash table for iterator * Get the next element in hash table for iterator
...@@ -225,7 +218,13 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -225,7 +218,13 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
taosWLockLatch(&pe->latch); taosWLockLatch(&pe->latch);
} }
SHashNode *pNode = pe->head.next; SHashNode *pNode = pe->next;
if (pe->num > 0) {
assert(pNode != NULL);
} else {
assert(pNode == NULL);
}
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
...@@ -237,7 +236,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -237,7 +236,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
if (pNode == NULL) { if (pNode == NULL) {
// no data in hash table with the specified key, add it into hash table // no data in hash table with the specified key, add it into hash table
pushfrontNode(pe, pNewNode); pushfrontNodeInEntryList(pe, pNewNode);
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch); taosWUnLockLatch(&pe->latch);
...@@ -261,7 +260,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -261,7 +260,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize // enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type); __rd_unlock(&pHashObj->lock, pHashObj->type);
FREE_HASH_NODE(pNewNode); DO_FREE_HASH_NODE(pNewNode);
return pHashObj->enableUpdate ? 0 : -1; return pHashObj->enableUpdate ? 0 : -1;
} }
} }
...@@ -301,6 +300,7 @@ void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*f ...@@ -301,6 +300,7 @@ void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*f
if (fp != NULL) { if (fp != NULL) {
fp(pNode->data); fp(pNode->data);
} }
data = pNode->data; data = pNode->data;
} }
...@@ -316,13 +316,12 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { ...@@ -316,13 +316,12 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0); return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0);
} }
static FORCE_INLINE void doPopFromEntryList(SHashEntry *pe, SHashNode *pNode) { static FORCE_INLINE void doPopNextFromEntryList(SHashEntry *pe, SHashNode *pNode) {
SHashNode *pNext = pNode->next; SHashNode *pNext = pNode->next;
assert(pNode->prev != NULL);
pNode->prev->next = pNext;
if (pNext != NULL) { if (pNext != NULL) {
pNext->prev = pNode->prev; pNode->next = pNext->next;
} else {
pNode->next = NULL;
} }
pe->num -= 1; pe->num -= 1;
...@@ -351,9 +350,27 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe ...@@ -351,9 +350,27 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
taosWLockLatch(&pe->latch); taosWLockLatch(&pe->latch);
} }
SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal); SHashNode *pNode = pe->next;
if (pNode != NULL) { SHashNode *pRes = NULL;
doPopFromEntryList(pe, pNode); // remove it
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
pe->next = pNode->next;
pRes = pNode;
} else {
while (pNode->next != NULL) {
if (((pNode->next)->keyLen == keyLen) && (memcmp((pNode->next)->key, key, keyLen) == 0)) {
assert((pNode->next)->hashVal == hashVal);
break;
}
pNode = pNode->next;
}
if (pNode->next != NULL) {
pRes = pNode->next;
pNode->next = pNode->next->next;
}
} }
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
...@@ -362,18 +379,14 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe ...@@ -362,18 +379,14 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
__rd_unlock(&pHashObj->lock, pHashObj->type); __rd_unlock(&pHashObj->lock, pHashObj->type);
if (data != NULL) { if (data != NULL && pRes != NULL) {
memcpy(data, pNode->data, dsize); memcpy(data, pRes->data, dsize);
} }
if (pNode != NULL) { if (pRes != NULL) {
pe->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1); atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pHashObj, pRes);
pNode->next = NULL;
pNode->prev = NULL;
FREE_HASH_NODE(pNode);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -391,7 +404,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi ...@@ -391,7 +404,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
int32_t numOfEntries = pHashObj->capacity; int32_t numOfEntries = pHashObj->capacity;
for (int32_t i = 0; i < numOfEntries; ++i) { for (int32_t i = 0; i < numOfEntries; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i]; SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num <= 0) { if (pEntry->num == 0) {
continue; continue;
} }
...@@ -399,20 +412,35 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi ...@@ -399,20 +412,35 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
taosWLockLatch(&pEntry->latch); taosWLockLatch(&pEntry->latch);
} }
SHashNode *pNode = pEntry->head.next; // todo remove first node
assert(pNode != NULL); SHashNode *pNode = NULL;
while((pNode = pEntry->next) != NULL) {
SHashNode *pNext = NULL;
while (pNode != NULL) {
pNext = pNode->next;
// not qualified, remove it
if (fp && (!fp(param, pNode->data))) { if (fp && (!fp(param, pNode->data))) {
doPopFromEntryList(pEntry, pNode); pEntry->num -= 1;
FREE_HASH_NODE(pNode); pEntry->next = pNode->next;
FREE_HASH_NODE(pHashObj, pNode);
} else {
break;
} }
}
// handle the following node
if (pNode != NULL) {
assert(pNode == pEntry->next);
SHashNode *pNext = NULL;
pNode = pNext; while ((pNext = pNode->next) != NULL) {
// not qualified, remove it
if (fp && (!fp(param, pNext->data))) {
pNode->next = pNext->next;
pEntry->num -= 1;
FREE_HASH_NODE(pHashObj, pNext);
} else {
pNode = pNext;
}
}
} }
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
...@@ -437,18 +465,15 @@ void taosHashCleanup(SHashObj *pHashObj) { ...@@ -437,18 +465,15 @@ void taosHashCleanup(SHashObj *pHashObj) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i]; SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) { if (pEntry->num == 0) {
assert(pEntry->head.next == 0); assert(pEntry->next == 0);
continue; continue;
} }
pNode = pEntry->head.next; pNode = pEntry->next;
while (pNode) { while (pNode) {
pNext = pNode->next; pNext = pNode->next;
if (pHashObj->freeFp) { FREE_HASH_NODE(pHashObj, pNode);
pHashObj->freeFp(pNode->data);
}
FREE_HASH_NODE(pNode);
pNode = pNext; pNode = pNext;
} }
} }
...@@ -501,6 +526,8 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { ...@@ -501,6 +526,8 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
while (1) { while (1) {
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex]; SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
if (pEntry->num == 0) { if (pEntry->num == 0) {
assert(pEntry->next == NULL);
pIter->entryIndex++; pIter->entryIndex++;
continue; continue;
} }
...@@ -509,7 +536,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { ...@@ -509,7 +536,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
taosRLockLatch(&pEntry->latch); taosRLockLatch(&pEntry->latch);
} }
pIter->pCur = pEntry->head.next; pIter->pCur = pEntry->next;
if (pIter->pCur->next) { if (pIter->pCur->next) {
pIter->pNext = pIter->pCur->next; pIter->pNext = pIter->pCur->next;
...@@ -595,7 +622,7 @@ void taosHashTableResize(SHashObj *pHashObj) { ...@@ -595,7 +622,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
return; return;
} }
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(SHashEntry) * newSize); void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void*) * newSize);
if (pNewEntryList == NULL) { // todo handle error if (pNewEntryList == NULL) { // todo handle error
// uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); // uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return; return;
...@@ -616,33 +643,39 @@ void taosHashTableResize(SHashObj *pHashObj) { ...@@ -616,33 +643,39 @@ void taosHashTableResize(SHashObj *pHashObj) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pe = pHashObj->hashList[i]; SHashEntry *pe = pHashObj->hashList[i];
if (pe->num == 0) { if (pe->num == 0) {
assert(pe->head.next == NULL); assert(pe->next == NULL);
continue; continue;
} }
pNode = pe->head.next; while ((pNode = pe->next) != NULL) {
while (pNode) {
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity); int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
if (j == i) { // this key locates in the same slot, no need to relocate it if (j != i) {
pNode = pNode->next; pe->num -= 1;
} else { pe->next = pNode->next;
pNext = pNode->next;
assert(pNode != pNext && (pNext == NULL || pNext->prev == pNode) && pNode->prev->next == pNode);
doPopFromEntryList(pe, pNode);
// clear pointer
pNode->next = NULL;
pNode->prev = NULL;
// added into new slot
SHashEntry *pNewEntry = pHashObj->hashList[j]; SHashEntry *pNewEntry = pHashObj->hashList[j];
pushfrontNode(pNewEntry, pNode); pushfrontNodeInEntryList(pNewEntry, pNode);
} else {
break;
}
}
// continue if (pNode != NULL) {
pNode = pNext; while ((pNext = pNode->next) != NULL) {
int32_t j = HASH_INDEX(pNext->hashVal, pHashObj->capacity);
if (j != i) {
pNode->next = pNext->next;
pNext->next = NULL;
// added into new slot
SHashEntry *pNewEntry = pHashObj->hashList[j];
pushfrontNodeInEntryList(pNewEntry, pNext);
} else {
pNode = pNext;
}
} }
} }
} }
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, // uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
...@@ -668,17 +701,11 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s ...@@ -668,17 +701,11 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
return pNewNode; return pNewNode;
} }
void pushfrontNode(SHashEntry *pEntry, SHashNode *pNode) { void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode) {
assert(pNode != NULL && pEntry != NULL); assert(pNode != NULL && pEntry != NULL);
SHashNode *pNext = pEntry->head.next; pNode->next = pEntry->next;
if (pNext != NULL) { pEntry->next = pNode;
pNext->prev = pNode;
}
pNode->next = pNext;
pNode->prev = &pEntry->head;
pEntry->head.next = pNode;
pEntry->num += 1; pEntry->num += 1;
} }
...@@ -700,7 +727,7 @@ SHashNode *getNextHashNode(SHashMutableIterator *pIter) { ...@@ -700,7 +727,7 @@ SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
taosRLockLatch(&pEntry->latch); taosRLockLatch(&pEntry->latch);
} }
p = pEntry->head.next; p = pEntry->next;
if (pIter->pHashObj->type == HASH_ENTRY_LOCK) { if (pIter->pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pEntry->latch); taosRUnLockLatch(&pEntry->latch);
......
...@@ -74,6 +74,8 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { ...@@ -74,6 +74,8 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
pRead->rpcMsg.handle = NULL; pRead->rpcMsg.handle = NULL;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("QInfo:%p add to query task queue for exec, msg:%p", qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
} }
...@@ -83,7 +85,6 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, b ...@@ -83,7 +85,6 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, b
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) { if (continueExec) {
vDebug("QInfo:%p add to query task queue for exec", handle);
vnodePutItemIntoReadQueue(pVnode, handle); vnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = handle; pRet->qhandle = handle;
*freeHandle = false; *freeHandle = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册