tscServer.c 87.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tcache.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
21
#include "tscSubquery.h"
H
hzcheng 已提交
22 23 24 25 26 27 28 29 30 31
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
32
SRpcIpSet  tscMgmtIpList;
S
slguan 已提交
33 34
SRpcIpSet  tscDnodeIpSet;

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
S
slguan 已提交
38
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
H
hzcheng 已提交
39 40
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
41 42 43
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
46

S
slguan 已提交
47 48
void tscPrintMgmtIp() {
  if (tscMgmtIpList.numOfIps <= 0) {
S
slguan 已提交
49
    tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
S
slguan 已提交
50
  } else {
S
slguan 已提交
51
    for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
S
slguan 已提交
52
      tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
S
slguan 已提交
53
    }
S
slguan 已提交
54 55 56
  }
}

S
slguan 已提交
57 58
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
S
slguan 已提交
59
  tscMgmtIpList.inUse = htons(pIpList->inUse);
S
slguan 已提交
60 61 62
  tscMgmtIpList.port = htons(pIpList->port);
  for (int32_t i = 0; i <tscMgmtIpList.numOfIps; ++i) {
    tscMgmtIpList.ip[i] = pIpList->ip[i];
S
slguan 已提交
63 64 65 66
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
67 68
  if (tscMgmtIpList.numOfIps != 1) {
    tscMgmtIpList.numOfIps = 1;
S
slguan 已提交
69
    tscMgmtIpList.inUse = 0;
S
slguan 已提交
70
    tscMgmtIpList.port = tsMnodeShellPort;
S
slguan 已提交
71 72 73 74 75 76
    tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
77
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
S
slguan 已提交
78 79 80 81 82 83 84 85
  /*
    * The iplist returned by the cluster edition is the current management nodes
    * and the iplist returned by the edge edition is empty
    */
  if (pIpList->numOfIps != 0) {
    tscSetMgmtIpListFromCluster(pIpList);
  } else {
    tscSetMgmtIpListFromEdge();
S
slguan 已提交
86 87 88
  }
}

H
hjxilinx 已提交
89 90 91 92 93 94 95
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
96
UNUSED_FUNC
H
hjxilinx 已提交
97 98 99 100 101
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
  return tscMgmtIpList.numOfIps * factor;
}

H
hzcheng 已提交
102 103 104 105 106 107 108 109 110 111 112 113
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
114
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
S
slguan 已提交
115
    SRpcIpSet *      pIpList = &pRsp->ipList;
S
slguan 已提交
116
    tscSetMgmtIpList(pIpList);
S
slguan 已提交
117

H
hzcheng 已提交
118 119 120
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
S
slguan 已提交
121 122
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
    }
  } else {
    tscTrace("heart beat failed, code:%d", code);
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
139 140 141
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
142
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
143
    
144 145 146 147
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
S
slguan 已提交
148 149 150 151 152
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
153 154 155 156
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
157
    tscAddSubqueryInfo(&pObj->pHb->cmd);
158

S
slguan 已提交
159
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
160 161 162
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
163
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
164 165 166 167 168 169 170 171 172
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

int tscSendMsgToServer(SSqlObj *pSql) {
S
slguan 已提交
173 174
  char *pMsg = rpcMallocCont(pSql->cmd.payloadLen);
  if (NULL == pMsg) {
S
slguan 已提交
175 176
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
177 178
  }

S
slguan 已提交
179
  pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
180
  
S
slguan 已提交
181
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
S
slguan 已提交
182
    pSql->ipList->port = tsDnodeShellPort;
S
slguan 已提交
183 184
    tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
    memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
185 186 187 188 189 190 191 192 193

    SRpcMsg rpcMsg = {
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
      .handle  = pSql,
      .code   = 0
    };
    rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
S
slguan 已提交
194
  } else {
S
slguan 已提交
195
    pSql->ipList->port = tsMnodeShellPort;
S
slguan 已提交
196
    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
S
slguan 已提交
197
    memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
198 199 200 201 202 203 204 205
    SRpcMsg rpcMsg = {
        .msgType = pSql->cmd.msgType,
        .pCont   = pMsg,
        .contLen = pSql->cmd.payloadLen,
        .handle  = pSql,
        .code   = 0
    };
    rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg);
H
hzcheng 已提交
206 207
  }

S
slguan 已提交
208
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
209 210
}

211
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
S
slguan 已提交
212
  tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
213
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
S
slguan 已提交
214
  if (pSql == NULL || pSql->signature != pSql) {
H
hzcheng 已提交
215
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
S
slguan 已提交
216
    return;
H
hzcheng 已提交
217 218
  }

S
slguan 已提交
219 220 221
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
222
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);
H
hzcheng 已提交
223 224

  if (pSql->freed || pObj->signature != pObj) {
S
slguan 已提交
225 226
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
H
hzcheng 已提交
227
    tscFreeSqlObj(pSql);
228
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
229
    return;
H
hzcheng 已提交
230 231
  }

232 233
  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
S
slguan 已提交
234
  } else {
235
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
236 237 238 239
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION ||
        rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
S
slguan 已提交
240 241 242 243 244 245 246 247 248 249 250
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
251 252
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
253 254
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
255 256
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
257 258
        return;
      } else {
259
        tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code);
H
hzcheng 已提交
260

S
slguan 已提交
261
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
262
        pSql->res.code = rpcMsg->code;  // keep the previous error code
S
slguan 已提交
263

H
hjxilinx 已提交
264
        rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
H
hzcheng 已提交
265

H
hjxilinx 已提交
266
        if (pTableMetaInfo->pTableMeta) {
S
slguan 已提交
267
          tscSendMsgToServer(pSql);
268
          rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
269 270
          return;
        }
H
hzcheng 已提交
271 272
      }
    }
S
slguan 已提交
273
  }
H
hzcheng 已提交
274 275 276

  pSql->retry = 0;
  pRes->rspLen = 0;
277
  
H
hzcheng 已提交
278
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
279
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
H
hzcheng 已提交
280 281 282 283
  } else {
    tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
  }

S
slguan 已提交
284
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
285
    assert(rpcMsg->msgType == pCmd->msgType + 1);
286
    pRes->code    = rpcMsg->code;
287
    pRes->rspType = rpcMsg->msgType;
288
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
289

S
slguan 已提交
290 291 292 293 294 295
    char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
    if (tmp == NULL) {
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    } else {
      pRes->pRsp = tmp;
      if (pRes->rspLen) {
296
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
297 298 299 300
      }
    }

    // ignore the error information returned from mnode when set ignore flag in sql
S
slguan 已提交
301
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) {
H
hzcheng 已提交
302 303 304 305 306 307 308
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
309 310
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
311 312 313 314 315 316 317
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
S
slguan 已提交
318 319 320 321
      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
               *(int32_t *)pRes->pRsp, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
H
hzcheng 已提交
322 323 324
    }
  }

325 326
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
hzcheng 已提交
327

328 329 330 331
  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
    int   command = pCmd->command;
    void *taosres = tscKeepConn[command] ? pSql : NULL;
    rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;
H
hzcheng 已提交
332

333
    tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
