tscServer.c 87.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tcache.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
21
#include "tscSubquery.h"
H
hzcheng 已提交
22 23 24 25 26 27 28
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
29
#include "tscLog.h"
H
hzcheng 已提交
30 31 32

#define TSC_MGMT_VNODE 999

S
slguan 已提交
33
SRpcIpSet  tscMgmtIpSet;
S
slguan 已提交
34 35
SRpcIpSet  tscDnodeIpSet;

36 37
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
38 39 40
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
41

42 43 44
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
45

S
slguan 已提交
46
/* Minimum message size: the RPC header size plus a fixed 100 bytes. */
static int32_t minMsgSize() {
  return 100 + tsRpcHeadSize;
}
H
hzcheng 已提交
47

48 49 50
/*
 * Fill pSql->ipList from the vgroup information stored in the table meta:
 * the number of endpoints, each endpoint's fqdn and port, and an in-use
 * index reset to 0.
 */
static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) {
  SRpcIpSet* pDst = &pSql->ipList;

  pDst->numOfIps = pTableMeta->vgroupInfo.numOfIps;
  pDst->inUse    = 0;

  int32_t idx = 0;
  while (idx < pTableMeta->vgroupInfo.numOfIps) {
    strcpy(pDst->fqdn[idx], pTableMeta->vgroupInfo.ipAddr[idx].fqdn);
    pDst->port[idx] = pTableMeta->vgroupInfo.ipAddr[idx].port;
    ++idx;
  }
}

S
slguan 已提交
60
/*
 * Log the current management endpoint list (tscMgmtIpSet), one trace line
 * per entry. An empty or negative count is reported as an error instead.
 */
void tscPrintMgmtIp() {
  if (tscMgmtIpSet.numOfIps <= 0) {
    tscError("invalid mgmt IP list:%d", tscMgmtIpSet.numOfIps);
    return;
  }

  int i = 0;
  while (i < tscMgmtIpSet.numOfIps) {
    tscTrace("mgmt index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]);
    ++i;
  }
}

S
slguan 已提交
70
/*
 * Replace the global management endpoint list (tscMgmtIpSet) with the list
 * supplied by the cluster.
 *
 * Both the fqdn and the port of every entry are copied. Previously only the
 * port was refreshed, so tscMgmtIpSet.fqdn[] kept whatever host names were
 * stored before and no longer matched the new ports.
 *
 * The port is passed through htons() exactly as before.
 * NOTE(review): presumably the incoming port is in network byte order and
 * this converts it to host order -- confirm against the sender.
 */
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpSet.numOfIps = pIpList->numOfIps;
  tscMgmtIpSet.inUse = pIpList->inUse;

  for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
    // same copy idiom as tscSetDnodeIpList(): fqdn and port travel together
    strcpy(tscMgmtIpSet.fqdn[i], pIpList->fqdn[i]);
    tscMgmtIpSet.port[i] = htons(pIpList->port[i]);
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
79 80 81
  if (tscMgmtIpSet.numOfIps != 1) {
    tscMgmtIpSet.numOfIps = 1;
    tscMgmtIpSet.inUse = 0;
S
slguan 已提交
82
    taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]); 
S
slguan 已提交
83 84 85 86 87
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
88 89
/*
 * Overwrite the global management endpoint list with *pIpSet.
 * ahandle is not used by this function.
 */
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
  (void)ahandle;

  tscTrace("mgmt IP list is changed for ufp is called");

  // whole-struct copy of the new endpoint set
  tscMgmtIpSet = *pIpSet;
}

