vnodeRead.c 10.9 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17
//#include <dnode.h>
S
slguan 已提交
18
#include "os.h"
H
Haojun Liao 已提交
19 20

#include "tglobal.h"
S
slguan 已提交
21
#include "taoserror.h"
H
Haojun Liao 已提交
22 23 24
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
S
slguan 已提交
25 26 27 28
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
29
#include "tqueue.h"
S
slguan 已提交
30

31 32 33
// Dispatch table for read messages, indexed by RPC message type. Slots are filled by
// vnodeInitReadFp(); as a static array, every slot that is never assigned stays NULL,
// which vnodeProcessRead() treats as "no handler for this message type".
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
// Handlers installed for TSDB_MSG_TYPE_QUERY and TSDB_MSG_TYPE_FETCH respectively.
static int32_t  vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t  vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
H
Haojun Liao 已提交
34
// Reports a newly created query handle to the RPC layer for the given connection
// (defined later in this file).
static int32_t  vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
S
slguan 已提交
35 36

void vnodeInitReadFp(void) {
J
jtao1735 已提交
37 38
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
39 40
}

41
/*
 * Entry point for a message taken from a vnode's read queue.
 *
 * param    - the vnode object (SVnodeObj *) the message is addressed to
 * pReadMsg - the read message; rpcMsg.msgType selects the handler
 *
 * Returns the handler's result, or one of:
 *   TSDB_CODE_VND_MSG_NOT_PROCESSED - message type is out of range or has no handler
 *   TSDB_CODE_VND_INVALID_STATUS    - vnode is not in READY status
 *   TSDB_CODE_RPC_NOT_READY         - tsdb is not available, the vnode is closing, or this
 *                                     replica is not the master
 */
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
  int msgType = pReadMsg->rpcMsg.msgType;

  // the message type comes off the wire; never index the handler table with an
  // out-of-range value
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    vDebug("vgId:%d, msgType:%d not processed, invalid msg type", pVnode->vgId, msgType);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (vnodeProcessReadMsgFp[msgType] == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (pVnode->status != TAOS_VN_STATUS_READY) {
    vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
    return TSDB_CODE_VND_INVALID_STATUS;
  }

  // tsdb may be in reset state
  if (pVnode->tsdb == NULL) {
    return TSDB_CODE_RPC_NOT_READY;
  }

  // status is re-read here on purpose: it was READY a moment ago, so this only fires if it
  // changed in between. NOTE(review): presumably another thread can update status - confirm.
  if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
    return TSDB_CODE_RPC_NOT_READY;
  }

  // TODO: later, let slave replicas serve queries as well
  if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
    vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
    return TSDB_CODE_RPC_NOT_READY;
  }

  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}

H
Haojun Liao 已提交
69
/*
 * Wrap a query handle in a freshly allocated QUERY read message and append it to the
 * vnode's read queue. The message carries the handle in pCont with contLen == 0, which is
 * the form vnodeProcessQueryMsg() handles in its "continue an existing query" branch.
 * The vnode's refCount is incremented by one before the item is queued.
 *
 * NOTE(review): the result of taosAllocateQitem() is used without a NULL check.
 */
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
  SReadMsg *pMsg = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));

  pMsg->contLen = 0;
  pMsg->pCont = qhandle;
  pMsg->rpcMsg.handle = NULL;
  pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;

  atomic_add_fetch_32(&pVnode->refCount, 1);

  vDebug("QInfo:%p add to query task queue for exec, msg:%p", qhandle, pMsg);
  taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pMsg);
}

H
Haojun Liao 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
/*
 * Fill pRet with an empty retrieve response: a zeroed SRetrieveTableRsp whose `completed`
 * flag is set, with pRet->len set to the size of that structure.
 * If the response buffer cannot be allocated, pRet->rsp is NULL and pRet->len is 0.
 */
static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) {
  SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
  if (pRsp == NULL) {
    pRet->rsp = NULL;
    pRet->len = 0;
    return;
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));
  pRsp->completed = true;

  pRet->rsp = pRsp;
  pRet->len = sizeof(SRetrieveTableRsp);
}

