tscParseInsert.c 43.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

#pragma GCC diagnostic ignored "-Woverflow"
S
slguan 已提交
22
#pragma GCC diagnostic ignored "-Wunused-variable"
H
hzcheng 已提交
23

S
slguan 已提交
24
#include "os.h"
25
#include "ihash.h"
H
hzcheng 已提交
26 27 28 29 30 31 32 33 34 35 36
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsqldef.h"
#include "ttypes.h"

#include "tlog.h"
#include "tstoken.h"
#include "ttime.h"

S
slguan 已提交
37
enum {
S
slguan 已提交
38 39 40 41
  TSDB_USE_SERVER_TS = 0,
  TSDB_USE_CLI_TS = 1,
};

S
slguan 已提交
42
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize);
H
hzcheng 已提交
43

S
slguan 已提交
44 45 46 47
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
  int32_t numType = isValidNumber(pToken);
  if (TK_ILLEGAL == numType) {
    return numType;
H
hzcheng 已提交
48 49 50 51 52 53 54
  }

  int32_t radix = 10;
  if (numType == TK_HEX) {
    radix = 16;
  } else if (numType == TK_OCT) {
    radix = 8;
S
slguan 已提交
55 56
  } else if (numType == TK_BIN) {
    radix = 2;
H
hzcheng 已提交
57 58
  }

L
lihui 已提交
59
  errno = 0;
S
slguan 已提交
60 61 62
  *value = strtoll(pToken->z, endPtr, radix);

  return numType;
H
hzcheng 已提交
63 64
}

S
slguan 已提交
65 66 67 68 69
static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
  int32_t numType = isValidNumber(pToken);
  if (TK_ILLEGAL == numType) {
    return numType;
  }
L
lihui 已提交
70 71

  errno = 0;
S
slguan 已提交
72 73 74
  *value = strtod(pToken->z, endPtr);
  return numType;
}
H
hzcheng 已提交
75

S
slguan 已提交
76
int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
F
fang 已提交
77 78
  //char *    token; //fang not used
  //int       tokenlen; //fang not used
S
slguan 已提交
79 80 81 82 83
  int32_t   index = 0;
  SSQLToken sToken;
  int64_t   interval;
  int64_t   useconds = 0;
  char *    pTokenEnd = *next;
H
hzcheng 已提交
84

S
slguan 已提交
85
  index = 0;
H
hzcheng 已提交
86

S
slguan 已提交
87
  if (pToken->type == TK_NOW) {
H
hzcheng 已提交
88
    useconds = taosGetTimestamp(timePrec);
S
slguan 已提交
89
  } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
H
hzcheng 已提交
90
    // do nothing
S
slguan 已提交
91 92
  } else if (pToken->type == TK_INTEGER) {
    useconds = str2int64(pToken->z);
H
hzcheng 已提交
93 94
  } else {
    // strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
S
slguan 已提交
95
    if (taosParseTime(pToken->z, time, pToken->n, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
96
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
H
hzcheng 已提交
97 98 99 100 101
    }

    return TSDB_CODE_SUCCESS;
  }

S
slguan 已提交
102 103 104 105
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == ',') {
      *next = pTokenEnd;
H
hzcheng 已提交
106 107 108 109 110 111 112 113
      *time = useconds;
      return 0;
    }

    break;
  }

  /*
S
slguan 已提交
114 115 116
   * time expression:
   * e.g., now+12a, now-5h
   */
S
slguan 已提交
117 118 119 120
  SSQLToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
  pTokenEnd += index;
H
hjxilinx 已提交
121
  
S
slguan 已提交
122
  if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
H
hjxilinx 已提交
123
    
S
slguan 已提交
124 125 126
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
    pTokenEnd += index;
H
hjxilinx 已提交
127
    
S
slguan 已提交
128
    if (valueToken.n < 2) {
H
hjxilinx 已提交
129
      return tscInvalidSQLErrMsg(error, "value expected in timestamp", sToken.z);
H
hzcheng 已提交
130 131
    }

S
slguan 已提交
132
    if (getTimestampInUsFromStr(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
133 134
      return TSDB_CODE_INVALID_SQL;
    }
H
hjxilinx 已提交
135
    
H
hzcheng 已提交
136 137 138 139
    if (timePrec == TSDB_TIME_PRECISION_MILLI) {
      interval /= 1000;
    }

S
slguan 已提交
140
    if (sToken.type == TK_PLUS) {
H
hzcheng 已提交
141 142 143 144 145 146 147 148 149 150 151 152
      useconds += interval;
    } else {
      useconds = (useconds >= interval) ? useconds - interval : 0;
    }

    *next = pTokenEnd;
  }

  *time = useconds;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
153 154 155 156
int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
                             int16_t timePrec) {
  int64_t iv;
  int32_t numType;
S
slguan 已提交
157
  char *  endptr = NULL;
H
hjxilinx 已提交
158 159
  errno = 0;   // clear the previous existed error information
  
H
hzcheng 已提交
160 161
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
S
slguan 已提交
162 163
      if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
S
slguan 已提交
164
          *(uint8_t *)payload = TSDB_TRUE;
S
slguan 已提交
165
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
S
slguan 已提交
166
          *(uint8_t *)payload = TSDB_FALSE;
S
slguan 已提交
167 168
        } else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
H
hzcheng 已提交
169
        } else {
H
hjxilinx 已提交
170
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
171
        }
S
slguan 已提交
172 173 174 175 176 177 178 179 180
      } else if (pToken->type == TK_INTEGER) {
        iv = strtoll(pToken->z, NULL, 10);
        *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_FLOAT) {
        double dv = strtod(pToken->z, NULL);
        *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_NULL) {
        *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
      } else {
H
hjxilinx 已提交
181
        return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
182 183 184 185
      }
      break;
    }
    case TSDB_DATA_TYPE_TINYINT:
S
slguan 已提交
186 187 188 189 190
      if (pToken->type == TK_NULL) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
H
hzcheng 已提交
191
      } else {
S
slguan 已提交
192
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
193
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
194
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
L
lihui 已提交
195
        } else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
H
hjxilinx 已提交
196
          return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
H
hzcheng 已提交
197 198
        }

H
hjxilinx 已提交
199
        *((int8_t *)payload) = (int8_t) iv;
H
hzcheng 已提交
200 201 202 203 204
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
S
slguan 已提交
205 206 207 208 209
      if (pToken->type == TK_NULL) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
