tscServer.c 117.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tcache.h"
#include "trpc.h"
S
slguan 已提交
19
#include "tscJoinProcess.h"
H
hzcheng 已提交
20
#include "tscProfile.h"
21
#include "tscSQLParser.h"
H
hzcheng 已提交
22 23 24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
S
slguan 已提交
26
#include "tscompression.h"
H
hzcheng 已提交
27 28 29 30 31 32 33
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
34 35
int        tsMasterIndex = 0;
int        tsSlaveIndex = 1;
H
hzcheng 已提交
36

S
slguan 已提交
37
SRpcIpSet  tscMgmtIpList;
S
slguan 已提交
38 39
SRpcIpSet  tscDnodeIpSet;

40 41
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
42
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
H
Hongze Cheng 已提交
43
char *doBuildMsgHeader(SSqlObj *pSql, char **pStart);
S
slguan 已提交
44
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
H
hzcheng 已提交
45 46
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
47 48 49
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
50

S
slguan 已提交
51
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
52

S
slguan 已提交
53 54
void tscPrintMgmtIp() {
  if (tscMgmtIpList.numOfIps <= 0) {
S
slguan 已提交
55
    tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
S
slguan 已提交
56
  } else {
S
slguan 已提交
57
    for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
S
slguan 已提交
58
      tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
S
slguan 已提交
59
    }
S
slguan 已提交
60 61 62
  }
}

S
slguan 已提交
63 64 65 66 67 68
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
  tscMgmtIpList.index = htons(pIpList->index);
  tscMgmtIpList.port = htons(pIpList->port);
  for (int32_t i = 0; i <tscMgmtIpList.numOfIps; ++i) {
    tscMgmtIpList.ip[i] = pIpList->ip[i];
S
slguan 已提交
69 70 71 72
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
73 74 75 76
  if (tscMgmtIpList.numOfIps != 1) {
    tscMgmtIpList.numOfIps = 1;
    tscMgmtIpList.index = 0;
    tscMgmtIpList.port = tsMgmtShellPort;
S
slguan 已提交
77 78 79 80 81 82
    tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
83
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
S
slguan 已提交
84 85 86 87 88 89 90 91
  /*
    * The iplist returned by the cluster edition is the current management nodes
    * and the iplist returned by the edge edition is empty
    */
  if (pIpList->numOfIps != 0) {
    tscSetMgmtIpListFromCluster(pIpList);
  } else {
    tscSetMgmtIpListFromEdge();
S
slguan 已提交
92 93 94
  }
}

H
hjxilinx 已提交
95 96 97 98 99 100 101 102 103 104 105 106
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
  return tscMgmtIpList.numOfIps * factor;
}

H
hzcheng 已提交
107 108 109 110 111 112 113 114 115 116 117 118
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
S
slguan 已提交
119
    SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
S
slguan 已提交
120
    SRpcIpSet *      pIpList = &pRsp->ipList;
S
slguan 已提交
121
    tscSetMgmtIpList(pIpList);
S
slguan 已提交
122

H
hzcheng 已提交
123 124 125
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
S
slguan 已提交
126 127
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
    }
  } else {
    tscTrace("heart beat failed, code:%d", code);
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
144 145 146
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
147
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
148
    
149 150 151 152
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
S
slguan 已提交
153 154 155 156 157
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
158 159 160 161
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
162
    tscAddSubqueryInfo(&pObj->pHb->cmd);
163

S
slguan 已提交
164
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
165 166 167
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
168
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
169 170 171 172 173 174 175 176 177
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

int tscSendMsgToServer(SSqlObj *pSql) {
S
slguan 已提交
178 179
  char *pMsg = rpcMallocCont(pSql->cmd.payloadLen);
  if (NULL == pMsg) {
S
slguan 已提交
180 181
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
182 183
  }

S
slguan 已提交
184
  tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
S
slguan 已提交
185

S
slguan 已提交
186
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
S
slguan 已提交
187

S
slguan 已提交
188 189 190 191 192
  pSql->ipList->ip[0] = inet_addr("192.168.0.1");
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
    rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
  } else {
    rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
H
hzcheng 已提交
193 194
  }

S
slguan 已提交
195
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
196 197
}

S
slguan 已提交
198
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
S
slguan 已提交
199
  tscPrint("response:%d is received, pCont:%p, contLen:%d code:%d", type, pCont, contLen, code);
H
hzcheng 已提交
200
  SSqlObj *pSql = (SSqlObj *)ahandle;
S
slguan 已提交
201
  if (pSql == NULL || pSql->signature != pSql) {
H
hzcheng 已提交
202
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
S
slguan 已提交
203
    return;
H
hzcheng 已提交
204 205
  }

S
slguan 已提交
206 207 208 209
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  tscTrace("%p msg:%p is received from server", pSql, pCont);
H
hzcheng 已提交
210 211

  if (pSql->freed || pObj->signature != pObj) {
S
slguan 已提交
212 213
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
H
hzcheng 已提交
214
    tscFreeSqlObj(pSql);
S
slguan 已提交
215 216
    rpcFreeCont(pCont);
    return;
H
hzcheng 已提交
217 218
  }

S
slguan 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
  if (pCont == NULL) {
    code = TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
    if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID ||
        code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION ||
        code == TSDB_CODE_TABLE_ID_MISMATCH) {
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
        code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        code = TSDB_CODE_NOT_READY;
        rpcFreeCont(pCont);
        return;
      } else {
        tscTrace("%p it shall renew meter meta, code:%d", pSql, code);
H
hzcheng 已提交
247

S
slguan 已提交
248 249
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
        pSql->res.code = (uint8_t) code;  // keep the previous error code
S
slguan 已提交
250

S
slguan 已提交
251
        code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
252

S
slguan 已提交
253 254 255 256 257
        if (pMeterMetaInfo->pMeterMeta) {
          tscSendMsgToServer(pSql);
          rpcFreeCont(pCont);
          return;
        }
H
hzcheng 已提交
258 259
      }
    }
S
slguan 已提交
260
  }
H
hzcheng 已提交
261 262 263

  pSql->retry = 0;

S
slguan 已提交
264
  if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);
H
hzcheng 已提交
265 266 267 268 269 270 271 272

  pRes->rspLen = 0;
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    pRes->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
  }

S
slguan 已提交
273 274 275 276 277
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(type == pCmd->msgType + 1);
    pRes->code = (int8_t)code;
    pRes->rspType = type;
    pRes->rspLen = contLen;
H
hzcheng 已提交
278

S
slguan 已提交
279 280 281 282 283 284
    char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
    if (tmp == NULL) {
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    } else {
      pRes->pRsp = tmp;
      if (pRes->rspLen) {
S
slguan 已提交
285
        memcpy(pRes->pRsp, pCont, pRes->rspLen);
S
slguan 已提交
286 287 288 289
      }
    }

    // ignore the error information returned from mnode when set ignore flag in sql
H
hzcheng 已提交
290 291 292 293 294 295 296 297
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CREATE_DB_RSP) {
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
S
slguan 已提交
298
    if (type == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) {
H
hzcheng 已提交
299
      pRes->numOfRows += *(int32_t *)pRes->pRsp;
S
slguan 已提交
300 301 302 303 304

      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
               *(int32_t *)pRes->pRsp, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
H
hzcheng 已提交
305 306 307 308
    }
  }

  if (pSql->fp == NULL) {
S
slguan 已提交
309
    tsem_post(&pSql->rspSem);
H
hzcheng 已提交
310 311 312 313 314 315 316 317 318
  } else {
    if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
      code = (*tscProcessMsgRsp[pCmd->command])(pSql);

    if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
      int   command = pCmd->command;
      void *taosres = tscKeepConn[command] ? pSql : NULL;
      code = pRes->code ? -pRes->code : pRes->numOfRows;

S
slguan 已提交
319
      tscTrace("%p Async SQL result:%d res:%p", pSql, code, taosres);
H
hzcheng 已提交
320 321

      /*
S
slguan 已提交
322 323 324
       * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
       * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
       * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
H
hzcheng 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
       *
       * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
       * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
       */
      bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
      if (command == TSDB_SQL_INSERT) {  // handle multi-vnode insertion situation
        (*pSql->fp)(pSql, taosres, code);
      } else {
        (*pSql->fp)(pSql->param, taosres, code);
      }

      if (shouldFree) {
        // If it is failed, all objects allocated during execution taos_connect_a should be released
        if (command == TSDB_SQL_CONNECT) {
          taos_close(pObj);
          tscTrace("%p Async sql close failed connection", pSql);
        } else {
          tscFreeSqlObj(pSql);
          tscTrace("%p Async sql is automatically freed", pSql);
        }
      }
    }
  }

S
slguan 已提交
349
  rpcFreeCont(pCont);
H
hzcheng 已提交
350 351
}

S
slguan 已提交
352
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
353
static int      tscLaunchSTableSubqueries(SSqlObj *pSql);
H
hzcheng 已提交
354

S
slguan 已提交
355
// todo merge with callback
H
hjxilinx 已提交
356
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
357
  SSqlCmd *   pCmd = &pSql->cmd;
H
hjxilinx 已提交
358
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
359

S
slguan 已提交
360 361 362 363
  pSql->res.qhandle = 0x1;
  pSql->res.numOfRows = 0;

  if (pSql->pSubs == NULL) {
H
hjxilinx 已提交
364
    pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
S
slguan 已提交
365 366 367 368 369
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }
  }

H
hjxilinx 已提交
370
  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
S
slguan 已提交
371 372 373
  if (pNew == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
374

S
slguan 已提交
375
  pSql->pSubs[pSql->numOfSubs++] = pNew;
H
hjxilinx 已提交
376
  assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
H
hzcheng 已提交
377

378 379
  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
S
slguan 已提交
380 381

    // refactor as one method
382
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
383
    assert(pNewQueryInfo != NULL);
384

385 386
    tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
    tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
S
slguan 已提交
387

388
    tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid);
S
slguan 已提交
389

390
    tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
391
    tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
S
slguan 已提交
392 393

    pNew->cmd.numOfCols = 0;
394 395
    pNewQueryInfo->nAggTimeInterval = 0;
    memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
396

397 398
    // backup the data and clear it in the sqlcmd object
    pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
399
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
S
slguan 已提交
400 401

    // set the ts,tags that involved in join, as the output column of intermediate result
402
    tscClearSubqueryInfo(&pNew->cmd);
403

S
slguan 已提交
404
    SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
S
slguan 已提交
405 406
    SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};

407
    tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
S
slguan 已提交
408 409

    // set the tags value for ts_comp function
410
    SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
S
slguan 已提交
411

412
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0);
H
hjxilinx 已提交
413
    int16_t         tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pMeterMetaInfo->pMeterMeta->uid);
S
slguan 已提交
414 415 416 417 418 419 420 421

    pExpr->param->i64Key = tagColIndex;
    pExpr->numOfParams = 1;

    // add the filter tag column
    for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
      SColumnBase *pColBase = &pSupporter->colList.pColList[i];
      if (pColBase->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
422 423
        tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
        pNewQueryInfo->colList.numOfCols++;
S
slguan 已提交
424 425
      }
    }
426 427 428 429 430 431
  
    tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
             "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
             pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
             pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
             pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
H
hjxilinx 已提交
432
    tscPrintSelectClause(pNew, 0);
H
hjxilinx 已提交
433 434 435 436 437 438 439
  
    tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
             "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
             pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
             pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
             pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
    tscPrintSelectClause(pNew, 0);
S
slguan 已提交
440
  } else {
441
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
442
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
S
slguan 已提交
443
  }
444

H
hjxilinx 已提交
445
#ifdef _DEBUG_VIEW
H
hjxilinx 已提交
446
  tscPrintSelectClause(pNew, 0);
H
hjxilinx 已提交
447
#endif
448
  
S
slguan 已提交
449 450 451 452 453 454 455 456
  return tscProcessSql(pNew);
}

