tscAsync.c 16.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hjxilinx 已提交
17 18 19
#include "tutil.h"

#include "tnote.h"
H
hzcheng 已提交
20
#include "trpc.h"
S
slguan 已提交
21
#include "tscLog.h"
H
hzcheng 已提交
22
#include "tscProfile.h"
H
hjxilinx 已提交
23
#include "tscSubquery.h"
H
hzcheng 已提交
24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
S
slguan 已提交
26
#include "tsched.h"
H
hjxilinx 已提交
27
#include "tschemautil.h"
H
hjxilinx 已提交
28
#include "tsclient.h"
H
hzcheng 已提交
29

30 31
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
32 33 34 35

static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());

/*
 * Proxy functions that perform the query & retrieve operations sequentially.
 * If the SQL statement queries a super table and the two-stage merge procedure is not involved
 * (i.e. a projection query), data is queried and retrieved from all vnodes one after another.
 */
40 41
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
42

43
/*
 * Allocate the default command payload for pSql and start parsing its SQL string.
 *
 * Returns the code produced by tscAllocPayload() when the allocation fails,
 * otherwise whatever tsParseSql(pSql, true) returns.
 *
 * On allocation failure the error code is stored in the result object before
 * the result is queued, because the queued handler (tscProcessAsyncRes) hands
 * pSql->res.code to the user callback.
 *
 * NOTE(review): the failure is both queued here and returned to the caller;
 * callers that also queue a result for a non-success return value should be
 * checked for double delivery.
 */
int doAsyncParseSql(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to malloc payload");

    // record the failure so the queued callback does not observe a stale code
    pRes->code = code;
    tscQueueAsyncRes(pSql);
    return code;
  }

  pRes->qhandle = 0;
  pRes->numOfRows = 1;

  tscDump("%p SQL: %s", pSql, pSql->sqlstr);
  return tsParseSql(pSql, true);
}

/*
 * Initialise pSql for an asynchronous query and start executing it.
 *
 * pObj    connection object stored in pSql->pTscObj
 * pSql    caller-provided sql object
 * fp      completion callback, stored in pSql->fp
 * param   opaque argument stored in pSql->param
 * sqlstr  SQL text; a lower-cased copy is stored in pSql->sqlstr
 * sqlLen  length of sqlstr, excluding the terminating NUL
 *
 * NOTE(review): when the sql string buffer cannot be allocated only the
 * callback is notified; pSql itself is not released here - confirm who owns it.
 * NOTE(review): doAsyncParseSql() already queues a result when its payload
 * allocation fails; confirm the error branch below cannot deliver the callback
 * a second time for that case.
 */
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
  pSql->signature = pSql;
  pSql->pTscObj   = pObj;
  pSql->fp        = fp;
  pSql->param     = param;
  pSql->maxRetry  = TSDB_MAX_REPLICA_NUM;

  // one extra byte for the terminating NUL; calloc leaves the buffer zeroed
  pSql->sqlstr = calloc(1, sqlLen + 1);
  if (pSql->sqlstr == NULL) {
    tscError("%p failed to malloc sql string buffer", pSql);
    tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
    return;
  }

  strtolower(pSql->sqlstr, sqlstr);

  const int32_t code = doAsyncParseSql(pSql);
  if (code == TSDB_CODE_SUCCESS) {
    tscDoQuery(pSql);
  } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    // parsing failed: report the error through the result queue
    pSql->res.code = code;
    tscQueueAsyncRes(pSql);
  }
  // TSDB_CODE_TSC_ACTION_IN_PROGRESS: nothing more to do here
}

87
// TODO return the correct error code to client in tscQueueAsyncError
H
hjxilinx 已提交
88
/*
 * Asynchronously execute a SQL statement.
 *
 * taos    connection handle; must be a live STscObj (signature check)
 * sqlstr  NUL-terminated SQL text, at most tsMaxSQLStringLen bytes long
 * fp      callback that receives the result or the error
 * param   opaque argument handed back to fp
 *
 * Every validation failure sets terrno and reports the same code to fp
 * through tscQueueAsyncError(); nothing is returned to the caller directly.
 */
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("bug!!! pObj:%p", pObj);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return;
  }

  // strlen(NULL) is undefined behaviour, so reject a missing statement explicitly
  if (sqlstr == NULL) {
    tscError("sql string is NULL");
    terrno = TSDB_CODE_TSC_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_SQL);
    return;
  }

  // keep the length as size_t so a huge string cannot wrap to a small or negative value
  size_t sqlLen = strlen(sqlstr);
  if (sqlLen > (size_t)tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_SQL);
    return;
  }

  taosNotePrintTsc(sqlstr);

  SSqlObj *pSql = calloc(1, sizeof(*pSql));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
    return;
  }

  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}

