tscServer.c 79.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
28
#include "tlockfree.h"
H
hzcheng 已提交
29 30 31

#define TSC_MGMT_VNODE 999

32
SRpcCorEpSet  tscMgmtEpSet;
33
SRpcEpSet  tscDnodeEpSet;
S
slguan 已提交
34

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
46

47
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
48 49
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

50
  SRpcEpSet* pEpSet = &pSql->epSet;
51 52 53 54
  pEpSet->inUse = 0;

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
55

56 57 58 59
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
60 61 62 63

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
64
  }
65 66

  assert(hasFqdn);
67
}
68

dengyihao's avatar
dengyihao 已提交
69 70 71 72
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  taosCorBeginRead(&tscMgmtEpSet.version);
  *epSet = tscMgmtEpSet.epSet;
  taosCorEndRead(&tscMgmtEpSet.version);
73
}  
dengyihao's avatar
dengyihao 已提交
74 75
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
76 77 78
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
79 80
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
81 82
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
83
   for (int32_t i = 0; i < s1->numOfEps; i++) {
84 85 86 87 88
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
89
}
dengyihao's avatar
dengyihao 已提交
90
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
91
  // no need to update if equal
dengyihao's avatar
dengyihao 已提交
92 93 94
  taosCorBeginWrite(&tscMgmtEpSet.version);
  tscMgmtEpSet.epSet = *pEpSet;
  taosCorEndWrite(&tscMgmtEpSet.version);
95
}
dengyihao's avatar
dengyihao 已提交
96
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
97
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
98
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
99
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
100 101 102
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
103
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
104
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
105
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
106
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
107 108
}

dengyihao's avatar
dengyihao 已提交
109
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
110 111
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
112
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
113
  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
114

dengyihao's avatar
bugfix  
dengyihao 已提交
115
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
116
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
117 118
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
119
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
Y
yihaoDeng 已提交
120
    tstrncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], sizeof(pEpSet->fqdn[i]));
121
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
122
  }
dengyihao's avatar
dengyihao 已提交
123
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
124
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
125
}
126 127 128 129
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
130
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
131
  } else {
132
    for (int i = 0; i < dump.numOfEps; ++i) {
133
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
134
    }
S
slguan 已提交
135
  }
S
slguan 已提交
136 137
}

H
hzcheng 已提交
138 139 140 141 142 143 144 145 146 147 148 149
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
150
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
dengyihao's avatar
dengyihao 已提交
151 152 153 154
    SRpcEpSet *      epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
dengyihao's avatar
dengyihao 已提交
155
    } 
S
slguan 已提交
156

S
Shengliang Guan 已提交
157 158
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
159 160
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
161
      return;
H
hzcheng 已提交
162
    } else {
S
slguan 已提交
163 164
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
165 166
    }
  } else {
167
    tscDebug("heart beat failed, code:%s", tstrerror(code));
H
hzcheng 已提交
168 169 170 171 172 173 174
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
175 176 177
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
178

179 180 181
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
182 183
  }

184 185 186
  if (tscShouldFreeHeartBeat(pHB)) {
    tscDebug("%p free HB object and release connection", pHB);
    tscFreeSqlObj(pHB);
H
hzcheng 已提交
187
    tscCloseTscObj(pObj);
H
Haojun Liao 已提交
188
  } else {
189
    int32_t code = tscProcessSql(pHB);
H
Haojun Liao 已提交
190
    if (code != TSDB_CODE_SUCCESS) {
191
      tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
Haojun Liao 已提交
192
    }
H
hzcheng 已提交
193 194 195 196
  }
}

int tscSendMsgToServer(SSqlObj *pSql) {
197
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
198 199 200
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
201
  if (NULL == pMsg) {
202
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
203
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
204 205
  }

206 207
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
dengyihao 已提交
208
    tscDumpMgmtEpSet(&pSql->epSet);
J
jtao1735 已提交
209 210
  }

211
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
212

J
jtao1735 已提交
213
  SRpcMsg rpcMsg = {
214 215 216
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217 218
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
219
      .code    = 0
J
jtao1735 已提交
220
  };
H
hzcheng 已提交
221

H
Haojun Liao 已提交
222 223 224 225
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
226
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
S
slguan 已提交
227
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
228 229
}

230
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231
  SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
H
Haojun Liao 已提交
232
  if (pSql == NULL || pSql->signature != pSql) {
B
Bomin Zhang 已提交
233
    tscError("%p sql is already released", pSql);
234 235
    return;
  }
236

