tscServer.c 117.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tcache.h"
#include "trpc.h"
S
slguan 已提交
19
#include "tscJoinProcess.h"
H
hzcheng 已提交
20 21 22 23 24
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
S
slguan 已提交
25
#include "tscompression.h"
H
hzcheng 已提交
26 27 28 29 30 31 32
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
33 34
int        tsMasterIndex = 0;
int        tsSlaveIndex = 1;
H
hzcheng 已提交
35

S
slguan 已提交
36
SRpcIpSet  tscMgmtIpList;
S
slguan 已提交
37 38
SRpcIpSet  tscDnodeIpSet;

39 40
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
41
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
S
slguan 已提交
42
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
H
hzcheng 已提交
43 44
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
45 46 47
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
48

S
slguan 已提交
49
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
50

S
slguan 已提交
51 52
void tscPrintMgmtIp() {
  if (tscMgmtIpList.numOfIps <= 0) {
S
slguan 已提交
53
    tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
S
slguan 已提交
54
  } else {
S
slguan 已提交
55
    for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
S
slguan 已提交
56
      tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
S
slguan 已提交
57
    }
S
slguan 已提交
58 59 60
  }
}

S
slguan 已提交
61 62
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
S
slguan 已提交
63
  tscMgmtIpList.inUse = htons(pIpList->inUse);
S
slguan 已提交
64 65 66
  tscMgmtIpList.port = htons(pIpList->port);
  for (int32_t i = 0; i <tscMgmtIpList.numOfIps; ++i) {
    tscMgmtIpList.ip[i] = pIpList->ip[i];
S
slguan 已提交
67 68 69 70
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
71 72
  if (tscMgmtIpList.numOfIps != 1) {
    tscMgmtIpList.numOfIps = 1;
S
slguan 已提交
73
    tscMgmtIpList.inUse = 0;
S
slguan 已提交
74
    tscMgmtIpList.port = tsMnodeShellPort;
S
slguan 已提交
75 76 77 78 79 80
    tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
81
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
S
slguan 已提交
82 83 84 85 86 87 88 89
  /*
    * The iplist returned by the cluster edition is the current management nodes
    * and the iplist returned by the edge edition is empty
    */
  if (pIpList->numOfIps != 0) {
    tscSetMgmtIpListFromCluster(pIpList);
  } else {
    tscSetMgmtIpListFromEdge();
S
slguan 已提交
90 91 92
  }
}

H
hjxilinx 已提交
93 94 95 96 97 98 99
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
100
UNUSED_FUNC
H
hjxilinx 已提交
101 102 103 104 105
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
  return tscMgmtIpList.numOfIps * factor;
}

H
hzcheng 已提交
106 107 108 109 110 111 112 113 114 115 116 117
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
118
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
S
slguan 已提交
119
    SRpcIpSet *      pIpList = &pRsp->ipList;
S
slguan 已提交
120
    tscSetMgmtIpList(pIpList);
S
slguan 已提交
121

H
hzcheng 已提交
122 123 124
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
S
slguan 已提交
125 126
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
    }
  } else {
    tscTrace("heart beat failed, code:%d", code);
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
143 144 145
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
146
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
147
    
148 149 150 151
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
S
slguan 已提交
152 153 154 155 156
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
157 158 159 160
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
161
    tscAddSubqueryInfo(&pObj->pHb->cmd);
162

S
slguan 已提交
163
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
164 165 166
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
167
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
168 169 170 171 172 173 174 175 176
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

int tscSendMsgToServer(SSqlObj *pSql) {
S
slguan 已提交
177 178
  char *pMsg = rpcMallocCont(pSql->cmd.payloadLen);
  if (NULL == pMsg) {
S
slguan 已提交
179 180
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
181 182
  }

S
slguan 已提交
183
  pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
184
  
S
slguan 已提交
185
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
S
slguan 已提交
186
    pSql->ipList->port = tsDnodeShellPort;
S
slguan 已提交
187 188
    tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
    memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
189 190 191 192 193 194 195 196 197

    SRpcMsg rpcMsg = {
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
      .handle  = pSql,
      .code   = 0
    };
    rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
S
slguan 已提交
198
  } else {
S
slguan 已提交
199
    pSql->ipList->port = tsMnodeShellPort;
S
slguan 已提交
200
    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
S
slguan 已提交
201
    memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
202 203 204 205 206 207 208 209
    SRpcMsg rpcMsg = {
        .msgType = pSql->cmd.msgType,
        .pCont   = pMsg,
        .contLen = pSql->cmd.payloadLen,
        .handle  = pSql,
        .code   = 0
    };
    rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg);
H
hzcheng 已提交
210 211
  }

S
slguan 已提交
212
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
213 214
}

215
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
S
slguan 已提交
216
  tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
217
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
S
slguan 已提交
218
  if (pSql == NULL || pSql->signature != pSql) {
H
hzcheng 已提交
219
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
S
slguan 已提交
220
    return;
H
hzcheng 已提交
221 222
  }

S
slguan 已提交
223 224 225
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
226
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);
H
hzcheng 已提交
227 228

  if (pSql->freed || pObj->signature != pObj) {
S
slguan 已提交
229 230
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
H
hzcheng 已提交
231
    tscFreeSqlObj(pSql);
232
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
233
    return;
H
hzcheng 已提交
234 235
  }

236 237
  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
S
slguan 已提交
238
  } else {
H
hjxilinx 已提交
239
    STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
240 241 242 243
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION ||
        rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
S
slguan 已提交
244 245 246 247 248 249 250 251 252 253 254
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
255 256
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
257 258
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
259 260
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
261 262
        return;
      } else {
263
        tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code);
H
hzcheng 已提交
264

S
slguan 已提交
265
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
266
        pSql->res.code = rpcMsg->code;  // keep the previous error code
S
slguan 已提交
267

H
hjxilinx 已提交
268
        rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
H
hzcheng 已提交
269

H
hjxilinx 已提交
270
        if (pTableMetaInfo->pTableMeta) {
S
slguan 已提交
271
          tscSendMsgToServer(pSql);
272
          rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
273 274
          return;
        }
H
hzcheng 已提交
275 276
      }
    }
S
slguan 已提交
277
  }
H
hzcheng 已提交
278 279 280

  pSql->retry = 0;

S
slguan 已提交
281
  if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);
H
hzcheng 已提交
282 283 284

  pRes->rspLen = 0;
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
285
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
H
hzcheng 已提交
286 287 288 289
  } else {
    tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
  }

S
slguan 已提交
290
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
291 292 293 294
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code = (int32_t)rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen = rpcMsg->contLen;
H
hzcheng 已提交
295

S
slguan 已提交
296 297 298 299 300 301
    char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
    if (tmp == NULL) {
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    } else {
      pRes->pRsp = tmp;
      if (pRes->rspLen) {
302
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
303 304 305 306
      }
    }

    // ignore the error information returned from mnode when set ignore flag in sql
S
slguan 已提交
307
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) {
H
hzcheng 已提交
308 309 310 311 312 313 314
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
315 316
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
317 318 319 320 321 322 323
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
S
slguan 已提交
324 325 326 327
      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
               *(int32_t *)pRes->pRsp, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
H
hzcheng 已提交
328 329 330 331
    }
  }

  if (pSql->fp == NULL) {
S
slguan 已提交
332
    tsem_post(&pSql->rspSem);
H
hzcheng 已提交
333 334
  } else {
    if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
335
      rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
hzcheng 已提交
336

337
    if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
H
hzcheng 已提交
338 339
      int   command = pCmd->command;
      void *taosres = tscKeepConn[command] ? pSql : NULL;
340
      rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;
H
hzcheng 已提交
341

342
      tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
H
hzcheng 已提交
343 344

      /*
S
slguan 已提交
345 346 347
       * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
       * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
       * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
H
hzcheng 已提交
348 349 350 351 352 353
       *
       * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
       * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
       */
      bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
      if (command == TSDB_SQL_INSERT) {  // handle multi-vnode insertion situation
354
        (*pSql->fp)(pSql, taosres, rpcMsg->code);
H
hzcheng 已提交
355
      } else {
356
        (*pSql->fp)(pSql->param, taosres, rpcMsg->code);
H
hzcheng 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
      }

      if (shouldFree) {
        // If it is failed, all objects allocated during execution taos_connect_a should be released
        if (command == TSDB_SQL_CONNECT) {
          taos_close(pObj);
          tscTrace("%p Async sql close failed connection", pSql);
        } else {
          tscFreeSqlObj(pSql);
          tscTrace("%p Async sql is automatically freed", pSql);
        }
      }
    }
  }

372
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
373 374
}

S
slguan 已提交
375
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
376
static int      tscLaunchSTableSubqueries(SSqlObj *pSql);
H
hzcheng 已提交
377

S
slguan 已提交
378
// todo merge with callback
H
hjxilinx 已提交
379
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
380
  SSqlCmd *   pCmd = &pSql->cmd;
H
hjxilinx 已提交
381
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
382

S
slguan 已提交
383 384 385 386
  pSql->res.qhandle = 0x1;
  pSql->res.numOfRows = 0;

  if (pSql->pSubs == NULL) {
H
hjxilinx 已提交
387
    pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
S
slguan 已提交
388 389 390 391 392
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }
  }

H
hjxilinx 已提交
393
  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
S
slguan 已提交
394 395 396
  if (pNew == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
397

S
slguan 已提交
398
  pSql->pSubs[pSql->numOfSubs++] = pNew;
H
hjxilinx 已提交
399
  assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
H
hzcheng 已提交
400

401 402
  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
S
slguan 已提交
403 404

    // refactor as one method
405
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
406
    assert(pNewQueryInfo != NULL);
407

408 409
    tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
    tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
S
slguan 已提交
410

H
hjxilinx 已提交
411
    tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false);
412
    tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
H
hjxilinx 已提交
413
    
414
    tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
S
slguan 已提交
415 416

    pNew->cmd.numOfCols = 0;
417
    pNewQueryInfo->intervalTime = 0;
418
    memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
419

420 421
    // backup the data and clear it in the sqlcmd object
    pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
422
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
S
slguan 已提交
423

H
hjxilinx 已提交
424 425 426 427
    // this data needs to be transfer to support struct
    pNewQueryInfo->fieldsInfo.numOfOutputCols = 0;
    pNewQueryInfo->exprsInfo.numOfExprs = 0;
    
S
slguan 已提交
428
    // set the ts,tags that involved in join, as the output column of intermediate result
429
    tscClearSubqueryInfo(&pNew->cmd);
430

S
slguan 已提交
431
    SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
S
slguan 已提交
432 433
    SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};

434
    tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
S
slguan 已提交
435 436

    // set the tags value for ts_comp function
437
    SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
S
slguan 已提交
438

H
hjxilinx 已提交
439 440
    STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0);
    int16_t         tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
