vnodeRead.c 3.9 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
S
slguan 已提交
24
#include "tdataformat.h"
S
slguan 已提交
25 26
#include "vnode.h"
#include "vnodeInt.h"
S
slguan 已提交
27
#include "vnodeLog.h"
28
#include "query.h"
S
slguan 已提交
29 30 31

static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t  vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
J
jtao1735 已提交
32
static int32_t  vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
S
slguan 已提交
33 34

void vnodeInitReadFp(void) {
J
jtao1735 已提交
35 36
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
37 38 39 40 41
}

int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
  SVnodeObj *pVnode = (SVnodeObj *)param;

42
  if (vnodeProcessReadMsgFp[msgType] == NULL)
43
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
S
slguan 已提交
44

S
slguan 已提交
45
  if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) 
46
    return TSDB_CODE_VND_INVALID_VGROUP_ID; 
S
slguan 已提交
47

H
Hui Li 已提交
48 49 50 51
  // TODO: Later, let slave to support query
  if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
    return TSDB_CODE_NOT_READY;

S
slguan 已提交
52 53 54 55 56 57 58 59
  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
}

static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
  SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
  memset(pRet, 0, sizeof(SRspRet));

  int32_t code = TSDB_CODE_SUCCESS;
60

61
  qinfo_t pQInfo = NULL;
S
slguan 已提交
62
  if (contLen != 0) {
63
    pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
64

S
slguan 已提交
65 66 67
    SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
    pRsp->qhandle = htobe64((uint64_t) (pQInfo));
    pRsp->code = pRet->code;
68

S
slguan 已提交
69 70 71
    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
    
72
    vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
S
slguan 已提交
73
  } else {
74
    assert(pCont != NULL);
S
slguan 已提交
75
    pQInfo = pCont;
76
    code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
S
slguan 已提交
77 78
  }

79 80 81 82
  if (pQInfo != NULL) {
    qTableQuery(pQInfo); // do execute query
  }

S
slguan 已提交
83 84 85
  return code;
}

J
jtao1735 已提交
86
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
S
slguan 已提交
87 88 89 90 91 92
  SRetrieveTableMsg *pRetrieve = pCont;
  void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
  memset(pRet, 0, sizeof(SRspRet));

  int32_t code = TSDB_CODE_SUCCESS;

93
  vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
S
slguan 已提交
94 95 96 97 98 99 100 101 102 103 104 105
  
  pRet->code = qRetrieveQueryResultInfo(pQInfo);
  if (pRet->code != TSDB_CODE_SUCCESS) {
    //TODO
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
  } else {
    // todo check code and handle error in build result set
    pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);

    if (qHasMoreResultsToRetrieve(pQInfo)) {
      pRet->qhandle = pQInfo;
106
      code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
H
hjxilinx 已提交
107
    } else {
S
slguan 已提交
108 109 110 111 112 113
      // no further execution invoked, release the ref to vnode
      qDestroyQueryInfo(pQInfo);
      vnodeRelease(pVnode);
    }
  }
  
114
  vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo);
S
slguan 已提交
115 116
  return code;
}