tscParseInsert.c 48.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

21
#include "os.h"
22 23

#include "hash.h"
H
hzcheng 已提交
24 25 26
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
27
#include "ttokendef.h"
H
hzcheng 已提交
28
#include "taosdef.h"
H
hzcheng 已提交
29

S
slguan 已提交
30
#include "tscLog.h"
H
hjxilinx 已提交
31
#include "tscSubquery.h"
H
hzcheng 已提交
32 33 34
#include "tstoken.h"
#include "ttime.h"

S
slguan 已提交
35
#include "tdataformat.h"
36

S
slguan 已提交
37
/*
 * Source of the primary-key timestamps inside one data block.
 * tsCheckTimestamp() stores one of these in STableDataBlocks::tsSource
 * (which holds -1 while still undecided) and rejects blocks that mix both.
 */
enum {
  TSDB_USE_SERVER_TS = 0,  // rows carry a zero timestamp key
  TSDB_USE_CLI_TS = 1,     // rows carry a non-zero, client-supplied timestamp key
};

L
lihui 已提交
42
// Forward declaration: used by tsParseValues() before the definition that appears later in this file.
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
H
hzcheng 已提交
43

S
slguan 已提交
44
/*
 * Convert a numeric token into a 64-bit integer.
 *
 * The radix follows the token type: TK_HEX -> 16, TK_BIN -> 2, anything else -> 10.
 * If the integer scan stops at '.', 'e' or 'E', the text is parsed again as a
 * floating point number and rounded to the nearest integer.
 *
 * pToken  token to convert
 * value   receives the converted value
 * endPtr  receives the position at which parsing stopped
 *
 * Returns TK_ILLEGAL when the token is empty or when parsing did not stop exactly
 * pToken->n characters after pToken->z; otherwise returns pToken->type.
 * Range problems are reported through errno (ERANGE), which is cleared before the
 * conversion starts, so callers have to inspect errno after a successful return.
 */
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
  if (pToken->n == 0) {
    return TK_ILLEGAL;
  }

  int32_t radix = 10;
  if (pToken->type == TK_HEX) {
    radix = 16;
  } else if (pToken->type == TK_BIN) {
    radix = 2;
  }

  errno = 0;
  *value = strtoll(pToken->z, endPtr, radix);
  if (**endPtr == 'e' || **endPtr == 'E' || **endPtr == '.') {
    errno = 0;
    double v = round(strtod(pToken->z, endPtr));

    // (double)INT64_MAX rounds up to 2^63, which does not fit into int64_t, so the
    // upper bound has to be rejected with '>='; converting it would be undefined.
    // INT64_MIN itself is rejected as well, exactly as before.
    if (v >= (double)INT64_MAX || v <= (double)INT64_MIN) {
      errno = ERANGE;
    } else {
      *value = (int64_t)v;
    }
  }

  // the number must span the whole token, otherwise it is not a valid integer
  if (*endPtr - pToken->z != pToken->n) {
    return TK_ILLEGAL;
  }

  return pToken->type;
}

S
slguan 已提交
77
/*
 * Convert a numeric token into a double with strtod().
 *
 * pToken  token to convert
 * value   receives the converted value
 * endPtr  receives the position at which strtod() stopped
 *
 * Returns TK_ILLEGAL for an empty token or when the number does not span the
 * whole token; otherwise returns pToken->type. errno is cleared before the
 * conversion so that callers can test it for ERANGE afterwards.
 */
static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
  if (pToken->n == 0) {
    return TK_ILLEGAL;
  }

  errno = 0;
  *value = strtod(pToken->z, endPtr);

  // a valid floating point number consumes exactly the token text
  if ((*endPtr - pToken->z) == pToken->n) {
    return pToken->type;
  }

  return TK_ILLEGAL;
}
H
hzcheng 已提交
92

S
slguan 已提交
93 94 95 96 97 98
/*
 * Parse a timestamp value, optionally followed by a "+<duration>" / "-<duration>"
 * expression (e.g. now+12a, now-5h).
 *
 * pToken    first token of the timestamp (TK_NOW, the literal 0, TK_INTEGER, or
 *           anything else, which is handed to taosParseTime())
 * time      receives the resulting timestamp
 * next      in: position right after pToken; out: advanced past the duration
 *           expression when one was consumed
 * error     buffer handed to tscInvalidSQLErrMsg() on failure
 * timePrec  precision of the target table
 *
 * Returns TSDB_CODE_SUCCESS (or 0 on the early "value followed by comma" path)
 * on success, otherwise an error code.
 */
int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  char *  cursor = *next;
  int64_t ts = 0;

  if (pToken->type == TK_NOW) {
    ts = taosGetTimestamp(timePrec);
  } else if (pToken->n == 1 && pToken->z[0] == '0') {
    // the literal 0 keeps ts at zero
  } else if (pToken->type == TK_INTEGER) {
    ts = str2int64(pToken->z);
  } else {
    // formatted time string, e.g. "2001-11-12 18:31:01"
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // skip blanks behind the token; a comma means there is no time expression
  int pos = pToken->n;
  while (pToken->z[pos] == ' ' || pToken->z[pos] == '\t') {
    pos++;
  }

  if (pToken->z[pos] == ',') {
    *next = cursor;
    *time = ts;
    return 0;
  }

  // optional time expression: <ts> (+|-) <duration>
  int32_t   consumed = 0;
  SSQLToken opToken = tStrGetToken(cursor, &consumed, false, 0, NULL);
  cursor += consumed;

  if (opToken.type == TK_MINUS || opToken.type == TK_PLUS) {
    consumed = 0;
    SSQLToken durToken = tStrGetToken(cursor, &consumed, false, 0, NULL);
    cursor += consumed;

    // a duration needs at least one digit and one unit character
    if (durToken.n < 2) {
      return tscInvalidSQLErrMsg(error, "value expected in timestamp", opToken.z);
    }

    int64_t interval;
    if (getTimestampInUsFromStr(durToken.z, durToken.n, &interval) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    // the helper yields microseconds; scale down for millisecond tables
    if (timePrec == TSDB_TIME_PRECISION_MILLI) {
      interval /= 1000;
    }

    if (opToken.type == TK_PLUS) {
      ts += interval;
    } else if (ts >= interval) {
      ts -= interval;
    } else {
      ts = 0;  // subtraction is clamped at zero
    }

    *next = cursor;
  }

  *time = ts;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
167
int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
S
slguan 已提交
168 169 170
                             int16_t timePrec) {
  int64_t iv;
  int32_t numType;
S
slguan 已提交
171
  char *  endptr = NULL;
172 173
  errno = 0;  // clear the previous existed error information

H
hzcheng 已提交
174 175
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
S
slguan 已提交
176 177
      if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
S
slguan 已提交
178
          *(uint8_t *)payload = TSDB_TRUE;
S
slguan 已提交
179
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
S
slguan 已提交
180
          *(uint8_t *)payload = TSDB_FALSE;
S
slguan 已提交
181 182
        } else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
H
hzcheng 已提交
183
        } else {
H
hjxilinx 已提交
184
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
185
        }