H
hzcheng 已提交
210
      } else {
S
slguan 已提交
211
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
212
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
213
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
L
lihui 已提交
214
        } else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
H
hjxilinx 已提交
215
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
H
hzcheng 已提交
216 217
        }

S
slguan 已提交
218
        *((int16_t *)payload) = (int16_t)iv;
H
hzcheng 已提交
219 220 221 222
      }
      break;

    case TSDB_DATA_TYPE_INT:
S
slguan 已提交
223 224 225 226
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
227
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
H
hzcheng 已提交
228
      } else {
S
slguan 已提交
229
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
230
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
231
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
L
lihui 已提交
232
        } else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
H
hjxilinx 已提交
233
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
H
hzcheng 已提交
234 235
        }

S
slguan 已提交
236
        *((int32_t *)payload) = (int32_t)iv;
H
hzcheng 已提交
237 238 239 240 241
      }

      break;

    case TSDB_DATA_TYPE_BIGINT:
S
slguan 已提交
242 243 244 245
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
246
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
247
      } else {
S
slguan 已提交
248
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
249
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
250
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
L
lihui 已提交
251
        } else if (errno == ERANGE || iv > INT64_MAX || iv <= INT64_MIN) {
H
hjxilinx 已提交
252
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
H
hzcheng 已提交
253
        }
S
slguan 已提交
254 255

        *((int64_t *)payload) = iv;
H
hzcheng 已提交
256 257 258 259
      }
      break;

    case TSDB_DATA_TYPE_FLOAT:
S
slguan 已提交
260 261 262 263
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
264
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
265
      } else {
S
slguan 已提交
266 267
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
268
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
S
slguan 已提交
269 270 271 272
        }

        float fv = (float)dv;
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
H
hjxilinx 已提交
273
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
H
hzcheng 已提交
274 275
        }

S
slguan 已提交
276 277
        if (isinf(fv) || isnan(fv)) {
          *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
278
        }
S
slguan 已提交
279 280

        *((float *)payload) = fv;
H
hzcheng 已提交
281 282 283 284
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
S
slguan 已提交
285 286 287 288
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
289
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
H
hzcheng 已提交
290
      } else {
S
slguan 已提交
291 292
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
293
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
H
hzcheng 已提交
294 295
        }

S
slguan 已提交
296
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
H
hjxilinx 已提交
297
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
S
slguan 已提交
298 299 300 301 302 303
        }

        if (isinf(dv) || isnan(dv)) {
          *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
        } else {
          *((double *)payload) = dv;
H
hzcheng 已提交
304 305 306 307 308
        }
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
S
slguan 已提交
309 310
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
311
        *payload = TSDB_DATA_BINARY_NULL;
H
hjxilinx 已提交
312
      } else { // too long values will return invalid sql, not be truncated automatically
S
slguan 已提交
313
        if (pToken->n > pSchema->bytes) {
H
hjxilinx 已提交
314
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
S
slguan 已提交
315
        }
H
hjxilinx 已提交
316
        
S
slguan 已提交
317
        strncpy(payload, pToken->z, pToken->n);
H
hzcheng 已提交
318 319 320 321 322
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
S
slguan 已提交
323
      if (pToken->type == TK_NULL) {
S
slguan 已提交
324
        *(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
H
hzcheng 已提交
325
      } else {
S
slguan 已提交
326 327
        // if the converted output len is over than pSchema->bytes, return error: 'Argument list too long'
        if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) {
H
hjxilinx 已提交
328 329 330 331
          char buf[512] = {0};
          snprintf(buf, 512, "%s", strerror(errno));
          
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
H
hzcheng 已提交
332 333 334 335 336
        }
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
S
slguan 已提交
337
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
338
        if (primaryKey) {
S
slguan 已提交
339
          *((int64_t *)payload) = 0;
H
hzcheng 已提交
340
        } else {
S
slguan 已提交
341
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
342 343
        }
      } else {
S
slguan 已提交
344 345
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
346
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
H
hzcheng 已提交
347
        }
H
hjxilinx 已提交
348
        
S
slguan 已提交
349
        *((int64_t *)payload) = temp;
H
hzcheng 已提交
350 351 352 353 354 355
      }

      break;
    }
  }

H
hjxilinx 已提交
356
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
357 358
}