S
slguan 已提交
93
/*
 * Refresh the global management endpoint list from a server-supplied list.
 *
 * Per the original note: the cluster edition returns the current management
 * nodes, while the edge edition returns an empty list. An empty list selects
 * the edge path; anything else is copied from the cluster list.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  if (pIpList->numOfIps == 0) {
    tscSetMgmtIpListFromEdge();
    return;
  }

  tscSetMgmtIpListFromCluster(pIpList);
}

H
hjxilinx 已提交
105 106 107 108 109 110 111
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
112
UNUSED_FUNC
H
hjxilinx 已提交
113 114
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
S
slguan 已提交
115
  return tscMgmtIpSet.numOfIps * factor;
H
hjxilinx 已提交
116 117
}

H
hzcheng 已提交
118 119 120 121 122 123 124 125 126 127 128 129
/*
 * Callback for the heartbeat request.
 *
 * param: the STscObj that owns the heartbeat; ignored when NULL or when its
 *        signature does not point back to itself.
 * tres:  not used here.
 * code:  0 when the heartbeat succeeded.
 *
 * On success the management endpoint list is refreshed from the response and
 * any kill request carried by it (connection, query, stream) is executed.
 * In both the success and the failure case the activity timer is re-armed.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  if (code != 0) {
    tscTrace("heart beat failed, code:%d", code);
  } else {
    // the response buffer of the heartbeat sql object holds the reply
    SCMHeartBeatRsp *pHbRsp = (SCMHeartBeatRsp *)pObj->pHb->res.pRsp;
    tscSetMgmtIpList(&pHbRsp->ipList);

    if (pHbRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pHbRsp->queryId) {
        tscKillQuery(pObj, htonl(pHbRsp->queryId));
      }

      if (pHbRsp->streamId) {
        tscKillStream(pObj, htonl(pHbRsp->streamId));
      }
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
155 156 157
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
158
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
159
    
160 161 162 163
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
164
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
165 166 167 168 169
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

170
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
171 172 173 174
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
175
    tscAddSubqueryInfo(&pObj->pHb->cmd);
176

S
slguan 已提交
177
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
178 179 180
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
181
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
182 183 184 185 186 187 188 189 190
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the prepared payload of pSql into a freshly allocated RPC buffer and
 * send it.
 *
 * Commands below TSDB_SQL_MGMT go through pVnodeConn using pSql->ipList; the
 * payload is copied starting tsRpcHeadSize bytes into cmd.payload. All other
 * commands go through the connection's management handle after pSql->ipList
 * is overwritten with the global management endpoint list; the payload is
 * copied from the start of cmd.payload.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY when the RPC buffer cannot be
 * allocated, TSDB_CODE_SUCCESS otherwise. The outcome of the send itself is
 * not reported here.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  STscObj* pObj = pSql->pTscObj;
  SSqlCmd* pCmd = &pSql->cmd;

  char *pMsg = rpcMallocCont(pCmd->payloadLen);
  if (NULL == pMsg) {
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  void *pConn = NULL;

  if (pCmd->command < TSDB_SQL_MGMT) {
    // log the port of the endpoint in use; the whole port array used to be
    // passed to %d, which does not match the format specifier
    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pCmd->msgType], pSql->ipList.port[pSql->ipList.inUse]);
    memcpy(pMsg, pCmd->payload + tsRpcHeadSize, pCmd->payloadLen);
    pConn = pVnodeConn;
  } else {
    pSql->ipList = tscMgmtIpSet;
    memcpy(pMsg, pCmd->payload, pCmd->payloadLen);
    tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pCmd->msgType]);
    pConn = pObj->pMgmtConn;
  }

  SRpcMsg rpcMsg = {
    .msgType = pCmd->msgType,
    .pCont   = pMsg,
    .contLen = pCmd->payloadLen,
    .handle  = pSql,
    .code    = 0
  };

  rpcSendRequest(pConn, &pSql->ipList, &rpcMsg);

  return TSDB_CODE_SUCCESS;
}

229 230
/*
 * Entry point for every response delivered by the RPC layer.
 *
 * rpcMsg->handle is the SSqlObj the request was sent for. The function
 *   1. drops the response if the sql object or its connection is gone,
 *   2. for a set of "stale meta / node not available" error codes renews the
 *      table meta and resends the request (up to pSql->maxRetry times),
 *   3. otherwise copies the response into pSql->res, runs the per-command
 *      response handler from tscProcessMsgRsp[], and invokes the user
 *      callback pSql->fp unless the handler reports
 *      TSDB_CODE_ACTION_IN_PROGRESS.
 *
 * rpcMsg->pCont is released with rpcFreeCont() on every exit path.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
  if (pSql == NULL) {
    // nothing to dereference: report and drop the response
    tscError("%p sql is already released", pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);

  if (pSql->freed || pObj->signature != pObj) {
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
      /*
       * not_active_table: 1. the virtual node may have failed to create the table: table creation is asynchronous,
       *                   so the virtual node may not have created it yet; try again with the renewed table meta.
       *                   2. the requested table may have been removed by another client, so the table meta
       *                   has to be renewed here.
       *
       * not_active_vnode: the vnode was moved to another node by the balance procedure or has been removed.
       *                   Renew the table meta and try again.
       * not_active_session: the db has been moved to another node; the vnode no longer exists on this dnode.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
        // NOTE(review): returns without invoking pSql->fp -- confirm the caller does not wait on it
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        // NOTE(review): returns without invoking pSql->fp -- confirm the heartbeat timer is re-armed elsewhere
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else {
        // count the retry outside the log call so it does not depend on how the log macro evaluates its arguments
        pSql->retry++;
        tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

        pSql->res.code = rpcMsg->code;  // keep the previous error code
        if (pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
        } else {
          rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
          if (pTableMetaInfo->pTableMeta) {
            tscSendMsgToServer(pSql);
          }

          rpcFreeCont(rpcMsg->pCont);
          return;
        }
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    // NOTE(review): a successful rpc code is mapped to TSDB_CODE_NETWORK_UNAVAIL here and only replaced by
    // rpcMsg->code further down, so the retry-reset branch below cannot trigger on this path -- confirm intent
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    // tstrerror() yields a string, so it is printed with %s
    tscTrace("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscTrace("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0) {
      // keep the old buffer if realloc fails
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // NOTE(review): a previously allocated pRsp is dropped without being freed here -- confirm ownership
      pRes->pRsp = NULL;
    }

    // ignore the error information returned from mnode when set ignore flag in sql
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) {
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is no response callback function for the submit response.
     * The counters of the response are byte-swapped in place and the
     * affected rows are added to the running total.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscTrace("%p cmd:%d code:%s, inserted rows:%d, rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code),
          pMsg->affectedRows, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
    void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
    rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;

    tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);

    /*
     * Whether to free the sqlObj must be decided before calling the user callback: the callback may free this
     * SqlObj, and the memory may be reused by another thread before tscShouldBeFreed is called, in which case
     * tscShouldBeFreed would inspect an object that now belongs to that other thread.
     *
     * If the memory were re-allocated for an insert thread, for which tscKeepConn[command] equals 0,
     * tscShouldBeFreed would succeed and tscFreeSqlObj would free it immediately.
     */
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, taosres, rpcMsg->code);

    if (shouldFree) {
      tscTrace("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
379 380 381
/*
 * Build (for the commands listed below) and send the request of pSql.
 *
 * If the result code is not TSDB_CODE_SUCCESS after the build step, the
 * result is queued for asynchronous delivery and that code is returned.
 * A send failure is stored in pRes->code and also queued, but the function
 * still returns TSDB_CODE_SUCCESS in that case.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      // these commands have their message built right before sending
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);
  if (sendCode != TSDB_CODE_SUCCESS) {
    pRes->code = sendCode;
    tscQueueAsyncRes(pSql);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Dispatch a prepared sql object.
 *
 *  - commands >= TSDB_SQL_LOCAL are handled locally through
 *    tscProcessMsgRsp[] and its result is returned;
 *  - commands in [TSDB_SQL_MGMT, TSDB_SQL_LOCAL) are addressed to the
 *    management endpoint list;
 *  - commands < TSDB_SQL_MGMT require a table meta info; without one
 *    TSDB_CODE_OTHERS is stored in pSql->res.code and returned.
 *
 * Join queries, two-stage super table queries and multi-vnode inserts are
 * handed to their dedicated handlers; everything else goes to
 * doProcessSql().
 */
int tscProcessSql(SSqlObj *pSql) {
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint16_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
    }

    type = pQueryInfo->type;

    // for heartbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no query info or no table meta info; never hand NULL to %s
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, (name != NULL) ? name : "(null)",
           type);

  if (pSql->cmd.command < TSDB_SQL_MGMT) {
    // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
    pSql->ipList = tscMgmtIpSet;
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  // todo handle async situation
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
      return tscHandleMasterJoinQuery(pSql);
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
        return doProcessSql(pSql);
      }
    }
  }

  if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {  // super table query
    tscHandleMasterSTableQuery(pSql);
    return pRes->code;
  } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
    tscHandleMultivnodeInsert(pSql);
    return pSql->res.code;
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
465

H
hjxilinx 已提交
466
/*
 * Cancel a two-stage super table query.
 *
 * Every existing sub-query gets its result code set to
 * TSDB_CODE_QUERY_CANCELLED. The function then polls (100 ms steps, at most
 * about 10 seconds) until the master command becomes
 * TSDB_SQL_RETRIEVE_METRIC or TSDB_SQL_RETRIEVE_EMPTY_RESULT.
 * Nothing is done for queries that are not two-stage super table queries.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd*    pCmd = &pSql->cmd;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

    /*
     * The command must not be set to TSDB_SQL_KILL_QUERY here. Otherwise the
     * sub-queries may not be released correctly and the master sql object
     * of the super table query may reach an abnormal state.
     */
    if (pSub != NULL) {
      pSub->res.code = TSDB_CODE_QUERY_CANCELLED;
    }
  }

  /*
   * 1. if the sub-queries are not launched or only partially launched, wait
   *    for the launched ones to return so their resources are freed.
   * 2. if no sub-query is launched yet, the query is still in the sql parse
   *    stage: set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  for (;;) {
    if (pSql->cmd.command == TSDB_SQL_RETRIEVE_METRIC || pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

508
/*
 * Fill the retrieve request located tsRpcHeadSize bytes into the command
 * payload: query handle, the query type (stored in the 'free' field), the
 * target vgroup id and the content length. Sets the message type to
 * TSDB_MSG_TYPE_RETRIEVE and always returns TSDB_CODE_SUCCESS.
 *
 * pInfo is not used.
 * NOTE(review): cmd.payloadLen is read but not set here -- presumably it is
 * prepared by the caller; confirm.
 */
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)(pSql->cmd.payload + tsRpcHeadSize);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  if (!UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) {
    STableMeta* pMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pMeta->vgroupInfo.vgId);
  } else {
    SVgroupsInfo* pVgroups = pQueryInfo->pTableMetaInfo[0]->vgroupList;
    assert(pVgroups->numOfVgroups == 1); // todo fix me

    pRetrieveMsg->header.vgId = htonl(pVgroups->vgroups[0].vgId);
  }

  pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);

  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
  return TSDB_CODE_SUCCESS;
}