S
slguan 已提交
186 187 188 189 190 191 192 193 194
      } else if (pToken->type == TK_INTEGER) {
        iv = strtoll(pToken->z, NULL, 10);
        *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_FLOAT) {
        double dv = strtod(pToken->z, NULL);
        *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_NULL) {
        *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
      } else {
H
hjxilinx 已提交
195
        return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
196 197 198 199
      }
      break;
    }
    case TSDB_DATA_TYPE_TINYINT:
S
slguan 已提交
200 201 202 203 204
      if (pToken->type == TK_NULL) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
H
hzcheng 已提交
205
      } else {
S
slguan 已提交
206
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
207
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
208
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
L
lihui 已提交
209
        } else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
H
hjxilinx 已提交
210
          return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
H
hzcheng 已提交
211 212
        }

213
        *((int8_t *)payload) = (int8_t)iv;
H
hzcheng 已提交
214 215 216 217 218
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
S
slguan 已提交
219 220 221 222 223
      if (pToken->type == TK_NULL) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
H
hzcheng 已提交
224
      } else {
S
slguan 已提交
225
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
226
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
227
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
L
lihui 已提交
228
        } else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
H
hjxilinx 已提交
229
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
H
hzcheng 已提交
230 231
        }

S
slguan 已提交
232
        *((int16_t *)payload) = (int16_t)iv;
H
hzcheng 已提交
233 234 235 236
      }
      break;

    case TSDB_DATA_TYPE_INT:
S
slguan 已提交
237 238 239 240
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
241
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
H
hzcheng 已提交
242
      } else {
S
slguan 已提交
243
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
244
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
245
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
L
lihui 已提交
246
        } else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
H
hjxilinx 已提交
247
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
H
hzcheng 已提交
248 249
        }

S
slguan 已提交
250
        *((int32_t *)payload) = (int32_t)iv;
H
hzcheng 已提交
251 252 253 254 255
      }

      break;

    case TSDB_DATA_TYPE_BIGINT:
S
slguan 已提交
256 257 258 259
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
260
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
261
      } else {
S
slguan 已提交
262
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
263
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
264
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
B
Bomin Zhang 已提交
265
        } else if (errno == ERANGE || iv == INT64_MIN) {
H
hjxilinx 已提交
266
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
H
hzcheng 已提交
267
        }
S
slguan 已提交
268 269

        *((int64_t *)payload) = iv;
H
hzcheng 已提交
270 271 272 273
      }
      break;

    case TSDB_DATA_TYPE_FLOAT:
S
slguan 已提交
274 275 276 277
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
278
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
279
      } else {
S
slguan 已提交
280 281
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
282
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
S
slguan 已提交
283 284 285 286
        }

        float fv = (float)dv;
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
H
hjxilinx 已提交
287
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
H
hzcheng 已提交
288 289
        }

S
slguan 已提交
290 291
        if (isinf(fv) || isnan(fv)) {
          *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
292
        }
S
slguan 已提交
293 294

        *((float *)payload) = fv;
H
hzcheng 已提交
295 296 297 298
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
S
slguan 已提交
299 300 301 302
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
303
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
H
hzcheng 已提交
304
      } else {
S
slguan 已提交
305 306
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
307
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
H
hzcheng 已提交
308 309
        }

S
slguan 已提交
310
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
H
hjxilinx 已提交
311
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
S
slguan 已提交
312 313 314 315 316 317
        }

        if (isinf(dv) || isnan(dv)) {
          *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
        } else {
          *((double *)payload) = dv;
H
hzcheng 已提交
318 319 320 321 322
        }
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
S
slguan 已提交
323 324
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
325
        setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
H
hjxilinx 已提交
326
      } else { // too long values will return invalid sql, not be truncated automatically
H
hjxilinx 已提交
327
        if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor
H
hjxilinx 已提交
328
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
S
slguan 已提交
329
        }
H
hjxilinx 已提交
330
        
331
        STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
H
hzcheng 已提交
332 333 334 335 336
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
S
slguan 已提交
337
      if (pToken->type == TK_NULL) {
338
        setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
339
      } else {
H
hjxilinx 已提交
340
        // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
341 342
        size_t output = 0;
        if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
H
hjxilinx 已提交
343 344
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), "%s", strerror(errno));
H
hjxilinx 已提交
345
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
H
hzcheng 已提交
346
        }
347
        
348
        varDataSetLen(payload, output);
H
hzcheng 已提交
349 350 351 352
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
S
slguan 已提交
353
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
354
        if (primaryKey) {
S
slguan 已提交
355
          *((int64_t *)payload) = 0;
H
hzcheng 已提交
356
        } else {
S
slguan 已提交
357
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
358 359
        }
      } else {
S
slguan 已提交
360 361
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
362
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
H
hzcheng 已提交
363
        }
H
hjxilinx 已提交
364
        
S
slguan 已提交
365
        *((int64_t *)payload) = temp;
H
hzcheng 已提交
366 367 368 369 370 371
      }

      break;
    }
  }

H
hjxilinx 已提交
372
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
373 374
}