S
slguan 已提交
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
/*
 * The server time/client time should not be mixed up in one sql string
 * Do not employ sort operation is not involved if server time is used.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  TSKEY k = *(TSKEY *)start;

  if (k == 0) {
    if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
      return -1;
    } else if (pDataBlocks->tsSource == -1) {
      pDataBlocks->tsSource = TSDB_USE_SERVER_TS;
    }
  } else {
    if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
H
hjxilinx 已提交
379 380
      return -1;  // client time/server time can not be mixed
      
S
slguan 已提交
381 382 383 384 385 386 387 388 389 390 391 392 393
    } else if (pDataBlocks->tsSource == -1) {
      pDataBlocks->tsSource = TSDB_USE_CLI_TS;
    }
  }

  if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
394
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
L
[1292]  
lihui 已提交
395
                      int16_t timePrec, int32_t *code, char* tmpTokenBuf) {
S
slguan 已提交
396
  int32_t   index = 0;
F
fang 已提交
397
  //bool      isPrevOptr; //fang, never used
H
hjxilinx 已提交
398
  SSQLToken sToken = {0};
S
slguan 已提交
399
  char *    payload = pDataBlocks->pData + pDataBlocks->size;
S
slguan 已提交
400

S
slguan 已提交
401
  // 1. set the parsed value from sql string
H
hzcheng 已提交
402
  int32_t rowSize = 0;
403
  for (int i = 0; i < spd->numOfAssignedCols; ++i) {
S
slguan 已提交
404
    // the start position in data block buffer of current value in sql
S
slguan 已提交
405
    char *  start = payload + spd->elems[i].offset;
H
hzcheng 已提交
406
    int16_t colIndex = spd->elems[i].colIndex;
S
slguan 已提交
407 408
    SSchema *pSchema = schema + colIndex;
    rowSize += pSchema->bytes;
H
hzcheng 已提交
409

S
slguan 已提交
410 411 412 413 414 415 416 417 418
    index = 0;
    sToken = tStrGetToken(*str, &index, true, 0, NULL);
    *str += index;

    if (sToken.type == TK_QUESTION) {
      uint32_t offset = start - pDataBlocks->pData;
      if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
        continue;
      }
H
hjxilinx 已提交
419
      
S
slguan 已提交
420
      strcpy(error, "client out of memory");
L
[1292]  
lihui 已提交
421
      *code = TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
422 423 424
      return -1;
    }

S
slguan 已提交
425 426 427
    if (((sToken.type != TK_NOW) && (sToken.type != TK_INTEGER) && (sToken.type != TK_STRING) &&
         (sToken.type != TK_FLOAT) && (sToken.type != TK_BOOL) && (sToken.type != TK_NULL)) ||
        (sToken.n == 0) || (sToken.type == TK_RP)) {
H
hjxilinx 已提交
428
      tscInvalidSQLErrMsg(error, "invalid data or symbol", sToken.z);
L
[1292]  
lihui 已提交
429
      *code = TSDB_CODE_INVALID_SQL;
S
slguan 已提交
430
      return -1;
H
hzcheng 已提交
431 432
    }

S
slguan 已提交
433 434
    // Remove quotation marks
    if (TK_STRING == sToken.type) {
L
[1292]  
lihui 已提交
435 436 437 438
      // delete escape character: \\, \', \"
      char delim = sToken.z[0];
      int32_t cnt = 0;
      int32_t j = 0;
F
fang 已提交
439 440 441
      for (int32_t k = 1; k < sToken.n - 1; ++k) {  
        if (sToken.z[k] == delim || sToken.z[k] == '\\') {
          if (sToken.z[k + 1] == delim) {
L
[1292]  
lihui 已提交
442 443 444 445 446 447 448 449
            cnt++;
            continue;
          }
        }
      
        tmpTokenBuf[j] = sToken.z[i];
        j++;
      }
L
lihui 已提交
450
      tmpTokenBuf[j] = 0; 
L
[1292]  
lihui 已提交
451 452
      sToken.z = tmpTokenBuf;
      sToken.n -= 2 + cnt;    
H
hzcheng 已提交
453 454
    }

S
slguan 已提交
455 456
    bool    isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec);
S
slguan 已提交
457
    if (ret != TSDB_CODE_SUCCESS) {
L
[1292]  
lihui 已提交
458
      *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
459 460
      return -1;  // NOTE: here 0 mean error!
    }
461

S
slguan 已提交
462
    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
463
      tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z);
L
[1292]  
lihui 已提交
464
      *code = TSDB_CODE_INVALID_TIME_STAMP;
S
slguan 已提交
465
      return -1;
466
    }
H
hzcheng 已提交
467 468
  }

S
slguan 已提交
469
  // 2. set the null value for the columns that do not assign values
470
  if (spd->numOfAssignedCols < spd->numOfCols) {
S
slguan 已提交
471
    char *ptr = payload;
H
hzcheng 已提交
472 473

    for (int32_t i = 0; i < spd->numOfCols; ++i) {
H
hjxilinx 已提交
474
      if (!spd->hasVal[i]) { // current column do not have any value to insert, set it to null
H
hzcheng 已提交
475 476 477 478 479 480 481 482 483 484 485 486
        setNull(ptr, schema[i].type, schema[i].bytes);
      }

      ptr += schema[i].bytes;
    }

    rowSize = ptr - payload;
  }

  return rowSize;
}

S
slguan 已提交
487 488 489 490 491 492 493 494 495
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
496 497
}

S
slguan 已提交
498
int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMeta, int maxRows,
L
[1292]  
lihui 已提交
499
                  SParsedDataColInfo *spd, char *error, int32_t *code, char* tmpTokenBuf) {
S
slguan 已提交
500 501
  int32_t   index = 0;
  SSQLToken sToken;
H
hzcheng 已提交
502 503 504

  int16_t numOfRows = 0;

S
slguan 已提交
505 506 507 508
  SSchema *pSchema = tsGetSchema(pMeterMeta);
  int32_t  precision = pMeterMeta->precision;

  if (spd->hasVal[0] == false) {
S
slguan 已提交
509
    strcpy(error, "primary timestamp column can not be null");
L
[1292]  
lihui 已提交
510
    *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
511 512 513 514
    return -1;
  }

  while (1) {
S
slguan 已提交
515 516 517
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    if (sToken.n == 0 || sToken.type != TK_LP) break;
H
hzcheng 已提交
518

S
slguan 已提交
519
    *str += index;
S
slguan 已提交
520
    if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) {
S
slguan 已提交
521
      int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize);
H
hjxilinx 已提交
522
      if (0 == tSize) {  //TODO pass the correct error code to client
S
slguan 已提交
523
        strcpy(error, "client out of memory");
L
[1292]  
lihui 已提交
524
        *code = TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
525 526
        return -1;
      }
H
hjxilinx 已提交
527
      
S
slguan 已提交
528
      maxRows += tSize;
H
hzcheng 已提交
529 530
    }

L
[1292]  
lihui 已提交
531
    int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
H
hjxilinx 已提交
532
    if (len <= 0) { // error message has been set in tsParseOneRowData
H
hzcheng 已提交
533 534 535 536 537
      return -1;
    }

    pDataBlock->size += len;

S
slguan 已提交
538 539 540 541
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
H
hjxilinx 已提交
542
      tscInvalidSQLErrMsg(error, ") expected", *str);
L
[1292]  
lihui 已提交
543
      *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
544 545 546 547 548 549 550 551
      return -1;
    }

    numOfRows++;
  }

  if (numOfRows <= 0) {
    strcpy(error, "no any data points");
L
[1292]  
lihui 已提交
552
    *code = TSDB_CODE_INVALID_SQL;
S
slguan 已提交
553 554 555
    return -1;
  } else {
    return numOfRows;
H
hzcheng 已提交
556 557 558
  }
}

S
slguan 已提交
559
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) {
H
hzcheng 已提交
560
  spd->numOfCols = numOfCols;
561
  spd->numOfAssignedCols = numOfCols;
H
hzcheng 已提交
562 563 564 565 566 567 568 569 570 571 572

  for (int32_t i = 0; i < numOfCols; ++i) {
    spd->hasVal[i] = true;
    spd->elems[i].colIndex = i;

    if (i > 0) {
      spd->elems[i].offset = spd->elems[i - 1].offset + pSchema[i - 1].bytes;
    }
  }
}

S
slguan 已提交
573
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) {
S
slguan 已提交
574
  size_t    remain = pDataBlock->nAllocSize - pDataBlock->size;
S
slguan 已提交
575
  const int factor = 5;
S
slguan 已提交
576
  uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
H
hzcheng 已提交
577 578

  // expand the allocated size
S
slguan 已提交
579 580
  if (remain < rowSize * factor) {
    while (remain < rowSize * factor) {
S
slguan 已提交
581
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
S
slguan 已提交
582 583
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    }
H
hzcheng 已提交
584

S
slguan 已提交
585 586 587 588 589
    char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
S
slguan 已提交
590
      //assert(false);
S
slguan 已提交
591
      // do nothing
S
slguan 已提交
592 593
      pDataBlock->nAllocSize = nAllocSizeOld;
      return 0;
S
slguan 已提交
594
    }
H
hzcheng 已提交
595 596
  }

S
slguan 已提交
597
  return (int32_t)(pDataBlock->nAllocSize - pDataBlock->size) / rowSize;
H
hzcheng 已提交
598 599
}

S
slguan 已提交
600 601 602 603 604
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterMeta, int32_t numOfRows) {
  pBlocks->sid = pMeterMeta->sid;
  pBlocks->uid = pMeterMeta->uid;
  pBlocks->sversion = pMeterMeta->sversion;
  pBlocks->numOfRows += numOfRows;
H
hzcheng 已提交
605 606
}

S
slguan 已提交
607 608
// data block is disordered, sort it in ascending order
void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
S
slguan 已提交
609
  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)dataBuf->pData;
S
slguan 已提交
610 611 612 613

  // size is less than the total size, since duplicated rows may be removed yet.
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);

S
slguan 已提交
614 615 616 617 618
  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

S
slguan 已提交
619
  if (!dataBuf->ordered) {
S
slguan 已提交
620 621
    char *pBlockData = pBlocks->payLoad;
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
H
hzcheng 已提交
622

S
slguan 已提交
623 624
    int32_t i = 0;
    int32_t j = 1;
H
hzcheng 已提交
625

S
slguan 已提交
626
    while (j < pBlocks->numOfRows) {
S
slguan 已提交
627 628
      TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
H
hzcheng 已提交
629

S
slguan 已提交
630 631 632 633
      if (ti == tj) {
        ++j;
        continue;
      }
H
hzcheng 已提交
634

S
slguan 已提交
635 636 637 638 639 640 641 642 643
      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;
H
hzcheng 已提交
644

S
slguan 已提交
645
    pBlocks->numOfRows = i + 1;
S
slguan 已提交
646
    dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize * pBlocks->numOfRows;
S
slguan 已提交
647
  }
S
slguan 已提交
648 649 650 651
}

static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
                                      int32_t *totalNum) {
S
slguan 已提交
652 653 654
  SSqlCmd *       pCmd = &pSql->cmd;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
  SMeterMeta *    pMeterMeta = pMeterMetaInfo->pMeterMeta;
S
slguan 已提交
655 656 657

  STableDataBlocks *dataBuf =
      tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
S
slguan 已提交
658
                              sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name);
S
slguan 已提交
659

H
hzcheng 已提交
660
  int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize);
S
slguan 已提交
661 662 663
  if (0 == maxNumOfRows) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
L
[1292]  
lihui 已提交
664 665 666 667 668 669 670 671 672
  
  int32_t code = TSDB_CODE_INVALID_SQL;
  char*   tmpTokenBuf = calloc(1, 4096);  // used for deleting Escape character: \\, \', \"
  if (NULL == tmpTokenBuf) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
  
  int32_t numOfRows = tsParseValues(str, dataBuf, pMeterMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
  free(tmpTokenBuf);
H
hzcheng 已提交
673
  if (numOfRows <= 0) {
L
[1292]  
lihui 已提交
674
    return code;
H
hzcheng 已提交
675 676
  }

S
slguan 已提交
677 678 679 680 681 682 683 684
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
    SParamInfo* param = dataBuf->params + i;
    if (param->idx == -1) {
      param->idx = pCmd->numOfParams++;
      param->offset -= sizeof(SShellSubmitBlock);
    }
  }

S
slguan 已提交
685
  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData);
H
hzcheng 已提交
686
  tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);
S
slguan 已提交
687 688 689

  dataBuf->vgid = pMeterMeta->vgid;
  dataBuf->numOfMeters = 1;
H
hzcheng 已提交
690 691

  /*
S
slguan 已提交
692 693
   * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
   * which is actually returned from server.
H
hzcheng 已提交
694
   */
