tscParseInsert.c 48.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE /* See feature_test_macros(7) */
#define _GNU_SOURCE

#define _XOPEN_SOURCE

21
#include "hash.h"
22
#include "os.h"
H
hzcheng 已提交
23 24 25 26
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
27
#include "ttokendef.h"
H
hzcheng 已提交
28
#include "taosdef.h"
H
hzcheng 已提交
29 30

#include "tlog.h"
H
hjxilinx 已提交
31
#include "tscSubquery.h"
H
hzcheng 已提交
32 33 34
#include "tstoken.h"
#include "ttime.h"

S
slguan 已提交
35
enum {
S
slguan 已提交
36 37 38 39
  TSDB_USE_SERVER_TS = 0,
  TSDB_USE_CLI_TS = 1,
};

L
lihui 已提交
40
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
H
hzcheng 已提交
41

S
slguan 已提交
42 43 44 45
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
  int32_t numType = isValidNumber(pToken);
  if (TK_ILLEGAL == numType) {
    return numType;
H
hzcheng 已提交
46 47 48 49 50 51 52
  }

  int32_t radix = 10;
  if (numType == TK_HEX) {
    radix = 16;
  } else if (numType == TK_OCT) {
    radix = 8;
S
slguan 已提交
53 54
  } else if (numType == TK_BIN) {
    radix = 2;
H
hzcheng 已提交
55 56
  }

L
lihui 已提交
57
  errno = 0;
S
slguan 已提交
58 59 60
  *value = strtoll(pToken->z, endPtr, radix);

  return numType;
H
hzcheng 已提交
61 62
}

S
slguan 已提交
63 64 65 66 67
static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
  int32_t numType = isValidNumber(pToken);
  if (TK_ILLEGAL == numType) {
    return numType;
  }
L
lihui 已提交
68 69

  errno = 0;
S
slguan 已提交
70 71 72
  *value = strtod(pToken->z, endPtr);
  return numType;
}
H
hzcheng 已提交
73

S
slguan 已提交
74 75 76 77 78 79
int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
  int32_t   index = 0;
  SSQLToken sToken;
  int64_t   interval;
  int64_t   useconds = 0;
  char *    pTokenEnd = *next;
H
hzcheng 已提交
80

S
slguan 已提交
81
  index = 0;
H
hzcheng 已提交
82

S
slguan 已提交
83
  if (pToken->type == TK_NOW) {
H
hzcheng 已提交
84
    useconds = taosGetTimestamp(timePrec);
S
slguan 已提交
85
  } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
H
hzcheng 已提交
86
    // do nothing
S
slguan 已提交
87 88
  } else if (pToken->type == TK_INTEGER) {
    useconds = str2int64(pToken->z);
H
hzcheng 已提交
89 90
  } else {
    // strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
S
slguan 已提交
91
    if (taosParseTime(pToken->z, time, pToken->n, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
92
      return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
H
hzcheng 已提交
93 94 95 96 97
    }

    return TSDB_CODE_SUCCESS;
  }

S
slguan 已提交
98 99 100 101
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
    if (pToken->z[k] == ',') {
      *next = pTokenEnd;
H
hzcheng 已提交
102 103 104 105 106 107 108 109
      *time = useconds;
      return 0;
    }

    break;
  }

  /*
S
slguan 已提交
110 111 112
   * time expression:
   * e.g., now+12a, now-5h
   */
S
slguan 已提交
113 114 115 116
  SSQLToken valueToken;
  index = 0;
  sToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
  pTokenEnd += index;
117

S
slguan 已提交
118 119 120 121
  if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
    index = 0;
    valueToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
    pTokenEnd += index;
122

S
slguan 已提交
123
    if (valueToken.n < 2) {
H
hjxilinx 已提交
124
      return tscInvalidSQLErrMsg(error, "value expected in timestamp", sToken.z);
H
hzcheng 已提交
125 126
    }

S
slguan 已提交
127
    if (getTimestampInUsFromStr(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
128 129
      return TSDB_CODE_INVALID_SQL;
    }
130

H
hzcheng 已提交
131 132 133 134
    if (timePrec == TSDB_TIME_PRECISION_MILLI) {
      interval /= 1000;
    }

S
slguan 已提交
135
    if (sToken.type == TK_PLUS) {
H
hzcheng 已提交
136 137 138 139 140 141 142 143 144 145 146 147
      useconds += interval;
    } else {
      useconds = (useconds >= interval) ? useconds - interval : 0;
    }

    *next = pTokenEnd;
  }

  *time = useconds;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
148
int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
S
slguan 已提交
149 150 151
                             int16_t timePrec) {
  int64_t iv;
  int32_t numType;
S
slguan 已提交
152
  char *  endptr = NULL;
153 154
  errno = 0;  // clear the previous existed error information

H
hzcheng 已提交
155 156
  switch (pSchema->type) {
    case TSDB_DATA_TYPE_BOOL: {  // bool
S
slguan 已提交
157 158
      if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
        if (strncmp(pToken->z, "true", pToken->n) == 0) {
S
slguan 已提交
159
          *(uint8_t *)payload = TSDB_TRUE;
S
slguan 已提交
160
        } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
S
slguan 已提交
161
          *(uint8_t *)payload = TSDB_FALSE;
S
slguan 已提交
162 163
        } else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
          *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
H
hzcheng 已提交
164
        } else {
H
hjxilinx 已提交
165
          return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
166
        }
S
slguan 已提交
167 168 169 170 171 172 173 174 175
      } else if (pToken->type == TK_INTEGER) {
        iv = strtoll(pToken->z, NULL, 10);
        *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_FLOAT) {
        double dv = strtod(pToken->z, NULL);
        *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
      } else if (pToken->type == TK_NULL) {
        *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
      } else {
H
hjxilinx 已提交
176
        return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
H
hzcheng 已提交
177 178 179 180
      }
      break;
    }
    case TSDB_DATA_TYPE_TINYINT:
S
slguan 已提交
181 182 183 184 185
      if (pToken->type == TK_NULL) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
H
hzcheng 已提交
186
      } else {
S
slguan 已提交
187
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
188
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
189
          return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
L
lihui 已提交
190
        } else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
H
hjxilinx 已提交
191
          return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
H
hzcheng 已提交
192 193
        }

194
        *((int8_t *)payload) = (int8_t)iv;
H
hzcheng 已提交
195 196 197 198 199
      }

      break;

    case TSDB_DATA_TYPE_SMALLINT:
S
slguan 已提交
200 201 202 203 204
      if (pToken->type == TK_NULL) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
        *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
H
hzcheng 已提交
205
      } else {
S
slguan 已提交
206
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
207
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
208
          return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
L
lihui 已提交
209
        } else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
H
hjxilinx 已提交
210
          return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
H
hzcheng 已提交
211 212
        }

S
slguan 已提交
213
        *((int16_t *)payload) = (int16_t)iv;
H
hzcheng 已提交
214 215 216 217
      }
      break;

    case TSDB_DATA_TYPE_INT:
S
slguan 已提交
218 219 220 221
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
222
        *((int32_t *)payload) = TSDB_DATA_INT_NULL;
H
hzcheng 已提交
223
      } else {
S
slguan 已提交
224
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
225
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
226
          return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
L
lihui 已提交
227
        } else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
H
hjxilinx 已提交
228
          return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
H
hzcheng 已提交
229 230
        }

S
slguan 已提交
231
        *((int32_t *)payload) = (int32_t)iv;
H
hzcheng 已提交
232 233 234 235 236
      }

      break;

    case TSDB_DATA_TYPE_BIGINT:
S
slguan 已提交
237 238 239 240
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
241
        *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
