tscServer.c 126.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tcache.h"
#include "trpc.h"
S
slguan 已提交
19
#include "tscJoinProcess.h"
H
hzcheng 已提交
20
#include "tscProfile.h"
21
#include "tscSQLParser.h"
H
hzcheng 已提交
22 23 24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
S
slguan 已提交
26
#include "tscompression.h"
H
hzcheng 已提交
27 28 29 30 31 32 33
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
34 35 36
SIpStrList tscMgmtIpList;
int        tsMasterIndex = 0;
int        tsSlaveIndex = 1;
H
hzcheng 已提交
37

38 39
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
40
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
H
Hongze Cheng 已提交
41
char *doBuildMsgHeader(SSqlObj *pSql, char **pStart);
S
slguan 已提交
42
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
H
hzcheng 已提交
43 44
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
45 46 47
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
48 49 50

static int32_t minMsgSize() { return tsRpcHeadSize + sizeof(STaosDigest); }

S
slguan 已提交
51 52
void tscPrintMgmtIp() {
  if (tscMgmtIpList.numOfIps <= 0) {
S
slguan 已提交
53
    tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
S
slguan 已提交
54
  } else {
S
slguan 已提交
55 56 57
    for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
      tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]);
    }
S
slguan 已提交
58 59 60
  }
}

S
slguan 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74
void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
  tscMgmtIpList.numOfIps = pIpList->numOfIps;
  if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) {
    for (int i = 0; i < pIpList->numOfIps; ++i) {
      tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]);
      tscMgmtIpList.ip[i] = pIpList->ip[i];
    }
    tscTrace("cluster mgmt IP list:");
    tscPrintMgmtIp();
  }
}

void tscSetMgmtIpListFromEdge() {
  if (tscMgmtIpList.numOfIps != 2) {
S
slguan 已提交
75
    tscMgmtIpList.numOfIps = 2;
S
slguan 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
    strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
    tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
    strcpy(tscMgmtIpList.ipstr[1], tsMasterIp);
    tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

void tscSetMgmtIpList(SIpList *pIpList) {
  /*
    * The iplist returned by the cluster edition is the current management nodes
    * and the iplist returned by the edge edition is empty
    */
  if (pIpList->numOfIps != 0) {
    tscSetMgmtIpListFromCluster(pIpList);
  } else {
    tscSetMgmtIpListFromEdge();
S
slguan 已提交
94 95 96
  }
}

H
hjxilinx 已提交
97 98 99 100 101 102 103 104 105 106 107 108
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
  return tscMgmtIpList.numOfIps * factor;
}

H
hzcheng 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
    SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
S
slguan 已提交
122
    SIpList *      pIpList = &pRsp->ipList;
S
slguan 已提交
123
    tscSetMgmtIpList(pIpList);
S
slguan 已提交
124

H
hzcheng 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId);
      if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId);
    }
  } else {
    tscTrace("heart beat failed, code:%d", code);
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
146 147 148
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
149
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
150
    
151 152 153 154
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
S
slguan 已提交
155 156 157 158 159
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
160 161 162 163
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
164
    tscAddSubqueryInfo(&pObj->pHb->cmd);
165

S
slguan 已提交
166
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
    tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle);
    taosCloseRpcConn(pObj->pHb->thandle);

    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
  STscObj *pTscObj = pSql->pTscObj;
H
hjxilinx 已提交
183
  if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
S
slguan 已提交
184 185 186 187 188
    *pCode = 0;
    pSql->retry++;
    pSql->index = pSql->index % tscMgmtIpList.numOfIps;
    if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
    void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
H
hzcheng 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202

    if (thandle == NULL) {
      SRpcConnInit connInit;
      memset(&connInit, 0, sizeof(connInit));
      connInit.cid = 0;
      connInit.sid = 0;
      connInit.meterId = pSql->pTscObj->user;
      connInit.peerId = 0;
      connInit.shandle = pTscMgmtConn;
      connInit.ahandle = pSql;
      connInit.peerPort = tsMgmtShellPort;
      connInit.spi = 1;
      connInit.encrypt = 0;
      connInit.secret = pSql->pTscObj->pass;
H
hjxilinx 已提交
203
      
S
slguan 已提交
204
      connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
H
hzcheng 已提交
205 206 207 208
      thandle = taosOpenRpcConn(&connInit, pCode);
    }

    pSql->thandle = thandle;
S
slguan 已提交
209 210 211 212
    pSql->ip = tscMgmtIpList.ip[pSql->index];
    pSql->vnode = TSC_MGMT_VNODE;
    tscTrace("%p mgmt index:%d ip:0x%x is picked up, pConn:%p", pSql, pSql->index, tscMgmtIpList.ip[pSql->index],
             pSql->thandle);
H
hzcheng 已提交
213
  }
214

H
hjxilinx 已提交
215 216 217 218 219
  // the pSql->res.code is the previous error(status) code.
  if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
    if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) {
      *pCode = pSql->res.code;
    }
220

H
hjxilinx 已提交
221 222
    tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
  }
H
hzcheng 已提交
223 224 225 226 227 228 229 230 231
}

void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
  SVPeerDesc *pVPeersDesc = NULL;
  static int  vidIndex = 0;
  STscObj *   pTscObj = pSql->pTscObj;

  pSql->thandle = NULL;

S
slguan 已提交
232
  SSqlCmd *       pCmd = &pSql->cmd;
233
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
234

235
  if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {  // multiple vnode query
H
hjxilinx 已提交
236
    SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
237 238 239 240
    if (vnodeList != NULL) {
      pVPeersDesc = vnodeList->vpeerDesc;
    }
  } else {
S
slguan 已提交
241
    SMeterMeta *pMeta = pMeterMetaInfo->pMeterMeta;
H
hzcheng 已提交
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
    if (pMeta == NULL) {
      tscError("%p pMeterMeta is NULL", pSql);
      pSql->retry = pSql->maxRetry;
      return;
    }
    pVPeersDesc = pMeta->vpeerDesc;
  }

  if (pVPeersDesc == NULL) {
    pSql->retry = pSql->maxRetry;
    tscError("%p pVPeerDesc is NULL", pSql);
  }

  while (pSql->retry < pSql->maxRetry) {
    (pSql->retry)++;
S
slguan 已提交
257
    char ipstr[40] = {0};
S
slguan 已提交
258
    if (pVPeersDesc[pSql->index].ip == 0) {
S
slguan 已提交
259
      /*
S
slguan 已提交
260 261
       * in the edge edition, ip is 0, and at this time we use masterIp instead
       * in the cluster edition, ip is vnode ip
S
slguan 已提交
262 263
       */
      pVPeersDesc[pSql->index].ip = tscMgmtIpList.ip[0];
S
slguan 已提交
264
    }
H
hjxilinx 已提交
265
    *pCode = TSDB_CODE_SUCCESS;
S
slguan 已提交
266 267 268

    void *thandle =
        taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user);
H
hzcheng 已提交
269

S
slguan 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
    if (thandle == NULL) {
      SRpcConnInit connInit;
      tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip);
      memset(&connInit, 0, sizeof(connInit));
      connInit.cid = vidIndex;
      connInit.sid = 0;
      connInit.spi = 0;
      connInit.encrypt = 0;
      connInit.meterId = pSql->pTscObj->user;
      connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS));
      connInit.shandle = pVnodeConn;
      connInit.ahandle = pSql;
      connInit.peerIp = ipstr;
      connInit.peerPort = tsVnodeShellPort;
      thandle = taosOpenRpcConn(&connInit, pCode);
      vidIndex = (vidIndex + 1) % tscNumOfThreads;
    }

    pSql->thandle = thandle;
    pSql->ip = pVPeersDesc[pSql->index].ip;
    pSql->vnode = pVPeersDesc[pSql->index].vnode;
H
hjxilinx 已提交
291
    tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
S
slguan 已提交
292 293
             pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);

H
hzcheng 已提交
294 295
    break;
  }
296

H
4]  
hjxilinx 已提交
297
  // the pSql->res.code is the previous error(status) code.
H
hjxilinx 已提交
298
  if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
H
4]  
hjxilinx 已提交
299
    if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) {
H
hjxilinx 已提交
300 301
      *pCode = pSql->res.code;
    }
302

H
hjxilinx 已提交
303
    tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
H
hjxilinx 已提交
304
  }
H
hzcheng 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317
}

int tscSendMsgToServer(SSqlObj *pSql) {
  uint8_t code = TSDB_CODE_NETWORK_UNAVAIL;

  if (pSql->thandle == NULL) {
    if (pSql->cmd.command < TSDB_SQL_MGMT)
      tscGetConnToVnode(pSql, &code);
    else
      tscGetConnToMgmt(pSql, &code);
  }

  if (pSql->thandle) {
S
slguan 已提交
318 319 320 321 322 323 324 325
    /*
     * the total length of message
     * rpc header + actual message body + digest
     *
     * the pSql object may be released automatically during insert procedure, in which the access of
     * message body by using "if (pHeader->msgType & 1)" may cause the segment fault.
     *
     */
S
slguan 已提交
326
    size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest);
S
slguan 已提交
327 328

    // the memory will be released by taosProcessResponse, so no memory leak here
S
slguan 已提交
329
    char *buf = malloc(totalLen);
330 331 332 333
    if (NULL == buf) {
      tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }
S
slguan 已提交
334
    memcpy(buf, pSql->cmd.payload, totalLen);
S
slguan 已提交
335

H
hzcheng 已提交
336
    tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
S
slguan 已提交
337

S
slguan 已提交
338
    char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf);
H
hzcheng 已提交
339
    if (pStart) {
H
hjxilinx 已提交
340 341 342 343
      /*
       * this SQL object may be released by other thread due to the completion of this query even before the log
       * is dumped to log file. So the signature needs to be kept in a local variable.
       */
344
      uint64_t signature = (uint64_t)pSql->signature;
S
slguan 已提交
345
      if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf);
346

H
hzcheng 已提交
347
      int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql);
H
hjxilinx 已提交
348 349 350
      if (ret >= 0) {
        code = 0;
      }
351

H
hjxilinx 已提交
352
      tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, signature);
H
hzcheng 已提交
353 354 355 356 357 358
    }
  }

  return code;
}

S
slguan 已提交
359 360
void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) {
  SIpList *pIpList = (SIpList *)(cont);
S
slguan 已提交
361
  tscSetMgmtIpList(pIpList);
S
slguan 已提交
362 363 364 365 366 367 368 369 370 371 372

  if (pSql->cmd.command < TSDB_SQL_READ) {
    tsMasterIndex = 0;
    pSql->index = 0;
  } else {
    pSql->index++;
  }

  tscPrintMgmtIp();
}

H
hzcheng 已提交
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
  if (ahandle == NULL) return NULL;

  SIntMsg *pMsg = (SIntMsg *)msg;
  SSqlObj *pSql = (SSqlObj *)ahandle;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  int      code = TSDB_CODE_NETWORK_UNAVAIL;

  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    return NULL;
  }

  if (pSql->thandle != thandle) {
    tscError("%p thandle:%p is different from received:%p", pSql, pSql->thandle, thandle);
    return NULL;
  }

  tscTrace("%p msg:%p is received from server, pConn:%p", pSql, msg, thandle);

  if (pSql->freed || pObj->signature != pObj) {
S
slguan 已提交
396 397
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
H
hzcheng 已提交
398 399 400 401 402
    taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
    tscFreeSqlObj(pSql);
    return ahandle;
  }

