vnodeRead.c 13.0 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17 18
#define _NON_BLOCKING_RETRIEVE  0

S
slguan 已提交
19
#include "os.h"
H
Haojun Liao 已提交
20 21

#include "tglobal.h"
S
slguan 已提交
22
#include "taoserror.h"
H
Haojun Liao 已提交
23 24 25
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
S
slguan 已提交
26 27 28 29
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
30
#include "tqueue.h"
S
slguan 已提交
31

32 33 34
// Dispatch table for read messages, indexed by message type; filled in by vnodeInitReadFp().
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
// Handler for TSDB_MSG_TYPE_QUERY: creates a new query or continues executing a queued one.
static int32_t  vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
// Handler for TSDB_MSG_TYPE_FETCH: returns query results, or kills the query when asked to.
static int32_t  vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
H
Haojun Liao 已提交
35
// Builds a kill-query message for qhandle and reports it to the connection via rpcReportProgress().
static int32_t  vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
S
slguan 已提交
36 37

void vnodeInitReadFp(void) {
J
jtao1735 已提交
38 39
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
40 41
}

S
TD-1768  
Shengliang Guan 已提交
42 43 44 45 46 47 48
//
// Once a fetch request has entered the vnode queue, its process function must still run even if
// the vnode can no longer provide services; otherwise there will be a deadlock. Therefore no
// availability check is done here: the check is performed before the request enters the queue.
//
// Dispatches pReadMsg to the handler registered for its message type and returns that handler's
// result, or TSDB_CODE_VND_MSG_NOT_PROCESSED when no handler is registered.
//
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
  int        msgType = pReadMsg->rpcMsg.msgType;

  int32_t (*handler)(SVnodeObj *, SReadMsg *) = vnodeProcessReadMsgFp[msgType];
  if (handler != NULL) {
    return handler(pVnode, pReadMsg);
  }

  vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
  return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}

int32_t vnodeCheckRead(void *param) {
  SVnodeObj *pVnode = param;
61
  if (pVnode->status != TAOS_VN_STATUS_READY) {
S
TD-1768  
Shengliang Guan 已提交
62 63
    vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
           pVnode->refCount, pVnode);
S
TD-1652  
Shengliang Guan 已提交
64
    return TSDB_CODE_APP_NOT_READY;
65
  }
S
slguan 已提交
66

S
TD-1652  
Shengliang Guan 已提交
67
  // tsdb may be in reset state
S
TD-1661  
Shengliang Guan 已提交
68
  if (pVnode->tsdb == NULL) {
S
TD-1768  
Shengliang Guan 已提交
69
    vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
S
TD-1661  
Shengliang Guan 已提交
70 71
    return TSDB_CODE_APP_NOT_READY;
  }
72

S
TD-1652  
Shengliang Guan 已提交
73
  if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
S
TD-1768  
Shengliang Guan 已提交
74 75
    vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
           syncRole[pVnode->role], pVnode->refCount, pVnode);
S
Shengliang Guan 已提交
76
    return TSDB_CODE_APP_NOT_READY;
77
  }
H
Hui Li 已提交
78

S
TD-1768  
Shengliang Guan 已提交
79
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
80
}
81
/**
 * Wrap a query handle in a read message of type TSDB_MSG_TYPE_QUERY and
 * append it to the vnode's read queue so the query is (further) executed.
 *
 * @param pVnode   the vnode whose read queue receives the item
 * @param qhandle  handle of the query to execute; stored as the message content
 *                 with a content length of 0
 * @param ahandle  application handle copied into the message's rpcMsg.ahandle
 * @return TSDB_CODE_SUCCESS once the item is queued; the vnodeCheckRead() error
 *         when the vnode cannot serve reads; TSDB_CODE_VND_OUT_OF_MEMORY when
 *         the queue item cannot be allocated (nothing is queued and the vnode
 *         reference count is left untouched in both failure cases)
 */
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
  int32_t code = vnodeCheckRead(pVnode);
  if (code != TSDB_CODE_SUCCESS) return code;

  SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
  if (pRead == NULL) {
    // allocation failed: report it instead of dereferencing a NULL item
    vError("vgId:%d, QInfo:%p failed to allocate item for vread queue", pVnode->vgId, *qhandle);
    return TSDB_CODE_VND_OUT_OF_MEMORY;
  }

  pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
  pRead->pCont = qhandle;
  pRead->contLen = 0;
  pRead->rpcMsg.ahandle = ahandle;

  // take a vnode reference for the queued item
  // NOTE(review): presumably dropped by the consumer of the read queue - confirm
  atomic_add_fetch_32(&pVnode->refCount, 1);

  vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
  taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
