vnodeRead.c 11.1 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17
//#include <dnode.h>
S
slguan 已提交
18
#include "os.h"
H
Haojun Liao 已提交
19 20

#include "tglobal.h"
S
slguan 已提交
21
#include "taoserror.h"
H
Haojun Liao 已提交
22 23 24
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
S
slguan 已提交
25 26 27 28
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
29
#include "tqueue.h"
S
slguan 已提交
30

31 32 33
/* Read-path message handlers, indexed by rpc msgType; populated by vnodeInitReadFp(). */
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);

static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId);
S
slguan 已提交
35 36

void vnodeInitReadFp(void) {
J
jtao1735 已提交
37 38
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
39 40
}

41
/*
 * Entry point of the vnode read path: check that the message can be served by
 * this vnode, then dispatch it to the handler registered in vnodeProcessReadMsgFp.
 *
 * param    the target vnode (an SVnodeObj *)
 * pReadMsg the read request; rpcMsg.msgType selects the handler
 *
 * Returns:
 *   TSDB_CODE_VND_MSG_NOT_PROCESSED  msgType is out of range or has no handler
 *   TSDB_CODE_VND_INVALID_STATUS     vnode status is not TAOS_VN_STATUS_READY
 *   TSDB_CODE_RPC_NOT_READY          tsdb is NULL, or the vnode is a non-master replica
 *   otherwise                        the handler's return code
 */
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
  int        msgType = pReadMsg->rpcMsg.msgType;

  // msgType is taken from the incoming rpc message: reject values that would
  // index outside vnodeProcessReadMsgFp, whose size is TSDB_MSG_TYPE_MAX
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    vError("vgId:%d, msgType:%d not processed, invalid msg type", pVnode->vgId, msgType);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (vnodeProcessReadMsgFp[msgType] == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  // Any status other than READY (including TAOS_VN_STATUS_CLOSING) is rejected
  // here, so no separate closing check is needed afterwards.
  if (pVnode->status != TAOS_VN_STATUS_READY) {
    vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
    return TSDB_CODE_VND_INVALID_STATUS;
  }

  // tsdb may be in reset state
  if (pVnode->tsdb == NULL) {
    return TSDB_CODE_RPC_NOT_READY;
  }

  // TODO: let slave replicas serve queries as well
  if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
    vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
    return TSDB_CODE_RPC_NOT_READY;
  }

  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}

H
Haojun Liao 已提交
69
/*
 * Queue a "continue execution" task for the query identified by qhandle on this
 * vnode's read queue (pVnode->rqueue, qtype TAOS_QTYPE_QUERY).
 *
 * The item is a TSDB_MSG_TYPE_QUERY read message with contLen 0 and pCont set to
 * qhandle; vnodeProcessQueryMsg treats contLen == 0 as "resume an existing query".
 * pVnode->refCount is incremented before the item is queued.
 *
 * If the queue item cannot be allocated, an error is logged and nothing is queued.
 */
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
  SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
  if (pRead == NULL) {
    // NOTE(review): the task is dropped here; the caller's hold on qhandle is
    // unchanged, so the query will not be resumed from this path.
    vError("QInfo:%p failed to allocate read queue item, query task not queued", qhandle);
    return;
  }

  pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
  pRead->pCont = qhandle;
  pRead->contLen = 0;
  pRead->rpcMsg.handle = NULL;

  atomic_add_fetch_32(&pVnode->refCount, 1);

  vDebug("QInfo:%p add to query task queue for exec, msg:%p", qhandle, pRead);
  taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}

H
Haojun Liao 已提交
82 83 84 85 86 87
/*
 * Serialize the current result of query `handle` into pRet via qDumpRetrieveResult.
 *
 * pRet       receives the response buffer (rsp) and its length (len)
 * pVnode     the owning vnode (SVnodeObj *), used to re-queue the query
 * handle     the query info
 * freeHandle out: true when the caller should free the qhandle, false when the
 *            query was re-queued and must stay alive
 *
 * On success, if qDumpRetrieveResult reports more work (continueExec), the query is
 * re-queued with vnodePutItemIntoReadQueue and pRet->qhandle is set. On failure,
 * pRet->rsp is set to a freshly allocated, zero-filled SRetrieveTableRsp (NULL if
 * that allocation fails).
 *
 * Returns the code from qDumpRetrieveResult.
 */
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
  bool continueExec = false;

  int32_t code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec);
  if (code != TSDB_CODE_SUCCESS) {
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {  // rpcMallocCont may fail; never memset a NULL buffer
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    }
    *freeHandle = true;
    return code;
  }

  if (continueExec) {
    // query still has work to do: keep the handle alive and schedule another round
    *freeHandle = false;
    vnodePutItemIntoReadQueue(pVnode, handle);
    pRet->qhandle = handle;
  } else {
    *freeHandle = true;
    vDebug("QInfo:%p exec completed, free handle:%d", handle, *freeHandle);
  }

  return code;
}

