提交 6137a2b3 编写于 作者: S Shengliang Guan

TD-1746

上级 dad2a7c7
......@@ -216,8 +216,8 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
SVnodeGid *pVnode = pVgroup->vnodeGid + i;
if (pVnode == pRmVnode) continue;
mTrace("vgId:%d, change vgroup status, dnode:%d status:%d", pVgroup->vgId, pVnode->pDnode->dnodeId,
pVnode->pDnode->status);
mTrace("vgId:%d, check vgroup status, dnode:%d status:%d, vnode role:%s", pVgroup->vgId, pVnode->pDnode->dnodeId,
pVnode->pDnode->status, syncRole[pVnode->role]);
if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue;
if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue;
......
......@@ -588,7 +588,20 @@ static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo != 0) {
STableMetaInfo *pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo1 != NULL) {
// for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo1)) {
if (pTableMetaInfo1->pVgroupTables == 0) {
tscError("error !!!%p, vgroupTable is null", pSql);
}
}
}
}
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true;
}
......@@ -702,6 +715,19 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pQueryInfo != 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo != NULL) {
// for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
if (pTableMetaInfo->pVgroupTables == 0) {
tscError("error !!!%p, vgroupTable is null", pSql);
}
}
}
}
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return;
}
......@@ -750,6 +776,19 @@ void taos_stop_query(TAOS_RES *res) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pQueryInfo != 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo != NULL) {
// for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
if (pTableMetaInfo->pVgroupTables == 0) {
tscError("error !!!%p, vgroupTable is null", pSql);
}
}
}
}
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->pRpcCtx == NULL);
tscKillSTableQuery(pSql);
......
......@@ -114,9 +114,9 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
}
// for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->vgroupList != NULL);
}
// if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// assert(pTableMetaInfo->vgroupList != NULL);
// }
if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
return false;
......@@ -2098,6 +2098,18 @@ void tscDoQuery(SSqlObj* pSql) {
} else {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
uint16_t type = pQueryInfo->type;
if (pQueryInfo != 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo != NULL) {
// for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
if (pTableMetaInfo->pVgroupTables == 0) {
tscError("error !!!%p, vgroupTable is null", pSql);
}
}
}
}
if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion
tscHandleMultivnodeInsert(pSql);
......
......@@ -189,7 +189,6 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
return;
}
static void *dnodeProcessReadQueue(void *param) {
......
......@@ -41,6 +41,8 @@ typedef struct {
SRpcMsg rpcMsg;
} SReadMsg;
extern char *vnodeStatus[];
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir);
......
......@@ -111,7 +111,6 @@ void mnodeReleaseConn(SConnObj *pConn) {
}
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) {
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
......@@ -126,7 +125,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
}
// mDebug("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port);
pConn->lastAccess = expireTime;
pConn->lastAccess = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
return pConn;
}
......@@ -626,7 +625,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = atoi(pKill->queryId);
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......
......@@ -56,6 +56,14 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
int32_t vnodeInitResources() {
int code = syncInit();
if (code != 0) return code;
......@@ -74,6 +82,7 @@ int32_t vnodeInitResources() {
void vnodeCleanupResources() {
if (tsDnodeVnodesHash != NULL) {
vDebug("vnode list is cleanup");
taosHashCleanup(tsDnodeVnodesHash);
tsDnodeVnodesHash = NULL;
}
......@@ -84,9 +93,10 @@ void vnodeCleanupResources() {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t));
if (pTemp != NULL) {
vInfo("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
SVnodeObj *pVnode = vnodeAcquire(pVnodeCfg->cfg.vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, vnode already exist, refCount:%d pVnode:%p", pVnodeCfg->cfg.vgId, pVnode->refCount, pVnode);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
}
......@@ -143,22 +153,24 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return TSDB_CODE_VND_INIT_FAILED;
}
vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod);
vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel,
pVnodeCfg->cfg.fsyncPeriod);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
return code;
}
int32_t vnodeDrop(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
vDebug("vgId:%d, failed to drop, vgId not find", vgId);
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vDebug("vgId:%d, failed to drop, vnode not find", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
SVnodeObj *pVnode = *ppVnode;
vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount);
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
pVnode->dropped = 1;
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS;
......@@ -340,11 +352,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
}
int32_t vnodeClose(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) return 0;
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return 0;
SVnodeObj *pVnode = *ppVnode;
vDebug("vgId:%d, vnode will be closed", pVnode->vgId);
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
return 0;
......@@ -355,21 +367,27 @@ void vnodeRelease(void *pVnodeRaw) {
int32_t vgId = pVnode->vgId;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode);
assert(refCount >= 0);
if (refCount > 0) {
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2)
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) {
tsem_post(&pVnode->sem);
}
return;
}
qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL;
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode);
if (pVnode->qMgmt) {
qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL;
}
if (pVnode->tsdb)
if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
pVnode->tsdb = NULL;
}
// stop continuous query
if (pVnode->cq) {
......@@ -378,18 +396,21 @@ void vnodeRelease(void *pVnodeRaw) {
cqClose(cq);
}
if (pVnode->wal)
if (pVnode->wal) {
walClose(pVnode->wal);
pVnode->wal = NULL;
pVnode->wal = NULL;
}
if (pVnode->wqueue)
if (pVnode->wqueue) {
dnodeFreeVnodeWqueue(pVnode->wqueue);
pVnode->wqueue = NULL;
pVnode->wqueue = NULL;
}
if (pVnode->rqueue)
if (pVnode->rqueue) {
dnodeFreeVnodeRqueue(pVnode->rqueue);
pVnode->rqueue = NULL;
pVnode->rqueue = NULL;
}
taosTFree(pVnode->rootDir);
if (pVnode->dropped) {
......@@ -413,20 +434,20 @@ void vnodeRelease(void *pVnodeRaw) {
free(pVnode);
int32_t count = taosHashGetSize(tsDnodeVnodesHash);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vInfo("vgId:%d, not exist", vgId);
vDebug("vgId:%d, not exist", vgId);
return NULL;
}
SVnodeObj *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return pVnode;
}
......@@ -528,7 +549,7 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState)
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
}
vnodeRelease(pVnode);
}
......@@ -538,11 +559,12 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
int i = 0;
if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) {
int i = 0;
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) !=
TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) {
sched_yield();
}
......@@ -556,7 +578,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
syncStop(sync);
}
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
// release local resources only after cutting off outside connections
qQueryMgmtNotifyClosed(pVnode->qMgmt);
......@@ -613,17 +635,19 @@ static int vnodeResetTsdb(SVnodeObj *pVnode)
char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY)
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) {
return -1;
}
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
// acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2)
if (refCount > 2) {
tsem_wait(&pVnode->sem);
}
// close tsdb, then open tsdb
tsdbCloseRepo(tsdb, 0);
......
......@@ -202,7 +202,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pSync);
......@@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册