99 100 101 102 103 104 105 106 107 108
/**
 * Dump the current result of a query into the response object and decide
 * what happens to the query handle afterwards.
 *
 * @param pRet         response message object; rsp/len are filled in here
 * @param pVnode       the vnode object
 * @param handle       qhandle for executing query
 * @param freeHandle   output: set to true when the caller must free the qhandle,
 *                     false when the qhandle was put back into the vnode read
 *                     queue for continued execution
 * @param ahandle      sqlObj address at client side
 * @return TSDB_CODE_SUCCESS when the result was dumped (and, if the query has
 *         to continue, re-queued successfully); otherwise the error returned by
 *         qDumpRetrieveResult() or vnodePutItemIntoReadQueue(). When dumping
 *         fails, an empty "completed" response is built; if even that
 *         allocation fails, pRet carries no response (rsp NULL, len 0).
 */
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle, void *ahandle) {
  bool continueExec = false;

  int32_t code = TSDB_CODE_SUCCESS;
  if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
    if (continueExec) {
      // the query is not finished: hand the qhandle back to the read queue
      *freeHandle = false;
      code = vnodePutItemIntoReadQueue(pVnode, handle, ahandle);
      if (code != TSDB_CODE_SUCCESS) {
        // could not be queued, so nobody else will run it: the caller frees the qhandle
        *freeHandle = true;
        return code;
      } else {
        pRet->qhandle = *handle;
      }
    } else {
      *freeHandle = true;
      vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
    }
  } else {
    // dumping failed: answer with an empty response flagged as completed
    SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRsp != NULL) {
      memset(pRsp, 0, sizeof(SRetrieveTableRsp));
      pRsp->completed = true;

      pRet->rsp = pRsp;
      pRet->len = sizeof(SRetrieveTableRsp);
    } else {
      // out of memory: return the error code without a payload instead of writing through NULL
      pRet->rsp = NULL;
      pRet->len = 0;
    }

    *freeHandle = true;
  }

  return code;
}

S
TD-1652  
Shengliang Guan 已提交
139
/**
 * Build an empty retrieve response (all fields zero, completed flag set) and
 * attach it to the response object.
 *
 * @param pRet  response message object; rsp/len are overwritten. When the
 *              response buffer cannot be allocated, rsp is set to NULL and
 *              len to 0 so no payload is reported.
 */
static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
  if (pRsp == NULL) {
    // out of memory: leave the response empty instead of writing through NULL
    pRet->rsp = NULL;
    pRet->len = 0;
    return;
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));
  pRsp->completed = true;

  pRet->rsp = pRsp;
  pRet->len = sizeof(SRetrieveTableRsp);
}

