tscServer.c 88.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tcache.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
21
#include "tscSubquery.h"
H
hzcheng 已提交
22 23 24 25 26 27 28
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
29
#include "tscLog.h"
H
hzcheng 已提交
30 31 32

#define TSC_MGMT_VNODE 999

S
slguan 已提交
33
// Endpoint set (fqdn/port list plus in-use index) of the management nodes.
// It is copied into pSql->ipList before a management request is sent.
SRpcIpSet  tscMgmtIpSet;
S
slguan 已提交
34 35
// NOTE(review): not referenced in the visible part of this file; presumably the
// endpoint set of the data nodes - confirm against its users.
SRpcIpSet  tscDnodeIpSet;

36 37
// Message builders indexed by SSqlCmd.command; doProcessSql() calls the entry
// of the current command to fill the request payload.
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
38 39 40
// Response handlers indexed by SSqlCmd.command; invoked after a successful
// server response and used directly as the handler of local commands.
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
// Heartbeat timer callback (defined below).
void tscProcessActivityTimer(void *handle, void *tmrId);
// Per-command flag: when non-zero, the SSqlObj is handed to the user callback
// as the result handle; otherwise NULL is passed.
int tscKeepConn[TSDB_SQL_MAX] = {0};
41

42 43 44
// Subscription progress helpers; declared here, definitions are not in the visible part of this file.
// tscGetSubscriptionProgress() supplies the per-table key serialized into query messages.
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
45

S
slguan 已提交
46
/* Returns the RPC head size plus 100 bytes; used as a base term when estimating query message sizes. */
static int32_t minMsgSize() {
  const int32_t extraBytes = 100;
  return tsRpcHeadSize + extraBytes;
}
H
hzcheng 已提交
47

48
/*
 * Copy the endpoints (fqdn and port) of the given vgroup into pSql->ipList
 * and reset the in-use index to the first entry.
 */
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  SRpcIpSet* pTarget = &pSql->ipList;

  pTarget->inUse    = 0;
  pTarget->numOfIps = pVgroupInfo->numOfIps;

  int32_t idx = 0;
  while (idx < pVgroupInfo->numOfIps) {
    pTarget->port[idx] = pVgroupInfo->ipAddr[idx].port;
    strcpy(pTarget->fqdn[idx], pVgroupInfo->ipAddr[idx].fqdn);
    ++idx;
  }
}