S
slguan 已提交
441 442 443 444 445 446 447 448

    pExpr->param->i64Key = tagColIndex;
    pExpr->numOfParams = 1;

    // add the filter tag column
    for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
      SColumnBase *pColBase = &pSupporter->colList.pColList[i];
      if (pColBase->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
449 450
        tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
        pNewQueryInfo->colList.numOfCols++;
S
slguan 已提交
451 452
      }
    }
453 454 455
  
    tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
             "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
H
hjxilinx 已提交
456
             pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
457 458
             pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
             pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
H
hjxilinx 已提交
459
    tscPrintSelectClause(pNew, 0);
H
hjxilinx 已提交
460 461 462
  
    tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
             "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
H
hjxilinx 已提交
463
             pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
H
hjxilinx 已提交
464 465 466
             pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
             pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
    tscPrintSelectClause(pNew, 0);
S
slguan 已提交
467
  } else {
468
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
469
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
S
slguan 已提交
470
  }
471

H
hjxilinx 已提交
472
#ifdef _DEBUG_VIEW
H
hjxilinx 已提交
473
  tscPrintSelectClause(pNew, 0);
H
hjxilinx 已提交
474
#endif
475
  
S
slguan 已提交
476 477 478 479 480 481 482 483
  return tscProcessSql(pNew);
}

int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  void *asyncFp = pSql->fp;
484 485 486 487
  if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_METRIC) {
    tscBuildMsg[pCmd->command](pSql, NULL);
S
slguan 已提交
488
  }
489 490 491

  int32_t code = tscSendMsgToServer(pSql);

S
slguan 已提交
492
  if (asyncFp) {
493
    if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
494 495 496 497 498 499
      pRes->code = code;
      tscQueueAsyncRes(pSql);
    }
    return 0;
  }

500
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
501 502 503 504 505 506
    pRes->code = code;
    return code;
  }

  tsem_wait(&pSql->rspSem);

507
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) (*tscProcessMsgRsp[pCmd->command])(pSql);
S
slguan 已提交
508 509 510 511 512 513 514

  tsem_post(&pSql->emptyRspSem);

  return pRes->code;
}

int tscProcessSql(SSqlObj *pSql) {
515 516 517
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
518 519
  
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
520
  STableMetaInfo *pTableMetaInfo = NULL;
521 522
  int16_t         type = 0;

523
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
524 525 526
    pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
527
    }
528

529
    type = pQueryInfo->type;
530 531 532
  
    // for hearbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
533
  }
534

535
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
H
hzcheng 已提交
536
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
H
hjxilinx 已提交
537 538
    // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
H
hjxilinx 已提交
539 540 541
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
542

S
slguan 已提交
543 544
    // temp
    pSql->ipList = &tscMgmtIpList;
H
hjxilinx 已提交
545 546
//    if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {
//      pSql->index = pTableMetaInfo->pTableMeta->index;
S
slguan 已提交
547 548
//    } else {  // it must be the parent SSqlObj for super table query
//      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
H
hjxilinx 已提交
549
//        int32_t idx = pTableMetaInfo->vnodeIndex;
S
slguan 已提交
550
//
H
hjxilinx 已提交
551
//        SVnodeSidList *pSidList = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
S
slguan 已提交
552 553 554
//        pSql->index = pSidList->index;
//      }
//    }
H
hzcheng 已提交
555
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
S
slguan 已提交
556
    pSql->ipList = &tscMgmtIpList;
H
hzcheng 已提交
557 558 559 560
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

S
slguan 已提交
561
  // todo handle async situation
562 563
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
S
slguan 已提交
564
      SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
565

566
      pState->numOfTotal = pQueryInfo->numOfTables;
S
slguan 已提交
567

568
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
569 570 571 572
        SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);

        if (pSupporter == NULL) {  // failed to create support struct, abort current query
          tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
573
          pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
S
slguan 已提交
574 575 576 577 578
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          return pSql->res.code;
        }

H
hjxilinx 已提交
579
        int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
S
slguan 已提交
580 581 582 583 584 585 586 587
        if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
          tscDestroyJoinSupporter(pSupporter);
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          break;
        }
      }

S
slguan 已提交
588 589
      tsem_post(&pSql->emptyRspSem);
      tsem_wait(&pSql->rspSem);
S
slguan 已提交
590

S
slguan 已提交
591
      tsem_post(&pSql->emptyRspSem);
S
slguan 已提交
592 593 594 595 596 597 598 599 600 601

      if (pSql->numOfSubs <= 0) {
        pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      } else {
        pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
      }

      return TSDB_CODE_SUCCESS;
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
602
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
603 604 605 606
        return doProcessSql(pSql);
      }
    }
  }
607
  
608
  if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
609 610
    /*
     * (ref. line: 964)
H
hjxilinx 已提交
611
     * Before this function returns from tscLaunchSTableSubqueries and continues, pSql may have been released at user
H
hzcheng 已提交
612 613 614 615 616
     * program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack.
     *
     * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
     * which causes deadlock. So we keep it as local variable.
     */
H
hjxilinx 已提交
617
    if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
618 619
      return pRes->code;
    }
620 621
    
    return pSql->res.code;
H
hjxilinx 已提交
622
  } else if (pSql->fp == (void(*)())launchMultivnodeInsert) {  // multi-vnodes insertion
623
    launchMultivnodeInsert(pSql);
H
hzcheng 已提交
624 625
    return pSql->res.code;
  }
626
  
S
slguan 已提交
627 628
  return doProcessSql(pSql);
}
H
hzcheng 已提交
629

630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
  assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);
  
  for(int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);
    
    SRetrieveSupport* pSupport = pSub->param;
  
    tfree(pSupport->localBuffer);
  
    pthread_mutex_unlock(&pSupport->queryMutex);
    pthread_mutex_destroy(&pSupport->queryMutex);
  
    tfree(pSupport);
  
    tscFreeSqlObj(pSub);
S
slguan 已提交
647
  }
648 649
  
  free(pState);
H
hzcheng 已提交
650 651
}

H
hjxilinx 已提交
652
int tscLaunchSTableSubqueries(SSqlObj *pSql) {
H
hzcheng 已提交
653
  SSqlRes *pRes = &pSql->res;
654
  SSqlCmd *pCmd = &pSql->cmd;
655

S
slguan 已提交
656
  // pRes->code check only serves in launching metric sub-queries
H
hzcheng 已提交
657
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
658 659
    pCmd->command = TSDB_SQL_RETRIEVE_METRIC;  // enable the abort of kill metric function.
    return pRes->code;
H
hzcheng 已提交
660 661 662 663
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc = NULL;
H
hjxilinx 已提交
664
  SColumnModel *    pModel = NULL;
H
hzcheng 已提交
665 666

  pRes->qhandle = 1;  // hack the qhandle check
667 668 669

  const uint32_t nBufferSize = (1 << 16);  // 64KB

670
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
671 672
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
  int32_t         numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
673
  assert(numOfSubQueries > 0);
H
hzcheng 已提交
674 675 676 677 678 679 680 681 682 683

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }

684 685
  pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
  pSql->numOfSubs = numOfSubQueries;
686

687
  tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
S
slguan 已提交
688
  SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
689
  pState->numOfTotal = numOfSubQueries;
H
hzcheng 已提交
690 691
  pRes->code = TSDB_CODE_SUCCESS;

692 693 694 695 696
  int32_t i = 0;
  for (; i < numOfSubQueries; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
H
hzcheng 已提交
697 698
      break;
    }
699
    
H
hzcheng 已提交
700 701
    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;
S
slguan 已提交
702
    trs->pState = pState;
703
    
H
hzcheng 已提交
704
    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
705 706 707 708 709 710
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }
    
H
hjxilinx 已提交
711
    trs->subqueryIndex = i;
H
hzcheng 已提交
712 713 714 715 716 717 718 719
    trs->pParentSqlObj = pSql;
    trs->pFinalColModel = pModel;

    pthread_mutexattr_t mutexattr = {0};
    pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(&trs->queryMutex, &mutexattr);
    pthread_mutexattr_destroy(&mutexattr);

720
    SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
S
slguan 已提交
721
    if (pNew == NULL) {
722 723 724
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
S
slguan 已提交
725 726 727 728
      break;
    }

    // todo handle multi-vnode situation
729
    if (pQueryInfo->tsBuf) {
730
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
731
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
S
slguan 已提交
732
    }
733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
    
    tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }
  
  if (i < numOfSubQueries) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
  
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;   // free all allocated resource
  }
  
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;
  }
  
  for(int32_t j = 0; j < numOfSubQueries; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;
    
    tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
H
hzcheng 已提交
758 759 760 761 762 763 764 765 766 767 768
  }

  return TSDB_CODE_SUCCESS;
}

static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
  tscTrace("%p start to free subquery result", pSql);

  if (pSql->res.code == TSDB_CODE_SUCCESS) {
    taos_free_result(pSql);
  }
S
slguan 已提交
769

H
hzcheng 已提交
770 771 772 773 774 775 776 777
  tfree(trsupport->localBuffer);

  pthread_mutex_unlock(&trsupport->queryMutex);
  pthread_mutex_destroy(&trsupport->queryMutex);

  tfree(trsupport);
}

S
slguan 已提交
778 779
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);

H
hzcheng 已提交
780
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
S
slguan 已提交
781 782 783 784 785 786 787 788 789
// set no disk space error info
#ifdef WINDOWS
  LPVOID lpMsgBuf;
  FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
                GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),  // Default language
                (LPTSTR)&lpMsgBuf, 0, NULL);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
  LocalFree(lpMsgBuf);
#else
H
hzcheng 已提交
790 791 792
  char buf[256] = {0};
  strerror_r(errno, buf, 256);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf);
S
slguan 已提交
793
#endif
H
hzcheng 已提交
794

S
slguan 已提交
795
  trsupport->pState->code = -errCode;
H
hzcheng 已提交
796 797 798 799
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

  pthread_mutex_unlock(&trsupport->queryMutex);

S
slguan 已提交
800
  tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
H
hzcheng 已提交
801 802 803 804
}

static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  SSqlObj *pPObj = trsupport->pParentSqlObj;
H
hjxilinx 已提交
805
  int32_t  subqueryIndex = trsupport->subqueryIndex;
H
hzcheng 已提交
806 807

  assert(pSql != NULL);
808 809 810
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
         pPObj->numOfSubs == pState->numOfTotal);
H
hzcheng 已提交
811 812

  /* retrieved in subquery failed. OR query cancelled in retrieve phase. */
813 814
  if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
    pState->code = -(int)pPObj->res.code;
H
hzcheng 已提交
815 816 817 818 819 820 821 822

    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
823
             subqueryIndex, pState->code);
H
hzcheng 已提交
824 825
  }

S
slguan 已提交
826
  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
H
hjxilinx 已提交
827 828
    tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
829
        subqueryIndex, pState->code);