149
/**
 * Handle a TSDB_MSG_TYPE_QUERY read message. Three cases are distinguished:
 *
 *  1. rpcMsg.code is TSDB_CODE_RPC_NETWORK_UNAVAIL: the message content is a
 *     kill-query message (SRetrieveTableMsg) for a broken connection; the
 *     matching query is killed and its handle freed.
 *  2. contLen != 0: a new query request (SQueryTableMsg). The query info is
 *     created and registered, the connection is told about the qhandle, and
 *     the qhandle is put into the vnode read queue for execution. The
 *     SQueryTableRsp built here is returned through pReadMsg->rspRet.
 *  3. contLen == 0: the content is a qhandle that was queued earlier by
 *     vnodePutItemIntoReadQueue(); the query is executed.
 *
 * @param pVnode    the vnode object
 * @param pReadMsg  the read message; its rspRet is reset and filled in here
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_TSC_QUERY_CANCELLED for the
 *         kill case; otherwise the error of the failing step
 *         (TSDB_CODE_VND_OUT_OF_MEMORY when the response buffer cannot be
 *         allocated for an otherwise successfully created query)
 */
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  int32_t  contLen = pReadMsg->contLen;
  SRspRet *pRet = &pReadMsg->rspRet;

  SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont;
  memset(pRet, 0, sizeof(SRspRet));

  // qHandle needs to be freed correctly
  if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont;
    // convert the fields written in network byte order by vnodeNotifyCurrentQhandle()
    killQueryMsg->free = htons(killQueryMsg->free);
    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

    vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
    assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);

    void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
    if (qhandle == NULL || *qhandle == NULL) {
      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
            pReadMsg->rpcMsg.handle);
    } else {
      assert(*qhandle == (void *)killQueryMsg->qhandle);

      qKillQuery(*qhandle);
      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
    }

    return TSDB_CODE_TSC_QUERY_CANCELLED;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  void ** handle = NULL;

  if (contLen != 0) {
    qinfo_t pQInfo = NULL;
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);

    SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
    if (pRsp == NULL) {
      // no buffer for the response: drop the not-yet-registered query info and
      // report the failure instead of writing through NULL
      vError("vgId:%d QInfo:%p failed to allocate query rsp", pVnode->vgId, (void *)pQInfo);
      if (pQInfo != NULL) {
        qDestroyQueryInfo(pQInfo);
      }
      return (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_VND_OUT_OF_MEMORY;
    }

    pRsp->code = code;
    pRsp->qhandle = 0;

    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
    int32_t vgId = pVnode->vgId;

    if (code == TSDB_CODE_SUCCESS) {
      handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
      if (handle == NULL) {  // failed to register qhandle
        pRsp->code = terrno;
        terrno = 0;
        vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
               tstrerror(pRsp->code));
        qDestroyQueryInfo(pQInfo);  // destroy it directly
        return pRsp->code;
      } else {
        assert(*handle == pQInfo);
        pRsp->qhandle = htobe64((uint64_t)pQInfo);
      }

      // register the qhandle with the connection; failure means the current connection is broken
      if (handle != NULL &&
          vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
        vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
               pReadMsg->rpcMsg.handle);
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
        return pRsp->code;
      }
    } else {
      assert(pQInfo == NULL);
    }

    if (handle != NULL) {
      vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
      code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
      if (code != TSDB_CODE_SUCCESS) {
        // the query cannot be scheduled: report the error and free the qhandle
        pRsp->code = code;
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
        return pRsp->code;
      }
    }
  } else {
    // content is the qhandle queued by vnodePutItemIntoReadQueue()
    assert(pCont != NULL);
    void **qhandle = (void **)pCont;

    vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);

#if _NON_BLOCKING_RETRIEVE
    bool freehandle = false;
    bool buildRes = qTableQuery(*qhandle);  // do execute query

    // build query rsp, the retrieve request has reached here already
    if (buildRes) {
      // update the connection info according to the retrieve connection
      pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle);
      assert(pReadMsg->rpcMsg.handle != NULL);

      vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
             pReadMsg->rpcMsg.handle);

      // set the real rsp error code
      pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle);

      // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
      code = TSDB_CODE_QRY_HAS_RSP;
    } else {
      void* h1 = qGetResultRetrieveMsg(*qhandle);
      assert(h1 == NULL);

      freehandle = qQueryCompleted(*qhandle);
    }

    // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
    // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
    if (freehandle || (!buildRes)) {
      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
    }