H
hzcheng 已提交
334

335 336 337 338 339 340 341 342 343
    /*
     * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
     * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
     * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
     *
     * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
     * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
     */
    bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
H
hjxilinx 已提交
344
    (*pSql->fp)(pSql->param, taosres, rpcMsg->code);
H
hzcheng 已提交
345

346 347 348 349 350 351 352 353
    if (shouldFree) {
      // If it is failed, all objects allocated during execution taos_connect_a should be released
      if (command == TSDB_SQL_CONNECT) {
        taos_close(pObj);
        tscTrace("%p Async sql close failed connection", pSql);
      } else {
        tscFreeSqlObj(pSql);
        tscTrace("%p Async sql is automatically freed", pSql);
H
hzcheng 已提交
354 355 356 357
      }
    }
  }

358
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
359 360
}

S
slguan 已提交
361 362 363 364
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
hjxilinx 已提交
365 366 367 368 369 370 371 372
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
      pCmd->command == TSDB_SQL_METRIC) {
373
    tscBuildMsg[pCmd->command](pSql, NULL);
S
slguan 已提交
374
  }
375 376 377

  int32_t code = tscSendMsgToServer(pSql);
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
378
    pRes->code = code;
H
hjxilinx 已提交
379
    tscQueueAsyncRes(pSql);
S
slguan 已提交
380
  }
H
hjxilinx 已提交
381 382
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
383 384 385
}

int tscProcessSql(SSqlObj *pSql) {
386 387 388
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
389 390
  
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
391
  STableMetaInfo *pTableMetaInfo = NULL;
392
  uint16_t        type = 0;
393

394
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
395
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
396 397
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
398
    }
399

400
    type = pQueryInfo->type;
401 402 403
  
    // for hearbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
404
  }
405

406
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
H
hzcheng 已提交
407
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
H
hjxilinx 已提交
408 409
    // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
H
hjxilinx 已提交
410 411 412
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
413

S
slguan 已提交
414 415
    // temp
    pSql->ipList = &tscMgmtIpList;
H
hjxilinx 已提交
416
//    if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
417
//      pSql->index = pTableMetaInfo->pTableMeta->index;
S
slguan 已提交
418 419
//    } else {  // it must be the parent SSqlObj for super table query
//      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
H
hjxilinx 已提交
420
//        int32_t idx = pTableMetaInfo->vnodeIndex;
S
slguan 已提交
421
//
H
hjxilinx 已提交
422
//        SVnodeSidList *pSidList = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
S
slguan 已提交
423 424 425
//        pSql->index = pSidList->index;
//      }
//    }
H
hzcheng 已提交
426
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
S
slguan 已提交
427
    pSql->ipList = &tscMgmtIpList;
H
hzcheng 已提交
428 429 430 431
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

S
slguan 已提交
432
  // todo handle async situation
433 434
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
H
hjxilinx 已提交
435
      return tscHandleMasterJoinQuery(pSql);
S
slguan 已提交
436 437
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
438
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
439 440 441 442
        return doProcessSql(pSql);
      }
    }
  }
443
  
H
hjxilinx 已提交
444 445 446 447 448
  if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
    tscHandleMasterSTableQuery(pSql);
    return pRes->code;
  } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
    tscHandleMultivnodeInsert(pSql);
H
hzcheng 已提交
449 450
    return pSql->res.code;
  }
451
  
S
slguan 已提交
452 453
  return doProcessSql(pSql);
}
H
hzcheng 已提交
454 455

void tscKillMetricQuery(SSqlObj *pSql) {
456 457 458
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
459
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
460 461 462 463 464 465
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

S
slguan 已提交
466
    if (pSub == NULL) {
H
hzcheng 已提交
467 468
      continue;
    }
S
slguan 已提交
469

H
hzcheng 已提交
470 471 472 473 474
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
S
slguan 已提交
475
    //taosStopRpcConn(pSql->pSubs[i]->thandle);
H
hzcheng 已提交
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

  while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

497
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
498 499 500 501 502
  char *pMsg, *pStart;

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

503
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
S
slguan 已提交
504
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
505 506
  pMsg += sizeof(pSql->res.qhandle);

507
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
508
  pRetrieveMsg->free = htons(pQueryInfo->type);
509
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
510

511 512 513 514 515
  pRetrieveMsg->header.vgId = htonl(1);
  pMsg += sizeof(SRetrieveTableMsg);
  
  pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
  
S
slguan 已提交
516
  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
517
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
518 519
}

S
slguan 已提交
520
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
521
  //SSubmitMsg *pShellMsg;
522
  //char *           pMsg;
523
  //STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
S
slguan 已提交
524

H
hjxilinx 已提交
525
  //STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
H
hzcheng 已提交
526

527
  //pMsg = buf + tsRpcHeadSize;
H
hzcheng 已提交
528

S
slguan 已提交
529
  //TODO set iplist
530
  //pShellMsg = (SSubmitMsg *)pMsg;
H
hjxilinx 已提交
531 532
  //pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
  //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip),
S
slguan 已提交
533
  //         htons(pShellMsg->vnode));
H
hzcheng 已提交
534 535
}

536
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
537
  SSubmitMsg *pShellMsg;
H
hzcheng 已提交
538
  char *           pMsg, *pStart;
S
slguan 已提交
539

H
hjxilinx 已提交
540
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
541
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
542
  
H
hzcheng 已提交
543 544
  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;
H
hjxilinx 已提交
545
  
546 547 548
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
  pMsgDesc->numOfVnodes = htonl(1);       //set the number of vnodes
  pMsg += sizeof(SMsgDesc);
H
hjxilinx 已提交
549
  
550
  pShellMsg = (SSubmitMsg *)pMsg;
H
hjxilinx 已提交
551
  pShellMsg->header.vgId = htonl(pTableMeta->vgId);
H
hjxilinx 已提交
552
  pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen);
553
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
554
  
555
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of meters to be inserted
H
hzcheng 已提交
556

S
slguan 已提交
557
  // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
S
slguan 已提交
558
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
559
  
H
hjxilinx 已提交
560 561
//  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
//           htons(pShellMsg->vnode));
562
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
563 564
}

S
slguan 已提交
565
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
S
slguan 已提交
566 567
  //TODO
//  SSqlCmd *       pCmd = &pSql->cmd;
568
//  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
S
slguan 已提交
569 570
//
//  char *          pStart = buf + tsRpcHeadSize;
S
slguan 已提交
571
//  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
S
slguan 已提交
572
//
H
hjxilinx 已提交
573
//  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {  // pColumnModel == NULL, query on meter
H
hjxilinx 已提交
574 575
//    STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
//    pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
S
slguan 已提交
576
//  } else {  // query on metric
H
hjxilinx 已提交
577 578
//    SSuperTableMeta *  pMetricMeta = pTableMetaInfo->pMetricMeta;
//    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
S
slguan 已提交
579 580
//    pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode);
//  }
H
hzcheng 已提交
581 582 583 584 585 586
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
587
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
588
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
589
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
590

591
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
592

H
hjxilinx 已提交
593
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
H
hjxilinx 已提交
594
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
595 596

  // meter query without tags values