H
hzcheng 已提交
830
  } else {
831
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
832
      /*
S
slguan 已提交
833 834
       * current query failed, and the retry count is less than the available
       * count, retry query clear previous retrieved data, then launch a new sub query
H
hzcheng 已提交
835
       */
H
hjxilinx 已提交
836
      tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
H
hzcheng 已提交
837 838 839 840 841

      // clear local saved number of results
      trsupport->localBuffer->numOfElems = 0;
      pthread_mutex_unlock(&trsupport->queryMutex);

S
slguan 已提交
842
      tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
H
hjxilinx 已提交
843
               subqueryIndex, trsupport->numOfRetry);
S
slguan 已提交
844

845
      SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
S
slguan 已提交
846 847 848 849
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
                 trsupport->pParentSqlObj, pSql);

850
        pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
851 852 853
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
        return;
      }
H
hzcheng 已提交
854 855 856

      tscProcessSql(pNew);
      return;
S
slguan 已提交
857
    } else {  // reach the maximum retry count, abort
858
      atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
S
slguan 已提交
859
      tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
860
               numOfRows, subqueryIndex, pState->code);
H
hzcheng 已提交
861 862 863
    }
  }

H
Hongze Cheng 已提交
864 865
  int32_t numOfTotal = pState->numOfTotal;

866
  int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
H
Hongze Cheng 已提交
867
  if (finished < numOfTotal) {
868
    tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
H
hzcheng 已提交
869 870 871 872
    return tscFreeSubSqlObj(trsupport, pSql);
  }

  // all subqueries are failed
H
hjxilinx 已提交
873
  tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
874
  pPObj->res.code = -(pState->code);
H
hzcheng 已提交
875 876 877

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
878
                            pState->numOfTotal);
H
hzcheng 已提交
879

S
slguan 已提交
880
  tfree(trsupport->pState);
H
hzcheng 已提交
881 882
  tscFreeSubSqlObj(trsupport, pSql);

S
slguan 已提交
883
  // sync query, wait for the master SSqlObj to proceed
H
hzcheng 已提交
884 885
  if (pPObj->fp == NULL) {
    // sync query, wait for the master SSqlObj to proceed
S
slguan 已提交
886 887
    tsem_wait(&pPObj->emptyRspSem);
    tsem_wait(&pPObj->emptyRspSem);
H
hzcheng 已提交
888

S
slguan 已提交
889
    tsem_post(&pPObj->rspSem);
H
hzcheng 已提交
890 891 892

    pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
  } else {
S
slguan 已提交
893
    // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
894 895
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);

896
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
897 898 899 900 901
      (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
    } else {  // regular super table query
      if (pPObj->res.code != TSDB_CODE_SUCCESS) {
        tscQueueAsyncRes(pPObj);
      }
H
hzcheng 已提交
902 903 904 905 906 907
    }
  }
}

void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
H
hjxilinx 已提交
908
  int32_t           idx = trsupport->subqueryIndex;
H
hzcheng 已提交
909 910 911 912
  SSqlObj *         pPObj = trsupport->pParentSqlObj;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;

  SSqlObj *pSql = (SSqlObj *)tres;
913
  if (pSql == NULL) {  // sql object has been released in error process, return immediately
H
hzcheng 已提交
914 915 916 917
    tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
    return;
  }

918 919 920 921
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
      pPObj->numOfSubs == pState->numOfTotal);
  
H
hzcheng 已提交
922 923 924
  // query process and cancel query process may execute at the same time
  pthread_mutex_lock(&trsupport->queryMutex);

925
  if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
926 927 928
    return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
  }

929 930 931
  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
932
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
933

H
hjxilinx 已提交
934
  SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
935
  SVnodeDesc *   pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
H
hzcheng 已提交
936 937 938

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
939
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
H
hzcheng 已提交
940

S
slguan 已提交
941
    tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
942
             pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
943
    
H
hjxilinx 已提交
944
    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
945 946 947 948 949 950
      tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
          pPObj, pSql, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
      return;
    }
    
H
hzcheng 已提交
951 952 953 954

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %d rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};
955 956

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hjxilinx 已提交
957
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
H
hzcheng 已提交
958
#endif
S
slguan 已提交
959
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
S
slguan 已提交
960 961
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
S
slguan 已提交
962 963 964
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }
965
    
S
slguan 已提交
966
    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
967
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
H
hzcheng 已提交
968 969 970 971 972 973 974 975
    if (ret < 0) {
      // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    } else {
      pthread_mutex_unlock(&trsupport->queryMutex);
      taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
    }

S
slguan 已提交
976 977
  } else {  // all data has been retrieved to client
    /* data in from current vnode is stored in cache and disk */
H
hjxilinx 已提交
978
    uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
S
slguan 已提交
979 980
    tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
             pSvd->vnode, numOfRowsFromVnode, idx);
H
hzcheng 已提交
981

H
hjxilinx 已提交
982
    tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
H
hzcheng 已提交
983 984

#ifdef _DEBUG_VIEW
L
lihui 已提交
985
    printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
H
hzcheng 已提交
986
    SSrcColumnInfo colInfo[256] = {0};
987
    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hjxilinx 已提交
988
    tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
H
hzcheng 已提交
989 990
                       trsupport->localBuffer->numOfElems, colInfo);
#endif
991
    
S
slguan 已提交
992
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
S
slguan 已提交
993 994
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
S
slguan 已提交
995 996 997
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }
H
hzcheng 已提交
998 999 1000

    // each result for a vnode is ordered as an independant list,
    // then used as an input of loser tree for disk-based merge routine
1001 1002
    int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
                                    pQueryInfo->groupbyExpr.orderType);
H
hzcheng 已提交
1003 1004 1005 1006
    if (ret != 0) {
      /* set no disk space error info, and abort retry */
      return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    }
1007
  
H
Hongze Cheng 已提交
1008 1009
    // keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
    // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
H
hjxilinx 已提交
1010
    // In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
H
Hongze Cheng 已提交
1011 1012
    int32_t numOfTotal = pState->numOfTotal;

1013
    int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
H
Hongze Cheng 已提交
1014
    if (finished < numOfTotal) {
1015
      tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
H
hzcheng 已提交
1016 1017 1018 1019
      return tscFreeSubSqlObj(trsupport, pSql);
    }

    // all sub-queries are returned, start to local merge process
H
hjxilinx 已提交
1020
    pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
H
hzcheng 已提交
1021

S
slguan 已提交
1022
    tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
H
Hongze Cheng 已提交
1023
             pState->numOfTotal, pState->numOfRetrievedRows);
1024
    
1025
    SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
1026
    tscClearInterpInfo(pPQueryInfo);
1027

1028
    tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
H
hzcheng 已提交
1029 1030 1031 1032 1033 1034 1035 1036
                          &pPObj->cmd, &pPObj->res);
    tscTrace("%p build loser tree completed", pPObj);

    pPObj->res.precision = pSql->res.precision;
    pPObj->res.numOfRows = 0;
    pPObj->res.row = 0;

    // only free once
1037 1038
    tfree(trsupport->pState);
    
H
hzcheng 已提交
1039 1040 1041
    tscFreeSubSqlObj(trsupport, pSql);

    if (pPObj->fp == NULL) {
S
slguan 已提交
1042 1043
      tsem_wait(&pPObj->emptyRspSem);
      tsem_wait(&pPObj->emptyRspSem);
H
hzcheng 已提交
1044

S
slguan 已提交
1045
      tsem_post(&pPObj->rspSem);
H
hzcheng 已提交
1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058
    } else {
      // set the command flag must be after the semaphore been correctly set.
      pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
      if (pPObj->res.code == TSDB_CODE_SUCCESS) {
        (*pPObj->fp)(pPObj->param, pPObj, 0);
      } else {
        tscQueueAsyncRes(pPObj);
      }
    }
  }
}

void tscKillMetricQuery(SSqlObj *pSql) {
1059 1060 1061 1062
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
1063 1064 1065 1066 1067 1068
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

S
slguan 已提交
1069
    if (pSub == NULL) {
H
hzcheng 已提交
1070 1071
      continue;
    }
S
slguan 已提交
1072

H
hzcheng 已提交
1073 1074 1075 1076 1077
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
S
slguan 已提交
1078
    //taosStopRpcConn(pSql->pSubs[i]->thandle);
H
hzcheng 已提交
1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

  while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

S
slguan 已提交
1100
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
H
hzcheng 已提交
1101

S
slguan 已提交
1102
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
1103 1104 1105
  const int32_t table_index = 0;
  
  SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
S
slguan 已提交
1106
  if (pNew != NULL) {  // the sub query of two-stage super table query
1107
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
1108
    pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
1109 1110
    
    assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
1111 1112

    // launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
H
hjxilinx 已提交
1113 1114
    STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, table_index);
    pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex;
1115

H
hjxilinx 已提交
1116
    pSql->pSubs[trsupport->subqueryIndex] = pNew;
1117
  }
H
hzcheng 已提交
1118 1119 1120 1121

  return pNew;
}

S
slguan 已提交
1122
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
H
hzcheng 已提交
1123
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
H
hjxilinx 已提交
1124 1125 1126 1127
  
  SSqlObj*  pParentSql = trsupport->pParentSqlObj;
  SSqlObj*  pSql = (SSqlObj *)tres;
  
H
hjxilinx 已提交
1128
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1129 1130
  assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
  
H
hjxilinx 已提交
1131
  int32_t idx = pTableMetaInfo->vnodeIndex;
H
hzcheng 已提交
1132 1133

  SVnodeSidList *vnodeInfo = NULL;
1134
  SVnodeDesc *   pSvd = NULL;
H
hjxilinx 已提交
1135 1136
  if (pTableMetaInfo->pMetricMeta != NULL) {
    vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
S
slguan 已提交
1137
    pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
H
hzcheng 已提交
1138 1139
  }

1140 1141
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
H
hjxilinx 已提交
1142
         pParentSql->numOfSubs == pState->numOfTotal);
1143
  
H
hjxilinx 已提交
1144
  if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
1145
    // metric query is killed, Note: code must be less than 0
H
hzcheng 已提交
1146
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
hjxilinx 已提交
1147 1148
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      code = -(int)(pParentSql->res.code);
H
hzcheng 已提交
1149
    } else {
1150
      code = pState->code;
H
hzcheng 已提交
1151
    }
H
hjxilinx 已提交
1152
    tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
H
hjxilinx 已提交
1153
             trsupport->subqueryIndex, code);
H
hzcheng 已提交
1154 1155 1156
  }

  /*
S
slguan 已提交
1157
   * if a query on a vnode is failed, all retrieve operations from vnode that occurs later
H
hzcheng 已提交
1158 1159
   * than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
   * function to abort current and remain retrieve process.
S
slguan 已提交
1160 1161
   *
   * NOTE: threadsafe is required.
H
hzcheng 已提交
1162
   */
S
slguan 已提交
1163
  if (code != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1164
    if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
H
hjxilinx 已提交
1165
      tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
1166
      atomic_val_compare_exchange_32(&pState->code, 0, code);
H
hzcheng 已提交
1167
    } else {  // does not reach the maximum retry count, go on
H
hjxilinx 已提交
1168
      tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry);