S
slguan 已提交
60
/* Log every entry of the global management endpoint set, or an error if the set is empty. */
void tscPrintMgmtIp() {
  if (tscMgmtIpSet.numOfIps <= 0) {
    tscError("invalid mgmt IP list:%d", tscMgmtIpSet.numOfIps);
    return;
  }

  int idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscTrace("mgmt index:%d %s:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

S
slguan 已提交
70
/*
 * Replace the global management endpoint set with the list reported by the cluster.
 *
 * Both the host name and the port of every entry are taken from pIpList. Ports
 * are byte-swapped with htons(), as before. Previously only the ports were
 * copied, which left the new ports paired with whatever host names were
 * already stored in tscMgmtIpSet.
 *
 * pIpList must not alias tscMgmtIpSet (strcpy on overlapping buffers is undefined).
 */
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpSet.numOfIps = pIpList->numOfIps;
  tscMgmtIpSet.inUse = pIpList->inUse;
  for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
    // source and destination entries are fields of the same struct type
    strcpy(tscMgmtIpSet.fqdn[i], pIpList->fqdn[i]);
    tscMgmtIpSet.port[i] = htons(pIpList->port[i]);
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
79 80 81
  if (tscMgmtIpSet.numOfIps != 1) {
    tscMgmtIpSet.numOfIps = 1;
    tscMgmtIpSet.inUse = 0;
H
hjxilinx 已提交
82
    taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]);
S
slguan 已提交
83 84 85 86 87
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
88
/*
 * Overwrite the global management endpoint set with *pIpSet and log the new content.
 * ahandle is not used.
 */
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
  tscMgmtIpSet = *pIpSet;

  tscTrace("mgmt IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscTrace("index:%d fqdn:%s port:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

S
slguan 已提交
96
/*
 * Refresh the global management endpoint set from a server-provided list.
 *
 * The cluster edition returns the current management nodes; the edge edition
 * returns an empty list, in which case the single configured endpoint is used.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  if (pIpList->numOfIps == 0) {
    tscSetMgmtIpListFromEdge();
    return;
  }

  tscSetMgmtIpListFromCluster(pIpList);
}

H
hjxilinx 已提交
108 109 110 111 112 113 114
/*
 * For each management node, try at least twice in case of a poor network connection.
 * If the client first connects to a non-management node, the first attempt may fail because of
 * poor network quality, and the second attempt may then receive a response carrying a redirection command.
 * That redirected request would not be retried, since only *two* attempts are allowed when the cluster
 * has a single management node.
 * Therefore, the number of retries is multiplied by a factor of 2 to fix this problem.
 */
115
UNUSED_FUNC
H
hjxilinx 已提交
116 117
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
S
slguan 已提交
118
  return tscMgmtIpSet.numOfIps * factor;
H
hjxilinx 已提交
119 120
}

H
hzcheng 已提交
121 122 123 124 125 126 127 128 129 130 131 132
/*
 * Callback of the heartbeat request.
 *
 * param: the STscObj that owns the heartbeat (ignored when NULL or when its signature is invalid)
 * tres:  not used
 * code:  0 on success; any other value is only logged
 *
 * On success the management endpoint set is refreshed from the response and the
 * kill requests carried by the response (connection, query, stream) are applied.
 * The activity timer is re-armed in every case that passes the signature check.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj->signature != pObj) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlRes *pRes = &pObj->pHb->res;

  if (code != 0) {
    tscTrace("heart beat failed, code:%s", tstrerror(code));
  } else {
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
    tscSetMgmtIpList(&pRsp->ipList);

    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId) {
        tscKillQuery(pObj, htonl(pRsp->queryId));
      }

      if (pRsp->streamId) {
        tscKillStream(pObj, htonl(pRsp->streamId));
      }
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
158 159 160
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
161
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
162
    
163 164 165 166
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
167
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
168 169 170 171 172
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

173
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
174 175 176 177
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
178
    tscAddSubqueryInfo(&pObj->pHb->cmd);
179

S
slguan 已提交
180
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
181 182 183
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
184
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
185 186 187 188 189 190 191 192 193
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the request payload of pSql into an RPC buffer and send it.
 *
 * Commands below TSDB_SQL_MGMT are sent through pVnodeConn to pSql->ipList; their
 * payload is read starting tsRpcHeadSize bytes into the buffer. All other commands
 * are sent through the management connection of the owning STscObj, after
 * pSql->ipList has been overwritten with the global management endpoint set.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the RPC buffer cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  STscObj* pObj = pSql->pTscObj;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  // the request descriptor is the same for both destinations
  SRpcMsg rpcMsg = {
    .msgType = pCmd->msgType,
    .pCont   = pCont,
    .contLen = pCmd->payloadLen,
    .handle  = pSql,
    .code    = 0
  };

  if (pCmd->command >= TSDB_SQL_MGMT) {
    pSql->ipList = tscMgmtIpSet;
    memcpy(pCont, pCmd->payload, pCmd->payloadLen);

    tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pCmd->msgType]);
    rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg);
  } else {
    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pCmd->msgType], pSql->ipList.port[0]);
    memcpy(pCont, pCmd->payload + tsRpcHeadSize, pCmd->payloadLen);

    rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
  }

  return TSDB_CODE_SUCCESS;
}

232 233
/*
 * Handle a response (or a failure notification) delivered by the RPC layer.
 *
 * rpcMsg->handle is the SSqlObj that issued the request. The function
 *   1. drops the message if the SSqlObj or its connection is no longer valid,
 *   2. for a set of "stale meta" error codes, renews the table meta and resends
 *      the request (up to pSql->maxRetry times),
 *   3. otherwise copies the response into pSql->res, runs the per-command
 *      response handler and finally invokes the user callback pSql->fp.
 *
 * rpcMsg->pCont is released with rpcFreeCont() on every path after the
 * signature checks.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
  if (pSql == NULL) {
    // pSql is NULL here, so its signature must not be read
    tscError("%p sql is already released", pSql);
    return;
  }
  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);

  if (pSql->freed || pObj->signature != pObj) {
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else {
        tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

        pSql->res.code = rpcMsg->code;  // keep the previous error code
        if (pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
        } else {
          rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
          if (pTableMetaInfo->pTableMeta) {
            tscSendMsgToServer(pSql);
          }

          rpcFreeCont(rpcMsg->pCont);
          return;
        }
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    // NOTE(review): a successful rpcMsg->code is mapped to TSDB_CODE_NETWORK_UNAVAIL here, so the
    // retry-counter reset below cannot trigger; pRes->code is overwritten with rpcMsg->code further
    // down. Kept as in the original - confirm the intent.
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    // tstrerror() yields a string, so it is printed with %s
    tscTrace("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscTrace("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      // keep the old buffer if realloc fails
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      pRes->pRsp = NULL;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscTrace("%p cmd:%d code:%s, inserted rows:%d, rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code),
          pMsg->affectedRows, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);

  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
    void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
    rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;

    tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);

    /*
     * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
     * may be freed in UDF, and reused by other threads before tscShouldBeFreed called, in which case
     * tscShouldBeFreed checks an object which is actually allocated by other threads.
     *
     * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
     * the tscShouldBeFreed will success and tscFreeSqlObj free it immediately.
     */
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, taosres, rpcMsg->code);

    if (shouldFree) {
      tscTrace("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
381 382 383
/*
 * Build the request message for pSql (for the commands that have a builder
 * invoked here) and send it to the server.
 *
 * If pSql->res.code is not TSDB_CODE_SUCCESS after the build step, the result
 * is queued through tscQueueAsyncRes() and that code is returned. A send
 * failure is stored in pSql->res.code and queued the same way, but the
 * function still returns TSDB_CODE_SUCCESS in that case.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);
  if (sendCode != TSDB_CODE_SUCCESS) {
    pRes->code = sendCode;
    tscQueueAsyncRes(pSql);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Entry point for executing the command stored in pSql->cmd.
 *
 *  - commands below TSDB_SQL_MGMT need table meta info; without it
 *    TSDB_CODE_OTHERS is stored in pSql->res.code and returned,
 *  - commands below TSDB_SQL_LOCAL are directed to the management endpoint set,
 *  - all remaining commands are handled locally by their response handler.
 *
 * Non-local commands are passed on to doProcessSql().
 */
int tscProcessSql(SSqlObj *pSql) {
  char *   name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint16_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
    }

    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no table meta info; never hand a NULL pointer to %s
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, (name != NULL) ? name : "(null)", type);
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    pSql->ipList = tscMgmtIpSet; //?
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

//  if (QUERY_IS_JOIN_QUERY(type)) {
//    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
//      return tscHandleMasterJoinQuery(pSql);
//    } else {
//      // for first stage sub query, iterate all vnodes to get all timestamp
//      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
//        return doProcessSql(pSql);
//      }
//    }
//  }
//
//  if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
//    tscHandleMasterSTableQuery(pSql);
//    return pRes->code;
//  } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
//    tscHandleMultivnodeInsert(pSql);
//    return pRes->code;
//  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
464

H
hjxilinx 已提交
465
/*
 * Cancel a two-stage super table query.
 *
 * Every existing sub-query is marked with TSDB_CODE_QUERY_CANCELLED. The
 * function then polls the master command every 100 ms until it reaches
 * TSDB_SQL_RETRIEVE_LOCALMERGE or TSDB_SQL_RETRIEVE_EMPTY_RESULT, giving up
 * after roughly 10 seconds. Queries that are not two-stage super table
 * queries are left untouched.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int idx = 0; idx < pSql->numOfSubs; ++idx) {
    SSqlObj *pSub = pSql->pSubs[idx];

    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    if (pSub != NULL) {
      pSub->res.code = TSDB_CODE_QUERY_CANCELLED;
      //taosStopRpcConn(pSql->pSubs[i]->thandle);
    }
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  for (;;) {
    if (pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p super table query cancelled", pSql);
}

507
/*
 * Fill the retrieve request for pSql into its payload buffer, right after the
 * space reserved for the RPC head.
 *
 * The query handle, the query type (stored in the 'free' field) and the target
 * vgroup id are written in network byte order. pInfo is not used.
 * Always returns TSDB_CODE_SUCCESS.
 *
 * The write cursor (pStart/pMsg) of the previous version was advanced but never
 * read, so it has been dropped.
 */
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)(pSql->cmd.payload + tsRpcHeadSize);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) {
    SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList;
    assert(pVgroupInfo->numOfVgroups == 1); // todo fix me

    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[0].vgId);
  } else {
    STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
  }

  // NOTE(review): payloadLen is not set in this function; it is used as found in pSql->cmd - confirm
  pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);

  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
  return TSDB_CODE_SUCCESS;
}

