vnodeRead.c 8.5 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include <dnode.h>
S
slguan 已提交
18
#include "os.h"
H
Haojun Liao 已提交
19 20

#include "tglobal.h"
S
slguan 已提交
21
#include "taoserror.h"
H
Haojun Liao 已提交
22 23 24
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
S
slguan 已提交
25 26 27 28 29
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"

30 31 32
/*
 * Dispatch table for read-path messages, indexed by RPC message type.
 * Slots are filled in by vnodeInitReadFp(); a slot that was never assigned
 * stays NULL (static storage) and is treated as "no handler" by
 * vnodeProcessRead().
 */
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);

/* Per-message-type handlers and their helper; all are private to this file. */
static int32_t  vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t  vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t  vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
S
slguan 已提交
34 35

void vnodeInitReadFp(void) {
J
jtao1735 已提交
36 37
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
38 39
}

40
/*
 * Entry point for a read message addressed to a vnode.
 *
 * param    - the target vnode, passed as an opaque pointer (cast to SVnodeObj *)
 * pReadMsg - the incoming message; rpcMsg.msgType selects the handler
 *
 * Returns the handler's result, or one of:
 *   TSDB_CODE_VND_MSG_NOT_PROCESSED - message type out of range or has no handler
 *   TSDB_CODE_VND_INVALID_STATUS    - vnode is not in TAOS_VN_STATUS_READY
 *   TSDB_CODE_RPC_NOT_READY         - replicated vnode whose role is not master
 */
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
  int msgType = pReadMsg->rpcMsg.msgType;

  // msgType indexes both vnodeProcessReadMsgFp[] and taosMsg[] below, so it
  // must be range-checked before either table is touched.
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    vDebug("vgId:%d, msgType:%d not processed, out of range", pVnode->vgId, msgType);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (vnodeProcessReadMsgFp[msgType] == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (pVnode->status != TAOS_VN_STATUS_READY) {
    vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
    return TSDB_CODE_VND_INVALID_STATUS;
  }

  // TODO: Later, let slave to support query
  if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
    vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
    return TSDB_CODE_RPC_NOT_READY;
  }

  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}

63 64 65 66 67
/*
 * Handles a query message. Three cases are distinguished:
 *
 *  1. rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL: the message is the
 *     kill-query notice built by vnodeNotifyCurrentQhandle(); the referenced
 *     query handle is acquired and released with the free flag set.
 *     Returns TSDB_CODE_TSC_QUERY_CANCELLED.
 *  2. contLen != 0: a new query. A QInfo is created and registered, a
 *     SQueryTableRsp is placed in pReadMsg->rspRet, the connection is told
 *     about the handle, and the query is executed once.
 *  3. contLen == 0: pCont carries an existing query handle whose execution
 *     is continued. Returns TSDB_CODE_VND_ACTION_IN_PROGRESS, or
 *     TSDB_CODE_QRY_INVALID_QHANDLE if the handle cannot be acquired.
 *
 * pReadMsg->rspRet is zeroed on entry in every case.
 */
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  int32_t  contLen = pReadMsg->contLen;
  SRspRet *pRet = &pReadMsg->rspRet;

  SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
  memset(pRet, 0, sizeof(SRspRet));

  // qHandle needs to be freed correctly
  if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    // the payload is read and rewritten in place below, so it must exist
    assert(pCont != NULL);

    SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
    killQueryMsg->free = htons(killQueryMsg->free);
    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

    void* handle = NULL;
    if ((void**) killQueryMsg->qhandle != NULL) {
      handle = *(void**) killQueryMsg->qhandle;
    }

    vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle);
    assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);

    void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle);
    if (qhandle == NULL || *qhandle == NULL) {
      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
    } else {
      assert(qhandle == (void**) killQueryMsg->qhandle);
      qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
    }

    return TSDB_CODE_TSC_QUERY_CANCELLED;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  void**  handle = NULL;

  if (contLen != 0) {
    qinfo_t pQInfo = NULL;
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);

    // NOTE(review): the result of rpcMallocCont() is not checked here; handling
    // a NULL return needs an out-of-memory error code and cleanup of pQInfo.
    SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
    pRsp->code    = code;
    pRsp->qhandle = 0;

    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
    int32_t vgId = pVnode->vgId;

    // current connect is broken
    if (code == TSDB_CODE_SUCCESS) {
      handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
      if (handle == NULL) {  // failed to register qhandle
        pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;

        // killed twice on purpose; presumably for the same two-refcount reason
        // noted further down - confirm before changing
        qKillQuery(pQInfo);
        qKillQuery(pQInfo);
      } else {
        assert(*handle == pQInfo);
        // the handle address itself is what is returned to the requester
        pRsp->qhandle = htobe64((uint64_t) (handle));
      }

      pQInfo = NULL;
      if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
        vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;

        // NOTE: there two refcount, needs to kill twice
        // query has not been put into qhandle pool, kill it directly.
        qKillQuery(*handle);
        qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
        return pRsp->code;
      }
    } else {
      assert(pQInfo == NULL);
    }

    // pQInfo has been cleared by now, so report the QInfo through the handle
    // that is still held (NULL when creation or registration failed).
    vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, (handle != NULL) ? *handle : NULL);
  } else {
    assert(pCont != NULL);
    handle = qAcquireQInfo(pVnode->qMgmt, (void**) pCont);
    if (handle == NULL) {
      vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", *(void**) pCont, pReadMsg->rpcMsg.handle);
      code = TSDB_CODE_QRY_INVALID_QHANDLE;
    } else {
      vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, *(void**) pCont);
      code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
    }
  }

  if (handle != NULL) {
    qTableQuery(*handle); // do execute query
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
  }

  return code;
}