242
      } else {
S
slguan 已提交
243
        numType = tscToInteger(pToken, &iv, &endptr);
L
lihui 已提交
244
        if (TK_ILLEGAL == numType) {
H
hjxilinx 已提交
245
          return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
L
lihui 已提交
246
        } else if (errno == ERANGE || iv > INT64_MAX || iv <= INT64_MIN) {
H
hjxilinx 已提交
247
          return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
H
hzcheng 已提交
248
        }
S
slguan 已提交
249 250

        *((int64_t *)payload) = iv;
H
hzcheng 已提交
251 252 253 254
      }
      break;

    case TSDB_DATA_TYPE_FLOAT:
S
slguan 已提交
255 256 257 258
      if (pToken->type == TK_NULL) {
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
259
        *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
260
      } else {
S
slguan 已提交
261 262
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
263
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
S
slguan 已提交
264 265 266 267
        }

        float fv = (float)dv;
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
H
hjxilinx 已提交
268
          return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
H
hzcheng 已提交
269 270
        }

S
slguan 已提交
271 272
        if (isinf(fv) || isnan(fv)) {
          *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
H
hzcheng 已提交
273
        }
S
slguan 已提交
274 275

        *((float *)payload) = fv;
H
hzcheng 已提交
276 277 278 279
      }
      break;

    case TSDB_DATA_TYPE_DOUBLE:
S
slguan 已提交
280 281 282 283
      if (pToken->type == TK_NULL) {
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
      } else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
                 (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
S
slguan 已提交
284
        *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
H
hzcheng 已提交
285
      } else {
S
slguan 已提交
286 287
        double dv;
        if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
H
hjxilinx 已提交
288
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
H
hzcheng 已提交
289 290
        }

S
slguan 已提交
291
        if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
H
hjxilinx 已提交
292
          return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
S
slguan 已提交
293 294 295 296 297 298
        }

        if (isinf(dv) || isnan(dv)) {
          *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
        } else {
          *((double *)payload) = dv;
H
hzcheng 已提交
299 300 301 302 303
        }
      }
      break;

    case TSDB_DATA_TYPE_BINARY:
S
slguan 已提交
304 305
      // binary data cannot be null-terminated char string, otherwise the last char of the string is lost
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
306
        *payload = TSDB_DATA_BINARY_NULL;
H
hjxilinx 已提交
307
      } else { // too long values will return invalid sql, not be truncated automatically
S
slguan 已提交
308
        if (pToken->n > pSchema->bytes) {
H
hjxilinx 已提交
309
          return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
S
slguan 已提交
310
        }
H
hjxilinx 已提交
311
        
S
slguan 已提交
312
        strncpy(payload, pToken->z, pToken->n);
313 314 315 316
        
        if (pToken->n < pSchema->bytes) {
          payload[pToken->n] = 0;   // add the null-terminated char if the length of the string is shorter than the available space
        }
H
hzcheng 已提交
317 318 319 320 321
      }

      break;

    case TSDB_DATA_TYPE_NCHAR:
S
slguan 已提交
322
      if (pToken->type == TK_NULL) {
S
slguan 已提交
323
        *(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
H
hzcheng 已提交
324
      } else {
H
hjxilinx 已提交
325
        // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
S
slguan 已提交
326
        if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) {
H
hjxilinx 已提交
327 328 329 330
          char buf[512] = {0};
          snprintf(buf, 512, "%s", strerror(errno));
          
          return tscInvalidSQLErrMsg(msg, buf, pToken->z);
H
hzcheng 已提交
331 332 333 334 335
        }
      }
      break;

    case TSDB_DATA_TYPE_TIMESTAMP: {
S
slguan 已提交
336
      if (pToken->type == TK_NULL) {
H
hzcheng 已提交
337
        if (primaryKey) {
S
slguan 已提交
338
          *((int64_t *)payload) = 0;
H
hzcheng 已提交
339
        } else {
S
slguan 已提交
340
          *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
H
hzcheng 已提交
341 342
        }
      } else {
S
slguan 已提交
343 344
        int64_t temp;
        if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
345
          return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
H
hzcheng 已提交
346
        }
H
hjxilinx 已提交
347
        
S
slguan 已提交
348
        *((int64_t *)payload) = temp;
H
hzcheng 已提交
349 350 351 352 353 354
      }

      break;
    }
  }

H
hjxilinx 已提交
355
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
356 357
}

S
slguan 已提交
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
/*
 * The server time/client time should not be mixed up in one sql string
 * Do not employ sort operation is not involved if server time is used.
 */
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
  // once the data block is disordered, we do NOT keep previous timestamp any more
  if (!pDataBlocks->ordered) {
    return TSDB_CODE_SUCCESS;
  }

  TSKEY k = *(TSKEY *)start;

  if (k == 0) {
    if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
      return -1;
    } else if (pDataBlocks->tsSource == -1) {
      pDataBlocks->tsSource = TSDB_USE_SERVER_TS;
    }
  } else {
    if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
H
hjxilinx 已提交
378
      return -1;  // client time/server time can not be mixed
379

S
slguan 已提交
380 381 382 383 384 385 386 387 388 389 390 391 392
    } else if (pDataBlocks->tsSource == -1) {
      pDataBlocks->tsSource = TSDB_USE_CLI_TS;
    }
  }

  if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
    pDataBlocks->ordered = false;
  }

  pDataBlocks->prevTS = k;
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
393
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
394 395 396
                      int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
  int32_t index = 0;
  // bool      isPrevOptr; //fang, never used
H
hjxilinx 已提交
397
  SSQLToken sToken = {0};
S
slguan 已提交
398
  char *    payload = pDataBlocks->pData + pDataBlocks->size;
S
slguan 已提交
399

S
slguan 已提交
400
  // 1. set the parsed value from sql string
H
hzcheng 已提交
401
  int32_t rowSize = 0;
402
  for (int i = 0; i < spd->numOfAssignedCols; ++i) {
S
slguan 已提交
403
    // the start position in data block buffer of current value in sql
404 405
    char *   start = payload + spd->elems[i].offset;
    int16_t  colIndex = spd->elems[i].colIndex;
S
slguan 已提交
406
    SSchema *pSchema = schema + colIndex;
S
slguan 已提交
407
    rowSize += pSchema->bytes;
H
hzcheng 已提交
408

S
slguan 已提交
409 410 411 412 413 414 415 416 417
    index = 0;
    sToken = tStrGetToken(*str, &index, true, 0, NULL);
    *str += index;

    if (sToken.type == TK_QUESTION) {
      uint32_t offset = start - pDataBlocks->pData;
      if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
        continue;
      }
418

S
slguan 已提交
419
      strcpy(error, "client out of memory");
L
[1292]  
lihui 已提交
420
      *code = TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
421 422 423
      return -1;
    }

S
slguan 已提交
424 425 426
    if (((sToken.type != TK_NOW) && (sToken.type != TK_INTEGER) && (sToken.type != TK_STRING) &&
         (sToken.type != TK_FLOAT) && (sToken.type != TK_BOOL) && (sToken.type != TK_NULL)) ||
        (sToken.n == 0) || (sToken.type == TK_RP)) {
H
hjxilinx 已提交
427
      tscInvalidSQLErrMsg(error, "invalid data or symbol", sToken.z);
L
[1292]  
lihui 已提交
428
      *code = TSDB_CODE_INVALID_SQL;
S
slguan 已提交
429
      return -1;
H
hzcheng 已提交
430 431
    }