118
/*
 * Internal retrieve callback installed by taos_fetch_rows_a().
 *
 * A non-zero numOfRows is forwarded to the user callback (pSql->fetchFp).
 * A zero numOfRows means the current source is drained: try the next vnode,
 * then the next sub-clause, and only when both are exhausted tell the user
 * callback that no rows are left.
 */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (numOfRows != 0) {
    // local merge has already handled the accounting during a super table non-projection query
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
      pRes->numOfClauseTotal += pRes->numOfRows;
    }

    (*pSql->fetchFp)(param, tres, numOfRows);
    return;
  }

  if (hasMoreVnodesToTry(pSql)) {
    // sequentially retrieve data from the remaining vnodes
    tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
  } else if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
    // every vnode has been visited; move on to the next sub-clause query
    tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
  } else {
    // limit reached, or no vnode / sub-clause left to retrieve from
    (*pSql->fetchFp)(param, pSql, 0);
  }
}

/*
 * Continue retrieving after a query step has completed.
 *
 * param      argument forwarded to the user callback on error
 * tres       the sql object
 * numOfRows  value delivered by the preceding step; a non-zero value is
 *            treated as a failure of that step
 * fp         internal callback installed in pSql->fp for the retrieve step
 */
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  const bool missingHandle = (pRes->qhandle == 0);
  const bool stepFailed    = (numOfRows != 0);

  if ((missingHandle || stepFailed) && pCmd->command < TSDB_SQL_LOCAL) {
    if (missingHandle && stepFailed) {
      tscError("qhandle is NULL");
    } else {
      pRes->code = numOfRows;
    }

    tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
    return;
  }

  pSql->fp = fp;

  // switch the command to its retrieve/fetch counterpart unless it is a local (merge) command
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
    if (pCmd->command > TSDB_SQL_MGMT) {
      pCmd->command = TSDB_SQL_RETRIEVE;
    } else {
      pCmd->command = TSDB_SQL_FETCH;
    }
  }

  tscProcessSql(pSql);
}

/*
 * Retrieve callbacks for the fetch-rows proxies.
 * The two functions below both serve as the callback of a virtual-node query:
 * the query callback runs first, followed by the retrieve callback.
 */
192 193
/* Query callback for the multi-row fetch path: once the query has completed, continue with the retrieve step. */
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  void (*retrieveCb)() = tscAsyncFetchRowsProxy;
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, retrieveCb);
}

197 198
/* Query callback for the single-row fetch path: once the query has completed, continue with the retrieve step. */
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  void (*retrieveCb)() = tscAsyncFetchSingleRowProxy;
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, retrieveCb);
}

/*
 * Asynchronously fetch the next batch of rows of a result set.
 *
 * taosa  result handle (an SSqlObj)
 * fp     user callback, stored in pSql->fetchFp
 * param  opaque argument stored in pSql->param and handed back to fp
 *
 * An invalid handle is reported through tscQueueAsyncError(); a missing
 * query handle is reported through tscQueueAsyncRes().
 */
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
    pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
    tscQueueAsyncRes(pSql);
    return;
  }

  // the user callback lives in fetchFp; fp is the internal proxy that forwards to it
  pSql->param   = param;
  pSql->fetchFp = fp;
  pSql->fp      = tscAsyncFetchRowsProxy;

  tscResetForNextRetrieve(pRes);

  // sub queries of a join query are handled by their own routine
  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
    tscFetchDatablockFromSubquery(pSql);
    return;
  }

  if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
    if (hasMoreVnodesToTry(pSql)) {
      // sequentially retrieve data from the remaining vnodes
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    } else if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
      // every vnode has been visited; move on to the next sub-clause query
      tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
    } else {
      // limit reached, or no vnode / sub-clause left to retrieve from
      (*pSql->fetchFp)(param, pSql, 0);
    }
    return;
  }

  // the current query is not completed: continue retrieving from the node
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
    if (pCmd->command > TSDB_SQL_MGMT) {
      pCmd->command = TSDB_SQL_RETRIEVE;
    } else {
      pCmd->command = TSDB_SQL_FETCH;
    }
  }

  tscProcessSql(pSql);
}

/*
 * Asynchronously fetch a single row of a result set.
 *
 * taosa  result handle (an SSqlObj)
 * fp     user callback, stored in pSql->fetchFp
 * param  opaque argument stored in pSql->param
 *
 * When rows of the current batch are still unread, the next one is delivered
 * through a scheduled tscProcessFetchRow task; otherwise a new retrieve
 * request is issued with tscAsyncFetchSingleRowProxy as internal callback.
 */