S
slguan 已提交
695
  *totalNum += numOfRows;
H
hzcheng 已提交
696 697 698
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
699
static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
S
slguan 已提交
700 701 702 703
  int32_t   index = 0;
  SSQLToken sToken;
  SSQLToken tableToken;
  int32_t   code = TSDB_CODE_SUCCESS;
H
hzcheng 已提交
704

S
slguan 已提交
705 706
  SSqlCmd *       pCmd = &pSql->cmd;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
H
hzcheng 已提交
707

S
slguan 已提交
708 709 710 711 712
  char *sql = *sqlstr;
  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;
H
hzcheng 已提交
713

S
slguan 已提交
714 715
  char *cstart = NULL;
  char *cend = NULL;
H
hzcheng 已提交
716

S
slguan 已提交
717 718 719 720 721
  // skip possibly exists column list
  index = 0;
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

H
hzcheng 已提交
722
  int32_t numOfColList = 0;
S
slguan 已提交
723
  bool    createTable = false;
H
hzcheng 已提交
724

S
slguan 已提交
725 726 727
  if (sToken.type == TK_LP) {
    cstart = &sToken.z[0];
    index = 0;
H
hzcheng 已提交
728
    while (1) {
S
slguan 已提交
729 730 731
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      if (sToken.type == TK_RP) {
        cend = &sToken.z[0];
H
hzcheng 已提交
732 733 734 735 736 737
        break;
      }

      ++numOfColList;
    }

S
slguan 已提交
738 739
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
H
hzcheng 已提交
740 741 742 743 744 745
  }

  if (numOfColList == 0 && cstart != NULL) {
    return TSDB_CODE_INVALID_SQL;
  }

H
hjxilinx 已提交
746
  if (sToken.type == TK_USING) { // create table if not exists
S
slguan 已提交
747 748 749 750
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

S
slguan 已提交
751
    STagData *pTag = (STagData *)pCmd->payload;
S
slguan 已提交
752
    memset(pTag, 0, sizeof(STagData));
S
slguan 已提交
753
    setMeterID(pSql, &sToken, 0);
H
hzcheng 已提交
754

S
slguan 已提交
755 756
    strncpy(pTag->name, pMeterMetaInfo->name, TSDB_METER_ID_LEN);
    code = tscGetMeterMeta(pSql, pTag->name, 0);
H
hzcheng 已提交
757 758 759 760
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

S
slguan 已提交
761
    if (!UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
H
hjxilinx 已提交
762
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
S
slguan 已提交
763 764 765
    }

    char *   tagVal = pTag->data;
S
slguan 已提交
766
    SSchema *pTagSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
767

S
slguan 已提交
768 769 770 771
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.type != TK_TAGS) {
H
hjxilinx 已提交
772
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sql);
H
hzcheng 已提交
773 774
    }