403
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
404
  if (msg == NULL) {
405
    tscTrace("%p no response from ip:%s", pSql, taosIpStr(pSql->ip));
406

S
slguan 已提交
407
    pSql->index++;
H
hzcheng 已提交
408 409 410 411
    pSql->thandle = NULL;
    // todo taos_stop_query() in async model
    /*
     * in case of
H
hjxilinx 已提交
412 413
     * 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the request to server.
     * 2. retrieve, do NOT re-issue the retrieve request since the qhandle may have been released by server
H
hzcheng 已提交
414 415 416 417 418 419 420 421 422
     */
    if (pCmd->command != TSDB_SQL_FETCH && pCmd->command != TSDB_SQL_RETRIEVE && pCmd->command != TSDB_SQL_KILL_QUERY &&
        pRes->code != TSDB_CODE_QUERY_CANCELLED) {
      code = tscSendMsgToServer(pSql);
      if (code == 0) return NULL;
    }

    // renew meter meta in case it is changed
    if (pCmd->command < TSDB_SQL_FETCH && pRes->code != TSDB_CODE_QUERY_CANCELLED) {
S
slguan 已提交
423 424
      pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
      code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
425 426 427
      pRes->code = code;
      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;

S
slguan 已提交
428
      if (pMeterMetaInfo->pMeterMeta) {
H
hzcheng 已提交
429 430 431 432 433
        code = tscSendMsgToServer(pSql);
        if (code == 0) return pSql;
      }
    }
  } else {
H
hjxilinx 已提交
434
    uint16_t rspCode = pMsg->content[0];
F
fangpanpan 已提交
435
    
H
hjxilinx 已提交
436
    if (rspCode == TSDB_CODE_REDIRECT) {
S
slguan 已提交
437 438 439 440 441 442
      tscTrace("%p it shall be redirected!", pSql);
      taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
      pSql->thandle = NULL;

      if (pCmd->command > TSDB_SQL_MGMT) {
        tscProcessMgmtRedirect(pSql, pMsg->content + 1);
443
      } else if (pCmd->command == TSDB_SQL_INSERT) {
S
slguan 已提交
444 445
        pSql->index++;
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
S
slguan 已提交
446 447 448 449 450 451 452
      } else {
        pSql->index++;
      }

      code = tscSendMsgToServer(pSql);
      if (code == 0) return pSql;
      msg = NULL;
H
hjxilinx 已提交
453 454
    } else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
        rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
S
slguan 已提交
455 456
        rspCode == TSDB_CODE_NETWORK_UNAVAIL || rspCode == TSDB_CODE_NOT_ACTIVE_SESSION ||
        rspCode == TSDB_CODE_TABLE_ID_MISMATCH) {
S
slguan 已提交
457 458 459 460 461 462 463 464 465 466
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
S
slguan 已提交
467
     pSql->thandle = NULL;
H
hzcheng 已提交
468
      taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
469

470
      if (pCmd->command == TSDB_SQL_CONNECT) {
H
hzcheng 已提交
471 472 473 474
        code = TSDB_CODE_NETWORK_UNAVAIL;
      } else if (pCmd->command == TSDB_SQL_HB) {
        code = TSDB_CODE_NOT_READY;
      } else {
H
hjxilinx 已提交
475
        tscTrace("%p it shall renew meter meta, code:%d", pSql, rspCode);
476

H
hzcheng 已提交
477
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
478 479
        pSql->res.code = (uint8_t)rspCode;  // keep the previous error code

S
slguan 已提交
480
        code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
481 482
        if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;

S
slguan 已提交
483
        if (pMeterMetaInfo->pMeterMeta) {
H
hzcheng 已提交
484 485 486 487 488 489
          code = tscSendMsgToServer(pSql);
          if (code == 0) return pSql;
        }
      }

      msg = NULL;
S
slguan 已提交
490
    } else {  // for other error set and return to invoker
H
hjxilinx 已提交
491
      code = rspCode;
H
hzcheng 已提交
492 493 494 495 496 497 498
    }
  }

  pSql->retry = 0;

  if (msg) {
    if (pCmd->command < TSDB_SQL_MGMT) {
S
slguan 已提交
499 500 501
      if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
        if (pMeterMetaInfo->pMeterMeta)  // it may be deleted
          pMeterMetaInfo->pMeterMeta->index = pSql->index;
H
hzcheng 已提交
502
      } else {
H
hjxilinx 已提交
503
        SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
504 505 506 507 508 509 510 511 512 513
        pVnodeSidList->index = pSql->index;
      }
    } else {
      if (pCmd->command > TSDB_SQL_READ)
        tsSlaveIndex = pSql->index;
      else
        tsMasterIndex = pSql->index;
    }
  }

S
slguan 已提交
514
  if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);
H
hzcheng 已提交
515 516 517 518 519 520 521 522 523 524 525 526 527 528

  pRes->rspLen = 0;
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    pRes->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
  }

  if (msg && pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(pMsg->msgType == pCmd->msgType + 1);
    pRes->code = pMsg->content[0];
    pRes->rspType = pMsg->msgType;
    pRes->rspLen = pMsg->msgLen - sizeof(SIntMsg);

S
slguan 已提交
529 530 531 532 533 534 535 536 537 538 539
    char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
    if (tmp == NULL) {
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    } else {
      pRes->pRsp = tmp;
      if (pRes->rspLen) {
        memcpy(pRes->pRsp, pMsg->content + 1, pRes->rspLen - 1);
      }
    }

    // ignore the error information returned from mnode when set ignore flag in sql
H
hzcheng 已提交
540 541 542 543 544 545 546 547 548 549
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CREATE_DB_RSP) {
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
      pRes->numOfRows += *(int32_t *)pRes->pRsp;
S
slguan 已提交
550 551 552 553 554

      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
               *(int32_t *)pRes->pRsp, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
H
hzcheng 已提交
555 556 557 558 559 560 561 562 563 564 565 566
    }
  }

  if (tscKeepConn[pCmd->command] == 0 ||
      (pRes->code != TSDB_CODE_SUCCESS && pRes->code != TSDB_CODE_ACTION_IN_PROGRESS)) {
    if (pSql->thandle != NULL) {
      taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
      pSql->thandle = NULL;
    }
  }

  if (pSql->fp == NULL) {
S
slguan 已提交
567
    tsem_post(&pSql->rspSem);
H
hzcheng 已提交
568 569 570 571 572 573 574 575 576
  } else {
    if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
      code = (*tscProcessMsgRsp[pCmd->command])(pSql);

    if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
      int   command = pCmd->command;
      void *taosres = tscKeepConn[command] ? pSql : NULL;
      code = pRes->code ? -pRes->code : pRes->numOfRows;

S
slguan 已提交
577
      tscTrace("%p Async SQL result:%d res:%p", pSql, code, taosres);
H
hzcheng 已提交
578 579

      /*
S
slguan 已提交
580 581 582
       * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
       * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
       * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
H
hzcheng 已提交
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
       *
       * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
       * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
       */
      bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
      if (command == TSDB_SQL_INSERT) {  // handle multi-vnode insertion situation
        (*pSql->fp)(pSql, taosres, code);
      } else {
        (*pSql->fp)(pSql->param, taosres, code);
      }

      if (shouldFree) {
        // If it is failed, all objects allocated during execution taos_connect_a should be released
        if (command == TSDB_SQL_CONNECT) {
          taos_close(pObj);
          tscTrace("%p Async sql close failed connection", pSql);
        } else {
          tscFreeSqlObj(pSql);
          tscTrace("%p Async sql is automatically freed", pSql);
        }
      }
    }
  }

  return ahandle;
}

S
slguan 已提交
610
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
611
static int      tscLaunchSTableSubqueries(SSqlObj *pSql);
H
hzcheng 已提交
612

S
slguan 已提交
613
// todo merge with callback
H
hjxilinx 已提交
614
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
615
  SSqlCmd *   pCmd = &pSql->cmd;
H
hjxilinx 已提交
616
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
617

S
slguan 已提交
618 619 620 621
  pSql->res.qhandle = 0x1;
  pSql->res.numOfRows = 0;

  if (pSql->pSubs == NULL) {
H
hjxilinx 已提交
622
    pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
S
slguan 已提交
623 624 625 626 627
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }
  }

H
hjxilinx 已提交
628
  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
S
slguan 已提交
629 630 631
  if (pNew == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
632

S
slguan 已提交
633
  pSql->pSubs[pSql->numOfSubs++] = pNew;
H
hjxilinx 已提交
634
  assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
H
hzcheng 已提交
635

636 637
  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
S
slguan 已提交
638 639

    // refactor as one method
640
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
641
    assert(pNewQueryInfo != NULL);
642

643 644
    tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
    tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
S
slguan 已提交
645

646
    tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid);
S
slguan 已提交
647

648
    tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
649
    tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
S
slguan 已提交
650 651

    pNew->cmd.numOfCols = 0;
652
    pNewQueryInfo->intervalTime = 0;
653
    memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
654

655 656
    // backup the data and clear it in the sqlcmd object
    pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
657
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
S
slguan 已提交
658 659

    // set the ts,tags that involved in join, as the output column of intermediate result
660
    tscClearSubqueryInfo(&pNew->cmd);
661

S
slguan 已提交
662 663 664
    SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
    SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};

665
    tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
S
slguan 已提交
666 667

    // set the tags value for ts_comp function
668
    SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
S
slguan 已提交
669

670
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0);
H
hjxilinx 已提交
671
    int16_t         tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pMeterMetaInfo->pMeterMeta->uid);
S
slguan 已提交
672 673 674 675 676 677 678 679

    pExpr->param->i64Key = tagColIndex;
    pExpr->numOfParams = 1;

    // add the filter tag column
    for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
      SColumnBase *pColBase = &pSupporter->colList.pColList[i];
      if (pColBase->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
680 681
        tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
        pNewQueryInfo->colList.numOfCols++;
S
slguan 已提交
682 683
      }
    }
684 685 686 687 688 689
  
    tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
             "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
             pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
             pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
             pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
H
hjxilinx 已提交
690
    tscPrintSelectClause(pNew, 0);
H
hjxilinx 已提交
691 692 693 694 695 696 697
  
    tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
             "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
             pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
             pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
             pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
    tscPrintSelectClause(pNew, 0);
S
slguan 已提交
698
  } else {
699
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
700
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
S
slguan 已提交
701
  }
702

H
hjxilinx 已提交
703
#ifdef _DEBUG_VIEW
H
hjxilinx 已提交
704
  tscPrintSelectClause(pNew, 0);
H
hjxilinx 已提交
705
#endif
706
  
S
slguan 已提交
707 708 709 710 711 712 713 714
  return tscProcessSql(pNew);
}

int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  void *asyncFp = pSql->fp;
715 716 717 718
  if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_METRIC) {
    tscBuildMsg[pCmd->command](pSql, NULL);
S
slguan 已提交
719
  }
720 721 722

  int32_t code = tscSendMsgToServer(pSql);

S
slguan 已提交
723
  if (asyncFp) {
724
    if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
725 726 727 728 729 730
      pRes->code = code;
      tscQueueAsyncRes(pSql);
    }
    return 0;
  }

731
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
732 733 734 735 736 737
    pRes->code = code;
    return code;
  }

  tsem_wait(&pSql->rspSem);

738
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) (*tscProcessMsgRsp[pCmd->command])(pSql);
S
slguan 已提交
739 740 741 742 743 744 745

  tsem_post(&pSql->emptyRspSem);

  return pRes->code;
}

int tscProcessSql(SSqlObj *pSql) {
746 747 748
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
749 750
  
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
751 752 753
  SMeterMetaInfo *pMeterMetaInfo = NULL;
  int16_t         type = 0;

754 755 756 757 758
  if (pQueryInfo != NULL) {
    pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
    if (pMeterMetaInfo != NULL) {
      name = pMeterMetaInfo->name;
    }
759

760
    type = pQueryInfo->type;
761 762 763
  
    // for hearbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
764
  }
765

766
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
H
hzcheng 已提交
767 768
  pSql->retry = 0;
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
S
slguan 已提交
769
    pSql->maxRetry = TSDB_VNODES_SUPPORT;
770

H
hjxilinx 已提交
771 772 773 774 775
    // the pMeterMetaInfo cannot be NULL
    if (pMeterMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
776

S
slguan 已提交
777 778
    if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
      pSql->index = pMeterMetaInfo->pMeterMeta->index;
H
hjxilinx 已提交
779
    } else {  // it must be the parent SSqlObj for super table query
780
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
781
        int32_t idx = pMeterMetaInfo->vnodeIndex;
782

S
slguan 已提交
783
        SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
H
hzcheng 已提交
784 785 786 787 788 789 790 791 792
        pSql->index = pSidList->index;
      }
    }
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
    pSql->index = pSql->cmd.command < TSDB_SQL_READ ? tsMasterIndex : tsSlaveIndex;
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

S
slguan 已提交
793
  // todo handle async situation
794 795
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
S
slguan 已提交
796
      SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
797

798
      pState->numOfTotal = pQueryInfo->numOfTables;
S
slguan 已提交
799

800
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
801 802 803 804
        SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);

        if (pSupporter == NULL) {  // failed to create support struct, abort current query
          tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
805
          pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
S
slguan 已提交
806 807 808 809 810
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          return pSql->res.code;
        }

H
hjxilinx 已提交
811
        int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
S
slguan 已提交
812 813 814 815 816 817 818 819
        if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
          tscDestroyJoinSupporter(pSupporter);
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          break;
        }
      }

S
slguan 已提交
820 821
      tsem_post(&pSql->emptyRspSem);
      tsem_wait(&pSql->rspSem);
S
slguan 已提交
822

S
slguan 已提交
823
      tsem_post(&pSql->emptyRspSem);
S
slguan 已提交
824 825 826 827 828 829 830 831 832 833

      if (pSql->numOfSubs <= 0) {
        pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      } else {
        pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
      }

      return TSDB_CODE_SUCCESS;
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
834
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
835 836 837 838
        return doProcessSql(pSql);
      }
    }
  }
H
hzcheng 已提交
839

840
  if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
841 842
    /*
     * (ref. line: 964)
H
hjxilinx 已提交
843
     * Before this function returns from tscLaunchSTableSubqueries and continues, pSql may have been released at user
H
hzcheng 已提交
844 845 846 847 848 849 850
     * program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack.
     *
     * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
     * which causes deadlock. So we keep it as local variable.
     */
    void *fp = pSql->fp;

H
hjxilinx 已提交
851
    if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
852 853 854 855
      return pRes->code;
    }

    if (fp == NULL) {
S
slguan 已提交
856 857 858
      tsem_post(&pSql->emptyRspSem);
      tsem_wait(&pSql->rspSem);
      tsem_post(&pSql->emptyRspSem);
H
hzcheng 已提交
859 860 861 862 863 864 865 866

      // set the command flag must be after the semaphore been correctly set.
      pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
    }

    return pSql->res.code;
  }