H
Haojun Liao 已提交
237
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
238 239
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
240

H
Haojun Liao 已提交
241
  if (pObj->signature != pObj) {
242
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
243 244 245 246 247 248

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

249 250
  pSql->pRpcCtx = NULL;    // clear the rpcCtx

H
Haojun Liao 已提交
251 252
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
253
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
254
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
255

H
Haojun Liao 已提交
256
    tscFreeSqlObj(pSql);
257
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
258
    return;
H
hzcheng 已提交
259 260
  }

261
  if (pEpSet) { 
dengyihao's avatar
dengyihao 已提交
262
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
dengyihao's avatar
fixbug  
dengyihao 已提交
263
      if(pCmd->command < TSDB_SQL_MGMT)  { 
264
        tscUpdateVgroupInfo(pSql, pEpSet); 
dengyihao's avatar
dengyihao 已提交
265
      } else {
dengyihao's avatar
dengyihao 已提交
266
        tscUpdateMgmtEpSet(pEpSet);
dengyihao's avatar
fixbug  
dengyihao 已提交
267
    }
dengyihao's avatar
dengyihao 已提交
268
    }
J
jtao1735 已提交
269 270
  }

271 272 273 274 275 276 277
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
S
Shengliang Guan 已提交
278
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
279 280 281 282 283 284 285
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }
286

287 288 289 290 291 292 293 294 295 296
    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
      rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
297 298
      }
    }
S
slguan 已提交
299
  }
300

H
hzcheng 已提交
301
  pRes->rspLen = 0;
302
  
303 304
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hzcheng 已提交
305
  } else {
306
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
hzcheng 已提交
307 308
  }

S
slguan 已提交
309
  if (pRes->code == TSDB_CODE_SUCCESS) {
310
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
311 312 313
    pSql->retry = 0;
  }

314
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
315
    assert(rpcMsg->msgType == pCmd->msgType + 1);
316
    pRes->code    = rpcMsg->code;
317
    pRes->rspType = rpcMsg->msgType;
318
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
319

320
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
321 322
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
323
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
324 325
      } else {
        pRes->pRsp = tmp;
326
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
327
      }
328 329
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
330 331
    }

H
hzcheng 已提交
332 333 334 335
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
336
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
337
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
338 339 340 341 342 343 344
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
345
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
346
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
347
    } else {
348
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
349 350
    }
  }
351
  
H
Haojun Liao 已提交
352
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
353
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
354
  }
S
Shengliang Guan 已提交
355

356
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
357
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
358
    
H
hjxilinx 已提交
359
    bool shouldFree = tscShouldBeFreed(pSql);
H
Haojun Liao 已提交
360
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
361

362
    if (shouldFree) {
363
      tscDebug("%p sqlObj is automatically freed", pSql);
sangshuduo's avatar
sangshuduo 已提交
364
      tscFreeSqlObj(pSql);
H
hzcheng 已提交
365 366 367
    }
  }

368
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
369 370
}

S
slguan 已提交
371 372 373
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
374

H
hjxilinx 已提交
375 376 377 378 379 380 381
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
382
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
383 384 385 386 387 388
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
389
  }
390

391 392 393
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
394
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
395
    pRes->code = code;
H
hjxilinx 已提交
396
    tscQueueAsyncRes(pSql);
397
    return pRes->code;
S
slguan 已提交
398
  }
H
hjxilinx 已提交
399 400
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
401 402 403
}

int tscProcessSql(SSqlObj *pSql) {
404
  char    *name = NULL;
405
  SSqlCmd *pCmd = &pSql->cmd;
406
  
407
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
408
  STableMetaInfo *pTableMetaInfo = NULL;
409
  uint32_t        type = 0;
410

411
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
412
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
413
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
414
    type = pQueryInfo->type;
415

H
hjxilinx 已提交
416
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
417
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
418
  }
419

420
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
421
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
422
    if (pTableMetaInfo == NULL) {
423
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
424 425
      return pSql->res.code;
    }
H
hjxilinx 已提交
426
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
427
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
428 429 430
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
431
  
S
slguan 已提交
432 433
  return doProcessSql(pSql);
}
H
hzcheng 已提交
434