int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  void *asyncFp = pSql->fp;
457 458 459 460
  if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_METRIC) {
    tscBuildMsg[pCmd->command](pSql, NULL);
S
slguan 已提交
461
  }
462 463 464

  int32_t code = tscSendMsgToServer(pSql);

S
slguan 已提交
465
  if (asyncFp) {
466
    if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
467 468 469 470 471 472
      pRes->code = code;
      tscQueueAsyncRes(pSql);
    }
    return 0;
  }

473
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
474 475 476 477 478 479
    pRes->code = code;
    return code;
  }

  tsem_wait(&pSql->rspSem);

480
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) (*tscProcessMsgRsp[pCmd->command])(pSql);
S
slguan 已提交
481 482 483 484 485 486 487

  tsem_post(&pSql->emptyRspSem);

  return pRes->code;
}

int tscProcessSql(SSqlObj *pSql) {
488 489 490
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
491 492
  
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
493 494 495
  SMeterMetaInfo *pMeterMetaInfo = NULL;
  int16_t         type = 0;

496 497 498 499 500
  if (pQueryInfo != NULL) {
    pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
    if (pMeterMetaInfo != NULL) {
      name = pMeterMetaInfo->name;
    }
501

502
    type = pQueryInfo->type;
503 504 505
  
    // for hearbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
506
  }
507

508
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
H
hzcheng 已提交
509
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
H
hjxilinx 已提交
510 511 512 513 514
    // the pMeterMetaInfo cannot be NULL
    if (pMeterMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
515

S
slguan 已提交
516 517 518 519 520 521 522 523 524 525 526 527
    //TODO change the connect info in metadata
    return TSDB_CODE_OTHERS;
//    if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
//      pSql->index = pMeterMetaInfo->pMeterMeta->index;
//    } else {  // it must be the parent SSqlObj for super table query
//      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
//        int32_t idx = pMeterMetaInfo->vnodeIndex;
//
//        SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
//        pSql->index = pSidList->index;
//      }
//    }
H
hzcheng 已提交
528
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
S
slguan 已提交
529
    pSql->ipList = &tscMgmtIpList;
H
hzcheng 已提交
530 531 532 533
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

S
slguan 已提交
534
  // todo handle async situation
535 536
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
S
slguan 已提交
537
      SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
538

539
      pState->numOfTotal = pQueryInfo->numOfTables;
S
slguan 已提交
540

541
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
542 543 544 545
        SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);

        if (pSupporter == NULL) {  // failed to create support struct, abort current query
          tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
546
          pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
S
slguan 已提交
547 548 549 550 551
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          return pSql->res.code;
        }

H
hjxilinx 已提交
552
        int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
S
slguan 已提交
553 554 555 556 557 558 559 560
        if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
          tscDestroyJoinSupporter(pSupporter);
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          break;
        }
      }

S
slguan 已提交
561 562
      tsem_post(&pSql->emptyRspSem);
      tsem_wait(&pSql->rspSem);
S
slguan 已提交
563

S
slguan 已提交
564
      tsem_post(&pSql->emptyRspSem);
S
slguan 已提交
565 566 567 568 569 570 571 572 573 574

      if (pSql->numOfSubs <= 0) {
        pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      } else {
        pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
      }

      return TSDB_CODE_SUCCESS;
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
575
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
576 577 578 579
        return doProcessSql(pSql);
      }
    }
  }
H
hzcheng 已提交
580

581
  if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
582 583
    /*
     * (ref. line: 964)
H
hjxilinx 已提交
584
     * Before this function returns from tscLaunchSTableSubqueries and continues, pSql may have been released at user
H
hzcheng 已提交
585 586 587 588 589 590 591
     * program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack.
     *
     * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
     * which causes deadlock. So we keep it as local variable.
     */
    void *fp = pSql->fp;

H
hjxilinx 已提交
592
    if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
593 594 595 596
      return pRes->code;
    }

    if (fp == NULL) {
S
slguan 已提交
597 598 599
      tsem_post(&pSql->emptyRspSem);
      tsem_wait(&pSql->rspSem);
      tsem_post(&pSql->emptyRspSem);
H
hzcheng 已提交
600 601 602 603 604 605 606 607

      // set the command flag must be after the semaphore been correctly set.
      pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
    }

    return pSql->res.code;
  }

S
slguan 已提交
608 609
  return doProcessSql(pSql);
}
H
hzcheng 已提交
610

611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
  assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);
  
  for(int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);
    
    SRetrieveSupport* pSupport = pSub->param;
  
    tfree(pSupport->localBuffer);
  
    pthread_mutex_unlock(&pSupport->queryMutex);
    pthread_mutex_destroy(&pSupport->queryMutex);
  
    tfree(pSupport);
  
    tscFreeSqlObj(pSub);
S
slguan 已提交
628
  }
629 630
  
  free(pState);
H
hzcheng 已提交
631 632
}

H
hjxilinx 已提交
633
int tscLaunchSTableSubqueries(SSqlObj *pSql) {
H
hzcheng 已提交
634
  SSqlRes *pRes = &pSql->res;
635
  SSqlCmd *pCmd = &pSql->cmd;
636

S
slguan 已提交
637
  // pRes->code check only serves in launching metric sub-queries
H
hzcheng 已提交
638
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
639 640
    pCmd->command = TSDB_SQL_RETRIEVE_METRIC;  // enable the abort of kill metric function.
    return pRes->code;
H
hzcheng 已提交
641 642 643 644
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc = NULL;
H
hjxilinx 已提交
645
  SColumnModel *       pModel = NULL;
H
hzcheng 已提交
646 647

  pRes->qhandle = 1;  // hack the qhandle check
648 649 650

  const uint32_t nBufferSize = (1 << 16);  // 64KB

651
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
652
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
653 654
  int32_t         numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes;
  assert(numOfSubQueries > 0);
H
hzcheng 已提交
655 656 657 658 659 660 661 662 663 664

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }

665 666
  pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
  pSql->numOfSubs = numOfSubQueries;
667

668
  tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
S
slguan 已提交
669
  SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
670
  pState->numOfTotal = numOfSubQueries;
H
hzcheng 已提交
671 672
  pRes->code = TSDB_CODE_SUCCESS;

673 674 675 676 677
  int32_t i = 0;
  for (; i < numOfSubQueries; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
H
hzcheng 已提交
678 679
      break;
    }
680
    
H
hzcheng 已提交
681 682
    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;
S
slguan 已提交
683
    trs->pState = pState;
684
    
H
hzcheng 已提交
685
    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
686 687 688 689 690 691
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }
    
H
hjxilinx 已提交
692
    trs->subqueryIndex = i;
H
hzcheng 已提交
693 694 695 696 697 698 699 700
    trs->pParentSqlObj = pSql;
    trs->pFinalColModel = pModel;

    pthread_mutexattr_t mutexattr = {0};
    pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(&trs->queryMutex, &mutexattr);
    pthread_mutexattr_destroy(&mutexattr);

701
    SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
S
slguan 已提交
702
    if (pNew == NULL) {
703 704 705
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
S
slguan 已提交
706 707 708 709
      break;
    }

    // todo handle multi-vnode situation
710
    if (pQueryInfo->tsBuf) {
711
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
712
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
S
slguan 已提交
713
    }
714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
    
    tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }
  
  if (i < numOfSubQueries) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
  
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;   // free all allocated resource
  }
  
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;
  }
  
  for(int32_t j = 0; j < numOfSubQueries; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;
    
    tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
H
hzcheng 已提交
739 740 741 742 743 744 745 746 747 748 749
  }

  return TSDB_CODE_SUCCESS;
}

static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
  tscTrace("%p start to free subquery result", pSql);

  if (pSql->res.code == TSDB_CODE_SUCCESS) {
    taos_free_result(pSql);
  }
S
slguan 已提交
750

H
hzcheng 已提交
751 752 753 754 755 756 757 758
  tfree(trsupport->localBuffer);

  pthread_mutex_unlock(&trsupport->queryMutex);
  pthread_mutex_destroy(&trsupport->queryMutex);

  tfree(trsupport);
}

S
slguan 已提交
759 760
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);

H
hzcheng 已提交
761
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
S
slguan 已提交
762 763 764 765 766 767 768 769 770
// set no disk space error info
#ifdef WINDOWS
  LPVOID lpMsgBuf;
  FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
                GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),  // Default language
                (LPTSTR)&lpMsgBuf, 0, NULL);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
  LocalFree(lpMsgBuf);
#else
H
hzcheng 已提交
771 772 773
  char buf[256] = {0};
  strerror_r(errno, buf, 256);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf);
S
slguan 已提交
774
#endif
H
hzcheng 已提交
775

S
slguan 已提交
776
  trsupport->pState->code = -errCode;
H
hzcheng 已提交
777 778 779 780
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

  pthread_mutex_unlock(&trsupport->queryMutex);

S
slguan 已提交
781
  tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
H
hzcheng 已提交
782 783 784 785
}

static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  SSqlObj *pPObj = trsupport->pParentSqlObj;
H
hjxilinx 已提交
786
  int32_t  subqueryIndex = trsupport->subqueryIndex;
H
hzcheng 已提交
787 788

  assert(pSql != NULL);
789 790 791
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
         pPObj->numOfSubs == pState->numOfTotal);
H
hzcheng 已提交
792 793

  /* retrieved in subquery failed. OR query cancelled in retrieve phase. */
794 795
  if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
    pState->code = -(int)pPObj->res.code;
H
hzcheng 已提交
796 797 798 799 800 801 802 803

    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
804
             subqueryIndex, pState->code);
H
hzcheng 已提交
805 806
  }

S
slguan 已提交
807
  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
H
hjxilinx 已提交
808 809
    tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
810
        subqueryIndex, pState->code);
H
hzcheng 已提交
811
  } else {
812
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
813
      /*
S
slguan 已提交
814 815
       * current query failed, and the retry count is less than the available
       * count, retry query clear previous retrieved data, then launch a new sub query
H
hzcheng 已提交
816
       */
H
hjxilinx 已提交
817
      tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
H
hzcheng 已提交
818 819 820 821 822

      // clear local saved number of results
      trsupport->localBuffer->numOfElems = 0;
      pthread_mutex_unlock(&trsupport->queryMutex);

S
slguan 已提交
823
      tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
H
hjxilinx 已提交
824
               subqueryIndex, trsupport->numOfRetry);
S
slguan 已提交
825

826
      SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
S
slguan 已提交
827 828 829 830
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
                 trsupport->pParentSqlObj, pSql);

831
        pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
832 833 834
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
        return;
      }
H
hzcheng 已提交
835 836 837

      tscProcessSql(pNew);
      return;
S
slguan 已提交
838
    } else {  // reach the maximum retry count, abort
839
      atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
S
slguan 已提交
840
      tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
841
               numOfRows, subqueryIndex, pState->code);
H
hzcheng 已提交
842 843 844
    }
  }

H
Hongze Cheng 已提交
845 846
  int32_t numOfTotal = pState->numOfTotal;

847
  int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
H
Hongze Cheng 已提交
848
  if (finished < numOfTotal) {
849
    tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
H
hzcheng 已提交
850 851 852 853
    return tscFreeSubSqlObj(trsupport, pSql);
  }

  // all subqueries are failed
H
hjxilinx 已提交
854
  tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
855
  pPObj->res.code = -(pState->code);
H
hzcheng 已提交
856 857 858

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
859
                            pState->numOfTotal);
H
hzcheng 已提交
860

S
slguan 已提交
861
  tfree(trsupport->pState);
H
hzcheng 已提交
862 863
  tscFreeSubSqlObj(trsupport, pSql);

S
slguan 已提交
864
  // sync query, wait for the master SSqlObj to proceed
H
hzcheng 已提交
865 866
  if (pPObj->fp == NULL) {
    // sync query, wait for the master SSqlObj to proceed
S
slguan 已提交
867 868
    tsem_wait(&pPObj->emptyRspSem);
    tsem_wait(&pPObj->emptyRspSem);
H
hzcheng 已提交
869

S
slguan 已提交
870
    tsem_post(&pPObj->rspSem);
H
hzcheng 已提交
871 872 873

    pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
  } else {
S
slguan 已提交
874
    // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
875 876
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);