S
slguan 已提交
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
/*
 * Server time and client time must not be mixed within one SQL string.
 *
 * A primary key of 0 selects TSDB_USE_SERVER_TS, any other value selects
 * TSDB_USE_CLI_TS; the first row decides while tsSource is still -1.
 * For client timestamps the function also records whether the keys are still
 * strictly ascending; once a block is flagged as unordered nothing is tracked
 * any more.
 *
 * pDataBlocks  block whose tsSource / ordered / prevTS fields are updated
 * start        address of the primary timestamp value of the current row
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the row's timestamp source conflicts
 * with the one already chosen for the block.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  // once the data block is disordered, the previous timestamp is no longer kept
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  const TSKEY   key = *(TSKEY *)start;
  const int32_t wanted = (key == 0) ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  const int32_t conflicting = (key == 0) ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;

  if (pDataBlocks->tsSource == conflicting) {
    return -1;  // client time/server time can not be mixed
  }

  if (pDataBlocks->tsSource == -1) {
    pDataBlocks->tsSource = wanted;
  }

  if ((pDataBlocks->tsSource == TSDB_USE_CLI_TS) && key <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = key;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
410
/*
 * Parse the values of one row, i.e. the text between '(' and ')', and write
 * them into the data block at offset pDataBlocks->size.
 *
 * str          in/out: current position in the SQL text
 * pDataBlocks  destination block; its size field is NOT advanced here
 * schema       column schemas of the table
 * spd          which columns receive values, and their offsets inside a row
 * error        receives an error message on failure
 * timePrec     timestamp precision of the table
 * code         receives the error code on failure
 * tmpTokenBuf  scratch buffer that receives string values with quotes and
 *              escapes removed; no bound is checked here, so it has to be large
 *              enough for the longest string token (NOTE(review): verify callers)
 *
 * Returns the row size in bytes on success and -1 on failure.
 */
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
                      int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
  char *  rowStart = pDataBlocks->pData + pDataBlocks->size;
  int32_t rowSize = 0;

  // 1. store every value that is present in the sql string
  for (int i = 0; i < spd->numOfAssignedCols; ++i) {
    // position of the current value inside the data block buffer
    char *   cell = rowStart + spd->elems[i].offset;
    int16_t  colIndex = spd->elems[i].colIndex;
    SSchema *pColSchema = schema + colIndex;
    rowSize += pColSchema->bytes;

    int32_t   consumed = 0;
    SSQLToken tok = tStrGetToken(*str, &consumed, true, 0, NULL);
    *str += consumed;

    // '?' placeholder: remember where the value has to be bound later
    if (tok.type == TK_QUESTION) {
      uint32_t offset = cell - pDataBlocks->pData;
      if (tscAddParamToDataBlock(pDataBlocks, pColSchema->type, (uint8_t)timePrec, pColSchema->bytes, offset) == NULL) {
        strcpy(error, "client out of memory");
        *code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return -1;
      }

      continue;
    }

    int16_t type = tok.type;
    bool    isValueToken = (type == TK_NOW || type == TK_INTEGER || type == TK_STRING || type == TK_FLOAT ||
                         type == TK_BOOL || type == TK_NULL || type == TK_HEX || type == TK_OCT || type == TK_BIN);
    if (!isValueToken || (tok.n == 0) || (type == TK_RP)) {
      tscInvalidSQLErrMsg(error, "invalid data or symbol", tok.z);
      *code = TSDB_CODE_TSC_INVALID_SQL;
      return -1;
    }

    // strip the surrounding quotes and collapse an escaped delimiter
    // (the delimiter doubled, or a backslash followed by the delimiter)
    if (TK_STRING == tok.type) {
      char    quote = tok.z[0];
      int32_t numOfEscapes = 0;
      int32_t out = 0;

      for (int32_t in = 1; in < tok.n - 1; ++in) {
        bool escaped = (tok.z[in] == quote || tok.z[in] == '\\') && (tok.z[in + 1] == quote);
        if (escaped) {
          numOfEscapes++;
          in++;  // skip the escape character, keep the delimiter behind it
        }

        tmpTokenBuf[out++] = tok.z[in];
      }

      tmpTokenBuf[out] = 0;
      tok.z = tmpTokenBuf;
      tok.n -= 2 + numOfEscapes;
    }

    bool    isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    int32_t ret = tsParseOneColumnData(pColSchema, &tok, cell, error, str, isPrimaryKey, timePrec);
    if (ret != TSDB_CODE_SUCCESS) {
      *code = TSDB_CODE_TSC_INVALID_SQL;
      return -1;  // the caller treats any result <= 0 as an error
    }

    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, cell) != TSDB_CODE_SUCCESS) {
      tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", tok.z);
      *code = TSDB_CODE_TSC_INVALID_TIME_STAMP;
      return -1;
    }
  }

  // 2. fill the columns that did not get a value with their null representation
  if (spd->numOfAssignedCols < spd->numOfCols) {
    char *ptr = rowStart;

    for (int32_t i = 0; i < spd->numOfCols; ++i) {
      if (!spd->hasVal[i]) {
        switch (schema[i].type) {
          case TSDB_DATA_TYPE_BINARY:
            varDataSetLen(ptr, sizeof(int8_t));
            *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
            break;
          case TSDB_DATA_TYPE_NCHAR:
            varDataSetLen(ptr, sizeof(int32_t));
            *(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL;
            break;
          default:
            setNull(ptr, schema[i].type, schema[i].bytes);
            break;
        }
      }

      ptr += schema[i].bytes;
    }

    // the row now spans all columns of the table
    rowSize = ptr - rowStart;
  }

  return rowSize;
}

S
slguan 已提交
514 515 516 517 518 519 520 521 522
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
523 524
}

H
hjxilinx 已提交
525
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows,
526
                  SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
S
slguan 已提交
527 528
  int32_t   index = 0;
  SSQLToken sToken;
H
hzcheng 已提交
529 530 531

  int16_t numOfRows = 0;

H
hjxilinx 已提交
532
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hjxilinx 已提交
533
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
hjxilinx 已提交
534 535
  
  int32_t  precision = tinfo.precision;
S
slguan 已提交
536 537

  if (spd->hasVal[0] == false) {
S
slguan 已提交
538
    strcpy(error, "primary timestamp column can not be null");
539
    *code = TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
540 541 542 543
    return -1;
  }

  while (1) {
S
slguan 已提交
544 545 546
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    if (sToken.n == 0 || sToken.type != TK_LP) break;
H
hzcheng 已提交
547

S
slguan 已提交
548
    *str += index;
H
hjxilinx 已提交
549
    if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
L
lihui 已提交
550
      int32_t tSize;
H
hjxilinx 已提交
551
      int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
L
lihui 已提交
552
      if (retcode != TSDB_CODE_SUCCESS) {  //TODO pass the correct error code to client
S
slguan 已提交
553
        strcpy(error, "client out of memory");
L
lihui 已提交
554
        *code = retcode;
S
slguan 已提交
555 556
        return -1;
      }
L
lihui 已提交
557 558
      ASSERT(tSize > maxRows);
      maxRows = tSize;
H
hzcheng 已提交
559 560
    }

L
[1292]  
lihui 已提交
561
    int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
562
    if (len <= 0) {  // error message has been set in tsParseOneRowData
H
hzcheng 已提交
563 564 565 566 567
      return -1;
    }

    pDataBlock->size += len;

S
slguan 已提交
568 569 570 571
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
H
hjxilinx 已提交
572
      tscInvalidSQLErrMsg(error, ") expected", *str);
573
      *code = TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
574 575 576 577 578 579 580 581
      return -1;
    }

    numOfRows++;
  }

  if (numOfRows <= 0) {
    strcpy(error, "no any data points");
582
    *code = TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
583 584 585
    return -1;
  } else {
    return numOfRows;
H
hzcheng 已提交
586 587 588
  }
}

S
slguan 已提交
589
/*
 * Mark every column of the table as assigned, in schema order.
 *
 * The offset of a column is the offset of its predecessor plus the predecessor's
 * byte width; the offset of column 0 is not written here and is used as found in spd.
 */
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) {
  spd->numOfAssignedCols = numOfCols;
  spd->numOfCols = numOfCols;

  for (int32_t col = 0; col < numOfCols; ++col) {
    spd->elems[col].colIndex = col;
    spd->hasVal[col] = true;
  }

  for (int32_t col = 1; col < numOfCols; ++col) {
    spd->elems[col].offset = spd->elems[col - 1].offset + pSchema[col - 1].bytes;
  }
}

