tscServer.c 87.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tcache.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
21
#include "tscSubquery.h"
H
hzcheng 已提交
22 23 24 25 26 27 28
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
29
#include "tscLog.h"
H
hzcheng 已提交
30 31 32

#define TSC_MGMT_VNODE 999

S
slguan 已提交
33
SRpcIpSet  tscMgmtIpSet;
S
slguan 已提交
34 35
SRpcIpSet  tscDnodeIpSet;

36 37
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
38 39 40
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
41

42 43 44
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
45

S
slguan 已提交
46
/* Minimum message size: the RPC header size plus a fixed 100 bytes. */
static int32_t minMsgSize() {
  const int32_t extraBytes = 100;
  return tsRpcHeadSize + extraBytes;
}
H
hzcheng 已提交
47

48
/*
 * Fill pSql->ipList from the endpoints recorded in pVgroupInfo.
 *
 * The fqdn and port of every endpoint are copied, and the in-use index is
 * reset to the first entry.
 * NOTE(review): strcpy is unbounded; assumes the destination fqdn slot is at
 * least as large as the source one - confirm against the struct definitions.
 */
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  SRpcIpSet* pDst = &pSql->ipList;

  pDst->numOfIps = pVgroupInfo->numOfIps;
  pDst->inUse    = 0;

  int32_t idx = 0;
  while (idx < pVgroupInfo->numOfIps) {
    strcpy(pDst->fqdn[idx], pVgroupInfo->ipAddr[idx].fqdn);
    pDst->port[idx] = pVgroupInfo->ipAddr[idx].port;
    ++idx;
  }
}

S
slguan 已提交
60
/*
 * Log every entry of the global management IP set, or log an error when the
 * set holds no entries.
 */