S
slguan 已提交
775 776 777
    int32_t  numOfTagValues = 0;
    uint32_t ignoreTokenTypes = TK_LP;
    uint32_t numOfIgnoreToken = 1;
H
hzcheng 已提交
778
    while (1) {
S
slguan 已提交
779 780 781 782
      index = 0;
      sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
      sql += index;
      if (sToken.n == 0) {
H
hzcheng 已提交
783
        break;
S
slguan 已提交
784 785 786
      } else if (sToken.type == TK_RP) {
        break;
      }
H
hzcheng 已提交
787

S
slguan 已提交
788 789 790 791
      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
H
hzcheng 已提交
792 793
      }

S
slguan 已提交
794 795
      code = tsParseOneColumnData(&pTagSchema[numOfTagValues], &sToken, tagVal, pCmd->payload, &sql, false,
                                  pMeterMetaInfo->pMeterMeta->precision);
H
hzcheng 已提交
796
      if (code != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
797
        return code;
H
hzcheng 已提交
798 799
      }

S
slguan 已提交
800
      if ((pTagSchema[numOfTagValues].type == TSDB_DATA_TYPE_BINARY ||
H
hjxilinx 已提交
801 802
           pTagSchema[numOfTagValues].type == TSDB_DATA_TYPE_NCHAR) && sToken.n > pTagSchema[numOfTagValues].bytes) {
        return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z);
S
slguan 已提交
803 804
      }

H
hzcheng 已提交
805 806 807
      tagVal += pTagSchema[numOfTagValues++].bytes;
    }

S
slguan 已提交
808
    if (numOfTagValues != pMeterMetaInfo->pMeterMeta->numOfTags) {
H
hjxilinx 已提交
809
      return tscInvalidSQLErrMsg(pCmd->payload, "number of tags mismatch", sql);
H
hzcheng 已提交
810 811 812
    }

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
813
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", sql);
H
hzcheng 已提交
814 815
    }

S
slguan 已提交
816
    int32_t ret = setMeterID(pSql, &tableToken, 0);
H
hzcheng 已提交
817 818 819 820 821
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    createTable = true;
S
slguan 已提交
822
    code = tscGetMeterMetaEx(pSql, pMeterMetaInfo->name, true);
H
hzcheng 已提交
823 824 825 826
  } else {
    if (cstart != NULL) {
      sql = cstart;
    } else {
S
slguan 已提交
827
      sql = sToken.z;
H
hzcheng 已提交
828
    }
S
slguan 已提交
829
    code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
H
hzcheng 已提交
830 831 832 833 834
  }

  int32_t len = cend - cstart + 1;
  if (cstart != NULL && createTable == true) {
    /* move the column list to start position of the next accessed points */
W
WangXin 已提交
835
    memmove(sql - len, cstart, len);
H
hzcheng 已提交
836 837 838 839 840 841 842 843
    *sqlstr = sql - len;
  } else {
    *sqlstr = sql;
  }

  return code;
}

S
slguan 已提交
844
int validateTableName(char *tblName, int len) {
H
huili 已提交
845
  char buf[TSDB_METER_ID_LEN] = {0};
S
slguan 已提交
846
  strncpy(buf, tblName, len);
S
slguan 已提交
847

S
slguan 已提交
848
  SSQLToken token = {.n = len, .type = TK_ID, .z = buf};
H
huili 已提交
849
  tSQLGetToken(buf, &token.type);
S
slguan 已提交
850

H
huili 已提交
851 852 853
  return tscValidateName(&token);
}

H
hzcheng 已提交
854 855 856 857 858 859 860 861 862
/**
 * usage: insert into table1 values() () table2 values()()
 *
 * @param str
 * @param acct
 * @param db
 * @param pSql
 * @return
 */
H
hjxilinx 已提交
863
int doParserInsertSql(SSqlObj *pSql, char *str) {
S
slguan 已提交
864
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
865 866
  
  int32_t code = TSDB_CODE_INVALID_SQL;
S
slguan 已提交
867
  int32_t totalNum = 0;
H
hzcheng 已提交
868

S
slguan 已提交
869
  SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd);
H
hzcheng 已提交
870

S
slguan 已提交
871
  if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
872 873 874
    return code;
  }

H
hjxilinx 已提交
875
  void *pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt);
H
hzcheng 已提交
876 877 878 879 880

  pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
  tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks);

  while (1) {
S
slguan 已提交
881 882 883
    int32_t index = 0;
    SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL);
    if (sToken.n == 0) { // parse file, do not release the STableDataBlock
S
slguan 已提交
884 885 886 887 888
      if (pCmd->isInsertFromFile == 1) {
        goto _clean;
      }

      if (totalNum > 0) {
H
hzcheng 已提交
889 890 891 892 893 894 895
        break;
      } else {  // no data in current sql string, error
        code = TSDB_CODE_INVALID_SQL;
        goto _error_clean;
      }
    }

S
slguan 已提交
896
    // Check if the table name available or not
S
slguan 已提交
897
    if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
898
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
H
huili 已提交
899 900 901
      goto _error_clean;
    }

H
hjxilinx 已提交
902
    //TODO refactor
S
slguan 已提交
903
    if ((code = setMeterID(pSql, &sToken, 0)) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
904 905 906
      goto _error_clean;
    }

S
slguan 已提交
907
    void *fp = pSql->fp;
H
hzcheng 已提交
908 909 910 911 912 913 914 915 916 917 918 919 920
    if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) {
      if (fp != NULL) {
        goto _clean;
      } else {
        /*
         * for async insert, the free data block operations, which is tscDestroyBlockArrayList,
         * must be executed before launch another threads to get metermeta, since the
         * later ops may manipulate SSqlObj through another thread in getMeterMetaCallback function.
         */
        goto _error_clean;
      }
    }

S
slguan 已提交
921
    if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
H
hjxilinx 已提交
922
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
H
hzcheng 已提交
923 924 925
      goto _error_clean;
    }