877
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
878 879 880 881 882
      (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
    } else {  // regular super table query
      if (pPObj->res.code != TSDB_CODE_SUCCESS) {
        tscQueueAsyncRes(pPObj);
      }
H
hzcheng 已提交
883 884 885 886 887 888
    }
  }
}

void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
H
hjxilinx 已提交
889
  int32_t           idx = trsupport->subqueryIndex;
H
hzcheng 已提交
890 891 892 893
  SSqlObj *         pPObj = trsupport->pParentSqlObj;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;

  SSqlObj *pSql = (SSqlObj *)tres;
894
  if (pSql == NULL) {  // sql object has been released in error process, return immediately
H
hzcheng 已提交
895 896 897 898
    tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
    return;
  }

899 900 901 902
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
      pPObj->numOfSubs == pState->numOfTotal);
  
H
hzcheng 已提交
903 904 905
  // query process and cancel query process may execute at the same time
  pthread_mutex_lock(&trsupport->queryMutex);

906
  if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
907 908 909
    return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
  }

910 911 912
  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

913
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
914

S
slguan 已提交
915
  SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
H
hzcheng 已提交
916 917 918 919
  SVPeerDesc *   pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
920
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
H
hzcheng 已提交
921

S
slguan 已提交
922
    tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
923
             pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
924
    
H
hjxilinx 已提交
925
    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
926 927 928 929 930 931
      tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
          pPObj, pSql, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
      return;
    }
    
H
hzcheng 已提交
932 933 934 935

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %d rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};
936 937

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hjxilinx 已提交
938
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
H
hzcheng 已提交
939
#endif
S
slguan 已提交
940
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
S
slguan 已提交
941 942
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
S
slguan 已提交
943 944 945
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }
946
    
S
slguan 已提交
947
    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
948
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
H
hzcheng 已提交
949 950 951 952 953 954 955 956
    if (ret < 0) {
      // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    } else {
      pthread_mutex_unlock(&trsupport->queryMutex);
      taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
    }

S
slguan 已提交
957 958
  } else {  // all data has been retrieved to client
    /* data in from current vnode is stored in cache and disk */
H
hjxilinx 已提交
959
    uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
S
slguan 已提交
960 961
    tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
             pSvd->vnode, numOfRowsFromVnode, idx);
H
hzcheng 已提交
962

H
hjxilinx 已提交
963
    tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
H
hzcheng 已提交
964 965

#ifdef _DEBUG_VIEW
L
lihui 已提交
966
    printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
H
hzcheng 已提交
967
    SSrcColumnInfo colInfo[256] = {0};
968
    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hjxilinx 已提交
969
    tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
H
hzcheng 已提交
970 971
                       trsupport->localBuffer->numOfElems, colInfo);
#endif
972
    
S
slguan 已提交
973
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
S
slguan 已提交
974 975
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
S
slguan 已提交
976 977 978
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }
H
hzcheng 已提交
979 980 981

    // each result for a vnode is ordered as an independant list,
    // then used as an input of loser tree for disk-based merge routine
982 983
    int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
                                    pQueryInfo->groupbyExpr.orderType);
H
hzcheng 已提交
984 985 986 987
    if (ret != 0) {
      /* set no disk space error info, and abort retry */
      return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    }
988
  
H
Hongze Cheng 已提交
989 990
    // keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
    // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
H
hjxilinx 已提交
991
    // In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
H
Hongze Cheng 已提交
992 993
    int32_t numOfTotal = pState->numOfTotal;

994
    int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
H
Hongze Cheng 已提交
995
    if (finished < numOfTotal) {
996
      tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
H
hzcheng 已提交
997 998 999 1000
      return tscFreeSubSqlObj(trsupport, pSql);
    }

    // all sub-queries are returned, start to local merge process
H
hjxilinx 已提交
1001
    pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
H
hzcheng 已提交
1002

S
slguan 已提交
1003
    tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
H
Hongze Cheng 已提交
1004
             pState->numOfTotal, pState->numOfRetrievedRows);
1005
    
1006
    SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
1007
    tscClearInterpInfo(pPQueryInfo);
1008

1009
    tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
H
hzcheng 已提交
1010 1011 1012 1013 1014 1015 1016 1017
                          &pPObj->cmd, &pPObj->res);
    tscTrace("%p build loser tree completed", pPObj);

    pPObj->res.precision = pSql->res.precision;
    pPObj->res.numOfRows = 0;
    pPObj->res.row = 0;

    // only free once
1018 1019
    tfree(trsupport->pState);
    
H
hzcheng 已提交
1020 1021 1022
    tscFreeSubSqlObj(trsupport, pSql);

    if (pPObj->fp == NULL) {
S
slguan 已提交
1023 1024
      tsem_wait(&pPObj->emptyRspSem);
      tsem_wait(&pPObj->emptyRspSem);
H
hzcheng 已提交
1025

S
slguan 已提交
1026
      tsem_post(&pPObj->rspSem);
H
hzcheng 已提交
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
    } else {
      // set the command flag must be after the semaphore been correctly set.
      pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
      if (pPObj->res.code == TSDB_CODE_SUCCESS) {
        (*pPObj->fp)(pPObj->param, pPObj, 0);
      } else {
        tscQueueAsyncRes(pPObj);
      }
    }
  }
}

void tscKillMetricQuery(SSqlObj *pSql) {
1040 1041 1042 1043
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
1044 1045 1046 1047 1048 1049
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

S
slguan 已提交
1050
    if (pSub == NULL) {
H
hzcheng 已提交
1051 1052
      continue;
    }
S
slguan 已提交
1053

H
hzcheng 已提交
1054 1055 1056 1057 1058
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
S
slguan 已提交
1059
    //taosStopRpcConn(pSql->pSubs[i]->thandle);
H
hzcheng 已提交
1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082
  }

  pSql->numOfSubs = 0;

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

  while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

S
slguan 已提交
1083
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
H
hzcheng 已提交
1084

S
slguan 已提交
1085
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
1086 1087 1088
  const int32_t table_index = 0;
  
  SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
S
slguan 已提交
1089
  if (pNew != NULL) {  // the sub query of two-stage super table query
1090
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
1091
    pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
1092 1093
    
    assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
1094 1095

    // launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
1096
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, table_index);
H
hjxilinx 已提交
1097
    pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex;
1098

H
hjxilinx 已提交
1099
    pSql->pSubs[trsupport->subqueryIndex] = pNew;
1100
  }
H
hzcheng 已提交
1101 1102 1103 1104

  return pNew;
}

S
slguan 已提交
1105
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
H
hzcheng 已提交
1106
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
H
hjxilinx 已提交
1107 1108 1109 1110
  
  SSqlObj*  pParentSql = trsupport->pParentSqlObj;
  SSqlObj*  pSql = (SSqlObj *)tres;
  
1111
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1112 1113 1114
  assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
  
  int32_t idx = pMeterMetaInfo->vnodeIndex;
H
hzcheng 已提交
1115 1116

  SVnodeSidList *vnodeInfo = NULL;
S
slguan 已提交
1117 1118 1119 1120
  SVPeerDesc *   pSvd = NULL;
  if (pMeterMetaInfo->pMetricMeta != NULL) {
    vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
    pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
H
hzcheng 已提交
1121 1122
  }

1123 1124
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
H
hjxilinx 已提交
1125
         pParentSql->numOfSubs == pState->numOfTotal);
1126
  
H
hjxilinx 已提交
1127
  if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
1128
    // metric query is killed, Note: code must be less than 0
H
hzcheng 已提交
1129
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
hjxilinx 已提交
1130 1131
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      code = -(int)(pParentSql->res.code);
H
hzcheng 已提交
1132
    } else {
1133
      code = pState->code;
H
hzcheng 已提交
1134
    }
H
hjxilinx 已提交
1135
    tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
H
hjxilinx 已提交
1136
             trsupport->subqueryIndex, code);
H
hzcheng 已提交
1137 1138 1139
  }

  /*
S
slguan 已提交
1140
   * if a query on a vnode is failed, all retrieve operations from vnode that occurs later
H
hzcheng 已提交
1141 1142
   * than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
   * function to abort current and remain retrieve process.
S
slguan 已提交
1143 1144
   *
   * NOTE: threadsafe is required.
H
hzcheng 已提交
1145
   */
S
slguan 已提交
1146
  if (code != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1147
    if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
H
hjxilinx 已提交
1148
      tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
1149
      atomic_val_compare_exchange_32(&pState->code, 0, code);
H
hzcheng 已提交
1150
    } else {  // does not reach the maximum retry count, go on
H
hjxilinx 已提交
1151
      tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry);
S
slguan 已提交
1152

H
hjxilinx 已提交
1153
      SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
S
slguan 已提交
1154 1155
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
L
lihui 已提交
1156
                 trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
H
hzcheng 已提交
1157

1158
        pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1159 1160
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
      } else {
1161
        SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
1162
        assert(pNewQueryInfo->pMeterInfo[0]->pMeterMeta != NULL && pNewQueryInfo->pMeterInfo[0]->pMetricMeta != NULL);
S
slguan 已提交
1163 1164 1165
        tscProcessSql(pNew);
        return;
      }
H
hzcheng 已提交
1166 1167 1168
    }
  }

1169
  if (pState->code != TSDB_CODE_SUCCESS) {  // failed, abort
H
hzcheng 已提交
1170
    if (vnodeInfo != NULL) {
H
hjxilinx 已提交
1171
      tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
H
hzcheng 已提交
1172
               vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
1173
               trsupport->subqueryIndex, pState->code);
H
hzcheng 已提交
1174
    } else {
H
hjxilinx 已提交
1175
      tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
1176
               trsupport->subqueryIndex, pState->code);
H
hzcheng 已提交
1177 1178
    }

1179
    tscRetrieveFromVnodeCallBack(param, tres, pState->code);
H
hzcheng 已提交
1180
  } else {  // success, proceed to retrieve data from dnode
L
lihui 已提交
1181 1182
    if (vnodeInfo != NULL) {
      tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
H
hzcheng 已提交
1183
             vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
H
hjxilinx 已提交
1184
             trsupport->subqueryIndex);
L
lihui 已提交
1185 1186 1187 1188
    } else {
      tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
             trsupport->subqueryIndex);
    }
H
hzcheng 已提交
1189 1190 1191 1192 1193

    taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
  }
}

1194
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1195 1196 1197 1198 1199 1200
  char *pMsg, *pStart;

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

  *((uint64_t *)pMsg) = pSql->res.qhandle;
S
slguan 已提交
1201 1202
  pMsg += sizeof(pSql->res.qhandle);

1203
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
1204 1205
  *((uint16_t *)pMsg) = htons(pQueryInfo->type);
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
1206

1207
  pSql->cmd.payloadLen = pMsg - pStart;
S
slguan 已提交
1208
  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
H
hzcheng 已提交
1209

1210
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1211 1212
}

S
slguan 已提交
1213
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
H
hzcheng 已提交
1214 1215
  SShellSubmitMsg *pShellMsg;
  char *           pMsg;
1216
  SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
S
slguan 已提交
1217 1218

  SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
H
hzcheng 已提交
1219

S
slguan 已提交
1220
  pMsg = buf + tsRpcHeadSize;
H
hzcheng 已提交
1221

S
slguan 已提交
1222 1223
  //TODO set iplist
  //pShellMsg = (SShellSubmitMsg *)pMsg;
S
slguan 已提交
1224 1225 1226
  //pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
  //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip),
  //         htons(pShellMsg->vnode));
H
hzcheng 已提交
1227 1228
}

1229
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1230 1231
  SShellSubmitMsg *pShellMsg;
  char *           pMsg, *pStart;
S
slguan 已提交
1232

