tscServer.c 79.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
28
#include "tlockfree.h"
H
hzcheng 已提交
29 30 31

#define TSC_MGMT_VNODE 999

32
SRpcCorEpSet  tscMgmtEpSet;
33
SRpcEpSet  tscDnodeEpSet;
S
slguan 已提交
34

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
/* Smallest message size assumed by this file: the RPC head size plus 100 extra bytes. */
static int32_t minMsgSize() {
  const int32_t extraBytes = 100;
  return tsRpcHeadSize + extraBytes;
}
H
Haojun Liao 已提交
46 47 48 49 50 51 52 53
/*
 * Back-off delay, in milliseconds, before the count-th retry.
 *
 * Returns 0 for count <= 1, otherwise 100 * 2^(count - 1): 200 ms for the
 * second attempt, 400 ms for the third, and so on.
 *
 * The exponent is clamped so that the result always fits in an int32_t;
 * without the clamp a large count would shift past the width of int and
 * overflow the multiplication, both of which are undefined behavior.
 */
static int32_t getWaitingTimeInterval(int32_t count) {
  enum {
    INITIAL_MS = 100,  // 100 ms by default
    MAX_SHIFT  = 23    // 100 * (2 << 23) is the largest step that fits in int32_t
  };

  if (count <= 1) {
    return 0;
  }

  int32_t shift = count - 2;
  if (shift > MAX_SHIFT) {
    shift = MAX_SHIFT;
  }

  return INITIAL_MS * (2 << shift);
}
H
hzcheng 已提交
54

55
/*
 * Fill pSql->epSet from the endpoint list of the given vgroup.
 *
 * inUse is reset to 0 and every (fqdn, port) pair of pVgroupInfo is copied.
 * Asserts that the arguments are non-NULL, that the vgroup has at least one
 * endpoint, and that at least one copied FQDN is non-empty.
 */
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

  SRpcEpSet* pEpSet = &pSql->epSet;
  pEpSet->inUse = 0;
  pEpSet->numOfEps = pVgroupInfo->numOfEps;

  bool hasFqdn = false;

  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    // apply the FQDN string length check here: the copy is bounded by the
    // destination slot and always NUL-terminated, so an over-long source
    // name can no longer write past pEpSet->fqdn[i].
    strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]) - 1);
    pEpSet->fqdn[i][sizeof(pEpSet->fqdn[i]) - 1] = 0;

    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
  }

  assert(hasFqdn);
}
76

dengyihao's avatar
dengyihao 已提交
77 78 79 80
/*
 * Copy the shared mnode endpoint set (tscMgmtEpSet) into *epSet.
 * The copy is made between taosCorBeginRead and taosCorEndRead on the
 * shared set's version field.
 */
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  SRpcCorEpSet *pShared = &tscMgmtEpSet;

  taosCorBeginRead(&pShared->version);
  *epSet = pShared->epSet;
  taosCorEndRead(&pShared->version);
}
dengyihao's avatar
dengyihao 已提交
82 83
/* Apply htons() in place to each of the first numOfEps ports of the endpoint set. */
static void tscEpSetHtons(SRpcEpSet *s) {
  int32_t idx = 0;
  while (idx < s->numOfEps) {
    s->port[idx] = htons(s->port[idx]);
    ++idx;
  }
}
dengyihao's avatar
dengyihao 已提交
87 88
/*
 * Compare two endpoint sets.
 *
 * Returns true only when both have the same numOfEps and inUse, and every
 * endpoint has the same port and the same FQDN (compared over at most
 * TSDB_FQDN_LEN characters).
 */
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
  if (s1->numOfEps != s2->numOfEps) {
    return false;
  }

  if (s1->inUse != s2->inUse) {
    return false;
  }

  for (int32_t ep = 0; ep < s1->numOfEps; ++ep) {
    if (s1->port[ep] != s2->port[ep]) {
      return false;
    }

    if (strncmp(s1->fqdn[ep], s2->fqdn[ep], TSDB_FQDN_LEN) != 0) {
      return false;
    }
  }

  return true;
}
dengyihao's avatar
dengyihao 已提交
98
/*
 * Overwrite the shared mnode endpoint set (tscMgmtEpSet) with *pEpSet.
 * The store happens between taosCorBeginWrite and taosCorEndWrite on the
 * shared set's version field.
 *
 * NOTE: the set is written unconditionally; this function does not compare
 * the new set with the current one first.
 */
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
  SRpcCorEpSet *pShared = &tscMgmtEpSet;

  taosCorBeginWrite(&pShared->version);
  pShared->epSet = *pEpSet;
  taosCorEndWrite(&pShared->version);
}
dengyihao's avatar
dengyihao 已提交
104
/*
 * Copy the endpoint list held in a vgroup info into *pEpSet.
 *
 * Does nothing when pVgroupInfo is NULL. The copy is made between
 * taosCorBeginRead and taosCorEndRead on the vgroup's version field.
 * An inUse value outside [0, TSDB_MAX_REPLICA) is replaced by 0.
 */
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
  if (pVgroupInfo == NULL) {
    return;
  }

  taosCorBeginRead(&pVgroupInfo->version);

  int8_t inUse = pVgroupInfo->inUse;
  if (inUse < 0 || inUse >= TSDB_MAX_REPLICA) {
    inUse = 0;  // fall back to the first endpoint
  }

  pEpSet->inUse = inUse;
  pEpSet->numOfEps = pVgroupInfo->numOfEps;

  for (int32_t ep = 0; ep < pVgroupInfo->numOfEps; ++ep) {
    tstrncpy(pEpSet->fqdn[ep], pVgroupInfo->epAddr[ep].fqdn, sizeof(pEpSet->fqdn[ep]));
    pEpSet->port[ep] = pVgroupInfo->epAddr[ep].port;
  }

  taosCorEndRead(&pVgroupInfo->version);
}

dengyihao's avatar
dengyihao 已提交
117
/*
 * Store *pEpSet into the vgroup info of the first table meta of the current
 * clause of pObj.
 *
 * Does nothing when the table meta info or its table meta is missing. The
 * update (including the two debug log lines) runs between taosCorBeginWrite
 * and taosCorEndWrite on the vgroup's version field.
 */
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
  SSqlCmd *pCmd = &pObj->cmd;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  if (pMetaInfo == NULL) {
    return;
  }

  if (pMetaInfo->pTableMeta == NULL) {
    return;
  }

  SCMCorVgroupInfo *pCorVgroup = &pMetaInfo->pTableMeta->corVgroupInfo;

  taosCorBeginWrite(&pCorVgroup->version);
  tscDebug("before: Endpoint in use: %d", pCorVgroup->inUse);

  pCorVgroup->inUse = pEpSet->inUse;
  pCorVgroup->numOfEps = pEpSet->numOfEps;

  for (int32_t ep = 0; ep < pCorVgroup->numOfEps; ep++) {
    tstrncpy(pCorVgroup->epAddr[ep].fqdn, pEpSet->fqdn[ep], TSDB_FQDN_LEN);
    pCorVgroup->epAddr[ep].port = pEpSet->port[ep];
  }

  tscDebug("after: EndPoint in use: %d", pCorVgroup->inUse);
  taosCorEndWrite(&pCorVgroup->version);
}
134 135 136 137
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
138
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
139
  } else {
140
    for (int i = 0; i < dump.numOfEps; ++i) {
141
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
142
    }
S
slguan 已提交
143
  }