S
slguan 已提交
926 927 928 929
    index = 0;
    sToken = tStrGetToken(str, &index, false, 0, NULL);
    str += index;
    if (sToken.n == 0) {
H
hjxilinx 已提交
930
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
hzcheng 已提交
931 932 933
      goto _error_clean;
    }

S
slguan 已提交
934 935 936
    if (sToken.type == TK_VALUES) {
      SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns};
      SSchema *          pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
937

S
slguan 已提交
938
      tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns);
H
hzcheng 已提交
939 940 941 942 943

      if (pCmd->isInsertFromFile == -1) {
        pCmd->isInsertFromFile = 0;
      } else {
        if (pCmd->isInsertFromFile == 1) {
H
hjxilinx 已提交
944
          code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
H
hzcheng 已提交
945 946 947 948 949 950 951 952
          goto _error_clean;
        }
      }

      /*
       * app here insert data in different vnodes, so we need to set the following
       * data in another submit procedure using async insert routines
       */
S
slguan 已提交
953
      code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum);
H
hzcheng 已提交
954 955 956 957
      if (code != TSDB_CODE_SUCCESS) {
        goto _error_clean;
      }

S
slguan 已提交
958
    } else if (sToken.type == TK_FILE) {
H
hzcheng 已提交
959 960 961 962
      if (pCmd->isInsertFromFile == -1) {
        pCmd->isInsertFromFile = 1;
      } else {
        if (pCmd->isInsertFromFile == 0) {
H
hjxilinx 已提交
963
          code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
H
hzcheng 已提交
964 965 966 967
          goto _error_clean;
        }
      }

S
slguan 已提交
968 969 970 971
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
972
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
H
hzcheng 已提交
973 974 975
        goto _error_clean;
      }

S
slguan 已提交
976
      char fname[PATH_MAX] = {0};
S
slguan 已提交
977
      strncpy(fname, sToken.z, sToken.n);
S
slguan 已提交
978
      strdequote(fname);
979

H
hzcheng 已提交
980 981
      wordexp_t full_path;
      if (wordexp(fname, &full_path, 0) != 0) {
H
hjxilinx 已提交
982
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
H
hzcheng 已提交
983 984 985 986 987
        goto _error_clean;
      }
      strcpy(fname, full_path.we_wordv[0]);
      wordfree(&full_path);

S
slguan 已提交
988 989
      STableDataBlocks *pDataBlock = tscCreateDataBlockEx(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize,
                                                          sizeof(SShellSubmitBlock), pMeterMetaInfo->name);
H
hzcheng 已提交
990

S
slguan 已提交
991 992
      tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
      strcpy(pDataBlock->filename, fname);
S
slguan 已提交
993
    } else if (sToken.type == TK_LP) {
H
hzcheng 已提交
994
      /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
S
slguan 已提交
995
      SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0)->pMeterMeta;
S
slguan 已提交
996
      SSchema *   pSchema = tsGetSchema(pMeterMeta);
H
hzcheng 已提交
997 998 999 1000

      if (pCmd->isInsertFromFile == -1) {
        pCmd->isInsertFromFile = 0;
      } else if (pCmd->isInsertFromFile == 1) {
H
hjxilinx 已提交
1001
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
H
hzcheng 已提交
1002 1003 1004
        goto _error_clean;
      }

1005 1006
      SParsedDataColInfo spd = {0};
      spd.numOfCols = pMeterMeta->numOfColumns;
H
hzcheng 已提交
1007 1008 1009 1010 1011 1012 1013

      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < pMeterMeta->numOfColumns; ++t) {
        offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
      }

      while (1) {
S
slguan 已提交
1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
        index = 0;
        sToken = tStrGetToken(str, &index, false, 0, NULL);
        str += index;

        if (TK_STRING == sToken.type) {
          sToken.n = strdequote(sToken.z);
          strtrim(sToken.z);
          sToken.n = (uint32_t)strlen(sToken.z);
        }

        if (sToken.type == TK_RP) {
H
hzcheng 已提交
1025 1026 1027 1028 1029 1030 1031
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < pMeterMeta->numOfColumns; ++t) {
S
slguan 已提交
1032
          if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
S
slguan 已提交
1033
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
H
hzcheng 已提交
1034 1035 1036 1037
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
H
hjxilinx 已提交
1038
              code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
H
hzcheng 已提交
1039 1040 1041 1042 1043 1044 1045 1046 1047
              goto _error_clean;
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

S
slguan 已提交
1048
        if (!findColumnIndex) {
H
hjxilinx 已提交
1049
          code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
H
hzcheng 已提交
1050 1051 1052 1053
          goto _error_clean;
        }
      }

1054
      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > pMeterMeta->numOfColumns) {
H
hjxilinx 已提交
1055
        code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
H
hzcheng 已提交
1056 1057 1058
        goto _error_clean;
      }

S
slguan 已提交
1059 1060 1061 1062 1063
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;

      if (sToken.type != TK_VALUES) {
H
hjxilinx 已提交
1064
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
H
hzcheng 已提交
1065 1066 1067
        goto _error_clean;
      }

S
slguan 已提交
1068
      code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum);
H
hzcheng 已提交
1069 1070 1071 1072
      if (code != TSDB_CODE_SUCCESS) {
        goto _error_clean;
      }
    } else {
H
hjxilinx 已提交
1073
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
hzcheng 已提交
1074 1075 1076 1077
      goto _error_clean;
    }
  }

S
slguan 已提交
1078 1079 1080 1081 1082
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }

S
slguan 已提交
1083
  // submit to more than one vnode
H
hzcheng 已提交
1084
  if (pCmd->pDataBlocks->nSize > 0) {
S
slguan 已提交
1085
    // merge according to vgid
S
slguan 已提交
1086 1087 1088
    if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
      goto _error_clean;
    }
H
hzcheng 已提交
1089

S
slguan 已提交
1090 1091 1092
    STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
    if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
      goto _error_clean;
H
hzcheng 已提交
1093 1094
    }

H
hjxilinx 已提交
1095 1096
    SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
    
S
slguan 已提交
1097
    // set the next sent data vnode index in data block arraylist
H
hjxilinx 已提交
1098
    pMeterMetaInfo->vnodeIndex = 1;
H
hzcheng 已提交
1099
  } else {
S
slguan 已提交
1100
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1101 1102 1103 1104 1105 1106
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

_error_clean:
S
slguan 已提交
1107
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1108 1109

_clean:
S
slguan 已提交
1110
  taosCleanUpIntHash(pTableHashList);
H
hzcheng 已提交
1111 1112 1113
  return code;
}