1233 1234 1235 1236
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
H
hzcheng 已提交
1237 1238 1239 1240 1241

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

  pShellMsg = (SShellSubmitMsg *)pMsg;
1242 1243

  pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
S
slguan 已提交
1244
  //pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
1245
  pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit);  // number of meters to be inserted
H
hzcheng 已提交
1246

S
slguan 已提交
1247
  // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
S
slguan 已提交
1248
  pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT;
S
slguan 已提交
1249 1250
  //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
  //         htons(pShellMsg->vnode));
1251 1252
  
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1253 1254
}

S
slguan 已提交
1255
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
S
slguan 已提交
1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270
  //TODO
//  SSqlCmd *       pCmd = &pSql->cmd;
//  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
//
//  char *          pStart = buf + tsRpcHeadSize;
//  SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;
//
//  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {  // pColumnModel == NULL, query on meter
//    SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
//    pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
//  } else {  // query on metric
//    SMetricMeta *  pMetricMeta = pMeterMetaInfo->pMetricMeta;
//    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
//    pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode);
//  }
H
hzcheng 已提交
1271 1272 1273 1274 1275 1276
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
1277
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
1278
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
1279
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
1280

1281
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
1282

1283
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->fieldsInfo.numOfOutputCols;
1284
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
1285 1286

  // meter query without tags values
1287
  if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
H
hzcheng 已提交
1288 1289 1290
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryMeterMsg) + srcColListSize + exprSize;
  }

S
slguan 已提交
1291
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
H
hzcheng 已提交
1292

H
hjxilinx 已提交
1293
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1294 1295

  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids;
1296
  int32_t outputColumnSize = pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
1297

S
slguan 已提交
1298
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
1299 1300
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
1301 1302 1303
  }

  return size;
H
hzcheng 已提交
1304 1305
}

1306
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfMeters, int32_t vnodeId, char *pMsg) {
1307
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
1308

1309 1310 1311 1312 1313 1314
  SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;

  tscTrace("%p vid:%d, query on %d meters", pSql, htons(vnodeId), numOfMeters);
  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
L
lihui 已提交
1315
    tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pMeterMetaInfo->pMeterMeta->sid, pMeterMetaInfo->pMeterMeta->uid);
1316 1317 1318 1319
#endif
    SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
    pMeterInfo->sid = htonl(pMeterMeta->sid);
    pMeterInfo->uid = htobe64(pMeterMeta->uid);
1320
    pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pMeterMeta->uid));
1321 1322 1323
    pMsg += sizeof(SMeterSidExtInfo);
  } else {
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
1324

1325 1326 1327
    for (int32_t i = 0; i < numOfMeters; ++i) {
      SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
      SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
1328

1329 1330
      pMeterInfo->sid = htonl(pQueryMeterInfo->sid);
      pMeterInfo->uid = htobe64(pQueryMeterInfo->uid);
1331
      pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
1332 1333
      
      pMsg += sizeof(SMeterSidExtInfo);
1334

1335 1336 1337 1338
      memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
      pMsg += pMetricMeta->tagLen;

#ifdef _DEBUG_VIEW
L
lihui 已提交
1339
      tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
1340 1341 1342
#endif
    }
  }
1343

1344 1345 1346
  return pMsg;
}

1347
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1348 1349
  SSqlCmd *pCmd = &pSql->cmd;

1350
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1351

S
slguan 已提交
1352 1353 1354 1355 1356
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

1357
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1358
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
1359
  
S
slguan 已提交
1360
  char *          pStart = pCmd->payload + tsRpcHeadSize;
H
hzcheng 已提交
1361

S
slguan 已提交
1362 1363
  SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
H
hzcheng 已提交
1364 1365 1366 1367 1368 1369

  SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;

  int32_t msgLen = 0;
  int32_t numOfMeters = 0;

S
slguan 已提交
1370
  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
H
hzcheng 已提交
1371 1372 1373
    numOfMeters = 1;

    tscTrace("%p query on vnode: %d, number of sid:%d, meter id: %s", pSql,
S
slguan 已提交
1374
             pMeterMeta->vpeerDesc[pMeterMeta->index].vnode, 1, pMeterMetaInfo->name);
H
hzcheng 已提交
1375 1376 1377 1378

    pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
    pQueryMsg->uid = pMeterMeta->uid;
    pQueryMsg->numOfTagsCols = 0;
1379
  } else {  // query on super table
H
hjxilinx 已提交
1380 1381
    if (pMeterMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1382 1383 1384
      return -1;
    }

H
hjxilinx 已提交
1385
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

    numOfMeters = pVnodeSidList->numOfSids;
    if (numOfMeters <= 0) {
      tscError("%p vid:%d,error numOfMeters in query message:%d", pSql, vnodeId, numOfMeters);
      return -1;  // error
    }

    tscTrace("%p query on vid:%d, number of sid:%d", pSql, vnodeId, numOfMeters);
    pQueryMsg->vnode = htons(vnodeId);
  }

  pQueryMsg->numOfSids = htonl(numOfMeters);
S
slguan 已提交
1399
  pQueryMsg->numOfTagsCols = htons(pMeterMetaInfo->numOfTags);
H
hzcheng 已提交
1400

1401 1402 1403
  if (pQueryInfo->order.order == TSQL_SO_ASC) {
    pQueryMsg->skey = htobe64(pQueryInfo->stime);
    pQueryMsg->ekey = htobe64(pQueryInfo->etime);
H
hzcheng 已提交
1404
  } else {
1405 1406
    pQueryMsg->skey = htobe64(pQueryInfo->etime);
    pQueryMsg->ekey = htobe64(pQueryInfo->stime);
H
hzcheng 已提交
1407 1408
  }

1409 1410
  pQueryMsg->order = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
H
hzcheng 已提交
1411

1412
  pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
H
hzcheng 已提交
1413

1414 1415
  pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
H
hzcheng 已提交
1416

1417
  pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
H
hzcheng 已提交
1418

1419
  if (pQueryInfo->colList.numOfCols <= 0) {
H
hzcheng 已提交
1420 1421 1422 1423 1424 1425 1426 1427 1428
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, pMeterMeta->numOfColumns);
    return -1;
  }

  if (pMeterMeta->numOfTags < 0) {
    tscError("%p illegal value of numOfTagsCols in query msg: %d", pSql, pMeterMeta->numOfTags);
    return -1;
  }

1429 1430
  pQueryMsg->nAggTimeInterval = htobe64(pQueryInfo->nAggTimeInterval);
  pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
1431 1432
  pQueryMsg->slidingTime = htobe64(pQueryInfo->nSlidingTime);
  
1433 1434
  if (pQueryInfo->nAggTimeInterval < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->nAggTimeInterval);
H
hzcheng 已提交
1435 1436 1437
    return -1;
  }

1438 1439
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
1440 1441 1442
    return -1;
  }

1443
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
1444 1445

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {  // query on meter
H
hzcheng 已提交
1446 1447 1448 1449 1450
    pQueryMsg->tagLength = 0;
  } else {  // query on metric
    pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
  }

1451 1452
  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
H
hzcheng 已提交
1453

1454
  if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
1455 1456
    tscError("%p illegal value of number of output columns in query msg: %d", pSql,
             pQueryInfo->fieldsInfo.numOfOutputCols);
H
hzcheng 已提交
1457 1458 1459 1460
    return -1;
  }

  // set column list ids
1461
  char *   pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
S
slguan 已提交
1462
  SSchema *pSchema = tsGetSchema(pMeterMeta);
H
hzcheng 已提交
1463

1464 1465
  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
S
slguan 已提交
1466
    SSchema *    pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
1467

S
slguan 已提交
1468
    if (pCol->colIndex.columnIndex >= pMeterMeta->numOfColumns || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
H
hzcheng 已提交
1469
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
S
slguan 已提交
1470 1471
      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
               htons(pQueryMsg->vnode), pMeterMeta->sid, pMeterMetaInfo->name, pMeterMeta->numOfColumns, pCol->colIndex,
H
hzcheng 已提交
1472 1473
               pColSchema->name);

S
slguan 已提交
1474
      return -1;  // 0 means build msg failed
H
hzcheng 已提交
1475 1476 1477 1478 1479
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type = htons(pColSchema->type);
S
slguan 已提交
1480
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
1481

S
slguan 已提交
1482 1483 1484
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
1485

S
slguan 已提交
1486 1487 1488 1489
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
1490

S
slguan 已提交
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501
      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
1502

S
slguan 已提交
1503 1504 1505 1506 1507
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
1508 1509 1510 1511
  }

  bool hasArithmeticFunction = false;

S
slguan 已提交
1512
  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
H
hzcheng 已提交
1513

1514 1515
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
1516

S
slguan 已提交
1517
    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
H
hzcheng 已提交
1518 1519 1520
      hasArithmeticFunction = true;
    }

