tscServer.c 78.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27 28 29 30 31
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
32
SRpcIpSet  tscMgmtIpSet;
S
slguan 已提交
33 34
SRpcIpSet  tscDnodeIpSet;

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
46

47
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
48 49
  SRpcIpSet* pIpList = &pSql->ipList;
  pIpList->inUse    = 0;
B
Bomin Zhang 已提交
50 51 52 53
  if (pVgroupInfo == NULL) {
    pIpList->numOfIps = 0;
    return;
  }
54
  
B
Bomin Zhang 已提交
55
  pIpList->numOfIps = pVgroupInfo->numOfIps;
56 57 58
  for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
    strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
    pIpList->port[i] = pVgroupInfo->ipAddr[i].port;
59 60 61
  }
}

S
slguan 已提交
62
void tscPrintMgmtIp() {
S
slguan 已提交
63
  if (tscMgmtIpSet.numOfIps <= 0) {
S
Shengliang Guan 已提交
64
    tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps);
S
slguan 已提交
65
  } else {
S
slguan 已提交
66
    for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
67
      tscDebug("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]);
S
slguan 已提交
68
    }
S
slguan 已提交
69 70 71
  }
}

72
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
S
slguan 已提交
73 74 75
  tscMgmtIpSet.numOfIps = pIpList->numOfIps;
  tscMgmtIpSet.inUse = pIpList->inUse;
  for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
J
jtao1735 已提交
76
    tscMgmtIpSet.port[i] = htons(pIpList->port[i]);
S
slguan 已提交
77 78 79
  }
}

S
slguan 已提交
80
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
S
slguan 已提交
81
  tscMgmtIpSet = *pIpSet;
82
  tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);
S
slguan 已提交
83
  for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
84
    tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]);
S
slguan 已提交
85
  }
S
slguan 已提交
86 87
}

H
hjxilinx 已提交
88 89 90 91 92 93 94
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
95
UNUSED_FUNC
H
hjxilinx 已提交
96 97
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
S
slguan 已提交
98
  return tscMgmtIpSet.numOfIps * factor;
H
hjxilinx 已提交
99 100
}

H
hzcheng 已提交
101 102 103 104 105 106 107 108 109 110 111 112
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
113
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
S
slguan 已提交
114
    SRpcIpSet *      pIpList = &pRsp->ipList;
115 116
    if (pIpList->numOfIps > 0) 
      tscSetMgmtIpList(pIpList);
S
slguan 已提交
117

S
Shengliang Guan 已提交
118 119
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
120 121 122
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
S
slguan 已提交
123 124
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
125 126
    }
  } else {
127
    tscDebug("heart beat failed, code:%s", tstrerror(code));
H
hzcheng 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
141 142 143
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
144
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
145
    
146 147 148 149
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
150
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
151 152 153 154 155
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

156
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
157 158 159 160
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
161
    tscAddSubqueryInfo(&pObj->pHb->cmd);
162

163
    tscDebug("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
164 165 166
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
167
    tscDebug("%p free HB object and release connection", pObj);
H
hzcheng 已提交
168 169 170 171 172 173 174 175 176
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

int tscSendMsgToServer(SSqlObj *pSql) {
177
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
178 179 180
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
181
  if (NULL == pMsg) {
182
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
183
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
184 185
  }

186 187
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
J
jtao1735 已提交
188 189 190
    pSql->ipList = tscMgmtIpSet;
  }

191
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
192

J
jtao1735 已提交
193
  SRpcMsg rpcMsg = {
194 195 196
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197 198
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
199
      .code    = 0
J
jtao1735 已提交
200
  };
H
hzcheng 已提交
201

H
Haojun Liao 已提交
202 203 204 205
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206
  rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
S
slguan 已提交
207
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
208 209
}

J
jtao1735 已提交
210
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211
  SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
H
Haojun Liao 已提交
212
  if (pSql == NULL || pSql->signature != pSql) {
B
Bomin Zhang 已提交
213
    tscError("%p sql is already released", pSql);
214 215
    return;
  }
216

H
Haojun Liao 已提交
217
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
218 219
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
220

H
Haojun Liao 已提交
221
  if (pObj->signature != pObj) {
222
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
223 224 225 226 227 228 229 230

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
231
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
232
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
233

H
Haojun Liao 已提交
234
    tscFreeSqlObj(pSql);
235
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
236
    return;
H
hzcheng 已提交
237 238
  }

J
jtao1735 已提交
239 240 241 242 243 244
  if (pCmd->command < TSDB_SQL_MGMT) {
    if (pIpSet) pSql->ipList = *pIpSet;
  } else {
    if (pIpSet) tscMgmtIpSet = *pIpSet;
  }

245
  if (rpcMsg->pCont == NULL) {
246
    rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
S
slguan 已提交
247
  } else {
248
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
    // if (rpcMsg->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    //   if (pCmd->command == TSDB_SQL_CONNECT) {
    //     rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    //     rpcFreeCont(rpcMsg->pCont);
    //     return;
    //   }

    //   if (pCmd->command == TSDB_SQL_HB) {
    //     rpcMsg->code = TSDB_CODE_RPC_NOT_READY;
    //     rpcFreeCont(rpcMsg->pCont);
    //     return;
    //   }

    //   if (pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
    //       pCmd->command == TSDB_SQL_STABLEVGROUP || pCmd->command == TSDB_SQL_SHOW ||
    //       pCmd->command == TSDB_SQL_RETRIEVE) {
    //     // get table meta/vgroup query will not retry, do nothing
    //   }
    // }
268

269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
    if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_INSERT ||
         pCmd->command == TSDB_SQL_UPDATE_TAGS_VAL) &&
        (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
         rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
      tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
      // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
      if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
        pSql->cmd.submitSchema = 1;
      }

      pSql->res.code = rpcMsg->code;  // keep the previous error code
      if (pSql->retry > pSql->maxRetry) {
        tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
      } else {
        rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

        // if there is an error occurring, proceed to the following error handling procedure.
        // todo add test cases
        if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
          rpcFreeCont(rpcMsg->pCont);
          return;
S
slguan 已提交
290
        }
H
hzcheng 已提交
291 292
      }
    }