S
slguan 已提交
432 433
    // Remove quotation marks
    if (TK_STRING == sToken.type) {
L
[1292]  
lihui 已提交
434
      // delete escape character: \\, \', \"
435
      char    delim = sToken.z[0];
L
[1292]  
lihui 已提交
436 437
      int32_t cnt = 0;
      int32_t j = 0;
438
      for (int32_t k = 1; k < sToken.n - 1; ++k) {
F
fang 已提交
439 440
        if (sToken.z[k] == delim || sToken.z[k] == '\\') {
          if (sToken.z[k + 1] == delim) {
L
[1292]  
lihui 已提交
441
            cnt++;
L
lihui 已提交
442 443 444
            tmpTokenBuf[j] = sToken.z[k + 1];
            j++;
            k++;
L
[1292]  
lihui 已提交
445 446 447
            continue;
          }
        }
448

L
[NONE]  
lihui 已提交
449
        tmpTokenBuf[j] = sToken.z[k];
L
[1292]  
lihui 已提交
450 451
        j++;
      }
452
      tmpTokenBuf[j] = 0;
L
[1292]  
lihui 已提交
453
      sToken.z = tmpTokenBuf;
454
      sToken.n -= 2 + cnt;
H
hzcheng 已提交
455 456
    }

S
slguan 已提交
457 458
    bool    isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
    int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec);
S
slguan 已提交
459
    if (ret != TSDB_CODE_SUCCESS) {
L
[1292]  
lihui 已提交
460
      *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
461 462
      return -1;  // NOTE: here 0 mean error!
    }
463

S
slguan 已提交
464
    if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
465
      tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z);
L
[1292]  
lihui 已提交
466
      *code = TSDB_CODE_INVALID_TIME_STAMP;
S
slguan 已提交
467
      return -1;
468
    }
H
hzcheng 已提交
469 470
  }

S
slguan 已提交
471
  // 2. set the null value for the columns that do not assign values
472
  if (spd->numOfAssignedCols < spd->numOfCols) {
S
slguan 已提交
473
    char *ptr = payload;
H
hzcheng 已提交
474 475

    for (int32_t i = 0; i < spd->numOfCols; ++i) {
476
      if (!spd->hasVal[i]) {  // current column do not have any value to insert, set it to null
H
hzcheng 已提交
477 478 479 480 481 482 483 484 485 486 487 488
        setNull(ptr, schema[i].type, schema[i].bytes);
      }

      ptr += schema[i].bytes;
    }

    rowSize = ptr - payload;
  }

  return rowSize;
}

S
slguan 已提交
489 490 491 492 493 494 495 496 497
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
  TSKEY left = *(TSKEY *)lhs;
  TSKEY right = *(TSKEY *)rhs;

  if (left == right) {
    return 0;
  } else {
    return left > right ? 1 : -1;
  }
498 499
}

H
hjxilinx 已提交
500
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows,
501
                  SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
S
slguan 已提交
502 503
  int32_t   index = 0;
  SSQLToken sToken;
H
hzcheng 已提交
504 505 506

  int16_t numOfRows = 0;

H
hjxilinx 已提交
507
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hjxilinx 已提交
508
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
hjxilinx 已提交
509 510
  
  int32_t  precision = tinfo.precision;
S
slguan 已提交
511 512

  if (spd->hasVal[0] == false) {
S
slguan 已提交
513
    strcpy(error, "primary timestamp column can not be null");
L
[1292]  
lihui 已提交
514
    *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
515 516 517 518
    return -1;
  }

  while (1) {
S
slguan 已提交
519 520 521
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    if (sToken.n == 0 || sToken.type != TK_LP) break;
H
hzcheng 已提交
522

S
slguan 已提交
523
    *str += index;
H
hjxilinx 已提交
524
    if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
L
lihui 已提交
525
      int32_t tSize;
H
hjxilinx 已提交
526
      int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
L
lihui 已提交
527
      if (retcode != TSDB_CODE_SUCCESS) {  //TODO pass the correct error code to client
S
slguan 已提交
528
        strcpy(error, "client out of memory");
L
lihui 已提交
529
        *code = retcode;
S
slguan 已提交
530 531
        return -1;
      }
L
lihui 已提交
532 533
      ASSERT(tSize > maxRows);
      maxRows = tSize;
H
hzcheng 已提交
534 535
    }

L
[1292]  
lihui 已提交
536
    int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
537
    if (len <= 0) {  // error message has been set in tsParseOneRowData
H
hzcheng 已提交
538 539 540 541 542
      return -1;
    }

    pDataBlock->size += len;

S
slguan 已提交
543 544 545 546
    index = 0;
    sToken = tStrGetToken(*str, &index, false, 0, NULL);
    *str += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
H
hjxilinx 已提交
547
      tscInvalidSQLErrMsg(error, ") expected", *str);
L
[1292]  
lihui 已提交
548
      *code = TSDB_CODE_INVALID_SQL;
H
hzcheng 已提交
549 550 551 552 553 554 555 556
      return -1;
    }

    numOfRows++;
  }

  if (numOfRows <= 0) {
    strcpy(error, "no any data points");
L
[1292]  
lihui 已提交
557
    *code = TSDB_CODE_INVALID_SQL;
S
slguan 已提交
558 559 560
    return -1;
  } else {
    return numOfRows;
H
hzcheng 已提交
561 562 563
  }
}

S
slguan 已提交
564
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) {
H
hzcheng 已提交
565
  spd->numOfCols = numOfCols;
566
  spd->numOfAssignedCols = numOfCols;
H
hzcheng 已提交
567 568 569 570 571 572 573 574 575 576 577

  for (int32_t i = 0; i < numOfCols; ++i) {
    spd->hasVal[i] = true;
    spd->elems[i].colIndex = i;

    if (i > 0) {
      spd->elems[i].offset = spd->elems[i - 1].offset + pSchema[i - 1].bytes;
    }
  }
}

L
lihui 已提交
578
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
S
slguan 已提交
579
  size_t    remain = pDataBlock->nAllocSize - pDataBlock->size;
S
slguan 已提交
580
  const int factor = 5;
S
slguan 已提交
581
  uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
L
[#1102]  
lihui 已提交
582 583
  assert(pDataBlock->headerSize >= 0);
  
H
hzcheng 已提交
584
  // expand the allocated size
S
slguan 已提交
585 586
  if (remain < rowSize * factor) {
    while (remain < rowSize * factor) {
S
slguan 已提交
587
      pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
S
slguan 已提交
588 589
      remain = pDataBlock->nAllocSize - pDataBlock->size;
    }
H
hzcheng 已提交
590

S
slguan 已提交
591 592 593 594 595
    char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
    if (tmp != NULL) {
      pDataBlock->pData = tmp;
      memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
    } else {
H
hjxilinx 已提交
596
      // do nothing, if allocate more memory failed
S
slguan 已提交
597
      pDataBlock->nAllocSize = nAllocSizeOld;
L
[#1102]  
lihui 已提交
598
      *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
L
lihui 已提交
599
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
600
    }
H
hzcheng 已提交
601 602
  }

L
[#1102]  
lihui 已提交
603
  *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
L
lihui 已提交
604
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
605 606
}

H
hjxilinx 已提交
607 608 609 610
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
  pBlocks->sid = pTableMeta->sid;
  pBlocks->uid = pTableMeta->uid;
  pBlocks->sversion = pTableMeta->sversion;
S
slguan 已提交
611
  pBlocks->numOfRows += numOfRows;
H
hzcheng 已提交
612 613
}

S
slguan 已提交
614 615
// data block is disordered, sort it in ascending order
void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
S
slguan 已提交
616
  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)dataBuf->pData;
S
slguan 已提交
617 618 619 620

  // size is less than the total size, since duplicated rows may be removed yet.
  assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);

