tscServer.c 79.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
28
#include "tlockfree.h"
H
hzcheng 已提交
29 30 31

#define TSC_MGMT_VNODE 999

32
SRpcCorEpSet  tscMgmtEpSet;
33
SRpcEpSet  tscDnodeEpSet;
S
slguan 已提交
34

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
46

47
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
48 49
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

50
  SRpcEpSet* pEpSet = &pSql->epSet;
51 52 53 54
  pEpSet->inUse = 0;

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
55

56 57 58 59
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
60 61 62 63

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
64
  }
65 66

  assert(hasFqdn);
67
}
68

dengyihao's avatar
dengyihao 已提交
69 70 71 72
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  taosCorBeginRead(&tscMgmtEpSet.version);
  *epSet = tscMgmtEpSet.epSet;
  taosCorEndRead(&tscMgmtEpSet.version);
73
}  
dengyihao's avatar
dengyihao 已提交
74 75
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
76 77 78
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
79 80
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
81 82
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
83
   for (int32_t i = 0; i < s1->numOfEps; i++) {
84 85 86 87 88
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
89
}
dengyihao's avatar
dengyihao 已提交
90
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
91
  // no need to update if equal
dengyihao's avatar
dengyihao 已提交
92 93 94
  taosCorBeginWrite(&tscMgmtEpSet.version);
  tscMgmtEpSet.epSet = *pEpSet;
  taosCorEndWrite(&tscMgmtEpSet.version);
95
}
dengyihao's avatar
dengyihao 已提交
96
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
97
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
98
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
99
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
100 101 102
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
103
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
104
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
105
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
106
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
107 108
}

dengyihao's avatar
dengyihao 已提交
109
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
110 111
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
112
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
113
  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
114

dengyihao's avatar
bugfix  
dengyihao 已提交
115
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
116
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
117 118
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
119
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
Y
yihaoDeng 已提交
120
    tstrncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], sizeof(pEpSet->fqdn[i]));
121
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
122
  }
dengyihao's avatar
dengyihao 已提交
123
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
124
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
125
}
126 127 128 129
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
130
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
131
  } else {
132
    for (int i = 0; i < dump.numOfEps; ++i) {
133
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
134
    }
S
slguan 已提交
135
  }
S
slguan 已提交
136 137
}

H
hzcheng 已提交
138 139 140 141 142 143 144 145 146 147 148 149
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
150
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
dengyihao's avatar
dengyihao 已提交
151 152 153 154
    SRpcEpSet *      epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
dengyihao's avatar
dengyihao 已提交
155
    } 
S
slguan 已提交
156

S
Shengliang Guan 已提交
157 158
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
159 160
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
161
      return;
H
hzcheng 已提交
162
    } else {
S
slguan 已提交
163 164
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
165 166
    }
  } else {
167
    tscDebug("heart beat failed, code:%s", tstrerror(code));
H
hzcheng 已提交
168 169 170 171 172 173 174
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
175 176 177
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
178

179 180 181
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
182 183
  }

184 185 186
  if (tscShouldFreeHeartBeat(pHB)) {
    tscDebug("%p free HB object and release connection", pHB);
    tscFreeSqlObj(pHB);
H
hzcheng 已提交
187
    tscCloseTscObj(pObj);
H
Haojun Liao 已提交
188
  } else {
189
    int32_t code = tscProcessSql(pHB);
H
Haojun Liao 已提交
190
    if (code != TSDB_CODE_SUCCESS) {
191
      tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
Haojun Liao 已提交
192
    }
H
hzcheng 已提交
193 194 195 196
  }
}

int tscSendMsgToServer(SSqlObj *pSql) {
197
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
198 199 200
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
201
  if (NULL == pMsg) {
202
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
203
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
204 205
  }

206 207
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
dengyihao 已提交
208
    tscDumpMgmtEpSet(&pSql->epSet);
J
jtao1735 已提交
209 210
  }

211
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
212

J
jtao1735 已提交
213
  SRpcMsg rpcMsg = {
214 215 216
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217 218
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
219
      .code    = 0
J
jtao1735 已提交
220
  };
H
hzcheng 已提交
221

H
Haojun Liao 已提交
222 223 224 225
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
226
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
S
slguan 已提交
227
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
228 229
}

230
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231
  SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
H
Haojun Liao 已提交
232
  if (pSql == NULL || pSql->signature != pSql) {
B
Bomin Zhang 已提交
233
    tscError("%p sql is already released", pSql);
234 235
    return;
  }
236