void tscPrintMgmtIp() {
  if (tscMgmtIpSet.numOfIps > 0) {
    for (int idx = 0; idx < tscMgmtIpSet.numOfIps; ++idx) {
      tscTrace("mgmt index:%d %s:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    }
    return;
  }

  tscError("invalid mgmt IP list:%d", tscMgmtIpSet.numOfIps);
}

S
slguan 已提交
70
/*
 * Update the global management IP set from a list supplied by the server.
 *
 * The entry count and in-use index are taken as-is; each port goes through
 * htons() before being stored.
 * NOTE(review): only the ports are copied - the fqdn entries of tscMgmtIpSet
 * are left untouched. Confirm that this is intentional.
 */
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpSet.numOfIps = pIpList->numOfIps;
  tscMgmtIpSet.inUse = pIpList->inUse;

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscMgmtIpSet.port[idx] = htons(pIpList->port[idx]);
    ++idx;
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
79 80 81
  if (tscMgmtIpSet.numOfIps != 1) {
    tscMgmtIpSet.numOfIps = 1;
    tscMgmtIpSet.inUse = 0;
H
hjxilinx 已提交
82
    taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]);
S
slguan 已提交
83 84 85 86 87
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
88
/*
 * Replace the global management IP set with *pIpSet and trace the new
 * contents. The ahandle argument is not used.
 */
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
  tscMgmtIpSet = *pIpSet;
  tscTrace("mgmt IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscTrace("index:%d fqdn:%s port:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

S
slguan 已提交
96
/*
 * Refresh the global management IP set from a server-supplied list.
 *
 * The cluster edition returns the current management nodes, while the edge
 * edition returns an empty list; the empty case falls back to the locally
 * configured endpoint.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  if (pIpList->numOfIps == 0) {
    tscSetMgmtIpListFromEdge();
    return;
  }

  tscSetMgmtIpListFromCluster(pIpList);
}

H
hjxilinx 已提交
108 109 110 111 112 113 114
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
115
UNUSED_FUNC
H
hjxilinx 已提交
116 117
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
S
slguan 已提交
118
  return tscMgmtIpSet.numOfIps * factor;
H
hjxilinx 已提交
119 120
}

H
hzcheng 已提交
121 122 123 124 125 126 127 128 129 130 131 132
/*
 * Callback invoked with the result of a heartbeat request.
 *
 * param: the STscObj the heartbeat belongs to (ignored when NULL or when its
 *        signature does not point back to itself).
 * tres:  not used.
 * code:  0 on success; any other value is only traced.
 *
 * On success the management IP set is refreshed from the response, and the
 * connection, query or stream named in the response is killed. In every case
 * that passes the signature check the activity timer is re-armed.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj->signature != pObj) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pHbSql = pObj->pHb;
  SSqlRes *pHbRes = &pHbSql->res;

  if (code != 0) {
    tscTrace("heart beat failed, code:%s", tstrerror(code));
  } else {
    SCMHeartBeatRsp *pHbRsp = (SCMHeartBeatRsp *)pHbRes->pRsp;
    tscSetMgmtIpList(&pHbRsp->ipList);

    if (pHbRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      // the ids go through htonl() before being handed to the kill routines
      if (pHbRsp->queryId) {
        tscKillQuery(pObj, htonl(pHbRsp->queryId));
      }
      if (pHbRsp->streamId) {
        tscKillStream(pObj, htonl(pHbRsp->streamId));
      }
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
158 159 160
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
161
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
162
    
163 164 165 166
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
167
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
168 169 170 171 172
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

173
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
174 175 176 177
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
178
    tscAddSubqueryInfo(&pObj->pHb->cmd);
179

S
slguan 已提交
180
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
181 182 183
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
184
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
185 186 187 188 189 190 191 192 193
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the command payload into an RPC-owned buffer and send it.
 *
 * Commands below TSDB_SQL_MGMT are sent through pVnodeConn using the IP list
 * already stored in pSql; their payload is read starting tsRpcHeadSize bytes
 * into the buffer. All other commands are sent through the connection's
 * management handle after pSql->ipList is overwritten with the global
 * management IP set.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY when the RPC buffer cannot be
 * allocated, TSDB_CODE_SUCCESS otherwise.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  STscObj* pObj = pSql->pTscObj;
  SSqlCmd* pCmd = &pSql->cmd;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SRpcMsg rpcMsg = {0};

  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
    pSql->ipList = tscMgmtIpSet;
    memcpy(pCont, pSql->cmd.payload, pSql->cmd.payloadLen);

    rpcMsg.msgType = pSql->cmd.msgType;
    rpcMsg.pCont   = pCont;
    rpcMsg.contLen = pSql->cmd.payloadLen;
    rpcMsg.handle  = pSql;
    rpcMsg.code    = 0;

    tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
    rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg);
    return TSDB_CODE_SUCCESS;
  }

  tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port[0]);
  memcpy(pCont, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);

  rpcMsg.msgType = pSql->cmd.msgType;
  rpcMsg.pCont   = pCont;
  rpcMsg.contLen = pSql->cmd.payloadLen;
  rpcMsg.handle  = pSql;
  rpcMsg.code    = 0;

  rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
  return TSDB_CODE_SUCCESS;
}

232 233
/*
 * Callback invoked for every response message delivered for a request sent by
 * tscSendMsgToServer; rpcMsg->handle carries the originating SSqlObj.
 *
 * Outline:
 *  1. Drop the message if the SSqlObj is gone, already freed, or its
 *     connection object is no longer valid.
 *  2. For a set of "stale meta / node unavailable" error codes, renew the
 *     table meta and resend the request (bounded by pSql->maxRetry). CONNECT
 *     and HB commands are not retried here.
 *  3. Otherwise copy the response into pSql->res, run the per-command response
 *     handler, and invoke the user callback pSql->fp.
 *
 * The response buffer rpcMsg->pCont is released with rpcFreeCont() on every
 * exit path.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
  if (pSql == NULL) {
    // pSql must not be dereferenced here; only the response buffer is released
    tscError("%p sql is already released", pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);

  if (pSql->freed || pObj->signature != pObj) {
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
      /*
       * not_active_table: 1. the virtual node may have failed to create the table: table creation is
       *                   asynchronous, so the table may not exist yet. Try again with renewed table meta.
       *                   2. the requested table may have been removed by another client, so the table
       *                   meta needs to be renewed here.
       *
       * not_active_vnode: the vnode has been moved to another node by the balance procedure, or has been
       *                   removed. Renew the table meta and try again.
       * not_active_session: the db has been moved to another node; the vnode no longer exists on this dnode.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else {
        // count the attempt outside the log call so the retry limit does not depend on
        // whether the log statement evaluates its arguments
        ++pSql->retry;
        tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

        pSql->res.code = rpcMsg->code;  // keep the previous error code
        if (pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
        } else {
          rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
          if (pTableMetaInfo->pTableMeta) {
            tscSendMsgToServer(pSql);
          }

          rpcFreeCont(rpcMsg->pCont);
          return;
        }
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    // tstrerror() yields a string, so it is printed with %s
    tscTrace("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  // NOTE(review): the assignment above never stores TSDB_CODE_SUCCESS, so this reset looks
  // unreachable unless the code was already SUCCESS-and-cancelled elsewhere - confirm intent.
  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscTrace("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      // grow/shrink the cached response buffer without losing it on failure
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // NOTE(review): a previously allocated pRes->pRsp is dropped here without being freed - confirm.
      pRes->pRsp = NULL;
    }

    /*
     * There is no response callback function for a submit response.
     * The actual number of inserted points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscTrace("%p cmd:%d code:%s, inserted rows:%d, rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code),
          pMsg->affectedRows, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);

  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
    void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
    rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;

    tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);

    /*
     * Whether to free the sqlObj must be decided before calling the user-defined function, since this
     * SqlObj may be freed in the UDF and reused by other threads before tscShouldBeFreed is called, in
     * which case tscShouldBeFreed would inspect an object that now belongs to another thread.
     *
     * If this block of memory is re-allocated for an insert thread, for which tscKeepConn[command] is 0,
     * tscShouldBeFreed would succeed and tscFreeSqlObj would free it immediately.
     */
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, taosres, rpcMsg->code);

    if (shouldFree) {
      tscTrace("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
381 382 383
/*
 * Build (where applicable) and send the message for pSql.
 *
 * For the commands listed in the switch, the matching tscBuildMsg entry is
 * called and its result stored in pRes->code. If pRes->code is not success,
 * the result is queued asynchronously and that code is returned. Otherwise
 * the message is sent; a send failure is recorded in pRes->code and queued
 * asynchronously, but the function itself still returns TSDB_CODE_SUCCESS.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);
  if (sendCode != TSDB_CODE_SUCCESS) {
    pRes->code = sendCode;
    tscQueueAsyncRes(pSql);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Entry point for executing the command stored in pSql.
 *
 *  - commands below TSDB_SQL_MGMT need table meta info; without it the call
 *    fails with TSDB_CODE_OTHERS (also stored in pSql->res.code);
 *  - commands in [TSDB_SQL_MGMT, TSDB_SQL_LOCAL) are addressed to the global
 *    management IP set;
 *  - all remaining commands are handled locally through tscProcessMsgRsp.
 *
 * Non-local commands are then handed to doProcessSql.
 */
int tscProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;

  STableMetaInfo *pTableMetaInfo = NULL;
  char *          name = NULL;
  uint16_t        type = 0;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    name = (pTableMetaInfo != NULL) ? pTableMetaInfo->name : NULL;
    type = pQueryInfo->type;

    // a query without tables is only expected for the heartbeat command
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);

  if (pCmd->command < TSDB_SQL_MGMT) {
    // the pTableMetaInfo cannot be NULL for these commands
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
    return doProcessSql(pSql);
  }

  if (pCmd->command >= TSDB_SQL_LOCAL) {
    // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  pSql->ipList = tscMgmtIpSet; //?
  return doProcessSql(pSql);
}
H
hzcheng 已提交
445

H
hjxilinx 已提交
446
/*
 * Cancel a two-stage super table query.
 *
 * Every existing sub-query gets its result code set to
 * TSDB_CODE_QUERY_CANCELLED. The function then polls, in 100 ms steps and for
 * roughly 10 seconds at most, until the command of pSql becomes
 * TSDB_SQL_RETRIEVE_LOCALMERGE or TSDB_SQL_RETRIEVE_EMPTY_RESULT.
 * Nothing happens when the query is not a two-stage super table query.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  /*
   * The command must not be set to TSDB_SQL_KILL_QUERY here. Otherwise the
   * sub-queries may not be released correctly and the master sql object of the
   * super table query may reach an abnormal state.
   */
  for (int i = 0; i < pSql->numOfSubs; ++i) {
    if (pSql->pSubs[i] != NULL) {
      pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
    }
  }

  /*
   * 1. if the sub-queries are not launched or only partially launched, wait for the launched
   *    ones to return so that the allocated resources can be freed.
   * 2. if no sub-query has been launched yet, the query is still in the sql parsing stage:
   *    the res.code is set and the function returns.
   */
  const int64_t maxWaitMs = 10000;  // 10 Sec.
  const int64_t startMs = taosGetTimestampMs();

  for (;;) {
    if (pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > maxWaitMs) {
      break;
    }
  }

  tscTrace("%p super table query cancelled", pSql);
}

488
/*
 * Fill in the retrieve message located tsRpcHeadSize bytes into the command
 * payload: query handle, the "free" field (taken from the query type), the
 * target vgroup id and the content length. Sets the message type to
 * TSDB_MSG_TYPE_RETRIEVE and always returns TSDB_CODE_SUCCESS.
 *
 * pInfo is not used.
 * NOTE(review): cmd.payloadLen is read but never set here; it is presumably
 * maintained by the caller - confirm.
 */
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)(pSql->cmd.payload + tsRpcHeadSize);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  int32_t vgId;
  if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) {
    SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList;
    assert(pVgroupInfo->numOfVgroups == 1); // todo fix me

    vgId = pVgroupInfo->vgroups[0].vgId;
  } else {
    STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
    vgId = pTableMeta->vgroupInfo.vgId;
  }
  pRetrieveMsg->header.vgId = htonl(vgId);

  pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);

  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
  return TSDB_CODE_SUCCESS;
}

