tscAsync.c 16.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hjxilinx 已提交
17 18 19
#include "tutil.h"

#include "tnote.h"
H
hzcheng 已提交
20
#include "trpc.h"
S
slguan 已提交
21
#include "tscLog.h"
H
hzcheng 已提交
22
#include "tscProfile.h"
H
hjxilinx 已提交
23
#include "tscSubquery.h"
H
hzcheng 已提交
24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
S
slguan 已提交
26
#include "tsched.h"
H
hjxilinx 已提交
27
#include "tschemautil.h"
H
hjxilinx 已提交
28
#include "tsclient.h"
H
hzcheng 已提交
29

30 31
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
32 33 34 35

static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());

/*
36 37 38
 * Proxy function to perform sequentially query&retrieve operation.
 * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
 * query), it will sequentially query&retrieve data for all vnodes
H
hzcheng 已提交
39
 */
40 41
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
42

H
[td-99]  
hjxilinx 已提交
43
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
H
hzcheng 已提交
44 45
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
46
  
H
hzcheng 已提交
47
  pSql->signature = pSql;
H
hjxilinx 已提交
48 49 50
  pSql->param     = param;
  pSql->pTscObj   = pObj;
  pSql->maxRetry  = TSDB_MAX_REPLICA_NUM;
51
  pSql->fp        = fp;
52
  
H
Haojun Liao 已提交
53
  sem_init(&pSql->rspSem, 0, 0);
S
slguan 已提交
54 55
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("failed to malloc payload");
H
[td-99]  
hjxilinx 已提交
56
    tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
H
hzcheng 已提交
57 58
    return;
  }
59
  
H
Haojun Liao 已提交
60 61
  // todo check for OOM problem
  pSql->sqlstr = calloc(1, sqlLen + 1);
H
hzcheng 已提交
62 63
  if (pSql->sqlstr == NULL) {
    tscError("%p failed to malloc sql string buffer", pSql);
H
[td-99]  
hjxilinx 已提交
64
    tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
65
    free(pCmd->payload);
H
hzcheng 已提交
66 67
    return;
  }
68
  
H
hzcheng 已提交
69 70
  pRes->qhandle = 0;
  pRes->numOfRows = 1;
71
  
S
slguan 已提交
72
  strtolower(pSql->sqlstr, sqlstr);
73
  tscDump("%p SQL: %s", pSql, pSql->sqlstr);
74
  
75
  int32_t code = tsParseSql(pSql, true);
H
hzcheng 已提交
76
  if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
77
  
H
hzcheng 已提交
78
  if (code != TSDB_CODE_SUCCESS) {
H
[td-99]  
hjxilinx 已提交
79
    pSql->res.code = code;
H
hzcheng 已提交
80 81 82
    tscQueueAsyncRes(pSql);
    return;
  }
83
  
H
hzcheng 已提交
84 85 86
  tscDoQuery(pSql);
}

87
// TODO return the correct error code to client in tscQueueAsyncError
H
hjxilinx 已提交
88
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
89 90 91
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("bug!!! pObj:%p", pObj);
H
[td-99]  
hjxilinx 已提交
92 93
    terrno = TSDB_CODE_DISCONNECTED;
    tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
94 95 96 97 98
    return;
  }
  
  int32_t sqlLen = strlen(sqlstr);
  if (sqlLen > tsMaxSQLStringLen) {
H
Haojun Liao 已提交
99
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
H
[td-99]  
hjxilinx 已提交
100 101
    terrno = TSDB_CODE_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL);
102 103 104 105 106 107 108 109
    return;
  }
  
  taosNotePrintTsc(sqlstr);
  
  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
H
[td-99]  
hjxilinx 已提交
110 111
    terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
    tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
112 113 114 115 116 117
    return;
  }
  
  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}

118
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
119 120 121 122 123 124 125 126
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

127 128 129 130
  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    } else {
131 132 133 134 135 136 137 138 139
      /*
       * all available virtual node has been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }

140 141 142 143 144
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
H
hzcheng 已提交
145
    }
146 147 148 149
    
    return;
  }
  
150
  // local merge has handle this situation during super table non-projection query.
H
hjxilinx 已提交
151
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
H
Haojun Liao 已提交
152
    pRes->numOfClauseTotal += pRes->numOfRows;
H
hzcheng 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
  }

  (*pSql->fetchFp)(param, tres, numOfRows);
}

// actual continue retrieve function with user-specified callback function
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

169
  if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
170
    if (pRes->qhandle == 0 && numOfRows != 0) {
H
hzcheng 已提交
171 172 173 174 175
      tscError("qhandle is NULL");
    } else {
      pRes->code = numOfRows;
    }

H
[td-99]  
hjxilinx 已提交
176
    tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
H
hzcheng 已提交
177 178 179 180
    return;
  }

  pSql->fp = fp;
H
hjxilinx 已提交
181
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hzcheng 已提交
182 183 184 185 186 187
    pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
  }
  tscProcessSql(pSql);
}

/*
188 189 190
 * retrieve callback for fetch rows proxy.
 * The below two functions both serve as the callback function of query virtual node.
 * query callback first, and then followed by retrieve callback
H
hzcheng 已提交
191
 */
192 193
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
194
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
H
hzcheng 已提交
195 196
}