L
lihui 已提交
603
/*
 * Make sure the data block has room for at least `factor` (5) more rows, growing
 * the buffer by 50% steps when necessary.
 *
 * pDataBlock  block whose pData / nAllocSize may be replaced; newly added space is zeroed
 * rowSize     size of one row in bytes
 * numOfRows   receives (nAllocSize - headerSize) / rowSize for the final allocation
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the buffer cannot
 * be grown; in that case the block keeps its old buffer and size.
 */
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows) {
  const int      factor = 5;
  const size_t   needed = (size_t)(rowSize * factor);
  const uint32_t nAllocSizeOld = pDataBlock->nAllocSize;

  size_t remain = pDataBlock->nAllocSize - pDataBlock->size;

  if (remain < needed) {
    bool failed = false;

    // expand the allocated size
    while (remain < needed) {
      // size + size/2 equals the truncated size * 1.5, computed without floating point
      uint64_t grown = (uint64_t)pDataBlock->nAllocSize + pDataBlock->nAllocSize / 2;
      if (grown == pDataBlock->nAllocSize) {
        grown += 1;  // sizes 0 and 1 would never grow and loop forever
      }

      if (grown > UINT32_MAX) {
        failed = true;  // the size field cannot represent a larger buffer
        break;
      }

      pDataBlock->nAllocSize = (uint32_t)grown;
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    }

    if (!failed) {
      char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
      if (tmp != NULL) {
        pDataBlock->pData = tmp;
        memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
      } else {
        failed = true;
      }
    }

    if (failed) {
      // keep the old buffer and report how many rows it can hold
      pDataBlock->nAllocSize = nAllocSizeOld;
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

631 632
/*
 * Copy the table identity (sid, uid, schema version) from the table meta into the
 * submit block header and add numOfRows to the block's row counter.
 */
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
  // the row counter accumulates across calls, the identity fields are overwritten
  pBlocks->numOfRows += numOfRows;

  pBlocks->sversion = pTableMeta->sversion;
  pBlocks->uid = pTableMeta->uid;
  pBlocks->tid = pTableMeta->sid;
}

S
slguan 已提交
638
// data block is disordered, sort it in ascending order
H
hjxilinx 已提交
639
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
640
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
S
slguan 已提交
641 642

  // size is less than the total size, since duplicated rows may be removed yet.
643
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
S
slguan 已提交
644

S
slguan 已提交
645 646 647 648 649
  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

S
slguan 已提交
650
  if (!dataBuf->ordered) {
651
    char *pBlockData = pBlocks->data;
S
slguan 已提交
652
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
H
hzcheng 已提交
653

S
slguan 已提交
654 655
    int32_t i = 0;
    int32_t j = 1;
H
hzcheng 已提交
656

S
slguan 已提交
657
    while (j < pBlocks->numOfRows) {
S
slguan 已提交
658 659
      TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
H
hzcheng 已提交
660

S
slguan 已提交
661 662 663 664
      if (ti == tj) {
        ++j;
        continue;
      }
H
hzcheng 已提交
665

S
slguan 已提交
666 667 668 669 670 671 672 673 674
      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;
H
hzcheng 已提交
675

S
slguan 已提交
676
    pBlocks->numOfRows = i + 1;
677
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
S
slguan 已提交
678
  }
S
slguan 已提交
679 680
}

681
/*
 * Parse the value list of one "INSERT INTO <table> VALUES (...) (...)" clause and
 * append the rows to the data block that belongs to the table.
 *
 * pSql        sql object; its command supplies the table meta and the error buffer
 * pTableList  lookup structure handed to tscGetDataBlockFromList()
 * str         in/out: current position in the SQL text
 * spd         columns that receive values
 * totalNum    incremented by the number of rows parsed here
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when the block cannot be obtained,
 * memory runs out, or the values cannot be parsed.
 */
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd,
                                      int32_t *totalNum) {
  SSqlCmd *       pCmd = &pSql->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  STableDataBlocks *dataBuf = NULL;
  int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                        sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
                                        pTableMeta, &dataBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  int32_t maxNumOfRows;
  ret = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows);
  if (TSDB_CODE_SUCCESS != ret) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_TSC_INVALID_SQL;

  // Scratch buffer used while removing escape characters from string values.
  // tsParseOneRowData() copies whole string tokens into it without a bound check,
  // so it must be able to hold the longest possible token. No token can be longer
  // than the remaining statement text, hence size the buffer from that (and never
  // below the former fixed size of 4096 bytes).
  size_t tokenBufSize = strlen(*str) + 1;
  if (tokenBufSize < 4096) {
    tokenBufSize = 4096;
  }

  char *tmpTokenBuf = calloc(1, tokenBufSize);
  if (NULL == tmpTokenBuf) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
  free(tmpTokenBuf);
  if (numOfRows <= 0) {
    return code;  // set by tsParseValues
  }

  // number the '?' parameters collected for this block and make their offsets
  // relative to the row data instead of the block header
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
    SParamInfo *param = dataBuf->params + i;
    if (param->idx == -1) {
      param->idx = pCmd->numOfParams++;
      param->offset -= sizeof(SSubmitBlk);
    }
  }

  SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
  tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);

  dataBuf->vgId = pTableMeta->vgroupInfo.vgId;
  dataBuf->numOfTables = 1;

  /*
   * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
   * which is actually returned from server.
   */
  *totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