521
/*
 * Finalize the submit message already laid out in the command payload.
 *
 * Payload layout after the RPC head: an SMsgDesc followed by the SSubmitMsg.
 * The function writes the vnode count, the vgroup id, the content length and
 * the number of blocks, sets the message type to TSDB_MSG_TYPE_SUBMIT and
 * points pSql->ipList at the table's vgroup. Always returns
 * TSDB_CODE_SUCCESS. pInfo is not used.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char* pBase = pSql->cmd.payload + tsRpcHeadSize;

  SMsgDesc*   pMsgDesc  = (SMsgDesc*) pBase;
  SSubmitMsg* pShellMsg = (SSubmitMsg *)(pBase + sizeof(SMsgDesc));

  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  pMsgDesc->numOfVnodes = htonl(1);       //todo set the right number of vnodes

  pShellMsg->header.vgId = htonl(vgId);
  pShellMsg->header.contLen = htonl(size);
  pShellMsg->length = pShellMsg->header.contLen;

  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
  tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);

  tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps);
  return TSDB_CODE_SUCCESS;
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
556
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
557
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
558
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
559

560
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
561 562 563 564
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
H
hjxilinx 已提交
565
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
566 567

  // meter query without tags values
H
hjxilinx 已提交
568
  if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
S
slguan 已提交
569
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
570
  }
H
hjxilinx 已提交
571 572
  
  int32_t size = 4096;
S
slguan 已提交
573
  return size;
H
hzcheng 已提交
574 575
}

576
/*
 * Serialize the table id list of a query into pMsg and record the target
 * vgroup in both pQueryMsg and pSql->ipList.
 *
 * Two layouts are produced:
 *  - a single STableIdInfo (tid, uid, subscription progress key) when the
 *    table is a normal table or no per-vgroup table list is available;
 *  - one STableIdInfo per table of the selected vgroup otherwise.
 *
 * Returns the position in the buffer just past the last entry written.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;

  int isNormalTable = UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo);

  if (isNormalTable || pTableMetaInfo->pVgroupTables == NULL) {
    SCMVgroupInfo* pVgroupInfo = NULL;

    if (isNormalTable) {
      pVgroupInfo = &pTableMeta->vgroupInfo;
    } else {
      int32_t vgIdx = pTableMetaInfo->vgroupIndex;
      assert(vgIdx >= 0);

      pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[vgIdx];
      tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, pTableMetaInfo->vgroupList->numOfVgroups);
    }

    tscSetDnodeIpList(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);

    STableIdInfo *pEntry = (STableIdInfo *)pMsg;
    pEntry->tid = htonl(pTableMeta->sid);
    pEntry->uid = htobe64(pTableMeta->uid);
    pEntry->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables

    pMsg += sizeof(STableIdInfo);
  } else {
    int32_t vgIdx = pTableMetaInfo->vgroupIndex;
    int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(vgIdx >= 0 && vgIdx < numOfVgroups);

    tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, numOfVgroups);

    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIdx);

    // set the vgroup info
    tscSetDnodeIpList(pSql, &pTableIdList->vgInfo);
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);

    int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables

    // serialize each table id info
    // NOTE(review): the key field of each entry is not written in this branch - confirm
    // that the receiver does not depend on it.
    int32_t t = 0;
    while (t < numOfTables) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, t);

      STableIdInfo *pEntry = (STableIdInfo *)pMsg;
      pEntry->tid = htonl(pItem->tid);
      pEntry->uid = htobe64(pItem->uid);

      pMsg += sizeof(STableIdInfo);
      ++t;
    }
  }

  tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->uid);

  return pMsg;
}

638
/*
 * Serialize the query of the current clause of pSql->cmd into the command
 * payload as a SQueryTableMsg followed by its variable-length sections.
 *
 * Sections are written after the RPC head (pCmd->payload + tsRpcHeadSize) in
 * this order:
 *   fixed SQueryTableMsg fields + colList[] -> per-column filter infos ->
 *   SSqlFuncMsg list (binary parameters appended inline) ->
 *   table id info (doSerializeTableInfo) -> group-by columns ->
 *   interpolation default values -> tag column infos -> compressed ts block ->
 *   tag query condition -> table name condition string.
 *
 * pInfo is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_SQL when a column or
 * tag index/type is rejected, and -1 on allocation failure or other invalid
 * query parameters.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;

  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // the interval is serialized as a 64-bit value below, so print it as one
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  // a signed count keeps the sanity check meaningful and matches the %d conversion
  int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
  if (numOfOutput < 0) {
    tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;

  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending scan the window boundaries are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order           = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId      = htons(pQueryInfo->order.orderColId);
  pQueryMsg->interpoType     = htons(pQueryInfo->interpoType);
  pQueryMsg->limit           = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset          = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols       = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime    = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime     = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols  = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags       = htonl(numOfTags);
  pQueryMsg->tagNameRelType  = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType       = htons(pQueryInfo->type);
  pQueryMsg->numOfOutput     = htons(numOfOutput);

  // set column list ids; the variable-length data starts right after colList[]
  size_t   numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *   pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      // log the scalar index; passing the whole colIndex struct to %d is undefined behavior
      tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
               pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta),
               pCol->colIndex.columnIndex, pColSchema->name);

      return TSDB_CODE_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // serialize the output expressions; binary parameters are appended after each SSqlFuncMsg
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex *pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      // plain assignment: the slot must not depend on whatever the buffer held before
      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      // NOTE(review): index -1 passes this check and then reads pSchema[-1]; confirm that is intended
      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        // log the scalar index; passing the whole colIndex struct to %d is undefined behavior
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_INVALID_SQL;
      }

      SColumnInfo *pTagCol = (SColumnInfo *)pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // NOTE(review): the results of fseek/fread are not checked here
    fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET);
    fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f);

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond *pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // the table name condition is always terminated by a NUL, even when absent
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  int32_t msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

906 907
/*
 * Finish the CREATE DATABASE message that lives in the command payload: set
 * the payload length and message type, then copy the database name taken from
 * the first table meta info of the current clause.
 *
 * The payload is not (re)allocated here and pInfo is not used.
 * NOTE(review): presumably the payload was sized and partly filled before
 * this call - confirm against the caller.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
  pCommand->payloadLen = sizeof(SCMCreateDbMsg);

  assert(pCommand->numOfClause == 1);

  SCMCreateDbMsg *pMsg = (SCMCreateDbMsg *)pCommand->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  // bounded by the size of the destination field
  strncpy(pMsg->db, pMetaInfo->name, tListLen(pMsg->db));

  return TSDB_CODE_SUCCESS;
}

920 921
/*
 * Build the CREATE DNODE message: allocate a payload of
 * sizeof(SCMCreateDnodeMsg) and copy the end point text of the first DCL
 * token into it.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCommand->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  // copies exactly the token length; strncpy adds no terminator in that case
  SCMCreateDnodeMsg *pMsg = (SCMCreateDnodeMsg *)pCommand->payload;
  strncpy(pMsg->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);

  return TSDB_CODE_SUCCESS;
}

936 937
/*
 * Build the CREATE ACCOUNT message from the parsed DCL info: user name,
 * password, the account limits (converted to network byte order) and the
 * access state.
 *
 * Access state mapping of the "stat" token:
 *   empty -> -1, "r" -> read, "w" -> write, "all" -> all, "no" -> 0;
 *   any other text leaves the field as it is in the payload.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMCreateAcctMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pMsg = (SCMCreateAcctMsg *)pCommand->payload;

  SSQLToken *pUserToken = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPassToken = &pInfo->pDCLInfo->user.passwd;

  strncpy(pMsg->user, pUserToken->z, pUserToken->n);
  strncpy(pMsg->pass, pPassToken->z, pPassToken->n);

  SCreateAcctSQL *pOpt = &pInfo->pDCLInfo->acctOpt;

  // 32-bit limits
  pMsg->cfg.maxUsers           = htonl(pOpt->maxUsers);
  pMsg->cfg.maxDbs             = htonl(pOpt->maxDbs);
  pMsg->cfg.maxTimeSeries      = htonl(pOpt->maxTimeSeries);
  pMsg->cfg.maxStreams         = htonl(pOpt->maxStreams);
  pMsg->cfg.maxPointsPerSecond = htonl(pOpt->maxPointsPerSecond);
  pMsg->cfg.maxConnections     = htonl(pOpt->maxConnections);

  // 64-bit limits
  pMsg->cfg.maxStorage   = htobe64(pOpt->maxStorage);
  pMsg->cfg.maxQueryTime = htobe64(pOpt->maxQueryTime);

  // dispatch on the token length first, then on its text
  switch (pOpt->stat.n) {
    case 0:
      pMsg->cfg.accessState = -1;
      break;
    case 1:
      if (pOpt->stat.z[0] == 'r') {
        pMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
      } else if (pOpt->stat.z[0] == 'w') {
        pMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
      }
      break;
    case 2:
      if (strncmp(pOpt->stat.z, "no", 2) == 0) {
        pMsg->cfg.accessState = 0;
      }
      break;
    case 3:
      if (strncmp(pOpt->stat.z, "all", 3) == 0) {
        pMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
      }
      break;
    default:
      break;
  }

  pCommand->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

981 982
/*
 * Build a CREATE USER or ALTER USER message from the parsed user info.
 *
 * The user type selects both the content and the message type:
 *   TSDB_ALTER_USER_PRIVILEGES -> privilege taken from pCmd->count, ALTER_USER
 *   TSDB_ALTER_USER_PASSWD     -> password copied, ALTER_USER
 *   anything else              -> password copied, CREATE_USER
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMCreateUserMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pMsg = (SCMCreateUserMsg *)pCommand->payload;
  SUserInfo *       pUserInfo = &pInfo->pDCLInfo->user;

  strncpy(pMsg->user, pUserInfo->user.z, pUserInfo->user.n);
  pMsg->flag = pUserInfo->type;

  if (pUserInfo->type == TSDB_ALTER_USER_PRIVILEGES) {
    pMsg->privilege = (char)pCommand->count;
  } else {
    // both "alter password" and "create user" carry the password
    strncpy(pMsg->pass, pUserInfo->passwd.z, pUserInfo->passwd.n);
  }

  pCommand->msgType =
      (pUserInfo->type == TSDB_ALTER_USER_PASSWD || pUserInfo->type == TSDB_ALTER_USER_PRIVILEGES)
          ? TSDB_MSG_TYPE_CM_ALTER_USER
          : TSDB_MSG_TYPE_CM_CREATE_USER;

  return TSDB_CODE_SUCCESS;
}

1013 1014
/*
 * Prepare the CONFIG DNODE message: request a payload of
 * sizeof(SCMCfgDnodeMsg) and set the message type. No message field is
 * written here and pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMCfgDnodeMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCommand->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1025

1026 1027
/*
 * Build the DROP DATABASE message: database name from the first table meta
 * info of the current clause, plus the "ignore if not exists" flag derived
 * from the parsed existsCheck value.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropDbMsg *  pMsg = (SCMDropDbMsg *)pCommand->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  // bounded by the size of the destination field
  strncpy(pMsg->db, pMetaInfo->name, tListLen(pMsg->db));

  if (pInfo->pDCLInfo->existsCheck) {
    pMsg->ignoreNotExists = 1;
  } else {
    pMsg->ignoreNotExists = 0;
  }

  pCommand->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1045 1046
/*
 * Build the DROP TABLE message: table id from the first table meta info of
 * the current clause, plus the "ignore if not exists" flag derived from the
 * parsed existsCheck value.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMDropTableMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pMsg = (SCMDropTableMsg *)pCommand->payload;
  STableMetaInfo * pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  strcpy(pMsg->tableId, pMetaInfo->name);

  if (pInfo->pDCLInfo->existsCheck) {
    pMsg->igNotExists = 1;
  } else {
    pMsg->igNotExists = 0;
  }

  pCommand->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1063
/*
 * Build the DROP DNODE message: the end point string is taken from the name
 * of the first table meta info of the current clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropDnodeMsg *pMsg = (SCMDropDnodeMsg *)pCommand->payload;
  STableMetaInfo * pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  strcpy(pMsg->ep, pMetaInfo->name);
  pCommand->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1079
/*
 * Build the DROP USER message: the user name is taken from the name of the
 * first table meta info of the current clause. pInfo is not used.
 *
 * The payload length and message type are set before the allocation, so they
 * stay set even when the allocation fails.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMDropUserMsg);
  pCommand->msgType = TSDB_MSG_TYPE_CM_DROP_USER;

  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCommand->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  strcpy(pMsg->user, pMetaInfo->name);

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112
/*
 * Build the DROP ACCOUNT message. It uses the SCMDropUserMsg layout; the
 * account name is taken from the name of the first table meta info of the
 * current clause and stored in the "user" field. pInfo is not used.
 *
 * The payload length and message type are set before the allocation, so they
 * stay set even when the allocation fails.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMDropUserMsg);
  pCommand->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCommand->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  strcpy(pMsg->user, pMetaInfo->name);

  return TSDB_CODE_SUCCESS;
}

1113 1114
/*
 * Build the USE DATABASE message: the database name is taken from the name of
 * the first table meta info of the current clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMUseDbMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *   pMsg = (SCMUseDbMsg *)pCommand->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  strcpy(pMsg->db, pMetaInfo->name);
  pCommand->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1130
/*
 * Build the SHOW message.
 *
 * The database name comes from the first table meta info of the current
 * clause when it is non-empty, otherwise from the connection object. The
 * optional trailing payload is either the wildcard pattern (every show type
 * except TSDB_MGMT_TABLE_VNODES) or the prefix token (TSDB_MGMT_TABLE_VNODES).
 *
 * pShowMsg->payloadLen is stored in network byte order, so the final
 * pCmd->payloadLen is computed from the host-order length kept in a local
 * variable rather than read back from the message.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg *)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
  } else {
    strcpy(pShowMsg->db, pObj->db);
  }

  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  // host-order length of the trailing payload; stays 0 when nothing is appended
  int32_t extraLen = 0;
  pShowMsg->payloadLen = htons(0);

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
      extraLen = pPattern->n;
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);

    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
    extraLen = pIpAddr->n;
  }

  // do not read pShowMsg->payloadLen back: it is already byte-swapped on little-endian hosts
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1172
/*
 * Build a KILL QUERY / KILL CONNECTION / KILL STREAM message. The id text is
 * copied from the "ip" token of the DCL info and the message type is chosen
 * from pCmd->command; any other command leaves the message type untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->payloadLen = sizeof(SCMKillQueryMsg);
  if (tscAllocPayload(pCommand, pCommand->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMKillQueryMsg *pMsg = (SCMKillQueryMsg *)pCommand->payload;
  strncpy(pMsg->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);

  if (pCommand->command == TSDB_SQL_KILL_QUERY) {
    pCommand->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (pCommand->command == TSDB_SQL_KILL_CONNECTION) {
    pCommand->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (pCommand->command == TSDB_SQL_KILL_STREAM) {
    pCommand->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1197
/*
 * Estimate the payload size needed for a CREATE TABLE message.
 *
 * The estimate is the sum of: the minimum message size, SMgmtHead,
 * SCMCreateTableMsg, either one STagData (table created from a super table)
 * or one SSchema per column and tag, the select statement text plus one byte
 * when a select clause is present, and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *        pCommand = &(pSql->cmd);
  SCreateTableSQL *pCreate = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);

  total += (pCreate->type == TSQL_CREATE_TABLE_FROM_STABLE)
               ? sizeof(STagData)
               : sizeof(SSchema) * (pCommand->numOfCols + pCommand->count);

  if (pCreate->pSelect != NULL) {
    total += (pCreate->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1216
/*
 * Build the CREATE TABLE message.
 *
 * After the fixed SCMCreateTableMsg fields the message carries either
 *   - the STagData of the "using" clause (table created from a super table), or
 *   - one SSchema per column and tag, followed by the select statement text
 *     when the statement creates a stream.
 *
 * The field info of the query is cleared once it has been serialized.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  int size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);
  pCreateTableMsg->sqlLen = 0;

  char * pCursor = (char *)pCreateTableMsg->schema;
  int8_t createType = pCreateTable->type;

  if (createType == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pCursor, &pCreateTable->usingInfo.tagdata, sizeof(STagData));
    pCursor += sizeof(STagData);
  } else {  // create (super) table
    SSchema *pSchema = (SSchema *)pCreateTableMsg->schema;
    int      numOfFields = pCmd->numOfCols + pCmd->count;

    for (int i = 0; i < numOfFields; ++i, ++pSchema) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
    }

    pCursor = (char *)pSchema;

    if (createType == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pCreateTable->pSelect;

      // token text plus one extra byte, as accounted for in the size estimate
      strncpy(pCursor, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pCursor += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  int msgLen = pCursor - (char *)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the payload size needed for an ALTER TABLE message: the minimum
 * message size, SMgmtHead, SCMAlterTableMsg, one SSchema per field of the
 * first query info, and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pInfo = tscGetQueryInfoDetail(pCmd, 0);

  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pInfo) +
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1292
/*
 * Build the ALTER TABLE message: database and table id, alter type, the tag
 * value buffer and one SSchema entry per field of the first query info.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  SAlterTableSQL *  pAlterInfo = pInfo->pAlterInfo;

  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);

  pAlterTableMsg->type = htons(pAlterInfo->type);
  pAlterTableMsg->numOfCols = tscNumOfFields(pQueryInfo);
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);

  // one schema entry per field; the count is read back from the message
  SSchema *pSchema = pAlterTableMsg->schema;
  int      idx = 0;
  while (idx < pAlterTableMsg->numOfCols) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, idx);

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);

    ++pSchema;
    ++idx;
  }

  // the message ends right after the last schema entry
  int msgLen = (char *)pSchema - (char *)pAlterTableMsg;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1342
/*
 * Finish the ALTER DATABASE message that lives in the command payload: set
 * the payload length and message type, then copy the database name taken from
 * the first table meta info of the current clause.
 *
 * The payload is not (re)allocated here and pInfo is not used.
 * NOTE(review): presumably the payload was sized and partly filled before
 * this call - confirm against the caller.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
  pCommand->payloadLen = sizeof(SCMAlterDbMsg);

  SCMAlterDbMsg * pMsg = (SCMAlterDbMsg *)pCommand->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCommand, pCommand->clauseIndex, 0);

  strcpy(pMsg->db, pMetaInfo->name);

  return TSDB_CODE_SUCCESS;
}

1354
/*
 * Build a TSDB_MSG_TYPE_RETRIEVE request in pSql->cmd.
 *
 * The payload is (re)allocated to hold one SRetrieveTableMsg, which receives
 * the query handle of the current result (64-bit big-endian) and the query
 * type of the first query info (16-bit network order, stored in `free`).
 * pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SQueryInfo *       pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pMsg = (SRetrieveTableMsg *)pCmd->payload;

  pMsg->qhandle = htobe64(pSql->res.qhandle);
  pMsg->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1372
/*
 * Point every output column slot of pRes->tsrow into pRes->data.
 *
 * After tscCreateResPointerInfo succeeds, column i is placed at
 * data + offset(i) * numOfRows, where offset(i) comes from
 * tscFieldInfoGetOffset.
 *
 * Returns 0 on success; if tscCreateResPointerInfo fails, returns pRes->code.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  for (int col = 0; col < pQueryInfo->fieldsInfo.numOfOutput; ++col) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = ((char *)pRes->data + colOffset * pRes->numOfRows);
  }

  return 0;
}

/*
 * Common tail for responses whose result set is produced on the client side.
 *
 * pRes->rspType records whether the result has already been handed out:
 *   - 0: first call; numOfRes rows are published, the row cursor is reset,
 *        rspType becomes 1 and the column pointers are set up.
 *   - otherwise: the result was already delivered, so the result object is
 *        reset via tscResetForNextRetrieve (no more rows).
 *
 * If the SQL object has a callback (pSql->fp), it is invoked directly on
 * success; on failure the result is queued through tscQueueAsyncRes.
 *
 * Returns the result code captured before the callback runs.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType != 0) {
    tscResetForNextRetrieve(pRes);
  } else {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  }

  // capture the code first: it is what gets returned after the callback has run
  uint8_t code = pRes->code;
  if (pSql->fp == NULL) {
    return code;
  }

  if (code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
  }

  return code;
}

/*
 * Publish the result of a describe-table command: the number of result rows
 * is the number of columns plus the number of tags of the table referenced by
 * the first table-meta-info entry of the current clause.
 *
 * Returns whatever tscLocalResultCommonBuilder returns.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd *       pCmd = &pSql->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   info = tscGetTableInfo(pMetaInfo->pTableMeta);

  int32_t rows = info.numOfColumns + info.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rows);
}

/*
 * Publish the result of a tag retrieve.
 *
 * The row-count computation is compiled out, so the result is always
 * reported with zero rows.
 *
 * Returns whatever tscLocalResultCommonBuilder returns.
 */