/*
 * Dump the available query result of `handle` into pRet.
 *
 * pRet       - receives the response buffer and its length
 * pVnode     - owning vnode (SVnodeObj *), used when the query has to be re-queued
 * handle     - the query handle whose result is dumped
 * freeHandle - out: false when the query was put back on the read queue for further
 *              execution, true otherwise (finished, or the dump failed)
 *
 * Returns the code reported by qDumpRetrieveResult(). On failure pRet carries an empty
 * response with a valid length, so the error path matches the other "no result"
 * responses built in this file.
 */
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
  bool continueExec = false;

  int32_t code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec);
  if (code != TSDB_CODE_SUCCESS) {
    vnodeBuildNoResultQueryRsp(pRet);
    *freeHandle = true;
    return code;
  }

  if (continueExec) {
    // more data to produce: hand the query back to the read queue and keep the handle alive
    vnodePutItemIntoReadQueue(pVnode, handle);
    pRet->qhandle = handle;
    *freeHandle = false;
  } else {
    vDebug("QInfo:%p exec completed", handle);
    *freeHandle = true;
  }

  return code;
}

114
/*
 * Handler for TSDB_MSG_TYPE_QUERY. The message takes one of three forms:
 *
 *  1. rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL: the client connection is broken. pCont
 *     holds a SRetrieveTableMsg (see vnodeNotifyCurrentQhandle) naming the query to kill.
 *     Returns TSDB_CODE_TSC_QUERY_CANCELLED.
 *  2. contLen != 0: a new query request. A query info object is created and registered,
 *     the handle is reported to the connection, and the query is queued for execution.
 *     A SQueryTableRsp is always placed in pReadMsg->rspRet.
 *  3. contLen == 0: pCont is an existing query handle queued by vnodePutItemIntoReadQueue();
 *     the query is executed, and if a retrieve request is already waiting its response is
 *     built here (return value TSDB_CODE_QRY_HAS_RSP on success).
 */
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void    *pCont = pReadMsg->pCont;
  int32_t  contLen = pReadMsg->contLen;
  SRspRet *pRet = &pReadMsg->rspRet;

  SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
  memset(pRet, 0, sizeof(SRspRet));

  // connection is broken: the query handle needs to be killed and freed correctly
  if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
    // the message was built in network byte order; convert in place
    killQueryMsg->free = htons(killQueryMsg->free);
    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

    vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
    assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);

    void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle);
    if (qhandle == NULL || *qhandle == NULL) {
      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
    } else {
      assert(*qhandle == (void*) killQueryMsg->qhandle);

      qKillQuery(*qhandle);
      qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
    }

    return TSDB_CODE_TSC_QUERY_CANCELLED;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  void**  handle = NULL;

  if (contLen != 0) {
    // new query request
    qinfo_t pQInfo = NULL;
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);

    SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
    pRsp->code    = code;
    pRsp->qhandle = 0;

    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
    int32_t vgId = pVnode->vgId;

    if (code == TSDB_CODE_SUCCESS) {
      handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
      if (handle == NULL) {  // failed to register qhandle, todo add error test case
        // set the failure code first so that the log reports it instead of "success"
        pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
        vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
               tstrerror(pRsp->code));
        qDestroyQueryInfo(pQInfo);  // destroy it directly
      } else {
        assert(*handle == pQInfo);
        pRsp->qhandle = htobe64((uint64_t) pQInfo);
      }

      // report the new handle to the connection; a failure means the link is already broken
      if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
        vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
        return pRsp->code;
      }
    } else {
      assert(pQInfo == NULL);
    }

    if (handle != NULL) {
      vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
      vnodePutItemIntoReadQueue(pVnode, *handle);
    }
  } else {
    // continue executing an existing query; pCont is the query handle itself
    assert(pCont != NULL);

    handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
    if (handle == NULL) {
      vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
      code = TSDB_CODE_QRY_INVALID_QHANDLE;
    } else {
      vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, (void*) pCont);

      bool freehandle = false;
      bool buildRes = qTableQuery(*handle); // do execute query

      // build query rsp, the retrieve request has reached here already
      if (buildRes) {
        // update the connection info according to the retrieve connection
        pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
        assert(pReadMsg->rpcMsg.handle != NULL);

        vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *handle,
               pReadMsg->rpcMsg.handle);
        code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);

        // todo test the error code case
        if (code == TSDB_CODE_SUCCESS) {
          code = TSDB_CODE_QRY_HAS_RSP;
        }
      }

      // If retrieval request has not arrived, release the qhandle and decrease the reference count to allow
      // the queryMgmt to free it when expired
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);

      // NOTE:
      // if the qhandle is put into query vread queue and wait to be executed by worker in read queue,
      // the reference count of qhandle can not be decreased. Otherwise, qhandle may be released before or in the
      // procedure of query execution
      // NOTE(review): the release above is unconditional, so this is a second release of the
      // same handle variable; its effect depends on what qReleaseQInfo() leaves in `handle` -
      // confirm against qReleaseQInfo before changing the order of these calls.
      if (freehandle) {
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, freehandle);
      }
    }
  }

  return code;
}