S
slguan 已提交
867 868
  return doProcessSql(pSql);
}
H
hzcheng 已提交
869

870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
  assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);
  
  for(int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);
    
    SRetrieveSupport* pSupport = pSub->param;
  
    tfree(pSupport->localBuffer);
  
    pthread_mutex_unlock(&pSupport->queryMutex);
    pthread_mutex_destroy(&pSupport->queryMutex);
  
    tfree(pSupport);
  
    tscFreeSqlObj(pSub);
S
slguan 已提交
887
  }
888 889
  
  free(pState);
H
hzcheng 已提交
890 891
}

H
hjxilinx 已提交
892
int tscLaunchSTableSubqueries(SSqlObj *pSql) {
H
hzcheng 已提交
893
  SSqlRes *pRes = &pSql->res;
894
  SSqlCmd *pCmd = &pSql->cmd;
895

S
slguan 已提交
896
  // pRes->code check only serves in launching metric sub-queries
H
hzcheng 已提交
897
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
898 899
    pCmd->command = TSDB_SQL_RETRIEVE_METRIC;  // enable the abort of kill metric function.
    return pRes->code;
H
hzcheng 已提交
900 901 902 903
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc = NULL;
H
hjxilinx 已提交
904
  SColumnModel *       pModel = NULL;
H
hzcheng 已提交
905 906

  pRes->qhandle = 1;  // hack the qhandle check
907 908 909

  const uint32_t nBufferSize = (1 << 16);  // 64KB

910
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
911
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
912 913
  int32_t         numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes;
  assert(numOfSubQueries > 0);
H
hzcheng 已提交
914 915 916 917 918 919 920 921 922 923

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }

924 925
  pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
  pSql->numOfSubs = numOfSubQueries;
926

927
  tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
S
slguan 已提交
928
  SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
929
  pState->numOfTotal = numOfSubQueries;
H
hzcheng 已提交
930 931
  pRes->code = TSDB_CODE_SUCCESS;

932 933 934 935 936
  int32_t i = 0;
  for (; i < numOfSubQueries; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
H
hzcheng 已提交
937 938
      break;
    }
939
    
H
hzcheng 已提交
940 941
    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;
S
slguan 已提交
942
    trs->pState = pState;
943
    
H
hzcheng 已提交
944
    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
945 946 947 948 949 950
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }
    
H
hjxilinx 已提交
951
    trs->subqueryIndex = i;
H
hzcheng 已提交
952 953 954 955 956 957 958 959
    trs->pParentSqlObj = pSql;
    trs->pFinalColModel = pModel;

    pthread_mutexattr_t mutexattr = {0};
    pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(&trs->queryMutex, &mutexattr);
    pthread_mutexattr_destroy(&mutexattr);

960
    SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
S
slguan 已提交
961
    if (pNew == NULL) {
962 963 964
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
S
slguan 已提交
965 966 967 968
      break;
    }

    // todo handle multi-vnode situation
969
    if (pQueryInfo->tsBuf) {
970
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
971
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
S
slguan 已提交
972
    }
973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997
    
    tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }
  
  if (i < numOfSubQueries) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
  
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;   // free all allocated resource
  }
  
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;
  }
  
  for(int32_t j = 0; j < numOfSubQueries; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;
    
    tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
H
hzcheng 已提交
998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
  }

  return TSDB_CODE_SUCCESS;
}

static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
  tscTrace("%p start to free subquery result", pSql);

  if (pSql->res.code == TSDB_CODE_SUCCESS) {
    taos_free_result(pSql);
  }
S
slguan 已提交
1009

H
hzcheng 已提交
1010 1011 1012 1013 1014 1015 1016 1017
  tfree(trsupport->localBuffer);

  pthread_mutex_unlock(&trsupport->queryMutex);
  pthread_mutex_destroy(&trsupport->queryMutex);

  tfree(trsupport);
}

S
slguan 已提交
1018 1019
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);

H
hzcheng 已提交
1020
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
S
slguan 已提交
1021 1022 1023 1024 1025 1026 1027 1028 1029
// set no disk space error info
#ifdef WINDOWS
  LPVOID lpMsgBuf;
  FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
                GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),  // Default language
                (LPTSTR)&lpMsgBuf, 0, NULL);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
  LocalFree(lpMsgBuf);
#else
H
hzcheng 已提交
1030 1031 1032
  char buf[256] = {0};
  strerror_r(errno, buf, 256);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf);
S
slguan 已提交
1033
#endif
H
hzcheng 已提交
1034

S
slguan 已提交
1035
  trsupport->pState->code = -errCode;
H
hzcheng 已提交
1036 1037 1038 1039
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

  pthread_mutex_unlock(&trsupport->queryMutex);

S
slguan 已提交
1040
  tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
H
hzcheng 已提交
1041 1042 1043 1044
}

static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  SSqlObj *pPObj = trsupport->pParentSqlObj;
H
hjxilinx 已提交
1045
  int32_t  subqueryIndex = trsupport->subqueryIndex;
H
hzcheng 已提交
1046 1047

  assert(pSql != NULL);
1048 1049 1050
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
         pPObj->numOfSubs == pState->numOfTotal);
H
hzcheng 已提交
1051 1052

  /* retrieved in subquery failed. OR query cancelled in retrieve phase. */
1053 1054
  if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
    pState->code = -(int)pPObj->res.code;
H
hzcheng 已提交
1055 1056 1057 1058 1059 1060 1061 1062

    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
1063
             subqueryIndex, pState->code);
H
hzcheng 已提交
1064 1065
  }

S
slguan 已提交
1066
  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
H
hjxilinx 已提交
1067 1068
    tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
1069
        subqueryIndex, pState->code);
H
hzcheng 已提交
1070
  } else {
1071
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1072
      /*
S
slguan 已提交
1073 1074
       * current query failed, and the retry count is less than the available
       * count, retry query clear previous retrieved data, then launch a new sub query
H
hzcheng 已提交
1075
       */
H
hjxilinx 已提交
1076
      tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
H
hzcheng 已提交
1077 1078 1079 1080 1081

      // clear local saved number of results
      trsupport->localBuffer->numOfElems = 0;
      pthread_mutex_unlock(&trsupport->queryMutex);

S
slguan 已提交
1082
      tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
H
hjxilinx 已提交
1083
               subqueryIndex, trsupport->numOfRetry);
S
slguan 已提交
1084

1085
      SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
S
slguan 已提交
1086 1087 1088 1089
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
                 trsupport->pParentSqlObj, pSql);

1090
        pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1091 1092 1093
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
        return;
      }
H
hzcheng 已提交
1094 1095 1096

      tscProcessSql(pNew);
      return;
S
slguan 已提交
1097
    } else {  // reach the maximum retry count, abort
1098
      atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
S
slguan 已提交
1099
      tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
1100
               numOfRows, subqueryIndex, pState->code);
H
hzcheng 已提交
1101 1102 1103
    }
  }

H
Hongze Cheng 已提交
1104 1105
  int32_t numOfTotal = pState->numOfTotal;

1106
  int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
H
Hongze Cheng 已提交
1107
  if (finished < numOfTotal) {
1108
    tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
H
hzcheng 已提交
1109 1110 1111 1112
    return tscFreeSubSqlObj(trsupport, pSql);
  }

  // all subqueries are failed
H
hjxilinx 已提交
1113
  tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
1114
  pPObj->res.code = -(pState->code);
H
hzcheng 已提交
1115 1116 1117

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
1118
                            pState->numOfTotal);
H
hzcheng 已提交
1119

S
slguan 已提交
1120
  tfree(trsupport->pState);
H
hzcheng 已提交
1121 1122
  tscFreeSubSqlObj(trsupport, pSql);

S
slguan 已提交
1123
  // sync query, wait for the master SSqlObj to proceed
H
hzcheng 已提交
1124 1125
  if (pPObj->fp == NULL) {
    // sync query, wait for the master SSqlObj to proceed
S
slguan 已提交
1126 1127
    tsem_wait(&pPObj->emptyRspSem);
    tsem_wait(&pPObj->emptyRspSem);
H
hzcheng 已提交
1128

S
slguan 已提交
1129
    tsem_post(&pPObj->rspSem);
H
hzcheng 已提交
1130 1131 1132

    pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
  } else {
S
slguan 已提交
1133
    // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
1134 1135
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);

1136
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
1137 1138 1139 1140 1141
      (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
    } else {  // regular super table query
      if (pPObj->res.code != TSDB_CODE_SUCCESS) {
        tscQueueAsyncRes(pPObj);
      }
H
hzcheng 已提交
1142 1143 1144 1145 1146 1147
    }
  }
}

void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
H
hjxilinx 已提交
1148
  int32_t           idx = trsupport->subqueryIndex;
H
hzcheng 已提交
1149 1150 1151 1152
  SSqlObj *         pPObj = trsupport->pParentSqlObj;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;

  SSqlObj *pSql = (SSqlObj *)tres;
1153
  if (pSql == NULL) {  // sql object has been released in error process, return immediately
H
hzcheng 已提交
1154 1155 1156 1157
    tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
    return;
  }

1158 1159 1160 1161
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
      pPObj->numOfSubs == pState->numOfTotal);
  
H
hzcheng 已提交
1162 1163 1164
  // query process and cancel query process may execute at the same time
  pthread_mutex_lock(&trsupport->queryMutex);

1165
  if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1166 1167 1168
    return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
  }

1169 1170 1171
  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

1172
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
1173

S
slguan 已提交
1174
  SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
H
hzcheng 已提交
1175 1176 1177 1178
  SVPeerDesc *   pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
1179
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
H
hzcheng 已提交
1180

S
slguan 已提交
1181
    tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
1182
             pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
1183
    
H
hjxilinx 已提交
1184
    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
1185 1186 1187 1188 1189 1190
      tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
          pPObj, pSql, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
      return;
    }
    
H
hzcheng 已提交
1191 1192 1193 1194

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %d rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};
1195 1196

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hjxilinx 已提交
1197
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
H
hzcheng 已提交
1198
#endif
S
slguan 已提交
1199
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
S
slguan 已提交
1200 1201
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
S
slguan 已提交
1202 1203 1204
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }
1205
    
S
slguan 已提交
1206
    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
1207
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
H
hzcheng 已提交
1208 1209 1210 1211 1212 1213 1214 1215
    if (ret < 0) {
      // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    } else {
      pthread_mutex_unlock(&trsupport->queryMutex);
      taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
    }

S
slguan 已提交
1216 1217
  } else {  // all data has been retrieved to client
    /* data in from current vnode is stored in cache and disk */
H
hjxilinx 已提交
1218
    uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
S
slguan 已提交
1219 1220
    tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
             pSvd->vnode, numOfRowsFromVnode, idx);
H
hzcheng 已提交
1221

H
hjxilinx 已提交
1222
    tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
H
hzcheng 已提交
1223 1224

#ifdef _DEBUG_VIEW
L
lihui 已提交
1225
    printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
H
hzcheng 已提交
1226
    SSrcColumnInfo colInfo[256] = {0};
1227
    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hjxilinx 已提交
1228
    tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
H
hzcheng 已提交
1229 1230
                       trsupport->localBuffer->numOfElems, colInfo);
#endif
1231
    
S
slguan 已提交
1232
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
S
slguan 已提交
1233 1234
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
S
slguan 已提交
1235 1236 1237
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }
H
hzcheng 已提交
1238 1239 1240

    // each result for a vnode is ordered as an independant list,
    // then used as an input of loser tree for disk-based merge routine
1241 1242
    int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
                                    pQueryInfo->groupbyExpr.orderType);
H
hzcheng 已提交
1243 1244 1245 1246
    if (ret != 0) {
      /* set no disk space error info, and abort retry */
      return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    }
1247
  
H
Hongze Cheng 已提交
1248 1249
    // keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
    // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
H
hjxilinx 已提交
1250
    // In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
H
Hongze Cheng 已提交
1251 1252
    int32_t numOfTotal = pState->numOfTotal;

1253
    int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
H
Hongze Cheng 已提交
1254
    if (finished < numOfTotal) {
1255
      tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
H
hzcheng 已提交
1256 1257 1258 1259
      return tscFreeSubSqlObj(trsupport, pSql);
    }

    // all sub-queries are returned, start to local merge process
H
hjxilinx 已提交
1260
    pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
H
hzcheng 已提交
1261

S
slguan 已提交
1262
    tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
H
Hongze Cheng 已提交
1263
             pState->numOfTotal, pState->numOfRetrievedRows);
1264
    
1265
    SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
1266
    tscClearInterpInfo(pPQueryInfo);
1267

1268
    tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
H
hzcheng 已提交
1269 1270 1271 1272 1273 1274 1275 1276
                          &pPObj->cmd, &pPObj->res);
    tscTrace("%p build loser tree completed", pPObj);

    pPObj->res.precision = pSql->res.precision;
    pPObj->res.numOfRows = 0;
    pPObj->res.row = 0;

    // only free once