1521
    if (!tscValidateColumnId(pMeterMetaInfo, pExpr->colInfo.colId)) {
H
hzcheng 已提交
1522 1523 1524 1525 1526 1527 1528
      /* column id is not valid according to the cached metermeta, the meter meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
S
slguan 已提交
1529
    pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
1530

S
slguan 已提交
1531
    pSqlFuncExpr->functionId = htons(pExpr->functionId);
H
hzcheng 已提交
1532 1533 1534 1535 1536 1537 1538 1539 1540
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
S
slguan 已提交
1541 1542 1543

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
H
hzcheng 已提交
1544 1545 1546 1547 1548 1549 1550 1551 1552 1553
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
1554 1555
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
S
slguan 已提交
1556
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
H
hzcheng 已提交
1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

1568 1569
  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pSql, numOfMeters, htons(pQueryMsg->vnode), pMsg);
H
hzcheng 已提交
1570

S
slguan 已提交
1571 1572 1573
  // only include the required tag column schema. If a tag is not required, it won't be sent to vnode
  if (pMeterMetaInfo->numOfTags > 0) {
    // always transfer tag schema to vnode if exists
S
slguan 已提交
1574
    SSchema *pTagSchema = tsGetTagSchema(pMeterMeta);
H
hzcheng 已提交
1575

S
slguan 已提交
1576 1577
    for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
      if (pMeterMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
S
slguan 已提交
1578
        SSchema tbSchema = {
S
slguan 已提交
1579
            .bytes = TSDB_METER_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
S
slguan 已提交
1580
        memcpy(pMsg, &tbSchema, sizeof(SSchema));
H
hzcheng 已提交
1581
      } else {
S
slguan 已提交
1582
        memcpy(pMsg, &pTagSchema[pMeterMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
H
hzcheng 已提交
1583 1584
      }

S
slguan 已提交
1585
      pMsg += sizeof(SSchema);
H
hzcheng 已提交
1586 1587 1588
    }
  }

1589
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
S
slguan 已提交
1590 1591
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
1592 1593
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) += pCol->colIdx;
      pMsg += sizeof(pCol->colIdx);

      *((int16_t *)pMsg) += pCol->colIdxInBuf;
      pMsg += sizeof(pCol->colIdxInBuf);

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
    }
  }

1611 1612 1613 1614
  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
S
slguan 已提交
1615 1616 1617 1618 1619 1620 1621 1622
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

1623 1624 1625
  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pMeterMetaInfo->vnodeIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
1626 1627

    // todo refactor
1628 1629
    fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET);
    fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f);
S
slguan 已提交
1630 1631 1632 1633

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
1634 1635
  }

S
slguan 已提交
1636 1637
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
1638 1639
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
1640 1641 1642 1643 1644 1645
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1646
  pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_QUERY;
H
hzcheng 已提交
1647 1648

  assert(msgLen + minMsgSize() <= size);
1649 1650

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1651 1652
}

1653
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1654
  SCreateDbMsg *pCreateDbMsg;
H
hzcheng 已提交
1655
  char *        pMsg, *pStart;
S
slguan 已提交
1656

1657
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1658

1659
  pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1660
  pCreateDbMsg = (SCreateDbMsg *)pMsg;
1661

1662 1663 1664
  assert(pCmd->numOfClause == 1);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  
L
lihui 已提交
1665
  strncpy(pCreateDbMsg->db, pMeterMetaInfo->name, tListLen(pCreateDbMsg->db));
S
slguan 已提交
1666
  pMsg += sizeof(SCreateDbMsg);
H
hzcheng 已提交
1667

1668
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
1669 1670
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DB;

1671
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1672 1673
}

1674
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1675
  SCreateDnodeMsg *pCreate;
H
hzcheng 已提交
1676

1677
  char *pMsg, *pStart;
S
slguan 已提交
1678

1679
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1680

1681
  pMsg = doBuildMsgHeader(pSql, &pStart);
H
hzcheng 已提交
1682

S
slguan 已提交
1683
  pCreate = (SCreateDnodeMsg *)pMsg;
1684
  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
H
hzcheng 已提交
1685

S
slguan 已提交
1686
  pMsg += sizeof(SCreateDnodeMsg);
H
hzcheng 已提交
1687

1688 1689
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DNODE;
H
hzcheng 已提交
1690

1691
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1692 1693
}

1694 1695
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1696
  pCmd->payloadLen = sizeof(SCreateAcctMsg);
S
slguan 已提交
1697 1698 1699 1700
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1701

S
slguan 已提交
1702
  SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
1703

1704 1705
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
1706

1707 1708
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
1709

1710
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
1711

1712 1713 1714 1715 1716 1717 1718 1719
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
1720

1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
1734

1735 1736
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1737 1738
}

1739
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1740
  SCreateUserMsg *pAlterMsg;
1741
  char *         pMsg, *pStart;
H
hzcheng 已提交
1742

1743
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1744

1745
  pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1746
  pAlterMsg = (SCreateUserMsg *)pMsg;
H
hzcheng 已提交
1747

1748 1749
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
H
hjxilinx 已提交
1750
  
1751
  pAlterMsg->flag = pUser->type;
H
hzcheng 已提交
1752

1753 1754 1755 1756
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1757 1758
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1759
  }
H
hzcheng 已提交
1760

S
slguan 已提交
1761
  pMsg += sizeof(SCreateUserMsg);
1762
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
1763

1764 1765 1766 1767 1768
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CREATE_USER;
  }
H
hzcheng 已提交
1769

1770
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1771 1772
}

1773 1774 1775
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *   pStart = NULL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1776

1777
  char *pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1778
  pMsg += sizeof(SCfgDnodeMsg);
H
hzcheng 已提交
1779

1780
  pCmd->payloadLen = pMsg - pStart;
S
slguan 已提交
1781
  pCmd->msgType = TSDB_MSG_TYPE_DNODE_CFG;
H
hzcheng 已提交
1782

1783 1784
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1785

1786 1787 1788 1789 1790 1791
char *doBuildMsgHeader(SSqlObj *pSql, char **pStart) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  char *pMsg = pCmd->payload + tsRpcHeadSize;
  *pStart = pMsg;
H
hzcheng 已提交
1792 1793 1794 1795

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);

1796
  pMsg += sizeof(SMgmtHead);
H
hzcheng 已提交
1797

1798
  return pMsg;
H
hzcheng 已提交
1799 1800
}

1801
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1802
  SDropDbMsg *pDropDbMsg;
1803
  char *      pMsg, *pStart;
S
slguan 已提交
1804

1805
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1806

1807
  pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1808
  pDropDbMsg = (SDropDbMsg *)pMsg;
H
hzcheng 已提交
1809

1810
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
1811 1812
  strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db));
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1813

S
slguan 已提交
1814
  pMsg += sizeof(SDropDbMsg);
H
hzcheng 已提交
1815

1816 1817
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_DB;
H
hzcheng 已提交
1818

1819
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1820 1821
}

1822 1823 1824 1825
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SDropTableMsg *pDropTableMsg;
  char *         pMsg, *pStart;
  int            msgLen = 0;
H
hzcheng 已提交
1826

1827
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1828

L
lihui 已提交
1829 1830 1831 1832 1833 1834 1835 1836 1837 1838
  //pMsg = doBuildMsgHeader(pSql, &pStart);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);

  pMsg   = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
  pMsg += sizeof(SMgmtHead);

1839
  pDropTableMsg = (SDropTableMsg *)pMsg;
H
hzcheng 已提交
1840

S
slguan 已提交
1841
  strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name);
H
hzcheng 已提交
1842

1843 1844
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
  pMsg += sizeof(SDropTableMsg);
H
hzcheng 已提交
1845 1846 1847

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
1848
  pCmd->msgType = TSDB_MSG_TYPE_DROP_TABLE;
H
hzcheng 已提交
1849

1850
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1851 1852
}

1853
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1854
  SDropDnodeMsg *pDrop;
1855
  char *         pMsg, *pStart;
H
hzcheng 已提交
1856

S
slguan 已提交
1857
  SSqlCmd *       pCmd = &pSql->cmd;
1858
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
1859

1860
  pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1861
  pDrop = (SDropDnodeMsg *)pMsg;
H
hzcheng 已提交
1862

1863
  strcpy(pDrop->ip, pMeterMetaInfo->name);
H
hzcheng 已提交
1864

S
slguan 已提交
1865
  pMsg += sizeof(SDropDnodeMsg);
H
hzcheng 已提交
1866

1867 1868
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_DNODE;
H
hzcheng 已提交
1869

1870
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1871 1872
}

1873
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1874
  SDropUserMsg *pDropMsg;
H
hzcheng 已提交
1875 1876
  char *        pMsg, *pStart;

1877
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1878

1879
  pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1880
  pDropMsg = (SDropUserMsg *)pMsg;
H
hzcheng 已提交
1881

1882
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
1883
  strcpy(pDropMsg->user, pMeterMetaInfo->name);
H
hzcheng 已提交
1884

S
slguan 已提交
1885
  pMsg += sizeof(SDropUserMsg);
H
hzcheng 已提交
1886

1887 1888
  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;
H
hzcheng 已提交
1889

1890
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1891 1892
}

1893
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1894
  SUseDbMsg *pUseDbMsg;
H
hzcheng 已提交
1895 1896
  char *     pMsg, *pStart;

1897
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1898

1899
  pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1900
  pUseDbMsg = (SUseDbMsg *)pMsg;
1901

1902
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
1903
  strcpy(pUseDbMsg->db, pMeterMetaInfo->name);
H
hzcheng 已提交
1904

S
slguan 已提交
1905
  pMsg += sizeof(SUseDbMsg);
H
hzcheng 已提交
1906

1907
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
1908 1909
  pCmd->msgType = TSDB_MSG_TYPE_USE_DB;

1910
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1911 1912
}

1913
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1914 1915 1916 1917 1918 1919 1920
  SShowMsg *pShowMsg;
  char *    pMsg, *pStart;
  int       msgLen = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

S
slguan 已提交
1921
  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE;
S
slguan 已提交
1922 1923 1924 1925
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for show msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1926 1927 1928 1929 1930

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
1931

1932
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
1933 1934 1935
  size_t          nameLen = strlen(pMeterMetaInfo->name);

  if (nameLen > 0) {
1936
    strcpy(pMgmt->db, pMeterMetaInfo->name);  // prefix is set here
H
hzcheng 已提交
1937 1938 1939 1940 1941 1942 1943
  } else {
    strcpy(pMgmt->db, pObj->db);
  }

  pMsg += sizeof(SMgmtHead);

  pShowMsg = (SShowMsg *)pMsg;
1944
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
H
hzcheng 已提交
1945

1946 1947 1948 1949 1950 1951 1952 1953
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
S
slguan 已提交
1954
    pMsg += (sizeof(SShowMsg) + pPattern->n);
1955 1956 1957
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);
H
hzcheng 已提交
1958

1959 1960
    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
H
hzcheng 已提交
1961

S
slguan 已提交
1962
    pMsg += (sizeof(SShowMsg) + pIpAddr->n);
1963 1964 1965
  }

  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
1966 1967 1968
  pCmd->msgType = TSDB_MSG_TYPE_SHOW;

  assert(msgLen + minMsgSize() <= size);
1969 1970

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1971 1972
}

1973
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1974
  SKillQueryMsg *pKill;
H
hzcheng 已提交
1975 1976 1977 1978
  char *      pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

1979
  pMsg = doBuildMsgHeader(pSql, &pStart);
S
slguan 已提交
1980
  pKill = (SKillQueryMsg *)pMsg;
H
hzcheng 已提交
1981

1982
  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
H
hzcheng 已提交
1983

S
slguan 已提交
1984
  pMsg += sizeof(SKillQueryMsg);
H
hzcheng 已提交
1985

1986
  pCmd->payloadLen = pMsg - pStart;
H
hzcheng 已提交
1987

1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_QUERY;
      break;
    case TSDB_SQL_KILL_CONNECTION:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_CONNECTION;
      break;
    case TSDB_SQL_KILL_STREAM:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_STREAM;
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2000 2001
}

2002
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2003 2004
  SSqlCmd *pCmd = &(pSql->cmd);

S
slguan 已提交
2005
  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg);
H
hzcheng 已提交
2006

2007
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
2008
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
2009 2010
    size += sizeof(STagData);
  } else {
S
slguan 已提交
2011
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
2012
  }
2013

2014 2015 2016
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
2017 2018 2019 2020

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

2021
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2022
  SCreateTableMsg *pCreateTableMsg;
H
hzcheng 已提交
2023 2024
  char *           pMsg, *pStart;
  int              msgLen = 0;
S
slguan 已提交
2025
  SSchema *        pSchema;
H
hzcheng 已提交
2026 2027
  int              size = 0;

2028 2029 2030
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2031
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
2032 2033

  // Reallocate the payload size
2034
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
2035 2036
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
2037
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
2038
  }
H
hzcheng 已提交
2039 2040 2041 2042 2043

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
2044 2045

  // use dbinfo from table id without modifying current db info
S
slguan 已提交
2046
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2047 2048 2049

  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
2050
  pCreateTableMsg = (SCreateTableMsg *)pMsg;
2051
  strcpy(pCreateTableMsg->tableId, pMeterMetaInfo->name);
H
hzcheng 已提交
2052

2053 2054 2055 2056
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;

H
hzcheng 已提交
2057 2058 2059 2060
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
2061
  pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
2062

2063 2064 2065
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
H
hzcheng 已提交
2066
    pMsg += sizeof(STagData);
2067
  } else {  // create (super) table
H
hzcheng 已提交
2068
    pSchema = pCreateTableMsg->schema;
2069

H
hzcheng 已提交
2070
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
2071
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
2072 2073 2074 2075

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
2076

H
hzcheng 已提交
2077 2078 2079 2080
      pSchema++;
    }

    pMsg = (char *)pSchema;
2081 2082
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
2083

2084 2085 2086
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
2087 2088 2089
    }
  }

2090
  tscClearFieldInfo(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
2091 2092 2093 2094 2095 2096

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
2097
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2098 2099 2100
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
2101
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
S
slguan 已提交
2102
  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
2103 2104 2105
         TSDB_EXTRA_PAYLOAD_SIZE;
}

2106
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2107 2108 2109 2110 2111
  SAlterTableMsg *pAlterTableMsg;
  char *          pMsg, *pStart;
  int             msgLen = 0;
  int             size = 0;

2112 2113 2114
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

2115
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
2116 2117

  size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
2118 2119 2120 2121
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2122 2123 2124 2125 2126

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2127
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2128 2129
  pMsg += sizeof(SMgmtHead);

2130 2131
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

H
hzcheng 已提交
2132
  pAlterTableMsg = (SAlterTableMsg *)pMsg;
2133
  strcpy(pAlterTableMsg->tableId, pMeterMetaInfo->name);
2134
  pAlterTableMsg->type = htons(pAlterInfo->type);
2135

2136
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
2137
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
H
hzcheng 已提交
2138

S
slguan 已提交
2139
  SSchema *pSchema = pAlterTableMsg->schema;
2140 2141
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);
2156

2157
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2158 2159
}

2160
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2161
  SAlterDbMsg *pAlterDbMsg;
H
hzcheng 已提交
2162 2163 2164
  char *       pMsg, *pStart;
  int          msgLen = 0;

S
slguan 已提交
2165 2166
  SSqlCmd *       pCmd = &pSql->cmd;
  STscObj *       pObj = pSql->pTscObj;
2167
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2168

H
hzcheng 已提交
2169 2170 2171 2172 2173 2174 2175
  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
2176
  pAlterDbMsg = (SAlterDbMsg *)pMsg;
S
slguan 已提交
2177
  strcpy(pAlterDbMsg->db, pMeterMetaInfo->name);
H
hzcheng 已提交
2178

S
slguan 已提交
2179
  pMsg += sizeof(SAlterDbMsg);
H
hzcheng 已提交
2180 2181 2182 2183 2184

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_DB;

2185
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2186 2187
}

2188
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2189 2190 2191 2192 2193 2194 2195 2196 2197
  char *pMsg, *pStart;
  int   msgLen = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2198

2199
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2200
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2201 2202 2203 2204
  size_t          nameLen = strlen(pMeterMetaInfo->name);

  if (nameLen > 0) {
    strcpy(pMgmt->db, pMeterMetaInfo->name);
H
hzcheng 已提交
2205 2206 2207
  } else {
    strcpy(pMgmt->db, pObj->db);
  }
S
slguan 已提交
2208

H
hzcheng 已提交
2209 2210
  pMsg += sizeof(SMgmtHead);

2211
  *((uint64_t *)pMsg) = pSql->res.qhandle;
S
slguan 已提交
2212 2213
  pMsg += sizeof(pSql->res.qhandle);

2214 2215
  *((uint16_t *)pMsg) = htons(pQueryInfo->type);
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
2216 2217 2218

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2219
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
H
hzcheng 已提交
2220

2221
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2222 2223
}

2224
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
2225
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
2226 2227 2228
    return pRes->code;
  }

2229 2230 2231
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, i);
H
hzcheng 已提交
2232 2233

    pRes->bytes[i] = pField->bytes;
2234 2235 2236 2237
//    if (pQueryInfo->order.order == TSQL_SO_DESC) {
//      pRes->bytes[i] = -pRes->bytes[i];
//      pRes->tsrow[i] = ((pRes->data + offset * pRes->numOfRows) + (pRes->numOfRows - 1) * pField->bytes);
//    } else {
H
hzcheng 已提交
2238
      pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
2239
//    }
H
hzcheng 已提交
2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
2254

2255
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2256

H
hzcheng 已提交
2257 2258 2259 2260 2261 2262 2263
  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

2264
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2265
  } else {
S
slguan 已提交
2266
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
2282
  SSqlCmd *       pCmd = &pSql->cmd;
2283
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
2284 2285

  int32_t numOfRes = pMeterMetaInfo->pMeterMeta->numOfColumns + pMeterMetaInfo->pMeterMeta->numOfTags;
H
hzcheng 已提交
2286 2287 2288 2289 2290

  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
2291 2292
  SSqlCmd *pCmd = &pSql->cmd;

2293
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2294
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2295 2296

  int32_t numOfRes = 0;
2297
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
S
slguan 已提交
2298
    numOfRes = pMeterMetaInfo->pMetricMeta->numOfMeters;
H
hzcheng 已提交
2299 2300 2301 2302 2303 2304 2305 2306 2307 2308
  } else {
    numOfRes = 1;  // for count function, there is only one output.
  }
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
2309 2310
  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
2311 2312

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
2313
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2314 2315 2316 2317
  }

  pRes->row = 0;

2318
  uint8_t code = pRes->code;
H
hzcheng 已提交
2319
  if (pSql->fp) {  // async retrieve metric data
2320 2321
    if (pRes->code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
H
hzcheng 已提交
2322 2323 2324 2325 2326 2327 2328 2329
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
2330
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
2331

2332
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2333
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
2334 2335
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CONNECT;
S
slguan 已提交
2336
  pCmd->payloadLen = sizeof(SConnectMsg);
H
hzcheng 已提交
2337

S
slguan 已提交
2338 2339 2340 2341 2342
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

S
slguan 已提交
2343
  SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload;
H
hzcheng 已提交
2344 2345 2346 2347 2348

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
  strcpy(pConnect->db, db);
S
slguan 已提交
2349
  strcpy(pConnect->clientVersion, version);
S
slguan 已提交
2350
  strcpy(pConnect->msgVersion, "");
H
hzcheng 已提交
2351

2352
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2353 2354
}

2355
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2356
  STableInfoMsg *pInfoMsg;
H
hzcheng 已提交
2357 2358 2359 2360 2361 2362
  char *         pMsg, *pStart;
  int            msgLen = 0;

  char *tmpData = 0;
  if (pSql->cmd.allocSize > 0) {
    tmpData = calloc(1, pSql->cmd.allocSize);
2363 2364 2365 2366
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

H
hzcheng 已提交
2367 2368 2369 2370
    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize);
  }

2371 2372 2373
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

2374
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
S
slguan 已提交
2375

H
hzcheng 已提交
2376 2377 2378 2379
  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2380
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2381 2382 2383

  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
2384 2385
  pInfoMsg = (STableInfoMsg *)pMsg;
  strcpy(pInfoMsg->tableId, pMeterMetaInfo->name);
2386
  pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0);
S
slguan 已提交
2387
  pMsg += sizeof(STableInfoMsg);
H
hzcheng 已提交
2388

2389
  if (pSql->cmd.createOnDemand) {
H
hzcheng 已提交
2390 2391 2392 2393 2394 2395
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2396
  pCmd->msgType = TSDB_MSG_TYPE_TABLE_META;
H
hzcheng 已提交
2397 2398 2399 2400

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
2401
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2402 2403
}

S
slguan 已提交
2404 2405
/**
 *  multi meter meta req pkg format:
S
slguan 已提交
2406
 *  | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
2407 2408
 *      no used         4B
 **/
2409
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
2422
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
2423

S
slguan 已提交
2424 2425
  SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
2426 2427

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
2428
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
2429 2430 2431 2432
  }

  tfree(tmpData);

