tscParseInsert.c 47.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

21
#include "os.h"
22

23
#include "ttype.h"
24
#include "hash.h"
H
hzcheng 已提交
25 26 27
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
28
#include "ttokendef.h"
H
hzcheng 已提交
29
#include "taosdef.h"
H
hzcheng 已提交
30

S
slguan 已提交
31
#include "tscLog.h"
H
hjxilinx 已提交
32
#include "tscSubquery.h"
H
hzcheng 已提交
33 34
#include "tstoken.h"

S
slguan 已提交
35
#include "tdataformat.h"
36

S
slguan 已提交
37
/*
 * Origin of the primary-key timestamps of a data block, kept in STableDataBlocks::tsSource.
 * tsSource == -1 means the origin has not been decided yet (see tsCheckTimestamp).
 */
enum {
  TSDB_USE_SERVER_TS = 0,  // timestamp is left for the server to assign
  TSDB_USE_CLI_TS    = 1,  // timestamp is supplied by the client in the SQL text
};

L
lihui 已提交
42
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
H
Haojun Liao 已提交
43
static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SSchema* pSchema, char* str, char** end);
H
hzcheng 已提交
44

H
Haojun Liao 已提交
45
/*
 * Convert a numeric token to a double.
 *
 * errno is cleared before the conversion so that callers can test it (ERANGE) afterwards.
 * The conversion must consume exactly the token's n characters; otherwise TK_ILLEGAL is
 * returned. On success the token's own type is returned.
 *
 * strtod() is used instead of strtold(): converting straight to double avoids a second
 * rounding step (long double -> double) and an out-of-range narrowing conversion, and lets
 * strtod() itself flag overflow with HUGE_VAL/ERANGE.
 */
static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
  errno = 0;
  *value = strtod(pToken->z, endPtr);

  // the number must span the whole token, otherwise it is not a valid floating point literal
  if ((*endPtr - pToken->z) != pToken->n) {
    return TK_ILLEGAL;
  }

  return pToken->type;
}
H
hzcheng 已提交
56

H
Haojun Liao 已提交
57
/*
 * Parse a timestamp value, optionally followed by "+<duration>" or "-<duration>".
 *
 * pToken is the timestamp token that has already been consumed; *next points just past it.
 *   - NOW, the literal 0 and integer tokens may be followed by an offset, e.g. now-5h.
 *   - Any other token is handed to taosParseTime() as a date/time string; no offset
 *     arithmetic is applied to it.
 * When an offset is consumed, *next is advanced past it; otherwise *next is unchanged.
 *
 * On failure an error message is written into `error` and an error code is returned.
 */
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  int64_t useconds = 0;
  char   *pTokenEnd = *next;

  if (pToken->type == TK_NOW) {
    useconds = taosGetTimestamp(timePrec);
  } else if (pToken->n == 1 && strncmp(pToken->z, "0", 1) == 0) {
    // literal 0: useconds stays 0
  } else if (pToken->type == TK_INTEGER) {
    useconds = tsosStr2int64(pToken->z);
  } else {
    // a date/time string such as "2001-11-12 18:31:01"
    if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
    }

    return TSDB_CODE_SUCCESS;
  }

  // fast path: a ',' as the next non-blank character means there is no time expression
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == ',') {
      *time = useconds;
      return TSDB_CODE_SUCCESS;
    }

    break;
  }

  /*
   * time expression:
   * e.g., now+12a, now-5h
   */
  int32_t   index = 0;
  SStrToken sToken = tStrGetToken(pTokenEnd, &index, false);
  pTokenEnd += index;

  if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
    index = 0;
    SStrToken valueToken = tStrGetToken(pTokenEnd, &index, false);
    pTokenEnd += index;

    if (valueToken.n < 2) {
      return tscInvalidSQLErrMsg(error, "value expected in timestamp", sToken.z);
    }

    int64_t interval = 0;
    if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
      // report the problem like every other failure path does; the return code is unchanged
      tscInvalidSQLErrMsg(error, "invalid duration in timestamp expression", valueToken.z);
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    // the duration appears to be in microseconds: it is scaled down for millisecond precision
    if (timePrec == TSDB_TIME_PRECISION_MILLI) {
      interval /= 1000;
    }

    if (sToken.type == TK_PLUS) {
      useconds += interval;
    } else {
      useconds -= interval;
    }

    *next = pTokenEnd;
  }

  // a token other than '+'/'-' is not consumed: *next is left where the caller put it
  *time = useconds;
  return TSDB_CODE_SUCCESS;
}

131 132 133 134
/*
 * Return true when the token denotes a NULL value: the NULL keyword, or a string token
 * equal (case-insensitively) to TSDB_DATA_NULL_STR_L.
 *
 * The explicit length check matters: strncasecmp() compares only pToken->n bytes, so
 * without it every prefix of the null string (e.g. 'n' or 'nu') would be taken as NULL.
 */
static bool isNullStr(SStrToken* pToken) {
  if (pToken->type == TK_NULL) {
    return true;
  }

  return (pToken->type == TK_STRING) && (pToken->n == strlen(TSDB_DATA_NULL_STR_L)) &&
         (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0);
}
H
Haojun Liao 已提交
135 136
int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
                         int16_t timePrec) {
S
slguan 已提交
137
  int64_t iv;
138
  int32_t ret;
139
  char   *endptr = NULL;
140

141 142 143 144
  if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
    return tscInvalidSQLErrMsg(msg, "invalid numeric data", pToken->z);
  }

H
hzcheng 已提交
145 146
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
      if (isNullStr(pToken)) {
        *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL;
      } else {
        if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
          if (strncmp(pToken->z, "true", pToken->n) == 0) {
            *(uint8_t *)payload = TSDB_TRUE;
          } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
            *(uint8_t *)payload = TSDB_FALSE;
          } else {
            return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
          }
        } else if (pToken->type == TK_INTEGER) {
          iv = strtoll(pToken->z, NULL, 10);
          *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
        } else if (pToken->type == TK_FLOAT) {
          double dv = strtod(pToken->z, NULL);
          *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
H
hzcheng 已提交
164
        } else {
165
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
166 167 168 169
        }
      }
      break;
    }
170

H
hzcheng 已提交
171
    case TSDB_DATA_TYPE_TINYINT:
172
      if (isNullStr(pToken)) {
173
        *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL;
H
hzcheng 已提交
174
      } else {
175 176
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
177
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
178
        } else if (!IS_VALID_TINYINT(iv)) {
179
          return tscInvalidSQLErrMsg(msg, "data overflow", pToken->z);
H
hzcheng 已提交
180 181
        }

182 183 184 185 186 187
        *((uint8_t *)payload) = (uint8_t)iv;
      }

      break;

    case TSDB_DATA_TYPE_UTINYINT:
188
      if (isNullStr(pToken)) {
189 190
        *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
      } else {
191 192
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
193
          return tscInvalidSQLErrMsg(msg, "invalid unsigned tinyint data", pToken->z);
194
        } else if (!IS_VALID_UTINYINT(iv)) {
195 196 197 198
          return tscInvalidSQLErrMsg(msg, "unsigned tinyint data overflow", pToken->z);
        }

        *((uint8_t *)payload) = (uint8_t)iv;