H
hjxilinx 已提交
597
  if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
S
slguan 已提交
598
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
599 600
  }

H
hjxilinx 已提交
601 602
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
603

604
  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
H
hjxilinx 已提交
605
  int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
606

S
slguan 已提交
607
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
608 609
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
610 611 612
  }

  return size;
H
hzcheng 已提交
613 614
}

H
hjxilinx 已提交
615
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) {
616
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
617

H
hjxilinx 已提交
618 619
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
620

H
hjxilinx 已提交
621
  tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables);
H
hjxilinx 已提交
622
  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
623
#ifdef _DEBUG_VIEW
H
hjxilinx 已提交
624
    tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
625
#endif
H
hjxilinx 已提交
626 627 628 629
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
    pTableIdInfo->sid = htonl(pTableMeta->sid);
    pTableIdInfo->uid = htobe64(pTableMeta->uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
630
    pMsg += sizeof(STableIdInfo);
631
  } else {
H
hjxilinx 已提交
632
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
633

S
slguan 已提交
634
    for (int32_t i = 0; i < numOfTables; ++i) {
635 636
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      STableIdInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
637

638 639 640
      pTableIdInfo->sid = htonl(pQueryMeterInfo->sid);
      pTableIdInfo->uid = htobe64(pQueryMeterInfo->uid);
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
641
      
642
      pMsg += sizeof(STableIdInfo);
643 644

#ifdef _DEBUG_VIEW
L
lihui 已提交
645
      tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
646 647 648
#endif
    }
  }
649

650 651 652
  return pMsg;
}

653
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
654 655
  SSqlCmd *pCmd = &pSql->cmd;

656
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
657

S
slguan 已提交
658 659 660 661
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }
662
  
663
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
664
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
665 666
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
667 668 669 670 671 672 673
  
  if (pQueryInfo->colList.numOfCols <= 0) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;
H
hzcheng 已提交
674

S
slguan 已提交
675
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
H
hzcheng 已提交
676 677

  int32_t msgLen = 0;
S
slguan 已提交
678
  int32_t numOfTables = 0;
H
hzcheng 已提交
679

H
hjxilinx 已提交
680
  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
S
slguan 已提交
681
    numOfTables = 1;
682
    pQueryMsg->head.vgId = htonl(pTableMeta->vgId);
H
hjxilinx 已提交
683
    tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
684
  } else {  // query on super table
H
hjxilinx 已提交
685 686
    if (pTableMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
687 688 689
      return -1;
    }

H
hjxilinx 已提交
690
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
691 692
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

S
slguan 已提交
693 694 695
    numOfTables = pVnodeSidList->numOfSids;
    if (numOfTables <= 0) {
      tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables);
H
hzcheng 已提交
696 697 698
      return -1;  // error
    }

H
hjxilinx 已提交
699
    tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
700
    pQueryMsg->head.vgId = htons(vnodeId);
H
hzcheng 已提交
701 702
  }

H
hjxilinx 已提交
703
  pQueryMsg->numOfTables = htonl(numOfTables);
H
hzcheng 已提交
704

705
  if (pQueryInfo->order.order == TSQL_SO_ASC) {
H
hjxilinx 已提交
706 707
    pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
H
hzcheng 已提交
708
  } else {
H
hjxilinx 已提交
709 710
    pQueryMsg->window.skey = htobe64(pQueryInfo->etime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
H
hzcheng 已提交
711 712
  }

713 714 715 716 717 718 719 720
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->interpoType    = htons(pQueryInfo->interpoType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(pQueryInfo->colList.numOfCols);
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
H
hjxilinx 已提交
721
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
722
  
723 724
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
H
hzcheng 已提交
725 726 727
    return -1;
  }

728 729
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
730 731 732
    return -1;
  }

733
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
734

H
hjxilinx 已提交
735
  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {  // query on meter
H
hzcheng 已提交
736
    pQueryMsg->tagLength = 0;
H
hjxilinx 已提交
737
  } else {  // query on super table
H
hzcheng 已提交
738 739 740
    pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
  }

741 742
  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
H
hzcheng 已提交
743

744
  if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
745 746
    tscError("%p illegal value of number of output columns in query msg: %d", pSql,
             pQueryInfo->fieldsInfo.numOfOutputCols);
H
hzcheng 已提交
747 748 749 750
    return -1;
  }

  // set column list ids
751
  char *   pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
752
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
753

754 755
  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
S
slguan 已提交
756
    SSchema *    pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
757

H
hjxilinx 已提交
758 759 760 761 762 763 764 765
//    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
//        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
//      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
//               htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
//               pColSchema->name);
//
//      return -1;  // 0 means build msg failed
//    }
H
hzcheng 已提交
766 767 768

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
769
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
770
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
771

S
slguan 已提交
772 773 774
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
775

S
slguan 已提交
776 777 778 779
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
780

S
slguan 已提交
781 782 783 784 785 786 787 788 789 790 791
      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
792

S
slguan 已提交
793 794 795 796 797
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
798 799 800 801
  }

  bool hasArithmeticFunction = false;

S
slguan 已提交
802
  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
H
hzcheng 已提交
803

H
hjxilinx 已提交
804
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
805
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
806

S
slguan 已提交
807
    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
H
hzcheng 已提交
808 809 810
      hasArithmeticFunction = true;
    }