S
slguan 已提交
1169

H
hjxilinx 已提交
1170
      SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
S
slguan 已提交
1171 1172
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
L
lihui 已提交
1173
                 trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
H
hzcheng 已提交
1174

1175
        pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1176 1177
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
      } else {
1178
        SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
H
hjxilinx 已提交
1179
        assert(pNewQueryInfo->pMeterInfo[0]->pTableMeta != NULL && pNewQueryInfo->pMeterInfo[0]->pMetricMeta != NULL);
S
slguan 已提交
1180 1181 1182
        tscProcessSql(pNew);
        return;
      }
H
hzcheng 已提交
1183 1184 1185
    }
  }

1186
  if (pState->code != TSDB_CODE_SUCCESS) {  // failed, abort
H
hzcheng 已提交
1187
    if (vnodeInfo != NULL) {
H
hjxilinx 已提交
1188
      tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
H
hzcheng 已提交
1189
               vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
1190
               trsupport->subqueryIndex, pState->code);
H
hzcheng 已提交
1191
    } else {
H
hjxilinx 已提交
1192
      tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
1193
               trsupport->subqueryIndex, pState->code);
H
hzcheng 已提交
1194 1195
    }

1196
    tscRetrieveFromVnodeCallBack(param, tres, pState->code);
H
hzcheng 已提交
1197
  } else {  // success, proceed to retrieve data from dnode
L
lihui 已提交
1198 1199
    if (vnodeInfo != NULL) {
      tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
H
hzcheng 已提交
1200
             vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
H
hjxilinx 已提交
1201
             trsupport->subqueryIndex);
L
lihui 已提交
1202 1203 1204 1205
    } else {
      tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
             trsupport->subqueryIndex);
    }
H
hzcheng 已提交
1206 1207 1208 1209 1210

    taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
  }
}

1211
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1212 1213 1214 1215 1216
  char *pMsg, *pStart;

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

1217
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
S
slguan 已提交
1218
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
1219 1220
  pMsg += sizeof(pSql->res.qhandle);

1221
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
1222
  pRetrieveMsg->free = htons(pQueryInfo->type);
1223
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
1224

1225
  pSql->cmd.payloadLen = pMsg - pStart;
S
slguan 已提交
1226
  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
H
hzcheng 已提交
1227

1228
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1229 1230
}

S
slguan 已提交
1231
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
1232 1233
  //SShellSubmitMsg *pShellMsg;
  //char *           pMsg;
H
hjxilinx 已提交
1234
  //STableMetaInfo * pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
S
slguan 已提交
1235

H
hjxilinx 已提交
1236
  //STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
H
hzcheng 已提交
1237

1238
  //pMsg = buf + tsRpcHeadSize;
H
hzcheng 已提交
1239

S
slguan 已提交
1240 1241
  //TODO set iplist
  //pShellMsg = (SShellSubmitMsg *)pMsg;
H
hjxilinx 已提交
1242 1243
  //pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
  //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip),
S
slguan 已提交
1244
  //         htons(pShellMsg->vnode));
H
hzcheng 已提交
1245 1246
}

1247
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1248 1249
  SShellSubmitMsg *pShellMsg;
  char *           pMsg, *pStart;
S
slguan 已提交
1250

H
hjxilinx 已提交
1251 1252
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  
H
hzcheng 已提交
1253 1254 1255 1256
  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

  pShellMsg = (SShellSubmitMsg *)pMsg;
1257 1258

  pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
H
hjxilinx 已提交
1259
  pShellMsg->vnode = 0; //htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode);
1260
  pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit);  // number of meters to be inserted
H
hzcheng 已提交
1261

S
slguan 已提交
1262
  // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
S
slguan 已提交
1263
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
H
hjxilinx 已提交
1264 1265
//  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
//           htons(pShellMsg->vnode));
S
slguan 已提交
1266

S
slguan 已提交
1267 1268
  pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);

1269
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1270 1271
}

S
slguan 已提交
1272
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
S
slguan 已提交
1273 1274
  //TODO
//  SSqlCmd *       pCmd = &pSql->cmd;
H
hjxilinx 已提交
1275
//  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
1276 1277
//
//  char *          pStart = buf + tsRpcHeadSize;
S
slguan 已提交
1278
//  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
S
slguan 已提交
1279
//
H
hjxilinx 已提交
1280 1281 1282
//  if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {  // pColumnModel == NULL, query on meter
//    STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
//    pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
S
slguan 已提交
1283
//  } else {  // query on metric
H
hjxilinx 已提交
1284 1285
//    SSuperTableMeta *  pMetricMeta = pTableMetaInfo->pMetricMeta;
//    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
S
slguan 已提交
1286 1287
//    pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode);
//  }
H
hzcheng 已提交
1288 1289 1290 1291 1292 1293
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
1294
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
1295
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
1296
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
1297

1298
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
1299

H
hjxilinx 已提交
1300
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
H
hjxilinx 已提交
1301
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
1302 1303

  // meter query without tags values
H
hjxilinx 已提交
1304
  if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
S
slguan 已提交
1305
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
1306 1307
  }

H
hjxilinx 已提交
1308 1309
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
1310

S
slguan 已提交
1311
  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableSidExtInfo)) * pVnodeSidList->numOfSids;
H
hjxilinx 已提交
1312
  int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
1313

S
slguan 已提交
1314
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
1315 1316
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
1317 1318 1319
  }

  return size;
H
hzcheng 已提交
1320 1321
}

S
slguan 已提交
1322
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) {
H
hjxilinx 已提交
1323
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
1324

H
hjxilinx 已提交
1325 1326
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
1327

S
slguan 已提交
1328
  tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables);
H
hjxilinx 已提交
1329
  if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {
1330
#ifdef _DEBUG_VIEW
H
hjxilinx 已提交
1331
    tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
1332
#endif
S
slguan 已提交
1333
    STableSidExtInfo *pMeterInfo = (STableSidExtInfo *)pMsg;
H
hjxilinx 已提交
1334 1335 1336
    pMeterInfo->sid = htonl(pTableMeta->sid);
    pMeterInfo->uid = htobe64(pTableMeta->uid);
    pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
S
slguan 已提交
1337
    pMsg += sizeof(STableSidExtInfo);
1338
  } else {
H
hjxilinx 已提交
1339
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
1340

S
slguan 已提交
1341 1342 1343
    for (int32_t i = 0; i < numOfTables; ++i) {
      STableSidExtInfo *pMeterInfo = (STableSidExtInfo *)pMsg;
      STableSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
1344

1345 1346
      pMeterInfo->sid = htonl(pQueryMeterInfo->sid);
      pMeterInfo->uid = htobe64(pQueryMeterInfo->uid);
1347
      pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
1348
      
S
slguan 已提交
1349
      pMsg += sizeof(STableSidExtInfo);
1350

1351 1352 1353 1354
      memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
      pMsg += pMetricMeta->tagLen;

#ifdef _DEBUG_VIEW
L
lihui 已提交
1355
      tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
1356 1357 1358
#endif
    }
  }
1359

1360 1361 1362
  return pMsg;
}

1363
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1364 1365
  SSqlCmd *pCmd = &pSql->cmd;

1366
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1367

S
slguan 已提交
1368 1369 1370 1371 1372
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

1373
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
1374
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
1375
  
S
slguan 已提交
1376
  char *          pStart = pCmd->payload + tsRpcHeadSize;
H
hzcheng 已提交
1377

H
hjxilinx 已提交
1378 1379
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
H
hzcheng 已提交
1380

S
slguan 已提交
1381
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
H
hzcheng 已提交
1382 1383

  int32_t msgLen = 0;
S
slguan 已提交
1384
  int32_t numOfTables = 0;
H
hzcheng 已提交
1385

H
hjxilinx 已提交
1386
  if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {
S
slguan 已提交
1387
    numOfTables = 1;
H
hzcheng 已提交
1388

H
hjxilinx 已提交
1389 1390
//    tscTrace("%p query on vnode: %d, number of sid:%d, meter id: %s", pSql,
//             pTableMeta->vpeerDesc[pTableMeta->index].vnode, 1, pTableMetaInfo->name);
H
hzcheng 已提交
1391

H
hjxilinx 已提交
1392 1393
//    pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode);
    pQueryMsg->uid = pTableMeta->uid;
H
hzcheng 已提交
1394
    pQueryMsg->numOfTagsCols = 0;
1395
  } else {  // query on super table
H
hjxilinx 已提交
1396 1397
    if (pTableMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
1398 1399 1400
      return -1;
    }

H
hjxilinx 已提交
1401
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
1402 1403
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

S
slguan 已提交
1404 1405 1406
    numOfTables = pVnodeSidList->numOfSids;
    if (numOfTables <= 0) {
      tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables);
H
hzcheng 已提交
1407 1408 1409
      return -1;  // error
    }

S
slguan 已提交
1410
    tscTrace("%p query on vid:%d, number of sid:%d", pSql, vnodeId, numOfTables);
H
hzcheng 已提交
1411 1412 1413
    pQueryMsg->vnode = htons(vnodeId);
  }

S
slguan 已提交
1414
  pQueryMsg->numOfSids = htonl(numOfTables);
H
hjxilinx 已提交
1415
  pQueryMsg->numOfTagsCols = htons(pTableMetaInfo->numOfTags);
H
hzcheng 已提交
1416

1417 1418 1419
  if (pQueryInfo->order.order == TSQL_SO_ASC) {
    pQueryMsg->skey = htobe64(pQueryInfo->stime);
    pQueryMsg->ekey = htobe64(pQueryInfo->etime);
H
hzcheng 已提交
1420
  } else {
1421 1422
    pQueryMsg->skey = htobe64(pQueryInfo->etime);
    pQueryMsg->ekey = htobe64(pQueryInfo->stime);
H
hzcheng 已提交
1423 1424
  }

1425 1426
  pQueryMsg->order = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
H
hzcheng 已提交
1427

1428
  pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
H
hzcheng 已提交
1429

1430 1431
  pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
H
hzcheng 已提交
1432

1433
  pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
H
hzcheng 已提交
1434

1435
  if (pQueryInfo->colList.numOfCols <= 0) {
H
hjxilinx 已提交
1436
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
H
hzcheng 已提交
1437 1438 1439
    return -1;
  }

1440
  pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
1441
  pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
H
hjxilinx 已提交
1442
  pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
1443
  
1444 1445
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
H
hzcheng 已提交
1446 1447 1448
    return -1;
  }

1449 1450
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
1451 1452 1453
    return -1;
  }

1454
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
1455

H
hjxilinx 已提交
1456
  if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {  // query on meter
H
hzcheng 已提交
1457 1458 1459 1460 1461
    pQueryMsg->tagLength = 0;
  } else {  // query on metric
    pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
  }

1462 1463
  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
H
hzcheng 已提交
1464

1465
  if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
1466 1467
    tscError("%p illegal value of number of output columns in query msg: %d", pSql,
             pQueryInfo->fieldsInfo.numOfOutputCols);
