未验证 提交 2adadccf 编写于 作者: G Ganlin Zhao 提交者: GitHub

Merge pull request #20518 from taosdata/fix/TS-2865-2.6.0.35

fix(query): add stmt retry mechanism in case of schema change
......@@ -34,20 +34,22 @@ def sync_source() {
} else if (env.CHANGE_TARGET == '2.4') {
sh '''
cd ${WKC}
git clean -fxd
git clean -f
git checkout -f 2.4
'''
} else if (env.CHANGE_TARGET == '2.6') {
sh '''
cd ${WKC}
git clean -fxd
git clean -f
git checkout -f 2.6
'''
} else {
sh '''
cd ${WKC}
git clean -fxd
git checkout -f develop
git clean -f
git checkout -f 2.6
git remote prune origin
git fetch
'''
}
}
......@@ -84,7 +86,9 @@ def sync_source() {
} else {
sh '''
cd ${WK}
git checkout develop
git checkout -f 2.6
git remote prune origin
git fetch
'''
}
}
......@@ -230,22 +234,27 @@ def pre_test_win(){
} else {
bat '''
cd %WIN_INTERNAL_ROOT%
git checkout -f develop
git checkout -f 2.6
git remote prune origin
git fetch
'''
bat '''
cd %WIN_COMMUNITY_ROOT%
git checkout -f develop
git checkout -f 2.6
git remote prune origin
git fetch
'''
}
}
bat '''
cd %WIN_INTERNAL_ROOT%
git pull origin ''' + env.CHANGE_TARGET + '''
git remote prune origin
git pull
'''
bat '''
cd %WIN_COMMUNITY_ROOT%
git remote prune origin
git pull origin ''' + env.CHANGE_TARGET + '''
git remote prune origin
git pull
'''
bat '''
cd %WIN_INTERNAL_ROOT%
......
......@@ -151,6 +151,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam);
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......
......@@ -1162,20 +1162,11 @@ static int insertStmtReset(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
static int insertStmtExecute(STscStmt* stmt) {
static int insertStmtExecuteImpl(STscStmt* stmt, STableMetaInfo* pTableMetaInfo, bool schemaAttached) {
SSqlCmd* pCmd = &stmt->pSql->cmd;
if (pCmd->batchSize == 0) {
tscError("no records bind");
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind");
}
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) {
return TSDB_CODE_SUCCESS;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
stmt->pSql->cmd.insertParam.schemaAttached = schemaAttached ? 1 : 0;
if (pCmd->insertParam.pTableBlockHashList == NULL) {
pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
......@@ -1192,6 +1183,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pBlk->dataLen = 0;
pBlk->uid = pTableMeta->id.uid;
pBlk->tid = pTableMeta->id.tid;
pBlk->sversion = pTableMeta->sversion;
fillTablesColumnsNull(stmt->pSql);
......@@ -1213,9 +1205,55 @@ static int insertStmtExecute(STscStmt* stmt) {
tscBuildAndSendRequest(pSql, NULL);
return TSDB_CODE_SUCCESS;
}
static int insertStmtExecute(STscStmt* stmt) {
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &stmt->pSql->cmd;
SSqlObj* pSql = stmt->pSql;
if (pCmd->batchSize == 0) {
tscError("no records bind");
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind");
}
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) {
return code;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
code = insertStmtExecuteImpl(stmt, pTableMetaInfo, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
while (pSql->retry < pSql->maxRetry) {
if (pSql->res.code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->retry += 1;
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
code = insertStmtExecuteImpl(stmt, pTableMetaInfo, true);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
} else {
break;
}
}
}
stmt->numOfRows += pSql->res.numOfRows;
// data block reset
......@@ -1265,13 +1303,13 @@ static void insertBatchClean(STscStmt* pStmt) {
static int insertBatchStmtExecute(STscStmt* pStmt) {
int32_t code = 0;
if(pStmt->mtb.nameSet == false) {
tscError("0x%"PRIx64" no table name set", pStmt->pSql->self);
return invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "no table name set");
}
pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry
pStmt->pSql->retry = 0; // enable retry in case of reconfiguring table meta
if (taosHashGetSize(pStmt->pSql->cmd.insertParam.pTableBlockHashList) <= 0) { // merge according to vgId
tscError("0x%"PRIx64" no data block to insert", pStmt->pSql->self);
......
......@@ -431,7 +431,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pSql->self, taosMsg[pSql->cmd.msgType]);
......
......@@ -3500,7 +3500,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
SSqlObj* pSql = (SSqlObj*) tres;
assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code;
// set the flag in the parent sqlObj
......@@ -3508,9 +3508,9 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->cmd.insertParam.schemaAttached = 1;
}
}
if (!subAndCheckDone(tres, pParentObj, pSupporter->idx)) {
// concurrency problem, other thread already release pParentObj
// concurrency problem, other thread already release pParentObj
//tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, suppIdx, pParentObj->subState.numOfSub);
return;
}
......@@ -3567,6 +3567,15 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
}
pParentObj->res.code = TSDB_CODE_SUCCESS;
if (TSDB_QUERY_HAS_TYPE(pParentObj->cmd.insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
tscDebug("0x%"PRIx64" re-try stmt with same submit data, retry:%d", pParentObj->self, pParentObj->retry);
pParentObj->retry++;
tscRestoreTableDataBlocks(&pParentObj->cmd.insertParam);
tscMergeTableDataBlocks(pParentObj, &pParentObj->cmd.insertParam, false);
tscHandleMultivnodeInsert(pParentObj);
return;
}
tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
......
......@@ -2211,16 +2211,31 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam) {
STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
while (iter) {
STableDataBlocks* pOneTableBlock = *iter;
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows);
iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter);
}
return TSDB_CODE_SUCCESS;
}
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType);
size_t initialSize = taosHashGetSize(pInsertParam->pTableBlockHashList);
initialSize = initialSize > 128 ? 128 : initialSize;
void* pVnodeDataBlockHashList = taosHashInit(initialSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
// alloc table name list.
size_t numOfTables = taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList) {
......@@ -2228,7 +2243,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
}
pInsertParam->pTableNameList = calloc(numOfTables, sizeof(SName*));
pInsertParam->numOfTables = (int32_t) numOfTables;
size_t tail = 0;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
......@@ -2236,10 +2251,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
STableDataBlocks* pOneTableBlock = *iter;
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter);
// extract table name list.
pInsertParam->pTableNameList[tail++] = tNameDup(&pOneTableBlock->tableName);
if (pBlocks->numOfRows > 0) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
......@@ -2320,17 +2335,15 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
// the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1;
pBlocks->numOfRows = 0;
} else {
tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname);
}
if (freeBlockMap) {
tscDestroyDataBlock(pSql, pOneTableBlock, false);
}
}
if (freeBlockMap) {
taosHashCleanup(pInsertParam->pTableBlockHashList);
pInsertParam->pTableBlockHashList = NULL;
......@@ -2355,7 +2368,7 @@ void tscCloseTscObj(void *param) {
tscReleaseRpc(pObj->pRpcObj);
pthread_mutex_destroy(&pObj->mutex);
tscReleaseClusterInfo(pObj->clusterId);
destroyDispatcherManager(pObj->dispatcherManager);
pObj->dispatcherManager = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册