540
/*
 * Complete the headers of a submit (insert) request whose data has already
 * been copied into the payload buffer.
 *
 * The payload holds an SMsgDesc immediately followed by the SSubmitMsg, both
 * placed after the space reserved for the RPC head. The destination endpoints
 * are taken from the vgroup of the target table. pInfo is not used.
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *pCursor = pCmd->payload + tsRpcHeadSize;

  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pCmd->payloadLen - sizeof(SMsgDesc);

  SMsgDesc *pMsgDesc = (SMsgDesc *)pCursor;
  pMsgDesc->numOfVnodes = htonl(1);       //todo set the right number of vnodes
  pCursor += sizeof(SMsgDesc);

  SSubmitMsg *pShellMsg = (SSubmitMsg *)pCursor;
  int32_t     vgId = pTableMeta->vgroupInfo.vgId;

  pShellMsg->header.vgId = htonl(vgId);
  pShellMsg->header.contLen = htonl(size);
  pShellMsg->length = pShellMsg->header.contLen;
  pShellMsg->numOfBlocks = htonl(pCmd->numOfTablesInSubmit);  // number of meters to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);

  tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps);
  return TSDB_CODE_SUCCESS;
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
575
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
576
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
577
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
578

579
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
580 581 582 583
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
H
hjxilinx 已提交
584
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
585 586

  // meter query without tags values
H
hjxilinx 已提交
587
  if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
S
slguan 已提交
588
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
589
  }
H
hjxilinx 已提交
590 591
  
  int32_t size = 4096;
S
slguan 已提交
592
  return size;
H
hzcheng 已提交
593 594
}

595
/*
 * Serialize the table id list of a query into the message buffer and select
 * the target vgroup.
 *
 * pQueryMsg: query message head; its vgId and numOfTables fields are filled in
 * pSql:      the sql object; its ipList is set to the endpoints of the chosen vgroup
 * pMsg:      write position for the STableIdInfo entries
 *
 * When the table is a super table with a per-vgroup table list (pVgroupTables),
 * one entry is written for every table of the vgroup selected by vgroupIndex.
 * Otherwise a single entry describing pTableMeta is written.
 *
 * Returns the position just past the last entry written.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  if (!(UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) && pTableMetaInfo->pVgroupTables != NULL) {
    int32_t index = pTableMetaInfo->vgroupIndex;
    int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(index >= 0 && index < numOfVgroups);

    tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);

    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);

    // set the vgroup info
    tscSetDnodeIpList(pSql, &pTableIdList->vgInfo);
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);

    int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables

    // serialize each table id info
    int32_t i = 0;
    while (i < numOfTables) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);

      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
//      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
      pMsg += sizeof(STableIdInfo);
      ++i;
    }
  } else {
    SCMVgroupInfo* pVgroupInfo = NULL;

    if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
      pVgroupInfo = &pTableMeta->vgroupInfo;
    } else {
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);

      pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
    }

    tscSetDnodeIpList(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);

    // a single entry that describes the table itself
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
    pTableIdInfo->tid = htonl(pTableMeta->sid);
    pTableIdInfo->uid = htobe64(pTableMeta->uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables

    pMsg += sizeof(STableIdInfo);
  }

  tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->uid);

  return pMsg;
}

657
/*
 * Build the query request (TSDB_MSG_TYPE_QUERY) for the current clause of pSql.
 *
 * The message is written into pCmd->payload, starting tsRpcHeadSize bytes in, and is laid out as:
 *   1. SQueryTableMsg header, including the colList[] array (one SColumnInfo per queried column)
 *   2. the column filters of every queried column (SColumnFilterInfo, plus the filter string if any)
 *   3. one SSqlFuncMsg per output expression, each followed by its binary parameters
 *   4. the table id list written by doSerializeTableInfo()
 *   5. the group-by columns (colId, colIndex, flag, name), if any
 *   6. the interpolation default values, if interpolation is requested
 *   7. one SColumnInfo per tag column in pTableMetaInfo->tagColList
 *   8. the compressed timestamp block read from pQueryInfo->tsBuf, if present
 *   9. the tag query condition, if any
 *  10. the NUL-terminated table name condition (a single 0 byte when absent)
 *
 * pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_SQL when a column/tag index or type is out of
 * range, and -1 on any other failure (allocation, illegal query parameters, stale table meta, I/O error).
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;

  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // intervalTime is a 64-bit value (it is serialized with htobe64 below), so print it with PRId64
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  // validate the number of output expressions before it is written into the message; a signed type is
  // used so that the "< 0" test is meaningful and matches the %d conversion in the log
  int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
  if (numOfOutput < 0) {
    tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;

  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending scan the start/end keys of the time window are swapped
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->interpoType    = htons(pQueryInfo->interpoType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htons(pQueryInfo->type);
  pQueryMsg->numOfOutput    = htons(numOfOutput);

  // set column list ids; the variable-length part of the message starts right after colList[]
  int32_t  numOfCols = (int32_t)taosArrayGetSize(pQueryInfo->colList);
  char *   pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      // log the scalar column index: pCol->colIndex itself is a struct and must not be passed for %d
      tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
               pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta),
               pCol->colIndex.columnIndex, pColSchema->name);

      return TSDB_CODE_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // serialize the output expressions; binary parameters are appended right after each SSqlFuncMsg
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      // plain assignment: the previous "+=" only produced the right value if the buffer happened to be zero
      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        // as above, log the scalar index rather than the SColumnIndex struct
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pStart));
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // do not send a message whose ts block could not be loaded from the ts buffer file
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0 ||
        (pBlockInfo->compLen > 0 && fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f) != 1)) {
      tscError("%p failed to read the compressed ts block from the ts buffer file", pSql);
      return -1;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // the table name condition is always present as a C string; an empty string means "no condition"
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  int32_t msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

925 926
/*
 * Finish the create-database request stored in pCmd->payload: set the message type and length, and
 * copy the name held by the first table meta info of the current clause into the db field.
 *
 * NOTE(review): unlike most sibling builders this one never calls tscAllocPayload(); presumably the
 * payload already holds the request at this point -- confirm before adding an allocation here.
 *
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);

  assert(pCmd->numOfClause == 1);

  SCMCreateDbMsg *pReq = (SCMCreateDbMsg *)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // copy at most the capacity of the db field; strncpy does not terminate the string on truncation
  strncpy(pReq->db, pMetaInfo->name, tListLen(pReq->db));

  return TSDB_CODE_SUCCESS;
}

939 940
/*
 * Build the create-dnode request (TSDB_MSG_TYPE_CM_CREATE_DNODE): allocate the payload and copy the
 * first DCL token (pInfo->pDCLInfo->a[0]) into the ep field.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);

  int32_t code = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pReq = (SCMCreateDnodeMsg *)pCmd->payload;

  // NOTE(review): the copy length is the token length, not the capacity of pReq->ep -- presumably the
  // token length has been validated earlier; confirm
  strncpy(pReq->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
  return TSDB_CODE_SUCCESS;
}

955 956
/*
 * Build the create-account request (TSDB_MSG_TYPE_CM_CREATE_ACCT) from the parsed DCL info:
 * user name, password, the account limits (converted to network byte order) and the access state.
 *
 * The access state is derived from the "stat" token:
 *   empty -> -1, "r" -> read access, "w" -> write access, "all" -> all access, "no" -> 0.
 * Any other token leaves cfg.accessState untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pAcctMsg = (SCMCreateAcctMsg *)pCmd->payload;

  // credentials
  SSQLToken *pUserToken = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPassToken = &pInfo->pDCLInfo->user.passwd;
  strncpy(pAcctMsg->user, pUserToken->z, pUserToken->n);
  strncpy(pAcctMsg->pass, pPassToken->z, pPassToken->n);

  // account limits
  SCreateAcctSQL *pOpt = &pInfo->pDCLInfo->acctOpt;

  pAcctMsg->cfg.maxUsers           = htonl(pOpt->maxUsers);
  pAcctMsg->cfg.maxDbs             = htonl(pOpt->maxDbs);
  pAcctMsg->cfg.maxTimeSeries      = htonl(pOpt->maxTimeSeries);
  pAcctMsg->cfg.maxStreams         = htonl(pOpt->maxStreams);
  pAcctMsg->cfg.maxPointsPerSecond = htonl(pOpt->maxPointsPerSecond);
  pAcctMsg->cfg.maxStorage         = htobe64(pOpt->maxStorage);
  pAcctMsg->cfg.maxQueryTime       = htobe64(pOpt->maxQueryTime);
  pAcctMsg->cfg.maxConnections     = htonl(pOpt->maxConnections);

  // access state: every recognised keyword has a distinct length, so dispatch on the token length
  switch (pOpt->stat.n) {
    case 0:
      pAcctMsg->cfg.accessState = -1;
      break;
    case 1:
      if (pOpt->stat.z[0] == 'r') {
        pAcctMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
      } else if (pOpt->stat.z[0] == 'w') {
        pAcctMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
      }
      break;
    case 2:
      if (strncmp(pOpt->stat.z, "no", 2) == 0) {
        pAcctMsg->cfg.accessState = 0;
      }
      break;
    case 3:
      if (strncmp(pOpt->stat.z, "all", 3) == 0) {
        pAcctMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
      }
      break;
    default:
      break;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

1000 1001
/*
 * Build a create-user or alter-user request from the parsed user info.
 *
 * The user name and pUser->type (stored in the flag field) are always written. For
 * TSDB_ALTER_USER_PRIVILEGES the privilege is taken from pCmd->count; for every other type the
 * password token is copied. The message type is TSDB_MSG_TYPE_CM_ALTER_USER for the two alter types
 * and TSDB_MSG_TYPE_CM_CREATE_USER otherwise.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pUserMsg = (SCMCreateUserMsg *)pCmd->payload;
  SUserInfo *       pUser = &pInfo->pDCLInfo->user;

  strncpy(pUserMsg->user, pUser->user.z, pUser->user.n);
  pUserMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pUserMsg->privilege = (char)pCmd->count;
  } else {
    // both "alter password" and "create user" carry the password token
    strncpy(pUserMsg->pass, pUser->passwd.z, pUser->passwd.n);
  }

  int isAlter = (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES);
  pCmd->msgType = isAlter ? TSDB_MSG_TYPE_CM_ALTER_USER : TSDB_MSG_TYPE_CM_CREATE_USER;

  return TSDB_CODE_SUCCESS;
}

1032 1033
/*
 * Prepare the configure-dnode request: size and allocate the payload for one SCMCfgDnodeMsg and set
 * the message type to TSDB_MSG_TYPE_MD_CONFIG_DNODE. No field of the message is written here.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);

  int32_t code = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1044

1045 1046
/*
 * Build the drop-database request (TSDB_MSG_TYPE_CM_DROP_DB): the name comes from the first table
 * meta info of the current clause, ignoreNotExists from pInfo->pDCLInfo->existsCheck.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropDbMsg *  pReq = (SCMDropDbMsg *)pCmd->payload;

  // bounded by the capacity of the db field
  strncpy(pReq->db, pMetaInfo->name, tListLen(pReq->db));

  if (pInfo->pDCLInfo->existsCheck) {
    pReq->ignoreNotExists = 1;
  } else {
    pReq->ignoreNotExists = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1064 1065
/*
 * Build the drop-table request (TSDB_MSG_TYPE_CM_DROP_TABLE): tableId is the name of the first table
 * meta info of the current clause, igNotExists mirrors pInfo->pDCLInfo->existsCheck.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  int32_t code = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo * pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropTableMsg *pReq = (SCMDropTableMsg *)pCmd->payload;

  strcpy(pReq->tableId, pMetaInfo->name);

  if (pInfo->pDCLInfo->existsCheck) {
    pReq->igNotExists = 1;
  } else {
    pReq->igNotExists = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1082
/*
 * Build the drop-dnode request (TSDB_MSG_TYPE_CM_DROP_DNODE); the ep field is filled from the name
 * of the first table meta info of the current clause.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);

  int32_t code = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo * pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropDnodeMsg *pReq = (SCMDropDnodeMsg *)pCmd->payload;

  strcpy(pReq->ep, pMetaInfo->name);

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1098
/*
 * Build the drop-user request (TSDB_MSG_TYPE_CM_DROP_USER); the user field is filled from the name
 * of the first table meta info of the current clause.
 *
 * The message type and length are set before the payload is allocated, so they are updated even when
 * the allocation fails.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);

  int32_t code = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pReq = (SCMDropUserMsg *)pCmd->payload;

  strcpy(pReq->user, pMetaInfo->name);
  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131
/*
 * Build the drop-account request (TSDB_MSG_TYPE_CM_DROP_ACCT). The request reuses the SCMDropUserMsg
 * layout: its user field is filled from the name of the first table meta info of the current clause.
 *
 * The message type and length are set before the payload is allocated, so they are updated even when
 * the allocation fails.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);

  int32_t code = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pReq = (SCMDropUserMsg *)pCmd->payload;

  strcpy(pReq->user, pMetaInfo->name);
  return TSDB_CODE_SUCCESS;
}

1132 1133
/*
 * Build the use-database request (TSDB_MSG_TYPE_CM_USE_DB); the db field is filled from the name of
 * the first table meta info of the current clause.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  int32_t code = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMUseDbMsg *   pReq = (SCMUseDbMsg *)pCmd->payload;

  strcpy(pReq->db, pMetaInfo->name);

  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
  return TSDB_CODE_SUCCESS;
}

1149
/*
 * Build the show request (TSDB_MSG_TYPE_CM_SHOW).
 *
 * The db field is the name of the first table meta info of the current clause, or the connection's
 * current db (pObj->db) when that name is empty. The optional trailing payload is:
 *   - for every show type except TSDB_MGMT_TABLE_VNODES: the pattern token, when one was given
 *   - for TSDB_MGMT_TABLE_VNODES: the prefix token (must be present)
 *
 * pShowMsg->payloadLen carries the payload length in network byte order, while pCmd->payloadLen is
 * the total message length in host byte order; the two are therefore computed from a host-order
 * local instead of reading the already converted message field back.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
  } else {
    strcpy(pShowMsg->db, pObj->db);
  }

  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  // length of the trailing payload in host byte order; stays 0 when no pattern is supplied
  uint16_t extraLen = 0;

  // NOTE(review): only 100 extra bytes are requested above while the copies below use the token
  // length -- presumably token lengths are validated earlier; confirm
  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      extraLen = (uint16_t)pPattern->n;
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);

    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    extraLen = (uint16_t)pIpAddr->n;
  }

  pShowMsg->payloadLen = htons(extraLen);

  // use the host-order length: adding the htons()-converted field would inflate the size on
  // little-endian hosts
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1191
/*
 * Build a kill request. The target id is copied from the pInfo->pDCLInfo->ip token into queryId, and
 * the message type follows pCmd->command:
 *   TSDB_SQL_KILL_QUERY      -> TSDB_MSG_TYPE_CM_KILL_QUERY
 *   TSDB_SQL_KILL_CONNECTION -> TSDB_MSG_TYPE_CM_KILL_CONN
 *   TSDB_SQL_KILL_STREAM     -> TSDB_MSG_TYPE_CM_KILL_STREAM
 * For any other command the message type is left unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMKillQueryMsg *pReq = (SCMKillQueryMsg *)pCmd->payload;
  strncpy(pReq->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);

  if (pCmd->command == TSDB_SQL_KILL_QUERY) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (pCmd->command == TSDB_SQL_KILL_CONNECTION) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (pCmd->command == TSDB_SQL_KILL_STREAM) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1216
/*
 * Estimate the buffer size needed for a create-table request.
 *
 * The estimate is the sum of: minMsgSize(), an SMgmtHead, an SCMCreateTableMsg, then either one
 * STagData (table created from a super table) or one SSchema per column and tag, plus the select
 * statement text and its terminator when a select clause is attached, plus TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *        pCmd = &(pSql->cmd);
  SCreateTableSQL *pCreate = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);

  if (pCreate->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    // explicit schema: one entry per column and per tag
    total += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
  } else {
    // only the tag values are sent
    total += sizeof(STagData);
  }

  if (pCreate->pSelect != NULL) {
    total += (pCreate->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1235
/*
 * Build the create-table request (TSDB_MSG_TYPE_CM_CREATE_TABLE).
 *
 * After the fixed SCMCreateTableMsg fields the variable part (starting at ->schema) holds either
 *   - the STagData of the "using" clause, when the table is created from a super table, or
 *   - one SSchema per column and tag, followed by the select statement text for a stream.
 *
 * The field info of the query is cleared once the schema has been serialized.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  int size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  if (pCreateTable->existCheck) {
    pCreateTableMsg->igExists = 1;
  } else {
    pCreateTableMsg->igExists = 0;
  }

  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);
  pCreateTableMsg->sqlLen = 0;

  // write cursor into the variable part of the message
  char *pCursor = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pCursor, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
    pCursor += sizeof(STagData);
  } else {  // create (super) table
    SSchema *pSchema = (SSchema *)pCreateTableMsg->schema;

    int i = 0;
    while (i < pCmd->numOfCols + pCmd->count) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);

      ++pSchema;
      ++i;
    }

    pCursor = (char *)pSchema;

    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      // the statement is sent with one extra byte after the token text
      strncpy(pCursor, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pCursor += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  int msgLen = pCursor - (char *)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the buffer size needed for an alter-table request: minMsgSize(), an SMgmtHead, an
 * SCMAlterTableMsg, one SSchema per field of the first query info, and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t fixedPart = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg);
  size_t schemaPart = sizeof(SSchema) * tscNumOfFields(pQueryInfo);

  return fixedPart + schemaPart + TSDB_EXTRA_PAYLOAD_SIZE;
}

1311
/*
 * Build the alter-table request (TSDB_MSG_TYPE_CM_ALTER_TABLE).
 *
 * Fills the db and tableId from the table name, the alter type (network byte order), the number of
 * schema entries, the tag value buffer (TSDB_MAX_TAGS_LEN bytes copied from pAlterInfo->tagData.data)
 * and one SSchema per field of the first query info. The payload length is the distance from the
 * start of the message to the end of the last schema entry.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  SAlterTableSQL *  pAlterInfo = pInfo->pAlterInfo;

  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);

  pAlterTableMsg->type = htons(pAlterInfo->type);
  pAlterTableMsg->numOfCols = tscNumOfFields(pQueryInfo);  // stored as is, without byte-order conversion
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);

  // serialize one schema entry per field
  SSchema *pSchema = pAlterTableMsg->schema;
  int      i = 0;
  while (i < pAlterTableMsg->numOfCols) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);

    ++pSchema;
    ++i;
  }

  // the message ends right after the last schema entry
  int msgLen = (char *)pSchema - (char *)pAlterTableMsg;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1361
/*
 * Finish the alter-database request stored in pCmd->payload: set the message type and length, and
 * copy the name of the first table meta info of the current clause into the db field.
 *
 * NOTE(review): no tscAllocPayload() call here, unlike most sibling builders; presumably the payload
 * already holds the request at this point -- confirm before adding an allocation.
 *
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMAlterDbMsg * pReq = (SCMAlterDbMsg *)pCmd->payload;

  strcpy(pReq->db, pMetaInfo->name);
  return TSDB_CODE_SUCCESS;
}

1373
/*
 * Build a TSDB_MSG_TYPE_RETRIEVE request in pSql->cmd.
 *
 * The payload is sized to exactly one SRetrieveTableMsg. The query handle from
 * pSql->res is stored in 64-bit big-endian form and the query type of clause 0
 * is stored (network short) in the message's "free" field. pInfo is not used.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SQueryInfo *       pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *)pCmd->payload;

  pRetrieve->qhandle = htobe64(pSql->res.qhandle);
  pRetrieve->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1391
/*
 * Point every output column of pRes->tsrow at its slice of pRes->data.
 *
 * The slice of column i starts at (offset of column i) * numOfRows bytes from
 * the beginning of pRes->data, where the offset comes from
 * tscFieldInfoGetOffset().
 *
 * Returns pRes->code if tscCreateResPointerInfo() fails, 0 otherwise.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  for (int col = 0; col < pQueryInfo->fieldsInfo.numOfOutput; ++col) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    char *  base = (char *)pRes->data;

    pRes->tsrow[col] = base + colOffset * pRes->numOfRows;
  }

  return 0;
}

/*
 * Publish the result of a command that is answered on the client side.
 *
 * This function can only hand out the result once; pRes->rspType denotes the
 * status:
 *   - rspType == 0: first call. numOfRes rows are exposed, the row cursor is
 *     rewound, rspType is set to 1 and the per-column result pointers are
 *     set up.
 *   - rspType != 0: no more result; the result object is reset through
 *     tscResetForNextRetrieve().
 *
 * If a callback (pSql->fp) is installed it is invoked directly on success;
 * on failure the error is dispatched through tscQueueAsyncRes().
 *
 * Returns the result code stored in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    // a failure here is reported through pRes->code, which is read below
    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // Keep the code in a local of the return type: narrowing it to 8 bits would
  // drop the high bits of any code above 255. It is read before the callback
  // runs (NOTE(review): presumably because the callback may release pSql).
  int code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Publish the result of a DESCRIBE-table command: one row for every column
 * plus one row for every tag of the first table of the current clause.
 *
 * Returns whatever tscLocalResultCommonBuilder() returns.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   tableInfo = tscGetTableInfo(pMetaInfo->pTableMeta);

  int32_t rowCount = tableInfo.numOfColumns + tableInfo.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rowCount);
}

/*
 * Publish the result of a tag-retrieve command.
 *
 * The computation of the real row count is currently compiled out (see the
 * "#if 0" section), so the result is always published with zero rows.
 *
 * Returns whatever tscLocalResultCommonBuilder() returns.
 */