1277 1278
    tfree(trsupport->pState);
    
H
hzcheng 已提交
1279 1280 1281
    tscFreeSubSqlObj(trsupport, pSql);

    if (pPObj->fp == NULL) {
S
slguan 已提交
1282 1283
      tsem_wait(&pPObj->emptyRspSem);
      tsem_wait(&pPObj->emptyRspSem);
H
hzcheng 已提交
1284

S
slguan 已提交
1285
      tsem_post(&pPObj->rspSem);
H
hzcheng 已提交
1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298
    } else {
      // set the command flag must be after the semaphore been correctly set.
      pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
      if (pPObj->res.code == TSDB_CODE_SUCCESS) {
        (*pPObj->fp)(pPObj->param, pPObj, 0);
      } else {
        tscQueueAsyncRes(pPObj);
      }
    }
  }
}

void tscKillMetricQuery(SSqlObj *pSql) {
1299 1300 1301 1302
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
1303 1304 1305 1306 1307 1308 1309 1310 1311
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

    if (pSub == NULL || pSub->thandle == NULL) {
      continue;
    }
S
slguan 已提交
1312

H
hzcheng 已提交
1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
    taosStopRpcConn(pSql->pSubs[i]->thandle);
  }

  pSql->numOfSubs = 0;

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

  while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

S
slguan 已提交
1342
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
H
hzcheng 已提交
1343

S
slguan 已提交
1344
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
1345 1346 1347
  const int32_t table_index = 0;
  
  SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
S
slguan 已提交
1348
  if (pNew != NULL) {  // the sub query of two-stage super table query
1349
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
1350
    pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
1351 1352
    
    assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
1353 1354

    // launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
1355
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, table_index);
H
hjxilinx 已提交
1356
    pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex;
1357

H
hjxilinx 已提交
1358
    pSql->pSubs[trsupport->subqueryIndex] = pNew;
1359
  }
H
hzcheng 已提交
1360 1361 1362 1363

  return pNew;
}

S
slguan 已提交
1364
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
H
hzcheng 已提交
1365
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
H
hjxilinx 已提交
1366 1367 1368 1369
  
  SSqlObj*  pParentSql = trsupport->pParentSqlObj;
  SSqlObj*  pSql = (SSqlObj *)tres;
  
1370
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1371 1372 1373
  assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
  
  int32_t idx = pMeterMetaInfo->vnodeIndex;
H
hzcheng 已提交
1374 1375

  SVnodeSidList *vnodeInfo = NULL;
S
slguan 已提交
1376 1377 1378 1379
  SVPeerDesc *   pSvd = NULL;
  if (pMeterMetaInfo->pMetricMeta != NULL) {
    vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
    pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
H
hzcheng 已提交
1380 1381
  }

1382 1383
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
H
hjxilinx 已提交
1384
         pParentSql->numOfSubs == pState->numOfTotal);
1385
  
H
hjxilinx 已提交
1386
  if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
1387
    // metric query is killed, Note: code must be less than 0
H
hzcheng 已提交
1388
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
hjxilinx 已提交
1389 1390
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      code = -(int)(pParentSql->res.code);
H
hzcheng 已提交
1391
    } else {
1392
      code = pState->code;
H
hzcheng 已提交
1393
    }
H
hjxilinx 已提交
1394
    tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
H
hjxilinx 已提交
1395
             trsupport->subqueryIndex, code);
H
hzcheng 已提交
1396 1397 1398
  }

  /*
S
slguan 已提交
1399
   * if a query on a vnode is failed, all retrieve operations from vnode that occurs later
H
hzcheng 已提交
1400 1401
   * than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
   * function to abort current and remain retrieve process.
S
slguan 已提交
1402 1403
   *
   * NOTE: threadsafe is required.
H
hzcheng 已提交
1404
   */
S
slguan 已提交
1405
  if (code != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1406
    if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
H
hjxilinx 已提交
1407
      tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
1408
      atomic_val_compare_exchange_32(&pState->code, 0, code);
H
hzcheng 已提交
1409
    } else {  // does not reach the maximum retry count, go on
H
hjxilinx 已提交
1410
      tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry);
S
slguan 已提交
1411

H
hjxilinx 已提交
1412
      SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
S
slguan 已提交
1413 1414
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
L
lihui 已提交
1415
                 trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
H
hzcheng 已提交
1416

1417
        pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1418 1419
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
      } else {
1420
        SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
1421
        assert(pNewQueryInfo->pMeterInfo[0]->pMeterMeta != NULL && pNewQueryInfo->pMeterInfo[0]->pMetricMeta != NULL);
S
slguan 已提交
1422 1423 1424
        tscProcessSql(pNew);
        return;
      }
H
hzcheng 已提交
1425 1426 1427
    }
  }

1428
  if (pState->code != TSDB_CODE_SUCCESS) {  // failed, abort
H
hzcheng 已提交
1429
    if (vnodeInfo != NULL) {
H
hjxilinx 已提交
1430
      tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
H
hzcheng 已提交
1431
               vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
1432
               trsupport->subqueryIndex, pState->code);
H
hzcheng 已提交
1433
    } else {
H
hjxilinx 已提交
1434
      tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
1435
               trsupport->subqueryIndex, pState->code);
H
hzcheng 已提交
1436 1437
    }

1438
    tscRetrieveFromVnodeCallBack(param, tres, pState->code);
H
hzcheng 已提交
1439
  } else {  // success, proceed to retrieve data from dnode
L
lihui 已提交
1440 1441
    if (vnodeInfo != NULL) {
      tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
H
hzcheng 已提交
1442
             vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
H
hjxilinx 已提交
1443
             trsupport->subqueryIndex);
L
lihui 已提交
1444 1445 1446 1447
    } else {
      tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
             trsupport->subqueryIndex);
    }
H
hzcheng 已提交
1448 1449 1450 1451 1452

    taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
  }
}

1453
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1454 1455 1456 1457 1458 1459
  char *pMsg, *pStart;

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

  *((uint64_t *)pMsg) = pSql->res.qhandle;
S
slguan 已提交
1460 1461
  pMsg += sizeof(pSql->res.qhandle);

1462
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
1463 1464
  *((uint16_t *)pMsg) = htons(pQueryInfo->type);
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
1465

1466
  pSql->cmd.payloadLen = pMsg - pStart;
H
hzcheng 已提交
1467 1468
  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;

1469
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1470 1471
}

S
slguan 已提交
1472
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
H
hzcheng 已提交
1473 1474
  SShellSubmitMsg *pShellMsg;
  char *           pMsg;
1475
  SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
S
slguan 已提交
1476 1477

  SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
H
hzcheng 已提交
1478

S
slguan 已提交
1479
  pMsg = buf + tsRpcHeadSize;
H
hzcheng 已提交
1480 1481 1482

  pShellMsg = (SShellSubmitMsg *)pMsg;
  pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
1483 1484
  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip),
           htons(pShellMsg->vnode));
H
hzcheng 已提交
1485 1486
}

1487
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1488 1489
  SShellSubmitMsg *pShellMsg;
  char *           pMsg, *pStart;
S
slguan 已提交
1490

1491 1492 1493 1494
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
H
hzcheng 已提交
1495 1496 1497 1498 1499

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

  pShellMsg = (SShellSubmitMsg *)pMsg;
1500 1501

  pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
H
hzcheng 已提交
1502
  pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
1503
  pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit);  // number of meters to be inserted
H
hzcheng 已提交
1504

S
slguan 已提交
1505
  // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
H
hzcheng 已提交
1506
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
1507 1508
  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
           htons(pShellMsg->vnode));
1509 1510
  
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1511 1512
}

S
slguan 已提交
1513 1514
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
  SSqlCmd *       pCmd = &pSql->cmd;
1515
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1516

S
slguan 已提交
1517
  char *          pStart = buf + tsRpcHeadSize;
H
hzcheng 已提交
1518 1519
  SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;

H
hjxilinx 已提交
1520
  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {  // pColumnModel == NULL, query on meter
S
slguan 已提交
1521
    SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
H
hzcheng 已提交
1522 1523
    pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
  } else {  // query on metric
S
slguan 已提交
1524
    SMetricMeta *  pMetricMeta = pMeterMetaInfo->pMetricMeta;
H
hjxilinx 已提交
1525
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1526 1527 1528 1529 1530 1531 1532 1533
    pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode);
  }
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
1534
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
1535
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
1536
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
1537

1538
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
1539

1540
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->fieldsInfo.numOfOutputCols;
1541
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
1542 1543

  // meter query without tags values
1544
  if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
H
hzcheng 已提交
1545 1546 1547
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryMeterMsg) + srcColListSize + exprSize;
  }

S
slguan 已提交
1548
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
H
hzcheng 已提交
1549

H
hjxilinx 已提交
1550
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1551 1552

  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids;
1553
  int32_t outputColumnSize = pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
1554

S
slguan 已提交
1555
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
1556 1557
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
1558 1559 1560
  }

  return size;
H
hzcheng 已提交
1561 1562
}

1563
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfMeters, int32_t vnodeId, char *pMsg) {
1564
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
1565

1566 1567 1568 1569 1570 1571
  SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;

  tscTrace("%p vid:%d, query on %d meters", pSql, htons(vnodeId), numOfMeters);
  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
L
lihui 已提交
1572
    tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pMeterMetaInfo->pMeterMeta->sid, pMeterMetaInfo->pMeterMeta->uid);
1573 1574 1575 1576
#endif
    SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
    pMeterInfo->sid = htonl(pMeterMeta->sid);
    pMeterInfo->uid = htobe64(pMeterMeta->uid);
1577
    pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pMeterMeta->uid));
1578 1579 1580
    pMsg += sizeof(SMeterSidExtInfo);
  } else {
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
1581

1582 1583 1584
    for (int32_t i = 0; i < numOfMeters; ++i) {
      SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
      SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
1585

1586 1587
      pMeterInfo->sid = htonl(pQueryMeterInfo->sid);
      pMeterInfo->uid = htobe64(pQueryMeterInfo->uid);
1588
      pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
1589 1590
      
      pMsg += sizeof(SMeterSidExtInfo);
1591

1592 1593 1594 1595
      memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
      pMsg += pMetricMeta->tagLen;

#ifdef _DEBUG_VIEW
L
lihui 已提交
1596
      tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
1597 1598 1599
#endif
    }
  }
1600

1601 1602 1603
  return pMsg;
}

1604
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1605 1606
  SSqlCmd *pCmd = &pSql->cmd;

1607
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1608

S
slguan 已提交
1609 1610 1611 1612 1613
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

1614
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1615
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
1616
  
S
slguan 已提交
1617
  char *          pStart = pCmd->payload + tsRpcHeadSize;
H
hzcheng 已提交
1618

S
slguan 已提交
1619 1620
  SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
H
hzcheng 已提交
1621 1622 1623 1624 1625 1626

  SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;

  int32_t msgLen = 0;
  int32_t numOfMeters = 0;

S
slguan 已提交
1627
  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
H
hzcheng 已提交
1628 1629 1630
    numOfMeters = 1;

    tscTrace("%p query on vnode: %d, number of sid:%d, meter id: %s", pSql,
S
slguan 已提交
1631
             pMeterMeta->vpeerDesc[pMeterMeta->index].vnode, 1, pMeterMetaInfo->name);
H
hzcheng 已提交
1632 1633 1634 1635

    pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
    pQueryMsg->uid = pMeterMeta->uid;
    pQueryMsg->numOfTagsCols = 0;
1636
  } else {  // query on super table
H
hjxilinx 已提交
1637 1638
    if (pMeterMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1639 1640 1641
      return -1;
    }

H
hjxilinx 已提交
1642
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

    numOfMeters = pVnodeSidList->numOfSids;
    if (numOfMeters <= 0) {
      tscError("%p vid:%d,error numOfMeters in query message:%d", pSql, vnodeId, numOfMeters);
      return -1;  // error
    }

    tscTrace("%p query on vid:%d, number of sid:%d", pSql, vnodeId, numOfMeters);
    pQueryMsg->vnode = htons(vnodeId);
  }

  pQueryMsg->numOfSids = htonl(numOfMeters);
S
slguan 已提交
1656
  pQueryMsg->numOfTagsCols = htons(pMeterMetaInfo->numOfTags);
H
hzcheng 已提交
1657

1658 1659 1660
  if (pQueryInfo->order.order == TSQL_SO_ASC) {
    pQueryMsg->skey = htobe64(pQueryInfo->stime);
    pQueryMsg->ekey = htobe64(pQueryInfo->etime);
H
hzcheng 已提交
1661
  } else {
1662 1663
    pQueryMsg->skey = htobe64(pQueryInfo->etime);
    pQueryMsg->ekey = htobe64(pQueryInfo->stime);
H
hzcheng 已提交
1664 1665
  }

1666 1667
  pQueryMsg->order = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
H
hzcheng 已提交
1668

1669
  pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
H
hzcheng 已提交
1670

1671 1672
  pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
H
hzcheng 已提交
1673

1674
  pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
H
hzcheng 已提交
1675