S
slguan 已提交
621 622 623 624 625
  // if use server time, this block must be ordered
  if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
    assert(dataBuf->ordered);
  }

S
slguan 已提交
626
  if (!dataBuf->ordered) {
S
slguan 已提交
627 628
    char *pBlockData = pBlocks->payLoad;
    qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
H
hzcheng 已提交
629

S
slguan 已提交
630 631
    int32_t i = 0;
    int32_t j = 1;
H
hzcheng 已提交
632

S
slguan 已提交
633
    while (j < pBlocks->numOfRows) {
S
slguan 已提交
634 635
      TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
      TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
H
hzcheng 已提交
636

S
slguan 已提交
637 638 639 640
      if (ti == tj) {
        ++j;
        continue;
      }
H
hzcheng 已提交
641

S
slguan 已提交
642 643 644 645 646 647 648 649 650
      int32_t nextPos = (++i);
      if (nextPos != j) {
        memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
      }

      ++j;
    }

    dataBuf->ordered = true;
H
hzcheng 已提交
651

S
slguan 已提交
652
    pBlocks->numOfRows = i + 1;
S
slguan 已提交
653
    dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize * pBlocks->numOfRows;
S
slguan 已提交
654
  }
S
slguan 已提交
655 656 657 658
}

static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
                                      int32_t *totalNum) {
S
slguan 已提交
659
  SSqlCmd *       pCmd = &pSql->cmd;
660
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
661
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
662
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
hjxilinx 已提交
663
  
H
hjxilinx 已提交
664
  STableDataBlocks *dataBuf = NULL;
H
hjxilinx 已提交
665 666 667
  int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
                                        sizeof(SShellSubmitBlock), tinfo.rowSize, pTableMetaInfo->name,
                                        pTableMeta, &dataBuf);
H
hjxilinx 已提交
668 669 670 671
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  
L
lihui 已提交
672
  int32_t maxNumOfRows;
H
hjxilinx 已提交
673
  ret = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows);
L
lihui 已提交
674
  if (TSDB_CODE_SUCCESS != ret) {
S
slguan 已提交
675 676
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
677

L
[1292]  
lihui 已提交
678
  int32_t code = TSDB_CODE_INVALID_SQL;
679
  char *  tmpTokenBuf = calloc(1, 4096);  // used for deleting Escape character: \\, \', \"
L
[1292]  
lihui 已提交
680 681 682
  if (NULL == tmpTokenBuf) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
L
lihui 已提交
683

H
hjxilinx 已提交
684
  int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
L
[1292]  
lihui 已提交
685
  free(tmpTokenBuf);
H
hzcheng 已提交
686
  if (numOfRows <= 0) {
L
[1292]  
lihui 已提交
687
    return code;
H
hzcheng 已提交
688 689
  }

S
slguan 已提交
690
  for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
691
    SParamInfo *param = dataBuf->params + i;
S
slguan 已提交
692 693 694 695 696 697
    if (param->idx == -1) {
      param->idx = pCmd->numOfParams++;
      param->offset -= sizeof(SShellSubmitBlock);
    }
  }

S
slguan 已提交
698
  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData);
H
hjxilinx 已提交
699
  tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
S
slguan 已提交
700

H
hjxilinx 已提交
701
  dataBuf->vgId = pTableMeta->vgId;
S
slguan 已提交
702
  dataBuf->numOfTables = 1;
H
hzcheng 已提交
703 704

  /*
S
slguan 已提交
705 706
   * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
   * which is actually returned from server.
H
hzcheng 已提交
707
   */
S
slguan 已提交
708
  *totalNum += numOfRows;
H
hzcheng 已提交
709 710 711
  return TSDB_CODE_SUCCESS;
}

712
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
S
slguan 已提交
713
  int32_t   index = 0;
714 715
  SSQLToken sToken = {0};
  SSQLToken tableToken = {0};
S
slguan 已提交
716
  int32_t   code = TSDB_CODE_SUCCESS;
717 718 719 720 721 722
  
  const int32_t TABLE_INDEX = 0;
  const int32_t STABLE_INDEX = 1;
  
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hzcheng 已提交
723

S
slguan 已提交
724
  char *sql = *sqlstr;
725

S
slguan 已提交
726 727 728 729
  // get the token of specified table
  index = 0;
  tableToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;
H
hzcheng 已提交
730

S
slguan 已提交
731 732
  char *cstart = NULL;
  char *cend = NULL;
H
hzcheng 已提交
733

S
slguan 已提交
734 735 736 737 738
  // skip possibly exists column list
  index = 0;
  sToken = tStrGetToken(sql, &index, false, 0, NULL);
  sql += index;

H
hzcheng 已提交
739
  int32_t numOfColList = 0;
S
slguan 已提交
740
  bool    createTable = false;
H
hzcheng 已提交
741

S
slguan 已提交
742 743 744
  if (sToken.type == TK_LP) {
    cstart = &sToken.z[0];
    index = 0;
H
hzcheng 已提交
745
    while (1) {
S
slguan 已提交
746 747 748
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      if (sToken.type == TK_RP) {
        cend = &sToken.z[0];
H
hzcheng 已提交
749 750 751 752 753 754
        break;
      }

      ++numOfColList;
    }

S
slguan 已提交
755 756
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
H
hzcheng 已提交
757 758 759 760 761
  }

  if (numOfColList == 0 && cstart != NULL) {
    return TSDB_CODE_INVALID_SQL;
  }
762
  
H
hjxilinx 已提交
763
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);
764 765
  
  if (sToken.type == TK_USING) {  // create table if not exists according to the super table
S
slguan 已提交
766 767 768 769
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;

S
slguan 已提交
770
    STagData *pTag = (STagData *)pCmd->payload;
S
slguan 已提交
771
    memset(pTag, 0, sizeof(STagData));
772 773
    
    /*
H
hjxilinx 已提交
774
     * the source super table is moved to the secondary position of the pTableMetaInfo list
775 776
     */
    if (pQueryInfo->numOfTables < 2) {
H
hjxilinx 已提交
777
      tscAddEmptyMetaInfo(pQueryInfo);
778
    }
H
hzcheng 已提交
779

H
hjxilinx 已提交
780
    STableMetaInfo *pSTableMeterMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
781 782
    setMeterID(pSTableMeterMetaInfo, &sToken, pSql);

S
slguan 已提交
783
    strncpy(pTag->name, pSTableMeterMetaInfo->name, TSDB_TABLE_ID_LEN);
H
hjxilinx 已提交
784
    code = tscGetTableMeta(pSql, pSTableMeterMetaInfo);
H
hzcheng 已提交
785 786 787 788
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

H
hjxilinx 已提交
789
    if (!UTIL_TABLE_IS_SUPERTABLE(pSTableMeterMetaInfo)) {
H
hjxilinx 已提交
790
      return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
S
slguan 已提交
791 792
    }

H
hjxilinx 已提交
793
    SSchema *pTagSchema = tscGetTableTagSchema(pSTableMeterMetaInfo->pTableMeta);
H
hjxilinx 已提交
794
    STableComInfo tinfo = tscGetTableInfo(pSTableMeterMetaInfo->pTableMeta);
H
hjxilinx 已提交
795
    
S
slguan 已提交
796 797 798
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
L
lihui 已提交
799

800
    SParsedDataColInfo spd = {0};
H
hjxilinx 已提交
801 802
    
    uint8_t numOfTags = tscGetNumOfTags(pSTableMeterMetaInfo->pTableMeta);
L
lihui 已提交
803 804 805 806 807 808
    spd.numOfCols = numOfTags;

    // if specify some tags column
    if (sToken.type != TK_LP) {
      tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
    } else {
809 810
      /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
       * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
L
lihui 已提交
811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836
      int16_t offset[TSDB_MAX_COLUMNS] = {0};
      for (int32_t t = 1; t < numOfTags; ++t) {
        offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
      }

      while (1) {
        index = 0;
        sToken = tStrGetToken(sql, &index, false, 0, NULL);
        sql += index;

        if (TK_STRING == sToken.type) {
          sToken.n = strdequote(sToken.z);
          strtrim(sToken.z);
          sToken.n = (uint32_t)strlen(sToken.z);
        }

        if (sToken.type == TK_RP) {
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
        for (int32_t t = 0; t < numOfTags; ++t) {
          if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) {
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
837
            pElem->offset = offset[t];
L
lihui 已提交
838 839 840 841 842 843
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
              return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z);
            }

844
            spd.hasVal[t] = true;
L
lihui 已提交
845 846 847 848 849 850 851 852 853 854 855 856 857
            findColumnIndex = true;
            break;
          }
        }

        if (!findColumnIndex) {
          return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken.z);
        }
      }

      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > numOfTags) {
        return tscInvalidSQLErrMsg(pCmd->payload, "tag name expected", sToken.z);
      }