H
hjxilinx 已提交
811
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
H
hzcheng 已提交
812 813 814 815 816
      /* column id is not valid according to the cached metermeta, the meter meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

817
    pSqlFuncExpr->colInfo.colId  = htons(pExpr->colInfo.colId);
H
hzcheng 已提交
818
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
819
    pSqlFuncExpr->colInfo.flag   = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
820

821
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
822 823 824 825 826 827 828 829 830
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
S
slguan 已提交
831 832 833

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
H
hzcheng 已提交
834 835 836 837 838 839 840 841 842 843
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
844 845
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
S
slguan 已提交
846
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
H
hzcheng 已提交
847 848 849 850 851 852 853 854 855 856 857
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

858
  // serialize the table info (sid, uid, tags)
859
  pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->head.vgId), pMsg);
H
hzcheng 已提交
860

861
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
S
slguan 已提交
862 863
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
864 865
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
866 867 868 869 870 871 872 873 874 875 876 877 878 879
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) += pCol->colIdx;
      pMsg += sizeof(pCol->colIdx);

      *((int16_t *)pMsg) += pCol->colIdxInBuf;
      pMsg += sizeof(pCol->colIdxInBuf);

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
880 881 882
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
883 884 885
    }
  }

886 887 888 889
  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
S
slguan 已提交
890 891 892 893 894 895 896 897
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

898
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
899
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vnodeIndex);
900
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
901 902

    // todo refactor
903 904
    fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET);
    fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f);
S
slguan 已提交
905 906 907 908

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
909 910
  }

S
slguan 已提交
911 912
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
913 914
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
915 916 917 918 919 920
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
S
slguan 已提交
921
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
922
  
923
  pQueryMsg->head.contLen = htonl(msgLen);
H
hzcheng 已提交
924
  assert(msgLen + minMsgSize() <= size);
925 926

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
927 928
}

929 930
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
931
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
932
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
933

S
slguan 已提交
934 935 936 937 938
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

939
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
940

941
  assert(pCmd->numOfClause == 1);
942
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
943
  strncpy(pCreateDbMsg->db, pTableMetaInfo->name, tListLen(pCreateDbMsg->db));
H
hzcheng 已提交
944

945
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
946 947
}

948 949
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
950
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
951 952 953 954
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
955

956
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
957
  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
S
slguan 已提交
958
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
959

960
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
961 962
}

963 964
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
965
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
966 967 968 969
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
970

971
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
972

973 974
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
975

976 977
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
978

979
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
980

981 982 983 984 985 986 987 988
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
989

990 991 992 993 994 995 996 997 998 999 1000 1001 1002
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
1003

S
slguan 已提交
1004
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
1005
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1006 1007
}

1008 1009
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1010
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
1011

S
slguan 已提交
1012 1013 1014 1015 1016
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1017
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
1018

1019 1020 1021
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
  pAlterMsg->flag = pUser->type;
H
hzcheng 已提交
1022

1023 1024 1025 1026
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1027 1028
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1029
  }
H
hzcheng 已提交
1030

1031
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1032
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1033
  } else {
S
slguan 已提交
1034
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1035
  }
H
hzcheng 已提交
1036

1037
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1038 1039
}

1040 1041
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1042
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
H
hzcheng 已提交
1043

S
slguan 已提交
1044 1045 1046 1047
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1048

S
slguan 已提交
1049
  pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
1050 1051
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1052

1053 1054
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1055
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1056

S
slguan 已提交
1057 1058 1059 1060 1061
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1062
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1063

1064
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1065
  strncpy(pDropDbMsg->db, pTableMetaInfo->name, tListLen(pDropDbMsg->db));
1066
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1067

S
slguan 已提交
1068
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1069
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1070 1071
}

1072 1073
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1074
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1075

S
slguan 已提交
1076 1077 1078 1079
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1080

1081
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1082
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1083
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1084
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1085

S
slguan 已提交
1086
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1087
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1088 1089
}

1090
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1091
  SSqlCmd *pCmd = &pSql->cmd;
1092
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1093 1094 1095 1096
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1097

1098
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1099
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1100
  strcpy(pDrop->ip, pTableMetaInfo->name);
S
slguan 已提交
1101
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1102

1103
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1104 1105
}

1106 1107
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1108
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1109
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1110

S
slguan 已提交
1111 1112 1113 1114
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1115

1116
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1117
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1118
  strcpy(pDropMsg->user, pTableMetaInfo->name);
H
hzcheng 已提交
1119

1120
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1121 1122
}

1123 1124
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1125
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1126

S
slguan 已提交
1127 1128 1129 1130
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
1131

1132
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1133
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1134
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1135
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1136

1137
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1138 1139
}

1140
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1141
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1142
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1143
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1144
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1145

S
slguan 已提交
1146 1147 1148
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1149
  }
H
hzcheng 已提交
1150

1151
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1152

1153
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1154
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1155
  if (nameLen > 0) {
H
hjxilinx 已提交
1156
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
H
hzcheng 已提交
1157
  } else {
S
slguan 已提交
1158
    strcpy(pShowMsg->db, pObj->db);
H
hzcheng 已提交
1159 1160
  }

1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);
H
hzcheng 已提交
1173

1174 1175 1176 1177
    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
  }

1178
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1179
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1180 1181
}

1182
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1183
  SSqlCmd *pCmd = &pSql->cmd;
1184
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1185

S
slguan 已提交
1186 1187 1188 1189
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1190

1191
  SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
1192 1193 1194
  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1195
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1196 1197
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1198
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1199 1200
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1201
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1202 1203 1204
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1205 1206
}

1207
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1208 1209
  SSqlCmd *pCmd = &(pSql->cmd);

1210
  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1211

1212
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1213
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1214 1215
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1216
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1217
  }
1218

1219 1220 1221
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1222 1223 1224 1225

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1226
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1227
  int              msgLen = 0;
S
slguan 已提交
1228
  SSchema *        pSchema;
H
hzcheng 已提交
1229
  int              size = 0;
1230 1231 1232
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1233
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1234 1235

  // Reallocate the payload size
1236
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1237 1238
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1239
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1240
  }
H
hzcheng 已提交
1241 1242


1243
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1244
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1245 1246

  // use dbinfo from table id without modifying current db info
H
hjxilinx 已提交
1247
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1248

1249 1250 1251 1252
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;

H
hzcheng 已提交
1253 1254 1255 1256
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1257
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1258

1259 1260 1261
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
H
hzcheng 已提交
1262
    pMsg += sizeof(STagData);
1263
  } else {  // create (super) table
1264
    pSchema = (SSchema *)pCreateTableMsg->schema;
1265

H
hzcheng 已提交
1266
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
1267
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
1268 1269 1270 1271

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1272

H
hzcheng 已提交
1273 1274 1275 1276
      pSchema++;
    }

    pMsg = (char *)pSchema;
1277 1278
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1279

1280 1281 1282
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1283 1284 1285
    }
  }

1286
  tscClearFieldInfo(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1287

S
slguan 已提交
1288
  msgLen = pMsg - (char*)pCreateTableMsg;
S
slguan 已提交
1289
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1290
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1291
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1292 1293

  assert(msgLen + minMsgSize() <= size);
1294
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1295 1296 1297
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1298
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1299
  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1300 1301 1302
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1303
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1304
  SCMAlterTableMsg *pAlterTableMsg;
1305
  char *          pMsg;
H
hzcheng 已提交
1306 1307 1308
  int             msgLen = 0;
  int             size = 0;

1309 1310 1311
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1312
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1313 1314

  size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1315 1316 1317 1318
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1319

1320
  pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
hzcheng 已提交
1321

H
hjxilinx 已提交
1322
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1323

1324 1325
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

H
hjxilinx 已提交
1326
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1327
  pAlterTableMsg->type = htons(pAlterInfo->type);
1328

1329
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
1330
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
H
hzcheng 已提交
1331

S
slguan 已提交
1332
  SSchema *pSchema = pAlterTableMsg->schema;
1333 1334
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
1335 1336 1337 1338 1339 1340 1341 1342 1343

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;

S
slguan 已提交
1344
  msgLen = pMsg - (char*)pAlterTableMsg;
H
hzcheng 已提交
1345
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1346
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1347 1348

  assert(msgLen + minMsgSize() <= size);
1349

1350
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1351 1352
}

1353
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1354
  SSqlCmd *pCmd = &pSql->cmd;
1355
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1356
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1357

S
slguan 已提交
1358 1359 1360 1361
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1362

1363
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1364
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1365
  strcpy(pAlterDbMsg->db, pTableMetaInfo->name);
H
hzcheng 已提交
1366

1367
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1368 1369
}

1370
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1371 1372 1373
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1374

S
slguan 已提交
1375 1376 1377
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
1378
  }
S
slguan 已提交
1379

S
slguan 已提交
1380 1381 1382 1383
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1384

1385
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1386 1387
}

1388
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1389
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1390 1391 1392
    return pRes->code;
  }

1393
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
H
hjxilinx 已提交
1394 1395
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
    pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1410

1411
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1412

H
hzcheng 已提交
1413 1414 1415 1416 1417 1418 1419
  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1420
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1421
  } else {
S
slguan 已提交
1422
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1438
  SSqlCmd *       pCmd = &pSql->cmd;
1439
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1440

H
hjxilinx 已提交
1441
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1442 1443
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1444 1445 1446 1447
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
1448 1449
  SSqlCmd *pCmd = &pSql->cmd;

1450
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
1451
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1452 1453

  int32_t numOfRes = 0;
1454
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
H
hjxilinx 已提交
1455
    numOfRes = pTableMetaInfo->pMetricMeta->numOfTables;
H
hzcheng 已提交
1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
  } else {
    numOfRes = 1;  // for count function, there is only one output.
  }
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1466 1467
  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1468 1469

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1470
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1471 1472 1473 1474
  }

  pRes->row = 0;

1475
  uint8_t code = pRes->code;
H
hzcheng 已提交
1476
  if (pSql->fp) {  // async retrieve metric data
1477 1478
    if (pRes->code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
H
hzcheng 已提交
1479 1480 1481 1482 1483 1484 1485 1486
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
1487
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1488

1489
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1490
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1491
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1492
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1493
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1494

S
slguan 已提交
1495 1496 1497 1498 1499
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1500
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1501 1502 1503 1504 1505

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
  strcpy(pConnect->db, db);
S
slguan 已提交
1506
  strcpy(pConnect->clientVersion, version);
S
slguan 已提交
1507
  strcpy(pConnect->msgVersion, "");
H
hzcheng 已提交
1508

1509
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1510 1511
}

H
hjxilinx 已提交
1512
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1513
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
1514
  char *         pMsg;
H
hzcheng 已提交
1515 1516 1517 1518 1519
  int            msgLen = 0;

  char *tmpData = 0;
  if (pSql->cmd.allocSize > 0) {
    tmpData = calloc(1, pSql->cmd.allocSize);
1520 1521 1522 1523
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

H
hzcheng 已提交
1524 1525 1526 1527
    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize);
  }

1528 1529 1530
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1531
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1532

1533
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1534
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1535
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1536

1537
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1538

H
hjxilinx 已提交
1539
  if (pSql->cmd.autoCreated) {
H
hzcheng 已提交
1540 1541 1542 1543
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

S
slguan 已提交
1544
  msgLen = pMsg - (char*)pInfoMsg;
H
hzcheng 已提交
1545
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1546
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1547 1548 1549 1550

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
1551
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1552 1553
}

S
slguan 已提交
1554 1555
/**
 *  multi meter meta req pkg format:
1556
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1557 1558
 *      no used         4B
 **/