197 198
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
199
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
H
hzcheng 已提交
200 201 202 203 204 205
}

void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
H
[td-99]  
hjxilinx 已提交
206
    tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
H
hzcheng 已提交
207 208 209 210 211 212 213 214
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
H
[td-99]  
hjxilinx 已提交
215
    tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
H
hzcheng 已提交
216 217 218 219 220
    return;
  }

  // user-defined callback function is stored in fetchFp
  pSql->fetchFp = fp;
221
  pSql->fp = tscAsyncFetchRowsProxy;
H
hzcheng 已提交
222 223

  pSql->param = param;
S
slguan 已提交
224
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
225 226
  
  // handle the sub queries of join query
227
  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
H
hjxilinx 已提交
228
    tscFetchDatablockFromSubquery(pSql);
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
  } else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
      return;
    } else {
      /*
       * all available virtual node has been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }
    
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
    }
    return;
  } else { // current query is not completed, continue retrieve from node
H
hjxilinx 已提交
251
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hjxilinx 已提交
252 253 254 255
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
  
    tscProcessSql(pSql);
H
hzcheng 已提交
256 257 258 259 260 261 262
  }
}

void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
H
[td-99]  
hjxilinx 已提交
263
    tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
H
hzcheng 已提交
264 265 266 267 268 269 270 271
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
H
[td-99]  
hjxilinx 已提交
272
    tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
H
hzcheng 已提交
273 274 275 276 277
    return;
  }

  pSql->fetchFp = fp;
  pSql->param = param;
278
  
H
hzcheng 已提交
279
  if (pRes->row >= pRes->numOfRows) {
S
slguan 已提交
280
    tscResetForNextRetrieve(pRes);
281
    pSql->fp = tscAsyncFetchSingleRowProxy;
282
    
H
hjxilinx 已提交
283
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
284 285 286
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
    
H
hzcheng 已提交
287 288 289 290 291 292 293 294 295 296 297
    tscProcessSql(pSql);
  } else {
    SSchedMsg schedMsg;
    schedMsg.fp = tscProcessFetchRow;
    schedMsg.ahandle = pSql;
    schedMsg.thandle = pRes->tsrow;
    schedMsg.msg = NULL;
    taosScheduleTask(tscQhandle, &schedMsg);
  }
}

298
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
299 300 301 302
  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

303
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
304
  
H
hzcheng 已提交
305
  if (numOfRows == 0) {
306 307 308
    if (hasMoreVnodesToTry(pSql)) {     // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
    } else {
H
hzcheng 已提交
309
      /*
310 311
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
H
hzcheng 已提交
312 313 314
       */
      (*pSql->fetchFp)(pSql->param, pSql, NULL);
    }
315
    return;
H
hzcheng 已提交
316
  }
317
  
318
  for (int i = 0; i < pCmd->numOfCols; ++i){
H
hjxilinx 已提交
319 320 321
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
    if (pSup->pSqlExpr != NULL) {
//      pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
322 323 324 325 326
    } else {
      //todo add
    }
  }
  
327 328 329
  pRes->row++;

  (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
H
hzcheng 已提交
330 331 332 333 334 335
}

void tscProcessFetchRow(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
336
  
H
hjxilinx 已提交
337
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
338

339
  for (int i = 0; i < pCmd->numOfCols; ++i) {
H
hjxilinx 已提交
340 341 342
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);

    if (pSup->pSqlExpr != NULL) {
343
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
344
    } else {
H
hjxilinx 已提交
345
//      todo add
346
    }
347 348
  }
  
H
hzcheng 已提交
349 350 351 352 353 354 355 356 357 358 359 360 361
  pRes->row++;
  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

void tscProcessAsyncRes(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  void *taosres = pSql;

  // pCmd may be released, so cache pCmd->command
  int cmd = pCmd->command;
H
[TD-98]  
hjxilinx 已提交
362
  int code = pRes->code;
H
hzcheng 已提交
363 364

  // in case of async insert, restore the user specified callback function
H
hjxilinx 已提交
365
  bool shouldFree = tscShouldBeFreed(pSql);
H
hzcheng 已提交
366 367 368 369 370 371 372 373 374

  if (cmd == TSDB_SQL_INSERT) {
    assert(pSql->fp != NULL);
    pSql->fp = pSql->fetchFp;
  }

  (*pSql->fp)(pSql->param, taosres, code);

  if (shouldFree) {
375
    tscTrace("%p sqlObj is automatically freed in async res", pSql);
sangshuduo's avatar
sangshuduo 已提交
376
    tscFreeSqlObj(pSql);
H
hzcheng 已提交
377 378 379
  }
}

H
[td-99]  
hjxilinx 已提交
380
static void tscProcessAsyncError(SSchedMsg *pMsg) {
H
hzcheng 已提交
381
  void (*fp)() = pMsg->ahandle;
H
[td-99]  
hjxilinx 已提交
382
  (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
H
hzcheng 已提交
383 384
}

H
[td-99]  
hjxilinx 已提交
385 386 387 388
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
  int32_t* c = malloc(sizeof(int32_t));
  *c = code;
  
H
hzcheng 已提交
389 390 391 392
  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
H
[td-99]  
hjxilinx 已提交
393
  schedMsg.msg = c;
H
hzcheng 已提交
394 395 396 397 398 399 400 401
  taosScheduleTask(tscQhandle, &schedMsg);
}

