tscAsync.c 16.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hjxilinx 已提交
17 18 19
#include "tutil.h"

#include "tnote.h"
H
hzcheng 已提交
20
#include "trpc.h"
S
slguan 已提交
21
#include "tscLog.h"
H
hjxilinx 已提交
22
#include "tscSubquery.h"
23
#include "tscLocalMerge.h"
H
hzcheng 已提交
24
#include "tscUtil.h"
S
slguan 已提交
25
#include "tsched.h"
H
hjxilinx 已提交
26
#include "tschemautil.h"
H
hjxilinx 已提交
27
#include "tsclient.h"
H
hzcheng 已提交
28

29 30
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
31 32 33 34

static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());

/*
 * Proxy functions that perform the query & retrieve operations sequentially.
 * If the SQL statement queries a super table and the two-stage merge procedure is not involved
 * (i.e. it is a projection query), data is queried and retrieved from all vnodes one after another.
 */
39 40
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
41

B
Bomin Zhang 已提交
42
/*
 * Bind the request context to pSql, store a private copy of the SQL text produced by
 * strtolower(), parse it and, when parsing succeeds immediately, start the query.
 *
 * pObj    connection object, stored in pSql->pTscObj
 * pSql    sql object that carries the request
 * fp      completion callback, stored in pSql->fp
 * param   opaque callback argument, stored in pSql->param
 * sqlstr  SQL text to run
 * sqlLen  length of sqlstr; sqlLen + 1 bytes are allocated for the copy
 *
 * Nothing is returned: failures are reported through tscQueueAsyncError() (out of memory)
 * or tscQueueAsyncRes() (parse error).
 */
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
  // request context
  pSql->signature = pSql;
  pSql->pTscObj   = pObj;
  pSql->fp        = fp;
  pSql->param     = param;
  pSql->maxRetry  = TSDB_MAX_REPLICA_NUM;

  // private, zero-filled buffer for the statement text
  pSql->sqlstr = calloc(1, sqlLen + 1);
  if (pSql->sqlstr == NULL) {
    tscError("%p failed to malloc sql string buffer", pSql);
    tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
    return;
  }

  strtolower(pSql->sqlstr, sqlstr);
  tscDump("%p SQL: %s", pSql, pSql->sqlstr);
  pSql->cmd.curSql = pSql->sqlstr;

  const int32_t parseCode = tsParseSql(pSql, true);
  if (parseCode == TSDB_CODE_SUCCESS) {
    tscDoQuery(pSql);
  } else if (parseCode != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    // genuine parse failure: hand the code to the queued result handler
    pSql->res.code = parseCode;
    tscQueueAsyncRes(pSql);
  }
  // TSDB_CODE_TSC_ACTION_IN_PROGRESS: nothing more to do in this call
}

73
// TODO return the correct error code to client in tscQueueAsyncError
H
hjxilinx 已提交
74
/*
 * Asynchronous query entry point.
 *
 * taos    connection handle; must be a live STscObj (its signature field points to itself)
 * sqlstr  NUL-terminated SQL text, at most tsMaxSQLStringLen bytes long
 * fp      callback that receives the outcome
 * param   opaque argument forwarded to fp
 *
 * Every validation failure sets terrno and is delivered to fp through tscQueueAsyncError().
 */
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("bug!!! pObj:%p", pObj);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return;
  }

  // strlen(NULL) is undefined behavior; report a missing statement as invalid SQL instead
  if (sqlstr == NULL) {
    tscError("sql string is NULL");
    terrno = TSDB_CODE_TSC_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_SQL);
    return;
  }

  // keep the length in size_t so an oversized string cannot wrap around and slip past the limit check
  size_t sqlLen = strlen(sqlstr);
  if (sqlLen > (size_t)tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_SQL);
    return;
  }

  taosNotePrintTsc(sqlstr);

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
    return;
  }

  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}

104
/*
 * Internal result callback installed in pSql->fp by taos_fetch_rows_a().
 *
 * param      argument forwarded to the user callback stored in pSql->fetchFp
 * tres       the SSqlObj of the running query; a NULL value is ignored
 * numOfRows  value reported for the last retrieve
 *
 * A non-zero numOfRows is forwarded to the user callback. A zero value moves the query on
 * to the next vnode, then to the next sub-clause, and only when neither is left is the
 * user callback told that no rows remain.
 */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (numOfRows != 0) {
    // local merge has handle this situation during super table non-projection query.
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
      pRes->numOfClauseTotal += pRes->numOfRows;
    }

    (*pSql->fetchFp)(param, tres, numOfRows);
    return;
  }

  if (hasMoreVnodesToTry(pSql)) {
    // sequentially retrieve data from the remaining vnodes
    tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
  } else if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
    // every vnode has been visited: continue with the next sub-clause query
    tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
  } else {
    // the limit has been reached, or no vnode and no sub-clause is left
    (*pSql->fetchFp)(param, pSql, 0);
  }
}