S
slguan 已提交
144 145
}

H
hzcheng 已提交
146 147 148 149 150 151 152 153 154 155 156 157
/*
 * Completion callback for the heartbeat request.
 *
 * param is the STscObj that owns the heartbeat; it is ignored when NULL or
 * when its signature does not point back at itself.
 *
 * On code == 0 the response stored in the heartbeat sql object is applied:
 *   - a non-empty endpoint set replaces the shared mnode endpoint set,
 *   - the connection id is stored on the connection object,
 *   - killConnection closes the connection and returns WITHOUT re-arming
 *     the activity timer,
 *   - otherwise queryId / streamId, when non-zero, kill that query / stream.
 * On any other code the failure is only logged.
 *
 * Except on the killConnection path, the activity timer is re-armed at the end.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj->signature != pObj) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pHbSql = pObj->pHb;

  if (code != 0) {
    tscDebug("heart beat failed, code:%s", tstrerror(code));
  } else {
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pHbSql->res.pRsp;

    SRpcEpSet *pNewEpSet = &pRsp->epSet;
    if (pNewEpSet->numOfEps > 0) {
      tscEpSetHtons(pNewEpSet);
      tscUpdateMgmtEpSet(pNewEpSet);
    }

    pHbSql->pTscObj->connId = htonl(pRsp->connId);

    if (pRsp->killConnection) {
      tscKillConnection(pObj);
      return;
    }

    if (pRsp->queryId) {
      tscKillQuery(pObj, htonl(pRsp->queryId));
    }

    if (pRsp->streamId) {
      tscKillStream(pObj, htonl(pRsp->streamId));
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
183 184 185
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
186

187 188 189
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
190 191
  }

192 193 194
  if (tscShouldFreeHeartBeat(pHB)) {
    tscDebug("%p free HB object and release connection", pHB);
    tscFreeSqlObj(pHB);
H
hzcheng 已提交
195
    tscCloseTscObj(pObj);
H
Haojun Liao 已提交
196
  } else {
197
    int32_t code = tscProcessSql(pHB);
H
Haojun Liao 已提交
198
    if (code != TSDB_CODE_SUCCESS) {
199
      tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
Haojun Liao 已提交
200
    }
H
hzcheng 已提交
201 202 203 204
  }
}

/*
 * Copy the command payload into an RPC buffer and send it.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the RPC buffer cannot be
 * allocated, TSDB_CODE_SUCCESS once the request has been handed to
 * rpcSendRequest(). For commands >= TSDB_SQL_MGMT the target endpoint set is
 * first refreshed from the shared mnode endpoint set.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // set the mgmt ip list
  if (pCmd->command >= TSDB_SQL_MGMT) {
    tscDumpMgmtEpSet(&pSql->epSet);
  }

  memcpy(pCont, pCmd->payload, pCmd->payloadLen);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pCmd->msgType;
  rpcMsg.pCont   = pCont;
  rpcMsg.contLen = pCmd->payloadLen;
  rpcMsg.ahandle = pSql;
  rpcMsg.handle  = &pSql->pRpcCtx;
  rpcMsg.code    = 0;

  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
}

238
/*
 * RPC response callback: dispatches a server response to the sql object
 * stored in rpcMsg->ahandle.
 *
 * rpcMsg  the response; its content buffer (pCont) is released with
 *         rpcFreeCont() on every path out of this function.
 * pEpSet  endpoint set reported with the response, may be NULL. When it
 *         differs from the one the request used, the cached vgroup info
 *         (commands < TSDB_SQL_MGMT) or the shared mnode endpoint set is
 *         updated.
 *
 * Outline:
 *   1. drop the response if the sql object or its connection is gone, or if
 *      the query is flagged TSDB_QUERY_TYPE_FREE_RESOURCE;
 *   2. for select/fetch/insert/update-tag commands failing with one of the
 *      "stale meta" codes, renew the table meta and retry (bounded by
 *      maxRetry);
 *   3. copy the response into pRes, run the per-command response processor,
 *      and invoke the user callback unless an action is still in progress.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("%p sql is already released", pSql);
    // release the response buffer here as well, like every other exit path does
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pObj->signature != pObj) {
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  pSql->pRpcCtx = NULL;    // clear the rpcCtx

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pEpSet != NULL && !tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
    if (pCmd->command < TSDB_SQL_MGMT) {
      tscUpdateVgroupInfo(pSql, pEpSet);
    } else {
      tscUpdateMgmtEpSet(pEpSet);
    }
  }

  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    // count the retry outside of the logging call, so that the counter does not
    // depend on whether the log statement evaluates its arguments
    pSql->retry += 1;
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }

    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
      // wait for a little bit moment and then retry
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

      rpcMsg->code = tscRenewTableMeta(pSql, 0);

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        rpcFreeCont(rpcMsg->pCont);
        return;
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  } else {
    pRes->code = rpcMsg->code;
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      // keep the old buffer if realloc fails; it is still owned by pRes
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // empty response: release the buffer of the previous response instead
      // of just dropping the pointer
      free(pRes->pRsp);
      pRes->pRsp = NULL;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command],
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;

    // decide before invoking the callback whether the object is freed afterwards
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);

    if (shouldFree) {
      tscDebug("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
383 384 385
/*
 * Build (when needed) and send the message for the current command.
 *
 * For select/fetch/retrieve/insert/connect/heartbeat/meta/stable-vgroup
 * commands the message is built here through tscBuildMsg[]. If pRes->code is
 * not TSDB_CODE_SUCCESS afterwards, or sending fails, the error is stored in
 * pRes->code, the result is queued via tscQueueAsyncRes() and the code is
 * returned. Returns TSDB_CODE_SUCCESS once the message has been sent.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  int32_t cmd = pCmd->command;
  bool    buildHere = (cmd == TSDB_SQL_SELECT) || (cmd == TSDB_SQL_FETCH) || (cmd == TSDB_SQL_RETRIEVE) ||
                      (cmd == TSDB_SQL_INSERT) || (cmd == TSDB_SQL_CONNECT) || (cmd == TSDB_SQL_HB) ||
                      (cmd == TSDB_SQL_META) || (cmd == TSDB_SQL_STABLEVGROUP);

  if (buildHere) {
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t code = tscSendMsgToServer(pSql);
  if (code == TSDB_CODE_SUCCESS) {
    // NOTE: pSql may have been released here already by other threads; do not touch it.
    return TSDB_CODE_SUCCESS;
  }

  pRes->code = code;
  tscQueueAsyncRes(pSql);
  return pRes->code;
}

/*
 * Entry point for executing the current command of a sql object.
 *
 *   - commands <  TSDB_SQL_MGMT need table meta info; without it
 *     TSDB_CODE_TSC_APP_ERROR is stored in pSql->res.code and returned;
 *   - commands >= TSDB_SQL_LOCAL are handled locally through
 *     tscProcessMsgRsp[] and that handler's result is returned;
 *   - everything else (and valid commands of the first kind) goes through
 *     doProcessSql().
 */