S
slguan 已提交
293
  }
294

H
hzcheng 已提交
295
  pRes->rspLen = 0;
296
  
297 298
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hzcheng 已提交
299
  } else {
300
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
hzcheng 已提交
301 302
  }

S
slguan 已提交
303
  if (pRes->code == TSDB_CODE_SUCCESS) {
304
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
305 306 307
    pSql->retry = 0;
  }

308
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
309
    assert(rpcMsg->msgType == pCmd->msgType + 1);
310
    pRes->code    = rpcMsg->code;
311
    pRes->rspType = rpcMsg->msgType;
312
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
313

314
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
315 316
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
317
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
318 319
      } else {
        pRes->pRsp = tmp;
320
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
321
      }
322 323
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
324 325
    }

H
hzcheng 已提交
326 327 328 329
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
330
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
331
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
332 333 334 335 336 337 338
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
339
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
340
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
341
    } else {
342
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
343 344
    }
  }
345
  
H
Haojun Liao 已提交
346
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
347
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
348
  }
S
Shengliang Guan 已提交
349

350
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
H
Haojun Liao 已提交
351
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS)? pRes->numOfRows: pRes->code;
352
    
H
hjxilinx 已提交
353
    bool shouldFree = tscShouldBeFreed(pSql);
H
Haojun Liao 已提交
354
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
355

356
    if (shouldFree) {
357
      tscDebug("%p sqlObj is automatically freed", pSql);
sangshuduo's avatar
sangshuduo 已提交
358
      tscFreeSqlObj(pSql);
H
hzcheng 已提交
359 360 361
    }
  }

362
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
363 364
}

S
slguan 已提交
365 366 367
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
368

H
hjxilinx 已提交
369 370 371 372 373 374 375
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
376
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
377 378 379 380 381 382
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
383
  }
384

385 386 387
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
388
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
389
    pRes->code = code;
H
hjxilinx 已提交
390
    tscQueueAsyncRes(pSql);
391
    return pRes->code;
S
slguan 已提交
392
  }
H
hjxilinx 已提交
393 394
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
395 396 397
}

int tscProcessSql(SSqlObj *pSql) {
398 399
  char *   name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;
400 401
  
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
402
  STableMetaInfo *pTableMetaInfo = NULL;
403
  uint32_t        type = 0;
404

405
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
406
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
407 408
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
409
    }
410

411
    type = pQueryInfo->type;
412
  
H
hjxilinx 已提交
413
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
414
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
415
  }
416

417
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
418
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
419
    if (pTableMetaInfo == NULL) {
420
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
421 422
      return pSql->res.code;
    }
H
hjxilinx 已提交
423
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
H
Haojun Liao 已提交
424
    pSql->ipList = tscMgmtIpSet;
H
hzcheng 已提交
425 426 427
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
428
  
S
slguan 已提交
429 430
  return doProcessSql(pSql);
}
H
hzcheng 已提交
431