H
hzcheng 已提交
1468 1469 1470 1471
    return -1;
  }

  // set column list ids
1472
  char *   pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
1473
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
1474

1475 1476
  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
S
slguan 已提交
1477
    SSchema *    pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
1478

H
hjxilinx 已提交
1479
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
H
hzcheng 已提交
1480
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
S
slguan 已提交
1481
      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
H
hjxilinx 已提交
1482
               htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
H
hzcheng 已提交
1483 1484
               pColSchema->name);

S
slguan 已提交
1485
      return -1;  // 0 means build msg failed
H
hzcheng 已提交
1486 1487 1488 1489 1490
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type = htons(pColSchema->type);
S
slguan 已提交
1491
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
1492

S
slguan 已提交
1493 1494 1495
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
1496

S
slguan 已提交
1497 1498 1499 1500
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
1501

S
slguan 已提交
1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512
      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
1513

S
slguan 已提交
1514 1515 1516 1517 1518
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
1519 1520 1521 1522
  }

  bool hasArithmeticFunction = false;

S
slguan 已提交
1523
  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
H
hzcheng 已提交
1524

H
hjxilinx 已提交
1525
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
1526
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
1527

S
slguan 已提交
1528
    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
H
hzcheng 已提交
1529 1530 1531
      hasArithmeticFunction = true;
    }

H
hjxilinx 已提交
1532
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
H
hzcheng 已提交
1533 1534 1535 1536 1537 1538 1539
      /* column id is not valid according to the cached metermeta, the meter meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
S
slguan 已提交
1540
    pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
1541

S
slguan 已提交
1542
    pSqlFuncExpr->functionId = htons(pExpr->functionId);
H
hzcheng 已提交
1543 1544 1545 1546 1547 1548 1549 1550 1551
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
S
slguan 已提交
1552 1553 1554

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
H
hzcheng 已提交
1555 1556 1557 1558 1559 1560 1561 1562 1563 1564
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
1565 1566
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
S
slguan 已提交
1567
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
H
hzcheng 已提交
1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

1579
  // serialize the table info (sid, uid, tags)
S
slguan 已提交
1580
  pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vnode), pMsg);
H
hzcheng 已提交
1581

S
slguan 已提交
1582
  // only include the required tag column schema. If a tag is not required, it won't be sent to vnode
H
hjxilinx 已提交
1583
  if (pTableMetaInfo->numOfTags > 0) {
S
slguan 已提交
1584
    // always transfer tag schema to vnode if exists
H
hjxilinx 已提交
1585
    SSchema *pTagSchema = tscGetTableTagSchema(pTableMeta);
H
hzcheng 已提交
1586

H
hjxilinx 已提交
1587 1588
    for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
      if (pTableMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
S
slguan 已提交
1589
        SSchema tbSchema = {
S
slguan 已提交
1590
            .bytes = TSDB_TABLE_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
S
slguan 已提交
1591
        memcpy(pMsg, &tbSchema, sizeof(SSchema));
H
hzcheng 已提交
1592
      } else {
H
hjxilinx 已提交
1593
        memcpy(pMsg, &pTagSchema[pTableMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
H
hzcheng 已提交
1594 1595
      }

S
slguan 已提交
1596
      pMsg += sizeof(SSchema);
H
hzcheng 已提交
1597 1598 1599
    }
  }

1600
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
S
slguan 已提交
1601 1602
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
1603 1604
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) += pCol->colIdx;
      pMsg += sizeof(pCol->colIdx);

      *((int16_t *)pMsg) += pCol->colIdxInBuf;
      pMsg += sizeof(pCol->colIdxInBuf);

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
1619 1620 1621
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
1622 1623 1624
    }
  }

1625 1626 1627 1628
  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
S
slguan 已提交
1629 1630 1631 1632 1633 1634 1635 1636
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

1637
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
1638
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vnodeIndex);
1639
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
1640 1641

    // todo refactor
1642 1643
    fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET);
    fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f);
S
slguan 已提交
1644 1645 1646 1647

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
1648 1649
  }

S
slguan 已提交
1650 1651
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
1652 1653
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
1654 1655 1656 1657 1658 1659
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1660
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hzcheng 已提交
1661 1662

  assert(msgLen + minMsgSize() <= size);
1663 1664

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1665 1666
}

1667 1668
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1669
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
1670
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
1671

S
slguan 已提交
1672 1673 1674 1675 1676
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1677
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
1678

1679
  assert(pCmd->numOfClause == 1);
H
hjxilinx 已提交
1680 1681
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strncpy(pCreateDbMsg->db, pTableMetaInfo->name, tListLen(pCreateDbMsg->db));
H
hzcheng 已提交
1682

1683
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1684 1685
}

1686 1687
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1688
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
1689 1690 1691 1692
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1693

1694
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
1695
  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
S
slguan 已提交
1696
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
1697

1698
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1699 1700
}

1701 1702
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1703
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
1704 1705 1706 1707
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1708

1709
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
1710

1711 1712
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
1713

1714 1715
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
1716

1717
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
1718

1719 1720 1721 1722 1723 1724 1725 1726
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
1727

1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
1741

S
slguan 已提交
1742
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
1743
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1744 1745
}

1746 1747
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1748
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
1749

S
slguan 已提交
1750 1751 1752 1753 1754
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1755
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
1756

1757 1758 1759
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
  pAlterMsg->flag = pUser->type;
H
hzcheng 已提交
1760

1761 1762 1763 1764
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1765 1766
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1767
  }
H
hzcheng 已提交
1768

1769
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1770
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1771
  } else {
S
slguan 已提交
1772
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1773
  }
H
hzcheng 已提交
1774

1775
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1776 1777
}

1778 1779
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1780
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
H
hzcheng 已提交
1781

S
slguan 已提交
1782 1783 1784 1785
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1786

S
slguan 已提交
1787
  pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
1788 1789
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1790

1791 1792
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1793
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1794

S
slguan 已提交
1795 1796 1797 1798 1799
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1800
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1801

H
hjxilinx 已提交
1802 1803
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strncpy(pDropDbMsg->db, pTableMetaInfo->name, tListLen(pDropDbMsg->db));
1804
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1805

S
slguan 已提交
1806
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1807
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1808 1809
}

1810 1811
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1812
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1813

S
slguan 已提交
1814 1815 1816 1817
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1818

1819
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
H
hjxilinx 已提交
1820 1821
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1822
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1823

S
slguan 已提交
1824
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1825
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1826 1827
}

1828
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1829
  SSqlCmd *pCmd = &pSql->cmd;
1830
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1831 1832 1833 1834
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1835

1836
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
H
hjxilinx 已提交
1837 1838
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strcpy(pDrop->ip, pTableMetaInfo->name);
S
slguan 已提交
1839
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1840

1841
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1842 1843
}

1844 1845
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1846
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1847
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1848

S
slguan 已提交
1849 1850 1851 1852
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1853

1854
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
H
hjxilinx 已提交
1855 1856
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strcpy(pDropMsg->user, pTableMetaInfo->name);
H
hzcheng 已提交
1857

1858
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1859 1860
}

1861 1862
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1863
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1864

S
slguan 已提交
1865 1866 1867 1868
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
1869

1870
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
H
hjxilinx 已提交
1871 1872
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1873
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1874

1875
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1876 1877
}

1878
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1879
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1880
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1881
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1882
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1883

S
slguan 已提交
1884 1885 1886
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1887
  }
H
hzcheng 已提交
1888

1889
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1890

H
hjxilinx 已提交
1891 1892
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1893
  if (nameLen > 0) {
H
hjxilinx 已提交
1894
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
H
hzcheng 已提交
1895
  } else {
S
slguan 已提交
1896
    strcpy(pShowMsg->db, pObj->db);
H
hzcheng 已提交
1897 1898
  }

1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);
H
hzcheng 已提交
1911

1912 1913 1914 1915
    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
  }

1916
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1917
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1918 1919
}

1920
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1921
  SSqlCmd *pCmd = &pSql->cmd;
1922
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1923

S
slguan 已提交
1924 1925 1926 1927
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1928

1929
  SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
1930 1931 1932
  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1933
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1934 1935
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1936
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1937 1938
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1939
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1940 1941 1942
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1943 1944
}

1945
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1946 1947
  SSqlCmd *pCmd = &(pSql->cmd);

1948
  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1949

1950
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1951
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1952 1953
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1954
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1955
  }
1956

1957 1958 1959
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1960 1961 1962 1963

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1964
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1965
  int              msgLen = 0;
S
slguan 已提交
1966
  SSchema *        pSchema;
H
hzcheng 已提交
1967
  int              size = 0;
1968 1969 1970
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1971
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
1972 1973

  // Reallocate the payload size
1974
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1975 1976
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1977
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1978
  }
H
hzcheng 已提交
1979 1980


1981
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1982
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1983 1984

  // use dbinfo from table id without modifying current db info
H
hjxilinx 已提交
1985
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1986

1987 1988 1989 1990
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;

H
hzcheng 已提交
1991 1992 1993 1994
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1995
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1996

1997 1998 1999
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
H
hzcheng 已提交
2000
    pMsg += sizeof(STagData);
2001
  } else {  // create (super) table
2002
    pSchema = (SSchema *)pCreateTableMsg->schema;
2003

H
hzcheng 已提交
2004
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
2005
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
2006 2007 2008 2009

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
2010

H
hzcheng 已提交
2011 2012 2013 2014
      pSchema++;
    }

    pMsg = (char *)pSchema;
2015 2016
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
2017

2018 2019 2020
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
2021 2022 2023
    }
  }

2024
  tscClearFieldInfo(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
2025

S
slguan 已提交
2026
  msgLen = pMsg - (char*)pCreateTableMsg;
S
slguan 已提交
2027
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
2028
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2029
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
2030 2031

  assert(msgLen + minMsgSize() <= size);
2032
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2033 2034 2035
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
2036
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2037
  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
2038 2039 2040
         TSDB_EXTRA_PAYLOAD_SIZE;
}

2041
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
2042
  SCMAlterTableMsg *pAlterTableMsg;
2043
  char *          pMsg;
H
hzcheng 已提交
2044 2045 2046
  int             msgLen = 0;
  int             size = 0;

2047 2048 2049
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
2050
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
2051 2052

  size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
2053 2054 2055 2056
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2057

2058
  pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
hzcheng 已提交
2059

H
hjxilinx 已提交
2060
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
2061

2062 2063
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

H
hjxilinx 已提交
2064
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
2065
  pAlterTableMsg->type = htons(pAlterInfo->type);
2066

2067
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
2068
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
H
hzcheng 已提交
2069

S
slguan 已提交
2070
  SSchema *pSchema = pAlterTableMsg->schema;
2071 2072
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
2073 2074 2075 2076 2077 2078 2079 2080 2081

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;

S
slguan 已提交
2082
  msgLen = pMsg - (char*)pAlterTableMsg;
H
hzcheng 已提交
2083
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2084
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
2085 2086

  assert(msgLen + minMsgSize() <= size);
2087

2088
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2089 2090
}

2091
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2092
  SSqlCmd *pCmd = &pSql->cmd;
2093
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
2094
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
2095

S
slguan 已提交
2096 2097 2098 2099
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
2100

2101
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
H
hjxilinx 已提交
2102 2103
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strcpy(pAlterDbMsg->db, pTableMetaInfo->name);
H
hzcheng 已提交
2104

2105
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2106 2107
}

2108
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2109 2110 2111
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
2112

S
slguan 已提交
2113 2114 2115
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
2116
  }
S
slguan 已提交
2117

S
slguan 已提交
2118 2119 2120 2121
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
2122

2123
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2124 2125
}

2126
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
2127
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
2128 2129 2130
    return pRes->code;
  }

2131
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
H
hjxilinx 已提交
2132 2133
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
    pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
2148

2149
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2150

H
hzcheng 已提交
2151 2152 2153 2154 2155 2156 2157
  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

2158
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2159
  } else {
S
slguan 已提交
2160
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
2176
  SSqlCmd *       pCmd = &pSql->cmd;
H
hjxilinx 已提交
2177
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
2178

H
hjxilinx 已提交
2179 2180 2181
  STableInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
2182 2183 2184 2185
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
2186 2187
  SSqlCmd *pCmd = &pSql->cmd;

2188
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
2189
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2190 2191

  int32_t numOfRes = 0;
2192
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
H
hjxilinx 已提交
2193
    numOfRes = pTableMetaInfo->pMetricMeta->numOfTables;
H
hzcheng 已提交
2194 2195 2196 2197 2198 2199 2200 2201 2202 2203
  } else {
    numOfRes = 1;  // for count function, there is only one output.
  }
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
2204 2205
  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
2206 2207

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
2208
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2209 2210 2211 2212
  }

  pRes->row = 0;

2213
  uint8_t code = pRes->code;
H
hzcheng 已提交
2214
  if (pSql->fp) {  // async retrieve metric data
2215 2216
    if (pRes->code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
H
hzcheng 已提交
2217 2218 2219 2220 2221 2222 2223 2224
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
2225
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
2226

2227
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2228
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
2229
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
2230
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
2231
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
2232

S
slguan 已提交
2233 2234 2235 2236 2237
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

2238
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
2239 2240 2241 2242 2243

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
  strcpy(pConnect->db, db);
S
slguan 已提交
2244
  strcpy(pConnect->clientVersion, version);
S
slguan 已提交
2245
  strcpy(pConnect->msgVersion, "");
H
hzcheng 已提交
2246

2247
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2248 2249
}

2250
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
2251
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
2252
  char *         pMsg;
H
hzcheng 已提交
2253 2254 2255 2256 2257
  int            msgLen = 0;

  char *tmpData = 0;
  if (pSql->cmd.allocSize > 0) {
    tmpData = calloc(1, pSql->cmd.allocSize);
2258 2259 2260 2261
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

H
hzcheng 已提交
2262 2263 2264 2265
    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize);
  }

2266 2267 2268
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
2269
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2270

2271
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
2272
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
2273
  pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0);
S
slguan 已提交
2274

2275
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
2276

2277
  if (pSql->cmd.createOnDemand) {
H
hzcheng 已提交
2278 2279 2280 2281
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

S
slguan 已提交
2282
  msgLen = pMsg - (char*)pInfoMsg;
H
hzcheng 已提交
2283
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2284
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
2285 2286 2287 2288

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
2289
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2290 2291
}

S
slguan 已提交
2292 2293
/**
 *  multi meter meta req pkg format:
2294
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
2295 2296
 *      no used         4B
 **/