160 161 162 163
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  SRspRet *pRet = &pReadMsg->rspRet;

S
slguan 已提交
164
  SRetrieveTableMsg *pRetrieve = pCont;
H
Haojun Liao 已提交
165
  pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
H
Haojun Liao 已提交
166 167
  pRetrieve->free = htons(pRetrieve->free);

H
Haojun Liao 已提交
168
  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *(void**) pRetrieve->qhandle);
169

S
slguan 已提交
170
  memset(pRet, 0, sizeof(SRspRet));
H
Haojun Liao 已提交
171

H
Haojun Liao 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
  int32_t code = TSDB_CODE_SUCCESS;
  void** handle = qAcquireQInfo(pVnode->qMgmt, (void**) pRetrieve->qhandle);
  if (handle == NULL || handle != (void**) pRetrieve->qhandle) {
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
    vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, *(void**) pRetrieve->qhandle);

    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    pRet->len = sizeof(SRetrieveTableRsp);

    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    SRetrieveTableRsp* pRsp = pRet->rsp;
    pRsp->numOfRows = 0;
    pRsp->completed = true;
    pRsp->useconds  = 0;

    return code;
H
Haojun Liao 已提交
188
  }
S
slguan 已提交
189

H
Haojun Liao 已提交
190
  if (pRetrieve->free == 1) {
H
Haojun Liao 已提交
191 192
    vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
H
Haojun Liao 已提交
193

S
slguan 已提交
194
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
H
Haojun Liao 已提交
195 196
    pRet->len = sizeof(SRetrieveTableRsp);

S
slguan 已提交
197
    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
H
Haojun Liao 已提交
198 199 200 201
    SRetrieveTableRsp* pRsp = pRet->rsp;
    pRsp->numOfRows = 0;
    pRsp->completed = true;
    pRsp->useconds  = 0;
202

H
Haojun Liao 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
    return code;
  }

  bool freeHandle = true;
  code = qRetrieveQueryResultInfo(*handle);
  if (code != TSDB_CODE_SUCCESS) {
    //TODO handle malloc failure
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
  } else { // if failed to dump result, free qhandle immediately
    if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
      if (qHasMoreResultsToRetrieve(*handle)) {
        dnodePutItemIntoReadQueue(pVnode, handle);
        pRet->qhandle = handle;
        freeHandle = false;
      }
S
slguan 已提交
219 220
    }
  }
221

H
Haojun Liao 已提交
222
  qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
S
slguan 已提交
223 224
  return code;
}
H
Haojun Liao 已提交
225 226 227 228 229 230 231 232 233 234 235 236

// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
  SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
  killQueryMsg->free = htons(1);
  killQueryMsg->header.vgId = htonl(vgId);
  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
  return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
237
}