1559
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1572
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1573

1574
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1575
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1576 1577

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1578
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1579 1580 1581 1582
  }

  tfree(tmpData);

1583
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1584
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1585 1586 1587

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1588
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1589 1590 1591 1592 1593
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hzcheng 已提交
1594 1595
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
S
slguan 已提交
1596
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
1597
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hzcheng 已提交
1598

S
slguan 已提交
1599
  int32_t n = 0;
1600 1601
  for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
    n += strlen(pQueryInfo->tagCond.cond[i].cond);
H
hzcheng 已提交
1602
  }
S
slguan 已提交
1603

H
hjxilinx 已提交
1604
  int32_t tagLen = n * TSDB_NCHAR_SIZE;
1605 1606
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
H
hjxilinx 已提交
1607
  }
1608

S
slguan 已提交
1609
  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
S
slguan 已提交
1610
  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
H
hjxilinx 已提交
1611 1612
  
  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndexEx);
S
slguan 已提交
1613

H
hjxilinx 已提交
1614
  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
S
slguan 已提交
1615 1616

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
H
hzcheng 已提交
1617 1618
}

1619
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1620
  SSuperTableMetaMsg *pMetaMsg;
H
hzcheng 已提交
1621 1622
  char *          pMsg, *pStart;
  int             msgLen = 0;
S
slguan 已提交
1623
  int             tableIndex = 0;
H
hzcheng 已提交
1624

1625 1626 1627
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

1628
  STagCond *pTagCond = &pQueryInfo->tagCond;
S
slguan 已提交
1629

H
hjxilinx 已提交
1630
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
1631 1632

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
S
slguan 已提交
1633 1634 1635 1636
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1637 1638 1639 1640 1641

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
H
hjxilinx 已提交
1642
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
1643 1644 1645

  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
1646
  pMetaMsg = (SSuperTableMetaMsg *)pMsg;
S
slguan 已提交
1647
  pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
S
slguan 已提交
1648

S
slguan 已提交
1649
  pMsg += sizeof(SSuperTableMetaMsg);
S
slguan 已提交
1650 1651 1652 1653 1654

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
S
slguan 已提交
1655
  pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
H
hzcheng 已提交
1656

S
slguan 已提交
1657
  memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
1658
  pMsg += TSDB_TABLE_ID_LEN;
H
hzcheng 已提交
1659

S
slguan 已提交
1660 1661 1662
  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

S
slguan 已提交
1663
  memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
1664
  pMsg += TSDB_TABLE_ID_LEN;
S
slguan 已提交
1665 1666 1667 1668

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

1669
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
1670
    pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
H
hjxilinx 已提交
1671
    uint64_t uid = pTableMetaInfo->pTableMeta->uid;
S
slguan 已提交
1672 1673 1674 1675

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

S
slguan 已提交
1676 1677
    SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
    pMsg += sizeof(SSuperTableMetaElemMsg);
S
slguan 已提交
1678 1679 1680 1681 1682

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
      SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
H
hjxilinx 已提交
1683
      if (pCond != NULL && pCond->cond != NULL) {
H
hjxilinx 已提交
1684
        condLen = strlen(pCond->cond) + 1;
1685

H
hjxilinx 已提交
1686
        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
S
slguan 已提交
1687 1688 1689 1690 1691
        if (!ret) {
          tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid));
          return 0;
        }
      }
H
hzcheng 已提交
1692 1693
    }

S
slguan 已提交
1694
    pElem->condLen = htonl(condLen);
H
hzcheng 已提交
1695

S
slguan 已提交
1696 1697 1698
    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;
H
hzcheng 已提交
1699

S
slguan 已提交
1700 1701 1702
    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;
H
hzcheng 已提交
1703

S
slguan 已提交
1704
      pElem->tableCond = htonl(offset);
H
hjxilinx 已提交
1705 1706 1707 1708 1709 1710 1711
      
      uint32_t len = 0;
      if (pTagCond->tbnameCond.cond != NULL) {
        len = strlen(pTagCond->tbnameCond.cond);
        memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      }
      
H
hjxilinx 已提交
1712 1713
      pElem->tableCondLen = htonl(len);
      pMsg += len;
S
slguan 已提交
1714 1715
    }

1716
    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
S
slguan 已提交
1717

H
hjxilinx 已提交
1718
    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
S
slguan 已提交
1719 1720 1721 1722 1723
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
H
hjxilinx 已提交
1724 1725
      for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]);