int tscProcessSql(SSqlObj *pSql) {
  char    *name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint32_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no table meta info (e.g. heartbeat);
  // passing NULL for %s is undefined, so log a placeholder instead
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
      (name != NULL) ? name : "(null)", type);

  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command >= TSDB_SQL_LOCAL) {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  // for mgmt commands the endpoint set is filled in by tscSendMsgToServer()
  return doProcessSql(pSql);
}
H
hzcheng 已提交
446

H
hjxilinx 已提交
447
/*
 * Cancel a two-stage super table query.
 *
 * Does nothing unless tscIsTwoStageSTableQuery() holds for the current
 * clause. Otherwise the parent and every non-NULL sub query are marked
 * TSDB_CODE_TSC_QUERY_CANCELLED, pending RPC requests of the sub queries are
 * cancelled, and each sub query result is queued via tscQueueAsyncRes().
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    // NOTE: pSub may have been released already here
    SSqlObj *pSub = pSql->pSubs[i];

    if (pSub != NULL) {
      pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;

      if (pSub->pRpcCtx != NULL) {
        rpcCancelRequest(pSub->pRpcCtx);
      }

      tscQueueAsyncRes(pSub); // async res? not other functions?
    }
  }

  tscDebug("%p super table query cancelled", pSql);
}

J
jtao1735 已提交
475
/*
 * Build the fetch (retrieve) message in the command payload.
 *
 * Fills qhandle, the free flag (taken from the query type), the target vgId
 * and the content length, and sets payloadLen / msgType on the command.
 * For a super table the vgId comes from the vgroup list entry selected by
 * vgroupIndex; otherwise from the table meta. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;

    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;

    // validate the index before it is used to read the vgroup entry
    assert(vgIndex >= 0 && vgIndex < pVgroupInfo->numOfVgroups);
    assert(pVgroupInfo->vgroups[vgIndex].vgId > 0);

    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
  } else {
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
  }

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  return TSDB_CODE_SUCCESS;
}

505
/*
 * Finish the submit (insert) message that already sits in the command payload.
 *
 * The payload starts with an SMsgDesc followed by the SSubmitMsg. This
 * function fills the descriptor (always one vnode), the submit header
 * (vgId, content length excluding the descriptor, number of blocks), sets
 * the message type and loads the target endpoint set from the table's
 * vgroup info. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  // NOTE: shell message size should not include SMsgDesc
  int32_t bodyLen = pCmd->payloadLen - sizeof(SMsgDesc);
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  char *pCursor = pCmd->payload;

  SMsgDesc *pDesc = (SMsgDesc *)pCursor;
  pDesc->numOfVnodes = htonl(1);  // always one vnode
  pCursor += sizeof(SMsgDesc);

  SSubmitMsg *pSubmit = (SSubmitMsg *)pCursor;
  pSubmit->header.vgId = htonl(vgId);
  pSubmit->header.contLen = htonl(bodyLen);  // the length not includes the size of SMsgDesc
  pSubmit->length = pSubmit->header.contLen;
  pSubmit->numOfBlocks = htonl(pCmd->numOfTablesInSubmit);  // number of tables to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);

  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
  return TSDB_CODE_SUCCESS;
}

/*
537
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
538
 */
539
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
540
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
541
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
542

S
TD-1057  
Shengliang Guan 已提交
543
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
544 545
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
546
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
547
  
548
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
549 550
}