H
hzcheng 已提交
199 200 201 202 203
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
204
      if (isNullStr(pToken)) {
S
slguan 已提交
205
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
H
hzcheng 已提交
206
      } else {
207 208
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
209
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
210
        } else if (!IS_VALID_SMALLINT(iv)) {
H
hjxilinx 已提交
211
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
H
hzcheng 已提交
212 213
        }

S
slguan 已提交
214
        *((int16_t *)payload) = (int16_t)iv;
H
hzcheng 已提交
215
      }
216 217 218 219

      break;

    case TSDB_DATA_TYPE_USMALLINT:
220
      if (isNullStr(pToken)) {
221 222
        *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
      } else {
223 224
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
225
          return tscInvalidSQLErrMsg(msg, "invalid unsigned smallint data", pToken->z);
226
        } else if (!IS_VALID_USMALLINT(iv)) {
227 228 229 230 231 232
          return tscInvalidSQLErrMsg(msg, "unsigned smallint data overflow", pToken->z);
        }

        *((uint16_t *)payload) = (uint16_t)iv;
      }

H
hzcheng 已提交
233 234 235
      break;

    case TSDB_DATA_TYPE_INT:
236
      if (isNullStr(pToken)) {
S
slguan 已提交
237
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
H
hzcheng 已提交
238
      } else {
239 240
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
241
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
242
        } else if (!IS_VALID_INT(iv)) {
H
hjxilinx 已提交
243
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
H
hzcheng 已提交
244 245
        }

S
slguan 已提交
246
        *((int32_t *)payload) = (int32_t)iv;
H
hzcheng 已提交
247 248 249 250
      }

      break;

251
    case TSDB_DATA_TYPE_UINT:
252
      if (isNullStr(pToken)) {
253 254
        *((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
      } else {
255 256
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
257
          return tscInvalidSQLErrMsg(msg, "invalid unsigned int data", pToken->z);
258
        } else if (!IS_VALID_UINT(iv)) {
259 260 261 262 263 264 265 266
          return tscInvalidSQLErrMsg(msg, "unsigned int data overflow", pToken->z);
        }

        *((uint32_t *)payload) = (uint32_t)iv;
      }

      break;

H
hzcheng 已提交
267
    case TSDB_DATA_TYPE_BIGINT:
268
      if (isNullStr(pToken)) {
S
slguan 已提交
269
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
270
      } else {
271 272
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
        if (ret != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
273
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
274
        } else if (!IS_VALID_BIGINT(iv)) {
H
hjxilinx 已提交
275
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
H
hzcheng 已提交
276
        }
S
slguan 已提交
277 278

        *((int64_t *)payload) = iv;
H
hzcheng 已提交
279 280 281
      }
      break;

282
    case TSDB_DATA_TYPE_UBIGINT:
283
      if (isNullStr(pToken)) {
284 285
        *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
      } else {
286 287
        ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
        if (ret != TSDB_CODE_SUCCESS) {
288
          return tscInvalidSQLErrMsg(msg, "invalid unsigned bigint data", pToken->z);
289
        } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
290 291 292 293 294 295 296
          return tscInvalidSQLErrMsg(msg, "unsigned bigint data overflow", pToken->z);
        }

        *((uint64_t *)payload) = iv;
      }
      break;

H
hzcheng 已提交
297
    case TSDB_DATA_TYPE_FLOAT:
298
      if (isNullStr(pToken)) {
S
slguan 已提交
299
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
300
      } else {
S
slguan 已提交
301 302
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
303
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
S
slguan 已提交
304 305
        }

306
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
H
hjxilinx 已提交
307
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
H
hzcheng 已提交
308 309
        }

310 311
//        *((float *)payload) = (float)dv;
        SET_FLOAT_VAL(payload, dv);
H
hzcheng 已提交
312 313 314 315
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
316
      if (isNullStr(pToken)) {
S
slguan 已提交
317
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
H
hzcheng 已提交
318
      } else {
S
slguan 已提交
319 320
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
321
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
H
hzcheng 已提交
322 323
        }

324
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
H
hjxilinx 已提交
325
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
S
slguan 已提交
326 327
        }

328
        *((double *)payload) = dv;
H
hzcheng 已提交
329 330 331 332
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
S
slguan 已提交
333 334
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
335
        setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
H
hjxilinx 已提交
336
      } else { // too long values will return invalid sql, not be truncated automatically
H
hjxilinx 已提交
337
        if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor
H
hjxilinx 已提交
338
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
S
slguan 已提交
339
        }
H
hjxilinx 已提交
340
        
341
        STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
H
hzcheng 已提交
342 343 344 345 346
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
S
slguan 已提交
347
      if (pToken->type == TK_NULL) {
348
        setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
349
      } else {
H
hjxilinx 已提交
350
        // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
351
        int32_t output = 0;
352
        if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
H
hjxilinx 已提交
353 354
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), "%s", strerror(errno));
H
hjxilinx 已提交
355
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
H
hzcheng 已提交
356
        }
357
        
358
        varDataSetLen(payload, output);
H
hzcheng 已提交
359 360 361 362
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
S
slguan 已提交
363
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
364
        if (primaryKey) {
S
slguan 已提交
365
          *((int64_t *)payload) = 0;
H
hzcheng 已提交
366
        } else {
S
slguan 已提交
367
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
368 369
        }
      } else {
S
slguan 已提交
370 371
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
372
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
H
hzcheng 已提交
373
        }
H
hjxilinx 已提交
374
        
S
slguan 已提交
375
        *((int64_t *)payload) = temp;
H
hzcheng 已提交
376 377 378 379 380 381
      }

      break;
    }
  }

H
hjxilinx 已提交
382
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
383 384
}

S
slguan 已提交
385 386 387 388 389 390 391 392 393 394 395 396
/*
 * Validate the primary-key timestamp of the row just written at `start` and track ordering.
 *
 * A key equal to INT64_MIN counts as "server time", any other key as "client time". The
 * first row decides pDataBlocks->tsSource; a later row of the other kind makes this
 * function return -1, because the two may not be mixed in one SQL statement. Whenever a
 * client timestamp is not strictly greater than the previous one, the block is flagged
 * as unordered so that it is sorted before submission. Once a block is unordered, no
 * further checks are performed.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  const TSKEY key = *(const TSKEY *)start;
  const bool  isServerTs = (key == INT64_MIN);
  const int   ownSource = isServerTs ? TSDB_USE_SERVER_TS : TSDB_USE_CLI_TS;
  const int   otherSource = isServerTs ? TSDB_USE_CLI_TS : TSDB_USE_SERVER_TS;

  if (pDataBlocks->tsSource == otherSource) {
    return -1;  // client time/server time can not be mixed
  }

  if (pDataBlocks->tsSource == -1) {
    pDataBlocks->tsSource = ownSource;
  }

  if (pDataBlocks->tsSource == TSDB_USE_CLI_TS && key <= pDataBlocks->prevTS) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = key;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
420 421 422 423 424 425 426 427
/*
 * Copy the body of a quoted string token (without the surrounding quotes) to dst, turning
 * the pairs <q><q> and \<q> into a single <q>, where <q> is the token's opening quote.
 * All other characters, including other backslash sequences, are copied verbatim, and dst
 * is NUL-terminated.
 *
 * A pair is only collapsed when its second character is still inside the string body, so a
 * trailing backslash (as in 'a\\') can never swallow the closing quote.
 *
 * dst must provide at least len - 1 bytes. Returns the number of bytes written, excluding
 * the terminator.
 */