/*
 * Continue retrieving after a query callback fired, installing fp as the next result callback.
 *
 * param      argument passed to the user callback when an error is queued
 * tres       the SSqlObj of the running query; NULL is logged and ignored
 * numOfRows  status handed to the query callback; any non-zero value is handled as a failure
 *            for commands below TSDB_SQL_LOCAL
 * fp         callback stored in pSql->fp before the retrieve is issued
 */
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  const bool noHandle  = (pRes->qhandle == 0);
  const bool hasStatus = (numOfRows != 0);

  if (pCmd->command < TSDB_SQL_LOCAL && (noHandle || hasStatus)) {
    if (noHandle && hasStatus) {
      tscError("qhandle is NULL");
    } else {
      // the status value becomes the result code that is reported below
      pRes->code = numOfRows;
    }

    tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
    return;
  }

  pSql->fp = fp;

  // switch the command to its retrieve/fetch counterpart, except for local-merge and local commands
  if (pCmd->command < TSDB_SQL_LOCAL && pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
    if (pCmd->command > TSDB_SQL_MGMT) {
      pCmd->command = TSDB_SQL_RETRIEVE;
    } else {
      pCmd->command = TSDB_SQL_FETCH;
    }
  }

  tscProcessSql(pSql);
}

/*
 * Retrieve callbacks for the fetch-rows proxies.
 * The two functions below both serve as callbacks for a query sent to a virtual node:
 * the query callback fires first and is then followed by the retrieve callback.
 */
178 179
/*
 * Query callback used while walking vnodes/sub-clauses in multi-row fetch mode: once the
 * query has completed, keep retrieving with tscAsyncFetchRowsProxy as the result callback.
 */
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
}

183 184
/*
 * Query callback used while walking vnodes in single-row fetch mode: once the query has
 * completed, keep retrieving with tscAsyncFetchSingleRowProxy as the result callback.
 */
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
}

/*
 * Asynchronously fetch the next batch of rows of a result set.
 *
 * taosa  result handle; must be a live SSqlObj (its signature field points to itself)
 * fp     user callback; it is stored in pSql->fetchFp and reached through tscAsyncFetchRowsProxy
 * param  opaque argument forwarded to fp
 *
 * An invalid handle is reported through tscQueueAsyncError(). A missing query handle is
 * reported through tscQueueAsyncRes() with TSDB_CODE_TSC_INVALID_QHANDLE in pRes->code.
 */
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  /*
   * Install the callback context before any result can be queued. tscQueueAsyncRes() ends up
   * invoking pSql->fp with pSql->param, so installing it only after the qhandle check would
   * deliver the error to whatever callback/param was set previously and never to this caller.
   */
  pSql->fetchFp = fp;  // user-defined callback function is stored in fetchFp
  pSql->fp      = tscAsyncFetchRowsProxy;
  pSql->param   = param;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
    pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
    tscQueueAsyncRes(pSql);
    return;
  }

  tscResetForNextRetrieve(pRes);

  // handle the sub queries of join query
  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
    tscFetchDatablockFromSubquery(pSql);
  } else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
    if (hasMoreVnodesToTry(pSql)) {  // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    } else if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
      // all available virtual nodes have been checked already: move on to the next subclause query
      tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
    } else {
      /*
       * 1. the limitation has been reached
       * 2. no virtual node remains to be retrieved
       */
      (*pSql->fetchFp)(param, pSql, 0);
    }
  } else {  // current query is not completed, continue retrieve from node
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }

    tscProcessSql(pSql);
  }
}

/*
 * Asynchronously fetch a single row of a result set.
 *
 * taosa  result handle; must be a live SSqlObj (its signature field points to itself)
 * fp     user callback that receives the row; stored in pSql->fetchFp
 * param  opaque argument forwarded to fp; stored in pSql->param
 *
 * While pRes->row is still below pRes->numOfRows, a tscProcessFetchRow task is scheduled;
 * otherwise a new retrieve is issued with tscAsyncFetchSingleRowProxy as the result callback.
 */
void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
    return;
  }

  pSql->param   = param;
  pSql->fetchFp = fp;

  if (pRes->row < pRes->numOfRows) {
    // rows are still pending in the current batch: schedule the row-delivery task
    SSchedMsg task = { 0 };
    task.msg     = NULL;
    task.thandle = pRes->tsrow;
    task.ahandle = pSql;
    task.fp      = tscProcessFetchRow;
    taosScheduleTask(tscQhandle, &task);
    return;
  }

  // current batch is used up: issue another retrieve
  tscResetForNextRetrieve(pRes);
  pSql->fp = tscAsyncFetchSingleRowProxy;

  if (pCmd->command < TSDB_SQL_LOCAL && pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
    if (pCmd->command > TSDB_SQL_MGMT) {
      pCmd->command = TSDB_SQL_RETRIEVE;
    } else {
      pCmd->command = TSDB_SQL_FETCH;
    }
  }

  tscProcessSql(pSql);
}