1676
  if (pQueryInfo->colList.numOfCols <= 0) {
H
hzcheng 已提交
1677 1678 1679 1680 1681 1682 1683 1684 1685
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, pMeterMeta->numOfColumns);
    return -1;
  }

  if (pMeterMeta->numOfTags < 0) {
    tscError("%p illegal value of numOfTagsCols in query msg: %d", pSql, pMeterMeta->numOfTags);
    return -1;
  }

1686
  pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
1687
  pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
1688 1689
  pQueryMsg->slidingTime = htobe64(pQueryInfo->nSlidingTime);
  
1690 1691
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
H
hzcheng 已提交
1692 1693 1694
    return -1;
  }

1695 1696
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
1697 1698 1699
    return -1;
  }

1700
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
1701 1702

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {  // query on meter
H
hzcheng 已提交
1703 1704 1705 1706 1707
    pQueryMsg->tagLength = 0;
  } else {  // query on metric
    pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
  }

1708 1709
  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
H
hzcheng 已提交
1710

1711
  if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
1712 1713
    tscError("%p illegal value of number of output columns in query msg: %d", pSql,
             pQueryInfo->fieldsInfo.numOfOutputCols);
H
hzcheng 已提交
1714 1715 1716 1717
    return -1;
  }

  // set column list ids
1718
  char *   pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
H
hzcheng 已提交
1719 1720
  SSchema *pSchema = tsGetSchema(pMeterMeta);

1721 1722
  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
S
slguan 已提交
1723
    SSchema *    pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
1724

S
slguan 已提交
1725
    if (pCol->colIndex.columnIndex >= pMeterMeta->numOfColumns || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
H
hzcheng 已提交
1726
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
S
slguan 已提交
1727 1728
      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
               htons(pQueryMsg->vnode), pMeterMeta->sid, pMeterMetaInfo->name, pMeterMeta->numOfColumns, pCol->colIndex,
H
hzcheng 已提交
1729 1730
               pColSchema->name);

S
slguan 已提交
1731
      return -1;  // 0 means build msg failed
H
hzcheng 已提交
1732 1733 1734 1735 1736
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type = htons(pColSchema->type);
S
slguan 已提交
1737
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
1738

S
slguan 已提交
1739 1740 1741
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
1742

S
slguan 已提交
1743 1744 1745 1746
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
1747

S
slguan 已提交
1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758
      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
1759

S
slguan 已提交
1760 1761 1762 1763 1764
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
1765 1766 1767 1768
  }

  bool hasArithmeticFunction = false;

S
slguan 已提交
1769
  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
H
hzcheng 已提交
1770

1771 1772
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
1773

S
slguan 已提交
1774
    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
H
hzcheng 已提交
1775 1776 1777
      hasArithmeticFunction = true;
    }