551
/*
 * Serialize the table id list of a query into the message buffer and select
 * the target vgroup.
 *
 * pQueryMsg  query message header; head.vgId and numOfTables are set here,
 *            window.skey is read as the default subscription key.
 * pSql       the sql object; its endpoint set is loaded from the chosen vgroup.
 * pMsg       write position inside the message buffer.
 *
 * For a normal table, or when no per-vgroup table list exists, exactly one
 * STableIdInfo (the table meta's own id) is written. Otherwise one
 * STableIdInfo is written per table of the vgroup selected by vgroupIndex.
 *
 * Returns the write position just past the serialized entries.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  TSKEY           dfltKey = htobe64(pQueryMsg->window.skey);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
    SCMVgroupInfo *pVgroupInfo = NULL;

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      int32_t vgIndex = pTableMetaInfo->vgroupIndex;
      assert(vgIndex >= 0);

      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
        assert(vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[vgIndex];
      }

      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, pTableMetaInfo->vgroupList->numOfVgroups);
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
    }

    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);

    // single entry: the table described by the table meta
    STableIdInfo *pEntry = (STableIdInfo *)pMsg;
    pEntry->tid = htonl(pTableMeta->id.tid);
    pEntry->uid = htobe64(pTableMeta->id.uid);
    pEntry->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(vgIndex >= 0 && vgIndex < numOfVgroups);

    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, numOfVgroups);

    SVgroupTableInfo *pVgroupTables = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

    // set the vgroup info
    tscSetDnodeEpSet(pSql, &pVgroupTables->vgInfo);
    pQueryMsg->head.vgId = htonl(pVgroupTables->vgInfo.vgId);

    int32_t numOfTables = (int32_t)taosArrayGetSize(pVgroupTables->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables

    // serialize each table id info
    int32_t t = 0;
    while (t < numOfTables) {
      STableIdInfo *pItem = taosArrayGet(pVgroupTables->itemList, t);
      STableIdInfo *pEntry = (STableIdInfo *)pMsg;

      pEntry->tid = htonl(pItem->tid);
      pEntry->uid = htobe64(pItem->uid);
      pEntry->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));

      pMsg += sizeof(STableIdInfo);
      ++t;
    }
  }

  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);

  return pMsg;
}

618
/*
 * Build the query request (TSDB_MSG_TYPE_QUERY) for the current clause of pSql.
 *
 * The message is assembled in pCmd->payload in the following order:
 *   1. the fixed part of SQueryTableMsg (time window, order, limit/offset, interval, counters)
 *   2. one SColumnInfo per queried column, then the filters of those columns
 *   3. one SSqlFuncMsg per output expression, each followed by its binary parameters
 *   4. the table id list, written by doSerializeTableInfo()
 *   5. the group-by column descriptors
 *   6. the fill values (only when a fill type is set)
 *   7. one SColumnInfo per tag column
 *   8. the tag filter condition and the NUL-terminated tbname condition
 *   9. the compressed timestamp block read from pQueryInfo->tsBuf (if any)
 *
 * pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot
 * be allocated, TSDB_CODE_TSC_INVALID_SQL when the parsed query does not match the cached
 * table meta, or a system error code when the timestamp buffer file cannot be read.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;  // allocation failure is not a SQL error
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %"PRIu64", table cols:%d", pSql, (uint64_t)numOfSrcCols,
        tscGetNumOfColumns(pTableMeta));

    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;

  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending query the start/end keys of the window are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);

  // set column list ids; the variable-length filter data starts right after the column array
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
  int32_t numOfTableCols = tscGetNumOfColumns(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    int32_t  colIndex = pCol->colIndex.columnIndex;

    // only form a schema pointer once the index is known to be inside the schema array
    SSchema *pColSchema = (colIndex >= 0 && colIndex < numOfTableCols) ? &pSchema[colIndex] : NULL;

    if (pColSchema == NULL || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, numOfTableCols, colIndex,
               (pColSchema != NULL) ? pColSchema->name : "");

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return TSDB_CODE_TSC_INVALID_SQL;
      }
    }
  }

  // serialize the output expressions; binary parameters are appended after each SSqlFuncMsg
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      // plain assignment: the values must not depend on the previous content of the payload
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      int32_t  tagIndex = pCol->colIndex.columnIndex;

      // index -1 is accepted here exactly as before (NOTE(review): presumably the tbname
      // pseudo column - confirm); anything else outside [0, numOfTagColumns) is rejected
      SSchema *pColSchema = (tagIndex >= -1 && tagIndex < numOfTagColumns) ? &pSchema[tagIndex] : NULL;

      if (pColSchema == NULL ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
                 tagIndex, (pColSchema != NULL) ? pColSchema->name : "");

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // the tbname condition is always present as a NUL-terminated string (empty when not set)
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);

  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);

  return TSDB_CODE_SUCCESS;
}

895 896
/*
 * Finish the CREATE DATABASE request that lives in pCmd->payload: set the message
 * type and length and copy the database name taken from the first table meta info.
 *
 * No payload is allocated here.
 * NOTE(review): presumably the payload was allocated and filled with the database
 * options before this call - confirm against the caller.
 *
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  assert(pCmd->numOfClause == 1);

  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;

  SCMCreateDbMsg *pReq = (SCMCreateDbMsg*)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // the database name is carried in the name field of the meta info
  tstrncpy(pReq->db, pMetaInfo->name, sizeof(pReq->db));

  return TSDB_CODE_SUCCESS;
}

909 910
/*
 * Build the CREATE DNODE request: allocate a payload of sizeof(SCMCreateDnodeMsg)
 * and copy the first DCL token (the end point string) into its ep field.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  // the token is copied by its own length, no terminator is written by this call
  // NOTE(review): assumes the token length was validated to fit into ep - confirm
  SCMCreateDnodeMsg *pReq = (SCMCreateDnodeMsg *)pCmd->payload;
  strncpy(pReq->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);

  return TSDB_CODE_SUCCESS;
}

925 926
/*
 * Build the account request (message type TSDB_MSG_TYPE_CM_CREATE_ACCT): copy the
 * user name and password tokens and convert the account limits to network byte order.
 *
 * The access state is derived from the "stat" option token:
 *   empty -> -1, "r" -> read access, "w" -> write access, "all" -> all access, "no" -> 0.
 * Any other value leaves the field as it is in the freshly allocated payload.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pAcctMsg = (SCMCreateAcctMsg *)pCmd->payload;

  // account name and password, copied by token length
  SStrToken *pUserName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPassword = &pInfo->pDCLInfo->user.passwd;
  strncpy(pAcctMsg->user, pUserName->z, pUserName->n);
  strncpy(pAcctMsg->pass, pPassword->z, pPassword->n);

  // resource limits of the account
  SCreateAcctSQL *pOpt = &pInfo->pDCLInfo->acctOpt;
  pAcctMsg->cfg.maxUsers           = htonl(pOpt->maxUsers);
  pAcctMsg->cfg.maxDbs             = htonl(pOpt->maxDbs);
  pAcctMsg->cfg.maxTimeSeries      = htonl(pOpt->maxTimeSeries);
  pAcctMsg->cfg.maxStreams         = htonl(pOpt->maxStreams);
  pAcctMsg->cfg.maxPointsPerSecond = htonl(pOpt->maxPointsPerSecond);
  pAcctMsg->cfg.maxStorage         = htobe64(pOpt->maxStorage);
  pAcctMsg->cfg.maxQueryTime       = htobe64(pOpt->maxQueryTime);
  pAcctMsg->cfg.maxConnections     = htonl(pOpt->maxConnections);

  // translate the textual access state, dispatching on the token length
  switch (pOpt->stat.n) {
    case 0:
      pAcctMsg->cfg.accessState = -1;
      break;
    case 1:
      if (pOpt->stat.z[0] == 'r') {
        pAcctMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
      } else if (pOpt->stat.z[0] == 'w') {
        pAcctMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
      }
      break;
    case 2:
      if (strncmp(pOpt->stat.z, "no", 2) == 0) {
        pAcctMsg->cfg.accessState = 0;
      }
      break;
    case 3:
      if (strncmp(pOpt->stat.z, "all", 3) == 0) {
        pAcctMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
      }
      break;
    default:
      break;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

970 971
/*
 * Build a CREATE USER or ALTER USER request.
 *
 * The user name and the operation type (flag) are always set. For a privilege
 * change the privilege is taken from pCmd->count; for every other type the
 * password token is copied. Password and privilege changes are sent as
 * TSDB_MSG_TYPE_CM_ALTER_USER, everything else as TSDB_MSG_TYPE_CM_CREATE_USER.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pUserMsg = (SCMCreateUserMsg*)pCmd->payload;
  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  strncpy(pUserMsg->user, pUser->user.z, pUser->user.n);
  pUserMsg->flag = (int8_t)pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pUserMsg->privilege = (char)pCmd->count;
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    // both "alter password" and "create user" carry the password
    strncpy(pUserMsg->pass, pUser->passwd.z, pUser->passwd.n);
    pCmd->msgType = (pUser->type == TSDB_ALTER_USER_PASSWD) ? TSDB_MSG_TYPE_CM_ALTER_USER
                                                            : TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

1002 1003
/*
 * Set the length and type of the dnode configuration request. The payload content
 * is not touched here. pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  pSql->cmd.payloadLen = sizeof(SCMCfgDnodeMsg);
  pSql->cmd.msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1008

1009 1010
/*
 * Build the DROP DATABASE request: the database name comes from the first table
 * meta info, the ignoreNotExists flag from the existsCheck option of the statement.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropDbMsg *pReq = (SCMDropDbMsg*)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  tstrncpy(pReq->db, pMetaInfo->name, sizeof(pReq->db));

  if (pInfo->pDCLInfo->existsCheck) {
    pReq->ignoreNotExists = 1;
  } else {
    pReq->ignoreNotExists = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1028 1029
/*
 * Build the DROP TABLE request: the full table name comes from the first table
 * meta info, the igNotExists flag from the existsCheck option of the statement.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, consistent with the other drop message builders in this file
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1046
/*
 * Build the DROP DNODE request. The end point to drop is read from the name field
 * of the first table meta info. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropDnodeMsg *pReq = (SCMDropDnodeMsg *)pCmd->payload;

  tstrncpy(pReq->ep, pMetaInfo->name, sizeof(pReq->ep));
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1062
/*
 * Build the DROP USER request. The user name is read from the name field of the
 * first table meta info. The message type is set before the payload is allocated,
 * so it is set even when the allocation fails. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pReq = (SCMDropUserMsg*)pCmd->payload;

  tstrncpy(pReq->user, pMetaInfo->name, sizeof(pReq->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1079 1080 1081 1082 1083 1084 1085
/*
 * Build the DROP ACCOUNT request. It uses the SCMDropUserMsg layout with the
 * message type TSDB_MSG_TYPE_CM_DROP_ACCT; the account name is read from the name
 * field of the first table meta info. The message type is set before the payload
 * is allocated. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pReq = (SCMDropUserMsg*)pCmd->payload;

  tstrncpy(pReq->user, pMetaInfo->name, sizeof(pReq->user));

  return TSDB_CODE_SUCCESS;
}

1096 1097
/*
 * Build the USE DATABASE request. The database name is read from the name field
 * of the first table meta info. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, consistent with the other message builders in this file
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1113
/*
 * Build the SHOW request.
 *
 * The database is taken from the first table meta info when its name is not empty,
 * otherwise from the current database of the connection. An optional token is
 * appended as payload: the wildcard pattern for every show type except
 * TSDB_MGMT_TABLE_VNODES, for which the prefix token is sent instead.
 *
 * pShowMsg->payloadLen is stored in network byte order; pCmd->payloadLen is the
 * host-order size of the fixed part plus the appended token.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // select the token (if any) that is appended after the fixed part of the message
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SStrToken *pToken = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pToken = &pShowInfo->pattern;
    }
  } else {
    pToken = &pShowInfo->prefix;
    assert(pToken->n > 0 && pToken->type > 0);
  }

  size_t tokenLen = (pToken != NULL) ? pToken->n : 0;

  // keep the customary 100 bytes of room, but never less than the token plus a terminator
  size_t extraLen = (tokenLen + 1 > 100) ? (tokenLen + 1) : 100;
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  if (pToken != NULL) {
    strncpy(pShowMsg->payload, pToken->z, pToken->n);
    pShowMsg->payloadLen = htons(pToken->n);
  }

  // use the host-order token length: pShowMsg->payloadLen has already been byte-swapped
  pCmd->payloadLen = sizeof(SCMShowMsg) + tokenLen;
  return TSDB_CODE_SUCCESS;
}

1155
/*
 * Set the length and type of a KILL request according to pCmd->command
 * (query, connection or stream). For any other command the message type is left
 * unchanged. The payload content is not touched here. pInfo is not used.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (pCmd->command == TSDB_SQL_KILL_QUERY) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (pCmd->command == TSDB_SQL_KILL_CONNECTION) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (pCmd->command == TSDB_SQL_KILL_STREAM) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1173
/*
 * Estimate the payload size needed for a CREATE TABLE request.
 *
 * The estimate is the minimum message size plus the fixed SCMCreateTableMsg, plus
 * either one STagData (table created from a super table) or one SSchema per
 * column and tag, plus the select statement with its terminator when present,
 * plus TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &(pSql->cmd);
  SCreateTableSQL *pCreateInfo = pInfo->pCreateTableInfo;

  int32_t estimate = minMsgSize() + sizeof(SCMCreateTableMsg);

  if (pCreateInfo->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    // columns and tags are both described by SSchema entries
    estimate += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
  } else {
    estimate += sizeof(STagData);
  }

  if (pCreateInfo->pSelect != NULL) {
    estimate += (pCreateInfo->pSelect->selectToken.n + 1);
  }

  return estimate + TSDB_EXTRA_PAYLOAD_SIZE;
}

1192
/*
 * Build the CREATE TABLE request.
 *
 * After the fixed SCMCreateTableMsg part the message carries either
 *   - the tag data (length, name, values) when the table is created from a super table, or
 *   - one SSchema per column and tag, followed by the NUL-terminated select statement
 *     when the statement creates a stream.
 *
 * The field info of the query is cleared once it has been serialized.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int              msgLen = 0;
  SSchema *        pSchema;
  int              size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  tstrncpy(pCreateTableMsg->tableId, pTableMetaInfo->name, sizeof(pCreateTableMsg->tableId));

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
  } else {  // create (super) table
    pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      // the token covers exactly n characters: copy those and terminate explicitly
      memcpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n);
      pMsg[pQuerySql->selectToken.n] = 0;

      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the payload size needed for an ALTER TABLE request: the minimum message
 * size, the fixed SCMAlterTableMsg, one SSchema per field of the first query info
 * and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t schemaBytes = sizeof(SSchema) * tscNumOfFields(pQueryInfo);

  return minMsgSize() + sizeof(SCMAlterTableMsg) + schemaBytes + TSDB_EXTRA_PAYLOAD_SIZE;
}

1273
/*
 * Build the ALTER TABLE request.
 *
 * After the fixed SCMAlterTableMsg part the message carries one SSchema per field
 * of the first query info, followed by the raw tag data of the alter statement.
 *
 * pSql  the sql object that owns the command/payload to fill
 * pInfo parsed statement; pInfo->pAlterInfo supplies the alter type and the tag data
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg;
  int   msgLen = 0;

  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

  // the estimate does not cover the tag data appended below, so add its length here
  int size = tscEstimateAlterTableMsgLength(pCmd) + pAlterInfo->tagData.dataLen;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);

  tstrncpy(pAlterTableMsg->tableId, pTableMetaInfo->name, sizeof(pAlterTableMsg->tableId));
  pAlterTableMsg->type = htons(pAlterInfo->type);

  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
  SSchema *pSchema = pAlterTableMsg->schema;
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  // the tag data follows the schema array
  pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1321 1322 1323 1324
/*
 * Finalize an "update tag value" request whose body already sits in the
 * command payload: set the message type, take the payload length from the
 * message header, and load the endpoint set from the table's vgroup info.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd               *cmd    = &pSql->cmd;
  SUpdateTableTagValMsg *tagMsg = (SUpdateTableTagValMsg *)cmd->payload;

  cmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;

  // contLen is byte-swapped with htonl here (presumably it was stored in
  // network order when the payload was prepared -- confirm against the producer)
  cmd->payloadLen = htonl(tagMsg->head.contLen);

  SQueryInfo     *queryInfo = tscGetQueryInfoDetail(cmd, 0);
  STableMetaInfo *metaInfo  = tscGetMetaInfo(queryInfo, 0);

  tscDumpEpSetFromVgroupInfo(&metaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);

  return TSDB_CODE_SUCCESS;
}

1336
/*
 * Prepare an ALTER DATABASE request in the existing command payload.
 * The database name is copied (bounded) from the first table meta info entry
 * of the current clause. Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMAlterDbMsg);
  cmd->msgType    = TSDB_MSG_TYPE_CM_ALTER_DB;

  SCMAlterDbMsg  *alterMsg = (SCMAlterDbMsg *)cmd->payload;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);

  tstrncpy(alterMsg->db, metaInfo->name, sizeof(alterMsg->db));
  return TSDB_CODE_SUCCESS;
}

1348
/*
 * Build a retrieve request: allocate a payload the size of SRetrieveTableMsg
 * and fill in the query handle (64-bit big-endian) and the query type.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_RETRIEVE;
  cmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo        *queryInfo = tscGetQueryInfoDetail(cmd, 0);
  SRetrieveTableMsg *request   = (SRetrieveTableMsg *)cmd->payload;

  request->qhandle = htobe64(pSql->res.qhandle);
  request->free    = htons(queryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1366
/*
 * Create the result pointer info for pRes and point every output column's
 * tsrow entry into pRes->data at (column offset * number of rows).
 *
 * Returns pRes->code if tscCreateResPointerInfo fails, 0 otherwise.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = ((char*) pRes->data + colOffset * pRes->numOfRows);
    ++col;
  }

  return 0;
}

/*
 * Common builder for results that are produced locally.
 *
 * pRes->rspType denotes the state: on the first call (rspType == 0) the row
 * count is set to numOfRes, the row cursor is reset, the result pointers are
 * set up and rspType becomes 1. On any later call (rspType is 1, no more
 * result) the result is only reset for the next retrieve.
 *
 * If pSql->fp is set, it is invoked on success; on failure the result is
 * queued through tscQueueAsyncRes.
 *
 * Returns the result code stored in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // Keep the full-width code: narrowing it to 8 bits would turn any error
  // whose low byte is zero into TSDB_CODE_SUCCESS.
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Build the local result for a describe-table command: one row per column
 * plus one row per tag of the table.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *cmd      = &pSql->cmd;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  STableComInfo   info     = tscGetTableInfo(metaInfo->pTableMeta);

  int32_t rowCount = info.numOfColumns + info.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rowCount);
}

H
Haojun Liao 已提交
1424 1425 1426
/*
 * Build a single-row local result and mark the result set as completed.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;
  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1430
/*
 * Run the local merge step and deliver its outcome.
 *
 * If the result already carries an error, it is queued as an async result and
 * that error is returned. Otherwise tscDoLocalMerge is executed, result
 * pointers are created when rows were produced, and the result is marked
 * completed when no rows remain. On success pSql->fp is called with the row
 * count; on failure the result is queued through tscQueueAsyncRes.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlRes *res = &pSql->res;
  SSqlCmd *cmd = &pSql->cmd;

  int32_t code = res->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

  res->code = tscDoLocalMerge(pSql);

  SQueryInfo *queryInfo = tscGetQueryInfoDetail(cmd, cmd->clauseIndex);
  if (res->code == TSDB_CODE_SUCCESS && res->numOfRows > 0) {
    tscCreateResPointerInfo(res, queryInfo);
  }

  res->row       = 0;
  res->completed = (res->numOfRows == 0);

  // capture the code before handing control to the callback / async queue
  code = res->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, res->numOfRows);
  }

  return code;
}

S
slguan 已提交
1460
/* Build a local result that contains zero rows. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 0);
}
H
hzcheng 已提交
1461

1462
/*
 * Build the connect request: allocate a payload the size of SCMConnectMsg and
 * fill in the database name, the client version and an empty message version.
 *
 * The database name sent is the part of pObj->db after the first
 * TS_PATH_DELIMITER, or the whole string when no delimiter is present.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd    = &pSql->cmd;
  STscObj *tscObj = pSql->pTscObj;

  cmd->msgType    = TSDB_MSG_TYPE_CM_CONNECT;
  cmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMConnectMsg *request = (SCMConnectMsg*)cmd->payload;

  // skip everything up to and including the first path delimiter, if any
  char *dbName    = tscObj->db;
  char *delimiter = strstr(tscObj->db, TS_PATH_DELIMITER);
  if (delimiter != NULL) {
    dbName = delimiter + 1;
  }

  tstrncpy(request->db, dbName, sizeof(request->db));
  tstrncpy(request->clientVersion, version, sizeof(request->clientVersion));
  tstrncpy(request->msgVersion, "", sizeof(request->msgVersion));

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1485
/*
 * Build a table-meta request in the existing command payload.
 *
 * Whatever is in the payload on entry (payloadLen bytes, binary tag data) is
 * first copied aside, because the payload is then overwritten with the
 * SCMTableInfoMsg header. When the command is an auto-create and there was
 * data, the saved bytes are appended as the tag section of the message.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the temporary copy cannot be
 * allocated, TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char    *tmpData = NULL;
  uint32_t len = pCmd->payloadLen;
  if (len > 0) {
    if ((tmpData = calloc(1, len)) == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, len);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;

  // bounded copy: never write past the tableId field
  tstrncpy(pInfoMsg->tableId, pTableMetaInfo->name, sizeof(pInfoMsg->tableId));
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
  }

  pCmd->payloadLen = (int32_t)(pMsg - (char *)pInfoMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  taosTFree(tmpData);

  // check the length that was actually built, not a constant zero
  assert((int32_t)pCmd->payloadLen + minMsgSize() <= (int32_t)pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1526
/**
1527
 *  multi table meta req pkg format:
1528
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1529 1530
 *      no used         4B
 **/