541
/*
 * Fill the headers of a submit (insert) request whose data has already been
 * copied into the command payload.
 *
 * Layout written, starting tsRpcHeadSize bytes into the payload:
 *   SMsgDesc   - number of vnodes (currently always 1)
 *   SSubmitMsg - vgroup id, content length and number of blocks
 * The content length excludes the SMsgDesc. The endpoint list of pSql is
 * taken from the table meta. Always returns TSDB_CODE_SUCCESS.
 *
 * pInfo is not used.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *pBody = pCmd->payload + tsRpcHeadSize;

  // NOTE: shell message size should not include SMsgDesc
  int32_t bodyLen = pCmd->payloadLen - sizeof(SMsgDesc);

  SMsgDesc *pMsgDesc = (SMsgDesc *)pBody;
  pMsgDesc->numOfVnodes = htonl(1);  // todo set the right number of vnodes

  SSubmitMsg *pShellMsg = (SSubmitMsg *)(pBody + sizeof(SMsgDesc));
  int32_t     vgId = pTableMeta->vgroupInfo.vgId;

  pShellMsg->header.vgId = htonl(vgId);
  pShellMsg->header.contLen = htonl(bodyLen);
  pShellMsg->length = pShellMsg->header.contLen;
  pShellMsg->numOfBlocks = htonl(pCmd->numOfTablesInSubmit);  // number of tables to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscSetDnodeIpList(pSql, pTableMeta);

  tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps);
  return TSDB_CODE_SUCCESS;
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
576
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
577
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
578
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
579

580
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
581 582 583 584
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
H
hjxilinx 已提交
585
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
586 587

  // meter query without tags values
H
hjxilinx 已提交
588
  if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
S
slguan 已提交
589
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
590
  }
H
hjxilinx 已提交
591 592
  
  int32_t size = 4096;
S
slguan 已提交
593
  return size;
H
hzcheng 已提交
594 595
}

H
hjxilinx 已提交
596
/*
 * Write one STableIdInfo record (sid, uid and the subscription progress key
 * of the queried table) at pMsg.
 *
 * vgId is only used for the trace line.
 * Returns the position immediately after the written record.
 */
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  STableMeta     *pMeta = pMetaInfo->pTableMeta;

  tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pMetaInfo->name, pMeta->uid);

  STableIdInfo *pIdInfo = (STableIdInfo *)pMsg;
  pIdInfo->sid = htonl(pMeta->sid);
  pIdInfo->uid = htobe64(pMeta->uid);
  pIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pMeta->uid));

  return pMsg + sizeof(STableIdInfo);
}