int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1450
/*
 * Run the client-side merge for pSql and deliver the merged rows.
 *
 * The code returned by tscDoLocalMerge is stored in pRes->code. When the merge
 * succeeded and produced rows, the column pointers of the result are set up.
 * The row cursor is reset in every case.
 *
 * If a callback is registered (pSql->fp), it is called directly on success
 * and the result is queued via tscQueueAsyncRes on failure. A missing
 * callback is tolerated, matching tscLocalResultCommonBuilder, instead of
 * dereferencing a NULL function pointer.
 *
 * Returns the result code captured before the callback runs.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  pRes->code = tscDoLocalMerge(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;

  // capture the code first: it is what gets returned after the callback has run
  uint8_t code = pRes->code;
  if (pSql->fp != NULL) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
1473
/*
 * Publish an empty (zero-row) result for pSql.
 * Returns whatever tscLocalResultCommonBuilder returns.
 */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1474

1475
/*
 * Build the connect request (TSDB_MSG_TYPE_CM_CONNECT) in pSql->cmd.
 *
 * The payload is (re)allocated to hold one SCMConnectMsg. The db field gets
 * the part of pObj->db that follows the first TS_PATH_DELIMITER, or the whole
 * string when no delimiter is present. clientVersion is copied from the
 * global `version`, and msgVersion is set to the empty string.
 * pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pConnect = (SCMConnectMsg *)pCmd->payload;

  // drop the leading component of the stored db name, when there is one
  char *delim = strstr(pObj->db, TS_PATH_DELIMITER);
  char *dbName = (delim != NULL) ? delim + 1 : pObj->db;

  strcpy(pConnect->db, dbName);
  strcpy(pConnect->clientVersion, version);
  strcpy(pConnect->msgVersion, "");

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1498
/*
 * Build the table-meta request (TSDB_MSG_TYPE_CM_TABLE_META) in pSql->cmd.
 *
 * The message is written at the start of the command payload:
 *   SCMTableInfoMsg { tableId, createFlag } [ STagData when autoCreated ]
 *
 * The current payload content is copied to a temporary buffer before the
 * header is written, because the tag data appended for auto-created tables is
 * taken from that snapshot. pInfo is not used.
 *
 * NOTE(review): the payload is not (re)allocated here; presumably the caller
 * has sized it for header + tag data - confirm. The closing assert now checks
 * the length that was actually written (the original checked a variable that
 * was always 0, so it could never catch an oversized message).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the snapshot
 * buffer cannot be allocated.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char *tmpData = NULL;
  if (pCmd->allocSize > 0) {
    tmpData = calloc(1, pCmd->allocSize);
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

    // STagData is in binary format, so a string copy cannot be used
    memcpy(tmpData, pCmd->payload, pCmd->allocSize);
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated) {
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

  int msgLen = pMsg - (char *)pInfoMsg;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1539
/**
1540
 *  multi table meta req pkg format:
1541
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1542 1543
 *      no used         4B
 **/