H
hjxilinx 已提交
432
void tscKillSTableQuery(SSqlObj *pSql) {
433 434 435
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
436
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
437 438 439 440 441
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
S
slguan 已提交
442
    if (pSub == NULL) {
H
hzcheng 已提交
443 444
      continue;
    }
S
slguan 已提交
445

H
hzcheng 已提交
446 447
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
H
Haojun Liao 已提交
448
     * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
H
hzcheng 已提交
449
     */
dengyihao's avatar
dengyihao 已提交
450 451 452
    rpcCancelRequest(pSub->pRpcCtx);
    pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
    tscQueueAsyncRes(pSub);
H
hzcheng 已提交
453 454 455 456 457
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
458
   * 2. if no any subqueries are launched yet, which means the super table query only in parse sql stage,
H
hzcheng 已提交
459 460 461 462 463
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

H
hjxilinx 已提交
464
  while (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
H
hzcheng 已提交
465 466 467 468 469 470
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

471
  tscDebug("%p super table query cancelled", pSql);
H
hzcheng 已提交
472 473
}

J
jtao1735 已提交
474
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
475
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
476
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
477

478
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
479
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
480

481
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
482 483
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
484
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
485
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
486
    
H
hjxilinx 已提交
487
    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
H
Haojun Liao 已提交
488 489
    assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

H
hjxilinx 已提交
490
    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
491
  } else {
H
hjxilinx 已提交
492
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
493
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
494
  }
495 496

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
497
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
498 499 500

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

501
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
502 503
}

504
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
505
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
506
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
507
  
508
  char* pMsg = pSql->cmd.payload;
509 510 511
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
512 513
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

514
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
515 516
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

517
  pMsg += sizeof(SMsgDesc);
518
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
519

H
hjxilinx 已提交
520
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
521
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
522
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
523
  
H
Haojun Liao 已提交
524
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
525

H
hjxilinx 已提交
526
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
527
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
528
  tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);
529
  
530
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
531
      pSql->ipList.numOfIps);
532
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
533 534 535
}

/*
536
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
537
 */
538
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
539
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
540
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
541

542
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
543 544 545 546
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
547
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
548 549
}

550
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
551
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
552
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
553

H
hjxilinx 已提交
554
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
555
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
556 557
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
558
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
559 560
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
561
  
B
Bomin Zhang 已提交
562
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
563
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
564 565
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
566
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
567 568
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
569
    }
weixin_48148422's avatar
weixin_48148422 已提交
570

571
    tscSetDnodeIpList(pSql, pVgroupInfo);
B
Bomin Zhang 已提交
572 573 574
    if (pVgroupInfo != NULL) {
      pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
    }
weixin_48148422's avatar
weixin_48148422 已提交
575

576 577 578
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
    pTableIdInfo->tid = htonl(pTableMeta->sid);
    pTableIdInfo->uid = htobe64(pTableMeta->uid);
B
Bomin Zhang 已提交
579
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
580

581 582
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
H
Haojun Liao 已提交
583
  } else { // it is a subquery of the super table query, this IP info is acquired from vgroupInfo
584 585 586
    int32_t index = pTableMetaInfo->vgroupIndex;
    int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
587

588
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
589

590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
    
    // set the vgroup info
    tscSetDnodeIpList(pSql, &pTableIdList->vgInfo);
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
    int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
606
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
607 608 609 610
      pMsg += sizeof(STableIdInfo);
    }
  }
  
611
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
H
Haojun Liao 已提交
612
      pTableMeta->sid, pTableMeta->uid);
H
hjxilinx 已提交
613
  
614 615 616
  return pMsg;
}

617
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
618 619
  SSqlCmd *pCmd = &pSql->cmd;

620
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
621

S
slguan 已提交
622 623
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
624
    return -1;  // todo add test for this
S
slguan 已提交
625
  }
626
  
627
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
628
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
629
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
630
  
H
hjxilinx 已提交
631
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
632 633 634
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }
635 636 637 638 639 640 641 642 643 644
  
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
    return -1;
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }
645

646
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
647

648
  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
649
  
650
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
651 652
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
653
  } else {
H
hjxilinx 已提交
654 655
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
656 657
  }

658 659
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
660
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
661 662
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
663
  pQueryMsg->numOfCols      = htons(taosArrayGetSize(pQueryInfo->colList));
664 665
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
H
hjxilinx 已提交
666
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
667
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
668
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
669
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
670
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
671 672 673
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons(numOfOutput);
H
hzcheng 已提交
674 675

  // set column list ids
676 677
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
678
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
679
  
680 681 682
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
683

684
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
685 686
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
B
Bomin Zhang 已提交
687
          pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
688 689
               pColSchema->name);