H
Haojun Liao 已提交
237
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
238 239
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
240

H
Haojun Liao 已提交
241
  if (pObj->signature != pObj) {
242
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
243 244 245 246 247 248

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

249 250
  pSql->pRpcCtx = NULL;    // clear the rpcCtx

H
Haojun Liao 已提交
251 252
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
253
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
254
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
255

H
Haojun Liao 已提交
256
    tscFreeSqlObj(pSql);
257
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
258
    return;
H
hzcheng 已提交
259 260
  }

261
  if (pEpSet) { 
dengyihao's avatar
dengyihao 已提交
262
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
dengyihao's avatar
fixbug  
dengyihao 已提交
263
      if(pCmd->command < TSDB_SQL_MGMT)  { 
264
        tscUpdateVgroupInfo(pSql, pEpSet); 
dengyihao's avatar
dengyihao 已提交
265
      } else {
dengyihao's avatar
dengyihao 已提交
266
        tscUpdateMgmtEpSet(pEpSet);
dengyihao's avatar
fixbug  
dengyihao 已提交
267
    }
dengyihao's avatar
dengyihao 已提交
268
    }
J
jtao1735 已提交
269 270
  }

271 272 273 274 275 276 277 278 279 280 281 282 283 284
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }
285

286 287 288 289 290 291 292 293 294 295
    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
      rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
296 297
      }
    }
S
slguan 已提交
298
  }
299

H
hzcheng 已提交
300
  pRes->rspLen = 0;
301
  
302 303
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hzcheng 已提交
304
  } else {
305
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
hzcheng 已提交
306 307
  }

S
slguan 已提交
308
  if (pRes->code == TSDB_CODE_SUCCESS) {
309
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
310 311 312
    pSql->retry = 0;
  }

313
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
314
    assert(rpcMsg->msgType == pCmd->msgType + 1);
315
    pRes->code    = rpcMsg->code;
316
    pRes->rspType = rpcMsg->msgType;
317
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
318

319
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
320 321
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
322
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
323 324
      } else {
        pRes->pRsp = tmp;
325
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
326
      }
327 328
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
329 330
    }

H
hzcheng 已提交
331 332 333 334
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
335
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
336
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
337 338 339 340 341 342 343
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
344
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
345
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
346
    } else {
347
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
348 349
    }
  }
350
  
H
Haojun Liao 已提交
351
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
352
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
353
  }
S
Shengliang Guan 已提交
354

355
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
356
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
357
    
H
hjxilinx 已提交
358
    bool shouldFree = tscShouldBeFreed(pSql);
H
Haojun Liao 已提交
359
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
360

361
    if (shouldFree) {
362
      tscDebug("%p sqlObj is automatically freed", pSql);
sangshuduo's avatar
sangshuduo 已提交
363
      tscFreeSqlObj(pSql);
H
hzcheng 已提交
364 365 366
    }
  }

367
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
368 369
}

S
slguan 已提交
370 371 372
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
373

H
hjxilinx 已提交
374 375 376 377 378 379 380
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
381
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
382 383 384 385 386 387
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
388
  }
389

390 391 392
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
393
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
394
    pRes->code = code;
H
hjxilinx 已提交
395
    tscQueueAsyncRes(pSql);
396
    return pRes->code;
S
slguan 已提交
397
  }
H
hjxilinx 已提交
398 399
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
400 401 402
}

int tscProcessSql(SSqlObj *pSql) {
403
  char    *name = NULL;
404
  SSqlCmd *pCmd = &pSql->cmd;
405
  
406
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
407
  STableMetaInfo *pTableMetaInfo = NULL;
408
  uint32_t        type = 0;
409

410
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
411
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
412
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
413
    type = pQueryInfo->type;
414

H
hjxilinx 已提交
415
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
416
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
417
  }
418

419
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
420
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
421
    if (pTableMetaInfo == NULL) {
422
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
423 424
      return pSql->res.code;
    }
H
hjxilinx 已提交
425
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
426
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
427 428 429
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
430
  
S
slguan 已提交
431 432
  return doProcessSql(pSql);
}
H
hzcheng 已提交
433