736
/*
 * Parse the part of an INSERT clause that starts at the table name and decide whether the
 * table has to be created automatically from a super table ("... USING stable TAGS (...)").
 *
 * On entry *sqlstr points at the table name token. On return *sqlstr points at the position
 * where parsing of the column list / VALUES / FILE part must continue. When the statement has
 * both a column list and a USING clause, the column list text is moved (memmove) so that it sits
 * immediately before the continuation position.
 *
 * @param sqlstr  in/out cursor into the sql text
 * @param pSql    sql object being parsed; pSql->cmd.payload receives tag data and error messages
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_ACTION_IN_PROGRESS when the table meta is being
 *         retrieved, or an error code.
 */
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
  int32_t   index = 0;
  SSQLToken sToken = {0};
  SSQLToken tableToken = {0};
  int32_t   code = TSDB_CODE_SUCCESS;

  const int32_t TABLE_INDEX = 0;
  const int32_t STABLE_INDEX = 1;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  char *sql = *sqlstr;

  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  char *cstart = NULL;
  char *cend = NULL;

  // skip the column list if there is one
  index = 0;
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

  int32_t numOfColList = 0;
  bool    createTable = false;

  if (sToken.type == TK_LP) {
    cstart = &sToken.z[0];
    index = 0;
    while (1) {
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      if (sToken.type == TK_RP) {
        cend = &sToken.z[0];
        break;
      }

      // an empty token means the sql text ended before ')' was found; stop instead of spinning forever
      if (sToken.n == 0) {
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      ++numOfColList;
    }

    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
  }

  if (numOfColList == 0 && cstart != NULL) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);

  if (sToken.type == TK_USING) {  // create table if not exists according to the super table
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    // the payload is dereferenced right below, so the allocation must have succeeded
    if ((code = tscAllocPayload(pCmd, sizeof(STagData))) != TSDB_CODE_SUCCESS) {
      return code;
    }

    STagData *pTag = (STagData *)pCmd->payload;
    memset(pTag, 0, sizeof(STagData));

    // the source super table is moved to the secondary position of the pTableMetaInfo list
    if (pQueryInfo->numOfTables < 2) {
      tscAddEmptyMetaInfo(pQueryInfo);
    }

    STableMetaInfo *pSTableMeterMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
    if ((code = tscSetTableFullName(pSTableMeterMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
      return code;
    }

    tstrncpy(pTag->name, pSTableMeterMetaInfo->name, sizeof(pTag->name));
    code = tscGetTableMeta(pSql, pSTableMeterMetaInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMeterMetaInfo)) {
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
    }

    SSchema *     pTagSchema = tscGetTableTagSchema(pSTableMeterMetaInfo->pTableMeta);
    STableComInfo tinfo = tscGetTableInfo(pSTableMeterMetaInfo->pTableMeta);

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

    SParsedDataColInfo spd = {0};

    uint8_t numOfTags = tscGetNumOfTags(pSTableMeterMetaInfo->pTableMeta);
    spd.numOfCols = numOfTags;

    // if specify some tags column
    if (sToken.type != TK_LP) {
      tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
    } else {
      /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < numOfTags; ++t) {
        offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
      }

      while (1) {
        index = 0;
        sToken = tStrGetToken(sql, &index, false, 0, NULL);
        sql += index;

        if (TK_STRING == sToken.type) {
          strdequote(sToken.z);
          sToken.n = strtrim(sToken.z);
        }

        if (sToken.type == TK_RP) {
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < numOfTags; ++t) {
          if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) {
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
              return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z);
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

        if (!findColumnIndex) {
          return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken.z);
        }
      }

      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > numOfTags) {
        return tscInvalidSQLErrMsg(pCmd->payload, "tag name expected", sToken.z);
      }

      index = 0;
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      sql += index;
    }

    if (sToken.type != TK_TAGS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
    }

    // 1. parse the tag values; the opening '(' is skipped by the tokenizer
    uint32_t ignoreTokenTypes = TK_LP;
    uint32_t numOfIgnoreToken = 1;
    for (int i = 0; i < spd.numOfAssignedCols; ++i) {
      char *  tagVal = pTag->data + spd.elems[i].offset;
      int16_t colIndex = spd.elems[i].colIndex;

      index = 0;
      sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
      sql += index;

      if (sToken.n == 0 || sToken.type == TK_RP) {
        break;
      }

      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
      }

      code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z);
    }

    // 2. set the null value for the columns that do not assign values
    if (spd.numOfAssignedCols < spd.numOfCols) {
      char *ptr = pTag->data;

      for (int32_t i = 0; i < spd.numOfCols; ++i) {
        if (!spd.hasVal[i]) {  // current tag column do not have any value to insert, set it to null
          if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
            setVardataNull(ptr, pTagSchema[i].type);
          } else {
            setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
          }
        }

        ptr += pTagSchema[i].bytes;
      }
    }

    // 3. calculate the actual data size of STagData
    pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen);
    for (int32_t t = 0; t < numOfTags; ++t) {
      pTag->dataLen += pTagSchema[t].bytes;
      pCmd->payloadLen += pTagSchema[t].bytes;
    }
    pTag->dataLen = htonl(pTag->dataLen);

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
    }

    int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    createTable = true;
    code = tscGetMeterMetaEx(pSql, pTableMetaInfo, true);
    if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
      return code;
    }

  } else {
    // no USING clause: rewind to the column list (if any) or to the token just read
    if (cstart != NULL) {
      sql = cstart;
    } else {
      sql = sToken.z;
    }
    code = tscGetMeterMetaEx(pSql, pTableMetaInfo, false);

    if (pCmd->curSql == NULL) {
      assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
    }
  }

  if (cstart != NULL && createTable == true) {
    /* move the column list to start position of the next accessed points */
    int32_t len = (int32_t)(cend - cstart) + 1;  // only computed when both pointers are valid
    memmove(sql - len, cstart, len);
    *sqlstr = sql - len;
  } else {
    *sqlstr = sql;
  }

  if (*sqlstr == NULL) {
    code = TSDB_CODE_TSC_INVALID_SQL;
  }

  return code;
}

H
Hui Li 已提交
993
/*
 * Copy a table name token into the caller supplied token buffer and validate it.
 *
 * @param tblName     start of the table name inside the sql text (not necessarily NUL terminated at len)
 * @param len         length of the table name token in bytes
 * @param psTblToken  output token; psTblToken->z must point to a writable buffer of at least
 *                    TSDB_TABLE_ID_LEN bytes
 * @return the result of tscValidateName, or TSDB_CODE_TSC_INVALID_SQL when len cannot fit the buffer
 */
int validateTableName(char *tblName, int len, SSQLToken* psTblToken) {
  // the copy below is limited to TSDB_TABLE_ID_LEN bytes, so a longer (or negative) length would make
  // the token claim more bytes than the buffer actually holds
  if (len < 0 || len >= TSDB_TABLE_ID_LEN) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  tstrncpy(psTblToken->z, tblName, TSDB_TABLE_ID_LEN);

  psTblToken->n    = len;
  psTblToken->type = TK_ID;
  tSQLGetToken(psTblToken->z, &psTblToken->type);

  return tscValidateName(psTblToken);
}

1003 1004 1005 1006 1007 1008 1009 1010 1011
/*
 * Record where the data of the current insert statement comes from and reject statements
 * that switch between sources. A dataSourceType of 0 means no source has been recorded yet.
 *
 * @param pCmd  command object that holds the recorded data source type
 * @param type  data source requested by the clause being parsed
 * @param sql   position in the sql text used for the error message
 * @return TSDB_CODE_SUCCESS, or the code produced by tscInvalidSQLErrMsg on a mismatch
 */
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
  bool notYetSet = (pCmd->dataSourceType == 0);

  if (notYetSet || pCmd->dataSourceType == type) {
    pCmd->dataSourceType = type;
    return TSDB_CODE_SUCCESS;
  }

  return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
}

H
hzcheng 已提交
1012 1013 1014 1015 1016 1017 1018 1019 1020
/**
 * usage: insert into table1 values() () table2 values()()
 *
 * @param str
 * @param acct
 * @param db
 * @param pSql
 * @return
 */
H
Haojun Liao 已提交
1021
int tsParseInsertSql(SSqlObj *pSql) {
S
slguan 已提交
1022
  SSqlCmd *pCmd = &pSql->cmd;
H
Haojun Liao 已提交
1023
  char* str = pCmd->curSql;
1024

S
slguan 已提交
1025
  int32_t totalNum = 0;
1026 1027 1028 1029 1030
  int32_t code = TSDB_CODE_SUCCESS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo != NULL);

H
Haojun Liao 已提交
1031
  STableMetaInfo *pTableMetaInfo = NULL;