690
      return TSDB_CODE_TSC_INVALID_SQL;
691
    }
H
hzcheng 已提交
692 693 694

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
695
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
696
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
697

S
slguan 已提交
698 699 700
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
701

S
slguan 已提交
702
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
703
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
704 705

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
706

707
      if (pColFilter->filterstr) {
S
slguan 已提交
708 709 710 711 712 713 714 715 716 717
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
718

S
slguan 已提交
719 720 721 722 723
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
724 725
  }

H
hjxilinx 已提交
726
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
727
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
728
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
729

H
hjxilinx 已提交
730
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
731
      /* column id is not valid according to the cached table meta, the table meta is expired */
H
hzcheng 已提交
732 733 734 735
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

736 737 738
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
739

740
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
741
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
742
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
743 744

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
745
      // todo add log
H
hzcheng 已提交
746 747 748 749 750
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
751
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
752 753 754 755 756
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
757
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
758
  }
759
  
760
  // serialize the table info (sid, uid, tags)
761 762
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
763
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
764
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
765
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
766 767
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
768
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
769 770
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
771 772 773
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

774 775
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
776 777 778

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
779 780 781
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
782 783 784
    }
  }

785
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
786
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
787 788
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
789 790
    }
  }
791 792 793 794 795 796 797 798 799
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
800
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
801 802 803 804 805 806
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
807
                 pCol->colIndex.columnIndex, pColSchema->name);
808

809
        return TSDB_CODE_TSC_INVALID_SQL;
810 811 812 813 814 815 816 817 818 819 820 821
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
822

H
Haojun Liao 已提交
823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
844
  // compressed ts block
845
  pQueryMsg->tsOffset = htonl(pMsg - pCmd->payload);
S
slguan 已提交
846 847 848
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

849
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
850
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
851
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
852 853

    // todo refactor
B
Bomin Zhang 已提交
854 855 856 857 858
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }
H
Haojun Liao 已提交
859 860 861

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
B
Bomin Zhang 已提交
862 863 864 865
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }
S
slguan 已提交
866 867 868 869

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
870 871
  }

S
slguan 已提交
872 873
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
874 875
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
876 877
  }

878
  int32_t msgLen = pMsg - pCmd->payload;
H
hzcheng 已提交
879

880
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
881
  pCmd->payloadLen = msgLen;
S
slguan 已提交
882
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
883
  
884
  pQueryMsg->head.contLen = htonl(msgLen);
H
hzcheng 已提交
885
  assert(msgLen + minMsgSize() <= size);
886 887

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
888 889
}

890 891
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
892
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
893
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
894

895
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
896

897
  assert(pCmd->numOfClause == 1);
898
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
899
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
900

901
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
902 903
}

904 905
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
906
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
907 908
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
909
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
910
  }
H
hzcheng 已提交
911

912
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
913 914
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
915
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
916

917
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
918 919
}

920 921
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
922
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
923 924
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
925
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
926
  }
H
hzcheng 已提交
927

928
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
929

930 931
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
932

933 934
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
935

936
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
937

938 939 940 941 942 943 944 945
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
946

947 948 949 950 951 952 953 954 955 956 957 958 959
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
960

S
slguan 已提交
961
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
962
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
963 964
}

965 966
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
967
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
968

S
slguan 已提交
969 970
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
971
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
972 973
  }

974
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
975

976 977 978
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
  pAlterMsg->flag = pUser->type;
H
hzcheng 已提交
979

980 981 982 983
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
984 985
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
986
  }
H
hzcheng 已提交
987

988
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
989
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
990
  } else {
S
slguan 已提交
991
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
992
  }
H
hzcheng 已提交
993

994
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
995 996
}

997 998
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
999
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1000
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1001 1002
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1003

1004 1005
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1006
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1007

S
slguan 已提交
1008 1009
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1010
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1011 1012
  }

1013
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1014

1015
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1016
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1017
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1018

S
slguan 已提交
1019
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1020
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1021 1022
}

1023 1024
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1025
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1026

S
slguan 已提交
1027 1028
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1029
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1030
  }
H
hzcheng 已提交
1031

1032
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1033
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1034
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1035
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1036

S
slguan 已提交
1037
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1038
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1039 1040
}

1041
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1042
  SSqlCmd *pCmd = &pSql->cmd;
1043
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1044 1045
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1046
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1047
  }
H
hzcheng 已提交
1048

1049
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1050
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1051
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1052
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1053

1054
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1055 1056
}

S
[TD-16]  
slguan 已提交
1057
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1058
  SSqlCmd *pCmd = &pSql->cmd;