static uint32_t tsUnescapeStrToken(const char *src, uint32_t len, char *dst) {
  if (len < 2) {  // not a complete quoted string: nothing between the quotes
    dst[0] = 0;
    return 0;
  }

  const char     delim = src[0];
  const uint32_t closing = len - 1;  // index of the closing quote
  uint32_t       j = 0;

  for (uint32_t k = 1; k < closing; ++k) {
    bool isEscape = (src[k] == delim || src[k] == '\\');
    if (isEscape && (k + 1 < closing) && src[k + 1] == delim) {
      dst[j++] = delim;
      ++k;  // the escaped quote has been consumed as well
      continue;
    }

    dst[j++] = src[k];
  }

  dst[j] = 0;
  return j;
}

/*
 * Parse the values of one row for the bound columns of pDataBlocks and write them into the row
 * starting at pDataBlocks->pData + pDataBlocks->size; columns without a value get NULL.
 * *str is advanced past the consumed values.
 *
 * On success *len receives the number of bytes the row occupies. On failure an error message
 * is written into pCmd->payload and an error code is returned.
 *
 * tmpTokenBuf is scratch space for un-escaped string literals. It is assumed to hold at least
 * TSDB_MAX_BYTES_PER_ROW bytes (doParseInsertStatement passes a 16KB buffer) — TODO confirm
 * for any other caller.
 */
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, SSqlCmd *pCmd, int16_t timePrec, int32_t *len,
                  char *tmpTokenBuf) {
  int32_t    index = 0;
  SStrToken  sToken = {0};
  char      *payload = pDataBlocks->pData + pDataBlocks->size;

  SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
  SSchema            *schema = tscGetTableSchema(pDataBlocks->pTableMeta);

  // 1. set the parsed value from sql string
  int32_t rowSize = 0;
  for (int i = 0; i < spd->numOfBound; ++i) {
    // the start position in data block buffer of current value in sql
    int32_t colIndex = spd->boundedColumns[i];

    char    *start = payload + spd->cols[colIndex].offset;
    SSchema *pSchema = &schema[colIndex];
    rowSize += pSchema->bytes;

    index = 0;
    sToken = tStrGetToken(*str, &index, true);
    *str += index;

    if (sToken.type == TK_QUESTION) {
      if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
        return tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
      }

      uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
      if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
        continue;
      }

      strcpy(pCmd->payload, "client out of memory");
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // only literal tokens are accepted as values (a ')' here is rejected as well)
    int16_t type = sToken.type;
    if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
         type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0)) {
      return tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z);
    }

    // Remove quotation marks and escape sequences
    if (TK_STRING == sToken.type) {
      // bound the copy into tmpTokenBuf instead of overrunning the caller's stack buffer
      if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) {
        return tscSQLSyntaxErrMsg(pCmd->payload, "too long string", sToken.z);
      }

      sToken.n = tsUnescapeStrToken(sToken.z, sToken.n, tmpTokenBuf);
      sToken.z = tmpTokenBuf;
    }

    bool    isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    int32_t ret = tsParseOneColumn(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
      tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z);
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  }

  // 2. set the null value for the columns that do not assign values
  if (spd->numOfBound < spd->numOfCols) {
    char *ptr = payload;

    for (int32_t i = 0; i < spd->numOfCols; ++i) {
      if (!spd->cols[i].hasVal) {  // current column do not have any value to insert, set it to null
        if (schema[i].type == TSDB_DATA_TYPE_BINARY) {
          varDataSetLen(ptr, sizeof(int8_t));
          *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
        } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) {
          varDataSetLen(ptr, sizeof(int32_t));
          *(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL;
        } else {
          setNull(ptr, schema[i].type, schema[i].bytes);
        }
      }

      ptr += schema[i].bytes;
    }

    // with unbound columns the row spans the whole schema
    rowSize = (int32_t)(ptr - payload);
  }

  *len = rowSize;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
527 528 529 530 531 532 533 534 535
/* qsort() comparator: orders rows by the TSKEY stored at the very beginning of each row. */
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  const TSKEY a = *(const TSKEY *)lhs;
  const TSKEY b = *(const TSKEY *)rhs;

  if (a < b) {
    return -1;
  }

  return (a > b) ? 1 : 0;
}

H
Haojun Liao 已提交
538 539 540
/*
 * Parse consecutive "( v1, v2, ... )" row tuples from *str into pDataBlock.
 *
 * Parsing continues for as long as the next token is '('. The buffer is grown through
 * tscAllocateMemIfNeed whenever maxRows is reached or the next row might not fit.
 * *numOfRows receives the number of rows parsed and *str is advanced past them.
 *
 * Returns TSDB_CODE_SUCCESS; TSDB_CODE_TSC_OUT_OF_MEMORY; or TSDB_CODE_TSC_SQL_SYNTAX_ERROR for
 * a malformed row, a missing ')' or an empty value list. An error message is left in
 * pCmd->payload in every failure case.
 */
int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SSqlCmd* pCmd, int32_t* numOfRows, char *tmpTokenBuf) {
  int32_t       code = TSDB_CODE_SUCCESS;
  STableComInfo tinfo = tscGetTableInfo(pDataBlock->pTableMeta);
  int32_t       precision = tinfo.precision;

  (*numOfRows) = 0;

  for (;;) {
    int32_t   index = 0;
    SStrToken sToken = tStrGetToken(*str, &index, false);
    if (sToken.n == 0 || sToken.type != TK_LP) {
      break;  // no further '(' : end of the value list
    }

    *str += index;

    // make room for another row
    if ((*numOfRows) >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
      int32_t tSize;
      code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
      if (code != TSDB_CODE_SUCCESS) {  //TODO pass the correct error code to client
        strcpy(pCmd->payload, "client out of memory");
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

      ASSERT(tSize > maxRows);
      maxRows = tSize;
    }

    int32_t len = 0;
    code = tsParseOneRow(str, pDataBlock, pCmd, precision, &len, tmpTokenBuf);
    if (code != TSDB_CODE_SUCCESS) {  // error message has been set in tsParseOneRow, return directly
      return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
    }

    pDataBlock->size += len;

    index = 0;
    sToken = tStrGetToken(*str, &index, false);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      // previously returned -1, which is not a TSDB error code
      tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
      return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
    }

    (*numOfRows)++;
  }

  if ((*numOfRows) <= 0) {
    strcpy(pCmd->payload, "no any data points");
    return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
597 598 599
/*
 * Mark all numOfCols schema columns as bound, in schema order. Each column's offset within
 * a row is the previous column's offset plus the previous column's byte width. The first
 * column's offset stays 0 from calloc().
 *
 * NOTE(review): the calloc() results are not checked, so an allocation failure ends in a
 * NULL dereference in the loop. The void return type gives no way to report the failure.
 */
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
  pColInfo->numOfCols = numOfCols;
  pColInfo->numOfBound = numOfCols;

  pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
  pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));

  for (int32_t col = 0; col < pColInfo->numOfCols; ++col) {
    if (col != 0) {
      const int32_t prev = col - 1;
      pColInfo->cols[col].offset = pSchema[prev].bytes + pColInfo->cols[prev].offset;
    }

    pColInfo->boundedColumns[col] = col;
    pColInfo->cols[col].hasVal = true;
  }
}

