diff --git a/documentation20/webdocs/markdowndocs/architecture-ch.md b/documentation20/webdocs/markdowndocs/architecture-ch.md index 7ab4b5d096b4676b205fa57175889f780df2cb45..a279875649503c64861f7b42b64741967f75aa69 100644 --- a/documentation20/webdocs/markdowndocs/architecture-ch.md +++ b/documentation20/webdocs/markdowndocs/architecture-ch.md @@ -162,7 +162,7 @@ Master Vnode遵循下面的写入流程:
图 3 TDengine Master写入流程
1. Master vnode收到应用的数据插入请求,验证OK,进入下一步; -2. 如果系统配置参数walLevel打开(设置为2),vnode将把该请求的原始数据包写入数据库日志文件WAL,以保证TDengine能够在断电等因素导致的服务重启时从数据库日志文件中恢复数据,避免数据的丢失; +2. 如果系统配置参数walLevel大于0,vnode将把该请求的原始数据包写入数据库日志文件WAL。如果walLevel设置为2,而且fsync设置为0,TDengine还将WAL数据立即落盘,以保证即使宕机,也能从数据库日志文件中恢复数据,避免数据的丢失; 3. 如果有多个副本,vnode将把数据包转发给同一虚拟节点组内slave vnodes, 该转发包带有数据的版本号(version); 4. 写入内存,并加记录加入到skip list; 5. Master vnode返回确认信息给应用,表示写入成功。 @@ -174,7 +174,7 @@ Master Vnode遵循下面的写入流程:
图 4 TDengine Slave写入流程
1. Slave vnode收到Master vnode转发了的数据插入请求。 -2. 如果系统配置参数walLevl设置为2,vnode将把该请求的原始数据包写入日志(WAL); +2. 如果系统配置参数walLevel大于0,vnode将把该请求的原始数据包写入数据库日志文件WAL。如果walLevel设置为2,而且fsync设置为0,TDengine还将WAL数据立即落盘,以保证即使宕机,也能从数据库日志文件中恢复数据,避免数据的丢失; 3. 写入内存,更新内存中的skip list。 与Master vnode相比,slave vnode不存在转发环节,也不存在回复确认环节,少了两步。但写内存与WAL是完全一样的。 diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 5ed80f41a8aa83b9b4ec0b151488c757fd51cbab..78544b9b99143fa988304fa04bc104786e6866bf 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -399,7 +399,7 @@ int tsParseSql(SSqlObj *pSql, bool initial); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); int tscProcessSql(SSqlObj *pSql); -int tscRenewTableMeta(SSqlObj *pSql, char *tableId); +int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex); void tscQueueAsyncRes(SSqlObj *pSql); void tscQueueAsyncError(void(*fp), void *param, int32_t code); @@ -414,7 +414,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscDestroyResPointerInfo(SSqlRes *pRes); -void tscResetSqlCmdObj(SSqlCmd *pCmd); +void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache); /** * free query result of the sql object diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 41aa1221601f8b1b2ef65b09539025d417812adb..650f101645e68f3b84ccbd25c82bf6696c041808 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -468,7 +468,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { tscDebug("%p redo parse sql string and proceed", pSql); pCmd->parseFinished = false; - tscResetSqlCmdObj(pCmd); + tscResetSqlCmdObj(pCmd, false); code = tsParseSql(pSql, true); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f214e91f457f541f8517fef3084611e092155cc9..7f8fd7f4feaad472db5e06744c7cea742e22c9c7 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1327,18 +1327,40 @@ int tsParseSql(SSqlObj *pSql, bool initial) { pSql->fetchFp = pSql->fp; pSql->fp = (void(*)())tscHandleMultivnodeInsert; } - + if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) { return ret; } - + + // make a backup as tsParseInsertSql may modify the string + char* sqlstr = strdup(pSql->sqlstr); ret = tsParseInsertSql(pSql); + if (sqlstr == NULL || pSql->retry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) { + free(sqlstr); + } else { + tscResetSqlCmdObj(pCmd, true); + free(pSql->sqlstr); + pSql->sqlstr = sqlstr; + pSql->retry++; + if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) { + ret = tsParseInsertSql(pSql); + } + } } else { SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr); ret = tscToSQLCmd(pSql, &SQLInfo); + if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->retry == 0 && SQLInfo.type == TSDB_SQL_NULL) { + tscResetSqlCmdObj(pCmd, true); + pSql->retry++; + ret = tscToSQLCmd(pSql, &SQLInfo); + } SQLInfoDestroy(&SQLInfo); } + if (ret == TSDB_CODE_SUCCESS) { + pSql->retry = 0; + } + /* * the pRes->code may be modified or released by another thread in tscTableMetaCallBack function, * so do NOT use pRes->code to determine if the getTableMeta function diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1f042b59d6ae09edf8eddd0a450fe1becd6be033..16e3458e133980bf5e85ea85c25da893c2b55929 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -276,8 +276,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } } - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - int32_t cmd = pCmd->command; if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || @@ -302,7 +300,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosMsleep(duration); } - rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); + rpcMsg->code = tscRenewTableMeta(pSql, 0); // if there is an error occurring, proceed to the following error handling procedure. if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { @@ -2202,14 +2200,14 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create /** * retrieve table meta from mnode, and update the local table meta cache. * @param pSql sql object - * @param tableId table full name + * @param tableIndex table index * @return status code */ -int tscRenewTableMeta(SSqlObj *pSql, char *tableId) { +int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { SSqlCmd *pCmd = &pSql->cmd; SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; if (pTableMetaInfo->pTableMeta) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 1af53d3645cc9aba13d078f8c9796ff208ace4e8..9fa4db999f1257e09a40bfd9720f77e9da94ec3a 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -820,7 +820,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) { // must before clean the sqlcmd object - tscResetSqlCmdObj(&pSql->cmd); + tscResetSqlCmdObj(&pSql->cmd, false); SSqlCmd *pCmd = &pSql->cmd; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e1a0ff69f98b27661de8420588e33d4809eb3817..b45d40f49cdb30a7ff88f488e9a9b7354ce89b81 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -33,7 +33,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache); - SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) { +SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) { if (pTagCond->pCond == NULL) { return NULL; } @@ -294,7 +294,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free } -static void tscFreeQueryInfo(SSqlCmd* pCmd) { +static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) { if (pCmd == NULL || pCmd->numOfClause == 0) { return; } @@ -304,7 +304,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); freeQueryInfoImpl(pQueryInfo); - clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false); + clearAllTableMetaInfo(pQueryInfo, (const char*)addr, removeFromCache); taosTFree(pQueryInfo); } @@ -312,7 +312,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) { taosTFree(pCmd->pQueryInfo); } -void tscResetSqlCmdObj(SSqlCmd* pCmd) { +void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) { pCmd->command = 0; pCmd->numOfCols = 0; pCmd->count = 0; @@ -326,7 +326,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - tscFreeQueryInfo(pCmd); + tscFreeQueryInfo(pCmd, removeFromCache); } void tscFreeSqlResult(SSqlObj* pSql) { @@ -364,7 +364,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { taosTFree(pSql->pSubs); pSql->numOfSubs = 0; - tscResetSqlCmdObj(pCmd); + tscResetSqlCmdObj(pCmd, false); } void tscFreeSqlObj(SSqlObj* pSql) { diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index 3e9b077d3011d8bfc918f3ff976a98e943998c55..1ce5861e5219b77d6e580e6c81e86ad45a29307f 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -154,9 +154,14 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { // todo refactor to more generic int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { - int32_t v = *(int32_t *)value; - int32_t index = -1; + int32_t v = 0; + switch(pBucket->type) { + case TSDB_DATA_TYPE_SMALLINT: v = *(int16_t*) value; break; + case TSDB_DATA_TYPE_TINYINT: v = *(int8_t*) value; break; + default: v = *(int32_t*) value;break; + } + int32_t index = -1; if (pBucket->range.iMaxVal == INT32_MIN) { /* * taking negative integer into consideration, diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 7588e03e17cd04afa615adc7274dd363421f8112..c0a8fd1f00ade2642b1f23f6f1ef3cc60dfe615b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -693,7 +693,7 @@ class DbConnRest(DbConn): def __init__(self): super().__init__() self._type = self.TYPE_REST - self._url = "http://localhost:6020/rest/sql" # fixed for now + self._url = "http://localhost:6041/rest/sql" # fixed for now self._result = None def openByType(self): # Open connection @@ -1306,6 +1306,7 @@ class DbManager(): "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") sys.exit(2) else: + print("Failed to connect to DB, errno = {}, msg: {}".format(Helper.convertErrno(err.errno), err.msg)) raise except BaseException: print("[=] Unexpected exception") @@ -1910,10 +1911,19 @@ class TaskReadData(StateTransitionTask): # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable 'sum(speed)', 'stddev(speed)', + # SELECTOR functions 'min(speed)', 'max(speed)', 'first(speed)', - 'last(speed)']) # TODO: add more from 'top' + 'last(speed)', + # 'top(speed)', # TODO: not supported? + # 'bottom(speed)', # TODO: not supported? + # 'percentile(speed, 10)', # TODO: TD-1316 + 'last_row(speed)', + # Transformation Functions + # 'diff(speed)', # TODO: no supported?! + 'spread(speed)' + ]) # TODO: add more from 'top' filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions None ]) @@ -2350,7 +2360,7 @@ class ServiceManagerThread: self._thread2.start() # wait for service to start - for i in range(0, 10): + for i in range(0, 100): time.sleep(1.0) # self.procIpcBatch() # don't pump message during start up print("_zz_", end="", flush=True) @@ -2358,7 +2368,7 @@ class ServiceManagerThread: logger.info("[] TDengine service READY to process requests") return # now we've started # TODO: handle this better? - self.procIpcBatch(20, True) # display output before cronking out, trim to last 20 msgs, force output + self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output raise RuntimeError("TDengine service did not start successfully") def stop(self): @@ -2768,7 +2778,7 @@ class MainExec: try: ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside except requests.exceptions.ConnectionError as err: - logger.warning("Failed to open REST connection to DB") + logger.warning("Failed to open REST connection to DB: {}".format(err.getMessage())) # don't raise return ret