/*
 * Fill pRet with an empty retrieve response: a zero-filled SRetrieveTableRsp with
 * `completed` set to true and len = sizeof(SRetrieveTableRsp).
 *
 * If the response buffer cannot be allocated, pRet->rsp is NULL and pRet->len is 0.
 */
static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) {
  SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
  pRet->rsp = pRsp;

  if (pRsp == NULL) {  // rpcMallocCont may fail; avoid dereferencing NULL
    pRet->len = 0;
    return;
  }

  pRet->len = sizeof(SRetrieveTableRsp);
  memset(pRsp, 0, sizeof(SRetrieveTableRsp));
  pRsp->completed = true;
}

114
/*
 * Kill-query variant of a query message, recognized by
 * rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL. pCont is an SRetrieveTableMsg naming
 * the query (presumably the message built by vnodeNotifyCurrentQhandle and handed
 * back by the rpc layer when the client link broke — confirm).
 *
 * If the query is still registered, it is killed and its qhandle released with
 * free = true. Always returns TSDB_CODE_TSC_QUERY_CANCELLED.
 */
static int32_t vnodeHandleBrokenLinkQuery(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont;
  killQueryMsg->free = htons(killQueryMsg->free);
  killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

  vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
  assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);

  void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
  if (qhandle == NULL || *qhandle == NULL) {
    vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
          pReadMsg->rpcMsg.handle);
    if (qhandle != NULL) {
      // an entry was acquired but holds no query: give back the reference taken above
      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
    }
  } else {
    assert(*qhandle == (void *)killQueryMsg->qhandle);
    qKillQuery(*qhandle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
  }

  return TSDB_CODE_TSC_QUERY_CANCELLED;
}

/*
 * New query (contLen != 0): create query info from the SQueryTableMsg in pCont,
 * register it with pVnode->qMgmt, and answer with an SQueryTableRsp carrying the
 * big-endian qhandle. On success, the query is also queued on the read queue for
 * execution.
 *
 * Returns the qCreateQueryInfo code, or TSDB_CODE_RPC_NETWORK_UNAVAIL when notifying
 * the client connection fails. A registration failure is reported only through
 * pRsp->code; the function then returns TSDB_CODE_SUCCESS.
 */
static int32_t vnodeHandleNewQuery(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pReadMsg->pCont;
  SRspRet        *pRet = &pReadMsg->rspRet;

  qinfo_t pQInfo = NULL;
  int32_t code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);

  // NOTE(review): rpcMallocCont's result is not checked here
  SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
  pRsp->code    = code;
  pRsp->qhandle = 0;

  pRet->len = sizeof(SQueryTableRsp);
  pRet->rsp = pRsp;

  if (code != TSDB_CODE_SUCCESS) {
    assert(pQInfo == NULL);
    return code;
  }

  void **handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
  if (handle == NULL) {  // failed to register qhandle, todo add error test case
    // set the error first so the log reports the actual failure code
    pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
    vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
           tstrerror(pRsp->code));
    qDestroyQueryInfo(pQInfo);  // destroy it directly
    return code;
  }

  assert(*handle == pQInfo);
  pRsp->qhandle = htobe64((uint64_t)pQInfo);

  // the client connection may already be broken; if so, discard the query right away
  if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
    pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
    return pRsp->code;
  }

  vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", pVnode->vgId, *handle);
  vnodePutItemIntoReadQueue(pVnode, *handle);
  return code;
}