2297
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
2310
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
2311

2312
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
2313
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
2314 2315

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
2316
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
2317 2318 2319 2320
  }

  tfree(tmpData);

2321
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
2322
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
2323 2324 2325

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
2326
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
2327 2328 2329 2330 2331
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hzcheng 已提交
2332 2333
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
S
slguan 已提交
2334
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
2335
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hzcheng 已提交
2336

S
slguan 已提交
2337
  int32_t n = 0;
2338 2339
  for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
    n += strlen(pQueryInfo->tagCond.cond[i].cond);
H
hzcheng 已提交
2340
  }
S
slguan 已提交
2341

H
hjxilinx 已提交
2342
  int32_t tagLen = n * TSDB_NCHAR_SIZE;
2343 2344
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
H
hjxilinx 已提交
2345
  }
2346

S
slguan 已提交
2347
  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
S
slguan 已提交
2348
  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
H
hjxilinx 已提交
2349 2350
  
  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndexEx);
S
slguan 已提交
2351

H
hjxilinx 已提交
2352
  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
S
slguan 已提交
2353 2354

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
H
hzcheng 已提交
2355 2356
}

2357
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2358
  SSuperTableMetaMsg *pMetaMsg;
H
hzcheng 已提交
2359 2360
  char *          pMsg, *pStart;
  int             msgLen = 0;
S
slguan 已提交
2361
  int             tableIndex = 0;
H
hzcheng 已提交
2362

2363 2364 2365
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

2366
  STagCond *pTagCond = &pQueryInfo->tagCond;
S
slguan 已提交
2367

H
hjxilinx 已提交
2368
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2369 2370

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
S
slguan 已提交
2371 2372 2373 2374
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2375 2376 2377 2378 2379

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
H
hjxilinx 已提交
2380
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2381 2382 2383

  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
2384
  pMetaMsg = (SSuperTableMetaMsg *)pMsg;
S
slguan 已提交
2385
  pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
S
slguan 已提交
2386

S
slguan 已提交
2387
  pMsg += sizeof(SSuperTableMetaMsg);
S
slguan 已提交
2388 2389 2390 2391 2392

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
S
slguan 已提交
2393
  pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
H
hzcheng 已提交
2394

S
slguan 已提交
2395
  memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
2396
  pMsg += TSDB_TABLE_ID_LEN;
H
hzcheng 已提交
2397

S
slguan 已提交
2398 2399 2400
  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

S
slguan 已提交
2401
  memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
2402
  pMsg += TSDB_TABLE_ID_LEN;
S
slguan 已提交
2403 2404 2405 2406

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

2407
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2408 2409
    pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, i);
    uint64_t uid = pTableMetaInfo->pTableMeta->uid;
S
slguan 已提交
2410 2411 2412 2413

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

S
slguan 已提交
2414 2415
    SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
    pMsg += sizeof(SSuperTableMetaElemMsg);
S
slguan 已提交
2416 2417 2418 2419 2420

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
      SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
H
hjxilinx 已提交
2421
      if (pCond != NULL && pCond->cond != NULL) {
H
hjxilinx 已提交
2422
        condLen = strlen(pCond->cond) + 1;
2423

H
hjxilinx 已提交
2424
        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
S
slguan 已提交
2425 2426 2427 2428 2429
        if (!ret) {
          tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid));
          return 0;
        }
      }
H
hzcheng 已提交
2430 2431
    }

S
slguan 已提交
2432
    pElem->condLen = htonl(condLen);
H
hzcheng 已提交
2433

S
slguan 已提交
2434 2435 2436
    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;
H
hzcheng 已提交
2437

S
slguan 已提交
2438 2439 2440
    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;
H
hzcheng 已提交
2441

S
slguan 已提交
2442
      pElem->tableCond = htonl(offset);
H
hjxilinx 已提交
2443 2444 2445 2446 2447 2448 2449
      
      uint32_t len = 0;
      if (pTagCond->tbnameCond.cond != NULL) {
        len = strlen(pTagCond->tbnameCond.cond);
        memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      }
      
H
hjxilinx 已提交
2450 2451
      pElem->tableCondLen = htonl(len);
      pMsg += len;
S
slguan 已提交
2452 2453
    }

2454
    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
S
slguan 已提交
2455

H
hjxilinx 已提交
2456
    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
S
slguan 已提交
2457 2458 2459 2460 2461
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
H
hjxilinx 已提交
2462 2463
      for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]);
S
slguan 已提交
2464 2465 2466 2467 2468 2469 2470 2471
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
2472 2473
        for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
          SColIndexEx *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
2474 2475
          SColIndexEx *pDestCol = (SColIndexEx *)pMsg;

H
hjxilinx 已提交
2476 2477 2478 2479
          pDestCol->colIdxInBuf = 0;
          pDestCol->colIdx = htons(pCol->colIdx);
          pDestCol->colId = htons(pDestCol->colId);
          pDestCol->flag = htons(pDestCol->flag);
H
hjxilinx 已提交
2480
          strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
2481

H
hjxilinx 已提交
2482
          pMsg += sizeof(SColIndexEx);
S
slguan 已提交
2483 2484
        }
      }
H
hzcheng 已提交
2485
    }
S
slguan 已提交
2486

H
hjxilinx 已提交
2487 2488
    strcpy(pElem->tableId, pTableMetaInfo->name);
    pElem->numOfTags = htons(pTableMetaInfo->numOfTags);
S
slguan 已提交
2489 2490 2491

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
H
hzcheng 已提交
2492 2493 2494 2495
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2496
  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_META;
H
hzcheng 已提交
2497
  assert(msgLen + minMsgSize() <= size);
2498 2499
  
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2500 2501
}

2502
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
H
hzcheng 已提交
2503 2504 2505 2506
  int      size = 0;
  STscObj *pObj = pSql->pTscObj;

  size += tsRpcHeadSize + sizeof(SMgmtHead);
S
slguan 已提交
2507
  size += sizeof(SQqueryList);
H
hzcheng 已提交
2508 2509 2510

  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
S
slguan 已提交
2511
    size += sizeof(SQueryDesc);
H
hzcheng 已提交
2512 2513 2514
    tpSql = tpSql->next;
  }

S
slguan 已提交
2515
  size += sizeof(SStreamList);
H
hzcheng 已提交
2516 2517
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
S
slguan 已提交
2518
    size += sizeof(SStreamDesc);
H
hzcheng 已提交
2519 2520 2521 2522 2523 2524
    pStream = pStream->next;
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

2525
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2526 2527 2528 2529 2530 2531 2532 2533 2534
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

2535
  size = tscEstimateHeartBeatMsgLength(pSql);
S
slguan 已提交
2536 2537 2538 2539
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2553
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
2554 2555 2556 2557 2558 2559

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

int tscProcessMeterMetaRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
2560
  STableMetaMsg *pMeta;
S
slguan 已提交
2561
  SSchema *   pSchema;
H
hzcheng 已提交
2562

H
hjxilinx 已提交
2563
  pMeta = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
2564 2565

  pMeta->sid = htonl(pMeta->sid);
S
slguan 已提交
2566
  pMeta->sversion = htons(pMeta->sversion);
H
hzcheng 已提交
2567 2568
  pMeta->vgid = htonl(pMeta->vgid);
  pMeta->uid = htobe64(pMeta->uid);
S
slguan 已提交
2569
  pMeta->contLen = htons(pMeta->contLen);