L
lihui 已提交
614
/*
 * Ensure pDataBlock has room for at least `factor` (5) more rows of rowSize bytes after
 * pDataBlock->size. If it does not, the allocation is grown by repeated factors of 1.5,
 * and the bytes from pDataBlock->size to the new end are zero-filled.
 *
 * *numOfRows receives how many rows of rowSize bytes fit into the whole allocation after
 * the block header. On failure that is the capacity of the unchanged, old allocation.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when realloc() fails.
 */
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
  const int    factor = 5;
  const size_t required = (size_t)rowSize * factor;
  size_t       remain = pDataBlock->nAllocSize - pDataBlock->size;

  // expand the allocated size
  if (remain < required) {
    uint32_t newSize = pDataBlock->nAllocSize;
    while (remain < required) {
      uint32_t grown = (uint32_t)(newSize * 1.5);
      // a 0- or 1-byte buffer does not grow by 1.5x; force progress instead of looping forever
      newSize = (grown > newSize) ? grown : newSize + (uint32_t)required;
      remain = newSize - pDataBlock->size;
    }

    char *tmp = realloc(pDataBlock->pData, (size_t)newSize);
    if (tmp == NULL) {
      // keep the old buffer and size; report what the current allocation can hold
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pDataBlock->pData = tmp;
    pDataBlock->nAllocSize = newSize;
    memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
  }

  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
  return TSDB_CODE_SUCCESS;
}

642
/*
 * Copy the table identity (tid, uid) and schema version into the submit-block header, then
 * add numOfRows to its row count. If the accumulated count would reach INT16_MAX, the count
 * is left unchanged and TSDB_CODE_TSC_INVALID_SQL is returned.
 */
static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
  pBlocks->tid      = pTableMeta->id.tid;
  pBlocks->uid      = pTableMeta->id.uid;
  pBlocks->sversion = pTableMeta->sversion;

  if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  pBlocks->numOfRows += numOfRows;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
655
// data block is disordered, sort it in ascending order
H
hjxilinx 已提交
656
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
657
  SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
S
slguan 已提交
658 659

  // size is less than the total size, since duplicated rows may be removed yet.
660
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
S
slguan 已提交
661

S
slguan 已提交
662 663 664 665 666
  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

S
slguan 已提交
667
  if (!dataBuf->ordered) {
668
    char *pBlockData = pBlocks->data;
S
slguan 已提交
669
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
H
hzcheng 已提交
670

S
slguan 已提交
671 672
    int32_t i = 0;
    int32_t j = 1;
H
hzcheng 已提交
673

S
slguan 已提交
674
    while (j < pBlocks->numOfRows) {
S
slguan 已提交
675 676
      TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
H
hzcheng 已提交
677

S
slguan 已提交
678 679 680 681
      if (ti == tj) {
        ++j;
        continue;
      }
H
hzcheng 已提交
682

S
slguan 已提交
683 684 685 686 687 688 689 690 691
      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;
H
hzcheng 已提交
692

S
slguan 已提交
693
    pBlocks->numOfRows = i + 1;
694
    dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
S
slguan 已提交
695
  }
S
slguan 已提交
696 697
}

H
Haojun Liao 已提交
698 699
/*
 * Parse the value list of an INSERT for one table into dataBuf and fill in its submit-block
 * header.
 *
 * "?" placeholders that were not yet indexed (idx == -1) get consecutive indexes from
 * pCmd->numOfParams, and their offsets are made relative to the data after the SSubmitBlk
 * header. On success *totalNum is increased by the number of parsed rows.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On parse errors, the message is in pCmd->payload.
 */
static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
  STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta);

  int32_t maxNumOfRows;
  int32_t code = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  char tmpTokenBuf[16*1024] = {0};  // used for deleting Escape character: \\, \', \"

  int32_t numOfRows = 0;
  code = tsParseValues(str, dataBuf, maxNumOfRows, pCmd, &numOfRows, tmpTokenBuf);
  if (code != TSDB_CODE_SUCCESS) {
    // previously ignored (overwritten below), so a malformed statement was inserted partially
    return code;
  }

  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
    SParamInfo *param = dataBuf->params + i;
    if (param->idx == -1) {
      param->idx = pCmd->numOfParams++;
      param->offset -= sizeof(SSubmitBlk);
    }
  }

  SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
  code = tsSetBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", *str);
    return code;
  }

  dataBuf->numOfTables = 1;
  *totalNum += numOfRows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