H
hjxilinx 已提交
434
void tscKillSTableQuery(SSqlObj *pSql) {
435 436 437
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
438
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
439 440 441 442 443
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
S
slguan 已提交
444
    if (pSub == NULL) {
H
hzcheng 已提交
445 446
      continue;
    }
S
slguan 已提交
447

H
hzcheng 已提交
448 449
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
H
Haojun Liao 已提交
450
     * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
H
hzcheng 已提交
451
     */
dengyihao's avatar
dengyihao 已提交
452 453 454
    rpcCancelRequest(pSub->pRpcCtx);
    pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
    tscQueueAsyncRes(pSub);
H
hzcheng 已提交
455 456 457 458 459
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
460
   * 2. if no any subqueries are launched yet, which means the super table query only in parse sql stage,
H
hzcheng 已提交
461 462 463 464 465
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

H
hjxilinx 已提交
466
  while (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
H
hzcheng 已提交
467 468 469 470 471 472
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

473
  tscDebug("%p super table query cancelled", pSql);
H
hzcheng 已提交
474 475
}

J
jtao1735 已提交
476
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
477
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
478
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
479

H
Haojun Liao 已提交
480
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
S
slguan 已提交
481
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
482

483
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
484 485
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
486
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
487
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
488
    
H
hjxilinx 已提交
489
    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
H
Haojun Liao 已提交
490 491
    assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

H
hjxilinx 已提交
492
    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
493
  } else {
H
hjxilinx 已提交
494
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
495
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
496
  }
497 498

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
499
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
500 501 502

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

503
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
504 505
}

506
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
507
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
508
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
509
  
510
  char* pMsg = pSql->cmd.payload;
511 512 513
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
514 515
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

516
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
517 518
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

519
  pMsg += sizeof(SMsgDesc);
520
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
521

H
hjxilinx 已提交
522
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
523
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
524
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
525
  
H
Haojun Liao 已提交
526
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
527

H
hjxilinx 已提交
528
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
529
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
530
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
531

532 533
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
534
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
535 536 537
}

/*
538
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
539
 */
540
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
541
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
542
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
543

S
TD-1057  
Shengliang Guan 已提交
544
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
545 546
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
547
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
548
  
549
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
550 551
}

552
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
553
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
554
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
555

H
hjxilinx 已提交
556
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
557
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
558 559
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
560
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
561 562
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
563
  
B
Bomin Zhang 已提交
564
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
565
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
566 567
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
568
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
569 570
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
571
    }
weixin_48148422's avatar
weixin_48148422 已提交
572

573 574 575 576
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
577

578
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
579 580 581
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
582

583 584
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
585
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
586
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
587
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
588
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
589

590
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
591

592
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
593

dengyihao's avatar
bugfix  
dengyihao 已提交
594
    // set the vgroup info 
595
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
596 597
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
598
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
599 600 601 602 603 604 605 606 607
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
608
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
609 610 611 612
      pMsg += sizeof(STableIdInfo);
    }
  }
  
613
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
614
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
615
  
616 617 618
  return pMsg;
}

619
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
620 621
  SSqlCmd *pCmd = &pSql->cmd;

622
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
623

S
slguan 已提交
624 625
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
626
    return -1;  // todo add test for this
S
slguan 已提交
627
  }
628
  
629
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
630
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
631
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
632
  
H
hjxilinx 已提交
633
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
634 635 636
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }
637 638 639 640 641 642 643 644 645 646
  
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
    return -1;
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }
647

648
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
649

S
TD-1057  
Shengliang Guan 已提交
650
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
651
  
652
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
653 654
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
655
  } else {
H
hjxilinx 已提交
656 657
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
658 659
  }

660 661
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
662
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
663 664
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
665
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
666 667
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
H
hjxilinx 已提交
668
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
669
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
670
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
671
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
672
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
673 674
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
675
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
H
hzcheng 已提交
676 677

  // set column list ids
678 679
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
680
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
681
  
682 683 684
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
685

686
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
687
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
688 689
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
690 691
               pColSchema->name);

692
      return TSDB_CODE_TSC_INVALID_SQL;
693
    }
H
hzcheng 已提交
694 695 696

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
697
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
698
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
699

S
slguan 已提交
700 701 702
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
703

S
slguan 已提交
704
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
705
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
706 707

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
708

709
      if (pColFilter->filterstr) {
S
slguan 已提交
710 711 712 713 714 715 716 717 718 719
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
720

S
slguan 已提交
721 722 723 724 725
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
726 727
  }

H
hjxilinx 已提交
728
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
729
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
730
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
731

H
hjxilinx 已提交
732
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
733
      /* column id is not valid according to the cached table meta, the table meta is expired */
H
hzcheng 已提交
734 735 736 737
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

738 739 740
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
741

742
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
743
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
744
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
745 746

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
747
      // todo add log