285
/*
 * Internal result callback installed in pSql->fp by taos_fetch_row_a().
 *
 * param      unused; the user argument is taken from pSql->param
 * tres       the SSqlObj of the running query; a NULL value is ignored, as in tscAsyncFetchRowsProxy
 * numOfRows  value reported for the last retrieve
 *
 * On zero rows the query moves on to the next vnode, or the user callback in pSql->fetchFp
 * receives a NULL row when no vnode is left. Otherwise the column pointers of the current row
 * are filled in the same way tscProcessFetchRow does it, and the row is passed to the user callback.
 */
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) {  // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
    } else {
      /*
       * 1. the limitation has been reached
       * 2. no virtual node remains to be retrieved
       */
      (*pSql->fetchFp)(pSql->param, pSql, NULL);
    }
    return;
  }

  if (numOfRows < 0) {
    // NOTE(review): a negative value is assumed to be an error status rather than a row count
    // (confirm against the code that invokes pSql->fp); no row buffer is touched in that case
    (*pSql->fetchFp)(pSql->param, pSql, NULL);
    return;
  }

  // populate the row handed to the user; previously pRes->tsrow was passed on without being filled in
  for (int i = 0; i < pCmd->numOfCols; ++i) {
    SFieldSupInfo *pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
    if (pSup->pSqlExpr != NULL) {
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
    }
    // todo: columns without an expression are not handled yet
  }

  pRes->row++;

  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

/*
 * Scheduler task queued by taos_fetch_row_a(): fills the column pointers of the current row,
 * advances pRes->row and passes pRes->tsrow to the user callback stored in pSql->fetchFp.
 *
 * pMsg  scheduler message whose ahandle field carries the SSqlObj
 */
void tscProcessFetchRow(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  int col = 0;
  while (col < pCmd->numOfCols) {
    SFieldSupInfo *pField = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, col);

    // todo: columns without an expression are not handled yet
    if (pField->pSqlExpr != NULL) {
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, col);
    }

    ++col;
  }

  pRes->row++;
  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

/*
 * Scheduler task queued by tscQueueAsyncRes(): delivers pSql->res.code to the callback in
 * pSql->fp and afterwards frees the sql object when tscShouldBeFreed() asked for it.
 *
 * pMsg  scheduler message whose ahandle field carries the SSqlObj
 */
void tscProcessAsyncRes(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;

  // snapshot everything needed later: the command structure may be released
  const int command = pSql->cmd.command;
  const int resCode = pSql->res.code;

  // decided before the callback runs
  const bool freeAfterwards = tscShouldBeFreed(pSql);

  // in case of async insert, restore the user specified callback function
  if (command == TSDB_SQL_INSERT) {
    assert(pSql->fp != NULL);
    pSql->fp = pSql->fetchFp;
  }

  if (pSql->fp) {
    (*pSql->fp)(pSql->param, (void *)pSql, resCode);
  }

  if (!freeAfterwards) {
    return;
  }

  tscTrace("%p sqlObj is automatically freed in async res", pSql);
  tscFreeSqlObj(pSql);
}

H
[td-99]  
hjxilinx 已提交
369
/*
 * Scheduler task queued by tscQueueAsyncError(): calls the callback stored in pMsg->ahandle
 * with (pMsg->thandle, NULL, code).
 *
 * pMsg->msg points to the heap-allocated int32_t error code created by tscQueueAsyncError().
 * This function owns that buffer and releases it; it used to be leaked on every queued error.
 */
static void tscProcessAsyncError(SSchedMsg *pMsg) {
  void (*fp)() = pMsg->ahandle;

  // copy the code out first so the buffer can be released before user code runs
  int32_t code = *(int32_t *)pMsg->msg;
  free(pMsg->msg);
  pMsg->msg = NULL;

  (*fp)(pMsg->thandle, NULL, code);
}

H
[td-99]  
hjxilinx 已提交
374 375 376 377
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
  int32_t* c = malloc(sizeof(int32_t));
  *c = code;
  
B
Bomin Zhang 已提交
378
  SSchedMsg schedMsg = { 0 };
H
hzcheng 已提交
379 380 381
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
H
[td-99]  
hjxilinx 已提交
382
  schedMsg.msg = c;
H
hzcheng 已提交
383 384 385 386 387 388 389 390
  taosScheduleTask(tscQhandle, &schedMsg);
}