733
/**
 * Parse the optional parts that follow a table name in an insert clause:
 *   tbname [(col, ...)] [USING stbname [(tag, ...)] TAGS (tagVal, ...) [(col, ...)]] ...
 *
 * For the USING form, the super table meta is placed at table index 1 of the query info and the tag
 * values are encoded into pCmd->tagData. The target table meta is then requested with the create
 * flag of tscGetTableMetaEx set.
 *
 * @param sqlstr       in: points at the table name; out (when no early error): points at the token
 *                     following the parsed parts (expected by the caller to be VALUES or FILE)
 * @param pSql         the SQL object being parsed
 * @param boundColumn  in: NULL (as passed by tsParseInsertSql); out: start of the bound column list,
 *                     if one was given
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_ACTION_IN_PROGRESS while table meta is retrieved
 *         asynchronously, or an error code
 */
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundColumn) {
  int32_t   index = 0;
  SStrToken sToken = {0};
  SStrToken tableToken = {0};
  int32_t   code = TSDB_CODE_SUCCESS;

  const int32_t TABLE_INDEX = 0;
  const int32_t STABLE_INDEX = 1;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  char *sql = *sqlstr;

  pSql->cmd.autoCreated = false;

  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false);
  sql += index;

  // skip possibly exists column list
  index = 0;
  sToken = tStrGetToken(sql, &index, false);
  sql += index;

  int32_t numOfColList = 0;

  // Bind table columns list in string, skip it and continue
  if (sToken.type == TK_LP) {
    *boundColumn = &sToken.z[0];

    while (1) {
      index = 0;
      sToken = tStrGetToken(sql, &index, false);

      if (sToken.type == TK_RP) {
        break;
      }

      // end of input without ')': index stays 0 here, so without this check the loop never ends
      if (sToken.n == 0) {
        return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", NULL);
      }

      sql += index;
      ++numOfColList;
    }

    // index still counts from sql up to just past ')', so this reads the token after ')'
    sToken = tStrGetToken(sql, &index, false);
    sql += index;
  }

  if (numOfColList == 0 && (*boundColumn) != NULL) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);

  if (sToken.type == TK_USING) {  // create table if not exists according to the super table
    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    sql += index;

    // the source super table is moved to the secondary position of the pTableMetaInfo list
    if (pQueryInfo->numOfTables < 2) {
      tscAddEmptyMetaInfo(pQueryInfo);
    }

    STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
    code = tscSetTableFullName(pSTableMetaInfo, &sToken, pSql);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    tNameExtractFullName(&pSTableMetaInfo->name, pCmd->tagData.name);
    pCmd->tagData.dataLen = 0;

    code = tscGetTableMeta(pSql, pSTableMetaInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMetaInfo)) {
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
    }

    SSchema *pTagSchema = tscGetTableTagSchema(pSTableMetaInfo->pTableMeta);
    STableComInfo tinfo = tscGetTableInfo(pSTableMetaInfo->pTableMeta);

    // spd must be released with tscDestroyBoundColumnInfo on every path below
    SParsedDataColInfo spd = {0};
    tscSetBoundColumnInfo(&spd, pTagSchema, tscGetNumOfTags(pSTableMetaInfo->pTableMeta));

    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    if (sToken.type != TK_TAGS && sToken.type != TK_LP) {
      tscDestroyBoundColumnInfo(&spd);
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
    }

    // parse the bound tags column
    if (sToken.type == TK_LP) {
      /*
       * insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn);
       */
      char* end = NULL;
      code = parseBoundColumns(pCmd, &spd, pTagSchema, sql, &end);
      if (code != TSDB_CODE_SUCCESS) {
        tscDestroyBoundColumnInfo(&spd);
        return code;
      }

      sql = end;

      index = 0;  // keywords of "TAGS"
      sToken = tStrGetToken(sql, &index, false);
      sql += index;
    } else {
      sql += index;
    }

    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    sql += index;

    if (sToken.type != TK_LP) {
      tscDestroyBoundColumnInfo(&spd);
      return tscInvalidSQLErrMsg(pCmd->payload, "( is expected", sToken.z);
    }

    SKVRowBuilder kvRowBuilder = {0};
    if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
      tscDestroyBoundColumnInfo(&spd);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    for (int i = 0; i < spd.numOfBound; ++i) {
      SSchema* pSchema = &pTagSchema[spd.boundedColumns[i]];

      index = 0;
      sToken = tStrGetToken(sql, &index, true);
      sql += index;

      if (TK_ILLEGAL == sToken.type) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        tscDestroyBoundColumnInfo(&spd);
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      if (sToken.n == 0 || sToken.type == TK_RP) {
        break;
      }

      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
      }

      char tagVal[TSDB_MAX_TAGS_LEN];
      code = tsParseOneColumn(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
      if (code != TSDB_CODE_SUCCESS) {
        tdDestroyKVRowBuilder(&kvRowBuilder);
        tscDestroyBoundColumnInfo(&spd);
        return code;
      }

      tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
    }

    tscDestroyBoundColumnInfo(&spd);

    SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
    tdDestroyKVRowBuilder(&kvRowBuilder);
    if (row == NULL) {
      return tscInvalidSQLErrMsg(pCmd->payload, "tag value expected", NULL);
    }
    tdSortKVRowByColIdx(row);

    pCmd->tagData.dataLen = kvRowLen(row);
    if (pCmd->tagData.dataLen <= 0){
      free(row);
      return tscInvalidSQLErrMsg(pCmd->payload, "tag value expected", NULL);
    }

    char* pTag = realloc(pCmd->tagData.data, pCmd->tagData.dataLen);
    if (pTag == NULL) {
      free(row);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    kvRowCpy(pTag, row);
    free(row);
    pCmd->tagData.data = pTag;

    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
    }

    /* parse columns after super table tags values.
     * insert into table_name using super_table(tag_name1, tag_name2) tags(tag_val1, tag_val2)
     * (normal_col1, normal_col2) values(normal_col1_val, normal_col2_val);
     * */
    index = 0;
    sToken = tStrGetToken(sql, &index, false);
    sql += index;
    int numOfColsAfterTags = 0;
    if (sToken.type == TK_LP) {
      if (*boundColumn != NULL) {
        return tscSQLSyntaxErrMsg(pCmd->payload, "bind columns again", sToken.z);
      } else {
        *boundColumn = &sToken.z[0];
      }

      while (1) {
        index = 0;
        sToken = tStrGetToken(sql, &index, false);

        if (sToken.type == TK_RP) {
          break;
        }

        // end of input without ')': stop instead of spinning with index == 0
        if (sToken.n == 0) {
          return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", NULL);
        }

        sql += index;
        ++numOfColsAfterTags;
      }

      if (numOfColsAfterTags == 0 && (*boundColumn) != NULL) {
        return TSDB_CODE_TSC_INVALID_SQL;
      }

      // as above: continue from just past ')' to fetch the following token
      sToken = tStrGetToken(sql, &index, false);
    }

    sql = sToken.z;

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
    }

    int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (sql == NULL) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
    if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
      return code;
    }

  } else {
    sql = sToken.z;

    if (sql == NULL) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
    if (pCmd->curSql == NULL) {
      assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
    }
  }

  *sqlstr = sql;

  return code;
}

H
Haojun Liao 已提交
995
/**
 * Copy a table name token into psTblToken->z and validate it.
 *
 * @param tblName     start of the table name in the SQL text
 * @param len         length of the table name token
 * @param psTblToken  token whose z points at a caller buffer; the tstrncpy bound below assumes it
 *                    holds TSDB_TABLE_FNAME_LEN bytes
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_SQL / the tscValidateName error code
 */
int validateTableName(char *tblName, int len, SStrToken* psTblToken) {
  // a name that does not fit would be truncated by tstrncpy while n still claimed the full length,
  // letting tscValidateName scan past the end of the caller's buffer
  if (len >= TSDB_TABLE_FNAME_LEN) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  tstrncpy(psTblToken->z, tblName, TSDB_TABLE_FNAME_LEN);

  psTblToken->n    = len;
  psTblToken->type = TK_ID;
  tSQLGetToken(psTblToken->z, &psTblToken->type);

  return tscValidateName(psTblToken);
}

1005 1006 1007 1008 1009 1010 1011 1012 1013
/**
 * Record where the rows of this statement come from (VALUES clauses or a FILE).
 * The first clause fixes the source; a later clause naming the other source is rejected.
 *
 * @param type  DATA_FROM_SQL_STRING or DATA_FROM_DATA_FILE
 * @param sql   position used in the error message
 * @return TSDB_CODE_SUCCESS, or an error code with the message in pCmd->payload
 */
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
  bool sourceAlreadyFixed = (pCmd->dataSourceType != 0);
  bool sameSource = (pCmd->dataSourceType == type);

  if (!sourceAlreadyFixed || sameSource) {
    pCmd->dataSourceType = type;
    return TSDB_CODE_SUCCESS;
  }

  return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
}

H
Haojun Liao 已提交
1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083
/**
 * Parse a parenthesised name list "(name1, name2, ...)" against pSchema and record, in listing
 * order, which schema positions are bound.
 *
 * @param pColInfo  bound-column info for pColInfo->numOfCols entries; reset, then filled
 * @param pSchema   pColInfo->numOfCols schema entries whose names are matched
 * @param str       SQL text whose first token must be "("
 * @param end       if not NULL, receives the position just past the closing ")"
 * @return TSDB_CODE_SUCCESS, or an error code with the message in pCmd->payload. On error
 *         pCmd->curSql is cleared and pCmd->parseFinished is set.
 */