1059
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1060
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1061

S
slguan 已提交
1062 1063
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1064
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1065
  }
H
hzcheng 已提交
1066

1067
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1068
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1069
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1070

1071
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1072 1073
}

S
[TD-16]  
slguan 已提交
1074 1075 1076 1077 1078 1079 1080
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1081
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1082 1083 1084 1085
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1086
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1087 1088 1089 1090

  return TSDB_CODE_SUCCESS;
}

1091 1092
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1093
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1094

S
slguan 已提交
1095 1096
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1097
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1098
  }
1099

1100
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1101
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1102
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1103
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1104

1105
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1106 1107
}

1108
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1109
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1110
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1111
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1112
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1113

S
slguan 已提交
1114 1115
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1116
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1117
  }
H
hzcheng 已提交
1118

1119
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1120

1121
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1122
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1123
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1124
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1125
  } else {
B
Bomin Zhang 已提交
1126
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1127 1128
  }

1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);
H
hzcheng 已提交
1141

1142 1143 1144 1145
    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
  }

1146
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1147
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1148 1149
}

1150
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1151
  SSqlCmd *pCmd = &pSql->cmd;
1152
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1153

1154 1155
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1156
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1157 1158
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1159
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1160 1161
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1162
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1163 1164 1165
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1166 1167
}

1168
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1169 1170
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1171
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1172

1173
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1174
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1175 1176
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1177
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1178
  }
1179

1180 1181 1182
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1183 1184 1185 1186

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1187
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1188
  int              msgLen = 0;
S
slguan 已提交
1189
  SSchema *        pSchema;
H
hzcheng 已提交
1190
  int              size = 0;
1191 1192 1193
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1194
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1195 1196

  // Reallocate the payload size
1197
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1198 1199
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1200
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1201
  }
H
hzcheng 已提交
1202 1203


1204
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1205
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1206 1207

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1208
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1209

1210 1211 1212
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1213 1214 1215 1216
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1217
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1218

1219 1220
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1221 1222 1223 1224 1225 1226 1227
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1228
  } else {  // create (super) table
1229
    pSchema = (SSchema *)pCreateTableMsg->schema;
1230

H
hzcheng 已提交
1231
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1232
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1233 1234 1235 1236

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1237

H
hzcheng 已提交
1238 1239 1240 1241
      pSchema++;
    }

    pMsg = (char *)pSchema;
1242 1243
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1244

1245 1246 1247
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1248 1249 1250
    }
  }

H
hjxilinx 已提交
1251
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1252

S
slguan 已提交
1253
  msgLen = pMsg - (char*)pCreateTableMsg;
S
slguan 已提交
1254
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1255
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1256
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1257 1258

  assert(msgLen + minMsgSize() <= size);
1259
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1260 1261 1262
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1263
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1264
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1265 1266 1267
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1268
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1269 1270
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1271

1272
  SSqlCmd    *pCmd = &pSql->cmd;
1273 1274
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1275
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1276 1277 1278
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1279 1280 1281 1282
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
1283 1284
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1285
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1286

H
hjxilinx 已提交
1287
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1288
  pAlterTableMsg->type = htons(pAlterInfo->type);
1289

1290
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1291
  SSchema *pSchema = pAlterTableMsg->schema;
1292
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1293
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1294
  
H
hzcheng 已提交
1295 1296 1297 1298 1299 1300 1301
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1302 1303 1304
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1305

S
slguan 已提交
1306
  msgLen = pMsg - (char*)pAlterTableMsg;
1307

H
hzcheng 已提交
1308
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1309
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1310 1311

  assert(msgLen + minMsgSize() <= size);
1312

1313
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1314 1315
}

1316 1317 1318 1319
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1320
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1321
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1322

1323 1324
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1325

1326 1327
  tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo);

1328 1329 1330
  return TSDB_CODE_SUCCESS;
}

1331
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1332
  SSqlCmd *pCmd = &pSql->cmd;
1333
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1334
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1335

1336
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1337
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1338
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1339

1340
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1341 1342
}

1343
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1344
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1345
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1346
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1347

S
slguan 已提交
1348 1349
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1350
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1351
  }
S
slguan 已提交
1352

S
slguan 已提交
1353 1354 1355 1356
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1357

1358
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1359 1360
}

1361
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1362
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1363 1364 1365
    return pRes->code;
  }