1544
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1557
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1558

1559
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1560
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1561 1562

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1563
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1564 1565 1566 1567
  }

  tfree(tmpData);

1568
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1569
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1570 1571 1572

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1573
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1574 1575 1576 1577 1578
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hjxilinx 已提交
1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1605

H
hjxilinx 已提交
1606 1607 1608
/*
 * Build the super-table vgroup request (TSDB_MSG_TYPE_CM_STABLE_VGROUP).
 *
 * Layout, written at the start of the command payload:
 *   SCMSTableVgroupMsg { numOfTables (network order) }
 *   numOfTables fixed-width name slots of TSDB_TABLE_ID_LEN bytes each
 *
 * Each slot is filled with strncpy, so short names are zero padded and a name
 * of exactly TSDB_TABLE_ID_LEN bytes is stored without a terminator.
 * pInfo is not used.
 *
 * NOTE(review): the payload is not (re)allocated here; presumably the caller
 * has sized it for all tables - confirm.
 *
 * Returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SCMSTableVgroupMsg *pHead = (SCMSTableVgroupMsg *)pCmd->payload;
  pHead->numOfTables = htonl(pQueryInfo->numOfTables);

  char *pCursor = pCmd->payload + sizeof(SCMSTableVgroupMsg);
  for (int32_t idx = 0; idx < pQueryInfo->numOfTables; ++idx) {
    STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, idx);

    strncpy(pCursor, pMetaInfo->name, TSDB_TABLE_ID_LEN);
    pCursor += TSDB_TABLE_ID_LEN;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (pCursor - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1772
/*
 * Estimate the buffer size needed for a heartbeat message of the connection
 * that owns pSql.
 *
 * The estimate is the sum of the RPC head, one SMgmtHead, one SQqueryList
 * plus one SQueryDesc per entry of pObj->sqlList, one SStreamList plus one
 * SStreamDesc per entry of pObj->streamList, and TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * The lists are walked without taking any lock here.
 */
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;

  int total = 0;
  total += tsRpcHeadSize + sizeof(SMgmtHead);

  total += sizeof(SQqueryList);
  for (SSqlObj *pQuery = pObj->sqlList; pQuery != NULL; pQuery = pQuery->next) {
    total += sizeof(SQueryDesc);
  }

  total += sizeof(SStreamList);
  for (SSqlStream *pStream = pObj->streamList; pStream != NULL; pStream = pStream->next) {
    total += sizeof(SStreamDesc);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1795
/*
 * Build the heartbeat request (TSDB_MSG_TYPE_CM_HEARTBEAT) in pSql->cmd.
 *
 * Layout, starting tsRpcHeadSize bytes into the payload:
 *   SMgmtHead { db } followed by whatever tscBuildQueryStreamDesc appends.
 *
 * pObj->mutex is held from the size estimate until the descriptors have been
 * written, and is released on the allocation-failure path as well.
 * pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int size = tscEstimateHeartBeatMsgLength(pSql);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SMgmtHead *pMgmt = (SMgmtHead *)pStart;
  strcpy(pMgmt->db, pObj->db);

  char *pEnd = tscBuildQueryStreamDesc(pStart + sizeof(SMgmtHead), pObj);
  pthread_mutex_unlock(&pObj->mutex);

  int msgLen = pEnd - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

1830 1831
/*
 * Process a table-meta response held in pSql->res.pRsp.
 *
 * Steps:
 *   1. Convert the multi-byte header fields, the vgroup ports and the schema
 *      entries (bytes, colId) from network to host byte order, in place.
 *   2. Reject responses whose sid / numOfIps are negative or whose tag /
 *      column counts are out of range (TSDB_CODE_INVALID_VALUE).
 *   3. Create a STableMeta from the message and put it into the client cache
 *      under the table name; the cached entry is stored in the first
 *      table-meta-info of the command.
 *
 * The locally created STableMeta is released on every path after the cache
 * put: the cache keeps its own entry (the pointer it returns is the one kept
 * in pTableMetaInfo), so the temporary must not leak when the put fails.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_VALUE for a malformed
 * response, or TSDB_CODE_CLI_OUT_OF_MEMORY when the meta cannot be created or
 * cached.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);

  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    // the first value logged is numOfIps, so label it as such
    tscError("invalid table meta numOfIps:%d, sid:%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
  }

  // columns come first in the schema array, followed by the tags
  SSchema *pSchema = pMetaMsg->schema;
  int32_t  numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta =
      (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsTableMetaKeepTimer);

  // release the temporary before looking at the result, so it cannot leak on failure
  free(pTableMeta);

  if (pTableMetaInfo->pTableMeta == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1903
/**
 *  Process a multi-table meta response.
 *
 *  Response layout:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  Parsing of the response is currently compiled out, so the response is
 *  accepted without being inspected and pSql is left untouched.
 *
 *  Returns TSDB_CODE_SUCCESS.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2012 2013
/*
 * Process a super-table vgroup response held in pSql->res.pRsp.
 *
 * Response layout:
 *   SCMSTableVgroupRspMsg { numOfTables }
 *   then, per table and in request order:
 *     SVgroupsInfo { numOfVgroups } followed by numOfVgroups SCMVgroupInfo
 *
 * For each table a private copy of its vgroup block is allocated and stored in
 * the matching table-meta-info of the parent SQL object (pSql->param); vgId
 * and the ports are converted to host byte order in that copy.
 *
 * The read cursor advances by one whole block per table. The original code
 * advanced it inside the per-vgroup loop, which skipped too far for tables
 * with several vgroups and not at all for tables with none.
 *
 * NOTE(review): a vgroupList already attached to the table-meta-info is
 * overwritten without being released here - confirm ownership with callers.
 *
 * Returns pSql->res.code; on allocation failure the code is set to
 * TSDB_CODE_CLI_OUT_OF_MEMORY and returned.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  // NOTE: the order of the tables must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char *pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj *parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd *pCmd = &parent->cmd;
  for (int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    if (pInfo->vgroupList == NULL) {
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      return pRes->code;
    }

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfIps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
        pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
      }
    }

    // step over this table's block exactly once
    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Process the response of a SHOW command.
 *
 * The response carries a query handle followed by a table-meta message that
 * describes the result columns. Multi-byte fields are byte-swapped in place
 * inside the response buffer, a table meta object is created from the message
 * and put into the cache under a key derived from the message type, and one
 * output field plus one expression is registered for every column.
 *
 * @param pSql  sql object whose res.pRsp holds the SCMShowRsp message
 * @return      always 0
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // convert the query handle in place and remember it in the result object
  SCMShowRsp *pShowRsp = (SCMShowRsp *)pRes->pRsp;
  pShowRsp->qhandle = htobe64(pShowRsp->qhandle);
  pRes->qhandle = pShowRsp->qhandle;

  tscResetForNextRetrieve(pRes);

  // convert the embedded table-meta message in place
  STableMetaMsg *pMetaMsg = &(pShowRsp->tableMeta);
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
  pMetaMsg->sid = ntohs(pMetaMsg->sid);

  SSchema *pMsgSchema = pMetaMsg->schema;
  for (int col = 0; col < pMetaMsg->numOfColumns; ++col) {
    pMsgSchema[col].bytes = htons(pMsgSchema[col].bytes);
  }

  // cache key: one letter derived from the message type, followed by "showlist"
  char key[20];
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  // drop the reference currently held by this meta info before replacing it
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t      metaSize = 0;
  STableMeta *pNewMeta = tscCreateTableMetaFromMsg(pMetaMsg, &metaSize);

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pNewMeta, metaSize, tsTableMetaKeepTimer);
  SSchema *pCachedSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo * pFieldInfo = &pQueryInfo->fieldsInfo;
  SColumnIndex index = {0};

  // register one column, one output field and one dummy expression per schema entry
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
    SSchema *pCol = &pMsgSchema[i];

    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD     field = tscCreateField(pCol->type, pCol->name, pCol->bytes);
    SFieldSupInfo *pSupInfo = tscFieldInfoAppend(pFieldInfo, &field);

    pSupInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pCachedSchema[i].type,
                                          pCachedSchema[i].bytes, pCachedSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  // the locally created meta is released here; the cache entry stays referenced via pTableMetaInfo
  tfree(pNewMeta);
  return 0;
}

/*
 * Process the response of a CONNECT request.
 *
 * Copies the account id and the server version from the response into the
 * connection object, prefixes the current database name with the account id,
 * hands the ip list from the response to tscSetMgmtIpList, stores the
 * write/super authorization flags and resets the activity timer.
 *
 * @param pSql  sql object whose res.pRsp holds the SCMConnectRsp message
 * @return      always 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char     temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response

  // bounded formatting: temp can never be overrun, whatever acctId/db contain
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  // strictly less than the capacity: the name AND its terminating NUL must fit
  assert(len >= 0 && (size_t)len < tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  pObj->db[tListLen(pObj->db) - 1] = 0;  // strncpy does not terminate when it truncates

  tscSetMgmtIpList(&pConnect->ipList);

  strcpy(pObj->sversion, pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Process the response of a USE DATABASE request: the name stored in the
 * first table meta info of the command becomes the connection's current
 * database name.
 *
 * @return always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj *pConn = pSql->pTscObj;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  strcpy(pConn->db, pMetaInfo->name);

  return 0;
}

/*
 * Process the response of a DROP DATABASE request by emptying the client
 * cache. The sql object itself is not used.
 *
 * @return always 0
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Process the response of a DROP TABLE request.
 *
 * Rationale: after a table is dropped, a table with the same name but a
 * different schema may be created again (possibly in another vnode), so any
 * cached meta for that name is stale. If the name is found in the cache, both
 * the reference acquired here and the one held by the table meta info are
 * released with the last argument set to true.
 * NOTE(review): the trace message calls this a "force release"; confirm the
 * exact semantics of that flag against taosCacheRelease.
 *
 * @return always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta after drop table:%s", pSql, pMetaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    if (pMetaInfo->pTableMeta != NULL) {
      taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);
    }
  }

  // nothing to do when the table is not in the cache
  return 0;
}

/*
 * Process the response of an ALTER TABLE request.
 *
 * If the altered table is present in the cache, the reference acquired here
 * and the one held by the table meta info are released with the last argument
 * set to true. When the table is a super table the whole cache is emptied
 * afterwards.
 *
 * @return always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMetaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    if (pMetaInfo->pTableMeta != NULL) {
      // evaluate the table kind before the meta reference is given up
      bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pMetaInfo);

      taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);

      if (isSuperTable) {  // if it is a super table, reset whole query cache
        tscTrace("%p reset query cache since table:%s is stable", pSql, pMetaInfo->name);
        taosCacheEmpty(tscCacheHandle);
      }
    }
  }

  // nothing to do when the table is not in the cache
  return 0;
}

/*
 * Process the response of an ALTER DATABASE request. No client-side state is
 * touched; the sql object is intentionally unused.
 *
 * @return always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}

/*
 * Process the response of a query (SELECT) request.
 *
 * The query handle in the response is byte-swapped in place and copied into
 * the result object; the data pointer is cleared and the result object is
 * reset for the next retrieve.
 *
 * @return always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *       pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  // convert in place first, then copy the converted value
  pRsp->qhandle = htobe64(pRsp->qhandle);
  pRes->qhandle = pRsp->qhandle;
  pRes->data = NULL;

  tscResetForNextRetrieve(pRes);

  return 0;
}

H
hjxilinx 已提交
2333
/*
 * Process a retrieve (fetch) response.
 *
 * Copies the header fields of the SRetrieveTableRsp message into the result
 * object (converted with htonl/htons/htobe64), points res.data at the payload
 * and builds the per-column result pointers. For subscriptions, the payload is
 * followed by a progress trailer: an int32 table count and, per table, an
 * int64 uid and a TSKEY; each pair is forwarded to
 * tscUpdateSubscriptionProgress.
 *
 * @param pSql  sql object whose res.pRsp holds the retrieve response
 * @return      0 on success, res.code if the result pointers cannot be created
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // the progress trailer starts right after the last column of the last row
    char *p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // The trailer sits at an arbitrary byte offset, so the values are copied out
    // with memcpy instead of dereferencing casted (possibly misaligned) pointers.
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int32_t i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;

  // explicit 64-bit casts keep the format string correct whatever the field widths are
  tscTrace("%p numOfRows:%" PRId64 ", offset:%" PRId64, pSql, (int64_t)pRes->numOfRows, (int64_t)pRes->offset);

  return 0;
}

/*
 * Process a retrieve response that is handled locally.
 *
 * Takes the row count (converted with htonl) and the data pointer from the
 * SRetrieveTableRsp message, lets tscSetResultPointer set up the result
 * pointers for the first query info, and rewinds the row cursor.
 *
 * @return always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  SQueryInfo *       pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRsp->numOfRows);
  pRes->data = pRsp->data;

  tscSetResultPointer(pQueryInfo, pRes);

  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2391
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2392

2393
/*
 * Issue an asynchronous table-meta request for the table named in
 * pTableMetaInfo.
 *
 * A new sql object (command TSDB_SQL_META) is created with pSql as its
 * callback parameter and tscTableMetaCallBack as its callback; the payload of
 * pSql is copied into it because it carries the tag information used when the
 * table has to be created on demand.
 *
 * @param pSql            the sql object that needs the table meta
 * @param pTableMetaInfo  table meta info whose name identifies the table
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the request was issued,
 *         TSDB_CODE_CLI_OUT_OF_MEMORY on allocation failure, otherwise the
 *         error code reported by the failing step
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // without a query info the request cannot be built; release everything attached so far
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // a bare free() would leak the query info attached to pNew->cmd above
    tscFreeSqlObj(pNew);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  strncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, tListLen(pNewMeterMetaInfo->name));
  pNewMeterMetaInfo->name[tListLen(pNewMeterMetaInfo->name) - 1] = 0;  // strncpy may leave the name unterminated

  // tag information if table does not exists.
  // NOTE(review): assumes pSql->cmd.payload holds at least TSDB_DEFAULT_PAYLOAD_SIZE bytes - confirm.
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is in flight; the result is delivered through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

H
hjxilinx 已提交
2435
/*
 * Obtain the table meta for pTableMetaInfo->name.
 *
 * Any meta currently held by pTableMetaInfo is released first. The cache is
 * then consulted by name; on a hit the function returns immediately, on a
 * miss the request is forwarded to getTableMetaFromMgmt.
 *
 * @param pSql            sql object on whose behalf the meta is needed
 * @param pTableMetaInfo  table meta info; its name must not be empty
 * @return TSDB_CODE_SUCCESS on a cache hit, otherwise the result of
 *         getTableMetaFromMgmt
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // give up the reference this meta info may still own
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, info.numOfColumns,
           info.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2455
/*
 * Same as tscGetTableMeta, but first records in the command whether the table
 * should be created when it does not exist.
 *
 * @param createIfNotExists  stored in pSql->cmd.autoCreated
 * @return                   the result of tscGetTableMeta
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/*
 * Helper for renewing the table meta during insertion.
 *
 * If the table is created on demand during insertion, wait for a short period
 * before the table-meta request is re-issued; this gives the vnode a better
 * chance of having created the corresponding table by then.
 *
 * Commands other than TSDB_SQL_INSERT do not wait.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  taosMsleep(50);  // todo: global config
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
2476
 * @param tableId       meter id
H
hzcheng 已提交
2477 2478
 * @return              status code
 */
S
slguan 已提交
2479
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
2480 2481
  int code = 0;

H
hjxilinx 已提交
2482
  // handle table meta renew process
H
hzcheng 已提交
2483
  SSqlCmd *pCmd = &pSql->cmd;
2484 2485

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2486
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2487 2488

  /*
S
slguan 已提交
2489
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
2490 2491
   * 2. if get metermeta failed, still get the metermeta
   */
2492
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
2493
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
2494
    if (pTableMetaInfo->pTableMeta) {
2495 2496
      tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
               tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2497
    }
2498

2499
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
2500
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2501

2502
    code = getTableMetaFromMgmt(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
2503
  } else {
H
hjxilinx 已提交
2504
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2505 2506
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2507 2508 2509 2510 2511
  }

  return code;
}