H
hzcheng 已提交
748 749 750 751 752
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
753
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
754 755 756 757 758
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
759
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
760
  }
761
  
762
  // serialize the table info (sid, uid, tags)
763 764
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
765
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
766
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
767
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
768 769
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
770
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
771 772
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
773 774 775
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

776 777
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
778 779 780

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
781 782 783
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
784 785 786
    }
  }

787
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
788
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
789 790
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
791 792
    }
  }
793 794 795 796 797 798 799 800 801
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
802
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
803 804 805 806
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
807 808
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
809
                 pCol->colIndex.columnIndex, pColSchema->name);
810

811
        return TSDB_CODE_TSC_INVALID_SQL;
812 813 814 815 816 817 818 819 820 821 822 823
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
824

H
Haojun Liao 已提交
825 826 827 828
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
829
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
846
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
847
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
848 849 850
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

851
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
852
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
853
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
854 855

    // todo refactor
B
Bomin Zhang 已提交
856 857 858 859 860
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }
H
Haojun Liao 已提交
861 862 863

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
B
Bomin Zhang 已提交
864 865 866 867
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }
S
slguan 已提交
868 869 870 871

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
872 873
  }

S
slguan 已提交
874 875
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
876 877
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
878 879
  }

S
TD-1057  
Shengliang Guan 已提交
880
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
881

882
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
883
  pCmd->payloadLen = msgLen;
S
slguan 已提交
884
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
885
  
886
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
887
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
888 889

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
890 891
}

892 893
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
894
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
895
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
896

897
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
898

899
  assert(pCmd->numOfClause == 1);
900
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
901
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
902

903
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
904 905
}

906 907
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
908
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
909 910
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
911
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
912
  }
H
hzcheng 已提交
913

914
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
915 916
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
917
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
918

919
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
920 921
}

922 923
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
924
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
925 926
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
927
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
928
  }
H
hzcheng 已提交
929

930
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
931

932 933
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
934

935 936
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
937

938
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
939

940 941 942 943 944 945 946 947
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
948

949 950 951 952 953 954 955 956 957 958 959 960 961
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
962

S
slguan 已提交
963
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
964
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
965 966
}

967 968
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
969
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
970

S
slguan 已提交
971 972
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
973
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
974 975
  }

976
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
977

978 979
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
980
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
981

982 983 984 985
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
986 987
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
988
  }
H
hzcheng 已提交
989

990
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
991
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
992
  } else {
S
slguan 已提交
993
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
994
  }
H
hzcheng 已提交
995

996
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
997 998
}

999 1000
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1001
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1002
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1003 1004
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1005

1006 1007
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1008
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1009

S
slguan 已提交
1010 1011
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1012
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1013 1014
  }

1015
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1016

1017
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1018
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1019
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1020

S
slguan 已提交
1021
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1022
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1023 1024
}

1025 1026
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1027
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1028

S
slguan 已提交
1029 1030
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1031
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1032
  }
H
hzcheng 已提交
1033

1034
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1035
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1036
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1037
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1038

S
slguan 已提交
1039
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1040
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1041 1042
}

1043
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1044
  SSqlCmd *pCmd = &pSql->cmd;
1045
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1046 1047
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1048
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1049
  }
H
hzcheng 已提交
1050

1051
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1052
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1053
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1054
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1055

1056
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1057 1058
}

S
[TD-16]  
slguan 已提交
1059
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1060
  SSqlCmd *pCmd = &pSql->cmd;
1061
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1062
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1063

S
slguan 已提交
1064 1065
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1066
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1067
  }
H
hzcheng 已提交
1068

1069
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1070
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1071
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1072

1073
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1074 1075
}

S
[TD-16]  
slguan 已提交
1076 1077 1078 1079 1080 1081 1082
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1083
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1084 1085 1086 1087
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1088
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1089 1090 1091 1092

  return TSDB_CODE_SUCCESS;
}

1093 1094
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1095
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1096

S
slguan 已提交
1097 1098
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1099
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1100
  }
1101

1102
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1103
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1104
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1105
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1106

1107
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1108 1109
}

1110
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1111
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1112
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1113
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1114
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1115

S
slguan 已提交
1116 1117
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1118
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1119
  }
H
hzcheng 已提交
1120

1121
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1122

1123
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1124
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1125
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1126
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1127
  } else {
B
Bomin Zhang 已提交
1128
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1129 1130
  }

1131 1132 1133 1134 1135 1136 1137 1138 1139 1140
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
dengyihao's avatar
dengyihao 已提交
1141 1142
    SSQLToken *pEpAddr = &pShowInfo->prefix;
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1143