1778
    if (!tscValidateColumnId(pMeterMetaInfo, pExpr->colInfo.colId)) {
H
hzcheng 已提交
1779 1780 1781 1782 1783 1784 1785
      /* column id is not valid according to the cached metermeta, the meter meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
S
slguan 已提交
1786
    pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
1787

S
slguan 已提交
1788
    pSqlFuncExpr->functionId = htons(pExpr->functionId);
H
hzcheng 已提交
1789 1790 1791 1792 1793 1794 1795 1796 1797
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
S
slguan 已提交
1798 1799 1800

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
H
hzcheng 已提交
1801 1802 1803 1804 1805 1806 1807 1808 1809 1810
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
1811 1812
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
S
slguan 已提交
1813
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
H
hzcheng 已提交
1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

1825 1826
  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pSql, numOfMeters, htons(pQueryMsg->vnode), pMsg);
H
hzcheng 已提交
1827

S
slguan 已提交
1828 1829 1830
  // only include the required tag column schema. If a tag is not required, it won't be sent to vnode
  if (pMeterMetaInfo->numOfTags > 0) {
    // always transfer tag schema to vnode if exists
H
hzcheng 已提交
1831 1832
    SSchema *pTagSchema = tsGetTagSchema(pMeterMeta);

S
slguan 已提交
1833 1834 1835 1836
    for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
      if (pMeterMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
        SSchema tbSchema = {
            .bytes = TSDB_METER_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
H
hzcheng 已提交
1837 1838
        memcpy(pMsg, &tbSchema, sizeof(SSchema));
      } else {
S
slguan 已提交
1839
        memcpy(pMsg, &pTagSchema[pMeterMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
H
hzcheng 已提交
1840 1841 1842 1843 1844 1845
      }

      pMsg += sizeof(SSchema);
    }
  }

1846
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
S
slguan 已提交
1847 1848
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
1849 1850
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) += pCol->colIdx;
      pMsg += sizeof(pCol->colIdx);

      *((int16_t *)pMsg) += pCol->colIdxInBuf;
      pMsg += sizeof(pCol->colIdxInBuf);

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
    }
  }

1868 1869 1870 1871
  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
S
slguan 已提交
1872 1873 1874 1875 1876 1877 1878 1879
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

1880 1881 1882
  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pMeterMetaInfo->vnodeIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
1883 1884

    // todo refactor
1885 1886
    fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET);
    fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f);
S
slguan 已提交
1887 1888 1889 1890

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
1891 1892
  }

S
slguan 已提交
1893 1894
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
1895 1896
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
1897 1898 1899 1900 1901 1902 1903 1904 1905
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  assert(msgLen + minMsgSize() <= size);
1906 1907

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1908 1909
}

1910
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1911 1912
  SCreateDbMsg *pCreateDbMsg;
  char *        pMsg, *pStart;
S
slguan 已提交
1913

1914
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1915

1916
  pMsg = doBuildMsgHeader(pSql, &pStart);
H
hzcheng 已提交
1917
  pCreateDbMsg = (SCreateDbMsg *)pMsg;
1918

1919 1920 1921
  assert(pCmd->numOfClause == 1);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  
L
lihui 已提交
1922
  strncpy(pCreateDbMsg->db, pMeterMetaInfo->name, tListLen(pCreateDbMsg->db));
H
hzcheng 已提交
1923 1924
  pMsg += sizeof(SCreateDbMsg);

1925
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
1926 1927
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DB;

1928
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1929 1930
}

1931
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1932 1933
  SCreateDnodeMsg *pCreate;

1934
  char *pMsg, *pStart;
S
slguan 已提交
1935

1936
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1937

1938
  pMsg = doBuildMsgHeader(pSql, &pStart);
H
hzcheng 已提交
1939 1940

  pCreate = (SCreateDnodeMsg *)pMsg;
1941
  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
H
hzcheng 已提交
1942 1943 1944

  pMsg += sizeof(SCreateDnodeMsg);

1945 1946
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DNODE;
H
hzcheng 已提交
1947

1948
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1949 1950
}

1951 1952 1953 1954
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SCreateAcctMsg *pAlterMsg;
  char *          pMsg, *pStart;
  int             msgLen = 0;
H
hzcheng 已提交
1955

1956
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1957

1958
  pMsg = doBuildMsgHeader(pSql, &pStart);
H
hzcheng 已提交
1959

1960
  pAlterMsg = (SCreateAcctMsg *)pMsg;
H
hzcheng 已提交
1961

1962 1963
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
1964

1965 1966
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
1967

1968
  pMsg += sizeof(SCreateAcctMsg);
H
hzcheng 已提交
1969

1970
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
1971

1972 1973 1974 1975 1976 1977 1978 1979
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
1980

1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
1994 1995 1996 1997

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;

1998 1999
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2000 2001
}

2002
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
2003
  SCreateUserMsg *pAlterMsg;
2004
  char *         pMsg, *pStart;
H
hzcheng 已提交
2005

2006
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
2007

2008 2009
  pMsg = doBuildMsgHeader(pSql, &pStart);
  pAlterMsg = (SCreateUserMsg *)pMsg;
H
hzcheng 已提交
2010

2011 2012
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
H
hjxilinx 已提交
2013
  
2014
  pAlterMsg->flag = pUser->type;
H
hzcheng 已提交
2015

2016 2017 2018 2019
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
2020 2021
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
2022
  }
H
hzcheng 已提交
2023

H
hjxilinx 已提交
2024
  pMsg += sizeof(SCreateUserMsg);
2025
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
2026

2027 2028 2029 2030 2031
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CREATE_USER;
  }
H
hzcheng 已提交
2032

2033
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2034 2035
}

2036 2037 2038
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *   pStart = NULL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2039

2040 2041
  char *pMsg = doBuildMsgHeader(pSql, &pStart);
  pMsg += sizeof(SCfgMsg);
H
hzcheng 已提交
2042

2043 2044
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_CFG_PNODE;
H
hzcheng 已提交
2045

2046 2047
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
2048

2049 2050 2051 2052 2053 2054
char *doBuildMsgHeader(SSqlObj *pSql, char **pStart) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  char *pMsg = pCmd->payload + tsRpcHeadSize;
  *pStart = pMsg;
H
hzcheng 已提交
2055 2056 2057 2058

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);

2059
  pMsg += sizeof(SMgmtHead);
H
hzcheng 已提交
2060

2061
  return pMsg;
H
hzcheng 已提交
2062 2063
}

2064 2065 2066
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SDropDbMsg *pDropDbMsg;
  char *      pMsg, *pStart;
S
slguan 已提交
2067

2068
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2069

2070 2071
  pMsg = doBuildMsgHeader(pSql, &pStart);
  pDropDbMsg = (SDropDbMsg *)pMsg;
H
hzcheng 已提交
2072

2073
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
2074 2075
  strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db));
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
2076

2077
  pMsg += sizeof(SDropDbMsg);
H
hzcheng 已提交
2078

2079 2080
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_DB;
H
hzcheng 已提交
2081

2082
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2083 2084
}

2085 2086 2087 2088
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SDropTableMsg *pDropTableMsg;
  char *         pMsg, *pStart;
  int            msgLen = 0;
H
hzcheng 已提交
2089

2090
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2091

L
lihui 已提交
2092 2093 2094 2095 2096 2097 2098 2099 2100 2101
  //pMsg = doBuildMsgHeader(pSql, &pStart);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);

  pMsg   = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
  pMsg += sizeof(SMgmtHead);

2102
  pDropTableMsg = (SDropTableMsg *)pMsg;
H
hzcheng 已提交
2103

2104
  strcpy(pDropTableMsg->meterId, pMeterMetaInfo->name);
H
hzcheng 已提交
2105

2106 2107
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
  pMsg += sizeof(SDropTableMsg);
H
hzcheng 已提交
2108 2109 2110

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
2111
  pCmd->msgType = TSDB_MSG_TYPE_DROP_TABLE;
H
hzcheng 已提交
2112

2113
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2114 2115
}

2116 2117 2118
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SDropDnodeMsg *pDrop;
  char *         pMsg, *pStart;
H
hzcheng 已提交
2119

S
slguan 已提交
2120
  SSqlCmd *       pCmd = &pSql->cmd;
2121
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2122

2123 2124
  pMsg = doBuildMsgHeader(pSql, &pStart);
  pDrop = (SDropDnodeMsg *)pMsg;
H
hzcheng 已提交
2125

2126
  strcpy(pDrop->ip, pMeterMetaInfo->name);
H
hzcheng 已提交
2127

2128
  pMsg += sizeof(SDropDnodeMsg);
H
hzcheng 已提交
2129

2130 2131
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_DNODE;
H
hzcheng 已提交
2132

2133
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2134 2135
}

2136 2137
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SDropUserMsg *pDropMsg;
H
hzcheng 已提交
2138 2139
  char *        pMsg, *pStart;

2140
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2141

2142 2143
  pMsg = doBuildMsgHeader(pSql, &pStart);
  pDropMsg = (SDropUserMsg *)pMsg;
H
hzcheng 已提交
2144

2145
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2146
  strcpy(pDropMsg->user, pMeterMetaInfo->name);
H
hzcheng 已提交
2147

2148
  pMsg += sizeof(SDropUserMsg);
H
hzcheng 已提交
2149

2150 2151
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;
H
hzcheng 已提交
2152

2153
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2154 2155
}

2156
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2157 2158 2159
  SUseDbMsg *pUseDbMsg;
  char *     pMsg, *pStart;

2160
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2161

2162
  pMsg = doBuildMsgHeader(pSql, &pStart);
H
hzcheng 已提交
2163
  pUseDbMsg = (SUseDbMsg *)pMsg;
2164

2165
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2166
  strcpy(pUseDbMsg->db, pMeterMetaInfo->name);
H
hzcheng 已提交
2167 2168 2169

  pMsg += sizeof(SUseDbMsg);

2170
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
2171 2172
  pCmd->msgType = TSDB_MSG_TYPE_USE_DB;

2173
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2174 2175
}

2176
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2177 2178 2179 2180 2181 2182 2183 2184
  SShowMsg *pShowMsg;
  char *    pMsg, *pStart;
  int       msgLen = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowTableMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE;
S
slguan 已提交
2185 2186 2187 2188
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for show msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2189 2190 2191 2192 2193

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2194

2195
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2196 2197 2198
  size_t          nameLen = strlen(pMeterMetaInfo->name);

  if (nameLen > 0) {
2199
    strcpy(pMgmt->db, pMeterMetaInfo->name);  // prefix is set here
H
hzcheng 已提交
2200 2201 2202 2203 2204 2205 2206
  } else {
    strcpy(pMgmt->db, pObj->db);
  }

  pMsg += sizeof(SMgmtHead);

  pShowMsg = (SShowMsg *)pMsg;
2207
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
H
hzcheng 已提交
2208

2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
    pMsg += (sizeof(SShowTableMsg) + pPattern->n);
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);
H
hzcheng 已提交
2221

2222 2223
    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
H
hzcheng 已提交
2224

2225 2226 2227 2228
    pMsg += (sizeof(SShowTableMsg) + pIpAddr->n);
  }

  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
2229 2230 2231
  pCmd->msgType = TSDB_MSG_TYPE_SHOW;

  assert(msgLen + minMsgSize() <= size);
2232 2233

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2234 2235
}

2236
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2237 2238 2239 2240 2241
  SKillQuery *pKill;
  char *      pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

2242
  pMsg = doBuildMsgHeader(pSql, &pStart);
H
hzcheng 已提交
2243 2244 2245
  pKill = (SKillQuery *)pMsg;

  pKill->handle = 0;
2246
  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
H
hzcheng 已提交
2247

2248
  pMsg += sizeof(SKillQuery);
H
hzcheng 已提交
2249

2250
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
2251

2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_QUERY;
      break;
    case TSDB_SQL_KILL_CONNECTION:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_CONNECTION;
      break;
    case TSDB_SQL_KILL_STREAM:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_STREAM;
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2264 2265
}

2266
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2267 2268 2269 2270
  SSqlCmd *pCmd = &(pSql->cmd);

  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg);

2271
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
2272
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
2273 2274 2275 2276
    size += sizeof(STagData);
  } else {
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
  }
2277

2278 2279 2280
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
2281 2282 2283 2284

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

2285
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2286 2287 2288 2289 2290 2291
  SCreateTableMsg *pCreateTableMsg;
  char *           pMsg, *pStart;
  int              msgLen = 0;
  SSchema *        pSchema;
  int              size = 0;

2292 2293 2294
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2295
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
2296 2297

  // Reallocate the payload size
2298
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
2299 2300
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
2301
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
2302
  }
H
hzcheng 已提交
2303 2304 2305 2306 2307

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
2308 2309

  // use dbinfo from table id without modifying current db info
S
slguan 已提交
2310
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2311 2312 2313 2314

  pMsg += sizeof(SMgmtHead);

  pCreateTableMsg = (SCreateTableMsg *)pMsg;
S
slguan 已提交
2315
  strcpy(pCreateTableMsg->meterId, pMeterMetaInfo->name);
H
hzcheng 已提交
2316

2317 2318 2319 2320
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;

H
hzcheng 已提交
2321 2322 2323 2324
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
2325
  pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
2326

2327 2328 2329
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
H
hzcheng 已提交
2330
    pMsg += sizeof(STagData);
2331
  } else {  // create (super) table
H
hzcheng 已提交
2332
    pSchema = pCreateTableMsg->schema;
2333

H
hzcheng 已提交
2334
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
2335
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
2336 2337 2338 2339

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
2340

H
hzcheng 已提交
2341 2342 2343 2344
      pSchema++;
    }

    pMsg = (char *)pSchema;
2345 2346
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
2347

2348 2349 2350
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
2351 2352 2353
    }
  }

2354
  tscClearFieldInfo(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
2355 2356 2357 2358 2359 2360

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
2361
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2362 2363 2364
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
2365
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2366
  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
2367 2368 2369
         TSDB_EXTRA_PAYLOAD_SIZE;
}

2370
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2371 2372 2373 2374 2375
  SAlterTableMsg *pAlterTableMsg;
  char *          pMsg, *pStart;
  int             msgLen = 0;
  int             size = 0;

2376 2377 2378
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

2379
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
2380 2381

  size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
2382 2383 2384 2385
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2386 2387 2388 2389 2390

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2391
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2392 2393
  pMsg += sizeof(SMgmtHead);

2394 2395
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

H
hzcheng 已提交
2396
  pAlterTableMsg = (SAlterTableMsg *)pMsg;
S
slguan 已提交
2397
  strcpy(pAlterTableMsg->meterId, pMeterMetaInfo->name);
2398
  pAlterTableMsg->type = htons(pAlterInfo->type);
2399

2400
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
2401
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
H
hzcheng 已提交
2402 2403

  SSchema *pSchema = pAlterTableMsg->schema;
2404 2405
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);
2420

2421
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2422 2423
}

2424
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2425 2426 2427 2428
  SAlterDbMsg *pAlterDbMsg;
  char *       pMsg, *pStart;
  int          msgLen = 0;

S
slguan 已提交
2429 2430
  SSqlCmd *       pCmd = &pSql->cmd;
  STscObj *       pObj = pSql->pTscObj;
2431
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2432

H
hzcheng 已提交
2433 2434 2435 2436 2437 2438 2439 2440
  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pAlterDbMsg = (SAlterDbMsg *)pMsg;
S
slguan 已提交
2441
  strcpy(pAlterDbMsg->db, pMeterMetaInfo->name);
H
hzcheng 已提交
2442 2443 2444 2445 2446 2447 2448

  pMsg += sizeof(SAlterDbMsg);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_DB;

2449
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2450 2451
}

2452
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2453 2454 2455 2456 2457 2458 2459 2460 2461
  char *pMsg, *pStart;
  int   msgLen = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2462

2463
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2464
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2465 2466 2467 2468
  size_t          nameLen = strlen(pMeterMetaInfo->name);

  if (nameLen > 0) {
    strcpy(pMgmt->db, pMeterMetaInfo->name);
H
hzcheng 已提交
2469 2470 2471
  } else {
    strcpy(pMgmt->db, pObj->db);
  }
S
slguan 已提交
2472

H
hzcheng 已提交
2473 2474
  pMsg += sizeof(SMgmtHead);

2475
  *((uint64_t *)pMsg) = pSql->res.qhandle;
S
slguan 已提交
2476 2477
  pMsg += sizeof(pSql->res.qhandle);

2478 2479
  *((uint16_t *)pMsg) = htons(pQueryInfo->type);
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
2480 2481 2482 2483 2484

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;

2485
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2486 2487
}

2488
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
2489
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
2490 2491 2492
    return pRes->code;
  }

2493 2494 2495
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, i);
H
hzcheng 已提交
2496 2497

    pRes->bytes[i] = pField->bytes;
2498 2499 2500 2501
//    if (pQueryInfo->order.order == TSQL_SO_DESC) {
//      pRes->bytes[i] = -pRes->bytes[i];
//      pRes->tsrow[i] = ((pRes->data + offset * pRes->numOfRows) + (pRes->numOfRows - 1) * pField->bytes);
//    } else {
H
hzcheng 已提交
2502
      pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
2503
//    }
H
hzcheng 已提交
2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
2518

2519
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2520

H
hzcheng 已提交
2521 2522 2523 2524 2525 2526 2527
  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

2528
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2529
  } else {
S
slguan 已提交
2530
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
2546
  SSqlCmd *       pCmd = &pSql->cmd;
2547
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2548 2549

  int32_t numOfRes = pMeterMetaInfo->pMeterMeta->numOfColumns + pMeterMetaInfo->pMeterMeta->numOfTags;
H
hzcheng 已提交
2550 2551 2552 2553 2554

  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
2555 2556
  SSqlCmd *pCmd = &pSql->cmd;

2557
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2558
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2559 2560

  int32_t numOfRes = 0;
2561
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
S
slguan 已提交
2562
    numOfRes = pMeterMetaInfo->pMetricMeta->numOfMeters;
H
hzcheng 已提交
2563 2564 2565 2566 2567 2568 2569 2570 2571 2572
  } else {
    numOfRes = 1;  // for count function, there is only one output.
  }
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
2573 2574
  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
2575 2576

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
2577
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2578 2579 2580 2581
  }

  pRes->row = 0;

2582
  uint8_t code = pRes->code;
H
hzcheng 已提交
2583
  if (pSql->fp) {  // async retrieve metric data
2584 2585
    if (pRes->code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
H
hzcheng 已提交
2586 2587 2588 2589 2590 2591 2592 2593
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
2594
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
2595

2596
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611
  SConnectMsg *pConnect;
  char *       pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  pConnect = (SConnectMsg *)pMsg;

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
  strcpy(pConnect->db, db);

S
slguan 已提交
2612 2613
  strcpy(pConnect->clientVersion, version);

H
hzcheng 已提交
2614 2615
  pMsg += sizeof(SConnectMsg);

2616
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
2617 2618
  pCmd->msgType = TSDB_MSG_TYPE_CONNECT;

2619
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2620 2621
}

2622
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2623 2624 2625 2626 2627 2628 2629
  SMeterInfoMsg *pInfoMsg;
  char *         pMsg, *pStart;
  int            msgLen = 0;

  char *tmpData = 0;
  if (pSql->cmd.allocSize > 0) {
    tmpData = calloc(1, pSql->cmd.allocSize);
2630 2631 2632 2633
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

H
hzcheng 已提交
2634 2635 2636 2637
    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize);
  }

2638 2639 2640
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

2641
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2642

H
hzcheng 已提交
2643 2644 2645 2646
  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2647
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2648 2649 2650 2651

  pMsg += sizeof(SMgmtHead);

  pInfoMsg = (SMeterInfoMsg *)pMsg;
S
slguan 已提交
2652
  strcpy(pInfoMsg->meterId, pMeterMetaInfo->name);
2653
  pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0);
H
hzcheng 已提交
2654 2655
  pMsg += sizeof(SMeterInfoMsg);

2656
  if (pSql->cmd.createOnDemand) {
H
hzcheng 已提交
2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_METERINFO;

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
2668
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2669 2670
}

S
slguan 已提交
2671 2672 2673 2674 2675
/**
 *  multi meter meta req pkg format:
 *  | SMgmtHead | SMultiMeterInfoMsg | meterId0 | meterId1 | meterId2 | ......
 *      no used         4B
 **/
2676
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
  memset(pMgmt->db, 0, TSDB_METER_ID_LEN);  // server don't need the db

  SMultiMeterInfoMsg *pInfoMsg = (SMultiMeterInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
  pInfoMsg->numOfMeters = htonl((int32_t)pCmd->count);

  if (pCmd->payloadLen > 0) {
    memcpy(pInfoMsg->meterId, tmpData, pCmd->payloadLen);
  }

  tfree(tmpData);

  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg);
  pCmd->msgType = TSDB_MSG_TYPE_MULTI_METERINFO;

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

  tscTrace("%p build load multi-metermeta msg completed, numOfMeters:%d, msg size:%d", pSql, pCmd->count,
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hzcheng 已提交
2711 2712 2713
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
      minMsgSize() + sizeof(SMetricMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
2714
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hzcheng 已提交
2715

S
slguan 已提交
2716
  int32_t n = 0;
2717 2718
  for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
    n += strlen(pQueryInfo->tagCond.cond[i].cond);
H
hzcheng 已提交
2719
  }
S
slguan 已提交
2720

H
hjxilinx 已提交
2721
  int32_t tagLen = n * TSDB_NCHAR_SIZE;
2722 2723
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
H
hjxilinx 已提交
2724
  }
2725

S
slguan 已提交
2726
  int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2;
2727
  int32_t elemSize = sizeof(SMetricMetaElemMsg) * pQueryInfo->numOfTables;
S
slguan 已提交
2728 2729 2730 2731

  int32_t len = tagLen + joinCondLen + elemSize + defaultSize;

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
H
hzcheng 已提交
2732 2733
}

2734
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2735 2736 2737
  SMetricMetaMsg *pMetaMsg;
  char *          pMsg, *pStart;
  int             msgLen = 0;
S
slguan 已提交
2738
  int             tableIndex = 0;
H
hzcheng 已提交
2739

2740 2741 2742
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

2743
  STagCond *pTagCond = &pQueryInfo->tagCond;
S
slguan 已提交
2744

2745
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2746 2747

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
S
slguan 已提交
2748 2749 2750 2751
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2752 2753 2754 2755 2756

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2757
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2758 2759 2760 2761

  pMsg += sizeof(SMgmtHead);

  pMetaMsg = (SMetricMetaMsg *)pMsg;
2762
  pMetaMsg->numOfMeters = htonl(pQueryInfo->numOfTables);
S
slguan 已提交
2763 2764 2765 2766 2767 2768 2769 2770

  pMsg += sizeof(SMetricMetaMsg);

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
  pMetaMsg->joinCondLen = htonl((TSDB_METER_ID_LEN + sizeof(int16_t)) * 2);
H
hzcheng 已提交
2771

S
slguan 已提交
2772 2773
  memcpy(pMsg, pTagCond->joinInfo.left.meterId, TSDB_METER_ID_LEN);
  pMsg += TSDB_METER_ID_LEN;
H
hzcheng 已提交
2774

S
slguan 已提交
2775 2776 2777 2778 2779 2780 2781 2782 2783
  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

  memcpy(pMsg, pTagCond->joinInfo.right.meterId, TSDB_METER_ID_LEN);
  pMsg += TSDB_METER_ID_LEN;

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

