提交 3ff538c6 编写于 作者: H Haojun Liao

[td-225] merge develop

上级 6c5ff84d
......@@ -87,7 +87,6 @@ typedef struct SRetrieveSupport {
SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
pthread_mutex_t queryMutex;
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
......
......@@ -323,6 +323,7 @@ typedef struct SSqlObj {
SSqlRes res;
uint16_t numOfSubs;
struct SSqlObj **pSubs;
tsem_t subReadySem;
struct SSqlObj * prev, *next;
} SSqlObj;
......
......@@ -1377,13 +1377,12 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
tscResetForNextRetrieve(pRes);
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed
tscDebug("%p %s call the drop local reducer", pSql, __FUNCTION__);
tscDestroyLocalReducer(pSql);
return 0;
tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code));
return pRes->code;
}
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// set the data merge in progress
int32_t prevStatus =
......@@ -1478,8 +1477,8 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* so the processing of previous group is completed.
*/
int32_t numOfRes = finalizeRes(pQueryInfo, pLocalReducer);
bool sameGroup = isSameGroup(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpBuffer);
bool sameGroup = isSameGroup(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpBuffer);
tFilePage *pResBuf = pLocalReducer->pResultBuf;
/*
......
......@@ -314,10 +314,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
pRes->rspLen = 0;
if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL;
} else {
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
} else {
pRes->code = rpcMsg->code;
}
if (pRes->code == TSDB_CODE_SUCCESS) {
......@@ -460,11 +460,10 @@ void tscKillSTableQuery(SSqlObj *pSql) {
continue;
}
/*
* here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/
rpcCancelRequest(pSub->pRpcCtx);
if (pSub->pRpcCtx != NULL) {
rpcCancelRequest(pSub->pRpcCtx);
}
pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscQueueAsyncRes(pSub);
}
......@@ -1443,6 +1442,17 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
int32_t code = pRes->code;
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
return code;
}
// all subquery have completed already
if (pRes->pLocalReducer == NULL) {
sem_wait(&pSql->subReadySem);
}
pRes->code = tscDoLocalMerge(pSql);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -1453,7 +1463,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
pRes->row = 0;
pRes->completed = (pRes->numOfRows == 0);
int32_t code = pRes->code;
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
......
......@@ -295,6 +295,8 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
}
tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->subReadySem, 0, 0);
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
tsem_wait(&pSql->rspSem);
......@@ -655,16 +657,13 @@ int* taos_fetch_lengths(TAOS_RES *res) {
char *taos_get_client_info() { return version; }
void taos_stop_query(TAOS_RES *res) {
if (res == NULL) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) {
return;
}
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
if (pSql->signature != pSql) return;
tscDebug("%p start to cancel query", res);
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
......@@ -674,9 +673,8 @@ void taos_stop_query(TAOS_RES *res) {
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
rpcCancelRequest(pSql->pRpcCtx);
}
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscQueueAsyncRes(pSql);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscDebug("%p query is cancelled", res);
}
......
......@@ -1338,10 +1338,6 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState
SRetrieveSupport* pSupport = pSub->param;
taosTFree(pSupport->localBuffer);
pthread_mutex_unlock(&pSupport->queryMutex);
pthread_mutex_destroy(&pSupport->queryMutex);
taosTFree(pSupport);
tscFreeSqlObj(pSub);
......@@ -1414,13 +1410,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
pthread_mutexattr_t mutexattr;
memset(&mutexattr, 0, sizeof(pthread_mutexattr_t));
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&trs->queryMutex, &mutexattr);
pthread_mutexattr_destroy(&mutexattr);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
......@@ -1461,6 +1450,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
tscProcessSql(pSub);
}
// set the command flag must be after the semaphore been correctly set.
pSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
}
return TSDB_CODE_SUCCESS;
}
......@@ -1469,12 +1464,8 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
tscDebug("%p start to free subquery result", pSql);
taos_free_result(pSql);
taosTFree(trsupport->localBuffer);
pthread_mutex_unlock(&trsupport->queryMutex);
pthread_mutex_destroy(&trsupport->queryMutex);
taosTFree(trsupport);
}
......@@ -1498,8 +1489,6 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
pParentSql->res.code = code;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
pthread_mutex_unlock(&trsupport->queryMutex);
tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
}
......@@ -1518,8 +1507,6 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
// clear local saved number of results
trsupport->localBuffer->num = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
tstrerror(code), subqueryIndex, trsupport->numOfRetry);
......@@ -1602,15 +1589,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscFreeSubSqlObj(trsupport, pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} else { // regular super table query
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pParentSql);
}
}
tsem_post(&pParentSql->subReadySem);
}
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
......@@ -1682,14 +1661,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// only free once
taosTFree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set.
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
tscQueueAsyncRes(pParentSql);
}
// all subqueries are completed, retrieve from local can be proceeded.
tsem_post(&pParentSql->subReadySem);
}
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
......@@ -1707,9 +1681,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
......@@ -1783,7 +1754,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
return;
} else { // continue fetch data from dnode
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
......@@ -2083,12 +2053,6 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
tsem_post(&pSql->rspSem);
return;
}
// if (pSql->res.code == TSDB_CODE_SUCCESS) {
// (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
// } else {
// tscQueueAsyncRes(pSql);
// }
}
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
......
......@@ -388,6 +388,8 @@ void tscFreeSqlObj(SSqlObj* pSql) {
taosTFree(pSql->sqlstr);
tsem_destroy(&pSql->rspSem);
tsem_destroy(&pSql->subReadySem);
free(pSql);
}
......
......@@ -25,6 +25,8 @@
#include "taosdef.h"
#include "taoserror.h"
#include "tglobal.h"
#include "tsclient.h"
#include <regex.h>
/**************** Global variables ****************/
......@@ -64,11 +66,6 @@ TAOS *shellInit(SShellArguments *args) {
}
taos_init();
/*
* set tsTableMetaKeepTimer = 3000ms
* means not save cache in shell
*/
tsTableMetaKeepTimer = 3000;
// Connect to the database.
TAOS *con = NULL;
......@@ -303,8 +300,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
return;
}
int num_fields = taos_field_count(pSql);
if (num_fields != 0) { // select and show kinds of commands
if (!tscIsUpdateQuery(pSql)) { // select and show kinds of commands
int error_no = 0;
int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
if (numOfRows < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册