tscAsync.c 16.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hjxilinx 已提交
17 18 19
#include "tutil.h"

#include "tnote.h"
H
hzcheng 已提交
20
#include "trpc.h"
S
slguan 已提交
21
#include "tscLog.h"
H
hzcheng 已提交
22
#include "tscProfile.h"
H
hjxilinx 已提交
23
#include "tscSubquery.h"
H
hzcheng 已提交
24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
S
slguan 已提交
26
#include "tsched.h"
H
hjxilinx 已提交
27
#include "tschemautil.h"
H
hjxilinx 已提交
28
#include "tsclient.h"
H
hzcheng 已提交
29

30 31
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
32 33 34 35

static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());

/*
36 37 38
 * Proxy function to perform sequentially query&retrieve operation.
 * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
 * query), it will sequentially query&retrieve data for all vnodes
H
hzcheng 已提交
39
 */
40 41
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
42

43
int doAsyncParseSql(SSqlObj* pSql) {
B
Bomin Zhang 已提交
44 45 46 47
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;
  int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
48
    tscError("failed to malloc payload");
49
    tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
B
Bomin Zhang 已提交
50
    return code;
H
hzcheng 已提交
51
  }
52
  
H
hzcheng 已提交
53 54
  pRes->qhandle = 0;
  pRes->numOfRows = 1;
55
  
56
  tscDump("%p SQL: %s", pSql, pSql->sqlstr);
B
Bomin Zhang 已提交
57 58 59 60
  return tsParseSql(pSql, true);
}

void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
H
hzcheng 已提交
61
  pSql->signature = pSql;
H
hjxilinx 已提交
62 63 64
  pSql->param     = param;
  pSql->pTscObj   = pObj;
  pSql->maxRetry  = TSDB_MAX_REPLICA_NUM;
65
  pSql->fp        = fp;
H
Haojun Liao 已提交
66
  pSql->sqlstr = calloc(1, sqlLen + 1);
H
hzcheng 已提交
67 68
  if (pSql->sqlstr == NULL) {
    tscError("%p failed to malloc sql string buffer", pSql);
69
    tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
H
hzcheng 已提交
70 71
    return;
  }
S
slguan 已提交
72
  strtolower(pSql->sqlstr, sqlstr);
73
  
74
  int32_t code = doAsyncParseSql(pSql);
75
  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
76
  
H
hzcheng 已提交
77
  if (code != TSDB_CODE_SUCCESS) {
H
[td-99]  
hjxilinx 已提交
78
    pSql->res.code = code;
H
hzcheng 已提交
79 80 81
    tscQueueAsyncRes(pSql);
    return;
  }
82
  
H
hzcheng 已提交
83 84 85
  tscDoQuery(pSql);
}

86
// TODO return the correct error code to client in tscQueueAsyncError
H
hjxilinx 已提交
87
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
88 89 90
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("bug!!! pObj:%p", pObj);
91 92
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
93 94 95 96 97
    return;
  }
  
  int32_t sqlLen = strlen(sqlstr);
  if (sqlLen > tsMaxSQLStringLen) {
H
Haojun Liao 已提交
98
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
99 100
    terrno = TSDB_CODE_TSC_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_SQL);
101 102 103 104 105 106 107 108
    return;
  }
  
  taosNotePrintTsc(sqlstr);
  
  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
109 110
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
111 112 113 114 115 116
    return;
  }
  
  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}

117
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
118 119 120 121 122 123 124 125
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

126 127 128 129
  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    } else {
130 131 132 133 134 135 136 137 138
      /*
       * all available virtual node has been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }

139 140 141 142 143
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
H
hzcheng 已提交
144
    }
145 146 147 148
    
    return;
  }
  
149
  // local merge has handle this situation during super table non-projection query.
H
hjxilinx 已提交
150
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
H
Haojun Liao 已提交
151
    pRes->numOfClauseTotal += pRes->numOfRows;
H
hzcheng 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
  }

  (*pSql->fetchFp)(param, tres, numOfRows);
}

// actual continue retrieve function with user-specified callback function
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

168
  if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
169
    if (pRes->qhandle == 0 && numOfRows != 0) {
H
hzcheng 已提交
170 171 172 173 174
      tscError("qhandle is NULL");
    } else {
      pRes->code = numOfRows;
    }

H
[td-99]  
hjxilinx 已提交
175
    tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
H
hzcheng 已提交
176 177 178 179
    return;
  }

  pSql->fp = fp;
H
hjxilinx 已提交
180
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hzcheng 已提交
181 182 183 184 185 186
    pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
  }
  tscProcessSql(pSql);
}

/*
187 188 189
 * retrieve callback for fetch rows proxy.
 * The below two functions both serve as the callback function of query virtual node.
 * query callback first, and then followed by retrieve callback
H
hzcheng 已提交
190
 */
191 192
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
193
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
H
hzcheng 已提交
194 195
}