S
slguan 已提交
2433
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
S
slguan 已提交
2434
  pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META;
S
slguan 已提交
2435 2436 2437 2438 2439 2440 2441 2442 2443

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

  tscTrace("%p build load multi-metermeta msg completed, numOfMeters:%d, msg size:%d", pSql, pCmd->count,
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hzcheng 已提交
2444 2445
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
S
slguan 已提交
2446
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
2447
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hzcheng 已提交
2448

S
slguan 已提交
2449
  int32_t n = 0;
2450 2451
  for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
    n += strlen(pQueryInfo->tagCond.cond[i].cond);
H
hzcheng 已提交
2452
  }
S
slguan 已提交
2453

H
hjxilinx 已提交
2454
  int32_t tagLen = n * TSDB_NCHAR_SIZE;
2455 2456
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
H
hjxilinx 已提交
2457
  }
2458

S
slguan 已提交
2459
  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
2460
  int32_t elemSize = sizeof(SMetricMetaElemMsg) * pQueryInfo->numOfTables;
S
slguan 已提交
2461 2462 2463 2464

  int32_t len = tagLen + joinCondLen + elemSize + defaultSize;

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
H
hzcheng 已提交
2465 2466
}

2467
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2468
  SSuperTableMetaMsg *pMetaMsg;
H
hzcheng 已提交
2469 2470
  char *          pMsg, *pStart;
  int             msgLen = 0;
S
slguan 已提交
2471
  int             tableIndex = 0;
H
hzcheng 已提交
2472

2473 2474 2475
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

2476
  STagCond *pTagCond = &pQueryInfo->tagCond;
S
slguan 已提交
2477

2478
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2479 2480

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
S
slguan 已提交
2481 2482 2483 2484
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2485 2486 2487 2488 2489

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
S
slguan 已提交
2490
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
2491 2492 2493

  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
2494
  pMetaMsg = (SSuperTableMetaMsg *)pMsg;
2495
  pMetaMsg->numOfMeters = htonl(pQueryInfo->numOfTables);
S
slguan 已提交
2496

S
slguan 已提交
2497
  pMsg += sizeof(SSuperTableMetaMsg);
S
slguan 已提交
2498 2499 2500 2501 2502

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
S
slguan 已提交
2503
  pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
H
hzcheng 已提交
2504

S
slguan 已提交
2505
  memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
2506
  pMsg += TSDB_TABLE_ID_LEN;
H
hzcheng 已提交
2507

S
slguan 已提交
2508 2509 2510
  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

S
slguan 已提交
2511
  memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
2512
  pMsg += TSDB_TABLE_ID_LEN;
S
slguan 已提交
2513 2514 2515 2516

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

2517
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
2518
    pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, i);
S
slguan 已提交
2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531
    uint64_t uid = pMeterMetaInfo->pMeterMeta->uid;

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

    SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)pMsg;
    pMsg += sizeof(SMetricMetaElemMsg);

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
      SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
      if (pCond != NULL) {
H
hjxilinx 已提交
2532
        condLen = strlen(pCond->cond) + 1;
2533

H
hjxilinx 已提交
2534
        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
S
slguan 已提交
2535 2536 2537 2538 2539
        if (!ret) {
          tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid));
          return 0;
        }
      }
H
hzcheng 已提交
2540 2541
    }

S
slguan 已提交
2542
    pElem->condLen = htonl(condLen);
H
hzcheng 已提交
2543

S
slguan 已提交
2544 2545 2546
    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;
H
hzcheng 已提交
2547

S
slguan 已提交
2548 2549 2550
    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;
H
hzcheng 已提交
2551

S
slguan 已提交
2552
      pElem->tableCond = htonl(offset);
2553

H
hjxilinx 已提交
2554 2555
      uint32_t len = strlen(pTagCond->tbnameCond.cond);
      pElem->tableCondLen = htonl(len);
S
slguan 已提交
2556

H
hjxilinx 已提交
2557 2558
      memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      pMsg += len;
S
slguan 已提交
2559 2560
    }

2561
    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
S
slguan 已提交
2562

H
hjxilinx 已提交
2563
    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
S
slguan 已提交
2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
      for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pMeterMetaInfo->tagColumnIndex[j]);
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
2579 2580
        for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
          SColIndexEx *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
2581 2582
          SColIndexEx *pDestCol = (SColIndexEx *)pMsg;

H
hjxilinx 已提交
2583 2584 2585 2586
          pDestCol->colIdxInBuf = 0;
          pDestCol->colIdx = htons(pCol->colIdx);
          pDestCol->colId = htons(pDestCol->colId);
          pDestCol->flag = htons(pDestCol->flag);
2587

H
hjxilinx 已提交
2588
          pMsg += sizeof(SColIndexEx);
S
slguan 已提交
2589 2590
        }
      }
H
hzcheng 已提交
2591
    }
S
slguan 已提交
2592

S
slguan 已提交
2593
    strcpy(pElem->tableId, pMeterMetaInfo->name);
S
slguan 已提交
2594 2595 2596 2597
    pElem->numOfTags = htons(pMeterMetaInfo->numOfTags);

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
H
hzcheng 已提交
2598 2599 2600 2601
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
2602
  pCmd->msgType = TSDB_MSG_TYPE_STABLE_META;
H
hzcheng 已提交
2603
  assert(msgLen + minMsgSize() <= size);
2604 2605
  
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
2606 2607
}

2608
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
H
hzcheng 已提交
2609 2610 2611 2612
  int      size = 0;
  STscObj *pObj = pSql->pTscObj;

  size += tsRpcHeadSize + sizeof(SMgmtHead);
S
slguan 已提交
2613
  size += sizeof(SQqueryList);
H
hzcheng 已提交
2614 2615 2616

  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
S
slguan 已提交
2617
    size += sizeof(SQueryDesc);
H
hzcheng 已提交
2618 2619 2620
    tpSql = tpSql->next;
  }

S
slguan 已提交
2621
  size += sizeof(SStreamList);