H
hjxilinx 已提交
1366
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1367
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
H
hjxilinx 已提交
1368
    pRes->tsrow[i] = ((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1383

1384
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1385

H
hzcheng 已提交
1386 1387 1388 1389 1390 1391
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1392
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1393
  } else {
S
slguan 已提交
1394
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1410
  SSqlCmd *       pCmd = &pSql->cmd;
1411
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1412

H
hjxilinx 已提交
1413
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1414 1415
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1416 1417 1418
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1419 1420 1421
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1422 1423 1424
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1425
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1426 1427 1428
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

1429
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1430
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1431 1432

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1433
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1434 1435 1436
  }

  pRes->row = 0;
1437
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1438

1439
  int32_t code = pRes->code;
H
hjxilinx 已提交
1440 1441 1442 1443
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1444 1445 1446 1447 1448
  }

  return code;
}

S
slguan 已提交
1449
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1450

1451
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1452
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1453
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1454
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1455
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1456

S
slguan 已提交
1457 1458
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1459
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1460 1461
  }

1462
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1463 1464 1465 1466

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1467 1468 1469
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1470

1471
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1472 1473
}

H
hjxilinx 已提交
1474
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1475
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
1476
  char *         pMsg;
H
hzcheng 已提交
1477 1478
  int            msgLen = 0;

B
Bomin Zhang 已提交
1479 1480 1481 1482
  char *tmpData = NULL;
  uint32_t len = pSql->cmd.payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
1483
    if (NULL == tmpData) {
1484
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
1485 1486
    }

H
hzcheng 已提交
1487
    // STagData is in binary format, strncpy is not available
B
Bomin Zhang 已提交
1488
    memcpy(tmpData, pSql->cmd.payload, len);
H
hzcheng 已提交
1489 1490
  }

1491 1492 1493
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1494
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1495

1496
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1497
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1498
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1499

1500
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1501

B
Bomin Zhang 已提交
1502 1503 1504
  if (pSql->cmd.autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
H
hzcheng 已提交
1505 1506
  }

1507
  pCmd->payloadLen = pMsg - (char*)pInfoMsg;
S
slguan 已提交
1508
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1509 1510 1511 1512

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
1513
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1514 1515
}

S
slguan 已提交
1516
/**
1517
 *  multi table meta req pkg format:
1518
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1519 1520
 *      no used         4B
 **/
1521
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1522
#if 0
S
slguan 已提交
1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1535
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1536

1537
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1538
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1539 1540

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1541
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1542 1543 1544 1545
  }

  tfree(tmpData);

1546
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1547
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1548 1549 1550

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1551
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1552 1553 1554
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1555 1556
#endif
  return 0;  
S
slguan 已提交
1557 1558
}

H
hjxilinx 已提交
1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1585

H
hjxilinx 已提交
1586 1587
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1588 1589 1590 1591 1592 1593 1594 1595 1596 1597
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1598 1599 1600
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1601
  }
H
hjxilinx 已提交
1602 1603

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
H
hjxilinx 已提交
1604
  pCmd->payloadLen = (pMsg - pCmd->payload);
H
hjxilinx 已提交
1605

1606
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1607 1608
}

1609 1610
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1611 1612
  STscObj *pObj = pSql->pTscObj;

1613
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1614

S
Shengliang Guan 已提交
1615
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1616 1617 1618
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1619
    numOfQueries++;
H
hzcheng 已提交
1620 1621
  }

S
Shengliang Guan 已提交
1622
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1623 1624 1625
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1626
    numOfStreams++;
H
hzcheng 已提交
1627 1628
  }

1629
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1630
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1631
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1632 1633 1634
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1635

1636
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
1637 1638
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
1639
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1640 1641 1642 1643

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1644
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1645

1646
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1647 1648
}

1649 1650
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1651

1652 1653
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1654
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1655 1656
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1657 1658
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1659
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1660

H
hjxilinx 已提交
1661 1662
  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
1663
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1664 1665
  }

B
Bomin Zhang 已提交
1666
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1667
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1668
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1669 1670
  }

1671 1672
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1673
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1674 1675
  }

H
hjxilinx 已提交
1676 1677
  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
H
hzcheng 已提交
1678 1679
  }

1680
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1681

1682
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1683 1684 1685
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1686 1687 1688 1689 1690

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1691
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1692 1693 1694
    pSchema++;
  }

1695 1696
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1697
  
H
hzcheng 已提交
1698
  // todo add one more function: taosAddDataIfNotExists();
1699
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1700
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1701

H
Haojun Liao 已提交
1702 1703
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1704
  
1705
  // todo handle out of memory case
1706
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1707
    free(pTableMeta);
1708
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1709
  }
H
hzcheng 已提交
1710

1711
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
1712
  free(pTableMeta);
1713
  
H
hjxilinx 已提交
1714
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1715 1716
}