L
lihui 已提交
858 859 860 861

      index = 0;
      sToken = tStrGetToken(sql, &index, false, 0, NULL);
      sql += index;
L
lihui 已提交
862
    }
863

S
slguan 已提交
864
    if (sToken.type != TK_TAGS) {
L
lihui 已提交
865
      return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
H
hzcheng 已提交
866 867
    }

S
slguan 已提交
868 869
    uint32_t ignoreTokenTypes = TK_LP;
    uint32_t numOfIgnoreToken = 1;
L
lihui 已提交
870
    for (int i = 0; i < spd.numOfAssignedCols; ++i) {
871
      char *  tagVal = pTag->data + spd.elems[i].offset;
L
lihui 已提交
872
      int16_t colIndex = spd.elems[i].colIndex;
873

S
slguan 已提交
874 875 876 877
      index = 0;
      sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
      sql += index;
      if (sToken.n == 0) {
H
hzcheng 已提交
878
        break;
S
slguan 已提交
879 880 881
      } else if (sToken.type == TK_RP) {
        break;
      }
H
hzcheng 已提交
882

S
slguan 已提交
883 884 885 886
      // Remove quotation marks
      if (TK_STRING == sToken.type) {
        sToken.z++;
        sToken.n -= 2;
H
hzcheng 已提交
887 888
      }

H
hjxilinx 已提交
889
      code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
H
hzcheng 已提交
890
      if (code != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
891
        return code;
H
hzcheng 已提交
892 893
      }

894 895
      if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY || pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) &&
          sToken.n > pTagSchema[colIndex].bytes) {
H
hjxilinx 已提交
896
        return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z);
S
slguan 已提交
897
      }
L
lihui 已提交
898
    }
S
slguan 已提交
899

L
lihui 已提交
900 901 902 903 904
    index = 0;
    sToken = tStrGetToken(sql, &index, false, 0, NULL);
    sql += index;
    if (sToken.n == 0 || sToken.type != TK_RP) {
      return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z);
H
hzcheng 已提交
905 906
    }

L
lihui 已提交
907 908 909
    // 2. set the null value for the columns that do not assign values
    if (spd.numOfAssignedCols < spd.numOfCols) {
      char *ptr = pTag->data;
910

L
lihui 已提交
911
      for (int32_t i = 0; i < spd.numOfCols; ++i) {
912
        if (!spd.hasVal[i]) {  // current tag column do not have any value to insert, set it to null
L
lihui 已提交
913 914
          setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
        }
915

L
lihui 已提交
916
        ptr += pTagSchema[i].bytes;
917
      }
H
hzcheng 已提交
918 919 920
    }

    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
L
lihui 已提交
921
      return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
H
hzcheng 已提交
922 923
    }

H
hjxilinx 已提交
924
    int32_t ret = setMeterID(pTableMetaInfo, &tableToken, pSql);
H
hzcheng 已提交
925 926 927 928 929
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    createTable = true;
H
hjxilinx 已提交
930
    code = tscGetMeterMetaEx(pSql, pTableMetaInfo, true);
H
hjxilinx 已提交
931 932 933 934
    if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
      return code;
    }
    
H
hzcheng 已提交
935 936 937 938
  } else {
    if (cstart != NULL) {
      sql = cstart;
    } else {
S
slguan 已提交
939
      sql = sToken.z;
H
hzcheng 已提交
940
    }
H
hjxilinx 已提交
941
    code = tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
942 943 944 945 946
  }

  int32_t len = cend - cstart + 1;
  if (cstart != NULL && createTable == true) {
    /* move the column list to start position of the next accessed points */
W
WangXin 已提交
947
    memmove(sql - len, cstart, len);
H
hzcheng 已提交
948 949 950 951 952 953 954 955
    *sqlstr = sql - len;
  } else {
    *sqlstr = sql;
  }

  return code;
}

S
slguan 已提交
956
int validateTableName(char *tblName, int len) {
S
slguan 已提交
957
  char buf[TSDB_TABLE_ID_LEN] = {0};
S
slguan 已提交
958
  strncpy(buf, tblName, len);
S
slguan 已提交
959

S
slguan 已提交
960
  SSQLToken token = {.n = len, .type = TK_ID, .z = buf};
H
huili 已提交
961
  tSQLGetToken(buf, &token.type);
S
slguan 已提交
962

H
huili 已提交
963 964 965
  return tscValidateName(&token);
}

966 967 968 969 970 971 972 973 974
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
  if (pCmd->dataSourceType != 0 && pCmd->dataSourceType != type) {
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
  }

  pCmd->dataSourceType = type;
  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
975 976 977 978 979 980 981 982 983
/**
 * usage: insert into table1 values() () table2 values()()
 *
 * @param str
 * @param acct
 * @param db
 * @param pSql
 * @return
 */
984
int doParseInsertSql(SSqlObj *pSql, char *str) {
S
slguan 已提交
985
  SSqlCmd *pCmd = &pSql->cmd;
986

S
slguan 已提交
987
  int32_t totalNum = 0;
988 989
  int32_t code = TSDB_CODE_SUCCESS;

H
hjxilinx 已提交
990
  STableMetaInfo *pTableMetaInfo = NULL;
991 992 993 994

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo != NULL);

995
  if (pQueryInfo->numOfTables == 0) {
H
hjxilinx 已提交
996
    pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
997
  } else {
H
hjxilinx 已提交
998
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
999
  }
H
hzcheng 已提交
1000

S
slguan 已提交
1001
  if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1002 1003 1004
    return code;
  }

H
hjxilinx 已提交
1005
  assert(((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList))
L
lihui 已提交
1006 1007
      || ((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList)));

L
lihui 已提交
1008
  if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) {
H
hjxilinx 已提交
1009
    pSql->pTableHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
1010

L
lihui 已提交
1011 1012 1013 1014 1015 1016
    pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
    if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) {
      code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
  } else {
H
hjxilinx 已提交
1017
    assert((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList));
L
lihui 已提交
1018 1019 1020
    str = pSql->asyncTblPos;
  }
  