H
hjxilinx 已提交
435
void tscKillSTableQuery(SSqlObj *pSql) {
436 437 438
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
439
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
440 441 442 443 444
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
S
slguan 已提交
445
    if (pSub == NULL) {
H
hzcheng 已提交
446 447
      continue;
    }
S
slguan 已提交
448

H
hzcheng 已提交
449 450
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
H
Haojun Liao 已提交
451
     * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
H
hzcheng 已提交
452
     */
dengyihao's avatar
dengyihao 已提交
453 454 455
    rpcCancelRequest(pSub->pRpcCtx);
    pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
    tscQueueAsyncRes(pSub);
H
hzcheng 已提交
456 457 458 459 460
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
461
   * 2. if no any subqueries are launched yet, which means the super table query only in parse sql stage,
H
hzcheng 已提交
462 463 464 465 466
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

H
hjxilinx 已提交
467
  while (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
H
hzcheng 已提交
468 469 470 471 472 473
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

474
  tscDebug("%p super table query cancelled", pSql);
H
hzcheng 已提交
475 476
}

J
jtao1735 已提交
477
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
478
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
479
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
480

H
Haojun Liao 已提交
481
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
S
slguan 已提交
482
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
483

484
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
485 486
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
487
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
488
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
489
    
H
hjxilinx 已提交
490
    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
H
Haojun Liao 已提交
491 492
    assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

H
hjxilinx 已提交
493
    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
494
  } else {
H
hjxilinx 已提交
495
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
496
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
497
  }
498 499

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
500
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
501 502 503

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

504
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
505 506
}

507
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
508
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
509
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
510
  
511
  char* pMsg = pSql->cmd.payload;
512 513 514
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
515 516
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

517
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
518 519
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

520
  pMsg += sizeof(SMsgDesc);
521
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
522

H
hjxilinx 已提交
523
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
524
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
525
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
526
  
H
Haojun Liao 已提交
527
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
528

H
hjxilinx 已提交
529
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
530
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
531
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
532

533 534
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
535
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
536 537 538
}

/*
539
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
540
 */
541
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
542
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
543
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
544

S
TD-1057  
Shengliang Guan 已提交
545
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
546 547
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
548
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
549
  
550
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
551 552
}

553
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
554
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
555
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
556

H
hjxilinx 已提交
557
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
558
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
559 560
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
561
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
562 563
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
564
  
B
Bomin Zhang 已提交
565
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
566
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
567 568
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
569
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
570 571
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
572
    }
weixin_48148422's avatar
weixin_48148422 已提交
573

574 575 576 577
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
578

579
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
580 581 582
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
583

584 585
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
586
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
587
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
588
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
589
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
590

591
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
592

593
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
594

dengyihao's avatar
bugfix  
dengyihao 已提交
595
    // set the vgroup info 
596
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
597 598
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
599
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
600 601 602 603 604 605 606 607 608
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
609
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
610 611 612 613
      pMsg += sizeof(STableIdInfo);
    }
  }
  
614
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
615
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
616
  
617 618 619
  return pMsg;
}

620
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
621 622
  SSqlCmd *pCmd = &pSql->cmd;

623
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
624

S
slguan 已提交
625 626
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
627
    return -1;  // todo add test for this
S
slguan 已提交
628
  }
629
  
630
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
631
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
632
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
633
  
H
hjxilinx 已提交
634
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
635 636 637
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }
638 639 640 641 642 643 644 645 646 647
  
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
    return -1;
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }
648

649
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
650

S
TD-1057  
Shengliang Guan 已提交
651
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
652
  
653
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
654 655
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
656
  } else {
H
hjxilinx 已提交
657 658
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
659 660
  }

661 662
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
663
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
664 665
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
666
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
667 668
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
H
hjxilinx 已提交
669
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
670
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
671
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
672
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
673
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
674 675
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
676
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
H
hzcheng 已提交
677 678

  // set column list ids
679 680
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
681
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
682
  
683 684 685
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
686

687
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
688
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
689 690
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
691 692
               pColSchema->name);

693
      return TSDB_CODE_TSC_INVALID_SQL;
694
    }
H
hzcheng 已提交
695 696 697

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
698
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
699
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
700

S
slguan 已提交
701 702 703
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
704

S
slguan 已提交
705
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
706
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
707 708

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
709

710
      if (pColFilter->filterstr) {
S
slguan 已提交
711 712 713 714 715 716 717 718 719 720
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
721

S
slguan 已提交
722 723 724 725 726
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
727 728
  }

H
hjxilinx 已提交
729
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
730
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
731
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
732

H
hjxilinx 已提交
733
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
734
      /* column id is not valid according to the cached table meta, the table meta is expired */
H
hzcheng 已提交
735 736 737 738
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

739 740 741
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
742

743
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
744
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
745
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
746 747

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
748
      // todo add log