dengyihao's avatar
dengyihao 已提交
1144 1145
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1146 1147
  }

1148
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1149
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1150 1151
}

1152
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1153
  SSqlCmd *pCmd = &pSql->cmd;
1154
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1155

1156 1157
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1158
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1159 1160
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1161
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1162 1163
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1164
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1165 1166 1167
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1168 1169
}

1170
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1171 1172
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1173
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1174

1175
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1176
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1177 1178
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1179
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1180
  }
1181

1182 1183 1184
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1185 1186 1187 1188

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1189
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1190
  int              msgLen = 0;
S
slguan 已提交
1191
  SSchema *        pSchema;
H
hzcheng 已提交
1192
  int              size = 0;
1193 1194 1195
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1196
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1197 1198

  // Reallocate the payload size
1199
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1200 1201
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1202
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1203
  }
H
hzcheng 已提交
1204 1205


1206
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1207
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1208 1209

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1210
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1211

1212 1213 1214
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1215 1216 1217 1218
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1219
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1220

1221 1222
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1223 1224 1225 1226 1227 1228 1229
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1230
  } else {  // create (super) table
1231
    pSchema = (SSchema *)pCreateTableMsg->schema;
1232

H
hzcheng 已提交
1233
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1234
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1235 1236 1237 1238

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1239

H
hzcheng 已提交
1240 1241 1242 1243
      pSchema++;
    }

    pMsg = (char *)pSchema;
1244 1245
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1246

1247 1248 1249
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1250 1251 1252
    }
  }

H
hjxilinx 已提交
1253
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1254

S
TD-1057  
Shengliang Guan 已提交
1255
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1256
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1257
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1258
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1259 1260

  assert(msgLen + minMsgSize() <= size);
1261
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1262 1263 1264
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1265
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1266
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1267 1268 1269
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1270
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1271 1272
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1273

1274
  SSqlCmd    *pCmd = &pSql->cmd;
1275 1276
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1277
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1278 1279 1280
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1281 1282 1283 1284
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
1285 1286
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1287
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1288

H
hjxilinx 已提交
1289
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1290
  pAlterTableMsg->type = htons(pAlterInfo->type);
1291

1292
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1293
  SSchema *pSchema = pAlterTableMsg->schema;
1294
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1295
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1296
  
H
hzcheng 已提交
1297 1298 1299 1300 1301 1302 1303
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1304 1305 1306
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1307

S
TD-1057  
Shengliang Guan 已提交
1308
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1309

H
hzcheng 已提交
1310
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1311
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1312 1313

  assert(msgLen + minMsgSize() <= size);
1314

1315
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1316 1317
}

1318 1319 1320 1321
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1322
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1323
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1324

1325 1326
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1327

dengyihao's avatar
dengyihao 已提交
1328
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1329

1330 1331 1332
  return TSDB_CODE_SUCCESS;
}

1333
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1334
  SSqlCmd *pCmd = &pSql->cmd;
1335
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1336
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1337

1338
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1339
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1340
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1341

1342
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1343 1344
}

1345
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1346
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1347
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1348
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1349

S
slguan 已提交
1350 1351
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1352
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1353
  }
S
slguan 已提交
1354

S
slguan 已提交
1355 1356 1357 1358
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1359

1360
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1361 1362
}

1363
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1364
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1365 1366 1367
    return pRes->code;
  }

H
hjxilinx 已提交
1368
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1369
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
H
hjxilinx 已提交
1370
    pRes->tsrow[i] = ((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1385

1386
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1387

H
hzcheng 已提交
1388 1389 1390 1391 1392 1393
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1394
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1395
  } else {
S
slguan 已提交
1396
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1412
  SSqlCmd *       pCmd = &pSql->cmd;
1413
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1414

H
hjxilinx 已提交
1415
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1416 1417
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1418 1419 1420
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1421 1422 1423
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1424 1425 1426
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1427
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1428 1429 1430
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

1431
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1432
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1433 1434

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1435
    tscCreateResPointerInfo(pRes, pQueryInfo);
H
hzcheng 已提交
1436 1437 1438
  }

  pRes->row = 0;
1439
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1440

1441
  int32_t code = pRes->code;
H
hjxilinx 已提交
1442 1443 1444 1445
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1446 1447 1448 1449 1450
  }

  return code;
}

S
slguan 已提交
1451
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1452

1453
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1454
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1455
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1456
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1457
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1458

