vnodeRead.c 10.3 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17
//#include <dnode.h>
S
slguan 已提交
18
#include "os.h"
H
Haojun Liao 已提交
19 20

#include "tglobal.h"
S
slguan 已提交
21
#include "taoserror.h"
H
Haojun Liao 已提交
22 23 24
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
S
slguan 已提交
25 26 27 28
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
29
#include "tqueue.h"
S
slguan 已提交
30

31 32 33
/* Dispatch table of read-message handlers, indexed by RPC message type. */
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);

/* Handlers installed into the dispatch table by vnodeInitReadFp(). */
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);

/* Reports a newly created qhandle to the RPC connection that issued the query. */
static int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId);
S
slguan 已提交
35 36

void vnodeInitReadFp(void) {
J
jtao1735 已提交
37 38
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
39 40
}

41
/*
 * Entry point for a read message addressed to a vnode.
 *
 * param    - the target vnode (SVnodeObj *)
 * pReadMsg - the read message; its rpcMsg.msgType selects the handler
 *
 * Returns:
 *   TSDB_CODE_VND_MSG_NOT_PROCESSED - message type is out of range or has no handler
 *   TSDB_CODE_VND_INVALID_STATUS    - vnode status is not TAOS_VN_STATUS_READY
 *   TSDB_CODE_RPC_NOT_READY         - tsdb is NULL, the vnode is closing, or the vnode is
 *                                     a non-master member of a replicated group
 *   otherwise the value returned by the selected handler.
 */
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
  int msgType = pReadMsg->rpcMsg.msgType;

  // the message type is used as an array index below, so it must be validated first
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    vDebug("vgId:%d, msgType:%d not processed, invalid msg type", pVnode->vgId, msgType);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (vnodeProcessReadMsgFp[msgType] == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (pVnode->status != TAOS_VN_STATUS_READY) {
    vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
    return TSDB_CODE_VND_INVALID_STATUS;
  }

  // tsdb may be in reset state
  if (pVnode->tsdb == NULL) {
    return TSDB_CODE_RPC_NOT_READY;
  }

  // NOTE(review): given the READY check above, this can only trigger if the status is
  // changed by another thread between the two reads; kept as a defensive re-check.
  if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
    return TSDB_CODE_RPC_NOT_READY;
  }

  // TODO: Later, let slave to support query
  if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
    vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
    return TSDB_CODE_RPC_NOT_READY;
  }

  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}

H
Haojun Liao 已提交
69
/*
 * Queue a "continue executing" item for an existing query on the vnode's read queue.
 *
 * The item is a TSDB_MSG_TYPE_QUERY message with contLen == 0 whose pCont carries the
 * qhandle itself; vnodeProcessQueryMsg() treats contLen == 0 as the continue-exec case.
 * The vnode reference count is incremented for the queued item.
 *
 * If the queue item cannot be allocated, the failure is logged and nothing is queued
 * (and the reference count is left untouched).
 */
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
  SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
  if (pRead == NULL) {
    vError("vgId:%d, QInfo:%p, failed to allocate read queue item, query is not rescheduled", pVnode->vgId, qhandle);
    return;
  }

  pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
  pRead->rpcMsg.handle = NULL;
  pRead->pCont = qhandle;
  pRead->contLen = 0;

  atomic_add_fetch_32(&pVnode->refCount, 1);
  taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}

H
Haojun Liao 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
/*
 * Dump the available result of a query into the response descriptor.
 *
 * pRet       - response descriptor; rsp/len are filled by qDumpRetrieveResult()
 * pVnode     - owning vnode, used to requeue the query when it must keep running
 * handle     - the query handle (QInfo)
 * freeHandle - out: false when the query was put back on the read queue for further
 *              execution, true when execution completed or the dump failed
 *
 * Returns the code of qDumpRetrieveResult(). On failure an all-zero SRetrieveTableRsp is
 * attached to pRet->rsp; if that allocation fails too, pRet->rsp is left NULL.
 */
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
  bool continueExec = false;

  int32_t code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec);
  if (code != TSDB_CODE_SUCCESS) {
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {  // do not memset a failed allocation
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    }

    *freeHandle = true;
    return code;
  }

  if (continueExec) {
    vDebug("QInfo:%p add to query task queue for exec", handle);
    vnodePutItemIntoReadQueue(pVnode, handle);
    pRet->qhandle = handle;
    *freeHandle = false;
  } else {
    vDebug("QInfo:%p exec completed", handle);
    *freeHandle = true;
  }

  return code;
}

/*
 * Attach an empty, completed retrieve response to pRet: an all-zero SRetrieveTableRsp
 * with only `completed` set, and len set to the size of that structure.
 *
 * If the response buffer cannot be allocated, pRet->rsp is set to NULL and pRet->len to 0.
 */
static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) {
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
  if (pRsp == NULL) {
    pRet->rsp = NULL;
    pRet->len = 0;
    return;
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));
  pRsp->completed = true;

  pRet->rsp = pRsp;
  pRet->len = sizeof(SRetrieveTableRsp);
}