611
/*
 * Build the query message (TSDB_MSG_TYPE_QUERY) for the current clause of pSql.
 *
 * The message is written at pCmd->payload + tsRpcHeadSize in this order:
 *   1. SQueryTableMsg header (including the fixed part of the column list)
 *   2. column filter information
 *   3. expression list (SSqlFuncMsg entries followed by binary parameters)
 *   4. table id information (see doSerializeTableInfo)
 *   5. group-by columns
 *   6. interpolation default values
 *   7. tag column list
 *   8. compressed timestamp block (join queries)
 *   9. tag query condition
 *  10. NUL-terminated tbname condition (a single 0 byte when absent)
 *
 * Multi-byte header fields are converted to network byte order.
 *
 * pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_SQL if a column or
 * tag index/type does not match the cached table meta, and -1 on any other
 * failure.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  if (taosArrayGetSize(pQueryInfo->colList) <= 0) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // the interval is a 64-bit value (it is sent with htobe64 below), so print it as such
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;

  int32_t msgLen      = 0;
  int32_t numOfTables = 0;
  int32_t numOfTags   = taosArrayGetSize(pTableMetaInfo->tagColList);
  int32_t vgId        = 0;  // host byte order copy of the target vgroup id, used for tracing

  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
    numOfTables = 1;
    tscSetDnodeIpList(pSql, pTableMeta);
    vgId = pTableMeta->vgroupInfo.vgId;
    tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
  } else {  // query super table
    int32_t index = pTableMetaInfo->vgroupIndex;
    if (index < 0) {
      tscError("%p error vgroupIndex:%d", pSql, index);
      return -1;
    }

    SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];

    pSql->ipList.numOfIps = pVgroupInfo->numOfIps;  // todo fix me
    pSql->ipList.inUse    = 0;

    for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
      strcpy(pSql->ipList.fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
      pSql->ipList.port[i] = pVgroupInfo->ipAddr[i].port;
    }

    tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index);

    vgId = pVgroupInfo->vgId;
    numOfTables = 1;
  }

  pQueryMsg->head.vgId = htonl(vgId);

  // the time window is always sent as (start, end) in scan order
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->numOfTables    = htonl(numOfTables);
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->interpoType    = htons(pQueryInfo->interpoType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htons(pQueryInfo->type);

  // the expression count is unsigned, so there is no negative value to reject here
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons(numOfOutput);

  // set column list ids
  size_t   numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char    *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    int32_t  columnIndex = pCol->colIndex.columnIndex;

    // validate the index before the schema entry is touched, so that neither the
    // check nor the error log reads outside of the schema array
    if (columnIndex < 0 || columnIndex >= tscGetNumOfColumns(pTableMeta)) {
      tscError("%p sid:%d uid:%" PRIu64 " id:%s, column index out of range, numOfColumns:%d, index:%d",
               pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta),
               columnIndex);

      return TSDB_CODE_INVALID_SQL;
    }

    SSchema *pColSchema = &pSchema[columnIndex];
    if (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p sid:%d uid:%" PRIu64 " id:%s, invalid column type, numOfColumns:%d, index:%d, column name:%s",
               pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta),
               columnIndex, pColSchema->name);

      return TSDB_CODE_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // expression list: each SSqlFuncMsg may be followed by its binary parameters
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags); pass the host order vgroup id for tracing
  pMsg = doSerializeTableInfo(pSql, vgId, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex *pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      // every field is assigned (not accumulated), so the result does not depend
      // on the previous content of the buffer
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      // numOfTags is the size of tagColList, so the entries are read from that list
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      int32_t  tagIndex = pCol->colIndex.columnIndex;

      if (tagIndex >= numOfTagColumns || tagIndex < -1) {
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns, tagIndex);

        return TSDB_CODE_INVALID_SQL;
      }

      // NOTE(review): index -1 is accepted by the range check above (as before) and
      // addresses the entry in front of the tag schema - confirm what it is meant to refer to.
      SSchema *pColSchema = &pSchema[tagIndex];
      if (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, invalid tag type, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
                 tagIndex, pColSchema->name);

        return TSDB_CODE_INVALID_SQL;
      }

      SColumnInfo *pTagCol = (SColumnInfo *)pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // fread() reports 0 items for a zero-sized item, so only a non-empty block is read and checked
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0 ||
        (pBlockInfo->compLen > 0 && fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f) != 1)) {
      tscError("%p failed to read the compressed ts block from the ts buffer file", pSql);
      return -1;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond *pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // tbname condition: always NUL-terminated, a single 0 byte if there is none
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // tbname in/like query expression should be sent to mgmt node
  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

911 912
/*
 * Finish the "create database" message: set the payload length and message
 * type and copy the name of the first table meta entry into the db field.
 *
 * This builder does not call tscAllocPayload(); it writes into the buffer
 * pCmd->payload already points to.
 * NOTE(review): presumably the payload was prepared by an earlier stage - confirm.
 *
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
  pCmd->msgType    = TSDB_MSG_TYPE_CM_CREATE_DB;

  SCMCreateDbMsg *pMsg = (SCMCreateDbMsg *)pCmd->payload;

  // only single-clause commands are expected here
  assert(pCmd->numOfClause == 1);

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  strncpy(pMsg->db, pMetaInfo->name, tListLen(pMsg->db));

  return TSDB_CODE_SUCCESS;
}

925 926
/*
 * Build the "create dnode" message: the end point is taken from the first
 * token of the DCL info.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pMsg = (SCMCreateDnodeMsg *)pCmd->payload;

  // copy exactly the token bytes; the token itself is not NUL-terminated by this call
  strncpy(pMsg->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
  return TSDB_CODE_SUCCESS;
}

941 942
/*
 * Build the account message (sent as TSDB_MSG_TYPE_CM_CREATE_ACCT): user
 * name, password, resource limits and the access state.
 *
 * Access state mapping of the "stat" option:
 *   (empty) -> -1, "r" -> read, "w" -> write, "all" -> all, "no" -> 0;
 *   any other value leaves the field as it is in the payload.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pMsg = (SCMCreateAcctMsg *)pCmd->payload;

  SSQLToken *pUserToken = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPassToken = &pInfo->pDCLInfo->user.passwd;

  strncpy(pMsg->user, pUserToken->z, pUserToken->n);
  strncpy(pMsg->pass, pPassToken->z, pPassToken->n);

  // resource limits, converted to network byte order
  SCreateAcctSQL *pOpt = &pInfo->pDCLInfo->acctOpt;

  pMsg->cfg.maxUsers           = htonl(pOpt->maxUsers);
  pMsg->cfg.maxDbs             = htonl(pOpt->maxDbs);
  pMsg->cfg.maxTimeSeries      = htonl(pOpt->maxTimeSeries);
  pMsg->cfg.maxStreams         = htonl(pOpt->maxStreams);
  pMsg->cfg.maxPointsPerSecond = htonl(pOpt->maxPointsPerSecond);
  pMsg->cfg.maxStorage         = htobe64(pOpt->maxStorage);
  pMsg->cfg.maxQueryTime       = htobe64(pOpt->maxQueryTime);
  pMsg->cfg.maxConnections     = htonl(pOpt->maxConnections);

  // translate the textual access state
  if (pOpt->stat.n == 0) {
    pMsg->cfg.accessState = -1;
  } else if (pOpt->stat.n == 1 && pOpt->stat.z[0] == 'r') {
    pMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
  } else if (pOpt->stat.n == 1 && pOpt->stat.z[0] == 'w') {
    pMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
  } else if (pOpt->stat.n == 3 && strncmp(pOpt->stat.z, "all", 3) == 0) {
    pMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
  } else if (pOpt->stat.n == 2 && strncmp(pOpt->stat.z, "no", 2) == 0) {
    pMsg->cfg.accessState = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

986 987
/*
 * Build the create/alter user message.
 *
 * - TSDB_ALTER_USER_PRIVILEGES: the privilege is taken from pCmd->count.
 * - every other type (alter password, create user): the password token is copied.
 *
 * The message type is TSDB_MSG_TYPE_CM_ALTER_USER for the two alter types and
 * TSDB_MSG_TYPE_CM_CREATE_USER otherwise.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pMsg = (SCMCreateUserMsg *)pCmd->payload;
  SUserInfo        *pUserInfo = &pInfo->pDCLInfo->user;

  strncpy(pMsg->user, pUserInfo->user.z, pUserInfo->user.n);
  pMsg->flag = pUserInfo->type;

  if (pUserInfo->type == TSDB_ALTER_USER_PRIVILEGES) {
    pMsg->privilege = (char)pCmd->count;
  } else {
    // alter password and create user both carry the password
    strncpy(pMsg->pass, pUserInfo->passwd.z, pUserInfo->passwd.n);
  }

  switch (pUserInfo->type) {
    case TSDB_ALTER_USER_PASSWD:
    case TSDB_ALTER_USER_PRIVILEGES:
      pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
      break;
    default:
      pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

1018 1019
/*
 * Prepare the "config dnode" message: reserve a payload of
 * sizeof(SCMCfgDnodeMsg) bytes and set the message type. No field of the
 * message is written here.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1030

1031 1032
/*
 * Build the "drop database" message: database name (taken from the name of
 * the first table meta entry) and the ignore-not-exists flag.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropDbMsg   *pMsg = (SCMDropDbMsg *)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  strncpy(pMsg->db, pMetaInfo->name, tListLen(pMsg->db));

  // normalize the flag to 0/1
  if (pInfo->pDCLInfo->existsCheck) {
    pMsg->ignoreNotExists = 1;
  } else {
    pMsg->ignoreNotExists = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1050 1051
/*
 * Build the "drop table" message: table id (the name of the first table meta
 * entry) and the ignore-not-exists flag.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropTableMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo  *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropTableMsg *pMsg = (SCMDropTableMsg *)pCmd->payload;

  strcpy(pMsg->tableId, pMetaInfo->name);

  // normalize the flag to 0/1
  if (pInfo->pDCLInfo->existsCheck) {
    pMsg->igNotExists = 1;
  } else {
    pMsg->igNotExists = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1068
/*
 * Build the "drop dnode" message: the end point is the name stored in the
 * first table meta entry of the current clause.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo  *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropDnodeMsg *pMsg = (SCMDropDnodeMsg *)pCmd->payload;

  strcpy(pMsg->ep, pMetaInfo->name);

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1084
/*
 * Build the "drop user" message: the user name is the name stored in the
 * first table meta entry of the current clause.
 *
 * The payload length and the message type are set before the payload is
 * allocated, so both are set even if the allocation fails.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType    = TSDB_MSG_TYPE_CM_DROP_USER;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCmd->payload;

  strcpy(pMsg->user, pMetaInfo->name);
  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117
/*
 * Build the "drop account" message. The message reuses the SCMDropUserMsg
 * layout; the account name is the name stored in the first table meta entry
 * of the current clause.
 *
 * The payload length and the message type are set before the payload is
 * allocated, so both are set even if the allocation fails.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType    = TSDB_MSG_TYPE_CM_DROP_ACCT;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCmd->payload;

  strcpy(pMsg->user, pMetaInfo->name);
  return TSDB_CODE_SUCCESS;
}

1118 1119
/*
 * Build the "use database" message: the database name is the name stored in
 * the first table meta entry of the current clause.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMUseDbMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMUseDbMsg    *pMsg = (SCMUseDbMsg *)pCmd->payload;

  strcpy(pMsg->db, pMetaInfo->name);

  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
  return TSDB_CODE_SUCCESS;
}

1135
/*
 * Build the "show" message.
 *
 * The db field is the name stored in the first table meta entry if it is not
 * empty, otherwise the current database of the connection. An optional extra
 * token is appended as the variable-length payload:
 *   - showType != TSDB_MGMT_TABLE_VNODES: the wildcard pattern, if one was given
 *   - showType == TSDB_MGMT_TABLE_VNODES: the prefix token (must not be empty)
 *
 * pShowMsg->payloadLen is stored in network byte order for the receiver, while
 * pCmd->payloadLen (the number of bytes to send) is computed from the host
 * order token length. The payload buffer is sized from the token length as
 * well, so the token always fits.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // pick the token that goes into the variable-length part, if any
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SSQLToken *pExtra = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pExtra = &pShowInfo->pattern;
    }
  } else {
    pExtra = &pShowInfo->prefix;
    assert(pExtra->n > 0 && pExtra->type > 0);
  }

  size_t extraLen = (pExtra != NULL) ? pExtra->n : 0;

  // keep the former 100 bytes of head room and add room for the token itself
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100 + extraLen;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg *)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
  } else {
    strcpy(pShowMsg->db, pObj->db);
  }

  pShowMsg->type = pShowInfo->showType;

  if (pExtra != NULL) {
    strncpy(pShowMsg->payload, pExtra->z, extraLen);
    pShowMsg->payloadLen = htons(extraLen);
  }

  // use the host order length here; pShowMsg->payloadLen is in network byte order
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1177
/*
 * Build a kill message. The id of the object to kill is copied from the "ip"
 * token of the DCL info into the queryId field; the message type follows the
 * command (kill query / kill connection / kill stream). For any other command
 * the message type is left unchanged.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMKillQueryMsg *pMsg = (SCMKillQueryMsg *)pCmd->payload;
  strncpy(pMsg->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);

  if (pCmd->command == TSDB_SQL_KILL_QUERY) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (pCmd->command == TSDB_SQL_KILL_CONNECTION) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (pCmd->command == TSDB_SQL_KILL_STREAM) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1202
/*
 * Estimate the payload size of a "create table" message.
 *
 * The estimate consists of the minimum message size, the management head and
 * the fixed SCMCreateTableMsg part, plus
 *   - the tag data, when the table is created from a super table, or
 *   - one SSchema per column and tag (pCmd->numOfCols + pCmd->count) otherwise,
 * plus the select statement (and one extra byte) if a select clause is
 * present, plus TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd         *pCmd = &(pSql->cmd);
  SCreateTableSQL *pCreateInfo = pInfo->pCreateTableInfo;

  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);

  size += (pCreateInfo->type == TSQL_CREATE_TABLE_FROM_STABLE)
              ? sizeof(STagData)
              : sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);

  if (pCreateInfo->pSelect != NULL) {
    size += (pCreateInfo->pSelect->selectToken.n + 1);
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1221
/*
 * Build the "create table" message.
 *
 * After the fixed SCMCreateTableMsg part the message carries either
 *   - the tag data, when the table is created from a super table, or
 *   - one SSchema per column and tag, optionally followed by the
 *     NUL-terminated select statement of a stream.
 *
 * The stream SQL is copied with exactly selectToken.n bytes and terminated
 * explicitly, so the terminator does not depend on the byte that follows the
 * token in the parsed statement. sqlLen (network byte order) includes the
 * terminator.
 *
 * The field list of the query info is cleared once it has been serialized.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int      msgLen = 0;
  int      size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
    pMsg += sizeof(STagData);
  } else {  // create (super) table
    SSchema *pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      // copy only the bytes of the token and terminate the string explicitly
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n);
      pMsg[pQuerySql->selectToken.n] = '\0';

      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  msgLen = pMsg - (char *)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the payload size of an "alter table" message: the minimum message
 * size, the management head, the fixed SCMAlterTableMsg part, one SSchema per
 * field of the first query info, and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t fixedSize  = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg);
  size_t schemaSize = sizeof(SSchema) * tscNumOfFields(pQueryInfo);

  return fixedSize + schemaSize + TSDB_EXTRA_PAYLOAD_SIZE;
}

1297
/*
 * Build the "alter table" message: database and table id, the alter type,
 * the tag value, and one SSchema per field of the first query info.
 *
 * Note that numOfCols is stored as returned by tscNumOfFields() (no byte
 * order conversion) and is read back as the loop bound.
 *
 * Returns -1 if the payload cannot be allocated, TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterMsg = (SCMAlterTableMsg *)pCmd->payload;
  SAlterTableSQL   *pAlterInfo = pInfo->pAlterInfo;

  // fixed part of the message
  tscGetDBInfoFromMeterId(pMetaInfo->name, pAlterMsg->db);
  strcpy(pAlterMsg->tableId, pMetaInfo->name);

  pAlterMsg->type = htons(pAlterInfo->type);
  pAlterMsg->numOfCols = tscNumOfFields(pQueryInfo);
  memcpy(pAlterMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);

  // schema of the affected fields
  SSchema *pCursor = pAlterMsg->schema;
  int      fieldIndex = 0;

  while (fieldIndex < pAlterMsg->numOfCols) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, fieldIndex);

    pCursor->type = pField->type;
    strcpy(pCursor->name, pField->name);
    pCursor->bytes = htons(pField->bytes);

    ++pCursor;
    ++fieldIndex;
  }

  // the message ends right after the last schema entry
  int msgLen = (char *)pCursor - (char *)pAlterMsg;

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1347
/*
 * Finalize the ALTER DATABASE request: set the message type and length and
 * write the database name (taken from the first table meta info of the current
 * clause) into the message.
 *
 * The payload is not allocated here.
 * NOTE(review): presumably it has already been allocated and partly filled by
 * the caller - confirm before adding an allocation in this function.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMAlterDbMsg  *pAlterDbMsg = (SCMAlterDbMsg *)pCmd->payload;

  strcpy(pAlterDbMsg->db, pTableMetaInfo->name);

  return TSDB_CODE_SUCCESS;
}

1359
/*
 * Build the retrieve request (SRetrieveTableMsg) for the query handle stored in
 * pSql->res.qhandle.
 *
 * The handle is sent in big-endian order; the `free` field carries the type of
 * the first query info in network order.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SQueryInfo        *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pCmd->payload;

  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1377
/*
 * Point every output column of pRes->tsrow at its data inside pRes->data.
 *
 * The start of column i is the field offset reported by tscFieldInfoGetOffset()
 * multiplied by the number of rows in the result.
 *
 * Returns 0 on success, or pRes->code when tscCreateResPointerInfo() fails.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (TSDB_CODE_SUCCESS != tscCreateResPointerInfo(pRes, pQueryInfo)) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);

    pRes->tsrow[col] = ((char *)pRes->data + colOffset * pRes->numOfRows);
    ++col;
  }

  return 0;
}

/*
 * Common tail for commands whose result is assembled on the client side.
 *
 * pRes->rspType records whether the result has already been handed out:
 *   0 - first call: publish numOfRes rows, rewind the row cursor, set up the
 *       per-column result pointers and switch rspType to 1
 *   1 - any later call: there is no more result, so the result set is reset
 *       through tscResetForNextRetrieve()
 *
 * When pSql->fp is set, the callback is invoked directly on success; on failure
 * the result is handed to tscQueueAsyncRes() instead.
 *
 * Returns the result code found in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    // a failure here is reported through pRes->code, which is read below
    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // Full-width local so that codes above 255 are neither truncated nor mistaken
  // for success. It is read before the callback runs.
  // NOTE(review): presumably because the callback may release pSql - confirm.
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Produce the local result of a describe-table command: one row per column and
 * one row per tag of the table referenced by the current clause.
 *
 * Returns the code reported by tscLocalResultCommonBuilder().
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *pCmd = &pSql->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);

  return tscLocalResultCommonBuilder(pSql, tinfo.numOfColumns + tinfo.numOfTags);
}

/*
 * Produce the local result of a tag retrieve command.
 *
 * The row count used to depend on the first expression (one row per table for
 * TSDB_FUNC_TAGPRJ, a single row for the count function). That computation is
 * compiled out at the moment, so an empty result is always reported.
 *
 * Returns the code reported by tscLocalResultCommonBuilder().
 */