232 233 234 235
/*
 * Handler for TSDB_MSG_TYPE_FETCH: a client retrieves the result of a query, or asks for
 * the query to be killed (free == 1).
 *
 * pReadMsg->pCont is a SRetrieveTableMsg in network byte order; it is converted in place.
 * The response is written to pReadMsg->rspRet.
 *
 * Returns:
 *   TSDB_CODE_QRY_INVALID_QHANDLE - the handle is unknown; an empty response is built
 *   TSDB_CODE_QRY_NOT_READY       - the result is not available yet; no response is built
 *   TSDB_CODE_SUCCESS             - after a kill request (with an empty response)
 *   otherwise the code from qRetrieveQueryResultInfo() / vnodeDumpQueryResult()
 */
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  SRspRet *pRet = &pReadMsg->rspRet;

  SRetrieveTableMsg *pRetrieve = pCont;
  pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
  pRetrieve->free = htons(pRetrieve->free);

  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);

  memset(pRet, 0, sizeof(SRspRet));

  int32_t code = TSDB_CODE_SUCCESS;
  void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
  if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
    vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);

    // a handle that was acquired but does not match must still be given back
    if (handle != NULL) {
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
    }

    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  if (pRetrieve->free == 1) {
    vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);

    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  bool freeHandle = true;
  bool buildRes   = false;

  code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
  if (code != TSDB_CODE_SUCCESS) {
    // reply with an empty, zeroed response; len must describe the buffer that was allocated
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
      pRet->len = sizeof(SRetrieveTableRsp);
    }
  } else {
    // result is not ready, return immediately
    if (!buildRes) {
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
      return TSDB_CODE_QRY_NOT_READY;
    }

    code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
  }

  // freeHandle stays true unless vnodeDumpQueryResult() re-queued the query
  qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
  return code;
}
H
Haojun Liao 已提交
283 284 285 286 287 288 289 290 291 292 293 294

// Report a newly created query handle to the RPC layer for connection `handle`.
// The payload is a SRetrieveTableMsg in network byte order with free == 1, i.e. the same
// "kill this query" message that vnodeProcessQueryMsg() handles when a message arrives with
// code TSDB_CODE_RPC_NETWORK_UNAVAIL, so that the query can be killed immediately if the
// client connection breaks.
// Returns the result of rpcReportProgress().
// NOTE(review): the result of rpcMallocCont() is used without a NULL check.
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
  SRetrieveTableMsg* pKillMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));

  pKillMsg->header.vgId    = htonl(vgId);
  pKillMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
  pKillMsg->free           = htons(1);
  pKillMsg->qhandle        = htobe64((uint64_t) qhandle);

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
  return rpcReportProgress(handle, (char*) pKillMsg, sizeof(SRetrieveTableMsg));
}