1531
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1532
#if 0
S
slguan 已提交
1533 1534 1535 1536 1537
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1538
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1539 1540 1541 1542 1543
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1544
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1545

1546
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1547
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1548 1549

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1550
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1551 1552
  }

S
Shengliang Guan 已提交
1553
  taosTFree(tmpData);
S
slguan 已提交
1554

1555
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1556
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1557 1558 1559

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1560
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1561 1562 1563
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1564 1565
#endif
  return 0;  
S
slguan 已提交
1566 1567
}

H
hjxilinx 已提交
1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1585
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1586 1587 1588 1589 1590 1591 1592 1593
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1594

H
hjxilinx 已提交
1595 1596
/*
 * Build the super-table vgroup request in the existing command payload:
 * a SCMSTableVgroupMsg header carrying the table count, followed by one
 * fixed-width name slot (sizeof(name) bytes) per table of the query.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *cmd       = &pSql->cmd;
  char       *cursor    = cmd->payload;
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(cmd, 0);

  SCMSTableVgroupMsg *header = (SCMSTableVgroupMsg *)cursor;
  header->numOfTables = htonl(queryInfo->numOfTables);
  cursor += sizeof(SCMSTableVgroupMsg);

  int32_t tableIdx = 0;
  while (tableIdx < queryInfo->numOfTables) {
    STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, tableIdx);

    // each name occupies a full fixed-size slot in the message
    size_t slotSize = sizeof(metaInfo->name);
    tstrncpy(cursor, metaInfo->name, slotSize);
    cursor += slotSize;

    ++tableIdx;
  }

  cmd->msgType    = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  cmd->payloadLen = (int32_t)(cursor - cmd->payload);

  return TSDB_CODE_SUCCESS;
}

1618 1619
/*
 * Build the heartbeat request for a connection.
 *
 * Under pObj->mutex the SQL-object list and the stream list are counted (each
 * count starts at 2, so the size estimate has some slack), the payload is
 * sized for that many descriptors, and tscBuildQueryStreamDesc fills in the
 * message and yields its length.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd    = &pSql->cmd;
  STscObj *tscObj = pSql->pTscObj;

  pthread_mutex_lock(&tscObj->mutex);

  int32_t numOfQueries = 2;
  for (SSqlObj *sqlIter = tscObj->sqlList; sqlIter; sqlIter = sqlIter->next) {
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  for (SSqlStream *streamIter = tscObj->streamList; streamIter; streamIter = streamIter->next) {
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (tscAllocPayload(cmd, size) != TSDB_CODE_SUCCESS) {
    pthread_mutex_unlock(&tscObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMHeartBeatMsg *heartbeat = (SCMHeartBeatMsg *)cmd->payload;
  heartbeat->numOfQueries = numOfQueries;
  heartbeat->numOfStreams = numOfStreams;
  int builtLen = tscBuildQueryStreamDesc(heartbeat, tscObj);

  pthread_mutex_unlock(&tscObj->mutex);

  cmd->payloadLen = builtLen;
  cmd->msgType    = TSDB_MSG_TYPE_CM_HEARTBEAT;

  return TSDB_CODE_SUCCESS;
}

1658 1659
/*
 * Process a table-meta response.
 *
 * The header fields, endpoint ports and every schema entry are converted from
 * network to host byte order in place, the counts are validated, and a
 * STableMeta built from the message is put into the meta cache under the
 * table name. The cached entry is stored in the first table meta info of the
 * command; the temporary STableMeta is freed before returning.
 *
 * Returns TSDB_CODE_TSC_INVALID_VALUE for an invalid sid / numOfEps /
 * numOfTags / numOfColumns, TSDB_CODE_TSC_OUT_OF_MEMORY if the meta cannot be
 * created or cached, TSDB_CODE_SUCCESS otherwise.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) {
    // the first value logged is numOfEps (the field that is validated), not a vgId
    tscError("invalid table meta, numOfEps:%d, sid:%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
  }

  SSchema* pSchema = pMetaMsg->schema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // the primary timestamp column may only appear as the first schema entry
    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // nothing to cache, and the debug log below would dereference NULL
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);

  if (pTableMetaInfo->pTableMeta == NULL) {
    free(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1726
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  NOTE: the parser below is compiled out (#if 0); as built, this function
 *  only returns TSDB_CODE_SUCCESS.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
#if 0
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_TSC_APP_ERROR;
  }

  rsp++;

  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SCMMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgId = htonl(pMeta->vgId);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_TSC_APP_ERROR;
    }

    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
    //  }
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1835
/*
 * Process a super-table vgroup response.
 *
 * The response holds one SVgroupsInfo record per table, in the same order as
 * the tables of the parent command (the parent SSqlObj is taken from
 * pSql->param). Each record is copied into a newly allocated vgroupList on the
 * matching table meta info, and the vgroup ids and endpoint ports of the copy
 * are converted to host byte order.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if a vgroup list cannot be allocated,
 * otherwise pSql->res.code.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    if (pInfo->vgroupList == NULL) {
      // report allocation failure instead of relying on an assert that
      // disappears in NDEBUG builds
      tscError("%p failed to malloc for vgroup list", pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      //just init, no need to lock
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfEps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
      }
    }

    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Process the response of a "show" command.
 * (Original note: the current process does not use the cache at all.)
 *
 * The query handle and the schema carried by the response are converted to
 * host byte order, the table meta currently held by the command is released,
 * and a STableMeta built from the response is put into the cache under the
 * key <msgType + 'a'>"showlist". One output field and one TSDB_FUNC_TS_DUMMY
 * expression are then appended to the query for every column.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the table meta cannot be created or
 * cached, 0 otherwise.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg;
  SCMShowRsp    *pShow;
  SSchema       *pSchema;
  char           key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  // NOTE(review): tscProcessTableMetaRsp converts sid with htonl; the 16-bit
  // conversion here is kept as-is -- confirm which width the server sends.
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer * 1000);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // same handling as tscProcessTableMetaRsp: drop the temporary meta and
    // report the failure rather than reading the schema of a NULL meta
    taosTFree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  taosTFree(pTableMeta);
  return 0;
}

1946 1947 1948 1949 1950 1951 1952 1953 1954 1955
/*
 * Create the heartbeat SQL object for a connection, unless one already exists.
 *
 * On success the new object is stored in pObj->pHb with the TSDB_SQL_HB
 * command, a default-size payload and tscProcessHeartBeatRsp as callback.
 * On any allocation failure the function returns without setting pObj->pHb.
 */
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
    // nobody else references pSql yet: release it instead of leaking it
    taosTFree(pSql);
    return;
  }

  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    // NOTE(review): anything tscGetQueryInfoDetailSafely attached to pSql->cmd
    // is not released on this path -- confirm whether a full teardown is needed.
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;
  pObj->pHb = pSql;
  tscAddSubqueryInfo(&pObj->pHb->cmd);

  tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}