L
[#1083]  
lihui 已提交
1021
  tscTrace("%p create data block list for submit data:%p, asyncTblPos:%p, pTableHashList:%p", pSql, pSql->cmd.pDataBlocks, pSql->asyncTblPos, pSql->pTableHashList);
H
hzcheng 已提交
1022 1023

  while (1) {
1024
    int32_t   index = 0;
S
slguan 已提交
1025
    SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL);
1026 1027 1028 1029 1030 1031 1032 1033

    // no data in the sql string anymore.
    if (sToken.n == 0) {
      /*
       * if the data is from the data file, no data has been generated yet. So, there no data to
       * merge or submit, save the file path and parse the file in other routines.
       */
      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
S
slguan 已提交
1034 1035 1036
        goto _clean;
      }

1037 1038 1039 1040 1041
      /*
       * if no data has been generated during parsing the sql string, error msg will return
       * Otherwise, create the first submit block and submit to virtual node.
       */
      if (totalNum == 0) {
H
hzcheng 已提交
1042 1043
        code = TSDB_CODE_INVALID_SQL;
        goto _error_clean;
1044 1045
      } else {
        break;
H
hzcheng 已提交
1046 1047 1048
      }
    }

L
lihui 已提交
1049 1050
    pSql->asyncTblPos = sToken.z;

S
slguan 已提交
1051
    // Check if the table name available or not
S
slguan 已提交
1052
    if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1053
      code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
H
huili 已提交
1054 1055 1056
      goto _error_clean;
    }

H
hjxilinx 已提交
1057
    if ((code = setMeterID(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1058 1059 1060
      goto _error_clean;
    }

1061 1062
    ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr;
    
1063 1064 1065 1066 1067 1068 1069
    if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
      /*
       * For async insert, after get the metermeta from server, the sql string will not be
       * parsed using the new metermeta to avoid the overhead cause by get metermeta data information.
       * And during the getMeterMetaCallback function, the sql string will be parsed from the
       * interrupted position.
       */
H
hjxilinx 已提交
1070 1071 1072
      if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
        tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos);
        return code;
H
hzcheng 已提交
1073
      }
H
hjxilinx 已提交
1074
      
H
hjxilinx 已提交
1075 1076 1077 1078
      // todo add to return
      tscError("async insert parse error, code:%d, %s", code, tstrerror(code));
      pSql->asyncTblPos = NULL;
      
H
hjxilinx 已提交
1079
      goto _error_clean;       // TODO: should _clean or _error_clean to async flow ????
H
hzcheng 已提交
1080 1081
    }

H
hjxilinx 已提交
1082
    if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
1083
      code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
H
hzcheng 已提交
1084 1085 1086
      goto _error_clean;
    }

S
slguan 已提交
1087 1088 1089
    index = 0;
    sToken = tStrGetToken(str, &index, false, 0, NULL);
    str += index;
1090

S
slguan 已提交
1091
    if (sToken.n == 0) {
1092
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
H
hzcheng 已提交
1093 1094
      goto _error_clean;
    }
H
hjxilinx 已提交
1095
    
H
hjxilinx 已提交
1096
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1097
    
S
slguan 已提交
1098
    if (sToken.type == TK_VALUES) {
H
hjxilinx 已提交
1099
      SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
H
hjxilinx 已提交
1100 1101
      
      SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1102
      tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
H
hzcheng 已提交
1103

1104 1105
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
        goto _error_clean;
H
hzcheng 已提交
1106 1107 1108 1109 1110 1111
      }

      /*
       * app here insert data in different vnodes, so we need to set the following
       * data in another submit procedure using async insert routines
       */
L
lihui 已提交
1112
      code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
H
hzcheng 已提交
1113 1114 1115
      if (code != TSDB_CODE_SUCCESS) {
        goto _error_clean;
      }
S
slguan 已提交
1116
    } else if (sToken.type == TK_FILE) {
1117 1118
      if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
        goto _error_clean;
H
hzcheng 已提交
1119 1120
      }

S
slguan 已提交
1121 1122 1123 1124
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;
      if (sToken.n == 0) {
H
hjxilinx 已提交
1125
        code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
H
hzcheng 已提交
1126 1127 1128
        goto _error_clean;
      }

S
slguan 已提交
1129
      char fname[PATH_MAX] = {0};
S
slguan 已提交
1130
      strncpy(fname, sToken.z, sToken.n);
S
slguan 已提交
1131
      strdequote(fname);
1132

H
hzcheng 已提交
1133 1134
      wordexp_t full_path;
      if (wordexp(fname, &full_path, 0) != 0) {
H
hjxilinx 已提交
1135
        code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
H
hzcheng 已提交
1136 1137 1138 1139 1140
        goto _error_clean;
      }
      strcpy(fname, full_path.we_wordv[0]);
      wordfree(&full_path);

H
hjxilinx 已提交
1141
      STableDataBlocks *pDataBlock = NULL;
H
hjxilinx 已提交
1142
      STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
1143
      
H
hjxilinx 已提交
1144 1145
      int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SShellSubmitBlock), pTableMetaInfo->name,
                                       pTableMeta, &pDataBlock);
H
hjxilinx 已提交
1146 1147 1148
      if (ret != TSDB_CODE_SUCCESS) {
        goto _error_clean;
      }
1149

S
slguan 已提交
1150 1151
      tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
      strcpy(pDataBlock->filename, fname);
S
slguan 已提交
1152
    } else if (sToken.type == TK_LP) {
H
hzcheng 已提交
1153
      /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
1154
      STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
H
hjxilinx 已提交
1155
      SSchema *   pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
1156

1157
      if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1158 1159 1160
        goto _error_clean;
      }

1161
      SParsedDataColInfo spd = {0};
H
hjxilinx 已提交
1162
      spd.numOfCols = tinfo.numOfColumns;
H
hzcheng 已提交
1163 1164

      int16_t offset[TSDB_MAX_COLUMNS] = {0};
H
hjxilinx 已提交
1165
      for (int32_t t = 1; t < tinfo.numOfColumns; ++t) {
H
hzcheng 已提交
1166 1167 1168 1169
        offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
      }

      while (1) {
S
slguan 已提交
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180
        index = 0;
        sToken = tStrGetToken(str, &index, false, 0, NULL);
        str += index;

        if (TK_STRING == sToken.type) {
          sToken.n = strdequote(sToken.z);
          strtrim(sToken.z);
          sToken.n = (uint32_t)strlen(sToken.z);
        }

        if (sToken.type == TK_RP) {
H
hzcheng 已提交
1181 1182 1183 1184 1185 1186
          break;
        }

        bool findColumnIndex = false;

        // todo speedup by using hash list
H
hjxilinx 已提交
1187
        for (int32_t t = 0; t < tinfo.numOfColumns; ++t) {
S
slguan 已提交
1188
          if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
S
slguan 已提交
1189
            SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
H
hzcheng 已提交
1190 1191 1192 1193
            pElem->offset = offset[t];
            pElem->colIndex = t;

            if (spd.hasVal[t] == true) {
H
hjxilinx 已提交
1194
              code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
H
hzcheng 已提交
1195 1196 1197 1198 1199 1200 1201 1202 1203
              goto _error_clean;
            }

            spd.hasVal[t] = true;
            findColumnIndex = true;
            break;
          }
        }

S
slguan 已提交
1204
        if (!findColumnIndex) {
H
hjxilinx 已提交
1205
          code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
H
hzcheng 已提交
1206 1207 1208 1209
          goto _error_clean;
        }
      }

H
hjxilinx 已提交
1210
      if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
H
hjxilinx 已提交
1211
        code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
H
hzcheng 已提交
1212 1213 1214
        goto _error_clean;
      }

S
slguan 已提交
1215 1216 1217 1218 1219
      index = 0;
      sToken = tStrGetToken(str, &index, false, 0, NULL);
      str += index;

      if (sToken.type != TK_VALUES) {
H
hjxilinx 已提交
1220
        code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
H
hzcheng 已提交
1221 1222 1223
        goto _error_clean;
      }