H
hzcheng 已提交
2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582

  if (pMeta->sid < 0 || pMeta->vgid < 0) {
    tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  pMeta->numOfColumns = htons(pMeta->numOfColumns);

  if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMeta->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

2583
  if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
H
hzcheng 已提交
2584 2585 2586 2587 2588 2589 2590 2591
    tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
    pMeta->vpeerDesc[i].vnode = htonl(pMeta->vpeerDesc[i].vnode);
  }

H
hjxilinx 已提交
2592
  int32_t rowSize = 0;
S
slguan 已提交
2593
  pSchema = (SSchema *)(pSql->res.pRsp + sizeof(STableMeta));
H
hzcheng 已提交
2594 2595 2596 2597 2598 2599 2600 2601

  int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // ignore the tags length
    if (i < pMeta->numOfColumns) {
H
hjxilinx 已提交
2602
      rowSize += pSchema->bytes;
H
hzcheng 已提交
2603 2604 2605 2606
    }
    pSchema++;
  }

S
slguan 已提交
2607 2608 2609
//  rsp += numOfTotalCols * sizeof(SSchema);
//
//  int32_t  tagLen = 0;
H
hjxilinx 已提交
2610
//  SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
S
slguan 已提交
2611
//
S
[TD-10]  
slguan 已提交
2612
//  if (pMeta->tableType == TSDB_CHILD_TABLE) {
S
slguan 已提交
2613 2614 2615 2616 2617 2618 2619
//    for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
//      tagLen += pTagsSchema[i].bytes;
//    }
//  }
//
//  rsp += tagLen;
//  int32_t size = (int32_t)(rsp - (char *)pMeta);
H
hzcheng 已提交
2620 2621

  // pMeta->index = rand() % TSDB_VNODES_SUPPORT;
S
slguan 已提交
2622
//  pMeta->index = 0;
H
hzcheng 已提交
2623 2624

  // todo add one more function: taosAddDataIfNotExists();
H
hjxilinx 已提交
2625 2626
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
2627

H
hjxilinx 已提交
2628
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, (char *)pMeta,
S
slguan 已提交
2629
                                                                  pMeta->contLen, tsMeterMetaKeepTimer);
2630
  // todo handle out of memory case
H
hjxilinx 已提交
2631
  if (pTableMetaInfo->pTableMeta == NULL) return 0;
H
hzcheng 已提交
2632 2633 2634 2635

  return TSDB_CODE_OTHERS;
}

S
slguan 已提交
2636 2637
/**
 *  multi meter meta rsp pkg format:
2638
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  uint8_t  ieType;
  int32_t  totalNum;
  int32_t  i;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

2658
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
2659
  totalNum = htonl(pInfo->numOfTables);
2660
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
2661 2662

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
2663
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
2664
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgid = htonl(pMeta->vgid);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgid < 0) {
      tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

H
hjxilinx 已提交
2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
    //  }
S
slguan 已提交
2738
  }
H
hjxilinx 已提交
2739
  
S
slguan 已提交
2740 2741 2742 2743 2744 2745
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
2746
int tscProcessMetricMetaRsp(SSqlObj *pSql) {
S
slguan 已提交
2747
  SSuperTableMeta *pMeta;
H
hzcheng 已提交
2748
  uint8_t      ieType;
S
slguan 已提交
2749 2750 2751 2752
  void **      metricMetaList = NULL;
  int32_t *    sizes = NULL;

  char *rsp = pSql->res.pRsp;
H
hzcheng 已提交
2753 2754 2755 2756 2757 2758 2759 2760 2761

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;

S
slguan 已提交
2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777
  int32_t num = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  metricMetaList = calloc(1, POINTER_BYTES * num);
  sizes = calloc(1, sizeof(int32_t) * num);

  // return with error code
  if (metricMetaList == NULL || sizes == NULL) {
    tfree(metricMetaList);
    tfree(sizes);
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    return pSql->res.code;
  }

  for (int32_t k = 0; k < num; ++k) {
S
slguan 已提交
2778
    pMeta = (SSuperTableMeta *)rsp;
S
slguan 已提交
2779 2780

    size_t size = (size_t)pSql->res.rspLen - 1;
S
slguan 已提交
2781
    rsp = rsp + sizeof(SSuperTableMeta);
S
slguan 已提交
2782

S
slguan 已提交
2783
    pMeta->numOfTables = htonl(pMeta->numOfTables);
S
slguan 已提交
2784 2785 2786
    pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
    pMeta->tagLen = htons(pMeta->tagLen);

S
slguan 已提交
2787
    size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableSidExtInfo *);
H
hzcheng 已提交
2788

2789 2790
    char *pBuf = calloc(1, size);
    if (pBuf == NULL) {
S
slguan 已提交
2791 2792 2793
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2794

S
slguan 已提交
2795
    SSuperTableMeta *pNewMetricMeta = (SSuperTableMeta *)pBuf;
S
slguan 已提交
2796
    metricMetaList[k] = pNewMetricMeta;
H
hzcheng 已提交
2797

S
slguan 已提交
2798
    pNewMetricMeta->numOfTables = pMeta->numOfTables;
S
slguan 已提交
2799 2800
    pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
    pNewMetricMeta->tagLen = pMeta->tagLen;
H
hzcheng 已提交
2801

S
slguan 已提交
2802
    pBuf = pBuf + sizeof(SSuperTableMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
H
hzcheng 已提交
2803

S
slguan 已提交
2804 2805
    for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
      SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
2806
      memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
H
hzcheng 已提交
2807

2808 2809
      pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta;  // offset value
      SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
H
hzcheng 已提交
2810

S
slguan 已提交
2811
      tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);
H
hzcheng 已提交
2812

S
slguan 已提交
2813
      pBuf += sizeof(SVnodeSidList) + sizeof(STableSidExtInfo *) * pSidLists->numOfSids;
S
slguan 已提交
2814
      rsp += sizeof(SVnodeSidList);
H
hzcheng 已提交
2815

S
slguan 已提交
2816
      size_t elemSize = sizeof(STableSidExtInfo) + pNewMetricMeta->tagLen;
S
slguan 已提交
2817
      for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
2818 2819
        pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
        memcpy(pBuf, rsp, elemSize);
2820

S
slguan 已提交
2821 2822
        ((STableSidExtInfo *)pBuf)->uid = htobe64(((STableSidExtInfo *)pBuf)->uid);
        ((STableSidExtInfo *)pBuf)->sid = htonl(((STableSidExtInfo *)pBuf)->sid);
2823

2824 2825
        rsp += elemSize;
        pBuf += elemSize;
S
slguan 已提交
2826
      }
H
hzcheng 已提交
2827
    }
S
slguan 已提交
2828

2829
    sizes[k] = pBuf - (char *)pNewMetricMeta;
H
hzcheng 已提交
2830 2831
  }

2832
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
2833 2834 2835
  for (int32_t i = 0; i < num; ++i) {
    char name[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
2836 2837
    STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
    tscGetMetricMetaCacheKey(pQueryInfo, name, pTableMetaInfo->pTableMeta->uid);
H
hzcheng 已提交
2838

S
slguan 已提交
2839 2840 2841
#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", name, i);
#endif
H
hzcheng 已提交
2842

S
slguan 已提交
2843
    // release the used metricmeta
H
hjxilinx 已提交
2844
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
H
hzcheng 已提交
2845

H
hjxilinx 已提交
2846
    pTableMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
S
slguan 已提交
2847 2848 2849 2850
                                                                      sizes[i], tsMetricMetaKeepTimer);
    tfree(metricMetaList[i]);

    // failed to put into cache
H
hjxilinx 已提交
2851
    if (pTableMetaInfo->pMetricMeta == NULL) {
S
slguan 已提交
2852 2853 2854
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2855 2856
  }

S
slguan 已提交
2857 2858 2859 2860 2861 2862 2863 2864 2865 2866
_error_clean:
  // free allocated resource
  for (int32_t i = 0; i < num; ++i) {
    tfree(metricMetaList[i]);
  }

  free(sizes);
  free(metricMetaList);

  return pSql->res.code;
H
hzcheng 已提交
2867 2868 2869 2870 2871 2872
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
2873
  STableMetaMsg * pMetaMsg;
2874
  SCMShowRsp *pShow;
S
slguan 已提交
2875
  SSchema *    pSchema;
H
hzcheng 已提交
2876 2877
  char         key[20];

2878 2879 2880 2881 2882
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);  //?

H
hjxilinx 已提交
2883
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
2884

2885
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
2886
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
2887 2888
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
2889
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
2890
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
2891

H
hjxilinx 已提交
2892
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
2893

H
hjxilinx 已提交
2894 2895 2896
  pSchema = (SSchema *)((char *)pMetaMsg + sizeof(STableMeta));
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
2897 2898 2899 2900
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

2901
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
2902 2903
  strcpy(key + 1, "showlist");

H
hjxilinx 已提交
2904
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
H
hzcheng 已提交
2905

H
hjxilinx 已提交
2906 2907 2908
  int32_t size = pMetaMsg->numOfColumns * sizeof(SSchema) + sizeof(STableMeta);
  pTableMetaInfo->pTableMeta =
      (STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pMetaMsg, size, tsMeterMetaKeepTimer);
2909
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
H
hjxilinx 已提交
2910
  SSchema *pMeterSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2911

H
hjxilinx 已提交
2912
  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMetaMsg->numOfColumns);
S
slguan 已提交
2913 2914
  SColumnIndex index = {0};

H
hjxilinx 已提交
2915
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
S
slguan 已提交
2916
    index.columnIndex = i;
2917 2918
    tscColumnBaseInfoInsert(pQueryInfo, &index);
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pMeterSchema[i]);
H
hjxilinx 已提交
2919 2920 2921
    
    pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
                     pMeterSchema[i].type, pMeterSchema[i].bytes, pMeterSchema[i].bytes);
H
hzcheng 已提交
2922 2923
  }

2924
  tscFieldInfoCalOffset(pQueryInfo);
H
hzcheng 已提交
2925 2926 2927 2928
  return 0;
}

int tscProcessConnectRsp(SSqlObj *pSql) {
S
slguan 已提交
2929
  char temp[TSDB_TABLE_ID_LEN * 2];
H
hzcheng 已提交
2930 2931 2932
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

2933
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
H
hzcheng 已提交
2934
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response
2935 2936
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

H
hjxilinx 已提交
2937 2938 2939
  assert(len <= tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  
S
slguan 已提交
2940
//  SIpList *    pIpList;
2941
//  char *rsp = pRes->pRsp + sizeof(SCMConnectRsp);
S
slguan 已提交
2942 2943
//  pIpList = (SIpList *)rsp;
//  tscSetMgmtIpList(pIpList);
H
hzcheng 已提交
2944

S
slguan 已提交
2945
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2946 2947 2948 2949 2950 2951 2952 2953
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2954
  STscObj *       pObj = pSql->pTscObj;
H
hjxilinx 已提交
2955
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
2956

H
hjxilinx 已提交
2957
  strcpy(pObj->db, pTableMetaInfo->name);
H
hzcheng 已提交
2958 2959 2960 2961
  return 0;
}

int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
2962
  taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2963 2964 2965 2966
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
2967
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
2968

H
hjxilinx 已提交
2969 2970
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) {
H
hzcheng 已提交
2971 2972 2973 2974 2975 2976 2977 2978
    /* not in cache, abort */
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2979 2980
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2981
   */