void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
    return;
  }

  pSql->param   = param;
  pSql->fetchFp = fp;

  if (pRes->row < pRes->numOfRows) {
    // the current batch still holds unread rows: hand out the next one from the task queue
    SSchedMsg schedMsg;
    schedMsg.msg     = NULL;
    schedMsg.thandle = pRes->tsrow;
    schedMsg.ahandle = pSql;
    schedMsg.fp      = tscProcessFetchRow;
    taosScheduleTask(tscQhandle, &schedMsg);
    return;
  }

  // the batch is exhausted: request more rows from the server
  tscResetForNextRetrieve(pRes);
  pSql->fp = tscAsyncFetchSingleRowProxy;

  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
    if (pCmd->command > TSDB_SQL_MGMT) {
      pCmd->command = TSDB_SQL_RETRIEVE;
    } else {
      pCmd->command = TSDB_SQL_FETCH;
    }
  }

  tscProcessSql(pSql);
}

299
/*
 * Internal retrieve callback installed by taos_fetch_row_a().
 *
 * A zero numOfRows means the current vnode is drained: the next vnode is
 * queried if there is one, otherwise the user callback receives a NULL row.
 * For any other value the current row is prepared and handed to the user
 * callback (pSql->fetchFp).
 *
 * The row is prepared with tscGetResultColumnChr(), exactly as
 * tscProcessFetchRow() does for the remaining rows of a batch; without it the
 * row passed to the callback would never be refreshed for the first row.
 * NOTE(review): confirm tscGetResultColumnChr() is the intended replacement
 * for the previously disabled manual pointer computation.
 */
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // same guard as tscAsyncFetchRowsProxy
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) {  // sequentially retrieve data from the remaining vnodes
      tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
    } else {
      // limit reached, or no vnode left to retrieve from
      (*pSql->fetchFp)(pSql->param, pSql, NULL);
    }
    return;
  }

  for (int i = 0; i < pCmd->numOfCols; ++i) {
    SFieldSupInfo *pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
    if (pSup->pSqlExpr != NULL) {
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
    }
    // todo: columns without an expression are not handled yet
  }

  pRes->row++;

  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

/*
 * Scheduled task that delivers the next row of the current batch.
 * pMsg->ahandle carries the sql object. Every column that has an expression is
 * passed to tscGetResultColumnChr(), the row cursor is advanced, and the user
 * callback receives pRes->tsrow.
 */
void tscProcessFetchRow(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  for (int col = 0; col < pCmd->numOfCols; ++col) {
    SFieldSupInfo *pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, col);
    if (pSup->pSqlExpr == NULL) {
      continue;  // todo: columns without an expression are not handled yet
    }

    tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, col);
  }

  pRes->row++;
  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

/*
 * Scheduled task that delivers the result code of a sql object to its callback.
 * pMsg->ahandle carries the sql object. The object is released afterwards when
 * tscShouldBeFreed() said so before the callback ran.
 */
void tscProcessAsyncRes(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  void    *resHandle = pSql;

  // snapshot everything needed later before the callback runs
  // (the original author notes that the command may be released)
  int  command = pSql->cmd.command;
  int  code = pSql->res.code;
  bool releaseAfterwards = tscShouldBeFreed(pSql);

  // in case of async insert, restore the user specified callback function
  if (command == TSDB_SQL_INSERT) {
    assert(pSql->fp != NULL);
    pSql->fp = pSql->fetchFp;
  }

  if (pSql->fp) {
    (*pSql->fp)(pSql->param, resHandle, code);
  }

  if (!releaseAfterwards) {
    return;
  }

  tscTrace("%p sqlObj is automatically freed in async res", pSql);
  tscFreeSqlObj(pSql);
}
}

H
[td-99]  
hjxilinx 已提交
383
/*
 * Scheduled task that reports an error code to a user callback.
 *
 * pMsg->ahandle  the callback
 * pMsg->thandle  the callback's opaque argument
 * pMsg->msg      heap-allocated int32_t holding the error code (allocated by
 *                tscQueueAsyncError)
 *
 * The code is copied out and the buffer released before the callback runs, so
 * the allocation no longer leaks once per reported error.
 * NOTE(review): this assumes the task scheduler does not release pMsg->msg
 * itself - confirm against the scheduler implementation.
 */
static void tscProcessAsyncError(SSchedMsg *pMsg) {
  void (*fp)() = pMsg->ahandle;

  int32_t code = *(int32_t *)pMsg->msg;
  free(pMsg->msg);
  pMsg->msg = NULL;

  // a caller may have supplied no callback; there is nobody to notify then
  if (fp == NULL) {
    return;
  }

  (*fp)(pMsg->thandle, NULL, code);
}

H
[td-99]  
hjxilinx 已提交
388 389 390 391
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
  int32_t* c = malloc(sizeof(int32_t));
  *c = code;
  
H
hzcheng 已提交
392 393 394 395
  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