S
slguan 已提交
1114
int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
S
slguan 已提交
1115 1116 1117
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_NO_RIGHTS;
  }
H
hzcheng 已提交
1118

H
hjxilinx 已提交
1119
  int32_t  index = 0;
S
slguan 已提交
1120
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1121

S
slguan 已提交
1122
  SSQLToken sToken = tStrGetToken(sql, &index, false, 0, NULL);
H
hjxilinx 已提交
1123 1124 1125 1126
  
  assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);
  pCmd->import = (sToken.type == TK_IMPORT);
  
S
slguan 已提交
1127 1128
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  if (sToken.type != TK_INTO) {
H
hjxilinx 已提交
1129
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
S
slguan 已提交
1130
  }
H
hjxilinx 已提交
1131 1132 1133 1134 1135 1136 1137
  
  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;
  pCmd->isInsertFromFile = -1;
  pSql->res.numOfRows = 0;
  
  return doParserInsertSql(pSql, sql + index);
H
hzcheng 已提交
1138 1139
}

S
slguan 已提交
1140
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
H
hzcheng 已提交
1141
  int32_t ret = TSDB_CODE_SUCCESS;
S
slguan 已提交
1142 1143 1144

  // must before clean the sqlcmd object
  tscRemoveAllMeterMetaInfo(&pSql->cmd, false);
S
slguan 已提交
1145
  tscCleanSqlCmd(&pSql->cmd);
H
hzcheng 已提交
1146 1147 1148

  if (tscIsInsertOrImportData(pSql->sqlstr)) {
    /*
S
slguan 已提交
1149 1150 1151
     * only for async multi-vnode insertion
     * Set the fp before parse the sql string, in case of getmetermeta failed, in which
     * the error handle callback function can rightfully restore the user defined function (fp)
H
hzcheng 已提交
1152 1153 1154 1155 1156
     */
    if (pSql->fp != NULL && multiVnodeInsertion) {
      assert(pSql->fetchFp == NULL);
      pSql->fetchFp = pSql->fp;

S
slguan 已提交
1157
      // replace user defined callback function with multi-insert proxy function
H
hzcheng 已提交
1158 1159 1160 1161 1162
      pSql->fp = tscAsyncInsertMultiVnodesProxy;
    }

    ret = tsParseInsertSql(pSql, pSql->sqlstr, acct, db);
  } else {
S
slguan 已提交
1163 1164 1165
    ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
    if (TSDB_CODE_SUCCESS != ret) return ret;
    
H
hzcheng 已提交
1166 1167
    SSqlInfo SQLInfo = {0};
    tSQLParse(&SQLInfo, pSql->sqlstr);
S
slguan 已提交
1168

H
hzcheng 已提交
1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
    ret = tscToSQLCmd(pSql, &SQLInfo);
    SQLInfoDestroy(&SQLInfo);
  }

  /*
   * the pRes->code may be modified or even released by another thread in tscMeterMetaCallBack
   * function, so do NOT use pRes->code to determine if the getMeterMeta/getMetricMeta function
   * invokes new threads to get data from mnode or simply retrieves data from cache.
   *
   * do NOT assign return code to pRes->code for the same reason for it may be released by another thread
   * pRes->code = ret;
   */
  return ret;
}

S
slguan 已提交
1184 1185 1186
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1187

S
slguan 已提交
1188
  SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0)->pMeterMeta;
S
slguan 已提交
1189 1190 1191 1192

  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
  tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);

S
slguan 已提交
1193 1194 1195
  if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
    return code;
  }
S
slguan 已提交
1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209

  // the pDataBlock is different from the pTableDataBlocks
  STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
  if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
    return code;
  }

  if ((code = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

L
[1292]  
lihui 已提交
1210
static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
S
slguan 已提交
1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222
  size_t          readLen = 0;
  char *          line = NULL;
  size_t          n = 0;
  int             len = 0;
  uint32_t        maxRows = 0;
  SSqlCmd *       pCmd = &pSql->cmd;
  int             numOfRows = 0;
  int32_t         code = 0;
  int             nrows = 0;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
  SMeterMeta *    pMeterMeta = pMeterMetaInfo->pMeterMeta;
  int32_t         rowSize = pMeterMeta->rowSize;
S
slguan 已提交
1223 1224

  pCmd->pDataBlocks = tscCreateBlockArrayList();
S
slguan 已提交
1225 1226
  STableDataBlocks *pTableDataBlock =
      tscCreateDataBlockEx(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name);
S
slguan 已提交
1227 1228 1229 1230

  tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);

  maxRows = tscAllocateMemIfNeed(pTableDataBlock, rowSize);
H
hzcheng 已提交
1231 1232 1233
  if (maxRows < 1) return -1;

  int                count = 0;
S
slguan 已提交
1234 1235
  SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns};
  SSchema *          pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
1236

S
slguan 已提交
1237
  tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns);
H
hzcheng 已提交
1238

H
huili 已提交
1239
  while ((readLen = getline(&line, &n, fp)) != -1) {
H
hzcheng 已提交
1240 1241
    // line[--readLen] = '\0';
    if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0;
F
fang 已提交
1242
    if (readLen == 0) continue; //fang, <= to ==
H
huili 已提交
1243

S
slguan 已提交
1244
    char *lineptr = line;
H
hzcheng 已提交
1245
    strtolower(line, line);
S
slguan 已提交
1246

H
hjxilinx 已提交
1247
    if (numOfRows >= maxRows || pTableDataBlock->size + pMeterMeta->rowSize >= pTableDataBlock->nAllocSize) {
S
slguan 已提交
1248 1249 1250
      uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, pMeterMeta->rowSize);
      if (0 == tSize) return (-TSDB_CODE_CLI_OUT_OF_MEMORY);
      maxRows += tSize;    
H
hjxilinx 已提交
1251 1252
    }

L
[1292]  
lihui 已提交
1253
    len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code, tmpTokenBuf);
S
slguan 已提交
1254
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
L
[1292]  
lihui 已提交
1255 1256
      pSql->res.code = code;
      return (-code);
S
slguan 已提交
1257
    }
H
hjxilinx 已提交
1258
    
S
slguan 已提交
1259
    pTableDataBlock->size += len;
H
hzcheng 已提交
1260 1261 1262 1263

    count++;
    nrows++;
    if (count >= maxRows) {
S
slguan 已提交
1264 1265
      if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
        return -code;
H
hzcheng 已提交
1266
      }