int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;

  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

/*
 * Finish a retrieve on merged data: run the local reduce step, set up the
 * per-column result pointers when rows are available and rewind the row cursor.
 *
 * When pSql->fp is set (async retrieve), the callback is invoked directly on
 * success; on failure the result is handed to tscQueueAsyncRes() instead.
 *
 * Returns the code produced by tscDoLocalreduce().
 */
int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;

  // Full-width local so that codes above 255 are not truncated on return. It is
  // read before the callback runs.
  // NOTE(review): presumably because the callback may release pSql - confirm.
  int32_t code = pRes->code;
  if (pSql->fp) {  // async retrieve metric data
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
1480
/*
 * Report an empty (zero row) local result for pSql.
 */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t noRows = 0;

  return tscLocalResultCommonBuilder(pSql, noRows);
}
H
hzcheng 已提交
1481

1482
/*
 * Build the connect request (SCMConnectMsg): database name, client version and
 * an empty message version.
 *
 * Only the part of pObj->db that follows the first TS_PATH_DELIMITER is sent;
 * when no delimiter is present the whole string is used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pConnect = (SCMConnectMsg *)pCmd->payload;

  // skip everything up to and including the first delimiter, if there is one
  char *dbName = pObj->db;
  char *pDelimiter = strstr(pObj->db, TS_PATH_DELIMITER);
  if (pDelimiter != NULL) {
    dbName = pDelimiter + 1;
  }

  strcpy(pConnect->db, dbName);
  strcpy(pConnect->clientVersion, version);
  strcpy(pConnect->msgVersion, "");

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1505
/*
 * Build the table meta request (SCMTableInfoMsg) in place inside pCmd->payload.
 *
 * On entry, when pCmd->autoCreated is set, the payload starts with an STagData
 * blob. That blob has to end up in the `tags` member of the message, and the
 * message itself also starts at the beginning of the payload. The blob is
 * therefore moved first, with memmove() because source and destination may
 * overlap, and the header fields are written afterwards. No temporary heap
 * copy of the payload is needed, so this function has no allocation failure
 * path.
 *
 * pCmd->payloadLen and pCmd->msgType are set on return.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd         *pCmd = &pSql->cmd;
  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  char            *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated) {
    // STagData is in binary format, string functions are not applicable
    memmove(pInfoMsg->tags, pCmd->payload, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  pCmd->payloadLen = pMsg - (char *)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  // the buffer must at least hold the minimal message ...
  assert(minMsgSize() <= pCmd->allocSize);
  // ... and everything that was written above must lie inside the allocation
  assert((size_t)pCmd->payloadLen <= (size_t)pCmd->allocSize);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1546
/**
1547
 *  multi table meta req pkg format:
1548
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1549 1550
 *      no used         4B
 **/