H
hzcheng 已提交
749 750 751 752 753
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
754
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
755 756 757 758 759
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
760
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
761
  }
762
  
763
  // serialize the table info (sid, uid, tags)
764 765
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
766
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
767
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
768
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
769 770
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
771
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
772 773
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
774 775 776
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

777 778
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
779 780 781

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
782 783 784
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
785 786 787
    }
  }

788
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
789
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
790 791
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
792 793
    }
  }
794 795 796 797 798 799 800 801 802
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
803
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
804 805 806 807
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
808 809
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
810
                 pCol->colIndex.columnIndex, pColSchema->name);
811

812
        return TSDB_CODE_TSC_INVALID_SQL;
813 814 815 816 817 818 819 820 821 822 823 824
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
825

H
Haojun Liao 已提交
826 827 828 829
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
830
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
847
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
848
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
849 850 851
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

852
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
853
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
854
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
855 856

    // todo refactor
B
Bomin Zhang 已提交
857 858 859 860 861
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }
H
Haojun Liao 已提交
862 863 864

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
B
Bomin Zhang 已提交
865 866 867 868
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }
S
slguan 已提交
869 870 871 872

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
873 874
  }

S
slguan 已提交
875 876
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
877 878
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
879 880
  }

S
TD-1057  
Shengliang Guan 已提交
881
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
882

883
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
884
  pCmd->payloadLen = msgLen;
S
slguan 已提交
885
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
886
  
887
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
888
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
889 890

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
891 892
}

893 894
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
895
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
896
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
897

898
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
899

900
  assert(pCmd->numOfClause == 1);
901
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
902
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
903

904
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
905 906
}

907 908
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
909
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
910 911
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
912
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
913
  }
H
hzcheng 已提交
914

915
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
916 917
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
918
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
919

920
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
921 922
}

923 924
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
925
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
926 927
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
928
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
929
  }
H
hzcheng 已提交
930

931
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
932

933 934
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
935

936 937
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
938

939
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
940

941 942 943 944 945 946 947 948
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
949

950 951 952 953 954 955 956 957 958 959 960 961 962
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
963

S
slguan 已提交
964
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
965
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
966 967
}

968 969
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
970
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
971

S
slguan 已提交
972 973
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
974
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
975 976
  }

977
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
978

979 980
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
981
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
982

983 984 985 986
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
987 988
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
989
  }
H
hzcheng 已提交
990

991
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
992
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
993
  } else {
S
slguan 已提交
994
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
995
  }
H
hzcheng 已提交
996

997
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
998 999
}

1000 1001
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1002
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1003
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1004 1005
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1006

1007 1008
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1009
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1010

S
slguan 已提交
1011 1012
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1013
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1014 1015
  }

1016
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1017

1018
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1019
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1020
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1021

S
slguan 已提交
1022
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1023
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1024 1025
}

1026 1027
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1028
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1029

S
slguan 已提交
1030 1031
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1032
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1033
  }
H
hzcheng 已提交
1034

1035
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1036
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1037
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1038
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1039

S
slguan 已提交
1040
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1041
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1042 1043
}

1044
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1045
  SSqlCmd *pCmd = &pSql->cmd;
1046
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1047 1048
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1049
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1050
  }
H
hzcheng 已提交
1051

1052
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1053
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1054
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1055
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1056

1057
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1058 1059
}

S
[TD-16]  
slguan 已提交
1060
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1061
  SSqlCmd *pCmd = &pSql->cmd;
1062
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1063
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1064

S
slguan 已提交
1065 1066
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1067
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1068
  }
H
hzcheng 已提交
1069

1070
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1071
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1072
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1073

1074
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1075 1076
}

S
[TD-16]  
slguan 已提交
1077 1078 1079 1080 1081 1082 1083
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1084
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1085 1086 1087 1088
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1089
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1090 1091 1092 1093

  return TSDB_CODE_SUCCESS;
}

1094 1095
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1096
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1097

S
slguan 已提交
1098 1099
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1100
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1101
  }
1102

1103
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1104
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1105
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1106
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1107

1108
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1109 1110
}

1111
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1112
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1113
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1114
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1115
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1116

S
slguan 已提交
1117 1118
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1119
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1120
  }
H
hzcheng 已提交
1121

1122
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1123

1124
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1125
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1126
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1127
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1128
  } else {
B
Bomin Zhang 已提交
1129
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1130 1131
  }

1132 1133 1134 1135 1136 1137 1138 1139 1140 1141
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
dengyihao's avatar
dengyihao 已提交
1142 1143
    SSQLToken *pEpAddr = &pShowInfo->prefix;
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1144