S
slguan 已提交
1726 1727 1728 1729 1730 1731 1732 1733
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
1734 1735
        for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
          SColIndexEx *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
1736 1737
          SColIndexEx *pDestCol = (SColIndexEx *)pMsg;

H
hjxilinx 已提交
1738 1739 1740 1741
          pDestCol->colIdxInBuf = 0;
          pDestCol->colIdx = htons(pCol->colIdx);
          pDestCol->colId = htons(pDestCol->colId);
          pDestCol->flag = htons(pDestCol->flag);
H
hjxilinx 已提交
1742
          strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
1743

H
hjxilinx 已提交
1744
          pMsg += sizeof(SColIndexEx);
S
slguan 已提交
1745 1746
        }
      }
H
hzcheng 已提交
1747
    }
S
slguan 已提交
1748

H
hjxilinx 已提交
1749 1750
    strcpy(pElem->tableId, pTableMetaInfo->name);
    pElem->numOfTags = htons(pTableMetaInfo->numOfTags);
S
slguan 已提交
1751 1752 1753

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
H
hzcheng 已提交
1754 1755 1756 1757
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1758
  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_META;
H
hzcheng 已提交
1759
  assert(msgLen + minMsgSize() <= size);
1760 1761
  
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1762 1763
}

1764
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
H
hzcheng 已提交
1765 1766 1767 1768
  int      size = 0;
  STscObj *pObj = pSql->pTscObj;

  size += tsRpcHeadSize + sizeof(SMgmtHead);
S
slguan 已提交
1769
  size += sizeof(SQqueryList);
H
hzcheng 已提交
1770 1771 1772

  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
S
slguan 已提交
1773
    size += sizeof(SQueryDesc);
H
hzcheng 已提交
1774 1775 1776
    tpSql = tpSql->next;
  }

S
slguan 已提交
1777
  size += sizeof(SStreamList);
H
hzcheng 已提交
1778 1779
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
S
slguan 已提交
1780
    size += sizeof(SStreamDesc);
H
hzcheng 已提交
1781 1782 1783 1784 1785 1786
    pStream = pStream->next;
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1787
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1788 1789 1790 1791 1792 1793 1794 1795 1796
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

1797
  size = tscEstimateHeartBeatMsgLength(pSql);
S
slguan 已提交
1798 1799 1800 1801
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1815
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1816 1817 1818 1819 1820

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

1821 1822
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1823

1824 1825
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
H
hjxilinx 已提交
1826
  pMetaMsg->vgId = htonl(pMetaMsg->vgId);
1827 1828
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
H
hzcheng 已提交
1829

H
hjxilinx 已提交
1830 1831
  if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) {
    tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid);
H
hzcheng 已提交
1832 1833 1834
    return TSDB_CODE_INVALID_VALUE;
  }

1835
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1836

1837 1838
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
H
hzcheng 已提交
1839 1840 1841
    return TSDB_CODE_INVALID_VALUE;
  }

1842 1843
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
H
hzcheng 已提交
1844 1845 1846 1847
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
1848
    pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode);
H
hzcheng 已提交
1849 1850
  }

1851
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1852

1853
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1854 1855 1856 1857 1858 1859
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
    pSchema++;
  }

S
slguan 已提交
1860 1861 1862
//  rsp += numOfTotalCols * sizeof(SSchema);
//
//  int32_t  tagLen = 0;
1863
//  SSchema *pTagsSchema = tscGetTableTagSchema(pMetaMsg);
S
slguan 已提交
1864
//
1865 1866
//  if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
//    for (int32_t i = 0; i < pMetaMsg->numOfTags; ++i) {
S
slguan 已提交
1867 1868 1869 1870 1871
//      tagLen += pTagsSchema[i].bytes;
//    }
//  }
//
//  rsp += tagLen;
1872
//  int32_t size = (int32_t)(rsp - (char *)pMetaMsg);
H
hzcheng 已提交
1873

1874 1875
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
H
hzcheng 已提交
1876 1877

  // todo add one more function: taosAddDataIfNotExists();
1878
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1879
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1880

H
hjxilinx 已提交
1881 1882 1883
  pTableMetaInfo->pTableMeta =
      (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer);
  
1884
  // todo handle out of memory case
1885
  if (pTableMetaInfo->pTableMeta == NULL) {
H
hjxilinx 已提交
1886
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
1887
  }
H
hzcheng 已提交
1888

1889
  free(pTableMeta);
H
hjxilinx 已提交
1890
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1891 1892
}