H
hzcheng 已提交
1979
/*
 * Process the connect response.
 *
 * Copies the account id from the response, rewrites pObj->db as
 * "<acctId><TS_PATH_DELIMITER><db>", updates the management endpoint set when
 * the response carries one, records the server version, the write/super
 * authority flags and the connection id, creates the heartbeat object and
 * (re)starts the activity timer.
 *
 * Always returns 0.
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_TABLE_FNAME_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response

  // bounded formatting: the result can never overrun temp
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
  }

  // the version string comes from the server: copy it with a bound
  tstrncpy(pObj->sversion, pConnect->serverVersion, sizeof(pObj->sversion));
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);

  createHBObj(pObj);

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Process the response of a USE <db> command: the name held by the first
 * table meta info of the command becomes the connection's current database.
 * Always returns 0.
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STscObj        *tscObj   = pSql->pTscObj;

  tstrncpy(tscObj->db, metaInfo->name, sizeof(tscObj->db));
  return 0;
}

S
Shengliang Guan 已提交
2016
/*
 * Process the response of a DROP DATABASE command by emptying the whole
 * client cache. The SQL object itself is not used. Always returns 0.
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Process the response of a DROP TABLE command by forcing the dropped table's
 * meta out of the cache.
 *
 * Why the forced release:
 * 1. if a user drops one table, which is the only table in a vnode, the remove
 *    operation will cause the vnode to be removed.
 * 2. if the user then creates a new metric followed by a table with the same
 *    name as the removed table but a different schema, the new table will
 *    reside in a new vnode, and the cached information is expired.
 *
 * Nothing is done when the table is not in the cache. Always returns 0.
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *cachedMeta = taosCacheAcquireByKey(tscCacheHandle, metaInfo->name, strlen(metaInfo->name));
  if (cachedMeta != NULL) {
    tscDebug("%p force release table meta after drop table:%s", pSql, metaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&cachedMeta, true);

    // also drop the reference held by the command itself, if any
    if (metaInfo->pTableMeta) {
      taosCacheRelease(tscCacheHandle, (void **)&(metaInfo->pTableMeta), true);
    }
  }

  return 0;
}

/*
 * Process the response of an ALTER TABLE command by forcing the altered
 * table's meta out of the cache. If the command holds a reference to the meta
 * of a super table, the whole cache is emptied as well.
 *
 * Nothing is done when the table is not in the cache. Always returns 0.
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *cachedMeta = taosCacheAcquireByKey(tscCacheHandle, metaInfo->name, strlen(metaInfo->name));
  if (cachedMeta != NULL) {
    tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, metaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&cachedMeta, true);

    if (metaInfo->pTableMeta) {
      // the table kind must be read before the meta reference is released
      bool superTable = UTIL_TABLE_IS_SUPER_TABLE(metaInfo);
      taosCacheRelease(tscCacheHandle, (void **)&(metaInfo->pTableMeta), true);

      if (superTable) {  // if it is a super table, reset whole query cache
        tscDebug("%p reset query cache since table:%s is stable", pSql, metaInfo->name);
        taosCacheEmpty(tscCacheHandle);
      }
    }
  }

  return 0;
}

/* Process the response of an ALTER DATABASE command: nothing to do, returns 0. */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}