dengyihao's avatar
dengyihao 已提交
1145 1146
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1147 1148
  }

1149
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1150
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1151 1152
}

1153
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1154
  SSqlCmd *pCmd = &pSql->cmd;
1155
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1156

1157 1158
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1159
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1160 1161
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1162
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1163 1164
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1165
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1166 1167 1168
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1169 1170
}

1171
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1172 1173
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1174
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1175

1176
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1177
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1178 1179
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1180
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1181
  }
1182

1183 1184 1185
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1186 1187 1188 1189

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1190
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1191
  int              msgLen = 0;
S
slguan 已提交
1192
  SSchema *        pSchema;
H
hzcheng 已提交
1193
  int              size = 0;
1194 1195 1196
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1197
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1198 1199

  // Reallocate the payload size
1200
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1201 1202
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1203
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1204
  }
H
hzcheng 已提交
1205 1206


1207
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1208
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1209 1210

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1211
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1212

1213 1214 1215
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1216 1217 1218 1219
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1220
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1221

1222 1223
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1224 1225 1226 1227 1228 1229 1230
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1231
  } else {  // create (super) table
1232
    pSchema = (SSchema *)pCreateTableMsg->schema;
1233

H
hzcheng 已提交
1234
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1235
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1236 1237 1238 1239

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1240

H
hzcheng 已提交
1241 1242 1243 1244
      pSchema++;
    }

    pMsg = (char *)pSchema;
1245 1246
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1247

1248 1249 1250
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1251 1252 1253
    }
  }

H
hjxilinx 已提交
1254
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1255

S
TD-1057  
Shengliang Guan 已提交
1256
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1257
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1258
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1259
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1260 1261

  assert(msgLen + minMsgSize() <= size);
1262
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1263 1264 1265
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1266
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1267
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1268 1269 1270
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1271
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1272 1273
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1274

1275
  SSqlCmd    *pCmd = &pSql->cmd;
1276 1277
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1278
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1279 1280 1281
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1282 1283 1284 1285
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
1286 1287
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1288
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1289

H
hjxilinx 已提交
1290
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1291
  pAlterTableMsg->type = htons(pAlterInfo->type);
1292

1293
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1294
  SSchema *pSchema = pAlterTableMsg->schema;
1295
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1296
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1297
  
H
hzcheng 已提交
1298 1299 1300 1301 1302 1303 1304
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1305 1306 1307
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1308

S
TD-1057  
Shengliang Guan 已提交
1309
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1310

H
hzcheng 已提交
1311
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1312
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1313 1314

  assert(msgLen + minMsgSize() <= size);
1315

1316
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1317 1318
}

1319 1320 1321 1322
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1323
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1324
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1325

1326 1327
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1328

dengyihao's avatar
dengyihao 已提交
1329
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1330

1331 1332 1333
  return TSDB_CODE_SUCCESS;
}

1334
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1335
  SSqlCmd *pCmd = &pSql->cmd;
1336
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1337
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1338

1339
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1340
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1341
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1342

1343
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1344 1345
}

1346
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1347
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1348
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1349
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1350

S
slguan 已提交
1351 1352
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1353
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1354
  }
S
slguan 已提交
1355

S
slguan 已提交
1356 1357 1358 1359
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1360

1361
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1362 1363
}

1364
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1365
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1366 1367 1368
    return pRes->code;
  }

H
hjxilinx 已提交
1369
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1370
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
H
hjxilinx 已提交
1371
    pRes->tsrow[i] = ((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1386

1387
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1388

H
hzcheng 已提交
1389 1390 1391 1392 1393 1394
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1395
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1396
  } else {
S
slguan 已提交
1397
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1413
  SSqlCmd *       pCmd = &pSql->cmd;
1414
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1415

H
hjxilinx 已提交
1416
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1417 1418
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1419 1420 1421
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1422 1423 1424
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1425 1426 1427
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1428
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1429 1430 1431
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

1432
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1433
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1434 1435

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1436
    tscCreateResPointerInfo(pRes, pQueryInfo);
H
hzcheng 已提交
1437 1438 1439
  }

  pRes->row = 0;
1440
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1441

1442
  int32_t code = pRes->code;
H
hjxilinx 已提交
1443 1444 1445 1446
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1447 1448 1449 1450 1451
  }

  return code;
}

S
slguan 已提交
1452
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1453

1454
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1455
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1456
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1457
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1458
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1459