S
slguan 已提交
1893 1894
/**
 *  multi meter meta rsp pkg format:
1895
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  uint8_t  ieType;
  int32_t  totalNum;
  int32_t  i;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

1915
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1916
  totalNum = htonl(pInfo->numOfTables);
1917
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1918 1919

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1920
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1921
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1922 1923 1924

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1925
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1926 1927
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1928 1929
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
S
slguan 已提交
1930 1931 1932 1933 1934
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

H
hjxilinx 已提交
1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
    //  }
S
slguan 已提交
1995
  }
H
hjxilinx 已提交
1996
  
S
slguan 已提交
1997 1998 1999 2000 2001 2002
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
2003
int tscProcessMetricMetaRsp(SSqlObj *pSql) {
S
slguan 已提交
2004
  SSuperTableMeta *pMeta;
H
hzcheng 已提交
2005
  uint8_t      ieType;
S
slguan 已提交
2006 2007 2008 2009
  void **      metricMetaList = NULL;
  int32_t *    sizes = NULL;

  char *rsp = pSql->res.pRsp;
H
hzcheng 已提交
2010 2011 2012 2013 2014 2015 2016 2017 2018

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;

S
slguan 已提交
2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034
  int32_t num = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  metricMetaList = calloc(1, POINTER_BYTES * num);
  sizes = calloc(1, sizeof(int32_t) * num);

  // return with error code
  if (metricMetaList == NULL || sizes == NULL) {
    tfree(metricMetaList);
    tfree(sizes);
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    return pSql->res.code;
  }

  for (int32_t k = 0; k < num; ++k) {
S
slguan 已提交
2035
    pMeta = (SSuperTableMeta *)rsp;
S
slguan 已提交
2036 2037

    size_t size = (size_t)pSql->res.rspLen - 1;
S
slguan 已提交
2038
    rsp = rsp + sizeof(SSuperTableMeta);
S
slguan 已提交
2039

S
slguan 已提交
2040
    pMeta->numOfTables = htonl(pMeta->numOfTables);
S
slguan 已提交
2041 2042 2043
    pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
    pMeta->tagLen = htons(pMeta->tagLen);

2044
    size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableIdInfo *);
H
hzcheng 已提交
2045

2046 2047
    char *pBuf = calloc(1, size);
    if (pBuf == NULL) {
S
slguan 已提交
2048 2049 2050
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2051

S
slguan 已提交
2052
    SSuperTableMeta *pNewMetricMeta = (SSuperTableMeta *)pBuf;
S
slguan 已提交
2053
    metricMetaList[k] = pNewMetricMeta;
H
hzcheng 已提交
2054

S
slguan 已提交
2055
    pNewMetricMeta->numOfTables = pMeta->numOfTables;
S
slguan 已提交
2056 2057
    pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
    pNewMetricMeta->tagLen = pMeta->tagLen;
H
hzcheng 已提交
2058

S
slguan 已提交
2059
    pBuf = pBuf + sizeof(SSuperTableMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
H
hzcheng 已提交
2060

S
slguan 已提交
2061 2062
    for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
      SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
2063
      memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
H
hzcheng 已提交
2064

2065 2066
      pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta;  // offset value
      SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
H
hzcheng 已提交
2067

S
slguan 已提交
2068
      tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);
H
hzcheng 已提交
2069

2070
      pBuf += sizeof(SVnodeSidList) + sizeof(STableIdInfo *) * pSidLists->numOfSids;
S
slguan 已提交
2071
      rsp += sizeof(SVnodeSidList);
H
hzcheng 已提交
2072

2073
      size_t elemSize = sizeof(STableIdInfo) + pNewMetricMeta->tagLen;
S
slguan 已提交
2074
      for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
2075 2076
        pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
        memcpy(pBuf, rsp, elemSize);
2077

2078 2079
        ((STableIdInfo *)pBuf)->uid = htobe64(((STableIdInfo *)pBuf)->uid);
        ((STableIdInfo *)pBuf)->sid = htonl(((STableIdInfo *)pBuf)->sid);
2080

2081 2082
        rsp += elemSize;
        pBuf += elemSize;
S
slguan 已提交
2083
      }
H
hzcheng 已提交
2084
    }
S
slguan 已提交
2085

2086
    sizes[k] = pBuf - (char *)pNewMetricMeta;
H
hzcheng 已提交
2087 2088
  }

2089
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
2090 2091 2092
  for (int32_t i = 0; i < num; ++i) {
    char name[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
2093
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2094
    tscGetMetricMetaCacheKey(pQueryInfo, name, pTableMetaInfo->pTableMeta->uid);
H
hzcheng 已提交
2095

S
slguan 已提交
2096 2097 2098
#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", name, i);
#endif
H
hzcheng 已提交
2099

S
slguan 已提交
2100
    // release the used metricmeta
H
hjxilinx 已提交
2101
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
H
hzcheng 已提交
2102

H
hjxilinx 已提交
2103
    pTableMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
S
slguan 已提交
2104 2105 2106 2107
                                                                      sizes[i], tsMetricMetaKeepTimer);
    tfree(metricMetaList[i]);

    // failed to put into cache
H
hjxilinx 已提交
2108
    if (pTableMetaInfo->pMetricMeta == NULL) {
S
slguan 已提交
2109 2110 2111
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2112 2113
  }

S
slguan 已提交
2114 2115 2116 2117 2118 2119 2120 2121 2122 2123
_error_clean:
  // free allocated resource
  for (int32_t i = 0; i < num; ++i) {
    tfree(metricMetaList[i]);
  }

  free(sizes);
  free(metricMetaList);

  return pSql->res.code;
H
hzcheng 已提交
2124 2125 2126 2127 2128 2129
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
2130
  STableMetaMsg * pMetaMsg;
2131
  SCMShowRsp *pShow;
S
slguan 已提交
2132
  SSchema *    pSchema;
H
hzcheng 已提交
2133 2134
  char         key[20];

2135 2136 2137
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
2138
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2139

H
hjxilinx 已提交
2140
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2141

2142
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
2143
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
2144 2145
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
2146
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
2147
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
2148

H
hjxilinx 已提交
2149
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
2150

H
hjxilinx 已提交
2151
  pSchema = pMetaMsg->schema;
H
hjxilinx 已提交
2152 2153
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
2154 2155 2156 2157
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

2158
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
2159 2160
  strcpy(key + 1, "showlist");

H
hjxilinx 已提交
2161
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
H
hjxilinx 已提交
2162 2163 2164
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
hjxilinx 已提交
2165
  pTableMetaInfo->pTableMeta =
H
hjxilinx 已提交
2166 2167
      (STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsMeterMetaKeepTimer);
  
2168
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
H
hjxilinx 已提交
2169
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2170

H
hjxilinx 已提交
2171
  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMetaMsg->numOfColumns);
S
slguan 已提交
2172 2173
  SColumnIndex index = {0};

H
hjxilinx 已提交
2174
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
S
slguan 已提交
2175
    index.columnIndex = i;
2176
    tscColumnBaseInfoInsert(pQueryInfo, &index);
H
hjxilinx 已提交
2177
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pTableSchema[i]);
H
hjxilinx 已提交
2178 2179
    
    pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
H
hjxilinx 已提交
2180
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes);
H
hzcheng 已提交
2181 2182
  }

2183
  tscFieldInfoCalOffset(pQueryInfo);
H
hjxilinx 已提交
2184 2185
  
  tfree(pTableMeta);
H
hzcheng 已提交
2186 2187 2188 2189
  return 0;
}

int tscProcessConnectRsp(SSqlObj *pSql) {
S
slguan 已提交
2190
  char temp[TSDB_TABLE_ID_LEN * 2];
H
hzcheng 已提交
2191 2192 2193
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

2194
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
H
hzcheng 已提交
2195
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response
2196 2197
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

H
hjxilinx 已提交
2198 2199 2200
  assert(len <= tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  
S
slguan 已提交
2201
//  SIpList *    pIpList;
2202
//  char *rsp = pRes->pRsp + sizeof(SCMConnectRsp);
S
slguan 已提交
2203 2204
//  pIpList = (SIpList *)rsp;
//  tscSetMgmtIpList(pIpList);
H
hzcheng 已提交
2205

S
slguan 已提交
2206
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2207 2208 2209 2210 2211 2212 2213 2214
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2215
  STscObj *       pObj = pSql->pTscObj;
2216
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2217

H
hjxilinx 已提交
2218
  strcpy(pObj->db, pTableMetaInfo->name);
H
hzcheng 已提交
2219 2220 2221 2222
  return 0;
}

int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
2223
  taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2224 2225 2226 2227
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2228
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2229

H
hjxilinx 已提交
2230 2231
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) {
H
hzcheng 已提交
2232 2233 2234 2235 2236 2237 2238 2239
    /* not in cache, abort */
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2240 2241
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2242
   */
H
hjxilinx 已提交
2243 2244
  tscTrace("%p force release metermeta after drop table:%s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2245

H
hjxilinx 已提交
2246 2247 2248
  if (pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
2249 2250 2251 2252 2253 2254
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2255
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2256

H
hjxilinx 已提交
2257 2258
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2259 2260 2261
    return 0;
  }

H
hjxilinx 已提交
2262 2263
  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2264

H
hjxilinx 已提交
2265
  if (pTableMetaInfo->pTableMeta) {
H
hjxilinx 已提交
2266
    bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
H
hzcheng 已提交
2267

H
hjxilinx 已提交
2268 2269
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
2270

2271
    if (isSuperTable) {  // if it is a super table, reset whole query cache
H
hjxilinx 已提交
2272
      tscTrace("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
2273
      taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2288
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2289 2290 2291
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2292
  pRes->data = NULL;
S
slguan 已提交
2293
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2294 2295 2296 2297
  return 0;
}

int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
S
slguan 已提交
2298 2299 2300
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2301
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2302 2303 2304

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2305 2306 2307
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2308
  
2309 2310
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);
2311

weixin_48148422's avatar
weixin_48148422 已提交
2312
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2313 2314 2315 2316 2317
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
    
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2318 2319
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2320
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2321
    p += sizeof(int32_t);
S
slguan 已提交
2322
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2323 2324 2325 2326
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2327
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2328
    }
2329 2330
  }