static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SSchema* pSchema,
    char* str, char **end) {
  pColInfo->numOfBound = 0;

  memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * pColInfo->numOfCols);
  for(int32_t i = 0; i < pColInfo->numOfCols; ++i) {
    pColInfo->cols[i].hasVal = false;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  int32_t index = 0;
  SStrToken sToken = tStrGetToken(str, &index, false);
  str += index;

  if (sToken.type != TK_LP) {
    code = tscInvalidSQLErrMsg(pCmd->payload, "( is expected", sToken.z);
    goto _clean;
  }

  while (1) {
    index = 0;
    sToken = tStrGetToken(str, &index, false);
    str += index;

    // end of input before ')': the empty token has z == NULL and must not reach strncmp below
    if (sToken.n == 0) {
      code = tscInvalidSQLErrMsg(pCmd->payload, ") expected", NULL);
      goto _clean;
    }

    if (TK_STRING == sToken.type) {
      tscDequoteAndTrimToken(&sToken);
    }

    if (sToken.type == TK_RP) {
      if (end != NULL) {  // set the end position
        *end = str;
      }

      break;
    }

    bool findColumnIndex = false;

    // todo speedup by using hash list
    for (int32_t t = 0; t < pColInfo->numOfCols; ++t) {
      if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
        if (pColInfo->cols[t].hasVal == true) {
          code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
          goto _clean;
        }

        pColInfo->cols[t].hasVal = true;
        pColInfo->boundedColumns[pColInfo->numOfBound] = t;
        pColInfo->numOfBound += 1;
        findColumnIndex = true;
        break;
      }
    }

    if (!findColumnIndex) {
      code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column/tag name", sToken.z);
      goto _clean;
    }
  }

  memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0 , sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound));
  return TSDB_CODE_SUCCESS;

_clean:
  pCmd->curSql        = NULL;
  pCmd->parseFinished = 1;
  return code;
}

H
hzcheng 已提交
1084
/**
1085
 * parse insert sql
H
hzcheng 已提交
1086 1087 1088
 * @param pSql
 * @return
 */
H
Haojun Liao 已提交
1089
int tsParseInsertSql(SSqlObj *pSql) {
S
slguan 已提交
1090
  SSqlCmd *pCmd = &pSql->cmd;
H
Haojun Liao 已提交
1091
  char* str = pCmd->curSql;
1092

S
slguan 已提交
1093
  int32_t totalNum = 0;
1094 1095 1096 1097 1098
  int32_t code = TSDB_CODE_SUCCESS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo != NULL);

1099 1100 1101 1102 1103
  STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    code = terrno;
    return code;
1104
  }
H
hzcheng 已提交
1105

H
Haojun Liao 已提交
1106 1107 1108
  if ((code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    return code;
  }
H
hzcheng 已提交
1109

H
Haojun Liao 已提交
1110 1111 1112
  if (NULL == pCmd->pTableBlockHashList) {
    pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
    if (NULL == pCmd->pTableBlockHashList) {
1113
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
1114
      goto _clean;
L
lihui 已提交
1115 1116
    }
  } else {
1117
    str = pCmd->curSql;
L
lihui 已提交
1118 1119
  }
  
H
Haojun Liao 已提交
1120
  tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->pTableBlockHashList);
H
hzcheng 已提交
1121 1122

  while (1) {
1123
    int32_t   index = 0;
H
Haojun Liao 已提交
1124
    SStrToken sToken = tStrGetToken(str, &index, false);
1125 1126 1127 1128 1129 1130 1131 1132

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      /*
       * if the data is from the data file, no data has been generated yet. So, there no data to
       * merge or submit, save the file path and parse the file in other routines.
       */
      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
S
slguan 已提交
1133 1134 1135
        goto _clean;
      }

1136 1137 1138 1139 1140
      /*
       * if no data has been generated during parsing the sql string, error msg will return
       * Otherwise, create the first submit block and submit to virtual node.
       */
      if (totalNum == 0) {
1141
        code = TSDB_CODE_TSC_INVALID_SQL;
1142
        goto _clean;
1143 1144
      } else {
        break;
H
hzcheng 已提交
1145 1146 1147
      }
    }

1148
    pCmd->curSql = sToken.z;
H
Haojun Liao 已提交
1149
    char      buf[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
1150
    SStrToken sTblToken;
B
Bomin Zhang 已提交
1151
    sTblToken.z = buf;
S
slguan 已提交
1152
    // Check if the table name available or not
H
Hui Li 已提交
1153
    if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1154
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
1155
      goto _clean;
H
huili 已提交
1156 1157
    }

H
Hui Li 已提交
1158
    if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
1159
      goto _clean;
H
hzcheng 已提交
1160 1161
    }

H
Haojun Liao 已提交
1162
    char *bindedColumns = NULL;
H
Haojun Liao 已提交
1163
    if ((code = tscCheckIfCreateTable(&str, pSql, &bindedColumns)) != TSDB_CODE_SUCCESS) {
1164
      /*
H
Haojun Liao 已提交
1165 1166
       * After retrieving the table meta from server, the sql string will be parsed from the paused position.
       * And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
1167
       */
1168
      if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
H
hjxilinx 已提交
1169
        return code;
H
hzcheng 已提交
1170
      }
H
Haojun Liao 已提交
1171

D
dapan1121 已提交
1172
      tscError("0x%"PRIx64" async insert parse error, code:%s", pSql->self, tstrerror(code));
1173
      pCmd->curSql = NULL;
1174
      goto _clean;
H
hzcheng 已提交
1175 1176
    }

weixin_48148422's avatar
weixin_48148422 已提交
1177
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
1178
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
1179
      goto _clean;
H
hzcheng 已提交
1180 1181
    }

S
slguan 已提交
1182
    index = 0;
H
Haojun Liao 已提交
1183
    sToken = tStrGetToken(str, &index, false);
S
slguan 已提交
1184
    str += index;
1185

H
Haojun Liao 已提交
1186
    if (sToken.n == 0 || (sToken.type != TK_FILE && sToken.type != TK_VALUES)) {
1187
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
1188
      goto _clean;
H
hzcheng 已提交
1189 1190
    }

H
Haojun Liao 已提交
1191 1192
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
    if (sToken.type == TK_FILE) {
1193
      if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
1194
        goto _clean;
H
hzcheng 已提交
1195 1196
      }

S
slguan 已提交
1197
      index = 0;
H
Haojun Liao 已提交
1198
      sToken = tStrGetToken(str, &index, false);
1199 1200
      if (sToken.type != TK_STRING && sToken.type != TK_ID) {
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
1201
        goto _clean;
1202
      }
S
slguan 已提交
1203 1204
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1205
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
1206
        goto _clean;
H
hzcheng 已提交
1207 1208
      }

H
Haojun Liao 已提交
1209 1210
      strncpy(pCmd->payload, sToken.z, sToken.n);
      strdequote(pCmd->payload);