S
slguan 已提交
1459 1460
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1461
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1462 1463
  }

1464
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1465 1466 1467 1468

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1469 1470 1471
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1472

1473
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1474 1475
}

H
hjxilinx 已提交
1476
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1477
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
1478
  char *         pMsg;
H
hzcheng 已提交
1479 1480
  int            msgLen = 0;

B
Bomin Zhang 已提交
1481 1482 1483 1484
  char *tmpData = NULL;
  uint32_t len = pSql->cmd.payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
1485
    if (NULL == tmpData) {
1486
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
1487 1488
    }

H
hzcheng 已提交
1489
    // STagData is in binary format, strncpy is not available
B
Bomin Zhang 已提交
1490
    memcpy(tmpData, pSql->cmd.payload, len);
H
hzcheng 已提交
1491 1492
  }

1493 1494 1495
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1496
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1497

1498
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1499
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1500
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1501

1502
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1503

B
Bomin Zhang 已提交
1504 1505 1506
  if (pSql->cmd.autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
H
hzcheng 已提交
1507 1508
  }

S
TD-1057  
Shengliang Guan 已提交
1509
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1510
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1511

S
Shengliang Guan 已提交
1512
  taosTFree(tmpData);
H
hzcheng 已提交
1513

S
TD-1057  
Shengliang Guan 已提交
1514
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
1515
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1516 1517
}

S
slguan 已提交
1518
/**
1519
 *  multi table meta req pkg format:
1520
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1521 1522
 *      no used         4B
 **/
1523
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1524
#if 0
S
slguan 已提交
1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1537
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1538

1539
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1540
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1541 1542

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1543
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1544 1545
  }

S
Shengliang Guan 已提交
1546
  taosTFree(tmpData);
S
slguan 已提交
1547

1548
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1549
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1550 1551 1552

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1553
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1554 1555 1556
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1557 1558
#endif
  return 0;  
S
slguan 已提交
1559 1560
}

H
hjxilinx 已提交
1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1578
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1579 1580 1581 1582 1583 1584 1585 1586
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1587

H
hjxilinx 已提交
1588 1589
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1590 1591 1592 1593 1594 1595 1596 1597 1598 1599
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1600 1601 1602
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1603
  }
H
hjxilinx 已提交
1604 1605

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1606
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1607

1608
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1609 1610
}

1611 1612
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1613 1614
  STscObj *pObj = pSql->pTscObj;

1615
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1616

S
Shengliang Guan 已提交
1617
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1618 1619 1620
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1621
    numOfQueries++;
H
hzcheng 已提交
1622 1623
  }

S
Shengliang Guan 已提交
1624
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1625 1626 1627
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1628
    numOfStreams++;
H
hzcheng 已提交
1629 1630
  }

1631
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1632
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1633
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1634 1635 1636
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1637

1638
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
1639 1640
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
1641
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1642 1643 1644 1645

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1646
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1647

1648
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1649 1650
}

1651 1652
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1653

1654 1655
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1656
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1657 1658
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1659 1660
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1661
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1662

1663 1664
  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) {
    tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid);
1665
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1666 1667
  }

B
Bomin Zhang 已提交
1668
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1669
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1670
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1671 1672
  }

1673 1674
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1675
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1676 1677
  }

1678 1679
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1680 1681
  }

1682
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1683

1684
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1685 1686 1687
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1688 1689 1690 1691 1692

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1693
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1694 1695 1696
    pSchema++;
  }

1697 1698
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1699
  
H
hzcheng 已提交
1700
  // todo add one more function: taosAddDataIfNotExists();
1701
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1702
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1703

H
Haojun Liao 已提交
1704 1705
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1706
  
1707
  // todo handle out of memory case
1708
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1709
    free(pTableMeta);
1710
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1711
  }
H
hzcheng 已提交
1712

1713
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1714
  free(pTableMeta);
1715
  
H
hjxilinx 已提交
1716
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1717 1718
}