196 197
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
198
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
H
hzcheng 已提交
199 200 201 202 203 204
}

void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
205
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
H
hzcheng 已提交
206 207 208 209 210 211 212 213
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
214
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
H
hzcheng 已提交
215 216 217 218 219
    return;
  }

  // user-defined callback function is stored in fetchFp
  pSql->fetchFp = fp;
220
  pSql->fp = tscAsyncFetchRowsProxy;
H
hzcheng 已提交
221 222

  pSql->param = param;
S
slguan 已提交
223
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
224 225
  
  // handle the sub queries of join query
226
  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
H
hjxilinx 已提交
227
    tscFetchDatablockFromSubquery(pSql);
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
  } else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
      return;
    } else {
      /*
       * all available virtual node has been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }
    
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
    }
    return;
  } else { // current query is not completed, continue retrieve from node
H
hjxilinx 已提交
250
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hjxilinx 已提交
251 252 253 254
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
  
    tscProcessSql(pSql);
H
hzcheng 已提交
255 256 257 258 259 260 261
  }
}

void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
262
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
H
hzcheng 已提交
263 264 265 266 267 268 269 270
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
271
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
H
hzcheng 已提交
272 273 274 275 276
    return;
  }

  pSql->fetchFp = fp;
  pSql->param = param;
277
  
H
hzcheng 已提交
278
  if (pRes->row >= pRes->numOfRows) {
S
slguan 已提交
279
    tscResetForNextRetrieve(pRes);
280
    pSql->fp = tscAsyncFetchSingleRowProxy;
281
    
H
hjxilinx 已提交
282
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
283 284 285
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
    
H
hzcheng 已提交
286 287 288 289 290 291 292 293 294 295 296
    tscProcessSql(pSql);
  } else {
    SSchedMsg schedMsg;
    schedMsg.fp = tscProcessFetchRow;
    schedMsg.ahandle = pSql;
    schedMsg.thandle = pRes->tsrow;
    schedMsg.msg = NULL;
    taosScheduleTask(tscQhandle, &schedMsg);
  }
}

297
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
298 299 300 301
  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

302
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
303
  
H
hzcheng 已提交
304
  if (numOfRows == 0) {
305 306 307
    if (hasMoreVnodesToTry(pSql)) {     // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
    } else {
H
hzcheng 已提交
308
      /*
309 310
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
H
hzcheng 已提交
311 312 313
       */
      (*pSql->fetchFp)(pSql->param, pSql, NULL);
    }
314
    return;
H
hzcheng 已提交
315
  }
316
  
317
  for (int i = 0; i < pCmd->numOfCols; ++i){
H
hjxilinx 已提交
318 319 320
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
    if (pSup->pSqlExpr != NULL) {
//      pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
321 322 323 324 325
    } else {
      //todo add
    }
  }
  
326 327 328
  pRes->row++;

  (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
H
hzcheng 已提交
329 330 331 332 333 334
}

void tscProcessFetchRow(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
335
  
H
hjxilinx 已提交
336
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
337

338
  for (int i = 0; i < pCmd->numOfCols; ++i) {
H
hjxilinx 已提交
339 340 341
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);

    if (pSup->pSqlExpr != NULL) {
342
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
343
    } else {
H
hjxilinx 已提交
344
//      todo add
345
    }
346 347
  }
  
H
hzcheng 已提交
348 349 350 351 352 353 354 355 356 357 358 359 360
  pRes->row++;
  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

void tscProcessAsyncRes(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  void *taosres = pSql;

  // pCmd may be released, so cache pCmd->command
  int cmd = pCmd->command;
H
[TD-98]  
hjxilinx 已提交
361
  int code = pRes->code;
H
hzcheng 已提交
362 363

  // in case of async insert, restore the user specified callback function
H
hjxilinx 已提交
364
  bool shouldFree = tscShouldBeFreed(pSql);
H
hzcheng 已提交
365 366 367 368 369 370 371 372 373

  if (cmd == TSDB_SQL_INSERT) {
    assert(pSql->fp != NULL);
    pSql->fp = pSql->fetchFp;
  }

  (*pSql->fp)(pSql->param, taosres, code);

  if (shouldFree) {
374
    tscTrace("%p sqlObj is automatically freed in async res", pSql);
sangshuduo's avatar
sangshuduo 已提交
375
    tscFreeSqlObj(pSql);
H
hzcheng 已提交
376 377 378
  }
}

H
[td-99]  
hjxilinx 已提交
379
static void tscProcessAsyncError(SSchedMsg *pMsg) {
H
hzcheng 已提交
380
  void (*fp)() = pMsg->ahandle;
H
[td-99]  
hjxilinx 已提交
381
  (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
H
hzcheng 已提交
382 383
}

H
[td-99]  
hjxilinx 已提交
384 385 386 387
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
  int32_t* c = malloc(sizeof(int32_t));
  *c = code;
  
H
hzcheng 已提交
388 389 390 391
  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
H
[td-99]  
hjxilinx 已提交
392
  schedMsg.msg = c;
H
hzcheng 已提交
393 394 395 396 397 398 399 400
  taosScheduleTask(tscQhandle, &schedMsg);
}