1211

H
Haojun Liao 已提交
1212
      // todo refactor extract method
H
hzcheng 已提交
1213
      wordexp_t full_path;
H
Haojun Liao 已提交
1214
      if (wordexp(pCmd->payload, &full_path, 0) != 0) {
H
hjxilinx 已提交
1215
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
1216
        goto _clean;
H
hzcheng 已提交
1217 1218
      }

H
Haojun Liao 已提交
1219 1220
      tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
      wordfree(&full_path);
1221

H
Haojun Liao 已提交
1222 1223 1224
    } else {
      if (bindedColumns == NULL) {
        STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
S
slguan 已提交
1225

H
Haojun Liao 已提交
1226 1227
        if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
          goto _clean;
S
slguan 已提交
1228 1229
        }

H
Haojun Liao 已提交
1230 1231 1232 1233 1234 1235
        STableDataBlocks *dataBuf = NULL;
        int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                              sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
                                              &dataBuf, NULL);
        if (ret != TSDB_CODE_SUCCESS) {
          goto _clean;
H
hzcheng 已提交
1236 1237
        }

H
Haojun Liao 已提交
1238 1239 1240 1241 1242 1243 1244
        code = doParseInsertStatement(pCmd, &str, dataBuf, &totalNum);
        if (code != TSDB_CODE_SUCCESS) {
          goto _clean;
        }
      } else {  // bindedColumns != NULL
        // insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
        STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
H
hzcheng 已提交
1245

H
Haojun Liao 已提交
1246 1247
        if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
          goto _clean;
H
hzcheng 已提交
1248 1249
        }

H
Haojun Liao 已提交
1250 1251 1252 1253 1254
        STableDataBlocks *dataBuf = NULL;
        int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                              sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
                                              &dataBuf, NULL);
        if (ret != TSDB_CODE_SUCCESS) {
1255
          goto _clean;
H
hzcheng 已提交
1256 1257
        }

H
Haojun Liao 已提交
1258 1259 1260 1261 1262
        SSchema *pSchema = tscGetTableSchema(pTableMeta);
        code = parseBoundColumns(pCmd, &dataBuf->boundColumnInfo, pSchema, bindedColumns, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          goto _clean;
        }
H
hzcheng 已提交
1263

H
Haojun Liao 已提交
1264 1265 1266 1267
        if (dataBuf->boundColumnInfo.cols[0].hasVal == false) {
          code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", NULL);
          goto _clean;
        }
S
slguan 已提交
1268

H
Haojun Liao 已提交
1269 1270 1271 1272
        if (sToken.type != TK_VALUES) {
          code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
          goto _clean;
        }
H
hzcheng 已提交
1273

H
Haojun Liao 已提交
1274 1275 1276 1277
        code = doParseInsertStatement(pCmd, &str, dataBuf, &totalNum);
        if (code != TSDB_CODE_SUCCESS) {
          goto _clean;
        }
H
hzcheng 已提交
1278 1279 1280 1281
      }
    }
  }

S
slguan 已提交
1282 1283 1284 1285
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }
1286

H
Haojun Liao 已提交
1287
  if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
H
Haojun Liao 已提交
1288
    if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
1289
      goto _clean;
S
slguan 已提交
1290
    }
H
hzcheng 已提交
1291 1292 1293 1294 1295 1296
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

_clean:
H
Haojun Liao 已提交
1297
  pCmd->curSql = NULL;
1298
  pCmd->parseFinished  = 1;
H
hzcheng 已提交
1299 1300 1301
  return code;
}

H
Haojun Liao 已提交
1302
/**
 * Check the leading "INSERT|IMPORT INTO" of pSql->sqlstr and prepare pSql->cmd for an insert.
 *
 * @return TSDB_CODE_SUCCESS with pCmd->curSql pointing just past INTO;
 *         TSDB_CODE_TSC_NO_WRITE_AUTH if the connection lacks write permission;
 *         TSDB_CODE_TSC_OUT_OF_MEMORY if the query info could not be created;
 *         otherwise an error code with the message in pCmd->payload.
 */
int tsInsertInitialCheck(SSqlObj *pSql) {
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_TSC_NO_WRITE_AUTH;
  }

  int32_t  index = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SStrToken sToken = tStrGetToken(pSql->sqlstr, &index, false);
  assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);

  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;

  // NULL here means the sub-clause info could not be allocated
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex);
  if (pQueryInfo == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);

  // index was advanced past INSERT/IMPORT by the call above, so this reads the following token
  sToken = tStrGetToken(pSql->sqlstr, &index, false);
  if (sToken.type != TK_INTO) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
  }

  pCmd->curSql = sToken.z + sToken.n;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1329
/**
 * Entry point for turning pSql->sqlstr into the command in pSql->cmd.
 *
 * - Insert/import statements are parsed by tsParseInsertSql, preceded by tsInsertInitialCheck on
 *   the first (initial) call. A syntax/invalid-SQL failure is retried once from a reset command.
 * - All other statements go through qSqlParse + tscToSQLCmd. They are retried once from a reset
 *   command when translation reports invalid SQL and the parsed type is TSDB_SQL_NULL.
 *
 * @param initial  true on the first call for this statement, false when resuming a suspended parse
 * @return TSDB_CODE_SUCCESS or an error code
 */
int tsParseSql(SSqlObj *pSql, bool initial) {
  SSqlCmd* pCmd = &pSql->cmd;

  if (!initial && !pCmd->parseFinished) {
    tscDebug("0x%"PRIx64" resume to parse sql: %s", pSql->self, pCmd->curSql);
  }

  int32_t ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  if (tscIsInsertData(pSql->sqlstr)) {
    if (initial) {
      ret = tsInsertInitialCheck(pSql);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

    ret = tsParseInsertSql(pSql);

    bool isParseFailure = (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR) || (ret == TSDB_CODE_TSC_INVALID_SQL);
    if (pSql->parseRetry < 1 && isParseFailure) {
      // start over once from a clean command
      tscResetSqlCmd(pCmd, true);
      pSql->parseRetry++;

      ret = tsInsertInitialCheck(pSql);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = tsParseInsertSql(pSql);
      }
    }
  } else {
    SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr);

    ret = tscToSQLCmd(pSql, &SQLInfo);

    bool shouldRetry = (ret == TSDB_CODE_TSC_INVALID_SQL) && (pSql->parseRetry == 0) && (SQLInfo.type == TSDB_SQL_NULL);
    if (shouldRetry) {
      tscResetSqlCmd(pCmd, true);
      pSql->parseRetry++;
      ret = tscToSQLCmd(pSql, &SQLInfo);
    }

    SqlInfoDestroy(&SQLInfo);
  }

  /*
   * pRes->code may be modified or released by another thread in tscTableMetaCallBack, so it is not
   * used to tell whether getTableMeta went to the mgmt node or was served from cache, and the
   * return code is not stored into pRes->code here for the same reason.
   */
  return ret;
}