S
slguan 已提交
1719
/**
1720
 *  multi table meta rsp pkg format:
1721
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1722 1723 1724
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1725
#if 0
S
slguan 已提交
1726 1727 1728 1729 1730
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1731
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1732
    pSql->res.numOfTotal = 0;
1733
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1734 1735 1736 1737
  }

  rsp++;

1738
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1739
  totalNum = htonl(pInfo->numOfTables);
1740
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1741 1742

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1743
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1744
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1745 1746 1747

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1748
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1749 1750
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1751 1752
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1753
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1754
      pSql->res.numOfTotal = i;
1755
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1756 1757
    }

H
hjxilinx 已提交
1758 1759 1760 1761
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1762
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1763
    //      pSql->res.numOfTotal = i;
1764
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1765 1766 1767 1768
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1769
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1770
    //      pSql->res.numOfTotal = i;
1771
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1772 1773 1774 1775
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1776
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1777
    //      pSql->res.numOfTotal = i;
1778
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1779 1780
    //    }
    //
H
hjxilinx 已提交
1781
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
hjxilinx 已提交
1816
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1817
    //  }
S
slguan 已提交
1818
  }
H
hjxilinx 已提交
1819
  
S
slguan 已提交
1820 1821
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1822
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1823 1824
#endif
  
S
slguan 已提交
1825 1826 1827
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1828
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1829
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1830
  
H
hjxilinx 已提交
1831
  // NOTE: the order of several table must be preserved.
1832
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1833 1834
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1835
  
1836 1837 1838
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1839
  
1840
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    assert(pInfo->vgroupList != NULL);

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1852
      //just init, no need to lock
H
hjxilinx 已提交
1853 1854
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
      pVgroups->vgId = htonl(pVgroups->vgId);
1855
      assert(pVgroups->numOfEps >= 1);
H
hjxilinx 已提交
1856

1857 1858
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
H
hjxilinx 已提交
1859
      }
1860
    }
1861 1862

    pMsg += size;
H
hjxilinx 已提交
1863 1864
  }
  
S
slguan 已提交
1865
  return pSql->res.code;
H
hzcheng 已提交
1866 1867 1868 1869 1870 1871
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1872
  STableMetaMsg * pMetaMsg;
1873
  SCMShowRsp *pShow;
S
slguan 已提交
1874
  SSchema *    pSchema;
H
hzcheng 已提交
1875 1876
  char         key[20];

1877 1878 1879
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1880
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1881

H
hjxilinx 已提交
1882
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1883

1884
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1885
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1886 1887
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1888
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1889
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1890

H
hjxilinx 已提交
1891
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1892

H
hjxilinx 已提交
1893
  pSchema = pMetaMsg->schema;
H
hjxilinx 已提交
1894 1895
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1896 1897 1898 1899
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1900
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1901 1902
  strcpy(key + 1, "showlist");

H
hjxilinx 已提交
1903
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
H
hjxilinx 已提交
1904
  
H
hjxilinx 已提交
1905 1906 1907
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1908 1909
  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer);
H
hjxilinx 已提交
1910
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1911

1912 1913 1914 1915
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1916 1917
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1918
  SColumnIndex index = {0};
H
hjxilinx 已提交
1919 1920 1921
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1922
    index.columnIndex = i;
1923 1924
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1925 1926
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1927
    
H
hjxilinx 已提交
1928
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1929
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1930
  }
H
hjxilinx 已提交
1931 1932
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1933
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1934
  
S
Shengliang Guan 已提交
1935
  taosTFree(pTableMeta);
H
hzcheng 已提交
1936 1937 1938
  return 0;
}

1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

  SQueryInfo *pQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;
  pObj->pHb = pSql;
  tscAddSubqueryInfo(&pObj->pHb->cmd);

  tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}

H
hzcheng 已提交
1968
int tscProcessConnectRsp(SSqlObj *pSql) {
H
Haojun Liao 已提交
1969
  char temp[TSDB_TABLE_FNAME_LEN * 2];
H
hzcheng 已提交
1970 1971 1972
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

1973
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
1974
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
1975 1976
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
1977 1978
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
1979
  
dengyihao's avatar
dengyihao 已提交
1980 1981 1982
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
1983
  } 
H
hzcheng 已提交
1984

S
slguan 已提交
1985
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
1986 1987
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
1988
  pObj->connId = htonl(pConnect->connId);
1989 1990 1991

  createHBObj(pObj);

S
scripts  
slguan 已提交
1992
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
1993 1994 1995 1996 1997

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
1998
  STscObj *       pObj = pSql->pTscObj;
1999
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2000

B
Bomin Zhang 已提交
2001
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2002 2003 2004
  return 0;
}

S
Shengliang Guan 已提交
2005
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
2006
  taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2007 2008 2009 2010
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2011
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2012

H
Haojun Liao 已提交
2013 2014
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2015 2016 2017 2018 2019 2020 2021
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2022 2023
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2024
   */
2025
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
hjxilinx 已提交
2026
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2027