H
[td-99]  
hjxilinx 已提交
396
  schedMsg.msg = c;
H
hzcheng 已提交
397 398 399 400 401 402 403 404
  taosScheduleTask(tscQhandle, &schedMsg);
}

/*
 * Queue a task (tscProcessAsyncRes) that delivers pSql's result code to its
 * callback. Nothing is queued when pSql is NULL or fails the signature check.
 */
void tscQueueAsyncRes(SSqlObj *pSql) {
  if (pSql != NULL && pSql->signature == pSql) {
    tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));

    SSchedMsg schedMsg;
    schedMsg.msg     = NULL;
    schedMsg.thandle = (void *)1;
    schedMsg.ahandle = pSql;
    schedMsg.fp      = tscProcessAsyncRes;
    taosScheduleTask(tscQhandle, &schedMsg);
    return;
  }

  tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
}

/* Scheduled task that releases the result object carried in pMsg->ahandle via taos_free_result(). */
void tscProcessAsyncFree(SSchedMsg *pMsg) {
  SSqlObj *pTarget = (SSqlObj *)pMsg->ahandle;

  tscTrace("%p sql is freed", pTarget);
  taos_free_result(pTarget);
}

/* Queue a tscProcessAsyncFree task for pSql so that it is released from the task queue. */
void tscQueueAsyncFreeResult(SSqlObj *pSql) {
  tscTrace("%p sqlObj put in queue to async free", pSql);

  SSchedMsg freeMsg;
  freeMsg.msg     = NULL;
  freeMsg.thandle = (void *)1;
  freeMsg.ahandle = pSql;
  freeMsg.fp      = tscProcessAsyncFree;
  taosScheduleTask(tscQhandle, &freeMsg);
}

int tscSendMsgToServer(SSqlObj *pSql);

H
hjxilinx 已提交
435
/*
 * Callback invoked once a table-meta request has finished.
 *
 * param  the sql object that was waiting for the meta data
 * res    unused
 * code   result of the meta request
 *
 * A failed request is reported through tscQueueAsyncRes(). Otherwise the
 * function continues according to the kind of statement:
 *   - stream computing (pSql->pStream set): refresh the table meta and, for a
 *     super table, the vgroup info; then finish parsing if necessary and wake
 *     the waiter on rspSem;
 *   - sub query of a super table query: re-check the meta and process the sql;
 *   - ordinary query: process the sql when parsing had finished, otherwise
 *     resume parsing.
 * Any step that returns TSDB_CODE_TSC_ACTION_IN_PROGRESS ends this invocation.
 */
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
  SSqlObj *pSql = (SSqlObj *)param;
  if (pSql == NULL || pSql->signature != pSql) {
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (code != TSDB_CODE_SUCCESS) {
    pRes->code = code;
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream != NULL) {  // stream computing
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

    code = tscGetTableMeta(pSql, pTableMetaInfo);
    pRes->code = code;
    if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
      return;
    }

    if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
      pRes->code = code;
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        return;
      }
    }
  } else {
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

    // check if it is a sub-query of a super table query first; if so, take a separate routine
    if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
      tscTrace("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);

      STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
      code = tscGetTableMeta(pSql, pTableMetaInfo);
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        return;
      }
      assert(code == TSDB_CODE_SUCCESS);

      assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);

      SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
      SSqlObj          *pParObj = trs->pParentSqlObj;

      assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
          tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);

      // NOTE: the vgroupInfo of the queried super table must exist here.
      assert(pTableMetaInfo->vgroupList != NULL);

      code = tscProcessSql(pSql);
      if (code == TSDB_CODE_SUCCESS) {
        return;
      }
      // on failure fall through to the common error handling below
    } else if (pCmd->parseFinished) {  // continue to process a normal async query
      tscTrace("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);

      STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
      code = tscGetTableMeta(pSql, pTableMetaInfo);
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        return;
      }
      assert(code == TSDB_CODE_SUCCESS);

      // if processing the sql fails, go to the common error handling below
      code = tscProcessSql(pSql);
      if (code == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {
      tscTrace("%p continue parse sql after get table meta", pSql);

      code = tsParseSql(pSql, false);
      if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
        STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
        code = tscGetTableMeta(pSql, pTableMetaInfo);
        if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
          return;
        }
        assert(code == TSDB_CODE_SUCCESS);

        (*pSql->fp)(pSql->param, pSql, code);
        return;
      }

      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        return;
      }
    }
  }

  // common error handling for every branch above
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream) {
    tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
    if (!pSql->cmd.parseFinished) {
      tsParseSql(pSql, false);
      sem_post(&pSql->rspSem);
    }
    return;
  }

  tscTrace("%p get tableMeta successfully", pSql);
  tscDoQuery(pSql);
}