1032
  if (pQueryInfo->numOfTables == 0) {
H
hjxilinx 已提交
1033
    pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
1034
  } else {
H
hjxilinx 已提交
1035
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1036
  }
H
hzcheng 已提交
1037

H
Haojun Liao 已提交
1038 1039 1040
  if ((code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    return code;
  }
H
hzcheng 已提交
1041

H
Haojun Liao 已提交
1042
  if (NULL == pCmd->pTableList) {
1043
    pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
1044
    pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
1045
    if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
1046
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
Haojun Liao 已提交
1047
      goto _error;
L
lihui 已提交
1048 1049
    }
  } else {
1050
    str = pCmd->curSql;
L
lihui 已提交
1051 1052
  }
  
H
Haojun Liao 已提交
1053
  tscTrace("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList);
H
hzcheng 已提交
1054 1055

  while (1) {
1056
    int32_t   index = 0;
S
slguan 已提交
1057
    SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL);
1058 1059 1060 1061 1062 1063 1064 1065

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      /*
       * if the data is from the data file, no data has been generated yet. So, there no data to
       * merge or submit, save the file path and parse the file in other routines.
       */
      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
S
slguan 已提交
1066 1067 1068
        goto _clean;
      }

1069 1070 1071 1072 1073
      /*
       * if no data has been generated during parsing the sql string, error msg will return
       * Otherwise, create the first submit block and submit to virtual node.
       */
      if (totalNum == 0) {
1074
        code = TSDB_CODE_TSC_INVALID_SQL;
H
Haojun Liao 已提交
1075
        goto _error;
1076 1077
      } else {
        break;
H
hzcheng 已提交
1078 1079 1080
      }
    }

1081
    pCmd->curSql = sToken.z;
B
Bomin Zhang 已提交
1082
    char buf[TSDB_TABLE_ID_LEN];
H
Hui Li 已提交
1083
    SSQLToken sTblToken;
B
Bomin Zhang 已提交
1084
    sTblToken.z = buf;
S
slguan 已提交
1085
    // Check if the table name available or not
H
Hui Li 已提交
1086
    if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1087
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
H
Haojun Liao 已提交
1088
      goto _error;
H
huili 已提交
1089 1090
    }

H
Hui Li 已提交
1091
    if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1092
      goto _error;
H
hzcheng 已提交
1093 1094
    }

1095 1096
    if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
      /*
H
Haojun Liao 已提交
1097 1098
       * After retrieving the table meta from server, the sql string will be parsed from the paused position.
       * And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
1099
       */
1100
      if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
H
hjxilinx 已提交
1101
        return code;
H
hzcheng 已提交
1102
      }
H
hjxilinx 已提交
1103
      
H
Haojun Liao 已提交
1104
      tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
1105
      pCmd->curSql = NULL;
H
Haojun Liao 已提交
1106
      goto _error;
H
hzcheng 已提交
1107 1108
    }

weixin_48148422's avatar
weixin_48148422 已提交
1109
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
1110
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
H
Haojun Liao 已提交
1111
      goto _error;
H
hzcheng 已提交
1112 1113
    }

S
slguan 已提交
1114 1115 1116
    index = 0;
    sToken = tStrGetToken(str, &index, false, 0, NULL);
    str += index;
1117

S
slguan 已提交
1118
    if (sToken.n == 0) {
1119
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
H
Haojun Liao 已提交
1120
      goto _error;
H
hzcheng 已提交
1121
    }
H
hjxilinx 已提交
1122
    
H
hjxilinx 已提交
1123
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1124
    
S
slguan 已提交
1125
    if (sToken.type == TK_VALUES) {
H
hjxilinx 已提交
1126
      SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
H
hjxilinx 已提交
1127 1128
      
      SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1129
      tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
H
hzcheng 已提交
1130

1131
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1132
        goto _error;
H
hzcheng 已提交
1133 1134 1135 1136 1137 1138
      }

      /*
       * app here insert data in different vnodes, so we need to set the following
       * data in another submit procedure using async insert routines
       */
1139
      code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
H
hzcheng 已提交
1140
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1141
        goto _error;
H
hzcheng 已提交
1142
      }
S
slguan 已提交
1143
    } else if (sToken.type == TK_FILE) {
1144
      if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1145
        goto _error;
H
hzcheng 已提交
1146 1147
      }

S
slguan 已提交
1148 1149 1150 1151
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1152
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
H
Haojun Liao 已提交
1153
        goto _error;
H
hzcheng 已提交
1154 1155
      }

S
slguan 已提交
1156
      char fname[PATH_MAX] = {0};
S
slguan 已提交
1157
      strncpy(fname, sToken.z, sToken.n);
S
slguan 已提交
1158
      strdequote(fname);
1159

H
hzcheng 已提交
1160 1161
      wordexp_t full_path;
      if (wordexp(fname, &full_path, 0) != 0) {
H
hjxilinx 已提交
1162
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
H
Haojun Liao 已提交
1163
        goto _error;
H
hzcheng 已提交
1164 1165 1166 1167
      }
      strcpy(fname, full_path.we_wordv[0]);
      wordfree(&full_path);

H
hjxilinx 已提交
1168
      STableDataBlocks *pDataBlock = NULL;
H
hjxilinx 已提交
1169
      STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
1170
      
1171
      int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
H
hjxilinx 已提交
1172
                                       pTableMeta, &pDataBlock);
H
hjxilinx 已提交
1173
      if (ret != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1174
        goto _error;
H
hjxilinx 已提交
1175
      }
1176

1177
      taosArrayPush(pCmd->pDataBlocks, &pDataBlock);
S
slguan 已提交
1178
      strcpy(pDataBlock->filename, fname);
S
slguan 已提交
1179
    } else if (sToken.type == TK_LP) {
H
hzcheng 已提交
1180
      /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
1181
      STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
H
hjxilinx 已提交
1182
      SSchema *   pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
1183

1184
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1185
        goto _error;
H
hzcheng 已提交
1186 1187
      }

1188
      SParsedDataColInfo spd = {0};
H
hjxilinx 已提交
1189
      spd.numOfCols = tinfo.numOfColumns;
H
hzcheng 已提交
1190 1191

      int16_t offset[TSDB_MAX_COLUMNS] = {0};