1551
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1564
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1565

1566
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1567
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1568 1569

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1570
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1571 1572 1573 1574
  }

  tfree(tmpData);

1575
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1576
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1577 1578 1579

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1580
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1581 1582 1583 1584 1585
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

1586
/*
 * Estimate the payload size needed for a super table meta request.
 *
 * The estimate is the sum of a fixed part (minimal message, SSuperTableMetaMsg,
 * SMgmtHead and TSDB_MAX_TAGS tag indexes), the table name condition, the join
 * condition, one SSuperTableMetaElemMsg per table and one SColIndex per
 * group-by column. Sizing of the individual tag conditions is not implemented:
 * the function asserts if any are present.
 *
 * Returns the estimate, but never less than TSDB_DEFAULT_PAYLOAD_SIZE.
 */
static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t numOfConds = taosArrayGetSize(pQueryInfo->tagCond.pCond);
  if (numOfConds > 0) {
    assert(0);  // the length of the tag conditions is not accounted for
  }

  int32_t tagLen = 0;
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
  }

  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols * sizeof(SColIndex);

  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
}

H
hjxilinx 已提交
1613 1614 1615
/*
 * Build the super table vgroup request (SCMSTableVgroupMsg): the message only
 * carries the id of the first table of the current clause.
 *
 * The payload is not allocated here.
 * NOTE(review): presumably pCmd->payload is already large enough for
 * SCMSTableVgroupMsg when this is called - confirm against the callers.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *)pCmd->payload;

  // strncpy() does not terminate the destination when the source fills it
  // completely, so the last byte is always forced to be the terminator
  const size_t idCapacity = tListLen(pStableVgroupMsg->tableId);
  strncpy(pStableVgroupMsg->tableId, pTableMetaInfo->name, idCapacity);
  pStableVgroupMsg->tableId[idCapacity - 1] = '\0';

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = sizeof(SCMSTableVgroupMsg);

  return TSDB_CODE_SUCCESS;
}

1771
/*
 * Estimate the payload size of a heartbeat message for the connection that
 * owns pSql.
 *
 * The estimate covers the rpc head, SMgmtHead, the query list header with one
 * SQueryDesc per entry of pObj->sqlList, the stream list header with one
 * SStreamDesc per entry of pObj->streamList, plus TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * No locking is done here, although both lists are walked.
 * NOTE(review): presumably the caller must hold pObj->mutex - confirm.
 */
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;

  int size = 0;
  size += tsRpcHeadSize + sizeof(SMgmtHead);

  // query list: header followed by one descriptor per sql object
  size += sizeof(SQqueryList);
  for (SSqlObj *pCur = pObj->sqlList; pCur != NULL; pCur = pCur->next) {
    size += sizeof(SQueryDesc);
  }

  // stream list: header followed by one descriptor per stream
  size += sizeof(SStreamList);
  for (SSqlStream *pCur = pObj->streamList; pCur != NULL; pCur = pCur->next) {
    size += sizeof(SStreamDesc);
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1794
/*
 * Build the heartbeat message: an SMgmtHead carrying the current db name,
 * followed by the query/stream description written by
 * tscBuildQueryStreamDesc().
 *
 * pObj->mutex is held from the size estimation until the description has been
 * written, and is released on the allocation failure path as well.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int size = tscEstimateHeartBeatMsgLength(pSql);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }

  // the message body starts behind the space reserved for the rpc head
  char *pStart = pCmd->payload + tsRpcHeadSize;

  SMgmtHead *pMgmt = (SMgmtHead *)pStart;
  strcpy(pMgmt->db, pObj->db);

  char *pEnd = tscBuildQueryStreamDesc(pStart + sizeof(SMgmtHead), pObj);
  pthread_mutex_unlock(&pObj->mutex);

  int msgLen = pEnd - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

1829 1830
/*
 * Process a table meta response.
 *
 * The multi-byte fields of the message (sid, sversion, vgroup id, uid, contLen,
 * numOfColumns, the vgroup ports and bytes/colId of every schema entry) are
 * converted in place from network byte order. The message is then validated,
 * turned into an STableMeta, and stored in the meta cache under the table name.
 * The cached object is attached to the first table meta info of the command.
 *
 * The lookup of the super table meta for child tables is currently disabled.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_VALUE for a malformed message,
 * or TSDB_CODE_CLI_OUT_OF_MEMORY when the meta cannot be created or cached.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);

  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    // the values printed are numOfIps and sid, so they are labelled as such
    tscError("invalid table meta, numOfIps:%d, sid:%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
  }

  // the schema array holds the columns first, followed by the tags
  SSchema *pSchema = pMetaMsg->schema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // NOTE(review): assumes a NULL result means the allocation failed - confirm
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta =
      (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer);

  // The temporary meta is owned by this function; release it whether or not the
  // cache accepted the entry, otherwise it leaks on the failure path.
  free(pTableMeta);

  if (pTableMetaInfo->pTableMeta == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1902
/**
 * Handle the response of a multi table meta request.
 *
 * Documented response layout:
 *   | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | ......
 *       1B         1B             4B
 * NOTE(review): the alignment of the byte widths with the fields above is
 * reconstructed and should be confirmed against the protocol definition.
 *
 * The response body is currently not parsed: the function accepts any response
 * and leaves pSql untouched.
 *
 * Always returns TSDB_CODE_SUCCESS.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  (void)pSql;  // intentionally unused while response parsing is disabled

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2011 2012
/*
 * Process a super table vgroup response.
 *
 * The response is copied into a freshly allocated buffer that becomes the
 * vgroupList of the first table meta info of the parent sql object (found in
 * pSql->param). The vgroup count is converted from network byte order in the
 * response before the copy; the vgroup ids and ports are converted in the copy.
 *
 * NOTE(review): a vgroupList that is already attached is overwritten without
 * being released - confirm that it is always NULL on entry.
 *
 * Returns pSql->res.code, which is set to TSDB_CODE_CLI_OUT_OF_MEMORY when the
 * buffer cannot be allocated.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfVgroups = htonl(pStableVgroup->numOfVgroups);

  // master sqlObj locates in param
  SSqlObj *parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd        *pCmd = &parent->cmd;
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // allocate into a local first so a failed allocation is never dereferenced
  void *pCopy = malloc(pRes->rspLen);
  if (pCopy == NULL) {
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    return pRes->code;
  }

  memcpy(pCopy, pStableVgroup, pRes->rspLen);
  pInfo->vgroupList = pCopy;

  for (int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) {
    SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[i];

    pVgroups->vgId = htonl(pVgroups->vgId);
    assert(pVgroups->numOfIps >= 1);

    for (int32_t j = 0; j < pVgroups->numOfIps; ++j) {
      pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port);
    }
  }

  return pSql->res.code;
}

/*
 * Process the response of a SHOW command.
 *
 * The response carries a query handle plus an embedded table-meta message that
 * describes the columns of the result set. The fields are converted to host
 * byte order in place, a table meta object is built from the message and put
 * into the client cache, and one output field/expression is registered in the
 * query info for every column.
 *
 * @param pSql  sql object whose res.pRsp holds the SCMShowRsp message
 * @return      0 on success, TSDB_CODE_CLI_OUT_OF_MEMORY if the table meta
 *              cannot be created or stored in the cache
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  char key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMShowRsp *pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);

  // convert the embedded table meta message in place
  STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
  pMetaMsg->sid = ntohs(pMetaMsg->sid);

  SSchema *pSchema = pMetaMsg->schema;
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    pSchema->bytes = htons(pSchema->bytes);
  }

  // cache key: one character derived from the message type, followed by "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsMeterMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // failed to put into cache; the schema below would be read through a NULL pointer
    tfree(pTableMeta);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;

  // register every column of the response as an output field of the query
  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD     f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo *pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  // the local copy is released here; the cached object stays referenced by pTableMetaInfo
  tfree(pTableMeta);
  return 0;
}

/*
 * Process the response of a connect request.
 *
 * Copies the account id, server version and authorization flags from the
 * response into the connection object, prefixes the current db name with the
 * account id, updates the management node ip list and (re)starts the activity
 * timer.
 *
 * @param pSql  sql object whose res.pRsp holds the SCMConnectRsp message
 * @return      always 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char     temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response

  // build "<acctId><delimiter><db>"; snprintf never writes past the end of temp
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len <= tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  pObj->db[tListLen(pObj->db) - 1] = 0;  // strncpy does not terminate when the source fills the buffer

  tscSetMgmtIpList(&pConnect->ipList);

  strcpy(pObj->sversion, pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Process the response of a "use db" request: the name stored in the first
 * table meta info of the command becomes the current db of the connection.
 *
 * @param pSql  sql object of the finished request
 * @return      always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj *       pObj = pSql->pTscObj;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  // bounded copy: never write past the end of pObj->db, and always terminate it
  strncpy(pObj->db, pTableMetaInfo->name, tListLen(pObj->db));
  pObj->db[tListLen(pObj->db) - 1] = 0;

  return 0;
}

/*
 * Process the response of a "drop db" request: everything held by the client
 * cache (tscCacheHandle) is emptied. The sql object itself is not used.
 *
 * @return  always 0
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Process the response of a "drop table" request by forcing the table meta of
 * the dropped table out of the client cache.
 *
 * @param pSql  sql object of the finished request
 * @return      always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);
  if (pCached != NULL) {
    /*
     * Why the release is forced:
     * 1. A user drops a table that is the only table in its vnode, so the vnode is removed as well.
     * 2. The user then creates a new super table and a table with the same name as the dropped one
     *    but a different schema; this table resides in a new vnode.
     * The cached information is expired by then, and the reference to the original table may
     * already be lost, so the cached entry must not survive.
     */
    tscTrace("%p force release metermeta after drop table:%s", pSql, pMetaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    // also drop the reference held by the command itself, if any
    if (pMetaInfo->pTableMeta) {
      taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);
    }
  }

  // nothing to do when the table meta is not in the cache
  return 0;
}