S
slguan 已提交
1267 1268 1269 1270 1271

      pTableDataBlock = pCmd->pDataBlocks->pData[0];
      pTableDataBlock->size = sizeof(SShellSubmitBlock);
      pTableDataBlock->rowSize = pMeterMeta->rowSize;

H
hzcheng 已提交
1272
      numOfRows += pSql->res.numOfRows;
S
slguan 已提交
1273
      pSql->res.numOfRows = 0;
H
hzcheng 已提交
1274 1275 1276 1277 1278
      count = 0;
    }
  }

  if (count > 0) {
S
slguan 已提交
1279 1280
    if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
      return -code;
H
hzcheng 已提交
1281
    }
S
slguan 已提交
1282

H
hzcheng 已提交
1283
    numOfRows += pSql->res.numOfRows;
S
slguan 已提交
1284
    pSql->res.numOfRows = 0;
H
hzcheng 已提交
1285 1286 1287
  }

  if (line) tfree(line);
S
slguan 已提交
1288

H
hzcheng 已提交
1289 1290 1291 1292 1293 1294 1295 1296 1297
  return numOfRows;
}

/* multi-vnodes insertion in sync query model
 *
 * modify history
 * 2019.05.10 lihui
 * Remove the code for importing records from files
 */
S
slguan 已提交
1298 1299
void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1300 1301

  // not insert/import, return directly
H
hzcheng 已提交
1302 1303 1304 1305
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

S
slguan 已提交
1306 1307 1308 1309 1310
  // SSqlCmd may have been released
  if (pCmd->pDataBlocks == NULL) {
    return;
  }

S
slguan 已提交
1311
  STableDataBlocks *pDataBlock = NULL;
S
slguan 已提交
1312
  SMeterMetaInfo *  pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
S
slguan 已提交
1313
  int32_t           code = TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1314 1315

  /* the first block has been sent to server in processSQL function */
H
hjxilinx 已提交
1316
  assert(pCmd->isInsertFromFile != -1 && pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL);
H
hzcheng 已提交
1317

H
hjxilinx 已提交
1318
  if (pMeterMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) {
S
slguan 已提交
1319
    SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
H
hzcheng 已提交
1320

H
hjxilinx 已提交
1321
    for (int32_t i = pMeterMetaInfo->vnodeIndex; i < pDataBlocks->nSize; ++i) {
H
hzcheng 已提交
1322 1323 1324 1325 1326 1327
      pDataBlock = pDataBlocks->pData[i];
      if (pDataBlock == NULL) {
        continue;
      }

      if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1328
        tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, pDataBlocks->nSize);
H
hzcheng 已提交
1329 1330 1331 1332 1333 1334 1335 1336
        continue;
      }

      tscProcessSql(pSql);
    }
  }

  // all data have been submit to vnode, release data blocks
S
slguan 已提交
1337
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1338 1339
}

S
slguan 已提交
1340
// multi-vnodes insertion in sync query model
S
slguan 已提交
1341 1342
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1343 1344 1345 1346
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

S
slguan 已提交
1347
  SMeterMetaInfo *  pInfo = tscGetMeterMetaInfo(pCmd, 0);
S
slguan 已提交
1348 1349
  STableDataBlocks *pDataBlock = NULL;
  int32_t           affected_rows = 0;
H
hzcheng 已提交
1350

S
slguan 已提交
1351 1352 1353
  assert(pCmd->isInsertFromFile == 1 && pCmd->pDataBlocks != NULL);
  SDataBlockList *pDataBlockList = pCmd->pDataBlocks;
  pCmd->pDataBlocks = NULL;
H
hzcheng 已提交
1354

S
slguan 已提交
1355
  char path[PATH_MAX] = {0};
H
hzcheng 已提交
1356

S
slguan 已提交
1357 1358
  for (int32_t i = 0; i < pDataBlockList->nSize; ++i) {
    pDataBlock = pDataBlockList->pData[i];
H
hzcheng 已提交
1359 1360 1361
    if (pDataBlock == NULL) {
      continue;
    }
S
slguan 已提交
1362 1363 1364 1365 1366
    
    if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) {
      tscError("%p failed to malloc when insert file", pSql);
      continue;
    }
H
hzcheng 已提交
1367 1368
    pCmd->count = 1;

S
slguan 已提交
1369 1370 1371
    strncpy(path, pDataBlock->filename, PATH_MAX);

    FILE *fp = fopen(path, "r");
H
hzcheng 已提交
1372
    if (fp == NULL) {
S
slguan 已提交
1373
      tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno));
H
hzcheng 已提交
1374 1375 1376
      continue;
    }

S
slguan 已提交
1377
    strncpy(pInfo->name, pDataBlock->meterId, TSDB_METER_ID_LEN);
S
slguan 已提交
1378 1379
    memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);

S
slguan 已提交
1380
    int32_t ret = tscGetMeterMeta(pSql, pInfo->name, 0);
S
slguan 已提交
1381 1382 1383 1384
    if (ret != TSDB_CODE_SUCCESS) {
      tscError("%p get meter meta failed, abort", pSql);
      continue;
    }
L
[1292]  
lihui 已提交
1385 1386 1387 1388 1389 1390
    
    char*   tmpTokenBuf = calloc(1, 4096);  // used for deleting Escape character: \\, \', \"
    if (NULL == tmpTokenBuf) {
      tscError("%p calloc failed", pSql);
      continue;
    }
S
slguan 已提交
1391

L
[1292]  
lihui 已提交
1392 1393 1394
    int nrows = tscInsertDataFromFile(pSql, fp, tmpTokenBuf);
    free(tmpTokenBuf);
    
S
slguan 已提交
1395 1396
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

H
hzcheng 已提交
1397 1398
    if (nrows < 0) {
      fclose(fp);
S
slguan 已提交
1399
      tscTrace("%p no records(%d) in file %s", pSql, nrows, path);
H
hzcheng 已提交
1400 1401 1402
      continue;
    }

S
slguan 已提交
1403
    fclose(fp);
H
hzcheng 已提交
1404 1405
    affected_rows += nrows;

S
slguan 已提交
1406
    tscTrace("%p Insert data %d records from file %s", pSql, nrows, path);
H
hzcheng 已提交
1407 1408 1409 1410 1411
  }

  pSql->res.numOfRows = affected_rows;

  // all data have been submit to vnode, release data blocks
S
slguan 已提交
1412 1413
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
  tscDestroyBlockArrayList(pDataBlockList);
H
hzcheng 已提交
1414
}