H
hjxilinx 已提交
2028 2029
  if (pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2030 2031 2032 2033 2034 2035
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2036
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2037

H
Haojun Liao 已提交
2038
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2039
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2040 2041 2042
    return 0;
  }

2043
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
hjxilinx 已提交
2044
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2045

H
hjxilinx 已提交
2046
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2047
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
hjxilinx 已提交
2048
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2049

2050
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2051
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
2052
      taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2067
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2068 2069 2070
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2071
  pRes->data = NULL;
S
slguan 已提交
2072
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2073 2074 2075
  return 0;
}

H
hjxilinx 已提交
2076
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2077 2078 2079
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2080
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2081 2082 2083

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2084 2085
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2086
  pRes->completed = (pRetrieve->completed == 1);
2087
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2088
  
2089
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2090 2091 2092 2093
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2094
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2095
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2096
    
H
hjxilinx 已提交
2097
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2098 2099
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2100 2101
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2102
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2103
    p += sizeof(int32_t);
S
slguan 已提交
2104
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2105 2106
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2107
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2108 2109
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2110
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2111
    }
2112 2113
  }

H
hzcheng 已提交
2114
  pRes->row = 0;
2115
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2116 2117 2118 2119

  return 0;
}

H
hjxilinx 已提交
2120
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2121

2122
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2123 2124
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2125
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2126
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2127
  }
2128

H
hzcheng 已提交
2129 2130 2131
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2132

2133
  tscAddSubqueryInfo(&pNew->cmd);
2134 2135 2136 2137

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

H
hjxilinx 已提交
2138
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2139
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2140
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2141
    free(pNew);
2142

2143
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2144 2145
  }

H
hjxilinx 已提交
2146
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2147
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2148

B
Bomin Zhang 已提交
2149
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2150 2151
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
2152
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2153

H
hjxilinx 已提交
2154 2155
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2156

H
hjxilinx 已提交
2157 2158
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2159
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2160 2161 2162 2163 2164
  }

  return code;
}

H
hjxilinx 已提交
2165
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2166
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2167

2168
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2169 2170
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
2171 2172
  }
  
H
Haojun Liao 已提交
2173
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2174
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2175
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2176
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2177
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2178 2179 2180

    return TSDB_CODE_SUCCESS;
  }
2181 2182
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2183 2184
}

H
hjxilinx 已提交
2185
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2186
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2187
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2188 2189 2190
}

/**
H
Haojun Liao 已提交
2191
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2192
 * @param pSql          sql object
H
Haojun Liao 已提交
2193
 * @param tableId       table full name
H
hzcheng 已提交
2194 2195
 * @return              status code
 */
H
Haojun Liao 已提交
2196
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2197
  SSqlCmd *pCmd = &pSql->cmd;
2198 2199

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2200
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2201

H
Haojun Liao 已提交
2202 2203
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2204
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2205
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2206 2207
  }

H
Haojun Liao 已提交
2208 2209
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2210 2211
}

H
hjxilinx 已提交
2212
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2213
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2214
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2215
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2216 2217
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2218 2219
    }
  }
H
hjxilinx 已提交
2220 2221 2222 2223
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2224

H
hjxilinx 已提交
2225
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2226
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2227 2228 2229
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2230 2231
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2232

S
slguan 已提交
2233
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2234 2235 2236
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2237
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
2238 2239
  
  SQueryInfo *pNewQueryInfo = NULL;
2240
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
B
Bomin Zhang 已提交
2241
    tscFreeSqlObj(pNew);
2242 2243
    return code;
  }
2244
  
H
hjxilinx 已提交
2245
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2246
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2247 2248 2249
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
S
slguan 已提交
2250 2251 2252 2253 2254 2255
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2256

2257
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2258
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2259

2260 2261 2262 2263
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2264
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2265 2266 2267 2268 2269
  }

  return code;
}

2270
void tscInitMsgsFp() {
S
slguan 已提交
2271 2272
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2273
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2274 2275

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2276
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2277

2278 2279
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2280 2281

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2282
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2283 2284 2285
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2286
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2287 2288 2289
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2290
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2291
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2292 2293 2294 2295
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2296
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2297
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2298
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2299 2300 2301 2302

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2303 2304 2305
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2306 2307

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2308
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2309 2310 2311 2312 2313

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2314
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2315
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2316
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2317 2318

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2319
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2320
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2321

H
Haojun Liao 已提交
2322 2323 2324 2325 2326
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2327

H
hzcheng 已提交
2328 2329
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2330
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2331 2332 2333 2334 2335 2336 2337 2338 2339 2340

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}