L
lihui 已提交
1224
      code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
H
hzcheng 已提交
1225 1226 1227 1228
      if (code != TSDB_CODE_SUCCESS) {
        goto _error_clean;
      }
    } else {
H
hjxilinx 已提交
1229
      code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
H
hzcheng 已提交
1230 1231 1232 1233
      goto _error_clean;
    }
  }

S
slguan 已提交
1234 1235 1236 1237
  // we need to keep the data blocks if there are parameters in the sql
  if (pCmd->numOfParams > 0) {
    goto _clean;
  }
1238

S
slguan 已提交
1239
  // submit to more than one vnode
H
hzcheng 已提交
1240
  if (pCmd->pDataBlocks->nSize > 0) {
H
hjxilinx 已提交
1241
    // merge according to vgId
S
slguan 已提交
1242 1243 1244
    if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
      goto _error_clean;
    }
H
hzcheng 已提交
1245

S
slguan 已提交
1246 1247 1248
    STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
    if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
      goto _error_clean;
H
hzcheng 已提交
1249 1250
    }

1251
    pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
1252

S
slguan 已提交
1253
    // set the next sent data vnode index in data block arraylist
H
hjxilinx 已提交
1254
    pTableMetaInfo->vnodeIndex = 1;
H
hzcheng 已提交
1255
  } else {
S
slguan 已提交
1256
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1257 1258 1259 1260 1261 1262
  }

  code = TSDB_CODE_SUCCESS;
  goto _clean;

_error_clean:
S
slguan 已提交
1263
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1264 1265

_clean:
H
hjxilinx 已提交
1266
  taosHashCleanup(pSql->pTableHashList);
H
hjxilinx 已提交
1267
  
L
lihui 已提交
1268
  pSql->pTableHashList = NULL;
L
[1078]  
lihui 已提交
1269
  pSql->asyncTblPos    = NULL;
L
[#1083]  
lihui 已提交
1270
  pCmd->isParseFinish  = 1;
H
hjxilinx 已提交
1271
  
H
hzcheng 已提交
1272 1273 1274
  return code;
}

1275
int tsParseInsertSql(SSqlObj *pSql) {
S
slguan 已提交
1276 1277 1278
  if (!pSql->pTscObj->writeAuth) {
    return TSDB_CODE_NO_RIGHTS;
  }
H
hzcheng 已提交
1279

H
hjxilinx 已提交
1280
  int32_t  index = 0;
S
slguan 已提交
1281
  SSqlCmd *pCmd = &pSql->cmd;
1282 1283

  SSQLToken sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
H
hjxilinx 已提交
1284
  assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);
1285 1286 1287 1288 1289

  pCmd->count = 0;
  pCmd->command = TSDB_SQL_INSERT;

  SQueryInfo *pQueryInfo = NULL;
1290
  tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
1291

1292 1293
  uint16_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT;
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, type);
1294 1295

  sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
S
slguan 已提交
1296
  if (sToken.type != TK_INTO) {
H
hjxilinx 已提交
1297
    return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
S
slguan 已提交
1298
  }
1299

H
hjxilinx 已提交
1300
  pSql->res.numOfRows = 0;
1301
  return doParseInsertSql(pSql, pSql->sqlstr + index);
H
hzcheng 已提交
1302 1303
}

1304
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
H
hzcheng 已提交
1305
  int32_t ret = TSDB_CODE_SUCCESS;
S
slguan 已提交
1306

L
lihui 已提交
1307 1308
  if (NULL == pSql->asyncTblPos) {
    tscCleanSqlCmd(&pSql->cmd);
1309 1310
  } else {
    tscTrace("continue parse sql: %s", pSql->asyncTblPos);
L
lihui 已提交
1311
  }
L
lihui 已提交
1312
  
H
hzcheng 已提交
1313 1314
  if (tscIsInsertOrImportData(pSql->sqlstr)) {
    /*
S
slguan 已提交
1315 1316 1317
     * only for async multi-vnode insertion
     * Set the fp before parse the sql string, in case of getmetermeta failed, in which
     * the error handle callback function can rightfully restore the user defined function (fp)
H
hzcheng 已提交
1318 1319 1320 1321
     */
    if (pSql->fp != NULL && multiVnodeInsertion) {
      pSql->fetchFp = pSql->fp;

S
slguan 已提交
1322
      // replace user defined callback function with multi-insert proxy function
H
hjxilinx 已提交
1323
      pSql->fp = (void(*)())tscHandleMultivnodeInsert;
H
hzcheng 已提交
1324 1325
    }

1326
    ret = tsParseInsertSql(pSql);
H
hzcheng 已提交
1327
  } else {
S
slguan 已提交
1328 1329
    ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
    if (TSDB_CODE_SUCCESS != ret) return ret;
1330

H
hzcheng 已提交
1331 1332
    SSqlInfo SQLInfo = {0};
    tSQLParse(&SQLInfo, pSql->sqlstr);
S
slguan 已提交
1333

H
hzcheng 已提交
1334 1335 1336 1337 1338
    ret = tscToSQLCmd(pSql, &SQLInfo);
    SQLInfoDestroy(&SQLInfo);
  }

  /*
H
hjxilinx 已提交
1339
   * the pRes->code may be modified or even released by another thread in tscTableMetaCallBack
H
hzcheng 已提交
1340 1341 1342 1343 1344 1345 1346 1347 1348
   * function, so do NOT use pRes->code to determine if the getMeterMeta/getMetricMeta function
   * invokes new threads to get data from mnode or simply retrieves data from cache.
   *
   * do NOT assign return code to pRes->code for the same reason for it may be released by another thread
   * pRes->code = ret;
   */
  return ret;
}

S
slguan 已提交
1349 1350 1351
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1352

1353
  assert(pCmd->numOfClause == 1);
1354
  STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
S
slguan 已提交
1355 1356

  SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
H
hjxilinx 已提交
1357
  tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
S
slguan 已提交
1358

S
slguan 已提交
1359 1360 1361
  if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
    return code;
  }
S
slguan 已提交
1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375

  // the pDataBlock is different from the pTableDataBlocks
  STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
  if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
    return code;
  }

  if ((code = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

L
[1292]  
lihui 已提交
1376
static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
S
slguan 已提交
1377 1378 1379 1380
  size_t          readLen = 0;
  char *          line = NULL;
  size_t          n = 0;
  int             len = 0;
L
lihui 已提交
1381
  int32_t         maxRows = 0;
S
slguan 已提交
1382 1383 1384 1385
  SSqlCmd *       pCmd = &pSql->cmd;
  int             numOfRows = 0;
  int32_t         code = 0;
  int             nrows = 0;
1386
  
1387
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1388
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
1389
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
hjxilinx 已提交
1390
  
1391 1392
  assert(pCmd->numOfClause == 1);
  
H
hjxilinx 已提交
1393
  int32_t rowSize = tinfo.rowSize;
S
slguan 已提交
1394 1395

  pCmd->pDataBlocks = tscCreateBlockArrayList();
H
hjxilinx 已提交
1396
  STableDataBlocks *pTableDataBlock = NULL;
1397
  int32_t           ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SShellSubmitBlock),
H
hjxilinx 已提交
1398
                                   pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
H
hjxilinx 已提交
1399 1400 1401
  if (ret != TSDB_CODE_SUCCESS) {
    return -1;
  }
1402

S
slguan 已提交
1403 1404
  tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);

L
lihui 已提交
1405 1406
  code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows);
  if (TSDB_CODE_SUCCESS != code) return -1;