S
slguan 已提交
1380 1381 1382
/**
 * Finalize one batch of rows read from an import file and send it.
 *
 * Fills in the submit block header for numOfRows rows, merges the table blocks, copies the first
 * merged block into the payload and hands the request to tscProcessSql.
 *
 * @return the result of tscProcessSql, or the error code of the first failing step
 */
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  SSqlCmd *pCmd = &pSql->cmd;
  pSql->res.numOfRows = 0;

  assert(pCmd->numOfClause == 1);

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SSubmitBlk     *pSubmit = (SSubmitBlk *)(pTableDataBlocks->pData);

  if (tsSetBlockInfo(pSubmit, pMetaInfo->pTableMeta, numOfRows) != TSDB_CODE_SUCCESS) {
    return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
  }

  int32_t rc = tscMergeTableDataBlocks(pSql, true);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  STableDataBlocks *pMerged = taosArrayGetP(pCmd->pDataBlocks, 0);
  rc = tscCopyDataBlockToPayload(pSql, pMerged);

  return (rc == TSDB_CODE_SUCCESS) ? tscProcessSql(pSql) : rc;
}

/*
 * State handed to parseFileSendDataBlock through the async callback param while a FILE import
 * is streamed in batches. Allocated by tscImportDataFromFile and freed by parseFileSendDataBlock
 * when the import finishes or fails.
 */
typedef struct SImportFileSupport {
  SSqlObj *pSql;  // the user's insert statement; receives the final row count or the error
  FILE    *fp;    // data file opened by tscImportDataFromFile, read line by line
} SImportFileSupport;

H
Haojun Liao 已提交
1411
/**
 * Async callback that drives a FILE import one batch at a time.
 *
 * Each invocation:
 * - checks the outcome of the previous batch (pSql->res);
 * - reads up to one block's worth of rows from the file;
 * - packs and sends them via doPackSendDataBlock. This callback is presumably re-invoked on the same
 *   sub-query when that submit completes.
 *
 * When the file yields no more rows, it releases the sub-query, supporter and file, then calls the
 * parent's fetchFp with the accumulated row count. On any error it releases the same resources,
 * stores the error in the parent's res.code and reports it via tscAsyncResultOnError.
 *
 * @param param      SImportFileSupport created by tscImportDataFromFile
 * @param tres       the sub-query SSqlObj whose callback is this function
 * @param numOfRows  unused; the previous batch's outcome is read from pSql->res
 */
static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRows) {
  assert(param != NULL && tres != NULL);

  char *  tokenBuf = NULL;
  size_t  n = 0;
  ssize_t readLen = 0;
  char *  line = NULL;
  int32_t count = 0;
  int32_t maxRows = 0;
  FILE *  fp   = NULL;

  SSqlObj *pSql = tres;
  SSqlCmd *pCmd = &pSql->cmd;

  SImportFileSupport *pSupporter = (SImportFileSupport *)param;

  SSqlObj *pParentSql = pSupporter->pSql;
  fp = pSupporter->fp;

  int32_t code = pSql->res.code;

  // retry parse data from file and import data from the begining again
  if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
    assert(pSql->res.numOfRows == 0);
    int32_t ret = fseek(fp, 0, SEEK_SET);
    if (ret < 0) {
      // capture errno before logging can overwrite it
      code = TAOS_SYSTEM_ERROR(errno);
      tscError("0x%"PRIx64" failed to seek SEEK_SET since:%s", pSql->self, tstrerror(code));
      goto _error;
    }

    // rewound successfully: the reconfigure status has been handled
    code = TSDB_CODE_SUCCESS;
  } else if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // accumulate the total submit records
  pParentSql->res.numOfRows += pSql->res.numOfRows;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
  STableComInfo   tinfo = tscGetTableInfo(pTableMeta);

  destroyTableNameList(pCmd);

  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  if (pCmd->pTableBlockHashList == NULL) {
    pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
    if (pCmd->pTableBlockHashList == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }
  }

  STableDataBlocks *pTableDataBlock = NULL;
  int32_t           ret =
      tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
                              tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL);
  if (ret != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  // without enough room for at least maxRows rows, writing the batch would overrun the block
  if (tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows) != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
  if (tokenBuf == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  while ((readLen = tgetline(&line, &n, fp)) != -1) {
    // strip the whole line terminator; CRLF files end in "\r\n", not a single character
    while (readLen > 0 && (line[readLen - 1] == '\r' || line[readLen - 1] == '\n')) {
      line[--readLen] = 0;
    }

    if (readLen == 0) {
      continue;
    }

    char *lineptr = line;
    strtolower(line, line);

    int32_t len = 0;
    code = tsParseOneRow(&lineptr, pTableDataBlock, pCmd, tinfo.precision, &len, tokenBuf);
    if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
      pSql->res.code = code;
      break;
    }

    pTableDataBlock->size += len;

    if (++count >= maxRows) {
      break;
    }
  }

  tfree(tokenBuf);
  tfree(line);

  pParentSql->res.code = code;
  if (code == TSDB_CODE_SUCCESS) {
    if (count > 0) {
      code = doPackSendDataBlock(pSql, count, pTableDataBlock);
      if (code == TSDB_CODE_SUCCESS) {
        return;
      } else {
        goto _error;
      }
    } else {
      taos_free_result(pSql);
      tfree(pSupporter);
      fclose(fp);

      pParentSql->fp = pParentSql->fetchFp;

      // all data has been sent to vnode, call user function
      int32_t v = (code != TSDB_CODE_SUCCESS) ? code : (int32_t)pParentSql->res.numOfRows;
      (*pParentSql->fp)(pParentSql->param, pParentSql, v);
      return;
    }
  }

_error:
  tfree(tokenBuf);
  tfree(line);
  taos_free_result(pSql);
  tfree(pSupporter);
  fclose(fp);

  // every path into _error carries the failure in `code`; publish it on the parent so the user
  // does not receive a stale (possibly successful) result code
  pParentSql->res.code = code;
  tscAsyncResultOnError(pParentSql);
}

H
Haojun Liao 已提交
1542
/**
 * Start importing rows from the file whose path tsParseInsertSql stored in pCmd->payload.
 *
 * Creates a sub-query whose callback is parseFileSendDataBlock and kicks off the first batch.
 * Failures to set up the import are reported on pSql via tscAsyncResultOnError.
 */
void tscImportDataFromFile(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE  && strlen(pCmd->payload) != 0);

  SImportFileSupport *pSupporter = calloc(1, sizeof(*pSupporter));
  if (pSupporter == NULL) {
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return;
  }

  SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
  if (pNew == NULL) {
    // parseFileSendDataBlock asserts a non-NULL sub-query, so stop here
    tfree(pSupporter);
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return;
  }

  pCmd->count = 1;

  FILE *fp = fopen(pCmd->payload, "rb");
  if (fp == NULL) {
    pSql->res.code = TAOS_SYSTEM_ERROR(errno);
    tscError("0x%"PRIx64" failed to open file %s to load data from file, code:%s", pSql->self, pCmd->payload, tstrerror(pSql->res.code));

    tfree(pSupporter);
    taos_free_result(pNew);
    tscAsyncResultOnError(pSql);
    return;
  }

  pSupporter->pSql = pSql;
  pSupporter->fp   = fp;

  parseFileSendDataBlock(pSupporter, pNew, TSDB_CODE_SUCCESS);
}