H
hjxilinx 已提交
2512
/*
 * Check whether every table of the given clause already has its vgroup list.
 *
 * @return true when no table meta info of the clause has a NULL vgroupList
 *         (so no further vgroup retrieval is needed), false otherwise
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  // advance past every table that already carries vgroup info
  int32_t idx = 0;
  while (idx < pQueryInfo->numOfTables && tscGetMetaInfo(pQueryInfo, idx)->vgroupList != NULL) {
    ++idx;
  }

  // stopping early means some table is still missing its vgroup list
  return idx >= pQueryInfo->numOfTables;
}
H
hzcheng 已提交
2524

H
hjxilinx 已提交
2525 2526 2527 2528 2529
/*
 * Issue an asynchronous request for the vgroup information of all tables in
 * the given clause.
 *
 * Returns immediately when every table already has its vgroup list. Otherwise
 * a new sql object (command TSDB_SQL_STABLEVGROUP) is created that references
 * the same table metas, with pSql as callback parameter and
 * tscTableMetaCallBack as callback.
 *
 * @param pSql         sql object that needs the vgroup info
 * @param clauseIndex  index of the clause whose tables are inspected
 * @return TSDB_CODE_SUCCESS when nothing has to be fetched,
 *         TSDB_CODE_ACTION_IN_PROGRESS when the request was issued,
 *         TSDB_CODE_CLI_OUT_OF_MEMORY when the sql object cannot be allocated,
 *         otherwise the error code of the failing step
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = NULL;
  int         code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // do not leak the freshly allocated sql object on this early exit
    tscFreeSqlObj(pNew);
    return code;
  }

  // hand every table of the clause, with an extra reference on its meta, to the new query
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *    pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscTrace("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is in flight; the result is delivered through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

2569
void tscInitMsgsFp() {
S
slguan 已提交
2570 2571 2572
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
2573 2574

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2575
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2576

2577 2578
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2579 2580

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2581
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2582 2583 2584
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2585
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2586 2587 2588
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2589 2590 2591 2592 2593
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2594
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2595
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2596
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2597 2598 2599 2600

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2601 2602 2603
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2604 2605

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2606
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2607 2608 2609 2610 2611

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2612
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2613
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2614
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2615 2616

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2617
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2618
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2619

H
hjxilinx 已提交
2620 2621 2622 2623 2624
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
2625

H
hzcheng 已提交
2626 2627
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2628
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2629 2630 2631 2632 2633 2634 2635 2636 2637 2638

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}