S
slguan 已提交
1460 1461
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1462
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1463 1464
  }

1465
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1466 1467 1468 1469

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1470 1471 1472
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1473

1474
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1475 1476
}

H
hjxilinx 已提交
1477
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1478
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
1479
  char *         pMsg;
H
hzcheng 已提交
1480 1481
  int            msgLen = 0;

B
Bomin Zhang 已提交
1482 1483 1484 1485
  char *tmpData = NULL;
  uint32_t len = pSql->cmd.payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
1486
    if (NULL == tmpData) {
1487
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
1488 1489
    }

H
hzcheng 已提交
1490
    // STagData is in binary format, strncpy is not available
B
Bomin Zhang 已提交
1491
    memcpy(tmpData, pSql->cmd.payload, len);
H
hzcheng 已提交
1492 1493
  }

1494 1495 1496
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1497
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1498

1499
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1500
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1501
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1502

1503
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1504

B
Bomin Zhang 已提交
1505 1506 1507
  if (pSql->cmd.autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
H
hzcheng 已提交
1508 1509
  }

S
TD-1057  
Shengliang Guan 已提交
1510
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1511
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1512

S
Shengliang Guan 已提交
1513
  taosTFree(tmpData);
H
hzcheng 已提交
1514

S
TD-1057  
Shengliang Guan 已提交
1515
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
1516
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1517 1518
}

S
slguan 已提交
1519
/**
1520
 *  multi table meta req pkg format:
1521
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1522 1523
 *      no used         4B
 **/
1524
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1525
#if 0
S
slguan 已提交
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1538
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1539

1540
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1541
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1542 1543

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1544
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1545 1546
  }

S
Shengliang Guan 已提交
1547
  taosTFree(tmpData);
S
slguan 已提交
1548

1549
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1550
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1551 1552 1553

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1554
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1555 1556 1557
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1558 1559
#endif
  return 0;  
S
slguan 已提交
1560 1561
}

H
hjxilinx 已提交
1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1579
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1580 1581 1582 1583 1584 1585 1586 1587
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1588

H
hjxilinx 已提交
1589 1590
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1591 1592 1593 1594 1595 1596 1597 1598 1599 1600
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1601 1602 1603
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1604
  }
H
hjxilinx 已提交
1605 1606

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1607
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1608

1609
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1610 1611
}

1612 1613
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1614 1615
  STscObj *pObj = pSql->pTscObj;

1616
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1617

S
Shengliang Guan 已提交
1618
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1619 1620 1621
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1622
    numOfQueries++;
H
hzcheng 已提交
1623 1624
  }

S
Shengliang Guan 已提交
1625
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1626 1627 1628
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1629
    numOfStreams++;
H
hzcheng 已提交
1630 1631
  }

1632
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1633
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1634
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1635 1636 1637
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1638

1639
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
1640 1641
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
1642
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1643 1644 1645 1646

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1647
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1648

1649
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1650 1651
}

1652 1653
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1654

1655 1656
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1657
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1658 1659
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1660 1661
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1662
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1663

1664 1665
  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) {
    tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid);
1666
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1667 1668
  }

B
Bomin Zhang 已提交
1669
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1670
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1671
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1672 1673
  }

1674 1675
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1676
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1677 1678
  }

1679 1680
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1681 1682
  }

1683
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1684

1685
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1686 1687 1688
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1689 1690 1691 1692 1693

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1694
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1695 1696 1697
    pSchema++;
  }

1698 1699
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1700
  
H
hzcheng 已提交
1701
  // todo add one more function: taosAddDataIfNotExists();
1702
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1703
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1704

H
Haojun Liao 已提交
1705 1706
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1707
  
1708
  // todo handle out of memory case
1709
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1710
    free(pTableMeta);
1711
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1712
  }
H
hzcheng 已提交
1713

1714
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1715
  free(pTableMeta);
1716
  
H
hjxilinx 已提交
1717
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1718 1719
}