#else
    qTableQuery(*qhandle);  // do execute query
    qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
#endif
  }

  return code;
}

277 278 279 280
/**
 * Handle a TSDB_MSG_TYPE_FETCH read message (SRetrieveTableMsg): look up the
 * query handle named in the message and either kill the query (free == 1) or
 * build the retrieve response for the client.
 *
 * @param pVnode    the vnode object
 * @param pReadMsg  the read message; its content is converted to host byte
 *                  order in place and its rspRet is reset and filled in here
 * @return TSDB_CODE_SUCCESS when a result was dumped; an error code when the
 *         qhandle is invalid; TSDB_CODE_TSC_QUERY_CANCELLED when the query was
 *         killed on request; TSDB_CODE_RPC_NETWORK_UNAVAIL when the qhandle
 *         could not be registered with the connection; otherwise the error of
 *         qRetrieveQueryResultInfo() or vnodeDumpQueryResult()
 */
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  SRspRet *pRet = &pReadMsg->rspRet;

  SRetrieveTableMsg *pRetrieve = pCont;
  pRetrieve->free = htons(pRetrieve->free);
  pRetrieve->qhandle = htobe64(pRetrieve->qhandle);

  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
         pRetrieve->free, pReadMsg->rpcMsg.handle);

  memset(pRet, 0, sizeof(SRspRet));

  terrno = TSDB_CODE_SUCCESS;
  int32_t code = TSDB_CODE_SUCCESS;
  void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
  if (handle == NULL) {
    code = terrno;
    terrno = TSDB_CODE_SUCCESS;
    if (code == TSDB_CODE_SUCCESS) {
      // no handle and no error reported: never fall through and dereference NULL
      code = TSDB_CODE_QRY_INVALID_QHANDLE;
    }
  } else if ((*handle) != (void *)pRetrieve->qhandle) {
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (code != TSDB_CODE_SUCCESS) {
    vDebug("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle);
    if (handle != NULL) {
      // the handle was acquired but failed the identity check: give it back without freeing it
      qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
    }
    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  if (pRetrieve->free == 1) {
    vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);

    vnodeBuildNoResultQueryRsp(pRet);
    code = TSDB_CODE_TSC_QUERY_CANCELLED;
    return code;
  }

  // register the qhandle to connect to quit query immediate if connection is broken
  if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle,
           pReadMsg->rpcMsg.handle);
    code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
    return code;
  }

  bool freeHandle = true;
  bool buildRes = false;

  code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
  if (code != TSDB_CODE_SUCCESS) {
    // answer with an all-zero response; on allocation failure pRet stays empty
    // (rsp NULL, len 0 from the memset above) and only the error code is returned
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {
      pRet->len = sizeof(SRetrieveTableRsp);
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    }
    freeHandle = true;
  } else {
    assert(buildRes == true);
#if _NON_BLOCKING_RETRIEVE
    // result is not ready, return immediately
    if (!buildRes) {
      assert(pReadMsg->rpcMsg.handle != NULL);

      qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
      return TSDB_CODE_QRY_NOT_READY;
    }
#endif

    // ahandle is the sqlObj pointer
    code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
  }

  // If qhandle is not added into vread queue, the query should be completed already or paused with error.
  // Here free qhandle immediately
  if (freeHandle) {
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
  }

  return code;
}
H
Haojun Liao 已提交
359 360 361

// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
S
TD-1652  
Shengliang Guan 已提交
362 363 364
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
  SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  killQueryMsg->qhandle = htobe64((uint64_t)qhandle);
H
Haojun Liao 已提交
365 366 367 368 369
  killQueryMsg->free = htons(1);
  killQueryMsg->header.vgId = htonl(vgId);
  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
S
TD-1652  
Shengliang Guan 已提交
370
  return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg));
371
}