H
hjxilinx 已提交
1192
      for (int32_t t = 1; t < tinfo.numOfColumns; ++t) {
H
hzcheng 已提交
1193 1194 1195 1196
        offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
      }

      while (1) {
S
slguan 已提交
1197 1198 1199 1200 1201
        index = 0;
        sToken = tStrGetToken(str, &index, false, 0, NULL);
        str += index;

        if (TK_STRING == sToken.type) {
H
Haojun Liao 已提交
1202 1203
          strdequote(sToken.z);
          sToken.n = strtrim(sToken.z);
S
slguan 已提交
1204 1205 1206
        }

        if (sToken.type == TK_RP) {
H
hzcheng 已提交
1207 1208 1209 1210 1211 1212
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
H
hjxilinx 已提交
1213
        for (int32_t t = 0; t < tinfo.numOfColumns; ++t) {
S
slguan 已提交
1214
          if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
S
slguan 已提交
1215
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
H
hzcheng 已提交
1216 1217 1218 1219
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
H
hjxilinx 已提交
1220
              code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
H
Haojun Liao 已提交
1221
              goto _error;
H
hzcheng 已提交
1222 1223 1224 1225 1226 1227 1228 1229
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

S
slguan 已提交
1230
        if (!findColumnIndex) {
H
hjxilinx 已提交
1231
          code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
H
Haojun Liao 已提交
1232
          goto _error;
H
hzcheng 已提交
1233 1234 1235
        }
      }

H
hjxilinx 已提交
1236
      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
H
hjxilinx 已提交
1237
        code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
H
Haojun Liao 已提交
1238
        goto _error;
H
hzcheng 已提交
1239 1240
      }

S
slguan 已提交
1241 1242 1243 1244 1245
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;

      if (sToken.type != TK_VALUES) {
H
hjxilinx 已提交
1246
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
H
Haojun Liao 已提交
1247
        goto _error;
H
hzcheng 已提交
1248 1249
      }

1250
      code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
H
hzcheng 已提交
1251
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1252
        goto _error;
H
hzcheng 已提交
1253 1254
      }
    } else {
H
hjxilinx 已提交
1255
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
Haojun Liao 已提交
1256
      goto _error;
H
hzcheng 已提交
1257 1258 1259
    }
  }

S
slguan 已提交
1260 1261 1262 1263
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }
1264

1265
  if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId
S
slguan 已提交
1266
    if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1267
      goto _error;
S
slguan 已提交
1268
    }
H
hzcheng 已提交
1269
  } else {
S
slguan 已提交
1270
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1271 1272 1273 1274 1275
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

H
Haojun Liao 已提交
1276
_error:
S
slguan 已提交
1277
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1278 1279

_clean:
1280 1281
  taosHashCleanup(pCmd->pTableList);
  pCmd->pTableList = NULL;
H
hjxilinx 已提交
1282
  
1283 1284
  pCmd->curSql    = NULL;
  pCmd->parseFinished  = 1;
H
hjxilinx 已提交
1285
  
H
hzcheng 已提交
1286 1287 1288
  return code;
}

H
Haojun Liao 已提交
1289
/*
 * Perform the checks that are needed once before an insert/import statement is parsed:
 * verify the write permission, reset the command state, mark the query as an insert and
 * make sure that the statement continues with the keyword INTO.
 *
 * On success pSql->cmd.curSql points right behind the INTO keyword.
 *
 * @param pSql  sql object holding the complete statement in pSql->sqlstr
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_NO_WRITE_AUTH, or an error code
 */
int tsInsertInitialCheck(SSqlObj *pSql) {
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_TSC_NO_WRITE_AUTH;
  }

  int32_t  index = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SSQLToken sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
  assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);

  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;
  pSql->res.numOfRows = 0;

  SQueryInfo *pQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
  if (pQueryInfo == NULL) {
    // NOTE(review): presumably the query info could not be allocated - confirm the error code
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);

  sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
  if (sToken.type != TK_INTO) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
  }

  pCmd->curSql = sToken.z + sToken.n;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1318
/*
 * Entry point of the client side sql parser.
 *
 * Insert statements are handled by the dedicated insert parser, everything else goes
 * through qSQLParse/tscToSQLCmd.
 *
 * @param pSql     sql object holding the statement in pSql->sqlstr
 * @param initial  true for the first parsing attempt, false when a paused parse is resumed
 * @return the parse result code
 *
 * NOTE: the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
 * so do NOT use pRes->code to determine if the getTableMeta function invokes new threads to get data
 * from mgmt node or simply retrieves data from cache, and do NOT assign the return code to pRes->code
 * for the same reason since it may be released by another thread already.
 */
int tsParseSql(SSqlObj *pSql, bool initial) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (!initial && !pCmd->parseFinished) {
    tscTrace("%p resume to parse sql: %s", pSql, pCmd->curSql);
  }

  if (!tscIsInsertData(pSql->sqlstr)) {
    int32_t code = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
    code = tscToSQLCmd(pSql, &SQLInfo);
    SQLInfoDestroy(&SQLInfo);
    return code;
  }

  if (initial) {
    /*
     * Set the fp before parse the sql string, in case of getTableMeta failed, in which
     * the error handle callback function can rightfully restore the user-defined callback function (fp).
     */
    if (pSql->cmd.insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
      pSql->fetchFp = pSql->fp;
      pSql->fp = (void(*)())tscHandleMultivnodeInsert;
    }

    int32_t code = tsInsertInitialCheck(pSql);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return tsParseInsertSql(pSql);
}

S
slguan 已提交
1361 1362 1363
/*
 * Finish the submit block that holds the parsed rows, merge the data blocks of the command,
 * copy the first merged block into the payload and send it.
 *
 * @param pSql              sql object used for the submit
 * @param numOfRows         number of rows stored in pTableDataBlocks
 * @param pTableDataBlocks  data block whose pData starts with the SSubmitBlk header
 * @return the first failing step's code, otherwise the result of tscProcessSql
 */
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  SSqlCmd *pCmd = &pSql->cmd;
  assert(pCmd->numOfClause == 1);

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tsSetBlockInfo((SSubmitBlk *)(pTableDataBlocks->pData), pMetaInfo->pTableMeta, numOfRows);

  int32_t code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  STableDataBlocks *pMerged = taosArrayGetP(pCmd->pDataBlocks, 0);
  code = tscCopyDataBlockToPayload(pSql, pMerged);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tscProcessSql(pSql);
}

/* Context passed as the callback parameter while the rows of a data file are imported. */
typedef struct SImportFileSupport {
  SSqlObj *pSql;  // parent sql object; receives the accumulated row count and the final result code
  FILE    *fp;    // data file that is read line by line
} SImportFileSupport;

static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
  assert(param != NULL && tres != NULL);

  SSqlObj *pSql = tres;
  SSqlCmd *pCmd = &pSql->cmd;

  SImportFileSupport *pSupporter = (SImportFileSupport *) param;

  SSqlObj *pParentSql = pSupporter->pSql;
  FILE    *fp = pSupporter->fp;

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {  // handle error
    assert(taos_errno(pSql) == code);

    taos_free_result(pSql);
    tfree(pSupporter);
    fclose(fp);

    pParentSql->res.code = code;
    return;
S
slguan 已提交
1408 1409
  }