H
hjxilinx 已提交
2982 2983
  tscTrace("%p force release metermeta after drop table:%s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2984

H
hjxilinx 已提交
2985 2986 2987
  if (pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
2988 2989 2990 2991 2992 2993
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
2994
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
2995

H
hjxilinx 已提交
2996 2997
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2998 2999 3000
    return 0;
  }

H
hjxilinx 已提交
3001 3002
  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
3003

H
hjxilinx 已提交
3004 3005
  if (pTableMetaInfo->pTableMeta) {
    bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pTableMetaInfo);
H
hzcheng 已提交
3006

H
hjxilinx 已提交
3007 3008
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
3009

3010
    if (isSuperTable) {  // if it is a super table, reset whole query cache
H
hjxilinx 已提交
3011
      tscTrace("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
3012
      taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

3027
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
3028 3029 3030
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
3031
  pRes->data = NULL;
S
slguan 已提交
3032
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
3033 3034 3035 3036
  return 0;
}

int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
S
slguan 已提交
3037 3038 3039
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
3040
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
3041 3042 3043 3044

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset = htobe64(pRetrieve->offset);
S
slguan 已提交
3045
  pRes->useconds = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
3046
  pRes->data = pRetrieve->data;
H
hjxilinx 已提交
3047
  
3048 3049
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);
3050

weixin_48148422's avatar
weixin_48148422 已提交
3051
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
3052 3053 3054 3055 3056
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
    
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
3057 3058
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
3059
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
3060
    p += sizeof(int32_t);
S
slguan 已提交
3061
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
3062 3063 3064 3065
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
3066
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
3067
    }
3068 3069
  }

H
hzcheng 已提交
3070 3071
  pRes->row = 0;

S
slguan 已提交
3072
  tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
H
hzcheng 已提交
3073 3074 3075 3076 3077

  return 0;
}

int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
3078 3079
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;
3080
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
3081

S
slguan 已提交
3082
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
3083 3084 3085

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->data = pRetrieve->data;
3086

3087
  tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
3088 3089 3090 3091 3092 3093
  pRes->row = 0;
  return 0;
}

void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code);

H
hjxilinx 已提交
3094
static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hzcheng 已提交
3095
  int32_t code = TSDB_CODE_SUCCESS;
3096

S
slguan 已提交
3097 3098 3099 3100 3101
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
3102

H
hzcheng 已提交
3103 3104 3105
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
3106

3107
  tscAddSubqueryInfo(&pNew->cmd);
3108 3109 3110 3111 3112

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

  pNew->cmd.createOnDemand = pSql->cmd.createOnDemand;  // create table if not exists
S
slguan 已提交
3113 3114 3115
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get meter meta", pSql);
    free(pNew);
3116

S
slguan 已提交
3117 3118 3119
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

H
hjxilinx 已提交
3120
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
3121
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
3122

H
hjxilinx 已提交
3123
  strcpy(pNewMeterMetaInfo->name, pTableMetaInfo->name);
3124
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
S
slguan 已提交
3125
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);
H
hzcheng 已提交
3126 3127

  if (pSql->fp == NULL) {
S
slguan 已提交
3128 3129
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);
H
hzcheng 已提交
3130 3131

    code = tscProcessSql(pNew);
S
slguan 已提交
3132

3133 3134 3135 3136
    /*
     * Update cache only on succeeding in getting metermeta.
     * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
     */
H
hzcheng 已提交
3137
    if (code == TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
3138 3139
      pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**) &pNewMeterMetaInfo->pTableMeta);
      assert(pTableMetaInfo->pTableMeta != NULL);
H
hzcheng 已提交
3140 3141
    }

H
hjxilinx 已提交
3142
    tscTrace("%p get meter meta complete, code:%d, pTableMeta:%p", pSql, code, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
3143 3144 3145 3146 3147
    tscFreeSqlObj(pNew);

  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
3148
    pNew->sqlstr = strdup(pSql->sqlstr);
H
hzcheng 已提交
3149 3150 3151 3152 3153 3154 3155 3156 3157 3158

    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

H
hjxilinx 已提交
3159 3160
int tscGetMeterMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
3161

H
hjxilinx 已提交
3162 3163 3164
  // If this STableMetaInfo owns a metermeta, release it first
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
3165 3166
  }
  
H
hjxilinx 已提交
3167 3168 3169 3170 3171
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMetaInfo->pTableMeta != NULL) {
    STableInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
    tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns,
             tinfo.numOfTags);
H
hzcheng 已提交
3172 3173 3174 3175 3176 3177 3178 3179

    return TSDB_CODE_SUCCESS;
  }

  /*
   * for async insert operation, release data block buffer before issue new object to get metermeta
   * because in metermeta callback function, the tscParse function will generate the submit data blocks
   */
H
hjxilinx 已提交
3180
  return doGetMeterMetaFromServer(pSql, pTableMetaInfo);
H
hzcheng 已提交
3181 3182
}

H
hjxilinx 已提交
3183
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
3184
  pSql->cmd.createOnDemand = createIfNotExists;
H
hjxilinx 已提交
3185
  return tscGetMeterMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
3186 3187 3188 3189
}

/*
 * in handling the renew metermeta problem during insertion,
S
slguan 已提交
3190
 *
H
hzcheng 已提交
3191 3192 3193 3194 3195
 * If the meter is created on demand during insertion, the routine usually waits for a short
 * period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
 * successfully created the corresponding table.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
S
slguan 已提交
3196
  if (pCmd->command == TSDB_SQL_INSERT) {
H
hzcheng 已提交
3197 3198 3199 3200 3201 3202 3203
    taosMsleep(50);  // todo: global config
  }
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
3204
 * @param tableId       meter id
H
hzcheng 已提交
3205 3206
 * @return              status code
 */
S
slguan 已提交
3207
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
3208 3209
  int code = 0;

H
hzcheng 已提交
3210 3211
  // handle metric meta renew process
  SSqlCmd *pCmd = &pSql->cmd;
3212 3213

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
3214
  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
3215 3216 3217 3218 3219

  // enforce the renew metermeta operation in async model
  if (pSql->fp == NULL) pSql->fp = (void *)0x1;

  /*
S
slguan 已提交
3220
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
3221 3222
   * 2. if get metermeta failed, still get the metermeta
   */
H
hjxilinx 已提交
3223 3224
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnMetric(pCmd)) {
    if (pTableMetaInfo->pTableMeta) {
H
hjxilinx 已提交
3225
      tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
3226
               pTableMetaInfo->numOfTags, pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
3227
    }
3228

3229
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
3230
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
3231

H
hjxilinx 已提交
3232
    code = doGetMeterMetaFromServer(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
3233
  } else {
H
hjxilinx 已提交
3234
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
3235 3236
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247
  }

  if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (pSql->fp == (void *)0x1) {
      pSql->fp = NULL;
    }
  }

  return code;
}

3248
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
S
slguan 已提交
3249 3250
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
3251 3252

  /*
3253
   * the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
H
hzcheng 已提交
3254
   */
3255
  bool    required = false;
3256

3257
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
3258
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
3259 3260
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
3261 3262
    STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
    tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid);
S
slguan 已提交
3263

H
hjxilinx 已提交
3264
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
S
slguan 已提交
3265

3266
    SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
S
slguan 已提交
3267
    if (ppMeta == NULL) {
3268
      required = true;
S
slguan 已提交
3269 3270
      break;
    } else {
H
hjxilinx 已提交
3271
      pTableMetaInfo->pMetricMeta = ppMeta;
S
slguan 已提交
3272 3273
    }
  }
H
hzcheng 已提交
3274

3275 3276
  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
H
hzcheng 已提交
3277 3278 3279
    return TSDB_CODE_SUCCESS;
  }

S
slguan 已提交
3280
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
3281 3282 3283 3284
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_METRIC;
3285 3286
  
  SQueryInfo *pNewQueryInfo = NULL;
3287 3288 3289
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    return code;
  }
3290
  
3291
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
3292
    STableMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
3293

H
hjxilinx 已提交
3294 3295
    STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
S
slguan 已提交
3296 3297 3298 3299 3300 3301
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
3302

3303
  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
H
hzcheng 已提交
3304

3305 3306
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
H
hzcheng 已提交
3307

3308 3309
  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;
H
hjxilinx 已提交
3310 3311 3312 3313 3314
  
  STagCond* pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond)
H
hzcheng 已提交
3315

3316 3317 3318 3319
//  if (pSql->fp != NULL && pSql->pStream == NULL) {
//    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
//    tscFreeSubqueryInfo(pCmd);
//  }
H
hzcheng 已提交
3320 3321 3322

  tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
  if (pSql->fp == NULL) {
S
slguan 已提交
3323 3324
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);
H
hzcheng 已提交
3325 3326

    code = tscProcessSql(pNew);
S
slguan 已提交
3327

3328 3329 3330 3331
    if (code == TSDB_CODE_SUCCESS) {//todo optimize the performance
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
        char tagstr[TSDB_MAX_TAGS_LEN] = {0};
    
H
hjxilinx 已提交
3332 3333
        STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
        tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid);
S
slguan 已提交
3334 3335

#ifdef _DEBUG_VIEW
3336
        printf("create metric key:%s, index:%d\n", tagstr, i);
S
slguan 已提交
3337
#endif
3338
    
H
hjxilinx 已提交
3339 3340
        taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
        pTableMetaInfo->pMetricMeta = (SSuperTableMeta *) taosCacheAcquireByName(tscCacheHandle, tagstr);
3341
      }
S
slguan 已提交
3342 3343
    }

H
hzcheng 已提交
3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357
    tscFreeSqlObj(pNew);
  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

void tscInitMsgs() {
S
slguan 已提交
3358 3359 3360
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
3361 3362

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
3363
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
3364

3365 3366
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
3367 3368

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
3369
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropAcctMsg;
H
hzcheng 已提交
3370 3371 3372
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
3373
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
3374 3375 3376
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
3377 3378 3379 3380 3381 3382 3383
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_META] = tscBuildMeterMetaMsg;
  tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg;
S
slguan 已提交
3384
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
3385 3386 3387 3388

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
3389 3390 3391
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
3392 3393 3394 3395 3396 3397 3398 3399 3400 3401

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessMeterMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
S
slguan 已提交
3402
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
3403 3404

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
3405
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
3406
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
3407

H
hzcheng 已提交
3408
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
3409 3410 3411 3412 3413
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
3414

H
hzcheng 已提交
3415 3416 3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428 3429 3430
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;

  tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg;
  tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg;
}