H
hzcheng 已提交
2622 2623
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
S
slguan 已提交
2624
    size += sizeof(SStreamDesc);
H
hzcheng 已提交
2625 2626 2627 2628 2629 2630
    pStream = pStream->next;
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

2631
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
2632 2633 2634 2635 2636 2637 2638 2639 2640
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

2641
  size = tscEstimateHeartBeatMsgLength(pSql);
S
slguan 已提交
2642 2643 2644 2645
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

int tscProcessMeterMetaRsp(SSqlObj *pSql) {
  SMeterMeta *pMeta;
S
slguan 已提交
2667
  SSchema *   pSchema;
H
hzcheng 已提交
2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681
  uint8_t     ieType;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;
  pMeta = (SMeterMeta *)rsp;

  pMeta->sid = htonl(pMeta->sid);
S
slguan 已提交
2682
  pMeta->sversion = htons(pMeta->sversion);
H
hzcheng 已提交
2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697
  pMeta->vgid = htonl(pMeta->vgid);
  pMeta->uid = htobe64(pMeta->uid);

  if (pMeta->sid < 0 || pMeta->vgid < 0) {
    tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  pMeta->numOfColumns = htons(pMeta->numOfColumns);

  if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMeta->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

2698
  if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
H
hzcheng 已提交
2699 2700 2701 2702 2703 2704 2705 2706 2707 2708
    tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
    pMeta->vpeerDesc[i].vnode = htonl(pMeta->vpeerDesc[i].vnode);
  }

  pMeta->rowSize = 0;
  rsp += sizeof(SMeterMeta);
S
slguan 已提交
2709
  pSchema = (SSchema *)rsp;
H
hzcheng 已提交
2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722

  int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // ignore the tags length
    if (i < pMeta->numOfColumns) {
      pMeta->rowSize += pSchema->bytes;
    }
    pSchema++;
  }

S
slguan 已提交
2723
  rsp += numOfTotalCols * sizeof(SSchema);
H
hzcheng 已提交
2724 2725

  int32_t  tagLen = 0;
S
slguan 已提交
2726
  SSchema *pTagsSchema = tsGetTagSchema(pMeta);
H
hzcheng 已提交
2727

S
#1177  
slguan 已提交
2728
  if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
H
hzcheng 已提交
2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740
    for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
      tagLen += pTagsSchema[i].bytes;
    }
  }

  rsp += tagLen;
  int32_t size = (int32_t)(rsp - (char *)pMeta);

  // pMeta->index = rand() % TSDB_VNODES_SUPPORT;
  pMeta->index = 0;

  // todo add one more function: taosAddDataIfNotExists();
2741
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
2742
  assert(pMeterMetaInfo->pMeterMeta == NULL);
H
hzcheng 已提交
2743

S
slguan 已提交
2744 2745
  pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
                                                                  size, tsMeterMetaKeepTimer);
2746
  // todo handle out of memory case
S
slguan 已提交
2747
  if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
H
hzcheng 已提交
2748 2749 2750 2751

  return TSDB_CODE_OTHERS;
}

S
slguan 已提交
2752 2753
/**
 *  multi meter meta rsp pkg format:
S
slguan 已提交
2754
 *  | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
2755 2756 2757
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
S
slguan 已提交
2758
  SSchema *pSchema;
S
slguan 已提交
2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774
  uint8_t  ieType;
  int32_t  totalNum;
  int32_t  i;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

S
slguan 已提交
2775 2776 2777
  SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SMultiTableInfoMsg);
S
slguan 已提交
2778 2779 2780

  for (i = 0; i < totalNum; i++) {
    SMultiMeterMeta *pMultiMeta = (SMultiMeterMeta *)rsp;
S
slguan 已提交
2781
    SMeterMeta *     pMeta = &pMultiMeta->metas;
S
slguan 已提交
2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgid = htonl(pMeta->vgid);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgid < 0) {
      tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    pMeta->numOfColumns = htons(pMeta->numOfColumns);

    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
      tscError("invalid tag value count:%d", pMeta->numOfTags);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
      tscError("invalid numOfTags:%d", pMeta->numOfTags);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    }

    pMeta->rowSize = 0;
    rsp += sizeof(SMultiMeterMeta);
S
slguan 已提交
2824
    pSchema = (SSchema *)rsp;
S
slguan 已提交
2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837

    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    for (int j = 0; j < numOfTotalCols; ++j) {
      pSchema->bytes = htons(pSchema->bytes);
      pSchema->colId = htons(pSchema->colId);

      // ignore the tags length
      if (j < pMeta->numOfColumns) {
        pMeta->rowSize += pSchema->bytes;
      }
      pSchema++;
    }

S
slguan 已提交
2838
    rsp += numOfTotalCols * sizeof(SSchema);
S
slguan 已提交
2839 2840

    int32_t  tagLen = 0;
S
slguan 已提交
2841
    SSchema *pTagsSchema = tsGetTagSchema(pMeta);
S
slguan 已提交
2842

S
#1177  
slguan 已提交
2843
    if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
S
slguan 已提交
2844 2845 2846 2847 2848 2849 2850 2851 2852
      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
        tagLen += pTagsSchema[j].bytes;
      }
    }

    rsp += tagLen;
    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with SMeterMeta in cache

    pMeta->index = 0;
S
slguan 已提交
2853
    (void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
S
slguan 已提交
2854 2855 2856 2857 2858 2859 2860 2861
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
2862 2863 2864
int tscProcessMetricMetaRsp(SSqlObj *pSql) {
  SMetricMeta *pMeta;
  uint8_t      ieType;
S
slguan 已提交
2865 2866 2867 2868
  void **      metricMetaList = NULL;
  int32_t *    sizes = NULL;

  char *rsp = pSql->res.pRsp;
H
hzcheng 已提交
2869 2870 2871 2872 2873 2874 2875 2876 2877

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;

S
slguan 已提交
2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903
  int32_t num = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  metricMetaList = calloc(1, POINTER_BYTES * num);
  sizes = calloc(1, sizeof(int32_t) * num);

  // return with error code
  if (metricMetaList == NULL || sizes == NULL) {
    tfree(metricMetaList);
    tfree(sizes);
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    return pSql->res.code;
  }

  for (int32_t k = 0; k < num; ++k) {
    pMeta = (SMetricMeta *)rsp;

    size_t size = (size_t)pSql->res.rspLen - 1;
    rsp = rsp + sizeof(SMetricMeta);

    pMeta->numOfMeters = htonl(pMeta->numOfMeters);
    pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
    pMeta->tagLen = htons(pMeta->tagLen);

    size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfMeters * sizeof(SMeterSidExtInfo *);
H
hzcheng 已提交
2904

2905 2906
    char *pBuf = calloc(1, size);
    if (pBuf == NULL) {
S
slguan 已提交
2907 2908 2909
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2910

2911
    SMetricMeta *pNewMetricMeta = (SMetricMeta *)pBuf;
S
slguan 已提交
2912
    metricMetaList[k] = pNewMetricMeta;
H
hzcheng 已提交
2913

S
slguan 已提交
2914 2915 2916
    pNewMetricMeta->numOfMeters = pMeta->numOfMeters;
    pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
    pNewMetricMeta->tagLen = pMeta->tagLen;
H
hzcheng 已提交
2917

2918
    pBuf = pBuf + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
H
hzcheng 已提交
2919

S
slguan 已提交
2920 2921
    for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
      SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
2922
      memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
H
hzcheng 已提交
2923

2924 2925
      pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta;  // offset value
      SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
H
hzcheng 已提交
2926

S
slguan 已提交
2927
      tscTrace("%p metricmeta:vid:%d,numOfMeters:%d", pSql, i, pLists->numOfSids);
H
hzcheng 已提交
2928

2929
      pBuf += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids;
S
slguan 已提交
2930
      rsp += sizeof(SVnodeSidList);
H
hzcheng 已提交
2931

2932
      size_t elemSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen;
S
slguan 已提交
2933
      for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
2934 2935
        pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
        memcpy(pBuf, rsp, elemSize);
2936 2937 2938 2939

        ((SMeterSidExtInfo *)pBuf)->uid = htobe64(((SMeterSidExtInfo *)pBuf)->uid);
        ((SMeterSidExtInfo *)pBuf)->sid = htonl(((SMeterSidExtInfo *)pBuf)->sid);

2940 2941
        rsp += elemSize;
        pBuf += elemSize;
S
slguan 已提交
2942
      }
H
hzcheng 已提交
2943
    }
S
slguan 已提交
2944

2945
    sizes[k] = pBuf - (char *)pNewMetricMeta;
H
hzcheng 已提交
2946 2947
  }

2948
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
2949 2950 2951
  for (int32_t i = 0; i < num; ++i) {
    char name[TSDB_MAX_TAGS_LEN + 1] = {0};

2952
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
H
hjxilinx 已提交
2953
    tscGetMetricMetaCacheKey(pQueryInfo, name, pMeterMetaInfo->pMeterMeta->uid);
H
hzcheng 已提交
2954

S
slguan 已提交
2955 2956 2957
#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", name, i);
#endif
H
hzcheng 已提交
2958

S
slguan 已提交
2959 2960
    // release the used metricmeta
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
H
hzcheng 已提交
2961

S
slguan 已提交
2962 2963 2964 2965 2966 2967 2968 2969 2970
    pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosAddDataIntoCache(tscCacheHandle, name, (char *)metricMetaList[i],
                                                                      sizes[i], tsMetricMetaKeepTimer);
    tfree(metricMetaList[i]);

    // failed to put into cache
    if (pMeterMetaInfo->pMetricMeta == NULL) {
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2971 2972
  }

S
slguan 已提交
2973 2974 2975 2976 2977 2978 2979 2980 2981 2982
_error_clean:
  // free allocated resource
  for (int32_t i = 0; i < num; ++i) {
    tfree(metricMetaList[i]);
  }

  free(sizes);
  free(metricMetaList);

  return pSql->res.code;
H
hzcheng 已提交
2983 2984 2985 2986 2987 2988 2989
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  SMeterMeta * pMeta;
S
slguan 已提交
2990 2991
  SShowRsp *pShow;
  SSchema *    pSchema;
H
hzcheng 已提交
2992 2993
  char         key[20];

2994 2995 2996 2997 2998
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);  //?

2999
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
3000

S
slguan 已提交
3001
  pShow = (SShowRsp *)pRes->pRsp;
H
hzcheng 已提交
3002 3003
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
3004
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
3005 3006 3007 3008
  pMeta = &(pShow->meterMeta);

  pMeta->numOfColumns = ntohs(pMeta->numOfColumns);

S
slguan 已提交
3009
  pSchema = (SSchema *)((char *)pMeta + sizeof(SMeterMeta));
H
hzcheng 已提交
3010 3011 3012 3013 3014 3015
  pMeta->sid = ntohs(pMeta->sid);
  for (int i = 0; i < pMeta->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

3016
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
3017 3018
  strcpy(key + 1, "showlist");

S
slguan 已提交
3019
  taosRemoveDataFromCache(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
H
hzcheng 已提交
3020

S
slguan 已提交
3021
  int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(SMeterMeta);
S
slguan 已提交
3022 3023
  pMeterMetaInfo->pMeterMeta =
      (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
3024
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
S
slguan 已提交
3025
  SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3026

3027
  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMeta->numOfColumns);
S
slguan 已提交
3028 3029 3030 3031
  SColumnIndex index = {0};

  for (int16_t i = 0; i < pMeta->numOfColumns; ++i) {
    index.columnIndex = i;
3032 3033
    tscColumnBaseInfoInsert(pQueryInfo, &index);
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pMeterSchema[i]);
H
hzcheng 已提交
3034 3035
  }

3036
  tscFieldInfoCalOffset(pQueryInfo);
H
hzcheng 已提交
3037 3038 3039 3040
  return 0;
}

int tscProcessConnectRsp(SSqlObj *pSql) {
S
slguan 已提交
3041
  char temp[TSDB_TABLE_ID_LEN * 2];
H
hzcheng 已提交
3042 3043 3044
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

S
slguan 已提交
3045
  SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;
H
hzcheng 已提交
3046
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response
3047 3048
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

H
hjxilinx 已提交
3049 3050 3051
  assert(len <= tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  
S
slguan 已提交
3052
//  SIpList *    pIpList;
S
slguan 已提交
3053
//  char *rsp = pRes->pRsp + sizeof(SConnectRsp);
S
slguan 已提交
3054 3055
//  pIpList = (SIpList *)rsp;
//  tscSetMgmtIpList(pIpList);
H
hzcheng 已提交
3056

S
slguan 已提交
3057
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
3058 3059 3060 3061 3062 3063 3064 3065
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
3066
  STscObj *       pObj = pSql->pTscObj;
3067
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
3068 3069

  strcpy(pObj->db, pMeterMetaInfo->name);
H
hzcheng 已提交
3070 3071 3072 3073 3074 3075 3076 3077 3078
  return 0;
}

int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosClearDataCache(tscCacheHandle);
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
3079
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
3080 3081

  SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
H
hzcheng 已提交
3082 3083 3084 3085 3086 3087 3088 3089 3090
  if (pMeterMeta == NULL) {
    /* not in cache, abort */
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
3091 3092
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
3093
   */