S
slguan 已提交
1717
/**
1718
 *  multi table meta rsp pkg format:
1719
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1720 1721 1722
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1723
#if 0
S
slguan 已提交
1724 1725 1726 1727 1728
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1729
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1730
    pSql->res.numOfTotal = 0;
1731
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1732 1733 1734 1735
  }

  rsp++;

1736
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1737
  totalNum = htonl(pInfo->numOfTables);
1738
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1739 1740

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1741
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1742
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1743 1744 1745

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1746
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1747 1748
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1749 1750
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1751
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1752
      pSql->res.numOfTotal = i;
1753
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1754 1755
    }

H
hjxilinx 已提交
1756 1757 1758 1759
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1760
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1761
    //      pSql->res.numOfTotal = i;
1762
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1763 1764 1765 1766
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1767
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1768
    //      pSql->res.numOfTotal = i;
1769
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1770 1771 1772 1773
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1774
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1775
    //      pSql->res.numOfTotal = i;
1776
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1777 1778
    //    }
    //
H
hjxilinx 已提交
1779
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
hjxilinx 已提交
1814
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1815
    //  }
S
slguan 已提交
1816
  }
H
hjxilinx 已提交
1817
  
S
slguan 已提交
1818 1819
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1820
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1821 1822
#endif
  
S
slguan 已提交
1823 1824 1825
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1826
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1827
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1828
  
H
hjxilinx 已提交
1829
  // NOTE: the order of several table must be preserved.
1830
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1831 1832
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1833
  
1834 1835 1836
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1837
  
1838
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    assert(pInfo->vgroupList != NULL);

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfIps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
        pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
      }

      pMsg += size;
1860
    }
H
hjxilinx 已提交
1861 1862
  }
  
S
slguan 已提交
1863
  return pSql->res.code;
H
hzcheng 已提交
1864 1865 1866 1867 1868 1869
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1870
  STableMetaMsg * pMetaMsg;
1871
  SCMShowRsp *pShow;
S
slguan 已提交
1872
  SSchema *    pSchema;
H
hzcheng 已提交
1873 1874
  char         key[20];

1875 1876 1877
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1878
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1879

H
hjxilinx 已提交
1880
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1881

1882
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1883
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1884 1885
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1886
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1887
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1888

H
hjxilinx 已提交
1889
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1890

H
hjxilinx 已提交
1891
  pSchema = pMetaMsg->schema;
H
hjxilinx 已提交
1892 1893
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1894 1895 1896 1897
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1898
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1899 1900
  strcpy(key + 1, "showlist");

H
hjxilinx 已提交
1901
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
H
hjxilinx 已提交
1902
  
H
hjxilinx 已提交
1903 1904 1905
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1906 1907
  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer);
H
hjxilinx 已提交
1908
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1909

1910 1911 1912 1913
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1914 1915
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1916
  SColumnIndex index = {0};
H
hjxilinx 已提交
1917 1918 1919
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1920
    index.columnIndex = i;
1921 1922
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1923 1924
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1925
    
H
hjxilinx 已提交
1926
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1927
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1928
  }
H
hjxilinx 已提交
1929 1930
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1931
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1932 1933
  
  tfree(pTableMeta);
H
hzcheng 已提交
1934 1935 1936 1937
  return 0;
}

int tscProcessConnectRsp(SSqlObj *pSql) {
S
slguan 已提交
1938
  char temp[TSDB_TABLE_ID_LEN * 2];
H
hzcheng 已提交
1939 1940 1941
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

1942
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
1943
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
1944 1945
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
1946 1947
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
1948
  
1949 1950
  if (pConnect->ipList.numOfIps > 0) 
    tscSetMgmtIpList(&pConnect->ipList);
H
hzcheng 已提交
1951

S
slguan 已提交
1952
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
1953 1954
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
1955
  pObj->connId = htonl(pConnect->connId);
S
scripts  
slguan 已提交
1956
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
1957 1958 1959 1960 1961

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
1962
  STscObj *       pObj = pSql->pTscObj;
1963
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
1964

B
Bomin Zhang 已提交
1965
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
1966 1967 1968 1969
  return 0;
}

int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
1970
  taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
1971 1972 1973 1974
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
1975
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
1976

H
Haojun Liao 已提交
1977 1978
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
1979 1980 1981 1982 1983 1984 1985
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
1986 1987
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
1988
   */
1989
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
hjxilinx 已提交
1990
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
1991

H
hjxilinx 已提交
1992 1993
  if (pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
1994 1995 1996 1997 1998 1999
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2000
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2001

H
Haojun Liao 已提交
2002
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2003
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2004 2005 2006
    return 0;
  }