2784
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
2785
    pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, i);
S
slguan 已提交
2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797
    uint64_t uid = pMeterMetaInfo->pMeterMeta->uid;

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

    SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)pMsg;
    pMsg += sizeof(SMetricMetaElemMsg);

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
      SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
H
hjxilinx 已提交
2798
      if (pCond != NULL && pCond->cond != NULL) {
H
hjxilinx 已提交
2799
        condLen = strlen(pCond->cond) + 1;
2800

H
hjxilinx 已提交
2801
        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
S
slguan 已提交
2802 2803 2804 2805 2806
        if (!ret) {
          tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid));
          return 0;
        }
      }
H
hzcheng 已提交
2807 2808
    }

S
slguan 已提交
2809
    pElem->condLen = htonl(condLen);
H
hzcheng 已提交
2810

S
slguan 已提交
2811 2812 2813
    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;
H
hzcheng 已提交
2814

S
slguan 已提交
2815 2816 2817
    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;
H
hzcheng 已提交
2818

S
slguan 已提交
2819
      pElem->tableCond = htonl(offset);
H
hjxilinx 已提交
2820 2821 2822 2823 2824 2825 2826
      
      uint32_t len = 0;
      if (pTagCond->tbnameCond.cond != NULL) {
        len = strlen(pTagCond->tbnameCond.cond);
        memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      }
      
H
hjxilinx 已提交
2827 2828
      pElem->tableCondLen = htonl(len);
      pMsg += len;
S
slguan 已提交
2829 2830
    }

2831
    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
S
slguan 已提交
2832

H
hjxilinx 已提交
2833
    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
S
slguan 已提交
2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
      for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pMeterMetaInfo->tagColumnIndex[j]);
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
2849 2850
        for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
          SColIndexEx *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
2851 2852
          SColIndexEx *pDestCol = (SColIndexEx *)pMsg;

H
hjxilinx 已提交
2853 2854 2855 2856
          pDestCol->colIdxInBuf = 0;
          pDestCol->colIdx = htons(pCol->colIdx);
          pDestCol->colId = htons(pDestCol->colId);
          pDestCol->flag = htons(pDestCol->flag);
2857

H
hjxilinx 已提交
2858
          pMsg += sizeof(SColIndexEx);
S
slguan 已提交
2859 2860
        }
      }
H
hzcheng 已提交
2861
    }
S
slguan 已提交
2862 2863 2864 2865 2866 2867

    strcpy(pElem->meterId, pMeterMetaInfo->name);
    pElem->numOfTags = htons(pMeterMetaInfo->numOfTags);

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
H
hzcheng 已提交
2868 2869 2870 2871 2872 2873
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_METRIC_META;
  assert(msgLen + minMsgSize() <= size);
2874 2875
  
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2876 2877
}

2878
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
H
hzcheng 已提交
2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900
  int      size = 0;
  STscObj *pObj = pSql->pTscObj;

  size += tsRpcHeadSize + sizeof(SMgmtHead);
  size += sizeof(SQList);

  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    size += sizeof(SQDesc);
    tpSql = tpSql->next;
  }

  size += sizeof(SSList);
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    size += sizeof(SSDesc);
    pStream = pStream->next;
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

2901
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2902 2903 2904 2905 2906 2907 2908 2909 2910
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

2911
  size = tscEstimateHeartBeatMsgLength(pSql);
S
slguan 已提交
2912 2913 2914 2915
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

int tscProcessMeterMetaRsp(SSqlObj *pSql) {
  SMeterMeta *pMeta;
  SSchema *   pSchema;
  uint8_t     ieType;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;
  pMeta = (SMeterMeta *)rsp;

  pMeta->sid = htonl(pMeta->sid);
S
slguan 已提交
2952
  pMeta->sversion = htons(pMeta->sversion);
H
hzcheng 已提交
2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967
  pMeta->vgid = htonl(pMeta->vgid);
  pMeta->uid = htobe64(pMeta->uid);

  if (pMeta->sid < 0 || pMeta->vgid < 0) {
    tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  pMeta->numOfColumns = htons(pMeta->numOfColumns);

  if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMeta->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

2968
  if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
H
hzcheng 已提交
2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995
    tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
    pMeta->vpeerDesc[i].vnode = htonl(pMeta->vpeerDesc[i].vnode);
  }

  pMeta->rowSize = 0;
  rsp += sizeof(SMeterMeta);
  pSchema = (SSchema *)rsp;

  int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // ignore the tags length
    if (i < pMeta->numOfColumns) {
      pMeta->rowSize += pSchema->bytes;
    }
    pSchema++;
  }

  rsp += numOfTotalCols * sizeof(SSchema);

  int32_t  tagLen = 0;
S
slguan 已提交
2996
  SSchema *pTagsSchema = tsGetTagSchema(pMeta);
H
hzcheng 已提交
2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010

  if (pMeta->meterType == TSDB_METER_MTABLE) {
    for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
      tagLen += pTagsSchema[i].bytes;
    }
  }

  rsp += tagLen;
  int32_t size = (int32_t)(rsp - (char *)pMeta);

  // pMeta->index = rand() % TSDB_VNODES_SUPPORT;
  pMeta->index = 0;

  // todo add one more function: taosAddDataIfNotExists();
3011
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
3012
  assert(pMeterMetaInfo->pMeterMeta == NULL);
H
hzcheng 已提交
3013

S
slguan 已提交
3014 3015
  pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
                                                                  size, tsMeterMetaKeepTimer);
3016
  // todo handle out of memory case
S
slguan 已提交
3017
  if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
H
hzcheng 已提交
3018 3019 3020 3021

  return TSDB_CODE_OTHERS;
}

S
slguan 已提交
3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131
/**
 *  multi meter meta rsp pkg format:
 *  | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  SSchema *pSchema;
  uint8_t  ieType;
  int32_t  totalNum;
  int32_t  i;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

  SMultiMeterInfoMsg *pInfo = (SMultiMeterInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfMeters);
  rsp += sizeof(SMultiMeterInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiMeterMeta *pMultiMeta = (SMultiMeterMeta *)rsp;
    SMeterMeta *     pMeta = &pMultiMeta->meta;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgid = htonl(pMeta->vgid);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgid < 0) {
      tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    pMeta->numOfColumns = htons(pMeta->numOfColumns);

    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
      tscError("invalid tag value count:%d", pMeta->numOfTags);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
      tscError("invalid numOfTags:%d", pMeta->numOfTags);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    }

    pMeta->rowSize = 0;
    rsp += sizeof(SMultiMeterMeta);
    pSchema = (SSchema *)rsp;

    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    for (int j = 0; j < numOfTotalCols; ++j) {
      pSchema->bytes = htons(pSchema->bytes);
      pSchema->colId = htons(pSchema->colId);

      // ignore the tags length
      if (j < pMeta->numOfColumns) {
        pMeta->rowSize += pSchema->bytes;
      }
      pSchema++;
    }

    rsp += numOfTotalCols * sizeof(SSchema);

    int32_t  tagLen = 0;
    SSchema *pTagsSchema = tsGetTagSchema(pMeta);

    if (pMeta->meterType == TSDB_METER_MTABLE) {
      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
        tagLen += pTagsSchema[j].bytes;
      }
    }

    rsp += tagLen;
    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with SMeterMeta in cache

    pMeta->index = 0;
    (void)taosAddDataIntoCache(tscCacheHandle, pMultiMeta->meterId, (char *)pMeta, size, tsMeterMetaKeepTimer);
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
3132 3133 3134
int tscProcessMetricMetaRsp(SSqlObj *pSql) {
  SMetricMeta *pMeta;
  uint8_t      ieType;
S
slguan 已提交
3135 3136 3137 3138
  void **      metricMetaList = NULL;
  int32_t *    sizes = NULL;

  char *rsp = pSql->res.pRsp;
H
hzcheng 已提交
3139 3140 3141 3142 3143 3144 3145 3146 3147

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;

S
slguan 已提交
3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173
  int32_t num = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  metricMetaList = calloc(1, POINTER_BYTES * num);
  sizes = calloc(1, sizeof(int32_t) * num);

  // return with error code
  if (metricMetaList == NULL || sizes == NULL) {
    tfree(metricMetaList);
    tfree(sizes);
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    return pSql->res.code;
  }

  for (int32_t k = 0; k < num; ++k) {
    pMeta = (SMetricMeta *)rsp;

    size_t size = (size_t)pSql->res.rspLen - 1;
    rsp = rsp + sizeof(SMetricMeta);

    pMeta->numOfMeters = htonl(pMeta->numOfMeters);
    pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
    pMeta->tagLen = htons(pMeta->tagLen);

    size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfMeters * sizeof(SMeterSidExtInfo *);
H
hzcheng 已提交
3174

3175 3176
    char *pBuf = calloc(1, size);
    if (pBuf == NULL) {
S
slguan 已提交
3177 3178 3179
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
3180

3181
    SMetricMeta *pNewMetricMeta = (SMetricMeta *)pBuf;
S
slguan 已提交
3182
    metricMetaList[k] = pNewMetricMeta;
H
hzcheng 已提交
3183

S
slguan 已提交
3184 3185 3186
    pNewMetricMeta->numOfMeters = pMeta->numOfMeters;
    pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
    pNewMetricMeta->tagLen = pMeta->tagLen;
H
hzcheng 已提交
3187

3188
    pBuf = pBuf + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
H
hzcheng 已提交
3189

S
slguan 已提交
3190 3191
    for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
      SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
3192
      memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
H
hzcheng 已提交
3193

3194 3195
      pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta;  // offset value
      SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
H
hzcheng 已提交
3196

S
slguan 已提交
3197
      tscTrace("%p metricmeta:vid:%d,numOfMeters:%d", pSql, i, pLists->numOfSids);
H
hzcheng 已提交
3198

3199
      pBuf += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids;
S
slguan 已提交
3200
      rsp += sizeof(SVnodeSidList);
H
hzcheng 已提交
3201

3202
      size_t elemSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen;
S
slguan 已提交
3203
      for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
3204 3205
        pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
        memcpy(pBuf, rsp, elemSize);
3206 3207 3208 3209

        ((SMeterSidExtInfo *)pBuf)->uid = htobe64(((SMeterSidExtInfo *)pBuf)->uid);
        ((SMeterSidExtInfo *)pBuf)->sid = htonl(((SMeterSidExtInfo *)pBuf)->sid);

3210 3211
        rsp += elemSize;
        pBuf += elemSize;
S
slguan 已提交
3212
      }
H
hzcheng 已提交
3213
    }
S
slguan 已提交
3214

3215
    sizes[k] = pBuf - (char *)pNewMetricMeta;
H
hzcheng 已提交
3216 3217
  }

3218
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
3219 3220 3221
  for (int32_t i = 0; i < num; ++i) {
    char name[TSDB_MAX_TAGS_LEN + 1] = {0};

3222
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
H
hjxilinx 已提交
3223
    tscGetMetricMetaCacheKey(pQueryInfo, name, pMeterMetaInfo->pMeterMeta->uid);
H
hzcheng 已提交
3224

S
slguan 已提交
3225 3226 3227
#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", name, i);
#endif
H
hzcheng 已提交
3228

S
slguan 已提交
3229 3230
    // release the used metricmeta
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
H
hzcheng 已提交
3231

S
slguan 已提交
3232 3233 3234 3235 3236 3237 3238 3239 3240
    pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosAddDataIntoCache(tscCacheHandle, name, (char *)metricMetaList[i],
                                                                      sizes[i], tsMetricMetaKeepTimer);
    tfree(metricMetaList[i]);

    // failed to put into cache
    if (pMeterMetaInfo->pMetricMeta == NULL) {
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
3241 3242
  }

S
slguan 已提交
3243 3244 3245 3246 3247 3248 3249 3250 3251 3252
_error_clean:
  // free allocated resource
  for (int32_t i = 0; i < num; ++i) {
    tfree(metricMetaList[i]);
  }

  free(sizes);
  free(metricMetaList);

  return pSql->res.code;
H
hzcheng 已提交
3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  SMeterMeta * pMeta;
  SShowRspMsg *pShow;
  SSchema *    pSchema;
  char         key[20];

3264 3265 3266 3267 3268
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);  //?

3269
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
3270 3271 3272 3273

  pShow = (SShowRspMsg *)pRes->pRsp;
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
3274
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285
  pMeta = &(pShow->meterMeta);

  pMeta->numOfColumns = ntohs(pMeta->numOfColumns);

  pSchema = (SSchema *)((char *)pMeta + sizeof(SMeterMeta));
  pMeta->sid = ntohs(pMeta->sid);
  for (int i = 0; i < pMeta->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

3286
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
3287 3288
  strcpy(key + 1, "showlist");

S
slguan 已提交
3289
  taosRemoveDataFromCache(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
H
hzcheng 已提交
3290 3291

  int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(SMeterMeta);
S
slguan 已提交
3292 3293
  pMeterMetaInfo->pMeterMeta =
      (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
3294
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
S
slguan 已提交
3295
  SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3296

3297
  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMeta->numOfColumns);
S
slguan 已提交
3298 3299 3300 3301
  SColumnIndex index = {0};

  for (int16_t i = 0; i < pMeta->numOfColumns; ++i) {
    index.columnIndex = i;
3302 3303
    tscColumnBaseInfoInsert(pQueryInfo, &index);
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pMeterSchema[i]);
H
hzcheng 已提交
3304 3305
  }

3306
  tscFieldInfoCalOffset(pQueryInfo);
H
hzcheng 已提交
3307 3308 3309 3310
  return 0;
}

int tscProcessConnectRsp(SSqlObj *pSql) {
3311
  char         temp[TSDB_METER_ID_LEN * 2];
H
hzcheng 已提交
3312 3313 3314 3315 3316 3317 3318
  SConnectRsp *pConnect;

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  pConnect = (SConnectRsp *)pRes->pRsp;
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response
3319 3320
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

H
hjxilinx 已提交
3321 3322 3323
  assert(len <= tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  
S
slguan 已提交
3324 3325 3326
  SIpList *    pIpList;
  char *rsp = pRes->pRsp + sizeof(SConnectRsp);
  pIpList = (SIpList *)rsp;
S
slguan 已提交
3327
  tscSetMgmtIpList(pIpList);
H
hzcheng 已提交
3328 3329 3330 3331 3332 3333 3334 3335 3336 3337

  strcpy(pObj->sversion, pConnect->version);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
3338
  STscObj *       pObj = pSql->pTscObj;
3339
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
3340 3341

  strcpy(pObj->db, pMeterMetaInfo->name);
H
hzcheng 已提交
3342 3343 3344 3345 3346 3347 3348 3349 3350
  return 0;
}

int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosClearDataCache(tscCacheHandle);
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
3351
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
3352 3353

  SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
H
hzcheng 已提交
3354 3355 3356 3357 3358 3359 3360 3361 3362
  if (pMeterMeta == NULL) {
    /* not in cache, abort */
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
3363 3364
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
3365
   */