S
slguan 已提交
1720
/**
1721
 *  multi table meta rsp pkg format:
1722
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1723 1724 1725
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1726
#if 0
S
slguan 已提交
1727 1728 1729 1730 1731
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1732
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1733
    pSql->res.numOfTotal = 0;
1734
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1735 1736 1737 1738
  }

  rsp++;

1739
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1740
  totalNum = htonl(pInfo->numOfTables);
1741
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1742 1743

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1744
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1745
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1746 1747 1748

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1749
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1750 1751
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1752 1753
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1754
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1755
      pSql->res.numOfTotal = i;
1756
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1757 1758
    }

H
hjxilinx 已提交
1759 1760 1761 1762
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1763
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1764
    //      pSql->res.numOfTotal = i;
1765
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1766 1767 1768 1769
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1770
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1771
    //      pSql->res.numOfTotal = i;
1772
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1773 1774 1775 1776
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1777
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1778
    //      pSql->res.numOfTotal = i;
1779
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1780 1781
    //    }
    //
H
hjxilinx 已提交
1782
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
hjxilinx 已提交
1817
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1818
    //  }
S
slguan 已提交
1819
  }
H
hjxilinx 已提交
1820
  
S
slguan 已提交
1821 1822
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1823
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1824 1825
#endif
  
S
slguan 已提交
1826 1827 1828
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1829
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1830
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1831
  
H
hjxilinx 已提交
1832
  // NOTE: the order of several table must be preserved.
1833
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1834 1835
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1836
  
1837 1838 1839
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1840
  
1841
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    assert(pInfo->vgroupList != NULL);

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1853
      //just init, no need to lock
H
hjxilinx 已提交
1854 1855
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
      pVgroups->vgId = htonl(pVgroups->vgId);
1856
      assert(pVgroups->numOfEps >= 1);
H
hjxilinx 已提交
1857

1858 1859
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
H
hjxilinx 已提交
1860
      }
1861
    }
1862 1863

    pMsg += size;
H
hjxilinx 已提交
1864 1865
  }
  
S
slguan 已提交
1866
  return pSql->res.code;
H
hzcheng 已提交
1867 1868 1869 1870 1871 1872
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1873
  STableMetaMsg * pMetaMsg;
1874
  SCMShowRsp *pShow;
S
slguan 已提交
1875
  SSchema *    pSchema;
H
hzcheng 已提交
1876 1877
  char         key[20];

1878 1879 1880
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1881
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1882

H
hjxilinx 已提交
1883
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1884

1885
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1886
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1887 1888
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1889
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1890
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1891

H
hjxilinx 已提交
1892
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1893

H
hjxilinx 已提交
1894
  pSchema = pMetaMsg->schema;
H
hjxilinx 已提交
1895 1896
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1897 1898 1899 1900
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1901
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1902 1903
  strcpy(key + 1, "showlist");

H
hjxilinx 已提交
1904
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
H
hjxilinx 已提交
1905
  
H
hjxilinx 已提交
1906 1907 1908
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1909 1910
  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer);
H
hjxilinx 已提交
1911
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1912

1913 1914 1915 1916
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1917 1918
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1919
  SColumnIndex index = {0};
H
hjxilinx 已提交
1920 1921 1922
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1923
    index.columnIndex = i;
1924 1925
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1926 1927
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1928
    
H
hjxilinx 已提交
1929
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1930
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1931
  }
H
hjxilinx 已提交
1932 1933
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1934
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1935
  
S
Shengliang Guan 已提交
1936
  taosTFree(pTableMeta);
H
hzcheng 已提交
1937 1938 1939
  return 0;
}

1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

  SQueryInfo *pQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;
  pObj->pHb = pSql;
  tscAddSubqueryInfo(&pObj->pHb->cmd);

  tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}

H
hzcheng 已提交
1969
int tscProcessConnectRsp(SSqlObj *pSql) {
H
Haojun Liao 已提交
1970
  char temp[TSDB_TABLE_FNAME_LEN * 2];
H
hzcheng 已提交
1971 1972 1973
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

1974
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
1975
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
1976 1977
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
1978 1979
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
1980
  
dengyihao's avatar
dengyihao 已提交
1981 1982 1983
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
1984
  } 
H
hzcheng 已提交
1985

S
slguan 已提交
1986
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
1987 1988
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
1989
  pObj->connId = htonl(pConnect->connId);
1990 1991 1992

  createHBObj(pObj);

S
scripts  
slguan 已提交
1993
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
1994 1995 1996 1997 1998

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
1999
  STscObj *       pObj = pSql->pTscObj;
2000
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2001

B
Bomin Zhang 已提交
2002
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2003 2004 2005
  return 0;
}

S
Shengliang Guan 已提交
2006
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
2007
  taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2008 2009 2010 2011
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2012
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2013

H
Haojun Liao 已提交
2014 2015
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2016 2017 2018 2019 2020 2021 2022
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2023 2024
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2025
   */
2026
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
hjxilinx 已提交
2027
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2028