2007
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
hjxilinx 已提交
2008
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2009

H
hjxilinx 已提交
2010
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2011
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
hjxilinx 已提交
2012
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2013

2014
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2015
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
2016
      taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2031
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2032 2033 2034
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2035
  pRes->data = NULL;
S
slguan 已提交
2036
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2037 2038 2039
  return 0;
}

H
hjxilinx 已提交
2040
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2041 2042 2043
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2044
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2045 2046 2047

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2048 2049
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2050
  pRes->completed = (pRetrieve->completed == 1);
2051
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2052
  
2053
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2054 2055 2056 2057
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2058
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2059
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2060
    
H
hjxilinx 已提交
2061
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2062 2063
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2064 2065
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2066
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2067
    p += sizeof(int32_t);
S
slguan 已提交
2068
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2069 2070
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2071
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2072 2073
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2074
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2075
    }
2076 2077
  }

H
hzcheng 已提交
2078
  pRes->row = 0;
2079
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2080 2081 2082 2083 2084

  return 0;
}

int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
2085 2086
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;
2087
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2088

S
slguan 已提交
2089
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2090 2091 2092

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->data = pRetrieve->data;
2093

2094
  tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2095 2096 2097 2098
  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2099
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2100

2101
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2102 2103
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2104
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2105
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2106
  }
2107

H
hzcheng 已提交
2108 2109 2110
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2111

2112
  tscAddSubqueryInfo(&pNew->cmd);
2113 2114 2115 2116

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

H
hjxilinx 已提交
2117
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2118
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2119
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2120
    free(pNew);
2121

2122
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2123 2124
  }

H
hjxilinx 已提交
2125
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2126
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2127

B
Bomin Zhang 已提交
2128
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2129 2130
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
2131
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2132

H
hjxilinx 已提交
2133 2134
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2135

H
hjxilinx 已提交
2136 2137
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2138
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2139 2140 2141 2142 2143
  }

  return code;
}

H
hjxilinx 已提交
2144
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2145
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2146

2147
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2148 2149
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
2150 2151
  }
  
H
Haojun Liao 已提交
2152
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2153
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2154
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2155
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2156
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2157 2158 2159

    return TSDB_CODE_SUCCESS;
  }
2160 2161
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2162 2163
}

H
hjxilinx 已提交
2164
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2165
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2166
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2167 2168 2169
}

/**
H
Haojun Liao 已提交
2170
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2171
 * @param pSql          sql object
H
Haojun Liao 已提交
2172
 * @param tableId       table full name
H
hzcheng 已提交
2173 2174
 * @return              status code
 */
H
Haojun Liao 已提交
2175
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2176
  SSqlCmd *pCmd = &pSql->cmd;
2177 2178

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2179
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2180

H
Haojun Liao 已提交
2181 2182
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2183
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
Haojun Liao 已提交
2184
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2185 2186
  }

H
Haojun Liao 已提交
2187 2188
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2189 2190
}

H
hjxilinx 已提交
2191
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2192
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2193
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2194
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2195 2196
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2197 2198
    }
  }
H
hjxilinx 已提交
2199 2200 2201 2202
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2203

H
hjxilinx 已提交
2204
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2205
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2206 2207 2208
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2209 2210
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2211

S
slguan 已提交
2212
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2213 2214 2215
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2216
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
2217 2218
  
  SQueryInfo *pNewQueryInfo = NULL;
2219
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
B
Bomin Zhang 已提交
2220
    tscFreeSqlObj(pNew);
2221 2222
    return code;
  }
2223
  
H
hjxilinx 已提交
2224
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2225
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2226 2227 2228
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
S
slguan 已提交
2229 2230 2231 2232 2233 2234
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2235

2236
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2237
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2238

2239 2240 2241 2242
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2243
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2244 2245 2246 2247 2248
  }

  return code;
}

2249
void tscInitMsgsFp() {
S
slguan 已提交
2250 2251
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2252
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2253 2254

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2255
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2256

2257 2258
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2259 2260

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2261
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2262 2263 2264
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2265
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2266 2267 2268
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2269
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2270
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2271 2272 2273 2274
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2275
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2276
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2277
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2278 2279 2280 2281

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2282 2283 2284
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2285 2286

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2287
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2288 2289 2290 2291 2292

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2293
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2294
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2295
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2296 2297

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2298
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2299
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2300

H
Haojun Liao 已提交
2301 2302 2303 2304 2305
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2306

H
hzcheng 已提交
2307 2308
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2309
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2310 2311 2312 2313 2314 2315 2316 2317 2318 2319

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}