/*
 * Process the response of an "alter table" request: the cached table meta is
 * stale now and is forced out of the cache. If the altered table is a super
 * table, the whole cache is emptied.
 *
 * @param pSql  sql object of the finished request
 * @return      always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMetaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    if (pMetaInfo->pTableMeta) {
      // the table type must be read before the reference held by the command is released
      bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pMetaInfo);

      taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);

      if (isSuperTable) {  // if it is a super table, reset whole query cache
        tscTrace("%p reset query cache since table:%s is stable", pSql, pMetaInfo->name);
        taosCacheEmpty(tscCacheHandle);
      }
    }
  }

  // nothing to do when the table meta is not in the cache
  return 0;
}

/*
 * Process the response of an "alter db" request. No client-side state is
 * touched; the function only reports success.
 *
 * @return  always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  (void)pSql;  // intentionally unused

  return 0;
}

/*
 * Process the response of a query request: store the query handle returned by
 * the server and reset the result object for the first retrieve.
 *
 * @param pSql  sql object whose res.pRsp holds the SQueryTableRsp message
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *       pResult = &pSql->res;
  SQueryTableRsp *pRspMsg = (SQueryTableRsp *)pResult->pRsp;

  // the handle is byte-swapped in place inside the response buffer, then copied out
  pRspMsg->qhandle = htobe64(pRspMsg->qhandle);
  pResult->qhandle = pRspMsg->qhandle;

  // no rows are available until the first retrieve
  pResult->data = NULL;
  tscResetForNextRetrieve(pResult);

  return 0;
}

H
hjxilinx 已提交
2323
/*
 * Process the response of a retrieve (fetch) request.
 *
 * The header fields of the response are converted and stored in the result
 * object, and the result pointers are set up for the current clause. For a
 * subscription, the bytes following the row data are read as a table count
 * followed by (uid, timestamp) pairs, which are used to update the
 * subscription progress.
 *
 * @param pSql  sql object whose res.pRsp holds the SRetrieveTableRsp message
 * @return      always 0
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // skip the row data: (offset of the last column + its width) bytes per row
    char *p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // the trailer is read with memcpy since p is not guaranteed to be suitably aligned
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(int32_t));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(int64_t));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      TSKEY key = 0;
      memcpy(&key, p, sizeof(TSKEY));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;

  // offset is filled from a 64-bit field, so it must not be printed with %d
  tscTrace("%p numOfRows:%d, offset:%" PRId64, pSql, (int32_t)pRes->numOfRows, (int64_t)pRes->offset);

  return 0;
}

/*
 * Process a retrieve response for the first query clause: only the number of
 * rows and the data pointer are taken from the response before the result
 * pointers are set up.
 *
 * @param pSql  sql object whose res.pRsp holds the SRetrieveTableRsp message
 * @return      always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SSqlRes *          pResult = &pSql->res;
  SRetrieveTableRsp *pRspMsg = (SRetrieveTableRsp *)pResult->pRsp;

  pResult->numOfRows = htonl(pRspMsg->numOfRows);
  pResult->data = pRspMsg->data;

  // always the first clause of the command
  SQueryInfo *pFirstClause = tscGetQueryInfoDetail(&pSql->cmd, 0);
  tscSetResultPointer(pFirstClause, pResult);

  pResult->row = 0;
  return 0;
}

H
hjxilinx 已提交
2379
// Forward declaration (defined outside this part of the file); it is installed as the `fp`
// callback of the internal sql objects created to fetch table meta / vgroup info.
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2380

2381
/*
 * Issue a TSDB_SQL_META request for the table named in pTableMetaInfo.
 *
 * A new sql object is created for the request; the original sql object is
 * attached as its callback parameter and tscTableMetaCallBack as its callback.
 *
 * @param pSql            sql object on whose behalf the table meta is fetched
 * @param pTableMetaInfo  table meta info holding the table name
 * @return                TSDB_CODE_ACTION_IN_PROGRESS when the request was
 *                        issued successfully, otherwise an error code
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // release the whole object, not only the SSqlObj shell: query info is already attached to it
    tscFreeSqlObj(pNew);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  strncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, tListLen(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is on its way; the result is delivered through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

H
hjxilinx 已提交
2423
/*
 * Get the table meta for the table named in pTableMetaInfo.
 *
 * A table meta already held by pTableMetaInfo is released first. The cache is
 * then looked up by table name; on a miss the meta is requested through
 * getTableMetaFromMgmt().
 *
 * @param pSql            sql object on whose behalf the table meta is needed
 * @param pTableMetaInfo  table meta info holding the (non-empty) table name
 * @return                TSDB_CODE_SUCCESS on a cache hit, otherwise the result
 *                        of getTableMetaFromMgmt()
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(pTableMetaInfo->name[0] != 0);

  // If this STableMetaInfo owns a table meta, release it first
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  STableMeta *pCached = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  pTableMetaInfo->pTableMeta = pCached;

  if (pCached == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pCached);
  tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pCached);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2443
/*
 * Same as tscGetTableMeta(), but records createIfNotExists in the command's
 * autoCreated flag before the table meta is looked up.
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCommand = &pSql->cmd;
  pCommand->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/*
 * Helper for renewing the table meta during insertion.
 *
 * If the table is created on demand during insertion, wait for a short period
 * before the get-table-meta message is issued again, which gives a greater
 * chance that the vnode has successfully created the corresponding table.
 * Commands other than TSDB_SQL_INSERT do not wait.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  taosMsleep(50);  // todo: global config
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
2464
 * @param tableId       meter id
H
hzcheng 已提交
2465 2466
 * @return              status code
 */