void tscQueueAsyncRes(SSqlObj *pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
    tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
    return;
  } else {
H
[td-32]  
hjxilinx 已提交
402
    tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
H
hzcheng 已提交
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431
  }

  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncRes;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

void tscProcessAsyncFree(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  tscTrace("%p sql is freed", pSql);
  taos_free_result(pSql);
}

void tscQueueAsyncFreeResult(SSqlObj *pSql) {
  tscTrace("%p sqlObj put in queue to async free", pSql);

  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncFree;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

int tscSendMsgToServer(SSqlObj *pSql);

H
hjxilinx 已提交
432
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
H
hzcheng 已提交
433 434 435 436 437 438
  SSqlObj *pSql = (SSqlObj *)param;
  if (pSql == NULL || pSql->signature != pSql) return;

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
[td-32]  
hjxilinx 已提交
439 440
  if (code != TSDB_CODE_SUCCESS) {
    pRes->code = code;
H
hzcheng 已提交
441 442 443 444 445
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream == NULL) {
H
hjxilinx 已提交
446
    // check if it is a sub-query of super table query first, if true, enter another routine
H
hjxilinx 已提交
447
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
448 449
  
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
H
hjxilinx 已提交
450
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
L
lihui 已提交
451 452 453 454
      if (pTableMetaInfo->pTableMeta == NULL){
        code = tscGetTableMeta(pSql, pTableMetaInfo);
        assert(code == TSDB_CODE_SUCCESS);      
      }     
L
lihui 已提交
455
      
H
hjxilinx 已提交
456
      assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
H
hzcheng 已提交
457 458 459

      SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
      SSqlObj *         pParObj = trs->pParentSqlObj;
460
      
H
hjxilinx 已提交
461
      assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
H
hjxilinx 已提交
462
          tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
S
slguan 已提交
463

L
lihui 已提交
464
      tscTrace("%p get metricMeta during super table query successfully", pSql);      
H
hzcheng 已提交
465

H
hjxilinx 已提交
466
      code = tscGetSTableVgroupInfo(pSql, 0);
H
hzcheng 已提交
467 468 469 470
      pRes->code = code;

      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
    } else {  // normal async query continues
471
      if (pCmd->parseFinished) {
472
        tscTrace("%p re-send data to vnode in table Meta callback since sql parsed completed", pSql);
L
[#1083]  
lihui 已提交
473
        
474
        STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
475
        code = tscGetTableMeta(pSql, pTableMetaInfo);
L
[#1083]  
lihui 已提交
476 477
        assert(code == TSDB_CODE_SUCCESS);
      
H
hjxilinx 已提交
478
        if (pTableMetaInfo->pTableMeta) {
479 480
          // todo update the submit message according to the new table meta
          // 1. table uid, 2. ip address
L
[#1083]  
lihui 已提交
481 482 483 484
          code = tscSendMsgToServer(pSql);
          if (code == TSDB_CODE_SUCCESS) return;
        }
      } else {
H
hjxilinx 已提交
485
        code = tsParseSql(pSql, false);
H
Hui Li 已提交
486 487 488 489 490 491 492 493
        if ((pQueryInfo->type & TSDB_QUERY_TYPE_STMT_INSERT) == TSDB_QUERY_TYPE_STMT_INSERT) {
          STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
          code = tscGetTableMeta(pSql, pTableMetaInfo);
          assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL);
          (*pSql->fp)(pSql->param, NULL, code);
          return;
        }
        
L
[#1083]  
lihui 已提交
494 495
        if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
      }
H
hzcheng 已提交
496
    }
S
slguan 已提交
497

H
hzcheng 已提交
498
  } else {  // stream computing
499
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
500
    code = tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
501 502 503 504
    pRes->code = code;

    if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;

weixin_48148422's avatar
weixin_48148422 已提交
505
    if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
506
      code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
H
hzcheng 已提交
507 508 509 510 511 512
      pRes->code = code;

      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
    }
  }

513 514
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
H
hzcheng 已提交
515 516 517 518 519 520 521 522
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream) {
    tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
    /*
     * NOTE:
H
hjxilinx 已提交
523
     * transfer the sql function for super table query before get meter/metric meta,
524
     * since in callback functions, only tscProcessSql(pStream->pSql) is executed!
H
hzcheng 已提交
525
     */
H
hjxilinx 已提交
526
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
527
    
H
hjxilinx 已提交
528
    tscTansformSQLFuncForSTableQuery(pQueryInfo);
H
hzcheng 已提交
529 530
    tscIncStreamExecutionCount(pSql->pStream);
  } else {
H
hjxilinx 已提交
531
    tscTrace("%p get tableMeta successfully", pSql);
H
hzcheng 已提交
532 533 534 535
  }

  tscDoQuery(pSql);
}