int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 0;

#if 0
  // disabled together with the lookups it depends on
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
    numOfRes = pTableMetaInfo->pMetricMeta->numOfTables;
  } else {
    numOfRes = 1;  // for count function, there is only one output.
  }
#endif

  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1469
/*
 * Run the client-side merge and hand its result to the caller.
 *
 * The outcome of tscDoLocalMerge() is stored in pRes->code. When the merge
 * succeeded and produced rows, the per-column result pointers are set up.
 * The row cursor is rewound in every case. On success the callback pSql->fp
 * is invoked directly (it is called unconditionally, so it must be set);
 * on failure the error is dispatched through tscQueueAsyncRes().
 *
 * Returns the merge result code.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  pRes->code = tscDoLocalMerge(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;

  // Keep the code in a local of the return type: narrowing it to 8 bits would
  // drop the high bits of any code above 255. It is read before the callback
  // runs (NOTE(review): presumably because the callback may release pSql).
  int code = pRes->code;
  if (code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
  }

  return code;
}

S
slguan 已提交
1492
/*
 * Publish an empty (zero-row) local result for pSql.
 */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t noRows = 0;
  return tscLocalResultCommonBuilder(pSql, noRows);
}
H
hzcheng 已提交
1493

1494
/*
 * Build the TSDB_MSG_TYPE_CM_CONNECT request in pSql->cmd.
 *
 * The payload is sized to one SCMConnectMsg and filled with:
 *   - db:            the connection's db name; if it contains
 *                    TS_PATH_DELIMITER, only the text starting one byte after
 *                    the first occurrence is sent
 *   - clientVersion: the global version string
 *   - msgVersion:    an empty string
 * pInfo is not used.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pConnect = (SCMConnectMsg *)pCmd->payload;

  // skip everything up to and including the first path delimiter, if any
  char *dbName = pObj->db;
  char *delim = strstr(pObj->db, TS_PATH_DELIMITER);
  if (delim != NULL) {
    dbName = delim + 1;
  }

  strcpy(pConnect->db, dbName);
  strcpy(pConnect->clientVersion, version);
  strcpy(pConnect->msgVersion, "");

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1517
/*
 * Build the TSDB_MSG_TYPE_CM_TABLE_META request in pSql->cmd.payload.
 *
 * The payload buffer is both input and output. A snapshot of the whole buffer
 * is taken first, because writing the table id and create flag overwrites its
 * beginning. For auto-create requests the first sizeof(STagData) bytes of
 * that snapshot are copied into the tags field that follows the fixed header.
 *
 * Resulting layout: | SCMTableInfoMsg | STagData (only when autoCreated) |
 * pInfo is not used.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the snapshot cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char *tmpData = NULL;
  if (pCmd->allocSize > 0) {
    tmpData = calloc(1, pCmd->allocSize);
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, pCmd->allocSize);
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated) {
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

  pCmd->payloadLen = pMsg - (char *)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  tfree(tmpData);

  // check the length that was actually produced; the previous check used a
  // local that was never updated from its initial 0 and so verified nothing
  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1558
/**
1559
 *  multi table meta req pkg format:
1560
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1561 1562
 *      no used         4B
 **/
