未验证 提交 1abd71f0 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4883 from taosdata/hotfix/TD-2516

[TD-2516]<fix>:taos crash when receiving meta response while query is canceled
......@@ -43,6 +43,11 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql);
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId);
void tscLockByThread(int64_t *lockedBy);
void tscUnlockByThread(int64_t *lockedBy);
#ifdef __cplusplus
}
#endif
......
......@@ -347,6 +347,11 @@ typedef struct SSqlObj {
SSubqueryState subState;
struct SSqlObj **pSubs;
int64_t metaRid;
int64_t svgroupRid;
int64_t squeryLock;
struct SSqlObj *prev, *next;
int64_t self;
} SSqlObj;
......
......@@ -402,8 +402,10 @@ void tscAsyncResultOnError(SSqlObj *pSql) {
int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj *pSql = (SSqlObj *)param;
if (pSql == NULL || pSql->signature != pSql) return;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
if (pSql == NULL) return;
assert(pSql->signature == pSql && (int64_t)param == pSql->self);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -428,7 +430,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
......@@ -436,6 +439,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
// tscProcessSql can add error into async res
tscProcessSql(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // continue to process normal async query
if (pCmd->parseFinished) {
......@@ -446,6 +450,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
......@@ -458,6 +463,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -468,12 +474,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscProcessSql(pSql);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
tscDebug("%p continue parse sql after get table meta", pSql);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -483,12 +491,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
......@@ -501,6 +511,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -509,6 +520,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -521,10 +533,16 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
tscDoQuery(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
_error:
......@@ -532,4 +550,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
pSql->res.code = code;
tscAsyncResultOnError(pSql);
}
taosReleaseRef(tscObjRef, pSql->self);
}
......@@ -423,7 +423,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
taosRemoveRef(tscObjRef, pSql->self);
tscDebug("%p sqlObj is automatically freed", pSql);
tscDebug("%p sqlObj is automatically freed", pSql);
}
rpcFreeCont(rpcMsg->pCont);
......@@ -1987,16 +1987,20 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
}
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
// master sqlObj locates in param
SSqlObj* parent = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param);
if(parent == NULL) {
return pSql->res.code;
}
assert(parent->signature == parent && (int64_t)pSql->param == parent->self);
SSqlRes* pRes = &pSql->res;
// NOTE: the order of several table must be preserved.
SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp;
pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
char *pMsg = pRes->pRsp + sizeof(SSTableVgroupRspMsg);
// master sqlObj locates in param
SSqlObj* parent = pSql->param;
assert(parent != NULL);
SSqlCmd* pCmd = &parent->cmd;
for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
......@@ -2030,6 +2034,8 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
pMsg += size;
}
taosReleaseRef(tscObjRef, parent->self);
return pSql->res.code;
}
......@@ -2323,10 +2329,14 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
registerSqlObj(pNew);
pNew->fp = tscTableMetaCallBack;
pNew->param = pSql;
pNew->param = (void *)pSql->self;
registerSqlObj(pNew);
tscDebug("%p metaRid from %" PRId64 " to %" PRId64 , pSql, pSql->metaRid, pNew->self);
pSql->metaRid = pNew->self;
int32_t code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
......@@ -2343,6 +2353,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
uint32_t size = tscGetTableMetaMaxSize();
pTableMetaInfo->pTableMeta = calloc(1, size);
pTableMetaInfo->pTableMeta->tableType = -1;
pTableMetaInfo->pTableMeta->tableInfo.numOfColumns = -1;
int32_t len = (int32_t) strlen(pTableMetaInfo->name);
......@@ -2442,10 +2453,15 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
registerSqlObj(pNew);
tscDebug("%p svgroupRid from %" PRId64 " to %" PRId64 , pSql, pSql->svgroupRid, pNew->self);
pSql->svgroupRid = pNew->self;
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack;
pNew->param = pSql;
pNew->param = (void *)pSql->self;
code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
......
......@@ -694,6 +694,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
// set the master sqlObj flag to cancel query
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscLockByThread(&pSql->squeryLock);
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
// NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i];
......@@ -713,6 +715,12 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
taosReleaseRef(tscObjRef, pSubObj->self);
}
if (pSql->subState.numOfSub <= 0) {
tscAsyncResultOnError(pSql);
}
tscUnlockByThread(&pSql->squeryLock);
tscDebug("%p super table query cancelled", pSql);
}
......
......@@ -551,7 +551,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
registerSqlObj(pSql);
tscAddIntoStreamList(pStream);
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
......@@ -610,12 +609,15 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
tscCreateStream(pStream, pSql, code);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code));
tscFreeSqlObj(pSql);
taosReleaseRef(tscObjRef, pSql->self);
free(pStream);
return NULL;
}
......
......@@ -533,7 +533,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
freeJoinSubqueryObj(pSqlObj);
}
tscDestroyJoinSupporter(pSupporter);
//tscDestroyJoinSupporter(pSupporter);
}
// update the query time range according to the join results on timestamp
......@@ -1362,14 +1362,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);
// retrieve actual query results from vnode during the second stage join subquery
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
return;
}
}
quitAllSubquery(pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
......@@ -1383,6 +1392,12 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
pParentSql->res.code = code;
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
return;
}
}
quitAllSubquery(pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
......@@ -1405,9 +1420,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
return;
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
......@@ -1658,6 +1670,25 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
}
}
void tscLockByThread(int64_t *lockedBy) {
int64_t tid = taosGetSelfPthreadId();
int i = 0;
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
if (++i % 100 == 0) {
sched_yield();
}
}
}
void tscUnlockByThread(int64_t *lockedBy) {
int64_t tid = taosGetSelfPthreadId();
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
assert(false);
}
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......
......@@ -468,6 +468,18 @@ void tscFreeRegisteredSqlObj(void *pSql) {
}
void tscFreeMetaSqlObj(int64_t *rid){
if (RID_VALID(*rid)) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, *rid);
if (pSql) {
taosRemoveRef(tscObjRef, *rid);
taosReleaseRef(tscObjRef, *rid);
}
*rid = 0;
}
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
......@@ -477,6 +489,9 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscFreeMetaSqlObj(&pSql->metaRid);
tscFreeMetaSqlObj(&pSql->svgroupRid);
tscFreeSubobj(pSql);
SSqlCmd* pCmd = &pSql->cmd;
......@@ -505,6 +520,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pCmd->allocSize = 0;
tsem_destroy(&pSql->rspSem);
memset(pSql, 0, sizeof(*pSql));
free(pSql);
}
......@@ -2193,7 +2209,9 @@ void tscDoQuery(SSqlObj* pSql) {
tscProcessSql(pSql);
} else { // secondary stage join query.
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock);
tscHandleMasterSTableQuery(pSql);
tscUnlockByThread(&pSql->squeryLock);
} else {
tscProcessSql(pSql);
}
......@@ -2202,7 +2220,9 @@ void tscDoQuery(SSqlObj* pSql) {
return;
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock);
tscHandleMasterSTableQuery(pSql);
tscUnlockByThread(&pSql->squeryLock);
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册