/**
 * Response handler for a query (SELECT) request.
 *
 * Byte-swaps the query handle in place inside the response buffer via
 * htobe64(), copies it into the result object, clears the data pointer and
 * resets the result for the next retrieve.
 *
 * @param pSql  sql object whose res.pRsp points at a SQueryTableRsp
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *pResult = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pResult->pRsp;

  // the converted value is written back so the buffer and the result agree
  pRsp->qhandle = htobe64(pRsp->qhandle);
  pResult->qhandle = pRsp->qhandle;

  pResult->data = NULL;
  tscResetForNextRetrieve(pResult);

  return 0;
}

H
hjxilinx 已提交
2087
/**
 * Response handler for fetch/retrieve requests.
 *
 * Converts the header fields of the SRetrieveTableRsp into the result object,
 * points pRes->data at the payload and builds the per-column result pointers.
 * For subscriptions, a trailer located after the row data is parsed as well:
 *
 *   int32 numOfTables, then per table: int64 uid, int32 tid, TSKEY key
 *
 * and each (uid, key) pair is forwarded to tscUpdateSubscriptionProgress().
 *
 * The trailer fields are loaded with memcpy() rather than by casting the byte
 * cursor to int32_t or int64_t pointers: entries are 20 bytes apart and follow
 * a 4-byte count, so the 8-byte fields cannot all be naturally aligned, and a
 * misaligned dereference is undefined behavior.
 *
 * @param pSql  sql object whose res.pRsp points at a SRetrieveTableRsp
 * @return      0 on success, pRes->code if the result pointers cannot be created
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // (offset + width of the last output column) * rows: presumably the end of the row data
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t); // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);

  return 0;
}

H
hjxilinx 已提交
2131
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2132

2133
/**
 * Create a helper sql object that requests the meta of pTableMetaInfo->name
 * (command TSDB_SQL_META) and submit it through tscProcessSql().
 *
 * The helper copies the auto-create flag and the payload of the original
 * request, and registers tscTableMetaCallBack with the original sql object as
 * its parameter.
 *
 * @param pSql            original sql object
 * @param pTableMetaInfo  table meta info naming the table to look up
 * @return                TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was
 *                        submitted, TSDB_CODE_TSC_OUT_OF_MEMORY on allocation
 *                        failure, otherwise the code from tscProcessSql()
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
    // without this check the NULL would be passed on to tscAddEmptyMetaInfo() and dereferenced
    tscError("%p malloc failed for query info to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // same cleanup as tscGetSTableVgroupInfo; a plain free() would skip
    // whatever tscAddSubqueryInfo() attached to the command
    tscFreeSqlObj(pNew);

    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  // the callback receives the original sql object as its parameter
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
  }

  return code;
}

H
hjxilinx 已提交
2175
/**
 * Make sure pTableMetaInfo holds a table meta for its table name.
 *
 * Any meta reference already held is released first. The cache is then
 * queried by name; on a hit the cached meta is kept, on a miss a request is
 * issued through getTableMetaFromMgmt().
 *
 * @param pSql            sql object on whose behalf the meta is needed
 * @param pTableMetaInfo  table meta info; name must be non-empty
 * @return                TSDB_CODE_SUCCESS on a cache hit, otherwise the code
 *                        returned by getTableMetaFromMgmt()
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  size_t nameLen = strlen(pTableMetaInfo->name);
  assert(nameLen != 0);

  // give back the reference this STableMetaInfo currently owns, if any
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, nameLen);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo comInfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, comInfo.numOfColumns,
           comInfo.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2195
/**
 * Variant of tscGetTableMeta() that first records on the command whether the
 * table should be auto-created.
 *
 * @param pSql               sql object
 * @param pTableMetaInfo     table meta info naming the table
 * @param createIfNotExists  value stored in pSql->cmd.autoCreated
 * @return                   result of tscGetTableMeta()
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/**
H
Haojun Liao 已提交
2201
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2202
 * @param pSql          sql object
B
Bomin Zhang 已提交
2203
 * @param tableIndex    table index
H
hzcheng 已提交
2204 2205
 * @return              status code
 */