H
hzcheng 已提交
1407 1408

  int                count = 0;
H
hjxilinx 已提交
1409 1410
  SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
  SSchema *          pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
1411

H
hjxilinx 已提交
1412
  tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
H
hzcheng 已提交
1413

H
huili 已提交
1414
  while ((readLen = getline(&line, &n, fp)) != -1) {
H
hzcheng 已提交
1415 1416
    // line[--readLen] = '\0';
    if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0;
1417
    if (readLen == 0) continue;  // fang, <= to ==
H
huili 已提交
1418

S
slguan 已提交
1419
    char *lineptr = line;
H
hzcheng 已提交
1420
    strtolower(line, line);
H
hjxilinx 已提交
1421
    
H
hjxilinx 已提交
1422
    len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tmpTokenBuf);
S
slguan 已提交
1423
    if (len <= 0 || pTableDataBlock->numOfParams > 0) {
L
[1292]  
lihui 已提交
1424 1425
      pSql->res.code = code;
      return (-code);
S
slguan 已提交
1426
    }
1427

S
slguan 已提交
1428
    pTableDataBlock->size += len;
H
hzcheng 已提交
1429 1430 1431 1432

    count++;
    nrows++;
    if (count >= maxRows) {
S
slguan 已提交
1433 1434
      if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
        return -code;
H
hzcheng 已提交
1435
      }
S
slguan 已提交
1436 1437 1438

      pTableDataBlock = pCmd->pDataBlocks->pData[0];
      pTableDataBlock->size = sizeof(SShellSubmitBlock);
H
hjxilinx 已提交
1439
      pTableDataBlock->rowSize = tinfo.rowSize;
S
slguan 已提交
1440

H
hzcheng 已提交
1441
      numOfRows += pSql->res.numOfRows;
S
slguan 已提交
1442
      pSql->res.numOfRows = 0;
H
hzcheng 已提交
1443 1444 1445 1446 1447
      count = 0;
    }
  }

  if (count > 0) {
S
slguan 已提交
1448 1449
    if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
      return -code;
H
hzcheng 已提交
1450
    }
S
slguan 已提交
1451

H
hzcheng 已提交
1452
    numOfRows += pSql->res.numOfRows;
S
slguan 已提交
1453
    pSql->res.numOfRows = 0;
H
hzcheng 已提交
1454 1455 1456
  }

  if (line) tfree(line);
S
slguan 已提交
1457

H
hzcheng 已提交
1458 1459 1460 1461 1462 1463 1464 1465 1466
  return numOfRows;
}

/* multi-vnodes insertion in sync query model
 *
 * modify history
 * 2019.05.10 lihui
 * Remove the code for importing records from files
 */
S
slguan 已提交
1467 1468
void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1469 1470

  // not insert/import, return directly
H
hzcheng 已提交
1471 1472 1473 1474
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

S
slguan 已提交
1475 1476 1477 1478 1479
  // SSqlCmd may have been released
  if (pCmd->pDataBlocks == NULL) {
    return;
  }

S
slguan 已提交
1480
  STableDataBlocks *pDataBlock = NULL;
1481
  STableMetaInfo *  pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
1482 1483
  assert(pCmd->numOfClause == 1);
  
S
slguan 已提交
1484
  int32_t           code = TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1485 1486

  /* the first block has been sent to server in processSQL function */
H
hjxilinx 已提交
1487
  assert(pTableMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL);
H
hzcheng 已提交
1488

H
hjxilinx 已提交
1489
  if (pTableMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) {
S
slguan 已提交
1490
    SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
H
hzcheng 已提交
1491

H
hjxilinx 已提交
1492
    for (int32_t i = pTableMetaInfo->vnodeIndex; i < pDataBlocks->nSize; ++i) {
H
hzcheng 已提交
1493 1494 1495 1496 1497 1498
      pDataBlock = pDataBlocks->pData[i];
      if (pDataBlock == NULL) {
        continue;
      }

      if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1499
        tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pTableMetaInfo->vnodeIndex,
1500
                 pDataBlocks->nSize);
H
hzcheng 已提交
1501 1502 1503 1504 1505 1506 1507 1508
        continue;
      }

      tscProcessSql(pSql);
    }
  }

  // all data have been submit to vnode, release data blocks
S
slguan 已提交
1509
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
H
hzcheng 已提交
1510 1511
}

S
slguan 已提交
1512
// multi-vnodes insertion in sync query model
1513
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
S
slguan 已提交
1514
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1515 1516 1517 1518
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

1519
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1520
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1521

S
slguan 已提交
1522 1523
  STableDataBlocks *pDataBlock = NULL;
  int32_t           affected_rows = 0;
H
hzcheng 已提交
1524

1525
  assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL);
S
slguan 已提交
1526 1527
  SDataBlockList *pDataBlockList = pCmd->pDataBlocks;
  pCmd->pDataBlocks = NULL;
H
hzcheng 已提交
1528

S
slguan 已提交
1529
  char path[PATH_MAX] = {0};
H
hzcheng 已提交
1530

S
slguan 已提交
1531 1532
  for (int32_t i = 0; i < pDataBlockList->nSize; ++i) {
    pDataBlock = pDataBlockList->pData[i];
H
hzcheng 已提交
1533 1534 1535
    if (pDataBlock == NULL) {
      continue;
    }
1536

S
slguan 已提交
1537 1538 1539 1540
    if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) {
      tscError("%p failed to malloc when insert file", pSql);
      continue;
    }
H
hzcheng 已提交
1541 1542
    pCmd->count = 1;

S
slguan 已提交
1543 1544 1545
    strncpy(path, pDataBlock->filename, PATH_MAX);

    FILE *fp = fopen(path, "r");
H
hzcheng 已提交
1546
    if (fp == NULL) {
S
slguan 已提交
1547
      tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno));
H
hzcheng 已提交
1548 1549 1550
      continue;
    }

H
hjxilinx 已提交
1551
    strncpy(pTableMetaInfo->name, pDataBlock->tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
1552 1553
    memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);

H
hjxilinx 已提交
1554
    int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo);
S
slguan 已提交
1555 1556 1557 1558
    if (ret != TSDB_CODE_SUCCESS) {
      tscError("%p get meter meta failed, abort", pSql);
      continue;
    }
1559 1560

    char *tmpTokenBuf = calloc(1, 4096);  // used for deleting Escape character: \\, \', \"
L
[1292]  
lihui 已提交
1561 1562 1563 1564
    if (NULL == tmpTokenBuf) {
      tscError("%p calloc failed", pSql);
      continue;
    }
S
slguan 已提交
1565

L
[1292]  
lihui 已提交
1566 1567
    int nrows = tscInsertDataFromFile(pSql, fp, tmpTokenBuf);
    free(tmpTokenBuf);
1568

S
slguan 已提交
1569 1570
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

H
hzcheng 已提交
1571 1572
    if (nrows < 0) {
      fclose(fp);
S
slguan 已提交
1573
      tscTrace("%p no records(%d) in file %s", pSql, nrows, path);
H
hzcheng 已提交
1574 1575 1576
      continue;
    }

S
slguan 已提交
1577
    fclose(fp);
H
hzcheng 已提交
1578 1579
    affected_rows += nrows;

S
slguan 已提交
1580
    tscTrace("%p Insert data %d records from file %s", pSql, nrows, path);
H
hzcheng 已提交
1581 1582 1583 1584 1585
  }

  pSql->res.numOfRows = affected_rows;

  // all data have been submit to vnode, release data blocks
S
slguan 已提交
1586 1587
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
  tscDestroyBlockArrayList(pDataBlockList);
H
hzcheng 已提交
1588
}