1563
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1576
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1577

1578
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1579
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1580 1581

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1582
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1583 1584 1585 1586
  }

  tfree(tmpData);

1587
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1588
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1589 1590 1591

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1592
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1593 1594 1595 1596 1597
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hjxilinx 已提交
1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1624

H
hjxilinx 已提交
1625 1626 1627
/*
 * Build the TSDB_MSG_TYPE_CM_STABLE_VGROUP request in pSql->cmd.payload.
 *
 * Layout: | SCMSTableVgroupMsg | name0 | name1 | ...
 * numOfTables is stored in network byte order. Every table name occupies a
 * fixed slot of TSDB_TABLE_ID_LEN bytes: strncpy zero-pads shorter names, and
 * a name of exactly TSDB_TABLE_ID_LEN bytes is stored without a terminator.
 *
 * The payload is written in place; this function neither allocates it nor
 * checks its capacity. pInfo is not used.
 *
 * NOTE(review): the table count is read from clause 0 while the names are
 * looked up in clause pCmd->clauseIndex - confirm the two always agree.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char *      pMsg = pCmd->payload;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *)pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    strncpy(pMsg, pTableMetaInfo->name, TSDB_TABLE_ID_LEN);
    pMsg += TSDB_TABLE_ID_LEN;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (pMsg - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1791
/*
 * Estimate the buffer size needed for a heartbeat message of the connection
 * that owns pSql.
 *
 * The estimate is the sum of the rpc header, SMgmtHead, one SQqueryList header
 * plus one SQueryDesc per entry of pObj->sqlList, one SStreamList header plus
 * one SStreamDesc per entry of pObj->streamList, and TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * The lists are walked without taking any lock here.
 */
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  int      size = 0;

  size += tsRpcHeadSize + sizeof(SMgmtHead);

  // query section: list header + one descriptor per sql object
  size += sizeof(SQqueryList);
  for (SSqlObj *pCur = pObj->sqlList; pCur != NULL; pCur = pCur->next) {
    size += sizeof(SQueryDesc);
  }

  // stream section: list header + one descriptor per stream
  size += sizeof(SStreamList);
  for (SSqlStream *pCur = pObj->streamList; pCur != NULL; pCur = pCur->next) {
    size += sizeof(SStreamDesc);
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1814
/*
 * Build the TSDB_MSG_TYPE_CM_HEARTBEAT request in pSql->cmd.payload.
 *
 * Layout, starting at payload + tsRpcHeadSize:
 *   | SMgmtHead (db = pObj->db) | output of tscBuildQueryStreamDesc() |
 *
 * pObj->mutex is held from before the size estimate until the descriptors
 * have been written. pInfo is not used.
 *
 * Returns -1 if the payload cannot be allocated, TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int size = tscEstimateHeartBeatMsgLength(pSql);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SMgmtHead *pMgmt = (SMgmtHead *)pStart;
  strcpy(pMgmt->db, pObj->db);

  char *pEnd = tscBuildQueryStreamDesc(pStart + sizeof(SMgmtHead), pObj);
  pthread_mutex_unlock(&pObj->mutex);

  int msgLen = pEnd - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

1849 1850
/*
 * Process a table-meta response held in pSql->res.pRsp.
 *
 * Steps:
 *   1. Convert the multi-byte header fields, the vgroup ports and every
 *      schema entry from network to host byte order, in place in the response
 *      buffer. numOfTags is used as received (NOTE(review): presumably a
 *      single-byte field - confirm against STableMetaMsg).
 *   2. Validate sid, numOfIps, numOfTags and numOfColumns.
 *   3. Build an STableMeta from the message, store it in the client cache
 *      under the table name, and attach the cached entry to the first
 *      table-meta info of clause 0.
 *
 * Returns TSDB_CODE_INVALID_VALUE for an out-of-range field,
 * TSDB_CODE_CLI_OUT_OF_MEMORY if the meta cannot be built or cached, and
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);

  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    // the value checked and printed is numOfIps, so label it as such
    tscError("invalid meter numOfIps:%d, sid:%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
  }

  SSchema *pSchema = pMetaMsg->schema;

  // columns first, tags behind them
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

#if 0
  // if current table is created according to super table, get the table meta of super table
  if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
    char id[TSDB_TABLE_ID_LEN + 1] = {0};
    strncpy(id, pMetaMsg->stableId, TSDB_TABLE_ID_LEN);

    // NOTE: if the table meta of super table is not cached at client side yet, the pSTable is NULL
    pTableMeta->pSTable = taosCacheAcquireByName(tscCacheHandle, id);
  }
#endif

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta =
      (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer);

  // The local object stays owned by this function whether or not the put
  // succeeded, so release it on both paths; returning before the free leaked
  // it whenever the cache rejected the entry.
  free(pTableMeta);

  if (pTableMetaInfo->pTableMeta == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1922
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  The response is currently not parsed: this handler is a stub that ignores
 *  pSql and reports success. The former parser was compiled out, relied on
 *  variables that are not declared in this function, and has been dropped;
 *  recover it from version control when multi-table meta loading is
 *  reimplemented.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  (void)pSql;
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2031 2032
/*
 * Process a super-table vgroup response held in pSql->res.pRsp.
 *
 * Layout: | SCMSTableVgroupRspMsg | SVgroupsInfo0 + vgroups | SVgroupsInfo1 + vgroups | ...
 * There is one SVgroupsInfo record per table, in the same order as the tables
 * of the parent command; record i is attached to table-meta info i of the
 * parent's current clause.
 *
 * Counts, vgroup ids and ports are converted from network to host byte order.
 * Each table gets its own heap copy of its record in pInfo->vgroupList; a
 * list that is already attached is overwritten, not released.
 *
 * The parent SSqlObj is expected in pSql->param.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY (also stored in pSql->res.code) if a
 * copy cannot be allocated, otherwise pSql->res.code.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char *pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj *parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd *pCmd = &parent->cmd;
  for (int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    // size of this table's whole record: header plus all of its vgroups
    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    if (pInfo->vgroupList == NULL) {
      // report the failure instead of relying on an assert that is compiled
      // out in release builds
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      return pRes->code;
    }

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfIps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
        pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
      }
    }

    // Advance exactly once per table. Doing this inside the vgroup loop
    // skipped numOfVgroups * size bytes, and did not move at all for a table
    // without vgroups, so the next table's record was read from the wrong
    // place.
    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Process the response of a SHOW command.
 *
 * The query handle and the table meta embedded in the response are converted
 * from network to host byte order in place. The meta is then stored in the
 * client cache under a key derived from the message type, and the output
 * fields/expressions of the query are built from the returned schema.
 *
 * @param pSql  sql object whose res.pRsp holds the SCMShowRsp message
 * @return      0 on success, TSDB_CODE_CLI_OUT_OF_MEMORY if the table meta
 *              could not be created or put into the cache
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg;
  SCMShowRsp *   pShow;
  SSchema *      pSchema;
  char           key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one letter derived from the message type followed by "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  // drop the reference to the previously held meta before installing the new one
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    tscError("%p failed to create table meta from show response", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsMeterMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    tscError("%p failed to put table meta of show response into cache", pSql);
    tfree(pTableMeta);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  // one output field and one dummy expression per returned column
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD     f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo *pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  // the locally created meta is released here; the query keeps using the pointer returned by taosCachePut
  tfree(pTableMeta);
  return 0;
}

/*
 * Process the response of a CONNECT request.
 *
 * Copies the account id, server version and authorization flags from the
 * response into the connection object, prefixes the current db name with
 * "<acctId><TS_PATH_DELIMITER>", and updates the management node ip list.
 *
 * @param pSql  sql object whose res.pRsp holds the SCMConnectRsp message
 * @return      always 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char     temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response

  // bounded formatting: never write past the end of temp
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  // the full name plus its terminating NUL has to fit into pObj->db
  assert(len >= 0 && (size_t)len < tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  pObj->db[tListLen(pObj->db) - 1] = 0;  // strncpy does not terminate on truncation

  tscSetMgmtIpList(&pConnect->ipList);

  strcpy(pObj->sversion, pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
//  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Process the response of a USE DATABASE command: the name stored in the first
 * table meta info of the command becomes the current db of the connection.
 *
 * @param pSql  sql object of the USE command
 * @return      always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj *       pObj = pSql->pTscObj;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  // bounded copy: nothing visible here guarantees that name fits into pObj->db
  strncpy(pObj->db, pTableMetaInfo->name, tListLen(pObj->db));
  pObj->db[tListLen(pObj->db) - 1] = 0;  // strncpy does not terminate on truncation
  return 0;
}

/*
 * Process the response of a DROP DATABASE command by emptying the whole
 * client cache (tscCacheHandle). The sql object itself is not used.
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Process the response of a DROP TABLE command.
 *
 * If the dropped table has an entry in the client cache, that entry is force
 * released, and so is the meta reference held by the command (if any).
 *
 * Rationale kept from the original author: dropping the only table of a vnode
 * removes the vnode; a table re-created with the same name but a different
 * schema will then reside in a new vnode, so the cached information is stale.
 *
 * @param pSql  sql object of the DROP TABLE command
 * @return      always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta after drop table:%s", pSql, pMetaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    if (pMetaInfo->pTableMeta != NULL) {
      taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);
    }
  }

  // nothing to do when the table is not in the cache
  return 0;
}

/*
 * Process the response of an ALTER TABLE command.
 *
 * The cached meta of the altered table is force released. When the command
 * also holds a meta reference and the table is a super table, the whole
 * client cache is emptied.
 *
 * @param pSql  sql object of the ALTER TABLE command
 * @return      always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);
  if (pCached == NULL) {
    // not in cache, nothing to invalidate
    return 0;
  }

  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

  if (pMetaInfo->pTableMeta == NULL) {
    return 0;
  }

  // evaluated before the release below, in the same order as before
  // (the macro presumably inspects pTableMeta -- confirm against its definition)
  bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pMetaInfo);

  taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);

  if (isSuperTable) {
    // a super table changed: reset the whole query cache
    tscTrace("%p reset query cache since table:%s is stable", pSql, pMetaInfo->name);
    taosCacheEmpty(tscCacheHandle);
  }

  return 0;
}

/*
 * Process the response of an ALTER DATABASE command.
 * Nothing has to be done on the client side; the sql object is not used.
 *
 * @return always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

/*
 * Process the response of a query (SELECT) request.
 *
 * The query handle in the response is byte-swapped in place with htobe64 and
 * stored in the result object; the result data pointer is cleared and the
 * result is reset for the next retrieve.
 *
 * @param pSql  sql object whose res.pRsp holds the SQueryTableRsp message
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *       pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  pRsp->qhandle = htobe64(pRsp->qhandle);

  pRes->qhandle = pRsp->qhandle;
  pRes->data = NULL;

  tscResetForNextRetrieve(pRes);
  return 0;
}

H
hjxilinx 已提交
2352
/*
 * Process a retrieve (fetch) response.
 *
 * The header fields of the response are converted to host byte order and
 * stored in the result object, and the result column pointers are set up.
 * For a subscription query the progress records appended after the row data
 * (a table count followed by <uid, timestamp> pairs) are decoded and handed
 * to tscUpdateSubscriptionProgress.
 *
 * @param pSql  sql object whose res.pRsp holds the SRetrieveTableRsp message
 * @return      always 0
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // progress data starts right after the rows: (offset + width of the last column) * numOfRows
    char *p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // p is a byte pointer with no alignment guarantee, so read the values through memcpy
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;

  // explicit 64-bit casts keep the format specifiers correct whatever the field widths are
  tscTrace("%p numOfRows:%" PRId64 ", offset:%" PRId64, pSql, (int64_t)pRes->numOfRows, (int64_t)pRes->offset);

  return 0;
}

/*
 * Process a retrieve response of which only the row count and the data
 * pointer are taken over into the result object; the result column pointers
 * are then set up from the first query clause and the row cursor is reset.
 *
 * @param pSql  sql object whose res.pRsp holds the SRetrieveTableRsp message
 * @return      always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pRes->pRsp;
  pRes->numOfRows = htonl(pRsp->numOfRows);
  pRes->data = pRsp->data;

  tscSetResultPointer(pQueryInfo, pRes);

  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2408
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2409

2410
/*
 * Issue a TSDB_SQL_META request for the table named in pTableMetaInfo.
 *
 * A new sql object is created for the request; it carries the payload of the
 * original sql object (tag information used when the table does not exist
 * yet) and calls tscTableMetaCallBack with pSql as parameter when done.
 *
 * @param pSql            original sql object that needs the table meta
 * @param pTableMetaInfo  meta info holding the name of the table to look up
 * @return                TSDB_CODE_ACTION_IN_PROGRESS if the request was issued,
 *                        TSDB_CODE_CLI_OUT_OF_MEMORY on allocation failure,
 *                        otherwise the error code of the failing step
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to get query info for new sqlobj to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // release the query info attached above as well, not only the object itself
    tscFreeSqlObj(pNew);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  strncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, tListLen(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is on its way; the result is delivered through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

H
hjxilinx 已提交
2452
/*
 * Get the meta of the table named in pTableMetaInfo.
 *
 * A meta reference already held by pTableMetaInfo is released first. The meta
 * is then looked up in the client cache by name; on a miss the request is
 * delegated to getTableMetaFromMgmt.
 *
 * @param pSql            sql object that needs the table meta
 * @param pTableMetaInfo  meta info with a non-empty table name
 * @return                TSDB_CODE_SUCCESS on a cache hit, otherwise the result
 *                        of getTableMetaFromMgmt
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // give back the reference this STableMetaInfo may still own
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2472
/*
 * Same as tscGetTableMeta, but first records in the command whether the table
 * is to be created when it does not exist (cmd.autoCreated).
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/*
 * Used when the table meta is renewed during insertion.
 *
 * If the table is created on demand during insertion, wait for a short period
 * before the table meta request is re-issued, which gives the vnode a better
 * chance to have created the corresponding table by then.
 * Commands other than TSDB_SQL_INSERT do not wait.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  taosMsleep(50);  // todo: global config
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
2493
 * @param tableId       meter id
H
hzcheng 已提交
2494 2495
 * @return              status code
 */
S
slguan 已提交
2496
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
2497 2498
  int code = 0;

H
hjxilinx 已提交
2499
  // handle table meta renew process
H
hzcheng 已提交
2500
  SSqlCmd *pCmd = &pSql->cmd;
2501 2502

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2503
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2504 2505

  /*
S
slguan 已提交
2506
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
2507 2508
   * 2. if get metermeta failed, still get the metermeta
   */
2509
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
2510
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
2511
    if (pTableMetaInfo->pTableMeta) {
2512 2513
      tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
               tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2514
    }
2515

2516
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
2517
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2518

2519
    code = getTableMetaFromMgmt(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
2520
  } else {
H
hjxilinx 已提交
2521
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2522 2523
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2524 2525 2526 2527 2528
  }

  return code;
}