void tscQueueAsyncRes(SSqlObj *pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
    tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
    return;
  } else {
H
[td-32]  
hjxilinx 已提交
401
    tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
H
hzcheng 已提交
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
  }

  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncRes;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

void tscProcessAsyncFree(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  tscTrace("%p sql is freed", pSql);
  taos_free_result(pSql);
}

void tscQueueAsyncFreeResult(SSqlObj *pSql) {
  tscTrace("%p sqlObj put in queue to async free", pSql);

  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncFree;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

int tscSendMsgToServer(SSqlObj *pSql);

H
hjxilinx 已提交
431
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
H
hzcheng 已提交
432 433 434 435 436 437
  SSqlObj *pSql = (SSqlObj *)param;
  if (pSql == NULL || pSql->signature != pSql) return;

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
[td-32]  
hjxilinx 已提交
438 439
  if (code != TSDB_CODE_SUCCESS) {
    pRes->code = code;
H
hzcheng 已提交
440 441 442 443 444
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream == NULL) {
H
hjxilinx 已提交
445
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
446 447 448 449 450

    // check if it is a sub-query of super table query first, if true, enter another routine
    if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
      tscTrace("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);

H
hjxilinx 已提交
451
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
L
lihui 已提交
452 453 454
      if (pTableMetaInfo->pTableMeta == NULL){
        code = tscGetTableMeta(pSql, pTableMetaInfo);
        assert(code == TSDB_CODE_SUCCESS);      
H
Haojun Liao 已提交
455
      }
L
lihui 已提交
456
      
H
hjxilinx 已提交
457
      assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
H
hzcheng 已提交
458 459 460

      SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
      SSqlObj *         pParObj = trs->pParentSqlObj;
461
      
H
hjxilinx 已提交
462
      assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
H
hjxilinx 已提交
463
          tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
S
slguan 已提交
464

H
Haojun Liao 已提交
465 466 467 468 469 470
      // NOTE: the vgroupInfo for the queried super table must be existed here.
      assert(pTableMetaInfo->vgroupList != NULL);
      if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {  // continue to process normal async query
471
      if (pCmd->parseFinished) {
H
Haojun Liao 已提交
472 473
        tscTrace("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);

474
        STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
475
        code = tscGetTableMeta(pSql, pTableMetaInfo);
L
[#1083]  
lihui 已提交
476
        assert(code == TSDB_CODE_SUCCESS);
H
Haojun Liao 已提交
477 478 479 480

        // if failed to process sql, go to error handler
        if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
          return;
L
[#1083]  
lihui 已提交
481
        }
H
Haojun Liao 已提交
482 483 484 485 486
//          // todo update the submit message according to the new table meta
//          // 1. table uid, 2. ip address
//          code = tscSendMsgToServer(pSql);
//          if (code == TSDB_CODE_SUCCESS) return;
//        }
L
[#1083]  
lihui 已提交
487
      } else {
H
Haojun Liao 已提交
488 489
        tscTrace("%p continue parse sql after get table meta", pSql);

H
hjxilinx 已提交
490
        code = tsParseSql(pSql, false);
H
Haojun Liao 已提交
491
        if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
H
Hui Li 已提交
492 493 494
          STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
          code = tscGetTableMeta(pSql, pTableMetaInfo);
          assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL);
H
Haojun Liao 已提交
495

H
Hui Li 已提交
496
          (*pSql->fp)(pSql->param, pSql, code);
H
Hui Li 已提交
497 498 499
          return;
        }
        
500
        if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
L
[#1083]  
lihui 已提交
501
      }
H
hzcheng 已提交
502
    }
S
slguan 已提交
503

H
hzcheng 已提交
504
  } else {  // stream computing
505
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
506
    code = tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
507 508
    pRes->code = code;

509
    if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
H
hzcheng 已提交
510

weixin_48148422's avatar
weixin_48148422 已提交
511
    if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
512
      code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
H
hzcheng 已提交
513 514
      pRes->code = code;

515
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
H
hzcheng 已提交
516 517 518
    }
  }

519 520
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
H
hzcheng 已提交
521 522 523 524 525 526
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream) {
    tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
527 528 529 530
    if (!pSql->cmd.parseFinished) {
      tsParseSql(pSql, false);
      sem_post(&pSql->rspSem);
    }
531
    return;
H
hzcheng 已提交
532
  } else {
H
hjxilinx 已提交
533
    tscTrace("%p get tableMeta successfully", pSql);
H
hzcheng 已提交
534 535 536 537
  }

  tscDoQuery(pSql);
}