H
Haojun Liao 已提交
1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477
  // accumulate the total submit records
  pParentSql->res.numOfRows += pSql->res.numOfRows;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  SSchema *       pSchema = tscGetTableSchema(pTableMeta);
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
  tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);

  size_t  n = 0;
  ssize_t readLen = 0;
  char *  line = NULL;
  int32_t count = 0;
  int32_t maxRows = 0;

  STableDataBlocks *pTableDataBlock = taosArrayGetP(pSql->cmd.pDataBlocks, 0);
  pTableDataBlock->size = pTableDataBlock->headerSize;

  tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
  char *tokenBuf = calloc(1, 4096);

  while ((readLen = getline(&line, &n, fp)) != -1) {
    if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
      line[--readLen] = 0;
    }

    if (readLen == 0) {
      continue;
    }

    char *lineptr = line;
    strtolower(line, line);

    int32_t len =
        tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf);
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      break;
    }

    pTableDataBlock->size += len;

    if (++count >= maxRows) {
      break;
    }
  }

  tfree(tokenBuf);
  free(line);

  if (count > 0) {
    if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
      pParentSql->res.code = code;
    }

  } else {
    taos_free_result(pSql);
    tfree(pSupporter);
    fclose(fp);

    pParentSql->fp = pParentSql->fetchFp;

    // all data has been sent to vnode, call user function
    int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : pParentSql->res.numOfRows;
    (*pParentSql->fp)(pParentSql->param, pParentSql, v);
  }
S
slguan 已提交
1478 1479
}

H
Haojun Liao 已提交
1480
/*
 * Read rows from a text file and submit them to the server block by block.
 *
 * Each non-empty line of fp is converted to lower case and parsed as one row
 * by tsParseOneRowData() into a single data block created for the table of
 * the current clause. Whenever the number of buffered rows reaches the
 * capacity reported by tscAllocateMemIfNeed(), the block is sent with
 * doPackSendDataBlock() and reset; any remaining rows are sent after the last
 * line has been read.
 *
 * @param pSql         SQL object whose command describes the target table
 * @param fp           opened input file; it is not closed by this function
 * @param tmpTokenBuf  scratch buffer handed through to tsParseOneRowData()
 *
 * @return the accumulated pSql->res.numOfRows of all submitted blocks on
 *         success; otherwise the code returned by tscCreateDataBlock(), -1 if
 *         tscAllocateMemIfNeed() fails, or the code reported by the row
 *         parser / doPackSendDataBlock().
 */
static UNUSED_FUNC int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
  ssize_t  readLen = 0;  // getline() returns ssize_t, -1 on EOF or error
  size_t   n = 0;        // capacity of `line`, maintained by getline()
  char *   line = NULL;  // allocated by getline(), released at _cleanup
  int32_t  maxRows = 0;
  SSqlCmd *pCmd = &pSql->cmd;
  int      numOfRows = 0;
  int      count = 0;
  int      result = 0;
  int32_t  code = 0;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  assert(pCmd->numOfClause == 1);

  int32_t rowSize = tinfo.rowSize;

  pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
  STableDataBlocks *pTableDataBlock = NULL;

  int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta,
                                   &pTableDataBlock);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock);

  code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows);
  if (TSDB_CODE_SUCCESS != code) {
    return -1;
  }

  SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
  SSchema *          pSchema = tscGetTableSchema(pTableMeta);
  tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);

  while ((readLen = getline(&line, &n, fp)) != -1) {
    // drop one trailing line terminator character, if present
    if (readLen > 0 && (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1]))) {
      line[--readLen] = 0;
    }

    if (readLen == 0) {
      continue;  // skip empty lines
    }

    char *lineptr = line;
    strtolower(line, line);

    int32_t len =
        tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tmpTokenBuf);
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      result = code;
      goto _cleanup;  // do not leak the getline() buffer
    }

    pTableDataBlock->size += len;

    count++;
    if (count >= maxRows) {
      // the block is full: submit it and start over with an empty block
      if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
        result = code;
        goto _cleanup;
      }

      pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
      pTableDataBlock->size = sizeof(SSubmitBlk);
      pTableDataBlock->rowSize = tinfo.rowSize;

      numOfRows += pSql->res.numOfRows;
      pSql->res.numOfRows = 0;
      count = 0;
    }
  }

  // submit the rows left over after the last full block
  if (count > 0) {
    if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
      result = code;
      goto _cleanup;
    }

    numOfRows += pSql->res.numOfRows;
    pSql->res.numOfRows = 0;
  }

  result = numOfRows;

_cleanup:
  tfree(line);
  return result;
}

1562
/*
 * Start importing rows from the data file attached to an insert command.
 *
 * Nothing is done unless pCmd->command is TSDB_SQL_INSERT. Otherwise a
 * sub-query object is created with parseFileSendDataBlock() as its callback,
 * an empty data block for the target table is attached to it, the file named
 * by the first data block of the command is opened, and
 * parseFileSendDataBlock() is invoked once to start reading and sending rows.
 *
 * If any preparation step fails (memory allocation, data block creation,
 * opening the file), everything created so far is released and the request
 * is completed with an error code through pSql->fp, using the same
 * completion sequence parseFileSendDataBlock() uses when it has finished.
 *
 * @param pSql  SQL object holding the insert-from-file command
 */
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableComInfo   tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;

  assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE /* && pCmd->pDataBlocks != NULL*/);
  SArray *          pDataBlockList = pCmd->pDataBlocks;
  STableDataBlocks *pDataBlock = taosArrayGetP(pDataBlockList, 0);

  char              path[PATH_MAX] = {0};
  int32_t           code = TSDB_CODE_SUCCESS;
  SSqlObj *         pNew = NULL;
  STableDataBlocks *pTableDataBlock = NULL;
  FILE *            fp = NULL;

  SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
  if (pSupporter == NULL) {
    tscError("%p failed to malloc when insert file", pSql);
    // NOTE(review): -1 is used as a generic failure code, as tscInsertDataFromFile does;
    // replace it with a dedicated error code if the project defines one.
    code = -1;
    goto _error;
  }

  pSupporter->pSql = pSql;

  pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
  if (pNew == NULL) {
    tscError("%p failed to create sub-query object when insert file", pSql);
    code = -1;
    goto _error;
  }

  pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES);
  if (pNew->cmd.pDataBlocks == NULL) {
    tscError("%p failed to malloc when insert file", pSql);
    code = -1;
    goto _error;
  }

  code = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta,
                            &pTableDataBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;  // never push a NULL block into the list
  }

  taosArrayPush(pNew->cmd.pDataBlocks, &pTableDataBlock);

  code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("%p failed to malloc when insert file", pSql);
    goto _error;
  }

  pCmd->count = 1;

  tstrncpy(path, pDataBlock->filename, sizeof(path));

  fp = fopen(path, "r");
  if (fp == NULL) {
    tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno));
    code = -1;
    goto _error;  // parseFileSendDataBlock() reads from fp, so it must not receive NULL
  }

  pSupporter->fp = fp;

  parseFileSendDataBlock(pSupporter, pNew, 0);
  return;

_error:
  if (pNew != NULL) {
    taos_free_result(pNew);
  }

  tfree(pSupporter);

  // complete the request so that the caller is not left waiting for a callback
  pSql->res.code = code;
  pSql->fp = pSql->fetchFp;
  (*pSql->fp)(pSql->param, pSql, code);
}