H
hjxilinx 已提交
2029 2030
  if (pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2031 2032 2033 2034 2035 2036
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2037
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2038

H
Haojun Liao 已提交
2039
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2040
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2041 2042 2043
    return 0;
  }

2044
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
hjxilinx 已提交
2045
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2046

H
hjxilinx 已提交
2047
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2048
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
hjxilinx 已提交
2049
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2050

2051
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2052
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
2053
      taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2068
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2069 2070 2071
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2072
  pRes->data = NULL;
S
slguan 已提交
2073
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2074 2075 2076
  return 0;
}

H
hjxilinx 已提交
2077
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2078 2079 2080
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2081
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2082 2083 2084

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2085 2086
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2087
  pRes->completed = (pRetrieve->completed == 1);
2088
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2089
  
2090
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2091 2092 2093 2094
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2095
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2096
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2097
    
H
hjxilinx 已提交
2098
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2099 2100
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2101 2102
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2103
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2104
    p += sizeof(int32_t);
S
slguan 已提交
2105
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2106 2107
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2108
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2109 2110
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2111
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2112
    }
2113 2114
  }

H
hzcheng 已提交
2115
  pRes->row = 0;
2116
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2117 2118 2119 2120

  return 0;
}

H
hjxilinx 已提交
2121
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2122

2123
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2124 2125
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2126
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2127
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2128
  }
2129

H
hzcheng 已提交
2130 2131 2132
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2133

2134
  tscAddSubqueryInfo(&pNew->cmd);
2135 2136 2137 2138

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

H
hjxilinx 已提交
2139
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2140
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2141
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2142
    free(pNew);
2143

2144
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2145 2146
  }

H
hjxilinx 已提交
2147
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2148
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2149

B
Bomin Zhang 已提交
2150
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2151 2152
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
2153
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2154

H
hjxilinx 已提交
2155 2156
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2157

H
hjxilinx 已提交
2158 2159
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2160
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2161 2162 2163 2164 2165
  }

  return code;
}

H
hjxilinx 已提交
2166
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2167
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2168

2169
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2170 2171
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
2172 2173
  }
  
H
Haojun Liao 已提交
2174
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2175
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2176
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2177
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2178
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2179 2180 2181

    return TSDB_CODE_SUCCESS;
  }
2182 2183
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2184 2185
}

H
hjxilinx 已提交
2186
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2187
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2188
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2189 2190 2191
}

/**
H
Haojun Liao 已提交
2192
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2193
 * @param pSql          sql object
H
Haojun Liao 已提交
2194
 * @param tableId       table full name
H
hzcheng 已提交
2195 2196
 * @return              status code
 */
H
Haojun Liao 已提交
2197
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2198
  SSqlCmd *pCmd = &pSql->cmd;
2199 2200

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2201
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2202

H
Haojun Liao 已提交
2203 2204
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2205
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2206
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2207 2208
  }

H
Haojun Liao 已提交
2209 2210
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2211 2212
}

H
hjxilinx 已提交
2213
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2214
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2215
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2216
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2217 2218
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2219 2220
    }
  }
H
hjxilinx 已提交
2221 2222 2223 2224
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2225

H
hjxilinx 已提交
2226
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2227
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2228 2229 2230
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2231 2232
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2233

S
slguan 已提交
2234
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2235 2236 2237
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2238
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
2239 2240
  
  SQueryInfo *pNewQueryInfo = NULL;
2241
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
B
Bomin Zhang 已提交
2242
    tscFreeSqlObj(pNew);
2243 2244
    return code;
  }
2245
  
H
hjxilinx 已提交
2246
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2247
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2248 2249 2250
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
S
slguan 已提交
2251 2252 2253 2254 2255 2256
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2257

2258
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2259
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2260

2261 2262 2263 2264
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2265
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2266 2267 2268 2269 2270
  }

  return code;
}

2271
void tscInitMsgsFp() {
S
slguan 已提交
2272 2273
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2274
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2275 2276

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2277
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2278

2279 2280
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2281 2282

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2283
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2284 2285 2286
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2287
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2288 2289 2290
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2291
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2292
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2293 2294 2295 2296
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2297
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2298
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2299
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2300 2301 2302 2303

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2304 2305 2306
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2307 2308

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2309
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2310 2311 2312 2313 2314

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2315
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2316
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2317
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2318 2319

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2320
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2321
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2322

H
Haojun Liao 已提交
2323 2324 2325 2326 2327
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2328

H
hzcheng 已提交
2329 2330
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2331
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2332 2333 2334 2335 2336 2337 2338 2339 2340 2341

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}