S
slguan 已提交
2467
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
2468 2469
  int code = 0;

H
hjxilinx 已提交
2470
  // handle table meta renew process
H
hzcheng 已提交
2471
  SSqlCmd *pCmd = &pSql->cmd;
2472 2473

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2474
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2475 2476

  /*
S
slguan 已提交
2477
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
2478 2479
   * 2. if get metermeta failed, still get the metermeta
   */
2480
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
2481
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
2482
    if (pTableMetaInfo->pTableMeta) {
2483 2484
      tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
               tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2485
    }
2486

2487
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
2488
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2489

2490
    code = getTableMetaFromMgmt(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
2491
  } else {
H
hjxilinx 已提交
2492
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2493 2494
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2495 2496 2497 2498 2499
  }

  return code;
}

H
hjxilinx 已提交
2500
/*
 * Request the vgroup information of the super table(s) of one query clause.
 *
 * Nothing is done when the first table of the clause already carries a vgroup
 * list. Otherwise a new sql object with command TSDB_SQL_STABLEVGROUP is
 * created, filled with the tables, tag condition, group-by, slimit and order of
 * the clause, and submitted; the original sql object is attached as the
 * callback parameter.
 *
 * @param pSql         sql object that needs the vgroup information
 * @param clauseIndex  index of the query clause
 * @return             TSDB_CODE_SUCCESS if the vgroup list is already present,
 *                     TSDB_CODE_ACTION_IN_PROGRESS when the request was issued,
 *                     otherwise an error code
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  if (pQueryInfo->pTableMetaInfo[0]->vgroupList != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p malloc failed for new sqlobj to get stable vgroupInfo", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = NULL;
  int         code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);  // do not leak the new sql object on failure
    return code;
  }

  // register every table of the clause in the new query, with its table meta taken from the cache
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMMInfo = tscGetMetaInfo(pQueryInfo, i);

    STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
    tscAddTableMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);

  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;

  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;

  STagCond *pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond);

  tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew);
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is on its way; the result is delivered through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

2588
void tscInitMsgsFp() {
S
slguan 已提交
2589 2590 2591
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
2592 2593

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2594
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2595

2596 2597
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2598 2599

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2600
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2601 2602 2603
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2604
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2605 2606 2607
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2608 2609 2610 2611 2612
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2613
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2614
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2615
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2616 2617 2618 2619

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2620 2621 2622
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2623 2624

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2625
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2626 2627 2628 2629 2630

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2631
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2632
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2633
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2634 2635

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2636
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2637
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2638

H
hzcheng 已提交
2639
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
2640 2641 2642 2643 2644
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
2645

H
hzcheng 已提交
2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}