113
/*
 * Handler for TSDB_MSG_TYPE_QUERY. Three cases are distinguished:
 *
 *  1. rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL: the content is an SRetrieveTableMsg
 *     (network byte order) naming a qhandle whose connection broke; the query is killed
 *     and TSDB_CODE_TSC_QUERY_CANCELLED is returned. Presumably this is the message
 *     registered through vnodeNotifyCurrentQhandle() - confirm against the RPC layer.
 *  2. contLen != 0: a new query. The QInfo is created and registered, the connection is
 *     told about the qhandle, an SQueryTableRsp is placed in pReadMsg->rspRet and the
 *     query is queued for execution.
 *  3. contLen == 0: pCont is the qhandle of a query queued by vnodePutItemIntoReadQueue();
 *     execution continues and, if a result is ready, the retrieve response is built.
 *
 * Returns TSDB_CODE_QRY_HAS_RSP when case 3 produced a response, TSDB_CODE_QRY_INVALID_QHANDLE
 * when the handle in case 3 is unknown, TSDB_CODE_RPC_NETWORK_UNAVAIL when the notification
 * in case 2 fails, otherwise the code of the underlying query call.
 */
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void    *pCont = pReadMsg->pCont;
  int32_t  contLen = pReadMsg->contLen;
  SRspRet *pRet = &pReadMsg->rspRet;

  SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
  memset(pRet, 0, sizeof(SRspRet));

  // qHandle needs to be freed correctly
  if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
    killQueryMsg->free = htons(killQueryMsg->free);
    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

    vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
    assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);

    void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle);
    if (qhandle == NULL || *qhandle == NULL) {
      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
    } else {
      assert(*qhandle == (void*) killQueryMsg->qhandle);

      qKillQuery(*qhandle);
      qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
    }

    return TSDB_CODE_TSC_QUERY_CANCELLED;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  void**  handle = NULL;

  if (contLen != 0) {
    qinfo_t pQInfo = NULL;
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);

    // NOTE(review): the result of rpcMallocCont is not checked here; no suitable
    // out-of-memory error code is visible in this file, so this is left for follow-up.
    SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
    pRsp->code    = code;
    pRsp->qhandle = 0;

    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
    int32_t vgId = pVnode->vgId;

    if (code == TSDB_CODE_SUCCESS) {
      handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
      if (handle == NULL) {  // failed to register qhandle
        // set the failure code first so that the log reports it instead of the stale success code
        pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
        vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
               tstrerror(pRsp->code));
        qDestroyQueryInfo(pQInfo);  // destroy it directly
      } else {
        assert(*handle == pQInfo);
        pRsp->qhandle = htobe64((uint64_t) pQInfo);
      }

      // tell the connection about the new qhandle; a failure means the link is already broken
      if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
        vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
        return pRsp->code;
      }
    } else {
      assert(pQInfo == NULL);
    }

    if (handle != NULL) {
      vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);

      vnodePutItemIntoReadQueue(pVnode, *handle);
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
    }

  } else {
    assert(pCont != NULL);

    // continue-exec item: pCont carries the qhandle value itself
    handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
    if (handle == NULL) {
      vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
      code = TSDB_CODE_QRY_INVALID_QHANDLE;
    } else {
      vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);

      bool freehandle = false;
      bool buildRes = qTableQuery(*handle); // do execute query

      // build query rsp
      if (buildRes) {
        // update the connection info according to the retrieve connection
        pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
        assert(pReadMsg->rpcMsg.handle != NULL);

        vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
        code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);

        // todo test the error code case
        if (code == TSDB_CODE_SUCCESS) {
          code = TSDB_CODE_QRY_HAS_RSP;
        }
      }

      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
    }
  }

  return code;
}

223 224 225 226
/*
 * Handler for TSDB_MSG_TYPE_FETCH: return the result of a previously issued query.
 *
 * The content is an SRetrieveTableMsg whose qhandle/free fields arrive in network byte
 * order and are converted in place. The response is built into pReadMsg->rspRet.
 *
 * Returns:
 *   TSDB_CODE_QRY_INVALID_QHANDLE - the qhandle is unknown (an empty response is attached)
 *   TSDB_CODE_SUCCESS             - free == 1: the query was killed and its handle released
 *                                   (an empty response is attached)
 *   TSDB_CODE_QRY_NOT_READY       - the result is not available yet; no response is built
 *   otherwise the code of qRetrieveQueryResultInfo() / vnodeDumpQueryResult().
 */
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  SRspRet *pRet = &pReadMsg->rspRet;

  SRetrieveTableMsg *pRetrieve = pCont;
  pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
  pRetrieve->free = htons(pRetrieve->free);

  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);

  memset(pRet, 0, sizeof(SRspRet));

  int32_t code = TSDB_CODE_SUCCESS;
  void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
  if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
    vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);

    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  if (pRetrieve->free == 1) {
    vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);

    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  // NOTE(review): freeHandle is filled in by vnodeDumpQueryResult() but is not consulted
  // when the handle is released below - confirm whether that is intended.
  bool freeHandle = true;
  bool buildRes   = false;

  code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
  if (code != TSDB_CODE_SUCCESS) {
    // attach an all-zero response; on allocation failure rsp stays NULL and the error code is still returned
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    }
  } else {
    // result is not ready, return immediately
    if (!buildRes) {
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
      return TSDB_CODE_QRY_NOT_READY;
    }

    code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
  }

  qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
  return code;
}
H
Haojun Liao 已提交
274 275 276 277 278 279 280 281 282 283 284 285

// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
  SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
  killQueryMsg->free = htons(1);
  killQueryMsg->header.vgId = htonl(vgId);
  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
  return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
286
}