H
hzcheng 已提交
2331
  pRes->row = 0;
S
slguan 已提交
2332
  tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
H
hzcheng 已提交
2333 2334 2335 2336 2337

  return 0;
}

int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
2338 2339
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;
2340
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2341

S
slguan 已提交
2342
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2343 2344 2345

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->data = pRetrieve->data;
2346

2347
  tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2348 2349 2350 2351
  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2352
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2353

H
hjxilinx 已提交
2354
static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2355 2356 2357 2358 2359
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
2360

H
hzcheng 已提交
2361 2362 2363
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2364

2365
  tscAddSubqueryInfo(&pNew->cmd);
2366 2367 2368 2369

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

H
hjxilinx 已提交
2370
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
S
slguan 已提交
2371 2372 2373
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get meter meta", pSql);
    free(pNew);
2374

S
slguan 已提交
2375 2376 2377
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

H
hjxilinx 已提交
2378
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2379
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2380

H
hjxilinx 已提交
2381
  strcpy(pNewMeterMetaInfo->name, pTableMetaInfo->name);
2382
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
S
slguan 已提交
2383
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);
H
hzcheng 已提交
2384

H
hjxilinx 已提交
2385 2386
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2387

H
hjxilinx 已提交
2388 2389 2390
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2391 2392 2393 2394 2395
  }

  return code;
}

H
hjxilinx 已提交
2396
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2397
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2398

H
hjxilinx 已提交
2399 2400 2401
  // If this STableMetaInfo owns a metermeta, release it first
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
2402 2403
  }
  
H
hjxilinx 已提交
2404 2405
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2406
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
2407 2408
    tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns,
             tinfo.numOfTags);
H
hzcheng 已提交
2409 2410 2411 2412 2413 2414

    return TSDB_CODE_SUCCESS;
  }

  /*
   * for async insert operation, release data block buffer before issue new object to get metermeta
H
hjxilinx 已提交
2415
   * because in table meta callback function, the tscParse function will generate the submit data blocks
H
hzcheng 已提交
2416
   */
H
hjxilinx 已提交
2417
  return doGetMeterMetaFromServer(pSql, pTableMetaInfo);
H
hzcheng 已提交
2418 2419
}

H
hjxilinx 已提交
2420
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2421
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2422
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2423 2424 2425 2426
}

/*
 * in handling the renew metermeta problem during insertion,
S
slguan 已提交
2427
 *
H
hzcheng 已提交
2428 2429 2430 2431 2432
 * If the meter is created on demand during insertion, the routine usually waits for a short
 * period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
 * successfully created the corresponding table.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
S
slguan 已提交
2433
  if (pCmd->command == TSDB_SQL_INSERT) {
H
hzcheng 已提交
2434 2435 2436 2437 2438 2439 2440
    taosMsleep(50);  // todo: global config
  }
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
2441
 * @param tableId       meter id
H
hzcheng 已提交
2442 2443
 * @return              status code
 */
S
slguan 已提交
2444
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
2445 2446
  int code = 0;

H
hzcheng 已提交
2447 2448
  // handle metric meta renew process
  SSqlCmd *pCmd = &pSql->cmd;
2449 2450

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2451
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2452 2453

  /*
S
slguan 已提交
2454
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
2455 2456
   * 2. if get metermeta failed, still get the metermeta
   */
H
hjxilinx 已提交
2457 2458
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnMetric(pCmd)) {
    if (pTableMetaInfo->pTableMeta) {
H
hjxilinx 已提交
2459
      tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2460
               pTableMetaInfo->numOfTags, pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2461
    }
2462

2463
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
2464
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2465

H
hjxilinx 已提交
2466
    code = doGetMeterMetaFromServer(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
2467
  } else {
H
hjxilinx 已提交
2468
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2469 2470
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2471 2472 2473 2474 2475
  }

  return code;
}

2476
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
S
slguan 已提交
2477 2478
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2479 2480

  /*
2481
   * the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
H
hzcheng 已提交
2482
   */
2483
  bool    required = false;
2484

2485
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2486
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
2487 2488
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
2489
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2490
    tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid);
S
slguan 已提交
2491

H
hjxilinx 已提交
2492
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
S
slguan 已提交
2493

2494
    SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
S
slguan 已提交
2495
    if (ppMeta == NULL) {
2496
      required = true;
S
slguan 已提交
2497 2498
      break;
    } else {
H
hjxilinx 已提交
2499
      pTableMetaInfo->pMetricMeta = ppMeta;
S
slguan 已提交
2500 2501
    }
  }
H
hzcheng 已提交
2502

2503 2504
  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
H
hzcheng 已提交
2505 2506 2507
    return TSDB_CODE_SUCCESS;
  }

S
slguan 已提交
2508
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2509 2510 2511 2512
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_METRIC;
2513 2514
  
  SQueryInfo *pNewQueryInfo = NULL;
2515 2516 2517
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    return code;
  }
2518
  
2519
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2520
    STableMetaInfo *pMMInfo = tscGetMetaInfo(pQueryInfo, i);
2521

H
hjxilinx 已提交
2522 2523
    STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
S
slguan 已提交
2524 2525 2526 2527 2528 2529
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2530

2531
  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
H
hzcheng 已提交
2532

2533 2534
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
H
hzcheng 已提交
2535

2536 2537
  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;
H
hjxilinx 已提交
2538 2539 2540 2541 2542
  
  STagCond* pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond)
H
hzcheng 已提交
2543

2544 2545 2546 2547
//  if (pSql->fp != NULL && pSql->pStream == NULL) {
//    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
//    tscFreeSubqueryInfo(pCmd);
//  }
H
hzcheng 已提交
2548 2549

  tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
2550 2551 2552 2553 2554
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2555 2556 2557 2558 2559
  }

  return code;
}

2560
void tscInitMsgsFp() {
S
slguan 已提交
2561 2562 2563
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
2564 2565

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2566
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2567

2568 2569
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2570 2571

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
2572
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropAcctMsg;
H
hzcheng 已提交
2573 2574 2575
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2576
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2577 2578 2579
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2580 2581 2582 2583 2584
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2585
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hzcheng 已提交
2586
  tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg;
S
slguan 已提交
2587
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2588 2589 2590 2591

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2592 2593 2594
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2595 2596 2597 2598 2599 2600 2601 2602

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2603
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hzcheng 已提交
2604
  tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
S
slguan 已提交
2605
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2606 2607

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
2608
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
2609
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2610

H
hzcheng 已提交
2611
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
2612 2613 2614 2615 2616
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
2617

H
hzcheng 已提交
2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;

  tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg;
  tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg;
}