/*
 * Schedule tscProcessAsyncRes() for pSql on the tscQhandle scheduler.
 * A NULL object, or one whose signature no longer points to itself, is only traced and skipped.
 */
void tscQueueAsyncRes(SSqlObj *pSql) {
  const bool alive = (pSql != NULL) && (pSql->signature == pSql);
  if (!alive) {
    tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
    return;
  }

  tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));

  SSchedMsg task = { 0 };
  task.msg     = NULL;
  task.thandle = (void *)1;
  task.ahandle = pSql;
  task.fp      = tscProcessAsyncRes;
  taosScheduleTask(tscQhandle, &task);
}

/*
 * Scheduler task queued by tscQueueAsyncFreeResult(): releases the result object carried
 * in pMsg->ahandle through taos_free_result().
 */
void tscProcessAsyncFree(SSchedMsg *pMsg) {
  SSqlObj *pResult = (SSqlObj *)pMsg->ahandle;

  tscTrace("%p sql is freed", pResult);
  taos_free_result(pResult);
}

/*
 * Schedule tscProcessAsyncFree() for pSql on the tscQhandle scheduler, so the object is
 * released from a scheduler task instead of from the calling context.
 */
void tscQueueAsyncFreeResult(SSqlObj *pSql) {
  tscTrace("%p sqlObj put in queue to async free", pSql);

  SSchedMsg task = { 0 };
  task.msg     = NULL;
  task.thandle = (void *)1;
  task.ahandle = pSql;
  task.fp      = tscProcessAsyncFree;
  taosScheduleTask(tscQhandle, &task);
}

int tscSendMsgToServer(SSqlObj *pSql);

H
hjxilinx 已提交
421
/*
 * Callback invoked once a table-meta request issued on behalf of a sql object has finished.
 *
 * param  the SSqlObj waiting for the meta; ignored when NULL or when its signature is stale
 * res    unused
 * code   outcome of the meta request; any failure is forwarded through tscQueueAsyncRes()
 *
 * Whenever a helper returns TSDB_CODE_TSC_ACTION_IN_PROGRESS the function returns at once;
 * NOTE(review): this presumably means another asynchronous request is in flight and this
 * callback will run again - confirm against tscGetTableMeta()/tsParseSql().
 */
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
  SSqlObj *pSql = (SSqlObj *)param;
  if (pSql == NULL || pSql->signature != pSql) return;

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (code != TSDB_CODE_SUCCESS) {
    pRes->code = code;
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream == NULL) {
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

    // check if it is a sub-query of super table query first, if true, enter another routine
    if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
      tscTrace("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);

      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
      code = tscGetTableMeta(pSql, pTableMetaInfo);
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        return;
      } else {
        assert(code == TSDB_CODE_SUCCESS);
      }

      assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);

      SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
      SSqlObj *         pParObj = trs->pParentSqlObj;

      assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
          tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);

      // NOTE: the vgroupInfo for the queried super table must be existed here.
      assert(pTableMetaInfo->vgroupList != NULL);

      // on failure fall through to the common error handler below
      if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {  // continue to process normal async query
      if (pCmd->parseFinished) {
        tscTrace("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);

        STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
        code = tscGetTableMeta(pSql, pTableMetaInfo);
        if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
          return;
        } else {
          assert(code == TSDB_CODE_SUCCESS);
        }

        // if failed to process sql, go to error handler
        // todo update the submit message according to the new table meta: 1. table uid, 2. ip address
        if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
          return;
        }
      } else {
        tscTrace("%p continue parse sql after get table meta", pSql);

        code = tsParseSql(pSql, false);

        // honour an in-progress parse before anything else is done with the statement
        if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;

        if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
          /*
           * Only look up the table meta when parsing succeeded. Previously the parse result was
           * overwritten by the lookup result, so a parse failure reached the callback as success.
           */
          if (code == TSDB_CODE_SUCCESS) {
            STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
            code = tscGetTableMeta(pSql, pTableMetaInfo);
            if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
              return;
            } else {
              assert(code == TSDB_CODE_SUCCESS);
            }
          }

          // this statement type is answered directly through the callback, with the real outcome
          (*pSql->fp)(pSql->param, pSql, code);
          return;
        }
      }
    }

  } else {  // stream computing
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    code = tscGetTableMeta(pSql, pTableMetaInfo);
    pRes->code = code;

    if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;

    if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
      pRes->code = code;

      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
    }
  }

  // common error handler for every branch above
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream) {
    tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
    if (!pSql->cmd.parseFinished) {
      tsParseSql(pSql, false);
      sem_post(&pSql->rspSem);
    }
    return;
  } else {
    tscTrace("%p get tableMeta successfully", pSql);
  }

  tscDoQuery(pSql);
}