未验证 提交 43a3ee06 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 5300 taosdemo stmt print for master (#6919)

* [TD-5300]<fix>: taosdemo stmt debug print.

* fix default iface is unknown.

* merge with TD-5300 fix against develop
上级 59f9aa8c
...@@ -5228,63 +5228,69 @@ static int32_t generateStbDataTail( ...@@ -5228,63 +5228,69 @@ static int32_t generateStbDataTail(
int64_t remainderBufLen, int64_t insertRows, int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime, uint64_t recordFrom, int64_t startTime,
int64_t *pSamplePos, int64_t *dataLen) { int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0; uint64_t len = 0;
char *pstr = buffer; char *pstr = buffer;
bool tsRand; bool tsRand;
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true; tsRand = true;
} else { } else {
tsRand = false; tsRand = false;
} }
verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n", verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n",
__func__, __LINE__, batch, remainderBufLen); __func__, __LINE__, batch, remainderBufLen);
int32_t k; int32_t k;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE); memset(data, 0, MAX_DATA_SIZE);
int64_t lenOfRow = 0; int64_t lenOfRow = 0;
if (tsRand) { if (tsRand) {
lenOfRow = generateStbRowData(superTblInfo, data, if (superTblInfo->disorderRatio > 0) {
startTime + getTSRandTail( lenOfRow = generateStbRowData(superTblInfo, data,
superTblInfo->timeStampStep, k, startTime + getTSRandTail(
superTblInfo->disorderRatio, superTblInfo->timeStampStep, k,
superTblInfo->disorderRange) superTblInfo->disorderRatio,
); superTblInfo->disorderRange)
} else { );
lenOfRow = getRowDataFromSample( } else {
data, lenOfRow = generateStbRowData(superTblInfo, data,
(remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE, startTime + superTblInfo->timeStampStep * k
startTime + superTblInfo->timeStampStep * k, );
superTblInfo, }
pSamplePos); } else {
} lenOfRow = getRowDataFromSample(
data,
(remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE,
startTime + superTblInfo->timeStampStep * k,
superTblInfo,
pSamplePos);
}
if ((lenOfRow + 1) > remainderBufLen) { if ((lenOfRow + 1) > remainderBufLen) {
break; break;
} }
pstr += snprintf(pstr , lenOfRow + 1, "%s", data); pstr += snprintf(pstr , lenOfRow + 1, "%s", data);
k++; k++;
len += lenOfRow; len += lenOfRow;
remainderBufLen -= lenOfRow; remainderBufLen -= lenOfRow;
verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n", verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer); __func__, __LINE__, len, k, buffer);
recordFrom ++; recordFrom ++;
if (recordFrom >= insertRows) { if (recordFrom >= insertRows) {
break; break;
}
} }
}
*dataLen = len; *dataLen = len;
return k; return k;
} }
...@@ -5381,52 +5387,52 @@ static int32_t generateStbInterlaceData( ...@@ -5381,52 +5387,52 @@ static int32_t generateStbInterlaceData(
int64_t startTime, int64_t startTime,
uint64_t *pRemainderBufLen) uint64_t *pRemainderBufLen)
{ {
assert(buffer); assert(buffer);
char *pstr = buffer; char *pstr = buffer;
int headLen = generateStbSQLHead( int headLen = generateStbSQLHead(
superTblInfo, superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, tableName, tableSeq, pThreadInfo->db_name,
pstr, *pRemainderBufLen); pstr, *pRemainderBufLen);
if (headLen <= 0) { if (headLen <= 0) {
return 0; return 0;
} }
// generate data buffer // generate data buffer
verbosePrint("[%d] %s() LN%d i=%"PRIu64" buffer:\n%s\n", verbosePrint("[%d] %s() LN%d i=%"PRIu64" buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer); pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen; pstr += headLen;
*pRemainderBufLen -= headLen; *pRemainderBufLen -= headLen;
int64_t dataLen = 0; int64_t dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n", verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl); i, batchPerTblTimes, batchPerTbl);
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision); startTime = taosGetTimestamp(pThreadInfo->time_precision);
} }
int32_t k = generateStbDataTail( int32_t k = generateStbDataTail(
superTblInfo, superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos), &dataLen); &(pThreadInfo->samplePos), &dataLen);
if (k == batchPerTbl) { if (k == batchPerTbl) {
pstr += dataLen; pstr += dataLen;
*pRemainderBufLen -= dataLen; *pRemainderBufLen -= dataLen;
} else { } else {
debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n", debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n",
__func__, __LINE__, k, batchPerTbl); __func__, __LINE__, k, batchPerTbl);
pstr -= headLen; pstr -= headLen;
pstr[0] = '\0'; pstr[0] = '\0';
k = 0; k = 0;
} }
return k; return k;
} }
static int64_t generateInterlaceDataWithoutStb( static int64_t generateInterlaceDataWithoutStb(
...@@ -5874,7 +5880,7 @@ static int32_t prepareStbStmtInterlace( ...@@ -5874,7 +5880,7 @@ static int32_t prepareStbStmtInterlace(
stbInfo, stbInfo,
stmt, stmt,
tableName, tableName,
g_args.num_of_RPR, batch,
insertRows, 0, startTime, insertRows, 0, startTime,
pSamplePos); pSamplePos);
} }
...@@ -5892,7 +5898,7 @@ static int32_t prepareStbStmtProgressive( ...@@ -5892,7 +5898,7 @@ static int32_t prepareStbStmtProgressive(
stbInfo, stbInfo,
stmt, stmt,
tableName, tableName,
g_args.num_of_RPR, batch,
insertRows, recordFrom, startTime, insertRows, recordFrom, startTime,
pSamplePos); pSamplePos);
} }
...@@ -5908,29 +5914,29 @@ static int32_t generateStbProgressiveData( ...@@ -5908,29 +5914,29 @@ static int32_t generateStbProgressiveData(
uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos, uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen) int64_t *pRemainderBufLen)
{ {
assert(buffer != NULL); assert(buffer != NULL);
char *pstr = buffer; char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen); memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateStbSQLHead( int64_t headLen = generateStbSQLHead(
superTblInfo, superTblInfo,
tableName, tableSeq, dbName, tableName, tableSeq, dbName,
buffer, *pRemainderBufLen); buffer, *pRemainderBufLen);
if (headLen <= 0) { if (headLen <= 0) {
return 0; return 0;
} }
pstr += headLen; pstr += headLen;
*pRemainderBufLen -= headLen; *pRemainderBufLen -= headLen;
int64_t dataLen; int64_t dataLen;
return generateStbDataTail(superTblInfo, return generateStbDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen, g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, recordFrom, insertRows, recordFrom,
startTime, startTime,
pSamplePos, &dataLen); pSamplePos, &dataLen);
} }
static int32_t generateProgressiveDataWithoutStb( static int32_t generateProgressiveDataWithoutStb(
...@@ -5977,283 +5983,283 @@ static void printStatPerThread(threadInfo *pThreadInfo) ...@@ -5977,283 +5983,283 @@ static void printStatPerThread(threadInfo *pThreadInfo)
// sync write interlace data // sync write interlace data
static void* syncWriteInterlace(threadInfo *pThreadInfo) { static void* syncWriteInterlace(threadInfo *pThreadInfo) {
debugPrint("[%d] %s() LN%d: ### interlace write\n", debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows; int64_t insertRows;
uint32_t interlaceRows; uint32_t interlaceRows;
uint64_t maxSqlLen; uint64_t maxSqlLen;
int64_t nTimeStampStep; int64_t nTimeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
if (superTblInfo) { if (superTblInfo) {
insertRows = superTblInfo->insertRows; insertRows = superTblInfo->insertRows;
if ((superTblInfo->interlaceRows == 0) if ((superTblInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) { && (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
} else {
interlaceRows = superTblInfo->interlaceRows;
}
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else { } else {
interlaceRows = superTblInfo->interlaceRows; insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = DEFAULT_TIMESTAMP_STEP;
insert_interval = g_args.insert_interval;
} }
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else {
insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = DEFAULT_TIMESTAMP_STEP;
insert_interval = g_args.insert_interval;
}
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from, pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows); pThreadInfo->ntables, insertRows);
if (interlaceRows > insertRows) if (interlaceRows > insertRows)
interlaceRows = insertRows; interlaceRows = insertRows;
if (interlaceRows > g_args.num_of_RPR) if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR; interlaceRows = g_args.num_of_RPR;
uint32_t batchPerTbl = interlaceRows; uint32_t batchPerTbl = interlaceRows;
uint32_t batchPerTblTimes; uint32_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = batchPerTblTimes =
g_args.num_of_RPR / interlaceRows; g_args.num_of_RPR / interlaceRows;
} else { } else {
batchPerTblTimes = 1; batchPerTblTimes = 1;
} }
pThreadInfo->buffer = calloc(maxSqlLen, 1); pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__, maxSqlLen, strerror(errno)); __func__, __LINE__, maxSqlLen, strerror(errno));
return NULL; return NULL;
} }
pThreadInfo->totalInsertRows = 0; pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0; pThreadInfo->totalAffectedRows = 0;
uint64_t st = 0; uint64_t st = 0;
uint64_t et = UINT64_MAX; uint64_t et = UINT64_MAX;
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
uint64_t endTs; uint64_t endTs;
uint64_t tableSeq = pThreadInfo->start_table_from; uint64_t tableSeq = pThreadInfo->start_table_from;
int64_t startTime = pThreadInfo->start_time; int64_t startTime = pThreadInfo->start_time;
uint64_t generatedRecPerTbl = 0; uint64_t generatedRecPerTbl = 0;
bool flagSleep = true; bool flagSleep = true;
uint64_t sleepTimeTotal = 0; uint64_t sleepTimeTotal = 0;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
flagSleep = false; flagSleep = false;
} }
// generate data // generate data
memset(pThreadInfo->buffer, 0, maxSqlLen); memset(pThreadInfo->buffer, 0, maxSqlLen);
uint64_t remainderBufLen = maxSqlLen; uint64_t remainderBufLen = maxSqlLen;
char *pstr = pThreadInfo->buffer; char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
uint32_t recOfBatch = 0; uint32_t recOfBatch = 0;
for (uint64_t i = 0; i < batchPerTblTimes; i ++) { for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer); free(pThreadInfo->buffer);
return NULL; return NULL;
} }
uint64_t oldRemainderLen = remainderBufLen; uint64_t oldRemainderLen = remainderBufLen;
int32_t generated; int32_t generated;
if (superTblInfo) { if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtInterlace( generated = prepareStbStmtInterlace(
superTblInfo, superTblInfo,
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
startTime, startTime,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
#else #else
generated = -1; generated = -1;
#endif #endif
} else { } else {
generated = generateStbInterlaceData( generated = generateStbInterlaceData(
superTblInfo, superTblInfo,
tableName, batchPerTbl, i, tableName, batchPerTbl, i,
batchPerTblTimes, batchPerTblTimes,
tableSeq, tableSeq,
pThreadInfo, pstr, pThreadInfo, pstr,
insertRows, insertRows,
startTime, startTime,
&remainderBufLen); &remainderBufLen);
} }
} else { } else {
if (g_args.iface == STMT_IFACE) { if (g_args.iface == STMT_IFACE) {
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
pThreadInfo->threadID, pThreadInfo->threadID,
__func__, __LINE__, __func__, __LINE__,
tableName, batchPerTbl, startTime); tableName, batchPerTbl, startTime);
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb( generated = prepareStmtWithoutStb(
pThreadInfo->stmt, tableName, pThreadInfo->stmt, tableName,
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
startTime); startTime);
#else #else
generated = -1; generated = -1;
#endif #endif
} else { } else {
generated = generateInterlaceDataWithoutStb( generated = generateInterlaceDataWithoutStb(
tableName, batchPerTbl, tableName, batchPerTbl,
tableSeq, tableSeq,
pThreadInfo->db_name, pstr, pThreadInfo->db_name, pstr,
insertRows, insertRows,
startTime, startTime,
&remainderBufLen); &remainderBufLen);
} }
} }
debugPrint("[%d] %s() LN%d, generated records is %d\n", debugPrint("[%d] %s() LN%d, generated records is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
if (generated < 0) { if (generated < 0) {
errorPrint("[%d] %s() LN%d, generated records is %d\n", errorPrint("[%d] %s() LN%d, generated records is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_of_interlace; goto free_of_interlace;
} else if (generated == 0) { } else if (generated == 0) {
break; break;
} }
tableSeq ++; tableSeq ++;
recOfBatch += batchPerTbl; recOfBatch += batchPerTbl;
pstr += (oldRemainderLen - remainderBufLen); pstr += (oldRemainderLen - remainderBufLen);
pThreadInfo->totalInsertRows += batchPerTbl; pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch); batchPerTbl, recOfBatch);
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table // turn to first table
tableSeq = pThreadInfo->start_table_from; tableSeq = pThreadInfo->start_table_from;
generatedRecPerTbl += batchPerTbl; generatedRecPerTbl += batchPerTbl;
startTime = pThreadInfo->start_time startTime = pThreadInfo->start_time
+ generatedRecPerTbl * nTimeStampStep; + generatedRecPerTbl * nTimeStampStep;
flagSleep = true; flagSleep = true;
if (generatedRecPerTbl >= insertRows) if (generatedRecPerTbl >= insertRows)
break; break;
int64_t remainRows = insertRows - generatedRecPerTbl; int64_t remainRows = insertRows - generatedRecPerTbl;
if ((remainRows > 0) && (batchPerTbl > remainRows)) if ((remainRows > 0) && (batchPerTbl > remainRows))
batchPerTbl = remainRows; batchPerTbl = remainRows;
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR) if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
break; break;
} }
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
generatedRecPerTbl, insertRows); generatedRecPerTbl, insertRows);
if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl) if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl)
break; break;
} }
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows); pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n", verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
startTs = taosGetTimestampMs(); startTs = taosGetTimestampMs();
if (recOfBatch == 0) { if (recOfBatch == 0) {
errorPrint("[%d] %s() LN%d Failed to insert records of batch %d\n", errorPrint("[%d] %s() LN%d Failed to insert records of batch %d\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl); batchPerTbl);
if (batchPerTbl > 0) { if (batchPerTbl > 0) {
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n", errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
batchPerTbl, maxSqlLen / batchPerTbl); batchPerTbl, maxSqlLen / batchPerTbl);
}
errorPrint("\tPlease check if the buffer length(%"PRId64") or batch(%d) is set with proper value!\n",
maxSqlLen, batchPerTbl);
goto free_of_interlace;
} }
errorPrint("\tPlease check if the buffer length(%"PRId64") or batch(%d) is set with proper value!\n", int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
maxSqlLen, batchPerTbl);
goto free_of_interlace;
}
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRIu64"ms\n", performancePrint("%s() LN%d, insert execution time is %"PRIu64"ms\n",
__func__, __LINE__, delay); __func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID, pThreadInfo->threadID,
__func__, __LINE__, affectedRows); __func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++; pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay; pThreadInfo->totalDelay += delay;
if (recOfBatch != affectedRows) { if (recOfBatch != affectedRows) {
errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n", errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, pThreadInfo->buffer); recOfBatch, affectedRows, pThreadInfo->buffer);
goto free_of_interlace; goto free_of_interlace;
} }
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n", printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
pThreadInfo->threadID, pThreadInfo->threadID,
pThreadInfo->totalInsertRows, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows); pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
if ((insert_interval) && flagSleep) { if ((insert_interval) && flagSleep) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
if (insert_interval > (et - st) ) { if (insert_interval > (et - st) ) {
uint64_t sleepTime = insert_interval - (et -st); uint64_t sleepTime = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n", performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
__func__, __LINE__, sleepTime); __func__, __LINE__, sleepTime);
taosMsleep(sleepTime); // ms taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval; sleepTimeTotal += insert_interval;
} }
}
} }
}
free_of_interlace: free_of_interlace:
tmfree(pThreadInfo->buffer); tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo); printStatPerThread(pThreadInfo);
return NULL; return NULL;
} }
// sync insertion progressive data // sync insertion progressive data
...@@ -6420,29 +6426,29 @@ free_of_progressive: ...@@ -6420,29 +6426,29 @@ free_of_progressive:
static void* syncWrite(void *sarg) { static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint32_t interlaceRows; uint32_t interlaceRows;
if (superTblInfo) { if (superTblInfo) {
if ((superTblInfo->interlaceRows == 0) if ((superTblInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) { && (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
} else {
interlaceRows = superTblInfo->interlaceRows;
}
} else { } else {
interlaceRows = superTblInfo->interlaceRows; interlaceRows = g_args.interlace_rows;
} }
} else {
interlaceRows = g_args.interlace_rows;
}
if (interlaceRows > 0) { if (interlaceRows > 0) {
// interlace mode // interlace mode
return syncWriteInterlace(pThreadInfo); return syncWriteInterlace(pThreadInfo);
} else { } else {
// progressive mode // progressive mode
return syncWriteProgressive(pThreadInfo); return syncWriteProgressive(pThreadInfo);
} }
} }
static void callBack(void *param, TAOS_RES *res, int code) { static void callBack(void *param, TAOS_RES *res, int code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册