S
slguan 已提交
3366
  tscTrace("%p force release metermeta after drop table:%s", pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
3367 3368
  taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);

S
slguan 已提交
3369 3370 3371
  if (pMeterMetaInfo->pMeterMeta) {
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
3372 3373 3374 3375 3376 3377
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
3378
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
3379 3380

  SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
H
hzcheng 已提交
3381 3382 3383 3384
  if (pMeterMeta == NULL) { /* not in cache, abort */
    return 0;
  }

S
slguan 已提交
3385
  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
3386 3387
  taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);

S
slguan 已提交
3388
  if (pMeterMetaInfo->pMeterMeta) {
3389
    bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
H
hzcheng 已提交
3390

S
slguan 已提交
3391 3392
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
3393

3394
    if (isSuperTable) {  // if it is a super table, reset whole query cache
S
slguan 已提交
3395
      tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412
      taosClearDataCache(tscCacheHandle);
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  pRes->qhandle = *((uint64_t *)pRes->pRsp);
  pRes->data = NULL;
S
slguan 已提交
3413
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
3414 3415 3416 3417
  return 0;
}

int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
S
slguan 已提交
3418 3419 3420 3421
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

H
hzcheng 已提交
3422 3423 3424 3425 3426
  SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset = htobe64(pRetrieve->offset);
S
slguan 已提交
3427
  pRes->useconds = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
3428
  pRes->data = pRetrieve->data;
H
hjxilinx 已提交
3429
  
3430 3431
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);
3432

weixin_48148422's avatar
weixin_48148422 已提交
3433
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
3434 3435 3436 3437 3438
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
    
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
3439 3440 3441 3442 3443 3444 3445 3446 3447
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    int32_t numOfMeters = htonl(*(int32_t*)p);
    p += sizeof(int32_t);
    for (int i = 0; i < numOfMeters; i++) {
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
3448
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
3449
    }
3450 3451
  }

H
hzcheng 已提交
3452 3453
  pRes->row = 0;

H
hjxilinx 已提交
3454
  /**
H
hjxilinx 已提交
3455 3456
   * If the query result is exhausted, or current query is to free resource at server side,
   * the connection will be recycled.
H
hjxilinx 已提交
3457
   */
3458
  if ((pRes->numOfRows == 0 && !(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pRes->offset > 0)) ||
3459
      ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
H
hjxilinx 已提交
3460
    tscTrace("%p no result or free resource, recycle connection", pSql);
H
hzcheng 已提交
3461 3462 3463 3464 3465 3466 3467 3468 3469 3470
    taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
    pSql->thandle = NULL;
  } else {
    tscTrace("%p numOfRows:%d, offset:%d, not recycle connection", pSql, pRes->numOfRows, pRes->offset);
  }

  return 0;
}

int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
3471 3472
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;
3473
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
3474

H
hzcheng 已提交
3475 3476 3477 3478
  SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->data = pRetrieve->data;
3479

3480
  tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
3481 3482 3483 3484 3485 3486
  pRes->row = 0;
  return 0;
}

void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code);

3487
static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
H
hzcheng 已提交
3488
  int32_t code = TSDB_CODE_SUCCESS;
3489

S
slguan 已提交
3490 3491 3492 3493 3494
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
3495

H
hzcheng 已提交
3496 3497 3498
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
3499

3500
  tscAddSubqueryInfo(&pNew->cmd);
3501 3502 3503 3504 3505

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

  pNew->cmd.createOnDemand = pSql->cmd.createOnDemand;  // create table if not exists
S
slguan 已提交
3506 3507 3508
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get meter meta", pSql);
    free(pNew);
3509

S
slguan 已提交
3510 3511 3512
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

3513 3514
  SMeterMetaInfo *pNewMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
3515

3516 3517
  strcpy(pNewMeterMetaInfo->name, pMeterMetaInfo->name);
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
H
hzcheng 已提交
3518 3519 3520
  tscTrace("%p new pSqlObj:%p to get meterMeta", pSql, pNew);

  if (pSql->fp == NULL) {
S
slguan 已提交
3521 3522
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);
H
hzcheng 已提交
3523 3524

    code = tscProcessSql(pNew);
S
slguan 已提交
3525

3526 3527 3528 3529
    /*
     * Update cache only on succeeding in getting metermeta.
     * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
     */
H
hzcheng 已提交
3530
    if (code == TSDB_CODE_SUCCESS) {
3531
      pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
3532
      assert(pMeterMetaInfo->pMeterMeta != NULL);
H
hzcheng 已提交
3533 3534
    }

3535
    tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3536 3537 3538 3539 3540
    tscFreeSqlObj(pNew);

  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
3541
    pNew->sqlstr = strdup(pSql->sqlstr);
H
hzcheng 已提交
3542 3543 3544 3545 3546 3547 3548 3549 3550 3551

    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

3552 3553
int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
  assert(strlen(pMeterMetaInfo->name) != 0);
S
slguan 已提交
3554

3555 3556 3557 3558 3559
  // If this SMeterMetaInfo owns a metermeta, release it first
  if (pMeterMetaInfo->pMeterMeta != NULL) {
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
  }
  
3560
  pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
S
slguan 已提交
3561 3562 3563 3564 3565
  if (pMeterMetaInfo->pMeterMeta != NULL) {
    SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;

    tscTrace("%p retrieve meterMeta from cache, the number of columns:%d, numOfTags:%d", pSql, pMeterMeta->numOfColumns,
             pMeterMeta->numOfTags);
H
hzcheng 已提交
3566 3567 3568 3569 3570 3571 3572 3573

    return TSDB_CODE_SUCCESS;
  }

  /*
   * for async insert operation, release data block buffer before issue new object to get metermeta
   * because in metermeta callback function, the tscParse function will generate the submit data blocks
   */
3574
  return doGetMeterMetaFromServer(pSql, pMeterMetaInfo);
H
hzcheng 已提交
3575 3576
}

3577 3578 3579
int tscGetMeterMetaEx(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo, bool createIfNotExists) {
  pSql->cmd.createOnDemand = createIfNotExists;
  return tscGetMeterMeta(pSql, pMeterMetaInfo);
H
hzcheng 已提交
3580 3581 3582 3583
}

/*
 * in handling the renew metermeta problem during insertion,
S
slguan 已提交
3584
 *
H
hzcheng 已提交
3585 3586 3587 3588 3589
 * If the meter is created on demand during insertion, the routine usually waits for a short
 * period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
 * successfully created the corresponding table.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
S
slguan 已提交
3590
  if (pCmd->command == TSDB_SQL_INSERT) {
H
hzcheng 已提交
3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601
    taosMsleep(50);  // todo: global config
  }
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
 * @param meterId       meter id
 * @return              status code
 */
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
3602 3603
  int code = 0;

H
hzcheng 已提交
3604 3605
  // handle metric meta renew process
  SSqlCmd *pCmd = &pSql->cmd;
3606 3607

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
3608
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
3609 3610 3611 3612 3613

  // enforce the renew metermeta operation in async model
  if (pSql->fp == NULL) pSql->fp = (void *)0x1;

  /*
S
slguan 已提交
3614
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
3615 3616
   * 2. if get metermeta failed, still get the metermeta
   */
S
slguan 已提交
3617 3618
  if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
    if (pMeterMetaInfo->pMeterMeta) {
H
hjxilinx 已提交
3619
      tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
S
slguan 已提交
3620
               pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3621
    }
3622

3623
    tscWaitingForCreateTable(pCmd);
S
slguan 已提交
3624
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
H
hzcheng 已提交
3625

3626
    code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo);  // todo ??
H
hzcheng 已提交
3627
  } else {
H
hjxilinx 已提交
3628
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
S
slguan 已提交
3629 3630
             pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid,
             pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641
  }

  if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (pSql->fp == (void *)0x1) {
      pSql->fp = NULL;
    }
  }

  return code;
}

3642
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
S
slguan 已提交
3643 3644
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
3645 3646

  /*
3647
   * the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
H
hzcheng 已提交
3648
   */
3649
  bool    required = false;
3650

3651
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
3652
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
3653 3654
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

3655
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
H
hjxilinx 已提交
3656
    tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
S
slguan 已提交
3657 3658 3659 3660 3661

    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);

    SMetricMeta *ppMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
    if (ppMeta == NULL) {
3662
      required = true;
S
slguan 已提交
3663 3664 3665 3666 3667
      break;
    } else {
      pMeterMetaInfo->pMetricMeta = ppMeta;
    }
  }
H
hzcheng 已提交
3668

3669 3670
  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
H
hzcheng 已提交
3671 3672 3673
    return TSDB_CODE_SUCCESS;
  }

S
slguan 已提交
3674
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
3675 3676 3677 3678
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_METRIC;
3679 3680
  
  SQueryInfo *pNewQueryInfo = NULL;
3681 3682 3683
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    return code;
  }
3684
  
3685
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
3686
    SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
3687

S
slguan 已提交
3688
    SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
3689
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
S
slguan 已提交
3690 3691 3692 3693 3694 3695
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
3696

3697
  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
H
hzcheng 已提交
3698

3699 3700
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
H
hzcheng 已提交
3701

3702 3703
  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;
H
hjxilinx 已提交
3704 3705 3706 3707 3708
  
  STagCond* pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond)
H
hzcheng 已提交
3709

3710 3711 3712 3713
//  if (pSql->fp != NULL && pSql->pStream == NULL) {
//    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
//    tscFreeSubqueryInfo(pCmd);
//  }
H
hzcheng 已提交
3714 3715 3716

  tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
  if (pSql->fp == NULL) {
S
slguan 已提交
3717 3718
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);
H
hzcheng 已提交
3719 3720

    code = tscProcessSql(pNew);
S
slguan 已提交
3721

3722 3723 3724 3725 3726
    if (code == TSDB_CODE_SUCCESS) {//todo optimize the performance
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
        char tagstr[TSDB_MAX_TAGS_LEN] = {0};
    
        SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
H
hjxilinx 已提交
3727
        tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
S
slguan 已提交
3728 3729

#ifdef _DEBUG_VIEW
3730
        printf("create metric key:%s, index:%d\n", tagstr, i);
S
slguan 已提交
3731
#endif
3732 3733 3734 3735
    
        taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
        pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
      }
S
slguan 已提交
3736 3737
    }

H
hzcheng 已提交
3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756
    tscFreeSqlObj(pNew);
  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

void tscInitMsgs() {
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
3757
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
3758

3759 3760
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
3761 3762

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
3763
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropAcctMsg;
H
hzcheng 已提交
3764 3765 3766
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
3767
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
3768 3769 3770
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
3771 3772 3773 3774 3775 3776 3777
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_META] = tscBuildMeterMetaMsg;
  tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg;
S
slguan 已提交
3778
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
3779 3780 3781 3782

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
3783 3784 3785
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
3786 3787 3788 3789 3790 3791 3792 3793 3794 3795

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessMeterMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
S
slguan 已提交
3796
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
3797 3798

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
3799
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
3800
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
3801

H
hzcheng 已提交
3802
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
3803 3804 3805 3806 3807
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
3808

H
hzcheng 已提交
3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;

  tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg;
  tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg;
}