/*
 * Continued query (contLen == 0): pCont holds the qhandle of a query previously
 * queued by vnodePutItemIntoReadQueue. Runs the next execution step via qTableQuery;
 * when that returns true, the result is dumped for the retrieve connection obtained
 * from qGetResultRetrieveMsg, and TSDB_CODE_QRY_HAS_RSP is returned on success.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE if the qhandle cannot be acquired.
 */
static int32_t vnodeHandleContinuedQuery(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *pCont = pReadMsg->pCont;
  assert(pCont != NULL);

  void **handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pCont);
  if (handle == NULL) {
    vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void *)pCont, pReadMsg->rpcMsg.handle);
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, (void *)pCont);

  int32_t code       = TSDB_CODE_SUCCESS;
  bool    freehandle = false;
  bool    buildRes   = qTableQuery(*handle);  // do execute query

  // build query rsp, the retrieve request has reached here already
  if (buildRes) {
    // update the connection info according to the retrieve connection
    pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
    assert(pReadMsg->rpcMsg.handle != NULL);

    vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *handle,
           pReadMsg->rpcMsg.handle);
    code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);

    // todo test the error code case
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_QRY_HAS_RSP;
    }
  }

  // If the retrieval request has not arrived, release the qhandle and decrease the
  // reference count so that the query manager can free it when it expires.
  void **dup = handle;
  qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);

  // NOTE:
  // if the qhandle is put into the query read queue and waits to be executed by a worker,
  // its reference count must not be decreased; otherwise the qhandle may be released
  // before or during query execution.
  if (freehandle) {
    qReleaseQInfo(pVnode->qMgmt, (void **)&dup, freehandle);
  }

  return code;
}

/*
 * Handler for TSDB_MSG_TYPE_QUERY. One message type covers three cases:
 *   - rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL: kill the query of a broken connection;
 *   - contLen != 0: create and register a new query;
 *   - contLen == 0: continue executing a query queued on the read queue.
 * pReadMsg->rspRet is zeroed before any case runs.
 */
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  memset(&pReadMsg->rspRet, 0, sizeof(SRspRet));

  // qHandle needs to be freed correctly
  if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    return vnodeHandleBrokenLinkQuery(pVnode, pReadMsg);
  }

  if (pReadMsg->contLen != 0) {
    return vnodeHandleNewQuery(pVnode, pReadMsg);
  }

  return vnodeHandleContinuedQuery(pVnode, pReadMsg);
}

233 234 235 236
/*
 * Handler for TSDB_MSG_TYPE_FETCH: a retrieve request for a registered query.
 *
 * pCont is an SRetrieveTableMsg; its qhandle and free fields are converted from
 * network byte order in place.
 *   - unknown qhandle: empty response, TSDB_CODE_QRY_INVALID_QHANDLE;
 *   - free == 1: kill the query, release its qhandle with free = true, empty response;
 *   - result not ready yet: TSDB_CODE_QRY_NOT_READY;
 *   - otherwise: dump the result into pReadMsg->rspRet.
 */
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  SRspRet *pRet = &pReadMsg->rspRet;

  SRetrieveTableMsg *pRetrieve = pCont;
  pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
  pRetrieve->free = htons(pRetrieve->free);

  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);

  memset(pRet, 0, sizeof(SRspRet));

  int32_t code = TSDB_CODE_SUCCESS;
  void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
  if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
    vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);

    if (handle != NULL) {
      // an entry was acquired but it is not this query: give back the reference taken above
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
    }

    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  if (pRetrieve->free == 1) {
    vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);

    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  bool freeHandle = true;
  bool buildRes   = false;

  code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
  if (code != TSDB_CODE_SUCCESS) {
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {  // rpcMallocCont may fail; never memset a NULL buffer
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    }
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);  // freeHandle is still true here
    return code;
  }

  // result is not ready, return immediately
  if (!buildRes) {
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
    return TSDB_CODE_QRY_NOT_READY;
  }

  void** dup = handle;
  code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
  qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);

  // not added into the task queue, free it immediately
  if (freeHandle) {
    qReleaseQInfo(pVnode->qMgmt, (void**) &dup, freeHandle);
  }

  return code;
}
H
Haojun Liao 已提交
291 292 293 294 295 296 297 298 299 300 301 302

// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
  SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
  killQueryMsg->free = htons(1);
  killQueryMsg->header.vgId = htonl(vgId);
  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
  return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
303
}