diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index aa60803dac740157e1a9f5229e524bca8401c4cd..61378c79c4b5c44ffa11ae9132aa6f8b89ab5f71 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols int dcol = 0; while (dcol < pCols->numOfCols) { + bool setCol = 0; SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= schemaNCols(pSchema)) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); @@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols STColumn *pRowCol = schemaColAt(pSchema, rcol); if (pRowCol->colId == pDataCol->colId) { void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + if(!isNull(value, pDataCol->type)) setCol = 1; dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dcol++; rcol++; } else if (pRowCol->colId < pDataCol->colId) { rcol++; } else { - if(forceSetNull) { + if(forceSetNull || setCol) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } dcol++; @@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo int nRowCols = kvRowNCols(row); while (dcol < pCols->numOfCols) { + bool setCol = 0; SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); @@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo if (colIdx->colId == pDataCol->colId) { void *value = tdGetKvRowDataOfCol(row, colIdx->offset); + if(!isNull(value, pDataCol->type)) setCol = 1; dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); ++dcol; ++rcol; } else if (colIdx->colId < pDataCol->colId) { ++rcol; } else { - if (forceSetNull) { + if(forceSetNull || setCol) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } ++dcol; @@ -518,7 +522,6 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b } } -//TODO: refactor this function to eliminate additional memory copy int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); @@ -534,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { - if (source->cols[j].len > 0) { + if (source->cols[j].len > 0 || target->cols[j].len > 0) { dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, target->maxPoints); } @@ -578,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i if (key1 < key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); - if (src1->cols[i].len > 0) { + if (src1->cols[i].len > 0 || target->cols[i].len > 0) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, target->maxPoints); } @@ -596,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, target->maxPoints); + } else if(target->cols[i].len > 0) { + dataColSetNullAt(&target->cols[i], target->numOfRows); } } target->numOfRows++; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 15fc3cc47d663aa6f2fb8910ebbadc03861418c7..03110487807076bf8ac2ac7026ffdb828ea4c7c6 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1418,13 +1418,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - bool isRowDel = false; SMemRow row = tsdbNextIterRow(pCommitIter->pIter); if (row == NULL || memRowKey(row) > maxKey) { key2 = INT64_MAX; } else { key2 = memRowKey(row); - isRowDel = memRowDeleted(row); } if (key1 == INT64_MAX && key2 == INT64_MAX) break; @@ -1439,36 +1437,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt pTarget->numOfRows++; (*iter)++; } else if (key1 > key2) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); } + tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + tSkipListIterNext(pCommitIter->pIter); } else { - if (update) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); - } - } else { - ASSERT(!isRowDel); - + if (update != TD_ROW_OVERWRITE_UPDATE) { + //copy disk data for (int i = 0; i < pDataCols->numOfCols; i++) { //TODO: dataColAppendVal may fail dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, pTarget->maxPoints); } - pTarget->numOfRows++; + if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; + } + if (update != TD_ROW_DISCARD_UPDATE) { + //copy mem data + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); } (*iter)++; tSkipListIterNext(pCommitIter->pIter);