H
hjxilinx 已提交
2529
/*
 * Check whether every table of the given query clause already has its vgroup
 * list; if so, the vgroup info does not have to be retrieved again.
 *
 * @param pCmd         command holding the query clauses
 * @param clauseIndex  index of the clause to check
 * @return             true if no table of the clause has a NULL vgroupList
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t idx = 0;
  while (idx < pQueryInfo->numOfTables) {
    if (tscGetMetaInfo(pQueryInfo, idx)->vgroupList == NULL) {
      break;
    }

    ++idx;
  }

  // the scan reached the end only if every table had a vgroup list
  return idx >= pQueryInfo->numOfTables;
}
H
hzcheng 已提交
2541

H
hjxilinx 已提交
2542 2543 2544 2545 2546
/*
 * Retrieve the vgroup info of all tables of the given query clause.
 *
 * Nothing is sent when every table already has its vgroup list. Otherwise a
 * new sql object with command TSDB_SQL_STABLEVGROUP is created, populated with
 * the tables of the clause, and processed; tscTableMetaCallBack is invoked
 * with pSql as parameter on completion.
 *
 * @param pSql         sql object that needs the vgroup info
 * @param clauseIndex  index of the query clause
 * @return             TSDB_CODE_SUCCESS if nothing has to be retrieved,
 *                     TSDB_CODE_ACTION_IN_PROGRESS if the request was issued,
 *                     otherwise an error code
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  int         code = TSDB_CODE_SUCCESS;
  SQueryInfo *pNewQueryInfo = NULL;
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);  // do not leak the new object on failure
    return code;
  }

  // attach every table of the clause, each with its own reference to the cached meta
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *    pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscTrace("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is on its way; the result is delivered through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

2586
void tscInitMsgsFp() {
S
slguan 已提交
2587 2588 2589
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
2590 2591

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2592
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2593

2594 2595
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2596 2597

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2598
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2599 2600 2601
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2602
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2603 2604 2605
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2606 2607 2608 2609 2610
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2611
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2612
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2613
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2614 2615 2616 2617

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2618 2619 2620
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2621 2622

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2623
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2624 2625 2626 2627 2628

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2629
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2630
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2631
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2632 2633

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2634
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2635
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2636

H
hjxilinx 已提交
2637 2638 2639 2640 2641
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
2642

H
hzcheng 已提交
2643 2644
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2645
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2646 2647 2648 2649 2650 2651 2652 2653 2654 2655

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}