B
Bomin Zhang 已提交
2206
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2207
  SSqlCmd *pCmd = &pSql->cmd;
2208 2209

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2210
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2211

H
Haojun Liao 已提交
2212 2213
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2214
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2215
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2216 2217
  }

H
Haojun Liao 已提交
2218 2219
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2220 2221
}

H
hjxilinx 已提交
2222
/**
 * Tell whether every table of the given query clause already has a vgroup list.
 *
 * @param pCmd         sql command
 * @param clauseIndex  index of the query clause to inspect
 * @return             false as soon as one table has vgroupList == NULL,
 *                     true otherwise (including when the clause has no tables)
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t idx = 0;
  while (idx < pInfo->numOfTables) {
    STableMetaInfo *pMetaInfo = tscGetMetaInfo(pInfo, idx);
    if (pMetaInfo->vgroupList == NULL) {
      return false;
    }

    ++idx;
  }

  // every table already carries its vgroup info, nothing left to retrieve
  return true;
}
H
hzcheng 已提交
2234

H
hjxilinx 已提交
2235
/**
 * Request the vgroup info for all tables of one query clause.
 *
 * Returns immediately when every table already has a vgroup list. Otherwise a
 * helper sql object (command TSDB_SQL_STABLEVGROUP) is created, every table
 * of the clause is added to it with an extra cache reference on its meta, and
 * the helper is submitted through tscProcessSql() with tscTableMetaCallBack
 * as callback and the original sql object as its parameter.
 *
 * @param pSql         original sql object
 * @param clauseIndex  index of the query clause
 * @return             TSDB_CODE_SUCCESS if nothing had to be fetched,
 *                     TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was
 *                     submitted, TSDB_CODE_TSC_OUT_OF_MEMORY if the helper
 *                     object cannot be allocated, otherwise an error code
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    // calloc can fail; dereferencing the result unchecked would crash
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
    tscFreeSqlObj(pNew);
    return code;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    // the helper gets its own cache reference on the table meta
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2280
void tscInitMsgsFp() {
S
slguan 已提交
2281 2282
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2283
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2284 2285

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2286
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2287

2288 2289
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2290 2291

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2292
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2293 2294 2295
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2296
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2297 2298 2299
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2300
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2301
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2302 2303 2304 2305
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2306
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2307
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2308
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2309 2310 2311 2312

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2313 2314 2315
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2316 2317

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2318
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2319 2320 2321 2322 2323

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2324
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2325
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2326
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2327 2328

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2329
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2330
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2331

H
Haojun Liao 已提交
2332 2333 2334 2335 2336
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2337

H
hzcheng 已提交
2338 2339
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2340
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2341 2342 2343 2344 2345 2346 2347 2348 2349 2350

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}