S
slguan 已提交
3094
  tscTrace("%p force release metermeta after drop table:%s", pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
3095 3096
  taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);

S
slguan 已提交
3097 3098 3099
  if (pMeterMetaInfo->pMeterMeta) {
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
3100 3101 3102 3103 3104 3105
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
3106
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
S
slguan 已提交
3107 3108

  SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
H
hzcheng 已提交
3109 3110 3111 3112
  if (pMeterMeta == NULL) { /* not in cache, abort */
    return 0;
  }

S
slguan 已提交
3113
  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
3114 3115
  taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);

S
slguan 已提交
3116
  if (pMeterMetaInfo->pMeterMeta) {
3117
    bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
H
hzcheng 已提交
3118

S
slguan 已提交
3119 3120
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
3121

3122
    if (isSuperTable) {  // if it is a super table, reset whole query cache
S
slguan 已提交
3123
      tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140
      taosClearDataCache(tscCacheHandle);
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  pRes->qhandle = *((uint64_t *)pRes->pRsp);
  pRes->data = NULL;
S
slguan 已提交
3141
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
3142 3143 3144 3145
  return 0;
}

int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
S
slguan 已提交
3146 3147 3148 3149
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

H
hzcheng 已提交
3150 3151 3152 3153 3154
  SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset = htobe64(pRetrieve->offset);
S
slguan 已提交
3155
  pRes->useconds = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
3156
  pRes->data = pRetrieve->data;
H
hjxilinx 已提交
3157
  
3158 3159
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);
3160

weixin_48148422's avatar
weixin_48148422 已提交
3161
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
3162 3163 3164 3165 3166
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
    
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
3167 3168 3169 3170 3171 3172 3173 3174 3175
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    int32_t numOfMeters = htonl(*(int32_t*)p);
    p += sizeof(int32_t);
    for (int i = 0; i < numOfMeters; i++) {
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
3176
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
3177
    }
3178 3179
  }

H
hzcheng 已提交
3180 3181
  pRes->row = 0;

S
slguan 已提交
3182
  tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
H
hzcheng 已提交
3183 3184 3185 3186 3187

  return 0;
}

int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
3188 3189
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;
3190
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
3191

H
hzcheng 已提交
3192 3193 3194 3195
  SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->data = pRetrieve->data;
3196

3197
  tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
3198 3199 3200 3201 3202 3203
  pRes->row = 0;
  return 0;
}

void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code);

3204
static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
H
hzcheng 已提交
3205
  int32_t code = TSDB_CODE_SUCCESS;
3206

S
slguan 已提交
3207 3208 3209 3210 3211
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
3212

H
hzcheng 已提交
3213 3214 3215
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
3216

3217
  tscAddSubqueryInfo(&pNew->cmd);
3218 3219 3220 3221 3222

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

  pNew->cmd.createOnDemand = pSql->cmd.createOnDemand;  // create table if not exists
S
slguan 已提交
3223 3224 3225
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get meter meta", pSql);
    free(pNew);
3226

S
slguan 已提交
3227 3228 3229
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

3230 3231
  SMeterMetaInfo *pNewMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
3232

3233 3234
  strcpy(pNewMeterMetaInfo->name, pMeterMetaInfo->name);
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
H
hzcheng 已提交
3235 3236 3237
  tscTrace("%p new pSqlObj:%p to get meterMeta", pSql, pNew);

  if (pSql->fp == NULL) {
S
slguan 已提交
3238 3239
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);
H
hzcheng 已提交
3240 3241

    code = tscProcessSql(pNew);
S
slguan 已提交
3242

3243 3244 3245 3246
    /*
     * Update cache only on succeeding in getting metermeta.
     * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
     */
H
hzcheng 已提交
3247
    if (code == TSDB_CODE_SUCCESS) {
3248
      pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
3249
      assert(pMeterMetaInfo->pMeterMeta != NULL);
H
hzcheng 已提交
3250 3251
    }

3252
    tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3253 3254 3255 3256 3257
    tscFreeSqlObj(pNew);

  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
3258
    pNew->sqlstr = strdup(pSql->sqlstr);
H
hzcheng 已提交
3259 3260 3261 3262 3263 3264 3265 3266 3267 3268

    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

3269 3270
int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
  assert(strlen(pMeterMetaInfo->name) != 0);
S
slguan 已提交
3271

3272 3273 3274 3275 3276
  // If this SMeterMetaInfo owns a metermeta, release it first
  if (pMeterMetaInfo->pMeterMeta != NULL) {
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
  }
  
3277
  pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
S
slguan 已提交
3278 3279 3280 3281 3282
  if (pMeterMetaInfo->pMeterMeta != NULL) {
    SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;

    tscTrace("%p retrieve meterMeta from cache, the number of columns:%d, numOfTags:%d", pSql, pMeterMeta->numOfColumns,
             pMeterMeta->numOfTags);
H
hzcheng 已提交
3283 3284 3285 3286 3287 3288 3289 3290

    return TSDB_CODE_SUCCESS;
  }

  /*
   * for async insert operation, release data block buffer before issue new object to get metermeta
   * because in metermeta callback function, the tscParse function will generate the submit data blocks
   */
3291
  return doGetMeterMetaFromServer(pSql, pMeterMetaInfo);
H
hzcheng 已提交
3292 3293
}

3294 3295 3296
int tscGetMeterMetaEx(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo, bool createIfNotExists) {
  pSql->cmd.createOnDemand = createIfNotExists;
  return tscGetMeterMeta(pSql, pMeterMetaInfo);
H
hzcheng 已提交
3297 3298 3299 3300
}

/*
 * in handling the renew metermeta problem during insertion,
S
slguan 已提交
3301
 *
H
hzcheng 已提交
3302 3303 3304 3305 3306
 * If the meter is created on demand during insertion, the routine usually waits for a short
 * period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
 * successfully created the corresponding table.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
S
slguan 已提交
3307
  if (pCmd->command == TSDB_SQL_INSERT) {
H
hzcheng 已提交
3308 3309 3310 3311 3312 3313 3314
    taosMsleep(50);  // todo: global config
  }
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
3315
 * @param tableId       meter id
H
hzcheng 已提交
3316 3317
 * @return              status code
 */
S
slguan 已提交
3318
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
3319 3320
  int code = 0;

H
hzcheng 已提交
3321 3322
  // handle metric meta renew process
  SSqlCmd *pCmd = &pSql->cmd;
3323 3324

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
3325
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
3326 3327 3328 3329 3330

  // enforce the renew metermeta operation in async model
  if (pSql->fp == NULL) pSql->fp = (void *)0x1;

  /*
S
slguan 已提交
3331
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
3332 3333
   * 2. if get metermeta failed, still get the metermeta
   */
S
slguan 已提交
3334 3335
  if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
    if (pMeterMetaInfo->pMeterMeta) {
H
hjxilinx 已提交
3336
      tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
S
slguan 已提交
3337
               pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3338
    }
3339

3340
    tscWaitingForCreateTable(pCmd);
S
slguan 已提交
3341
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
H
hzcheng 已提交
3342

3343
    code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo);  // todo ??
H
hzcheng 已提交
3344
  } else {
H
hjxilinx 已提交
3345
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
S
slguan 已提交
3346 3347
             pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid,
             pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358
  }

  if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (pSql->fp == (void *)0x1) {
      pSql->fp = NULL;
    }
  }

  return code;
}

3359
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
S
slguan 已提交
3360 3361
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
3362 3363

  /*
3364
   * the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
H
hzcheng 已提交
3365
   */
3366
  bool    required = false;
3367

3368
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
3369
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
3370 3371
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

3372
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
H
hjxilinx 已提交
3373
    tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
S
slguan 已提交
3374 3375 3376 3377 3378

    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);

    SMetricMeta *ppMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
    if (ppMeta == NULL) {
3379
      required = true;
S
slguan 已提交
3380 3381 3382 3383 3384
      break;
    } else {
      pMeterMetaInfo->pMetricMeta = ppMeta;
    }
  }
H
hzcheng 已提交
3385

3386 3387
  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
H
hzcheng 已提交
3388 3389 3390
    return TSDB_CODE_SUCCESS;
  }

S
slguan 已提交
3391
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
3392 3393 3394 3395
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_METRIC;
3396 3397
  
  SQueryInfo *pNewQueryInfo = NULL;
3398 3399 3400
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    return code;
  }
3401
  
3402
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
3403
    SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
3404

S
slguan 已提交
3405
    SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
3406
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
S
slguan 已提交
3407 3408 3409 3410 3411 3412
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
3413

3414
  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
H
hzcheng 已提交
3415

3416 3417
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
H
hzcheng 已提交
3418

3419 3420
  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;
H
hjxilinx 已提交
3421 3422 3423 3424 3425
  
  STagCond* pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond)
H
hzcheng 已提交
3426

3427 3428 3429 3430
//  if (pSql->fp != NULL && pSql->pStream == NULL) {
//    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
//    tscFreeSubqueryInfo(pCmd);
//  }
H
hzcheng 已提交
3431 3432 3433

  tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
  if (pSql->fp == NULL) {
S
slguan 已提交
3434 3435
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);
H
hzcheng 已提交
3436 3437

    code = tscProcessSql(pNew);
S
slguan 已提交
3438

3439 3440 3441 3442 3443
    if (code == TSDB_CODE_SUCCESS) {//todo optimize the performance
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
        char tagstr[TSDB_MAX_TAGS_LEN] = {0};
    
        SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
H
hjxilinx 已提交
3444
        tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
S
slguan 已提交
3445 3446

#ifdef _DEBUG_VIEW
3447
        printf("create metric key:%s, index:%d\n", tagstr, i);
S
slguan 已提交
3448
#endif
3449 3450 3451 3452
    
        taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
        pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
      }
S
slguan 已提交
3453 3454
    }

H
hzcheng 已提交
3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473
    tscFreeSqlObj(pNew);
  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

void tscInitMsgs() {
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
3474
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
3475

3476 3477
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
3478 3479

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
3480
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropAcctMsg;
H
hzcheng 已提交
3481 3482 3483
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
3484
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
3485 3486 3487
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
3488 3489 3490 3491 3492 3493 3494
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_META] = tscBuildMeterMetaMsg;
  tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg;
S
slguan 已提交
3495
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
3496 3497 3498 3499

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
3500 3501 3502
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
3503 3504 3505 3506 3507 3508 3509 3510 3511 3512

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessMeterMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
S
slguan 已提交
3513
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
3514 3515

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
3516
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
3517
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
3518

H
hzcheng 已提交
3519
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
3520 3521 3522 3523 3524
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
3525

H
hzcheng 已提交
3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;

  tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg;
  tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg;
}